Coverage Report

Created: 2025-09-18 15:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/ontap_s3_store.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::borrow::Cow;
19
use std::fs::File;
20
use std::io::BufReader;
21
use std::sync::Arc;
22
23
use async_trait::async_trait;
24
use aws_config::BehaviorVersion;
25
use aws_config::default_provider::credentials::DefaultCredentialsChain;
26
use aws_config::provider_config::ProviderConfig;
27
use aws_sdk_s3::Client;
28
use aws_sdk_s3::config::Region;
29
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
30
use aws_sdk_s3::operation::get_object::GetObjectError;
31
use aws_sdk_s3::operation::head_object::HeadObjectError;
32
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
33
use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
34
use base64::Engine;
35
use base64::prelude::BASE64_STANDARD_NO_PAD;
36
use bytes::BytesMut;
37
use futures::future::{Either, FusedFuture};
38
use futures::stream::{FuturesUnordered, unfold};
39
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
40
use hyper_rustls::ConfigBuilderExt;
41
use nativelink_config::stores::ExperimentalOntapS3Spec;
42
use nativelink_error::{Code, Error, ResultExt, make_err};
43
use nativelink_metric::MetricsComponent;
44
use nativelink_util::buf_channel::{
45
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
46
};
47
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
48
use nativelink_util::instant_wrapper::InstantWrapper;
49
use nativelink_util::retry::{Retrier, RetryResult};
50
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
51
use rustls::{ClientConfig, RootCertStore};
52
use rustls_pemfile::certs;
53
use sha2::{Digest, Sha256};
54
use tokio::time::sleep;
55
use tracing::{Level, event, warn};
56
57
use crate::cas_utils::is_zero_digest;
58
use crate::common_s3_utils::TlsClient;
59
60
// S3 parts cannot be smaller than this number
61
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB
62
63
// S3 parts cannot be larger than this number
64
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB
65
66
// S3 parts cannot be more than this number
67
const MAX_UPLOAD_PARTS: usize = 10_000;
68
69
// Default max buffer size for retrying upload requests
70
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 20 * 1024 * 1024; // 20MB
71
72
// Default limit for concurrent part uploads per multipart upload
73
const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;
74
75
#[derive(Debug, MetricsComponent)]
76
pub struct OntapS3Store<NowFn> {
77
    s3_client: Arc<Client>,
78
    now_fn: NowFn,
79
    #[metric(help = "The bucket name for the ONTAP S3 store")]
80
    bucket: String,
81
    #[metric(help = "The key prefix for the ONTAP S3 store")]
82
    key_prefix: String,
83
    retrier: Retrier,
84
    #[metric(help = "The number of seconds to consider an object expired")]
85
    consider_expired_after_s: i64,
86
    #[metric(help = "The number of bytes to buffer for retrying requests")]
87
    max_retry_buffer_per_request: usize,
88
    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
89
    multipart_max_concurrent_uploads: usize,
90
}
91
92
0
pub fn load_custom_certs(cert_path: &str) -> Result<Arc<ClientConfig>, Error> {
93
0
    let mut root_store = RootCertStore::empty();
94
95
    // Create a BufReader from the cert file
96
0
    let mut cert_reader = BufReader::new(
97
0
        File::open(cert_path)
98
0
            .map_err(|e| make_err!(Code::Internal, "Failed to open CA certificate file: {e:?}"))?,
99
    );
100
101
    // Parse certificates
102
0
    let certs = certs(&mut cert_reader)
103
0
        .collect::<Result<Vec<_>, _>>()
104
0
        .map_err(|e| make_err!(Code::Internal, "Failed to parse certificates: {e:?}"))?;
105
106
    // Add each certificate to the root store
107
0
    for cert in certs {
108
0
        root_store.add(cert).map_err(|e| {
109
0
            make_err!(
110
0
                Code::Internal,
111
                "Failed to add certificate to root store: {e:?}"
112
            )
113
0
        })?;
114
    }
115
116
    // Build the client config with the root store
117
0
    let config = ClientConfig::builder()
118
0
        .with_root_certificates(root_store)
119
0
        .with_no_client_auth();
120
121
0
    Ok(Arc::new(config))
122
0
}
123
124
impl<I, NowFn> OntapS3Store<NowFn>
125
where
126
    I: InstantWrapper,
