Coverage Report

Created: 2025-01-30 02:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/origin_event.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::marker::PhantomData;
16
use std::pin::Pin;
17
use std::sync::{Arc, OnceLock};
18
19
use futures::future::ready;
20
use futures::task::{Context, Poll};
21
use futures::{Future, FutureExt, Stream, StreamExt};
22
use nativelink_proto::build::bazel::remote::execution::v2::{
23
    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
24
    BatchUpdateBlobsResponse, ExecuteRequest, FindMissingBlobsRequest, FindMissingBlobsResponse,
25
    GetActionResultRequest, GetCapabilitiesRequest, GetTreeRequest, GetTreeResponse,
26
    RequestMetadata, ServerCapabilities, UpdateActionResultRequest, WaitExecutionRequest,
27
};
28
use nativelink_proto::com::github::trace_machina::nativelink::events::{
29
    batch_read_blobs_response_override, batch_update_blobs_request_override, event, request_event,
30
    response_event, stream_event, BatchReadBlobsResponseOverride, BatchUpdateBlobsRequestOverride,
31
    Event, OriginEvent, RequestEvent, ResponseEvent, StreamEvent, WriteRequestOverride,
32
};
33
use nativelink_proto::google::bytestream::{
34
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
35
    WriteResponse,
36
};
37
use nativelink_proto::google::longrunning::Operation;
38
use nativelink_proto::google::rpc::Status;
39
use pin_project_lite::pin_project;
40
use rand::RngCore;
41
use tokio::sync::mpsc;
42
use tokio::sync::mpsc::error::TrySendError;
43
use tonic::{Response, Status as TonicStatus, Streaming};
44
use uuid::Uuid;
45
46
use crate::origin_context::ActiveOriginContext;
47
use crate::{background_spawn, make_symbol};
48
49
const ORIGIN_EVENT_VERSION: u32 = 0;
50
51
static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new();
52
53
/// Returns a unique ID for the given event.
54
/// This ID is used to identify the event type.
55
/// The max value that could be output is 0x0FFF,
56
/// meaning you may use the first nibble for other
57
/// purposes.
58
#[inline]
59
31
pub fn get_id_for_event(event: &Event) -> [u8; 2] {
60
30
    match &event.event {
61
1
        None => [0x00, 0x00],
62
13
        Some(event::Event::Request(req)) => match 
req.event12
{
63
1
            None => [0x01, 0x00],
64
1
            Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01],
65
1
            Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02],
66
1
            Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03],
67
1
            Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04],
68
1
            Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05],
69
1
            Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06],
70
1
            Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07],
71
1
            Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08],
72
1
            Some(request_event::Event::WriteRequest(())) => [0x01, 0x09],
73
1
            Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A],
74
1
            Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B],
75
1
            Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C],
76
        },
77
10
        Some(event::Event::Response(res)) => match 
res.event9
{
78
1
            None => [0x02, 0x00],
79
1
            Some(response_event::Event::Error(_)) => [0x02, 0x01],
80
1
            Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02],
81
1
            Some(response_event::Event::ActionResult(_)) => [0x02, 0x03],
82
1
            Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04],
83
1
            Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05],
84
1
            Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06],
85
1
            Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07],
86
1
            Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08],
87
1
            Some(response_event::Event::Empty(())) => [0x02, 0x09],
88
        },
89
7
        Some(event::Event::Stream(stream)) => match 
stream.event6
{
90
1
            None => [0x03, 0x00],
91
1
            Some(stream_event::Event::Error(_)) => [0x03, 0x01],
92
1
            Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02],
93
1
            Some(stream_event::Event::DataLength(_)) => [0x03, 0x03],
94
1
            Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04],
95
1
            Some(stream_event::Event::Operation(_)) => [0x03, 0x05],
96
1
            Some(stream_event::Event::Closed(())) => [0x03, 0x06], // Special case when stream has terminated.
97
        },
98
    }
