/build/source/nativelink-service/src/fetch_server.rs
Line  | Count  | Source  | 
1  |  | // Copyright 2025 The NativeLink Authors. All rights reserved.  | 
2  |  | //  | 
3  |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");  | 
4  |  | // you may not use this file except in compliance with the License.  | 
5  |  | // You may obtain a copy of the License at  | 
6  |  | //  | 
7  |  | //    See LICENSE file for details  | 
8  |  | //  | 
9  |  | // Unless required by applicable law or agreed to in writing, software  | 
10  |  | // distributed under the License is distributed on an "AS IS" BASIS,  | 
11  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
12  |  | // See the License for the specific language governing permissions and  | 
13  |  | // limitations under the License.  | 
14  |  |  | 
15  |  | use core::convert::Into;  | 
16  |  | use std::collections::HashMap;  | 
17  |  |  | 
18  |  | use nativelink_config::cas_server::{FetchConfig, WithInstanceName}; | 
19  |  | use nativelink_error::{Error, ResultExt, make_err, make_input_err}; | 
20  |  | use nativelink_proto::build::bazel::remote::asset::v1::fetch_server::{ | 
21  |  |     Fetch, FetchServer as Server,  | 
22  |  | };  | 
23  |  | use nativelink_proto::build::bazel::remote::asset::v1::{ | 
24  |  |     FetchBlobRequest, FetchBlobResponse, FetchDirectoryRequest, FetchDirectoryResponse,  | 
25  |  | };  | 
26  |  | use nativelink_proto::google::rpc::Status as GoogleStatus;  | 
27  |  | use nativelink_store::store_manager::StoreManager;  | 
28  |  | use nativelink_util::digest_hasher::{default_digest_hasher_func, make_ctx_for_hash_func}; | 
29  |  | use nativelink_util::store_trait::{Store, StoreLike}; | 
30  |  | use opentelemetry::context::FutureExt;  | 
31  |  | use prost::Message;  | 
32  |  | use tonic::{Code, Request, Response, Status}; | 
33  |  | use tracing::{Instrument, Level, error_span, info, instrument}; | 
34  |  |  | 
35  |  | use crate::remote_asset_proto::{RemoteAssetArtifact, RemoteAssetQuery}; | 
36  |  |  | 
37  |  | #[derive(Debug, Clone)]  | 
38  |  | pub struct FetchStoreInfo { | 
39  |  |     store: Store,  | 
40  |  | }  | 
41  |  |  | 
42  |  | #[derive(Debug, Clone)]  | 
43  |  | pub struct FetchServer { | 
44  |  |     stores: HashMap<String, FetchStoreInfo>,  | 
45  |  | }  | 
46  |  |  | 
47  |  | impl FetchServer { | 
48  | 1  |     pub fn new(  | 
49  | 1  |         configs: &[WithInstanceName<FetchConfig>],  | 
50  | 1  |         store_manager: &StoreManager,  | 
51  | 1  |     ) -> Result<Self, Error> { | 
52  | 1  |         let mut stores = HashMap::with_capacity(configs.len());  | 
53  | 2  |         for config1  in configs {  | 
54  | 1  |             let store = store_manager  | 
55  | 1  |                 .get_store(&config.fetch_store)  | 
56  | 1  |                 .ok_or_else(|| {0   | 
57  | 0  |                     make_input_err!("'fetch_store': '{}' does not exist", config.fetch_store) | 
58  | 0  |                 })?;  | 
59  | 1  |             stores.insert(config.instance_name.to_string(), FetchStoreInfo { store }); | 
60  |  |         }  | 
61  | 1  |         Ok(Self { | 
62  | 1  |             stores: stores.clone(),  | 
63  | 1  |         })  | 
64  | 1  |     }  | 
65  |  |  | 
66  | 0  |     pub fn into_service(self) -> Server<Self> { | 
67  | 0  |         Server::new(self)  | 
68  | 0  |     }  | 
69  |  |  | 
70  | 1  |     async fn inner_fetch_blob(  | 
71  | 1  |         &self,  | 
72  | 1  |         request: FetchBlobRequest,  | 
73  | 1  |     ) -> Result<Response<FetchBlobResponse>, Error> { | 
74  | 1  |         let instance_name = &request.instance_name;  | 
75  | 1  |         let store_info = self  | 
76  | 1  |             .stores  | 
77  | 1  |             .get(instance_name)  | 
78  | 1  |             .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"0 ))?0 ;  | 
79  |  |  | 
80  | 1  |         if request.uris.is_empty() {  Branch (80:12): [True: 0, False: 1]
   Branch (80:12): [Folded - Ignored]
  | 
81  | 0  |             return Err(Error::new(  | 
82  | 0  |                 Code::InvalidArgument,  | 
83  | 0  |                 "No uris in fetch request".to_owned(),  | 
84  | 0  |             ));  | 
85  | 1  |         }  | 
86  | 1  |         for uri in &request.uris { | 
87  | 1  |             let asset_request = RemoteAssetQuery::new(uri.clone(), request.qualifiers.clone());  | 
88  | 1  |             let asset_digest = asset_request.digest();  | 
89  | 1  |             let asset_response_possible = store_info  | 
90  | 1  |                 .store  | 
91  | 1  |                 .get_part_unchunked(asset_digest, 0, None)  | 
92  | 1  |                 .await;  | 
93  |  |  | 
94  | 1  |             info!(  | 
95  |  |                 uri = uri,  | 
96  | 1  |                 digest = format!("{}", asset_digest), | 
97  | 1  |                 "Looked up fetch asset"  | 
98  |  |             );  | 
99  |  |  | 
100  | 1  |             if let Ok(asset_response_raw) = asset_response_possible {  Branch (100:20): [True: 1, False: 0]
   Branch (100:20): [Folded - Ignored]
  | 
101  | 1  |                 let asset_response = RemoteAssetArtifact::decode(asset_response_raw).unwrap();  | 
102  | 1  |                 return Ok(Response::new(FetchBlobResponse { | 
103  | 1  |                     status: Some(GoogleStatus { | 
104  | 1  |                         code: Code::Ok.into(),  | 
105  | 1  |                         message: "Fetch object found".to_owned(),  | 
106  | 1  |                         details: vec![],  | 
107  | 1  |                     }),  | 
108  | 1  |                     uri: asset_response.uri,  | 
109  | 1  |                     qualifiers: asset_response.qualifiers,  | 
110  | 1  |                     expires_at: asset_response.expire_at,  | 
111  | 1  |                     blob_digest: asset_response.blob_digest,  | 
112  | 1  |                     digest_function: asset_response.digest_function,  | 
113  | 1  |                 }));  | 
114  | 0  |             }  | 
115  |  |         }  | 
116  | 0  |         Ok(Response::new(FetchBlobResponse { | 
117  | 0  |             status: Some(make_err!(Code::NotFound, "No item found").into()),  | 
118  | 0  |             uri: request.uris.first().cloned().unwrap_or(String::new()),  | 
119  | 0  |             qualifiers: vec![],  | 
120  | 0  |             expires_at: None,  | 
121  | 0  |             blob_digest: None,  | 
122  | 0  |             digest_function: default_digest_hasher_func().proto_digest_func().into(),  | 
123  | 0  |         }))  | 
124  | 1  |     }  | 
125  |  | }  | 
126  |  |  | 
127  |  | #[tonic::async_trait]  | 
128  |  | impl Fetch for FetchServer { | 
129  |  |     #[allow(clippy::blocks_in_conditions)]  | 
130  |  |     #[instrument(  | 
131  |  |         err(level = Level::WARN),  | 
132  |  |         ret(level = Level::INFO),  | 
133  |  |         skip_all,  | 
134  |  |         fields(request = ?grpc_request.get_ref())  | 
135  |  |     )]  | 
136  |  |     async fn fetch_blob(  | 
137  |  |         &self,  | 
138  |  |         grpc_request: Request<FetchBlobRequest>,  | 
139  |  |     ) -> Result<Response<FetchBlobResponse>, Status> { | 
140  |  |         let request = grpc_request.into_inner();  | 
141  |  |         let digest_function = request.digest_function;  | 
142  |  |         self.inner_fetch_blob(request)  | 
143  |  |             .instrument(error_span!("fetch_server_fetch_blob")) | 
144  |  |             .with_context(  | 
145  |  |                 make_ctx_for_hash_func(digest_function).err_tip(|| "In FetchServer::fetch_blob")?,  | 
146  |  |             )  | 
147  |  |             .await  | 
148  |  |             .err_tip(|| "Failed on fetch_blob() command")  | 
149  |  |             .map_err(Into::into)  | 
150  |  |     }  | 
151  |  |  | 
152  |  |     #[allow(clippy::blocks_in_conditions)]  | 
153  |  |     #[instrument(  | 
154  |  |         err(level = Level::WARN),  | 
155  |  |         ret(level = Level::INFO),  | 
156  |  |         skip_all,  | 
157  |  |         fields(request = ?_grpc_request.get_ref())  | 
158  |  |     )]  | 
159  |  |     async fn fetch_directory(  | 
160  |  |         &self,  | 
161  |  |         _grpc_request: Request<FetchDirectoryRequest>,  | 
162  |  |     ) -> Result<Response<FetchDirectoryResponse>, Status> { | 
163  |  |         Err(Status::unimplemented("FetchDirectory not implemented")) | 
164  |  |     }  | 
165  |  | }  |