Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/running_actions_manager.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp::min;
16
use core::convert::Into;
17
use core::fmt::Debug;
18
use core::pin::Pin;
19
use core::sync::atomic::{AtomicBool, Ordering};
20
use core::time::Duration;
21
use std::borrow::Cow;
22
use std::collections::HashMap;
23
use std::collections::vec_deque::VecDeque;
24
use std::ffi::{OsStr, OsString};
25
#[cfg(target_family = "unix")]
26
use std::fs::Permissions;
27
#[cfg(target_family = "unix")]
28
use std::os::unix::fs::{MetadataExt, PermissionsExt};
29
use std::path::Path;
30
use std::process::Stdio;
31
use std::sync::{Arc, Weak};
32
use std::time::SystemTime;
33
34
use bytes::{Bytes, BytesMut};
35
use filetime::{FileTime, set_file_mtime};
36
use formatx::Template;
37
use futures::future::{
38
    BoxFuture, Future, FutureExt, TryFutureExt, try_join, try_join_all, try_join3,
39
};
40
use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
41
use nativelink_config::cas_server::{
42
    EnvironmentSource, UploadActionResultConfig, UploadCacheResultsStrategy,
43
};
44
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
45
use nativelink_metric::MetricsComponent;
46
use nativelink_proto::build::bazel::remote::execution::v2::{
47
    Action, ActionResult as ProtoActionResult, Command as ProtoCommand,
48
    Directory as ProtoDirectory, Directory, DirectoryNode, ExecuteResponse, FileNode, SymlinkNode,
49
    Tree as ProtoTree, UpdateActionResultRequest,
50
};
51
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
52
    HistoricalExecuteResponse, StartExecute,
53
};
54
use nativelink_store::ac_utils::{
55
    ESTIMATED_DIGEST_SIZE, compute_buf_digest, get_and_decode_digest, serialize_and_upload_message,
56
};
57
use nativelink_store::fast_slow_store::FastSlowStore;
58
use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
59
use nativelink_store::grpc_store::GrpcStore;
60
use nativelink_util::action_messages::{
61
    ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, OperationId,
62
    SymlinkInfo, to_execute_response,
63
};
64
use nativelink_util::common::{DigestInfo, fs};
65
use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc};
66
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
67
use nativelink_util::shutdown_guard::ShutdownGuard;
68
use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
69
use nativelink_util::{background_spawn, spawn, spawn_blocking};
70
use parking_lot::Mutex;
71
use prost::Message;
72
use relative_path::RelativePath;
73
use scopeguard::{ScopeGuard, guard};
74
use serde::Deserialize;
75
use tokio::io::{AsyncReadExt, AsyncSeekExt};
76
use tokio::process;
77
use tokio::sync::{oneshot, watch};
78
use tokio_stream::wrappers::ReadDirStream;
79
use tonic::Request;
80
use tracing::{debug, error, info, warn};
81
use uuid::Uuid;
82
83
/// For simplicity we use a fixed exit code for cases when our program is terminated
84
/// due to a signal.
85
const EXIT_CODE_FOR_SIGNAL: i32 = 9;
86
87
/// Default strategy for uploading historical results.
88
/// Note: If this value changes the config documentation
89
/// should reflect it.
90
const DEFAULT_HISTORICAL_RESULTS_STRATEGY: UploadCacheResultsStrategy =
91
    UploadCacheResultsStrategy::FailuresOnly;
92
93
/// Valid string reasons for a failure.
94
/// Note: If these change, the documentation should be updated.
95
#[derive(Debug, Deserialize)]
96
#[serde(rename_all = "snake_case")]
97
enum SideChannelFailureReason {
98
    /// Task should be considered timed out.
99
    Timeout,
100
}
101
102
/// This represents the json data that can be passed from the running process
103
/// to the parent via the `SideChannelFile`. See:
104
/// `config::EnvironmentSource::sidechannelfile` for more details.
105
/// Note: Any fields added here must be added to the documentation.
106
#[derive(Debug, Deserialize, Default)]
107
struct SideChannelInfo {
108
    /// If the task should be considered a failure and why.
109
    failure: Option<SideChannelFailureReason>,
110
}
111
112
/// Aggressively download the digests of files and make a local folder from it. This function
113
/// will spawn unbounded number of futures to try and get these downloaded. The store itself
114
/// should be rate limited if spawning too many requests at once is an issue.
115
/// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore`. This is for
116
/// efficiency reasons. We will request the `FastSlowStore` to populate the entry then we will
117
/// assume the `FilesystemStore` has the file available immediately after and hardlink the file
118
/// to a new location.
119
// Sadly we cannot use `async fn` here because the rust compiler cannot determine the auto traits
120
// of the future. So we need to force this function to return a dynamic future instead.
121
// see: https://github.com/rust-lang/rust/issues/78649
122
25
pub fn download_to_directory<'a>(
123
25
    cas_store: &'a FastSlowStore,
124
25
    filesystem_store: Pin<&'a FilesystemStore>,
125
25
    digest: &'a DigestInfo,
126
25
    current_directory: &'a str,
127
25
) -> BoxFuture<'a, Result<(), Error>> {
128
25
    async move {
129
25
        let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest.into())
130
25
            .await
131
25
            .err_tip(|| "Converting digest to Directory")
?0
;
132
25
        let mut futures = FuturesUnordered::new();
133
134
29
        for 
file4
in directory.files {
135
4
            let digest: DigestInfo = file
136
4
                .digest
137
4
                .err_tip(|| "Expected Digest to exist in Directory::file::digest")
?0
138
4
                .try_into()
139
4
                .err_tip(|| "In Directory::file::digest")
?0
;
140
4
            let dest = format!("{}/{}", current_directory, file.name);
141
4
            let (mtime, mut unix_mode) = match file.node_properties {
142
1
                Some(properties) => (properties.mtime, properties.unix_mode),
143
3
                None => (None, None),
144
            };
145
            #[cfg_attr(target_family = "windows", allow(unused_assignments))]
146
4
            if file.is_executable {
  Branch (146:16): [True: 1, False: 3]
  Branch (146:16): [Folded - Ignored]
147
1
                unix_mode = Some(unix_mode.unwrap_or(0o444) | 0o111);
148
3
            }
149
4
            futures.push(
150
4
                cas_store
151
4
                    .populate_fast_store(digest.into())
152
4
                    .and_then(move |()| async move {
153
4
                        let file_entry = filesystem_store
154
4
                            .get_file_entry_for_digest(&digest)
155
4
                            .await
156
4
                            .err_tip(|| "During hard link")
?0
;
157
4
                        file_entry
158
4
                            .get_file_path_locked(|src| fs::hard_link(src, &dest))
159
4
                            .await
160
4
                            .map_err(|e| 
{0
161
0
                                make_err!(Code::Internal, "Could not make hardlink, {e:?} : {dest}")
162
0
                            })?;
163
                        #[cfg(target_family = "unix")]
164
4
                        if let Some(
unix_mode1
) = unix_mode {
  Branch (164:32): [True: 1, False: 3]
  Branch (164:32): [Folded - Ignored]
165
1
                            fs::set_permissions(&dest, Permissions::from_mode(unix_mode))
166
1
                                .await
167
1
                                .err_tip(|| 
{0
168
0
                                    format!(
169
0
                                        "Could not set unix mode in download_to_directory {dest}"
170
                                    )
171
0
                                })?;
172
3
                        }
173
4
                        if let Some(
mtime1
) = mtime {
  Branch (173:32): [True: 1, False: 3]
  Branch (173:32): [Folded - Ignored]
174
1
                            spawn_blocking!("download_to_directory_set_mtime", move || {
175
1
                                set_file_mtime(
176
1
                                    &dest,
177
1
                                    FileTime::from_unix_time(mtime.seconds, mtime.nanos as u32),
178
                                )
179
1
                                .err_tip(|| 
{0
180
0
                                    format!("Failed to set mtime in download_to_directory {dest}")
181
0
                                })
182
1
                            })
183
1
                            .await
184
1
                            .err_tip(
185
                                || "Failed to launch spawn_blocking in download_to_directory",
186
0
                            )??;
187
3
                        }
188
4
                        Ok(())
189
8
                    })
190
4
                    .map_err(move |e| 
e0
.
append0
(
format!0
(
"for digest {digest}"0
)))
191
4
                    .boxed(),
192
            );
193
        }
194
195
32
        for 
directory7
in directory.directories {
196
7
            let digest: DigestInfo = directory
197
7
                .digest
198
7
                .err_tip(|| "Expected Digest to exist in Directory::directories::digest")
?0
199
7
                .try_into()
200
7
                .err_tip(|| "In Directory::file::digest")
?0
;
201
7
            let new_directory_path = format!("{}/{}", current_directory, directory.name);
202
7
            futures.push(
203
7
                async move {
204
7
                    fs::create_dir(&new_directory_path)
205
7
                        .await
206
7
                        .err_tip(|| format!(
"Could not create directory {new_directory_path}"0
))
?0
;
207
7
                    download_to_directory(
208
7
                        cas_store,
209
7
                        filesystem_store,
210
7
                        &digest,
211
7
                        &new_directory_path,
212
7
                    )
213
7
                    .await
214
7
                    .err_tip(|| format!(
"in download_to_directory : {new_directory_path}"0
))
?0
;
215
7
                    Ok(())
216
7
                }
217
7
                .boxed(),
218
            );
219
        }
220
221
        #[cfg(target_family = "unix")]
222
26
        for 
symlink_node1
in directory.symlinks {
223
1
            let dest = format!("{}/{}", current_directory, symlink_node.name);
224
1
            futures.push(
225
1
                async move {
226
1
                    fs::symlink(&symlink_node.target, &dest).await.err_tip(|| 
{0
227
0
                        format!(
228
0
                            "Could not create symlink {} -> {}",
229
                            symlink_node.target, dest
230
                        )
231
0
                    })?;
232
1
                    Ok(())
233
1
                }
234
1
                .boxed(),
235
            );
236
        }
237
238
37
        while futures.try_next().await
?0
.is_some()
{}12
  Branch (238:15): [True: 12, False: 25]
  Branch (238:15): [Folded - Ignored]
239
25
        Ok(())
240
25
    }
241
25
    .boxed()
