Coverage Report

Created: 2024-11-22 20:17

/build/source/nativelink-store/src/ac_utils.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
// @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
16
// TODO(aaronmondal): IMPORTANT TODO: IMPORTING THIS SOMETMIES BREAKS
17
//                    THREADSAFETY. FIGURE OUT WHY AND MOVE IT TO UTILS.
18
// @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
19
20
use std::pin::Pin;
21
22
use bytes::BytesMut;
23
use futures::TryFutureExt;
24
use nativelink_error::{Code, Error, ResultExt};
25
use nativelink_util::common::DigestInfo;
26
use nativelink_util::digest_hasher::DigestHasher;
27
use nativelink_util::store_trait::{StoreKey, StoreLike};
28
use prost::Message;
29
30
// NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
31
// 1.2k. Giving a bit more just in case to reduce allocs.
32
pub const ESTIMATED_DIGEST_SIZE: usize = 2048;
33
34
/// This is more of a safety check. We are going to collect this entire message
35
/// into memory. If we don't bound the max size of the object we enable users
36
/// to use up all the memory on this machine.
37
const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb.
38
39
/// Attempts to fetch the digest contents from a store into the associated proto.
40
72
pub async fn get_and_decode_digest<T: Message + Default + 'static>(
41
72
    store: &impl StoreLike,
42
72
    key: StoreKey<'_>,
43
72
) -> Result<T, Error> {
44
72
    get_size_and_decode_digest(store, key)
45
72
        .map_ok(|(v, _)| 
v70
)
46
285
        .await
47
72
}
48
49
/// Attempts to fetch the digest contents from a store into the associated proto.
50
80
pub async fn get_size_and_decode_digest<T: Message + Default + 'static>(
51
80
    store: &impl StoreLike,
52
80
    key: impl Into<StoreKey<'_>>,
53
80
) -> Result<(T, u64), Error> {
54
80
    let key = key.into();
55
    // Note: For unknown reasons we appear to be hitting:
56
    // https://github.com/rust-lang/rust/issues/92096
57
    // or a smiliar issue if we try to use the non-store driver function, so we
58
    // are using the store driver function here.
59
80
    let mut store_data_resp = store
60
80
        .as_store_driver_pin()
61
80
        .get_part_unchunked(key.borrow(), 0, Some(MAX_ACTION_MSG_SIZE as u64))
62
293
        .await;
63
80
    if let Err(
err2
) = &mut store_data_resp {
  Branch (63:12): [True: 0, False: 0]
  Branch (63:12): [True: 2, False: 1]
  Branch (63:12): [True: 0, False: 0]
  Branch (63:12): [True: 0, False: 0]
  Branch (63:12): [True: 0, False: 26]
  Branch (63:12): [True: 0, False: 8]
  Branch (63:12): [True: 0, False: 8]
  Branch (63:12): [True: 0, False: 16]
  Branch (63:12): [Folded - Ignored]
  Branch (63:12): [True: 0, False: 2]
  Branch (63:12): [True: 0, False: 3]
  Branch (63:12): [True: 0, False: 0]
  Branch (63:12): [True: 0, False: 7]
  Branch (63:12): [True: 0, False: 7]
64
2
        if err.code == Code::NotFound {
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 2, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [Folded - Ignored]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
  Branch (64:12): [True: 0, False: 0]
65
2
            // Trim the error code. Not Found is quite common and we don't want to send a large
66
2
            // error (debug) message for something that is common. We resize to just the last
67
2
            // message as it will be the most relevant.
68
2
            err.messages.resize_with(1, String::new);
69
2
        }
0
70
78
    }
71
80
    let 
store_data78
= store_data_resp
?2
;
72
78
    let store_data_len =
73
78
        u64::try_from(store_data.len()).err_tip(|| 
"Could not convert store_data.len() to u64"0
)
?0
;
74
75
78
    T::decode(store_data)
76
78
        .err_tip_with_code(|e| {
77
0
            (
78
0
                Code::NotFound,
79
0
                format!("Stored value appears to be corrupt: {e} - {key:?}"),
80
0
            )
81
78
        })
82
78
        .map(|v| (v, store_data_len))
83
80
}
84
85
/// Computes the digest of a message.
86
64
pub fn message_to_digest(
87
64
    message: &impl Message,
88
64
    mut buf: &mut BytesMut,
89
64
    hasher: &mut impl DigestHasher,
90
64
) -> Result<DigestInfo, Error> {
91
64
    message
92
64
        .encode(&mut buf)
93
64
        .err_tip(|| 
"Could not encode directory proto"0
)
?0
;
94
64
    hasher.update(buf);
95
64
    Ok(hasher.finalize_digest())
96
64
}
97
98
/// Takes a proto message and will serialize it and upload it to the provided store.
99
64
pub async fn serialize_and_upload_message<'a, T: Message>(
100
64
    message: &'a T,
101
64
    cas_store: Pin<&'a impl StoreLike>,
102
64
    hasher: &mut impl DigestHasher,
103
64
) -> Result<DigestInfo, Error> {
104
64
    let mut buffer = BytesMut::with_capacity(message.encoded_len());
105
64
    let digest = message_to_digest(message, &mut buffer, hasher)
106
64
        .err_tip(|| 
"In serialize_and_upload_message"0
)
?0
;
107
    // Note: For unknown reasons we appear to be hitting:
108
    // https://github.com/rust-lang/rust/issues/92096
109
    // or a smiliar issue if we try to use the non-store driver function, so we
110
    // are using the store driver function here.
111
64
    cas_store
112
64
        .as_store_driver_pin()
113
64
        .update_oneshot(digest.into(), buffer.freeze())
114
102
        .await
115
64
        .err_tip(|| 
"In serialize_and_upload_message"0
)
?0
;
116
64
    Ok(digest)
117
64
}
118
119
/// Computes a digest of a given buffer.
120
10
pub fn compute_buf_digest(buf: &[u8], hasher: &mut impl DigestHasher) -> DigestInfo {
121
10
    hasher.update(buf);
122
10
    hasher.finalize_digest()
123
10
}