Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/local_worker.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::hash::BuildHasher;
16
use core::pin::Pin;
17
use core::str;
18
use core::sync::atomic::{AtomicU64, Ordering};
19
use core::time::Duration;
20
use std::borrow::Cow;
21
use std::collections::HashMap;
22
use std::env;
23
use std::process::Stdio;
24
use std::sync::{Arc, Weak};
25
26
use futures::future::BoxFuture;
27
use futures::stream::FuturesUnordered;
28
use futures::{Future, FutureExt, StreamExt, TryFutureExt, select};
29
use nativelink_config::cas_server::{EnvironmentSource, LocalWorkerConfig};
30
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
31
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
32
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
33
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
34
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
35
    ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker,
36
    execute_result,
37
};
38
use nativelink_store::fast_slow_store::FastSlowStore;
39
use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId};
40
use nativelink_util::common::fs;
41
use nativelink_util::digest_hasher::DigestHasherFunc;
42
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
43
use nativelink_util::shutdown_guard::ShutdownGuard;
44
use nativelink_util::store_trait::Store;
45
use nativelink_util::{spawn, tls_utils};
46
use opentelemetry::context::Context;
47
use tokio::sync::{broadcast, mpsc};
48
use tokio::{process, time};
49
use tokio_stream::wrappers::UnboundedReceiverStream;
50
use tonic::Streaming;
51
use tracing::{Level, debug, error, event, info, info_span, instrument, trace, warn};
52
53
use crate::running_actions_manager::{
54
    ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction,
55
    RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl,
56
};
57
use crate::worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper};
58
use crate::worker_utils::make_connect_worker_request;
59
60
/// Amount of time to wait if we have actions in transit before we try to
61
/// consider an error to have occurred.
62
const ACTIONS_IN_TRANSIT_TIMEOUT_S: f32 = 10.;
63
64
/// If we lose connection to the worker api server we will wait this many seconds
65
/// before trying to connect.
66
const CONNECTION_RETRY_DELAY_S: f32 = 0.5;
67
68
/// Default endpoint timeout. If this value gets modified the documentation in
69
/// `cas_server.rs` must also be updated.
70
const DEFAULT_ENDPOINT_TIMEOUT_S: f32 = 5.;
71
72
/// Default maximum amount of time a task is allowed to run for.
73
/// If this value gets modified the documentation in `cas_server.rs` must also be updated.
74
const DEFAULT_MAX_ACTION_TIMEOUT: Duration = Duration::from_mins(20);
75
const DEFAULT_MAX_UPLOAD_TIMEOUT: Duration = Duration::from_mins(10);
76
77
struct LocalWorkerImpl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> {
78
    config: &'a LocalWorkerConfig,
79
    // According to the tonic documentation it is a cheap operation to clone this.
80
    grpc_client: T,
81
    worker_id: String,
82
    running_actions_manager: Arc<U>,
83
    // Number of actions that have been received in `Update::StartAction`, but
84
    // not yet processed by running_actions_manager's spawn. This number should
85
    // always be zero if there are no actions running and no actions being waited
86
    // on by the scheduler.
87
    actions_in_transit: Arc<AtomicU64>,
88
    metrics: Arc<Metrics>,
89
}
90
91
7
pub async fn preconditions_met<H: BuildHasher + Sync>(
92
7
    precondition_script: Option<String>,
93
7
    extra_envs: &HashMap<String, String, H>,
94
7
) -> Result<(), Error> {
95
7
    let Some(
precondition_script2
) = &precondition_script else {
96
        // No script means we are always ok to proceed.
97
5
        return Ok(());
98
    };
99
    // TODO: Might want to pass some information about the command to the
100
    //       script, but at this point it's not even been downloaded yet,
101
    //       so that's not currently possible.  Perhaps we'll move this in
102
    //       future to pass useful information through?  Or perhaps we'll
103
    //       have a pre-condition and a pre-execute script instead, although
104
    //       arguably entrypoint already gives us that.
105
106
2
    let maybe_split_cmd = shlex::split(precondition_script);
107
2
    let (command, args) = match &maybe_split_cmd {
108
2
        Some(split_cmd) => (&split_cmd[0], &split_cmd[1..]),
109
        None => {
110
0
            return Err(make_input_err!(
111
0
                "Could not parse the value of precondition_script: '{}'",
112
0
                precondition_script,
113
0
            ));
114
        }
115
    };
116
117
2
    let precondition_process = process::Command::new(command)
118
2
        .args(args)
119
2
        .kill_on_drop(true)
120
2
        .stdin(Stdio::null())
121
2
        .stdout(Stdio::piped())
122
2
        .stderr(Stdio::null())
123
2
        .env_clear()
124
2
        .envs(extra_envs)
125
2
        .spawn()
126
2
        .err_tip(|| 
format!0
("Could not execute precondition command {precondition_script:?}"))
?0
;
127
2
    let output = precondition_process.wait_with_output().await
?0
;
128
2
    let stdout = str::from_utf8(&output.stdout).unwrap_or("");
129
2
    trace!(status = %output.status, %stdout, "Preconditions script returned");
130
2
    if output.status.code() == Some(0) {
131
1
        Ok(())
132
    } else {
133
1
        Err(make_err!(
134
1
            Code::ResourceExhausted,
135
1
            "Preconditions script returned status {} - {}",
136
1
            output.status,
137
1
            stdout
138
1
        ))
139
    }
140
7
}
141
142
impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorkerImpl<'a, T, U> {
143
8
    fn new(
144
8
        config: &'a LocalWorkerConfig,
145
8
        grpc_client: T,
146
8
        worker_id: String,
147
8
        running_actions_manager: Arc<U>,
148
8
        metrics: Arc<Metrics>,
149
8
    ) -> Self {
150
8
        Self {
151
8
            config,
152
8
            grpc_client,
153
8
            worker_id,
154
8
            running_actions_manager,
155
8
            // Number of actions that have been received in `Update::StartAction`, but
156
8
            // not yet processed by running_actions_manager's spawn. This number should
157
8
            // always be zero if there are no actions running and no actions being waited
158
8
            // on by the scheduler.
159
8
            actions_in_transit: Arc::new(AtomicU64::new(0)),
160
8
            metrics,
161
8
        }
162
8
    }
