Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/s3_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::cmp;
17
use std::future::Future;
18
use std::pin::Pin;
19
use std::sync::Arc;
20
use std::task::{Context, Poll};
21
use std::time::Duration;
22
23
use async_trait::async_trait;
24
use aws_config::default_provider::credentials;
25
use aws_config::provider_config::ProviderConfig;
26
use aws_config::retry::ErrorKind::TransientError;
27
use aws_config::{AppName, BehaviorVersion};
28
use aws_sdk_s3::Client;
29
use aws_sdk_s3::config::Region;
30
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
31
use aws_sdk_s3::operation::get_object::GetObjectError;
32
use aws_sdk_s3::operation::head_object::HeadObjectError;
33
use aws_sdk_s3::primitives::ByteStream; // SdkBody
34
use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
35
use aws_smithy_runtime_api::client::http::{
36
    HttpClient as SmithyHttpClient, HttpConnector as SmithyHttpConnector, HttpConnectorFuture,
37
    HttpConnectorSettings, SharedHttpConnector,
38
};
39
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
40
use aws_smithy_runtime_api::client::result::ConnectorError;
41
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
42
use aws_smithy_runtime_api::http::Response;
43
use aws_smithy_types::body::SdkBody;
44
use bytes::{Bytes, BytesMut};
45
use futures::future::FusedFuture;
46
use futures::stream::{FuturesUnordered, unfold};
47
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
48
use http_body::{Frame, SizeHint};
49
use http_body_util::BodyExt;
50
use hyper::{Method, Request};
51
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
52
use hyper_util::client::legacy::Client as LegacyClient;
53
use hyper_util::client::legacy::connect::HttpConnector as LegacyHttpConnector;
54
use hyper_util::rt::TokioExecutor;
55
use nativelink_config::stores::S3Spec;
56
// Note: S3 store should be very careful about the error codes it returns
57
// when in a retryable wrapper. Always prefer Code::Aborted or another
58
// retryable code over Code::InvalidArgument or make_input_err!().
59
// ie: Don't import make_input_err!() to help prevent this.
60
use nativelink_error::{Code, Error, ResultExt, make_err};
61
use nativelink_metric::MetricsComponent;
62
use nativelink_util::buf_channel::{
63
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
64
};
65
use nativelink_util::fs;
66
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
67
use nativelink_util::instant_wrapper::InstantWrapper;
68
use nativelink_util::retry::{Retrier, RetryResult};
69
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
70
use rand::Rng;
71
use tokio::sync::mpsc;
72
use tokio::time::sleep;
73
use tracing::{Level, event};
74
75
use crate::cas_utils::is_zero_digest;
76
77
// S3 parts cannot be smaller than this number. See:
78
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
79
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
80
81
// S3 parts cannot be larger than this number. See:
82
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
83
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.
84
85
// S3 parts cannot be more than this number. See:
86
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
87
const MAX_UPLOAD_PARTS: usize = 10_000;
88
89
// Default max buffer size for retrying upload requests.
90
// Note: If you change this, adjust the docs in the config.
91
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 * 1024 * 1024; // 5MB.
92
93
// Default limit for concurrent part uploads per multipart upload.
94
// Note: If you change this, adjust the docs in the config.
95
const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;
96
97
#[derive(Clone)]
98
pub struct TlsClient {
99
    client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>,
100
    retrier: Retrier,
101
}
102
103
impl TlsClient {
104
    #[must_use]
105
0
    pub fn new(spec: &S3Spec, jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>) -> Self {
106
0
        let connector_with_roots = HttpsConnectorBuilder::new().with_webpki_roots();
107
108
0
        let connector_with_schemes = if spec.insecure_allow_http {
  Branch (108:41): [True: 0, False: 0]
  Branch (108:41): [Folded - Ignored]
109
0
            connector_with_roots.https_or_http()
110
        } else {
111
0
            connector_with_roots.https_only()
112
        };
113
114
0
        let connector = if spec.disable_http2 {
  Branch (114:28): [True: 0, False: 0]
  Branch (114:28): [Folded - Ignored]
115
0
            connector_with_schemes.enable_http1().build()
116
        } else {
117
0
            connector_with_schemes.enable_http1().enable_http2().build()
118
        };
119
120
0
        let client = LegacyClient::builder(TokioExecutor::new()).build(connector);
121
0
122
0
        Self {
123
0
            client,
124
0
            retrier: Retrier::new(
125
0
                Arc::new(|duration| Box::pin(sleep(duration))),
126
0
                jitter_fn,
127
0
                spec.retry.clone(),
128
0
            ),
129
0
        }
130
0
    }
131
}
132
133
impl std::fmt::Debug for TlsClient {
134
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
135
0
        f.debug_struct("TlsClient").finish_non_exhaustive()
136
0
    }
