Coverage Report

Created: 2026-02-23 10:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/worker_utils.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::hash::BuildHasher;
16
use core::str::from_utf8;
17
use std::collections::HashMap;
18
use std::io::{BufRead, BufReader, Cursor};
19
use std::process::Stdio;
20
21
use futures::future::try_join_all;
22
use nativelink_config::cas_server::WorkerProperty;
23
use nativelink_error::{Error, ResultExt, make_err, make_input_err};
24
use nativelink_proto::build::bazel::remote::execution::v2::platform::Property;
25
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::ConnectWorkerRequest;
26
use tokio::process;
27
use tracing::info;
28
29
#[expect(clippy::future_not_send)] // TODO(jhpratt) remove this
30
9
pub async fn make_connect_worker_request<S: BuildHasher>(
31
9
    worker_id_prefix: String,
32
9
    worker_properties: &HashMap<String, WorkerProperty, S>,
33
9
    extra_envs: &HashMap<String, String, S>,
34
9
    max_inflight_tasks: u64,
35
9
) -> Result<ConnectWorkerRequest, Error> {
36
9
    let mut futures = vec![];
37
10
    for (
property_name1
,
worker_property1
) in worker_properties {
38
1
        futures.push(async move {
39
1
            match worker_property {
40
0
                WorkerProperty::Values(values) => {
41
0
                    let mut props = Vec::with_capacity(values.len());
42
0
                    for value in values {
43
0
                        props.push(Property {
44
0
                            name: property_name.clone(),
45
0
                            value: value.clone(),
46
0
                        });
47
0
                    }
48
0
                    Ok(props)
49
                }
50
1
                WorkerProperty::QueryCmd(cmd) => {
51
1
                    let maybe_split_cmd = shlex::split(cmd);
52
1
                    let (command, args) = match &maybe_split_cmd {
53
1
                        Some(split_cmd) => (&split_cmd[0], &split_cmd[1..]),
54
                        None => {
55
0
                            return Err(make_input_err!(
56
0
                                "Could not parse the value of worker property: {}: '{}'",
57
0
                                property_name,
58
0
                                cmd
59
0
                            ));
60
                        }
61
                    };
62
1
                    let mut process = process::Command::new(command);
63
1
                    process.env_clear();
64
1
                    process.envs(extra_envs);
65
1
                    process.args(args);
66
1
                    process.stdin(Stdio::null());
67
1
                    let err_fn =
68
0
                        || format!("Error executing property_name {property_name} command");
69
1
                    info!(cmd, property_name, "Spawning process",);
70
1
                    let process_output = process.output().await.err_tip(err_fn)
?0
;
71
1
                    if !process_output.status.success() {
  Branch (71:24): [True: 0, False: 0]
  Branch (71:24): [Folded - Ignored]
  Branch (71:24): [Folded - Ignored]
  Branch (71:24): [True: 0, False: 1]
72
0
                        return Err(make_err!(
73
0
                            process_output.status.code().unwrap().into(),
74
0
                            "{}",
75
0
                            err_fn()
76
0
                        ));
77
1
                    }
78
1
                    if !process_output.stderr.is_empty() {
  Branch (78:24): [True: 0, False: 0]
  Branch (78:24): [Folded - Ignored]
  Branch (78:24): [Folded - Ignored]
  Branch (78:24): [True: 0, False: 1]
79
0
                        eprintln!(
80
0
                            "{}",
81
0
                            from_utf8(&process_output.stderr).map_err(|e| make_input_err!(
82
                                "Failed to decode stderr to utf8 : {:?}",
83
                                e
84
0
                            ))?
85
                        );
86
1
                    }
87
1
                    let reader = BufReader::new(Cursor::new(process_output.stdout));
88
89
1
                    let mut props = vec![];
90
1
                    for value in reader.lines() {
91
1
                        props.push(Property {
92
1
                            name: property_name.clone(),
93
1
                            value: value
94
1
                                .err_tip(|| "Could split input by lines")
?0
95
1
                                .trim()
96
1
                                .to_string(),
97
                        });
98
                    }
99
1
                    Ok(props)
100
                }
101
            }
102
1
        });
103
    }
104
105
    Ok(ConnectWorkerRequest {
106
9
        worker_id_prefix,
107
9
        properties: try_join_all(futures).await
?0
.into_iter().flatten().collect(),
108
9
        max_inflight_tasks,
109
    })
110
9
}