Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-worker/src/running_actions_manager.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::cmp::min;
17
use std::collections::vec_deque::VecDeque;
18
use std::collections::HashMap;
19
use std::convert::Into;
20
use std::ffi::{OsStr, OsString};
21
use std::fmt::Debug;
22
#[cfg(target_family = "unix")]
23
use std::fs::Permissions;
24
#[cfg(target_family = "unix")]
25
use std::os::unix::fs::{MetadataExt, PermissionsExt};
26
use std::path::Path;
27
use std::pin::Pin;
28
use std::process::Stdio;
29
use std::sync::atomic::{AtomicBool, Ordering};
30
use std::sync::{Arc, Weak};
31
use std::time::{Duration, SystemTime};
32
33
use bytes::{Bytes, BytesMut};
34
use filetime::{set_file_mtime, FileTime};
35
use formatx::Template;
36
use futures::future::{
37
    try_join, try_join3, try_join_all, BoxFuture, Future, FutureExt, TryFutureExt,
38
};
39
use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
40
use nativelink_config::cas_server::{
41
    EnvironmentSource, UploadActionResultConfig, UploadCacheResultsStrategy,
42
};
43
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
44
use nativelink_metric::MetricsComponent;
45
use nativelink_proto::build::bazel::remote::execution::v2::{
46
    Action, ActionResult as ProtoActionResult, Command as ProtoCommand,
47
    Directory as ProtoDirectory, Directory, DirectoryNode, ExecuteResponse, FileNode, SymlinkNode,
48
    Tree as ProtoTree, UpdateActionResultRequest,
49
};
50
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
51
    HistoricalExecuteResponse, StartExecute,
52
};
53
use nativelink_store::ac_utils::{
54
    compute_buf_digest, get_and_decode_digest, serialize_and_upload_message, ESTIMATED_DIGEST_SIZE,
55
};
56
use nativelink_store::fast_slow_store::FastSlowStore;
57
use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
58
use nativelink_store::grpc_store::GrpcStore;
59
use nativelink_util::action_messages::{
60
    to_execute_response, ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo,
61
    NameOrPath, OperationId, SymlinkInfo,
62
};
63
use nativelink_util::common::{fs, DigestInfo};
64
use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc};
65
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
66
use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
67
use nativelink_util::{background_spawn, spawn, spawn_blocking};
68
use parking_lot::Mutex;
69
use prost::Message;
70
use relative_path::RelativePath;
71
use scopeguard::{guard, ScopeGuard};
72
use serde::Deserialize;
73
use tokio::io::{AsyncReadExt, AsyncSeekExt};
74
use tokio::process;
75
use tokio::sync::{oneshot, watch};
76
use tokio_stream::wrappers::ReadDirStream;
77
use tonic::Request;
78
use tracing::{enabled, event, Level};
79
use uuid::Uuid;
80
81
/// Exit code used whenever our program is terminated due to a signal. One fixed value
/// is used for every such case to keep things simple.
const EXIT_CODE_FOR_SIGNAL: i32 = 9;
84
85
/// Default strategy for uploading historical results.
86
/// Note: If this value changes the config documentation
87
/// should reflect it.
88
const DEFAULT_HISTORICAL_RESULTS_STRATEGY: UploadCacheResultsStrategy =
89
    UploadCacheResultsStrategy::failures_only;
90
91
/// Valid string reasons for a failure.
92
/// Note: If these change, the documentation should be updated.
93
#[allow(non_camel_case_types)]
94
0
#[derive(Debug, Deserialize)]
95
enum SideChannelFailureReason {
96
    /// Task should be considered timed out.
97
    timeout,
98
}
99
100
/// This represents the json data that can be passed from the running process
101
/// to the parent via the SideChannelFile. See:
102
/// `config::EnvironmentSource::sidechannelfile` for more details.
103
/// Note: Any fields added here must be added to the documentation.
104
0
#[derive(Debug, Deserialize, Default)]
105
struct SideChannelInfo {
106
    /// If the task should be considered a failure and why.
107
    failure: Option<SideChannelFailureReason>,
108
}
109
110
/// Aggressively download the digests of files and make a local folder from it. This function
111
/// will spawn unbounded number of futures to try and get these downloaded. The store itself
112
/// should be rate limited if spawning too many requests at once is an issue.
113
/// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore`. This is for
114
/// efficiency reasons. We will request the `FastSlowStore` to populate the entry then we will
115
/// assume the `FilesystemStore` has the file available immediately after and hardlink the file
116
/// to a new location.
117
// Sadly we cannot use `async fn` here because the rust compiler cannot determine the auto traits
118
// of the future. So we need to force this function to return a dynamic future instead.
119
// see: https://github.com/rust-lang/rust/issues/78649
120
16
pub fn download_to_directory<'a>(
121
16
    cas_store: &'a FastSlowStore,
122
16
    filesystem_store: Pin<&'a FilesystemStore>,
123
16
    digest: &'a DigestInfo,
124
16
    current_directory: &'a str,
125
16
) -> BoxFuture<'a, Result<(), Error>> {
126
16
    async move {
127
16
        let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest.into())
128
97
            .await
129
16
            .err_tip(|| 
"Converting digest to Directory"0
)
?0
;
130
16
        let mut futures = FuturesUnordered::new();
131
132
20
        for 
file4
in directory.files {
133
4
            let digest: DigestInfo = file
134
4
                .digest
135
4
                .err_tip(|| 
"Expected Digest to exist in Directory::file::digest"0
)
?0
136
4
                .try_into()
137
4
                .err_tip(|| 
"In Directory::file::digest"0
)
?0
;
138
4
            let dest = format!("{}/{}", current_directory, file.name);
139
4
            let (mtime, mut unix_mode) = match file.node_properties {
140
1
                Some(properties) => (properties.mtime, properties.unix_mode),
141
3
                None => (None, None),
142
            };
143
            #[cfg_attr(target_family = "windows", allow(unused_assignments))]
144
4
            if file.is_executable {
  Branch (144:16): [True: 1, False: 3]
  Branch (144:16): [Folded - Ignored]
145
1
                unix_mode = Some(unix_mode.unwrap_or(0o444) | 0o111);
146
3
            }
147
4
            futures.push(
148
4
                cas_store
149
4
                    .populate_fast_store(digest.into())
150
4
                    .and_then(move |_| async move {
151
4
                        let file_entry = filesystem_store
152
4
                            .get_file_entry_for_digest(&digest)
153
4
                            .await
154
4
                            .err_tip(|| 
"During hard link"0
)
?0
;
155
4
                        file_entry
156
4
                            .get_file_path_locked(|src| fs::hard_link(src, &dest))
157
4
                            .await
158
4
                            .map_err(|e| {
159
0
                                make_err!(Code::Internal, "Could not make hardlink, {e:?} : {dest}")
160
4
                            })
?0
;
161
                        #[cfg(target_family = "unix")]
162
4
                        if let Some(
unix_mode1
) = unix_mode {
  Branch (162:32): [True: 1, False: 3]
  Branch (162:32): [Folded - Ignored]
163
1
                            fs::set_permissions(&dest, Permissions::from_mode(unix_mode))
164
1
                                .await
165
1
                                .err_tip(|| {
166
0
                                    format!(
167
0
                                        "Could not set unix mode in download_to_directory {dest}"
168
0
                                    )
169
1
                                })
?0
;
170
3
                        }
171
4
                        if let Some(
mtime1
) = mtime {
  Branch (171:32): [True: 1, False: 3]
  Branch (171:32): [Folded - Ignored]
172
1
                            spawn_blocking!("download_to_directory_set_mtime", move || {
173
1
                                set_file_mtime(
174
1
                                    &dest,
175
1
                                    FileTime::from_unix_time(mtime.seconds, mtime.nanos as u32),
176
1
                                )
177
1
                                .err_tip(|| {
178
0
                                    format!("Failed to set mtime in download_to_directory {dest}")
179
1
                                })
180
1
                            })
181
1
                            .await
182
1
                            .err_tip(|| {
183
0
                                "Failed to launch spawn_blocking in download_to_directory"
184
1
                            })
?0
?0
;
185
3
                        }
186
4
                        Ok(())
187
8
                    })
188
4
                    .map_err(move |e| 
e.append(format!("for digest {digest}"))0
)
189
4
                    .boxed(),
190
4
            );
191
4
        }
192
193
22
        for 
directory6
in directory.directories {
194
6
            let digest: DigestInfo = directory
195
6
                .digest
196
6
                .err_tip(|| 
"Expected Digest to exist in Directory::directories::digest"0
)
?0
197
6
                .try_into()
198
6
                .err_tip(|| 
"In Directory::file::digest"0
)
?0
;
199
6
            let new_directory_path = format!("{}/{}", current_directory, directory.name);
200
6
            futures.push(
201
6
                async move {
202
6
                    fs::create_dir(&new_directory_path)
203
5
                        .await
204
6
                        .err_tip(|| 
format!("Could not create directory {new_directory_path}")0
)
?0
;
205
6
                    download_to_directory(
206
6
                        cas_store,
207
6
                        filesystem_store,
208
6
                        &digest,
209
6
                        &new_directory_path,
210
6
                    )
211
35
                    .await
212
6
                    .err_tip(|| 
format!("in download_to_directory : {new_directory_path}")0
)
?0
;
213
6
                    Ok(())
214
6
                }
215
6
                .boxed(),
216
6
            );
217
6
        }
218
219
        #[cfg(target_family = "unix")]
220
17
        for 
symlink_node1
in directory.symlinks {
221
1
            let dest = format!("{}/{}", current_directory, symlink_node.name);
222
1
            futures.push(
223
1
                async move {
224
1
                    fs::symlink(&symlink_node.target, &dest).await.err_tip(|| {
225
0
                        format!(
226
0
                            "Could not create symlink {} -> {}",
227
0
                            symlink_node.target, dest
228
0
                        )
229
1
                    })
?0
;
230
1
                    Ok(())
231
1
                }
232
1
                .boxed(),
233
1
            );
234
1
        }
235
236
104
        while 
futures.try_next()27
.await
?0
.
is_some()27
{}11
  Branch (236:15): [True: 11, False: 16]
  Branch (236:15): [Folded - Ignored]
237
16
        Ok(())
238
16
    }
239
16
    .boxed()
