/build/source/nativelink-service/src/push_server.rs
Line | Count | Source |
1 | | // Copyright 2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::convert::Into; |
16 | | use std::collections::HashMap; |
17 | | |
18 | | use nativelink_config::cas_server::{PushConfig, WithInstanceName}; |
19 | | use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err}; |
20 | | use nativelink_proto::build::bazel::remote::asset::v1::push_server::{Push, PushServer as Server}; |
21 | | use nativelink_proto::build::bazel::remote::asset::v1::{ |
22 | | PushBlobRequest, PushBlobResponse, PushDirectoryRequest, PushDirectoryResponse, |
23 | | }; |
24 | | use nativelink_store::store_manager::StoreManager; |
25 | | use nativelink_util::digest_hasher::make_ctx_for_hash_func; |
26 | | use nativelink_util::store_trait::{Store, StoreLike}; |
27 | | use opentelemetry::context::FutureExt; |
28 | | use tonic::{Request, Response, Status}; |
29 | | use tracing::{Instrument, Level, error_span, info, instrument}; |
30 | | |
31 | | use crate::remote_asset_proto::RemoteAssetArtifact; |
32 | | |
33 | | #[derive(Debug, Clone)] |
34 | | pub struct PushStoreInfo { |
35 | | store: Store, |
36 | | read_only: bool, |
37 | | } |
38 | | |
39 | | #[derive(Debug, Clone)] |
40 | | pub struct PushServer { |
41 | | stores: HashMap<String, PushStoreInfo>, |
42 | | } |
43 | | |
44 | | impl PushServer { |
45 | 1 | pub fn new( |
46 | 1 | configs: &[WithInstanceName<PushConfig>], |
47 | 1 | store_manager: &StoreManager, |
48 | 1 | ) -> Result<Self, Error> { |
49 | 1 | let mut stores = HashMap::with_capacity(configs.len()); |
50 | 2 | for config1 in configs { |
51 | 1 | let store = store_manager.get_store(&config.push_store).ok_or_else(|| {0 |
52 | 0 | make_input_err!("'push_store': '{}' does not exist", config.push_store) |
53 | 0 | })?; |
54 | 1 | stores.insert( |
55 | 1 | config.instance_name.to_string(), |
56 | 1 | PushStoreInfo { |
57 | 1 | store, |
58 | 1 | read_only: config.read_only, |
59 | 1 | }, |
60 | | ); |
61 | | } |
62 | 1 | Ok(Self { |
63 | 1 | stores: stores.clone(), |
64 | 1 | }) |
65 | 1 | } |
66 | | |
67 | 0 | pub fn into_service(self) -> Server<Self> { |
68 | 0 | Server::new(self) |
69 | 0 | } |
70 | | |
71 | 1 | async fn inner_push_blob( |
72 | 1 | &self, |
73 | 1 | request: PushBlobRequest, |
74 | 1 | ) -> Result<Response<PushBlobResponse>, Error> { |
75 | 1 | let instance_name = &request.instance_name; |
76 | 1 | let store_info = self |
77 | 1 | .stores |
78 | 1 | .get(instance_name) |
79 | 1 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"0 ))?0 ; |
80 | | |
81 | 1 | if store_info.read_only { Branch (81:12): [True: 0, False: 1]
Branch (81:12): [Folded - Ignored]
|
82 | 0 | return Err(make_err!( |
83 | 0 | Code::PermissionDenied, |
84 | 0 | "The store '{instance_name}' is read only on this endpoint", |
85 | 0 | )); |
86 | 1 | } |
87 | | |
88 | 1 | let blob_digest = request.blob_digest.ok_or_else(|| {0 |
89 | 0 | Error::new( |
90 | 0 | Code::InvalidArgument, |
91 | 0 | "Missing blob digest in push".to_owned(), |
92 | | ) |
93 | 0 | })?; |
94 | 1 | if request.uris.is_empty() { Branch (94:12): [True: 0, False: 1]
Branch (94:12): [Folded - Ignored]
|
95 | 0 | return Err(Error::new( |
96 | 0 | Code::InvalidArgument, |
97 | 0 | "No uris in push request".to_owned(), |
98 | 0 | )); |
99 | 1 | } |
100 | 2 | for uri1 in request.uris { |
101 | 1 | let asset = RemoteAssetArtifact::new( |
102 | 1 | uri.clone(), |
103 | 1 | request.qualifiers.clone(), |
104 | 1 | blob_digest.clone(), |
105 | 1 | request.expire_at, |
106 | 1 | request.digest_function, |
107 | | ); |
108 | 1 | let asset_digest = asset.digest(); |
109 | 1 | store_info |
110 | 1 | .store |
111 | 1 | .update_oneshot(asset_digest, asset.as_bytes()) |
112 | 1 | .await |
113 | 1 | .err_tip(|| "Failed to update in push server")?0 ; |
114 | 1 | info!( |
115 | | uri = uri, |
116 | 1 | digest = format!("{}", asset_digest), |
117 | 1 | "Pushed asset metadata with hash" |
118 | | ); |
119 | | } |
120 | | |
121 | 1 | Ok(Response::new(PushBlobResponse {})) |
122 | 1 | } |
123 | | } |
124 | | |
125 | | #[tonic::async_trait] |
126 | | impl Push for PushServer { |
127 | | #[allow(clippy::blocks_in_conditions)] |
128 | | #[instrument( |
129 | | err(level = Level::WARN), |
130 | | ret(level = Level::INFO), |
131 | | skip_all, |
132 | | fields(request = ?grpc_request.get_ref()) |
133 | | )] |
134 | | async fn push_blob( |
135 | | &self, |
136 | | grpc_request: Request<PushBlobRequest>, |
137 | 2 | ) -> Result<Response<PushBlobResponse>, Status> { |
138 | 1 | let request = grpc_request.into_inner(); |
139 | 1 | let digest_function = request.digest_function; |
140 | 1 | self.inner_push_blob(request) |
141 | 1 | .instrument(error_span!("push_push_blob")) |
142 | 1 | .with_context( |
143 | 1 | make_ctx_for_hash_func(digest_function).err_tip(|| "In PushServer::push_blob")?0 , |
144 | | ) |
145 | 1 | .await |
146 | 1 | .err_tip(|| "Failed on push_blob() command") |
147 | 1 | .map_err(Into::into) |
148 | 2 | } |
149 | | |
150 | | #[allow(clippy::blocks_in_conditions)] |
151 | | #[instrument( |
152 | | err(level = Level::WARN), |
153 | | ret(level = Level::INFO), |
154 | | skip_all, |
155 | | fields(request = ?_grpc_request.get_ref()) |
156 | | )] |
157 | | async fn push_directory( |
158 | | &self, |
159 | | _grpc_request: Request<PushDirectoryRequest>, |
160 | 0 | ) -> Result<Response<PushDirectoryResponse>, Status> { |
161 | 0 | todo!() |
162 | 0 | } |
163 | | } |