Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/retry.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
use std::time::Duration;
18
19
use futures::future::Future;
20
use futures::stream::StreamExt;
21
use nativelink_config::stores::{ErrorCode, Retry};
22
use nativelink_error::{Code, Error, make_err};
23
use tracing::{Level, event};
24
25
/// Endless iterator of exponentially growing delays.
///
/// Every call to `next` doubles the stored delay and yields the doubled
/// value, so the first item produced is `2 * base`, the second `4 * base`,
/// and so on. Once the delay can no longer be doubled without overflowing
/// `Duration`, it stays pinned at `Duration::MAX` instead of panicking.
struct ExponentialBackoff {
    /// The delay most recently yielded (or the base delay before the first
    /// call to `next`).
    current: Duration,
}

impl ExponentialBackoff {
    /// Creates a backoff sequence seeded with `base`.
    const fn new(base: Duration) -> Self {
        Self { current: base }
    }
}

impl Iterator for ExponentialBackoff {
    type Item = Duration;

    /// Doubles the stored delay and returns it. Never returns `None`.
    fn next(&mut self) -> Option<Duration> {
        // `Duration *= u32` panics on overflow; saturate so a large retry
        // budget cannot bring the caller down.
        self.current = self.current.saturating_mul(2);
        Some(self.current)
    }
}
43
44
/// Shared, thread-safe callback that takes a delay and returns a boxed future
/// resolving to `()`. `Retrier::retry` awaits that future between attempts.
type SleepFn = Arc<dyn Fn(Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> + Sync + Send>;
45
/// Shared, thread-safe callback that maps one delay to another. `Retrier`
/// passes every backoff delay through it before handing the result to the
/// sleep function.
pub(crate) type JitterFn = Arc<dyn Fn(Duration) -> Duration + Send + Sync>;
46
47
#[derive(PartialEq, Eq, Debug)]
48
pub enum RetryResult<T> {
49
    Ok(T),
50
    Retry(Error),
51
    Err(Error),
52
}
53
54
/// Class used to retry a job with a sleep function in between each retry.
55
#[derive(Clone)]
56
pub struct Retrier {
57
    sleep_fn: SleepFn,
58
    jitter_fn: JitterFn,
59
    config: Retry,
60
}
61
62
impl std::fmt::Debug for Retrier {
63
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64
0
        f.debug_struct("Retrier")
65
0
            .field("config", &self.config)
66
0
            .finish_non_exhaustive()
67
0
    }
68
}
69
70
0
const fn to_error_code(code: Code) -> ErrorCode {
71
0
    match code {
72
0
        Code::Cancelled => ErrorCode::Cancelled,
73
0
        Code::InvalidArgument => ErrorCode::InvalidArgument,
74
0
        Code::DeadlineExceeded => ErrorCode::DeadlineExceeded,
75
0
        Code::NotFound => ErrorCode::NotFound,
76
0
        Code::AlreadyExists => ErrorCode::AlreadyExists,
77
0
        Code::PermissionDenied => ErrorCode::PermissionDenied,
78
0
        Code::ResourceExhausted => ErrorCode::ResourceExhausted,
79
0
        Code::FailedPrecondition => ErrorCode::FailedPrecondition,
80
0
        Code::Aborted => ErrorCode::Aborted,
81
0
        Code::OutOfRange => ErrorCode::OutOfRange,
82
0
        Code::Unimplemented => ErrorCode::Unimplemented,
83
0
        Code::Internal => ErrorCode::Internal,
84
0
        Code::Unavailable => ErrorCode::Unavailable,
85
0
        Code::DataLoss => ErrorCode::DataLoss,
86
0
        Code::Unauthenticated => ErrorCode::Unauthenticated,
87
0
        _ => ErrorCode::Unknown,
88
    }
89
0
}
90
91
impl Retrier {
92
16
    pub fn new(sleep_fn: SleepFn, jitter_fn: JitterFn, config: Retry) -> Self {
93
16
        Retrier {
94
16
            sleep_fn,
95
16
            jitter_fn,
96
16
            config,
97
16
        }
98
16
    }
99
100
    /// This should only return true if the error code should be interpreted as
101
    /// temporary.
102
18
    fn should_retry(&self, code: Code) -> bool {
103
18
        if code == Code::Ok {
  Branch (103:12): [True: 0, False: 18]
  Branch (103:12): [Folded - Ignored]
104
0
            false
105
18
        } else if let Some(
retry_codes0
) = &self.config.retry_on_errors {
  Branch (105:23): [True: 0, False: 18]
  Branch (105:23): [Folded - Ignored]
106
0
            retry_codes.contains(&to_error_code(code))
107
        } else {
108
18
            match code {
109
                // TODO(SchahinRohani): Handle all cases properly so there is no need for a wildcard match
110
                // All cases where a retry should happen are commented out here and replaced with a wildcard match.
111
                Code::InvalidArgument
112
                | Code::FailedPrecondition
113
                | Code::OutOfRange
114
                | Code::Unimplemented
115
                | Code::NotFound
116
                | Code::AlreadyExists
117
                | Code::PermissionDenied
118
0
                | Code::Unauthenticated => false,
119
                // Code::Cancelled
120
                // | Code::Unknown
121
                // | Code::DeadlineExceeded
122
                // | Code::ResourceExhausted
123
                // | Code::Aborted
124
                // | Code::Internal
125
                // | Code::Unavailable
126
                // | Code::DataLoss => true,
127
18
                _ => true,
128
            }
129
        }
130
18
    }
131
132
20
    fn get_retry_config(&self) -> impl Iterator<Item = Duration> + '_ {
133
20
        ExponentialBackoff::new(Duration::from_millis(self.config.delay as u64))
134
20
            .map(|d| 
(self.jitter_fn)(d)16
)
135
20
            .take(self.config.max_retries) // Remember this is number of retries, so will run max_retries + 1.
136
20
    }