240
16
}
241
242
/// Windows flavour of the executable check: a file counts as executable when its extension
/// is one of `exe`, `bat` or `com`. The metadata argument is unused on this platform.
///
/// The extension is compared ASCII case-insensitively, so `TOOL.EXE` is recognised just
/// like `tool.exe`. A path without an extension is never executable.
#[cfg(target_family = "windows")]
fn is_executable(_metadata: &std::fs::Metadata, full_path: &impl AsRef<Path>) -> bool {
    static EXECUTABLE_EXTENSIONS: &[&str] = &["exe", "bat", "com"];
    full_path.as_ref().extension().is_some_and(|found| {
        EXECUTABLE_EXTENSIONS
            .iter()
            .any(|ext| found.eq_ignore_ascii_case(*ext))
    })
}
249
250
/// Unix flavour of the executable check: a file counts as executable when any of the
/// execute bits is set in its mode. The path argument is unused on this platform.
#[cfg(target_family = "unix")]
fn is_executable(metadata: &std::fs::Metadata, _full_path: &impl AsRef<Path>) -> bool {
    // Execute permission for owner, group and others.
    const EXECUTE_BITS: u32 = 0o111;
    let mode_bits = metadata.mode();
    (mode_bits & EXECUTE_BITS) != 0
}
254
255
2
async fn upload_file(
256
2
    cas_store: Pin<&impl StoreLike>,
257
2
    full_path: impl AsRef<Path> + Debug,
258
2
    hasher: DigestHasherFunc,
259
2
    metadata: std::fs::Metadata,
260
2
) -> Result<FileInfo, Error> {
261
2
    let is_executable = is_executable(&metadata, &full_path);
262
2
    let file_size = metadata.len();
263
2
    let resumeable_file = fs::open_file(&full_path, u64::MAX)
264
2
        .await
265
2
        .err_tip(|| 
format!("Could not open file {full_path:?}")0
)
?0
;
266
267
2
    let (digest, mut resumeable_file) = hasher
268
2
        .hasher()
269
2
        .digest_for_file(resumeable_file, Some(file_size))
270
6
        .await
271
2
        .err_tip(|| 
format!("Failed to hash file in digest_for_file failed for {full_path:?}")0
)
?0
;
272
273
2
    resumeable_file
274
2
        .as_reader()
275
0
        .await
276
2
        .err_tip(|| 
"Could not get reader from file slot in RunningActionsManager::upload_file()"0
)
?0
277
2
        .get_mut()
278
2
        .rewind()
279
2
        .await
280
2
        .err_tip(|| 
"Could not rewind file"0
)
?0
;
281
282
    // Note: For unknown reasons we appear to be hitting:
283
    // https://github.com/rust-lang/rust/issues/92096
284
    // or a smiliar issue if we try to use the non-store driver function, so we
285
    // are using the store driver function here.
286
2
    cas_store
287
2
        .as_store_driver_pin()
288
2
        .update_with_whole_file(
289
2
            digest.into(),
290
2
            resumeable_file,
291
2
            UploadSizeInfo::ExactSize(digest.size_bytes()),
292
2
        )
293
10
        .await
294
2
        .err_tip(|| 
format!("for {full_path:?}")0
)
?0
;
295
296
2
    let name = full_path
297
2
        .as_ref()
298
2
        .file_name()
299
2
        .err_tip(|| 
format!("Expected file_name to exist on {full_path:?}")0
)
?0
300
2
        .to_str()
301
2
        .err_tip(|| {
302
0
            make_err!(
303
0
                Code::Internal,
304
0
                "Could not convert {:?} to string",
305
0
                full_path
306
0
            )
307
2
        })
?0
308
2
        .to_string();
309
2
310
2
    Ok(FileInfo {
311
2
        name_or_path: NameOrPath::Name(name),
312
2
        digest,
313
2
        is_executable,
314
2
    })
315
2
}
316
317
0
async fn upload_symlink(
318
0
    full_path: impl AsRef<Path> + Debug,
319
0
    full_work_directory_path: impl AsRef<Path>,
320
0
) -> Result<SymlinkInfo, Error> {
321
0
    let full_target_path = fs::read_link(full_path.as_ref())
322
0
        .await
323
0
        .err_tip(|| format!("Could not get read_link path of {full_path:?}"))?;
324
325
    // Detect if our symlink is inside our work directory, if it is find the
326
    // relative path otherwise use the absolute path.
327
0
    let target = if full_target_path.starts_with(full_work_directory_path.as_ref()) {
  Branch (327:21): [Folded - Ignored]
  Branch (327:21): [Folded - Ignored]
  Branch (327:21): [True: 0, False: 0]
  Branch (327:21): [True: 0, False: 0]
328
0
        let full_target_path = RelativePath::from_path(&full_target_path)
329
0
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?;
330
0
        RelativePath::from_path(full_work_directory_path.as_ref())
331
0
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?
332
0
            .relative(full_target_path)
333
0
            .normalize()
334
0
            .into_string()
335
    } else {
336
0
        full_target_path
337
0
            .to_str()
338
0
            .err_tip(|| {
339
0
                make_err!(
340
0
                    Code::Internal,
341
0
                    "Could not convert '{:?}' to string",
342
0
                    full_target_path
343
0
                )
344
0
            })?
345
0
            .to_string()
346
    };
347
348
0
    let name = full_path
349
0
        .as_ref()
350
0
        .file_name()
351
0
        .err_tip(|| format!("Expected file_name to exist on {full_path:?}"))?
352
0
        .to_str()
353
0
        .err_tip(|| {
354
0
            make_err!(
355
0
                Code::Internal,
356
0
                "Could not convert {:?} to string",
357
0
                full_path
358
0
            )
359
0
        })?
360
0
        .to_string();
361
0
362
0
    Ok(SymlinkInfo {
363
0
        name_or_path: NameOrPath::Name(name),
364
0
        target,
365
0
    })
366
0
}
367
368
0
fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
369
0
    cas_store: Pin<&'a impl StoreLike>,
370
0
    full_dir_path: P,
371
0
    full_work_directory: &'a str,
372
0
    hasher: DigestHasherFunc,
373
0
) -> BoxFuture<'a, Result<(Directory, VecDeque<ProtoDirectory>), Error>> {
374
0
    Box::pin(async move {
375
0
        let file_futures = FuturesUnordered::new();
376
0
        let dir_futures = FuturesUnordered::new();
377
0
        let symlink_futures = FuturesUnordered::new();
378
        {
379
0
            let (_permit, dir_handle) = fs::read_dir(&full_dir_path)
380
0
                .await
381
0
                .err_tip(|| format!("Error reading dir for reading {full_dir_path:?}"))?
382
0
                .into_inner();
383
0
            let mut dir_stream = ReadDirStream::new(dir_handle);
384
            // Note: Try very hard to not leave file descriptors open. Try to keep them as short
385
            // lived as possible. This is why we iterate the directory and then build a bunch of
386
            // futures with all the work we are wanting to do then execute it. It allows us to
387
            // close the directory iterator file descriptor, then open the child files/folders.
388
0
            while let Some(entry_result) = dir_stream.next().await {
  Branch (388:23): [Folded - Ignored]
  Branch (388:23): [Folded - Ignored]
  Branch (388:23): [True: 0, False: 0]
  Branch (388:23): [True: 0, False: 0]
389
0
                let entry = entry_result.err_tip(|| "Error while iterating directory")?;
390
0
                let file_type = entry
391
0
                    .file_type()
392
0
                    .await
393
0
                    .err_tip(|| format!("Error running file_type() on {entry:?}"))?;
394
0
                let full_path = full_dir_path.as_ref().join(entry.path());
395
0
                if file_type.is_dir() {
  Branch (395:20): [Folded - Ignored]
  Branch (395:20): [Folded - Ignored]
  Branch (395:20): [True: 0, False: 0]
  Branch (395:20): [True: 0, False: 0]
396
0
                    let full_dir_path = full_dir_path.clone();
397
0
                    dir_futures.push(
398
0
                        upload_directory(cas_store, full_path.clone(), full_work_directory, hasher)
399
0
                            .and_then(|(dir, all_dirs)| async move {
400
0
                                let directory_name = full_path
401
0
                                    .file_name()
402
0
                                    .err_tip(|| {
403
0
                                        format!("Expected file_name to exist on {full_dir_path:?}")
404
0
                                    })?
405
0
                                    .to_str()
406
0
                                    .err_tip(|| {
407
0
                                        make_err!(
408
0
                                            Code::Internal,
409
0
                                            "Could not convert {:?} to string",
410
0
                                            full_dir_path
411
0
                                        )
412
0
                                    })?
413
0
                                    .to_string();
414
415
0
                                let digest = serialize_and_upload_message(
416
0
                                    &dir,
417
0
                                    cas_store,
418
0
                                    &mut hasher.hasher(),
419
0
                                )
420
0
                                .await
421
0
                                .err_tip(|| format!("for {full_path:?}"))?;
422
423
0
                                Result::<(DirectoryNode, VecDeque<Directory>), Error>::Ok((
424
0
                                    DirectoryNode {
425
0
                                        name: directory_name,
426
0
                                        digest: Some(digest.into()),
427
0
                                    },
428
0
                                    all_dirs,
429
0
                                ))
430
0
                            })
431
0
                            .boxed(),
432
0
                    );
433
0
                } else if file_type.is_file() {
  Branch (433:27): [Folded - Ignored]
  Branch (433:27): [Folded - Ignored]
  Branch (433:27): [True: 0, False: 0]
  Branch (433:27): [True: 0, False: 0]
434
0
                    file_futures.push(async move {
435
0
                        let metadata = fs::metadata(&full_path)
436
0
                            .await
437
0
                            .err_tip(|| format!("Could not open file {full_path:?}"))?;
438
0
                        upload_file(cas_store, &full_path, hasher, metadata)
439
0
                            .map_ok(Into::into)
440
0
                            .await
441
0
                    });
442
0
                } else if file_type.is_symlink() {
  Branch (442:27): [Folded - Ignored]
  Branch (442:27): [Folded - Ignored]
  Branch (442:27): [True: 0, False: 0]
  Branch (442:27): [True: 0, False: 0]
443
0
                    symlink_futures
444
0
                        .push(upload_symlink(full_path, &full_work_directory).map_ok(Into::into));
445
0
                }
446
            }
447
        }
448
449
0
        let (mut file_nodes, dir_entries, mut symlinks) = try_join3(
450
0
            file_futures.try_collect::<Vec<FileNode>>(),
451
0
            dir_futures.try_collect::<Vec<(DirectoryNode, VecDeque<Directory>)>>(),
452
0
            symlink_futures.try_collect::<Vec<SymlinkNode>>(),
453
0
        )
454
0
        .await?;
455
456
0
        let mut directory_nodes = Vec::with_capacity(dir_entries.len());
457
0
        // For efficiency we use a deque because it allows cheap concat of Vecs.
458
0
        // We make the assumption here that when performance is important it is because
459
0
        // our directory is quite large. This allows us to cheaply merge large amounts of
460
0
        // directories into one VecDeque. Then after we are done we need to collapse it
461
0
        // down into a single Vec.
462
0
        let mut all_child_directories = VecDeque::with_capacity(dir_entries.len());
463
0
        for (directory_node, mut recursive_child_directories) in dir_entries {
464
0
            directory_nodes.push(directory_node);
465
0
            all_child_directories.append(&mut recursive_child_directories);
466
0
        }
467
468
0
        file_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name));
469
0
        directory_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name));
470
0
        symlinks.sort_unstable_by(|a, b| a.name.cmp(&b.name));
471
0
472
0
        let directory = Directory {
473
0
            files: file_nodes,
474
0
            directories: directory_nodes,
475
0
            symlinks,
476
0
            node_properties: None, // We don't support file properties.
477
0
        };
478
0
        all_child_directories.push_back(directory.clone());
479
0
480
0
        Ok((directory, all_child_directories))
481
0
    })
