Coverage Report

Created: 2026-06-04 10:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/running_actions_manager.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp::min;
16
use core::convert::Into;
17
use core::fmt::Debug;
18
use core::pin::Pin;
19
use core::sync::atomic::{AtomicBool, Ordering};
20
use core::time::Duration;
21
use std::borrow::Cow;
22
use std::collections::vec_deque::VecDeque;
23
use std::collections::{HashMap, HashSet};
24
use std::env;
25
use std::ffi::{OsStr, OsString};
26
#[cfg(target_family = "unix")]
27
use std::fs::Permissions;
28
#[cfg(target_family = "unix")]
29
use std::os::unix::fs::{MetadataExt, PermissionsExt};
30
use std::path::{Path, PathBuf};
31
use std::process::Stdio;
32
use std::sync::{Arc, Weak};
33
use std::time::SystemTime;
34
35
use bytes::{Bytes, BytesMut};
36
use filetime::{FileTime, set_file_mtime};
37
use formatx::Template;
38
use futures::future::{
39
    BoxFuture, Future, FutureExt, TryFutureExt, try_join, try_join_all, try_join3,
40
};
41
use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
42
use nativelink_config::cas_server::{
43
    EnvironmentSource, UploadActionResultConfig, UploadCacheResultsStrategy,
44
};
45
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
46
use nativelink_metric::MetricsComponent;
47
use nativelink_proto::build::bazel::remote::execution::v2::{
48
    Action, ActionResult as ProtoActionResult, Command as ProtoCommand,
49
    Directory as ProtoDirectory, Directory, DirectoryNode, ExecuteResponse, FileNode, SymlinkNode,
50
    Tree as ProtoTree, UpdateActionResultRequest,
51
};
52
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
53
    HistoricalExecuteResponse, StartExecute,
54
};
55
use nativelink_store::ac_utils::{
56
    ESTIMATED_DIGEST_SIZE, compute_buf_digest, get_and_decode_digest, serialize_and_upload_message,
57
};
58
use nativelink_store::cas_utils::is_zero_digest;
59
use nativelink_store::fast_slow_store::FastSlowStore;
60
use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
61
use nativelink_store::grpc_store::GrpcStore;
62
use nativelink_util::action_messages::{
63
    ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, OperationId,
64
    SymlinkInfo, to_execute_response,
65
};
66
use nativelink_util::common::{DigestInfo, fs};
67
use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc};
68
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
69
use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
70
use nativelink_util::{background_spawn, spawn, spawn_blocking};
71
use parking_lot::Mutex;
72
use prost::Message;
73
use relative_path::RelativePath;
74
use scopeguard::{ScopeGuard, guard};
75
use serde::Deserialize;
76
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
77
use tokio::process;
78
use tokio::sync::{Notify, oneshot, watch};
79
use tokio::time::Instant;
80
use tokio_stream::wrappers::ReadDirStream;
81
use tonic::Request;
82
use tracing::{debug, error, info, trace, warn};
83
use uuid::Uuid;
84
85
use crate::persistent_worker::{
86
    Input as PersistentWorkerInput, PersistentWorkerPool, WireFormat, WorkRequest, WorkerKey,
87
};
88
89
/// For simplicity we use a fixed exit code for cases when our program is terminated
90
/// due to a signal.
91
const EXIT_CODE_FOR_SIGNAL: i32 = 9;
92
93
/// Platform property key: an action is only considered for persistent
/// workers when this property is present with the value `"1"`.
const SUPPORTS_WORKERS_PROPERTY: &str = "supports-workers";
94
/// Platform property key naming the worker wire format handed to
/// `WireFormat::parse`; `"proto"` is used when the property is absent.
const REQUIRES_WORKER_PROTOCOL_PROPERTY: &str = "requires-worker-protocol";
95
96
/// Default strategy for uploading historical results.
97
/// Note: If this value changes the config documentation
98
/// should reflect it.
99
const DEFAULT_HISTORICAL_RESULTS_STRATEGY: UploadCacheResultsStrategy =
100
    UploadCacheResultsStrategy::FailuresOnly;
101
102
/// Valid string reasons for a failure.
103
/// Note: If these change, the documentation should be updated.
104
#[derive(Debug, Deserialize)]
105
#[serde(rename_all = "snake_case")]
106
enum SideChannelFailureReason {
107
    /// Task should be considered timed out.
108
    Timeout,
109
}
110
111
/// This represents the json data that can be passed from the running process
112
/// to the parent via the `SideChannelFile`. See:
113
/// `config::EnvironmentSource::sidechannelfile` for more details.
114
/// Note: Any fields added here must be added to the documentation.
115
#[derive(Debug, Deserialize, Default)]
116
struct SideChannelInfo {
117
    /// If the task should be considered a failure and why.
118
    failure: Option<SideChannelFailureReason>,
119
}
120
121
18
fn action_supports_persistent_workers(
122
18
    action_info: &ActionInfo,
123
18
) -> Option<Result<WireFormat, Error>> {
124
18
    if action_info
125
18
        .platform_properties
126
18
        .get(SUPPORTS_WORKERS_PROPERTY)
127
18
        .is_none_or(|value| 
value2
!=
"1"2
)
128
    {
129
16
        return None;
130
2
    }
131
132
2
    let protocol = action_info
133
2
        .platform_properties
134
2
        .get(REQUIRES_WORKER_PROTOCOL_PROPERTY)
135
2
        .map_or("proto", String::as_str);
136
2
    Some(WireFormat::parse(protocol))
137
18
}
138
139
2
fn os_args_to_strings(args: &[&OsStr]) -> Result<Vec<String>, Error> {
140
2
    args.iter()
141
4
        .
map2
(|arg| {
142
4
            arg.to_str().map(str::to_owned).ok_or_else(|| 
{0
143
0
                make_err!(
144
0
                    Code::InvalidArgument,
145
                    "Persistent worker command arguments must be valid UTF-8: {arg:?}"
146
                )
147
0
            })
148
4
        })
149
2
        .collect()
150
2
}
151
152
2
/// Extracts the request portion of a persistent worker command line.
///
/// The first element of `argv` is always dropped. Of what remains, everything
/// from the first argument that begins with `@` through the end is returned;
/// when no such argument exists the result is empty.
fn persistent_worker_request_arguments(argv: &[String]) -> Vec<String> {
    // `get(1..)` is `None` for an empty `argv`; treat that like "nothing left".
    let candidates = argv.get(1..).unwrap_or_default();
    match candidates.iter().position(|arg| arg.starts_with('@')) {
        Some(first_at) => candidates[first_at..].to_vec(),
        None => Vec::new(),
    }
}
159
/// Maximum number of file-materialization (hardlink) or subdirectory
160
/// recursion futures polled concurrently per directory level. Higher values
161
/// drown APFS's per-volume metadata lock with `hardlink(2)` syscalls and
162
/// regress overall throughput vs lower-contention concurrency.
163
///
164
/// 64 is well above the inflection point on any modern Linux filesystem,
165
/// so this is also a no-op on Linux beyond replacing tokio scheduling
166
/// overhead.
167
const DOWNLOAD_TO_DIRECTORY_CONCURRENCY: usize = 64;
168
169
/// Aggressively download the digests of files and make a local folder from it. This function
170
/// gates each directory level to at most `DOWNLOAD_TO_DIRECTORY_CONCURRENCY`
171
/// concurrent in-flight materialization futures.
172
/// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore`. This is for
173
/// efficiency reasons. We will request the `FastSlowStore` to populate the entry then we will
174
/// assume the `FilesystemStore` has the file available immediately after and hardlink the file
175
/// to a new location.
176
// Sadly we cannot use `async fn` here because the rust compiler cannot determine the auto traits
177
// of the future. So we need to force this function to return a dynamic future instead.
178
// see: https://github.com/rust-lang/rust/issues/78649
179
33
pub fn download_to_directory<'a>(
180
33
    cas_store: &'a FastSlowStore,
181
33
    filesystem_store: Pin<&'a FilesystemStore>,
182
33
    digest: &'a DigestInfo,
183
33
    current_directory: &'a str,
184
33
) -> BoxFuture<'a, Result<(), Error>> {
185
33
    async move {
186
33
        let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest.into())
187
33
            .await
188
33
            .err_tip(|| "Converting digest to Directory")
?0
;
189
33
        let mut futures = Vec::new();
190
191
33
        for 
file12
in directory.files {
192
12
            let digest: DigestInfo = file
193
12
                .digest
194
12
                .err_tip(|| "Expected Digest to exist in Directory::file::digest")
?0
195
12
                .try_into()
196
12
                .err_tip(|| "In Directory::file::digest")
?0
;
197
12
            let dest = format!("{}/{}", current_directory, file.name);
198
12
            let is_executable = file.is_executable;
199
12
            let (mtime, custom_unix_mode) = match file.node_properties {
200
1
                Some(properties) => (properties.mtime, properties.unix_mode),
201
11
                None => (None, None),
202
            };
203
12
            futures.push(
204
12
                cas_store
205
12
                    .populate_fast_store(digest.into())
206
12
                    .and_then(move |()| async move {
207
12
                        if is_zero_digest(digest) {
208
                            // Zero-digest files are never persisted by the
209
                            // FilesystemStore, so materialise them directly in
210
                            // the worker exec dir.
211
3
                            let mut file_slot = fs::create_file(&dest)
212
3
                                .await
213
3
                                .err_tip(|| 
format!0
("Could not create zero-digest file at {dest}"))
?0
;
214
3
                            file_slot
215
3
                                .write_all(&[])
216
3
                                .await
217
3
                                .err_tip(|| 
format!0
("Could not write zero-digest file at {dest}"))
?0
;
218
9
                        } else if custom_unix_mode.is_some() || 
mtime8
.
is_some8
() {
219
                            // Rare path: per-file metadata (a custom unix_mode or
220
                            // an mtime) must land on a PRIVATE inode. A
221
                            // chmod/utimes on a hardlink mutates the shared CAS
222
                            // inode for every other action that hardlinked it
223
                            // (the #2347 corruption class). Copy the blob into a
224
                            // private inode, then stamp mode/mtime onto it below.
225
1
                            let file_entry = filesystem_store
226
1
                                .get_file_entry_for_digest(&digest)
227
1
                                .await
228
1
                                .err_tip(|| "During private copy")
?0
;
229
1
                            let src_path = file_entry
230
2
                                .
get_file_path_locked1
(|src| async move
{1
Ok(src)1
})
231
1
                                .await
?0
;
232
1
                            let spawned_dest = dest.clone();
233
1
                            spawn_blocking!("download_to_directory_private_copy", move || {
234
1
                                std::fs::copy(&src_path, &spawned_dest).map(|_| ()).map_err(|e| 
{0
235
0
                                    make_err!(
236
0
                                        Code::Internal,
237
                                        "Failed to copy CAS blob into a private inode at {spawned_dest}: {e:?}"
238
                                    )
239
0
                                })
240
1
                            })
241
1
                            .await
242
1
                            .err_tip(|| 
{0
243
0
                                "Failed to launch spawn_blocking private copy in download_to_directory"
244
0
                            })??;
245
                        } else {
246
                            // Hot path: hardlink only — no writable fd is ever
247
                            // opened for the materialized inode, so a concurrent
248
                            // `execve` of an executable input cannot hit ETXTBSY
249
                            // ("Text file busy"). Executables hardlink a
250
                            // per-digest 0o555 variant created once off the hot
251
                            // path (the 0o444 CAS blob is shared and cannot carry
252
                            // +x); non-executables hardlink the 0o444 CAS blob.
253
8
                            let src_path = if is_executable {
254
2
                                filesystem_store
255
2
                                    .get_executable_hardlink_source(&digest)
256
2
                                    .await
257
2
                                    .err_tip(|| "Resolving executable hardlink source")
?0
258
                            } else {
259
6
                                let file_entry = filesystem_store
260
6
                                    .get_file_entry_for_digest(&digest)
261
6
                                    .await
262
6
                                    .err_tip(|| "During hard link")
?0
;
263
                                // TODO: add a test for #2051: deadlock with large number of files
264
6
                                file_entry
265
12
                                    .
get_file_path_locked6
(|src| async move
{6
Ok(src)6
})
266
6
                                    .await
?0
267
                            };
268
8
                            fs::hard_link(&src_path, &dest)
269
8
                                .await
270
8
                                .map_err(|e| 
{0
271
0
                                    if e.code == Code::NotFound {
272
0
                                        e.append(
273
0
                                            format!(
274
                                            "Could not make hardlink to {dest}, file was likely evicted from cache.\n\
275
                                            This error often occurs when the filesystem store's max_bytes is too small for your workload.\n\
276
                                            To fix this issue:\n\
277
                                            1. Increase the 'max_bytes' value in your filesystem store configuration\n\
278
                                            2. Example: Change 'max_bytes: 10000000000' to 'max_bytes: 50000000000' (or higher)\n\
279
                                            3. The setting is typically found in your nativelink.json config under:\n\
280
                                            stores -> [your_filesystem_store] -> filesystem -> eviction_policy -> max_bytes\n\
281
                                            4. Restart NativeLink after making the change\n\n\
282
                                            If this error persists after increasing max_bytes several times, please report at:\n\
283
                                            https://github.com/TraceMachina/nativelink/issues\n\
284
                                            Include your config file and both server and client logs to help us assist you."
285
                                        ))
286
                                    } else {
287
0
                                        e.append(format!("Could not make hardlink to {dest}"))
288
                                    }
289
0
                                })?;
290
                            // Hardlinked inodes are already correct (the 0o444
291
                            // blob or the 0o555 executable variant) and carry no
292
                            // per-file metadata, so there is nothing to stamp.
293
8
                            return Ok(());
294
                        }
295
296
                        // Private-inode tail (zero-digest or private copy only):
297
                        // stamp the requested mode and mtime. Safe because the
298
                        // inode is private to this action.
299
                        #[cfg(target_family = "unix")]
300
                        {
301
4
                            let mode = if let Some(
mode1
) = custom_unix_mode {
302
1
                                Some(if is_executable { mode | 0o111 } else { 
mode0
})
303
3
                            } else if is_executable {
304
0
                                Some(0o555)
305
                            } else {
306
3
                                None
307
                            };
308
4
                            if let Some(
mode1
) = mode {
309
1
                                fs::set_permissions(&dest, Permissions::from_mode(mode))
310
1
                                    .await
311
1
                                    .err_tip(|| 
{0
312
0
                                        format!("Could not set unix mode in download_to_directory {dest}")
313
0
                                    })?;
314
3
                            }
315
                        }
316
4
                        if let Some(
mtime1
) = mtime {
317
1
                            let spawned_dest = dest.clone();
318
1
                            spawn_blocking!("download_to_directory_set_mtime", move || {
319
1
                                set_file_mtime(
320
1
                                    &spawned_dest,
321
1
                                    FileTime::from_unix_time(mtime.seconds, mtime.nanos as u32),
322
                                )
323
1
                                .err_tip(|| 
{0
324
0
                                    format!("Failed to set mtime in download_to_directory {spawned_dest}")
325
0
                                })
326
1
                            })
327
1
                            .await
328
1
                            .err_tip(
329
                                || "Failed to launch spawn_blocking in download_to_directory",
330
0
                            )??;
331
3
                        }
332
4
                        Ok(())
333
24
                    })
334
12
                    .map_err(move |e| 
e0
.
append0
(
format!0
("for digest {digest}")))
335
12
                    .boxed(),
336
            );
337
        }
338
339
33
        for 
directory8
in directory.directories {
340
8
            let digest: DigestInfo = directory
341
8
                .digest
342
8
                .err_tip(|| "Expected Digest to exist in Directory::directories::digest")
?0
343
8
                .try_into()
344
8
                .err_tip(|| "In Directory::file::digest")
?0
;
345
8
            let new_directory_path = format!("{}/{}", current_directory, directory.name);
346
8
            futures.push(
347
8
                async move {
348
8
                    fs::create_dir(&new_directory_path)
349
8
                        .await
350
8
                        .err_tip(|| 
format!0
("Could not create directory {new_directory_path}"))
?0
;
351
8
                    download_to_directory(
352
8
                        cas_store,
353
8
                        filesystem_store,
354
8
                        &digest,
355
8
                        &new_directory_path,
356
8
                    )
357
8
                    .await
358
8
                    .err_tip(|| 
format!0
("in download_to_directory : {new_directory_path}"))
?0
;
359
8
                    Ok(())
360
8
                }
361
8
                .boxed(),
362
            );
363
        }
