Coverage Report

Created: 2025-11-07 13:29

/build/source/nativelink-worker/src/local_worker.rs
Line  Count  Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::str;
17
use core::sync::atomic::{AtomicU64, Ordering};
18
use core::time::Duration;
19
use std::process::Stdio;
20
use std::sync::{Arc, Weak};
21
22
use futures::future::BoxFuture;
23
use futures::stream::FuturesUnordered;
24
use futures::{Future, FutureExt, StreamExt, TryFutureExt, select};
25
use nativelink_config::cas_server::LocalWorkerConfig;
26
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
27
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
28
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
29
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
30
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
31
    ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker,
32
    execute_result,
33
};
34
use nativelink_store::fast_slow_store::FastSlowStore;
35
use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId};
36
use nativelink_util::common::fs;
37
use nativelink_util::digest_hasher::DigestHasherFunc;
38
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
39
use nativelink_util::shutdown_guard::ShutdownGuard;
40
use nativelink_util::store_trait::Store;
41
use nativelink_util::{spawn, tls_utils};
42
use opentelemetry::context::Context;
43
use tokio::process;
44
use tokio::sync::{broadcast, mpsc};
45
use tokio::time::sleep;
46
use tokio_stream::wrappers::UnboundedReceiverStream;
47
use tonic::Streaming;
48
use tracing::{Level, debug, error, event, info, info_span, instrument, warn};
49
50
use crate::running_actions_manager::{
51
    ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction,
52
    RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl,
53
};
54
use crate::worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper};
55
use crate::worker_utils::make_connect_worker_request;
56
57
/// Amount of time to wait if we have actions in transit before we try to
58
/// consider an error to have occurred.
59
const ACTIONS_IN_TRANSIT_TIMEOUT_S: f32 = 10.;
60
61
/// If we lose connection to the worker api server we will wait this many seconds
62
/// before trying to connect.
63
const CONNECTION_RETRY_DELAY_S: f32 = 0.5;
64
65
/// Default endpoint timeout. If this value gets modified the documentation in
66
/// `cas_server.rs` must also be updated.
67
const DEFAULT_ENDPOINT_TIMEOUT_S: f32 = 5.;
68
69
/// Default maximum amount of time a task is allowed to run for.
70
/// If this value gets modified the documentation in `cas_server.rs` must also be updated.
71
const DEFAULT_MAX_ACTION_TIMEOUT: Duration = Duration::from_secs(1200); // 20 mins.
72
73
struct LocalWorkerImpl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> {
74
    config: &'a LocalWorkerConfig,
75
    // According to the tonic documentation it is a cheap operation to clone this.
76
    grpc_client: T,
77
    worker_id: String,
78
    running_actions_manager: Arc<U>,
79
    // Number of actions that have been received in `Update::StartAction`, but
80
    // not yet processed by running_actions_manager's spawn. This number should
81
    // always be zero if there are no actions running and no actions being waited
82
    // on by the scheduler.
83
    actions_in_transit: Arc<AtomicU64>,
84
    metrics: Arc<Metrics>,
85
}
86
87
4
async fn preconditions_met(precondition_script: Option<String>) -> Result<(), Error> {
88
4
    let Some(precondition_script) = &precondition_script else {
  Branch (88:9): [True: 1, False: 3]
  Branch (88:9): [Folded - Ignored]
  Branch (88:9): [Folded - Ignored]
89
        // No script means we are always ok to proceed.
90
3
        return Ok(());
91
    };
92
    // TODO: Might want to pass some information about the command to the
93
    //       script, but at this point it's not even been downloaded yet,
94
    //       so that's not currently possible.  Perhaps we'll move this in
95
    //       future to pass useful information through?  Or perhaps we'll
96
    //       have a pre-condition and a pre-execute script instead, although
97
    //       arguably entrypoint already gives us that.
98
1
    let precondition_process = process::Command::new(precondition_script)
99
1
        .kill_on_drop(true)
100
1
        .stdin(Stdio::null())
101
1
        .stdout(Stdio::piped())
102
1
        .stderr(Stdio::null())
103
1
        .env_clear()
104
1
        .spawn()
105
1
        .err_tip(|| format!("Could not execute precondition command {precondition_script:?}"))?;
106
1
    let output = precondition_process.wait_with_output().await?;
107
1
    if output.status.code() == Some(0) {
  Branch (107:8): [True: 0, False: 1]
  Branch (107:8): [Folded - Ignored]
  Branch (107:8): [Folded - Ignored]
108
0
        Ok(())
109
    } else {
110
1
        Err(make_err!(
111
1
            Code::ResourceExhausted,
112
1
            "Preconditions script returned status {} - {}",
113
1
            output.status,
114
1
            str::from_utf8(&output.stdout).unwrap_or("")
115
1
        ))
116
    }
117
4
}
118
119
impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorkerImpl<'a, T, U> {
120
5
    fn new(
121
5
        config: &'a LocalWorkerConfig,
122
5
        grpc_client: T,
123
5
        worker_id: String,
124
5
        running_actions_manager: Arc<U>,
125
5
        metrics: Arc<Metrics>,
126
5
    ) -> Self {
127
5
        Self {
128
5
            config,
129
5
            grpc_client,
130
5
            worker_id,
131
5
            running_actions_manager,
132
5
            // Number of actions that have been received in `Update::StartAction`, but
133
5
            // not yet processed by running_actions_manager's spawn. This number should
134
5
            // always be zero if there are no actions running and no actions being waited
135
5
            // on by the scheduler.
136
5
            actions_in_transit: Arc::new(AtomicU64::new(0)),
137
5
            metrics,
138
5
        }
139
5
    }
