Coverage Report

Created: 2026-04-14 11:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/running_actions_manager.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp::min;
16
use core::convert::Into;
17
use core::fmt::Debug;
18
use core::pin::Pin;
19
use core::sync::atomic::{AtomicBool, Ordering};
20
use core::time::Duration;
21
use std::borrow::Cow;
22
use std::collections::vec_deque::VecDeque;
23
use std::collections::{HashMap, HashSet};
24
use std::env;
25
use std::ffi::{OsStr, OsString};
26
#[cfg(target_family = "unix")]
27
use std::fs::Permissions;
28
#[cfg(target_family = "unix")]
29
use std::os::unix::fs::{MetadataExt, PermissionsExt};
30
use std::path::{Path, PathBuf};
31
use std::process::Stdio;
32
use std::sync::{Arc, Weak};
33
use std::time::SystemTime;
34
35
use bytes::{Bytes, BytesMut};
36
use filetime::{FileTime, set_file_mtime};
37
use formatx::Template;
38
use futures::future::{
39
    BoxFuture, Future, FutureExt, TryFutureExt, try_join, try_join_all, try_join3,
40
};
41
use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
42
use nativelink_config::cas_server::{
43
    EnvironmentSource, UploadActionResultConfig, UploadCacheResultsStrategy,
44
};
45
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
46
use nativelink_metric::MetricsComponent;
47
use nativelink_proto::build::bazel::remote::execution::v2::{
48
    Action, ActionResult as ProtoActionResult, Command as ProtoCommand,
49
    Directory as ProtoDirectory, Directory, DirectoryNode, ExecuteResponse, FileNode, SymlinkNode,
50
    Tree as ProtoTree, UpdateActionResultRequest,
51
};
52
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
53
    HistoricalExecuteResponse, StartExecute,
54
};
55
use nativelink_store::ac_utils::{
56
    ESTIMATED_DIGEST_SIZE, compute_buf_digest, get_and_decode_digest, serialize_and_upload_message,
57
};
58
use nativelink_store::cas_utils::is_zero_digest;
59
use nativelink_store::fast_slow_store::FastSlowStore;
60
use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
61
use nativelink_store::grpc_store::GrpcStore;
62
use nativelink_util::action_messages::{
63
    ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, OperationId,
64
    SymlinkInfo, to_execute_response,
65
};
66
use nativelink_util::common::{DigestInfo, fs};
67
use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc};
68
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
69
use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
70
use nativelink_util::{background_spawn, spawn, spawn_blocking};
71
use parking_lot::Mutex;
72
use prost::Message;
73
use relative_path::RelativePath;
74
use scopeguard::{ScopeGuard, guard};
75
use serde::Deserialize;
76
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
77
use tokio::process;
78
use tokio::sync::{Notify, oneshot, watch};
79
use tokio::time::Instant;
80
use tokio_stream::wrappers::ReadDirStream;
81
use tonic::Request;
82
use tracing::{debug, error, info, trace, warn};
83
use uuid::Uuid;
84
85
/// For simplicity we use a fixed exit code for cases when our program is terminated
86
/// due to a signal.
87
const EXIT_CODE_FOR_SIGNAL: i32 = 9;
88
89
/// Default strategy for uploading historical results.
90
/// Note: If this value changes the config documentation
91
/// should reflect it.
92
const DEFAULT_HISTORICAL_RESULTS_STRATEGY: UploadCacheResultsStrategy =
93
    UploadCacheResultsStrategy::FailuresOnly;
94
95
/// Valid string reasons for a failure.
96
/// Note: If these change, the documentation should be updated.
97
#[derive(Debug, Deserialize)]
98
#[serde(rename_all = "snake_case")]
99
enum SideChannelFailureReason {
100
    /// Task should be considered timed out.
101
    Timeout,
102
}
103
104
/// This represents the json data that can be passed from the running process
105
/// to the parent via the `SideChannelFile`. See:
106
/// `config::EnvironmentSource::sidechannelfile` for more details.
107
/// Note: Any fields added here must be added to the documentation.
108
#[derive(Debug, Deserialize, Default)]
109
struct SideChannelInfo {
110
    /// If the task should be considered a failure and why.
111
    failure: Option<SideChannelFailureReason>,
112
}
113
114
/// Aggressively download the digests of files and make a local folder from it. This function
115
/// will spawn unbounded number of futures to try and get these downloaded. The store itself
116
/// should be rate limited if spawning too many requests at once is an issue.
117
/// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore`. This is for
118
/// efficiency reasons. We will request the `FastSlowStore` to populate the entry then we will
119
/// assume the `FilesystemStore` has the file available immediately after and hardlink the file
120
/// to a new location.
121
// Sadly we cannot use `async fn` here because the rust compiler cannot determine the auto traits
122
// of the future. So we need to force this function to return a dynamic future instead.
123
// see: https://github.com/rust-lang/rust/issues/78649
124
25
pub fn download_to_directory<'a>(
125
25
    cas_store: &'a FastSlowStore,
126
25
    filesystem_store: Pin<&'a FilesystemStore>,
127
25
    digest: &'a DigestInfo,
128
25
    current_directory: &'a str,
129
25
) -> BoxFuture<'a, Result<(), Error>> {
130
25
    async move {
131
25
        let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest.into())
132
25
            .await
133
25
            .err_tip(|| "Converting digest to Directory")
?0
;
134
25
        let mut futures = FuturesUnordered::new();
135
136
25
        for 
file4
in directory.files {
137
4
            let digest: DigestInfo = file
138
4
                .digest
139
4
                .err_tip(|| "Expected Digest to exist in Directory::file::digest")
?0
140
4
                .try_into()
141
4
                .err_tip(|| "In Directory::file::digest")
?0
;
142
4
            let dest = format!("{}/{}", current_directory, file.name);
143
4
            let (mtime, mut unix_mode) = match file.node_properties {
144
1
                Some(properties) => (properties.mtime, properties.unix_mode),
145
3
                None => (None, None),
146
            };
147
            #[cfg_attr(target_family = "windows", allow(unused_assignments))]
148
4
            if file.is_executable {
149
1
                unix_mode = Some(unix_mode.unwrap_or(0o444) | 0o111);
150
3
            }
151
4
            futures.push(
152
4
                cas_store
153
4
                    .populate_fast_store(digest.into())
154
4
                    .and_then(move |()| async move {
155
4
                        if is_zero_digest(digest) {
156
0
                            let mut file_slot = fs::create_file(&dest).await?;
157
0
                            file_slot.write_all(&[]).await?;
158
                        }
159
                        else {
160
4
                            let file_entry = filesystem_store
161
4
                                .get_file_entry_for_digest(&digest)
162
4
                                .await
163
4
                                .err_tip(|| "During hard link")
?0
;
164
                            // TODO: add a test for #2051: deadlock with large number of files
165
8
                            let 
src_path4
=
file_entry4
.
get_file_path_locked4
(|src| async move
{4
Ok(PathBuf::from(src))4
}).
await4
?0
;
166
4
                            fs::hard_link(&src_path, &dest)
167
4
                                .await
168
4
                                .map_err(|e| 
{0
169
0
                                    if e.code == Code::NotFound {
170
0
                                        e.append(
171
0
                                            format!(
172
                                            "Could not make hardlink to {dest}, file was likely evicted from cache.\n\
173
                                            This error often occurs when the filesystem store's max_bytes is too small for your workload.\n\
174
                                            To fix this issue:\n\
175
                                            1. Increase the 'max_bytes' value in your filesystem store configuration\n\
176
                                            2. Example: Change 'max_bytes: 10000000000' to 'max_bytes: 50000000000' (or higher)\n\
177
                                            3. The setting is typically found in your nativelink.json config under:\n\
178
                                            stores -> [your_filesystem_store] -> filesystem -> eviction_policy -> max_bytes\n\
179
                                            4. Restart NativeLink after making the change\n\n\
180
                                            If this error persists after increasing max_bytes several times, please report at:\n\
181
                                            https://github.com/TraceMachina/nativelink/issues\n\
182
                                            Include your config file and both server and client logs to help us assist you."
183
                                        ))
184
                                    } else {
185
0
                                        e.append(format!("Could not make hardlink to {dest}"))
186
                                    }
187
0
                                })?;
188
                            }
189
                        #[cfg(target_family = "unix")]
190
4
                        if let Some(
unix_mode1
) = unix_mode {
191
1
                            fs::set_permissions(&dest, Permissions::from_mode(unix_mode))
192
1
                                .await
193
1
                                .err_tip(|| 
{0
194
0
                                    format!(
195
                                        "Could not set unix mode in download_to_directory {dest}"
196
                                    )
197
0
                                })?;
198
3
                        }
199
4
                        if let Some(
mtime1
) = mtime {
200
1
                            spawn_blocking!("download_to_directory_set_mtime", move || {
201
1
                                set_file_mtime(
202
1
                                    &dest,
203
1
                                    FileTime::from_unix_time(mtime.seconds, mtime.nanos as u32),
204
                                )
205
1
                                .err_tip(|| 
{0
206
0
                                    format!("Failed to set mtime in download_to_directory {dest}")
207
0
                                })
208
1
                            })
209
1
                            .await
210
1
                            .err_tip(
211
                                || "Failed to launch spawn_blocking in download_to_directory",
212
0
                            )??;
213
3
                        }
214
4
                        Ok(())
215
8
                    })
216
4
                    .map_err(move |e| 
e0
.
append0
(
format!0
("for digest {digest}")))
217
4
                    .boxed(),
218
            );
219
        }
220
221
25
        for 
directory7
in directory.directories {
222
7
            let digest: DigestInfo = directory
223
7
                .digest
224
7
                .err_tip(|| "Expected Digest to exist in Directory::directories::digest")
?0
225
7
                .try_into()
226
7
                .err_tip(|| "In Directory::file::digest")
?0
;
227
7
            let new_directory_path = format!("{}/{}", current_directory, directory.name);
228
7
            futures.push(
229
7
                async move {
230
7
                    fs::create_dir(&new_directory_path)
231
7
                        .await
232
7
                        .err_tip(|| 
format!0
("Could not create directory {new_directory_path}"))
?0
;
233
7
                    download_to_directory(
234
7
                        cas_store,
235
7
                        filesystem_store,
236
7
                        &digest,
237
7
                        &new_directory_path,
238
7
                    )
239
7
                    .await
240
7
                    .err_tip(|| 
format!0
("in download_to_directory : {new_directory_path}"))
?0
;
241
7
                    Ok(())
242
7
                }
243
7
                .boxed(),
244
            );
245
        }
246
247
        #[cfg(target_family = "unix")]
248
25
        for 
symlink_node1
in directory.symlinks {
249
1
            let dest = format!("{}/{}", current_directory, symlink_node.name);
250
1
            futures.push(
251
1
                async move {
252
1
                    fs::symlink(&symlink_node.target, &dest).await.err_tip(|| 
{0
253
0
                        format!(
254
                            "Could not create symlink {} -> {}",
255
                            symlink_node.target, dest
256
                        )
257
0
                    })?;
258
1
                    Ok(())
259
1
                }
260
1
                .boxed(),
261
            );
262
        }
263
264
37
        while futures.try_next().await
?0
.is_some()
{}12
265
25
        Ok(())
266
25
    }
267
25
    .boxed()
