Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-service/src/capabilities_server.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::sync::Arc;
17
18
use nativelink_config::cas_server::{CapabilitiesConfig, InstanceName};
19
use nativelink_error::{Error, ResultExt};
20
use nativelink_proto::build::bazel::remote::execution::v2::capabilities_server::{
21
    Capabilities, CapabilitiesServer as Server,
22
};
23
use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as DigestFunction;
24
use nativelink_proto::build::bazel::remote::execution::v2::priority_capabilities::PriorityRange;
25
use nativelink_proto::build::bazel::remote::execution::v2::symlink_absolute_path_strategy::Value as SymlinkAbsolutePathStrategy;
26
use nativelink_proto::build::bazel::remote::execution::v2::{
27
    ActionCacheUpdateCapabilities, CacheCapabilities, ExecutionCapabilities,
28
    GetCapabilitiesRequest, PriorityCapabilities, ServerCapabilities,
29
};
30
use nativelink_proto::build::bazel::semver::SemVer;
31
use nativelink_util::digest_hasher::default_digest_hasher_func;
32
use nativelink_util::operation_state_manager::ClientStateManager;
33
use tonic::{Request, Response, Status};
34
use tracing::{event, instrument, Level};
35
36
const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024;
37
38
#[derive(Debug, Default)]
39
pub struct CapabilitiesServer {
40
    supported_node_properties_for_instance: HashMap<InstanceName, Vec<String>>,
41
}
42
43
impl CapabilitiesServer {
44
0
    pub async fn new(
45
0
        config: &HashMap<InstanceName, CapabilitiesConfig>,
46
0
        scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>,
47
0
    ) -> Result<Self, Error> {
48
0
        let mut supported_node_properties_for_instance = HashMap::new();
49
0
        for (instance_name, cfg) in config {
50
0
            let mut properties = Vec::new();
51
0
            if let Some(remote_execution_cfg) = &cfg.remote_execution {
  Branch (51:20): [Folded - Ignored]
  Branch (51:20): [Folded - Ignored]
52
0
                let scheduler =
53
0
                    scheduler_map
54
0
                        .get(&remote_execution_cfg.scheduler)
55
0
                        .err_tip(|| {
56
0
                            format!(
57
0
                                "Scheduler needs config for '{}' because it exists in capabilities",
58
0
                                remote_execution_cfg.scheduler
59
0
                            )
60
0
                        })?;
61
0
                if let Some(props_provider) = scheduler.as_known_platform_property_provider() {
  Branch (61:24): [Folded - Ignored]
  Branch (61:24): [Folded - Ignored]
62
0
                    for platform_key in props_provider
63
0
                        .get_known_properties(instance_name)
64
0
                        .await
65
0
                        .err_tip(|| {
66
0
                            format!("Failed to get platform properties for {instance_name}")
67
0
                        })?
68
0
                    {
69
0
                        properties.push(platform_key.clone());
70
0
                    }
71
                } else {
72
0
                    event!(
73
0
                        Level::WARN,
74
0
                        "Scheduler '{}' does not implement KnownPlatformPropertyProvider",
75
                        remote_execution_cfg.scheduler
76
                    );
77
                }
78
0
            }
79
0
            supported_node_properties_for_instance.insert(instance_name.clone(), properties);
80
        }
81
0
        Ok(CapabilitiesServer {
82
0
            supported_node_properties_for_instance,
83
0
        })
84
0
    }
85
86
0
    pub fn into_service(self) -> Server<CapabilitiesServer> {
87
0
        Server::new(self)
88
0
    }
89
}
90
91
#[tonic::async_trait]
92
impl Capabilities for CapabilitiesServer {
93
    #[allow(clippy::blocks_in_conditions)]
94
0
    #[instrument(
95
        err,
96
        ret(level = Level::INFO),
97
        level = Level::ERROR,
98
        skip_all,
99
        fields(request = ?grpc_request.get_ref())
100
0
    )]
101
    async fn get_capabilities(
102
        &self,
103
        grpc_request: Request<GetCapabilitiesRequest>,
104
0
    ) -> Result<Response<ServerCapabilities>, Status> {
105
0
        let instance_name = grpc_request.into_inner().instance_name;
106
0
        let maybe_supported_node_properties = self
107
0
            .supported_node_properties_for_instance
108
0
            .get(&instance_name);
109
0
        let execution_capabilities =
110
0
            maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities {
111
0
                digest_function: default_digest_hasher_func().proto_digest_func().into(),
112
0
                exec_enabled: true, // TODO(blaise.bruer) Make this configurable.
113
0
                execution_priority_capabilities: Some(PriorityCapabilities {
114
0
                    priorities: vec![PriorityRange {
115
0
                        min_priority: 0,
116
0
                        max_priority: i32::MAX,
117
0
                    }],
118
0
                }),
119
0
                supported_node_properties: props_for_instance.clone(),
120
0
                digest_functions: vec![
121
0
                    DigestFunction::Sha256.into(),
122
0
                    DigestFunction::Blake3.into(),
123
0
                ],
124
0
            });
125
0
126
0
        let resp = ServerCapabilities {
127
0
            cache_capabilities: Some(CacheCapabilities {
128
0
                digest_functions: vec![
129
0
                    DigestFunction::Sha256.into(),
130
0
                    DigestFunction::Blake3.into(),
131
0
                ],
132
0
                action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities {
133
0
                    update_enabled: true,
134
0
                }),
135
0
                cache_priority_capabilities: None,
136
0
                max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE,
137
0
                symlink_absolute_path_strategy: SymlinkAbsolutePathStrategy::Disallowed.into(),
138
0
                supported_compressors: vec![],
139
0
                supported_batch_update_compressors: vec![],
140
0
            }),
141
0
            execution_capabilities,
142
0
            deprecated_api_version: None,
143
0
            low_api_version: Some(SemVer {
144
0
                major: 2,
145
0
                minor: 0,
146
0
                patch: 0,
147
0
                prerelease: String::new(),
148
0
            }),
149
0
            high_api_version: Some(SemVer {
150
0
                major: 2,
151
0
                minor: 3,
152
0
                patch: 0,
153
0
                prerelease: String::new(),
154
0
            }),
155
0
        };
156
0
        Ok(Response::new(resp))
157
0
    }
158
}