Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/cas_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::convert::Into;
use core::pin::Pin;
use core::time::Duration;
use std::collections::{HashMap, HashSet, VecDeque};

use bytes::Bytes;
use futures::stream::{FuturesUnordered, Stream};
use futures::{StreamExt, TryStreamExt};
use nativelink_config::cas_server::{CasStoreConfig, WithInstanceName};
use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::{
    ContentAddressableStorage, ContentAddressableStorageServer as Server,
};
use nativelink_proto::build::bazel::remote::execution::v2::{
    BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
    BatchUpdateBlobsResponse, Directory, FindMissingBlobsRequest, FindMissingBlobsResponse,
    GetTreeRequest, GetTreeResponse, batch_read_blobs_response, batch_update_blobs_response,
    compressor,
};
use nativelink_proto::google::rpc::Status as GrpcStatus;
use nativelink_store::ac_utils::get_and_decode_digest;
use nativelink_store::grpc_store::GrpcStore;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::make_ctx_for_hash_func;
use nativelink_util::store_trait::{Store, StoreLike};
use opentelemetry::context::FutureExt;
use tonic::{Request, Response, Status};
use tracing::{Instrument, Level, debug, error_span, instrument};
44
45
#[derive(Debug)]
46
pub struct CasServer {
47
    stores: HashMap<String, Store>,
48
}
49
50
type GetTreeStream = Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static>>;
51
52
/// Deadline applied to each individual blob read or write performed inside
/// `BatchReadBlobs` / `BatchUpdateBlobs` (30 seconds), so a single slow blob
/// is reported on its own instead of stalling the whole batch.
const BATCH_PER_BLOB_TIMEOUT: Duration = Duration::from_millis(30_000);
54
55
impl CasServer {
56
10
    pub fn new(
57
10
        configs: &[WithInstanceName<CasStoreConfig>],
58
10
        store_manager: &StoreManager,
59
10
    ) -> Result<Self, Error> {
60
10
        let mut stores = HashMap::with_capacity(configs.len());
61
10
        for config in configs {
62
10
            let store = store_manager.get_store(&config.cas_store).ok_or_else(|| 
{0
63
0
                make_input_err!("'cas_store': '{}' does not exist", config.cas_store)
64
0
            })?;
65
10
            stores.insert(config.instance_name.clone(), store);
66
        }
67
10
        Ok(Self { stores })
68
10
    }
69
70
0
    pub fn into_service(self) -> Server<Self> {
71
0
        Server::new(self)
72
0
    }
73
74
4
    async fn inner_find_missing_blobs(
75
4
        &self,
76
4
        request: FindMissingBlobsRequest,
77
4
    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
78
4
        let instance_name = &request.instance_name;
79
4
        let store = self
80
4
            .stores
81
4
            .get(instance_name)
82
4
            .err_tip(|| 
format!0
("'instance_name' not configured for '{instance_name}'"))
?0
83
4
            .clone();
84
85
4
        let mut requested_blobs = Vec::with_capacity(request.blob_digests.len());
86
7
        for digest in 
&request.blob_digests4
{
87
7
            requested_blobs.
push6
(DigestInfo::try_from(digest.clone())
?1
.
into6
());
88
        }
89
3
        let sizes = store
90
3
            .has_many(&requested_blobs)
91
3
            .await
92
3
            .err_tip(|| "In find_missing_blobs")
?0
;
93
3
        let missing_blob_digests = sizes
94
3
            .into_iter()
95
3
            .zip(request.blob_digests)
96
5
            .
filter_map3
(|(maybe_size, digest)| maybe_size.map_or_else(|| Some(
digest2
), |_| None))
97
3
            .collect();
98
99
3
        Ok(Response::new(FindMissingBlobsResponse {
100
3
            missing_blob_digests,
101
3
        }))
102
4
    }
103
104
3
    async fn inner_batch_update_blobs(
105
3
        &self,
106
3
        request: BatchUpdateBlobsRequest,
107
3
    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
108
3
        let instance_name = &request.instance_name;
109
110
3
        let store = self
111
3
            .stores
112
3
            .get(instance_name)
113
3
            .err_tip(|| 
format!0
("'instance_name' not configured for '{instance_name}'"))
?0
114
3
            .clone();
115
116
        // If we are a GrpcStore we shortcut here, as this is a special store.
117
        // Note: We don't know the digests here, so we try perform a very shallow
118
        // check to see if it's a grpc store.
119
3
        if let Some(
grpc_store0
) = store.downcast_ref::<GrpcStore>(None) {
120
0
            return grpc_store.batch_update_blobs(Request::new(request)).await;
121
3
        }
122
123
3
        let store_ref = &store;
124
3
        let update_futures: FuturesUnordered<_> = request
125
3
            .requests
126
3
            .into_iter()
127
4
            .
map3
(|request| async move {
128
4
                let digest = request
129
4
                    .digest
130
4
                    .clone()
131
4
                    .err_tip(|| "Digest not found in request")
?0
;
132
4
                let request_data = request.data;
133
4
                let digest_info = DigestInfo::try_from(digest.clone())
?0
;
134
4
                let size_bytes = usize::try_from(digest_info.size_bytes())
135
4
                    .err_tip(|| "Digest size_bytes was not convertible to usize")
?0
;
136
0
                error_if!(
137
4
                    size_bytes != request_data.len(),
138
                    "Digest for upload had mismatching sizes, digest said {} data  said {}",
139
                    size_bytes,
140
0
                    request_data.len()
141
                );
142
                // Apply a per-blob deadline so one slow upload does not
143
                // make the whole batch hit the client's overall deadline.
144
4
                let result = match tokio::time::timeout(
145
                    BATCH_PER_BLOB_TIMEOUT,
146
4
                    store_ref.update_oneshot(digest_info, request_data),
147
                )
148
4
                .await
149
                {
150
3
                    Ok(r) => r.err_tip(|| "Error writing to store"),
151
1
                    Err(_elapsed) => Err(make_err!(
152
1
                        Code::DeadlineExceeded,
153
1
                        "BatchUpdateBlobs per-blob timeout ({} s) elapsed for digest {}",
154
1
                        BATCH_PER_BLOB_TIMEOUT.as_secs(),
155
1
                        digest_info,
156
1
                    )),
157
                };
158
                Ok::<_, Error>(batch_update_blobs_response::Response {
159
4
                    digest: Some(digest),
160
4
                    status: Some(result.map_or_else(Into::into, |()| 
GrpcStatus::default3
())),
161
                })
162
8
            })
163
3
            .collect();
164
3
        let responses = update_futures
165
3
            .try_collect::<Vec<batch_update_blobs_response::Response>>()
166
3
            .await
?0
;
167
168
3
        Ok(Response::new(BatchUpdateBlobsResponse { responses }))
169
3
    }
170
171
2
    async fn inner_batch_read_blobs(
172
2
        &self,
173
2
        request: BatchReadBlobsRequest,
174
2
    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
175
2
        let instance_name = &request.instance_name;
176
177
2
        let store = self
178
2
            .stores
179
2
            .get(instance_name)
180
2
            .err_tip(|| 
format!0
("'instance_name' not configured for '{instance_name}'"))
?0
181
2
            .clone();
182
183
        // If we are a GrpcStore we shortcut here, as this is a special store.
184
        // Note: We don't know the digests here, so we try perform a very shallow
185
        // check to see if it's a grpc store.
186
2
        if let Some(
grpc_store0
) = store.downcast_ref::<GrpcStore>(None) {
187
0
            return grpc_store.batch_read_blobs(Request::new(request)).await;
188
2
        }
189
190
2
        let store_ref = &store;
191
2
        let read_futures: FuturesUnordered<_> = request
192
2
            .digests
193
2
            .into_iter()
194
4
            .
map2
(|digest| async move {
195
4
                let digest_copy = DigestInfo::try_from(digest.clone())
?0
;
196
                // TODO(palfrey) There is a security risk here of someone taking all the memory on the instance.
197
                // Apply a per-blob deadline so one slow read does not
198
                // make the whole batch hit the client's overall deadline.
199
4
                let result = match tokio::time::timeout(
200
                    BATCH_PER_BLOB_TIMEOUT,
201
4
                    store_ref.get_part_unchunked(digest_copy, 0, None),
202
                )
203
4
                .await
204
                {
205
3
                    Ok(r) => r.err_tip(|| "Error reading from store"),
206
1
                    Err(_elapsed) => Err(make_err!(
207
1
                        Code::DeadlineExceeded,
208
1
                        "BatchReadBlobs per-blob timeout ({} s) elapsed for digest {}",
209
1
                        BATCH_PER_BLOB_TIMEOUT.as_secs(),
210
1
                        digest_copy,
211
1
                    )),
212
                };
213
4
                let (status, data) = result.map_or_else(
214
2
                    |mut e| {
215
2
                        if e.code == Code::NotFound {
216
1
                            // Trim the error code. Not Found is quite common and we don't want to send a large
217
1
                            // error (debug) message for something that is common. We resize to just the last
218
1
                            // message as it will be the most relevant.
219
1
                            e.messages.resize_with(1, String::new);
220
1
                        }
221
2
                        (e.into(), Bytes::new())
222
2
                    },
223
2
                    |v| (GrpcStatus::default(), v),
224
                );
225
4
                Ok::<_, Error>(batch_read_blobs_response::Response {
226
4
                    status: Some(status),
227
4
                    digest: Some(digest),
228
4
                    compressor: compressor::Value::Identity.into(),
229
4
                    data,
230
4
                })
231
8
            })
232
2
            .collect();
233
2
        let responses = read_futures
234
2
            .try_collect::<Vec<batch_read_blobs_response::Response>>()
235
2
            .await
?0
;
236
237
2
        Ok(Response::new(BatchReadBlobsResponse { responses }))
238
2
    }
239
240
6
    async fn inner_get_tree(
241
6
        &self,
242
6
        request: GetTreeRequest,
243
6
    ) -> Result<impl Stream<Item = Result<GetTreeResponse, Status>> + Send + use<>, Error> {
244
6
        let instance_name = &request.instance_name;
245
246
6
        let store = self
247
6
            .stores
248
6
            .get(instance_name)
249
6
            .err_tip(|| 
format!0
("'instance_name' not configured for '{instance_name}'"))
?0
250
6
            .clone();
251
252
        // If we are a GrpcStore we shortcut here, as this is a special store.
253
        // Note: We don't know the digests here, so we try perform a very shallow
254
        // check to see if it's a grpc store.
255
6
        if let Some(
grpc_store0
) = store.downcast_ref::<GrpcStore>(None) {
256
0
            let stream = grpc_store
257
0
                .get_tree(Request::new(request))
258
0
                .await?
259
0
                .into_inner();
260
0
            return Ok(stream.left_stream());
261
6
        }
262
6
        let root_digest: DigestInfo = request
263
6
            .root_digest
264
6
            .err_tip(|| "Expected root_digest to exist in GetTreeRequest")
?0
265
6
            .try_into()
266
6
            .err_tip(|| "In GetTreeRequest::root_digest")
?0
;
267
268
6
        let mut deque: VecDeque<DigestInfo> = VecDeque::new();
269
6
        let mut directories: Vec<Directory> = Vec::new();
270
        // `page_token` will return the `{hash_str}-{size_bytes}` of the current request's first directory digest.
271
6
        let page_token_digest = if request.page_token.is_empty() {
272
2
            root_digest
273
        } else {
274
4
            let mut page_token_parts = request.page_token.split('-');
275
4
            DigestInfo::try_new(
276
4
                page_token_parts
277
4
                    .next()
278
4
                    .err_tip(|| "Failed to parse `hash_str` in `page_token`")
?0
,
279
4
                page_token_parts
280
4
                    .next()
281
4
                    .err_tip(|| "Failed to parse `size_bytes` in `page_token`")
?0
282
4
                    .parse::<i64>()
283
4
                    .err_tip(|| "Failed to parse `size_bytes` as i64")
?0
,
284
            )
285
4
            .err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`")
?0
286
        };
287
6
        let page_size = request.page_size;
288
        // If `page_size` is 0, paging is not necessary.
289
6
        let mut page_token_matched = page_size == 0;
290
6
        deque.push_back(root_digest);
291
292
28
        while !deque.is_empty() {
293
26
            let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front")
?0
;
294
26
            let directory = get_and_decode_digest::<Directory>(&store, digest.into())
295
26
                .await
296
26
                .err_tip(|| "Converting digest to Directory")
?0
;
297
26
            if digest == page_token_digest {
298
6
                page_token_matched = true;
299
20
            }
300
30
            for directory in 
&directory.directories26
{
301
30
                let digest: DigestInfo = directory
302
30
                    .digest
303
30
                    .clone()
304
30
                    .err_tip(|| "Expected Digest to exist in Directory::directories::digest")
?0
305
30
                    .try_into()
306
30
                    .err_tip(|| "In Directory::file::digest")
?0
;
307
30
                deque.push_back(digest);
308
            }
309
310
26
            let page_size_usize = usize::try_from(page_size).unwrap_or(usize::MAX);
311
312
26
            if page_token_matched {
313
20
                directories.push(directory);
314
20
                if directories.len() == page_size_usize {
315
4
                    break;
316
16
                }
317
6
            }
318
        }
319
        // `next_page_token` will return the `{hash_str}:{size_bytes}` of the next request's first directory digest.
320
        // It will be an empty string when it reached the end of the directory tree.
321
6
        let next_page_token: String = deque
322
6
            .front()
323
6
            .map_or_else(String::new, |value| 
format!3
("{value}"));
324
325
6
        Ok(futures::stream::once(async {
326
6
            Ok(GetTreeResponse {
327
6
                directories,
328
6
                next_page_token,
329
6
            })
330
6
        })
331
6
        .right_stream())
332
6
    }
333
}
334
335
#[tonic::async_trait]
336
impl ContentAddressableStorage for CasServer {
337
    type GetTreeStream = GetTreeStream;
338
339
    #[instrument(
340
        err,
341
        ret(level = Level::DEBUG),
342
        level = Level::ERROR,
343
        skip_all,
344
        fields(
345
            // Mostly to skip request.blob_digests which is sometimes enormous
346
            request.instance_name = ?grpc_request.get_ref().instance_name,
347
            request.digest_function = ?grpc_request.get_ref().digest_function
348
        )
349
    )]
350
    async fn find_missing_blobs(
351
        &self,
352
        grpc_request: Request<FindMissingBlobsRequest>,
353
    ) -> Result<Response<FindMissingBlobsResponse>, Status> {
354
        let request = grpc_request.into_inner();
355
        let digest_function = request.digest_function;
356
        self.inner_find_missing_blobs(request)
357
            .instrument(error_span!("cas_server_find_missing_blobs"))
358
            .with_context(
359
                make_ctx_for_hash_func(digest_function)
360
                    .err_tip(|| "In CasServer::find_missing_blobs")?,
361
            )
362
            .await
363
            .err_tip(|| "Failed on find_missing_blobs() command")
364
            .map_err(Into::into)
365
    }
366
367
    #[instrument(
368
        err,
369
        ret(level = Level::DEBUG),
370
        level = Level::ERROR,
371
        skip_all,
372
        fields(request = ?grpc_request.get_ref())
373
    )]
374
    async fn batch_update_blobs(
375
        &self,
376
        grpc_request: Request<BatchUpdateBlobsRequest>,
377
    ) -> Result<Response<BatchUpdateBlobsResponse>, Status> {
378
        let request = grpc_request.into_inner();
379
        let digest_function = request.digest_function;
380
381
        self.inner_batch_update_blobs(request)
382
            .instrument(error_span!("cas_server_batch_update_blobs"))
383
            .with_context(
384
                make_ctx_for_hash_func(digest_function)
385
                    .err_tip(|| "In CasServer::batch_update_blobs")?,
386
            )
387
            .await
388
            .err_tip(|| "Failed on batch_update_blobs() command")
389
            .map_err(Into::into)
390
    }
391
392
    #[instrument(
393
        err,
394
        ret(level = Level::INFO),
395
        level = Level::ERROR,
396
        skip_all,
397
        fields(request = ?grpc_request.get_ref())
398
    )]
399
    async fn batch_read_blobs(
400
        &self,
401
        grpc_request: Request<BatchReadBlobsRequest>,
402
    ) -> Result<Response<BatchReadBlobsResponse>, Status> {
403
        let request = grpc_request.into_inner();
404
        let digest_function = request.digest_function;
405
406
        self.inner_batch_read_blobs(request)
407
            .instrument(error_span!("cas_server_batch_read_blobs"))
408
            .with_context(
409
                make_ctx_for_hash_func(digest_function)
410
                    .err_tip(|| "In CasServer::batch_read_blobs")?,
411
            )
412
            .await
413
            .err_tip(|| "Failed on batch_read_blobs() command")
414
            .map_err(Into::into)
415
    }
416
417
    #[instrument(
418
        err,
419
        level = Level::ERROR,
420
        skip_all,
421
        fields(request = ?grpc_request.get_ref())
422
    )]
423
    async fn get_tree(
424
        &self,
425
        grpc_request: Request<GetTreeRequest>,
426
    ) -> Result<Response<Self::GetTreeStream>, Status> {
427
        let request = grpc_request.into_inner();
428
        let digest_function = request.digest_function;
429
430
        let resp = self
431
            .inner_get_tree(request)
432
            .instrument(error_span!("cas_server_get_tree"))
433
            .with_context(
434
                make_ctx_for_hash_func(digest_function).err_tip(|| "In CasServer::get_tree")?,
435
            )
436
            .await
437
            .err_tip(|| "Failed on get_tree() command")
438
6
            .map(|stream| -> Response<Self::GetTreeStream> { Response::new(Box::pin(stream)) })
439
            .map_err(Into::into);
440
441
        if resp.is_ok() {
442
            debug!(return = "Ok(<stream>)");
443
        }
444
        resp
445
    }
446
}