Coverage Report

Created: 2026-06-07 03:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/local_worker.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::hash::BuildHasher;
16
use core::pin::Pin;
17
use core::str;
18
use core::sync::atomic::{AtomicU64, Ordering};
19
use core::time::Duration;
20
use std::borrow::Cow;
21
use std::collections::HashMap;
22
use std::env;
23
use std::process::Stdio;
24
use std::sync::{Arc, Weak};
25
26
use futures::future::BoxFuture;
27
use futures::stream::FuturesUnordered;
28
use futures::{Future, FutureExt, StreamExt, TryFutureExt, select};
29
use nativelink_config::cas_server::{EnvironmentSource, LocalWorkerConfig};
30
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
31
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
32
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
33
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
34
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
35
    ActionResourceUsage, ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest,
36
    UpdateForWorker, execute_result,
37
};
38
use nativelink_store::fast_slow_store::FastSlowStore;
39
use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId};
40
use nativelink_util::common::fs;
41
use nativelink_util::digest_hasher::DigestHasherFunc;
42
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
43
use nativelink_util::shutdown_guard::ShutdownGuard;
44
use nativelink_util::store_trait::Store;
45
use nativelink_util::{spawn, tls_utils};
46
use opentelemetry::context::Context;
47
use tokio::sync::{broadcast, mpsc};
48
use tokio::{process, time};
49
use tokio_stream::wrappers::UnboundedReceiverStream;
50
use tonic::Streaming;
51
use tracing::{Level, debug, error, event, info, info_span, instrument, trace, warn};
52
53
use crate::running_actions_manager::{
54
    ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction,
55
    RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl,
56
};
57
use crate::worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper};
58
use crate::worker_utils::make_connect_worker_request;
59
60
/// How long to wait, in seconds, while actions are still in transit before
/// treating the situation as an error.
const ACTIONS_IN_TRANSIT_TIMEOUT_S: f32 = 10.0;

/// Delay, in seconds, before trying to reconnect after the connection to the
/// worker api server has been lost.
const CONNECTION_RETRY_DELAY_S: f32 = 0.5;

/// Default endpoint timeout, in seconds. If this value gets modified the
/// documentation in `cas_server.rs` must also be updated.
const DEFAULT_ENDPOINT_TIMEOUT_S: f32 = 5.0;

/// Default maximum amount of time a task is allowed to run for (20 minutes).
/// If this value gets modified the documentation in `cas_server.rs` must also be updated.
const DEFAULT_MAX_ACTION_TIMEOUT: Duration = Duration::from_secs(20 * 60);
/// Default upload timeout (10 minutes). NOTE(review): presumably bounds result
/// uploads; its use site is not visible in this chunk — confirm.
const DEFAULT_MAX_UPLOAD_TIMEOUT: Duration = Duration::from_secs(10 * 60);
76
77
struct FinishedActionResult {
78
    action_result: ActionResult,
79
    resource_usage: Option<ActionResourceUsage>,
80
}
81
82
struct LocalWorkerImpl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> {
83
    config: &'a LocalWorkerConfig,
84
    // According to the tonic documentation it is a cheap operation to clone this.
85
    grpc_client: T,
86
    worker_id: String,
87
    running_actions_manager: Arc<U>,
88
    // Number of actions that have been received in `Update::StartAction`, but
89
    // not yet processed by running_actions_manager's spawn. This number should
90
    // always be zero if there are no actions running and no actions being waited
91
    // on by the scheduler.
92
    actions_in_transit: Arc<AtomicU64>,
93
    metrics: Arc<Metrics>,
94
}
95
96
7
pub async fn preconditions_met<H: BuildHasher + Sync>(
97
7
    precondition_script: Option<String>,
98
7
    extra_envs: &HashMap<String, String, H>,
99
7
) -> Result<(), Error> {
100
7
    let Some(
precondition_script2
) = &precondition_script else {
101
        // No script means we are always ok to proceed.
102
5
        return Ok(());
103
    };
104
    // TODO: Might want to pass some information about the command to the
105
    //       script, but at this point it's not even been downloaded yet,
106
    //       so that's not currently possible.  Perhaps we'll move this in
107
    //       future to pass useful information through?  Or perhaps we'll
108
    //       have a pre-condition and a pre-execute script instead, although
109
    //       arguably entrypoint already gives us that.
110
111
2
    let maybe_split_cmd = shlex::split(precondition_script);
112
2
    let (command, args) = match &maybe_split_cmd {
113
2
        Some(split_cmd) => (&split_cmd[0], &split_cmd[1..]),
114
        None => {
115
0
            return Err(make_input_err!(
116
0
                "Could not parse the value of precondition_script: '{}'",
117
0
                precondition_script,
118
0
            ));
119
        }
120
    };
121
122
2
    let precondition_process = process::Command::new(command)
123
2
        .args(args)
124
2
        .kill_on_drop(true)
125
2
        .stdin(Stdio::null())
126
2
        .stdout(Stdio::piped())
127
2
        .stderr(Stdio::null())
128
2
        .env_clear()
129
2
        .envs(extra_envs)
130
2
        .spawn()
131
2
        .err_tip(|| 
format!0
("Could not execute precondition command {precondition_script:?}"))
?0
;
132
2
    let output = precondition_process.wait_with_output().await
?0
;
133
2
    let stdout = str::from_utf8(&output.stdout).unwrap_or("");
134
2
    trace!(status = %output.status, %stdout, "Preconditions script returned");
135
2
    if output.status.code() == Some(0) {
136
1
        Ok(())
137
    } else {
138
1
        Err(make_err!(
139
1
            Code::ResourceExhausted,
140
1
            "Preconditions script returned status {} - {}",
141
1
            output.status,
142
1
            stdout
143
1
        ))
144
    }
145
7
}
146
147
impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorkerImpl<'a, T, U> {
148
8
    fn new(
149
8
        config: &'a LocalWorkerConfig,
150
8
        grpc_client: T,
151
8
        worker_id: String,
152
8
        running_actions_manager: Arc<U>,
153
8
        metrics: Arc<Metrics>,
154
8
    ) -> Self {
155
8
        Self {
156
8
            config,
157
8
            grpc_client,
158
8
            worker_id,
159
8
            running_actions_manager,
160
8
            // Number of actions that have been received in `Update::StartAction`, but
161
8
            // not yet processed by running_actions_manager's spawn. This number should
162
8
            // always be zero if there are no actions running and no actions being waited
163
8
            // on by the scheduler.
164
8
            actions_in_transit: Arc::new(AtomicU64::new(0)),
165
8
            metrics,
166
8
        }
167
8
    }