482
0
}
483
484
0
async fn process_side_channel_file(
485
0
    side_channel_file: Cow<'_, OsStr>,
486
0
    args: &[&OsStr],
487
0
    timeout: Duration,
488
0
) -> Result<Option<Error>, Error> {
489
0
    let mut json_contents = String::new();
490
    {
491
        // Note: Scoping `file_slot` allows the file_slot semaphore to be released faster.
492
0
        let mut file_slot = match fs::open_file(side_channel_file, u64::MAX).await {
493
0
            Ok(file_slot) => file_slot,
494
0
            Err(e) => {
495
0
                if e.code != Code::NotFound {
  Branch (495:20): [Folded - Ignored]
  Branch (495:20): [Folded - Ignored]
  Branch (495:20): [True: 0, False: 0]
496
0
                    return Err(e).err_tip(|| "Error opening side channel file");
497
0
                }
498
0
                // Note: If file does not exist, it's ok. Users are not required to create this file.
499
0
                return Ok(None);
500
            }
501
        };
502
0
        let reader = file_slot
503
0
            .as_reader()
504
0
            .await
505
0
            .err_tip(|| "Error getting reader from side channel file (maybe permissions?)")?;
506
0
        reader
507
0
            .read_to_string(&mut json_contents)
508
0
            .await
509
0
            .err_tip(|| "Error reading side channel file")?;
510
    }
511
512
0
    let side_channel_info: SideChannelInfo =
513
0
        serde_json5::from_str(&json_contents).map_err(|e| {
514
0
            make_input_err!(
515
0
                "Could not convert contents of side channel file (json) to SideChannelInfo : {e:?}"
516
0
            )
517
0
        })?;
518
0
    Ok(side_channel_info.failure.map(|failure| match failure {
519
0
        SideChannelFailureReason::timeout => Error::new(
520
0
            Code::DeadlineExceeded,
521
0
            format!(
522
0
                "Command '{}' timed out after {} seconds",
523
0
                args.join(OsStr::new(" ")).to_string_lossy(),
524
0
                timeout.as_secs_f32()
525
0
            ),
526
0
        ),
527
0
    }))
528
0
}
529
530
7
async fn do_cleanup(
531
7
    running_actions_manager: &RunningActionsManagerImpl,
532
7
    operation_id: &OperationId,
533
7
    action_directory: &str,
534
7
) -> Result<(), Error> {
535
7
    event!(Level::INFO, 
"Worker cleaning up"0
);
536
    // Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
537
7
    let remove_dir_result = fs::remove_dir_all(action_directory)
538
7
        .await
539
7
        .err_tip(|| 
format!("Could not remove working directory {action_directory}")0
);
540
7
    if let Err(
err0
) = running_actions_manager.cleanup_action(operation_id) {
  Branch (540:12): [True: 0, False: 0]
  Branch (540:12): [Folded - Ignored]
  Branch (540:12): [True: 0, False: 7]
541
0
        event!(
542
0
            Level::ERROR,
543
            ?operation_id,
544
            ?err,
545
0
            "Error cleaning up action"
546
        );
547
0
        return Result::<(), Error>::Err(err).merge(remove_dir_result);
548
7
    }
549
7
    if let Err(
err0
) = remove_dir_result {
  Branch (549:12): [True: 0, False: 0]
  Branch (549:12): [Folded - Ignored]
  Branch (549:12): [True: 0, False: 7]
550
0
        event!(
551
0
            Level::ERROR,
552
            ?operation_id,
553
            ?err,
554
0
            "Error removing working directory"
555
        );
556
0
        return Err(err);
557
7
    }
558
7
    Ok(())
559
7
}
560
561
pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
562
    /// Returns the action id of the action.
563
    fn get_operation_id(&self) -> &OperationId;
564
565
    /// Anything that needs to execute before the actions is actually executed should happen here.
566
    fn prepare_action(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
567
568
    /// Actually perform the execution of the action.
569
    fn execute(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
570
571
    /// Any uploading, processing or analyzing of the results should happen here.
572
    fn upload_results(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
573
574
    /// Cleanup any residual files, handles or other junk resulting from running the action.
575
    fn cleanup(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;
576
577
    /// Returns the final result. As a general rule this action should be thought of as
578
    /// a consumption of `self`, meaning once a return happens here the lifetime of `Self`
579
    /// is over and any action performed on it after this call is undefined behavior.
580
    fn get_finished_result(
581
        self: Arc<Self>,
582
    ) -> impl Future<Output = Result<ActionResult, Error>> + Send;
583
584
    /// Returns the work directory of the action.
585
    fn get_work_directory(&self) -> &String;
586
}
587
588
struct RunningActionImplExecutionResult {
589
    stdout: Bytes,
590
    stderr: Bytes,
591
    exit_code: i32,
592
}
593
594
struct RunningActionImplState {
595
    command_proto: Option<ProtoCommand>,
596
    // TODO(allada) Kill is not implemented yet, but is instrumented.
597
    // However, it is used if the worker disconnects to destroy current jobs.
598
    kill_channel_tx: Option<oneshot::Sender<()>>,
599
    kill_channel_rx: Option<oneshot::Receiver<()>>,
600
    execution_result: Option<RunningActionImplExecutionResult>,
601
    action_result: Option<ActionResult>,
602
    execution_metadata: ExecutionMetadata,
603
    // If there was an internal error, this will be set.
604
    // This should NOT be set if everything was fine, but the process had a
605
    // non-zero exit code. Instead this should be used for internal errors
606
    // that prevented the action from running, upload failures, timeouts, exc...
607
    // but we have (or could have) the action results (like stderr/stdout).
608
    error: Option<Error>,
609
}
610
611
pub struct RunningActionImpl {
612
    operation_id: OperationId,
613
    action_directory: String,
614
    work_directory: String,
615
    action_info: ActionInfo,
616
    timeout: Duration,
617
    running_actions_manager: Arc<RunningActionsManagerImpl>,
618
    state: Mutex<RunningActionImplState>,
619
    did_cleanup: AtomicBool,
620
}
621
622
impl RunningActionImpl {
623
7
    fn new(
624
7
        execution_metadata: ExecutionMetadata,
625
7
        operation_id: OperationId,
626
7
        action_directory: String,
627
7
        action_info: ActionInfo,
628
7
        timeout: Duration,
629
7
        running_actions_manager: Arc<RunningActionsManagerImpl>,
630
7
    ) -> Self {
631
7
        let work_directory = format!("{}/{}", action_directory, "work");
632
7
        let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
633
7
        Self {
634
7
            operation_id,
635
7
            action_directory,
636
7
            work_directory,
637
7
            action_info,
638
7
            timeout,
639
7
            running_actions_manager,
640
7
            state: Mutex::new(RunningActionImplState {
641
7
                command_proto: None,
642
7
                kill_channel_rx: Some(kill_channel_rx),
643
7
                kill_channel_tx: Some(kill_channel_tx),
644
7
                execution_result: None,
645
7
                action_result: None,
646
7
                execution_metadata,
647
7
                error: None,
648
7
            }),
649
7
            did_cleanup: AtomicBool::new(false),
650
7
        }
651
7
    }
652
653
76
    /// Shorthand for the metrics object owned by the running actions manager.
    fn metrics(&self) -> &Arc<Metrics> {
        let manager = &self.running_actions_manager;
        &manager.metrics
    }
656
657
    /// Prepares any actions needed to execution this action. This action will do the following:
658
    ///
659
    /// * Download any files needed to execute the action
660
    /// * Build a folder with all files needed to execute the action.
661
    ///
662
    /// This function will aggressively download and spawn potentially thousands of futures. It is
663
    /// up to the stores to rate limit if needed.
664
7
    async fn inner_prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
665
7
        {
666
7
            let mut state = self.state.lock();
667
7
            state.execution_metadata.input_fetch_start_timestamp =
668
7
                (self.running_actions_manager.callbacks.now_fn)();
669
7
        }
670
7
        let command = {
671
            // Download and build out our input files/folders. Also fetch and decode our Command.
672
7
            let command_fut = self.metrics().get_proto_command_from_store.wrap(async {
673
7
                get_and_decode_digest::<ProtoCommand>(
674
7
                    self.running_actions_manager.cas_store.as_ref(),
675
7
                    self.action_info.command_digest.into(),
676
7
                )
677
52
                .await
678
7
                .err_tip(|| 
"Converting command_digest to Command"0
)
679
7
            });
680
7
            let filesystem_store_pin =
681
7
                Pin::new(self.running_actions_manager.filesystem_store.as_ref());
682
7
            let (command, _) = try_join(command_fut, async {
683
7
                fs::create_dir(&self.work_directory)
684
9
                    .await
685
7
                    .err_tip(|| 
format!("Error creating work directory {}", self.work_directory)0
)
?0
;
686
                // Download the input files/folder and place them into the temp directory.
687
7
                self.metrics()
688
7
                    .download_to_directory
689
7
                    .wrap(download_to_directory(
690
7
                        &self.running_actions_manager.cas_store,
691
7
                        filesystem_store_pin,
692
7
                        &self.action_info.input_root_digest,
693
7
                        &self.work_directory,
694
7
                    ))
695
82
                    .await
696
7
            })
697
94
            .await
?0
;
698
7
            command
699
7
        };
700
7
        {
701
7
            // Create all directories needed for our output paths. This is required by the bazel spec.
702
7
            let prepare_output_directories = |output_file| 
{4
703
4
                let full_output_path = if command.working_directory.is_empty() {
  Branch (703:43): [Folded - Ignored]
  Branch (703:43): [Folded - Ignored]
  Branch (703:43): [True: 1, False: 3]
704
1
                    format!("{}/{}", self.work_directory, output_file)
705
                } else {
706
3
                    format!(
707
3
                        "{}/{}/{}",
708
3
                        self.work_directory, command.working_directory, output_file
709
3
                    )
710
                };
711
4
                async move {
712
4
                    let full_parent_path = Path::new(&full_output_path)
713
4
                        .parent()
714
4
                        .err_tip(|| 
format!("Parent path for {full_output_path} has no parent")0
)
?0
;
715
4
                    fs::create_dir_all(full_parent_path).await.err_tip(|| {
716
0
                        format!(
717
0
                            "Error creating output directory {} (file)",
718
0
                            full_parent_path.display()
719
0
                        )
720
4
                    })
?0
;
721
4
                    Result::<(), Error>::Ok(())
722
4
                }
723
4
            };
724
7
            self.metrics()
725
7
                .prepare_output_files
726
7
                .wrap(try_join_all(
727
7
                    command.output_files.iter().map(prepare_output_directories),
728
7
                ))
729
2
                .await
?0
;
730
7
            self.metrics()
731
7
                .prepare_output_paths
732
7
                .wrap(try_join_all(
733
7
                    command.output_paths.iter().map(prepare_output_directories),
734
7
                ))
735
2
                .await
?0
;
736
        }
737
7
        event!(Level::INFO, ?command, 
"Worker received command"0
,);
738
7
        {
739
7
            let mut state = self.state.lock();
740
7
            state.command_proto = Some(command);
741
7
            state.execution_metadata.input_fetch_completed_timestamp =
742
7
                (self.running_actions_manager.callbacks.now_fn)();
743
7
        }
744
7
        Ok(self)
745
7
    }
746
747
5
    async fn inner_execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
748
5
        let (command_proto, mut kill_channel_rx) = {
749
5
            let mut state = self.state.lock();
750
5
            state.execution_metadata.execution_start_timestamp =
751
5
                (self.running_actions_manager.callbacks.now_fn)();
752
5
            (
753
5
                state
754
5
                    .command_proto
755
5
                    .take()
756
5
                    .err_tip(|| 
"Expected state to have command_proto in execute()"0
)
?0
,
757
5
                state
758
5
                    .kill_channel_rx
759
5
                    .take()
760
5
                    .err_tip(|| 
"Expected state to have kill_channel_rx in execute()"0
)
?0
761
                    // This is important as we may be killed at any point.
762
5
                    .fuse(),
763
5
            )
764
5
        };
765
5
        if command_proto.arguments.is_empty() {
  Branch (765:12): [Folded - Ignored]
  Branch (765:12): [Folded - Ignored]
  Branch (765:12): [True: 0, False: 5]
766
0
            return Err(make_input_err!("No arguments provided in Command proto"));
767
5
        }
768
5
        let args: Vec<&OsStr> = if let Some(
entrypoint0
) = &self
  Branch (768:40): [Folded - Ignored]
  Branch (768:40): [Folded - Ignored]
  Branch (768:40): [True: 0, False: 5]
769
5
            .running_actions_manager
770
5
            .execution_configuration
771
5
            .entrypoint
772
        {
773
0
            std::iter::once(entrypoint.as_ref())
774
0
                .chain(command_proto.arguments.iter().map(AsRef::as_ref))
775
0
                .collect()
776
        } else {
777
5
            command_proto.arguments.iter().map(AsRef::as_ref).collect()
778
        };
779
5
        event!(Level::INFO, ?args, 
"Executing command"0
,);
780
5
        let mut command_builder = process::Command::new(args[0]);
781
5
        command_builder
782
5
            .args(&args[1..])
783
5
            .kill_on_drop(true)
784
5
            .stdin(Stdio::null())
785
5
            .stdout(Stdio::piped())
786
5
            .stderr(Stdio::piped())
787
5
            .current_dir(format!(
788
5
                "{}/{}",
789
5
                self.work_directory, command_proto.working_directory
790
5
            ))
791
5
            .env_clear();
792
793
5
        let requested_timeout = if self.action_info.timeout.is_zero() {
  Branch (793:36): [Folded - Ignored]
  Branch (793:36): [Folded - Ignored]
  Branch (793:36): [True: 5, False: 0]
794
5
            self.running_actions_manager.max_action_timeout
795
        } else {
796
0
            self.action_info.timeout
797
        };
798
799
5
        let mut maybe_side_channel_file: Option<Cow<'_, OsStr>> = None;
800
5
        if let Some(
additional_environment0
) = &self
  Branch (800:16): [Folded - Ignored]
  Branch (800:16): [Folded - Ignored]
  Branch (800:16): [True: 0, False: 5]
801
5
            .running_actions_manager
802
5
            .execution_configuration
803
5
            .additional_environment
804
        {
805
0
            for (name, source) in additional_environment {
806
0
                let value = match source {
807
0
                    EnvironmentSource::property(property) => self
808
0
                        .action_info
809
0
                        .platform_properties
810
0
                        .get(property)
811
0
                        .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.as_str())),
812
0
                    EnvironmentSource::value(value) => Cow::Borrowed(value.as_str()),
813
                    EnvironmentSource::timeout_millis => {
814
0
                        Cow::Owned(requested_timeout.as_millis().to_string())
815
                    }
816
                    EnvironmentSource::side_channel_file => {
817
0
                        let file_cow =
818
0
                            format!("{}/{}", self.action_directory, Uuid::new_v4().simple());
819
0
                        maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));
820
0
                        Cow::Owned(file_cow)
821
                    }
822
                    EnvironmentSource::action_directory => {
823
0
                        Cow::Borrowed(self.action_directory.as_str())
824
                    }
825
                };
826
0
                command_builder.env(name, value.as_ref());
827
            }
