/build/source/nativelink-worker/src/local_worker.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::hash::BuildHasher; |
16 | | use core::pin::Pin; |
17 | | use core::str; |
18 | | use core::sync::atomic::{AtomicU64, Ordering}; |
19 | | use core::time::Duration; |
20 | | use std::borrow::Cow; |
21 | | use std::collections::HashMap; |
22 | | use std::env; |
23 | | use std::process::Stdio; |
24 | | use std::sync::{Arc, Weak}; |
25 | | |
26 | | use futures::future::BoxFuture; |
27 | | use futures::stream::FuturesUnordered; |
28 | | use futures::{Future, FutureExt, StreamExt, TryFutureExt, select}; |
29 | | use nativelink_config::cas_server::{EnvironmentSource, LocalWorkerConfig}; |
30 | | use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err}; |
31 | | use nativelink_metric::{MetricsComponent, RootMetricsComponent}; |
32 | | use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update; |
33 | | use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient; |
34 | | use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{ |
35 | | ActionResourceUsage, ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, |
36 | | UpdateForWorker, execute_result, |
37 | | }; |
38 | | use nativelink_store::fast_slow_store::FastSlowStore; |
39 | | use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId}; |
40 | | use nativelink_util::common::fs; |
41 | | use nativelink_util::digest_hasher::DigestHasherFunc; |
42 | | use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime}; |
43 | | use nativelink_util::shutdown_guard::ShutdownGuard; |
44 | | use nativelink_util::store_trait::Store; |
45 | | use nativelink_util::{spawn, tls_utils}; |
46 | | use opentelemetry::context::Context; |
47 | | use tokio::sync::{broadcast, mpsc}; |
48 | | use tokio::{process, time}; |
49 | | use tokio_stream::wrappers::UnboundedReceiverStream; |
50 | | use tonic::Streaming; |
51 | | use tracing::{Level, debug, error, event, info, info_span, instrument, trace, warn}; |
52 | | |
53 | | use crate::running_actions_manager::{ |
54 | | ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction, |
55 | | RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl, |
56 | | }; |
57 | | use crate::worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper}; |
58 | | use crate::worker_utils::make_connect_worker_request; |
59 | | |
60 | | /// Amount of time to wait if we have actions in transit before we try to |
61 | | /// consider an error to have occurred. |
62 | | const ACTIONS_IN_TRANSIT_TIMEOUT_S: f32 = 10.; |
63 | | |
64 | | /// If we lose connection to the worker api server we will wait this many seconds |
65 | | /// before trying to connect. |
66 | | const CONNECTION_RETRY_DELAY_S: f32 = 0.5; |
67 | | |
68 | | /// Default endpoint timeout. If this value gets modified the documentation in |
69 | | /// `cas_server.rs` must also be updated. |
70 | | const DEFAULT_ENDPOINT_TIMEOUT_S: f32 = 5.; |
71 | | |
72 | | /// Default maximum amount of time a task is allowed to run for. |
73 | | /// If this value gets modified the documentation in `cas_server.rs` must also be updated. |
74 | | const DEFAULT_MAX_ACTION_TIMEOUT: Duration = Duration::from_mins(20); |
75 | | const DEFAULT_MAX_UPLOAD_TIMEOUT: Duration = Duration::from_mins(10); |
76 | | |
77 | | struct FinishedActionResult { |
78 | | action_result: ActionResult, |
79 | | resource_usage: Option<ActionResourceUsage>, |
80 | | } |
81 | | |
82 | | struct LocalWorkerImpl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> { |
83 | | config: &'a LocalWorkerConfig, |
84 | | // According to the tonic documentation it is a cheap operation to clone this. |
85 | | grpc_client: T, |
86 | | worker_id: String, |
87 | | running_actions_manager: Arc<U>, |
88 | | // Number of actions that have been received in `Update::StartAction`, but |
89 | | // not yet processed by running_actions_manager's spawn. This number should |
90 | | // always be zero if there are no actions running and no actions being waited |
91 | | // on by the scheduler. |
92 | | actions_in_transit: Arc<AtomicU64>, |
93 | | metrics: Arc<Metrics>, |
94 | | } |
95 | | |
96 | 7 | pub async fn preconditions_met<H: BuildHasher + Sync>( |
97 | 7 | precondition_script: Option<String>, |
98 | 7 | extra_envs: &HashMap<String, String, H>, |
99 | 7 | ) -> Result<(), Error> { |
100 | 7 | let Some(precondition_script2 ) = &precondition_script else { |
101 | | // No script means we are always ok to proceed. |
102 | 5 | return Ok(()); |
103 | | }; |
104 | | // TODO: Might want to pass some information about the command to the |
105 | | // script, but at this point it's not even been downloaded yet, |
106 | | // so that's not currently possible. Perhaps we'll move this in |
107 | | // future to pass useful information through? Or perhaps we'll |
108 | | // have a pre-condition and a pre-execute script instead, although |
109 | | // arguably entrypoint already gives us that. |
110 | | |
111 | 2 | let maybe_split_cmd = shlex::split(precondition_script); |
112 | 2 | let (command, args) = match &maybe_split_cmd { |
113 | 2 | Some(split_cmd) => (&split_cmd[0], &split_cmd[1..]), |
114 | | None => { |
115 | 0 | return Err(make_input_err!( |
116 | 0 | "Could not parse the value of precondition_script: '{}'", |
117 | 0 | precondition_script, |
118 | 0 | )); |
119 | | } |
120 | | }; |
121 | | |
122 | 2 | let precondition_process = process::Command::new(command) |
123 | 2 | .args(args) |
124 | 2 | .kill_on_drop(true) |
125 | 2 | .stdin(Stdio::null()) |
126 | 2 | .stdout(Stdio::piped()) |
127 | 2 | .stderr(Stdio::null()) |
128 | 2 | .env_clear() |
129 | 2 | .envs(extra_envs) |
130 | 2 | .spawn() |
131 | 2 | .err_tip(|| format!0 ("Could not execute precondition command {precondition_script:?}"))?0 ; |
132 | 2 | let output = precondition_process.wait_with_output().await?0 ; |
133 | 2 | let stdout = str::from_utf8(&output.stdout).unwrap_or(""); |
134 | 2 | trace!(status = %output.status, %stdout, "Preconditions script returned"); |
135 | 2 | if output.status.code() == Some(0) { |
136 | 1 | Ok(()) |
137 | | } else { |
138 | 1 | Err(make_err!( |
139 | 1 | Code::ResourceExhausted, |
140 | 1 | "Preconditions script returned status {} - {}", |
141 | 1 | output.status, |
142 | 1 | stdout |
143 | 1 | )) |
144 | | } |
145 | 7 | } |
146 | | |
147 | | impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorkerImpl<'a, T, U> { |
148 | 8 | fn new( |
149 | 8 | config: &'a LocalWorkerConfig, |
150 | 8 | grpc_client: T, |
151 | 8 | worker_id: String, |
152 | 8 | running_actions_manager: Arc<U>, |
153 | 8 | metrics: Arc<Metrics>, |
154 | 8 | ) -> Self { |
155 | 8 | Self { |
156 | 8 | config, |
157 | 8 | grpc_client, |
158 | 8 | worker_id, |
159 | 8 | running_actions_manager, |
160 | 8 | // Number of actions that have been received in `Update::StartAction`, but |
161 | 8 | // not yet processed by running_actions_manager's spawn. This number should |
162 | 8 | // always be zero if there are no actions running and no actions being waited |
163 | 8 | // on by the scheduler. |
164 | 8 | actions_in_transit: Arc::new(AtomicU64::new(0)), |
165 | 8 | metrics, |
166 | 8 | } |
167 | 8 | } |
168 | | |
169 | | /// Starts a background spawn/thread that will send a message to the server every `timeout / 2`. |
170 | 8 | async fn start_keep_alive(&self) -> Result<(), Error> { |
171 | | // According to tonic's documentation this call should be cheap and is the same stream. |
172 | 8 | let mut grpc_client = self.grpc_client.clone(); |
173 | 8 | let timeout = self |
174 | 8 | .config |
175 | 8 | .worker_api_endpoint |
176 | 8 | .timeout |
177 | 8 | .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S); |
178 | | |
179 | 8 | info!(timeout, "Started KeepAlive"); |
180 | | |
181 | | // We always send 2 keep alive requests per timeout. Http2 should manage most of our |
182 | | // timeout issues, this is a secondary check to ensure we can still send data. |
183 | 8 | let mut interval = time::interval(Duration::from_secs_f32(timeout) / 2); |
184 | 8 | interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); |
185 | | |
186 | | // Skip the first interval as it happens immediately and we don't need a keep alive until timeout/2 has passed |
187 | 8 | interval.tick().await; |
188 | | |
189 | | // Explicitly spawn the keep alive loop so it goes onto a different thread from the execute commands |
190 | 1 | drop( |
191 | 2 | spawn!("keep alives", async move { |
192 | | loop { |
193 | 3 | interval.tick().await; |
194 | 2 | if let Err(e1 ) = grpc_client.keep_alive(KeepAliveRequest {}).await { |
195 | 1 | error!(?e, "Failed to send KeepAlive in LocalWorker"); |
196 | 1 | return; |
197 | 1 | } |
198 | 1 | debug!("Sent KeepAlive"); |
199 | | } |
200 | 1 | }) |
201 | 2 | .await, |
202 | | ); |
203 | 1 | Ok(()) |
204 | 1 | } |
205 | | |
206 | 8 | async fn run( |
207 | 8 | &self, |
208 | 8 | update_for_worker_stream: Streaming<UpdateForWorker>, |
209 | 8 | shutdown_rx: &mut broadcast::Receiver<ShutdownGuard>, |
210 | 8 | ) -> Result<(), Error> { |
211 | | // This big block of logic is designed to help simplify upstream components. Upstream |
212 | | // components can write standard futures that return a `Result<(), Error>` and this block |
213 | | // will forward the error up to the client and disconnect from the scheduler. |
214 | | // It is a common use case that an item sent through update_for_worker_stream will always |
215 | | // have a response but the response will be triggered through a callback to the scheduler. |
216 | | // This can be quite tricky to manage, so what we have done here is given access to a |
217 | | // `futures` variable which because this is in a single thread as well as a channel that you |
218 | | // send a future into that makes it into the `futures` variable. |
219 | | // This means that if you want to perform an action based on the result of the future |
220 | | // you use the `.map()` method and the new action will always come to live in this spawn, |
221 | | // giving mutable access to stuff in this struct. |
222 | | // NOTE: If you ever return from this function it will disconnect from the scheduler. |
223 | 8 | let mut futures = FuturesUnordered::new(); |
224 | 8 | futures.push(self.start_keep_alive().boxed()); |
225 | | |
226 | 8 | let (add_future_channel, add_future_rx) = mpsc::unbounded_channel(); |
227 | 8 | let mut add_future_rx = UnboundedReceiverStream::new(add_future_rx).fuse(); |
228 | | |
229 | 8 | let mut update_for_worker_stream = update_for_worker_stream.fuse(); |
230 | | // A notify which is triggered every time actions_in_flight is subtracted. |
231 | 8 | let actions_notify = Arc::new(tokio::sync::Notify::new()); |
232 | | // A counter of actions that are in-flight, this is similar to actions_in_transit but |
233 | | // includes the AC upload and notification to the scheduler. |
234 | 8 | let actions_in_flight = Arc::new(AtomicU64::new(0)); |
235 | | // Set to true when shutting down, this stops any new StartAction. |
236 | 8 | let mut shutting_down = false; |
237 | | |
238 | | loop { |
239 | 26 | select! { |
240 | 26 | maybe_update8 = update_for_worker_stream.next() => if !shutting_down8 || maybe_update0 .is_some0 () { |
241 | 8 | match maybe_update |
242 | 8 | .err_tip(|| "UpdateForWorker stream closed early")?1 |
243 | 7 | .err_tip(|| "Got error in UpdateForWorker stream")?0 |
244 | | .update |
245 | 7 | .err_tip(|| "Expected update to exist in UpdateForWorker")?0 |
246 | | { |
247 | | Update::ConnectionResult(_) => { |
248 | 0 | return Err(make_input_err!( |
249 | 0 | "Got ConnectionResult in LocalWorker::run which should never happen" |
250 | 0 | )); |
251 | | } |
252 | | // TODO(palfrey) We should possibly do something with this notification. |
253 | 0 | Update::Disconnect(()) => { |
254 | 0 | self.metrics.disconnects_received.inc(); |
255 | 0 | } |
256 | 0 | Update::KeepAlive(()) => { |
257 | 0 | self.metrics.keep_alives_received.inc(); |
258 | 0 | } |
259 | 1 | Update::KillOperationRequest(kill_operation_request) => { |
260 | 1 | let operation_id = OperationId::from(kill_operation_request.operation_id); |
261 | 1 | if let Err(err0 ) = self.running_actions_manager.kill_operation(&operation_id).await { |
262 | 0 | error!( |
263 | | %operation_id, |
264 | | ?err, |
265 | | "Failed to send kill request for operation" |
266 | | ); |
267 | 1 | } |
268 | | } |
269 | 6 | Update::StartAction(start_execute) => { |
270 | | // Don't accept any new requests if we're shutting down. |
271 | 6 | if shutting_down { |
272 | 0 | if let Some(instance_name) = start_execute.execute_request.map(|request| request.instance_name) { |
273 | 0 | self.grpc_client.clone().execution_response( |
274 | 0 | ExecuteResult{ |
275 | 0 | instance_name, |
276 | 0 | operation_id: start_execute.operation_id, |
277 | 0 | result: Some(execute_result::Result::InternalError(make_err!(Code::ResourceExhausted, "Worker shutting down").into())), |
278 | 0 | resource_usage: None, |
279 | 0 | } |
280 | 0 | ).await?; |
281 | 0 | } |
282 | 0 | continue; |
283 | 6 | } |
284 | | |
285 | 6 | self.metrics.start_actions_received.inc(); |
286 | | |
287 | 6 | let execute_request = start_execute.execute_request.as_ref(); |
288 | 6 | let operation_id = start_execute.operation_id.clone(); |
289 | 6 | let operation_id_to_log = operation_id.clone(); |
290 | 6 | let maybe_instance_name = execute_request.map(|v| v.instance_name.clone()); |
291 | 6 | let action_digest = execute_request.and_then(|v| v.action_digest.clone()); |
292 | 6 | let digest_hasher = execute_request |
293 | 6 | .ok_or_else(|| make_input_err!0 ("Expected execute_request to be set")) |
294 | 6 | .and_then(|v| DigestHasherFunc::try_from(v.digest_function)) |
295 | 6 | .err_tip(|| "In LocalWorkerImpl::new()")?0 ; |
296 | | |
297 | 6 | let start_action_fut = { |
298 | 6 | let precondition_script_cfg = self.config.experimental_precondition_script.clone(); |
299 | 6 | let mut extra_envs: HashMap<String, String> = HashMap::new(); |
300 | 6 | if let Some(ref additional_environment0 ) = self.config.additional_environment { |
301 | 0 | for (name, source) in additional_environment { |
302 | 0 | let value = match source { |
303 | 0 | EnvironmentSource::Property(property) => start_execute |
304 | 0 | .platform.as_ref().and_then(|p|p.properties.iter().find(|pr| &pr.name == property)) |
305 | 0 | .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.value.as_str())), |
306 | 0 | EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()), |
307 | 0 | EnvironmentSource::FromEnvironment => Cow::Owned(env::var(name).unwrap_or_default()), |
308 | 0 | other => { |
309 | 0 | debug!(?other, "Worker doesn't support this type of additional environment"); |
310 | 0 | continue; |
311 | | } |
312 | | }; |
313 | 0 | extra_envs.insert(name.clone(), value.into_owned()); |
314 | | } |
315 | 6 | } |
316 | 6 | let actions_in_transit = self.actions_in_transit.clone(); |
317 | 6 | let worker_id = self.worker_id.clone(); |
318 | 6 | let running_actions_manager = self.running_actions_manager.clone(); |
319 | 6 | let mut grpc_client = self.grpc_client.clone(); |
320 | 6 | let complete = ExecuteComplete { |
321 | 6 | operation_id: operation_id.clone(), |
322 | 6 | }; |
323 | 6 | self.metrics.clone().wrap(move |metrics| async move { |
324 | 6 | metrics.preconditions.wrap(preconditions_met(precondition_script_cfg, &extra_envs)) |
325 | 6 | .and_then(|()| running_actions_manager5 .create_and_add_action5 (worker_id5 , start_execute5 )) |
326 | 6 | .map(move |r| { |
327 | | // Now that we either failed or registered our action, we can |
328 | | // consider the action to no longer be in transit. |
329 | 6 | actions_in_transit.fetch_sub(1, Ordering::Release); |
330 | 6 | r |
331 | 6 | }) |
332 | 6 | .and_then(|action| {5 |
333 | 5 | debug!( |
334 | 5 | operation_id = %action.get_operation_id(), |
335 | | "Received request to run action" |
336 | | ); |
337 | 5 | action |
338 | 5 | .clone() |
339 | 5 | .prepare_action() |
340 | 5 | .and_then(RunningAction::execute) |
341 | 5 | .and_then(|result| async move {2 |
342 | | // Notify that execution has completed so it can schedule a new action. |
343 | 2 | drop(grpc_client.execution_complete(complete).await); |
344 | 2 | Ok(result) |
345 | 4 | }) |
346 | 5 | .and_then(RunningAction::upload_results) |
347 | 5 | .and_then(|action| async move {2 |
348 | 2 | let resource_usage = action.resource_usage(); |
349 | 2 | let action_result = action.get_finished_result().await?0 ; |
350 | 2 | Ok(FinishedActionResult { |
351 | 2 | action_result, |
352 | 2 | resource_usage, |
353 | 2 | }) |
354 | 4 | }) |
355 | | // Note: We need ensure we run cleanup even if one of the other steps fail. |
356 | 5 | .then(|result| async move {4 |
357 | 4 | if let Err(e0 ) = action.cleanup().await { |
358 | 0 | return Result::<FinishedActionResult, Error>::Err(e).merge(result); |
359 | 4 | } |
360 | 4 | result |
361 | 8 | }) |
362 | 6 | }5 ).await |
363 | 11 | }) |
364 | | }; |
365 | | |
366 | 6 | let make_publish_future = { |
367 | 6 | let mut grpc_client = self.grpc_client.clone(); |
368 | | |
369 | 6 | let running_actions_manager = self.running_actions_manager.clone(); |
370 | 6 | let worker_id = self.worker_id.clone(); |
371 | 5 | move |res: Result<FinishedActionResult, Error>| async move { |
372 | 5 | let instance_name = maybe_instance_name |
373 | 5 | .err_tip(|| "`instance_name` could not be resolved; this is likely an internal error in local_worker.")?0 ; |
374 | 5 | match res { |
375 | 2 | Ok(FinishedActionResult { mut action_result, resource_usage }) => { |
376 | | // Save in the action cache before notifying the scheduler that we've completed. |
377 | 2 | if let Some(digest_info) = action_digest.clone().and_then(|action_digest| action_digest.try_into().ok()) && |
378 | 2 | let Err(err0 ) = running_actions_manager.cache_action_result(digest_info, &mut action_result, digest_hasher).await { |
379 | 0 | error!( |
380 | | ?err, |
381 | | ?action_digest, |
382 | | "Error saving action in store", |
383 | | ); |
384 | 2 | } |
385 | 2 | let action_stage = ActionStage::Completed(action_result); |
386 | 2 | let resource_usage = resource_usage.map(|mut resource_usage| {0 |
387 | 0 | resource_usage.operation_id.clone_from(&operation_id); |
388 | 0 | resource_usage.worker_id.clone_from(&worker_id); |
389 | 0 | resource_usage |
390 | 0 | }); |
391 | 2 | grpc_client.execution_response( |
392 | 2 | ExecuteResult{ |
393 | 2 | instance_name, |
394 | 2 | operation_id, |
395 | 2 | result: Some(execute_result::Result::ExecuteResponse(action_stage.into())), |
396 | 2 | resource_usage, |
397 | 2 | } |
398 | 2 | ) |
399 | 2 | .await |
400 | 0 | .err_tip(|| "Error while calling execution_response")?; |
401 | | }, |
402 | 3 | Err(e) => { |
403 | 3 | let is_cas_blob_missing = e.code == Code::NotFound |
404 | 2 | && e.message_string().contains("not found in either fast or slow store"); |
405 | 3 | if is_cas_blob_missing { |
406 | 1 | warn!( |
407 | | ?e, |
408 | | "Missing CAS inputs during prepare_action, returning FAILED_PRECONDITION" |
409 | | ); |
410 | 1 | let action_result = ActionResult { |
411 | 1 | error: Some(make_err!( |
412 | 1 | Code::FailedPrecondition, |
413 | 1 | "{}", |
414 | 1 | e.message_string() |
415 | 1 | )), |
416 | 1 | ..ActionResult::default() |
417 | 1 | }; |
418 | 1 | let action_stage = ActionStage::Completed(action_result); |
419 | 1 | grpc_client.execution_response(ExecuteResult{ |
420 | 1 | instance_name, |
421 | 1 | operation_id, |
422 | 1 | result: Some(execute_result::Result::ExecuteResponse(action_stage.into())), |
423 | 1 | resource_usage: None, |
424 | 1 | }).await.err_tip0 (|| "Error calling execution_response with missing inputs")?0 ; |
425 | | } else { |
426 | 2 | grpc_client.execution_response(ExecuteResult{ |
427 | 2 | instance_name, |
428 | 2 | operation_id, |
429 | 2 | result: Some(execute_result::Result::InternalError(e.into())), |
430 | 2 | resource_usage: None, |
431 | 2 | }).await.err_tip0 (|| "Error calling execution_response with error")?0 ; |
432 | | } |
433 | | }, |
434 | | } |
435 | 0 | Ok(()) |
436 | 5 | } |
437 | | }; |
438 | | |
439 | 6 | self.actions_in_transit.fetch_add(1, Ordering::Release); |
440 | | |
441 | 6 | let add_future_channel = add_future_channel.clone(); |
442 | | |
443 | 6 | info_span!( |
444 | | "worker_start_action_ctx", |
445 | | operation_id = operation_id_to_log, |
446 | 6 | digest_function = %digest_hasher.to_string(), |
447 | 6 | ).in_scope(|| { |
448 | 6 | let _guard = Context::current_with_value(digest_hasher) |
449 | 6 | .attach(); |
450 | | |
451 | 6 | let actions_in_flight = actions_in_flight.clone(); |
452 | 6 | let actions_notify = actions_notify.clone(); |
453 | 6 | let actions_in_flight_fail = actions_in_flight.clone(); |
454 | 6 | let actions_notify_fail = actions_notify.clone(); |
455 | 6 | actions_in_flight.fetch_add(1, Ordering::Release); |
456 | | |
457 | 6 | futures.push( |
458 | 6 | spawn!("worker_start_action", start_action_fut).map(move |res| {5 |
459 | 5 | let res = res.err_tip(|| "Failed to launch spawn")?0 ; |
460 | 5 | if let Err(err3 ) = &res { |
461 | 3 | error!(?err, "Error executing action"); |
462 | 2 | } |
463 | 5 | add_future_channel |
464 | 5 | .send(make_publish_future(res).then(move |res| {0 |
465 | 0 | actions_in_flight.fetch_sub(1, Ordering::Release); |
466 | 0 | actions_notify.notify_one(); |
467 | 0 | core::future::ready(res) |
468 | 5 | }0 ).boxed()) |
469 | 5 | .map_err(|err| |
470 | 0 | Error::from_std_err(Code::Internal, &err).append("LocalWorker could not send future") |
471 | 0 | )?; |
472 | 5 | Ok(()) |
473 | 5 | }) |
474 | 6 | .or_else(move |err| {0 |
475 | | // If the make_publish_future is not run we still need to notify. |
476 | 0 | actions_in_flight_fail.fetch_sub(1, Ordering::Release); |
477 | 0 | actions_notify_fail.notify_one(); |
478 | 0 | core::future::ready(Err(err)) |
479 | 0 | }) |
480 | 6 | .boxed() |
481 | | ); |
482 | 6 | }); |
483 | | } |
484 | | } |
485 | 0 | }, |
486 | 26 | res5 = add_future_rx.next() => { |
487 | 5 | let fut = res.err_tip(|| "New future stream receives should never be closed")?0 ; |
488 | 5 | futures.push(fut); |
489 | | }, |
490 | 26 | res7 = futures.next() => res7 .err_tip7 (|| "Keep-alive should always pending. Likely unable to send data to scheduler")?1 ?0 , |
491 | 26 | complete_msg0 = shutdown_rx.recv().fuse() => { |
492 | 0 | warn!("Worker loop received shutdown signal. Shutting down worker...",); |
493 | 0 | let mut grpc_client = self.grpc_client.clone(); |
494 | 0 | let shutdown_guard = complete_msg.map_err(|e| |
495 | 0 | Error::from_std_err(Code::Internal, &e).append("Failed to receive shutdown message"))?; |
496 | 0 | let actions_in_flight = actions_in_flight.clone(); |
497 | 0 | let actions_notify = actions_notify.clone(); |
498 | 0 | let shutdown_future = async move { |
499 | | // Wait for in-flight operations to be fully completed. |
500 | 0 | while actions_in_flight.load(Ordering::Acquire) > 0 { |
501 | 0 | actions_notify.notified().await; |
502 | | } |
503 | | // Sending this message immediately evicts all jobs from |
504 | | // this worker, of which there should be none. |
505 | 0 | if let Err(e) = grpc_client.going_away(GoingAwayRequest {}).await { |
506 | 0 | error!("Failed to send GoingAwayRequest: {e}",); |
507 | 0 | return Err(e); |
508 | 0 | } |
509 | | // Allow shutdown to occur now. |
510 | 0 | drop(shutdown_guard); |
511 | 0 | Ok::<(), Error>(()) |
512 | 0 | }; |
513 | 0 | futures.push(shutdown_future.boxed()); |
514 | 0 | shutting_down = true; |
515 | | }, |
516 | | }; |
517 | | } |
518 | | // Unreachable. |
519 | 2 | } |
520 | | } |
521 | | |
522 | | type ConnectionFactory<T> = Box<dyn Fn() -> BoxFuture<'static, Result<T, Error>> + Send + Sync>; |
523 | | |
524 | | pub struct LocalWorker<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> { |
525 | | config: Arc<LocalWorkerConfig>, |
526 | | running_actions_manager: Arc<U>, |
527 | | connection_factory: ConnectionFactory<T>, |
528 | | sleep_fn: Option<Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>>, |
529 | | metrics: Arc<Metrics>, |
530 | | } |
531 | | |
532 | | impl< |
533 | | T: WorkerApiClientTrait + core::fmt::Debug + 'static, |
534 | | U: RunningActionsManager + core::fmt::Debug, |
535 | | > core::fmt::Debug for LocalWorker<T, U> |
536 | | { |
537 | 0 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
538 | 0 | f.debug_struct("LocalWorker") |
539 | 0 | .field("config", &self.config) |
540 | 0 | .field("running_actions_manager", &self.running_actions_manager) |
541 | 0 | .field("metrics", &self.metrics) |
542 | 0 | .finish_non_exhaustive() |
543 | 0 | } |
544 | | } |
545 | | |
546 | | /// Creates a new `LocalWorker`. The `cas_store` must be an instance of |
547 | | /// `FastSlowStore` and will be checked at runtime. |
548 | 0 | pub async fn new_local_worker( |
549 | 0 | config: Arc<LocalWorkerConfig>, |
550 | 0 | cas_store: Store, |
551 | 0 | ac_store: Option<Store>, |
552 | 0 | historical_store: Store, |
553 | 2 | ) -> Result<LocalWorker<WorkerApiClientWrapper, RunningActionsManagerImpl>, Error> { |
554 | 2 | let fast_slow_store = cas_store |
555 | 2 | .downcast_ref::<FastSlowStore>(None) |
556 | 2 | .err_tip(|| "Expected store for LocalWorker's store to be a FastSlowStore")?0 |
557 | 2 | .get_arc() |
558 | 2 | .err_tip(|| "FastSlowStore's Arc doesn't exist")?0 ; |
559 | | |
560 | | // Log warning about CAS configuration for multi-worker setups |
561 | 2 | event!( |
562 | 2 | Level::INFO, |
563 | 2 | worker_name = %config.name, |
564 | | "Starting worker '{}'. IMPORTANT: If running multiple workers, all workers \ |
565 | | must share the same CAS storage path to avoid 'Object not found' errors.", |
566 | 2 | config.name |
567 | | ); |
568 | | |
569 | 2 | if let Ok(path1 ) = fs::canonicalize(&config.work_directory).await { |
570 | 1 | fs::remove_dir_all(&path).await.err_tip(|| {0 |
571 | 0 | format!( |
572 | | "Could not remove work_directory '{}' in LocalWorker", |
573 | 0 | &path.as_path().to_str().unwrap_or("bad path") |
574 | | ) |
575 | 0 | })?; |
576 | 1 | } |
577 | | |
578 | 2 | fs::create_dir_all(&config.work_directory) |
579 | 2 | .await |
580 | 2 | .err_tip(|| format!0 ("Could not make work_directory : {}", config.work_directory0 ))?0 ; |
581 | 2 | let entrypoint = if config.entrypoint.is_empty() { |
582 | 2 | None |
583 | | } else { |
584 | 0 | Some(config.entrypoint.clone()) |
585 | | }; |
586 | 2 | let max_action_timeout = if config.max_action_timeout == 0 { |
587 | 2 | DEFAULT_MAX_ACTION_TIMEOUT |
588 | | } else { |
589 | 0 | Duration::from_secs(config.max_action_timeout as u64) |
590 | | }; |
591 | 2 | let max_upload_timeout = if config.max_upload_timeout == 0 { |
592 | 2 | DEFAULT_MAX_UPLOAD_TIMEOUT |
593 | | } else { |
594 | 0 | Duration::from_secs(config.max_upload_timeout as u64) |
595 | | }; |
596 | | |
597 | | // Initialize directory cache if configured |
598 | 2 | let directory_cache = if let Some(cache_config0 ) = &config.directory_cache { |
599 | | use std::path::PathBuf; |
600 | | |
601 | | use crate::directory_cache::{ |
602 | | DirectoryCache, DirectoryCacheConfig as WorkerDirCacheConfig, |
603 | | }; |
604 | | |
605 | 0 | let cache_root = if cache_config.cache_root.is_empty() { |
606 | 0 | PathBuf::from(&config.work_directory).parent().map_or_else( |
607 | 0 | || PathBuf::from("/tmp/nativelink_directory_cache"), |
608 | 0 | |p| p.join("directory_cache"), |
609 | | ) |
610 | | } else { |
611 | 0 | PathBuf::from(&cache_config.cache_root) |
612 | | }; |
613 | | |
614 | 0 | let worker_cache_config = WorkerDirCacheConfig { |
615 | 0 | max_entries: cache_config.max_entries, |
616 | 0 | max_size_bytes: cache_config.max_size_bytes, |
617 | 0 | cache_root, |
618 | 0 | }; |
619 | | |
620 | 0 | match DirectoryCache::new(worker_cache_config, fast_slow_store.clone()).await { |
621 | 0 | Ok(cache) => { |
622 | 0 | tracing::info!("Directory cache initialized successfully"); |
623 | 0 | Some(Arc::new(cache)) |
624 | | } |
625 | 0 | Err(e) => { |
626 | 0 | tracing::warn!("Failed to initialize directory cache: {:?}", e); |
627 | 0 | None |
628 | | } |
629 | | } |
630 | | } else { |
631 | 2 | None |
632 | | }; |
633 | | |
634 | | #[cfg(target_os = "linux")] |
635 | 2 | let use_namespaces = if let Some(use_namespaces0 ) = &config.use_namespaces { |
636 | 0 | if *use_namespaces |
637 | 0 | && !crate::namespace_utils::namespaces_supported( |
638 | 0 | config.use_mount_namespace.unwrap_or_default(), |
639 | 0 | ) |
640 | | { |
641 | 0 | return Err(make_err!(Code::Unavailable, "Namespaces not supported")); |
642 | 0 | } |
643 | 0 | if !*use_namespaces { |
644 | 0 | crate::running_actions_manager::UseNamespaces::No |
645 | 0 | } else if config.use_mount_namespace.unwrap_or_default() { |
646 | 0 | crate::running_actions_manager::UseNamespaces::YesAndMount |
647 | | } else { |
648 | 0 | crate::running_actions_manager::UseNamespaces::Yes |
649 | | } |
650 | 2 | } else if config |
651 | 2 | .use_mount_namespace |
652 | 2 | .is_some_and(core::convert::identity) |
653 | | { |
654 | 0 | return Err(make_err!( |
655 | 0 | Code::Unavailable, |
656 | 0 | "Mount namespaces not supported" |
657 | 0 | )); |
658 | | } else { |
659 | 2 | crate::running_actions_manager::UseNamespaces::No |
660 | | }; |
661 | | |
662 | | #[cfg(not(target_os = "linux"))] |
663 | | if config.use_namespaces.is_some_and(core::convert::identity) { |
664 | | return Err(make_err!( |
665 | | Code::Unavailable, |
666 | | "Namespaces not supported on non-Linux OSes" |
667 | | )); |
668 | | } |
669 | | #[cfg(not(target_os = "linux"))] |
670 | | if config |
671 | | .use_mount_namespace |
672 | | .is_some_and(core::convert::identity) |
673 | | { |
674 | | return Err(make_err!( |
675 | | Code::Unavailable, |
676 | | "Mount namespaces not supported on non-Linux OSes" |
677 | | )); |
678 | | } |
679 | | |
680 | 2 | let running_actions_manager = |
681 | 2 | Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { |
682 | 2 | root_action_directory: config.work_directory.clone(), |
683 | 2 | execution_configuration: ExecutionConfiguration { |
684 | 2 | entrypoint, |
685 | 2 | additional_environment: config.additional_environment.clone(), |
686 | 2 | }, |
687 | 2 | cas_store: fast_slow_store, |
688 | 2 | ac_store, |
689 | 2 | historical_store, |
690 | 2 | upload_action_result_config: &config.upload_action_result, |
691 | 2 | max_action_timeout, |
692 | 2 | max_upload_timeout, |
693 | 2 | timeout_handled_externally: config.timeout_handled_externally, |
694 | 2 | directory_cache, |
695 | 2 | #[cfg(target_os = "linux")] |
696 | 2 | use_namespaces, |
697 | 2 | })?0 ); |
698 | 2 | let local_worker = LocalWorker::new_with_connection_factory_and_actions_manager( |
699 | 2 | config.clone(), |
700 | 2 | running_actions_manager, |
701 | 2 | Box::new(move || {0 |
702 | 0 | let config = config.clone(); |
703 | 0 | Box::pin(async move { |
704 | 0 | let timeout = config |
705 | 0 | .worker_api_endpoint |
706 | 0 | .timeout |
707 | 0 | .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S); |
708 | 0 | let timeout_duration = Duration::from_secs_f32(timeout); |
709 | 0 | let tls_config = |
710 | 0 | tls_utils::load_client_config(&config.worker_api_endpoint.tls_config) |
711 | 0 | .err_tip(|| "Parsing local worker TLS configuration")?; |
712 | 0 | let endpoint = |
713 | 0 | tls_utils::endpoint_from(&config.worker_api_endpoint.uri, tls_config) |
714 | 0 | .map_err(|e| { |
715 | 0 | Error::from_std_err(Code::InvalidArgument, &e) |
716 | 0 | .append("Invalid URI for worker endpoint") |
717 | 0 | })? |
718 | 0 | .connect_timeout(timeout_duration) |
719 | 0 | .timeout(timeout_duration); |
720 | | |
721 | 0 | let transport = endpoint.connect().await.map_err(|e| { |
722 | 0 | Error::from_std_err(Code::Internal, &e).append(format!( |
723 | | "Could not connect to endpoint {}", |
724 | 0 | config.worker_api_endpoint.uri |
725 | | )) |
726 | 0 | })?; |
727 | 0 | Ok(WorkerApiClient::new(transport).into()) |
728 | 0 | }) |
729 | 0 | }), |
730 | 2 | Box::new(move |d| Box::pin0 (time::sleep0 (d0 ))), |
731 | | ); |
732 | 2 | Ok(local_worker) |
733 | 2 | } |
734 | | |
735 | | impl<T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorker<T, U> { |
736 | 11 | pub fn new_with_connection_factory_and_actions_manager( |
737 | 11 | config: Arc<LocalWorkerConfig>, |
738 | 11 | running_actions_manager: Arc<U>, |
739 | 11 | connection_factory: ConnectionFactory<T>, |
740 | 11 | sleep_fn: Box<dyn Fn(Duration) -> BoxFuture<'static, ()> + Send + Sync>, |
741 | 11 | ) -> Self { |
742 | 11 | let metrics = Arc::new(Metrics::new(Arc::downgrade( |
743 | 11 | running_actions_manager.metrics(), |
744 | | ))); |
745 | 11 | Self { |
746 | 11 | config, |
747 | 11 | running_actions_manager, |
748 | 11 | connection_factory, |
749 | 11 | sleep_fn: Some(sleep_fn), |
750 | 11 | metrics, |
751 | 11 | } |
752 | 11 | } |
753 | | |
754 | | #[allow( |
755 | | clippy::missing_const_for_fn, |
756 | | reason = "False positive on stable, but not on nightly" |
757 | | )] |
758 | 0 | pub fn name(&self) -> &String { |
759 | 0 | &self.config.name |
760 | 0 | } |
761 | | |
762 | 12 | async fn register_worker( |
763 | 12 | &self, |
764 | 12 | client: &mut T, |
765 | 12 | ) -> Result<(String, Streaming<UpdateForWorker>), Error> { |
766 | 12 | let mut extra_envs: HashMap<String, String> = HashMap::new(); |
767 | 12 | if let Some(ref additional_environment0 ) = self.config.additional_environment { |
768 | 0 | for (name, source) in additional_environment { |
769 | 0 | let value = match source { |
770 | 0 | EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()), |
771 | | EnvironmentSource::FromEnvironment => { |
772 | 0 | Cow::Owned(env::var(name).unwrap_or_default()) |
773 | | } |
774 | 0 | other => { |
775 | 0 | debug!( |
776 | | ?other, |
777 | | "Worker registration doesn't support this type of additional environment" |
778 | | ); |
779 | 0 | continue; |
780 | | } |
781 | | }; |
782 | 0 | extra_envs.insert(name.clone(), value.into_owned()); |
783 | | } |
784 | 12 | } |
785 | | |
786 | 12 | let connect_worker_request = make_connect_worker_request( |
787 | 12 | self.config.name.clone(), |
788 | 12 | &self.config.platform_properties, |
789 | 12 | &extra_envs, |
790 | 12 | self.config.max_inflight_tasks, |
791 | 12 | ) |
792 | 12 | .await?0 ; |
793 | 12 | let mut update_for_worker_stream9 = client |
794 | 12 | .connect_worker(connect_worker_request) |
795 | 12 | .await |
796 | 9 | .err_tip(|| "Could not call connect_worker() in worker")?0 |
797 | 9 | .into_inner(); |
798 | | |
799 | 9 | let first_msg_update8 = update_for_worker_stream |
800 | 9 | .next() |
801 | 9 | .await |
802 | 9 | .err_tip(|| "Got EOF expected UpdateForWorker")?1 |
803 | 8 | .err_tip(|| "Got error when receiving UpdateForWorker")?0 |
804 | | .update; |
805 | | |
806 | 8 | let worker_id = match first_msg_update { |
807 | 8 | Some(Update::ConnectionResult(connection_result)) => connection_result.worker_id, |
808 | 0 | other => { |
809 | 0 | return Err(make_input_err!( |
810 | 0 | "Expected first response from scheduler to be a ConnectionResult got : {:?}", |
811 | 0 | other |
812 | 0 | )); |
813 | | } |
814 | | }; |
815 | 8 | Ok((worker_id, update_for_worker_stream)) |
816 | 9 | } |
817 | | |
818 | | #[instrument(skip(self), level = Level::INFO)] |
819 | 9 | pub async fn run( |
820 | 9 | mut self, |
821 | 9 | mut shutdown_rx: broadcast::Receiver<ShutdownGuard>, |
822 | 9 | ) -> Result<(), Error> { |
823 | | // Belt-and-suspenders QoS bump: the main binary already calls |
824 | | // this before runtime creation so the tokio worker threads |
825 | | // inherit P-core preference via pthread QoS inheritance, but |
826 | | // any thread that reaches this point should also be tagged in |
827 | | // case it was spawned by a path that bypassed `on_thread_start`. |
828 | | // No-op on non-macOS. |
829 | | let _ = crate::qos::set_user_initiated(); |
830 | | |
831 | | let sleep_fn = self |
832 | | .sleep_fn |
833 | | .take() |
834 | | .err_tip(|| "Could not unwrap sleep_fn in LocalWorker::run")?; |
835 | | let sleep_fn_pin = Pin::new(&sleep_fn); |
836 | 3 | let error_handler = Box::pin(move |err| async move { |
837 | 3 | error!(?err, "Error"); |
838 | 3 | (sleep_fn_pin)(Duration::from_secs_f32(CONNECTION_RETRY_DELAY_S)).await; |
839 | 6 | }); |
840 | | |
841 | | loop { |
842 | | // First connect to our endpoint. |
843 | | let mut client = match (self.connection_factory)().await { |
844 | | Ok(client) => client, |
845 | | Err(e) => { |
846 | | (error_handler)(e).await; |
847 | | continue; // Try to connect again. |
848 | | } |
849 | | }; |
850 | | |
851 | | debug!("Connected to endpoint"); |
852 | | |
853 | | // Next register our worker with the scheduler. |
854 | | let (inner, update_for_worker_stream) = match self.register_worker(&mut client).await { |
855 | | Err(e) => { |
856 | | (error_handler)(e).await; |
857 | | continue; // Try to connect again. |
858 | | } |
859 | | Ok((worker_id, update_for_worker_stream)) => ( |
860 | | LocalWorkerImpl::new( |
861 | | &self.config, |
862 | | client, |
863 | | worker_id, |
864 | | self.running_actions_manager.clone(), |
865 | | self.metrics.clone(), |
866 | | ), |
867 | | update_for_worker_stream, |
868 | | ), |
869 | | }; |
870 | | info!( |
871 | | worker_id = %inner.worker_id, |
872 | | "Worker registered with scheduler" |
873 | | ); |
874 | | |
875 | | // Now listen for connections and run all other services. |
876 | | if let Err(err) = inner.run(update_for_worker_stream, &mut shutdown_rx).await { |
877 | | 'no_more_actions: { |
878 | | // Ensure there are no actions in transit before we try to kill |
879 | | // all our actions. |
880 | | const ITERATIONS: usize = 1_000; |
881 | | |
882 | | const ERROR_MSG: &str = "Actions in transit did not reach zero before we disconnected from the scheduler"; |
883 | | |
884 | | let sleep_duration = ACTIONS_IN_TRANSIT_TIMEOUT_S / ITERATIONS as f32; |
885 | | for _ in 0..ITERATIONS { |
886 | | if inner.actions_in_transit.load(Ordering::Acquire) == 0 { |
887 | | break 'no_more_actions; |
888 | | } |
889 | | (sleep_fn_pin)(Duration::from_secs_f32(sleep_duration)).await; |
890 | | } |
891 | | error!(ERROR_MSG); |
892 | | return Err(err.append(ERROR_MSG)); |
893 | | } |
894 | | error!(?err, "Worker disconnected from scheduler"); |
895 | | // Kill off any existing actions because if we re-connect, we'll |
896 | | // get some more and it might resource lock us. |
897 | | self.running_actions_manager.kill_all().await; |
898 | | |
899 | | (error_handler)(err).await; // Try to connect again. |
900 | | } |
901 | | } |
902 | | // Unreachable. |
903 | 0 | } |
904 | | } |
905 | | |
906 | | #[derive(Debug, MetricsComponent)] |
907 | | pub struct Metrics { |
908 | | #[metric( |
909 | | help = "Total number of actions sent to this worker to process. This does not mean it started them, it just means it received a request to execute it." |
910 | | )] |
911 | | start_actions_received: CounterWithTime, |
912 | | #[metric(help = "Total number of disconnects received from the scheduler.")] |
913 | | disconnects_received: CounterWithTime, |
914 | | #[metric(help = "Total number of keep-alives received from the scheduler.")] |
915 | | keep_alives_received: CounterWithTime, |
916 | | #[metric( |
917 | | help = "Stats about the calls to check if an action satisfies the config supplied script." |
918 | | )] |
919 | | preconditions: AsyncCounterWrapper, |
920 | | #[metric] |
921 | | #[allow( |
922 | | clippy::struct_field_names, |
923 | | reason = "TODO Fix this. Triggers on nightly" |
924 | | )] |
925 | | running_actions_manager_metrics: Weak<RunningActionManagerMetrics>, |
926 | | } |
927 | | |
928 | | impl RootMetricsComponent for Metrics {} |
929 | | |
930 | | impl Metrics { |
931 | 11 | fn new(running_actions_manager_metrics: Weak<RunningActionManagerMetrics>) -> Self { |
932 | 11 | Self { |
933 | 11 | start_actions_received: CounterWithTime::default(), |
934 | 11 | disconnects_received: CounterWithTime::default(), |
935 | 11 | keep_alives_received: CounterWithTime::default(), |
936 | 11 | preconditions: AsyncCounterWrapper::default(), |
937 | 11 | running_actions_manager_metrics, |
938 | 11 | } |
939 | 11 | } |
940 | | } |
941 | | |
942 | | impl Metrics { |
943 | 6 | async fn wrap<U, T: Future<Output = U>, F: FnOnce(Arc<Self>) -> T>( |
944 | 6 | self: Arc<Self>, |
945 | 6 | fut: F, |
946 | 6 | ) -> U { |
947 | 6 | fut(self).await |
948 | 5 | } |
949 | | } |