364
365
        #[cfg(target_family = "unix")]
366
33
        for 
symlink_node2
in directory.symlinks {
367
2
            let dest = format!("{}/{}", current_directory, symlink_node.name);
368
2
            futures.push(
369
2
                async move {
370
2
                    fs::symlink(&symlink_node.target, &dest).await.err_tip(|| 
{0
371
0
                        format!(
372
                            "Could not create symlink {} -> {}",
373
                            symlink_node.target, dest
374
                        )
375
0
                    })?;
376
2
                    Ok(())
377
2
                }
378
2
                .boxed(),
379
            );
380
        }
381
382
        // Gate concurrency: at most DOWNLOAD_TO_DIRECTORY_CONCURRENCY futures
383
        // polled at once for this directory level. Previously all futures were
384
        // pushed into an unbounded FuturesUnordered, which on macOS produced
385
        // thousands of parallel hardlink(2) calls fighting APFS's per-volume
386
        // metadata lock and regressing throughput vs serial.
387
33
        futures::stream::iter(futures)
388
33
            .buffer_unordered(DOWNLOAD_TO_DIRECTORY_CONCURRENCY)
389
33
            .try_collect::<Vec<_>>()
390
33
            .await
?0
;
391
33
        Ok(())
392
33
    }
393
33
    .boxed()
394
33
}
395
396
/// Prepares action inputs by first trying the directory cache (if available),
397
/// then falling back to traditional `download_to_directory`.
398
///
399
/// This provides a significant performance improvement for repeated builds
400
/// with the same input directories.
401
///
402
/// `work_directory` must already exist and be empty when this is called: the
403
/// caller pre-creates it so that, on the fallback path, `download_to_directory`
404
/// has a destination to write into. The directory cache, however, materializes
405
/// the tree with `hardlink_directory_tree` / APFS `clonefile(2)`, both of which
406
/// require the destination to *not* exist — so this function removes the empty
407
/// directory before invoking the cache and recreates it if the cache fails.
408
0
pub async fn prepare_action_inputs(
409
0
    directory_cache: &Option<Arc<crate::directory_cache::DirectoryCache>>,
410
0
    cas_store: &FastSlowStore,
411
0
    filesystem_store: Pin<&FilesystemStore>,
412
0
    digest: &DigestInfo,
413
0
    work_directory: &str,
414
21
) -> Result<(), Error> {
415
    // Try cache first if available
416
21
    if let Some(
cache0
) = directory_cache {
417
        // `clonefile(2)` and `hardlink_directory_tree` both require the
418
        // destination to not exist. Remove the empty directory the caller
419
        // pre-created; without this the cache fails its precondition on every
420
        // action and silently falls back to the slow download path.
421
0
        fs::remove_dir(work_directory)
422
0
            .await
423
0
            .err_tip(|| format!("Failed to clear pre-created work directory {work_directory}"))?;
424
0
        match cache
425
0
            .get_or_create(*digest, Path::new(work_directory))
426
0
            .await
427
        {
428
0
            Ok(cache_hit) => {
429
                // The materialized tree is already usable. The directory
430
                // cache locks each entry down with `set_readonly_recursive`,
431
                // which leaves directories writable (0o755) and only makes
432
                // files read-only (0o555). The macOS `clonefile(2)` path
433
                // copies those modes verbatim and the Linux hardlink walk
434
                // creates fresh writable directories, so every directory in
435
                // the materialized tree already accepts the nested output
436
                // files Bazel actions declare — no separate per-materialize
437
                // recursive chmod walk is needed here. Files stay read-only,
438
                // preserving the hermeticity contract and the CAS-hardlink
439
                // shared-inode invariant.
440
0
                trace!(
441
                    ?digest,
442
                    work_directory, cache_hit, "Successfully prepared inputs via directory cache"
443
                );
444
0
                return Ok(());
445
            }
446
0
            Err(e) => {
447
0
                warn!(
448
                    ?digest,
449
                    ?e,
450
                    "Directory cache failed, falling back to traditional download"
451
                );
452
                // The cache may have materialized a partial tree before
453
                // failing. `download_to_directory` needs an existing, empty
454
                // destination, so discard any partial state and recreate the
455
                // work directory.
456
0
                let _cleanup = fs::remove_dir_all(work_directory).await;
457
0
                fs::create_dir(work_directory)
458
0
                    .await
459
0
                    .err_tip(|| format!("Failed to recreate work directory {work_directory}"))?;
460
            }
461
        }
462
21
    }
463
464
    // Traditional path (cache disabled or failed)
465
21
    download_to_directory(cas_store, filesystem_store, digest, work_directory).await
466
21
}
467
468
/// Reports whether `full_path` should be treated as executable on Windows.
///
/// The file metadata is not consulted; the decision is made solely from the
/// path's extension, which must be one of `exe`, `bat` or `com`. The match
/// ignores ASCII case so that `TOOL.EXE` is recognised just like `tool.exe`.
/// Paths without an extension are never executable.
#[cfg(target_family = "windows")]
fn is_executable(_metadata: &std::fs::Metadata, full_path: &impl AsRef<Path>) -> bool {
    const EXECUTABLE_EXTENSIONS: [&str; 3] = ["exe", "bat", "com"];
    full_path.as_ref().extension().is_some_and(|extension| {
        EXECUTABLE_EXTENSIONS
            .iter()
            .any(|candidate| extension.eq_ignore_ascii_case(candidate))
    })
}
475
476
/// Reports whether any execute bit (owner, group or other) is set in the
/// file's mode. The path argument is not consulted on unix.
#[cfg(target_family = "unix")]
fn is_executable(metadata: &std::fs::Metadata, _full_path: &impl AsRef<Path>) -> bool {
    const ANY_EXECUTE_BIT: u32 = 0o111;
    metadata.permissions().mode() & ANY_EXECUTE_BIT != 0
}
480
481
/// Once-cell stored per digest in the `digest_uploaders` map passed to
/// `upload_file`. Initialising it marks that digest's upload as done, so
/// later files with the same digest skip the upload.
type DigestUploader = Arc<tokio::sync::OnceCell<()>>;
482
483
13
async fn upload_file(
484
13
    cas_store: Pin<&impl StoreLike>,
485
13
    full_path: impl AsRef<Path> + Debug + Send + Sync,
486
13
    hasher: DigestHasherFunc,
487
13
    metadata: std::fs::Metadata,
488
13
    digest_uploaders: Arc<Mutex<HashMap<DigestInfo, DigestUploader>>>,
489
13
) -> Result<FileInfo, Error> {
490
13
    let is_executable = is_executable(&metadata, &full_path);
491
13
    let file_size = metadata.len();
492
13
    let file = fs::open_file(&full_path, 0, u64::MAX)
493
13
        .await
494
13
        .err_tip(|| 
format!0
("Could not open file {full_path:?}"))
?0
;
495
496
13
    let (digest, mut file) = hasher
497
13
        .hasher()
498
13
        .digest_for_file(&full_path, file.into_inner(), Some(file_size))
499
13
        .await
500
13
        .err_tip(|| 
format!0
("Failed to hash file in digest_for_file failed for {full_path:?}"))
?0
;
501
502
13
    let digest_uploader = match digest_uploaders.lock().entry(digest) {
503
1
        std::collections::hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
504
12
        std::collections::hash_map::Entry::Vacant(vacant_entry) => vacant_entry
505
12
            .insert(Arc::new(tokio::sync::OnceCell::new()))
506
12
            .clone(),
507
    };
508
509
    // Only upload a file with a given hash once.  The file may exist multiple
510
    // times in the output with different names.
511
13
    digest_uploader
512
24
        .
get_or_try_init13
(async || {
513
            // Only upload if the digest doesn't already exist, this should be
514
            // a much cheaper operation than an upload.
515
12
            let cas_store = cas_store.as_store_driver_pin();
516
12
            let store_key: nativelink_util::store_trait::StoreKey<'_> = digest.into();
517
12
            let has_start = std::time::Instant::now();
518
12
            if cas_store
519
12
                .has(store_key.borrow())
520
12
                .await
521
12
                .is_ok_and(|result| result.is_some())
522
            {
523
2
                trace!(
524
                    ?digest,
525
2
                    has_elapsed_ms = has_start.elapsed().as_millis(),
526
                    "upload_file: digest already exists in CAS, skipping upload",
527
                );
528
2
                return Ok(());
529
10
            }
530
10
            trace!(
531
                ?digest,
532
10
                has_elapsed_ms = has_start.elapsed().as_millis(),
533
10
                file_size = digest.size_bytes(),
534
                "upload_file: digest not in CAS, starting upload",
535
            );
536
537
10
            file.rewind().await.err_tip(|| "Could not rewind file")
?0
;
538
539
            // Note: For unknown reasons we appear to be hitting:
540
            // https://github.com/rust-lang/rust/issues/92096
541
            // or a similar issue if we try to use the non-store driver function, so we
542
            // are using the store driver function here.
543
10
            let store_key_for_upload = store_key.clone();
544
10
            let file_upload_start = std::time::Instant::now();
545
10
            let upload_result = cas_store
546
10
                .update_with_whole_file(
547
10
                    store_key_for_upload,
548
10
                    full_path.as_ref().into(),
549
10
                    file,
550
10
                    UploadSizeInfo::ExactSize(digest.size_bytes()),
551
10
                )
552
10
                .await
553
10
                .map(|_slot| ());
554
10
            trace!(
555
                ?digest,
556
10
                upload_elapsed_ms = file_upload_start.elapsed().as_millis(),
557
10
                success = upload_result.is_ok(),
558
                "upload_file: update_with_whole_file completed",
559
            );
560
561
10
            match upload_result {
562
10
                Ok(()) => Ok(()),
563
0
                Err(err) => {
564
                    // Output uploads run concurrently and may overlap (e.g. a file is listed
565
                    // both as an output file and inside an output directory). When another
566
                    // upload has already moved the file into CAS, this update can fail with
567
                    // NotFound even though the digest is now present. Per the RE spec, missing
568
                    // outputs should be ignored, so treat this as success if the digest exists.
569
0
                    if err.code == Code::NotFound
570
0
                        && cas_store
571
0
                            .has(store_key.borrow())
572
0
                            .await
573
0
                            .is_ok_and(|result| result.is_some())
574
                    {
575
0
                        Ok(())
576
                    } else {
577
0
                        Err(err)
578
                    }
579
                }
580
            }
581
24
        })
582
13
        .await
583
13
        .err_tip(|| 
format!0
("for {full_path:?}"))
?0
;
584
585
13
    let name = full_path
586
13
        .as_ref()
587
13
        .file_name()
588
13
        .err_tip(|| 
format!0
("Expected file_name to exist on {full_path:?}"))
?0
589
13
        .to_str()
590
13
        .err_tip(|| 
{0
591
0
            make_err!(
592
0
                Code::Internal,
593
                "Could not convert {:?} to string",
594
                full_path
595
            )
596
0
        })?
597
13
        .to_string();
598
599
13
    Ok(FileInfo {
600
13
        name_or_path: NameOrPath::Name(name),
601
13
        digest,
602
13
        is_executable,
603
13
    })
604
13
}
605
606
2
async fn upload_symlink(
607
2
    full_path: impl AsRef<Path> + Debug,
608
2
    full_work_directory_path: impl AsRef<Path>,
609
2
) -> Result<SymlinkInfo, Error> {
610
2
    let full_target_path = fs::read_link(full_path.as_ref())
611
2
        .await
612
2
        .err_tip(|| 
format!0
("Could not get read_link path of {full_path:?}"))
?0
;
613
614
    // Detect if our symlink is inside our work directory, if it is find the
615
    // relative path otherwise use the absolute path.
616
2
    let target = if full_target_path.starts_with(full_work_directory_path.as_ref()) {
617
0
        let full_target_path = RelativePath::from_path(&full_target_path)
618
0
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?;
619
0
        RelativePath::from_path(full_work_directory_path.as_ref())
620
0
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?
621
0
            .relative(full_target_path)
622
0
            .normalize()
623
0
            .into_string()
624
    } else {
625
2
        full_target_path
626
2
            .to_str()
627
2
            .err_tip(|| 
{0
628
0
                make_err!(
629
0
                    Code::Internal,
630
                    "Could not convert '{:?}' to string",
631
                    full_target_path
632
                )
633
0
            })?
634
2
            .to_string()
635
    };
636
637
2
    let name = full_path
638
2
        .as_ref()
639
2
        .file_name()
640
2
        .err_tip(|| 
format!0
("Expected file_name to exist on {full_path:?}"))
?0
641
2
        .to_str()
642
2
        .err_tip(|| 
{0
643
0
            make_err!(
644
0
                Code::Internal,
645
                "Could not convert {:?} to string",
646
                full_path
647
            )
648
0
        })?
649
2
        .to_string();
650
651
2
    Ok(SymlinkInfo {
652
2
        name_or_path: NameOrPath::Name(name),
653
2
        target,
654
2
    })
655
2
}
656
657
4
fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
658
4
    cas_store: Pin<&'a impl StoreLike>,
659
4
    full_dir_path: P,
660
4
    full_work_directory: &'a str,
661
4
    hasher: DigestHasherFunc,
662
4
    digest_uploaders: Arc<Mutex<HashMap<DigestInfo, DigestUploader>>>,