137
}
138
139
impl SmithyHttpClient for TlsClient {
140
0
    fn http_connector(
141
0
        &self,
142
0
        _settings: &HttpConnectorSettings,
143
0
        _components: &RuntimeComponents,
144
0
    ) -> SharedHttpConnector {
145
0
        SharedHttpConnector::new(self.clone())
146
0
    }
147
}
148
149
enum BufferedBodyState {
150
    Cloneable(SdkBody),
151
    Buffered(Bytes),
152
    Empty,
153
}
154
155
mod body_processing {
156
    use super::{BodyExt, BufferedBodyState, BytesMut, ConnectorError, SdkBody, TransientError};
157
158
    /// Buffer a request body fully into memory.
159
    ///
160
    /// TODO(aaronmondal): This could lead to OOMs in extremely constrained
161
    ///                    environments. Probably better to implement something
162
    ///                    like a rewindable stream logic.
163
    #[inline]
164
0
    pub(crate) async fn buffer_body(body: SdkBody) -> Result<BufferedBodyState, ConnectorError> {
165
0
        let mut bytes = BytesMut::new();
166
0
        let mut body_stream = body;
167
0
        while let Some(frame) = body_stream.frame().await {
  Branch (167:19): [True: 0, False: 0]
  Branch (167:19): [Folded - Ignored]
168
0
            match frame {
169
0
                Ok(frame) => {
170
0
                    if let Some(data) = frame.data_ref() {
  Branch (170:28): [True: 0, False: 0]
  Branch (170:28): [Folded - Ignored]
171
0
                        bytes.extend_from_slice(data);
172
0
                    }
173
                }
174
0
                Err(e) => {
175
0
                    return Err(ConnectorError::other(
176
0
                        format!("Failed to read request body: {e}").into(),
177
0
                        Some(TransientError),
178
0
                    ));
179
                }
180
            }
181
        }
182
183
0
        Ok(BufferedBodyState::Buffered(bytes.freeze()))
184
0
    }
185
}
186
187
struct RequestComponents {
188
    method: Method,
189
    uri: hyper::Uri,
190
    version: hyper::Version,
191
    headers: hyper::HeaderMap,
192
    body_data: BufferedBodyState,
193
}
194
195
mod conversions {
196
    use super::{
197
        BufferedBodyState, ConnectorError, Future, HttpRequest, Method, RequestComponents,
198
        Response, SdkBody, TransientError, body_processing,
199
    };
200
201
    pub(crate) trait RequestExt {
202
        fn into_components(self)
203
        -> impl Future<Output = Result<RequestComponents, ConnectorError>>;
204
    }
205
206
    impl RequestExt for HttpRequest {
207
0
        async fn into_components(self) -> Result<RequestComponents, ConnectorError> {
208
            // Note: This does *not* refer to the HTTP protocol, but to the
209
            //       version of the http crate.
210
0
            let hyper_req = self.try_into_http1x().map_err(|e| {
211
0
                ConnectorError::other(
212
0
                    format!("Failed to convert to HTTP request: {e}").into(),
213
0
                    Some(TransientError),
214
0
                )
215
0
            })?;
216
217
0
            let method = hyper_req.method().clone();
218
0
            let uri = hyper_req.uri().clone();
219
0
            let version = hyper_req.version();
220
0
            let headers = hyper_req.headers().clone();
221
0
222
0
            let body = hyper_req.into_body();
223
224
            // Only buffer bodies for methods likely to have payloads.
225
0
            let needs_buffering = matches!(method, Method::POST | Method::PUT);
226
227
            // Preserve the body in case we need to retry.
228
0
            let body_data = if needs_buffering {
  Branch (228:32): [True: 0, False: 0]
  Branch (228:32): [Folded - Ignored]
229
0
                if let Some(cloneable_body) = body.try_clone() {
  Branch (229:24): [True: 0, False: 0]
  Branch (229:24): [Folded - Ignored]
230
0
                    BufferedBodyState::Cloneable(cloneable_body)
231
                } else {
232
0
                    body_processing::buffer_body(body).await?
233
                }
234
            } else {
235
0
                BufferedBodyState::Empty
236
            };
237
238
0
            Ok(RequestComponents {
239
0
                method,
240
0
                uri,
241
0
                version,
242
0
                headers,
243
0
                body_data,
244
0
            })
245
0
        }
246
    }
247
248
    pub(crate) trait ResponseExt {
249
        fn into_smithy_response(self) -> Response<SdkBody>;
250
    }
251
252
    impl ResponseExt for hyper::Response<hyper::body::Incoming> {
253
0
        fn into_smithy_response(self) -> Response<SdkBody> {
254
0
            let (parts, body) = self.into_parts();
255
0
            let sdk_body = SdkBody::from_body_1_x(body);
256
0
            let mut smithy_resp = Response::new(parts.status.into(), sdk_body);
257
0
            let header_pairs: Vec<(String, String)> = parts
258
0
                .headers
259
0
                .iter()
260
0
                .filter_map(|(name, value)| {
261
0
                    value
262
0
                        .to_str()
263
0
                        .ok()
264
0
                        .map(|value_str| (name.as_str().to_owned(), value_str.to_owned()))
265
0
                })
266
0
                .collect();
267
268
0
            for (name, value) in header_pairs {
269
0
                smithy_resp.headers_mut().insert(name, value);
270
0
            }
271
272
0
            smithy_resp
273
0
        }
274
    }
275
}
276
277
struct RequestBuilder<'a> {
278
    components: &'a RequestComponents,
