/build/source/nativelink-util/src/origin_event.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::marker::PhantomData; |
16 | | use std::pin::Pin; |
17 | | use std::sync::{Arc, OnceLock}; |
18 | | |
19 | | use futures::future::ready; |
20 | | use futures::task::{Context, Poll}; |
21 | | use futures::{Future, FutureExt, Stream, StreamExt}; |
22 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
23 | | ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, |
24 | | BatchUpdateBlobsResponse, ExecuteRequest, FindMissingBlobsRequest, FindMissingBlobsResponse, |
25 | | GetActionResultRequest, GetCapabilitiesRequest, GetTreeRequest, GetTreeResponse, |
26 | | RequestMetadata, ServerCapabilities, UpdateActionResultRequest, WaitExecutionRequest, |
27 | | }; |
28 | | use nativelink_proto::com::github::trace_machina::nativelink::events::{ |
29 | | batch_read_blobs_response_override, batch_update_blobs_request_override, event, request_event, |
30 | | response_event, stream_event, BatchReadBlobsResponseOverride, BatchUpdateBlobsRequestOverride, |
31 | | Event, OriginEvent, RequestEvent, ResponseEvent, StreamEvent, WriteRequestOverride, |
32 | | }; |
33 | | use nativelink_proto::google::bytestream::{ |
34 | | QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest, |
35 | | WriteResponse, |
36 | | }; |
37 | | use nativelink_proto::google::longrunning::Operation; |
38 | | use nativelink_proto::google::rpc::Status; |
39 | | use pin_project_lite::pin_project; |
40 | | use rand::RngCore; |
41 | | use tokio::sync::mpsc; |
42 | | use tokio::sync::mpsc::error::TrySendError; |
43 | | use tonic::{Response, Status as TonicStatus, Streaming}; |
44 | | use uuid::Uuid; |
45 | | |
46 | | use crate::origin_context::ActiveOriginContext; |
47 | | use crate::{background_spawn, make_symbol}; |
48 | | |
/// Schema version written into the `version` field of every published `OriginEvent`.
const ORIGIN_EVENT_VERSION: u32 = 0;

