/build/source/nativelink-util/src/retry.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | use std::time::Duration; |
18 | | |
19 | | use futures::future::Future; |
20 | | use futures::stream::StreamExt; |
21 | | use nativelink_config::stores::{ErrorCode, Retry}; |
22 | | use nativelink_error::{make_err, Code, Error}; |
23 | | use tracing::{event, Level}; |
24 | | |
/// Endless iterator of exponentially growing delays.
///
/// Each call to `next` doubles the stored delay *before* yielding it, so the
/// first value produced is `2 * base`, the second `4 * base`, and so on.
struct ExponentialBackoff {
    /// Delay yielded by the previous call to `next` (initially the base delay).
    current: Duration,
}

impl ExponentialBackoff {
    /// Creates a backoff sequence seeded with `base`.
    fn new(base: Duration) -> Self {
        ExponentialBackoff { current: base }
    }
}

impl Iterator for ExponentialBackoff {
    type Item = Duration;

    /// Doubles the current delay and returns it. Never returns `None`.
    fn next(&mut self) -> Option<Duration> {
        // `Duration * u32` panics on overflow. Saturate instead so that a
        // large retry budget clamps at `Duration::MAX` rather than aborting
        // the caller in the middle of a retry loop.
        self.current = self.current.saturating_mul(2);
        Some(self.current)
    }
}
43 | | |
/// Shared sleep hook: given a delay, returns a boxed `Send` future. `Retrier::retry`
/// awaits that future between attempts.
type SleepFn = Arc<dyn Fn(Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Shared jitter hook: maps each delay produced by the backoff sequence to the
/// delay that is handed to the sleep hook.
pub(crate) type JitterFn = Arc<dyn Fn(Duration) -> Duration + Send + Sync>;
46 | | |
47 | | #[derive(PartialEq, Eq, Debug)] |
48 | | pub enum RetryResult<T> { |
49 | | Ok(T), |
50 | | Retry(Error), |
51 | | Err(Error), |
52 | | } |
53 | | |
54 | | /// Class used to retry a job with a sleep function in between each retry. |
55 | | #[derive(Clone)] |
56 | | pub struct Retrier { |
57 | | sleep_fn: SleepFn, |
58 | | jitter_fn: JitterFn, |
59 | | config: Retry, |
60 | | } |
61 | | |
62 | 0 | fn to_error_code(code: Code) -> ErrorCode { |
63 | 0 | match code { |
64 | 0 | Code::Cancelled => ErrorCode::Cancelled, |
65 | 0 | Code::InvalidArgument => ErrorCode::InvalidArgument, |
66 | 0 | Code::DeadlineExceeded => ErrorCode::DeadlineExceeded, |
67 | 0 | Code::NotFound => ErrorCode::NotFound, |
68 | 0 | Code::AlreadyExists => ErrorCode::AlreadyExists, |
69 | 0 | Code::PermissionDenied => ErrorCode::PermissionDenied, |
70 | 0 | Code::ResourceExhausted => ErrorCode::ResourceExhausted, |
71 | 0 | Code::FailedPrecondition => ErrorCode::FailedPrecondition, |
72 | 0 | Code::Aborted => ErrorCode::Aborted, |
73 | 0 | Code::OutOfRange => ErrorCode::OutOfRange, |
74 | 0 | Code::Unimplemented => ErrorCode::Unimplemented, |
75 | 0 | Code::Internal => ErrorCode::Internal, |
76 | 0 | Code::Unavailable => ErrorCode::Unavailable, |
77 | 0 | Code::DataLoss => ErrorCode::DataLoss, |
78 | 0 | Code::Unauthenticated => ErrorCode::Unauthenticated, |
79 | 0 | _ => ErrorCode::Unknown, |
80 | | } |
81 | 0 | } |
82 | | |
83 | | impl Retrier { |
84 | 16 | pub fn new(sleep_fn: SleepFn, jitter_fn: JitterFn, config: Retry) -> Self { |
85 | 16 | Retrier { |
86 | 16 | sleep_fn, |
87 | 16 | jitter_fn, |
88 | 16 | config, |
89 | 16 | } |
90 | 16 | } |
91 | | |
92 | | /// This should only return true if the error code should be interpreted as |
93 | | /// temporary. |
94 | 18 | fn should_retry(&self, code: Code) -> bool { |
95 | 18 | if code == Code::Ok { Branch (95:12): [True: 0, False: 18]
Branch (95:12): [Folded - Ignored]
|
96 | 0 | false |
97 | 18 | } else if let Some(retry_codes0 ) = &self.config.retry_on_errors { Branch (97:23): [True: 0, False: 18]
Branch (97:23): [Folded - Ignored]
|
98 | 0 | retry_codes.contains(&to_error_code(code)) |
99 | | } else { |
100 | 18 | match code { |
101 | | // TODO(SchahinRohani): Handle all cases properly so there is no need for a wildcard match |
102 | | // All cases where a retry should happen are commented out here and replaced with a wildcard match. |
103 | | Code::InvalidArgument |
104 | | | Code::FailedPrecondition |
105 | | | Code::OutOfRange |
106 | | | Code::Unimplemented |
107 | | | Code::NotFound |
108 | | | Code::AlreadyExists |
109 | | | Code::PermissionDenied |
110 | 0 | | Code::Unauthenticated => false, |
111 | | // Code::Cancelled |
112 | | // | Code::Unknown |
113 | | // | Code::DeadlineExceeded |
114 | | // | Code::ResourceExhausted |
115 | | // | Code::Aborted |
116 | | // | Code::Internal |
117 | | // | Code::Unavailable |
118 | | // | Code::DataLoss => true, |
119 | 18 | _ => true, |
120 | | } |
121 | | } |
122 | 18 | } |
123 | | |
124 | 20 | fn get_retry_config(&self) -> impl Iterator<Item = Duration> + '_ { |
125 | 20 | ExponentialBackoff::new(Duration::from_millis(self.config.delay as u64)) |
126 | 20 | .map(|d| (self.jitter_fn)(d)16 ) |
127 | 20 | .take(self.config.max_retries) // Remember this is number of retries, so will run max_retries + 1. |
128 | 20 | } |
129 | | |
130 | | // Clippy complains that this function can be `async fn`, but this is not true. |
131 | | // If we use `async fn`, other places in our code will fail to compile stating |
132 | | // something about the async blocks not matching. |
133 | | // This appears to happen due to a compiler bug while inlining, because the |
134 | | // function that it complained about was calling another function that called |
135 | | // this one. |
136 | | #[allow(clippy::manual_async_fn)] |
137 | 20 | pub fn retry<'a, T: Send>( |
138 | 20 | &'a self, |
139 | 20 | operation: impl futures::stream::Stream<Item = RetryResult<T>> + Send + 'a, |
140 | 20 | ) -> impl Future<Output = Result<T, Error>> + Send + 'a { |
141 | 20 | async move { |
142 | 20 | let mut iter = self.get_retry_config(); |
143 | 20 | tokio::pin!(operation); |
144 | 20 | let mut attempt = 0; |
145 | | loop { |
146 | 36 | attempt += 1; |
147 | 36 | match operation.next().await { |
148 | | None => { |
149 | 0 | return Err(make_err!( |
150 | 0 | Code::Internal, |
151 | 0 | "Retry stream ended abruptly on attempt {attempt}", |
152 | 0 | )) |
153 | | } |
154 | 18 | Some(RetryResult::Ok(value)) => return Ok(value), |
155 | 0 | Some(RetryResult::Err(e)) => { |
156 | 0 | return Err(e.append(format!("On attempt {attempt}"))); |
157 | | } |
158 | 18 | Some(RetryResult::Retry(err)) => { |
159 | 18 | if !self.should_retry(err.code) { Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [Folded - Ignored]
Branch (159:28): [True: 0, False: 6]
Branch (159:28): [True: 0, False: 2]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 3]
Branch (159:28): [True: 0, False: 1]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 3]
Branch (159:28): [True: 0, False: 0]
Branch (159:28): [True: 0, False: 3]
Branch (159:28): [True: 0, False: 0]
|
160 | 0 | event!(Level::ERROR, ?attempt, ?err, "Not retrying permanent error"); |
161 | 0 | return Err(err); |
162 | 18 | } |
163 | 18 | (self.sleep_fn)( |
164 | 18 | iter.next() |
165 | 18 | .ok_or(err.append(format!("On attempt {attempt}")))?2 , |
166 | | ) |
167 | 16 | .await; |
168 | | } |
169 | | } |
170 | | } |
171 | 20 | } |
172 | 20 | } |
173 | | } |