Coverage Report

Created: 2025-05-30 16:37

/build/source/nativelink-service/src/cas_server.rs
Line| Count|Source
   1|      |// Copyright 2024 The NativeLink Authors. All rights reserved.
   2|      |//
   3|      |// Licensed under the Apache License, Version 2.0 (the "License");
   4|      |// you may not use this file except in compliance with the License.
   5|      |// You may obtain a copy of the License at
   6|      |//
   7|      |//    http://www.apache.org/licenses/LICENSE-2.0
   8|      |//
   9|      |// Unless required by applicable law or agreed to in writing, software
  10|      |// distributed under the License is distributed on an "AS IS" BASIS,
  11|      |// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12|      |// See the License for the specific language governing permissions and
  13|      |// limitations under the License.
  14|      |
  15|      |use core::convert::Into;
  16|      |use core::pin::Pin;
  17|      |use std::collections::{HashMap, VecDeque};
  18|      |
  19|      |use bytes::Bytes;
  20|      |use futures::stream::{FuturesUnordered, Stream};
  21|      |use futures::{StreamExt, TryStreamExt};
  22|      |use nativelink_config::cas_server::{CasStoreConfig, WithInstanceName};
  23|      |use nativelink_error::{Code, Error, ResultExt, error_if, make_input_err};
  24|      |use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::{
  25|      |    ContentAddressableStorage, ContentAddressableStorageServer as Server,
  26|      |};
  27|      |use nativelink_proto::build::bazel::remote::execution::v2::{
  28|      |    BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
  29|      |    BatchUpdateBlobsResponse, Directory, FindMissingBlobsRequest, FindMissingBlobsResponse,
  30|      |    GetTreeRequest, GetTreeResponse, batch_read_blobs_response, batch_update_blobs_response,
  31|      |    compressor,
  32|      |};
  33|      |use nativelink_proto::google::rpc::Status as GrpcStatus;
  34|      |use nativelink_store::ac_utils::get_and_decode_digest;
  35|      |use nativelink_store::grpc_store::GrpcStore;
  36|      |use nativelink_store::store_manager::StoreManager;
  37|      |use nativelink_util::common::DigestInfo;
  38|      |use nativelink_util::digest_hasher::make_ctx_for_hash_func;
  39|      |use nativelink_util::store_trait::{Store, StoreLike};
  40|      |use opentelemetry::context::FutureExt;
  41|      |use tonic::{Request, Response, Status};
  42|      |use tracing::{Instrument, Level, debug, error_span, instrument};
  43|      |
  44|      |#[derive(Debug)]
  45|      |pub struct CasServer {
  46|      |    stores: HashMap<String, Store>,
  47|      |}
  48|      |
  49|      |type GetTreeStream = Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static>>;
  50|      |
  51|      |impl CasServer {
  52|     8|    pub fn new(
  53|     8|        configs: &[WithInstanceName<CasStoreConfig>],
  54|     8|        store_manager: &StoreManager,
  55|     8|    ) -> Result<Self, Error> {
  56|     8|        let mut stores = HashMap::with_capacity(configs.len());
  57|    16|        for config in configs {
  58|     8|            let store = store_manager.get_store(&config.cas_store).ok_or_else(|| {
  59|     0|                make_input_err!("'cas_store': '{}' does not exist", config.cas_store)
  60|     0|            })?;
  61|     8|            stores.insert(config.instance_name.to_string(), store);
  62|      |        }
  63|     8|        Ok(Self { stores })
  64|     8|    }
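
`new()` materializes a map from instance name to store and fails fast on a dangling `cas_store` reference; the uncovered lines 59-60 are exactly that error path. A minimal sketch of the same lookup-or-error shape using only standard-library types (`Cfg` and the `u32` registry values are hypothetical stand-ins, not NativeLink APIs):

    use std::collections::HashMap;

    // Hypothetical stand-in for WithInstanceName<CasStoreConfig>.
    struct Cfg {
        instance_name: String,
        cas_store: String,
    }

    fn build_stores(
        configs: &[Cfg],
        registry: &HashMap<String, u32>, // stand-in for StoreManager
    ) -> Result<HashMap<String, u32>, String> {
        let mut stores = HashMap::with_capacity(configs.len());
        for config in configs {
            // ok_or_else turns a missing registry entry into an early error,
            // mirroring the make_input_err! call at line 59.
            let store = registry
                .get(&config.cas_store)
                .copied()
                .ok_or_else(|| format!("'cas_store': '{}' does not exist", config.cas_store))?;
            stores.insert(config.instance_name.clone(), store);
        }
        Ok(stores)
    }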
  65|      |
  66|     0|    pub fn into_service(self) -> Server<Self> {
  67|     0|        Server::new(self)
  68|     0|    }
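
`into_service()` is entirely uncovered (count 0); it only wraps the server in the tonic-generated `ContentAddressableStorageServer`. A sketch of how such a service would typically be mounted on a tonic transport; the address below is made up, and NativeLink's real wiring registers more services than this:

    // Assumes `cas_server: CasServer` was built via CasServer::new(...).
    async fn serve(cas_server: CasServer) -> Result<(), tonic::transport::Error> {
        let addr = "127.0.0.1:50051".parse().expect("valid socket address");
        tonic::transport::Server::builder()
            .add_service(cas_server.into_service())
            .serve(addr)
            .await
    }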
  69|      |
  70|     4|    async fn inner_find_missing_blobs(
  71|     4|        &self,
  72|     4|        request: FindMissingBlobsRequest,
  73|     4|    ) -> Result<Response<FindMissingBlobsResponse>, Error> {
  74|     4|        let instance_name = &request.instance_name;
  75|     4|        let store = self
  76|     4|            .stores
  77|     4|            .get(instance_name)
  78|     4|            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
  79|     4|            .clone();
  80|      |
  81|     4|        let mut requested_blobs = Vec::with_capacity(request.blob_digests.len());
  82|    10|        for digest in &request.blob_digests {
  83|     7|            requested_blobs.push(DigestInfo::try_from(digest.clone())?.into());
  84|      |        }
  85|     3|        let sizes = store
  86|     3|            .has_many(&requested_blobs)
  87|     3|            .await
  88|     3|            .err_tip(|| "In find_missing_blobs")?;
  89|     3|        let missing_blob_digests = sizes
  90|     3|            .into_iter()
  91|     3|            .zip(request.blob_digests)
  92|     5|            .filter_map(|(maybe_size, digest)| maybe_size.map_or_else(|| Some(digest), |_| None))
  93|     3|            .collect();
  94|      |
  95|     3|        Ok(Response::new(FindMissingBlobsResponse {
  96|     3|            missing_blob_digests,
  97|     3|        }))
  98|     4|    }
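
The `zip`/`filter_map` at lines 89-93 keeps exactly those digests whose `has_many` lookup returned `None`, i.e. the blobs the store does not have. The same shape with plain types (toy data, no store involved):

    fn main() {
        let digests = vec!["aaa-1", "bbb-2", "ccc-3"];
        // has_many yields Some(size) for present blobs and None for missing ones.
        let sizes: Vec<Option<usize>> = vec![Some(1), None, Some(3)];
        let missing: Vec<&str> = sizes
            .into_iter()
            .zip(digests)
            .filter_map(|(maybe_size, digest)| maybe_size.map_or_else(|| Some(digest), |_| None))
            .collect();
        assert_eq!(missing, vec!["bbb-2"]);
    }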
  99|      |
 100|     2|    async fn inner_batch_update_blobs(
 101|     2|        &self,
 102|     2|        request: BatchUpdateBlobsRequest,
 103|     2|    ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
 104|     2|        let instance_name = &request.instance_name;
 105|      |
 106|     2|        let store = self
 107|     2|            .stores
 108|     2|            .get(instance_name)
 109|     2|            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
 110|     2|            .clone();
 111|      |
 112|      |        // If we are a GrpcStore we shortcut here, as this is a special store.
 113|      |        // Note: We don't know the digests here, so we try to perform a very shallow
 114|      |        // check to see if it's a grpc store.
 115|     2|        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
    |      |  Branch (115:16): [True: 0, False: 2]
    |      |  Branch (115:16): [Folded - Ignored]
 116|     0|            return grpc_store.batch_update_blobs(Request::new(request)).await;
 117|     2|        }
 118|      |
 119|     2|        let store_ref = &store;
 120|     2|        let update_futures: FuturesUnordered<_> = request
 121|     2|            .requests
 122|     2|            .into_iter()
 123|     3|            .map(|request| async move {
 124|     3|                let digest = request
 125|     3|                    .digest
 126|     3|                    .clone()
 127|     3|                    .err_tip(|| "Digest not found in request")?;
 128|     3|                let request_data = request.data;
 129|     3|                let digest_info = DigestInfo::try_from(digest.clone())?;
 130|     3|                let size_bytes = usize::try_from(digest_info.size_bytes())
 131|     3|                    .err_tip(|| "Digest size_bytes was not convertible to usize")?;
 132|     0|                error_if!(
 133|     3|                    size_bytes != request_data.len(),
    |      |  Branch (133:21): [True: 0, False: 3]
    |      |  Branch (133:21): [Folded - Ignored]
 134|      |                    "Digest for upload had mismatching sizes, digest said {} data said {}",
 135|      |                    size_bytes,
 136|     0|                    request_data.len()
 137|      |                );
 138|     3|                let result = store_ref
 139|     3|                    .update_oneshot(digest_info, request_data)
 140|     3|                    .await
 141|     3|                    .err_tip(|| "Error writing to store");
 142|      |                Ok::<_, Error>(batch_update_blobs_response::Response {
 143|     3|                    digest: Some(digest),
 144|     3|                    status: Some(result.map_or_else(Into::into, |()| GrpcStatus::default())),
 145|      |                })
 146|     6|            })
 147|     2|            .collect();
 148|     2|        let responses = update_futures
 149|     2|            .try_collect::<Vec<batch_update_blobs_response::Response>>()
 150|     2|            .await?;
 151|      |
 152|     2|        Ok(Response::new(BatchUpdateBlobsResponse { responses }))
 153|     2|    }
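
Uploads fan out through a `FuturesUnordered`, and `try_collect::<Vec<_>>()` gathers results in completion order, short-circuiting on the first `Err`. Per-blob store failures never take that path: line 144 folds them into a per-response gRPC status, so only request-shape errors abort the whole batch. A self-contained sketch of the collection pattern with toy futures:

    use futures::stream::{FuturesUnordered, TryStreamExt};

    fn main() {
        let futs: FuturesUnordered<_> = (0u32..3)
            .map(|i| async move {
                // Each future resolves independently; any Err would abort try_collect.
                Ok::<_, String>(i * 2)
            })
            .collect();
        let results: Vec<u32> =
            futures::executor::block_on(futs.try_collect::<Vec<u32>>()).expect("no failures");
        // Completion order may differ from submission order for real async work.
        assert_eq!(results.iter().sum::<u32>(), 6);
    }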
 154|      |
 155|     1|    async fn inner_batch_read_blobs(
 156|     1|        &self,
 157|     1|        request: BatchReadBlobsRequest,
 158|     1|    ) -> Result<Response<BatchReadBlobsResponse>, Error> {
 159|     1|        let instance_name = &request.instance_name;
 160|      |
 161|     1|        let store = self
 162|     1|            .stores
 163|     1|            .get(instance_name)
 164|     1|            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
 165|     1|            .clone();
 166|      |
 167|      |        // If we are a GrpcStore we shortcut here, as this is a special store.
 168|      |        // Note: We don't know the digests here, so we try to perform a very shallow
 169|      |        // check to see if it's a grpc store.
 170|     1|        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
    |      |  Branch (170:16): [True: 0, False: 1]
    |      |  Branch (170:16): [Folded - Ignored]
 171|     0|            return grpc_store.batch_read_blobs(Request::new(request)).await;
 172|     1|        }
 173|      |
 174|     1|        let store_ref = &store;
 175|     1|        let read_futures: FuturesUnordered<_> = request
 176|     1|            .digests
 177|     1|            .into_iter()
 178|     3|            .map(|digest| async move {
 179|     3|                let digest_copy = DigestInfo::try_from(digest.clone())?;
 180|      |                // TODO(aaronmondal) There is a security risk here of someone taking all the memory on the instance.
 181|     3|                let result = store_ref
 182|     3|                    .get_part_unchunked(digest_copy, 0, None)
 183|     3|                    .await
 184|     3|                    .err_tip(|| "Error reading from store");
 185|     3|                let (status, data) = result.map_or_else(
 186|     1|                    |mut e| {
 187|     1|                        if e.code == Code::NotFound {
    |      |  Branch (187:28): [True: 1, False: 0]
    |      |  Branch (187:28): [Folded - Ignored]
 188|     1|                            // Trim the error code. Not Found is quite common and we don't want to send a large
 189|     1|                            // error (debug) message for something that is common. We resize to just the last
 190|     1|                            // message as it will be the most relevant.
 191|     1|                            e.messages.resize_with(1, String::new);
 192|     1|                        }
 193|     1|                        (e.into(), Bytes::new())
 194|     1|                    },
 195|     2|                    |v| (GrpcStatus::default(), v),
 196|      |                );
 197|     3|                Ok::<_, Error>(batch_read_blobs_response::Response {
 198|     3|                    status: Some(status),
 199|     3|                    digest: Some(digest),
 200|     3|                    compressor: compressor::Value::Identity.into(),
 201|     3|                    data,
 202|     3|                })
 203|     6|            })
 204|     1|            .collect();
 205|     1|        let responses = read_futures
 206|     1|            .try_collect::<Vec<batch_read_blobs_response::Response>>()
 207|     1|            .await?;
 208|      |
 209|     1|        Ok(Response::new(BatchReadBlobsResponse { responses }))
 210|     1|    }
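
On a `NotFound` read, the error handler at lines 186-194 truncates the accumulated error-tip chain to a single retained message before converting the error into a gRPC status, keeping common cache misses cheap on the wire. The truncation itself is plain `Vec::resize_with` (the message strings below are toy values):

    fn main() {
        let mut messages = vec![
            "Error reading from store".to_string(),
            "intermediate tip".to_string(),
            "object not found".to_string(),
        ];
        // Shrinks to one element here; resize_with would instead append
        // empty strings if the vector were shorter than the target length.
        messages.resize_with(1, String::new);
        assert_eq!(messages.len(), 1);
    }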
 211|      |
 212|     6|    async fn inner_get_tree(
 213|     6|        &self,
 214|     6|        request: GetTreeRequest,
 215|     6|    ) -> Result<impl Stream<Item = Result<GetTreeResponse, Status>> + Send + use<>, Error> {
 216|     6|        let instance_name = &request.instance_name;
 217|      |
 218|     6|        let store = self
 219|     6|            .stores
 220|     6|            .get(instance_name)
 221|     6|            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
 222|     6|            .clone();
 223|      |
 224|      |        // If we are a GrpcStore we shortcut here, as this is a special store.
 225|      |        // Note: We don't know the digests here, so we try to perform a very shallow
 226|      |        // check to see if it's a grpc store.
 227|     6|        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
    |      |  Branch (227:16): [True: 0, False: 6]
    |      |  Branch (227:16): [Folded - Ignored]
 228|     0|            let stream = grpc_store
 229|     0|                .get_tree(Request::new(request))
 230|     0|                .await?
 231|     0|                .into_inner();
 232|     0|            return Ok(stream.left_stream());
 233|     6|        }
 234|     6|        let root_digest: DigestInfo = request
 235|     6|            .root_digest
 236|     6|            .err_tip(|| "Expected root_digest to exist in GetTreeRequest")?
 237|     6|            .try_into()
 238|     6|            .err_tip(|| "In GetTreeRequest::root_digest")?;
 239|      |
 240|     6|        let mut deque: VecDeque<DigestInfo> = VecDeque::new();
 241|     6|        let mut directories: Vec<Directory> = Vec::new();
 242|      |        // `page_token` will return the `{hash_str}-{size_bytes}` of the current request's first directory digest.
 243|     6|        let page_token_digest = if request.page_token.is_empty() {
    |      |  Branch (243:36): [True: 2, False: 4]
    |      |  Branch (243:36): [Folded - Ignored]
 244|     2|            root_digest
 245|      |        } else {
 246|     4|            let mut page_token_parts = request.page_token.split('-');
 247|     4|            DigestInfo::try_new(
 248|     4|                page_token_parts
 249|     4|                    .next()
 250|     4|                    .err_tip(|| "Failed to parse `hash_str` in `page_token`")?,
 251|     4|                page_token_parts
 252|     4|                    .next()
 253|     4|                    .err_tip(|| "Failed to parse `size_bytes` in `page_token`")?
 254|     4|                    .parse::<i64>()
 255|     4|                    .err_tip(|| "Failed to parse `size_bytes` as i64")?,
 256|      |            )
 257|     4|            .err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`")?
 258|      |        };
 259|     6|        let page_size = request.page_size;
 260|      |        // If `page_size` is 0, paging is not necessary.
 261|     6|        let mut page_token_matched = page_size == 0;
 262|     6|        deque.push_back(root_digest);
 263|      |
 264|    28|        while !deque.is_empty() {
    |      |  Branch (264:15): [True: 26, False: 2]
    |      |  Branch (264:15): [Folded - Ignored]
 265|    26|            let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front")?;
 266|    26|            let directory = get_and_decode_digest::<Directory>(&store, digest.into())
 267|    26|                .await
 268|    26|                .err_tip(|| "Converting digest to Directory")?;
 269|    26|            if digest == page_token_digest {
    |      |  Branch (269:16): [True: 6, False: 20]
    |      |  Branch (269:16): [Folded - Ignored]
 270|     6|                page_token_matched = true;
 271|    20|            }
 272|    56|            for directory in &directory.directories {
 273|    30|                let digest: DigestInfo = directory
 274|    30|                    .digest
 275|    30|                    .clone()
 276|    30|                    .err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
 277|    30|                    .try_into()
 278|    30|                    .err_tip(|| "In Directory::file::digest")?;
 279|    30|                deque.push_back(digest);
 280|      |            }
 281|    26|            if page_token_matched {
    |      |  Branch (281:16): [True: 20, False: 6]
    |      |  Branch (281:16): [Folded - Ignored]
 282|    20|                directories.push(directory);
 283|    20|                if directories.len() as i32 == page_size {
    |      |  Branch (283:20): [True: 4, False: 16]
    |      |  Branch (283:20): [Folded - Ignored]
 284|     4|                    break;
 285|    16|                }
 286|     6|            }
 287|      |        }
 288|      |        // `next_page_token` will return the `{hash_str}-{size_bytes}` of the next request's first directory digest.
 289|      |        // It will be an empty string when it has reached the end of the directory tree.
 290|     6|        let next_page_token: String = deque
 291|     6|            .front()
 292|     6|            .map_or_else(String::new, |value| format!("{value}"));
 293|      |
 294|     6|        Ok(futures::stream::once(async {
 295|     6|            Ok(GetTreeResponse {
 296|     6|                directories,
 297|     6|                next_page_token,
 298|     6|            })
 299|     6|        })
 300|     6|        .right_stream())
 301|     6|    }
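
`inner_get_tree` is a breadth-first walk: the `VecDeque` is seeded with `root_digest`, children are enqueued as each `Directory` is decoded, and collection starts once the digest matching `page_token` (parsed from its `{hash_str}-{size_bytes}` form at lines 246-257) is dequeued, or immediately when `page_size` is 0. Whatever is left at the front of the deque becomes `next_page_token`. A compressed sketch of the traversal and paging over a toy string-keyed tree; the real code decodes `Directory` protos from the store instead:

    use std::collections::{HashMap, VecDeque};

    fn get_tree_page(
        tree: &HashMap<&str, Vec<&'static str>>, // digest -> child digests
        root: &'static str,
        page_token: Option<&'static str>,
        page_size: usize,
    ) -> (Vec<&'static str>, String) {
        let start = page_token.unwrap_or(root);
        let mut page_token_matched = page_size == 0;
        let mut deque = VecDeque::from([root]);
        let mut directories = Vec::new();
        while let Some(digest) = deque.pop_front() {
            if digest == start {
                page_token_matched = true;
            }
            // Children are always enqueued so a later page can resume mid-walk.
            deque.extend(tree.get(digest).into_iter().flatten().copied());
            if page_token_matched {
                directories.push(digest);
                if directories.len() == page_size {
                    break;
                }
            }
        }
        // An empty token signals the end of the tree, as at lines 288-292.
        let next = deque.front().map_or_else(String::new, |d| (*d).to_string());
        (directories, next)
    }

    fn main() {
        let tree = HashMap::from([("root", vec!["a", "b"]), ("a", vec!["c"])]);
        let (page, next) = get_tree_page(&tree, "root", None, 2);
        assert_eq!(page, vec!["root", "a"]);
        assert_eq!(next, "b"); // page token for the follow-up request
    }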
 302|      |}
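
The `GetTreeStream` alias at line 49 erases the concrete stream type behind `Pin<Box<dyn Stream<...>>>`. That is what lets `inner_get_tree` return either side of an `Either` (`left_stream()` for the `GrpcStore` passthrough, `right_stream()` for the local walk) while `get_tree` boxes whichever it receives at line 403. A minimal illustration of that boxing, with toy `u32` items instead of `GetTreeResponse`:

    use core::pin::Pin;
    use futures::stream::{self, Stream, StreamExt};

    type BoxedStream = Pin<Box<dyn Stream<Item = u32> + Send + 'static>>;

    fn make_stream(doubled: bool) -> BoxedStream {
        // Two differently-typed streams coerce to the same boxed trait object.
        if doubled {
            Box::pin(stream::iter([1u32, 2]).map(|v| v * 2))
        } else {
            Box::pin(stream::iter([1u32, 2]))
        }
    }

    fn main() {
        let sum = futures::executor::block_on(
            make_stream(true).fold(0u32, |acc, v| async move { acc + v }),
        );
        assert_eq!(sum, 6);
    }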
 303|      |
 304|      |#[tonic::async_trait]
 305|      |impl ContentAddressableStorage for CasServer {
 306|      |    type GetTreeStream = GetTreeStream;
 307|      |
 308|      |    #[instrument(
 309|      |        err,
 310|      |        ret(level = Level::DEBUG),
 311|      |        level = Level::ERROR,
 312|      |        skip_all,
 313|      |        fields(request = ?grpc_request.get_ref())
 314|      |    )]
 315|      |    async fn find_missing_blobs(
 316|      |        &self,
 317|      |        grpc_request: Request<FindMissingBlobsRequest>,
 318|     8|    ) -> Result<Response<FindMissingBlobsResponse>, Status> {
 319|     4|        let request = grpc_request.into_inner();
 320|     4|        let digest_function = request.digest_function;
 321|     4|        self.inner_find_missing_blobs(request)
 322|     4|            .instrument(error_span!("cas_server_find_missing_blobs"))
 323|     4|            .with_context(
 324|     4|                make_ctx_for_hash_func(digest_function)
 325|     4|                    .err_tip(|| "In CasServer::find_missing_blobs")?,
 326|      |            )
 327|     4|            .await
 328|     4|            .err_tip(|| "Failed on find_missing_blobs() command")
 329|     4|            .map_err(Into::into)
 330|     8|    }
 331|      |
 332|      |    #[instrument(
 333|      |        err,
 334|      |        ret(level = Level::DEBUG),
 335|      |        level = Level::ERROR,
 336|      |        skip_all,
 337|      |        fields(request = ?grpc_request.get_ref())
 338|      |    )]
 339|      |    async fn batch_update_blobs(
 340|      |        &self,
 341|      |        grpc_request: Request<BatchUpdateBlobsRequest>,
 342|     4|    ) -> Result<Response<BatchUpdateBlobsResponse>, Status> {
 343|     2|        let request = grpc_request.into_inner();
 344|     2|        let digest_function = request.digest_function;
 345|      |
 346|     2|        self.inner_batch_update_blobs(request)
 347|     2|            .instrument(error_span!("cas_server_batch_update_blobs"))
 348|     2|            .with_context(
 349|     2|                make_ctx_for_hash_func(digest_function)
 350|     2|                    .err_tip(|| "In CasServer::batch_update_blobs")?,
 351|      |            )
 352|     2|            .await
 353|     2|            .err_tip(|| "Failed on batch_update_blobs() command")
 354|     2|            .map_err(Into::into)
 355|     4|    }
 356|      |
 357|      |    #[instrument(
 358|      |        err,
 359|      |        ret(level = Level::INFO),
 360|      |        level = Level::ERROR,
 361|      |        skip_all,
 362|      |        fields(request = ?grpc_request.get_ref())
 363|      |    )]
 364|      |    async fn batch_read_blobs(
 365|      |        &self,
 366|      |        grpc_request: Request<BatchReadBlobsRequest>,
 367|     2|    ) -> Result<Response<BatchReadBlobsResponse>, Status> {
 368|     1|        let request = grpc_request.into_inner();
 369|     1|        let digest_function = request.digest_function;
 370|      |
 371|     1|        self.inner_batch_read_blobs(request)
 372|     1|            .instrument(error_span!("cas_server_batch_read_blobs"))
 373|     1|            .with_context(
 374|     1|                make_ctx_for_hash_func(digest_function)
 375|     1|                    .err_tip(|| "In CasServer::batch_read_blobs")?,
 376|      |            )
 377|     1|            .await
 378|     1|            .err_tip(|| "Failed on batch_read_blobs() command")
 379|     1|            .map_err(Into::into)
 380|     2|    }
 381|      |
 382|      |    #[instrument(
 383|      |        err,
 384|      |        level = Level::ERROR,
 385|      |        skip_all,
 386|      |        fields(request = ?grpc_request.get_ref())
 387|      |    )]
 388|      |    async fn get_tree(
 389|      |        &self,
 390|      |        grpc_request: Request<GetTreeRequest>,
 391|    12|    ) -> Result<Response<Self::GetTreeStream>, Status> {
 392|     6|        let request = grpc_request.into_inner();
 393|     6|        let digest_function = request.digest_function;
 394|      |
 395|     6|        let resp = self
 396|     6|            .inner_get_tree(request)
 397|     6|            .instrument(error_span!("cas_server_get_tree"))
 398|     6|            .with_context(
 399|     6|                make_ctx_for_hash_func(digest_function).err_tip(|| "In CasServer::get_tree")?,
 400|      |            )
 401|     6|            .await
 402|     6|            .err_tip(|| "Failed on get_tree() command")
 403|     6|            .map(|stream| -> Response<Self::GetTreeStream> { Response::new(Box::pin(stream)) })
 404|     6|            .map_err(Into::into);
 405|      |
 406|     6|        if resp.is_ok() {
    |      |  Branch (406:12): [True: 6, False: 0]
    |      |  Branch (406:12): [Folded - Ignored]
 407|     6|            debug!(return = "Ok(<stream>)");
 408|     0|        }
 409|     6|        resp
 410|    12|    }
 411|      |}
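
Each trait method above follows the same wrapper shape: unwrap the tonic request, derive a digest-hashing context from `digest_function` via `make_ctx_for_hash_func`, instrument the inner future with an error span plus that OpenTelemetry context, then convert the internal `Error` into a `tonic::Status` with `map_err(Into::into)`. A stripped-down sketch of the shape using tracing alone; `inner_op` and `MyError` are hypothetical stand-ins, and the OpenTelemetry `with_context` step is omitted:

    use tracing::{error_span, Instrument};

    #[derive(Debug)]
    struct MyError(String); // stand-in for nativelink_error::Error

    impl From<MyError> for tonic::Status {
        fn from(e: MyError) -> Self {
            tonic::Status::internal(e.0)
        }
    }

    async fn inner_op(x: u64) -> Result<u64, MyError> {
        Ok(x + 1)
    }

    async fn grpc_op(req: tonic::Request<u64>) -> Result<tonic::Response<u64>, tonic::Status> {
        let x = req.into_inner();
        inner_op(x)
            .instrument(error_span!("example_op")) // named span, as at line 322
            .await
            .map(tonic::Response::new)
            .map_err(Into::into) // MyError -> Status
    }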