163
164
    /// Starts a background spawn/thread that will send a message to the server every `timeout / 2`.
165
8
    async fn start_keep_alive(&self) -> Result<(), Error> 
{7
166
        // According to tonic's documentation this call should be cheap and is the same stream.
167
7
        let mut grpc_client = self.grpc_client.clone();
168
7
        let timeout = self
169
7
            .config
170
7
            .worker_api_endpoint
171
7
            .timeout
172
7
            .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
173
174
7
        info!(timeout, "Started KeepAlive");
175
176
        // We always send 2 keep alive requests per timeout. Http2 should manage most of our
177
        // timeout issues, this is a secondary check to ensure we can still send data.
178
7
        let mut interval = time::interval(Duration::from_secs_f32(timeout) / 2);
179
7
        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
180
181
        // Skip the first interval as it happens immediately and we don't need a keep alive until timeout/2 has passed
182
7
        interval.tick().await;
183
184
        // Explicitly spawn the keep alive loop so it goes onto a different thread from the execute commands
185
1
        drop(
186
4
            spawn!("keep alives", async move {
187
                loop {
188
5
                    interval.tick().await;
189
2
                    if let Err(
e1
) = grpc_client.keep_alive(KeepAliveRequest {}).await {
190
1
                        error!(?e, "Failed to send KeepAlive in LocalWorker");
191
1
                        return;
192
1
                    }
193
1
                    debug!("Sent KeepAlive");
194
                }
195
1
            })
196
4
            .await,
197
        );
198
1
        Ok(())
199
1
    }
