Coverage Report

Created: 2025-10-17 16:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/s3_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::future::Future;
17
use core::pin::Pin;
18
use core::task::{Context, Poll};
19
use core::time::Duration;
20
use std::borrow::Cow;
21
use std::sync::Arc;
22
23
use async_trait::async_trait;
24
use aws_config::default_provider::credentials;
25
use aws_config::provider_config::ProviderConfig;
26
use aws_config::retry::ErrorKind::TransientError;
27
use aws_config::{AppName, BehaviorVersion};
28
use aws_sdk_s3::Client;
29
use aws_sdk_s3::config::Region;
30
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
31
use aws_sdk_s3::operation::get_object::GetObjectError;
32
use aws_sdk_s3::operation::head_object::HeadObjectError;
33
use aws_sdk_s3::primitives::ByteStream; // SdkBody
34
use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
35
use aws_smithy_runtime_api::client::http::{
36
    HttpClient as SmithyHttpClient, HttpConnector as SmithyHttpConnector, HttpConnectorFuture,
37
    HttpConnectorSettings, SharedHttpConnector,
38
};
39
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
40
use aws_smithy_runtime_api::client::result::ConnectorError;
41
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
42
use aws_smithy_runtime_api::http::Response;
43
use aws_smithy_types::body::SdkBody;
44
use bytes::{Bytes, BytesMut};
45
use futures::future::FusedFuture;
46
use futures::stream::{FuturesUnordered, unfold};
47
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
48
use http_body::{Frame, SizeHint};
49
use http_body_util::BodyExt;
50
use hyper::{Method, Request};
51
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
52
use hyper_util::client::legacy::Client as LegacyClient;
53
use hyper_util::client::legacy::connect::HttpConnector as LegacyHttpConnector;
54
use hyper_util::rt::TokioExecutor;
55
use nativelink_config::stores::ExperimentalAwsSpec;
56
// Note: S3 store should be very careful about the error codes it returns
57
// when in a retryable wrapper. Always prefer Code::Aborted or another
58
// retryable code over Code::InvalidArgument or make_input_err!().
59
// ie: Don't import make_input_err!() to help prevent this.
60
use nativelink_error::{Code, Error, ResultExt, make_err};
61
use nativelink_metric::MetricsComponent;
62
use nativelink_util::buf_channel::{
63
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
64
};
65
use nativelink_util::fs;
66
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
67
use nativelink_util::instant_wrapper::InstantWrapper;
68
use nativelink_util::retry::{Retrier, RetryResult};
69
use nativelink_util::store_trait::{RemoveItemCallback, StoreDriver, StoreKey, UploadSizeInfo};
70
use parking_lot::Mutex;
71
use rand::Rng;
72
use tokio::sync::mpsc;
73
use tokio::time::sleep;
74
use tracing::{error, info};
75
76
use crate::cas_utils::is_zero_digest;
77
78
// S3 parts cannot be smaller than this number. See:
79
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
80
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
81
82
// S3 parts cannot be larger than this number. See:
83
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
84
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.
85
86
// S3 parts cannot be more than this number. See:
87
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
88
const MAX_UPLOAD_PARTS: usize = 10_000;
89
90
// Default max buffer size for retrying upload requests.
91
// Note: If you change this, adjust the docs in the config.
92
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 * 1024 * 1024; // 5MB.
93
94
// Default limit for concurrent part uploads per multipart upload.
95
// Note: If you change this, adjust the docs in the config.
96
const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;
97
98
/// A TLS-capable HTTP client handed to the AWS SDK as a custom connector.
///
/// Wraps a hyper legacy client together with a [`Retrier`] so that every
/// request issued through the SDK is retried at the transport level.
#[derive(Clone)]
pub struct TlsClient {
    /// Underlying hyper client with an HTTPS (optionally HTTP) connector.
    client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>,
    /// Retry policy driver applied to each outgoing request.
    retrier: Retrier,
}
103
104
impl TlsClient {
105
    #[must_use]
106
0
    pub fn new(
107
0
        spec: &ExperimentalAwsSpec,
108
0
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
109
0
    ) -> Self {
110
0
        let connector_with_roots = HttpsConnectorBuilder::new().with_platform_verifier();
111
112
0
        let connector_with_schemes = if spec.common.insecure_allow_http {
  Branch (112:41): [True: 0, False: 0]
  Branch (112:41): [Folded - Ignored]
113
0
            connector_with_roots.https_or_http()
114
        } else {
115
0
            connector_with_roots.https_only()
116
        };
117
118
0
        let connector = if spec.common.disable_http2 {
  Branch (118:28): [True: 0, False: 0]
  Branch (118:28): [Folded - Ignored]
119
0
            connector_with_schemes.enable_http1().build()
120
        } else {
121
0
            connector_with_schemes.enable_http1().enable_http2().build()
122
        };
123
124
0
        let client = LegacyClient::builder(TokioExecutor::new()).build(connector);
125
126
        Self {
127
0
            client,
128
0
            retrier: Retrier::new(
129
0
                Arc::new(|duration| Box::pin(sleep(duration))),
130
0
                jitter_fn,
131
0
                spec.common.retry.clone(),
132
            ),
133
        }
134
0
    }
135
}
136
137
impl core::fmt::Debug for TlsClient {
    /// Opaque debug representation (`TlsClient { .. }`); the inner hyper
    /// client and retrier are intentionally not printed.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
        f.debug_struct("TlsClient").finish_non_exhaustive()
    }
}
142
143
impl SmithyHttpClient for TlsClient {
    /// Hand the SDK a connector backed by this client.
    ///
    /// The settings and runtime components are ignored: connection pooling
    /// and TLS are already handled by the wrapped hyper client.
    fn http_connector(
        &self,
        _settings: &HttpConnectorSettings,
        _components: &RuntimeComponents,
    ) -> SharedHttpConnector {
        SharedHttpConnector::new(self.clone())
    }
}
152
153
/// Retained request-body state so a request can be rebuilt for retries.
enum BufferedBodyState {
    /// The SDK body supports `try_clone`; reuse it directly per attempt.
    Cloneable(SdkBody),
    /// The body was not cloneable and had to be buffered fully into memory.
    Buffered(Bytes),
    /// No body needs to be preserved (e.g. GET/HEAD requests).
    Empty,
}
158
159
mod body_processing {
160
    use super::{BodyExt, BufferedBodyState, BytesMut, ConnectorError, SdkBody, TransientError};
161
162
    /// Buffer a request body fully into memory.
163
    ///
164
    /// TODO(palfrey): This could lead to OOMs in extremely constrained
165
    ///                    environments. Probably better to implement something
166
    ///                    like a rewindable stream logic.
167
    #[inline]
168
0
    pub(crate) async fn buffer_body(body: SdkBody) -> Result<BufferedBodyState, ConnectorError> {
169
0
        let mut bytes = BytesMut::new();
170
0
        let mut body_stream = body;
171
0
        while let Some(frame) = body_stream.frame().await {
  Branch (171:19): [True: 0, False: 0]
  Branch (171:19): [Folded - Ignored]
172
0
            match frame {
173
0
                Ok(frame) => {
174
0
                    if let Some(data) = frame.data_ref() {
  Branch (174:28): [True: 0, False: 0]
  Branch (174:28): [Folded - Ignored]
175
0
                        bytes.extend_from_slice(data);
176
0
                    }
177
                }
178
0
                Err(e) => {
179
0
                    return Err(ConnectorError::other(
180
0
                        format!("Failed to read request body: {e}").into(),
181
0
                        Some(TransientError),
182
0
                    ));
183
                }
184
            }
185
        }
186
187
0
        Ok(BufferedBodyState::Buffered(bytes.freeze()))
188
0
    }
189
}
190
191
/// Decomposed parts of an HTTP request, captured once so the request can be
/// rebuilt identically for every retry attempt.
struct RequestComponents {
    /// HTTP verb of the original request.
    method: Method,
    /// Target URI.
    uri: hyper::Uri,
    /// HTTP version (of the http crate's request, not the wire protocol).
    version: hyper::Version,
    /// All request headers, replayed verbatim on each attempt.
    headers: hyper::HeaderMap,
    /// Preserved body (cloneable, fully buffered, or absent).
    body_data: BufferedBodyState,
}
198
199
mod conversions {
200
    use super::{
201
        BufferedBodyState, ConnectorError, Future, HttpRequest, Method, RequestComponents,
202
        Response, SdkBody, TransientError, body_processing,
203
    };
204
205
    pub(crate) trait RequestExt {
206
        fn into_components(self)
207
        -> impl Future<Output = Result<RequestComponents, ConnectorError>>;
208
    }
209
210
    impl RequestExt for HttpRequest {
211
0
        async fn into_components(self) -> Result<RequestComponents, ConnectorError> {
212
            // Note: This does *not* refer the the HTTP protocol, but to the
213
            //       version of the http crate.
214
0
            let hyper_req = self.try_into_http1x().map_err(|e| {
215
0
                ConnectorError::other(
216
0
                    format!("Failed to convert to HTTP request: {e}").into(),
217
0
                    Some(TransientError),
218
                )
219
0
            })?;
220
221
0
            let method = hyper_req.method().clone();
222
0
            let uri = hyper_req.uri().clone();
223
0
            let version = hyper_req.version();
224
0
            let headers = hyper_req.headers().clone();
225
226
0
            let body = hyper_req.into_body();
227
228
            // Only buffer bodies for methods likely to have payloads.
229
0
            let needs_buffering = matches!(method, Method::POST | Method::PUT);
230
231
            // Preserve the body in case we need to retry.
232
0
            let body_data = if needs_buffering {
  Branch (232:32): [True: 0, False: 0]
  Branch (232:32): [Folded - Ignored]
233
0
                if let Some(cloneable_body) = body.try_clone() {
  Branch (233:24): [True: 0, False: 0]
  Branch (233:24): [Folded - Ignored]
234
0
                    BufferedBodyState::Cloneable(cloneable_body)
235
                } else {
236
0
                    body_processing::buffer_body(body).await?
237
                }
238
            } else {
239
0
                BufferedBodyState::Empty
240
            };
241
242
0
            Ok(RequestComponents {
243
0
                method,
244
0
                uri,
245
0
                version,
246
0
                headers,
247
0
                body_data,
248
0
            })
249
0
        }
250
    }
251
252
    pub(crate) trait ResponseExt {
253
        fn into_smithy_response(self) -> Response<SdkBody>;
254
    }
255
256
    impl ResponseExt for hyper::Response<hyper::body::Incoming> {
257
0
        fn into_smithy_response(self) -> Response<SdkBody> {
258
0
            let (parts, body) = self.into_parts();
259
0
            let sdk_body = SdkBody::from_body_1_x(body);
260
0
            let mut smithy_resp = Response::new(parts.status.into(), sdk_body);
261
0
            let header_pairs: Vec<(String, String)> = parts
262
0
                .headers
263
0
                .iter()
264
0
                .filter_map(|(name, value)| {
265
0
                    value
266
0
                        .to_str()
267
0
                        .ok()
268
0
                        .map(|value_str| (name.as_str().to_owned(), value_str.to_owned()))
269
0
                })
270
0
                .collect();
271
272
0
            for (name, value) in header_pairs {
273
0
                smithy_resp.headers_mut().insert(name, value);
274
0
            }
275
276
0
            smithy_resp
277
0
        }
278
    }
279
}
280
281
/// Rebuilds a concrete hyper request from captured [`RequestComponents`];
/// used once per retry attempt.
struct RequestBuilder<'a> {
    /// Borrowed components shared across all attempts of one logical request.
    components: &'a RequestComponents,
}
284
285
impl<'a> RequestBuilder<'a> {
286
    #[inline]
287
0
    const fn new(components: &'a RequestComponents) -> Self {
288
0
        Self { components }
289
0
    }
290
291
    #[inline]
292
    #[allow(unused_qualifications, reason = "false positive on hyper::http::Error")]
293
0
    fn build(&self) -> Result<Request<SdkBody>, hyper::http::Error> {
294
0
        let mut req_builder = Request::builder()
295
0
            .method(self.components.method.clone())
296
0
            .uri(self.components.uri.clone())
297
0
            .version(self.components.version);
298
299
0
        let headers_map = req_builder.headers_mut().unwrap();
300
0
        for (name, value) in &self.components.headers {
301
0
            headers_map.insert(name, value.clone());
302
0
        }
303
304
0
        match &self.components.body_data {
305
0
            BufferedBodyState::Cloneable(body) => {
306
0
                let cloned_body = body.try_clone().expect("Body should be cloneable");
307
0
                req_builder.body(cloned_body)
308
            }
309
0
            BufferedBodyState::Buffered(bytes) => req_builder.body(SdkBody::from(bytes.clone())),
310
0
            BufferedBodyState::Empty => req_builder.body(SdkBody::empty()),
311
        }
312
0
    }
313
}
314
315
mod execution {
    use super::conversions::ResponseExt;
    use super::{
        Code, HttpsConnector, LegacyClient, LegacyHttpConnector, RequestBuilder, RequestComponents,
        Response, RetryResult, SdkBody, fs, make_err,
    };