168
169
    /// Starts a background spawn/thread that will send a message to the server every `timeout / 2`.
170
8
    async fn start_keep_alive(&self) -> Result<(), Error> {
171
        // According to tonic's documentation this call should be cheap and is the same stream.
172
8
        let mut grpc_client = self.grpc_client.clone();
173
8
        let timeout = self
174
8
            .config
175
8
            .worker_api_endpoint
176
8
            .timeout
177
8
            .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
178
179
8
        info!(timeout, "Started KeepAlive");
180
181
        // We always send 2 keep alive requests per timeout. Http2 should manage most of our
182
        // timeout issues, this is a secondary check to ensure we can still send data.
183
8
        let mut interval = time::interval(Duration::from_secs_f32(timeout) / 2);
184
8
        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
185
186
        // Skip the first interval as it happens immediately and we don't need a keep alive until timeout/2 has passed
187
8
        interval.tick().await;
188
189
        // Explicitly spawn the keep alive loop so it goes onto a different thread from the execute commands
190
1
        drop(
191
2
            spawn!("keep alives", async move {
192
                loop {
193
3
                    interval.tick().await;
194
2
                    if let Err(
e1
) = grpc_client.keep_alive(KeepAliveRequest {}).await {
195
1
                        error!(?e, "Failed to send KeepAlive in LocalWorker");
196
1
                        return;
197
1
                    }
198
1
                    debug!("Sent KeepAlive");
199
                }
200
1
            })
201
2
            .await,
202
        );
203
1
        Ok(())
204
1
    }
