Coverage Report

Created: 2026-04-07 13:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/azure_blob_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::borrow::Cow;
19
use std::sync::Arc;
20
21
use async_trait::async_trait;
22
use azure_core::auth::Secret;
23
use azure_core::prelude::Range;
24
use azure_core::{Body, HttpClient, StatusCode, TransportOptions};
25
use azure_storage::StorageCredentials;
26
use azure_storage_blobs::prelude::*;
27
use bytes::Bytes;
28
use futures::future::FusedFuture;
29
use futures::stream::{FuturesUnordered, unfold};
30
use futures::{FutureExt, StreamExt, TryStreamExt};
31
use http::Method;
32
use http_body_util::Full;
33
use hyper::Uri;
34
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
35
use hyper_util::client::legacy::Client as LegacyClient;
36
use hyper_util::client::legacy::connect::HttpConnector as LegacyHttpConnector;
37
use hyper_util::rt::TokioExecutor;
38
use nativelink_config::stores::ExperimentalAzureSpec;
39
use nativelink_error::{Code, Error, ResultExt, make_err};
40
use nativelink_metric::MetricsComponent;
41
use nativelink_util::buf_channel::{
42
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
43
};
44
use nativelink_util::fs;
45
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
46
use nativelink_util::instant_wrapper::InstantWrapper;
47
use nativelink_util::retry::{Retrier, RetryResult};
48
use nativelink_util::store_trait::{
49
    RemoveItemCallback, StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo,
50
};
51
use tokio::sync::mpsc;
52
use tokio::time::sleep;
53
use tracing::{Level, event};
54
55
use crate::cas_utils::is_zero_digest;
56
57
// Check the below doc for the limits specific to Azure.
58
// https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage
59
60
// Maximum number of blocks in a block blob or append blob
61
const MAX_BLOCKS: usize = 50_000;
62
63
// Maximum size of a block in a block blob (4,000 MiB)
64
const MAX_BLOCK_SIZE: u64 = 4_000 * 1024 * 1024; // 4,000 MiB = 4 GiB
65
66
// Default block size for uploads (5 MiB)
67
const DEFAULT_BLOCK_SIZE: u64 = 5 * 1024 * 1024; // 5 MiB
68
69
// Default maximum retry buffer per request
70
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 * 1024 * 1024; // 5 MiB
71
72
// Default maximum number of concurrent uploads
73
const DEFAULT_MAX_CONCURRENT_UPLOADS: usize = 10;
74
75
// Maximum number of idle connections per host
76
const MAX_IDLE_PER_HOST: usize = 32;
77
78
// Default connection timeout in milliseconds
79
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
80
81
// Environment variable name for Azure account key
82
const ACCOUNT_KEY_ENV_VAR: &str = "AZURE_STORAGE_KEY";
83
84
pub(crate) enum BufferedBodyState {
85
    Buffered(Bytes),
86
    Empty,
87
}
88
89
pub(crate) struct RequestComponents {
90
    method: Method,
91
    uri: Uri,
92
    version: http::Version,
93
    headers: http::HeaderMap,
94
    body_data: BufferedBodyState,
95
}
96
97
mod body_processing {
98
    use azure_core::Error;
99
100
    use super::{Body, BufferedBodyState};
101
102
    #[inline]
103
0
    pub(crate) async fn buffer_body(body: Body) -> Result<BufferedBodyState, Error> {
104
0
        match body {
105
0
            Body::Bytes(bytes) if bytes.is_empty() => Ok(BufferedBodyState::Empty),
106
0
            Body::Bytes(bytes) => Ok(BufferedBodyState::Buffered(bytes)),
107
0
            Body::SeekableStream(_) => Err(Error::new(
108
0
                azure_core::error::ErrorKind::Other,
109
0
                "Unsupported body type: SeekableStream",
110
0
            )),
111
        }
112
0
    }
113
}
114
115
struct RequestBuilder<'a> {
116
    components: &'a RequestComponents,
