Coverage Report

Created: 2026-03-06 12:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/local_worker.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::hash::BuildHasher;
16
use core::pin::Pin;
17
use core::str;
18
use core::sync::atomic::{AtomicU64, Ordering};
19
use core::time::Duration;
20
use std::borrow::Cow;
21
use std::collections::HashMap;
22
use std::env;
23
use std::process::Stdio;
24
use std::sync::{Arc, Weak};
25
26
use futures::future::BoxFuture;
27
use futures::stream::FuturesUnordered;
28
use futures::{Future, FutureExt, StreamExt, TryFutureExt, select};
29
use nativelink_config::cas_server::{EnvironmentSource, LocalWorkerConfig};
30
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
31
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
32
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
33
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
34
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
35
    ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker,
36
    execute_result,
37
};
38
use nativelink_store::fast_slow_store::FastSlowStore;
39
use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId};
40
use nativelink_util::common::fs;
41
use nativelink_util::digest_hasher::DigestHasherFunc;
42
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
43
use nativelink_util::shutdown_guard::ShutdownGuard;
44
use nativelink_util::store_trait::Store;
45
use nativelink_util::{spawn, tls_utils};
46
use opentelemetry::context::Context;
47
use tokio::process;
48
use tokio::sync::{broadcast, mpsc};
49
use tokio::time::sleep;
50
use tokio_stream::wrappers::UnboundedReceiverStream;
51
use tonic::Streaming;
52
use tracing::{Level, debug, error, event, info, info_span, instrument, trace, warn};
53
54
use crate::running_actions_manager::{
55
    ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction,
56
    RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl,
57
};
58
use crate::worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper};
59
use crate::worker_utils::make_connect_worker_request;
60
61
/// Amount of time to wait if we have actions in transit before we try to
62
/// consider an error to have occurred.
63
const ACTIONS_IN_TRANSIT_TIMEOUT_S: f32 = 10.;
64
65
/// If we lose connection to the worker api server we will wait this many seconds
66
/// before trying to connect.
67
const CONNECTION_RETRY_DELAY_S: f32 = 0.5;
68
69
/// Default endpoint timeout. If this value gets modified the documentation in
70
/// `cas_server.rs` must also be updated.
71
const DEFAULT_ENDPOINT_TIMEOUT_S: f32 = 5.;
72
73
/// Default maximum amount of time a task is allowed to run for.
74
/// If this value gets modified the documentation in `cas_server.rs` must also be updated.
75
const DEFAULT_MAX_ACTION_TIMEOUT: Duration = Duration::from_secs(1200); // 20 mins.
76
const DEFAULT_MAX_UPLOAD_TIMEOUT: Duration = Duration::from_secs(600); // 10 mins.
77
78
struct LocalWorkerImpl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> {
79
    config: &'a LocalWorkerConfig,
80
    // According to the tonic documentation it is a cheap operation to clone this.
81
    grpc_client: T,
82
    worker_id: String,
83
    running_actions_manager: Arc<U>,
84
    // Number of actions that have been received in `Update::StartAction`, but
85
    // not yet processed by running_actions_manager's spawn. This number should
86
    // always be zero if there are no actions running and no actions being waited
87
    // on by the scheduler.
88
    actions_in_transit: Arc<AtomicU64>,
89
    metrics: Arc<Metrics>,
90
}
91
92
7
pub async fn preconditions_met<H: BuildHasher + Sync>(
93
7
    precondition_script: Option<String>,
94
7
    extra_envs: &HashMap<String, String, H>,
95
7
) -> Result<(), Error> {
96
7
    let Some(
precondition_script2
) = &precondition_script else {
  Branch (96:9): [True: 2, False: 5]
  Branch (96:9): [Folded - Ignored]
  Branch (96:9): [Folded - Ignored]
97
        // No script means we are always ok to proceed.
98
5
        return Ok(());
99
    };
100
    // TODO: Might want to pass some information about the command to the
101
    //       script, but at this point it's not even been downloaded yet,
102
    //       so that's not currently possible.  Perhaps we'll move this in
103
    //       future to pass useful information through?  Or perhaps we'll
104
    //       have a pre-condition and a pre-execute script instead, although
105
    //       arguably entrypoint already gives us that.
106
107
2
    let maybe_split_cmd = shlex::split(precondition_script);
108
2
    let (command, args) = match &maybe_split_cmd {
109
2
        Some(split_cmd) => (&split_cmd[0], &split_cmd[1..]),
110
        None => {
111
0
            return Err(make_input_err!(
112
0
                "Could not parse the value of precondition_script: '{}'",
113
0
                precondition_script,
114
0
            ));
115
        }
116
    };
117
118
2
    let precondition_process = process::Command::new(command)
119
2
        .args(args)
120
2
        .kill_on_drop(true)
121
2
        .stdin(Stdio::null())
122
2
        .stdout(Stdio::piped())
123
2
        .stderr(Stdio::null())
124
2
        .env_clear()
125
2
        .envs(extra_envs)
126
2
        .spawn()
127
2
        .err_tip(|| format!(
"Could not execute precondition command {precondition_script:?}"0
))
?0
;
128
2
    let output = precondition_process.wait_with_output().await
?0
;
129
2
    let stdout = str::from_utf8(&output.stdout).unwrap_or("");
130
2
    trace!(status = %output.status, %stdout, "Preconditions script returned");
131
2
    if output.status.code() == Some(0) {
  Branch (131:8): [True: 1, False: 1]
  Branch (131:8): [Folded - Ignored]
  Branch (131:8): [Folded - Ignored]
132
1
        Ok(())
133
    } else {
134
1
        Err(make_err!(
135
1
            Code::ResourceExhausted,
136
1
            "Preconditions script returned status {} - {}",
137
1
            output.status,
138
1
            stdout
139
1
        ))
140
    }
141
7
}
142
143
impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorkerImpl<'a, T, U> {
144
7
    fn new(
145
7
        config: &'a LocalWorkerConfig,
146
7
        grpc_client: T,
147
7
        worker_id: String,
148
7
        running_actions_manager: Arc<U>,
149
7
        metrics: Arc<Metrics>,
150
7
    ) -> Self {
151
7
        Self {
152
7
            config,
153
7
            grpc_client,
154
7
            worker_id,
155
7
            running_actions_manager,
156
7
            // Number of actions that have been received in `Update::StartAction`, but
157
7
            // not yet processed by running_actions_manager's spawn. This number should
158
7
            // always be zero if there are no actions running and no actions being waited
159
7
            // on by the scheduler.
160
7
            actions_in_transit: Arc::new(AtomicU64::new(0)),
161
7
            metrics,
162
7
        }
163
7
    }