268
25
}
269
270
/// Prepares action inputs by first trying the directory cache (if available),
271
/// then falling back to traditional `download_to_directory`.
272
///
273
/// This provides a significant performance improvement for repeated builds
274
/// with the same input directories.
275
0
pub async fn prepare_action_inputs(
276
0
    directory_cache: &Option<Arc<crate::directory_cache::DirectoryCache>>,
277
0
    cas_store: &FastSlowStore,
278
0
    filesystem_store: Pin<&FilesystemStore>,
279
0
    digest: &DigestInfo,
280
0
    work_directory: &str,
281
15
) -> Result<(), Error> {
282
    // Try cache first if available
283
15
    if let Some(
cache0
) = directory_cache {
284
0
        match cache
285
0
            .get_or_create(*digest, Path::new(work_directory))
286
0
            .await
287
        {
288
0
            Ok(cache_hit) => {
289
0
                trace!(
290
                    ?digest,
291
                    work_directory, cache_hit, "Successfully prepared inputs via directory cache"
292
                );
293
0
                return Ok(());
294
            }
295
0
            Err(e) => {
296
0
                warn!(
297
                    ?digest,
298
                    ?e,
299
                    "Directory cache failed, falling back to traditional download"
300
                );
301
                // Fall through to traditional path
302
            }
303
        }
304
15
    }
305
306
    // Traditional path (cache disabled or failed)
307
15
    download_to_directory(cas_store, filesystem_store, digest, work_directory).await
308
15
}
309
310
#[cfg(target_family = "windows")]
311
fn is_executable(_metadata: &std::fs::Metadata, full_path: &impl AsRef<Path>) -> bool {
312
    static EXECUTABLE_EXTENSIONS: &[&str] = &["exe", "bat", "com"];
313
    EXECUTABLE_EXTENSIONS
314
        .iter()
315
        .any(|ext| full_path.as_ref().extension().map_or(false, |v| v == *ext))
316
}
317
318
#[cfg(target_family = "unix")]
319
7
fn is_executable(metadata: &std::fs::Metadata, _full_path: &impl AsRef<Path>) -> bool {
320
7
    (metadata.mode() & 0o111) != 0
321
7
}
322
323
type DigestUploader = Arc<tokio::sync::OnceCell<()>>;
324
325
7
async fn upload_file(
326
7
    cas_store: Pin<&impl StoreLike>,
327
7
    full_path: impl AsRef<Path> + Debug + Send + Sync,
328
7
    hasher: DigestHasherFunc,
329
7
    metadata: std::fs::Metadata,
330
7
    digest_uploaders: Arc<Mutex<HashMap<DigestInfo, DigestUploader>>>,
331
7
) -> Result<FileInfo, Error> {
332
7
    let is_executable = is_executable(&metadata, &full_path);
333
7
    let file_size = metadata.len();
334
7
    let file = fs::open_file(&full_path, 0, u64::MAX)
335
7
        .await
336
7
        .err_tip(|| 
format!0
("Could not open file {full_path:?}"))
?0
;
337
338
7
    let (digest, mut file) = hasher
339
7
        .hasher()
340
7
        .digest_for_file(&full_path, file.into_inner(), Some(file_size))
341
7
        .await
342
7
        .err_tip(|| 
format!0
("Failed to hash file in digest_for_file failed for {full_path:?}"))
?0
;
343
344
7
    let digest_uploader = match digest_uploaders.lock().entry(digest) {
345
0
        std::collections::hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
346
7
        std::collections::hash_map::Entry::Vacant(vacant_entry) => vacant_entry
347
7
            .insert(Arc::new(tokio::sync::OnceCell::new()))
348
7
            .clone(),
349
    };
350
351
    // Only upload a file with a given hash once.  The file may exist multiple
352
    // times in the output with different names.
353
7
    digest_uploader
354
14
        .
get_or_try_init7
(async || {
355
            // Only upload if the digest doesn't already exist, this should be
356
            // a much cheaper operation than an upload.
357
7
            let cas_store = cas_store.as_store_driver_pin();
358
7
            let store_key: nativelink_util::store_trait::StoreKey<'_> = digest.into();
359
7
            let has_start = std::time::Instant::now();
360
7
            if cas_store
361
7
                .has(store_key.borrow())
362
7
                .await
363
7
                .is_ok_and(|result| result.is_some())
364
            {
365
2
                trace!(
366
                    ?digest,
367
2
                    has_elapsed_ms = has_start.elapsed().as_millis(),
368
                    "upload_file: digest already exists in CAS, skipping upload",
369
                );
370
2
                return Ok(());
371
5
            }
372
5
            trace!(
373
                ?digest,
374
5
                has_elapsed_ms = has_start.elapsed().as_millis(),
375
5
                file_size = digest.size_bytes(),
376
                "upload_file: digest not in CAS, starting upload",
377
            );
378
379
5
            file.rewind().await.err_tip(|| "Could not rewind file")
?0
;
380
381
            // Note: For unknown reasons we appear to be hitting:
382
            // https://github.com/rust-lang/rust/issues/92096
383
            // or a similar issue if we try to use the non-store driver function, so we
384
            // are using the store driver function here.
385
5
            let store_key_for_upload = store_key.clone();
386
5
            let file_upload_start = std::time::Instant::now();
387
5
            let upload_result = cas_store
388
5
                .update_with_whole_file(
389
5
                    store_key_for_upload,
390
5
                    full_path.as_ref().into(),
391
5
                    file,
392
5
                    UploadSizeInfo::ExactSize(digest.size_bytes()),
393
5
                )
394
5
                .await
395
5
                .map(|_slot| ());
396
5
            trace!(
397
                ?digest,
398
5
                upload_elapsed_ms = file_upload_start.elapsed().as_millis(),
399
5
                success = upload_result.is_ok(),
400
                "upload_file: update_with_whole_file completed",
401
            );
402
403
5
            match upload_result {
404
5
                Ok(()) => Ok(()),
405
0
                Err(err) => {
406
                    // Output uploads run concurrently and may overlap (e.g. a file is listed
407
                    // both as an output file and inside an output directory). When another
408
                    // upload has already moved the file into CAS, this update can fail with
409
                    // NotFound even though the digest is now present. Per the RE spec, missing
410
                    // outputs should be ignored, so treat this as success if the digest exists.
411
0
                    if err.code == Code::NotFound
412
0
                        && cas_store
413
0
                            .has(store_key.borrow())
414
0
                            .await
415
0
                            .is_ok_and(|result| result.is_some())
416
                    {
417
0
                        Ok(())
418
                    } else {
419
0
                        Err(err)
420
                    }
421
                }
422
            }
423
14
        })
424
7
        .await
425
7
        .err_tip(|| 
format!0
("for {full_path:?}"))
?0
;
426
427
7
    let name = full_path
428
7
        .as_ref()
429
7
        .file_name()
430
7
        .err_tip(|| 
format!0
("Expected file_name to exist on {full_path:?}"))
?0
431
7
        .to_str()
432
7
        .err_tip(|| 
{0
433
0
            make_err!(
434
0
                Code::Internal,
435
                "Could not convert {:?} to string",
436
                full_path
437
            )
438
0
        })?
439
7
        .to_string();
440
441
7
    Ok(FileInfo {
442
7
        name_or_path: NameOrPath::Name(name),
443
7
        digest,
444
7
        is_executable,
445
7
    })
446
7
}
447
448
2
async fn upload_symlink(
449
2
    full_path: impl AsRef<Path> + Debug,
450
2
    full_work_directory_path: impl AsRef<Path>,
451
2
) -> Result<SymlinkInfo, Error> {
452
2
    let full_target_path = fs::read_link(full_path.as_ref())
453
2
        .await
454
2
        .err_tip(|| 
format!0
("Could not get read_link path of {full_path:?}"))
?0
;
455
456
    // Detect if our symlink is inside our work directory, if it is find the
457
    // relative path otherwise use the absolute path.
458
2
    let target = if full_target_path.starts_with(full_work_directory_path.as_ref()) {
459
0
        let full_target_path = RelativePath::from_path(&full_target_path)
460
0
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?;
461
0
        RelativePath::from_path(full_work_directory_path.as_ref())
462
0
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?
463
0
            .relative(full_target_path)
464
0
            .normalize()
465
0
            .into_string()
466
    } else {
467
2
        full_target_path
468
2
            .to_str()
469
2
            .err_tip(|| 
{0
470
0
                make_err!(
471
0
                    Code::Internal,
472
                    "Could not convert '{:?}' to string",
473
                    full_target_path
474
                )
475
0
            })?
476
2
            .to_string()
477
    };
478
479
2
    let name = full_path
480
2
        .as_ref()
481
2
        .file_name()
482
2
        .err_tip(|| 
format!0
("Expected file_name to exist on {full_path:?}"))
?0
483
2
        .to_str()
484
2
        .err_tip(|| 
{0
485
0
            make_err!(
486
0
                Code::Internal,
487
                "Could not convert {:?} to string",
488
                full_path
489
            )
490
0
        })?
491
2
        .to_string();
492
493
2
    Ok(SymlinkInfo {
494
2
        name_or_path: NameOrPath::Name(name),
495
2
        target,
496
2
    })
497
2
}
498
499
3
fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
500
3
    cas_store: Pin<&'a impl StoreLike>,
501
3
    full_dir_path: P,
502
3
    full_work_directory: &'a str,
503
3
    hasher: DigestHasherFunc,
504
3
    digest_uploaders: Arc<Mutex<HashMap<DigestInfo, DigestUploader>>>,
505
3
) -> BoxFuture<'a, Result<(Directory, VecDeque<ProtoDirectory>), Error>> {
506
3
    Box::pin(async move {
507
3
        let file_futures = FuturesUnordered::new();
508
3
        let dir_futures = FuturesUnordered::new();
509
3
        let symlink_futures = FuturesUnordered::new();
510
        {
511
3
            let (_permit, dir_handle) = fs::read_dir(&full_dir_path)
512
3
                .await
513
3
                .err_tip(|| 
format!0
("Error reading dir for reading {full_dir_path:?}"))
?0
514
3
                .into_inner();
515
3
            let mut dir_stream = ReadDirStream::new(dir_handle);
516
            // Note: Try very hard to not leave file descriptors open. Try to keep them as short
517
            // lived as possible. This is why we iterate the directory and then build a bunch of
518
            // futures with all the work we are wanting to do then execute it. It allows us to
519
            // close the directory iterator file descriptor, then open the child files/folders.
520
8
            while let Some(
entry_result5
) = dir_stream.next().await {
521
5
                let entry = entry_result.err_tip(|| "Error while iterating directory")
?0
;
522
5
                let file_type = entry
523
5
                    .file_type()
524
5
                    .await
525
5
                    .err_tip(|| 
format!0
("Error running file_type() on {entry:?}"))
?0
;
526
5
                let full_path = full_dir_path.as_ref().join(entry.path());
527
5
                if file_type.is_dir() {
528
1
                    let full_dir_path = full_dir_path.clone();
529
1
                    dir_futures.push(
530
1
                        upload_directory(
531
1
                            cas_store,
532
1
                            full_path.clone(),
533
1
                            full_work_directory,
534
1
                            hasher,
535
1
                            digest_uploaders.clone(),
536
                        )
537
1
                        .and_then(|(dir, all_dirs)| async move {
538
1
                            let directory_name = full_path
539
1
                                .file_name()
540
1
                                .err_tip(|| 
{0
541
0
                                    format!("Expected file_name to exist on {full_dir_path:?}")
542
0
                                })?
543
1
                                .to_str()
544
1
                                .err_tip(|| 
{0
545
0
                                    make_err!(
546
0
                                        Code::Internal,
547
                                        "Could not convert {:?} to string",
548
                                        full_dir_path
549
                                    )
550
0
                                })?
551
1
                                .to_string();
552
553
1
                            let digest =
554
1
                                serialize_and_upload_message(&dir, cas_store, &mut hasher.hasher())
555
1
                                    .await
556
1
                                    .err_tip(|| 
format!0
("for {}",
full_path.display()0
))
?0
;
557
558
1
                            Result::<(DirectoryNode, VecDeque<Directory>), Error>::Ok((
559
1
                                DirectoryNode {
560
1
                                    name: directory_name,
561
1
                                    digest: Some(digest.into()),
562
1
                                },
563
1
                                all_dirs,
564
1
                            ))
565
2
                        })
566
1
                        .boxed(),
567
                    );
568
4
                } else if file_type.is_file() {
569
3
                    let digest_uploaders = digest_uploaders.clone();
570
3
                    file_futures.push(async move {
571
3
                        let metadata = fs::metadata(&full_path)
572
3
                            .await
573
3
                            .err_tip(|| 
format!0
("Could not open file {}",
full_path.display()0
))
?0
;
574
3
                        upload_file(cas_store, &full_path, hasher, metadata, digest_uploaders)
575
3
                            .map_ok(TryInto::try_into)
576
3
                            .await
?0
577
3
                    });
578
1
                } else if file_type.is_symlink() {
579
1
                    symlink_futures.push(
580
1
                        upload_symlink(full_path, &full_work_directory)
581
1
                            .map(|symlink| symlink
?0
.try_into()),
582
                    );
583
0
                }
584
            }
585
        }
586
587
3
        let (mut file_nodes, dir_entries, mut symlinks) = try_join3(
588
3
            file_futures.try_collect::<Vec<FileNode>>(),
589
3
            dir_futures.try_collect::<Vec<(DirectoryNode, VecDeque<Directory>)>>(),
590
3
            symlink_futures.try_collect::<Vec<SymlinkNode>>(),
591
3
        )
592
3
        .await
?0
;
593
594
3
        let mut directory_nodes = Vec::with_capacity(dir_entries.len());
595
        // For efficiency we use a deque because it allows cheap concat of Vecs.
596
        // We make the assumption here that when performance is important it is because
597
        // our directory is quite large. This allows us to cheaply merge large amounts of
598
        // directories into one VecDeque. Then after we are done we need to collapse it
599
        // down into a single Vec.
600
3
        let mut all_child_directories = VecDeque::with_capacity(dir_entries.len());
601
3
        for (
directory_node1
,
mut recursive_child_directories1
) in dir_entries {
602
1
            directory_nodes.push(directory_node);
603
1
            all_child_directories.append(&mut recursive_child_directories);
604
1
        }
605
606
3
        file_nodes.sort_unstable_by(|a, b| 
a.name1
.
cmp1
(
&b.name1
));
607
3
        directory_nodes.sort_unstable_by(|a, b| 
a.name0
.
cmp0
(
&b.name0
));
608
3
        symlinks.sort_unstable_by(|a, b| 
a.name0
.
cmp0
(
&b.name0
));
609
610
3
        let directory = Directory {
611
3
            files: file_nodes,
612
3
            directories: directory_nodes,
613
3
            symlinks,
614
3
            node_properties: None, // We don't support file properties.
615
3
        };
616
3
        all_child_directories.push_back(directory.clone());
617
618
3
        Ok((directory, all_child_directories))
619
3
    })