117
}
118
119
impl<'a> RequestBuilder<'a> {
120
    #[inline]
121
0
    const fn new(components: &'a RequestComponents) -> Self {
122
0
        Self { components }
123
0
    }
124
125
    #[inline]
126
0
    fn build(&self) -> Result<hyper::Request<Full<Bytes>>, http::Error> {
127
0
        let mut req_builder = hyper::Request::builder()
128
0
            .method(self.components.method.clone())
129
0
            .uri(self.components.uri.clone())
130
0
            .version(self.components.version);
131
132
0
        let headers_map = req_builder.headers_mut().unwrap();
133
0
        for (name, value) in &self.components.headers {
134
0
            headers_map.insert(name, value.clone());
135
0
        }
136
137
0
        match &self.components.body_data {
138
0
            BufferedBodyState::Buffered(bytes) => req_builder.body(Full::new(bytes.clone())),
139
0
            BufferedBodyState::Empty => req_builder.body(Full::new(Bytes::new())),
140
        }
141
0
    }
142
}
143
144
mod conversions {
145
    use std::collections::HashMap;
146
147
    use azure_core::{Error, Request, Response, StatusCode, headers as azure_headers};
148
    use http_body_util::BodyExt;
149
    use hyper::body::Incoming;
150
151
    use super::{BufferedBodyState, Method, RequestComponents, Uri, body_processing};
152
153
    pub(crate) trait RequestExt {
154
        async fn into_components(self) -> Result<RequestComponents, Error>;
155
    }
156
157
    impl RequestExt for Request {
158
0
        async fn into_components(self) -> Result<RequestComponents, Error> {
159
0
            let method = Method::from_bytes(self.method().as_ref().as_bytes()).map_err(|e| {
160
0
                Error::new(
161
0
                    azure_core::error::ErrorKind::Other,
162
0
                    format!("Failed to convert method: {e}"),
163
                )
164
0
            })?;
165
166
0
            let uri = Uri::try_from(self.url().as_str()).map_err(|e| {
167
0
                Error::new(
168
0
                    azure_core::error::ErrorKind::Other,
169
0
                    format!("Failed to parse URI: {e}"),
170
                )
171
0
            })?;
172
173
0
            let version = http::Version::HTTP_11; // Default to HTTP/1.1
174
175
0
            let mut headers = http::HeaderMap::new();
176
0
            for (name, value) in self.headers().iter() {
177
0
                let header_name =
178
0
                    http::HeaderName::from_bytes(name.as_str().as_bytes()).map_err(|e| {
179
0
                        Error::new(
180
0
                            azure_core::error::ErrorKind::Other,
181
0
                            format!("Failed to convert header name: {e}"),
182
                        )
183
0
                    })?;
184
0
                let header_value = http::HeaderValue::from_str(value.as_str()).map_err(|e| {
185
0
                    Error::new(
186
0
                        azure_core::error::ErrorKind::Other,
187
0
                        format!("Failed to convert header value: {e}"),
188
                    )
189
0
                })?;
190
0
                headers.insert(header_name, header_value);
191
            }
192
193
0
            let body = self.body().clone();
194
195
0
            let needs_buffering = matches!(method, Method::POST | Method::PUT);
196
197
0
            let body_data = if needs_buffering {
198
0
                body_processing::buffer_body(body).await?
199
            } else {
200
0
                BufferedBodyState::Empty
201
            };
202
203
0
            Ok(RequestComponents {
204
0
                method,
205
0
                uri,
206
0
                version,
207
0
                headers,
208
0
                body_data,
209
0
            })
210
0
        }
211
    }
212
213
    pub(crate) trait ResponseExt {
214
        async fn into_azure_response(self) -> Result<Response, Error>;
215
    }
216
217
    impl ResponseExt for hyper::Response<Incoming> {
218
0
        async fn into_azure_response(self) -> Result<Response, Error> {
219
0
            let (parts, body) = self.into_parts();
220
221
            // Convert headers
222
0
            let headers: HashMap<_, _> = parts
223
0
                .headers
224
0
                .iter()
225
0
                .filter_map(|(k, v)| {
226
                    Some((
227
0
                        azure_headers::HeaderName::from(k.as_str().to_owned()),
228
0
                        azure_headers::HeaderValue::from(v.to_str().ok()?.to_owned()),
229
                    ))
230
0
                })
231
0
                .collect();
232
233
0
            let data = body
234
0
                .collect()
235
0
                .await
236
0
                .map_err(|e| {
237
0
                    Error::new(
238
0
                        azure_core::error::ErrorKind::Other,
239
0
                        format!("Failed to collect body: {e}"),
240
                    )
241
0
                })?
242
0
                .to_bytes();
243
244
0
            Ok(Response::new(
245
0
                StatusCode::try_from(parts.status.as_u16()).expect("Invalid status code"),
246
0
                azure_headers::Headers::from(headers),
247
0
                Box::pin(futures::stream::once(futures::future::ready(Ok(data)))),
248
0
            ))
249
0
        }
250
    }
251
}
252
253
mod execution {
254
    use azure_core::Response;
255
    use bytes::Bytes;
256
    use http_body_util::Full;
257
258
    use super::conversions::ResponseExt;
259
    use super::{
260
        Code, HttpsConnector, LegacyClient, LegacyHttpConnector, RequestBuilder, RequestComponents,
261
        RetryResult, fs, make_err,
262
    };
263
264
0
    pub(crate) async fn execute_request(
265
0
        client: LegacyClient<HttpsConnector<LegacyHttpConnector>, Full<Bytes>>,
266
0
        components: &RequestComponents,
267
0
    ) -> RetryResult<Response> {
268
0
        let _permit = match fs::get_permit().await {
269
0
            Ok(permit) => permit,
270
0
            Err(e) => {
271
0
                return RetryResult::Retry(make_err!(
272
0
                    Code::Unavailable,
273
0
                    "Failed to acquire permit: {e}"
274
0
                ));
275
            }
276
        };
277
278
0
        let request = match RequestBuilder::new(components).build() {
279
0
            Ok(req) => req,
280
0
            Err(e) => {
281
0
                return RetryResult::Err(make_err!(
282
0
                    Code::Internal,
283
0
                    "Failed to create request: {e}",
284
0
                ));
285
            }
286
        };
287
288
0
        match client.request(request).await {
289
0
            Ok(resp) => match resp.into_azure_response().await {
290
0
                Ok(response) => RetryResult::Ok(response),
291
0
                Err(e) => RetryResult::Retry(make_err!(
292
0
                    Code::Unavailable,
293
0
                    "Failed to convert response: {e}"
294
0
                )),
295
            },
296
0
            Err(e) => RetryResult::Retry(make_err!(
297
0
                Code::Unavailable,
298
0
                "Failed request in AzureBlobStore: {e}"
299
0
            )),
300
        }
301
0
    }
302
303
    #[inline]
304
0
    pub(crate) fn create_retry_stream(
305
0
        client: LegacyClient<HttpsConnector<LegacyHttpConnector>, Full<Bytes>>,
306
0
        components: RequestComponents,
307
0
    ) -> impl futures::Stream<Item = RetryResult<Response>> {
308
0
        futures::stream::unfold(components, move |components| {
309
0
            let client_clone = client.clone();
310
0
            async move {
311
0
                let result = execute_request(client_clone, &components).await;
312
0
                Some((result, components))
313
0
            }
314
0
        })
315
0
    }
316
}
317
318
#[derive(Clone)]
319
pub struct AzureClient {
320
    client: LegacyClient<HttpsConnector<LegacyHttpConnector>, Full<Bytes>>,
321
    config: Arc<ExperimentalAzureSpec>,
322
    retrier: Retrier,
323
}
324
325
impl AzureClient {
326
0
    pub fn new(
327
0
        config: ExperimentalAzureSpec,
328
0
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
329
0
    ) -> Result<Self, Error> {
330
0
        let connector = Self::build_connector(&config);
331
0
        let connection_timeout = if config.connection_timeout_s > 0 {
332
0
            Duration::from_millis(config.connection_timeout_s)
333
        } else {
334
0
            Duration::from_millis(DEFAULT_CONNECTION_TIMEOUT_MS)
335
        };
336
0
        let client = Self::build_client(connector, connection_timeout);
337
338
        Ok(Self {
339
0
            client,
340
0
            retrier: Retrier::new(
341
0
                Arc::new(|duration| Box::pin(sleep(duration))),
342
0
                jitter_fn,
343
0
                config.common.retry.clone(),
344
            ),
345
0
            config: Arc::new(config),
346
        })
347
0
    }
348
349
0
    fn build_connector(config: &ExperimentalAzureSpec) -> HttpsConnector<LegacyHttpConnector> {
350
0
        let builder = HttpsConnectorBuilder::new().with_webpki_roots();
351
352
0
        let builder_with_schemes = if config.common.insecure_allow_http {
353
0
            builder.https_or_http()
354
        } else {
355
0
            builder.https_only()
356
        };
357
358
0
        if config.common.disable_http2 {
359
0
            builder_with_schemes.enable_http1().build()
360
        } else {
361
0
            builder_with_schemes.enable_http1().enable_http2().build()
362
        }
363
0
    }
364
365
0
    fn build_client(
366
0
        connector: HttpsConnector<LegacyHttpConnector>,
367
0
        connection_timeout: Duration,
368
0
    ) -> LegacyClient<HttpsConnector<LegacyHttpConnector>, Full<Bytes>> {
369
0
        LegacyClient::builder(TokioExecutor::new())
370
0
            .pool_idle_timeout(connection_timeout)
371
0
            .pool_max_idle_per_host(MAX_IDLE_PER_HOST)
372
0
            .build(connector)
373
0
    }
374
}
375
376
impl core::fmt::Debug for AzureClient {
377
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
378
0
        f.debug_struct("AzureClient")
379
0
            .field("config", &self.config)
380
0
            .finish()
381
0
    }