200
201
8
    async fn run(
202
8
        &self,
203
8
        update_for_worker_stream: Streaming<UpdateForWorker>,
204
8
        shutdown_rx: &mut broadcast::Receiver<ShutdownGuard>,
205
8
    ) -> Result<(), Error> {
206
        // This big block of logic is designed to help simplify upstream components. Upstream
207
        // components can write standard futures that return a `Result<(), Error>` and this block
208
        // will forward the error up to the client and disconnect from the scheduler.
209
        // It is a common use case that an item sent through update_for_worker_stream will always
210
        // have a response but the response will be triggered through a callback to the scheduler.
211
        // This can be quite tricky to manage, so what we have done here is given access to a
212
        // `futures` variable which because this is in a single thread as well as a channel that you
213
        // send a future into that makes it into the `futures` variable.
214
        // This means that if you want to perform an action based on the result of the future
215
        // you use the `.map()` method and the new action will always come to live in this spawn,
216
        // giving mutable access to stuff in this struct.
217
        // NOTE: If you ever return from this function it will disconnect from the scheduler.
218
8
        let mut futures = FuturesUnordered::new();
219
8
        futures.push(self.start_keep_alive().boxed());
220
221
8
        let (add_future_channel, add_future_rx) = mpsc::unbounded_channel();
222
8
        let mut add_future_rx = UnboundedReceiverStream::new(add_future_rx).fuse();
223
224
8
        let mut update_for_worker_stream = update_for_worker_stream.fuse();
225
        // A notify which is triggered every time actions_in_flight is subtracted.
226
8
        let actions_notify = Arc::new(tokio::sync::Notify::new());
227
        // A counter of actions that are in-flight, this is similar to actions_in_transit but
228
        // includes the AC upload and notification to the scheduler.
229
8
        let actions_in_flight = Arc::new(AtomicU64::new(0));
230
        // Set to true when shutting down, this stops any new StartAction.
231
8
        let mut shutting_down = false;
232
233
        loop {
234
26
            select! {
235
26
                
maybe_update8
= update_for_worker_stream.next() => if
!shutting_down8
||
maybe_update0
.
is_some0
() {
236
8
                    match maybe_update
237
8
                        .err_tip(|| "UpdateForWorker stream closed early")
?1
238
7
                        .err_tip(|| "Got error in UpdateForWorker stream")
?0
239
                        .update
240
7
                        .err_tip(|| "Expected update to exist in UpdateForWorker")
?0
241
                    {
242
                        Update::ConnectionResult(_) => {
243
0
                            return Err(make_input_err!(
244
0
                                "Got ConnectionResult in LocalWorker::run which should never happen"
245
0
                            ));
246
                        }
247
                        // TODO(palfrey) We should possibly do something with this notification.
248
0
                        Update::Disconnect(()) => {
249
0
                            self.metrics.disconnects_received.inc();
250
0
                        }
251
0
                        Update::KeepAlive(()) => {
252
0
                            self.metrics.keep_alives_received.inc();
253
0
                        }
254
1
                        Update::KillOperationRequest(kill_operation_request) => {
255
1
                            let operation_id = OperationId::from(kill_operation_request.operation_id);
256
1
                            if let Err(
err0
) = self.running_actions_manager.kill_operation(&operation_id).await {
257
0
                                error!(
258
                                    %operation_id,
259
                                    ?err,
260
                                    "Failed to send kill request for operation"
261
                                );
262
1
                            }
263
                        }
264
6
                        Update::StartAction(start_execute) => {
265
                            // Don't accept any new requests if we're shutting down.
266
6
                            if shutting_down {
267
0
                                if let Some(instance_name) = start_execute.execute_request.map(|request| request.instance_name) {
268
0
                                    self.grpc_client.clone().execution_response(
269
0
                                        ExecuteResult{
270
0
                                            instance_name,
271
0
                                            operation_id: start_execute.operation_id,
272
0
                                            result: Some(execute_result::Result::InternalError(make_err!(Code::ResourceExhausted, "Worker shutting down").into())),
273
0
                                        }
274
0
                                    ).await?;
275
0
                                }
276
0
                                continue;
277
6
                            }
278
279
6
                            self.metrics.start_actions_received.inc();
280
281
6
                            let execute_request = start_execute.execute_request.as_ref();
282
6
                            let operation_id = start_execute.operation_id.clone();
283
6
                            let operation_id_to_log = operation_id.clone();
284
6
                            let maybe_instance_name = execute_request.map(|v| v.instance_name.clone());
285
6
                            let action_digest = execute_request.and_then(|v| v.action_digest.clone());
286
6
                            let digest_hasher = execute_request
287
6
                                .ok_or_else(|| 
make_input_err!0
("Expected execute_request to be set"))
288
6
                                .and_then(|v| DigestHasherFunc::try_from(v.digest_function))
289
6
                                .err_tip(|| "In LocalWorkerImpl::new()")
?0
;
290
291
6
                            let start_action_fut = {
292
6
                                let precondition_script_cfg = self.config.experimental_precondition_script.clone();
293
6
                                let mut extra_envs: HashMap<String, String> = HashMap::new();
294
6
                                if let Some(
ref additional_environment0
) = self.config.additional_environment {
295
0
                                    for (name, source) in additional_environment {
296
0
                                        let value = match source {
297
0
                                            EnvironmentSource::Property(property) => start_execute
298
0
                                                .platform.as_ref().and_then(|p|p.properties.iter().find(|pr| &pr.name == property))
299
0
                                                .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.value.as_str())),
300
0
                                            EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
301
0
                                            EnvironmentSource::FromEnvironment => Cow::Owned(env::var(name).unwrap_or_default()),
302
0
                                            other => {
303
0
                                                debug!(?other, "Worker doesn't support this type of additional environment");
304
0
                                                continue;
305
                                            }
306
                                        };
307
0
                                        extra_envs.insert(name.clone(), value.into_owned());
308
                                    }
309
6
                                }
310
6
                                let actions_in_transit = self.actions_in_transit.clone();
311
6
                                let worker_id = self.worker_id.clone();
312
6
                                let running_actions_manager = self.running_actions_manager.clone();
313
6
                                let mut grpc_client = self.grpc_client.clone();
314
6
                                let complete = ExecuteComplete {
315
6
                                    operation_id: operation_id.clone(),
316
6
                                };
317
6
                                self.metrics.clone().wrap(move |metrics| async move {
318
6
                                    metrics.preconditions.wrap(preconditions_met(precondition_script_cfg, &extra_envs))
319
6
                                    .and_then(|()| 
running_actions_manager5
.
create_and_add_action5
(
worker_id5
,
start_execute5
))
320
6
                                    .map(move |r| {
321
                                        // Now that we either failed or registered our action, we can
322
                                        // consider the action to no longer be in transit.
323
6
                                        actions_in_transit.fetch_sub(1, Ordering::Release);
324
6
                                        r
325
6
                                    })
326
6
                                    .and_then(|action| 
{5
327
5
                                        debug!(
328
5
                                            operation_id = %action.get_operation_id(),
329
                                            "Received request to run action"
330
                                        );
331
5
                                        action
332
5
                                            .clone()
333
5
                                            .prepare_action()
334
5
                                            .and_then(RunningAction::execute)
335
5
                                            .and_then(|result| async move 
{2
336
                                                // Notify that execution has completed so it can schedule a new action.
337
2
                                                drop(grpc_client.execution_complete(complete).await);
338
2
                                                Ok(result)
339
4
                                            })
340
5
                                            .and_then(RunningAction::upload_results)
341
5
                                            .and_then(RunningAction::get_finished_result)
342
                                            // Note: We need ensure we run cleanup even if one of the other steps fail.
343
5
                                            .then(|result| async move 
{4
344
4
                                                if let Err(
e0
) = action.cleanup().await {
345
0
                                                    return Result::<ActionResult, Error>::Err(e).merge(result);
346
4
                                                }
347
4
                                                result
348
8
                                            })
349
6
                                    
}5
).await
350
11
                                })
351
                            };
352
353
6
                            let make_publish_future = {
354
6
                                let mut grpc_client = self.grpc_client.clone();
355
356
6
                                let running_actions_manager = self.running_actions_manager.clone();
357
5
                                move |res: Result<ActionResult, Error>| async move {
358
5
                                    let instance_name = maybe_instance_name
359
5
                                        .err_tip(|| "`instance_name` could not be resolved; this is likely an internal error in local_worker.")
?0
;
360
5
                                    match res {
361
2
                                        Ok(mut action_result) => {
362
                                            // Save in the action cache before notifying the scheduler that we've completed.
363
2
                                            if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) &&
364
2
                                                let Err(
err0
) = running_actions_manager.cache_action_result(digest_info, &mut action_result, digest_hasher).await {
365
0
                                                    error!(
366
                                                        ?err,
367
                                                        ?action_digest,
368
                                                        "Error saving action in store",
369
                                                    );
370
2
                                                }
371
2
                                            let action_stage = ActionStage::Completed(action_result);
372
2
                                            grpc_client.execution_response(
373
2
                                                ExecuteResult{
374
2
                                                    instance_name,
375
2
                                                    operation_id,
376
2
                                                    result: Some(execute_result::Result::ExecuteResponse(action_stage.into())),
377
2
                                                }
378
2
                                            )
379
2
                                            .await
380
0
                                            .err_tip(|| "Error while calling execution_response")?;
381
                                        },
382
3
                                        Err(e) => {
383
3
                                            let is_cas_blob_missing = e.code == Code::NotFound
384
2
                                                && e.message_string().contains("not found in either fast or slow store");
385
3
                                            if is_cas_blob_missing {
386
1
                                                warn!(
387
                                                    ?e,
388
                                                    "Missing CAS inputs during prepare_action, returning FAILED_PRECONDITION"
389
                                                );
390
1
                                                let action_result = ActionResult {
391
1
                                                    error: Some(make_err!(
392
1
                                                        Code::FailedPrecondition,
393
1
                                                        "{}",
394
1
                                                        e.message_string()
395
1
                                                    )),
396
1
                                                    ..ActionResult::default()
397
1
                                                };
398
1
                                                let action_stage = ActionStage::Completed(action_result);
399
1
                                                grpc_client.execution_response(ExecuteResult{
400
1
                                                    instance_name,
401
1
                                                    operation_id,
402
1
                                                    result: Some(execute_result::Result::ExecuteResponse(action_stage.into())),
403
1
                                                }).await.
err_tip0
(|| "Error calling execution_response with missing inputs")
?0
;
404
                                            } else {
405
2
                                                grpc_client.execution_response(ExecuteResult{
406
2
                                                    instance_name,
407
2
                                                    operation_id,
408
2
                                                    result: Some(execute_result::Result::InternalError(e.into())),
409
2
                                                }).await.
err_tip0
(|| "Error calling execution_response with error")
?0
;
410
                                            }
411
                                        },
412
                                    }
413
0
                                    Ok(())
414
5
                                }
415
                            };
416
417
6
                            self.actions_in_transit.fetch_add(1, Ordering::Release);
418
419
6
                            let add_future_channel = add_future_channel.clone();
420
421
6
                            info_span!(
422
                                "worker_start_action_ctx",
423
                                operation_id = operation_id_to_log,
424
6
                                digest_function = %digest_hasher.to_string(),
425
6
                            ).in_scope(|| {
426
6
                                let _guard = Context::current_with_value(digest_hasher)
427
6
                                    .attach();
428
429
6
                                let actions_in_flight = actions_in_flight.clone();
430
6
                                let actions_notify = actions_notify.clone();
431
6
                                let actions_in_flight_fail = actions_in_flight.clone();
432
6
                                let actions_notify_fail = actions_notify.clone();
433
6
                                actions_in_flight.fetch_add(1, Ordering::Release);
434
435
6
                                futures.push(
436
6
                                    spawn!("worker_start_action", start_action_fut).map(move |res| 
{5
437
5
                                        let res = res.err_tip(|| "Failed to launch spawn")
?0
;
438
5
                                        if let Err(
err3
) = &res {
439
3
                                            error!(?err, "Error executing action");
440
2
                                        }
441
5
                                        add_future_channel
442
5
                                            .send(make_publish_future(res).then(move |res| 
{0
443
0
                                                actions_in_flight.fetch_sub(1, Ordering::Release);
444
0
                                                actions_notify.notify_one();
445
0
                                                core::future::ready(res)
446
5
                                            
}0
).boxed())
447
5
                                            .map_err(|err|
448
0
                                                Error::from_std_err(Code::Internal, &err).append("LocalWorker could not send future")
449
0
                                                )?;
450
5
                                        Ok(())
451
5
                                    })
452
6
                                    .or_else(move |err| 
{0
453
                                        // If the make_publish_future is not run we still need to notify.
454
0
                                        actions_in_flight_fail.fetch_sub(1, Ordering::Release);
455
0
                                        actions_notify_fail.notify_one();
456
0
                                        core::future::ready(Err(err))
457
0
                                    })
458
6
                                    .boxed()
459
                                );
460
6
                            });
461
                        }
462
                    }
463
0
                },