    /// Execute a single HTTP request attempt.
    ///
    /// Failure classification:
    /// - permit acquisition or transport errors -> `RetryResult::Retry`
    /// - a request that cannot even be constructed -> `RetryResult::Err`
    ///   (it would fail identically on every retry)
    #[inline]
    pub(crate) async fn execute_request(
        client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>,
        components: &RequestComponents,
    ) -> RetryResult<Response<SdkBody>> {
        // Gate the request behind fs::get_permit() (presumably a global
        // concurrency limiter — confirm against nativelink_util::fs). Held
        // for the duration of the attempt via the `_permit` guard.
        let _permit = match fs::get_permit().await {
            Ok(permit) => permit,
            Err(e) => {
                return RetryResult::Retry(make_err!(
                    Code::Unavailable,
                    "Failed to acquire permit: {e}"
                ));
            }
        };

        // Rebuild the concrete request from the captured components.
        let request = match RequestBuilder::new(components).build() {
            Ok(req) => req,
            Err(e) => {
                return RetryResult::Err(make_err!(
                    Code::Internal,
                    "Failed to create request: {e}",
                ));
            }
        };

        // Transport-level failures are treated as retryable.
        match client.request(request).await {
            Ok(resp) => RetryResult::Ok(resp.into_smithy_response()),
            Err(e) => RetryResult::Retry(make_err!(
                Code::Unavailable,
                "Failed request in S3Store: {e}"
            )),
        }
    }