205
206
8
    async fn run(
207
8
        &self,
208
8
        update_for_worker_stream: Streaming<UpdateForWorker>,
209
8
        shutdown_rx: &mut broadcast::Receiver<ShutdownGuard>,
210
8
    ) -> Result<(), Error> {
211
        // This big block of logic is designed to help simplify upstream components. Upstream
212
        // components can write standard futures that return a `Result<(), Error>` and this block
213
        // will forward the error up to the client and disconnect from the scheduler.
214
        // It is a common use case that an item sent through update_for_worker_stream will always
215
        // have a response but the response will be triggered through a callback to the scheduler.
216
        // This can be quite tricky to manage, so what we have done here is given access to a
217
        // `futures` variable which because this is in a single thread as well as a channel that you
218
        // send a future into that makes it into the `futures` variable.
219
        // This means that if you want to perform an action based on the result of the future
220
        // you use the `.map()` method and the new action will always come to live in this spawn,
221
        // giving mutable access to stuff in this struct.
222
        // NOTE: If you ever return from this function it will disconnect from the scheduler.
223
8
        let mut futures = FuturesUnordered::new();
224
8
        futures.push(self.start_keep_alive().boxed());
225
226
8
        let (add_future_channel, add_future_rx) = mpsc::unbounded_channel();
227
8
        let mut add_future_rx = UnboundedReceiverStream::new(add_future_rx).fuse();
228
229
8
        let mut update_for_worker_stream = update_for_worker_stream.fuse();
230
        // A notify which is triggered every time actions_in_flight is subtracted.
231
8
        let actions_notify = Arc::new(tokio::sync::Notify::new());
232
        // A counter of actions that are in-flight, this is similar to actions_in_transit but
233
        // includes the AC upload and notification to the scheduler.
234
8
        let actions_in_flight = Arc::new(AtomicU64::new(0));
235
        // Set to true when shutting down, this stops any new StartAction.
236
8
        let mut shutting_down = false;
237
238
        loop {
239
26
            select! {
240
26
                
maybe_update8
= update_for_worker_stream.next() => if
!shutting_down8
||
maybe_update0
.
is_some0
() {
241
8
                    match maybe_update
242
8
                        .err_tip(|| "UpdateForWorker stream closed early")
?1
243
7
                        .err_tip(|| "Got error in UpdateForWorker stream")
?0
244
                        .update
245
7
                        .err_tip(|| "Expected update to exist in UpdateForWorker")
?0
246
                    {
247
                        Update::ConnectionResult(_) => {
248
0
                            return Err(make_input_err!(
249
0
                                "Got ConnectionResult in LocalWorker::run which should never happen"
250
0
                            ));
251
                        }
252
                        // TODO(palfrey) We should possibly do something with this notification.
253
0
                        Update::Disconnect(()) => {
254
0
                            self.metrics.disconnects_received.inc();
255
0
                        }
256
0
                        Update::KeepAlive(()) => {
257
0
                            self.metrics.keep_alives_received.inc();
258
0
                        }
259
1
                        Update::KillOperationRequest(kill_operation_request) => {
260
1
                            let operation_id = OperationId::from(kill_operation_request.operation_id);
261
1
                            if let Err(
err0
) = self.running_actions_manager.kill_operation(&operation_id).await {
262
0
                                error!(
263
                                    %operation_id,
264
                                    ?err,
265
                                    "Failed to send kill request for operation"
266
                                );
267
1
                            }
268
                        }
269
6
                        Update::StartAction(start_execute) => {
270
                            // Don't accept any new requests if we're shutting down.
271
6
                            if shutting_down {
272
0
                                if let Some(instance_name) = start_execute.execute_request.map(|request| request.instance_name) {
273
0
                                    self.grpc_client.clone().execution_response(
274
0
                                        ExecuteResult{
275
0
                                            instance_name,
276
0
                                            operation_id: start_execute.operation_id,
277
0
                                            result: Some(execute_result::Result::InternalError(make_err!(Code::ResourceExhausted, "Worker shutting down").into())),
278
0
                                            resource_usage: None,
279
0
                                        }
280
0
                                    ).await?;
281
0
                                }
282
0
                                continue;
283
6
                            }
284
285
6
                            self.metrics.start_actions_received.inc();
286
287
6
                            let execute_request = start_execute.execute_request.as_ref();
288
6
                            let operation_id = start_execute.operation_id.clone();
289
6
                            let operation_id_to_log = operation_id.clone();
290
6
                            let maybe_instance_name = execute_request.map(|v| v.instance_name.clone());
291
6
                            let action_digest = execute_request.and_then(|v| v.action_digest.clone());
292
6
                            let digest_hasher = execute_request
293
6
                                .ok_or_else(|| 
make_input_err!0
("Expected execute_request to be set"))
294
6
                                .and_then(|v| DigestHasherFunc::try_from(v.digest_function))
295
6
                                .err_tip(|| "In LocalWorkerImpl::new()")
?0
;
296
297
6
                            let start_action_fut = {
298
6
                                let precondition_script_cfg = self.config.experimental_precondition_script.clone();
299
6
                                let mut extra_envs: HashMap<String, String> = HashMap::new();
300
6
                                if let Some(
ref additional_environment0
) = self.config.additional_environment {
301
0
                                    for (name, source) in additional_environment {
302
0
                                        let value = match source {
303
0
                                            EnvironmentSource::Property(property) => start_execute
304
0
                                                .platform.as_ref().and_then(|p|p.properties.iter().find(|pr| &pr.name == property))
305
0
                                                .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.value.as_str())),
306
0
                                            EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
307
0
                                            EnvironmentSource::FromEnvironment => Cow::Owned(env::var(name).unwrap_or_default()),
308
0
                                            other => {
309
0
                                                debug!(?other, "Worker doesn't support this type of additional environment");
310
0
                                                continue;
311
                                            }
312
                                        };
313
0
                                        extra_envs.insert(name.clone(), value.into_owned());
314
                                    }
315
6
                                }
316
6
                                let actions_in_transit = self.actions_in_transit.clone();
317
6
                                let worker_id = self.worker_id.clone();
318
6
                                let running_actions_manager = self.running_actions_manager.clone();
319
6
                                let mut grpc_client = self.grpc_client.clone();
320
6
                                let complete = ExecuteComplete {
321
6
                                    operation_id: operation_id.clone(),
322
6
                                };
323
6
                                self.metrics.clone().wrap(move |metrics| async move {
324
6
                                    metrics.preconditions.wrap(preconditions_met(precondition_script_cfg, &extra_envs))
325
6
                                    .and_then(|()| 
running_actions_manager5
.
create_and_add_action5
(
worker_id5
,
start_execute5
))
326
6
                                    .map(move |r| {
327
                                        // Now that we either failed or registered our action, we can
328
                                        // consider the action to no longer be in transit.
329
6
                                        actions_in_transit.fetch_sub(1, Ordering::Release);
330
6
                                        r
331
6
                                    })
332
6
                                    .and_then(|action| 
{5
333
5
                                        debug!(
334
5
                                            operation_id = %action.get_operation_id(),
335
                                            "Received request to run action"
336
                                        );
337
5
                                        action
338
5
                                            .clone()
339
5
                                            .prepare_action()
340
5
                                            .and_then(RunningAction::execute)
341
5
                                            .and_then(|result| async move 
{2
342
                                                // Notify that execution has completed so it can schedule a new action.
343
2
                                                drop(grpc_client.execution_complete(complete).await);
344
2
                                                Ok(result)
345
4
                                            })
346
5
                                            .and_then(RunningAction::upload_results)
347
5
                                            .and_then(|action| async move 
{2
348
2
                                                let resource_usage = action.resource_usage();
349
2
                                                let action_result = action.get_finished_result().await
?0
;
350
2
                                                Ok(FinishedActionResult {
351
2
                                                    action_result,
352
2
                                                    resource_usage,
353
2
                                                })
354
4
                                            })
355
                                            // Note: We need ensure we run cleanup even if one of the other steps fail.
356
5
                                            .then(|result| async move 
{4
357
4
                                                if let Err(
e0
) = action.cleanup().await {
358
0
                                                    return Result::<FinishedActionResult, Error>::Err(e).merge(result);
359
4
                                                }
360
4
                                                result
361
8
                                            })
362
6
                                    
}5
).await
363
11
                                })
364
                            };
365
366
6
                            let make_publish_future = {
367
6
                                let mut grpc_client = self.grpc_client.clone();
368
369
6
                                let running_actions_manager = self.running_actions_manager.clone();
370
6
                                let worker_id = self.worker_id.clone();
371
5
                                move |res: Result<FinishedActionResult, Error>| async move {
372
5
                                    let instance_name = maybe_instance_name
373
5
                                        .err_tip(|| "`instance_name` could not be resolved; this is likely an internal error in local_worker.")
?0
;
374
5
                                    match res {
375
2
                                        Ok(FinishedActionResult { mut action_result, resource_usage }) => {
376
                                            // Save in the action cache before notifying the scheduler that we've completed.
377
2
                                            if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) &&
378
2
                                                let Err(
err0
) = running_actions_manager.cache_action_result(digest_info, &mut action_result, digest_hasher).await {
379
0
                                                    error!(
380
                                                        ?err,
381
                                                        ?action_digest,
382
                                                        "Error saving action in store",
383
                                                    );
384
2
                                                }
385
2
                                            let action_stage = ActionStage::Completed(action_result);
386
2
                                            let resource_usage = resource_usage.map(|mut resource_usage| 
{0
387
0
                                                resource_usage.operation_id.clone_from(&operation_id);
388
0
                                                resource_usage.worker_id.clone_from(&worker_id);
389
0
                                                resource_usage
390
0
                                            });
391
2
                                            grpc_client.execution_response(
392
2
                                                ExecuteResult{
393
2
                                                    instance_name,
394
2
                                                    operation_id,
395
2
                                                    result: Some(execute_result::Result::ExecuteResponse(action_stage.into())),
396
2
                                                    resource_usage,
397
2
                                                }
398
2
                                            )
399
2
                                            .await
400
0
                                            .err_tip(|| "Error while calling execution_response")?;
401
                                        },
402
3
                                        Err(e) => {
403
3
                                            let is_cas_blob_missing = e.code == Code::NotFound
404
2
                                                && e.message_string().contains("not found in either fast or slow store");
405
3
                                            if is_cas_blob_missing {
406
1
                                                warn!(
407
                                                    ?e,
408
                                                    "Missing CAS inputs during prepare_action, returning FAILED_PRECONDITION"
409
                                                );
410
1
                                                let action_result = ActionResult {
411
1
                                                    error: Some(make_err!(
412
1
                                                        Code::FailedPrecondition,
413
1
                                                        "{}",
414
1
                                                        e.message_string()
415
1
                                                    )),
416
1
                                                    ..ActionResult::default()
417
1
                                                };
418
1
                                                let action_stage = ActionStage::Completed(action_result);
419
1
                                                grpc_client.execution_response(ExecuteResult{
420
1
                                                    instance_name,
421
1
                                                    operation_id,
422
1
                                                    result: Some(execute_result::Result::ExecuteResponse(action_stage.into())),
423
1
                                                    resource_usage: None,
424
1
                                                }).await.
err_tip0
(|| "Error calling execution_response with missing inputs")
?0
;
425
                                            } else {
426
2
                                                grpc_client.execution_response(ExecuteResult{
427
2
                                                    instance_name,
428
2
                                                    operation_id,
429
2
                                                    result: Some(execute_result::Result::InternalError(e.into())),
430
2
                                                    resource_usage: None,
431
2
                                                }).await.
err_tip0
(|| "Error calling execution_response with error")
?0
;
432
                                            }
433
                                        },
434
                                    }
435
0
                                    Ok(())
436
5
                                }
437
                            };
438
439
6
                            self.actions_in_transit.fetch_add(1, Ordering::Release);
440
441
6
                            let add_future_channel = add_future_channel.clone();
442
443
6
                            info_span!(
444
                                "worker_start_action_ctx",
445
                                operation_id = operation_id_to_log,
446
6
                                digest_function = %digest_hasher.to_string(),
447
6
                            ).in_scope(|| {
448
6
                                let _guard = Context::current_with_value(digest_hasher)
449
6
                                    .attach();
450
451
6
                                let actions_in_flight = actions_in_flight.clone();
452
6
                                let actions_notify = actions_notify.clone();
453
6
                                let actions_in_flight_fail = actions_in_flight.clone();
454
6
                                let actions_notify_fail = actions_notify.clone();
455
6
                                actions_in_flight.fetch_add(1, Ordering::Release);
456
457
6
                                futures.push(
458
6
                                    spawn!("worker_start_action", start_action_fut).map(move |res| 
{5
459
5
                                        let res = res.err_tip(|| "Failed to launch spawn")
?0
;
460
5
                                        if let Err(
err3
) = &res {
461
3
                                            error!(?err, "Error executing action");
462
2
                                        }
463
5
                                        add_future_channel
464
5
                                            .send(make_publish_future(res).then(move |res| 
{0
465
0
                                                actions_in_flight.fetch_sub(1, Ordering::Release);
466
0
                                                actions_notify.notify_one();
467
0
                                                core::future::ready(res)
468
5
                                            
}0
).boxed())
469
5
                                            .map_err(|err|
470
0
                                                Error::from_std_err(Code::Internal, &err).append("LocalWorker could not send future")
471
0
                                                )?;
472
5
                                        Ok(())
473
5
                                    })
474
6
                                    .or_else(move |err| 
{0
475
                                        // If the make_publish_future is not run we still need to notify.
476
0
                                        actions_in_flight_fail.fetch_sub(1, Ordering::Release);
477
0
                                        actions_notify_fail.notify_one();
478
0
                                        core::future::ready(Err(err))
479
0
                                    })
480
6
                                    .boxed()
481
                                );
482
6
                            });
483
                        }
484
                    }
485
0
                },
