Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/ac_utils.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
// @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
16
// TODO(palfrey): IMPORTANT TODO: IMPORTING THIS SOMETIMES BREAKS
17
//                    THREADSAFETY. FIGURE OUT WHY AND MOVE IT TO UTILS.
18
// @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
19
20
use core::pin::Pin;
21
22
use bytes::BytesMut;
23
use futures::TryFutureExt;
24
use nativelink_error::{Code, Error, ResultExt};
25
use nativelink_util::common::DigestInfo;
26
use nativelink_util::digest_hasher::DigestHasher;
27
use nativelink_util::store_trait::{StoreKey, StoreLike};
28
use prost::Message;
29
30
// NOTE(aaronmondal) From some local testing it looks like action cache items are rarely greater than
31
// 1.2k. Giving a bit more just in case to reduce allocs.
32
pub const ESTIMATED_DIGEST_SIZE: usize = 2048;
33
34
/// This is more of a safety check. We are going to collect this entire message
35
/// into memory. If we don't bound the max size of the object we enable users
36
/// to use up all the memory on this machine.
37
const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb.
38
39
/// Attempts to fetch the digest contents from a store into the associated proto.
40
139
pub async fn get_and_decode_digest<T: Message + Default + 'static>(
41
139
    store: &impl StoreLike,
42
139
    key: StoreKey<'_>,
43
139
) -> Result<T, Error> {
44
139
    get_size_and_decode_digest(store, key)
45
139
        .map_ok(|(v, _)| v)
46
139
        .await
47
139
}
48
49
/// Attempts to fetch the digest contents from a store into the associated proto, also returning the length of the fetched data.
50
147
pub async fn get_size_and_decode_digest<T: Message + Default + 'static>(
51
147
    store: &impl StoreLike,
52
147
    key: impl Into<StoreKey<'_>>,
53
147
) -> Result<(T, u64), Error> {
54
147
    let key = key.into();
55
    // Note: For unknown reasons we appear to be hitting:
56
    // https://github.com/rust-lang/rust/issues/92096
57
    // or a similar issue if we try to use the non-store driver function, so we
58
    // are using the store driver function here.
59
147
    let mut store_data_resp = store
60
147
        .as_store_driver_pin()
61
147
        .get_part_unchunked(key.borrow(), 0, Some(MAX_ACTION_MSG_SIZE as u64))
62
147
        .await;
63
147
    if let Err(
err4
) = &mut store_data_resp
64
4
        && err.code == Code::NotFound
65
4
    {
66
4
        // Trim the error code. Not Found is quite common and we don't want to send a large
67
4
        // error (debug) message for something that is common. We resize to just the last
68
4
        // message as it will be the most relevant.
69
4
        err.messages.resize_with(1, String::new);
70
143
    }
71
147
    let 
store_data143
= store_data_resp
?4
;
72
143
    let store_data_len =
73
143
        u64::try_from(store_data.len()).err_tip(|| "Could not convert store_data.len() to u64")
?0
;
74
75
143
    T::decode(store_data)
76
143
        .err_tip_with_code(|_e| 
{0
77
0
            (
78
0
                Code::NotFound,
79
0
                format!("Stored value appears to be corrupt for {key:?}"),
80
0
            )
81
0
        })
82
143
        .map(|v| (v, store_data_len))
83
147
}
84
85
/// Computes the digest of a message.
86
122
pub fn message_to_digest(
87
122
    message: &impl Message,
88
122
    mut buf: &mut BytesMut,
89
122
    hasher: &mut impl DigestHasher,
90
122
) -> Result<DigestInfo, Error> {
91
122
    message
92
122
        .encode(&mut buf)
93
122
        .err_tip(|| "Could not encode directory proto")
?0
;
94
122
    hasher.update(buf);
95
122
    Ok(hasher.finalize_digest())
96
122
}
97
98
/// Takes a proto message and will serialize it and upload it to the provided store.
99
122
pub async fn serialize_and_upload_message<'a, T: Message>(
100
122
    message: &'a T,
101
122
    cas_store: Pin<&'a impl StoreLike>,
102
122
    hasher: &mut impl DigestHasher,
103
122
) -> Result<DigestInfo, Error> {
104
122
    let mut buffer = BytesMut::with_capacity(message.encoded_len());
105
122
    let digest = message_to_digest(message, &mut buffer, hasher)
106
122
        .err_tip(|| "In serialize_and_upload_message")
?0
;
107
    // Note: For unknown reasons we appear to be hitting:
108
    // https://github.com/rust-lang/rust/issues/92096
109
    // or a similar issue if we try to use the non-store driver function, so we
110
    // are using the store driver function here.
111
122
    cas_store
112
122
        .as_store_driver_pin()
113
122
        .update_oneshot(digest.into(), buffer.freeze())
114
122
        .await
115
122
        .err_tip(|| "In serialize_and_upload_message")
?0
;
116
122
    Ok(digest)
117
122
}
118
119
/// Computes a digest of a given buffer.
120
33
pub fn compute_buf_digest(buf: &[u8], hasher: &mut impl DigestHasher) -> DigestInfo {
121
33
    hasher.update(buf);
122
33
    hasher.finalize_digest()
123
33
}