    /// Produce an endless stream of request attempts over the same
    /// components; the retrier polls it until an attempt succeeds or the
    /// retry budget is exhausted.
    #[inline]
    pub(crate) fn create_retry_stream(
        client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>,
        components: RequestComponents,
    ) -> impl futures::Stream<Item = RetryResult<Response<SdkBody>>> {
        futures::stream::unfold(components, move |components| {
            let client_clone = client.clone();
            async move {
                let result = execute_request(client_clone, &components).await;

                // Thread the components back through as unfold state so the
                // next attempt can rebuild the identical request.
                Some((result, components))
            }
        })
    }
}
371
372
impl SmithyHttpConnector for TlsClient {
373
0
    fn call(&self, req: HttpRequest) -> HttpConnectorFuture {
374
        use conversions::RequestExt;
375
376
0
        let client = self.client.clone();
377
0
        let retrier = self.retrier.clone();
378
379
0
        HttpConnectorFuture::new(Box::pin(async move {
380
0
            let components = req.into_components().await?;
381
382
0
            let retry_stream = execution::create_retry_stream(client, components);
383
384
0
            match retrier.retry(retry_stream).await {
385
0
                Ok(response) => Ok(response),
386
0
                Err(e) => Err(ConnectorError::other(
387
0
                    format!("Connection failed after retries: {e}").into(),
388
0
                    Some(TransientError),
389
0
                )),
390
            }
391
0
        }))
392
0
    }
393
}
394
395
/// Adapts a [`DropCloserReadHalf`] channel into an `http_body::Body` with a
/// known exact size, so it can be streamed as an S3 upload payload.
#[derive(Debug)]
pub struct BodyWrapper {
    /// Channel the body's data frames are pulled from.
    reader: DropCloserReadHalf,
    /// Total payload size in bytes, reported via `size_hint`.
    size: u64,
}
400
401
impl http_body::Body for BodyWrapper {
402
    type Data = Bytes;
403
    type Error = std::io::Error;
404
405
76
    fn poll_frame(
406
76
        self: Pin<&mut Self>,
407
76
        cx: &mut Context<'_>,
408
76
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
409
76
        let reader = Pin::new(&mut Pin::get_mut(self).reader);
410
76
        reader
411
76
            .poll_next(cx)
412
76
            .map(|maybe_bytes_res| 
maybe_bytes_res51
.
map51
(|res|
res50
.
map50
(Frame::data)))
413
76
    }
414
415
2
    fn size_hint(&self) -> SizeHint {
416
2
        SizeHint::with_exact(self.size)
417
2
    }
418
}
419
420
/// A CAS store backed by an AWS S3 bucket.
#[derive(Debug, MetricsComponent)]
pub struct S3Store<NowFn> {
    /// Shared AWS SDK client used for all bucket operations.
    s3_client: Arc<Client>,
    /// Callable returning the current time; used for object-expiry checks.
    now_fn: NowFn,
    #[metric(help = "The bucket name for the S3 store")]
    bucket: String,
    #[metric(help = "The key prefix for the S3 store")]
    key_prefix: String,
    /// Retry policy driver shared by all S3 operations of this store.
    retrier: Retrier,
    #[metric(help = "The number of seconds to consider an object expired")]
    consider_expired_after_s: i64,
    #[metric(help = "The number of bytes to buffer for retrying requests")]
    max_retry_buffer_per_request: usize,
    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
    multipart_max_concurrent_uploads: usize,