164
165
    /// Starts a background spawn/thread that will send a message to the server every `timeout / 2`.
166
7
    async fn start_keep_alive(&self) -> Result<(), Error> 
{6
167
        // According to tonic's documentation this call should be cheap and is the same stream.
168
6
        let mut grpc_client = self.grpc_client.clone();
169
170
        loop {
171
6
            let timeout = self
172
6
                .config
173
6
                .worker_api_endpoint
174
6
                .timeout
175
6
                .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
176
            // We always send 2 keep alive requests per timeout. Http2 should manage most of our
177
            // timeout issues, this is a secondary check to ensure we can still send data.
178
6
            sleep(Duration::from_secs_f32(timeout / 2.)).await;
179
0
            if let Err(e) = grpc_client.keep_alive(KeepAliveRequest {}).await {
  Branch (179:20): [True: 0, False: 0]
  Branch (179:20): [Folded - Ignored]
  Branch (179:20): [Folded - Ignored]
180
0
                return Err(make_err!(
181
0
                    Code::Internal,
182
0
                    "Failed to send KeepAlive in LocalWorker : {:?}",
183
0
                    e
184
0
                ));
185
0
            }
186
        }
187
0
    }
188
189
7
    async fn run(
190
7
        &self,
191
7
        update_for_worker_stream: Streaming<UpdateForWorker>,
192
7
        shutdown_rx: &mut broadcast::Receiver<ShutdownGuard>,
193
7
    ) -> Result<(), Error> {
194
        // This big block of logic is designed to help simplify upstream components. Upstream
195
        // components can write standard futures that return a `Result<(), Error>` and this block
196
        // will forward the error up to the client and disconnect from the scheduler.
197
        // It is a common use case that an item sent through update_for_worker_stream will always
198
        // have a response but the response will be triggered through a callback to the scheduler.
199
        // This can be quite tricky to manage, so what we have done here is given access to a
200
        // `futures` variable which because this is in a single thread as well as a channel that you
201
        // send a future into that makes it into the `futures` variable.
202
        // This means that if you want to perform an action based on the result of the future
203
        // you use the `.map()` method and the new action will always come to live in this spawn,
204
        // giving mutable access to stuff in this struct.
205
        // NOTE: If you ever return from this function it will disconnect from the scheduler.
206
7
        let mut futures = FuturesUnordered::new();
207
7
        futures.push(self.start_keep_alive().boxed());
208
209
7
        let (add_future_channel, add_future_rx) = mpsc::unbounded_channel();
210
7
        let mut add_future_rx = UnboundedReceiverStream::new(add_future_rx).fuse();
211
212
7
        let mut update_for_worker_stream = update_for_worker_stream.fuse();
213
        // A notify which is triggered every time actions_in_flight is subtracted.
214
7
        let actions_notify = Arc::new(tokio::sync::Notify::new());
215
        // A counter of actions that are in-flight, this is similar to actions_in_transit but
216
        // includes the AC upload and notification to the scheduler.
217
7
        let actions_in_flight = Arc::new(AtomicU64::new(0));
218
        // Set to true when shutting down, this stops any new StartAction.
219
7
        let mut shutting_down = false;
220
221
        loop {
222
24
            select! {
223
24
                
maybe_update8
= update_for_worker_stream.next() => if
!shutting_down8
||
maybe_update0
.
is_some0
() {
  Branch (223:70): [True: 8, False: 0]
  Branch (223:88): [True: 0, False: 0]
  Branch (223:70): [Folded - Ignored]
  Branch (223:88): [Folded - Ignored]
  Branch (223:70): [Folded - Ignored]
  Branch (223:88): [Folded - Ignored]
224
8
                    match maybe_update
225
8
                        .err_tip(|| "UpdateForWorker stream closed early")
?1
226
7
                        .err_tip(|| "Got error in UpdateForWorker stream")
?0
227
                        .update
228
7
                        .err_tip(|| "Expected update to exist in UpdateForWorker")
?0
229
                    {
230
                        Update::ConnectionResult(_) => {
231
0
                            return Err(make_input_err!(
232
0
                                "Got ConnectionResult in LocalWorker::run which should never happen"
233
0
                            ));
234
                        }
235
                        // TODO(palfrey) We should possibly do something with this notification.
236
0
                        Update::Disconnect(()) => {
237
0
                            self.metrics.disconnects_received.inc();
238
0
                        }
239
0
                        Update::KeepAlive(()) => {
240
0
                            self.metrics.keep_alives_received.inc();
241
0
                        }
242
1
                        Update::KillOperationRequest(kill_operation_request) => {
243
1
                            let operation_id = OperationId::from(kill_operation_request.operation_id);
244
1
                            if let Err(
err0
) = self.running_actions_manager.kill_operation(&operation_id).await {
  Branch (244:36): [True: 0, False: 1]
  Branch (244:36): [Folded - Ignored]
  Branch (244:36): [Folded - Ignored]
245
0
                                error!(
246
                                    %operation_id,
247
                                    ?err,
248
0
                                    "Failed to send kill request for operation"
249
                                );
250
1
                            }
251
                        }
252
6
                        Update::StartAction(start_execute) => {
253
                            // Don't accept any new requests if we're shutting down.
254
6
                            if shutting_down {
  Branch (254:32): [True: 0, False: 6]
  Branch (254:32): [Folded - Ignored]
  Branch (254:32): [Folded - Ignored]
255
0
                                if let Some(instance_name) = start_execute.execute_request.map(|request| request.instance_name) {
  Branch (255:40): [True: 0, False: 0]
  Branch (255:40): [Folded - Ignored]
  Branch (255:40): [Folded - Ignored]
256
0
                                    self.grpc_client.clone().execution_response(
257
0
                                        ExecuteResult{
258
0
                                            instance_name,
259
0
                                            operation_id: start_execute.operation_id,
260
0
                                            result: Some(execute_result::Result::InternalError(make_err!(Code::ResourceExhausted, "Worker shutting down").into())),
261
0
                                        }
262
0
                                    ).await?;
263
0
                                }
264
0
                                continue;
265
6
                            }
266
267
6
                            self.metrics.start_actions_received.inc();
268
269
6
                            let execute_request = start_execute.execute_request.as_ref();
270
6
                            let operation_id = start_execute.operation_id.clone();
271
6
                            let operation_id_to_log = operation_id.clone();
272
6
                            let maybe_instance_name = execute_request.map(|v| v.instance_name.clone());
273
6
                            let action_digest = execute_request.and_then(|v| v.action_digest.clone());
274
6
                            let digest_hasher = execute_request
275
6
                                .ok_or_else(|| make_input_err!("Expected execute_request to be set"))
276
6
                                .and_then(|v| DigestHasherFunc::try_from(v.digest_function))
277
6
                                .err_tip(|| "In LocalWorkerImpl::new()")
?0
;
278
279
6
                            let start_action_fut = {
280
6
                                let precondition_script_cfg = self.config.experimental_precondition_script.clone();
281
6
                                let mut extra_envs: HashMap<String, String> = HashMap::new();
282
6
                                if let Some(
ref additional_environment0
) = self.config.additional_environment {
  Branch (282:40): [True: 0, False: 6]
  Branch (282:40): [Folded - Ignored]
  Branch (282:40): [Folded - Ignored]
283
0
                                    for (name, source) in additional_environment {
284
0
                                        let value = match source {
285
0
                                            EnvironmentSource::Property(property) => start_execute
286
0
                                                .platform.as_ref().and_then(|p|p.properties.iter().find(|pr| &pr.name == property))
287
0
                                                .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.value.as_str())),
288
0
                                            EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
289
0
                                            EnvironmentSource::FromEnvironment => Cow::Owned(env::var(name).unwrap_or_default()),
290
0
                                            other => {
291
0
                                                debug!(?other, "Worker doesn't support this type of additional environment");
292
0
                                                continue;
293
                                            }
294
                                        };
295
0
                                        extra_envs.insert(name.clone(), value.into_owned());
296
                                    }
297
6
                                }
298
6
                                let actions_in_transit = self.actions_in_transit.clone();
299
6
                                let worker_id = self.worker_id.clone();
300
6
                                let running_actions_manager = self.running_actions_manager.clone();
301
6
                                let mut grpc_client = self.grpc_client.clone();
302
6
                                let complete = ExecuteComplete {
303
6
                                    operation_id: operation_id.clone(),
304
6
                                };
305
6
                                self.metrics.clone().wrap(move |metrics| async move {
306
6
                                    metrics.preconditions.wrap(preconditions_met(precondition_script_cfg, &extra_envs))
307
6
                                    .and_then(|()| 
running_actions_manager5
.
create_and_add_action5
(
worker_id5
,
start_execute5
))
308
6
                                    .map(move |r| {
309
                                        // Now that we either failed or registered our action, we can
310
                                        // consider the action to no longer be in transit.
311
6
                                        actions_in_transit.fetch_sub(1, Ordering::Release);
312
6
                                        r
313
6
                                    })
314
6
                                    .and_then(|action| 
{5
315
5
                                        debug!(
316
5
                                            operation_id = %action.get_operation_id(),
317
5
                                            "Received request to run action"
318
                                        );
319
5
                                        action
320
5
                                            .clone()
321
5
                                            .prepare_action()
322
5
                                            .and_then(RunningAction::execute)
323
5
                                            .and_then(|result| async move 
{2
324
                                                // Notify that execution has completed so it can schedule a new action.
325
2
                                                drop(grpc_client.execution_complete(complete).await);
326
2
                                                Ok(result)
327
4
                                            })
328
5
                                            .and_then(RunningAction::upload_results)
329
5
                                            .and_then(RunningAction::get_finished_result)
330
                                            // Note: We need ensure we run cleanup even if one of the other steps fail.
331
5
                                            .then(|result| async move 
{4
332
4
                                                if let Err(
e0
) = action.cleanup().await {
  Branch (332:56): [True: 0, False: 4]
  Branch (332:56): [Folded - Ignored]
  Branch (332:56): [Folded - Ignored]
333
0
                                                    return Result::<ActionResult, Error>::Err(e).merge(result);
334
4
                                                }
335
4
                                                result
336
8
                                            })
337
6
                                    
}5
).await
338
11
                                })
339
                            };
340
341
6
                            let make_publish_future = {
342
6
                                let mut grpc_client = self.grpc_client.clone();
343
344
6
                                let running_actions_manager = self.running_actions_manager.clone();
345
5
                                move |res: Result<ActionResult, Error>| async move {
346
5
                                    let instance_name = maybe_instance_name
347
5
                                        .err_tip(|| "`instance_name` could not be resolved; this is likely an internal error in local_worker.")
?0
;
348
5
                                    match res {
349
2
                                        Ok(mut action_result) => {
350
                                            // Save in the action cache before notifying the scheduler that we've completed.
351
2
                                            if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) {
  Branch (351:52): [True: 2, False: 0]
  Branch (351:52): [Folded - Ignored]
  Branch (351:52): [Folded - Ignored]
352
2
                                                if let Err(
err0
) = running_actions_manager.cache_action_result(digest_info, &mut action_result, digest_hasher).await {
  Branch (352:56): [True: 0, False: 2]
  Branch (352:56): [Folded - Ignored]
  Branch (352:56): [Folded - Ignored]
353
0
                                                    error!(
354
                                                        ?err,
355
                                                        ?action_digest,
356
0
                                                        "Error saving action in store",
357
                                                    );
358
2
                                                }
359
0
                                            }
360
2
                                            let action_stage = ActionStage::Completed(action_result);
361
2
                                            grpc_client.execution_response(
362
2
                                                ExecuteResult{
363
2
                                                    instance_name,
364
2
                                                    operation_id,
365
2
                                                    result: Some(execute_result::Result::ExecuteResponse(action_stage.into())),
366
2
                                                }
367
2
                                            )
368
2
                                            .await
369
0
                                            .err_tip(|| "Error while calling execution_response")?;
370
                                        },
371
3
                                        Err(e) => {
372
3
                                            let is_cas_blob_missing = e.code == Code::NotFound
  Branch (372:71): [True: 2, False: 1]
  Branch (372:71): [Folded - Ignored]
  Branch (372:71): [Folded - Ignored]
373
2
                                                && e.message_string().contains("not found in either fast or slow store");
374
3
                                            if is_cas_blob_missing {
  Branch (374:48): [True: 1, False: 2]
  Branch (374:48): [Folded - Ignored]
  Branch (374:48): [Folded - Ignored]
375
1
                                                warn!(
376
                                                    ?e,
377
1
                                                    "Missing CAS inputs during prepare_action, returning FAILED_PRECONDITION"
378
                                                );
379
1
                                                let action_result = ActionResult {
380
1
                                                    error: Some(make_err!(
381
1
                                                        Code::FailedPrecondition,
382
1
                                                        "{}",
383
1
                                                        e.message_string()
384
1
                                                    )),
385
1
                                                    ..ActionResult::default()
386
1
                                                };
387
1
                                                let action_stage = ActionStage::Completed(action_result);
388
1
                                                grpc_client.execution_response(ExecuteResult{
389
1
                                                    instance_name,
390
1
                                                    operation_id,
391
1
                                                    result: Some(execute_result::Result::ExecuteResponse(action_stage.into())),
392
1
                                                }).await.
err_tip0
(|| "Error calling execution_response with missing inputs")
?0
;
393
                                            } else {
394
2
                                                grpc_client.execution_response(ExecuteResult{
395
2
                                                    instance_name,
396
2
                                                    operation_id,
397
2
                                                    result: Some(execute_result::Result::InternalError(e.into())),
398
2
                                                }).await.
err_tip0
(|| "Error calling execution_response with error")
?0
;
399
                                            }
400
                                        },
401
                                    }
402
0
                                    Ok(())
403
5
                                }
404
                            };
405
406
6
                            self.actions_in_transit.fetch_add(1, Ordering::Release);
407
408
6
                            let add_future_channel = add_future_channel.clone();
409
410
6
                            info_span!(
411
                                "worker_start_action_ctx",
412
                                operation_id = operation_id_to_log,
413
6
                                digest_function = %digest_hasher.to_string(),
414
6
                            ).in_scope(|| {
415
6
                                let _guard = Context::current_with_value(digest_hasher)
416
6
                                    .attach();
417
418
6
                                let actions_in_flight = actions_in_flight.clone();
419
6
                                let actions_notify = actions_notify.clone();
420
6
                                let actions_in_flight_fail = actions_in_flight.clone();
421
6
                                let actions_notify_fail = actions_notify.clone();
422
6
                                actions_in_flight.fetch_add(1, Ordering::Release);
423
424
6
                                futures.push(
425
6
                                    spawn!("worker_start_action", start_action_fut).map(move |res| 
{5
426
5
                                        let res = res.err_tip(|| "Failed to launch spawn")
?0
;
427
5
                                        if let Err(
err3
) = &res {
  Branch (427:48): [True: 3, False: 2]
  Branch (427:48): [Folded - Ignored]
  Branch (427:48): [Folded - Ignored]
428
3
                                            error!(?err, "Error executing action");
429
2
                                        }
430
5
                                        add_future_channel
431
5
                                            .send(make_publish_future(res).then(move |res| 
{0
432
0
                                                actions_in_flight.fetch_sub(1, Ordering::Release);
433
0
                                                actions_notify.notify_one();
434
0
                                                core::future::ready(res)
435
5
                                            
}0
).boxed())
436
5
                                            .map_err(|_| make_err!(
Code::Internal0
, "LocalWorker could not send future"))
?0
;
437
5
                                        Ok(())
438
5
                                    })
439
6
                                    .or_else(move |err| 
{0
440
                                        // If the make_publish_future is not run we still need to notify.
441
0
                                        actions_in_flight_fail.fetch_sub(1, Ordering::Release);
442
0
                                        actions_notify_fail.notify_one();
443
0
                                        core::future::ready(Err(err))
444
0
                                    })
445
6
                                    .boxed()
446
                                );
447
6
                            });
448
                        }
449
                    }
450
0
                },
