Coverage Report

Created: 2025-10-30 00:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/common_s3_utils.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::future::Future;
16
use core::pin::Pin;
17
use core::task::{Context, Poll};
18
use std::sync::Arc;
19
20
use aws_config::retry::ErrorKind::TransientError;
21
use aws_smithy_runtime_api::client::http::{
22
    HttpClient as SmithyHttpClient, HttpConnector as SmithyHttpConnector, HttpConnectorFuture,
23
    HttpConnectorSettings, SharedHttpConnector,
24
};
25
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
26
use aws_smithy_runtime_api::client::result::ConnectorError;
27
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
28
use aws_smithy_runtime_api::http::Response;
29
use aws_smithy_types::body::SdkBody;
30
use bytes::{Bytes, BytesMut};
31
use futures::Stream;
32
use http_body::{Frame, SizeHint};
33
use http_body_util::BodyExt;
34
use hyper::{Method, Request};
35
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
36
use hyper_util::client::legacy::Client as LegacyClient;
37
use hyper_util::client::legacy::connect::HttpConnector as LegacyHttpConnector;
38
use hyper_util::rt::TokioExecutor;
39
use nativelink_config::stores::CommonObjectSpec;
40
// Note: S3 store should be very careful about the error codes it returns
41
// when in a retryable wrapper. Always prefer Code::Aborted or another
42
// retryable code over Code::InvalidArgument or make_input_err!().
43
// ie: Don't import make_input_err!() to help prevent this.
44
use nativelink_error::{Code, make_err};
45
use nativelink_util::buf_channel::DropCloserReadHalf;
46
use nativelink_util::fs;
47
use nativelink_util::retry::{Retrier, RetryResult};
48
use tokio::time::sleep;
49
50
#[derive(Clone)]
51
pub struct TlsClient {
52
    client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>,
53
    retrier: Retrier,
54
}
55
56
impl TlsClient {
57
    #[must_use]
58
3
    pub fn new(common: &CommonObjectSpec) -> Self {
59
3
        let connector_with_roots = HttpsConnectorBuilder::new().with_platform_verifier();
60
61
3
        let connector_with_schemes = if common.insecure_allow_http {
  Branch (61:41): [True: 0, False: 3]
  Branch (61:41): [Folded - Ignored]
62
0
            connector_with_roots.https_or_http()
63
        } else {
64
3
            connector_with_roots.https_only()
65
        };
66
67
3
        let connector = if common.disable_http2 {
  Branch (67:28): [True: 0, False: 3]
  Branch (67:28): [Folded - Ignored]
68
0
            connector_with_schemes.enable_http1().build()
69
        } else {
70
3
            connector_with_schemes.enable_http1().enable_http2().build()
71
        };
72
73
3
        Self::with_https_connector(common, connector)
74
3
    }
75
76
6
    pub fn with_https_connector(
77
6
        common: &CommonObjectSpec,
78
6
        connector: HttpsConnector<LegacyHttpConnector>,
79
6
    ) -> Self {
80
6
        let client = LegacyClient::builder(TokioExecutor::new()).build(connector);
81
82
        Self {
83
6
            client,
84
6
            retrier: Retrier::new(
85
6
                Arc::new(|duration| Box::pin(sleep(duration))),
  Region (85:37): [Count: 0]
  Region (85:46): [Count: 0]
  Region (85:52): [Count: 0]
86
6
                common.retry.make_jitter_fn(),
87
6
                common.retry.clone(),
88
            ),
89
        }
90
6
    }
91
}
92
93
impl core::fmt::Debug for TlsClient {
94
42
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
95
42
        f.debug_struct("TlsClient").finish_non_exhaustive()
96
42
    }
97
}
98
99
impl SmithyHttpClient for TlsClient {
100
6
    fn http_connector(
101
6
        &self,
102
6
        _settings: &HttpConnectorSettings,
103
6
        _components: &RuntimeComponents,
104
6
    ) -> SharedHttpConnector {
105
6
        SharedHttpConnector::new(self.clone())
106
6
    }
107
}
108
109
struct RequestBuilder<'a> {
110
    components: &'a RequestComponents,