382
}
383
384
#[async_trait::async_trait]
385
impl HttpClient for AzureClient {
386
    async fn execute_request(
387
        &self,
388
        request: &azure_core::Request,
389
0
    ) -> azure_core::Result<azure_core::Response> {
390
        use conversions::RequestExt;
391
392
        let components = request.clone().into_components().await?;
393
394
        match self
395
            .retrier
396
            .retry(execution::create_retry_stream(
397
                self.client.clone(),
398
                components,
399
            ))
400
            .await
401
        {
402
            Ok(response) => Ok(response),
403
            Err(e) => Err(azure_core::Error::new(
404
                azure_core::error::ErrorKind::Other,
405
                format!("Connection failed after retries: {e}"),
406
            )),
407
        }
408
0
    }
409
}
410
411
#[derive(MetricsComponent, Debug)]
412
pub struct AzureBlobStore<NowFn> {
413
    client: Arc<ContainerClient>,
414
    now_fn: NowFn,
415
    #[metric(help = "The container name for the Azure store")]
416
    container: String,
417
    #[metric(help = "The blob prefix for the Azure store")]
418
    blob_prefix: String,
419
    retrier: Retrier,
420
    #[metric(help = "The number of seconds to consider an object expired")]
421
    consider_expired_after_s: i64,
422
    #[metric(help = "The number of bytes to buffer for retrying requests")]
423
    max_retry_buffer_per_request: usize,
424
    #[metric(help = "The number of concurrent uploads allowed")]
425
    max_concurrent_uploads: usize,
426
}
427
428
impl<I, NowFn> AzureBlobStore<NowFn>
429
where
430
    I: InstantWrapper,