    /// Callbacks invoked when `has` observes an object past its expiry.
    remove_callbacks: Mutex<Vec<Arc<dyn RemoveItemCallback>>>,
}
438
439
impl<I, NowFn> S3Store<NowFn>
440
where
441
    I: InstantWrapper,
442
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
443
{
444
0
    pub async fn new(spec: &ExperimentalAwsSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
445
0
        let jitter_amt = spec.common.retry.jitter;
446
0
        let jitter_fn = Arc::new(move |delay: Duration| {
447
0
            if jitter_amt == 0. {
  Branch (447:16): [True: 0, False: 0]
  Branch (447:16): [Folded - Ignored]
448
0
                return delay;
449
0
            }
450
0
            delay.mul_f32(jitter_amt.mul_add(rand::rng().random::<f32>() - 0.5, 1.))
451
0
        });
452
0
        let s3_client = {
453
0
            let http_client = TlsClient::new(&spec.clone(), jitter_fn.clone());
454
455
0
            let credential_provider = credentials::DefaultCredentialsChain::builder()
456
0
                .configure(
457
0
                    ProviderConfig::without_region()
458
0
                        .with_region(Some(Region::new(Cow::Owned(spec.region.clone()))))
459
0
                        .with_http_client(http_client.clone()),
460
0
                )
461
0
                .build()
462
0
                .await;
463
464
0
            let config = aws_config::defaults(BehaviorVersion::v2025_08_07())
465
0
                .credentials_provider(credential_provider)
466
0
                .app_name(AppName::new("nativelink").expect("valid app name"))
467
0
                .timeout_config(
468
0
                    aws_config::timeout::TimeoutConfig::builder()
469
0
                        .connect_timeout(Duration::from_secs(15))
470
0
                        .build(),
471
0
                )
472
0
                .region(Region::new(Cow::Owned(spec.region.clone())))
473
0
                .http_client(http_client)
474
0
                .load()
475
0
                .await;
476
477
0
            Client::new(&config)
478
        };
479
0
        Self::new_with_client_and_jitter(spec, s3_client, jitter_fn, now_fn)
480
0
    }
481
482
12
    pub fn new_with_client_and_jitter(
483
12
        spec: &ExperimentalAwsSpec,
484
12
        s3_client: Client,
485
12
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
486
12
        now_fn: NowFn,
487
12
    ) -> Result<Arc<Self>, Error> {
488
12
        Ok(Arc::new(Self {
489
12
            s3_client: Arc::new(s3_client),
490
12
            now_fn,
491
12
            bucket: spec.bucket.to_string(),
492
12
            key_prefix: spec
493
12
                .common
494
12
                .key_prefix
495
12
                .as_ref()
496
12
                .unwrap_or(&String::new())
497
12
                .clone(),
498
12
            retrier: Retrier::new(
499
12
                Arc::new(|duration| 
Box::pin6
(
sleep6
(
duration6
))),
500
12
                jitter_fn,
501
12
                spec.common.retry.clone(),
502
            ),
503
12
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
504
12
            max_retry_buffer_per_request: spec
505
12
                .common
506
12
                .max_retry_buffer_per_request
507
12
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
508
12
            multipart_max_concurrent_uploads: spec
509
12
                .common
510
12
                .multipart_max_concurrent_uploads
511
12
                .map_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS, |v| v),
512
12
            remove_callbacks: Mutex::new(Vec::new()),
513
        }))
514
12
    }
515
516
14
    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
517
14
        format!("{}{}", self.key_prefix, key.as_str(),)
518
14
    }
