Coverage Report

Created: 2026-06-04 10:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/ontap_s3_store.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::borrow::Cow;
19
use std::fs::File;
20
use std::io::BufReader;
21
use std::sync::Arc;
22
23
use async_trait::async_trait;
24
use aws_config::BehaviorVersion;
25
use aws_config::default_provider::credentials::DefaultCredentialsChain;
26
use aws_config::provider_config::ProviderConfig;
27
use aws_sdk_s3::Client;
28
use aws_sdk_s3::config::Region;
29
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
30
use aws_sdk_s3::operation::get_object::GetObjectError;
31
use aws_sdk_s3::operation::head_object::HeadObjectError;
32
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
33
use aws_sdk_s3::types::builders::{CompletedMultipartUploadBuilder, CompletedPartBuilder};
34
use base64::Engine;
35
use base64::prelude::BASE64_STANDARD_NO_PAD;
36
use bytes::BytesMut;
37
use futures::future::{Either, FusedFuture};
38
use futures::stream::{FuturesUnordered, unfold};
39
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
40
use hyper_rustls::ConfigBuilderExt;
41
use nativelink_config::stores::ExperimentalOntapS3Spec;
42
use nativelink_error::{Code, Error, ResultExt, make_err};
43
use nativelink_metric::MetricsComponent;
44
use nativelink_util::buf_channel::{
45
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
46
};
47
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
48
use nativelink_util::instant_wrapper::InstantWrapper;
49
use nativelink_util::retry::{Retrier, RetryResult};
50
use nativelink_util::store_trait::{RemoveItemCallback, StoreDriver, StoreKey, UploadSizeInfo};
51
use parking_lot::Mutex;
52
use rustls::{ClientConfig, RootCertStore};
53
use rustls_pki_types::CertificateDer;
54
use rustls_pki_types::pem::PemObject;
55
use sha2::{Digest, Sha256};
56
use tokio::time::sleep;
57
use tracing::{Level, event, warn};
58
59
use crate::cas_utils::is_zero_digest;
60
use crate::common_s3_utils::TlsClient;
61
62
// S3 parts cannot be smaller than this number
63
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB
64
65
// S3 parts cannot be larger than this number
66
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB
67
68
// A single S3 multipart upload cannot contain more parts than this number
69
const MAX_UPLOAD_PARTS: usize = 10_000;
70
71
// Default max buffer size for retrying upload requests
72
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 20 * 1024 * 1024; // 20MB
73
74
// Default limit for concurrent part uploads per multipart upload
75
const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10;
76
77
type RemoveCallback = Arc<dyn RemoveItemCallback>;
78
79
/// Store driver that keeps objects in a NetApp ONTAP S3 bucket.
///
/// `NowFn` is a caller-supplied clock; it is only consulted when deciding
/// whether an object is old enough to be treated as expired.
#[derive(Debug, MetricsComponent)]
80
pub struct OntapS3Store<NowFn> {
81
    /// AWS SDK client used for every request issued by this store.
    s3_client: Arc<Client>,
82
    /// Clock used to obtain the current unix timestamp for expiry checks.
    now_fn: NowFn,
83
    /// Bucket that every request is addressed to.
    #[metric(help = "The bucket name for the ONTAP S3 store")]
84
    bucket: String,
85
    /// Prefix prepended verbatim to each key to form the object path.
    #[metric(help = "The key prefix for the ONTAP S3 store")]
86
    key_prefix: String,
87
    /// Retry policy wrapped around the S3 operations.
    retrier: Retrier,
88
    /// Age in seconds (relative to `last_modified`) after which an object is
    /// reported as missing. A value of `0` disables the expiry check.
    #[metric(help = "The number of seconds to consider an object expired")]
89
    consider_expired_after_s: i64,
90
    /// How much recently-read data the upload reader keeps so a failed
    /// single-request upload can be replayed.
    #[metric(help = "The number of bytes to buffer for retrying requests")]
91
    max_retry_buffer_per_request: usize,
92
    /// Capacity of the channel that hands part uploads to the multipart driver loop.
    #[metric(help = "The number of concurrent uploads allowed for multipart uploads")]
93
    multipart_max_concurrent_uploads: usize,
94
95
    /// Callbacks invoked with the key when an existence check finds an expired object.
    remove_callbacks: Mutex<Vec<RemoveCallback>>,
96
}
97
98
0
pub fn load_custom_certs(cert_path: &str) -> Result<Arc<ClientConfig>, Error> {
99
0
    let mut root_store = RootCertStore::empty();
100
101
    // Create a BufReader from the cert file
102
0
    let mut cert_reader = BufReader::new(
103
0
        File::open(cert_path)
104
0
            .err_tip(|| format!("Failed to open CA certificate file {cert_path}"))?,
105
    );
106
107
    // Parse certificates
108
0
    let certs = CertificateDer::pem_reader_iter(&mut cert_reader).collect::<Result<Vec<_>, _>>()?;
109
110
    // Add each certificate to the root store
111
0
    for cert in certs {
112
0
        root_store.add(cert).map_err(|e| {
113
0
            Error::from_std_err(Code::Internal, &e)
114
0
                .append("Failed to add certificate to root store")
115
0
        })?;
116
    }
117
118
    // Build the client config with the root store
119
0
    let config = ClientConfig::builder()
120
0
        .with_root_certificates(root_store)
121
0
        .with_no_client_auth();
122
123
0
    Ok(Arc::new(config))
124
0
}
125
126
impl<I, NowFn> OntapS3Store<NowFn>
127
where
128
    I: InstantWrapper,
