Coverage Report

Created: 2025-12-17 22:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/ontap_s3_store.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::borrow::Cow;
19
use std::fs::File;
20
use std::io::BufReader;
21
use std::sync::Arc;
22
23
use async_trait::async_trait;
24
use aws_config::BehaviorVersion;
25
use aws_config::default_provider::credentials::DefaultCredentialsChain;
26
use aws_config::provider_config::ProviderConfig;
27
use aws_sdk_s3::Client;
28
use aws_sdk_s3::config::Region;
29
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
30
use aws_sdk_s3::operation::get_object::GetObjectError;
31
use aws_sdk_s3::operation::head_object::HeadObjectError;
32
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
33
use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
34
use base64::Engine;
35
use base64::prelude::BASE64_STANDARD_NO_PAD;
36
use bytes::BytesMut;
37
use futures::future::{Either, FusedFuture};
38
use futures::stream::{FuturesUnordered, unfold};
39
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
40
use hyper_rustls::ConfigBuilderExt;
41
use nativelink_config::stores::ExperimentalOntapS3Spec;
42
use nativelink_error::{Code, Error, ResultExt, make_err};
43
use nativelink_metric::MetricsComponent;
44
use nativelink_util::buf_channel::{
45
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
46
};
47
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
48
use nativelink_util::instant_wrapper::InstantWrapper;
49
use nativelink_util::retry::{Retrier, RetryResult};
50
use nativelink_util::store_trait::{RemoveItemCallback, StoreDriver, StoreKey, UploadSizeInfo};
51
use parking_lot::Mutex;
52
use rustls::{ClientConfig, RootCertStore};
53
use rustls_pki_types::CertificateDer;
54
use rustls_pki_types::pem::PemObject;
55
use sha2::{Digest, Sha256};
56
use tokio::time::sleep;
57
use tracing::{Level, event, warn};
58
59
use crate::cas_utils::is_zero_digest;
60
use crate::common_s3_utils::TlsClient;
61
62
/// Smallest size, in bytes, of a single multipart-upload part (5 MiB).
/// Payloads with an exact size below this are sent with one `put_object` call.
const MIN_MULTIPART_SIZE: u64 = 5 << 20;

/// Largest size, in bytes, of a single multipart-upload part (5 GiB).
const MAX_MULTIPART_SIZE: u64 = 5 << 30;

/// Maximum number of parts a single multipart upload may contain.
const MAX_UPLOAD_PARTS: usize = 10_000;

/// Default number of recently read bytes kept per request so that a failed
/// upload can be replayed (20 MiB). Used when the spec does not set
/// `max_retry_buffer_per_request`.
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 20 << 20;

/// Default cap on the number of parts of one multipart upload that are in
/// flight at the same time. Used when the spec does not set
/// `multipart_max_concurrent_uploads`.
const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;
76
77
type RemoveCallback = Arc<dyn RemoveItemCallback>;
78
79
#[derive(Debug, MetricsComponent)]
80
pub struct OntapS3Store<NowFn> {
81
    s3_client: Arc<Client>,
82
    now_fn: NowFn,
83
    #[metric(help = "The bucket name for the ONTAP S3 store")]
84
    bucket: String,
85
    #[metric(help = "The key prefix for the ONTAP S3 store")]
86
    key_prefix: String,
87
    retrier: Retrier,
88
    #[metric(help = "The number of seconds to consider an object expired")]
89
    consider_expired_after_s: i64,
90
    #[metric(help = "The number of bytes to buffer for retrying requests")]
91
    max_retry_buffer_per_request: usize,
92
    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
93
    multipart_max_concurrent_uploads: usize,
94
95
    remove_callbacks: Mutex<Vec<RemoveCallback>>,
96
}
97
98
0
pub fn load_custom_certs(cert_path: &str) -> Result<Arc<ClientConfig>, Error> {
99
0
    let mut root_store = RootCertStore::empty();
100
101
    // Create a BufReader from the cert file
102
0
    let mut cert_reader = BufReader::new(
103
0
        File::open(cert_path)
104
0
            .err_tip(|| format!("Failed to open CA certificate file {cert_path}"))?,
105
    );
106
107
    // Parse certificates
108
0
    let certs = CertificateDer::pem_reader_iter(&mut cert_reader).collect::<Result<Vec<_>, _>>()?;
109
110
    // Add each certificate to the root store
111
0
    for cert in certs {
112
0
        root_store.add(cert).map_err(|e| {
113
0
            make_err!(
114
0
                Code::Internal,
115
                "Failed to add certificate to root store: {e:?}"
116
            )
117
0
        })?;
118
    }
119
120
    // Build the client config with the root store
121
0
    let config = ClientConfig::builder()
122
0
        .with_root_certificates(root_store)
123
0
        .with_no_client_auth();
124
125
0
    Ok(Arc::new(config))
126
0
}
127
128
impl<I, NowFn> OntapS3Store<NowFn>
129
where
130
    I: InstantWrapper,