620
3
}
621
622
0
async fn process_side_channel_file(
623
0
    side_channel_file: Cow<'_, OsStr>,
624
0
    args: &[&OsStr],
625
0
    timeout: Duration,
626
0
) -> Result<Option<Error>, Error> {
627
0
    let mut json_contents = String::new();
628
    {
629
        // Note: Scoping `file_slot` allows the file_slot semaphore to be released faster.
630
0
        let mut file_slot = match fs::open_file(side_channel_file, 0, u64::MAX).await {
631
0
            Ok(file_slot) => file_slot,
632
0
            Err(e) => {
633
0
                if e.code != Code::NotFound {
634
0
                    return Err(e).err_tip(|| "Error opening side channel file");
635
0
                }
636
                // Note: If file does not exist, it's ok. Users are not required to create this file.
637
0
                return Ok(None);
638
            }
639
        };
640
0
        file_slot
641
0
            .read_to_string(&mut json_contents)
642
0
            .await
643
0
            .err_tip(|| "Error reading side channel file")?;
644
    }
645
646
0
    let side_channel_info: SideChannelInfo = serde_json5::from_str(&json_contents)
647
0
        .map_err(Error::from)
648
0
        .err_tip(|| "Could not convert contents of side channel file (json) to SideChannelInfo")?;
649
0
    Ok(side_channel_info.failure.map(|failure| match failure {
650
        SideChannelFailureReason::Timeout => {
651
0
            let join_args = args.join(OsStr::new(" "));
652
0
            let command = join_args.to_string_lossy();
653
0
            warn!(%command, timeout=timeout.as_secs_f32(), "Side channel timeout for command");
654
0
            Error::new(
655
0
                Code::DeadlineExceeded,
656
0
                format!(
657
                    "Command '{}' timed out after {} seconds",
658
                    command,
659
0
                    timeout.as_secs_f32()
660
                ),
661
            )
662
        }
663
0
    }))
664
0
}
665
666
17
async fn do_cleanup(
667
17
    running_actions_manager: &Arc<RunningActionsManagerImpl>,
668
17
    operation_id: &OperationId,
669
17
    action_directory: &str,
670
17
) -> Result<(), Error> {
671
    // Mark this operation as being cleaned up
672
17
    let Some(_cleaning_guard) = running_actions_manager.perform_cleanup(operation_id.clone())
673
    else {
674
        // Cleanup is already happening elsewhere.
675
0
        return Ok(());
676
    };
677
678
17
    debug!("Worker cleaning up");
679
    // Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
680
17
    let remove_dir_result = fs::remove_dir_all(action_directory)
681
17
        .await
682
17
        .err_tip(|| 
format!0
("Could not remove working directory {action_directory}"));
683
684
17
    if let Err(
err0
) = running_actions_manager.cleanup_action(operation_id) {
685
0
        error!(%operation_id, ?err, "Error cleaning up action");
686
0
        Result::<(), Error>::Err(err).merge(remove_dir_result)
687
17
    } else if let Err(
err0
) = remove_dir_result {
688
0
        error!(%operation_id, ?err, "Error removing working directory");
689
0
        Err(err)
690
    } else {
691
17
        Ok(())
692
    }
693
17
}
694
695
/// The lifecycle of a single action executed by this worker.
///
/// The phase methods are intended to be driven in order:
/// `prepare_action` -> `execute` -> `upload_results` -> `cleanup` ->
/// `get_finished_result` (each returns `Arc<Self>` so calls can be chained).
pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
    /// Returns the action id of the action.
    fn get_operation_id(&self) -> &OperationId;

    /// Anything that needs to execute before the actions is actually executed should happen here.
    fn prepare_action(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Actually perform the execution of the action.
    fn execute(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Any uploading, processing or analyzing of the results should happen here.
    fn upload_results(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Cleanup any residual files, handles or other junk resulting from running the action.
    fn cleanup(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Returns the final result. As a general rule this action should be thought of as
    /// a consumption of `self`, meaning once a return happens here the lifetime of `Self`
    /// is over and any action performed on it after this call is undefined behavior.
    fn get_finished_result(
        self: Arc<Self>,
    ) -> impl Future<Output = Result<ActionResult, Error>> + Send;

    /// Returns the work directory of the action.
    fn get_work_directory(&self) -> &String;
}
721
722
/// Raw results captured from the spawned child process.
#[derive(Debug)]
struct RunningActionImplExecutionResult {
    /// Everything the child wrote to stdout (empty if the action was killed).
    stdout: Bytes,
    /// Everything the child wrote to stderr (empty if the action was killed).
    stderr: Bytes,
    /// The process exit code, or `EXIT_CODE_FOR_SIGNAL` when the process was
    /// terminated by a signal and no code is available.
    exit_code: i32,
}
728
729
/// Mutable, mutex-protected state of a `RunningActionImpl`, populated
/// progressively as the action moves through its lifecycle phases.
#[derive(Debug)]
struct RunningActionImplState {
    /// Decoded `Command` proto; taken during execute and restored afterwards
    /// so upload can read it.
    command_proto: Option<ProtoCommand>,
    // TODO(palfrey) Kill is not implemented yet, but is instrumented.
    // However, it is used if the worker disconnects to destroy current jobs.
    kill_channel_tx: Option<oneshot::Sender<()>>,
    kill_channel_rx: Option<oneshot::Receiver<()>>,
    /// Captured stdout/stderr/exit-code from the child process.
    execution_result: Option<RunningActionImplExecutionResult>,
    /// Final assembled `ActionResult`, once produced.
    action_result: Option<ActionResult>,
    /// Phase timestamps (input fetch, execution, upload) for reporting.
    execution_metadata: ExecutionMetadata,
    // If there was an internal error, this will be set.
    // This should NOT be set if everything was fine, but the process had a
    // non-zero exit code. Instead this should be used for internal errors
    // that prevented the action from running, upload failures, timeouts, exc...
    // but we have (or could have) the action results (like stderr/stdout).
    error: Option<Error>,
}
746
747
/// Concrete `RunningAction` implementation driven by `RunningActionsManagerImpl`.
#[derive(Debug)]
pub struct RunningActionImpl {
    /// Unique id of the operation this action runs under.
    operation_id: OperationId,
    /// Root scratch directory for this action (also hosts side channel files).
    action_directory: String,
    /// `<action_directory>/work` — the directory the command executes in.
    work_directory: String,
    /// The action's metadata (digests, timeout, platform properties, ...).
    action_info: ActionInfo,
    /// Effective timeout used for the execution sleep/kill race.
    timeout: Duration,
    /// Back-reference to the owning manager (stores, metrics, callbacks).
    running_actions_manager: Arc<RunningActionsManagerImpl>,
    /// Lifecycle state; see `RunningActionImplState`.
    state: Mutex<RunningActionImplState>,
    /// True while the manager still tracks this action (cleared on cleanup).
    has_manager_entry: AtomicBool,
    /// False once the work directory exists and must be removed on cleanup.
    did_cleanup: AtomicBool,
}
759
760
impl RunningActionImpl {
761
19
    pub fn new(
762
19
        execution_metadata: ExecutionMetadata,
763
19
        operation_id: OperationId,
764
19
        action_directory: String,
765
19
        action_info: ActionInfo,
766
19
        timeout: Duration,
767
19
        running_actions_manager: Arc<RunningActionsManagerImpl>,
768
19
    ) -> Self {
769
19
        let work_directory = format!("{}/{}", action_directory, "work");
770
19
        let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
771
19
        Self {
772
19
            operation_id,
773
19
            action_directory,
774
19
            work_directory,
775
19
            action_info,
776
19
            timeout,
777
19
            running_actions_manager,
778
19
            state: Mutex::new(RunningActionImplState {
779
19
                command_proto: None,
780
19
                kill_channel_rx: Some(kill_channel_rx),
781
19
                kill_channel_tx: Some(kill_channel_tx),
782
19
                execution_result: None,
783
19
                action_result: None,
784
19
                execution_metadata,
785
19
                error: None,
786
19
            }),
787
19
            // Always need to ensure that we're removed from the manager on Drop.
788
19
            has_manager_entry: AtomicBool::new(true),
789
19
            // Only needs to be cleaned up after a prepare_action call, set there.
790
19
            did_cleanup: AtomicBool::new(true),
791
19
        }
792
19
    }
793
794
    #[allow(
        clippy::missing_const_for_fn,
        reason = "False positive on stable, but not on nightly"
    )]
    /// Convenience accessor for the manager-wide metrics collection.
    fn metrics(&self) -> &Arc<Metrics> {
        &self.running_actions_manager.metrics
    }
801
802
    /// Prepares any actions needed to execute this action. This action will do the following:
    ///
    /// * Download any files needed to execute the action
    /// * Build a folder with all files needed to execute the action.
    ///
    /// This function will aggressively download and spawn potentially thousands of futures. It is
    /// up to the stores to rate limit if needed.
    async fn inner_prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
        {
            // Record when input fetching began; reported in execution metadata.
            let mut state = self.state.lock();
            state.execution_metadata.input_fetch_start_timestamp =
                (self.running_actions_manager.callbacks.now_fn)();
        }
        let command = {
            // Download and build out our input files/folders. Also fetch and decode our Command.
            let command_fut = self.metrics().get_proto_command_from_store.wrap(async {
                get_and_decode_digest::<ProtoCommand>(
                    self.running_actions_manager.cas_store.as_ref(),
                    self.action_info.command_digest.into(),
                )
                .await
                .err_tip(|| "Converting command_digest to Command")
            });
            let filesystem_store_pin =
                Pin::new(self.running_actions_manager.filesystem_store.as_ref());
            // Fetch the Command proto concurrently with building the work
            // directory and materializing the input root.
            let (command, ()) = try_join(command_fut, async {
                fs::create_dir(&self.work_directory)
                    .await
                    .err_tip(|| format!("Error creating work directory {}", self.work_directory))?;
                // Now the work directory has been created, we have to clean up.
                self.did_cleanup.store(false, Ordering::Release);
                // Download the input files/folder and place them into the temp directory.
                // Use directory cache if available for better performance.
                self.metrics()
                    .download_to_directory
                    .wrap(prepare_action_inputs(
                        &self.running_actions_manager.directory_cache,
                        &self.running_actions_manager.cas_store,
                        filesystem_store_pin,
                        &self.action_info.input_root_digest,
                        &self.work_directory,
                    ))
                    .await
            })
            .await?;
            command
        };
        {
            // Create all directories needed for our output paths. This is required by the bazel spec.
            let prepare_output_directories = |output_file| {
                // Output paths are relative to the command's working directory
                // (when one is set), itself relative to the work directory.
                let full_output_path = if command.working_directory.is_empty() {
                    format!("{}/{}", self.work_directory, output_file)
                } else {
                    format!(
                        "{}/{}/{}",
                        self.work_directory, command.working_directory, output_file
                    )
                };
                async move {
                    // Only the parent needs to exist; the action itself creates
                    // the output file/directory.
                    let full_parent_path = Path::new(&full_output_path)
                        .parent()
                        .err_tip(|| format!("Parent path for {full_output_path} has no parent"))?;
                    fs::create_dir_all(full_parent_path).await.err_tip(|| {
                        format!(
                            "Error creating output directory {} (file)",
                            full_parent_path.display()
                        )
                    })?;
                    Result::<(), Error>::Ok(())
                }
            };
            // Handle both the legacy (output_files) and the newer
            // (output_paths) fields of the Command proto.
            self.metrics()
                .prepare_output_files
                .wrap(try_join_all(
                    command.output_files.iter().map(prepare_output_directories),
                ))
                .await?;
            self.metrics()
                .prepare_output_paths
                .wrap(try_join_all(
                    command.output_paths.iter().map(prepare_output_directories),
                ))
                .await?;
        }
        debug!(?command, "Worker received command");
        {
            // Stash the decoded command for execute() and stamp fetch completion.
            let mut state = self.state.lock();
            state.command_proto = Some(command);
            state.execution_metadata.input_fetch_completed_timestamp =
                (self.running_actions_manager.callbacks.now_fn)();
        }
        Ok(self)
    }
895
896
14
    pub fn canonicalise_path(
897
14
        self: &Arc<Self>,
898
14
        arg: &OsStr,
899
14
        working_directory: &String,
900
14
    ) -> Result<PathBuf, Error> {
901
        // If the program contains a slash, we treat it as a path and resolve it relative to the work directory.
902
14
        Ok(if Path::new(arg).components().count() > 1 {
903
1
            let canonical_path = PathBuf::from(&self.work_directory)
904
1
                .join(working_directory)
905
1
                .join(arg);
906
1
            if cfg!(target_os = "windows") {
907
                // Workaround for https://github.com/rust-lang/rust/issues/42869 using a windows-specific crate
908
0
                dunce::canonicalize(canonical_path)
909
            } else {
910
1
                canonical_path.canonicalize()
911
            }
912
1
            .err_tip(|| 
{0
913
0
                format!(
914
                    "Could not canonicalize path for command root {}.",
915
0
                    arg.to_string_lossy()
916
                )
917
0
            })?
918
        } else {
919
13
            PathBuf::from(arg)
920
        })
921
14
    }
922
923
13
    
    /// Spawns the prepared command and drives it to completion.
    ///
    /// Builds the child process with a cleared environment (plus any
    /// configured additional variables and the Command proto's variables),
    /// collects stdout/stderr into memory, and races process completion
    /// against the action timeout and the scheduler's kill channel. The exit
    /// code, stdout/stderr, and any timeout/abort error are written back into
    /// `self.state`; returns `self` for chaining.
    async fn inner_execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {
        let (command_proto, mut kill_channel_rx) = {
            let mut state = self.state.lock();
            state.execution_metadata.execution_start_timestamp =
                (self.running_actions_manager.callbacks.now_fn)();
            (
                state
                    .command_proto
                    .take()
                    .err_tip(|| "Expected state to have command_proto in execute()")?,
                state
                    .kill_channel_rx
                    .take()
                    .err_tip(|| "Expected state to have kill_channel_rx in execute()")?
                    // This is important as we may be killed at any point.
                    .fuse(),
            )
        };
        if command_proto.arguments.is_empty() {
            return Err(make_input_err!("No arguments provided in Command proto"));
        }
        // If an entrypoint wrapper is configured, it becomes argv[0] and the
        // original arguments are passed to it.
        let args: Vec<&OsStr> = if let Some(entrypoint) = &self
            .running_actions_manager
            .execution_configuration
            .entrypoint
        {
            core::iter::once(entrypoint.as_ref())
                .chain(command_proto.arguments.iter().map(AsRef::as_ref))
                .collect()
        } else {
            command_proto.arguments.iter().map(AsRef::as_ref).collect()
        };
        // TODO(palfrey): This should probably be in debug, but currently
        //                    that's too busy and we often rely on this to
        //                    figure out toolchain misconfiguration issues.
        //                    De-bloat the `debug` level by using the `trace`
        //                    level more effectively and adjust this.
        info!(?args, "Executing command");

        let program = self.canonicalise_path(args[0], &command_proto.working_directory)?;

        let mut command_builder = process::Command::new(program);
        command_builder
            .args(&args[1..])
            .kill_on_drop(true)
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .current_dir(format!(
                "{}/{}",
                self.work_directory, command_proto.working_directory
            ))
            .env_clear();

        // A zero proto timeout means "unset": fall back to the configured max.
        let requested_timeout = if self.action_info.timeout.is_zero() {
            self.running_actions_manager.max_action_timeout
        } else {
            self.action_info.timeout
        };

        let mut maybe_side_channel_file: Option<Cow<'_, OsStr>> = None;
        if let Some(additional_environment) = &self
            .running_actions_manager
            .execution_configuration
            .additional_environment
        {
            for (name, source) in additional_environment {
                let value = match source {
                    EnvironmentSource::Property(property) => self
                        .action_info
                        .platform_properties
                        .get(property)
                        .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.as_str())),
                    EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),
                    EnvironmentSource::FromEnvironment => {
                        Cow::Owned(env::var(name).unwrap_or_default())
                    }
                    EnvironmentSource::TimeoutMillis => {
                        Cow::Owned(requested_timeout.as_millis().to_string())
                    }
                    EnvironmentSource::SideChannelFile => {
                        // A fresh, uniquely-named file inside the action
                        // directory; remembered so it can be processed after
                        // the child exits.
                        let file_cow =
                            format!("{}/{}", self.action_directory, Uuid::new_v4().simple());
                        maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));
                        Cow::Owned(file_cow)
                    }
                    EnvironmentSource::ActionDirectory => {
                        Cow::Borrowed(self.action_directory.as_str())
                    }
                };
                command_builder.env(name, value.as_ref());
            }
        }

        #[cfg(target_family = "unix")]
        let envs = &command_proto.environment_variables;
        // If SystemRoot is not set on windows we set it to default. Failing to do
        // this causes all commands to fail.
        #[cfg(target_family = "windows")]
        let envs = {
            let mut envs = command_proto.environment_variables.clone();
            if !envs.iter().any(|v| v.name.to_uppercase() == "SYSTEMROOT") {
                envs.push(
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
                        name: "SystemRoot".to_string(),
                        value: "C:\\Windows".to_string(),
                    },
                );
            }
            if !envs.iter().any(|v| v.name.to_uppercase() == "PATH") {
                envs.push(
                    nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable {
                        name: "PATH".to_string(),
                        value: "C:\\Windows\\System32".to_string(),
                    },
                );
            }
            envs
        };
        for environment_variable in envs {
            command_builder.env(&environment_variable.name, &environment_variable.value);
        }

        // Sandboxing of the command if we are running on Linux, this resolves issues where
        // children can spawn children and also provides better reproducibility.
        #[cfg(target_os = "linux")]
        {
            let use_namespaces = self.running_actions_manager.use_namespaces;
            let root_action_directory =
                std::ffi::CString::new(self.running_actions_manager.root_action_directory.clone())
                    .err_tip(|| "In RunningActionImpl::inner_execute()")?;
            let action_directory = std::ffi::CString::new(self.action_directory.clone())
                .err_tip(|| "In RunningActionImpl::inner_execute()")?;

            // SAFETY: This function is specifically designed to operate in a async-signal-safe
            // environment.
            unsafe {
                command_builder.pre_exec(move || match use_namespaces {
                    UseNamespaces::No => Ok(()),
                    _ => crate::namespace_utils::configure_namespace(
                        matches!(use_namespaces, UseNamespaces::YesAndMount),
                        &root_action_directory,
                        &action_directory,
                    ),
                });
            }
        }

        let mut child_process = command_builder
            .spawn()
            .err_tip(|| format!("Could not execute command {args:?}"))?;
        let mut stdout_reader = child_process
            .stdout
            .take()
            .err_tip(|| "Expected stdout to exist on command this should never happen")?;
        let mut stderr_reader = child_process
            .stderr
            .take()
            .err_tip(|| "Expected stderr to exist on command this should never happen")?;

        #[cfg(target_os = "linux")]
        // Wrap the child process to send SIGTERM rather than SIGKILL if namespaced to
        // prevent zombie processes.
        let child_process = crate::namespace_utils::MaybeNamespacedChild::new(
            !matches!(
                self.running_actions_manager.use_namespaces,
                UseNamespaces::No,
            ),
            child_process,
        );

        // If this future is dropped without defusing the guard, kill the child
        // in a background task so it cannot be leaked.
        let mut child_process_guard = guard(child_process, |mut child_process| {
            let result: Result<Option<std::process::ExitStatus>, std::io::Error> =
                child_process.try_wait();
            match result {
                Ok(res) if res.is_some() => {
                    // The child already exited, probably a timeout or kill operation
                }
                result => {
                    error!(
                        ?result,
                        "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."
                    );
                    background_spawn!("running_actions_manager_kill_child_process", async move {
                        drop(child_process.kill().await);
                    });
                }
            }
        });

        // Drain stdout/stderr on separate tasks so the child's pipes never
        // fill up and block it.
        let all_stdout_fut = spawn!("stdout_reader", async move {
            let mut all_stdout = BytesMut::new();
            loop {
                let sz = stdout_reader
                    .read_buf(&mut all_stdout)
                    .await
                    .err_tip(|| "Error reading stdout stream")?;
                if sz == 0 {
                    break; // EOF.
                }
            }
            Result::<Bytes, Error>::Ok(all_stdout.freeze())
        });
        let all_stderr_fut = spawn!("stderr_reader", async move {
            let mut all_stderr = BytesMut::new();
            loop {
                let sz = stderr_reader
                    .read_buf(&mut all_stderr)
                    .await
                    .err_tip(|| "Error reading stderr stream")?;
                if sz == 0 {
                    break; // EOF.
                }
            }
            Result::<Bytes, Error>::Ok(all_stderr.freeze())
        });
        let mut killed_action = false;

        let timer = self.metrics().child_process.begin_timer();
        let mut sleep_fut = (self.running_actions_manager.callbacks.sleep_fn)(self.timeout).fuse();
        // Race: timeout fires, the process exits, or the scheduler kills us.
        // After a timeout/kill we loop again and exit via the wait() arm.
        loop {
            tokio::select! {
                () = &mut sleep_fut => {
                    self.running_actions_manager.metrics.task_timeouts.inc();
                    killed_action = true;
                    if let Err(err) = child_process_guard.kill().await {
                        error!(
                            ?err,
                            "Could not kill process in RunningActionsManager for action timeout",
                        );
                    }
                    {
                        let joined_command = args.join(OsStr::new(" "));
                        let command = joined_command.to_string_lossy();
                        info!(
                            seconds = self.action_info.timeout.as_secs_f32(),
                            %command,
                            "Command timed out"
                        );
                        let mut state = self.state.lock();
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
                            Code::DeadlineExceeded,
                            format!(
                                "Command '{}' timed out after {} seconds",
                                command,
                                self.action_info.timeout.as_secs_f32()
                            )
                        )));
                    }
                },
                maybe_exit_status = child_process_guard.wait() => {
                    // Defuse our guard so it does not try to cleanup and make senseless logs.
                    drop(ScopeGuard::<_, _>::into_inner(child_process_guard));
                    let exit_status = maybe_exit_status.err_tip(|| "Failed to collect exit code of process")?;
                    // TODO(palfrey) We should implement stderr/stdout streaming to client here.
                    // If we get killed before the stream is started, then these will lock up.
                    // TODO(palfrey) There is a significant bug here. If we kill the action and the action creates
                    // child processes, it can create zombies. See: https://github.com/tracemachina/nativelink/issues/225
                    let (stdout, stderr) = if killed_action {
                        drop(timer);
                        (Bytes::new(), Bytes::new())
                    } else {
                        timer.measure();
                        let (maybe_all_stdout, maybe_all_stderr) = tokio::join!(all_stdout_fut, all_stderr_fut);
                        (
                            maybe_all_stdout.err_tip(|| "Internal error reading from stdout of worker task")??,
                            maybe_all_stderr.err_tip(|| "Internal error reading from stderr of worker task")??
                        )
                    };

                    let exit_code = exit_status.code().map_or(EXIT_CODE_FOR_SIGNAL, |exit_code| {
                        if exit_code == 0 {
                            self.metrics().child_process_success_error_code.inc();
                        } else {
                            self.metrics().child_process_failure_error_code.inc();
                        }
                        exit_code
                    });

                    info!(?args, "Command complete");

                    let maybe_error_override = if let Some(side_channel_file) = maybe_side_channel_file {
                        process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await
                        .err_tip(|| format!("Error processing side channel file: {}", side_channel_file.display()))?
                    } else {
                        None
                    };
                    {
                        let mut state = self.state.lock();
                        state.error = Error::merge_option(state.error.take(), maybe_error_override);

                        state.command_proto = Some(command_proto);
                        state.execution_result = Some(RunningActionImplExecutionResult{
                            stdout,
                            stderr,
                            exit_code,
                        });
                        state.execution_metadata.execution_completed_timestamp = (self.running_actions_manager.callbacks.now_fn)();
                    }
                    return Ok(self);
                },
                _ = &mut kill_channel_rx => {
                    killed_action = true;
                    if let Err(err) = child_process_guard.kill().await {
                        error!(
                            operation_id = ?self.operation_id,
                            ?err,
                            "Could not kill process",
                        );
                    }
                    {
                        let mut state = self.state.lock();
                        state.error = Error::merge_option(state.error.take(), Some(Error::new(
                            Code::Aborted,
                            format!(
                                "Command '{}' was killed by scheduler",
                                args.join(OsStr::new(" ")).to_string_lossy()
                            )
                        )));
                    }
                },
            }
        }
        // Unreachable.
    }