431
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
432
{
433
0
    pub async fn new(spec: &ExperimentalAzureSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
434
0
        let jitter_fn = spec.common.retry.make_jitter_fn();
435
436
0
        let http_client = Arc::new(
437
0
            AzureClient::new(spec.clone(), jitter_fn.clone())
438
0
                .map_err(|e| make_err!(Code::Unavailable, "Failed to create Azure client: {e}"))?,
439
        );
440
441
0
        let transport_options = TransportOptions::new(http_client);
442
443
0
        let account_key = std::env::var(ACCOUNT_KEY_ENV_VAR).map_err(|e| {
444
0
            make_err!(
445
0
                Code::FailedPrecondition,
446
                "Failed to read {ACCOUNT_KEY_ENV_VAR} environment variable: {e}"
447
            )
448
0
        })?;
449
450
0
        let storage_credentials =
451
0
            StorageCredentials::access_key(spec.account_name.clone(), Secret::new(account_key));
452
453
        // Create a container client with the specified credentials and transport
454
0
        let container_client = BlobServiceClient::builder(&spec.account_name, storage_credentials)
455
0
            .transport(transport_options)
456
0
            .container_client(&spec.container);
457
458
0
        Self::new_with_client_and_jitter(spec, container_client, jitter_fn, now_fn)
459
0
    }
460
461
12
    pub fn new_with_client_and_jitter(
462
12
        spec: &ExperimentalAzureSpec,
463
12
        client: ContainerClient,
464
12
        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
465
12
        now_fn: NowFn,
466
12
    ) -> Result<Arc<Self>, Error> {
467
12
        Ok(Arc::new(Self {
468
12
            client: Arc::new(client),
469
12
            now_fn,
470
12
            container: spec.container.clone(),
471
12
            blob_prefix: spec
472
12
                .common
473
12
                .key_prefix
474
12
                .as_ref()
475
12
                .unwrap_or(&String::new())
476
12
                .clone(),
477
12
            retrier: Retrier::new(
478
12
                Arc::new(|duration| Box::pin(sleep(duration))),
479
12
                jitter_fn,
480
12
                spec.common.retry.clone(),
481
            ),
482
12
            consider_expired_after_s: i64::from(spec.common.consider_expired_after_s),
483
12
            max_retry_buffer_per_request: spec
484
12
                .common
485
12
                .max_retry_buffer_per_request
486
12
                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
487
12
            max_concurrent_uploads: spec
488
12
                .common
489
12
                .multipart_max_concurrent_uploads
490
12
                .map_or(DEFAULT_MAX_CONCURRENT_UPLOADS, |v| v),
491
        }))
492
12
    }
493
494
11
    fn make_blob_path(&self, key: &StoreKey<'_>) -> String {
495
11
        format!("{}{}", self.blob_prefix, key.as_str())
496
11
    }
497
498
5
    async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
499
5
        let blob_path = self.make_blob_path(digest);
500
501
5
        self.retrier
502
5
            .retry(unfold((), move |state| {
503
5
                let blob_path = blob_path.clone();
504
5
                async move {
505
5
                    let result = self.client.blob_client(&blob_path).get_properties().await;
506
507
5
                    match result {
508
4
                        Ok(props) => {
509
4
                            if self.consider_expired_after_s > 0 {
510
2
                                let last_modified = props.blob.properties.last_modified;
511
2
                                let now = (self.now_fn)().unix_timestamp() as i64;
512
2
                                if last_modified.unix_timestamp() + self.consider_expired_after_s
513
2
                                    <= now
514
                                {
515
1
                                    return Some((RetryResult::Ok(None), state));
516
1
                                }
517
2
                            }
518
3
                            let blob_size = props.blob.properties.content_length;
519
3
                            Some((RetryResult::Ok(Some(blob_size)), state))
520
                        }
521
1
                        Err(err) => {
522
1
                            if err
523
1
                                .as_http_error()
524
1
                                .is_some_and(|e| e.status() == StatusCode::NotFound)
525
                            {
526
1
                                Some((RetryResult::Ok(None), state))
527
0
                            } else if err.to_string().contains("ContainerNotFound") {
528
0
                                Some((
529
0
                                    RetryResult::Err(make_err!(
530
0
                                        Code::InvalidArgument,
531
0
                                        "Container not found: {}",
532
0
                                        err
533
0
                                    )),
534
0
                                    state,
535
0
                                ))
536
                            } else {
537
0
                                Some((
538
0
                                    RetryResult::Retry(make_err!(
539
0
                                        Code::Unavailable,
540
0
                                        "Failed to get blob properties: {:?}",
541
0
                                        err
542
0
                                    )),
543
0
                                    state,
544
0
                                ))
545
                            }
546
                        }
547
                    }
548
5
                }
549
5
            }))
550
5
            .await
551
5
    }
