Coverage Report

Created: 2025-10-18 07:01

/build/source/nativelink-service/src/worker_api_server.rs
  Line|  Count|Source
     1|       |// Copyright 2024 The NativeLink Authors. All rights reserved.
     2|       |//
     3|       |// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
     4|       |// you may not use this file except in compliance with the License.
     5|       |// You may obtain a copy of the License at
     6|       |//
     7|       |//    See LICENSE file for details
     8|       |//
     9|       |// Unless required by applicable law or agreed to in writing, software
    10|       |// distributed under the License is distributed on an "AS IS" BASIS,
    11|       |// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12|       |// See the License for the specific language governing permissions and
    13|       |// limitations under the License.
    14|       |
    15|       |use core::convert::Into;
    16|       |use core::pin::Pin;
    17|       |use core::time::Duration;
    18|       |use std::collections::HashMap;
    19|       |use std::sync::Arc;
    20|       |use std::time::{SystemTime, UNIX_EPOCH};
    21|       |
    22|       |use futures::stream::unfold;
    23|       |use futures::{Stream, StreamExt};
    24|       |use nativelink_config::cas_server::WorkerApiConfig;
    25|       |use nativelink_error::{make_err, Code, Error, ResultExt};
    26|       |use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_scheduler::Update;
    27|       |use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_server::{
    28|       |    WorkerApi, WorkerApiServer as Server,
    29|       |};
    30|       |use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
    31|       |    execute_result, ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForScheduler, UpdateForWorker,
    32|       |};
    33|       |use nativelink_scheduler::worker::Worker;
    34|       |use nativelink_scheduler::worker_scheduler::WorkerScheduler;
    35|       |use nativelink_util::background_spawn;
    36|       |use nativelink_util::action_messages::{OperationId, WorkerId};
    37|       |use nativelink_util::operation_state_manager::UpdateOperationType;
    38|       |use nativelink_util::platform_properties::PlatformProperties;
    39|       |use rand::RngCore;
    40|       |use tokio::sync::mpsc;
    41|       |use tokio::time::interval;
    42|       |use tonic::{Response, Status};
    43|       |use tracing::{debug, error, instrument, warn, Level};
    44|       |use uuid::Uuid;
    45|       |
    46|       |pub type ConnectWorkerStream =
    47|       |    Pin<Box<dyn Stream<Item = Result<UpdateForWorker, Status>> + Send + Sync + 'static>>;
    48|       |
    49|       |pub type NowFn = Box<dyn Fn() -> Result<Duration, Error> + Send + Sync>;
    50|       |
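
Both aliases above are the file's extension points: `ConnectWorkerStream` is what `connect_worker` ultimately returns, and `NowFn` lets callers inject a clock. A minimal sketch of a deterministic `NowFn` for tests (the constant timestamp is illustrative, not meaningful):

    use core::time::Duration;
    use nativelink_error::Error;

    // A NowFn that always reports the same instant, handy for
    // deterministic tests of keep-alive and timeout logic.
    fn frozen_now_fn() -> Box<dyn Fn() -> Result<Duration, Error> + Send + Sync> {
        Box::new(|| Ok(Duration::from_secs(1_700_000_000)))
    }
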
    51|       |pub struct WorkerApiServer {
    52|       |    scheduler: Arc<dyn WorkerScheduler>,
    53|       |    now_fn: Arc<NowFn>,
    54|       |    node_id: [u8; 6],
    55|       |}
    56|       |
    57|       |impl core::fmt::Debug for WorkerApiServer {
    58|      0|    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    59|      0|        f.debug_struct("WorkerApiServer")
    60|      0|            .field("node_id", &self.node_id)
    61|      0|            .finish_non_exhaustive()
    62|      0|    }
    63|       |}
    64|       |
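
`WorkerApiServer` needs a hand-written `Debug` because neither `dyn WorkerScheduler` nor the boxed `NowFn` closure implements `Debug`. A standalone sketch of the same pattern:

    use std::sync::Arc;

    // Analog of the impl above: print only the Debug-able field and mark
    // the output as non-exhaustive, rendering as
    // `Server { node_id: [0, 0, 0, 0, 0, 0], .. }`.
    struct Server {
        handler: Arc<dyn Fn() + Send + Sync>, // no Debug impl available
        node_id: [u8; 6],
    }

    impl core::fmt::Debug for Server {
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            f.debug_struct("Server")
                .field("node_id", &self.node_id)
                .finish_non_exhaustive()
        }
    }
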
    65|       |impl WorkerApiServer {
    66|      0|    pub fn new(
    67|      0|        config: &WorkerApiConfig,
    68|      0|        schedulers: &HashMap<String, Arc<dyn WorkerScheduler>>,
    69|      0|    ) -> Result<Self, Error> {
    70|      0|        let node_id = {
    71|      0|            let mut out = [0; 6];
    72|      0|            rand::rng().fill_bytes(&mut out);
    73|      0|            out
    74|       |        };
    75|      0|        for scheduler in schedulers.values() {
    76|       |            // This will protect us from holding a reference to the scheduler forever in the
    77|       |            // event our ExecutionServer dies. Our scheduler is a weak ref, so the spawn will
    78|       |            // eventually see that the Arc went away and return.
    79|      0|            let weak_scheduler = Arc::downgrade(scheduler);
    80|      0|            background_spawn!("worker_api_server", async move {
    81|      0|                let mut ticker = interval(Duration::from_secs(1));
    82|       |                loop {
    83|      0|                    ticker.tick().await;
    84|      0|                    let timestamp = SystemTime::now()
    85|      0|                        .duration_since(UNIX_EPOCH)
    86|      0|                        .expect("Error: system time is now behind unix epoch");
    87|      0|                    match weak_scheduler.upgrade() {
    88|      0|                        Some(scheduler) => {
    89|      0|                            if let Err(err) =
  Branch (89:36): [True: 0, False: 0]
  Branch (89:36): [Folded - Ignored]
    90|      0|                                scheduler.remove_timedout_workers(timestamp.as_secs()).await
    91|       |                            {
    92|      0|                                error!(?err, "Failed to remove_timedout_workers");
    93|      0|                            }
    94|       |                        }
    95|       |                        // If we fail to upgrade, our service is probably destroyed, so return.
    96|      0|                        None => return,
    97|       |                    }
    98|       |                }
    99|      0|            });
   100|       |        }
   101|       |
   102|      0|        Self::new_with_now_fn(
   103|      0|            config,
   104|      0|            schedulers,
   105|      0|            Box::new(move || {
   106|      0|                SystemTime::now()
   107|      0|                    .duration_since(UNIX_EPOCH)
   108|      0|                    .map_err(|_| make_err!(Code::Internal, "System time is now behind unix epoch"))
   109|      0|            }),
   110|      0|            node_id,
   111|       |        )
   112|      0|    }
   113|       |
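
`new()` leans on a weak reference so the once-per-second reaper task cannot keep a dead scheduler alive: a failed `upgrade()` is the shutdown signal. A self-contained sketch of that pattern, with plain `tokio::spawn` standing in for `background_spawn!`:

    use core::time::Duration;
    use std::sync::{Arc, Weak};

    // Sketch of the weak-ref reaper pattern used in `new()`: the task
    // ticks once per second and exits once the owner drops the last
    // strong Arc.
    fn spawn_reaper(owner: &Arc<String>) {
        let weak: Weak<String> = Arc::downgrade(owner);
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(Duration::from_secs(1));
            loop {
                ticker.tick().await;
                match weak.upgrade() {
                    Some(strong) => println!("owner {strong} still alive"),
                    None => return, // owner dropped; stop looping
                }
            }
        });
    }
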
   114|       |    /// Same as `new()`, but you can pass a custom `now_fn` that returns a `Duration` since `UNIX_EPOCH`
   115|       |    /// representing the current time. Used mostly in unit tests.
   116|      6|    pub fn new_with_now_fn(
   117|      6|        config: &WorkerApiConfig,
   118|      6|        schedulers: &HashMap<String, Arc<dyn WorkerScheduler>>,
   119|      6|        now_fn: NowFn,
   120|      6|        node_id: [u8; 6],
   121|      6|    ) -> Result<Self, Error> {
   122|      6|        let scheduler = schedulers
   123|      6|            .get(&config.scheduler)
   124|      6|            .err_tip(|| {
   125|      0|                format!(
   126|      0|                    "Scheduler needs config for '{}' because it exists in worker_api",
   127|       |                    config.scheduler
   128|       |                )
   129|      0|            })?
   130|      6|            .clone();
   131|      6|        Ok(Self {
   132|      6|            scheduler,
   133|      6|            now_fn: Arc::new(now_fn),
   134|      6|            node_id,
   135|      6|        })
   136|      6|    }
   137|       |
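
In tests this constructor pairs naturally with an injected clock. A hypothetical wiring, where `config`, `my_scheduler`, and the scheduler name are placeholders and `frozen_now_fn` is the sketch above:

    // Assumes config.scheduler == "MAIN_SCHEDULER" and that `my_scheduler`
    // is any Arc<dyn WorkerScheduler> implementation.
    let mut schedulers: HashMap<String, Arc<dyn WorkerScheduler>> = HashMap::new();
    schedulers.insert("MAIN_SCHEDULER".to_string(), my_scheduler);
    let server = WorkerApiServer::new_with_now_fn(
        &config,
        &schedulers,
        frozen_now_fn(), // deterministic clock for reproducible tests
        [0u8; 6],        // fixed node id for reproducible worker ids
    )?;
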
   138|      0|    pub fn into_service(self) -> Server<Self> {
   139|      0|        Server::new(self)
   140|      0|    }
   141|       |
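
`into_service()` wraps the handler in the tonic-generated `Server`. A minimal sketch of mounting it on a tonic transport (the address and builder wiring are illustrative; NativeLink's real entrypoint wires this differently):

    // Illustrative only: serve the WorkerApi service over gRPC with tonic.
    let addr: std::net::SocketAddr = "0.0.0.0:50061".parse()?;
    tonic::transport::Server::builder()
        .add_service(worker_api_server.into_service())
        .serve(addr)
        .await?;
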
   142|      6|    async fn inner_connect_worker(
   143|      6|        &self,
   144|      6|        mut update_stream: impl Stream<Item = Result<UpdateForScheduler, Status>>
   145|      6|        + Unpin
   146|      6|        + Send
   147|      6|        + 'static,
   148|      6|    ) -> Result<Response<ConnectWorkerStream>, Error> {
   149|      6|        let first_message = update_stream
   150|      6|            .next()
   151|      6|            .await
   152|      6|            .err_tip(|| "Missing first message for connect_worker")?
   153|      6|            .err_tip(|| "Error reading first message for connect_worker")?;
   154|      6|        let Some(Update::ConnectWorkerRequest(connect_worker_request)) = first_message.update
  Branch (154:13): [True: 0, False: 0]
  Branch (154:13): [Folded - Ignored]
  Branch (154:13): [True: 6, False: 0]
   155|       |        else {
   156|      0|            return Err(make_err!(
   157|      0|                Code::Internal,
   158|      0|                "First message was not a ConnectWorkerRequest"
   159|      0|            ));
   160|       |        };
   161|       |
   162|      6|        let (tx, rx) = mpsc::unbounded_channel();
   163|       |
   164|       |        // First convert our proto platform properties into a form our scheduler understands.
   165|      6|        let platform_properties = {
   166|      6|            let mut platform_properties = PlatformProperties::default();
   167|      6|            for property in connect_worker_request.properties {
   168|      0|                let platform_property_value = self
   169|      0|                    .scheduler
   170|      0|                    .get_platform_property_manager()
   171|      0|                    .make_prop_value(&property.name, &property.value)
   172|      0|                    .err_tip(|| "Bad Property during connect_worker()")?;
   173|      0|                platform_properties
   174|      0|                    .properties
   175|      0|                    .insert(property.name.clone(), platform_property_value);
   176|       |            }
   177|      6|            platform_properties
   178|       |        };
   179|       |
   180|       |        // Now register the worker with the scheduler.
   181|      6|        let worker_id = {
   182|      6|            let worker_id = WorkerId(format!(
   183|      6|                "{}{}",
   184|      6|                connect_worker_request.worker_id_prefix,
   185|      6|                Uuid::now_v6(&self.node_id).hyphenated()
   186|      6|            ));
   187|      6|            let worker = Worker::new(
   188|      6|                worker_id.clone(),
   189|      6|                platform_properties,
   190|      6|                tx,
   191|      6|                (self.now_fn)()?.as_secs(),
   192|       |            );
   193|      6|            self.scheduler
   194|      6|                .add_worker(worker)
   195|      6|                .await
   196|      6|                .err_tip(|| "Failed to add worker in inner_connect_worker()")?;
   197|      6|            worker_id
   198|       |        };
   199|       |
   200|      6|        WorkerConnection::start(
   201|      6|            self.scheduler.clone(),
   202|      6|            self.now_fn.clone(),
   203|      6|            worker_id.clone(),
   204|      6|            update_stream,
   205|       |        );
   206|       |
   207|      6|        Ok(Response::new(Box::pin(unfold(
   208|      6|            (rx, worker_id),
   209|      8|            move |state| async move {
   210|      8|                let (mut rx, worker_id) = state;
   211|      8|                if let Some(update_for_worker) = rx.recv().await {
  Branch (211:24): [True: 0, False: 0]
  Branch (211:24): [Folded - Ignored]
  Branch (211:24): [True: 8, False: 0]
   212|      8|                    return Some((Ok(update_for_worker), (rx, worker_id)));
   213|      0|                }
   214|      0|                warn!(
   215|       |                    ?worker_id,
   216|      0|                    "UpdateForWorker channel was closed, thus closing connection to worker node",
   217|       |                );
   218|       |
   219|      0|                None
   220|     16|            },
   221|       |        ))))
   222|      6|    }
   223|       |
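
The response stream threads the `mpsc` receiver through `futures::stream::unfold` as loop state, so the gRPC stream ends exactly when every sender (held by the scheduler's `Worker`) is dropped. The same pattern in miniature:

    use futures::stream::{unfold, StreamExt};
    use tokio::sync::mpsc;

    // Same shape as the body above: an unbounded receiver becomes a
    // Stream that yields until every sender is dropped, then ends.
    async fn demo() {
        let (tx, rx) = mpsc::unbounded_channel::<u32>();
        tx.send(1).unwrap();
        drop(tx); // closing the channel terminates the stream below
        let mut stream = Box::pin(unfold(rx, |mut rx| async move {
            rx.recv().await.map(|item| (item, rx))
        }));
        while let Some(item) = stream.next().await {
            println!("got {item}");
        }
    }
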
   224|      6|    pub async fn inner_connect_worker_for_testing(
   225|      6|        &self,
   226|      6|        update_stream: impl Stream<Item = Result<UpdateForScheduler, Status>> + Unpin + Send + 'static,
   227|      6|    ) -> Result<Response<ConnectWorkerStream>, Error> {
   228|      6|        self.inner_connect_worker(update_stream).await
   229|      6|    }
   230|       |}
   231|       |
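
A hypothetical way to drive this test hook: feed a one-element `futures::stream::iter` whose first item is the required `ConnectWorkerRequest`. The `ConnectWorkerRequest` type is assumed to live beside the other `remote_execution` protos and, per prost convention, to implement `Default`:

    // `server` is a WorkerApiServer built as in the earlier sketch.
    let first = UpdateForScheduler {
        update: Some(Update::ConnectWorkerRequest(ConnectWorkerRequest {
            worker_id_prefix: "test_".to_string(),
            ..Default::default()
        })),
    };
    let stream = futures::stream::iter(vec![Ok(first)]);
    let response = server.inner_connect_worker_for_testing(stream).await?;
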
   232|       |#[tonic::async_trait]
   233|       |impl WorkerApi for WorkerApiServer {
   234|       |    type ConnectWorkerStream = ConnectWorkerStream;
   235|       |
   236|       |    #[instrument(
   237|       |        err,
   238|       |        level = Level::ERROR,
   239|       |        skip_all,
   240|       |        fields(request = ?grpc_request.get_ref())
   241|       |    )]
   242|       |    async fn connect_worker(
   243|       |        &self,
   244|       |        grpc_request: tonic::Request<tonic::Streaming<UpdateForScheduler>>,
   245|       |    ) -> Result<Response<Self::ConnectWorkerStream>, Status> {
   246|       |        let resp = self
   247|       |            .inner_connect_worker(grpc_request.into_inner())
   248|       |            .await
   249|       |            .map_err(Into::into);
   250|       |        if resp.is_ok() {
   251|       |            debug!(return = "Ok(<stream>)");
   252|       |        }
   253|       |        resp
   254|       |    }
   255|       |}
   256|       |
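
From the worker's side this is a bidirectional-streaming call. An illustrative client, assuming the tonic-generated `worker_api_client::WorkerApiClient` that conventionally accompanies these server stubs:

    // Illustrative only: a real worker must send a ConnectWorkerRequest
    // as its first message (see inner_connect_worker above).
    let mut client = WorkerApiClient::connect("http://127.0.0.1:50061").await?;
    let outbound = futures::stream::iter(vec![UpdateForScheduler::default()]);
    let mut inbound = client.connect_worker(outbound).await?.into_inner();
    while let Some(update_for_worker) = inbound.message().await? {
        // React to scheduler-driven UpdateForWorker messages here.
    }
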
   257|       |struct WorkerConnection {
   258|       |    scheduler: Arc<dyn WorkerScheduler>,
   259|       |    now_fn: Arc<NowFn>,
   260|       |    worker_id: WorkerId,
   261|       |}
   262|       |
   263|       |impl WorkerConnection {
   264|      6|    fn start(
   265|      6|        scheduler: Arc<dyn WorkerScheduler>,
   266|      6|        now_fn: Arc<NowFn>,
   267|      6|        worker_id: WorkerId,
   268|      6|        mut connection: impl Stream<Item = Result<UpdateForScheduler, Status>> + Unpin + Send + 'static,
   269|      6|    ) {
   270|      6|        let instance = Self {
   271|      6|            scheduler,
   272|      6|            now_fn,
   273|      6|            worker_id,
   274|      6|        };
   275|       |
   276|      6|        background_spawn!("worker_api", async move {
   277|      2|            let mut had_going_away = false;
   278|      3|            while let Some(maybe_update) = connection.next().await {
  Branch (278:23): [True: 0, False: 0]
  Branch (278:23): [Folded - Ignored]
  Branch (278:23): [True: 2, False: 0]
   279|      2|                let update = match maybe_update.map(|u| u.update) {
   280|      2|                    Ok(Some(update)) => update,
   281|       |                    Ok(None) => {
   282|      0|                        tracing::warn!(worker_id=?instance.worker_id, "Empty update");
   283|      0|                        continue;
   284|       |                    }
   285|      0|                    Err(err) => {
   286|      0|                        tracing::warn!(worker_id=?instance.worker_id, ?err, "Error from worker");
   287|      0|                        break;
   288|       |                    }
   289|       |                };
   290|      2|                let result = match update {
   291|      0|                    Update::ConnectWorkerRequest(_connect_worker_request) => Err(make_err!(
   292|      0|                        Code::Internal,
   293|      0|                        "Got ConnectWorkerRequest after initial message for {}",
   294|      0|                        instance.worker_id
   295|      0|                    )),
   296|      1|                    Update::KeepAliveRequest(keep_alive_request) => {
   297|      1|                        instance.inner_keep_alive(keep_alive_request).await
   298|       |                    }
   299|      0|                    Update::GoingAwayRequest(going_away_request) => {
   300|      0|                        had_going_away = true;
   301|      0|                        instance.inner_going_away(going_away_request).await
   302|       |                    }
   303|      1|                    Update::ExecuteResult(execute_result) => {
   304|      1|                        instance.inner_execution_response(execute_result).await
   305|       |                    }
   306|      0|                    Update::ExecuteComplete(execute_complete) => {
   307|      0|                        instance.execution_complete(execute_complete).await
   308|       |                    }
   309|       |                };
   310|      1|                if let Err(err) = result {
  Branch (310:24): [True: 0, False: 0]
  Branch (310:24): [Folded - Ignored]
  Branch (310:24): [True: 0, False: 1]
   311|      0|                    tracing::warn!(worker_id=?instance.worker_id, ?err, "Error processing worker message");
   312|      1|                }
   313|       |            }
   314|      0|            tracing::debug!(worker_id=?instance.worker_id, "Update for scheduler dropped");
   315|      0|            if !had_going_away {
  Branch (315:16): [True: 0, False: 0]
  Branch (315:16): [Folded - Ignored]
  Branch (315:16): [True: 0, False: 0]
   316|      0|                drop(instance.scheduler.remove_worker(&instance.worker_id).await);
   317|      0|            }
   318|      0|        });
   319|      6|    }
   320|       |
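
Because `start` accepts any `Unpin + Send` stream, the dispatch loop can be exercised without gRPC at all. A hypothetical driver that sends one keep-alive and then closes the channel, which trips the `remove_worker` path since `had_going_away` stays false (`KeepAliveRequest`'s fields are assumed to default):

    // `scheduler` and `now_fn` are assumed to be in scope, as in the
    // earlier sketches.
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    tx.send(Ok(UpdateForScheduler {
        update: Some(Update::KeepAliveRequest(KeepAliveRequest::default())),
    }))?;
    drop(tx); // closes the stream, ending the dispatch loop
    WorkerConnection::start(
        scheduler.clone(),
        now_fn.clone(),
        WorkerId("test-worker".to_string()),
        tokio_stream::wrappers::UnboundedReceiverStream::new(rx),
    );
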
   321|      1|    async fn inner_keep_alive(&self, _keep_alive_request: KeepAliveRequest) -> Result<(), Error> {
   322|      1|        self.scheduler
   323|      1|            .worker_keep_alive_received(&self.worker_id, (self.now_fn)()?.as_secs())
   324|      1|            .await
   325|      1|            .err_tip(|| "Could not process keep_alive from worker in inner_keep_alive()")?;
   326|      1|        Ok(())
   327|      1|    }
   328|       |
   329|      0|    async fn inner_going_away(&self, _going_away_request: GoingAwayRequest) -> Result<(), Error> {
   330|      0|        self.scheduler
   331|      0|            .remove_worker(&self.worker_id)
   332|      0|            .await
   333|      0|            .err_tip(|| "While calling WorkerApiServer::inner_going_away")?;
   334|      0|        Ok(())
   335|      0|    }
   336|       |
   337|      1|    async fn inner_execution_response(&self, execute_result: ExecuteResult) -> Result<(), Error> {
   338|      1|        let operation_id = OperationId::from(execute_result.operation_id);
   339|       |
   340|      1|        match execute_result
   341|      1|            .result
   342|      1|            .err_tip(|| "Expected result to exist in ExecuteResult")?
   343|       |        {
   344|      1|            execute_result::Result::ExecuteResponse(finished_result) => {
   345|      1|                let action_stage = finished_result
   346|      1|                    .try_into()
   347|      1|                    .err_tip(|| "Failed to convert ExecuteResponse into an ActionStage")?;
   348|      1|                self.scheduler
   349|      1|                    .update_action(
   350|      1|                        &self.worker_id,
   351|      1|                        &operation_id,
   352|      1|                        UpdateOperationType::UpdateWithActionStage(action_stage),
   353|      1|                    )
   354|      1|                    .await
   355|      0|                    .err_tip(|| format!("Failed to update operation {operation_id:?}"))?;
   356|       |            }
   357|      0|            execute_result::Result::InternalError(e) => {
   358|      0|                self.scheduler
   359|      0|                    .update_action(
   360|      0|                        &self.worker_id,
   361|      0|                        &operation_id,
   362|      0|                        UpdateOperationType::UpdateWithError(e.into()),
   363|      0|                    )
   364|      0|                    .await
   365|      0|                    .err_tip(|| format!("Failed to update operation {operation_id:?}"))?;
   366|       |            }
   367|       |        }
   368|      0|        Ok(())
   369|      0|    }
   370|       |
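
The two arms above are the only shapes a worker may report: a finished `ExecuteResponse` or an `InternalError`. Illustrative payloads, with the field set assumed from the code above and everything else left to prost's `Default`:

    // Hypothetical test inputs for inner_execution_response.
    let success = ExecuteResult {
        operation_id: "op-123".to_string(),
        result: Some(execute_result::Result::ExecuteResponse(Default::default())),
        ..Default::default()
    };
    let failure = ExecuteResult {
        operation_id: "op-123".to_string(),
        result: Some(execute_result::Result::InternalError(Default::default())),
        ..Default::default()
    };
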
   371|      0|    async fn execution_complete(&self, execute_complete: ExecuteComplete) -> Result<(), Error> {
   372|      0|        let operation_id = OperationId::from(execute_complete.operation_id);
   373|      0|        self.scheduler
   374|      0|            .update_action(
   375|      0|                &self.worker_id,
   376|      0|                &operation_id,
   377|      0|                UpdateOperationType::ExecutionComplete,
   378|      0|            )
   379|      0|            .await
   380|      0|            .err_tip(|| format!("Failed to update operation {operation_id:?}"))?;
   381|      0|        Ok(())
   382|      0|    }
   383|       |}