1248
1249
11
    async fn inner_upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1250
        enum OutputType {
1251
            None,
1252
            File(FileInfo),
1253
            Directory(DirectoryInfo),
1254
            FileSymlink(SymlinkInfo),
1255
            DirectorySymlink(SymlinkInfo),
1256
        }
1257
1258
11
        let upload_start = std::time::Instant::now();
1259
11
        debug!(
1260
11
            operation_id = ?self.operation_id,
1261
            "Worker uploading results - starting",
1262
        );
1263
11
        let (mut command_proto, execution_result, mut execution_metadata) = {
1264
11
            let mut state = self.state.lock();
1265
11
            state.execution_metadata.output_upload_start_timestamp =
1266
11
                (self.running_actions_manager.callbacks.now_fn)();
1267
            (
1268
11
                state
1269
11
                    .command_proto
1270
11
                    .take()
1271
11
                    .err_tip(|| "Expected state to have command_proto in execute()")
?0
,
1272
11
                state
1273
11
                    .execution_result
1274
11
                    .take()
1275
11
                    .err_tip(|| "Execution result does not exist at upload_results stage")
?0
,
1276
11
                state.execution_metadata.clone(),
1277
            )
1278
        };
1279
11
        let cas_store = self.running_actions_manager.cas_store.as_ref();
1280
11
        let hasher = self.action_info.unique_qualifier.digest_function();
1281
1282
11
        let mut output_path_futures = FuturesUnordered::new();
1283
11
        let mut output_paths = command_proto.output_paths;
1284
11
        if output_paths.is_empty() {
1285
6
            output_paths
1286
6
                .reserve(command_proto.output_files.len() + command_proto.output_directories.len());
1287
6
            output_paths.append(&mut command_proto.output_files);
1288
6
            output_paths.append(&mut command_proto.output_directories);
1289
6
        
}5
1290
11
        let digest_uploaders = Arc::new(Mutex::new(HashMap::new()));
1291
11
        for 
