Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/worker_utils.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::hash::BuildHasher;
17
use std::io::{BufRead, BufReader, Cursor};
18
use std::process::Stdio;
19
use std::str::from_utf8;
20
21
use futures::future::try_join_all;
22
use nativelink_config::cas_server::WorkerProperty;
23
use nativelink_error::{make_err, make_input_err, Error, ResultExt};
24
use nativelink_proto::build::bazel::remote::execution::v2::platform::Property;
25
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::ConnectWorkerRequest;
26
use tokio::process;
27
use tracing::{event, Level};
28
29
8
pub async fn make_connect_worker_request<S: BuildHasher>(
30
8
    worker_id_prefix: String,
31
8
    worker_properties: &HashMap<String, WorkerProperty, S>,
32
8
) -> Result<ConnectWorkerRequest, Error> {
33
8
    let mut futures = vec![];
34
8
    for (
property_name, worker_property0
) in worker_properties {
35
0
        futures.push(async move {
36
0
            match worker_property {
37
0
                WorkerProperty::values(values) => {
38
0
                    let mut props = Vec::with_capacity(values.len());
39
0
                    for value in values {
40
0
                        props.push(Property {
41
0
                            name: property_name.clone(),
42
0
                            value: value.clone(),
43
0
                        });
44
0
                    }
45
0
                    Ok(props)
46
                }
47
0
                WorkerProperty::query_cmd(cmd) => {
48
0
                    let maybe_split_cmd = shlex::split(cmd);
49
0
                    let (command, args) = match &maybe_split_cmd {
50
0
                        Some(split_cmd) => (&split_cmd[0], &split_cmd[1..]),
51
                        None => {
52
0
                            return Err(make_input_err!(
53
0
                                "Could not parse the value of worker property: {}: '{}'",
54
0
                                property_name,
55
0
                                cmd
56
0
                            ))
57
                        }
58
                    };
59
0
                    let mut process = process::Command::new(command);
60
0
                    process.env_clear();
61
0
                    process.args(args);
62
0
                    process.stdin(Stdio::null());
63
0
                    let err_fn =
64
0
                        || format!("Error executing property_name {property_name} command");
65
0
                    event!(Level::INFO, cmd, property_name, "Spawning process",);
66
0
                    let process_output = process.output().await.err_tip(err_fn)?;
67
0
                    if !process_output.status.success() {
  Branch (67:24): [True: 0, False: 0]
  Branch (67:24): [Folded - Ignored]
  Branch (67:24): [Folded - Ignored]
68
0
                        return Err(make_err!(
69
0
                            process_output.status.code().unwrap().into(),
70
0
                            "{}",
71
0
                            err_fn()
72
0
                        ));
73
0
                    }
74
0
                    if !process_output.stderr.is_empty() {
  Branch (74:24): [True: 0, False: 0]
  Branch (74:24): [Folded - Ignored]
  Branch (74:24): [Folded - Ignored]
75
0
                        eprintln!(
76
0
                            "{}",
77
0
                            from_utf8(&process_output.stderr).map_err(|e| make_input_err!(
78
0
                                "Failed to decode stderr to utf8 : {:?}",
79
0
                                e
80
0
                            ))?
81
                        );
82
0
                    }
83
0
                    let reader = BufReader::new(Cursor::new(process_output.stdout));
84
0
85
0
                    let mut props = vec![];
86
0
                    for value in reader.lines() {
87
0
                        props.push(Property {
88
0
                            name: property_name.clone(),
89
0
                            value: value
90
0
                                .err_tip(|| "Could split input by lines")?
91
0
                                .trim()
92
0
                                .to_string(),
93
                        });
94
                    }
95
0
                    Ok(props)
96
                }
97
            }
98
0
        });
99
0
    }
100
101
    Ok(ConnectWorkerRequest {
102
8
        worker_id_prefix,
103
8
        properties: try_join_all(futures).await
?0
.into_iter().flatten().collect(),
104
    })
105
8
}