99
31
}
100
101
/// Returns a unique node ID for this process.
102
0
pub fn get_node_id(event: Option<&Event>) -> [u8; 6] {
103
0
    let mut node_id = *NODE_ID.get_or_init(|| {
104
0
        let mut rng = rand::thread_rng();
105
0
        let mut out = [0; 6];
106
0
        rng.fill_bytes(&mut out);
107
0
        out
108
0
    });
109
0
    let Some(event) = event else {
  Branch (109:9): [True: 0, False: 0]
  Branch (109:9): [Folded - Ignored]
110
0
        return node_id;
111
    };
112
0
    let event_id = get_id_for_event(event);
113
0
    node_id[0] = (node_id[0] & 0xF0) | event_id[0];
114
0
    node_id[1] = event_id[1];
115
0
    node_id
116
0
}
117
118
pub struct OriginEventCollector {
119
    sender: mpsc::Sender<OriginEvent>,
120
    identity: String,
121
    bazel_metadata: Option<RequestMetadata>,
122
}
123
124
impl OriginEventCollector {
125
0
    pub fn new(
126
0
        sender: mpsc::Sender<OriginEvent>,
127
0
        identity: String,
128
0
        bazel_metadata: Option<RequestMetadata>,
129
0
    ) -> Self {
130
0
        Self {
131
0
            sender,
132
0
            identity,
133
0
            bazel_metadata,
134
0
        }
135
0
    }
136
137
    /// Publishes an event to the origin event collector.
138
0
    async fn publish_origin_event(
139
0
        &self,
140
0
        event: Event,
141
0
        parent_event_id: Option<Uuid>,
142
0
        maybe_event_id: Option<Uuid>,
143
0
    ) -> Uuid {
144
0
        let event_id = maybe_event_id.unwrap_or_else(|| Uuid::now_v6(&get_node_id(Some(&event))));
145
0
        let parent_event_id =
146
0
            parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string());
147
0
        // Ignore cases when channel is dropped.
148
0
        let _ = self
149
0
            .sender
150
0
            .send(OriginEvent {
151
0
                version: ORIGIN_EVENT_VERSION,
152
0
                event_id: event_id.as_hyphenated().to_string(),
153
0
                parent_event_id,
154
0
                bazel_request_metadata: self.bazel_metadata.clone(),
155
0
                identity: self.identity.clone(),
156
0
                event: Some(event),
157
0
            })
158
0
            .await;
159
0
        event_id
160
0
    }
161
162
    /// Publishes an event to the origin event collector.
163
    /// If the buffer is full, the event will be sent in a background spawn.
164
    /// This is useful for cases where the event is critical and must be sent,
165
    /// but cannot await the send operation.
166
0
    fn publish_origin_event_now_or_in_spawn(&self, event: Event, parent_event_id: Option<Uuid>) {
167
0
        let event_id = Uuid::now_v6(&get_node_id(Some(&event)));
168
0
        let parent_event_id =
169
0
            parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string());
170
0
        self.sender
171
0
            .try_send(OriginEvent {
172
0
                version: ORIGIN_EVENT_VERSION,
173
0
                event_id: event_id.as_hyphenated().to_string(),
174
0
                parent_event_id,
175
0
                bazel_request_metadata: self.bazel_metadata.clone(),
176
0
                identity: self.identity.clone(),
177
0
                event: Some(event),
178
0
            })
179
0
            .map_or_else(
180
0
                |e| match e {
181
0
                    TrySendError::Full(event) => {
182
0
                        let sender = self.sender.clone();
183
0
                        background_spawn!("send_end_stream_origin_event", async move {
184
0
                            // Ignore cases when channel is dropped.
185
0
                            let _ = sender.send(event).await;
186
0
                        });
187
                    }
188
                    // Ignore cases when channel is dropped.
189
0
                    TrySendError::Closed(_) => {}
190
0
                },
191
0
                |()| {},
192
0
            );
193
0
    }
