Coverage Report

Created: 2024-11-22 20:17

/build/source/nativelink-service/src/health_server.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::convert::Infallible;
16
use std::future::Future;
17
use std::pin::Pin;
18
use std::sync::Arc;
19
use std::task::{Context, Poll};
20
21
use axum::body::Body;
22
use bytes::Bytes;
23
use futures::StreamExt;
24
use http_body_util::Full;
25
use hyper::header::{HeaderValue, CONTENT_TYPE};
26
use hyper::{Request, Response, StatusCode};
27
use nativelink_util::health_utils::{
28
    HealthRegistry, HealthStatus, HealthStatusDescription, HealthStatusReporter,
29
};
30
use nativelink_util::origin_context::OriginContext;
31
use tower::Service;
32
use tracing::error_span;
33
34
/// Content type header value for JSON.
35
const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";
36
37
#[derive(Clone)]
38
pub struct HealthServer {
39
    health_registry: HealthRegistry,
40
}
41
42
impl HealthServer {
43
0
    pub fn new(health_registry: HealthRegistry) -> Self {
44
0
        Self { health_registry }
45
0
    }
46
}
47
48
impl Service<Request<Body>> for HealthServer {
49
    type Response = Response<Full<Bytes>>;
50
    type Error = Infallible;
51
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
52
53
0
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
54
0
        Poll::Ready(Ok(()))
55
0
    }
56
57
0
    fn call(&mut self, _req: Request<Body>) -> Self::Future {
58
0
        let health_registry = self.health_registry.clone();
59
0
        Box::pin(Arc::new(OriginContext::new()).wrap_async(
60
0
            error_span!("health_server_call"),
61
0
            async move {
62
0
                let health_status_descriptions: Vec<HealthStatusDescription> =
63
0
                    health_registry.health_status_report().collect().await;
64
65
0
                match serde_json5::to_string(&health_status_descriptions) {
66
0
                    Ok(body) => {
67
0
                        let contains_failed_report =
68
0
                            health_status_descriptions.iter().any(|description| {
69
0
                                matches!(description.status, HealthStatus::Failed { .. })
70
0
                            });
71
0
                        let status_code = if contains_failed_report {
  Branch (71:46): [True: 0, False: 0]
  Branch (71:46): [Folded - Ignored]
72
0
                            StatusCode::SERVICE_UNAVAILABLE
73
                        } else {
74
0
                            StatusCode::OK
75
                        };
76
77
0
                        Ok(Response::builder()
78
0
                            .status(status_code)
79
0
                            .header(CONTENT_TYPE, HeaderValue::from_static(JSON_CONTENT_TYPE))
80
0
                            .body(Full::new(Bytes::from(body)))
81
0
                            .unwrap())
82
                    }
83
84
0
                    Err(e) => Ok(Response::builder()
85
0
                        .status(StatusCode::INTERNAL_SERVER_ERROR)
86
0
                        .header(CONTENT_TYPE, HeaderValue::from_static(JSON_CONTENT_TYPE))
87
0
                        .body(Full::new(Bytes::from(format!("Internal Failure: {e:?}"))))
88
0
                        .unwrap()),
89
                }
90
0
            },
91
0
        ))
92
0
    }
93
}