Coverage Report

Created: 2025-10-30 00:14

/build/source/nativelink-worker/src/running_actions_manager.rs

// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    See LICENSE file for details
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::cmp::min;
use core::convert::Into;
use core::fmt::Debug;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use core::time::Duration;
use std::borrow::Cow;
use std::collections::vec_deque::VecDeque;
use std::collections::{HashMap, HashSet};
use std::ffi::{OsStr, OsString};
#[cfg(target_family = "unix")]
use std::fs::Permissions;
#[cfg(target_family = "unix")]
use std::os::unix::fs::{MetadataExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::{Arc, Weak};
use std::time::SystemTime;

use bytes::{Bytes, BytesMut};
use filetime::{FileTime, set_file_mtime};
use formatx::Template;
use futures::future::{
    BoxFuture, Future, FutureExt, TryFutureExt, try_join, try_join_all, try_join3,
};
use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
use nativelink_config::cas_server::{
    EnvironmentSource, UploadActionResultConfig, UploadCacheResultsStrategy,
};
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
use nativelink_metric::MetricsComponent;
use nativelink_proto::build::bazel::remote::execution::v2::{
    Action, ActionResult as ProtoActionResult, Command as ProtoCommand,
    Directory as ProtoDirectory, Directory, DirectoryNode, ExecuteResponse, FileNode, SymlinkNode,
    Tree as ProtoTree, UpdateActionResultRequest,
};
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
    HistoricalExecuteResponse, StartExecute,
};
use nativelink_store::ac_utils::{
    ESTIMATED_DIGEST_SIZE, compute_buf_digest, get_and_decode_digest, serialize_and_upload_message,
};
use nativelink_store::fast_slow_store::FastSlowStore;
use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
use nativelink_store::grpc_store::GrpcStore;
use nativelink_util::action_messages::{
    ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, OperationId,
    SymlinkInfo, to_execute_response,
};
use nativelink_util::common::{DigestInfo, fs};
use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc};
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
use nativelink_util::{background_spawn, spawn, spawn_blocking};
use parking_lot::Mutex;
use prost::Message;
use relative_path::RelativePath;
use scopeguard::{ScopeGuard, guard};
use serde::Deserialize;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::process;
use tokio::sync::{Notify, oneshot, watch};
use tokio::time::Instant;
use tokio_stream::wrappers::ReadDirStream;
use tonic::Request;
use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;

/// For simplicity we use a fixed exit code for cases when our program is terminated
/// due to a signal.
const EXIT_CODE_FOR_SIGNAL: i32 = 9;

/// Default strategy for uploading historical results.
/// Note: If this value changes the config documentation
/// should reflect it.
const DEFAULT_HISTORICAL_RESULTS_STRATEGY: UploadCacheResultsStrategy =
    UploadCacheResultsStrategy::FailuresOnly;

/// Valid string reasons for a failure.
/// Note: If these change, the documentation should be updated.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum SideChannelFailureReason {
    /// Task should be considered timed out.
    Timeout,
}

/// This represents the json data that can be passed from the running process
/// to the parent via the `SideChannelFile`. See:
/// `config::EnvironmentSource::sidechannelfile` for more details.
/// Note: Any fields added here must be added to the documentation.
#[derive(Debug, Deserialize, Default)]
struct SideChannelInfo {
    /// If the task should be considered a failure and why.
    failure: Option<SideChannelFailureReason>,
}
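
// A minimal illustration (an assumption for clarity, not part of the source):
// given the serde derives above with `rename_all = "snake_case"`, a side
// channel file that marks the task as timed out could contain:
//
//     { "failure": "timeout" }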

/// Aggressively download the digests of files and make a local folder from it. This function
/// will spawn an unbounded number of futures to try and get these downloaded. The store itself
/// should be rate limited if spawning too many requests at once is an issue.
/// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore`. This is for
/// efficiency reasons. We will request the `FastSlowStore` to populate the entry then we will
/// assume the `FilesystemStore` has the file available immediately after and hardlink the file
/// to a new location.
// Sadly we cannot use `async fn` here because the rust compiler cannot determine the auto traits
// of the future. So we need to force this function to return a dynamic future instead.
// see: https://github.com/rust-lang/rust/issues/78649
pub fn download_to_directory<'a>(
    cas_store: &'a FastSlowStore,
    filesystem_store: Pin<&'a FilesystemStore>,
    digest: &'a DigestInfo,
    current_directory: &'a str,
) -> BoxFuture<'a, Result<(), Error>> {
    async move {
        let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest.into())
            .await
            .err_tip(|| "Converting digest to Directory")?;
        let mut futures = FuturesUnordered::new();

        for file in directory.files {
            let digest: DigestInfo = file
                .digest
                .err_tip(|| "Expected Digest to exist in Directory::file::digest")?
                .try_into()
                .err_tip(|| "In Directory::file::digest")?;
            let dest = format!("{}/{}", current_directory, file.name);
            let (mtime, mut unix_mode) = match file.node_properties {
                Some(properties) => (properties.mtime, properties.unix_mode),
                None => (None, None),
            };
            #[cfg_attr(target_family = "windows", allow(unused_assignments))]
            if file.is_executable {
                unix_mode = Some(unix_mode.unwrap_or(0o444) | 0o111);
            }
            futures.push(
                cas_store
                    .populate_fast_store(digest.into())
                    .and_then(move |()| async move {
                        let file_entry = filesystem_store
                            .get_file_entry_for_digest(&digest)
                            .await
                            .err_tip(|| "During hard link")?;
                        file_entry
                            .get_file_path_locked(|src| fs::hard_link(src, &dest))
                            .await
                            .map_err(|e| {
                                if e.code == Code::NotFound {
                                    make_err!(
                                        Code::Internal,
                                        "Could not make hardlink, file was likely evicted from cache. {e:?} : {dest}\n\
                                        This error often occurs when the filesystem store's max_bytes is too small for your workload.\n\
                                        To fix this issue:\n\
                                        1. Increase the 'max_bytes' value in your filesystem store configuration\n\
                                        2. Example: Change 'max_bytes: 10000000000' to 'max_bytes: 50000000000' (or higher)\n\
                                        3. The setting is typically found in your nativelink.json config under:\n\
                                           stores -> [your_filesystem_store] -> filesystem -> eviction_policy -> max_bytes\n\
                                        4. Restart NativeLink after making the change\n\n\
                                        If this error persists after increasing max_bytes several times, please report at:\n\
                                        https://github.com/TraceMachina/nativelink/issues\n\
                                        Include your config file and both server and client logs to help us assist you."
                                    )
                                } else {
                                    make_err!(Code::Internal, "Could not make hardlink, {e:?} : {dest}")
                                }
                            })?;
                        #[cfg(target_family = "unix")]
                        if let Some(unix_mode) = unix_mode {
                            fs::set_permissions(&dest, Permissions::from_mode(unix_mode))
                                .await
                                .err_tip(|| {
                                    format!(
                                        "Could not set unix mode in download_to_directory {dest}"
                                    )
                                })?;
                        }
                        if let Some(mtime) = mtime {
                            spawn_blocking!("download_to_directory_set_mtime", move || {
                                set_file_mtime(
                                    &dest,
                                    FileTime::from_unix_time(mtime.seconds, mtime.nanos as u32),
                                )
                                .err_tip(|| {
                                    format!("Failed to set mtime in download_to_directory {dest}")
                                })
                            })
                            .await
                            .err_tip(
                                || "Failed to launch spawn_blocking in download_to_directory",
                            )??;
                        }
                        Ok(())
                    })
                    .map_err(move |e| e.append(format!("for digest {digest}")))
                    .boxed(),
            );
        }

        for directory in directory.directories {
            let digest: DigestInfo = directory
                .digest
                .err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
                .try_into()
                .err_tip(|| "In Directory::file::digest")?;
            let new_directory_path = format!("{}/{}", current_directory, directory.name);
            futures.push(
                async move {
                    fs::create_dir(&new_directory_path)
                        .await
                        .err_tip(|| format!("Could not create directory {new_directory_path}"))?;
                    download_to_directory(
                        cas_store,
                        filesystem_store,
                        &digest,
                        &new_directory_path,
                    )
                    .await
                    .err_tip(|| format!("in download_to_directory : {new_directory_path}"))?;
                    Ok(())
                }
                .boxed(),
            );
        }

        #[cfg(target_family = "unix")]
        for symlink_node in directory.symlinks {
            let dest = format!("{}/{}", current_directory, symlink_node.name);
            futures.push(
                async move {
                    fs::symlink(&symlink_node.target, &dest).await.err_tip(|| {
                        format!(
                            "Could not create symlink {} -> {}",
                            symlink_node.target, dest
                        )
                    })?;
                    Ok(())
                }
                .boxed(),
            );
        }

        while futures.try_next().await?.is_some() {}
        Ok(())
    }
    .boxed()
}
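
// A hedged usage sketch for the function above; the store handles, digest, and
// target path are hypothetical, and `filesystem_store` must be the fast side
// of `cas_store` as the doc comment requires:
//
//     let fs_store_pin = Pin::new(filesystem_store.as_ref());
//     download_to_directory(&cas_store, fs_store_pin, &input_root_digest, "/tmp/work")
//         .await?;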

#[cfg(target_family = "windows")]
fn is_executable(_metadata: &std::fs::Metadata, full_path: &impl AsRef<Path>) -> bool {
    static EXECUTABLE_EXTENSIONS: &[&str] = &["exe", "bat", "com"];
    EXECUTABLE_EXTENSIONS
        .iter()
        .any(|ext| full_path.as_ref().extension().map_or(false, |v| v == *ext))
}

#[cfg(target_family = "unix")]
fn is_executable(metadata: &std::fs::Metadata, _full_path: &impl AsRef<Path>) -> bool {
    (metadata.mode() & 0o111) != 0
}

type DigestUploader = Arc<tokio::sync::OnceCell<()>>;

async fn upload_file(
    cas_store: Pin<&impl StoreLike>,
    full_path: impl AsRef<Path> + Debug + Send + Sync,
    hasher: DigestHasherFunc,
    metadata: std::fs::Metadata,
    digest_uploaders: Arc<Mutex<HashMap<DigestInfo, DigestUploader>>>,
) -> Result<FileInfo, Error> {
    let is_executable = is_executable(&metadata, &full_path);
    let file_size = metadata.len();
    let file = fs::open_file(&full_path, 0, u64::MAX)
        .await
        .err_tip(|| format!("Could not open file {full_path:?}"))?;

    let (digest, mut file) = hasher
        .hasher()
        .digest_for_file(&full_path, file.into_inner(), Some(file_size))
        .await
        .err_tip(|| format!("Failed to hash file in digest_for_file failed for {full_path:?}"))?;

    let digest_uploader = match digest_uploaders.lock().entry(digest) {
        std::collections::hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
        std::collections::hash_map::Entry::Vacant(vacant_entry) => vacant_entry
            .insert(Arc::new(tokio::sync::OnceCell::new()))
            .clone(),
    };

    // Only upload a file with a given hash once.  The file may exist multiple
    // times in the output with different names.
    digest_uploader
        .get_or_try_init(async || {
            // Only upload if the digest doesn't already exist, this should be
            // a much cheaper operation than an upload.
            let cas_store = cas_store.as_store_driver_pin();
            let store_key: nativelink_util::store_trait::StoreKey<'_> = digest.into();
            if cas_store
                .has(store_key.borrow())
                .await
                .is_ok_and(|result| result.is_some())
            {
                return Ok(());
            }

            file.rewind().await.err_tip(|| "Could not rewind file")?;

            // Note: For unknown reasons we appear to be hitting:
            // https://github.com/rust-lang/rust/issues/92096
            // or a similar issue if we try to use the non-store driver function, so we
            // are using the store driver function here.
            cas_store
                .update_with_whole_file(
                    store_key,
                    full_path.as_ref().into(),
                    file,
                    UploadSizeInfo::ExactSize(digest.size_bytes()),
                )
                .await
                .map(|_slot| ())
        })
        .await
        .err_tip(|| format!("for {full_path:?}"))?;

    let name = full_path
        .as_ref()
        .file_name()
        .err_tip(|| format!("Expected file_name to exist on {full_path:?}"))?
        .to_str()
        .err_tip(|| {
            make_err!(
                Code::Internal,
                "Could not convert {:?} to string",
                full_path
            )
        })?
        .to_string();

    Ok(FileInfo {
        name_or_path: NameOrPath::Name(name),
        digest,
        is_executable,
    })
}
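
// Sketch of the de-duplication contract above (an assumed call site, not from
// the source): callers that may race on identical content share one
// `digest_uploaders` map, so the per-digest `OnceCell` runs its upload closure
// at most once:
//
//     let digest_uploaders = Arc::new(Mutex::new(HashMap::new()));
//     let a = upload_file(cas_store, "out/bin_a", hasher, meta_a, digest_uploaders.clone());
//     let b = upload_file(cas_store, "out/bin_b", hasher, meta_b, digest_uploaders.clone());
//     try_join(a, b).await?; // if the two files hash identically, one upload happens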

async fn upload_symlink(
    full_path: impl AsRef<Path> + Debug,
    full_work_directory_path: impl AsRef<Path>,
) -> Result<SymlinkInfo, Error> {
    let full_target_path = fs::read_link(full_path.as_ref())
        .await
        .err_tip(|| format!("Could not get read_link path of {full_path:?}"))?;

    // Detect if our symlink is inside our work directory, if it is find the
    // relative path otherwise use the absolute path.
    let target = if full_target_path.starts_with(full_work_directory_path.as_ref()) {
        let full_target_path = RelativePath::from_path(&full_target_path)
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?;
        RelativePath::from_path(full_work_directory_path.as_ref())
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?
            .relative(full_target_path)
            .normalize()
            .into_string()
    } else {
        full_target_path
            .to_str()
            .err_tip(|| {
                make_err!(
                    Code::Internal,
                    "Could not convert '{:?}' to string",
                    full_target_path
                )
            })?
            .to_string()
    };

    let name = full_path
        .as_ref()
        .file_name()
        .err_tip(|| format!("Expected file_name to exist on {full_path:?}"))?
        .to_str()
        .err_tip(|| {
            make_err!(
                Code::Internal,
                "Could not convert {:?} to string",
                full_path
            )
        })?
        .to_string();

    Ok(SymlinkInfo {
        name_or_path: NameOrPath::Name(name),
        target,
    })
}

fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
    cas_store: Pin<&'a impl StoreLike>,
    full_dir_path: P,
    full_work_directory: &'a str,
    hasher: DigestHasherFunc,
    digest_uploaders: Arc<Mutex<HashMap<DigestInfo, DigestUploader>>>,
) -> BoxFuture<'a, Result<(Directory, VecDeque<ProtoDirectory>), Error>> {
    Box::pin(async move {
        let file_futures = FuturesUnordered::new();
        let dir_futures = FuturesUnordered::new();
        let symlink_futures = FuturesUnordered::new();
        {
            let (_permit, dir_handle) = fs::read_dir(&full_dir_path)
                .await
                .err_tip(|| format!("Error reading dir for reading {full_dir_path:?}"))?
                .into_inner();
            let mut dir_stream = ReadDirStream::new(dir_handle);
            // Note: Try very hard to not leave file descriptors open. Try to keep them as short
            // lived as possible. This is why we iterate the directory and then build a bunch of
            // futures with all the work we are wanting to do then execute it. It allows us to
            // close the directory iterator file descriptor, then open the child files/folders.
            while let Some(entry_result) = dir_stream.next().await {
                let entry = entry_result.err_tip(|| "Error while iterating directory")?;
                let file_type = entry
                    .file_type()
                    .await
                    .err_tip(|| format!("Error running file_type() on {entry:?}"))?;
                let full_path = full_dir_path.as_ref().join(entry.path());
                if file_type.is_dir() {
                    let full_dir_path = full_dir_path.clone();
                    dir_futures.push(
                        upload_directory(
                            cas_store,
                            full_path.clone(),
                            full_work_directory,
                            hasher,
                            digest_uploaders.clone(),
                        )
                        .and_then(|(dir, all_dirs)| async move {
                            let directory_name = full_path
                                .file_name()
                                .err_tip(|| {
                                    format!("Expected file_name to exist on {full_dir_path:?}")
                                })?
                                .to_str()
                                .err_tip(|| {
                                    make_err!(
                                        Code::Internal,
                                        "Could not convert {:?} to string",
                                        full_dir_path
                                    )
                                })?
                                .to_string();

                            let digest =
                                serialize_and_upload_message(&dir, cas_store, &mut hasher.hasher())
                                    .await
                                    .err_tip(|| format!("for {}", full_path.display()))?;

                            Result::<(DirectoryNode, VecDeque<Directory>), Error>::Ok((
                                DirectoryNode {
                                    name: directory_name,
                                    digest: Some(digest.into()),
                                },
                                all_dirs,
                            ))
                        })
                        .boxed(),
                    );
                } else if file_type.is_file() {
                    let digest_uploaders = digest_uploaders.clone();
                    file_futures.push(async move {
                        let metadata = fs::metadata(&full_path)
                            .await
                            .err_tip(|| format!("Could not open file {}", full_path.display()))?;
                        upload_file(cas_store, &full_path, hasher, metadata, digest_uploaders)
                            .map_ok(TryInto::try_into)
                            .await?
                    });
                } else if file_type.is_symlink() {
                    symlink_futures.push(
                        upload_symlink(full_path, &full_work_directory)
                            .map(|symlink| symlink?.try_into()),
                    );
                }
            }
        }

        let (mut file_nodes, dir_entries, mut symlinks) = try_join3(
            file_futures.try_collect::<Vec<FileNode>>(),
            dir_futures.try_collect::<Vec<(DirectoryNode, VecDeque<Directory>)>>(),
            symlink_futures.try_collect::<Vec<SymlinkNode>>(),
        )
        .await?;

        let mut directory_nodes = Vec::with_capacity(dir_entries.len());
        // For efficiency we use a deque because it allows cheap concat of Vecs.
        // We make the assumption here that when performance is important it is because
        // our directory is quite large. This allows us to cheaply merge large amounts of
        // directories into one VecDeque. Then after we are done we need to collapse it
        // down into a single Vec.
        let mut all_child_directories = VecDeque::with_capacity(dir_entries.len());
        for (directory_node, mut recursive_child_directories) in dir_entries {
            directory_nodes.push(directory_node);
            all_child_directories.append(&mut recursive_child_directories);
        }

        file_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name));
        directory_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name));
        symlinks.sort_unstable_by(|a, b| a.name.cmp(&b.name));

        let directory = Directory {
            files: file_nodes,
            directories: directory_nodes,
            symlinks,
            node_properties: None, // We don't support file properties.
        };
        all_child_directories.push_back(directory.clone());

        Ok((directory, all_child_directories))
    })
}

async fn process_side_channel_file(
    side_channel_file: Cow<'_, OsStr>,
    args: &[&OsStr],
    timeout: Duration,
) -> Result<Option<Error>, Error> {
    let mut json_contents = String::new();
    {
        // Note: Scoping `file_slot` allows the file_slot semaphore to be released faster.
        let mut file_slot = match fs::open_file(side_channel_file, 0, u64::MAX).await {
            Ok(file_slot) => file_slot,
            Err(e) => {
                if e.code != Code::NotFound {
                    return Err(e).err_tip(|| "Error opening side channel file");
                }
                // Note: If file does not exist, it's ok. Users are not required to create this file.
                return Ok(None);
            }
        };
        file_slot
            .read_to_string(&mut json_contents)
            .await
            .err_tip(|| "Error reading side channel file")?;
    }

    let side_channel_info: SideChannelInfo =
        serde_json5::from_str(&json_contents).map_err(|e| {
            make_input_err!(
                "Could not convert contents of side channel file (json) to SideChannelInfo : {e:?}"
            )
        })?;
    Ok(side_channel_info.failure.map(|failure| match failure {
        SideChannelFailureReason::Timeout => Error::new(
            Code::DeadlineExceeded,
            format!(
                "Command '{}' timed out after {} seconds",
                args.join(OsStr::new(" ")).to_string_lossy(),
                timeout.as_secs_f32()
            ),
        ),
    }))
}

async fn do_cleanup(
    running_actions_manager: &Arc<RunningActionsManagerImpl>,
    operation_id: &OperationId,
    action_directory: &str,
) -> Result<(), Error> {
    // Mark this operation as being cleaned up
    let Some(_cleaning_guard) = running_actions_manager.perform_cleanup(operation_id.clone())
    else {
        // Cleanup is already happening elsewhere.
        return Ok(());
    };

    debug!("Worker cleaning up");
    // Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
    let remove_dir_result = fs::remove_dir_all(action_directory)
        .await
        .err_tip(|| format!("Could not remove working directory {action_directory}"));

    if let Err(err) = running_actions_manager.cleanup_action(operation_id) {
        error!(?operation_id, ?err, "Error cleaning up action");
        Result::<(), Error>::Err(err).merge(remove_dir_result)
    } else if let Err(err) = remove_dir_result {
        error!(?operation_id, ?err, "Error removing working directory");
        Err(err)
    } else {
        Ok(())
    }
}

pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
    /// Returns the action id of the action.
    fn get_operation_id(&self) -> &OperationId;

    /// Anything that needs to execute before the action is actually executed should happen here.
    fn prepare_action(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Actually perform the execution of the action.
    fn execute(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Any uploading, processing or analyzing of the results should happen here.
    fn upload_results(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Cleanup any residual files, handles or other junk resulting from running the action.
    fn cleanup(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Returns the final result. As a general rule this action should be thought of as
    /// a consumption of `self`, meaning once a return happens here the lifetime of `Self`
    /// is over and any action performed on it after this call is undefined behavior.
    fn get_finished_result(
        self: Arc<Self>,
    ) -> impl Future<Output = Result<ActionResult, Error>> + Send;

    /// Returns the work directory of the action.
    fn get_work_directory(&self) -> &String;
}
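
// A hedged sketch of driving the lifecycle above in order; the real manager's
// error handling and cleanup-on-failure paths are omitted, and `action` is a
// hypothetical value implementing `RunningAction`:
//
//     let action = action.prepare_action().await?;
//     let action = action.execute().await?;
//     let action = action.upload_results().await?;
//     let action = action.cleanup().await?;
//     let result = action.get_finished_result().await?;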

#[derive(Debug)]
struct RunningActionImplExecutionResult {
    stdout: Bytes,
    stderr: Bytes,
    exit_code: i32,
}

#[derive(Debug)]
struct RunningActionImplState {
    command_proto: Option<ProtoCommand>,
    // TODO(palfrey) Kill is not implemented yet, but is instrumented.
    // However, it is used if the worker disconnects to destroy current jobs.
    kill_channel_tx: Option<oneshot::Sender<()>>,
    kill_channel_rx: Option<oneshot::Receiver<()>>,
    execution_result: Option<RunningActionImplExecutionResult>,
    action_result: Option<ActionResult>,
    execution_metadata: ExecutionMetadata,
    // If there was an internal error, this will be set.
    // This should NOT be set if everything was fine, but the process had a
    // non-zero exit code. Instead this should be used for internal errors
    // that prevented the action from running, upload failures, timeouts, etc.,
    // but we have (or could have) the action results (like stderr/stdout).
    error: Option<Error>,
}

#[derive(Debug)]
pub struct RunningActionImpl {
    operation_id: OperationId,
    action_directory: String,
    work_directory: String,
    action_info: ActionInfo,
    timeout: Duration,
    running_actions_manager: Arc<RunningActionsManagerImpl>,
    state: Mutex<RunningActionImplState>,
    has_manager_entry: AtomicBool,
    did_cleanup: AtomicBool,
}

impl RunningActionImpl {
    fn new(
        execution_metadata: ExecutionMetadata,
        operation_id: OperationId,
        action_directory: String,
        action_info: ActionInfo,
        timeout: Duration,
        running_actions_manager: Arc<RunningActionsManagerImpl>,
    ) -> Self {
        let work_directory = format!("{}/{}", action_directory, "work");
        let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
        Self {
            operation_id,
            action_directory,
            work_directory,
            action_info,
            timeout,
            running_actions_manager,
            state: Mutex::new(RunningActionImplState {
                command_proto: None,
                kill_channel_rx: Some(kill_channel_rx),
                kill_channel_tx: Some(kill_channel_tx),
                execution_result: None,
                action_result: None,
                execution_metadata,
                error: None,
            }),
            // Always need to ensure that we're removed from the manager on Drop.
            has_manager_entry: AtomicBool::new(true),
            // Only needs to be cleaned up after a prepare_action call, set there.
            did_cleanup: AtomicBool::new(true),
        }
    }

    #[allow(
        clippy::missing_const_for_fn,
        reason = "False positive on stable, but not on nightly"
    )]
    fn metrics(&self) -> &Arc<Metrics> {
        &self.running_actions_manager.metrics
    }

    /// Prepares any actions needed to execute this action. This action will do the following:
    ///
    /// * Download any files needed to execute the action
    /// * Build a folder with all files needed to execute the action.
    ///
    /// This function will aggressively download and spawn potentially thousands of futures. It is
    /// up to the stores to rate limit if needed.
    async fn inner_prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
        {
            let mut state = self.state.lock();
            state.execution_metadata.input_fetch_start_timestamp =
                (self.running_actions_manager.callbacks.now_fn)();
        }
        let command = {
            // Download and build out our input files/folders. Also fetch and decode our Command.
            let command_fut = self.metrics().get_proto_command_from_store.wrap(async {
                get_and_decode_digest::<ProtoCommand>(
                    self.running_actions_manager.cas_store.as_ref(),
                    self.action_info.command_digest.into(),
                )
                .await
                .err_tip(|| "Converting command_digest to Command")
            });
            let filesystem_store_pin =
                Pin::new(self.running_actions_manager.filesystem_store.as_ref());
            let (command, ()) = try_join(command_fut, async {
                fs::create_dir(&self.work_directory)
                    .await
                    .err_tip(|| format!("Error creating work directory {}", self.work_directory))?;
                // Now the work directory has been created, we have to clean up.
                self.did_cleanup.store(false, Ordering::Release);
                // Download the input files/folder and place them into the temp directory.
                self.metrics()
                    .download_to_directory
                    .wrap(download_to_directory(
                        &self.running_actions_manager.cas_store,
                        filesystem_store_pin,
                        &self.action_info.input_root_digest,
                        &self.work_directory,
                    ))
                    .await
            })
            .await?;
            command
        };
        {
            // Create all directories needed for our output paths. This is required by the bazel spec.
            let prepare_output_directories = |output_file| {
                let full_output_path = if command.working_directory.is_empty() {
                    format!("{}/{}", self.work_directory, output_file)
                } else {
                    format!(
                        "{}/{}/{}",
                        self.work_directory, command.working_directory, output_file
                    )
                };
                async move {
                    let full_parent_path = Path::new(&full_output_path)
                        .parent()
                        .err_tip(|| format!("Parent path for {full_output_path} has no parent"))?;
                    fs::create_dir_all(full_parent_path).await.err_tip(|| {
                        format!(
                            "Error creating output directory {} (file)",
                            full_parent_path.display()
                        )
                    })?;
                    Result::<(), Error>::Ok(())
                }
            };
            self.metrics()
                .prepare_output_files
                .wrap(try_join_all(
                    command.output_files.iter().map(prepare_output_directories),
                ))
                .await?;
            self.metrics()
                .prepare_output_paths
                .wrap(try_join_all(
                    command.output_paths.iter().map(prepare_output_directories),
                ))
                .await?;
        }
        debug!(?command, "Worker received command");
        {
            let mut state = self.state.lock();
            state.command_proto = Some(command);
            state.execution_metadata.input_fetch_completed_timestamp =
                (self.running_actions_manager.callbacks.now_fn)();
        }
        Ok(self)
    }

    async fn inner_execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
        let (command_proto, mut kill_channel_rx) = {
            let mut state = self.state.lock();
            state.execution_metadata.execution_start_timestamp =
                (self.running_actions_manager.callbacks.now_fn)();
            (
                state
                    .command_proto
                    .take()
                    .err_tip(|| "Expected state to have command_proto in execute()")?,
                state
                    .kill_channel_rx
                    .take()
                    .err_tip(|| "Expected state to have kill_channel_rx in execute()")?
                    // This is important as we may be killed at any point.
                    .fuse(),
            )
        };
        if command_proto.arguments.is_empty() {
            return Err(make_input_err!("No arguments provided in Command proto"));
        }
        let args: Vec<&OsStr> = if let Some(entrypoint) = &self
            .running_actions_manager
            .execution_configuration
            .entrypoint
        {
            core::iter::once(entrypoint.as_ref())
                .chain(command_proto.arguments.iter().map(AsRef::as_ref))
                .collect()
        } else {
            command_proto.arguments.iter().map(AsRef::as_ref).collect()
        };
        // TODO(palfrey): This should probably be in debug, but currently
        //                    that's too busy and we often rely on this to
        //                    figure out toolchain misconfiguration issues.
        //                    De-bloat the `debug` level by using the `trace`
        //                    level more effectively and adjust this.
        info!(?args, "Executing command",);
        let mut command_builder = process::Command::new(args[0]);
        command_builder
            .args(&args[1..])
            .kill_on_drop(true)
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .current_dir(format!(
                "{}/{}",
                self.work_directory, command_proto.working_directory
            ))
            .env_clear();

        let requested_timeout = if self.action_info.timeout.is_zero() {
            self.running_actions_manager.max_action_timeout
        } else {
            self.action_info.timeout
        };

        let mut maybe_side_channel_file: Option<Cow<'_, OsStr>> = None;
        if let Some(additional_environment) = &self
            .running_actions_manager
            .execution_configuration
            .additional_environment
        {
            for (name, source) in additional_environment {
                let value = match source {
                    EnvironmentSource::Property(property) => self
                        .action_info
                        .platform_properties
                        .get(property)
                        .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.as_str())),
                    EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
                    EnvironmentSource::TimeoutMillis => {
                        Cow::Owned(requested_timeout.as_millis().to_string())
                    }
                    EnvironmentSource::SideChannelFile => {
                        let file_cow =
                            format!("{}/{}", self.action_directory, Uuid::new_v4().simple());
                        maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));
                        Cow::Owned(file_cow)
                    }
                    EnvironmentSource::ActionDirectory => {
                        Cow::Borrowed(self.action_directory.as_str())
                    }
                };
                command_builder.env(name, value.as_ref());
            }
        }

        #[cfg(target_family = "unix")]
        let envs = &command_proto.environment_variables;
        // If SystemRoot is not set on windows we set it to default. Failing to do
        // this causes all commands to fail.
        #[cfg(target_family = "windows")]
        let envs = {
            let mut envs = command_proto.environment_variables.clone();
            if !envs.iter().any(|v| v.name.to_uppercase() == "SYSTEMROOT") {
                envs.push(
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
                        name: "SystemRoot".to_string(),
                        value: "C:\\Windows".to_string(),
                    },
                );
            }
            if !envs.iter().any(|v| v.name.to_uppercase() == "PATH") {
                envs.push(
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
                        name: "PATH".to_string(),
                        value: "C:\\Windows\\System32".to_string(),
                    },
                );
            }
            envs
        };
        for environment_variable in envs {
            command_builder.env(&environment_variable.name, &environment_variable.value);
        }

        let mut child_process = command_builder
            .spawn()
            .err_tip(|| format!("Could not execute command {args:?}"))?;
        let mut stdout_reader = child_process
            .stdout
            .take()
            .err_tip(|| "Expected stdout to exist on command this should never happen")?;
        let mut stderr_reader = child_process
            .stderr
            .take()
            .err_tip(|| "Expected stderr to exist on command this should never happen")?;

        let mut child_process_guard = guard(child_process, |mut child_process| {
            if child_process.try_wait().is_ok_and(|res| res.is_some()) {
                // The child already exited, probably a timeout or kill operation.
                return;
            }
            error!(
                "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
            );
            background_spawn!("running_actions_manager_kill_child_process", async move {
                child_process.kill().await
            });
        });

        let all_stdout_fut = spawn!("stdout_reader", async move {
            let mut all_stdout = BytesMut::new();
            loop {
                let sz = stdout_reader
                    .read_buf(&mut all_stdout)
                    .await
                    .err_tip(|| "Error reading stdout stream")?;
                if sz == 0 {
                    break; // EOF.
                }
            }
            Result::<Bytes, Error>::Ok(all_stdout.freeze())
        });
        let all_stderr_fut = spawn!("stderr_reader", async move {
            let mut all_stderr = BytesMut::new();
            loop {
                let sz = stderr_reader
                    .read_buf(&mut all_stderr)
                    .await
                    .err_tip(|| "Error reading stderr stream")?;
                if sz == 0 {
                    break; // EOF.
                }
            }
            Result::<Bytes, Error>::Ok(all_stderr.freeze())
        });
        let mut killed_action = false;

        let timer = self.metrics().child_process.begin_timer();
        let mut sleep_fut = (self.running_actions_manager.callbacks.sleep_fn)(self.timeout).fuse();
        loop {
            tokio::select! {
                () = &mut sleep_fut => {
                    self.running_actions_manager.metrics.task_timeouts.inc();
                    killed_action = true;
                    if let Err(err) = child_process_guard.kill().await {
                        error!(
                            ?err,
                            "Could not kill process in RunningActionsManager for action timeout",
                        );
                    }
                    {
                        let mut state = self.state.lock();
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
                            Code::DeadlineExceeded,
                            format!(
                                "Command '{}' timed out after {} seconds",
                                args.join(OsStr::new(" ")).to_string_lossy(),
                                self.action_info.timeout.as_secs_f32()
                            )
                        )));
                    }
                },
                maybe_exit_status = child_process_guard.wait() => {
                    // Defuse our guard so it does not try to cleanup and make needless logs.
                    drop(ScopeGuard::<_, _>::into_inner(child_process_guard));
                    let exit_status = maybe_exit_status.err_tip(|| "Failed to collect exit code of process")?;
                    // TODO(palfrey) We should implement stderr/stdout streaming to client here.
                    // If we get killed before the stream is started, then these will lock up.
                    // TODO(palfrey) There is a significant bug here. If we kill the action and the action creates
                    // child processes, it can create zombies. See: https://github.com/tracemachina/nativelink/issues/225
                    let (stdout, stderr) = if killed_action {
                        drop(timer);
                        (Bytes::new(), Bytes::new())
                    } else {
                        timer.measure();
                        let (maybe_all_stdout, maybe_all_stderr) = tokio::join!(all_stdout_fut, all_stderr_fut);
                        (
                            maybe_all_stdout.err_tip(|| "Internal error reading from stdout of worker task")??,
                            maybe_all_stderr.err_tip(|| "Internal error reading from stderr of worker task")??
                        )
                    };

                    let exit_code = exit_status.code().map_or(EXIT_CODE_FOR_SIGNAL, |exit_code| {
                        if exit_code == 0 {
                            self.metrics().child_process_success_error_code.inc();
                        } else {
                            self.metrics().child_process_failure_error_code.inc();
                        }
                        exit_code
                    });

                    let maybe_error_override = if let Some(side_channel_file) = maybe_side_channel_file {
                        process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await
                        .err_tip(|| format!("Error processing side channel file: {}", side_channel_file.display()))?
                    } else {
                        None
                    };
                    {
                        let mut state = self.state.lock();
                        state.error = Error::merge_option(state.error.take(), maybe_error_override);

                        state.command_proto = Some(command_proto);
                        state.execution_result = Some(RunningActionImplExecutionResult{
                            stdout,
                            stderr,
                            exit_code,
                        });
                        state.execution_metadata.execution_completed_timestamp = (self.running_actions_manager.callbacks.now_fn)();
                    }
                    return Ok(self);
                },
                _ = &mut kill_channel_rx => {
                    killed_action = true;
                    if let Err(err) = child_process_guard.kill().await {
                        error!(
                            operation_id = ?self.operation_id,
                            ?err,
                            "Could not kill process",
                        );
                    }
                    {
                        let mut state = self.state.lock();
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
                            Code::Aborted,
                            format!(
                                "Command '{}' was killed by scheduler",
                                args.join(OsStr::new(" ")).to_string_lossy()
                            )
                        )));
                    }
                },
            }
        }
        // Unreachable.
    }
1070
1071
11
    async fn inner_upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1072
        enum OutputType {
1073
            None,
1074
            File(FileInfo),
1075
            Directory(DirectoryInfo),
1076
            FileSymlink(SymlinkInfo),
1077
            DirectorySymlink(SymlinkInfo),
1078
        }
1079
1080
11
        debug!("Worker uploading results",);
1081
11
        let (mut command_proto, execution_result, mut execution_metadata) = {
1082
11
            let mut state = self.state.lock();
1083
11
            state.execution_metadata.output_upload_start_timestamp =
1084
11
                (self.running_actions_manager.callbacks.now_fn)();
1085
            (
1086
11
                state
1087
11
                    .command_proto
1088
11
                    .take()
1089
11
                    .err_tip(|| "Expected state to have command_proto in execute()")
?0
,
1090
11
                state
1091
11
                    .execution_result
1092
11
                    .take()
1093
11
                    .err_tip(|| "Execution result does not exist at upload_results stage")
?0
,
1094
11
                state.execution_metadata.clone(),
1095
            )
1096
        };
1097
11
        let cas_store = self.running_actions_manager.cas_store.as_ref();
1098
11
        let hasher = self.action_info.unique_qualifier.digest_function();
1099
1100
11
        let mut output_path_futures = FuturesUnordered::new();
1101
11
        let mut output_paths = command_proto.output_paths;
1102
11
        if output_paths.is_empty() {
  Branch (1102:12): [Folded - Ignored]
  Branch (1102:12): [Folded - Ignored]
  Branch (1102:12): [True: 6, False: 5]
1103
6
            output_paths
1104
6
                .reserve(command_proto.output_files.len() + command_proto.output_directories.len());
1105
6
            output_paths.append(&mut command_proto.output_files);
1106
6
            output_paths.append(&mut command_proto.output_directories);
1107
6
        }
1108
11
        let digest_uploaders = Arc::new(Mutex::new(HashMap::new()));
1109
18
        for entry in output_paths {
1110
7
            let full_path = OsString::from(if command_proto.working_directory.is_empty() {
  Branch (1110:47): [Folded - Ignored]
  Branch (1110:47): [Folded - Ignored]
  Branch (1110:47): [True: 0, False: 7]
1111
0
                format!("{}/{}", self.work_directory, entry)
1112
            } else {
1113
7
                format!(
1114
7
                    "{}/{}/{}",
1115
7
                    self.work_directory, command_proto.working_directory, entry
1116
                )
1117
            });
1118
7
            let work_directory = &self.work_directory;
1119
7
            let digest_uploaders = digest_uploaders.clone();
1120
7
            output_path_futures.push(async move {
1121
3
                let metadata = {
1122
7
                    let metadata = match fs::symlink_metadata(&full_path).await {
1123
7
                        Ok(file) => file,
1124
0
                        Err(e) => {
1125
0
                            if e.code == Code::NotFound {
  Branch (1125:32): [Folded - Ignored]
  Branch (1125:32): [Folded - Ignored]
  Branch (1125:32): [True: 0, False: 0]
1126
                                // In the event our output does not exist, according to the bazel remote
1127
                                // execution spec, we simply ignore it and continue.
1128
0
                                return Result::<OutputType, Error>::Ok(OutputType::None);
1129
0
                            }
1130
0
                            return Err(e).err_tip(|| {
1131
0
                                format!("Could not open file {}", full_path.display())
1132
0
                            });
1133
                        }
1134
                    };
1135
1136
7
                    if metadata.is_file() {
  Branch (1136:24): [Folded - Ignored]
  Branch (1136:24): [Folded - Ignored]
  Branch (1136:24): [True: 4, False: 3]
1137
                        return Ok(OutputType::File(
1138
4
                            upload_file(
1139
4
                                cas_store.as_pin(),
1140
4
                                &full_path,
1141
4
                                hasher,
1142
4
                                metadata,
1143
4
                                digest_uploaders,
1144
4
                            )
1145
4
                            .await
1146
4
                            .map(|mut file_info| {
1147
4
                                file_info.name_or_path = NameOrPath::Path(entry);
1148
4
                                file_info
1149
4
                            })
1150
4
                            .err_tip(|| format!("Uploading file {}", full_path.display()))?,
1151
                        ));
1152
3
                    }
1153
3
                    metadata
1154
                };
1155
3
                if metadata.is_dir() {
  Branch (1155:20): [Folded - Ignored]
  Branch (1155:20): [Folded - Ignored]
  Branch (1155:20): [True: 2, False: 1]
1156
                    Ok(OutputType::Directory(
1157
2
                        upload_directory(
1158
2
                            cas_store.as_pin(),
1159
2
                            &full_path,
1160
2
                            work_directory,
1161
2
                            hasher,
1162
2
                            digest_uploaders,
1163
                        )
1164
2
                        .and_then(|(root_dir, children)| async move {
1165
2
                            let tree = ProtoTree {
1166
2
                                root: Some(root_dir),
1167
2
                                children: children.into(),
1168
2
                            };
1169
2
                            let tree_digest = serialize_and_upload_message(
1170
2
                                &tree,
1171
2
                                cas_store.as_pin(),
1172
2
                                &mut hasher.hasher(),
1173
2
                            )
1174
2
                            .await
1175
2
                            .err_tip(|| format!("While processing {entry}"))?;
1176
2
                            Ok(DirectoryInfo {
1177
2
                                path: entry,
1178
2
                                tree_digest,
1179
2
                            })
1180
4
                        })
1181
2
                        .await
1182
2
                        .err_tip(|| format!("Uploading directory {}", full_path.display()))?,
1183
                    ))
1184
1
                } else if metadata.is_symlink() {
  Branch (1184:27): [Folded - Ignored]
  Branch (1184:27): [Folded - Ignored]
  Branch (1184:27): [True: 1, False: 0]
1185
1
                    let output_symlink = upload_symlink(&full_path, work_directory)
1186
1
                        .await
1187
1
                        .map(|mut symlink_info| {
1188
1
                            symlink_info.name_or_path = NameOrPath::Path(entry);
1189
1
                            symlink_info
1190
1
                        })
1191
1
                        .err_tip(|| format!("Uploading symlink {}", full_path.display()))?;
1192
1
                    match fs::metadata(&full_path).await {
1193
1
                        Ok(metadata) => {
1194
1
                            if metadata.is_dir() {
  Branch (1194:32): [Folded - Ignored]
  Branch (1194:32): [Folded - Ignored]
  Branch (1194:32): [True: 0, False: 1]
1195
0
                                return Ok(OutputType::DirectorySymlink(output_symlink));
1196
1
                            }
1197
                            // Note: If it's anything but directory we put it as a file symlink.
1198
1
                            return Ok(OutputType::FileSymlink(output_symlink));
1199
                        }
1200
0
                        Err(e) => {
1201
0
                            if e.code != Code::NotFound {
  Branch (1201:32): [Folded - Ignored]
  Branch (1201:32): [Folded - Ignored]
  Branch (1201:32): [True: 0, False: 0]
1202
0
                                return Err(e).err_tip(|| {
1203
0
                                    format!(
1204
0
                                        "While querying target symlink metadata for {}",
1205
0
                                        full_path.display()
1206
                                    )
1207
0
                                });
1208
0
                            }
1209
                            // If the file doesn't exist, we consider it a file. Even though the
1210
                            // file doesn't exist we still need to populate an entry.
1211
0
                            return Ok(OutputType::FileSymlink(output_symlink));
1212
                        }
1213
                    }
1214
                } else {
1215
0
                    Err(make_err!(
1216
0
                        Code::Internal,
1217
0
                        "{full_path:?} was not a file, folder or symlink. Must be one.",
1218
0
                    ))
1219
                }
1220
7
            });
1221
        }
1222
11
        let mut output_files = vec![];
1223
11
        let mut output_folders = vec![];
1224
11
        let mut output_directory_symlinks = vec![];
1225
11
        let mut output_file_symlinks = vec![];
1226
1227
11
        if execution_result.exit_code != 0 {
  Branch (1227:12): [Folded - Ignored]
  Branch (1227:12): [Folded - Ignored]
  Branch (1227:12): [True: 5, False: 6]
1228
5
            let stdout = core::str::from_utf8(&execution_result.stdout).unwrap_or("<no-utf8>");
1229
5
            let stderr = core::str::from_utf8(&execution_result.stderr).unwrap_or("<no-utf8>");
1230
5
            error!(
1231
                exit_code = ?execution_result.exit_code,
1232
5
                stdout = ?stdout[..min(stdout.len(), 1000)],
1233
5
                stderr = ?stderr[..min(stderr.len(), 1000)],
1234
5
                "Command returned non-zero exit code",
1235
            );
1236
6
        }
1237
1238
11
        let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
1239
11
            let data = execution_result.stdout;
1240
11
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1241
11
            cas_store
1242
11
                .update_oneshot(digest, data)
1243
11
                .await
1244
11
                .err_tip(|| "Uploading stdout")
?0
;
1245
11
            Result::<DigestInfo, Error>::Ok(digest)
1246
11
        });
1247
11
        let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
1248
11
            let data = execution_result.stderr;
1249
11
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1250
11
            cas_store
1251
11
                .update_oneshot(digest, data)
1252
11
                .await
1253
11
                .err_tip(|| "Uploading stdout")
?0
;
1254
11
            Result::<DigestInfo, Error>::Ok(digest)
1255
11
        });
1256
1257
11
        let upload_result = futures::try_join!(stdout_digest_fut, stderr_digest_fut, async {
1258
18
            while let Some(output_type) = output_path_futures.try_next().await? {
  Branch (1258:23): [Folded - Ignored]
  Branch (1258:23): [Folded - Ignored]
  Branch (1258:23): [True: 7, False: 11]
1259
7
                match output_type {
1260
4
                    OutputType::File(output_file) => output_files.push(output_file),
1261
2
                    OutputType::Directory(output_folder) => output_folders.push(output_folder),
1262
1
                    OutputType::FileSymlink(output_symlink) => {
1263
1
                        output_file_symlinks.push(output_symlink);
1264
1
                    }
1265
0
                    OutputType::DirectorySymlink(output_symlink) => {
1266
0
                        output_directory_symlinks.push(output_symlink);
1267
0
                    }
1268
0
                    OutputType::None => { /* Safe to ignore */ }
1269
                }
1270
            }
1271
11
            Ok(())
1272
11
        });
1273
11
        drop(output_path_futures);
1274
11
        let (stdout_digest, stderr_digest) = match upload_result {
1275
11
            Ok((stdout_digest, stderr_digest, ())) => (stdout_digest, stderr_digest),
1276
0
            Err(e) => return Err(e).err_tip(|| "Error while uploading results"),
1277
        };
1278
1279
11
        execution_metadata.output_upload_completed_timestamp =
1280
11
            (self.running_actions_manager.callbacks.now_fn)();
1281
11
        output_files.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
1282
11
        output_folders.sort_unstable_by(|a, b| a.path.cmp(&b.path));
1283
11
        output_file_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
1284
11
        output_directory_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
1285
11
        {
1286
11
            let mut state = self.state.lock();
1287
11
            execution_metadata.worker_completed_timestamp =
1288
11
                (self.running_actions_manager.callbacks.now_fn)();
1289
11
            state.action_result = Some(ActionResult {
1290
11
                output_files,
1291
11
                output_folders,
1292
11
                output_directory_symlinks,
1293
11
                output_file_symlinks,
1294
11
                exit_code: execution_result.exit_code,
1295
11
                stdout_digest,
1296
11
                stderr_digest,
1297
11
                execution_metadata,
1298
11
                server_logs: HashMap::default(), // TODO(palfrey) Not implemented.
1299
11
                error: state.error.clone(),
1300
11
                message: String::new(), // Will be filled in on cache_action_result if needed.
1301
11
            });
1302
11
        }
1303
11
        Ok(self)
1304
11
    }
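The method above fans each output path out into a FuturesUnordered and drains it concurrently with the stdout/stderr uploads via futures::try_join!. A minimal standalone sketch of that drain pattern (illustrative names and value types, not this file's API):

    use futures::stream::{FuturesUnordered, TryStreamExt};

    async fn drain_outputs() -> Result<Vec<u32>, std::io::Error> {
        let mut futures = FuturesUnordered::new();
        for i in 0..3u32 {
            // Each pushed future runs concurrently; completion order is not guaranteed.
            futures.push(async move { Ok::<u32, std::io::Error>(i * 2) });
        }
        let mut results = Vec::new();
        // try_next() yields values as they finish and short-circuits on the first error.
        while let Some(value) = futures.try_next().await? {
            results.push(value);
        }
        Ok(results)
    }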
1305
1306
11
    async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
1307
11
        let mut state = self.state.lock();
1308
11
        state
1309
11
            .action_result
1310
11
            .take()
1311
11
            .err_tip(|| "Expected action_result to exist in get_finished_result")
1312
11
    }
1313
}
1314
1315
impl Drop for RunningActionImpl {
1316
18
    fn drop(&mut self) {
1317
18
        if self.did_cleanup.load(Ordering::Acquire) {
  Branch (1317:12): [True: 18, False: 0]
  Branch (1317:12): [Folded - Ignored]
1318
18
            if self.has_manager_entry.load(Ordering::Acquire) {
  Branch (1318:16): [True: 1, False: 17]
  Branch (1318:16): [Folded - Ignored]
1319
1
                drop(
1320
1
                    self.running_actions_manager
1321
1
                        .cleanup_action(&self.operation_id),
1322
1
                );
1323
17
            }
1324
18
            return;
1325
0
        }
1326
0
        let operation_id = self.operation_id.clone();
1327
0
        error!(
1328
            ?operation_id,
1329
0
            "RunningActionImpl did not cleanup. This is a violation of the requirements, will attempt to do it in the background."
1330
        );
1331
0
        let running_actions_manager = self.running_actions_manager.clone();
1332
0
        let action_directory = self.action_directory.clone();
1333
0
        background_spawn!("running_action_impl_drop", async move {
1334
0
            let Err(err) =
  Branch (1334:17): [True: 0, False: 0]
  Branch (1334:17): [Folded - Ignored]
1335
0
                do_cleanup(&running_actions_manager, &operation_id, &action_directory).await
1336
            else {
1337
0
                return;
1338
            };
1339
0
            error!(
1340
                ?operation_id,
1341
                ?action_directory,
1342
                ?err,
1343
0
                "Error cleaning up action"
1344
            );
1345
0
        });
1346
18
    }
1347
}
1348
1349
impl RunningAction for RunningActionImpl {
1350
0
    fn get_operation_id(&self) -> &OperationId {
1351
0
        &self.operation_id
1352
0
    }
1353
1354
15
    async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1355
15
        self.metrics()
1356
15
            .clone()
1357
15
            .prepare_action
1358
15
            .wrap(Self::inner_prepare_action(self))
1359
15
            .await
1360
15
    }
1361
1362
13
    async fn execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1363
13
        self.metrics()
1364
13
            .clone()
1365
13
            .execute
1366
13
            .wrap(Self::inner_execute(self))
1367
13
            .await
1368
13
    }
1369
1370
11
    async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1371
11
        self.metrics()
1372
11
            .clone()
1373
11
            .upload_results
1374
11
            .wrap(Self::inner_upload_results(self))
1375
11
            .await
1376
11
    }
1377
1378
17
    async fn cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1379
17
        self.metrics()
1380
17
            .clone()
1381
17
            .cleanup
1382
17
            .wrap(async move {
1383
17
                let result = do_cleanup(
1384
17
                    &self.running_actions_manager,
1385
17
                    &self.operation_id,
1386
17
                    &self.action_directory,
1387
17
                )
1388
17
                .await;
1389
17
                self.has_manager_entry.store(false, Ordering::Release);
1390
17
                self.did_cleanup.store(true, Ordering::Release);
1391
17
                result.map(move |()| self)
1392
17
            })
1393
17
            .await
1394
17
    }
1395
1396
11
    async fn get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
1397
11
        self.metrics()
1398
11
            .clone()
1399
11
            .get_finished_result
1400
11
            .wrap(Self::inner_get_finished_result(self))
1401
11
            .await
1402
11
    }
1403
1404
0
    fn get_work_directory(&self) -> &String {
1405
0
        &self.work_directory
1406
0
    }
1407
}
1408
1409
pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static {
1410
    type RunningAction: RunningAction;
1411
1412
    fn create_and_add_action(
1413
        self: &Arc<Self>,
1414
        worker_id: String,
1415
        start_execute: StartExecute,
1416
    ) -> impl Future<Output = Result<Arc<Self::RunningAction>, Error>> + Send;
1417
1418
    fn cache_action_result(
1419
        &self,
1420
        action_digest: DigestInfo,
1421
        action_result: &mut ActionResult,
1422
        hasher: DigestHasherFunc,
1423
    ) -> impl Future<Output = Result<(), Error>> + Send;
1424
1425
    fn kill_all(&self) -> impl Future<Output = ()> + Send;
1426
1427
    fn kill_operation(
1428
        &self,
1429
        operation_id: &OperationId,
1430
    ) -> impl Future<Output = Result<(), Error>> + Send;
1431
1432
    fn metrics(&self) -> &Arc<Metrics>;
1433
}
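Together with the RunningAction trait implemented above, this trait defines the worker-side action lifecycle. A hypothetical end-to-end driver (error recovery and kill handling elided; real callers must still attempt cleanup() when an earlier stage fails):

    async fn run_action<M: RunningActionsManager>(
        manager: &Arc<M>,
        worker_id: String,
        start_execute: StartExecute,
    ) -> Result<ActionResult, Error> {
        let action = manager
            .create_and_add_action(worker_id, start_execute)
            .await?;
        let action = action.prepare_action().await?;
        let action = action.execute().await?;
        let action = action.upload_results().await?;
        // cleanup() removes the action directory and the manager's bookkeeping entry.
        let action = action.cleanup().await?;
        action.get_finished_result().await
    }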
1434
1435
/// A function to get the current system time, used to allow mocking for tests
1436
type NowFn = fn() -> SystemTime;
1437
type SleepFn = fn(Duration) -> BoxFuture<'static, ()>;
1438
1439
/// Functions that may be injected for testing purposes; during standard control
1440
/// flow these are provided by the `new` function.
1441
#[derive(Clone, Copy)]
1442
pub struct Callbacks {
1443
    /// A function that gets the current time.
1444
    pub now_fn: NowFn,
1445
    /// A function that sleeps for a given Duration.
1446
    pub sleep_fn: SleepFn,
1447
}
1448
1449
impl Debug for Callbacks {
1450
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1451
0
        f.debug_struct("Callbacks").finish_non_exhaustive()
1452
0
    }
1453
}
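Because both fields are plain function pointers, tests can freeze the clock and neutralize sleeps. A sketch with test-only values (not shipped defaults):

    fn test_callbacks() -> Callbacks {
        Callbacks {
            // Freeze "now" so timestamps in ExecutionMetadata become deterministic.
            now_fn: || SystemTime::UNIX_EPOCH,
            // Resolve sleeps immediately so timeout paths can be exercised quickly.
            sleep_fn: |_duration| Box::pin(async {}),
        }
    }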
1454
1455
/// The set of additional information for executing an action over and above
1456
/// those given in the `ActionInfo` passed to the worker.  This allows
1457
/// modification of the action for execution on this particular worker.  This
1458
/// may be used to run the action with a particular set of additional
1459
/// environment variables, or perhaps configure it to execute within a
1460
/// container.
1461
#[derive(Debug, Default)]
1462
pub struct ExecutionConfiguration {
1463
    /// If set, will be executed instead of the first argument passed in the
1464
    /// `ActionInfo` with all of the arguments in the `ActionInfo` passed as
1465
    /// arguments to this command.
1466
    pub entrypoint: Option<String>,
1467
    /// The only environment variables that will be specified when the command
1468
    /// executes other than those in the `ActionInfo`.  On Windows, `SystemRoot`
1469
    /// and PATH are also assigned (see `inner_execute`).
1470
    pub additional_environment: Option<HashMap<String, EnvironmentSource>>,
1471
}
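For example, a worker that wraps every command in a sandbox script could set `entrypoint`; the script then receives the original argv as its arguments. A sketch with a purely illustrative path:

    let config = ExecutionConfiguration {
        // Hypothetical wrapper script, executed in place of the first argument.
        entrypoint: Some("/usr/local/bin/sandbox.sh".to_string()),
        // Left unset here; when set, this maps variable names to an EnvironmentSource.
        additional_environment: None,
    };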
1472
1473
#[derive(Debug)]
1474
struct UploadActionResults {
1475
    upload_ac_results_strategy: UploadCacheResultsStrategy,
1476
    upload_historical_results_strategy: UploadCacheResultsStrategy,
1477
    ac_store: Option<Store>,
1478
    historical_store: Store,
1479
    success_message_template: Template,
1480
    failure_message_template: Template,
1481
}
1482
1483
impl UploadActionResults {
1484
26
    fn new(
1485
26
        config: &UploadActionResultConfig,
1486
26
        ac_store: Option<Store>,
1487
26
        historical_store: Store,
1488
26
    ) -> Result<Self, Error> {
1489
26
        let upload_historical_results_strategy = config
1490
26
            .upload_historical_results_strategy
1491
26
            .unwrap_or(DEFAULT_HISTORICAL_RESULTS_STRATEGY);
1492
8
        if !matches!(
  Branch (1492:12): [True: 8, False: 18]
  Branch (1492:12): [Folded - Ignored]
1493
26
            config.upload_ac_results_strategy,
1494
            UploadCacheResultsStrategy::Never
1495
8
        ) && ac_store.is_none()
  Branch (1495:14): [True: 0, False: 8]
  Branch (1495:14): [Folded - Ignored]
1496
        {
1497
0
            return Err(make_input_err!(
1498
0
                "upload_ac_results_strategy is set, but no ac_store is configured"
1499
0
            ));
1500
26
        }
1501
        Ok(Self {
1502
26
            upload_ac_results_strategy: config.upload_ac_results_strategy,
1503
26
            upload_historical_results_strategy,
1504
26
            ac_store,
1505
26
            historical_store,
1506
26
            success_message_template: Template::new(&config.success_message_template).map_err(
1507
0
                |e| {
1508
0
                    make_input_err!(
1509
                        "Could not convert success_message_template to rust template: {} : {e:?}",
1510
                        config.success_message_template
1511
                    )
1512
0
                },
1513
0
            )?,
1514
26
            failure_message_template: Template::new(&config.failure_message_template).map_err(
1515
0
                |e| {
1516
0
                    make_input_err!(
1517
                        "Could not convert failure_message_template to rust template: {} : {e:?}",
1518
                        config.failure_message_template
1519
                    )
1520
0
                },
1521
0
            )?,
1522
        })
1523
26
    }
1524
1525
0
    const fn should_cache_result(
1526
0
        strategy: UploadCacheResultsStrategy,
1527
0
        action_result: &ActionResult,
1528
0
        treat_infra_error_as_failure: bool,
1529
0
    ) -> bool {
1530
0
        let did_fail = action_result.exit_code != 0
  Branch (1530:24): [Folded - Ignored]
1531
0
            || (treat_infra_error_as_failure && action_result.error.is_some());
  Branch (1531:17): [Folded - Ignored]
1532
0
        match strategy {
1533
0
            UploadCacheResultsStrategy::SuccessOnly => !did_fail,
1534
0
            UploadCacheResultsStrategy::Never => false,
1535
            // Never cache internal errors or timeouts.
1536
            UploadCacheResultsStrategy::Everything => {
1537
0
                treat_infra_error_as_failure || action_result.error.is_none()
  Branch (1537:17): [Folded - Ignored]
1538
            }
1539
0
            UploadCacheResultsStrategy::FailuresOnly => did_fail,
1540
        }
1541
0
    }
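As a standalone restatement of the decision table above (illustrative code, not this file's API), with `did_fail` derived exactly as in the function:

    #[derive(Clone, Copy)]
    enum Strategy { SuccessOnly, Never, Everything, FailuresOnly }

    fn should_cache(strategy: Strategy, exit_code: i32, infra_error: bool, infra_error_is_failure: bool) -> bool {
        let did_fail = exit_code != 0 || (infra_error_is_failure && infra_error);
        match strategy {
            Strategy::SuccessOnly => !did_fail,
            Strategy::Never => false,
            // Infra errors are only cacheable when treated as ordinary failures.
            Strategy::Everything => infra_error_is_failure || !infra_error,
            Strategy::FailuresOnly => did_fail,
        }
    }

    // e.g. exit_code 1 with no infra error: FailuresOnly caches, SuccessOnly does not.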
1542
1543
    /// Formats the message field in `ExecuteResponse` from the `success_message_template`
1544
    /// or `failure_message_template` config templates.
1545
5
    fn format_execute_response_message(
1546
5
        mut template_str: Template,
1547
5
        action_digest_info: DigestInfo,
1548
5
        maybe_historical_digest_info: Option<DigestInfo>,
1549
5
        hasher: DigestHasherFunc,
1550
5
    ) -> Result<String, Error> {
1551
5
        template_str.replace(
1552
            "digest_function",
1553
5
            hasher.proto_digest_func().as_str_name().to_lowercase(),
1554
        );
1555
5
        template_str.replace(
1556
            "action_digest_hash",
1557
5
            action_digest_info.packed_hash().to_string(),
1558
        );
1559
5
        template_str.replace("action_digest_size", action_digest_info.size_bytes());
1560
5
        if let Some(historical_digest_info) = maybe_historical_digest_info {
  Branch (1560:16): [True: 3, False: 2]
  Branch (1560:16): [Folded - Ignored]
1561
3
            template_str.replace(
1562
3
                "historical_results_hash",
1563
3
                format!("{}", historical_digest_info.packed_hash()),
1564
3
            );
1565
3
            template_str.replace(
1566
3
                "historical_results_size",
1567
3
                historical_digest_info.size_bytes(),
1568
3
            );
1569
3
        } else {
1570
2
            template_str.replace("historical_results_hash", "");
1571
2
            template_str.replace("historical_results_size", "");
1572
2
        }
1573
5
        template_str
1574
5
            .text()
1575
5
            .map_err(|e| make_input_err!("Could not convert template to text: {e:?}"))
1576
5
    }
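The placeholders recognized here are `digest_function`, `action_digest_hash`, `action_digest_size`, `historical_results_hash`, and `historical_results_size`. A hypothetical `success_message_template` value (the host and path shape are examples only):

    let success_message_template =
        "See results: https://viewer.example.com/{digest_function}/{action_digest_hash}-{action_digest_size}";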
1577
1578
5
    async fn upload_ac_results(
1579
5
        &self,
1580
5
        action_digest: DigestInfo,
1581
5
        action_result: ProtoActionResult,
1582
5
        hasher: DigestHasherFunc,
1583
5
    ) -> Result<(), Error> {
1584
5
        let Some(ac_store) = self.ac_store.as_ref() else {
  Branch (1584:13): [Folded - Ignored]
  Branch (1584:13): [Folded - Ignored]
  Branch (1584:13): [True: 5, False: 0]
1585
0
            return Ok(());
1586
        };
1587
        // If we are a GrpcStore we shortcut here, as this is a special store.
1588
5
        if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
  Branch (1588:16): [Folded - Ignored]
  Branch (1588:16): [Folded - Ignored]
  Branch (1588:16): [True: 0, False: 5]
1589
0
            let update_action_request = UpdateActionResultRequest {
1590
0
                // This is populated by `update_action_result`.
1591
0
                instance_name: String::new(),
1592
0
                action_digest: Some(action_digest.into()),
1593
0
                action_result: Some(action_result),
1594
0
                results_cache_policy: None,
1595
0
                digest_function: hasher.proto_digest_func().into(),
1596
0
            };
1597
0
            return grpc_store
1598
0
                .update_action_result(Request::new(update_action_request))
1599
0
                .await
1600
0
                .map(|_| ())
1601
0
                .err_tip(|| "Caching ActionResult");
1602
5
        }
1603
1604
5
        let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);
1605
5
        action_result
1606
5
            .encode(&mut store_data)
1607
5
            .err_tip(|| "Encoding ActionResult for caching")
?0
;
1608
1609
5
        ac_store
1610
5
            .update_oneshot(action_digest, store_data.split().freeze())
1611
5
            .await
1612
5
            .err_tip(|| "Caching ActionResult")
1613
5
    }
1614
1615
3
    async fn upload_historical_results_with_message(
1616
3
        &self,
1617
3
        action_digest: DigestInfo,
1618
3
        execute_response: ExecuteResponse,
1619
3
        message_template: Template,
1620
3
        hasher: DigestHasherFunc,
1621
3
    ) -> Result<String, Error> {
1622
3
        let historical_digest_info = serialize_and_upload_message(
1623
3
            &HistoricalExecuteResponse {
1624
3
                action_digest: Some(action_digest.into()),
1625
3
                execute_response: Some(execute_response.clone()),
1626
3
            },
1627
3
            self.historical_store.as_pin(),
1628
3
            &mut hasher.hasher(),
1629
3
        )
1630
3
        .await
1631
3
        .err_tip(|| format!("Caching HistoricalExecuteResponse for digest: {action_digest}"))?;
1632
1633
3
        Self::format_execute_response_message(
1634
3
            message_template,
1635
3
            action_digest,
1636
3
            Some(historical_digest_info),
1637
3
            hasher,
1638
        )
1639
3
        .err_tip(|| "Could not format message in upload_historical_results_with_message")
1640
3
    }
1641
1642
6
    async fn cache_action_result(
1643
6
        &self,
1644
6
        action_info: DigestInfo,
1645
6
        action_result: &mut ActionResult,
1646
6
        hasher: DigestHasherFunc,
1647
6
    ) -> Result<(), Error> {
1648
6
        let should_upload_historical_results =
1649
6
            Self::should_cache_result(self.upload_historical_results_strategy, action_result, true);
1650
6
        let should_upload_ac_results =
1651
6
            Self::should_cache_result(self.upload_ac_results_strategy, action_result, false);
1652
        // Shortcut so we don't need to convert to proto if not needed.
1653
6
        if !should_upload_ac_results && !should_upload_historical_results {
  Branch (1653:12): [Folded - Ignored]
  Branch (1653:41): [Folded - Ignored]
  Branch (1653:12): [Folded - Ignored]
  Branch (1653:41): [Folded - Ignored]
  Branch (1653:12): [True: 1, False: 5]
  Branch (1653:41): [True: 1, False: 0]
1654
1
            return Ok(());
1655
5
        }
1656
1657
5
        let mut execute_response = to_execute_response(action_result.clone());
1658
1659
        // In theory exit code should always be != 0 if there's an error, but for safety we
1660
        // catch both.
1661
5
        let message_template = if action_result.exit_code == 0 && action_result.error.is_none() {
  Branch (1661:35): [Folded - Ignored]
  Branch (1661:67): [Folded - Ignored]
  Branch (1661:35): [Folded - Ignored]
  Branch (1661:67): [Folded - Ignored]
  Branch (1661:35): [True: 4, False: 1]
  Branch (1661:67): [True: 3, False: 1]
1662
3
            self.success_message_template.clone()
1663
        } else {
1664
2
            self.failure_message_template.clone()
1665
        };
1666
1667
5
        let upload_historical_results_with_message_result = if should_upload_historical_results {
  Branch (1667:64): [Folded - Ignored]
  Branch (1667:64): [Folded - Ignored]
  Branch (1667:64): [True: 3, False: 2]
1668
3
            let maybe_message = self
1669
3
                .upload_historical_results_with_message(
1670
3
                    action_info,
1671
3
                    execute_response.clone(),
1672
3
                    message_template,
1673
3
                    hasher,
1674
3
                )
1675
3
                .await;
1676
3
            match maybe_message {
1677
3
                Ok(message) => {
1678
3
                    action_result.message.clone_from(&message);
1679
3
                    execute_response.message = message;
1680
3
                    Ok(())
1681
                }
1682
0
                Err(e) => Result::<(), Error>::Err(e),
1683
            }
1684
        } else {
1685
2
            match Self::format_execute_response_message(message_template, action_info, None, hasher)
1686
            {
1687
2
                Ok(message) => {
1688
2
                    action_result.message.clone_from(&message);
1689
2
                    execute_response.message = message;
1690
2
                    Ok(())
1691
                }
1692
0
                Err(e) => Err(e).err_tip(|| "Could not format message in cache_action_result"),
1693
            }
1694
        };
1695
1696
        // Note: Done in this order because we assume most results will succeed and most configs will
1697
        // either always upload historical results or only upload on failure, in which case
1698
        // we can avoid an extra clone of the protos by doing this last with the above assumption.
1699
5
        let ac_upload_results = if should_upload_ac_results {
  Branch (1699:36): [Folded - Ignored]
  Branch (1699:36): [Folded - Ignored]
  Branch (1699:36): [True: 5, False: 0]
1700
5
            self.upload_ac_results(
1701
5
                action_info,
1702
5
                execute_response
1703
5
                    .result
1704
5
                    .err_tip(|| "No result set in cache_action_result")
?0
,
1705
5
                hasher,
1706
            )
1707
5
            .await
1708
        } else {
1709
0
            Ok(())
1710
        };
1711
5
        upload_historical_results_with_message_result.merge(ac_upload_results)
1712
6
    }
1713
}
1714
1715
#[derive(Debug)]
1716
pub struct RunningActionsManagerArgs<'a> {
1717
    pub root_action_directory: String,
1718
    pub execution_configuration: ExecutionConfiguration,
1719
    pub cas_store: Arc<FastSlowStore>,
1720
    pub ac_store: Option<Store>,
1721
    pub historical_store: Store,
1722
    pub upload_action_result_config: &'a UploadActionResultConfig,
1723
    pub max_action_timeout: Duration,
1724
    pub timeout_handled_externally: bool,
1725
}
1726
1727
struct CleanupGuard {
1728
    manager: Weak<RunningActionsManagerImpl>,
1729
    operation_id: OperationId,
1730
}
1731
1732
impl Drop for CleanupGuard {
1733
17
    fn drop(&mut self) {
1734
17
        let Some(manager) = self.manager.upgrade() else {
  Branch (1734:13): [True: 17, False: 0]
  Branch (1734:13): [Folded - Ignored]
1735
0
            return;
1736
        };
1737
17
        let mut cleaning = manager.cleaning_up_operations.lock();
1738
17
        cleaning.remove(&self.operation_id);
1739
17
        manager.cleanup_complete_notify.notify_waiters();
1740
17
    }
1741
}
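The guard holds only a Weak manager reference so teardown cannot resurrect the manager. A sketch of the intended flow, echoing the shape of this file's `do_cleanup` helper (illustrative, not a public API):

    async fn cleanup_with_guard(
        manager: &Arc<RunningActionsManagerImpl>,
        operation_id: OperationId,
        action_directory: &str,
    ) -> Result<(), Error> {
        // Register the operation as "cleaning"; the guard's Drop impl removes the
        // entry and notifies waiters even if removal fails or the future is dropped.
        let _guard = manager.perform_cleanup(operation_id);
        fs::remove_dir_all(action_directory)
            .await
            .err_tip(|| "Removing action directory")
    }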
1742
1743
/// Holds state info about what is being executed and the interface for interacting
1744
/// with actions while they are running.
1745
#[derive(Debug)]
1746
pub struct RunningActionsManagerImpl {
1747
    root_action_directory: String,
1748
    execution_configuration: ExecutionConfiguration,
1749
    cas_store: Arc<FastSlowStore>,
1750
    filesystem_store: Arc<FilesystemStore>,
1751
    upload_action_results: UploadActionResults,
1752
    max_action_timeout: Duration,
1753
    timeout_handled_externally: bool,
1754
    running_actions: Mutex<HashMap<OperationId, Weak<RunningActionImpl>>>,
1755
    // Note: We don't use Notify because we need to support a .wait_for()-like function, which
1756
    // Notify does not support.
1757
    action_done_tx: watch::Sender<()>,
1758
    callbacks: Callbacks,
1759
    metrics: Arc<Metrics>,
1760
    /// Track operations being cleaned up to avoid directory collisions during action retries.
1761
    /// When an action fails and is retried on the same worker, we need to ensure the previous
1762
    /// attempt's directory is fully cleaned up before creating a new one.
1763
    /// See: <https://github.com/TraceMachina/nativelink/issues/1859>
1764
    cleaning_up_operations: Mutex<HashSet<OperationId>>,
1765
    /// Notify waiters when a cleanup operation completes. This is used in conjunction with
1766
    /// `cleaning_up_operations` to coordinate directory cleanup and creation.
1767
    cleanup_complete_notify: Arc<Notify>,
1768
}
1769
1770
impl RunningActionsManagerImpl {
1771
    /// Maximum time to wait for a cleanup operation to complete before timing out.
1772
    /// TODO(marcussorealheis): Consider making cleanup wait timeout configurable in the future
1773
    const MAX_WAIT: Duration = Duration::from_secs(30);
1774
    /// Maximum backoff duration for exponential backoff when waiting for cleanup.
1775
    const MAX_BACKOFF: Duration = Duration::from_millis(500);
1776
26
    pub fn new_with_callbacks(
1777
26
        args: RunningActionsManagerArgs<'_>,
1778
26
        callbacks: Callbacks,
1779
26
    ) -> Result<Self, Error> {
1780
        // Sadly because of some limitations of how Any works we need to clone more times than optimal.
1781
26
        let filesystem_store = args
1782
26
            .cas_store
1783
26
            .fast_store()
1784
26
            .downcast_ref::<FilesystemStore>(None)
1785
26
            .err_tip(
1786
                || "Expected FilesystemStore store for .fast_store() in RunningActionsManagerImpl",
1787
0
            )?
1788
26
            .get_arc()
1789
26
            .err_tip(|| "FilesystemStore's internal Arc was lost")
?0
;
1790
26
        let (action_done_tx, _) = watch::channel(());
1791
        Ok(Self {
1792
26
            root_action_directory: args.root_action_directory,
1793
26
            execution_configuration: args.execution_configuration,
1794
26
            cas_store: args.cas_store,
1795
26
            filesystem_store,
1796
26
            upload_action_results: UploadActionResults::new(
1797
26
                args.upload_action_result_config,
1798
26
                args.ac_store,
1799
26
                args.historical_store,
1800
            )
1801
26
            .err_tip(|| "During RunningActionsManagerImpl construction")
?0
,
1802
26
            max_action_timeout: args.max_action_timeout,
1803
26
            timeout_handled_externally: args.timeout_handled_externally,
1804
26
            running_actions: Mutex::new(HashMap::new()),
1805
26
            action_done_tx,
1806
26
            callbacks,
1807
26
            metrics: Arc::new(Metrics::default()),
1808
26
            cleaning_up_operations: Mutex::new(HashSet::new()),
1809
26
            cleanup_complete_notify: Arc::new(Notify::new()),
1810
        })
1811
26
    }
1812
1813
12
    pub fn new(args: RunningActionsManagerArgs<'_>) -> Result<Self, Error> {
1814
12
        Self::new_with_callbacks(
1815
12
            args,
1816
            Callbacks {
1817
12
                now_fn: SystemTime::now,
1818
2
                sleep_fn: |duration| Box::pin(tokio::time::sleep(duration)),
1819
            },
1820
        )
1821
12
    }
1822
1823
    /// Fixes a race condition that occurs when an action fails to execute on a worker, and the same worker
1824
    /// attempts to re-execute the same action before the physical cleanup (file is removed) completes.
1825
    /// See this issue for additional details: <https://github.com/TraceMachina/nativelink/issues/1859>
1826
19
    async fn wait_for_cleanup_if_needed(&self, operation_id: &OperationId) -> Result<(), Error> {
1827
19
        let start = Instant::now();
1828
19
        let mut backoff = Duration::from_millis(10);
1829
19
        let mut has_waited = false;
1830
1831
        loop {
1832
19
            let should_wait = {
1833
19
                let cleaning = self.cleaning_up_operations.lock();
1834
19
                cleaning.contains(operation_id)
1835
            };
1836
1837
19
            if !should_wait {
  Branch (1837:16): [Folded - Ignored]
  Branch (1837:16): [Folded - Ignored]
  Branch (1837:16): [True: 19, False: 0]
1838
19
                let dir_path =
1839
19
                    PathBuf::from(&self.root_action_directory).join(operation_id.to_string());
1840
1841
19
                if !dir_path.exists() {
  Branch (1841:20): [Folded - Ignored]
  Branch (1841:20): [Folded - Ignored]
  Branch (1841:20): [True: 18, False: 1]
1842
18
                    return Ok(());
1843
1
                }
1844
1845
                // Safety check: ensure we're only removing directories under root_action_directory
1846
1
                let root_path = Path::new(&self.root_action_directory);
1847
1
                let canonical_root = root_path.canonicalize().err_tip(|| {
1848
0
                    format!(
1849
0
                        "Failed to canonicalize root directory: {}",
1850
                        self.root_action_directory
1851
                    )
1852
0
                })?;
1853
1
                let canonical_dir = dir_path.canonicalize().err_tip(|| {
1854
0
                    format!("Failed to canonicalize directory: {}", dir_path.display())
1855
0
                })?;
1856
1857
1
                if !canonical_dir.starts_with(&canonical_root) {
  Branch (1857:20): [Folded - Ignored]
  Branch (1857:20): [Folded - Ignored]
  Branch (1857:20): [True: 0, False: 1]
1858
0
                    return Err(make_err!(
1859
0
                        Code::Internal,
1860
0
                        "Attempted to remove directory outside of root_action_directory: {}",
1861
0
                        dir_path.display()
1862
0
                    ));
1863
1
                }
1864
1865
                // Directory exists but not being cleaned - remove it
1866
1
                warn!(
1867
1
                    "Removing stale directory for {}: {}",
1868
                    operation_id,
1869
1
                    dir_path.display()
1870
                );
1871
1
                self.metrics.stale_removals.inc();
1872
1873
                // Try to remove the directory, with one retry on failure
1874
1
                let remove_result = fs::remove_dir_all(&dir_path).await;
1875
1
                if let Err(e) = remove_result {
  Branch (1875:24): [Folded - Ignored]
  Branch (1875:24): [Folded - Ignored]
  Branch (1875:24): [True: 0, False: 1]
1876
                    // Retry once after a short delay in case the directory is temporarily locked
1877
0
                    tokio::time::sleep(Duration::from_millis(100)).await;
1878
0
                    fs::remove_dir_all(&dir_path).await.err_tip(|| {
1879
0
                        format!(
1880
0
                            "Failed to remove stale directory {} for retry of {} after retry (original error: {})",
1881
0
                            dir_path.display(),
1882
                            operation_id,
1883
                            e
1884
                        )
1885
0
                    })?;
1886
1
                }
1887
1
                return Ok(());
1888
0
            }
1889
1890
0
            if start.elapsed() > Self::MAX_WAIT {
  Branch (1890:16): [Folded - Ignored]
  Branch (1890:16): [Folded - Ignored]
  Branch (1890:16): [True: 0, False: 0]
1891
0
                self.metrics.cleanup_wait_timeouts.inc();
1892
0
                return Err(make_err!(
1893
0
                    Code::DeadlineExceeded,
1894
0
                    "Timeout waiting for previous operation cleanup: {} (waited {:?})",
1895
0
                    operation_id,
1896
0
                    start.elapsed()
1897
0
                ));
1898
0
            }
1899
1900
0
            if !has_waited {
  Branch (1900:16): [Folded - Ignored]
  Branch (1900:16): [Folded - Ignored]
  Branch (1900:16): [True: 0, False: 0]
1901
0
                self.metrics.cleanup_waits.inc();
1902
0
                has_waited = true;
1903
0
            }
1904
1905
0
            trace!(
1906
0
                "Waiting for cleanup of {} (elapsed: {:?}, backoff: {:?})",
1907
                operation_id,
1908
0
                start.elapsed(),
1909
                backoff
1910
            );
1911
1912
0
            tokio::select! {
1913
0
                () = self.cleanup_complete_notify.notified() => {},
1914
0
                () = tokio::time::sleep(backoff) => {
1915
0
                    // Exponential backoff
1916
0
                    backoff = (backoff * 2).min(Self::MAX_BACKOFF);
1917
0
                },
1918
            }
1919
        }
1920
19
    }
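The backoff schedule above starts at 10ms, doubles after each timed-out wait, and is capped by MAX_BACKOFF. A standalone sketch of the schedule:

    use core::time::Duration;

    fn next_backoff(current: Duration, max: Duration) -> Duration {
        // Double the delay, but never exceed the cap.
        (current * 2).min(max)
    }

    // 10ms -> 20ms -> 40ms -> 80ms -> 160ms -> 320ms -> 500ms (capped thereafter)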
1921
1922
0
    fn make_action_directory<'a>(
1923
0
        &'a self,
1924
0
        operation_id: &'a OperationId,
1925
0
    ) -> impl Future<Output = Result<String, Error>> + 'a {
1926
19
        self.metrics.make_action_directory.wrap(async move {
1927
19
            let action_directory = format!("{}/{}", self.root_action_directory, operation_id);
1928
19
            fs::create_dir(&action_directory)
1929
19
                .await
1930
19
                .err_tip(|| format!("Error creating action directory {action_directory}"))?;
1931
19
            Ok(action_directory)
1932
19
        })
1933
0
    }
1934
1935
19
    fn create_action_info(
1936
19
        &self,
1937
19
        start_execute: StartExecute,
1938
19
        queued_timestamp: SystemTime,
1939
19
    ) -> impl Future<Output = Result<ActionInfo, Error>> + '_ {
1940
19
        self.metrics.create_action_info.wrap(async move {
1941
19
            let execute_request = start_execute
1942
19
                .execute_request
1943
19
                .err_tip(|| "Expected execute_request to exist in StartExecute")
?0
;
1944
19
            let action_digest: DigestInfo = execute_request
1945
19
                .action_digest
1946
19
                .clone()
1947
19
                .err_tip(|| "Expected action_digest to exist on StartExecute")
?0
1948
19
                .try_into()
?0
;
1949
19
            let load_start_timestamp = (self.callbacks.now_fn)();
1950
19
            let action =
1951
19
                get_and_decode_digest::<Action>(self.cas_store.as_ref(), action_digest.into())
1952
19
                    .await
1953
19
                    .err_tip(|| "During start_action")
?0
;
1954
19
            let action_info = ActionInfo::try_from_action_and_execute_request(
1955
19
                execute_request,
1956
19
                action,
1957
19
                load_start_timestamp,
1958
19
                queued_timestamp,
1959
            )
1960
19
            .err_tip(|| "Could not create ActionInfo in create_and_add_action()")
?0
;
1961
19
            Ok(action_info)
1962
19
        })
1963
19
    }
1964
1965
18
    fn cleanup_action(&self, operation_id: &OperationId) -> Result<(), Error> {
1966
18
        let mut running_actions = self.running_actions.lock();
1967
18
        let result = running_actions.remove(operation_id).err_tip(|| {
1968
0
            format!("Expected action id '{operation_id:?}' to exist in RunningActionsManagerImpl")
1969
0
        });
1970
        // No need to copy anything; we are just telling the receivers that an event happened.
1971
18
        self.action_done_tx.send_modify(|()| {});
1972
18
        result.map(|_| ())
1973
18
    }
1974
1975
    // Note: We do not capture metrics on this call, only `.kill_all()`.
1976
    // Important: When the future returns the process may still be running.
1977
2
    async fn kill_operation(action: Arc<RunningActionImpl>) {
1978
2
        warn!(
1979
2
            operation_id = ?action.operation_id,
1980
2
            "Sending kill to running operation",
1981
        );
1982
2
        let kill_channel_tx = {
1983
2
            let mut action_state = action.state.lock();
1984
2
            action_state.kill_channel_tx.take()
1985
        };
1986
2
        if let Some(kill_channel_tx) = kill_channel_tx {
  Branch (1986:16): [Folded - Ignored]
  Branch (1986:16): [Folded - Ignored]
  Branch (1986:16): [True: 2, False: 0]
1987
2
            if kill_channel_tx.send(()).is_err() {
  Branch (1987:16): [Folded - Ignored]
  Branch (1987:16): [Folded - Ignored]
  Branch (1987:16): [True: 0, False: 2]
1988
0
                error!(
1989
0
                    operation_id = ?action.operation_id,
1990
0
                    "Error sending kill to running operation",
1991
                );
1992
2
            }
1993
0
        }
1994
2
    }
1995
1996
17
    fn perform_cleanup(self: &Arc<Self>, operation_id: OperationId) -> Option<CleanupGuard> {
1997
17
        let mut cleaning = self.cleaning_up_operations.lock();
1998
17
        cleaning
1999
17
            .insert(operation_id.clone())
2000
17
            .then_some(CleanupGuard {
2001
17
                manager: Arc::downgrade(self),
2002
17
                operation_id,
2003
17
            })
2004
17
    }
2005
}
2006
2007
impl RunningActionsManager for RunningActionsManagerImpl {
2008
    type RunningAction = RunningActionImpl;
2009
2010
19
    async fn create_and_add_action(
2011
19
        self: &Arc<Self>,
2012
19
        worker_id: String,
2013
19
        start_execute: StartExecute,
2014
19
    ) -> Result<Arc<RunningActionImpl>, Error> {
2015
19
        self.metrics
2016
19
            .create_and_add_action
2017
19
            .wrap(async move {
2018
19
                let queued_timestamp = start_execute
2019
19
                    .queued_timestamp
2020
19
                    .and_then(|time| time.try_into().ok())
2021
19
                    .unwrap_or(SystemTime::UNIX_EPOCH);
2022
19
                let operation_id = start_execute
2023
19
                    .operation_id.as_str().into();
2024
19
                let action_info = self.create_action_info(start_execute, queued_timestamp).await?;
2025
19
                debug!(
2026
                    ?action_info,
2027
19
                    "Worker received action",
2028
                );
2029
                // Wait for any previous cleanup to complete before creating directory
2030
19
                self.wait_for_cleanup_if_needed(&operation_id).await?;
2031
19
                let action_directory = self.make_action_directory(&operation_id).await?;
2032
19
                let execution_metadata = ExecutionMetadata {
2033
19
                    worker: worker_id,
2034
19
                    queued_timestamp: action_info.insert_timestamp,
2035
19
                    worker_start_timestamp: action_info.load_timestamp,
2036
19
                    worker_completed_timestamp: SystemTime::UNIX_EPOCH,
2037
19
                    input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
2038
19
                    input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
2039
19
                    execution_start_timestamp: SystemTime::UNIX_EPOCH,
2040
19
                    execution_completed_timestamp: SystemTime::UNIX_EPOCH,
2041
19
                    output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
2042
19
                    output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
2043
19
                };
2044
19
                let timeout = if action_info.timeout.is_zero() || self.timeout_handled_externally {
  Branch (2044:34): [Folded - Ignored]
  Branch (2044:67): [Folded - Ignored]
  Branch (2044:34): [Folded - Ignored]
  Branch (2044:67): [Folded - Ignored]
  Branch (2044:34): [True: 16, False: 3]
  Branch (2044:67): [True: 0, False: 3]
2045
16
                    self.max_action_timeout
2046
                } else {
2047
3
                    action_info.timeout
2048
                };
2049
19
                if timeout > self.max_action_timeout {
  Branch (2049:20): [Folded - Ignored]
  Branch (2049:20): [Folded - Ignored]
  Branch (2049:20): [True: 1, False: 18]
2050
1
                    return Err(make_err!(
2051
1
                        Code::InvalidArgument,
2052
1
                        "Action timeout of {} seconds is greater than the maximum allowed timeout of {} seconds",
2053
1
                        timeout.as_secs_f32(),
2054
1
                        self.max_action_timeout.as_secs_f32()
2055
1
                    ));
2056
18
                }
2057
18
                let running_action = Arc::new(RunningActionImpl::new(
2058
18
                    execution_metadata,
2059
18
                    operation_id.clone(),
2060
18
                    action_directory,
2061
18
                    action_info,
2062
18
                    timeout,
2063
18
                    self.clone(),
2064
                ));
2065
                {
2066
18
                    let mut running_actions = self.running_actions.lock();
2067
                    // Check if action already exists and is still alive
2068
18
                    if let Some(existing_weak) = running_actions.get(&operation_id) {
  Branch (2068:28): [Folded - Ignored]
  Branch (2068:28): [Folded - Ignored]
  Branch (2068:28): [True: 0, False: 18]
2069
0
                        if let Some(_existing_action) = existing_weak.upgrade() {
  Branch (2069:32): [Folded - Ignored]
  Branch (2069:32): [Folded - Ignored]
  Branch (2069:32): [True: 0, False: 0]
2070
0
                            return Err(make_err!(
2071
0
                                Code::AlreadyExists,
2072
0
                                "Action with operation_id {} is already running",
2073
0
                                operation_id
2074
0
                            ));
2075
0
                        }
2076
18
                    }
2077
18
                    running_actions.insert(operation_id, Arc::downgrade(&running_action));
2078
                }
2079
18
                Ok(running_action)
2080
19
            })
2081
19
            .await
2082
19
    }
2083
2084
6
    async fn cache_action_result(
2085
6
        &self,
2086
6
        action_info: DigestInfo,
2087
6
        action_result: &mut ActionResult,
2088
6
        hasher: DigestHasherFunc,
2089
6
    ) -> Result<(), Error> {
2090
6
        self.metrics
2091
6
            .cache_action_result
2092
6
            .wrap(self.upload_action_results.cache_action_result(
2093
6
                action_info,
2094
6
                action_result,
2095
6
                hasher,
2096
6
            ))
2097
6
            .await
2098
6
    }
2099
2100
0
    async fn kill_operation(&self, operation_id: &OperationId) -> Result<(), Error> {
2101
0
        let running_action = {
2102
0
            let running_actions = self.running_actions.lock();
2103
0
            running_actions
2104
0
                .get(operation_id)
2105
0
                .and_then(Weak::upgrade)
2106
0
                .ok_or_else(|| make_input_err!("Failed to get running action {operation_id}"))?
2107
        };
2108
0
        Self::kill_operation(running_action).await;
2109
0
        Ok(())
2110
0
    }
2111
2112
    // Note: When the future returns the process should be fully killed and cleaned up.
2113
2
    async fn kill_all(&self) {
2114
2
        self.metrics
2115
2
            .kill_all
2116
2
            .wrap_no_capture_result(async move {
2117
2
                let kill_operations: Vec<Arc<RunningActionImpl>> = {
2118
2
                    let running_actions = self.running_actions.lock();
2119
2
                    running_actions
2120
2
                        .iter()
2121
2
                        .filter_map(|(_operation_id, action)| action.upgrade())
2122
2
                        .collect()
2123
                };
2124
2
                let mut kill_futures: FuturesUnordered<_> = kill_operations
2125
2
                    .into_iter()
2126
2
                    .map(Self::kill_operation)
2127
2
                    .collect();
2128
4
                while kill_futures.next().await.is_some() {}
  Branch (2128:23): [Folded - Ignored]
  Branch (2128:23): [Folded - Ignored]
  Branch (2128:23): [True: 2, False: 2]
2129
2
            })
2130
2
            .await;
2131
        // Ignore error. If error happens it means there's no sender, which is not a problem.
2132
        // Note: Sanity check this API will always check current value then future values:
2133
        // https://play.rust-lang.org/?version=stable&edition=2021&gist=23103652cc1276a97e5f9938da87fdb2
2134
2
        drop(
2135
2
            self.action_done_tx
2136
2
                .subscribe()
2137
4
                .wait_for(|()| self.running_actions.lock().is_empty())
2138
2
                .await,
2139
        );
2140
2
    }
2141
2142
    #[inline]
2143
2
    fn metrics(&self) -> &Arc<Metrics> {
2144
2
        &self.metrics
2145
2
    }
2146
}
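kill_all waits on a tokio watch channel rather than Notify because Receiver::wait_for checks the current value first and then every subsequent change, so a completion that lands before subscribing is never missed. A standalone sketch of that pattern (illustrative names):

    use std::sync::{Arc, Mutex};
    use tokio::sync::watch;

    async fn wait_until_drained(tx: &watch::Sender<()>, live_count: Arc<Mutex<usize>>) {
        let mut rx = tx.subscribe();
        // The predicate is evaluated immediately, then again after every send_modify().
        let _ = rx.wait_for(|()| *live_count.lock().unwrap() == 0).await;
    }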
2147
2148
#[derive(Debug, Default, MetricsComponent)]
2149
pub struct Metrics {
2150
    #[metric(help = "Stats about the create_and_add_action command.")]
2151
    create_and_add_action: AsyncCounterWrapper,
2152
    #[metric(help = "Stats about the cache_action_result command.")]
2153
    cache_action_result: AsyncCounterWrapper,
2154
    #[metric(help = "Stats about the kill_all command.")]
2155
    kill_all: AsyncCounterWrapper,
2156
    #[metric(help = "Stats about the create_action_info command.")]
2157
    create_action_info: AsyncCounterWrapper,
2158
    #[metric(help = "Stats about the make_work_directory command.")]
2159
    make_action_directory: AsyncCounterWrapper,
2160
    #[metric(help = "Stats about the prepare_action command.")]
2161
    prepare_action: AsyncCounterWrapper,
2162
    #[metric(help = "Stats about the execute command.")]
2163
    execute: AsyncCounterWrapper,
2164
    #[metric(help = "Stats about the upload_results command.")]
2165
    upload_results: AsyncCounterWrapper,
2166
    #[metric(help = "Stats about the cleanup command.")]
2167
    cleanup: AsyncCounterWrapper,
2168
    #[metric(help = "Stats about the get_finished_result command.")]
2169
    get_finished_result: AsyncCounterWrapper,
2170
    #[metric(help = "Number of times an action waited for cleanup to complete.")]
2171
    cleanup_waits: CounterWithTime,
2172
    #[metric(help = "Number of stale directories removed during action retries.")]
2173
    stale_removals: CounterWithTime,
2174
    #[metric(help = "Number of timeouts while waiting for cleanup to complete.")]
2175
    cleanup_wait_timeouts: CounterWithTime,
2176
    #[metric(help = "Stats about the get_proto_command_from_store command.")]
2177
    get_proto_command_from_store: AsyncCounterWrapper,
2178
    #[metric(help = "Stats about the download_to_directory command.")]
2179
    download_to_directory: AsyncCounterWrapper,
2180
    #[metric(help = "Stats about the prepare_output_files command.")]
2181
    prepare_output_files: AsyncCounterWrapper,
2182
    #[metric(help = "Stats about the prepare_output_paths command.")]
2183
    prepare_output_paths: AsyncCounterWrapper,
2184
    #[metric(help = "Stats about the child_process command.")]
2185
    child_process: AsyncCounterWrapper,
2186
    #[metric(help = "Stats about the child_process_success_error_code command.")]
2187
    child_process_success_error_code: CounterWithTime,
2188
    #[metric(help = "Stats about the child_process_failure_error_code command.")]
2189
    child_process_failure_error_code: CounterWithTime,
2190
    #[metric(help = "Total time spent uploading stdout.")]
2191
    upload_stdout: AsyncCounterWrapper,
2192
    #[metric(help = "Total time spent uploading stderr.")]
2193
    upload_stderr: AsyncCounterWrapper,
2194
    #[metric(help = "Total number of task timeouts.")]
2195
    task_timeouts: CounterWithTime,
2196
}