/build/source/nativelink-store/src/azure_blob_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::cmp; |
16 | | use core::pin::Pin; |
17 | | use core::time::Duration; |
18 | | use std::borrow::Cow; |
19 | | use std::sync::Arc; |
20 | | |
21 | | use async_trait::async_trait; |
22 | | use azure_core::auth::Secret; |
23 | | use azure_core::prelude::Range; |
24 | | use azure_core::{Body, HttpClient, StatusCode, TransportOptions}; |
25 | | use azure_storage::StorageCredentials; |
26 | | use azure_storage_blobs::prelude::*; |
27 | | use bytes::Bytes; |
28 | | use futures::future::FusedFuture; |
29 | | use futures::stream::{FuturesUnordered, unfold}; |
30 | | use futures::{FutureExt, StreamExt, TryStreamExt}; |
31 | | use http::Method; |
32 | | use http_body_util::Full; |
33 | | use hyper::Uri; |
34 | | use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; |
35 | | use hyper_util::client::legacy::Client as LegacyClient; |
36 | | use hyper_util::client::legacy::connect::HttpConnector as LegacyHttpConnector; |
37 | | use hyper_util::rt::TokioExecutor; |
38 | | use nativelink_config::stores::ExperimentalAzureSpec; |
39 | | use nativelink_error::{Code, Error, ResultExt, make_err}; |
40 | | use nativelink_metric::MetricsComponent; |
41 | | use nativelink_util::buf_channel::{ |
42 | | DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair, |
43 | | }; |
44 | | use nativelink_util::fs; |
45 | | use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; |
46 | | use nativelink_util::instant_wrapper::InstantWrapper; |
47 | | use nativelink_util::retry::{Retrier, RetryResult}; |
48 | | use nativelink_util::store_trait::{ |
49 | | RemoveItemCallback, StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo, |
50 | | }; |
51 | | use tokio::sync::mpsc; |
52 | | use tokio::time::sleep; |
53 | | use tracing::{Level, event}; |
54 | | |
55 | | use crate::cas_utils::is_zero_digest; |
56 | | |
// Check the below doc for the limits specific to Azure.
// https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage

/// Maximum number of blocks in a block blob or append blob.
const MAX_BLOCKS: usize = 50_000;

/// Maximum size of a single block in a block blob.
const MAX_BLOCK_SIZE: u64 = 4_000 * 1024 * 1024; // 4,000 MiB

/// Default block size for uploads.
const DEFAULT_BLOCK_SIZE: u64 = 5 * 1024 * 1024; // 5 MiB

/// Default maximum number of bytes buffered per request so a failed
/// upload can be replayed on retry.
const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 * 1024 * 1024; // 5 MiB

/// Default maximum number of concurrent block uploads.
const DEFAULT_MAX_CONCURRENT_UPLOADS: usize = 10;

/// Maximum number of idle connections kept per host in the hyper pool.
const MAX_IDLE_PER_HOST: usize = 32;

/// Default connection timeout in milliseconds (used when the config does
/// not specify one).
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;

/// Environment variable read for the Azure storage account key.
const ACCOUNT_KEY_ENV_VAR: &str = "AZURE_STORAGE_KEY";
83 | | |
/// A request body captured in memory so the same request can be resent on
/// each retry attempt.
pub(crate) enum BufferedBodyState {
    // Body bytes fully buffered in memory.
    Buffered(Bytes),
    // No body (or an empty body).
    Empty,
}