140
141
    /// Starts a background spawn/thread that will send a message to the server every `timeout / 2`.
142
5
    async fn start_keep_alive(&self) -> Result<(), Error> {
143
        // According to tonic's documentation this call should be cheap and is the same stream.
144
4
        let mut grpc_client = self.grpc_client.clone();
145
146
        loop {
147
4
            let timeout = self
148
4
                .config
149
4
                .worker_api_endpoint
150
4
                .timeout
151
4
                .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
152
            // We always send 2 keep alive requests per timeout. Http2 should manage most of our
153
            // timeout issues, this is a secondary check to ensure we can still send data.
154
4
            sleep(Duration::from_secs_f32(timeout / 2.)).await;
155
0
            if let Err(e) = grpc_client.keep_alive(KeepAliveRequest {}).await {
  Branch (155:20): [True: 0, False: 0]
  Branch (155:20): [Folded - Ignored]
  Branch (155:20): [Folded - Ignored]
156
0
                return Err(make_err!(
157
0
                    Code::Internal,
158
0
                    "Failed to send KeepAlive in LocalWorker : {:?}",
159
0
                    e
160
0
                ));
161
0
            }
162
        }
163
0
    }
164
165
5
    async fn run(
166
5
        &self,
167
5
        update_for_worker_stream: Streaming<UpdateForWorker>,
168
5
        shutdown_rx: &mut broadcast::Receiver<ShutdownGuard>,
169
5
    ) -> Result<(), Error> {
170
        // This big block of logic is designed to help simplify upstream components. Upstream
171
        // components can write standard futures that return a `Result<(), Error>` and this block
172
        // will forward the error up to the client and disconnect from the scheduler.
173
        // It is a common use case that an item sent through update_for_worker_stream will always
174
        // have a response but the response will be triggered through a callback to the scheduler.
175
        // This can be quite tricky to manage, so what we have done here is to give access to a
176
        // `futures` variable (everything here runs on a single task) as well as a channel that you
177
        // can send a future into, which then gets added to the `futures` variable.
178
        // This means that if you want to perform an action based on the result of the future,
179
        // you use the `.map()` method and the new action will always run in this spawn,
180
        // giving mutable access to the fields of this struct.
181
        // NOTE: If you ever return from this function it will disconnect from the scheduler.
182
5
        let mut futures = FuturesUnordered::new();
183
5
        futures.push(self.start_keep_alive().boxed());
184
185
5
        let (add_future_channel, add_future_rx) = mpsc::unbounded_channel();
186
5
        let mut add_future_rx = UnboundedReceiverStream::new(add_future_rx).fuse();
187
188
5
        let mut update_for_worker_stream = update_for_worker_stream.fuse();
189
        // A notify which is triggered every time actions_in_flight is subtracted.
190
5
        let actions_notify = Arc::new(tokio::sync::Notify::new());
191
        // A counter of actions that are in-flight, this is similar to actions_in_transit but
192
        // includes the AC upload and notification to the scheduler.
193
5
        let actions_in_flight = Arc::new(AtomicU64::new(0));
194
        // Set to true when shutting down, this stops any new StartAction.
195
5
        let mut shutting_down = false;
196
197
        loop {
198
16
            select! {
199
16
                maybe_update = update_for_worker_stream.next() => {
200
6
                    match maybe_update
201
6
                        .err_tip(|| "UpdateForWorker stream closed early")?
202
5
                        .err_tip(|| "Got error in UpdateForWorker stream")?
203
                        .update
204
5
                        .err_tip(|| "Expected update to exist in UpdateForWorker")?
205
                    {
206
                        Update::ConnectionResult(_) => {
207
0
                            return Err(make_input_err!(
208
0
                                "Got ConnectionResult in LocalWorker::run which should never happen"
209
0
                            ));
210
                        }
211
                        // TODO(palfrey) We should possibly do something with this notification.
212
0
                        Update::Disconnect(()) => {
213
0
                            self.metrics.disconnects_received.inc();
214
0
                        }
215
0
                        Update::KeepAlive(()) => {
216
0
                            self.metrics.keep_alives_received.inc();
217
0
                        }
218
1
                        Update::KillOperationRequest(kill_operation_request) => {
219
1
                            let operation_id = OperationId::from(kill_operation_request.operation_id);
220
1
                            if let Err(err) = self.running_actions_manager.kill_operation(&operation_id).await {
  Branch (220:36): [True: 0, False: 1]
  Branch (220:36): [Folded - Ignored]
  Branch (220:36): [Folded - Ignored]
221
0
                                error!(
222
                                    %operation_id,
223
                                    ?err,
224
0
                                    "Failed to send kill request for operation"
225
                                );
226
1
                            }
227
                        }
228
4
                        Update::StartAction(start_execute) => {
229
                            // Don't accept any new requests if we're shutting down.
230
4
                            if shutting_down {
  Branch (230:32): [True: 0, False: 4]
  Branch (230:32): [Folded - Ignored]
  Branch (230:32): [Folded - Ignored]
231
0
                                if let Some(instance_name) = start_execute.execute_request.map(|request| request.instance_name) {
  Branch (231:40): [True: 0, False: 0]
  Branch (231:40): [Folded - Ignored]
  Branch (231:40): [Folded - Ignored]
232
0
                                    self.grpc_client.clone().execution_response(
233
0
                                        ExecuteResult{
234
0
                                            instance_name,
235
0
                                            operation_id: start_execute.operation_id,
236
0
                                            result: Some(execute_result::Result::InternalError(make_err!(Code::ResourceExhausted, "Worker shutting down").into())),
237
0
                                        }
238
0
                                    ).await?;
239
0
                                }
240
0
                                continue;
241
4
                            }
242
243
4
                            self.metrics.start_actions_received.inc();
244
245
4
                            let execute_request = start_execute.execute_request.as_ref();
246
4
                            let operation_id = start_execute.operation_id.clone();
247
4
                            let operation_id_to_log = operation_id.clone();
248
4
                            let maybe_instance_name = execute_request.map(|v| v.instance_name.clone());
249
4
                            let action_digest = execute_request.and_then(|v| v.action_digest.clone());
250
4
                            let digest_hasher = execute_request
251
4
                                .ok_or_else(|| make_input_err!("Expected execute_request to be set"))
252
4
                                .and_then(|v| DigestHasherFunc::try_from(v.digest_function))
253
4
                                .err_tip(|| "In LocalWorkerImpl::new()")?;
254
255
4
                            let start_action_fut = {
256
4
                                let precondition_script_cfg = self.config.experimental_precondition_script.clone();
257
4
                                let actions_in_transit = self.actions_in_transit.clone();
258
4
                                let worker_id = self.worker_id.clone();
259
4
                                let running_actions_manager = self.running_actions_manager.clone();
260
4
                                let mut grpc_client = self.grpc_client.clone();
261
4
                                let complete = ExecuteComplete {
262
4
                                    operation_id: operation_id.clone(),
263
4
                                };
264
4
                                self.metrics.clone().wrap(move |metrics| async move {
265
4
                                    metrics.preconditions.wrap(preconditions_met(precondition_script_cfg))
266
4
                                    .and_then(|()| running_actions_manager.create_and_add_action(worker_id, start_execute))
267
4
                                    .map(move |r| {
268
                                        // Now that we either failed or registered our action, we can
269
                                        // consider the action to no longer be in transit.
270
4
                                        actions_in_transit.fetch_sub(1, Ordering::Release);
271
4
                                        r
272
4
                                    })
273
4
                                    .and_then(|action| {
274
3
                                        debug!(
275
3
                                            operation_id = %action.get_operation_id(),
276
3
                                            "Received request to run action"
277
                                        );
278
3
                                        action
279
3
                                            .clone()
280
3
                                            .prepare_action()
281
3
                                            .and_then(RunningAction::execute)
282
3
                                            .and_then(|result| async move {
283
                                                // Notify that execution has completed so it can schedule a new action.
284
2
                                                drop(grpc_client.execution_complete(complete).await);
285
2
                                                Ok(result)
286
4
                                            })
287
3
                                            .and_then(RunningAction::upload_results)
288
3
                                            .and_then(RunningAction::get_finished_result)
289
                                            // Note: We need to ensure we run cleanup even if one of the other steps fails.
290
3
                                            .then(|result| async move {
291
2
                                                if let Err(e) = action.cleanup().await {
  Branch (291:56): [True: 0, False: 2]
  Branch (291:56): [Folded - Ignored]
  Branch (291:56): [Folded - Ignored]
292
0
                                                    return Result::<ActionResult, Error>::Err(e).merge(result);
293
2
                                                }
294
2
                                                result
295
4
                                            })
296
4
                                    }).await
297
7
                                })
298
                            };
299
300
4
                            let make_publish_future = {
301
4
                                let mut grpc_client = self.grpc_client.clone();
302
303
4
                                let running_actions_manager = self.running_actions_manager.clone();
304
3
                                move |res: Result<ActionResult, Error>| async move {
305
3
                                    let instance_name = maybe_instance_name
306
3
                                        .err_tip(|| "`instance_name` could not be resolved; this is likely an internal error in local_worker.")?;
307
3
                                    match res {
308
2
                                        Ok(mut action_result) => {
309
                                            // Save in the action cache before notifying the scheduler that we've completed.
310
2
                                            if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) {
  Branch (310:52): [True: 2, False: 0]
  Branch (310:52): [Folded - Ignored]
  Branch (310:52): [Folded - Ignored]
311
2
                                                if let Err(err) = running_actions_manager.cache_action_result(digest_info, &mut action_result, digest_hasher).await {
  Branch (311:56): [True: 0, False: 2]
  Branch (311:56): [Folded - Ignored]
  Branch (311:56): [Folded - Ignored]
312
0
                                                    error!(
313
                                                        ?err,
314
                                                        ?action_digest,
315
0
                                                        "Error saving action in store",
316
                                                    );
317
2
                                                }
318
0
                                            }
319
2
                                            let action_stage = ActionStage::Completed(action_result);
320
2
                                            grpc_client.execution_response(
321
2
                                                ExecuteResult{
322
2
                                                    instance_name,
323
2
                                                    operation_id,
324
2
                                                    result: Some(execute_result::Result::ExecuteResponse(action_stage.into())),
325
2
                                                }
326
2
                                            )
327
2
                                            .await
328
0
                                            .err_tip(|| "Error while calling execution_response")?;
329
                                        },
330
1
                                        Err(e) => {
331
1
                                            grpc_client.execution_response(ExecuteResult{
332
1
                                                instance_name,
333
1
                                                operation_id,
334
1
                                                result: Some(execute_result::Result::InternalError(e.into())),
335
1
                                            }).await.err_tip(|| "Error calling execution_response with error")?;
336
                                        },
337
                                    }
338
0
                                    Ok(())
339
3
                                }
340
                            };
341
342
4
                            self.actions_in_transit.fetch_add(1, Ordering::Release);
343
344
4
                            let add_future_channel = add_future_channel.clone();
345
346
4
                            info_span!(
347
                                "worker_start_action_ctx",
348
                                operation_id = operation_id_to_log,
349
4
                                digest_function = %digest_hasher.to_string(),
350
4
                            ).in_scope(|| {
351
4
                                let _guard = Context::current_with_value(digest_hasher)
352
4
                                    .attach();
353
354
4
                                let actions_in_flight = actions_in_flight.clone();
355
4
                                let actions_notify = actions_notify.clone();
356
4
                                let actions_in_flight_fail = actions_in_flight.clone();
357
4
                                let actions_notify_fail = actions_notify.clone();
358
4
                                actions_in_flight.fetch_add(1, Ordering::Release);
359
360
4
                                futures.push(
361
4
                                    spawn!("worker_start_action", start_action_fut).map(move |res| {
362
3
                                        let res = res.err_tip(|| "Failed to launch spawn")?;
363
3
                                        if let Err(err) = &res {
  Branch (363:48): [True: 1, False: 2]
  Branch (363:48): [Folded - Ignored]
  Branch (363:48): [Folded - Ignored]
364
1
                                            error!(?err, "Error executing action");
365
2
                                        }
366
3
                                        add_future_channel
367
3
                                            .send(make_publish_future(res).then(move |res| {
368
0
                                                actions_in_flight.fetch_sub(1, Ordering::Release);
369
0
                                                actions_notify.notify_one();
370
0
                                                core::future::ready(res)
371
3
                                            }).boxed())
372
3
                                            .map_err(|_| make_err!(Code::Internal, "LocalWorker could not send future"))?;
373
3
                                        Ok(())
374
3
                                    })
375
4
                                    .or_else(move |err| {
376
                                        // If the make_publish_future is not run we still need to notify.
377
0
                                        actions_in_flight_fail.fetch_sub(1, Ordering::Release);
378
0
                                        actions_notify_fail.notify_one();
379
0
                                        core::future::ready(Err(err))
380
0
                                    })
381
4
                                    .boxed()
382
                                );
383
4
                            });
384
                        }
385
                    }
386
                },
387
16
                res = add_future_rx.next() => {
388
3
                    let fut = res.err_tip(|| "New future stream receives should never be closed")?;
389
3
                    futures.push(fut);
390
                },
391
16
                res = futures.next() => res.err_tip(|| "Keep-alive should always pending. Likely unable to send data to scheduler")??,
392
16
                complete_msg = shutdown_rx.recv().fuse() => {
393
0
                    warn!("Worker loop received shutdown signal. Shutting down worker...",);
394
0
                    let mut grpc_client = self.grpc_client.clone();
395
0
                    let shutdown_guard = complete_msg.map_err(|e| make_err!(Code::Internal, "Failed to receive shutdown message: {e:?}"))?;
396
0
                    let actions_in_flight = actions_in_flight.clone();
397
0
                    let actions_notify = actions_notify.clone();
398
0
                    let shutdown_future = async move {
399
                        // Wait for in-flight operations to be fully completed.
400
0
                        while actions_in_flight.load(Ordering::Acquire) > 0 {
  Branch (400:31): [True: 0, False: 0]
  Branch (400:31): [Folded - Ignored]
  Branch (400:31): [Folded - Ignored]
401
0
                            actions_notify.notified().await;
402
                        }
403
                        // Sending this message immediately evicts all jobs from
404
                        // this worker, of which there should be none.
405
0
                        if let Err(e) = grpc_client.going_away(GoingAwayRequest {}).await {
  Branch (405:32): [True: 0, False: 0]
  Branch (405:32): [Folded - Ignored]
  Branch (405:32): [Folded - Ignored]
406
0
                            error!("Failed to send GoingAwayRequest: {e}",);
407
0
                            return Err(e);
408
0
                        }
409
                        // Allow shutdown to occur now.
410
0
                        drop(shutdown_guard);
411
0
                        Ok::<(), Error>(())
412
0
                    };
413
0
                    futures.push(shutdown_future.boxed());
414
0
                    shutting_down = true;
415
                },
416
            };
417
        }
