Coverage Report

Created: 2024-11-22 20:17

/build/source/nativelink-worker/src/worker_utils.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::hash::BuildHasher;
17
use std::io::{BufRead, BufReader, Cursor};
18
use std::process::Stdio;
19
use std::str::from_utf8;
20
21
use futures::future::try_join_all;
22
use nativelink_config::cas_server::WorkerProperty;
23
use nativelink_error::{make_err, make_input_err, Error, ResultExt};
24
use nativelink_proto::build::bazel::remote::execution::v2::platform::Property;
25
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::SupportedProperties;
26
use tokio::process;
27
use tracing::{event, Level};
28
29
8
pub async fn make_supported_properties<S: BuildHasher>(
30
8
    worker_properties: &HashMap<String, WorkerProperty, S>,
31
8
) -> Result<SupportedProperties, Error> {
32
8
    let mut futures = vec![];
33
8
    for (
property_name, worker_property0
) in worker_properties {
34
0
        futures.push(async move {
35
0
            match worker_property {
36
0
                WorkerProperty::values(values) => {
37
0
                    let mut props = Vec::with_capacity(values.len());
38
0
                    for value in values {
39
0
                        props.push(Property {
40
0
                            name: property_name.clone(),
41
0
                            value: value.clone(),
42
0
                        });
43
0
                    }
44
0
                    Ok(props)
45
                }
46
0
                WorkerProperty::query_cmd(cmd) => {
47
0
                    let maybe_split_cmd = shlex::split(cmd);
48
0
                    let (command, args) = match &maybe_split_cmd {
49
0
                        Some(split_cmd) => (&split_cmd[0], &split_cmd[1..]),
50
                        None => {
51
0
                            return Err(make_input_err!(
52
0
                                "Could not parse the value of worker property: {}: '{}'",
53
0
                                property_name,
54
0
                                cmd
55
0
                            ))
56
                        }
57
                    };
58
0
                    let mut process = process::Command::new(command);
59
0
                    process.env_clear();
60
0
                    process.args(args);
61
0
                    process.stdin(Stdio::null());
62
0
                    let err_fn =
63
0
                        || format!("Error executing property_name {property_name} command");
64
0
                    event!(Level::INFO, cmd, property_name, "Spawning process",);
65
0
                    let process_output = process.output().await.err_tip(err_fn)?;
66
0
                    if !process_output.status.success() {
  Branch (66:24): [True: 0, False: 0]
  Branch (66:24): [Folded - Ignored]
  Branch (66:24): [Folded - Ignored]
67
0
                        return Err(make_err!(
68
0
                            process_output.status.code().unwrap().into(),
69
0
                            "{}",
70
0
                            err_fn()
71
0
                        ));
72
0
                    }
73
0
                    if !process_output.stderr.is_empty() {
  Branch (73:24): [True: 0, False: 0]
  Branch (73:24): [Folded - Ignored]
  Branch (73:24): [Folded - Ignored]
74
0
                        eprintln!(
75
0
                            "{}",
76
0
                            from_utf8(&process_output.stderr).map_err(|e| make_input_err!(
77
0
                                "Failed to decode stderr to utf8 : {:?}",
78
0
                                e
79
0
                            ))?
80
                        );
81
0
                    }
82
0
                    let reader = BufReader::new(Cursor::new(process_output.stdout));
83
0
84
0
                    let mut props = vec![];
85
0
                    for value in reader.lines() {
86
0
                        props.push(Property {
87
0
                            name: property_name.clone(),
88
0
                            value: value
89
0
                                .err_tip(|| "Could split input by lines")?
90
0
                                .trim()
91
0
                                .to_string(),
92
                        });
93
                    }
94
0
                    Ok(props)
95
                }
96
            }
97
0
        });
98
0
    }
99
100
    Ok(SupportedProperties {
101
8
        properties: try_join_all(futures).
await0
?0
.into_iter().flatten().collect(),
102
    })
103
8
}