Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/origin_event.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::OnceLock;
16
17
use base64::Engine;
18
use base64::prelude::BASE64_STANDARD_NO_PAD;
19
use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;
20
use nativelink_proto::com::github::trace_machina::nativelink::events::{
21
    Event, event, request_event, response_event, stream_event,
22
};
23
use prost::Message;
24
use rand::RngCore;
25
use serde::{Deserialize, Deserializer, Serialize, Serializer};
26
27
static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new();
28
29
/// Returns a unique ID for the given event.
30
/// This ID is used to identify the event type.
31
/// The max value that could be output is 0x0FFF,
32
/// meaning you may use the first nibble for other
33
/// purposes.
34
#[inline]
35
32
pub const fn get_id_for_event(event: &Event) -> [u8; 2] {
36
31
    match &event.event {
37
1
        None => [0x00, 0x00],
38
14
        Some(event::Event::Request(req)) => match 
req.event13
{
39
1
            None => [0x01, 0x00],
40
1
            Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01],
41
1
            Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02],
42
1
            Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03],
43
1
            Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04],
44
1
            Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05],
45
1
            Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06],
46
1
            Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07],
47
1
            Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08],
48
1
            Some(request_event::Event::WriteRequest(())) => [0x01, 0x09],
49
1
            Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A],
50
1
            Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B],
51
1
            Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C],
52
1
            Some(request_event::Event::SchedulerStartExecute(_)) => [0x01, 0x0D],
53
0
            Some(request_event::Event::FetchBlobRequest(_)) => [0x01, 0x0E],
54
0
            Some(request_event::Event::PushBlobRequest(_)) => [0x01, 0x0F],
55
        },
56
10
        Some(event::Event::Response(res)) => match 
res.event9
{
57
1
            None => [0x02, 0x00],
58
1
            Some(response_event::Event::Error(_)) => [0x02, 0x01],
59
1
            Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02],
60
1
            Some(response_event::Event::ActionResult(_)) => [0x02, 0x03],
61
1
            Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04],
62
1
            Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05],
63
1
            Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06],
64
1
            Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07],
65
1
            Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08],
66
1
            Some(response_event::Event::Empty(())) => [0x02, 0x09],
67
0
            Some(response_event::Event::FetchBlobResponse(_)) => [0x02, 0x0A],
68
0
            Some(response_event::Event::PushBlobResponse(_)) => [0x02, 0x0B],
69
        },
70
7
        Some(event::Event::Stream(stream)) => match 
stream.event6
{
71
1
            None => [0x03, 0x00],
72
1
            Some(stream_event::Event::Error(_)) => [0x03, 0x01],
73
1
            Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02],
74
1
            Some(stream_event::Event::DataLength(_)) => [0x03, 0x03],
75
1
            Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04],
76
1
            Some(stream_event::Event::Operation(_)) => [0x03, 0x05],
77
1
            Some(stream_event::Event::Closed(())) => [0x03, 0x06], // Special case when stream has terminated.
78
        },
79
    }
80
32
}
81
82
/// Returns a unique node ID for this process.
83
0
pub fn get_node_id(event: Option<&Event>) -> [u8; 6] {
84
0
    let mut node_id = *NODE_ID.get_or_init(|| {
85
0
        let mut out = [0; 6];
86
0
        rand::rng().fill_bytes(&mut out);
87
0
        out
88
0
    });
89
0
    let Some(event) = event else {
  Branch (89:9): [True: 0, False: 0]
  Branch (89:9): [Folded - Ignored]
90
0
        return node_id;
91
    };
92
0
    let event_id = get_id_for_event(event);
93
0
    node_id[0] = (node_id[0] & 0xF0) | event_id[0];
94
0
    node_id[1] = event_id[1];
95
0
    node_id
96
0
}
97
98
0
fn serialize_request_metadata<S>(
99
0
    value: &Option<RequestMetadata>,
100
0
    serializer: S,
101
0
) -> Result<S::Ok, S::Error>
102
0
where
103
0
    S: Serializer,
104
{
105
0
    match value {
106
0
        Some(msg) => serializer.serialize_some(&BASE64_STANDARD_NO_PAD.encode(msg.encode_to_vec())),
107
0
        None => serializer.serialize_none(),
108
    }
109
0
}
110
111
0
fn deserialize_request_metadata<'de, D>(
112
0
    deserializer: D,
113
0
) -> Result<Option<RequestMetadata>, D::Error>
114
0
where
115
0
    D: Deserializer<'de>,
116
{
117
0
    let opt = Option::<String>::deserialize(deserializer)?;
118
0
    match opt {
119
0
        Some(s) => {
120
0
            let decoded = BASE64_STANDARD_NO_PAD
121
0
                .decode(s.as_bytes())
122
0
                .map_err(serde::de::Error::custom)?;
123
0
            RequestMetadata::decode(&*decoded)
124
0
                .map_err(serde::de::Error::custom)
125
0
                .map(Some)
126
        }
127
0
        None => Ok(None),
128
    }
129
0
}
130
131
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
132
pub struct OriginMetadata {
133
    pub identity: String,
134
    #[serde(
135
        serialize_with = "serialize_request_metadata",
136
        deserialize_with = "deserialize_request_metadata"
137
    )]
138
    pub bazel_metadata: Option<RequestMetadata>,
139
}