486
26
                
res5
= add_future_rx.next() => {
487
5
                    let fut = res.err_tip(|| "New future stream receives should never be closed")
?0
;
488
5
                    futures.push(fut);
489
                },
490
26
                
res7
= futures.next() =>
res7
.
err_tip7
(|| "Keep-alive should always pending. Likely unable to send data to scheduler")
?1
?0
,
491
26
                
complete_msg0
= shutdown_rx.recv().fuse() => {
492
0
                    warn!("Worker loop received shutdown signal. Shutting down worker...",);
493
0
                    let mut grpc_client = self.grpc_client.clone();
494
0
                    let shutdown_guard = complete_msg.map_err(|e|
495
0
                        Error::from_std_err(Code::Internal, &e).append("Failed to receive shutdown message"))?;
496
0
                    let actions_in_flight = actions_in_flight.clone();
497
0
                    let actions_notify = actions_notify.clone();
498
0
                    let shutdown_future = async move {
499
                        // Wait for in-flight operations to be fully completed.
500
0
                        while actions_in_flight.load(Ordering::Acquire) > 0 {
501
0
                            actions_notify.notified().await;
502
                        }
503
                        // Sending this message immediately evicts all jobs from
504
                        // this worker, of which there should be none.
505
0
                        if let Err(e) = grpc_client.going_away(GoingAwayRequest {}).await {
506
0
                            error!("Failed to send GoingAwayRequest: {e}",);
507
0
                            return Err(e);
508
0
                        }
509
                        // Allow shutdown to occur now.
510
0
                        drop(shutdown_guard);
511
0
                        Ok::<(), Error>(())
512
0
                    };
513
0
                    futures.push(shutdown_future.boxed());
514
0
                    shutting_down = true;
515
                },