663
4
) -> BoxFuture<'a, Result<(Directory, VecDeque<ProtoDirectory>), Error>> {
664
4
    Box::pin(async move {
665
4
        let file_futures = FuturesUnordered::new();
666
4
        let dir_futures = FuturesUnordered::new();
667
4
        let symlink_futures = FuturesUnordered::new();
668
        {
669
4
            let (_permit, dir_handle) = fs::read_dir(&full_dir_path)
670
4
                .await
671
4
                .err_tip(|| 
format!0
("Error reading dir for reading {full_dir_path:?}"))
?0
672
4
                .into_inner();
673
4
            let mut dir_stream = ReadDirStream::new(dir_handle);
674
            // Note: Try very hard to not leave file descriptors open. Try to keep them as short
675
            // lived as possible. This is why we iterate the directory and then build a bunch of
676
            // futures with all the work we are wanting to do then execute it. It allows us to
677
            // close the directory iterator file descriptor, then open the child files/folders.
678
10
            while let Some(
entry_result6
) = dir_stream.next().await {
679
6
                let entry = entry_result.err_tip(|| "Error while iterating directory")
?0
;
680
6
                let file_type = entry
681
6
                    .file_type()
682
6
                    .await
683
6
                    .err_tip(|| 
format!0
("Error running file_type() on {entry:?}"))
?0
;
684
6
                let full_path = full_dir_path.as_ref().join(entry.path());
685
6
                if file_type.is_dir() {
686
1
                    let full_dir_path = full_dir_path.clone();
687
1
                    dir_futures.push(
688
1
                        upload_directory(
689
1
                            cas_store,
690
1
                            full_path.clone(),
691
1
                            full_work_directory,
692
1
                            hasher,
693
1
                            digest_uploaders.clone(),
694
                        )
695
1
                        .and_then(|(dir, all_dirs)| async move {
696
1
                            let directory_name = full_path
697
1
                                .file_name()
698
1
                                .err_tip(|| 
{0
699
0
                                    format!("Expected file_name to exist on {full_dir_path:?}")
700
0
                                })?
701
1
                                .to_str()
702
1
                                .err_tip(|| 
{0
703
0
                                    make_err!(
704
0
                                        Code::Internal,
705
                                        "Could not convert {:?} to string",
706
                                        full_dir_path
707
                                    )
708
0
                                })?
709
1
                                .to_string();
710
711
1
                            let digest =
712
1
                                serialize_and_upload_message(&dir, cas_store, &mut hasher.hasher())
713
1
                                    .await
714
1
                                    .err_tip(|| 
format!0
("for {}",
full_path.display()0
))
?0
;
715
716
1
                            Result::<(DirectoryNode, VecDeque<Directory>), Error>::Ok((
717
1
                                DirectoryNode {
718
1
                                    name: directory_name,
719
1
                                    digest: Some(digest.into()),
720
1
                                },
721
1
                                all_dirs,
722
1
                            ))
723
2
                        })
724
1
                        .boxed(),
725
                    );
726
5
                } else if file_type.is_file() {
727
4
                    let digest_uploaders = digest_uploaders.clone();
728
4
                    file_futures.push(async move {
729
4
                        let metadata = fs::metadata(&full_path)
730
4
                            .await
731
4
                            .err_tip(|| 
format!0
("Could not open file {}",
full_path.display()0
))
?0
;
732
4
                        upload_file(cas_store, &full_path, hasher, metadata, digest_uploaders)
733
4
                            .map_ok(TryInto::try_into)
734
4
                            .await
?0
735
4
                    });
736
1
                } else if file_type.is_symlink() {
737
1
                    symlink_futures.push(
738
1
                        upload_symlink(full_path, &full_work_directory)
739
1
                            .map(|symlink| symlink
?0
.try_into()),
740
                    );
741
0
                }
742
            }
743
        }
744
745
4
        let (mut file_nodes, dir_entries, mut symlinks) = try_join3(
746
4
            file_futures.try_collect::<Vec<FileNode>>(),
747
4
            dir_futures.try_collect::<Vec<(DirectoryNode, VecDeque<Directory>)>>(),
748
4
            symlink_futures.try_collect::<Vec<SymlinkNode>>(),
749
4
        )
750
4
        .await
?0
;
751
752
4
        let mut directory_nodes = Vec::with_capacity(dir_entries.len());
753
        // For efficiency we use a deque because it allows cheap concat of Vecs.
754
        // We make the assumption here that when performance is important it is because
755
        // our directory is quite large. This allows us to cheaply merge large amounts of
756
        // directories into one VecDeque. Then after we are done we need to collapse it
757
        // down into a single Vec.
758
4
        let mut all_child_directories = VecDeque::with_capacity(dir_entries.len());
759
4
        for (
directory_node1
,
mut recursive_child_directories1
) in dir_entries {
760
1
            directory_nodes.push(directory_node);
761
1
            all_child_directories.append(&mut recursive_child_directories);
762
1
        }
763
764
4
        file_nodes.sort_unstable_by(|a, b| 
a.name1
.
cmp1
(
&b.name1
));
765
4
        directory_nodes.sort_unstable_by(|a, b| 
a.name0
.
cmp0
(
&b.name0
));
766
4
        symlinks.sort_unstable_by(|a, b| 
a.name0
.
cmp0
(
&b.name0
));
767
768
4
        let directory = Directory {
769
4
            files: file_nodes,
770
4
            directories: directory_nodes,
771
4
            symlinks,
772
4
            node_properties: None, // We don't support file properties.
773
4
        };
774
4
        all_child_directories.push_back(directory.clone());
775
776
4
        Ok((directory, all_child_directories))
777
4
    })
778
4
}
779
780
0
async fn process_side_channel_file(
781
0
    side_channel_file: Cow<'_, OsStr>,
782
0
    args: &[&OsStr],
783
0
    timeout: Duration,
784
0
) -> Result<Option<Error>, Error> {
785
0
    let mut json_contents = String::new();
786
    {
787
        // Note: Scoping `file_slot` allows the file_slot semaphore to be released faster.
788
0
        let mut file_slot = match fs::open_file(side_channel_file, 0, u64::MAX).await {
789
0
            Ok(file_slot) => file_slot,
790
0
            Err(e) => {
791
0
                if e.code != Code::NotFound {
792
0
                    return Err(e).err_tip(|| "Error opening side channel file");
793
0
                }
794
                // Note: If file does not exist, it's ok. Users are not required to create this file.
795
0
                return Ok(None);
796
            }
797
        };
798
0
        file_slot
799
0
            .read_to_string(&mut json_contents)
800
0
            .await
801
0
            .err_tip(|| "Error reading side channel file")?;
802
    }
803
804
0
    let side_channel_info: SideChannelInfo = serde_json5::from_str(&json_contents)
805
0
        .map_err(Error::from)
806
0
        .err_tip(|| "Could not convert contents of side channel file (json) to SideChannelInfo")?;
807
0
    Ok(side_channel_info.failure.map(|failure| match failure {
808
        SideChannelFailureReason::Timeout => {
809
0
            let join_args = args.join(OsStr::new(" "));
810
0
            let command = join_args.to_string_lossy();
811
0
            warn!(%command, timeout=timeout.as_secs_f32(), "Side channel timeout for command");
812
0
            Error::new(
813
0
                Code::DeadlineExceeded,
814
0
                format!(
815
                    "Command '{}' timed out after {} seconds",
816
                    command,
817
0
                    timeout.as_secs_f32()
818
                ),
819
            )
820
        }
821
0
    }))
822
0
}
823
824
22
async fn do_cleanup(
825
22
    running_actions_manager: &Arc<RunningActionsManagerImpl>,
826
22
    operation_id: &OperationId,
827
22
    action_directory: &str,
828
22
) -> Result<(), Error> {
829
    // Mark this operation as being cleaned up
830
22
    let Some(_cleaning_guard) = running_actions_manager.perform_cleanup(operation_id.clone())
831
    else {
832
        // Cleanup is already happening elsewhere.
833
0
        return Ok(());
834
    };
835
836
22
    debug!("Worker cleaning up");
837
    // Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
838
22
    let remove_dir_result = fs::remove_dir_all(action_directory)
839
22
        .await
840
22
        .err_tip(|| 
format!0
("Could not remove working directory {action_directory}"));
841
842
22
    if let Err(
err0
) = running_actions_manager.cleanup_action(operation_id) {
843
0
        error!(%operation_id, ?err, "Error cleaning up action");
844
0
        Result::<(), Error>::Err(err).merge(remove_dir_result)
845
22
    } else if let Err(
err0
) = remove_dir_result {
846
0
        error!(%operation_id, ?err, "Error removing working directory");
847
0
        Err(err)
848
    } else {
849
22
        Ok(())
850
    }
851
22
}
852
853
/// The stages a single action goes through on a worker.
///
/// Every stage except the two accessors consumes an `Arc<Self>`; the stage
/// methods resolve to that same `Arc<Self>` on success so calls can be chained,
/// and `get_finished_result` resolves to the final `ActionResult`.
pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
854
    /// Returns the operation id of the action.
855
    fn get_operation_id(&self) -> &OperationId;
856
857
    /// Anything that needs to execute before the action is actually executed should happen here.
