Coverage Report

Created: 2025-12-17 22:46

/build/source/nativelink-store/src/s3_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::borrow::Cow;
19
use std::sync::Arc;
20
21
use async_trait::async_trait;
22
use aws_config::default_provider::credentials;
23
use aws_config::provider_config::ProviderConfig;
24
use aws_config::{AppName, BehaviorVersion};
25
use aws_sdk_s3::Client;
26
use aws_sdk_s3::config::Region;
27
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
28
use aws_sdk_s3::operation::get_object::GetObjectError;
29
use aws_sdk_s3::operation::head_object::HeadObjectError;
30
use aws_sdk_s3::primitives::ByteStream; // SdkBody
31
use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
32
use aws_smithy_types::body::SdkBody;
33
use futures::future::FusedFuture;
34
use futures::stream::{FuturesUnordered, unfold};
35
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
36
use nativelink_config::stores::ExperimentalAwsSpec;
37
// Note: S3 store should be very careful about the error codes it returns
38
// when in a retryable wrapper. Always prefer Code::Aborted or another
39
// retryable code over Code::InvalidArgument or make_input_err!().
40
// i.e.: Don't import make_input_err!() to help prevent this.
41
use nativelink_error::{Code, Error, ResultExt, make_err};
42
use nativelink_metric::MetricsComponent;
43
use nativelink_util::buf_channel::{
44
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
45
};
46
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
47
use nativelink_util::instant_wrapper::InstantWrapper;
48
use nativelink_util::retry::{Retrier, RetryResult};
49
use nativelink_util::store_trait::{
50
    RemoveItemCallback, StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo,
51
};
52
use parking_lot::Mutex;
53
use tokio::sync::mpsc;
54
use tokio::time::sleep;
55
use tracing::{error, info};
56
57
use crate::cas_utils::is_zero_digest;
58
use crate::common_s3_utils::{BodyWrapper, TlsClient};
59
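The note above about error codes is a file-wide convention: transient S3 failures are wrapped in RetryResult::Retry with a retryable code (Code::Aborted or Code::Unavailable), never Code::InvalidArgument. A minimal, self-contained sketch of that classification, where S3Failure is a hypothetical stand-in for an SDK error type:

    use nativelink_error::{Code, make_err};
    use nativelink_util::retry::RetryResult;

    // Hypothetical stand-in for an aws-sdk error type.
    struct S3Failure;

    // Sketch: classify a transient S3 failure so the Retrier can try again.
    fn classify(result: Result<u64, S3Failure>) -> RetryResult<u64> {
        match result {
            Ok(length) => RetryResult::Ok(length),
            // Prefer a retryable code here, per the note above.
            Err(_) => RetryResult::Retry(make_err!(Code::Aborted, "transient S3 failure")),
        }
    }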
60
// S3 parts cannot be smaller than this number. See:
61
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
62
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
63
64
// S3 parts cannot be larger than this number. See:
65
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
66
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.
67
68
// S3 parts cannot be more than this number. See:
69
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
70
const MAX_UPLOAD_PARTS: usize = 10_000;
71
72
// Default max buffer size for retrying upload requests.
73
// Note: If you change this, adjust the docs in the config.
74
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 * 1024 * 1024; // 5MB.
75
76
// Default limit for concurrent part uploads per multipart upload.
77
// Note: If you change this, adjust the docs in the config.
78
const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;
79
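For intuition on what these defaults imply for memory, the bounded channel used later in update() (see the comment at source lines 394-395) keeps at most multipart_max_concurrent_uploads parts buffered at once. A small sketch of that arithmetic with the default values above:

    fn main() {
        const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB, as above.
        const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: u64 = 10;

        // With the smallest legal part size, a single multipart upload holds
        // at most ~10 parts in flight waiting to be uploaded:
        let buffered = MIN_MULTIPART_SIZE * DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS;
        assert_eq!(buffered, 50 * 1024 * 1024); // ~50MB per in-flight upload.
    }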
80
#[derive(Debug, MetricsComponent)]
81
pub struct S3Store<NowFn> {
82
    s3_client: Arc<Client>,
83
    now_fn: NowFn,
84
    #[metric(help = "The bucket name for the S3 store")]
85
    bucket: String,
86
    #[metric(help = "The key prefix for the S3 store")]
87
    key_prefix: String,
88
    retrier: Retrier,
89
    #[metric(help = "The number of seconds to consider an object expired")]
90
    consider_expired_after_s: i64,
91
    #[metric(help = "The number of bytes to buffer for retrying requests")]
92
    max_retry_buffer_per_request: usize,
93
    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
94
    multipart_max_concurrent_uploads: usize,
95
96
    remove_callbacks: Mutex<Vec<Arc<dyn RemoveItemCallback>>>,
97
}
98
99
impl<I, NowFn> S3Store<NowFn>
100
where
101
    I: InstantWrapper,
102
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
103
{
104
0
    pub async fn new(spec: &ExperimentalAwsSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
105
0
        let jitter_fn = spec.common.retry.make_jitter_fn();
106
0
        let s3_client = {
107
0
            let http_client = TlsClient::new(&spec.common.clone());
108
109
0
            let credential_provider = credentials::DefaultCredentialsChain::builder()
110
0
                .configure(
111
0
                    ProviderConfig::without_region()
112
0
                        .with_region(Some(Region::new(Cow::Owned(spec.region.clone()))))
113
0
                        .with_http_client(http_client.clone()),
114
0
                )
115
0
                .build()
116
0
                .await;
117
118
0
            let config = aws_config::defaults(BehaviorVersion::v2025_08_07())
119
0
                .credentials_provider(credential_provider)
120
0
                .app_name(AppName::new("nativelink").expect("valid app name"))
121
0
                .timeout_config(
122
0
                    aws_config::timeout::TimeoutConfig::builder()
123
0
                        .connect_timeout(Duration::from_secs(15))
124
0
                        .build(),
125
0
                )
126
0
                .region(Region::new(Cow::Owned(spec.region.clone())))
127
0
                .http_client(http_client)
128
0
                .load()
129
0
                .await;
130
131
0
            Client::new(&config)
132
        };
133
0
        Self::new_with_client_and_jitter(spec, s3_client, jitter_fn, now_fn)
134
0
    }
135
136
12
    pub fn new_with_client_and_jitter(
137
12
        spec: &ExperimentalAwsSpec,
138
12
        s3_client: Client,
139
12
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
140
12
        now_fn: NowFn,
141
12
    ) -> Result<Arc<Self>, Error> {
142
12
        Ok(Arc::new(Self {
143
12
            s3_client: Arc::new(s3_client),
144
12
            now_fn,
145
12
            bucket: spec.bucket.to_string(),
146
12
            key_prefix: spec
147
12
                .common
148
12
                .key_prefix
149
12
                .as_ref()
150
12
                .unwrap_or(&String::new())
151
12
                .clone(),
152
12
            retrier: Retrier::new(
153
12
            Arc::new(|duration| Box::pin(sleep(duration))),
154
12
                jitter_fn,
155
12
                spec.common.retry.clone(),
156
            ),
157
12
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
158
12
            max_retry_buffer_per_request: spec
159
12
                .common
160
12
                .max_retry_buffer_per_request
161
12
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
162
12
            multipart_max_concurrent_uploads: spec
163
12
                .common
164
12
                .multipart_max_concurrent_uploads
165
12
                .map_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS, |v| v),
166
12
            remove_callbacks: Mutex::new(Vec::new()),
167
        }))
168
12
    }
169
170
14
    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
171
14
        format!("{}{}", self.key_prefix, key.as_str(),)
172
14
    }
173
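make_s3_path is plain string concatenation, so any separator (e.g. a trailing slash) must be part of the configured key_prefix itself. An illustrative check with hypothetical values:

    fn main() {
        // Illustrative values: key_prefix = "cas/", digest key = "abc123-42".
        let object_key = format!("{}{}", "cas/", "abc123-42");
        assert_eq!(object_key, "cas/abc123-42");
    }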
174
5
    async fn has(self: Pin<&Self>, digest: StoreKey<'_>) -> Result<Option<u64>, Error> {
175
5
        let digest_clone = digest.into_owned();
176
5
        self.retrier
177
8
            .retry(unfold((), move |state| {
178
8
                let local_digest = digest_clone.clone();
179
8
                async move {
180
8
                    let result = self
181
8
                        .s3_client
182
8
                        .head_object()
183
8
                        .bucket(&self.bucket)
184
8
                        .key(self.make_s3_path(&local_digest))
185
8
                        .send()
186
8
                        .await;
187
188
8
                    match result {
189
4
                        Ok(head_object_output) => {
190
4
                            if self.consider_expired_after_s != 0 {
  Branch (190:32): [True: 0, False: 0]
  Branch (190:32): [Folded - Ignored]
  Branch (190:32): [True: 2, False: 2]
191
2
                                if let Some(last_modified) = head_object_output.last_modified {
  Branch (191:40): [True: 0, False: 0]
  Branch (191:40): [Folded - Ignored]
  Branch (191:40): [True: 2, False: 0]
192
2
                                    let now_s = (self.now_fn)().unix_timestamp() as i64;
193
2
                                    if last_modified.secs() + self.consider_expired_after_s <= now_s
  Branch (193:40): [True: 0, False: 0]
  Branch (193:40): [Folded - Ignored]
  Branch (193:40): [True: 1, False: 1]
194
                                    {
195
1
                                        let remove_callbacks = self.remove_callbacks.lock().clone();
196
1
                                        let mut callbacks: FuturesUnordered<_> = remove_callbacks
197
1
                                            .iter()
198
1
                                            .map(|callback| {
199
0
                                                callback.callback(local_digest.borrow())
200
0
                                            })
201
1
                                            .collect();
202
1
                                        while callbacks.next().await.is_some() {}
  Branch (202:47): [True: 0, False: 0]
  Branch (202:47): [Folded - Ignored]
  Branch (202:47): [True: 0, False: 1]
203
1
                                        return Some((RetryResult::Ok(None), state));
204
1
                                    }
205
0
                                }
206
2
                            }
207
3
                            let Some(length) = head_object_output.content_length else {
  Branch (207:33): [True: 0, False: 0]
  Branch (207:33): [Folded - Ignored]
  Branch (207:33): [True: 3, False: 0]
208
0
                                return Some((RetryResult::Ok(None), state));
209
                            };
210
3
                            if length >= 0 {
  Branch (210:32): [True: 0, False: 0]
  Branch (210:32): [Folded - Ignored]
  Branch (210:32): [True: 3, False: 0]
211
3
                                return Some((RetryResult::Ok(Some(length as u64)), state));
212
0
                            }
213
0
                            Some((
214
0
                                RetryResult::Err(make_err!(
215
0
                                    Code::InvalidArgument,
216
0
                                    "Negative content length in S3: {length:?}",
217
0
                                )),
218
0
                                state,
219
0
                            ))
220
                        }
221
4
                        Err(sdk_error) => match sdk_error.into_service_error() {
222
1
                            HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
223
3
                            other => Some((
224
3
                                RetryResult::Retry(make_err!(
225
3
                                    Code::Unavailable,
226
3
                                    "Unhandled HeadObjectError in S3: {other:?}"
227
3
                                )),
228
3
                                state,
229
3
                            )),
230
                        },
231
                    }
232
8
                }
233
8
            }))
234
5
            .await
235
5
    }
236
}
237
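The expiry check in has() (source lines 190-204) treats an object as missing once last_modified + consider_expired_after_s is at or before the current time, and a value of 0 disables the check. A minimal sketch of that predicate with illustrative timestamps:

    // Sketch of the expiry predicate from has(); values are illustrative.
    fn is_expired(last_modified_s: i64, consider_expired_after_s: i64, now_s: i64) -> bool {
        // A setting of 0 disables the expiry check entirely.
        consider_expired_after_s != 0 && last_modified_s + consider_expired_after_s <= now_s
    }

    fn main() {
        assert!(is_expired(1_000, 60, 1_060)); // exactly at the threshold: expired.
        assert!(!is_expired(1_000, 60, 1_059)); // one second earlier: still valid.
        assert!(!is_expired(1_000, 0, i64::MAX)); // 0 disables the check.
    }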
238
#[async_trait]
239
impl<I, NowFn> StoreDriver for S3Store<NowFn>
240
where
241
    I: InstantWrapper,
242
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
243
{
244
    async fn has_with_results(
245
        self: Pin<&Self>,
246
        keys: &[StoreKey<'_>],
247
        results: &mut [Option<u64>],
248
6
    ) -> Result<(), Error> {
249
        keys.iter()
250
            .zip(results.iter_mut())
251
6
            .map(|(key, result)| async move {
252
                // We need to do a special pass to ensure our zero key exists.
253
6
                if is_zero_digest(key.borrow()) {
  Branch (253:20): [True: 0, False: 0]
  Branch (253:20): [Folded - Ignored]
  Branch (253:20): [True: 1, False: 5]
254
1
                    *result = Some(0);
255
1
                    return Ok::<_, Error>(());
256
5
                }
257
5
                *result = self.has(key.borrow()).await?;
258
5
                Ok::<_, Error>(())
259
12
            })
260
            .collect::<FuturesUnordered<_>>()
261
            .try_collect()
262
            .await
263
6
    }
264
265
0
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
266
0
        matches!(optimization, StoreOptimizations::LazyExistenceOnSync)
267
0
    }
268
269
    async fn update(
270
        self: Pin<&Self>,
271
        digest: StoreKey<'_>,
272
        mut reader: DropCloserReadHalf,
273
        upload_size: UploadSizeInfo,
274
2
    ) -> Result<(), Error> {
275
        let s3_path = &self.make_s3_path(&digest);
276
277
        let max_size = match upload_size {
278
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
279
        };
280
281
        // Note(aaronmondal) It might be more optimal to use a different
282
        // heuristic here, but for simplicity we use a hard-coded value.
283
        // Anything going down this if-statement has the advantage of needing only
284
        // 1 network request for the upload instead of the minimum of 3 required
285
        // for a multipart upload.
286
        //
287
        // Note(aaronmondal) If the upload size is not known, we go down the multipart upload path.
288
        // This is not very efficient, but it greatly reduces the complexity of the code.
289
        if max_size < MIN_MULTIPART_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) {
290
            let UploadSizeInfo::ExactSize(sz) = upload_size else {
291
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
292
            };
293
            reader.set_max_recent_data_size(
294
                u64::try_from(self.max_retry_buffer_per_request)
295
                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
296
            );
297
            return self
298
                .retrier
299
1
                .retry(unfold(reader, move |mut reader| async move {
300
                    // We need to make a new pair here because the aws sdk does not give
301
                    // the body back after we send it, so a retry needs a fresh one.
302
1
                    let (mut tx, rx) = make_buf_channel_pair();
303
304
                    // Upload the data to the S3 backend.
305
1
                    let result = {
306
1
                        let reader_ref = &mut reader;
307
1
                        let (upload_res, bind_res) = tokio::join!(
308
1
                            self.s3_client
309
1
                                .put_object()
310
1
                                .bucket(&self.bucket)
311
1
                                .key(s3_path.clone())
312
1
                                .content_length(sz as i64)
313
1
                                .body(ByteStream::from_body_1_x(BodyWrapper {
314
1
                                    reader: rx,
315
1
                                    size: sz,
316
1
                                }))
317
1
                                .send()
318
1
                                .map_ok_or_else(|e| Err(make_err!(Code::Aborted, "{e:?}")), |_| Ok(())),
319
                            // Stream all data from the reader channel to the writer channel.
320
1
                            tx.bind_buffered(reader_ref)
321
                        );
322
1
                        upload_res
323
1
                            .merge(bind_res)
324
1
                            .err_tip(|| "Failed to upload file to s3 in single chunk")
325
                    };
326
327
                    // If we failed to upload the file, check to see if we can retry.
328
1
                    let retry_result = result.map_or_else(|mut err| {
329
                        // Ensure our code is Code::Aborted, so the client can retry if possible.
330
0
                        err.code = Code::Aborted;
331
0
                        let bytes_received = reader.get_bytes_received();
332
0
                        if let Err(try_reset_err) = reader.try_reset_stream() {
  Branch (332:32): [True: 0, False: 0]
  Branch (332:32): [Folded - Ignored]
  Branch (332:32): [True: 0, False: 0]
333
0
                            error!(
334
                                ?bytes_received,
335
                                err = ?try_reset_err,
336
0
                                "Unable to reset stream after failed upload in S3Store::update"
337
                            );
338
0
                            return RetryResult::Err(err
339
0
                                .merge(try_reset_err)
340
0
                                .append(format!("Failed to retry upload with {bytes_received} bytes received in S3Store::update")));
341
0
                        }
342
0
                        let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in S3Store::update"));
343
0
                        info!(
344
                            ?err,
345
                            ?bytes_received,
346
0
                            "Retryable S3 error"
347
                        );
348
0
                        RetryResult::Retry(err)
349
1
                    }, |()| RetryResult::Ok(()));
350
1
                    Some((retry_result, reader))
351
2
                }))
352
                .await;
353
        }
354
355
        let upload_id = &self
356
            .retrier
357
1
            .retry(unfold((), move |()| async move {
358
1
                let retry_result = self
359
1
                    .s3_client
360
1
                    .create_multipart_upload()
361
1
                    .bucket(&self.bucket)
362
1
                    .key(s3_path)
363
1
                    .send()
364
1
                    .await
365
1
                    .map_or_else(
366
0
                        |e| {
367
0
                            RetryResult::Retry(make_err!(
368
0
                                Code::Aborted,
369
0
                                "Failed to create multipart upload to s3: {e:?}"
370
0
                            ))
371
0
                        },
372
1
                        |CreateMultipartUploadOutput { upload_id, .. }| {
373
1
                            upload_id.map_or_else(
374
0
                                || {
375
0
                                    RetryResult::Err(make_err!(
376
0
                                        Code::Internal,
377
0
                                        "Expected upload_id to be set by s3 response"
378
0
                                    ))
379
0
                                },
380
                                RetryResult::Ok,
381
                            )
382
1
                        },
383
                    );
384
1
                Some((retry_result, ()))
385
2
            }))
386
            .await?;
387
388
        // S3 requires us to upload in parts if the size is greater than 5GB. Each part must be at
389
        // least 5MB (except the last part), and an upload can have at most 10,000 parts.
390
        let bytes_per_upload_part =
391
            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
392
393
1
        let upload_parts = move || async move {
394
            // This will ensure we only have `multipart_max_concurrent_uploads` * `bytes_per_upload_part`
395
            // bytes in memory at any given time waiting to be uploaded.
396
1
            let (tx, mut rx) = mpsc::channel(self.multipart_max_concurrent_uploads);
397
398
1
            let read_stream_fut = async move {
399
1
                let retrier = &Pin::get_ref(self).retrier;
400
                // Note: Our break condition is when we reach EOF.
401
4
                for part_number in 1..i32::MAX {
402
4
                    let write_buf = reader
403
4
                        .consume(Some(usize::try_from(bytes_per_upload_part).err_tip(
404
                            || "Could not convert bytes_per_upload_part to usize",
405
0
                        )?))
406
4
                        .await
407
4
                        .err_tip(|| "Failed to read chunk in s3_store")
?0
;
408
4
                    if write_buf.is_empty() {
  Branch (408:24): [True: 0, False: 0]
  Branch (408:24): [Folded - Ignored]
  Branch (408:24): [True: 1, False: 3]
409
1
                        break; // Reached EOF.
410
3
                    }
411
412
3
                    tx.send(retrier.retry(unfold(write_buf, move |write_buf| {
413
3
                        async move {
414
3
                            let retry_result = self
415
3
                                .s3_client
416
3
                                .upload_part()
417
3
                                .bucket(&self.bucket)
418
3
                                .key(s3_path)
419
3
                                .upload_id(upload_id)
420
3
                                .body(ByteStream::new(SdkBody::from(write_buf.clone())))
421
3
                                .part_number(part_number)
422
3
                                .send()
423
3
                                .await
424
3
                                .map_or_else(
425
0
                                    |e| {
426
0
                                        RetryResult::Retry(make_err!(
427
0
                                            Code::Aborted,
428
0
                                            "Failed to upload part {part_number} in S3 store: {e:?}"
429
0
                                        ))
430
0
                                    },
431
3
                                    |mut response| {
432
3
                                        RetryResult::Ok(
433
3
                                            CompletedPartBuilder::default()
434
3
                                                // Only set an entity tag if it exists. This saves
435
3
                                                // 13 bytes per part on the final request if it can
436
3
                                                // omit the `<ETAG><ETAG/>` string.
437
3
                                                .set_e_tag(response.e_tag.take())
438
3
                                                .part_number(part_number)
439
3
                                                .build(),
440
3
                                        )
441
3
                                    },
442
                                );
443
3
                            Some((retry_result, write_buf))
444
3
                        }
445
3
                    })))
446
3
                    .await
447
3
                    .map_err(|_| {
448
0
                        make_err!(Code::Internal, "Failed to send part to channel in s3_store")
449
0
                    })?;
450
                }
451
1
                Result::<_, Error>::Ok(())
452
1
            }
453
1
            .fuse();
454
455
1
            let mut upload_futures = FuturesUnordered::new();
456
457
1
            let mut completed_parts = Vec::with_capacity(
458
1
                usize::try_from(cmp::min(
459
1
                    MAX_UPLOAD_PARTS as u64,
460
1
                    (max_size / bytes_per_upload_part) + 1,
461
                ))
462
1
                .err_tip(|| "Could not convert u64 to usize")
?0
,
463
            );
464
1
            tokio::pin!(read_stream_fut);
465
            loop {
466
8
                if read_stream_fut.is_terminated() && rx.is_empty() && upload_futures.is_empty() {
  Branch (466:20): [True: 0, False: 0]
  Branch (466:55): [True: 0, False: 0]
  Branch (466:72): [True: 0, False: 0]
  Branch (466:20): [Folded - Ignored]
  Branch (466:55): [Folded - Ignored]
  Branch (466:72): [Folded - Ignored]
  Branch (466:20): [True: 7, False: 1]
  Branch (466:55): [True: 2, False: 5]
  Branch (466:72): [True: 1, False: 1]
467
1
                    break; // No more data to process.
468
7
                }
469
7
                tokio::select! {
470
7
                    result = &mut read_stream_fut => result?, // Return error or wait for other futures.
471
7
                    Some(upload_result) = upload_futures.next() => completed_parts.push(upload_result?),
472
7
                    Some(fut) = rx.recv() => upload_futures.push(fut),
473
                }
474
            }
475
476
            // Even though the spec does not require parts to be sorted by number, we do it just in case
477
            // there's an S3 implementation that requires it.
478
1
            completed_parts.sort_unstable_by_key(|part| part.part_number);
479
480
1
            self.retrier
481
1
                .retry(unfold(completed_parts, move |completed_parts| async move {
482
                    Some((
483
1
                        self.s3_client
484
1
                            .complete_multipart_upload()
485
1
                            .bucket(&self.bucket)
486
1
                            .key(s3_path)
487
1
                            .multipart_upload(
488
1
                                CompletedMultipartUploadBuilder::default()
489
1
                                    .set_parts(Some(completed_parts.clone()))
490
1
                                    .build(),
491
1
                            )
492
1
                            .upload_id(upload_id)
493
1
                            .send()
494
1
                            .await
495
1
                            .map_or_else(
496
0
                                |e| {
497
0
                                    RetryResult::Retry(make_err!(
498
0
                                        Code::Aborted,
499
0
                                        "Failed to complete multipart upload in S3 store: {e:?}"
500
0
                                    ))
501
0
                                },
502
1
                                |_| RetryResult::Ok(()),
503
                            ),
504
1
                        completed_parts,
505
                    ))
506
2
                }))
507
1
                .await
508
2
        };
509
        // Upload our parts and complete the multipart upload.
510
        // If we fail attempt to abort the multipart upload (cleanup).
511
        upload_parts()
512
0
            .or_else(move |e| async move {
513
0
                Result::<(), _>::Err(e).merge(
514
                    // Note: We don't retry here because this is just a best attempt.
515
0
                    self.s3_client
516
0
                        .abort_multipart_upload()
517
0
                        .bucket(&self.bucket)
518
0
                        .key(s3_path)
519
0
                        .upload_id(upload_id)
520
0
                        .send()
521
0
                        .await
522
0
                        .map_or_else(
523
0
                            |e| {
524
0
                                let err = make_err!(
525
0
                                    Code::Aborted,
526
                                    "Failed to abort multipart upload in S3 store : {e:?}"
527
                                );
528
0
                                info!(?err, "Multipart upload error");
529
0
                                Err(err)
530
0
                            },
531
0
                            |_| Ok(()),
532
                        ),
533
                )
534
0
            })
535
            .await
536
2
    }
537
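For intuition on the part-size heuristic above (source line 391), here is the same formula in isolation with illustrative sizes; small uploads clamp to the 5MB floor, and only very large uploads raise the part size:

    // The part-size heuristic from update(), extracted for illustration.
    const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
    const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.

    fn bytes_per_upload_part(max_size: u64) -> u64 {
        (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE)
    }

    fn main() {
        // A 100MB object clamps up to the 5MB floor (20 parts).
        assert_eq!(bytes_per_upload_part(100 * 1024 * 1024), MIN_MULTIPART_SIZE);
        // Only very large objects (here ~100TB) push the part size above the floor.
        assert!(bytes_per_upload_part(100 * 1024 * 1024 * 1024 * 1024) > MIN_MULTIPART_SIZE);
    }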
538
    async fn get_part(
539
        self: Pin<&Self>,
540
        key: StoreKey<'_>,
541
        writer: &mut DropCloserWriteHalf,
542
        offset: u64,
543
        length: Option<u64>,
544
5
    ) -> Result<(), Error> {
545
        if is_zero_digest(key.borrow()) {
546
            writer
547
                .send_eof()
548
                .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
549
            return Ok(());
550
        }
551
552
        let s3_path = &self.make_s3_path(&key);
553
        let end_read_byte = length
554
2
            .map_or(Some(None), |length| Some(offset.checked_add(length)))
555
            .err_tip(|| "Integer overflow protection triggered")?;
556
557
        self.retrier
558
7
            .retry(unfold(writer, move |writer| async move {
559
7
                let result = self
560
7
                    .s3_client
561
7
                    .get_object()
562
7
                    .bucket(&self.bucket)
563
7
                    .key(s3_path)
564
7
                    .range(format!(
565
7
                        "bytes={}-{}",
566
7
                        offset + writer.get_bytes_written(),
567
7
                        end_read_byte.map_or_else(String::new, |v| v.to_string())
568
                    ))
569
7
                    .send()
570
7
                    .await;
571
572
7
                let mut s3_in_stream = match result {
573
4
                    Ok(head_object_output) => head_object_output.body,
574
3
                    Err(sdk_error) => match sdk_error.into_service_error() {
575
0
                        GetObjectError::NoSuchKey(e) => {
576
0
                            return Some((
577
0
                                RetryResult::Err(make_err!(
578
0
                                    Code::NotFound,
579
0
                                    "No such key in S3: {e}"
580
0
                                )),
581
0
                                writer,
582
0
                            ));
583
                        }
584
3
                        other => {
585
3
                            return Some((
586
3
                                RetryResult::Retry(make_err!(
587
3
                                    Code::Unavailable,
588
3
                                    "Unhandled GetObjectError in S3: {other:?}",
589
3
                                )),
590
3
                                writer,
591
3
                            ));
592
                        }
593
                    },
594
                };
595
596
                // Copy data from s3 input stream to the writer stream.
597
8
                while let Some(maybe_bytes) = s3_in_stream.next().await {
  Branch (597:27): [True: 0, False: 0]
  Branch (597:27): [Folded - Ignored]
  Branch (597:27): [True: 4, False: 4]
598
4
                    match maybe_bytes {
599
4
                        Ok(bytes) => {
600
4
                            if bytes.is_empty() {
  Branch (600:32): [True: 0, False: 0]
  Branch (600:32): [Folded - Ignored]
  Branch (600:32): [True: 1, False: 3]
601
                                // Ignore possible EOF. Different implementations of S3 may or may not
602
                                // send EOF this way.
603
1
                                continue;
604
3
                            }
605
3
                            if let Err(e) = writer.send(bytes).await {
  Branch (605:36): [True: 0, False: 0]
  Branch (605:36): [Folded - Ignored]
  Branch (605:36): [True: 0, False: 3]
606
0
                                return Some((
607
0
                                    RetryResult::Err(make_err!(
608
0
                                        Code::Aborted,
609
0
                                        "Error sending bytes to consumer in S3: {e}"
610
0
                                    )),
611
0
                                    writer,
612
0
                                ));
613
3
                            }
614
                        }
615
0
                        Err(e) => {
616
0
                            return Some((
617
0
                                RetryResult::Retry(make_err!(
618
0
                                    Code::Aborted,
619
0
                                    "Bad bytestream element in S3: {e}"
620
0
                                )),
621
0
                                writer,
622
0
                            ));
623
                        }
624
                    }
625
                }
626
4
                if let Err(e) = writer.send_eof() {
  Branch (626:24): [True: 0, False: 0]
  Branch (626:24): [Folded - Ignored]
  Branch (626:24): [True: 0, False: 4]
627
0
                    return Some((
628
0
                        RetryResult::Err(make_err!(
629
0
                            Code::Aborted,
630
0
                            "Failed to send EOF to consumer in S3: {e}"
631
0
                        )),
632
0
                        writer,
633
0
                    ));
634
4
                }
635
4
                Some((RetryResult::Ok(()), writer))
636
14
            }))
637
            .await
638
5
    }
639
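Each retry of the ranged GET in get_part resumes where the previous attempt left off, because the range start is offset + writer.get_bytes_written() (source lines 564-568). A sketch of the Range headers this produces, with illustrative values:

    // Illustrative reconstruction of the Range header built in get_part().
    fn range_header(offset: u64, bytes_written: u64, end_read_byte: Option<u64>) -> String {
        format!(
            "bytes={}-{}",
            offset + bytes_written,
            end_read_byte.map_or_else(String::new, |v| v.to_string())
        )
    }

    fn main() {
        // First attempt: open-ended read from the requested offset.
        assert_eq!(range_header(0, 0, None), "bytes=0-");
        // Retry after 4096 bytes already reached the writer: resume mid-object.
        assert_eq!(range_header(0, 4096, Some(8191)), "bytes=4096-8191");
    }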
640
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
641
0
        self
642
0
    }
643
644
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
645
0
        self
646
0
    }
647
648
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
649
0
        self
650
0
    }
651
652
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
653
0
        registry.register_indicator(self);
654
0
    }
655
656
0
    fn register_remove_callback(
657
0
        self: Arc<Self>,
658
0
        callback: Arc<dyn RemoveItemCallback>,
659
0
    ) -> Result<(), Error> {
660
0
        self.remove_callbacks.lock().push(callback);
661
0
        Ok(())
662
0
    }
663
}
664
665
#[async_trait]
666
impl<I, NowFn> HealthStatusIndicator for S3Store<NowFn>
667
where
668
    I: InstantWrapper,
669
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
670
{
671
0
    fn get_name(&self) -> &'static str {
672
0
        "S3Store"
673
0
    }
674
675
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
676
        StoreDriver::check_health(Pin::new(self), namespace).await
677
0
    }
678
}