242
25
}
243
244
#[cfg(target_family = "windows")]
245
fn is_executable(_metadata: &std::fs::Metadata, full_path: &impl AsRef<Path>) -> bool {
246
    static EXECUTABLE_EXTENSIONS: &[&str] = &["exe", "bat", "com"];
247
    EXECUTABLE_EXTENSIONS
248
        .iter()
249
        .any(|ext| full_path.as_ref().extension().map_or(false, |v| v == *ext))
250
}
251
252
#[cfg(target_family = "unix")]
253
7
fn is_executable(metadata: &std::fs::Metadata, _full_path: &impl AsRef<Path>) -> bool {
254
7
    (metadata.mode() & 0o111) != 0
255
7
}
256
257
#[expect(clippy::future_not_send)] // TODO(jhpratt) remove this
258
7
async fn upload_file(
259
7
    cas_store: Pin<&impl StoreLike>,
260
7
    full_path: impl AsRef<Path> + Debug,
261
7
    hasher: DigestHasherFunc,
262
7
    metadata: std::fs::Metadata,
263
7
) -> Result<FileInfo, Error> {
264
7
    let is_executable = is_executable(&metadata, &full_path);
265
7
    let file_size = metadata.len();
266
7
    let file = fs::open_file(&full_path, 0, u64::MAX)
267
7
        .await
268
7
        .err_tip(|| format!(
"Could not open file {full_path:?}"0
))
?0
;
269
270
7
    let (digest, mut file) = hasher
271
7
        .hasher()
272
7
        .digest_for_file(&full_path, file.into_inner(), Some(file_size))
273
7
        .await
274
7
        .err_tip(|| format!(
"Failed to hash file in digest_for_file failed for {full_path:?}"0
))
?0
;
275
276
7
    file.rewind().await.err_tip(|| "Could not rewind file")
?0
;
277
278
    // Note: For unknown reasons we appear to be hitting:
279
    // https://github.com/rust-lang/rust/issues/92096
280
    // or a smiliar issue if we try to use the non-store driver function, so we
281
    // are using the store driver function here.
282
7
    cas_store
283
7
        .as_store_driver_pin()
284
7
        .update_with_whole_file(
285
7
            digest.into(),
286
7
            full_path.as_ref().into(),
287
7
            file,
288
7
            UploadSizeInfo::ExactSize(digest.size_bytes()),
289
7
        )
290
7
        .await
291
7
        .err_tip(|| format!(
"for {full_path:?}"0
))
?0
;
292
293
7
    let name = full_path
294
7
        .as_ref()
295
7
        .file_name()
296
7
        .err_tip(|| format!(
"Expected file_name to exist on {full_path:?}"0
))
?0
297
7
        .to_str()
298
7
        .err_tip(|| 
{0
299
0
            make_err!(
300
0
                Code::Internal,
301
                "Could not convert {:?} to string",
302
                full_path
303
            )
304
0
        })?
305
7
        .to_string();
306
307
7
    Ok(FileInfo {
308
7
        name_or_path: NameOrPath::Name(name),
309
7
        digest,
310
7
        is_executable,
311
7
    })
312
7
}
313
314
2
async fn upload_symlink(
315
2
    full_path: impl AsRef<Path> + Debug,
316
2
    full_work_directory_path: impl AsRef<Path>,
317
2
) -> Result<SymlinkInfo, Error> {
318
2
    let full_target_path = fs::read_link(full_path.as_ref())
319
2
        .await
320
2
        .err_tip(|| format!(
"Could not get read_link path of {full_path:?}"0
))
?0
;
321
322
    // Detect if our symlink is inside our work directory, if it is find the
323
    // relative path otherwise use the absolute path.
324
2
    let target = if full_target_path.starts_with(full_work_directory_path.as_ref()) {
  Branch (324:21): [Folded - Ignored]
  Branch (324:21): [Folded - Ignored]
  Branch (324:21): [True: 0, False: 1]
  Branch (324:21): [True: 0, False: 1]
325
0
        let full_target_path = RelativePath::from_path(&full_target_path)
326
0
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?;
327
0
        RelativePath::from_path(full_work_directory_path.as_ref())
328
0
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?
329
0
            .relative(full_target_path)
330
0
            .normalize()
331
0
            .into_string()
332
    } else {
333
2
        full_target_path
334
2
            .to_str()
335
2
            .err_tip(|| 
{0
336
0
                make_err!(
337
0
                    Code::Internal,
338
                    "Could not convert '{:?}' to string",
339
                    full_target_path
340
                )
341
0
            })?
342
2
            .to_string()
343
    };
344
345
2
    let name = full_path
346
2
        .as_ref()
347
2
        .file_name()
348
2
        .err_tip(|| format!(
"Expected file_name to exist on {full_path:?}"0
))
?0
349
2
        .to_str()
350
2
        .err_tip(|| 
{0
351
0
            make_err!(
352
0
                Code::Internal,
353
                "Could not convert {:?} to string",
354
                full_path
355
            )
356
0
        })?
357
2
        .to_string();
358
359
2
    Ok(SymlinkInfo {
360
2
        name_or_path: NameOrPath::Name(name),
361
2
        target,
362
2
    })
363
2
}
364
365
3
fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
366
3
    cas_store: Pin<&'a impl StoreLike>,
367
3
    full_dir_path: P,
368
3
    full_work_directory: &'a str,
369
3
    hasher: DigestHasherFunc,
370
3
) -> BoxFuture<'a, Result<(Directory, VecDeque<ProtoDirectory>), Error>> {
371
3
    Box::pin(async move {
372
3
        let file_futures = FuturesUnordered::new();
373
3
        let dir_futures = FuturesUnordered::new();
374
3
        let symlink_futures = FuturesUnordered::new();
375
        {
376
3
            let (_permit, dir_handle) = fs::read_dir(&full_dir_path)
377
3
                .await
378
3
                .err_tip(|| format!(
"Error reading dir for reading {full_dir_path:?}"0
))
?0
379
3
                .into_inner();
380
3
            let mut dir_stream = ReadDirStream::new(dir_handle);
381
            // Note: Try very hard to not leave file descriptors open. Try to keep them as short
382
            // lived as possible. This is why we iterate the directory and then build a bunch of
383
            // futures with all the work we are wanting to do then execute it. It allows us to
384
            // close the directory iterator file descriptor, then open the child files/folders.
385
8
            while let Some(
entry_result5
) = dir_stream.next().await {
  Branch (385:23): [Folded - Ignored]
  Branch (385:23): [Folded - Ignored]
  Branch (385:23): [True: 1, False: 1]
  Branch (385:23): [True: 4, False: 2]
386
5
                let entry = entry_result.err_tip(|| "Error while iterating directory")
?0
;
387
5
                let file_type = entry
388
5
                    .file_type()
389
5
                    .await
390
5
                    .err_tip(|| format!(
"Error running file_type() on {entry:?}"0
))
?0
;
391
5
                let full_path = full_dir_path.as_ref().join(entry.path());
392
5
                if file_type.is_dir() {
  Branch (392:20): [Folded - Ignored]
  Branch (392:20): [Folded - Ignored]
  Branch (392:20): [True: 0, False: 1]
  Branch (392:20): [True: 1, False: 3]
393
1
                    let full_dir_path = full_dir_path.clone();
394
1
                    dir_futures.push(
395
1
                        upload_directory(cas_store, full_path.clone(), full_work_directory, hasher)
396
1
                            .and_then(|(dir, all_dirs)| async move {
397
1
                                let directory_name = full_path
398
1
                                    .file_name()
399
1
                                    .err_tip(|| 
{0
400
0
                                        format!("Expected file_name to exist on {full_dir_path:?}")
401
0
                                    })?
402
1
                                    .to_str()
403
1
                                    .err_tip(|| 
{0
404
0
                                        make_err!(
405
0
                                            Code::Internal,
406
                                            "Could not convert {:?} to string",
407
                                            full_dir_path
408
                                        )
409
0
                                    })?
410
1
                                    .to_string();
411
412
1
                                let digest = serialize_and_upload_message(
413
1
                                    &dir,
414
1
                                    cas_store,
415
1
                                    &mut hasher.hasher(),
416
1
                                )
417
1
                                .await
418
1
                                .err_tip(|| format!(
"for {}"0
,
full_path.display()0
))
?0
;
419
420
1
                                Result::<(DirectoryNode, VecDeque<Directory>), Error>::Ok((
421
1
                                    DirectoryNode {
422
1
                                        name: directory_name,
423
1
                                        digest: Some(digest.into()),
424
1
                                    },
425
1
                                    all_dirs,
426
1
                                ))
427
2
                            })
428
1
                            .boxed(),
429
                    );
430
4
                } else if file_type.is_file() {
  Branch (430:27): [Folded - Ignored]
  Branch (430:27): [Folded - Ignored]
  Branch (430:27): [True: 0, False: 1]
  Branch (430:27): [True: 3, False: 0]
431
3
                    file_futures.push(async move {
432
3
                        let metadata = fs::metadata(&full_path)
433
3
                            .await
434
3
                            .err_tip(|| format!(
"Could not open file {}"0
,
full_path.display()0
))
?0
;
435
3
                        upload_file(cas_store, &full_path, hasher, metadata)
436
3
                            .map_ok(TryInto::try_into)
437
3
                            .await
?0
438
3
                    });
439
1
                } else if file_type.is_symlink() {
  Branch (439:27): [Folded - Ignored]
  Branch (439:27): [Folded - Ignored]
  Branch (439:27): [True: 1, False: 0]
  Branch (439:27): [True: 0, False: 0]
440
1
                    symlink_futures.push(
441
1
                        upload_symlink(full_path, &full_work_directory)
442
1
                            .map(|symlink| symlink
?0
.try_into()),
443
                    );
444
0
                }
445
            }
446
        }
447
448
3
        let (mut file_nodes, dir_entries, mut symlinks) = try_join3(
449
3
            file_futures.try_collect::<Vec<FileNode>>(),
450
3
            dir_futures.try_collect::<Vec<(DirectoryNode, VecDeque<Directory>)>>(),
451
3
            symlink_futures.try_collect::<Vec<SymlinkNode>>(),
452
3
        )
453
3
        .await
?0
;
454
455
3
        let mut directory_nodes = Vec::with_capacity(dir_entries.len());
456
        // For efficiency we use a deque because it allows cheap concat of Vecs.
457
        // We make the assumption here that when performance is important it is because
458
        // our directory is quite large. This allows us to cheaply merge large amounts of
459
        // directories into one VecDeque. Then after we are done we need to collapse it
460
        // down into a single Vec.
461
3
        let mut all_child_directories = VecDeque::with_capacity(dir_entries.len());
462
4
        for (
directory_node1
,
mut recursive_child_directories1
) in dir_entries {
463
1
            directory_nodes.push(directory_node);
464
1
            all_child_directories.append(&mut recursive_child_directories);
465
1
        }
466
467
3
        file_nodes.sort_unstable_by(|a, b| 
a.name1
.
cmp1
(
&b.name1
));
468
3
        directory_nodes.sort_unstable_by(|a, b| 
a.name0
.
cmp0
(
&b.name0
));
469
3
        symlinks.sort_unstable_by(|a, b| 
a.name0
.
cmp0
(
&b.name0
));
470
471
3
        let directory = Directory {
472
3
            files: file_nodes,
473
3
            directories: directory_nodes,
474
3
            symlinks,
475
3
            node_properties: None, // We don't support file properties.
476
3
        };
477
3
        all_child_directories.push_back(directory.clone());
478
479
3
        Ok((directory, all_child_directories))
480
3
    })