entry7
in output_paths {
1292
7
            let full_path = OsString::from(if command_proto.working_directory.is_empty() {
1293
0
                format!("{}/{}", self.work_directory, entry)
1294
            } else {
1295
7
                format!(
1296
                    "{}/{}/{}",
1297
7
                    self.work_directory, command_proto.working_directory, entry
1298
                )
1299
            });
1300
7
            let work_directory = &self.work_directory;
1301
7
            let digest_uploaders = digest_uploaders.clone();
1302
7
            output_path_futures.push(async move {
1303
3
                let metadata = {
1304
7
                    let metadata = match fs::symlink_metadata(&full_path).await {
1305
7
                        Ok(file) => file,
1306
0
                        Err(e) => {
1307
0
                            if e.code == Code::NotFound {
1308
                                // In the event our output does not exist, according to the bazel remote
1309
                                // execution spec, we simply ignore it continue.
1310
0
                                return Result::<OutputType, Error>::Ok(OutputType::None);
1311
0
                            }
1312
0
                            return Err(e).err_tip(|| {
1313
0
                                format!("Could not open file {}", full_path.display())
1314
0
                            });
1315
                        }
1316
                    };
1317
1318
7
                    if metadata.is_file() {
1319
                        return Ok(OutputType::File(
1320
4
                            upload_file(
1321
4
                                cas_store.as_pin(),
1322
4
                                &full_path,
1323
4
                                hasher,
1324
4
                                metadata,
1325
4
                                digest_uploaders,
1326
4
                            )
1327
4
                            .await
1328
4
                            .map(|mut file_info| {
1329
4
                                file_info.name_or_path = NameOrPath::Path(entry);
1330
4
                                file_info
1331
4
                            })
1332
4
                            .err_tip(|| 
format!0
("Uploading file {}",
full_path.display()0
))
?0
,
1333
                        ));
1334
3
                    }
1335
3
                    metadata
1336
                };
1337
3
                if metadata.is_dir() {
1338
                    Ok(OutputType::Directory(
1339
2
                        upload_directory(
1340
2
                            cas_store.as_pin(),
1341
2
                            &full_path,
1342
2
                            work_directory,
1343
2
                            hasher,
1344
2
                            digest_uploaders,
1345
                        )
1346
2
                        .and_then(|(root_dir, children)| async move {
1347
2
                            let tree = ProtoTree {
1348
2
                                root: Some(root_dir),
1349
2
                                children: children.into(),
1350
2
                            };
1351
2
                            let tree_digest = serialize_and_upload_message(
1352
2
                                &tree,
1353
2
                                cas_store.as_pin(),
1354
2
                                &mut hasher.hasher(),
1355
2
                            )
1356
2
                            .await
1357
2
                            .err_tip(|| 
format!0
("While processing {entry}"))
?0
;
1358
2
                            Ok(DirectoryInfo {
1359
2
                                path: entry,
1360
2
                                tree_digest,
1361
2
                            })
1362
4
                        })
1363
2
                        .await
1364
2
                        .err_tip(|| 
format!0
("Uploading directory {}",
full_path.display()0
))
?0
,
1365
                    ))
1366
1
                } else if metadata.is_symlink() {
1367
1
                    let output_symlink = upload_symlink(&full_path, work_directory)
1368
1
                        .await
1369
1
                        .map(|mut symlink_info| {
1370
1
                            symlink_info.name_or_path = NameOrPath::Path(entry);
1371
1
                            symlink_info
1372
1
                        })
1373
1
                        .err_tip(|| 
format!0
("Uploading symlink {}",
full_path.display()0
))
?0
;
1374
1
                    match fs::metadata(&full_path).await {
1375
1
                        Ok(metadata) => {
1376
1
                            if metadata.is_dir() {
1377
0
                                Ok(OutputType::DirectorySymlink(output_symlink))
1378
                            } else {
1379
                                // Note: If it's anything but directory we put it as a file symlink.
1380
1
                                Ok(OutputType::FileSymlink(output_symlink))
1381
                            }
1382
                        }
1383
0
                        Err(e) => {
1384
0
                            if e.code != Code::NotFound {
1385
0
                                return Err(e).err_tip(|| {
1386
0
                                    format!(
1387
                                        "While querying target symlink metadata for {}",
1388
0
                                        full_path.display()
1389
                                    )
1390
0
                                });
1391
0
                            }
1392
                            // If the file doesn't exist, we consider it a file. Even though the
1393
                            // file doesn't exist we still need to populate an entry.
1394
0
                            Ok(OutputType::FileSymlink(output_symlink))
1395
                        }
1396
                    }
1397
                } else {
1398
0
                    Err(make_err!(
1399
0
                        Code::Internal,
1400
0
                        "{full_path:?} was not a file, folder or symlink. Must be one.",
1401
0
                    ))
1402
                }
1403
7
            });
1404
        }
1405
11
        let mut output_files = vec![];
1406
11
        let mut output_folders = vec![];
1407
11
        let mut output_directory_symlinks = vec![];
1408
11
        let mut output_file_symlinks = vec![];
1409
1410
11
        if execution_result.exit_code != 0 {
1411
5
            let stdout = core::str::from_utf8(&execution_result.stdout).unwrap_or("<no-utf8>");
1412
5
            let stderr = core::str::from_utf8(&execution_result.stderr).unwrap_or("<no-utf8>");
1413
5
            error!(
1414
                exit_code = ?execution_result.exit_code,
1415
5
                stdout = ?stdout[..min(stdout.len(), 1000)],
1416
5
                stderr = ?stderr[..min(stderr.len(), 1000)],
1417
                "Command returned non-zero exit code",
1418
            );
1419
6
        }
1420
1421
11
        let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
1422
11
            let start = std::time::Instant::now();
1423
11
            let data = execution_result.stdout;
1424
11
            let data_len = data.len();
1425
11
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1426
11
            cas_store
1427
11
                .update_oneshot(digest, data)
1428
11
                .await
1429
11
                .err_tip(|| "Uploading stdout")
?0
;
1430
11
            debug!(
1431
                ?digest,
1432
                data_len,
1433
11
                elapsed_ms = start.elapsed().as_millis(),
1434
                "upload_results: stdout upload completed",
1435
            );
1436
11
            Result::<DigestInfo, Error>::Ok(digest)
1437
11
        });
1438
11
        let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
1439
11
            let start = std::time::Instant::now();
1440
11
            let data = execution_result.stderr;
1441
11
            let data_len = data.len();
1442
11
            let digest = compute_buf_digest(&data, &mut hasher.hasher());
1443
11
            cas_store
1444
11
                .update_oneshot(digest, data)
1445
11
                .await
1446
11
                .err_tip(|| "Uploading  stderr")
?0
;
1447
11
            debug!(
1448
                ?digest,
1449
                data_len,
1450
11
                elapsed_ms = start.elapsed().as_millis(),
1451
                "upload_results: stderr upload completed",
1452
            );
1453
11
            Result::<DigestInfo, Error>::Ok(digest)
1454
11
        });
1455
1456
11
        debug!(
1457
11
            operation_id = ?self.operation_id,
1458
11
            num_output_paths = output_path_futures.len(),
1459
            "upload_results: starting stdout/stderr/output_paths uploads",
1460
        );
1461
11
        let join_start = std::time::Instant::now();
1462
11
        let upload_result = futures::try_join!(stdout_digest_fut, stderr_digest_fut, async {
1463
18
            while let Some(
output_type7
) = output_path_futures.try_next().await
?0
{
1464
7
                match output_type {
1465
4
                    OutputType::File(output_file) => output_files.push(output_file),
1466
2
                    OutputType::Directory(output_folder) => output_folders.push(output_folder),
1467
1
                    OutputType::FileSymlink(output_symlink) => {
1468
1
                        output_file_symlinks.push(output_symlink);
1469
1
                    }
1470
0
                    OutputType::DirectorySymlink(output_symlink) => {
1471
0
                        output_directory_symlinks.push(output_symlink);
1472
0
                    }
1473
0
                    OutputType::None => { /* Safe to ignore */ }
1474
                }
1475
            }
1476
11
            Ok(())
1477
11
        });
1478
11
        drop(output_path_futures);
1479
11
        debug!(
1480
11
            operation_id = ?self.operation_id,
1481
11
            elapsed_ms = join_start.elapsed().as_millis(),
1482
11
            success = upload_result.is_ok(),
1483
            "upload_results: all uploads completed",
1484
        );
1485
11
        let (stdout_digest, stderr_digest) = match upload_result {
1486
11
            Ok((stdout_digest, stderr_digest, ())) => (stdout_digest, stderr_digest),
1487
0
            Err(e) => return Err(e).err_tip(|| "Error while uploading results"),
1488
        };
1489
1490
11
        execution_metadata.output_upload_completed_timestamp =
1491
11
            (self.running_actions_manager.callbacks.now_fn)();
1492
11
        output_files.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1493
11
        output_folders.sort_unstable_by(|a, b| 
a.path0
.
cmp0
(
&b.path0
));
1494
11
        output_file_symlinks.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1495
11
        output_directory_symlinks.sort_unstable_by(|a, b| 
a.name_or_path0
.
cmp0
(
&b.name_or_path0
));
1496
11
        let num_output_files = output_files.len();
1497
11
        let num_output_folders = output_folders.len();
1498
11
        {
1499
11
            let mut state = self.state.lock();
1500
11
            execution_metadata.worker_completed_timestamp =
1501
11
                (self.running_actions_manager.callbacks.now_fn)();
1502
11
            state.action_result = Some(ActionResult {
1503
11
                output_files,
1504
11
                output_folders,
1505
11
                output_directory_symlinks,
1506
11
                output_file_symlinks,
1507
11
                exit_code: execution_result.exit_code,
1508
11
                stdout_digest,
1509
11
                stderr_digest,
1510
11
                execution_metadata,
1511
11
                server_logs: HashMap::default(), // TODO(palfrey) Not implemented.
1512
11
                error: state.error.clone(),
1513
11
                message: String::new(), // Will be filled in on cache_action_result if needed.
1514
11
            });
1515
11
        }
1516
11
        debug!(
1517
11
            operation_id = ?self.operation_id,
1518
11
            total_elapsed_ms = upload_start.elapsed().as_millis(),
1519
            num_output_files,
1520
            num_output_folders,
1521
            "upload_results: inner_upload_results completed successfully",
1522
        );
1523
11
        Ok(self)
1524
11
    }
1525
1526
11
    
async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error>0
{
1527
11
        let mut state = self.state.lock();
1528
11
        state
1529
11
            .action_result
1530
11
            .take()
1531
11
            .err_tip(|| "Expected action_result to exist in get_finished_result")
1532
11
    }
1533
}
1534
1535
impl Drop for RunningActionImpl {
    fn drop(&mut self) {
        // Fast path: `cleanup()` already ran (it stores `did_cleanup` with
        // Release; we pair with Acquire here). If the manager still holds an
        // entry for this action, discard it now; the `drop(..)` makes clear
        // the return value of `cleanup_action` is intentionally ignored.
        if self.did_cleanup.load(Ordering::Acquire) {
            if self.has_manager_entry.load(Ordering::Acquire) {
                drop(
                    self.running_actions_manager
                        .cleanup_action(&self.operation_id),
                );
            }
            return;
        }
        // Being dropped without `cleanup()` having run violates this type's
        // usage contract. Log loudly, then attempt a best-effort cleanup on a
        // background task (we cannot await inside `drop`).
        let operation_id = self.operation_id.clone();
        error!(
            %operation_id,
            "RunningActionImpl did not cleanup. This is a violation of the requirements, will attempt to do it in the background."
        );
        let running_actions_manager = self.running_actions_manager.clone();
        let action_directory = self.action_directory.clone();
        background_spawn!("running_action_impl_drop", async move {
            // Only the failure path needs handling; success is silent.
            let Err(err) =
                do_cleanup(&running_actions_manager, &operation_id, &action_directory).await
            else {
                return;
            };
            error!(
                %operation_id,
                ?action_directory,
                ?err,
                "Error cleaning up action"
            );
        });
    }
}
1568
1569
impl RunningAction for RunningActionImpl {
1570
0
    fn get_operation_id(&self) -> &OperationId {
1571
0
        &self.operation_id
1572
0
    }
1573
1574
15
    
async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
1575
15
        let res = self
1576
15
            .metrics()
1577
15
            .clone()
1578
15
            .prepare_action
1579
15
            .wrap(Self::inner_prepare_action(self))
1580
15
            .await;
1581
15
        if let Err(
ref e0
) = res {
1582
0
            warn!(?e, "Error during prepare_action");
1583
15
        }
1584
15
        res
1585
15
    }
1586
1587
13
    
async fn execute(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
1588
13
        let res = self
1589
13
            .metrics()
1590
13
            .clone()
1591
13
            .execute
1592
13
            .wrap(Self::inner_execute(self))
1593
13
            .await;
1594
13
        if let Err(
ref e0
) = res {
1595
0
            warn!(?e, "Error during prepare_action");
1596
13
        }
1597
13
        res
1598
13
    }
1599
1600
11
    async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
1601
11
        let upload_timeout = self.running_actions_manager.max_upload_timeout;
1602
11
        let operation_id = self.operation_id.clone();
1603
11
        info!(
1604
            ?operation_id,
1605
11
            upload_timeout_s = upload_timeout.as_secs(),
1606
            "upload_results: starting with timeout",
1607
        );
1608
11
        let metrics = self.metrics().clone();
1609
11
        let upload_fut = metrics
1610
11
            .upload_results
1611
11
            .wrap(Self::inner_upload_results(self));
1612
1613
11
        let stall_warn_fut = async 
{7
1614
7
            let mut elapsed_secs = 0u64;
1615
            loop {
1616
7
                tokio::time::sleep(Duration::from_mins(1)).await;
1617
0
                elapsed_secs += 60;
1618
0
                warn!(
1619
                    ?operation_id,
1620
                    elapsed_s = elapsed_secs,
1621
0
                    timeout_s = upload_timeout.as_secs(),
1622
                    "upload_results: still in progress — possible stall",
1623
                );
1624
            }
1625
        };