279
}
280
281
impl<'a> RequestBuilder<'a> {
282
    #[inline]
283
0
    const fn new(components: &'a RequestComponents) -> Self {
284
0
        Self { components }
285
0
    }
286
287
    #[inline]
288
    #[allow(unused_qualifications, reason = "false positive on hyper::http::Error")]
289
0
    fn build(&self) -> Result<Request<SdkBody>, hyper::http::Error> {
290
0
        let mut req_builder = Request::builder()
291
0
            .method(self.components.method.clone())
292
0
            .uri(self.components.uri.clone())
293
0
            .version(self.components.version);
294
0
295
0
        let headers_map = req_builder.headers_mut().unwrap();
296
0
        for (name, value) in &self.components.headers {
297
0
            headers_map.insert(name, value.clone());
298
0
        }
299
300
0
        match &self.components.body_data {
301
0
            BufferedBodyState::Cloneable(body) => {
302
0
                let cloned_body = body.try_clone().expect("Body should be cloneable");
303
0
                req_builder.body(cloned_body)
304
            }
305
0
            BufferedBodyState::Buffered(bytes) => req_builder.body(SdkBody::from(bytes.clone())),
306
0
            BufferedBodyState::Empty => req_builder.body(SdkBody::empty()),
307
        }
308
0
    }
309
}
310
311
mod execution {
312
    use super::conversions::ResponseExt;
313
    use super::{
314
        Code, HttpsConnector, LegacyClient, LegacyHttpConnector, RequestBuilder, RequestComponents,
315
        Response, RetryResult, SdkBody, fs, make_err,
316
    };
317
318
    #[inline]
319
0
    pub(crate) async fn execute_request(
320
0
        client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>,
321
0
        components: &RequestComponents,
322
0
    ) -> RetryResult<Response<SdkBody>> {
323
0
        let _permit = match fs::get_permit().await {
324
0
            Ok(permit) => permit,
325
0
            Err(e) => {
326
0
                return RetryResult::Retry(make_err!(
327
0
                    Code::Unavailable,
328
0
                    "Failed to acquire permit: {e}"
329
0
                ));
330
            }
331
        };
332
333
0
        let request = match RequestBuilder::new(components).build() {
334
0
            Ok(req) => req,
335
0
            Err(e) => {
336
0
                return RetryResult::Err(make_err!(
337
0
                    Code::Internal,
338
0
                    "Failed to create request: {e}",
339
0
                ));
340
            }
341
        };
342
343
0
        match client.request(request).await {
344
0
            Ok(resp) => RetryResult::Ok(resp.into_smithy_response()),
345
0
            Err(e) => RetryResult::Retry(make_err!(
346
0
                Code::Unavailable,
347
0
                "Failed request in S3Store: {e}"
348
0
            )),
349
        }
350
0
    }
351
352
    #[inline]
353
0
    pub(crate) fn create_retry_stream(
354
0
        client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>,
355
0
        components: RequestComponents,
356
0
    ) -> impl futures::Stream<Item = RetryResult<Response<SdkBody>>> {
357
0
        futures::stream::unfold(components, move |components| {
358
0
            let client_clone = client.clone();
359
0
            async move {
360
0
                let result = execute_request(client_clone, &components).await;
361
362
0
                Some((result, components))
363
0
            }
364
0
        })
365
0
    }
366
}
367
368
impl SmithyHttpConnector for TlsClient {
369
0
    fn call(&self, req: HttpRequest) -> HttpConnectorFuture {
370
        use conversions::RequestExt;
371
372
0
        let client = self.client.clone();
373
0
        let retrier = self.retrier.clone();
374
0
375
0
        HttpConnectorFuture::new(Box::pin(async move {
376
0
            let components = req.into_components().await?;
377
378
0
            let retry_stream = execution::create_retry_stream(client, components);
379
0
380
0
            match retrier.retry(retry_stream).await {
381
0
                Ok(response) => Ok(response),
382
0
                Err(e) => Err(ConnectorError::other(
383
0
                    format!("Connection failed after retries: {e}").into(),
384
0
                    Some(TransientError),
385
0
                )),
386
            }
387
0
        }))
388
0
    }
389
}
390
391
#[derive(Debug)]
392
pub struct BodyWrapper {
393
    reader: DropCloserReadHalf,
394
    size: u64,
395
}
396
397
impl http_body::Body for BodyWrapper {
398
    type Data = Bytes;
399
    type Error = std::io::Error;
400
401
76
    fn poll_frame(
402
76
        self: Pin<&mut Self>,
403
76
        cx: &mut Context<'_>,
404
76
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
405
76
        let reader = Pin::new(&mut Pin::get_mut(self).reader);
406
76
        reader
407
76
            .poll_next(cx)
408
76
            .map(|maybe_bytes_res| 
maybe_bytes_res.map51
(|res|
res.map(Frame::data)50
))
409
76
    }
410
411
2
    fn size_hint(&self) -> SizeHint {
412
2
        SizeHint::with_exact(self.size)
413
2
    }
414
}
415
416
#[derive(Debug, MetricsComponent)]
417
pub struct S3Store<NowFn> {
418
    s3_client: Arc<Client>,
419
    now_fn: NowFn,
420
    #[metric(help = "The bucket name for the S3 store")]
421
    bucket: String,
422
    #[metric(help = "The key prefix for the S3 store")]
423
    key_prefix: String,
424
    retrier: Retrier,
425
    #[metric(help = "The number of seconds to consider an object expired")]
426
    consider_expired_after_s: i64,
427
    #[metric(help = "The number of bytes to buffer for retrying requests")]
428
    max_retry_buffer_per_request: usize,
429
    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
430
    multipart_max_concurrent_uploads: usize,
431
}
432
433
impl<I, NowFn> S3Store<NowFn>
434
where
435
    I: InstantWrapper,
