/build/source/nativelink-worker/src/running_actions_manager.rs
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    See LICENSE file for details
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::cmp::min;
use core::convert::Into;
use core::fmt::Debug;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use core::time::Duration;
use std::borrow::Cow;
use std::collections::vec_deque::VecDeque;
use std::collections::{HashMap, HashSet};
use std::ffi::{OsStr, OsString};
#[cfg(target_family = "unix")]
use std::fs::Permissions;
#[cfg(target_family = "unix")]
use std::os::unix::fs::{MetadataExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::{Arc, Weak};
use std::time::SystemTime;

use bytes::{Bytes, BytesMut};
use filetime::{FileTime, set_file_mtime};
use formatx::Template;
use futures::future::{
    BoxFuture, Future, FutureExt, TryFutureExt, try_join, try_join_all, try_join3,
};
use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
use nativelink_config::cas_server::{
    EnvironmentSource, UploadActionResultConfig, UploadCacheResultsStrategy,
};
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
use nativelink_metric::MetricsComponent;
use nativelink_proto::build::bazel::remote::execution::v2::{
    Action, ActionResult as ProtoActionResult, Command as ProtoCommand,
    Directory as ProtoDirectory, Directory, DirectoryNode, ExecuteResponse, FileNode, SymlinkNode,
    Tree as ProtoTree, UpdateActionResultRequest,
};
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
    HistoricalExecuteResponse, StartExecute,
};
use nativelink_store::ac_utils::{
    ESTIMATED_DIGEST_SIZE, compute_buf_digest, get_and_decode_digest, serialize_and_upload_message,
};
use nativelink_store::fast_slow_store::FastSlowStore;
use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
use nativelink_store::grpc_store::GrpcStore;
use nativelink_util::action_messages::{
    ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, OperationId,
    SymlinkInfo, to_execute_response,
};
use nativelink_util::common::{DigestInfo, fs};
use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc};
use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime};
use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
use nativelink_util::{background_spawn, spawn, spawn_blocking};
use parking_lot::Mutex;
use prost::Message;
use relative_path::RelativePath;
use scopeguard::{ScopeGuard, guard};
use serde::Deserialize;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::process;
use tokio::sync::{Notify, oneshot, watch};
use tokio::time::Instant;
use tokio_stream::wrappers::ReadDirStream;
use tonic::Request;
use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;

/// For simplicity we use a fixed exit code when our program is terminated
/// due to a signal.
const EXIT_CODE_FOR_SIGNAL: i32 = 9;

/// Default strategy for uploading historical results.
/// Note: If this value changes, the config documentation
/// should reflect it.
const DEFAULT_HISTORICAL_RESULTS_STRATEGY: UploadCacheResultsStrategy =
    UploadCacheResultsStrategy::FailuresOnly;

/// Valid string reasons for a failure.
/// Note: If these change, the documentation should be updated.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum SideChannelFailureReason {
    /// Task should be considered timed out.
    Timeout,
}

/// This represents the JSON data that can be passed from the running process
/// to the parent via the `SideChannelFile`. See:
/// `config::EnvironmentSource::sidechannelfile` for more details.
/// Note: Any fields added here must be added to the documentation.
#[derive(Debug, Deserialize, Default)]
struct SideChannelInfo {
    /// If the task should be considered a failure and why.
    failure: Option<SideChannelFailureReason>,
}
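// For illustration, a side channel file that marks the task as timed out
// would contain JSON like the following (a minimal sketch derived from the
// serde attributes above; `failure` is optional and may be omitted entirely):
//
//     { "failure": "timeout" }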

/// Aggressively downloads the digests of files and builds a local folder from them. This
/// function will spawn an unbounded number of futures to try to get these downloaded. The
/// store itself should be rate limited if spawning too many requests at once is an issue.
/// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore` for
/// efficiency reasons: we request the `FastSlowStore` to populate the entry, then assume
/// the `FilesystemStore` has the file available immediately afterward and hardlink the
/// file to a new location.
// Sadly we cannot use `async fn` here because the rust compiler cannot determine the auto traits
// of the future. So we need to force this function to return a dynamic future instead.
// see: https://github.com/rust-lang/rust/issues/78649
pub fn download_to_directory<'a>(
    cas_store: &'a FastSlowStore,
    filesystem_store: Pin<&'a FilesystemStore>,
    digest: &'a DigestInfo,
    current_directory: &'a str,
) -> BoxFuture<'a, Result<(), Error>> {
    async move {
        let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest.into())
            .await
            .err_tip(|| "Converting digest to Directory")?;
        let mut futures = FuturesUnordered::new();

        for file in directory.files {
            let digest: DigestInfo = file
                .digest
                .err_tip(|| "Expected Digest to exist in Directory::file::digest")?
                .try_into()
                .err_tip(|| "In Directory::file::digest")?;
            let dest = format!("{}/{}", current_directory, file.name);
            let (mtime, mut unix_mode) = match file.node_properties {
                Some(properties) => (properties.mtime, properties.unix_mode),
                None => (None, None),
            };
            #[cfg_attr(target_family = "windows", allow(unused_assignments))]
            if file.is_executable {
                unix_mode = Some(unix_mode.unwrap_or(0o444) | 0o111);
            }
            futures.push(
                cas_store
                    .populate_fast_store(digest.into())
                    .and_then(move |()| async move {
                        let file_entry = filesystem_store
                            .get_file_entry_for_digest(&digest)
                            .await
                            .err_tip(|| "During hard link")?;
                        file_entry
                            .get_file_path_locked(|src| fs::hard_link(src, &dest))
                            .await
                            .map_err(|e| {
                                if e.code == Code::NotFound {
                                    make_err!(
                                        Code::Internal,
                                        "Could not make hardlink, file was likely evicted from cache. {e:?} : {dest}\n\
                                        This error often occurs when the filesystem store's max_bytes is too small for your workload.\n\
                                        To fix this issue:\n\
                                        1. Increase the 'max_bytes' value in your filesystem store configuration\n\
                                        2. Example: Change 'max_bytes: 10000000000' to 'max_bytes: 50000000000' (or higher)\n\
                                        3. The setting is typically found in your nativelink.json config under:\n\
                                           stores -> [your_filesystem_store] -> filesystem -> eviction_policy -> max_bytes\n\
                                        4. Restart NativeLink after making the change\n\n\
                                        If this error persists after increasing max_bytes several times, please report at:\n\
                                        https://github.com/TraceMachina/nativelink/issues\n\
                                        Include your config file and both server and client logs to help us assist you."
                                    )
                                } else {
                                    make_err!(Code::Internal, "Could not make hardlink, {e:?} : {dest}")
                                }
                            })?;
                        #[cfg(target_family = "unix")]
                        if let Some(unix_mode) = unix_mode {
                            fs::set_permissions(&dest, Permissions::from_mode(unix_mode))
                                .await
                                .err_tip(|| {
                                    format!(
                                        "Could not set unix mode in download_to_directory {dest}"
                                    )
                                })?;
                        }
                        if let Some(mtime) = mtime {
                            spawn_blocking!("download_to_directory_set_mtime", move || {
                                set_file_mtime(
                                    &dest,
                                    FileTime::from_unix_time(mtime.seconds, mtime.nanos as u32),
                                )
                                .err_tip(|| {
                                    format!("Failed to set mtime in download_to_directory {dest}")
                                })
                            })
                            .await
                            .err_tip(
                                || "Failed to launch spawn_blocking in download_to_directory",
                            )??;
                        }
                        Ok(())
                    })
                    .map_err(move |e| e.append(format!("for digest {digest}")))
                    .boxed(),
            );
        }

        for directory in directory.directories {
            let digest: DigestInfo = directory
                .digest
                .err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
                .try_into()
                .err_tip(|| "In Directory::directories::digest")?;
            let new_directory_path = format!("{}/{}", current_directory, directory.name);
            futures.push(
                async move {
                    fs::create_dir(&new_directory_path)
                        .await
                        .err_tip(|| format!("Could not create directory {new_directory_path}"))?;
                    download_to_directory(
                        cas_store,
                        filesystem_store,
                        &digest,
                        &new_directory_path,
                    )
                    .await
                    .err_tip(|| format!("in download_to_directory : {new_directory_path}"))?;
                    Ok(())
                }
                .boxed(),
            );
        }

        #[cfg(target_family = "unix")]
        for symlink_node in directory.symlinks {
            let dest = format!("{}/{}", current_directory, symlink_node.name);
            futures.push(
                async move {
                    fs::symlink(&symlink_node.target, &dest).await.err_tip(|| {
                        format!(
                            "Could not create symlink {} -> {}",
                            symlink_node.target, dest
                        )
                    })?;
                    Ok(())
                }
                .boxed(),
            );
        }

        while futures.try_next().await?.is_some() {}
        Ok(())
    }
    .boxed()
}
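// A minimal usage sketch, mirroring the call in `inner_prepare_action` below
// (the `fast_slow_store`, `filesystem_store`, `input_root_digest`, and
// `work_directory` bindings are assumed to already exist):
//
//     download_to_directory(
//         &fast_slow_store,
//         Pin::new(filesystem_store.as_ref()),
//         &input_root_digest,
//         &work_directory,
//     )
//     .await?;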

#[cfg(target_family = "windows")]
fn is_executable(_metadata: &std::fs::Metadata, full_path: &impl AsRef<Path>) -> bool {
    static EXECUTABLE_EXTENSIONS: &[&str] = &["exe", "bat", "com"];
    EXECUTABLE_EXTENSIONS
        .iter()
        .any(|ext| full_path.as_ref().extension().map_or(false, |v| v == *ext))
}

#[cfg(target_family = "unix")]
fn is_executable(metadata: &std::fs::Metadata, _full_path: &impl AsRef<Path>) -> bool {
    (metadata.mode() & 0o111) != 0
}
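// Worked example for the unix variant: a file with mode 0o644 is not
// executable (0o644 & 0o111 == 0o000), while 0o755 is (0o755 & 0o111 == 0o111),
// as is a file executable only by its owner (0o744 & 0o111 == 0o100).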

type DigestUploader = Arc<tokio::sync::OnceCell<()>>;

async fn upload_file(
    cas_store: Pin<&impl StoreLike>,
    full_path: impl AsRef<Path> + Debug + Send + Sync,
    hasher: DigestHasherFunc,
    metadata: std::fs::Metadata,
    digest_uploaders: Arc<Mutex<HashMap<DigestInfo, DigestUploader>>>,
) -> Result<FileInfo, Error> {
    let is_executable = is_executable(&metadata, &full_path);
    let file_size = metadata.len();
    let file = fs::open_file(&full_path, 0, u64::MAX)
        .await
        .err_tip(|| format!("Could not open file {full_path:?}"))?;

    let (digest, mut file) = hasher
        .hasher()
        .digest_for_file(&full_path, file.into_inner(), Some(file_size))
        .await
        .err_tip(|| format!("Failed to hash file in digest_for_file for {full_path:?}"))?;

    let digest_uploader = match digest_uploaders.lock().entry(digest) {
        std::collections::hash_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
        std::collections::hash_map::Entry::Vacant(vacant_entry) => vacant_entry
            .insert(Arc::new(tokio::sync::OnceCell::new()))
            .clone(),
    };

    // Only upload a file with a given hash once. The file may exist multiple
    // times in the output with different names.
    digest_uploader
        .get_or_try_init(async || {
            // Only upload if the digest doesn't already exist; this should be
            // a much cheaper operation than an upload.
            let cas_store = cas_store.as_store_driver_pin();
            let store_key: nativelink_util::store_trait::StoreKey<'_> = digest.into();
            if cas_store
                .has(store_key.borrow())
                .await
                .is_ok_and(|result| result.is_some())
            {
                return Ok(());
            }

            file.rewind().await.err_tip(|| "Could not rewind file")?;

            // Note: For unknown reasons we appear to be hitting:
            // https://github.com/rust-lang/rust/issues/92096
            // or a similar issue if we try to use the non-store driver function, so we
            // are using the store driver function here.
            cas_store
                .update_with_whole_file(
                    store_key,
                    full_path.as_ref().into(),
                    file,
                    UploadSizeInfo::ExactSize(digest.size_bytes()),
                )
                .await
                .map(|_slot| ())
        })
        .await
        .err_tip(|| format!("for {full_path:?}"))?;

    let name = full_path
        .as_ref()
        .file_name()
        .err_tip(|| format!("Expected file_name to exist on {full_path:?}"))?
        .to_str()
        .err_tip(|| {
            make_err!(
                Code::Internal,
                "Could not convert {:?} to string",
                full_path
            )
        })?
        .to_string();

    Ok(FileInfo {
        name_or_path: NameOrPath::Name(name),
        digest,
        is_executable,
    })
}
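// The `digest_uploaders` map plus per-digest `OnceCell` above is a general
// "run this once per key" pattern. A minimal self-contained sketch of the same
// idea (the `u64` key and the commented `expensive_upload` call are placeholders):
//
//     use std::collections::HashMap;
//     use std::sync::Arc;
//
//     use parking_lot::Mutex;
//
//     async fn once_per_key(
//         cells: &Mutex<HashMap<u64, Arc<tokio::sync::OnceCell<()>>>>,
//         key: u64,
//     ) -> Result<(), Error> {
//         // Clone the cell out so the mutex is not held across an `.await`.
//         let cell = cells.lock().entry(key).or_default().clone();
//         cell.get_or_try_init(|| async {
//             /* expensive_upload(key).await */
//             Ok::<(), Error>(())
//         })
//         .await?;
//         Ok(())
//     }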

async fn upload_symlink(
    full_path: impl AsRef<Path> + Debug,
    full_work_directory_path: impl AsRef<Path>,
) -> Result<SymlinkInfo, Error> {
    let full_target_path = fs::read_link(full_path.as_ref())
        .await
        .err_tip(|| format!("Could not get read_link path of {full_path:?}"))?;

    // Detect if our symlink is inside our work directory; if it is, find the
    // relative path, otherwise use the absolute path.
    let target = if full_target_path.starts_with(full_work_directory_path.as_ref()) {
        let full_target_path = RelativePath::from_path(&full_target_path)
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?;
        RelativePath::from_path(full_work_directory_path.as_ref())
            .map_err(|v| make_err!(Code::Internal, "Could not convert {} to RelativePath", v))?
            .relative(full_target_path)
            .normalize()
            .into_string()
    } else {
        full_target_path
            .to_str()
            .err_tip(|| {
                make_err!(
                    Code::Internal,
                    "Could not convert '{:?}' to string",
                    full_target_path
                )
            })?
            .to_string()
    };

    let name = full_path
        .as_ref()
        .file_name()
        .err_tip(|| format!("Expected file_name to exist on {full_path:?}"))?
        .to_str()
        .err_tip(|| {
            make_err!(
                Code::Internal,
                "Could not convert {:?} to string",
                full_path
            )
        })?
        .to_string();

    Ok(SymlinkInfo {
        name_or_path: NameOrPath::Name(name),
        target,
    })
}
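// To illustrate the documented intent of the branch above (paths are
// hypothetical): with a work directory of `work/dir` and a symlink whose
// target resolves to `work/dir/out/lib.so`, the stored target becomes the
// relative path `out/lib.so`; a target outside the work directory, such as
// `/usr/bin/env`, keeps its absolute path.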

fn upload_directory<'a, P: AsRef<Path> + Debug + Send + Sync + Clone + 'a>(
    cas_store: Pin<&'a impl StoreLike>,
    full_dir_path: P,
    full_work_directory: &'a str,
    hasher: DigestHasherFunc,
    digest_uploaders: Arc<Mutex<HashMap<DigestInfo, DigestUploader>>>,
) -> BoxFuture<'a, Result<(Directory, VecDeque<ProtoDirectory>), Error>> {
    Box::pin(async move {
        let file_futures = FuturesUnordered::new();
        let dir_futures = FuturesUnordered::new();
        let symlink_futures = FuturesUnordered::new();
        {
            let (_permit, dir_handle) = fs::read_dir(&full_dir_path)
                .await
                .err_tip(|| format!("Error reading dir {full_dir_path:?}"))?
                .into_inner();
            let mut dir_stream = ReadDirStream::new(dir_handle);
            // Note: Try very hard to not leave file descriptors open. Try to keep them as short
            // lived as possible. This is why we iterate the directory and then build a bunch of
            // futures with all the work we are wanting to do then execute it. It allows us to
            // close the directory iterator file descriptor, then open the child files/folders.
            while let Some(entry_result) = dir_stream.next().await {
                let entry = entry_result.err_tip(|| "Error while iterating directory")?;
                let file_type = entry
                    .file_type()
                    .await
                    .err_tip(|| format!("Error running file_type() on {entry:?}"))?;
                let full_path = full_dir_path.as_ref().join(entry.path());
                if file_type.is_dir() {
                    let full_dir_path = full_dir_path.clone();
                    dir_futures.push(
                        upload_directory(
                            cas_store,
                            full_path.clone(),
                            full_work_directory,
                            hasher,
                            digest_uploaders.clone(),
                        )
                        .and_then(|(dir, all_dirs)| async move {
                            let directory_name = full_path
                                .file_name()
                                .err_tip(|| {
                                    format!("Expected file_name to exist on {full_dir_path:?}")
                                })?
                                .to_str()
                                .err_tip(|| {
                                    make_err!(
                                        Code::Internal,
                                        "Could not convert {:?} to string",
                                        full_dir_path
                                    )
                                })?
                                .to_string();

                            let digest =
                                serialize_and_upload_message(&dir, cas_store, &mut hasher.hasher())
                                    .await
                                    .err_tip(|| format!("for {}", full_path.display()))?;

                            Result::<(DirectoryNode, VecDeque<Directory>), Error>::Ok((
                                DirectoryNode {
                                    name: directory_name,
                                    digest: Some(digest.into()),
                                },
                                all_dirs,
                            ))
                        })
                        .boxed(),
                    );
                } else if file_type.is_file() {
                    let digest_uploaders = digest_uploaders.clone();
                    file_futures.push(async move {
                        let metadata = fs::metadata(&full_path)
                            .await
                            .err_tip(|| format!("Could not open file {}", full_path.display()))?;
                        upload_file(cas_store, &full_path, hasher, metadata, digest_uploaders)
                            .map_ok(TryInto::try_into)
                            .await?
                    });
                } else if file_type.is_symlink() {
                    symlink_futures.push(
                        upload_symlink(full_path, &full_work_directory)
                            .map(|symlink| symlink?.try_into()),
                    );
                }
            }
        }

        let (mut file_nodes, dir_entries, mut symlinks) = try_join3(
            file_futures.try_collect::<Vec<FileNode>>(),
            dir_futures.try_collect::<Vec<(DirectoryNode, VecDeque<Directory>)>>(),
            symlink_futures.try_collect::<Vec<SymlinkNode>>(),
        )
        .await?;

        let mut directory_nodes = Vec::with_capacity(dir_entries.len());
        // For efficiency we use a deque because it allows cheap concat of Vecs.
        // We make the assumption here that when performance is important it is because
        // our directory is quite large. This allows us to cheaply merge large amounts of
        // directories into one VecDeque. Then after we are done we need to collapse it
        // down into a single Vec.
        let mut all_child_directories = VecDeque::with_capacity(dir_entries.len());
        for (directory_node, mut recursive_child_directories) in dir_entries {
            directory_nodes.push(directory_node);
            all_child_directories.append(&mut recursive_child_directories);
        }

        file_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name));
        directory_nodes.sort_unstable_by(|a, b| a.name.cmp(&b.name));
        symlinks.sort_unstable_by(|a, b| a.name.cmp(&b.name));

        let directory = Directory {
            files: file_nodes,
            directories: directory_nodes,
            symlinks,
            node_properties: None, // We don't support file properties.
        };
        all_child_directories.push_back(directory.clone());

        Ok((directory, all_child_directories))
    })
}
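// The `VecDeque::append` merge used above, in isolation. `append` moves the
// other deque's elements into `self` and leaves it empty, which is cheaper
// than pushing elements one by one (a minimal sketch with a placeholder
// element type):
//
//     use std::collections::VecDeque;
//
//     let mut all_dirs: VecDeque<u32> = VecDeque::new();
//     for mut child_dirs in [VecDeque::from([1, 2]), VecDeque::from([3])] {
//         all_dirs.append(&mut child_dirs); // `child_dirs` is now empty
//     }
//     assert_eq!(all_dirs, VecDeque::from([1, 2, 3]));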

async fn process_side_channel_file(
    side_channel_file: Cow<'_, OsStr>,
    args: &[&OsStr],
    timeout: Duration,
) -> Result<Option<Error>, Error> {
    let mut json_contents = String::new();
    {
        // Note: Scoping `file_slot` allows the file_slot semaphore to be released faster.
        let mut file_slot = match fs::open_file(side_channel_file, 0, u64::MAX).await {
            Ok(file_slot) => file_slot,
            Err(e) => {
                if e.code != Code::NotFound {
                    return Err(e).err_tip(|| "Error opening side channel file");
                }
                // Note: If the file does not exist, it's ok. Users are not required to create this file.
                return Ok(None);
            }
        };
        file_slot
            .read_to_string(&mut json_contents)
            .await
            .err_tip(|| "Error reading side channel file")?;
    }

    let side_channel_info: SideChannelInfo =
        serde_json5::from_str(&json_contents).map_err(|e| {
            make_input_err!(
                "Could not convert contents of side channel file (json) to SideChannelInfo : {e:?}"
            )
        })?;
    Ok(side_channel_info.failure.map(|failure| match failure {
        SideChannelFailureReason::Timeout => Error::new(
            Code::DeadlineExceeded,
            format!(
                "Command '{}' timed out after {} seconds",
                args.join(OsStr::new(" ")).to_string_lossy(),
                timeout.as_secs_f32()
            ),
        ),
    }))
}

async fn do_cleanup(
    running_actions_manager: &Arc<RunningActionsManagerImpl>,
    operation_id: &OperationId,
    action_directory: &str,
) -> Result<(), Error> {
    // Mark this operation as being cleaned up.
    let Some(_cleaning_guard) = running_actions_manager.perform_cleanup(operation_id.clone())
    else {
        // Cleanup is already happening elsewhere.
        return Ok(());
    };

    debug!("Worker cleaning up");
    // Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
    let remove_dir_result = fs::remove_dir_all(action_directory)
        .await
        .err_tip(|| format!("Could not remove working directory {action_directory}"));

    if let Err(err) = running_actions_manager.cleanup_action(operation_id) {
        error!(?operation_id, ?err, "Error cleaning up action");
        Result::<(), Error>::Err(err).merge(remove_dir_result)
    } else if let Err(err) = remove_dir_result {
        error!(?operation_id, ?err, "Error removing working directory");
        Err(err)
    } else {
        Ok(())
    }
}

pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
    /// Returns the action id of the action.
    fn get_operation_id(&self) -> &OperationId;

    /// Anything that needs to run before the action is actually executed should happen here.
    fn prepare_action(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Actually perform the execution of the action.
    fn execute(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Any uploading, processing or analyzing of the results should happen here.
    fn upload_results(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Cleanup any residual files, handles or other junk resulting from running the action.
    fn cleanup(self: Arc<Self>) -> impl Future<Output = Result<Arc<Self>, Error>> + Send;

    /// Returns the final result. As a general rule this call should be thought of as
    /// consuming `self`, meaning once it returns the lifetime of `Self` is over and any
    /// action performed on it after this call is undefined behavior.
    fn get_finished_result(
        self: Arc<Self>,
    ) -> impl Future<Output = Result<ActionResult, Error>> + Send;

    /// Returns the work directory of the action.
    fn get_work_directory(&self) -> &String;
}
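// A sketch of how a driver might walk this lifecycle (the ordering is inferred
// from the method docs above; an `action: Arc<impl RunningAction>` binding is
// assumed and error handling is elided):
//
//     let result = action
//         .prepare_action()
//         .await?
//         .execute()
//         .await?
//         .upload_results()
//         .await?
//         .cleanup()
//         .await?
//         .get_finished_result()
//         .await?;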

#[derive(Debug)]
struct RunningActionImplExecutionResult {
    stdout: Bytes,
    stderr: Bytes,
    exit_code: i32,
}

#[derive(Debug)]
struct RunningActionImplState {
    command_proto: Option<ProtoCommand>,
    // TODO(palfrey) Kill is not implemented yet, but is instrumented.
    // However, it is used if the worker disconnects to destroy current jobs.
    kill_channel_tx: Option<oneshot::Sender<()>>,
    kill_channel_rx: Option<oneshot::Receiver<()>>,
    execution_result: Option<RunningActionImplExecutionResult>,
    action_result: Option<ActionResult>,
    execution_metadata: ExecutionMetadata,
    // If there was an internal error, this will be set.
    // This should NOT be set if everything was fine, but the process had a
    // non-zero exit code. Instead this should be used for internal errors
    // that prevented the action from running, upload failures, timeouts, etc.,
    // even when we have (or could have) the action results (like stderr/stdout).
    error: Option<Error>,
}

#[derive(Debug)]
pub struct RunningActionImpl {
    operation_id: OperationId,
    action_directory: String,
    work_directory: String,
    action_info: ActionInfo,
    timeout: Duration,
    running_actions_manager: Arc<RunningActionsManagerImpl>,
    state: Mutex<RunningActionImplState>,
    has_manager_entry: AtomicBool,
    did_cleanup: AtomicBool,
}

impl RunningActionImpl {
    fn new(
        execution_metadata: ExecutionMetadata,
        operation_id: OperationId,
        action_directory: String,
        action_info: ActionInfo,
        timeout: Duration,
        running_actions_manager: Arc<RunningActionsManagerImpl>,
    ) -> Self {
        let work_directory = format!("{}/{}", action_directory, "work");
        let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
        Self {
            operation_id,
            action_directory,
            work_directory,
            action_info,
            timeout,
            running_actions_manager,
            state: Mutex::new(RunningActionImplState {
                command_proto: None,
                kill_channel_rx: Some(kill_channel_rx),
                kill_channel_tx: Some(kill_channel_tx),
                execution_result: None,
                action_result: None,
                execution_metadata,
                error: None,
            }),
            // Always need to ensure that we're removed from the manager on Drop.
            has_manager_entry: AtomicBool::new(true),
            // Only needs to be cleaned up after a prepare_action call, set there.
            did_cleanup: AtomicBool::new(true),
        }
    }

    #[allow(
        clippy::missing_const_for_fn,
        reason = "False positive on stable, but not on nightly"
    )]
    fn metrics(&self) -> &Arc<Metrics> {
        &self.running_actions_manager.metrics
    }

    /// Prepares everything needed to execute this action. It will:
    ///
    /// * Download any files needed to execute the action
    /// * Build a folder with all files needed to execute the action.
    ///
    /// This function will aggressively download and spawn potentially thousands of futures. It is
    /// up to the stores to rate limit if needed.
    async fn inner_prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {
        {
            let mut state = self.state.lock();
            state.execution_metadata.input_fetch_start_timestamp =
                (self.running_actions_manager.callbacks.now_fn)();
        }
        let command = {
            // Download and build out our input files/folders. Also fetch and decode our Command.
            let command_fut = self.metrics().get_proto_command_from_store.wrap(async {
                get_and_decode_digest::<ProtoCommand>(
                    self.running_actions_manager.cas_store.as_ref(),
                    self.action_info.command_digest.into(),
                )
                .await
                .err_tip(|| "Converting command_digest to Command")
            });
            let filesystem_store_pin =
                Pin::new(self.running_actions_manager.filesystem_store.as_ref());
            let (command, ()) = try_join(command_fut, async {
                fs::create_dir(&self.work_directory)
                    .await
                    .err_tip(|| format!("Error creating work directory {}", self.work_directory))?;
                // Now that the work directory has been created, we have to clean up.
                self.did_cleanup.store(false, Ordering::Release);
                // Download the input files/folder and place them into the temp directory.
                self.metrics()
                    .download_to_directory
                    .wrap(download_to_directory(
                        &self.running_actions_manager.cas_store,
                        filesystem_store_pin,
                        &self.action_info.input_root_digest,
                        &self.work_directory,
                    ))
                    .await
            })
            .await?;
            command
        };
        {
            // Create all directories needed for our output paths. This is required by the bazel spec.
            let prepare_output_directories = |output_file| {
                let full_output_path = if command.working_directory.is_empty() {
                    format!("{}/{}", self.work_directory, output_file)
                } else {
                    format!(
                        "{}/{}/{}",
                        self.work_directory, command.working_directory, output_file
                    )
                };
                async move {
                    let full_parent_path = Path::new(&full_output_path)
                        .parent()
                        .err_tip(|| format!("Parent path for {full_output_path} has no parent"))?;
                    fs::create_dir_all(full_parent_path).await.err_tip(|| {
                        format!(
                            "Error creating output directory {} (file)",
                            full_parent_path.display()
                        )
                    })?;
                    Result::<(), Error>::Ok(())
                }
            };
            self.metrics()
                .prepare_output_files
                .wrap(try_join_all(
                    command.output_files.iter().map(prepare_output_directories),
                ))
                .await?;
            self.metrics()
                .prepare_output_paths
                .wrap(try_join_all(
                    command.output_paths.iter().map(prepare_output_directories),
                ))
                .await?;
        }
        debug!(?command, "Worker received command");
        {
            let mut state = self.state.lock();
            state.command_proto = Some(command);
            state.execution_metadata.input_fetch_completed_timestamp =
                (self.running_actions_manager.callbacks.now_fn)();
        }
        Ok(self)
    }
801  |  |  | 
802  | 13  |     async fn inner_execute(self: Arc<Self>) -> Result<Arc<Self>, Error>0  {  | 
803  | 13  |         let (command_proto, mut kill_channel_rx) = { | 
804  | 13  |             let mut state = self.state.lock();  | 
805  | 13  |             state.execution_metadata.execution_start_timestamp =  | 
806  | 13  |                 (self.running_actions_manager.callbacks.now_fn)();  | 
807  |  |             (  | 
808  | 13  |                 state  | 
809  | 13  |                     .command_proto  | 
810  | 13  |                     .take()  | 
811  | 13  |                     .err_tip(|| "Expected state to have command_proto in execute()")?0 ,  | 
812  | 13  |                 state  | 
813  | 13  |                     .kill_channel_rx  | 
814  | 13  |                     .take()  | 
815  | 13  |                     .err_tip(|| "Expected state to have kill_channel_rx in execute()")?0   | 
816  |  |                     // This is important as we may be killed at any point.  | 
817  | 13  |                     .fuse(),  | 
818  |  |             )  | 
819  |  |         };  | 
820  | 13  |         if command_proto.arguments.is_empty() {  Branch (820:12): [Folded - Ignored]
   Branch (820:12): [Folded - Ignored]
   Branch (820:12): [True: 0, False: 13]
  | 
821  | 0  |             return Err(make_input_err!("No arguments provided in Command proto")); | 
822  | 13  |         }  | 
823  | 13  |         let args: Vec<&OsStr> = if let Some(entrypoint0 ) = &self   Branch (823:40): [Folded - Ignored]
   Branch (823:40): [Folded - Ignored]
   Branch (823:40): [True: 0, False: 13]
  | 
824  | 13  |             .running_actions_manager  | 
825  | 13  |             .execution_configuration  | 
826  | 13  |             .entrypoint  | 
827  |  |         { | 
828  | 0  |             core::iter::once(entrypoint.as_ref())  | 
829  | 0  |                 .chain(command_proto.arguments.iter().map(AsRef::as_ref))  | 
830  | 0  |                 .collect()  | 
831  |  |         } else { | 
832  | 13  |             command_proto.arguments.iter().map(AsRef::as_ref).collect()  | 
833  |  |         };  | 
834  |  |         // TODO(palfrey): This should probably be in debug, but currently  | 
835  |  |         //                    that's too busy and we often rely on this to  | 
836  |  |         //                    figure out toolchain misconfiguration issues.  | 
837  |  |         //                    De-bloat the `debug` level by using the `trace`  | 
838  |  |         //                    level more effectively and adjust this.  | 
839  | 13  |         info!(?args, "Executing command",);  | 
840  | 13  |         let mut command_builder = process::Command::new(args[0]);  | 
841  | 13  |         command_builder  | 
842  | 13  |             .args(&args[1..])  | 
843  | 13  |             .kill_on_drop(true)  | 
844  | 13  |             .stdin(Stdio::null())  | 
845  | 13  |             .stdout(Stdio::piped())  | 
846  | 13  |             .stderr(Stdio::piped())  | 
847  | 13  |             .current_dir(format!(  | 
848  | 13  |                 "{}/{}", | 
849  | 13  |                 self.work_directory, command_proto.working_directory  | 
850  | 13  |             ))  | 
851  | 13  |             .env_clear();  | 
852  |  |  | 
853  | 13  |         let requested_timeout = if self.action_info.timeout.is_zero() {  Branch (853:36): [Folded - Ignored]
   Branch (853:36): [Folded - Ignored]
   Branch (853:36): [True: 11, False: 2]
  | 
854  | 11  |             self.running_actions_manager.max_action_timeout  | 
855  |  |         } else { | 
856  | 2  |             self.action_info.timeout  | 
857  |  |         };  | 
858  |  |  | 
859  | 13  |         let mut maybe_side_channel_file: Option<Cow<'_, OsStr>> = None;  | 
860  | 13  |         if let Some(additional_environment0 ) = &self   Branch (860:16): [Folded - Ignored]
   Branch (860:16): [Folded - Ignored]
   Branch (860:16): [True: 0, False: 13]
  | 
861  | 13  |             .running_actions_manager  | 
862  | 13  |             .execution_configuration  | 
863  | 13  |             .additional_environment  | 
864  |  |         { | 
865  | 0  |             for (name, source) in additional_environment { | 
866  | 0  |                 let value = match source { | 
867  | 0  |                     EnvironmentSource::Property(property) => self  | 
868  | 0  |                         .action_info  | 
869  | 0  |                         .platform_properties  | 
870  | 0  |                         .get(property)  | 
871  | 0  |                         .map_or_else(|| Cow::Borrowed(""), |v| Cow::Borrowed(v.as_str())), | 
872  | 0  |                     EnvironmentSource::Value(value) => Cow::Borrowed(value.as_str()),  | 
873  |  |                     EnvironmentSource::TimeoutMillis => { | 
874  | 0  |                         Cow::Owned(requested_timeout.as_millis().to_string())  | 
875  |  |                     }  | 
876  |  |                     EnvironmentSource::SideChannelFile => { | 
877  | 0  |                         let file_cow =  | 
878  | 0  |                             format!("{}/{}", self.action_directory, Uuid::new_v4().simple()); | 
879  | 0  |                         maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));  | 
880  | 0  |                         Cow::Owned(file_cow)  | 
881  |  |                     }  | 
882  |  |                     EnvironmentSource::ActionDirectory => { | 
883  | 0  |                         Cow::Borrowed(self.action_directory.as_str())  | 
884  |  |                     }  | 
885  |  |                 };  | 
886  | 0  |                 command_builder.env(name, value.as_ref());  | 
887  |  |             }  | 
888  | 13  |         }  | 
889  |  |  | 
890  |  |         #[cfg(target_family = "unix")]  | 
891  | 13  |         let envs = &command_proto.environment_variables;  | 
892  |  |         // If SystemRoot is not set on windows we set it to default. Failing to do  | 
893  |  |         // this causes all commands to fail.  | 
894  |  |         #[cfg(target_family = "windows")]  | 
895  |  |         let envs = { | 
896  |  |             let mut envs = command_proto.environment_variables.clone();  | 
897  |  |             if !envs.iter().any(|v| v.name.to_uppercase() == "SYSTEMROOT") { | 
898  |  |                 envs.push(  | 
899  |  |                     nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable { | 
900  |  |                         name: "SystemRoot".to_string(),  | 
901  |  |                         value: "C:\\Windows".to_string(),  | 
902  |  |                     },  | 
903  |  |                 );  | 
904  |  |             }  | 
905  |  |             if !envs.iter().any(|v| v.name.to_uppercase() == "PATH") { | 
906  |  |                 envs.push(  | 
907  |  |                     nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable { | 
908  |  |                         name: "PATH".to_string(),  | 
909  |  |                         value: "C:\\Windows\\System32".to_string(),  | 
910  |  |                     },  | 
911  |  |                 );  | 
912  |  |             }  | 
913  |  |             envs  | 
914  |  |         };  | 
915  | 26  |         for environment_variable13  in envs {  | 
916  | 13  |             command_builder.env(&environment_variable.name, &environment_variable.value);  | 
917  | 13  |         }  | 
918  |  |  | 
919  | 13  |         let mut child_process = command_builder  | 
920  | 13  |             .spawn()  | 
921  | 13  |             .err_tip(|| format!("Could not execute command {args:?}"0 ))?0 ;  | 
922  | 13  |         let mut stdout_reader = child_process  | 
923  | 13  |             .stdout  | 
924  | 13  |             .take()  | 
925  | 13  |             .err_tip(|| "Expected stdout to exist on command this should never happen")?0 ;  | 
926  | 13  |         let mut stderr_reader = child_process  | 
927  | 13  |             .stderr  | 
928  | 13  |             .take()  | 
929  | 13  |             .err_tip(|| "Expected stderr to exist on command this should never happen")?0 ;  | 
930  |  |  | 
931  | 13  |         let mut child_process_guard = guard(child_process, |mut child_process| {0   | 
932  | 0  |             if child_process.try_wait().is_ok_and(|res| res.is_some()) {  Branch (932:16): [Folded - Ignored]
   Branch (932:16): [Folded - Ignored]
   Branch (932:16): [True: 0, False: 0]
  | 
933  |  |                 // The child already exited, probably a timeout or kill operation.  | 
934  | 0  |                 return;  | 
935  | 0  |             }  | 
936  | 0  |             error!(  | 
937  | 0  |                 "Child process was not cleaned up before dropping the call to execute(), killing in background spawn."  | 
938  |  |             );  | 
939  | 0  |             background_spawn!("running_actions_manager_kill_child_process", async move { | 
940  | 0  |                 child_process.kill().await  | 
941  | 0  |             });  | 
942  | 0  |         });  | 
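// [Editor's sketch] The kill-on-drop pattern above, reduced to a standalone
// example. Assumes the `scopeguard` and `tokio` crates; names are
// illustrative, not the project's API.
use scopeguard::{ScopeGuard, guard};
use tokio::process::Command;

async fn run_with_kill_guard() -> std::io::Result<()> {
    let child = Command::new("sleep").arg("5").spawn()?;
    let child_guard = guard(child, |mut child| {
        // Runs only if the guard is dropped while still armed (e.g. on an
        // early return): kill the orphaned child from a background task.
        tokio::spawn(async move {
            let _ = child.kill().await;
        });
    });
    // Happy path: defuse the guard so the cleanup closure never runs.
    let mut child = ScopeGuard::into_inner(child_guard);
    child.wait().await?;
    Ok(())
}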
943  |  |  | 
944  | 13  |         let all_stdout_fut = spawn!("stdout_reader", async move { | 
945  | 13  |             let mut all_stdout = BytesMut::new();  | 
946  |  |             loop { | 
947  | 16  |                 let sz = stdout_reader  | 
948  | 16  |                     .read_buf(&mut all_stdout)  | 
949  | 16  |                     .await  | 
950  | 15  |                     .err_tip(|| "Error reading stdout stream")?;  | 
951  | 15  |                 if sz == 0 {  | 
   Branch (951:20): [True: 12, False: 3]
952  | 12  |                     break; // EOF.  | 
953  | 3  |                 }  | 
954  |  |             }  | 
955  | 12  |             Result::<Bytes, Error>::Ok(all_stdout.freeze())  | 
956  | 12  |         });  | 
957  | 13  |         let all_stderr_fut = spawn!("stderr_reader", async move { | 
958  | 13  |             let mut all_stderr = BytesMut::new();  | 
959  |  |             loop { | 
960  | 16  |                 let sz = stderr_reader  | 
961  | 16  |                     .read_buf(&mut all_stderr)  | 
962  | 16  |                     .await  | 
963  | 16  |                     .err_tip(|| "Error reading stderr stream")?;  | 
964  | 16  |                 if sz == 0 {  | 
   Branch (964:20): [True: 13, False: 3]
965  | 13  |                     break; // EOF.  | 
966  | 3  |                 }  | 
967  |  |             }  | 
968  | 13  |             Result::<Bytes, Error>::Ok(all_stderr.freeze())  | 
969  | 13  |         });  | 
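// [Editor's sketch] The stdout/stderr drain loops above in isolation. Assumes
// tokio's AsyncReadExt and the bytes crate; `read_buf` appends to the buffer
// and returns Ok(0) at EOF.
use bytes::{Bytes, BytesMut};
use tokio::io::{AsyncRead, AsyncReadExt};

async fn drain_to_eof<R: AsyncRead + Unpin>(mut reader: R) -> std::io::Result<Bytes> {
    let mut buf = BytesMut::new();
    loop {
        // A zero-byte read signals EOF; anything else grew `buf`.
        if reader.read_buf(&mut buf).await? == 0 {
            break;
        }
    }
    Ok(buf.freeze())
}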
970  | 13  |         let mut killed_action = false;  | 
971  |  |  | 
972  | 13  |         let timer = self.metrics().child_process.begin_timer();  | 
973  | 13  |         let mut sleep_fut = (self.running_actions_manager.callbacks.sleep_fn)(self.timeout).fuse();  | 
974  |  |         loop { | 
975  | 17  |             tokio::select! { | 
976  | 17  |                 () = &mut sleep_fut => { | 
977  | 2  |                     self.running_actions_manager.metrics.task_timeouts.inc();  | 
978  | 2  |                     killed_action = true;  | 
979  | 2  |                     if let Err(err) = child_process_guard.kill().await {  | 
   Branch (979:28): [True: 0, False: 2]
980  | 0  |                         error!(  | 
981  |  |                             ?err,  | 
982  | 0  |                             "Could not kill process in RunningActionsManager for action timeout",  | 
983  |  |                         );  | 
984  | 2  |                     }  | 
985  | 2  |                     { | 
986  | 2  |                         let mut state = self.state.lock();  | 
987  | 2  |                         state.error = Error::merge_option(state.error.take(), Some(Error::new(  | 
988  | 2  |                             Code::DeadlineExceeded,  | 
989  | 2  |                             format!(  | 
990  | 2  |                                 "Command '{}' timed out after {} seconds", | 
991  | 2  |                                 args.join(OsStr::new(" ")).to_string_lossy(), | 
992  | 2  |                                 self.action_info.timeout.as_secs_f32()  | 
993  | 2  |                             )  | 
994  | 2  |                         )));  | 
995  | 2  |                     }  | 
996  |  |                 },  | 
997  | 17  |                 maybe_exit_status = child_process_guard.wait() => {  | 
998  |  |                     // Defuse our guard so it does not try to clean up and emit needless logs.  | 
999  | 13  |                     drop(ScopeGuard::<_, _>::into_inner(child_process_guard));  | 
1000  | 13  |                     let exit_status = maybe_exit_status.err_tip(|| "Failed to collect exit code of process")?;  | 
1001  |  |                     // TODO(palfrey) We should implement stderr/stdout streaming to client here.  | 
1002  |  |                     // If we get killed before the stream is started, then these will lock up.  | 
1003  |  |                     // TODO(palfrey) There is a significant bug here. If we kill the action and the action creates  | 
1004  |  |                     // child processes, it can create zombies. See: https://github.com/tracemachina/nativelink/issues/225  | 
1005  | 13  |                     let (stdout, stderr) = if killed_action {  | 
   Branch (1005:47): [True: 4, False: 9]
1006  | 4  |                         drop(timer);  | 
1007  | 4  |                         (Bytes::new(), Bytes::new())  | 
1008  |  |                     } else { | 
1009  | 9  |                         timer.measure();  | 
1010  | 9  |                         let (maybe_all_stdout, maybe_all_stderr) = tokio::join!(all_stdout_fut, all_stderr_fut);  | 
1011  |  |                         (  | 
1012  | 9  |                             maybe_all_stdout.err_tip(|| "Internal error reading from stdout of worker task")??,  | 
1013  | 9  |                             maybe_all_stderr.err_tip(|| "Internal error reading from stderr of worker task")??  | 
1014  |  |                         )  | 
1015  |  |                     };  | 
1016  |  |  | 
1017  | 13  |                     let exit_code = exit_status.code().map_or(EXIT_CODE_FOR_SIGNAL, |exit_code| {  | 
1018  | 9  |                         if exit_code == 0 {  | 
   Branch (1018:28): [True: 8, False: 1]
1019  | 8  |                             self.metrics().child_process_success_error_code.inc();  | 
1020  | 8  |                         } else { | 
1021  | 1  |                             self.metrics().child_process_failure_error_code.inc();  | 
1022  | 1  |                         }  | 
1023  | 9  |                         exit_code  | 
1024  | 9  |                     });  | 
1025  |  |  | 
1026  | 13  |                     let maybe_error_override = if let Some(side_channel_file) = maybe_side_channel_file {  | 
   Branch (1026:55): [True: 0, False: 13]
1027  | 0  |                         process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await  | 
1028  | 0  |                         .err_tip(|| format!("Error processing side channel file: {}", side_channel_file.display()))? | 
1029  |  |                     } else { | 
1030  | 13  |                         None  | 
1031  |  |                     };  | 
1032  | 13  |                     { | 
1033  | 13  |                         let mut state = self.state.lock();  | 
1034  | 13  |                         state.error = Error::merge_option(state.error.take(), maybe_error_override);  | 
1035  | 13  |  | 
1036  | 13  |                         state.command_proto = Some(command_proto);  | 
1037  | 13  |                         state.execution_result = Some(RunningActionImplExecutionResult{ | 
1038  | 13  |                             stdout,  | 
1039  | 13  |                             stderr,  | 
1040  | 13  |                             exit_code,  | 
1041  | 13  |                         });  | 
1042  | 13  |                         state.execution_metadata.execution_completed_timestamp = (self.running_actions_manager.callbacks.now_fn)();  | 
1043  | 13  |                     }  | 
1044  | 13  |                     return Ok(self);  | 
1045  |  |                 },  | 
1046  | 17  |                 _ = &mut kill_channel_rx => { | 
1047  | 2  |                     killed_action = true;  | 
1048  | 2  |                     if let Err(err) = child_process_guard.kill().await {  | 
   Branch (1048:28): [True: 0, False: 2]
1049  | 0  |                         error!(  | 
1050  | 0  |                             operation_id = ?self.operation_id,  | 
1051  |  |                             ?err,  | 
1052  | 0  |                             "Could not kill process",  | 
1053  |  |                         );  | 
1054  | 2  |                     }  | 
1055  | 2  |                     { | 
1056  | 2  |                         let mut state = self.state.lock();  | 
1057  | 2  |                         state.error = Error::merge_option(state.error.take(), Some(Error::new(  | 
1058  | 2  |                             Code::Aborted,  | 
1059  | 2  |                             format!(  | 
1060  | 2  |                                 "Command '{}' was killed by scheduler", | 
1061  | 2  |                                 args.join(OsStr::new(" ")).to_string_lossy() | 
1062  | 2  |                             )  | 
1063  | 2  |                         )));  | 
1064  | 2  |                     }  | 
1065  |  |                 },  | 
1066  |  |             }  | 
1067  |  |         }  | 
1068  |  |         // Unreachable.  | 
1069  | 13  |     }  | 
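// [Editor's sketch] The shape of the select loop above: race the child's exit
// against a timeout and an external kill signal. Tokio-only and simplified to
// a single select; tokio's Child::kill both signals and reaps the process.
use tokio::process::Child;
use tokio::sync::oneshot;
use tokio::time::{Duration, sleep};

async fn supervise(mut child: Child, kill_rx: oneshot::Receiver<()>) -> std::io::Result<Option<i32>> {
    tokio::select! {
        // Normal exit: report the exit code (None if killed by a signal).
        status = child.wait() => return Ok(status?.code()),
        // Timeout: fall through to the kill below.
        () = sleep(Duration::from_secs(30)) => {}
        // Kill requested (e.g. by a scheduler): fall through as well.
        _ = kill_rx => {}
    }
    child.kill().await?;
    Ok(None)
}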
1070  |  |  | 
1071  | 11  |     async fn inner_upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> { | 
1072  |  |         enum OutputType { | 
1073  |  |             None,  | 
1074  |  |             File(FileInfo),  | 
1075  |  |             Directory(DirectoryInfo),  | 
1076  |  |             FileSymlink(SymlinkInfo),  | 
1077  |  |             DirectorySymlink(SymlinkInfo),  | 
1078  |  |         }  | 
1079  |  |  | 
1080  | 11  |         debug!("Worker uploading results",); | 
1081  | 11  |         let (mut command_proto, execution_result, mut execution_metadata) = { | 
1082  | 11  |             let mut state = self.state.lock();  | 
1083  | 11  |             state.execution_metadata.output_upload_start_timestamp =  | 
1084  | 11  |                 (self.running_actions_manager.callbacks.now_fn)();  | 
1085  |  |             (  | 
1086  | 11  |                 state  | 
1087  | 11  |                     .command_proto  | 
1088  | 11  |                     .take()  | 
1089  | 11  |                     .err_tip(|| "Expected state to have command_proto in execute()")?,  | 
1090  | 11  |                 state  | 
1091  | 11  |                     .execution_result  | 
1092  | 11  |                     .take()  | 
1093  | 11  |                     .err_tip(|| "Execution result does not exist at upload_results stage")?,  | 
1094  | 11  |                 state.execution_metadata.clone(),  | 
1095  |  |             )  | 
1096  |  |         };  | 
1097  | 11  |         let cas_store = self.running_actions_manager.cas_store.as_ref();  | 
1098  | 11  |         let hasher = self.action_info.unique_qualifier.digest_function();  | 
1099  |  |  | 
1100  | 11  |         let mut output_path_futures = FuturesUnordered::new();  | 
1101  | 11  |         let mut output_paths = command_proto.output_paths;  | 
1102  | 11  |         if output_paths.is_empty() {  | 
   Branch (1102:12): [True: 6, False: 5]
1103  | 6  |             output_paths  | 
1104  | 6  |                 .reserve(command_proto.output_files.len() + command_proto.output_directories.len());  | 
1105  | 6  |             output_paths.append(&mut command_proto.output_files);  | 
1106  | 6  |             output_paths.append(&mut command_proto.output_directories);  | 
1107  | 6  |         }  | 
1108  | 11  |         let digest_uploaders = Arc::new(Mutex::new(HashMap::new()));  | 
1109  | 18  |         for entry in output_paths {  | 
1110  | 7  |             let full_path = OsString::from(if command_proto.working_directory.is_empty() {  | 
   Branch (1110:47): [True: 0, False: 7]
1111  | 0  |                 format!("{}/{}", self.work_directory, entry) | 
1112  |  |             } else { | 
1113  | 7  |                 format!(  | 
1114  | 7  |                     "{}/{}/{}", | 
1115  | 7  |                     self.work_directory, command_proto.working_directory, entry  | 
1116  |  |                 )  | 
1117  |  |             });  | 
1118  | 7  |             let work_directory = &self.work_directory;  | 
1119  | 7  |             let digest_uploaders = digest_uploaders.clone();  | 
1120  | 7  |             output_path_futures.push(async move { | 
1121  | 3  |                 let metadata = { | 
1122  | 7  |                     let metadata = match fs::symlink_metadata(&full_path).await { | 
1123  | 7  |                         Ok(file) => file,  | 
1124  | 0  |                         Err(e) => { | 
1125  | 0  |                             if e.code == Code::NotFound {  | 
   Branch (1125:32): [True: 0, False: 0]
1126  |  |                                 // In the event our output does not exist, according to the bazel remote  | 
1127  |  |                                 // execution spec, we simply ignore it and continue.  | 
1128  | 0  |                                 return Result::<OutputType, Error>::Ok(OutputType::None);  | 
1129  | 0  |                             }  | 
1130  | 0  |                             return Err(e).err_tip(|| { | 
1131  | 0  |                                 format!("Could not open file {}", full_path.display()) | 
1132  | 0  |                             });  | 
1133  |  |                         }  | 
1134  |  |                     };  | 
1135  |  |  | 
1136  | 7  |                     if metadata.is_file() {  | 
   Branch (1136:24): [True: 4, False: 3]
1137  |  |                         return Ok(OutputType::File(  | 
1138  | 4  |                             upload_file(  | 
1139  | 4  |                                 cas_store.as_pin(),  | 
1140  | 4  |                                 &full_path,  | 
1141  | 4  |                                 hasher,  | 
1142  | 4  |                                 metadata,  | 
1143  | 4  |                                 digest_uploaders,  | 
1144  | 4  |                             )  | 
1145  | 4  |                             .await  | 
1146  | 4  |                             .map(|mut file_info| { | 
1147  | 4  |                                 file_info.name_or_path = NameOrPath::Path(entry);  | 
1148  | 4  |                                 file_info  | 
1149  | 4  |                             })  | 
1150  | 4  |                             .err_tip(|| format!("Uploading file {}", full_path.display()))?,  | 
1151  |  |                         ));  | 
1152  | 3  |                     }  | 
1153  | 3  |                     metadata  | 
1154  |  |                 };  | 
1155  | 3  |                 if metadata.is_dir() {  | 
   Branch (1155:20): [True: 2, False: 1]
1156  |  |                     Ok(OutputType::Directory(  | 
1157  | 2  |                         upload_directory(  | 
1158  | 2  |                             cas_store.as_pin(),  | 
1159  | 2  |                             &full_path,  | 
1160  | 2  |                             work_directory,  | 
1161  | 2  |                             hasher,  | 
1162  | 2  |                             digest_uploaders,  | 
1163  |  |                         )  | 
1164  | 2  |                         .and_then(|(root_dir, children)| async move { | 
1165  | 2  |                             let tree = ProtoTree { | 
1166  | 2  |                                 root: Some(root_dir),  | 
1167  | 2  |                                 children: children.into(),  | 
1168  | 2  |                             };  | 
1169  | 2  |                             let tree_digest = serialize_and_upload_message(  | 
1170  | 2  |                                 &tree,  | 
1171  | 2  |                                 cas_store.as_pin(),  | 
1172  | 2  |                                 &mut hasher.hasher(),  | 
1173  | 2  |                             )  | 
1174  | 2  |                             .await  | 
1175  | 2  |                             .err_tip(|| format!("While processing {entry}"))?;  | 
1176  | 2  |                             Ok(DirectoryInfo { | 
1177  | 2  |                                 path: entry,  | 
1178  | 2  |                                 tree_digest,  | 
1179  | 2  |                             })  | 
1180  | 4  |                         })  | 
1181  | 2  |                         .await  | 
1182  | 2  |                         .err_tip(|| format!("Uploading directory {}", full_path.display()))?,  | 
1183  |  |                     ))  | 
1184  | 1  |                 } else if metadata.is_symlink() {  | 
   Branch (1184:27): [True: 1, False: 0]
1185  | 1  |                     let output_symlink = upload_symlink(&full_path, work_directory)  | 
1186  | 1  |                         .await  | 
1187  | 1  |                         .map(|mut symlink_info| { | 
1188  | 1  |                             symlink_info.name_or_path = NameOrPath::Path(entry);  | 
1189  | 1  |                             symlink_info  | 
1190  | 1  |                         })  | 
1191  | 1  |                         .err_tip(|| format!("Uploading symlink {}", full_path.display()))?;  | 
1192  | 1  |                     match fs::metadata(&full_path).await {  | 
1193  | 1  |                         Ok(metadata) => {  | 
1194  | 1  |                             if metadata.is_dir() {  | 
   Branch (1194:32): [True: 0, False: 1]
1195  | 0  |                                 return Ok(OutputType::DirectorySymlink(output_symlink));  | 
1196  | 1  |                             }  | 
1197  |  |                             // Note: If it's anything but a directory, we record it as a file symlink.  | 
1198  | 1  |                             return Ok(OutputType::FileSymlink(output_symlink));  | 
1199  |  |                         }  | 
1200  | 0  |                         Err(e) => { | 
1201  | 0  |                             if e.code != Code::NotFound {  | 
   Branch (1201:32): [True: 0, False: 0]
1202  | 0  |                                 return Err(e).err_tip(|| { | 
1203  | 0  |                                     format!(  | 
1204  | 0  |                                         "While querying target symlink metadata for {}", | 
1205  | 0  |                                         full_path.display()  | 
1206  |  |                                     )  | 
1207  | 0  |                                 });  | 
1208  | 0  |                             }  | 
1209  |  |                             // If the file doesn't exist, we consider it a file. Even though the  | 
1210  |  |                             // file doesn't exist we still need to populate an entry.  | 
1211  | 0  |                             return Ok(OutputType::FileSymlink(output_symlink));  | 
1212  |  |                         }  | 
1213  |  |                     }  | 
1214  |  |                 } else { | 
1215  | 0  |                     Err(make_err!(  | 
1216  | 0  |                         Code::Internal,  | 
1217  | 0  |                         "{full_path:?} was not a file, folder or symlink. Must be one.", | 
1218  | 0  |                     ))  | 
1219  |  |                 }  | 
1220  | 7  |             });  | 
1221  |  |         }  | 
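// [Editor's sketch] The classification order used by the futures pushed above,
// condensed. `symlink_metadata` does not follow links, so a symlink never
// reports itself as a file or directory here. Std-only; illustrative.
use std::path::Path;

fn classify_output(path: &Path) -> std::io::Result<&'static str> {
    let metadata = std::fs::symlink_metadata(path)?;
    Ok(if metadata.is_file() {
        "file"
    } else if metadata.is_dir() {
        "directory"
    } else if metadata.is_symlink() {
        "symlink"
    } else {
        "other" // the code above reports this case as an internal error
    })
}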
1222  | 11  |         let mut output_files = vec![];  | 
1223  | 11  |         let mut output_folders = vec![];  | 
1224  | 11  |         let mut output_directory_symlinks = vec![];  | 
1225  | 11  |         let mut output_file_symlinks = vec![];  | 
1226  |  |  | 
1227  | 11  |         if execution_result.exit_code != 0 {  | 
   Branch (1227:12): [True: 5, False: 6]
1228  | 5  |             let stdout = core::str::from_utf8(&execution_result.stdout).unwrap_or("<no-utf8>"); | 
1229  | 5  |             let stderr = core::str::from_utf8(&execution_result.stderr).unwrap_or("<no-utf8>"); | 
1230  | 5  |             error!(  | 
1231  |  |                 exit_code = ?execution_result.exit_code,  | 
1232  | 5  |                 stdout = ?stdout[..min(stdout.len(), 1000)],  | 
1233  | 5  |                 stderr = ?stderr[..min(stderr.len(), 1000)],  | 
1234  | 5  |                 "Command returned non-zero exit code",  | 
1235  |  |             );  | 
1236  | 6  |         }  | 
1237  |  |  | 
1238  | 11  |         let stdout_digest_fut = self.metrics().upload_stdout.wrap(async { | 
1239  | 11  |             let data = execution_result.stdout;  | 
1240  | 11  |             let digest = compute_buf_digest(&data, &mut hasher.hasher());  | 
1241  | 11  |             cas_store  | 
1242  | 11  |                 .update_oneshot(digest, data)  | 
1243  | 11  |                 .await  | 
1244  | 11  |                 .err_tip(|| "Uploading stdout")?;  | 
1245  | 11  |             Result::<DigestInfo, Error>::Ok(digest)  | 
1246  | 11  |         });  | 
1247  | 11  |         let stderr_digest_fut = self.metrics().upload_stderr.wrap(async { | 
1248  | 11  |             let data = execution_result.stderr;  | 
1249  | 11  |             let digest = compute_buf_digest(&data, &mut hasher.hasher());  | 
1250  | 11  |             cas_store  | 
1251  | 11  |                 .update_oneshot(digest, data)  | 
1252  | 11  |                 .await  | 
1253  | 11  |                 .err_tip(|| "Uploading stderr")?;  | 
1254  | 11  |             Result::<DigestInfo, Error>::Ok(digest)  | 
1255  | 11  |         });  | 
1256  |  |  | 
1257  | 11  |         let upload_result = futures::try_join!(stdout_digest_fut, stderr_digest_fut, async { | 
1258  | 18  |             while let Some(output_type) = output_path_futures.try_next().await? {  | 
   Branch (1258:23): [True: 7, False: 11]
1259  | 7  |                 match output_type { | 
1260  | 4  |                     OutputType::File(output_file) => output_files.push(output_file),  | 
1261  | 2  |                     OutputType::Directory(output_folder) => output_folders.push(output_folder),  | 
1262  | 1  |                     OutputType::FileSymlink(output_symlink) => { | 
1263  | 1  |                         output_file_symlinks.push(output_symlink);  | 
1264  | 1  |                     }  | 
1265  | 0  |                     OutputType::DirectorySymlink(output_symlink) => { | 
1266  | 0  |                         output_directory_symlinks.push(output_symlink);  | 
1267  | 0  |                     }  | 
1268  | 0  |                     OutputType::None => { /* Safe to ignore */ } | 
1269  |  |                 }  | 
1270  |  |             }  | 
1271  | 11  |             Ok(())  | 
1272  | 11  |         });  | 
1273  | 11  |         drop(output_path_futures);  | 
1274  | 11  |         let (stdout_digest, stderr_digest) = match upload_result { | 
1275  | 11  |             Ok((stdout_digest, stderr_digest, ())) => (stdout_digest, stderr_digest),  | 
1276  | 0  |             Err(e) => return Err(e).err_tip(|| "Error while uploading results"),  | 
1277  |  |         };  | 
1278  |  |  | 
1279  | 11  |         execution_metadata.output_upload_completed_timestamp =  | 
1280  | 11  |             (self.running_actions_manager.callbacks.now_fn)();  | 
1281  | 11  |         output_files.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));  | 
1282  | 11  |         output_folders.sort_unstable_by(|a, b| a.path.cmp(&b.path));  | 
1283  | 11  |         output_file_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));  | 
1284  | 11  |         output_directory_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));  | 
1285  | 11  |         { | 
1286  | 11  |             let mut state = self.state.lock();  | 
1287  | 11  |             execution_metadata.worker_completed_timestamp =  | 
1288  | 11  |                 (self.running_actions_manager.callbacks.now_fn)();  | 
1289  | 11  |             state.action_result = Some(ActionResult { | 
1290  | 11  |                 output_files,  | 
1291  | 11  |                 output_folders,  | 
1292  | 11  |                 output_directory_symlinks,  | 
1293  | 11  |                 output_file_symlinks,  | 
1294  | 11  |                 exit_code: execution_result.exit_code,  | 
1295  | 11  |                 stdout_digest,  | 
1296  | 11  |                 stderr_digest,  | 
1297  | 11  |                 execution_metadata,  | 
1298  | 11  |                 server_logs: HashMap::default(), // TODO(palfrey) Not implemented.  | 
1299  | 11  |                 error: state.error.clone(),  | 
1300  | 11  |                 message: String::new(), // Will be filled in on cache_action_result if needed.  | 
1301  | 11  |             });  | 
1302  | 11  |         }  | 
1303  | 11  |         Ok(self)  | 
1304  | 11  |     }  | 
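// [Editor's sketch] The concurrency shape of inner_upload_results above:
// per-output futures drain through a FuturesUnordered while the stdout/stderr
// uploads run alongside them under try_join!. Stand-in bodies, not the real
// store API.
use futures::stream::{FuturesUnordered, TryStreamExt};

async fn upload_all(paths: Vec<String>) -> Result<Vec<String>, String> {
    let mut output_futs: FuturesUnordered<_> = paths
        .into_iter()
        .map(|p| async move { Ok::<_, String>(p) }) // stand-in for an upload
        .collect();
    let (_stdout_digest, _stderr_digest, names) = futures::try_join!(
        async { Ok::<_, String>("stdout-digest") }, // stand-in
        async { Ok::<_, String>("stderr-digest") }, // stand-in
        async {
            let mut names = Vec::new();
            while let Some(name) = output_futs.try_next().await? {
                names.push(name);
            }
            Ok(names)
        },
    )?;
    Ok(names)
}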
1305  |  |  | 
1306  | 11  |     async fn inner_get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> { | 
1307  | 11  |         let mut state = self.state.lock();  | 
1308  | 11  |         state  | 
1309  | 11  |             .action_result  | 
1310  | 11  |             .take()  | 
1311  | 11  |             .err_tip(|| "Expected action_result to exist in get_finished_result")  | 
1312  | 11  |     }  | 
1313  |  | }  | 
1314  |  |  | 
1315  |  | impl Drop for RunningActionImpl { | 
1316  | 18  |     fn drop(&mut self) { | 
1317  | 18  |         if self.did_cleanup.load(Ordering::Acquire) {  | 
   Branch (1317:12): [True: 18, False: 0]
1318  | 18  |             if self.has_manager_entry.load(Ordering::Acquire) {  | 
   Branch (1318:16): [True: 1, False: 17]
1319  | 1  |                 drop(  | 
1320  | 1  |                     self.running_actions_manager  | 
1321  | 1  |                         .cleanup_action(&self.operation_id),  | 
1322  | 1  |                 );  | 
1323  | 17  |             }  | 
1324  | 18  |             return;  | 
1325  | 0  |         }  | 
1326  | 0  |         let operation_id = self.operation_id.clone();  | 
1327  | 0  |         error!(  | 
1328  |  |             ?operation_id,  | 
1329  | 0  |             "RunningActionImpl did not cleanup. This is a violation of the requirements, will attempt to do it in the background."  | 
1330  |  |         );  | 
1331  | 0  |         let running_actions_manager = self.running_actions_manager.clone();  | 
1332  | 0  |         let action_directory = self.action_directory.clone();  | 
1333  | 0  |         background_spawn!("running_action_impl_drop", async move { | 
1334  | 0  |             let Err(err) =  | 
   Branch (1334:17): [True: 0, False: 0]
1335  | 0  |                 do_cleanup(&running_actions_manager, &operation_id, &action_directory).await  | 
1336  |  |             else { | 
1337  | 0  |                 return;  | 
1338  |  |             };  | 
1339  | 0  |             error!(  | 
1340  |  |                 ?operation_id,  | 
1341  |  |                 ?action_directory,  | 
1342  |  |                 ?err,  | 
1343  | 0  |                 "Error cleaning up action"  | 
1344  |  |             );  | 
1345  | 0  |         });  | 
1346  | 18  |     }  | 
1347  |  | }  | 
1348  |  |  | 
1349  |  | impl RunningAction for RunningActionImpl { | 
1350  | 0  |     fn get_operation_id(&self) -> &OperationId { | 
1351  | 0  |         &self.operation_id  | 
1352  | 0  |     }  | 
1353  |  |  | 
1354  | 15  |     async fn prepare_action(self: Arc<Self>) -> Result<Arc<Self>, Error> {  | 
1355  | 15  |         self.metrics()  | 
1356  | 15  |             .clone()  | 
1357  | 15  |             .prepare_action  | 
1358  | 15  |             .wrap(Self::inner_prepare_action(self))  | 
1359  | 15  |             .await  | 
1360  | 15  |     }  | 
1361  |  |  | 
1362  | 13  |     async fn execute(self: Arc<Self>) -> Result<Arc<Self>, Error> {  | 
1363  | 13  |         self.metrics()  | 
1364  | 13  |             .clone()  | 
1365  | 13  |             .execute  | 
1366  | 13  |             .wrap(Self::inner_execute(self))  | 
1367  | 13  |             .await  | 
1368  | 13  |     }  | 
1369  |  |  | 
1370  | 11  |     async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> { | 
1371  | 11  |         self.metrics()  | 
1372  | 11  |             .clone()  | 
1373  | 11  |             .upload_results  | 
1374  | 11  |             .wrap(Self::inner_upload_results(self))  | 
1375  | 11  |             .await  | 
1376  | 11  |     }  | 
1377  |  |  | 
1378  | 17  |     async fn cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> { | 
1379  | 17  |         self.metrics()  | 
1380  | 17  |             .clone()  | 
1381  | 17  |             .cleanup  | 
1382  | 17  |             .wrap(async move { | 
1383  | 17  |                 let result = do_cleanup(  | 
1384  | 17  |                     &self.running_actions_manager,  | 
1385  | 17  |                     &self.operation_id,  | 
1386  | 17  |                     &self.action_directory,  | 
1387  | 17  |                 )  | 
1388  | 17  |                 .await;  | 
1389  | 17  |                 self.has_manager_entry.store(false, Ordering::Release);  | 
1390  | 17  |                 self.did_cleanup.store(true, Ordering::Release);  | 
1391  | 17  |                 result.map(move |()| self)  | 
1392  | 17  |             })  | 
1393  | 17  |             .await  | 
1394  | 17  |     }  | 
1395  |  |  | 
1396  | 11  |     async fn get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error> {  | 
1397  | 11  |         self.metrics()  | 
1398  | 11  |             .clone()  | 
1399  | 11  |             .get_finished_result  | 
1400  | 11  |             .wrap(Self::inner_get_finished_result(self))  | 
1401  | 11  |             .await  | 
1402  | 11  |     }  | 
1403  |  |  | 
1404  | 0  |     fn get_work_directory(&self) -> &String { | 
1405  | 0  |         &self.work_directory  | 
1406  | 0  |     }  | 
1407  |  | }  | 
1408  |  |  | 
1409  |  | pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static { | 
1410  |  |     type RunningAction: RunningAction;  | 
1411  |  |  | 
1412  |  |     fn create_and_add_action(  | 
1413  |  |         self: &Arc<Self>,  | 
1414  |  |         worker_id: String,  | 
1415  |  |         start_execute: StartExecute,  | 
1416  |  |     ) -> impl Future<Output = Result<Arc<Self::RunningAction>, Error>> + Send;  | 
1417  |  |  | 
1418  |  |     fn cache_action_result(  | 
1419  |  |         &self,  | 
1420  |  |         action_digest: DigestInfo,  | 
1421  |  |         action_result: &mut ActionResult,  | 
1422  |  |         hasher: DigestHasherFunc,  | 
1423  |  |     ) -> impl Future<Output = Result<(), Error>> + Send;  | 
1424  |  |  | 
1425  |  |     fn kill_all(&self) -> impl Future<Output = ()> + Send;  | 
1426  |  |  | 
1427  |  |     fn kill_operation(  | 
1428  |  |         &self,  | 
1429  |  |         operation_id: &OperationId,  | 
1430  |  |     ) -> impl Future<Output = Result<(), Error>> + Send;  | 
1431  |  |  | 
1432  |  |     fn metrics(&self) -> &Arc<Metrics>;  | 
1433  |  | }  | 
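// [Editor's sketch] How an implementation of this trait is intended to be
// driven, inferred from the method surface above and the RunningAction impl
// earlier in this file; happy-path only, with error handling simplified
// (real callers must also clean up when an earlier stage fails).
async fn drive<M: RunningActionsManager>(
    manager: &Arc<M>,
    worker_id: String,
    start_execute: StartExecute,
) -> Result<ActionResult, Error> {
    let action = manager
        .create_and_add_action(worker_id, start_execute)
        .await?;
    let action = action.prepare_action().await?;
    let action = action.execute().await?;
    let action = action.upload_results().await?;
    let result = action.clone().get_finished_result().await;
    // Cleanup runs regardless of whether the result could be collected.
    let _ = action.cleanup().await;
    result
}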
1434  |  |  | 
1435  |  | /// A function to get the current system time, used to allow mocking for tests  | 
1436  |  | type NowFn = fn() -> SystemTime;  | 
1437  |  | type SleepFn = fn(Duration) -> BoxFuture<'static, ()>;  | 
1438  |  |  | 
1439  |  | /// Functions that may be injected for testing purposes; during standard control  | 
1440  |  | /// flow they are supplied by the `new` function.  | 
1441  |  | #[derive(Clone, Copy)]  | 
1442  |  | pub struct Callbacks { | 
1443  |  |     /// A function that gets the current time.  | 
1444  |  |     pub now_fn: NowFn,  | 
1445  |  |     /// A function that sleeps for a given Duration.  | 
1446  |  |     pub sleep_fn: SleepFn,  | 
1447  |  | }  | 
1448  |  |  | 
1449  |  | impl Debug for Callbacks { | 
1450  | 0  |     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | 
1451  | 0  |         f.debug_struct("Callbacks").finish_non_exhaustive() | 
1452  | 0  |     }  | 
1453  |  | }  | 
1454  |  |  | 
1455  |  | /// The set of additional information for executing an action over and above  | 
1456  |  | /// those given in the `ActionInfo` passed to the worker.  This allows  | 
1457  |  | /// modification of the action for execution on this particular worker.  This  | 
1458  |  | /// may be used to run the action with a particular set of additional  | 
1459  |  | /// environment variables, or perhaps configure it to execute within a  | 
1460  |  | /// container.  | 
1461  |  | #[derive(Debug, Default)]  | 
1462  |  | pub struct ExecutionConfiguration { | 
1463  |  |     /// If set, will be executed instead of the first argument passed in the  | 
1464  |  |     /// `ActionInfo` with all of the arguments in the `ActionInfo` passed as  | 
1465  |  |     /// arguments to this command.  | 
1466  |  |     pub entrypoint: Option<String>,  | 
1467  |  |     /// The only environment variables that will be specified when the command  | 
1468  |  |     /// executes other than those in the `ActionInfo`.  On Windows, `SystemRoot`  | 
1469  |  |     /// and PATH are also assigned (see `inner_execute`).  | 
1470  |  |     pub additional_environment: Option<HashMap<String, EnvironmentSource>>,  | 
1471  |  | }  | 
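// [Editor's sketch] One plausible reading of the entrypoint rewrite described
// above: the entrypoint becomes argv[0] and every original argument follows
// it. Hypothetical helper, not the project's implementation.
fn apply_entrypoint(entrypoint: Option<&str>, args: &[String]) -> Vec<String> {
    match entrypoint {
        Some(entrypoint) => core::iter::once(entrypoint.to_string())
            .chain(args.iter().cloned())
            .collect(),
        None => args.to_vec(),
    }
}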
1472  |  |  | 
1473  |  | #[derive(Debug)]  | 
1474  |  | struct UploadActionResults { | 
1475  |  |     upload_ac_results_strategy: UploadCacheResultsStrategy,  | 
1476  |  |     upload_historical_results_strategy: UploadCacheResultsStrategy,  | 
1477  |  |     ac_store: Option<Store>,  | 
1478  |  |     historical_store: Store,  | 
1479  |  |     success_message_template: Template,  | 
1480  |  |     failure_message_template: Template,  | 
1481  |  | }  | 
1482  |  |  | 
1483  |  | impl UploadActionResults { | 
1484  | 26  |     fn new(  | 
1485  | 26  |         config: &UploadActionResultConfig,  | 
1486  | 26  |         ac_store: Option<Store>,  | 
1487  | 26  |         historical_store: Store,  | 
1488  | 26  |     ) -> Result<Self, Error> { | 
1489  | 26  |         let upload_historical_results_strategy = config  | 
1490  | 26  |             .upload_historical_results_strategy  | 
1491  | 26  |             .unwrap_or(DEFAULT_HISTORICAL_RESULTS_STRATEGY);  | 
1492  | 8  |         if !matches!(  | 
   Branch (1492:12): [True: 8, False: 18]
1493  | 26  |             config.upload_ac_results_strategy,  | 
1494  |  |             UploadCacheResultsStrategy::Never  | 
1495  | 8  |         ) && ac_store.is_none()  | 
   Branch (1495:14): [True: 0, False: 8]
1496  |  |         { | 
1497  | 0  |             return Err(make_input_err!(  | 
1498  | 0  |                 "upload_ac_results_strategy is set, but no ac_store is configured"  | 
1499  | 0  |             ));  | 
1500  | 26  |         }  | 
1501  |  |         Ok(Self { | 
1502  | 26  |             upload_ac_results_strategy: config.upload_ac_results_strategy,  | 
1503  | 26  |             upload_historical_results_strategy,  | 
1504  | 26  |             ac_store,  | 
1505  | 26  |             historical_store,  | 
1506  | 26  |             success_message_template: Template::new(&config.success_message_template).map_err(  | 
1507  | 0  |                 |e| { | 
1508  | 0  |                     make_input_err!(  | 
1509  |  |                         "Could not convert success_message_template to rust template: {} : {e:?}", | 
1510  |  |                         config.success_message_template  | 
1511  |  |                     )  | 
1512  | 0  |                 },  | 
1513  | 0  |             )?,  | 
1514  | 26  |             failure_message_template: Template::new(&config.failure_message_template).map_err(  | 
1515  | 0  |                 |e| { | 
1516  | 0  |                     make_input_err!(  | 
1517  |  |                         "Could not convert failure_message_template to rust template: {} : {e:?}", | 
1518  |  |                         config.failure_message_template  | 
1519  |  |                     )  | 
1520  | 0  |                 },  | 
1521  | 0  |             )?,  | 
1522  |  |         })  | 
1523  | 26  |     }  | 
1524  |  |  | 
1525  | 0  |     const fn should_cache_result(  | 
1526  | 0  |         strategy: UploadCacheResultsStrategy,  | 
1527  | 0  |         action_result: &ActionResult,  | 
1528  | 0  |         treat_infra_error_as_failure: bool,  | 
1529  | 0  |     ) -> bool { | 
1530  | 0  |         let did_fail = action_result.exit_code != 0  | 
   Branch (1530:24): [Folded - Ignored]
1531  | 0  |             || (treat_infra_error_as_failure && action_result.error.is_some());  | 
   Branch (1531:17): [Folded - Ignored]
1532  | 0  |         match strategy { | 
1533  | 0  |             UploadCacheResultsStrategy::SuccessOnly => !did_fail,  | 
1534  | 0  |             UploadCacheResultsStrategy::Never => false,  | 
1535  |  |             // Never cache internal errors or timeouts.  | 
1536  |  |             UploadCacheResultsStrategy::Everything => { | 
1537  | 0  |                 treat_infra_error_as_failure || action_result.error.is_none()  | 
   Branch (1537:17): [Folded - Ignored]
1538  |  |             }  | 
1539  | 0  |             UploadCacheResultsStrategy::FailuresOnly => did_fail,  | 
1540  |  |         }  | 
1541  | 0  |     }  | 
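// [Editor's note] Worked matrix for should_cache_result above, assuming
// treat_infra_error_as_failure = true (an infra error then counts as a
// failure, and Everything always caches):
//
//   strategy       | exit 0, no error | exit != 0 | exit 0, infra error
//   SuccessOnly    | cache            | skip      | skip
//   FailuresOnly   | skip             | cache     | cache
//   Everything     | cache            | cache     | cache
//   Never          | skip             | skip      | skip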
1542  |  |  | 
1543  |  |     /// Formats the message field in `ExecuteResponse` from the `success_message_template`  | 
1544  |  |     /// or `failure_message_template` config templates.  | 
1545  | 5  |     fn format_execute_response_message(  | 
1546  | 5  |         mut template_str: Template,  | 
1547  | 5  |         action_digest_info: DigestInfo,  | 
1548  | 5  |         maybe_historical_digest_info: Option<DigestInfo>,  | 
1549  | 5  |         hasher: DigestHasherFunc,  | 
1550  | 5  |     ) -> Result<String, Error> { | 
1551  | 5  |         template_str.replace(  | 
1552  |  |             "digest_function",  | 
1553  | 5  |             hasher.proto_digest_func().as_str_name().to_lowercase(),  | 
1554  |  |         );  | 
1555  | 5  |         template_str.replace(  | 
1556  |  |             "action_digest_hash",  | 
1557  | 5  |             action_digest_info.packed_hash().to_string(),  | 
1558  |  |         );  | 
1559  | 5  |         template_str.replace("action_digest_size", action_digest_info.size_bytes()); | 
1560  | 5  |         if let Some(historical_digest_info) = maybe_historical_digest_info {  | 
   Branch (1560:16): [True: 3, False: 2]
1561  | 3  |             template_str.replace(  | 
1562  | 3  |                 "historical_results_hash",  | 
1563  | 3  |                 format!("{}", historical_digest_info.packed_hash()), | 
1564  | 3  |             );  | 
1565  | 3  |             template_str.replace(  | 
1566  | 3  |                 "historical_results_size",  | 
1567  | 3  |                 historical_digest_info.size_bytes(),  | 
1568  | 3  |             );  | 
1569  | 3  |         } else { | 
1570  | 2  |             template_str.replace("historical_results_hash", ""); | 
1571  | 2  |             template_str.replace("historical_results_size", ""); | 
1572  | 2  |         }  | 
1573  | 5  |         template_str  | 
1574  | 5  |             .text()  | 
1575  | 5  |             .map_err(|e| make_input_err!("Could not convert template to text: {e:?}")) | 
1576  | 5  |     }  | 
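// [Editor's note] Example expansion, using a hypothetical template value:
//   template: "cached {digest_function}/{action_digest_hash}/{action_digest_size}"
//   output:   "cached sha256/<hex hash>/142" (hash and size illustrative)
// When no historical digest is present, {historical_results_hash} and
// {historical_results_size} are replaced with empty strings, per the
// else-branch above.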
1577  |  |  | 
1578  | 5  |     async fn upload_ac_results(  | 
1579  | 5  |         &self,  | 
1580  | 5  |         action_digest: DigestInfo,  | 
1581  | 5  |         action_result: ProtoActionResult,  | 
1582  | 5  |         hasher: DigestHasherFunc,  | 
1583  | 5  |     ) -> Result<(), Error> { | 
1584  | 5  |         let Some(ac_store) = self.ac_store.as_ref() else {  | 
   Branch (1584:13): [True: 5, False: 0]
1585  | 0  |             return Ok(());  | 
1586  |  |         };  | 
1587  |  |         // If we are a GrpcStore we shortcut here, as this is a special store.  | 
1588  | 5  |         if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {  | 
   Branch (1588:16): [True: 0, False: 5]
1589  | 0  |             let update_action_request = UpdateActionResultRequest { | 
1590  | 0  |                 // This is populated by `update_action_result`.  | 
1591  | 0  |                 instance_name: String::new(),  | 
1592  | 0  |                 action_digest: Some(action_digest.into()),  | 
1593  | 0  |                 action_result: Some(action_result),  | 
1594  | 0  |                 results_cache_policy: None,  | 
1595  | 0  |                 digest_function: hasher.proto_digest_func().into(),  | 
1596  | 0  |             };  | 
1597  | 0  |             return grpc_store  | 
1598  | 0  |                 .update_action_result(Request::new(update_action_request))  | 
1599  | 0  |                 .await  | 
1600  | 0  |                 .map(|_| ())  | 
1601  | 0  |                 .err_tip(|| "Caching ActionResult");  | 
1602  | 5  |         }  | 
1603  |  |  | 
1604  | 5  |         let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);  | 
1605  | 5  |         action_result  | 
1606  | 5  |             .encode(&mut store_data)  | 
1607  | 5  |             .err_tip(|| "Encoding ActionResult for caching")?;  | 
1608  |  |  | 
1609  | 5  |         ac_store  | 
1610  | 5  |             .update_oneshot(action_digest, store_data.split().freeze())  | 
1611  | 5  |             .await  | 
1612  | 5  |             .err_tip(|| "Caching ActionResult")  | 
1613  | 5  |     }  | 
1614  |  |  | 
1615  | 3  |     async fn upload_historical_results_with_message(  | 
1616  | 3  |         &self,  | 
1617  | 3  |         action_digest: DigestInfo,  | 
1618  | 3  |         execute_response: ExecuteResponse,  | 
1619  | 3  |         message_template: Template,  | 
1620  | 3  |         hasher: DigestHasherFunc,  | 
1621  | 3  |     ) -> Result<String, Error> { | 
1622  | 3  |         let historical_digest_info = serialize_and_upload_message(  | 
1623  | 3  |             &HistoricalExecuteResponse { | 
1624  | 3  |                 action_digest: Some(action_digest.into()),  | 
1625  | 3  |                 execute_response: Some(execute_response.clone()),  | 
1626  | 3  |             },  | 
1627  | 3  |             self.historical_store.as_pin(),  | 
1628  | 3  |             &mut hasher.hasher(),  | 
1629  | 3  |         )  | 
1630  | 3  |         .await  | 
1631  | 3  |         .err_tip(|| format!("Caching HistoricalExecuteResponse for digest: {action_digest}"))?;  | 
1632  |  |  | 
1633  | 3  |         Self::format_execute_response_message(  | 
1634  | 3  |             message_template,  | 
1635  | 3  |             action_digest,  | 
1636  | 3  |             Some(historical_digest_info),  | 
1637  | 3  |             hasher,  | 
1638  |  |         )  | 
1639  | 3  |         .err_tip(|| "Could not format message in upload_historical_results_with_message")  | 
1640  | 3  |     }  | 
1641  |  |  | 
1642  | 6  |     async fn cache_action_result(  | 
1643  | 6  |         &self,  | 
1644  | 6  |         action_info: DigestInfo,  | 
1645  | 6  |         action_result: &mut ActionResult,  | 
1646  | 6  |         hasher: DigestHasherFunc,  | 
1647  | 6  |     ) -> Result<(), Error> { | 
1648  | 6  |         let should_upload_historical_results =  | 
1649  | 6  |             Self::should_cache_result(self.upload_historical_results_strategy, action_result, true);  | 
1650  | 6  |         let should_upload_ac_results =  | 
1651  | 6  |             Self::should_cache_result(self.upload_ac_results_strategy, action_result, false);  | 
1652  |  |         // Shortcut so we don't need to convert to proto if not needed.  | 
1653  | 6  |         if !should_upload_ac_results && !should_upload_historical_results {  | 
   Branch (1653:12): [True: 1, False: 5]
   Branch (1653:41): [True: 1, False: 0]
1654  | 1  |             return Ok(());  | 
1655  | 5  |         }  | 
1656  |  |  | 
1657  | 5  |         let mut execute_response = to_execute_response(action_result.clone());  | 
1658  |  |  | 
1659  |  |         // In theory exit code should always be != 0 if there's an error, but for safety we  | 
1660  |  |         // catch both.  | 
1661  | 5  |         let message_template = if action_result.exit_code == 0 && action_result.error.is_none() {  | 
   Branch (1661:35): [True: 4, False: 1]
   Branch (1661:67): [True: 3, False: 1]
1662  | 3  |             self.success_message_template.clone()  | 
1663  |  |         } else { | 
1664  | 2  |             self.failure_message_template.clone()  | 
1665  |  |         };  | 
1666  |  |  | 
1667  | 5  |         let upload_historical_results_with_message_result = if should_upload_historical_results {  | 
   Branch (1667:64): [True: 3, False: 2]
1668  | 3  |             let maybe_message = self  | 
1669  | 3  |                 .upload_historical_results_with_message(  | 
1670  | 3  |                     action_info,  | 
1671  | 3  |                     execute_response.clone(),  | 
1672  | 3  |                     message_template,  | 
1673  | 3  |                     hasher,  | 
1674  | 3  |                 )  | 
1675  | 3  |                 .await;  | 
1676  | 3  |             match maybe_message { | 
1677  | 3  |                 Ok(message) => { | 
1678  | 3  |                     action_result.message.clone_from(&message);  | 
1679  | 3  |                     execute_response.message = message;  | 
1680  | 3  |                     Ok(())  | 
1681  |  |                 }  | 
1682  | 0  |                 Err(e) => Result::<(), Error>::Err(e),  | 
1683  |  |             }  | 
1684  |  |         } else { | 
1685  | 2  |             match Self::format_execute_response_message(message_template, action_info, None, hasher)  | 
1686  |  |             { | 
1687  | 2  |                 Ok(message) => { | 
1688  | 2  |                     action_result.message.clone_from(&message);  | 
1689  | 2  |                     execute_response.message = message;  | 
1690  | 2  |                     Ok(())  | 
1691  |  |                 }  | 
1692  | 0  |                 Err(e) => Err(e).err_tip(|| "Could not format message in cache_action_result"),  | 
1693  |  |             }  | 
1694  |  |         };  | 
1695  |  |  | 
1696  |  |         // Note: Done in this order because we assume most results will succeed and most configs  | 
1697  |  |         // will either always upload historical results or only upload on failure, in which case  | 
1698  |  |         // we can avoid an extra clone of the protos by doing this last.  | 
1699  | 5  |         let ac_upload_results = if should_upload_ac_results {  | 
   Branch (1699:36): [True: 5, False: 0]
1700  | 5  |             self.upload_ac_results(  | 
1701  | 5  |                 action_info,  | 
1702  | 5  |                 execute_response  | 
1703  | 5  |                     .result  | 
1704  | 5  |                     .err_tip(|| "No result set in cache_action_result")?,  | 
1705  | 5  |                 hasher,  | 
1706  |  |             )  | 
1707  | 5  |             .await  | 
1708  |  |         } else { | 
1709  | 0  |             Ok(())  | 
1710  |  |         };  | 
1711  | 5  |         upload_historical_results_with_message_result.merge(ac_upload_results)  | 
1712  | 6  |     }  | 
1713  |  | }  | 
1714  |  |  | 
1715  |  | #[derive(Debug)]  | 
1716  |  | pub struct RunningActionsManagerArgs<'a> { | 
1717  |  |     pub root_action_directory: String,  | 
1718  |  |     pub execution_configuration: ExecutionConfiguration,  | 
1719  |  |     pub cas_store: Arc<FastSlowStore>,  | 
1720  |  |     pub ac_store: Option<Store>,  | 
1721  |  |     pub historical_store: Store,  | 
1722  |  |     pub upload_action_result_config: &'a UploadActionResultConfig,  | 
1723  |  |     pub max_action_timeout: Duration,  | 
1724  |  |     pub timeout_handled_externally: bool,  | 
1725  |  | }  | 
1726  |  |  | 
1727  |  | struct CleanupGuard { | 
1728  |  |     manager: Weak<RunningActionsManagerImpl>,  | 
1729  |  |     operation_id: OperationId,  | 
1730  |  | }  | 
1731  |  |  | 
1732  |  | impl Drop for CleanupGuard { | 
1733  | 17  |     fn drop(&mut self) { | 
1734  | 17  |         let Some(manager) = self.manager.upgrade() else {  | 
   Branch (1734:13): [True: 17, False: 0]
1735  | 0  |             return;  | 
1736  |  |         };  | 
1737  | 17  |         let mut cleaning = manager.cleaning_up_operations.lock();  | 
1738  | 17  |         cleaning.remove(&self.operation_id);  | 
1739  | 17  |         manager.cleanup_complete_notify.notify_waiters();  | 
1740  | 17  |     }  | 
1741  |  | }  | 
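// [Editor's sketch] The Weak-backed drop guard in isolation: the guard only
// deregisters itself if the manager is still alive, so a guard that outlives
// its manager is a no-op. Std-only; names are illustrative.
use std::collections::HashSet;
use std::sync::{Mutex, Weak};

struct Registry {
    cleaning: Mutex<HashSet<String>>,
}

struct Guard {
    registry: Weak<Registry>,
    id: String,
}

impl Drop for Guard {
    fn drop(&mut self) {
        let Some(registry) = self.registry.upgrade() else {
            return; // manager already gone; nothing to deregister
        };
        registry.cleaning.lock().unwrap().remove(&self.id);
    }
}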
1742  |  |  | 
1743  |  | /// Holds state info about what is being executed and the interface for interacting  | 
1744  |  | /// with actions while they are running.  | 
1745  |  | #[derive(Debug)]  | 
1746  |  | pub struct RunningActionsManagerImpl { | 
1747  |  |     root_action_directory: String,  | 
1748  |  |     execution_configuration: ExecutionConfiguration,  | 
1749  |  |     cas_store: Arc<FastSlowStore>,  | 
1750  |  |     filesystem_store: Arc<FilesystemStore>,  | 
1751  |  |     upload_action_results: UploadActionResults,  | 
1752  |  |     max_action_timeout: Duration,  | 
1753  |  |     timeout_handled_externally: bool,  | 
1754  |  |     running_actions: Mutex<HashMap<OperationId, Weak<RunningActionImpl>>>,  | 
1755  |  |     // Note: We don't use Notify because we need to support a .wait_for()-like function, which  | 
1756  |  |     // Notify does not support.  | 
1757  |  |     action_done_tx: watch::Sender<()>,  | 
1758  |  |     callbacks: Callbacks,  | 
1759  |  |     metrics: Arc<Metrics>,  | 
1760  |  |     /// Track operations being cleaned up to avoid directory collisions during action retries.  | 
1761  |  |     /// When an action fails and is retried on the same worker, we need to ensure the previous  | 
1762  |  |     /// attempt's directory is fully cleaned up before creating a new one.  | 
1763  |  |     /// See: <https://github.com/TraceMachina/nativelink/issues/1859>  | 
1764  |  |     cleaning_up_operations: Mutex<HashSet<OperationId>>,  | 
1765  |  |     /// Notify waiters when a cleanup operation completes. This is used in conjunction with  | 
1766  |  |     /// `cleaning_up_operations` to coordinate directory cleanup and creation.  | 
1767  |  |     cleanup_complete_notify: Arc<Notify>,  | 
1768  |  | }  | 
1769  |  |  | 
1770  |  | impl RunningActionsManagerImpl { | 
1771  |  |     /// Maximum time to wait for a cleanup operation to complete before timing out.  | 
1772  |  |     /// TODO(marcussorealheis): Consider making cleanup wait timeout configurable in the future  | 
1773  |  |     const MAX_WAIT: Duration = Duration::from_secs(30);  | 
1774  |  |     /// Maximum backoff duration for exponential backoff when waiting for cleanup.  | 
1775  |  |     const MAX_BACKOFF: Duration = Duration::from_millis(500);  | 
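// [Editor's sketch] The capped exponential backoff these constants bound;
// illustrative shape only (tokio assumed), not the method below verbatim.
async fn wait_until(mut done: impl FnMut() -> bool) -> Result<(), ()> {
    let start = std::time::Instant::now();
    let mut backoff = Duration::from_millis(10);
    while !done() {
        if start.elapsed() > Self::MAX_WAIT {
            return Err(()); // the real code maps this to Code::DeadlineExceeded
        }
        tokio::time::sleep(backoff).await;
        // Double the delay each round, capped at MAX_BACKOFF.
        backoff = (backoff * 2).min(Self::MAX_BACKOFF);
    }
    Ok(())
}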
1776  | 26  |     pub fn new_with_callbacks(  | 
1777  | 26  |         args: RunningActionsManagerArgs<'_>,  | 
1778  | 26  |         callbacks: Callbacks,  | 
1779  | 26  |     ) -> Result<Self, Error> { | 
1780  |  |         // Sadly because of some limitations of how Any works we need to clone more times than optimal.  | 
1781  | 26  |         let filesystem_store = args  | 
1782  | 26  |             .cas_store  | 
1783  | 26  |             .fast_store()  | 
1784  | 26  |             .downcast_ref::<FilesystemStore>(None)  | 
1785  | 26  |             .err_tip(  | 
1786  |  |                 || "Expected FilesystemStore store for .fast_store() in RunningActionsManagerImpl",  | 
1787  | 0  |             )?  | 
1788  | 26  |             .get_arc()  | 
1789  | 26  |             .err_tip(|| "FilesystemStore's internal Arc was lost")?;  | 
1790  | 26  |         let (action_done_tx, _) = watch::channel(());  | 
1791  |  |         Ok(Self { | 
1792  | 26  |             root_action_directory: args.root_action_directory,  | 
1793  | 26  |             execution_configuration: args.execution_configuration,  | 
1794  | 26  |             cas_store: args.cas_store,  | 
1795  | 26  |             filesystem_store,  | 
1796  | 26  |             upload_action_results: UploadActionResults::new(  | 
1797  | 26  |                 args.upload_action_result_config,  | 
1798  | 26  |                 args.ac_store,  | 
1799  | 26  |                 args.historical_store,  | 
1800  |  |             )  | 
1801  | 26  |             .err_tip(|| "During RunningActionsManagerImpl construction")?,  | 
1802  | 26  |             max_action_timeout: args.max_action_timeout,  | 
1803  | 26  |             timeout_handled_externally: args.timeout_handled_externally,  | 
1804  | 26  |             running_actions: Mutex::new(HashMap::new()),  | 
1805  | 26  |             action_done_tx,  | 
1806  | 26  |             callbacks,  | 
1807  | 26  |             metrics: Arc::new(Metrics::default()),  | 
1808  | 26  |             cleaning_up_operations: Mutex::new(HashSet::new()),  | 
1809  | 26  |             cleanup_complete_notify: Arc::new(Notify::new()),  | 
1810  |  |         })  | 
1811  | 26  |     }  | 
1812  |  |  | 
1813  | 12  |     pub fn new(args: RunningActionsManagerArgs<'_>) -> Result<Self, Error> { | 
1814  | 12  |         Self::new_with_callbacks(  | 
1815  | 12  |             args,  | 
1816  |  |             Callbacks { | 
1817  | 12  |                 now_fn: SystemTime::now,  | 
1818  | 2  |                 sleep_fn: |duration| Box::pin(tokio::time::sleep(duration)),  | 
1819  |  |             },  | 
1820  |  |         )  | 
1821  | 12  |     }  | 
1822  |  |  | 
1823  |  |     /// Fixes a race condition that occurs when an action fails to execute on a worker, and the same worker  | 
1824  |  |     /// attempts to re-execute the same action before the physical cleanup (file is removed) completes.  | 
1825  |  |     /// See this issue for additional details: <https://github.com/TraceMachina/nativelink/issues/1859>  | 
1826  | 19  |     async fn wait_for_cleanup_if_needed(&self, operation_id: &OperationId) -> Result<(), Error> {  | 
1827  | 19  |         let start = Instant::now();  | 
1828  | 19  |         let mut backoff = Duration::from_millis(10);  | 
1829  | 19  |         let mut has_waited = false;  | 
1830  |  |  | 
1831  |  |         loop { | 
1832  | 19  |             let should_wait = { | 
1833  | 19  |                 let cleaning = self.cleaning_up_operations.lock();  | 
1834  | 19  |                 cleaning.contains(operation_id)  | 
1835  |  |             };  | 
1836  |  |  | 
1837  | 19  |             if !should_wait {  | 
   Branch (1837:16): [True: 19, False: 0]
1838  | 19  |                 let dir_path =  | 
1839  | 19  |                     PathBuf::from(&self.root_action_directory).join(operation_id.to_string());  | 
1840  |  |  | 
1841  | 19  |                 if !dir_path.exists() {  | 
   Branch (1841:20): [True: 18, False: 1]
1842  | 18  |                     return Ok(());  | 
1843  | 1  |                 }  | 
1844  |  |  | 
1845  |  |                 // Safety check: ensure we're only removing directories under root_action_directory  | 
1846  | 1  |                 let root_path = Path::new(&self.root_action_directory);  | 
1847  | 1  |                 let canonical_root = root_path.canonicalize().err_tip(|| {  | 
1848  | 0  |                     format!(  | 
1849  | 0  |                         "Failed to canonicalize root directory: {}", | 
1850  |  |                         self.root_action_directory  | 
1851  |  |                     )  | 
1852  | 0  |                 })?;  | 
1853  | 1  |                 let canonical_dir = dir_path.canonicalize().err_tip(|| {  | 
1854  | 0  |                     format!("Failed to canonicalize directory: {}", dir_path.display()) | 
1855  | 0  |                 })?;  | 
1856  |  |  | 
1857  | 1  |                 if !canonical_dir.starts_with(&canonical_root) {  Branch (1857:20): [True: 0, False: 1]  | 
1858  | 0  |                     return Err(make_err!(  | 
1859  | 0  |                         Code::Internal,  | 
1860  | 0  |                         "Attempted to remove directory outside of root_action_directory: {}", | 
1861  | 0  |                         dir_path.display()  | 
1862  | 0  |                     ));  | 
1863  | 1  |                 }  | 
1864  |  |  | 
1865  |  |                 // Directory exists but is not being cleaned up; remove it.  | 
1866  | 1  |                 warn!(  | 
1867  | 1  |                     "Removing stale directory for {}: {}", | 
1868  |  |                     operation_id,  | 
1869  | 1  |                     dir_path.display()  | 
1870  |  |                 );  | 
1871  | 1  |                 self.metrics.stale_removals.inc();  | 
1872  |  |  | 
1873  |  |                 // Try to remove the directory, with one retry on failure  | 
1874  | 1  |                 let remove_result = fs::remove_dir_all(&dir_path).await;  | 
1875  | 1  |                 if let Err(e) = remove_result {  Branch (1875:24): [True: 0, False: 1]  | 
1876  |  |                     // Retry once after a short delay in case the directory is temporarily locked  | 
1877  | 0  |                     tokio::time::sleep(Duration::from_millis(100)).await;  | 
1878  | 0  |                     fs::remove_dir_all(&dir_path).await.err_tip(|| { | 
1879  | 0  |                         format!(  | 
1880  | 0  |                             "Failed to remove stale directory {} for {} after retry (original error: {})",  | 
1881  | 0  |                             dir_path.display(),  | 
1882  |  |                             operation_id,  | 
1883  |  |                             e  | 
1884  |  |                         )  | 
1885  | 0  |                     })?;  | 
1886  | 1  |                 }  | 
1887  | 1  |                 return Ok(());  | 
1888  | 0  |             }  | 
1889  |  |  | 
1890  | 0  |             if start.elapsed() > Self::MAX_WAIT {  Branch (1890:16): [True: 0, False: 0]  | 
1891  | 0  |                 self.metrics.cleanup_wait_timeouts.inc();  | 
1892  | 0  |                 return Err(make_err!(  | 
1893  | 0  |                     Code::DeadlineExceeded,  | 
1894  | 0  |                     "Timeout waiting for previous operation cleanup: {} (waited {:?})", | 
1895  | 0  |                     operation_id,  | 
1896  | 0  |                     start.elapsed()  | 
1897  | 0  |                 ));  | 
1898  | 0  |             }  | 
1899  |  |  | 
1900  | 0  |             if !has_waited {  Branch (1900:16): [True: 0, False: 0]  | 
1901  | 0  |                 self.metrics.cleanup_waits.inc();  | 
1902  | 0  |                 has_waited = true;  | 
1903  | 0  |             }  | 
1904  |  |  | 
1905  | 0  |             trace!(  | 
1906  | 0  |                 "Waiting for cleanup of {} (elapsed: {:?}, backoff: {:?})", | 
1907  |  |                 operation_id,  | 
1908  | 0  |                 start.elapsed(),  | 
1909  |  |                 backoff  | 
1910  |  |             );  | 
1911  |  |  | 
1912  | 0  |             tokio::select! { | 
1913  | 0  |                 () = self.cleanup_complete_notify.notified() => {}, | 
1914  | 0  |                 () = tokio::time::sleep(backoff) => { | 
1915  | 0  |                     // Exponential backoff  | 
1916  | 0  |                     backoff = (backoff * 2).min(Self::MAX_BACKOFF);  | 
1917  | 0  |                 },  | 
1918  |  |             }  | 
1919  |  |         }  | 
1920  | 19  |     }  | 
1921  |  |  | 
1922  | 0  |     fn make_action_directory<'a>(  | 
1923  | 0  |         &'a self,  | 
1924  | 0  |         operation_id: &'a OperationId,  | 
1925  | 0  |     ) -> impl Future<Output = Result<String, Error>> + 'a { | 
1926  | 19  |         self.metrics.make_action_directory.wrap(async move {  | 
1927  | 19  |             let action_directory = format!("{}/{}", self.root_action_directory, operation_id); | 
1928  | 19  |             fs::create_dir(&action_directory)  | 
1929  | 19  |                 .await  | 
1930  | 19  |                 .err_tip(|| format!("Error creating action directory {action_directory}"))?;  | 
1931  | 19  |             Ok(action_directory)  | 
1932  | 19  |         })  | 
1933  | 0  |     }  | 
1934  |  |  | 
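  |  |     // Decodes the `ExecuteRequest` embedded in `StartExecute`, fetches the referenced `Action`  | 
  |  |     // proto from the CAS by digest, and combines them (plus timestamps) into an `ActionInfo`.  | 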
1935  | 19  |     fn create_action_info(  | 
1936  | 19  |         &self,  | 
1937  | 19  |         start_execute: StartExecute,  | 
1938  | 19  |         queued_timestamp: SystemTime,  | 
1939  | 19  |     ) -> impl Future<Output = Result<ActionInfo, Error>> + '_ { | 
1940  | 19  |         self.metrics.create_action_info.wrap(async move { | 
1941  | 19  |             let execute_request = start_execute  | 
1942  | 19  |                 .execute_request  | 
1943  | 19  |                 .err_tip(|| "Expected execute_request to exist in StartExecute")?;  | 
1944  | 19  |             let action_digest: DigestInfo = execute_request  | 
1945  | 19  |                 .action_digest  | 
1946  | 19  |                 .clone()  | 
1947  | 19  |                 .err_tip(|| "Expected action_digest to exist on StartExecute")?  | 
1948  | 19  |                 .try_into()?;  | 
1949  | 19  |             let load_start_timestamp = (self.callbacks.now_fn)();  | 
1950  | 19  |             let action =  | 
1951  | 19  |                 get_and_decode_digest::<Action>(self.cas_store.as_ref(), action_digest.into())  | 
1952  | 19  |                     .await  | 
1953  | 19  |                     .err_tip(|| "During start_action")?;  | 
1954  | 19  |             let action_info = ActionInfo::try_from_action_and_execute_request(  | 
1955  | 19  |                 execute_request,  | 
1956  | 19  |                 action,  | 
1957  | 19  |                 load_start_timestamp,  | 
1958  | 19  |                 queued_timestamp,  | 
1959  |  |             )  | 
1960  | 19  |             .err_tip(|| "Could not create ActionInfo in create_and_add_action()")?;  | 
1961  | 19  |             Ok(action_info)  | 
1962  | 19  |         })  | 
1963  | 19  |     }  | 
1964  |  |  | 
1965  | 18  |     fn cleanup_action(&self, operation_id: &OperationId) -> Result<(), Error> { | 
1966  | 18  |         let mut running_actions = self.running_actions.lock();  | 
1967  | 18  |         let result = running_actions.remove(operation_id).err_tip(|| {  | 
1968  | 0  |             format!("Expected action id '{operation_id:?}' to exist in RunningActionsManagerImpl") | 
1969  | 0  |         });  | 
1970  |  |         // No need to copy anything; we're just telling the receivers that an event happened.  | 
1971  | 18  |         self.action_done_tx.send_modify(|()| {}); | 
1972  | 18  |         result.map(|_| ())  | 
1973  | 18  |     }  | 
1974  |  |  | 
1975  |  |     // Note: We do not capture metrics on this call, only `.kill_all()`.  | 
1976  |  |     // Important: When the future returns the process may still be running.  | 
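  |  |     // The kill signal is sent over the `kill_channel_tx` sender taken out of the action's state;  | 
  |  |     // the actual process teardown happens elsewhere, which is why the process may outlive this future.  | 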
1977  | 2  |     async fn kill_operation(action: Arc<RunningActionImpl>) {  | 
1978  | 2  |         warn!(  | 
1979  | 2  |             operation_id = ?action.operation_id,  | 
1980  | 2  |             "Sending kill to running operation",  | 
1981  |  |         );  | 
1982  | 2  |         let kill_channel_tx = { | 
1983  | 2  |             let mut action_state = action.state.lock();  | 
1984  | 2  |             action_state.kill_channel_tx.take()  | 
1985  |  |         };  | 
1986  | 2  |         if let Some(kill_channel_tx) = kill_channel_tx {  Branch (1986:16): [True: 2, False: 0]  | 
1987  | 2  |             if kill_channel_tx.send(()).is_err() {  Branch (1987:16): [True: 0, False: 2]  | 
1988  | 0  |                 error!(  | 
1989  | 0  |                     operation_id = ?action.operation_id,  | 
1990  | 0  |                     "Error sending kill to running operation",  | 
1991  |  |                 );  | 
1992  | 2  |             }  | 
1993  | 0  |         }  | 
1994  | 2  |     }  | 
1995  |  |  | 
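  |  |     // Registers `operation_id` in `cleaning_up_operations`, returning `None` if a cleanup for that  | 
  |  |     // id is already in flight. The `CleanupGuard` holds only a `Weak` reference to the manager; its  | 
  |  |     // `Drop` impl (defined elsewhere) presumably deregisters the id and signals  | 
  |  |     // `cleanup_complete_notify` so waiters in `wait_for_cleanup_if_needed` can make progress.  | 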
1996  | 17  |     fn perform_cleanup(self: &Arc<Self>, operation_id: OperationId) -> Option<CleanupGuard> { | 
1997  | 17  |         let mut cleaning = self.cleaning_up_operations.lock();  | 
1998  | 17  |         cleaning  | 
1999  | 17  |             .insert(operation_id.clone())  | 
2000  | 17  |             .then_some(CleanupGuard { | 
2001  | 17  |                 manager: Arc::downgrade(self),  | 
2002  | 17  |                 operation_id,  | 
2003  | 17  |             })  | 
2004  | 17  |     }  | 
2005  |  | }  | 
2006  |  |  | 
2007  |  | impl RunningActionsManager for RunningActionsManagerImpl { | 
2008  |  |     type RunningAction = RunningActionImpl;  | 
2009  |  |  | 
2010  | 19  |     async fn create_and_add_action(  | 
2011  | 19  |         self: &Arc<Self>,  | 
2012  | 19  |         worker_id: String,  | 
2013  | 19  |         start_execute: StartExecute,  | 
2014  | 19  |     ) -> Result<Arc<RunningActionImpl>, Error> { | 
2015  | 19  |         self.metrics  | 
2016  | 19  |             .create_and_add_action  | 
2017  | 19  |             .wrap(async move { | 
2018  | 19  |                 let queued_timestamp = start_execute  | 
2019  | 19  |                     .queued_timestamp  | 
2020  | 19  |                     .and_then(|time| time.try_into().ok())  | 
2021  | 19  |                     .unwrap_or(SystemTime::UNIX_EPOCH);  | 
2022  | 19  |                 let operation_id = start_execute  | 
2023  | 19  |                     .operation_id.as_str().into();  | 
2024  | 19  |                 let action_info = self.create_action_info(start_execute, queued_timestamp).await?;  | 
2025  | 19  |                 debug!(  | 
2026  |  |                     ?action_info,  | 
2027  | 19  |                     "Worker received action",  | 
2028  |  |                 );  | 
2029  |  |                 // Wait for any previous cleanup to complete before creating directory  | 
2030  | 19  |                 self.wait_for_cleanup_if_needed(&operation_id).await?;  | 
2031  | 19  |                 let action_directory = self.make_action_directory(&operation_id).await?;  | 
2032  | 19  |                 let execution_metadata = ExecutionMetadata { | 
2033  | 19  |                     worker: worker_id,  | 
2034  | 19  |                     queued_timestamp: action_info.insert_timestamp,  | 
2035  | 19  |                     worker_start_timestamp: action_info.load_timestamp,  | 
2036  | 19  |                     worker_completed_timestamp: SystemTime::UNIX_EPOCH,  | 
2037  | 19  |                     input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,  | 
2038  | 19  |                     input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,  | 
2039  | 19  |                     execution_start_timestamp: SystemTime::UNIX_EPOCH,  | 
2040  | 19  |                     execution_completed_timestamp: SystemTime::UNIX_EPOCH,  | 
2041  | 19  |                     output_upload_start_timestamp: SystemTime::UNIX_EPOCH,  | 
2042  | 19  |                     output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,  | 
2043  | 19  |                 };  | 
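  |  |                 // Timeout selection: fall back to the worker-wide `max_action_timeout` when the  | 
  |  |                 // action specifies no timeout or when an upstream component enforces timeouts  | 
  |  |                 // externally; a requested timeout above the maximum is rejected just below.  | 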
2044  | 19  |                 let timeout = if action_info.timeout.is_zero() || self.timeout_handled_externally {  Branch (2044:34): [True: 16, False: 3]  Branch (2044:67): [True: 0, False: 3]  | 
2045  | 16  |                     self.max_action_timeout  | 
2046  |  |                 } else { | 
2047  | 3  |                     action_info.timeout  | 
2048  |  |                 };  | 
2049  | 19  |                 if timeout > self.max_action_timeout {  Branch (2049:20): [True: 1, False: 18]  | 
2050  | 1  |                     return Err(make_err!(  | 
2051  | 1  |                         Code::InvalidArgument,  | 
2052  | 1  |                         "Action timeout of {} seconds is greater than the maximum allowed timeout of {} seconds", | 
2053  | 1  |                         timeout.as_secs_f32(),  | 
2054  | 1  |                         self.max_action_timeout.as_secs_f32()  | 
2055  | 1  |                     ));  | 
2056  | 18  |                 }  | 
2057  | 18  |                 let running_action = Arc::new(RunningActionImpl::new(  | 
2058  | 18  |                     execution_metadata,  | 
2059  | 18  |                     operation_id.clone(),  | 
2060  | 18  |                     action_directory,  | 
2061  | 18  |                     action_info,  | 
2062  | 18  |                     timeout,  | 
2063  | 18  |                     self.clone(),  | 
2064  |  |                 ));  | 
2065  |  |                 { | 
2066  | 18  |                     let mut running_actions = self.running_actions.lock();  | 
2067  |  |                     // Check if action already exists and is still alive  | 
2068  | 18  |                     if let Some(existing_weak) = running_actions.get(&operation_id) {  Branch (2068:28): [True: 0, False: 18]  | 
2069  | 0  |                         if let Some(_existing_action) = existing_weak.upgrade() {  Branch (2069:32): [True: 0, False: 0]  | 
2070  | 0  |                             return Err(make_err!(  | 
2071  | 0  |                                 Code::AlreadyExists,  | 
2072  | 0  |                                 "Action with operation_id {} is already running", | 
2073  | 0  |                                 operation_id  | 
2074  | 0  |                             ));  | 
2075  | 0  |                         }  | 
2076  | 18  |                     }  | 
2077  | 18  |                     running_actions.insert(operation_id, Arc::downgrade(&running_action));  | 
2078  |  |                 }  | 
2079  | 18  |                 Ok(running_action)  | 
2080  | 19  |             })  | 
2081  | 19  |             .await  | 
2082  | 19  |     }  | 
2083  |  |  | 
2084  | 6  |     async fn cache_action_result(  | 
2085  | 6  |         &self,  | 
2086  | 6  |         action_info: DigestInfo,  | 
2087  | 6  |         action_result: &mut ActionResult,  | 
2088  | 6  |         hasher: DigestHasherFunc,  | 
2089  | 6  |     ) -> Result<(), Error> { | 
2090  | 6  |         self.metrics  | 
2091  | 6  |             .cache_action_result  | 
2092  | 6  |             .wrap(self.upload_action_results.cache_action_result(  | 
2093  | 6  |                 action_info,  | 
2094  | 6  |                 action_result,  | 
2095  | 6  |                 hasher,  | 
2096  | 6  |             ))  | 
2097  | 6  |             .await  | 
2098  | 6  |     }  | 
2099  |  |  | 
2100  | 0  |     async fn kill_operation(&self, operation_id: &OperationId) -> Result<(), Error> { | 
2101  | 0  |         let running_action = { | 
2102  | 0  |             let running_actions = self.running_actions.lock();  | 
2103  | 0  |             running_actions  | 
2104  | 0  |                 .get(operation_id)  | 
2105  | 0  |                 .and_then(Weak::upgrade)  | 
2106  | 0  |                 .ok_or_else(|| make_input_err!("Failed to get running action {operation_id}"))? | 
2107  |  |         };  | 
2108  | 0  |         Self::kill_operation(running_action).await;  | 
2109  | 0  |         Ok(())  | 
2110  | 0  |     }  | 
2111  |  |  | 
2112  |  |     // Note: When the future returns the process should be fully killed and cleaned up.  | 
2113  | 2  |     async fn kill_all(&self) {  | 
2114  | 2  |         self.metrics  | 
2115  | 2  |             .kill_all  | 
2116  | 2  |             .wrap_no_capture_result(async move { | 
2117  | 2  |                 let kill_operations: Vec<Arc<RunningActionImpl>> = { | 
2118  | 2  |                     let running_actions = self.running_actions.lock();  | 
2119  | 2  |                     running_actions  | 
2120  | 2  |                         .iter()  | 
2121  | 2  |                         .filter_map(|(_operation_id, action)| action.upgrade())  | 
2122  | 2  |                         .collect()  | 
2123  |  |                 };  | 
2124  | 2  |                 let mut kill_futures: FuturesUnordered<_> = kill_operations  | 
2125  | 2  |                     .into_iter()  | 
2126  | 2  |                     .map(Self::kill_operation)  | 
2127  | 2  |                     .collect();  | 
2128  | 4  |                 while kill_futures.next().await.is_some() {}  Branch (2128:23): [True: 2, False: 2]  | 
2129  | 2  |             })  | 
2130  | 2  |             .await;  | 
2131  |  |         // Ignore the error; an error just means there is no sender, which is not a problem.  | 
2132  |  |         // Note: sanity check that this API always observes the current value before future values:  | 
2133  |  |         // https://play.rust-lang.org/?version=stable&edition=2021&gist=23103652cc1276a97e5f9938da87fdb2  | 
2134  | 2  |         drop(  | 
2135  | 2  |             self.action_done_tx  | 
2136  | 2  |                 .subscribe()  | 
2137  | 4  |                 .wait_for(|()| self.running_actions.lock().is_empty())  | 
2138  | 2  |                 .await,  | 
2139  |  |         );  | 
2140  | 2  |     }  | 
2141  |  |  | 
2142  |  |     #[inline]  | 
2143  | 2  |     fn metrics(&self) -> &Arc<Metrics> { | 
2144  | 2  |         &self.metrics  | 
2145  | 2  |     }  | 
2146  |  | }  | 
2147  |  |  | 
2148  |  | #[derive(Debug, Default, MetricsComponent)]  | 
2149  |  | pub struct Metrics { | 
2150  |  |     #[metric(help = "Stats about the create_and_add_action command.")]  | 
2151  |  |     create_and_add_action: AsyncCounterWrapper,  | 
2152  |  |     #[metric(help = "Stats about the cache_action_result command.")]  | 
2153  |  |     cache_action_result: AsyncCounterWrapper,  | 
2154  |  |     #[metric(help = "Stats about the kill_all command.")]  | 
2155  |  |     kill_all: AsyncCounterWrapper,  | 
2156  |  |     #[metric(help = "Stats about the create_action_info command.")]  | 
2157  |  |     create_action_info: AsyncCounterWrapper,  | 
2158  |  |     #[metric(help = "Stats about the make_action_directory command.")]  | 
2159  |  |     make_action_directory: AsyncCounterWrapper,  | 
2160  |  |     #[metric(help = "Stats about the prepare_action command.")]  | 
2161  |  |     prepare_action: AsyncCounterWrapper,  | 
2162  |  |     #[metric(help = "Stats about the execute command.")]  | 
2163  |  |     execute: AsyncCounterWrapper,  | 
2164  |  |     #[metric(help = "Stats about the upload_results command.")]  | 
2165  |  |     upload_results: AsyncCounterWrapper,  | 
2166  |  |     #[metric(help = "Stats about the cleanup command.")]  | 
2167  |  |     cleanup: AsyncCounterWrapper,  | 
2168  |  |     #[metric(help = "Stats about the get_finished_result command.")]  | 
2169  |  |     get_finished_result: AsyncCounterWrapper,  | 
2170  |  |     #[metric(help = "Number of times an action waited for cleanup to complete.")]  | 
2171  |  |     cleanup_waits: CounterWithTime,  | 
2172  |  |     #[metric(help = "Number of stale directories removed during action retries.")]  | 
2173  |  |     stale_removals: CounterWithTime,  | 
2174  |  |     #[metric(help = "Number of timeouts while waiting for cleanup to complete.")]  | 
2175  |  |     cleanup_wait_timeouts: CounterWithTime,  | 
2176  |  |     #[metric(help = "Stats about the get_proto_command_from_store command.")]  | 
2177  |  |     get_proto_command_from_store: AsyncCounterWrapper,  | 
2178  |  |     #[metric(help = "Stats about the download_to_directory command.")]  | 
2179  |  |     download_to_directory: AsyncCounterWrapper,  | 
2180  |  |     #[metric(help = "Stats about the prepare_output_files command.")]  | 
2181  |  |     prepare_output_files: AsyncCounterWrapper,  | 
2182  |  |     #[metric(help = "Stats about the prepare_output_paths command.")]  | 
2183  |  |     prepare_output_paths: AsyncCounterWrapper,  | 
2184  |  |     #[metric(help = "Stats about the child_process command.")]  | 
2185  |  |     child_process: AsyncCounterWrapper,  | 
2186  |  |     #[metric(help = "Stats about the child_process_success_error_code command.")]  | 
2187  |  |     child_process_success_error_code: CounterWithTime,  | 
2188  |  |     #[metric(help = "Stats about the child_process_failure_error_code command.")]  | 
2189  |  |     child_process_failure_error_code: CounterWithTime,  | 
2190  |  |     #[metric(help = "Total time spent uploading stdout.")]  | 
2191  |  |     upload_stdout: AsyncCounterWrapper,  | 
2192  |  |     #[metric(help = "Total time spent uploading stderr.")]  | 
2193  |  |     upload_stderr: AsyncCounterWrapper,  | 
2194  |  |     #[metric(help = "Total number of task timeouts.")]  | 
2195  |  |     task_timeouts: CounterWithTime,  | 
2196  |  | }  |