Coverage Report

Created: 2025-03-08 07:13

/build/source/nativelink-store/src/s3_store.rs
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{cmp, env};

use async_trait::async_trait;
use aws_config::default_provider::credentials;
use aws_config::{AppName, BehaviorVersion};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
use aws_sdk_s3::Client;
use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
use bytes::Bytes;
use futures::future::FusedFuture;
use futures::stream::{unfold, FuturesUnordered};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use http_body::{Frame, SizeHint};
use hyper::client::connect::{Connected, Connection, HttpConnector};
use hyper::service::Service;
use hyper::Uri;
use hyper_rustls::{HttpsConnector, MaybeHttpsStream};
use nativelink_config::stores::S3Spec;
// Note: The S3 store should be very careful about the error codes it returns
// when in a retryable wrapper. Always prefer Code::Aborted or another
// retryable code over Code::InvalidArgument or make_input_err!().
// I.e. don't import make_input_err!() here, to help prevent this.
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{
    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
};
use nativelink_util::fs;
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::retry::{Retrier, RetryResult};
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
use rand::Rng;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, SemaphorePermit};
use tokio::time::sleep;
use tracing::{event, Level};

use crate::cas_utils::is_zero_digest;

// S3 parts cannot be smaller than this number. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.

// S3 parts cannot be larger than this number. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.

// A single S3 multipart upload cannot have more than this many parts. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MAX_UPLOAD_PARTS: usize = 10_000;

// Default max buffer size for retrying upload requests.
// Note: If you change this, adjust the docs in the config.
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 * 1024 * 1024; // 5MB.

// Default limit for concurrent part uploads per multipart upload.
// Note: If you change this, adjust the docs in the config.
const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;

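/// Pairs a live connection with the semaphore permit it was opened under (as
/// acquired via `fs::get_permit()` in `call_with_retry` below), so the permit
/// is held for the connection's lifetime and released when the connection drops.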
pub struct ConnectionWithPermit<T: Connection + AsyncRead + AsyncWrite + Unpin> {
    connection: T,
    _permit: SemaphorePermit<'static>,
}

impl<T: Connection + AsyncRead + AsyncWrite + Unpin> Connection for ConnectionWithPermit<T> {
    fn connected(&self) -> Connected {
        self.connection.connected()
    }
}

impl<T: Connection + AsyncRead + AsyncWrite + Unpin> AsyncRead for ConnectionWithPermit<T> {
    #[inline]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<(), tokio::io::Error>> {
        Pin::new(&mut Pin::get_mut(self).connection).poll_read(cx, buf)
    }
}

impl<T: Connection + AsyncWrite + AsyncRead + Unpin> AsyncWrite for ConnectionWithPermit<T> {
    #[inline]
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, tokio::io::Error>> {
        Pin::new(&mut Pin::get_mut(self).connection).poll_write(cx, buf)
    }

    #[inline]
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), tokio::io::Error>> {
        Pin::new(&mut Pin::get_mut(self).connection).poll_flush(cx)
    }

    #[inline]
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), tokio::io::Error>> {
        Pin::new(&mut Pin::get_mut(self).connection).poll_shutdown(cx)
    }
}

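/// TLS connector that dials through the shared `fs` permit gate and retries
/// failed connection attempts with the store's `Retrier`.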
#[derive(Clone)]
pub struct TlsConnector {
    connector: HttpsConnector<HttpConnector>,
    retrier: Retrier,
}

impl TlsConnector {
    #[must_use]
    pub fn new(spec: &S3Spec, jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>) -> Self {
        let connector_with_roots = hyper_rustls::HttpsConnectorBuilder::new().with_webpki_roots();

        let connector_with_schemes = if spec.insecure_allow_http {
            connector_with_roots.https_or_http()
        } else {
            connector_with_roots.https_only()
        };

        let connector = if spec.disable_http2 {
            connector_with_schemes.enable_http1().build()
        } else {
            connector_with_schemes.enable_http1().enable_http2().build()
        };

        Self {
            connector,
            retrier: Retrier::new(
                Arc::new(|duration| Box::pin(sleep(duration))),
                jitter_fn,
                spec.retry.clone(),
            ),
        }
    }

    async fn call_with_retry(
        &self,
        req: &Uri,
    ) -> Result<ConnectionWithPermit<MaybeHttpsStream<TcpStream>>, Error> {
        let retry_stream_fn = unfold(self.connector.clone(), move |mut connector| async move {
            let _permit = fs::get_permit().await.unwrap();
            match connector.call(req.clone()).await {
                Ok(connection) => Some((
                    RetryResult::Ok(ConnectionWithPermit {
                        connection,
                        _permit,
                    }),
                    connector,
                )),
                Err(e) => Some((
                    RetryResult::Retry(make_err!(
                        Code::Unavailable,
                        "Failed to call S3 connector: {e:?}"
                    )),
                    connector,
                )),
            }
        });
        self.retrier.retry(retry_stream_fn).await
    }
}

impl Service<Uri> for TlsConnector {
    type Response = ConnectionWithPermit<MaybeHttpsStream<TcpStream>>;
    type Error = Error;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.connector
            .poll_ready(cx)
            .map_err(|e| make_err!(Code::Unavailable, "Failed poll in S3: {e}"))
    }

    fn call(&mut self, req: Uri) -> Self::Future {
        let connector_clone = self.clone();
        Box::pin(async move { connector_clone.call_with_retry(&req).await })
    }
}

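/// Adapts a `DropCloserReadHalf` into an `http_body::Body` with an exact
/// `SizeHint`, letting the SDK stream an upload without buffering it whole.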
pub struct BodyWrapper {
    reader: DropCloserReadHalf,
    size: u64,
}

impl http_body::Body for BodyWrapper {
    type Data = Bytes;
    type Error = std::io::Error;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let reader = Pin::new(&mut Pin::get_mut(self).reader);
        reader
            .poll_next(cx)
            .map(|maybe_bytes_res| maybe_bytes_res.map(|res| res.map(Frame::data)))
    }

    fn size_hint(&self) -> SizeHint {
        SizeHint::with_exact(self.size)
    }
}

#[derive(MetricsComponent)]
pub struct S3Store<NowFn> {
    s3_client: Arc<Client>,
    now_fn: NowFn,
    #[metric(help = "The bucket name for the S3 store")]
    bucket: String,
    #[metric(help = "The key prefix for the S3 store")]
    key_prefix: String,
    retrier: Retrier,
    #[metric(help = "The number of seconds to consider an object expired")]
    consider_expired_after_s: i64,
    #[metric(help = "The number of bytes to buffer for retrying requests")]
    max_retry_buffer_per_request: usize,
    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
    multipart_max_concurrent_uploads: usize,
}

impl<I, NowFn> S3Store<NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
    pub async fn new(spec: &S3Spec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
        let jitter_amt = spec.retry.jitter;
        let jitter_fn = Arc::new(move |delay: Duration| {
            if jitter_amt == 0. {
                return delay;
            }
            let min = 1. - (jitter_amt / 2.);
            let max = 1. + (jitter_amt / 2.);
            delay.mul_f32(rand::rng().random_range(min..max))
        });
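        // E.g. for the jitter function above (illustrative numbers only): with
        // `spec.retry.jitter == 0.5`, a 100ms delay is scaled by a factor drawn
        // uniformly from [0.75, 1.25), i.e. to somewhere in [75ms, 125ms).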
        let s3_client = {
            let http_client =
                HyperClientBuilder::new().build(TlsConnector::new(spec, jitter_fn.clone()));
            let credential_provider = credentials::default_provider().await;
            let mut config_builder = aws_config::defaults(BehaviorVersion::v2024_03_28())
                .credentials_provider(credential_provider)
                .app_name(AppName::new("nativelink").expect("valid app name"))
                .timeout_config(
                    aws_config::timeout::TimeoutConfig::builder()
                        .connect_timeout(Duration::from_secs(15))
                        .build(),
                )
                .region(Region::new(Cow::Owned(spec.region.clone())))
                .http_client(http_client);
            // TODO(allada) When the aws-sdk supports this env variable we should
            // be able to remove this.
            // See: https://github.com/awslabs/aws-sdk-rust/issues/932
            if let Ok(endpoint_url) = env::var("AWS_ENDPOINT_URL") {
                config_builder = config_builder.endpoint_url(endpoint_url);
            }
            aws_sdk_s3::Client::new(&config_builder.load().await)
        };
        Self::new_with_client_and_jitter(spec, s3_client, jitter_fn, now_fn)
    }

    pub fn new_with_client_and_jitter(
        spec: &S3Spec,
        s3_client: Client,
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
        now_fn: NowFn,
    ) -> Result<Arc<Self>, Error> {
        Ok(Arc::new(Self {
            s3_client: Arc::new(s3_client),
            now_fn,
            bucket: spec.bucket.to_string(),
            key_prefix: spec.key_prefix.as_ref().unwrap_or(&String::new()).clone(),
            retrier: Retrier::new(
                Arc::new(|duration| Box::pin(sleep(duration))),
                jitter_fn,
                spec.retry.clone(),
            ),
            consider_expired_after_s: i64::from(spec.consider_expired_after_s),
            max_retry_buffer_per_request: spec
                .max_retry_buffer_per_request
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
            multipart_max_concurrent_uploads: spec
                .multipart_max_concurrent_uploads
                .unwrap_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS),
        }))
    }

    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
        format!("{}{}", self.key_prefix, key.as_str())
    }

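    /// Returns the object's size if it exists. Returns `None` when the object
    /// is missing, or when `consider_expired_after_s` is nonzero and the
    /// object's `last_modified` is at least that many seconds in the past.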
    async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
        self.retrier
            .retry(unfold((), move |state| async move {
                let result = self
                    .s3_client
                    .head_object()
                    .bucket(&self.bucket)
                    .key(self.make_s3_path(&digest.borrow()))
                    .send()
                    .await;

                match result {
                    Ok(head_object_output) => {
                        if self.consider_expired_after_s != 0 {
                            if let Some(last_modified) = head_object_output.last_modified {
                                let now_s = (self.now_fn)().unix_timestamp() as i64;
                                if last_modified.secs() + self.consider_expired_after_s <= now_s {
                                    return Some((RetryResult::Ok(None), state));
                                }
                            }
                        }
                        let Some(length) = head_object_output.content_length else {
                            return Some((RetryResult::Ok(None), state));
                        };
                        if length >= 0 {
                            return Some((RetryResult::Ok(Some(length as u64)), state));
                        }
                        Some((
                            RetryResult::Err(make_err!(
                                Code::InvalidArgument,
                                "Negative content length in S3: {length:?}",
                            )),
                            state,
                        ))
                    }
                    Err(sdk_error) => match sdk_error.into_service_error() {
                        HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
                        other => Some((
                            RetryResult::Retry(make_err!(
                                Code::Unavailable,
                                "Unhandled HeadObjectError in S3: {other:?}"
                            )),
                            state,
                        )),
                    },
                }
            }))
            .await
    }
}

#[async_trait]
impl<I, NowFn> StoreDriver for S3Store<NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
    async fn has_with_results(
        self: Pin<&Self>,
        keys: &[StoreKey<'_>],
        results: &mut [Option<u64>],
    ) -> Result<(), Error> {
        keys.iter()
            .zip(results.iter_mut())
            .map(|(key, result)| async move {
                // We need to do a special pass to ensure our zero key exists.
                if is_zero_digest(key.borrow()) {
                    *result = Some(0);
                    return Ok::<_, Error>(());
                }
                *result = self.has(key).await?;
                Ok::<_, Error>(())
            })
            .collect::<FuturesUnordered<_>>()
            .try_collect()
            .await
    }

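    /// Uploads via a single `PutObject` when the size is known and below
    /// `MIN_MULTIPART_SIZE`; otherwise falls back to a multipart upload.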
    async fn update(
        self: Pin<&Self>,
        digest: StoreKey<'_>,
        mut reader: DropCloserReadHalf,
        upload_size: UploadSizeInfo,
    ) -> Result<(), Error> {
        let s3_path = &self.make_s3_path(&digest.borrow());

        let max_size = match upload_size {
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
        };

        // Note(allada) It might be more optimal to use a different
        // heuristic here, but for simplicity we use a hard coded value.
        // Anything going down this if-statement will have the advantage of only
        // 1 network request for the upload instead of the minimum of 3 required
        // for multipart upload requests.
        //
        // Note(allada) If the upload size is not known, we go down the multipart
        // upload path. This is not very efficient, but it greatly reduces the
        // complexity of the code.
        if max_size < MIN_MULTIPART_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) {
            let UploadSizeInfo::ExactSize(sz) = upload_size else {
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
            };
            reader.set_max_recent_data_size(
                u64::try_from(self.max_retry_buffer_per_request)
                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
            );
            return self
                .retrier
                .retry(unfold(reader, move |mut reader| async move {
                    // We need to make a new pair here because the aws sdk does not
                    // give us back the body after we send it in order to retry.
                    let (mut tx, rx) = make_buf_channel_pair();

                    // Upload the data to the S3 backend.
                    let result = {
                        let reader_ref = &mut reader;
                        let (upload_res, bind_res) = tokio::join!(
                            self.s3_client
                                .put_object()
                                .bucket(&self.bucket)
                                .key(s3_path.clone())
                                .content_length(sz as i64)
                                .body(ByteStream::from_body_1_x(BodyWrapper {
                                    reader: rx,
                                    size: sz,
                                }))
                                .send()
                                .map_ok_or_else(|e| Err(make_err!(Code::Aborted, "{e:?}")), |_| Ok(())),
                            // Stream all data from the reader channel to the writer channel.
                            tx.bind_buffered(reader_ref)
                        );
                        upload_res
                            .merge(bind_res)
                            .err_tip(|| "Failed to upload file to s3 in single chunk")
                    };

                    // If we failed to upload the file, check to see if we can retry.
                    let retry_result = result.map_or_else(|mut err| {
                        // Ensure our code is Code::Aborted, so the client can retry if possible.
                        err.code = Code::Aborted;
                        let bytes_received = reader.get_bytes_received();
                        if let Err(try_reset_err) = reader.try_reset_stream() {
                            event!(
                                Level::ERROR,
                                ?bytes_received,
                                err = ?try_reset_err,
                                "Unable to reset stream after failed upload in S3Store::update"
                            );
                            return RetryResult::Err(err
                                .merge(try_reset_err)
                                .append(format!("Failed to retry upload with {bytes_received} bytes received in S3Store::update")));
                        }
                        let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in S3Store::update"));
                        event!(
                            Level::INFO,
                            ?err,
                            ?bytes_received,
                            "Retryable S3 error"
                        );
                        RetryResult::Retry(err)
                    }, |()| RetryResult::Ok(()));
                    Some((retry_result, reader))
                }))
                .await;
        }

        let upload_id = &self
            .retrier
            .retry(unfold((), move |()| async move {
                let retry_result = self
                    .s3_client
                    .create_multipart_upload()
                    .bucket(&self.bucket)
                    .key(s3_path)
                    .send()
                    .await
                    .map_or_else(
                        |e| {
                            RetryResult::Retry(make_err!(
                                Code::Aborted,
                                "Failed to create multipart upload to s3: {e:?}"
                            ))
                        },
                        |CreateMultipartUploadOutput { upload_id, .. }| {
                            upload_id.map_or_else(
                                || {
                                    RetryResult::Err(make_err!(
                                        Code::Internal,
                                        "Expected upload_id to be set by s3 response"
                                    ))
                                },
                                RetryResult::Ok,
                            )
                        },
                    );
                Some((retry_result, ()))
            }))
            .await?;

        // S3 requires multipart uploads for objects larger than 5GB. Each part
        // must be at least 5MB (except the last part) and a single upload can
        // have at most 10,000 parts.
        let bytes_per_upload_part =
            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
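        // E.g. (illustrative size, not from the config): for a 100MiB upload,
        // 100MiB / (5MiB - 1) == 20, which clamps up to MIN_MULTIPART_SIZE, so
        // the object is uploaded as twenty 5MiB parts.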

        let upload_parts = move || async move {
            // This will ensure we only have `multipart_max_concurrent_uploads` *
            // `bytes_per_upload_part` bytes in memory at any given time waiting
            // to be uploaded.
            let (tx, mut rx) = mpsc::channel(self.multipart_max_concurrent_uploads);

            let read_stream_fut = async move {
                let retrier = &Pin::get_ref(self).retrier;
                // Note: Our break condition is when we reach EOF.
                for part_number in 1..i32::MAX {
                    let write_buf = reader
                        .consume(Some(usize::try_from(bytes_per_upload_part).err_tip(
                            || "Could not convert bytes_per_upload_part to usize",
                        )?))
                        .await
                        .err_tip(|| "Failed to read chunk in s3_store")?;
                    if write_buf.is_empty() {
                        break; // Reached EOF.
                    }

                    tx.send(retrier.retry(unfold(write_buf, move |write_buf| {
                        async move {
                            let retry_result = self
                                .s3_client
                                .upload_part()
                                .bucket(&self.bucket)
                                .key(s3_path)
                                .upload_id(upload_id)
                                .body(ByteStream::new(SdkBody::from(write_buf.clone())))
                                .part_number(part_number)
                                .send()
                                .await
                                .map_or_else(
                                    |e| {
                                        RetryResult::Retry(make_err!(
                                            Code::Aborted,
                                            "Failed to upload part {part_number} in S3 store: {e:?}"
                                        ))
                                    },
                                    |mut response| {
                                        RetryResult::Ok(
                                            CompletedPartBuilder::default()
                                                // Only set an entity tag if it exists. This saves
                                                // 13 bytes per part on the final request if it can
                                                // omit the `<ETAG><ETAG/>` string.
                                                .set_e_tag(response.e_tag.take())
                                                .part_number(part_number)
                                                .build(),
                                        )
                                    },
                                );
                            Some((retry_result, write_buf))
                        }
                    })))
                    .await
                    .map_err(|_| {
                        make_err!(Code::Internal, "Failed to send part to channel in s3_store")
                    })?;
                }
                Result::<_, Error>::Ok(())
            }.fuse();

            let mut upload_futures = FuturesUnordered::new();

            let mut completed_parts = Vec::with_capacity(
                usize::try_from(cmp::min(
                    MAX_UPLOAD_PARTS as u64,
                    (max_size / bytes_per_upload_part) + 1,
                ))
                .err_tip(|| "Could not convert u64 to usize")?,
            );
            tokio::pin!(read_stream_fut);
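            // Drive the chunk reader, the channel handoff, and the in-flight
            // part uploads concurrently until all three are drained.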
            loop {
                if read_stream_fut.is_terminated() && rx.is_empty() && upload_futures.is_empty() {
                    break; // No more data to process.
                }
                tokio::select! {
                    result = &mut read_stream_fut => result?, // Return error or wait for other futures.
                    Some(upload_result) = upload_futures.next() => completed_parts.push(upload_result?),
                    Some(fut) = rx.recv() => upload_futures.push(fut),
                }
            }

            // Even though the spec does not require parts to be sorted by number,
            // we do it just in case there's an S3 implementation that requires it.
            completed_parts.sort_unstable_by_key(|part| part.part_number);

            self.retrier
                .retry(unfold(completed_parts, move |completed_parts| async move {
                    Some((
                        self.s3_client
                            .complete_multipart_upload()
                            .bucket(&self.bucket)
                            .key(s3_path)
                            .multipart_upload(
                                CompletedMultipartUploadBuilder::default()
                                    .set_parts(Some(completed_parts.clone()))
                                    .build(),
                            )
                            .upload_id(upload_id)
                            .send()
                            .await
                            .map_or_else(
                                |e| {
                                    RetryResult::Retry(make_err!(
                                        Code::Aborted,
                                        "Failed to complete multipart upload in S3 store: {e:?}"
                                    ))
                                },
                                |_| RetryResult::Ok(()),
                            ),
                        completed_parts,
                    ))
                }))
                .await
        };
        // Upload our parts and complete the multipart upload.
        // If we fail, attempt to abort the multipart upload (cleanup).
        upload_parts()
            .or_else(move |e| async move {
                Result::<(), _>::Err(e).merge(
                    // Note: We don't retry here because this is just a best attempt.
                    self.s3_client
                        .abort_multipart_upload()
                        .bucket(&self.bucket)
                        .key(s3_path)
                        .upload_id(upload_id)
                        .send()
                        .await
                        .map_or_else(
                            |e| {
                                let err = make_err!(
                                    Code::Aborted,
                                    "Failed to abort multipart upload in S3 store: {e:?}"
                                );
                                event!(Level::INFO, ?err, "Multipart upload error");
                                Err(err)
                            },
                            |_| Ok(()),
                        ),
                )
            })
            .await
    }

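    /// Streams the requested byte range to `writer`. On a retry, the range
    /// request resumes at `offset + writer.get_bytes_written()`, so bytes that
    /// already reached the consumer are not re-fetched.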
    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        if is_zero_digest(key.borrow()) {
            writer
                .send_eof()
                .err_tip(|| "Failed to send zero EOF in s3 store get_part")?;
            return Ok(());
        }

        let s3_path = &self.make_s3_path(&key);
        let end_read_byte = length
            .map_or(Some(None), |length| Some(offset.checked_add(length)))
            .err_tip(|| "Integer overflow protection triggered")?;

        self.retrier
            .retry(unfold(writer, move |writer| async move {
                let result = self
                    .s3_client
                    .get_object()
                    .bucket(&self.bucket)
                    .key(s3_path)
                    .range(format!(
                        "bytes={}-{}",
                        offset + writer.get_bytes_written(),
                        end_read_byte.map_or_else(String::new, |v| v.to_string())
                    ))
                    .send()
                    .await;

                let mut s3_in_stream = match result {
                    Ok(get_object_output) => get_object_output.body,
                    Err(sdk_error) => match sdk_error.into_service_error() {
                        GetObjectError::NoSuchKey(e) => {
                            return Some((
                                RetryResult::Err(make_err!(
                                    Code::NotFound,
                                    "No such key in S3: {e}"
                                )),
                                writer,
                            ));
                        }
                        other => {
                            return Some((
                                RetryResult::Retry(make_err!(
                                    Code::Unavailable,
                                    "Unhandled GetObjectError in S3: {other:?}",
                                )),
                                writer,
                            ));
                        }
                    },
                };

                // Copy data from the s3 input stream to the writer stream.
                while let Some(maybe_bytes) = s3_in_stream.next().await {
                    match maybe_bytes {
                        Ok(bytes) => {
                            if bytes.is_empty() {
                                // Ignore a possible EOF chunk. Different implementations
                                // of S3 may or may not send EOF this way.
                                continue;
                            }
                            if let Err(e) = writer.send(bytes).await {
                                return Some((
                                    RetryResult::Err(make_err!(
                                        Code::Aborted,
                                        "Error sending bytes to consumer in S3: {e}"
                                    )),
                                    writer,
                                ));
                            }
                        }
                        Err(e) => {
                            return Some((
                                RetryResult::Retry(make_err!(
                                    Code::Aborted,
                                    "Bad bytestream element in S3: {e}"
                                )),
                                writer,
                            ));
                        }
                    }
                }
                if let Err(e) = writer.send_eof() {
                    return Some((
                        RetryResult::Err(make_err!(
                            Code::Aborted,
                            "Failed to send EOF to consumer in S3: {e}"
                        )),
                        writer,
                    ));
                }
                Some((RetryResult::Ok(()), writer))
            }))
            .await
    }

    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
        self
    }

    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
        self
    }

    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
        self
    }

    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }
}

#[async_trait]
impl<I, NowFn> HealthStatusIndicator for S3Store<NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
    fn get_name(&self) -> &'static str {
        "S3Store"
    }

    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
        StoreDriver::check_health(Pin::new(self), namespace).await
    }
}
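
A minimal standalone sketch of the part-size selection used in update() above
(hypothetical helper, not part of the file under report; it only mirrors the
constants and the clamp formula so the arithmetic can be checked in isolation):

    const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
    const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.

    fn bytes_per_upload_part(max_size: u64) -> u64 {
        (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE)
    }

    fn main() {
        // A 100MiB upload divides into twenty 5MiB parts.
        assert_eq!(bytes_per_upload_part(100 * 1024 * 1024), MIN_MULTIPART_SIZE);
        // Tiny uploads still clamp up to the 5MB floor S3 requires for
        // non-final parts.
        assert_eq!(bytes_per_upload_part(1), MIN_MULTIPART_SIZE);
    }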