Coverage Report

Created: 2024-12-03 16:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/retry.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
use std::time::Duration;
18
19
use futures::future::Future;
20
use futures::stream::StreamExt;
21
use nativelink_config::stores::{ErrorCode, Retry};
22
use nativelink_error::{make_err, Code, Error};
23
use tracing::{event, Level};
24
25
struct ExponentialBackoff {
26
    current: Duration,
27
}
28
29
impl ExponentialBackoff {
30
20
    fn new(base: Duration) -> Self {
31
20
        ExponentialBackoff { current: base }
32
20
    }
33
}
34
35
impl Iterator for ExponentialBackoff {
36
    type Item = Duration;
37
38
16
    fn next(&mut self) -> Option<Duration> {
39
16
        self.current *= 2;
40
16
        Some(self.current)
41
16
    }
42
}
43
44
type SleepFn = Arc<dyn Fn(Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> + Sync + Send>;
45
pub(crate) type JitterFn = Arc<dyn Fn(Duration) -> Duration + Send + Sync>;
46
47
#[derive(PartialEq, Eq, Debug)]
48
pub enum RetryResult<T> {
49
    Ok(T),
50
    Retry(Error),
51
    Err(Error),
52
}
53
54
/// Class used to retry a job with a sleep function in between each retry.
55
#[derive(Clone)]
56
pub struct Retrier {
57
    sleep_fn: SleepFn,
58
    jitter_fn: JitterFn,
59
    config: Retry,
60
}
61
62
0
fn to_error_code(code: Code) -> ErrorCode {
63
0
    match code {
64
0
        Code::Cancelled => ErrorCode::Cancelled,
65
0
        Code::InvalidArgument => ErrorCode::InvalidArgument,
66
0
        Code::DeadlineExceeded => ErrorCode::DeadlineExceeded,
67
0
        Code::NotFound => ErrorCode::NotFound,
68
0
        Code::AlreadyExists => ErrorCode::AlreadyExists,
69
0
        Code::PermissionDenied => ErrorCode::PermissionDenied,
70
0
        Code::ResourceExhausted => ErrorCode::ResourceExhausted,
71
0
        Code::FailedPrecondition => ErrorCode::FailedPrecondition,
72
0
        Code::Aborted => ErrorCode::Aborted,
73
0
        Code::OutOfRange => ErrorCode::OutOfRange,
74
0
        Code::Unimplemented => ErrorCode::Unimplemented,
75
0
        Code::Internal => ErrorCode::Internal,
76
0
        Code::Unavailable => ErrorCode::Unavailable,
77
0
        Code::DataLoss => ErrorCode::DataLoss,
78
0
        Code::Unauthenticated => ErrorCode::Unauthenticated,
79
0
        _ => ErrorCode::Unknown,
80
    }
81
0
}
82
83
impl Retrier {
84
16
    pub fn new(sleep_fn: SleepFn, jitter_fn: JitterFn, config: Retry) -> Self {
85
16
        Retrier {
86
16
            sleep_fn,
87
16
            jitter_fn,
88
16
            config,
89
16
        }
90
16
    }
91
92
    /// This should only return true if the error code should be interpreted as
93
    /// temporary.
94
18
    fn should_retry(&self, code: Code) -> bool {
95
18
        if code == Code::Ok {
  Branch (95:12): [True: 0, False: 18]
  Branch (95:12): [Folded - Ignored]
96
0
            false
97
18
        } else if let Some(
retry_codes0
) = &self.config.retry_on_errors {
  Branch (97:23): [True: 0, False: 18]
  Branch (97:23): [Folded - Ignored]
98
0
            retry_codes.contains(&to_error_code(code))
99
        } else {
100
18
            match code {
101
                // TODO(SchahinRohani): Handle all cases properly so there is no need for a wildcard match
102
                // All cases where a retry should happen are commented out here and replaced with a wildcard match.
103
                Code::InvalidArgument
104
                | Code::FailedPrecondition
105
                | Code::OutOfRange
106
                | Code::Unimplemented
107
                | Code::NotFound
108
                | Code::AlreadyExists
109
                | Code::PermissionDenied
110
0
                | Code::Unauthenticated => false,
111
                // Code::Cancelled
112
                // | Code::Unknown
113
                // | Code::DeadlineExceeded
114
                // | Code::ResourceExhausted
115
                // | Code::Aborted
116
                // | Code::Internal
117
                // | Code::Unavailable
118
                // | Code::DataLoss => true,
119
18
                _ => true,
120
            }
121
        }
122
18
    }
123
124
20
    fn get_retry_config(&self) -> impl Iterator<Item = Duration> + '_ {
125
20
        ExponentialBackoff::new(Duration::from_millis(self.config.delay as u64))
126
20
            .map(|d| 
(self.jitter_fn)(d)16
)
127
20
            .take(self.config.max_retries) // Remember this is number of retries, so will run max_retries + 1.
128
20
    }
129
130
    // Clippy complains that this function can be `async fn`, but this is not true.
131
    // If we use `async fn`, other places in our code will fail to compile stating
132
    // something about the async blocks not matching.
133
    // This appears to happen due to a compiler bug while inlining, because the
134
    // function that it complained about was calling another function that called
135
    // this one.
136
    #[allow(clippy::manual_async_fn)]
137
20
    pub fn retry<'a, T: Send>(
138
20
        &'a self,
139
20
        operation: impl futures::stream::Stream<Item = RetryResult<T>> + Send + 'a,
140
20
    ) -> impl Future<Output = Result<T, Error>> + Send + 'a {
141
20
        async move {
142
20
            let mut iter = self.get_retry_config();
143
20
            tokio::pin!(operation);
144
20
            let mut attempt = 0;
145
            loop {
146
36
                attempt += 1;
147
36
                match operation.next().await {
148
                    None => {
149
0
                        return Err(make_err!(
150
0
                            Code::Internal,
151
0
                            "Retry stream ended abruptly on attempt {attempt}",
152
0
                        ))
153
                    }
154
18
                    Some(RetryResult::Ok(value)) => return Ok(value),
155
0
                    Some(RetryResult::Err(e)) => {
156
0
                        return Err(e.append(format!("On attempt {attempt}")));
157
                    }
158
18
                    Some(RetryResult::Retry(err)) => {
159
18
                        if !self.should_retry(err.code) {
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [Folded - Ignored]
  Branch (159:28): [True: 0, False: 6]
  Branch (159:28): [True: 0, False: 2]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 3]
  Branch (159:28): [True: 0, False: 1]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 3]
  Branch (159:28): [True: 0, False: 0]
  Branch (159:28): [True: 0, False: 3]
  Branch (159:28): [True: 0, False: 0]
160
0
                            event!(Level::ERROR, ?attempt, ?err, "Not retrying permanent error");
161
0
                            return Err(err);
162
18
                        }
163
18
                        (self.sleep_fn)(
164
18
                            iter.next()
165
18
                                .ok_or(err.append(format!("On attempt {attempt}")))
?2
,
166
                        )
167
16
                        .await;
168
                    }
169
                }
170
            }
171
20
        }
172
20
    }
173
}