Coverage Report

Created: 2026-02-23 10:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/worker_registry.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::time::Duration;
16
use std::collections::HashMap;
17
use std::sync::Arc;
18
use std::time::SystemTime;
19
20
use async_lock::RwLock;
21
use nativelink_util::action_messages::WorkerId;
22
use tracing::{debug, trace};
23
24
/// In-memory worker registry that tracks worker liveness.
25
#[derive(Debug)]
26
pub struct WorkerRegistry {
27
    workers: RwLock<HashMap<WorkerId, SystemTime>>,
28
}
29
30
impl Default for WorkerRegistry {
31
0
    fn default() -> Self {
32
0
        Self::new()
33
0
    }
34
}
35
36
impl WorkerRegistry {
37
    /// Creates a new worker registry.
38
34
    pub fn new() -> Self {
39
34
        Self {
40
34
            workers: RwLock::new(HashMap::new()),
41
34
        }
42
34
    }
43
44
    /// Updates the heartbeat timestamp for a worker.
45
3
    pub async fn update_worker_heartbeat(&self, worker_id: &WorkerId, now: SystemTime) {
46
3
        let mut workers = self.workers.write().await;
47
3
        workers.insert(worker_id.clone(), now);
48
3
        trace!(?worker_id, "FLOW: Worker heartbeat updated in registry");
49
3
    }
50
51
39
    pub async fn register_worker(&self, worker_id: &WorkerId, now: SystemTime) {
52
39
        let mut workers = self.workers.write().await;
53
39
        workers.insert(worker_id.clone(), now);
54
39
        debug!(?worker_id, "FLOW: Worker registered in registry");
55
39
    }
56
57
7
    pub async fn remove_worker(&self, worker_id: &WorkerId) {
58
7
        let mut workers = self.workers.write().await;
59
7
        workers.remove(worker_id);
60
7
        debug!(?worker_id, "FLOW: Worker removed from registry");
61
7
    }
62
63
10
    pub async fn is_worker_alive(
64
10
        &self,
65
10
        worker_id: &WorkerId,
66
10
        timeout: Duration,
67
10
        now: SystemTime,
68
10
    ) -> bool {
69
10
        let workers = self.workers.read().await;
70
71
10
        if let Some(
last_seen8
) = workers.get(worker_id) {
  Branch (71:16): [True: 2, False: 0]
  Branch (71:16): [True: 4, False: 2]
  Branch (71:16): [True: 0, False: 0]
  Branch (71:16): [True: 2, False: 0]
72
8
            if let Some(deadline) = last_seen.checked_add(timeout) {
  Branch (72:20): [True: 2, False: 0]
  Branch (72:20): [True: 4, False: 0]
  Branch (72:20): [True: 0, False: 0]
  Branch (72:20): [True: 2, False: 0]
73
8
                let is_alive = deadline > now;
74
8
                trace!(
75
                    ?worker_id,
76
                    ?last_seen,
77
                    ?timeout,
78
                    is_alive,
79
8
                    "FLOW: Worker liveness check"
80
                );
81
8
                return is_alive;
82
0
            }
83
2
        }
84
85
2
        trace!(?worker_id, "FLOW: Worker not found or timed out");
86
2
        false
87
10
    }
88
89
0
    pub async fn get_worker_last_seen(&self, worker_id: &WorkerId) -> Option<SystemTime> {
90
0
        let workers = self.workers.read().await;
91
0
        workers.get(worker_id).copied()
92
0
    }
93
}
94
95
pub type SharedWorkerRegistry = Arc<WorkerRegistry>;
96
97
#[cfg(test)]
98
mod tests {
99
    use nativelink_macro::nativelink_test;
100
101
    use super::*;
102
103
    #[nativelink_test]
104
    async fn test_worker_heartbeat() {
105
        let registry = WorkerRegistry::new();
106
        let worker_id = WorkerId::from(String::from("test"));
107
        let now = SystemTime::now();
108
109
        // Worker not registered yet
110
        assert!(
111
            !registry
112
                .is_worker_alive(&worker_id, Duration::from_secs(5), now)
113
                .await
114
        );
115
116
        // Register worker
117
        registry.register_worker(&worker_id, now).await;
118
        assert!(
119
            registry
120
                .is_worker_alive(&worker_id, Duration::from_secs(5), now)
121
                .await
122
        );
123
124
        // Check with expired timeout
125
        let future = now.checked_add(Duration::from_secs(10)).unwrap();
126
        assert!(
127
            !registry
128
                .is_worker_alive(&worker_id, Duration::from_secs(5), future)
129
                .await
130
        );
131
132
        // Update heartbeat
133
        registry.update_worker_heartbeat(&worker_id, future).await;
134
        assert!(
135
            registry
136
                .is_worker_alive(&worker_id, Duration::from_secs(5), future)
137
                .await
138
        );
139
    }
140
141
    #[nativelink_test]
142
    async fn test_remove_worker() {
143
        let registry = WorkerRegistry::new();
144
        let worker_id = WorkerId::from(String::from("test-worker"));
145
        let now = SystemTime::now();
146
147
        registry.register_worker(&worker_id, now).await;
148
        assert!(
149
            registry
150
                .is_worker_alive(&worker_id, Duration::from_secs(5), now)
151
                .await
152
        );
153
154
        registry.remove_worker(&worker_id).await;
155
        assert!(
156
            !registry
157
                .is_worker_alive(&worker_id, Duration::from_secs(5), now)
158
                .await
159
        );
160
    }
161
}