Coverage Report

Created: 2025-12-17 22:46

/build/source/nativelink-worker/src/running_actions_manager.rs
 Line|Count|Source
    1|     |// Copyright 2024 The NativeLink Authors. All rights reserved.
    2|     |//
    3|     |// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
    4|     |// you may not use this file except in compliance with the License.
    5|     |// You may obtain a copy of the License at
    6|     |//
    7|     |//    See LICENSE file for details
    8|     |//
    9|     |// Unless required by applicable law or agreed to in writing, software
   10|     |// distributed under the License is distributed on an "AS IS" BASIS,
   11|     |// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12|     |// See the License for the specific language governing permissions and
   13|     |// limitations under the License.
   14|     |
   15|     |use core::cmp::min;
   16|     |use core::convert::Into;
   17|     |use core::fmt::Debug;
   18|     |use core::pin::Pin;
   19|     |use core::sync::atomic::{AtomicBool, Ordering};
   20|     |use core::time::Duration;
   21|     |use std::borrow::Cow;
   22|     |use std::collections::vec_deque::VecDeque;
   23|     |use std::collections::{HashMap, HashSet};
   24|     |use std::ffi::{OsStr, OsString};
   25|     |#[cfg(target_family = "unix")]
   26|     |use std::fs::Permissions;
   27|     |#[cfg(target_family = "unix")]
   28|     |use std::os::unix::fs::{MetadataExt, PermissionsExt};
   29|     |use std::path::{Path, PathBuf};
   30|     |use std::process::Stdio;
   31|     |use std::sync::{Arc, Weak};
   32|     |use std::time::SystemTime;
   33|     |
   34|     |use bytes::{Bytes, BytesMut};
   35|     |use filetime::{FileTime, set_file_mtime};
   36|     |use formatx::Template;
   37|     |use futures::future::{
   38|     |    BoxFuture, Future, FutureExt, TryFutureExt, try_join, try_join_all, try_join3,
   39|     |};
   40|     |use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
   41|     |use nativelink_config::cas_server::{
   42|     |    EnvironmentSource, UploadActionResultConfig, UploadCacheResultsStrategy,
   43|     |};
   44|     |use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
   45|     |use nativelink_metric::MetricsComponent;
   46|     |use nativelink_proto::build::bazel::remote::execution::v2::{
   47|     |    Action, ActionResult as ProtoActionResult, Command as ProtoCommand,
   48|     |    Directory as ProtoDirectory, Directory, DirectoryNode, ExecuteResponse, FileNode, SymlinkNode,
   49|     |    Tree as ProtoTree, UpdateActionResultRequest,
   50|     |};
   51|     |use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
   52|     |    HistoricalExecuteResponse, StartExecute,
   53|     |};
   54|     |use nativelink_store::ac_utils::{
   55|     |    ESTIMATED_DIGEST_SIZE, compute_buf_digest, get_and_decode_digest, serialize_and_upload_message,
   56|     |};
   57|     |use nativelink_store::cas_utils::is_zero_digest;
   58|     |use nativelink_store::fast_slow_store::FastSlowStore;
   59|     |use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
   60|     |use nativelink_store::grpc_store::GrpcStore;
   61|     |use nativelink_util::action_messages::{
   62|     |    ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, OperationId,
   63|     |    SymlinkInfo, to_execute_response,
   64|     |};
   65|     |use nativelink_util::common::{DigestInfo, fs};
   66|     |use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc};
   67|     |use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
   68|     |use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
   69|     |use nativelink_util::{background_spawn, spawn, spawn_blocking};
   70|     |use parking_lot::Mutex;
   71|     |use prost::Message;
   72|     |use relative_path::RelativePath;
   73|     |use scopeguard::{ScopeGuard, guard};
   74|     |use serde::Deserialize;
   75|     |use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
   76|     |use tokio::process;
   77|     |use tokio::sync::{Notify, oneshot, watch};
   78|     |use tokio::time::Instant;
   79|     |use tokio_stream::wrappers::ReadDirStream;
   80|     |use tonic::Request;
   81|     |use tracing::{debug, error, info, trace, warn};
   82|     |use uuid::Uuid;
   83|     |
   84|     |/// For simplicity we use a fixed exit code for cases when our program is terminated
   85|     |/// due to a signal.
   86|     |const EXIT_CODE_FOR_SIGNAL: i32 = 9;
   87|     |
   88|     |/// Default strategy for uploading historical results.
   89|     |/// Note: If this value changes the config documentation
   90|     |/// should reflect it.
   91|     |const DEFAULT_HISTORICAL_RESULTS_STRATEGY: UploadCacheResultsStrategy =
   92|     |    UploadCacheResultsStrategy::FailuresOnly;
   93|     |
   94|     |/// Valid string reasons for a failure.
   95|     |/// Note: If these change, the documentation should be updated.
   96|     |#[derive(Debug, Deserialize)]
   97|     |#[serde(rename_all = "snake_case")]
   98|     |enum SideChannelFailureReason {
   99|     |    /// Task should be considered timed out.
  100|     |    Timeout,
  101|     |}
  102|     |
  103|     |/// This represents the json data that can be passed from the running process
  104|     |/// to the parent via the `SideChannelFile`. See:
  105|     |/// `config::EnvironmentSource::sidechannelfile` for more details.
  106|     |/// Note: Any fields added here must be added to the documentation.
  107|     |#[derive(Debug, Deserialize, Default)]
  108|     |struct SideChannelInfo {
  109|     |    /// If the task should be considered a failure and why.
  110|     |    failure: Option<SideChannelFailureReason>,
  111|     |}
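
For illustration, the side channel is plain JSON (parsed as JSON5): an optional `failure` field naming one of the `snake_case` variants above. A minimal sketch of a payload and how it decodes; the parse call mirrors the `serde_json5::from_str` call in `process_side_channel_file` further down, and the payload itself is hypothetical:

    // Hypothetical payload a task could write to its side channel file:
    //   { "failure": "timeout" }
    let info: SideChannelInfo = serde_json5::from_str(r#"{ "failure": "timeout" }"#)
        .expect("valid side channel json");
    // rename_all = "snake_case" maps "timeout" to SideChannelFailureReason::Timeout.
    assert!(matches!(info.failure, Some(SideChannelFailureReason::Timeout)));
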
  112|     |
  113|     |/// Aggressively download the digests of files and make a local folder from it. This function
  114|     |/// will spawn an unbounded number of futures to try and get these downloaded. The store itself
  115|     |/// should be rate limited if spawning too many requests at once is an issue.
  116|     |/// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore`. This is for
  117|     |/// efficiency reasons. We will request the `FastSlowStore` to populate the entry then we will
  118|     |/// assume the `FilesystemStore` has the file available immediately after and hardlink the file
  119|     |/// to a new location.
  120|     |// Sadly we cannot use `async fn` here because the rust compiler cannot determine the auto traits
  121|     |// of the future. So we need to force this function to return a dynamic future instead.
  122|     |// see: https://github.com/rust-lang/rust/issues/78649
  123|   25|pub fn download_to_directory<'a>(
  124|   25|    cas_store: &'a FastSlowStore,
  125|   25|    filesystem_store: Pin<&'a FilesystemStore>,
  126|   25|    digest: &'a DigestInfo,
  127|   25|    current_directory: &'a str,
  128|   25|) -> BoxFuture<'a, Result<(), Error>> {
  129|   25|    async move {
  130|   25|        let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest.into())
  131|   25|            .await
  132|   25|            .err_tip(|| "Converting digest to Directory")?;
  133|   25|        let mut futures = FuturesUnordered::new();
  134|     |
  135|   29|        for file in directory.files {
  136|    4|            let digest: DigestInfo = file
  137|    4|                .digest
  138|    4|                .err_tip(|| "Expected Digest to exist in Directory::file::digest")?
  139|    4|                .try_into()
  140|    4|                .err_tip(|| "In Directory::file::digest")?;
  141|    4|            let dest = format!("{}/{}", current_directory, file.name);
  142|    4|            let (mtime, mut unix_mode) = match file.node_properties {
  143|    1|                Some(properties) => (properties.mtime, properties.unix_mode),
  144|    3|                None => (None, None),
  145|     |            };
  146|     |            #[cfg_attr(target_family = "windows", allow(unused_assignments))]
  147|    4|            if file.is_executable {
     |     |              Branch (147:16): [True: 1, False: 3]
     |     |              Branch (147:16): [Folded - Ignored]
  148|    1|                unix_mode = Some(unix_mode.unwrap_or(0o444) | 0o111);
  149|    3|            }
  150|    4|            futures.push(
  151|    4|                cas_store
  152|    4|                    .populate_fast_store(digest.into())
  153|    4|                    .and_then(move |()| async move {
  154|    4|                        if is_zero_digest(digest) {
     |     |                          Branch (154:28): [True: 0, False: 4]
     |     |                          Branch (154:28): [Folded - Ignored]
  155|    0|                            let mut file_slot = fs::create_file(&dest).await?;
  156|    0|                            file_slot.write_all(&[]).await?;
  157|     |                        }
  158|     |                        else {
  159|    4|                            let file_entry = filesystem_store
  160|    4|                                .get_file_entry_for_digest(&digest)
  161|    4|                                .await
  162|    4|                                .err_tip(|| "During hard link")?;
  163|     |                            // TODO: add a test for #2051: deadlock with large number of files
  164|    8|                            let src_path = file_entry.get_file_path_locked(|src| async move { Ok(PathBuf::from(src)) }).await?;
  165|    4|                            fs::hard_link(&src_path, &dest)
  166|    4|                                .await
  167|    4|                                .map_err(|e| {
  168|    0|                                    if e.code == Code::NotFound {
     |     |                                      Branch (168:40): [True: 0, False: 0]
     |     |                                      Branch (168:40): [Folded - Ignored]
  169|    0|                                        make_err!(
  170|    0|                                            Code::Internal,
  171|     |                                            "Could not make hardlink, file was likely evicted from cache. {e:?} : {dest}\n\
  172|     |                                            This error often occurs when the filesystem store's max_bytes is too small for your workload.\n\
  173|     |                                            To fix this issue:\n\
  174|     |                                            1. Increase the 'max_bytes' value in your filesystem store configuration\n\
  175|     |                                            2. Example: Change 'max_bytes: 10000000000' to 'max_bytes: 50000000000' (or higher)\n\
  176|     |                                            3. The setting is typically found in your nativelink.json config under:\n\
  177|     |                                            stores -> [your_filesystem_store] -> filesystem -> eviction_policy -> max_bytes\n\
  178|     |                                            4. Restart NativeLink after making the change\n\n\
  179|     |                                            If this error persists after increasing max_bytes several times, please report at:\n\
  180|     |                                            https://github.com/TraceMachina/nativelink/issues\n\
  181|     |                                            Include your config file and both server and client logs to help us assist you."
  182|     |                                        )
  183|     |                                    } else {
  184|    0|                                        make_err!(Code::Internal, "Could not make hardlink, {e:?} : {dest}")
  185|     |                                    }
  186|    0|                                })?;
  187|     |                            }
  188|     |                        #[cfg(target_family = "unix")]
  189|    4|                        if let Some(unix_mode) = unix_mode {
     |     |                          Branch (189:32): [True: 1, False: 3]
     |     |                          Branch (189:32): [Folded - Ignored]
  190|    1|                            fs::set_permissions(&dest, Permissions::from_mode(unix_mode))
  191|    1|                                .await
  192|    1|                                .err_tip(|| {
  193|    0|                                    format!(
  194|    0|                                        "Could not set unix mode in download_to_directory {dest}"
  195|     |                                    )
  196|    0|                                })?;
  197|    3|                        }
  198|    4|                        if let Some(mtime) = mtime {
     |     |                          Branch (198:32): [True: 1, False: 3]
     |     |                          Branch (198:32): [Folded - Ignored]
  199|    1|                            spawn_blocking!("download_to_directory_set_mtime", move || {
  200|    1|                                set_file_mtime(
  201|    1|                                    &dest,
  202|    1|                                    FileTime::from_unix_time(mtime.seconds, mtime.nanos as u32),
  203|     |                                )
  204|    1|                                .err_tip(|| {
  205|    0|                                    format!("Failed to set mtime in download_to_directory {dest}")
  206|    0|                                })
  207|    1|                            })
  208|    1|                            .await
  209|    1|                            .err_tip(
  210|     |                                || "Failed to launch spawn_blocking in download_to_directory",
  211|    0|                            )??;
  212|    3|                        }
  213|    4|                        Ok(())
  214|    8|                    })
  215|    4|                    .map_err(move |e| e.append(format!("for digest {digest}")))
  216|    4|                    .boxed(),
  217|     |            );
  218|     |        }
  219|     |
  220|   32|        for directory in directory.directories {
  221|    7|            let digest: DigestInfo = directory
  222|    7|                .digest
  223|    7|                .err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
  224|    7|                .try_into()
  225|    7|                .err_tip(|| "In Directory::file::digest")?;
  226|    7|            let new_directory_path = format!("{}/{}", current_directory, directory.name);
  227|    7|            futures.push(
  228|    7|                async move {
  229|    7|                    fs::create_dir(&new_directory_path)
  230|    7|                        .await
  231|    7|                        .err_tip(|| format!("Could not create directory {new_directory_path}"))?;
  232|    7|                    download_to_directory(
  233|    7|                        cas_store,
  234|    7|                        filesystem_store,
  235|    7|                        &digest,
  236|    7|                        &new_directory_path,
  237|    7|                    )
  238|    7|                    .await
  239|    7|                    .err_tip(|| format!("in download_to_directory : {new_directory_path}"))?;
  240|    7|                    Ok(())
  241|    7|                }
  242|    7|                .boxed(),
  243|     |            );
  244|     |        }
  245|     |
  246|     |        #[cfg(target_family = "unix")]
  247|   26|        for symlink_node in directory.symlinks {
  248|    1|            let dest = format!("{}/{}", current_directory, symlink_node.name);
  249|    1|            futures.push(
  250|    1|                async move {
  251|    1|                    fs::symlink(&symlink_node.target, &dest).await.err_tip(|| {
  252|    0|                        format!(
  253|    0|                            "Could not create symlink {} -> {}",
  254|     |                            symlink_node.target, dest
  255|     |                        )
  256|    0|                    })?;
  257|    1|                    Ok(())
  258|    1|                }
  259|    1|                .boxed(),
  260|     |            );
  261|     |        }
  262|     |
  263|   37|        while futures.try_next().await?.is_some() {}
     |     |          Branch (263:15): [True: 12, False: 25]
     |     |          Branch (263:15): [Folded - Ignored]
  264|   25|        Ok(())
  265|   25|    }
  266|   25|    .boxed()
  267|   25|}
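
As a usage sketch, a caller hands this function the worker's two stores, the input-root digest, and a destination path; the real call site is `prepare_action_inputs` just below. Everything named here is assumed to already exist in scope, and the path is hypothetical:

    // Minimal sketch: materialize an action's input root into a scratch dir.
    // `cas_store: FastSlowStore`, `filesystem_store: Arc<FilesystemStore>`, and
    // `input_root_digest: DigestInfo` are assumed to come from worker setup.
    download_to_directory(
        &cas_store,
        Pin::new(filesystem_store.as_ref()),
        &input_root_digest,
        "/tmp/nativelink_work",
    )
    .await?;
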
  268|     |
  269|     |/// Prepares action inputs by first trying the directory cache (if available),
  270|     |/// then falling back to traditional `download_to_directory`.
  271|     |///
  272|     |/// This provides a significant performance improvement for repeated builds
  273|     |/// with the same input directories.
  274|    0|pub async fn prepare_action_inputs(
  275|    0|    directory_cache: &Option<Arc<crate::directory_cache::DirectoryCache>>,
  276|    0|    cas_store: &FastSlowStore,
  277|    0|    filesystem_store: Pin<&FilesystemStore>,
  278|    0|    digest: &DigestInfo,
  279|    0|    work_directory: &str,
  280|   15|) -> Result<(), Error> {
  281|     |    // Try cache first if available
  282|   15|    if let Some(cache) = directory_cache {
     |     |      Branch (282:12): [Folded - Ignored]
     |     |      Branch (282:12): [Folded - Ignored]
     |     |      Branch (282:12): [True: 0, False: 15]
  283|    0|        match cache
  284|    0|            .get_or_create(*digest, Path::new(work_directory))
  285|    0|            .await
  286|     |        {
  287|    0|            Ok(cache_hit) => {
  288|    0|                trace!(
  289|     |                    ?digest,
  290|    0|                    work_directory, cache_hit, "Successfully prepared inputs via directory cache"
  291|     |                );
  292|    0|                return Ok(());
  293|     |            }
  294|    0|            Err(e) => {
  295|    0|                warn!(
  296|     |                    ?digest,
  297|     |                    ?e,
  298|    0|                    "Directory cache failed, falling back to traditional download"
  299|     |                );
  300|     |                // Fall through to traditional path
  301|     |            }
  302|     |        }
  303|   15|    }
  304|     |
  305|     |    // Traditional path (cache disabled or failed)
  306|   15|    download_to_directory(cas_store, filesystem_store, digest, work_directory).await
  307|   15|}
  308|     |
  309|     |#[cfg(target_family = "windows")]
  310|     |fn is_executable(_metadata: &std::fs::Metadata, full_path: &impl AsRef<Path>) -> bool {
  311|     |    static EXECUTABLE_EXTENSIONS: &[&str] = &["exe", "bat", "com"];
  312|     |    EXECUTABLE_EXTENSIONS
  313|     |        .iter()
  314|     |        .any(|ext| full_path.as_ref().extension().map_or(false, |v| v == *ext))
  315|     |}
  316|     |
  317|     |#[cfg(target_family = "unix")]
  318|    7|fn is_executable(metadata: &std::fs::Metadata, _full_path: &impl AsRef<Path>) -> bool {
  319|    7|    (metadata.mode() & 0o111) != 0
  320|    7|}
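
The unix variant just masks the three execute permission bits (owner, group, other). A quick sketch of the arithmetic it relies on:

    // 0o755 (rwxr-xr-x) keeps at least one execute bit; 0o644 (rw-r--r--) keeps none.
    assert_ne!(0o755 & 0o111, 0); // treated as executable
    assert_eq!(0o644 & 0o111, 0); // not executable
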
  321|     |
  322|     |type DigestUploader = Arc<tokio::sync::OnceCell<()>>;
  323|     |
  324|    7|async fn upload_file(
  325|    7|    cas_store: Pin<&impl StoreLike>,
  326|    7|    full_path: impl AsRef<Path> + Debug + Send + Sync,
  327|    7|    hasher: DigestHasherFunc,
  328|    7|    metadata: std::fs::Metadata,
  329|    7|    digest_uploaders: Arc<Mutex<HashMap<DigestInfo, DigestUploader>>>,
  330|    7|) -> Result<FileInfo, Error> {
  331|    7|    let is_executable = is_executable(&metadata, &full_path);
  332|    7|    let file_size = metadata.len();
  333|    7|    let file = fs::open_file(&full_path, 0, u64::MAX)
  334|    7|        .await
  335|    7|        .err_tip(|| format!("Could not open file {full_path:?}"))?;
  336|     |
  337|    7|    let (digest, mut file) = hasher
  338|    7|        .hasher()
  339|    7|        .digest_for_file(&full_path, file.into_inner(), Some(file_size))
  340|    7|        .await
  341|    7|        .err_tip(|| format!("Failed to hash file in digest_for_file failed for {full_path:?}"))?;
  342|     |
  343|    7|    let digest_uploader = match digest_uploaders.lock().entry(digest) {
  344|    0|        std::collections::hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
  345|    7|        std::collections::hash_map::Entry::Vacant(vacant_entry) => vacant_entry
  346|    7|            .insert(Arc::new(tokio::sync::OnceCell::new()))
  347|    7|            .clone(),
  348|     |    };
  349|     |
  350|     |    // Only upload a file with a given hash once.  The file may exist multiple
  351|     |    // times in the output with different names.
  352|    7|    digest_uploader
  353|   14|        .get_or_try_init(async || {
  354|     |            // Only upload if the digest doesn't already exist, this should be
  355|     |            // a much cheaper operation than an upload.
  356|    7|            let cas_store = cas_store.as_store_driver_pin();
  357|    7|            let store_key: nativelink_util::store_trait::StoreKey<'_> = digest.into();
  358|    7|            if cas_store
     |     |              Branch (358:16): [Folded - Ignored]
     |     |              Branch (358:16): [Folded - Ignored]
     |     |              Branch (358:16): [True: 1, False: 2]
     |     |              Branch (358:16): [True: 1, False: 3]
  359|    7|                .has(store_key.borrow())
  360|    7|                .await
  361|    7|                .is_ok_and(|result| result.is_some())
  362|     |            {
  363|    2|                return Ok(());
  364|    5|            }
  365|     |
  366|    5|            file.rewind().await.err_tip(|| "Could not rewind file")?;
  367|     |
  368|     |            // Note: For unknown reasons we appear to be hitting:
  369|     |            // https://github.com/rust-lang/rust/issues/92096
  370|     |            // or a similar issue if we try to use the non-store driver function, so we
  371|     |            // are using the store driver function here.
  372|    5|            let store_key_for_upload = store_key.clone();
  373|    5|            let upload_result = cas_store
  374|    5|                .update_with_whole_file(
  375|    5|                    store_key_for_upload,
  376|    5|                    full_path.as_ref().into(),
  377|    5|                    file,
  378|    5|                    UploadSizeInfo::ExactSize(digest.size_bytes()),
  379|    5|                )
  380|    5|                .await
  381|    5|                .map(|_slot| ());
  382|     |
  383|    5|            match upload_result {
  384|    5|                Ok(()) => Ok(()),
  385|    0|                Err(err) => {
  386|     |                    // Output uploads run concurrently and may overlap (e.g. a file is listed
  387|     |                    // both as an output file and inside an output directory). When another
  388|     |                    // upload has already moved the file into CAS, this update can fail with
  389|     |                    // NotFound even though the digest is now present. Per the RE spec, missing
  390|     |                    // outputs should be ignored, so treat this as success if the digest exists.
  391|    0|                    if err.code == Code::NotFound
     |     |                      Branch (391:24): [Folded - Ignored]
     |     |                      Branch (391:24): [Folded - Ignored]
     |     |                      Branch (391:24): [True: 0, False: 0]
     |     |                      Branch (391:24): [True: 0, False: 0]
  392|    0|                        && cas_store
     |     |                          Branch (392:28): [Folded - Ignored]
     |     |                          Branch (392:28): [Folded - Ignored]
     |     |                          Branch (392:28): [True: 0, False: 0]
     |     |                          Branch (392:28): [True: 0, False: 0]
  393|    0|                            .has(store_key.borrow())
  394|    0|                            .await
  395|    0|                            .is_ok_and(|result| result.is_some())
  396|     |                    {
  397|    0|                        Ok(())
  398|     |                    } else {
  399|    0|                        Err(err)
  400|     |                    }
  401|     |                }
  402|     |            }
  403|   14|        })
  404|    7|        .await
  405|    7|        .err_tip(|| format!("for {full_path:?}"))?;
  406|     |
  407|    7|    let name = full_path
  408|    7|        .as_ref()
  409|    7|        .file_name()
  410|    7|        .err_tip(|| format!("Expected file_name to exist on {full_path:?}"))?
  411|    7|        .to_str()
  412|    7|        .err_tip(|| {
  413|    0|            make_err!(
  414|    0|                Code::Internal,
  415|     |                "Could not convert {:?} to string",
  416|     |                full_path
  417|     |            )
  418|    0|        })?
  419|    7|        .to_string();
  420|     |
  421|    7|    Ok(FileInfo {
  422|    7|        name_or_path: NameOrPath::Name(name),
  423|    7|        digest,
  424|    7|        is_executable,
  425|    7|    })
  426|    7|}
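
The `digest_uploaders` map plus `tokio::sync::OnceCell` is what makes the "upload once" comment at lines 350-351 hold under concurrency: the first task to reach a digest runs the upload, and later tasks await the same cell. A self-contained sketch of that pattern with illustrative names (not the source's API):

    use std::collections::HashMap;
    use std::sync::Arc;

    use parking_lot::Mutex;
    use tokio::sync::OnceCell;

    // Many concurrent callers may ask to upload the same key; the work runs once.
    async fn upload_once(
        uploads: Arc<Mutex<HashMap<String, Arc<OnceCell<()>>>>>,
        key: String,
    ) -> Result<(), String> {
        // Briefly hold the map lock to fetch or create the per-key cell.
        let cell = uploads
            .lock()
            .entry(key.clone())
            .or_insert_with(|| Arc::new(OnceCell::new()))
            .clone();
        // Only the first caller runs the closure; the rest await its result.
        cell.get_or_try_init(|| async move {
            println!("uploading {key} exactly once");
            Ok(())
        })
        .await
        .map(|_| ())
    }
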
  427|     |
  428|    2|async fn upload_symlink(
  429|    2|    full_path: impl AsRef<Path> + Debug,
  430|    2|    full_work_directory_path: impl AsRef<Path>,
  431|    2|) -> Result<SymlinkInfo, Error> {
  432|    2|    let full_target_path = fs::read_link(full_path.as_ref())
  433|    2|        .await
  434|    2|        .err_tip(|| format!("Could not get read_link path of {full_path:?}"))?;
  435|     |
  436|     |    // Detect if our symlink is inside our work directory; if it is, find the
  437|     |    // relative path, otherwise use the absolute path.
  438|    2|    let target = if full_target_path.starts_with(full_work_directory_path.as_ref()) {
     |     |      Branch (438:21): [Folded - Ignored]
     |     |      Branch (438:21): [Folded - Ignored]
     |     |      Branch (438:21): [True: 0, False: 1]
     |     |      Branch (438:21): [True: 0, False: 1]
  439|    0|        let full_target_path = RelativePath::from_path(&full_target_path)
  440|    0|            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?;
  441|    0|        RelativePath::from_path(full_work_directory_path.as_ref())
  442|    0|            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?
  443|    0|            .relative(full_target_path)
  444|    0|            .normalize()
  445|    0|            .into_string()
  446|     |    } else {
  447|    2|        full_target_path
  448|    2|            .to_str()
  449|    2|            .err_tip(|| {
  450|    0|                make_err!(
  451|    0|                    Code::Internal,
  452|     |                    "Could not convert '{:?}' to string",
  453|     |                    full_target_path
  454|     |                )
  455|    0|            })?
  456|    2|            .to_string()
  457|     |    };
  458|     |
  459|    2|    let name = full_path
  460|    2|        .as_ref()
  461|    2|        .file_name()
  462|    2|        .err_tip(|| format!("Expected file_name to exist on {full_path:?}"))?
  463|    2|        .to_str()
  464|    2|        .err_tip(|| {
  465|    0|            make_err!(
  466|    0|                Code::Internal,
  467|     |                "Could not convert {:?} to string",
  468|     |                full_path
  469|     |            )
  470|    0|        })?
  471|    2|        .to_string();
  472|     |
  473|    2|    Ok(SymlinkInfo {
  474|    2|        name_or_path: NameOrPath::Name(name),
  475|    2|        target,
  476|    2|    })
  477|    2|}
  478|     |
  479|    3|fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
  480|    3|    cas_store: Pin<&'a impl StoreLike>,
  481|    3|    full_dir_path: P,
  482|    3|    full_work_directory: &'a str,
  483|    3|    hasher: DigestHasherFunc,
  484|    3|    digest_uploaders: Arc<Mutex<HashMap<DigestInfo, DigestUploader>>>,
  485|    3|) -> BoxFuture<'a, Result<(Directory, VecDeque<ProtoDirectory>), Error>> {
  486|    3|    Box::pin(async move {
  487|    3|        let file_futures = FuturesUnordered::new();
  488|    3|        let dir_futures = FuturesUnordered::new();
  489|    3|        let symlink_futures = FuturesUnordered::new();
  490|     |        {
  491|    3|            let (_permit, dir_handle) = fs::read_dir(&full_dir_path)
  492|    3|                .await
  493|    3|                .err_tip(|| format!("Error reading dir for reading {full_dir_path:?}"))?
  494|    3|                .into_inner();
  495|    3|            let mut dir_stream = ReadDirStream::new(dir_handle);
  496|     |            // Note: Try very hard to not leave file descriptors open. Try to keep them as short
  497|     |            // lived as possible. This is why we iterate the directory and then build a bunch of
  498|     |            // futures with all the work we are wanting to do then execute it. It allows us to
  499|     |            // close the directory iterator file descriptor, then open the child files/folders.
  500|    8|            while let Some(entry_result) = dir_stream.next().await {
     |     |              Branch (500:23): [Folded - Ignored]
     |     |              Branch (500:23): [Folded - Ignored]
     |     |              Branch (500:23): [True: 1, False: 1]
     |     |              Branch (500:23): [True: 4, False: 2]
  501|    5|                let entry = entry_result.err_tip(|| "Error while iterating directory")?;
  502|    5|                let file_type = entry
  503|    5|                    .file_type()
  504|    5|                    .await
  505|    5|                    .err_tip(|| format!("Error running file_type() on {entry:?}"))?;
  506|    5|                let full_path = full_dir_path.as_ref().join(entry.path());
  507|    5|                if file_type.is_dir() {
     |     |                  Branch (507:20): [Folded - Ignored]
     |     |                  Branch (507:20): [Folded - Ignored]
     |     |                  Branch (507:20): [True: 0, False: 1]
     |     |                  Branch (507:20): [True: 1, False: 3]
  508|    1|                    let full_dir_path = full_dir_path.clone();
  509|    1|                    dir_futures.push(
  510|    1|                        upload_directory(
  511|    1|                            cas_store,
  512|    1|                            full_path.clone(),
  513|    1|                            full_work_directory,
  514|    1|                            hasher,
  515|    1|                            digest_uploaders.clone(),
  516|     |                        )
  517|    1|                        .and_then(|(dir, all_dirs)| async move {
  518|    1|                            let directory_name = full_path
  519|    1|                                .file_name()
  520|    1|                                .err_tip(|| {
  521|    0|                                    format!("Expected file_name to exist on {full_dir_path:?}")
  522|    0|                                })?
  523|    1|                                .to_str()
  524|    1|                                .err_tip(|| {
  525|    0|                                    make_err!(
  526|    0|                                        Code::Internal,
  527|     |                                        "Could not convert {:?} to string",
  528|     |                                        full_dir_path
  529|     |                                    )
  530|    0|                                })?
  531|    1|                                .to_string();
  532|     |
  533|    1|                            let digest =
  534|    1|                                serialize_and_upload_message(&dir, cas_store, &mut hasher.hasher())
  535|    1|                                    .await
  536|    1|                                    .err_tip(|| format!("for {}", full_path.display()))?;
  537|     |
  538|    1|                            Result::<(DirectoryNode, VecDeque<Directory>), Error>::Ok((
  539|    1|                                DirectoryNode {
  540|    1|                                    name: directory_name,
  541|    1|                                    digest: Some(digest.into()),
  542|    1|                                },
  543|    1|                                all_dirs,
  544|    1|                            ))
  545|    2|                        })
  546|    1|                        .boxed(),
  547|     |                    );
  548|    4|                } else if file_type.is_file() {
     |     |                  Branch (548:27): [Folded - Ignored]
     |     |                  Branch (548:27): [Folded - Ignored]
     |     |                  Branch (548:27): [True: 0, False: 1]
     |     |                  Branch (548:27): [True: 3, False: 0]
  549|    3|                    let digest_uploaders = digest_uploaders.clone();
  550|    3|                    file_futures.push(async move {
  551|    3|                        let metadata = fs::metadata(&full_path)
  552|    3|                            .await
  553|    3|                            .err_tip(|| format!("Could not open file {}", full_path.display()))?;
  554|    3|                        upload_file(cas_store, &full_path, hasher, metadata, digest_uploaders)
  555|    3|                            .map_ok(TryInto::try_into)
  556|    3|                            .await?
  557|    3|                    });
  558|    1|                } else if file_type.is_symlink() {
     |     |                  Branch (558:27): [Folded - Ignored]
     |     |                  Branch (558:27): [Folded - Ignored]
     |     |                  Branch (558:27): [True: 1, False: 0]
     |     |                  Branch (558:27): [True: 0, False: 0]
  559|    1|                    symlink_futures.push(
  560|    1|                        upload_symlink(full_path, &full_work_directory)
  561|    1|                            .map(|symlink| symlink?.try_into()),
  562|     |                    );
  563|    0|                }
  564|     |            }
  565|     |        }
  566|     |
  567|    3|        let (mut file_nodes, dir_entries, mut symlinks) = try_join3(
  568|    3|            file_futures.try_collect::<Vec<FileNode>>(),
  569|    3|            dir_futures.try_collect::<Vec<(DirectoryNode, VecDeque<Directory>)>>(),
  570|    3|            symlink_futures.try_collect::<Vec<SymlinkNode>>(),
  571|    3|        )
  572|    3|        .await?;
  573|     |
  574|    3|        let mut directory_nodes = Vec::with_capacity(dir_entries.len());
  575|     |        // For efficiency we use a deque because it allows cheap concat of Vecs.
  576|     |        // We make the assumption here that when performance is important it is because
  577|     |        // our directory is quite large. This allows us to cheaply merge large amounts of
  578|     |        // directories into one VecDeque. Then after we are done we need to collapse it
  579|     |        // down into a single Vec.
  580|    3|        let mut all_child_directories = VecDeque::with_capacity(dir_entries.len());
  581|    4|        for (directory_node, mut recursive_child_directories) in dir_entries {
  582|    1|            directory_nodes.push(directory_node);
  583|    1|            all_child_directories.append(&mut recursive_child_directories);
  584|    1|        }
  585|     |
  586|    3|        file_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name));
  587|    3|        directory_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name));
  588|    3|        symlinks.sort_unstable_by(|a, b| a.name.cmp(&b.name));
  589|     |
  590|    3|        let directory = Directory {
  591|    3|            files: file_nodes,
  592|    3|            directories: directory_nodes,
  593|    3|            symlinks,
  594|    3|            node_properties: None, // We don't support file properties.
  595|    3|        };
  596|    3|        all_child_directories.push_back(directory.clone());
  597|     |
  598|    3|        Ok((directory, all_child_directories))
  599|    3|    })
  600|    3|}
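
The deque comment at lines 575-579 leans on `VecDeque::append`, which moves the entire other deque over rather than pushing element by element, so merging many recursive child lists stays cheap. A tiny illustration with arbitrary values:

    use std::collections::VecDeque;

    let mut merged: VecDeque<i32> = VecDeque::from(vec![1, 2]);
    let mut child: VecDeque<i32> = VecDeque::from(vec![3, 4]);
    // Moves all of `child` into `merged`, leaving `child` empty.
    merged.append(&mut child);
    assert_eq!(merged, VecDeque::from(vec![1, 2, 3, 4]));
    assert!(child.is_empty());
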
  601|     |
  602|    0|async fn process_side_channel_file(
  603|    0|    side_channel_file: Cow<'_, OsStr>,
  604|    0|    args: &[&OsStr],
  605|    0|    timeout: Duration,
  606|    0|) -> Result<Option<Error>, Error> {
  607|    0|    let mut json_contents = String::new();
  608|     |    {
  609|     |        // Note: Scoping `file_slot` allows the file_slot semaphore to be released faster.
  610|    0|        let mut file_slot = match fs::open_file(side_channel_file, 0, u64::MAX).await {
  611|    0|            Ok(file_slot) => file_slot,
  612|    0|            Err(e) => {
  613|    0|                if e.code != Code::NotFound {
     |     |                  Branch (613:20): [Folded - Ignored]
     |     |                  Branch (613:20): [Folded - Ignored]
     |     |                  Branch (613:20): [True: 0, False: 0]
  614|    0|                    return Err(e).err_tip(|| "Error opening side channel file");
  615|    0|                }
  616|     |                // Note: If file does not exist, it's ok. Users are not required to create this file.
  617|    0|                return Ok(None);
  618|     |            }
  619|     |        };
  620|    0|        file_slot
  621|    0|            .read_to_string(&mut json_contents)
  622|    0|            .await
  623|    0|            .err_tip(|| "Error reading side channel file")?;
  624|     |    }
  625|     |
  626|    0|    let side_channel_info: SideChannelInfo =
  627|    0|        serde_json5::from_str(&json_contents).map_err(|e| {
  628|    0|            make_input_err!(
  629|     |                "Could not convert contents of side channel file (json) to SideChannelInfo : {e:?}"
  630|     |            )
  631|    0|        })?;
  632|    0|    Ok(side_channel_info.failure.map(|failure| match failure {
  633|    0|        SideChannelFailureReason::Timeout => Error::new(
  634|    0|            Code::DeadlineExceeded,
  635|    0|            format!(
  636|    0|                "Command '{}' timed out after {} seconds",
  637|    0|                args.join(OsStr::new(" ")).to_string_lossy(),
  638|    0|                timeout.as_secs_f32()
  639|     |            ),
  640|     |        ),
  641|    0|    }))
  642|    0|}
  643|     |
  644|   17|async fn do_cleanup(
  645|   17|    running_actions_manager: &Arc<RunningActionsManagerImpl>,
  646|   17|    operation_id: &OperationId,
  647|   17|    action_directory: &str,
  648|   17|) -> Result<(), Error> {
  649|     |    // Mark this operation as being cleaned up
  650|   17|    let Some(_cleaning_guard) = running_actions_manager.perform_cleanup(operation_id.clone())
     |     |      Branch (650:9): [True: 0, False: 0]
     |     |      Branch (650:9): [Folded - Ignored]
     |     |      Branch (650:9): [True: 17, False: 0]
  651|     |    else {
  652|     |        // Cleanup is already happening elsewhere.
  653|    0|        return Ok(());
  654|     |    };
  655|     |
  656|   17|    debug!("Worker cleaning up");
  657|     |    // Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
  658|   17|    let remove_dir_result = fs::remove_dir_all(action_directory)
  659|   17|        .await
  660|   17|        .err_tip(|| format!("Could not remove working directory {action_directory}"));
  661|     |
  662|   17|    if let Err(err) = running_actions_manager.cleanup_action(operation_id) {
     |     |      Branch (662:12): [True: 0, False: 0]
     |     |      Branch (662:12): [Folded - Ignored]
     |     |      Branch (662:12): [True: 0, False: 17]
  663|    0|        error!(%operation_id, ?err, "Error cleaning up action");
  664|    0|        Result::<(), Error>::Err(err).merge(remove_dir_result)
  665|   17|    } else if let Err(err) = remove_dir_result {
     |     |      Branch (665:19): [True: 0, False: 0]
     |     |      Branch (665:19): [Folded - Ignored]
     |     |      Branch (665:19): [True: 0, False: 17]
  666|    0|        error!(%operation_id, ?err, "Error removing working directory");
  667|    0|        Err(err)
  668|     |    } else {
  669|   17|        Ok(())
  670|     |    }
  671|   17|}
  672|     |
  673|     |pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
  674|     |    /// Returns the action id of the action.
  675|     |    fn get_operation_id(&self) -> &OperationId;
  676|     |
  677|     |    /// Anything that needs to execute before the action is actually executed should happen here.
  678|     |    fn prepare_action(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
  679|     |
  680|     |    /// Actually perform the execution of the action.
  681|     |    fn execute(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
  682|     |
  683|     |    /// Any uploading, processing or analyzing of the results should happen here.
  684|     |    fn upload_results(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
  685|     |
  686|     |    /// Cleanup any residual files, handles or other junk resulting from running the action.
  687|     |    fn cleanup(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
  688|     |
  689|     |    /// Returns the final result. As a general rule this action should be thought of as
  690|     |    /// a consumption of `self`, meaning once a return happens here the lifetime of `Self`
  691|     |    /// is over and any action performed on it after this call is undefined behavior.
  692|     |    fn get_finished_result(
  693|     |        self: Arc<Self>,
  694|     |    ) -> impl Future<Output = Result<ActionResult, Error>> + Send;
  695|     |
  696|     |    /// Returns the work directory of the action.
  697|     |    fn get_work_directory(&self) -> &String;
  698|     |}
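
Because every stage hands back `Arc<Self>`, the trait composes into a straight pipeline. A hedged sketch of a driver (simplified; the actual manager also handles kills, error merging, and cleanup-on-failure, none of which is shown here):

    // Hypothetical end-to-end driver over any RunningAction implementation.
    async fn run_to_completion<A: RunningAction>(action: Arc<A>) -> Result<ActionResult, Error> {
        let action = action.prepare_action().await?;
        let action = action.execute().await?;
        let action = action.upload_results().await?;
        let action = action.cleanup().await?;
        // Consumes the action; nothing should touch it afterwards.
        action.get_finished_result().await
    }
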
  699|     |
  700|     |#[derive(Debug)]
  701|     |struct RunningActionImplExecutionResult {
  702|     |    stdout: Bytes,
  703|     |    stderr: Bytes,
  704|     |    exit_code: i32,
  705|     |}
  706|     |
  707|     |#[derive(Debug)]
  708|     |struct RunningActionImplState {
  709|     |    command_proto: Option<ProtoCommand>,
  710|     |    // TODO(palfrey) Kill is not implemented yet, but is instrumented.
  711|     |    // However, it is used if the worker disconnects to destroy current jobs.
  712|     |    kill_channel_tx: Option<oneshot::Sender<()>>,
  713|     |    kill_channel_rx: Option<oneshot::Receiver<()>>,
  714|     |    execution_result: Option<RunningActionImplExecutionResult>,
  715|     |    action_result: Option<ActionResult>,
  716|     |    execution_metadata: ExecutionMetadata,
  717|     |    // If there was an internal error, this will be set.
  718|     |    // This should NOT be set if everything was fine, but the process had a
  719|     |    // non-zero exit code. Instead this should be used for internal errors
  720|     |    // that prevented the action from running, upload failures, timeouts, etc.,
  721|     |    // but we have (or could have) the action results (like stderr/stdout).
  722|     |    error: Option<Error>,
  723|     |}
  724|     |
  725|     |#[derive(Debug)]
  726|     |pub struct RunningActionImpl {
  727|     |    operation_id: OperationId,
  728|     |    action_directory: String,
  729|     |    work_directory: String,
  730|     |    action_info: ActionInfo,
  731|     |    timeout: Duration,
  732|     |    running_actions_manager: Arc<RunningActionsManagerImpl>,
  733|     |    state: Mutex<RunningActionImplState>,
  734|     |    has_manager_entry: AtomicBool,
  735|     |    did_cleanup: AtomicBool,
  736|     |}
  737|     |
  738|     |impl RunningActionImpl {
  739|   18|    pub fn new(
  740|   18|        execution_metadata: ExecutionMetadata,
  741|   18|        operation_id: OperationId,
  742|   18|        action_directory: String,
  743|   18|        action_info: ActionInfo,
  744|   18|        timeout: Duration,
  745|   18|        running_actions_manager: Arc<RunningActionsManagerImpl>,
  746|   18|    ) -> Self {
  747|   18|        let work_directory = format!("{}/{}", action_directory, "work");
  748|   18|        let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
  749|   18|        Self {
  750|   18|            operation_id,
  751|   18|            action_directory,
  752|   18|            work_directory,
  753|   18|            action_info,
  754|   18|            timeout,
  755|   18|            running_actions_manager,
  756|   18|            state: Mutex::new(RunningActionImplState {
  757|   18|                command_proto: None,
  758|   18|                kill_channel_rx: Some(kill_channel_rx),
  759|   18|                kill_channel_tx: Some(kill_channel_tx),
  760|   18|                execution_result: None,
  761|   18|                action_result: None,
  762|   18|                execution_metadata,
  763|   18|                error: None,
  764|   18|            }),
  765|   18|            // Always need to ensure that we're removed from the manager on Drop.
  766|   18|            has_manager_entry: AtomicBool::new(true),
  767|   18|            // Only needs to be cleaned up after a prepare_action call, set there.
  768|   18|            did_cleanup: AtomicBool::new(true),
  769|   18|        }
  770|   18|    }
  771|     |
  772|     |    #[allow(
  773|     |        clippy::missing_const_for_fn,
  774|     |        reason = "False positive on stable, but not on nightly"
  775|     |    )]
  776|    0|    fn metrics(&self) -> &Arc<Metrics> {
  777|    0|        &self.running_actions_manager.metrics
  778|    0|    }
  779|     |
  780|     |    /// Prepares any actions needed to execute this action. This function will do the following:
  781|     |    ///
  782|     |    /// * Download any files needed to execute the action
  783|     |    /// * Build a folder with all files needed to execute the action.
  784|     |    ///
  785|     |    /// This function will aggressively download and spawn potentially thousands of futures. It is
  786|     |    /// up to the stores to rate limit if needed.
  787|   15|    async fn inner_prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
  788|   15|        {
  789|   15|            let mut state = self.state.lock();
  790|   15|            state.execution_metadata.input_fetch_start_timestamp =
  791|   15|                (self.running_actions_manager.callbacks.now_fn)();
  792|   15|        }
  793|   15|        let command = {
  794|     |            // Download and build out our input files/folders. Also fetch and decode our Command.
  795|   15|            let command_fut = self.metrics().get_proto_command_from_store.wrap(async {
  796|   15|                get_and_decode_digest::<ProtoCommand>(
  797|   15|                    self.running_actions_manager.cas_store.as_ref(),
  798|   15|                    self.action_info.command_digest.into(),
  799|   15|                )
  800|   15|                .await
  801|   15|                .err_tip(|| "Converting command_digest to Command")
  802|   15|            });
  803|   15|            let filesystem_store_pin =
  804|   15|                Pin::new(self.running_actions_manager.filesystem_store.as_ref());
  805|   15|            let (command, ()) = try_join(command_fut, async {
  806|   15|                fs::create_dir(&self.work_directory)
  807|   15|                    .await
  808|   15|                    .err_tip(|| format!("Error creating work directory {}", self.work_directory))?;
  809|     |                // Now the work directory has been created, we have to clean up.
  810|   15|                self.did_cleanup.store(false, Ordering::Release);
  811|     |                // Download the input files/folder and place them into the temp directory.
  812|     |                // Use directory cache if available for better performance.
  813|   15|                self.metrics()
  814|   15|                    .download_to_directory
  815|   15|                    .wrap(prepare_action_inputs(
  816|   15|                        &self.running_actions_manager.directory_cache,
  817|   15|                        &self.running_actions_manager.cas_store,
  818|   15|                        filesystem_store_pin,
  819|   15|                        &self.action_info.input_root_digest,
  820|   15|                        &self.work_directory,
  821|   15|                    ))
  822|   15|                    .await
  823|   15|            })
  824|   15|            .await?;
  825|   15|            command
  826|     |        };
  827|     |        {
  828|     |            // Create all directories needed for our output paths. This is required by the bazel spec.
  829|   15|            let prepare_output_directories = |output_file| {
  830|    9|                let full_output_path = if command.working_directory.is_empty() {
     |     |                  Branch (830:43): [Folded - Ignored]
     |     |                  Branch (830:43): [Folded - Ignored]
     |     |                  Branch (830:43): [True: 1, False: 8]
  831|    1|                    format!("{}/{}", self.work_directory, output_file)
  832|     |                } else {
  833|    8|                    format!(
  834|    8|                        "{}/{}/{}",
  835|    8|                        self.work_directory, command.working_directory, output_file
  836|     |                    )
  837|     |                };
  838|    9|                async move {
  839|    9|                    let full_parent_path = Path::new(&full_output_path)
  840|    9|                        .parent()
  841|    9|                        .err_tip(|| format!("Parent path for {full_output_path} has no parent"))?;
  842|    9|                    fs::create_dir_all(full_parent_path).await.err_tip(|| {
  843|    0|                        format!(
  844|    0|                            "Error creating output directory {} (file)",
  845|    0|                            full_parent_path.display()
  846|     |                        )
  847|    0|                    })?;
  848|    9|                    Result::<(), Error>::Ok(())
  849|    9|                }
  850|    9|            };
  851|   15|            self.metrics()
  852|   15|                .prepare_output_files
  853|   15|                .wrap(try_join_all(
  854|   15|                    command.output_files.iter().map(prepare_output_directories),
  855|   15|                ))
  856|   15|                .await?;
  857|   15|            self.metrics()
  858|   15|                .prepare_output_paths
  859|   15|                .wrap(try_join_all(
  860|   15|                    command.output_paths.iter().map(prepare_output_directories),
  861|   15|                ))
  862|   15|                .await?;
  863|     |        }
  864|   15|        debug!(?command, "Worker received command");
  865|   15|        {
  866|   15|            let mut state = self.state.lock();
  867|   15|            state.command_proto = Some(command);
  868|   15|            state.execution_metadata.input_fetch_completed_timestamp =
  869|   15|                (self.running_actions_manager.callbacks.now_fn)();
  870|   15|        }
  871|   15|        Ok(self)
  872|   15|    }
873
874
13
    async fn inner_execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
875
13
        let (command_proto, mut kill_channel_rx) = {
876
13
            let mut state = self.state.lock();
877
13
            state.execution_metadata.execution_start_timestamp =
878
13
                (self.running_actions_manager.callbacks.now_fn)();
879
            (
880
13
                state
881
13
                    .command_proto
882
13
                    .take()
883
13
                    .err_tip(|| "Expected state to have command_proto in execute()")
?0
,
884
13
                state
885
13
                    .kill_channel_rx
886
13
                    .take()
887
13
                    .err_tip(|| "Expected state to have kill_channel_rx in execute()")
?0
888
                    // This is important as we may be killed at any point.
889
13
                    .fuse(),
890
            )
891
        };
892
13
        if command_proto.arguments.is_empty() {
  Branch (892:12): [Folded - Ignored]
  Branch (892:12): [Folded - Ignored]
  Branch (892:12): [True: 0, False: 13]
893
0
            return Err(make_input_err!("No arguments provided in Command proto"));
894
13
        }
895
13
        let args: Vec<&OsStr> = if let Some(
entrypoint0
) = &self
  Branch (895:40): [Folded - Ignored]
  Branch (895:40): [Folded - Ignored]
  Branch (895:40): [True: 0, False: 13]
896
13
            .running_actions_manager
897
13
            .execution_configuration
898
13
            .entrypoint
899
        {
900
0
            core::iter::once(entrypoint.as_ref())
901
0
                .chain(command_proto.arguments.iter().map(AsRef::as_ref))
902
0
                .collect()
903
        } else {
904
13
            command_proto.arguments.iter().map(AsRef::as_ref).collect()
905
        };
906
        // TODO(palfrey): This should probably be in debug, but currently
907
        //                    that's too busy and we often rely on this to
908
        //                    figure out toolchain misconfiguration issues.
909
        //                    De-bloat the `debug` level by using the `trace`
910
        //                    level more effectively and adjust this.
911
13
        info!(?args, "Executing command",);
912
13
        let mut command_builder = process::Command::new(args[0]);
913
13
        command_builder
914
13
            .args(&args[1..])
915
13
            .kill_on_drop(true)
916
13
            .stdin(Stdio::null())
917
13
            .stdout(Stdio::piped())
918
13
            .stderr(Stdio::piped())
919
13
            .current_dir(format!(
920
13
                "{}/{}",
921
13
                self.work_directory, command_proto.working_directory
922
13
            ))
923
13
            .env_clear();
924
925
13
        let requested_timeout = if self.action_info.timeout.is_zero() {
  Branch (925:36): [Folded - Ignored]
  Branch (925:36): [Folded - Ignored]
  Branch (925:36): [True: 11, False: 2]
926
11
            self.running_actions_manager.max_action_timeout
927
        } else {
928
2
            self.action_info.timeout
929
        };
930
931
13
        let mut maybe_side_channel_file: Option<Cow<'_, OsStr>> = None;
932
13
        if let Some(
additional_environment0
) = &self
  Branch (932:16): [Folded - Ignored]
  Branch (932:16): [Folded - Ignored]
  Branch (932:16): [True: 0, False: 13]
933
13
            .running_actions_manager
934
13
            .execution_configuration
935
13
            .additional_environment
936
        {
937
0
            for (name, source) in additional_environment {
938
0
                let value = match source {
939
0
                    EnvironmentSource::Property(property) => self
940
0
                        .action_info
941
0
                        .platform_properties
942
0
                        .get(property)
943
0
                        .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.as_str())),
944
0
                    EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
945
                    EnvironmentSource::TimeoutMillis => {
946
0
                        Cow::Owned(requested_timeout.as_millis().to_string())
947
                    }
948
                    EnvironmentSource::SideChannelFile => {
949
0
                        let file_cow =
950
0
                            format!("{}/{}", self.action_directory, Uuid::new_v4().simple());
951
0
                        maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));
952
0
                        Cow::Owned(file_cow)
953
                    }
954
                    EnvironmentSource::ActionDirectory => {
955
0
                        Cow::Borrowed(self.action_directory.as_str())
956
                    }
957
                };
958
0
                command_builder.env(name, value.as_ref());
959
            }
960
13
        }
961
962
        #[cfg(target_family = "unix")]
963
13
        let envs = &command_proto.environment_variables;
964
        // If SystemRoot is not set on windows we set it to default. Failing to do
965
        // this causes all commands to fail.
966
        #[cfg(target_family = "windows")]
967
        let envs = {
968
            let mut envs = command_proto.environment_variables.clone();
969
            if !envs.iter().any(|v| v.name.to_uppercase() == "SYSTEMROOT") {
970
                envs.push(
971
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
972
                        name: "SystemRoot".to_string(),
973
                        value: "C:\\Windows".to_string(),
974
                    },
975
                );
976
            }
977
            if !envs.iter().any(|v| v.name.to_uppercase() == "PATH") {
978
                envs.push(
979
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
980
                        name: "PATH".to_string(),
981
                        value: "C:\\Windows\\System32".to_string(),
982
                    },
983
                );
984
            }
985
            envs
986
        };
987
26
        for 
environment_variable13
in envs {
988
13
            command_builder.env(&environment_variable.name, &environment_variable.value);
989
13
        }
990
991
13
        let mut child_process = command_builder
992
13
            .spawn()
993
13
            .err_tip(|| format!(
"Could not execute command {args:?}"0
))
?0
;
994
13
        let mut stdout_reader = child_process
995
13
            .stdout
996
13
            .take()
997
13
            .err_tip(|| "Expected stdout to exist on command this should never happen")
?0
;
998
13
        let mut stderr_reader = child_process
999
13
            .stderr
1000
13
            .take()
1001
13
            .err_tip(|| "Expected stderr to exist on command this should never happen")
?0
;
1002
1003
13
        let mut child_process_guard = guard(child_process, |mut child_process| 
{0
1004
0
            let result: Result<Option<std::process::ExitStatus>, std::io::Error> =
1005
0
                child_process.try_wait();
1006
0
            match result {
1007
0
                Ok(res) if res.is_some() => {
  Branch (1007:28): [Folded - Ignored]
  Branch (1007:28): [Folded - Ignored]
  Branch (1007:28): [True: 0, False: 0]
1008
0
                    // The child already exited, probably a timeout or kill operation
1009
0
                }
1010
0
                result => {
1011
0
                    error!(
1012
                        ?result,
1013
0
                        "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
1014
                    );
1015
0
                    background_spawn!("running_actions_manager_kill_child_process", async move {
1016
0
                        child_process.kill().await
1017
0
                    });
1018
                }
1019
            }
1020
0
        });
1021
1022
13
        let all_stdout_fut = spawn!("stdout_reader", async move {
1023
13
            let mut all_stdout = BytesMut::new();
1024
            loop {
1025
16
                let sz = stdout_reader
1026
16
                    .read_buf(&mut all_stdout)
1027
16
                    .await
1028
16
                    .err_tip(|| "Error reading stdout stream")
?0
;
1029
16
                if sz == 0 {
  Branch (1029:20): [Folded - Ignored]
  Branch (1029:20): [Folded - Ignored]
  Branch (1029:20): [True: 13, False: 3]
1030
13
                    break; // EOF.
1031
3
                }
1032
            }
1033
13
            Result::<Bytes, Error>::Ok(all_stdout.freeze())
1034
13
        });
1035
13
        let all_stderr_fut = spawn!("stderr_reader", async move {
1036
13
            let mut all_stderr = BytesMut::new();
1037
            loop {
1038
16
                let sz = stderr_reader
1039
16
                    .read_buf(&mut all_stderr)
1040
16
                    .await
1041
16
                    .err_tip(|| "Error reading stderr stream")
?0
;
1042
16
                if sz == 0 {
  Branch (1042:20): [Folded - Ignored]
  Branch (1042:20): [Folded - Ignored]
  Branch (1042:20): [True: 13, False: 3]
1043
13
                    break; // EOF.
1044
3
                }
1045
            }
1046
13
            Result::<Bytes, Error>::Ok(all_stderr.freeze())
1047
13
        });
1048
13
        let mut killed_action = false;
1049
1050
13
        let timer = self.metrics().child_process.begin_timer();
1051
13
        let mut sleep_fut = (self.running_actions_manager.callbacks.sleep_fn)(self.timeout).fuse();
1052
        loop {
1053
17
            tokio::select! {
1054
17
                () = &mut sleep_fut => {
1055
2
                    self.running_actions_manager.metrics.task_timeouts.inc();
1056
2
                    killed_action = true;
1057
2
                    if let Err(
err0
) = child_process_guard.kill().await {
  Branch (1057:28): [Folded - Ignored]
  Branch (1057:28): [Folded - Ignored]
  Branch (1057:28): [True: 0, False: 2]
1058
0
                        error!(
1059
                            ?err,
1060
0
                            "Could not kill process in RunningActionsManager for action timeout",
1061
                        );
1062
2
                    }
1063
                    {
1064
2
                        let joined_command = args.join(OsStr::new(" "));
1065
2
                        let command = joined_command.to_string_lossy();
1066
2
                        info!(
1067
2
                            seconds = self.action_info.timeout.as_secs_f32(),
1068
                            %command,
1069
2
                            "Command timed out"
1070
                        );
1071
2
                        let mut state = self.state.lock();
1072
2
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
1073
2
                            Code::DeadlineExceeded,
1074
2
                            format!(
1075
2
                                "Command '{}' timed out after {} seconds",
1076
2
                                command,
1077
2
                                self.action_info.timeout.as_secs_f32()
1078
2
                            )
1079
2
                        )));
1080
                    }
1081
                },
1082
17
                
maybe_exit_status13
= child_process_guard.wait() => {
1083
                    // Defuse our guard so it does not try to cleanup and make senseless logs.
1084
13
                    drop(ScopeGuard::<_, _>::into_inner(child_process_guard));
1085
13
                    let exit_status = maybe_exit_status.err_tip(|| "Failed to collect exit code of process")
?0
;
1086
                    // TODO(palfrey) We should implement stderr/stdout streaming to client here.
1087
                    // If we get killed before the stream is started, then these will lock up.
1088
                    // TODO(palfrey) There is a significant bug here. If we kill the action and the action creates
1089
                    // child processes, it can create zombies. See: https://github.com/tracemachina/nativelink/issues/225
1090
13
                    let (stdout, stderr) = if killed_action {
  Branch (1090:47): [Folded - Ignored]
  Branch (1090:47): [Folded - Ignored]
  Branch (1090:47): [True: 4, False: 9]
1091
4
                        drop(timer);
1092
4
                        (Bytes::new(), Bytes::new())
1093
                    } else {
1094
9
                        timer.measure();
1095
9
                        let (maybe_all_stdout, maybe_all_stderr) = tokio::join!(all_stdout_fut, all_stderr_fut);
1096
                        (
1097
9
                            maybe_all_stdout.err_tip(|| "Internal error reading from stdout of worker task")??,
1098
9
                            maybe_all_stderr.err_tip(|| "Internal error reading from stderr of worker task")??
1099
                        )
1100
                    };
1101
1102
13
                    let exit_code = exit_status.code().map_or(EXIT_CODE_FOR_SIGNAL, |exit_code| {
1103
9
                        if exit_code == 0 {
  Branch (1103:28): [Folded - Ignored]
  Branch (1103:28): [Folded - Ignored]
  Branch (1103:28): [True: 8, False: 1]
1104
8
                            self.metrics().child_process_success_error_code.inc();
1105
8
                        } else {
1106
1
                            self.metrics().child_process_failure_error_code.inc();
1107
1
                        }
1108
9
                        exit_code
1109
9
                    });
1110
1111
13
                    info!(?args, "Command complete");
1112
1113
13
                    let maybe_error_override = if let Some(side_channel_file) = maybe_side_channel_file {
  Branch (1113:55): [Folded - Ignored]
  Branch (1113:55): [Folded - Ignored]
  Branch (1113:55): [True: 0, False: 13]
1114
0
                        process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await
1115
0
                        .err_tip(|| format!("Error processing side channel file: {}", side_channel_file.display()))?
1116
                    } else {
1117
13
                        None
1118
                    };
1119
13
                    {
1120
13
                        let mut state = self.state.lock();
1121
13
                        state.error = Error::merge_option(state.error.take(), maybe_error_override);
1122
13
1123
13
                        state.command_proto = Some(command_proto);
1124
13
                        state.execution_result = Some(RunningActionImplExecutionResult{
1125
13
                            stdout,
1126
13
                            stderr,
1127
13
                            exit_code,
1128
13
                        });
1129
13
                        state.execution_metadata.execution_completed_timestamp = (self.running_actions_manager.callbacks.now_fn)();
1130
13
                    }
1131
13
                    return Ok(self);
1132
                },
1133
17
                _ = &mut kill_channel_rx => {
1134
2
                    killed_action = true;
1135
2
                    if let Err(err) = child_process_guard.kill().await {
  Branch (1135:28): [Folded - Ignored]
  Branch (1135:28): [Folded - Ignored]
  Branch (1135:28): [True: 0, False: 2]
1136
0
                        error!(
1137
0
                            operation_id = ?self.operation_id,
1138
                            ?err,
1139
0
                            "Could not kill process",
1140
                        );
1141
2
                    }
1142
2
                    {
1143
2
                        let mut state = self.state.lock();
1144
2
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
1145
2
                            Code::Aborted,
1146
2
                            format!(
1147
2
                                "Command '{}' was killed by scheduler",
1148
2
                                args.join(OsStr::new(" ")).to_string_lossy()
1149
2
                            )
1150
2
                        )));
1151
2
                    }
1152
                },
1153
            }
1154
        }
1155
        // Unreachable.
1156
13
    }
1157
1158
11
    async fn inner_upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1159
        enum OutputType {
1160
            None,
1161
            File(FileInfo),
1162
            Directory(DirectoryInfo),
1163
            FileSymlink(SymlinkInfo),
1164
            DirectorySymlink(SymlinkInfo),
1165
        }
1166
1167
11
        debug!("Worker uploading results",);
1168
11
        let (mut command_proto, execution_result, mut execution_metadata) = {
1169
11
            let mut state = self.state.lock();
1170
11
            state.execution_metadata.output_upload_start_timestamp =
1171
11
                (self.running_actions_manager.callbacks.now_fn)();
1172
            (
1173
11
                state
1174
11
                    .command_proto
1175
11
                    .take()
1176
11
                    .err_tip(|| "Expected state to have command_proto in execute()")
?0
,
1177
11
                state
1178
11
                    .execution_result
1179
11
                    .take()
1180
11
                    .err_tip(|| "Execution result does not exist at upload_results stage")
?0
,
1181
11
                state.execution_metadata.clone(),
1182
            )
1183
        };
1184
11
        let cas_store = self.running_actions_manager.cas_store.as_ref();
1185
11
        let hasher = self.action_info.unique_qualifier.digest_function();
1186
1187
11
        let mut output_path_futures = FuturesUnordered::new();
1188
11
        let mut output_paths = command_proto.output_paths;
1189
11
        if output_paths.is_empty() {
  Branch (1189:12): [Folded - Ignored]
  Branch (1189:12): [Folded - Ignored]
  Branch (1189:12): [True: 6, False: 5]
1190
6
            output_paths
1191
6
                .reserve(command_proto.output_files.len() + command_proto.output_directories.len());
1192
6
            output_paths.append(&mut command_proto.output_files);
1193
6
            output_paths.append(&mut command_proto.output_directories);
1194
6
        }
1195
11
        let digest_uploaders = Arc::new(Mutex::new(HashMap::new()));
1196
18
        for entry in output_paths {
1197
7
            let full_path = OsString::from(if command_proto.working_directory.is_empty() {
  Branch (1197:47): [Folded - Ignored]
  Branch (1197:47): [Folded - Ignored]
  Branch (1197:47): [True: 0, False: 7]
1198
0
                format!("{}/{}", self.work_directory, entry)
1199
            } else {
1200
7
                format!(
1201
7
                    "{}/{}/{}",
1202
7
                    self.work_directory, command_proto.working_directory, entry
1203
                )
1204
            });
1205
7
            let work_directory = &self.work_directory;
1206
7
            let digest_uploaders = digest_uploaders.clone();
1207
7
            output_path_futures.push(async move {
1208
3
                let metadata = {
1209
7
                    let metadata = match fs::symlink_metadata(&full_path).await {
1210
7
                        Ok(file) => file,
1211
0
                        Err(e) => {
1212
0
                            if e.code == Code::NotFound {
  Branch (1212:32): [Folded - Ignored]
  Branch (1212:32): [Folded - Ignored]
  Branch (1212:32): [True: 0, False: 0]
1213
                                // In the event our output does not exist, according to the bazel remote
1214
                                // execution spec, we simply ignore it and continue.
1215
0
                                return Result::<OutputType, Error>::Ok(OutputType::None);
1216
0
                            }
1217
0
                            return Err(e).err_tip(|| {
1218
0
                                format!("Could not open file {}", full_path.display())
1219
0
                            });
1220
                        }
1221
                    };
1222
1223
7
                    if metadata.is_file() {
  Branch (1223:24): [Folded - Ignored]
  Branch (1223:24): [Folded - Ignored]
  Branch (1223:24): [True: 4, False: 3]
1224
                        return Ok(OutputType::File(
1225
4
                            upload_file(
1226
4
                                cas_store.as_pin(),
1227
4
                                &full_path,
1228
4
                                hasher,
1229
4
                                metadata,
1230
4
                                digest_uploaders,
1231
4
                            )
1232
4
                            .await
1233
4
                            .map(|mut file_info| {
1234
4
                                file_info.name_or_path = NameOrPath::Path(entry);
1235
4
                                file_info
1236
4
                            })
1237
4
                            .err_tip(|| format!("Uploading file {}", full_path.display()))?,
1238
                        ));
1239
3
                    }
1240
3
                    metadata
1241
                };
1242
3
                if metadata.is_dir() {
  Branch (1242:20): [Folded - Ignored]
  Branch (1242:20): [Folded - Ignored]
  Branch (1242:20): [True: 2, False: 1]
1243
                    Ok(OutputType::Directory(
1244
2
                        upload_directory(
1245
2
                            cas_store.as_pin(),
1246
2
                            &full_path,
1247
2
                            work_directory,
1248
2
                            hasher,
1249
2
                            digest_uploaders,
1250
                        )
1251
2
                        .and_then(|(root_dir, children)| async move {
1252
2
                            let tree = ProtoTree {
1253
2
                                root: Some(root_dir),
1254
2
                                children: children.into(),
1255
2
                            };
1256
2
                            let tree_digest = serialize_and_upload_message(
1257
2
                                &tree,
1258
2
                                cas_store.as_pin(),
1259
2
                                &mut hasher.hasher(),
1260
2
                            )
1261
2
                            .await
1262
2
                            .err_tip(|| format!("While processing {entry}"))?;
1263
2
                            Ok(DirectoryInfo {
1264
2
                                path: entry,
1265
2
                                tree_digest,
1266
2
                            })
1267
4
                        })
1268
2
                        .await
1269
2
                        .err_tip(|| format!("Uploading directory {}", full_path.display()))?,
1270
                    ))
1271
1
                } else if metadata.is_symlink() {
  Branch (1271:27): [Folded - Ignored]
  Branch (1271:27): [Folded - Ignored]
  Branch (1271:27): [True: 1, False: 0]
1272
1
                    let output_symlink = upload_symlink(&full_path, work_directory)
1273
1
                        .await
1274
1
                        .map(|mut symlink_info| {
1275
1
                            symlink_info.name_or_path = NameOrPath::Path(entry);
1276
1
                            symlink_info
1277
1
                        })
1278
1
                        .err_tip(|| format!("Uploading symlink {}", full_path.display()))?;
1279
1
                    match fs::metadata(&full_path).await {
1280
1
                        Ok(metadata) => {
1281
1
                            if metadata.is_dir() {
  Branch (1281:32): [Folded - Ignored]
  Branch (1281:32): [Folded - Ignored]
  Branch (1281:32): [True: 0, False: 1]
1282
0
                                Ok(OutputType::DirectorySymlink(output_symlink))
1283
                            } else {
1284
                                // Note: If it's anything but directory we put it as a file symlink.
1285
1
                                Ok(OutputType::FileSymlink(output_symlink))
1286
                            }
1287
                        }
1288
0
                        Err(e) => {
1289
0
                            if e.code != Code::NotFound {
  Branch (1289:32): [Folded - Ignored]
  Branch (1289:32): [Folded - Ignored]
  Branch (1289:32): [True: 0, False: 0]
1290
0
                                return Err(e).err_tip(|| {
1291
0
                                    format!(
1292
0
                                        "While querying target symlink metadata for {}",
1293
0
                                        full_path.display()
1294
                                    )
1295
0
                                });
1296
0
                            }
1297
                            // If the file doesn't exist, we consider it a file. Even though the
1298
                            // file doesn't exist we still need to populate an entry.
1299
0
                            Ok(OutputType::FileSymlink(output_symlink))
1300
                        }
1301
                    }
1302
                } else {
1303
0
                    Err(make_err!(
1304
0
                        Code::Internal,
1305
0
                        "{full_path:?} was not a file, folder or symlink. Must be one.",
1306
0
                    ))
1307
                }
1308
7
            });
1309
        }
1310
11
        let mut output_files = vec![];
1311
11
        let mut output_folders = vec![];
1312
11
        let mut output_directory_symlinks = vec![];
1313
11
        let mut output_file_symlinks = vec![];
1314
1315
11
        if execution_result.exit_code != 0 {
  Branch (1315:12): [Folded - Ignored]
  Branch (1315:12): [Folded - Ignored]
  Branch (1315:12): [True: 5, False: 6]
1316
5
            let stdout = core::str::from_utf8(&execution_result.stdout).unwrap_or("<no-utf8>");
1317
5
            let stderr = core::str::from_utf8(&execution_result.stderr).unwrap_or("<no-utf8>");
1318
5
            error!(
1319
                exit_code = ?execution_result.exit_code,
1320
5
                stdout = ?stdout[..min(stdout.len(), 1000)],
1321
5
                stderr = ?stderr[..min(stderr.len(), 1000)],
1322
5
                "Command returned non-zero exit code",
1323
            );
1324
6
        }
1325
1326
11
        let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
1327
11
            let data = execution_result.stdout;
1328
11
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1329
11
            cas_store
1330
11
                .update_oneshot(digest, data)
1331
11
                .await
1332
11
                .err_tip(|| "Uploading stdout")
?0
;
1333
11
            Result::<DigestInfo, Error>::Ok(digest)
1334
11
        });
1335
11
        let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
1336
11
            let data = execution_result.stderr;
1337
11
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1338
11
            cas_store
1339
11
                .update_oneshot(digest, data)
1340
11
                .await
1341
11
                .err_tip(|| "Uploading stdout")
?0
;
1342
11
            Result::<DigestInfo, Error>::Ok(digest)
1343
11
        });
1344
1345
11
        let upload_result = futures::try_join!(stdout_digest_fut, stderr_digest_fut, async {
1346
18
            while let Some(output_type) = output_path_futures.try_next().await? {
  Branch (1346:23): [Folded - Ignored]
  Branch (1346:23): [Folded - Ignored]
  Branch (1346:23): [True: 7, False: 11]
1347
7
                match output_type {
1348
4
                    OutputType::File(output_file) => output_files.push(output_file),
1349
2
                    OutputType::Directory(output_folder) => output_folders.push(output_folder),
1350
1
                    OutputType::FileSymlink(output_symlink) => {
1351
1
                        output_file_symlinks.push(output_symlink);
1352
1
                    }
1353
0
                    OutputType::DirectorySymlink(output_symlink) => {
1354
0
                        output_directory_symlinks.push(output_symlink);
1355
0
                    }
1356
0
                    OutputType::None => { /* Safe to ignore */ }
1357
                }
1358
            }
1359
11
            Ok(())
1360
11
        });
1361
11
        drop(output_path_futures);
1362
11
        let (stdout_digest, stderr_digest) = match upload_result {
1363
11
            Ok((stdout_digest, stderr_digest, ())) => (stdout_digest, stderr_digest),
1364
0
            Err(e) => return Err(e).err_tip(|| "Error while uploading results"),
1365
        };
1366
1367
11
        execution_metadata.output_upload_completed_timestamp =
1368
11
            (self.running_actions_manager.callbacks.now_fn)();
1369
11
        output_files.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
1370
11
        output_folders.sort_unstable_by(|a, b| a.path.cmp(&b.path));
1371
11
        output_file_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
1372
11
        output_directory_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
1373
11
        {
1374
11
            let mut state = self.state.lock();
1375
11
            execution_metadata.worker_completed_timestamp =
1376
11
                (self.running_actions_manager.callbacks.now_fn)();
1377
11
            state.action_result = Some(ActionResult {
1378
11
                output_files,
1379
11
                output_folders,
1380
11
                output_directory_symlinks,
1381
11
                output_file_symlinks,
1382
11
                exit_code: execution_result.exit_code,
1383
11
                stdout_digest,
1384
11
                stderr_digest,
1385
11
                execution_metadata,
1386
11
                server_logs: HashMap::default(), // TODO(palfrey) Not implemented.
1387
11
                error: state.error.clone(),
1388
11
                message: String::new(), // Will be filled in on cache_action_result if needed.
1389
11
            });
1390
11
        }
1391
11
        Ok(self)
1392
11
    }
1393
1394
11
    async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
1395
11
        let mut state = self.state.lock();
1396
11
        state
1397
11
            .action_result
1398
11
            .take()
1399
11
            .err_tip(|| "Expected action_result to exist in get_finished_result")
1400
11
    }
1401
}
1402
1403
impl Drop for RunningActionImpl {
1404
18
    fn drop(&mut self) {
1405
18
        if self.did_cleanup.load(Ordering::Acquire) {
  Branch (1405:12): [True: 18, False: 0]
  Branch (1405:12): [Folded - Ignored]
1406
18
            if self.has_manager_entry.load(Ordering::Acquire) {
  Branch (1406:16): [True: 1, False: 17]
  Branch (1406:16): [Folded - Ignored]
1407
1
                drop(
1408
1
                    self.running_actions_manager
1409
1
                        .cleanup_action(&self.operation_id),
1410
1
                );
1411
17
            }
1412
18
            return;
1413
0
        }
1414
0
        let operation_id = self.operation_id.clone();
1415
0
        error!(
1416
            %operation_id,
1417
0
            "RunningActionImpl did not cleanup. This is a violation of the requirements, will attempt to do it in the background."
1418
        );
1419
0
        let running_actions_manager = self.running_actions_manager.clone();
1420
0
        let action_directory = self.action_directory.clone();
1421
0
        background_spawn!("running_action_impl_drop", async move {
1422
0
            let Err(err) =
  Branch (1422:17): [True: 0, False: 0]
  Branch (1422:17): [Folded - Ignored]
1423
0
                do_cleanup(&running_actions_manager, &operation_id, &action_directory).await
1424
            else {
1425
0
                return;
1426
            };
1427
0
            error!(
1428
                %operation_id,
1429
                ?action_directory,
1430
                ?err,
1431
0
                "Error cleaning up action"
1432
            );
1433
0
        });
1434
18
    }
1435
}
1436
1437
impl RunningAction for RunningActionImpl {
1438
0
    fn get_operation_id(&self) -> &OperationId {
1439
0
        &self.operation_id
1440
0
    }
1441
1442
15
    async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1443
15
        let res = self
1444
15
            .metrics()
1445
15
            .clone()
1446
15
            .prepare_action
1447
15
            .wrap(Self::inner_prepare_action(self))
1448
15
            .await;
1449
15
        if let Err(ref e) = res {
  Branch (1449:16): [Folded - Ignored]
  Branch (1449:16): [Folded - Ignored]
  Branch (1449:16): [True: 0, False: 15]
1450
0
            warn!(?e, "Error during prepare_action");
1451
15
        }
1452
15
        res
1453
15
    }
1454
1455
13
    async fn execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1456
13
        let res = self
1457
13
            .metrics()
1458
13
            .clone()
1459
13
            .execute
1460
13
            .wrap(Self::inner_execute(self))
1461
13
            .await;
1462
13
        if let Err(ref e) = res {
  Branch (1462:16): [Folded - Ignored]
  Branch (1462:16): [Folded - Ignored]
  Branch (1462:16): [True: 0, False: 13]
1463
0
            warn!(?e, "Error during prepare_action");
1464
13
        }
1465
13
        res
1466
13
    }
1467
1468
11
    async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1469
11
        let res = self
1470
11
            .metrics()
1471
11
            .clone()
1472
11
            .upload_results
1473
11
            .wrap(Self::inner_upload_results(self))
1474
11
            .await;
1475
11
        if let Err(ref e) = res {
  Branch (1475:16): [Folded - Ignored]
  Branch (1475:16): [Folded - Ignored]
  Branch (1475:16): [True: 0, False: 11]
1476
0
            warn!(?e, "Error during upload_results");
1477
11
        }
1478
11
        res
1479
11
    }
1480
1481
17
    async fn cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1482
17
        let res = self
1483
17
            .metrics()
1484
17
            .clone()
1485
17
            .cleanup
1486
17
            .wrap(async move {
1487
17
                let result = do_cleanup(
1488
17
                    &self.running_actions_manager,
1489
17
                    &self.operation_id,
1490
17
                    &self.action_directory,
1491
17
                )
1492
17
                .await;
1493
17
                self.has_manager_entry.store(false, Ordering::Release);
1494
17
                self.did_cleanup.store(true, Ordering::Release);
1495
17
                result.map(move |()| self)
1496
17
            })
1497
17
            .await;
1498
17
        if let Err(ref e) = res {
  Branch (1498:16): [Folded - Ignored]
  Branch (1498:16): [Folded - Ignored]
  Branch (1498:16): [True: 0, False: 17]
1499
0
            warn!(?e, "Error during cleanup");
1500
17
        }
1501
17
        res
1502
17
    }
1503
1504
11
    async fn get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
1505
11
        self.metrics()
1506
11
            .clone()
1507
11
            .get_finished_result
1508
11
            .wrap(Self::inner_get_finished_result(self))
1509
11
            .await
1510
11
    }
1511
1512
0
    fn get_work_directory(&self) -> &String {
1513
0
        &self.work_directory
1514
0
    }
1515
}
1516
1517
pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static {
1518
    type RunningAction: RunningAction;
1519
1520
    fn create_and_add_action(
1521
        self: &Arc<Self>,
1522
        worker_id: String,
1523
        start_execute: StartExecute,
1524
    ) -> impl Future<Output = Result<Arc<Self::RunningAction>, Error>> + Send;
1525
1526
    fn cache_action_result(
1527
        &self,
1528
        action_digest: DigestInfo,
1529
        action_result: &mut ActionResult,
1530
        hasher: DigestHasherFunc,
1531
    ) -> impl Future<Output = Result<(), Error>> + Send;
1532
1533
    fn kill_all(&self) -> impl Future<Output = ()> + Send;
1534
1535
    fn kill_operation(
1536
        &self,
1537
        operation_id: &OperationId,
1538
    ) -> impl Future<Output = Result<(), Error>> + Send;
1539
1540
    fn metrics(&self) -> &Arc<Metrics>;
1541
}
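
A minimal sketch of how a worker might drive a single action through this trait together with the `RunningAction` methods implemented above. The names `manager`, `worker_id`, and `start_execute` are assumed to exist in the caller, and error handling is elided; in production, cleanup should run even when an earlier stage fails.

    // Sketch only: each stage consumes and returns the Arc, so the
    // stages cannot be reordered or skipped by accident.
    let action = manager
        .create_and_add_action(worker_id, start_execute)
        .await?;
    let action = action.prepare_action().await?;
    let action = action.execute().await?;
    let action = action.upload_results().await?;
    // get_finished_result consumes an Arc, so clone before cleanup.
    let result = action.clone().get_finished_result().await?;
    action.cleanup().await?;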
1542
1543
/// A function to get the current system time, used to allow mocking for tests
1544
type NowFn = fn() -> SystemTime;
1545
type SleepFn = fn(Duration) -> BoxFuture<'static, ()>;
1546
1547
/// Functions that may be injected for testing purposes, during standard control
1548
/// flows these are specified by the new function.
1549
#[derive(Clone, Copy)]
1550
pub struct Callbacks {
1551
    /// A function that gets the current time.
1552
    pub now_fn: NowFn,
1553
    /// A function that sleeps for a given Duration.
1554
    pub sleep_fn: SleepFn,
1555
}
1556
1557
impl Debug for Callbacks {
1558
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1559
0
        f.debug_struct("Callbacks").finish_non_exhaustive()
1560
0
    }
1561
}
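
A minimal sketch, for tests only, of injecting deterministic callbacks: a frozen clock so `ExecutionMetadata` timestamps are reproducible, and a sleep that never resolves so the timeout branch is never taken. The fixed timestamp is an illustrative assumption.

    let callbacks = Callbacks {
        // Non-capturing closures coerce to the NowFn/SleepFn fn pointers.
        now_fn: || SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
        // A future that never completes, so sleep_fn never fires a timeout.
        sleep_fn: |_duration| Box::pin(futures::future::pending()),
    };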
1562
1563
/// The set of additional information for executing an action over and above
1564
/// those given in the `ActionInfo` passed to the worker.  This allows
1565
/// modification of the action for execution on this particular worker.  This
1566
/// may be used to run the action with a particular set of additional
1567
/// environment variables, or perhaps configure it to execute within a
1568
/// container.
1569
#[derive(Debug, Default)]
1570
pub struct ExecutionConfiguration {
1571
    /// If set, will be executed instead of the first argument passed in the
1572
    /// `ActionInfo` with all of the arguments in the `ActionInfo` passed as
1573
    /// arguments to this command.
1574
    pub entrypoint: Option<String>,
1575
    /// The only environment variables that will be specified when the command
1576
    /// executes other than those in the `ActionInfo`.  On Windows, `SystemRoot`
1577
    /// and PATH are also assigned (see `inner_execute`).
1578
    pub additional_environment: Option<HashMap<String, EnvironmentSource>>,
1579
}
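
A minimal sketch of the entrypoint override: the action's original argv is passed as arguments to the wrapper binary instead of being executed directly. The wrapper path is a hypothetical example, not a shipped default.

    let config = ExecutionConfiguration {
        // The action's own argv becomes the arguments of this binary.
        entrypoint: Some("/usr/local/bin/sandbox-run".to_string()),
        // None leaves the action's environment handling unchanged.
        additional_environment: None,
    };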
1580
1581
#[derive(Debug)]
1582
struct UploadActionResults {
1583
    upload_ac_results_strategy: UploadCacheResultsStrategy,
1584
    upload_historical_results_strategy: UploadCacheResultsStrategy,
1585
    ac_store: Option<Store>,
1586
    historical_store: Store,
1587
    success_message_template: Template,
1588
    failure_message_template: Template,
1589
}
1590
1591
impl UploadActionResults {
1592
26
    fn new(
1593
26
        config: &UploadActionResultConfig,
1594
26
        ac_store: Option<Store>,
1595
26
        historical_store: Store,
1596
26
    ) -> Result<Self, Error> {
1597
26
        let upload_historical_results_strategy = config
1598
26
            .upload_historical_results_strategy
1599
26
            .unwrap_or(DEFAULT_HISTORICAL_RESULTS_STRATEGY);
1600
8
        if !matches!(
  Branch (1600:12): [True: 8, False: 18]
  Branch (1600:12): [Folded - Ignored]
1601
26
            config.upload_ac_results_strategy,
1602
            UploadCacheResultsStrategy::Never
1603
8
        ) && ac_store.is_none()
  Branch (1603:14): [True: 0, False: 8]
  Branch (1603:14): [Folded - Ignored]
1604
        {
1605
0
            return Err(make_input_err!(
1606
0
                "upload_ac_results_strategy is set, but no ac_store is configured"
1607
0
            ));
1608
26
        }
1609
        Ok(Self {
1610
26
            upload_ac_results_strategy: config.upload_ac_results_strategy,
1611
26
            upload_historical_results_strategy,
1612
26
            ac_store,
1613
26
            historical_store,
1614
26
            success_message_template: Template::new(&config.success_message_template).map_err(
1615
0
                |e| {
1616
0
                    make_input_err!(
1617
                        "Could not convert success_message_template to rust template: {} : {e:?}",
1618
                        config.success_message_template
1619
                    )
1620
0
                },
1621
0
            )?,
1622
26
            failure_message_template: Template::new(&config.failure_message_template).map_err(
1623
0
                |e| {
1624
0
                    make_input_err!(
1625
                        "Could not convert failure_message_template to rust template: {} : {e:?}",
1626
                        config.failure_message_template
1627
                    )
1628
0
                },
1629
0
            )?,
1630
        })
1631
26
    }
1632
1633
0
    const fn should_cache_result(
1634
0
        strategy: UploadCacheResultsStrategy,
1635
0
        action_result: &ActionResult,
1636
0
        treat_infra_error_as_failure: bool,
1637
0
    ) -> bool {
1638
0
        let did_fail = action_result.exit_code != 0
  Branch (1638:24): [Folded - Ignored]
1639
0
            || (treat_infra_error_as_failure && action_result.error.is_some());
  Branch (1639:17): [Folded - Ignored]
1640
0
        match strategy {
1641
0
            UploadCacheResultsStrategy::SuccessOnly => !did_fail,
1642
0
            UploadCacheResultsStrategy::Never => false,
1643
            // Never cache internal errors or timeouts.
1644
            UploadCacheResultsStrategy::Everything => {
1645
0
                treat_infra_error_as_failure || action_result.error.is_none()
  Branch (1645:17): [Folded - Ignored]
1646
            }
1647
0
            UploadCacheResultsStrategy::FailuresOnly => did_fail,
1648
        }
1649
0
    }
1650
1651
    /// Formats the message field in `ExecuteResponse` from the `success_message_template`
1652
    /// or `failure_message_template` config templates.
1653
5
    fn format_execute_response_message(
1654
5
        mut template_str: Template,
1655
5
        action_digest_info: DigestInfo,
1656
5
        maybe_historical_digest_info: Option<DigestInfo>,
1657
5
        hasher: DigestHasherFunc,
1658
5
    ) -> Result<String, Error> {
1659
5
        template_str.replace(
1660
            "digest_function",
1661
5
            hasher.proto_digest_func().as_str_name().to_lowercase(),
1662
        );
1663
5
        template_str.replace(
1664
            "action_digest_hash",
1665
5
            action_digest_info.packed_hash().to_string(),
1666
        );
1667
5
        template_str.replace("action_digest_size", action_digest_info.size_bytes());
1668
5
        if let Some(historical_digest_info) = maybe_historical_digest_info {
  Branch (1668:16): [True: 3, False: 2]
  Branch (1668:16): [Folded - Ignored]
1669
3
            template_str.replace(
1670
3
                "historical_results_hash",
1671
3
                format!("{}", historical_digest_info.packed_hash()),
1672
3
            );
1673
3
            template_str.replace(
1674
3
                "historical_results_size",
1675
3
                historical_digest_info.size_bytes(),
1676
3
            );
1677
3
        } else {
1678
2
            template_str.replace("historical_results_hash", "");
1679
2
            template_str.replace("historical_results_size", "");
1680
2
        }
1681
5
        template_str
1682
5
            .text()
1683
5
            .map_err(|e| make_input_err!("Could not convert template to text: {e:?}"))
1684
5
    }
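
As an illustration (the template text here is an assumption, not a shipped default), a `success_message_template` of

    "{digest_function}/{action_digest_hash}-{action_digest_size}"

would render the lowercase digest function name followed by the action digest hash and size; `historical_results_hash` and `historical_results_size` render as empty strings when no historical result was uploaded.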
1685
1686
5
    async fn upload_ac_results(
1687
5
        &self,
1688
5
        action_digest: DigestInfo,
1689
5
        action_result: ProtoActionResult,
1690
5
        hasher: DigestHasherFunc,
1691
5
    ) -> Result<(), Error> {
1692
5
        let Some(ac_store) = self.ac_store.as_ref() else {
  Branch (1692:13): [Folded - Ignored]
  Branch (1692:13): [Folded - Ignored]
  Branch (1692:13): [True: 5, False: 0]
1693
0
            return Ok(());
1694
        };
1695
        // If we are a GrpcStore we shortcut here, as this is a special store.
1696
5
        if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
  Branch (1696:16): [Folded - Ignored]
  Branch (1696:16): [Folded - Ignored]
  Branch (1696:16): [True: 0, False: 5]
1697
0
            let update_action_request = UpdateActionResultRequest {
1698
0
                // This is populated by `update_action_result`.
1699
0
                instance_name: String::new(),
1700
0
                action_digest: Some(action_digest.into()),
1701
0
                action_result: Some(action_result),
1702
0
                results_cache_policy: None,
1703
0
                digest_function: hasher.proto_digest_func().into(),
1704
0
            };
1705
0
            return grpc_store
1706
0
                .update_action_result(Request::new(update_action_request))
1707
0
                .await
1708
0
                .map(|_| ())
1709
0
                .err_tip(|| "Caching ActionResult");
1710
5
        }
1711
1712
5
        let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);
1713
5
        action_result
1714
5
            .encode(&mut store_data)
1715
5
            .err_tip(|| "Encoding ActionResult for caching")
?0
;
1716
1717
5
        ac_store
1718
5
            .update_oneshot(action_digest, store_data.split().freeze())
1719
5
            .await
1720
5
            .err_tip(|| "Caching ActionResult")
1721
5
    }
1722
1723
3
    async fn upload_historical_results_with_message(
1724
3
        &self,
1725
3
        action_digest: DigestInfo,
1726
3
        execute_response: ExecuteResponse,
1727
3
        message_template: Template,
1728
3
        hasher: DigestHasherFunc,
1729
3
    ) -> Result<String, Error> {
1730
3
        let historical_digest_info = serialize_and_upload_message(
1731
3
            &HistoricalExecuteResponse {
1732
3
                action_digest: Some(action_digest.into()),
1733
3
                execute_response: Some(execute_response.clone()),
1734
3
            },
1735
3
            self.historical_store.as_pin(),
1736
3
            &mut hasher.hasher(),
1737
3
        )
1738
3
        .await
1739
3
        .err_tip(|| format!("Caching HistoricalExecuteResponse for digest: {action_digest}"))?;
1740
1741
3
        Self::format_execute_response_message(
1742
3
            message_template,
1743
3
            action_digest,
1744
3
            Some(historical_digest_info),
1745
3
            hasher,
1746
        )
1747
3
        .err_tip(|| "Could not format message in upload_historical_results_with_message")
1748
3
    }
1749
1750
6
    async fn cache_action_result(
1751
6
        &self,
1752
6
        action_info: DigestInfo,
1753
6
        action_result: &mut ActionResult,
1754
6
        hasher: DigestHasherFunc,
1755
6
    ) -> Result<(), Error> {
1756
6
        let should_upload_historical_results =
1757
6
            Self::should_cache_result(self.upload_historical_results_strategy, action_result, true);
1758
6
        let should_upload_ac_results =
1759
6
            Self::should_cache_result(self.upload_ac_results_strategy, action_result, false);
1760
        // Shortcut so we don't need to convert to proto if not needed.
1761
6
        if !should_upload_ac_results && !should_upload_historical_results {
  Branch (1761:12): [Folded - Ignored]
  Branch (1761:41): [Folded - Ignored]
  Branch (1761:12): [Folded - Ignored]
  Branch (1761:41): [Folded - Ignored]
  Branch (1761:12): [True: 1, False: 5]
  Branch (1761:41): [True: 1, False: 0]
1762
1
            return Ok(());
1763
5
        }
1764
1765
5
        let mut execute_response = to_execute_response(action_result.clone());
1766
1767
        // In theory exit code should always be != 0 if there's an error, but for safety we
1768
        // catch both.
1769
5
        let message_template = if action_result.exit_code == 0 && action_result.error.is_none() {
  Branch (1769:35): [Folded - Ignored]
  Branch (1769:67): [Folded - Ignored]
  Branch (1769:35): [Folded - Ignored]
  Branch (1769:67): [Folded - Ignored]
  Branch (1769:35): [True: 4, False: 1]
  Branch (1769:67): [True: 3, False: 1]
1770
3
            self.success_message_template.clone()
1771
        } else {
1772
2
            self.failure_message_template.clone()
1773
        };
1774
1775
5
        let upload_historical_results_with_message_result = if should_upload_historical_results {
  Branch (1775:64): [Folded - Ignored]
  Branch (1775:64): [Folded - Ignored]
  Branch (1775:64): [True: 3, False: 2]
1776
3
            let maybe_message = self
1777
3
                .upload_historical_results_with_message(
1778
3
                    action_info,
1779
3
                    execute_response.clone(),
1780
3
                    message_template,
1781
3
                    hasher,
1782
3
                )
1783
3
                .await;
1784
3
            match maybe_message {
1785
3
                Ok(message) => {
1786
3
                    action_result.message.clone_from(&message);
1787
3
                    execute_response.message = message;
1788
3
                    Ok(())
1789
                }
1790
0
                Err(e) => Result::<(), Error>::Err(e),
1791
            }
1792
        } else {
1793
2
            match Self::format_execute_response_message(message_template, action_info, None, hasher)
1794
            {
1795
2
                Ok(message) => {
1796
2
                    action_result.message.clone_from(&message);
1797
2
                    execute_response.message = message;
1798
2
                    Ok(())
1799
                }
1800
0
                Err(e) => Err(e).err_tip(|| "Could not format message in cache_action_result"),
1801
            }
1802
        };
1803
1804
        // Note: Done in this order because we assume most results will succeed and most configs will
1805
        // either always upload historical results or only upload on failure. In which case
1806
        // we can avoid an extra clone of the protos by doing this last with the above assumption.
1807
5
        let ac_upload_results = if should_upload_ac_results {
  Branch (1807:36): [Folded - Ignored]
  Branch (1807:36): [Folded - Ignored]
  Branch (1807:36): [True: 5, False: 0]
1808
5
            self.upload_ac_results(
1809
5
                action_info,
1810
5
                execute_response
1811
5
                    .result
1812
5
                    .err_tip(|| "No result set in cache_action_result")
?0
,
1813
5
                hasher,
1814
            )
1815
5
            .await
1816
        } else {
1817
0
            Ok(())
1818
        };
1819
5
        upload_historical_results_with_message_result.merge(ac_upload_results)
1820
6
    }
1821
}
1822
1823
#[derive(Debug)]
1824
pub struct RunningActionsManagerArgs<'a> {
1825
    pub root_action_directory: String,
1826
    pub execution_configuration: ExecutionConfiguration,
1827
    pub cas_store: Arc<FastSlowStore>,
1828
    pub ac_store: Option<Store>,
1829
    pub historical_store: Store,
1830
    pub upload_action_result_config: &'a UploadActionResultConfig,
1831
    pub max_action_timeout: Duration,
1832
    pub timeout_handled_externally: bool,
1833
    pub directory_cache: Option<Arc<crate::directory_cache::DirectoryCache>>,
1834
}
1835
1836
struct CleanupGuard {
1837
    manager: Weak<RunningActionsManagerImpl>,
1838
    operation_id: OperationId,
1839
}
1840
1841
impl Drop for CleanupGuard {
1842
17
    fn drop(&mut self) {
1843
17
        let Some(manager) = self.manager.upgrade() else {
  Branch (1843:13): [True: 17, False: 0]
  Branch (1843:13): [Folded - Ignored]
1844
0
            return;
1845
        };
1846
17
        let mut cleaning = manager.cleaning_up_operations.lock();
1847
17
        cleaning.remove(&self.operation_id);
1848
17
        manager.cleanup_complete_notify.notify_waiters();
1849
17
    }
1850
}
1851
1852
/// Holds state info about what is being executed and the interface for interacting
1853
/// with actions while they are running.
1854
#[derive(Debug)]
1855
pub struct RunningActionsManagerImpl {
1856
    root_action_directory: String,
1857
    execution_configuration: ExecutionConfiguration,
1858
    cas_store: Arc<FastSlowStore>,
1859
    filesystem_store: Arc<FilesystemStore>,
1860
    upload_action_results: UploadActionResults,
1861
    max_action_timeout: Duration,
1862
    timeout_handled_externally: bool,
1863
    running_actions: Mutex<HashMap<OperationId, Weak<RunningActionImpl>>>,
1864
    // Note: We don't use Notify because we need to support a .wait_for()-like function, which
1865
    // Notify does not support.
1866
    action_done_tx: watch::Sender<()>,
1867
    callbacks: Callbacks,
1868
    metrics: Arc<Metrics>,
1869
    /// Track operations being cleaned up to avoid directory collisions during action retries.
1870
    /// When an action fails and is retried on the same worker, we need to ensure the previous
1871
    /// attempt's directory is fully cleaned up before creating a new one.
1872
    /// See: <https://github.com/TraceMachina/nativelink/issues/1859>
1873
    cleaning_up_operations: Mutex<HashSet<OperationId>>,
1874
    /// Notify waiters when a cleanup operation completes. This is used in conjunction with
1875
    /// `cleaning_up_operations` to coordinate directory cleanup and creation.
1876
    cleanup_complete_notify: Arc<Notify>,
1877
    /// Optional directory cache for improving performance by caching reconstructed
1878
    /// input directories and using hardlinks.
1879
    directory_cache: Option<Arc<crate::directory_cache::DirectoryCache>>,
1880
}
1881
1882
impl RunningActionsManagerImpl {
1883
    /// Maximum time to wait for a cleanup operation to complete before timing out.
1884
    /// TODO(marcussorealheis): Consider making cleanup wait timeout configurable in the future
1885
    const MAX_WAIT: Duration = Duration::from_secs(30);
1886
    /// Maximum backoff duration for exponential backoff when waiting for cleanup.
1887
    const MAX_BACKOFF: Duration = Duration::from_millis(500);
1888
26
    pub fn new_with_callbacks(
1889
26
        args: RunningActionsManagerArgs<'_>,
1890
26
        callbacks: Callbacks,
1891
26
    ) -> Result<Self, Error> {
1892
        // Sadly because of some limitations of how Any works we need to clone more times than optimal.
1893
26
        let filesystem_store = args
1894
26
            .cas_store
1895
26
            .fast_store()
1896
26
            .downcast_ref::<FilesystemStore>(None)
1897
26
            .err_tip(
1898
                || "Expected FilesystemStore store for .fast_store() in RunningActionsManagerImpl",
1899
0
            )?
1900
26
            .get_arc()
1901
26
            .err_tip(|| "FilesystemStore's internal Arc was lost")
?0
;
1902
26
        let (action_done_tx, _) = watch::channel(());
1903
        Ok(Self {
1904
26
            root_action_directory: args.root_action_directory,
1905
26
            execution_configuration: args.execution_configuration,
1906
26
            cas_store: args.cas_store,
1907
26
            filesystem_store,
1908
26
            upload_action_results: UploadActionResults::new(
1909
26
                args.upload_action_result_config,
1910
26
                args.ac_store,
1911
26
                args.historical_store,
1912
            )
1913
26
            .err_tip(|| "During RunningActionsManagerImpl construction")
?0
,
1914
26
            max_action_timeout: args.max_action_timeout,
1915
26
            timeout_handled_externally: args.timeout_handled_externally,
1916
26
            running_actions: Mutex::new(HashMap::new()),
1917
26
            action_done_tx,
1918
26
            callbacks,
1919
26
            metrics: Arc::new(Metrics::default()),
1920
26
            cleaning_up_operations: Mutex::new(HashSet::new()),
1921
26
            cleanup_complete_notify: Arc::new(Notify::new()),
1922
26
            directory_cache: args.directory_cache,
1923
        })
1924
26
    }
1925
1926
12
    pub fn new(args: RunningActionsManagerArgs<'_>) -> Result<Self, Error> {
1927
12
        Self::new_with_callbacks(
1928
12
            args,
1929
            Callbacks {
1930
12
                now_fn: SystemTime::now,
1931
2
                sleep_fn: |duration| Box::pin(tokio::time::sleep(duration)),
1932
            },
1933
        )
1934
12
    }
1935
1936
    /// Fixes a race condition that occurs when an action fails to execute on a worker, and the same worker
1937
    /// attempts to re-execute the same action before the physical cleanup (file is removed) completes.
1938
    /// See this issue for additional details: <https://github.com/TraceMachina/nativelink/issues/1859>
1939
19
    async fn wait_for_cleanup_if_needed(&self, operation_id: &OperationId) -> Result<(), Error> {
1940
19
        let start = Instant::now();
1941
19
        let mut backoff = Duration::from_millis(10);
1942
19
        let mut has_waited = false;
1943
1944
        loop {
1945
19
            let should_wait = {
1946
19
                let cleaning = self.cleaning_up_operations.lock();
1947
19
                cleaning.contains(operation_id)
1948
            };
1949
1950
19
            if !should_wait {
  Branch (1950:16): [Folded - Ignored]
  Branch (1950:16): [Folded - Ignored]
  Branch (1950:16): [True: 19, False: 0]
1951
19
                let dir_path =
1952
19
                    PathBuf::from(&self.root_action_directory).join(operation_id.to_string());
1953
1954
19
                if !dir_path.exists() {
  Branch (1954:20): [Folded - Ignored]
  Branch (1954:20): [Folded - Ignored]
  Branch (1954:20): [True: 18, False: 1]
1955
18
                    return Ok(());
1956
1
                }
1957
1958
                // Safety check: ensure we're only removing directories under root_action_directory
1959
1
                let root_path = Path::new(&self.root_action_directory);
1960
1
                let canonical_root = root_path.canonicalize().err_tip(|| {
1961
0
                    format!(
1962
0
                        "Failed to canonicalize root directory: {}",
1963
                        self.root_action_directory
1964
                    )
1965
0
                })?;
1966
1
                let canonical_dir = dir_path.canonicalize().err_tip(|| {
1967
0
                    format!("Failed to canonicalize directory: {}", dir_path.display())
1968
0
                })?;
1969
1970
1
                if !canonical_dir.starts_with(&canonical_root) {
  Branch (1970:20): [Folded - Ignored]
  Branch (1970:20): [Folded - Ignored]
  Branch (1970:20): [True: 0, False: 1]
1971
0
                    return Err(make_err!(
1972
0
                        Code::Internal,
1973
0
                        "Attempted to remove directory outside of root_action_directory: {}",
1974
0
                        dir_path.display()
1975
0
                    ));
1976
1
                }
1977
1978
                // Directory exists but not being cleaned - remove it
1979
1
                warn!(
1980
1
                    "Removing stale directory for {}: {}",
1981
                    operation_id,
1982
1
                    dir_path.display()
1983
                );
1984
1
                self.metrics.stale_removals.inc();
1985
1986
                // Try to remove the directory, with one retry on failure
1987
1
                let remove_result = fs::remove_dir_all(&dir_path).await;
1988
1
                if let Err(e) = remove_result {
  Branch (1988:24): [Folded - Ignored]
  Branch (1988:24): [Folded - Ignored]
  Branch (1988:24): [True: 0, False: 1]
1989
                    // Retry once after a short delay in case the directory is temporarily locked
1990
0
                    tokio::time::sleep(Duration::from_millis(100)).await;
1991
0
                    fs::remove_dir_all(&dir_path).await.err_tip(|| {
1992
0
                        format!(
1993
0
                            "Failed to remove stale directory {} for retry of {} after retry (original error: {})",
1994
0
                            dir_path.display(),
1995
                            operation_id,
1996
                            e
1997
                        )
1998
0
                    })?;
1999
1
                }
2000
1
                return Ok(());
2001
0
            }
2002
2003
0
            if start.elapsed() > Self::MAX_WAIT {
  Branch (2003:16): [Folded - Ignored]
  Branch (2003:16): [Folded - Ignored]
  Branch (2003:16): [True: 0, False: 0]
2004
0
                self.metrics.cleanup_wait_timeouts.inc();
2005
0
                return Err(make_err!(
2006
0
                    Code::DeadlineExceeded,
2007
0
                    "Timeout waiting for previous operation cleanup: {} (waited {:?})",
2008
0
                    operation_id,
2009
0
                    start.elapsed()
2010
0
                ));
2011
0
            }
2012
2013
0
            if !has_waited {
  Branch (2013:16): [Folded - Ignored]
  Branch (2013:16): [Folded - Ignored]
  Branch (2013:16): [True: 0, False: 0]
2014
0
                self.metrics.cleanup_waits.inc();
2015
0
                has_waited = true;
2016
0
            }
2017
2018
0
            trace!(
2019
0
                "Waiting for cleanup of {} (elapsed: {:?}, backoff: {:?})",
2020
                operation_id,
2021
0
                start.elapsed(),
2022
                backoff
2023
            );
2024
2025
0
            tokio::select! {
2026
0
                () = self.cleanup_complete_notify.notified() => {},
2027
0
                () = tokio::time::sleep(backoff) => {
2028
0
                    // Exponential backoff
2029
0
                    backoff = (backoff * 2).min(Self::MAX_BACKOFF);
2030
0
                },
2031
            }
2032
        }
2033
19
    }
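
For intuition on the backoff: starting from the initial 10ms, the doubling sequence is 10, 20, 40, 80, 160, 320, then 500ms (capped by `MAX_BACKOFF`), repeating until either the cleanup notification fires or the 30-second `MAX_WAIT` deadline is exceeded.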
2034
2035
0
    fn make_action_directory<'a>(
2036
0
        &'a self,
2037
0
        operation_id: &'a OperationId,
2038
0
    ) -> impl Future<Output = Result<String, Error>> + 'a {
2039
19
        self.metrics.make_action_directory.wrap(async move {
2040
19
            let action_directory = format!("{}/{}", self.root_action_directory, operation_id);
2041
19
            fs::create_dir(&action_directory)
2042
19
                .await
2043
19
                .err_tip(|| format!("Error creating action directory {action_directory}"))?;
2044
19
            Ok(action_directory)
2045
19
        })
2046
0
    }
2047
2048
19
    fn create_action_info(
2049
19
        &self,
2050
19
        start_execute: StartExecute,
2051
19
        queued_timestamp: SystemTime,
2052
19
    ) -> impl Future<Output = Result<ActionInfo, Error>> + '_ {
2053
19
        self.metrics.create_action_info.wrap(async move {
2054
19
            let execute_request = start_execute
2055
19
                .execute_request
2056
19
                .err_tip(|| "Expected execute_request to exist in StartExecute")
?0
;
2057
19
            let action_digest: DigestInfo = execute_request
2058
19
                .action_digest
2059
19
                .clone()
2060
19
                .err_tip(|| "Expected action_digest to exist on StartExecute")
?0
2061
19
                .try_into()
?0
;
2062
19
            let load_start_timestamp = (self.callbacks.now_fn)();
2063
19
            let action =
2064
19
                get_and_decode_digest::<Action>(self.cas_store.as_ref(), action_digest.into())
2065
19
                    .await
2066
19
                    .err_tip(|| "During start_action")
?0
;
2067
19
            let action_info = ActionInfo::try_from_action_and_execute_request(
2068
19
                execute_request,
2069
19
                action,
2070
19
                load_start_timestamp,
2071
19
                queued_timestamp,
2072
            )
2073
19
            .err_tip(|| "Could not create ActionInfo in create_and_add_action()")
?0
;
2074
19
            Ok(action_info)
2075
19
        })
2076
19
    }
2077
2078
18
    fn cleanup_action(&self, operation_id: &OperationId) -> Result<(), Error> {
2079
18
        let mut running_actions = self.running_actions.lock();
2080
18
        let result = running_actions.remove(operation_id).err_tip(|| {
2081
0
            format!("Expected operation id '{operation_id}' to exist in RunningActionsManagerImpl")
2082
0
        });
2083
        // No need to copy anything, we just are telling the receivers an event happened.
2084
18
        self.action_done_tx.send_modify(|()| {});
2085
18
        result.map(|_| ())
2086
18
    }
2087
2088
    // Note: We do not capture metrics on this call, only `.kill_all()`.
2089
    // Important: When the future returns the process may still be running.
2090
2
    async fn kill_operation(action: Arc<RunningActionImpl>) {
2091
2
        warn!(
2092
2
            operation_id = ?action.operation_id,
2093
2
            "Sending kill to running operation",
2094
        );
2095
2
        let kill_channel_tx = {
2096
2
            let mut action_state = action.state.lock();
2097
2
            action_state.kill_channel_tx.take()
2098
        };
2099
2
        if let Some(kill_channel_tx) = kill_channel_tx {
  Branch (2099:16): [Folded - Ignored]
  Branch (2099:16): [Folded - Ignored]
  Branch (2099:16): [True: 2, False: 0]
2100
2
            if kill_channel_tx.send(()).is_err() {
  Branch (2100:16): [Folded - Ignored]
  Branch (2100:16): [Folded - Ignored]
  Branch (2100:16): [True: 0, False: 2]
2101
0
                error!(
2102
0
                    operation_id = ?action.operation_id,
2103
0
                    "Error sending kill to running operation",
2104
                );
2105
2
            }
2106
0
        }
2107
2
    }
2108
2109
17
    fn perform_cleanup(self: &Arc<Self>, operation_id: OperationId) -> Option<CleanupGuard> {
2110
17
        let mut cleaning = self.cleaning_up_operations.lock();
2111
17
        cleaning
2112
17
            .insert(operation_id.clone())
2113
17
            .then_some(CleanupGuard {
2114
17
                manager: Arc::downgrade(self),
2115
17
                operation_id,
2116
17
            })
2117
17
    }
2118
}
2119
2120
impl RunningActionsManager for RunningActionsManagerImpl {
2121
    type RunningAction = RunningActionImpl;
2122
2123
19
    async fn create_and_add_action(
2124
19
        self: &Arc<Self>,
2125
19
        worker_id: String,
2126
19
        start_execute: StartExecute,
2127
19
    ) -> Result<Arc<RunningActionImpl>, Error> {
2128
19
        self.metrics
2129
19
            .create_and_add_action
2130
19
            .wrap(async move {
2131
19
                let queued_timestamp = start_execute
2132
19
                    .queued_timestamp
2133
19
                    .and_then(|time| time.try_into().ok())
2134
19
                    .unwrap_or(SystemTime::UNIX_EPOCH);
2135
19
                let operation_id = start_execute
2136
19
                    .operation_id.as_str().into();
2137
19
                let action_info = self.create_action_info(start_execute, queued_timestamp).await?;
2138
19
                debug!(
2139
                    ?action_info,
2140
19
                    "Worker received action",
2141
                );
2142
                // Wait for any previous cleanup to complete before creating directory
2143
19
                self.wait_for_cleanup_if_needed(&operation_id).await?;
2144
19
                let action_directory = self.make_action_directory(&operation_id).await?;
2145
19
                let execution_metadata = ExecutionMetadata {
2146
19
                    worker: worker_id,
2147
19
                    queued_timestamp: action_info.insert_timestamp,
2148
19
                    worker_start_timestamp: action_info.load_timestamp,
2149
19
                    worker_completed_timestamp: SystemTime::UNIX_EPOCH,
2150
19
                    input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
2151
19
                    input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
2152
19
                    execution_start_timestamp: SystemTime::UNIX_EPOCH,
2153
19
                    execution_completed_timestamp: SystemTime::UNIX_EPOCH,
2154
19
                    output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
2155
19
                    output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
2156
19
                };
2157
19
                let timeout = if action_info.timeout.is_zero() || self.timeout_handled_externally {
  Branch (2157:34): [Folded - Ignored]
  Branch (2157:67): [Folded - Ignored]
  Branch (2157:34): [Folded - Ignored]
  Branch (2157:67): [Folded - Ignored]
  Branch (2157:34): [True: 16, False: 3]
  Branch (2157:67): [True: 0, False: 3]
2158
16
                    self.max_action_timeout
2159
                } else {
2160
3
                    action_info.timeout
2161
                };
2162
19
                if timeout > self.max_action_timeout {
  Branch (2162:20): [Folded - Ignored]
  Branch (2162:20): [Folded - Ignored]
  Branch (2162:20): [True: 1, False: 18]
2163
1
                    return Err(make_err!(
2164
1
                        Code::InvalidArgument,
2165
1
                        "Action timeout of {} seconds is greater than the maximum allowed timeout of {} seconds",
2166
1
                        timeout.as_secs_f32(),
2167
1
                        self.max_action_timeout.as_secs_f32()
2168
1
                    ));
2169
18
                }
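
The timeout policy above has two steps: a zero timeout, or one enforced by an outer layer, falls back to the worker-wide maximum, and anything above that maximum is rejected before the action starts. The same logic as a free function, a sketch with the field names from the listing and a simplified error type:

use std::time::Duration;

fn effective_timeout(
    requested: Duration,
    timeout_handled_externally: bool,
    max_action_timeout: Duration,
) -> Result<Duration, String> {
    // An unset (zero) timeout, or one handled externally, defaults to the
    // worker-wide maximum.
    let timeout = if requested.is_zero() || timeout_handled_externally {
        max_action_timeout
    } else {
        requested
    };
    // Anything above the maximum is an invalid request, not a silent clamp.
    if timeout > max_action_timeout {
        return Err(format!(
            "Action timeout of {} seconds is greater than the maximum allowed timeout of {} seconds",
            timeout.as_secs_f32(),
            max_action_timeout.as_secs_f32(),
        ));
    }
    Ok(timeout)
}

fn main() {
    let max = Duration::from_secs(600);
    assert_eq!(effective_timeout(Duration::ZERO, false, max), Ok(max));
    assert_eq!(effective_timeout(Duration::from_secs(30), false, max), Ok(Duration::from_secs(30)));
    assert!(effective_timeout(Duration::from_secs(900), false, max).is_err());
}
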
2170
18
                let running_action = Arc::new(RunningActionImpl::new(
2171
18
                    execution_metadata,
2172
18
                    operation_id.clone(),
2173
18
                    action_directory,
2174
18
                    action_info,
2175
18
                    timeout,
2176
18
                    self.clone(),
2177
                ));
2178
                {
2179
18
                    let mut running_actions = self.running_actions.lock();
2180
                    // Check if action already exists and is still alive
2181
18
                    if let Some(existing_weak) = running_actions.get(&operation_id) {
  Branch (2181:28): [Folded - Ignored]
  Branch (2181:28): [True: 0, False: 18]
2182
0
                        if let Some(_existing_action) = existing_weak.upgrade() {
  Branch (2182:32): [Folded - Ignored]
  Branch (2182:32): [True: 0, False: 0]
2183
0
                            return Err(make_err!(
2184
0
                                Code::AlreadyExists,
2185
0
                                "Action with operation_id {} is already running",
2186
0
                                operation_id
2187
0
                            ));
2188
0
                        }
2189
18
                    }
2190
18
                    running_actions.insert(operation_id, Arc::downgrade(&running_action));
2191
                }
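
Registering the action through a Weak has a deliberate effect: an entry whose Arc has already been dropped does not count as a running duplicate, so a retried operation id can be re-registered without waiting for map cleanup. A reduced sketch of that check, with String standing in for RunningActionImpl:

use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

struct Registry {
    running: Mutex<HashMap<String, Weak<String>>>,
}

impl Registry {
    fn register(&self, id: &str, action: &Arc<String>) -> Result<(), String> {
        let mut running = self.running.lock().unwrap();
        // Only a Weak that still upgrades counts as "already running";
        // a dead entry is simply overwritten.
        if let Some(existing) = running.get(id) {
            if existing.upgrade().is_some() {
                return Err(format!("Action with operation_id {id} is already running"));
            }
        }
        running.insert(id.to_owned(), Arc::downgrade(action));
        Ok(())
    }
}

fn main() {
    let registry = Registry { running: Mutex::new(HashMap::new()) };
    let action = Arc::new("action".to_owned());
    assert!(registry.register("op-1", &action).is_ok());
    assert!(registry.register("op-1", &action).is_err()); // still alive
    drop(action);
    let retry = Arc::new("retry".to_owned());
    assert!(registry.register("op-1", &retry).is_ok()); // dead Weak overwritten
}
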
2192
18
                Ok(running_action)
2193
19
            })
2194
19
            .await
2195
19
    }
2196
2197
0
    async fn cache_action_result(
2198
0
        &self,
2199
0
        action_info: DigestInfo,
2200
0
        action_result: &mut ActionResult,
2201
0
        hasher: DigestHasherFunc,
2202
6
    ) -> Result<(), Error> {
2203
6
        self.metrics
2204
6
            .cache_action_result
2205
6
            .wrap(self.upload_action_results.cache_action_result(
2206
6
                action_info,
2207
6
                action_result,
2208
6
                hasher,
2209
6
            ))
2210
6
            .await
2211
6
    }
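
Every method in this impl routes its future through self.metrics.<name>.wrap(...), so counting happens in one place and call sites stay short. A hypothetical stand-in for that wrapper, showing only the shape of the pattern (the real AsyncCounterWrapper lives in nativelink_util::metrics_utils and records more than a call count); assumes tokio = "1" with the macros feature:

use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct CallCounter {
    calls: AtomicU64,
}

impl CallCounter {
    // Count the call, then drive the wrapped future and pass its output
    // through unchanged, so call sites read as `metrics.x.wrap(fut).await`.
    async fn wrap<F: Future>(&self, fut: F) -> F::Output {
        self.calls.fetch_add(1, Ordering::Relaxed);
        fut.await
    }
}

#[tokio::main]
async fn main() {
    let counter = CallCounter::default();
    let value = counter.wrap(async { 21 * 2 }).await;
    assert_eq!(value, 42);
    assert_eq!(counter.calls.load(Ordering::Relaxed), 1);
}
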
2212
2213
0
    async fn kill_operation(&self, operation_id: &OperationId) -> Result<(), Error> {
2214
0
        let running_action = {
2215
0
            let running_actions = self.running_actions.lock();
2216
0
            running_actions
2217
0
                .get(operation_id)
2218
0
                .and_then(Weak::upgrade)
2219
0
                .ok_or_else(|| make_input_err!("Failed to get running action {operation_id}"))?
2220
        };
2221
0
        Self::kill_operation(running_action).await;
2222
0
        Ok(())
2223
0
    }
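
Note the scoping in kill_operation: the running_actions lock is taken inside an inner block and released before the .await, which matters because a parking_lot guard must not be held across an await point. A minimal sketch of the same shape, with std::sync::Mutex standing in for parking_lot:

use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

async fn kill_operation(
    running: &Mutex<HashMap<String, Weak<String>>>,
    operation_id: &str,
) -> Result<(), String> {
    let action = {
        let running = running.lock().unwrap();
        running
            .get(operation_id)
            .and_then(Weak::upgrade)
            .ok_or_else(|| format!("Failed to get running action {operation_id}"))?
    }; // the guard is dropped here, before any await
    // The actual kill would be awaited here, without the lock held.
    let _ = action;
    Ok(())
}

#[tokio::main]
async fn main() {
    let map = Mutex::new(HashMap::new());
    assert!(kill_operation(&map, "missing").await.is_err());
}
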
2224
2225
    // Note: When the future returns, the process should be fully killed and cleaned up.
2226
2
    async fn kill_all(&self) {
2227
2
        self.metrics
2228
2
            .kill_all
2229
2
            .wrap_no_capture_result(async move {
2230
2
                let kill_operations: Vec<Arc<RunningActionImpl>> = {
2231
2
                    let running_actions = self.running_actions.lock();
2232
2
                    running_actions
2233
2
                        .iter()
2234
2
                        .filter_map(|(_operation_id, action)| action.upgrade())
2235
2
                        .collect()
2236
                };
2237
2
                let mut kill_futures: FuturesUnordered<_> = kill_operations
2238
2
                    .into_iter()
2239
2
                    .map(Self::kill_operation)
2240
2
                    .collect();
2241
4
                while kill_futures.next().await.is_some() {}
  Branch (2241:23): [Folded - Ignored]
  Branch (2241:23): [True: 2, False: 2]
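
kill_all snapshots the live actions under the lock, then drains them through a FuturesUnordered, which polls every kill future concurrently rather than one at a time. A sketch of that drain idiom, assuming futures = "0.3" and tokio = "1":

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    // One future per action; FuturesUnordered polls them all concurrently.
    let mut kill_futures: FuturesUnordered<_> = (0..3)
        .map(|i| async move { println!("killed action {i}") })
        .collect();
    // The loop only exits once every future has resolved.
    while kill_futures.next().await.is_some() {}
}
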
2242
2
            })
2243
2
            .await;
2244
        // Ignore the error; if one happens it means there's no sender, which is not a problem.
2245
        // Note: As a sanity check, this API will always check the current value and then future values:
2246
        // https://play.rust-lang.org/?version=stable&edition=2021&gist=23103652cc1276a97e5f9938da87fdb2
2247
2
        drop(
2248
2
            self.action_done_tx
2249
2
                .subscribe()
2250
4
                .wait_for(|()| self.running_actions.lock().is_empty())
2251
2
                .await,
2252
        );
2253
2
    }
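
The final wait relies on a tokio watch channel: Receiver::wait_for evaluates the predicate against the current value first and then against each subsequent change, which is why a completion signal sent just before subscribe() cannot be lost (this is what the playground link above demonstrates). A small sketch of the same wait, assuming a recent tokio = "1" with the sync feature:

use std::sync::{Arc, Mutex};
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (action_done_tx, _rx) = watch::channel(());
    let running_actions = Arc::new(Mutex::new(vec!["op-1"]));

    let waiter = {
        let running_actions = running_actions.clone();
        let mut rx = action_done_tx.subscribe();
        tokio::spawn(async move {
            // Resolves whether the map is already empty now or only becomes
            // empty after a later send; no notification window is missed.
            let _ = rx.wait_for(|()| running_actions.lock().unwrap().is_empty()).await;
        })
    };

    running_actions.lock().unwrap().clear();
    let _ = action_done_tx.send(()); // wake any waiter
    waiter.await.unwrap();
}
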
2254
2255
    #[inline]
2256
2
    fn metrics(&self) -> &Arc<Metrics> {
2257
2
        &self.metrics
2258
2
    }
2259
}
2260
2261
#[derive(Debug, Default, MetricsComponent)]
2262
pub struct Metrics {
2263
    #[metric(help = "Stats about the create_and_add_action command.")]
2264
    create_and_add_action: AsyncCounterWrapper,
2265
    #[metric(help = "Stats about the cache_action_result command.")]
2266
    cache_action_result: AsyncCounterWrapper,
2267
    #[metric(help = "Stats about the kill_all command.")]
2268
    kill_all: AsyncCounterWrapper,
2269
    #[metric(help = "Stats about the create_action_info command.")]
2270
    create_action_info: AsyncCounterWrapper,
2271
    #[metric(help = "Stats about the make_work_directory command.")]
2272
    make_action_directory: AsyncCounterWrapper,
2273
    #[metric(help = "Stats about the prepare_action command.")]
2274
    prepare_action: AsyncCounterWrapper,
2275
    #[metric(help = "Stats about the execute command.")]
2276
    execute: AsyncCounterWrapper,
2277
    #[metric(help = "Stats about the upload_results command.")]
2278
    upload_results: AsyncCounterWrapper,
2279
    #[metric(help = "Stats about the cleanup command.")]
2280
    cleanup: AsyncCounterWrapper,
2281
    #[metric(help = "Stats about the get_finished_result command.")]
2282
    get_finished_result: AsyncCounterWrapper,
2283
    #[metric(help = "Number of times an action waited for cleanup to complete.")]
2284
    cleanup_waits: CounterWithTime,
2285
    #[metric(help = "Number of stale directories removed during action retries.")]
2286
    stale_removals: CounterWithTime,
2287
    #[metric(help = "Number of timeouts while waiting for cleanup to complete.")]
2288
    cleanup_wait_timeouts: CounterWithTime,
2289
    #[metric(help = "Stats about the get_proto_command_from_store command.")]
2290
    get_proto_command_from_store: AsyncCounterWrapper,
2291
    #[metric(help = "Stats about the download_to_directory command.")]
2292
    download_to_directory: AsyncCounterWrapper,
2293
    #[metric(help = "Stats about the prepare_output_files command.")]
2294
    prepare_output_files: AsyncCounterWrapper,
2295
    #[metric(help = "Stats about the prepare_output_paths command.")]
2296
    prepare_output_paths: AsyncCounterWrapper,
2297
    #[metric(help = "Stats about the child_process command.")]
2298
    child_process: AsyncCounterWrapper,
2299
    #[metric(help = "Stats about the child_process_success_error_code command.")]
2300
    child_process_success_error_code: CounterWithTime,
2301
    #[metric(help = "Stats about the child_process_failure_error_code command.")]
2302
    child_process_failure_error_code: CounterWithTime,
2303
    #[metric(help = "Total time spent uploading stdout.")]
2304
    upload_stdout: AsyncCounterWrapper,
2305
    #[metric(help = "Total time spent uploading stderr.")]
2306
    upload_stderr: AsyncCounterWrapper,
2307
    #[metric(help = "Total number of task timeouts.")]
2308
    task_timeouts: CounterWithTime,
2309
}
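
The struct above is pure declaration: each field is a counter that the methods increment, directly or through wrap(...), and the #[metric(help = ...)] attributes only attach descriptions for export. As a rough illustration of what a CounterWithTime-style field might track, here is a hypothetical stand-in (not the nativelink type):

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical: a counter that also remembers when it last fired, enough to
// back "number of times X happened" metrics such as cleanup_waits above.
#[derive(Default)]
struct CounterWithTimeSketch {
    count: AtomicU64,
    last_fired_unix_secs: AtomicU64,
}

impl CounterWithTimeSketch {
    fn inc(&self) {
        self.count.fetch_add(1, Ordering::Relaxed);
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| d.as_secs());
        self.last_fired_unix_secs.store(now, Ordering::Relaxed);
    }
}

fn main() {
    let timeouts = CounterWithTimeSketch::default();
    timeouts.inc();
    assert_eq!(timeouts.count.load(Ordering::Relaxed), 1);
}
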