Coverage Report

Created: 2025-07-30 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/fetch_server.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::convert::Into;
16
use std::collections::HashMap;
17
18
use nativelink_config::cas_server::{FetchConfig, WithInstanceName};
19
use nativelink_error::{Error, ResultExt, make_err, make_input_err};
20
use nativelink_proto::build::bazel::remote::asset::v1::fetch_server::{
21
    Fetch, FetchServer as Server,
22
};
23
use nativelink_proto::build::bazel::remote::asset::v1::{
24
    FetchBlobRequest, FetchBlobResponse, FetchDirectoryRequest, FetchDirectoryResponse,
25
};
26
use nativelink_proto::google::rpc::Status as GoogleStatus;
27
use nativelink_store::store_manager::StoreManager;
28
use nativelink_util::digest_hasher::{default_digest_hasher_func, make_ctx_for_hash_func};
29
use nativelink_util::store_trait::{Store, StoreLike};
30
use opentelemetry::context::FutureExt;
31
use prost::Message;
32
use tonic::{Code, Request, Response, Status};
33
use tracing::{Instrument, Level, error_span, info, instrument};
34
35
use crate::remote_asset_proto::{RemoteAssetArtifact, RemoteAssetQuery};
36
37
#[derive(Debug, Clone)]
38
pub struct FetchStoreInfo {
39
    store: Store,
40
}
41
42
#[derive(Debug, Clone)]
43
pub struct FetchServer {
44
    stores: HashMap<String, FetchStoreInfo>,
45
}
46
47
impl FetchServer {
48
1
    pub fn new(
49
1
        configs: &[WithInstanceName<FetchConfig>],
50
1
        store_manager: &StoreManager,
51
1
    ) -> Result<Self, Error> {
52
1
        let mut stores = HashMap::with_capacity(configs.len());
53
2
        for 
config1
in configs {
54
1
            let store = store_manager
55
1
                .get_store(&config.fetch_store)
56
1
                .ok_or_else(|| 
{0
57
0
                    make_input_err!("'fetch_store': '{}' does not exist", config.fetch_store)
58
0
                })?;
59
1
            stores.insert(config.instance_name.to_string(), FetchStoreInfo { store });
60
        }
61
1
        Ok(Self {
62
1
            stores: stores.clone(),
63
1
        })
64
1
    }
65
66
0
    pub fn into_service(self) -> Server<Self> {
67
0
        Server::new(self)
68
0
    }
69
70
1
    async fn inner_fetch_blob(
71
1
        &self,
72
1
        request: FetchBlobRequest,
73
1
    ) -> Result<Response<FetchBlobResponse>, Error> {
74
1
        let instance_name = &request.instance_name;
75
1
        let store_info = self
76
1
            .stores
77
1
            .get(instance_name)
78
1
            .err_tip(|| format!(
"'instance_name' not configured for '{instance_name}'"0
))
?0
;
79
80
1
        if request.uris.is_empty() {
  Branch (80:12): [True: 0, False: 1]
  Branch (80:12): [Folded - Ignored]
81
0
            return Err(Error::new(
82
0
                Code::InvalidArgument,
83
0
                "No uris in fetch request".to_owned(),
84
0
            ));
85
1
        }
86
1
        for uri in &request.uris {
87
1
            let asset_request = RemoteAssetQuery::new(uri.clone(), request.qualifiers.clone());
88
1
            let asset_digest = asset_request.digest();
89
1
            let asset_response_possible = store_info
90
1
                .store
91
1
                .get_part_unchunked(asset_digest, 0, None)
92
1
                .await;
93
94
1
            info!(
95
                uri = uri,
96
1
                digest = format!("{}", asset_digest),
97
1
                "Looked up fetch asset"
98
            );
99
100
1
            if let Ok(asset_response_raw) = asset_response_possible {
  Branch (100:20): [True: 1, False: 0]
  Branch (100:20): [Folded - Ignored]
101
1
                let asset_response = RemoteAssetArtifact::decode(asset_response_raw).unwrap();
102
1
                return Ok(Response::new(FetchBlobResponse {
103
1
                    status: Some(GoogleStatus {
104
1
                        code: Code::Ok.into(),
105
1
                        message: "Fetch object found".to_owned(),
106
1
                        details: vec![],
107
1
                    }),
108
1
                    uri: asset_response.uri,
109
1
                    qualifiers: asset_response.qualifiers,
110
1
                    expires_at: asset_response.expire_at,
111
1
                    blob_digest: asset_response.blob_digest,
112
1
                    digest_function: asset_response.digest_function,
113
1
                }));
114
0
            }
115
        }
116
0
        Ok(Response::new(FetchBlobResponse {
117
0
            status: Some(make_err!(Code::NotFound, "No item found").into()),
118
0
            uri: request.uris.first().cloned().unwrap_or(String::new()),
119
0
            qualifiers: vec![],
120
0
            expires_at: None,
121
0
            blob_digest: None,
122
0
            digest_function: default_digest_hasher_func().proto_digest_func().into(),
123
0
        }))
124
1
    }
125
}
126
127
#[tonic::async_trait]
128
impl Fetch for FetchServer {
129
    #[allow(clippy::blocks_in_conditions)]
130
    #[instrument(
131
        err(level = Level::WARN),
132
        ret(level = Level::INFO),
133
        skip_all,
134
        fields(request = ?grpc_request.get_ref())
135
    )]
136
    async fn fetch_blob(
137
        &self,
138
        grpc_request: Request<FetchBlobRequest>,
139
2
    ) -> Result<Response<FetchBlobResponse>, Status> {
140
1
        let request = grpc_request.into_inner();
141
1
        let digest_function = request.digest_function;
142
1
        self.inner_fetch_blob(request)
143
1
            .instrument(error_span!("fetch_server_fetch_blob"))
144
1
            .with_context(
145
1
                make_ctx_for_hash_func(digest_function).err_tip(|| "In FetchServer::fetch_blob")
?0
,
146
            )
147
1
            .await
148
1
            .err_tip(|| "Failed on fetch_blob() command")
149
1
            .map_err(Into::into)
150
2
    }
151
152
    #[allow(clippy::blocks_in_conditions)]
153
    #[instrument(
154
        err(level = Level::WARN),
155
        ret(level = Level::INFO),
156
        skip_all,
157
        fields(request = ?_grpc_request.get_ref())
158
    )]
159
    async fn fetch_directory(
160
        &self,
161
        _grpc_request: Request<FetchDirectoryRequest>,
162
0
    ) -> Result<Response<FetchDirectoryResponse>, Status> {
163
0
        todo!()
164
0
    }
165
}