1626
1627
11
        let res = tokio::time::timeout(upload_timeout, async {
1628
11
            tokio::pin!(upload_fut);
1629
11
            tokio::pin!(stall_warn_fut);
1630
11
            tokio::select! {
1631
11
                result = &mut upload_fut => result,
1632
11
                () = &mut stall_warn_fut => 
unreachable!0
(),
1633
            }
1634
11
        })
1635
11
        .await
1636
11
        .map_err(|err| 
{0
1637
0
            warn!(%operation_id, timeout=upload_timeout.as_secs(), "Upload results timeout");
1638
0
            Error::from_std_err(Code::DeadlineExceeded, &err).append(format!(
1639
                "Upload results timed out after {}s for operation {:?}",
1640
0
                upload_timeout.as_secs(),
1641
                operation_id,
1642
            ))
1643
0
        })?;
1644
11
        if let Err(
ref e0
) = res {
1645
0
            warn!(?operation_id, ?e, "Error during upload_results");
1646
11
        }
1647
11
        res
1648
11
    }
1649
1650
17
    
async fn cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error>0
{
1651
17
        let res = self
1652
17
            .metrics()
1653
17
            .clone()
1654
17
            .cleanup
1655
17
            .wrap(async move {
1656
17
                let result = do_cleanup(
1657
17
                    &self.running_actions_manager,
1658
17
                    &self.operation_id,
1659
17
                    &self.action_directory,
1660
17
                )
1661
17
                .await;
1662
17
                self.has_manager_entry.store(false, Ordering::Release);
1663
17
                self.did_cleanup.store(true, Ordering::Release);
1664
17
                result.map(move |()| self)
1665
17
            })
1666
17
            .await;
1667
17
        if let Err(
ref e0
) = res {
1668
0
            warn!(?e, "Error during cleanup");
1669
17
        }
1670
17
        res
1671
17
    }
1672
1673
11
    async fn get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {
1674
11
        self.metrics()
1675
11
            .clone()
1676
11
            .get_finished_result
1677
11
            .wrap(Self::inner_get_finished_result(self))
1678
11
            .await
1679
11
    }
1680
1681
0
    fn get_work_directory(&self) -> &String {
1682
0
        &self.work_directory
1683
0
    }
1684
}
1685
1686
/// Interface for a worker-side manager that creates, tracks, caches, and
/// kills running actions.
pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static {
    /// Concrete action type produced by this manager.
    type RunningAction: RunningAction;

    /// Creates an action from a `StartExecute` request and registers it with
    /// this manager.
    fn create_and_add_action(
        self: &Arc<Self>,
        worker_id: String,
        start_execute: StartExecute,
    ) -> impl Future<Output = Result<Arc<Self::RunningAction>, Error>> + Send;

    /// Publishes `action_result` to the configured cache(s) under
    /// `action_digest`. May mutate `action_result` (e.g. its `message` field
    /// — see `UploadActionResults::cache_action_result`).
    fn cache_action_result(
        &self,
        action_digest: DigestInfo,
        action_result: &mut ActionResult,
        hasher: DigestHasherFunc,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Kills every action currently managed by this instance.
    fn kill_all(&self) -> impl Future<Output = ()> + Send;

    /// Kills the single action identified by `operation_id`.
    fn kill_operation(
        &self,
        operation_id: &OperationId,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Metrics collected by this manager.
    fn metrics(&self) -> &Arc<Metrics>;
}
1711
1712
/// A function to get the current system time, used to allow mocking for tests
type NowFn = fn() -> SystemTime;
/// A function that sleeps for a `Duration`, returning a boxed future;
/// injectable so tests can control the passage of time.
type SleepFn = fn(Duration) -> BoxFuture<'static, ()>;

/// Functions that may be injected for testing purposes, during standard control
/// flows these are specified by the new function.
#[derive(Clone, Copy)]
pub struct Callbacks {
    /// A function that gets the current time.
    pub now_fn: NowFn,
    /// A function that sleeps for a given Duration.
    pub sleep_fn: SleepFn,
}
1725
1726
impl Debug for Callbacks {
1727
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1728
0
        f.debug_struct("Callbacks").finish_non_exhaustive()
1729
0
    }
1730
}
1731
1732
/// The set of additional information for executing an action over and above
/// those given in the `ActionInfo` passed to the worker.  This allows
/// modification of the action for execution on this particular worker.  This
/// may be used to run the action with a particular set of additional
/// environment variables, or perhaps configure it to execute within a
/// container.
///
/// `Default` leaves both fields `None`, i.e. the action runs unmodified.
#[derive(Debug, Default)]
pub struct ExecutionConfiguration {
    /// If set, will be executed instead of the first argument passed in the
    /// `ActionInfo` with all of the arguments in the `ActionInfo` passed as
    /// arguments to this command.
    pub entrypoint: Option<String>,
    /// The only environment variables that will be specified when the command
    /// executes other than those in the `ActionInfo`.  On Windows, `SystemRoot`
    /// and PATH are also assigned (see `inner_execute`).
    pub additional_environment: Option<HashMap<String, EnvironmentSource>>,
}
1749
1750
/// Implements the configured strategies for publishing action results to the
/// action cache (AC) and the historical-results store.
#[derive(Debug)]
struct UploadActionResults {
    // When results should be uploaded to the action cache.
    upload_ac_results_strategy: UploadCacheResultsStrategy,
    // When full HistoricalExecuteResponse blobs should be uploaded.
    upload_historical_results_strategy: UploadCacheResultsStrategy,
    // Destination action-cache store; None when AC uploads are disabled.
    ac_store: Option<Store>,
    // Store receiving serialized historical execute responses.
    historical_store: Store,
    // Template used to render ExecuteResponse.message on success.
    success_message_template: Template,
    // Template used to render ExecuteResponse.message on failure.
    failure_message_template: Template,
}
1759
1760
impl UploadActionResults {
1761
27
    fn new(
1762
27
        config: &UploadActionResultConfig,
1763
27
        ac_store: Option<Store>,
1764
27
        historical_store: Store,
1765
27
    ) -> Result<Self, Error> {
1766
27
        let upload_historical_results_strategy = config
1767
27
            .upload_historical_results_strategy
1768
27
            .unwrap_or(DEFAULT_HISTORICAL_RESULTS_STRATEGY);
1769
8
        if !matches!(
1770
27
            config.upload_ac_results_strategy,
1771
            UploadCacheResultsStrategy::Never
1772
8
        ) && ac_store.is_none()
1773
        {
1774
0
            return Err(make_input_err!(
1775
0
                "upload_ac_results_strategy is set, but no ac_store is configured"
1776
0
            ));
1777
27
        }
1778
        Ok(Self {
1779
27
            upload_ac_results_strategy: config.upload_ac_results_strategy,
1780
27
            upload_historical_results_strategy,
1781
27
            ac_store,
1782
27
            historical_store,
1783
27
            success_message_template: Template::new(&config.success_message_template).map_err(
1784
0
                |e| {
1785
0
                    Error::from_std_err(Code::InvalidArgument, &e).append(format!(
1786
                        "Could not convert success_message_template to rust template: {}",
1787
                        config.success_message_template
1788
                    ))
1789
0
                },
1790
0
            )?,
1791
27
            failure_message_template: Template::new(&config.failure_message_template).map_err(
1792
0
                |e| {
1793
0
                    Error::from_std_err(Code::InvalidArgument, &e).append(format!(
1794
                        "Could not convert failure_message_template to rust template: {}",
1795
                        config.success_message_template
1796
                    ))
1797
0
                },
1798
0
            )?,
1799
        })
1800
27
    }
1801
1802
0
    const fn should_cache_result(
1803
0
        strategy: UploadCacheResultsStrategy,
1804
0
        action_result: &ActionResult,
1805
0
        treat_infra_error_as_failure: bool,
1806
0
    ) -> bool {
1807
0
        let did_fail = action_result.exit_code != 0
1808
0
            || (treat_infra_error_as_failure && action_result.error.is_some());
1809
0
        match strategy {
1810
0
            UploadCacheResultsStrategy::SuccessOnly => !did_fail,
1811
0
            UploadCacheResultsStrategy::Never => false,
1812
            // Never cache internal errors or timeouts.
1813
            UploadCacheResultsStrategy::Everything => {
1814
0
                treat_infra_error_as_failure || action_result.error.is_none()
1815
            }
1816
0
            UploadCacheResultsStrategy::FailuresOnly => did_fail,
1817
        }
1818
0
    }
1819
1820
    /// Formats the message field in `ExecuteResponse` from the `success_message_template`
1821
    /// or `failure_message_template` config templates.
1822
5
    fn format_execute_response_message(
1823
5
        mut template_str: Template,
1824
5
        action_digest_info: DigestInfo,
1825
5
        maybe_historical_digest_info: Option<DigestInfo>,
1826
5
        hasher: DigestHasherFunc,
1827
5
    ) -> Result<String, Error> {
1828
5
        template_str.replace(
1829
            "digest_function",
1830
5
            hasher.proto_digest_func().as_str_name().to_lowercase(),
1831
        );
1832
5
        template_str.replace(
1833
            "action_digest_hash",
1834
5
            action_digest_info.packed_hash().to_string(),
1835
        );
1836
5
        template_str.replace("action_digest_size", action_digest_info.size_bytes());
1837
5
        if let Some(
historical_digest_info3
) = maybe_historical_digest_info {
1838
3
            template_str.replace(
1839
3
                "historical_results_hash",
1840
3
                format!("{}", historical_digest_info.packed_hash()),
1841
3
            );
1842
3
            template_str.replace(
1843
3
                "historical_results_size",
1844
3
                historical_digest_info.size_bytes(),
1845
3
            );
1846
3
        } else {
1847
2
            template_str.replace("historical_results_hash", "");
1848
2
            template_str.replace("historical_results_size", "");
1849
2
        }
1850
5
        template_str.text().map_err(|e| 
{0
1851
0
            Error::from_std_err(Code::InvalidArgument, &e)
1852
0
                .append("Could not convert template to text")
1853
0
        })
1854
5
    }
1855
1856
0
    async fn upload_ac_results(
1857
0
        &self,
1858
0
        action_digest: DigestInfo,
1859
0
        action_result: ProtoActionResult,
1860
0
        hasher: DigestHasherFunc,
1861
5
    ) -> Result<(), Error> {
1862
5
        let Some(ac_store) = self.ac_store.as_ref() else {
1863
0
            return Ok(());
1864
        };
1865
        // If we are a GrpcStore we shortcut here, as this is a special store.
1866
5
        if let Some(
grpc_store0
) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
1867
0
            let update_action_request = UpdateActionResultRequest {
1868
0
                // This is populated by `update_action_result`.
1869
0
                instance_name: String::new(),
1870
0
                action_digest: Some(action_digest.into()),
1871
0
                action_result: Some(action_result),
1872
0
                results_cache_policy: None,
1873
0
                digest_function: hasher.proto_digest_func().into(),
1874
0
            };
1875
0
            return grpc_store
1876
0
                .update_action_result(Request::new(update_action_request))
1877
0
                .await
1878
0
                .map(|_| ())
1879
0
                .err_tip(|| "Caching ActionResult");
1880
5
        }
1881
1882
5
        let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);
1883
5
        action_result
1884
5
            .encode(&mut store_data)
1885
5
            .err_tip(|| "Encoding ActionResult for caching")
?0
;
1886
1887
5
        ac_store
1888
5
            .update_oneshot(action_digest, store_data.split().freeze())
1889
5
            .await
1890
5
            .err_tip(|| "Caching ActionResult")
1891
5
    }
1892
1893
3
    async fn upload_historical_results_with_message(
1894
3
        &self,
1895
3
        action_digest: DigestInfo,
1896
3
        execute_response: ExecuteResponse,
1897
3
        message_template: Template,
1898
3
        hasher: DigestHasherFunc,
1899
3
    ) -> Result<String, Error> {
1900
3
        let historical_digest_info = serialize_and_upload_message(
1901
3
            &HistoricalExecuteResponse {
1902
3
                action_digest: Some(action_digest.into()),
1903
3
                execute_response: Some(execute_response.clone()),
1904
3
            },
1905
3
            self.historical_store.as_pin(),
1906
3
            &mut hasher.hasher(),
1907
3
        )
1908
3
        .await
1909
3
        .err_tip(|| 
format!0
("Caching HistoricalExecuteResponse for digest: {action_digest}"))
?0
;
1910
1911
3
        Self::format_execute_response_message(
1912
3
            message_template,
1913
3
            action_digest,
1914
3
            Some(historical_digest_info),
1915
3
            hasher,
1916
        )
1917
3
        .err_tip(|| "Could not format message in upload_historical_results_with_message")
1918
3
    }