828
5
        }
829
830
        #[cfg(target_family = "unix")]
831
5
        let envs = &command_proto.environment_variables;
832
        // If SystemRoot is not set on windows we set it to default. Failing to do
833
        // this causes all commands to fail.
834
        #[cfg(target_family = "windows")]
835
        let envs = {
836
            let mut envs = command_proto.environment_variables.clone();
837
            if !envs.iter().any(|v| v.name.to_uppercase() == "SYSTEMROOT") {
838
                envs.push(
839
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
840
                        name: "SystemRoot".to_string(),
841
                        value: "C:\\Windows".to_string(),
842
                    },
843
                );
844
            }
845
            if !envs.iter().any(|v| v.name.to_uppercase() == "PATH") {
846
                envs.push(
847
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
848
                        name: "PATH".to_string(),
849
                        value: "C:\\Windows\\System32".to_string(),
850
                    },
851
                );
852
            }
853
            envs
854
        };
855
6
        for 
environment_variable1
in envs {
856
1
            command_builder.env(&environment_variable.name, &environment_variable.value);
857
1
        }
858
859
5
        let mut child_process = command_builder
860
5
            .spawn()
861
5
            .err_tip(|| 
format!("Could not execute command {args:?}")0
)
?0
;
862
5
        let mut stdout_reader = child_process
863
5
            .stdout
864
5
            .take()
865
5
            .err_tip(|| 
"Expected stdout to exist on command this should never happen"0
)
?0
;
866
5
        let mut stderr_reader = child_process
867
5
            .stderr
868
5
            .take()
869
5
            .err_tip(|| 
"Expected stderr to exist on command this should never happen"0
)
?0
;
870
871
5
        let mut child_process_guard = guard(child_process, |mut child_process| {
872
0
            event!(
873
0
                Level::ERROR,
874
0
                "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
875
            );
876
0
            background_spawn!("running_actions_manager_kill_child_process", async move {
877
0
                child_process.kill().await
878
0
            });
879
5
        
}0
);
880
881
5
        let all_stdout_fut = spawn!("stdout_reader", async move {
882
5
            let mut all_stdout = BytesMut::new();
883
            loop {
884
7
                let 
sz6
= stdout_reader
885
7
                    .read_buf(&mut all_stdout)
886
6
                    .await
887
6
                    .err_tip(|| 
"Error reading stdout stream"0
)
?0
;
888
6
                if sz == 0 {
  Branch (888:20): [Folded - Ignored]
  Branch (888:20): [Folded - Ignored]
  Branch (888:20): [True: 4, False: 2]
889
4
                    break; // EOF.
890
2
                }
891
            }
892
4
            Result::<Bytes, Error>::Ok(all_stdout.freeze())
893
5
        
}4
);
894
5
        let all_stderr_fut = spawn!("stderr_reader", async move {
895
5
            let mut all_stderr = BytesMut::new();
896
            loop {
897
7
                let 
sz6
= stderr_reader
898
7
                    .read_buf(&mut all_stderr)
899
6
                    .await
900
6
                    .err_tip(|| 
"Error reading stderr stream"0
)
?0
;
901
6
                if sz == 0 {
  Branch (901:20): [Folded - Ignored]
  Branch (901:20): [Folded - Ignored]
  Branch (901:20): [True: 4, False: 2]
902
4
                    break; // EOF.
903
2
                }
904
            }
905
4
            Result::<Bytes, Error>::Ok(all_stderr.freeze())
906
5
        
}4
);
907
5
        let mut killed_action = false;
908
5
909
5
        let timer = self.metrics().child_process.begin_timer();
910
5
        let mut sleep_fut = (self.running_actions_manager.callbacks.sleep_fn)(self.timeout).fuse();
911
        loop {
912
6
            tokio::select! {
913
6
                _ = &mut sleep_fut => {
914
1
                    self.running_actions_manager.metrics.task_timeouts.inc();
915
1
                    killed_action = true;
916
1
                    if let Err(
err0
) = child_process_guard.start_kill() {
  Branch (916:28): [Folded - Ignored]
  Branch (916:28): [Folded - Ignored]
  Branch (916:28): [True: 0, False: 1]
917
0
                        event!(
918
0
                            Level::ERROR,
919
                            ?err,
920
0
                            "Could not kill process in RunningActionsManager for action timeout",
921
                        );
922
1
                    }
923
1
                    {
924
1
                        let mut state = self.state.lock();
925
1
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
926
1
                            Code::DeadlineExceeded,
927
1
                            format!(
928
1
                                "Command '{}' timed out after {} seconds",
929
1
                                args.join(OsStr::new(" ")).to_string_lossy(),
930
1
                                self.action_info.timeout.as_secs_f32()
931
1
                            )
932
1
                        )));
933
1
                    }
934
                },
935
6
                
maybe_exit_status5
= child_process_guard.wait() => {
936
                    // Defuse our guard so it does not try to cleanup and make nessless logs.
937
5
                    drop(ScopeGuard::<_, _>::into_inner(child_process_guard));
938
5
                    let exit_status = maybe_exit_status.err_tip(|| 
"Failed to collect exit code of process"0
)
?0
;
939
                    // TODO(allada) We should implement stderr/stdout streaming to client here.
940
                    // If we get killed before the stream is started, then these will lock up.
941
                    // TODO(allada) There is a significant bug here. If we kill the action and the action creates
942
                    // child processes, it can create zombies. See: https://github.com/tracemachina/nativelink/issues/225
943
5
                    let (stdout, stderr) = if killed_action {
  Branch (943:47): [Folded - Ignored]
  Branch (943:47): [Folded - Ignored]
  Branch (943:47): [True: 1, False: 4]
944
1
                        drop(timer);
945
1
                        (Bytes::new(), Bytes::new())
946
                    } else {
947
4
                        timer.measure();
948
4
                        let (maybe_all_stdout, maybe_all_stderr) = tokio::join!(all_stdout_fut, all_stderr_fut);
949
                        (
950
4
                            maybe_all_stdout.err_tip(|| 
"Internal error reading from stdout of worker task"0
)
?0
?0
,
951
4
                            maybe_all_stderr.err_tip(|| 
"Internal error reading from stderr of worker task"0
)
?0
?0
952
                        )
953
                    };
954
5
                    let exit_code = if let Some(
exit_code4
) = exit_status.code() {
  Branch (954:44): [Folded - Ignored]
  Branch (954:44): [Folded - Ignored]
  Branch (954:44): [True: 4, False: 1]
955
4
                        if exit_code == 0 {
  Branch (955:28): [Folded - Ignored]
  Branch (955:28): [Folded - Ignored]
  Branch (955:28): [True: 3, False: 1]
956
3
                            self.metrics().child_process_success_error_code.inc();
957
3
                        } else {
958
1
                            self.metrics().child_process_failure_error_code.inc();
959
1
                        }
960
4
                        exit_code
961
                    } else {
962
1
                        EXIT_CODE_FOR_SIGNAL
963
                    };
964
965
5
                    let maybe_error_override = if let Some(
side_channel_file0
) = maybe_side_channel_file {
  Branch (965:55): [Folded - Ignored]
  Branch (965:55): [Folded - Ignored]
  Branch (965:55): [True: 0, False: 5]
966
0
                        process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await
967
0
                        .err_tip(|| format!("Error processing side channel file: {side_channel_file:?}"))?
968
                    } else {
969
5
                        None
970
                    };
971
5
                    {
972
5
                        let mut state = self.state.lock();
973
5
                        state.error = Error::merge_option(state.error.take(), maybe_error_override);
974
5
975
5
                        state.command_proto = Some(command_proto);
976
5
                        state.execution_result = Some(RunningActionImplExecutionResult{
977
5
                            stdout,
978
5
                            stderr,
979
5
                            exit_code,
980
5
                        });
981
5
                        state.execution_metadata.execution_completed_timestamp = (self.running_actions_manager.callbacks.now_fn)();
982
5
                    }
983
5
                    return Ok(self);
984
                },
985
6
                _ = &mut kill_channel_rx => {
986
0
                    killed_action = true;
987
0
                    if let Err(err) = child_process_guard.start_kill() {
  Branch (987:28): [Folded - Ignored]
  Branch (987:28): [Folded - Ignored]
  Branch (987:28): [True: 0, False: 0]
988
0
                        event!(
989
0
                            Level::ERROR,
990
0
                            operation_id = ?self.operation_id,
991
0
                            ?err,
992
0
                            "Could not kill process",
993
                        );
994
                    } else {
995
0
                        event!(
996
0
                            Level::ERROR,
997
0
                            operation_id = ?self.operation_id,
998
0
                            "Could not get child process id, maybe already dead?",
999
                        );
1000
                    }
1001
0
                    {
1002
0
                        let mut state = self.state.lock();
1003
0
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
1004
0
                            Code::Aborted,
1005
0
                            format!(
1006
0
                                "Command '{}' was killed by scheduler",
1007
0
                                args.join(OsStr::new(" ")).to_string_lossy()
1008
0
                            )
1009
0
                        )));
1010
0
                    }
1011
                },
1012
            }
1013
        }
1014
        // Unreachable.
1015
5
    }
1016
1017
5
    