129
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
130
{
131
3
    pub async fn new(spec: &ExperimentalOntapS3Spec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
132
        // Load custom CA config
133
3
        let ca_config = if let Some(
cert_path0
) = &spec.root_certificates {
134
0
            load_custom_certs(cert_path)?
135
        } else {
136
3
            Arc::new(
137
3
                ClientConfig::builder()
138
3
                    .with_native_roots()
?0
139
3
                    .with_no_client_auth(),
140
            )
141
        };
142
143
3
        let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
144
3
            .with_tls_config((*ca_config).clone())
145
3
            .https_only()
146
3
            .enable_http1()
147
3
            .enable_http2()
148
3
            .build();
149
150
3
        let http_client = TlsClient::with_https_connector(&spec.common, https_connector);
151
152
3
        let credentials_provider = DefaultCredentialsChain::builder()
153
3
            .configure(
154
3
                ProviderConfig::without_region()
155
3
                    .with_region(Some(Region::new(Cow::Owned(spec.vserver_name.clone()))))
156
3
                    .with_http_client(http_client.clone()),
157
3
            )
158
3
            .build()
159
3
            .await;
160
161
3
        let config = aws_sdk_s3::Config::builder()
162
3
            .credentials_provider(credentials_provider)
163
3
            .endpoint_url(&spec.endpoint)
164
3
            .region(Region::new(spec.vserver_name.clone()))
165
3
            .app_name(aws_config::AppName::new("nativelink").expect("valid app name"))
166
3
            .http_client(http_client)
167
3
            .force_path_style(true)
168
3
            .behavior_version(BehaviorVersion::latest())
169
3
            .timeout_config(
170
3
                aws_config::timeout::TimeoutConfig::builder()
171
3
                    .connect_timeout(Duration::from_secs(30))
172
3
                    .operation_timeout(Duration::from_mins(2))
173
3
                    .build(),
174
            )
175
3
            .build();
176
177
3
        let s3_client = Client::from_conf(config);
178
179
3
        Self::new_with_client_and_jitter(
180
3
            spec,
181
3
            s3_client,
182
3
            spec.common.retry.make_jitter_fn(),
183
3
            now_fn,
184
        )
185
3
    }
186
187
15
    pub fn new_with_client_and_jitter(
188
15
        spec: &ExperimentalOntapS3Spec,
189
15
        s3_client: Client,
190
15
        jitter_fn: Arc<dyn (Fn(Duration) -> Duration) + Send + Sync>,
191
15
        now_fn: NowFn,
192
15
    ) -> Result<Arc<Self>, Error> {
193
15
        Ok(Arc::new(Self {
194
15
            s3_client: Arc::new(s3_client),
195
15
            now_fn,
196
15
            bucket: spec.bucket.clone(),
197
15
            key_prefix: spec
198
15
                .common
199
15
                .key_prefix
200
15
                .as_ref()
201
15
                .unwrap_or(&String::new())
202
15
                .clone(),
203
15
            retrier: Retrier::new(
204
15
                Arc::new(|duration| 
Box::pin1
(
sleep1
(
duration1
))),
205
15
                jitter_fn,
206
15
                spec.common.retry.clone(),
207
            ),
208
15
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
209
15
            max_retry_buffer_per_request: spec
210
15
                .common
211
15
                .max_retry_buffer_per_request
212
15
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
213
15
            multipart_max_concurrent_uploads: spec
214
15
                .common
215
15
                .multipart_max_concurrent_uploads
216
15
                .unwrap_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS),
217
15
            remove_callbacks: Mutex::new(vec![]),
218
        }))
219
15
    }