418
        // Unreachable.
419
1
    }
420
}
421
422
type ConnectionFactory<T> = Box<dyn Fn() -> BoxFuture<'static, Result<T, Error>> + Send + Sync>;
423
424
pub struct LocalWorker<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> {
425
    config: Arc<LocalWorkerConfig>,
426
    running_actions_manager: Arc<U>,
427
    connection_factory: ConnectionFactory<T>,
428
    sleep_fn: Option<Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>>,
429
    metrics: Arc<Metrics>,
430
}
431
432
impl<
433
    T: WorkerApiClientTrait + core::fmt::Debug + 'static,
434
    U: RunningActionsManager + core::fmt::Debug,
435
> core::fmt::Debug for LocalWorker<T, U>
436
{
437
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
438
0
        f.debug_struct("LocalWorker")
439
0
            .field("config", &self.config)
440
0
            .field("running_actions_manager", &self.running_actions_manager)
441
0
            .field("metrics", &self.metrics)
442
0
            .finish_non_exhaustive()
443
0
    }
444
}
445
446
/// Creates a new `LocalWorker`. The `cas_store` must be an instance of
447
/// `FastSlowStore` and will be checked at runtime.
448
2
pub async fn new_local_worker(
449
2
    config: Arc<LocalWorkerConfig>,
450
2
    cas_store: Store,
451
2
    ac_store: Option<Store>,
452
2
    historical_store: Store,
453
2
) -> Result<LocalWorker<WorkerApiClientWrapper, RunningActionsManagerImpl>, Error> {
454
2
    let fast_slow_store = cas_store
455
2
        .downcast_ref::<FastSlowStore>(None)
456
2
        .err_tip(|| "Expected store for LocalWorker's store to be a FastSlowStore")?
457
2
        .get_arc()
458
2
        .err_tip(|| "FastSlowStore's Arc doesn't exist")?;
459
460
    // Log warning about CAS configuration for multi-worker setups
461
2
    event!(
462
2
        Level::INFO,
463
2
        worker_name = %config.name,
464
2
        "Starting worker '{}'. IMPORTANT: If running multiple workers, all workers \
465
2
        must share the same CAS storage path to avoid 'Object not found' errors.",
466
2
        config.name
467
    );
468
469
2
    if let Ok(path) = fs::canonicalize(&config.work_directory).await {
  Branch (469:12): [True: 1, False: 1]
  Branch (469:12): [Folded - Ignored]
  Branch (469:12): [Folded - Ignored]
470
1
        fs::remove_dir_all(&path).await.err_tip(|| {
471
0
            format!(
472
0
                "Could not remove work_directory '{}' in LocalWorker",
473
0
                &path.as_path().to_str().unwrap_or("bad path")
474
            )
475
0
        })?;
476
1
    }
477
478
2
    fs::create_dir_all(&config.work_directory)
479
2
        .await
480
2
        .err_tip(|| format!("Could not make work_directory : {}", config.work_directory))?;
481
2
    let entrypoint = if config.entrypoint.is_empty() {
  Branch (481:25): [True: 2, False: 0]
  Branch (481:25): [Folded - Ignored]
  Branch (481:25): [Folded - Ignored]
482
2
        None
483
    } else {
484
0
        Some(config.entrypoint.clone())
485
    };
486
2
    let max_action_timeout = if config.max_action_timeout == 0 {
  Branch (486:33): [True: 2, False: 0]
  Branch (486:33): [Folded - Ignored]
  Branch (486:33): [Folded - Ignored]
487
2
        DEFAULT_MAX_ACTION_TIMEOUT
488
    } else {
489
0
        Duration::from_secs(config.max_action_timeout as u64)
490
    };
491
492
    // Initialize directory cache if configured
493
2
    let directory_cache = if let Some(cache_config) = &config.directory_cache {
  Branch (493:34): [True: 0, False: 2]
  Branch (493:34): [Folded - Ignored]
  Branch (493:34): [Folded - Ignored]
494
        use std::path::PathBuf;
495
496
        use crate::directory_cache::{
497
            DirectoryCache, DirectoryCacheConfig as WorkerDirCacheConfig,
498
        };
499
500
0
        let cache_root = if cache_config.cache_root.is_empty() {
  Branch (500:29): [True: 0, False: 0]
  Branch (500:29): [Folded - Ignored]
  Branch (500:29): [Folded - Ignored]
501
0
            PathBuf::from(&config.work_directory).parent().map_or_else(
502
0
                || PathBuf::from("/tmp/nativelink_directory_cache"),
503
0
                |p| p.join("directory_cache"),
504
            )
505
        } else {
506
0
            PathBuf::from(&cache_config.cache_root)
507
        };
508
509
0
        let worker_cache_config = WorkerDirCacheConfig {
510
0
            max_entries: cache_config.max_entries,
511
0
            max_size_bytes: cache_config.max_size_bytes,
512
0
            cache_root,
513
0
        };
514
515
0
        match DirectoryCache::new(worker_cache_config, Store::new(fast_slow_store.clone())).await {
516
0
            Ok(cache) => {
517
0
                tracing::info!("Directory cache initialized successfully");
518
0
                Some(Arc::new(cache))
519
            }
520
0
            Err(e) => {
521
0
                tracing::warn!("Failed to initialize directory cache: {:?}", e);
522
0
                None
523
            }
524
        }
525
    } else {
526
2
        None
527
    };
528
529
2
    let running_actions_manager =
530
2
        Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
531
2
            root_action_directory: config.work_directory.clone(),
532
2
            execution_configuration: ExecutionConfiguration {
533
2
                entrypoint,
534
2
                additional_environment: config.additional_environment.clone(),
535
2
            },
536
2
            cas_store: fast_slow_store,
537
2
            ac_store,
538
2
            historical_store,
539
2
            upload_action_result_config: &config.upload_action_result,
540
2
            max_action_timeout,
541
2
            timeout_handled_externally: config.timeout_handled_externally,
542
2
            directory_cache,
543
2
        })?);