858
    fn prepare_action(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
859
860
    /// Actually perform the execution of the action.
861
    fn execute(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
862
863
    /// Any uploading, processing or analyzing of the results should happen here.
864
    fn upload_results(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
865
866
    /// Cleanup any residual files, handles or other junk resulting from running the action.
867
    fn cleanup(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
868
869
    /// Returns the final result. As a general rule this action should be thought of as
870
    /// a consumption of `self`, meaning once a return happens here the lifetime of `Self`
871
    /// is over and any action performed on it after this call is undefined behavior.
872
    fn get_finished_result(
873
        self: Arc<Self>,
874
    ) -> impl Future<Output = Result<ActionResult, Error>> + Send;
875
876
    /// Returns the work directory of the action.
877
    fn get_work_directory(&self) -> &String;
878
}
879
880
/// Raw outcome of running an action's command: the captured output streams and
/// the exit code.
#[derive(Debug)]
881
struct RunningActionImplExecutionResult {
882
    // Captured standard output.
    stdout: Bytes,
883
    // Captured standard error.
    stderr: Bytes,
884
    // Exit code reported for the command.
    exit_code: i32,
885
}
886
887
/// Mutable, per-action state of a `RunningActionImpl`; it lives behind that
/// struct's `state` mutex.
#[derive(Debug)]
888
struct RunningActionImplState {
889
    // Decoded command; stored by `inner_prepare_action` and taken by `inner_execute`.
    command_proto: Option<ProtoCommand>,
890
    // TODO(palfrey) Kill is not implemented yet, but is instrumented.
891
    // However, it is used if the worker disconnects to destroy current jobs.
892
    kill_channel_tx: Option<oneshot::Sender<()>>,
893
    // Receiving half of the kill channel; taken by `inner_execute`.
    kill_channel_rx: Option<oneshot::Receiver<()>>,
894
    // Output streams and exit code, once the command has run.
    execution_result: Option<RunningActionImplExecutionResult>,
895
    action_result: Option<ActionResult>,
896
    // Timestamps and other metadata filled in as the action moves through its stages.
    execution_metadata: ExecutionMetadata,
897
    // If there was an internal error, this will be set.
898
    // This should NOT be set if everything was fine, but the process had a
899
    // non-zero exit code. Instead this should be used for internal errors
900
    // that prevented the action from running, upload failures, timeouts, etc.
901
    // but we have (or could have) the action results (like stderr/stdout).
902
    error: Option<Error>,
903
}
904
905
/// A single action being prepared, executed and cleaned up on this worker.
#[derive(Debug)]
906
pub struct RunningActionImpl {
907
    operation_id: OperationId,
908
    // Directory owned by this action.
    action_directory: String,
909
    // `{action_directory}/work`, computed in `new`.
    work_directory: String,
910
    action_info: ActionInfo,
911
    // NOTE(review): presumably the maximum run time for the command; confirm against callers.
    timeout: Duration,
912
    // Owning manager; also provides the stores, callbacks and metrics used by the action.
    running_actions_manager: Arc<RunningActionsManagerImpl>,
913
    // Mutable per-action state (see `RunningActionImplState`).
    state: Mutex<RunningActionImplState>,
914
    // Initialised to `true` in `new`: the action must be removed from the manager on Drop.
    has_manager_entry: AtomicBool,
915
    // Initialised to `true` in `new`; set to `false` once `inner_prepare_action` has
    // created the work directory, i.e. once there is something to clean up.
    did_cleanup: AtomicBool,
916
}
917
918
impl RunningActionImpl {
919
25
    pub fn new(
920
25
        execution_metadata: ExecutionMetadata,
921
25
        operation_id: OperationId,
922
25
        action_directory: String,
923
25
        action_info: ActionInfo,
924
25
        timeout: Duration,
925
25
        running_actions_manager: Arc<RunningActionsManagerImpl>,
926
25
    ) -> Self {
927
25
        let work_directory = format!("{}/{}", action_directory, "work");
928
25
        let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
929
25
        Self {
930
25
            operation_id,
931
25
            action_directory,
932
25
            work_directory,
933
25
            action_info,
934
25
            timeout,
935
25
            running_actions_manager,
936
25
            state: Mutex::new(RunningActionImplState {
937
25
                command_proto: None,
938
25
                kill_channel_rx: Some(kill_channel_rx),
939
25
                kill_channel_tx: Some(kill_channel_tx),
940
25
                execution_result: None,
941
25
                action_result: None,
942
25
                execution_metadata,
943
25
                error: None,
944
25
            }),
945
25
            // Always need to ensure that we're removed from the manager on Drop.
946
25
            has_manager_entry: AtomicBool::new(true),
947
25
            // Only needs to be cleaned up after a prepare_action call, set there.
948
25
            did_cleanup: AtomicBool::new(true),
949
25
        }
950
25
    }
951
952
    #[allow(
953
        clippy::missing_const_for_fn,
954
        reason = "False positive on stable, but not on nightly"
955
    )]
956
0
    fn metrics(&self) -> &Arc<Metrics> {
957
0
        &self.running_actions_manager.metrics
958
0
    }
959
960
    /// Prepares any actions needed to execute this action. This action will do the following:
961
    ///
962
    /// * Download any files needed to execute the action
963
    /// * Build a folder with all files needed to execute the action.
964
    ///
965
    /// This function will aggressively download and spawn potentially thousands of futures. It is
966
    /// up to the stores to rate limit if needed.
967
21
    
async fn inner_prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
968
21
        {
969
21
            let mut state = self.state.lock();
970
21
            state.execution_metadata.input_fetch_start_timestamp =
971
21
                (self.running_actions_manager.callbacks.now_fn)();
972
21
        }
973
21
        let command = {
974
            // Download and build out our input files/folders. Also fetch and decode our Command.
975
21
            let command_fut = self.metrics().get_proto_command_from_store.wrap(async {
976
21
                get_and_decode_digest::<ProtoCommand>(
977
21
                    self.running_actions_manager.cas_store.as_ref(),
978
21
                    self.action_info.command_digest.into(),
979
21
                )
980
21
                .await
981
21
                .err_tip(|| "Converting command_digest to Command")
982
21
            });
983
21
            let filesystem_store_pin =
984
21
                Pin::new(self.running_actions_manager.filesystem_store.as_ref());
985
21
            let (command, ()) = try_join(command_fut, async {
986
21
                fs::create_dir(&self.work_directory)
987
21
                    .await
988
21
                    .err_tip(|| 
format!0
("Error creating work directory {}",
self.work_directory0
))
?0
;
989
                // Now the work directory has been created, we have to clean up.
990
21
                self.did_cleanup.store(false, Ordering::Release);
991
                // Download the input files/folder and place them into the temp directory.
992
                // Use directory cache if available for better performance.
993
21
                self.metrics()
994
21
                    .download_to_directory
995
21
                    .wrap(prepare_action_inputs(
996
21
                        &self.running_actions_manager.directory_cache,
997
21
                        &self.running_actions_manager.cas_store,
998
21
                        filesystem_store_pin,
999
21
                        &self.action_info.input_root_digest,
1000
21
                        &self.work_directory,
1001
21
                    ))
1002
21
                    .await
1003
21
            })
1004
21
            .await
?0
;
1005
21
            command
1006
        };
1007
        {
1008
            // Create all directories needed for our output paths. This is required by the bazel spec.
1009
21
            let prepare_output_directories = |output_file| 
{15
1010
15
                let full_output_path = if command.working_directory.is_empty() {
1011
4
                    format!("{}/{}", self.work_directory, output_file)
1012
                } else {
1013
11
                    format!(
1014
                        "{}/{}/{}",
1015
11
                        self.work_directory, command.working_directory, output_file
1016
                    )
1017
                };
1018
15
                async move {
1019
15
                    let full_parent_path = Path::new(&full_output_path)
1020
15
                        .parent()
1021
15
                        .err_tip(|| 
format!0
("Parent path for {full_output_path} has no parent"))
?0
;
1022
15
                    fs::create_dir_all(full_parent_path).await.err_tip(|| 
{0
1023
0
                        format!(
1024
                            "Error creating output directory {} (file)",
1025
0
                            full_parent_path.display()
1026
                        )
1027
0
                    })?;
1028
15
                    Result::<(), Error>::Ok(())
1029
15
                }
1030
15
            };
1031
21
            self.metrics()
1032
21
                .prepare_output_files
1033
21
                .wrap(try_join_all(
1034
21
                    command.output_files.iter().map(prepare_output_directories),
1035
21
                ))
1036
21
                .await
?0
;
1037
21
            self.metrics()
1038
21
                .prepare_output_paths
1039
21
                .wrap(try_join_all(
1040
21
                    command.output_paths.iter().map(prepare_output_directories),
1041
21
                ))
1042
21
                .await
?0
;
1043
        }
1044
21
        debug!(?command, "Worker received command");
1045
21
        {
1046
21
            let mut state = self.state.lock();
1047
21
            state.command_proto = Some(command);
1048
21
            state.execution_metadata.input_fetch_completed_timestamp =
1049
21
                (self.running_actions_manager.callbacks.now_fn)();
1050
21
        }
1051
21
        Ok(self)
1052
21
    }
1053
1054
20
    pub fn canonicalise_path(
1055
20
        self: &Arc<Self>,
1056
20
        arg: &OsStr,
1057
20
        working_directory: &String,
1058
20
    ) -> Result<PathBuf, Error> {
1059
        // If the program contains a slash, we treat it as a path and resolve it relative to the work directory.
1060
20
        Ok(if Path::new(arg).components().count() > 1 {
1061
5
            let canonical_path = PathBuf::from(&self.work_directory)
1062
5
                .join(working_directory)
1063
5
                .join(arg);
1064
5
            if cfg!(target_os = "windows") {
1065
                // Workaround for https://github.com/rust-lang/rust/issues/42869 using a windows-specific crate
1066
0
                dunce::canonicalize(canonical_path)
1067
            } else {
1068
5
                canonical_path.canonicalize()
1069
            }
1070
5
            .err_tip(|| 
{1
1071
1
                format!(
1072
                    "Could not canonicalize path for command root {}.",
1073
1
                    arg.to_string_lossy()
1074
                )
1075
1
            })?
1076
        } else {
1077
15
            PathBuf::from(arg)
1078
        })
1079
20
    }
1080
1081
19
    
async fn inner_execute(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
1082
19
        let (command_proto, mut kill_channel_rx) = {
1083
19
            let mut state = self.state.lock();
1084
19
            state.execution_metadata.execution_start_timestamp =
1085
19
                (self.running_actions_manager.callbacks.now_fn)();
1086
            (
1087
19
                state
1088
19
                    .command_proto
1089
19
                    .take()
1090
19
                    .err_tip(|| "Expected state to have command_proto in execute()")
?0
,
1091
19
                state
1092
19
                    .kill_channel_rx
1093
19
                    .take()
1094
19
                    .err_tip(|| "Expected state to have kill_channel_rx in execute()")
?0
1095
                    // This is important as we may be killed at any point.
1096
19
                    .fuse(),
1097
            )
1098
        };
1099
19
        if command_proto.arguments.is_empty() {
1100
0
            return Err(make_input_err!("No arguments provided in Command proto"));
1101
19
        }
1102
19
        let args: Vec<&OsStr> = if let Some(
entrypoint0
) = &self
1103
19
            .running_actions_manager
1104
19
            .execution_configuration
1105
19
            .entrypoint
1106
        {
1107
0
            core::iter::once(entrypoint.as_ref())
1108
0
                .chain(command_proto.arguments.iter().map(AsRef::as_ref))
1109
0
                .collect()
1110
        } else {
1111
19
            command_proto.arguments.iter().map(AsRef::as_ref).collect()
1112
        };
1113
        // TODO(palfrey): This should probably be in debug, but currently
1114
        //                    that's too busy and we often rely on this to
1115
        //                    figure out toolchain misconfiguration issues.
1116
        //                    De-bloat the `debug` level by using the `trace`
1117
        //                    level more effectively and adjust this.
1118
19
        info!(?args, "Executing command");
1119
1120
19
        let 
program18
= self
1121
19
            .canonicalise_path(args[0], &command_proto.working_directory)
1122
19
            .err_tip(|| 
format!1
("Canonicalisation failure. Command={args:#?}"))
?1
;
1123
18
        if let Some(
wire_format_result2
) = action_supports_persistent_workers(&self.action_info) {
1124
2
            match wire_format_result {
1125
2
                Ok(wire_format) => {
1126
2
                    let command_argv = os_args_to_strings(&args)
?0
;
1127
2
                    let key = WorkerKey::from_argv(&command_argv, wire_format)
?0
;
1128
2
                    let request = WorkRequest {
1129
2
                        arguments: persistent_worker_request_arguments(&command_argv),
1130
2
                        inputs: Vec::<PersistentWorkerInput>::new(),
1131
                        request_id: 0,
1132
                        cancel: false,
1133
                        verbosity: 0,
1134
2
                        sandbox_dir: if command_proto.working_directory.is_empty() {
1135
2
                            self.work_directory.clone()
1136
                        } else {
1137
0
                            format!(
1138
                                "{}/{}",
1139
0
                                self.work_directory, command_proto.working_directory
1140
                            )
1141
                        },
1142
                    };
1143
2
                    let worker_cwd =
1144
2
                        PathBuf::from(&self.running_actions_manager.root_action_directory);
1145
1146
2
                    match self
1147
2
                        .running_actions_manager
1148
2
                        .persistent_worker_pool
1149
2
                        .acquire(key.clone(), &program, &worker_cwd)
1150
2
                        .await
1151
                    {
1152
2
                        Ok(mut lease) => {
1153
2
                            let timer = self.metrics().child_process.begin_timer();
1154
2
                            let dispatch_result = {
1155
2
                                let dispatch_fut =
1156
2
                                    lease.worker().dispatch_with_timeout(&request, self.timeout);
1157
2
                                tokio::pin!(dispatch_fut);
1158
2
                                tokio::select! {
1159
2
                                    result = &mut dispatch_fut => Some(result),
1160
2
                                    _ = &mut kill_channel_rx => 
None0
,
1161
                                }
1162
                            };
1163
2
                            let response = match dispatch_result {
1164
2
                                Some(Ok(response)) => {
1165
2
                                    lease.release(true).await;
1166
2
                                    response
1167
                                }
1168
0
                                Some(Err(err)) => {
1169
0
                                    lease.release(false).await;
1170
0
                                    return Err(err).err_tip(|| {
1171
0
                                        format!("Dispatching action to persistent worker {key:?}")
1172
0
                                    });
1173
                                }
1174
                                None => {
1175
0
                                    drop(timer);
1176
0
                                    lease.release(false).await;
1177
0
                                    {
1178
0
                                        let mut state = self.state.lock();
1179
0
                                        state.error = Error::merge_option(
1180
0
                                            state.error.take(),
1181
0
                                            Some(Error::new(
1182
0
                                                Code::Cancelled,
1183
0
                                                format!(
1184
0
                                                    "Persistent worker command '{}' was killed by scheduler",
1185
0
                                                    args.join(OsStr::new(" ")).to_string_lossy()
1186
0
                                                ),
1187
0
                                            )),
1188
0
                                        );
1189
0
                                        state.command_proto = Some(command_proto);
1190
0
                                        state.execution_result =
1191
0
                                            Some(RunningActionImplExecutionResult {
1192
0
                                                stdout: Bytes::new(),
1193
0
                                                stderr: Bytes::new(),
1194
0
                                                exit_code: EXIT_CODE_FOR_SIGNAL,
1195
0
                                            });
1196
0
                                        state.execution_metadata.execution_completed_timestamp =
1197
0
                                            (self.running_actions_manager.callbacks.now_fn)();
1198
0
                                    }
1199
0
                                    return Ok(self);
1200
                                }
1201
                            };
1202
2
                            timer.measure();
1203
1204
2
                            if response.exit_code == 0 {
1205
2
                                self.metrics().child_process_success_error_code.inc();
1206
2
                            } else {
1207
0
                                self.metrics().child_process_failure_error_code.inc();
1208
0
                            }
1209
2
                            info!(?args, ?key, "Persistent worker command complete");
1210
2
                            {
1211
2
                                let mut state = self.state.lock();
1212
2
                                state.command_proto = Some(command_proto);
1213
2
                                state.execution_result = Some(RunningActionImplExecutionResult {
1214
2
                                    stdout: Bytes::new(),
1215
2
                                    stderr: Bytes::from(response.output),
1216
2
                                    exit_code: response.exit_code,
1217
2
                                });
1218
2
                                state.execution_metadata.execution_completed_timestamp =
1219
2
                                    (self.running_actions_manager.callbacks.now_fn)();
1220
2
                            }
1221
2
                            return Ok(self);
1222
                        }
1223
0
                        Err(err) => {
1224
0
                            info!(
1225
                                ?err,
1226
                                ?key,
1227
                                "Falling back to one-shot execution; persistent worker unavailable"
1228
                            );
1229
                        }
1230
                    }
1231
                }
1232
0
                Err(err) => {
1233
0
                    info!(
1234
                        ?err,
1235
                        "Falling back to one-shot execution; unsupported persistent worker protocol"
1236
                    );
1237
                }
1238
            }
1239
16
        }
1240
1241
16
        let mut command_builder = process::Command::new(program);
1242
        #[cfg(target_family = "unix")]
1243
16
        command_builder.arg0(args[0]);
1244
16
        command_builder
1245
16
            .args(&args[1..])
1246
16
            .kill_on_drop(true)
1247
16
            .stdin(Stdio::null())
1248
16
            .stdout(Stdio::piped())
1249
16
            .stderr(Stdio::piped())
1250
16
            .current_dir(format!(
1251
16
                "{}/{}",
1252
16
                self.work_directory, command_proto.working_directory
1253
16
            ))
1254
16
            .env_clear();
1255
1256
16
        let requested_timeout = if self.action_info.timeout.is_zero() {
1257
14
            self.running_actions_manager.max_action_timeout
1258
        } else {
1259
2
            self.action_info.timeout
1260
        };
1261
1262
16
        let mut maybe_side_channel_file: Option<Cow<'_, OsStr>> = None;
1263
16
        if let Some(
additional_environment0
) = &self
1264
16
            .running_actions_manager
1265
16
            .execution_configuration
1266
16
            .additional_environment
1267
        {
1268
0
            for (name, source) in additional_environment {
1269
0
                let value = match source {
1270
0
                    EnvironmentSource::Property(property) => self
1271
0
                        .action_info
1272
0
                        .platform_properties
1273
0
                        .get(property)
1274
0
                        .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.as_str())),
1275
0
                    EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
1276
                    EnvironmentSource::FromEnvironment => {
1277
0
                        Cow::Owned(env::var(name).unwrap_or_default())
1278
                    }
1279
                    EnvironmentSource::TimeoutMillis => {
1280
0
                        Cow::Owned(requested_timeout.as_millis().to_string())
1281
                    }
1282
                    EnvironmentSource::SideChannelFile => {
1283
0
                        let file_cow =
1284
0
                            format!("{}/{}", self.action_directory, Uuid::new_v4().simple());
1285
0
                        maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));
1286
0
                        Cow::Owned(file_cow)
1287
                    }
1288
                    EnvironmentSource::ActionDirectory => {
1289
0
                        Cow::Borrowed(self.action_directory.as_str())
1290
                    }
1291
                };
1292
0
                command_builder.env(name, value.as_ref());
1293
            }
1294
16
        }
1295
1296
        #[cfg(target_family = "unix")]
1297
16
        let envs = &command_proto.environment_variables;
1298
        // If SystemRoot is not set on Windows, we set it to a default. Failing to do
1299
        // this causes all commands to fail.
1300
        #[cfg(target_family = "windows")]
1301
        let envs = {
1302
            let mut envs = command_proto.environment_variables.clone();
1303
            if !envs.iter().any(|v| v.name.to_uppercase() == "SYSTEMROOT") {
1304
                envs.push(
1305
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
1306
                        name: "SystemRoot".to_string(),
1307
                        value: "C:\\Windows".to_string(),
1308
                    },
1309
                );
1310
            }
1311
            if !envs.iter().any(|v| v.name.to_uppercase() == "PATH") {
1312
                envs.push(
1313
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
1314
                        name: "PATH".to_string(),
1315
                        value: "C:\\Windows\\System32".to_string(),
1316
                    },
1317
                );
1318
            }
1319
            envs
1320
        };
1321
16
        for environment_variable in envs {
1322
16
            command_builder.env(&environment_variable.name, &environment_variable.value);
1323
16
        }
1324
1325
        // Sandbox the command if we are running on Linux; this resolves issues where
1326
        // children can spawn children and also provides better reproducibility.
1327
        #[cfg(target_os = "linux")]
1328
        {
1329
16
            let use_namespaces = self.running_actions_manager.use_namespaces;
1330
16
            let root_action_directory =
1331
16
                std::ffi::CString::new(self.running_actions_manager.root_action_directory.clone())
1332
16
                    .err_tip(|| "In RunningActionImpl::inner_execute()")
?0
;
1333
16
            let action_directory = std::ffi::CString::new(self.action_directory.clone())
1334
16
                .err_tip(|| "In RunningActionImpl::inner_execute()")
?0
;
1335
1336
            // SAFETY: This function is specifically designed to operate in an async-signal-safe
1337
            // environment.
1338
            unsafe {
1339
16
                command_builder.pre_exec(move || match 
use_namespaces0
{
1340
0
                    UseNamespaces::No => Ok(()),
1341
0
                    _ => crate::namespace_utils::configure_namespace(
1342
0
                        matches!(use_namespaces, UseNamespaces::YesAndMount),
1343
0
                        &root_action_directory,
1344
0
                        &action_directory,
1345
                    ),
1346
0
                });
1347
            }
1348
        }
1349
1350
16
        let mut child_process = command_builder
1351
16
            .spawn()
1352
16
            .err_tip(|| 
format!0
("Could not execute command {args:?}"))
?0
;
1353
16
        let mut stdout_reader = child_process
1354
16
            .stdout
1355
16
            .take()
1356
16
            .err_tip(|| "Expected stdout to exist on command this should never happen")
?0
;
1357
16
        let mut stderr_reader = child_process
1358
16
            .stderr
1359
16
            .take()
1360
16
            .err_tip(|| "Expected stderr to exist on command this should never happen")
?0
;
1361
1362
        #[cfg(target_os = "linux")]
1363
        // Wrap the child process to send SIGTERM rather than SIGKILL if namespaced to
1364
        // prevent zombie processes.
1365
16
        let child_process = crate::namespace_utils::MaybeNamespacedChild::new(
1366
16
            !matches!(
1367
16
                self.running_actions_manager.use_namespaces,
1368
                UseNamespaces::No,
1369
            ),
1370
16
            child_process,
1371
        );
1372
1373
16
        let mut child_process_guard = guard(child_process, |mut child_process| 
{0
1374
0
            let result: Result<Option<std::process::ExitStatus>, std::io::Error> =
1375
0
                child_process.try_wait();
1376
0
            match result {
1377
0
                Ok(res) if res.is_some() => {
1378
0
                    // The child already exited, probably a timeout or kill operation
1379
0
                }
1380
0
                result => {
1381
0
                    error!(
1382
                        ?result,
1383
                        "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
1384
                    );
1385
0
                    background_spawn!("running_actions_manager_kill_child_process", async move {
1386
0
                        drop(child_process.kill().await);
1387
0
                    });
1388
                }
1389
            }
1390
0
        });
1391
1392
16
        let all_stdout_fut = spawn!("stdout_reader", async move {
1393
16
            let mut all_stdout = BytesMut::new();
1394
            loop {
1395
19
                let sz = stdout_reader
1396
19
                    .read_buf(&mut all_stdout)
1397
19
                    .await
1398
19
                    .err_tip(|| "Error reading stdout stream")
?0
;
1399
19
                if sz == 0 {
1400
16
                    break; // EOF.
1401
3
                }
1402
            }
1403
16
            Result::<Bytes, Error>::Ok(all_stdout.freeze())
1404
16
        });
1405
16
        let all_stderr_fut = spawn!("stderr_reader", async move {
1406
16
            let mut all_stderr = BytesMut::new();
1407
            loop {
1408
19
                let sz = stderr_reader
1409
19
                    .read_buf(&mut all_stderr)
1410
19
                    .await
1411
19
                    .err_tip(|| "Error reading stderr stream")
?0
;
1412
19
                if sz == 0 {
1413
16
                    break; // EOF.
1414
3
                }
1415
            }
1416
16
            Result::<Bytes, Error>::Ok(all_stderr.freeze())
1417
16
        });
1418
16
        let mut killed_action = false;
1419
1420
16
        let timer = self.metrics().child_process.begin_timer();
1421
16
        let mut sleep_fut = (self.running_actions_manager.callbacks.sleep_fn)(self.timeout).fuse();