481
3
}
482
483
0
async fn process_side_channel_file(
484
0
    side_channel_file: Cow<'_, OsStr>,
485
0
    args: &[&OsStr],
486
0
    timeout: Duration,
487
0
) -> Result<Option<Error>, Error> {
488
0
    let mut json_contents = String::new();
489
    {
490
        // Note: Scoping `file_slot` allows the file_slot semaphore to be released faster.
491
0
        let mut file_slot = match fs::open_file(side_channel_file, 0, u64::MAX).await {
492
0
            Ok(file_slot) => file_slot,
493
0
            Err(e) => {
494
0
                if e.code != Code::NotFound {
  Branch (494:20): [Folded - Ignored]
  Branch (494:20): [Folded - Ignored]
  Branch (494:20): [True: 0, False: 0]
495
0
                    return Err(e).err_tip(|| "Error opening side channel file");
496
0
                }
497
                // Note: If file does not exist, it's ok. Users are not required to create this file.
498
0
                return Ok(None);
499
            }
500
        };
501
0
        file_slot
502
0
            .read_to_string(&mut json_contents)
503
0
            .await
504
0
            .err_tip(|| "Error reading side channel file")?;
505
    }
506
507
0
    let side_channel_info: SideChannelInfo =
508
0
        serde_json5::from_str(&json_contents).map_err(|e| {
509
0
            make_input_err!(
510
                "Could not convert contents of side channel file (json) to SideChannelInfo : {e:?}"
511
            )
512
0
        })?;
513
0
    Ok(side_channel_info.failure.map(|failure| match failure {
514
0
        SideChannelFailureReason::Timeout => Error::new(
515
0
            Code::DeadlineExceeded,
516
0
            format!(
517
0
                "Command '{}' timed out after {} seconds",
518
0
                args.join(OsStr::new(" ")).to_string_lossy(),
519
0
                timeout.as_secs_f32()
520
            ),
521
        ),
522
0
    }))
523
0
}
524
525
15
async fn do_cleanup(
526
15
    running_actions_manager: &RunningActionsManagerImpl,
527
15
    operation_id: &OperationId,
528
15
    action_directory: &str,
529
15
) -> Result<(), Error> {
530
15
    debug!(
"Worker cleaning up"0
);
531
    // Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
532
15
    let remove_dir_result = fs::remove_dir_all(action_directory)
533
15
        .await
534
15
        .err_tip(|| format!(
"Could not remove working directory {action_directory}"0
));
535
15
    if let Err(
err0
) = running_actions_manager.cleanup_action(operation_id) {
  Branch (535:12): [True: 0, False: 0]
  Branch (535:12): [Folded - Ignored]
  Branch (535:12): [True: 0, False: 15]
536
0
        error!(?operation_id, ?err, "Error cleaning up action");
537
0
        return Result::<(), Error>::Err(err).merge(remove_dir_result);
538
15
    }
539
15
    if let Err(
err0
) = remove_dir_result {
  Branch (539:12): [True: 0, False: 0]
  Branch (539:12): [Folded - Ignored]
  Branch (539:12): [True: 0, False: 15]
540
0
        error!(?operation_id, ?err, "Error removing working directory");
541
0
        return Err(err);
542
15
    }
543
15
    Ok(())
544
15
}
545
546
pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
547
    /// Returns the action id of the action.
548
    fn get_operation_id(&self) -> &OperationId;
549
550
    /// Anything that needs to execute before the actions is actually executed should happen here.
551
    fn prepare_action(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
552
553
    /// Actually perform the execution of the action.
554
    fn execute(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
555
556
    /// Any uploading, processing or analyzing of the results should happen here.
557
    fn upload_results(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
558
559
    /// Cleanup any residual files, handles or other junk resulting from running the action.
560
    fn cleanup(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
561
562
    /// Returns the final result. As a general rule this action should be thought of as
563
    /// a consumption of `self`, meaning once a return happens here the lifetime of `Self`
564
    /// is over and any action performed on it after this call is undefined behavior.
565
    fn get_finished_result(
566
        self: Arc<Self>,
567
    ) -> impl Future<Output = Result<ActionResult, Error>> + Send;
568
569
    /// Returns the work directory of the action.
570
    fn get_work_directory(&self) -> &String;
571
}
572
573
#[derive(Debug)]
574
struct RunningActionImplExecutionResult {
575
    stdout: Bytes,
576
    stderr: Bytes,
577
    exit_code: i32,
578
}
579
580
#[derive(Debug)]
581
struct RunningActionImplState {
582
    command_proto: Option<ProtoCommand>,
583
    // TODO(aaronmondal) Kill is not implemented yet, but is instrumented.
584
    // However, it is used if the worker disconnects to destroy current jobs.
585
    kill_channel_tx: Option<oneshot::Sender<()>>,
586
    kill_channel_rx: Option<oneshot::Receiver<()>>,
587
    execution_result: Option<RunningActionImplExecutionResult>,
588
    action_result: Option<ActionResult>,
589
    execution_metadata: ExecutionMetadata,
590
    // If there was an internal error, this will be set.
591
    // This should NOT be set if everything was fine, but the process had a
592
    // non-zero exit code. Instead this should be used for internal errors
593
    // that prevented the action from running, upload failures, timeouts, exc...
594
    // but we have (or could have) the action results (like stderr/stdout).
595
    error: Option<Error>,
596
}
597
598
#[derive(Debug)]
599
pub struct RunningActionImpl {
600
    operation_id: OperationId,
601
    action_directory: String,
602
    work_directory: String,
603
    action_info: ActionInfo,
604
    timeout: Duration,
605
    running_actions_manager: Arc<RunningActionsManagerImpl>,
606
    state: Mutex<RunningActionImplState>,
607
    did_cleanup: AtomicBool,
608
}
609
610
impl RunningActionImpl {
611
15
    fn new(
612
15
        execution_metadata: ExecutionMetadata,
613
15
        operation_id: OperationId,
614
15
        action_directory: String,
615
15
        action_info: ActionInfo,
616
15
        timeout: Duration,
617
15
        running_actions_manager: Arc<RunningActionsManagerImpl>,
618
15
    ) -> Self {
619
15
        let work_directory = format!("{}/{}", action_directory, "work");
620
15
        let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
621
15
        Self {
622
15
            operation_id,
623
15
            action_directory,
624
15
            work_directory,
625
15
            action_info,
626
15
            timeout,
627
15
            running_actions_manager,
628
15
            state: Mutex::new(RunningActionImplState {
629
15
                command_proto: None,
630
15
                kill_channel_rx: Some(kill_channel_rx),
631
15
                kill_channel_tx: Some(kill_channel_tx),
632
15
                execution_result: None,
633
15
                action_result: None,
634
15
                execution_metadata,
635
15
                error: None,
636
15
            }),
637
15
            did_cleanup: AtomicBool::new(false),
638
15
        }
639
15
    }
640
641
    #[allow(
642
        clippy::missing_const_for_fn,
643
        reason = "False positive on stable, but not on nightly"
644
    )]
645
0
    fn metrics(&self) -> &Arc<Metrics> {
646
0
        &self.running_actions_manager.metrics
647
0
    }
648
649
    /// Prepares any actions needed to execution this action. This action will do the following:
650
    ///
651
    /// * Download any files needed to execute the action
652
    /// * Build a folder with all files needed to execute the action.
653
    ///
654
    /// This function will aggressively download and spawn potentially thousands of futures. It is
655
    /// up to the stores to rate limit if needed.
656
15
    
async fn inner_prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
657
15
        {
658
15
            let mut state = self.state.lock();
659
15
            state.execution_metadata.input_fetch_start_timestamp =
660
15
                (self.running_actions_manager.callbacks.now_fn)();
661
15
        }
662
15
        let command = {
663
            // Download and build out our input files/folders. Also fetch and decode our Command.
664
15
            let command_fut = self.metrics().get_proto_command_from_store.wrap(async {
665
15
                get_and_decode_digest::<ProtoCommand>(
666
15
                    self.running_actions_manager.cas_store.as_ref(),
667
15
                    self.action_info.command_digest.into(),
668
15
                )
669
15
                .await
670
15
                .err_tip(|| "Converting command_digest to Command")
671
15
            });
672
15
            let filesystem_store_pin =
673
15
                Pin::new(self.running_actions_manager.filesystem_store.as_ref());
674
15
            let (command, ()) = try_join(command_fut, async {
675
15
                fs::create_dir(&self.work_directory)
676
15
                    .await
677
15
                    .err_tip(|| format!(
"Error creating work directory {}"0
,
self.work_directory0
))
?0
;
678
                // Download the input files/folder and place them into the temp directory.
679
15
                self.metrics()
680
15
                    .download_to_directory
681
15
                    .wrap(download_to_directory(
682
15
                        &self.running_actions_manager.cas_store,
683
15
                        filesystem_store_pin,
684
15
                        &self.action_info.input_root_digest,
685
15
                        &self.work_directory,
686
15
                    ))
687
15
                    .await
688
15
            })
689
15
            .await
?0
;
690
15
            command
691
        };
692
        {
693
            // Create all directories needed for our output paths. This is required by the bazel spec.
694
15
            let prepare_output_directories = |output_file| 
{9
695
9
                let full_output_path = if command.working_directory.is_empty() {
  Branch (695:43): [Folded - Ignored]
  Branch (695:43): [Folded - Ignored]
  Branch (695:43): [True: 1, False: 8]
696
1
                    format!("{}/{}", self.work_directory, output_file)
697
                } else {
698
8
                    format!(
699
8
                        "{}/{}/{}",
700
8
                        self.work_directory, command.working_directory, output_file
701
                    )
702
                };
703
9
                async move {
704
9
                    let full_parent_path = Path::new(&full_output_path)
705
9
                        .parent()
706
9
                        .err_tip(|| format!(
"Parent path for {full_output_path} has no parent"0
))
?0
;
707
9
                    fs::create_dir_all(full_parent_path).await.err_tip(|| 
{0
708
0
                        format!(
709
0
                            "Error creating output directory {} (file)",
710
0
                            full_parent_path.display()
711
                        )
712
0
                    })?;
713
9
                    Result::<(), Error>::Ok(())
714
9
                }
715
9
            };
716
15
            self.metrics()
717
15
                .prepare_output_files
718
15
                .wrap(try_join_all(
719
15
                    command.output_files.iter().map(prepare_output_directories),
720
15
                ))
721
15
                .await
?0
;
722
15
            self.metrics()
723
15
                .prepare_output_paths
724
15
                .wrap(try_join_all(
725
15
                    command.output_paths.iter().map(prepare_output_directories),
726
15
                ))
727
15
                .await
?0
;
728
        }
729
15
        debug!(?command, 
"Worker received command"0
,);
730
15
        {
731
15
            let mut state = self.state.lock();
732
15
            state.command_proto = Some(command);
733
15
            state.execution_metadata.input_fetch_completed_timestamp =
734
15
                (self.running_actions_manager.callbacks.now_fn)();
735
15
        }
736
15
        Ok(self)
737
15
    }
738
739
13
    async fn inner_execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
740
13
        let (command_proto, mut kill_channel_rx) = {
741
13
            let mut state = self.state.lock();
742
13
            state.execution_metadata.execution_start_timestamp =
743
13
                (self.running_actions_manager.callbacks.now_fn)();
744
            (
745
13
                state
746
13
                    .command_proto
747
13
                    .take()
748
13
                    .err_tip(|| "Expected state to have command_proto in execute()")
?0
,
749
13
                state
750
13
                    .kill_channel_rx
751
13
                    .take()
752
13
                    .err_tip(|| "Expected state to have kill_channel_rx in execute()")
?0
753
                    // This is important as we may be killed at any point.
754
13
                    .fuse(),
755
            )
756
        };
757
13
        if command_proto.arguments.is_empty() {
  Branch (757:12): [Folded - Ignored]
  Branch (757:12): [Folded - Ignored]
  Branch (757:12): [True: 0, False: 13]
758
0
            return Err(make_input_err!("No arguments provided in Command proto"));
759
13
        }
760
13
        let args: Vec<&OsStr> = if let Some(
entrypoint0
) = &self
  Branch (760:40): [Folded - Ignored]
  Branch (760:40): [Folded - Ignored]
  Branch (760:40): [True: 0, False: 13]
761
13
            .running_actions_manager
762
13
            .execution_configuration
763
13
            .entrypoint
764
        {
765
0
            core::iter::once(entrypoint.as_ref())
766
0
                .chain(command_proto.arguments.iter().map(AsRef::as_ref))
767
0
                .collect()
768
        } else {
769
13
            command_proto.arguments.iter().map(AsRef::as_ref).collect()
770
        };
771
        // TODO(aaronmondal): This should probably be in debug, but currently
772
        //                    that's too busy and we often rely on this to
773
        //                    figure out toolchain misconfiguration issues.
774
        //                    De-bloat the `debug` level by using the `trace`
775
        //                    level more effectively and adjust this.
776
13
        info!(?args, "Executing command",);
777
13
        let mut command_builder = process::Command::new(args[0]);
778
13
        command_builder
779
13
            .args(&args[1..])
780
13
            .kill_on_drop(true)
781
13
            .stdin(Stdio::null())
782
13
            .stdout(Stdio::piped())
783
13
            .stderr(Stdio::piped())
784
13
            .current_dir(format!(
785
13
                "{}/{}",
786
13
                self.work_directory, command_proto.working_directory
787
13
            ))
788
13
            .env_clear();
789
790
13
        let requested_timeout = if self.action_info.timeout.is_zero() {
  Branch (790:36): [Folded - Ignored]
  Branch (790:36): [Folded - Ignored]
  Branch (790:36): [True: 11, False: 2]
791
11
            self.running_actions_manager.max_action_timeout
792
        } else {
793
2
            self.action_info.timeout
794
        };
795
796
13
        let mut maybe_side_channel_file: Option<Cow<'_, OsStr>> = None;
797
13
        if let Some(
additional_environment0
) = &self
  Branch (797:16): [Folded - Ignored]
  Branch (797:16): [Folded - Ignored]
  Branch (797:16): [True: 0, False: 13]
798
13
            .running_actions_manager
799
13
            .execution_configuration
800
13
            .additional_environment
801
        {
802
0
            for (name, source) in additional_environment {
803
0
                let value = match source {
804
0
                    EnvironmentSource::Property(property) => self
805
0
                        .action_info
806
0
                        .platform_properties
807
0
                        .get(property)
808
0
                        .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.as_str())),
809
0
                    EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
810
                    EnvironmentSource::TimeoutMillis => {
811
0
                        Cow::Owned(requested_timeout.as_millis().to_string())
812
                    }
813
                    EnvironmentSource::SideChannelFile => {
814
0
                        let file_cow =
815
0
                            format!("{}/{}", self.action_directory, Uuid::new_v4().simple());
816
0
                        maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));
817
0
                        Cow::Owned(file_cow)
818
                    }