220
221
15
    fn make_s3_path(&self, key: &StoreKey<'_>) -> String {
222
15
        format!("{}{}", self.key_prefix, key.as_str())
223
15
    }
224
225
9
    async fn has(self: Pin<&Self>, digest: StoreKey<'_>) -> Result<Option<u64>, Error> {
226
9
        let digest_clone = digest.into_owned();
227
9
        self.retrier
228
10
            .
retry9
(
unfold9
(
()9
, move |state| {
229
10
                let local_digest = digest_clone.clone();
230
10
                async move {
231
10
                    let result = self
232
10
                        .s3_client
233
10
                        .head_object()
234
10
                        .bucket(&self.bucket)
235
10
                        .key(self.make_s3_path(&local_digest))
236
10
                        .send()
237
10
                        .await;
238
239
10
                    match result {
240
8
                        Ok(head_object_output) => {
241
8
                            if self.consider_expired_after_s != 0
242
2
                                && let Some(last_modified) = head_object_output.last_modified
243
                            {
244
2
                                let now_s = (self.now_fn)().unix_timestamp() as i64;
245
2
                                if last_modified.secs() + self.consider_expired_after_s <= now_s {
246
1
                                    let remove_callbacks = self.remove_callbacks.lock().clone();
247
1
                                    let mut callbacks: FuturesUnordered<_> = remove_callbacks
248
1
                                        .into_iter()
249
1
                                        .map(|callback| 
{0
250
0
                                            let store_key = local_digest.borrow();
251
0
                                            async move { callback.callback(store_key).await }
252
0
                                        })
253
1
                                        .collect();
254
1
                                    while callbacks.next().await.is_some() 
{}0
255
1
                                    return Some((RetryResult::Ok(None), state));
256
1
                                }
257
6
                            }
258
7
                            let Some(length) = head_object_output.content_length else {
259
0
                                return Some((RetryResult::Ok(None), state));
260
                            };
261
7
                            if length >= 0 {
262
7
                                return Some((RetryResult::Ok(Some(length as u64)), state));
263
0
                            }
264
0
                            Some((
265
0
                                RetryResult::Err(make_err!(
266
0
                                    Code::InvalidArgument,
267
0
                                    "Negative content length in ONTAP S3: {length:?}"
268
0
                                )),
269
0
                                state,
270
0
                            ))
271
                        }
272
2
                        Err(sdk_error) => match sdk_error.into_service_error() {
273
1
                            HeadObjectError::NotFound(_) => Some((RetryResult::Ok(None), state)),
274
1
                            other => Some((
275
1
                                RetryResult::Retry(make_err!(
276
1
                                    Code::Unavailable,
277
1
                                    "Unhandled HeadObjectError in ONTAP S3: {other:?}"
278
1
                                )),
279
1
                                state,
280
1
                            )),
281
                        },
282
                    }
283
10
                }
284
10
            }))
285
9
            .await
286
9
    }