516
            };
517
        }
518
        // Unreachable.
519
2
    }
520
}
521
522
type ConnectionFactory<T> = Box<dyn Fn() -> BoxFuture<'static, Result<T, Error>> + Send + Sync>;
523
524
pub struct LocalWorker<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> {
525
    config: Arc<LocalWorkerConfig>,
526
    running_actions_manager: Arc<U>,
527
    connection_factory: ConnectionFactory<T>,
528
    sleep_fn: Option<Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>>,
529
    metrics: Arc<Metrics>,
530
}
531
532
impl<
533
    T: WorkerApiClientTrait + core::fmt::Debug + 'static,
534
    U: RunningActionsManager + core::fmt::Debug,
535
> core::fmt::Debug for LocalWorker<T, U>
536
{
537
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
538
0
        f.debug_struct("LocalWorker")
539
0
            .field("config", &self.config)
540
0
            .field("running_actions_manager", &self.running_actions_manager)
541
0
            .field("metrics", &self.metrics)
542
0
            .finish_non_exhaustive()
543
0
    }
544
}
545
546
/// Creates a new `LocalWorker`. The `cas_store` must be an instance of
547
/// `FastSlowStore` and will be checked at runtime.
548
0
pub async fn new_local_worker(
549
0
    config: Arc<LocalWorkerConfig>,
550
0
    cas_store: Store,
551
0
    ac_store: Option<Store>,
552
0
    historical_store: Store,
553
2
) -> Result<LocalWorker<WorkerApiClientWrapper, RunningActionsManagerImpl>, Error> {
554
2
    let fast_slow_store = cas_store
555
2
        .downcast_ref::<FastSlowStore>(None)
556
2
        .err_tip(|| "Expected store for LocalWorker's store to be a FastSlowStore")
?0
557
2
        .get_arc()
558
2
        .err_tip(|| "FastSlowStore's Arc doesn't exist")
?0
;
559
560
    // Log warning about CAS configuration for multi-worker setups
561
2
    event!(
562
2
        Level::INFO,
563
2
        worker_name = %config.name,
564
        "Starting worker '{}'. IMPORTANT: If running multiple workers, all workers \
565
        must share the same CAS storage path to avoid 'Object not found' errors.",
566
2
        config.name
567
    );
568
569
2
    if let Ok(
path1
) = fs::canonicalize(&config.work_directory).await {
570
1
        fs::remove_dir_all(&path).await.err_tip(|| 
{0
571
0
            format!(
572
                "Could not remove work_directory '{}' in LocalWorker",
573
0
                &path.as_path().to_str().unwrap_or("bad path")
574
            )
575
0
        })?;
576
1
    }
577
578
2
    fs::create_dir_all(&config.work_directory)
579
2
        .await
580
2
        .err_tip(|| 
format!0
("Could not make work_directory : {}",
config.work_directory0
))
?0
;
581
2
    let entrypoint = if config.entrypoint.is_empty() {
582
2
        None
583
    } else {
584
0
        Some(config.entrypoint.clone())
585
    };
586
2
    let max_action_timeout = if config.max_action_timeout == 0 {
587
2
        DEFAULT_MAX_ACTION_TIMEOUT
588
    } else {
589
0
        Duration::from_secs(config.max_action_timeout as u64)
590
    };
591
2
    let max_upload_timeout = if config.max_upload_timeout == 0 {
592
2
        DEFAULT_MAX_UPLOAD_TIMEOUT
593
    } else {
594
0
        Duration::from_secs(config.max_upload_timeout as u64)
595
    };
596
597
    // Initialize directory cache if configured
598
2
    let directory_cache = if let Some(
cache_config0
) = &config.directory_cache {
599
        use std::path::PathBuf;
600
601
        use crate::directory_cache::{
602
            DirectoryCache, DirectoryCacheConfig as WorkerDirCacheConfig,
603
        };
604
605
0
        let cache_root = if cache_config.cache_root.is_empty() {
606
0
            PathBuf::from(&config.work_directory).parent().map_or_else(
607
0
                || PathBuf::from("/tmp/nativelink_directory_cache"),
608
0
                |p| p.join("directory_cache"),
609
            )
610
        } else {
611
0
            PathBuf::from(&cache_config.cache_root)
612
        };
613
614
0
        let worker_cache_config = WorkerDirCacheConfig {
615
0
            max_entries: cache_config.max_entries,
616
0
            max_size_bytes: cache_config.max_size_bytes,
617
0
            cache_root,
618
0
        };
619
620
0
        match DirectoryCache::new(worker_cache_config, fast_slow_store.clone()).await {
621
0
            Ok(cache) => {
622
0
                tracing::info!("Directory cache initialized successfully");
623
0
                Some(Arc::new(cache))
624
            }
625
0
            Err(e) => {
626
0
                tracing::warn!("Failed to initialize directory cache: {:?}", e);
627
0
                None
628
            }
629
        }
630
    } else {
631
2
        None
632
    };
633
634
    #[cfg(target_os = "linux")]
635
2
    let use_namespaces = if let Some(
use_namespaces0
) = &config.use_namespaces {
636
0
        if *use_namespaces
637
0
            && !crate::namespace_utils::namespaces_supported(
638
0
                config.use_mount_namespace.unwrap_or_default(),
639
0
            )
640
        {
641
0
            return Err(make_err!(Code::Unavailable, "Namespaces not supported"));
642
0
        }
643
0
        if !*use_namespaces {
644
0
            crate::running_actions_manager::UseNamespaces::No
645
0
        } else if config.use_mount_namespace.unwrap_or_default() {
646
0
            crate::running_actions_manager::UseNamespaces::YesAndMount
647
        } else {
648
0
            crate::running_actions_manager::UseNamespaces::Yes
649
        }
650
2
    } else if config
651
2
        .use_mount_namespace
652
2
        .is_some_and(core::convert::identity)
653
    {
654
0
        return Err(make_err!(
655
0
            Code::Unavailable,
656
0
            "Mount namespaces not supported"
657
0
        ));
658
    } else {
659
2
        crate::running_actions_manager::UseNamespaces::No
660
    };
661
662
    #[cfg(not(target_os = "linux"))]
663
    if config.use_namespaces.is_some_and(core::convert::identity) {
664
        return Err(make_err!(
665
            Code::Unavailable,
666
            "Namespaces not supported on non-Linux OSes"
667
        ));
668
    }
669
    #[cfg(not(target_os = "linux"))]
670
    if config
671
        .use_mount_namespace
672
        .is_some_and(core::convert::identity)
673
    {
674
        return Err(make_err!(
675
            Code::Unavailable,
676
            "Mount namespaces not supported on non-Linux OSes"
677
        ));
678
    }
679
680
2
    let running_actions_manager =
681
2
        Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
682
2
            root_action_directory: config.work_directory.clone(),
683
2
            execution_configuration: ExecutionConfiguration {
684
2
                entrypoint,
685
2
                additional_environment: config.additional_environment.clone(),
686
2
            },
687
2
            cas_store: fast_slow_store,
688
2
            ac_store,
689
2
            historical_store,
690
2
            upload_action_result_config: &config.upload_action_result,
691
2
            max_action_timeout,
692
2
            max_upload_timeout,
693
2
            timeout_handled_externally: config.timeout_handled_externally,
694
2
            directory_cache,
695
2
            #[cfg(target_os = "linux")]
696
2
            use_namespaces,
697
2
        })
