Coverage Report

Created: 2026-04-14 11:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/ontap_s3_store.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::borrow::Cow;
19
use std::fs::File;
20
use std::io::BufReader;
21
use std::sync::Arc;
22
23
use async_trait::async_trait;
24
use aws_config::BehaviorVersion;
25
use aws_config::default_provider::credentials::DefaultCredentialsChain;
26
use aws_config::provider_config::ProviderConfig;
27
use aws_sdk_s3::Client;
28
use aws_sdk_s3::config::Region;
29
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
30
use aws_sdk_s3::operation::get_object::GetObjectError;
31
use aws_sdk_s3::operation::head_object::HeadObjectError;
32
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
33
use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
34
use base64::Engine;
35
use base64::prelude::BASE64_STANDARD_NO_PAD;
36
use bytes::BytesMut;
37
use futures::future::{Either, FusedFuture};
38
use futures::stream::{FuturesUnordered, unfold};
39
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
40
use hyper_rustls::ConfigBuilderExt;
41
use nativelink_config::stores::ExperimentalOntapS3Spec;
42
use nativelink_error::{Code, Error, ResultExt, make_err};
43
use nativelink_metric::MetricsComponent;
44
use nativelink_util::buf_channel::{
45
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
46
};
47
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
48
use nativelink_util::instant_wrapper::InstantWrapper;
49
use nativelink_util::retry::{Retrier, RetryResult};
50
use nativelink_util::store_trait::{RemoveItemCallback, StoreDriver, StoreKey, UploadSizeInfo};
51
use parking_lot::Mutex;
52
use rustls::{ClientConfig, RootCertStore};
53
use rustls_pki_types::CertificateDer;
54
use rustls_pki_types::pem::PemObject;
55
use sha2::{Digest, Sha256};
56
use tokio::time::sleep;
57
use tracing::{Level, event, warn};
58
59
use crate::cas_utils::is_zero_digest;
60
use crate::common_s3_utils::TlsClient;
61
62
// S3 parts cannot be smaller than this number
63
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB
64
65
// S3 parts cannot be larger than this number
66
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB
67
68
// S3 parts cannot be more than this number
69
const MAX_UPLOAD_PARTS: usize = 10_000;
70
71
// Default max buffer size for retrying upload requests
72
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 20 * 1024 * 1024; // 20MB
73
74
// Default limit for concurrent part uploads per multipart upload
75
const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;
76
77
type RemoveCallback = Arc<dyn RemoveItemCallback>;
78
79
#[derive(Debug, MetricsComponent)]
80
pub struct OntapS3Store<NowFn> {
81
    s3_client: Arc<Client>,
82
    now_fn: NowFn,
83
    #[metric(help = "The bucket name for the ONTAP S3 store")]
84
    bucket: String,
85
    #[metric(help = "The key prefix for the ONTAP S3 store")]
86
    key_prefix: String,
87
    retrier: Retrier,
88
    #[metric(help = "The number of seconds to consider an object expired")]
89
    consider_expired_after_s: i64,
90
    #[metric(help = "The number of bytes to buffer for retrying requests")]
91
    max_retry_buffer_per_request: usize,
92
    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
93
    multipart_max_concurrent_uploads: usize,
94
95
    remove_callbacks: Mutex<Vec<RemoveCallback>>,
96
}
97
98
0
pub fn load_custom_certs(cert_path: &str) -> Result<Arc<ClientConfig>, Error> {
99
0
    let mut root_store = RootCertStore::empty();
100
101
    // Create a BufReader from the cert file
102
0
    let mut cert_reader = BufReader::new(
103
0
        File::open(cert_path)
104
0
            .err_tip(|| format!("Failed to open CA certificate file {cert_path}"))?,
105
    );
106
107
    // Parse certificates
108
0
    let certs = CertificateDer::pem_reader_iter(&mut cert_reader).collect::<Result<Vec<_>, _>>()?;
109
110
    // Add each certificate to the root store
111
0
    for cert in certs {
112
0
        root_store.add(cert).map_err(|e| {
113
0
            Error::from_std_err(Code::Internal, &e)
114
0
                .append("Failed to add certificate to root store")
115
0
        })?;
116
    }
117
118
    // Build the client config with the root store
119
0
    let config = ClientConfig::builder()
120
0
        .with_root_certificates(root_store)
121
0
        .with_no_client_auth();
122
123
0
    Ok(Arc::new(config))
124
0
}
125
126
impl<I, NowFn> OntapS3Store<NowFn>
127
where
128
    I: InstantWrapper,