436
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
437
{
438
0
    pub async fn new(spec: &S3Spec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
439
0
        let jitter_amt = spec.retry.jitter;
440
0
        let jitter_fn = Arc::new(move |delay: Duration| {
441
0
            if jitter_amt == 0. {
  Branch (441:16): [True: 0, False: 0]
  Branch (441:16): [Folded - Ignored]
442
0
                return delay;
443
0
            }
444
0
            let min = 1. - (jitter_amt / 2.);
445
0
            let max = 1. + (jitter_amt / 2.);
446
0
            delay.mul_f32(rand::rng().random_range(min..max))
447
0
        });
448
0
        let s3_client = {
449
0
            let http_client = TlsClient::new(&spec.clone(), jitter_fn.clone());
450
451
0
            let credential_provider = credentials::DefaultCredentialsChain::builder()
452
0
                .configure(
453
0
                    ProviderConfig::without_region()
454
0
                        .with_region(Some(Region::new(Cow::Owned(spec.region.clone()))))
455
0
                        .with_http_client(http_client.clone()),
456
0
                )
457
0
                .build()
458
0
                .await;
459
460
0
            let config = aws_config::defaults(BehaviorVersion::v2025_01_17())
461
0
                .credentials_provider(credential_provider)
462
0
                .app_name(AppName::new("nativelink").expect("valid app name"))
463
0
                .timeout_config(
464
0
                    aws_config::timeout::TimeoutConfig::builder()
465
0
                        .connect_timeout(Duration::from_secs(15))
466
0
                        .build(),
467
0
                )
468
0
                .region(Region::new(Cow::Owned(spec.region.clone())))
469
0
                .http_client(http_client)
470
0
                .load()
471
0
                .await;
472
473
0
            Client::new(&config)
474
0
        };
475
0
        Self::new_with_client_and_jitter(spec, s3_client, jitter_fn, now_fn)
476
0
    }
477
478
12
    pub fn new_with_client_and_jitter(
479
12
        spec: &S3Spec,
480
12
        s3_client: Client,
481
12
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
482
12
        now_fn: NowFn,
483
12
    ) -> Result<Arc<Self>, Error> {
484
12
        Ok(Arc::new(Self {
485
12
            s3_client: Arc::new(s3_client),
486
12
            now_fn,
487
12
            bucket: spec.bucket.to_string(),
488
12
            key_prefix: spec.key_prefix.as_ref().unwrap_or(&String::new()).clone(),
489
12
            retrier: Retrier::new(
490
12
                Arc::new(|duration| 
Box::pin(sleep(duration))6
),
491
12
                jitter_fn,
492
12
                spec.retry.clone(),
493
12
            ),
494
12
            consider_expired_after_s: i64::from(spec.consider_expired_after_s),
495
12
            max_retry_buffer_per_request: spec
496
12
                .max_retry_buffer_per_request
497
12
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
498
12
            multipart_max_concurrent_uploads: spec
499
12
                .multipart_max_concurrent_uploads
500
12
                .map_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS, |v| 
v0
),
501
        }))
502
12
    }
503
504
14
    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
505
14
        format!("{}{}", self.key_prefix, key.as_str(),)
506
14
    }
507
508
5
    async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
509
5
        self.retrier
510
8
            .retry(unfold((), move |state| async move {
511
8
                let result = self
512
8
                    .s3_client
513
8
                    .head_object()
514
8
                    .bucket(&self.bucket)
515
8
                    .key(self.make_s3_path(&digest.borrow()))
516
8
                    .send()
517
8
                    .await;
518
519
8
                match result {
520
4
                    Ok(head_object_output) => {
521
4
                        if self.consider_expired_after_s != 0 {
  Branch (521:28): [True: 0, False: 0]
  Branch (521:28): [Folded - Ignored]
  Branch (521:28): [True: 2, False: 2]
522
2
                            if let Some(last_modified) = head_object_output.last_modified {
  Branch (522:36): [True: 0, False: 0]
  Branch (522:36): [Folded - Ignored]
  Branch (522:36): [True: 2, False: 0]
523
2
                                let now_s = (self.now_fn)().unix_timestamp() as i64;
524
2
                                if last_modified.secs() + self.consider_expired_after_s <= now_s {
  Branch (524:36): [True: 0, False: 0]
  Branch (524:36): [Folded - Ignored]
  Branch (524:36): [True: 1, False: 1]
525
1
                                    return Some((RetryResult::Ok(None), state));
526
1
                                }
527
0
                            }
528
2
                        }
529
3
                        let Some(length) = head_object_output.content_length else {
  Branch (529:29): [True: 0, False: 0]
  Branch (529:29): [Folded - Ignored]
  Branch (529:29): [True: 3, False: 0]
530
0
                            return Some((RetryResult::Ok(None), state));
531
                        };
532
3
                        if length >= 0 {
  Branch (532:28): [True: 0, False: 0]
  Branch (532:28): [Folded - Ignored]
  Branch (532:28): [True: 3, False: 0]
533
3
                            return Some((RetryResult::Ok(Some(length as u64)), state));
534
0
                        }
535
0
                        Some((
536
0
                            RetryResult::Err(make_err!(
537
0
                                Code::InvalidArgument,
538
0
                                "Negative content length in S3: {length:?}",
539
0
                            )),
540
0
                            state,
541
0
                        ))
542
                    }
543
4
                    Err(sdk_error) => match sdk_error.into_service_error() {
544
1
                        HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
545
3
                        other => Some((
546
3
                            RetryResult::Retry(make_err!(
547
3
                                Code::Unavailable,
548
3
                                "Unhandled HeadObjectError in S3: {other:?}"
549
3
                            )),
550
3
                            state,
551
3
                        )),
552
                    },
553
                }
554
16
            }))
