/build/source/nativelink-service/src/capabilities_server.rs
| Line | Count | Source | 
| 1 |  | // Copyright 2024 The NativeLink Authors. All rights reserved. | 
| 2 |  | // | 
| 3 |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); | 
| 4 |  | // you may not use this file except in compliance with the License. | 
| 5 |  | // You may obtain a copy of the License at | 
| 6 |  | // | 
| 7 |  | //    See LICENSE file for details | 
| 8 |  | // | 
| 9 |  | // Unless required by applicable law or agreed to in writing, software | 
| 10 |  | // distributed under the License is distributed on an "AS IS" BASIS, | 
| 11 |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
| 12 |  | // See the License for the specific language governing permissions and | 
| 13 |  | // limitations under the License. | 
| 14 |  |  | 
| 15 |  | use std::collections::HashMap; | 
| 16 |  | use std::sync::Arc; | 
| 17 |  |  | 
| 18 |  | use nativelink_config::cas_server::{CapabilitiesConfig, InstanceName, WithInstanceName}; | 
| 19 |  | use nativelink_error::{Error, ResultExt}; | 
| 20 |  | use nativelink_proto::build::bazel::remote::execution::v2::capabilities_server::{ | 
| 21 |  |     Capabilities, CapabilitiesServer as Server, | 
| 22 |  | }; | 
| 23 |  | use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as DigestFunction; | 
| 24 |  | use nativelink_proto::build::bazel::remote::execution::v2::priority_capabilities::PriorityRange; | 
| 25 |  | use nativelink_proto::build::bazel::remote::execution::v2::symlink_absolute_path_strategy::Value as SymlinkAbsolutePathStrategy; | 
| 26 |  | use nativelink_proto::build::bazel::remote::execution::v2::{ | 
| 27 |  |     ActionCacheUpdateCapabilities, CacheCapabilities, ExecutionCapabilities, | 
| 28 |  |     GetCapabilitiesRequest, PriorityCapabilities, ServerCapabilities, | 
| 29 |  | }; | 
| 30 |  | use nativelink_proto::build::bazel::semver::SemVer; | 
| 31 |  | use nativelink_util::digest_hasher::default_digest_hasher_func; | 
| 32 |  | use nativelink_util::operation_state_manager::ClientStateManager; | 
| 33 |  | use tonic::{Request, Response, Status}; | 
| 34 |  | use tracing::{Level, instrument, warn}; | 
| 35 |  |  | 
| 36 |  | const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024; | 
| 37 |  |  | 
| 38 |  | #[derive(Debug, Default)] | 
| 39 |  | pub struct CapabilitiesServer { | 
| 40 |  |     supported_node_properties_for_instance: HashMap<InstanceName, Vec<String>>, | 
| 41 |  | } | 
| 42 |  |  | 
| 43 |  | impl CapabilitiesServer { | 
| 44 | 0 |     pub async fn new( | 
| 45 | 0 |         configs: &[WithInstanceName<CapabilitiesConfig>], | 
| 46 | 0 |         scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>, | 
| 47 | 0 |     ) -> Result<Self, Error> { | 
| 48 | 0 |         let mut supported_node_properties_for_instance = HashMap::new(); | 
| 49 | 0 |         for config in configs { | 
| 50 | 0 |             let mut properties = Vec::new(); | 
| 51 | 0 |             if let Some(remote_execution_cfg) = &config.remote_execution {  Branch (51:20): [Folded - Ignored]
  Branch (51:20): [Folded - Ignored]
 | 
| 52 | 0 |                 let scheduler = | 
| 53 | 0 |                     scheduler_map | 
| 54 | 0 |                         .get(&remote_execution_cfg.scheduler) | 
| 55 | 0 |                         .err_tip(|| { | 
| 56 | 0 |                             format!( | 
| 57 | 0 |                                 "Scheduler needs config for '{}' because it exists in capabilities", | 
| 58 |  |                                 remote_execution_cfg.scheduler | 
| 59 |  |                             ) | 
| 60 | 0 |                         })?; | 
| 61 | 0 |                 if let Some(props_provider) = scheduler.as_known_platform_property_provider() {  Branch (61:24): [Folded - Ignored]
  Branch (61:24): [Folded - Ignored]
 | 
| 62 | 0 |                     for platform_key in props_provider | 
| 63 | 0 |                         .get_known_properties(&config.instance_name) | 
| 64 | 0 |                         .await | 
| 65 | 0 |                         .err_tip(|| { | 
| 66 | 0 |                             format!( | 
| 67 | 0 |                                 "Failed to get platform properties for {}", | 
| 68 |  |                                 config.instance_name | 
| 69 |  |                             ) | 
| 70 | 0 |                         })? | 
| 71 | 0 |                     { | 
| 72 | 0 |                         properties.push(platform_key.clone()); | 
| 73 | 0 |                     } | 
| 74 |  |                 } else { | 
| 75 | 0 |                     warn!( | 
| 76 | 0 |                         "Scheduler '{}' does not implement KnownPlatformPropertyProvider", | 
| 77 |  |                         remote_execution_cfg.scheduler | 
| 78 |  |                     ); | 
| 79 |  |                 } | 
| 80 | 0 |             } | 
| 81 | 0 |             supported_node_properties_for_instance.insert(config.instance_name.clone(), properties); | 
| 82 |  |         } | 
| 83 | 0 |         Ok(Self { | 
| 84 | 0 |             supported_node_properties_for_instance, | 
| 85 | 0 |         }) | 
| 86 | 0 |     } | 
| 87 |  |  | 
| 88 | 0 |     pub fn into_service(self) -> Server<Self> { | 
| 89 | 0 |         Server::new(self) | 
| 90 | 0 |     } | 
| 91 |  | } | 
| 92 |  |  | 
| 93 |  | #[tonic::async_trait] | 
| 94 |  | impl Capabilities for CapabilitiesServer { | 
| 95 |  |     #[instrument( | 
| 96 |  |         err, | 
| 97 |  |         ret(level = Level::INFO), | 
| 98 |  |         level = Level::ERROR, | 
| 99 |  |         skip_all, | 
| 100 |  |         fields(request = ?grpc_request.get_ref()) | 
| 101 |  |     )] | 
| 102 |  |     async fn get_capabilities( | 
| 103 |  |         &self, | 
| 104 |  |         grpc_request: Request<GetCapabilitiesRequest>, | 
| 105 |  |     ) -> Result<Response<ServerCapabilities>, Status> { | 
| 106 |  |         let request = grpc_request.into_inner(); | 
| 107 |  |  | 
| 108 |  |         let instance_name = request.instance_name; | 
| 109 |  |         let maybe_supported_node_properties = self | 
| 110 |  |             .supported_node_properties_for_instance | 
| 111 |  |             .get(&instance_name); | 
| 112 |  |         let execution_capabilities = | 
| 113 |  |             maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities { | 
| 114 | 0 |                 digest_function: default_digest_hasher_func().proto_digest_func().into(), | 
| 115 |  |                 exec_enabled: true, // TODO(palfrey) Make this configurable. | 
| 116 | 0 |                 execution_priority_capabilities: Some(PriorityCapabilities { | 
| 117 | 0 |                     priorities: vec![PriorityRange { | 
| 118 | 0 |                         min_priority: 0, | 
| 119 | 0 |                         max_priority: i32::MAX, | 
| 120 | 0 |                     }], | 
| 121 | 0 |                 }), | 
| 122 | 0 |                 supported_node_properties: props_for_instance.clone(), | 
| 123 | 0 |                 digest_functions: vec![ | 
| 124 | 0 |                     DigestFunction::Sha256.into(), | 
| 125 | 0 |                     DigestFunction::Blake3.into(), | 
| 126 |  |                 ], | 
| 127 | 0 |             }); | 
| 128 |  |  | 
| 129 |  |         let resp = ServerCapabilities { | 
| 130 |  |             cache_capabilities: Some(CacheCapabilities { | 
| 131 |  |                 digest_functions: vec![ | 
| 132 |  |                     DigestFunction::Sha256.into(), | 
| 133 |  |                     DigestFunction::Blake3.into(), | 
| 134 |  |                 ], | 
| 135 |  |                 action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities { | 
| 136 |  |                     update_enabled: true, | 
| 137 |  |                 }), | 
| 138 |  |                 cache_priority_capabilities: None, | 
| 139 |  |                 max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE, | 
| 140 |  |                 symlink_absolute_path_strategy: SymlinkAbsolutePathStrategy::Disallowed.into(), | 
| 141 |  |                 supported_compressors: vec![], | 
| 142 |  |                 supported_batch_update_compressors: vec![], | 
| 143 |  |             }), | 
| 144 |  |             execution_capabilities, | 
| 145 |  |             deprecated_api_version: None, | 
| 146 |  |             low_api_version: Some(SemVer { | 
| 147 |  |                 major: 2, | 
| 148 |  |                 minor: 0, | 
| 149 |  |                 patch: 0, | 
| 150 |  |                 prerelease: String::new(), | 
| 151 |  |             }), | 
| 152 |  |             high_api_version: Some(SemVer { | 
| 153 |  |                 major: 2, | 
| 154 |  |                 minor: 3, | 
| 155 |  |                 patch: 0, | 
| 156 |  |                 prerelease: String::new(), | 
| 157 |  |             }), | 
| 158 |  |         }; | 
| 159 |  |         Ok(Response::new(resp)) | 
| 160 |  |     } | 
| 161 |  | } |