544
2
    let local_worker = LocalWorker::new_with_connection_factory_and_actions_manager(
545
2
        config.clone(),
546
2
        running_actions_manager,
547
2
        Box::new(move || {
548
0
            let config = config.clone();
549
0
            Box::pin(async move {
550
0
                let timeout = config
551
0
                    .worker_api_endpoint
552
0
                    .timeout
553
0
                    .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
554
0
                let timeout_duration = Duration::from_secs_f32(timeout);
555
0
                let tls_config =
556
0
                    tls_utils::load_client_config(&config.worker_api_endpoint.tls_config)
557
0
                        .err_tip(|| "Parsing local worker TLS configuration")?;
558
0
                let endpoint =
559
0
                    tls_utils::endpoint_from(&config.worker_api_endpoint.uri, tls_config)
560
0
                        .map_err(|e| make_input_err!("Invalid URI for worker endpoint : {e:?}"))?
561
0
                        .connect_timeout(timeout_duration)
562
0
                        .timeout(timeout_duration);
563
564
0
                let transport = endpoint.connect().await.map_err(|e| {
565
0
                    make_err!(
566
0
                        Code::Internal,
567
                        "Could not connect to endpoint {}: {e:?}",
568
0
                        config.worker_api_endpoint.uri
569
                    )
570
0
                })?;
571
0
                Ok(WorkerApiClient::new(transport).into())
572
0
            })
573
0
        }),
574
2
        Box::new(move |d| Box::pin(sleep(d))),
575
    );