async fn inner_upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> 0
{
1018
        enum OutputType {
1019
            None,
1020
            File(FileInfo),
1021
            Directory(DirectoryInfo),
1022
            FileSymlink(SymlinkInfo),
1023
            DirectorySymlink(SymlinkInfo),
1024
        }
1025
1026
5
        event!(Level::INFO, 
"Worker uploading results"0
,);
1027
5
        let (mut command_proto, execution_result, mut execution_metadata) = {
1028
5
            let mut state = self.state.lock();
1029
5
            state.execution_metadata.output_upload_start_timestamp =
1030
5
                (self.running_actions_manager.callbacks.now_fn)();
1031
5
            (
1032
5
                state
1033
5
                    .command_proto
1034
5
                    .take()
1035
5
                    .err_tip(|| 
"Expected state to have command_proto in execute()"0
)
?0
,
1036
5
                state
1037
5
                    .execution_result
1038
5
                    .take()
1039
5
                    .err_tip(|| 
"Execution result does not exist at upload_results stage"0
)
?0
,
1040
5
                state.execution_metadata.clone(),
1041
5
            )
1042
5
        };
1043
5
        let cas_store = self.running_actions_manager.cas_store.as_ref();
1044
5
        let hasher = self.action_info.unique_qualifier.digest_function();
1045
5
1046
5
        let mut output_path_futures = FuturesUnordered::new();
1047
5
        let mut output_paths = command_proto.output_paths;
1048
5
        if output_paths.is_empty() {
  Branch (1048:12): [Folded - Ignored]
  Branch (1048:12): [Folded - Ignored]
  Branch (1048:12): [True: 3, False: 2]
1049
3
            output_paths
1050
3
                .reserve(command_proto.output_files.len() + command_proto.output_directories.len());
1051
3
            output_paths.append(&mut command_proto.output_files);
1052
3
            output_paths.append(&mut command_proto.output_directories);
1053
3
        }
2
1054
7
        for 
entry2
in output_paths {
1055
2
            let full_path = OsString::from(if command_proto.working_directory.is_empty() {
  Branch (1055:47): [Folded - Ignored]
  Branch (1055:47): [Folded - Ignored]
  Branch (1055:47): [True: 0, False: 2]
1056
0
                format!("{}/{}", self.work_directory, entry)
1057
            } else {
1058
2
                format!(
1059
2
                    "{}/{}/{}",
1060
2
                    self.work_directory, command_proto.working_directory, entry
1061
2
                )
1062
            });
1063
2
            let work_directory = &self.work_directory;
1064
2
            output_path_futures.push(async move {
1065
0
                let metadata = {
1066
2
                    let metadata = match fs::symlink_metadata(&full_path).await {
1067
2
                        Ok(file) => file,
1068
0
                        Err(e) => {
1069
0
                            if e.code == Code::NotFound {
  Branch (1069:32): [Folded - Ignored]
  Branch (1069:32): [Folded - Ignored]
  Branch (1069:32): [True: 0, False: 0]
1070
                                // In the event our output does not exist, according to the bazel remote
1071
                                // execution spec, we simply ignore it continue.
1072
0
                                return Result::<OutputType, Error>::Ok(OutputType::None);
1073
0
                            }
1074
0
                            return Err(e).err_tip(|| format!("Could not open file {full_path:?}"));
1075
                        }
1076
                    };
1077
1078
2
                    if metadata.is_file() {
  Branch (1078:24): [Folded - Ignored]
  Branch (1078:24): [Folded - Ignored]
  Branch (1078:24): [True: 2, False: 0]
1079
                        return Ok(OutputType::File(
1080
2
                            upload_file(cas_store.as_pin(), &full_path, hasher, metadata)
1081
20
                                .await
1082
2
                                .map(|mut file_info| {
1083
2
                                    file_info.name_or_path = NameOrPath::Path(entry);
1084
2
                                    file_info
1085
2
                                })
1086
2
                                .err_tip(|| 
format!("Uploading file {full_path:?}")0
)
?0
,
1087
                        ));
1088
0
                    }
1089
0
                    metadata
1090
0
                };
1091
0
                if metadata.is_dir() {
  Branch (1091:20): [Folded - Ignored]
  Branch (1091:20): [Folded - Ignored]
  Branch (1091:20): [True: 0, False: 0]
1092
                    Ok(OutputType::Directory(
1093
0
                        upload_directory(cas_store.as_pin(), &full_path, work_directory, hasher)
1094
0
                            .and_then(|(root_dir, children)| async move {
1095
0
                                let tree = ProtoTree {
1096
0
                                    root: Some(root_dir),
1097
0
                                    children: children.into(),
1098
0
                                };
1099
0
                                let tree_digest = serialize_and_upload_message(
1100
0
                                    &tree,
1101
0
                                    cas_store.as_pin(),
1102
0
                                    &mut hasher.hasher(),
1103
0
                                )
1104
0
                                .await
1105
0
                                .err_tip(|| format!("While processing {entry}"))?;
1106
0
                                Ok(DirectoryInfo {
1107
0
                                    path: entry,
1108
0
                                    tree_digest,
1109
0
                                })
1110
0
                            })
1111
0
                            .await
1112
0
                            .err_tip(|| format!("Uploading directory {full_path:?}"))?,
1113
                    ))
1114
0
                } else if metadata.is_symlink() {
  Branch (1114:27): [Folded - Ignored]
  Branch (1114:27): [Folded - Ignored]
  Branch (1114:27): [True: 0, False: 0]
1115
0
                    let output_symlink = upload_symlink(&full_path, work_directory)
1116
0
                        .await
1117
0
                        .map(|mut symlink_info| {
1118
0
                            symlink_info.name_or_path = NameOrPath::Path(entry);
1119
0
                            symlink_info
1120
0
                        })
1121
0
                        .err_tip(|| format!("Uploading symlink {full_path:?}"))?;
1122
0
                    match fs::metadata(&full_path).await {
1123
0
                        Ok(metadata) => {
1124
0
                            if metadata.is_dir() {
  Branch (1124:32): [Folded - Ignored]
  Branch (1124:32): [Folded - Ignored]
  Branch (1124:32): [True: 0, False: 0]
1125
0
                                return Ok(OutputType::DirectorySymlink(output_symlink));
1126
0
                            }
1127
0
                            // Note: If it's anything but directory we put it as a file symlink.
1128
0
                            return Ok(OutputType::FileSymlink(output_symlink));
1129
                        }
1130
0
                        Err(e) => {
1131
0
                            if e.code != Code::NotFound {
  Branch (1131:32): [Folded - Ignored]
  Branch (1131:32): [Folded - Ignored]
  Branch (1131:32): [True: 0, False: 0]
1132
0
                                return Err(e).err_tip(|| {
1133
0
                                    format!(
1134
0
                                        "While querying target symlink metadata for {full_path:?}"
1135
0
                                    )
1136
0
                                });
1137
0
                            }
1138
0
                            // If the file doesn't exist, we consider it a file. Even though the
1139
0
                            // file doesn't exist we still need to populate an entry.
1140
0
                            return Ok(OutputType::FileSymlink(output_symlink));
1141
                        }
1142
                    }
1143
                } else {
1144
0
                    Err(make_err!(
1145
0
                        Code::Internal,
1146
0
                        "{full_path:?} was not a file, folder or symlink. Must be one.",
1147
0
                    ))
1148
                }
1149
2
            });
1150
2
        }
1151
5
        let mut output_files = vec![];
1152
5
        let mut output_folders = vec![];
1153
5
        let mut output_directory_symlinks = vec![];
1154
5
        let mut output_file_symlinks = vec![];
1155
5
1156
5
        if execution_result.exit_code != 0 {
  Branch (1156:12): [Folded - Ignored]
  Branch (1156:12): [Folded - Ignored]
  Branch (1156:12): [True: 2, False: 3]
1157
            // Don't convert our stdout/stderr to strings unless we are need too.
1158
2
            if enabled!(Level::ERROR) {
1159
2
                let stdout = std::str::from_utf8(&execution_result.stdout).unwrap_or("<no-utf8>");
1160
2
                let stderr = std::str::from_utf8(&execution_result.stderr).unwrap_or("<no-utf8>");
1161
2
                event!(
1162
2
                    Level::ERROR,
1163
                    exit_code = ?execution_result.exit_code,
1164
2
                    stdout = ?stdout[..min(stdout.len(), 1000)],
1165
2
                    stderr = ?stderr[..min(stderr.len(), 1000)],
1166
2
                    "Command returned non-zero exit code",
1167
                );
1168
0
            }
1169
3
        }
1170
1171
5
        let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
1172
5
            let data = execution_result.stdout;
1173
5
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1174
5
            cas_store
1175
5
                .update_oneshot(digest, data)
1176
53
                .await
1177
5
                .err_tip(|| 
"Uploading stdout"0
)
?0
;
1178
5
            Result::<DigestInfo, Error>::Ok(digest)
1179
5
        });
1180
5
        let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
1181
5
            let data = execution_result.stderr;
1182
5
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1183
5
            cas_store
1184
5
                .update_oneshot(digest, data)
1185
56
                .await
1186
5
                .err_tip(|| 
"Uploading stdout"0
)
?0
;
1187
5
            Result::<DigestInfo, Error>::Ok(digest)
1188
5
        });
1189
1190
5
        let upload_result = futures::try_join!(stdout_digest_fut, stderr_digest_fut, async {
1191
39
            while let Some(
output_type2
) =
output_path_futures.try_next()7
.await
?0
{
  Branch (1191:23): [Folded - Ignored]
  Branch (1191:23): [Folded - Ignored]
  Branch (1191:23): [True: 2, False: 5]
1192
2
                match output_type {
1193
2
                    OutputType::File(output_file) => output_files.push(output_file),
1194
0
                    OutputType::Directory(output_folder) => output_folders.push(output_folder),
1195
0
                    OutputType::FileSymlink(output_symlink) => {
1196
0
                        output_file_symlinks.push(output_symlink);
1197
0
                    }
1198
0
                    OutputType::DirectorySymlink(output_symlink) => {
1199
0
                        output_directory_symlinks.push(output_symlink);
1200
0
                    }
1201
0
                    OutputType::None => { /* Safe to ignore */ }
1202
                }
1203
            }
1204
5
            Ok(())
1205
5
        });
1206
5
        drop(output_path_futures);
1207
5
        let (stdout_digest, stderr_digest) = match upload_result {
1208
5
            Ok((stdout_digest, stderr_digest, _)) => (stdout_digest, stderr_digest),
1209
0
            Err(e) => return Err(e).err_tip(|| "Error while uploading results"),
1210
        };
1211
1212
5
        execution_metadata.output_upload_completed_timestamp =
1213
5
            (self.running_actions_manager.callbacks.now_fn)();
1214
5
        output_files.sort_unstable_by(|a, b| 
a.name_or_path.cmp(&b.name_or_path)0
);
1215
5
        output_folders.sort_unstable_by(|a, b| 
a.path.cmp(&b.path)0
);
1216
5
        output_file_symlinks.sort_unstable_by(|a, b| 
a.name_or_path.cmp(&b.name_or_path)0
);
1217
5
        output_directory_symlinks.sort_unstable_by(|a, b| 
a.name_or_path.cmp(&b.name_or_path)0
);
1218
5
        {
1219
5
            let mut state = self.state.lock();
1220
5
            execution_metadata.worker_completed_timestamp =
1221
5
                (self.running_actions_manager.callbacks.now_fn)();
1222
5
            state.action_result = Some(ActionResult {
1223
5
                output_files,
1224
5
                output_folders,
1225
5
                output_directory_symlinks,
1226
5
                output_file_symlinks,
1227
5
                exit_code: execution_result.exit_code,
1228
5
                stdout_digest,
1229
5
                stderr_digest,
1230
5
                execution_metadata,
1231
5
                server_logs: HashMap::default(), // TODO(allada) Not implemented.
1232
5
                error: state.error.clone(),
1233
5
                message: String::new(), // Will be filled in on cache_action_result if needed.
1234
5
            });
1235
5
        }