552
}
553
554
#[async_trait]
555
impl<I, NowFn> StoreDriver for AzureBlobStore<NowFn>
556
where
557
    I: InstantWrapper,
558
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
559
{
560
    async fn has_with_results(
561
        self: Pin<&Self>,
562
        keys: &[StoreKey<'_>],
563
        results: &mut [Option<u64>],
564
6
    ) -> Result<(), Error> {
565
        keys.iter()
566
            .zip(results.iter_mut())
567
6
            .map(|(key, result)| async move {
568
6
                if is_zero_digest(key.borrow()) {
569
1
                    *result = Some(0);
570
1
                    return Ok::<_, Error>(());
571
5
                }
572
5
                *result = self.has(key).await?;
573
5
                Ok::<_, Error>(())
574
12
            })
575
            .collect::<FuturesUnordered<_>>()
576
            .try_collect()
577
            .await
578
6
    }
579
580
0
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
581
0
        matches!(optimization, StoreOptimizations::LazyExistenceOnSync)
582
0
    }
583
584
    async fn update(
585
        self: Pin<&Self>,
586
        digest: StoreKey<'_>,
587
        mut reader: DropCloserReadHalf,
588
        upload_size: UploadSizeInfo,
589
4
    ) -> Result<(), Error> {
590
        let blob_path = self.make_blob_path(&digest);
591
        // Handling zero-sized content check
592
        if upload_size == UploadSizeInfo::ExactSize(0) {
593
            return Ok(());
594
        }
595
596
        let max_size = match upload_size {
597
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
598
        };
599
600
        // For small files, we'll use single block upload
601
        if max_size < DEFAULT_BLOCK_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) {
602
            let UploadSizeInfo::ExactSize(sz) = upload_size else {
603
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
604
            };
605
606
            reader.set_max_recent_data_size(
607
                u64::try_from(self.max_retry_buffer_per_request)
608
                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
609
            );
610
611
            return self.retrier
612
2
                .retry(unfold(reader, move |mut reader| {
613
2
                    let client = Arc::clone(&self.client);
614
2
                    let blob_client = client.blob_client(&blob_path);
615
2
                    async move {
616
2
                        let (mut tx, mut rx) = make_buf_channel_pair();
617
618
2
                        let result = {
619
2
                            let reader_ref = &mut reader;
620
2
                            let (upload_res, bind_res) = tokio::join!(
621
2
                            async {
622
2
                                let mut buffer = Vec::with_capacity(usize::try_from(sz).expect("size must be non-negative and fit in usize"));
623
7
                                    while let Ok(Some(chunk)) = rx.try_next().await {
624
5
                                    buffer.extend_from_slice(&chunk);
625
5
                                }
626
627
2
                                blob_client
628
2
                                    .put_block_blob(Body::from(buffer))
629
2
                                    .content_type("application/octet-stream")
630
2
                                    .into_future()
631
2
                                    .await
632
2
                                    .map(|_| ())
633
2
                                    .map_err(|e| make_err!(Code::Aborted, "{:?}", e))
634
2
                            },
635
2
                            async {
636
2
                                tx.bind_buffered(reader_ref).await
637
2
                            }
638
                        );
639
640
2
                            upload_res
641
2
                                .and(bind_res)
642
2
                                .err_tip(|| "Failed to upload blob in single chunk")
643
                        };
644
645
2
                        match result {
646
2
                            Ok(()) => Some((RetryResult::Ok(()), reader)),
647
0
                            Err(mut err) => {
648
0
                                err.code = Code::Aborted;
649
0
                                let bytes_received = reader.get_bytes_received();
650
651
0
                                if let Err(try_reset_err) = reader.try_reset_stream() {
652
0
                                    event!(
653
0
                                    Level::ERROR,
654
                                    ?bytes_received,
655
                                    err = ?try_reset_err,
656
                                    "Unable to reset stream after failed upload in AzureStore::update"
657
                                );
658
0
                                    Some((RetryResult::Err(err
659
0
                                        .merge(try_reset_err)
660
0
                                        .append(format!("Failed to retry upload with {bytes_received} bytes received in AzureStore::update"))),
661
0
                                          reader))
662
                                } else {
663
0
                                    let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in AzureStore::update"));
664
0
                                    event!(
665
0
                                    Level::INFO,
666
                                    ?err,
667
                                    ?bytes_received,
668
                                    "Retryable Azure error"
669
                                );
670
0
                                    Some((RetryResult::Retry(err), reader))
671
                                }
672
                            }
673
                        }
674
2
                    }
675
2
                }))
676
                .await;
677
        }
678
679
        // For larger files, we'll use block upload strategy
680
        let block_size =
681
            cmp::min(max_size / (MAX_BLOCKS as u64 - 1), MAX_BLOCK_SIZE).max(DEFAULT_BLOCK_SIZE);
682
683
        let (tx, mut rx) = mpsc::channel(self.max_concurrent_uploads);
684
        let mut block_ids = Vec::with_capacity(MAX_BLOCKS);
685
        let retrier = self.retrier.clone();
686
687
        let read_stream_fut = {
688
            let tx = tx.clone();
689
            let blob_path = blob_path.clone();
690
1
            async move {
691
4
                for block_id in 0..MAX_BLOCKS {
692
4
                    let write_buf = reader
693
4
                        .consume(Some(
694
4
                            usize::try_from(block_size)
695
4
                                .err_tip(|| "Could not convert block_size to usize")?,
696
                        ))
697
4
                        .await
698
4
                        .err_tip(|| "Failed to read chunk in azure_store")?;
699
700
4
                    if write_buf.is_empty() {
701
1
                        break;
702
3
                    }
703
704
3
                    let block_id = format!("{block_id:032}");
705
3
                    let blob_path = blob_path.clone();
706
707
3
                    tx.send(async move {
708
3
                        self.retrier
709
3
                            .retry(unfold(
710
3
                                (write_buf, block_id.clone()),
711
3
                                move |(write_buf, block_id)| {
712
3
                                    let client = Arc::clone(&self.client);
713
3
                                    let blob_client = client.blob_client(&blob_path);
714
3
                                    async move {
715
3
                                        let retry_result = blob_client
716
3
                                            .put_block(
717
3
                                                block_id.clone(),
718
3
                                                Body::from(write_buf.clone()),
719
3
                                            )
720
3
                                            .into_future()
721
3
                                            .await
722
3
                                            .map_or_else(
723
0
                                                |e| {
724
0
                                                    RetryResult::Retry(make_err!(
725
0
                                            Code::Aborted,
726
0
                                            "Failed to upload block {} in Azure store: {:?}",
727
0
                                            block_id,
728
0
                                            e
729
0
                                        ))
730
0
                                                },
731
3
                                                |_| RetryResult::Ok(block_id.clone()),
732
                                            );
733
3
                                        Some((retry_result, (write_buf, block_id)))
734
3
                                    }
735
3
                                },
736
                            ))
737
3
                            .await
738
3
                    })
739
3
                    .await
740
3
                    .map_err(|err| {
741
0
                        Error::from_std_err(Code::Internal, &err)
742
0
                            .append("Failed to send block to channel")
743
0
                    })?;
744
                }
745
1
                Ok::<_, Error>(())
746
1
            }
747
            .fuse()
748
        };
749
750
        let mut upload_futures = FuturesUnordered::new();
751
752
        tokio::pin!(read_stream_fut);
753
754
        loop {
755
            if read_stream_fut.is_terminated() && rx.is_empty() && upload_futures.is_empty() {
756
                break;
757
            }
758
            tokio::select! {
759
                result = &mut read_stream_fut => result?,
760
                Some(block_id) = upload_futures.next() => block_ids.push(block_id?),
761
                Some(fut) = rx.recv() => upload_futures.push(fut),
762
            }
763
        }
764
765
        // Sorting block IDs to ensure consistent ordering
766
        block_ids.sort_unstable();
767
768
        // Commit the block list
769
        let block_list = BlockList {
770
            blocks: block_ids
771
                .into_iter()
772
3
                .map(|id| BlobBlockType::Latest(BlockId::from(id)))
773
                .collect(),
774
        };
775
776
        retrier
777
1
            .retry(unfold(block_list, move |block_list| {
778
1
                let client = Arc::clone(&self.client);
779
1
                let blob_client = client.blob_client(&blob_path);
780
781
1
                async move {
782
                    Some((
783
1
                        blob_client
784
1
                            .put_block_list(block_list.clone())
785
1
                            .content_type("application/octet-stream")
786
1
                            .into_future()
787
1
                            .await
788
1
                            .map_or_else(
789
0
                                |e| {
790
0
                                    RetryResult::Retry(
791
0
                                        Error::from_std_err(Code::Aborted, &e)
792
0
                                            .append("Failed to commit block list in Azure store:"),
793
0
                                    )
794
0
                                },
795
1
                                |_| RetryResult::Ok(()),
796
                            ),
797
1
                        block_list,
798
                    ))
799
1
                }
800
1
            }))
801
            .await
802
4
    }