131
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
132
{
133
3
    pub async fn new(spec: &ExperimentalOntapS3Spec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
134
        // Load custom CA config
135
3
        let ca_config = if let Some(
cert_path0
) = &spec.root_certificates {
  Branch (135:32): [True: 0, False: 3]
  Branch (135:32): [Folded - Ignored]
136
0
            load_custom_certs(cert_path)?
137
        } else {
138
3
            Arc::new(
139
3
                ClientConfig::builder()
140
3
                    .with_native_roots()
?0
141
3
                    .with_no_client_auth(),
142
            )
143
        };
144
145
3
        let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
146
3
            .with_tls_config((*ca_config).clone())
147
3
            .https_only()
148
3
            .enable_http1()
149
3
            .enable_http2()
150
3
            .build();
151
152
3
        let http_client = TlsClient::with_https_connector(&spec.common, https_connector);
153
154
3
        let credentials_provider = DefaultCredentialsChain::builder()
155
3
            .configure(
156
3
                ProviderConfig::without_region()
157
3
                    .with_region(Some(Region::new(Cow::Owned(spec.vserver_name.clone()))))
158
3
                    .with_http_client(http_client.clone()),
159
3
            )
160
3
            .build()
161
3
            .await;
162
163
3
        let config = aws_sdk_s3::Config::builder()
164
3
            .credentials_provider(credentials_provider)
165
3
            .endpoint_url(&spec.endpoint)
166
3
            .region(Region::new(spec.vserver_name.clone()))
167
3
            .app_name(aws_config::AppName::new("nativelink").expect("valid app name"))
168
3
            .http_client(http_client)
169
3
            .force_path_style(true)
170
3
            .behavior_version(BehaviorVersion::v2025_08_07())
171
3
            .timeout_config(
172
3
                aws_config::timeout::TimeoutConfig::builder()
173
3
                    .connect_timeout(Duration::from_secs(30))
174
3
                    .operation_timeout(Duration::from_secs(120))
175
3
                    .build(),
176
            )
177
3
            .build();
178
179
3
        let s3_client = Client::from_conf(config);
180
181
3
        Self::new_with_client_and_jitter(
182
3
            spec,
183
3
            s3_client,
184
3
            spec.common.retry.make_jitter_fn(),
185
3
            now_fn,
186
        )
187
3
    }
188
189
15
    pub fn new_with_client_and_jitter(
190
15
        spec: &ExperimentalOntapS3Spec,
191
15
        s3_client: Client,
192
15
        jitter_fn: Arc<dyn (Fn(Duration) -> Duration) + Send + Sync>,
193
15
        now_fn: NowFn,
194
15
    ) -> Result<Arc<Self>, Error> {
195
15
        Ok(Arc::new(Self {
196
15
            s3_client: Arc::new(s3_client),
197
15
            now_fn,
198
15
            bucket: spec.bucket.clone(),
199
15
            key_prefix: spec
200
15
                .common
201
15
                .key_prefix
202
15
                .as_ref()
203
15
                .unwrap_or(&String::new())
204
15
                .clone(),
205
15
            retrier: Retrier::new(
206
15
                Arc::new(|duration| 
Box::pin3
(
sleep3
(
duration3
))),
207
15
                jitter_fn,
208
15
                spec.common.retry.clone(),
209
            ),
210
15
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
211
15
            max_retry_buffer_per_request: spec
212
15
                .common
213
15
                .max_retry_buffer_per_request
214
15
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
215
15
            multipart_max_concurrent_uploads: spec
216
15
                .common
217
15
                .multipart_max_concurrent_uploads
218
15
                .unwrap_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS),
219
15
            remove_callbacks: Mutex::new(vec![]),
220
        }))
221
15
    }
