Coverage Report

Created: 2025-10-30 00:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/health_utils.rs
Line
Count
Source
1
// Copyright 2024 The Native Link Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::Debug;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::borrow::Cow;
19
use std::collections::HashMap;
20
use std::sync::Arc;
21
22
use async_trait::async_trait;
23
use futures::{Stream, StreamExt};
24
use parking_lot::Mutex;
25
use serde::Serialize;
26
use tokio::time::timeout;
27
use tracing::warn;
28
29
/// Name of the struct that implements a health indicator component.
30
type StructName = str;
31
/// Human-readable status message reported by a health indicator.
32
type Message = str;
33
34
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
35
pub enum HealthStatus {
36
    Ok {
37
        struct_name: &'static StructName,
38
        message: Cow<'static, Message>,
39
    },
40
    Initializing {
41
        struct_name: &'static StructName,
42
        message: Cow<'static, Message>,
43
    },
44
    /// This status is used to indicate a non-fatal issue with the component.
45
    Warning {
46
        struct_name: &'static StructName,
47
        message: Cow<'static, Message>,
48
    },
49
    Failed {
50
        struct_name: &'static StructName,
51
        message: Cow<'static, Message>,
52
    },
53
    Timeout {
54
        struct_name: &'static StructName,
55
    },
56
}
57
58
impl HealthStatus {
59
0
    pub fn new_ok(
60
0
        component: &(impl HealthStatusIndicator + ?Sized),
61
0
        message: Cow<'static, str>,
62
0
    ) -> Self {
63
0
        Self::Ok {
64
0
            struct_name: component.struct_name(),
65
0
            message,
66
0
        }
67
0
    }
68
69
0
    pub fn new_initializing(
70
0
        component: &(impl HealthStatusIndicator + ?Sized),
71
0
        message: Cow<'static, str>,
72
0
    ) -> Self {
73
0
        Self::Initializing {
74
0
            struct_name: component.struct_name(),
75
0
            message,
76
0
        }
77
0
    }
78
79
0
    pub fn new_warning(
80
0
        component: &(impl HealthStatusIndicator + ?Sized),
81
0
        message: Cow<'static, str>,
82
0
    ) -> Self {
83
0
        Self::Warning {
84
0
            struct_name: component.struct_name(),
85
0
            message,
86
0
        }
87
0
    }
88
89
1
    pub fn new_failed(
90
1
        component: &(impl HealthStatusIndicator + ?Sized),
91
1
        message: Cow<'static, str>,
92
1
    ) -> Self {
93
1
        Self::Failed {
94
1
            struct_name: component.struct_name(),
95
1
            message,
96
1
        }
97
1
    }
98
}
99
100
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)]
101
pub struct HealthStatusDescription {
102
    pub namespace: Cow<'static, str>,
103
    pub status: HealthStatus,
104
}
105
106
/// Health status indicator trait. This trait is used to define
107
/// a health status indicator by implementing the `check_health` function.
108
/// The trait itself supplies no default body for `check_health`; the
109
/// `default_health_status_indicator!` macro generates one that delegates to `StoreDriver::check_health`.
110
#[async_trait]
111
pub trait HealthStatusIndicator: Sync + Send + Unpin {
112
    fn get_name(&self) -> &'static str;
113
114
    /// Returns the type name of the implementor, as produced by `core::any::type_name::<Self>()`.
115
1
    fn struct_name(&self) -> &'static str {
116
1
        core::any::type_name::<Self>()
117
1
    }
118
119
    /// Check the health status of the component. This function should be
120
    /// implemented by the component to check the health status of the component.
121
    async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus;
122
}
123
124
type HealthRegistryBuilderState =
125
    Arc<Mutex<HashMap<Cow<'static, str>, Arc<dyn HealthStatusIndicator>>>>;
126
127
pub struct HealthRegistryBuilder {
128
    namespace: Cow<'static, str>,
129
    state: HealthRegistryBuilderState,
130
}
131
132
impl Debug for HealthRegistryBuilder {
133
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
134
0
        f.debug_struct("HealthRegistryBuilder")
135
0
            .field("namespace", &self.namespace)
136
0
            .finish_non_exhaustive()
137
0
    }