576
2
    Ok(local_worker)
577
2
}
578
579
impl<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorker<T, U> {
580
8
    pub fn new_with_connection_factory_and_actions_manager(
581
8
        config: Arc<LocalWorkerConfig>,
582
8
        running_actions_manager: Arc<U>,
583
8
        connection_factory: ConnectionFactory<T>,
584
8
        sleep_fn: Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>,
585
8
    ) -> Self {
586
8
        let metrics = Arc::new(Metrics::new(Arc::downgrade(
587
8
            running_actions_manager.metrics(),
588
        )));
589
8
        Self {
590
8
            config,
591
8
            running_actions_manager,
592
8
            connection_factory,
593
8
            sleep_fn: Some(sleep_fn),
594
8
            metrics,
595
8
        }
596
8
    }
597
598
    #[allow(
599
        clippy::missing_const_for_fn,
600
        reason = "False positive on stable, but not on nightly"
601
    )]
602
0
    pub fn name(&self) -> &String {
603
0
        &self.config.name
604
0
    }
605
606
8
    async fn register_worker(
607
8
        &self,
608
8
        client: &mut T,
609
8
    ) -> Result<(String, Streaming<UpdateForWorker>), Error> {
610
8
        let connect_worker_request =
611
8
            make_connect_worker_request(self.config.name.clone(), &self.config.platform_properties)
612
8
                .await?;
613
8
        let mut update_for_worker_stream = client
614
8
            .connect_worker(connect_worker_request)
615
8
            .await
616
6
            .err_tip(|| "Could not call connect_worker() in worker")?
617
6
            .into_inner();
618
619
6
        let first_msg_update = update_for_worker_stream
620
6
            .next()
621
6
            .await
622
6
            .err_tip(|| "Got EOF expected UpdateForWorker")?
623
5
            .err_tip(|| "Got error when receiving UpdateForWorker")?
624
            .update;
625
626
5
        let worker_id = match first_msg_update {
627
5
            Some(Update::ConnectionResult(connection_result)) => connection_result.worker_id,
628
0
            other => {
629
0
                return Err(make_input_err!(
630
0
                    "Expected first response from scheduler to be a ConnectResult got : {:?}",
631
0
                    other
632
0
                ));
633
            }
634
        };
635
5
        Ok((worker_id, update_for_worker_stream))
636
6
    }
