Coverage Report

Created: 2025-04-19 16:54

/build/source/nativelink-service/src/cas_server.rs
Line | Count | Source
-----+-------+-------
   1 |       | // Copyright 2024 The NativeLink Authors. All rights reserved.
   2 |       | //
   3 |       | // Licensed under the Apache License, Version 2.0 (the "License");
   4 |       | // you may not use this file except in compliance with the License.
   5 |       | // You may obtain a copy of the License at
   6 |       | //
   7 |       | //    http://www.apache.org/licenses/LICENSE-2.0
   8 |       | //
   9 |       | // Unless required by applicable law or agreed to in writing, software
  10 |       | // distributed under the License is distributed on an "AS IS" BASIS,
  11 |       | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12 |       | // See the License for the specific language governing permissions and
  13 |       | // limitations under the License.
  14 |       |
  15 |       | use std::collections::{HashMap, VecDeque};
  16 |       | use std::convert::Into;
  17 |       | use std::pin::Pin;
  18 |       |
  19 |       | use bytes::Bytes;
  20 |       | use futures::stream::{FuturesUnordered, Stream};
  21 |       | use futures::{StreamExt, TryStreamExt};
  22 |       | use nativelink_config::cas_server::{CasStoreConfig, InstanceName};
  23 |       | use nativelink_error::{Code, Error, ResultExt, error_if, make_input_err};
  24 |       | use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::{
  25 |       |     ContentAddressableStorage, ContentAddressableStorageServer as Server,
  26 |       | };
  27 |       | use nativelink_proto::build::bazel::remote::execution::v2::{
  28 |       |     BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
  29 |       |     BatchUpdateBlobsResponse, Directory, FindMissingBlobsRequest, FindMissingBlobsResponse,
  30 |       |     GetTreeRequest, GetTreeResponse, batch_read_blobs_response, batch_update_blobs_response,
  31 |       |     compressor,
  32 |       | };
  33 |       | use nativelink_proto::google::rpc::Status as GrpcStatus;
  34 |       | use nativelink_store::ac_utils::get_and_decode_digest;
  35 |       | use nativelink_store::grpc_store::GrpcStore;
  36 |       | use nativelink_store::store_manager::StoreManager;
  37 |       | use nativelink_util::common::DigestInfo;
  38 |       | use nativelink_util::digest_hasher::make_ctx_for_hash_func;
  39 |       | use nativelink_util::origin_event::OriginEventContext;
  40 |       | use nativelink_util::store_trait::{Store, StoreLike};
  41 |       | use tonic::{Request, Response, Status};
  42 |       | use tracing::{Level, error_span, event, instrument};
  43 |       |
  44 |       | #[derive(Debug)]
  45 |       | pub struct CasServer {
  46 |       |     stores: HashMap<String, Store>,
  47 |       | }
  48 |       |
  49 |       | type GetTreeStream = Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static>>;
  50 |       |
  51 |       | impl CasServer {
  52 |     8 |     pub fn new(
  53 |     8 |         config: &HashMap<InstanceName, CasStoreConfig>,
  54 |     8 |         store_manager: &StoreManager,
  55 |     8 |     ) -> Result<Self, Error> {
  56 |     8 |         let mut stores = HashMap::with_capacity(config.len());
  57 |    16 |         for (instance_name, cas_cfg) in config {
  58 |     8 |             let store = store_manager.get_store(&cas_cfg.cas_store).ok_or_else(|| {
  59 |     0 |                 make_input_err!("'cas_store': '{}' does not exist", cas_cfg.cas_store)
  60 |     0 |             })?;
  61 |     8 |             stores.insert(instance_name.to_string(), store);
  62 |       |         }
  63 |     8 |         Ok(CasServer { stores })
  64 |     8 |     }
  65 |       |
  66 |     0 |     pub fn into_service(self) -> Server<CasServer> {
  67 |     0 |         Server::new(self)
  68 |     0 |     }
  69 |       |
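Note: `into_service` (lines 66-68) is the only zero-count function in the constructor area. A minimal sketch of a unit test that would exercise it, assuming `StoreManager::add_store` and the public `cas_store` field of `CasStoreConfig` behave as elsewhere in the codebase; `make_test_store` is a hypothetical helper, not a function from this repository:

    // Sketch only: covers CasServer::new plus the 0-count into_service region.
    use std::collections::HashMap;

    use nativelink_config::cas_server::CasStoreConfig;
    use nativelink_error::Error;
    use nativelink_store::store_manager::StoreManager;

    #[test]
    fn into_service_builds_grpc_server() -> Result<(), Error> {
        let store_manager = StoreManager::new();
        // Hypothetical helper standing in for however the test suite builds a Store.
        store_manager.add_store("main_cas", make_test_store());
        let mut config = HashMap::new();
        config.insert(
            "main".to_string(), // instance name
            CasStoreConfig {
                cas_store: "main_cas".to_string(), // must match the registered store
            },
        );
        let cas_server = CasServer::new(&config, &store_manager)?;
        let _service = cas_server.into_service(); // the uncovered lines 66-68
        Ok(())
    }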
  70 |     4 |     async fn inner_find_missing_blobs(
  71 |     4 |         &self,
  72 |     4 |         request: FindMissingBlobsRequest,
  73 |     4 |     ) -> Result<Response<FindMissingBlobsResponse>, Error> {
  74 |     4 |         let instance_name = &request.instance_name;
  75 |     4 |         let store = self
  76 |     4 |             .stores
  77 |     4 |             .get(instance_name)
  78 |     4 |             .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
  79 |     4 |             .clone();
  80 |     4 |
  81 |     4 |         let mut requested_blobs = Vec::with_capacity(request.blob_digests.len());
  82 |    10 |         for digest in &request.blob_digests {
  83 |     7 |             requested_blobs.push(DigestInfo::try_from(digest.clone())?.into());
  84 |       |         }
  85 |     3 |         let sizes = store
  86 |     3 |             .has_many(&requested_blobs)
  87 |     3 |             .await
  88 |     3 |             .err_tip(|| "In find_missing_blobs")?;
  89 |     3 |         let missing_blob_digests = sizes
  90 |     3 |             .into_iter()
  91 |     3 |             .zip(request.blob_digests)
  92 |     5 |             .filter_map(|(maybe_size, digest)| maybe_size.map_or_else(|| Some(digest), |_| None))
  93 |     3 |             .collect();
  94 |     3 |
  95 |     3 |         Ok(Response::new(FindMissingBlobsResponse {
  96 |     3 |             missing_blob_digests,
  97 |     3 |         }))
  98 |     4 |     }
  99 |       |
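Note: the zip/filter_map step above is the heart of FindMissingBlobs: `has_many` returns one `Option<u64>` per requested digest, in request order, and only the `None` slots survive as missing digests. A standalone illustration of the same pattern with plain values:

    fn main() {
        // One Option per requested digest, as has_many would return them.
        let sizes: Vec<Option<u64>> = vec![Some(4), None, Some(9)];
        let digests = vec!["blob_a", "blob_b", "blob_c"];

        let missing: Vec<_> = sizes
            .into_iter()
            .zip(digests)
            .filter_map(|(maybe_size, digest)| maybe_size.map_or_else(|| Some(digest), |_| None))
            .collect();

        // Only the digest whose size came back None is reported missing.
        assert_eq!(missing, vec!["blob_b"]);
    }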
 100 |     2 |     async fn inner_batch_update_blobs(
 101 |     2 |         &self,
 102 |     2 |         request: BatchUpdateBlobsRequest,
 103 |     2 |     ) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
 104 |     2 |         let instance_name = &request.instance_name;
 105 |       |
 106 |     2 |         let store = self
 107 |     2 |             .stores
 108 |     2 |             .get(instance_name)
 109 |     2 |             .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
 110 |     2 |             .clone();
 111 |       |
 112 |       |         // If we are a GrpcStore we shortcut here, as this is a special store.
 113 |       |         // Note: We don't know the digests here, so we try to perform a very shallow
 114 |       |         // check to see if it's a grpc store.
 115 |     2 |         if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
     |       |   Branch (115:16): [True: 0, False: 2]
     |       |   Branch (115:16): [Folded - Ignored]
 116 |     0 |             return grpc_store.batch_update_blobs(Request::new(request)).await;
 117 |     2 |         }
 118 |     2 |
 119 |     2 |         let store_ref = &store;
 120 |     2 |         let update_futures: FuturesUnordered<_> = request
 121 |     2 |             .requests
 122 |     2 |             .into_iter()
 123 |     3 |             .map(|request| async move {
 124 |     3 |                 let digest = request
 125 |     3 |                     .digest
 126 |     3 |                     .clone()
 127 |     3 |                     .err_tip(|| "Digest not found in request")?;
 128 |     3 |                 let request_data = request.data;
 129 |     3 |                 let digest_info = DigestInfo::try_from(digest.clone())?;
 130 |     3 |                 let size_bytes = usize::try_from(digest_info.size_bytes())
 131 |     3 |                     .err_tip(|| "Digest size_bytes was not convertible to usize")?;
 132 |     0 |                 error_if!(
 133 |     3 |                     size_bytes != request_data.len(),
     |       |   Branch (133:21): [True: 0, False: 3]
     |       |   Branch (133:21): [Folded - Ignored]
 134 |       |                     "Digest for upload had mismatching sizes, digest said {} data said {}",
 135 |       |                     size_bytes,
 136 |     0 |                     request_data.len()
 137 |       |                 );
 138 |     3 |                 let result = store_ref
 139 |     3 |                     .update_oneshot(digest_info, request_data)
 140 |     3 |                     .await
 141 |     3 |                     .err_tip(|| "Error writing to store");
 142 |       |                 Ok::<_, Error>(batch_update_blobs_response::Response {
 143 |     3 |                     digest: Some(digest),
 144 |     3 |                     status: Some(result.map_or_else(Into::into, |()| GrpcStatus::default())),
 145 |       |                 })
 146 |     6 |             })
 147 |     2 |             .collect();
 148 |     2 |         let responses = update_futures
 149 |     2 |             .try_collect::<Vec<batch_update_blobs_response::Response>>()
 150 |     2 |             .await?;
 151 |       |
 152 |     2 |         Ok(Response::new(BatchUpdateBlobsResponse { responses }))
 153 |     2 |     }
 154 |       |
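Note: the fan-out above runs one future per upload and gathers them with `try_collect`; a store error is folded into the per-blob `status` field rather than failing the whole RPC. A minimal standalone sketch of the same collect pattern, using only tokio and futures (values illustrative):

    use futures::stream::{FuturesUnordered, TryStreamExt};

    #[tokio::main]
    async fn main() -> Result<(), String> {
        // One future per request, polled concurrently in completion order.
        let futures: FuturesUnordered<_> = (0u32..3)
            .map(|i| async move { Ok::<u32, String>(i * 2) })
            .collect();
        // try_collect stops at the first Err; here every future succeeds.
        let mut responses: Vec<u32> = futures.try_collect().await?;
        responses.sort_unstable(); // completion order is not request order
        assert_eq!(responses, vec![0, 2, 4]);
        Ok(())
    }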
 155 |     1 |     async fn inner_batch_read_blobs(
 156 |     1 |         &self,
 157 |     1 |         request: BatchReadBlobsRequest,
 158 |     1 |     ) -> Result<Response<BatchReadBlobsResponse>, Error> {
 159 |     1 |         let instance_name = &request.instance_name;
 160 |       |
 161 |     1 |         let store = self
 162 |     1 |             .stores
 163 |     1 |             .get(instance_name)
 164 |     1 |             .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
 165 |     1 |             .clone();
 166 |       |
 167 |       |         // If we are a GrpcStore we shortcut here, as this is a special store.
 168 |       |         // Note: We don't know the digests here, so we try to perform a very shallow
 169 |       |         // check to see if it's a grpc store.
 170 |     1 |         if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
     |       |   Branch (170:16): [True: 0, False: 1]
     |       |   Branch (170:16): [Folded - Ignored]
 171 |     0 |             return grpc_store.batch_read_blobs(Request::new(request)).await;
 172 |     1 |         }
 173 |     1 |
 174 |     1 |         let store_ref = &store;
 175 |     1 |         let read_futures: FuturesUnordered<_> = request
 176 |     1 |             .digests
 177 |     1 |             .into_iter()
 178 |     3 |             .map(|digest| async move {
 179 |     3 |                 let digest_copy = DigestInfo::try_from(digest.clone())?;
 180 |       |                 // TODO(allada) There is a security risk here of someone taking all the memory on the instance.
 181 |     3 |                 let result = store_ref
 182 |     3 |                     .get_part_unchunked(digest_copy, 0, None)
 183 |     3 |                     .await
 184 |     3 |                     .err_tip(|| "Error reading from store");
 185 |     3 |                 let (status, data) = result.map_or_else(
 186 |     1 |                     |mut e| {
 187 |     1 |                         if e.code == Code::NotFound {
     |       |   Branch (187:28): [True: 1, False: 0]
     |       |   Branch (187:28): [Folded - Ignored]
 188 |     1 |                             // Trim the error code. Not Found is quite common and we don't want to send a large
 189 |     1 |                             // error (debug) message for something that is common. We resize to just the last
 190 |     1 |                             // message as it will be the most relevant.
 191 |     1 |                             e.messages.resize_with(1, String::new);
 192 |     1 |                         }
 193 |     1 |                         (e.into(), Bytes::new())
 194 |     1 |                     },
 195 |     2 |                     |v| (GrpcStatus::default(), v),
 196 |       |                 );
 197 |     3 |                 Ok::<_, Error>(batch_read_blobs_response::Response {
 198 |     3 |                     status: Some(status),
 199 |     3 |                     digest: Some(digest),
 200 |     3 |                     compressor: compressor::Value::Identity.into(),
 201 |     3 |                     data,
 202 |     3 |                 })
 203 |     6 |             })
 204 |     1 |             .collect();
 205 |     1 |         let responses = read_futures
 206 |     1 |             .try_collect::<Vec<batch_read_blobs_response::Response>>()
 207 |     1 |             .await?;
 208 |       |
 209 |     1 |         Ok(Response::new(BatchReadBlobsResponse { responses }))
 210 |     1 |     }
 211 |       |
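Note: the NotFound branch above (lines 187-192) trims the error's message chain so a routine cache miss doesn't ship a long debug trace back to the client. `Vec::resize_with(1, ..)` simply truncates the list to a single entry; a standalone check of that behavior:

    fn main() {
        let mut messages = vec![
            "no such blob".to_string(),
            "Error reading from store".to_string(),
        ];
        // Same call as in the NotFound arm: truncate to one message.
        messages.resize_with(1, String::new);
        assert_eq!(messages, vec!["no such blob".to_string()]);
    }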
 212 |     6 |     async fn inner_get_tree(
 213 |     6 |         &self,
 214 |     6 |         request: GetTreeRequest,
 215 |     6 |     ) -> Result<impl Stream<Item = Result<GetTreeResponse, Status>> + Send + use<>, Error> {
 216 |     6 |         let instance_name = &request.instance_name;
 217 |       |
 218 |     6 |         let store = self
 219 |     6 |             .stores
 220 |     6 |             .get(instance_name)
 221 |     6 |             .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?
 222 |     6 |             .clone();
 223 |       |
 224 |       |         // If we are a GrpcStore we shortcut here, as this is a special store.
 225 |       |         // Note: We don't know the digests here, so we try to perform a very shallow
 226 |       |         // check to see if it's a grpc store.
 227 |     6 |         if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(None) {
     |       |   Branch (227:16): [True: 0, False: 6]
     |       |   Branch (227:16): [Folded - Ignored]
 228 |     0 |             let stream = grpc_store
 229 |     0 |                 .get_tree(Request::new(request))
 230 |     0 |                 .await?
 231 |     0 |                 .into_inner();
 232 |     0 |             return Ok(stream.left_stream());
 233 |     6 |         }
 234 |     6 |         let root_digest: DigestInfo = request
 235 |     6 |             .root_digest
 236 |     6 |             .err_tip(|| "Expected root_digest to exist in GetTreeRequest")?
 237 |     6 |             .try_into()
 238 |     6 |             .err_tip(|| "In GetTreeRequest::root_digest")?;
 239 |       |
 240 |     6 |         let mut deque: VecDeque<DigestInfo> = VecDeque::new();
 241 |     6 |         let mut directories: Vec<Directory> = Vec::new();
 242 |       |         // `page_token` will return the `{hash_str}-{size_bytes}` of the current request's first directory digest.
 243 |     6 |         let page_token_digest = if request.page_token.is_empty() {
     |       |   Branch (243:36): [True: 2, False: 4]
     |       |   Branch (243:36): [Folded - Ignored]
 244 |     2 |             root_digest
 245 |       |         } else {
 246 |     4 |             let mut page_token_parts = request.page_token.split('-');
 247 |     4 |             DigestInfo::try_new(
 248 |     4 |                 page_token_parts
 249 |     4 |                     .next()
 250 |     4 |                     .err_tip(|| "Failed to parse `hash_str` in `page_token`")?,
 251 |     4 |                 page_token_parts
 252 |     4 |                     .next()
 253 |     4 |                     .err_tip(|| "Failed to parse `size_bytes` in `page_token`")?
 254 |     4 |                     .parse::<i64>()
 255 |     4 |                     .err_tip(|| "Failed to parse `size_bytes` as i64")?,
 256 |       |             )
 257 |     4 |             .err_tip(|| "Failed to parse `page_token` as `Digest` in `GetTreeRequest`")?
 258 |       |         };
 259 |     6 |         let page_size = request.page_size;
 260 |     6 |         // If `page_size` is 0, paging is not necessary.
 261 |     6 |         let mut page_token_matched = page_size == 0;
 262 |     6 |         deque.push_back(root_digest);
 263 |       |
 264 |    28 |         while !deque.is_empty() {
     |       |   Branch (264:15): [True: 26, False: 2]
     |       |   Branch (264:15): [Folded - Ignored]
 265 |    26 |             let digest: DigestInfo = deque.pop_front().err_tip(|| "In VecDeque::pop_front")?;
 266 |    26 |             let directory = get_and_decode_digest::<Directory>(&store, digest.into())
 267 |    26 |                 .await
 268 |    26 |                 .err_tip(|| "Converting digest to Directory")?;
 269 |    26 |             if digest == page_token_digest {
     |       |   Branch (269:16): [True: 6, False: 20]
     |       |   Branch (269:16): [Folded - Ignored]
 270 |     6 |                 page_token_matched = true;
 271 |    20 |             }
 272 |    56 |             for directory in &directory.directories {
 273 |    30 |                 let digest: DigestInfo = directory
 274 |    30 |                     .digest
 275 |    30 |                     .clone()
 276 |    30 |                     .err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
 277 |    30 |                     .try_into()
 278 |    30 |                     .err_tip(|| "In Directory::file::digest")?;
 279 |    30 |                 deque.push_back(digest);
 280 |       |             }
 281 |    26 |             if page_token_matched {
     |       |   Branch (281:16): [True: 20, False: 6]
     |       |   Branch (281:16): [Folded - Ignored]
 282 |    20 |                 directories.push(directory);
 283 |    20 |                 if directories.len() as i32 == page_size {
     |       |   Branch (283:20): [True: 4, False: 16]
     |       |   Branch (283:20): [Folded - Ignored]
 284 |     4 |                     break;
 285 |    16 |                 }
 286 |     6 |             }
 287 |       |         }
 288 |       |         // `next_page_token` will return the `{hash_str}-{size_bytes}` of the next request's first directory digest.
 289 |       |         // It will be an empty string when it has reached the end of the directory tree.
 290 |     6 |         let next_page_token: String = if let Some(value) = deque.front() {
     |       |   Branch (290:46): [True: 3, False: 3]
     |       |   Branch (290:46): [Folded - Ignored]
 291 |     3 |             format!("{value}")
 292 |       |         } else {
 293 |     3 |             String::new()
 294 |       |         };
 295 |       |
 296 |     6 |         Ok(futures::stream::once(async {
 297 |     6 |             Ok(GetTreeResponse {
 298 |     6 |                 directories,
 299 |     6 |                 next_page_token,
 300 |     6 |             })
 301 |     6 |         })
 302 |     6 |         .right_stream())
 303 |     6 |     }
 304 |       | }
 305 |       |
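Note: the paging protocol in `inner_get_tree` is carried entirely by the token string: the next request's first directory digest is encoded as `{hash_str}-{size_bytes}` (via the digest's Display impl, which the `split('-')` parse on lines 246-255 implies) and parsed back on the next call. A round-trip with illustrative values:

    fn main() {
        // Encode: the shape next_page_token holds (values illustrative).
        let (hash_str, size_bytes) = ("b5bb9d8014a0f9b1", 142i64);
        let page_token = format!("{hash_str}-{size_bytes}");

        // Decode: the same steps inner_get_tree applies to request.page_token.
        let mut parts = page_token.split('-');
        let parsed_hash = parts.next().expect("hash_str in page_token");
        let parsed_size: i64 = parts
            .next()
            .expect("size_bytes in page_token")
            .parse()
            .expect("size_bytes as i64");

        assert_eq!((parsed_hash, parsed_size), (hash_str, size_bytes));
    }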
 306 |       | #[tonic::async_trait]
 307 |       | impl ContentAddressableStorage for CasServer {
 308 |       |     type GetTreeStream = GetTreeStream;
 309 |       |
 310 |       |     #[instrument(
 311 |       |         err,
 312 |       |         ret(level = Level::INFO),
 313 |       |         level = Level::ERROR,
 314 |       |         skip_all,
 315 |       |         fields(request = ?grpc_request.get_ref())
 316 |       |     )]
 317 |       |     async fn find_missing_blobs(
 318 |       |         &self,
 319 |       |         grpc_request: Request<FindMissingBlobsRequest>,
 320 |     8 |     ) -> Result<Response<FindMissingBlobsResponse>, Status> {
 321 |     4 |         let request = grpc_request.into_inner();
 322 |     4 |         let ctx = OriginEventContext::new(|| &request).await;
 323 |     4 |         let resp = make_ctx_for_hash_func(request.digest_function)
 324 |     4 |             .err_tip(|| "In CasServer::find_missing_blobs")?
 325 |     4 |             .wrap_async(
 326 |     4 |                 error_span!("cas_server_find_missing_blobs"),
 327 |     4 |                 self.inner_find_missing_blobs(request),
 328 |     4 |             )
 329 |     4 |             .await
 330 |     4 |             .err_tip(|| "Failed on find_missing_blobs() command")
 331 |     4 |             .map_err(Into::into);
 332 |     4 |         ctx.emit(|| &resp).await;
 333 |     4 |         resp
 334 |     8 |     }
 335 |       |
 336 |       |     #[instrument(
 337 |       |         err,
 338 |       |         ret(level = Level::INFO),
 339 |       |         level = Level::ERROR,
 340 |       |         skip_all,
 341 |       |         fields(request = ?grpc_request.get_ref())
 342 |       |     )]
 343 |       |     async fn batch_update_blobs(
 344 |       |         &self,
 345 |       |         grpc_request: Request<BatchUpdateBlobsRequest>,
 346 |     4 |     ) -> Result<Response<BatchUpdateBlobsResponse>, Status> {
 347 |     2 |         let request = grpc_request.into_inner();
 348 |     2 |         let ctx = OriginEventContext::new(|| &request).await;
 349 |     2 |         let resp = make_ctx_for_hash_func(request.digest_function)
 350 |     2 |             .err_tip(|| "In CasServer::batch_update_blobs")?
 351 |     2 |             .wrap_async(
 352 |     2 |                 error_span!("cas_server_batch_update_blobs"),
 353 |     2 |                 self.inner_batch_update_blobs(request),
 354 |     2 |             )
 355 |     2 |             .await
 356 |     2 |             .err_tip(|| "Failed on batch_update_blobs() command")
 357 |     2 |             .map_err(Into::into);
 358 |     2 |         ctx.emit(|| &resp).await;
 359 |     2 |         resp
 360 |     4 |     }
 361 |       |
 362 |       |     #[instrument(
 363 |       |         err,
 364 |       |         ret(level = Level::INFO),
 365 |       |         level = Level::ERROR,
 366 |       |         skip_all,
 367 |       |         fields(request = ?grpc_request.get_ref())
 368 |       |     )]
 369 |       |     async fn batch_read_blobs(
 370 |       |         &self,
 371 |       |         grpc_request: Request<BatchReadBlobsRequest>,
 372 |     2 |     ) -> Result<Response<BatchReadBlobsResponse>, Status> {
 373 |     1 |         let request = grpc_request.into_inner();
 374 |     1 |         let ctx = OriginEventContext::new(|| &request).await;
 375 |     1 |         let resp = make_ctx_for_hash_func(request.digest_function)
 376 |     1 |             .err_tip(|| "In CasServer::batch_read_blobs")?
 377 |     1 |             .wrap_async(
 378 |     1 |                 error_span!("cas_server_batch_read_blobs"),
 379 |     1 |                 self.inner_batch_read_blobs(request),
 380 |     1 |             )
 381 |     1 |             .await
 382 |     1 |             .err_tip(|| "Failed on batch_read_blobs() command")
 383 |     1 |             .map_err(Into::into);
 384 |     1 |         ctx.emit(|| &resp).await;
 385 |     1 |         resp
 386 |     2 |     }
 387 |       |
 388 |       |     #[instrument(
 389 |       |         err,
 390 |       |         level = Level::ERROR,
 391 |       |         skip_all,
 392 |       |         fields(request = ?grpc_request.get_ref())
 393 |       |     )]
 394 |       |     async fn get_tree(
 395 |       |         &self,
 396 |       |         grpc_request: Request<GetTreeRequest>,
 397 |    12 |     ) -> Result<Response<Self::GetTreeStream>, Status> {
 398 |     6 |         let request = grpc_request.into_inner();
 399 |     6 |         let ctx = OriginEventContext::new(|| &request).await;
 400 |     6 |         let resp = make_ctx_for_hash_func(request.digest_function)
 401 |     6 |             .err_tip(|| "In CasServer::get_tree")?
 402 |     6 |             .wrap_async(
 403 |     6 |                 error_span!("cas_server_get_tree"),
 404 |     6 |                 self.inner_get_tree(request),
 405 |     6 |             )
 406 |     6 |             .await
 407 |     6 |             .err_tip(|| "Failed on get_tree() command")
 408 |     6 |             .map(|stream| -> Response<Self::GetTreeStream> {
 409 |     6 |                 Response::new(ctx.wrap_stream(stream))
 410 |     6 |             })
 411 |     6 |             .map_err(Into::into);
 412 |     6 |         if resp.is_ok() {
     |       |   Branch (412:12): [True: 6, False: 0]
     |       |   Branch (412:12): [Folded - Ignored]
 413 |     6 |             event!(Level::DEBUG, return = "Ok(<stream>)");
 414 |     0 |         }
 415 |     6 |         ctx.emit(|| &resp).await;
 416 |     6 |         resp
 417 |    12 |     }
 418 |       | }
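Note: every trait method above follows the same shape: decode the request, build an OriginEventContext, wrap the inner call in a hash-function-aware context and span, then emit the response. A hedged sketch of a client driving `find_missing_blobs` through this path, using the tonic-generated REAPI client (assumes tonic's transport feature; the endpoint and digest are illustrative only):

    use nativelink_proto::build::bazel::remote::execution::v2::{
        Digest, FindMissingBlobsRequest,
        content_addressable_storage_client::ContentAddressableStorageClient,
    };

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Illustrative endpoint; point at wherever CasServer::into_service is mounted.
        let mut client =
            ContentAddressableStorageClient::connect("http://localhost:50051").await?;
        let response = client
            .find_missing_blobs(FindMissingBlobsRequest {
                instance_name: "main".to_string(),
                blob_digests: vec![Digest {
                    hash: "0123456789abcdef".to_string(), // illustrative digest
                    size_bytes: 4,
                }],
                digest_function: 0, // protocol default (unset)
            })
            .await?
            .into_inner();
        println!("missing blobs: {}", response.missing_blob_digests.len());
        Ok(())
    }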