/build/source/nativelink-util/src/origin_event.rs
Line  | Count  | Source  | 
1  |  | // Copyright 2024 The NativeLink Authors. All rights reserved.  | 
2  |  | //  | 
3  |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");  | 
4  |  | // you may not use this file except in compliance with the License.  | 
5  |  | // You may obtain a copy of the License at  | 
6  |  | //  | 
7  |  | //    See LICENSE file for details  | 
8  |  | //  | 
9  |  | // Unless required by applicable law or agreed to in writing, software  | 
10  |  | // distributed under the License is distributed on an "AS IS" BASIS,  | 
11  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
12  |  | // See the License for the specific language governing permissions and  | 
13  |  | // limitations under the License.  | 
14  |  |  | 
15  |  | use std::sync::OnceLock;  | 
16  |  |  | 
17  |  | use base64::Engine;  | 
18  |  | use base64::prelude::BASE64_STANDARD_NO_PAD;  | 
19  |  | use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;  | 
20  |  | use nativelink_proto::com::github::trace_machina::nativelink::events::{ | 
21  |  |     Event, event, request_event, response_event, stream_event,  | 
22  |  | };  | 
23  |  | use prost::Message;  | 
24  |  | use rand::RngCore;  | 
25  |  | use serde::{Deserialize, Deserializer, Serialize, Serializer}; | 
26  |  |  | 
27  |  | static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new();  | 
28  |  |  | 
29  |  | /// Returns a unique ID for the given event.  | 
30  |  | /// This ID is used to identify the event type.  | 
31  |  | /// The max value that could be output is 0x0FFF,  | 
32  |  | /// meaning you may use the first nibble for other  | 
33  |  | /// purposes.  | 
34  |  | #[inline]  | 
35  | 32  | pub const fn get_id_for_event(event: &Event) -> [u8; 2] { | 
36  | 31  |     match &event.event { | 
37  | 1  |         None => [0x00, 0x00],  | 
38  | 14  |         Some(event::Event::Request(req)) => match req.event13  {  | 
39  | 1  |             None => [0x01, 0x00],  | 
40  | 1  |             Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01],  | 
41  | 1  |             Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02],  | 
42  | 1  |             Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03],  | 
43  | 1  |             Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04],  | 
44  | 1  |             Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05],  | 
45  | 1  |             Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06],  | 
46  | 1  |             Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07],  | 
47  | 1  |             Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08],  | 
48  | 1  |             Some(request_event::Event::WriteRequest(())) => [0x01, 0x09],  | 
49  | 1  |             Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A],  | 
50  | 1  |             Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B],  | 
51  | 1  |             Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C],  | 
52  | 1  |             Some(request_event::Event::SchedulerStartExecute(_)) => [0x01, 0x0D],  | 
53  | 0  |             Some(request_event::Event::FetchBlobRequest(_)) => [0x01, 0x0E],  | 
54  | 0  |             Some(request_event::Event::PushBlobRequest(_)) => [0x01, 0x0F],  | 
55  |  |         },  | 
56  | 10  |         Some(event::Event::Response(res)) => match res.event9  {  | 
57  | 1  |             None => [0x02, 0x00],  | 
58  | 1  |             Some(response_event::Event::Error(_)) => [0x02, 0x01],  | 
59  | 1  |             Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02],  | 
60  | 1  |             Some(response_event::Event::ActionResult(_)) => [0x02, 0x03],  | 
61  | 1  |             Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04],  | 
62  | 1  |             Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05],  | 
63  | 1  |             Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06],  | 
64  | 1  |             Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07],  | 
65  | 1  |             Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08],  | 
66  | 1  |             Some(response_event::Event::Empty(())) => [0x02, 0x09],  | 
67  | 0  |             Some(response_event::Event::FetchBlobResponse(_)) => [0x02, 0x0A],  | 
68  | 0  |             Some(response_event::Event::PushBlobResponse(_)) => [0x02, 0x0B],  | 
69  |  |         },  | 
70  | 7  |         Some(event::Event::Stream(stream)) => match stream.event6  {  | 
71  | 1  |             None => [0x03, 0x00],  | 
72  | 1  |             Some(stream_event::Event::Error(_)) => [0x03, 0x01],  | 
73  | 1  |             Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02],  | 
74  | 1  |             Some(stream_event::Event::DataLength(_)) => [0x03, 0x03],  | 
75  | 1  |             Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04],  | 
76  | 1  |             Some(stream_event::Event::Operation(_)) => [0x03, 0x05],  | 
77  | 1  |             Some(stream_event::Event::Closed(())) => [0x03, 0x06], // Special case when stream has terminated.  | 
78  |  |         },  | 
79  |  |     }  | 
80  | 32  | }  | 
81  |  |  | 
82  |  | /// Returns a unique node ID for this process.  | 
83  | 0  | pub fn get_node_id(event: Option<&Event>) -> [u8; 6] { | 
84  | 0  |     let mut node_id = *NODE_ID.get_or_init(|| { | 
85  | 0  |         let mut out = [0; 6];  | 
86  | 0  |         rand::rng().fill_bytes(&mut out);  | 
87  | 0  |         out  | 
88  | 0  |     });  | 
89  | 0  |     let Some(event) = event else {  Branch (89:9): [True: 0, False: 0]
  Branch (89:9): [Folded - Ignored]
  | 
90  | 0  |         return node_id;  | 
91  |  |     };  | 
92  | 0  |     let event_id = get_id_for_event(event);  | 
93  | 0  |     node_id[0] = (node_id[0] & 0xF0) | event_id[0];  | 
94  | 0  |     node_id[1] = event_id[1];  | 
95  | 0  |     node_id  | 
96  | 0  | }  | 
97  |  |  | 
98  | 0  | fn serialize_request_metadata<S>(  | 
99  | 0  |     value: &Option<RequestMetadata>,  | 
100  | 0  |     serializer: S,  | 
101  | 0  | ) -> Result<S::Ok, S::Error>  | 
102  | 0  | where  | 
103  | 0  |     S: Serializer,  | 
104  |  | { | 
105  | 0  |     match value { | 
106  | 0  |         Some(msg) => serializer.serialize_some(&BASE64_STANDARD_NO_PAD.encode(msg.encode_to_vec())),  | 
107  | 0  |         None => serializer.serialize_none(),  | 
108  |  |     }  | 
109  | 0  | }  | 
110  |  |  | 
111  | 0  | fn deserialize_request_metadata<'de, D>(  | 
112  | 0  |     deserializer: D,  | 
113  | 0  | ) -> Result<Option<RequestMetadata>, D::Error>  | 
114  | 0  | where  | 
115  | 0  |     D: Deserializer<'de>,  | 
116  |  | { | 
117  | 0  |     let opt = Option::<String>::deserialize(deserializer)?;  | 
118  | 0  |     match opt { | 
119  | 0  |         Some(s) => { | 
120  | 0  |             let decoded = BASE64_STANDARD_NO_PAD  | 
121  | 0  |                 .decode(s.as_bytes())  | 
122  | 0  |                 .map_err(serde::de::Error::custom)?;  | 
123  | 0  |             RequestMetadata::decode(&*decoded)  | 
124  | 0  |                 .map_err(serde::de::Error::custom)  | 
125  | 0  |                 .map(Some)  | 
126  |  |         }  | 
127  | 0  |         None => Ok(None),  | 
128  |  |     }  | 
129  | 0  | }  | 
130  |  |  | 
131  |  | #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]  | 
132  |  | pub struct OriginMetadata { | 
133  |  |     pub identity: String,  | 
134  |  |     #[serde(  | 
135  |  |         serialize_with = "serialize_request_metadata",  | 
136  |  |         deserialize_with = "deserialize_request_metadata"  | 
137  |  |     )]  | 
138  |  |     pub bazel_metadata: Option<RequestMetadata>,  | 
139  |  | }  |