/// Process-wide 6-byte node identifier, filled in lazily on first use by `get_node_id`.
static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new();
52 | | |
53 | | /// Returns a unique ID for the given event. |
54 | | /// This ID is used to identify the event type. |
55 | | /// The max value that could be output is 0x0FFF, |
56 | | /// meaning you may use the first nibble for other |
57 | | /// purposes. |
58 | | #[inline] |
59 | 31 | pub fn get_id_for_event(event: &Event) -> [u8; 2] { |
60 | 30 | match &event.event { |
61 | 1 | None => [0x00, 0x00], |
62 | 13 | Some(event::Event::Request(req)) => match req.event12 { |
63 | 1 | None => [0x01, 0x00], |
64 | 1 | Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01], |
65 | 1 | Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02], |
66 | 1 | Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03], |
67 | 1 | Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04], |
68 | 1 | Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05], |
69 | 1 | Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06], |
70 | 1 | Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07], |
71 | 1 | Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08], |
72 | 1 | Some(request_event::Event::WriteRequest(())) => [0x01, 0x09], |
73 | 1 | Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A], |
74 | 1 | Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B], |
75 | 1 | Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C], |
76 | | }, |
77 | 10 | Some(event::Event::Response(res)) => match res.event9 { |
78 | 1 | None => [0x02, 0x00], |
79 | 1 | Some(response_event::Event::Error(_)) => [0x02, 0x01], |
80 | 1 | Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02], |
81 | 1 | Some(response_event::Event::ActionResult(_)) => [0x02, 0x03], |
82 | 1 | Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04], |
83 | 1 | Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05], |
84 | 1 | Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06], |
85 | 1 | Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07], |
86 | 1 | Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08], |
87 | 1 | Some(response_event::Event::Empty(())) => [0x02, 0x09], |
88 | | }, |
89 | 7 | Some(event::Event::Stream(stream)) => match stream.event6 { |
90 | 1 | None => [0x03, 0x00], |
91 | 1 | Some(stream_event::Event::Error(_)) => [0x03, 0x01], |
92 | 1 | Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02], |
93 | 1 | Some(stream_event::Event::DataLength(_)) => [0x03, 0x03], |
94 | 1 | Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04], |
95 | 1 | Some(stream_event::Event::Operation(_)) => [0x03, 0x05], |
96 | 1 | Some(stream_event::Event::Closed(())) => [0x03, 0x06], // Special case when stream has terminated. |
97 | | }, |
98 | | } |
99 | 31 | } |
100 | | |
101 | | /// Returns a unique node ID for this process. |
102 | 0 | pub fn get_node_id(event: Option<&Event>) -> [u8; 6] { |
103 | 0 | let mut node_id = *NODE_ID.get_or_init(|| { |
104 | 0 | let mut rng = rand::thread_rng(); |
105 | 0 | let mut out = [0; 6]; |
106 | 0 | rng.fill_bytes(&mut out); |
107 | 0 | out |
108 | 0 | }); |
109 | 0 | let Some(event) = event else { Branch (109:9): [True: 0, False: 0]
Branch (109:9): [Folded - Ignored]
|
110 | 0 | return node_id; |
111 | | }; |
112 | 0 | let event_id = get_id_for_event(event); |
113 | 0 | node_id[0] = (node_id[0] & 0xF0) | event_id[0]; |
114 | 0 | node_id[1] = event_id[1]; |
115 | 0 | node_id |
116 | 0 | } |
117 | | |
118 | | pub struct OriginEventCollector { |
119 | | sender: mpsc::Sender<OriginEvent>, |
120 | | identity: String, |
121 | | bazel_metadata: Option<RequestMetadata>, |
122 | | } |
123 | | |
124 | | impl OriginEventCollector { |
125 | 0 | pub fn new( |
126 | 0 | sender: mpsc::Sender<OriginEvent>, |
127 | 0 | identity: String, |
128 | 0 | bazel_metadata: Option<RequestMetadata>, |
129 | 0 | ) -> Self { |
130 | 0 | Self { |
131 | 0 | sender, |
132 | 0 | identity, |
133 | 0 | bazel_metadata, |
134 | 0 | } |
135 | 0 | } |
136 | | |
137 | | /// Publishes an event to the origin event collector. |
138 | 0 | async fn publish_origin_event( |
139 | 0 | &self, |
140 | 0 | event: Event, |
141 | 0 | parent_event_id: Option<Uuid>, |
142 | 0 | maybe_event_id: Option<Uuid>, |
143 | 0 | ) -> Uuid { |
144 | 0 | let event_id = maybe_event_id.unwrap_or_else(|| Uuid::now_v6(&get_node_id(Some(&event)))); |
145 | 0 | let parent_event_id = |
146 | 0 | parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string()); |
147 | 0 | // Ignore cases when channel is dropped. |
148 | 0 | let _ = self |
149 | 0 | .sender |
150 | 0 | .send(OriginEvent { |
151 | 0 | version: ORIGIN_EVENT_VERSION, |
152 | 0 | event_id: event_id.as_hyphenated().to_string(), |
153 | 0 | parent_event_id, |
154 | 0 | bazel_request_metadata: self.bazel_metadata.clone(), |
155 | 0 | identity: self.identity.clone(), |
156 | 0 | event: Some(event), |
157 | 0 | }) |
158 | 0 | .await; |
159 | 0 | event_id |
160 | 0 | } |
161 | | |
162 | | /// Publishes an event to the origin event collector. |
163 | | /// If the buffer is full, the event will be sent in a background spawn. |
164 | | /// This is useful for cases where the event is critical and must be sent, |
165 | | /// but cannot await the send operation. |
166 | 0 | fn publish_origin_event_now_or_in_spawn(&self, event: Event, parent_event_id: Option<Uuid>) { |
167 | 0 | let event_id = Uuid::now_v6(&get_node_id(Some(&event))); |
168 | 0 | let parent_event_id = |
169 | 0 | parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string()); |
170 | 0 | self.sender |
171 | 0 | .try_send(OriginEvent { |
172 | 0 | version: ORIGIN_EVENT_VERSION, |
173 | 0 | event_id: event_id.as_hyphenated().to_string(), |
174 | 0 | parent_event_id, |
175 | 0 | bazel_request_metadata: self.bazel_metadata.clone(), |
176 | 0 | identity: self.identity.clone(), |
177 | 0 | event: Some(event), |
178 | 0 | }) |
179 | 0 | .map_or_else( |
180 | 0 | |e| match e { |
181 | 0 | TrySendError::Full(event) => { |
182 | 0 | let sender = self.sender.clone(); |
183 | 0 | background_spawn!("send_end_stream_origin_event", async move { |
184 | 0 | // Ignore cases when channel is dropped. |
185 | 0 | let _ = sender.send(event).await; |
186 | 0 | }); |
187 | | } |
188 | | // Ignore cases when channel is dropped. |
189 | 0 | TrySendError::Closed(_) => {} |
190 | 0 | }, |
191 | 0 | |()| {}, |
192 | 0 | ); |
193 | 0 | } |
194 | | } |
195 | | |
196 | | pin_project! { |
197 | | struct CloseEventStream<S> { |
198 | | #[pin] |
199 | | inner: S, |
200 | | ctx_impl: OriginEventContextImpl, |
201 | | } |
202 | | |
203 | | impl <S> PinnedDrop for CloseEventStream<S> { |
204 | | #[inline] |
205 | | fn drop(this: Pin<&mut Self>) { |
206 | | let event = Event { |
207 | | event: Some(event::Event::Stream(StreamEvent { |
208 | | event: Some(stream_event::Event::Closed(())) |
209 | | })), |
210 | | }; |
211 | | // Try to send the event immediately, if we cannot because |
212 | | // the buffer is full, do it in a background spawn. |
213 | | this.ctx_impl.origin_event_collector |
214 | | .publish_origin_event_now_or_in_spawn(event, Some(this.ctx_impl.parent_event_id)); |
215 | | } |
216 | | } |
217 | | } |
218 | | |
219 | | impl<T, S> Stream for CloseEventStream<S> |
220 | | where |
221 | | S: Stream<Item = T> + Send, |
222 | | { |
223 | | type Item = S::Item; |
224 | | |
225 | 0 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
226 | 0 | let project = self.project(); |
227 | 0 | project.inner.poll_next(cx) |
228 | 0 | } |
229 | | |
230 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
231 | 0 | self.inner.size_hint() |
232 | 0 | } |
233 | | } |
234 | | |
235 | | make_symbol!(ORIGIN_EVENT_COLLECTOR, OriginEventCollector); |
236 | | |
237 | | pub struct OriginEventContext<T> { |
238 | | inner: Option<OriginEventContextImpl>, |
239 | | _phantom: PhantomData<T>, |
240 | | } |
241 | | |
242 | | impl<T> Clone for OriginEventContext<T> { |
243 | 0 | fn clone(&self) -> Self { |
244 | 0 | Self { |
245 | 0 | inner: self.inner.clone(), |
246 | 0 | _phantom: PhantomData, |
247 | 0 | } |
248 | 0 | } |
249 | | } |
250 | | |
251 | | impl OriginEventContext<()> { |
252 | 38 | pub fn new<'a, T, U>( |
253 | 38 | source_cb: impl Fn() -> &'a T, |
254 | 38 | ) -> impl Future<Output = OriginEventContext<T>> + 'static |
255 | 38 | where |
256 | 38 | T: OriginEventSource<U> + 'static, |
257 | 38 | { |
258 | 0 | let Ok(Some(origin_event_collector)) = Branch (258:13): [True: 0, False: 15]
Branch (258:13): [True: 0, False: 3]
Branch (258:13): [True: 0, False: 3]
Branch (258:13): [True: 0, False: 0]
Branch (258:13): [True: 0, False: 6]
Branch (258:13): [True: 0, False: 0]
Branch (258:13): [True: 0, False: 1]
Branch (258:13): [True: 0, False: 3]
Branch (258:13): [True: 0, False: 0]
Branch (258:13): [True: 0, False: 2]
Branch (258:13): [True: 0, False: 4]
Branch (258:13): [True: 0, False: 1]
Branch (258:13): [Folded - Ignored]
Branch (258:13): [Folded - Ignored]
|
259 | 38 | ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR) |
260 | | else { |
261 | 38 | return ready(OriginEventContext { |
262 | 38 | inner: None, |
263 | 38 | _phantom: PhantomData, |
264 | 38 | }) |
265 | 38 | .left_future(); |
266 | | }; |
267 | 0 | let event = source_cb().as_event(); |
268 | 0 | async move { |
269 | 0 | let parent_event_id = origin_event_collector |
270 | 0 | .publish_origin_event(event, None, None) |
271 | 0 | .await; |
272 | 0 | OriginEventContext { |
273 | 0 | inner: Some(OriginEventContextImpl { |
274 | 0 | origin_event_collector, |
275 | 0 | parent_event_id, |
276 | 0 | }), |
277 | 0 | _phantom: PhantomData, |
278 | 0 | } |
279 | 0 | } |
280 | 0 | .right_future() |
281 | 38 | } |
282 | | } |
283 | | impl<U> OriginEventContext<U> { |
284 | 37 | pub fn emit<'a, T, F>( |
285 | 37 | &self, |
286 | 37 | event_cb: F, |
287 | 37 | ) -> impl Future<Output = ()> + Send + use<'_, 'a, T, F, U> |
288 | 37 | where |
289 | 37 | T: OriginEventSource<U> + 'a, |
290 | 37 | F: Fn() -> &'a T, |
291 | 37 | { |
292 | 37 | let Some(inner0 ) = &self.inner else { Branch (292:13): [True: 0, False: 0]
Branch (292:13): [True: 0, False: 0]
Branch (292:13): [True: 0, False: 14]
Branch (292:13): [True: 0, False: 0]
Branch (292:13): [True: 0, False: 3]
Branch (292:13): [True: 0, False: 0]
Branch (292:13): [True: 0, False: 0]
Branch (292:13): [True: 0, False: 3]
Branch (292:13): [True: 0, False: 0]
Branch (292:13): [True: 0, False: 0]
Branch (292:13): [True: 0, False: 6]
Branch (292:13): [True: 0, False: 0]
Branch (292:13): [True: 0, False: 0]
Branch (292:13): [True: 0, False: 1]
Branch (292:13): [True: 0, False: 3]
Branch (292:13): [True: 0, False: 0]
Branch (292:13): [True: 0, False: 2]
Branch (292:13): [True: 0, False: 4]
Branch (292:13): [True: 0, False: 1]
Branch (292:13): [Folded - Ignored]
Branch (292:13): [Folded - Ignored]
|
293 | 37 | return ready(()).left_future(); |
294 | | }; |
295 | 0 | let v = (event_cb)(); |
296 | 0 | v.publish(inner).right_future() |
297 | 37 | } |
298 | | |
299 | 24 | pub fn wrap_stream<'a, O, S>(&self, stream: S) -> Pin<Box<dyn Stream<Item = O> + Send + 'a>> |
300 | 24 | where |
301 | 24 | U: Send + 'a, |
302 | 24 | O: OriginEventSource<U> + Send + 'a, |
303 | 24 | S: Stream<Item = O> + Send + 'a, |
304 | 24 | { |
305 | 24 | let Some(ctx_impl0 ) = self.inner.clone() else { Branch (305:13): [True: 0, False: 15]
Branch (305:13): [True: 0, False: 3]
Branch (305:13): [True: 0, False: 0]
Branch (305:13): [True: 0, False: 0]
Branch (305:13): [True: 0, False: 6]
Branch (305:13): [True: 0, False: 0]
Branch (305:13): [Folded - Ignored]
Branch (305:13): [Folded - Ignored]
|
306 | 24 | return Box::pin(stream); |
307 | | }; |
308 | 0 | let ctx = self.clone(); |
309 | 0 | Box::pin(CloseEventStream { |
310 | 0 | inner: stream.then(move |item| { |
311 | 0 | let ctx = ctx.clone(); |
312 | 0 | async move { |
313 | 0 | ctx.emit(|| &item).await; |
314 | 0 | item |
315 | 0 | } |
316 | 0 | }), |
317 | 0 | ctx_impl, |
318 | 0 | }) |
319 | 24 | } |
320 | | } |
321 | | |
322 | | #[derive(Clone)] |
323 | | pub struct OriginEventContextImpl { |
324 | | origin_event_collector: Arc<OriginEventCollector>, |
325 | | parent_event_id: Uuid, |
326 | | } |
327 | | |
328 | | pub trait OriginEventSource<Source>: Sized { |
329 | | fn as_event(&self) -> Event; |
330 | | |
331 | 0 | fn publish<'a>(&self, ctx: &'a OriginEventContextImpl) -> impl Future<Output = ()> + Send + 'a { |
332 | 0 | let event = self.as_event(); |
333 | 0 | ctx.origin_event_collector |
334 | 0 | .publish_origin_event(event, Some(ctx.parent_event_id), None) |
335 | 0 | // We don't need the Uuid here. |
336 | 0 | .map(|_| ()) |
337 | 0 | } |
338 | | } |
339 | | |
340 | | impl<'a, T, U> OriginEventSource<U> for &'a Response<T> |
341 | | where |
342 | | T: OriginEventSource<U> + 'a, |
343 | | { |
344 | | #[inline] |
345 | 0 | fn as_event(&self) -> Event { |
346 | 0 | self.get_ref().as_event() |
347 | 0 | } |
348 | | } |
349 | | |
350 | | impl<T, U> OriginEventSource<U> for Result<Response<T>, TonicStatus> |
351 | | where |
352 | | T: OriginEventSource<U>, |
353 | | TonicStatus: OriginEventSource<U>, |
354 | | { |
355 | | #[inline] |
356 | 0 | fn as_event(&self) -> Event { |
357 | 0 | match self { |
358 | 0 | Ok(v) => v.as_event(), |
359 | 0 | Err(v) => v.as_event(), |
360 | | } |
361 | 0 | } |
362 | | } |
363 | | |
364 | 0 | fn tonic_status_to_proto_status(tonic_status: &TonicStatus) -> Status { |
365 | 0 | Status { |
366 | 0 | code: tonic_status.code().into(), |
367 | 0 | message: tonic_status.message().into(), |
368 | 0 | details: Vec::new(), |
369 | 0 | } |
370 | 0 | } |
371 | | |
/// Builds an `event::Event` of the given category (`Request`, `Response` or
/// `Stream`) whose inner oneof is the variant `$kind` holding `$payload`.
macro_rules! get_event_type {
    (Request, $kind:ident, $payload:expr) => {
        event::Event::Request(RequestEvent {
            event: Some(request_event::Event::$kind($payload)),
        })
    };
    (Response, $kind:ident, $payload:expr) => {
        event::Event::Response(ResponseEvent {
            event: Some(response_event::Event::$kind($payload)),
        })
    };
    (Stream, $kind:ident, $payload:expr) => {
        event::Event::Stream(StreamEvent {
            event: Some(stream_event::Event::$kind($payload)),
        })
    };
}
389 | | |
/// Generates `OriginEventSource<$source>` impls.
///
/// Arms are tried top to bottom, so their order matters:
/// 1. `Stream, source, Type`: stream item whose variant is named like its
///    type; the payload is a clone of the item.
/// 2. `Stream, source, Type, Variant, converter`: implemented for
///    `Result<Type, TonicStatus>`; `Ok` goes through `converter`, `Err`
///    becomes the stream `Error` variant.
/// 3. `Category, source, Type`: variant named like the type, payload cloned.
/// 4. `Category, source, Type, Variant`: explicit variant, payload cloned.
/// 5. `Category, source, Type, Variant, converter`: `converter` must produce
///    the complete `event::Event`; the category and variant are not used.
/// 6. `__inner, ...`: emits the actual impl; not meant to be called directly.
macro_rules! impl_as_event {
    (Stream, $source:ident, $payload:ident) => {
        impl_as_event! {Stream, $source, $payload, $payload, {
            #[inline]
            |value: &$payload| value.clone()
        }}
    };
    (Stream, $source:ty, $payload:ty, $variant:ident, $to_data:tt) => {
        impl_as_event! {__inner, Stream, $source, Result<$payload, TonicStatus>, $variant, {
            #[inline]
            |item: &Result<$payload, TonicStatus>| match item {
                Ok(data) => get_event_type!(Stream, $variant, $to_data(data)),
                Err(status) => get_event_type!(Stream, Error, tonic_status_to_proto_status(status)),
            }
        }}
    };
    ($category:ident, $source:ty, $payload:ident) => {
        impl_as_event!(__inner, $category, $source, $payload, $payload, {
            #[inline]
            |value: &$payload| get_event_type!($category, $payload, value.clone())
        });
    };
    ($category:ident, $source:ty, $payload:ty, $variant:ident) => {
        impl_as_event!(__inner, $category, $source, $payload, $variant, {
            #[inline]
            |value: &$payload| get_event_type!($category, $variant, value.clone())
        });
    };
    ($category:ident, $source:ty, $payload:ty, $variant:ident, $to_event:tt) => {
        impl_as_event! {__inner, $category, $source, $payload, $variant, $to_event}
    };
    (__inner, $category:ident, $source:ty, $payload:ty, $variant:ident, $to_event:tt) => {
        impl OriginEventSource<$source> for $payload {
            #[inline]
            fn as_event(&self) -> Event {
                Event {
                    event: Some($to_event(self)),
                }
            }
        }
    };
}
432 | | |
433 | | impl<Origin> OriginEventSource<Origin> for TonicStatus { |
434 | | #[inline] |
435 | 0 | fn as_event(&self) -> Event { |
436 | 0 | Event { |
437 | 0 | event: Some(get_event_type!( |
438 | 0 | Response, |
439 | 0 | Error, |
440 | 0 | tonic_status_to_proto_status(self) |
441 | 0 | )), |
442 | 0 | } |
443 | 0 | } |
444 | | } |
445 | | |
446 | | #[inline] |
447 | 0 | fn to_batch_update_blobs_request_override(val: &BatchUpdateBlobsRequest) -> event::Event { |
448 | 0 | get_event_type!( |
449 | 0 | Request, |
450 | 0 | BatchUpdateBlobsRequest, |
451 | 0 | BatchUpdateBlobsRequestOverride { |
452 | 0 | instance_name: val.instance_name.clone(), |
453 | 0 | requests: val |
454 | 0 | .requests |
455 | 0 | .iter() |
456 | 0 | .map(|v| batch_update_blobs_request_override::Request { |
457 | 0 | digest: v.digest.clone(), |
458 | 0 | compressor: v.compressor, |
459 | 0 | data_len: u64::try_from(v.data.len()).unwrap_or_default(), |
460 | 0 | }) |
461 | 0 | .collect(), |
462 | 0 | digest_function: val.digest_function, |
463 | 0 | } |
464 | 0 | ) |
465 | 0 | } |
466 | | |
467 | | #[inline] |
468 | 0 | fn to_batch_read_blobs_response_override(val: &BatchReadBlobsResponse) -> event::Event { |
469 | 0 | get_event_type!( |
470 | 0 | Response, |
471 | 0 | BatchReadBlobsResponse, |
472 | 0 | BatchReadBlobsResponseOverride { |
473 | 0 | responses: val |
474 | 0 | .responses |
475 | 0 | .iter() |
476 | 0 | .map(|v| batch_read_blobs_response_override::Response { |
477 | 0 | digest: v.digest.clone(), |
478 | 0 | compressor: v.compressor, |
479 | 0 | status: v.status.clone(), |
480 | 0 | data_len: u64::try_from(v.data.len()).unwrap_or_default(), |
481 | 0 | }) |
482 | 0 | .collect(), |
483 | 0 | } |
484 | 0 | ) |
485 | 0 | } |
486 | | |
487 | | #[inline] |
488 | 0 | fn to_empty_response<T>(_: T) -> event::Event { |
489 | 0 | get_event_type!(Response, Empty, ()) |
490 | 0 | } |
491 | | |
492 | | #[inline] |
493 | 0 | fn to_empty_write_request<T>(_: T) -> event::Event { |
494 | 0 | get_event_type!(Request, WriteRequest, ()) |
495 | 0 | } |
496 | | |
497 | | // -- Requests -- |
498 | | |
499 | | impl_as_event! {Request, (), GetCapabilitiesRequest} |
500 | | impl_as_event! {Request, (), GetActionResultRequest} |
501 | | impl_as_event! {Request, (), UpdateActionResultRequest} |
502 | | impl_as_event! {Request, (), FindMissingBlobsRequest} |
503 | | impl_as_event! {Request, (), BatchReadBlobsRequest} |
504 | | impl_as_event! {Request, (), BatchUpdateBlobsRequest, BatchUpdateBlobsRequest, to_batch_update_blobs_request_override} |
505 | | impl_as_event! {Request, (), GetTreeRequest} |
506 | | impl_as_event! {Request, (), ReadRequest} |
507 | | impl_as_event! {Request, (), Streaming<WriteRequest>, WriteRequest, to_empty_write_request} |
508 | | impl_as_event! {Request, (), QueryWriteStatusRequest} |
509 | | impl_as_event! {Request, (), ExecuteRequest} |
510 | | impl_as_event! {Request, (), WaitExecutionRequest} |
511 | | |
512 | | // -- Responses -- |
513 | | |
514 | | impl_as_event! {Response, GetCapabilitiesRequest, ServerCapabilities} |
515 | | impl_as_event! {Response, GetActionResultRequest, ActionResult} |
516 | | impl_as_event! {Response, UpdateActionResultRequest, ActionResult} |
517 | | impl_as_event! {Response, Streaming<WriteRequest>, WriteResponse} |
518 | | impl_as_event! {Response, ReadRequest, Pin<Box<dyn Stream<Item = Result<ReadResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response} |
519 | | impl_as_event! {Response, QueryWriteStatusRequest, QueryWriteStatusResponse} |
520 | | impl_as_event! {Response, FindMissingBlobsRequest, FindMissingBlobsResponse} |
521 | | impl_as_event! {Response, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse} |
522 | | impl_as_event! {Response, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchReadBlobsResponseOverride, to_batch_read_blobs_response_override} |
523 | | impl_as_event! {Response, GetTreeRequest, Pin<Box<dyn Stream<Item = Result<GetTreeResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response} |
524 | | impl_as_event! {Response, ExecuteRequest, Pin<Box<dyn Stream<Item = Result<Operation, TonicStatus>> + Send + '_>>, Empty, to_empty_response} |
525 | | |
526 | | // -- Streams -- |
527 | | |
528 | | impl_as_event! {Stream, ReadRequest, ReadResponse, DataLength, { |
529 | 0 | |val: &ReadResponse| val.data.len() as u64 |
530 | | }} |
531 | | impl_as_event! {Stream, Streaming<WriteRequest>, WriteRequest, WriteRequest, { |
532 | 0 | |val: &WriteRequest| WriteRequestOverride { |
533 | 0 | resource_name: val.resource_name.clone(), |
534 | 0 | write_offset: val.write_offset, |
535 | 0 | finish_write: val.finish_write, |
536 | 0 | data_len: u64::try_from(val.data.len()).unwrap_or_default(), |
537 | 0 | } |
538 | | }} |
539 | | impl_as_event! {Stream, GetTreeRequest, GetTreeResponse} |
540 | | impl_as_event! {Stream, ExecuteRequest, Operation} |
541 | | impl_as_event! {Stream, WaitExecutionRequest, Operation} |