1422
        loop {
1423
20
            tokio::select! {
1424
20
                () = &mut sleep_fut => {
1425
2
                    self.running_actions_manager.metrics.task_timeouts.inc();
1426
2
                    killed_action = true;
1427
2
                    if let Err(
err0
) = child_process_guard.kill().await {
1428
0
                        error!(
1429
                            ?err,
1430
                            "Could not kill process in RunningActionsManager for action timeout",
1431
                        );
1432
2
                    }
1433
                    {
1434
2
                        let joined_command = args.join(OsStr::new(" "));
1435
2
                        let command = joined_command.to_string_lossy();
1436
2
                        info!(
1437
2
                            seconds = self.action_info.timeout.as_secs_f32(),
1438
                            %command,
1439
                            "Command timed out"
1440
                        );
1441
2
                        let mut state = self.state.lock();
1442
2
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
1443
2
                            Code::DeadlineExceeded,
1444
2
                            format!(
1445
2
                                "Command '{}' timed out after {} seconds",
1446
2
                                command,
1447
2
                                self.action_info.timeout.as_secs_f32()
1448
2
                            )
1449
2
                        )));
1450
                    }
1451
                },
1452
20
                
maybe_exit_status16
= child_process_guard.wait() => {
1453
                    // Defuse our guard so it does not try to clean up and emit senseless logs.
1454
16
                    drop(ScopeGuard::<_, _>::into_inner(child_process_guard));
1455
16
                    let exit_status = maybe_exit_status.err_tip(|| "Failed to collect exit code of process")
?0
;
1456
                    // TODO(palfrey) We should implement stderr/stdout streaming to client here.
1457
                    // If we get killed before the stream is started, then these will lock up.
1458
                    // TODO(palfrey) There is a significant bug here. If we kill the action and the action creates
1459
                    // child processes, it can create zombies. See: https://github.com/tracemachina/nativelink/issues/225
1460
16
                    let (stdout, stderr) = if killed_action {
1461
4
                        drop(timer);
1462
4
                        (Bytes::new(), Bytes::new())
1463
                    } else {
1464
12
                        timer.measure();
1465
12
                        let (maybe_all_stdout, maybe_all_stderr) = tokio::join!(all_stdout_fut, all_stderr_fut);
1466
                        (
1467
12
                            maybe_all_stdout.err_tip(|| "Internal error reading from stdout of worker task")
?0
?0
,
1468
12
                            maybe_all_stderr.err_tip(|| "Internal error reading from stderr of worker task")
?0
?0
1469
                        )
1470
                    };
1471
1472
16
                    let exit_code = exit_status.code().map_or_else(|| 
{4
1473
                        // No exit code means the runner was terminated by a
1474
                        // signal. SIGKILL on Linux is the kernel OOM killer's
1475
                        // weapon of choice, so flag this for operators trying
1476
                        // to correlate action failures with kubectl-top
1477
                        // memory pressure.
1478
4
                        warn!(
1479
                            ?args,
1480
                            "Runner subprocess terminated by signal (no exit code); likely OOMKilled \
1481
                             or externally killed. If this repeats for the same action, raise \
1482
                             `workers.specs[*].resources.limits.memory` or shrink the action's \
1483
                             concurrency."
1484
                        );
1485
4
                        self.metrics().child_process_failure_error_code.inc();
1486
4
                        EXIT_CODE_FOR_SIGNAL
1487
12
                    
}4
, |exit_code| {
1488
12
                        if exit_code == 0 {
1489
11
                            self.metrics().child_process_success_error_code.inc();
1490
11
                        } else {
1491
1
                            self.metrics().child_process_failure_error_code.inc();
1492
1
                        }
1493
12
                        exit_code
1494
12
                    });
1495
1496
16
                    info!(?args, "Command complete");
1497
1498
16
                    let maybe_error_override = if let Some(
side_channel_file0
) = maybe_side_channel_file {
1499
0
                        process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await
1500
0
                        .err_tip(|| format!("Error processing side channel file: {}", side_channel_file.display()))?
1501
                    } else {
1502
16
                        None
1503
                    };
1504
16
                    {
1505
16
                        let mut state = self.state.lock();
1506
16
                        state.error = Error::merge_option(state.error.take(), maybe_error_override);
1507
16
1508
16
                        state.command_proto = Some(command_proto);
1509
16
                        state.execution_result = Some(RunningActionImplExecutionResult{
1510
16
                            stdout,
1511
16
                            stderr,
1512
16
                            exit_code,
1513
16
                        });
1514
16
                        state.execution_metadata.execution_completed_timestamp = (self.running_actions_manager.callbacks.now_fn)();
1515
16
                    }
1516
16
                    return Ok(self);
1517
                },
1518
20
                _ = &mut kill_channel_rx => {
1519
2
                    killed_action = true;
1520
2
                    if let Err(
err0
) = child_process_guard.kill().await {
1521
0
                        error!(
1522
0
                            operation_id = ?self.operation_id,
1523
                            ?err,
1524
                            "Could not kill process",
1525
                        );
1526
2
                    }
1527
2
                    {
1528
2
                        let mut state = self.state.lock();
1529
2
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
1530
2
                            Code::Aborted,
1531
2
                            format!(
1532
2
                                "Command '{}' was killed by scheduler",
1533
2
                                args.join(OsStr::new(" ")).to_string_lossy()
1534
2
                            )
1535
2
                        )));
1536
2
                    }
1537
                },
1538
            }
1539
        }
1540
        // Unreachable.
1541
19
    }
1542
1543
16
    
async fn inner_upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
1544
        // Classifies what a single requested output path turned out to be after it
        // was inspected on disk (and uploaded where applicable). Each per-path future
        // built below resolves to one of these variants.
        enum OutputType {
1545
            // The path does not exist, or it is an absolute symlink whose target
            // could not be found; there is nothing to record for it.
            None,
1546
            // A regular file, or an absolute symlink that resolves to something other
            // than a directory; its contents were uploaded via `upload_file`.
            File(FileInfo),
1547
            // A directory, or an absolute symlink that resolves to a directory; it was
            // uploaded via `upload_directory` and is identified by its tree digest.
            Directory(DirectoryInfo),
1548
            // A relative symlink whose target is not a directory (this includes a
            // target that does not exist).
            FileSymlink(SymlinkInfo),
1549
            // A relative symlink whose target is a directory.
            DirectorySymlink(SymlinkInfo),
1550
        }
1551
1552
16
        let upload_start = std::time::Instant::now();
1553
16
        debug!(
1554
16
            operation_id = ?self.operation_id,
1555
            "Worker uploading results - starting",
1556
        );
1557
16
        let (mut command_proto, execution_result, mut execution_metadata) = {
1558
16
            let mut state = self.state.lock();
1559
16
            state.execution_metadata.output_upload_start_timestamp =
1560
16
                (self.running_actions_manager.callbacks.now_fn)();
1561
            (
1562
16
                state
1563
16
                    .command_proto
1564
16
                    .take()
1565
16
                    .err_tip(|| "Expected state to have command_proto in execute()")
?0
,
1566
16
                state
1567
16
                    .execution_result
1568
16
                    .take()
1569
16
                    .err_tip(|| "Execution result does not exist at upload_results stage")
?0
,
1570
16
                state.execution_metadata.clone(),
1571
            )
1572
        };
1573
16
        let cas_store = self.running_actions_manager.cas_store.as_ref();
1574
16
        let hasher = self.action_info.unique_qualifier.digest_function();
1575
1576
16
        let mut output_path_futures = FuturesUnordered::new();
1577
16
        let mut output_paths = command_proto.output_paths;
1578
16
        if output_paths.is_empty() {
1579
6
            output_paths
1580
6
                .reserve(command_proto.output_files.len() + command_proto.output_directories.len());
1581
6
            output_paths.append(&mut command_proto.output_files);
1582
6
            output_paths.append(&mut command_proto.output_directories);
1583
10
        }
1584
16
        let digest_uploaders = Arc::new(Mutex::new(HashMap::new()));
1585
16
        for 
entry13
in output_paths {
1586
13
            let full_path = OsString::from(if command_proto.working_directory.is_empty() {
1587
3
                format!("{}/{}", self.work_directory, entry)
1588
            } else {
1589
10
                format!(
1590
                    "{}/{}/{}",
1591
10
                    self.work_directory, command_proto.working_directory, entry
1592
                )
1593
            });
1594
13
            let work_directory = &self.work_directory;
1595
13
            let digest_uploaders = digest_uploaders.clone();
1596
13
            output_path_futures.push(async move {
1597
6
                let metadata = {
1598
13
                    let metadata = match fs::symlink_metadata(&full_path).await {
1599
13
                        Ok(file) => file,
1600
0
                        Err(e) => {
1601
0
                            if e.code == Code::NotFound {
1602
                                // In the event our output does not exist, according to the bazel remote
1603
                                // execution spec, we simply ignore it and continue.
1604
0
                                return Result::<OutputType, Error>::Ok(OutputType::None);
1605
0
                            }
1606
0
                            return Err(e).err_tip(|| {
1607
0
                                format!("Could not open file {}", full_path.display())
1608
0
                            });
1609
                        }
1610
                    };
1611
1612
13
                    if metadata.is_file() {
1613
                        return Ok(OutputType::File(
1614
7
                            upload_file(
1615
7
                                cas_store.as_pin(),
1616
7
                                &full_path,
1617
7
                                hasher,
1618
7
                                metadata,
1619
7
                                digest_uploaders,
1620
7
                            )
1621
7
                            .await
1622
7
                            .map(|mut file_info| {
1623
7
                                file_info.name_or_path = NameOrPath::Path(entry);
1624
7
                                file_info
1625
7
                            })
1626
7
                            .err_tip(|| 
format!0
("Uploading file {}",
full_path.display()0
))
?0
,
1627
                        ));
1628
6
                    }
1629
6
                    metadata
1630
                };
1631
6
                if metadata.is_dir() {
1632
                    Ok(OutputType::Directory(
1633
2
                        upload_directory(
1634
2
                            cas_store.as_pin(),
1635
2
                            &full_path,
1636
2
                            work_directory,
1637
2
                            hasher,
1638
2
                            digest_uploaders,
1639
                        )
1640
2
                        .and_then(|(root_dir, children)| async move {
1641
2
                            let tree = ProtoTree {
1642
2
                                root: Some(root_dir),
1643
2
                                children: children.into(),
1644
2
                            };
1645
2
                            let tree_digest = serialize_and_upload_message(
1646
2
                                &tree,
1647
2
                                cas_store.as_pin(),
1648
2
                                &mut hasher.hasher(),
1649
2
                            )
1650
2
                            .await
1651
2
                            .err_tip(|| 
format!0
("While processing {entry}"))
?0
;
1652
2
                            Ok(DirectoryInfo {
1653
2
                                path: entry,
1654
2
                                tree_digest,
1655
2
                            })
1656
4
                        })
1657
2
                        .await
1658
2
                        .err_tip(|| 
format!0
("Uploading directory {}",
full_path.display()0
))
?0
,
1659
                    ))
1660
4
                } else if metadata.is_symlink() {
1661
                    // Resolve the symlink to determine what it points to.
1662
                    // Symlinks created by DirectoryCache (absolute paths into
1663
                    // the cache directory) must NOT be uploaded as symlinks —
1664
                    // the target path is worker-local and meaningless to the
1665
                    // client. Instead, follow the symlink and upload the
1666
                    // resolved content (file or directory).
1667
4
                    let target = fs::read_link(&full_path).await.err_tip(|| 
{0
1668
0
                        format!("Reading symlink target for {}", full_path.display())
1669
0
                    })?;
1670
4
                    let is_absolute_symlink = Path::new(&target).is_absolute();
1671
1672
4
                    if is_absolute_symlink {
1673
                        // Absolute symlink — resolve and upload contents.
1674
3
                        match fs::metadata(&full_path).await {
1675
3
                            Ok(resolved_meta) => {
1676
3
                                if resolved_meta.is_dir() {
1677
                                    // Upload as directory (Tree proto).
1678
                                    Ok(OutputType::Directory(
1679
1
                                        upload_directory(
1680
1
                                            cas_store.as_pin(),
1681
1
                                            &full_path,
1682
1
                                            work_directory,
1683
1
                                            hasher,
1684
1
                                            digest_uploaders,
1685
                                        )
1686
1
                                        .and_then(|(root_dir, children)| async move {
1687
1
                                            let tree = ProtoTree {
1688
1
                                                root: Some(root_dir),
1689
1
                                                children: children.into(),
1690
1
                                            };
1691
1
                                            let tree_digest = serialize_and_upload_message(
1692
1
                                                &tree,
1693
1
                                                cas_store.as_pin(),
1694
1
                                                &mut hasher.hasher(),
1695
1
                                            )
1696
1
                                            .await
1697
1
                                            .err_tip(|| 
format!0
("While processing {entry}"))
?0
;
1698
1
                                            Ok(DirectoryInfo {
1699
1
                                                path: entry,
1700
1
                                                tree_digest,
1701
1
                                            })
1702
2
                                        })
1703
1
                                        .await
1704
1
                                        .err_tip(|| 
{0
1705
0
                                            format!(
1706
                                                "Uploading symlinked directory {}",
1707
0
                                                full_path.display()
1708
                                            )
1709
0
                                        })?,
1710
                                    ))
1711
                                } else {
1712
                                    // Upload as file (follow symlink).
1713
                                    Ok(OutputType::File(
1714
2
                                        upload_file(
1715
2
                                            cas_store.as_pin(),
1716
2
                                            &full_path,
1717
2
                                            hasher,
1718
2
                                            resolved_meta,
1719
2
                                            digest_uploaders,
1720
2
                                        )
1721
2
                                        .await
1722
2
                                        .map(|mut file_info| {
1723
2
                                            file_info.name_or_path = NameOrPath::Path(entry);
1724
2
                                            file_info
1725
2
                                        })
1726
2
                                        .err_tip(|| 
{0
1727
0
                                            format!(
1728
                                                "Uploading symlinked file {}",
1729
0
                                                full_path.display()
1730
                                            )
1731
0
                                        })?,
1732
                                    ))
1733
                                }
1734
                            }
1735
0
                            Err(e) => {
1736
0
                                if e.code != Code::NotFound {
1737
0
                                    return Err(e).err_tip(|| {
1738
0
                                        format!(
1739
                                            "While resolving absolute symlink {}",
1740
0
                                            full_path.display()
1741
                                        )
1742
0
                                    });
1743
0
                                }
1744
0
                                Ok(OutputType::None)
1745
                            }
1746
                        }
1747
                    } else {
1748
                        // Relative symlink — action intentionally created it.
1749
                        // Upload as a proper symlink.
1750
1
                        let output_symlink = upload_symlink(&full_path, work_directory)
1751
1
                            .await
1752
1
                            .map(|mut symlink_info| {
1753
1
                                symlink_info.name_or_path = NameOrPath::Path(entry);
1754
1
                                symlink_info
1755
1
                            })
1756
1
                            .err_tip(|| 
format!0
("Uploading symlink {}",
full_path.display()0
))
?0
;
1757
1
                        match fs::metadata(&full_path).await {
1758
1
                            Ok(metadata) => {
1759
1
                                if metadata.is_dir() {
1760
0
                                    Ok(OutputType::DirectorySymlink(output_symlink))
1761
                                } else {
1762
                                    // Note: If it's anything but directory we put it as a file symlink.
1763
1
                                    Ok(OutputType::FileSymlink(output_symlink))
1764
                                }
1765
                            }
1766
0
                            Err(e) => {
1767
0
                                if e.code != Code::NotFound {
1768
0
                                    return Err(e).err_tip(|| {
1769
0
                                        format!(
1770
                                            "While querying target symlink metadata for {}",
1771
0
                                            full_path.display()
1772
                                        )
1773
0
                                    });
1774
0
                                }
1775
                                // If the file doesn't exist, we consider it a file. Even though the
1776
                                // file doesn't exist we still need to populate an entry.
1777
0
                                Ok(OutputType::FileSymlink(output_symlink))
1778
                            }
1779
                        }
1780
                    }
1781
                } else {
1782
0
                    Err(make_err!(
1783
0
                        Code::Internal,
1784
0
                        "{full_path:?} was not a file, folder or symlink. Must be one.",
1785
0
                    ))
1786
                }
1787
13
            });
1788
        }
1789
16
        let mut output_files = vec![];
1790
16
        let mut output_folders = vec![];
