/build/source/nativelink-util/src/retry.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | use std::time::Duration; |
18 | | |
19 | | use futures::future::Future; |
20 | | use futures::stream::StreamExt; |
21 | | use nativelink_config::stores::{ErrorCode, Retry}; |
22 | | use nativelink_error::{Code, Error, make_err}; |
23 | | use tracing::{Level, event}; |
24 | | |
/// Endless iterator of back-off delays in which every item is twice the
/// previous one.
///
/// The duration handed to [`ExponentialBackoff::new`] is only a seed: it is
/// never yielded itself. The first item is `2 * base`, the second `4 * base`,
/// and so on.
struct ExponentialBackoff {
    /// The delay most recently yielded, or the seed before the first `next`.
    current: Duration,
}

impl ExponentialBackoff {
    /// Creates a back-off sequence seeded with `base`.
    const fn new(base: Duration) -> Self {
        Self { current: base }
    }
}

impl Iterator for ExponentialBackoff {
    type Item = Duration;

    /// Doubles the stored delay and yields it. Never returns `None`.
    ///
    /// The doubling saturates at [`Duration::MAX`]: `Duration * u32` panics
    /// on overflow, and a back-off iterator must not be able to bring the
    /// caller down just because it was polled many times.
    fn next(&mut self) -> Option<Duration> {
        self.current = self.current.saturating_mul(2);
        Some(self.current)
    }
}
43 | | |
/// Shared, thread-safe callback that receives a delay and returns a boxed
/// future. `Retrier::retry` awaits that future before polling the operation
/// for its next attempt.
type SleepFn = Arc<dyn Fn(Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;

/// Shared, thread-safe callback that maps a back-off delay to the delay that
/// is actually handed to the sleep callback.
pub(crate) type JitterFn = Arc<dyn Fn(Duration) -> Duration + Send + Sync>;
46 | | |
47 | | #[derive(PartialEq, Eq, Debug)] |
48 | | pub enum RetryResult<T> { |
49 | | Ok(T), |
50 | | Retry(Error), |
51 | | Err(Error), |
52 | | } |
53 | | |
54 | | /// Class used to retry a job with a sleep function in between each retry. |
55 | | #[derive(Clone)] |
56 | | pub struct Retrier { |
57 | | sleep_fn: SleepFn, |
58 | | jitter_fn: JitterFn, |
59 | | config: Retry, |
60 | | } |
61 | | |
62 | | impl std::fmt::Debug for Retrier { |
63 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
64 | 0 | f.debug_struct("Retrier") |
65 | 0 | .field("config", &self.config) |
66 | 0 | .finish_non_exhaustive() |
67 | 0 | } |
68 | | } |
69 | | |
70 | 0 | const fn to_error_code(code: Code) -> ErrorCode { |
71 | 0 | match code { |
72 | 0 | Code::Cancelled => ErrorCode::Cancelled, |
73 | 0 | Code::InvalidArgument => ErrorCode::InvalidArgument, |
74 | 0 | Code::DeadlineExceeded => ErrorCode::DeadlineExceeded, |
75 | 0 | Code::NotFound => ErrorCode::NotFound, |
76 | 0 | Code::AlreadyExists => ErrorCode::AlreadyExists, |
77 | 0 | Code::PermissionDenied => ErrorCode::PermissionDenied, |
78 | 0 | Code::ResourceExhausted => ErrorCode::ResourceExhausted, |
79 | 0 | Code::FailedPrecondition => ErrorCode::FailedPrecondition, |
80 | 0 | Code::Aborted => ErrorCode::Aborted, |
81 | 0 | Code::OutOfRange => ErrorCode::OutOfRange, |
82 | 0 | Code::Unimplemented => ErrorCode::Unimplemented, |
83 | 0 | Code::Internal => ErrorCode::Internal, |
84 | 0 | Code::Unavailable => ErrorCode::Unavailable, |
85 | 0 | Code::DataLoss => ErrorCode::DataLoss, |
86 | 0 | Code::Unauthenticated => ErrorCode::Unauthenticated, |
87 | 0 | _ => ErrorCode::Unknown, |
88 | | } |
89 | 0 | } |
90 | | |
91 | | impl Retrier { |
92 | 16 | pub fn new(sleep_fn: SleepFn, jitter_fn: JitterFn, config: Retry) -> Self { |
93 | 16 | Retrier { |
94 | 16 | sleep_fn, |
95 | 16 | jitter_fn, |
96 | 16 | config, |
97 | 16 | } |
98 | 16 | } |
99 | | |
100 | | /// This should only return true if the error code should be interpreted as |
101 | | /// temporary. |
102 | 18 | fn should_retry(&self, code: Code) -> bool { |
103 | 18 | if code == Code::Ok { Branch (103:12): [True: 0, False: 18]
Branch (103:12): [Folded - Ignored]
|
104 | 0 | false |
105 | 18 | } else if let Some(retry_codes0 ) = &self.config.retry_on_errors { Branch (105:23): [True: 0, False: 18]
Branch (105:23): [Folded - Ignored]
|
106 | 0 | retry_codes.contains(&to_error_code(code)) |
107 | | } else { |
108 | 18 | match code { |
109 | | // TODO(SchahinRohani): Handle all cases properly so there is no need for a wildcard match |
110 | | // All cases where a retry should happen are commented out here and replaced with a wildcard match. |
111 | | Code::InvalidArgument |
112 | | | Code::FailedPrecondition |
113 | | | Code::OutOfRange |
114 | | | Code::Unimplemented |
115 | | | Code::NotFound |
116 | | | Code::AlreadyExists |
117 | | | Code::PermissionDenied |
118 | 0 | | Code::Unauthenticated => false, |
119 | | // Code::Cancelled |
120 | | // | Code::Unknown |
121 | | // | Code::DeadlineExceeded |
122 | | // | Code::ResourceExhausted |
123 | | // | Code::Aborted |
124 | | // | Code::Internal |
125 | | // | Code::Unavailable |
126 | | // | Code::DataLoss => true, |
127 | 18 | _ => true, |
128 | | } |
129 | | } |
130 | 18 | } |
131 | | |
132 | 20 | fn get_retry_config(&self) -> impl Iterator<Item = Duration> + '_ { |
133 | 20 | ExponentialBackoff::new(Duration::from_millis(self.config.delay as u64)) |
134 | 20 | .map(|d| (self.jitter_fn)(d)16 ) |
135 | 20 | .take(self.config.max_retries) // Remember this is number of retries, so will run max_retries + 1. |
136 | 20 | } |
137 | | |
138 | | #[expect( |
139 | | clippy::manual_async_fn, |
140 | | reason = "making an `async fn` results in a potential compiler bug in seemingly unrelated \ |
141 | | code" |
142 | | )] |
143 | 20 | pub fn retry<'a, T: Send>( |
144 | 20 | &'a self, |
145 | 20 | operation: impl futures::stream::Stream<Item = RetryResult<T>> + Send + 'a, |
146 | 20 | ) -> impl Future<Output = Result<T, Error>> + Send + 'a { |
147 | 20 | async move { |
148 | 20 | let mut iter = self.get_retry_config(); |
149 | 20 | tokio::pin!(operation); |
150 | 20 | let mut attempt = 0; |
151 | | loop { |
152 | 36 | attempt += 1; |
153 | 36 | match operation.next().await { |
154 | | None => { |
155 | 0 | return Err(make_err!( |
156 | 0 | Code::Internal, |
157 | 0 | "Retry stream ended abruptly on attempt {attempt}", |
158 | 0 | )); |
159 | | } |
160 | 18 | Some(RetryResult::Ok(value)) => return Ok(value), |
161 | 0 | Some(RetryResult::Err(e)) => { |
162 | 0 | return Err(e.append(format!("On attempt {attempt}"))); |
163 | | } |
164 | 18 | Some(RetryResult::Retry(err)) => { |
165 | 18 | if !self.should_retry(err.code) { Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [Folded - Ignored]
Branch (165:28): [True: 0, False: 6]
Branch (165:28): [True: 0, False: 2]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 3]
Branch (165:28): [True: 0, False: 1]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 3]
Branch (165:28): [True: 0, False: 3]
Branch (165:28): [True: 0, False: 0]
Branch (165:28): [True: 0, False: 0]
|
166 | 0 | event!(Level::ERROR, ?attempt, ?err, "Not retrying permanent error"); |
167 | 0 | return Err(err); |
168 | 18 | } |
169 | 16 | (self.sleep_fn)( |
170 | 18 | iter.next() |
171 | 18 | .ok_or(err.append(format!("On attempt {attempt}")))?2 , |
172 | | ) |
173 | 16 | .await; |
174 | | } |
175 | | } |
176 | | } |
177 | 20 | } |
178 | 20 | } |
179 | | } |