129
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
130
{
131
3
    pub async fn new(spec: &ExperimentalOntapS3Spec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
132
        // Load custom CA config
133
3
        let ca_config = if let Some(
cert_path0
) = &spec.root_certificates {
134
0
            load_custom_certs(cert_path)?
135
        } else {
136
3
            Arc::new(
137
3
                ClientConfig::builder()
138
3
                    .with_native_roots()
?0
139
3
                    .with_no_client_auth(),
140
            )
141
        };
142
143
3
        let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
144
3
            .with_tls_config((*ca_config).clone())
145
3
            .https_only()
146
3
            .enable_http1()
147
3
            .enable_http2()
148
3
            .build();
149
150
3
        let http_client = TlsClient::with_https_connector(&spec.common, https_connector);
151
152
3
        let credentials_provider = DefaultCredentialsChain::builder()
153
3
            .configure(
154
3
                ProviderConfig::without_region()
155
3
                    .with_region(Some(Region::new(Cow::Owned(spec.vserver_name.clone()))))
156
3
                    .with_http_client(http_client.clone()),
157
3
            )
158
3
            .build()
159
3
            .await;
160
161
3
        let config = aws_sdk_s3::Config::builder()
162
3
            .credentials_provider(credentials_provider)
163
3
            .endpoint_url(&spec.endpoint)
164
3
            .region(Region::new(spec.vserver_name.clone()))
165
3
            .app_name(aws_config::AppName::new("nativelink").expect("valid app name"))
166
3
            .http_client(http_client)
167
3
            .force_path_style(true)
168
3
            .behavior_version(BehaviorVersion::latest())
169
3
            .timeout_config(
170
3
                aws_config::timeout::TimeoutConfig::builder()
171
3
                    .connect_timeout(Duration::from_secs(30))
172
3
                    .operation_timeout(Duration::from_mins(2))
173
3
                    .build(),
174
            )
175
3
            .build();
176
177
3
        let s3_client = Client::from_conf(config);
178
179
3
        Self::new_with_client_and_jitter(
180
3
            spec,
181
3
            s3_client,
182
3
            spec.common.retry.make_jitter_fn(),
183
3
            now_fn,
184
        )
185
3
    }
186
187
15
    pub fn new_with_client_and_jitter(
188
15
        spec: &ExperimentalOntapS3Spec,
189
15
        s3_client: Client,
190
15
        jitter_fn: Arc<dyn (Fn(Duration) -> Duration) + Send + Sync>,
191
15
        now_fn: NowFn,
192
15
    ) -> Result<Arc<Self>, Error> {
193
15
        Ok(Arc::new(Self {
194
15
            s3_client: Arc::new(s3_client),
195
15
            now_fn,
196
15
            bucket: spec.bucket.clone(),
197
15
            key_prefix: spec
198
15
                .common
199
15
                .key_prefix
200
15
                .as_ref()
201
15
                .unwrap_or(&String::new())
202
15
                .clone(),
203
15
            retrier: Retrier::new(
204
15
                Arc::new(|duration| 
Box::pin1
(
sleep1
(
duration1
))),
205
15
                jitter_fn,
206
15
                spec.common.retry.clone(),
207
            ),
208
15
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
209
15
            max_retry_buffer_per_request: spec
210
15
                .common
211
15
                .max_retry_buffer_per_request
212
15
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
213
15
            multipart_max_concurrent_uploads: spec
214
15
                .common
215
15
                .multipart_max_concurrent_uploads
216
15
                .unwrap_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS),
217
15
            remove_callbacks: Mutex::new(vec![]),
218
        }))
219
15
    }
220
221
15
    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
222
15
        format!("{}{}", self.key_prefix, key.as_str())
223
15
    }