451
24
                
res5
= add_future_rx.next() => {
452
5
                    let fut = res.err_tip(|| "New future stream receives should never be closed")
?0
;
453
5
                    futures.push(fut);
454
                },
455
24
                
res5
= futures.next() =>
res5
.
err_tip5
(|| "Keep-alive should always pending. Likely unable to send data to scheduler")
?0
?0
,
456
24
                
complete_msg0
= shutdown_rx.recv().fuse() => {
457
0
                    warn!("Worker loop received shutdown signal. Shutting down worker...",);
458
0
                    let mut grpc_client = self.grpc_client.clone();
459
0
                    let shutdown_guard = complete_msg.map_err(|e| make_err!(Code::Internal, "Failed to receive shutdown message: {e:?}"))?;
460
0
                    let actions_in_flight = actions_in_flight.clone();
461
0
                    let actions_notify = actions_notify.clone();
462
0
                    let shutdown_future = async move {
463
                        // Wait for in-flight operations to be fully completed.
464
0
                        while actions_in_flight.load(Ordering::Acquire) > 0 {
  Branch (464:31): [True: 0, False: 0]
  Branch (464:31): [Folded - Ignored]
  Branch (464:31): [Folded - Ignored]
465
0
                            actions_notify.notified().await;
466
                        }
467
                        // Sending this message immediately evicts all jobs from
468
                        // this worker, of which there should be none.
469
0
                        if let Err(e) = grpc_client.going_away(GoingAwayRequest {}).await {
  Branch (469:32): [True: 0, False: 0]
  Branch (469:32): [Folded - Ignored]
  Branch (469:32): [Folded - Ignored]
470
0
                            error!("Failed to send GoingAwayRequest: {e}",);
471
0
                            return Err(e);
472
0
                        }
473
                        // Allow shutdown to occur now.
474
0
                        drop(shutdown_guard);
475
0
                        Ok::<(), Error>(())
476
0
                    };
477
0
                    futures.push(shutdown_future.boxed());
478
0
                    shutting_down = true;
479
                },
