Coverage Report

Created: 2025-09-16 19:42

/build/source/nativelink-worker/src/local_worker.rs
Annotated source; branch coverage from the report is shown inline as `// coverage branch (line:col): [True: n, False: n]` comments.
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::Pin;
use core::str;
use core::sync::atomic::{AtomicU64, Ordering};
use core::time::Duration;
use std::process::Stdio;
use std::sync::{Arc, Weak};

use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{Future, FutureExt, StreamExt, TryFutureExt, select};
use nativelink_config::cas_server::LocalWorkerConfig;
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
    ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker,
    execute_result,
};
use nativelink_store::fast_slow_store::FastSlowStore;
use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId};
use nativelink_util::common::fs;
use nativelink_util::digest_hasher::DigestHasherFunc;
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
use nativelink_util::shutdown_guard::ShutdownGuard;
use nativelink_util::store_trait::Store;
use nativelink_util::{spawn, tls_utils};
use opentelemetry::context::Context;
use tokio::process;
use tokio::sync::{broadcast, mpsc};
use tokio::time::sleep;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::Streaming;
use tracing::{Level, debug, error, info, info_span, instrument, warn};

use crate::running_actions_manager::{
    ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction,
    RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl,
};
use crate::worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper};
use crate::worker_utils::make_connect_worker_request;

/// Amount of time to wait if we have actions in transit before we try to
/// consider an error to have occurred.
const ACTIONS_IN_TRANSIT_TIMEOUT_S: f32 = 10.;

/// If we lose connection to the worker api server we will wait this many seconds
/// before trying to connect.
const CONNECTION_RETRY_DELAY_S: f32 = 0.5;

/// Default endpoint timeout. If this value gets modified the documentation in
/// `cas_server.rs` must also be updated.
const DEFAULT_ENDPOINT_TIMEOUT_S: f32 = 5.;

/// Default maximum amount of time a task is allowed to run for.
/// If this value gets modified the documentation in `cas_server.rs` must also be updated.
const DEFAULT_MAX_ACTION_TIMEOUT: Duration = Duration::from_secs(1200); // 20 mins.

struct LocalWorkerImpl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> {
    config: &'a LocalWorkerConfig,
    // According to the tonic documentation it is a cheap operation to clone this.
    grpc_client: T,
    worker_id: String,
    running_actions_manager: Arc<U>,
    // Number of actions that have been received in `Update::StartAction`, but
    // not yet processed by running_actions_manager's spawn. This number should
    // always be zero if there are no actions running and no actions being waited
    // on by the scheduler.
    actions_in_transit: Arc<AtomicU64>,
    metrics: Arc<Metrics>,
}

async fn preconditions_met(precondition_script: Option<String>) -> Result<(), Error> {
    // coverage branch (88:9): [True: 1, False: 3]
    let Some(precondition_script) = &precondition_script else {
        // No script means we are always ok to proceed.
        return Ok(());
    };
    // TODO: Might want to pass some information about the command to the
    //       script, but at this point it's not even been downloaded yet,
    //       so that's not currently possible.  Perhaps we'll move this in
    //       future to pass useful information through?  Or perhaps we'll
    //       have a pre-condition and a pre-execute script instead, although
    //       arguably entrypoint already gives us that.
    let precondition_process = process::Command::new(precondition_script)
        .kill_on_drop(true)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .env_clear()
        .spawn()
        .err_tip(|| format!("Could not execute precondition command {precondition_script:?}"))?;
    let output = precondition_process.wait_with_output().await?;
    // coverage branch (107:8): [True: 0, False: 1]
    if output.status.code() == Some(0) {
        Ok(())
    } else {
        Err(make_err!(
            Code::ResourceExhausted,
            "Preconditions script returned status {} - {}",
            output.status,
            str::from_utf8(&output.stdout).unwrap_or("")
        ))
    }
}

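Per the branch data above, `preconditions_met` has never taken its success path (the `Some(0)` exit-status branch at 107:8 is `True: 0`). A minimal sketch of tests that would exercise both branches, assuming a `#[cfg(test)]` module in this file, the `tokio` test macro, and a POSIX `/bin/true` on the test machine (all assumptions, not part of this report):

#[cfg(test)]
mod preconditions_met_sketch {
    use super::*;

    #[tokio::test]
    async fn no_script_always_passes() {
        // Covered branch: `None` short-circuits to Ok(()).
        assert!(preconditions_met(None).await.is_ok());
    }

    #[tokio::test]
    async fn zero_exit_status_passes() {
        // Currently uncovered branch: a script that exits 0 should return Ok(()).
        assert!(preconditions_met(Some("/bin/true".to_string())).await.is_ok());
    }
}
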
impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, T, U> {
    fn new(
        config: &'a LocalWorkerConfig,
        grpc_client: T,
        worker_id: String,
        running_actions_manager: Arc<U>,
        metrics: Arc<Metrics>,
    ) -> Self {
        Self {
            config,
            grpc_client,
            worker_id,
            running_actions_manager,
            // Number of actions that have been received in `Update::StartAction`, but
            // not yet processed by running_actions_manager's spawn. This number should
            // always be zero if there are no actions running and no actions being waited
            // on by the scheduler.
            actions_in_transit: Arc::new(AtomicU64::new(0)),
            metrics,
        }
    }

    /// Starts a background spawn/thread that will send a message to the server every `timeout / 2`.
    async fn start_keep_alive(&self) -> Result<(), Error> {
        // According to tonic's documentation this call should be cheap and is the same stream.
        let mut grpc_client = self.grpc_client.clone();

        loop {
            let timeout = self
                .config
                .worker_api_endpoint
                .timeout
                .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
            // We always send 2 keep alive requests per timeout. Http2 should manage most of our
            // timeout issues, this is a secondary check to ensure we can still send data.
            sleep(Duration::from_secs_f32(timeout / 2.)).await;
            // coverage branch (155:20): [True: 0, False: 0]
            if let Err(e) = grpc_client
                .keep_alive(KeepAliveRequest {
                    worker_id: self.worker_id.clone(),
                })
                .await
            {
                return Err(make_err!(
                    Code::Internal,
                    "Failed to send KeepAlive in LocalWorker : {:?}",
                    e
                ));
            }
        }
    }

    async fn run(
        &self,
        update_for_worker_stream: Streaming<UpdateForWorker>,
        shutdown_rx: &mut broadcast::Receiver<ShutdownGuard>,
    ) -> Result<(), Error> {
        // This big block of logic is designed to help simplify upstream components. Upstream
        // components can write standard futures that return a `Result<(), Error>` and this block
        // will forward the error up to the client and disconnect from the scheduler.
        // It is a common use case that an item sent through update_for_worker_stream will always
        // have a response but the response will be triggered through a callback to the scheduler.
        // This can be quite tricky to manage, so what we have done here is given access to a
        // `futures` variable which because this is in a single thread as well as a channel that you
        // send a future into that makes it into the `futures` variable.
        // This means that if you want to perform an action based on the result of the future
        // you use the `.map()` method and the new action will always come to live in this spawn,
        // giving mutable access to stuff in this struct.
        // NOTE: If you ever return from this function it will disconnect from the scheduler.
        let mut futures = FuturesUnordered::new();
        futures.push(self.start_keep_alive().boxed());

        let (add_future_channel, add_future_rx) = mpsc::unbounded_channel();
        let mut add_future_rx = UnboundedReceiverStream::new(add_future_rx).fuse();

        let mut update_for_worker_stream = update_for_worker_stream.fuse();
        // A notify which is triggered every time actions_in_flight is subtracted.
        let actions_notify = Arc::new(tokio::sync::Notify::new());
        // A counter of actions that are in-flight, this is similar to actions_in_transit but
        // includes the AC upload and notification to the scheduler.
        let actions_in_flight = Arc::new(AtomicU64::new(0));
        // Set to true when shutting down, this stops any new StartAction.
        let mut shutting_down = false;

        loop {
            select! {
                maybe_update = update_for_worker_stream.next() => {
                    match maybe_update
                        .err_tip(|| "UpdateForWorker stream closed early")?
                        .err_tip(|| "Got error in UpdateForWorker stream")?
                        .update
                        .err_tip(|| "Expected update to exist in UpdateForWorker")?
                    {
                        Update::ConnectionResult(_) => {
                            return Err(make_input_err!(
                                "Got ConnectionResult in LocalWorker::run which should never happen"
                            ));
                        }
                        // TODO(palfrey) We should possibly do something with this notification.
                        Update::Disconnect(()) => {
                            self.metrics.disconnects_received.inc();
                        }
                        Update::KeepAlive(()) => {
                            self.metrics.keep_alives_received.inc();
                        }
                        Update::KillOperationRequest(kill_operation_request) => {
                            let operation_id = OperationId::from(kill_operation_request.operation_id);
                            // coverage branch (225:36): [True: 0, False: 1]
                            if let Err(err) = self.running_actions_manager.kill_operation(&operation_id).await {
                                error!(
                                    ?operation_id,
                                    ?err,
                                    "Failed to send kill request for operation"
                                );
                            }
                        }
                        Update::StartAction(start_execute) => {
                            // Don't accept any new requests if we're shutting down.
                            // coverage branch (235:32): [True: 0, False: 4]
                            if shutting_down {
                                // coverage branch (236:40): [True: 0, False: 0]
                                if let Some(instance_name) = start_execute.execute_request.map(|request| request.instance_name) {
                                    self.grpc_client.clone().execution_response(
                                        ExecuteResult{
                                            worker_id: self.worker_id.clone(),
                                            instance_name,
                                            operation_id: start_execute.operation_id,
                                            result: Some(execute_result::Result::InternalError(make_err!(Code::ResourceExhausted, "Worker shutting down").into())),
                                        }
                                    ).await?;
                                }
                                continue;
                            }

                            self.metrics.start_actions_received.inc();

                            let execute_request = start_execute.execute_request.as_ref();
                            let operation_id = start_execute.operation_id.clone();
                            let maybe_instance_name = execute_request.map(|v| v.instance_name.clone());
                            let action_digest = execute_request.and_then(|v| v.action_digest.clone());
                            let digest_hasher = execute_request
                                .ok_or_else(|| make_input_err!("Expected execute_request to be set"))
                                .and_then(|v| DigestHasherFunc::try_from(v.digest_function))
                                .err_tip(|| "In LocalWorkerImpl::new()")?;

                            let start_action_fut = {
                                let precondition_script_cfg = self.config.experimental_precondition_script.clone();
                                let actions_in_transit = self.actions_in_transit.clone();
                                let worker_id = self.worker_id.clone();
                                let running_actions_manager = self.running_actions_manager.clone();
                                let execute_complete = maybe_instance_name.as_ref().map(|instance_name| ExecuteComplete{
                                    worker_id: worker_id.clone(),
                                    instance_name: instance_name.clone(),
                                    operation_id: operation_id.clone(),
                                });
                                let mut grpc_client = self.grpc_client.clone();
                                self.metrics.clone().wrap(move |metrics| async move {
                                    metrics.preconditions.wrap(preconditions_met(precondition_script_cfg))
                                    .and_then(|()| running_actions_manager.create_and_add_action(worker_id, start_execute))
                                    .map(move |r| {
                                        // Now that we either failed or registered our action, we can
                                        // consider the action to no longer be in transit.
                                        actions_in_transit.fetch_sub(1, Ordering::Release);
                                        r
                                    })
                                    .and_then(move |action| {
                                        debug!(
                                            operation_id = ?action.get_operation_id(),
                                            "Received request to run action"
                                        );
                                        action
                                            .clone()
                                            .prepare_action()
                                            .and_then(RunningAction::execute)
                                            .and_then(move |result| async move {
                                                // Notify that we're completed with execution and simply uploading results.
                                                // We only do this on success as otherwise the action may be retried and
                                                // cause a conflict with our existing operation ID.
                                                // coverage branch (293:56): [True: 2, False: 0]
                                                if let Some(execute_complete) = execute_complete {
                                                    drop(grpc_client.execution_complete(execute_complete).await);
                                                }
                                                Ok(result)
                                            })
                                            .and_then(RunningAction::upload_results)
                                            .and_then(RunningAction::get_finished_result)
                                            // Note: We need ensure we run cleanup even if one of the other steps fail.
                                            .then(move |result| async move {
                                                // coverage branch (302:56): [True: 0, False: 2]
                                                if let Err(e) = action.cleanup().await {
                                                    return Result::<ActionResult, Error>::Err(e).merge(result);
                                                }
                                                result
                                            })
                                    }).await
                                })
                            };

                            let make_publish_future = {
                                let mut grpc_client = self.grpc_client.clone();

                                let worker_id = self.worker_id.clone();
                                let running_actions_manager = self.running_actions_manager.clone();
                                move |res: Result<ActionResult, Error>| async move {
                                    let instance_name = maybe_instance_name
                                        .err_tip(|| "`instance_name` could not be resolved; this is likely an internal error in local_worker.")?;
                                    match res {
                                        Ok(mut action_result) => {
                                            // Save in the action cache before notifying the scheduler that we've completed.
                                            // coverage branch (322:52): [True: 2, False: 0]
                                            if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) {
                                                // coverage branch (323:56): [True: 0, False: 2]
                                                if let Err(err) = running_actions_manager.cache_action_result(digest_info, &mut action_result, digest_hasher).await {
                                                    error!(
                                                        ?err,
                                                        ?action_digest,
                                                        "Error saving action in store",
                                                    );
                                                }
                                            }
                                            let action_stage = ActionStage::Completed(action_result);
                                            grpc_client.execution_response(
                                                ExecuteResult{
                                                    worker_id,
                                                    instance_name,
                                                    operation_id,
                                                    result: Some(execute_result::Result::ExecuteResponse(action_stage.into())),
                                                }
                                            )
                                            .await
                                            .err_tip(|| "Error while calling execution_response")?;
                                        },
                                        Err(e) => {
                                            grpc_client.execution_response(ExecuteResult{
                                                worker_id,
                                                instance_name,
                                                operation_id,
                                                result: Some(execute_result::Result::InternalError(e.into())),
                                            }).await.err_tip(|| "Error calling execution_response with error")?;
                                        },
                                    }
                                    Ok(())
                                }
                            };

                            self.actions_in_transit.fetch_add(1, Ordering::Release);

                            let add_future_channel = add_future_channel.clone();

                            info_span!(
                                "worker_start_action_ctx",
                                digest_function = %digest_hasher.to_string(),
                            ).in_scope(|| {
                                let _guard = Context::current_with_value(digest_hasher)
                                    .attach();

                                let actions_in_flight = actions_in_flight.clone();
                                let actions_notify = actions_notify.clone();
                                let actions_in_flight_fail = actions_in_flight.clone();
                                let actions_notify_fail = actions_notify.clone();
                                actions_in_flight.fetch_add(1, Ordering::Release);

                                futures.push(
                                    spawn!("worker_start_action", start_action_fut).map(move |res| {
                                        let res = res.err_tip(|| "Failed to launch spawn")?;
                                        // coverage branch (376:48): [True: 1, False: 2]
                                        if let Err(err) = &res {
                                            error!(?err, "Error executing action");
                                        }
                                        add_future_channel
                                            .send(make_publish_future(res).then(move |res| {
                                                actions_in_flight.fetch_sub(1, Ordering::Release);
                                                actions_notify.notify_one();
                                                core::future::ready(res)
                                            }).boxed())
                                            .map_err(|_| make_err!(Code::Internal, "LocalWorker could not send future"))?;
                                        Ok(())
                                    })
                                    .or_else(move |err| {
                                        // If the make_publish_future is not run we still need to notify.
                                        actions_in_flight_fail.fetch_sub(1, Ordering::Release);
                                        actions_notify_fail.notify_one();
                                        core::future::ready(Err(err))
                                    })
                                    .boxed()
                                );
                            });
                        }
                    }
                },
                res = add_future_rx.next() => {
                    let fut = res.err_tip(|| "New future stream receives should never be closed")?;
                    futures.push(fut);
                },
                res = futures.next() => res.err_tip(|| "Keep-alive should always pending. Likely unable to send data to scheduler")??,
                complete_msg = shutdown_rx.recv().fuse() => {
                    warn!("Worker loop received shutdown signal. Shutting down worker...",);
                    let mut grpc_client = self.grpc_client.clone();
                    let worker_id = self.worker_id.clone();
                    let shutdown_guard = complete_msg.map_err(|e| make_err!(Code::Internal, "Failed to receive shutdown message: {e:?}"))?;
                    let actions_in_flight = actions_in_flight.clone();
                    let actions_notify = actions_notify.clone();
                    let shutdown_future = async move {
                        // Wait for in-flight operations to be fully completed.
                        // coverage branch (414:31): [True: 0, False: 0]
                        while actions_in_flight.load(Ordering::Acquire) > 0 {
                            actions_notify.notified().await;
                        }
                        // Sending this message immediately evicts all jobs from
                        // this worker, of which there should be none.
                        // coverage branch (419:32): [True: 0, False: 0]
                        if let Err(e) = grpc_client.going_away(GoingAwayRequest { worker_id }).await {
                            error!("Failed to send GoingAwayRequest: {e}",);
                            return Err(e.into());
                        }
                        // Allow shutdown to occur now.
                        drop(shutdown_guard);
                        Ok::<(), Error>(())
                    };
                    futures.push(shutdown_future.boxed());
                    shutting_down = true;
                },
            };
        }
        // Unreachable.
    }
}

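The comment block at the top of `run` above describes the scheduling trick this worker relies on: a single `select!` loop owns a `FuturesUnordered`, and follow-up work (such as `make_publish_future`) is sent back into that same pool through an unbounded channel so it runs on the same loop. A stripped-down, self-contained sketch of that pattern, assuming only the `tokio`, `futures`, and `tokio-stream` crates; the names and payloads are illustrative, not from this crate:

use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt, select};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() {
    // The pool of currently-running work, owned by the single select! loop.
    let mut futures: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();
    // The side channel that lets a completed future queue a follow-up future
    // back into the same pool (the role add_future_channel plays above).
    let (add_future_tx, add_future_rx) = mpsc::unbounded_channel::<BoxFuture<'static, ()>>();
    let mut add_future_rx = UnboundedReceiverStream::new(add_future_rx).fuse();

    // First future: does some "work", then queues a "publish" step.
    futures.push(
        async move {
            println!("action finished; queueing publish step");
            let _ = add_future_tx.send(async { println!("publish step ran") }.boxed());
        }
        .boxed(),
    );

    // Drive both sources; stop once the two futures have completed.
    let mut completed = 0;
    while completed < 2 {
        select! {
            fut = add_future_rx.next() => {
                if let Some(fut) = fut {
                    futures.push(fut);
                }
            },
            res = futures.next() => {
                if res.is_some() {
                    completed += 1;
                }
            },
        }
    }
}

The production loop additionally tracks `actions_in_flight` and reacts to shutdown, but the channel-plus-`FuturesUnordered` core is the same.
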
type ConnectionFactory<T> = Box<dyn Fn() -> BoxFuture<'static, Result<T, Error>> + Send + Sync>;

pub struct LocalWorker<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> {
    config: Arc<LocalWorkerConfig>,
    running_actions_manager: Arc<U>,
    connection_factory: ConnectionFactory<T>,
    sleep_fn: Option<Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>>,
    metrics: Arc<Metrics>,
}

impl<T: WorkerApiClientTrait + core::fmt::Debug, U: RunningActionsManager + core::fmt::Debug>
    core::fmt::Debug for LocalWorker<T, U>
{
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("LocalWorker")
            .field("config", &self.config)
            .field("running_actions_manager", &self.running_actions_manager)
            .field("metrics", &self.metrics)
            .finish_non_exhaustive()
    }
}

/// Creates a new `LocalWorker`. The `cas_store` must be an instance of
/// `FastSlowStore` and will be checked at runtime.
pub async fn new_local_worker(
    config: Arc<LocalWorkerConfig>,
    cas_store: Store,
    ac_store: Option<Store>,
    historical_store: Store,
) -> Result<LocalWorker<WorkerApiClientWrapper, RunningActionsManagerImpl>, Error> {
    let fast_slow_store = cas_store
        .downcast_ref::<FastSlowStore>(None)
        .err_tip(|| "Expected store for LocalWorker's store to be a FastSlowStore")?
        .get_arc()
        .err_tip(|| "FastSlowStore's Arc doesn't exist")?;

    // coverage branch (472:12): [True: 1, False: 1]
    if let Ok(path) = fs::canonicalize(&config.work_directory).await {
        fs::remove_dir_all(path)
            .await
            .err_tip(|| "Could not remove work_directory in LocalWorker")?;
    }

    fs::create_dir_all(&config.work_directory)
        .await
        .err_tip(|| format!("Could not make work_directory : {}", config.work_directory))?;
    // coverage branch (481:25): [True: 2, False: 0]
    let entrypoint = if config.entrypoint.is_empty() {
        None
    } else {
        Some(config.entrypoint.clone())
    };
    // coverage branch (486:33): [True: 2, False: 0]
    let max_action_timeout = if config.max_action_timeout == 0 {
        DEFAULT_MAX_ACTION_TIMEOUT
    } else {
        Duration::from_secs(config.max_action_timeout as u64)
    };
    let running_actions_manager =
        Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
            root_action_directory: config.work_directory.clone(),
            execution_configuration: ExecutionConfiguration {
                entrypoint,
                additional_environment: config.additional_environment.clone(),
            },
            cas_store: fast_slow_store,
            ac_store,
            historical_store,
            upload_action_result_config: &config.upload_action_result,
            max_action_timeout,
            timeout_handled_externally: config.timeout_handled_externally,
        })?);
    let local_worker = LocalWorker::new_with_connection_factory_and_actions_manager(
        config.clone(),
        running_actions_manager,
        Box::new(move || {
            let config = config.clone();
            Box::pin(async move {
                let timeout = config
                    .worker_api_endpoint
                    .timeout
                    .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
                let timeout_duration = Duration::from_secs_f32(timeout);
                let tls_config =
                    tls_utils::load_client_config(&config.worker_api_endpoint.tls_config)
                        .err_tip(|| "Parsing local worker TLS configuration")?;
                let endpoint =
                    tls_utils::endpoint_from(&config.worker_api_endpoint.uri, tls_config)
                        .map_err(|e| make_input_err!("Invalid URI for worker endpoint : {e:?}"))?
                        .connect_timeout(timeout_duration)
                        .timeout(timeout_duration);

                let transport = endpoint.connect().await.map_err(|e| {
                    make_err!(
                        Code::Internal,
                        "Could not connect to endpoint {}: {e:?}",
                        config.worker_api_endpoint.uri
                    )
                })?;
                Ok(WorkerApiClient::new(transport).into())
            })
        }),
        Box::new(move |d| Box::pin(sleep(d))),
    );
    Ok(local_worker)
}

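A sketch of how `new_local_worker` is typically driven at startup. The store handles and the shutdown channel are owned by the surrounding binary, so they appear here only as assumed inputs:

// Assumed context: `cas_store`, `ac_store`, and `historical_store` are built
// elsewhere from the store configuration; only the wiring below uses this file's API.
async fn start_worker(
    config: Arc<LocalWorkerConfig>,
    cas_store: Store,
    ac_store: Option<Store>,
    historical_store: Store,
) -> Result<(), Error> {
    let worker = new_local_worker(config, cas_store, ac_store, historical_store).await?;
    // Keep the sender alive for as long as the worker runs so the shutdown
    // channel is not treated as closed.
    let (_shutdown_tx, shutdown_rx) = broadcast::channel::<ShutdownGuard>(1);
    // `run` owns the connect/reconnect loop and only returns on a fatal error.
    worker.run(shutdown_rx).await
}

Note that `new_local_worker` clears and recreates `work_directory` before the manager is constructed, so the directory named in the config should be dedicated to this worker.
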
impl<T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorker<T, U> {
    pub fn new_with_connection_factory_and_actions_manager(
        config: Arc<LocalWorkerConfig>,
        running_actions_manager: Arc<U>,
        connection_factory: ConnectionFactory<T>,
        sleep_fn: Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>,
    ) -> Self {
        let metrics = Arc::new(Metrics::new(Arc::downgrade(
            running_actions_manager.metrics(),
        )));
        Self {
            config,
            running_actions_manager,
            connection_factory,
            sleep_fn: Some(sleep_fn),
            metrics,
        }
    }

    #[allow(
        clippy::missing_const_for_fn,
        reason = "False positive on stable, but not on nightly"
    )]
    pub fn name(&self) -> &String {
        &self.config.name
    }

    async fn register_worker(
        &self,
        client: &mut T,
    ) -> Result<(String, Streaming<UpdateForWorker>), Error> {
        let connect_worker_request =
            make_connect_worker_request(self.config.name.clone(), &self.config.platform_properties)
                .await?;
        let mut update_for_worker_stream = client
            .connect_worker(connect_worker_request)
            .await
            .err_tip(|| "Could not call connect_worker() in worker")?
            .into_inner();

        let first_msg_update = update_for_worker_stream
            .next()
            .await
            .err_tip(|| "Got EOF expected UpdateForWorker")?
            .err_tip(|| "Got error when receiving UpdateForWorker")?
            .update;

        let worker_id = match first_msg_update {
            Some(Update::ConnectionResult(connection_result)) => connection_result.worker_id,
            other => {
                return Err(make_input_err!(
                    "Expected first response from scheduler to be a ConnectResult got : {:?}",
                    other
                ));
            }
        };
        Ok((worker_id, update_for_worker_stream))
    }

    #[instrument(skip(self), level = Level::INFO)]
    pub async fn run(
        mut self,
        mut shutdown_rx: broadcast::Receiver<ShutdownGuard>,
    ) -> Result<(), Error> {
        let sleep_fn = self
            .sleep_fn
            .take()
            .err_tip(|| "Could not unwrap sleep_fn in LocalWorker::run")?;
        let sleep_fn_pin = Pin::new(&sleep_fn);
        let error_handler = Box::pin(move |err| async move {
            error!(?err, "Error");
            (sleep_fn_pin)(Duration::from_secs_f32(CONNECTION_RETRY_DELAY_S)).await;
        });

        loop {
            // First connect to our endpoint.
            let mut client = match (self.connection_factory)().await {
                Ok(client) => client,
                Err(e) => {
                    (error_handler)(e).await;
                    continue; // Try to connect again.
                }
            };

            // Next register our worker with the scheduler.
            let (inner, update_for_worker_stream) = match self.register_worker(&mut client).await {
                Err(e) => {
                    (error_handler)(e).await;
                    continue; // Try to connect again.
                }
                Ok((worker_id, update_for_worker_stream)) => (
                    LocalWorkerImpl::new(
                        &self.config,
                        client,
                        worker_id,
                        self.running_actions_manager.clone(),
                        self.metrics.clone(),
                    ),
                    update_for_worker_stream,
                ),
            };
            info!(
                worker_id = %inner.worker_id,
                "Worker registered with scheduler"
            );

            // Now listen for connections and run all other services.
            if let Err(err) = inner.run(update_for_worker_stream, &mut shutdown_rx).await {
                'no_more_actions: {
                    // Ensure there are no actions in transit before we try to kill
                    // all our actions.
                    const ITERATIONS: usize = 1_000;

                    const ERROR_MSG: &str = "Actions in transit did not reach zero before we disconnected from the scheduler";

                    let sleep_duration = ACTIONS_IN_TRANSIT_TIMEOUT_S / ITERATIONS as f32;
                    for _ in 0..ITERATIONS {
                        if inner.actions_in_transit.load(Ordering::Acquire) == 0 {
                            break 'no_more_actions;
                        }
                        (sleep_fn_pin)(Duration::from_secs_f32(sleep_duration)).await;
                    }
                    error!(ERROR_MSG);
                    return Err(err.append(ERROR_MSG));
                }
                error!(?err, "Worker disconnected from scheduler");
                // Kill off any existing actions because if we re-connect, we'll
                // get some more and it might resource lock us.
                self.running_actions_manager.kill_all().await;

                (error_handler)(err).await; // Try to connect again.
            }
        }
        // Unreachable.
    }
}

#[derive(Debug, MetricsComponent)]
pub struct Metrics {
    #[metric(
        help = "Total number of actions sent to this worker to process. This does not mean it started them, it just means it received a request to execute it."
    )]
    start_actions_received: CounterWithTime,
    #[metric(help = "Total number of disconnects received from the scheduler.")]
    disconnects_received: CounterWithTime,
    #[metric(help = "Total number of keep-alives received from the scheduler.")]
    keep_alives_received: CounterWithTime,
    #[metric(
        help = "Stats about the calls to check if an action satisfies the config supplied script."
    )]
    preconditions: AsyncCounterWrapper,
    #[metric]
    #[allow(
        clippy::struct_field_names,
        reason = "TODO Fix this. Triggers on nightly"
    )]
    running_actions_manager_metrics: Weak<RunningActionManagerMetrics>,
}

impl RootMetricsComponent for Metrics {}

impl Metrics {
    fn new(running_actions_manager_metrics: Weak<RunningActionManagerMetrics>) -> Self {
        Self {
            start_actions_received: CounterWithTime::default(),
            disconnects_received: CounterWithTime::default(),
            keep_alives_received: CounterWithTime::default(),
            preconditions: AsyncCounterWrapper::default(),
            running_actions_manager_metrics,
        }
    }
}

impl Metrics {
    async fn wrap<U, T: Future<Output = U>, F: FnOnce(Arc<Self>) -> T>(
        self: Arc<Self>,
        fut: F,
    ) -> U {
        fut(self).await
    }
}
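
`Metrics::wrap` simply hands the `Arc<Metrics>` to the closure and awaits the returned future, which is how call sites such as the `Update::StartAction` arm record metrics without holding a borrow across an await. A minimal usage sketch, assuming a `#[cfg(test)]` module in this file and the `tokio` test macro (both assumptions):

#[cfg(test)]
mod metrics_wrap_sketch {
    use super::*;

    #[tokio::test]
    async fn wrap_passes_self_and_returns_closure_output() {
        // `Weak::new()` gives a dangling weak pointer, which is enough here.
        let metrics = Arc::new(Metrics::new(Weak::new()));
        let answer = metrics
            .wrap(|metrics| async move {
                metrics.keep_alives_received.inc();
                42_u32
            })
            .await;
        assert_eq!(answer, 42);
    }
}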