464
26
                
res5
= add_future_rx.next() => {
465
5
                    let fut = res.err_tip(|| "New future stream receives should never be closed")
?0
;
466
5
                    futures.push(fut);
467
                },
468
26
                
res7
= futures.next() =>
res7
.
err_tip7
(|| "Keep-alive should always pending. Likely unable to send data to scheduler")
?1
?0
,
469
26
                
complete_msg0
= shutdown_rx.recv().fuse() => {
470
0
                    warn!("Worker loop received shutdown signal. Shutting down worker...",);
471
0
                    let mut grpc_client = self.grpc_client.clone();
472
0
                    let shutdown_guard = complete_msg.map_err(|e|
473
0
                        Error::from_std_err(Code::Internal, &e).append("Failed to receive shutdown message"))?;
474
0
                    let actions_in_flight = actions_in_flight.clone();
475
0
                    let actions_notify = actions_notify.clone();
476
0
                    let shutdown_future = async move {
477
                        // Wait for in-flight operations to be fully completed.
478
0
                        while actions_in_flight.load(Ordering::Acquire) > 0 {
479
0
                            actions_notify.notified().await;
480
                        }
481
                        // Sending this message immediately evicts all jobs from
482
                        // this worker, of which there should be none.
483
0
                        if let Err(e) = grpc_client.going_away(GoingAwayRequest {}).await {
484
0
                            error!("Failed to send GoingAwayRequest: {e}",);
485
0
                            return Err(e);
486
0
                        }
487
                        // Allow shutdown to occur now.
488
0
                        drop(shutdown_guard);
489
0
                        Ok::<(), Error>(())
490
0
                    };
491
0
                    futures.push(shutdown_future.boxed());
492
0
                    shutting_down = true;
493
                },
