/build/source/nativelink-store/src/common_s3_utils.rs
Line | Count | Source |
1 | | // Copyright 2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::future::Future; |
16 | | use core::pin::Pin; |
17 | | use core::task::{Context, Poll}; |
18 | | use std::sync::Arc; |
19 | | |
20 | | use aws_config::retry::ErrorKind::TransientError; |
21 | | use aws_smithy_runtime_api::client::http::{ |
22 | | HttpClient as SmithyHttpClient, HttpConnector as SmithyHttpConnector, HttpConnectorFuture, |
23 | | HttpConnectorSettings, SharedHttpConnector, |
24 | | }; |
25 | | use aws_smithy_runtime_api::client::orchestrator::HttpRequest; |
26 | | use aws_smithy_runtime_api::client::result::ConnectorError; |
27 | | use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents; |
28 | | use aws_smithy_runtime_api::http::Response; |
29 | | use aws_smithy_types::body::SdkBody; |
30 | | use bytes::{Bytes, BytesMut}; |
31 | | use futures::Stream; |
32 | | use http_body::{Frame, SizeHint}; |
33 | | use http_body_util::BodyExt; |
34 | | use hyper::{Method, Request}; |
35 | | use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; |
36 | | use hyper_util::client::legacy::Client as LegacyClient; |
37 | | use hyper_util::client::legacy::connect::HttpConnector as LegacyHttpConnector; |
38 | | use hyper_util::rt::TokioExecutor; |
39 | | use nativelink_config::stores::CommonObjectSpec; |
40 | | // Note: S3 store should be very careful about the error codes it returns |
41 | | // when in a retryable wrapper. Always prefer Code::Aborted or another |
42 | | // retryable code over Code::InvalidArgument or make_input_err!(). |
43 | | // i.e., don't import make_input_err!() to help prevent this. |
44 | | use nativelink_error::{Code, make_err}; |
45 | | use nativelink_util::buf_channel::DropCloserReadHalf; |
46 | | use nativelink_util::fs; |
47 | | use nativelink_util::retry::{Retrier, RetryResult}; |
48 | | use tokio::time::sleep; |
49 | | |
50 | | #[derive(Clone)] |
51 | | pub struct TlsClient { |
52 | | client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>, |
53 | | retrier: Retrier, |
54 | | } |
55 | | |
56 | | impl TlsClient { |
57 | | #[must_use] |
58 | 3 | pub fn new(common: &CommonObjectSpec) -> Self { |
59 | 3 | let connector_with_roots = HttpsConnectorBuilder::new().with_platform_verifier(); |
60 | | |
61 | 3 | let connector_with_schemes = if common.insecure_allow_http { Branch (61:41): [True: 0, False: 3]
Branch (61:41): [Folded - Ignored]
|
62 | 0 | connector_with_roots.https_or_http() |
63 | | } else { |
64 | 3 | connector_with_roots.https_only() |
65 | | }; |
66 | | |
67 | 3 | let connector = if common.disable_http2 { Branch (67:28): [True: 0, False: 3]
Branch (67:28): [Folded - Ignored]
|
68 | 0 | connector_with_schemes.enable_http1().build() |
69 | | } else { |
70 | 3 | connector_with_schemes.enable_http1().enable_http2().build() |
71 | | }; |
72 | | |
73 | 3 | Self::with_https_connector(common, connector) |
74 | 3 | } |
75 | | |
76 | 6 | pub fn with_https_connector( |
77 | 6 | common: &CommonObjectSpec, |
78 | 6 | connector: HttpsConnector<LegacyHttpConnector>, |
79 | 6 | ) -> Self { |
80 | 6 | let client = LegacyClient::builder(TokioExecutor::new()).build(connector); |
81 | | |
82 | | Self { |
83 | 6 | client, |
84 | 6 | retrier: Retrier::new( |
85 | 6 | Arc::new(|duration| Box::pin0 (sleep0 (duration0 ))), |
86 | 6 | common.retry.make_jitter_fn(), |
87 | 6 | common.retry.clone(), |
88 | | ), |
89 | | } |
90 | 6 | } |
91 | | } |
92 | | |
93 | | impl core::fmt::Debug for TlsClient { |
94 | 42 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> { |
95 | 42 | f.debug_struct("TlsClient").finish_non_exhaustive() |
96 | 42 | } |
97 | | } |
98 | | |
99 | | impl SmithyHttpClient for TlsClient { |
100 | 6 | fn http_connector( |
101 | 6 | &self, |
102 | 6 | _settings: &HttpConnectorSettings, |
103 | 6 | _components: &RuntimeComponents, |
104 | 6 | ) -> SharedHttpConnector { |
105 | 6 | SharedHttpConnector::new(self.clone()) |
106 | 6 | } |
107 | | } |
108 | | |
109 | | struct RequestBuilder<'a> { |
110 | | components: &'a RequestComponents, |
111 | | } |
112 | | |
113 | | impl<'a> RequestBuilder<'a> { |
114 | | #[inline] |
115 | 6 | const fn new(components: &'a RequestComponents) -> Self { |
116 | 6 | Self { components } |
117 | 6 | } |
118 | | |
119 | | #[inline] |
120 | | #[allow(unused_qualifications, reason = "false positive on hyper::http::Error")] |
121 | 6 | fn build(&self) -> Result<Request<SdkBody>, hyper::http::Error> { |
122 | 6 | let mut req_builder = Request::builder() |
123 | 6 | .method(self.components.method.clone()) |
124 | 6 | .uri(self.components.uri.clone()) |
125 | 6 | .version(self.components.version); |
126 | | |
127 | 6 | let headers_map = req_builder.headers_mut().unwrap(); |
128 | 24 | for (name18 , value18 ) in &self.components.headers { |
129 | 18 | headers_map.insert(name, value.clone()); |
130 | 18 | } |
131 | | |
132 | 6 | match &self.components.body_data { |
133 | 6 | BufferedBodyState::Cloneable(body) => { |
134 | 6 | let cloned_body = body.try_clone().expect("Body should be cloneable"); |
135 | 6 | req_builder.body(cloned_body) |
136 | | } |
137 | 0 | BufferedBodyState::Buffered(bytes) => req_builder.body(SdkBody::from(bytes.clone())), |
138 | 0 | BufferedBodyState::Empty => req_builder.body(SdkBody::empty()), |
139 | | } |
140 | 6 | } |
141 | | } |
142 | | |
143 | | mod execution { |
144 | | use super::conversions::ResponseExt; |
145 | | use super::{ |
146 | | Code, HttpsConnector, LegacyClient, LegacyHttpConnector, RequestBuilder, RequestComponents, |
147 | | Response, RetryResult, SdkBody, fs, make_err, |
148 | | }; |
149 | | |
150 | | #[inline] |
151 | 6 | pub(crate) async fn execute_request( |
152 | 6 | client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>, |
153 | 6 | components: &RequestComponents, |
154 | 6 | ) -> RetryResult<Response<SdkBody>> { |
155 | 6 | let _permit = match fs::get_permit().await { |
156 | 6 | Ok(permit) => permit, |
157 | 0 | Err(e) => { |
158 | 0 | return RetryResult::Retry(make_err!( |
159 | 0 | Code::Unavailable, |
160 | 0 | "Failed to acquire permit: {e}" |
161 | 0 | )); |
162 | | } |
163 | | }; |
164 | | |
165 | 6 | let request = match RequestBuilder::new(components).build() { |
166 | 6 | Ok(req) => req, |
167 | 0 | Err(e) => { |
168 | 0 | return RetryResult::Err(make_err!( |
169 | 0 | Code::Internal, |
170 | 0 | "Failed to create request: {e}", |
171 | 0 | )); |
172 | | } |
173 | | }; |
174 | | |
175 | 6 | match client.request(request).await { |
176 | 0 | Ok(resp) => RetryResult::Ok(resp.into_smithy_response()), |
177 | 6 | Err(e) => RetryResult::Retry(make_err!( |
178 | 6 | Code::Unavailable, |
179 | 6 | "Failed request in S3Store: {e}" |
180 | 6 | )), |
181 | | } |
182 | 6 | } |
183 | | |
184 | | #[inline] |
185 | 6 | pub(crate) fn create_retry_stream( |
186 | 6 | client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>, |
187 | 6 | components: RequestComponents, |
188 | 6 | ) -> impl futures::Stream<Item = RetryResult<Response<SdkBody>>> { |
189 | 6 | futures::stream::unfold(components, move |components| { |
190 | 6 | let client_clone = client.clone(); |
191 | 6 | async move { |
192 | 6 | let result = execute_request(client_clone, &components).await; |
193 | | |
194 | 6 | Some((result, components)) |
195 | 6 | } |
196 | 6 | }) |
197 | 6 | } |
198 | | } |
199 | | |
200 | | enum BufferedBodyState { |
201 | | Cloneable(SdkBody), |
202 | | Buffered(Bytes), |
203 | | Empty, |
204 | | } |
205 | | |
206 | | mod body_processing { |
207 | | use super::{BodyExt, BufferedBodyState, BytesMut, ConnectorError, SdkBody, TransientError}; |
208 | | |
209 | | /// Buffer a request body fully into memory. |
210 | | /// |
211 | | /// TODO(aaronmondal): This could lead to OOMs in extremely constrained |
212 | | /// environments. Probably better to implement something |
213 | | /// like a rewindable stream logic. |
214 | | #[inline] |
215 | 0 | pub(crate) async fn buffer_body(body: SdkBody) -> Result<BufferedBodyState, ConnectorError> { |
216 | 0 | let mut bytes = BytesMut::new(); |
217 | 0 | let mut body_stream = body; |
218 | 0 | while let Some(frame) = body_stream.frame().await { Branch (218:19): [True: 0, False: 0]
Branch (218:19): [Folded - Ignored]
|
219 | 0 | match frame { |
220 | 0 | Ok(frame) => { |
221 | 0 | if let Some(data) = frame.data_ref() { Branch (221:28): [True: 0, False: 0]
Branch (221:28): [Folded - Ignored]
|
222 | 0 | bytes.extend_from_slice(data); |
223 | 0 | } |
224 | | } |
225 | 0 | Err(e) => { |
226 | 0 | return Err(ConnectorError::other( |
227 | 0 | format!("Failed to read request body: {e}").into(), |
228 | 0 | Some(TransientError), |
229 | 0 | )); |
230 | | } |
231 | | } |
232 | | } |
233 | | |
234 | 0 | Ok(BufferedBodyState::Buffered(bytes.freeze())) |
235 | 0 | } |
236 | | } |
237 | | |
238 | | struct RequestComponents { |
239 | | method: Method, |
240 | | uri: hyper::Uri, |
241 | | version: hyper::Version, |
242 | | headers: hyper::HeaderMap, |
243 | | body_data: BufferedBodyState, |
244 | | } |
245 | | |
246 | | mod conversions { |
247 | | use super::{ |
248 | | BufferedBodyState, ConnectorError, Future, HttpRequest, Method, RequestComponents, |
249 | | Response, SdkBody, TransientError, body_processing, |
250 | | }; |
251 | | |
252 | | pub(crate) trait RequestExt { |
253 | | fn into_components(self) |
254 | | -> impl Future<Output = Result<RequestComponents, ConnectorError>>; |
255 | | } |
256 | | |
257 | | impl RequestExt for HttpRequest { |
258 | 6 | async fn into_components(self) -> Result<RequestComponents, ConnectorError> { |
259 | | // Note: This does *not* refer to the HTTP protocol, but to the |
260 | | // version of the http crate. |
261 | 6 | let hyper_req = self.try_into_http1x().map_err(|e| {0 |
262 | 0 | ConnectorError::other( |
263 | 0 | format!("Failed to convert to HTTP request: {e}").into(), |
264 | 0 | Some(TransientError), |
265 | | ) |
266 | 0 | })?; |
267 | | |
268 | 6 | let method = hyper_req.method().clone(); |
269 | 6 | let uri = hyper_req.uri().clone(); |
270 | 6 | let version = hyper_req.version(); |
271 | 6 | let headers = hyper_req.headers().clone(); |
272 | | |
273 | 6 | let body = hyper_req.into_body(); |
274 | | |
275 | | // Only buffer bodies for methods likely to have payloads. |
276 | 6 | let needs_buffering = matches!0 (method, Method::POST | Method::PUT); |
277 | | |
278 | | // Preserve the body in case we need to retry. |
279 | 6 | let body_data = if needs_buffering { Branch (279:32): [True: 6, False: 0]
Branch (279:32): [Folded - Ignored]
|
280 | 6 | if let Some(cloneable_body) = body.try_clone() { Branch (280:24): [True: 6, False: 0]
Branch (280:24): [Folded - Ignored]
|
281 | 6 | BufferedBodyState::Cloneable(cloneable_body) |
282 | | } else { |
283 | 0 | body_processing::buffer_body(body).await? |
284 | | } |
285 | | } else { |
286 | 0 | BufferedBodyState::Empty |
287 | | }; |
288 | | |
289 | 6 | Ok(RequestComponents { |
290 | 6 | method, |
291 | 6 | uri, |
292 | 6 | version, |
293 | 6 | headers, |
294 | 6 | body_data, |
295 | 6 | }) |
296 | 6 | } |
297 | | } |
298 | | |
299 | | pub(crate) trait ResponseExt { |
300 | | fn into_smithy_response(self) -> Response<SdkBody>; |
301 | | } |
302 | | |
303 | | impl ResponseExt for hyper::Response<hyper::body::Incoming> { |
304 | 0 | fn into_smithy_response(self) -> Response<SdkBody> { |
305 | 0 | let (parts, body) = self.into_parts(); |
306 | 0 | let sdk_body = SdkBody::from_body_1_x(body); |
307 | 0 | let mut smithy_resp = Response::new(parts.status.into(), sdk_body); |
308 | 0 | let header_pairs: Vec<(String, String)> = parts |
309 | 0 | .headers |
310 | 0 | .iter() |
311 | 0 | .filter_map(|(name, value)| { |
312 | 0 | value |
313 | 0 | .to_str() |
314 | 0 | .ok() |
315 | 0 | .map(|value_str| (name.as_str().to_owned(), value_str.to_owned())) |
316 | 0 | }) |
317 | 0 | .collect(); |
318 | | |
319 | 0 | for (name, value) in header_pairs { |
320 | 0 | smithy_resp.headers_mut().insert(name, value); |
321 | 0 | } |
322 | | |
323 | 0 | smithy_resp |
324 | 0 | } |
325 | | } |
326 | | } |
327 | | |
328 | | impl SmithyHttpConnector for TlsClient { |
329 | 6 | fn call(&self, req: HttpRequest) -> HttpConnectorFuture { |
330 | | use conversions::RequestExt; |
331 | | |
332 | 6 | let client = self.client.clone(); |
333 | 6 | let retrier = self.retrier.clone(); |
334 | | |
335 | 6 | HttpConnectorFuture::new(Box::pin(async move { |
336 | 6 | let components = req.into_components().await?0 ; |
337 | | |
338 | 6 | let retry_stream = execution::create_retry_stream(client, components); |
339 | | |
340 | 6 | match retrier.retry(retry_stream).await { |
341 | 0 | Ok(response) => Ok(response), |
342 | 6 | Err(e) => Err(ConnectorError::other( |
343 | 6 | format!("Connection failed after retries: {e}").into(), |
344 | 6 | Some(TransientError), |
345 | 6 | )), |
346 | | } |
347 | 6 | })) |
348 | 6 | } |
349 | | } |
350 | | |
351 | | #[derive(Debug)] |
352 | | pub struct BodyWrapper { |
353 | | pub reader: DropCloserReadHalf, |
354 | | pub size: u64, |
355 | | } |
356 | | |
357 | | impl http_body::Body for BodyWrapper { |
358 | | type Data = Bytes; |
359 | | type Error = std::io::Error; |
360 | | |
361 | 76 | fn poll_frame( |
362 | 76 | self: Pin<&mut Self>, |
363 | 76 | cx: &mut Context<'_>, |
364 | 76 | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { |
365 | 76 | let reader = Pin::new(&mut Pin::get_mut(self).reader); |
366 | 76 | reader |
367 | 76 | .poll_next(cx) |
368 | 76 | .map(|maybe_bytes_res| maybe_bytes_res51 .map51 (|res| res50 .map50 (Frame::data))) |
369 | 76 | } |
370 | | |
371 | 2 | fn size_hint(&self) -> SizeHint { |
372 | 2 | SizeHint::with_exact(self.size) |
373 | 2 | } |
374 | | } |