819
                    EnvironmentSource::ActionDirectory => {
820
0
                        Cow::Borrowed(self.action_directory.as_str())
821
                    }
822
                };
823
0
                command_builder.env(name, value.as_ref());
824
            }
825
13
        }
826
827
        #[cfg(target_family = "unix")]
828
13
        let envs = &command_proto.environment_variables;
829
        // If SystemRoot is not set on windows we set it to default. Failing to do
830
        // this causes all commands to fail.
831
        #[cfg(target_family = "windows")]
832
        let envs = {
833
            let mut envs = command_proto.environment_variables.clone();
834
            if !envs.iter().any(|v| v.name.to_uppercase() == "SYSTEMROOT") {
835
                envs.push(
836
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
837
                        name: "SystemRoot".to_string(),
838
                        value: "C:\\Windows".to_string(),
839
                    },
840
                );
841
            }
842
            if !envs.iter().any(|v| v.name.to_uppercase() == "PATH") {
843
                envs.push(
844
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
845
                        name: "PATH".to_string(),
846
                        value: "C:\\Windows\\System32".to_string(),
847
                    },
848
                );
849
            }
850
            envs
851
        };
852
26
        for 
environment_variable13
in envs {
853
13
            command_builder.env(&environment_variable.name, &environment_variable.value);
854
13
        }
855
856
13
        let mut child_process = command_builder
857
13
            .spawn()
858
13
            .err_tip(|| format!(
"Could not execute command {args:?}"0
))
?0
;
859
13
        let mut stdout_reader = child_process
860
13
            .stdout
861
13
            .take()
862
13
            .err_tip(|| "Expected stdout to exist on command this should never happen")
?0
;
863
13
        let mut stderr_reader = child_process
864
13
            .stderr
865
13
            .take()
866
13
            .err_tip(|| "Expected stderr to exist on command this should never happen")
?0
;
867
868
13
        let mut child_process_guard = guard(child_process, |mut child_process| 
{0
869
0
            error!(
870
0
                "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
871
            );
872
0
            background_spawn!("running_actions_manager_kill_child_process", async move {
873
0
                child_process.kill().await
874
0
            });
875
0
        });
876
877
13
        let all_stdout_fut = spawn!("stdout_reader", async move {
878
13
            let mut all_stdout = BytesMut::new();
879
            loop {
880
16
                let 
sz14
= stdout_reader
881
16
                    .read_buf(&mut all_stdout)
882
16
                    .await
883
14
                    .err_tip(|| "Error reading stdout stream")
?0
;
884
14
                if sz == 0 {
  Branch (884:20): [Folded - Ignored]
  Branch (884:20): [Folded - Ignored]
  Branch (884:20): [True: 11, False: 3]
885
11
                    break; // EOF.
886
3
                }
887
            }
888
11
            Result::<Bytes, Error>::Ok(all_stdout.freeze())
889
11
        });
890
13
        let all_stderr_fut = spawn!("stderr_reader", async move {
891
13
            let mut all_stderr = BytesMut::new();
892
            loop {
893
16
                let 
sz14
= stderr_reader
894
16
                    .read_buf(&mut all_stderr)
895
16
                    .await
896
14
                    .err_tip(|| "Error reading stderr stream")
?0
;
897
14
                if sz == 0 {
  Branch (897:20): [Folded - Ignored]
  Branch (897:20): [Folded - Ignored]
  Branch (897:20): [True: 11, False: 3]
898
11
                    break; // EOF.
899
3
                }
900
            }
901
11
            Result::<Bytes, Error>::Ok(all_stderr.freeze())
902
11
        });
903
13
        let mut killed_action = false;
904
905
13
        let timer = self.metrics().child_process.begin_timer();
906
13
        let mut sleep_fut = (self.running_actions_manager.callbacks.sleep_fn)(self.timeout).fuse();
907
        loop {
908
17
            tokio::select! {
909
17
                () = &mut sleep_fut => {
910
2
                    self.running_actions_manager.metrics.task_timeouts.inc();
911
2
                    killed_action = true;
912
2
                    if let Err(
err0
) = child_process_guard.start_kill() {
  Branch (912:28): [Folded - Ignored]
  Branch (912:28): [Folded - Ignored]
  Branch (912:28): [True: 0, False: 2]
913
0
                        error!(
914
                            ?err,
915
0
                            "Could not kill process in RunningActionsManager for action timeout",
916
                        );
917
2
                    }
918
2
                    {
919
2
                        let mut state = self.state.lock();
920
2
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
921
2
                            Code::DeadlineExceeded,
922
2
                            format!(
923
2
                                "Command '{}' timed out after {} seconds",
924
2
                                args.join(OsStr::new(" ")).to_string_lossy(),
925
2
                                self.action_info.timeout.as_secs_f32()
926
2
                            )
927
2
                        )));
928
2
                    }
929
                },
930
17
                
maybe_exit_status13
= child_process_guard.wait() => {
931
                    // Defuse our guard so it does not try to cleanup and make nessless logs.
932
13
                    drop(ScopeGuard::<_, _>::into_inner(child_process_guard));
933
13
                    let exit_status = maybe_exit_status.err_tip(|| "Failed to collect exit code of process")
?0
;
934
                    // TODO(aaronmondal) We should implement stderr/stdout streaming to client here.
935
                    // If we get killed before the stream is started, then these will lock up.
936
                    // TODO(aaronmondal) There is a significant bug here. If we kill the action and the action creates
937
                    // child processes, it can create zombies. See: https://github.com/tracemachina/nativelink/issues/225
938
13
                    let (stdout, stderr) = if killed_action {
  Branch (938:47): [Folded - Ignored]
  Branch (938:47): [Folded - Ignored]
  Branch (938:47): [True: 4, False: 9]
939
4
                        drop(timer);
940
4
                        (Bytes::new(), Bytes::new())
941
                    } else {
942
9
                        timer.measure();
943
9
                        let (maybe_all_stdout, maybe_all_stderr) = tokio::join!(all_stdout_fut, all_stderr_fut);
944
                        (
945
9
                            maybe_all_stdout.err_tip(|| "Internal error reading from stdout of worker task")
?0
?0
,
946
9
                            maybe_all_stderr.err_tip(|| "Internal error reading from stderr of worker task")
?0
?0
947
                        )
948
                    };
949
950
13
                    let exit_code = exit_status.code().map_or(EXIT_CODE_FOR_SIGNAL, |exit_code| 
{9
951
9
                        if exit_code == 0 {
  Branch (951:28): [Folded - Ignored]
  Branch (951:28): [Folded - Ignored]
  Branch (951:28): [True: 8, False: 1]
952
8
                            self.metrics().child_process_success_error_code.inc();
953
8
                        } else {
954
1
                            self.metrics().child_process_failure_error_code.inc();
955
1
                        }
956
9
                        exit_code
957
9
                    });
958
959
13
                    let maybe_error_override = if let Some(
side_channel_file0
) = maybe_side_channel_file {
  Branch (959:55): [Folded - Ignored]
  Branch (959:55): [Folded - Ignored]
  Branch (959:55): [True: 0, False: 13]
960
0
                        process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await
961
0
                        .err_tip(|| format!("Error processing side channel file: {}", side_channel_file.display()))?
962
                    } else {
963
13
                        None
964
                    };
965
13
                    {
966
13
                        let mut state = self.state.lock();
967
13
                        state.error = Error::merge_option(state.error.take(), maybe_error_override);
968
13
969
13
                        state.command_proto = Some(command_proto);
970
13
                        state.execution_result = Some(RunningActionImplExecutionResult{
971
13
                            stdout,
972
13
                            stderr,
973
13
                            exit_code,
974
13
                        });
975
13
                        state.execution_metadata.execution_completed_timestamp = (self.running_actions_manager.callbacks.now_fn)();
976
13
                    }
977
13
                    return Ok(self);
978
                },
979
17
                _ = &mut kill_channel_rx => {
980
2
                    killed_action = true;
981
2
                    if let Err(
err0
) = child_process_guard.start_kill() {
  Branch (981:28): [Folded - Ignored]
  Branch (981:28): [Folded - Ignored]
  Branch (981:28): [True: 0, False: 2]
982
0
                        error!(
983
0
                            operation_id = ?self.operation_id,
984
                            ?err,
985
0
                            "Could not kill process",
986
                        );
987
2
                    }
988
2
                    {
989
2
                        let mut state = self.state.lock();
990
2
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
991
2
                            Code::Aborted,
992
2
                            format!(
993
2
                                "Command '{}' was killed by scheduler",
994
2
                                args.join(OsStr::new(" ")).to_string_lossy()
995
2
                            )
996
2
                        )));
997
2
                    }
998
                },
999
            }
1000
        }
1001
        // Unreachable.
1002
13
    }
1003
1004
11
    async fn inner_upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1005
        enum OutputType {
1006
            None,
1007
            File(FileInfo),
1008
            Directory(DirectoryInfo),
1009
            FileSymlink(SymlinkInfo),
1010
            DirectorySymlink(SymlinkInfo),
1011
        }