637
638
    #[instrument(skip(self), level = Level::INFO)]
639
6
    pub async fn run(
640
6
        mut self,
641
6
        mut shutdown_rx: broadcast::Receiver<ShutdownGuard>,
642
6
    ) -> Result<(), Error> {
643
        let sleep_fn = self
644
            .sleep_fn
645
            .take()
646
            .err_tip(|| "Could not unwrap sleep_fn in LocalWorker::run")?;
647
        let sleep_fn_pin = Pin::new(&sleep_fn);
648
2
        let error_handler = Box::pin(move |err| async move {
649
2
            error!(?err, "Error");
650
2
            (sleep_fn_pin)(Duration::from_secs_f32(CONNECTION_RETRY_DELAY_S)).await;
651
4
        });
652
653
        loop {
654
            // First connect to our endpoint.
655
            let mut client = match (self.connection_factory)().await {
656
                Ok(client) => client,
657
                Err(e) => {
658
                    (error_handler)(e).await;
659
                    continue; // Try to connect again.
660
                }
661
            };
662
663
            // Next register our worker with the scheduler.
664
            let (inner, update_for_worker_stream) = match self.register_worker(&mut client).await {
665
                Err(e) => {
666
                    (error_handler)(e).await;
667
                    continue; // Try to connect again.
668
                }
669
                Ok((worker_id, update_for_worker_stream)) => (
670
                    LocalWorkerImpl::new(
671
                        &self.config,
672
                        client,
673
                        worker_id,
674
                        self.running_actions_manager.clone(),
675
                        self.metrics.clone(),
676
                    ),
677
                    update_for_worker_stream,
678
                ),
679
            };
680
            info!(
681
                worker_id = %inner.worker_id,
682
                "Worker registered with scheduler"
683
            );
684
685
            // Now listen for connections and run all other services.
686
            if let Err(err) = inner.run(update_for_worker_stream, &mut shutdown_rx).await {
687
                'no_more_actions: {
688
                    // Ensure there are no actions in transit before we try to kill
689
                    // all our actions.
690
                    const ITERATIONS: usize = 1_000;
691
692
                    const ERROR_MSG: &str = "Actions in transit did not reach zero before we disconnected from the scheduler";
693
694
                    let sleep_duration = ACTIONS_IN_TRANSIT_TIMEOUT_S / ITERATIONS as f32;
695
                    for _ in 0..ITERATIONS {
696
                        if inner.actions_in_transit.load(Ordering::Acquire) == 0 {
697
                            break 'no_more_actions;
698
                        }
699
                        (sleep_fn_pin)(Duration::from_secs_f32(sleep_duration)).await;
700
                    }
701
                    error!(ERROR_MSG);
702
                    return Err(err.append(ERROR_MSG));
703
                }
704
                error!(?err, "Worker disconnected from scheduler");
705
                // Kill off any existing actions because if we re-connect, we'll
706
                // get some more and it might resource lock us.
707
                self.running_actions_manager.kill_all().await;
708
709
                (error_handler)(err).await; // Try to connect again.
710
            }
711
        }