803
804
    async fn get_part(
805
        self: Pin<&Self>,
806
        key: StoreKey<'_>,
807
        writer: &mut DropCloserWriteHalf,
808
        offset: u64,
809
        length: Option<u64>,
810
3
    ) -> Result<(), Error> {
811
        if is_zero_digest(key.borrow()) {
812
            writer
813
                .send_eof()
814
                .err_tip(|| "Failed to send zero EOF in azure store get_part")?;
815
            return Ok(());
816
        }
817
818
        let blob_path = self.make_blob_path(&key);
819
820
        let client = Arc::clone(&self.client);
821
        let blob_client = client.blob_client(&blob_path);
822
        let range = match length {
823
            Some(len) => Range::new(offset, offset + len - 1),
824
            None => Range::from(offset..),
825
        };
826
827
        self.retrier
828
3
            .retry(unfold(writer, move |writer| {
829
3
                let range_clone = range.clone();
830
3
                let blob_client = blob_client.clone();
831
3
                async move {
832
3
                    let result = async {
833
3
                        let mut stream = blob_client.get().range(range_clone.clone()).into_stream();
834
835
5
                        while let Some(
chunk_result3
) = stream.next().await {
836
3
                            match chunk_result {
837
2
                                Ok(response) => {
838
2
                                    let data = response.data.collect().await.map_err(|e| 
{0
839
0
                                        make_err!(
840
0
                                            Code::Aborted,
841
                                            "Failed to collect response data: {:?}",
842
                                            e
843
                                        )
844
0
                                    })?;
845
2
                                    if data.is_empty() {
846
0
                                        continue;
847
2
                                    }
848
2
                                    writer.send(data).await.map_err(|e| 
{0
849
0
                                        make_err!(
850
0
                                            Code::Aborted,
851
                                            "Failed to send data to writer: {:?}",
852
                                            e
853
                                        )
854
0
                                    })?;
855
                                }
856
1
                                Err(e) => {
857
1
                                    return match e {
858
1
                                        
e0
if e.as_http_error().is_some_and(|e| {
859
1
                                            e.status() == StatusCode::NotFound
860
1
                                        }) =>
861
                                        {
862
0
                                            Err(make_err!(
863
0
                                                Code::NotFound,
864
0
                                                "Blob not found in Azure: {:?}",
865
0
                                                e
866
0
                                            ))
867
                                        }
868
1
                                        _ => Err(make_err!(
869
1
                                            Code::Aborted,
870
1
                                            "Error reading from Azure stream: {:?}",
871
1
                                            e
872
1
                                        )),
873
                                    };
874
                                }
875
                            }
876
                        }
877
878
2
                        writer.send_eof().map_err(|e| 
{0
879
0
                            make_err!(Code::Aborted, "Failed to send EOF to writer: {:?}", e)
880
0
                        })?;
881
2
                        Ok(())
882
3
                    }
883
3
                    .await;
884
885
3
                    match result {
886
2
                        Ok(()) => Some((RetryResult::Ok(()), writer)),
887
1
                        Err(e) => {
888
1
                            if e.code == Code::NotFound {
889
0
                                Some((RetryResult::Err(e), writer))
890
                            } else {
891
1
                                Some((RetryResult::Retry(e), writer))
892
                            }
893
                        }
894
                    }
895
3
                }
896
3
            }))
897
            .await
898
3
    }