1012
1013
11
        debug!(
"Worker uploading results"0
,);
1014
11
        let (mut command_proto, execution_result, mut execution_metadata) = {
1015
11
            let mut state = self.state.lock();
1016
11
            state.execution_metadata.output_upload_start_timestamp =
1017
11
                (self.running_actions_manager.callbacks.now_fn)();
1018
            (
1019
11
                state
1020
11
                    .command_proto
1021
11
                    .take()
1022
11
                    .err_tip(|| "Expected state to have command_proto in execute()")
?0
,
1023
11
                state
1024
11
                    .execution_result
1025
11
                    .take()
1026
11
                    .err_tip(|| "Execution result does not exist at upload_results stage")
?0
,
1027
11
                state.execution_metadata.clone(),
1028
            )
1029
        };
1030
11
        let cas_store = self.running_actions_manager.cas_store.as_ref();
1031
11
        let hasher = self.action_info.unique_qualifier.digest_function();
1032
1033
11
        let mut output_path_futures = FuturesUnordered::new();
1034
11
        let mut output_paths = command_proto.output_paths;
1035
11
        if output_paths.is_empty() {
  Branch (1035:12): [Folded - Ignored]
  Branch (1035:12): [Folded - Ignored]
  Branch (1035:12): [True: 6, False: 5]
1036
6
            output_paths
1037
6
                .reserve(command_proto.output_files.len() + command_proto.output_directories.len());
1038
6
            output_paths.append(&mut command_proto.output_files);
1039
6
            output_paths.append(&mut command_proto.output_directories);
1040
6
        
}5
1041
18
        for 
entry7
in output_paths {
1042
7
            let full_path = OsString::from(if command_proto.working_directory.is_empty() {
  Branch (1042:47): [Folded - Ignored]
  Branch (1042:47): [Folded - Ignored]
  Branch (1042:47): [True: 0, False: 7]
1043
0
                format!("{}/{}", self.work_directory, entry)
1044
            } else {
1045
7
                format!(
1046
7
                    "{}/{}/{}",
1047
7
                    self.work_directory, command_proto.working_directory, entry
1048
                )
1049
            });
1050
7
            let work_directory = &self.work_directory;
1051
7
            output_path_futures.push(async move {
1052
3
                let metadata = {
1053
7
                    let metadata = match fs::symlink_metadata(&full_path).await {
1054
7
                        Ok(file) => file,
1055
0
                        Err(e) => {
1056
0
                            if e.code == Code::NotFound {
  Branch (1056:32): [Folded - Ignored]
  Branch (1056:32): [Folded - Ignored]
  Branch (1056:32): [True: 0, False: 0]
1057
                                // In the event our output does not exist, according to the bazel remote
1058
                                // execution spec, we simply ignore it continue.
1059
0
                                return Result::<OutputType, Error>::Ok(OutputType::None);
1060
0
                            }
1061
0
                            return Err(e).err_tip(|| {
1062
0
                                format!("Could not open file {}", full_path.display())
1063
0
                            });
1064
                        }
1065
                    };
1066
1067
7
                    if metadata.is_file() {
  Branch (1067:24): [Folded - Ignored]
  Branch (1067:24): [Folded - Ignored]
  Branch (1067:24): [True: 4, False: 3]
1068
                        return Ok(OutputType::File(
1069
4
                            upload_file(cas_store.as_pin(), &full_path, hasher, metadata)
1070
4
                                .await
1071
4
                                .map(|mut file_info| {
1072
4
                                    file_info.name_or_path = NameOrPath::Path(entry);
1073
4
                                    file_info
1074
4
                                })
1075
4
                                .err_tip(|| format!(
"Uploading file {}"0
,
full_path.display()0
))
?0
,
1076
                        ));
1077
3
                    }
1078
3
                    metadata
1079
                };
1080
3
                if metadata.is_dir() {
  Branch (1080:20): [Folded - Ignored]
  Branch (1080:20): [Folded - Ignored]
  Branch (1080:20): [True: 2, False: 1]
1081
                    Ok(OutputType::Directory(
1082
2
                        upload_directory(cas_store.as_pin(), &full_path, work_directory, hasher)
1083
2
                            .and_then(|(root_dir, children)| async move {
1084
2
                                let tree = ProtoTree {
1085
2
                                    root: Some(root_dir),
1086
2
                                    children: children.into(),
1087
2
                                };
1088
2
                                let tree_digest = serialize_and_upload_message(
1089
2
                                    &tree,
1090
2
                                    cas_store.as_pin(),
1091
2
                                    &mut hasher.hasher(),
1092
2
                                )
1093
2
                                .await
1094
2
                                .err_tip(|| format!(
"While processing {entry}"0
))
?0
;
1095
2
                                Ok(DirectoryInfo {
1096
2
                                    path: entry,
1097
2
                                    tree_digest,
1098
2
                                })
1099
4
                            })
1100
2
                            .await
1101
2
                            .err_tip(|| format!(
"Uploading directory {}"0
,
full_path.display()0
))
?0
,
1102
                    ))
1103
1
                } else if metadata.is_symlink() {
  Branch (1103:27): [Folded - Ignored]
  Branch (1103:27): [Folded - Ignored]
  Branch (1103:27): [True: 1, False: 0]
1104
1
                    let output_symlink = upload_symlink(&full_path, work_directory)
1105
1
                        .await
1106
1
                        .map(|mut symlink_info| {
1107
1
                            symlink_info.name_or_path = NameOrPath::Path(entry);
1108
1
                            symlink_info
1109
1
                        })
1110
1
                        .err_tip(|| format!(
"Uploading symlink {}"0
,
full_path.display()0
))
?0
;
1111
1
                    match fs::metadata(&full_path).await {
1112
1
                        Ok(metadata) => {
1113
1
                            if metadata.is_dir() {
  Branch (1113:32): [Folded - Ignored]
  Branch (1113:32): [Folded - Ignored]
  Branch (1113:32): [True: 0, False: 1]
1114
0
                                return Ok(OutputType::DirectorySymlink(output_symlink));
1115
1
                            }
1116
                            // Note: If it's anything but directory we put it as a file symlink.
1117
1
                            return Ok(OutputType::FileSymlink(output_symlink));
1118
                        }
1119
0
                        Err(e) => {
1120
0
                            if e.code != Code::NotFound {
  Branch (1120:32): [Folded - Ignored]
  Branch (1120:32): [Folded - Ignored]
  Branch (1120:32): [True: 0, False: 0]
1121
0
                                return Err(e).err_tip(|| {
1122
0
                                    format!(
1123
0
                                        "While querying target symlink metadata for {}",
1124
0
                                        full_path.display()
1125
                                    )
1126
0
                                });
1127
0
                            }
1128
                            // If the file doesn't exist, we consider it a file. Even though the
1129
                            // file doesn't exist we still need to populate an entry.
1130
0
                            return Ok(OutputType::FileSymlink(output_symlink));
1131
                        }
1132
                    }
1133
                } else {
1134
0
                    Err(make_err!(
1135
0
                        Code::Internal,
1136
0
                        "{full_path:?} was not a file, folder or symlink. Must be one.",
1137
0
                    ))
1138
                }
1139
7
            });
1140
        }
1141
11
        let mut output_files = vec![];
1142
11
        let mut output_folders = vec![];
1143
11
        let mut output_directory_symlinks = vec![];
1144
11
        let mut output_file_symlinks = vec![];
1145
1146
11
        if execution_result.exit_code != 0 {
  Branch (1146:12): [Folded - Ignored]
  Branch (1146:12): [Folded - Ignored]
  Branch (1146:12): [True: 5, False: 6]
1147
5
            let stdout = core::str::from_utf8(&execution_result.stdout).unwrap_or("<no-utf8>");
1148
5
            let stderr = core::str::from_utf8(&execution_result.stderr).unwrap_or("<no-utf8>");
1149
5
            error!(
1150
                exit_code = ?execution_result.exit_code,
1151
5
                stdout = ?stdout[..min(stdout.len(), 1000)],
1152
5
                stderr = ?stderr[..min(stderr.len(), 1000)],
1153
5
                "Command returned non-zero exit code",
1154
            );
1155
6
        }
1156
1157
11
        let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
1158
11
            let data = execution_result.stdout;
1159
11
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1160
11
            cas_store
1161
11
                .update_oneshot(digest, data)
1162
11
                .await
1163
11
                .err_tip(|| "Uploading stdout")
?0
;
1164
11
            Result::<DigestInfo, Error>::Ok(digest)
1165
11
        });
1166
11
        let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
1167
11
            let data = execution_result.stderr;
1168
11
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1169
11
            cas_store
1170
11
                .update_oneshot(digest, data)
1171
11
                .await
1172
11
                .err_tip(|| "Uploading stdout")
?0
;
1173
11
            Result::<DigestInfo, Error>::Ok(digest)
1174
11
        });
1175
1176
11
        let upload_result = futures::try_join!(stdout_digest_fut, stderr_digest_fut, async {
1177
18
            while let Some(
output_type7
) = output_path_futures.try_next().await
?0
{
  Branch (1177:23): [Folded - Ignored]
  Branch (1177:23): [Folded - Ignored]
  Branch (1177:23): [True: 7, False: 11]
1178
7
                match output_type {
1179
4
                    OutputType::File(output_file) => output_files.push(output_file),
1180
2
                    OutputType::Directory(output_folder) => output_folders.push(output_folder),
1181
1
                    OutputType::FileSymlink(output_symlink) => {
1182
1
                        output_file_symlinks.push(output_symlink);
1183
1
                    }
1184
0
                    OutputType::DirectorySymlink(output_symlink) => {
1185
0
                        output_directory_symlinks.push(output_symlink);
1186
0
                    }
1187
0
                    OutputType::None => { /* Safe to ignore */ }
1188
                }
1189
            }
1190
11
            Ok(())
1191
11
        });
1192
11
        drop(output_path_futures);
1193
11
        let (stdout_digest, stderr_digest) = match upload_result {
1194
11
            Ok((stdout_digest, stderr_digest, ())) => (stdout_digest, stderr_digest),
1195
0
            Err(e) => return Err(e).err_tip(|| "Error while uploading results"),
1196
        };
1197
1198
11
        execution_metadata.output_upload_completed_timestamp =
1199
11
            (self.running_actions_manager.callbacks.now_fn)();
1200
11
        output_files.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1201
11
        output_folders.sort_unstable_by(|a, b| 
a.path0
.
cmp0
(
&b.path0
));
1202
11
        output_file_symlinks.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1203
11
        output_directory_symlinks.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1204
11
        {
1205
11
            let mut state = self.state.lock();
1206
11
            execution_metadata.worker_completed_timestamp =
1207
11
                (self.running_actions_manager.callbacks.now_fn)();
1208
11
            state.action_result = Some(ActionResult {
1209
11
                output_files,
1210
11
                output_folders,
1211
11
                output_directory_symlinks,
1212
11
                output_file_symlinks,
1213
11
                exit_code: execution_result.exit_code,
1214
11
                stdout_digest,
1215
11
                stderr_digest,
1216
11
                execution_metadata,
1217
11
                server_logs: HashMap::default(), // TODO(aaronmondal) Not implemented.
1218
11
                error: state.error.clone(),
1219
11
                message: String::new(), // Will be filled in on cache_action_result if needed.
1220
11
            });
1221
11
        }
1222
11
        Ok(self)
1223
11
    }
1224
1225
11
    async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
1226
11
        let mut state = self.state.lock();
1227
11
        state
1228
11
            .action_result
1229
11
            .take()
1230
11
            .err_tip(|| "Expected action_result to exist in get_finished_result")
