/build/source/nativelink-service/src/push_server.rs
| Line | Count | Source | 
| 1 |  | // Copyright 2025 The NativeLink Authors. All rights reserved. | 
| 2 |  | // | 
| 3 |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); | 
| 4 |  | // you may not use this file except in compliance with the License. | 
| 5 |  | // You may obtain a copy of the License at | 
| 6 |  | // | 
| 7 |  | //    See LICENSE file for details | 
| 8 |  | // | 
| 9 |  | // Unless required by applicable law or agreed to in writing, software | 
| 10 |  | // distributed under the License is distributed on an "AS IS" BASIS, | 
| 11 |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
| 12 |  | // See the License for the specific language governing permissions and | 
| 13 |  | // limitations under the License. | 
| 14 |  |  | 
| 15 |  | use core::convert::Into; | 
| 16 |  | use std::collections::HashMap; | 
| 17 |  |  | 
| 18 |  | use nativelink_config::cas_server::{PushConfig, WithInstanceName}; | 
| 19 |  | use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err}; | 
| 20 |  | use nativelink_proto::build::bazel::remote::asset::v1::push_server::{Push, PushServer as Server}; | 
| 21 |  | use nativelink_proto::build::bazel::remote::asset::v1::{ | 
| 22 |  |     PushBlobRequest, PushBlobResponse, PushDirectoryRequest, PushDirectoryResponse, | 
| 23 |  | }; | 
| 24 |  | use nativelink_store::store_manager::StoreManager; | 
| 25 |  | use nativelink_util::digest_hasher::make_ctx_for_hash_func; | 
| 26 |  | use nativelink_util::store_trait::{Store, StoreLike}; | 
| 27 |  | use opentelemetry::context::FutureExt; | 
| 28 |  | use tonic::{Request, Response, Status}; | 
| 29 |  | use tracing::{Instrument, Level, error_span, info, instrument}; | 
| 30 |  |  | 
| 31 |  | use crate::remote_asset_proto::RemoteAssetArtifact; | 
| 32 |  |  | 
| 33 |  | #[derive(Debug, Clone)] | 
| 34 |  | pub struct PushStoreInfo { | 
| 35 |  |     store: Store, | 
| 36 |  |     read_only: bool, | 
| 37 |  | } | 
| 38 |  |  | 
| 39 |  | #[derive(Debug, Clone)] | 
| 40 |  | pub struct PushServer { | 
| 41 |  |     stores: HashMap<String, PushStoreInfo>, | 
| 42 |  | } | 
| 43 |  |  | 
| 44 |  | impl PushServer { | 
| 45 | 1 |     pub fn new( | 
| 46 | 1 |         configs: &[WithInstanceName<PushConfig>], | 
| 47 | 1 |         store_manager: &StoreManager, | 
| 48 | 1 |     ) -> Result<Self, Error> { | 
| 49 | 1 |         let mut stores = HashMap::with_capacity(configs.len()); | 
| 50 | 2 |         for config1in configs { | 
| 51 | 1 |             let store = store_manager.get_store(&config.push_store).ok_or_else(|| {0 | 
| 52 | 0 |                 make_input_err!("'push_store': '{}' does not exist", config.push_store) | 
| 53 | 0 |             })?; | 
| 54 | 1 |             stores.insert( | 
| 55 | 1 |                 config.instance_name.to_string(), | 
| 56 | 1 |                 PushStoreInfo { | 
| 57 | 1 |                     store, | 
| 58 | 1 |                     read_only: config.read_only, | 
| 59 | 1 |                 }, | 
| 60 |  |             ); | 
| 61 |  |         } | 
| 62 | 1 |         Ok(Self { | 
| 63 | 1 |             stores: stores.clone(), | 
| 64 | 1 |         }) | 
| 65 | 1 |     } | 
| 66 |  |  | 
| 67 | 0 |     pub fn into_service(self) -> Server<Self> { | 
| 68 | 0 |         Server::new(self) | 
| 69 | 0 |     } | 
| 70 |  |  | 
| 71 | 1 |     async fn inner_push_blob( | 
| 72 | 1 |         &self, | 
| 73 | 1 |         request: PushBlobRequest, | 
| 74 | 1 |     ) -> Result<Response<PushBlobResponse>, Error> { | 
| 75 | 1 |         let instance_name = &request.instance_name; | 
| 76 | 1 |         let store_info = self | 
| 77 | 1 |             .stores | 
| 78 | 1 |             .get(instance_name) | 
| 79 | 1 |             .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"0)) ?0; | 
| 80 |  |  | 
| 81 | 1 |         if store_info.read_only {  Branch (81:12): [True: 0, False: 1]
  Branch (81:12): [Folded - Ignored]
 | 
| 82 | 0 |             return Err(make_err!( | 
| 83 | 0 |                 Code::PermissionDenied, | 
| 84 | 0 |                 "The store '{instance_name}' is read only on this endpoint", | 
| 85 | 0 |             )); | 
| 86 | 1 |         } | 
| 87 |  |  | 
| 88 | 1 |         let blob_digest = request.blob_digest.ok_or_else(|| {0 | 
| 89 | 0 |             Error::new( | 
| 90 | 0 |                 Code::InvalidArgument, | 
| 91 | 0 |                 "Missing blob digest in push".to_owned(), | 
| 92 |  |             ) | 
| 93 | 0 |         })?; | 
| 94 | 1 |         if request.uris.is_empty() {  Branch (94:12): [True: 0, False: 1]
  Branch (94:12): [Folded - Ignored]
 | 
| 95 | 0 |             return Err(Error::new( | 
| 96 | 0 |                 Code::InvalidArgument, | 
| 97 | 0 |                 "No uris in push request".to_owned(), | 
| 98 | 0 |             )); | 
| 99 | 1 |         } | 
| 100 | 2 |         for uri1in request.uris { | 
| 101 | 1 |             let asset = RemoteAssetArtifact::new( | 
| 102 | 1 |                 uri.clone(), | 
| 103 | 1 |                 request.qualifiers.clone(), | 
| 104 | 1 |                 blob_digest.clone(), | 
| 105 | 1 |                 request.expire_at, | 
| 106 | 1 |                 request.digest_function, | 
| 107 |  |             ); | 
| 108 | 1 |             let asset_digest = asset.digest(); | 
| 109 | 1 |             store_info | 
| 110 | 1 |                 .store | 
| 111 | 1 |                 .update_oneshot(asset_digest, asset.as_bytes()) | 
| 112 | 1 |                 .await | 
| 113 | 1 |                 .err_tip(|| "Failed to update in push server")?0; | 
| 114 | 1 |             info!( | 
| 115 |  |                 uri = uri, | 
| 116 | 1 |                 digest = format!("{}", asset_digest), | 
| 117 | 1 |                 "Pushed asset metadata with hash" | 
| 118 |  |             ); | 
| 119 |  |         } | 
| 120 |  |  | 
| 121 | 1 |         Ok(Response::new(PushBlobResponse {})) | 
| 122 | 1 |     } | 
| 123 |  | } | 
| 124 |  |  | 
| 125 |  | #[tonic::async_trait] | 
| 126 |  | impl Push for PushServer { | 
| 127 |  |     #[allow(clippy::blocks_in_conditions)] | 
| 128 |  |     #[instrument( | 
| 129 |  |         err(level = Level::WARN), | 
| 130 |  |         ret(level = Level::INFO), | 
| 131 |  |         skip_all, | 
| 132 |  |         fields(request = ?grpc_request.get_ref()) | 
| 133 |  |     )] | 
| 134 |  |     async fn push_blob( | 
| 135 |  |         &self, | 
| 136 |  |         grpc_request: Request<PushBlobRequest>, | 
| 137 |  |     ) -> Result<Response<PushBlobResponse>, Status> { | 
| 138 |  |         let request = grpc_request.into_inner(); | 
| 139 |  |         let digest_function = request.digest_function; | 
| 140 |  |         self.inner_push_blob(request) | 
| 141 |  |             .instrument(error_span!("push_push_blob")) | 
| 142 |  |             .with_context( | 
| 143 |  |                 make_ctx_for_hash_func(digest_function).err_tip(|| "In PushServer::push_blob")?, | 
| 144 |  |             ) | 
| 145 |  |             .await | 
| 146 |  |             .err_tip(|| "Failed on push_blob() command") | 
| 147 |  |             .map_err(Into::into) | 
| 148 |  |     } | 
| 149 |  |  | 
| 150 |  |     #[allow(clippy::blocks_in_conditions)] | 
| 151 |  |     #[instrument( | 
| 152 |  |         err(level = Level::WARN), | 
| 153 |  |         ret(level = Level::INFO), | 
| 154 |  |         skip_all, | 
| 155 |  |         fields(request = ?_grpc_request.get_ref()) | 
| 156 |  |     )] | 
| 157 |  |     async fn push_directory( | 
| 158 |  |         &self, | 
| 159 |  |         _grpc_request: Request<PushDirectoryRequest>, | 
| 160 |  |     ) -> Result<Response<PushDirectoryResponse>, Status> { | 
| 161 |  |         Err(Status::unimplemented("PushDirectory not implemented")) | 
| 162 |  |     } | 
| 163 |  | } |