519
520
5
    async fn has(self: Pin<&Self>, digest: StoreKey<'_>) -> Result<Option<u64>, Error> {
521
5
        let digest_clone = digest.into_owned();
522
5
        self.retrier
523
8
            .
retry5
(
unfold5
(
()5
, move |state| {
524
8
                let local_digest = digest_clone.clone();
525
8
                async move {
526
8
                    let result = self
527
8
                        .s3_client
528
8
                        .head_object()
529
8
                        .bucket(&self.bucket)
530
8
                        .key(self.make_s3_path(&local_digest))
531
8
                        .send()
532
8
                        .await;
533
534
8
                    match result {
535
4
                        Ok(head_object_output) => {
536
4
                            if self.consider_expired_after_s != 0 {
  Branch (536:32): [True: 0, False: 0]
  Branch (536:32): [Folded - Ignored]
  Branch (536:32): [True: 2, False: 2]
537
2
                                if let Some(last_modified) = head_object_output.last_modified {
  Branch (537:40): [True: 0, False: 0]
  Branch (537:40): [Folded - Ignored]
  Branch (537:40): [True: 2, False: 0]
538
2
                                    let now_s = (self.now_fn)().unix_timestamp() as i64;
539
2
                                    if last_modified.secs() + self.consider_expired_after_s <= now_s
  Branch (539:40): [True: 0, False: 0]
  Branch (539:40): [Folded - Ignored]
  Branch (539:40): [True: 1, False: 1]
540
                                    {
541
1
                                        let remove_callbacks = self.remove_callbacks.lock().clone();
542
1
                                        let mut callbacks: FuturesUnordered<_> = remove_callbacks
543
1
                                            .iter()
544
1
                                            .map(|callback| 
{0
545
0
                                                callback.callback(local_digest.borrow())
546
0
                                            })
547
1
                                            .collect();
548
1
                                        while callbacks.next().await.is_some() 
{}0
  Branch (548:47): [True: 0, False: 0]
  Branch (548:47): [Folded - Ignored]
  Branch (548:47): [True: 0, False: 1]
549
1
                                        return Some((RetryResult::Ok(None), state));
550
1
                                    }
551
0
                                }
552
2
                            }
553
3
                            let Some(length) = head_object_output.content_length else {
  Branch (553:33): [True: 0, False: 0]
  Branch (553:33): [Folded - Ignored]
  Branch (553:33): [True: 3, False: 0]
554
0
                                return Some((RetryResult::Ok(None), state));
555
                            };
556
3
                            if length >= 0 {
  Branch (556:32): [True: 0, False: 0]
  Branch (556:32): [Folded - Ignored]
  Branch (556:32): [True: 3, False: 0]
557
3
                                return Some((RetryResult::Ok(Some(length as u64)), state));
558
0
                            }
559
0
                            Some((
560
0
                                RetryResult::Err(make_err!(
561
0
                                    Code::InvalidArgument,
562
0
                                    "Negative content length in S3: {length:?}",
563
0
                                )),
564
0
                                state,
565
0
                            ))
566
                        }
567
4
                        Err(sdk_error) => match sdk_error.into_service_error() {
568
1
                            HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
569
3
                            other => Some((
570
3
                                RetryResult::Retry(make_err!(
571
3
                                    Code::Unavailable,
572
3
                                    "Unhandled HeadObjectError in S3: {other:?}"
573
3
                                )),
574
3
                                state,
575
3
                            )),
576
                        },
577
                    }
578
8
                }
579
8
            }))
580
5
            .await
581
5
    }
582
}
583
584
#[async_trait]
585
impl<I, NowFn> StoreDriver for S3Store<NowFn>
586
where
587
    I: InstantWrapper,
