/build/source/nativelink-worker/src/worker_utils.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::hash::BuildHasher; |
16 | | use core::str::from_utf8; |
17 | | use std::collections::HashMap; |
18 | | use std::io::{BufRead, BufReader, Cursor}; |
19 | | use std::process::Stdio; |
20 | | |
21 | | use futures::future::try_join_all; |
22 | | use nativelink_config::cas_server::WorkerProperty; |
23 | | use nativelink_error::{Error, ResultExt, make_err, make_input_err}; |
24 | | use nativelink_proto::build::bazel::remote::execution::v2::platform::Property; |
25 | | use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::ConnectWorkerRequest; |
26 | | use tokio::process; |
27 | | use tracing::info; |
28 | | |
29 | | #[expect(clippy::future_not_send)] // TODO(jhpratt) remove this |
30 | 9 | pub async fn make_connect_worker_request<S: BuildHasher>( |
31 | 9 | worker_id_prefix: String, |
32 | 9 | worker_properties: &HashMap<String, WorkerProperty, S>, |
33 | 9 | extra_envs: &HashMap<String, String, S>, |
34 | 9 | max_inflight_tasks: u64, |
35 | 9 | ) -> Result<ConnectWorkerRequest, Error> { |
36 | 9 | let mut futures = vec![]; |
37 | 10 | for (property_name1 , worker_property1 ) in worker_properties { |
38 | 1 | futures.push(async move { |
39 | 1 | match worker_property { |
40 | 0 | WorkerProperty::Values(values) => { |
41 | 0 | let mut props = Vec::with_capacity(values.len()); |
42 | 0 | for value in values { |
43 | 0 | props.push(Property { |
44 | 0 | name: property_name.clone(), |
45 | 0 | value: value.clone(), |
46 | 0 | }); |
47 | 0 | } |
48 | 0 | Ok(props) |
49 | | } |
50 | 1 | WorkerProperty::QueryCmd(cmd) => { |
51 | 1 | let maybe_split_cmd = shlex::split(cmd); |
52 | 1 | let (command, args) = match &maybe_split_cmd { |
53 | 1 | Some(split_cmd) => (&split_cmd[0], &split_cmd[1..]), |
54 | | None => { |
55 | 0 | return Err(make_input_err!( |
56 | 0 | "Could not parse the value of worker property: {}: '{}'", |
57 | 0 | property_name, |
58 | 0 | cmd |
59 | 0 | )); |
60 | | } |
61 | | }; |
62 | 1 | let mut process = process::Command::new(command); |
63 | 1 | process.env_clear(); |
64 | 1 | process.envs(extra_envs); |
65 | 1 | process.args(args); |
66 | 1 | process.stdin(Stdio::null()); |
67 | 1 | let err_fn = |
68 | 0 | || format!("Error executing property_name {property_name} command"); |
69 | 1 | info!(cmd, property_name, "Spawning process",); |
70 | 1 | let process_output = process.output().await.err_tip(err_fn)?0 ; |
71 | 1 | if !process_output.status.success() { Branch (71:24): [True: 0, False: 0]
Branch (71:24): [Folded - Ignored]
Branch (71:24): [Folded - Ignored]
Branch (71:24): [True: 0, False: 1]
|
72 | 0 | return Err(make_err!( |
73 | 0 | process_output.status.code().unwrap().into(), |
74 | 0 | "{}", |
75 | 0 | err_fn() |
76 | 0 | )); |
77 | 1 | } |
78 | 1 | if !process_output.stderr.is_empty() { Branch (78:24): [True: 0, False: 0]
Branch (78:24): [Folded - Ignored]
Branch (78:24): [Folded - Ignored]
Branch (78:24): [True: 0, False: 1]
|
79 | 0 | eprintln!( |
80 | 0 | "{}", |
81 | 0 | from_utf8(&process_output.stderr).map_err(|e| make_input_err!( |
82 | | "Failed to decode stderr to utf8 : {:?}", |
83 | | e |
84 | 0 | ))? |
85 | | ); |
86 | 1 | } |
87 | 1 | let reader = BufReader::new(Cursor::new(process_output.stdout)); |
88 | | |
89 | 1 | let mut props = vec![]; |
90 | 1 | for value in reader.lines() { |
91 | 1 | props.push(Property { |
92 | 1 | name: property_name.clone(), |
93 | 1 | value: value |
94 | 1 | .err_tip(|| "Could split input by lines")?0 |
95 | 1 | .trim() |
96 | 1 | .to_string(), |
97 | | }); |
98 | | } |
99 | 1 | Ok(props) |
100 | | } |
101 | | } |
102 | 1 | }); |
103 | | } |
104 | | |
105 | | Ok(ConnectWorkerRequest { |
106 | 9 | worker_id_prefix, |
107 | 9 | properties: try_join_all(futures).await?0 .into_iter().flatten().collect(), |
108 | 9 | max_inflight_tasks, |
109 | | }) |
110 | 9 | } |