/build/source/nativelink-service/src/capabilities_server.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::HashMap; |
16 | | use std::sync::Arc; |
17 | | |
18 | | use nativelink_config::cas_server::{CapabilitiesConfig, InstanceName}; |
19 | | use nativelink_error::{Error, ResultExt}; |
20 | | use nativelink_proto::build::bazel::remote::execution::v2::capabilities_server::{ |
21 | | Capabilities, CapabilitiesServer as Server, |
22 | | }; |
23 | | use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as DigestFunction; |
24 | | use nativelink_proto::build::bazel::remote::execution::v2::priority_capabilities::PriorityRange; |
25 | | use nativelink_proto::build::bazel::remote::execution::v2::symlink_absolute_path_strategy::Value as SymlinkAbsolutePathStrategy; |
26 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
27 | | ActionCacheUpdateCapabilities, CacheCapabilities, ExecutionCapabilities, |
28 | | GetCapabilitiesRequest, PriorityCapabilities, ServerCapabilities, |
29 | | }; |
30 | | use nativelink_proto::build::bazel::semver::SemVer; |
31 | | use nativelink_util::digest_hasher::default_digest_hasher_func; |
32 | | use nativelink_util::operation_state_manager::ClientStateManager; |
33 | | use nativelink_util::origin_event::OriginEventContext; |
34 | | use tonic::{Request, Response, Status}; |
35 | | use tracing::{event, instrument, Level}; |
36 | | |
/// Byte limit (64 KiB) reported to clients as `max_batch_total_size_bytes`
/// in the cache capabilities returned by `get_capabilities`.
const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024;
38 | | |
39 | | #[derive(Debug, Default)] |
40 | | pub struct CapabilitiesServer { |
41 | | supported_node_properties_for_instance: HashMap<InstanceName, Vec<String>>, |
42 | | } |
43 | | |
44 | | impl CapabilitiesServer { |
45 | 0 | pub async fn new( |
46 | 0 | config: &HashMap<InstanceName, CapabilitiesConfig>, |
47 | 0 | scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>, |
48 | 0 | ) -> Result<Self, Error> { |
49 | 0 | let mut supported_node_properties_for_instance = HashMap::new(); |
50 | 0 | for (instance_name, cfg) in config { |
51 | 0 | let mut properties = Vec::new(); |
52 | 0 | if let Some(remote_execution_cfg) = &cfg.remote_execution { Branch (52:20): [Folded - Ignored]
Branch (52:20): [Folded - Ignored]
|
53 | 0 | let scheduler = |
54 | 0 | scheduler_map |
55 | 0 | .get(&remote_execution_cfg.scheduler) |
56 | 0 | .err_tip(|| { |
57 | 0 | format!( |
58 | 0 | "Scheduler needs config for '{}' because it exists in capabilities", |
59 | 0 | remote_execution_cfg.scheduler |
60 | 0 | ) |
61 | 0 | })?; |
62 | 0 | if let Some(props_provider) = scheduler.as_known_platform_property_provider() { Branch (62:24): [Folded - Ignored]
Branch (62:24): [Folded - Ignored]
|
63 | 0 | for platform_key in props_provider |
64 | 0 | .get_known_properties(instance_name) |
65 | 0 | .await |
66 | 0 | .err_tip(|| { |
67 | 0 | format!("Failed to get platform properties for {instance_name}") |
68 | 0 | })? |
69 | 0 | { |
70 | 0 | properties.push(platform_key.clone()); |
71 | 0 | } |
72 | | } else { |
73 | 0 | event!( |
74 | 0 | Level::WARN, |
75 | 0 | "Scheduler '{}' does not implement KnownPlatformPropertyProvider", |
76 | | remote_execution_cfg.scheduler |
77 | | ); |
78 | | } |
79 | 0 | } |
80 | 0 | supported_node_properties_for_instance.insert(instance_name.clone(), properties); |
81 | | } |
82 | 0 | Ok(CapabilitiesServer { |
83 | 0 | supported_node_properties_for_instance, |
84 | 0 | }) |
85 | 0 | } |
86 | | |
87 | 0 | pub fn into_service(self) -> Server<CapabilitiesServer> { |
88 | 0 | Server::new(self) |
89 | 0 | } |
90 | | } |
91 | | |
92 | | #[tonic::async_trait] |
93 | | impl Capabilities for CapabilitiesServer { |
94 | | #[allow(clippy::blocks_in_conditions)] |
95 | | #[instrument( |
96 | | err, |
97 | | ret(level = Level::INFO), |
98 | | level = Level::ERROR, |
99 | | skip_all, |
100 | | fields(request = ?grpc_request.get_ref()) |
101 | | )] |
102 | | async fn get_capabilities( |
103 | | &self, |
104 | | grpc_request: Request<GetCapabilitiesRequest>, |
105 | 0 | ) -> Result<Response<ServerCapabilities>, Status> { |
106 | 0 | let request = grpc_request.into_inner(); |
107 | 0 | let ctx = OriginEventContext::new(|| &request).await; |
108 | | |
109 | 0 | let instance_name = request.instance_name; |
110 | 0 | let maybe_supported_node_properties = self |
111 | 0 | .supported_node_properties_for_instance |
112 | 0 | .get(&instance_name); |
113 | 0 | let execution_capabilities = |
114 | 0 | maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities { |
115 | 0 | digest_function: default_digest_hasher_func().proto_digest_func().into(), |
116 | 0 | exec_enabled: true, // TODO(blaise.bruer) Make this configurable. |
117 | 0 | execution_priority_capabilities: Some(PriorityCapabilities { |
118 | 0 | priorities: vec![PriorityRange { |
119 | 0 | min_priority: 0, |
120 | 0 | max_priority: i32::MAX, |
121 | 0 | }], |
122 | 0 | }), |
123 | 0 | supported_node_properties: props_for_instance.clone(), |
124 | 0 | digest_functions: vec![ |
125 | 0 | DigestFunction::Sha256.into(), |
126 | 0 | DigestFunction::Blake3.into(), |
127 | 0 | ], |
128 | 0 | }); |
129 | 0 |
|
130 | 0 | let resp = ServerCapabilities { |
131 | 0 | cache_capabilities: Some(CacheCapabilities { |
132 | 0 | digest_functions: vec![ |
133 | 0 | DigestFunction::Sha256.into(), |
134 | 0 | DigestFunction::Blake3.into(), |
135 | 0 | ], |
136 | 0 | action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities { |
137 | 0 | update_enabled: true, |
138 | 0 | }), |
139 | 0 | cache_priority_capabilities: None, |
140 | 0 | max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE, |
141 | 0 | symlink_absolute_path_strategy: SymlinkAbsolutePathStrategy::Disallowed.into(), |
142 | 0 | supported_compressors: vec![], |
143 | 0 | supported_batch_update_compressors: vec![], |
144 | 0 | }), |
145 | 0 | execution_capabilities, |
146 | 0 | deprecated_api_version: None, |
147 | 0 | low_api_version: Some(SemVer { |
148 | 0 | major: 2, |
149 | 0 | minor: 0, |
150 | 0 | patch: 0, |
151 | 0 | prerelease: String::new(), |
152 | 0 | }), |
153 | 0 | high_api_version: Some(SemVer { |
154 | 0 | major: 2, |
155 | 0 | minor: 3, |
156 | 0 | patch: 0, |
157 | 0 | prerelease: String::new(), |
158 | 0 | }), |
159 | 0 | }; |
160 | 0 | ctx.emit(|| &resp).await; |
161 | 0 | Ok(Response::new(resp)) |
162 | 0 | } |
163 | | } |