494
            };
495
        }
496
        // Unreachable.
497
2
    }
498
}
499
500
type ConnectionFactory<T> = Box<dyn Fn() -> BoxFuture<'static, Result<T, Error>> + Send + Sync>;
501
502
pub struct LocalWorker<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> {
503
    config: Arc<LocalWorkerConfig>,
504
    running_actions_manager: Arc<U>,
505
    connection_factory: ConnectionFactory<T>,
506
    sleep_fn: Option<Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>>,
507
    metrics: Arc<Metrics>,
508
}
509
510
impl<
511
    T: WorkerApiClientTrait + core::fmt::Debug + 'static,
512
    U: RunningActionsManager + core::fmt::Debug,
513
> core::fmt::Debug for LocalWorker<T, U>
514
{
515
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
516
0
        f.debug_struct("LocalWorker")
517
0
            .field("config", &self.config)
518
0
            .field("running_actions_manager", &self.running_actions_manager)
519
0
            .field("metrics", &self.metrics)
520
0
            .finish_non_exhaustive()
521
0
    }
522
}
523
524
/// Creates a new `LocalWorker`. The `cas_store` must be an instance of
525
/// `FastSlowStore` and will be checked at runtime.
526
0
pub async fn new_local_worker(
527
0
    config: Arc<LocalWorkerConfig>,
528
0
    cas_store: Store,
529
0
    ac_store: Option<Store>,
530
0
    historical_store: Store,
531
2
) -> Result<LocalWorker<WorkerApiClientWrapper, RunningActionsManagerImpl>, Error> {
532
2
    let fast_slow_store = cas_store
533
2
        .downcast_ref::<FastSlowStore>(None)
534
2
        .err_tip(|| "Expected store for LocalWorker's store to be a FastSlowStore")
?0
535
2
        .get_arc()
536
2
        .err_tip(|| "FastSlowStore's Arc doesn't exist")
?0
;
537
538
    // Log warning about CAS configuration for multi-worker setups
539
2
    event!(
540
2
        Level::INFO,
541
2
        worker_name = %config.name,
542
        "Starting worker '{}'. IMPORTANT: If running multiple workers, all workers \
543
        must share the same CAS storage path to avoid 'Object not found' errors.",
544
2
        config.name
545
    );
546
547
2
    if let Ok(
path1
) = fs::canonicalize(&config.work_directory).await {
548
1
        fs::remove_dir_all(&path).await.err_tip(|| 
{0
549
0
            format!(
550
                "Could not remove work_directory '{}' in LocalWorker",
551
0
                &path.as_path().to_str().unwrap_or("bad path")
552
            )
553
0
        })?;
554
1
    }
555
556
2
    fs::create_dir_all(&config.work_directory)
557
2
        .await
558
2
        .err_tip(|| 
format!0
("Could not make work_directory : {}",
config.work_directory0
))
?0
;
559
2
    let entrypoint = if config.entrypoint.is_empty() {
560
2
        None
561
    } else {
562
0
        Some(config.entrypoint.clone())
563
    };
564
2
    let max_action_timeout = if config.max_action_timeout == 0 {
565
2
        DEFAULT_MAX_ACTION_TIMEOUT
566
    } else {
567
0
        Duration::from_secs(config.max_action_timeout as u64)
568
    };
569
2
    let max_upload_timeout = if config.max_upload_timeout == 0 {
570
2
        DEFAULT_MAX_UPLOAD_TIMEOUT
571
    } else {
572
0
        Duration::from_secs(config.max_upload_timeout as u64)
573
    };
574
575
    // Initialize directory cache if configured
576
2
    let directory_cache = if let Some(
cache_config0
) = &config.directory_cache {
577
        use std::path::PathBuf;
578
579
        use crate::directory_cache::{
580
            DirectoryCache, DirectoryCacheConfig as WorkerDirCacheConfig,
581
        };
582
583
0
        let cache_root = if cache_config.cache_root.is_empty() {
584
0
            PathBuf::from(&config.work_directory).parent().map_or_else(
585
0
                || PathBuf::from("/tmp/nativelink_directory_cache"),
586
0
                |p| p.join("directory_cache"),
587
            )
588
        } else {
589
0
            PathBuf::from(&cache_config.cache_root)
590
        };
591
592
0
        let worker_cache_config = WorkerDirCacheConfig {
593
0
            max_entries: cache_config.max_entries,
594
0
            max_size_bytes: cache_config.max_size_bytes,
595
0
            cache_root,
596
0
        };
597
598
0
        match DirectoryCache::new(worker_cache_config, Store::new(fast_slow_store.clone())).await {
599
0
            Ok(cache) => {
600
0
                tracing::info!("Directory cache initialized successfully");
601
0
                Some(Arc::new(cache))
602
            }
603
0
            Err(e) => {
604
0
                tracing::warn!("Failed to initialize directory cache: {:?}", e);
605
0
                None
606
            }
607
        }
608
    } else {
609
2
        None
610
    };
611
612
    #[cfg(target_os = "linux")]
613
2
    let use_namespaces = if let Some(
use_namespaces0
) = &config.use_namespaces {
614
0
        if *use_namespaces
615
0
            && !crate::namespace_utils::namespaces_supported(
616
0
                config.use_mount_namespace.unwrap_or_default(),
617
0
            )
618
        {
619
0
            return Err(make_err!(Code::Unavailable, "Namespaces not supported"));
620
0
        }
621
0
        if !*use_namespaces {
622
0
            crate::running_actions_manager::UseNamespaces::No
623
0
        } else if config.use_mount_namespace.unwrap_or_default() {
624
0
            crate::running_actions_manager::UseNamespaces::YesAndMount
625
        } else {
626
0
            crate::running_actions_manager::UseNamespaces::Yes
627
        }
628
2
    } else if config
629
2
        .use_mount_namespace
630
2
        .is_some_and(core::convert::identity)
631
    {
632
0
        return Err(make_err!(
633
0
            Code::Unavailable,
634
0
            "Mount namespaces not supported"
635
0
        ));
636
    } else {
637
2
        crate::running_actions_manager::UseNamespaces::No
638
    };
639
640
    #[cfg(not(target_os = "linux"))]
641
    if config.use_namespaces.is_some_and(core::convert::identity) {
642
        return Err(make_err!(
643
            Code::Unavailable,
644
            "Namespaces not supported on non-Linux OSes"
645
        ));
646
    }
647
    #[cfg(not(target_os = "linux"))]
648
    if config
649
        .use_mount_namespace
650
        .is_some_and(core::convert::identity)
651
    {
652
        return Err(make_err!(
653
            Code::Unavailable,
654
            "Mount namespaces not supported on non-Linux OSes"
655
        ));
656
    }
657
658
2
    let running_actions_manager =
659
2
        Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
660
2
            root_action_directory: config.work_directory.clone(),
661
2
            execution_configuration: ExecutionConfiguration {
662
2
                entrypoint,
663
2
                additional_environment: config.additional_environment.clone(),
664
2
            },
665
2
            cas_store: fast_slow_store,
666
2
            ac_store,
667
2
            historical_store,
668
2
            upload_action_result_config: &config.upload_action_result,
669
2
            max_action_timeout,
670
2
            max_upload_timeout,
671
2
            timeout_handled_externally: config.timeout_handled_externally,
672
2
            directory_cache,
673
2
            #[cfg(target_os = "linux")]
674
2
            use_namespaces,
675
2
        })