127
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
128
{
129
3
    pub async fn new(spec: &ExperimentalOntapS3Spec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
130
        // Load custom CA config
131
3
        let ca_config = if let Some(
cert_path0
) = &spec.root_certificates {
  Branch (131:32): [True: 0, False: 3]
  Branch (131:32): [Folded - Ignored]
132
0
            load_custom_certs(cert_path)?
133
        } else {
134
3
            Arc::new(
135
3
                ClientConfig::builder()
136
3
                    .with_native_roots()
?0
137
3
                    .with_no_client_auth(),
138
            )
139
        };
140
141
3
        let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
142
3
            .with_tls_config((*ca_config).clone())
143
3
            .https_only()
144
3
            .enable_http1()
145
3
            .enable_http2()
146
3
            .build();
147
148
3
        let http_client = TlsClient::with_https_connector(&spec.common, https_connector);
149
150
3
        let credentials_provider = DefaultCredentialsChain::builder()
151
3
            .configure(
152
3
                ProviderConfig::without_region()
153
3
                    .with_region(Some(Region::new(Cow::Owned(spec.vserver_name.clone()))))
154
3
                    .with_http_client(http_client.clone()),
155
3
            )
156
3
            .build()
157
3
            .await;
158
159
3
        let config = aws_sdk_s3::Config::builder()
160
3
            .credentials_provider(credentials_provider)
161
3
            .endpoint_url(&spec.endpoint)
162
3
            .region(Region::new(spec.vserver_name.clone()))
163
3
            .app_name(aws_config::AppName::new("nativelink").expect("valid app name"))
164
3
            .http_client(http_client)
165
3
            .force_path_style(true)
166
3
            .behavior_version(BehaviorVersion::v2025_01_17())
167
3
            .timeout_config(
168
3
                aws_config::timeout::TimeoutConfig::builder()
169
3
                    .connect_timeout(Duration::from_secs(30))
170
3
                    .operation_timeout(Duration::from_secs(120))
171
3
                    .build(),
172
            )
173
3
            .build();
174
175
3
        let s3_client = Client::from_conf(config);
176
177
3
        Self::new_with_client_and_jitter(
178
3
            spec,
179
3
            s3_client,
180
3
            spec.common.retry.make_jitter_fn(),
181
3
            now_fn,
182
        )
183
3
    }
184
185
15
    pub fn new_with_client_and_jitter(
186
15
        spec: &ExperimentalOntapS3Spec,
187
15
        s3_client: Client,
188
15
        jitter_fn: Arc<dyn (Fn(Duration) -> Duration) + Send + Sync>,
189
15
        now_fn: NowFn,
190
15
    ) -> Result<Arc<Self>, Error> {
191
15
        Ok(Arc::new(Self {
192
15
            s3_client: Arc::new(s3_client),
193
15
            now_fn,
194
15
            bucket: spec.bucket.clone(),
195
15
            key_prefix: spec
196
15
                .common
197
15
                .key_prefix
198
15
                .as_ref()
199
15
                .unwrap_or(&String::new())
200
15
                .clone(),
201
15
            retrier: Retrier::new(
202
15
                Arc::new(|duration| 
Box::pin3
(
sleep3
(
duration3
))),
203
15
                jitter_fn,
204
15
                spec.common.retry.clone(),
205
            ),
206
15
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
207
15
            max_retry_buffer_per_request: spec
208
15
                .common
209
15
                .max_retry_buffer_per_request
210
15
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
211
15
            multipart_max_concurrent_uploads: spec
212
15
                .common
213
15
                .multipart_max_concurrent_uploads
214
15
                .unwrap_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS),
215
        }))
216
15
    }
217
218
17
    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
219
17
        format!("{}{}", self.key_prefix, key.as_str())
220
17
    }
221
222
9
    async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
223
9
        self.retrier
224
12
            .