1791
16
        let mut output_directory_symlinks = vec![];
1792
16
        let mut output_file_symlinks = vec![];
1793
1794
16
        if execution_result.exit_code != 0 {
1795
5
            let stdout = core::str::from_utf8(&execution_result.stdout).unwrap_or("<no-utf8>");
1796
5
            let stderr = core::str::from_utf8(&execution_result.stderr).unwrap_or("<no-utf8>");
1797
5
            error!(
1798
                exit_code = ?execution_result.exit_code,
1799
5
                stdout = ?stdout[..min(stdout.len(), 1000)],
1800
5
                stderr = ?stderr[..min(stderr.len(), 1000)],
1801
                command = ?command_proto.arguments,
1802
                "Command returned non-zero exit code",
1803
            );
1804
11
        }
1805
1806
16
        let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
1807
16
            let start = std::time::Instant::now();
1808
16
            let data = execution_result.stdout;
1809
16
            let data_len = data.len();
1810
16
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1811
16
            cas_store
1812
16
                .update_oneshot(digest, data)
1813
16
                .await
1814
16
                .err_tip(|| "Uploading stdout")
?0
;
1815
16
            debug!(
1816
                ?digest,
1817
                data_len,
1818
16
                elapsed_ms = start.elapsed().as_millis(),
1819
                "upload_results: stdout upload completed",
1820
            );
1821
16
            Result::<DigestInfo, Error>::Ok(digest)
1822
16
        });
1823
16
        let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
1824
16
            let start = std::time::Instant::now();
1825
16
            let data = execution_result.stderr;
1826
16
            let data_len = data.len();
1827
16
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1828
16
            cas_store
1829
16
                .update_oneshot(digest, data)
1830
16
                .await
1831
16
                .err_tip(|| "Uploading  stderr")
?0
;
1832
16
            debug!(
1833
                ?digest,
1834
                data_len,
1835
16
                elapsed_ms = start.elapsed().as_millis(),
1836
                "upload_results: stderr upload completed",
1837
            );
1838
16
            Result::<DigestInfo, Error>::Ok(digest)
1839
16
        });
1840
1841
16
        debug!(
1842
16
            operation_id = ?self.operation_id,
1843
16
            num_output_paths = output_path_futures.len(),
1844
            "upload_results: starting stdout/stderr/output_paths uploads",
1845
        );
1846
16
        let join_start = std::time::Instant::now();
1847
16
        let upload_result = futures::try_join!(stdout_digest_fut, stderr_digest_fut, async {
1848
29
            while let Some(
output_type13
) = output_path_futures.try_next().await
?0
{
1849
13
                match output_type {
1850
9
                    OutputType::File(output_file) => output_files.push(output_file),
1851
3
                    OutputType::Directory(output_folder) => output_folders.push(output_folder),
1852
1
                    OutputType::FileSymlink(output_symlink) => {
1853
1
                        output_file_symlinks.push(output_symlink);
1854
1
                    }
1855
0
                    OutputType::DirectorySymlink(output_symlink) => {
1856
0
                        output_directory_symlinks.push(output_symlink);
1857
0
                    }
1858
0
                    OutputType::None => { /* Safe to ignore */ }
1859
                }
1860
            }
1861
16
            Ok(())
1862
16
        });
1863
16
        drop(output_path_futures);
1864
16
        debug!(
1865
16
            operation_id = ?self.operation_id,
1866
16
            elapsed_ms = join_start.elapsed().as_millis(),
1867
16
            success = upload_result.is_ok(),
1868
            "upload_results: all uploads completed",
1869
        );
1870
16
        let (stdout_digest, stderr_digest) = match upload_result {
1871
16
            Ok((stdout_digest, stderr_digest, ())) => (stdout_digest, stderr_digest),
1872
0
            Err(e) => return Err(e).err_tip(|| "Error while uploading results"),
1873
        };
1874
1875
16
        execution_metadata.output_upload_completed_timestamp =
1876
16
            (self.running_actions_manager.callbacks.now_fn)();
1877
16
        output_files.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1878
16
        output_folders.sort_unstable_by(|a, b| 
a.path0
.
cmp0
(
&b.path0
));
1879
16
        output_file_symlinks.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1880
16
        output_directory_symlinks.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1881
16
        let num_output_files = output_files.len();
1882
16
        let num_output_folders = output_folders.len();
1883
16
        {
1884
16
            let mut state = self.state.lock();
1885
16
            execution_metadata.worker_completed_timestamp =
1886
16
                (self.running_actions_manager.callbacks.now_fn)();
1887
16
            state.action_result = Some(ActionResult {
1888
16
                output_files,
1889
16
                output_folders,
1890
16
                output_directory_symlinks,
1891
16
                output_file_symlinks,
1892
16
                exit_code: execution_result.exit_code,
1893
16
                stdout_digest,
1894
16
                stderr_digest,
1895
16
                execution_metadata,
1896
16
                server_logs: HashMap::default(), // TODO(palfrey) Not implemented.
1897
16
                error: state.error.clone(),
1898
16
                message: String::new(), // Will be filled in on cache_action_result if needed.
1899
16
            });
1900
16
        }
1901
16
        debug!(
1902
16
            operation_id = ?self.operation_id,
1903
16
            total_elapsed_ms = upload_start.elapsed().as_millis(),
1904
            num_output_files,
1905
            num_output_folders,
1906
            "upload_results: inner_upload_results completed successfully",
1907
        );
1908
16
        Ok(self)
1909
16
    }
1910
1911
16
    
async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error>0
{
1912
16
        let mut state = self.state.lock();
1913
16
        state
1914
16
            .action_result
1915
16
            .take()
1916
16
            .err_tip(|| "Expected action_result to exist in get_finished_result")
1917
16
    }
1918
}
1919
1920
impl Drop for RunningActionImpl {
1921
25
    fn drop(&mut self) {
1922
25
        if self.did_cleanup.load(Ordering::Acquire) {
1923
24
            if self.has_manager_entry.load(Ordering::Acquire) {
1924
2
                drop(
1925
2
                    self.running_actions_manager
1926
2
                        .cleanup_action(&self.operation_id),
1927
2
                );
1928
22
            }
1929
24
            return;
1930
1
        }
1931
1
        let operation_id = self.operation_id.clone();
1932
1
        error!(
1933
            %operation_id,
1934
            "RunningActionImpl did not cleanup. This is a violation of the requirements, will attempt to do it in the background."
1935
        );
1936
1
        let running_actions_manager = self.running_actions_manager.clone();
1937
1
        let action_directory = self.action_directory.clone();
1938
1
        background_spawn!("running_action_impl_drop", async move 
{0
1939
0
            let Err(err) =
1940
0
                do_cleanup(&running_actions_manager, &operation_id, &action_directory).await
1941
            else {
1942
0
                return;
1943
            };
1944
0
            error!(
1945
                %operation_id,
1946
                ?action_directory,
1947
                ?err,
1948
                "Error cleaning up action"
1949
            );
1950
0
        });
1951
25
    }
1952
}
1953
1954
impl RunningAction for RunningActionImpl {
1955
0
    fn get_operation_id(&self) -> &OperationId {
1956
0
        &self.operation_id
1957
0
    }
1958
1959
21
    
async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
1960
21
        let res = self
1961
21
            .metrics()
1962
21
            .clone()
1963
21
            .prepare_action
1964
21
            .wrap(Self::inner_prepare_action(self))
1965
21
            .await;
1966
21
        if let Err(
ref e0
) = res {
1967
0
            warn!(?e, "Error during prepare_action");
1968
21
        }
1969
21
        res
1970
21
    }
1971
1972
19
    async fn execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1973
19
        let res = self
1974
19
            .metrics()
1975
19
            .clone()
1976
19
            .execute
1977
19
            .wrap(Self::inner_execute(self))
1978
19
            .await;
1979
19
        if let Err(
ref e1
) = res {
1980
1
            warn!(?e, "Error during prepare_action");
1981
18
        }
1982
19
        res
1983
19
    }
1984
1985
16
    async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1986
16
        let upload_timeout = self.running_actions_manager.max_upload_timeout;
1987
16
        let operation_id = self.operation_id.clone();
1988
16
        info!(
1989
            ?operation_id,
1990
16
            upload_timeout_s = upload_timeout.as_secs(),
1991
            "upload_results: starting with timeout",
1992
        );
1993
16
        let metrics = self.metrics().clone();
1994
16
        let upload_fut = metrics
1995
16
            .upload_results
1996
16
            .wrap(Self::inner_upload_results(self));
1997
1998
16
        let stall_warn_fut = async 
{13
1999
13
            let mut elapsed_secs = 0u64;
2000
            loop {
2001
13
                tokio::time::sleep(Duration::from_mins(1)).await;
2002
0
                elapsed_secs += 60;
2003
0
                warn!(
2004
                    ?operation_id,
2005
                    elapsed_s = elapsed_secs,
2006
0
                    timeout_s = upload_timeout.as_secs(),
2007
                    "upload_results: still in progress — possible stall",
2008
                );
2009
            }
2010
        };
2011
2012
16
        let res = tokio::time::timeout(upload_timeout, async {
2013
16
            tokio::pin!(upload_fut);
2014
16
            tokio::pin!(stall_warn_fut);
2015
16
            tokio::select! {
2016
16
                result = &mut upload_fut => result,
2017
16
                () = &mut stall_warn_fut => 
unreachable!0
(),
2018
            }
2019
16
        })
2020
16
        .await
2021
16
        .map_err(|err| 
{0
2022
0
            warn!(%operation_id, timeout=upload_timeout.as_secs(), "Upload results timeout");
2023
0
            Error::from_std_err(Code::DeadlineExceeded, &err).append(format!(
2024
                "Upload results timed out after {}s for operation {:?}",
2025
0
                upload_timeout.as_secs(),
2026
                operation_id,
2027
            ))
2028
0
        })?;
2029
16
        if let Err(
ref e0
) = res {
2030
0
            warn!(?operation_id, ?e, "Error during upload_results");
2031
16
        }
2032
16
        res
2033
16
    }
2034
2035
22
    async fn cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
2036
22
        let res = self
2037
22
            .metrics()
2038
22
            .clone()
2039
22
            .cleanup
2040
22
            .wrap(async move {
2041
22
                let result = do_cleanup(
2042
22
                    &self.running_actions_manager,
2043
22
                    &self.operation_id,
2044
22
                    &self.action_directory,
2045
22
                )
2046
22
                .await;
2047
22
                self.has_manager_entry.store(false, Ordering::Release);
2048
22
                self.did_cleanup.store(true, Ordering::Release);
2049
22
                result.map(move |()| self)
2050
22
            })
2051
22
            .await;
2052
22
        if let Err(
ref e0
) = res {
2053
0
            warn!(?e, "Error during cleanup");
2054
22
        }
2055
22
        res
2056
22
    }
2057
2058
16
    
async fn get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error>0
{
2059
16
        self.metrics()
2060
16
            .clone()
2061
16
            .get_finished_result
2062
16
            .wrap(Self::inner_get_finished_result(self))
2063
16
            .await
2064
16
    }
2065
2066
0
    fn get_work_directory(&self) -> &String {
2067
0
        &self.work_directory
2068
0
    }
2069
}
2070
2071
pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static {
2072
    type RunningAction: RunningAction;
2073
2074
    fn create_and_add_action(
2075
        self: &Arc<Self>,
2076
        worker_id: String,
2077
        start_execute: StartExecute,
2078
    ) -> impl Future<Output = Result<Arc<Self::RunningAction>, Error>> + Send;
2079
2080
    fn cache_action_result(
2081
        &self,
2082
        action_digest: DigestInfo,
2083
        action_result: &mut ActionResult,
2084
        hasher: DigestHasherFunc,
2085
    ) -> impl Future<Output = Result<(), Error>> + Send;
2086
2087
    fn kill_all(&self) -> impl Future<Output = ()> + Send;
2088
2089
    fn kill_operation(
2090
        &self,
2091
        operation_id: &OperationId,
2092
    ) -> impl Future<Output = Result<(), Error>> + Send;
2093
2094
    fn metrics(&self) -> &Arc<Metrics>;
2095
}
2096
2097
/// A function to get the current system time, used to allow mocking for tests
2098
type NowFn = fn() -> SystemTime;
2099
type SleepFn = fn(Duration) -> BoxFuture<'static, ()>;
2100
2101
/// Functions that may be injected for testing purposes, during standard control
2102
/// flows these are specified by the new function.
2103
#[derive(Clone, Copy)]
2104
pub struct Callbacks {
2105
    /// A function that gets the current time.
2106
    pub now_fn: NowFn,
2107
    /// A function that sleeps for a given Duration.
2108
    pub sleep_fn: SleepFn,
2109
}
2110
2111
impl Debug for Callbacks {
2112
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
2113
0
        f.debug_struct("Callbacks").finish_non_exhaustive()
2114
0
    }