1236
5
        Ok(self)
1237
5
    }
1238
1239
5
    /// Takes the final `ActionResult` out of the state.
    ///
    /// Returns an error if no result is stored, i.e. the upload stage never stored one or
    /// the result was already taken by an earlier call.
    async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
        let maybe_action_result = self.state.lock().action_result.take();
        maybe_action_result.err_tip(|| "Expected action_result to exist in get_finished_result")
    }
1246
}
1247
1248
impl Drop for RunningActionImpl {
1249
7
    fn drop(&mut self) {
1250
7
        if self.did_cleanup.load(Ordering::Acquire) {
  Branch (1250:12): [True: 7, False: 0]
  Branch (1250:12): [Folded - Ignored]
1251
7
            return;
1252
0
        }
1253
0
        let operation_id = self.operation_id.clone();
1254
0
        event!(
1255
0
            Level::ERROR,
1256
            ?operation_id,
1257
0
            "RunningActionImpl did not cleanup. This is a violation of the requirements, will attempt to do it in the background."
1258
        );
1259
0
        let running_actions_manager = self.running_actions_manager.clone();
1260
0
        let action_directory = self.action_directory.clone();
1261
0
        background_spawn!("running_action_impl_drop", async move {
1262
0
            let Err(err) =
  Branch (1262:17): [True: 0, False: 0]
  Branch (1262:17): [Folded - Ignored]
1263
0
                do_cleanup(&running_actions_manager, &operation_id, &action_directory).await
1264
            else {
1265
0
                return;
1266
            };
1267
0
            event!(
1268
0
                Level::ERROR,
1269
                ?operation_id,
1270
                ?action_directory,
1271
                ?err,
1272
0
                "Error cleaning up action"
1273
            );
1274
0
        });
1275
7
    }
1276
}
1277
1278
impl RunningAction for RunningActionImpl {
1279
0
    fn get_operation_id(&self) -> &OperationId {
1280
0
        &self.operation_id
1281
0
    }
1282
1283
7
    async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1284
7
        self.metrics()
1285
7
            .clone()
1286
7
            .prepare_action
1287
7
            .wrap(Self::inner_prepare_action(self))
1288
98
            .await
1289
7
    }
1290
1291
5
    
async fn execute(self: Arc<Self>) -> Result<Arc<Self>, Error> 0
{
1292
5
        self.metrics()
1293
5
            .clone()
1294
5
            .execute
1295
5
            .wrap(Self::inner_execute(self))
1296
9
            .await
1297
5
    }
1298
1299
5
    async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1300
5
        self.metrics()
1301
5
            .clone()
1302
5
            .upload_results
1303
5
            .wrap(Self::inner_upload_results(self))
1304
56
            .await
1305
5
    }
1306
1307
7
    async fn cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1308
7
        self.metrics()
1309
7
            .clone()
1310
7
            .cleanup
1311
7
            .wrap(async move {
1312
7
                let result = do_cleanup(
1313
7
                    &self.running_actions_manager,
1314
7
                    &self.operation_id,
1315
7
                    &self.action_directory,
1316
7
                )
1317
7
                .await;
1318
7
                self.did_cleanup.store(true, Ordering::Release);
1319
7
                result.map(move |()| self)
1320
7
            })
1321
7
            .await
1322
7
    }
1323
1324
5
    
async fn get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> 0
{
1325
5
        self.metrics()
1326
5
            .clone()
1327
5
            .get_finished_result
1328
5
            .wrap(Self::inner_get_finished_result(self))
1329
0
            .await
1330
5
    }
1331
1332
2
    fn get_work_directory(&self) -> &String {
1333
2
        &self.work_directory
1334
2
    }
1335
}
1336
1337
pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static {
1338
    type RunningAction: RunningAction;
1339
1340
    fn create_and_add_action(
1341
        self: &Arc<Self>,
1342
        worker_id: String,
1343
        start_execute: StartExecute,
1344
    ) -> impl Future<Output = Result<Arc<Self::RunningAction>, Error>> + Send;
1345
1346
    fn cache_action_result(
1347
        &self,
1348
        action_digest: DigestInfo,
1349
        action_result: &mut ActionResult,
1350
        hasher: DigestHasherFunc,
1351
    ) -> impl Future<Output = Result<(), Error>> + Send;
1352
1353
    fn complete_actions(
1354
        &self,
1355
        complete_msg: Arc<oneshot::Sender<()>>,
1356
    ) -> impl Future<Output = ()> + Send;
1357
1358
    fn kill_all(&self) -> impl Future<Output = ()> + Send;
1359
1360
    fn kill_operation(
1361
        &self,
1362
        operation_id: &OperationId,
1363
    ) -> impl Future<Output = Result<(), Error>> + Send;
1364
1365
    fn metrics(&self) -> &Arc<Metrics>;
1366
}
1367
1368
/// A function to get the current system time, used to allow mocking for tests
1369
type NowFn = fn() -> SystemTime;
1370
type SleepFn = fn(Duration) -> BoxFuture<'static, ()>;
1371
1372
/// Functions that may be injected for testing purposes. During standard control
1373
/// flows these are specified by the new function.
1374
pub struct Callbacks {
1375
    /// A function that gets the current time.
1376
    pub now_fn: NowFn,
1377
    /// A function that sleeps for a given Duration.
1378
    pub sleep_fn: SleepFn,
1379
}
1380
1381
/// The set of additional information for executing an action over and above
1382
/// those given in the ActionInfo passed to the worker.  This allows
1383
/// modification of the action for execution on this particular worker.  This
1384
/// may be used to run the action with a particular set of additional
1385
/// environment variables, or perhaps configure it to execute within a
1386
/// container.
1387
#[derive(Default)]
1388
pub struct ExecutionConfiguration {
1389
    /// If set, will be executed instead of the first argument passed in the
1390
    /// ActionInfo with all of the arguments in the ActionInfo passed as
1391
    /// arguments to this command.
1392
    pub entrypoint: Option<String>,
1393
    /// The only environment variables that will be specified when the command
1394
    /// executes other than those in the ActionInfo.  On Windows, SystemRoot
1395
    /// and PATH are also assigned (see inner_execute).
1396
    pub additional_environment: Option<HashMap<String, EnvironmentSource>>,
1397
}
1398
1399
struct UploadActionResults {
1400
    upload_ac_results_strategy: UploadCacheResultsStrategy,
1401
    upload_historical_results_strategy: UploadCacheResultsStrategy,
1402
    ac_store: Option<Store>,
1403
    historical_store: Store,
1404
    success_message_template: Template,
1405
    failure_message_template: Template,
1406
}
1407
1408
impl UploadActionResults {
1409
15
    fn new(
1410
15
        config: &UploadActionResultConfig,
1411
15
        ac_store: Option<Store>,
1412
15
        historical_store: Store,
1413
15
    ) -> Result<Self, Error> {
1414
15
        let upload_historical_results_strategy = config
1415
15
            .upload_historical_results_strategy
1416
15
            .unwrap_or(DEFAULT_HISTORICAL_RESULTS_STRATEGY);
1417
8
        if !matches!(
  Branch (1417:12): [True: 8, False: 7]
  Branch (1417:12): [Folded - Ignored]
1418
15
            config.upload_ac_results_strategy,
1419
            UploadCacheResultsStrategy::never
1420
8
        ) && ac_store.is_none()
  Branch (1420:14): [True: 0, False: 8]
  Branch (1420:14): [Folded - Ignored]
1421
        {
1422
0
            return Err(make_input_err!(
1423
0
                "upload_ac_results_strategy is set, but no ac_store is configured"
1424
0
            ));
1425
15
        }
1426
15
        Ok(Self {
1427
15
            upload_ac_results_strategy: config.upload_ac_results_strategy,
1428
15
            upload_historical_results_strategy,
1429
15
            ac_store,
1430
15
            historical_store,
1431
15
            success_message_template: Template::new(&config.success_message_template).map_err(
1432
15
                |e| {
1433
0
                    make_input_err!(
1434
0
                        "Could not convert success_message_template to rust template: {} : {e:?}",
1435
0
                        config.success_message_template
1436
0
                    )
1437
15
                },
1438
15
            )
?0
,
1439
15
            failure_message_template: Template::new(&config.failure_message_template).map_err(
1440
15
                |e| {
1441
0
                    make_input_err!(
1442
0
                        "Could not convert failure_message_template to rust template: {} : {e:?}",
1443
0
                        config.failure_message_template
1444
0
                    )
1445
15
                },
1446
15
            )
?0
,
1447
        })
1448
15
    }
1449
1450
0
    fn should_cache_result(
1451
0
        strategy: UploadCacheResultsStrategy,
1452
0
        action_result: &ActionResult,
1453
0
        treat_infra_error_as_failure: bool,
1454
0
    ) -> bool {
1455
0
        let mut did_fail = action_result.exit_code != 0;
1456
0
        if treat_infra_error_as_failure && action_result.error.is_some() {
  Branch (1456:12): [Folded - Ignored]
  Branch (1456:44): [Folded - Ignored]
1457
0
            did_fail = true;
1458
0
        }
1459
0
        match strategy {
1460
0
            UploadCacheResultsStrategy::success_only => !did_fail,
1461
0
            UploadCacheResultsStrategy::never => false,
1462
            // Never cache internal errors or timeouts.
1463
            UploadCacheResultsStrategy::everything => {
1464
0
                treat_infra_error_as_failure || action_result.error.is_none()
  Branch (1464:17): [Folded - Ignored]
1465
            }
1466
0
            UploadCacheResultsStrategy::failures_only => did_fail,
1467
        }
1468
0
    }
1469
1470
    /// Formats the message field in ExecuteResponse from the success_message_template or
1471
    /// failure_message_template config templates.
1472
5
    fn format_execute_response_message(
1473
5
        mut template_str: Template,
1474
5
        action_digest_info: DigestInfo,
1475
5
        maybe_historical_digest_info: Option<DigestInfo>,
1476
5
        hasher: DigestHasherFunc,
1477
5
    ) -> Result<String, Error> {
1478
5
        template_str.replace(
1479
5
            "digest_function",
1480
5
            hasher.proto_digest_func().as_str_name().to_lowercase(),
1481
5
        );
1482
5
        template_str.replace(
1483
5
            "action_digest_hash",
1484
5
            action_digest_info.packed_hash().to_string(),
1485
5
        );
1486
5
        template_str.replace("action_digest_size", action_digest_info.size_bytes());
1487
5
        if let Some(
historical_digest_info3
) = maybe_historical_digest_info {
  Branch (1487:16): [True: 3, False: 2]
  Branch (1487:16): [Folded - Ignored]
1488
3
            template_str.replace(
1489
3
                "historical_results_hash",
1490
3
                format!("{}", historical_digest_info.packed_hash()),
1491
3
            );
1492
3
            template_str.replace(
1493
3
                "historical_results_size",
1494
3
                historical_digest_info.size_bytes(),
1495
3
            );
1496
3
        } else {
1497
2
            template_str.replace("historical_results_hash", "");
1498
2
            template_str.replace("historical_results_size", "");
1499
2
        }
1500
5
        template_str
1501
5
            .text()
1502
5
            .map_err(|e| 
make_input_err!("Could not convert template to text: {e:?}")0
)
1503
5
    }
1504
1505
5
    async fn upload_ac_results(
1506
5
        &self,
1507
5
        action_digest: DigestInfo,
1508
5
        action_result: ProtoActionResult,
1509
5
        hasher: DigestHasherFunc,
1510
5
    ) -> Result<(), Error> {
1511
5
        let Some(ac_store) = self.ac_store.as_ref() else {
  Branch (1511:13): [Folded - Ignored]
  Branch (1511:13): [Folded - Ignored]
  Branch (1511:13): [True: 5, False: 0]
1512
0
            return Ok(());
1513
        };
1514
        // If we are a GrpcStore we shortcut here, as this is a special store.
1515
5
        if let Some(
grpc_store0
) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
  Branch (1515:16): [Folded - Ignored]
  Branch (1515:16): [Folded - Ignored]
  Branch (1515:16): [True: 0, False: 5]
1516
0
            let update_action_request = UpdateActionResultRequest {
1517
0
                // This is populated by `update_action_result`.
1518
0
                instance_name: String::new(),
1519
0
                action_digest: Some(action_digest.into()),
1520
0
                action_result: Some(action_result),
1521
0
                results_cache_policy: None,
1522
0
                digest_function: hasher.proto_digest_func().into(),
1523
0
            };
1524
0
            return grpc_store
1525
0
                .update_action_result(Request::new(update_action_request))
1526
0
                .await
1527
0
                .map(|_| ())
1528
0
                .err_tip(|| "Caching ActionResult");
1529
5
        }
1530
5
1531
5
        let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);
