/build/source/nativelink-util/src/origin_event.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::marker::PhantomData; |
16 | | use std::pin::Pin; |
17 | | use std::sync::{Arc, OnceLock}; |
18 | | |
19 | | use base64::Engine; |
20 | | use base64::prelude::BASE64_STANDARD_NO_PAD; |
21 | | use futures::future::ready; |
22 | | use futures::task::{Context, Poll}; |
23 | | use futures::{Future, FutureExt, Stream, StreamExt}; |
24 | | use nativelink_proto::build::bazel::remote::asset::v1::{ |
25 | | FetchBlobRequest, FetchBlobResponse, PushBlobRequest, PushBlobResponse, |
26 | | }; |
27 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
28 | | ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, |
29 | | BatchUpdateBlobsResponse, ExecuteRequest, FindMissingBlobsRequest, FindMissingBlobsResponse, |
30 | | GetActionResultRequest, GetCapabilitiesRequest, GetTreeRequest, GetTreeResponse, |
31 | | RequestMetadata, ServerCapabilities, UpdateActionResultRequest, WaitExecutionRequest, |
32 | | }; |
33 | | use nativelink_proto::com::github::trace_machina::nativelink::events::{ |
34 | | BatchReadBlobsResponseOverride, BatchUpdateBlobsRequestOverride, Event, OriginEvent, |
35 | | RequestEvent, ResponseEvent, StreamEvent, WriteRequestOverride, |
36 | | batch_read_blobs_response_override, batch_update_blobs_request_override, event, request_event, |
37 | | response_event, stream_event, |
38 | | }; |
39 | | use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::StartExecute; |
40 | | use nativelink_proto::google::bytestream::{ |
41 | | QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest, |
42 | | WriteResponse, |
43 | | }; |
44 | | use nativelink_proto::google::longrunning::Operation; |
45 | | use nativelink_proto::google::rpc::Status; |
46 | | use pin_project_lite::pin_project; |
47 | | use prost::Message; |
48 | | use rand::RngCore; |
49 | | use serde::{Deserialize, Deserializer, Serialize, Serializer}; |
50 | | use tokio::sync::mpsc; |
51 | | use tokio::sync::mpsc::error::TrySendError; |
52 | | use tonic::{Response, Status as TonicStatus, Streaming}; |
53 | | use uuid::Uuid; |
54 | | |
55 | | use crate::origin_context::ActiveOriginContext; |
56 | | use crate::{background_spawn, unsafe_make_symbol}; |
57 | | |
58 | | const ORIGIN_EVENT_VERSION: u32 = 0; |
59 | | |
60 | | static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new(); |
61 | | |
62 | | /// Returns a unique ID for the given event. |
63 | | /// This ID is used to identify the event type. |
64 | | /// The max value that could be output is 0x0FFF, |
65 | | /// meaning you may use the first nibble for other |
66 | | /// purposes. |
67 | | #[inline] |
68 | 32 | pub const fn get_id_for_event(event: &Event) -> [u8; 2] { |
69 | 31 | match &event.event { |
70 | 1 | None => [0x00, 0x00], |
71 | 14 | Some(event::Event::Request(req)) => match req.event13 { |
72 | 1 | None => [0x01, 0x00], |
73 | 1 | Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01], |
74 | 1 | Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02], |
75 | 1 | Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03], |
76 | 1 | Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04], |
77 | 1 | Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05], |
78 | 1 | Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06], |
79 | 1 | Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07], |
80 | 1 | Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08], |
81 | 1 | Some(request_event::Event::WriteRequest(())) => [0x01, 0x09], |
82 | 1 | Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A], |
83 | 1 | Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B], |
84 | 1 | Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C], |
85 | 1 | Some(request_event::Event::SchedulerStartExecute(_)) => [0x01, 0x0D], |
86 | 0 | Some(request_event::Event::FetchBlobRequest(_)) => [0x01, 0x0E], |
87 | 0 | Some(request_event::Event::PushBlobRequest(_)) => [0x01, 0x0F], |
88 | | }, |
89 | 10 | Some(event::Event::Response(res)) => match res.event9 { |
90 | 1 | None => [0x02, 0x00], |
91 | 1 | Some(response_event::Event::Error(_)) => [0x02, 0x01], |
92 | 1 | Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02], |
93 | 1 | Some(response_event::Event::ActionResult(_)) => [0x02, 0x03], |
94 | 1 | Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04], |
95 | 1 | Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05], |
96 | 1 | Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06], |
97 | 1 | Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07], |
98 | 1 | Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08], |
99 | 1 | Some(response_event::Event::Empty(())) => [0x02, 0x09], |
100 | 0 | Some(response_event::Event::FetchBlobResponse(_)) => [0x02, 0x0A], |
101 | 0 | Some(response_event::Event::PushBlobResponse(_)) => [0x02, 0x0B], |
102 | | }, |
103 | 7 | Some(event::Event::Stream(stream)) => match stream.event6 { |
104 | 1 | None => [0x03, 0x00], |
105 | 1 | Some(stream_event::Event::Error(_)) => [0x03, 0x01], |
106 | 1 | Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02], |
107 | 1 | Some(stream_event::Event::DataLength(_)) => [0x03, 0x03], |
108 | 1 | Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04], |
109 | 1 | Some(stream_event::Event::Operation(_)) => [0x03, 0x05], |
110 | 1 | Some(stream_event::Event::Closed(())) => [0x03, 0x06], // Special case when stream has terminated. |
111 | | }, |
112 | | } |
113 | 32 | } |
114 | | |
115 | | /// Returns a unique node ID for this process. |
116 | 0 | pub fn get_node_id(event: Option<&Event>) -> [u8; 6] { |
117 | 0 | let mut node_id = *NODE_ID.get_or_init(|| { |
118 | 0 | let mut out = [0; 6]; |
119 | 0 | rand::rng().fill_bytes(&mut out); |
120 | 0 | out |
121 | 0 | }); |
122 | 0 | let Some(event) = event else { Branch (122:9): [True: 0, False: 0]
Branch (122:9): [Folded - Ignored]
|
123 | 0 | return node_id; |
124 | | }; |
125 | 0 | let event_id = get_id_for_event(event); |
126 | 0 | node_id[0] = (node_id[0] & 0xF0) | event_id[0]; |
127 | 0 | node_id[1] = event_id[1]; |
128 | 0 | node_id |
129 | 0 | } |
130 | | |
131 | 0 | fn serialize_request_metadata<S>( |
132 | 0 | value: &Option<RequestMetadata>, |
133 | 0 | serializer: S, |
134 | 0 | ) -> Result<S::Ok, S::Error> |
135 | 0 | where |
136 | 0 | S: Serializer, |
137 | 0 | { |
138 | 0 | match value { |
139 | 0 | Some(msg) => serializer.serialize_some(&BASE64_STANDARD_NO_PAD.encode(msg.encode_to_vec())), |
140 | 0 | None => serializer.serialize_none(), |
141 | | } |
142 | 0 | } |
143 | | |
144 | 0 | fn deserialize_request_metadata<'de, D>( |
145 | 0 | deserializer: D, |
146 | 0 | ) -> Result<Option<RequestMetadata>, D::Error> |
147 | 0 | where |
148 | 0 | D: Deserializer<'de>, |
149 | 0 | { |
150 | 0 | let opt = Option::<String>::deserialize(deserializer)?; |
151 | 0 | match opt { |
152 | 0 | Some(s) => { |
153 | 0 | let decoded = BASE64_STANDARD_NO_PAD |
154 | 0 | .decode(s.as_bytes()) |
155 | 0 | .map_err(serde::de::Error::custom)?; |
156 | 0 | RequestMetadata::decode(&*decoded) |
157 | 0 | .map_err(serde::de::Error::custom) |
158 | 0 | .map(Some) |
159 | | } |
160 | 0 | None => Ok(None), |
161 | | } |
162 | 0 | } |
163 | | |
164 | 0 | #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] |
165 | | pub struct OriginMetadata { |
166 | | pub identity: String, |
167 | | #[serde( |
168 | | serialize_with = "serialize_request_metadata", |
169 | | deserialize_with = "deserialize_request_metadata" |
170 | | )] |
171 | | pub bazel_metadata: Option<RequestMetadata>, |
172 | | } |
173 | | |
174 | | #[derive(Debug)] |
175 | | pub struct OriginEventCollector { |
176 | | sender: mpsc::Sender<OriginEvent>, |
177 | | pub metadata: OriginMetadata, |
178 | | } |
179 | | |
180 | | impl OriginEventCollector { |
181 | 0 | pub const fn new(sender: mpsc::Sender<OriginEvent>, metadata: OriginMetadata) -> Self { |
182 | 0 | Self { sender, metadata } |
183 | 0 | } |
184 | | |
185 | | #[must_use] |
186 | 0 | pub fn clone_with_metadata(&self, metadata: OriginMetadata) -> Self { |
187 | 0 | Self { |
188 | 0 | sender: self.sender.clone(), |
189 | 0 | metadata, |
190 | 0 | } |
191 | 0 | } |
192 | | |
193 | | /// Publishes an event to the origin event collector. |
194 | 0 | async fn publish_origin_event( |
195 | 0 | &self, |
196 | 0 | event: Event, |
197 | 0 | parent_event_id: Option<Uuid>, |
198 | 0 | maybe_event_id: Option<Uuid>, |
199 | 0 | ) -> Uuid { |
200 | 0 | let event_id = maybe_event_id.unwrap_or_else(|| Uuid::now_v6(&get_node_id(Some(&event)))); |
201 | 0 | let parent_event_id = |
202 | 0 | parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string()); |
203 | | // Ignore cases when channel is dropped. |
204 | 0 | drop( |
205 | 0 | self.sender |
206 | 0 | .send(OriginEvent { |
207 | 0 | version: ORIGIN_EVENT_VERSION, |
208 | 0 | event_id: event_id.as_hyphenated().to_string(), |
209 | 0 | parent_event_id, |
210 | 0 | bazel_request_metadata: self.metadata.bazel_metadata.clone(), |
211 | 0 | identity: self.metadata.identity.clone(), |
212 | 0 | event: Some(event), |
213 | 0 | }) |
214 | 0 | .await, |
215 | | ); |
216 | 0 | event_id |
217 | 0 | } |
218 | | |
219 | | /// Publishes an event to the origin event collector. |
220 | | /// If the buffer is full, the event will be sent in a background spawn. |
221 | | /// This is useful for cases where the event is critical and must be sent, |
222 | | /// but cannot await the send operation. |
223 | 0 | fn publish_origin_event_now_or_in_spawn(&self, event: Event, parent_event_id: Option<Uuid>) { |
224 | 0 | let event_id = Uuid::now_v6(&get_node_id(Some(&event))); |
225 | 0 | let parent_event_id = |
226 | 0 | parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string()); |
227 | 0 | self.sender |
228 | 0 | .try_send(OriginEvent { |
229 | 0 | version: ORIGIN_EVENT_VERSION, |
230 | 0 | event_id: event_id.as_hyphenated().to_string(), |
231 | 0 | parent_event_id, |
232 | 0 | bazel_request_metadata: self.metadata.bazel_metadata.clone(), |
233 | 0 | identity: self.metadata.identity.clone(), |
234 | 0 | event: Some(event), |
235 | 0 | }) |
236 | 0 | .map_or_else( |
237 | 0 | |e| match e { |
238 | 0 | TrySendError::Full(event) => { |
239 | 0 | let sender = self.sender.clone(); |
240 | 0 | background_spawn!("send_end_stream_origin_event", async move { |
241 | | // Ignore cases when channel is dropped. |
242 | 0 | drop(sender.send(event).await); |
243 | 0 | }); |
244 | | } |
245 | | // Ignore cases when channel is dropped. |
246 | 0 | TrySendError::Closed(_) => {} |
247 | 0 | }, |
248 | 0 | |()| {}, |
249 | | ); |
250 | 0 | } |
251 | | } |
252 | | |
253 | | pin_project! { |
254 | | struct CloseEventStream<S> { |
255 | | #[pin] |
256 | | inner: S, |
257 | | ctx_impl: OriginEventContextImpl, |
258 | | } |
259 | | |
260 | | impl <S> PinnedDrop for CloseEventStream<S> { |
261 | | #[inline] |
262 | | fn drop(this: Pin<&mut Self>) { |
263 | | let event = Event { |
264 | | event: Some(event::Event::Stream(StreamEvent { |
265 | | event: Some(stream_event::Event::Closed(())) |
266 | | })), |
267 | | }; |
268 | | // Try to send the event immediately, if we cannot because |
269 | | // the buffer is full, do it in a background spawn. |
270 | | this.ctx_impl.origin_event_collector |
271 | | .publish_origin_event_now_or_in_spawn(event, Some(this.ctx_impl.parent_event_id)); |
272 | | } |
273 | | } |
274 | | } |
275 | | |
276 | | impl<T, S> Stream for CloseEventStream<S> |
277 | | where |
278 | | S: Stream<Item = T> + Send, |
279 | | { |
280 | | type Item = S::Item; |
281 | | |
282 | 0 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
283 | 0 | let project = self.project(); |
284 | 0 | project.inner.poll_next(cx) |
285 | 0 | } |
286 | | |
287 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
288 | 0 | self.inner.size_hint() |
289 | 0 | } |
290 | | } |
291 | | |
292 | | // Safety: There is no other symbol named `ORIGIN_EVENT_COLLECTOR`. |
293 | | unsafe_make_symbol!(ORIGIN_EVENT_COLLECTOR, OriginEventCollector); |
294 | | |
295 | | pub struct OriginEventContext<T> { |
296 | | inner: Option<OriginEventContextImpl>, |
297 | | _phantom: PhantomData<T>, |
298 | | } |
299 | | |
300 | | impl<T> std::fmt::Debug for OriginEventContext<T> { |
301 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
302 | 0 | f.debug_struct("OriginEventContext") |
303 | 0 | .field("inner", &self.inner) |
304 | 0 | .field("_phantom", &self._phantom) |
305 | 0 | .finish() |
306 | 0 | } |
307 | | } |
308 | | |
309 | | impl<T> Clone for OriginEventContext<T> { |
310 | 0 | fn clone(&self) -> Self { |
311 | 0 | Self { |
312 | 0 | inner: self.inner.clone(), |
313 | 0 | _phantom: PhantomData, |
314 | 0 | } |
315 | 0 | } |
316 | | } |
317 | | |
318 | | impl OriginEventContext<()> { |
319 | 68 | pub fn new<'a, T, U>( |
320 | 68 | source_cb: impl Fn() -> &'a T, |
321 | 68 | ) -> impl Future<Output = OriginEventContext<T>> + 'static |
322 | 68 | where |
323 | 68 | T: OriginEventSource<U> + 'static, |
324 | 68 | { |
325 | 0 | let Ok(Some(origin_event_collector)) = Branch (325:13): [True: 0, False: 0]
Branch (325:13): [True: 0, False: 15]
Branch (325:13): [True: 0, False: 3]
Branch (325:13): [True: 0, False: 3]
Branch (325:13): [True: 0, False: 1]
Branch (325:13): [True: 0, False: 1]
Branch (325:13): [True: 0, False: 0]
Branch (325:13): [True: 0, False: 6]
Branch (325:13): [True: 0, False: 0]
Branch (325:13): [True: 0, False: 1]
Branch (325:13): [True: 0, False: 3]
Branch (325:13): [True: 0, False: 0]
Branch (325:13): [True: 0, False: 2]
Branch (325:13): [True: 0, False: 4]
Branch (325:13): [True: 0, False: 1]
Branch (325:13): [Folded - Ignored]
Branch (325:13): [Folded - Ignored]
Branch (325:13): [True: 0, False: 27]
Branch (325:13): [True: 0, False: 1]
|
326 | 68 | ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR) |
327 | | else { |
328 | 68 | return ready(OriginEventContext { |
329 | 68 | inner: None, |
330 | 68 | _phantom: PhantomData, |
331 | 68 | }) |
332 | 68 | .left_future(); |
333 | | }; |
334 | 0 | let event = source_cb().as_event(); |
335 | 0 | async move { |
336 | 0 | let parent_event_id = origin_event_collector |
337 | 0 | .publish_origin_event(event, None, None) |
338 | 0 | .await; |
339 | 0 | OriginEventContext { |
340 | 0 | inner: Some(OriginEventContextImpl { |
341 | 0 | origin_event_collector, |
342 | 0 | parent_event_id, |
343 | 0 | }), |
344 | 0 | _phantom: PhantomData, |
345 | 0 | } |
346 | 0 | } |
347 | 0 | .right_future() |
348 | 68 | } |
349 | | } |
350 | | impl<U> OriginEventContext<U> { |
351 | 48 | pub fn emit<'a, T, F>( |
352 | 48 | &self, |
353 | 48 | event_cb: F, |
354 | 48 | ) -> impl Future<Output = ()> + Send + use<'_, 'a, T, F, U> |
355 | 48 | where |
356 | 48 | T: OriginEventSource<U> + 'a, |
357 | 48 | F: Fn() -> &'a T, |
358 | 48 | { |
359 | 48 | let Some(inner0 ) = &self.inner else { Branch (359:13): [True: 0, False: 9]
Branch (359:13): [True: 0, False: 0]
Branch (359:13): [True: 0, False: 0]
Branch (359:13): [True: 0, False: 14]
Branch (359:13): [True: 0, False: 0]
Branch (359:13): [True: 0, False: 3]
Branch (359:13): [True: 0, False: 0]
Branch (359:13): [True: 0, False: 0]
Branch (359:13): [True: 0, False: 3]
Branch (359:13): [True: 0, False: 1]
Branch (359:13): [True: 0, False: 1]
Branch (359:13): [True: 0, False: 0]
Branch (359:13): [True: 0, False: 0]
Branch (359:13): [True: 0, False: 6]
Branch (359:13): [True: 0, False: 0]
Branch (359:13): [True: 0, False: 0]
Branch (359:13): [True: 0, False: 1]
Branch (359:13): [True: 0, False: 3]
Branch (359:13): [True: 0, False: 0]
Branch (359:13): [True: 0, False: 2]
Branch (359:13): [True: 0, False: 4]
Branch (359:13): [True: 0, False: 1]
Branch (359:13): [Folded - Ignored]
Branch (359:13): [Folded - Ignored]
|
360 | 48 | return ready(()).left_future(); |
361 | | }; |
362 | 0 | let v = (event_cb)(); |
363 | 0 | v.publish(inner).right_future() |
364 | 48 | } |
365 | | |
366 | 24 | pub fn wrap_stream<'a, O, S>(&self, stream: S) -> Pin<Box<dyn Stream<Item = O> + Send + 'a>> |
367 | 24 | where |
368 | 24 | U: Send + 'a, |
369 | 24 | O: OriginEventSource<U> + Send + 'a, |
370 | 24 | S: Stream<Item = O> + Send + 'a, |
371 | 24 | { |
372 | 24 | let Some(ctx_impl0 ) = self.inner.clone() else { Branch (372:13): [True: 0, False: 15]
Branch (372:13): [True: 0, False: 3]
Branch (372:13): [True: 0, False: 0]
Branch (372:13): [True: 0, False: 0]
Branch (372:13): [True: 0, False: 6]
Branch (372:13): [True: 0, False: 0]
Branch (372:13): [Folded - Ignored]
Branch (372:13): [Folded - Ignored]
|
373 | 24 | return Box::pin(stream); |
374 | | }; |
375 | 0 | let ctx = self.clone(); |
376 | 0 | Box::pin(CloseEventStream { |
377 | 0 | inner: stream.then(move |item| { |
378 | 0 | let ctx = ctx.clone(); |
379 | 0 | async move { |
380 | 0 | ctx.emit(|| &item).await; |
381 | 0 | item |
382 | 0 | } |
383 | 0 | }), |
384 | 0 | ctx_impl, |
385 | | }) |
386 | 24 | } |
387 | | } |
388 | | |
389 | | #[derive(Debug, Clone)] |
390 | | pub struct OriginEventContextImpl { |
391 | | origin_event_collector: Arc<OriginEventCollector>, |
392 | | parent_event_id: Uuid, |
393 | | } |
394 | | |
395 | | pub trait OriginEventSource<Source>: Sized { |
396 | | fn as_event(&self) -> Event; |
397 | | |
398 | 0 | fn publish<'a>(&self, ctx: &'a OriginEventContextImpl) -> impl Future<Output = ()> + Send + 'a { |
399 | 0 | let event = self.as_event(); |
400 | 0 | ctx.origin_event_collector |
401 | 0 | .publish_origin_event(event, Some(ctx.parent_event_id), None) |
402 | 0 | // We don't need the Uuid here. |
403 | 0 | .map(|_| ()) |
404 | 0 | } |
405 | | } |
406 | | |
407 | | impl<'a, T, U> OriginEventSource<U> for &'a Response<T> |
408 | | where |
409 | | T: OriginEventSource<U> + 'a, |
410 | | { |
411 | | #[inline] |
412 | 0 | fn as_event(&self) -> Event { |
413 | 0 | self.get_ref().as_event() |
414 | 0 | } |
415 | | } |
416 | | |
417 | | impl<T, U> OriginEventSource<U> for Result<Response<T>, TonicStatus> |
418 | | where |
419 | | T: OriginEventSource<U>, |
420 | | TonicStatus: OriginEventSource<U>, |
421 | | { |
422 | | #[inline] |
423 | 0 | fn as_event(&self) -> Event { |
424 | 0 | match self { |
425 | 0 | Ok(v) => v.as_event(), |
426 | 0 | Err(v) => v.as_event(), |
427 | | } |
428 | 0 | } |
429 | | } |
430 | | |
431 | 0 | fn tonic_status_to_proto_status(tonic_status: &TonicStatus) -> Status { |
432 | 0 | Status { |
433 | 0 | code: tonic_status.code().into(), |
434 | 0 | message: tonic_status.message().into(), |
435 | 0 | details: Vec::new(), |
436 | 0 | } |
437 | 0 | } |
438 | | |
439 | | macro_rules! get_event_type { |
440 | | (Request, $variant:ident, $data:expr) => { |
441 | | event::Event::Request(RequestEvent { |
442 | | event: Some(request_event::Event::$variant($data)), |
443 | | }) |
444 | | }; |
445 | | (Response, $variant:ident, $data:expr) => { |
446 | | event::Event::Response(ResponseEvent { |
447 | | event: Some(response_event::Event::$variant($data)), |
448 | | }) |
449 | | }; |
450 | | (Stream, $variant:ident, $data:expr) => { |
451 | | event::Event::Stream(StreamEvent { |
452 | | event: Some(stream_event::Event::$variant($data)), |
453 | | }) |
454 | | }; |
455 | | } |
456 | | |
457 | | macro_rules! impl_as_event { |
458 | | (Stream, $origin:ident, $type:ident) => { |
459 | | impl_as_event! {Stream, $origin, $type, $type, { |
460 | | #[inline] |
461 | 0 | |val: &$type| val.clone() |
462 | | }} |
463 | | }; |
464 | | (Stream, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => { |
465 | | impl_as_event! {__inner, Stream, $origin, Result<$type, TonicStatus>, $variant, { |
466 | | #[inline] |
467 | 0 | |val: &Result<$type, TonicStatus>| match val { |
468 | 0 | Ok(v) => get_event_type!(Stream, $variant, $data_fn(v)), |
469 | 0 | Err(e) => get_event_type!(Stream, Error, tonic_status_to_proto_status(e)), |
470 | 0 | } |
471 | | }} |
472 | | }; |
473 | | ($event_type:ident, $origin:ty, $type:ident) => { |
474 | | impl_as_event!(__inner, $event_type, $origin, $type, $type, { |
475 | | #[inline] |
476 | 0 | |val: &$type| get_event_type!($event_type, $type, val.clone()) |
477 | | }); |
478 | | }; |
479 | | ($event_type:ident, $origin:ty, $type:ty, $variant:ident) => { |
480 | | impl_as_event!(__inner, $event_type, $origin, $type, $variant, { |
481 | | #[inline] |
482 | 0 | |val: &$type| get_event_type!($event_type, $variant, val.clone()) |
483 | | }); |
484 | | }; |
485 | | ($event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => { |
486 | | impl_as_event! {__inner, $event_type, $origin, $type, $variant, $data_fn} |
487 | | }; |
488 | | (__inner, $event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => { |
489 | | impl OriginEventSource<$origin> for $type { |
490 | | #[inline] |
491 | 0 | fn as_event(&self) -> Event { |
492 | 0 | Event { |
493 | 0 | event: Some($data_fn(self)), |
494 | 0 | } |
495 | 0 | } |
496 | | } |
497 | | }; |
498 | | } |
499 | | |
500 | | impl<Origin> OriginEventSource<Origin> for TonicStatus { |
501 | | #[inline] |
502 | 0 | fn as_event(&self) -> Event { |
503 | 0 | Event { |
504 | 0 | event: Some(get_event_type!( |
505 | 0 | Response, |
506 | 0 | Error, |
507 | 0 | tonic_status_to_proto_status(self) |
508 | 0 | )), |
509 | 0 | } |
510 | 0 | } |
511 | | } |
512 | | |
513 | | #[inline] |
514 | 0 | fn to_batch_update_blobs_request_override(val: &BatchUpdateBlobsRequest) -> event::Event { |
515 | 0 | get_event_type!( |
516 | 0 | Request, |
517 | 0 | BatchUpdateBlobsRequest, |
518 | 0 | BatchUpdateBlobsRequestOverride { |
519 | 0 | instance_name: val.instance_name.clone(), |
520 | 0 | requests: val |
521 | 0 | .requests |
522 | 0 | .iter() |
523 | 0 | .map(|v| batch_update_blobs_request_override::Request { |
524 | 0 | digest: v.digest.clone(), |
525 | 0 | compressor: v.compressor, |
526 | 0 | data_len: u64::try_from(v.data.len()).unwrap_or_default(), |
527 | 0 | }) |
528 | 0 | .collect(), |
529 | 0 | digest_function: val.digest_function, |
530 | 0 | } |
531 | 0 | ) |
532 | 0 | } |
533 | | |
534 | | #[inline] |
535 | 0 | fn to_batch_read_blobs_response_override(val: &BatchReadBlobsResponse) -> event::Event { |
536 | 0 | get_event_type!( |
537 | 0 | Response, |
538 | 0 | BatchReadBlobsResponse, |
539 | 0 | BatchReadBlobsResponseOverride { |
540 | 0 | responses: val |
541 | 0 | .responses |
542 | 0 | .iter() |
543 | 0 | .map(|v| batch_read_blobs_response_override::Response { |
544 | 0 | digest: v.digest.clone(), |
545 | 0 | compressor: v.compressor, |
546 | 0 | status: v.status.clone(), |
547 | 0 | data_len: u64::try_from(v.data.len()).unwrap_or_default(), |
548 | 0 | }) |
549 | 0 | .collect(), |
550 | 0 | } |
551 | 0 | ) |
552 | 0 | } |
553 | | |
554 | | #[inline] |
555 | 0 | fn to_empty_response<T>(_: T) -> event::Event { |
556 | 0 | get_event_type!(Response, Empty, ()) |
557 | 0 | } |
558 | | |
559 | | #[inline] |
560 | 0 | fn to_empty_write_request<T>(_: T) -> event::Event { |
561 | 0 | get_event_type!(Request, WriteRequest, ()) |
562 | 0 | } |
563 | | |
564 | | // -- Requests -- |
565 | | |
566 | | impl_as_event! {Request, (), GetCapabilitiesRequest} |
567 | | impl_as_event! {Request, (), GetActionResultRequest} |
568 | | impl_as_event! {Request, (), UpdateActionResultRequest} |
569 | | impl_as_event! {Request, (), FindMissingBlobsRequest} |
570 | | impl_as_event! {Request, (), BatchReadBlobsRequest} |
571 | | impl_as_event! {Request, (), BatchUpdateBlobsRequest, BatchUpdateBlobsRequest, to_batch_update_blobs_request_override} |
572 | | impl_as_event! {Request, (), FetchBlobRequest} |
573 | | impl_as_event! {Request, (), PushBlobRequest} |
574 | | impl_as_event! {Request, (), GetTreeRequest} |
575 | | impl_as_event! {Request, (), ReadRequest} |
576 | | impl_as_event! {Request, (), Streaming<WriteRequest>, WriteRequest, to_empty_write_request} |
577 | | impl_as_event! {Request, (), QueryWriteStatusRequest} |
578 | | impl_as_event! {Request, (), ExecuteRequest} |
579 | | impl_as_event! {Request, (), WaitExecutionRequest} |
580 | | impl_as_event! {Request, (), StartExecute, SchedulerStartExecute} |
581 | | |
582 | | // -- Responses -- |
583 | | |
584 | | impl_as_event! {Response, GetCapabilitiesRequest, ServerCapabilities} |
585 | | impl_as_event! {Response, GetActionResultRequest, ActionResult} |
586 | | impl_as_event! {Response, UpdateActionResultRequest, ActionResult} |
587 | | impl_as_event! {Response, Streaming<WriteRequest>, WriteResponse} |
588 | | impl_as_event! {Response, ReadRequest, Pin<Box<dyn Stream<Item = Result<ReadResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response} |
589 | | impl_as_event! {Response, QueryWriteStatusRequest, QueryWriteStatusResponse} |
590 | | impl_as_event! {Response, FindMissingBlobsRequest, FindMissingBlobsResponse} |
591 | | impl_as_event! {Response, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse} |
592 | | impl_as_event! {Response, FetchBlobRequest, FetchBlobResponse} |
593 | | impl_as_event! {Response, PushBlobRequest, PushBlobResponse} |
594 | | impl_as_event! {Response, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchReadBlobsResponseOverride, to_batch_read_blobs_response_override} |
595 | | impl_as_event! {Response, GetTreeRequest, Pin<Box<dyn Stream<Item = Result<GetTreeResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response} |
596 | | impl_as_event! {Response, ExecuteRequest, Pin<Box<dyn Stream<Item = Result<Operation, TonicStatus>> + Send + '_>>, Empty, to_empty_response} |
597 | | impl_as_event! {Response, StartExecute, (), Empty} |
598 | | |
599 | | // -- Streams -- |
600 | | |
601 | | impl_as_event! {Stream, ReadRequest, ReadResponse, DataLength, { |
602 | 0 | |val: &ReadResponse| val.data.len() as u64 |
603 | | }} |
604 | | impl_as_event! {Stream, Streaming<WriteRequest>, WriteRequest, WriteRequest, { |
605 | 0 | |val: &WriteRequest| WriteRequestOverride { |
606 | 0 | resource_name: val.resource_name.clone(), |
607 | 0 | write_offset: val.write_offset, |
608 | 0 | finish_write: val.finish_write, |
609 | 0 | data_len: u64::try_from(val.data.len()).unwrap_or_default(), |
610 | 0 | } |
611 | | }} |
612 | | impl_as_event! {Stream, GetTreeRequest, GetTreeResponse} |
613 | | impl_as_event! {Stream, ExecuteRequest, Operation} |
614 | | impl_as_event! {Stream, WaitExecutionRequest, Operation} |