1231
11
    }
1232
}
1233
1234
impl Drop for RunningActionImpl {
1235
15
    fn drop(&mut self) {
1236
15
        if self.did_cleanup.load(Ordering::Acquire) {
  Branch (1236:12): [True: 15, False: 0]
  Branch (1236:12): [Folded - Ignored]
1237
15
            return;
1238
0
        }
1239
0
        let operation_id = self.operation_id.clone();
1240
0
        error!(
1241
            ?operation_id,
1242
0
            "RunningActionImpl did not cleanup. This is a violation of the requirements, will attempt to do it in the background."
1243
        );
1244
0
        let running_actions_manager = self.running_actions_manager.clone();
1245
0
        let action_directory = self.action_directory.clone();
1246
0
        background_spawn!("running_action_impl_drop", async move {
1247
0
            let Err(err) =
  Branch (1247:17): [True: 0, False: 0]
  Branch (1247:17): [Folded - Ignored]
1248
0
                do_cleanup(&running_actions_manager, &operation_id, &action_directory).await
1249
            else {
1250
0
                return;
1251
            };
1252
0
            error!(
1253
                ?operation_id,
1254
                ?action_directory,
1255
                ?err,
1256
0
                "Error cleaning up action"
1257
            );
1258
0
        });
1259
15
    }
1260
}
1261
1262
impl RunningAction for RunningActionImpl {
1263
0
    fn get_operation_id(&self) -> &OperationId {
1264
0
        &self.operation_id
1265
0
    }
1266
1267
15
    
async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
1268
15
        self.metrics()
1269
15
            .clone()
1270
15
            .prepare_action
1271
15
            .wrap(Self::inner_prepare_action(self))
1272
15
            .await
1273
15
    }
1274
1275
13
    
async fn execute(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
1276
13
        self.metrics()
1277
13
            .clone()
1278
13
            .execute
1279
13
            .wrap(Self::inner_execute(self))
1280
13
            .await
1281
13
    }
1282
1283
11
    
async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
1284
11
        self.metrics()
1285
11
            .clone()
1286
11
            .upload_results
1287
11
            .wrap(Self::inner_upload_results(self))
1288
11
            .await
1289
11
    }
1290
1291
15
    async fn cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1292
15
        self.metrics()
1293
15
            .clone()
1294
15
            .cleanup
1295
15
            .wrap(async move {
1296
15
                let result = do_cleanup(
1297
15
                    &self.running_actions_manager,
1298
15
                    &self.operation_id,
1299
15
                    &self.action_directory,
1300
15
                )
1301
15
                .await;
1302
15
                self.did_cleanup.store(true, Ordering::Release);
1303
15
                result.map(move |()| self)
1304
15
            })
1305
15
            .await
1306
15
    }
1307
1308
11
    
async fn get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error>0
{
1309
11
        self.metrics()
1310
11
            .clone()
1311
11
            .get_finished_result
1312
11
            .wrap(Self::inner_get_finished_result(self))
1313
11
            .await
1314
11
    }
1315
1316
0
    fn get_work_directory(&self) -> &String {
1317
0
        &self.work_directory
1318
0
    }
1319
}
1320
1321
pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static {
1322
    type RunningAction: RunningAction;
1323
1324
    fn create_and_add_action(
1325
        self: &Arc<Self>,
1326
        worker_id: String,
1327
        start_execute: StartExecute,
1328
    ) -> impl Future<Output = Result<Arc<Self::RunningAction>, Error>> + Send;
1329
1330
    fn cache_action_result(
1331
        &self,
1332
        action_digest: DigestInfo,
1333
        action_result: &mut ActionResult,
1334
        hasher: DigestHasherFunc,
1335
    ) -> impl Future<Output = Result<(), Error>> + Send;
1336
1337
    fn complete_actions(&self, complete_msg: ShutdownGuard) -> impl Future<Output = ()> + Send;
1338
1339
    fn kill_all(&self) -> impl Future<Output = ()> + Send;
1340
1341
    fn kill_operation(
1342
        &self,
1343
        operation_id: &OperationId,
1344
    ) -> impl Future<Output = Result<(), Error>> + Send;
1345
1346
    fn metrics(&self) -> &Arc<Metrics>;
1347
}
1348
1349
/// A function to get the current system time, used to allow mocking for tests
1350
type NowFn = fn() -> SystemTime;
1351
type SleepFn = fn(Duration) -> BoxFuture<'static, ()>;
1352
1353
/// Functions that may be injected for testing purposes, during standard control
1354
/// flows these are specified by the new function.
1355
#[derive(Clone, Copy)]
1356
pub struct Callbacks {
1357
    /// A function that gets the current time.
1358
    pub now_fn: NowFn,
1359
    /// A function that sleeps for a given Duration.
1360
    pub sleep_fn: SleepFn,
1361
}
1362
1363
impl Debug for Callbacks {
1364
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1365
0
        f.debug_struct("Callbacks").finish_non_exhaustive()
1366
0
    }
1367
}
1368
1369
/// The set of additional information for executing an action over and above
1370
/// those given in the `ActionInfo` passed to the worker.  This allows
1371
/// modification of the action for execution on this particular worker.  This
1372
/// may be used to run the action with a particular set of additional
1373
/// environment variables, or perhaps configure it to execute within a
1374
/// container.
1375
#[derive(Debug, Default)]
1376
pub struct ExecutionConfiguration {
1377
    /// If set, will be executed instead of the first argument passed in the
1378
    /// `ActionInfo` with all of the arguments in the `ActionInfo` passed as
1379
    /// arguments to this command.
1380
    pub entrypoint: Option<String>,
1381
    /// The only environment variables that will be specified when the command
1382
    /// executes other than those in the `ActionInfo`.  On Windows, `SystemRoot`
1383
    /// and PATH are also assigned (see `inner_execute`).
1384
    pub additional_environment: Option<HashMap<String, EnvironmentSource>>,
1385
}
1386
1387
#[derive(Debug)]
1388
struct UploadActionResults {
1389
    upload_ac_results_strategy: UploadCacheResultsStrategy,
1390
    upload_historical_results_strategy: UploadCacheResultsStrategy,
1391
    ac_store: Option<Store>,
1392
    historical_store: Store,
1393
    success_message_template: Template,
1394
    failure_message_template: Template,
1395
}
1396
1397
impl UploadActionResults {
1398
24
    fn new(
1399
24
        config: &UploadActionResultConfig,
1400
24
        ac_store: Option<Store>,
1401
24
        historical_store: Store,
1402
24
    ) -> Result<Self, Error> {
1403
24
        let upload_historical_results_strategy = config
1404
24
            .upload_historical_results_strategy
1405
24
            .unwrap_or(DEFAULT_HISTORICAL_RESULTS_STRATEGY);
1406
8
        if !matches!(
  Branch (1406:12): [True: 8, False: 16]
  Branch (1406:12): [Folded - Ignored]
1407
24
            config.upload_ac_results_strategy,
1408
            UploadCacheResultsStrategy::Never
1409
8
        ) && ac_store.is_none()
  Branch (1409:14): [True: 0, False: 8]
  Branch (1409:14): [Folded - Ignored]
1410
        {
1411
0
            return Err(make_input_err!(
1412
0
                "upload_ac_results_strategy is set, but no ac_store is configured"
1413
0
            ));
1414
24
        }
1415
        Ok(Self {
1416
24
            upload_ac_results_strategy: config.upload_ac_results_strategy,
1417
24
            upload_historical_results_strategy,
1418
24
            ac_store,
1419
24
            historical_store,
1420
24
            success_message_template: Template::new(&config.success_message_template).map_err(
1421
0
                |e| {
1422
0
                    make_input_err!(
1423
                        "Could not convert success_message_template to rust template: {} : {e:?}",
1424
                        config.success_message_template
1425
                    )
1426
0
                },
1427
0
            )?,
1428
24
            failure_message_template: Template::new(&config.failure_message_template).map_err(
1429
0
                |e| {
1430
0
                    make_input_err!(
1431
                        "Could not convert failure_message_template to rust template: {} : {e:?}",
1432
                        config.success_message_template
1433
                    )
1434
0
                },
1435
0
            )?,
1436
        })
1437
24
    }
1438
1439
0
    const fn should_cache_result(
1440
0
        strategy: UploadCacheResultsStrategy,
1441
0
        action_result: &ActionResult,
1442
0
        treat_infra_error_as_failure: bool,
1443
0
    ) -> bool {
1444
0
        let did_fail = action_result.exit_code != 0
  Branch (1444:24): [Folded - Ignored]
  Branch (1444:24): [Folded - Ignored]
1445
0
            || (treat_infra_error_as_failure && action_result.error.is_some());
  Branch (1445:17): [Folded - Ignored]
  Branch (1445:17): [Folded - Ignored]
1446
0
        match strategy {
1447
0
            UploadCacheResultsStrategy::SuccessOnly => !did_fail,
1448
0
            UploadCacheResultsStrategy::Never => false,
1449
            // Never cache internal errors or timeouts.
1450
            UploadCacheResultsStrategy::Everything => {
1451
0
                treat_infra_error_as_failure || action_result.error.is_none()
  Branch (1451:17): [Folded - Ignored]
  Branch (1451:17): [Folded - Ignored]
1452
            }
1453
0
            UploadCacheResultsStrategy::FailuresOnly => did_fail,
1454
        }
1455
0
    }
1456
1457
    /// Formats the message field in `ExecuteResponse` from the `success_message_template`
1458
    /// or `failure_message_template` config templates.
1459
5
    fn format_execute_response_message(
1460
5
        mut template_str: Template,
1461
5
        action_digest_info: DigestInfo,
1462
5
        maybe_historical_digest_info: Option<DigestInfo>,
1463
5
        hasher: DigestHasherFunc,
1464
5
    ) -> Result<String, Error> {
1465
5
        template_str.replace(
1466
            "digest_function",
1467
5
            hasher.proto_digest_func().as_str_name().to_lowercase(),
1468
        );
1469
5
        template_str.replace(
1470
            "action_digest_hash",
1471
5
            action_digest_info.packed_hash().to_string(),
1472
        );
1473
5
        template_str.replace("action_digest_size", action_digest_info.size_bytes());
1474
5
        if let Some(
historical_digest_info3
) = maybe_historical_digest_info {
  Branch (1474:16): [True: 3, False: 2]
  Branch (1474:16): [Folded - Ignored]
1475
3
            template_str.replace(
1476
3
                "historical_results_hash",
1477
3
                format!("{}", historical_digest_info.packed_hash()),
1478
3
            );
1479
3
            template_str.replace(
1480
3
                "historical_results_size",
1481
3
                historical_digest_info.size_bytes(),
1482
3
            );
1483
3
        } else {
1484
2
            template_str.replace("historical_results_hash", "");
1485
2
            template_str.replace("historical_results_size", "");
1486
2
        }
1487
5
        template_str
1488
5
            .text()
1489
5
            .map_err(|e| make_input_err!("Could not convert template to text: {e:?}"))
1490
5
    }
1491
1492
5
    async fn upload_ac_results(
1493
5
        &self,
1494
5
        action_digest: DigestInfo,
1495
5
        action_result: ProtoActionResult,
1496
5
        hasher: DigestHasherFunc,
1497
5
    ) -> Result<(), Error> {
1498
5
        let Some(ac_store) = self.ac_store.as_ref() else {
  Branch (1498:13): [Folded - Ignored]
  Branch (1498:13): [Folded - Ignored]
  Branch (1498:13): [True: 5, False: 0]
1499
0
            return Ok(());
1500
        };
1501
        // If we are a GrpcStore we shortcut here, as this is a special store.
1502
5
        if let Some(
grpc_store0
) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
  Branch (1502:16): [Folded - Ignored]
  Branch (1502:16): [Folded - Ignored]
  Branch (1502:16): [True: 0, False: 5]
1503
0
            let update_action_request = UpdateActionResultRequest {
1504
0
                // This is populated by `update_action_result`.
1505
0
                instance_name: String::new(),
1506
0
                action_digest: Some(action_digest.into()),
1507
0
                action_result: Some(action_result),
1508
0
                results_cache_policy: None,
1509
0
                digest_function: hasher.proto_digest_func().into(),
1510
0
            };
1511
0
            return grpc_store
1512
0
                .update_action_result(Request::new(update_action_request))
1513
0
                .await
1514
0
                .map(|_| ())
1515
0
                .err_tip(|| "Caching ActionResult");
1516
5
        }
1517
1518
5
        let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);