555
5
            .await
556
5
    }
557
}
558
559
#[async_trait]
560
impl<I, NowFn> StoreDriver for S3Store<NowFn>
561
where
562
    I: InstantWrapper,
563
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
564
{
565
    async fn has_with_results(
566
        self: Pin<&Self>,
567
        keys: &[StoreKey<'_>],
568
        results: &mut [Option<u64>],
569
12
    ) -> Result<(), Error> {
570
6
        keys.iter()
571
6
            .zip(results.iter_mut())
572
6
            .map(|(key, result)| async move {
573
6
                // We need to do a special pass to ensure our zero key exist.
574
6
                if is_zero_digest(key.borrow()) {
  Branch (574:20): [True: 0, False: 0]
  Branch (574:20): [Folded - Ignored]
  Branch (574:20): [True: 1, False: 5]
575
1
                    *result = Some(0);
576
1
                    return Ok::<_, Error>(());
577
5
                }
578
5
                *result = self.has(key).await
?0
;
579
5
                Ok::<_, Error>(())
580
12
            })
581
6
            .collect::<FuturesUnordered<_>>()
582
6
            .try_collect()
583
6
            .await
584
12
    }
585
586
    async fn update(
587
        self: Pin<&Self>,
588
        digest: StoreKey<'_>,
589
        mut reader: DropCloserReadHalf,
590
        upload_size: UploadSizeInfo,
591
4
    ) -> Result<(), Error> {
592
2
        let s3_path = &self.make_s3_path(&digest.borrow());
593
594
2
        let max_size = match upload_size {
595
2
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(
sz0
) => sz,
596
2
        };
597
2
598
2
        // Note(allada) It might be more optimal to use a different
599
2
        // heuristic here, but for simplicity we use a hard coded value.
600
2
        // Anything going down this if-statement will have the advantage of only
601
2
        // 1 network request for the upload instead of minimum of 3 required for
602
2
        // multipart upload requests.
603
2
        //
604
2
        // Note(allada) If the upload size is not known, we go down the multipart upload path.
605
2
        // This is not very efficient, but it greatly reduces the complexity of the code.
606
2
        if max_size < MIN_MULTIPART_SIZE && 
matches!0
(
upload_size1
, UploadSizeInfo::ExactSize(_)) {
  Branch (606:12): [True: 0, False: 0]
  Branch (606:12): [Folded - Ignored]
  Branch (606:12): [True: 1, False: 1]
607
1
            let UploadSizeInfo::ExactSize(sz) = upload_size else {
  Branch (607:17): [True: 0, False: 0]
  Branch (607:17): [Folded - Ignored]
  Branch (607:17): [True: 1, False: 0]
608
0
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
609
            };
610
1
            reader.set_max_recent_data_size(
611
1
                u64::try_from(self.max_retry_buffer_per_request)
612
1
                    .err_tip(|| 
"Could not convert max_retry_buffer_per_request to u64"0
)
?0
,
613
            );
614
1
            return self
615
1
                .retrier
616
1
                .retry(unfold(reader, move |mut reader| async move {
617
1
                    // We need to make a new pair here because the aws sdk does not give us
618
1
                    // back the body after we send it in order to retry.
619
1
                    let (mut tx, rx) = make_buf_channel_pair();
620
621
                    // Upload the data to the S3 backend.
622
1
                    let result = {
623
1
                        let reader_ref = &mut reader;
624
1
                        let (upload_res, bind_res) = tokio::join!(
625
1
                            self.s3_client
626
1
                                .put_object()
627
1
                                .bucket(&self.bucket)
628
1
                                .key(s3_path.clone())
629
1
                                .content_length(sz as i64)
630
1
                                .body(ByteStream::from_body_1_x(BodyWrapper {
631
1
                                    reader: rx,
632
1
                                    size: sz,
633
1
                                }))
634
1
                                .send()
635
1
                                .map_ok_or_else(|e| 
Err(make_err!(Code::Aborted, "{e:?}"))0
, |_| Ok(())),
636
                            // Stream all data from the reader channel to the writer channel.
637
1
                            tx.bind_buffered(reader_ref)
638
                        );
639
1
                        upload_res
640
1
                            .merge(bind_res)
641
1
                            .err_tip(|| 
"Failed to upload file to s3 in single chunk"0
)
642
                    };
643
644
                    // If we failed to upload the file, check to see if we can retry.
645
1
                    let retry_result = result.map_or_else(|mut err| {
646
0
                        // Ensure our code is Code::Aborted, so the client can retry if possible.
647
0
                        err.code = Code::Aborted;
648
0
                        let bytes_received = reader.get_bytes_received();
649
0
                        if let Err(try_reset_err) = reader.try_reset_stream() {
  Branch (649:32): [True: 0, False: 0]
  Branch (649:32): [Folded - Ignored]
  Branch (649:32): [True: 0, False: 0]
650
0
                            event!(
651
0
                                Level::ERROR,
652
                                ?bytes_received,
653
                                err = ?try_reset_err,
654
0
                                "Unable to reset stream after failed upload in S3Store::update"
655
                            );
656
0
                            return RetryResult::Err(err
657
0
                                .merge(try_reset_err)
658
0
                                .append(format!("Failed to retry upload with {bytes_received} bytes received in S3Store::update")));
659
0
                        }
660
0
                        let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in S3Store::update"));
661
0
                        event!(
662
0
                            Level::INFO,
663
                            ?err,
664
                            ?bytes_received,
665
0
                            "Retryable S3 error"
666
                        );
667
0
                        RetryResult::Retry(err)
668
1
                    
}0
, |()| RetryResult::Ok(()));
669
1
                    Some((retry_result, reader))
670
1
                }))
671
1
                .await;
672
1
        }
673
674
1
        let upload_id = &self
675
1
            .retrier
676
1
            .retry(unfold((), move |()| async move {
677
1
                let retry_result = self
678
1
                    .s3_client
679
1
                    .create_multipart_upload()
680
1
                    .bucket(&self.bucket)
681
1
                    .key(s3_path)
682
1
                    .send()
683
1
                    .await
684
1
                    .map_or_else(
685
0
                        |e| {
686
0
                            RetryResult::Retry(make_err!(
687
0
                                Code::Aborted,
688
0
                                "Failed to create multipart upload to s3: {e:?}"
689
0
                            ))
690
0
                        },
691
1
                        |CreateMultipartUploadOutput { upload_id, .. }| {
692
1
                            upload_id.map_or_else(
693
0
                                || {
694
0
                                    RetryResult::Err(make_err!(
695
0
                                        Code::Internal,
696
0
                                        "Expected upload_id to be set by s3 response"
697
0
                                    ))
698
0
                                },
699
                                RetryResult::Ok,
700
                            )
701
1
                        },
702
                    );
703
1
                Some((retry_result, ()))
704
1
            }))
705
1
            .await
?0
;
706
707
        // S3 requires us to upload in parts if the size is greater than 5GB. The part size must be at least
708
        // 5mb (except last part) and can have up to 10,000 parts.
709
1
        let bytes_per_upload_part =
710
1
            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
711
1
712
1
        let upload_parts = move || async move {
713
1
            // This will ensure we only have `multipart_max_concurrent_uploads` * `bytes_per_upload_part`
714
1
            // bytes in memory at any given time waiting to be uploaded.
715
1
            let (tx, mut rx) = mpsc::channel(self.multipart_max_concurrent_uploads);
716
1
717
1
            let read_stream_fut = async move {
718
1
                let retrier = &Pin::get_ref(self).retrier;
719
                // Note: Our break condition is when we reach EOF.
720
4
                for part_number in 1..i32::MAX {
721
4
                    let write_buf = reader
722
4
                        .consume(Some(usize::try_from(bytes_per_upload_part).err_tip(
723
0
                            || "Could not convert bytes_per_upload_part to usize",
724
0
                        )?))
725
4
                        .await
726
4
                        .err_tip(|| 
"Failed to read chunk in s3_store"0
)
?0
;
727
4
                    if write_buf.is_empty() {
  Branch (727:24): [True: 0, False: 0]
  Branch (727:24): [Folded - Ignored]
  Branch (727:24): [True: 1, False: 3]
728
1
                        break; // Reached EOF.
729
3
                    }
730
3
731
3
                    tx.send(retrier.retry(unfold(write_buf, move |write_buf| {
732
3
                        async move {
733
3
                            let retry_result = self
734
3
                                .s3_client
735
3
                                .upload_part()
736
3
                                .bucket(&self.bucket)
737
3
                                .key(s3_path)
738
3
                                .upload_id(upload_id)
739
3
                                .body(ByteStream::new(SdkBody::from(write_buf.clone())))
740
3
                                .part_number(part_number)
741
3
                                .send()
742
3
                                .await
743
3
                                .map_or_else(
744
0
                                    |e| {
745
0
                                        RetryResult::Retry(make_err!(
746
0
                                            Code::Aborted,
747
0
                                            "Failed to upload part {part_number} in S3 store: {e:?}"
748
0
                                        ))
749
0
                                    },
750
3
                                    |mut response| {
751
3
                                        RetryResult::Ok(
752
3
                                            CompletedPartBuilder::default()
753
3
                                                // Only set an entity tag if it exists. This saves
754
3
                                                // 13 bytes per part on the final request if it can
755
3
                                                // omit the `<ETAG><ETAG/>` string.
756
3
                                                .set_e_tag(response.e_tag.take())
757
3
                                                .part_number(part_number)
758
3
                                                .build(),
759
3
                                        )
760
3
                                    },
761
                                );
762
3
                            Some((retry_result, write_buf))
763
3
                        }
764
3
                    })))
765
3
                    .await
766
3
                    .map_err(|_| {
767
0
                        make_err!(Code::Internal, "Failed to send part to channel in s3_store")
768
0
                    })?;
769
                }
770
1
                Result::<_, Error>::Ok(())
771
1
            }
772
1
            .fuse();
773
1
774
1
            let mut upload_futures = FuturesUnordered::new();
775
776
1
            let mut completed_parts = Vec::with_capacity(
777
1
                usize::try_from(cmp::min(
778
1
                    MAX_UPLOAD_PARTS as u64,
779
1
                    (max_size / bytes_per_upload_part) + 1,
780
1
                ))
781
1
                .err_tip(|| 
"Could not convert u64 to usize"0
)
?0
,
782
            );
783
1
            tokio::pin!(read_stream_fut);
784
            loop {
785
8
                if read_stream_fut.is_terminated() && 
rx.is_empty7
() &&
upload_futures.is_empty2
() {
  Branch (785:20): [True: 0, False: 0]
  Branch (785:55): [True: 0, False: 0]
  Branch (785:72): [True: 0, False: 0]
  Branch (785:20): [Folded - Ignored]
  Branch (785:55): [Folded - Ignored]
  Branch (785:72): [Folded - Ignored]
  Branch (785:20): [True: 7, False: 1]
  Branch (785:55): [True: 2, False: 5]
  Branch (785:72): [True: 1, False: 1]
786
1
                    break; // No more data to process.
787
7
                }
788
7
                tokio::select! {
789
7
                    
result1
= &mut read_stream_fut =>
result1
?0
, // Return error or wait for other futures.
790
7
                    Some(
upload_result3
) = upload_futures.next() =>
completed_parts3
.
push3
(
upload_result3
?0
),
791
7
                    Some(
fut3
) = rx.recv() =>
upload_futures.push(fut3
),
792
                }
793
            }
794
795
            // Even though the spec does not require parts to be sorted by number, we do it just in case
796
            // there's an S3 implementation that requires it.
797
4
            
completed_parts.sort_unstable_by_key1
(|part| part.part_number);
798
799
1
            self.retrier
800
1
                .retry(unfold(completed_parts, move |completed_parts| async move {
801
1
                    Some((
802
1
                        self.s3_client
803
1
                            .complete_multipart_upload()
804
1
                            .bucket(&self.bucket)
805
1
                            .key(s3_path)
806
1
                            .multipart_upload(
807
1
                                CompletedMultipartUploadBuilder::default()
808
1
                                    .set_parts(Some(completed_parts.clone()))
809
1
                                    .build(),
810
1
                            )
811
1
                            .upload_id(upload_id)
812
1
                            .send()
813
1
                            .await
814
1
                            .map_or_else(
815
0
                                |e| {
816
0
                                    RetryResult::Retry(make_err!(
817
0
                                        Code::Aborted,
818
0
                                        "Failed to complete multipart upload in S3 store: {e:?}"
819
0
                                    ))
820
0
                                },
821
1
                                |_| RetryResult::Ok(()),
822
                            ),
823
1
                        completed_parts,
824
1
                    ))
825
1
                }))
826
1
                .await
827
2
        };
828
        // Upload our parts and complete the multipart upload.
829
        // If we fail attempt to abort the multipart upload (cleanup).
830
1
        upload_parts()
831
1
            .or_else(move |e| async move {
832
0
                Result::<(), _>::Err(e).merge(
833
                    // Note: We don't retry here because this is just a best attempt.
834
0
                    self.s3_client
835
0
                        .abort_multipart_upload()
836
0
                        .bucket(&self.bucket)
837
0
                        .key(s3_path)
838
0
                        .upload_id(upload_id)
839
0
                        .send()
840
0
                        .await
841
0
                        .map_or_else(
842
0
                            |e| {
843
0
                                let err = make_err!(
844
0
                                    Code::Aborted,
845
0
                                    "Failed to abort multipart upload in S3 store : {e:?}"
846
0
                                );
847
0
                                event!(Level::INFO, ?err, "Multipart upload error");
848
0
                                Err(err)
849
0
                            },
850
0
                            |_| Ok(()),
851
                        ),
852
                )
853
0
            })
854
1
            .await
855
4
    }
856
857
    async fn get_part(
858
        self: Pin<&Self>,
859
        key: StoreKey<'_>,
860
        writer: &mut DropCloserWriteHalf,
861
        offset: u64,
862
        length: Option<u64>,
863
10
    ) -> Result<(), Error> {
864
5
        if is_zero_digest(key.borrow()) {
  Branch (864:12): [True: 0, False: 0]
  Branch (864:12): [Folded - Ignored]
  Branch (864:12): [True: 1, False: 4]
865
1
            writer
866
1
                .send_eof()
867
1
                .err_tip(|| 
"Failed to send zero EOF in filesystem store get_part"0
)
?0
;
868
1
            return Ok(());
869
4
        }
870
4
871
4
        let s3_path = &self.make_s3_path(&key);
872
4
        let end_read_byte = length
873
4
            .map_or(Some(None), |length| 
Some(offset.checked_add(length))2
)
874
4
            .err_tip(|| 
"Integer overflow protection triggered"0
)
?0
;
875
876
4
        self.retrier
877
7
            .retry(unfold(writer, move |writer| async move {
878
7
                let result = self
879
7
                    .s3_client
880
7
                    .get_object()
881
7
                    .bucket(&self.bucket)
882
7
                    .key(s3_path)
883
7
                    .range(format!(
884
7
                        "bytes={}-{}",
885
7
                        offset + writer.get_bytes_written(),
886
7
                        end_read_byte.map_or_else(String::new, |v| 
v.to_string()2
)
887
                    ))
888
7
                    .send()
889
7
                    .await;
890
891
7
                let 
mut s3_in_stream4
= match result {
892
4
                    Ok(head_object_output) => head_object_output.body,
893
3
                    Err(sdk_error) => match sdk_error.into_service_error() {
894
0
                        GetObjectError::NoSuchKey(e) => {
895
0
                            return Some((
896
0
                                RetryResult::Err(make_err!(
897
0
                                    Code::NotFound,
898
0
                                    "No such key in S3: {e}"
899
0
                                )),
900
0
                                writer,
901
0
                            ));
902
                        }
903
3
                        other => {
904
3
                            return Some((
905
3
                                RetryResult::Retry(make_err!(
906
3
                                    Code::Unavailable,
907
3
                                    "Unhandled GetObjectError in S3: {other:?}",
908
3
                                )),
909
3
                                writer,
910
3
                            ));
911
                        }
912
                    },
913
                };
914
915
                // Copy data from s3 input stream to the writer stream.
916
8
                while let Some(
maybe_bytes4
) = s3_in_stream.next().await {
  Branch (916:27): [True: 0, False: 0]
  Branch (916:27): [Folded - Ignored]
  Branch (916:27): [True: 4, False: 4]
917
4
                    match maybe_bytes {
918
4
                        Ok(bytes) => {
919
4
                            if bytes.is_empty() {
  Branch (919:32): [True: 0, False: 0]
  Branch (919:32): [Folded - Ignored]
  Branch (919:32): [True: 1, False: 3]
920
                                // Ignore possible EOF. Different implimentations of S3 may or may not
921
                                // send EOF this way.
922
1
                                continue;
923
3
                            }
924
3
                            if let Err(
e0
) = writer.send(bytes).await {
  Branch (924:36): [True: 0, False: 0]
  Branch (924:36): [Folded - Ignored]
  Branch (924:36): [True: 0, False: 3]
925
0
                                return Some((
926
0
                                    RetryResult::Err(make_err!(
927
0
                                        Code::Aborted,
928
0
                                        "Error sending bytes to consumer in S3: {e}"
929
0
                                    )),
930
0
                                    writer,
931
0
                                ));
932
3
                            }
933
                        }
934
0
                        Err(e) => {
935
0
                            return Some((
936
0
                                RetryResult::Retry(make_err!(
937
0
                                    Code::Aborted,
938
0
                                    "Bad bytestream element in S3: {e}"
939
0
                                )),
940
0
                                writer,
941
0
                            ));
942
                        }
943
                    }
944
                }
945
4
                if let Err(
e0
) = writer.send_eof() {
  Branch (945:24): [True: 0, False: 0]
  Branch (945:24): [Folded - Ignored]
  Branch (945:24): [True: 0, False: 4]
946
0
                    return Some((
947
0
                        RetryResult::Err(make_err!(
948
0
                            Code::Aborted,
949
0
                            "Failed to send EOF to consumer in S3: {e}"
950
0
                        )),
951
0
                        writer,
952
0
                    ));
953
4
                }
954
4
                Some((RetryResult::Ok(()), writer))
955
14
            }))
956
4
            .await
957
10
    }
958
959
0
    /// Returns the store that actually holds the data for `_digest`.
    /// `S3Store` does not wrap another store, so it always returns itself.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
        self
    }
962
963
0
    /// Exposes this store as `&dyn Any` so callers can downcast a
    /// `dyn StoreDriver` back to the concrete `S3Store` type.
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
        self
    }
966
967
0
    /// Owned counterpart of `as_any`: converts an `Arc<Self>` into
    /// `Arc<dyn Any>` for downcasting without borrowing.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
        self
    }
970
971
0
    /// Registers this store as a health indicator with the given registry
    /// so it participates in service health checks.
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }
974
}
975
976
#[async_trait]
impl<I, NowFn> HealthStatusIndicator for S3Store<NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
    /// Name reported for this component in health-check output.
    fn get_name(&self) -> &'static str {
        "S3Store"
    }

    /// Delegates the probe to `StoreDriver::check_health` on a pinned
    /// reference to this store.
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
        StoreDriver::check_health(Pin::new(self), namespace).await
    }
}