194
}
195
196
pin_project! {
197
    struct CloseEventStream<S> {
198
        #[pin]
199
        inner: S,
200
        ctx_impl: OriginEventContextImpl,
201
    }
202
203
    impl <S> PinnedDrop for CloseEventStream<S> {
204
        #[inline]
205
        fn drop(this: Pin<&mut Self>) {
206
            let event = Event {
207
                event: Some(event::Event::Stream(StreamEvent {
208
                    event: Some(stream_event::Event::Closed(()))
209
                })),
210
            };
211
            // Try to send the event immediately, if we cannot because
212
            // the buffer is full, do it in a background spawn.
213
            this.ctx_impl.origin_event_collector
214
                .publish_origin_event_now_or_in_spawn(event, Some(this.ctx_impl.parent_event_id));
215
        }
216
    }
217
}
218
219
impl<T, S> Stream for CloseEventStream<S>
220
where
221
    S: Stream<Item = T> + Send,
222
{
223
    type Item = S::Item;
224
225
0
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
226
0
        let project = self.project();
227
0
        project.inner.poll_next(cx)
228
0
    }
229
230
0
    fn size_hint(&self) -> (usize, Option<usize>) {
231
0
        self.inner.size_hint()
232
0
    }
233
}
234
235
make_symbol!(ORIGIN_EVENT_COLLECTOR, OriginEventCollector);
236
237
pub struct OriginEventContext<T> {
238
    inner: Option<OriginEventContextImpl>,
239
    _phantom: PhantomData<T>,
240
}
241
242
impl<T> Clone for OriginEventContext<T> {
243
0
    fn clone(&self) -> Self {
244
0
        Self {
245
0
            inner: self.inner.clone(),
246
0
            _phantom: PhantomData,
247
0
        }
248
0
    }
249
}
250
251
impl OriginEventContext<()> {
252
38
    pub fn new<'a, T, U>(
253
38
        source_cb: impl Fn() -> &'a T,
254
38
    ) -> impl Future<Output = OriginEventContext<T>> + 'static
255
38
    where
256
38
        T: OriginEventSource<U> + 'static,
257
38
    {
258
0
        let Ok(Some(origin_event_collector)) =
  Branch (258:13): [True: 0, False: 15]
  Branch (258:13): [True: 0, False: 3]
  Branch (258:13): [True: 0, False: 3]
  Branch (258:13): [True: 0, False: 0]
  Branch (258:13): [True: 0, False: 6]
  Branch (258:13): [True: 0, False: 0]
  Branch (258:13): [True: 0, False: 1]
  Branch (258:13): [True: 0, False: 3]
  Branch (258:13): [True: 0, False: 0]
  Branch (258:13): [True: 0, False: 2]
  Branch (258:13): [True: 0, False: 4]
  Branch (258:13): [True: 0, False: 1]
  Branch (258:13): [Folded - Ignored]
  Branch (258:13): [Folded - Ignored]
259
38
            ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
260
        else {
261
38
            return ready(OriginEventContext {
262
38
                inner: None,
263
38
                _phantom: PhantomData,
264
38
            })
265
38
            .left_future();
266
        };
267
0
        let event = source_cb().as_event();
268
0
        async move {
269
0
            let parent_event_id = origin_event_collector
270
0
                .publish_origin_event(event, None, None)
271
0
                .await;
272
0
            OriginEventContext {
273
0
                inner: Some(OriginEventContextImpl {
274
0
                    origin_event_collector,
275
0
                    parent_event_id,
276
0
                }),
277
0
                _phantom: PhantomData,
278
0
            }
279
0
        }
280
0
        .right_future()
281
38
    }
282
}
283
impl<U> OriginEventContext<U> {
284
37
    pub fn emit<'a, T, F>(
285
37
        &self,
286
37
        event_cb: F,
287
37
    ) -> impl Future<Output = ()> + Send + use<'_, 'a, T, F, U>
288
37
    where
289
37
        T: OriginEventSource<U> + 'a,
290
37
        F: Fn() -> &'a T,
