/build/source/nativelink-scheduler/src/worker_registry.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::time::Duration; |
16 | | use std::collections::HashMap; |
17 | | use std::sync::Arc; |
18 | | use std::time::SystemTime; |
19 | | |
20 | | use async_lock::RwLock; |
21 | | use nativelink_util::action_messages::WorkerId; |
22 | | use tracing::{debug, trace}; |
23 | | |
/// In-memory worker registry that tracks worker liveness.
///
/// Each worker id maps to the most recent timestamp recorded for it by
/// [`WorkerRegistry::register_worker`] or
/// [`WorkerRegistry::update_worker_heartbeat`]. Liveness is derived from that
/// timestamp in [`WorkerRegistry::is_worker_alive`].
#[derive(Debug)]
pub struct WorkerRegistry {
    /// Last recorded timestamp per worker, guarded by an async read/write lock.
    workers: RwLock<HashMap<WorkerId, SystemTime>>,
}
29 | | |
impl Default for WorkerRegistry {
    /// Returns an empty registry; equivalent to [`WorkerRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}
35 | | |
36 | | impl WorkerRegistry { |
37 | | /// Creates a new worker registry. |
38 | 32 | pub fn new() -> Self { |
39 | 32 | Self { |
40 | 32 | workers: RwLock::new(HashMap::new()), |
41 | 32 | } |
42 | 32 | } |
43 | | |
44 | | /// Updates the heartbeat timestamp for a worker. |
45 | 3 | pub async fn update_worker_heartbeat(&self, worker_id: &WorkerId, now: SystemTime) { |
46 | 3 | let mut workers = self.workers.write().await; |
47 | 3 | workers.insert(worker_id.clone(), now); |
48 | 3 | trace!(?worker_id, "FLOW: Worker heartbeat updated in registry"2 ); |
49 | 3 | } |
50 | | |
51 | 37 | pub async fn register_worker(&self, worker_id: &WorkerId, now: SystemTime) { |
52 | 37 | let mut workers = self.workers.write().await; |
53 | 37 | workers.insert(worker_id.clone(), now); |
54 | 37 | debug!(?worker_id, "FLOW: Worker registered in registry"35 ); |
55 | 37 | } |
56 | | |
57 | 7 | pub async fn remove_worker(&self, worker_id: &WorkerId) { |
58 | 7 | let mut workers = self.workers.write().await; |
59 | 7 | workers.remove(worker_id); |
60 | 7 | debug!(?worker_id, "FLOW: Worker removed from registry"6 ); |
61 | 7 | } |
62 | | |
63 | 10 | pub async fn is_worker_alive( |
64 | 10 | &self, |
65 | 10 | worker_id: &WorkerId, |
66 | 10 | timeout: Duration, |
67 | 10 | now: SystemTime, |
68 | 10 | ) -> bool { |
69 | 10 | let workers = self.workers.read().await; |
70 | | |
71 | 10 | if let Some(last_seen8 ) = workers.get(worker_id) { Branch (71:16): [True: 2, False: 0]
Branch (71:16): [True: 4, False: 2]
Branch (71:16): [True: 0, False: 0]
Branch (71:16): [True: 2, False: 0]
|
72 | 8 | if let Some(deadline) = last_seen.checked_add(timeout) { Branch (72:20): [True: 2, False: 0]
Branch (72:20): [True: 4, False: 0]
Branch (72:20): [True: 0, False: 0]
Branch (72:20): [True: 2, False: 0]
|
73 | 8 | let is_alive = deadline > now; |
74 | 8 | trace!( |
75 | | ?worker_id, |
76 | | ?last_seen, |
77 | | ?timeout, |
78 | | is_alive, |
79 | 4 | "FLOW: Worker liveness check" |
80 | | ); |
81 | 8 | return is_alive; |
82 | 0 | } |
83 | 2 | } |
84 | | |
85 | 2 | trace!(?worker_id, "FLOW: Worker not found or timed out"0 ); |
86 | 2 | false |
87 | 10 | } |
88 | | |
89 | 0 | pub async fn get_worker_last_seen(&self, worker_id: &WorkerId) -> Option<SystemTime> { |
90 | 0 | let workers = self.workers.read().await; |
91 | 0 | workers.get(worker_id).copied() |
92 | 0 | } |
93 | | } |
94 | | |
/// A [`WorkerRegistry`] wrapped in an [`Arc`] so one registry can have
/// multiple owners.
pub type SharedWorkerRegistry = Arc<WorkerRegistry>;
96 | | |
#[cfg(test)]
mod tests {
    use super::*;

