Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-store/src/s3_store.rs

Line|Count|Source
   1|     |// Copyright 2024 The NativeLink Authors. All rights reserved.
   2|     |//
   3|     |// Licensed under the Apache License, Version 2.0 (the "License");
   4|     |// you may not use this file except in compliance with the License.
   5|     |// You may obtain a copy of the License at
   6|     |//
   7|     |//    http://www.apache.org/licenses/LICENSE-2.0
   8|     |//
   9|     |// Unless required by applicable law or agreed to in writing, software
  10|     |// distributed under the License is distributed on an "AS IS" BASIS,
  11|     |// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12|     |// See the License for the specific language governing permissions and
  13|     |// limitations under the License.
  14|     |
  15|     |use std::borrow::Cow;
  16|     |use std::future::Future;
  17|     |use std::pin::Pin;
  18|     |use std::sync::Arc;
  19|     |use std::task::{Context, Poll};
  20|     |use std::time::Duration;
  21|     |use std::{cmp, env};
  22|     |
  23|     |use async_trait::async_trait;
  24|     |use aws_config::default_provider::credentials;
  25|     |use aws_config::{AppName, BehaviorVersion};
  26|     |use aws_sdk_s3::config::Region;
  27|     |use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
  28|     |use aws_sdk_s3::operation::get_object::GetObjectError;
  29|     |use aws_sdk_s3::operation::head_object::HeadObjectError;
  30|     |use aws_sdk_s3::primitives::{ByteStream, SdkBody};
  31|     |use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
  32|     |use aws_sdk_s3::Client;
  33|     |use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
  34|     |use bytes::Bytes;
  35|     |use futures::future::FusedFuture;
  36|     |use futures::stream::{unfold, FuturesUnordered};
  37|     |use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
  38|     |use http_body::{Frame, SizeHint};
  39|     |use hyper::client::connect::{Connected, Connection, HttpConnector};
  40|     |use hyper::service::Service;
  41|     |use hyper::Uri;
  42|     |use hyper_rustls::{HttpsConnector, MaybeHttpsStream};
  43|     |// Note: S3 store should be very careful about the error codes it returns
  44|     |// when in a retryable wrapper. Always prefer Code::Aborted or another
  45|     |// retryable code over Code::InvalidArgument or make_input_err!().
  46|     |// i.e.: Don't import make_input_err!(), to help prevent this.
  47|     |use nativelink_error::{make_err, Code, Error, ResultExt};
  48|     |use nativelink_metric::MetricsComponent;
  49|     |use nativelink_util::buf_channel::{
  50|     |    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
  51|     |};
  52|     |use nativelink_util::fs;
  53|     |use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
  54|     |use nativelink_util::instant_wrapper::InstantWrapper;
  55|     |use nativelink_util::retry::{Retrier, RetryResult};
  56|     |use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
  57|     |use rand::rngs::OsRng;
  58|     |use rand::Rng;
  59|     |use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
  60|     |use tokio::net::TcpStream;
  61|     |use tokio::sync::{mpsc, SemaphorePermit};
  62|     |use tokio::time::sleep;
  63|     |use tracing::{event, Level};
  64|     |
  65|     |use crate::cas_utils::is_zero_digest;
  66|     |
  67|     |// S3 parts cannot be smaller than this number. See:
  68|     |// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
  69|     |const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
  70|     |
  71|     |// S3 parts cannot be larger than this number. See:
  72|     |// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
  73|     |const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.
  74|     |
  75|     |// An S3 upload cannot have more than this number of parts. See:
  76|     |// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
  77|     |const MAX_UPLOAD_PARTS: usize = 10_000;
  78|     |
  79|     |// Default max buffer size for retrying upload requests.
  80|     |// Note: If you change this, adjust the docs in the config.
  81|     |const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 * 1024 * 1024; // 5MB.
  82|     |
  83|     |// Default limit for concurrent part uploads per multipart upload.
  84|     |// Note: If you change this, adjust the docs in the config.
  85|     |const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;
  86|     |
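The three constants above come straight from the linked S3 quotas page: parts must be 5MB-5GB, and one upload may carry at most 10,000 parts. A hypothetical helper (not part of this file) makes the implied bound concrete: the smallest part size that still fits a given object into the part budget.

    // Hypothetical helper, shown only to illustrate the quota math above.
    fn min_part_size_for(object_size: u64) -> u64 {
        object_size
            .div_ceil(MAX_UPLOAD_PARTS as u64) // parts needed at this size
            .clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE)
    }

At the 5MB minimum, anything up to 10,000 x 5MB (roughly 48.8GiB) fits; a 1TiB object would need parts of at least about 110MB.
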
  87|     |pub struct ConnectionWithPermit<T: Connection + AsyncRead + AsyncWrite + Unpin> {
  88|     |    connection: T,
  89|     |    _permit: SemaphorePermit<'static>,
  90|     |}
  91|     |
  92|     |impl<T: Connection + AsyncRead + AsyncWrite + Unpin> Connection for ConnectionWithPermit<T> {
  93|    0|    fn connected(&self) -> Connected {
  94|    0|        self.connection.connected()
  95|    0|    }
  96|     |}
  97|     |
  98|     |impl<T: Connection + AsyncRead + AsyncWrite + Unpin> AsyncRead for ConnectionWithPermit<T> {
  99|     |    #[inline]
 100|    0|    fn poll_read(
 101|    0|        self: Pin<&mut Self>,
 102|    0|        cx: &mut Context,
 103|    0|        buf: &mut ReadBuf<'_>,
 104|    0|    ) -> Poll<Result<(), tokio::io::Error>> {
 105|    0|        Pin::new(&mut Pin::get_mut(self).connection).poll_read(cx, buf)
 106|    0|    }
 107|     |}
 108|     |
 109|     |impl<T: Connection + AsyncWrite + AsyncRead + Unpin> AsyncWrite for ConnectionWithPermit<T> {
 110|     |    #[inline]
 111|    0|    fn poll_write(
 112|    0|        self: Pin<&mut Self>,
 113|    0|        cx: &mut Context<'_>,
 114|    0|        buf: &[u8],
 115|    0|    ) -> Poll<Result<usize, tokio::io::Error>> {
 116|    0|        Pin::new(&mut Pin::get_mut(self).connection).poll_write(cx, buf)
 117|    0|    }
 118|     |
 119|     |    #[inline]
 120|    0|    fn poll_flush(
 121|    0|        self: Pin<&mut Self>,
 122|    0|        cx: &mut Context<'_>,
 123|    0|    ) -> Poll<Result<(), tokio::io::Error>> {
 124|    0|        Pin::new(&mut Pin::get_mut(self).connection).poll_flush(cx)
 125|    0|    }
 126|     |
 127|     |    #[inline]
 128|    0|    fn poll_shutdown(
 129|    0|        self: Pin<&mut Self>,
 130|    0|        cx: &mut Context<'_>,
 131|    0|    ) -> Poll<Result<(), tokio::io::Error>> {
 132|    0|        Pin::new(&mut Pin::get_mut(self).connection).poll_shutdown(cx)
 133|    0|    }
 134|     |}
 135|     |
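ConnectionWithPermit exists so that the file-descriptor permit taken from nativelink_util::fs (see call_with_retry below) lives exactly as long as the socket: every trait method simply delegates to the inner connection, and dropping the wrapper releases the slot. A minimal self-contained sketch of the same RAII pattern over a plain tokio semaphore; open_files and GuardedConn are hypothetical names:

    use std::sync::OnceLock;
    use tokio::sync::{Semaphore, SemaphorePermit};

    // Hypothetical stand-in for nativelink_util::fs::get_permit().
    fn open_files() -> &'static Semaphore {
        static POOL: OnceLock<Semaphore> = OnceLock::new();
        POOL.get_or_init(|| Semaphore::new(1024))
    }

    // The permit is released when the connection is dropped, never earlier.
    struct GuardedConn<T> {
        conn: T,
        _permit: SemaphorePermit<'static>,
    }

    async fn guard<T>(conn: T) -> GuardedConn<T> {
        let permit = open_files().acquire().await.expect("pool closed");
        GuardedConn { conn, _permit: permit }
    }
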
 136|     |#[derive(Clone)]
 137|     |pub struct TlsConnector {
 138|     |    connector: HttpsConnector<HttpConnector>,
 139|     |    retrier: Retrier,
 140|     |}
 141|     |
 142|     |impl TlsConnector {
 143|     |    #[must_use]
 144|    0|    pub fn new(
 145|    0|        config: &nativelink_config::stores::S3Store,
 146|    0|        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
 147|    0|    ) -> Self {
 148|    0|        let connector_with_roots = hyper_rustls::HttpsConnectorBuilder::new().with_webpki_roots();
 149|     |
 150|    0|        let connector_with_schemes = if config.insecure_allow_http {
    |     |  Branch (150:41): [True: 0, False: 0]
    |     |  Branch (150:41): [Folded - Ignored]
 151|    0|            connector_with_roots.https_or_http()
 152|     |        } else {
 153|    0|            connector_with_roots.https_only()
 154|     |        };
 155|     |
 156|    0|        let connector = if config.disable_http2 {
    |     |  Branch (156:28): [True: 0, False: 0]
    |     |  Branch (156:28): [Folded - Ignored]
 157|    0|            connector_with_schemes.enable_http1().build()
 158|     |        } else {
 159|    0|            connector_with_schemes.enable_http1().enable_http2().build()
 160|     |        };
 161|     |
 162|    0|        Self {
 163|    0|            connector,
 164|    0|            retrier: Retrier::new(
 165|    0|                Arc::new(|duration| Box::pin(sleep(duration))),
 166|    0|                jitter_fn,
 167|    0|                config.retry.clone(),
 168|    0|            ),
 169|    0|        }
 170|    0|    }
 171|     |
 172|    0|    async fn call_with_retry(
 173|    0|        &self,
 174|    0|        req: &Uri,
 175|    0|    ) -> Result<ConnectionWithPermit<MaybeHttpsStream<TcpStream>>, Error> {
 176|    0|        let retry_stream_fn = unfold(self.connector.clone(), move |mut connector| async move {
 177|    0|            let _permit = fs::get_permit().await.unwrap();
 178|    0|            match connector.call(req.clone()).await {
 179|    0|                Ok(connection) => Some((
 180|    0|                    RetryResult::Ok(ConnectionWithPermit {
 181|    0|                        connection,
 182|    0|                        _permit,
 183|    0|                    }),
 184|    0|                    connector,
 185|    0|                )),
 186|    0|                Err(e) => Some((
 187|    0|                    RetryResult::Retry(make_err!(
 188|    0|                        Code::Unavailable,
 189|    0|                        "Failed to call S3 connector: {e:?}"
 190|    0|                    )),
 191|    0|                    connector,
 192|    0|                )),
 193|     |            }
 194|    0|        });
 195|    0|        self.retrier.retry(retry_stream_fn).await
 196|    0|    }
 197|     |}
 198|     |
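call_with_retry is the retry idiom used throughout this file: futures::stream::unfold turns one attempt into an endless stream of RetryResult values (the connector rides along as the stream state), and the Retrier pulls from that stream with a jittered backoff between pulls. A summary of the contract as this file uses it, not a new API:

    // RetryResult::Ok(v)    => retrier.retry(stream) resolves to Ok(v).
    // RetryResult::Retry(e) => sleep(jitter_fn(backoff)), then pull the next
    //                          attempt; once the retry budget from the config
    //                          is exhausted, resolves to Err(e).
    // RetryResult::Err(e)   => resolves to Err(e) immediately (permanent).

Note that the fs permit is acquired inside the unfold closure, so a sleeping retry does not hold a connection slot while it waits.
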
 199|     |impl Service<Uri> for TlsConnector {
 200|     |    type Response = ConnectionWithPermit<MaybeHttpsStream<TcpStream>>;
 201|     |    type Error = Error;
 202|     |    type Future =
 203|     |        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
 204|     |
 205|    0|    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
 206|    0|        self.connector
 207|    0|            .poll_ready(cx)
 208|    0|            .map_err(|e| make_err!(Code::Unavailable, "Failed poll in S3: {e}"))
 209|    0|    }
 210|     |
 211|    0|    fn call(&mut self, req: Uri) -> Self::Future {
 212|    0|        let connector_clone = self.clone();
 213|    0|        Box::pin(async move { connector_clone.call_with_retry(&req).await })
 214|    0|    }
 215|     |}
 216|     |
 217|     |pub struct BodyWrapper {
 218|     |    reader: DropCloserReadHalf,
 219|     |    size: u64,
 220|     |}
 221|     |
 222|     |impl http_body::Body for BodyWrapper {
 223|     |    type Data = Bytes;
 224|     |    type Error = std::io::Error;
 225|     |
 226|   76|    fn poll_frame(
 227|   76|        self: Pin<&mut Self>,
 228|   76|        cx: &mut Context<'_>,
 229|   76|    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
 230|   76|        let reader = Pin::new(&mut Pin::get_mut(self).reader);
 231|   76|        reader
 232|   76|            .poll_next(cx)
 233|   76|            .map(|maybe_bytes_res| maybe_bytes_res.map(|res| res.map(Frame::data)))
 234|   76|    }
 235|     |
 236|    2|    fn size_hint(&self) -> SizeHint {
 237|    2|        SizeHint::with_exact(self.size)
 238|    2|    }
 239|     |}
 240|     |
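BodyWrapper adapts the read half of a nativelink buffer channel to the http-body 1.x Body trait so that ByteStream::from_body_1_x (used by update below) can stream it, and the exact size_hint is what lets the HTTP layer send a Content-Length instead of chunked encoding. The same adapter shape over a plain tokio mpsc channel, as a self-contained sketch (ChannelBody is hypothetical):

    use bytes::Bytes;
    use http_body::{Body, Frame, SizeHint};
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use tokio::sync::mpsc;

    struct ChannelBody {
        rx: mpsc::Receiver<Bytes>,
        size: u64,
    }

    impl Body for ChannelBody {
        type Data = Bytes;
        type Error = std::io::Error;

        fn poll_frame(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Result<Frame<Bytes>, Self::Error>>> {
            // A closed channel ends the body; each message is one data frame.
            self.rx.poll_recv(cx).map(|opt| opt.map(|b| Ok(Frame::data(b))))
        }

        fn size_hint(&self) -> SizeHint {
            // Exact hint => the client can emit Content-Length up front.
            SizeHint::with_exact(self.size)
        }
    }
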
 241|    0|#[derive(MetricsComponent)]
 242|     |pub struct S3Store<NowFn> {
 243|     |    s3_client: Arc<Client>,
 244|     |    now_fn: NowFn,
 245|     |    #[metric(help = "The bucket name for the S3 store")]
 246|     |    bucket: String,
 247|     |    #[metric(help = "The key prefix for the S3 store")]
 248|     |    key_prefix: String,
 249|     |    retrier: Retrier,
 250|     |    #[metric(help = "The number of seconds to consider an object expired")]
 251|     |    consider_expired_after_s: i64,
 252|     |    #[metric(help = "The number of bytes to buffer for retrying requests")]
 253|     |    max_retry_buffer_per_request: usize,
 254|     |    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
 255|     |    multipart_max_concurrent_uploads: usize,
 256|     |}
 257|     |
 258|     |impl<I, NowFn> S3Store<NowFn>
 259|     |where
 260|     |    I: InstantWrapper,
 261|     |    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
 262|     |{
 263|    0|    pub async fn new(
 264|    0|        config: &nativelink_config::stores::S3Store,
 265|    0|        now_fn: NowFn,
 266|    0|    ) -> Result<Arc<Self>, Error> {
 267|    0|        let jitter_amt = config.retry.jitter;
 268|    0|        let jitter_fn = Arc::new(move |delay: Duration| {
 269|    0|            if jitter_amt == 0. {
    |     |  Branch (269:16): [True: 0, False: 0]
    |     |  Branch (269:16): [Folded - Ignored]
 270|    0|                return delay;
 271|    0|            }
 272|    0|            let min = 1. - (jitter_amt / 2.);
 273|    0|            let max = 1. + (jitter_amt / 2.);
 274|    0|            delay.mul_f32(OsRng.gen_range(min..max))
 275|    0|        });
 276|    0|        let s3_client = {
 277|    0|            let http_client =
 278|    0|                HyperClientBuilder::new().build(TlsConnector::new(config, jitter_fn.clone()));
 279|    0|            let credential_provider = credentials::default_provider().await;
 280|    0|            let mut config_builder = aws_config::defaults(BehaviorVersion::v2024_03_28())
 281|    0|                .credentials_provider(credential_provider)
 282|    0|                .app_name(AppName::new("nativelink").expect("valid app name"))
 283|    0|                .timeout_config(
 284|    0|                    aws_config::timeout::TimeoutConfig::builder()
 285|    0|                        .connect_timeout(Duration::from_secs(15))
 286|    0|                        .build(),
 287|    0|                )
 288|    0|                .region(Region::new(Cow::Owned(config.region.clone())))
 289|    0|                .http_client(http_client);
 290|     |            // TODO(allada) When aws-sdk supports this env variable we should be able
 291|     |            // to remove this.
 292|     |            // See: https://github.com/awslabs/aws-sdk-rust/issues/932
 293|    0|            if let Ok(endpoint_url) = env::var("AWS_ENDPOINT_URL") {
    |     |  Branch (293:20): [True: 0, False: 0]
    |     |  Branch (293:20): [Folded - Ignored]
 294|    0|                config_builder = config_builder.endpoint_url(endpoint_url);
 295|    0|            }
 296|    0|            aws_sdk_s3::Client::new(&config_builder.load().await)
 297|     |        };
 298|    0|        Self::new_with_client_and_jitter(config, s3_client, jitter_fn, now_fn)
 299|    0|    }
 300|     |
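The jitter closure above scales every backoff delay by a uniformly random factor in [1 - jitter/2, 1 + jitter/2), so retries from many replicas spread out instead of synchronizing. Worked through for an assumed config.retry.jitter of 0.5 and a 1s base delay:

    // min = 1.0 - 0.5 / 2.0 = 0.75
    // max = 1.0 + 0.5 / 2.0 = 1.25
    // delay.mul_f32(OsRng.gen_range(0.75..1.25))  => sleeps 750ms..1250ms
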
 301|   12|    pub fn new_with_client_and_jitter(
 302|   12|        config: &nativelink_config::stores::S3Store,
 303|   12|        s3_client: Client,
 304|   12|        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
 305|   12|        now_fn: NowFn,
 306|   12|    ) -> Result<Arc<Self>, Error> {
 307|   12|        Ok(Arc::new(Self {
 308|   12|            s3_client: Arc::new(s3_client),
 309|   12|            now_fn,
 310|   12|            bucket: config.bucket.to_string(),
 311|   12|            key_prefix: config.key_prefix.as_ref().unwrap_or(&String::new()).clone(),
 312|   12|            retrier: Retrier::new(
 313|   12|                Arc::new(|duration| Box::pin(sleep(duration))),
 314|   12|                jitter_fn,
 315|   12|                config.retry.clone(),
 316|   12|            ),
 317|   12|            consider_expired_after_s: i64::from(config.consider_expired_after_s),
 318|   12|            max_retry_buffer_per_request: config
 319|   12|                .max_retry_buffer_per_request
 320|   12|                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
 321|   12|            multipart_max_concurrent_uploads: config
 322|   12|                .multipart_max_concurrent_uploads
 323|   12|                .map_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS, |v| v),
 324|   12|        }))
 325|   12|    }
 326|     |
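new_with_client_and_jitter is the seam that lets tests (the 12 executions counted above come from the test suite) inject a hand-built client instead of the credential/TLS stack that new() assembles. A sketch of such a call; test_config and now_fn are placeholders for whatever the caller provides, and any closure returning an InstantWrapper satisfies the now_fn bound:

    // Sketch only: `test_config` and `now_fn` are assumed to exist.
    let client = aws_sdk_s3::Client::from_conf(
        aws_sdk_s3::Config::builder()
            .behavior_version(BehaviorVersion::latest())
            .region(Region::new("us-east-1"))
            .build(),
    );
    let store = S3Store::new_with_client_and_jitter(
        &test_config,
        client,
        Arc::new(|delay| delay), // identity jitter: deterministic backoff
        now_fn,
    )?;
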
 327|   14|    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
 328|   14|        format!("{}{}", self.key_prefix, key.as_str())
 329|   14|    }
 330|     |
 331|    5|    async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
 332|    5|        self.retrier
 333|    8|            .retry(unfold((), move |state| async move {
 334|    8|                let result = self
 335|    8|                    .s3_client
 336|    8|                    .head_object()
 337|    8|                    .bucket(&self.bucket)
 338|    8|                    .key(self.make_s3_path(&digest.borrow()))
 339|    8|                    .send()
 340|    0|                    .await;
 341|     |
 342|    8|                match result {
 343|    4|                    Ok(head_object_output) => {
 344|    4|                        if self.consider_expired_after_s != 0 {
    |     |  Branch (344:28): [True: 0, False: 0]
    |     |  Branch (344:28): [Folded - Ignored]
    |     |  Branch (344:28): [True: 2, False: 2]
 345|    2|                            if let Some(last_modified) = head_object_output.last_modified {
    |     |  Branch (345:36): [True: 0, False: 0]
    |     |  Branch (345:36): [Folded - Ignored]
    |     |  Branch (345:36): [True: 2, False: 0]
 346|    2|                                let now_s = (self.now_fn)().unix_timestamp() as i64;
 347|    2|                                if last_modified.secs() + self.consider_expired_after_s <= now_s {
    |     |  Branch (347:36): [True: 0, False: 0]
    |     |  Branch (347:36): [Folded - Ignored]
    |     |  Branch (347:36): [True: 1, False: 1]
 348|    1|                                    return Some((RetryResult::Ok(None), state));
 349|    1|                                }
 350|    0|                            }
 351|    2|                        }
 352|    3|                        let Some(length) = head_object_output.content_length else {
    |     |  Branch (352:29): [True: 0, False: 0]
    |     |  Branch (352:29): [Folded - Ignored]
    |     |  Branch (352:29): [True: 3, False: 0]
 353|    0|                            return Some((RetryResult::Ok(None), state));
 354|     |                        };
 355|    3|                        if length >= 0 {
    |     |  Branch (355:28): [True: 0, False: 0]
    |     |  Branch (355:28): [Folded - Ignored]
    |     |  Branch (355:28): [True: 3, False: 0]
 356|    3|                            return Some((RetryResult::Ok(Some(length as u64)), state));
 357|    0|                        }
 358|    0|                        Some((
 359|    0|                            RetryResult::Err(make_err!(
 360|    0|                                Code::InvalidArgument,
 361|    0|                                "Negative content length in S3: {length:?}",
 362|    0|                            )),
 363|    0|                            state,
 364|    0|                        ))
 365|     |                    }
 366|    4|                    Err(sdk_error) => match sdk_error.into_service_error() {
 367|    1|                        HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
 368|    3|                        other => Some((
 369|    3|                            RetryResult::Retry(make_err!(
 370|    3|                                Code::Unavailable,
 371|    3|                                "Unhandled HeadObjectError in S3: {other:?}"
 372|    3|                            )),
 373|    3|                            state,
 374|    3|                        )),
 375|     |                    },
 376|     |                }
 377|   16|            }))
 378|    3|            .await
 379|    5|    }
 380|     |}
 381|     |
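The consider_expired_after_s branch in has() treats an object as already gone once its age reaches the configured TTL, which keeps results consistent with bucket lifecycle rules that will delete it. The predicate pulled out with a boundary case (is_expired is a hypothetical helper):

    fn is_expired(last_modified_s: i64, ttl_s: i64, now_s: i64) -> bool {
        // Mirrors line 347: the boundary instant itself counts as expired.
        last_modified_s + ttl_s <= now_s
    }
    assert!(is_expired(1000, 3600, 4600));  // exactly at the boundary
    assert!(!is_expired(1000, 3600, 4599)); // one second earlier

Also note the error-code discipline announced in the header comment: an unrecognized HeadObjectError becomes Code::Unavailable (retryable) rather than an input error.
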
 382|     |#[async_trait]
 383|     |impl<I, NowFn> StoreDriver for S3Store<NowFn>
 384|     |where
 385|     |    I: InstantWrapper,
 386|     |    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
 387|     |{
 388|     |    async fn has_with_results(
 389|     |        self: Pin<&Self>,
 390|     |        keys: &[StoreKey<'_>],
 391|     |        results: &mut [Option<u64>],
 392|    6|    ) -> Result<(), Error> {
 393|    6|        keys.iter()
 394|    6|            .zip(results.iter_mut())
 395|    6|            .map(|(key, result)| async move {
 396|    6|                // We need to do a special pass to ensure our zero key exists.
 397|    6|                if is_zero_digest(key.borrow()) {
    |     |  Branch (397:20): [True: 0, False: 0]
    |     |  Branch (397:20): [Folded - Ignored]
    |     |  Branch (397:20): [True: 1, False: 5]
 398|    1|                    *result = Some(0);
 399|    1|                    return Ok::<_, Error>(());
 400|    5|                }
 401|    5|                *result = self.has(key).await?;
 402|    5|                Ok::<_, Error>(())
 403|   12|            })
 404|    6|            .collect::<FuturesUnordered<_>>()
 405|    6|            .try_collect()
 406|    6|            .await
 407|   12|    }
 408|     |
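has_with_results fans out one future per key and lets FuturesUnordered drive them concurrently; try_collect() into Result<(), Error> works because each future yields () and the first error short-circuits the rest. The pattern in isolation (lookup is a hypothetical per-key operation):

    use futures::stream::FuturesUnordered;
    use futures::TryStreamExt;

    async fn all_concurrently(keys: &[u64]) -> Result<(), std::io::Error> {
        keys.iter()
            .map(|key| async move { lookup(*key).await }) // one future per key
            .collect::<FuturesUnordered<_>>()             // polled concurrently
            .try_collect()                                // first Err wins
            .await
    }
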
 409|     |    async fn update(
 410|     |        self: Pin<&Self>,
 411|     |        digest: StoreKey<'_>,
 412|     |        mut reader: DropCloserReadHalf,
 413|     |        upload_size: UploadSizeInfo,
 414|    2|    ) -> Result<(), Error> {
 415|    2|        let s3_path = &self.make_s3_path(&digest.borrow());
 416|     |
 417|    2|        let max_size = match upload_size {
 418|    2|            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
 419|    2|        };
 420|    2|
 421|    2|        // Note(allada) It might be more optimal to use a different
 422|    2|        // heuristic here, but for simplicity we use a hard coded value.
 423|    2|        // Anything going down this if-statement will have the advantage of only
 424|    2|        // 1 network request for the upload instead of the minimum of 3 required
 425|    2|        // for multipart upload requests.
 426|    2|        //
 427|    2|        // Note(allada) If the upload size is not known, we go down the multipart upload path.
 428|    2|        // This is not very efficient, but it greatly reduces the complexity of the code.
 429|    2|        if max_size < MIN_MULTIPART_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) {
    |     |  Branch (429:12): [True: 0, False: 0]
    |     |  Branch (429:12): [Folded - Ignored]
    |     |  Branch (429:12): [True: 1, False: 1]
 430|    1|            let UploadSizeInfo::ExactSize(sz) = upload_size else {
    |     |  Branch (430:17): [True: 0, False: 0]
    |     |  Branch (430:17): [Folded - Ignored]
    |     |  Branch (430:17): [True: 1, False: 0]
 431|    0|                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
 432|     |            };
 433|    1|            reader.set_max_recent_data_size(
 434|    1|                u64::try_from(self.max_retry_buffer_per_request)
 435|    1|                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
 436|     |            );
 437|    1|            return self
 438|    1|                .retrier
 439|    1|                .retry(unfold(reader, move |mut reader| async move {
 440|    1|                    // We need to make a new pair here because the aws sdk does not give us
 441|    1|                    // back the body after we send it in order to retry.
 442|    1|                    let (mut tx, rx) = make_buf_channel_pair();
 443|     |
 444|     |                    // Upload the data to the S3 backend.
 445|    1|                    let result = {
 446|    1|                        let reader_ref = &mut reader;
 447|    1|                        let (upload_res, bind_res) = tokio::join!(
 448|    1|                            self.s3_client
 449|    1|                                .put_object()
 450|    1|                                .bucket(&self.bucket)
 451|    1|                                .key(s3_path.clone())
 452|    1|                                .content_length(sz as i64)
 453|    1|                                .body(ByteStream::from_body_1_x(BodyWrapper {
 454|    1|                                    reader: rx,
 455|    1|                                    size: sz,
 456|    1|                                }))
 457|    1|                                .send()
 458|    1|                                .map_ok_or_else(|e| Err(make_err!(Code::Aborted, "{e:?}")), |_| Ok(())),
 459|    1|                            // Stream all data from the reader channel to the writer channel.
 460|    1|                            tx.bind_buffered(reader_ref)
 461|    1|                        );
 462|    1|                        upload_res
 463|    1|                            .merge(bind_res)
 464|    1|                            .err_tip(|| "Failed to upload file to s3 in single chunk")
 465|    1|                    };
 466|    1|
 467|    1|                    // If we failed to upload the file, check to see if we can retry.
 468|    1|                    let retry_result = result.map_or_else(|mut err| {
 469|    0|                        // Ensure our code is Code::Aborted, so the client can retry if possible.
 470|    0|                        err.code = Code::Aborted;
 471|    0|                        let bytes_received = reader.get_bytes_received();
 472|    0|                        if let Err(try_reset_err) = reader.try_reset_stream() {
    |     |  Branch (472:32): [True: 0, False: 0]
    |     |  Branch (472:32): [Folded - Ignored]
    |     |  Branch (472:32): [True: 0, False: 0]
 473|    0|                            event!(
 474|    0|                                Level::ERROR,
 475|     |                                ?bytes_received,
 476|     |                                err = ?try_reset_err,
 477|    0|                                "Unable to reset stream after failed upload in S3Store::update"
 478|     |                            );
 479|    0|                            return RetryResult::Err(err
 480|    0|                                .merge(try_reset_err)
 481|    0|                                .append(format!("Failed to retry upload with {bytes_received} bytes received in S3Store::update")));
 482|    0|                        }
 483|    0|                        let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in S3Store::update"));
 484|    0|                        event!(
 485|    0|                            Level::INFO,
 486|     |                            ?err,
 487|     |                            ?bytes_received,
 488|    0|                            "Retryable S3 error"
 489|     |                        );
 490|    0|                        RetryResult::Retry(err)
 491|    1|                    }, |()| RetryResult::Ok(()));
 492|    1|                    Some((retry_result, reader))
 493|    1|                }))
 494|   51|                .await;
 495|    1|        }
 496|     |
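Because the AWS SDK consumes the request body, every retry of this single-request path must rebuild the channel pair and replay the data the reader buffered; set_max_recent_data_size caps that replay buffer at max_retry_buffer_per_request. Worked through under the 5MB default:

    // The single-request path only runs for blobs under MIN_MULTIPART_SIZE:
    //  - A 3MB PutObject fails mid-flight: all 3MB still sit in the
    //    recent-data buffer, try_reset_stream() succeeds => RetryResult::Retry.
    //  - If the buffer were configured below the blob size (say 1MB for that
    //    3MB blob), the earliest bytes are gone by the time the failure
    //    surfaces: try_reset_stream() fails => permanent Code::Aborted error.
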
 497|    1|        let upload_id = &self
 498|    1|            .retrier
 499|    1|            .retry(unfold((), move |()| async move {
 500|    1|                let retry_result = self
 501|    1|                    .s3_client
 502|    1|                    .create_multipart_upload()
 503|    1|                    .bucket(&self.bucket)
 504|    1|                    .key(s3_path)
 505|    1|                    .send()
 506|    0|                    .await
 507|    1|                    .map_or_else(
 508|    1|                        |e| {
 509|    0|                            RetryResult::Retry(make_err!(
 510|    0|                                Code::Aborted,
 511|    0|                                "Failed to create multipart upload to s3: {e:?}"
 512|    0|                            ))
 513|    1|                        },
 514|    1|                        |CreateMultipartUploadOutput { upload_id, .. }| {
 515|    1|                            upload_id.map_or_else(
 516|    1|                                || {
 517|    0|                                    RetryResult::Err(make_err!(
 518|    0|                                        Code::Internal,
 519|    0|                                        "Expected upload_id to be set by s3 response"
 520|    0|                                    ))
 521|    1|                                },
 522|    1|                                RetryResult::Ok,
 523|    1|                            )
 524|    1|                        },
 525|    1|                    );
 526|    1|                Some((retry_result, ()))
 527|    1|            }))
 528|    0|            .await?;
 529|     |
 530|     |        // S3 requires us to upload in parts if the size is greater than 5GB. The part size
 531|     |        // must be at least 5MB (except the last part), and an upload can have up to 10,000 parts.
 532|    1|        let bytes_per_upload_part =
 533|    1|            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
 534|    1|
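Worked example of the part-size formula for a 100MiB upload; the integer quotient approximates how many 5MB parts the payload needs, and the clamp keeps the result inside S3's legal part range:

    // max_size = 100MiB = 104_857_600
    // 104_857_600 / (5MiB - 1) = 104_857_600 / 5_242_879 = 20
    // clamp(20, 5MiB, 5GiB)    = 5MiB   (the lower bound wins)
    // => the reader is consumed in 5MiB chunks, giving 20 full parts, and
    //    completed_parts below reserves min(10_000, 100MiB / 5MiB + 1) = 21.
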
 535|    1|        let upload_parts = move || async move {
 536|    1|            // This will ensure we only have `multipart_max_concurrent_uploads` * `bytes_per_upload_part`
 537|    1|            // bytes in memory at any given time waiting to be uploaded.
 538|    1|            let (tx, mut rx) = mpsc::channel(self.multipart_max_concurrent_uploads);
 539|    1|
 540|    1|            let read_stream_fut = async move {
 541|    1|                let retrier = &Pin::get_ref(self).retrier;
 542|     |                // Note: Our break condition is when we reach EOF.
 543|    4|                for part_number in 1..i32::MAX {
 544|    4|                    let write_buf = reader
 545|    4|                        .consume(Some(usize::try_from(bytes_per_upload_part).err_tip(|| "Could not convert bytes_per_upload_part to usize")?))
 546|    0|                        .await
 547|    4|                        .err_tip(|| "Failed to read chunk in s3_store")?;
 548|    4|                    if write_buf.is_empty() {
    |     |  Branch (548:24): [True: 0, False: 0]
    |     |  Branch (548:24): [Folded - Ignored]
    |     |  Branch (548:24): [True: 1, False: 3]
 549|    1|                        break; // Reached EOF.
 550|    3|                    }
 551|    3|
 552|    3|                    tx.send(retrier.retry(unfold(
 553|    3|                        write_buf,
 554|    3|                        move |write_buf| {
 555|    3|                            async move {
 556|    3|                                let retry_result = self
 557|    3|                                    .s3_client
 558|    3|                                    .upload_part()
 559|    3|                                    .bucket(&self.bucket)
 560|    3|                                    .key(s3_path)
 561|    3|                                    .upload_id(upload_id)
 562|    3|                                    .body(ByteStream::new(SdkBody::from(write_buf.clone())))
 563|    3|                                    .part_number(part_number)
 564|    3|                                    .send()
 565|    0|                                    .await
 566|    3|                                    .map_or_else(
 567|    3|                                        |e| {
 568|    0|                                            RetryResult::Retry(make_err!(
 569|    0|                                                Code::Aborted,
 570|    0|                                                "Failed to upload part {part_number} in S3 store: {e:?}"
 571|    0|                                            ))
 572|    3|                                        },
 573|    3|                                        |mut response| {
 574|    3|                                            RetryResult::Ok(
 575|    3|                                                CompletedPartBuilder::default()
 576|    3|                                                    // Only set an entity tag if it exists. This saves
 577|    3|                                                    // 13 bytes per part on the final request if it can
 578|    3|                                                    // omit the `<ETAG><ETAG/>` string.
 579|    3|                                                    .set_e_tag(response.e_tag.take())
 580|    3|                                                    .part_number(part_number)
 581|    3|                                                    .build(),
 582|    3|                                            )
 583|    3|                                        },
 584|    3|                                    );
 585|    3|                                Some((retry_result, write_buf))
 586|    3|                            }
 587|    3|                        }
 588|    3|                    ))).await.map_err(|_| make_err!(Code::Internal, "Failed to send part to channel in s3_store"))?;
 589|     |                }
 590|    1|                Result::<_, Error>::Ok(())
 591|    1|            }.fuse();
 592|    1|
 593|    1|            let mut upload_futures = FuturesUnordered::new();
 594|     |
 595|    1|            let mut completed_parts = Vec::with_capacity(
 596|    1|                usize::try_from(cmp::min(
 597|    1|                    MAX_UPLOAD_PARTS as u64,
 598|    1|                    (max_size / bytes_per_upload_part) + 1,
 599|    1|                ))
 600|    1|                .err_tip(|| "Could not convert u64 to usize")?,
 601|     |            );
 602|    1|            tokio::pin!(read_stream_fut);
 603|     |            loop {
 604|    8|                if read_stream_fut.is_terminated() && rx.is_empty() && upload_futures.is_empty() {
    |     |  Branch (604:20): [True: 0, False: 0]
    |     |  Branch (604:55): [True: 0, False: 0]
    |     |  Branch (604:72): [True: 0, False: 0]
    |     |  Branch (604:20): [Folded - Ignored]
    |     |  Branch (604:55): [Folded - Ignored]
    |     |  Branch (604:72): [Folded - Ignored]
    |     |  Branch (604:20): [True: 7, False: 1]
    |     |  Branch (604:55): [True: 2, False: 5]
    |     |  Branch (604:72): [True: 1, False: 1]
 605|    1|                    break; // No more data to process.
 606|    7|                }
 607|    7|                tokio::select! {
 608|    7|                    result = &mut read_stream_fut => result?, // Return error or wait for other futures.
 609|    7|                    Some(upload_result) = upload_futures.next() => completed_parts.push(upload_result?),
 610|    7|                    Some(fut) = rx.recv() => upload_futures.push(fut),
 611|     |                }
 612|     |            }
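This loop is a small bounded pipeline: the reader future pushes retryable part-upload futures into an mpsc channel whose capacity is multipart_max_concurrent_uploads, FuturesUnordered holds the uploads in flight, and fuse() makes the producer safe to poll (and query via is_terminated) after it completes. The skeleton in isolation, with u32 standing in for a completed part (drive is hypothetical):

    use futures::future::FusedFuture;
    use futures::stream::FuturesUnordered;
    use futures::{FutureExt, StreamExt};
    use tokio::sync::mpsc;

    async fn drive<F, P>(mut rx: mpsc::Receiver<F>, producer: P) -> Vec<u32>
    where
        F: std::future::Future<Output = u32>,
        P: std::future::Future<Output = ()>,
    {
        let producer = producer.fuse(); // pollable even after completion
        tokio::pin!(producer);
        let mut in_flight = FuturesUnordered::new();
        let mut done = Vec::new();
        loop {
            if producer.is_terminated() && rx.is_empty() && in_flight.is_empty() {
                break; // producer finished and every queued upload drained
            }
            tokio::select! {
                () = &mut producer => {}
                Some(part) = in_flight.next() => done.push(part),
                Some(fut) = rx.recv() => in_flight.push(fut),
            }
        }
        done
    }
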
 613|     |
 614|     |            // Even though the spec does not require parts to be sorted by number, we do it just in case
 615|     |            // there's an S3 implementation that requires it.
 616|    4|            completed_parts.sort_unstable_by_key(|part| part.part_number);
 617|    1|
 618|    1|            self.retrier
 619|    1|                .retry(unfold(completed_parts, move |completed_parts| async move {
 620|    1|                    Some((
 621|    1|                        self.s3_client
 622|    1|                            .complete_multipart_upload()
 623|    1|                            .bucket(&self.bucket)
 624|    1|                            .key(s3_path)
 625|    1|                            .multipart_upload(
 626|    1|                                CompletedMultipartUploadBuilder::default()
 627|    1|                                    .set_parts(Some(completed_parts.clone()))
 628|    1|                                    .build(),
 629|    1|                            )
 630|    1|                            .upload_id(upload_id)
 631|    1|                            .send()
 632|    0|                            .await
 633|    1|                            .map_or_else(
 634|    1|                                |e| {
 635|    0|                                    RetryResult::Retry(make_err!(
 636|    0|                                        Code::Aborted,
 637|    0|                                        "Failed to complete multipart upload in S3 store: {e:?}"
 638|    0|                                    ))
 639|    1|                                },
 640|    1|                                |_| RetryResult::Ok(()),
 641|    1|                            ),
 642|    1|                        completed_parts,
 643|    1|                    ))
 644|    1|                }))
 645|    0|                .await
 646|    2|        };
 647|     |        // Upload our parts and complete the multipart upload.
 648|     |        // If we fail, attempt to abort the multipart upload (cleanup).
 649|    1|        upload_parts()
 650|    1|            .or_else(move |e| async move {
 651|    0|                Result::<(), _>::Err(e).merge(
 652|    0|                    // Note: We don't retry here because this is just a best attempt.
 653|    0|                    self.s3_client
 654|    0|                        .abort_multipart_upload()
 655|    0|                        .bucket(&self.bucket)
 656|    0|                        .key(s3_path)
 657|    0|                        .upload_id(upload_id)
 658|    0|                        .send()
 659|    0|                        .await
 660|    0|                        .map_or_else(
 661|    0|                            |e| {
 662|    0|                                let err = make_err!(
 663|    0|                                    Code::Aborted,
 664|    0|                                    "Failed to abort multipart upload in S3 store: {e:?}"
 665|    0|                                );
 666|    0|                                event!(Level::INFO, ?err, "Multipart upload error");
 667|    0|                                Err(err)
 668|    0|                            },
 669|    0|                            |_| Ok(()),
 670|    0|                        ),
 671|    0|                )
 672|    1|            })
 673|    0|            .await
 674|    4|    }
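If anything in upload_parts() fails, the or_else arm fires one best-effort AbortMultipartUpload (deliberately outside the retrier) and merges any abort failure into the original error rather than masking it; without the abort, S3 keeps storing the orphaned parts until the upload is aborted or a lifecycle rule cleans them up. Reduced to its skeleton (try_abort is hypothetical):

    upload_parts()
        .or_else(move |e| async move {
            // Keep the original error; attach the abort failure, if any.
            Result::<(), _>::Err(e).merge(try_abort().await)
        })
        .await
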
 675|     |
 676|     |    async fn get_part(
 677|     |        self: Pin<&Self>,
 678|     |        key: StoreKey<'_>,
 679|     |        writer: &mut DropCloserWriteHalf,
 680|     |        offset: u64,
 681|     |        length: Option<u64>,
 682|    5|    ) -> Result<(), Error> {
 683|    5|        if is_zero_digest(key.borrow()) {
    |     |  Branch (683:12): [True: 0, False: 0]
    |     |  Branch (683:12): [Folded - Ignored]
    |     |  Branch (683:12): [True: 1, False: 4]
 684|    1|            writer
 685|    1|                .send_eof()
 686|    1|                .err_tip(|| "Failed to send zero EOF in s3 store get_part")?;
 687|    1|            return Ok(());
 688|    4|        }
 689|    4|
 690|    4|        let s3_path = &self.make_s3_path(&key);
 691|    4|        let end_read_byte = length
 692|    4|            .map_or(Some(None), |length| Some(offset.checked_add(length)))
 693|    4|            .err_tip(|| "Integer overflow protection triggered")?;
 694|     |
 695|    4|        self.retrier
 696|    7|            .retry(unfold(writer, move |writer| async move {
 697|    7|                let result = self
 698|    7|                    .s3_client
 699|    7|                    .get_object()
 700|    7|                    .bucket(&self.bucket)
 701|    7|                    .key(s3_path)
 702|    7|                    .range(format!(
 703|    7|                        "bytes={}-{}",
 704|    7|                        offset + writer.get_bytes_written(),
 705|    7|                        end_read_byte.map_or_else(String::new, |v| v.to_string())
 706|    7|                    ))
 707|    7|                    .send()
 708|    0|                    .await;
 709|     |
 710|    7|                let mut s3_in_stream = match result {
 711|    4|                    Ok(get_object_output) => get_object_output.body,
 712|    3|                    Err(sdk_error) => match sdk_error.into_service_error() {
 713|    0|                        GetObjectError::NoSuchKey(e) => {
 714|    0|                            return Some((
 715|    0|                                RetryResult::Err(make_err!(
 716|    0|                                    Code::NotFound,
 717|    0|                                    "No such key in S3: {e}"
 718|    0|                                )),
 719|    0|                                writer,
 720|    0|                            ));
 721|     |                        }
 722|    3|                        other => {
 723|    3|                            return Some((
 724|    3|                                RetryResult::Retry(make_err!(
 725|    3|                                    Code::Unavailable,
 726|    3|                                    "Unhandled GetObjectError in S3: {other:?}",
 727|    3|                                )),
 728|    3|                                writer,
 729|    3|                            ));
 730|     |                        }
 731|     |                    },
 732|     |                };
 733|     |
 734|     |                // Copy data from the s3 input stream to the writer stream.
 735|    8|                while let Some(maybe_bytes) = s3_in_stream.next().await {
    |     |  Branch (735:27): [True: 0, False: 0]
    |     |  Branch (735:27): [Folded - Ignored]
    |     |  Branch (735:27): [True: 4, False: 4]
 736|    4|                    match maybe_bytes {
 737|    4|                        Ok(bytes) => {
 738|    4|                            if bytes.is_empty() {
    |     |  Branch (738:32): [True: 0, False: 0]
    |     |  Branch (738:32): [Folded - Ignored]
    |     |  Branch (738:32): [True: 1, False: 3]
 739|     |                                // Ignore possible EOF. Different implementations of S3 may or may not
 740|     |                                // send EOF this way.
 741|    1|                                continue;
 742|    3|                            }
 743|    3|                            if let Err(e) = writer.send(bytes).await {
    |     |  Branch (743:36): [True: 0, False: 0]
    |     |  Branch (743:36): [Folded - Ignored]
    |     |  Branch (743:36): [True: 0, False: 3]
 744|    0|                                return Some((
 745|    0|                                    RetryResult::Err(make_err!(
 746|    0|                                        Code::Aborted,
 747|    0|                                        "Error sending bytes to consumer in S3: {e}"
 748|    0|                                    )),
 749|    0|                                    writer,
 750|    0|                                ));
 751|    3|                            }
 752|     |                        }
 753|    0|                        Err(e) => {
 754|    0|                            return Some((
 755|    0|                                RetryResult::Retry(make_err!(
 756|    0|                                    Code::Aborted,
 757|    0|                                    "Bad bytestream element in S3: {e}"
 758|    0|                                )),
 759|    0|                                writer,
 760|    0|                            ));
 761|     |                        }
 762|     |                    }
 763|     |                }
 764|    4|                if let Err(e) = writer.send_eof() {
    |     |  Branch (764:24): [True: 0, False: 0]
    |     |  Branch (764:24): [Folded - Ignored]
    |     |  Branch (764:24): [True: 0, False: 4]
 765|    0|                    return Some((
 766|    0|                        RetryResult::Err(make_err!(
 767|    0|                            Code::Aborted,
 768|    0|                            "Failed to send EOF to consumer in S3: {e}"
 769|    0|                        )),
 770|    0|                        writer,
 771|    0|                    ));
 772|    4|                }
 773|    4|                Some((RetryResult::Ok(()), writer))
 774|   14|            }))
 775|    5|            .await
 776|   10|    }
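The Range header is what makes this read retryable without re-sending data: each attempt starts at offset + writer.get_bytes_written(), so bytes the writer already accepted are never requested again. For a read at offset 1000 with length Some(500):

    // first attempt:                        "bytes=1000-1500"
    // retry after 200 bytes were written:   "bytes=1200-1500"
    // with length = None the end is empty:  "bytes=1200-"   (open-ended)
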
 777|     |
 778|    0|    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
 779|    0|        self
 780|    0|    }
 781|     |
 782|    0|    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
 783|    0|        self
 784|    0|    }
 785|     |
 786|    0|    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
 787|    0|        self
 788|    0|    }
 789|     |}
 790|     |
 791|     |#[async_trait]
 792|     |impl<I, NowFn> HealthStatusIndicator for S3Store<NowFn>
 793|     |where
 794|     |    I: InstantWrapper,
 795|     |    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
 796|     |{
 797|    0|    fn get_name(&self) -> &'static str {
 798|    0|        "S3Store"
 799|    0|    }
 800|     |
 801|    0|    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
 802|    0|        StoreDriver::check_health(Pin::new(self), namespace).await
 803|    0|    }
 804|     |}