1919
1920
0
    async fn cache_action_result(
1921
0
        &self,
1922
0
        action_info: DigestInfo,
1923
0
        action_result: &mut ActionResult,
1924
0
        hasher: DigestHasherFunc,
1925
6
    ) -> Result<(), Error> {
1926
6
        let should_upload_historical_results =
1927
6
            Self::should_cache_result(self.upload_historical_results_strategy, action_result, true);
1928
6
        let should_upload_ac_results =
1929
6
            Self::should_cache_result(self.upload_ac_results_strategy, action_result, false);
1930
        // Shortcut so we don't need to convert to proto if not needed.
1931
6
        if !should_upload_ac_results && 
!should_upload_historical_results1
{
1932
1
            return Ok(());
1933
5
        }
1934
1935
5
        let mut execute_response = to_execute_response(action_result.clone());
1936
1937
        // In theory exit code should always be != 0 if there's an error, but for safety we
1938
        // catch both.
1939
5
        let message_template = if action_result.exit_code == 0 && 
action_result.error4
.
is_none4
() {
1940
3
            self.success_message_template.clone()
1941
        } else {
1942
2
            self.failure_message_template.clone()
1943
        };
1944
1945
5
        let upload_historical_results_with_message_result = if should_upload_historical_results {
1946
3
            let maybe_message = self
1947
3
                .upload_historical_results_with_message(
1948
3
                    action_info,
1949
3
                    execute_response.clone(),
1950
3
                    message_template,
1951
3
                    hasher,
1952
3
                )
1953
3
                .await;
1954
3
            match maybe_message {
1955
3
                Ok(message) => {
1956
3
                    action_result.message.clone_from(&message);
1957
3
                    execute_response.message = message;
1958
3
                    Ok(())
1959
                }
1960
0
                Err(e) => Result::<(), Error>::Err(e),
1961
            }
1962
        } else {
1963
2
            match Self::format_execute_response_message(message_template, action_info, None, hasher)
1964
            {
1965
2
                Ok(message) => {
1966
2
                    action_result.message.clone_from(&message);
1967
2
                    execute_response.message = message;
1968
2
                    Ok(())
1969
                }
1970
0
                Err(e) => Err(e).err_tip(|| "Could not format message in cache_action_result"),
1971
            }
1972
        };
1973
1974
        // Note: Done in this order because we assume most results will succeed and most configs will
1975
        // either always upload upload historical results or only upload on failure. In which case
1976
        // we can avoid an extra clone of the protos by doing this last with the above assumption.
1977
5
        let ac_upload_results = if should_upload_ac_results {
1978
5
            self.upload_ac_results(
1979
5
                action_info,
1980
5
                execute_response
1981
5
                    .result
1982
5
                    .err_tip(|| "No result set in cache_action_result")
?0
,
1983
5
                hasher,
1984
            )
1985
5
            .await
1986
        } else {
1987
0
            Ok(())
1988
        };
1989
5
        upload_historical_results_with_message_result.merge(ac_upload_results)
1990
6
    }
1991
}
1992
1993
/// Selects whether actions run inside Linux namespaces.
/// NOTE(review): the precise effect of each variant is applied by execution
/// code outside this view — confirm against the spawn logic.
#[cfg(target_os = "linux")]
#[derive(Copy, Clone, Debug)]
pub enum UseNamespaces {
    /// Do not use namespaces.
    No,
    /// Use namespaces (presumably without a mount namespace — see `YesAndMount`).
    Yes,
    /// Use namespaces, additionally including a mount namespace.
    YesAndMount,
}
2000
2001
/// Construction arguments for `RunningActionsManagerImpl`.
#[derive(Debug)]
pub struct RunningActionsManagerArgs<'a> {
    /// Root directory under which per-action directories are created.
    pub root_action_directory: String,
    /// Worker-specific overrides applied to each action (entrypoint, env).
    pub execution_configuration: ExecutionConfiguration,
    /// CAS used for downloading inputs and uploading action outputs.
    pub cas_store: Arc<FastSlowStore>,
    /// Action-cache store; `None` disables AC uploads.
    pub ac_store: Option<Store>,
    /// Store receiving serialized historical execute responses.
    pub historical_store: Store,
    /// Configuration for how/when action results are uploaded.
    pub upload_action_result_config: &'a UploadActionResultConfig,
    /// Upper bound on an action's execution time.
    pub max_action_timeout: Duration,
    /// Upper bound on the result-upload phase (enforced in `upload_results`).
    pub max_upload_timeout: Duration,
    /// If true, action timeouts are enforced by an external system, not here.
    pub timeout_handled_externally: bool,
    /// Optional cache of materialized input directories.
    pub directory_cache: Option<Arc<crate::directory_cache::DirectoryCache>>,
    /// Whether to run actions inside Linux namespaces.
    #[cfg(target_os = "linux")]
    pub use_namespaces: UseNamespaces,
}
2016
2017
/// RAII guard marking an operation as "being cleaned up". On drop it removes
/// the operation from the manager's `cleaning_up_operations` set and notifies
/// waiters (see the `Drop` impl below).
struct CleanupGuard {
    /// Weak handle back to the owning manager so the guard does not keep the
    /// manager alive; upgrade may fail during shutdown.
    manager: Weak<RunningActionsManagerImpl>,
    /// Operation whose cleanup this guard tracks.
    operation_id: OperationId,
}
2021
2022
impl Drop for CleanupGuard {
2023
17
    fn drop(&mut self) {
2024
17
        let Some(manager) = self.manager.upgrade() else {
2025
0
            return;
2026
        };
2027
17
        let mut cleaning = manager.cleaning_up_operations.lock();
2028
17
        cleaning.remove(&self.operation_id);
2029
17
        manager.cleanup_complete_notify.notify_waiters();
2030
17
    }
2031
}
2032
2033
/// Holds state info about what is being executed and the interface for interacting
/// with actions while they are running.
#[derive(Debug)]
pub struct RunningActionsManagerImpl {
    /// Root path under which every operation gets its own working directory.
    root_action_directory: String,
    /// Worker-level execution configuration applied to each action.
    execution_configuration: ExecutionConfiguration,
    /// Combined fast/slow CAS store used for action inputs and outputs.
    cas_store: Arc<FastSlowStore>,
    /// The fast side of `cas_store`, downcast to a `FilesystemStore`
    /// (required; validated in `new_with_callbacks`).
    filesystem_store: Arc<FilesystemStore>,
    /// Handles uploading action results to the AC / historical stores.
    upload_action_results: UploadActionResults,
    /// Upper bound on any single action's execution timeout.
    max_action_timeout: Duration,
    /// Upper bound on the time allowed for uploading results.
    max_upload_timeout: Duration,
    /// When true, timeouts are enforced externally and `max_action_timeout`
    /// is used internally regardless of the action's requested timeout.
    timeout_handled_externally: bool,
    /// Whether actions run inside Linux namespaces (Linux only).
    #[cfg(target_os = "linux")]
    use_namespaces: UseNamespaces,
    /// Actions currently executing, keyed by operation id. Stored as `Weak`
    /// so a finished action's state can be dropped independently.
    running_actions: Mutex<HashMap<OperationId, Weak<RunningActionImpl>>>,
    // Note: We don't use Notify because we need to support a .wait_for()-like function, which
    // Notify does not support.
    action_done_tx: watch::Sender<()>,
    /// Injected time/sleep callbacks (allows tests to control the clock).
    callbacks: Callbacks,
    /// Metrics for the manager's operations; exposed via `metrics()`.
    metrics: Arc<Metrics>,
    /// Track operations being cleaned up to avoid directory collisions during action retries.
    /// When an action fails and is retried on the same worker, we need to ensure the previous
    /// attempt's directory is fully cleaned up before creating a new one.
    /// See: <https://github.com/TraceMachina/nativelink/issues/1859>
    cleaning_up_operations: Mutex<HashSet<OperationId>>,
    /// Notify waiters when a cleanup operation completes. This is used in conjunction with
    /// `cleaning_up_operations` to coordinate directory cleanup and creation.
    cleanup_complete_notify: Arc<Notify>,
    /// Optional directory cache for improving performance by caching reconstructed
    /// input directories and using hardlinks.
    directory_cache: Option<Arc<crate::directory_cache::DirectoryCache>>,
}
2065
2066
impl RunningActionsManagerImpl {
    /// Maximum time to wait for a cleanup operation to complete before timing out.
    /// TODO(marcussorealheis): Consider making cleanup wait timeout configurable in the future
    const MAX_WAIT: Duration = Duration::from_secs(30);
    /// Maximum backoff duration for exponential backoff when waiting for cleanup.
    const MAX_BACKOFF: Duration = Duration::from_millis(500);

    /// Builds a manager from `args`, using the provided `callbacks` for time
    /// and sleeping (injectable for tests).
    ///
    /// Errors if the fast side of `args.cas_store` is not a `FilesystemStore`,
    /// or if `UploadActionResults` construction fails.
    pub fn new_with_callbacks(
        args: RunningActionsManagerArgs<'_>,
        callbacks: Callbacks,
    ) -> Result<Self, Error> {
        // Sadly because of some limitations of how Any works we need to clone more times than optimal.
        let filesystem_store = args
            .cas_store
            .fast_store()
            .downcast_ref::<FilesystemStore>(None)
            .err_tip(
                || "Expected FilesystemStore store for .fast_store() in RunningActionsManagerImpl",
            )?
            .get_arc()
            .err_tip(|| "FilesystemStore's internal Arc was lost")?;
        // Receiver side is created on demand via subscribe(); drop it here.
        let (action_done_tx, _) = watch::channel(());
        Ok(Self {
            root_action_directory: args.root_action_directory,
            execution_configuration: args.execution_configuration,
            cas_store: args.cas_store,
            filesystem_store,
            upload_action_results: UploadActionResults::new(
                args.upload_action_result_config,
                args.ac_store,
                args.historical_store,
            )
            .err_tip(|| "During RunningActionsManagerImpl construction")?,
            max_action_timeout: args.max_action_timeout,
            max_upload_timeout: args.max_upload_timeout,
            timeout_handled_externally: args.timeout_handled_externally,
            running_actions: Mutex::new(HashMap::new()),
            action_done_tx,
            callbacks,
            metrics: Arc::new(Metrics::default()),
            cleaning_up_operations: Mutex::new(HashSet::new()),
            cleanup_complete_notify: Arc::new(Notify::new()),
            directory_cache: args.directory_cache,
            #[cfg(target_os = "linux")]
            use_namespaces: args.use_namespaces,
        })
    }

    /// Convenience constructor using the real system clock and tokio's sleep.
    pub fn new(args: RunningActionsManagerArgs<'_>) -> Result<Self, Error> {
        Self::new_with_callbacks(
            args,
            Callbacks {
                now_fn: SystemTime::now,
                sleep_fn: |duration| Box::pin(tokio::time::sleep(duration)),
            },
        )
    }

    /// Fixes a race condition that occurs when an action fails to execute on a worker, and the same worker
    /// attempts to re-execute the same action before the physical cleanup (file is removed) completes.
    /// See this issue for additional details: <https://github.com/TraceMachina/nativelink/issues/1859>
    ///
    /// Behavior: if `operation_id` is in `cleaning_up_operations`, waits (with
    /// exponential backoff, bounded by `MAX_WAIT`) for the cleanup to finish.
    /// Otherwise, if a stale directory for the operation still exists under
    /// `root_action_directory`, removes it (with one retry) before returning.
    async fn wait_for_cleanup_if_needed(&self, operation_id: &OperationId) -> Result<(), Error> {
        let start = Instant::now();
        let mut backoff = Duration::from_millis(10);
        let mut has_waited = false;

        loop {
            // Lock is scoped so we never hold it across an await point.
            let should_wait = {
                let cleaning = self.cleaning_up_operations.lock();
                cleaning.contains(operation_id)
            };

            if !should_wait {
                let dir_path =
                    PathBuf::from(&self.root_action_directory).join(operation_id.to_string());

                if !dir_path.exists() {
                    return Ok(());
                }

                // Safety check: ensure we're only removing directories under root_action_directory
                let root_path = Path::new(&self.root_action_directory);
                let canonical_root = root_path.canonicalize().err_tip(|| {
                    format!(
                        "Failed to canonicalize root directory: {}",
                        self.root_action_directory
                    )
                })?;
                let canonical_dir = dir_path.canonicalize().err_tip(|| {
                    format!("Failed to canonicalize directory: {}", dir_path.display())
                })?;

                if !canonical_dir.starts_with(&canonical_root) {
                    return Err(make_err!(
                        Code::Internal,
                        "Attempted to remove directory outside of root_action_directory: {}",
                        dir_path.display()
                    ));
                }

                // Directory exists but not being cleaned - remove it
                warn!(
                    "Removing stale directory for {}: {}",
                    operation_id,
                    dir_path.display()
                );
                self.metrics.stale_removals.inc();

                // Try to remove the directory, with one retry on failure
                let remove_result = fs::remove_dir_all(&dir_path).await;
                if let Err(e) = remove_result {
                    // Retry once after a short delay in case the directory is temporarily locked
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    fs::remove_dir_all(&dir_path).await.err_tip(|| {
                        format!(
                            "Failed to remove stale directory {} for retry of {} after retry (original error: {})",
                            dir_path.display(),
                            operation_id,
                            e
                        )
                    })?;
                }
                return Ok(());
            }

            if start.elapsed() > Self::MAX_WAIT {
                self.metrics.cleanup_wait_timeouts.inc();
                warn!(%operation_id, waited=?start.elapsed(), "Timeout waiting for previous operation cleanup");
                return Err(make_err!(
                    Code::DeadlineExceeded,
                    "Timeout waiting for previous operation cleanup: {} (waited {:?})",
                    operation_id,
                    start.elapsed()
                ));
            }

            // Count each wait only once per call so the metric reflects the
            // number of actions delayed, not the number of loop iterations.
            if !has_waited {
                self.metrics.cleanup_waits.inc();
                has_waited = true;
            }

            trace!(
                "Waiting for cleanup of {} (elapsed: {:?}, backoff: {:?})",
                operation_id,
                start.elapsed(),
                backoff
            );

            // Wake on either the cleanup-complete notification or the backoff
            // timer; the timer also guards against a missed notification.
            tokio::select! {
                () = self.cleanup_complete_notify.notified() => {},
                () = tokio::time::sleep(backoff) => {
                    // Exponential backoff
                    backoff = (backoff * 2).min(Self::MAX_BACKOFF);
                },
            }
        }
    }

