Coverage Report

Created: 2025-10-30 00:14

/build/source/nativelink-service/src/cas_server.rs
 Line| Count|Source
    1|      |// Copyright 2024 The NativeLink Authors. All rights reserved.
    2|      |//
    3|      |// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
    4|      |// you may not use this file except in compliance with the License.
    5|      |// You may obtain a copy of the License at
    6|      |//
    7|      |//    See LICENSE file for details
    8|      |//
    9|      |// Unless required by applicable law or agreed to in writing, software
   10|      |// distributed under the License is distributed on an "AS IS" BASIS,
   11|      |// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12|      |// See the License for the specific language governing permissions and
   13|      |// limitations under the License.
   14|      |
   15|      |use core::convert::Into;
   16|      |use core::pin::Pin;
   17|      |use std::collections::{HashMap, VecDeque};
   18|      |
   19|      |use bytes::Bytes;
   20|      |use futures::stream::{FuturesUnordered, Stream};
   21|      |use futures::{StreamExt, TryStreamExt};
   22|      |use nativelink_config::cas_server::{CasStoreConfig, WithInstanceName};
   23|      |use nativelink_error::{Code, Error, ResultExt, error_if, make_input_err};
   24|      |use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::{
   25|      |    ContentAddressableStorage, ContentAddressableStorageServer as Server,
   26|      |};
   27|      |use nativelink_proto::build::bazel::remote::execution::v2::{
   28|      |    BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
   29|      |    BatchUpdateBlobsResponse, Directory, FindMissingBlobsRequest, FindMissingBlobsResponse,
   30|      |    GetTreeRequest, GetTreeResponse, batch_read_blobs_response, batch_update_blobs_response,
   31|      |    compressor,
   32|      |};
   33|      |use nativelink_proto::google::rpc::Status as GrpcStatus;
   34|      |use nativelink_store::ac_utils::get_and_decode_digest;
   35|      |use nativelink_store::grpc_store::GrpcStore;
   36|      |use nativelink_store::store_manager::StoreManager;
   37|      |use nativelink_util::common::DigestInfo;
   38|      |use nativelink_util::digest_hasher::make_ctx_for_hash_func;
   39|      |use nativelink_util::store_trait::{Store, StoreLike};
   40|      |use opentelemetry::context::FutureExt;
   41|      |use tonic::{Request, Response, Status};
   42|      |use tracing::{Instrument, Level, debug, error_span, instrument};
   43|      |
   44|      |#[derive(Debug)]
   45|      |pub struct CasServer {
   46|      |    stores: HashMap<String, Store>,
   47|      |}
   48|      |
   49|      |type GetTreeStream = Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static>>;
   50|      |
   51|      |impl CasServer {
   52|     8|    pub fn new(
   53|     8|        configs: &[WithInstanceName<CasStoreConfig>],
   54|     8|        store_manager: &StoreManager,
   55|     8|    ) -> Result<Self, Error> {
   56|     8|        let mut stores = HashMap::with_capacity(configs.len());
   57|    16|        for config in configs {
   58|     8|            let store = store_manager.get_store(&config.cas_store).ok_or_else(|| {
   59|     0|                make_input_err!("'cas_store': '{}' does not exist", config.cas_store)
   60|     0|            })?;
   61|     8|            stores.insert(config.instance_name.to_string(), store);
   62|      |        }
   63|     8|        Ok(Self { stores })
   64|     8|    }
   65|      |
   66|     0|    pub fn into_service(self) -> Server<Self> {
   67|     0|        Server::new(self)
   68|     0|    }
   69|      |
   70|     4|    async fn inner_find_missing_blobs(
   71|     4|        &self,
   72|     4|        request: FindMissingBlobsRequest,
   73|     4|    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
   74|     4|        let instance_name = &request.instance_name;
   75|     4|        let store = self
   76|     4|            .stores
   77|     4|            .get(instance_name)
   78|     4|            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
   79|     4|            .clone();
   80|      |
   81|     4|        let mut requested_blobs = Vec::with_capacity(request.blob_digests.len());
   82|    10|        for digest in &request.blob_digests {
   83|     7|            requested_blobs.push(DigestInfo::try_from(digest.clone())?.into());
   84|      |        }
   85|     3|        let sizes = store
   86|     3|            .has_many(&requested_blobs)
   87|     3|            .await
   88|     3|            .err_tip(|| "In find_missing_blobs")?;
   89|     3|        let missing_blob_digests = sizes
   90|     3|            .into_iter()
   91|     3|            .zip(request.blob_digests)
   92|     5|            .filter_map(|(maybe_size, digest)| maybe_size.map_or_else(|| Some(digest), |_| None))
   93|     3|            .collect();
   94|      |
   95|     3|        Ok(Response::new(FindMissingBlobsResponse {
   96|     3|            missing_blob_digests,
   97|     3|        }))
   98|     4|    }
   99|      |
  100|     2|    async fn inner_batch_update_blobs(
  101|     2|        &self,
  102|     2|        request: BatchUpdateBlobsRequest,
  103|     2|    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
  104|     2|        let instance_name = &request.instance_name;
  105|      |
  106|     2|        let store = self
  107|     2|            .stores
  108|     2|            .get(instance_name)
  109|     2|            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
  110|     2|            .clone();
  111|      |
  112|      |        // If we are a GrpcStore we shortcut here, as this is a special store.
  113|      |        // Note: We don't know the digests here, so we try to perform a very shallow
  114|      |        // check to see if it's a grpc store.
  115|     2|        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
     |      |  Branch (115:16): [True: 0, False: 2]
     |      |  Branch (115:16): [Folded - Ignored]
  116|     0|            return grpc_store.batch_update_blobs(Request::new(request)).await;
  117|     2|        }
  118|      |
  119|     2|        let store_ref = &store;
  120|     2|        let update_futures: FuturesUnordered<_> = request
  121|     2|            .requests
  122|     2|            .into_iter()
  123|     3|            .map(|request| async move {
  124|     3|                let digest = request
  125|     3|                    .digest
  126|     3|                    .clone()
  127|     3|                    .err_tip(|| "Digest not found in request")?;
  128|     3|                let request_data = request.data;
  129|     3|                let digest_info = DigestInfo::try_from(digest.clone())?;
  130|     3|                let size_bytes = usize::try_from(digest_info.size_bytes())
  131|     3|                    .err_tip(|| "Digest size_bytes was not convertible to usize")?;
  132|     0|                error_if!(
  133|     3|                    size_bytes != request_data.len(),
     |      |  Branch (133:21): [True: 0, False: 3]
     |      |  Branch (133:21): [Folded - Ignored]
  134|      |                    "Digest for upload had mismatching sizes, digest said {} data said {}",
  135|      |                    size_bytes,
  136|     0|                    request_data.len()
  137|      |                );
  138|     3|                let result = store_ref
  139|     3|                    .update_oneshot(digest_info, request_data)
  140|     3|                    .await
  141|     3|                    .err_tip(|| "Error writing to store");
  142|      |                Ok::<_, Error>(batch_update_blobs_response::Response {
  143|     3|                    digest: Some(digest),
  144|     3|                    status: Some(result.map_or_else(Into::into, |()| GrpcStatus::default())),
  145|      |                })
  146|     6|            })
  147|     2|            .collect();
  148|     2|        let responses = update_futures
  149|     2|            .try_collect::<Vec<batch_update_blobs_response::Response>>()
  150|     2|            .await?;
  151|      |
  152|     2|        Ok(Response::new(BatchUpdateBlobsResponse { responses }))
  153|     2|    }
  154|      |
  155|     1|    async fn inner_batch_read_blobs(
  156|     1|        &self,
  157|     1|        request: BatchReadBlobsRequest,
  158|     1|    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
  159|     1|        let instance_name = &request.instance_name;
  160|      |
  161|     1|        let store = self
  162|     1|            .stores
  163|     1|            .get(instance_name)
  164|     1|            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
  165|     1|            .clone();
  166|      |
  167|      |        // If we are a GrpcStore we shortcut here, as this is a special store.
  168|      |        // Note: We don't know the digests here, so we try to perform a very shallow
  169|      |        // check to see if it's a grpc store.
  170|     1|        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
     |      |  Branch (170:16): [True: 0, False: 1]
     |      |  Branch (170:16): [Folded - Ignored]
  171|     0|            return grpc_store.batch_read_blobs(Request::new(request)).await;
  172|     1|        }
  173|      |
  174|     1|        let store_ref = &store;
  175|     1|        let read_futures: FuturesUnordered<_> = request
  176|     1|            .digests
  177|     1|            .into_iter()
  178|     3|            .map(|digest| async move {
  179|     3|                let digest_copy = DigestInfo::try_from(digest.clone())?;
  180|      |                // TODO(palfrey) There is a security risk here of someone taking all the memory on the instance.
  181|     3|                let result = store_ref
  182|     3|                    .get_part_unchunked(digest_copy, 0, None)
  183|     3|                    .await
  184|     3|                    .err_tip(|| "Error reading from store");
  185|     3|                let (status, data) = result.map_or_else(
  186|     1|                    |mut e| {
  187|     1|                        if e.code == Code::NotFound {
     |      |  Branch (187:28): [True: 1, False: 0]
     |      |  Branch (187:28): [Folded - Ignored]
  188|     1|                            // Trim the error code. Not Found is quite common and we don't want to send a large
  189|     1|                            // error (debug) message for something that is common. We resize to just the last
  190|     1|                            // message as it will be the most relevant.
  191|     1|                            e.messages.resize_with(1, String::new);
  192|     1|                        }
  193|     1|                        (e.into(), Bytes::new())
  194|     1|                    },
  195|     2|                    |v| (GrpcStatus::default(), v),
  196|      |                );
  197|     3|                Ok::<_, Error>(batch_read_blobs_response::Response {
  198|     3|                    status: Some(status),
  199|     3|                    digest: Some(digest),
  200|     3|                    compressor: compressor::Value::Identity.into(),
  201|     3|                    data,
  202|     3|                })
  203|     6|            })
  204|     1|            .collect();
  205|     1|        let responses = read_futures
  206|     1|            .try_collect::<Vec<batch_read_blobs_response::Response>>()
  207|     1|            .await?;
  208|      |
  209|     1|        Ok(Response::new(BatchReadBlobsResponse { responses }))
  210|     1|    }
  211|      |
  212|     6|    async fn inner_get_tree(
  213|     6|        &self,
  214|     6|        request: GetTreeRequest,
  215|     6|    ) -> Result<impl Stream<Item = Result<GetTreeResponse, Status>> + Send + use<>, Error> {
  216|     6|        let instance_name = &request.instance_name;
  217|      |
  218|     6|        let store = self
  219|     6|            .stores
  220|     6|            .get(instance_name)
  221|     6|            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
  222|     6|            .clone();
  223|      |
  224|      |        // If we are a GrpcStore we shortcut here, as this is a special store.
  225|      |        // Note: We don't know the digests here, so we try to perform a very shallow
  226|      |        // check to see if it's a grpc store.
  227|     6|        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
     |      |  Branch (227:16): [True: 0, False: 6]
     |      |  Branch (227:16): [Folded - Ignored]
  228|     0|            let stream = grpc_store
  229|     0|                .get_tree(Request::new(request))
  230|     0|                .await?
  231|     0|                .into_inner();
  232|     0|            return Ok(stream.left_stream());
  233|     6|        }
  234|     6|        let root_digest: DigestInfo = request
  235|     6|            .root_digest
  236|     6|            .err_tip(|| "Expected root_digest to exist in GetTreeRequest")?
  237|     6|            .try_into()
  238|     6|            .err_tip(|| "In GetTreeRequest::root_digest")?;
  239|      |
  240|     6|        let mut deque: VecDeque<DigestInfo> = VecDeque::new();
  241|     6|        let mut directories: Vec<Directory> = Vec::new();
  242|      |        // `page_token` will return the `{hash_str}-{size_bytes}` of the current request's first directory digest.
  243|     6|        let page_token_digest = if request.page_token.is_empty() {
     |      |  Branch (243:36): [True: 2, False: 4]
     |      |  Branch (243:36): [Folded - Ignored]
  244|     2|            root_digest
  245|      |        } else {
  246|     4|            let mut page_token_parts = request.page_token.split('-');
  247|     4|            DigestInfo::try_new(
  248|     4|                page_token_parts
  249|     4|                    .next()
  250|     4|                    .err_tip(|| "Failed to parse `hash_str` in `page_token`")?,
  251|     4|                page_token_parts
  252|     4|                    .next()
  253|     4|                    .err_tip(|| "Failed to parse `size_bytes` in `page_token`")?
  254|     4|                    .parse::<i64>()
  255|     4|                    .err_tip(|| "Failed to parse `size_bytes` as i64")?,
  256|      |            )
  257|     4|            .err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`")?
  258|      |        };
  259|     6|        let page_size = request.page_size;
  260|      |        // If `page_size` is 0, paging is not necessary.
  261|     6|        let mut page_token_matched = page_size == 0;
  262|     6|        deque.push_back(root_digest);
  263|      |
  264|    28|        while !deque.is_empty() {
     |      |  Branch (264:15): [True: 26, False: 2]
     |      |  Branch (264:15): [Folded - Ignored]
  265|    26|            let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front")?;
  266|    26|            let directory = get_and_decode_digest::<Directory>(&store, digest.into())
  267|    26|                .await
  268|    26|                .err_tip(|| "Converting digest to Directory")?;
  269|    26|            if digest == page_token_digest {
     |      |  Branch (269:16): [True: 6, False: 20]
     |      |  Branch (269:16): [Folded - Ignored]
  270|     6|                page_token_matched = true;
  271|    20|            }
  272|    56|            for directory in &directory.directories {
  273|    30|                let digest: DigestInfo = directory
  274|    30|                    .digest
  275|    30|                    .clone()
  276|    30|                    .err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
  277|    30|                    .try_into()
  278|    30|                    .err_tip(|| "In Directory::file::digest")?;
  279|    30|                deque.push_back(digest);
  280|      |            }
  281|      |
  282|    26|            let page_size_usize = usize::try_from(page_size).unwrap_or(usize::MAX);
  283|      |
  284|    26|            if page_token_matched {
     |      |  Branch (284:16): [True: 20, False: 6]
     |      |  Branch (284:16): [Folded - Ignored]
  285|    20|                directories.push(directory);
  286|    20|                if directories.len() == page_size_usize {
     |      |  Branch (286:20): [True: 4, False: 16]
     |      |  Branch (286:20): [Folded - Ignored]
  287|     4|                    break;
  288|    16|                }
  289|     6|            }
  290|      |        }
  291|      |        // `next_page_token` will return the `{hash_str}-{size_bytes}` of the next request's first directory digest.
  292|      |        // It will be an empty string when it has reached the end of the directory tree.
  293|     6|        let next_page_token: String = deque
  294|     6|            .front()
  295|     6|            .map_or_else(String::new, |value| format!("{value}"));
  296|      |
  297|     6|        Ok(futures::stream::once(async {
  298|     6|            Ok(GetTreeResponse {
  299|     6|                directories,
  300|     6|                next_page_token,
  301|     6|            })
  302|     6|        })
  303|     6|        .right_stream())
  304|     6|    }
  305|      |}
  306|      |
  307|      |#[tonic::async_trait]
  308|      |impl ContentAddressableStorage for CasServer {
  309|      |    type GetTreeStream = GetTreeStream;
  310|      |
  311|      |    #[instrument(
  312|      |        err,
  313|      |        ret(level = Level::DEBUG),
  314|      |        level = Level::ERROR,
  315|      |        skip_all,
  316|      |        fields(
  317|      |            // Mostly to skip request.blob_digests which is sometimes enormous
  318|      |            request.instance_name = ?grpc_request.get_ref().instance_name,
  319|      |            request.digest_function = ?grpc_request.get_ref().digest_function
  320|      |        )
  321|      |    )]
  322|      |    async fn find_missing_blobs(
  323|      |        &self,
  324|      |        grpc_request: Request<FindMissingBlobsRequest>,
  325|      |    ) -> Result<Response<FindMissingBlobsResponse>, Status> {
  326|      |        let request = grpc_request.into_inner();
  327|      |        let digest_function = request.digest_function;
  328|      |        self.inner_find_missing_blobs(request)
  329|      |            .instrument(error_span!("cas_server_find_missing_blobs"))
  330|      |            .with_context(
  331|      |                make_ctx_for_hash_func(digest_function)
  332|      |                    .err_tip(|| "In CasServer::find_missing_blobs")?,
  333|      |            )
  334|      |            .await
  335|      |            .err_tip(|| "Failed on find_missing_blobs() command")
  336|      |            .map_err(Into::into)
  337|      |    }
  338|      |
  339|      |    #[instrument(
  340|      |        err,
  341|      |        ret(level = Level::DEBUG),
  342|      |        level = Level::ERROR,
  343|      |        skip_all,
  344|      |        fields(request = ?grpc_request.get_ref())
  345|      |    )]
  346|      |    async fn batch_update_blobs(
  347|      |        &self,
  348|      |        grpc_request: Request<BatchUpdateBlobsRequest>,
  349|      |    ) -> Result<Response<BatchUpdateBlobsResponse>, Status> {
  350|      |        let request = grpc_request.into_inner();
  351|      |        let digest_function = request.digest_function;
  352|      |
  353|      |        self.inner_batch_update_blobs(request)
  354|      |            .instrument(error_span!("cas_server_batch_update_blobs"))
  355|      |            .with_context(
  356|      |                make_ctx_for_hash_func(digest_function)
  357|      |                    .err_tip(|| "In CasServer::batch_update_blobs")?,
  358|      |            )
  359|      |            .await
  360|      |            .err_tip(|| "Failed on batch_update_blobs() command")
  361|      |            .map_err(Into::into)
  362|      |    }
  363|      |
  364|      |    #[instrument(
  365|      |        err,
  366|      |        ret(level = Level::INFO),
  367|      |        level = Level::ERROR,
  368|      |        skip_all,
  369|      |        fields(request = ?grpc_request.get_ref())
  370|      |    )]
  371|      |    async fn batch_read_blobs(
  372|      |        &self,
  373|      |        grpc_request: Request<BatchReadBlobsRequest>,
  374|      |    ) -> Result<Response<BatchReadBlobsResponse>, Status> {
  375|      |        let request = grpc_request.into_inner();
  376|      |        let digest_function = request.digest_function;
  377|      |
  378|      |        self.inner_batch_read_blobs(request)
  379|      |            .instrument(error_span!("cas_server_batch_read_blobs"))
  380|      |            .with_context(
  381|      |                make_ctx_for_hash_func(digest_function)
  382|      |                    .err_tip(|| "In CasServer::batch_read_blobs")?,
  383|      |            )
  384|      |            .await
  385|      |            .err_tip(|| "Failed on batch_read_blobs() command")
  386|      |            .map_err(Into::into)
  387|      |    }
  388|      |
  389|      |    #[instrument(
  390|      |        err,
  391|      |        level = Level::ERROR,
  392|      |        skip_all,
  393|      |        fields(request = ?grpc_request.get_ref())
  394|      |    )]
  395|      |    async fn get_tree(
  396|      |        &self,
  397|      |        grpc_request: Request<GetTreeRequest>,
  398|      |    ) -> Result<Response<Self::GetTreeStream>, Status> {
  399|      |        let request = grpc_request.into_inner();
  400|      |        let digest_function = request.digest_function;
  401|      |
  402|      |        let resp = self
  403|      |            .inner_get_tree(request)
  404|      |            .instrument(error_span!("cas_server_get_tree"))
  405|      |            .with_context(
  406|      |                make_ctx_for_hash_func(digest_function).err_tip(|| "In CasServer::get_tree")?,
  407|      |            )
  408|      |            .await
  409|      |            .err_tip(|| "Failed on get_tree() command")
  410|     6|            .map(|stream| -> Response<Self::GetTreeStream> { Response::new(Box::pin(stream)) })
  411|      |            .map_err(Into::into);
  412|      |
  413|      |        if resp.is_ok() {
  414|      |            debug!(return = "Ok(<stream>)");
  415|      |        }
  416|      |        resp
  417|      |    }
  418|      |}
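
The zero-count regions above are `into_service` (lines 66-68), the `make_input_err` closure for a missing `cas_store` (lines 59-60), and all three `GrpcStore` shortcut branches (lines 115-117, 170-172, 227-233). The `get_tree` pagination contract, by contrast, is well exercised: a `page_token` of the form `{hash_str}-{size_bytes}` is echoed back as `next_page_token` until it is empty. The sketch below shows how a client would drive that loop. It is a minimal illustration, not part of the file under test: the endpoint `http://localhost:50051` and instance name `main` are placeholders, and the generated `ContentAddressableStorageClient` is assumed to come from the same proto module whose server half is imported at line 24.

use nativelink_proto::build::bazel::remote::execution::v2::{
    Digest, GetTreeRequest,
    content_addressable_storage_client::ContentAddressableStorageClient,
};

// Minimal sketch of a GetTree pagination loop against a running CasServer.
// `root_digest` is assumed to be the digest of an already-uploaded Directory.
async fn walk_tree(root_digest: Digest) -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint; point this at wherever the CAS service listens.
    let mut client =
        ContentAddressableStorageClient::connect("http://localhost:50051").await?;
    let mut page_token = String::new(); // an empty token starts at the root
    loop {
        let mut stream = client
            .get_tree(GetTreeRequest {
                instance_name: "main".to_string(), // must match a configured store
                root_digest: Some(root_digest.clone()),
                page_size: 2, // a small page forces several token round trips
                page_token: page_token.clone(),
                ..Default::default()
            })
            .await?
            .into_inner();
        // Each streamed response carries one page of directories plus the
        // next token, formatted by the server as `{hash_str}-{size_bytes}`.
        while let Some(response) = stream.message().await? {
            println!("page with {} directories", response.directories.len());
            page_token = response.next_page_token;
        }
        if page_token.is_empty() {
            break; // an empty token marks the end of the directory tree
        }
    }
    Ok(())
}

Driving the loop with a small `page_size` is what exercises the token round trip; the `True: 4` count on the page-size branch at line 286 suggests the existing tests do exactly that.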