2115
}
2116
2117
/// The set of additional information for executing an action over and above
2118
/// those given in the `ActionInfo` passed to the worker.  This allows
2119
/// modification of the action for execution on this particular worker.  This
2120
/// may be used to run the action with a particular set of additional
2121
/// environment variables, or perhaps configure it to execute within a
2122
/// container.
2123
#[derive(Debug, Default)]
2124
pub struct ExecutionConfiguration {
2125
    /// If set, will be executed instead of the first argument passed in the
2126
    /// `ActionInfo` with all of the arguments in the `ActionInfo` passed as
2127
    /// arguments to this command.
2128
    pub entrypoint: Option<String>,
2129
    /// The only environment variables that will be specified when the command
2130
    /// executes other than those in the `ActionInfo`.  On Windows, `SystemRoot`
2131
    /// and PATH are also assigned (see `inner_execute`).
2132
    pub additional_environment: Option<HashMap<String, EnvironmentSource>>,
2133
}
2134
2135
#[derive(Debug)]
2136
struct UploadActionResults {
2137
    upload_ac_results_strategy: UploadCacheResultsStrategy,
2138
    upload_historical_results_strategy: UploadCacheResultsStrategy,
2139
    ac_store: Option<Store>,
2140
    historical_store: Store,
2141
    success_message_template: Template,
2142
    failure_message_template: Template,
2143
}
2144
2145
impl UploadActionResults {
2146
32
    fn new(
2147
32
        config: &UploadActionResultConfig,
2148
32
        ac_store: Option<Store>,
2149
32
        historical_store: Store,
2150
32
    ) -> Result<Self, Error> {
2151
32
        let upload_historical_results_strategy = config
2152
32
            .upload_historical_results_strategy
2153
32
            .unwrap_or(DEFAULT_HISTORICAL_RESULTS_STRATEGY);
2154
8
        if !matches!(
2155
32
            config.upload_ac_results_strategy,
2156
            UploadCacheResultsStrategy::Never
2157
8
        ) && ac_store.is_none()
2158
        {
2159
0
            return Err(make_input_err!(
2160
0
                "upload_ac_results_strategy is set, but no ac_store is configured"
2161
0
            ));
2162
32
        }
2163
        Ok(Self {
2164
32
            upload_ac_results_strategy: config.upload_ac_results_strategy,
2165
32
            upload_historical_results_strategy,
2166
32
            ac_store,
2167
32
            historical_store,
2168
32
            success_message_template: Template::new(&config.success_message_template).map_err(
2169
0
                |e| {
2170
0
                    Error::from_std_err(Code::InvalidArgument, &e).append(format!(
2171
                        "Could not convert success_message_template to rust template: {}",
2172
                        config.success_message_template
2173
                    ))
2174
0
                },
2175
0
            )?,
2176
32
            failure_message_template: Template::new(&config.failure_message_template).map_err(
2177
0
                |e| {
2178
0
                    Error::from_std_err(Code::InvalidArgument, &e).append(format!(
2179
                        "Could not convert failure_message_template to rust template: {}",
2180
                        config.success_message_template
2181
                    ))
2182
0
                },
2183
0
            )?,
2184
        })
2185
32
    }
2186
2187
0
    const fn should_cache_result(
2188
0
        strategy: UploadCacheResultsStrategy,
2189
0
        action_result: &ActionResult,
2190
0
        treat_infra_error_as_failure: bool,
2191
0
    ) -> bool {
2192
0
        let did_fail = action_result.exit_code != 0
2193
0
            || (treat_infra_error_as_failure && action_result.error.is_some());
2194
0
        match strategy {
2195
0
            UploadCacheResultsStrategy::SuccessOnly => !did_fail,
2196
0
            UploadCacheResultsStrategy::Never => false,
2197
            // Never cache internal errors or timeouts.
2198
            UploadCacheResultsStrategy::Everything => {
2199
0
                treat_infra_error_as_failure || action_result.error.is_none()
2200
            }
2201
0
            UploadCacheResultsStrategy::FailuresOnly => did_fail,
2202
        }
2203
0
    }
2204
2205
    /// Formats the message field in `ExecuteResponse` from the `success_message_template`
2206
    /// or `failure_message_template` config templates.
2207
5
    fn format_execute_response_message(
2208
5
        mut template_str: Template,
2209
5
        action_digest_info: DigestInfo,
2210
5
        maybe_historical_digest_info: Option<DigestInfo>,
2211
5
        hasher: DigestHasherFunc,
2212
5
    ) -> Result<String, Error> {
2213
5
        template_str.replace(
2214
            "digest_function",
2215
5
            hasher.proto_digest_func().as_str_name().to_lowercase(),
2216
        );
2217
5
        template_str.replace(
2218
            "action_digest_hash",
2219
5
            action_digest_info.packed_hash().to_string(),
2220
        );
2221
5
        template_str.replace("action_digest_size", action_digest_info.size_bytes());
2222
5
        if let Some(
historical_digest_info3
) = maybe_historical_digest_info {
2223
3
            template_str.replace(
2224
3
                "historical_results_hash",
2225
3
                format!("{}", historical_digest_info.packed_hash()),
2226
3
            );
2227
3
            template_str.replace(
2228
3
                "historical_results_size",
2229
3
                historical_digest_info.size_bytes(),
2230
3
            );
2231
3
        } else {
2232
2
            template_str.replace("historical_results_hash", "");
2233
2
            template_str.replace("historical_results_size", "");
2234
2
        }
2235
5
        template_str.text().map_err(|e| 
{0
2236
0
            Error::from_std_err(Code::InvalidArgument, &e)
2237
0
                .append("Could not convert template to text")
2238
0
        })
2239
5
    }
2240
2241
5
    async fn upload_ac_results(
2242
5
        &self,
2243
5
        action_digest: DigestInfo,
2244
5
        action_result: ProtoActionResult,
2245
5
        hasher: DigestHasherFunc,
2246
5
    ) -> Result<(), Error> {
2247
5
        let Some(ac_store) = self.ac_store.as_ref() else {
2248
0
            return Ok(());
2249
        };
2250
        // If we are a GrpcStore we shortcut here, as this is a special store.
2251
5
        if let Some(
grpc_store0
) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
2252
0
            let update_action_request = UpdateActionResultRequest {
2253
0
                // This is populated by `update_action_result`.
2254
0
                instance_name: String::new(),
2255
0
                action_digest: Some(action_digest.into()),
2256
0
                action_result: Some(action_result),
2257
0
                results_cache_policy: None,
2258
0
                digest_function: hasher.proto_digest_func().into(),
2259
0
            };
2260
0
            return grpc_store
2261
0
                .update_action_result(Request::new(update_action_request))
2262
0
                .await
2263
0
                .map(|_| ())
2264
0
                .err_tip(|| "Caching ActionResult");
2265
5
        }
2266
2267
5
        let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);
2268
5
        action_result
2269
5
            .encode(&mut store_data)
2270
5
            .err_tip(|| "Encoding ActionResult for caching")
?0
;
2271
2272
5
        ac_store
2273
5
            .update_oneshot(action_digest, store_data.split().freeze())
2274
5
            .await
2275
5
            .err_tip(|| "Caching ActionResult")
2276
5
    }
2277
2278
3
    async fn upload_historical_results_with_message(
2279
3
        &self,
2280
3
        action_digest: DigestInfo,
2281
3
        execute_response: ExecuteResponse,
2282
3
        message_template: Template,
2283
3
        hasher: DigestHasherFunc,
2284
3
    ) -> Result<String, Error> {
2285
3
        let historical_digest_info = serialize_and_upload_message(
2286
3
            &HistoricalExecuteResponse {
2287
3
                action_digest: Some(action_digest.into()),
2288
3
                execute_response: Some(execute_response.clone()),
2289
3
            },
2290
3
            self.historical_store.as_pin(),
2291
3
            &mut hasher.hasher(),
2292
3
        )
2293
3
        .await
2294
3
        .err_tip(|| 
format!0
("Caching HistoricalExecuteResponse for digest: {action_digest}"))
?0
;
2295
2296
3
        Self::format_execute_response_message(
2297
3
            message_template,
2298
3
            action_digest,
2299
3
            Some(historical_digest_info),
2300
3
            hasher,
2301
        )
2302
3
        .err_tip(|| "Could not format message in upload_historical_results_with_message")
2303
3
    }
2304
2305
6
    async fn cache_action_result(
2306
6
        &self,
2307
6
        action_info: DigestInfo,
2308
6
        action_result: &mut ActionResult,
2309
6
        hasher: DigestHasherFunc,
2310
6
    ) -> Result<(), Error> {
2311
6
        let should_upload_historical_results =
2312
6
            Self::should_cache_result(self.upload_historical_results_strategy, action_result, true);
2313
6
        let should_upload_ac_results =
2314
6
            Self::should_cache_result(self.upload_ac_results_strategy, action_result, false);
2315
        // Shortcut so we don't need to convert to proto if not needed.
2316
6
        if !should_upload_ac_results && 
!should_upload_historical_results1
{
2317
1
            return Ok(());
2318
5
        }
2319
2320
5
        let mut execute_response = to_execute_response(action_result.clone());
2321
2322
        // In theory exit code should always be != 0 if there's an error, but for safety we
2323
        // catch both.
2324
5
        let message_template = if action_result.exit_code == 0 && 
action_result.error4
.
is_none4
() {
2325
3
            self.success_message_template.clone()
2326
        } else {
2327
2
            self.failure_message_template.clone()
2328
        };
2329
2330
5
        let upload_historical_results_with_message_result = if should_upload_historical_results {
2331
3
            let maybe_message = self
2332
3
                .upload_historical_results_with_message(
2333
3
                    action_info,
2334
3
                    execute_response.clone(),
2335
3
                    message_template,
2336
3
                    hasher,
2337
3
                )
2338
3
                .await;
2339
3
            match maybe_message {
2340
3
                Ok(message) => {
2341
3
                    action_result.message.clone_from(&message);
2342
3
                    execute_response.message = message;
2343
3
                    Ok(())
2344
                }
2345
0
                Err(e) => Result::<(), Error>::Err(e),
2346
            }
2347
        } else {
2348
2
            match Self::format_execute_response_message(message_template, action_info, None, hasher)
2349
            {
2350
2
                Ok(message) => {
2351
2
                    action_result.message.clone_from(&message);
2352
2
                    execute_response.message = message;
2353
2
                    Ok(())
2354
                }
2355
0
                Err(e) => Err(e).err_tip(|| "Could not format message in cache_action_result"),
2356
            }
2357
        };
2358
2359
        // Note: Done in this order because we assume most results will succeed and most configs will
2360
        // either always upload upload historical results or only upload on failure. In which case
2361
        // we can avoid an extra clone of the protos by doing this last with the above assumption.
2362
5
        let ac_upload_results = if should_upload_ac_results {
2363
5
            self.upload_ac_results(
2364
5
                action_info,
2365
5
                execute_response
2366
5
                    .result
2367
5
                    .err_tip(|| "No result set in cache_action_result")
?0
,
2368
5
                hasher,
2369
            )
2370
5
            .await
2371
        } else {
2372
0
            Ok(())
2373
        };
2374
5
        upload_historical_results_with_message_result.merge(ac_upload_results)
2375
6
    }
2376
}
2377
2378
/// Namespace setting carried by `RunningActionsManagerArgs` and
/// `RunningActionsManagerImpl`; only compiled on Linux.
// NOTE(review): what each variant enables is decided where the value is
// consumed, which is not visible here — confirm before documenting further.
#[cfg(target_os = "linux")]
#[derive(Copy, Clone, Debug)]
pub enum UseNamespaces {
    No,
    Yes,
    YesAndMount,
}
2385
2386
#[derive(Debug)]
2387
pub struct RunningActionsManagerArgs<'a> {
2388
    pub root_action_directory: String,
2389
    pub execution_configuration: ExecutionConfiguration,
2390
    pub cas_store: Arc<FastSlowStore>,
2391
    pub ac_store: Option<Store>,
2392
    pub historical_store: Store,
2393
    pub upload_action_result_config: &'a UploadActionResultConfig,
2394
    pub max_action_timeout: Duration,
2395
    pub max_upload_timeout: Duration,
2396
    pub timeout_handled_externally: bool,
2397
    pub directory_cache: Option<Arc<crate::directory_cache::DirectoryCache>>,
2398
    #[cfg(target_os = "linux")]
2399
    pub use_namespaces: UseNamespaces,
2400
}
2401
2402
struct CleanupGuard {
2403
    manager: Weak<RunningActionsManagerImpl>,
2404
    operation_id: OperationId,
2405
}
2406
2407
impl Drop for CleanupGuard {
2408
22
    fn drop(&mut self) {
2409
22
        let Some(manager) = self.manager.upgrade() else {
2410
0
            return;
2411
        };
2412
22
        let mut cleaning = manager.cleaning_up_operations.lock();
2413
22
        cleaning.remove(&self.operation_id);
2414
22
        manager.cleanup_complete_notify.notify_waiters();
2415
22
    }
2416
}
2417
2418
/// Holds state info about what is being executed and the interface for interacting
2419
/// with actions while they are running.
2420
#[derive(Debug)]
2421
pub struct RunningActionsManagerImpl {
2422
    root_action_directory: String,
2423
    execution_configuration: ExecutionConfiguration,
2424
    cas_store: Arc<FastSlowStore>,
2425
    filesystem_store: Arc<FilesystemStore>,
2426
    upload_action_results: UploadActionResults,
2427
    max_action_timeout: Duration,
2428
    max_upload_timeout: Duration,
2429
    timeout_handled_externally: bool,
2430
    #[cfg(target_os = "linux")]
2431
    use_namespaces: UseNamespaces,
2432
    running_actions: Mutex<HashMap<OperationId, Weak<RunningActionImpl>>>,
2433
    // Note: We don't use Notify because we need to support a .wait_for()-like function, which
2434
    // Notify does not support.
2435
    action_done_tx: watch::Sender<()>,
2436
    callbacks: Callbacks,
2437
    metrics: Arc<Metrics>,
2438
    /// Track operations being cleaned up to avoid directory collisions during action retries.
2439
    /// When an action fails and is retried on the same worker, we need to ensure the previous
2440
    /// attempt's directory is fully cleaned up before creating a new one.
2441
    /// See: <https://github.com/TraceMachina/nativelink/issues/1859>
2442
    cleaning_up_operations: Mutex<HashSet<OperationId>>,
2443
    /// Notify waiters when a cleanup operation completes. This is used in conjunction with
2444
    /// `cleaning_up_operations` to coordinate directory cleanup and creation.
2445
    cleanup_complete_notify: Arc<Notify>,
2446
    /// Optional directory cache for improving performance by caching reconstructed
2447
    /// input directories and using hardlinks.
2448
    directory_cache: Option<Arc<crate::directory_cache::DirectoryCache>>,
2449
    persistent_worker_pool: PersistentWorkerPool,
2450
}
2451
2452
impl RunningActionsManagerImpl {
2453
    /// Upper bound on how long `wait_for_cleanup_if_needed()` waits for a previous cleanup
    /// of the same operation before giving up.
    /// TODO(marcussorealheis): Consider making cleanup wait timeout configurable in the future
    const MAX_WAIT: Duration = Duration::from_secs(30);
    /// Cap for the exponentially growing sleep between checks while waiting for cleanup.
    const MAX_BACKOFF: Duration = Duration::from_millis(500);
2458
32
    pub fn new_with_callbacks(
2459
32
        args: RunningActionsManagerArgs<'_>,
2460
32
        callbacks: Callbacks,
2461
32
    ) -> Result<Self, Error> {
2462
        // Sadly because of some limitations of how Any works we need to clone more times than optimal.
2463
32
        let filesystem_store = args
2464
32
            .cas_store
2465
32
            .fast_store()
2466
32
            .downcast_ref::<FilesystemStore>(None)
2467
32
            .err_tip(
2468
                || "Expected FilesystemStore store for .fast_store() in RunningActionsManagerImpl",
2469
0
            )?
2470
32
            .get_arc()
2471
32
            .err_tip(|| "FilesystemStore's internal Arc was lost")
?0
;
2472
32
        let (action_done_tx, _) = watch::channel(());
2473
        Ok(Self {
2474
32
            root_action_directory: args.root_action_directory,
2475
32
            execution_configuration: args.execution_configuration,
2476
32
            cas_store: args.cas_store,
2477
32
            filesystem_store,
2478
32
            upload_action_results: UploadActionResults::new(
2479
32
                args.upload_action_result_config,
2480
32
                args.ac_store,
2481
32
                args.historical_store,
2482
            )
2483
32
            .err_tip(|| "During RunningActionsManagerImpl construction")
?0
,
2484
32
            max_action_timeout: args.max_action_timeout,
2485
32
            max_upload_timeout: args.max_upload_timeout,
2486
32
            timeout_handled_externally: args.timeout_handled_externally,
2487
32
            running_actions: Mutex::new(HashMap::new()),
2488
32
            action_done_tx,
2489
32
            callbacks,
2490
32
            metrics: Arc::new(Metrics::default()),
2491
32
            cleaning_up_operations: Mutex::new(HashSet::new()),
2492
32
            cleanup_complete_notify: Arc::new(Notify::new()),
2493
32
            directory_cache: args.directory_cache,
2494
32
            persistent_worker_pool: PersistentWorkerPool::default(),
2495
            #[cfg(target_os = "linux")]
2496
32
            use_namespaces: args.use_namespaces,
2497
        })
2498
32
    }
2499
2500
13
    pub fn new(args: RunningActionsManagerArgs<'_>) -> Result<Self, Error> {
2501
13
        Self::new_with_callbacks(
2502
13
            args,
2503
            Callbacks {
2504
13
                now_fn: SystemTime::now,
2505
2
                sleep_fn: |duration| Box::pin(tokio::time::sleep(duration)),
2506
            },
2507
        )
2508
13
    }
2509
2510
    /// Fixes a race condition that occurs when an action fails to execute on a worker, and the same worker
2511
    /// attempts to re-execute the same action before the physical cleanup (file is removed) completes.
2512
    /// See this issue for additional details: <https://github.com/TraceMachina/nativelink/issues/1859>
2513
26
    async fn wait_for_cleanup_if_needed(&self, operation_id: &OperationId) -> Result<(), Error> {
2514
26
        let start = Instant::now();
2515
26
        let mut backoff = Duration::from_millis(10);
2516
26
        let mut has_waited = false;
2517
2518
        loop {
2519
26
            let should_wait = {
2520
26
                let cleaning = self.cleaning_up_operations.lock();
2521
26
                cleaning.contains(operation_id)
2522
            };
2523
2524
26
            if !should_wait {
2525
26
                let dir_path =
2526
26
                    PathBuf::from(&self.root_action_directory).join(operation_id.to_string());
2527
2528
26
                if !dir_path.exists() {
2529
25
                    return Ok(());
2530
1
                }
2531
2532
                // Safety check: ensure we're only removing directories under root_action_directory
2533
1
                let root_path = Path::new(&self.root_action_directory);
2534
1
                let canonical_root = root_path.canonicalize().err_tip(|| 
{0
2535
0
                    format!(
2536
                        "Failed to canonicalize root directory: {}",
2537
                        self.root_action_directory
2538
                    )
2539
0
                })?;
2540
1
                let canonical_dir = dir_path.canonicalize().err_tip(|| 
{0
2541
0
                    format!("Failed to canonicalize directory: {}", dir_path.display())
2542
0
                })?;
2543
2544
1
                if !canonical_dir.starts_with(&canonical_root) {
2545
0
                    return Err(make_err!(
2546
0
                        Code::Internal,
2547
0
                        "Attempted to remove directory outside of root_action_directory: {}",
2548
0
                        dir_path.display()
2549
0
                    ));
2550
1
                }
2551
2552
                // Directory exists but not being cleaned - remove it
2553
1
                warn!(
2554
                    "Removing stale directory for {}: {}",
2555
                    operation_id,
2556
1
                    dir_path.display()
2557
                );
2558
1
                self.metrics.stale_removals.inc();
2559
2560
                // Try to remove the directory, with one retry on failure
2561
1
                let remove_result = fs::remove_dir_all(&dir_path).await;
2562
1
                if let Err(
e0
) = remove_result {
2563
                    // Retry once after a short delay in case the directory is temporarily locked
2564
0
                    tokio::time::sleep(Duration::from_millis(100)).await;
2565
0
                    fs::remove_dir_all(&dir_path).await.err_tip(|| {
2566
0
                        format!(
2567
                            "Failed to remove stale directory {} for retry of {} after retry (original error: {})",
2568
0
                            dir_path.display(),
2569
                            operation_id,
2570
                            e
2571
                        )
2572
0
                    })?;
2573
1
                }
2574
1
                return Ok(());
2575
0
            }
2576
2577
0
            if start.elapsed() > Self::MAX_WAIT {
2578
0
                self.metrics.cleanup_wait_timeouts.inc();
2579
0
                warn!(%operation_id, waited=?start.elapsed(), "Timeout waiting for previous operation cleanup");
2580
0
                return Err(make_err!(
2581
0
                    Code::DeadlineExceeded,
2582
0
                    "Timeout waiting for previous operation cleanup: {} (waited {:?})",
2583
0
                    operation_id,
2584
0
                    start.elapsed()
2585
0
                ));
2586
0
            }
2587
2588
0
            if !has_waited {
2589
0
                self.metrics.cleanup_waits.inc();
2590
0
                has_waited = true;
2591
0
            }
2592
2593
0
            trace!(
2594
                "Waiting for cleanup of {} (elapsed: {:?}, backoff: {:?})",
2595
                operation_id,
2596
0
                start.elapsed(),
2597
                backoff
2598
            );
2599
2600
0
            tokio::select! {
2601
0
                () = self.cleanup_complete_notify.notified() => {},
2602
0
                () = tokio::time::sleep(backoff) => {
2603
0
                    // Exponential backoff
2604
0
                    backoff = (backoff * 2).min(Self::MAX_BACKOFF);
2605
0
                },
2606
            }
2607
        }
2608
26
    }
2609
2610
0
    fn make_action_directory<'a>(
2611
0
        &'a self,
2612
0
        operation_id: &'a OperationId,
2613
0
    ) -> impl Future<Output = Result<String, Error>> + 'a {