291
37
    {
292
37
        let Some(
inner0
) = &self.inner else {
  Branch (292:13): [True: 0, False: 0]
  Branch (292:13): [True: 0, False: 0]
  Branch (292:13): [True: 0, False: 14]
  Branch (292:13): [True: 0, False: 0]
  Branch (292:13): [True: 0, False: 3]
  Branch (292:13): [True: 0, False: 0]
  Branch (292:13): [True: 0, False: 0]
  Branch (292:13): [True: 0, False: 3]
  Branch (292:13): [True: 0, False: 0]
  Branch (292:13): [True: 0, False: 0]
  Branch (292:13): [True: 0, False: 6]
  Branch (292:13): [True: 0, False: 0]
  Branch (292:13): [True: 0, False: 0]
  Branch (292:13): [True: 0, False: 1]
  Branch (292:13): [True: 0, False: 3]
  Branch (292:13): [True: 0, False: 0]
  Branch (292:13): [True: 0, False: 2]
  Branch (292:13): [True: 0, False: 4]
  Branch (292:13): [True: 0, False: 1]
  Branch (292:13): [Folded - Ignored]
  Branch (292:13): [Folded - Ignored]
293
37
            return ready(()).left_future();
294
        };
295
0
        let v = (event_cb)();
296
0
        v.publish(inner).right_future()
297
37
    }
298
299
24
    pub fn wrap_stream<'a, O, S>(&self, stream: S) -> Pin<Box<dyn Stream<Item = O> + Send + 'a>>
300
24
    where
301
24
        U: Send + 'a,
302
24
        O: OriginEventSource<U> + Send + 'a,
303
24
        S: Stream<Item = O> + Send + 'a,
304
24
    {
305
24
        let Some(
ctx_impl0
) = self.inner.clone() else {
  Branch (305:13): [True: 0, False: 15]
  Branch (305:13): [True: 0, False: 3]
  Branch (305:13): [True: 0, False: 0]
  Branch (305:13): [True: 0, False: 0]
  Branch (305:13): [True: 0, False: 6]
  Branch (305:13): [True: 0, False: 0]
  Branch (305:13): [Folded - Ignored]
  Branch (305:13): [Folded - Ignored]
306
24
            return Box::pin(stream);
307
        };
308
0
        let ctx = self.clone();
309
0
        Box::pin(CloseEventStream {
310
0
            inner: stream.then(move |item| {
311
0
                let ctx = ctx.clone();
312
0
                async move {
313
0
                    ctx.emit(|| &item).await;
314
0
                    item
315
0
                }
316
0
            }),
317
0
            ctx_impl,
318
0
        })
319
24
    }
320
}
321
322
#[derive(Clone)]
323
pub struct OriginEventContextImpl {
324
    origin_event_collector: Arc<OriginEventCollector>,
325
    parent_event_id: Uuid,
326
}
327
328
pub trait OriginEventSource<Source>: Sized {
329
    fn as_event(&self) -> Event;
330
331
0
    fn publish<'a>(&self, ctx: &'a OriginEventContextImpl) -> impl Future<Output = ()> + Send + 'a {
332
0
        let event = self.as_event();
333
0
        ctx.origin_event_collector
334
0
            .publish_origin_event(event, Some(ctx.parent_event_id), None)
335
0
            // We don't need the Uuid here.
336
0
            .map(|_| ())
337
0
    }
338
}
339
340
impl<'a, T, U> OriginEventSource<U> for &'a Response<T>
341
where
342
    T: OriginEventSource<U> + 'a,
343
{
344
    #[inline]
345
0
    fn as_event(&self) -> Event {
346
0
        self.get_ref().as_event()
347
0
    }
348
}
349
350
impl<T, U> OriginEventSource<U> for Result<Response<T>, TonicStatus>
351
where
352
    T: OriginEventSource<U>,
353
    TonicStatus: OriginEventSource<U>,