1532
5
        action_result
1533
5
            .encode(&mut store_data)
1534
5
            .err_tip(|| 
"Encoding ActionResult for caching"0
)
?0
;
1535
1536
5
        ac_store
1537
5
            .update_oneshot(action_digest, store_data.split().freeze())
1538
0
            .await
1539
5
            .err_tip(|| 
"Caching ActionResult"0
)
1540
5
    }
1541
1542
3
    async fn upload_historical_results_with_message(
1543
3
        &self,
1544
3
        action_digest: DigestInfo,
1545
3
        execute_response: ExecuteResponse,
1546
3
        message_template: Template,
1547
3
        hasher: DigestHasherFunc,
1548
3
    ) -> Result<String, Error> {
1549
3
        let historical_digest_info = serialize_and_upload_message(
1550
3
            &HistoricalExecuteResponse {
1551
3
                action_digest: Some(action_digest.into()),
1552
3
                execute_response: Some(execute_response.clone()),
1553
3
            },
1554
3
            self.historical_store.as_pin(),
1555
3
            &mut hasher.hasher(),
1556
3
        )
1557
11
        .await
1558
3
        .err_tip(|| 
format!("Caching HistoricalExecuteResponse for digest: {action_digest}")0
)
?0
;
1559
1560
3
        Self::format_execute_response_message(
1561
3
            message_template,
1562
3
            action_digest,
1563
3
            Some(historical_digest_info),
1564
3
            hasher,
1565
3
        )
1566
3
        .err_tip(|| 
"Could not format message in upload_historical_results_with_message"0
)
1567
3
    }
1568
1569
0
    async fn cache_action_result(
1570
0
        &self,
1571
0
        action_info: DigestInfo,
1572
0
        action_result: &mut ActionResult,
1573
0
        hasher: DigestHasherFunc,
1574
6
    ) -> Result<(), Error> {
1575
6
        let should_upload_historical_results =
1576
6
            Self::should_cache_result(self.upload_historical_results_strategy, action_result, true);
1577
6
        let should_upload_ac_results =
1578
6
            Self::should_cache_result(self.upload_ac_results_strategy, action_result, false);
1579
6
        // Shortcut so we don't need to convert to proto if not needed.
1580
6
        if !should_upload_ac_results && 
!should_upload_historical_results1
{
  Branch (1580:12): [Folded - Ignored]
  Branch (1580:41): [Folded - Ignored]
  Branch (1580:12): [Folded - Ignored]
  Branch (1580:41): [Folded - Ignored]
  Branch (1580:12): [True: 1, False: 5]
  Branch (1580:41): [True: 1, False: 0]
1581
1
            return Ok(());
1582
5
        }
1583
5
1584
5
        let mut execute_response = to_execute_response(action_result.clone());
1585
1586
        // In theory exit code should always be != 0 if there's an error, but for safety we
1587
        // catch both.
1588
5
        let message_template = if action_result.exit_code == 0 && 
action_result.error.is_none()4
{
  Branch (1588:35): [Folded - Ignored]
  Branch (1588:67): [Folded - Ignored]
  Branch (1588:35): [Folded - Ignored]
  Branch (1588:67): [Folded - Ignored]
  Branch (1588:35): [True: 4, False: 1]
  Branch (1588:67): [True: 3, False: 1]
1589
3
            self.success_message_template.clone()
1590
        } else {
1591
2
            self.failure_message_template.clone()
1592
        };
1593
1594
5
        let upload_historical_results_with_message_result = if should_upload_historical_results {
  Branch (1594:64): [Folded - Ignored]
  Branch (1594:64): [Folded - Ignored]
  Branch (1594:64): [True: 3, False: 2]
1595
3
            let maybe_message = self
1596
3
                .upload_historical_results_with_message(
1597
3
                    action_info,
1598
3
                    execute_response.clone(),
1599
3
                    message_template,
1600
3
                    hasher,
1601
3
                )
1602
11
                .await;
1603
3
            match maybe_message {
1604
3
                Ok(message) => {
1605
3
                    action_result.message.clone_from(&message);
1606
3
                    execute_response.message = message;
1607
3
                    Ok(())
1608
                }
1609
0
                Err(e) => Result::<(), Error>::Err(e),
1610
            }
1611
        } else {
1612
2
            match Self::format_execute_response_message(message_template, action_info, None, hasher)
1613
            {
1614
2
                Ok(message) => {
1615
2
                    action_result.message.clone_from(&message);
1616
2
                    execute_response.message = message;
1617
2
                    Ok(())
1618
                }
1619
0
                Err(e) => Err(e).err_tip(|| "Could not format message in cache_action_result"),
1620
            }
1621
        };
1622
1623
        // Note: Done in this order because we assume most results will succeed and most configs will
1624
        // either always upload historical results or only upload on failure. In which case
1625
        // we can avoid an extra clone of the protos by doing this last with the above assumption.
1626
5
        let ac_upload_results = if should_upload_ac_results {
  Branch (1626:36): [Folded - Ignored]
  Branch (1626:36): [Folded - Ignored]
  Branch (1626:36): [True: 5, False: 0]
1627
5
            self.upload_ac_results(
1628
5
                action_info,
1629
5
                execute_response
1630
5
                    .result
1631
5
                    .err_tip(|| 
"No result set in cache_action_result"0
)
?0
,
1632
5
                hasher,
1633
            )
1634
0
            .await
1635
        } else {
1636
0
            Ok(())
1637
        };
1638
5
        upload_historical_results_with_message_result.merge(ac_upload_results)
1639
6
    }
1640
}
1641
1642
pub struct RunningActionsManagerArgs<'a> {
1643
    pub root_action_directory: String,
1644
    pub execution_configuration: ExecutionConfiguration,
1645
    pub cas_store: Arc<FastSlowStore>,
1646
    pub ac_store: Option<Store>,
1647
    pub historical_store: Store,
1648
    pub upload_action_result_config: &'a UploadActionResultConfig,
1649
    pub max_action_timeout: Duration,
1650
    pub timeout_handled_externally: bool,
1651
}
1652
1653
/// Holds state info about what is being executed and the interface for interacting
1654
/// with actions while they are running.
1655
pub struct RunningActionsManagerImpl {
1656
    root_action_directory: String,
1657
    execution_configuration: ExecutionConfiguration,
1658
    cas_store: Arc<FastSlowStore>,
1659
    filesystem_store: Arc<FilesystemStore>,
1660
    upload_action_results: UploadActionResults,
1661
    max_action_timeout: Duration,
1662
    timeout_handled_externally: bool,
1663
    running_actions: Mutex<HashMap<OperationId, Weak<RunningActionImpl>>>,
1664
    // Note: We don't use Notify because we need to support a .wait_for()-like function, which
1665
    // Notify does not support.
1666
    action_done_tx: watch::Sender<()>,
1667
    callbacks: Callbacks,
1668
    metrics: Arc<Metrics>,
1669
}
1670
1671
impl RunningActionsManagerImpl {
1672
15
    pub fn new_with_callbacks(
1673
15
        args: RunningActionsManagerArgs<'_>,
1674
15
        callbacks: Callbacks,
1675
15
    ) -> Result<Self, Error> {
1676
        // Sadly because of some limitations of how Any works we need to clone more times than optimal.
1677
15
        let filesystem_store = args
1678
15
            .cas_store
1679
15
            .fast_store()
1680
15
            .downcast_ref::<FilesystemStore>(None)
1681
15
            .err_tip(|| {
1682
0
                "Expected FilesystemStore store for .fast_store() in RunningActionsManagerImpl"
1683
15
            })
?0
1684
15
            .get_arc()
1685
15
            .err_tip(|| 
"FilesystemStore's internal Arc was lost"0
)
?0
;
1686
15
        let (action_done_tx, _) = watch::channel(());
1687
15
        Ok(Self {
1688
15
            root_action_directory: args.root_action_directory,
1689
15
            execution_configuration: args.execution_configuration,
1690
15
            cas_store: args.cas_store,
1691
15
            filesystem_store,
1692
15
            upload_action_results: UploadActionResults::new(
1693
15
                args.upload_action_result_config,
1694
15
                args.ac_store,
1695
15
                args.historical_store,
1696
15
            )
1697
15
            .err_tip(|| 
"During RunningActionsManagerImpl construction"0
)
?0
,
1698
15
            max_action_timeout: args.max_action_timeout,
1699
15
            timeout_handled_externally: args.timeout_handled_externally,
1700
15
            running_actions: Mutex::new(HashMap::new()),
1701
15
            action_done_tx,
1702
15
            callbacks,
1703
15
            metrics: Arc::new(Metrics::default()),
1704
        })
1705
15
    }
1706
1707
9
    pub fn new(args: RunningActionsManagerArgs<'_>) -> Result<Self, Error> {
1708
9
        Self::new_with_callbacks(
1709
9
            args,
1710
9
            Callbacks {
1711
9
                now_fn: SystemTime::now,
1712
9
                sleep_fn: |duration| 
Box::pin(tokio::time::sleep(duration))1
,
1713
9
            },
1714
9
        )
1715
9
    }
1716
1717
0
    fn make_action_directory<'a>(
1718
0
        &'a self,
1719
0
        operation_id: &'a OperationId,
1720
0
    ) -> impl Future<Output = Result<String, Error>> + 'a {
1721
7
        self.metrics.make_action_directory.wrap(async move {
1722
7
            let action_directory = format!("{}/{}", self.root_action_directory, operation_id);
1723
7
            fs::create_dir(&action_directory)
1724
7
                .await
1725
7
                .err_tip(|| 
format!("Error creating action directory {action_directory}")0
)
?0
;
1726
7
            Ok(action_directory)
1727
7
        })
1728
0
    }
1729
1730
7
    fn create_action_info(
1731
7
        &self,
1732
7
        start_execute: StartExecute,
1733
7
        queued_timestamp: SystemTime,
1734
7
    ) -> impl Future<Output = Result<ActionInfo, Error>> + '_ {