?0
);
698
2
    let local_worker = LocalWorker::new_with_connection_factory_and_actions_manager(
699
2
        config.clone(),
700
2
        running_actions_manager,
701
2
        Box::new(move || 
{0
702
0
            let config = config.clone();
703
0
            Box::pin(async move {
704
0
                let timeout = config
705
0
                    .worker_api_endpoint
706
0
                    .timeout
707
0
                    .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
708
0
                let timeout_duration = Duration::from_secs_f32(timeout);
709
0
                let tls_config =
710
0
                    tls_utils::load_client_config(&config.worker_api_endpoint.tls_config)
711
0
                        .err_tip(|| "Parsing local worker TLS configuration")?;
712
0
                let endpoint =
713
0
                    tls_utils::endpoint_from(&config.worker_api_endpoint.uri, tls_config)
714
0
                        .map_err(|e| {
715
0
                            Error::from_std_err(Code::InvalidArgument, &e)
716
0
                                .append("Invalid URI for worker endpoint")
717
0
                        })?
718
0
                        .connect_timeout(timeout_duration)
719
0
                        .timeout(timeout_duration);
720
721
0
                let transport = endpoint.connect().await.map_err(|e| {
722
0
                    Error::from_std_err(Code::Internal, &e).append(format!(
723
                        "Could not connect to endpoint {}",
724
0
                        config.worker_api_endpoint.uri
725
                    ))
726
0
                })?;
727
0
                Ok(WorkerApiClient::new(transport).into())
728
0
            })
729
0
        }),
730
2
        Box::new(move |d| 
Box::pin0
(
time::sleep0
(
d0
))),
731
    );