138
}
139
140
/// Health registry builder that is used to build a health registry.
141
/// The builder provides creation, registering of health status indicators,
142
/// sub-building scoped health registries and building the health registry.
143
/// `build()` should be called once for finalizing the production of a health registry.
144
impl HealthRegistryBuilder {
145
8
    pub fn new(namespace: &str) -> Self {
146
8
        Self {
147
8
            namespace: format!("/{namespace}").into(),
148
8
            state: Arc::new(Mutex::new(HashMap::new())),
149
8
        }
150
8
    }
151
152
    /// Register a health status indicator at current namespace.
153
11
    pub fn register_indicator(&mut self, indicator: Arc<dyn HealthStatusIndicator>) {
154
11
        let name = format!("{}/{}", self.namespace, indicator.get_name());
155
11
        self.state.lock().insert(name.into(), indicator);
156
11
    }
157
158
    /// Create a sub builder for a namespace.
159
    #[must_use]
160
4
    pub fn sub_builder(&mut self, namespace: &str) -> Self {
161
4
        Self {
162
4
            namespace: format!("{}/{}", self.namespace, namespace).into(),
163
4
            state: self.state.clone(),
164
4
        }
165
4
    }
166
167
    /// Finalize the production of the health registry.
168
8
    pub fn build(&mut self) -> HealthRegistry {
169
8
        HealthRegistry {
170
8
            indicators: self.state.lock().clone().into_iter().collect(),
171
8
        }
172
8
    }
173
}
174
175
#[derive(Default, Clone)]
176
pub struct HealthRegistry {
177
    indicators: Vec<(Cow<'static, str>, Arc<dyn HealthStatusIndicator>)>,
178
}
179
180
impl Debug for HealthRegistry {
181
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
182
0
        f.debug_struct("HealthRegistry")
183
0
            .field(
184
0
                "indicators",
185
0
                &self
186
0
                    .indicators
187
0
                    .iter()
188
0
                    .map(|(name, _)| name)
189
0
                    .collect::<Vec<_>>(),
190
            )
191
0
            .finish()
192
0
    }
193
}
194
195
pub trait HealthStatusReporter {
196
    fn health_status_report(
197
        &self,
198
        timeout: &Duration,
199
    ) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + Send + '_>>;
200
}
201
202
/// Health status reporter implementation for the health registry that provides a stream
203
/// of health status descriptions.
204
impl HealthStatusReporter for HealthRegistry {
205
8
    fn health_status_report(
206
8
        &self,
207
8
        timeout_limit: &Duration,
208
8
    ) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + Send + '_>> {
209
8
        let local_timeout_limit = Arc::new(*timeout_limit);
210
8
        Box::pin(
211
8
            futures::stream::iter(
212
8
                self.indicators
213
8
                    .iter()
214
8
                    .zip(core::iter::repeat(local_timeout_limit)),
215
            )
216
11
            .
then8
(|((namespace, indicator), internal_timeout)| async move {
217
11
                let status_res =
218
11
                    timeout(*internal_timeout, indicator.check_health(namespace.clone())).await;
219
                HealthStatusDescription {
220
11
                    namespace: namespace.clone(),
221
11
                    status: status_res.unwrap_or_else(|_| 
{1
222
1
                        let struct_name = indicator.struct_name();
223
1
                        warn!(struct_name, "Timeout during health check");
224
1
                        HealthStatus::Timeout { struct_name }
225
1
                    }),
226
                }
227
22
            }),
228
        )
229
8
    }
230
}
231
232
/// Default health status indicator implementation for a component.
233
/// Generally used for components that don't need custom implementations
234
/// of the `check_health` function.
235
#[macro_export]
236
macro_rules! default_health_status_indicator {
237
    ($type:ty) => {
238
        #[async_trait::async_trait]
239
        impl HealthStatusIndicator for $type {
240
0
            fn get_name(&self) -> &'static str {
241
0
                stringify!($type)
242
0
            }
243
244
0
            async fn check_health(
245
                &self,
246
                namespace: std::borrow::Cow<'static, str>,
247
            ) -> nativelink_util::health_utils::HealthStatus {
248
                StoreDriver::check_health(Pin::new(self), namespace).await
249
0
            }
250
        }
251
    };
252
}
253
254
// Re-export the `#[macro_export]` macro so it is also reachable through the `health_utils` module path.
255
pub use crate::default_health_status_indicator;