224
225
9
    async fn has(self: Pin<&Self>, digest: StoreKey<'_>) -> Result<Option<u64>, Error> {
226
9
        let digest_clone = digest.into_owned();
227
9
        self.retrier
228
10
            .
retry9
(
unfold9
(
()9
, move |state| {
229
10
                let local_digest = digest_clone.clone();
230
10
                async move {
231
10
                    let result = self
232
10
                        .s3_client
233
10
                        .head_object()
234
10
                        .bucket(&self.bucket)
235
10
                        .key(self.make_s3_path(&local_digest))
236
10
                        .send()
237
10
                        .await;
238
239
10
                    match result {
240
8
                        Ok(head_object_output) => {
241
8
                            if self.consider_expired_after_s != 0
242
2
                                && let Some(last_modified) = head_object_output.last_modified
243
                            {
244
2
                                let now_s = (self.now_fn)().unix_timestamp() as i64;
245
2
                                if last_modified.secs() + self.consider_expired_after_s <= now_s {
246
1
                                    let remove_callbacks = self.remove_callbacks.lock().clone();
247
1
                                    let mut callbacks: FuturesUnordered<_> = remove_callbacks
248
1
                                        .into_iter()
249
1
                                        .map(|callback| 
{0
250
0
                                            let store_key = local_digest.borrow();
251
0
                                            async move { callback.callback(store_key).await }
252
0
                                        })
253
1
                                        .collect();
254
1
                                    while callbacks.next().await.is_some() 
{}0
255
1
                                    return Some((RetryResult::Ok(None), state));
256
1
                                }
257
6
                            }
258
7
                            let Some(length) = head_object_output.content_length else {
259
0
                                return Some((RetryResult::Ok(None), state));
260
                            };
261
7
                            if length >= 0 {
262
7
                                return Some((RetryResult::Ok(Some(length as u64)), state));
263
0
                            }
264
0
                            Some((
265
0
                                RetryResult::Err(make_err!(
266
0
                                    Code::InvalidArgument,
267
0
                                    "Negative content length in ONTAP S3: {length:?}"
268
0
                                )),
269
0
                                state,
270
0
                            ))
271
                        }
272
2
                        Err(sdk_error) => match sdk_error.into_service_error() {
273
1
                            HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
274
1
                            other => Some((
275
1
                                RetryResult::Retry(make_err!(
276
1
                                    Code::Unavailable,
277
1
                                    "Unhandled HeadObjectError in ONTAP S3: {other:?}"
278
1
                                )),
279
1
                                state,
280
1
                            )),
281
                        },
282
                    }
283
10
                }
284
10
            }))
285
9
            .await
286
9
    }
287
}
288
289
#[async_trait]
290
impl<I, NowFn> StoreDriver for OntapS3Store<NowFn>
291
where
292
    I: InstantWrapper,
