Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-util/src/retry.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
use std::time::Duration;
18
19
use futures::future::Future;
20
use futures::stream::StreamExt;
21
use nativelink_config::stores::{ErrorCode, Retry};
22
use nativelink_error::{make_err, Code, Error};
23
use tracing::{event, Level};
24
25
/// Unbounded iterator of exponentially growing backoff delays.
///
/// Each call to `next` doubles the stored delay and then yields it, so the
/// sequence produced from a base `b` is `2b, 4b, 8b, ...`. It never returns
/// `None`; callers are expected to bound it (e.g. with `take`).
struct ExponentialBackoff {
    /// The most recently yielded delay (initially the base delay).
    current: Duration,
}

impl ExponentialBackoff {
    /// Creates a backoff sequence seeded with `base`.
    ///
    /// `base` itself is not yielded; the first value is `2 * base`.
    fn new(base: Duration) -> Self {
        ExponentialBackoff { current: base }
    }
}

impl Iterator for ExponentialBackoff {
    type Item = Duration;

    fn next(&mut self) -> Option<Duration> {
        // `Duration *= u32` panics on overflow, which a large retry budget can
        // reach after enough doublings. Saturate at `Duration::MAX` instead so
        // the iterator never panics.
        self.current = self.current.saturating_mul(2);
        Some(self.current)
    }
}
43
44
/// Async sleep hook awaited between attempts with the backoff delay.
///
/// The future is boxed and pinned so the concrete timer implementation can be
/// injected through `Retrier::new`.
type SleepFn = Arc<
    dyn Fn(Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> + Sync + Send,
>;

/// Hook applied to every raw backoff delay before it is slept on.
pub(crate) type JitterFn = Arc<dyn Fn(Duration) -> Duration + Send + Sync>;
46
47
#[derive(PartialEq, Eq, Debug)]
48
pub enum RetryResult<T> {
49
    Ok(T),
50
    Retry(Error),
51
    Err(Error),
52
}
53
54
/// Class used to retry a job with a sleep function in between each retry.
55
#[derive(Clone)]
56
pub struct Retrier {
57
    sleep_fn: SleepFn,
58
    jitter_fn: JitterFn,
59
    config: Retry,
60
}
61
62
0
fn to_error_code(code: Code) -> ErrorCode {
63
0
    match code {
64
0
        Code::Cancelled => ErrorCode::Cancelled,
65
0
        Code::Unknown => ErrorCode::Unknown,
66
0
        Code::InvalidArgument => ErrorCode::InvalidArgument,
67
0
        Code::DeadlineExceeded => ErrorCode::DeadlineExceeded,
68
0
        Code::NotFound => ErrorCode::NotFound,
69
0
        Code::AlreadyExists => ErrorCode::AlreadyExists,
70
0
        Code::PermissionDenied => ErrorCode::PermissionDenied,
71
0
        Code::ResourceExhausted => ErrorCode::ResourceExhausted,
72
0
        Code::FailedPrecondition => ErrorCode::FailedPrecondition,
73
0
        Code::Aborted => ErrorCode::Aborted,
74
0
        Code::OutOfRange => ErrorCode::OutOfRange,
75
0
        Code::Unimplemented => ErrorCode::Unimplemented,
76
0
        Code::Internal => ErrorCode::Internal,
77
0
        Code::Unavailable => ErrorCode::Unavailable,
78
0
        Code::DataLoss => ErrorCode::DataLoss,
79
0
        Code::Unauthenticated => ErrorCode::Unauthenticated,
80
0
        _ => ErrorCode::Unknown,
81
    }
82
0
}
83
84
impl Retrier {
85
16
    pub fn new(sleep_fn: SleepFn, jitter_fn: JitterFn, config: Retry) -> Self {
86
16
        Retrier {
87
16
            sleep_fn,
88
16
            jitter_fn,
89
16
            config,
90
16
        }
91
16
    }
92
93
    /// This should only return true if the error code should be interpreted as
94
    /// temporary.
95
18
    fn should_retry(&self, code: Code) -> bool {
96
18
        if code == Code::Ok {
  Branch (96:12): [True: 0, False: 18]
  Branch (96:12): [Folded - Ignored]
97
0
            false
98
18
        } else if let Some(
retry_codes0
) = &self.config.retry_on_errors {
  Branch (98:23): [True: 0, False: 18]
  Branch (98:23): [Folded - Ignored]
99
0
            retry_codes.contains(&to_error_code(code))
100
        } else {
101
18
            match code {
102
0
                Code::InvalidArgument => false,
103
0
                Code::FailedPrecondition => false,
104
0
                Code::OutOfRange => false,
105
0
                Code::Unimplemented => false,
106
0
                Code::NotFound => false,
107
0
                Code::AlreadyExists => false,
108
0
                Code::PermissionDenied => false,
109
0
                Code::Unauthenticated => false,
110
0
                Code::Cancelled => true,
111
0
                Code::Unknown => true,
112
0
                Code::DeadlineExceeded => true,
113
0
                Code::ResourceExhausted => true,
114
0
                Code::Aborted => true,
115
0
                Code::Internal => true,
116
18
                Code::Unavailable => true,
117
0
                Code::DataLoss => true,
118
0
                _ => true,
119
            }
120
        }
121
18
    }
122
123
20
    fn get_retry_config(&self) -> impl Iterator<Item = Duration> + '_ {
124
20
        ExponentialBackoff::new(Duration::from_millis(self.config.delay as u64))
125
20
            .map(|d| 
(self.jitter_fn)(d)16
)
126
20
            .take(self.config.max_retries) // Remember this is number of retries, so will run max_retries + 1.
127
20
    }
128
129
    // Clippy complains that this function can be `async fn`, but this is not true.
130
    // If we use `async fn`, other places in our code will fail to compile stating
131
    // something about the async blocks not matching.
132
    // This appears to happen due to a compiler bug while inlining, because the
133
    // function that it complained about was calling another function that called
134
    // this one.
135
    #[allow(clippy::manual_async_fn)]
136
20
    pub fn retry<'a, T: Send>(
137
20
        &'a self,
138
20
        operation: impl futures::stream::Stream<Item = RetryResult<T>> + Send + 'a,
139
20
    ) -> impl Future<Output = Result<T, Error>> + Send + 'a {