899
900
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
        // This store wraps no other store; it is the terminal driver, so it
        // returns itself regardless of the digest.
        self
    }
903
904
0
    /// Exposes the store as `Any` so callers can downcast to the concrete type.
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
        self
    }
907
908
0
    /// `Arc` variant of [`Self::as_any`] for shared-ownership downcasting.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
        self
    }
911
912
0
    /// Registers this store as a health indicator; its `HealthStatusIndicator`
    /// implementation supplies the actual check.
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }
915
916
0
    /// Intentional no-op: this store never invokes remove callbacks.
    fn register_remove_callback(
        self: Arc<Self>,
        _callback: Arc<dyn RemoveItemCallback>,
    ) -> Result<(), Error> {
        // Azure Blob Storage manages object lifecycle externally,
        // so we can safely ignore remove callbacks.
        Ok(())
    }
924
}
925
926
#[async_trait]
impl<I, NowFn> HealthStatusIndicator for AzureBlobStore<NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
    /// Name reported in health-check output for this store.
    fn get_name(&self) -> &'static str {
        "AzureBlobStore"
    }

    /// Delegates to `StoreDriver::check_health` so the store is health-checked
    /// the same way as every other store driver.
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
        StoreDriver::check_health(Pin::new(self), namespace).await
    }
}