?0
);
676
2
    let local_worker = LocalWorker::new_with_connection_factory_and_actions_manager(
677
2
        config.clone(),
678
2
        running_actions_manager,
679
2
        Box::new(move || 
{0
680
0
            let config = config.clone();
681
0
            Box::pin(async move {
682
0
                let timeout = config
683
0
                    .worker_api_endpoint
684
0
                    .timeout
685
0
                    .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
686
0
                let timeout_duration = Duration::from_secs_f32(timeout);
687
0
                let tls_config =
688
0
                    tls_utils::load_client_config(&config.worker_api_endpoint.tls_config)
689
0
                        .err_tip(|| "Parsing local worker TLS configuration")?;
690
0
                let endpoint =
691
0
                    tls_utils::endpoint_from(&config.worker_api_endpoint.uri, tls_config)
692
0
                        .map_err(|e| {
693
0
                            Error::from_std_err(Code::InvalidArgument, &e)
694
0
                                .append("Invalid URI for worker endpoint")
695
0
                        })?
696
0
                        .connect_timeout(timeout_duration)
697
0
                        .timeout(timeout_duration);
698
699
0
                let transport = endpoint.connect().await.map_err(|e| {
700
0
                    Error::from_std_err(Code::Internal, &e).append(format!(
701
                        "Could not connect to endpoint {}",
702
0
                        config.worker_api_endpoint.uri
703
                    ))
704
0
                })?;
705
0
                Ok(WorkerApiClient::new(transport).into())
706
0
            })
707
0
        }),
708
2
        Box::new(move |d| 
Box::pin0
(
time::sleep0
(
d0
))),
709
    );