137
138
    #[expect(
139
        clippy::manual_async_fn,
140
        reason = "making an `async fn` results in a potential compiler bug in seemingly unrelated \
141
            code"
142
    )]
143
20
    pub fn retry<'a, T: Send>(
144
20
        &'a self,
145
20
        operation: impl futures::stream::Stream<Item = RetryResult<T>> + Send + 'a,
146
20
    ) -> impl Future<Output = Result<T, Error>> + Send + 'a {
147
20
        async move {
148
20
            let mut iter = self.get_retry_config();
149
20
            tokio::pin!(operation);
150
20
            let mut attempt = 0;
151
            loop {
152
36
                attempt += 1;
153
36
                match operation.next().await {
154
                    None => {
155
0
                        return Err(make_err!(
156
0
                            Code::Internal,
157
0
                            "Retry stream ended abruptly on attempt {attempt}",
158
0
                        ));
159
                    }
160
18
                    Some(RetryResult::Ok(value)) => return Ok(value),
161
0
                    Some(RetryResult::Err(e)) => {
162
0
                        return Err(e.append(format!("On attempt {attempt}")));
163
                    }
164
18
                    Some(RetryResult::Retry(err)) => {
165
18
                        if !self.should_retry(err.code) {
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [Folded - Ignored]
  Branch (165:28): [True: 0, False: 6]
  Branch (165:28): [True: 0, False: 2]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 3]
  Branch (165:28): [True: 0, False: 1]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 3]
  Branch (165:28): [True: 0, False: 3]
  Branch (165:28): [True: 0, False: 0]
  Branch (165:28): [True: 0, False: 0]
166
0
                            event!(Level::ERROR, ?attempt, ?err, "Not retrying permanent error");
167
0
                            return Err(err);
168
18
                        }
169
16
                        (self.sleep_fn)(
170
18
                            iter.next()
171
18
                                .ok_or(err.append(format!("On attempt {attempt}")))
?2
,
172
                        )
173
16
                        .await;
174
                    }
175
                }
176
            }
177
20
        }
178
20
    }
179
}