480
            };
481
        }
482
        // Unreachable.
483
1
    }
484
}
485
486
/// Factory closure that produces a future resolving to a fresh scheduler
/// client connection of type `T`, or an `Error` if the connection fails.
/// Boxed so `LocalWorker` can hold it without naming the closure type.
type ConnectionFactory<T> = Box<dyn Fn() -> BoxFuture<'static, Result<T, Error>> + Send + Sync>;
487
488
pub struct LocalWorker<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> {
    /// Static configuration for this worker instance.
    config: Arc<LocalWorkerConfig>,
    /// Manages actions currently executing on this worker.
    running_actions_manager: Arc<U>,
    /// Produces new connections to the scheduler's worker API endpoint;
    /// invoked on every (re)connect attempt in `run()`.
    connection_factory: ConnectionFactory<T>,
    /// Pluggable sleep used for retry backoff. Wrapped in `Option` because
    /// `run()` takes it out (`.take()`) exactly once when it starts.
    sleep_fn: Option<Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>>,
    /// Metrics collected for this worker; shared with `LocalWorkerImpl`.
    metrics: Arc<Metrics>,
}
495
496
impl<
497
    T: WorkerApiClientTrait + core::fmt::Debug + 'static,
498
    U: RunningActionsManager + core::fmt::Debug,