710
2
    Ok(local_worker)
711
2
}
712
713
impl<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorker<T, U> {
714
11
    pub fn new_with_connection_factory_and_actions_manager(
715
11
        config: Arc<LocalWorkerConfig>,
716
11
        running_actions_manager: Arc<U>,
717
11
        connection_factory: ConnectionFactory<T>,
718
11
        sleep_fn: Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>,
719
11
    ) -> Self {
720
11
        let metrics = Arc::new(Metrics::new(Arc::downgrade(
721
11
            running_actions_manager.metrics(),
722
        )));
723
11
        Self {
724
11
            config,
725
11
            running_actions_manager,
726
11
            connection_factory,
727
11
            sleep_fn: Some(sleep_fn),
728
11
            metrics,
729
11
        }
730
11
    }
731
732
    #[allow(
733
        clippy::missing_const_for_fn,
734
        reason = "False positive on stable, but not on nightly"
735
    )]
736
0
    pub fn name(&self) -> &String {
737
0
        &self.config.name
738
0
    }
739
740
12
    async fn register_worker(
741
12
        &self,
742
12
        client: &mut T,
743
12
    ) -> Result<(String, Streaming<UpdateForWorker>), Error> {
744
12
        let mut extra_envs: HashMap<String, String> = HashMap::new();
745
12
        if let Some(
ref additional_environment0
) = self.config.additional_environment {
746
0
            for (name, source) in additional_environment {
747
0
                let value = match source {
748
0
                    EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
749
                    EnvironmentSource::FromEnvironment => {
750
0
                        Cow::Owned(env::var(name).unwrap_or_default())
751
                    }
752
0
                    other => {
753
0
                        debug!(
754
                            ?other,
755
                            "Worker registration doesn't support this type of additional environment"
756
                        );
757
0
                        continue;
758
                    }
759
                };
760
0
                extra_envs.insert(name.clone(), value.into_owned());
761
            }
762
12
        }
763
764
12
        let connect_worker_request = make_connect_worker_request(
765
12
            self.config.name.clone(),
766
12
            &self.config.platform_properties,
767
12
            &extra_envs,
768
12
            self.config.max_inflight_tasks,
769
12
        )
770
12
        .await
?0
;
771
12
        let 
mut update_for_worker_stream9
= client
772
12
            .connect_worker(connect_worker_request)
773
12
            .await
774
9
            .err_tip(|| "Could not call connect_worker() in worker")
?0
775
9
            .into_inner();
776
777
9
        let 
first_msg_update8
= update_for_worker_stream
778
9
            .next()
779
9
            .await
780
9
            .err_tip(|| "Got EOF expected UpdateForWorker")
?1
781
8
            .err_tip(|| "Got error when receiving UpdateForWorker")
?0
782
            .update;
783
784
8
        let worker_id = match first_msg_update {
785
8
            Some(Update::ConnectionResult(connection_result)) => connection_result.worker_id,
786
0
            other => {
787
0
                return Err(make_input_err!(
788
0
                    "Expected first response from scheduler to be a ConnectionResult got : {:?}",
789
0
                    other
790
0
                ));
791
            }
792
        };
793
8
        Ok((worker_id, update_for_worker_stream))
794
9
    }
795
796
    #[instrument(skip(self), level = Level::INFO)]