287
}
288
289
#[async_trait]
290
impl<I, NowFn> StoreDriver for OntapS3Store<NowFn>
291
where
292
    I: InstantWrapper,
293
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
294
{
295
    async fn has_with_results(
296
        self: Pin<&Self>,
297
        keys: &[StoreKey<'_>],
298
        results: &mut [Option<u64>],
299
10
    ) -> Result<(), Error> {
300
        keys.iter()
301
            .zip(results.iter_mut())
302
10
            .map(|(key, result)| async move {
303
10
                if is_zero_digest(key.borrow()) {
304
1
                    *result = Some(0);
305
1
                    return Ok::<_, Error>(());
306
9
                }
307
308
9
                match self.has(key.borrow()).await {
309
9
                    Ok(size) => {
310
9
                        *result = size;
311
9
                        if size.is_none() {
312
2
                            event!(
313
2
                                Level::INFO,
314
2
                                key = %key.as_str(),
315
                                "Object not found in ONTAP S3"
316
                            );
317
7
                        }
318
9
                        Ok(())
319
                    }
320
0
                    Err(err) => {
321
0
                        event!(
322
0
                            Level::ERROR,
323
0
                            key = %key.as_str(),
324
                            error = ?err,
325
                            "Error checking object existence"
326
                        );
327
0
                        Err(err)
328
                    }
329
                }
330
20
            })
331
            .collect::<FuturesUnordered<_>>()
332
            .try_collect()
333
            .await
334
10
    }
335
336
    async fn update(
337
        self: Pin<&Self>,
338
        key: StoreKey<'_>,
339
        mut reader: DropCloserReadHalf,
340
        size_info: UploadSizeInfo,
341
2
    ) -> Result<u64, Error> {
342
        let s3_path = &self.make_s3_path(&key);
343
344
        let max_size = match size_info {
345
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
346
        };
347
348
        // For small files, use simple upload. For larger files or unknown size, use multipart
349
        if max_size < MIN_MULTIPART_SIZE && matches!(size_info, UploadSizeInfo::ExactSize(_)) {
350
            let UploadSizeInfo::ExactSize(sz) = size_info else {
351
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
352
            };
353
            reader.set_max_recent_data_size(
354
                u64::try_from(self.max_retry_buffer_per_request)
355
                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
356
            );
357
            return self.retrier.retry(
358
1
                unfold(reader, move |mut reader| async move {
359
1
                    let (mut tx, mut rx) = make_buf_channel_pair();
360
361
1
                    let result = {
362
1
                        let reader_ref = &mut reader;
363
1
                        let (upload_res, bind_res): (Result<u64, Error>, Result<(), Error>) = tokio::join!(async move {
364
1
                            let raw_body_bytes = {
365
1
                                let mut raw_body_chunks = BytesMut::new();
366
                                loop {
367
51
                                    match rx.recv().await {
368
51
                                        Ok(chunk) => {
369
51
                                            if chunk.is_empty() {
370
1
                                                break Ok(raw_body_chunks.freeze());
371
50
                                            }
372
50
                                            raw_body_chunks.extend_from_slice(&chunk);
373
                                        }
374
0
                                        Err(err) => {
375
0
                                            break Err(err);
376
                                        }
377
                                    }
378
                                }
379
                            };
380
1
                            let internal_res = match raw_body_bytes {
381
1
                                Ok(body_bytes) => {
382
1
                                    let hash = Sha256::digest(&body_bytes);
383
1
                                    let body_len: u64 = body_bytes.len().try_into().unwrap_or(0);
384
1
                                    let send_res = self.s3_client
385
1
                                        .put_object()
386
1
                                        .bucket(&self.bucket)
387
1
                                        .key(s3_path.clone())
388
1
                                        .content_length(sz as i64)
389
1
                                        .body(
390
1
                                            ByteStream::from(body_bytes)
391
                                        )
392
1
                                        .set_checksum_algorithm(Some(aws_sdk_s3::types::ChecksumAlgorithm::Sha256))
393
1
                                        .set_checksum_sha256(Some(BASE64_STANDARD_NO_PAD.encode(hash)))
394
1
                                        .customize()
395
1
                                        .mutate_request(|req| {req.headers_mut().insert("x-amz-content-sha256", "UNSIGNED-PAYLOAD");})
396
1
                                        .send();
397
1
                                    Either::Left(send_res.map_ok_or_else(|e| Err(
398
1
                                        
Error::from_std_err0
(
Code::Aborted0
,
&e0
)), move |_| Ok(body_len)))
399
                                    }
400
0
                                Err(collect_err) => {
401
0
                                    async fn make_collect_err(collect_err: Error) -> Result<u64, Error> {
402
0
                                        Err(collect_err)
403
0
                                    }
404
405
0
                                    warn!(
406
                                        ?collect_err,
407
                                        "Failed to get body");
408
0
                                    let future_err = make_collect_err(collect_err);
409
0
                                    Either::Right(future_err)
410
                                }
411
                            };
412
1
                            internal_res.await
413
1
                        },
414
1
                            tx.bind_buffered(reader_ref)
415
                        );
416
1
                        match (upload_res, bind_res) {
417
1
                            (Ok(size), Ok(())) => Ok(size),
418
0
                            (Err(e), _) | (_, Err(e)) => Err(e),
419
                        }
420
1
                        .err_tip(|| "Failed to upload file to ONTAP S3 in single chunk")
421
                    };
422
423
1
                    let retry_result = result.map_or_else(
424
0
                        |mut err| {
425
0
                            err.code = Code::Aborted;
426
0
                            let bytes_received = reader.get_bytes_received();
427
0
                            if let Err(try_reset_err) = reader.try_reset_stream() {
428
0
                                event!(
429
0
                                    Level::ERROR,
430
                                    ?bytes_received,
431
                                    err = ?try_reset_err,
432
                                    "Unable to reset stream after failed upload in OntapS3Store::update"
433
                                );
434
0
                                return RetryResult::Err(
435
0
                                    err
436
0
                                        .merge(try_reset_err)
437
0
                                        .append(
438
0
                                            format!(
439
0
                                                "Failed to retry upload with {bytes_received} bytes received in OntapS3Store::update"
440
0
                                            )
441
0
                                        )
442
0
                                );
443
0
                            }
444
0
                            let err = err.append(
445
0
                                format!(
446
                                    "Retry on upload happened with {bytes_received} bytes received in OntapS3Store::update"
447
                                )
448
                            );
449
0
                            event!(Level::INFO, ?err, ?bytes_received, "Retryable ONTAP S3 error");
450
0
                            RetryResult::Retry(err)
451
0
                        },
452
                        RetryResult::Ok
453
                    );
454
1
                    Some((retry_result, reader))
455
2
                })
456
            ).await;
457
        }
458
459
        // Handle multipart upload for large files
460
        let upload_id = &self
461
            .retrier
462
1
            .retry(unfold((), move |()| async move {
463
1
                let retry_result = self
464
1
                    .s3_client
465
1
                    .create_multipart_upload()
466
1
                    .bucket(&self.bucket)
467
1
                    .key(s3_path)
468
1
                    .send()
469
1
                    .await
470
1
                    .map_or_else(
471
0
                        |e| {
472
0
                            RetryResult::Retry(
473
0
                                Error::from_std_err(Code::Aborted, &e)
474
0
                                    .append("Failed to create multipart upload to ONTAP S3"),
475
0
                            )
476
0
                        },
477
1
                        |CreateMultipartUploadOutput { upload_id, .. }| {
478
1
                            upload_id.map_or_else(
479
0
                                || {
480
0
                                    RetryResult::Err(make_err!(
481
0
                                        Code::Internal,
482
0
                                        "Expected upload_id to be set by ONTAP S3 response"
483
0
                                    ))
484
0
                                },
485
                                RetryResult::Ok,
486
                            )
487
1
                        },
488
                    );
489
1
                Some((retry_result, ()))
490
2
            }))
491
            .await?;
492
493
        let bytes_per_upload_part =
494
            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
495
496
1
        let upload_parts = move || async move {
497
1
            let (tx, mut rx) = tokio::sync::mpsc::channel(self.multipart_max_concurrent_uploads);
498
499
1
            let read_stream_fut = (
500
1
                async move {
501
1
                    let retrier = &Pin::get_ref(self).retrier;
502
1
                    let mut total_uploaded = 0;
503
4
                    for part_number in 
1..i32::MAX1
{
504
4
                        let write_buf = reader
505
4
                            .consume(
506
                                Some(
507
4
                                    usize
508
4
                                        ::try_from(bytes_per_upload_part)
509
4
                                        .err_tip(
510
                                            || "Could not convert bytes_per_upload_part to usize"
511
0
                                        )?
512
                                )
513
4
                            ).await
514
4
                            .err_tip(|| "Failed to read chunk in ontap_s3_store")
?0
;
515
4
                        if write_buf.is_empty() {
516
1
                            break;
517
3
                        }
518
519
3
                        total_uploaded += write_buf.len() as u64;
520
521
3
                        tx
522
3
                            .send(
523
3
                                retrier.retry(
524
3
                                    unfold(write_buf, move |write_buf| async move {
525
3
                                        let retry_result = self.s3_client
526
3
                                            .upload_part()
527
3
                                            .bucket(&self.bucket)
528
3
                                            .key(s3_path)
529
3
                                            .upload_id(upload_id)
530
3
                                            .body(ByteStream::new(SdkBody::from(write_buf.clone())))
531
3
                                            .part_number(part_number)
532
3
                                            .send().await
533
3
                                            .map_or_else(
534
0
                                                |e| {
535
0
                                                    RetryResult::Retry(
536
0
                                                        Error::from_std_err(
537
0
                                                            Code::Aborted,&e).append(
538
0
                                                            "Failed to upload part {part_number} in ONTAP S3 store"
539
0
                                                        )
540
0
                                                    )
541
0
                                                },
542
3
                                                |mut response| {
543
3
                                                    RetryResult::Ok(
544
3
                                                        CompletedPartBuilder::default()
545
3
                                                            .set_e_tag(response.e_tag.take())
546
3
                                                            .part_number(part_number)
547
3
                                                            .build()
548
3
                                                    )
549
3
                                                }
550
                                            );
551
3
                                        Some((retry_result, write_buf))
552
6
                                    })
553
                                )
554
3
                            ).await
555
3
                            .map_err(|err| 
{0
556
0
                                Error::from_std_err(Code::Internal, &err).append("Failed to send part to channel in ontap_s3_store")
557
0
                            })?;
558
                    }
559
1
                    Result::<_, Error>::Ok(total_uploaded)
560
                }
561
1
            ).fuse();
562
563
1
            let mut upload_futures = FuturesUnordered::new();
564
1
            let mut total_uploaded = 0;
565
1
            let mut completed_parts = Vec::with_capacity(
566
1
                usize::try_from(cmp::min(
567
1
                    MAX_UPLOAD_PARTS as u64,
568
1
                    max_size / bytes_per_upload_part + 1,
569
                ))
570
1
                .err_tip(|| "Could not convert u64 to usize")
?0
,
571
            );
572
573
1
            tokio::pin!(read_stream_fut);
574
            loop {
575
8
                if read_stream_fut.is_terminated() && 
rx7
.
is_empty7
() &&
upload_futures2
.
is_empty2
() {
576
1
                    break;
577
7
                }
578
7
                tokio::select! {
579
7
                    
result1
= &mut read_stream_fut => {
580
1
                        total_uploaded = result
?0
;
581
                    },
582
7
                    Some(
upload_result3
) = upload_futures.next() =>
completed_parts3
.
push3
(
upload_result3
?0
),
583
7
                    Some(
fut3
) = rx.recv() =>
upload_futures3
.
push3
(
fut3
),
584
                }
585
            }
586
587
1
            completed_parts.sort_unstable_by_key(|part| part.part_number);
588
589
1
            self.retrier
590
1
                .retry(unfold(completed_parts, move |completed_parts| async move {
591
                    Some((
592
1
                        self.s3_client
593
1
                            .complete_multipart_upload()
594
1
                            .bucket(&self.bucket)
595
1
                            .key(s3_path)
596
1
                            .multipart_upload(
597
1
                                CompletedMultipartUploadBuilder::default()
598
1
                                    .set_parts(Some(completed_parts.clone()))
599
1
                                    .build(),
600
1
                            )
601
1
                            .upload_id(upload_id)
602
1
                            .send()
603
1
                            .await
604
1
                            .map_or_else(
605
0
                                |e| {
606
0
                                    RetryResult::Retry(
607
0
                                        Error::from_std_err(Code::Aborted, &e).append(
608
0
                                            "Failed to complete multipart upload in ONTAP S3 store",
609
0
                                        ),
610
0
                                    )
611
0
                                },
612
1
                                |_| RetryResult::Ok(total_uploaded),
613
                            ),
614
1
                        completed_parts,
615
                    ))
616
2
                }))
617
1
                .await
618
2
        };
619
620
        upload_parts()
621
0
            .or_else(move |mut e| async move {
622
0
                let abort_res = self
623
0
                    .s3_client
624
0
                    .abort_multipart_upload()
625
0
                    .bucket(&self.bucket)
626
0
                    .key(s3_path)
627
0
                    .upload_id(upload_id)
628
0
                    .send()
629
0
                    .await;
630
0
                if let Err(abort_err) = abort_res {
631
0
                    let err = Error::from_std_err(Code::Aborted, &abort_err)
632
0
                        .append("Failed to abort multipart upload in ONTAP S3 store");
633
0
                    event!(Level::INFO, ?err, "Multipart upload error");
634
0
                    e = e.merge(err);
635
0
                }
636
0
                Err(e)
637
0
            })
638
            .await
639
2
    }
640
641
    async fn get_part(
642
        self: Pin<&Self>,
643
        key: StoreKey<'_>,
644
        writer: &mut DropCloserWriteHalf,
645
        offset: u64,
646
        length: Option<u64>,
647
4
    ) -> Result<(), Error> {
648
        if is_zero_digest(key.borrow()) {
649
            writer
650
                .send_eof()
651
                .err_tip(|| "Failed to send zero EOF in ONTAP S3 store get_part")?;
652
            return Ok(());
653
        }
654
655
        let s3_path = &self.make_s3_path(&key);
656
        let end_read_byte = length
657
2
            .map_or(Some(None), |length| Some(offset.checked_add(length)))
658
            .err_tip(|| "Integer overflow protection triggered")?;
659
660
        self.retrier
661
3
            .retry(unfold(writer, move |writer| async move {
662
3
                let result = self
663
3
                    .s3_client
664
3
                    .get_object()
665
3
                    .bucket(&self.bucket)
666
3
                    .key(s3_path)
667
3
                    .range(format!(
668
                        "bytes={}-{}",
669
3
                        offset + writer.get_bytes_written(),
670
3
                        end_read_byte.map_or_else(String::new, |v| 
v2
.
to_string2
())
671
                    ))
672
3
                    .send()
673
3
                    .await;
674
675
3
                match result {
676
3
                    Ok(head_object_output) => {
677
3
                        let mut s3_in_stream = head_object_output.body;
678
3
                        let _bytes_sent = 0;
679
680
7
                        while let Some(
maybe_bytes4
) = s3_in_stream.next().await {
681
4
                            match maybe_bytes {
682
4
                                Ok(bytes) => {
683
4
                                    if bytes.is_empty() {
684
1
                                        continue;
685
3
                                    }
686
687
                                    // Clone bytes before sending
688
3
                                    let bytes_clone = bytes.clone();
689
690
                                    // More robust sending mechanism
691
3
                                    match writer.send(bytes).await {
692
3
                                        Ok(()) => {
693
3
                                            let _ = bytes_clone.len();
694
3
                                        }
695
0
                                        Err(e) => {
696
0
                                            return Some((
697
0
                                                RetryResult::Err(Error::from_std_err(
698
0
                                                    Code::Aborted,&e).append(
699
0
                                                    "Error sending bytes to consumer in ONTAP S3"
700
0
                                                )),
701
0
                                                writer,
702
0
                                            ));
703
                                        }
704
                                    }
705
                                }
706
0
                                Err(e) => {
707
0
                                    return Some((
708
0
                                        RetryResult::Retry(
709
0
                                            Error::from_std_err(Code::Aborted, &e)
710
0
                                                .append("Bad bytestream element in ONTAP S3"),
711
0
                                        ),
712
0
                                        writer,
713
0
                                    ));
714
                                }
715
                            }
716
                        }
717
718
                        // EOF handling
719
3
                        if let Err(
e0
) = writer.send_eof() {
720
0
                            return Some((
721
0
                                RetryResult::Err(
722
0
                                    Error::from_std_err(Code::Aborted, &e)
723
0
                                        .append("Failed to send EOF to consumer in ONTAP S3"),
724
0
                                ),
725
0
                                writer,
726
0
                            ));
727
3
                        }
728
729
3
                        Some((RetryResult::Ok(()), writer))
730
                    }
731
0
                    Err(sdk_error) => match sdk_error.into_service_error() {
732
0
                        GetObjectError::NoSuchKey(e) => Some((
733
0
                            RetryResult::Err(
734
0
                                Error::from_std_err(Code::NotFound, &e)
735
0
                                    .append("No such key in ONTAP S3"),
736
0
                            ),
737
0
                            writer,
738
0
                        )),
739
0
                        other => Some((
740
0
                            RetryResult::Retry(
741
0
                                Error::from_std_err(Code::Unavailable, &other)
742
0
                                    .append("Unhandled GetObjectError in ONTAP S3"),
743
0
                            ),
744
0
                            writer,
745
0
                        )),
746
                    },
747
                }
748
6
            }))
749
            .await
750
4
    }
751
752
0
    /// Returns this store itself as the underlying driver; the optional
    /// digest argument is ignored.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
755
756
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
757
0
        self
758
0
    }
759
760
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
761
0
        self
762
0
    }
763
764
3
    fn register_remove_callback(
765
3
        self: Arc<Self>,
766
3
        callback: Arc<dyn RemoveItemCallback>,
767
3
    ) -> Result<(), Error> {
768
3
        self.remove_callbacks.lock().push(callback);
769
3
        Ok(())
770
3
    }
771
}
772
773
#[async_trait]
774
impl<I, NowFn> HealthStatusIndicator for OntapS3Store<NowFn>
775
where
776
    I: InstantWrapper,
777
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
778
{
779
0
    fn get_name(&self) -> &'static str {
780
0
        "OntapS3Store"
781
0
    }
782
783
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
784
        StoreDriver::check_health(Pin::new(self), namespace).await
785
0
    }
786
}