222
223
17
    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
224
17
        format!("{}{}", self.key_prefix, key.as_str())
225
17
    }
226
227
9
    async fn has(self: Pin<&Self>, digest: StoreKey<'_>) -> Result<Option<u64>, Error> {
228
9
        let digest_clone = digest.into_owned();
229
9
        self.retrier
230
12
            .
retry9
(
unfold9
(
()9
, move |state| {
231
12
                let local_digest = digest_clone.clone();
232
12
                async move {
233
12
                    let result = self
234
12
                        .s3_client
235
12
                        .head_object()
236
12
                        .bucket(&self.bucket)
237
12
                        .key(self.make_s3_path(&local_digest))
238
12
                        .send()
239
12
                        .await;
240
241
12
                    match result {
242
8
                        Ok(head_object_output) => {
243
8
                            if self.consider_expired_after_s != 0 {
  Branch (243:32): [True: 0, False: 0]
  Branch (243:32): [Folded - Ignored]
  Branch (243:32): [True: 0, False: 5]
  Branch (243:32): [True: 2, False: 1]
244
2
                                if let Some(last_modified) = head_object_output.last_modified {
  Branch (244:40): [True: 0, False: 0]
  Branch (244:40): [Folded - Ignored]
  Branch (244:40): [True: 0, False: 0]
  Branch (244:40): [True: 2, False: 0]
245
2
                                    let now_s = (self.now_fn)().unix_timestamp() as i64;
246
2
                                    if last_modified.secs() + self.consider_expired_after_s <= now_s
  Branch (246:40): [True: 0, False: 0]
  Branch (246:40): [Folded - Ignored]
  Branch (246:40): [True: 0, False: 0]
  Branch (246:40): [True: 1, False: 1]
247
                                    {
248
1
                                        let remove_callbacks = self.remove_callbacks.lock().clone();
249
1
                                        let mut callbacks: FuturesUnordered<_> = remove_callbacks
250
1
                                            .into_iter()
251
1
                                            .map(|callback| 
{0
252
0
                                                let store_key = local_digest.borrow();
253
0
                                                async move { callback.callback(store_key).await }
254
0
                                            })
255
1
                                            .collect();
256
1
                                        while callbacks.next().await.is_some() 
{}0
  Branch (256:47): [True: 0, False: 0]
  Branch (256:47): [Folded - Ignored]
  Branch (256:47): [True: 0, False: 0]
  Branch (256:47): [True: 0, False: 1]
257
1
                                        return Some((RetryResult::Ok(None), state));
258
1
                                    }
259
0
                                }
260
6
                            }
261
7
                            let Some(length) = head_object_output.content_length else {
  Branch (261:33): [True: 0, False: 0]
  Branch (261:33): [Folded - Ignored]
  Branch (261:33): [True: 5, False: 0]
  Branch (261:33): [True: 2, False: 0]
262
0
                                return Some((RetryResult::Ok(None), state));
263
                            };
264
7
                            if length >= 0 {
  Branch (264:32): [True: 0, False: 0]
  Branch (264:32): [Folded - Ignored]
  Branch (264:32): [True: 5, False: 0]
  Branch (264:32): [True: 2, False: 0]
265
7
                                return Some((RetryResult::Ok(Some(length as u64)), state));
266
0
                            }
267
0
                            Some((
268
0
                                RetryResult::Err(make_err!(
269
0
                                    Code::InvalidArgument,
270
0
                                    "Negative content length in ONTAP S3: {length:?}"
271
0
                                )),
272
0
                                state,
273
0
                            ))
274
                        }
275
4
                        Err(sdk_error) => match sdk_error.into_service_error() {
276
1
                            HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
277
3
                            other => Some((
278
3
                                RetryResult::Retry(make_err!(
279
3
                                    Code::Unavailable,
280
3
                                    "Unhandled HeadObjectError in ONTAP S3: {other:?}"
281
3
                                )),
282
3
                                state,
283
3
                            )),
284
                        },
285
                    }
286
12
                }
287
12
            }))
288
9
            .await
289
9
    }