732
2
    Ok(local_worker)
733
2
}
734
735
impl<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorker<T, U> {
736
11
    pub fn new_with_connection_factory_and_actions_manager(
737
11
        config: Arc<LocalWorkerConfig>,
738
11
        running_actions_manager: Arc<U>,
739
11
        connection_factory: ConnectionFactory<T>,
740
11
        sleep_fn: Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>,
741
11
    ) -> Self {
742
11
        let metrics = Arc::new(Metrics::new(Arc::downgrade(
743
11
            running_actions_manager.metrics(),
744
        )));
745
11
        Self {
746
11
            config,
747
11
            running_actions_manager,
748
11
            connection_factory,
749
11
            sleep_fn: Some(sleep_fn),
750
11
            metrics,
751
11
        }
752
11
    }
753
754
    #[allow(
755
        clippy::missing_const_for_fn,
756
        reason = "False positive on stable, but not on nightly"
757
    )]
758
0
    pub fn name(&self) -> &String {
759
0
        &self.config.name
760
0
    }
761
762
12
    async fn register_worker(
763
12
        &self,
764
12
        client: &mut T,
765
12
    ) -> Result<(String, Streaming<UpdateForWorker>), Error> {
766
12
        let mut extra_envs: HashMap<String, String> = HashMap::new();
767
12
        if let Some(
ref additional_environment0
) = self.config.additional_environment {
768
0
            for (name, source) in additional_environment {
769
0
                let value = match source {
770
0
                    EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
771
                    EnvironmentSource::FromEnvironment => {
772
0
                        Cow::Owned(env::var(name).unwrap_or_default())
773
                    }
774
0
                    other => {
775
0
                        debug!(
776
                            ?other,
777
                            "Worker registration doesn't support this type of additional environment"
778
                        );
779
0
                        continue;
780
                    }
781
                };
782
0
                extra_envs.insert(name.clone(), value.into_owned());
783
            }
784
12
        }
785
786
12
        let connect_worker_request = make_connect_worker_request(
787
12
            self.config.name.clone(),
788
12
            &self.config.platform_properties,
789
12
            &extra_envs,
790
12
            self.config.max_inflight_tasks,
791
12
        )
792
12
        .await
?0
;
793
12
        let 
mut update_for_worker_stream9
= client
794
12
            .connect_worker(connect_worker_request)
795
12
            .await
796
9
            .err_tip(|| "Could not call connect_worker() in worker")
?0
797
9
            .into_inner();
798
799
9
        let 
first_msg_update8
= update_for_worker_stream
800
9
            .next()
801
9
            .await
802
9
            .err_tip(|| "Got EOF expected UpdateForWorker")
?1
803
8
            .err_tip(|| "Got error when receiving UpdateForWorker")
?0
804
            .update;
805
806
8
        let worker_id = match first_msg_update {
807
8
            Some(Update::ConnectionResult(connection_result)) => connection_result.worker_id,
808
0
            other => {
809
0
                return Err(make_input_err!(
810
0
                    "Expected first response from scheduler to be a ConnectionResult got : {:?}",
811
0
                    other
812
0
                ));
813
            }
814
        };
815
8
        Ok((worker_id, update_for_worker_stream))
816
9
    }
817
818
    #[instrument(skip(self), level = Level::INFO)]