293
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
294
{
295
    async fn has_with_results(
296
        self: Pin<&Self>,
297
        keys: &[StoreKey<'_>],
298
        results: &mut [Option<u64>],
299
10
    ) -> Result<(), Error> {
300
        keys.iter()
301
            .zip(results.iter_mut())
302
10
            .map(|(key, result)| async move {
303
10
                if is_zero_digest(key.borrow()) {
304
1
                    *result = Some(0);
305
1
                    return Ok::<_, Error>(());
306
9
                }
307
308
9
                match self.has(key.borrow()).await {
309
9
                    Ok(size) => {
310
9
                        *result = size;
311
9
                        if size.is_none() {
312
2
                            event!(
313
2
                                Level::INFO,
314
2
                                key = %key.as_str(),
315
                                "Object not found in ONTAP S3"
316
                            );
317
7
                        }
318
9
                        Ok(())
319
                    }
320
0
                    Err(err) => {
321
0
                        event!(
322
0
                            Level::ERROR,
323
0
                            key = %key.as_str(),
324
                            error = ?err,
325
                            "Error checking object existence"
326
                        );
327
0
                        Err(err)
328
                    }
329
                }
330
20
            })
331
            .collect::<FuturesUnordered<_>>()
332
            .try_collect()
333
            .await
334
10
    }
335
336
    async fn update(
337
        self: Pin<&Self>,
338
        key: StoreKey<'_>,
339
        mut reader: DropCloserReadHalf,
340
        size_info: UploadSizeInfo,
341
2
    ) -> Result<(), Error> {
342
        let s3_path = &self.make_s3_path(&key);
343
344
        let max_size = match size_info {
345
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
346
        };
347
348
        // For small files, use simple upload. For larger files or unknown size, use multipart
349
        if max_size < MIN_MULTIPART_SIZE && matches!(size_info, UploadSizeInfo::ExactSize(_)) {
350
            let UploadSizeInfo::ExactSize(sz) = size_info else {
351
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
352
            };
353
            reader.set_max_recent_data_size(
354
                u64::try_from(self.max_retry_buffer_per_request)
355
                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
356
            );
357
            return self.retrier.retry(
358
1
                unfold(reader, move |mut reader| async move {
359
1
                    let (mut tx, mut rx) = make_buf_channel_pair();
360
361
1
                    let result = {
362
1
                        let reader_ref = &mut reader;
363
1
                        let (upload_res, bind_res): (Result<(), Error>, Result<(), Error>) = tokio::join!(async move {
364
1
                            let raw_body_bytes = {
365
1
                                let mut raw_body_chunks = BytesMut::new();
366
                                loop {
367
51
                                    match rx.recv().await {
368
51
                                        Ok(chunk) => {
369
51
                                            if chunk.is_empty() {
370
1
                                                break Ok(raw_body_chunks.freeze());
371
50
                                            }
372
50
                                            raw_body_chunks.extend_from_slice(&chunk);
373
                                        }
374
0
                                        Err(err) => {
375
0
                                            break Err(err);
376
                                        }
377
                                    }
378
                                }
379
                            };
380
1
                            let internal_res = match raw_body_bytes {
381
1
                                Ok(body_bytes) => {
382
1
                                    let hash = Sha256::digest(&body_bytes);
383
1
                                    let send_res = self.s3_client
384
1
                                        .put_object()
385
1
                                        .bucket(&self.bucket)
386
1
                                        .key(s3_path.clone())
387
1
                                        .content_length(sz as i64)
388
1
                                        .body(
389
1
                                            ByteStream::from(body_bytes)
390
                                        )
391
1
                                        .set_checksum_algorithm(Some(aws_sdk_s3::types::ChecksumAlgorithm::Sha256))
392
1
                                        .set_checksum_sha256(Some(BASE64_STANDARD_NO_PAD.encode(hash)))
393
1
                                        .customize()
394
1
                                        .mutate_request(|req| {req.headers_mut().insert("x-amz-content-sha256", "UNSIGNED-PAYLOAD");})
395
1
                                        .send();
396
1
                                    Either::Left(send_res.map_ok_or_else(|e| Err(
397
1
                                        
Error::from_std_err0
(
Code::Aborted0
,
&e0
)), |_| Ok(())))
398
                                    }
399
0
                                Err(collect_err) => {
400
0
                                    async fn make_collect_err(collect_err: Error) -> Result<(), Error> {
401
0
                                        Err(collect_err)
402
0
                                    }
403
404
0
                                    warn!(
405
                                        ?collect_err,
406
                                        "Failed to get body");
407
0
                                    let future_err = make_collect_err(collect_err);
408
0
                                    Either::Right(future_err)
409
                                }
410
                            };
411
1
                            internal_res.await
412
1
                        },
413
1
                            tx.bind_buffered(reader_ref)
414
                        );
415
1
                        upload_res
416
1
                            .merge(bind_res)
417
1
                            .err_tip(|| "Failed to upload file to ONTAP S3 in single chunk")
418
                    };
419
420
1
                    let retry_result = result.map_or_else(
421
0
                        |mut err| {
422
0
                            err.code = Code::Aborted;
423
0
                            let bytes_received = reader.get_bytes_received();
424
0
                            if let Err(try_reset_err) = reader.try_reset_stream() {
425
0
                                event!(
426
0
                                    Level::ERROR,
427
                                    ?bytes_received,
428
                                    err = ?try_reset_err,
429
                                    "Unable to reset stream after failed upload in OntapS3Store::update"
430
                                );
431
0
                                return RetryResult::Err(
432
0
                                    err
433
0
                                        .merge(try_reset_err)
434
0
                                        .append(
435
0
                                            format!(
436
0
                                                "Failed to retry upload with {bytes_received} bytes received in OntapS3Store::update"
437
0
                                            )
438
0
                                        )
439
0
                                );
440
0
                            }
441
0
                            let err = err.append(
442
0
                                format!(
443
                                    "Retry on upload happened with {bytes_received} bytes received in OntapS3Store::update"
444
                                )
445
                            );
446
0
                            event!(Level::INFO, ?err, ?bytes_received, "Retryable ONTAP S3 error");
447
0
                            RetryResult::Retry(err)
448
0
                        },
449
1
                        |()| RetryResult::Ok(())
450
                    );
451
1
                    Some((retry_result, reader))
452
2
                })
453
            ).await;
454
        }
455
456
        // Handle multipart upload for large files
457
        let upload_id = &self
458
            .retrier
459
1
            .retry(unfold((), move |()| async move {
460
1
                let retry_result = self
461
1
                    .s3_client
462
1
                    .create_multipart_upload()
463
1
                    .bucket(&self.bucket)
464
1
                    .key(s3_path)
465
1
                    .send()
466
1
                    .await
467
1
                    .map_or_else(
468
0
                        |e| {
469
0
                            RetryResult::Retry(
470
0
                                Error::from_std_err(Code::Aborted, &e)
471
0
                                    .append("Failed to create multipart upload to ONTAP S3"),
472
0
                            )
473
0
                        },
474
1
                        |CreateMultipartUploadOutput { upload_id, .. }| {
475
1
                            upload_id.map_or_else(
476
0
                                || {
477
0
                                    RetryResult::Err(make_err!(
478
0
                                        Code::Internal,
479
0
                                        "Expected upload_id to be set by ONTAP S3 response"
480
0
                                    ))
481
0
                                },
482
                                RetryResult::Ok,
483
                            )
484
1
                        },
485
                    );
486
1
                Some((retry_result, ()))
487
2
            }))
488
            .await?;
489
490
        let bytes_per_upload_part =
491
            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
492
493
1
        let upload_parts = move || async move {
494
1
            let (tx, mut rx) = tokio::sync::mpsc::channel(self.multipart_max_concurrent_uploads);
495
496
1
            let read_stream_fut = (
497
1
                async move {
498
1
                    let retrier = &Pin::get_ref(self).retrier;
499
4
                    for part_number in 
1..i32::MAX1
{
500
4
                        let write_buf = reader
501
4
                            .consume(
502
                                Some(
503
4
                                    usize
504
4
                                        ::try_from(bytes_per_upload_part)
505
4
                                        .err_tip(
506
                                            || "Could not convert bytes_per_upload_part to usize"
507
0
                                        )?
508
                                )
509
4
                            ).await
510
4
                            .err_tip(|| "Failed to read chunk in ontap_s3_store")
?0
;
511
4
                        if write_buf.is_empty() {
512
1
                            break;
513
3
                        }
514
515
3
                        tx
516
3
                            .send(
517
3
                                retrier.retry(
518
3
                                    unfold(write_buf, move |write_buf| async move {
519
3
                                        let retry_result = self.s3_client
520
3
                                            .upload_part()
521
3
                                            .bucket(&self.bucket)
522
3
                                            .key(s3_path)
523
3
                                            .upload_id(upload_id)
524
3
                                            .body(ByteStream::new(SdkBody::from(write_buf.clone())))
525
3
                                            .part_number(part_number)
526
3
                                            .send().await
527
3
                                            .map_or_else(
528
0
                                                |e| {
529
0
                                                    RetryResult::Retry(
530
0
                                                        Error::from_std_err(
531
0
                                                            Code::Aborted,&e).append(
532
0
                                                            "Failed to upload part {part_number} in ONTAP S3 store"
533
0
                                                        )
534
0
                                                    )
535
0
                                                },
536
3
                                                |mut response| {
537
3
                                                    RetryResult::Ok(
538
3
                                                        CompletedPartBuilder::default()
539
3
                                                            .set_e_tag(response.e_tag.take())
540
3
                                                            .part_number(part_number)
541
3
                                                            .build()
542
3
                                                    )
543
3
                                                }
544
                                            );
545
3
                                        Some((retry_result, write_buf))
546
6
                                    })
547
                                )
548
3
                            ).await
549
3
                            .map_err(|err| 
{0
550
0
                                Error::from_std_err(Code::Internal, &err).append("Failed to send part to channel in ontap_s3_store")
551
0
                            })?;
552
                    }
553
1
                    Result::<_, Error>::Ok(())
554
                }
555
1
            ).fuse();
556
557
1
            let mut upload_futures = FuturesUnordered::new();
558
1
            let mut completed_parts = Vec::with_capacity(
559
1
                usize::try_from(cmp::min(
560
1
                    MAX_UPLOAD_PARTS as u64,
561
1
                    max_size / bytes_per_upload_part + 1,
562
                ))
563
1
                .err_tip(|| "Could not convert u64 to usize")
?0
,
564
            );
565
566
1
            tokio::pin!(read_stream_fut);
567
            loop {
568
8
                if read_stream_fut.is_terminated() && 
rx7
.
is_empty7
() &&
upload_futures4
.
is_empty4
() {
569
1
                    break;
570
7
                }
571
7
                tokio::select! {
572
7
                    
result1
= &mut read_stream_fut =>
result1
?0
,
573
7
                    Some(
upload_result3
) = upload_futures.next() =>
completed_parts3
.
push3
(
upload_result3
?0
),
574
7
                    Some(
fut3
) = rx.recv() =>
upload_futures3
.
push3
(
fut3
),
575
                }
576
            }
577
578
1
            completed_parts.sort_unstable_by_key(|part| part.part_number);
579
580
1
            self.retrier
581
1
                .retry(unfold(completed_parts, move |completed_parts| async move {
582
                    Some((
583
1
                        self.s3_client
584
1
                            .complete_multipart_upload()
585
1
                            .bucket(&self.bucket)
586
1
                            .key(s3_path)
587
1
                            .multipart_upload(
588
1
                                CompletedMultipartUploadBuilder::default()
589
1
                                    .set_parts(Some(completed_parts.clone()))
590
1
                                    .build(),
591
1
                            )
592
1
                            .upload_id(upload_id)
593
1
                            .send()
594
1
                            .await
595
1
                            .map_or_else(
596
0
                                |e| {
597
0
                                    RetryResult::Retry(
598
0
                                        Error::from_std_err(Code::Aborted, &e).append(
599
0
                                            "Failed to complete multipart upload in ONTAP S3 store",
600
0
                                        ),
601
0
                                    )
602
0
                                },
603
1
                                |_| RetryResult::Ok(()),
604
                            ),
605
1
                        completed_parts,
606
                    ))
607
2
                }))
608
1
                .await
609
2
        };
610
611
        upload_parts()
612
0
            .or_else(move |e| async move {
613
0
                Result::<(), _>::Err(e).merge(
614
0
                    self.s3_client
615
0
                        .abort_multipart_upload()
616
0
                        .bucket(&self.bucket)
617
0
                        .key(s3_path)
618
0
                        .upload_id(upload_id)
619
0
                        .send()
620
0
                        .await
621
0
                        .map_or_else(
622
0
                            |e| {
623
0
                                let err = Error::from_std_err(Code::Aborted, &e)
624
0
                                    .append("Failed to abort multipart upload in ONTAP S3 store");
625
0
                                event!(Level::INFO, ?err, "Multipart upload error");
626
0
                                Err(err)
627
0
                            },
628
0
                            |_| Ok(()),
629
                        ),
630
                )
631
0
            })
632
            .await
633
2
    }
634
635
    async fn get_part(
636
        self: Pin<&Self>,
637
        key: StoreKey<'_>,
638
        writer: &mut DropCloserWriteHalf,
639
        offset: u64,
640
        length: Option<u64>,
641
4
    ) -> Result<(), Error> {
642
        if is_zero_digest(key.borrow()) {
643
            writer
644
                .send_eof()
645
                .err_tip(|| "Failed to send zero EOF in ONTAP S3 store get_part")?;
646
            return Ok(());
647
        }
648
649
        let s3_path = &self.make_s3_path(&key);
650
        let end_read_byte = length
651
2
            .map_or(Some(None), |length| Some(offset.checked_add(length)))
652
            .err_tip(|| "Integer overflow protection triggered")?;
653
654
        self.retrier
655
3
            .retry(unfold(writer, move |writer| async move {
656
3
                let result = self
657
3
                    .s3_client
658
3
                    .get_object()
659
3
                    .bucket(&self.bucket)
660
3
                    .key(s3_path)
661
3
                    .range(format!(
662
                        "bytes={}-{}",
663
3
                        offset + writer.get_bytes_written(),
664
3
                        end_read_byte.map_or_else(String::new, |v| 
v2
.
to_string2
())
665
                    ))
666
3
                    .send()
667
3
                    .await;
668
669
3
                match result {
670
3
                    Ok(head_object_output) => {
671
3
                        let mut s3_in_stream = head_object_output.body;
672
3
                        let _bytes_sent = 0;
673
674
7
                        while let Some(
maybe_bytes4
) = s3_in_stream.next().await {
675
4
                            match maybe_bytes {
676
4
                                Ok(bytes) => {
677
4
                                    if bytes.is_empty() {
678
1
                                        continue;
679
3
                                    }
680
681
                                    // Clone bytes before sending
682
3
                                    let bytes_clone = bytes.clone();
683
684
                                    // More robust sending mechanism
685
3
                                    match writer.send(bytes).await {
686
3
                                        Ok(()) => {
687
3
                                            let _ = bytes_clone.len();
688
3
                                        }
689
0
                                        Err(e) => {
690
0
                                            return Some((
691
0
                                                RetryResult::Err(Error::from_std_err(
692
0
                                                    Code::Aborted,&e).append(
693
0
                                                    "Error sending bytes to consumer in ONTAP S3"
694
0
                                                )),
695
0
                                                writer,
696
0
                                            ));
697
                                        }
698
                                    }
699
                                }
700
0
                                Err(e) => {
701
0
                                    return Some((
702
0
                                        RetryResult::Retry(
703
0
                                            Error::from_std_err(Code::Aborted, &e)
704
0
                                                .append("Bad bytestream element in ONTAP S3"),
705
0
                                        ),
706
0
                                        writer,
707
0
                                    ));
708
                                }
709
                            }
710
                        }
711
712
                        // EOF handling
713
3
                        if let Err(
e0
) = writer.send_eof() {
714
0
                            return Some((
715
0
                                RetryResult::Err(
716
0
                                    Error::from_std_err(Code::Aborted, &e)
717
0
                                        .append("Failed to send EOF to consumer in ONTAP S3"),
718
0
                                ),
719
0
                                writer,
720
0
                            ));
721
3
                        }
722
723
3
                        Some((RetryResult::Ok(()), writer))
724
                    }
725
0
                    Err(sdk_error) => match sdk_error.into_service_error() {
726
0
                        GetObjectError::NoSuchKey(e) => Some((
727
0
                            RetryResult::Err(
728
0
                                Error::from_std_err(Code::NotFound, &e)
729
0
                                    .append("No such key in ONTAP S3"),
730
0
                            ),
731
0
                            writer,
732
0
                        )),
733
0
                        other => Some((
734
0
                            RetryResult::Retry(
735
0
                                Error::from_std_err(Code::Unavailable, &other)
736
0
                                    .append("Unhandled GetObjectError in ONTAP S3"),
737
0
                            ),
738
0
                            writer,
739
0
                        )),
740
                    },
741
                }
742
6
            }))
743
            .await
744
4
    }
745
746
0
    /// Returns this store itself as the driver to use.
    ///
    /// The `_digest` argument is ignored.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
749
750
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
751
0
        self
752
0
    }
753
754
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
755
0
        self
756
0
    }
757
758
3
    fn register_remove_callback(
759
3
        self: Arc<Self>,
760
3
        callback: Arc<dyn RemoveItemCallback>,
761
3
    ) -> Result<(), Error> {
762
3
        self.remove_callbacks.lock().push(callback);
763
3
        Ok(())
764
3
    }
765
}
766
767
#[async_trait]
768
impl<I, NowFn> HealthStatusIndicator for OntapS3Store<NowFn>
769
where
770
    I: InstantWrapper,
771
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
772
{
773
0
    fn get_name(&self) -> &'static str {
774
0
        "OntapS3Store"
775
0
    }
776
777
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
778
        StoreDriver::check_health(Pin::new(self), namespace).await
779
0
    }
780
}