588
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
589
{
590
    async fn has_with_results(
591
        self: Pin<&Self>,
592
        keys: &[StoreKey<'_>],
593
        results: &mut [Option<u64>],
594
6
    ) -> Result<(), Error> {
595
        keys.iter()
596
            .zip(results.iter_mut())
597
6
            .map(|(key, result)| async move {
598
                // We need to do a special pass to ensure our zero key exist.
599
6
                if is_zero_digest(key.borrow()) {
  Branch (599:20): [True: 0, False: 0]
  Branch (599:20): [Folded - Ignored]
  Branch (599:20): [True: 1, False: 5]
600
1
                    *result = Some(0);
601
1
                    return Ok::<_, Error>(());
602
5
                }
603
5
                *result = self.has(key.borrow()).await
?0
;
604
5
                Ok::<_, Error>(())
605
12
            })
606
            .collect::<FuturesUnordered<_>>()
607
            .try_collect()
608
            .await
609
6
    }
610
611
    async fn update(
612
        self: Pin<&Self>,
613
        digest: StoreKey<'_>,
614
        mut reader: DropCloserReadHalf,
615
        upload_size: UploadSizeInfo,
616
2
    ) -> Result<(), Error> {
617
        let s3_path = &self.make_s3_path(&digest);
618
619
        let max_size = match upload_size {
620
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
621
        };
622
623
        // Note(aaronmondal) It might be more optimal to use a different
624
        // heuristic here, but for simplicity we use a hard coded value.
625
        // Anything going down this if-statement will have the advantage of only
626
        // 1 network request for the upload instead of minimum of 3 required for
627
        // multipart upload requests.
628
        //
629
        // Note(aaronmondal) If the upload size is not known, we go down the multipart upload path.
630
        // This is not very efficient, but it greatly reduces the complexity of the code.
631
        if max_size < MIN_MULTIPART_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) {
632
            let UploadSizeInfo::ExactSize(sz) = upload_size else {
633
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
634
            };
635
            reader.set_max_recent_data_size(
636
                u64::try_from(self.max_retry_buffer_per_request)
637
                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
638
            );
639
            return self
640
                .retrier
641
1
                .retry(unfold(reader, move |mut reader| async move {
642
                    // We need to make a new pair here because the aws sdk does not give us
643
                    // back the body after we send it in order to retry.
644
1
                    let (mut tx, rx) = make_buf_channel_pair();
645
646
                    // Upload the data to the S3 backend.
647
1
                    let result = {
648
1
                        let reader_ref = &mut reader;
649
1
                        let (upload_res, bind_res) = tokio::join!(
650
1
                            self.s3_client
651
1
                                .put_object()
652
1
                                .bucket(&self.bucket)
653
1
                                .key(s3_path.clone())
654
1
                                .content_length(sz as i64)
655
1
                                .body(ByteStream::from_body_1_x(BodyWrapper {
656
1
                                    reader: rx,
657
1
                                    size: sz,
658
1
                                }))
659
1
                                .send()
660
1
                                .map_ok_or_else(|e| Err(
make_err!0
(
Code::Aborted0
, "{e:?}")), |_| Ok(())),
661
                            // Stream all data from the reader channel to the writer channel.
662
1
                            tx.bind_buffered(reader_ref)
663
                        );
664
1
                        upload_res
665
1
                            .merge(bind_res)
666
1
                            .err_tip(|| "Failed to upload file to s3 in single chunk")
667
                    };
668
669
                    // If we failed to upload the file, check to see if we can retry.
670
1
                    let retry_result = result.map_or_else(|mut err| 
{0
671
                        // Ensure our code is Code::Aborted, so the client can retry if possible.
672
0
                        err.code = Code::Aborted;
673
0
                        let bytes_received = reader.get_bytes_received();
674
0
                        if let Err(try_reset_err) = reader.try_reset_stream() {
  Branch (674:32): [True: 0, False: 0]
  Branch (674:32): [Folded - Ignored]
  Branch (674:32): [True: 0, False: 0]
675
0
                            error!(
676
                                ?bytes_received,
677
                                err = ?try_reset_err,
678
0
                                "Unable to reset stream after failed upload in S3Store::update"
679
                            );
680
0
                            return RetryResult::Err(err
681
0
                                .merge(try_reset_err)
682
0
                                .append(format!("Failed to retry upload with {bytes_received} bytes received in S3Store::update")));
683
0
                        }
684
0
                        let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in S3Store::update"));
685
0
                        info!(
686
                            ?err,
687
                            ?bytes_received,
688
0
                            "Retryable S3 error"
689
                        );
690
0
                        RetryResult::Retry(err)
691
1
                    
}0
, |()| RetryResult::Ok(()));
692
1
                    Some((retry_result, reader))
693
2
                }))
694
                .await;
695
        }
696
697
        let upload_id = &self
698
            .retrier
699
1
            .retry(unfold((), move |()| async move {
700
1
                let retry_result = self
701
1
                    .s3_client
702
1
                    .create_multipart_upload()
703
1
                    .bucket(&self.bucket)
704
1
                    .key(s3_path)
705
1
                    .send()
706
1
                    .await
707
1
                    .map_or_else(
708
0
                        |e| {
709
0
                            RetryResult::Retry(make_err!(
710
0
                                Code::Aborted,
711
0
                                "Failed to create multipart upload to s3: {e:?}"
712
0
                            ))
713
0
                        },
714
1
                        |CreateMultipartUploadOutput { upload_id, .. }| {
715
1
                            upload_id.map_or_else(
716
0
                                || {
717
0
                                    RetryResult::Err(make_err!(
718
0
                                        Code::Internal,
719
0
                                        "Expected upload_id to be set by s3 response"
720
0
                                    ))
721
0
                                },
722
                                RetryResult::Ok,
723
                            )
724
1
                        },
725
                    );
726
1
                Some((retry_result, ()))
727
2
            }))
728
            .await?;
729
730
        // S3 requires us to upload in parts if the size is greater than 5GB. The part size must be at least
731
        // 5mb (except last part) and can have up to 10,000 parts.
732
        let bytes_per_upload_part =
733
            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
734
735
1
        let upload_parts = move || async move {
736
            // This will ensure we only have `multipart_max_concurrent_uploads` * `bytes_per_upload_part`
737
            // bytes in memory at any given time waiting to be uploaded.
738
1
            let (tx, mut rx) = mpsc::channel(self.multipart_max_concurrent_uploads);
739
740
1
            let read_stream_fut = async move {
741
1
                let retrier = &Pin::get_ref(self).retrier;
742
                // Note: Our break condition is when we reach EOF.
743
4
                for part_number in 1..i32::MAX {
744
4
                    let write_buf = reader
745
4
                        .consume(Some(usize::try_from(bytes_per_upload_part).err_tip(
746
                            || "Could not convert bytes_per_upload_part to usize",
747
0
                        )?))
748
4
                        .await
749
4
                        .err_tip(|| "Failed to read chunk in s3_store")
?0
;
750
4
                    if write_buf.is_empty() {
  Branch (750:24): [True: 0, False: 0]
  Branch (750:24): [Folded - Ignored]
  Branch (750:24): [True: 1, False: 3]
751
1
                        break; // Reached EOF.
752
3
                    }
753
754
3
                    tx.send(retrier.retry(unfold(write_buf, move |write_buf| {
755
3
                        async move {
756
3
                            let retry_result = self
757
3
                                .s3_client
758
3
                                .upload_part()
759
3
                                .bucket(&self.bucket)
760
3
                                .key(s3_path)
761
3
                                .upload_id(upload_id)
762
3
                                .body(ByteStream::new(SdkBody::from(write_buf.clone())))
763
3
                                .part_number(part_number)
764
3
                                .send()
765
3
                                .await
766
3
                                .map_or_else(
767
0
                                    |e| {
768
0
                                        RetryResult::Retry(make_err!(
769
0
                                            Code::Aborted,
770
0
                                            "Failed to upload part {part_number} in S3 store: {e:?}"
771
0
                                        ))
772
0
                                    },
773
3
                                    |mut response| {
774
3
                                        RetryResult::Ok(
775
3
                                            CompletedPartBuilder::default()
776
3
                                                // Only set an entity tag if it exists. This saves
777
3
                                                // 13 bytes per part on the final request if it can
778
3
                                                // omit the `<ETAG><ETAG/>` string.
779
3
                                                .set_e_tag(response.e_tag.take())
780
3
                                                .part_number(part_number)
781
3
                                                .build(),
782
3
                                        )
783
3
                                    },
784
                                );
785
3
                            Some((retry_result, write_buf))
786
3
                        }
787
3
                    })))
788
3
                    .await
789
3
                    .map_err(|_| 
{0
790
0
                        make_err!(Code::Internal, "Failed to send part to channel in s3_store")
791
0
                    })?;
792
                }
793
1
                Result::<_, Error>::Ok(())
794
1
            }
795
1
            .fuse();
796
797
1
            let mut upload_futures = FuturesUnordered::new();
798
799
1
            let mut completed_parts = Vec::with_capacity(
800
1
                usize::try_from(cmp::min(
801
1
                    MAX_UPLOAD_PARTS as u64,
802
1
                    (max_size / bytes_per_upload_part) + 1,
803
                ))
804
1
                .err_tip(|| "Could not convert u64 to usize")
?0
,
805
            );
806
1
            tokio::pin!(read_stream_fut);
807
            loop {
808
8
                if read_stream_fut.is_terminated() && 
rx7
.
is_empty7
() &&
upload_futures3
.
is_empty3
() {
  Branch (808:20): [True: 0, False: 0]
  Branch (808:55): [True: 0, False: 0]
  Branch (808:72): [True: 0, False: 0]
  Branch (808:20): [Folded - Ignored]
  Branch (808:55): [Folded - Ignored]
  Branch (808:72): [Folded - Ignored]
  Branch (808:20): [True: 7, False: 1]
  Branch (808:55): [True: 3, False: 4]
  Branch (808:72): [True: 1, False: 2]
809
1
                    break; // No more data to process.
810
7
                }
811
7
                tokio::select! {
812
7
                    
result1
= &mut read_stream_fut =>
result1
?0
, // Return error or wait for other futures.
813
7
                    Some(
upload_result3
) = upload_futures.next() =>
completed_parts3
.
push3
(
upload_result3
?0
),
814
7
                    Some(
fut3
) = rx.recv() =>
upload_futures3
.
push3
(
fut3
),
815
                }
816
            }
817
818
            // Even though the spec does not require parts to be sorted by number, we do it just in case
819
            // there's an S3 implementation that requires it.
820
1
            completed_parts.sort_unstable_by_key(|part| part.part_number);
821
822
1
            self.retrier
823
1
                .retry(unfold(completed_parts, move |completed_parts| async move {
824
                    Some((
825
1
                        self.s3_client
826
1
                            .complete_multipart_upload()
827
1
                            .bucket(&self.bucket)
828
1
                            .key(s3_path)
829
1
                            .multipart_upload(
830
1
                                CompletedMultipartUploadBuilder::default()
831
1
                                    .set_parts(Some(completed_parts.clone()))
832
1
                                    .build(),
833
1
                            )
834
1
                            .upload_id(upload_id)
835
1
                            .send()
836
1
                            .await
837
1
                            .map_or_else(
838
0
                                |e| {
839
0
                                    RetryResult::Retry(make_err!(
840
0
                                        Code::Aborted,
841
0
                                        "Failed to complete multipart upload in S3 store: {e:?}"
842
0
                                    ))
843
0
                                },
844
1
                                |_| RetryResult::Ok(()),
845
                            ),
846
1
                        completed_parts,
847
                    ))
848
2
                }))
849
1
                .await
850
2
        };
851
        // Upload our parts and complete the multipart upload.
852
        // If we fail attempt to abort the multipart upload (cleanup).
853
        upload_parts()
854
0
            .or_else(move |e| async move {
855
0
                Result::<(), _>::Err(e).merge(
856
                    // Note: We don't retry here because this is just a best attempt.
857
0
                    self.s3_client
858
0
                        .abort_multipart_upload()
859
0
                        .bucket(&self.bucket)
860
0
                        .key(s3_path)
861
0
                        .upload_id(upload_id)
862
0
                        .send()
863
0
                        .await
864
0
                        .map_or_else(
865
0
                            |e| {
866
0
                                let err = make_err!(
867
0
                                    Code::Aborted,
868
                                    "Failed to abort multipart upload in S3 store : {e:?}"
869
                                );
870
0
                                info!(?err, "Multipart upload error");
871
0
                                Err(err)
872
0
                            },
873
0
                            |_| Ok(()),
874
                        ),
875
                )
876
0
            })
877
            .await
878
2
    }
879
880
    async fn get_part(
881
        self: Pin<&Self>,
882
        key: StoreKey<'_>,
883
        writer: &mut DropCloserWriteHalf,
884
        offset: u64,
885
        length: Option<u64>,
886
5
    ) -> Result<(), Error> {
887
        if is_zero_digest(key.borrow()) {
888
            writer
889
                .send_eof()
890
                .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
891
            return Ok(());
892
        }
893
894
        let s3_path = &self.make_s3_path(&key);
895
        let end_read_byte = length
896
2
            .map_or(Some(None), |length| Some(offset.checked_add(length)))
897
            .err_tip(|| "Integer overflow protection triggered")?;
898
899
        self.retrier
900
7
            .retry(unfold(writer, move |writer| async move {
901
7
                let result = self
902
7
                    .s3_client
903
7
                    .get_object()
904
7
                    .bucket(&self.bucket)
905
7
                    .key(s3_path)
906
7
                    .range(format!(
907
7
                        "bytes={}-{}",
908
7
                        offset + writer.get_bytes_written(),
909
7
                        end_read_byte.map_or_else(String::new, |v| 
v2
.
to_string2
())
910
                    ))
911
7
                    .send()
912
7
                    .await;
913
914
7
                let 
mut s3_in_stream4
= match result {
915
4
                    Ok(head_object_output) => head_object_output.body,
916
3
                    Err(sdk_error) => match sdk_error.into_service_error() {
917
0
                        GetObjectError::NoSuchKey(e) => {
918
0
                            return Some((
919
0
                                RetryResult::Err(make_err!(
920
0
                                    Code::NotFound,
921
0
                                    "No such key in S3: {e}"
922
0
                                )),
923
0
                                writer,
924
0
                            ));
925
                        }
926
3
                        other => {
927
3
                            return Some((
928
3
                                RetryResult::Retry(make_err!(
929
3
                                    Code::Unavailable,
930
3
                                    "Unhandled GetObjectError in S3: {other:?}",
931
3
                                )),
932
3
                                writer,
933
3
                            ));
934
                        }
935
                    },
936
                };
937
938
                // Copy data from s3 input stream to the writer stream.
939
8
                while let Some(
maybe_bytes4
) = s3_in_stream.next().await {
  Branch (939:27): [True: 0, False: 0]
  Branch (939:27): [Folded - Ignored]
  Branch (939:27): [True: 4, False: 4]
940
4
                    match maybe_bytes {
941
4
                        Ok(bytes) => {
942
4
                            if bytes.is_empty() {
  Branch (942:32): [True: 0, False: 0]
  Branch (942:32): [Folded - Ignored]
  Branch (942:32): [True: 1, False: 3]
943
                                // Ignore possible EOF. Different implementations of S3 may or may not
944
                                // send EOF this way.
945
1
                                continue;
946
3
                            }
947
3
                            if let Err(
e0
) = writer.send(bytes).await {
  Branch (947:36): [True: 0, False: 0]
  Branch (947:36): [Folded - Ignored]
  Branch (947:36): [True: 0, False: 3]
948
0
                                return Some((
949
0
                                    RetryResult::Err(make_err!(
950
0
                                        Code::Aborted,
951
0
                                        "Error sending bytes to consumer in S3: {e}"
952
0
                                    )),
953
0
                                    writer,
954
0
                                ));
955
3
                            }
956
                        }
957
0
                        Err(e) => {
958
0
                            return Some((
959
0
                                RetryResult::Retry(make_err!(
960
0
                                    Code::Aborted,
961
0
                                    "Bad bytestream element in S3: {e}"
962
0
                                )),
963
0
                                writer,
964
0
                            ));
965
                        }
966
                    }
967
                }
968
4
                if let Err(
e0
) = writer.send_eof() {
  Branch (968:24): [True: 0, False: 0]
  Branch (968:24): [Folded - Ignored]
  Branch (968:24): [True: 0, False: 4]
969
0
                    return Some((
970
0
                        RetryResult::Err(make_err!(
971
0
                            Code::Aborted,
972
0
                            "Failed to send EOF to consumer in S3: {e}"
973
0
                        )),
974
0
                        writer,
975
0
                    ));
976
4
                }
977
4
                Some((RetryResult::Ok(()), writer))
978
14
            }))
979
            .await
980
5
    }
981
982
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
983
0
        self
984
0
    }
985
986
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
987
0
        self
988
0
    }
989
990
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
991
0
        self
992
0
    }
993
994
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
995
0
        registry.register_indicator(self);
996
0
    }
997
998
0
    fn register_remove_callback(
999
0
        self: Arc<Self>,
1000
0
        callback: Arc<dyn RemoveItemCallback>,
1001
0
    ) -> Result<(), Error> {
1002
0
        self.remove_callbacks.lock().push(callback);
1003
0
        Ok(())
1004
0
    }
1005
}
1006
1007
#[async_trait]
1008
impl<I, NowFn> HealthStatusIndicator for S3Store<NowFn>
1009
where
1010
    I: InstantWrapper,
1011
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
1012
{
1013
0
    fn get_name(&self) -> &'static str {
1014
0
        "S3Store"
1015
0
    }
1016
1017
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
1018
        StoreDriver::check_health(Pin::new(self), namespace).await
1019
0
    }
1020
}