1735
7
        self.metrics.create_action_info.wrap(async move {
1736
7
            let execute_request = start_execute
1737
7
                .execute_request
1738
7
                .err_tip(|| 
"Expected execute_request to exist in StartExecute"0
)
?0
;
1739
7
            let action_digest: DigestInfo = execute_request
1740
7
                .action_digest
1741
7
                .clone()
1742
7
                .err_tip(|| 
"Expected action_digest to exist on StartExecute"0
)
?0
1743
7
                .try_into()
?0
;
1744
7
            let load_start_timestamp = (self.callbacks.now_fn)();
1745
7
            let action =
1746
7
                get_and_decode_digest::<Action>(self.cas_store.as_ref(), action_digest.into())
1747
56
                    .await
1748
7
                    .err_tip(|| 
"During start_action"0
)
?0
;
1749
7
            let action_info = ActionInfo::try_from_action_and_execute_request(
1750
7
                execute_request,
1751
7
                action,
1752
7
                load_start_timestamp,
1753
7
                queued_timestamp,
1754
7
            )
1755
7
            .err_tip(|| 
"Could not create ActionInfo in create_and_add_action()"0
)
?0
;
1756
7
            Ok(action_info)
1757
7
        })
1758
7
    }
1759
1760
7
    fn cleanup_action(&self, operation_id: &OperationId) -> Result<(), Error> {
1761
7
        let mut running_actions = self.running_actions.lock();
1762
7
        let result = running_actions.remove(operation_id).err_tip(|| {
1763
0
            format!("Expected action id '{operation_id:?}' to exist in RunningActionsManagerImpl")
1764
7
        });
1765
7
        // No need to copy anything, we just are telling the receivers an event happened.
1766
7
        self.action_done_tx.send_modify(|_| {});
1767
7
        result.map(|_| ())
1768
7
    }
1769
1770
    // Note: We do not capture metrics on this call, only `.kill_all()`.
1771
    // Important: When the future returns the process may still be running.
1772
0
    async fn kill_operation(action: Arc<RunningActionImpl>) {
1773
0
        event!(
1774
0
            Level::WARN,
1775
0
            operation_id = ?action.operation_id,
1776
0
            "Sending kill to running operation",
1777
        );
1778
0
        let kill_channel_tx = {
1779
0
            let mut action_state = action.state.lock();
1780
0
            action_state.kill_channel_tx.take()
1781
        };
1782
0
        if let Some(kill_channel_tx) = kill_channel_tx {
  Branch (1782:16): [Folded - Ignored]
  Branch (1782:16): [Folded - Ignored]
  Branch (1782:16): [True: 0, False: 0]
1783
0
            if kill_channel_tx.send(()).is_err() {
  Branch (1783:16): [Folded - Ignored]
  Branch (1783:16): [Folded - Ignored]
  Branch (1783:16): [True: 0, False: 0]
1784
0
                event!(
1785
0
                    Level::ERROR,
1786
0
                    operation_id = ?action.operation_id,
1787
0
                    "Error sending kill to running operation",
1788
                );
1789
0
            }
1790
0
        }
1791
0
    }
1792
}
1793
1794
impl RunningActionsManager for RunningActionsManagerImpl {
1795
    type RunningAction = RunningActionImpl;
1796
1797
7
    async fn create_and_add_action(
1798
7
        self: &Arc<Self>,
1799
7
        worker_id: String,
1800
7
        start_execute: StartExecute,
1801
7
    ) -> Result<Arc<RunningActionImpl>, Error> {
1802
7
        self.metrics
1803
7
            .create_and_add_action
1804
7
            .wrap(async move {
1805
7
                let queued_timestamp = start_execute
1806
7
                    .queued_timestamp
1807
7
                    .and_then(|time| 
time.try_into().ok()3
)
1808
7
                    .unwrap_or(SystemTime::UNIX_EPOCH);
1809
7
                let operation_id = start_execute
1810
7
                    .operation_id.as_str().into();
1811
56
                let 
action_info7
=
self.create_action_info(start_execute, queued_timestamp)7
.await
?0
;
1812
7
                event!(
1813
7
                    Level::INFO,
1814
                    ?action_info,
1815
0
                    "Worker received action",
1816
                );
1817
7
                let action_directory = self.make_action_directory(&operation_id).await
?0
;
1818
7
                let execution_metadata = ExecutionMetadata {
1819
7
                    worker: worker_id,
1820
7
                    queued_timestamp: action_info.insert_timestamp,
1821
7
                    worker_start_timestamp: action_info.load_timestamp,
1822
7
                    worker_completed_timestamp: SystemTime::UNIX_EPOCH,
1823
7
                    input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
1824
7
                    input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
1825
7
                    execution_start_timestamp: SystemTime::UNIX_EPOCH,
1826
7
                    execution_completed_timestamp: SystemTime::UNIX_EPOCH,
1827
7
                    output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
1828
7
                    output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
1829
7
                };
1830
7
                let timeout = if action_info.timeout.is_zero() || 
self.timeout_handled_externally0
{
  Branch (1830:34): [Folded - Ignored]
  Branch (1830:67): [Folded - Ignored]
  Branch (1830:34): [Folded - Ignored]
  Branch (1830:67): [Folded - Ignored]
  Branch (1830:34): [True: 7, False: 0]
  Branch (1830:67): [True: 0, False: 0]
1831
7
                    self.max_action_timeout
1832
                } else {
1833
0
                    action_info.timeout
1834
                };
1835
7
                if timeout > self.max_action_timeout {
  Branch (1835:20): [Folded - Ignored]
  Branch (1835:20): [Folded - Ignored]
  Branch (1835:20): [True: 0, False: 7]
1836
0
                    return Err(make_err!(
1837
0
                        Code::InvalidArgument,
1838
0
                        "Action timeout of {} seconds is greater than the maximum allowed timeout of {} seconds",
1839
0
                        timeout.as_secs_f32(),
1840
0
                        self.max_action_timeout.as_secs_f32()
1841
0
                    ));
1842
7
                }
1843
7
                let running_action = Arc::new(RunningActionImpl::new(
1844
7
                    execution_metadata,
1845
7
                    operation_id.clone(),
1846
7
                    action_directory,
1847
7
                    action_info,
1848
7
                    timeout,
1849
7
                    self.clone(),
1850
7
                ));
1851
7
                {
1852
7
                    let mut running_actions = self.running_actions.lock();
1853
7
                    running_actions.insert(operation_id, Arc::downgrade(&running_action));
1854
7
                }
1855
7
                Ok(running_action)
1856
7
            })
1857
63
            .await
1858
7
    }
1859
1860
0
    async fn cache_action_result(
1861
0
        &self,
1862
0
        action_info: DigestInfo,
1863
0
        action_result: &mut ActionResult,
1864
0
        hasher: DigestHasherFunc,
1865
6
    ) -> Result<(), Error> {
1866
6
        self.metrics
1867
6
            .cache_action_result
1868
6
            .wrap(self.upload_action_results.cache_action_result(
1869
6
                action_info,
1870
6
                action_result,
1871
6
                hasher,
1872
6
            ))
1873
11
            .await
1874
6
    }
1875
1876
0
    async fn kill_operation(&self, operation_id: &OperationId) -> Result<(), Error> {
1877
0
        let running_action = {
1878
0
            let running_actions = self.running_actions.lock();
1879
0
            running_actions
1880
0
                .get(operation_id)
1881
0
                .and_then(Weak::upgrade)
1882
0
                .ok_or_else(|| make_input_err!("Failed to get running action {operation_id}"))?
1883
        };
1884
0
        Self::kill_operation(running_action).await;
1885
0
        Ok(())
1886
0
    }
1887
1888
    // Waits for all running actions to complete and signals completion.
1889
    // Use the Arc<oneshot::Sender<()>> to signal the completion of the actions
1890
    // Dropping the sender automatically notifies the process to terminate.
1891
0
    async fn complete_actions(&self, _complete_msg: Arc<oneshot::Sender<()>>) {
1892
0
        let _ = self
1893
0
            .action_done_tx
1894
0
            .subscribe()
1895
0
            .wait_for(|_| self.running_actions.lock().is_empty())
1896
0
            .await;
1897
0
    }
1898
1899
    // Note: When the future returns the process should be fully killed and cleaned up.
1900
0
    async fn kill_all(&self) {
1901
0
        self.metrics
1902
0
            .kill_all
1903
0
            .wrap_no_capture_result(async move {
1904
0
                let kill_operations: Vec<Arc<RunningActionImpl>> = {
1905
0
                    let running_actions = self.running_actions.lock();
1906
0
                    running_actions
1907
0
                        .iter()
1908
0
                        .filter_map(|(_operation_id, action)| action.upgrade())
1909
0
                        .collect()
1910
                };
1911
0
                for action in kill_operations {
1912
0
                    Self::kill_operation(action).await;
1913
                }
1914
0
            })
1915
0
            .await;
1916
        // Ignore error. If error happens it means there's no sender, which is not a problem.
1917
        // Note: Sanity check this API will always check current value then future values:
1918
        // https://play.rust-lang.org/?version=stable&edition=2021&gist=23103652cc1276a97e5f9938da87fdb2
1919
0
        let _ = self
1920
0
            .action_done_tx
1921
0
            .subscribe()
1922
0
            .wait_for(|_| self.running_actions.lock().is_empty())
1923
0
            .await;
1924
0
    }
1925
1926
    #[inline]
1927
2
    fn metrics(&self) -> &Arc<Metrics> {
1928
2
        &self.metrics
1929
2
    }
1930
}
1931
1932
0
#[derive(Default, MetricsComponent)]
1933
pub struct Metrics {
1934
    #[metric(help = "Stats about the create_and_add_action command.")]
1935
    create_and_add_action: AsyncCounterWrapper,
1936
    #[metric(help = "Stats about the cache_action_result command.")]
1937
    cache_action_result: AsyncCounterWrapper,
1938
    #[metric(help = "Stats about the kill_all command.")]
1939
    kill_all: AsyncCounterWrapper,
1940
    #[metric(help = "Stats about the create_action_info command.")]
1941
    create_action_info: AsyncCounterWrapper,
1942
    #[metric(help = "Stats about the make_work_directory command.")]
1943
    make_action_directory: AsyncCounterWrapper,
1944
    #[metric(help = "Stats about the prepare_action command.")]
1945
    prepare_action: AsyncCounterWrapper,
1946
    #[metric(help = "Stats about the execute command.")]
1947
    execute: AsyncCounterWrapper,
1948
    #[metric(help = "Stats about the upload_results command.")]
1949
    upload_results: AsyncCounterWrapper,
1950
    #[metric(help = "Stats about the cleanup command.")]
1951
    cleanup: AsyncCounterWrapper,
1952
    #[metric(help = "Stats about the get_finished_result command.")]
1953
    get_finished_result: AsyncCounterWrapper,
1954
    #[metric(help = "Stats about the get_proto_command_from_store command.")]
1955
    get_proto_command_from_store: AsyncCounterWrapper,
1956
    #[metric(help = "Stats about the download_to_directory command.")]
1957
    download_to_directory: AsyncCounterWrapper,
1958
    #[metric(help = "Stats about the prepare_output_files command.")]
1959
    prepare_output_files: AsyncCounterWrapper,
1960
    #[metric(help = "Stats about the prepare_output_paths command.")]
1961
    prepare_output_paths: AsyncCounterWrapper,
1962
    #[metric(help = "Stats about the child_process command.")]
1963
    child_process: AsyncCounterWrapper,
1964
    #[metric(help = "Stats about the child_process_success_error_code command.")]
1965
    child_process_success_error_code: CounterWithTime,
1966
    #[metric(help = "Stats about the child_process_failure_error_code command.")]
1967
    child_process_failure_error_code: CounterWithTime,
1968
    #[metric(help = "Total time spent uploading stdout.")]
1969
    upload_stdout: AsyncCounterWrapper,
1970
    #[metric(help = "Total time spent uploading stderr.")]
1971
    upload_stderr: AsyncCounterWrapper,
1972
    #[metric(help = "Total number of task timeouts.")]
1973
    task_timeouts: CounterWithTime,
1974
}