retry9
(
unfold9
(
()9
, move |state| async move {
225
12
                let result = self
226
12
                    .s3_client
227
12
                    .head_object()
228
12
                    .bucket(&self.bucket)
229
12
                    .key(self.make_s3_path(&digest.borrow()))
230
12
                    .send()
231
12
                    .await;
232
233
12
                match result {
234
8
                    Ok(head_object_output) => {
235
8
                        if self.consider_expired_after_s != 0 {
  Branch (235:28): [True: 0, False: 0]
  Branch (235:28): [Folded - Ignored]
  Branch (235:28): [True: 0, False: 5]
  Branch (235:28): [True: 2, False: 1]
236
2
                            if let Some(last_modified) = head_object_output.last_modified {
  Branch (236:36): [True: 0, False: 0]
  Branch (236:36): [Folded - Ignored]
  Branch (236:36): [True: 0, False: 0]
  Branch (236:36): [True: 2, False: 0]
237
2
                                let now_s = (self.now_fn)().unix_timestamp() as i64;
238
2
                                if last_modified.secs() + self.consider_expired_after_s <= now_s {
  Branch (238:36): [True: 0, False: 0]
  Branch (238:36): [Folded - Ignored]
  Branch (238:36): [True: 0, False: 0]
  Branch (238:36): [True: 1, False: 1]
239
1
                                    return Some((RetryResult::Ok(None), state));
240
1
                                }
241
0
                            }
242
6
                        }
243
7
                        let Some(length) = head_object_output.content_length else {
  Branch (243:29): [True: 0, False: 0]
  Branch (243:29): [Folded - Ignored]
  Branch (243:29): [True: 5, False: 0]
  Branch (243:29): [True: 2, False: 0]
244
0
                            return Some((RetryResult::Ok(None), state));
245
                        };
246
7
                        if length >= 0 {
  Branch (246:28): [True: 0, False: 0]
  Branch (246:28): [Folded - Ignored]
  Branch (246:28): [True: 5, False: 0]
  Branch (246:28): [True: 2, False: 0]
247
7
                            return Some((RetryResult::Ok(Some(length as u64)), state));
248
0
                        }
249
0
                        Some((
250
0
                            RetryResult::Err(make_err!(
251
0
                                Code::InvalidArgument,
252
0
                                "Negative content length in ONTAP S3: {length:?}"
253
0
                            )),
254
0
                            state,
255
0
                        ))
256
                    }
257
4
                    Err(sdk_error) => match sdk_error.into_service_error() {
258
1
                        HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
259
3
                        other => Some((
260
3
                            RetryResult::Retry(make_err!(
261
3
                                Code::Unavailable,
262
3
                                "Unhandled HeadObjectError in ONTAP S3: {other:?}"
263
3
                            )),
264
3
                            state,
265
3
                        )),
266
                    },
267
                }
268
24
            }))
269
9
            .await
270
9
    }
271
}
272
273
#[async_trait]
274
impl<I, NowFn> StoreDriver for OntapS3Store<NowFn>
275
where
276
    I: InstantWrapper,
