/build/source/nativelink-service/src/fetch_server.rs
Line | Count | Source |
1 | | // Copyright 2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::convert::Into; |
16 | | use std::collections::HashMap; |
17 | | |
18 | | use nativelink_config::cas_server::{FetchConfig, WithInstanceName}; |
19 | | use nativelink_error::{Error, ResultExt, make_err, make_input_err}; |
20 | | use nativelink_proto::build::bazel::remote::asset::v1::fetch_server::{ |
21 | | Fetch, FetchServer as Server, |
22 | | }; |
23 | | use nativelink_proto::build::bazel::remote::asset::v1::{ |
24 | | FetchBlobRequest, FetchBlobResponse, FetchDirectoryRequest, FetchDirectoryResponse, |
25 | | }; |
26 | | use nativelink_proto::google::rpc::Status as GoogleStatus; |
27 | | use nativelink_store::store_manager::StoreManager; |
28 | | use nativelink_util::digest_hasher::{default_digest_hasher_func, make_ctx_for_hash_func}; |
29 | | use nativelink_util::store_trait::{Store, StoreLike}; |
30 | | use opentelemetry::context::FutureExt; |
31 | | use prost::Message; |
32 | | use tonic::{Code, Request, Response, Status}; |
33 | | use tracing::{Instrument, Level, error_span, info, instrument}; |
34 | | |
35 | | use crate::remote_asset_proto::{RemoteAssetArtifact, RemoteAssetQuery}; |
36 | | |
37 | | #[derive(Debug, Clone)] |
38 | | pub struct FetchStoreInfo { |
39 | | store: Store, |
40 | | } |
41 | | |
42 | | #[derive(Debug, Clone)] |
43 | | pub struct FetchServer { |
44 | | stores: HashMap<String, FetchStoreInfo>, |
45 | | } |
46 | | |
47 | | impl FetchServer { |
48 | 1 | pub fn new( |
49 | 1 | configs: &[WithInstanceName<FetchConfig>], |
50 | 1 | store_manager: &StoreManager, |
51 | 1 | ) -> Result<Self, Error> { |
52 | 1 | let mut stores = HashMap::with_capacity(configs.len()); |
53 | 2 | for config1 in configs { |
54 | 1 | let store = store_manager |
55 | 1 | .get_store(&config.fetch_store) |
56 | 1 | .ok_or_else(|| {0 |
57 | 0 | make_input_err!("'fetch_store': '{}' does not exist", config.fetch_store) |
58 | 0 | })?; |
59 | 1 | stores.insert(config.instance_name.to_string(), FetchStoreInfo { store }); |
60 | | } |
61 | 1 | Ok(Self { |
62 | 1 | stores: stores.clone(), |
63 | 1 | }) |
64 | 1 | } |
65 | | |
66 | 0 | pub fn into_service(self) -> Server<Self> { |
67 | 0 | Server::new(self) |
68 | 0 | } |
69 | | |
70 | 1 | async fn inner_fetch_blob( |
71 | 1 | &self, |
72 | 1 | request: FetchBlobRequest, |
73 | 1 | ) -> Result<Response<FetchBlobResponse>, Error> { |
74 | 1 | let instance_name = &request.instance_name; |
75 | 1 | let store_info = self |
76 | 1 | .stores |
77 | 1 | .get(instance_name) |
78 | 1 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"0 ))?0 ; |
79 | | |
80 | 1 | if request.uris.is_empty() { Branch (80:12): [True: 0, False: 1]
Branch (80:12): [Folded - Ignored]
|
81 | 0 | return Err(Error::new( |
82 | 0 | Code::InvalidArgument, |
83 | 0 | "No uris in fetch request".to_owned(), |
84 | 0 | )); |
85 | 1 | } |
86 | 1 | for uri in &request.uris { |
87 | 1 | let asset_request = RemoteAssetQuery::new(uri.clone(), request.qualifiers.clone()); |
88 | 1 | let asset_digest = asset_request.digest(); |
89 | 1 | let asset_response_possible = store_info |
90 | 1 | .store |
91 | 1 | .get_part_unchunked(asset_digest, 0, None) |
92 | 1 | .await; |
93 | | |
94 | 1 | info!( |
95 | | uri = uri, |
96 | 1 | digest = format!("{}", asset_digest), |
97 | 1 | "Looked up fetch asset" |
98 | | ); |
99 | | |
100 | 1 | if let Ok(asset_response_raw) = asset_response_possible { Branch (100:20): [True: 1, False: 0]
Branch (100:20): [Folded - Ignored]
|
101 | 1 | let asset_response = RemoteAssetArtifact::decode(asset_response_raw).unwrap(); |
102 | 1 | return Ok(Response::new(FetchBlobResponse { |
103 | 1 | status: Some(GoogleStatus { |
104 | 1 | code: Code::Ok.into(), |
105 | 1 | message: "Fetch object found".to_owned(), |
106 | 1 | details: vec![], |
107 | 1 | }), |
108 | 1 | uri: asset_response.uri, |
109 | 1 | qualifiers: asset_response.qualifiers, |
110 | 1 | expires_at: asset_response.expire_at, |
111 | 1 | blob_digest: asset_response.blob_digest, |
112 | 1 | digest_function: asset_response.digest_function, |
113 | 1 | })); |
114 | 0 | } |
115 | | } |
116 | 0 | Ok(Response::new(FetchBlobResponse { |
117 | 0 | status: Some(make_err!(Code::NotFound, "No item found").into()), |
118 | 0 | uri: request.uris.first().cloned().unwrap_or(String::new()), |
119 | 0 | qualifiers: vec![], |
120 | 0 | expires_at: None, |
121 | 0 | blob_digest: None, |
122 | 0 | digest_function: default_digest_hasher_func().proto_digest_func().into(), |
123 | 0 | })) |
124 | 1 | } |
125 | | } |
126 | | |
127 | | #[tonic::async_trait] |
128 | | impl Fetch for FetchServer { |
129 | | #[allow(clippy::blocks_in_conditions)] |
130 | | #[instrument( |
131 | | err(level = Level::WARN), |
132 | | ret(level = Level::INFO), |
133 | | skip_all, |
134 | | fields(request = ?grpc_request.get_ref()) |
135 | | )] |
136 | | async fn fetch_blob( |
137 | | &self, |
138 | | grpc_request: Request<FetchBlobRequest>, |
139 | 2 | ) -> Result<Response<FetchBlobResponse>, Status> { |
140 | 1 | let request = grpc_request.into_inner(); |
141 | 1 | let digest_function = request.digest_function; |
142 | 1 | self.inner_fetch_blob(request) |
143 | 1 | .instrument(error_span!("fetch_server_fetch_blob")) |
144 | 1 | .with_context( |
145 | 1 | make_ctx_for_hash_func(digest_function).err_tip(|| "In FetchServer::fetch_blob")?0 , |
146 | | ) |
147 | 1 | .await |
148 | 1 | .err_tip(|| "Failed on fetch_blob() command") |
149 | 1 | .map_err(Into::into) |
150 | 2 | } |
151 | | |
152 | | #[allow(clippy::blocks_in_conditions)] |
153 | | #[instrument( |
154 | | err(level = Level::WARN), |
155 | | ret(level = Level::INFO), |
156 | | skip_all, |
157 | | fields(request = ?_grpc_request.get_ref()) |
158 | | )] |
159 | | async fn fetch_directory( |
160 | | &self, |
161 | | _grpc_request: Request<FetchDirectoryRequest>, |
162 | 0 | ) -> Result<Response<FetchDirectoryResponse>, Status> { |
163 | 0 | todo!() |
164 | 0 | } |
165 | | } |