111
}
112
113
impl<'a> RequestBuilder<'a> {
114
    #[inline]
115
6
    const fn new(components: &'a RequestComponents) -> Self {
116
6
        Self { components }
117
6
    }
118
119
    #[inline]
120
    #[allow(unused_qualifications, reason = "false positive on hyper::http::Error")]
121
6
    fn build(&self) -> Result<Request<SdkBody>, hyper::http::Error> {
122
6
        let mut req_builder = Request::builder()
123
6
            .method(self.components.method.clone())
124
6
            .uri(self.components.uri.clone())
125
6
            .version(self.components.version);
126
127
6
        let headers_map = req_builder.headers_mut().unwrap();
128
24
        for (name, value) in &self.components.headers {
  Region (128:14): [Count: 18]
  Region (128:20): [Count: 18]
129
18
            headers_map.insert(name, value.clone());
130
18
        }
131
132
6
        match &self.components.body_data {
133
6
            BufferedBodyState::Cloneable(body) => {
134
6
                let cloned_body = body.try_clone().expect("Body should be cloneable");
135
6
                req_builder.body(cloned_body)
136
            }
137
0
            BufferedBodyState::Buffered(bytes) => req_builder.body(SdkBody::from(bytes.clone())),
138
0
            BufferedBodyState::Empty => req_builder.body(SdkBody::empty()),
139
        }
140
6
    }
141
}
142
143
mod execution {
144
    use super::conversions::ResponseExt;
145
    use super::{
146
        Code, HttpsConnector, LegacyClient, LegacyHttpConnector, RequestBuilder, RequestComponents,
147
        Response, RetryResult, SdkBody, fs, make_err,
148
    };
149
150
    #[inline]
151
6
    pub(crate) async fn execute_request(
152
6
        client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>,
153
6
        components: &RequestComponents,
154
6
    ) -> RetryResult<Response<SdkBody>> {
155
6
        let _permit = match fs::get_permit().await {
156
6
            Ok(permit) => permit,
157
0
            Err(e) => {
158
0
                return RetryResult::Retry(make_err!(
159
0
                    Code::Unavailable,
160
0
                    "Failed to acquire permit: {e}"
161
0
                ));
162
            }
163
        };
164
165
6
        let request = match RequestBuilder::new(components).build() {
166
6
            Ok(req) => req,
167
0
            Err(e) => {
168
0
                return RetryResult::Err(make_err!(
169
0
                    Code::Internal,
170
0
                    "Failed to create request: {e}",
171
0
                ));
172
            }
173
        };
174
175
6
        match client.request(request).await {
176
0
            Ok(resp) => RetryResult::Ok(resp.into_smithy_response()),
177
6
            Err(e) => RetryResult::Retry(make_err!(
178
6
                Code::Unavailable,
179
6
                "Failed request in S3Store: {e}"
180
6
            )),
181
        }
182
6
    }
183
184
    #[inline]
185
6
    pub(crate) fn create_retry_stream(
186
6
        client: LegacyClient<HttpsConnector<LegacyHttpConnector>, SdkBody>,
187
6
        components: RequestComponents,
188
6
    ) -> impl futures::Stream<Item = RetryResult<Response<SdkBody>>> {
189
6
        futures::stream::unfold(components, move |components| {
190
6
            let client_clone = client.clone();
191
6
            async move {
192
6
                let result = execute_request(client_clone, &components).await;
193
194
6
                Some((result, components))
195
6
            }
196
6
        })
197
6
    }
198
}
199
200
enum BufferedBodyState {
201
    Cloneable(SdkBody),
202
    Buffered(Bytes),
203
    Empty,
204
}
205
206
mod body_processing {
207
    use super::{BodyExt, BufferedBodyState, BytesMut, ConnectorError, SdkBody, TransientError};
208
209
    /// Buffer a request body fully into memory.
210
    ///
211
    /// TODO(aaronmondal): This could lead to OOMs in extremely constrained
212
    ///                    environments. Probably better to implement something
213
    ///                    like a rewindable stream logic.
214
    #[inline]
215
0
    pub(crate) async fn buffer_body(body: SdkBody) -> Result<BufferedBodyState, ConnectorError> {
216
0
        let mut bytes = BytesMut::new();
217
0
        let mut body_stream = body;
218
0
        while let Some(frame) = body_stream.frame().await {
  Branch (218:19): [True: 0, False: 0]
  Branch (218:19): [Folded - Ignored]
219
0
            match frame {
220
0
                Ok(frame) => {
221
0
                    if let Some(data) = frame.data_ref() {
  Branch (221:28): [True: 0, False: 0]
  Branch (221:28): [Folded - Ignored]
222
0
                        bytes.extend_from_slice(data);
223
0
                    }
224
                }
225
0
                Err(e) => {
226
0
                    return Err(ConnectorError::other(
227
0
                        format!("Failed to read request body: {e}").into(),
228
0
                        Some(TransientError),
229
0
                    ));
230
                }
231
            }
232
        }
233
234
0
        Ok(BufferedBodyState::Buffered(bytes.freeze()))
235
0
    }
236
}
237
238
struct RequestComponents {
239
    method: Method,
240
    uri: hyper::Uri,
241
    version: hyper::Version,
242
    headers: hyper::HeaderMap,
243
    body_data: BufferedBodyState,
244
}
245
246
mod conversions {
247
    use super::{
248
        BufferedBodyState, ConnectorError, Future, HttpRequest, Method, RequestComponents,
249
        Response, SdkBody, TransientError, body_processing,
250
    };
251
252
    pub(crate) trait RequestExt {
253
        fn into_components(self)
254
        -> impl Future<Output = Result<RequestComponents, ConnectorError>>;
255
    }
256
257
    impl RequestExt for HttpRequest {
258
6
        async fn into_components(self) -> Result<RequestComponents, ConnectorError> {
259
            // Note: This does *not* refer to the HTTP protocol, but to the
260
            //       version of the http crate.
261
6
            let hyper_req = self.try_into_http1x().map_err(|e| {
  Region (261:64): [Count: 0]
262
0
                ConnectorError::other(
263
0
                    format!("Failed to convert to HTTP request: {e}").into(),
264
0
                    Some(TransientError),
265
                )
266
0
            })?;
267
268
6
            let method = hyper_req.method().clone();
269
6
            let uri = hyper_req.uri().clone();
270
6
            let version = hyper_req.version();
271
6
            let headers = hyper_req.headers().clone();
272
273
6
            let body = hyper_req.into_body();
274
275
            // Only buffer bodies for methods likely to have payloads.
276
6
            let needs_buffering = matches!(method, Method::POST | Method::PUT);
  Region (276:35): [Count: 0]
277
278
            // Preserve the body in case we need to retry.
279
6
            let body_data = if needs_buffering {
  Branch (279:32): [True: 6, False: 0]
  Branch (279:32): [Folded - Ignored]
280
6
                if let Some(cloneable_body) = body.try_clone() {
  Branch (280:24): [True: 6, False: 0]
  Branch (280:24): [Folded - Ignored]
281
6
                    BufferedBodyState::Cloneable(cloneable_body)
282
                } else {
283
0
                    body_processing::buffer_body(body).await?
284
                }
285
            } else {
286
0
                BufferedBodyState::Empty
287
            };
288
289
6
            Ok(RequestComponents {
290
6
                method,
291
6
                uri,
292
6
                version,
293
6
                headers,
294
6
                body_data,
295
6
            })
296
6
        }
297
    }
298
299
    pub(crate) trait ResponseExt {
300
        fn into_smithy_response(self) -> Response<SdkBody>;
301
    }
302
303
    impl ResponseExt for hyper::Response<hyper::body::Incoming> {
304
0
        fn into_smithy_response(self) -> Response<SdkBody> {
305
0
            let (parts, body) = self.into_parts();
306
0
            let sdk_body = SdkBody::from_body_1_x(body);
307
0
            let mut smithy_resp = Response::new(parts.status.into(), sdk_body);
308
0
            let header_pairs: Vec<(String, String)> = parts
309
0
                .headers
310
0
                .iter()
311
0
                .filter_map(|(name, value)| {
312
0
                    value
313
0
                        .to_str()
314
0
                        .ok()
315
0
                        .map(|value_str| (name.as_str().to_owned(), value_str.to_owned()))
316
0
                })
317
0
                .collect();
318
319
0
            for (name, value) in header_pairs {
320
0
                smithy_resp.headers_mut().insert(name, value);
321
0
            }
322
323
0
            smithy_resp
324
0
        }
325
    }
326
}
327
328
impl SmithyHttpConnector for TlsClient {
329
6
    fn call(&self, req: HttpRequest) -> HttpConnectorFuture {
330
        use conversions::RequestExt;
331
332
6
        let client = self.client.clone();
333
6
        let retrier = self.retrier.clone();
334
335
6
        HttpConnectorFuture::new(Box::pin(async move {
336
6
            let components = req.into_components().await?;
  Region (336:57): [Count: 0]
337
338
6
            let retry_stream = execution::create_retry_stream(client, components);
339
340
6
            match retrier.retry(retry_stream).await {
341
0
                Ok(response) => Ok(response),
342
6
                Err(e) => Err(ConnectorError::other(
343
6
                    format!("Connection failed after retries: {e}").into(),
344
6
                    Some(TransientError),
345
6
                )),
346
            }
347
6
        }))
348
6
    }
349
}
350
351
#[derive(Debug)]
352
pub struct BodyWrapper {
353
    pub reader: DropCloserReadHalf,
354
    pub size: u64,
355
}
356
357
impl http_body::Body for BodyWrapper {
358
    type Data = Bytes;
359
    type Error = std::io::Error;
360
361
0
    fn poll_frame(
362
0
        self: Pin<&mut Self>,
363
0
        cx: &mut Context<'_>,
364
0
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
365
0
        let reader = Pin::new(&mut Pin::get_mut(self).reader);
366
0
        reader
367
0
            .poll_next(cx)
368
0
            .map(|maybe_bytes_res| maybe_bytes_res.map(|res| res.map(Frame::data)))
369
0
    }
370
371
0
    fn size_hint(&self) -> SizeHint {
372
0
        SizeHint::with_exact(self.size)
373
0
    }
374
}