Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/worker_utils.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::hash::BuildHasher;
16
use core::str::from_utf8;
17
use std::collections::HashMap;
18
use std::io::{BufRead, BufReader, Cursor};
19
use std::process::Stdio;
20
21
use futures::future::try_join_all;
22
use nativelink_config::cas_server::WorkerProperty;
23
use nativelink_error::{Error, ResultExt, make_err, make_input_err};
24
use nativelink_proto::build::bazel::remote::execution::v2::platform::Property;
25
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::ConnectWorkerRequest;
26
use tokio::process;
27
use tracing::info;
28
29
#[expect(clippy::future_not_send)] // TODO(jhpratt) remove this
30
8
pub async fn make_connect_worker_request<S: BuildHasher>(
31
8
    worker_id_prefix: String,
32
8
    worker_properties: &HashMap<String, WorkerProperty, S>,
33
8
) -> Result<ConnectWorkerRequest, Error> {
34
8
    let mut futures = vec![];
35
8
    for (
property_name0
,
worker_property0
) in worker_properties {
36
0
        futures.push(async move {
37
0
            match worker_property {
38
0
                WorkerProperty::Values(values) => {
39
0
                    let mut props = Vec::with_capacity(values.len());
40
0
                    for value in values {
41
0
                        props.push(Property {
42
0
                            name: property_name.clone(),
43
0
                            value: value.clone(),
44
0
                        });
45
0
                    }
46
0
                    Ok(props)
47
                }
48
0
                WorkerProperty::QueryCmd(cmd) => {
49
0
                    let maybe_split_cmd = shlex::split(cmd);
50
0
                    let (command, args) = match &maybe_split_cmd {
51
0
                        Some(split_cmd) => (&split_cmd[0], &split_cmd[1..]),
52
                        None => {
53
0
                            return Err(make_input_err!(
54
0
                                "Could not parse the value of worker property: {}: '{}'",
55
0
                                property_name,
56
0
                                cmd
57
0
                            ));
58
                        }
59
                    };
60
0
                    let mut process = process::Command::new(command);
61
0
                    process.env_clear();
62
0
                    process.args(args);
63
0
                    process.stdin(Stdio::null());
64
0
                    let err_fn =
65
0
                        || format!("Error executing property_name {property_name} command");
66
0
                    info!(cmd, property_name, "Spawning process",);
67
0
                    let process_output = process.output().await.err_tip(err_fn)?;
68
0
                    if !process_output.status.success() {
  Branch (68:24): [True: 0, False: 0]
  Branch (68:24): [Folded - Ignored]
  Branch (68:24): [Folded - Ignored]
69
0
                        return Err(make_err!(
70
0
                            process_output.status.code().unwrap().into(),
71
0
                            "{}",
72
0
                            err_fn()
73
0
                        ));
74
0
                    }
75
0
                    if !process_output.stderr.is_empty() {
  Branch (75:24): [True: 0, False: 0]
  Branch (75:24): [Folded - Ignored]
  Branch (75:24): [Folded - Ignored]
76
0
                        eprintln!(
77
0
                            "{}",
78
0
                            from_utf8(&process_output.stderr).map_err(|e| make_input_err!(
79
                                "Failed to decode stderr to utf8 : {:?}",
80
                                e
81
0
                            ))?
82
                        );
83
0
                    }
84
0
                    let reader = BufReader::new(Cursor::new(process_output.stdout));
85
86
0
                    let mut props = vec![];
87
0
                    for value in reader.lines() {
88
0
                        props.push(Property {
89
0
                            name: property_name.clone(),
90
0
                            value: value
91
0
                                .err_tip(|| "Could split input by lines")?
92
0
                                .trim()
93
0
                                .to_string(),
94
                        });
95
                    }
96
0
                    Ok(props)
97
                }
98
            }
99
0
        });
100
    }
101
102
    Ok(ConnectWorkerRequest {
103
8
        worker_id_prefix,
104
8
        properties: try_join_all(futures).await
?0
.into_iter().flatten().collect(),
105
    })
106
8
}