Coverage Report

Created: 2025-12-17 22:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/worker_registry.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::time::Duration;
16
use std::collections::HashMap;
17
use std::sync::Arc;
18
use std::time::SystemTime;
19
20
use async_lock::RwLock;
21
use nativelink_util::action_messages::WorkerId;
22
use tracing::{debug, trace};
23
24
/// In-memory worker registry that tracks worker liveness.
25
#[derive(Debug)]
26
pub struct WorkerRegistry {
27
    workers: RwLock<HashMap<WorkerId, SystemTime>>,
28
}
29
30
impl Default for WorkerRegistry {
31
0
    fn default() -> Self {
32
0
        Self::new()
33
0
    }
34
}
35
36
impl WorkerRegistry {
37
    /// Creates a new worker registry.
38
32
    pub fn new() -> Self {
39
32
        Self {
40
32
            workers: RwLock::new(HashMap::new()),
41
32
        }
42
32
    }
43
44
    /// Updates the heartbeat timestamp for a worker.
45
3
    pub async fn update_worker_heartbeat(&self, worker_id: &WorkerId, now: SystemTime) {
46
3
        let mut workers = self.workers.write().await;
47
3
        workers.insert(worker_id.clone(), now);
48
3
        trace!(?worker_id, 
"FLOW: Worker heartbeat updated in registry"2
);
49
3
    }
50
51
37
    pub async fn register_worker(&self, worker_id: &WorkerId, now: SystemTime) {
52
37
        let mut workers = self.workers.write().await;
53
37
        workers.insert(worker_id.clone(), now);
54
37
        debug!(?worker_id, 
"FLOW: Worker registered in registry"35
);
55
37
    }
56
57
7
    pub async fn remove_worker(&self, worker_id: &WorkerId) {
58
7
        let mut workers = self.workers.write().await;
59
7
        workers.remove(worker_id);
60
7
        debug!(?worker_id, 
"FLOW: Worker removed from registry"6
);
61
7
    }
62
63
10
    pub async fn is_worker_alive(
64
10
        &self,
65
10
        worker_id: &WorkerId,
66
10
        timeout: Duration,
67
10
        now: SystemTime,
68
10
    ) -> bool {
69
10
        let workers = self.workers.read().await;
70
71
10
        if let Some(
last_seen8
) = workers.get(worker_id) {
  Branch (71:16): [True: 2, False: 0]
  Branch (71:16): [True: 4, False: 2]
  Branch (71:16): [True: 0, False: 0]
  Branch (71:16): [True: 2, False: 0]
72
8
            if let Some(deadline) = last_seen.checked_add(timeout) {
  Branch (72:20): [True: 2, False: 0]
  Branch (72:20): [True: 4, False: 0]
  Branch (72:20): [True: 0, False: 0]
  Branch (72:20): [True: 2, False: 0]
73
8
                let is_alive = deadline > now;
74
8
                trace!(
75
                    ?worker_id,
76
                    ?last_seen,
77
                    ?timeout,
78
                    is_alive,
79
4
                    "FLOW: Worker liveness check"
80
                );
81
8
                return is_alive;
82
0
            }
83
2
        }
84
85
2
        trace!(?worker_id, 
"FLOW: Worker not found or timed out"0
);
86
2
        false
87
10
    }
88
89
0
    pub async fn get_worker_last_seen(&self, worker_id: &WorkerId) -> Option<SystemTime> {
90
0
        let workers = self.workers.read().await;
91
0
        workers.get(worker_id).copied()
92
0
    }
93
}
94
95
pub type SharedWorkerRegistry = Arc<WorkerRegistry>;
96
97
#[cfg(test)]
mod tests {
    use super::*;

    /// Liveness window used by every check in this module.
    const TIMEOUT: Duration = Duration::from_secs(5);

    #[tokio::test]
    async fn test_worker_heartbeat() {
        let registry = WorkerRegistry::new();
        let worker_id = WorkerId::from(String::from("test"));
        let start = SystemTime::now();
        // Twice the timeout past `start`, so a heartbeat from `start` has expired.
        let later = start.checked_add(Duration::from_secs(10)).unwrap();

        // A worker that was never registered is not alive.
        let alive_before_register = registry.is_worker_alive(&worker_id, TIMEOUT, start).await;
        assert!(!alive_before_register);

        // Registration makes it alive at the registration instant.
        registry.register_worker(&worker_id, start).await;
        let alive_after_register = registry.is_worker_alive(&worker_id, TIMEOUT, start).await;
        assert!(alive_after_register);

        // Without a newer heartbeat the worker has expired by `later`.
        let alive_when_stale = registry.is_worker_alive(&worker_id, TIMEOUT, later).await;
        assert!(!alive_when_stale);

        // A fresh heartbeat at `later` revives it.
        registry.update_worker_heartbeat(&worker_id, later).await;
        let alive_after_heartbeat = registry.is_worker_alive(&worker_id, TIMEOUT, later).await;
        assert!(alive_after_heartbeat);
    }

    #[tokio::test]
    async fn test_remove_worker() {
        let registry = WorkerRegistry::new();
        let worker_id = WorkerId::from(String::from("test-worker"));
        let start = SystemTime::now();

        registry.register_worker(&worker_id, start).await;
        let alive_while_registered = registry.is_worker_alive(&worker_id, TIMEOUT, start).await;
        assert!(alive_while_registered);

        // After removal the same query must report the worker as gone.
        registry.remove_worker(&worker_id).await;
        let alive_after_removal = registry.is_worker_alive(&worker_id, TIMEOUT, start).await;
        assert!(!alive_after_removal);
    }
}