Coverage Report

Created: 2025-07-30 16:11

/build/source/nativelink-worker/src/running_actions_manager.rs
Line|Count|Source
1||// Copyright 2024 The NativeLink Authors. All rights reserved.
2||//
3||// Licensed under the Apache License, Version 2.0 (the "License");
4||// you may not use this file except in compliance with the License.
5||// You may obtain a copy of the License at
6||//
7||//    http://www.apache.org/licenses/LICENSE-2.0
8||//
9||// Unless required by applicable law or agreed to in writing, software
10||// distributed under the License is distributed on an "AS IS" BASIS,
11||// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12||// See the License for the specific language governing permissions and
13||// limitations under the License.
14||
15||use core::cmp::min;
16||use core::convert::Into;
17||use core::fmt::Debug;
18||use core::pin::Pin;
19||use core::sync::atomic::{AtomicBool, Ordering};
20||use core::time::Duration;
21||use std::borrow::Cow;
22||use std::collections::vec_deque::VecDeque;
23||use std::collections::{HashMap, HashSet};
24||use std::ffi::{OsStr, OsString};
25||#[cfg(target_family = "unix")]
26||use std::fs::Permissions;
27||#[cfg(target_family = "unix")]
28||use std::os::unix::fs::{MetadataExt, PermissionsExt};
29||use std::path::{Path, PathBuf};
30||use std::process::Stdio;
31||use std::sync::{Arc, Weak};
32||use std::time::SystemTime;
33||
34||use bytes::{Bytes, BytesMut};
35||use filetime::{FileTime, set_file_mtime};
36||use formatx::Template;
37||use futures::future::{
38||    BoxFuture, Future, FutureExt, TryFutureExt, try_join, try_join_all, try_join3,
39||};
40||use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
41||use nativelink_config::cas_server::{
42||    EnvironmentSource, UploadActionResultConfig, UploadCacheResultsStrategy,
43||};
44||use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
45||use nativelink_metric::MetricsComponent;
46||use nativelink_proto::build::bazel::remote::execution::v2::{
47||    Action, ActionResult as ProtoActionResult, Command as ProtoCommand,
48||    Directory as ProtoDirectory, Directory, DirectoryNode, ExecuteResponse, FileNode, SymlinkNode,
49||    Tree as ProtoTree, UpdateActionResultRequest,
50||};
51||use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
52||    HistoricalExecuteResponse, StartExecute,
53||};
54||use nativelink_store::ac_utils::{
55||    ESTIMATED_DIGEST_SIZE, compute_buf_digest, get_and_decode_digest, serialize_and_upload_message,
56||};
57||use nativelink_store::fast_slow_store::FastSlowStore;
58||use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
59||use nativelink_store::grpc_store::GrpcStore;
60||use nativelink_util::action_messages::{
61||    ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, OperationId,
62||    SymlinkInfo, to_execute_response,
63||};
64||use nativelink_util::common::{DigestInfo, fs};
65||use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc};
66||use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
67||use nativelink_util::shutdown_guard::ShutdownGuard;
68||use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
69||use nativelink_util::{background_spawn, spawn, spawn_blocking};
70||use parking_lot::Mutex;
71||use prost::Message;
72||use relative_path::RelativePath;
73||use scopeguard::{ScopeGuard, guard};
74||use serde::Deserialize;
75||use tokio::io::{AsyncReadExt, AsyncSeekExt};
76||use tokio::process;
77||use tokio::sync::{Notify, oneshot, watch};
78||use tokio::time::Instant;
79||use tokio_stream::wrappers::ReadDirStream;
80||use tonic::Request;
81||use tracing::{debug, error, info, trace, warn};
82||use uuid::Uuid;
83||
84||/// For simplicity we use a fixed exit code for cases when our program is terminated
85||/// due to a signal.
86||const EXIT_CODE_FOR_SIGNAL: i32 = 9;
87||
88||/// Default strategy for uploading historical results.
89||/// Note: If this value changes the config documentation
90||/// should reflect it.
91||const DEFAULT_HISTORICAL_RESULTS_STRATEGY: UploadCacheResultsStrategy =
92||    UploadCacheResultsStrategy::FailuresOnly;
93||
94||/// Valid string reasons for a failure.
95||/// Note: If these change, the documentation should be updated.
96||#[derive(Debug, Deserialize)]
97||#[serde(rename_all = "snake_case")]
98||enum SideChannelFailureReason {
99||    /// Task should be considered timed out.
100||    Timeout,
101||}
102||
103||/// This represents the json data that can be passed from the running process
104||/// to the parent via the `SideChannelFile`. See:
105||/// `config::EnvironmentSource::sidechannelfile` for more details.
106||/// Note: Any fields added here must be added to the documentation.
107||#[derive(Debug, Deserialize, Default)]
108||struct SideChannelInfo {
109||    /// If the task should be considered a failure and why.
110||    failure: Option<SideChannelFailureReason>,
111||}
112||
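For reference, the side-channel payload is a small JSON document. A minimal sketch of what a task might write and how it deserializes (the JSON literal is illustrative; the parsing mirrors the serde_json5 call in process_side_channel_file later in this file):

    // Sketch: a task reporting a timeout through the side channel file.
    // The JSON literal here is illustrative, not a NativeLink fixture.
    let info: SideChannelInfo = serde_json5::from_str(r#"{ "failure": "timeout" }"#)
        .expect("valid side channel JSON");
    assert!(matches!(info.failure, Some(SideChannelFailureReason::Timeout)));
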
113||/// Aggressively download the digests of files and make a local folder from it. This function
114||/// will spawn an unbounded number of futures to try and get these downloaded. The store itself
115||/// should be rate limited if spawning too many requests at once is an issue.
116||/// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore`. This is for
117||/// efficiency reasons. We will request the `FastSlowStore` to populate the entry, then we will
118||/// assume the `FilesystemStore` has the file available immediately after and hardlink the file
119||/// to a new location.
120||// Sadly we cannot use `async fn` here because the rust compiler cannot determine the auto traits
121||// of the future. So we need to force this function to return a dynamic future instead.
122||// see: https://github.com/rust-lang/rust/issues/78649
123|25|pub fn download_to_directory<'a>(
124|25|    cas_store: &'a FastSlowStore,
125|25|    filesystem_store: Pin<&'a FilesystemStore>,
126|25|    digest: &'a DigestInfo,
127|25|    current_directory: &'a str,
128|25|) -> BoxFuture<'a, Result<(), Error>> {
129|25|    async move {
130|25|        let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest.into())
131|25|            .await
132|25|            .err_tip(|| "Converting digest to Directory")?;
133|25|        let mut futures = FuturesUnordered::new();
134||
135|29|        for file in directory.files {
136|4|            let digest: DigestInfo = file
137|4|                .digest
138|4|                .err_tip(|| "Expected Digest to exist in Directory::file::digest")?
139|4|                .try_into()
140|4|                .err_tip(|| "In Directory::file::digest")?;
141|4|            let dest = format!("{}/{}", current_directory, file.name);
142|4|            let (mtime, mut unix_mode) = match file.node_properties {
143|1|                Some(properties) => (properties.mtime, properties.unix_mode),
144|3|                None => (None, None),
145||            };
146||            #[cfg_attr(target_family = "windows", allow(unused_assignments))]
147|4|            if file.is_executable {
||    Branch (147:16): [True: 1, False: 3]
148|1|                unix_mode = Some(unix_mode.unwrap_or(0o444) | 0o111);
149|3|            }
150|4|            futures.push(
151|4|                cas_store
152|4|                    .populate_fast_store(digest.into())
153|4|                    .and_then(move |()| async move {
154|4|                        let file_entry = filesystem_store
155|4|                            .get_file_entry_for_digest(&digest)
156|4|                            .await
157|4|                            .err_tip(|| "During hard link")?;
158|4|                        file_entry
159|4|                            .get_file_path_locked(|src| fs::hard_link(src, &dest))
160|4|                            .await
161|4|                            .map_err(|e| {
162|0|                                if e.code == Code::NotFound {
||    Branch (162:36): [True: 0, False: 0]
163|0|                                    make_err!(
164|0|                                        Code::Internal,
165||                                        "Could not make hardlink, file was likely evicted from cache. {e:?} : {dest}\n\
166||                                        This error often occurs when the filesystem store's max_bytes is too small for your workload.\n\
167||                                        To fix this issue:\n\
168||                                        1. Increase the 'max_bytes' value in your filesystem store configuration\n\
169||                                        2. Example: Change 'max_bytes: 10000000000' to 'max_bytes: 50000000000' (or higher)\n\
170||                                        3. The setting is typically found in your nativelink.json config under:\n\
171||                                           stores -> [your_filesystem_store] -> filesystem -> eviction_policy -> max_bytes\n\
172||                                        4. Restart NativeLink after making the change\n\n\
173||                                        If this error persists after increasing max_bytes several times, please report at:\n\
174||                                        https://github.com/TraceMachina/nativelink/issues\n\
175||                                        Include your config file and both server and client logs to help us assist you."
176||                                    )
177||                                } else {
178|0|                                    make_err!(Code::Internal, "Could not make hardlink, {e:?} : {dest}")
179||                                }
180|0|                            })?;
181||                        #[cfg(target_family = "unix")]
182|4|                        if let Some(unix_mode) = unix_mode {
||    Branch (182:32): [True: 1, False: 3]
183|1|                            fs::set_permissions(&dest, Permissions::from_mode(unix_mode))
184|1|                                .await
185|1|                                .err_tip(|| {
186|0|                                    format!(
187|0|                                        "Could not set unix mode in download_to_directory {dest}"
188||                                    )
189|0|                                })?;
190|3|                        }
191|4|                        if let Some(mtime) = mtime {
||    Branch (191:32): [True: 1, False: 3]
192|1|                            spawn_blocking!("download_to_directory_set_mtime", move || {
193|1|                                set_file_mtime(
194|1|                                    &dest,
195|1|                                    FileTime::from_unix_time(mtime.seconds, mtime.nanos as u32),
196||                                )
197|1|                                .err_tip(|| {
198|0|                                    format!("Failed to set mtime in download_to_directory {dest}")
199|0|                                })
200|1|                            })
201|1|                            .await
202|1|                            .err_tip(
203||                                || "Failed to launch spawn_blocking in download_to_directory",
204|0|                            )??;
205|3|                        }
206|4|                        Ok(())
207|8|                    })
208|4|                    .map_err(move |e| e.append(format!("for digest {digest}")))
209|4|                    .boxed(),
210||            );
211||        }
212||
213|32|        for directory in directory.directories {
214|7|            let digest: DigestInfo = directory
215|7|                .digest
216|7|                .err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
217|7|                .try_into()
218|7|                .err_tip(|| "In Directory::file::digest")?;
219|7|            let new_directory_path = format!("{}/{}", current_directory, directory.name);
220|7|            futures.push(
221|7|                async move {
222|7|                    fs::create_dir(&new_directory_path)
223|7|                        .await
224|7|                        .err_tip(|| format!("Could not create directory {new_directory_path}"))?;
225|7|                    download_to_directory(
226|7|                        cas_store,
227|7|                        filesystem_store,
228|7|                        &digest,
229|7|                        &new_directory_path,
230|7|                    )
231|7|                    .await
232|7|                    .err_tip(|| format!("in download_to_directory : {new_directory_path}"))?;
233|7|                    Ok(())
234|7|                }
235|7|                .boxed(),
236||            );
237||        }
238||
239||        #[cfg(target_family = "unix")]
240|26|        for symlink_node in directory.symlinks {
241|1|            let dest = format!("{}/{}", current_directory, symlink_node.name);
242|1|            futures.push(
243|1|                async move {
244|1|                    fs::symlink(&symlink_node.target, &dest).await.err_tip(|| {
245|0|                        format!(
246|0|                            "Could not create symlink {} -> {}",
247||                            symlink_node.target, dest
248||                        )
249|0|                    })?;
250|1|                    Ok(())
251|1|                }
252|1|                .boxed(),
253||            );
254||        }
255||
256|37|        while futures.try_next().await?.is_some() {}
||    Branch (256:15): [True: 12, False: 25]
257|25|        Ok(())
258|25|    }
259|25|    .boxed()
260|25|}
261||
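The hardlink error message above (lines 165-175) walks users to the filesystem store's eviction policy. A hypothetical nativelink.json fragment matching the path it names (the store name and size are illustrative, not a shipped default):

    {
      "stores": {
        "your_filesystem_store": {
          "filesystem": {
            "eviction_policy": {
              "max_bytes": 50000000000
            }
          }
        }
      }
    }
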
262||#[cfg(target_family = "windows")]
263||fn is_executable(_metadata: &std::fs::Metadata, full_path: &impl AsRef<Path>) -> bool {
264||    static EXECUTABLE_EXTENSIONS: &[&str] = &["exe", "bat", "com"];
265||    EXECUTABLE_EXTENSIONS
266||        .iter()
267||        .any(|ext| full_path.as_ref().extension().map_or(false, |v| v == *ext))
268||}
269||
270||#[cfg(target_family = "unix")]
271|7|fn is_executable(metadata: &std::fs::Metadata, _full_path: &impl AsRef<Path>) -> bool {
272|7|    (metadata.mode() & 0o111) != 0
273|7|}
274||
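The unix variant above is a plain permission-bit test. A small usage sketch with only the standard library (the path is illustrative):

    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata("/bin/sh").expect("stat failed"); // any path
    // Same 0o111 mask as is_executable: any of user/group/other execute bits.
    let executable = (metadata.permissions().mode() & 0o111) != 0;
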
275||#[expect(clippy::future_not_send)] // TODO(jhpratt) remove this
276|7|async fn upload_file(
277|7|    cas_store: Pin<&impl StoreLike>,
278|7|    full_path: impl AsRef<Path> + Debug,
279|7|    hasher: DigestHasherFunc,
280|7|    metadata: std::fs::Metadata,
281|7|) -> Result<FileInfo, Error> {
282|7|    let is_executable = is_executable(&metadata, &full_path);
283|7|    let file_size = metadata.len();
284|7|    let file = fs::open_file(&full_path, 0, u64::MAX)
285|7|        .await
286|7|        .err_tip(|| format!("Could not open file {full_path:?}"))?;
287||
288|7|    let (digest, mut file) = hasher
289|7|        .hasher()
290|7|        .digest_for_file(&full_path, file.into_inner(), Some(file_size))
291|7|        .await
292|7|        .err_tip(|| format!("Failed to hash file in digest_for_file failed for {full_path:?}"))?;
293||
294|7|    file.rewind().await.err_tip(|| "Could not rewind file")?;
295||
296||    // Note: For unknown reasons we appear to be hitting:
297||    // https://github.com/rust-lang/rust/issues/92096
298||    // or a similar issue if we try to use the non-store driver function, so we
299||    // are using the store driver function here.
300|7|    cas_store
301|7|        .as_store_driver_pin()
302|7|        .update_with_whole_file(
303|7|            digest.into(),
304|7|            full_path.as_ref().into(),
305|7|            file,
306|7|            UploadSizeInfo::ExactSize(digest.size_bytes()),
307|7|        )
308|7|        .await
309|7|        .err_tip(|| format!("for {full_path:?}"))?;
310||
311|7|    let name = full_path
312|7|        .as_ref()
313|7|        .file_name()
314|7|        .err_tip(|| format!("Expected file_name to exist on {full_path:?}"))?
315|7|        .to_str()
316|7|        .err_tip(|| {
317|0|            make_err!(
318|0|                Code::Internal,
319||                "Could not convert {:?} to string",
320||                full_path
321||            )
322|0|        })?
323|7|        .to_string();
324||
325|7|    Ok(FileInfo {
326|7|        name_or_path: NameOrPath::Name(name),
327|7|        digest,
328|7|        is_executable,
329|7|    })
330|7|}
331||
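upload_file hashes the file and then rewinds the same handle so the upload can reuse it. The same two-pass shape with plain tokio and sha2 (the helper name and the sha2 dependency are assumptions for illustration, not NativeLink APIs):

    use sha2::{Digest, Sha256};
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    // Hash the whole file, then rewind so the caller can stream the same
    // handle again (e.g. for an upload) without reopening the file.
    async fn hash_then_rewind(file: &mut tokio::fs::File) -> std::io::Result<[u8; 32]> {
        let mut hasher = Sha256::new();
        let mut buf = [0u8; 8192];
        loop {
            let n = file.read(&mut buf).await?;
            if n == 0 { break; } // EOF
            hasher.update(&buf[..n]);
        }
        file.rewind().await?;
        Ok(hasher.finalize().into())
    }
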
332|2|async fn upload_symlink(
333|2|    full_path: impl AsRef<Path> + Debug,
334|2|    full_work_directory_path: impl AsRef<Path>,
335|2|) -> Result<SymlinkInfo, Error> {
336|2|    let full_target_path = fs::read_link(full_path.as_ref())
337|2|        .await
338|2|        .err_tip(|| format!("Could not get read_link path of {full_path:?}"))?;
339||
340||    // Detect if our symlink is inside our work directory; if it is, find the
341||    // relative path, otherwise use the absolute path.
342|2|    let target = if full_target_path.starts_with(full_work_directory_path.as_ref()) {
||    Branch (342:21): [True: 0, False: 1]
||    Branch (342:21): [True: 0, False: 1]
343|0|        let full_target_path = RelativePath::from_path(&full_target_path)
344|0|            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?;
345|0|        RelativePath::from_path(full_work_directory_path.as_ref())
346|0|            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?
347|0|            .relative(full_target_path)
348|0|            .normalize()
349|0|            .into_string()
350||    } else {
351|2|        full_target_path
352|2|            .to_str()
353|2|            .err_tip(|| {
354|0|                make_err!(
355|0|                    Code::Internal,
356||                    "Could not convert '{:?}' to string",
357||                    full_target_path
358||                )
359|0|            })?
360|2|            .to_string()
361||    };
362||
363|2|    let name = full_path
364|2|        .as_ref()
365|2|        .file_name()
366|2|        .err_tip(|| format!("Expected file_name to exist on {full_path:?}"))?
367|2|        .to_str()
368|2|        .err_tip(|| {
369|0|            make_err!(
370|0|                Code::Internal,
371||                "Could not convert {:?} to string",
372||                full_path
373||            )
374|0|        })?
375|2|        .to_string();
376||
377|2|    Ok(SymlinkInfo {
378|2|        name_or_path: NameOrPath::Name(name),
379|2|        target,
380|2|    })
381|2|}
382||
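The relativization branch above (count 0, so untested in this run) leans on the relative_path crate. A sketch of the same computation on plain relative-path strings (paths illustrative):

    use relative_path::RelativePath;

    let work_dir = RelativePath::new("work/dir");
    let target = RelativePath::new("work/dir/out/lib.so");
    // From the work directory to the target: "out/lib.so".
    let rel = work_dir.relative(target).normalize();
    assert_eq!(rel.as_str(), "out/lib.so");
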
383|3|fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
384|3|    cas_store: Pin<&'a impl StoreLike>,
385|3|    full_dir_path: P,
386|3|    full_work_directory: &'a str,
387|3|    hasher: DigestHasherFunc,
388|3|) -> BoxFuture<'a, Result<(Directory, VecDeque<ProtoDirectory>), Error>> {
389|3|    Box::pin(async move {
390|3|        let file_futures = FuturesUnordered::new();
391|3|        let dir_futures = FuturesUnordered::new();
392|3|        let symlink_futures = FuturesUnordered::new();
393||        {
394|3|            let (_permit, dir_handle) = fs::read_dir(&full_dir_path)
395|3|                .await
396|3|                .err_tip(|| format!("Error reading dir for reading {full_dir_path:?}"))?
397|3|                .into_inner();
398|3|            let mut dir_stream = ReadDirStream::new(dir_handle);
399||            // Note: Try very hard to not leave file descriptors open. Try to keep them as
400||            // short-lived as possible. This is why we iterate the directory and then build a bunch
401||            // of futures with all the work we want to do, then execute them. It allows us to
402||            // close the directory iterator file descriptor, then open the child files/folders.
403|8|            while let Some(entry_result) = dir_stream.next().await {
||    Branch (403:23): [True: 1, False: 1]
||    Branch (403:23): [True: 4, False: 2]
404|5|                let entry = entry_result.err_tip(|| "Error while iterating directory")?;
405|5|                let file_type = entry
406|5|                    .file_type()
407|5|                    .await
408|5|                    .err_tip(|| format!("Error running file_type() on {entry:?}"))?;
409|5|                let full_path = full_dir_path.as_ref().join(entry.path());
410|5|                if file_type.is_dir() {
||    Branch (410:20): [True: 0, False: 1]
||    Branch (410:20): [True: 1, False: 3]
411|1|                    let full_dir_path = full_dir_path.clone();
412|1|                    dir_futures.push(
413|1|                        upload_directory(cas_store, full_path.clone(), full_work_directory, hasher)
414|1|                            .and_then(|(dir, all_dirs)| async move {
415|1|                                let directory_name = full_path
416|1|                                    .file_name()
417|1|                                    .err_tip(|| {
418|0|                                        format!("Expected file_name to exist on {full_dir_path:?}")
419|0|                                    })?
420|1|                                    .to_str()
421|1|                                    .err_tip(|| {
422|0|                                        make_err!(
423|0|                                            Code::Internal,
424||                                            "Could not convert {:?} to string",
425||                                            full_dir_path
426||                                        )
427|0|                                    })?
428|1|                                    .to_string();
429||
430|1|                                let digest = serialize_and_upload_message(
431|1|                                    &dir,
432|1|                                    cas_store,
433|1|                                    &mut hasher.hasher(),
434|1|                                )
435|1|                                .await
436|1|                                .err_tip(|| format!("for {}", full_path.display()))?;
437||
438|1|                                Result::<(DirectoryNode, VecDeque<Directory>), Error>::Ok((
439|1|                                    DirectoryNode {
440|1|                                        name: directory_name,
441|1|                                        digest: Some(digest.into()),
442|1|                                    },
443|1|                                    all_dirs,
444|1|                                ))
445|2|                            })
446|1|                            .boxed(),
447||                    );
448|4|                } else if file_type.is_file() {
||    Branch (448:27): [True: 0, False: 1]
||    Branch (448:27): [True: 3, False: 0]
449|3|                    file_futures.push(async move {
450|3|                        let metadata = fs::metadata(&full_path)
451|3|                            .await
452|3|                            .err_tip(|| format!("Could not open file {}", full_path.display()))?;
453|3|                        upload_file(cas_store, &full_path, hasher, metadata)
454|3|                            .map_ok(TryInto::try_into)
455|3|                            .await?
456|3|                    });
457|1|                } else if file_type.is_symlink() {
||    Branch (457:27): [True: 1, False: 0]
||    Branch (457:27): [True: 0, False: 0]
458|1|                    symlink_futures.push(
459|1|                        upload_symlink(full_path, &full_work_directory)
460|1|                            .map(|symlink| symlink?.try_into()),
461||                    );
462|0|                }
463||            }
464||        }
465||
466|3|        let (mut file_nodes, dir_entries, mut symlinks) = try_join3(
467|3|            file_futures.try_collect::<Vec<FileNode>>(),
468|3|            dir_futures.try_collect::<Vec<(DirectoryNode, VecDeque<Directory>)>>(),
469|3|            symlink_futures.try_collect::<Vec<SymlinkNode>>(),
470|3|        )
471|3|        .await?;
472||
473|3|        let mut directory_nodes = Vec::with_capacity(dir_entries.len());
474||        // For efficiency we use a deque because it allows cheap concat of Vecs.
475||        // We make the assumption here that when performance is important it is because
476||        // our directory is quite large. This allows us to cheaply merge large amounts of
477||        // directories into one VecDeque. Then after we are done we need to collapse it
478||        // down into a single Vec.
479|3|        let mut all_child_directories = VecDeque::with_capacity(dir_entries.len());
480|4|        for (directory_node, mut recursive_child_directories) in dir_entries {
481|1|            directory_nodes.push(directory_node);
482|1|            all_child_directories.append(&mut recursive_child_directories);
483|1|        }
484||
485|3|        file_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name));
486|3|        directory_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name));
487|3|        symlinks.sort_unstable_by(|a, b| a.name.cmp(&b.name));
488||
489|3|        let directory = Directory {
490|3|            files: file_nodes,
491|3|            directories: directory_nodes,
492|3|            symlinks,
493|3|            node_properties: None, // We don't support file properties.
494|3|        };
495|3|        all_child_directories.push_back(directory.clone());
496||
497|3|        Ok((directory, all_child_directories))
498|3|    })
499|3|}
500||
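The comment at lines 474-478 describes the merge strategy: child results arrive as deques, get appended in bulk, and only the final result is flattened. The same trick in isolation (toy values):

    use std::collections::VecDeque;

    let children = [VecDeque::from([1, 2]), VecDeque::from([3])];
    let mut all: VecDeque<u32> = VecDeque::new();
    for mut child in children {
        all.append(&mut child); // bulk move; `child` is left empty
    }
    let flat: Vec<u32> = all.into_iter().collect(); // collapse once, at the end
    assert_eq!(flat, vec![1, 2, 3]);
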
501|0|async fn process_side_channel_file(
502|0|    side_channel_file: Cow<'_, OsStr>,
503|0|    args: &[&OsStr],
504|0|    timeout: Duration,
505|0|) -> Result<Option<Error>, Error> {
506|0|    let mut json_contents = String::new();
507||    {
508||        // Note: Scoping `file_slot` allows the file_slot semaphore to be released faster.
509|0|        let mut file_slot = match fs::open_file(side_channel_file, 0, u64::MAX).await {
510|0|            Ok(file_slot) => file_slot,
511|0|            Err(e) => {
512|0|                if e.code != Code::NotFound {
||    Branch (512:20): [True: 0, False: 0]
513|0|                    return Err(e).err_tip(|| "Error opening side channel file");
514|0|                }
515||                // Note: If file does not exist, it's ok. Users are not required to create this file.
516|0|                return Ok(None);
517||            }
518||        };
519|0|        file_slot
520|0|            .read_to_string(&mut json_contents)
521|0|            .await
522|0|            .err_tip(|| "Error reading side channel file")?;
523||    }
524||
525|0|    let side_channel_info: SideChannelInfo =
526|0|        serde_json5::from_str(&json_contents).map_err(|e| {
527|0|            make_input_err!(
528||                "Could not convert contents of side channel file (json) to SideChannelInfo : {e:?}"
529||            )
530|0|        })?;
531|0|    Ok(side_channel_info.failure.map(|failure| match failure {
532|0|        SideChannelFailureReason::Timeout => Error::new(
533|0|            Code::DeadlineExceeded,
534|0|            format!(
535|0|                "Command '{}' timed out after {} seconds",
536|0|                args.join(OsStr::new(" ")).to_string_lossy(),
537|0|                timeout.as_secs_f32()
538||            ),
539||        ),
540|0|    }))
541|0|}
542||
543|17|async fn do_cleanup(
544|17|    running_actions_manager: &RunningActionsManagerImpl,
545|17|    operation_id: &OperationId,
546|17|    action_directory: &str,
547|17|) -> Result<(), Error> {
548||    // Mark this operation as being cleaned up
549|17|    {
550|17|        let mut cleaning = running_actions_manager.cleaning_up_operations.lock();
551|17|        cleaning.insert(operation_id.clone());
552|17|    }
553||
554|17|    debug!("Worker cleaning up");
555||    // Note: We need to be careful to keep trying to clean up even if one of the steps fails.
556|17|    let remove_dir_result = fs::remove_dir_all(action_directory)
557|17|        .await
558|17|        .err_tip(|| format!("Could not remove working directory {action_directory}"));
559||
560|17|    let cleanup_result = if let Err(err) = running_actions_manager.cleanup_action(operation_id) {
||    Branch (560:33): [True: 0, False: 0]
||    Branch (560:33): [True: 0, False: 17]
561|0|        error!(?operation_id, ?err, "Error cleaning up action");
562|0|        Result::<(), Error>::Err(err).merge(remove_dir_result)
563|17|    } else if let Err(err) = remove_dir_result {
||    Branch (563:19): [True: 0, False: 0]
||    Branch (563:19): [True: 0, False: 17]
564|0|        error!(?operation_id, ?err, "Error removing working directory");
565|0|        Err(err)
566||    } else {
567|17|        Ok(())
568||    };
569||
570||    // Remove from cleaning set and notify waiters
571|17|    {
572|17|        let mut cleaning = running_actions_manager.cleaning_up_operations.lock();
573|17|        cleaning.remove(operation_id);
574|17|    }
575|17|    running_actions_manager
576|17|        .cleanup_complete_notify
577|17|        .notify_waiters();
578||
579|17|    cleanup_result
580|17|}
581||
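do_cleanup runs every step even when an earlier one fails, then reports a combined result. The same shape with plain Results, keeping the first error as the winner (a sketch; NativeLink's Error merge helpers combine errors rather than drop them):

    fn run_all_cleanup_steps(steps: Vec<Result<(), String>>) -> Result<(), String> {
        let mut first_err = None;
        for step in steps {
            if let Err(e) = step {
                // Keep going; later cleanup steps must still run.
                first_err.get_or_insert(e);
            }
        }
        first_err.map_or(Ok(()), Err)
    }
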
582||pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
583||    /// Returns the action id of the action.
584||    fn get_operation_id(&self) -> &OperationId;
585||
586||    /// Anything that needs to execute before the action is actually executed should happen here.
587||    fn prepare_action(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
588||
589||    /// Actually perform the execution of the action.
590||    fn execute(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
591||
592||    /// Any uploading, processing or analyzing of the results should happen here.
593||    fn upload_results(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
594||
595||    /// Cleanup any residual files, handles or other junk resulting from running the action.
596||    fn cleanup(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
597||
598||    /// Returns the final result. As a general rule this action should be thought of as
599||    /// a consumption of `self`, meaning once a return happens here the lifetime of `Self`
600||    /// is over and any action performed on it after this call is undefined behavior.
601||    fn get_finished_result(
602||        self: Arc<Self>,
603||    ) -> impl Future<Output = Result<ActionResult, Error>> + Send;
604||
605||    /// Returns the work directory of the action.
606||    fn get_work_directory(&self) -> &String;
607||}
608||
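A minimal sketch of how a caller can drive this trait's lifecycle end to end (the real manager adds timeout, kill, and guaranteed-cleanup handling around this):

    async fn run_action<A: RunningAction>(action: Arc<A>) -> Result<ActionResult, Error> {
        action
            .prepare_action()
            .await?
            .execute()
            .await?
            .upload_results()
            .await?
            .cleanup()
            .await?
            .get_finished_result()
            .await
    }
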
609||#[derive(Debug)]
610||struct RunningActionImplExecutionResult {
611||    stdout: Bytes,
612||    stderr: Bytes,
613||    exit_code: i32,
614||}
615||
616||#[derive(Debug)]
617||struct RunningActionImplState {
618||    command_proto: Option<ProtoCommand>,
619||    // TODO(aaronmondal) Kill is not implemented yet, but is instrumented.
620||    // However, it is used if the worker disconnects to destroy current jobs.
621||    kill_channel_tx: Option<oneshot::Sender<()>>,
622||    kill_channel_rx: Option<oneshot::Receiver<()>>,
623||    execution_result: Option<RunningActionImplExecutionResult>,
624||    action_result: Option<ActionResult>,
625||    execution_metadata: ExecutionMetadata,
626||    // If there was an internal error, this will be set.
627||    // This should NOT be set if everything was fine, but the process had a
628||    // non-zero exit code. Instead this should be used for internal errors
629||    // that prevented the action from running, upload failures, timeouts, etc.,
630||    // but we have (or could have) the action results (like stderr/stdout).
631||    error: Option<Error>,
632||}
633||
634||#[derive(Debug)]
635||pub struct RunningActionImpl {
636||    operation_id: OperationId,
637||    action_directory: String,
638||    work_directory: String,
639||    action_info: ActionInfo,
640||    timeout: Duration,
641||    running_actions_manager: Arc<RunningActionsManagerImpl>,
642||    state: Mutex<RunningActionImplState>,
643||    did_cleanup: AtomicBool,
644||}
645||
646||impl RunningActionImpl {
647|18|    fn new(
648|18|        execution_metadata: ExecutionMetadata,
649|18|        operation_id: OperationId,
650|18|        action_directory: String,
651|18|        action_info: ActionInfo,
652|18|        timeout: Duration,
653|18|        running_actions_manager: Arc<RunningActionsManagerImpl>,
654|18|    ) -> Self {
655|18|        let work_directory = format!("{}/{}", action_directory, "work");
656|18|        let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
657|18|        Self {
658|18|            operation_id,
659|18|            action_directory,
660|18|            work_directory,
661|18|            action_info,
662|18|            timeout,
663|18|            running_actions_manager,
664|18|            state: Mutex::new(RunningActionImplState {
665|18|                command_proto: None,
666|18|                kill_channel_rx: Some(kill_channel_rx),
667|18|                kill_channel_tx: Some(kill_channel_tx),
668|18|                execution_result: None,
669|18|                action_result: None,
670|18|                execution_metadata,
671|18|                error: None,
672|18|            }),
673|18|            did_cleanup: AtomicBool::new(false),
674|18|        }
675|18|    }
676||
677||    #[allow(
678||        clippy::missing_const_for_fn,
679||        reason = "False positive on stable, but not on nightly"
680||    )]
681|0|    fn metrics(&self) -> &Arc<Metrics> {
682|0|        &self.running_actions_manager.metrics
683|0|    }
684||
685||    /// Prepares anything needed to execute this action. It will do the following:
686||    ///
687||    /// * Download any files needed to execute the action
688||    /// * Build a folder with all files needed to execute the action.
689||    ///
690||    /// This function will aggressively download and spawn potentially thousands of futures. It is
691||    /// up to the stores to rate limit if needed.
692|15|    async fn inner_prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
693|15|        {
694|15|            let mut state = self.state.lock();
695|15|            state.execution_metadata.input_fetch_start_timestamp =
696|15|                (self.running_actions_manager.callbacks.now_fn)();
697|15|        }
698|15|        let command = {
699||            // Download and build out our input files/folders. Also fetch and decode our Command.
700|15|            let command_fut = self.metrics().get_proto_command_from_store.wrap(async {
701|15|                get_and_decode_digest::<ProtoCommand>(
702|15|                    self.running_actions_manager.cas_store.as_ref(),
703|15|                    self.action_info.command_digest.into(),
704|15|                )
705|15|                .await
706|15|                .err_tip(|| "Converting command_digest to Command")
707|15|            });
708|15|            let filesystem_store_pin =
709|15|                Pin::new(self.running_actions_manager.filesystem_store.as_ref());
710|15|            let (command, ()) = try_join(command_fut, async {
711|15|                fs::create_dir(&self.work_directory)
712|15|                    .await
713|15|                    .err_tip(|| format!("Error creating work directory {}", self.work_directory))?;
714||                // Download the input files/folder and place them into the temp directory.
715|15|                self.metrics()
716|15|                    .download_to_directory
717|15|                    .wrap(download_to_directory(
718|15|                        &self.running_actions_manager.cas_store,
719|15|                        filesystem_store_pin,
720|15|                        &self.action_info.input_root_digest,
721|15|                        &self.work_directory,
722|15|                    ))
723|15|                    .await
724|15|            })
725|15|            .await?;
726|15|            command
727||        };
728||        {
729||            // Create all directories needed for our output paths. This is required by the bazel spec.
730|15|            let prepare_output_directories = |output_file| {
731|9|                let full_output_path = if command.working_directory.is_empty() {
||    Branch (731:43): [True: 1, False: 8]
732|1|                    format!("{}/{}", self.work_directory, output_file)
733||                } else {
734|8|                    format!(
735|8|                        "{}/{}/{}",
736|8|                        self.work_directory, command.working_directory, output_file
737||                    )
738||                };
739|9|                async move {
740|9|                    let full_parent_path = Path::new(&full_output_path)
741|9|                        .parent()
742|9|                        .err_tip(|| format!("Parent path for {full_output_path} has no parent"))?;
743|9|                    fs::create_dir_all(full_parent_path).await.err_tip(|| {
744|0|                        format!(
745|0|                            "Error creating output directory {} (file)",
746|0|                            full_parent_path.display()
747||                        )
748|0|                    })?;
749|9|                    Result::<(), Error>::Ok(())
750|9|                }
751|9|            };
752|15|            self.metrics()
753|15|                .prepare_output_files
754|15|                .wrap(try_join_all(
755|15|                    command.output_files.iter().map(prepare_output_directories),
756|15|                ))
757|15|                .await?;
758|15|            self.metrics()
759|15|                .prepare_output_paths
760|15|                .wrap(try_join_all(
761|15|                    command.output_paths.iter().map(prepare_output_directories),
762|15|                ))
763|15|                .await?;
764||        }
765|15|        debug!(?command, "Worker received command",);
766|15|        {
767|15|            let mut state = self.state.lock();
768|15|            state.command_proto = Some(command);
769|15|            state.execution_metadata.input_fetch_completed_timestamp =
770|15|                (self.running_actions_manager.callbacks.now_fn)();
771|15|        }
772|15|        Ok(self)
773|15|    }
774||
775|13|    async fn inner_execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
776|13|        let (command_proto, mut kill_channel_rx) = {
777|13|            let mut state = self.state.lock();
778|13|            state.execution_metadata.execution_start_timestamp =
779|13|                (self.running_actions_manager.callbacks.now_fn)();
780||            (
781|13|                state
782|13|                    .command_proto
783|13|                    .take()
784|13|                    .err_tip(|| "Expected state to have command_proto in execute()")?,
785|13|                state
786|13|                    .kill_channel_rx
787|13|                    .take()
788|13|                    .err_tip(|| "Expected state to have kill_channel_rx in execute()")?
789||                    // This is important as we may be killed at any point.
790|13|                    .fuse(),
791||            )
792||        };
793|13|        if command_proto.arguments.is_empty() {
||    Branch (793:12): [True: 0, False: 13]
794|0|            return Err(make_input_err!("No arguments provided in Command proto"));
795|13|        }
796|13|        let args: Vec<&OsStr> = if let Some(entrypoint) = &self
||    Branch (796:40): [True: 0, False: 13]
797|13|            .running_actions_manager
798|13|            .execution_configuration
799|13|            .entrypoint
800||        {
801|0|            core::iter::once(entrypoint.as_ref())
802|0|                .chain(command_proto.arguments.iter().map(AsRef::as_ref))
803|0|                .collect()
804||        } else {
805|13|            command_proto.arguments.iter().map(AsRef::as_ref).collect()
806||        };
807||        // TODO(aaronmondal): This should probably be in debug, but currently
808||        //                    that's too busy and we often rely on this to
809||        //                    figure out toolchain misconfiguration issues.
810||        //                    De-bloat the `debug` level by using the `trace`
811||        //                    level more effectively and adjust this.
812|13|        info!(?args, "Executing command",);
813|13|        let mut command_builder = process::Command::new(args[0]);
814|13|        command_builder
815|13|            .args(&args[1..])
816|13|            .kill_on_drop(true)
817|13|            .stdin(Stdio::null())
818|13|            .stdout(Stdio::piped())
819|13|            .stderr(Stdio::piped())
820|13|            .current_dir(format!(
821|13|                "{}/{}",
822|13|                self.work_directory, command_proto.working_directory
823|13|            ))
824|13|            .env_clear();
825||
826|13|        let requested_timeout = if self.action_info.timeout.is_zero() {
||    Branch (826:36): [True: 11, False: 2]
827|11|            self.running_actions_manager.max_action_timeout
828||        } else {
829|2|            self.action_info.timeout
830||        };
831||
832|13|        let mut maybe_side_channel_file: Option<Cow<'_, OsStr>> = None;
833|13|        if let Some(additional_environment) = &self
||    Branch (833:16): [True: 0, False: 13]
834|13|            .running_actions_manager
835|13|            .execution_configuration
836|13|            .additional_environment
837||        {
838|0|            for (name, source) in additional_environment {
839|0|                let value = match source {
840|0|                    EnvironmentSource::Property(property) => self
841|0|                        .action_info
842|0|                        .platform_properties
843|0|                        .get(property)
844|0|                        .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.as_str())),
845|0|                    EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
846||                    EnvironmentSource::TimeoutMillis => {
847|0|                        Cow::Owned(requested_timeout.as_millis().to_string())
848||                    }
849||                    EnvironmentSource::SideChannelFile => {
850|0|                        let file_cow =
851|0|                            format!("{}/{}", self.action_directory, Uuid::new_v4().simple());
852|0|                        maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));
853|0|                        Cow::Owned(file_cow)
854||                    }
855||                    EnvironmentSource::ActionDirectory => {
856|0|                        Cow::Borrowed(self.action_directory.as_str())
857||                    }
858||                };
859|0|                command_builder.env(name, value.as_ref());
860||            }
861|13|        }
862||
863||        #[cfg(target_family = "unix")]
864|13|        let envs = &command_proto.environment_variables;
865||        // If SystemRoot is not set on windows we set it to default. Failing to do
866||        // this causes all commands to fail.
867||        #[cfg(target_family = "windows")]
868||        let envs = {
869||            let mut envs = command_proto.environment_variables.clone();
870||            if !envs.iter().any(|v| v.name.to_uppercase() == "SYSTEMROOT") {
871||                envs.push(
872||                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
873||                        name: "SystemRoot".to_string(),
874||                        value: "C:\\Windows".to_string(),
875||                    },
876||                );
877||            }
878||            if !envs.iter().any(|v| v.name.to_uppercase() == "PATH") {
879||                envs.push(
880||                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
881||                        name: "PATH".to_string(),
882||                        value: "C:\\Windows\\System32".to_string(),
883||                    },
884||                );
885||            }
886||            envs
887||        };
888|26|        for environment_variable in envs {
889|13|            command_builder.env(&environment_variable.name, &environment_variable.value);
890|13|        }
891||
892|13|        let mut child_process = command_builder
893|13|            .spawn()
894|13|            .err_tip(|| format!("Could not execute command {args:?}"))?;
895|13|        let mut stdout_reader = child_process
896|13|            .stdout
897|13|            .take()
898|13|            .err_tip(|| "Expected stdout to exist on command this should never happen")?;
899|13|        let mut stderr_reader = child_process
900|13|            .stderr
901|13|            .take()
902|13|            .err_tip(|| "Expected stderr to exist on command this should never happen")?;
903||
904|13|        let mut child_process_guard = guard(child_process, |mut child_process| {
905|0|            error!(
906|0|                "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
907||            );
908|0|            background_spawn!("running_actions_manager_kill_child_process", async move {
909|0|                child_process.kill().await
910|0|            });
911|0|        });
912||
913|13|        let all_stdout_fut = spawn!("stdout_reader", async move {
914|13|            let mut all_stdout = BytesMut::new();
915||            loop {
916|16|                let sz = stdout_reader
917|16|                    .read_buf(&mut all_stdout)
918|16|                    .await
919|15|                    .err_tip(|| "Error reading stdout stream")?;
920|15|                if sz == 0 {
||    Branch (920:20): [True: 12, False: 3]
921|12|                    break; // EOF.
922|3|                }
923||            }
924|12|            Result::<Bytes, Error>::Ok(all_stdout.freeze())
925|12|        });
926|13|        let all_stderr_fut = spawn!("stderr_reader", async move {
927|13|            let mut all_stderr = BytesMut::new();
928||            loop {
929|16|                let sz = stderr_reader
930|16|                    .read_buf(&mut all_stderr)
931|16|                    .await
932|16|                    .err_tip(|| "Error reading stderr stream")?;
933|16|                if sz == 0 {
||    Branch (933:20): [True: 13, False: 3]
934|13|                    break; // EOF.
935|3|                }
936||            }
937|13|            Result::<Bytes, Error>::Ok(all_stderr.freeze())
938|13|        });
939|13|        let mut killed_action = false;
940||
941|13|        let timer = self.metrics().child_process.begin_timer();
942|13|        let mut sleep_fut = (self.running_actions_manager.callbacks.sleep_fn)(self.timeout).fuse();
943||        loop {
944|17|            tokio::select! {
945|17|                () = &mut sleep_fut => {
946|2|                    self.running_actions_manager.metrics.task_timeouts.inc();
947|2|                    killed_action = true;
948|2|                    if let Err(err) = child_process_guard.start_kill() {
||    Branch (948:28): [True: 0, False: 2]
949|0|                        error!(
950||                            ?err,
951|0|                            "Could not kill process in RunningActionsManager for action timeout",
952||                        );
953|2|                    }
954|2|                    {
955|2|                        let mut state = self.state.lock();
956|2|                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
957|2|                            Code::DeadlineExceeded,
958|2|                            format!(
959|2|                                "Command '{}' timed out after {} seconds",
960|2|                                args.join(OsStr::new(" ")).to_string_lossy(),
961|2|                                self.action_info.timeout.as_secs_f32()
962|2|                            )
963|2|                        )));
964|2|                    }
965||                },
966|17|                maybe_exit_status = child_process_guard.wait() => {
967||                    // Defuse our guard so it does not try to cleanup and make needless logs.
968|13|                    drop(ScopeGuard::<_, _>::into_inner(child_process_guard));
969|13|                    let exit_status = maybe_exit_status.err_tip(|| "Failed to collect exit code of process")?;
970||                    // TODO(aaronmondal) We should implement stderr/stdout streaming to client here.
971||                    // If we get killed before the stream is started, then these will lock up.
972||                    // TODO(aaronmondal) There is a significant bug here. If we kill the action and the action creates
973||                    // child processes, it can create zombies. See: https://github.com/tracemachina/nativelink/issues/225
974|13|                    let (stdout, stderr) = if killed_action {
||    Branch (974:47): [True: 4, False: 9]
975|4|                        drop(timer);
976|4|                        (Bytes::new(), Bytes::new())
977||                    } else {
978|9|                        timer.measure();
979|9|                        let (maybe_all_stdout, maybe_all_stderr) = tokio::join!(all_stdout_fut, all_stderr_fut);
980||                        (
981|9|                            maybe_all_stdout.err_tip(|| "Internal error reading from stdout of worker task")??,
982|9|                            maybe_all_stderr.err_tip(|| "Internal error reading from stderr of worker task")??
983||                        )
984||                    };
985||
986|13|                    let exit_code = exit_status.code().map_or(EXIT_CODE_FOR_SIGNAL, |exit_code| {
987|9|                        if exit_code == 0 {
||    Branch (987:28): [True: 8, False: 1]
988|8|                            self.metrics().child_process_success_error_code.inc();
989|8|                        } else {
990|1|                            self.metrics().child_process_failure_error_code.inc();
991|1|                        }
992|9|                        exit_code
993|9|                    });
994||
995|13|                    let maybe_error_override = if let Some(side_channel_file) = maybe_side_channel_file {
||    Branch (995:55): [True: 0, False: 13]
996|0|                        process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await
997|0|                        .err_tip(|| format!("Error processing side channel file: {}", side_channel_file.display()))?
998||                    } else {
999|13|                        None
1000||                    };
1001|13|                    {
1002|13|                        let mut state = self.state.lock();
1003|13|                        state.error = Error::merge_option(state.error.take(), maybe_error_override);
1004|13|
1005|13|                        state.command_proto = Some(command_proto);
1006|13|                        state.execution_result = Some(RunningActionImplExecutionResult{
1007|13|                            stdout,
1008|13|                            stderr,
1009|13|                            exit_code,
1010|13|                        });
1011|13|                        state.execution_metadata.execution_completed_timestamp = (self.running_actions_manager.callbacks.now_fn)();
1012|13|                    }
1013|13|                    return Ok(self);
1014||                },
1015|17|                _ = &mut kill_channel_rx => {
1016|2|                    killed_action = true;
1017|2|                    if let Err(err) = child_process_guard.start_kill() {
||    Branch (1017:28): [True: 0, False: 2]
1018|0|                        error!(
1019|0|                            operation_id = ?self.operation_id,
1020||                            ?err,
1021|0|                            "Could not kill process",
1022||                        );
1023|2|                    }
1024|2|                    {
1025|2|                        let mut state = self.state.lock();
1026|2|                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
1027|2|                            Code::Aborted,
1028|2|                            format!(
1029|2|                                "Command '{}' was killed by scheduler",
1030|2|                                args.join(OsStr::new(" ")).to_string_lossy()
1031|2|                            )
1032|2|                        )));
1033|2|                    }
1034||                },
1035||            }
1036||        }
1037||        // Unreachable.
1038|13|    }
1039||
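The select! loop in inner_execute races process exit against a timeout and a kill channel; the exit arm is the only one that returns, so a killed child is still waited on. The same control flow in isolation (the function name and timeout handling are illustrative, not the NativeLink implementation):

    use core::time::Duration;
    use futures::FutureExt;
    use tokio::sync::oneshot;

    async fn wait_with_deadline(
        mut child: tokio::process::Child,
        kill_rx: oneshot::Receiver<()>,
        timeout: Duration,
    ) -> std::io::Result<std::process::ExitStatus> {
        // fuse() lets completed futures be polled again on later loop iterations.
        let mut sleep_fut = Box::pin(tokio::time::sleep(timeout).fuse());
        let mut kill_rx = kill_rx.fuse();
        loop {
            tokio::select! {
                // Deadline hit: request the kill, keep looping until wait() returns.
                () = &mut sleep_fut => { child.start_kill()?; }
                // External kill request (e.g. the scheduler disconnected).
                _ = &mut kill_rx => { child.start_kill()?; }
                // Normal or killed exit: the only arm that returns.
                status = child.wait() => return status,
            }
        }
    }
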
1040|11|    async fn inner_upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1041||        enum OutputType {
1042||            None,
1043||            File(FileInfo),
1044||            Directory(DirectoryInfo),
1045||            FileSymlink(SymlinkInfo),
1046||            DirectorySymlink(SymlinkInfo),
1047||        }
1048||
1049|11|        debug!("Worker uploading results",);
1050|11|        let (mut command_proto, execution_result, mut execution_metadata) = {
1051|11|            let mut state = self.state.lock();
1052|11|            state.execution_metadata.output_upload_start_timestamp =
1053|11|                (self.running_actions_manager.callbacks.now_fn)();
1054||            (
1055|11|                state
1056|11|                    .command_proto
1057|11|                    .take()
1058|11|                    .err_tip(|| "Expected state to have command_proto in execute()")?,
1059|11|                state
1060|11|                    .execution_result
1061|11|                    .take()
1062|11|                    .err_tip(|| "Execution result does not exist at upload_results stage")?,
1063|11|                state.execution_metadata.clone(),
1064||            )
1065||        };
1066|11|        let cas_store = self.running_actions_manager.cas_store.as_ref();
1067|11|        let hasher = self.action_info.unique_qualifier.digest_function();
1068||
1069|11|        let mut output_path_futures = FuturesUnordered::new();
1070|11|        let mut output_paths = command_proto.output_paths;
1071|11|        if output_paths.is_empty() {
||    Branch (1071:12): [True: 6, False: 5]
1072|6|            output_paths
1073|6|                .reserve(command_proto.output_files.len() + command_proto.output_directories.len());
1074|6|            output_paths.append(&mut command_proto.output_files);
1075|6|            output_paths.append(&mut command_proto.output_directories);
1076|6|        }
1077|18|        for entry in output_paths {
1078|7|            let full_path = OsString::from(if command_proto.working_directory.is_empty() {
||    Branch (1078:47): [True: 0, False: 7]
1079|0|                format!("{}/{}", self.work_directory, entry)
1080||            } else {
1081|7|                format!(
1082|7|                    "{}/{}/{}",
1083|7|                    self.work_directory, command_proto.working_directory, entry
1084||                )
1085||            });
1086|7|            let work_directory = &self.work_directory;
1087|7|            output_path_futures.push(async move {
1088|3|                let metadata = {
1089|7|                    let metadata = match fs::symlink_metadata(&full_path).await {
1090|7|                        Ok(file) => file,
1091|0|                        Err(e) => {
1092|0|                            if e.code == Code::NotFound {
||    Branch (1092:32): [True: 0, False: 0]
1093||                                // In the event our output does not exist, according to the bazel remote
1094||                                // execution spec, we simply ignore it and continue.
1095|0|                                return Result::<OutputType, Error>::Ok(OutputType::None);
1096|0|                            }
1097|0|                            return Err(e).err_tip(|| {
1098|0|                                format!("Could not open file {}", full_path.display())
1099|0|                            });
1100||                        }
1101||                    };
1102||
1103|7|                    if metadata.is_file() {
||    Branch (1103:24): [True: 4, False: 3]
1104||                        return Ok(OutputType::File(
1105|4|                            upload_file(cas_store.as_pin(), &full_path, hasher, metadata)
1106|4|                                .await
1107|4|                                .map(|mut file_info| {
1108|4|                                    file_info.name_or_path = NameOrPath::Path(entry);
1109|4|                                    file_info
1110|4|                                })
1111|4|                                .err_tip(|| format!("Uploading file {}", full_path.display()))?,
1112||                        ));
1113|3|                    }
1114|3|                    metadata
1115||                };
1116|3|                if metadata.is_dir() {
||    Branch (1116:20): [True: 2, False: 1]
1117||                    Ok(OutputType::Directory(
1118|2|                        upload_directory(cas_store.as_pin(), &full_path, work_directory, hasher)
1119|2|                            .and_then(|(root_dir, children)| async move {
1120|2|                                let tree = ProtoTree {
1121|2|                                    root: Some(root_dir),
1122|2|                                    children: children.into(),
1123|2|                                };
1124|2|                                let tree_digest = serialize_and_upload_message(
1125|2|                                    &tree,
1126|2|                                    cas_store.as_pin(),
1127|2|                                    &mut hasher.hasher(),
1128|2|                                )
1129|2|                                .await
1130|2|                                .err_tip(|| format!("While processing {entry}"))?;
1131|2|                                Ok(DirectoryInfo {
1132|2|                                    path: entry,
1133|2|                                    tree_digest,
1134|2|                                })
1135|4|                            })
1136|2|                            .await
1137|2|                            .err_tip(|| format!("Uploading directory {}", full_path.display()))?,
1138||                    ))
1139|1|                } else if metadata.is_symlink() {
||    Branch (1139:27): [True: 1, False: 0]
1140|1|                    let output_symlink = upload_symlink(&full_path, work_directory)
1141|1|                        .await
1142|1|                        .map(|mut symlink_info| {
1143|1|                            symlink_info.name_or_path = NameOrPath::Path(entry);
1144|1|                            symlink_info
1145|1|                        })
1146|1|                        .err_tip(|| format!("Uploading symlink {}
"Uploading symlink {}"0
,
full_path.display()0
))
?0
;
1147
1
                    match fs::metadata(&full_path).await {
1148
1
                        Ok(metadata) => {
1149
1
                            if metadata.is_dir() {
  Branch (1149:32): [Folded - Ignored]
  Branch (1149:32): [Folded - Ignored]
  Branch (1149:32): [True: 0, False: 1]
1150
0
                                return Ok(OutputType::DirectorySymlink(output_symlink));
1151
1
                            }
1152
                            // Note: If it's anything but directory we put it as a file symlink.
1153
1
                            return Ok(OutputType::FileSymlink(output_symlink));
1154
                        }
1155
0
                        Err(e) => {
1156
0
                            if e.code != Code::NotFound {
  Branch (1156:32): [Folded - Ignored]
  Branch (1156:32): [Folded - Ignored]
  Branch (1156:32): [True: 0, False: 0]
1157
0
                                return Err(e).err_tip(|| {
1158
0
                                    format!(
1159
0
                                        "While querying target symlink metadata for {}",
1160
0
                                        full_path.display()
1161
                                    )
1162
0
                                });
1163
0
                            }
1164
                            // If the file doesn't exist, we consider it a file. Even though the
1165
                            // file doesn't exist we still need to populate an entry.
1166
0
                            return Ok(OutputType::FileSymlink(output_symlink));
1167
                        }
1168
                    }
1169
                } else {
1170
0
                    Err(make_err!(
1171
0
                        Code::Internal,
1172
0
                        "{full_path:?} was not a file, folder or symlink. Must be one.",
1173
0
                    ))
1174
                }
1175
7
            });
1176
        }
1177
11
        let mut output_files = vec![];
1178
11
        let mut output_folders = vec![];
1179
11
        let mut output_directory_symlinks = vec![];
1180
11
        let mut output_file_symlinks = vec![];
1181
1182
11
        if execution_result.exit_code != 0 {
  Branch (1182:12): [Folded - Ignored]
  Branch (1182:12): [Folded - Ignored]
  Branch (1182:12): [True: 5, False: 6]
1183
5
            let stdout = core::str::from_utf8(&execution_result.stdout).unwrap_or("<no-utf8>");
1184
5
            let stderr = core::str::from_utf8(&execution_result.stderr).unwrap_or("<no-utf8>");
1185
5
            error!(
1186
                exit_code = ?execution_result.exit_code,
1187
5
                stdout = ?stdout[..min(stdout.len(), 1000)],
1188
5
                stderr = ?stderr[..min(stderr.len(), 1000)],
1189
5
                "Command returned non-zero exit code",
1190
            );
1191
6
        }
1192
1193
11
        let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
1194
11
            let data = execution_result.stdout;
1195
11
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1196
11
            cas_store
1197
11
                .update_oneshot(digest, data)
1198
11
                .await
1199
11
                .err_tip(|| "Uploading stdout")
?0
;
1200
11
            Result::<DigestInfo, Error>::Ok(digest)
1201
11
        });
1202
11
        let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
1203
11
            let data = execution_result.stderr;
1204
11
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1205
11
            cas_store
1206
11
                .update_oneshot(digest, data)
1207
11
                .await
1208
11
                .err_tip(|| "Uploading stdout")
?0
;
1209
11
            Result::<DigestInfo, Error>::Ok(digest)
1210
11
        });
1211
1212
11
        let upload_result = futures::try_join!(stdout_digest_fut, stderr_digest_fut, async {
1213
18
            while let Some(
output_type7
) = output_path_futures.try_next().await
?0
{
  Branch (1213:23): [Folded - Ignored]
  Branch (1213:23): [Folded - Ignored]
  Branch (1213:23): [True: 7, False: 11]
1214
7
                match output_type {
1215
4
                    OutputType::File(output_file) => output_files.push(output_file),
1216
2
                    OutputType::Directory(output_folder) => output_folders.push(output_folder),
1217
1
                    OutputType::FileSymlink(output_symlink) => {
1218
1
                        output_file_symlinks.push(output_symlink);
1219
1
                    }
1220
0
                    OutputType::DirectorySymlink(output_symlink) => {
1221
0
                        output_directory_symlinks.push(output_symlink);
1222
0
                    }
1223
0
                    OutputType::None => { /* Safe to ignore */ }
1224
                }
1225
            }
1226
11
            Ok(())
1227
11
        });
1228
11
        drop(output_path_futures);
1229
11
        let (stdout_digest, stderr_digest) = match upload_result {
1230
11
            Ok((stdout_digest, stderr_digest, ())) => (stdout_digest, stderr_digest),
1231
0
            Err(e) => return Err(e).err_tip(|| "Error while uploading results"),
1232
        };
1233
1234
11
        execution_metadata.output_upload_completed_timestamp =
1235
11
            (self.running_actions_manager.callbacks.now_fn)();
1236
11
        output_files.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1237
11
        output_folders.sort_unstable_by(|a, b| 
a.path0
.
cmp0
(
&b.path0
));
1238
11
        output_file_symlinks.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1239
11
        output_directory_symlinks.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1240
11
        {
1241
11
            let mut state = self.state.lock();
1242
11
            execution_metadata.worker_completed_timestamp =
1243
11
                (self.running_actions_manager.callbacks.now_fn)();
1244
11
            state.action_result = Some(ActionResult {
1245
11
                output_files,
1246
11
                output_folders,
1247
11
                output_directory_symlinks,
1248
11
                output_file_symlinks,
1249
11
                exit_code: execution_result.exit_code,
1250
11
                stdout_digest,
1251
11
                stderr_digest,
1252
11
                execution_metadata,
1253
11
                server_logs: HashMap::default(), // TODO(aaronmondal) Not implemented.
1254
11
                error: state.error.clone(),
1255
11
                message: String::new(), // Will be filled in on cache_action_result if needed.
1256
11
            });
1257
11
        }
1258
11
        Ok(self)
1259
11
    }
1260
1261
11
    async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
1262
11
        let mut state = self.state.lock();
1263
11
        state
1264
11
            .action_result
1265
11
            .take()
1266
11
            .err_tip(|| "Expected action_result to exist in get_finished_result")
1267
11
    }
1268
}
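
// A minimal, hypothetical sketch (not part of this module's API) of the
// concurrency pattern `inner_upload_results` uses above: per-output uploads run
// in a `FuturesUnordered` drained with `try_next()`, joined against the
// stdout/stderr uploads via `futures::try_join!` so the first error aborts all
// branches.
#[allow(dead_code)]
async fn upload_concurrency_sketch() -> Result<Vec<usize>, Error> {
    let mut output_futures = FuturesUnordered::new();
    for i in 0..3usize {
        // Stands in for one output-path upload future.
        output_futures.push(async move { Result::<usize, Error>::Ok(i) });
    }
    let mut collected = vec![];
    futures::try_join!(
        // Stands in for the stdout/stderr upload futures.
        async { Result::<(), Error>::Ok(()) },
        async {
            // Drains completed uploads in completion order.
            while let Some(value) = output_futures.try_next().await? {
                collected.push(value);
            }
            Ok(())
        },
    )?;
    Ok(collected)
}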

impl Drop for RunningActionImpl {
    fn drop(&mut self) {
        if self.did_cleanup.load(Ordering::Acquire) {
            return;
        }
        let operation_id = self.operation_id.clone();
        error!(
            ?operation_id,
            "RunningActionImpl did not clean up. This is a violation of the requirements; will attempt to do it in the background."
        );
        let running_actions_manager = self.running_actions_manager.clone();
        let action_directory = self.action_directory.clone();
        background_spawn!("running_action_impl_drop", async move {
            let Err(err) =
                do_cleanup(&running_actions_manager, &operation_id, &action_directory).await
            else {
                return;
            };
            error!(
                ?operation_id,
                ?action_directory,
                ?err,
                "Error cleaning up action"
            );
        });
    }
}
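
// A hypothetical illustration of the flag handshake above: `cleanup()` stores
// `true` with `Release` ordering and `drop` loads with `Acquire`, so a drop
// that observes `true` also observes the completed cleanup. `drop` cannot
// `.await`, which is why the real fallback is spawned in the background.
#[allow(dead_code)]
struct CleanupFlagSketch {
    did_cleanup: AtomicBool,
}

impl CleanupFlagSketch {
    fn finish_cleanup(&self) {
        // ... cleanup work would happen here ...
        self.did_cleanup.store(true, Ordering::Release);
    }
}

impl Drop for CleanupFlagSketch {
    fn drop(&mut self) {
        if !self.did_cleanup.load(Ordering::Acquire) {
            // Last-resort path; real code schedules async cleanup here.
        }
    }
}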

impl RunningAction for RunningActionImpl {
    fn get_operation_id(&self) -> &OperationId {
        &self.operation_id
    }

    async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
        self.metrics()
            .clone()
            .prepare_action
            .wrap(Self::inner_prepare_action(self))
            .await
    }

    async fn execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
        self.metrics()
            .clone()
            .execute
            .wrap(Self::inner_execute(self))
            .await
    }

    async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
        self.metrics()
            .clone()
            .upload_results
            .wrap(Self::inner_upload_results(self))
            .await
    }

    async fn cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
        self.metrics()
            .clone()
            .cleanup
            .wrap(async move {
                let result = do_cleanup(
                    &self.running_actions_manager,
                    &self.operation_id,
                    &self.action_directory,
                )
                .await;
                self.did_cleanup.store(true, Ordering::Release);
                result.map(move |()| self)
            })
            .await
    }

    async fn get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
        self.metrics()
            .clone()
            .get_finished_result
            .wrap(Self::inner_get_finished_result(self))
            .await
    }

    fn get_work_directory(&self) -> &String {
        &self.work_directory
    }
}

pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static {
    type RunningAction: RunningAction;

    fn create_and_add_action(
        self: &Arc<Self>,
        worker_id: String,
        start_execute: StartExecute,
    ) -> impl Future<Output = Result<Arc<Self::RunningAction>, Error>> + Send;

    fn cache_action_result(
        &self,
        action_digest: DigestInfo,
        action_result: &mut ActionResult,
        hasher: DigestHasherFunc,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    fn complete_actions(&self, complete_msg: ShutdownGuard) -> impl Future<Output = ()> + Send;

    fn kill_all(&self) -> impl Future<Output = ()> + Send;

    fn kill_operation(
        &self,
        operation_id: &OperationId,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    fn metrics(&self) -> &Arc<Metrics>;
}
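
// A hypothetical sketch of how a worker might drive an action through the
// trait above; the real driver additionally handles kill and cleanup on every
// error path. The `RunningAction` methods chained here are the ones
// implemented on `RunningActionImpl` earlier in this file.
#[allow(dead_code)]
async fn run_action_sketch<M: RunningActionsManager>(
    manager: &Arc<M>,
    worker_id: String,
    start_execute: StartExecute,
) -> Result<ActionResult, Error> {
    let action = manager
        .create_and_add_action(worker_id, start_execute)
        .await?;
    // prepare -> execute -> upload, then always cleanup before reading results.
    let action = action.prepare_action().await?.execute().await?;
    let action = action.upload_results().await?.cleanup().await?;
    action.get_finished_result().await
}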

/// A function to get the current system time, used to allow mocking for tests.
type NowFn = fn() -> SystemTime;
type SleepFn = fn(Duration) -> BoxFuture<'static, ()>;

/// Functions that may be injected for testing purposes. During standard
/// control flows these are supplied by the `new` function.
#[derive(Clone, Copy)]
pub struct Callbacks {
    /// A function that gets the current time.
    pub now_fn: NowFn,
    /// A function that sleeps for a given Duration.
    pub sleep_fn: SleepFn,
}

impl Debug for Callbacks {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("Callbacks").finish_non_exhaustive()
    }
}
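
// A hypothetical test-injection example: freeze time and make sleeps resolve
// immediately so timeout logic can be driven deterministically. Mirrors the
// production wiring in `RunningActionsManagerImpl::new` below.
#[allow(dead_code)]
fn test_callbacks_sketch() -> Callbacks {
    Callbacks {
        // Always report the same instant.
        now_fn: || SystemTime::UNIX_EPOCH,
        // Never actually sleep.
        sleep_fn: |_duration| Box::pin(async {}),
    }
}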

/// The set of additional information for executing an action over and above
/// those given in the `ActionInfo` passed to the worker.  This allows
/// modification of the action for execution on this particular worker.  This
/// may be used to run the action with a particular set of additional
/// environment variables, or perhaps configure it to execute within a
/// container.
#[derive(Debug, Default)]
pub struct ExecutionConfiguration {
    /// If set, will be executed instead of the first argument passed in the
    /// `ActionInfo` with all of the arguments in the `ActionInfo` passed as
    /// arguments to this command.
    pub entrypoint: Option<String>,
    /// The only environment variables that will be specified when the command
    /// executes other than those in the `ActionInfo`.  On Windows, `SystemRoot`
    /// and PATH are also assigned (see `inner_execute`).
    pub additional_environment: Option<HashMap<String, EnvironmentSource>>,
}
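
// A hypothetical configuration that wraps every action in a custom entrypoint
// (for example a container or sandbox runner); the path shown is illustrative
// only. The original command becomes the arguments to this wrapper.
#[allow(dead_code)]
fn sandboxed_execution_configuration_sketch() -> ExecutionConfiguration {
    ExecutionConfiguration {
        entrypoint: Some("/usr/local/bin/sandbox-run".to_string()),
        // Leave the environment untouched for this sketch.
        additional_environment: None,
    }
}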

#[derive(Debug)]
struct UploadActionResults {
    upload_ac_results_strategy: UploadCacheResultsStrategy,
    upload_historical_results_strategy: UploadCacheResultsStrategy,
    ac_store: Option<Store>,
    historical_store: Store,
    success_message_template: Template,
    failure_message_template: Template,
}

impl UploadActionResults {
    fn new(
        config: &UploadActionResultConfig,
        ac_store: Option<Store>,
        historical_store: Store,
    ) -> Result<Self, Error> {
        let upload_historical_results_strategy = config
            .upload_historical_results_strategy
            .unwrap_or(DEFAULT_HISTORICAL_RESULTS_STRATEGY);
        if !matches!(
            config.upload_ac_results_strategy,
            UploadCacheResultsStrategy::Never
        ) && ac_store.is_none()
        {
            return Err(make_input_err!(
                "upload_ac_results_strategy is set, but no ac_store is configured"
            ));
        }
        Ok(Self {
            upload_ac_results_strategy: config.upload_ac_results_strategy,
            upload_historical_results_strategy,
            ac_store,
            historical_store,
            success_message_template: Template::new(&config.success_message_template).map_err(
                |e| {
                    make_input_err!(
                        "Could not convert success_message_template to rust template: {} : {e:?}",
                        config.success_message_template
                    )
                },
            )?,
            failure_message_template: Template::new(&config.failure_message_template).map_err(
                |e| {
                    make_input_err!(
                        "Could not convert failure_message_template to rust template: {} : {e:?}",
                        config.failure_message_template
                    )
                },
            )?,
        })
    }

    const fn should_cache_result(
        strategy: UploadCacheResultsStrategy,
        action_result: &ActionResult,
        treat_infra_error_as_failure: bool,
    ) -> bool {
        let did_fail = action_result.exit_code != 0
            || (treat_infra_error_as_failure && action_result.error.is_some());
        match strategy {
            UploadCacheResultsStrategy::SuccessOnly => !did_fail,
            UploadCacheResultsStrategy::Never => false,
            // Never cache internal errors or timeouts.
            UploadCacheResultsStrategy::Everything => {
                treat_infra_error_as_failure || action_result.error.is_none()
            }
            UploadCacheResultsStrategy::FailuresOnly => did_fail,
        }
    }
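
    // For intuition, a hypothetical evaluation of `should_cache_result` when
    // `exit_code != 0` and no infra `error` is set:
    //   SuccessOnly  -> false
    //   FailuresOnly -> true
    //   Everything   -> true
    //   Never        -> false
    // On the AC path (`treat_infra_error_as_failure == false`), `Everything`
    // still refuses to cache results that carry an infra `error`, so internal
    // errors and timeouts never land in the action cache.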

    /// Formats the message field in `ExecuteResponse` from the `success_message_template`
    /// or `failure_message_template` config templates.
    fn format_execute_response_message(
        mut template_str: Template,
        action_digest_info: DigestInfo,
        maybe_historical_digest_info: Option<DigestInfo>,
        hasher: DigestHasherFunc,
    ) -> Result<String, Error> {
        template_str.replace(
            "digest_function",
            hasher.proto_digest_func().as_str_name().to_lowercase(),
        );
        template_str.replace(
            "action_digest_hash",
            action_digest_info.packed_hash().to_string(),
        );
        template_str.replace("action_digest_size", action_digest_info.size_bytes());
        if let Some(historical_digest_info) = maybe_historical_digest_info {
            template_str.replace(
                "historical_results_hash",
                format!("{}", historical_digest_info.packed_hash()),
            );
            template_str.replace(
                "historical_results_size",
                historical_digest_info.size_bytes(),
            );
        } else {
            template_str.replace("historical_results_hash", "");
            template_str.replace("historical_results_size", "");
        }
        template_str
            .text()
            .map_err(|e| make_input_err!("Could not convert template to text: {e:?}"))
    }
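
    // As a hypothetical example, a configured success_message_template of
    //   "See {digest_function}/{action_digest_hash}-{action_digest_size}"
    // could render to something like "See sha256/0a1b...9f-142". When no
    // HistoricalExecuteResponse was uploaded, the historical_results_* fields
    // expand to empty strings.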

    async fn upload_ac_results(
        &self,
        action_digest: DigestInfo,
        action_result: ProtoActionResult,
        hasher: DigestHasherFunc,
    ) -> Result<(), Error> {
        let Some(ac_store) = self.ac_store.as_ref() else {
            return Ok(());
        };
        // If we are a GrpcStore we shortcut here, as this is a special store.
        if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
            let update_action_request = UpdateActionResultRequest {
                // This is populated by `update_action_result`.
                instance_name: String::new(),
                action_digest: Some(action_digest.into()),
                action_result: Some(action_result),
                results_cache_policy: None,
                digest_function: hasher.proto_digest_func().into(),
            };
            return grpc_store
                .update_action_result(Request::new(update_action_request))
                .await
                .map(|_| ())
                .err_tip(|| "Caching ActionResult");
        }

        let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);
        action_result
            .encode(&mut store_data)
            .err_tip(|| "Encoding ActionResult for caching")?;

        ac_store
            .update_oneshot(action_digest, store_data.split().freeze())
            .await
            .err_tip(|| "Caching ActionResult")
    }

    async fn upload_historical_results_with_message(
        &self,
        action_digest: DigestInfo,
        execute_response: ExecuteResponse,
        message_template: Template,
        hasher: DigestHasherFunc,
    ) -> Result<String, Error> {
        let historical_digest_info = serialize_and_upload_message(
            &HistoricalExecuteResponse {
                action_digest: Some(action_digest.into()),
                execute_response: Some(execute_response.clone()),
            },
            self.historical_store.as_pin(),
            &mut hasher.hasher(),
        )
        .await
        .err_tip(|| format!("Caching HistoricalExecuteResponse for digest: {action_digest}"))?;

        Self::format_execute_response_message(
            message_template,
            action_digest,
            Some(historical_digest_info),
            hasher,
        )
        .err_tip(|| "Could not format message in upload_historical_results_with_message")
    }

    async fn cache_action_result(
        &self,
        action_info: DigestInfo,
        action_result: &mut ActionResult,
        hasher: DigestHasherFunc,
    ) -> Result<(), Error> {
        let should_upload_historical_results =
            Self::should_cache_result(self.upload_historical_results_strategy, action_result, true);
        let should_upload_ac_results =
            Self::should_cache_result(self.upload_ac_results_strategy, action_result, false);
        // Shortcut so we don't need to convert to proto if not needed.
        if !should_upload_ac_results && !should_upload_historical_results {
            return Ok(());
        }

        let mut execute_response = to_execute_response(action_result.clone());

        // In theory the exit code should always be != 0 if there's an error, but for safety we
        // catch both.
        let message_template = if action_result.exit_code == 0 && action_result.error.is_none() {
            self.success_message_template.clone()
        } else {
            self.failure_message_template.clone()
        };

        let upload_historical_results_with_message_result = if should_upload_historical_results {
            let maybe_message = self
                .upload_historical_results_with_message(
                    action_info,
                    execute_response.clone(),
                    message_template,
                    hasher,
                )
                .await;
            match maybe_message {
                Ok(message) => {
                    action_result.message.clone_from(&message);
                    execute_response.message = message;
                    Ok(())
                }
                Err(e) => Result::<(), Error>::Err(e),
            }
        } else {
            match Self::format_execute_response_message(message_template, action_info, None, hasher)
            {
                Ok(message) => {
                    action_result.message.clone_from(&message);
                    execute_response.message = message;
                    Ok(())
                }
                Err(e) => Err(e).err_tip(|| "Could not format message in cache_action_result"),
            }
        };

        // Note: Done in this order because we assume most results will succeed and most configs
        // will either always upload historical results or only upload them on failure. In that
        // case we can avoid an extra clone of the protos by doing this last with the above
        // assumption.
        let ac_upload_results = if should_upload_ac_results {
            self.upload_ac_results(
                action_info,
                execute_response
                    .result
                    .err_tip(|| "No result set in cache_action_result")?,
                hasher,
            )
            .await
        } else {
            Ok(())
        };
        upload_historical_results_with_message_result.merge(ac_upload_results)
    }
}

#[derive(Debug)]
pub struct RunningActionsManagerArgs<'a> {
    pub root_action_directory: String,
    pub execution_configuration: ExecutionConfiguration,
    pub cas_store: Arc<FastSlowStore>,
    pub ac_store: Option<Store>,
    pub historical_store: Store,
    pub upload_action_result_config: &'a UploadActionResultConfig,
    pub max_action_timeout: Duration,
    pub timeout_handled_externally: bool,
}

/// Holds state info about what is being executed and the interface for interacting
/// with actions while they are running.
#[derive(Debug)]
pub struct RunningActionsManagerImpl {
    root_action_directory: String,
    execution_configuration: ExecutionConfiguration,
    cas_store: Arc<FastSlowStore>,
    filesystem_store: Arc<FilesystemStore>,
    upload_action_results: UploadActionResults,
    max_action_timeout: Duration,
    timeout_handled_externally: bool,
    running_actions: Mutex<HashMap<OperationId, Weak<RunningActionImpl>>>,
    // Note: We don't use Notify because we need to support a .wait_for()-like function, which
    // Notify does not support.
    action_done_tx: watch::Sender<()>,
    callbacks: Callbacks,
    metrics: Arc<Metrics>,
    /// Track operations being cleaned up to avoid directory collisions during action retries.
    /// When an action fails and is retried on the same worker, we need to ensure the previous
    /// attempt's directory is fully cleaned up before creating a new one.
    /// See: <https://github.com/TraceMachina/nativelink/issues/1859>
    cleaning_up_operations: Mutex<HashSet<OperationId>>,
    /// Notify waiters when a cleanup operation completes. This is used in conjunction with
    /// `cleaning_up_operations` to coordinate directory cleanup and creation.
    cleanup_complete_notify: Arc<Notify>,
}

impl RunningActionsManagerImpl {
    /// Maximum time to wait for a cleanup operation to complete before timing out.
    /// TODO(marcussorealheis): Consider making the cleanup wait timeout configurable in the future.
    const MAX_WAIT: Duration = Duration::from_secs(30);
    /// Maximum backoff duration for exponential backoff when waiting for cleanup.
    const MAX_BACKOFF: Duration = Duration::from_millis(500);

    pub fn new_with_callbacks(
        args: RunningActionsManagerArgs<'_>,
        callbacks: Callbacks,
    ) -> Result<Self, Error> {
        // Sadly because of some limitations of how Any works we need to clone more times than optimal.
        let filesystem_store = args
            .cas_store
            .fast_store()
            .downcast_ref::<FilesystemStore>(None)
            .err_tip(
                || "Expected FilesystemStore store for .fast_store() in RunningActionsManagerImpl",
            )?
            .get_arc()
            .err_tip(|| "FilesystemStore's internal Arc was lost")?;
        let (action_done_tx, _) = watch::channel(());
        Ok(Self {
            root_action_directory: args.root_action_directory,
            execution_configuration: args.execution_configuration,
            cas_store: args.cas_store,
            filesystem_store,
            upload_action_results: UploadActionResults::new(
                args.upload_action_result_config,
                args.ac_store,
                args.historical_store,
            )
            .err_tip(|| "During RunningActionsManagerImpl construction")?,
            max_action_timeout: args.max_action_timeout,
            timeout_handled_externally: args.timeout_handled_externally,
            running_actions: Mutex::new(HashMap::new()),
            action_done_tx,
            callbacks,
            metrics: Arc::new(Metrics::default()),
            cleaning_up_operations: Mutex::new(HashSet::new()),
            cleanup_complete_notify: Arc::new(Notify::new()),
        })
    }

    pub fn new(args: RunningActionsManagerArgs<'_>) -> Result<Self, Error> {
        Self::new_with_callbacks(
            args,
            Callbacks {
                now_fn: SystemTime::now,
                sleep_fn: |duration| Box::pin(tokio::time::sleep(duration)),
            },
        )
    }

    /// Fixes a race condition that occurs when an action fails to execute on a worker and the
    /// same worker attempts to re-execute the same action before the physical cleanup (file
    /// removal) completes.
    /// See this issue for additional details: <https://github.com/TraceMachina/nativelink/issues/1859>
    async fn wait_for_cleanup_if_needed(&self, operation_id: &OperationId) -> Result<(), Error> {
        let start = Instant::now();
        let mut backoff = Duration::from_millis(10);
        let mut has_waited = false;

        loop {
            let should_wait = {
                let cleaning = self.cleaning_up_operations.lock();
                cleaning.contains(operation_id)
            };

            if !should_wait {
                let dir_path =
                    PathBuf::from(&self.root_action_directory).join(operation_id.to_string());

                if !dir_path.exists() {
                    return Ok(());
                }

                // Safety check: ensure we're only removing directories under root_action_directory.
                let root_path = Path::new(&self.root_action_directory);
                let canonical_root = root_path.canonicalize().err_tip(|| {
                    format!(
                        "Failed to canonicalize root directory: {}",
                        self.root_action_directory
                    )
                })?;
                let canonical_dir = dir_path.canonicalize().err_tip(|| {
                    format!("Failed to canonicalize directory: {}", dir_path.display())
                })?;

                if !canonical_dir.starts_with(&canonical_root) {
                    return Err(make_err!(
                        Code::Internal,
                        "Attempted to remove directory outside of root_action_directory: {}",
                        dir_path.display()
                    ));
                }

                // Directory exists but is not being cleaned - remove it.
                warn!(
                    "Removing stale directory for {}: {}",
                    operation_id,
                    dir_path.display()
                );
                self.metrics.stale_removals.inc();

                // Try to remove the directory, with one retry on failure.
                let remove_result = fs::remove_dir_all(&dir_path).await;
                if let Err(e) = remove_result {
                    // Retry once after a short delay in case the directory is temporarily locked.
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    fs::remove_dir_all(&dir_path).await.err_tip(|| {
                        format!(
                            "Failed to remove stale directory {} for retry of {} (original error: {})",
                            dir_path.display(),
                            operation_id,
                            e
                        )
                    })?;
                }
                return Ok(());
            }

            if start.elapsed() > Self::MAX_WAIT {
                self.metrics.cleanup_wait_timeouts.inc();
                return Err(make_err!(
                    Code::DeadlineExceeded,
                    "Timeout waiting for previous operation cleanup: {} (waited {:?})",
                    operation_id,
                    start.elapsed()
                ));
            }

            if !has_waited {
                self.metrics.cleanup_waits.inc();
                has_waited = true;
            }

            trace!(
                "Waiting for cleanup of {} (elapsed: {:?}, backoff: {:?})",
                operation_id,
                start.elapsed(),
                backoff
            );

            tokio::select! {
                () = self.cleanup_complete_notify.notified() => {},
                () = tokio::time::sleep(backoff) => {
                    // Exponential backoff.
                    backoff = (backoff * 2).min(Self::MAX_BACKOFF);
                },
            }
        }
    }
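
    // Illustrative timeline for the wait above, assuming cleanup never signals:
    // sleeps of 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, then 500ms (MAX_BACKOFF)
    // repeatedly, until MAX_WAIT (30s) elapses and DeadlineExceeded is returned.
    // A `cleanup_complete_notify` wakeup short-circuits the current sleep.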

    fn make_action_directory<'a>(
        &'a self,
        operation_id: &'a OperationId,
    ) -> impl Future<Output = Result<String, Error>> + 'a {
        self.metrics.make_action_directory.wrap(async move {
            let action_directory = format!("{}/{}", self.root_action_directory, operation_id);
            fs::create_dir(&action_directory)
                .await
                .err_tip(|| format!("Error creating action directory {action_directory}"))?;
            Ok(action_directory)
        })
    }

    fn create_action_info(
        &self,
        start_execute: StartExecute,
        queued_timestamp: SystemTime,
    ) -> impl Future<Output = Result<ActionInfo, Error>> + '_ {
        self.metrics.create_action_info.wrap(async move {
            let execute_request = start_execute
                .execute_request
                .err_tip(|| "Expected execute_request to exist in StartExecute")?;
            let action_digest: DigestInfo = execute_request
                .action_digest
                .clone()
                .err_tip(|| "Expected action_digest to exist on StartExecute")?
                .try_into()?;
            let load_start_timestamp = (self.callbacks.now_fn)();
            let action =
                get_and_decode_digest::<Action>(self.cas_store.as_ref(), action_digest.into())
                    .await
                    .err_tip(|| "During start_action")?;
            let action_info = ActionInfo::try_from_action_and_execute_request(
                execute_request,
                action,
                load_start_timestamp,
                queued_timestamp,
            )
            .err_tip(|| "Could not create ActionInfo in create_and_add_action()")?;
            Ok(action_info)
        })
    }

    fn cleanup_action(&self, operation_id: &OperationId) -> Result<(), Error> {
        let mut running_actions = self.running_actions.lock();
        let result = running_actions.remove(operation_id).err_tip(|| {
            format!("Expected action id '{operation_id:?}' to exist in RunningActionsManagerImpl")
        });
        // No need to copy anything; we are just telling the receivers an event happened.
        self.action_done_tx.send_modify(|()| {});
        result.map(|_| ())
    }

    // Note: We do not capture metrics on this call, only `.kill_all()`.
    // Important: When the future returns the process may still be running.
    async fn kill_operation(action: Arc<RunningActionImpl>) {
        warn!(
            operation_id = ?action.operation_id,
            "Sending kill to running operation",
        );
        let kill_channel_tx = {
            let mut action_state = action.state.lock();
            action_state.kill_channel_tx.take()
        };
        if let Some(kill_channel_tx) = kill_channel_tx {
            if kill_channel_tx.send(()).is_err() {
                error!(
                    operation_id = ?action.operation_id,
                    "Error sending kill to running operation",
                );
            }
        }
    }
}

impl RunningActionsManager for RunningActionsManagerImpl {
    type RunningAction = RunningActionImpl;

    async fn create_and_add_action(
        self: &Arc<Self>,
        worker_id: String,
        start_execute: StartExecute,
    ) -> Result<Arc<RunningActionImpl>, Error> {
        self.metrics
            .create_and_add_action
            .wrap(async move {
                let queued_timestamp = start_execute
                    .queued_timestamp
                    .and_then(|time| time.try_into().ok())
                    .unwrap_or(SystemTime::UNIX_EPOCH);
                let operation_id = start_execute.operation_id.as_str().into();
                let action_info = self.create_action_info(start_execute, queued_timestamp).await?;
                debug!(?action_info, "Worker received action");
                // Wait for any previous cleanup to complete before creating the directory.
                self.wait_for_cleanup_if_needed(&operation_id).await?;
                let action_directory = self.make_action_directory(&operation_id).await?;
                let execution_metadata = ExecutionMetadata {
                    worker: worker_id,
                    queued_timestamp: action_info.insert_timestamp,
                    worker_start_timestamp: action_info.load_timestamp,
                    worker_completed_timestamp: SystemTime::UNIX_EPOCH,
                    input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
                    input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
                    execution_start_timestamp: SystemTime::UNIX_EPOCH,
                    execution_completed_timestamp: SystemTime::UNIX_EPOCH,
                    output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
                    output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
                };
                let timeout = if action_info.timeout.is_zero() || self.timeout_handled_externally {
                    self.max_action_timeout
                } else {
                    action_info.timeout
                };
                if timeout > self.max_action_timeout {
                    return Err(make_err!(
                        Code::InvalidArgument,
                        "Action timeout of {} seconds is greater than the maximum allowed timeout of {} seconds",
                        timeout.as_secs_f32(),
                        self.max_action_timeout.as_secs_f32()
                    ));
                }
                let running_action = Arc::new(RunningActionImpl::new(
                    execution_metadata,
                    operation_id.clone(),
                    action_directory,
                    action_info,
                    timeout,
                    self.clone(),
                ));
                {
                    let mut running_actions = self.running_actions.lock();
                    // Check if the action already exists and is still alive.
                    if let Some(existing_weak) = running_actions.get(&operation_id) {
                        if let Some(_existing_action) = existing_weak.upgrade() {
                            return Err(make_err!(
                                Code::AlreadyExists,
                                "Action with operation_id {} is already running",
                                operation_id
                            ));
                        }
                    }
                    running_actions.insert(operation_id, Arc::downgrade(&running_action));
                }
                Ok(running_action)
            })
            .await
    }

    async fn cache_action_result(
        &self,
        action_info: DigestInfo,
        action_result: &mut ActionResult,
        hasher: DigestHasherFunc,
    ) -> Result<(), Error> {
        self.metrics
            .cache_action_result
            .wrap(self.upload_action_results.cache_action_result(
                action_info,
                action_result,
                hasher,
            ))
            .await
    }

    async fn kill_operation(&self, operation_id: &OperationId) -> Result<(), Error> {
        let running_action = {
            let running_actions = self.running_actions.lock();
            running_actions
                .get(operation_id)
                .and_then(Weak::upgrade)
                .ok_or_else(|| make_input_err!("Failed to get running action {operation_id}"))?
        };
        Self::kill_operation(running_action).await;
        Ok(())
    }

    // Waits for all running actions to complete and signals completion.
    // Uses the ShutdownGuard to signal the completion of the actions;
    // dropping the sender automatically notifies the process to terminate.
    async fn complete_actions(&self, _complete_msg: ShutdownGuard) {
        drop(
            self.action_done_tx
                .subscribe()
                .wait_for(|()| self.running_actions.lock().is_empty())
                .await,
        );
    }

    // Note: When the future returns the process should be fully killed and cleaned up.
    async fn kill_all(&self) {
        self.metrics
            .kill_all
            .wrap_no_capture_result(async move {
                let kill_operations: Vec<Arc<RunningActionImpl>> = {
                    let running_actions = self.running_actions.lock();
                    running_actions
                        .iter()
                        .filter_map(|(_operation_id, action)| action.upgrade())
                        .collect()
                };
                for action in kill_operations {
                    Self::kill_operation(action).await;
                }
            })
            .await;
        // Ignore error. If an error happens it means there's no sender, which is not a problem.
        // Note: Sanity check that this API will always check the current value, then future values:
        // https://play.rust-lang.org/?version=stable&edition=2021&gist=23103652cc1276a97e5f9938da87fdb2
        drop(
            self.action_done_tx
                .subscribe()
                .wait_for(|()| self.running_actions.lock().is_empty())
                .await,
        );
    }

    #[inline]
    fn metrics(&self) -> &Arc<Metrics> {
        &self.metrics
    }
}
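
// A minimal sketch of the `watch` idiom used by `kill_all` and
// `complete_actions` above: `Receiver::wait_for` inspects the current value
// before awaiting changes, so a receiver subscribed after the last action
// already finished still resolves immediately (see the playground link in the
// comment above).
#[allow(dead_code)]
async fn watch_wait_sketch() {
    let (tx, _initial_rx) = watch::channel(());
    let mut done_rx = tx.subscribe();
    // Notify receivers that "something happened" without copying any data.
    tx.send_modify(|()| {});
    // Resolves immediately because the predicate already holds for the
    // current value.
    let _ = done_rx.wait_for(|()| true).await;
}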

#[derive(Debug, Default, MetricsComponent)]
pub struct Metrics {
    #[metric(help = "Stats about the create_and_add_action command.")]
    create_and_add_action: AsyncCounterWrapper,
    #[metric(help = "Stats about the cache_action_result command.")]
    cache_action_result: AsyncCounterWrapper,
    #[metric(help = "Stats about the kill_all command.")]
    kill_all: AsyncCounterWrapper,
    #[metric(help = "Stats about the create_action_info command.")]
    create_action_info: AsyncCounterWrapper,
    #[metric(help = "Stats about the make_action_directory command.")]
    make_action_directory: AsyncCounterWrapper,
    #[metric(help = "Stats about the prepare_action command.")]
    prepare_action: AsyncCounterWrapper,
    #[metric(help = "Stats about the execute command.")]
    execute: AsyncCounterWrapper,
    #[metric(help = "Stats about the upload_results command.")]
    upload_results: AsyncCounterWrapper,
    #[metric(help = "Stats about the cleanup command.")]
    cleanup: AsyncCounterWrapper,
    #[metric(help = "Stats about the get_finished_result command.")]
    get_finished_result: AsyncCounterWrapper,
    #[metric(help = "Number of times an action waited for cleanup to complete.")]
    cleanup_waits: CounterWithTime,
    #[metric(help = "Number of stale directories removed during action retries.")]
    stale_removals: CounterWithTime,
    #[metric(help = "Number of timeouts while waiting for cleanup to complete.")]
    cleanup_wait_timeouts: CounterWithTime,
    #[metric(help = "Stats about the get_proto_command_from_store command.")]
    get_proto_command_from_store: AsyncCounterWrapper,
    #[metric(help = "Stats about the download_to_directory command.")]
    download_to_directory: AsyncCounterWrapper,
    #[metric(help = "Stats about the prepare_output_files command.")]
    prepare_output_files: AsyncCounterWrapper,
    #[metric(help = "Stats about the prepare_output_paths command.")]
    prepare_output_paths: AsyncCounterWrapper,
    #[metric(help = "Stats about the child_process command.")]
    child_process: AsyncCounterWrapper,
    #[metric(help = "Stats about the child_process_success_error_code command.")]
    child_process_success_error_code: CounterWithTime,
    #[metric(help = "Stats about the child_process_failure_error_code command.")]
    child_process_failure_error_code: CounterWithTime,
    #[metric(help = "Total time spent uploading stdout.")]
    upload_stdout: AsyncCounterWrapper,
    #[metric(help = "Total time spent uploading stderr.")]
    upload_stderr: AsyncCounterWrapper,
    #[metric(help = "Total number of task timeouts.")]
    task_timeouts: CounterWithTime,
}