/build/source/nativelink-util/src/origin_event.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::marker::PhantomData; |
16 | | use std::pin::Pin; |
17 | | use std::sync::{Arc, OnceLock}; |
18 | | |
19 | | use base64::prelude::BASE64_STANDARD_NO_PAD; |
20 | | use base64::Engine; |
21 | | use futures::future::ready; |
22 | | use futures::task::{Context, Poll}; |
23 | | use futures::{Future, FutureExt, Stream, StreamExt}; |
24 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
25 | | ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, |
26 | | BatchUpdateBlobsResponse, ExecuteRequest, FindMissingBlobsRequest, FindMissingBlobsResponse, |
27 | | GetActionResultRequest, GetCapabilitiesRequest, GetTreeRequest, GetTreeResponse, |
28 | | RequestMetadata, ServerCapabilities, UpdateActionResultRequest, WaitExecutionRequest, |
29 | | }; |
30 | | use nativelink_proto::com::github::trace_machina::nativelink::events::{ |
31 | | batch_read_blobs_response_override, batch_update_blobs_request_override, event, request_event, |
32 | | response_event, stream_event, BatchReadBlobsResponseOverride, BatchUpdateBlobsRequestOverride, |
33 | | Event, OriginEvent, RequestEvent, ResponseEvent, StreamEvent, WriteRequestOverride, |
34 | | }; |
35 | | use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::StartExecute; |
36 | | use nativelink_proto::google::bytestream::{ |
37 | | QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest, |
38 | | WriteResponse, |
39 | | }; |
40 | | use nativelink_proto::google::longrunning::Operation; |
41 | | use nativelink_proto::google::rpc::Status; |
42 | | use pin_project_lite::pin_project; |
43 | | use prost::Message; |
44 | | use rand::RngCore; |
45 | | use serde::{Deserialize, Deserializer, Serialize, Serializer}; |
46 | | use tokio::sync::mpsc; |
47 | | use tokio::sync::mpsc::error::TrySendError; |
48 | | use tonic::{Response, Status as TonicStatus, Streaming}; |
49 | | use uuid::Uuid; |
50 | | |
51 | | use crate::origin_context::ActiveOriginContext; |
52 | | use crate::{background_spawn, make_symbol}; |
53 | | |
54 | | const ORIGIN_EVENT_VERSION: u32 = 0; |
55 | | |
56 | | static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new(); |
57 | | |
58 | | /// Returns a unique ID for the given event. |
59 | | /// This ID is used to identify the event type. |
60 | | /// The max value that could be output is 0x0FFF, |
61 | | /// meaning you may use the first nibble for other |
62 | | /// purposes. |
63 | | #[inline] |
64 | 32 | pub const fn get_id_for_event(event: &Event) -> [u8; 2] { |
65 | 31 | match &event.event { |
66 | 1 | None => [0x00, 0x00], |
67 | 14 | Some(event::Event::Request(req)) => match req.event { Region (req.event): [Count: 13] |
68 | 1 | None => [0x01, 0x00], |
69 | 1 | Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01], |
70 | 1 | Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02], |
71 | 1 | Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03], |
72 | 1 | Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04], |
73 | 1 | Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05], |
74 | 1 | Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06], |
75 | 1 | Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07], |
76 | 1 | Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08], |
77 | 1 | Some(request_event::Event::WriteRequest(())) => [0x01, 0x09], |
78 | 1 | Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A], |
79 | 1 | Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B], |
80 | 1 | Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C], |
81 | 1 | Some(request_event::Event::SchedulerStartExecute(_)) => [0x01, 0x0D], |
82 | | }, |
83 | 10 | Some(event::Event::Response(res)) => match res.event { Region (res.event): [Count: 9] |
84 | 1 | None => [0x02, 0x00], |
85 | 1 | Some(response_event::Event::Error(_)) => [0x02, 0x01], |
86 | 1 | Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02], |
87 | 1 | Some(response_event::Event::ActionResult(_)) => [0x02, 0x03], |
88 | 1 | Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04], |
89 | 1 | Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05], |
90 | 1 | Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06], |
91 | 1 | Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07], |
92 | 1 | Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08], |
93 | 1 | Some(response_event::Event::Empty(())) => [0x02, 0x09], |
94 | | }, |
95 | 7 | Some(event::Event::Stream(stream)) => match stream.event { Region (stream.event): [Count: 6] |
96 | 1 | None => [0x03, 0x00], |
97 | 1 | Some(stream_event::Event::Error(_)) => [0x03, 0x01], |
98 | 1 | Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02], |
99 | 1 | Some(stream_event::Event::DataLength(_)) => [0x03, 0x03], |
100 | 1 | Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04], |
101 | 1 | Some(stream_event::Event::Operation(_)) => [0x03, 0x05], |
102 | 1 | Some(stream_event::Event::Closed(())) => [0x03, 0x06], // Special case when stream has terminated. |
103 | | }, |
104 | | } |
105 | 32 | } |
106 | | |
107 | | /// Returns a unique node ID for this process. |
108 | 0 | pub fn get_node_id(event: Option<&Event>) -> [u8; 6] { |
109 | 0 | let mut node_id = *NODE_ID.get_or_init(|| { |
110 | 0 | let mut out = [0; 6]; |
111 | 0 | rand::rng().fill_bytes(&mut out); |
112 | 0 | out |
113 | 0 | }); |
114 | 0 | let Some(event) = event else { Branch (114:9): [True: 0, False: 0]
Branch (114:9): [Folded - Ignored]
|
115 | 0 | return node_id; |
116 | | }; |
117 | 0 | let event_id = get_id_for_event(event); |
118 | 0 | node_id[0] = (node_id[0] & 0xF0) | event_id[0]; |
119 | 0 | node_id[1] = event_id[1]; |
120 | 0 | node_id |
121 | 0 | } |
122 | | |
123 | 0 | fn serialize_request_metadata<S>( |
124 | 0 | value: &Option<RequestMetadata>, |
125 | 0 | serializer: S, |
126 | 0 | ) -> Result<S::Ok, S::Error> |
127 | 0 | where |
128 | 0 | S: Serializer, |
129 | 0 | { |
130 | 0 | match value { |
131 | 0 | Some(msg) => serializer.serialize_some(&BASE64_STANDARD_NO_PAD.encode(msg.encode_to_vec())), |
132 | 0 | None => serializer.serialize_none(), |
133 | | } |
134 | 0 | } |
135 | | |
136 | 0 | fn deserialize_request_metadata<'de, D>( |
137 | 0 | deserializer: D, |
138 | 0 | ) -> Result<Option<RequestMetadata>, D::Error> |
139 | 0 | where |
140 | 0 | D: Deserializer<'de>, |
141 | 0 | { |
142 | 0 | let opt = Option::<String>::deserialize(deserializer)?; |
143 | 0 | match opt { |
144 | 0 | Some(s) => { |
145 | 0 | let decoded = BASE64_STANDARD_NO_PAD |
146 | 0 | .decode(s.as_bytes()) |
147 | 0 | .map_err(serde::de::Error::custom)?; |
148 | 0 | RequestMetadata::decode(&*decoded) |
149 | 0 | .map_err(serde::de::Error::custom) |
150 | 0 | .map(Some) |
151 | | } |
152 | 0 | None => Ok(None), |
153 | | } |
154 | 0 | } |
155 | | |
156 | 0 | #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] |
157 | | pub struct OriginMetadata { |
158 | | pub identity: String, |
159 | | #[serde( |
160 | | serialize_with = "serialize_request_metadata", |
161 | | deserialize_with = "deserialize_request_metadata" |
162 | | )] |
163 | | pub bazel_metadata: Option<RequestMetadata>, |
164 | | } |
165 | | |
166 | | pub struct OriginEventCollector { |
167 | | sender: mpsc::Sender<OriginEvent>, |
168 | | pub metadata: OriginMetadata, |
169 | | } |
170 | | |
171 | | impl OriginEventCollector { |
172 | 0 | pub const fn new(sender: mpsc::Sender<OriginEvent>, metadata: OriginMetadata) -> Self { |
173 | 0 | Self { sender, metadata } |
174 | 0 | } |
175 | | |
176 | | #[must_use] |
177 | 0 | pub fn clone_with_metadata(&self, metadata: OriginMetadata) -> Self { |
178 | 0 | Self { |
179 | 0 | sender: self.sender.clone(), |
180 | 0 | metadata, |
181 | 0 | } |
182 | 0 | } |
183 | | |
184 | | /// Publishes an event to the origin event collector. |
185 | 0 | async fn publish_origin_event( |
186 | 0 | &self, |
187 | 0 | event: Event, |
188 | 0 | parent_event_id: Option<Uuid>, |
189 | 0 | maybe_event_id: Option<Uuid>, |
190 | 0 | ) -> Uuid { |
191 | 0 | let event_id = maybe_event_id.unwrap_or_else(|| Uuid::now_v6(&get_node_id(Some(&event)))); |
192 | 0 | let parent_event_id = |
193 | 0 | parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string()); |
194 | 0 | // Ignore cases when channel is dropped. |
195 | 0 | let _ = self |
196 | 0 | .sender |
197 | 0 | .send(OriginEvent { |
198 | 0 | version: ORIGIN_EVENT_VERSION, |
199 | 0 | event_id: event_id.as_hyphenated().to_string(), |
200 | 0 | parent_event_id, |
201 | 0 | bazel_request_metadata: self.metadata.bazel_metadata.clone(), |
202 | 0 | identity: self.metadata.identity.clone(), |
203 | 0 | event: Some(event), |
204 | 0 | }) |
205 | 0 | .await; |
206 | 0 | event_id |
207 | 0 | } |
208 | | |
209 | | /// Publishes an event to the origin event collector. |
210 | | /// If the buffer is full, the event will be sent in a background spawn. |
211 | | /// This is useful for cases where the event is critical and must be sent, |
212 | | /// but cannot await the send operation. |
213 | 0 | fn publish_origin_event_now_or_in_spawn(&self, event: Event, parent_event_id: Option<Uuid>) { |
214 | 0 | let event_id = Uuid::now_v6(&get_node_id(Some(&event))); |
215 | 0 | let parent_event_id = |
216 | 0 | parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string()); |
217 | 0 | self.sender |
218 | 0 | .try_send(OriginEvent { |
219 | 0 | version: ORIGIN_EVENT_VERSION, |
220 | 0 | event_id: event_id.as_hyphenated().to_string(), |
221 | 0 | parent_event_id, |
222 | 0 | bazel_request_metadata: self.metadata.bazel_metadata.clone(), |
223 | 0 | identity: self.metadata.identity.clone(), |
224 | 0 | event: Some(event), |
225 | 0 | }) |
226 | 0 | .map_or_else( |
227 | 0 | |e| match e { |
228 | 0 | TrySendError::Full(event) => { |
229 | 0 | let sender = self.sender.clone(); |
230 | 0 | background_spawn!("send_end_stream_origin_event", async move { |
231 | 0 | // Ignore cases when channel is dropped. |
232 | 0 | let _ = sender.send(event).await; |
233 | 0 | }); |
234 | | } |
235 | | // Ignore cases when channel is dropped. |
236 | 0 | TrySendError::Closed(_) => {} |
237 | 0 | }, |
238 | 0 | |()| {}, |
239 | 0 | ); |
240 | 0 | } |
241 | | } |
242 | | |
243 | | pin_project! { |
244 | | struct CloseEventStream<S> { |
245 | | #[pin] |
246 | | inner: S, |
247 | | ctx_impl: OriginEventContextImpl, |
248 | | } |
249 | | |
250 | | impl <S> PinnedDrop for CloseEventStream<S> { |
251 | | #[inline] |
252 | | fn drop(this: Pin<&mut Self>) { |
253 | | let event = Event { |
254 | | event: Some(event::Event::Stream(StreamEvent { |
255 | | event: Some(stream_event::Event::Closed(())) |
256 | | })), |
257 | | }; |
258 | | // Try to send the event immediately, if we cannot because |
259 | | // the buffer is full, do it in a background spawn. |
260 | | this.ctx_impl.origin_event_collector |
261 | | .publish_origin_event_now_or_in_spawn(event, Some(this.ctx_impl.parent_event_id)); |
262 | | } |
263 | | } |
264 | | } |
265 | | |
266 | | impl<T, S> Stream for CloseEventStream<S> |
267 | | where |
268 | | S: Stream<Item = T> + Send, |
269 | | { |
270 | | type Item = S::Item; |
271 | | |
272 | 0 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
273 | 0 | let project = self.project(); |
274 | 0 | project.inner.poll_next(cx) |
275 | 0 | } |
276 | | |
277 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
278 | 0 | self.inner.size_hint() |
279 | 0 | } |
280 | | } |
281 | | |
282 | | make_symbol!(ORIGIN_EVENT_COLLECTOR, OriginEventCollector); |
283 | | |
284 | | pub struct OriginEventContext<T> { |
285 | | inner: Option<OriginEventContextImpl>, |
286 | | _phantom: PhantomData<T>, |
287 | | } |
288 | | |
289 | | impl<T> Clone for OriginEventContext<T> { |
290 | 0 | fn clone(&self) -> Self { |
291 | 0 | Self { |
292 | 0 | inner: self.inner.clone(), |
293 | 0 | _phantom: PhantomData, |
294 | 0 | } |
295 | 0 | } |
296 | | } |
297 | | |
298 | | impl OriginEventContext<()> { |
299 | 66 | pub fn new<'a, T, U>( |
300 | 66 | source_cb: impl Fn() -> &'a T, |
301 | 66 | ) -> impl Future<Output = OriginEventContext<T>> + 'static |
302 | 66 | where |
303 | 66 | T: OriginEventSource<U> + 'static, |
304 | 66 | { |
305 | 0 | let Ok(Some(origin_event_collector)) = Branch (305:13): [True: 0, False: 0]
Branch (305:13): [True: 0, False: 15]
Branch (305:13): [True: 0, False: 3]
Branch (305:13): [True: 0, False: 3]
Branch (305:13): [True: 0, False: 0]
Branch (305:13): [True: 0, False: 6]
Branch (305:13): [True: 0, False: 0]
Branch (305:13): [True: 0, False: 1]
Branch (305:13): [True: 0, False: 3]
Branch (305:13): [True: 0, False: 0]
Branch (305:13): [True: 0, False: 2]
Branch (305:13): [True: 0, False: 4]
Branch (305:13): [True: 0, False: 1]
Branch (305:13): [Folded - Ignored]
Branch (305:13): [Folded - Ignored]
Branch (305:13): [True: 0, False: 27]
Branch (305:13): [True: 0, False: 1]
|
306 | 66 | ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR) |
307 | | else { |
308 | 66 | return ready(OriginEventContext { |
309 | 66 | inner: None, |
310 | 66 | _phantom: PhantomData, |
311 | 66 | }) |
312 | 66 | .left_future(); |
313 | | }; |
314 | 0 | let event = source_cb().as_event(); |
315 | 0 | async move { |
316 | 0 | let parent_event_id = origin_event_collector |
317 | 0 | .publish_origin_event(event, None, None) |
318 | 0 | .await; |
319 | 0 | OriginEventContext { |
320 | 0 | inner: Some(OriginEventContextImpl { |
321 | 0 | origin_event_collector, |
322 | 0 | parent_event_id, |
323 | 0 | }), |
324 | 0 | _phantom: PhantomData, |
325 | 0 | } |
326 | 0 | } |
327 | 0 | .right_future() |
328 | 66 | } |
329 | | } |
330 | | impl<U> OriginEventContext<U> { |
331 | 46 | pub fn emit<'a, T, F>( |
332 | 46 | &self, |
333 | 46 | event_cb: F, |
334 | 46 | ) -> impl Future<Output = ()> + Send + use<'_, 'a, T, F, U> |
335 | 46 | where |
336 | 46 | T: OriginEventSource<U> + 'a, |
337 | 46 | F: Fn() -> &'a T, |
338 | 46 | { |
339 | 46 | let Some(inner) = &self.inner else { Region (inner): [Count: 0] Branch (339:13): [True: 0, False: 9]
Branch (339:13): [True: 0, False: 0]
Branch (339:13): [True: 0, False: 0]
Branch (339:13): [True: 0, False: 14]
Branch (339:13): [True: 0, False: 0]
Branch (339:13): [True: 0, False: 3]
Branch (339:13): [True: 0, False: 0]
Branch (339:13): [True: 0, False: 0]
Branch (339:13): [True: 0, False: 3]
Branch (339:13): [True: 0, False: 0]
Branch (339:13): [True: 0, False: 0]
Branch (339:13): [True: 0, False: 6]
Branch (339:13): [True: 0, False: 0]
Branch (339:13): [True: 0, False: 0]
Branch (339:13): [True: 0, False: 1]
Branch (339:13): [True: 0, False: 3]
Branch (339:13): [True: 0, False: 0]
Branch (339:13): [True: 0, False: 2]
Branch (339:13): [True: 0, False: 4]
Branch (339:13): [True: 0, False: 1]
Branch (339:13): [Folded - Ignored]
Branch (339:13): [Folded - Ignored]
|
340 | 46 | return ready(()).left_future(); |
341 | | }; |
342 | 0 | let v = (event_cb)(); |
343 | 0 | v.publish(inner).right_future() |
344 | 46 | } |
345 | | |
346 | 24 | pub fn wrap_stream<'a, O, S>(&self, stream: S) -> Pin<Box<dyn Stream<Item = O> + Send + 'a>> |
347 | 24 | where |
348 | 24 | U: Send + 'a, |
349 | 24 | O: OriginEventSource<U> + Send + 'a, |
350 | 24 | S: Stream<Item = O> + Send + 'a, |
351 | 24 | { |
352 | 24 | let Some(ctx_impl) = self.inner.clone() else { Region (ctx_impl): [Count: 0] Branch (352:13): [True: 0, False: 15]
Branch (352:13): [True: 0, False: 3]
Branch (352:13): [True: 0, False: 0]
Branch (352:13): [True: 0, False: 0]
Branch (352:13): [True: 0, False: 6]
Branch (352:13): [True: 0, False: 0]
Branch (352:13): [Folded - Ignored]
Branch (352:13): [Folded - Ignored]
|
353 | 24 | return Box::pin(stream); |
354 | | }; |
355 | 0 | let ctx = self.clone(); |
356 | 0 | Box::pin(CloseEventStream { |
357 | 0 | inner: stream.then(move |item| { |
358 | 0 | let ctx = ctx.clone(); |
359 | 0 | async move { |
360 | 0 | ctx.emit(|| &item).await; |
361 | 0 | item |
362 | 0 | } |
363 | 0 | }), |
364 | 0 | ctx_impl, |
365 | 0 | }) |
366 | 24 | } |
367 | | } |
368 | | |
369 | | #[derive(Clone)] |
370 | | pub struct OriginEventContextImpl { |
371 | | origin_event_collector: Arc<OriginEventCollector>, |
372 | | parent_event_id: Uuid, |
373 | | } |
374 | | |
375 | | pub trait OriginEventSource<Source>: Sized { |
376 | | fn as_event(&self) -> Event; |
377 | | |
378 | 0 | fn publish<'a>(&self, ctx: &'a OriginEventContextImpl) -> impl Future<Output = ()> + Send + 'a { |
379 | 0 | let event = self.as_event(); |
380 | 0 | ctx.origin_event_collector |
381 | 0 | .publish_origin_event(event, Some(ctx.parent_event_id), None) |
382 | 0 | // We don't need the Uuid here. |
383 | 0 | .map(|_| ()) |
384 | 0 | } |
385 | | } |
386 | | |
387 | | impl<'a, T, U> OriginEventSource<U> for &'a Response<T> |
388 | | where |
389 | | T: OriginEventSource<U> + 'a, |
390 | | { |
391 | | #[inline] |
392 | 0 | fn as_event(&self) -> Event { |
393 | 0 | self.get_ref().as_event() |
394 | 0 | } |
395 | | } |
396 | | |
397 | | impl<T, U> OriginEventSource<U> for Result<Response<T>, TonicStatus> |
398 | | where |
399 | | T: OriginEventSource<U>, |
400 | | TonicStatus: OriginEventSource<U>, |
401 | | { |
402 | | #[inline] |
403 | 0 | fn as_event(&self) -> Event { |
404 | 0 | match self { |
405 | 0 | Ok(v) => v.as_event(), |
406 | 0 | Err(v) => v.as_event(), |
407 | | } |
408 | 0 | } |
409 | | } |
410 | | |
411 | 0 | fn tonic_status_to_proto_status(tonic_status: &TonicStatus) -> Status { |
412 | 0 | Status { |
413 | 0 | code: tonic_status.code().into(), |
414 | 0 | message: tonic_status.message().into(), |
415 | 0 | details: Vec::new(), |
416 | 0 | } |
417 | 0 | } |
418 | | |
419 | | macro_rules! get_event_type { |
420 | | (Request, $variant:ident, $data:expr) => { |
421 | | event::Event::Request(RequestEvent { |
422 | | event: Some(request_event::Event::$variant($data)), |
423 | | }) |
424 | | }; |
425 | | (Response, $variant:ident, $data:expr) => { |
426 | | event::Event::Response(ResponseEvent { |
427 | | event: Some(response_event::Event::$variant($data)), |
428 | | }) |
429 | | }; |
430 | | (Stream, $variant:ident, $data:expr) => { |
431 | | event::Event::Stream(StreamEvent { |
432 | | event: Some(stream_event::Event::$variant($data)), |
433 | | }) |
434 | | }; |
435 | | } |
436 | | |
437 | | macro_rules! impl_as_event { |
438 | | (Stream, $origin:ident, $type:ident) => { |
439 | | impl_as_event! {Stream, $origin, $type, $type, { |
440 | | #[inline] |
441 | 0 | |val: &$type| val.clone() |
442 | | }} |
443 | | }; |
444 | | (Stream, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => { |
445 | | impl_as_event! {__inner, Stream, $origin, Result<$type, TonicStatus>, $variant, { |
446 | | #[inline] |
447 | 0 | |val: &Result<$type, TonicStatus>| match val { |
448 | 0 | Ok(v) => get_event_type!(Stream, $variant, $data_fn(v)), |
449 | 0 | Err(e) => get_event_type!(Stream, Error, tonic_status_to_proto_status(e)), |
450 | 0 | } |
451 | | }} |
452 | | }; |
453 | | ($event_type:ident, $origin:ty, $type:ident) => { |
454 | | impl_as_event!(__inner, $event_type, $origin, $type, $type, { |
455 | | #[inline] |
456 | 0 | |val: &$type| get_event_type!($event_type, $type, val.clone()) |
457 | | }); |
458 | | }; |
459 | | ($event_type:ident, $origin:ty, $type:ty, $variant:ident) => { |
460 | | impl_as_event!(__inner, $event_type, $origin, $type, $variant, { |
461 | | #[inline] |
462 | 0 | |val: &$type| get_event_type!($event_type, $variant, val.clone()) |
463 | | }); |
464 | | }; |
465 | | ($event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => { |
466 | | impl_as_event! {__inner, $event_type, $origin, $type, $variant, $data_fn} |
467 | | }; |
468 | | (__inner, $event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => { |
469 | | impl OriginEventSource<$origin> for $type { |
470 | | #[inline] |
471 | 0 | fn as_event(&self) -> Event { |
472 | 0 | Event { |
473 | 0 | event: Some($data_fn(self)), |
474 | 0 | } |
475 | 0 | } |
476 | | } |
477 | | }; |
478 | | } |
479 | | |
480 | | impl<Origin> OriginEventSource<Origin> for TonicStatus { |
481 | | #[inline] |
482 | 0 | fn as_event(&self) -> Event { |
483 | 0 | Event { |
484 | 0 | event: Some(get_event_type!( |
485 | 0 | Response, |
486 | 0 | Error, |
487 | 0 | tonic_status_to_proto_status(self) |
488 | 0 | )), |
489 | 0 | } |
490 | 0 | } |
491 | | } |
492 | | |
493 | | #[inline] |
494 | 0 | fn to_batch_update_blobs_request_override(val: &BatchUpdateBlobsRequest) -> event::Event { |
495 | 0 | get_event_type!( |
496 | 0 | Request, |
497 | 0 | BatchUpdateBlobsRequest, |
498 | 0 | BatchUpdateBlobsRequestOverride { |
499 | 0 | instance_name: val.instance_name.clone(), |
500 | 0 | requests: val |
501 | 0 | .requests |
502 | 0 | .iter() |
503 | 0 | .map(|v| batch_update_blobs_request_override::Request { |
504 | 0 | digest: v.digest.clone(), |
505 | 0 | compressor: v.compressor, |
506 | 0 | data_len: u64::try_from(v.data.len()).unwrap_or_default(), |
507 | 0 | }) |
508 | 0 | .collect(), |
509 | 0 | digest_function: val.digest_function, |
510 | 0 | } |
511 | 0 | ) |
512 | 0 | } |
513 | | |
514 | | #[inline] |
515 | 0 | fn to_batch_read_blobs_response_override(val: &BatchReadBlobsResponse) -> event::Event { |
516 | 0 | get_event_type!( |
517 | 0 | Response, |
518 | 0 | BatchReadBlobsResponse, |
519 | 0 | BatchReadBlobsResponseOverride { |
520 | 0 | responses: val |
521 | 0 | .responses |
522 | 0 | .iter() |
523 | 0 | .map(|v| batch_read_blobs_response_override::Response { |
524 | 0 | digest: v.digest.clone(), |
525 | 0 | compressor: v.compressor, |
526 | 0 | status: v.status.clone(), |
527 | 0 | data_len: u64::try_from(v.data.len()).unwrap_or_default(), |
528 | 0 | }) |
529 | 0 | .collect(), |
530 | 0 | } |
531 | 0 | ) |
532 | 0 | } |
533 | | |
534 | | #[inline] |
535 | 0 | fn to_empty_response<T>(_: T) -> event::Event { |
536 | 0 | get_event_type!(Response, Empty, ()) |
537 | 0 | } |
538 | | |
539 | | #[inline] |
540 | 0 | fn to_empty_write_request<T>(_: T) -> event::Event { |
541 | 0 | get_event_type!(Request, WriteRequest, ()) |
542 | 0 | } |
543 | | |
544 | | // -- Requests -- |
545 | | |
546 | | impl_as_event! {Request, (), GetCapabilitiesRequest} |
547 | | impl_as_event! {Request, (), GetActionResultRequest} |
548 | | impl_as_event! {Request, (), UpdateActionResultRequest} |
549 | | impl_as_event! {Request, (), FindMissingBlobsRequest} |
550 | | impl_as_event! {Request, (), BatchReadBlobsRequest} |
551 | | impl_as_event! {Request, (), BatchUpdateBlobsRequest, BatchUpdateBlobsRequest, to_batch_update_blobs_request_override} |
552 | | impl_as_event! {Request, (), GetTreeRequest} |
553 | | impl_as_event! {Request, (), ReadRequest} |
554 | | impl_as_event! {Request, (), Streaming<WriteRequest>, WriteRequest, to_empty_write_request} |
555 | | impl_as_event! {Request, (), QueryWriteStatusRequest} |
556 | | impl_as_event! {Request, (), ExecuteRequest} |
557 | | impl_as_event! {Request, (), WaitExecutionRequest} |
558 | | impl_as_event! {Request, (), StartExecute, SchedulerStartExecute} |
559 | | |
560 | | // -- Responses -- |
561 | | |
562 | | impl_as_event! {Response, GetCapabilitiesRequest, ServerCapabilities} |
563 | | impl_as_event! {Response, GetActionResultRequest, ActionResult} |
564 | | impl_as_event! {Response, UpdateActionResultRequest, ActionResult} |
565 | | impl_as_event! {Response, Streaming<WriteRequest>, WriteResponse} |
566 | | impl_as_event! {Response, ReadRequest, Pin<Box<dyn Stream<Item = Result<ReadResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response} |
567 | | impl_as_event! {Response, QueryWriteStatusRequest, QueryWriteStatusResponse} |
568 | | impl_as_event! {Response, FindMissingBlobsRequest, FindMissingBlobsResponse} |
569 | | impl_as_event! {Response, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse} |
570 | | impl_as_event! {Response, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchReadBlobsResponseOverride, to_batch_read_blobs_response_override} |
571 | | impl_as_event! {Response, GetTreeRequest, Pin<Box<dyn Stream<Item = Result<GetTreeResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response} |
572 | | impl_as_event! {Response, ExecuteRequest, Pin<Box<dyn Stream<Item = Result<Operation, TonicStatus>> + Send + '_>>, Empty, to_empty_response} |
573 | | impl_as_event! {Response, StartExecute, (), Empty} |
574 | | |
575 | | // -- Streams -- |
576 | | |
577 | | impl_as_event! {Stream, ReadRequest, ReadResponse, DataLength, { |
578 | 0 | |val: &ReadResponse| val.data.len() as u64 |
579 | | }} |
580 | | impl_as_event! {Stream, Streaming<WriteRequest>, WriteRequest, WriteRequest, { |
581 | 0 | |val: &WriteRequest| WriteRequestOverride { |
582 | 0 | resource_name: val.resource_name.clone(), |
583 | 0 | write_offset: val.write_offset, |
584 | 0 | finish_write: val.finish_write, |
585 | 0 | data_len: u64::try_from(val.data.len()).unwrap_or_default(), |
586 | 0 | } |
587 | | }} |
588 | | impl_as_event! {Stream, GetTreeRequest, GetTreeResponse} |
589 | | impl_as_event! {Stream, ExecuteRequest, Operation} |
590 | | impl_as_event! {Stream, WaitExecutionRequest, Operation} |