Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/ac_utils.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
// @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
16
// TODO(aaronmondal): IMPORTANT TODO: IMPORTING THIS SOMETIMES BREAKS
17
//                    THREADSAFETY. FIGURE OUT WHY AND MOVE IT TO UTILS.
18
// @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
19
20
use std::pin::Pin;
21
22
use bytes::BytesMut;
23
use futures::TryFutureExt;
24
use nativelink_error::{Code, Error, ResultExt};
25
use nativelink_util::common::DigestInfo;
26
use nativelink_util::digest_hasher::DigestHasher;
27
use nativelink_util::store_trait::{StoreKey, StoreLike};
28
use prost::Message;
29
30
// NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
31
// 1.2k. Giving a bit more just in case to reduce allocs.
32
pub const ESTIMATED_DIGEST_SIZE: usize = 2048;
33
34
/// This is more of a safety check. We are going to collect this entire message
35
/// into memory. If we don't bound the max size of the object we enable users
36
/// to use up all the memory on this machine.
37
const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb.
38
39
/// Attempts to fetch the digest contents from a store into the associated proto.
40
99
pub async fn get_and_decode_digest<T: Message + Default + 'static>(
41
99
    store: &impl StoreLike,
42
99
    key: StoreKey<'_>,
43
99
) -> Result<T, Error> {
44
99
    get_size_and_decode_digest(store, key)
45
99
        .map_ok(|(v, _)| 
v97
)
46
99
        .await
47
99
}
48
49
/// Attempts to fetch the digest contents from a store into the associated proto, also returning the length of the fetched data.
50
107
pub async fn get_size_and_decode_digest<T: Message + Default + 'static>(
51
107
    store: &impl StoreLike,
52
107
    key: impl Into<StoreKey<'_>>,
53
107
) -> Result<(T, u64), Error> {
54
107
    let key = key.into();
55
    // Note: For unknown reasons we appear to be hitting:
56
    // https://github.com/rust-lang/rust/issues/92096
57
    // or a similar issue if we try to use the non-store driver function, so we
58
    // are using the store driver function here.
59
107
    let mut store_data_resp = store
60
107
        .as_store_driver_pin()
61
107
        .get_part_unchunked(key.borrow(), 0, Some(MAX_ACTION_MSG_SIZE as u64))
62
107
        .await;
63
107
    if let Err(
err2
) = &mut store_data_resp {
  Branch (63:12): [True: 0, False: 0]
  Branch (63:12): [True: 2, False: 1]
  Branch (63:12): [True: 0, False: 0]
  Branch (63:12): [True: 0, False: 0]
  Branch (63:12): [True: 0, False: 26]
  Branch (63:12): [True: 0, False: 8]
  Branch (63:12): [True: 0, False: 8]
  Branch (63:12): [True: 0, False: 25]
  Branch (63:12): [Folded - Ignored]
  Branch (63:12): [True: 0, False: 2]
  Branch (63:12): [True: 0, False: 3]
  Branch (63:12): [True: 0, False: 1]
  Branch (63:12): [True: 0, False: 16]
  Branch (63:12): [True: 0, False: 15]
64
2
        if err.code == Code::NotFound {
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 2, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [Folded - Ignored]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
65
2
            // Trim the error messages. Not Found is quite common and we don't want to send a large
66
2
            // error (debug) message for something that is common. We resize to just the last
67
2
            // message as it will be the most relevant.
68
2
            err.messages.resize_with(1, String::new);
69
2
        
}0
70
105
    }
71
107
    let 
store_data105
= store_data_resp
?2
;
72
105
    let store_data_len =
73
105
        u64::try_from(store_data.len()).err_tip(|| 
"Could not convert store_data.len() to u64"0
)
?0
;
74
75
105
    T::decode(store_data)
76
105
        .err_tip_with_code(|e| {
77
0
            (
78
0
                Code::NotFound,
79
0
                format!("Stored value appears to be corrupt: {e} - {key:?}"),
80
0
            )
81
105
        })
82
105
        .map(|v| (v, store_data_len))
83
107
}
84
85
/// Computes the digest of a message.
86
91
pub fn message_to_digest(
87
91
    message: &impl Message,
88
91
    mut buf: &mut BytesMut,
89
91
    hasher: &mut impl DigestHasher,
90
91
) -> Result<DigestInfo, Error> {
91
91
    message
92
91
        .encode(&mut buf)
93
91
        .err_tip(|| 
"Could not encode directory proto"0
)
?0
;
94
91
    hasher.update(buf);
95
91
    Ok(hasher.finalize_digest())
96
91
}
97
98
/// Takes a proto message and will serialize it and upload it to the provided store.
99
91
pub async fn serialize_and_upload_message<'a, T: Message>(
100
91
    message: &'a T,
101
91
    cas_store: Pin<&'a impl StoreLike>,
102
91
    hasher: &mut impl DigestHasher,
103
91
) -> Result<DigestInfo, Error> {
104
91
    let mut buffer = BytesMut::with_capacity(message.encoded_len());
105
91
    let digest = message_to_digest(message, &mut buffer, hasher)
106
91
        .err_tip(|| 
"In serialize_and_upload_message"0
)
?0
;
107
    // Note: For unknown reasons we appear to be hitting:
108
    // https://github.com/rust-lang/rust/issues/92096
109
    // or a similar issue if we try to use the non-store driver function, so we
110
    // are using the store driver function here.
111
91
    cas_store
112
91
        .as_store_driver_pin()
113
91
        .update_oneshot(digest.into(), buffer.freeze())
114
91
        .await
115
91
        .err_tip(|| 
"In serialize_and_upload_message"0
)
?0
;
116
91
    Ok(digest)
117
91
}
118
119
/// Computes a digest of a given buffer.
120
22
pub fn compute_buf_digest(buf: &[u8], hasher: &mut impl DigestHasher) -> DigestInfo {
121
22
    hasher.update(buf);
122
22
    hasher.finalize_digest()
123
22
}