290
}
291
292
#[async_trait]
293
impl<I, NowFn> StoreDriver for OntapS3Store<NowFn>
294
where
295
    I: InstantWrapper,
296
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
297
{
298
    async fn has_with_results(
299
        self: Pin<&Self>,
300
        keys: &[StoreKey<'_>],
301
        results: &mut [Option<u64>],
302
10
    ) -> Result<(), Error> {
303
        keys.iter()
304
            .zip(results.iter_mut())
305
10
            .map(|(key, result)| async move {
306
10
                if is_zero_digest(key.borrow()) {
  Branch (306:20): [True: 0, False: 0]
  Branch (306:20): [Folded - Ignored]
  Branch (306:20): [True: 0, False: 5]
  Branch (306:20): [True: 1, False: 4]
307
1
                    *result = Some(0);
308
1
                    return Ok::<_, Error>(());
309
9
                }
310
311
9
                match self.has(key.borrow()).await {
312
9
                    Ok(size) => {
313
9
                        *result = size;
314
9
                        if size.is_none() {
  Branch (314:28): [True: 0, False: 0]
  Branch (314:28): [Folded - Ignored]
  Branch (314:28): [True: 0, False: 5]
  Branch (314:28): [True: 2, False: 2]
315
2
                            event!(
316
2
                                Level::INFO,
317
2
                                key = %key.as_str(),
318
2
                                "Object not found in ONTAP S3"
319
                            );
320
7
                        }
321
9
                        Ok(())
322
                    }
323
0
                    Err(err) => {
324
0
                        event!(
325
0
                            Level::ERROR,
326
0
                            key = %key.as_str(),
327
                            error = ?err,
328
0
                            "Error checking object existence"
329
                        );
330
0
                        Err(err)
331
                    }
332
                }
333
20
            })
334
            .collect::<FuturesUnordered<_>>()
335
            .try_collect()
336
            .await
337
10
    }
338
339
    async fn update(
340
        self: Pin<&Self>,
341
        key: StoreKey<'_>,
342
        mut reader: DropCloserReadHalf,
343
        size_info: UploadSizeInfo,
344
2
    ) -> Result<(), Error> {
345
        let s3_path = &self.make_s3_path(&key);
346
347
        let max_size = match size_info {
348
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
349
        };
350
351
        // For small files, use simple upload. For larger files or unknown size, use multipart
352
        if max_size < MIN_MULTIPART_SIZE && matches!(size_info, UploadSizeInfo::ExactSize(_)) {
353
            let UploadSizeInfo::ExactSize(sz) = size_info else {
354
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
355
            };
356
            reader.set_max_recent_data_size(
357
                u64::try_from(self.max_retry_buffer_per_request)
358
                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
359
            );
360
            return self.retrier.retry(
361
1
                unfold(reader, move |mut reader| async move {
362
1
                    let (mut tx, mut rx) = make_buf_channel_pair();
363
364
1
                    let result = {
365
1
                        let reader_ref = &mut reader;
366
1
                        let (upload_res, bind_res): (Result<(), Error>, Result<(), Error>) = tokio::join!(async move {
367
1
                            let raw_body_bytes = {
368
1
                                let mut raw_body_chunks = BytesMut::new();
369
                                loop {
370
51
                                    match rx.recv().await {
371
51
                                        Ok(chunk) => {
372
51
                                            if chunk.is_empty() {
  Branch (372:48): [True: 0, False: 0]
  Branch (372:48): [Folded - Ignored]
  Branch (372:48): [True: 0, False: 0]
  Branch (372:48): [True: 1, False: 50]
373
1
                                                break Ok(raw_body_chunks.freeze());
374
50
                                            }
375
50
                                            raw_body_chunks.extend_from_slice(&chunk);
376
                                        }
377
0
                                        Err(err) => {
378
0
                                            break Err(err);
379
                                        }
380
                                    }
381
                                }
382
                            };
383
1
                            let internal_res = match raw_body_bytes {
384
1
                                Ok(body_bytes) => {
385
1
                                    let hash = Sha256::digest(&body_bytes);
386
1
                                    let send_res = self.s3_client
387
1
                                        .put_object()
388
1
                                        .bucket(&self.bucket)
389
1
                                        .key(s3_path.clone())
390
1
                                        .content_length(sz as i64)
391
1
                                        .body(
392
1
                                            ByteStream::from(body_bytes)
393
                                        )
394
1
                                        .set_checksum_algorithm(Some(aws_sdk_s3::types::ChecksumAlgorithm::Sha256))
395
1
                                        .set_checksum_sha256(Some(BASE64_STANDARD_NO_PAD.encode(hash)))
396
1
                                        .customize()
397
1
                                        .mutate_request(|req| {req.headers_mut().insert("x-amz-content-sha256", "UNSIGNED-PAYLOAD");})
398
1
                                        .send();
399
1
                                    Either::Left(send_res.map_ok_or_else(|e| Err(
make_err!0
(
Code::Aborted0
, "{e:?}")), |_| Ok(())))
400
                                    }
401
0
                                Err(collect_err) => {
402
0
                                    async fn make_collect_err(collect_err: Error) -> Result<(), Error> {
403
0
                                        Err(collect_err)
404
0
                                    }
405
406
0
                                    warn!(
407
                                        ?collect_err,
408
0
                                        "Failed to get body");
409
0
                                    let future_err = make_collect_err(collect_err);
410
0
                                    Either::Right(future_err)
411
                                }
412
                            };
413
1
                            internal_res.await
414
1
                        },
415
1
                            tx.bind_buffered(reader_ref)
416
                        );
417
1
                        upload_res
418
1
                            .merge(bind_res)
419
1
                            .err_tip(|| "Failed to upload file to ONTAP S3 in single chunk")
420
                    };
421
422
1
                    let retry_result = result.map_or_else(
423
0
                        |mut err| {
424
0
                            err.code = Code::Aborted;
425
0
                            let bytes_received = reader.get_bytes_received();
426
0
                            if let Err(try_reset_err) = reader.try_reset_stream() {
  Branch (426:36): [True: 0, False: 0]
  Branch (426:36): [Folded - Ignored]
  Branch (426:36): [True: 0, False: 0]
  Branch (426:36): [True: 0, False: 0]
427
0
                                event!(
428
0
                                    Level::ERROR,
429
                                    ?bytes_received,
430
                                    err = ?try_reset_err,
431
0
                                    "Unable to reset stream after failed upload in OntapS3Store::update"
432
                                );
433
0
                                return RetryResult::Err(
434
0
                                    err
435
0
                                        .merge(try_reset_err)
436
0
                                        .append(
437
0
                                            format!(
438
0
                                                "Failed to retry upload with {bytes_received} bytes received in OntapS3Store::update"
439
0
                                            )
440
0
                                        )
441
0
                                );
442
0
                            }
443
0
                            let err = err.append(
444
0
                                format!(
445
0
                                    "Retry on upload happened with {bytes_received} bytes received in OntapS3Store::update"
446
                                )
447
                            );
448
0
                            event!(Level::INFO, ?err, ?bytes_received, "Retryable ONTAP S3 error");
449
0
                            RetryResult::Retry(err)
450
0
                        },
451
1
                        |()| RetryResult::Ok(())
452
                    );
453
1
                    Some((retry_result, reader))
454
2
                })
455
            ).await;
456
        }
457
458
        // Handle multipart upload for large files
459
        let upload_id = &self
460
            .retrier
461
1
            .retry(unfold((), move |()| async move {
462
1
                let retry_result = self
463
1
                    .s3_client
464
1
                    .create_multipart_upload()
465
1
                    .bucket(&self.bucket)
466
1
                    .key(s3_path)
467
1
                    .send()
468
1
                    .await
469
1
                    .map_or_else(
470
0
                        |e| {
471
0
                            RetryResult::Retry(make_err!(
472
0
                                Code::Aborted,
473
0
                                "Failed to create multipart upload to ONTAP S3: {e:?}"
474
0
                            ))
475
0
                        },
476
1
                        |CreateMultipartUploadOutput { upload_id, .. }| {
477
1
                            upload_id.map_or_else(
478
0
                                || {
479
0
                                    RetryResult::Err(make_err!(
480
0
                                        Code::Internal,
481
0
                                        "Expected upload_id to be set by ONTAP S3 response"
482
0
                                    ))
483
0
                                },
484
                                RetryResult::Ok,
485
                            )
486
1
                        },
487
                    );
488
1
                Some((retry_result, ()))
489
2
            }))
490
            .await?;
491
492
        let bytes_per_upload_part =
493
            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
494
495
1
        let upload_parts = move || async move {
496
1
            let (tx, mut rx) = tokio::sync::mpsc::channel(self.multipart_max_concurrent_uploads);
497
498
1
            let read_stream_fut = (
499
1
                async move {
500
1
                    let retrier = &Pin::get_ref(self).retrier;
501
4
                    for part_number in 1..i32::MAX {
502
4
                        let write_buf = reader
503
4
                            .consume(
504
                                Some(
505
4
                                    usize
506
4
                                        ::try_from(bytes_per_upload_part)
507
4
                                        .err_tip(
508
                                            || "Could not convert bytes_per_upload_part to usize"
509
0
                                        )?
510
                                )
511
4
                            ).await
512
4
                            .err_tip(|| "Failed to read chunk in ontap_s3_store")
?0
;
513
4
                        if write_buf.is_empty() {
  Branch (513:28): [True: 0, False: 0]
  Branch (513:28): [Folded - Ignored]
  Branch (513:28): [True: 0, False: 0]
  Branch (513:28): [True: 1, False: 3]
514
1
                            break;
515
3
                        }
516
517
3
                        tx
518
3
                            .send(
519
3
                                retrier.retry(
520
3
                                    unfold(write_buf, move |write_buf| async move {
521
3
                                        let retry_result = self.s3_client
522
3
                                            .upload_part()
523
3
                                            .bucket(&self.bucket)
524
3
                                            .key(s3_path)
525
3
                                            .upload_id(upload_id)
526
3
                                            .body(ByteStream::new(SdkBody::from(write_buf.clone())))
527
3
                                            .part_number(part_number)
528
3
                                            .send().await
529
3
                                            .map_or_else(
530
0
                                                |e| {
531
0
                                                    RetryResult::Retry(
532
0
                                                        make_err!(
533
0
                                                            Code::Aborted,
534
0
                                                            "Failed to upload part {part_number} in ONTAP S3 store: {e:?}"
535
0
                                                        )
536
0
                                                    )
537
0
                                                },
538
3
                                                |mut response| {
539
3
                                                    RetryResult::Ok(
540
3
                                                        CompletedPartBuilder::default()
541
3
                                                            .set_e_tag(response.e_tag.take())
542
3
                                                            .part_number(part_number)
543
3
                                                            .build()
544
3
                                                    )
545
3
                                                }
546
                                            );
547
3
                                        Some((retry_result, write_buf))
548
6
                                    })
549
                                )
550
3
                            ).await
551
3
                            .map_err(|_| 
{0
552
0
                                make_err!(
553
0
                                    Code::Internal,
554
                                    "Failed to send part to channel in ontap_s3_store"
555
                                )
556
0
                            })?;
557
                    }
558
1
                    Result::<_, Error>::Ok(())
559
                }
560
1
            ).fuse();
561
562
1
            let mut upload_futures = FuturesUnordered::new();
563
1
            let mut completed_parts = Vec::with_capacity(
564
1
                usize::try_from(cmp::min(
565
1
                    MAX_UPLOAD_PARTS as u64,
566
1
                    max_size / bytes_per_upload_part + 1,
567
                ))
568
1
                .err_tip(|| "Could not convert u64 to usize")
?0
,
569
            );
570
571
1
            tokio::pin!(read_stream_fut);
572
            loop {
573
8
                if read_stream_fut.is_terminated() && 
rx7
.
is_empty7
() &&
upload_futures2
.
is_empty2
() {
  Branch (573:20): [True: 0, False: 0]
  Branch (573:55): [True: 0, False: 0]
  Branch (573:72): [True: 0, False: 0]
  Branch (573:20): [Folded - Ignored]
  Branch (573:55): [Folded - Ignored]
  Branch (573:72): [Folded - Ignored]
  Branch (573:20): [True: 0, False: 0]
  Branch (573:55): [True: 0, False: 0]
  Branch (573:72): [True: 0, False: 0]
  Branch (573:20): [True: 7, False: 1]
  Branch (573:55): [True: 2, False: 5]
  Branch (573:72): [True: 1, False: 1]
574
1
                    break;
575
7
                }
576
7
                tokio::select! {
577
7
                    
result1
= &mut read_stream_fut =>
result1
?0
,
578
7
                    Some(
upload_result3
) = upload_futures.next() =>
completed_parts3
.
push3
(
upload_result3
?0
),
579
7
                    Some(
fut3
) = rx.recv() =>
upload_futures3
.
push3
(
fut3
),
580
                }
581
            }
582
583
1
            completed_parts.sort_unstable_by_key(|part| part.part_number);
584
585
1
            self.retrier.retry(
586
1
                unfold(completed_parts, move |completed_parts| async move {
587
                    Some((
588
1
                        self.s3_client
589
1
                            .complete_multipart_upload()
590
1
                            .bucket(&self.bucket)
591
1
                            .key(s3_path)
592
1
                            .multipart_upload(
593
1
                                CompletedMultipartUploadBuilder::default()
594
1
                                    .set_parts(Some(completed_parts.clone()))
595
1
                                    .build()
596
1
                            )
597
1
                            .upload_id(upload_id)
598
1
                            .send().await
599
1
                            .map_or_else(
600
0
                                |e| {
601
0
                                    RetryResult::Retry(
602
0
                                        make_err!(
603
0
                                            Code::Aborted,
604
0
                                            "Failed to complete multipart upload in ONTAP S3 store: {e:?}"
605
0
                                        )
606
0
                                    )
607
0
                                },
608
1
                                |_| RetryResult::Ok(())
609
                            ),
610
1
                        completed_parts,
611
                    ))
612
2
                })
613
1
            ).await
614
2
        };
615
616
        upload_parts()
617
0
            .or_else(move |e| async move {
618
0
                Result::<(), _>::Err(e).merge(
619
0
                    self.s3_client
620
0
                        .abort_multipart_upload()
621
0
                        .bucket(&self.bucket)
622
0
                        .key(s3_path)
623
0
                        .upload_id(upload_id)
624
0
                        .send()
625
0
                        .await
626
0
                        .map_or_else(
627
0
                            |e| {
628
0
                                let err = make_err!(
629
0
                                    Code::Aborted,
630
                                    "Failed to abort multipart upload in ONTAP S3 store : {e:?}"
631
                                );
632
0
                                event!(Level::INFO, ?err, "Multipart upload error");
633
0
                                Err(err)
634
0
                            },
635
0
                            |_| Ok(()),
636
                        ),
637
                )
638
0
            })
639
            .await
640
2
    }
641
642
    async fn get_part(
643
        self: Pin<&Self>,
644
        key: StoreKey<'_>,
645
        writer: &mut DropCloserWriteHalf,
646
        offset: u64,
647
        length: Option<u64>,
648
4
    ) -> Result<(), Error> {
649
        if is_zero_digest(key.borrow()) {
650
            writer
651
                .send_eof()
652
                .err_tip(|| "Failed to send zero EOF in ONTAP S3 store get_part")?;
653
            return Ok(());
654
        }
655
656
        let s3_path = &self.make_s3_path(&key);
657
        let end_read_byte = length
658
2
            .map_or(Some(None), |length| Some(offset.checked_add(length)))
659
            .err_tip(|| "Integer overflow protection triggered")?;
660
661
        self.retrier
662
3
            .retry(unfold(writer, move |writer| async move {
663
3
                let result = self.s3_client
664
3
                    .get_object()
665
3
                    .bucket(&self.bucket)
666
3
                    .key(s3_path)
667
3
                    .range(format!(
668
3
                        "bytes={}-{}",
669
3
                        offset + writer.get_bytes_written(),
670
3
                        end_read_byte.map_or_else(String::new, |v| 
v2
.
to_string2
())
671
                    ))
672
3
                    .send()
673
3
                    .await;
674
675
3
                match result {
676
3
                    Ok(head_object_output) => {
677
3
                        let mut s3_in_stream = head_object_output.body;
678
3
                        let _bytes_sent = 0;
679
680
7
                        while let Some(
maybe_bytes4
) = s3_in_stream.next().await {
  Branch (680:35): [True: 0, False: 0]
  Branch (680:35): [Folded - Ignored]
  Branch (680:35): [True: 0, False: 0]
  Branch (680:35): [True: 4, False: 3]
681
4
                            match maybe_bytes {
682
4
                                Ok(bytes) => {
683
4
                                    if bytes.is_empty() {
  Branch (683:40): [True: 0, False: 0]
  Branch (683:40): [Folded - Ignored]
  Branch (683:40): [True: 0, False: 0]
  Branch (683:40): [True: 1, False: 3]
684
1
                                        continue;
685
3
                                    }
686
687
                                    // Clone bytes before sending
688
3
                                    let bytes_clone = bytes.clone();
689
690
                                    // More robust sending mechanism
691
3
                                    match writer.send(bytes).await {
692
3
                                        Ok(()) => {
693
3
                                            let _ = bytes_clone.len();
694
3
                                        }
695
0
                                        Err(e) => {
696
0
                                            return Some((
697
0
                                                RetryResult::Err(make_err!(
698
0
                                                    Code::Aborted,
699
0
                                                    "Error sending bytes to consumer in ONTAP S3: {e}"
700
0
                                                )),
701
0
                                                writer,
702
0
                                            ));
703
                                        }
704
                                    }
705
                                }
706
0
                                Err(e) => {
707
0
                                    return Some((
708
0
                                        RetryResult::Retry(make_err!(
709
0
                                            Code::Aborted,
710
0
                                            "Bad bytestream element in ONTAP S3: {e}"
711
0
                                        )),
712
0
                                        writer,
713
0
                                    ));
714
                                }
715
                            }
716
                        }
717
718
                        // EOF handling
719
3
                        if let Err(
e0
) = writer.send_eof() {
  Branch (719:32): [True: 0, False: 0]
  Branch (719:32): [Folded - Ignored]
  Branch (719:32): [True: 0, False: 0]
  Branch (719:32): [True: 0, False: 3]
720
0
                            return Some((
721
0
                                RetryResult::Err(make_err!(
722
0
                                    Code::Aborted,
723
0
                                    "Failed to send EOF to consumer in ONTAP S3: {e}"
724
0
                                )),
725
0
                                writer,
726
0
                            ));
727
3
                        }
728
729
3
                        Some((RetryResult::Ok(()), writer))
730
                    }
731
0
                    Err(sdk_error) => {
732
                        // Clone sdk_error before moving
733
0
                        let error_description = format!("{sdk_error:?}");
734
0
                        match sdk_error.into_service_error() {
735
0
                            GetObjectError::NoSuchKey(e) => {
736
0
                                Some((
737
0
                                    RetryResult::Err(make_err!(
738
0
                                        Code::NotFound,
739
0
                                        "No such key in ONTAP S3: {e}"
740
0
                                    )),
741
0
                                    writer,
742
0
                                ))
743
                            }
744
0
                            _ => Some((
745
0
                                RetryResult::Retry(make_err!(
746
0
                                    Code::Unavailable,
747
0
                                    "Unhandled GetObjectError in ONTAP S3: {error_description}"
748
0
                                )),
749
0
                                writer,
750
0
                            )),
751
                        }
752
                    },
753
                }
754
6
            }))
755
            .await
756
4
    }
757
758
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
759
0
        self
760
0
    }
761
762
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
763
0
        self
764
0
    }
765
766
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
767
0
        self
768
0
    }
769
770
3
    fn register_remove_callback(
771
3
        self: Arc<Self>,
772
3
        callback: Arc<dyn RemoveItemCallback>,
773
3
    ) -> Result<(), Error> {
774
3
        self.remove_callbacks.lock().push(callback);
775
3
        Ok(())
776
3
    }
777
}
778
779
#[async_trait]
780
impl<I, NowFn> HealthStatusIndicator for OntapS3Store<NowFn>
781
where
782
    I: InstantWrapper,
783
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
784
{
785
0
    fn get_name(&self) -> &'static str {
786
0
        "OntapS3Store"
787
0
    }
788
789
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
790
        StoreDriver::check_health(Pin::new(self), namespace).await
791
0
    }
792
}