    /// Creates the working directory `{root_action_directory}/{operation_id}`
    /// for the operation, returning its path. Wrapped in the
    /// `make_action_directory` metric.
    fn make_action_directory<'a>(
        &'a self,
        operation_id: &'a OperationId,
    ) -> impl Future<Output = Result<String, Error>> + 'a {
        self.metrics.make_action_directory.wrap(async move {
            let action_directory = format!("{}/{}", self.root_action_directory, operation_id);
            fs::create_dir(&action_directory)
                .await
                .err_tip(|| format!("Error creating action directory {action_directory}"))?;
            Ok(action_directory)
        })
    }

    /// Fetches and decodes the `Action` referenced by `start_execute` from the
    /// CAS and converts it into an `ActionInfo`. Wrapped in the
    /// `create_action_info` metric.
    fn create_action_info(
        &self,
        start_execute: StartExecute,
        queued_timestamp: SystemTime,
    ) -> impl Future<Output = Result<ActionInfo, Error>> + '_ {
        self.metrics.create_action_info.wrap(async move {
            let execute_request = start_execute
                .execute_request
                .err_tip(|| "Expected execute_request to exist in StartExecute")?;
            let action_digest: DigestInfo = execute_request
                .action_digest
                .clone()
                .err_tip(|| "Expected action_digest to exist on StartExecute")?
                .try_into()?;
            // Record load start before the CAS fetch so load time is measured.
            let load_start_timestamp = (self.callbacks.now_fn)();
            let action =
                get_and_decode_digest::<Action>(self.cas_store.as_ref(), action_digest.into())
                    .await
                    .err_tip(|| "During start_action")?;
            let action_info = ActionInfo::try_from_action_and_execute_request(
                execute_request,
                action,
                load_start_timestamp,
                queued_timestamp,
            )
            .err_tip(|| "Could not create ActionInfo in create_and_add_action()")?;
            Ok(action_info)
        })
    }

    /// Removes `operation_id` from the running-action map and notifies
    /// `action_done_tx` watchers (used by `kill_all` to detect quiescence).
    /// Errors if the operation was not registered.
    fn cleanup_action(&self, operation_id: &OperationId) -> Result<(), Error> {
        let mut running_actions = self.running_actions.lock();
        let result = running_actions.remove(operation_id).err_tip(|| {
            format!("Expected operation id '{operation_id}' to exist in RunningActionsManagerImpl")
        });
        // No need to copy anything, we just are telling the receivers an event happened.
        self.action_done_tx.send_modify(|()| {});
        result.map(|_| ())
    }

    // Note: We do not capture metrics on this call, only `.kill_all()`.
    // Important: When the future returns the process may still be running.
    async fn kill_operation(action: Arc<RunningActionImpl>) {
        warn!(
            operation_id = ?action.operation_id,
            "Sending kill to running operation",
        );
        // Take the sender under the lock so the kill signal is sent at most once.
        let kill_channel_tx = {
            let mut action_state = action.state.lock();
            action_state.kill_channel_tx.take()
        };
        if let Some(kill_channel_tx) = kill_channel_tx
            && kill_channel_tx.send(()).is_err()
        {
            error!(
                operation_id = ?action.operation_id,
                "Error sending kill to running operation",
            );
        }
    }

    /// Marks `operation_id` as being cleaned up and returns a guard that will
    /// unmark it (and notify waiters) on drop. Returns `None` if a cleanup for
    /// this operation is already in progress (`HashSet::insert` returned false).
    fn perform_cleanup(self: &Arc<Self>, operation_id: OperationId) -> Option<CleanupGuard> {
        let mut cleaning = self.cleaning_up_operations.lock();
        cleaning
            .insert(operation_id.clone())
            .then_some(CleanupGuard {
                manager: Arc::downgrade(self),
                operation_id,
            })
    }
}
2307
2308
impl RunningActionsManager for RunningActionsManagerImpl {
2309
    type RunningAction = RunningActionImpl;
2310
2311
20
    async fn create_and_add_action(
2312
20
        self: &Arc<Self>,
2313
20
        worker_id: String,
2314
20
        start_execute: StartExecute,
2315
20
    ) -> Result<Arc<RunningActionImpl>, Error> {
2316
20
        self.metrics
2317
20
            .create_and_add_action
2318
20
            .wrap(async move {
2319
20
                let queued_timestamp = start_execute
2320
20
                    .queued_timestamp
2321
20
                    .and_then(|time| 
time13
.
try_into13
().
ok13
())
2322
20
                    .unwrap_or(SystemTime::UNIX_EPOCH);
2323
20
                let operation_id = start_execute
2324
20
                    .operation_id.as_str().into();
2325
20
                let action_info = self.create_action_info(start_execute, queued_timestamp).await
?0
;
2326
20
                debug!(
2327
                    ?action_info,
2328
                    "Worker received action",
2329
                );
2330
                // Wait for any previous cleanup to complete before creating directory
2331
20
                self.wait_for_cleanup_if_needed(&operation_id).await
?0
;
2332
20
                let action_directory = self.make_action_directory(&operation_id).await
?0
;
2333
20
                let execution_metadata = ExecutionMetadata {
2334
20
                    worker: worker_id,
2335
20
                    queued_timestamp: action_info.insert_timestamp,
2336
20
                    worker_start_timestamp: action_info.load_timestamp,
2337
20
                    worker_completed_timestamp: SystemTime::UNIX_EPOCH,
2338
20
                    input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
2339
20
                    input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
2340
20
                    execution_start_timestamp: SystemTime::UNIX_EPOCH,
2341
20
                    execution_completed_timestamp: SystemTime::UNIX_EPOCH,
2342
20
                    output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
2343
20
                    output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
2344
20
                };
2345
20
                let timeout = if action_info.timeout.is_zero() || 
self.timeout_handled_externally3
{
2346
17
                    self.max_action_timeout
2347
                } else {
2348
3
                    action_info.timeout
2349
                };
2350
20
                if timeout > self.max_action_timeout {
2351
1
                    return Err(make_err!(
2352
1
                        Code::InvalidArgument,
2353
1
                        "Action timeout of {} seconds is greater than the maximum allowed timeout of {} seconds",
2354
1
                        timeout.as_secs_f32(),
2355
1
                        self.max_action_timeout.as_secs_f32()
2356
1
                    ));
2357
19
                }
2358
19
                let running_action = Arc::new(RunningActionImpl::new(
2359
19
                    execution_metadata,
2360
19
                    operation_id.clone(),
2361
19
                    action_directory,
2362
19
                    action_info,
2363
19
                    timeout,
2364
19
                    self.clone(),
2365
                ));
2366
                {
2367
19
                    let mut running_actions = self.running_actions.lock();
2368
                    // Check if action already exists and is still alive
2369
19
                    if let Some(
existing_weak0
) = running_actions.get(&operation_id)
2370
0
                        && let Some(_existing_action) = existing_weak.upgrade() {
2371
0
                            return Err(make_err!(
2372
0
                                Code::AlreadyExists,
2373
0
                                "Action with operation_id {} is already running",
2374
0
                                operation_id
2375
0
                            ));
2376
19
                    }
2377
19
                    running_actions.insert(operation_id, Arc::downgrade(&running_action));
2378
                }
2379
19
                Ok(running_action)
2380
20
            })
2381
20
            .await
2382
20
    }
2383
2384
6
    async fn cache_action_result(
2385
6
        &self,
2386
6
        action_info: DigestInfo,
2387
6
        action_result: &mut ActionResult,
2388
6
        hasher: DigestHasherFunc,
2389
6
    ) -> Result<(), Error> {
2390
6
        self.metrics
2391
6
            .cache_action_result
2392
6
            .wrap(self.upload_action_results.cache_action_result(
2393
6
                action_info,
2394
6
                action_result,
2395
6
                hasher,
2396
6
            ))
2397
6
            .await
2398
6
    }
2399
2400
0
    async fn kill_operation(&self, operation_id: &OperationId) -> Result<(), Error> {
2401
0
        let running_action = {
2402
0
            let running_actions = self.running_actions.lock();
2403
0
            running_actions
2404
0
                .get(operation_id)
2405
0
                .and_then(Weak::upgrade)
2406
0
                .ok_or_else(|| make_input_err!("Failed to get running action {operation_id}"))?
2407
        };
2408
0
        Self::kill_operation(running_action).await;
2409
0
        Ok(())
2410
0
    }
2411
2412
    // Note: When the future returns the process should be fully killed and cleaned up.
2413
2
    async fn kill_all(&self) {
2414
2
        self.metrics
2415
2
            .kill_all
2416
2
            .wrap_no_capture_result(async move {
2417
2
                let kill_operations: Vec<Arc<RunningActionImpl>> = {
2418
2
                    let running_actions = self.running_actions.lock();
2419
2
                    running_actions
2420
2
                        .iter()
2421
2
                        .filter_map(|(_operation_id, action)| action.upgrade())
2422
2
                        .collect()
2423
                };
2424
2
                let mut kill_futures: FuturesUnordered<_> = kill_operations
2425
2
                    .into_iter()
2426
2
                    .map(Self::kill_operation)
2427
2
                    .collect();
2428
4
                while kill_futures.next().await.is_some() 
{}2
2429
2
            })
2430
2
            .await;
2431
        // Ignore error. If error happens it means there's no sender, which is not a problem.
2432
        // Note: Sanity check this API will always check current value then future values:
2433
        // https://play.rust-lang.org/?version=stable&edition=2021&gist=23103652cc1276a97e5f9938da87fdb2
2434
2
        drop(
2435
2
            self.action_done_tx
2436
2
                .subscribe()
2437
4
                .
wait_for2
(|()| self.running_actions.lock().is_empty())
2438
2
                .await,
2439
        );
2440
2
    }
2441
2442
    #[inline]
2443
2
    fn metrics(&self) -> &Arc<Metrics> {
2444
2
        &self.metrics
2445
2
    }
2446
}
2447
2448
/// Per-manager counters exported via `MetricsComponent`. Each `AsyncCounterWrapper`
/// wraps a future to record call counts/durations; `CounterWithTime` counts
/// discrete events.
#[derive(Debug, Default, MetricsComponent)]
pub struct Metrics {
    #[metric(help = "Stats about the create_and_add_action command.")]
    create_and_add_action: AsyncCounterWrapper,
    #[metric(help = "Stats about the cache_action_result command.")]
    cache_action_result: AsyncCounterWrapper,
    #[metric(help = "Stats about the kill_all command.")]
    kill_all: AsyncCounterWrapper,
    #[metric(help = "Stats about the create_action_info command.")]
    create_action_info: AsyncCounterWrapper,
    #[metric(help = "Stats about the make_work_directory command.")]
    make_action_directory: AsyncCounterWrapper,
    #[metric(help = "Stats about the prepare_action command.")]
    prepare_action: AsyncCounterWrapper,
    #[metric(help = "Stats about the execute command.")]
    execute: AsyncCounterWrapper,
    #[metric(help = "Stats about the upload_results command.")]
    upload_results: AsyncCounterWrapper,
    #[metric(help = "Stats about the cleanup command.")]
    cleanup: AsyncCounterWrapper,
    #[metric(help = "Stats about the get_finished_result command.")]
    get_finished_result: AsyncCounterWrapper,
    #[metric(help = "Number of times an action waited for cleanup to complete.")]
    cleanup_waits: CounterWithTime,
    #[metric(help = "Number of stale directories removed during action retries.")]
    stale_removals: CounterWithTime,
    #[metric(help = "Number of timeouts while waiting for cleanup to complete.")]
    cleanup_wait_timeouts: CounterWithTime,
    #[metric(help = "Stats about the get_proto_command_from_store command.")]
    get_proto_command_from_store: AsyncCounterWrapper,
    #[metric(help = "Stats about the download_to_directory command.")]
    download_to_directory: AsyncCounterWrapper,
    #[metric(help = "Stats about the prepare_output_files command.")]
    prepare_output_files: AsyncCounterWrapper,
    #[metric(help = "Stats about the prepare_output_paths command.")]
    prepare_output_paths: AsyncCounterWrapper,
    #[metric(help = "Stats about the child_process command.")]
    child_process: AsyncCounterWrapper,
    #[metric(help = "Stats about the child_process_success_error_code command.")]
    child_process_success_error_code: CounterWithTime,
    #[metric(help = "Stats about the child_process_failure_error_code command.")]
    child_process_failure_error_code: CounterWithTime,
    #[metric(help = "Total time spent uploading stdout.")]
    upload_stdout: AsyncCounterWrapper,
    #[metric(help = "Total time spent uploading stderr.")]
    upload_stderr: AsyncCounterWrapper,
    #[metric(help = "Total number of task timeouts.")]
    task_timeouts: CounterWithTime,
}