499
> core::fmt::Debug for LocalWorker<T, U>
500
{
501
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
502
0
        f.debug_struct("LocalWorker")
503
0
            .field("config", &self.config)
504
0
            .field("running_actions_manager", &self.running_actions_manager)
505
0
            .field("metrics", &self.metrics)
506
0
            .finish_non_exhaustive()
507
0
    }
508
}
509
510
/// Creates a new `LocalWorker`. The `cas_store` must be an instance of
/// `FastSlowStore` and will be checked at runtime.
///
/// Prepares the worker environment: wipes and recreates the work directory,
/// resolves timeouts (falling back to defaults when configured as 0),
/// optionally initializes the directory cache, builds the running-actions
/// manager, and wires up a lazy connection factory for the scheduler
/// endpoint.
///
/// # Errors
/// Returns an error if the store downcast fails, the work directory cannot
/// be removed/created, or the running-actions manager fails to construct.
pub async fn new_local_worker(
    config: Arc<LocalWorkerConfig>,
    cas_store: Store,
    ac_store: Option<Store>,
    historical_store: Store,
) -> Result<LocalWorker<WorkerApiClientWrapper, RunningActionsManagerImpl>, Error> {
    // Runtime type check: the configured CAS store must actually be a
    // FastSlowStore, and we need the owning Arc (not just a reference).
    let fast_slow_store = cas_store
        .downcast_ref::<FastSlowStore>(None)
        .err_tip(|| "Expected store for LocalWorker's store to be a FastSlowStore")?
        .get_arc()
        .err_tip(|| "FastSlowStore's Arc doesn't exist")?;

    // Log warning about CAS configuration for multi-worker setups
    event!(
        Level::INFO,
        worker_name = %config.name,
        "Starting worker '{}'. IMPORTANT: If running multiple workers, all workers \
        must share the same CAS storage path to avoid 'Object not found' errors.",
        config.name
    );

    // If the work directory already exists (canonicalize succeeds), clear
    // out any stale state from a previous run before recreating it below.
    if let Ok(path) = fs::canonicalize(&config.work_directory).await {
        fs::remove_dir_all(&path).await.err_tip(|| {
            format!(
                "Could not remove work_directory '{}' in LocalWorker",
                &path.as_path().to_str().unwrap_or("bad path")
            )
        })?;
    }

    fs::create_dir_all(&config.work_directory)
        .await
        .err_tip(|| format!("Could not make work_directory : {}", config.work_directory))?;
    // An empty entrypoint string means "no custom entrypoint".
    let entrypoint = if config.entrypoint.is_empty() {
        None
    } else {
        Some(config.entrypoint.clone())
    };
    // A configured timeout of 0 selects the compiled-in default.
    let max_action_timeout = if config.max_action_timeout == 0 {
        DEFAULT_MAX_ACTION_TIMEOUT
    } else {
        Duration::from_secs(config.max_action_timeout as u64)
    };
    let max_upload_timeout = if config.max_upload_timeout == 0 {
        DEFAULT_MAX_UPLOAD_TIMEOUT
    } else {
        Duration::from_secs(config.max_upload_timeout as u64)
    };

    // Initialize directory cache if configured
    let directory_cache = if let Some(cache_config) = &config.directory_cache {
        use std::path::PathBuf;

        use crate::directory_cache::{
            DirectoryCache, DirectoryCacheConfig as WorkerDirCacheConfig,
        };

        // Empty cache_root falls back to a sibling of the work directory,
        // or /tmp if the work directory has no parent.
        let cache_root = if cache_config.cache_root.is_empty() {
            PathBuf::from(&config.work_directory).parent().map_or_else(
                || PathBuf::from("/tmp/nativelink_directory_cache"),
                |p| p.join("directory_cache"),
            )
        } else {
            PathBuf::from(&cache_config.cache_root)
        };

        let worker_cache_config = WorkerDirCacheConfig {
            max_entries: cache_config.max_entries,
            max_size_bytes: cache_config.max_size_bytes,
            cache_root,
        };

        // Cache initialization failure is non-fatal: log and continue
        // without a directory cache.
        match DirectoryCache::new(worker_cache_config, Store::new(fast_slow_store.clone())).await {
            Ok(cache) => {
                tracing::info!("Directory cache initialized successfully");
                Some(Arc::new(cache))
            }
            Err(e) => {
                tracing::warn!("Failed to initialize directory cache: {:?}", e);
                None
            }
        }
    } else {
        None
    };

    let running_actions_manager =
        Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
            root_action_directory: config.work_directory.clone(),
            execution_configuration: ExecutionConfiguration {
                entrypoint,
                additional_environment: config.additional_environment.clone(),
            },
            cas_store: fast_slow_store,
            ac_store,
            historical_store,
            upload_action_result_config: &config.upload_action_result,
            max_action_timeout,
            max_upload_timeout,
            timeout_handled_externally: config.timeout_handled_externally,
            directory_cache,
        })?);
    let local_worker = LocalWorker::new_with_connection_factory_and_actions_manager(
        config.clone(),
        running_actions_manager,
        // Connection factory: builds a fresh TLS-capable gRPC channel to the
        // scheduler each time it is called (each reconnect attempt).
        Box::new(move || {
            let config = config.clone();
            Box::pin(async move {
                let timeout = config
                    .worker_api_endpoint
                    .timeout
                    .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
                let timeout_duration = Duration::from_secs_f32(timeout);
                let tls_config =
                    tls_utils::load_client_config(&config.worker_api_endpoint.tls_config)
                        .err_tip(|| "Parsing local worker TLS configuration")?;
                // Same timeout bounds both the connect phase and each request.
                let endpoint =
                    tls_utils::endpoint_from(&config.worker_api_endpoint.uri, tls_config)
                        .map_err(|e| make_input_err!("Invalid URI for worker endpoint : {e:?}"))?
                        .connect_timeout(timeout_duration)
                        .timeout(timeout_duration);

                let transport = endpoint.connect().await.map_err(|e| {
                    make_err!(
                        Code::Internal,
                        "Could not connect to endpoint {}: {e:?}",
                        config.worker_api_endpoint.uri
                    )
                })?;
                Ok(WorkerApiClient::new(transport).into())
            })
        }),
        // Real sleep implementation; tests can inject their own.
        Box::new(move |d| Box::pin(sleep(d))),
    );
    Ok(local_worker)
}
648
649
impl<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorker<T, U> {
    /// Constructs a `LocalWorker` from its parts, allowing the connection
    /// factory and sleep function to be injected (used by tests and by
    /// `new_local_worker`).
    pub fn new_with_connection_factory_and_actions_manager(
        config: Arc<LocalWorkerConfig>,
        running_actions_manager: Arc<U>,
        connection_factory: ConnectionFactory<T>,
        sleep_fn: Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>,
    ) -> Self {
        // Hold only a weak reference to the manager's metrics so this
        // struct does not keep them alive on its own.
        let metrics = Arc::new(Metrics::new(Arc::downgrade(
            running_actions_manager.metrics(),
        )));
        Self {
            config,
            running_actions_manager,
            connection_factory,
            sleep_fn: Some(sleep_fn),
            metrics,
        }
    }