819
9
    pub async fn run(
820
9
        mut self,
821
9
        mut shutdown_rx: broadcast::Receiver<ShutdownGuard>,
822
9
    ) -> Result<(), Error> {
823
        // Belt-and-suspenders QoS bump: the main binary already calls
824
        // this before runtime creation so the tokio worker threads
825
        // inherit P-core preference via pthread QoS inheritance, but
826
        // any thread that reaches this point should also be tagged in
827
        // case it was spawned by a path that bypassed `on_thread_start`.
828
        // No-op on non-macOS.
829
        let _ = crate::qos::set_user_initiated();
830
831
        let sleep_fn = self
832
            .sleep_fn
833
            .take()
834
            .err_tip(|| "Could not unwrap sleep_fn in LocalWorker::run")?;
835
        let sleep_fn_pin = Pin::new(&sleep_fn);
836
3
        let error_handler = Box::pin(move |err| async move {
837
3
            error!(?err, "Error");
838
3
            (sleep_fn_pin)(Duration::from_secs_f32(CONNECTION_RETRY_DELAY_S)).await;
839
6
        });
840
841
        loop {
842
            // First connect to our endpoint.
843
            let mut client = match (self.connection_factory)().await {
844
                Ok(client) => client,
845
                Err(e) => {
846
                    (error_handler)(e).await;
847
                    continue; // Try to connect again.
848
                }
849
            };
850
851
            debug!("Connected to endpoint");
852
853
            // Next register our worker with the scheduler.
854
            let (inner, update_for_worker_stream) = match self.register_worker(&mut client).await {
855
                Err(e) => {
856
                    (error_handler)(e).await;
857
                    continue; // Try to connect again.
858
                }
859
                Ok((worker_id, update_for_worker_stream)) => (
860
                    LocalWorkerImpl::new(
861
                        &self.config,
862
                        client,
863
                        worker_id,
864
                        self.running_actions_manager.clone(),
865
                        self.metrics.clone(),
866
                    ),
867
                    update_for_worker_stream,
868
                ),
869
            };
870
            info!(
871
                worker_id = %inner.worker_id,
872
                "Worker registered with scheduler"
873
            );
874
875
            // Now listen for connections and run all other services.
876
            if let Err(err) = inner.run(update_for_worker_stream, &mut shutdown_rx).await {
877
                'no_more_actions: {
878
                    // Ensure there are no actions in transit before we try to kill
879
                    // all our actions.
880
                    const ITERATIONS: usize = 1_000;
881
882
                    const ERROR_MSG: &str = "Actions in transit did not reach zero before we disconnected from the scheduler";
883
884
                    let sleep_duration = ACTIONS_IN_TRANSIT_TIMEOUT_S / ITERATIONS as f32;
885
                    for _ in 0..ITERATIONS {
886
                        if inner.actions_in_transit.load(Ordering::Acquire) == 0 {
887
                            break 'no_more_actions;
888
                        }
889
                        (sleep_fn_pin)(Duration::from_secs_f32(sleep_duration)).await;
890
                    }
891
                    error!(ERROR_MSG);
892
                    return Err(err.append(ERROR_MSG));
893
                }
894
                error!(?err, "Worker disconnected from scheduler");
895
                // Kill off any existing actions because if we re-connect, we'll
896
                // get some more and it might resource lock us.
897
                self.running_actions_manager.kill_all().await;
898
899
                (error_handler)(err).await; // Try to connect again.
900
            }
901
        }
902
        // Unreachable.
903
0
    }
904
}
905
906
#[derive(Debug, MetricsComponent)]
907
pub struct Metrics {
908
    #[metric(
909
        help = "Total number of actions sent to this worker to process. This does not mean it started them, it just means it received a request to execute it."
910
    )]
911
    start_actions_received: CounterWithTime,
912
    #[metric(help = "Total number of disconnects received from the scheduler.")]
913
    disconnects_received: CounterWithTime,
914
    #[metric(help = "Total number of keep-alives received from the scheduler.")]
915
    keep_alives_received: CounterWithTime,
916
    #[metric(
917
        help = "Stats about the calls to check if an action satisfies the config supplied script."
918
    )]
919
    preconditions: AsyncCounterWrapper,
920
    #[metric]
921
    #[allow(
922
        clippy::struct_field_names,
923
        reason = "TODO Fix this. Triggers on nightly"
924
    )]
925
    running_actions_manager_metrics: Weak<RunningActionManagerMetrics>,
926
}
927
928
impl RootMetricsComponent for Metrics {}
929
930
impl Metrics {
931
11
    fn new(running_actions_manager_metrics: Weak<RunningActionManagerMetrics>) -> Self {
932
11
        Self {
933
11
            start_actions_received: CounterWithTime::default(),
934
11
            disconnects_received: CounterWithTime::default(),
935
11
            keep_alives_received: CounterWithTime::default(),
936
11
            preconditions: AsyncCounterWrapper::default(),
937
11
            running_actions_manager_metrics,
938
11
        }
939
11
    }
940
}
941
942
impl Metrics {
943
6
    async fn wrap<U, T: Future<Output = U>, F: FnOnce(Arc<Self>) -> T>(
944
6
        self: Arc<Self>,
945
6
        fut: F,
946
6
    ) -> U {
947
6
        fut(self).await
948
5
    }
949
}