1519
5
        action_result
1520
5
            .encode(&mut store_data)
1521
5
            .err_tip(|| "Encoding ActionResult for caching")
?0
;
1522
1523
5
        ac_store
1524
5
            .update_oneshot(action_digest, store_data.split().freeze())
1525
5
            .await
1526
5
            .err_tip(|| "Caching ActionResult")
1527
5
    }
1528
1529
3
    async fn upload_historical_results_with_message(
1530
3
        &self,
1531
3
        action_digest: DigestInfo,
1532
3
        execute_response: ExecuteResponse,
1533
3
        message_template: Template,
1534
3
        hasher: DigestHasherFunc,
1535
3
    ) -> Result<String, Error> {
1536
3
        let historical_digest_info = serialize_and_upload_message(
1537
3
            &HistoricalExecuteResponse {
1538
3
                action_digest: Some(action_digest.into()),
1539
3
                execute_response: Some(execute_response.clone()),
1540
3
            },
1541
3
            self.historical_store.as_pin(),
1542
3
            &mut hasher.hasher(),
1543
3
        )
1544
3
        .await
1545
3
        .err_tip(|| format!(
"Caching HistoricalExecuteResponse for digest: {action_digest}"0
))
?0
;
1546
1547
3
        Self::format_execute_response_message(
1548
3
            message_template,
1549
3
            action_digest,
1550
3
            Some(historical_digest_info),
1551
3
            hasher,
1552
        )
1553
3
        .err_tip(|| "Could not format message in upload_historical_results_with_message")
1554
3
    }
1555
1556
6
    async fn cache_action_result(
1557
6
        &self,
1558
6
        action_info: DigestInfo,
1559
6
        action_result: &mut ActionResult,
1560
6
        hasher: DigestHasherFunc,
1561
6
    ) -> Result<(), Error> {
1562
6
        let should_upload_historical_results =
1563
6
            Self::should_cache_result(self.upload_historical_results_strategy, action_result, true);
1564
6
        let should_upload_ac_results =
1565
6
            Self::should_cache_result(self.upload_ac_results_strategy, action_result, false);
1566
        // Shortcut so we don't need to convert to proto if not needed.
1567
6
        if !should_upload_ac_results && 
!should_upload_historical_results1
{
  Branch (1567:12): [Folded - Ignored]
  Branch (1567:41): [Folded - Ignored]
  Branch (1567:12): [Folded - Ignored]
  Branch (1567:41): [Folded - Ignored]
  Branch (1567:12): [True: 1, False: 5]
  Branch (1567:41): [True: 1, False: 0]
1568
1
            return Ok(());
1569
5
        }
1570
1571
5
        let mut execute_response = to_execute_response(action_result.clone());
1572
1573
        // In theory exit code should always be != 0 if there's an error, but for safety we
1574
        // catch both.
1575
5
        let message_template = if action_result.exit_code == 0 && 
action_result.error4
.
is_none4
() {
  Branch (1575:35): [Folded - Ignored]
  Branch (1575:67): [Folded - Ignored]
  Branch (1575:35): [Folded - Ignored]
  Branch (1575:67): [Folded - Ignored]
  Branch (1575:35): [True: 4, False: 1]
  Branch (1575:67): [True: 3, False: 1]
1576
3
            self.success_message_template.clone()
1577
        } else {
1578
2
            self.failure_message_template.clone()
1579
        };
1580
1581
5
        let upload_historical_results_with_message_result = if should_upload_historical_results {
  Branch (1581:64): [Folded - Ignored]
  Branch (1581:64): [Folded - Ignored]
  Branch (1581:64): [True: 3, False: 2]
1582
3
            let maybe_message = self
1583
3
                .upload_historical_results_with_message(
1584
3
                    action_info,
1585
3
                    execute_response.clone(),
1586
3
                    message_template,
1587
3
                    hasher,
1588
3
                )
1589
3
                .await;
1590
3
            match maybe_message {
1591
3
                Ok(message) => {
1592
3
                    action_result.message.clone_from(&message);
1593
3
                    execute_response.message = message;
1594
3
                    Ok(())
1595
                }
1596
0
                Err(e) => Result::<(), Error>::Err(e),
1597
            }
1598
        } else {
1599
2
            match Self::format_execute_response_message(message_template, action_info, None, hasher)
1600
            {
1601
2
                Ok(message) => {
1602
2
                    action_result.message.clone_from(&message);
1603
2
                    execute_response.message = message;
1604
2
                    Ok(())
1605
                }
1606
0
                Err(e) => Err(e).err_tip(|| "Could not format message in cache_action_result"),
1607
            }
1608
        };
1609
1610
        // Note: Done in this order because we assume most results will succeed and most configs will
1611
        // either always upload upload historical results or only upload on filure. In which case
1612
        // we can avoid an extra clone of the protos by doing this last with the above assumption.
1613
5
        let ac_upload_results = if should_upload_ac_results {
  Branch (1613:36): [Folded - Ignored]
  Branch (1613:36): [Folded - Ignored]
  Branch (1613:36): [True: 5, False: 0]
1614
5
            self.upload_ac_results(
1615
5
                action_info,
1616
5
                execute_response
1617
5
                    .result
1618
5
                    .err_tip(|| "No result set in cache_action_result")
?0
,
1619
5
                hasher,
1620
            )
1621
5
            .await
1622
        } else {
1623
0
            Ok(())
1624
        };
1625
5
        upload_historical_results_with_message_result.merge(ac_upload_results)
1626
6
    }
1627
}
1628
1629
#[derive(Debug)]
1630
pub struct RunningActionsManagerArgs<'a> {
1631
    pub root_action_directory: String,
1632
    pub execution_configuration: ExecutionConfiguration,
1633
    pub cas_store: Arc<FastSlowStore>,
1634
    pub ac_store: Option<Store>,
1635
    pub historical_store: Store,
1636
    pub upload_action_result_config: &'a UploadActionResultConfig,
1637
    pub max_action_timeout: Duration,
1638
    pub timeout_handled_externally: bool,
1639
}
1640
1641
/// Holds state info about what is being executed and the interface for interacting
1642
/// with actions while they are running.
1643
#[derive(Debug)]
1644
pub struct RunningActionsManagerImpl {
1645
    root_action_directory: String,
1646
    execution_configuration: ExecutionConfiguration,
1647
    cas_store: Arc<FastSlowStore>,
1648
    filesystem_store: Arc<FilesystemStore>,
1649
    upload_action_results: UploadActionResults,
1650
    max_action_timeout: Duration,
1651
    timeout_handled_externally: bool,
1652
    running_actions: Mutex<HashMap<OperationId, Weak<RunningActionImpl>>>,
1653
    // Note: We don't use Notify because we need to support a .wait_for()-like function, which
1654
    // Notify does not support.
1655
    action_done_tx: watch::Sender<()>,
1656
    callbacks: Callbacks,
1657
    metrics: Arc<Metrics>,
1658
}
1659
1660
impl RunningActionsManagerImpl {
1661
24
    pub fn new_with_callbacks(
1662
24
        args: RunningActionsManagerArgs<'_>,
1663
24
        callbacks: Callbacks,
1664
24
    ) -> Result<Self, Error> {
1665
        // Sadly because of some limitations of how Any works we need to clone more times than optimal.
1666
24
        let filesystem_store = args
1667
24
            .cas_store
1668
24
            .fast_store()
1669
24
            .downcast_ref::<FilesystemStore>(None)
1670
24
            .err_tip(
1671
                || "Expected FilesystemStore store for .fast_store() in RunningActionsManagerImpl",
1672
0
            )?
1673
24
            .get_arc()
1674
24
            .err_tip(|| "FilesystemStore's internal Arc was lost")
?0
;
1675
24
        let (action_done_tx, _) = watch::channel(());
1676
        Ok(Self {
1677
24
            root_action_directory: args.root_action_directory,
1678
24
            execution_configuration: args.execution_configuration,
1679
24
            cas_store: args.cas_store,
1680
24
            filesystem_store,
1681
24
            upload_action_results: UploadActionResults::new(
1682
24
                args.upload_action_result_config,
1683
24
                args.ac_store,
1684
24
                args.historical_store,
1685
            )
1686
24
            .err_tip(|| "During RunningActionsManagerImpl construction")
?0
,
1687
24
            max_action_timeout: args.max_action_timeout,
1688
24
            timeout_handled_externally: args.timeout_handled_externally,
1689
24
            running_actions: Mutex::new(HashMap::new()),
1690
24
            action_done_tx,
1691
24
            callbacks,
1692
24
            metrics: Arc::new(Metrics::default()),
1693
        })
1694
24
    }
1695
1696
10
    pub fn new(args: RunningActionsManagerArgs<'_>) -> Result<Self, Error> {
1697
10
        Self::new_with_callbacks(
1698
10
            args,
1699
            Callbacks {
1700
10
                now_fn: SystemTime::now,
1701
2
                sleep_fn: |duration| Box::pin(tokio::time::sleep(duration)),
1702
            },
1703
        )
1704
10
    }
1705
1706
0
    fn make_action_directory<'a>(
1707
0
        &'a self,
1708
0
        operation_id: &'a OperationId,
1709
0
    ) -> impl Future<Output = Result<String, Error>> + 'a {
1710
16
        
self.metrics.make_action_directory0
.
wrap0
(async move {
1711
16
            let action_directory = format!("{}/{}", self.root_action_directory, operation_id);
1712
16
            fs::create_dir(&action_directory)
1713
16
                .await
1714
16
                .err_tip(|| format!(
"Error creating action directory {action_directory}"0
))
?0
;
1715
16
            Ok(action_directory)
1716
16
        })
1717
0
    }
1718
1719
16
    fn create_action_info(
1720
16
        &self,
1721
16
        start_execute: StartExecute,
1722
16
        queued_timestamp: SystemTime,
1723
16
    ) -> impl Future<Output = Result<ActionInfo, Error>> + '_ {
