Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/capabilities_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::sync::Arc;
17
18
use nativelink_config::cas_server::{CapabilitiesConfig, InstanceName, WithInstanceName};
19
use nativelink_error::{Error, ResultExt};
20
use nativelink_proto::build::bazel::remote::execution::v2::capabilities_server::{
21
    Capabilities, CapabilitiesServer as Server,
22
};
23
use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as DigestFunction;
24
use nativelink_proto::build::bazel::remote::execution::v2::priority_capabilities::PriorityRange;
25
use nativelink_proto::build::bazel::remote::execution::v2::symlink_absolute_path_strategy::Value as SymlinkAbsolutePathStrategy;
26
use nativelink_proto::build::bazel::remote::execution::v2::{
27
    ActionCacheUpdateCapabilities, CacheCapabilities, ExecutionCapabilities,
28
    GetCapabilitiesRequest, PriorityCapabilities, ServerCapabilities,
29
};
30
use nativelink_proto::build::bazel::semver::SemVer;
31
use nativelink_util::digest_hasher::default_digest_hasher_func;
32
use nativelink_util::operation_state_manager::ClientStateManager;
33
use tonic::{Request, Response, Status};
34
use tracing::{Level, instrument, warn};
35
36
const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024;
37
38
#[derive(Debug, Default)]
39
pub struct CapabilitiesServer {
40
    supported_node_properties_for_instance: HashMap<InstanceName, Vec<String>>,
41
}
42
43
impl CapabilitiesServer {
44
0
    pub async fn new(
45
0
        configs: &[WithInstanceName<CapabilitiesConfig>],
46
0
        scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>,
47
0
    ) -> Result<Self, Error> {
48
0
        let mut supported_node_properties_for_instance = HashMap::new();
49
0
        for config in configs {
50
0
            let mut properties = Vec::new();
51
0
            if let Some(remote_execution_cfg) = &config.remote_execution {
  Branch (51:20): [Folded - Ignored]
  Branch (51:20): [Folded - Ignored]
52
0
                let scheduler =
53
0
                    scheduler_map
54
0
                        .get(&remote_execution_cfg.scheduler)
55
0
                        .err_tip(|| {
56
0
                            format!(
57
0
                                "Scheduler needs config for '{}' because it exists in capabilities",
58
                                remote_execution_cfg.scheduler
59
                            )
60
0
                        })?;
61
0
                if let Some(props_provider) = scheduler.as_known_platform_property_provider() {
  Branch (61:24): [Folded - Ignored]
  Branch (61:24): [Folded - Ignored]
62
0
                    for platform_key in props_provider
63
0
                        .get_known_properties(&config.instance_name)
64
0
                        .await
65
0
                        .err_tip(|| {
66
0
                            format!(
67
0
                                "Failed to get platform properties for {}",
68
                                config.instance_name
69
                            )
70
0
                        })?
71
0
                    {
72
0
                        properties.push(platform_key.clone());
73
0
                    }
74
                } else {
75
0
                    warn!(
76
0
                        "Scheduler '{}' does not implement KnownPlatformPropertyProvider",
77
                        remote_execution_cfg.scheduler
78
                    );
79
                }
80
0
            }
81
0
            supported_node_properties_for_instance.insert(config.instance_name.clone(), properties);
82
        }
83
0
        Ok(Self {
84
0
            supported_node_properties_for_instance,
85
0
        })
86
0
    }
87
88
0
    pub fn into_service(self) -> Server<Self> {
89
0
        Server::new(self)
90
0
    }
91
}
92
93
#[tonic::async_trait]
94
impl Capabilities for CapabilitiesServer {
95
    #[instrument(
96
        err,
97
        ret(level = Level::INFO),
98
        level = Level::ERROR,
99
        skip_all,
100
        fields(request = ?grpc_request.get_ref())
101
    )]
102
    async fn get_capabilities(
103
        &self,
104
        grpc_request: Request<GetCapabilitiesRequest>,
105
0
    ) -> Result<Response<ServerCapabilities>, Status> {
106
0
        let request = grpc_request.into_inner();
107
108
0
        let instance_name = request.instance_name;
109
0
        let maybe_supported_node_properties = self
110
0
            .supported_node_properties_for_instance
111
0
            .get(&instance_name);
112
0
        let execution_capabilities =
113
0
            maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities {
114
0
                digest_function: default_digest_hasher_func().proto_digest_func().into(),
115
                exec_enabled: true, // TODO(aaronmondal) Make this configurable.
116
0
                execution_priority_capabilities: Some(PriorityCapabilities {
117
0
                    priorities: vec![PriorityRange {
118
0
                        min_priority: 0,
119
0
                        max_priority: i32::MAX,
120
0
                    }],
121
0
                }),
122
0
                supported_node_properties: props_for_instance.clone(),
123
0
                digest_functions: vec![
124
0
                    DigestFunction::Sha256.into(),
125
0
                    DigestFunction::Blake3.into(),
126
                ],
127
0
            });
128
129
0
        let resp = ServerCapabilities {
130
0
            cache_capabilities: Some(CacheCapabilities {
131
0
                digest_functions: vec![
132
0
                    DigestFunction::Sha256.into(),
133
0
                    DigestFunction::Blake3.into(),
134
0
                ],
135
0
                action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities {
136
0
                    update_enabled: true,
137
0
                }),
138
0
                cache_priority_capabilities: None,
139
0
                max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE,
140
0
                symlink_absolute_path_strategy: SymlinkAbsolutePathStrategy::Disallowed.into(),
141
0
                supported_compressors: vec![],
142
0
                supported_batch_update_compressors: vec![],
143
0
            }),
144
0
            execution_capabilities,
145
0
            deprecated_api_version: None,
146
0
            low_api_version: Some(SemVer {
147
0
                major: 2,
148
0
                minor: 0,
149
0
                patch: 0,
150
0
                prerelease: String::new(),
151
0
            }),
152
0
            high_api_version: Some(SemVer {
153
0
                major: 2,
154
0
                minor: 3,
155
0
                patch: 0,
156
0
                prerelease: String::new(),
157
0
            }),
158
0
        };
159
0
        Ok(Response::new(resp))
160
0
    }
161
}