/build/source/nativelink-util/src/origin_event.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::marker::PhantomData; |
16 | | use std::pin::Pin; |
17 | | use std::sync::{Arc, OnceLock}; |
18 | | |
19 | | use futures::future::ready; |
20 | | use futures::{Future, FutureExt, Stream, StreamExt}; |
21 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
22 | | ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, |
23 | | BatchUpdateBlobsResponse, ExecuteRequest, FindMissingBlobsRequest, FindMissingBlobsResponse, |
24 | | GetActionResultRequest, GetCapabilitiesRequest, GetTreeRequest, GetTreeResponse, |
25 | | RequestMetadata, ServerCapabilities, UpdateActionResultRequest, WaitExecutionRequest, |
26 | | }; |
27 | | use nativelink_proto::com::github::trace_machina::nativelink::events::{ |
28 | | batch_read_blobs_response_override, batch_update_blobs_request_override, event, request_event, |
29 | | response_event, stream_event, BatchReadBlobsResponseOverride, BatchUpdateBlobsRequestOverride, |
30 | | Event, OriginEvent, RequestEvent, ResponseEvent, StreamEvent, WriteRequestOverride, |
31 | | }; |
32 | | use nativelink_proto::google::bytestream::{ |
33 | | QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest, |
34 | | WriteResponse, |
35 | | }; |
36 | | use nativelink_proto::google::longrunning::Operation; |
37 | | use nativelink_proto::google::rpc::Status; |
38 | | use rand::RngCore; |
39 | | use tokio::sync::mpsc; |
40 | | use tonic::{Response, Status as TonicStatus, Streaming}; |
41 | | use uuid::Uuid; |
42 | | |
43 | | use crate::make_symbol; |
44 | | use crate::origin_context::ActiveOriginContext; |
45 | | |
46 | | const ORIGIN_EVENT_VERSION: u32 = 0; |
47 | | |
48 | | static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new(); |
49 | | |
50 | | /// Returns a unique ID for the given event. |
51 | | /// This ID is used to identify the event type. |
52 | | /// The max value that could be output is 0x0FFF, |
53 | | /// meaning you may use the first nibble for other |
54 | | /// purposes. |
55 | | #[inline] |
56 | 30 | pub fn get_id_for_event(event: &Event) -> [u8; 2] { |
57 | 29 | match &event.event { |
58 | 1 | None => [0x00, 0x00], |
59 | 13 | Some(event::Event::Request(req)) => match req.event12 { |
60 | 1 | None => [0x01, 0x00], |
61 | 1 | Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01], |
62 | 1 | Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02], |
63 | 1 | Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03], |
64 | 1 | Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04], |
65 | 1 | Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05], |
66 | 1 | Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06], |
67 | 1 | Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07], |
68 | 1 | Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08], |
69 | 1 | Some(request_event::Event::WriteRequest(())) => [0x01, 0x09], |
70 | 1 | Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A], |
71 | 1 | Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B], |
72 | 1 | Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C], |
73 | | }, |
74 | 10 | Some(event::Event::Response(res)) => match res.event9 { |
75 | 1 | None => [0x02, 0x00], |
76 | 1 | Some(response_event::Event::Error(_)) => [0x02, 0x01], |
77 | 1 | Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02], |
78 | 1 | Some(response_event::Event::ActionResult(_)) => [0x02, 0x03], |
79 | 1 | Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04], |
80 | 1 | Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05], |
81 | 1 | Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06], |
82 | 1 | Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07], |
83 | 1 | Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08], |
84 | 1 | Some(response_event::Event::Empty(())) => [0x02, 0x09], |
85 | | }, |
86 | 6 | Some(event::Event::Stream(stream)) => match stream.event5 { |
87 | 1 | None => [0x03, 0x00], |
88 | 1 | Some(stream_event::Event::Error(_)) => [0x03, 0x01], |
89 | 1 | Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02], |
90 | 1 | Some(stream_event::Event::DataLength(_)) => [0x03, 0x03], |
91 | 1 | Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04], |
92 | 1 | Some(stream_event::Event::Operation(_)) => [0x03, 0x05], |
93 | | }, |
94 | | } |
95 | 30 | } |
96 | | |
97 | | /// Returns a unique node ID for this process. |
98 | 0 | pub fn get_node_id(event: Option<&Event>) -> [u8; 6] { |
99 | 0 | let mut node_id = *NODE_ID.get_or_init(|| { |
100 | 0 | let mut rng = rand::thread_rng(); |
101 | 0 | let mut out = [0; 6]; |
102 | 0 | rng.fill_bytes(&mut out); |
103 | 0 | out |
104 | 0 | }); |
105 | 0 | let Some(event) = event else { Branch (105:9): [True: 0, False: 0]
Branch (105:9): [Folded - Ignored]
|
106 | 0 | return node_id; |
107 | | }; |
108 | 0 | let event_id = get_id_for_event(event); |
109 | 0 | node_id[0] = (node_id[0] & 0xF0) | event_id[0]; |
110 | 0 | node_id[1] = event_id[1]; |
111 | 0 | node_id |
112 | 0 | } |
113 | | |
114 | | pub struct OriginEventCollector { |
115 | | sender: mpsc::Sender<OriginEvent>, |
116 | | identity: String, |
117 | | bazel_metadata: Option<RequestMetadata>, |
118 | | } |
119 | | |
120 | | impl OriginEventCollector { |
121 | 0 | pub fn new( |
122 | 0 | sender: mpsc::Sender<OriginEvent>, |
123 | 0 | identity: String, |
124 | 0 | bazel_metadata: Option<RequestMetadata>, |
125 | 0 | ) -> Self { |
126 | 0 | Self { |
127 | 0 | sender, |
128 | 0 | identity, |
129 | 0 | bazel_metadata, |
130 | 0 | } |
131 | 0 | } |
132 | | |
133 | 0 | async fn publish_origin_event(&self, event: Event, parent_event_id: Option<Uuid>) -> Uuid { |
134 | 0 | let event_id = Uuid::now_v6(&get_node_id(Some(&event))); |
135 | 0 | let parent_event_id = |
136 | 0 | parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string()); |
137 | 0 | // Failing to send this event means that the receiver has been dropped. |
138 | 0 | let _ = self |
139 | 0 | .sender |
140 | 0 | .send(OriginEvent { |
141 | 0 | version: ORIGIN_EVENT_VERSION, |
142 | 0 | event_id: event_id.as_hyphenated().to_string(), |
143 | 0 | parent_event_id, |
144 | 0 | bazel_request_metadata: self.bazel_metadata.clone(), |
145 | 0 | identity: self.identity.clone(), |
146 | 0 | event: Some(event), |
147 | 0 | }) |
148 | 0 | .await; |
149 | 0 | event_id |
150 | 0 | } |
151 | | } |
152 | | |
153 | | make_symbol!(ORIGIN_EVENT_COLLECTOR, OriginEventCollector); |
154 | | |
155 | | pub struct OriginEventContext<T> { |
156 | | inner: Option<OriginEventContextImpl>, |
157 | | _phantom: PhantomData<T>, |
158 | | } |
159 | | |
160 | | impl<T> Clone for OriginEventContext<T> { |
161 | 0 | fn clone(&self) -> Self { |
162 | 0 | Self { |
163 | 0 | inner: self.inner.clone(), |
164 | 0 | _phantom: PhantomData, |
165 | 0 | } |
166 | 0 | } |
167 | | } |
168 | | |
169 | | impl OriginEventContext<()> { |
170 | 38 | pub fn new<'a, T, U>( |
171 | 38 | source_cb: impl Fn() -> &'a T, |
172 | 38 | ) -> impl Future<Output = OriginEventContext<T>> + 'static |
173 | 38 | where |
174 | 38 | T: OriginEventSource<U> + 'static, |
175 | 38 | { |
176 | 0 | let Ok(Some(origin_event_collector)) = Branch (176:13): [True: 0, False: 15]
Branch (176:13): [True: 0, False: 3]
Branch (176:13): [True: 0, False: 3]
Branch (176:13): [True: 0, False: 0]
Branch (176:13): [True: 0, False: 6]
Branch (176:13): [True: 0, False: 0]
Branch (176:13): [True: 0, False: 1]
Branch (176:13): [True: 0, False: 3]
Branch (176:13): [True: 0, False: 0]
Branch (176:13): [True: 0, False: 2]
Branch (176:13): [True: 0, False: 4]
Branch (176:13): [True: 0, False: 1]
Branch (176:13): [Folded - Ignored]
Branch (176:13): [Folded - Ignored]
|
177 | 38 | ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR) |
178 | | else { |
179 | 38 | return ready(OriginEventContext { |
180 | 38 | inner: None, |
181 | 38 | _phantom: PhantomData, |
182 | 38 | }) |
183 | 38 | .left_future(); |
184 | | }; |
185 | 0 | let event = source_cb().as_event(); |
186 | 0 | async move { |
187 | 0 | let parent_event_id = origin_event_collector |
188 | 0 | .publish_origin_event(event, None) |
189 | 0 | .await; |
190 | 0 | OriginEventContext { |
191 | 0 | inner: Some(OriginEventContextImpl { |
192 | 0 | origin_event_collector, |
193 | 0 | parent_event_id, |
194 | 0 | }), |
195 | 0 | _phantom: PhantomData, |
196 | 0 | } |
197 | 0 | } |
198 | 0 | .right_future() |
199 | 38 | } |
200 | | } |
201 | | impl<U> OriginEventContext<U> { |
202 | 37 | pub fn emit<'a, T, F>( |
203 | 37 | &self, |
204 | 37 | event_cb: F, |
205 | 37 | ) -> impl Future<Output = ()> + Send + use<'_, 'a, T, F, U> |
206 | 37 | where |
207 | 37 | T: OriginEventSource<U> + 'a, |
208 | 37 | F: Fn() -> &'a T, |
209 | 37 | { |
210 | 37 | let Some(inner0 ) = &self.inner else { Branch (210:13): [True: 0, False: 0]
Branch (210:13): [True: 0, False: 0]
Branch (210:13): [True: 0, False: 14]
Branch (210:13): [True: 0, False: 0]
Branch (210:13): [True: 0, False: 3]
Branch (210:13): [True: 0, False: 0]
Branch (210:13): [True: 0, False: 0]
Branch (210:13): [True: 0, False: 3]
Branch (210:13): [True: 0, False: 0]
Branch (210:13): [True: 0, False: 0]
Branch (210:13): [True: 0, False: 6]
Branch (210:13): [True: 0, False: 0]
Branch (210:13): [True: 0, False: 0]
Branch (210:13): [True: 0, False: 1]
Branch (210:13): [True: 0, False: 3]
Branch (210:13): [True: 0, False: 0]
Branch (210:13): [True: 0, False: 2]
Branch (210:13): [True: 0, False: 4]
Branch (210:13): [True: 0, False: 1]
Branch (210:13): [Folded - Ignored]
Branch (210:13): [Folded - Ignored]
|
211 | 37 | return ready(()).left_future(); |
212 | | }; |
213 | 0 | let v = (event_cb)(); |
214 | 0 | v.publish(inner).right_future() |
215 | 37 | } |
216 | | |
217 | 24 | pub fn wrap_stream<'a, O, S>(&self, stream: S) -> Pin<Box<dyn Stream<Item = O> + Send + 'a>> |
218 | 24 | where |
219 | 24 | U: Send + 'a, |
220 | 24 | O: OriginEventSource<U> + Send + 'a, |
221 | 24 | S: Stream<Item = O> + Send + 'a, |
222 | 24 | { |
223 | 24 | if self.inner.is_none() { Branch (223:12): [True: 15, False: 0]
Branch (223:12): [True: 3, False: 0]
Branch (223:12): [True: 0, False: 0]
Branch (223:12): [True: 0, False: 0]
Branch (223:12): [True: 6, False: 0]
Branch (223:12): [True: 0, False: 0]
Branch (223:12): [Folded - Ignored]
Branch (223:12): [Folded - Ignored]
|
224 | 24 | return Box::pin(stream); |
225 | 0 | } |
226 | 0 | let ctx = self.clone(); |
227 | 0 | Box::pin(stream.then(move |item| { |
228 | 0 | let ctx = ctx.clone(); |
229 | 0 | async move { |
230 | 0 | ctx.emit(|| &item).await; |
231 | 0 | item |
232 | 0 | } |
233 | 0 | })) |
234 | 24 | } |
235 | | } |
236 | | |
237 | | #[derive(Clone)] |
238 | | pub struct OriginEventContextImpl { |
239 | | origin_event_collector: Arc<OriginEventCollector>, |
240 | | parent_event_id: Uuid, |
241 | | } |
242 | | |
243 | | pub trait OriginEventSource<Source>: Sized { |
244 | | fn as_event(&self) -> Event; |
245 | | |
246 | 0 | fn publish<'a>(&self, ctx: &'a OriginEventContextImpl) -> impl Future<Output = ()> + Send + 'a { |
247 | 0 | let event = self.as_event(); |
248 | 0 | ctx.origin_event_collector |
249 | 0 | .publish_origin_event(event, Some(ctx.parent_event_id)) |
250 | 0 | // We don't need the Uuid here. |
251 | 0 | .map(|_| ()) |
252 | 0 | } |
253 | | } |
254 | | |
255 | | impl<'a, T, U> OriginEventSource<U> for &'a Response<T> |
256 | | where |
257 | | T: OriginEventSource<U> + 'a, |
258 | | { |
259 | | #[inline] |
260 | 0 | fn as_event(&self) -> Event { |
261 | 0 | self.get_ref().as_event() |
262 | 0 | } |
263 | | } |
264 | | |
265 | | impl<T, U> OriginEventSource<U> for Result<Response<T>, TonicStatus> |
266 | | where |
267 | | T: OriginEventSource<U>, |
268 | | TonicStatus: OriginEventSource<U>, |
269 | | { |
270 | | #[inline] |
271 | 0 | fn as_event(&self) -> Event { |
272 | 0 | match self { |
273 | 0 | Ok(v) => v.as_event(), |
274 | 0 | Err(v) => v.as_event(), |
275 | | } |
276 | 0 | } |
277 | | } |
278 | | |
279 | 0 | fn tonic_status_to_proto_status(tonic_status: &TonicStatus) -> Status { |
280 | 0 | Status { |
281 | 0 | code: tonic_status.code().into(), |
282 | 0 | message: tonic_status.message().into(), |
283 | 0 | details: Vec::new(), |
284 | 0 | } |
285 | 0 | } |
286 | | |
287 | | macro_rules! get_event_type { |
288 | | (Request, $variant:ident, $data:expr) => { |
289 | | event::Event::Request(RequestEvent { |
290 | | event: Some(request_event::Event::$variant($data)), |
291 | | }) |
292 | | }; |
293 | | (Response, $variant:ident, $data:expr) => { |
294 | | event::Event::Response(ResponseEvent { |
295 | | event: Some(response_event::Event::$variant($data)), |
296 | | }) |
297 | | }; |
298 | | (Stream, $variant:ident, $data:expr) => { |
299 | | event::Event::Stream(StreamEvent { |
300 | | event: Some(stream_event::Event::$variant($data)), |
301 | | }) |
302 | | }; |
303 | | } |
304 | | |
305 | | macro_rules! impl_as_event { |
306 | | (Stream, $origin:ident, $type:ident) => { |
307 | | impl_as_event! {Stream, $origin, $type, $type, { |
308 | | #[inline] |
309 | 0 | |val: &$type| val.clone() |
310 | | }} |
311 | | }; |
312 | | (Stream, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => { |
313 | | impl_as_event! {__inner, Stream, $origin, Result<$type, TonicStatus>, $variant, { |
314 | | #[inline] |
315 | 0 | |val: &Result<$type, TonicStatus>| match val { |
316 | 0 | Ok(v) => get_event_type!(Stream, $variant, $data_fn(v)), |
317 | 0 | Err(e) => get_event_type!(Stream, Error, tonic_status_to_proto_status(e)), |
318 | 0 | } |
319 | | }} |
320 | | }; |
321 | | ($event_type:ident, $origin:ty, $type:ident) => { |
322 | | impl_as_event!(__inner, $event_type, $origin, $type, $type, { |
323 | | #[inline] |
324 | 0 | |val: &$type| get_event_type!($event_type, $type, val.clone()) |
325 | | }); |
326 | | }; |
327 | | ($event_type:ident, $origin:ty, $type:ty, $variant:ident) => { |
328 | | impl_as_event!(__inner, $event_type, $origin, $type, $variant, { |
329 | | #[inline] |
330 | | |val: &$type| get_event_type!($event_type, $variant, val.clone()) |
331 | | }); |
332 | | }; |
333 | | ($event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => { |
334 | | impl_as_event! {__inner, $event_type, $origin, $type, $variant, $data_fn} |
335 | | }; |
336 | | (__inner, $event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => { |
337 | | impl OriginEventSource<$origin> for $type { |
338 | | #[inline] |
339 | 0 | fn as_event(&self) -> Event { |
340 | 0 | Event { |
341 | 0 | event: Some($data_fn(self)), |
342 | 0 | } |
343 | 0 | } |
344 | | } |
345 | | }; |
346 | | } |
347 | | |
348 | | impl<Origin> OriginEventSource<Origin> for TonicStatus { |
349 | | #[inline] |
350 | 0 | fn as_event(&self) -> Event { |
351 | 0 | Event { |
352 | 0 | event: Some(get_event_type!( |
353 | 0 | Response, |
354 | 0 | Error, |
355 | 0 | tonic_status_to_proto_status(self) |
356 | 0 | )), |
357 | 0 | } |
358 | 0 | } |
359 | | } |
360 | | |
361 | | #[inline] |
362 | 0 | fn to_batch_update_blobs_request_override(val: &BatchUpdateBlobsRequest) -> event::Event { |
363 | 0 | get_event_type!( |
364 | 0 | Request, |
365 | 0 | BatchUpdateBlobsRequest, |
366 | 0 | BatchUpdateBlobsRequestOverride { |
367 | 0 | instance_name: val.instance_name.clone(), |
368 | 0 | requests: val |
369 | 0 | .requests |
370 | 0 | .iter() |
371 | 0 | .map(|v| batch_update_blobs_request_override::Request { |
372 | 0 | digest: v.digest.clone(), |
373 | 0 | compressor: v.compressor, |
374 | 0 | data_len: u64::try_from(v.data.len()).unwrap_or_default(), |
375 | 0 | }) |
376 | 0 | .collect(), |
377 | 0 | digest_function: val.digest_function, |
378 | 0 | } |
379 | 0 | ) |
380 | 0 | } |
381 | | |
382 | | #[inline] |
383 | 0 | fn to_batch_read_blobs_response_override(val: &BatchReadBlobsResponse) -> event::Event { |
384 | 0 | get_event_type!( |
385 | 0 | Response, |
386 | 0 | BatchReadBlobsResponse, |
387 | 0 | BatchReadBlobsResponseOverride { |
388 | 0 | responses: val |
389 | 0 | .responses |
390 | 0 | .iter() |
391 | 0 | .map(|v| batch_read_blobs_response_override::Response { |
392 | 0 | digest: v.digest.clone(), |
393 | 0 | compressor: v.compressor, |
394 | 0 | status: v.status.clone(), |
395 | 0 | data_len: u64::try_from(v.data.len()).unwrap_or_default(), |
396 | 0 | }) |
397 | 0 | .collect(), |
398 | 0 | } |
399 | 0 | ) |
400 | 0 | } |
401 | | |
402 | | #[inline] |
403 | 0 | fn to_empty<T>(_: T) -> event::Event { |
404 | 0 | get_event_type!(Response, Empty, ()) |
405 | 0 | } |
406 | | |
407 | | // -- Requests -- |
408 | | |
409 | | impl_as_event! {Request, (), GetCapabilitiesRequest} |
410 | | impl_as_event! {Request, (), GetActionResultRequest} |
411 | | impl_as_event! {Request, (), UpdateActionResultRequest} |
412 | | impl_as_event! {Request, (), FindMissingBlobsRequest} |
413 | | impl_as_event! {Request, (), BatchReadBlobsRequest} |
414 | | impl_as_event! {Request, (), BatchUpdateBlobsRequest, BatchUpdateBlobsRequest, to_batch_update_blobs_request_override} |
415 | | impl_as_event! {Request, (), GetTreeRequest} |
416 | | impl_as_event! {Request, (), ReadRequest} |
417 | | impl_as_event! {Request, (), Streaming<WriteRequest>, WriteRequest, to_empty} |
418 | | impl_as_event! {Request, (), QueryWriteStatusRequest} |
419 | | impl_as_event! {Request, (), ExecuteRequest} |
420 | | impl_as_event! {Request, (), WaitExecutionRequest} |
421 | | |
422 | | // -- Responses -- |
423 | | |
424 | | impl_as_event! {Response, GetCapabilitiesRequest, ServerCapabilities} |
425 | | impl_as_event! {Response, GetActionResultRequest, ActionResult} |
426 | | impl_as_event! {Response, UpdateActionResultRequest, ActionResult} |
427 | | impl_as_event! {Response, Streaming<WriteRequest>, WriteResponse} |
428 | | impl_as_event! {Response, ReadRequest, Pin<Box<dyn Stream<Item = Result<ReadResponse, TonicStatus>> + Send + '_>>, Empty, to_empty} |
429 | | impl_as_event! {Response, QueryWriteStatusRequest, QueryWriteStatusResponse} |
430 | | impl_as_event! {Response, FindMissingBlobsRequest, FindMissingBlobsResponse} |
431 | | impl_as_event! {Response, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse} |
432 | | impl_as_event! {Response, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchReadBlobsResponseOverride, to_batch_read_blobs_response_override} |
433 | | impl_as_event! {Response, GetTreeRequest, Pin<Box<dyn Stream<Item = Result<GetTreeResponse, TonicStatus>> + Send + '_>>, Empty, to_empty} |
434 | | impl_as_event! {Response, ExecuteRequest, Pin<Box<dyn Stream<Item = Result<Operation, TonicStatus>> + Send + '_>>, Empty, to_empty} |
435 | | |
436 | | // -- Streams -- |
437 | | |
438 | | impl_as_event! {Stream, ReadRequest, ReadResponse, DataLength, { |
439 | 0 | |val: &ReadResponse| val.data.len() as u64 |
440 | | }} |
441 | | impl_as_event! {Stream, Streaming<WriteRequest>, WriteRequest, WriteRequest, { |
442 | 0 | |val: &WriteRequest| WriteRequestOverride { |
443 | 0 | resource_name: val.resource_name.clone(), |
444 | 0 | write_offset: val.write_offset, |
445 | 0 | finish_write: val.finish_write, |
446 | 0 | data_len: u64::try_from(val.data.len()).unwrap_or_default(), |
447 | 0 | } |
448 | | }} |
449 | | impl_as_event! {Stream, GetTreeRequest, GetTreeResponse} |
450 | | impl_as_event! {Stream, ExecuteRequest, Operation} |
451 | | impl_as_event! {Stream, WaitExecutionRequest, Operation} |