Coverage Report

Created: 2026-06-25 16:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/capabilities_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::sync::Arc;
17
18
use nativelink_config::cas_server::{CapabilitiesConfig, InstanceName, WithInstanceName};
19
use nativelink_error::{Error, ResultExt};
20
use nativelink_proto::build::bazel::remote::execution::v2::capabilities_server::{
21
    Capabilities, CapabilitiesServer as Server,
22
};
23
use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as DigestFunction;
24
use nativelink_proto::build::bazel::remote::execution::v2::priority_capabilities::PriorityRange;
25
use nativelink_proto::build::bazel::remote::execution::v2::symlink_absolute_path_strategy::Value as SymlinkAbsolutePathStrategy;
26
use nativelink_proto::build::bazel::remote::execution::v2::{
27
    ActionCacheUpdateCapabilities, CacheCapabilities, ExecutionCapabilities,
28
    GetCapabilitiesRequest, PriorityCapabilities, ServerCapabilities,
29
};
30
use nativelink_proto::build::bazel::semver::SemVer;
31
use nativelink_scheduler::known_platform_property_provider::KnownPlatformPropertyProvider;
32
use nativelink_util::digest_hasher::default_digest_hasher_func;
33
use tonic::{Request, Response, Status};
34
use tracing::{Level, instrument};
35
36
/// Value advertised to clients as `max_batch_total_size_bytes` in the cache
/// capabilities returned by `get_capabilities` (64 * 1024 = 65536 bytes).
const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024;
37
38
#[derive(Debug, Default)]
39
pub struct CapabilitiesServer {
40
    supported_node_properties_for_instance: HashMap<InstanceName, Vec<String>>,
41
}
42
43
impl CapabilitiesServer {
44
0
    pub async fn new(
45
0
        configs: &[WithInstanceName<CapabilitiesConfig>],
46
0
        scheduler_map: &HashMap<String, Arc<dyn KnownPlatformPropertyProvider>>,
47
0
    ) -> Result<Self, Error> {
48
0
        let mut supported_node_properties_for_instance = HashMap::new();
49
0
        for config in configs {
50
0
            let mut properties = Vec::new();
51
0
            if let Some(remote_execution_cfg) = &config.remote_execution {
52
0
                let scheduler =
53
0
                    scheduler_map
54
0
                        .get(&remote_execution_cfg.scheduler)
55
0
                        .err_tip(|| {
56
0
                            format!(
57
                                "Scheduler needs config for '{}' because it exists in capabilities",
58
                                remote_execution_cfg.scheduler
59
                            )
60
0
                        })?;
61
0
                for platform_key in scheduler
62
0
                    .get_known_properties(&config.instance_name)
63
0
                    .await
64
0
                    .err_tip(|| {
65
0
                        format!(
66
                            "Failed to get platform properties for {}",
67
                            config.instance_name
68
                        )
69
0
                    })?
70
0
                {
71
0
                    properties.push(platform_key.clone());
72
0
                }
73
0
            }
74
0
            supported_node_properties_for_instance.insert(config.instance_name.clone(), properties);
75
        }
76
0
        Ok(Self {
77
0
            supported_node_properties_for_instance,
78
0
        })
79
0
    }
80
81
0
    pub fn into_service(self) -> Server<Self> {
82
0
        Server::new(self)
83
0
    }
84
}
85
86
#[tonic::async_trait]
87
impl Capabilities for CapabilitiesServer {
88
    #[instrument(
89
        err,
90
        ret(level = Level::INFO),
91
        level = Level::ERROR,
92
        skip_all,
93
        fields(request = ?grpc_request.get_ref())
94
    )]
95
    async fn get_capabilities(
96
        &self,
97
        grpc_request: Request<GetCapabilitiesRequest>,
98
    ) -> Result<Response<ServerCapabilities>, Status> {
99
        let request = grpc_request.into_inner();
100
101
        let instance_name = request.instance_name;
102
        let maybe_supported_node_properties = self
103
            .supported_node_properties_for_instance
104
            .get(&instance_name);
105
        let execution_capabilities =
106
            maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities {
107
0
                digest_function: default_digest_hasher_func().proto_digest_func().into(),
108
                exec_enabled: true, // TODO(palfrey) Make this configurable.
109
0
                execution_priority_capabilities: Some(PriorityCapabilities {
110
0
                    priorities: vec![PriorityRange {
111
0
                        min_priority: 0,
112
0
                        max_priority: i32::MAX,
113
0
                    }],
114
0
                }),
115
0
                supported_node_properties: props_for_instance.clone(),
116
0
                digest_functions: vec![
117
0
                    DigestFunction::Sha256.into(),
118
0
                    DigestFunction::Blake3.into(),
119
                ],
120
0
            });
121
122
        let resp = ServerCapabilities {
123
            cache_capabilities: Some(CacheCapabilities {
124
                digest_functions: vec![
125
                    DigestFunction::Sha256.into(),
126
                    DigestFunction::Blake3.into(),
127
                ],
128
                action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities {
129
                    update_enabled: true,
130
                }),
131
                cache_priority_capabilities: None,
132
                max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE,
133
                symlink_absolute_path_strategy: SymlinkAbsolutePathStrategy::Disallowed.into(),
134
                supported_compressors: vec![],
135
                supported_batch_update_compressors: vec![],
136
            }),
137
            execution_capabilities,
138
            deprecated_api_version: None,
139
            low_api_version: Some(SemVer {
140
                major: 2,
141
                minor: 0,
142
                patch: 0,
143
                prerelease: String::new(),
144
            }),
145
            high_api_version: Some(SemVer {
146
                major: 2,
147
                minor: 3,
148
                patch: 0,
149
                prerelease: String::new(),
150
            }),
151
        };
152
        Ok(Response::new(resp))
153
    }
154
}