797
9
    pub async fn run(
798
9
        mut self,
799
9
        mut shutdown_rx: broadcast::Receiver<ShutdownGuard>,
800
9
    ) -> Result<(), Error> {
801
        // Belt-and-suspenders QoS bump: the main binary already calls
802
        // this before runtime creation so the tokio worker threads
803
        // inherit P-core preference via pthread QoS inheritance, but
804
        // any thread that reaches this point should also be tagged in
805
        // case it was spawned by a path that bypassed `on_thread_start`.
806
        // No-op on non-macOS.
807
        let _ = crate::qos::set_user_initiated();
808
809
        let sleep_fn = self
810
            .sleep_fn
811
            .take()
812
            .err_tip(|| "Could not unwrap sleep_fn in LocalWorker::run")?;
813
        let sleep_fn_pin = Pin::new(&sleep_fn);
814
3
        let error_handler = Box::pin(move |err| async move {
815
3
            error!(?err, "Error");
816
3
            (sleep_fn_pin)(Duration::from_secs_f32(CONNECTION_RETRY_DELAY_S)).await;
817
6
        });
818
819
        loop {
820
            // First connect to our endpoint.
821
            let mut client = match (self.connection_factory)().await {
822
                Ok(client) => client,
823
                Err(e) => {
824
                    (error_handler)(e).await;
825
                    continue; // Try to connect again.
826
                }
827
            };
828
829
            debug!("Connected to endpoint");
830
831
            // Next register our worker with the scheduler.
832
            let (inner, update_for_worker_stream) = match self.register_worker(&mut client).await {
833
                Err(e) => {
834
                    (error_handler)(e).await;
835
                    continue; // Try to connect again.
836
                }
837
                Ok((worker_id, update_for_worker_stream)) => (
838
                    LocalWorkerImpl::new(
839
                        &self.config,
840
                        client,
841
                        worker_id,
842
                        self.running_actions_manager.clone(),
843
                        self.metrics.clone(),
844
                    ),
845
                    update_for_worker_stream,
846
                ),
847
            };
848
            info!(
849
                worker_id = %inner.worker_id,
850
                "Worker registered with scheduler"
851
            );
852
853
            // Now listen for connections and run all other services.
854
            if let Err(err) = inner.run(update_for_worker_stream, &mut shutdown_rx).await {
855
                'no_more_actions: {
856
                    // Ensure there are no actions in transit before we try to kill
857
                    // all our actions.
858
                    const ITERATIONS: usize = 1_000;
859
860
                    const ERROR_MSG: &str = "Actions in transit did not reach zero before we disconnected from the scheduler";
861
862
                    let sleep_duration = ACTIONS_IN_TRANSIT_TIMEOUT_S / ITERATIONS as f32;
863
                    for _ in 0..ITERATIONS {
864
                        if inner.actions_in_transit.load(Ordering::Acquire) == 0 {
865
                            break 'no_more_actions;
866
                        }
867
                        (sleep_fn_pin)(Duration::from_secs_f32(sleep_duration)).await;
868
                    }
869
                    error!(ERROR_MSG);
870
                    return Err(err.append(ERROR_MSG));
871
                }
872
                error!(?err, "Worker disconnected from scheduler");
873
                // Kill off any existing actions because if we re-connect, we'll
874
                // get some more and it might resource lock us.
875
                self.running_actions_manager.kill_all().await;
876
877
                (error_handler)(err).await; // Try to connect again.
878
            }
879
        }
880
        // Unreachable.
881
0
    }
882
}
883
884
#[derive(Debug, MetricsComponent)]
885
pub struct Metrics {
886
    #[metric(
887
        help = "Total number of actions sent to this worker to process. This does not mean it started them, it just means it received a request to execute it."
888
    )]
889
    start_actions_received: CounterWithTime,
890
    #[metric(help = "Total number of disconnects received from the scheduler.")]
891
    disconnects_received: CounterWithTime,
892
    #[metric(help = "Total number of keep-alives received from the scheduler.")]
893
    keep_alives_received: CounterWithTime,
894
    #[metric(
895
        help = "Stats about the calls to check if an action satisfies the config supplied script."
896
    )]
897
    preconditions: AsyncCounterWrapper,
898
    #[metric]
899
    #[allow(
900
        clippy::struct_field_names,
901
        reason = "TODO Fix this. Triggers on nightly"
902
    )]
903
    running_actions_manager_metrics: Weak<RunningActionManagerMetrics>,
904
}
905
906
// Opts `Metrics` into `RootMetricsComponent`; the impl is empty, so nothing
// from the trait is overridden.
impl RootMetricsComponent for Metrics {}
907
908
impl Metrics {
909
11
    fn new(running_actions_manager_metrics: Weak<RunningActionManagerMetrics>) -> Self {
910
11
        Self {
911
11
            start_actions_received: CounterWithTime::default(),
912
11
            disconnects_received: CounterWithTime::default(),
913
11
            keep_alives_received: CounterWithTime::default(),
914
11
            preconditions: AsyncCounterWrapper::default(),
915
11
            running_actions_manager_metrics,
916
11
        }
917
11
    }
918
}
919
920
impl Metrics {
921
6
    async fn wrap<U, T: Future<Output = U>, F: FnOnce(Arc<Self>) -> T>(
922
6
        self: Arc<Self>,
923
6
        fut: F,
924
6
    ) -> U {
925
6
        fut(self).await
926
5
    }
927
}