1724
16
        self.metrics.create_action_info.wrap(async move {
1725
16
            let execute_request = start_execute
1726
16
                .execute_request
1727
16
                .err_tip(|| "Expected execute_request to exist in StartExecute")
?0
;
1728
16
            let action_digest: DigestInfo = execute_request
1729
16
                .action_digest
1730
16
                .clone()
1731
16
                .err_tip(|| "Expected action_digest to exist on StartExecute")
?0
1732
16
                .try_into()
?0
;
1733
16
            let load_start_timestamp = (self.callbacks.now_fn)();
1734
16
            let action =
1735
16
                get_and_decode_digest::<Action>(self.cas_store.as_ref(), action_digest.into())
1736
16
                    .await
1737
16
                    .err_tip(|| "During start_action")
?0
;
1738
16
            let action_info = ActionInfo::try_from_action_and_execute_request(
1739
16
                execute_request,
1740
16
                action,
1741
16
                load_start_timestamp,
1742
16
                queued_timestamp,
1743
            )
1744
16
            .err_tip(|| "Could not create ActionInfo in create_and_add_action()")
?0
;
1745
16
            Ok(action_info)
1746
16
        })
1747
16
    }
1748
1749
15
    fn cleanup_action(&self, operation_id: &OperationId) -> Result<(), Error> {
1750
15
        let mut running_actions = self.running_actions.lock();
1751
15
        let result = running_actions.remove(operation_id).err_tip(|| 
{0
1752
0
            format!("Expected action id '{operation_id:?}' to exist in RunningActionsManagerImpl")
1753
0
        });
1754
        // No need to copy anything, we just are telling the receivers an event happened.
1755
15
        self.action_done_tx.send_modify(|()| {});
1756
15
        result.map(|_| ())
1757
15
    }
1758
1759
    // Note: We do not capture metrics on this call, only `.kill_all()`.
1760
    // Important: When the future returns the process may still be running.
1761
2
    
async fn kill_operation(action: Arc<RunningActionImpl>)0
{
1762
2
        warn!(
1763
2
            operation_id = ?action.operation_id,
1764
2
            "Sending kill to running operation",
1765
        );
1766
2
        let kill_channel_tx = {
1767
2
            let mut action_state = action.state.lock();
1768
2
            action_state.kill_channel_tx.take()
1769
        };
1770
2
        if let Some(kill_channel_tx) = kill_channel_tx {
  Branch (1770:16): [Folded - Ignored]
  Branch (1770:16): [Folded - Ignored]
  Branch (1770:16): [True: 2, False: 0]
1771
2
            if kill_channel_tx.send(()).is_err() {
  Branch (1771:16): [Folded - Ignored]
  Branch (1771:16): [Folded - Ignored]
  Branch (1771:16): [True: 0, False: 2]
1772
0
                error!(
1773
0
                    operation_id = ?action.operation_id,
1774
0
                    "Error sending kill to running operation",
1775
                );
1776
2
            }
1777
0
        }
1778
2
    }
1779
}
1780
1781
impl RunningActionsManager for RunningActionsManagerImpl {
1782
    type RunningAction = RunningActionImpl;
1783
1784
16
    async fn create_and_add_action(
1785
16
        self: &Arc<Self>,
1786
16
        worker_id: String,
1787
16
        start_execute: StartExecute,
1788
16
    ) -> Result<Arc<RunningActionImpl>, Error> {
1789
16
        self.metrics
1790
16
            .create_and_add_action
1791
16
            .wrap(async move {
1792
16
                let queued_timestamp = start_execute
1793
16
                    .queued_timestamp
1794
16
                    .and_then(|time| 
time10
.
try_into10
().
ok10
())
1795
16
                    .unwrap_or(SystemTime::UNIX_EPOCH);
1796
16
                let operation_id = start_execute
1797
16
                    .operation_id.as_str().into();
1798
16
                let action_info = self.create_action_info(start_execute, queued_timestamp).await
?0
;
1799
16
                debug!(
1800
                    ?action_info,
1801
0
                    "Worker received action",
1802
                );
1803
16
                let action_directory = self.make_action_directory(&operation_id).await
?0
;
1804
16
                let execution_metadata = ExecutionMetadata {
1805
16
                    worker: worker_id,
1806
16
                    queued_timestamp: action_info.insert_timestamp,
1807
16
                    worker_start_timestamp: action_info.load_timestamp,
1808
16
                    worker_completed_timestamp: SystemTime::UNIX_EPOCH,
1809
16
                    input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
1810
16
                    input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
1811
16
                    execution_start_timestamp: SystemTime::UNIX_EPOCH,
1812
16
                    execution_completed_timestamp: SystemTime::UNIX_EPOCH,
1813
16
                    output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
1814
16
                    output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
1815
16
                };
1816
16
                let timeout = if action_info.timeout.is_zero() || 
self.timeout_handled_externally3
{
  Branch (1816:34): [Folded - Ignored]
  Branch (1816:67): [Folded - Ignored]
  Branch (1816:34): [Folded - Ignored]
  Branch (1816:67): [Folded - Ignored]
  Branch (1816:34): [True: 13, False: 3]
  Branch (1816:67): [True: 0, False: 3]
1817
13
                    self.max_action_timeout
1818
                } else {
1819
3
                    action_info.timeout
1820
                };
1821
16
                if timeout > self.max_action_timeout {
  Branch (1821:20): [Folded - Ignored]
  Branch (1821:20): [Folded - Ignored]
  Branch (1821:20): [True: 1, False: 15]
1822
1
                    return Err(make_err!(
1823
1
                        Code::InvalidArgument,
1824
1
                        "Action timeout of {} seconds is greater than the maximum allowed timeout of {} seconds",
1825
1
                        timeout.as_secs_f32(),
1826
1
                        self.max_action_timeout.as_secs_f32()
1827
1
                    ));
1828
15
                }
1829
15
                let running_action = Arc::new(RunningActionImpl::new(
1830
15
                    execution_metadata,
1831
15
                    operation_id.clone(),
1832
15
                    action_directory,
1833
15
                    action_info,
1834
15
                    timeout,
1835
15
                    self.clone(),
1836
                ));
1837
15
                {
1838
15
                    let mut running_actions = self.running_actions.lock();
1839
15
                    running_actions.insert(operation_id, Arc::downgrade(&running_action));
1840
15
                }
1841
15
                Ok(running_action)
1842
16
            })
1843
16
            .await
1844
16
    }
1845
1846
6
    async fn cache_action_result(
1847
6
        &self,
1848
6
        action_info: DigestInfo,
1849
6
        action_result: &mut ActionResult,
1850
6
        hasher: DigestHasherFunc,
1851
6
    ) -> Result<(), Error> {
1852
6
        self.metrics
1853
6
            .cache_action_result
1854
6
            .wrap(self.upload_action_results.cache_action_result(
1855
6
                action_info,
1856
6
                action_result,
1857
6
                hasher,
1858
6
            ))
1859
6
            .await
1860
6
    }
1861
1862
0
    async fn kill_operation(&self, operation_id: &OperationId) -> Result<(), Error> {
1863
0
        let running_action = {
1864
0
            let running_actions = self.running_actions.lock();
1865
0
            running_actions
1866
0
                .get(operation_id)
1867
0
                .and_then(Weak::upgrade)
1868
0
                .ok_or_else(|| make_input_err!("Failed to get running action {operation_id}"))?
1869
        };
1870
0
        Self::kill_operation(running_action).await;
1871
0
        Ok(())
1872
0
    }
1873
1874
    // Waits for all running actions to complete and signals completion.
1875
    // Use the ShutdownGuard to signal the completion of the actions
1876
    // Dropping the sender automatically notifies the process to terminate.
1877
0
    async fn complete_actions(&self, _complete_msg: ShutdownGuard) {
1878
0
        drop(
1879
0
            self.action_done_tx
1880
0
                .subscribe()
1881
0
                .wait_for(|()| self.running_actions.lock().is_empty())
1882
0
                .await,
1883
        );
1884
0
    }
1885
1886
    // Note: When the future returns the process should be fully killed and cleaned up.
1887
2
    
async fn kill_all(&self)0
{
1888
2
        self.metrics
1889
2
            .kill_all
1890
2
            .wrap_no_capture_result(async move {
1891
2
                let kill_operations: Vec<Arc<RunningActionImpl>> = {
1892
2
                    let running_actions = self.running_actions.lock();
1893
2
                    running_actions
1894
2
                        .iter()
1895
2
                        .filter_map(|(_operation_id, action)| action.upgrade())
1896
2
                        .collect()
1897
                };
1898
4
                for 
action2
in kill_operations {
1899
2
                    Self::kill_operation(action).await;
1900
                }
1901
2
            })
1902
2
            .await;
1903
        // Ignore error. If error happens it means there's no sender, which is not a problem.
1904
        // Note: Sanity check this API will always check current value then future values:
1905
        // https://play.rust-lang.org/?version=stable&edition=2021&gist=23103652cc1276a97e5f9938da87fdb2
1906
2
        drop(
1907
2
            self.action_done_tx
1908
2
                .subscribe()
1909
4
                .
wait_for2
(|()| self.running_actions.lock().is_empty())
1910
2
                .await,
1911
        );
1912
2
    }
1913
1914
    #[inline]
1915
2
    fn metrics(&self) -> &Arc<Metrics> {
1916
2
        &self.metrics
1917
2
    }
1918
}
1919
1920
#[derive(Debug, Default, MetricsComponent)]
1921
pub struct Metrics {
1922
    #[metric(help = "Stats about the create_and_add_action command.")]
1923
    create_and_add_action: AsyncCounterWrapper,
1924
    #[metric(help = "Stats about the cache_action_result command.")]
1925
    cache_action_result: AsyncCounterWrapper,
1926
    #[metric(help = "Stats about the kill_all command.")]
1927
    kill_all: AsyncCounterWrapper,
1928
    #[metric(help = "Stats about the create_action_info command.")]
1929
    create_action_info: AsyncCounterWrapper,
1930
    #[metric(help = "Stats about the make_work_directory command.")]
1931
    make_action_directory: AsyncCounterWrapper,
1932
    #[metric(help = "Stats about the prepare_action command.")]
1933
    prepare_action: AsyncCounterWrapper,
1934
    #[metric(help = "Stats about the execute command.")]
1935
    execute: AsyncCounterWrapper,
1936
    #[metric(help = "Stats about the upload_results command.")]
1937
    upload_results: AsyncCounterWrapper,
1938
    #[metric(help = "Stats about the cleanup command.")]
1939
    cleanup: AsyncCounterWrapper,
1940
    #[metric(help = "Stats about the get_finished_result command.")]
1941
    get_finished_result: AsyncCounterWrapper,
1942
    #[metric(help = "Stats about the get_proto_command_from_store command.")]
1943
    get_proto_command_from_store: AsyncCounterWrapper,
1944
    #[metric(help = "Stats about the download_to_directory command.")]
1945
    download_to_directory: AsyncCounterWrapper,
1946
    #[metric(help = "Stats about the prepare_output_files command.")]
1947
    prepare_output_files: AsyncCounterWrapper,
1948
    #[metric(help = "Stats about the prepare_output_paths command.")]
1949
    prepare_output_paths: AsyncCounterWrapper,
1950
    #[metric(help = "Stats about the child_process command.")]
1951
    child_process: AsyncCounterWrapper,
1952
    #[metric(help = "Stats about the child_process_success_error_code command.")]
1953
    child_process_success_error_code: CounterWithTime,
1954
    #[metric(help = "Stats about the child_process_failure_error_code command.")]
1955
    child_process_failure_error_code: CounterWithTime,
1956
    #[metric(help = "Total time spent uploading stdout.")]
1957
    upload_stdout: AsyncCounterWrapper,
1958
    #[metric(help = "Total time spent uploading stderr.")]
1959
    upload_stderr: AsyncCounterWrapper,
1960
    #[metric(help = "Total number of task timeouts.")]
1961
    task_timeouts: CounterWithTime,
1962
}