/// The pieces of an HTTP request, extracted once from an `azure_core`
/// request so each retry attempt can rebuild an identical hyper request.
pub(crate) struct RequestComponents {
    method: Method,
    uri: Uri,
    // Always HTTP/1.1 today; set in `RequestExt::into_components`.
    version: http::Version,
    headers: http::HeaderMap,
    body_data: BufferedBodyState,
}
96 | | |
97 | | mod body_processing { |
98 | | use azure_core::Error; |
99 | | |
100 | | use super::{Body, BufferedBodyState}; |
101 | | |
102 | | #[inline] |
103 | 0 | pub(crate) async fn buffer_body(body: Body) -> Result<BufferedBodyState, Error> { |
104 | 0 | match body { |
105 | 0 | Body::Bytes(bytes) if bytes.is_empty() => Ok(BufferedBodyState::Empty), |
106 | 0 | Body::Bytes(bytes) => Ok(BufferedBodyState::Buffered(bytes)), |
107 | 0 | Body::SeekableStream(_) => Err(Error::new( |
108 | 0 | azure_core::error::ErrorKind::Other, |
109 | 0 | "Unsupported body type: SeekableStream", |
110 | 0 | )), |
111 | | } |
112 | 0 | } |
113 | | } |
114 | | |
/// Builds hyper requests from captured [`RequestComponents`], allowing the
/// same logical request to be rebuilt for every retry attempt.
struct RequestBuilder<'a> {
    components: &'a RequestComponents,
}
118 | | |
119 | | impl<'a> RequestBuilder<'a> { |
120 | | #[inline] |
121 | 0 | const fn new(components: &'a RequestComponents) -> Self { |
122 | 0 | Self { components } |
123 | 0 | } |
124 | | |
125 | | #[inline] |
126 | 0 | fn build(&self) -> Result<hyper::Request<Full<Bytes>>, http::Error> { |
127 | 0 | let mut req_builder = hyper::Request::builder() |
128 | 0 | .method(self.components.method.clone()) |
129 | 0 | .uri(self.components.uri.clone()) |
130 | 0 | .version(self.components.version); |
131 | | |
132 | 0 | let headers_map = req_builder.headers_mut().unwrap(); |
133 | 0 | for (name, value) in &self.components.headers { |
134 | 0 | headers_map.insert(name, value.clone()); |
135 | 0 | } |
136 | | |
137 | 0 | match &self.components.body_data { |
138 | 0 | BufferedBodyState::Buffered(bytes) => req_builder.body(Full::new(bytes.clone())), |
139 | 0 | BufferedBodyState::Empty => req_builder.body(Full::new(Bytes::new())), |
140 | | } |
141 | 0 | } |
142 | | } |
143 | | |
mod conversions {
    use std::collections::HashMap;

    use azure_core::{Error, Request, Response, StatusCode, headers as azure_headers};
    use http_body_util::BodyExt;
    use hyper::body::Incoming;

    use super::{BufferedBodyState, Method, RequestComponents, Uri, body_processing};

    /// Decomposes an Azure SDK request into replayable [`RequestComponents`].
    pub(crate) trait RequestExt {
        async fn into_components(self) -> Result<RequestComponents, Error>;
    }

    impl RequestExt for Request {
        async fn into_components(self) -> Result<RequestComponents, Error> {
            // The Azure SDK and hyper use distinct method/URI/header types,
            // so each piece is converted through its string/byte form.
            let method = Method::from_bytes(self.method().as_ref().as_bytes()).map_err(|e| {
                Error::new(
                    azure_core::error::ErrorKind::Other,
                    format!("Failed to convert method: {e}"),
                )
            })?;

            let uri = Uri::try_from(self.url().as_str()).map_err(|e| {
                Error::new(
                    azure_core::error::ErrorKind::Other,
                    format!("Failed to parse URI: {e}"),
                )
            })?;

            let version = http::Version::HTTP_11; // Default to HTTP/1.1

            let mut headers = http::HeaderMap::new();
            for (name, value) in self.headers().iter() {
                let header_name =
                    http::HeaderName::from_bytes(name.as_str().as_bytes()).map_err(|e| {
                        Error::new(
                            azure_core::error::ErrorKind::Other,
                            format!("Failed to convert header name: {e}"),
                        )
                    })?;
                let header_value = http::HeaderValue::from_str(value.as_str()).map_err(|e| {
                    Error::new(
                        azure_core::error::ErrorKind::Other,
                        format!("Failed to convert header value: {e}"),
                    )
                })?;
                // NOTE(review): `insert` keeps only the last value for a
                // repeated header name — confirm multi-valued headers are
                // not needed here.
                headers.insert(header_name, header_value);
            }

            let body = self.body().clone();

            // Only POST/PUT carry payloads that must be replayable on retry;
            // other methods are sent with an empty body.
            let needs_buffering = matches!(method, Method::POST | Method::PUT);

            let body_data = if needs_buffering {
                body_processing::buffer_body(body).await?
            } else {
                BufferedBodyState::Empty
            };

            Ok(RequestComponents {
                method,
                uri,
                version,
                headers,
                body_data,
            })
        }
    }

    /// Converts a hyper response back into an `azure_core::Response`.
    pub(crate) trait ResponseExt {
        async fn into_azure_response(self) -> Result<Response, Error>;
    }

