/build/source/nativelink-util/src/origin_event.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::sync::OnceLock; |
16 | | |
17 | | use base64::Engine; |
18 | | use base64::prelude::BASE64_STANDARD_NO_PAD; |
19 | | use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata; |
20 | | use nativelink_proto::com::github::trace_machina::nativelink::events::{ |
21 | | Event, event, request_event, response_event, stream_event, |
22 | | }; |
23 | | use prost::Message; |
24 | | use rand::RngCore; |
25 | | use serde::{Deserialize, Deserializer, Serialize, Serializer}; |
26 | | |
27 | | static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new(); |
28 | | |
29 | | /// Returns a unique ID for the given event. |
30 | | /// This ID is used to identify the event type. |
31 | | /// The max value that could be output is 0x0FFF, |
32 | | /// meaning you may use the first nibble for other |
33 | | /// purposes. |
34 | | #[inline] |
35 | 32 | pub const fn get_id_for_event(event: &Event) -> [u8; 2] { |
36 | 31 | match &event.event { |
37 | 1 | None => [0x00, 0x00], |
38 | 14 | Some(event::Event::Request(req)) => match req.event13 { |
39 | 1 | None => [0x01, 0x00], |
40 | 1 | Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01], |
41 | 1 | Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02], |
42 | 1 | Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03], |
43 | 1 | Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04], |
44 | 1 | Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05], |
45 | 1 | Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06], |
46 | 1 | Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07], |
47 | 1 | Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08], |
48 | 1 | Some(request_event::Event::WriteRequest(())) => [0x01, 0x09], |
49 | 1 | Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A], |
50 | 1 | Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B], |
51 | 1 | Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C], |
52 | 1 | Some(request_event::Event::SchedulerStartExecute(_)) => [0x01, 0x0D], |
53 | 0 | Some(request_event::Event::FetchBlobRequest(_)) => [0x01, 0x0E], |
54 | 0 | Some(request_event::Event::PushBlobRequest(_)) => [0x01, 0x0F], |
55 | | }, |
56 | 10 | Some(event::Event::Response(res)) => match res.event9 { |
57 | 1 | None => [0x02, 0x00], |
58 | 1 | Some(response_event::Event::Error(_)) => [0x02, 0x01], |
59 | 1 | Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02], |
60 | 1 | Some(response_event::Event::ActionResult(_)) => [0x02, 0x03], |
61 | 1 | Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04], |
62 | 1 | Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05], |
63 | 1 | Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06], |
64 | 1 | Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07], |
65 | 1 | Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08], |
66 | 1 | Some(response_event::Event::Empty(())) => [0x02, 0x09], |
67 | 0 | Some(response_event::Event::FetchBlobResponse(_)) => [0x02, 0x0A], |
68 | 0 | Some(response_event::Event::PushBlobResponse(_)) => [0x02, 0x0B], |
69 | | }, |
70 | 7 | Some(event::Event::Stream(stream)) => match stream.event6 { |
71 | 1 | None => [0x03, 0x00], |
72 | 1 | Some(stream_event::Event::Error(_)) => [0x03, 0x01], |
73 | 1 | Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02], |
74 | 1 | Some(stream_event::Event::DataLength(_)) => [0x03, 0x03], |
75 | 1 | Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04], |
76 | 1 | Some(stream_event::Event::Operation(_)) => [0x03, 0x05], |
77 | 1 | Some(stream_event::Event::Closed(())) => [0x03, 0x06], // Special case when stream has terminated. |
78 | | }, |
79 | | } |
80 | 32 | } |
81 | | |
82 | | /// Returns a unique node ID for this process. |
83 | 0 | pub fn get_node_id(event: Option<&Event>) -> [u8; 6] { |
84 | 0 | let mut node_id = *NODE_ID.get_or_init(|| { |
85 | 0 | let mut out = [0; 6]; |
86 | 0 | rand::rng().fill_bytes(&mut out); |
87 | 0 | out |
88 | 0 | }); |
89 | 0 | let Some(event) = event else { Branch (89:9): [True: 0, False: 0]
Branch (89:9): [Folded - Ignored]
|
90 | 0 | return node_id; |
91 | | }; |
92 | 0 | let event_id = get_id_for_event(event); |
93 | 0 | node_id[0] = (node_id[0] & 0xF0) | event_id[0]; |
94 | 0 | node_id[1] = event_id[1]; |
95 | 0 | node_id |
96 | 0 | } |
97 | | |
98 | 0 | fn serialize_request_metadata<S>( |
99 | 0 | value: &Option<RequestMetadata>, |
100 | 0 | serializer: S, |
101 | 0 | ) -> Result<S::Ok, S::Error> |
102 | 0 | where |
103 | 0 | S: Serializer, |
104 | | { |
105 | 0 | match value { |
106 | 0 | Some(msg) => serializer.serialize_some(&BASE64_STANDARD_NO_PAD.encode(msg.encode_to_vec())), |
107 | 0 | None => serializer.serialize_none(), |
108 | | } |
109 | 0 | } |
110 | | |
111 | 0 | fn deserialize_request_metadata<'de, D>( |
112 | 0 | deserializer: D, |
113 | 0 | ) -> Result<Option<RequestMetadata>, D::Error> |
114 | 0 | where |
115 | 0 | D: Deserializer<'de>, |
116 | | { |
117 | 0 | let opt = Option::<String>::deserialize(deserializer)?; |
118 | 0 | match opt { |
119 | 0 | Some(s) => { |
120 | 0 | let decoded = BASE64_STANDARD_NO_PAD |
121 | 0 | .decode(s.as_bytes()) |
122 | 0 | .map_err(serde::de::Error::custom)?; |
123 | 0 | RequestMetadata::decode(&*decoded) |
124 | 0 | .map_err(serde::de::Error::custom) |
125 | 0 | .map(Some) |
126 | | } |
127 | 0 | None => Ok(None), |
128 | | } |
129 | 0 | } |
130 | | |
131 | | #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] |
132 | | pub struct OriginMetadata { |
133 | | pub identity: String, |
134 | | #[serde( |
135 | | serialize_with = "serialize_request_metadata", |
136 | | deserialize_with = "deserialize_request_metadata" |
137 | | )] |
138 | | pub bazel_metadata: Option<RequestMetadata>, |
139 | | } |