    /// Liveness timeout shared by all tests.
    const TIMEOUT: Duration = Duration::from_secs(5);

    #[tokio::test]
    async fn test_worker_heartbeat() {
        let registry = WorkerRegistry::new();
        let worker_id = WorkerId::from(String::from("test"));
        let now = SystemTime::now();

        // Worker not registered yet
        assert!(!registry.is_worker_alive(&worker_id, TIMEOUT, now).await);

        // Register worker
        registry.register_worker(&worker_id, now).await;
        assert!(registry.is_worker_alive(&worker_id, TIMEOUT, now).await);

        // Check with expired timeout
        let future = now.checked_add(Duration::from_secs(10)).unwrap();
        assert!(!registry.is_worker_alive(&worker_id, TIMEOUT, future).await);

        // Update heartbeat
        registry.update_worker_heartbeat(&worker_id, future).await;
        assert!(registry.is_worker_alive(&worker_id, TIMEOUT, future).await);
    }

    #[tokio::test]
    async fn test_remove_worker() {
        let registry = WorkerRegistry::new();
        let worker_id = WorkerId::from(String::from("test-worker"));
        let now = SystemTime::now();

        registry.register_worker(&worker_id, now).await;
        assert!(registry.is_worker_alive(&worker_id, TIMEOUT, now).await);

        registry.remove_worker(&worker_id).await;
        assert!(!registry.is_worker_alive(&worker_id, TIMEOUT, now).await);
    }

    #[tokio::test]
    async fn test_liveness_boundaries() {
        let registry = WorkerRegistry::new();
        let worker_id = WorkerId::from(String::from("boundary-worker"));
        let registered_at = SystemTime::now();
        registry.register_worker(&worker_id, registered_at).await;

        // The timeout is exclusive: exactly `TIMEOUT` later the worker is dead.
        let at_deadline = registered_at.checked_add(TIMEOUT).unwrap();
        assert!(
            !registry
                .is_worker_alive(&worker_id, TIMEOUT, at_deadline)
                .await
        );

        // A check performed "before" the stored timestamp still reports alive.
        let earlier = registered_at.checked_sub(Duration::from_secs(1)).unwrap();
        assert!(registry.is_worker_alive(&worker_id, TIMEOUT, earlier).await);
    }

    #[tokio::test]
    async fn test_get_worker_last_seen() {
        // Also exercises the `Default` impl.
        let registry = WorkerRegistry::default();
        let worker_id = WorkerId::from(String::from("last-seen-worker"));
        let registered_at = SystemTime::now();

        // Unknown worker
        assert_eq!(registry.get_worker_last_seen(&worker_id).await, None);

        // Registration stores the supplied timestamp
        registry.register_worker(&worker_id, registered_at).await;
        assert_eq!(
            registry.get_worker_last_seen(&worker_id).await,
            Some(registered_at)
        );

        // A heartbeat replaces it
        let later = registered_at.checked_add(Duration::from_secs(10)).unwrap();
        registry.update_worker_heartbeat(&worker_id, later).await;
        assert_eq!(registry.get_worker_last_seen(&worker_id).await, Some(later));

        // Removal clears it
        registry.remove_worker(&worker_id).await;
        assert_eq!(registry.get_worker_last_seen(&worker_id).await, None);
    }
}