/build/source/nativelink-worker/src/worker_utils.rs
Line  | Count  | Source  | 
1  |  | // Copyright 2024 The NativeLink Authors. All rights reserved.  | 
2  |  | //  | 
3  |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");  | 
4  |  | // you may not use this file except in compliance with the License.  | 
5  |  | // You may obtain a copy of the License at  | 
6  |  | //  | 
7  |  | //    See LICENSE file for details  | 
8  |  | //  | 
9  |  | // Unless required by applicable law or agreed to in writing, software  | 
10  |  | // distributed under the License is distributed on an "AS IS" BASIS,  | 
11  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
12  |  | // See the License for the specific language governing permissions and  | 
13  |  | // limitations under the License.  | 
14  |  |  | 
15  |  | use core::hash::BuildHasher;  | 
16  |  | use core::str::from_utf8;  | 
17  |  | use std::collections::HashMap;  | 
18  |  | use std::io::{BufRead, BufReader, Cursor}; | 
19  |  | use std::process::Stdio;  | 
20  |  |  | 
21  |  | use futures::future::try_join_all;  | 
22  |  | use nativelink_config::cas_server::WorkerProperty;  | 
23  |  | use nativelink_error::{Error, ResultExt, make_err, make_input_err}; | 
24  |  | use nativelink_proto::build::bazel::remote::execution::v2::platform::Property;  | 
25  |  | use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::ConnectWorkerRequest;  | 
26  |  | use tokio::process;  | 
27  |  | use tracing::info;  | 
28  |  |  | 
29  |  | #[expect(clippy::future_not_send)] // TODO(jhpratt) remove this  | 
30  | 8  | pub async fn make_connect_worker_request<S: BuildHasher>(  | 
31  | 8  |     worker_id_prefix: String,  | 
32  | 8  |     worker_properties: &HashMap<String, WorkerProperty, S>,  | 
33  | 8  | ) -> Result<ConnectWorkerRequest, Error> { | 
34  | 8  |     let mut futures = vec![];  | 
35  | 8  |     for (property_name0 , worker_property0 ) in worker_properties {  | 
36  | 0  |         futures.push(async move { | 
37  | 0  |             match worker_property { | 
38  | 0  |                 WorkerProperty::Values(values) => { | 
39  | 0  |                     let mut props = Vec::with_capacity(values.len());  | 
40  | 0  |                     for value in values { | 
41  | 0  |                         props.push(Property { | 
42  | 0  |                             name: property_name.clone(),  | 
43  | 0  |                             value: value.clone(),  | 
44  | 0  |                         });  | 
45  | 0  |                     }  | 
46  | 0  |                     Ok(props)  | 
47  |  |                 }  | 
48  | 0  |                 WorkerProperty::QueryCmd(cmd) => { | 
49  | 0  |                     let maybe_split_cmd = shlex::split(cmd);  | 
50  | 0  |                     let (command, args) = match &maybe_split_cmd { | 
51  | 0  |                         Some(split_cmd) => (&split_cmd[0], &split_cmd[1..]),  | 
52  |  |                         None => { | 
53  | 0  |                             return Err(make_input_err!(  | 
54  | 0  |                                 "Could not parse the value of worker property: {}: '{}'", | 
55  | 0  |                                 property_name,  | 
56  | 0  |                                 cmd  | 
57  | 0  |                             ));  | 
58  |  |                         }  | 
59  |  |                     };  | 
60  | 0  |                     let mut process = process::Command::new(command);  | 
61  | 0  |                     process.env_clear();  | 
62  | 0  |                     process.args(args);  | 
63  | 0  |                     process.stdin(Stdio::null());  | 
64  | 0  |                     let err_fn =  | 
65  | 0  |                         || format!("Error executing property_name {property_name} command"); | 
66  | 0  |                     info!(cmd, property_name, "Spawning process",);  | 
67  | 0  |                     let process_output = process.output().await.err_tip(err_fn)?;  | 
68  | 0  |                     if !process_output.status.success() {  Branch (68:24): [True: 0, False: 0]
   Branch (68:24): [Folded - Ignored]
   Branch (68:24): [Folded - Ignored]
  | 
69  | 0  |                         return Err(make_err!(  | 
70  | 0  |                             process_output.status.code().unwrap().into(),  | 
71  | 0  |                             "{}", | 
72  | 0  |                             err_fn()  | 
73  | 0  |                         ));  | 
74  | 0  |                     }  | 
75  | 0  |                     if !process_output.stderr.is_empty() {  Branch (75:24): [True: 0, False: 0]
   Branch (75:24): [Folded - Ignored]
   Branch (75:24): [Folded - Ignored]
  | 
76  | 0  |                         eprintln!(  | 
77  | 0  |                             "{}", | 
78  | 0  |                             from_utf8(&process_output.stderr).map_err(|e| make_input_err!(  | 
79  |  |                                 "Failed to decode stderr to utf8 : {:?}", | 
80  |  |                                 e  | 
81  | 0  |                             ))?  | 
82  |  |                         );  | 
83  | 0  |                     }  | 
84  | 0  |                     let reader = BufReader::new(Cursor::new(process_output.stdout));  | 
85  |  |  | 
86  | 0  |                     let mut props = vec![];  | 
87  | 0  |                     for value in reader.lines() { | 
88  | 0  |                         props.push(Property { | 
89  | 0  |                             name: property_name.clone(),  | 
90  | 0  |                             value: value  | 
91  | 0  |                                 .err_tip(|| "Could split input by lines")?  | 
92  | 0  |                                 .trim()  | 
93  | 0  |                                 .to_string(),  | 
94  |  |                         });  | 
95  |  |                     }  | 
96  | 0  |                     Ok(props)  | 
97  |  |                 }  | 
98  |  |             }  | 
99  | 0  |         });  | 
100  |  |     }  | 
101  |  |  | 
102  |  |     Ok(ConnectWorkerRequest { | 
103  | 8  |         worker_id_prefix,  | 
104  | 8  |         properties: try_join_all(futures).await?0 .into_iter().flatten().collect(),  | 
105  |  |     })  | 
106  | 8  | }  |