354
{
355
    #[inline]
356
0
    fn as_event(&self) -> Event {
357
0
        match self {
358
0
            Ok(v) => v.as_event(),
359
0
            Err(v) => v.as_event(),
360
        }
361
0
    }
362
}
363
364
0
fn tonic_status_to_proto_status(tonic_status: &TonicStatus) -> Status {
365
0
    Status {
366
0
        code: tonic_status.code().into(),
367
0
        message: tonic_status.message().into(),
368
0
        details: Vec::new(),
369
0
    }
370
0
}
371
372
macro_rules! get_event_type {
373
    (Request, $variant:ident, $data:expr) => {
374
        event::Event::Request(RequestEvent {
375
            event: Some(request_event::Event::$variant($data)),
376
        })
377
    };
378
    (Response, $variant:ident, $data:expr) => {
379
        event::Event::Response(ResponseEvent {
380
            event: Some(response_event::Event::$variant($data)),
381
        })
382
    };
383
    (Stream, $variant:ident, $data:expr) => {
384
        event::Event::Stream(StreamEvent {
385
            event: Some(stream_event::Event::$variant($data)),
386
        })
387
    };
388
}
389
390
macro_rules! impl_as_event {
391
    (Stream, $origin:ident, $type:ident) => {
392
        impl_as_event! {Stream, $origin, $type, $type, {
393
            #[inline]
394
0
            |val: &$type| val.clone()
395
        }}
396
    };
397
    (Stream, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => {
398
        impl_as_event! {__inner, Stream, $origin, Result<$type, TonicStatus>, $variant, {
399
            #[inline]
400
0
            |val: &Result<$type, TonicStatus>| match val {
401
0
                Ok(v) => get_event_type!(Stream, $variant, $data_fn(v)),
402
0
                Err(e) => get_event_type!(Stream, Error, tonic_status_to_proto_status(e)),
403
0
            }
404
        }}
405
    };
406
    ($event_type:ident, $origin:ty, $type:ident) => {
407
        impl_as_event!(__inner, $event_type, $origin, $type, $type, {
408
            #[inline]
409
0
            |val: &$type| get_event_type!($event_type, $type, val.clone())
410
        });
411
    };
412
    ($event_type:ident, $origin:ty, $type:ty, $variant:ident) => {
413
        impl_as_event!(__inner, $event_type, $origin, $type, $variant, {
414
            #[inline]
415
            |val: &$type| get_event_type!($event_type, $variant, val.clone())
416
        });
417
    };
418
    ($event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => {
419
        impl_as_event! {__inner, $event_type, $origin, $type, $variant, $data_fn}
420
    };
421
    (__inner, $event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => {
422
        impl OriginEventSource<$origin> for $type {
423
            #[inline]
424
0
            fn as_event(&self) -> Event {
425
0
                Event {
426
0
                    event: Some($data_fn(self)),
427
0
                }
428
0
            }
429
        }
430
    };
431
}
432
433
impl<Origin> OriginEventSource<Origin> for TonicStatus {
434
    #[inline]
435
0
    fn as_event(&self) -> Event {
436
0
        Event {
437
0
            event: Some(get_event_type!(
438
0
                Response,
439
0
                Error,
440
0
                tonic_status_to_proto_status(self)
441
0
            )),
442
0
        }
443
0
    }
444
}
445
446
#[inline]
447
0
fn to_batch_update_blobs_request_override(val: &BatchUpdateBlobsRequest) -> event::Event {
448
0
    get_event_type!(
449
0
        Request,
450
0
        BatchUpdateBlobsRequest,
451
0
        BatchUpdateBlobsRequestOverride {
452
0
            instance_name: val.instance_name.clone(),
453
0
            requests: val
454
0
                .requests
455
0
                .iter()
456
0
                .map(|v| batch_update_blobs_request_override::Request {
457
0
                    digest: v.digest.clone(),
458
0
                    compressor: v.compressor,
459
0
                    data_len: u64::try_from(v.data.len()).unwrap_or_default(),
460
0
                })
461
0
                .collect(),
462
0
            digest_function: val.digest_function,
463
0
        }
464
0
    )
465
0
}
466
467
#[inline]
468
0
fn to_batch_read_blobs_response_override(val: &BatchReadBlobsResponse) -> event::Event {
469
0
    get_event_type!(
470
0
        Response,
471
0
        BatchReadBlobsResponse,
472
0
        BatchReadBlobsResponseOverride {
473
0
            responses: val
474
0
                .responses
475
0
                .iter()
476
0
                .map(|v| batch_read_blobs_response_override::Response {
477
0
                    digest: v.digest.clone(),
478
0
                    compressor: v.compressor,
479
0
                    status: v.status.clone(),
480
0
                    data_len: u64::try_from(v.data.len()).unwrap_or_default(),
481
0
                })
482
0
                .collect(),
483
0
        }
484
0
    )
485
0
}
486
487
#[inline]
488
0
fn to_empty_response<T>(_: T) -> event::Event {
489
0
    get_event_type!(Response, Empty, ())
490
0
}
491
492
#[inline]
493
0
fn to_empty_write_request<T>(_: T) -> event::Event {
494
0
    get_event_type!(Request, WriteRequest, ())
495
0
}
496
497
// -- Requests --
498
499
impl_as_event! {Request, (), GetCapabilitiesRequest}
500
impl_as_event! {Request, (), GetActionResultRequest}
501
impl_as_event! {Request, (), UpdateActionResultRequest}
502
impl_as_event! {Request, (), FindMissingBlobsRequest}
503
impl_as_event! {Request, (), BatchReadBlobsRequest}
504
impl_as_event! {Request, (), BatchUpdateBlobsRequest, BatchUpdateBlobsRequest, to_batch_update_blobs_request_override}
505
impl_as_event! {Request, (), GetTreeRequest}
506
impl_as_event! {Request, (), ReadRequest}
507
impl_as_event! {Request, (), Streaming<WriteRequest>, WriteRequest, to_empty_write_request}
508
impl_as_event! {Request, (), QueryWriteStatusRequest}
509
impl_as_event! {Request, (), ExecuteRequest}
510
impl_as_event! {Request, (), WaitExecutionRequest}
511
512
// -- Responses --
513
514
impl_as_event! {Response, GetCapabilitiesRequest, ServerCapabilities}
515
impl_as_event! {Response, GetActionResultRequest, ActionResult}
516
impl_as_event! {Response, UpdateActionResultRequest, ActionResult}
517
impl_as_event! {Response, Streaming<WriteRequest>, WriteResponse}
518
impl_as_event! {Response, ReadRequest, Pin<Box<dyn Stream<Item = Result<ReadResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response}
519
impl_as_event! {Response, QueryWriteStatusRequest, QueryWriteStatusResponse}
520
impl_as_event! {Response, FindMissingBlobsRequest, FindMissingBlobsResponse}
521
impl_as_event! {Response, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse}
522
impl_as_event! {Response, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchReadBlobsResponseOverride, to_batch_read_blobs_response_override}
523
impl_as_event! {Response, GetTreeRequest, Pin<Box<dyn Stream<Item = Result<GetTreeResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response}
524
impl_as_event! {Response, ExecuteRequest, Pin<Box<dyn Stream<Item = Result<Operation, TonicStatus>> + Send + '_>>, Empty, to_empty_response}
525
526
// -- Streams --
527
528
impl_as_event! {Stream, ReadRequest, ReadResponse, DataLength, {
529
0
    |val: &ReadResponse| val.data.len() as u64
530
}}
531
impl_as_event! {Stream, Streaming<WriteRequest>, WriteRequest, WriteRequest, {
532
0
    |val: &WriteRequest| WriteRequestOverride {
533
0
        resource_name: val.resource_name.clone(),
534
0
        write_offset: val.write_offset,
535
0
        finish_write: val.finish_write,
536
0
        data_len: u64::try_from(val.data.len()).unwrap_or_default(),
537
0
    }
538
}}
539
impl_as_event! {Stream, GetTreeRequest, GetTreeResponse}
540
impl_as_event! {Stream, ExecuteRequest, Operation}
541
impl_as_event! {Stream, WaitExecutionRequest, Operation}