    #[allow(
        clippy::missing_const_for_fn,
        reason = "False positive on stable, but not on nightly"
    )]
    /// Returns the configured name of this worker.
    pub fn name(&self) -> &String {
        &self.config.name
    }

    /// Registers this worker with the scheduler over `client`.
    ///
    /// Resolves any configured additional-environment values, sends the
    /// connect request, and waits for the scheduler's first reply, which
    /// must be a `ConnectionResult` carrying our assigned worker id.
    ///
    /// Returns the worker id and the remaining update stream on success.
    async fn register_worker(
        &self,
        client: &mut T,
    ) -> Result<(String, Streaming<UpdateForWorker>), Error> {
        let mut extra_envs: HashMap<String, String> = HashMap::new();
        if let Some(ref additional_environment) = self.config.additional_environment {
            for (name, source) in additional_environment {
                let value = match source {
                    EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
                    EnvironmentSource::FromEnvironment => {
                        // Missing environment variables become empty strings.
                        Cow::Owned(env::var(name).unwrap_or_default())
                    }
                    // Other source kinds (e.g. per-action ones) are not
                    // meaningful at registration time; skip with a log.
                    other => {
                        debug!(
                            ?other,
                            "Worker registration doesn't support this type of additional environment"
                        );
                        continue;
                    }
                };
                extra_envs.insert(name.clone(), value.into_owned());
            }
        }

        let connect_worker_request = make_connect_worker_request(
            self.config.name.clone(),
            &self.config.platform_properties,
            &extra_envs,
            self.config.max_inflight_tasks,
        )
        .await?;
        let mut update_for_worker_stream = client
            .connect_worker(connect_worker_request)
            .await
            .err_tip(|| "Could not call connect_worker() in worker")?
            .into_inner();

        // First `?` handles EOF (stream ended), second handles a transport
        // error on the received message.
        let first_msg_update = update_for_worker_stream
            .next()
            .await
            .err_tip(|| "Got EOF expected UpdateForWorker")?
            .err_tip(|| "Got error when receiving UpdateForWorker")?
            .update;

        let worker_id = match first_msg_update {
            Some(Update::ConnectionResult(connection_result)) => connection_result.worker_id,
            other => {
                return Err(make_input_err!(
                    "Expected first response from scheduler to be a ConnectResult got : {:?}",
                    other
                ));
            }
        };
        Ok((worker_id, update_for_worker_stream))
    }

    /// Main worker loop: connect, register, then service scheduler updates,
    /// reconnecting with a delay after any failure.
    ///
    /// Consumes `self` and only returns on an unrecoverable error (e.g.
    /// actions still in transit after a disconnect drain timeout); the
    /// connect/register/serve cycle otherwise repeats forever.
    #[instrument(skip(self), level = Level::INFO)]
    pub async fn run(
        mut self,
        mut shutdown_rx: broadcast::Receiver<ShutdownGuard>,
    ) -> Result<(), Error> {
        // sleep_fn is moved out here; `run` may only be called once per worker.
        let sleep_fn = self
            .sleep_fn
            .take()
            .err_tip(|| "Could not unwrap sleep_fn in LocalWorker::run")?;
        let sleep_fn_pin = Pin::new(&sleep_fn);
        // Shared retry handler: log the error, then back off before retrying.
        let error_handler = Box::pin(move |err| async move {
            error!(?err, "Error");
            (sleep_fn_pin)(Duration::from_secs_f32(CONNECTION_RETRY_DELAY_S)).await;
        });

        loop {
            // First connect to our endpoint.
            let mut client = match (self.connection_factory)().await {
                Ok(client) => client,
                Err(e) => {
                    (error_handler)(e).await;
                    continue; // Try to connect again.
                }
            };

            // Next register our worker with the scheduler.
            let (inner, update_for_worker_stream) = match self.register_worker(&mut client).await {
                Err(e) => {
                    (error_handler)(e).await;
                    continue; // Try to connect again.
                }
                Ok((worker_id, update_for_worker_stream)) => (
                    LocalWorkerImpl::new(
                        &self.config,
                        client,
                        worker_id,
                        self.running_actions_manager.clone(),
                        self.metrics.clone(),
                    ),
                    update_for_worker_stream,
                ),
            };
            info!(
                worker_id = %inner.worker_id,
                "Worker registered with scheduler"
            );

            // Now listen for connections and run all other services.
            if let Err(err) = inner.run(update_for_worker_stream, &mut shutdown_rx).await {
                'no_more_actions: {
                    // Ensure there are no actions in transit before we try to kill
                    // all our actions.
                    const ITERATIONS: usize = 1_000;

                    const ERROR_MSG: &str = "Actions in transit did not reach zero before we disconnected from the scheduler";

                    // Poll the in-transit counter up to ITERATIONS times,
                    // spreading the total timeout evenly across attempts.
                    let sleep_duration = ACTIONS_IN_TRANSIT_TIMEOUT_S / ITERATIONS as f32;
                    for _ in 0..ITERATIONS {
                        if inner.actions_in_transit.load(Ordering::Acquire) == 0 {
                            break 'no_more_actions;
                        }
                        (sleep_fn_pin)(Duration::from_secs_f32(sleep_duration)).await;
                    }
                    // Drain timed out: give up rather than killing actions
                    // that may still be handed to us by the scheduler.
                    error!(ERROR_MSG);
                    return Err(err.append(ERROR_MSG));
                }
                error!(?err, "Worker disconnected from scheduler");
                // Kill off any existing actions because if we re-connect, we'll
                // get some more and it might resource lock us.
                self.running_actions_manager.kill_all().await;

                (error_handler)(err).await; // Try to connect again.
            }
        }
        // Unreachable.
    }
}
809
810
/// Counters and stats exported by a `LocalWorker`. Field semantics are
/// carried by the `#[metric(help = ...)]` attributes below.
#[derive(Debug, MetricsComponent)]
pub struct Metrics {
    #[metric(
        help = "Total number of actions sent to this worker to process. This does not mean it started them, it just means it received a request to execute it."
    )]
    start_actions_received: CounterWithTime,
    #[metric(help = "Total number of disconnects received from the scheduler.")]
    disconnects_received: CounterWithTime,
    #[metric(help = "Total number of keep-alives received from the scheduler.")]
    keep_alives_received: CounterWithTime,
    #[metric(
        help = "Stats about the calls to check if an action satisfies the config supplied script."
    )]
    preconditions: AsyncCounterWrapper,
    // Weak so this metrics struct does not keep the manager's metrics alive.
    #[metric]
    #[allow(
        clippy::struct_field_names,
        reason = "TODO Fix this. Triggers on nightly"
    )]
    running_actions_manager_metrics: Weak<RunningActionManagerMetrics>,
}
831
832
// Marker impl: allows `Metrics` to be registered as a root of the metrics tree.
impl RootMetricsComponent for Metrics {}
833
834
impl Metrics {
835
10
    fn new(running_actions_manager_metrics: Weak<RunningActionManagerMetrics>) -> Self {
836
10
        Self {
837
10
            start_actions_received: CounterWithTime::default(),
838
10
            disconnects_received: CounterWithTime::default(),
839
10
            keep_alives_received: CounterWithTime::default(),
840
10
            preconditions: AsyncCounterWrapper::default(),
841
10
            running_actions_manager_metrics,
842
10
        }
843
10
    }
844
}
845
846
impl Metrics {
847
6
    async fn wrap<U, T: Future<Output = U>, F: FnOnce(Arc<Self>) -> T>(
848
6
        self: Arc<Self>,
849
6
        fut: F,
850
6
    ) -> U {
851
6
        fut(self).await
852
5
    }
853
}