2614
26
        
self.metrics.make_action_directory0
.
wrap0
(async move {
2615
26
            let action_directory = format!("{}/{}", self.root_action_directory, operation_id);
2616
26
            fs::create_dir(&action_directory)
2617
26
                .await
2618
26
                .err_tip(|| 
format!0
("Error creating action directory {action_directory}"))
?0
;
2619
26
            Ok(action_directory)
2620
26
        })
2621
0
    }
2622
2623
26
    fn create_action_info(
2624
26
        &self,
2625
26
        start_execute: StartExecute,
2626
26
        queued_timestamp: SystemTime,
2627
26
    ) -> impl Future<Output = Result<ActionInfo, Error>> + '_ {
2628
26
        self.metrics.create_action_info.wrap(async move {
2629
26
            let execute_request = start_execute
2630
26
                .execute_request
2631
26
                .err_tip(|| "Expected execute_request to exist in StartExecute")
?0
;
2632
26
            let action_digest: DigestInfo = execute_request
2633
26
                .action_digest
2634
26
                .clone()
2635
26
                .err_tip(|| "Expected action_digest to exist on StartExecute")
?0
2636
26
                .try_into()
?0
;
2637
26
            let load_start_timestamp = (self.callbacks.now_fn)();
2638
26
            let action =
2639
26
                get_and_decode_digest::<Action>(self.cas_store.as_ref(), action_digest.into())
2640
26
                    .await
2641
26
                    .err_tip(|| "During start_action")
?0
;
2642
26
            let action_info = ActionInfo::try_from_action_and_execute_request(
2643
26
                execute_request,
2644
26
                action,
2645
26
                load_start_timestamp,
2646
26
                queued_timestamp,
2647
            )
2648
26
            .err_tip(|| "Could not create ActionInfo in create_and_add_action()")
?0
;
2649
26
            Ok(action_info)
2650
26
        })
2651
26
    }
2652
2653
24
    /// Drops `operation_id` from `running_actions` and signals `action_done_tx`.
    ///
    /// The signal is sent whether or not an entry was found; a missing entry is reported
    /// as an error afterwards.
    fn cleanup_action(&self, operation_id: &OperationId) -> Result<(), Error> {
        let mut running_actions = self.running_actions.lock();
        let removed_entry = running_actions.remove(operation_id);
        // No need to copy anything, we just are telling the receivers an event happened.
        self.action_done_tx.send_modify(|()| {});
        removed_entry.map(|_| ()).err_tip(|| {
            format!("Expected operation id '{operation_id}' to exist in RunningActionsManagerImpl")
        })
    }
2662
2663
    // Note: We do not capture metrics on this call, only `.kill_all()`.
2664
    // Important: When the future returns the process may still be running.
2665
2
    async fn kill_operation(action: Arc<RunningActionImpl>) {
2666
2
        warn!(
2667
2
            operation_id = ?action.operation_id,
2668
            "Sending kill to running operation",
2669
        );
2670
2
        let kill_channel_tx = {
2671
2
            let mut action_state = action.state.lock();
2672
2
            action_state.kill_channel_tx.take()
2673
        };
2674
2
        if let Some(kill_channel_tx) = kill_channel_tx
2675
2
            && kill_channel_tx.send(()).is_err()
2676
        {
2677
0
            error!(
2678
0
                operation_id = ?action.operation_id,
2679
                "Error sending kill to running operation",
2680
            );
2681
2
        }
2682
2
    }
2683
2684
22
    fn perform_cleanup(self: &Arc<Self>, operation_id: OperationId) -> Option<CleanupGuard> {
2685
22
        let mut cleaning = self.cleaning_up_operations.lock();
2686
22
        cleaning
2687
22
            .insert(operation_id.clone())
2688
22
            .then_some(CleanupGuard {
2689
22
                manager: Arc::downgrade(self),
2690
22
                operation_id,
2691
22
            })
2692
22
    }
2693
}
2694
2695
impl RunningActionsManager for RunningActionsManagerImpl {
2696
    type RunningAction = RunningActionImpl;
2697
2698
26
    async fn create_and_add_action(
2699
26
        self: &Arc<Self>,
2700
26
        worker_id: String,
2701
26
        start_execute: StartExecute,
2702
26
    ) -> Result<Arc<RunningActionImpl>, Error> {
2703
26
        self.metrics
2704
26
            .create_and_add_action
2705
26
            .wrap(async move {
2706
26
                let queued_timestamp = start_execute
2707
26
                    .queued_timestamp
2708
26
                    .and_then(|time| 
time16
.
try_into16
().
ok16
())
2709
26
                    .unwrap_or(SystemTime::UNIX_EPOCH);
2710
26
                let operation_id = start_execute
2711
26
                    .operation_id.as_str().into();
2712
26
                let action_info = self.create_action_info(start_execute, queued_timestamp).await
?0
;
2713
26
                debug!(
2714
                    ?action_info,
2715
                    "Worker received action",
2716
                );
2717
                // Wait for any previous cleanup to complete before creating directory
2718
26
                self.wait_for_cleanup_if_needed(&operation_id).await
?0
;
2719
26
                let action_directory = self.make_action_directory(&operation_id).await
?0
;
2720
26
                let execution_metadata = ExecutionMetadata {
2721
26
                    worker: worker_id,
2722
26
                    queued_timestamp: action_info.insert_timestamp,
2723
26
                    worker_start_timestamp: action_info.load_timestamp,
2724
26
                    worker_completed_timestamp: SystemTime::UNIX_EPOCH,
2725
26
                    input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
2726
26
                    input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
2727
26
                    execution_start_timestamp: SystemTime::UNIX_EPOCH,
2728
26
                    execution_completed_timestamp: SystemTime::UNIX_EPOCH,
2729
26
                    output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
2730
26
                    output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
2731
26
                };
2732
26
                let timeout = if action_info.timeout.is_zero() || 
self.timeout_handled_externally3
{
2733
23
                    self.max_action_timeout
2734
                } else {
2735
3
                    action_info.timeout
2736
                };
2737
26
                if timeout > self.max_action_timeout {
2738
1
                    return Err(make_err!(
2739
1
                        Code::InvalidArgument,
2740
1
                        "Action timeout of {} seconds is greater than the maximum allowed timeout of {} seconds",
2741
1
                        timeout.as_secs_f32(),
2742
1
                        self.max_action_timeout.as_secs_f32()
2743
1
                    ));
2744
25
                }
2745
25
                let running_action = Arc::new(RunningActionImpl::new(
2746
25
                    execution_metadata,
2747
25
                    operation_id.clone(),
2748
25
                    action_directory,
2749
25
                    action_info,
2750
25
                    timeout,
2751
25
                    self.clone(),
2752
                ));
2753
                {
2754
25
                    let mut running_actions = self.running_actions.lock();
2755
                    // Check if action already exists and is still alive
2756
25
                    if let Some(
existing_weak0
) = running_actions.get(&operation_id)
2757
0
                        && let Some(_existing_action) = existing_weak.upgrade() {
2758
0
                            return Err(make_err!(
2759
0
                                Code::AlreadyExists,
2760
0
                                "Action with operation_id {} is already running",
2761
0
                                operation_id
2762
0
                            ));
2763
25
                    }
2764
25
                    running_actions.insert(operation_id, Arc::downgrade(&running_action));
2765
                }
2766
25
                Ok(running_action)
2767
26
            })
2768
26
            .await
2769
26
    }
2770
2771
6
    async fn cache_action_result(
2772
6
        &self,
2773
6
        action_info: DigestInfo,
2774
6
        action_result: &mut ActionResult,
2775
6
        hasher: DigestHasherFunc,
2776
6
    ) -> Result<(), Error> {
2777
6
        self.metrics
2778
6
            .cache_action_result
2779
6
            .wrap(self.upload_action_results.cache_action_result(
2780
6
                action_info,
2781
6
                action_result,
2782
6
                hasher,
2783
6
            ))
2784
6
            .await
2785
6
    }
2786
2787
0
    async fn kill_operation(&self, operation_id: &OperationId) -> Result<(), Error> {
2788
0
        let running_action = {
2789
0
            let running_actions = self.running_actions.lock();
2790
0
            running_actions
2791
0
                .get(operation_id)
2792
0
                .and_then(Weak::upgrade)
2793
0
                .ok_or_else(|| make_input_err!("Failed to get running action {operation_id}"))?
2794
        };
2795
0
        Self::kill_operation(running_action).await;
2796
0
        Ok(())
2797
0
    }
2798
2799
    // Note: When the future returns the process should be fully killed and cleaned up.
2800
2
    
async fn kill_all(&self)0
{
2801
2
        self.metrics
2802
2
            .kill_all
2803
2
            .wrap_no_capture_result(async move {
2804
2
                let kill_operations: Vec<Arc<RunningActionImpl>> = {
2805
2
                    let running_actions = self.running_actions.lock();
2806
2
                    running_actions
2807
2
                        .iter()
2808
2
                        .filter_map(|(_operation_id, action)| action.upgrade())
2809
2
                        .collect()
2810
                };
2811
2
                let mut kill_futures: FuturesUnordered<_> = kill_operations
2812
2
                    .into_iter()
2813
2
                    .map(Self::kill_operation)
2814
2
                    .collect();
2815
4
                while kill_futures.next().await.is_some() 
{}2
2816
2
            })
2817
2
            .await;
2818
        // Ignore error. If error happens it means there's no sender, which is not a problem.
2819
        // Note: Sanity check this API will always check current value then future values:
2820
        // https://play.rust-lang.org/?version=stable&edition=2021&gist=23103652cc1276a97e5f9938da87fdb2
2821
2
        drop(
2822
2
            self.action_done_tx
2823
2
                .subscribe()
2824
4
                .
wait_for2
(|()| self.running_actions.lock().is_empty())
2825
2
                .await,
2826
        );
2827
2
    }
2828
2829
    #[inline]
2830
2
    fn metrics(&self) -> &Arc<Metrics> {
2831
2
        &self.metrics
2832
2
    }
2833
}
2834
2835
#[derive(Debug, Default, MetricsComponent)]
2836
pub struct Metrics {
2837
    #[metric(help = "Stats about the create_and_add_action command.")]
2838
    create_and_add_action: AsyncCounterWrapper,
2839
    #[metric(help = "Stats about the cache_action_result command.")]
2840
    cache_action_result: AsyncCounterWrapper,
2841
    #[metric(help = "Stats about the kill_all command.")]
2842
    kill_all: AsyncCounterWrapper,
2843
    #[metric(help = "Stats about the create_action_info command.")]
2844
    create_action_info: AsyncCounterWrapper,
2845
    #[metric(help = "Stats about the make_work_directory command.")]
2846
    make_action_directory: AsyncCounterWrapper,
2847
    #[metric(help = "Stats about the prepare_action command.")]
2848
    prepare_action: AsyncCounterWrapper,
2849
    #[metric(help = "Stats about the execute command.")]
2850
    execute: AsyncCounterWrapper,
2851
    #[metric(help = "Stats about the upload_results command.")]
2852
    upload_results: AsyncCounterWrapper,
2853
    #[metric(help = "Stats about the cleanup command.")]
2854
    cleanup: AsyncCounterWrapper,
2855
    #[metric(help = "Stats about the get_finished_result command.")]
2856
    get_finished_result: AsyncCounterWrapper,
2857
    #[metric(help = "Number of times an action waited for cleanup to complete.")]
2858
    cleanup_waits: CounterWithTime,
2859
    #[metric(help = "Number of stale directories removed during action retries.")]
2860
    stale_removals: CounterWithTime,
2861
    #[metric(help = "Number of timeouts while waiting for cleanup to complete.")]
2862
    cleanup_wait_timeouts: CounterWithTime,
2863
    #[metric(help = "Stats about the get_proto_command_from_store command.")]
2864
    get_proto_command_from_store: AsyncCounterWrapper,
2865
    #[metric(help = "Stats about the download_to_directory command.")]
2866
    download_to_directory: AsyncCounterWrapper,
2867
    #[metric(help = "Stats about the prepare_output_files command.")]
2868
    prepare_output_files: AsyncCounterWrapper,
2869
    #[metric(help = "Stats about the prepare_output_paths command.")]
2870
    prepare_output_paths: AsyncCounterWrapper,
2871
    #[metric(help = "Stats about the child_process command.")]
2872
    child_process: AsyncCounterWrapper,
2873
    #[metric(help = "Stats about the child_process_success_error_code command.")]
2874
    child_process_success_error_code: CounterWithTime,
2875
    #[metric(help = "Stats about the child_process_failure_error_code command.")]
2876
    child_process_failure_error_code: CounterWithTime,
2877
    #[metric(help = "Total time spent uploading stdout.")]
2878
    upload_stdout: AsyncCounterWrapper,
2879
    #[metric(help = "Total time spent uploading stderr.")]
2880
    upload_stderr: AsyncCounterWrapper,
2881
    #[metric(help = "Total number of task timeouts.")]
2882
    task_timeouts: CounterWithTime,
2883
}