    impl ResponseExt for hyper::Response<Incoming> {
        async fn into_azure_response(self) -> Result<Response, Error> {
            let (parts, body) = self.into_parts();

            // Convert headers. Values that are not valid UTF-8 are silently
            // dropped by the `ok()?` inside the filter_map.
            let headers: HashMap<_, _> = parts
                .headers
                .iter()
                .filter_map(|(k, v)| {
                    Some((
                        azure_headers::HeaderName::from(k.as_str().to_owned()),
                        azure_headers::HeaderValue::from(v.to_str().ok()?.to_owned()),
                    ))
                })
                .collect();

            // The whole body is buffered up front; the Azure response type
            // expects a stream, so the bytes are wrapped in a one-item
            // stream below.
            let data = body
                .collect()
                .await
                .map_err(|e| {
                    Error::new(
                        azure_core::error::ErrorKind::Other,
                        format!("Failed to collect body: {e}"),
                    )
                })?
                .to_bytes();

            Ok(Response::new(
                // hyper only yields status codes in the valid u16 range, so
                // this conversion is not expected to fail in practice.
                StatusCode::try_from(parts.status.as_u16()).expect("Invalid status code"),
                azure_headers::Headers::from(headers),
                Box::pin(futures::stream::once(futures::future::ready(Ok(data)))),
            ))
        }
    }
}
252 | | |
253 | | mod execution { |
254 | | use azure_core::Response; |
255 | | use bytes::Bytes; |
256 | | use http_body_util::Full; |
257 | | |
258 | | use super::conversions::ResponseExt; |
259 | | use super::{ |
260 | | Code, HttpsConnector, LegacyClient, LegacyHttpConnector, RequestBuilder, RequestComponents, |
261 | | RetryResult, fs, make_err, |
262 | | }; |
263 | | |
264 | 0 | pub(crate) async fn execute_request( |
265 | 0 | client: LegacyClient<HttpsConnector<LegacyHttpConnector>, Full<Bytes>>, |
266 | 0 | components: &RequestComponents, |
267 | 0 | ) -> RetryResult<Response> { |
268 | 0 | let _permit = match fs::get_permit().await { |
269 | 0 | Ok(permit) => permit, |
270 | 0 | Err(e) => { |
271 | 0 | return RetryResult::Retry(make_err!( |
272 | 0 | Code::Unavailable, |
273 | 0 | "Failed to acquire permit: {e}" |
274 | 0 | )); |
275 | | } |
276 | | }; |
277 | | |
278 | 0 | let request = match RequestBuilder::new(components).build() { |
279 | 0 | Ok(req) => req, |
280 | 0 | Err(e) => { |
281 | 0 | return RetryResult::Err(make_err!( |
282 | 0 | Code::Internal, |
283 | 0 | "Failed to create request: {e}", |
284 | 0 | )); |
285 | | } |
286 | | }; |
287 | | |
288 | 0 | match client.request(request).await { |
289 | 0 | Ok(resp) => match resp.into_azure_response().await { |
290 | 0 | Ok(response) => RetryResult::Ok(response), |
291 | 0 | Err(e) => RetryResult::Retry(make_err!( |
292 | 0 | Code::Unavailable, |
293 | 0 | "Failed to convert response: {e}" |
294 | 0 | )), |
295 | | }, |
296 | 0 | Err(e) => RetryResult::Retry(make_err!( |
297 | 0 | Code::Unavailable, |
298 | 0 | "Failed request in AzureBlobStore: {e}" |
299 | 0 | )), |
300 | | } |
301 | 0 | } |
302 | | |
303 | | #[inline] |
304 | 0 | pub(crate) fn create_retry_stream( |
305 | 0 | client: LegacyClient<HttpsConnector<LegacyHttpConnector>, Full<Bytes>>, |
306 | 0 | components: RequestComponents, |
307 | 0 | ) -> impl futures::Stream<Item = RetryResult<Response>> { |
308 | 0 | futures::stream::unfold(components, move |components| { |
309 | 0 | let client_clone = client.clone(); |
310 | 0 | async move { |
311 | 0 | let result = execute_request(client_clone, &components).await; |
312 | 0 | Some((result, components)) |
313 | 0 | } |
314 | 0 | }) |
315 | 0 | } |
316 | | } |
317 | | |
/// Azure SDK `HttpClient` implementation backed by a pooled hyper client
/// with retry support.
#[derive(Clone)]
pub struct AzureClient {
    // Pooled hyper client shared by all requests.
    client: LegacyClient<HttpsConnector<LegacyHttpConnector>, Full<Bytes>>,
    // Store configuration; also surfaced by the Debug impl.
    config: Arc<ExperimentalAzureSpec>,
    // Retry policy applied around each HTTP attempt.
    retrier: Retrier,
}
324 | | |
325 | | impl AzureClient { |
326 | 0 | pub fn new( |
327 | 0 | config: ExperimentalAzureSpec, |
328 | 0 | jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>, |
329 | 0 | ) -> Result<Self, Error> { |
330 | 0 | let connector = Self::build_connector(&config); |
331 | 0 | let connection_timeout = if config.connection_timeout_s > 0 { |
332 | 0 | Duration::from_millis(config.connection_timeout_s) |
333 | | } else { |
334 | 0 | Duration::from_millis(DEFAULT_CONNECTION_TIMEOUT_MS) |
335 | | }; |
336 | 0 | let client = Self::build_client(connector, connection_timeout); |
337 | | |
338 | | Ok(Self { |
339 | 0 | client, |
340 | 0 | retrier: Retrier::new( |
341 | 0 | Arc::new(|duration| Box::pin(sleep(duration))), |
342 | 0 | jitter_fn, |
343 | 0 | config.common.retry.clone(), |
344 | | ), |
345 | 0 | config: Arc::new(config), |
346 | | }) |
347 | 0 | } |
348 | | |
349 | 0 | fn build_connector(config: &ExperimentalAzureSpec) -> HttpsConnector<LegacyHttpConnector> { |
350 | 0 | let builder = HttpsConnectorBuilder::new().with_webpki_roots(); |
351 | | |
352 | 0 | let builder_with_schemes = if config.common.insecure_allow_http { |
353 | 0 | builder.https_or_http() |
354 | | } else { |
355 | 0 | builder.https_only() |
356 | | }; |
357 | | |
358 | 0 | if config.common.disable_http2 { |
359 | 0 | builder_with_schemes.enable_http1().build() |
360 | | } else { |
361 | 0 | builder_with_schemes.enable_http1().enable_http2().build() |
362 | | } |
363 | 0 | } |
364 | | |
365 | 0 | fn build_client( |
366 | 0 | connector: HttpsConnector<LegacyHttpConnector>, |
367 | 0 | connection_timeout: Duration, |
368 | 0 | ) -> LegacyClient<HttpsConnector<LegacyHttpConnector>, Full<Bytes>> { |
369 | 0 | LegacyClient::builder(TokioExecutor::new()) |
370 | 0 | .pool_idle_timeout(connection_timeout) |
371 | 0 | .pool_max_idle_per_host(MAX_IDLE_PER_HOST) |
372 | 0 | .build(connector) |
373 | 0 | } |
374 | | } |
375 | | |
// Manual Debug impl that surfaces only the configuration; the hyper client
// and retrier are intentionally omitted from the output.
impl core::fmt::Debug for AzureClient {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("AzureClient")
            .field("config", &self.config)
            .finish()
    }
}
383 | | |
384 | | #[async_trait::async_trait] |
385 | | impl HttpClient for AzureClient { |
386 | | async fn execute_request( |
387 | | &self, |
388 | | request: &azure_core::Request, |
389 | 0 | ) -> azure_core::Result<azure_core::Response> { |
390 | | use conversions::RequestExt; |
391 | | |
392 | | let components = request.clone().into_components().await?; |
393 | | |
394 | | match self |
395 | | .retrier |
396 | | .retry(execution::create_retry_stream( |
397 | | self.client.clone(), |
398 | | components, |
399 | | )) |
400 | | .await |
401 | | { |
402 | | Ok(response) => Ok(response), |
403 | | Err(e) => Err(azure_core::Error::new( |
404 | | azure_core::error::ErrorKind::Other, |
405 | | format!("Connection failed after retries: {e}"), |
406 | | )), |
407 | | } |
408 | 0 | } |
409 | | } |
410 | | |
/// A `StoreDriver` that stores objects as Azure block blobs inside a single
/// configured container.
#[derive(MetricsComponent, Debug)]
pub struct AzureBlobStore<NowFn> {
    // Azure SDK client scoped to the configured container.
    client: Arc<ContainerClient>,
    // Supplies the current time; injected so tests can control expiry checks.
    now_fn: NowFn,
    #[metric(help = "The container name for the Azure store")]
    container: String,
    #[metric(help = "The blob prefix for the Azure store")]
    blob_prefix: String,
    // Retry policy applied to all Azure operations.
    retrier: Retrier,
    #[metric(help = "The number of seconds to consider an object expired")]
    consider_expired_after_s: i64,
    #[metric(help = "The number of bytes to buffer for retrying requests")]
    max_retry_buffer_per_request: usize,
    #[metric(help = "The number of concurrent uploads allowed")]
    max_concurrent_uploads: usize,
}
427 | | |
428 | | impl<I, NowFn> AzureBlobStore<NowFn> |
429 | | where |
430 | | I: InstantWrapper, |
431 | | NowFn: Fn() -> I + Send + Sync + Unpin + 'static, |
432 | | { |
433 | 0 | pub async fn new(spec: &ExperimentalAzureSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> { |
434 | 0 | let jitter_fn = spec.common.retry.make_jitter_fn(); |
435 | | |
436 | 0 | let http_client = Arc::new( |
437 | 0 | AzureClient::new(spec.clone(), jitter_fn.clone()) |
438 | 0 | .map_err(|e| make_err!(Code::Unavailable, "Failed to create Azure client: {e}"))?, |
439 | | ); |
440 | | |
441 | 0 | let transport_options = TransportOptions::new(http_client); |
442 | | |
443 | 0 | let account_key = std::env::var(ACCOUNT_KEY_ENV_VAR).map_err(|e| { |
444 | 0 | make_err!( |
445 | 0 | Code::FailedPrecondition, |
446 | | "Failed to read {ACCOUNT_KEY_ENV_VAR} environment variable: {e}" |
447 | | ) |
448 | 0 | })?; |
449 | | |
450 | 0 | let storage_credentials = |
451 | 0 | StorageCredentials::access_key(spec.account_name.clone(), Secret::new(account_key)); |
452 | | |
453 | | // Create a container client with the specified credentials and transport |
454 | 0 | let container_client = BlobServiceClient::builder(&spec.account_name, storage_credentials) |
455 | 0 | .transport(transport_options) |
456 | 0 | .container_client(&spec.container); |
457 | | |
458 | 0 | Self::new_with_client_and_jitter(spec, container_client, jitter_fn, now_fn) |
459 | 0 | } |
460 | | |
461 | 12 | pub fn new_with_client_and_jitter( |
462 | 12 | spec: &ExperimentalAzureSpec, |
463 | 12 | client: ContainerClient, |
464 | 12 | jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>, |
465 | 12 | now_fn: NowFn, |
466 | 12 | ) -> Result<Arc<Self>, Error> { |
467 | 12 | Ok(Arc::new(Self { |
468 | 12 | client: Arc::new(client), |
469 | 12 | now_fn, |
470 | 12 | container: spec.container.clone(), |
471 | 12 | blob_prefix: spec |
472 | 12 | .common |
473 | 12 | .key_prefix |
474 | 12 | .as_ref() |
475 | 12 | .unwrap_or(&String::new()) |
476 | 12 | .clone(), |
477 | 12 | retrier: Retrier::new( |
478 | 12 | Arc::new(|duration| Box::pin1 (sleep1 (duration1 ))), |
479 | 12 | jitter_fn, |
480 | 12 | spec.common.retry.clone(), |
481 | | ), |
482 | 12 | consider_expired_after_s: i64::from(spec.common.consider_expired_after_s), |
483 | 12 | max_retry_buffer_per_request: spec |
484 | 12 | .common |
485 | 12 | .max_retry_buffer_per_request |
486 | 12 | .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST), |
487 | 12 | max_concurrent_uploads: spec |
488 | 12 | .common |
489 | 12 | .multipart_max_concurrent_uploads |
490 | 12 | .map_or(DEFAULT_MAX_CONCURRENT_UPLOADS, |v| v), |
491 | | })) |
492 | 12 | } |
493 | | |
494 | 11 | fn make_blob_path(&self, key: &StoreKey<'_>) -> String { |
495 | 11 | format!("{}{}", self.blob_prefix, key.as_str()) |
496 | 11 | } |
497 | | |
498 | 5 | async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> { |
499 | 5 | let blob_path = self.make_blob_path(digest); |
500 | | |
501 | 5 | self.retrier |
502 | 5 | .retry(unfold((), move |state| { |
503 | 5 | let blob_path = blob_path.clone(); |
504 | 5 | async move { |
505 | 5 | let result = self.client.blob_client(&blob_path).get_properties().await; |
506 | | |
507 | 5 | match result { |
508 | 4 | Ok(props) => { |
509 | 4 | if self.consider_expired_after_s > 0 { |
510 | 2 | let last_modified = props.blob.properties.last_modified; |
511 | 2 | let now = (self.now_fn)().unix_timestamp() as i64; |
512 | 2 | if last_modified.unix_timestamp() + self.consider_expired_after_s |
513 | 2 | <= now |
514 | | { |
515 | 1 | return Some((RetryResult::Ok(None), state)); |
516 | 1 | } |
517 | 2 | } |
518 | 3 | let blob_size = props.blob.properties.content_length; |
519 | 3 | Some((RetryResult::Ok(Some(blob_size)), state)) |
520 | | } |
521 | 1 | Err(err) => { |
522 | 1 | if err |
523 | 1 | .as_http_error() |
524 | 1 | .is_some_and(|e| e.status() == StatusCode::NotFound) |
525 | | { |
526 | 1 | Some((RetryResult::Ok(None), state)) |
527 | 0 | } else if err.to_string().contains("ContainerNotFound") { |
528 | 0 | Some(( |
529 | 0 | RetryResult::Err(make_err!( |
530 | 0 | Code::InvalidArgument, |
531 | 0 | "Container not found: {}", |
532 | 0 | err |
533 | 0 | )), |
534 | 0 | state, |
535 | 0 | )) |
536 | | } else { |
537 | 0 | Some(( |
538 | 0 | RetryResult::Retry(make_err!( |
539 | 0 | Code::Unavailable, |
540 | 0 | "Failed to get blob properties: {:?}", |
541 | 0 | err |
542 | 0 | )), |
543 | 0 | state, |
544 | 0 | )) |
545 | | } |
546 | | } |
547 | | } |
548 | 5 | } |
549 | 5 | })) |
550 | 5 | .await |
551 | 5 | } |
552 | | } |
553 | | |
#[async_trait]
impl<I, NowFn> StoreDriver for AzureBlobStore<NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
    /// Looks up each key concurrently and records its size (or `None` when
    /// absent/expired) into the matching `results` slot.
    async fn has_with_results(
        self: Pin<&Self>,
        keys: &[StoreKey<'_>],
        results: &mut [Option<u64>],
    ) -> Result<(), Error> {
        keys.iter()
            .zip(results.iter_mut())
            .map(|(key, result)| async move {
                // The zero digest is always present with size 0; no need to
                // contact Azure.
                if is_zero_digest(key.borrow()) {
                    *result = Some(0);
                    return Ok::<_, Error>(());
                }
                *result = self.has(key).await?;
                Ok::<_, Error>(())
            })
            .collect::<FuturesUnordered<_>>()
            .try_collect()
            .await
    }

    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
        matches!(optimization, StoreOptimizations::LazyExistenceOnSync)
    }

    /// Uploads a blob using one of two strategies:
    /// - exact sizes below `DEFAULT_BLOCK_SIZE`: a single `put_block_blob`
    ///   call, retried by rewinding the buffered reader;
    /// - everything else: chunked `put_block` uploads (bounded by
    ///   `max_concurrent_uploads`) followed by a `put_block_list` commit.
    async fn update(
        self: Pin<&Self>,
        digest: StoreKey<'_>,
        mut reader: DropCloserReadHalf,
        upload_size: UploadSizeInfo,
    ) -> Result<(), Error> {
        let blob_path = self.make_blob_path(&digest);
        // Zero-sized uploads are a no-op; nothing is written to Azure.
        if upload_size == UploadSizeInfo::ExactSize(0) {
            return Ok(());
        }

        let max_size = match upload_size {
            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
        };

        // For small files, we'll use single block upload.
        if max_size < DEFAULT_BLOCK_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) {
            let UploadSizeInfo::ExactSize(sz) = upload_size else {
                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
            };

            // Keep enough recent data buffered so the stream can be rewound
            // and replayed if the upload must be retried.
            reader.set_max_recent_data_size(
                u64::try_from(self.max_retry_buffer_per_request)
                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
            );

            return self.retrier
                .retry(unfold(reader, move |mut reader| {
                    let client = Arc::clone(&self.client);
                    let blob_client = client.blob_client(&blob_path);
                    async move {
                        let (mut tx, mut rx) = make_buf_channel_pair();

                        let result = {
                            let reader_ref = &mut reader;
                            // Drain the channel into a buffer and upload it,
                            // while concurrently pumping the caller's reader
                            // into the channel.
                            let (upload_res, bind_res) = tokio::join!(
                                async {
                                    let mut buffer = Vec::with_capacity(usize::try_from(sz).expect("size must be non-negative and fit in usize"));
                                    while let Ok(Some(chunk)) = rx.try_next().await {
                                        buffer.extend_from_slice(&chunk);
                                    }

                                    blob_client
                                        .put_block_blob(Body::from(buffer))
                                        .content_type("application/octet-stream")
                                        .into_future()
                                        .await
                                        .map(|_| ())
                                        .map_err(|e| make_err!(Code::Aborted, "{:?}", e))
                                },
                                async {
                                    tx.bind_buffered(reader_ref).await
                                }
                            );

                            upload_res
                                .and(bind_res)
                                .err_tip(|| "Failed to upload blob in single chunk")
                        };

                        match result {
                            Ok(()) => Some((RetryResult::Ok(()), reader)),
                            Err(mut err) => {
                                err.code = Code::Aborted;
                                let bytes_received = reader.get_bytes_received();

                                // Only retry if the reader can be rewound to
                                // the start of the upload; otherwise fail.
                                if let Err(try_reset_err) = reader.try_reset_stream() {
                                    event!(
                                        Level::ERROR,
                                        ?bytes_received,
                                        err = ?try_reset_err,
                                        "Unable to reset stream after failed upload in AzureStore::update"
                                    );
                                    Some((RetryResult::Err(err
                                        .merge(try_reset_err)
                                        .append(format!("Failed to retry upload with {bytes_received} bytes received in AzureStore::update"))),
                                        reader))
                                } else {
                                    let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in AzureStore::update"));
                                    event!(
                                        Level::INFO,
                                        ?err,
                                        ?bytes_received,
                                        "Retryable Azure error"
                                    );
                                    Some((RetryResult::Retry(err), reader))
                                }
                            }
                        }
                    }
                }))
                .await;
        }

        // For larger files, we'll use block upload strategy.
        // Pick a block size that keeps the block count under MAX_BLOCKS
        // while staying within Azure's per-block size limit.
        let block_size =
            cmp::min(max_size / (MAX_BLOCKS as u64 - 1), MAX_BLOCK_SIZE).max(DEFAULT_BLOCK_SIZE);

        let (tx, mut rx) = mpsc::channel(self.max_concurrent_uploads);
        let mut block_ids = Vec::with_capacity(MAX_BLOCKS);
        let retrier = self.retrier.clone();

        // Producer: reads fixed-size chunks from the caller and queues one
        // retrying upload future per block; the bounded channel caps the
        // number of uploads in flight.
        let read_stream_fut = {
            let tx = tx.clone();
            let blob_path = blob_path.clone();
            async move {
                for block_id in 0..MAX_BLOCKS {
                    let write_buf = reader
                        .consume(Some(
                            usize::try_from(block_size)
                                .err_tip(|| "Could not convert block_size to usize")?,
                        ))
                        .await
                        .err_tip(|| "Failed to read chunk in azure_store")?;

                    // An empty chunk signals end of stream.
                    if write_buf.is_empty() {
                        break;
                    }

                    // Zero-padded to a fixed width so all block ids have the
                    // same length and sort in upload order.
                    let block_id = format!("{block_id:032}");
                    let blob_path = blob_path.clone();

                    tx.send(async move {
                        self.retrier
                            .retry(unfold(
                                (write_buf, block_id.clone()),
                                move |(write_buf, block_id)| {
                                    let client = Arc::clone(&self.client);
                                    let blob_client = client.blob_client(&blob_path);
                                    async move {
                                        let retry_result = blob_client
                                            .put_block(
                                                block_id.clone(),
                                                Body::from(write_buf.clone()),
                                            )
                                            .into_future()
                                            .await
                                            .map_or_else(
                                                |e| {
                                                    RetryResult::Retry(make_err!(
                                                        Code::Aborted,
                                                        "Failed to upload block {} in Azure store: {:?}",
                                                        block_id,
                                                        e
                                                    ))
                                                },
                                                |_| RetryResult::Ok(block_id.clone()),
                                            );
                                        Some((retry_result, (write_buf, block_id)))
                                    }
                                },
                            ))
                            .await
                    })
                    .await
                    .map_err(|err| {
                        Error::from_std_err(Code::Internal, &err)
                            .append("Failed to send block to channel")
                    })?;
                }
                Ok::<_, Error>(())
            }
            .fuse()
        };

        let mut upload_futures = FuturesUnordered::new();

        tokio::pin!(read_stream_fut);

        // Drive the producer and the in-flight uploads together until the
        // producer has finished, the queue is drained, and every upload has
        // completed.
        loop {
            if read_stream_fut.is_terminated() && rx.is_empty() && upload_futures.is_empty() {
                break;
            }
            tokio::select! {
                result = &mut read_stream_fut => result?,
                Some(block_id) = upload_futures.next() => block_ids.push(block_id?),
                Some(fut) = rx.recv() => upload_futures.push(fut),
            }
        }

        // Sorting block IDs to ensure consistent ordering.
        block_ids.sort_unstable();

        // Commit the block list.
        let block_list = BlockList {
            blocks: block_ids
                .into_iter()
                .map(|id| BlobBlockType::Latest(BlockId::from(id)))
                .collect(),
        };

        retrier
            .retry(unfold(block_list, move |block_list| {
                let client = Arc::clone(&self.client);
                let blob_client = client.blob_client(&blob_path);

                async move {
                    Some((
                        blob_client
                            .put_block_list(block_list.clone())
                            .content_type("application/octet-stream")
                            .into_future()
                            .await
                            .map_or_else(
                                |e| {
                                    RetryResult::Retry(
                                        Error::from_std_err(Code::Aborted, &e)
                                            .append("Failed to commit block list in Azure store:"),
                                    )
                                },
                                |_| RetryResult::Ok(()),
                            ),
                        block_list,
                    ))
                }
            }))
            .await
    }

    /// Streams part of a blob (from `offset`, optionally `length` bytes)
    /// into `writer`, with retry around the whole download.
    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        // The zero digest has no content; just signal EOF.
        if is_zero_digest(key.borrow()) {
            writer
                .send_eof()
                .err_tip(|| "Failed to send zero EOF in azure store get_part")?;
            return Ok(());
        }

        let blob_path = self.make_blob_path(&key);

        let client = Arc::clone(&self.client);
        let blob_client = client.blob_client(&blob_path);
        // NOTE(review): confirm `azure_core::Range`'s end-bound convention.
        // The `- 1` is correct only for an inclusive end; if `Range::new`
        // takes an exclusive end (like Rust ranges), this drops the final
        // byte. Also `length == Some(0)` would underflow here.
        let range = match length {
            Some(len) => Range::new(offset, offset + len - 1),
            None => Range::from(offset..),
        };

        self.retrier
            .retry(unfold(writer, move |writer| {
                let range_clone = range.clone();
                let blob_client = blob_client.clone();
                async move {
                    let result = async {
                        let mut stream = blob_client.get().range(range_clone.clone()).into_stream();

                        while let Some(chunk_result) = stream.next().await {
                            match chunk_result {
                                Ok(response) => {
                                    let data = response.data.collect().await.map_err(|e| {
                                        make_err!(
                                            Code::Aborted,
                                            "Failed to collect response data: {:?}",
                                            e
                                        )
                                    })?;
                                    // Skip empty chunks; the writer treats an
                                    // empty send differently from EOF.
                                    if data.is_empty() {
                                        continue;
                                    }
                                    writer.send(data).await.map_err(|e| {
                                        make_err!(
                                            Code::Aborted,
                                            "Failed to send data to writer: {:?}",
                                            e
                                        )
                                    })?;
                                }
                                Err(e) => {
                                    // Classify: a missing blob is terminal
                                    // (NotFound); anything else is marked
                                    // Aborted and retried below.
                                    return match e {
                                        e if e.as_http_error().is_some_and(|e| {
                                            e.status() == StatusCode::NotFound
                                        }) =>
                                        {
                                            Err(make_err!(
                                                Code::NotFound,
                                                "Blob not found in Azure: {:?}",
                                                e
                                            ))
                                        }
                                        _ => Err(make_err!(
                                            Code::Aborted,
                                            "Error reading from Azure stream: {:?}",
                                            e
                                        )),
                                    };
                                }
                            }
                        }

                        writer.send_eof().map_err(|e| {
                            make_err!(Code::Aborted, "Failed to send EOF to writer: {:?}", e)
                        })?;
                        Ok(())
                    }
                    .await;

                    match result {
                        Ok(()) => Some((RetryResult::Ok(()), writer)),
                        Err(e) => {
                            if e.code == Code::NotFound {
                                Some((RetryResult::Err(e), writer))
                            } else {
                                Some((RetryResult::Retry(e), writer))
                            }
                        }
                    }
                }
            }))
            .await
    }

    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
        self
    }

    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
        self
    }

    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
        self
    }

    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }

    fn register_remove_callback(
        self: Arc<Self>,
        _callback: Arc<dyn RemoveItemCallback>,
    ) -> Result<(), Error> {
        // Azure Blob Storage manages object lifecycle externally,
        // so we can safely ignore remove callbacks.
        Ok(())
    }
}
925 | | |
#[async_trait]
impl<I, NowFn> HealthStatusIndicator for AzureBlobStore<NowFn>
where
    I: InstantWrapper,
    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
{
    /// Name reported in health-check output.
    fn get_name(&self) -> &'static str {
        "AzureBlobStore"
    }

    /// Delegates to the default `StoreDriver` health-check implementation.
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
        StoreDriver::check_health(Pin::new(self), namespace).await
    }
}