Coverage Report

Created: 2025-10-30 00:14

/build/source/nativelink-store/src/ontap_s3_store.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::borrow::Cow;
19
use std::fs::File;
20
use std::io::BufReader;
21
use std::sync::Arc;
22
23
use async_trait::async_trait;
24
use aws_config::BehaviorVersion;
25
use aws_config::default_provider::credentials::DefaultCredentialsChain;
26
use aws_config::provider_config::ProviderConfig;
27
use aws_sdk_s3::Client;
28
use aws_sdk_s3::config::Region;
29
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
30
use aws_sdk_s3::operation::get_object::GetObjectError;
31
use aws_sdk_s3::operation::head_object::HeadObjectError;
32
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
33
use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
34
use base64::Engine;
35
use base64::prelude::BASE64_STANDARD_NO_PAD;
36
use bytes::BytesMut;
37
use futures::future::{Either, FusedFuture};
38
use futures::stream::{FuturesUnordered, unfold};
39
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
40
use hyper_rustls::ConfigBuilderExt;
41
use nativelink_config::stores::ExperimentalOntapS3Spec;
42
use nativelink_error::{Code, Error, ResultExt, make_err};
43
use nativelink_metric::MetricsComponent;
44
use nativelink_util::buf_channel::{
45
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
46
};
47
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
48
use nativelink_util::instant_wrapper::InstantWrapper;
49
use nativelink_util::retry::{Retrier, RetryResult};
50
use nativelink_util::store_trait::{RemoveItemCallback, StoreDriver, StoreKey, UploadSizeInfo};
51
use parking_lot::Mutex;
52
use rustls::{ClientConfig, RootCertStore};
53
use rustls_pemfile::certs as extract_certs;
54
use sha2::{Digest, Sha256};
55
use tokio::time::sleep;
56
use tracing::{Level, event, warn};
57
58
use crate::cas_utils::is_zero_digest;
59
use crate::common_s3_utils::TlsClient;
60
61
// S3 parts cannot be smaller than this number
62
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB
63
64
// S3 parts cannot be larger than this number
65
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB
66
67
// A multipart upload cannot have more than this number of parts
68
const MAX_UPLOAD_PARTS: usize = 10_000;
69
70
// Default max buffer size for retrying upload requests
71
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 20 * 1024 * 1024; // 20MB
72
73
// Default limit for concurrent part uploads per multipart upload
74
const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;
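Note: these multipart constants drive the part-size calculation used later in `update` (source line 494). A minimal standalone sketch of that calculation, reusing the constant values from this file; the helper name `bytes_per_upload_part` is illustrative only:

// Standalone sketch of the part-size math from `update` (source line 494).
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB

fn bytes_per_upload_part(max_size: u64) -> u64 {
    // Dividing by (MIN_MULTIPART_SIZE - 1) scales the part size with the upload
    // size, and the clamp keeps every part within S3's per-part limits.
    (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE)
}

fn main() {
    // A 1 GiB upload still uses the minimum 5MB part size (about 205 parts).
    assert_eq!(bytes_per_upload_part(1024 * 1024 * 1024), MIN_MULTIPART_SIZE);
}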
75
76
type RemoveCallback = Arc<dyn RemoveItemCallback>;
77
78
#[derive(Debug, MetricsComponent)]
79
pub struct OntapS3Store<NowFn> {
80
    s3_client: Arc<Client>,
81
    now_fn: NowFn,
82
    #[metric(help = "The bucket name for the ONTAP S3 store")]
83
    bucket: String,
84
    #[metric(help = "The key prefix for the ONTAP S3 store")]
85
    key_prefix: String,
86
    retrier: Retrier,
87
    #[metric(help = "The number of seconds to consider an object expired")]
88
    consider_expired_after_s: i64,
89
    #[metric(help = "The number of bytes to buffer for retrying requests")]
90
    max_retry_buffer_per_request: usize,
91
    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
92
    multipart_max_concurrent_uploads: usize,
93
94
    remove_callbacks: Mutex<Vec<RemoveCallback>>,
95
}
96
97
0
pub fn load_custom_certs(cert_path: &str) -> Result<Arc<ClientConfig>, Error> {
98
0
    let mut root_store = RootCertStore::empty();
99
100
    // Create a BufReader from the cert file
101
0
    let mut cert_reader = BufReader::new(
102
0
        File::open(cert_path)
103
0
            .map_err(|e| make_err!(Code::Internal, "Failed to open CA certificate file: {e:?}"))?,
104
    );
105
106
    // Parse certificates
107
0
    let certs = extract_certs(&mut cert_reader)
108
0
        .collect::<Result<Vec<_>, _>>()
109
0
        .map_err(|e| make_err!(Code::Internal, "Failed to parse certificates: {e:?}"))?;
110
111
    // Add each certificate to the root store
112
0
    for cert in certs {
113
0
        root_store.add(cert).map_err(|e| {
114
0
            make_err!(
115
0
                Code::Internal,
116
                "Failed to add certificate to root store: {e:?}"
117
            )
118
0
        })?;
119
    }
120
121
    // Build the client config with the root store
122
0
    let config = ClientConfig::builder()
123
0
        .with_root_certificates(root_store)
124
0
        .with_no_client_auth();
125
126
0
    Ok(Arc::new(config))
127
0
}
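`load_custom_certs` is uncovered above (all counts are 0). A hedged usage sketch, assuming a hypothetical CA bundle path; in the store itself the path comes from `spec.root_certificates` (source line 136):

// Hedged sketch only: the certificate path below is hypothetical.
fn load_example_tls_config() -> Result<Arc<ClientConfig>, Error> {
    let tls_config = load_custom_certs("/etc/ssl/certs/ontap-ca.pem")?;
    // The resulting Arc<ClientConfig> is what `OntapS3Store::new` hands to
    // hyper_rustls via `.with_tls_config((*tls_config).clone())`.
    Ok(tls_config)
}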
128
129
impl<I, NowFn> OntapS3Store<NowFn>
130
where
131
    I: InstantWrapper,
132
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
133
{
134
3
    pub async fn new(spec: &ExperimentalOntapS3Spec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
135
        // Load custom CA config
136
3
        let ca_config = if let Some(cert_path) = &spec.root_certificates {
  Branch (136:32): [True: 0, False: 3]
  Branch (136:32): [Folded - Ignored]
137
0
            load_custom_certs(cert_path)?
138
        } else {
139
3
            Arc::new(
140
3
                ClientConfig::builder()
141
3
                    .with_native_roots()?
142
3
                    .with_no_client_auth(),
143
            )
144
        };
145
146
3
        let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
147
3
            .with_tls_config((*ca_config).clone())
148
3
            .https_only()
149
3
            .enable_http1()
150
3
            .enable_http2()
151
3
            .build();
152
153
3
        let http_client = TlsClient::with_https_connector(&spec.common, https_connector);
154
155
3
        let credentials_provider = DefaultCredentialsChain::builder()
156
3
            .configure(
157
3
                ProviderConfig::without_region()
158
3
                    .with_region(Some(Region::new(Cow::Owned(spec.vserver_name.clone()))))
159
3
                    .with_http_client(http_client.clone()),
160
3
            )
161
3
            .build()
162
3
            .await;
163
164
3
        let config = aws_sdk_s3::Config::builder()
165
3
            .credentials_provider(credentials_provider)
166
3
            .endpoint_url(&spec.endpoint)
167
3
            .region(Region::new(spec.vserver_name.clone()))
168
3
            .app_name(aws_config::AppName::new("nativelink").expect("valid app name"))
169
3
            .http_client(http_client)
170
3
            .force_path_style(true)
171
3
            .behavior_version(BehaviorVersion::v2025_08_07())
172
3
            .timeout_config(
173
3
                aws_config::timeout::TimeoutConfig::builder()
174
3
                    .connect_timeout(Duration::from_secs(30))
175
3
                    .operation_timeout(Duration::from_secs(120))
176
3
                    .build(),
177
            )
178
3
            .build();
179
180
3
        let s3_client = Client::from_conf(config);
181
182
3
        Self::new_with_client_and_jitter(
183
3
            spec,
184
3
            s3_client,
185
3
            spec.common.retry.make_jitter_fn(),
186
3
            now_fn,
187
        )
188
3
    }
189
190
15
    pub fn new_with_client_and_jitter(
191
15
        spec: &ExperimentalOntapS3Spec,
192
15
        s3_client: Client,
193
15
        jitter_fn: Arc<dyn (Fn(Duration) -> Duration) + Send + Sync>,
194
15
        now_fn: NowFn,
195
15
    ) -> Result<Arc<Self>, Error> {
196
15
        Ok(Arc::new(Self {
197
15
            s3_client: Arc::new(s3_client),
198
15
            now_fn,
199
15
            bucket: spec.bucket.clone(),
200
15
            key_prefix: spec
201
15
                .common
202
15
                .key_prefix
203
15
                .as_ref()
204
15
                .unwrap_or(&String::new())
205
15
                .clone(),
206
15
            retrier: Retrier::new(
207
15
                Arc::new(|duration| Box::pin(sleep(duration))),
208
15
                jitter_fn,
209
15
                spec.common.retry.clone(),
210
            ),
211
15
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
212
15
            max_retry_buffer_per_request: spec
213
15
                .common
214
15
                .max_retry_buffer_per_request
215
15
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
216
15
            multipart_max_concurrent_uploads: spec
217
15
                .common
218
15
                .multipart_max_concurrent_uploads
219
15
                .unwrap_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS),
220
15
            remove_callbacks: Mutex::new(vec![]),
221
        }))
222
15
    }
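A hedged sketch of how a test might call this constructor; the `spec`, `client`, and `MockInstantWrapped` now_fn are assumed stand-ins, not items defined in this file:

// Hedged sketch only: `MockInstantWrapped` stands in for any type implementing
// `InstantWrapper + Default`; the spec and client are assumed to exist already.
fn example_store_setup(spec: &ExperimentalOntapS3Spec, client: Client) -> Result<(), Error> {
    let _store = OntapS3Store::new_with_client_and_jitter(
        spec,
        client,
        Arc::new(|delay: Duration| delay), // identity jitter: keep the configured backoff
        MockInstantWrapped::default,
    )?;
    Ok(())
}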
223
224
17
    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
225
17
        format!("{}{}", self.key_prefix, key.as_str())
226
17
    }
227
228
9
    async fn has(self: Pin<&Self>, digest: StoreKey<'_>) -> Result<Option<u64>, Error> {
229
9
        let digest_clone = digest.into_owned();
230
9
        self.retrier
231
12
            .retry(unfold((), move |state| {
232
12
                let local_digest = digest_clone.clone();
233
12
                async move {
234
12
                    let result = self
235
12
                        .s3_client
236
12
                        .head_object()
237
12
                        .bucket(&self.bucket)
238
12
                        .key(self.make_s3_path(&local_digest))
239
12
                        .send()
240
12
                        .await;
241
242
12
                    match result {
243
8
                        Ok(head_object_output) => {
244
8
                            if self.consider_expired_after_s != 0 {
  Branch (244:32): [True: 0, False: 0]
  Branch (244:32): [Folded - Ignored]
  Branch (244:32): [True: 0, False: 5]
  Branch (244:32): [True: 2, False: 1]
245
2
                                if let Some(last_modified) = head_object_output.last_modified {
  Branch (245:40): [True: 0, False: 0]
  Branch (245:40): [Folded - Ignored]
  Branch (245:40): [True: 0, False: 0]
  Branch (245:40): [True: 2, False: 0]
246
2
                                    let now_s = (self.now_fn)().unix_timestamp() as i64;
247
2
                                    if last_modified.secs() + self.consider_expired_after_s <= now_s
  Branch (247:40): [True: 0, False: 0]
  Branch (247:40): [Folded - Ignored]
  Branch (247:40): [True: 0, False: 0]
  Branch (247:40): [True: 1, False: 1]
248
                                    {
249
1
                                        let remove_callbacks = self.remove_callbacks.lock().clone();
250
1
                                        let mut callbacks: FuturesUnordered<_> = remove_callbacks
251
1
                                            .into_iter()
252
1
                                            .map(|callback| {
253
0
                                                let store_key = local_digest.borrow();
254
0
                                                async move { callback.callback(store_key).await }
255
0
                                            })
256
1
                                            .collect();
257
1
                                        while callbacks.next().await.is_some() {}
  Branch (257:47): [True: 0, False: 0]
  Branch (257:47): [Folded - Ignored]
  Branch (257:47): [True: 0, False: 0]
  Branch (257:47): [True: 0, False: 1]
258
1
                                        return Some((RetryResult::Ok(None), state));
259
1
                                    }
260
0
                                }
261
6
                            }
262
7
                            let Some(length) = head_object_output.content_length else {
  Branch (262:33): [True: 0, False: 0]
  Branch (262:33): [Folded - Ignored]
  Branch (262:33): [True: 5, False: 0]
  Branch (262:33): [True: 2, False: 0]
263
0
                                return Some((RetryResult::Ok(None), state));
264
                            };
265
7
                            if length >= 0 {
  Branch (265:32): [True: 0, False: 0]
  Branch (265:32): [Folded - Ignored]
  Branch (265:32): [True: 5, False: 0]
  Branch (265:32): [True: 2, False: 0]
266
7
                                return Some((RetryResult::Ok(Some(length as u64)), state));
267
0
                            }
268
0
                            Some((
269
0
                                RetryResult::Err(make_err!(
270
0
                                    Code::InvalidArgument,
271
0
                                    "Negative content length in ONTAP S3: {length:?}"
272
0
                                )),
273
0
                                state,
274
0
                            ))
275
                        }
276
4
                        Err(sdk_error) => match sdk_error.into_service_error() {
277
1
                            HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
278
3
                            other => Some((
279
3
                                RetryResult::Retry(make_err!(
280
3
                                    Code::Unavailable,
281
3
                                    "Unhandled HeadObjectError in ONTAP S3: {other:?}"
282
3
                                )),
283
3
                                state,
284
3
                            )),
285
                        },
286
                    }
287
12
                }
288
12
            }))
289
9
            .await
290
9
    }
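The expiry handling above (source lines 244-259) reduces to one predicate over the object's `last_modified` time; when it holds, the remove callbacks fire and the object is reported as missing. A minimal standalone sketch with an illustrative helper name:

// Standalone sketch of the expiry check in `has` (source lines 244-247).
fn is_expired(last_modified_secs: i64, consider_expired_after_s: i64, now_s: i64) -> bool {
    // A threshold of 0 disables expiry entirely.
    consider_expired_after_s != 0
        && last_modified_secs + consider_expired_after_s <= now_s
}

// Example: with a 60s threshold, an object last modified at t=100 is expired at t=160.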
291
}
292
293
#[async_trait]
294
impl<I, NowFn> StoreDriver for OntapS3Store<NowFn>
295
where
296
    I: InstantWrapper,
297
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
298
{
299
    async fn has_with_results(
300
        self: Pin<&Self>,
301
        keys: &[StoreKey<'_>],
302
        results: &mut [Option<u64>],
303
10
    ) -> Result<(), Error> {
304
        keys.iter()
305
            .zip(results.iter_mut())
306
10
            .map(|(key, result)| async move {
307
10
                if is_zero_digest(key.borrow()) {
  Branch (307:20): [True: 0, False: 0]
  Branch (307:20): [Folded - Ignored]
  Branch (307:20): [True: 0, False: 5]
  Branch (307:20): [True: 1, False: 4]
308
1
                    *result = Some(0);
309
1
                    return Ok::<_, Error>(());
310
9
                }
311
312
9
                match self.has(key.borrow()).await {
313
9
                    Ok(size) => {
314
9
                        *result = size;
315
9
                        if size.is_none() {
  Branch (315:28): [True: 0, False: 0]
  Branch (315:28): [Folded - Ignored]
  Branch (315:28): [True: 0, False: 5]
  Branch (315:28): [True: 2, False: 2]
316
2
                            event!(
317
2
                                Level::INFO,
318
2
                                key = %key.as_str(),
319
2
                                "Object not found in ONTAP S3"
320
                            );
321
7
                        }
322
9
                        Ok(())
323
                    }
324
0
                    Err(err) => {
325
0
                        event!(
326
0
                            Level::ERROR,
327
0
                            key = %key.as_str(),
328
                            error = ?err,
329
0
                            "Error checking object existence"
330
                        );
331
0
                        Err(err)
332
                    }
333
                }
334
20
            })
335
            .collect::<FuturesUnordered<_>>()
336
            .try_collect()
337
            .await
338
10
    }
339
340
    async fn update(
341
        self: Pin<&Self>,
342
        key: StoreKey<'_>,
343
        mut reader: DropCloserReadHalf,
344
        size_info: UploadSizeInfo,
345
2
    ) -> Result<(), Error> {
346
        let s3_path = &self.make_s3_path(&key);
347
348
        let max_size = match size_info {
349
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
350
        };
351
352
        // For small files, use simple upload. For larger files or unknown size, use multipart
353
        if max_size < MIN_MULTIPART_SIZE && matches!(size_info, UploadSizeInfo::ExactSize(_)) {
354
            let UploadSizeInfo::ExactSize(sz) = size_info else {
355
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
356
            };
357
            reader.set_max_recent_data_size(
358
                u64::try_from(self.max_retry_buffer_per_request)
359
                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
360
            );
361
            return self.retrier.retry(
362
1
                unfold(reader, move |mut reader| async move {
363
1
                    let (mut tx, mut rx) = make_buf_channel_pair();
364
365
1
                    let result = {
366
1
                        let reader_ref = &mut reader;
367
1
                        let (upload_res, bind_res): (Result<(), Error>, Result<(), Error>) = tokio::join!(async move {
368
1
                            let raw_body_bytes = {
369
1
                                let mut raw_body_chunks = BytesMut::new();
370
                                loop {
371
51
                                    match rx.recv().await {
372
51
                                        Ok(chunk) => {
373
51
                                            if chunk.is_empty() {
  Branch (373:48): [True: 0, False: 0]
  Branch (373:48): [Folded - Ignored]
  Branch (373:48): [True: 0, False: 0]
  Branch (373:48): [True: 1, False: 50]
374
1
                                                break Ok(raw_body_chunks.freeze());
375
50
                                            }
376
50
                                            raw_body_chunks.extend_from_slice(&chunk);
377
                                        }
378
0
                                        Err(err) => {
379
0
                                            break Err(err);
380
                                        }
381
                                    }
382
                                }
383
                            };
384
1
                            let internal_res = match raw_body_bytes {
385
1
                                Ok(body_bytes) => {
386
1
                                    let hash = Sha256::digest(&body_bytes);
387
1
                                    let send_res = self.s3_client
388
1
                                        .put_object()
389
1
                                        .bucket(&self.bucket)
390
1
                                        .key(s3_path.clone())
391
1
                                        .content_length(sz as i64)
392
1
                                        .body(
393
1
                                            ByteStream::from(body_bytes)
394
                                        )
395
1
                                        .set_checksum_algorithm(Some(aws_sdk_s3::types::ChecksumAlgorithm::Sha256))
396
1
                                        .set_checksum_sha256(Some(BASE64_STANDARD_NO_PAD.encode(hash)))
397
1
                                        .customize()
398
1
                                        .mutate_request(|req| {req.headers_mut().insert("x-amz-content-sha256", "UNSIGNED-PAYLOAD");})
399
1
                                        .send();
400
1
                                    Either::Left(send_res.map_ok_or_else(|e| Err(make_err!(Code::Aborted, "{e:?}")), |_| Ok(())))
401
                                    }
402
0
                                Err(collect_err) => {
403
0
                                    async fn make_collect_err(collect_err: Error) -> Result<(), Error> {
404
0
                                        Err(collect_err)
405
0
                                    }
406
407
0
                                    warn!(
408
                                        ?collect_err,
409
0
                                        "Failed to get body");
410
0
                                    let future_err = make_collect_err(collect_err);
411
0
                                    Either::Right(future_err)
412
                                }
413
                            };
414
1
                            internal_res.await
415
1
                        },
416
1
                            tx.bind_buffered(reader_ref)
417
                        );
418
1
                        upload_res
419
1
                            .merge(bind_res)
420
1
                            .err_tip(|| "Failed to upload file to ONTAP S3 in single chunk")
421
                    };
422
423
1
                    let retry_result = result.map_or_else(
424
0
                        |mut err| {
425
0
                            err.code = Code::Aborted;
426
0
                            let bytes_received = reader.get_bytes_received();
427
0
                            if let Err(try_reset_err) = reader.try_reset_stream() {
  Branch (427:36): [True: 0, False: 0]
  Branch (427:36): [Folded - Ignored]
  Branch (427:36): [True: 0, False: 0]
  Branch (427:36): [True: 0, False: 0]
428
0
                                event!(
429
0
                                    Level::ERROR,
430
                                    ?bytes_received,
431
                                    err = ?try_reset_err,
432
0
                                    "Unable to reset stream after failed upload in OntapS3Store::update"
433
                                );
434
0
                                return RetryResult::Err(
435
0
                                    err
436
0
                                        .merge(try_reset_err)
437
0
                                        .append(
438
0
                                            format!(
439
0
                                                "Failed to retry upload with {bytes_received} bytes received in OntapS3Store::update"
440
0
                                            )
441
0
                                        )
442
0
                                );
443
0
                            }
444
0
                            let err = err.append(
445
0
                                format!(
446
0
                                    "Retry on upload happened with {bytes_received} bytes received in OntapS3Store::update"
447
                                )
448
                            );
449
0
                            event!(Level::INFO, ?err, ?bytes_received, "Retryable ONTAP S3 error");
450
0
                            RetryResult::Retry(err)
451
0
                        },
452
1
                        |()| RetryResult::Ok(())
453
                    );
454
1
                    Some((retry_result, reader))
455
2
                })
456
            ).await;
457
        }
458
459
        // Handle multipart upload for large files
460
        let upload_id = &self
461
            .retrier
462
1
            .retry(unfold((), move |()| async move {
463
1
                let retry_result = self
464
1
                    .s3_client
465
1
                    .create_multipart_upload()
466
1
                    .bucket(&self.bucket)
467
1
                    .key(s3_path)
468
1
                    .send()
469
1
                    .await
470
1
                    .map_or_else(
471
0
                        |e| {
472
0
                            RetryResult::Retry(make_err!(
473
0
                                Code::Aborted,
474
0
                                "Failed to create multipart upload to ONTAP S3: {e:?}"
475
0
                            ))
476
0
                        },
477
1
                        |CreateMultipartUploadOutput { upload_id, .. }| {
478
1
                            upload_id.map_or_else(
479
0
                                || {
480
0
                                    RetryResult::Err(make_err!(
481
0
                                        Code::Internal,
482
0
                                        "Expected upload_id to be set by ONTAP S3 response"
483
0
                                    ))
484
0
                                },
485
                                RetryResult::Ok,
486
                            )
487
1
                        },
488
                    );
489
1
                Some((retry_result, ()))
490
2
            }))
491
            .await?;
492
493
        let bytes_per_upload_part =
494
            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
495
496
1
        let upload_parts = move || async move {
497
1
            let (tx, mut rx) = tokio::sync::mpsc::channel(self.multipart_max_concurrent_uploads);
498
499
1
            let read_stream_fut = (
500
1
                async move {
501
1
                    let retrier = &Pin::get_ref(self).retrier;
502
4
                    for part_number in 1..i32::MAX {
503
4
                        let write_buf = reader
504
4
                            .consume(
505
                                Some(
506
4
                                    usize
507
4
                                        ::try_from(bytes_per_upload_part)
508
4
                                        .err_tip(
509
                                            || "Could not convert bytes_per_upload_part to usize"
510
0
                                        )?
511
                                )
512
4
                            ).await
513
4
                            .err_tip(|| "Failed to read chunk in ontap_s3_store")?;
514
4
                        if write_buf.is_empty() {
  Branch (514:28): [True: 0, False: 0]
  Branch (514:28): [Folded - Ignored]
  Branch (514:28): [True: 0, False: 0]
  Branch (514:28): [True: 1, False: 3]
515
1
                            break;
516
3
                        }
517
518
3
                        tx
519
3
                            .send(
520
3
                                retrier.retry(
521
3
                                    unfold(write_buf, move |write_buf| async move {
522
3
                                        let retry_result = self.s3_client
523
3
                                            .upload_part()
524
3
                                            .bucket(&self.bucket)
525
3
                                            .key(s3_path)
526
3
                                            .upload_id(upload_id)
527
3
                                            .body(ByteStream::new(SdkBody::from(write_buf.clone())))
528
3
                                            .part_number(part_number)
529
3
                                            .send().await
530
3
                                            .map_or_else(
531
0
                                                |e| {
532
0
                                                    RetryResult::Retry(
533
0
                                                        make_err!(
534
0
                                                            Code::Aborted,
535
0
                                                            "Failed to upload part {part_number} in ONTAP S3 store: {e:?}"
536
0
                                                        )
537
0
                                                    )
538
0
                                                },
539
3
                                                |mut response| {
540
3
                                                    RetryResult::Ok(
541
3
                                                        CompletedPartBuilder::default()
542
3
                                                            .set_e_tag(response.e_tag.take())
543
3
                                                            .part_number(part_number)
544
3
                                                            .build()
545
3
                                                    )
546
3
                                                }
547
                                            );
548
3
                                        Some((retry_result, write_buf))
549
6
                                    })
550
                                )
551
3
                            ).await
552
3
                            .map_err(|_| {
553
0
                                make_err!(
554
0
                                    Code::Internal,
555
                                    "Failed to send part to channel in ontap_s3_store"
556
                                )
557
0
                            })?;
558
                    }
559
1
                    Result::<_, Error>::Ok(())
560
                }
561
1
            ).fuse();
562
563
1
            let mut upload_futures = FuturesUnordered::new();
564
1
            let mut completed_parts = Vec::with_capacity(
565
1
                usize::try_from(cmp::min(
566
1
                    MAX_UPLOAD_PARTS as u64,
567
1
                    max_size / bytes_per_upload_part + 1,
568
                ))
569
1
                .err_tip(|| "Could not convert u64 to usize")?,
570
            );
571
572
1
            tokio::pin!(read_stream_fut);
573
            loop {
574
8
                if read_stream_fut.is_terminated() && rx.is_empty() && upload_futures.is_empty() {
  Branch (574:20): [True: 0, False: 0]
  Branch (574:55): [True: 0, False: 0]
  Branch (574:72): [True: 0, False: 0]
  Branch (574:20): [Folded - Ignored]
  Branch (574:55): [Folded - Ignored]
  Branch (574:72): [Folded - Ignored]
  Branch (574:20): [True: 0, False: 0]
  Branch (574:55): [True: 0, False: 0]
  Branch (574:72): [True: 0, False: 0]
  Branch (574:20): [True: 7, False: 1]
  Branch (574:55): [True: 3, False: 4]
  Branch (574:72): [True: 1, False: 2]
575
1
                    break;
576
7
                }
577
7
                tokio::select! {
578
7
                    result = &mut read_stream_fut => result?,
579
7
                    Some(upload_result) = upload_futures.next() => completed_parts.push(upload_result?),
580
7
                    Some(fut) = rx.recv() => upload_futures.push(fut),
581
                }
582
            }
583
584
1
            completed_parts.sort_unstable_by_key(|part| part.part_number);
585
586
1
            self.retrier.retry(
587
1
                unfold(completed_parts, move |completed_parts| async move {
588
                    Some((
589
1
                        self.s3_client
590
1
                            .complete_multipart_upload()
591
1
                            .bucket(&self.bucket)
592
1
                            .key(s3_path)
593
1
                            .multipart_upload(
594
1
                                CompletedMultipartUploadBuilder::default()
595
1
                                    .set_parts(Some(completed_parts.clone()))
596
1
                                    .build()
597
1
                            )
598
1
                            .upload_id(upload_id)
599
1
                            .send().await
600
1
                            .map_or_else(
601
0
                                |e| {
602
0
                                    RetryResult::Retry(
603
0
                                        make_err!(
604
0
                                            Code::Aborted,
605
0
                                            "Failed to complete multipart upload in ONTAP S3 store: {e:?}"
606
0
                                        )
607
0
                                    )
608
0
                                },
609
1
                                |_| RetryResult::Ok(())
610
                            ),
611
1
                        completed_parts,
612
                    ))
613
2
                })
614
1
            ).await
615
2
        };
616
617
        upload_parts()
618
0
            .or_else(move |e| async move {
619
0
                Result::<(), _>::Err(e).merge(
620
0
                    self.s3_client
621
0
                        .abort_multipart_upload()
622
0
                        .bucket(&self.bucket)
623
0
                        .key(s3_path)
624
0
                        .upload_id(upload_id)
625
0
                        .send()
626
0
                        .await
627
0
                        .map_or_else(
628
0
                            |e| {
629
0
                                let err = make_err!(
630
0
                                    Code::Aborted,
631
                                    "Failed to abort multipart upload in ONTAP S3 store : {e:?}"
632
                                );
633
0
                                event!(Level::INFO, ?err, "Multipart upload error");
634
0
                                Err(err)
635
0
                            },
636
0
                            |_| Ok(()),
637
                        ),
638
                )
639
0
            })
640
            .await
641
2
    }
642
643
    async fn get_part(
644
        self: Pin<&Self>,
645
        key: StoreKey<'_>,
646
        writer: &mut DropCloserWriteHalf,
647
        offset: u64,
648
        length: Option<u64>,
649
4
    ) -> Result<(), Error> {
650
        if is_zero_digest(key.borrow()) {
651
            writer
652
                .send_eof()
653
                .err_tip(|| "Failed to send zero EOF in ONTAP S3 store get_part")?;
654
            return Ok(());
655
        }
656
657
        let s3_path = &self.make_s3_path(&key);
658
        let end_read_byte = length
659
2
            .map_or(Some(None), |length| Some(offset.checked_add(length)))
660
            .err_tip(|| "Integer overflow protection triggered")?;
661
662
        self.retrier
663
3
            .retry(unfold(writer, move |writer| async move {
664
3
                let result = self.s3_client
665
3
                    .get_object()
666
3
                    .bucket(&self.bucket)
667
3
                    .key(s3_path)
668
3
                    .range(format!(
669
3
                        "bytes={}-{}",
670
3
                        offset + writer.get_bytes_written(),
671
3
                        end_read_byte.map_or_else(String::new, |v| v.to_string())
672
                    ))
673
3
                    .send()
674
3
                    .await;
675
676
3
                match result {
677
3
                    Ok(head_object_output) => {
678
3
                        let mut s3_in_stream = head_object_output.body;
679
3
                        let _bytes_sent = 0;
680
681
7
                        while let Some(maybe_bytes) = s3_in_stream.next().await {
  Branch (681:35): [True: 0, False: 0]
  Branch (681:35): [Folded - Ignored]
  Branch (681:35): [True: 0, False: 0]
  Branch (681:35): [True: 4, False: 3]
682
4
                            match maybe_bytes {
683
4
                                Ok(bytes) => {
684
4
                                    if bytes.is_empty() {
  Branch (684:40): [True: 0, False: 0]
  Branch (684:40): [Folded - Ignored]
  Branch (684:40): [True: 0, False: 0]
  Branch (684:40): [True: 1, False: 3]
685
1
                                        continue;
686
3
                                    }
687
688
                                    // Clone bytes before sending
689
3
                                    let bytes_clone = bytes.clone();
690
691
                                    // More robust sending mechanism
692
3
                                    match writer.send(bytes).await {
693
3
                                        Ok(()) => {
694
3
                                            let _ = bytes_clone.len();
695
3
                                        }
696
0
                                        Err(e) => {
697
0
                                            return Some((
698
0
                                                RetryResult::Err(make_err!(
699
0
                                                    Code::Aborted,
700
0
                                                    "Error sending bytes to consumer in ONTAP S3: {e}"
701
0
                                                )),
702
0
                                                writer,
703
0
                                            ));
704
                                        }
705
                                    }
706
                                }
707
0
                                Err(e) => {
708
0
                                    return Some((
709
0
                                        RetryResult::Retry(make_err!(
710
0
                                            Code::Aborted,
711
0
                                            "Bad bytestream element in ONTAP S3: {e}"
712
0
                                        )),
713
0
                                        writer,
714
0
                                    ));
715
                                }
716
                            }
717
                        }
718
719
                        // EOF handling
720
3
                        if let Err(e) = writer.send_eof() {
  Branch (720:32): [True: 0, False: 0]
  Branch (720:32): [Folded - Ignored]
  Branch (720:32): [True: 0, False: 0]
  Branch (720:32): [True: 0, False: 3]
721
0
                            return Some((
722
0
                                RetryResult::Err(make_err!(
723
0
                                    Code::Aborted,
724
0
                                    "Failed to send EOF to consumer in ONTAP S3: {e}"
725
0
                                )),
726
0
                                writer,
727
0
                            ));
728
3
                        }
729
730
3
                        Some((RetryResult::Ok(()), writer))
731
                    }
732
0
                    Err(sdk_error) => {
733
                        // Clone sdk_error before moving
734
0
                        let error_description = format!("{sdk_error:?}");
735
0
                        match sdk_error.into_service_error() {
736
0
                            GetObjectError::NoSuchKey(e) => {
737
0
                                Some((
738
0
                                    RetryResult::Err(make_err!(
739
0
                                        Code::NotFound,
740
0
                                        "No such key in ONTAP S3: {e}"
741
0
                                    )),
742
0
                                    writer,
743
0
                                ))
744
                            }
745
0
                            _ => Some((
746
0
                                RetryResult::Retry(make_err!(
747
0
                                    Code::Unavailable,
748
0
                                    "Unhandled GetObjectError in ONTAP S3: {error_description}"
749
0
                                )),
750
0
                                writer,
751
0
                            )),
752
                        }
753
                    },
754
                }
755
6
            }))
756
            .await
757
4
    }
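The ranged read above builds a standard HTTP `Range` header from the request offset, the bytes already delivered to the writer (so retries resume where they stopped), and the optional length (source lines 658-672). A minimal standalone sketch of that composition; the helper name is illustrative:

// Standalone sketch of the Range header built in `get_part` (source lines 658-672).
fn range_header(offset: u64, bytes_written: u64, length: Option<u64>) -> Option<String> {
    // Mirrors the store's overflow protection: a length that overflows yields None.
    let end_read_byte = match length {
        Some(len) => Some(offset.checked_add(len)?),
        None => None,
    };
    // An open-ended request ("bytes=N-") is sent when no length was given.
    Some(format!(
        "bytes={}-{}",
        offset + bytes_written,
        end_read_byte.map_or_else(String::new, |v| v.to_string())
    ))
}

// range_header(0, 0, Some(10)) == Some("bytes=0-10".to_string())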
758
759
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
760
0
        self
761
0
    }
762
763
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
764
0
        self
765
0
    }
766
767
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
768
0
        self
769
0
    }
770
771
3
    fn register_remove_callback(
772
3
        self: Arc<Self>,
773
3
        callback: Arc<dyn RemoveItemCallback>,
774
3
    ) -> Result<(), Error> {
775
3
        self.remove_callbacks.lock().push(callback);
776
3
        Ok(())
777
3
    }
778
}
779
780
#[async_trait]
781
impl<I, NowFn> HealthStatusIndicator for OntapS3Store<NowFn>
782
where
783
    I: InstantWrapper,
784
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
785
{
786
0
    fn get_name(&self) -> &'static str {
787
0
        "OntapS3Store"
788
0
    }
789
790
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
791
        StoreDriver::check_health(Pin::new(self), namespace).await
792
0
    }
793
}