277
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
278
{
279
    async fn has_with_results(
280
        self: Pin<&Self>,
281
        keys: &[StoreKey<'_>],
282
        results: &mut [Option<u64>],
283
20
    ) -> Result<(), Error> {
284
10
        keys.iter()
285
10
            .zip(results.iter_mut())
286
10
            .map(|(key, result)| async move {
287
10
                if is_zero_digest(key.borrow()) {
  Branch (287:20): [True: 0, False: 0]
  Branch (287:20): [Folded - Ignored]
  Branch (287:20): [True: 0, False: 5]
  Branch (287:20): [True: 1, False: 4]
288
1
                    *result = Some(0);
289
1
                    return Ok::<_, Error>(());
290
9
                }
291
292
9
                match self.has(key).await {
293
9
                    Ok(size) => {
294
9
                        *result = size;
295
9
                        if size.is_none() {
  Branch (295:28): [True: 0, False: 0]
  Branch (295:28): [Folded - Ignored]
  Branch (295:28): [True: 0, False: 5]
  Branch (295:28): [True: 2, False: 2]
296
2
                            event!(
297
2
                                Level::INFO,
298
2
                                key = %key.as_str(),
299
2
                                "Object not found in ONTAP S3"
300
                            );
301
7
                        }
302
9
                        Ok(())
303
                    }
304
0
                    Err(err) => {
305
0
                        event!(
306
0
                            Level::ERROR,
307
0
                            key = %key.as_str(),
308
                            error = ?err,
309
0
                            "Error checking object existence"
310
                        );
311
0
                        Err(err)
312
                    }
313
                }
314
20
            })
315
10
            .collect::<FuturesUnordered<_>>()
316
10
            .try_collect()
317
10
            .await
318
20
    }
319
320
    async fn update(
321
        self: Pin<&Self>,
322
        key: StoreKey<'_>,
323
        mut reader: DropCloserReadHalf,
324
        size_info: UploadSizeInfo,
325
4
    ) -> Result<(), Error> {
326
2
        let s3_path = &self.make_s3_path(&key);
327
328
2
        let max_size = match size_info {
329
2
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(
sz0
) => sz,
330
        };
331
332
        // For small files, use simple upload. For larger files or unknown size, use multipart
333
2
        if max_size < MIN_MULTIPART_SIZE && 
matches!0
(
size_info1
, UploadSizeInfo::ExactSize(_)) {
  Branch (333:12): [True: 0, False: 0]
  Branch (333:12): [Folded - Ignored]
  Branch (333:12): [True: 0, False: 0]
  Branch (333:12): [True: 1, False: 1]
334
1
            let UploadSizeInfo::ExactSize(sz) = size_info else {
  Branch (334:17): [True: 0, False: 0]
  Branch (334:17): [Folded - Ignored]
  Branch (334:17): [True: 0, False: 0]
  Branch (334:17): [True: 1, False: 0]
335
0
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
336
            };
337
1
            reader.set_max_recent_data_size(
338
1
                u64::try_from(self.max_retry_buffer_per_request)
339
1
                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")
?0
,
340
            );
341
1
            return self.retrier.retry(
342
1
                unfold(reader, move |mut reader| async move {
343
1
                    let (mut tx, mut rx) = make_buf_channel_pair();
344
345
1
                    let result = {
346
1
                        let reader_ref = &mut reader;
347
1
                        let (upload_res, bind_res): (Result<(), Error>, Result<(), Error>) = tokio::join!(async move {
348
1
                            let raw_body_bytes = {
349
1
                                let mut raw_body_chunks = BytesMut::new();
350
                                loop {
351
51
                                    match rx.recv().await {
352
51
                                        Ok(chunk) => {
353
51
                                            if chunk.is_empty() {
  Branch (353:48): [True: 0, False: 0]
  Branch (353:48): [Folded - Ignored]
  Branch (353:48): [True: 0, False: 0]
  Branch (353:48): [True: 1, False: 50]
354
1
                                                break Ok(raw_body_chunks.freeze());
355
50
                                            }
356
50
                                            raw_body_chunks.extend_from_slice(&chunk);
357
                                        }
358
0
                                        Err(err) => {
359
0
                                            break Err(err);
360
                                        }
361
                                    }
362
                                }
363
                            };
364
1
                            let internal_res = match raw_body_bytes {
365
1
                                Ok(body_bytes) => {
366
1
                                    let hash = Sha256::digest(&body_bytes);
367
1
                                    let send_res = self.s3_client
368
1
                                        .put_object()
369
1
                                        .bucket(&self.bucket)
370
1
                                        .key(s3_path.clone())
371
1
                                        .content_length(sz as i64)
372
1
                                        .body(
373
1
                                            ByteStream::from(body_bytes)
374
                                        )
375
1
                                        .set_checksum_algorithm(Some(aws_sdk_s3::types::ChecksumAlgorithm::Sha256))
376
1
                                        .set_checksum_sha256(Some(BASE64_STANDARD_NO_PAD.encode(hash)))
377
1
                                        .customize()
378
1
                                        .mutate_request(|req| {req.headers_mut().insert("x-amz-content-sha256", "UNSIGNED-PAYLOAD");})
379
1
                                        .send();
380
1
                                    Either::Left(send_res.map_ok_or_else(|e| Err(
make_err!0
(
Code::Aborted0
, "{e:?}")), |_| Ok(())))
381
                                    }
382
0
                                Err(collect_err) => {
383
0
                                    async fn make_collect_err(collect_err: Error) -> Result<(), Error> {
384
0
                                        Err(collect_err)
385
0
                                    }
386
387
0
                                    warn!(
388
                                        ?collect_err,
389
0
                                        "Failed to get body");
390
0
                                    let future_err = make_collect_err(collect_err);
391
0
                                    Either::Right(future_err)
392
                                }
393
                            };
394
1
                            internal_res.await
395
1
                        },
396
1
                            tx.bind_buffered(reader_ref)
397
                        );
398
1
                        upload_res
399
1
                            .merge(bind_res)
400
1
                            .err_tip(|| "Failed to upload file to ONTAP S3 in single chunk")
401
                    };
402
403
1
                    let retry_result = result.map_or_else(
404
0
                        |mut err| {
405
0
                            err.code = Code::Aborted;
406
0
                            let bytes_received = reader.get_bytes_received();
407
0
                            if let Err(try_reset_err) = reader.try_reset_stream() {
  Branch (407:36): [True: 0, False: 0]
  Branch (407:36): [Folded - Ignored]
  Branch (407:36): [True: 0, False: 0]
  Branch (407:36): [True: 0, False: 0]
408
0
                                event!(
409
0
                                    Level::ERROR,
410
                                    ?bytes_received,
411
                                    err = ?try_reset_err,
412
0
                                    "Unable to reset stream after failed upload in OntapS3Store::update"
413
                                );
414
0
                                return RetryResult::Err(
415
0
                                    err
416
0
                                        .merge(try_reset_err)
417
0
                                        .append(
418
0
                                            format!(
419
0
                                                "Failed to retry upload with {bytes_received} bytes received in OntapS3Store::update"
420
0
                                            )
421
0
                                        )
422
0
                                );
423
0
                            }
424
0
                            let err = err.append(
425
0
                                format!(
426
0
                                    "Retry on upload happened with {bytes_received} bytes received in OntapS3Store::update"
427
                                )
428
                            );
429
0
                            event!(Level::INFO, ?err, ?bytes_received, "Retryable ONTAP S3 error");
430
0
                            RetryResult::Retry(err)
431
0
                        },
432
1
                        |()| RetryResult::Ok(())
433
                    );
434
1
                    Some((retry_result, reader))
435
2
                })
436
1
            ).await;
437
1
        }
438
439
        // Handle multipart upload for large files
440
1
        let upload_id = &self
441
1
            .retrier
442
1
            .retry(unfold((), move |()| async move {
443
1
                let retry_result = self
444
1
                    .s3_client
445
1
                    .create_multipart_upload()
446
1
                    .bucket(&self.bucket)
447
1
                    .key(s3_path)
448
1
                    .send()
449
1
                    .await
450
1
                    .map_or_else(
451
0
                        |e| {
452
0
                            RetryResult::Retry(make_err!(
453
0
                                Code::Aborted,
454
0
                                "Failed to create multipart upload to ONTAP S3: {e:?}"
455
0
                            ))
456
0
                        },
457
1
                        |CreateMultipartUploadOutput { upload_id, .. }| {
458
1
                            upload_id.map_or_else(
459
0
                                || {
460
0
                                    RetryResult::Err(make_err!(
461
0
                                        Code::Internal,
462
0
                                        "Expected upload_id to be set by ONTAP S3 response"
463
0
                                    ))
464
0
                                },
465
                                RetryResult::Ok,
466
                            )
467
1
                        },
468
                    );
469
1
                Some((retry_result, ()))
470
2
            }))
471
1
            .await
?0
;
472
473
1
        let bytes_per_upload_part =
474
1
            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
475
476
1
        let upload_parts = move || async move {
477
1
            let (tx, mut rx) = tokio::sync::mpsc::channel(self.multipart_max_concurrent_uploads);
478
479
1
            let read_stream_fut = (
480
1
                async move {
481
1
                    let retrier = &Pin::get_ref(self).retrier;
482
4
                    for part_number in 1..i32::MAX {
483
4
                        let write_buf = reader
484
4
                            .consume(
485
                                Some(
486
4
                                    usize
487
4
                                        ::try_from(bytes_per_upload_part)
488
4
                                        .err_tip(
489
                                            || "Could not convert bytes_per_upload_part to usize"
490
0
                                        )?
491
                                )
492
4
                            ).await
493
4
                            .err_tip(|| "Failed to read chunk in ontap_s3_store")
?0
;
494
4
                        if write_buf.is_empty() {
  Branch (494:28): [True: 0, False: 0]
  Branch (494:28): [Folded - Ignored]
  Branch (494:28): [True: 0, False: 0]
  Branch (494:28): [True: 1, False: 3]
495
1
                            break;
496
3
                        }
497
498
3
                        tx
499
3
                            .send(
500
3
                                retrier.retry(
501
3
                                    unfold(write_buf, move |write_buf| async move {
502
3
                                        let retry_result = self.s3_client
503
3
                                            .upload_part()
504
3
                                            .bucket(&self.bucket)
505
3
                                            .key(s3_path)
506
3
                                            .upload_id(upload_id)
507
3
                                            .body(ByteStream::new(SdkBody::from(write_buf.clone())))
508
3
                                            .part_number(part_number)
509
3
                                            .send().await
510
3
                                            .map_or_else(
511
0
                                                |e| {
512
0
                                                    RetryResult::Retry(
513
0
                                                        make_err!(
514
0
                                                            Code::Aborted,
515
0
                                                            "Failed to upload part {part_number} in ONTAP S3 store: {e:?}"
516
0
                                                        )
517
0
                                                    )
518
0
                                                },
519
3
                                                |mut response| {
520
3
                                                    RetryResult::Ok(
521
3
                                                        CompletedPartBuilder::default()
522
3
                                                            .set_e_tag(response.e_tag.take())
523
3
                                                            .part_number(part_number)
524
3
                                                            .build()
525
3
                                                    )
526
3
                                                }
527
                                            );
528
3
                                        Some((retry_result, write_buf))
529
6
                                    })
530
                                )
531
3
                            ).await
532
3
                            .map_err(|_| 
{0
533
0
                                make_err!(
534
0
                                    Code::Internal,
535
                                    "Failed to send part to channel in ontap_s3_store"
536
                                )
537
0
                            })?;
538
                    }
539
1
                    Result::<_, Error>::Ok(())
540
                }
541
1
            ).fuse();
542
543
1
            let mut upload_futures = FuturesUnordered::new();
544
1
            let mut completed_parts = Vec::with_capacity(
545
1
                usize::try_from(cmp::min(
546
1
                    MAX_UPLOAD_PARTS as u64,
547
1
                    max_size / bytes_per_upload_part + 1,
548
                ))
549
1
                .err_tip(|| "Could not convert u64 to usize")
?0
,
550
            );
551
552
1
            tokio::pin!(read_stream_fut);
553
            loop {
554
8
                if read_stream_fut.is_terminated() && 
rx7
.
is_empty7
() &&
upload_futures3
.
is_empty3
() {
  Branch (554:20): [True: 0, False: 0]
  Branch (554:55): [True: 0, False: 0]
  Branch (554:72): [True: 0, False: 0]
  Branch (554:20): [Folded - Ignored]
  Branch (554:55): [Folded - Ignored]
  Branch (554:72): [Folded - Ignored]
  Branch (554:20): [True: 0, False: 0]
  Branch (554:55): [True: 0, False: 0]
  Branch (554:72): [True: 0, False: 0]
  Branch (554:20): [True: 7, False: 1]
  Branch (554:55): [True: 3, False: 4]
  Branch (554:72): [True: 1, False: 2]
555
1
                    break;
556
7
                }
557
7
                tokio::select! {
558
7
                    
result1
= &mut read_stream_fut =>
result1
?0
,
559
7
                    Some(
upload_result3
) = upload_futures.next() =>
completed_parts3
.
push3
(
upload_result3
?0
),
560
7
                    Some(
fut3
) = rx.recv() =>
upload_futures3
.
push3
(
fut3
),
561
                }
562
            }
563
564
1
            completed_parts.sort_unstable_by_key(|part| part.part_number);
565
566
1
            self.retrier.retry(
567
1
                unfold(completed_parts, move |completed_parts| async move {
568
                    Some((
569
1
                        self.s3_client
570
1
                            .complete_multipart_upload()
571
1
                            .bucket(&self.bucket)
572
1
                            .key(s3_path)
573
1
                            .multipart_upload(
574
1
                                CompletedMultipartUploadBuilder::default()
575
1
                                    .set_parts(Some(completed_parts.clone()))
576
1
                                    .build()
577
1
                            )
578
1
                            .upload_id(upload_id)
579
1
                            .send().await
580
1
                            .map_or_else(
581
0
                                |e| {
582
0
                                    RetryResult::Retry(
583
0
                                        make_err!(
584
0
                                            Code::Aborted,
585
0
                                            "Failed to complete multipart upload in ONTAP S3 store: {e:?}"
586
0
                                        )
587
0
                                    )
588
0
                                },
589
1
                                |_| RetryResult::Ok(())
590
                            ),
591
1
                        completed_parts,
592
                    ))
593
2
                })
594
1
            ).await
595
2
        };
596
597
1
        upload_parts()
598
1
            .or_else(move |e| async move 
{0
599
0
                Result::<(), _>::Err(e).merge(
600
0
                    self.s3_client
601
0
                        .abort_multipart_upload()
602
0
                        .bucket(&self.bucket)
603
0
                        .key(s3_path)
604
0
                        .upload_id(upload_id)
605
0
                        .send()
606
0
                        .await
607
0
                        .map_or_else(
608
0
                            |e| {
609
0
                                let err = make_err!(
610
0
                                    Code::Aborted,
611
                                    "Failed to abort multipart upload in ONTAP S3 store : {e:?}"
612
                                );
613
0
                                event!(Level::INFO, ?err, "Multipart upload error");
614
0
                                Err(err)
615
0
                            },
616
0
                            |_| Ok(()),
617
                        ),
618
                )
619
0
            })
620
1
            .await
621
4
    }
622
623
    async fn get_part(
624
        self: Pin<&Self>,
625
        key: StoreKey<'_>,
626
        writer: &mut DropCloserWriteHalf,
627
        offset: u64,
628
        length: Option<u64>,
629
8
    ) -> Result<(), Error> {
630
4
        if is_zero_digest(key.borrow()) {
  Branch (630:12): [True: 0, False: 0]
  Branch (630:12): [Folded - Ignored]
  Branch (630:12): [True: 0, False: 0]
  Branch (630:12): [True: 1, False: 3]
631
1
            writer
632
1
                .send_eof()
633
1
                .err_tip(|| "Failed to send zero EOF in ONTAP S3 store get_part")
?0
;
634
1
            return Ok(());
635
3
        }
636
637
3
        let s3_path = &self.make_s3_path(&key);
638
3
        let end_read_byte = length
639
3
            .map_or(Some(None), |length| Some(
offset2
.
checked_add2
(
length2
)))
640
3
            .err_tip(|| "Integer overflow protection triggered")
?0
;
641
642
3
        self.retrier
643
3
            .retry(unfold(writer, move |writer| async move {
644
3
                let result = self.s3_client
645
3
                    .get_object()
646
3
                    .bucket(&self.bucket)
647
3
                    .key(s3_path)
648
3
                    .range(format!(
649
3
                        "bytes={}-{}",
650
3
                        offset + writer.get_bytes_written(),
651
3
                        end_read_byte.map_or_else(String::new, |v| 
v2
.
to_string2
())
652
                    ))
653
3
                    .send()
654
3
                    .await;
655
656
3
                match result {
657
3
                    Ok(head_object_output) => {
658
3
                        let mut s3_in_stream = head_object_output.body;
659
3
                        let _bytes_sent = 0;
660
661
7
                        while let Some(
maybe_bytes4
) = s3_in_stream.next().await {
  Branch (661:35): [True: 0, False: 0]
  Branch (661:35): [Folded - Ignored]
  Branch (661:35): [True: 0, False: 0]
  Branch (661:35): [True: 4, False: 3]
662
4
                            match maybe_bytes {
663
4
                                Ok(bytes) => {
664
4
                                    if bytes.is_empty() {
  Branch (664:40): [True: 0, False: 0]
  Branch (664:40): [Folded - Ignored]
  Branch (664:40): [True: 0, False: 0]
  Branch (664:40): [True: 1, False: 3]
665
1
                                        continue;
666
3
                                    }
667
668
                                    // Clone bytes before sending
669
3
                                    let bytes_clone = bytes.clone();
670
671
                                    // More robust sending mechanism
672
3
                                    match writer.send(bytes).await {
673
3
                                        Ok(()) => {
674
3
                                            let _ = bytes_clone.len();
675
3
                                        }
676
0
                                        Err(e) => {
677
0
                                            return Some((
678
0
                                                RetryResult::Err(make_err!(
679
0
                                                    Code::Aborted,
680
0
                                                    "Error sending bytes to consumer in ONTAP S3: {e}"
681
0
                                                )),
682
0
                                                writer,
683
0
                                            ));
684
                                        }
685
                                    }
686
                                }
687
0
                                Err(e) => {
688
0
                                    return Some((
689
0
                                        RetryResult::Retry(make_err!(
690
0
                                            Code::Aborted,
691
0
                                            "Bad bytestream element in ONTAP S3: {e}"
692
0
                                        )),
693
0
                                        writer,
694
0
                                    ));
695
                                }
696
                            }
697
                        }
698
699
                        // EOF handling
700
3
                        if let Err(
e0
) = writer.send_eof() {
  Branch (700:32): [True: 0, False: 0]
  Branch (700:32): [Folded - Ignored]
  Branch (700:32): [True: 0, False: 0]
  Branch (700:32): [True: 0, False: 3]
701
0
                            return Some((
702
0
                                RetryResult::Err(make_err!(
703
0
                                    Code::Aborted,
704
0
                                    "Failed to send EOF to consumer in ONTAP S3: {e}"
705
0
                                )),
706
0
                                writer,
707
0
                            ));
708
3
                        }
709
710
3
                        Some((RetryResult::Ok(()), writer))
711
                    }
712
0
                    Err(sdk_error) => {
713
                        // Clone sdk_error before moving
714
0
                        let error_description = format!("{sdk_error:?}");
715
0
                        match sdk_error.into_service_error() {
716
0
                            GetObjectError::NoSuchKey(e) => {
717
0
                                Some((
718
0
                                    RetryResult::Err(make_err!(
719
0
                                        Code::NotFound,
720
0
                                        "No such key in ONTAP S3: {e}"
721
0
                                    )),
722
0
                                    writer,
723
0
                                ))
724
                            }
725
0
                            _ => Some((
726
0
                                RetryResult::Retry(make_err!(
727
0
                                    Code::Unavailable,
728
0
                                    "Unhandled GetObjectError in ONTAP S3: {error_description}"
729
0
                                )),
730
0
                                writer,
731
0
                            )),
732
                        }
733
                    },
734
                }
735
6
            }))
736
3
            .await
737
8
    }
738
739
0
    /// Returns this store itself as the `StoreDriver`; the optional digest
    /// argument is ignored.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
742
743
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
744
0
        self
745
0
    }
746
747
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
748
0
        self
749
0
    }
750
}
751
752
#[async_trait]
753
impl<I, NowFn> HealthStatusIndicator for OntapS3Store<NowFn>
754
where
755
    I: InstantWrapper,
756
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
757
{
758
0
    fn get_name(&self) -> &'static str {
759
0
        "OntapS3Store"
760
0
    }
761
762
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
763
0
        StoreDriver::check_health(Pin::new(self), namespace).await
764
0
    }
765
}