140
20
        async move {
141
20
            let mut iter = self.get_retry_config();
142
20
            tokio::pin!(operation);
143
20
            let mut attempt = 0;
144
            loop {
145
36
                attempt += 1;
146
53
                match operation.next().await {
147
                    None => {
148
0
                        return Err(make_err!(
149
0
                            Code::Internal,
150
0
                            "Retry stream ended abruptly on attempt {attempt}",
151
0
                        ))
152
                    }
153
18
                    Some(RetryResult::Ok(value)) => return Ok(value),
154
0
                    Some(RetryResult::Err(e)) => {
155
0
                        return Err(e.append(format!("On attempt {attempt}")));
156
                    }
157
18
                    Some(RetryResult::Retry(err)) => {
158
18
                        if !self.should_retry(err.code) {
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [Folded - Ignored]
  Branch (158:28): [True: 0, False: 6]
  Branch (158:28): [True: 0, False: 2]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 3]
  Branch (158:28): [True: 0, False: 1]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 3]
  Branch (158:28): [True: 0, False: 3]
  Branch (158:28): [True: 0, False: 0]
  Branch (158:28): [True: 0, False: 0]
159
0
                            event!(Level::ERROR, ?attempt, ?err, "Not retrying permanent error");
160
0
                            return Err(err);
161
18
                        }
162
18
                        (self.sleep_fn)(
163
18
                            iter.next()
164
18
                                .ok_or(err.append(format!("On attempt {attempt}")))
?2
,
165
                        )
166
6
                        .await;
167
                    }
168
                }
169
            }
170
20
        }
171
20
    }
172
}