712
        // Unreachable.
713
0
    }
714
}
715
716
#[derive(Debug, MetricsComponent)]
717
pub struct Metrics {
718
    #[metric(
719
        help = "Total number of actions sent to this worker to process. This does not mean it started them, it just means it received a request to execute it."
720
    )]
721
    start_actions_received: CounterWithTime,
722
    #[metric(help = "Total number of disconnects received from the scheduler.")]
723
    disconnects_received: CounterWithTime,
724
    #[metric(help = "Total number of keep-alives received from the scheduler.")]
725
    keep_alives_received: CounterWithTime,
726
    #[metric(
727
        help = "Stats about the calls to check if an action satisfies the config supplied script."
728
    )]
729
    preconditions: AsyncCounterWrapper,
730
    #[metric]
731
    #[allow(
732
        clippy::struct_field_names,
733
        reason = "TODO Fix this. Triggers on nightly"
734
    )]
735
    running_actions_manager_metrics: Weak<RunningActionManagerMetrics>,
736
}
737
738
impl RootMetricsComponent for Metrics {}
739
740
impl Metrics {
741
8
    fn new(running_actions_manager_metrics: Weak<RunningActionManagerMetrics>) -> Self {
742
8
        Self {
743
8
            start_actions_received: CounterWithTime::default(),
744
8
            disconnects_received: CounterWithTime::default(),
745
8
            keep_alives_received: CounterWithTime::default(),
746
8
            preconditions: AsyncCounterWrapper::default(),
747
8
            running_actions_manager_metrics,
748
8
        }
749
8
    }
750
}
751
752
impl Metrics {
753
4
    async fn wrap<U, T: Future<Output = U>, F: FnOnce(Arc<Self>) -> T>(
754
4
        self: Arc<Self>,
755
4
        fut: F,
756
4
    ) -> U {
757
4
        fut(self).await
758
3
    }
759
}