Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/origin_event.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::marker::PhantomData;
16
use std::pin::Pin;
17
use std::sync::{Arc, OnceLock};
18
19
use base64::prelude::BASE64_STANDARD_NO_PAD;
20
use base64::Engine;
21
use futures::future::ready;
22
use futures::task::{Context, Poll};
23
use futures::{Future, FutureExt, Stream, StreamExt};
24
use nativelink_proto::build::bazel::remote::execution::v2::{
25
    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
26
    BatchUpdateBlobsResponse, ExecuteRequest, FindMissingBlobsRequest, FindMissingBlobsResponse,
27
    GetActionResultRequest, GetCapabilitiesRequest, GetTreeRequest, GetTreeResponse,
28
    RequestMetadata, ServerCapabilities, UpdateActionResultRequest, WaitExecutionRequest,
29
};
30
use nativelink_proto::com::github::trace_machina::nativelink::events::{
31
    batch_read_blobs_response_override, batch_update_blobs_request_override, event, request_event,
32
    response_event, stream_event, BatchReadBlobsResponseOverride, BatchUpdateBlobsRequestOverride,
33
    Event, OriginEvent, RequestEvent, ResponseEvent, StreamEvent, WriteRequestOverride,
34
};
35
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::StartExecute;
36
use nativelink_proto::google::bytestream::{
37
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
38
    WriteResponse,
39
};
40
use nativelink_proto::google::longrunning::Operation;
41
use nativelink_proto::google::rpc::Status;
42
use pin_project_lite::pin_project;
43
use prost::Message;
44
use rand::RngCore;
45
use serde::{Deserialize, Deserializer, Serialize, Serializer};
46
use tokio::sync::mpsc;
47
use tokio::sync::mpsc::error::TrySendError;
48
use tonic::{Response, Status as TonicStatus, Streaming};
49
use uuid::Uuid;
50
51
use crate::origin_context::ActiveOriginContext;
52
use crate::{background_spawn, make_symbol};
53
54
/// Value written into the `version` field of every `OriginEvent` built in this file.
const ORIGIN_EVENT_VERSION: u32 = 0;

/// Six random bytes generated on first use by `get_node_id` and reused afterwards.
static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new();
57
58
/// Returns a unique ID for the given event.
59
/// This ID is used to identify the event type.
60
/// The max value that could be output is 0x0FFF,
61
/// meaning you may use the first nibble for other
62
/// purposes.
63
#[inline]
64
32
pub const fn get_id_for_event(event: &Event) -> [u8; 2] {
65
31
    match &event.event {
66
1
        None => [0x00, 0x00],
67
14
        Some(event::Event::Request(req)) => match 
req.event13
{
68
1
            None => [0x01, 0x00],
69
1
            Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01],
70
1
            Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02],
71
1
            Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03],
72
1
            Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04],
73
1
            Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05],
74
1
            Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06],
75
1
            Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07],
76
1
            Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08],
77
1
            Some(request_event::Event::WriteRequest(())) => [0x01, 0x09],
78
1
            Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A],
79
1
            Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B],
80
1
            Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C],
81
1
            Some(request_event::Event::SchedulerStartExecute(_)) => [0x01, 0x0D],
82
        },
83
10
        Some(event::Event::Response(res)) => match 
res.event9
{
84
1
            None => [0x02, 0x00],
85
1
            Some(response_event::Event::Error(_)) => [0x02, 0x01],
86
1
            Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02],
87
1
            Some(response_event::Event::ActionResult(_)) => [0x02, 0x03],
88
1
            Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04],
89
1
            Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05],
90
1
            Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06],
91
1
            Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07],
92
1
            Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08],
93
1
            Some(response_event::Event::Empty(())) => [0x02, 0x09],
94
        },
95
7
        Some(event::Event::Stream(stream)) => match 
stream.event6
{
96
1
            None => [0x03, 0x00],
97
1
            Some(stream_event::Event::Error(_)) => [0x03, 0x01],
98
1
            Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02],
99
1
            Some(stream_event::Event::DataLength(_)) => [0x03, 0x03],
100
1
            Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04],
101
1
            Some(stream_event::Event::Operation(_)) => [0x03, 0x05],
102
1
            Some(stream_event::Event::Closed(())) => [0x03, 0x06], // Special case when stream has terminated.
103
        },
104
    }
105
32
}
106
107
/// Returns a unique node ID for this process.
108
0
pub fn get_node_id(event: Option<&Event>) -> [u8; 6] {
109
0
    let mut node_id = *NODE_ID.get_or_init(|| {
110
0
        let mut out = [0; 6];
111
0
        rand::rng().fill_bytes(&mut out);
112
0
        out
113
0
    });
114
0
    let Some(event) = event else {
  Branch (114:9): [True: 0, False: 0]
  Branch (114:9): [Folded - Ignored]
115
0
        return node_id;
116
    };
117
0
    let event_id = get_id_for_event(event);
118
0
    node_id[0] = (node_id[0] & 0xF0) | event_id[0];
119
0
    node_id[1] = event_id[1];
120
0
    node_id
121
0
}
122
123
0
fn serialize_request_metadata<S>(
124
0
    value: &Option<RequestMetadata>,
125
0
    serializer: S,
126
0
) -> Result<S::Ok, S::Error>
127
0
where
128
0
    S: Serializer,
129
0
{
130
0
    match value {
131
0
        Some(msg) => serializer.serialize_some(&BASE64_STANDARD_NO_PAD.encode(msg.encode_to_vec())),
132
0
        None => serializer.serialize_none(),
133
    }
134
0
}
135
136
0
fn deserialize_request_metadata<'de, D>(
137
0
    deserializer: D,
138
0
) -> Result<Option<RequestMetadata>, D::Error>
139
0
where
140
0
    D: Deserializer<'de>,
141
0
{
142
0
    let opt = Option::<String>::deserialize(deserializer)?;
143
0
    match opt {
144
0
        Some(s) => {
145
0
            let decoded = BASE64_STANDARD_NO_PAD
146
0
                .decode(s.as_bytes())
147
0
                .map_err(serde::de::Error::custom)?;
148
0
            RequestMetadata::decode(&*decoded)
149
0
                .map_err(serde::de::Error::custom)
150
0
                .map(Some)
151
        }
152
0
        None => Ok(None),
153
    }
154
0
}
155
156
0
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
157
pub struct OriginMetadata {
158
    pub identity: String,
159
    #[serde(
160
        serialize_with = "serialize_request_metadata",
161
        deserialize_with = "deserialize_request_metadata"
162
    )]
163
    pub bazel_metadata: Option<RequestMetadata>,
164
}
165
166
pub struct OriginEventCollector {
167
    sender: mpsc::Sender<OriginEvent>,
168
    pub metadata: OriginMetadata,
169
}
170
171
impl OriginEventCollector {
172
0
    pub const fn new(sender: mpsc::Sender<OriginEvent>, metadata: OriginMetadata) -> Self {
173
0
        Self { sender, metadata }
174
0
    }
175
176
    #[must_use]
177
0
    pub fn clone_with_metadata(&self, metadata: OriginMetadata) -> Self {
178
0
        Self {
179
0
            sender: self.sender.clone(),
180
0
            metadata,
181
0
        }
182
0
    }
183
184
    /// Publishes an event to the origin event collector.
185
0
    async fn publish_origin_event(
186
0
        &self,
187
0
        event: Event,
188
0
        parent_event_id: Option<Uuid>,
189
0
        maybe_event_id: Option<Uuid>,
190
0
    ) -> Uuid {
191
0
        let event_id = maybe_event_id.unwrap_or_else(|| Uuid::now_v6(&get_node_id(Some(&event))));
192
0
        let parent_event_id =
193
0
            parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string());
194
0
        // Ignore cases when channel is dropped.
195
0
        let _ = self
196
0
            .sender
197
0
            .send(OriginEvent {
198
0
                version: ORIGIN_EVENT_VERSION,
199
0
                event_id: event_id.as_hyphenated().to_string(),
200
0
                parent_event_id,
201
0
                bazel_request_metadata: self.metadata.bazel_metadata.clone(),
202
0
                identity: self.metadata.identity.clone(),
203
0
                event: Some(event),
204
0
            })
205
0
            .await;
206
0
        event_id
207
0
    }
208
209
    /// Publishes an event to the origin event collector.
210
    /// If the buffer is full, the event will be sent in a background spawn.
211
    /// This is useful for cases where the event is critical and must be sent,
212
    /// but cannot await the send operation.
213
0
    fn publish_origin_event_now_or_in_spawn(&self, event: Event, parent_event_id: Option<Uuid>) {
214
0
        let event_id = Uuid::now_v6(&get_node_id(Some(&event)));
215
0
        let parent_event_id =
216
0
            parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string());
217
0
        self.sender
218
0
            .try_send(OriginEvent {
219
0
                version: ORIGIN_EVENT_VERSION,
220
0
                event_id: event_id.as_hyphenated().to_string(),
221
0
                parent_event_id,
222
0
                bazel_request_metadata: self.metadata.bazel_metadata.clone(),
223
0
                identity: self.metadata.identity.clone(),
224
0
                event: Some(event),
225
0
            })
226
0
            .map_or_else(
227
0
                |e| match e {
228
0
                    TrySendError::Full(event) => {
229
0
                        let sender = self.sender.clone();
230
0
                        background_spawn!("send_end_stream_origin_event", async move {
231
0
                            // Ignore cases when channel is dropped.
232
0
                            let _ = sender.send(event).await;
233
0
                        });
234
                    }
235
                    // Ignore cases when channel is dropped.
236
0
                    TrySendError::Closed(_) => {}
237
0
                },
238
0
                |()| {},
239
0
            );
240
0
    }
241
}
242
243
// Stream wrapper that publishes a `Closed` stream event, parented to the
// stored context, when the wrapper is dropped.
pin_project! {
    struct CloseEventStream<S> {
        #[pin]
        inner: S,
        ctx_impl: OriginEventContextImpl,
    }

    impl <S> PinnedDrop for CloseEventStream<S> {
        #[inline]
        fn drop(this: Pin<&mut Self>) {
            let ctx = &this.ctx_impl;
            let closed = StreamEvent {
                event: Some(stream_event::Event::Closed(())),
            };
            let event = Event {
                event: Some(event::Event::Stream(closed)),
            };
            // Drop cannot await: send immediately if there is room, otherwise
            // the collector hands the send off to a background spawn.
            ctx.origin_event_collector
                .publish_origin_event_now_or_in_spawn(event, Some(ctx.parent_event_id));
        }
    }
}
265
266
impl<T, S> Stream for CloseEventStream<S>
267
where
268
    S: Stream<Item = T> + Send,
269
{
270
    type Item = S::Item;
271
272
0
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
273
0
        let project = self.project();
274
0
        project.inner.poll_next(cx)
275
0
    }
276
277
0
    fn size_hint(&self) -> (usize, Option<usize>) {
278
0
        self.inner.size_hint()
279
0
    }
280
}
281
282
make_symbol!(ORIGIN_EVENT_COLLECTOR, OriginEventCollector);
283
284
pub struct OriginEventContext<T> {
285
    inner: Option<OriginEventContextImpl>,
286
    _phantom: PhantomData<T>,
287
}
288
289
impl<T> Clone for OriginEventContext<T> {
290
0
    fn clone(&self) -> Self {
291
0
        Self {
292
0
            inner: self.inner.clone(),
293
0
            _phantom: PhantomData,
294
0
        }
295
0
    }
296
}
297
298
impl OriginEventContext<()> {
299
66
    pub fn new<'a, T, U>(
300
66
        source_cb: impl Fn() -> &'a T,
301
66
    ) -> impl Future<Output = OriginEventContext<T>> + 'static
302
66
    where
303
66
        T: OriginEventSource<U> + 'static,
304
66
    {
305
0
        let Ok(Some(origin_event_collector)) =
  Branch (305:13): [True: 0, False: 0]
  Branch (305:13): [True: 0, False: 15]
  Branch (305:13): [True: 0, False: 3]
  Branch (305:13): [True: 0, False: 3]
  Branch (305:13): [True: 0, False: 0]
  Branch (305:13): [True: 0, False: 6]
  Branch (305:13): [True: 0, False: 0]
  Branch (305:13): [True: 0, False: 1]
  Branch (305:13): [True: 0, False: 3]
  Branch (305:13): [True: 0, False: 0]
  Branch (305:13): [True: 0, False: 2]
  Branch (305:13): [True: 0, False: 4]
  Branch (305:13): [True: 0, False: 1]
  Branch (305:13): [Folded - Ignored]
  Branch (305:13): [Folded - Ignored]
  Branch (305:13): [True: 0, False: 27]
  Branch (305:13): [True: 0, False: 1]
306
66
            ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
307
        else {
308
66
            return ready(OriginEventContext {
309
66
                inner: None,
310
66
                _phantom: PhantomData,
311
66
            })
312
66
            .left_future();
313
        };
314
0
        let event = source_cb().as_event();
315
0
        async move {
316
0
            let parent_event_id = origin_event_collector
317
0
                .publish_origin_event(event, None, None)
318
0
                .await;
319
0
            OriginEventContext {
320
0
                inner: Some(OriginEventContextImpl {
321
0
                    origin_event_collector,
322
0
                    parent_event_id,
323
0
                }),
324
0
                _phantom: PhantomData,
325
0
            }
326
0
        }
327
0
        .right_future()
328
66
    }
329
}
330
impl<U> OriginEventContext<U> {
331
46
    pub fn emit<'a, T, F>(
332
46
        &self,
333
46
        event_cb: F,
334
46
    ) -> impl Future<Output = ()> + Send + use<'_, 'a, T, F, U>
335
46
    where
336
46
        T: OriginEventSource<U> + 'a,
337
46
        F: Fn() -> &'a T,
338
46
    {
339
46
        let Some(
inner0
) = &self.inner else {
  Branch (339:13): [True: 0, False: 9]
  Branch (339:13): [True: 0, False: 0]
  Branch (339:13): [True: 0, False: 0]
  Branch (339:13): [True: 0, False: 14]
  Branch (339:13): [True: 0, False: 0]
  Branch (339:13): [True: 0, False: 3]
  Branch (339:13): [True: 0, False: 0]
  Branch (339:13): [True: 0, False: 0]
  Branch (339:13): [True: 0, False: 3]
  Branch (339:13): [True: 0, False: 0]
  Branch (339:13): [True: 0, False: 0]
  Branch (339:13): [True: 0, False: 6]
  Branch (339:13): [True: 0, False: 0]
  Branch (339:13): [True: 0, False: 0]
  Branch (339:13): [True: 0, False: 1]
  Branch (339:13): [True: 0, False: 3]
  Branch (339:13): [True: 0, False: 0]
  Branch (339:13): [True: 0, False: 2]
  Branch (339:13): [True: 0, False: 4]
  Branch (339:13): [True: 0, False: 1]
  Branch (339:13): [Folded - Ignored]
  Branch (339:13): [Folded - Ignored]
340
46
            return ready(()).left_future();
341
        };
342
0
        let v = (event_cb)();
343
0
        v.publish(inner).right_future()
344
46
    }
345
346
24
    pub fn wrap_stream<'a, O, S>(&self, stream: S) -> Pin<Box<dyn Stream<Item = O> + Send + 'a>>
347
24
    where
348
24
        U: Send + 'a,
349
24
        O: OriginEventSource<U> + Send + 'a,
350
24
        S: Stream<Item = O> + Send + 'a,
351
24
    {
352
24
        let Some(
ctx_impl0
) = self.inner.clone() else {
  Branch (352:13): [True: 0, False: 15]
  Branch (352:13): [True: 0, False: 3]
  Branch (352:13): [True: 0, False: 0]
  Branch (352:13): [True: 0, False: 0]
  Branch (352:13): [True: 0, False: 6]
  Branch (352:13): [True: 0, False: 0]
  Branch (352:13): [Folded - Ignored]
  Branch (352:13): [Folded - Ignored]
353
24
            return Box::pin(stream);
354
        };
355
0
        let ctx = self.clone();
356
0
        Box::pin(CloseEventStream {
357
0
            inner: stream.then(move |item| {
358
0
                let ctx = ctx.clone();
359
0
                async move {
360
0
                    ctx.emit(|| &item).await;
361
0
                    item
362
0
                }
363
0
            }),
364
0
            ctx_impl,
365
0
        })
366
24
    }
367
}
368
369
#[derive(Clone)]
370
pub struct OriginEventContextImpl {
371
    origin_event_collector: Arc<OriginEventCollector>,
372
    parent_event_id: Uuid,
373
}
374
375
pub trait OriginEventSource<Source>: Sized {
376
    fn as_event(&self) -> Event;
377
378
0
    fn publish<'a>(&self, ctx: &'a OriginEventContextImpl) -> impl Future<Output = ()> + Send + 'a {
379
0
        let event = self.as_event();
380
0
        ctx.origin_event_collector
381
0
            .publish_origin_event(event, Some(ctx.parent_event_id), None)
382
0
            // We don't need the Uuid here.
383
0
            .map(|_| ())
384
0
    }
385
}
386
387
impl<'a, T, U> OriginEventSource<U> for &'a Response<T>
388
where
389
    T: OriginEventSource<U> + 'a,
390
{
391
    #[inline]
392
0
    fn as_event(&self) -> Event {
393
0
        self.get_ref().as_event()
394
0
    }
395
}
396
397
impl<T, U> OriginEventSource<U> for Result<Response<T>, TonicStatus>
398
where
399
    T: OriginEventSource<U>,
400
    TonicStatus: OriginEventSource<U>,
401
{
402
    #[inline]
403
0
    fn as_event(&self) -> Event {
404
0
        match self {
405
0
            Ok(v) => v.as_event(),
406
0
            Err(v) => v.as_event(),
407
        }
408
0
    }
409
}
410
411
0
fn tonic_status_to_proto_status(tonic_status: &TonicStatus) -> Status {
412
0
    Status {
413
0
        code: tonic_status.code().into(),
414
0
        message: tonic_status.message().into(),
415
0
        details: Vec::new(),
416
0
    }
417
0
}
418
419
// Wraps a payload in the nested event enums.
//
// The first argument selects the category (`Request`, `Response` or `Stream`),
// the second names the variant inside that category's `Event` enum, and the
// third is the expression stored in that variant.
macro_rules! get_event_type {
    (Request, $variant:ident, $payload:expr) => {
        event::Event::Request(RequestEvent {
            event: Some(request_event::Event::$variant($payload)),
        })
    };
    (Response, $variant:ident, $payload:expr) => {
        event::Event::Response(ResponseEvent {
            event: Some(response_event::Event::$variant($payload)),
        })
    };
    (Stream, $variant:ident, $payload:expr) => {
        event::Event::Stream(StreamEvent {
            event: Some(stream_event::Event::$variant($payload)),
        })
    };
}
436
437
// Generates `OriginEventSource<$origin>` implementations.
//
// Arms are tried top to bottom, so the `Stream` arms must stay ahead of the
// generic ones and `__inner` (the arm that emits the `impl`) must stay last.
macro_rules! impl_as_event {
    // Stream item stored as a clone in the variant named after its type.
    (Stream, $origin:ident, $target:ident) => {
        impl_as_event! {Stream, $origin, $target, $target, {
            #[inline]
            |item: &$target| item.clone()
        }}
    };
    // Stream item converted by `$convert`. The impl is for
    // `Result<$target, TonicStatus>`; errors become the stream `Error` variant.
    (Stream, $origin:ty, $target:ty, $variant:ident, $convert:tt) => {
        impl_as_event! {__inner, Stream, $origin, Result<$target, TonicStatus>, $variant, {
            #[inline]
            |item: &Result<$target, TonicStatus>| match item {
                Ok(value) => get_event_type!(Stream, $variant, $convert(value)),
                Err(status) => get_event_type!(Stream, Error, tonic_status_to_proto_status(status)),
            }
        }}
    };
    // Value stored as a clone in the variant named after its type.
    ($kind:ident, $origin:ty, $target:ident) => {
        impl_as_event! {__inner, $kind, $origin, $target, $target, {
            #[inline]
            |value: &$target| get_event_type!($kind, $target, value.clone())
        }}
    };
    // Value stored as a clone in an explicitly named variant.
    ($kind:ident, $origin:ty, $target:ty, $variant:ident) => {
        impl_as_event! {__inner, $kind, $origin, $target, $variant, {
            #[inline]
            |value: &$target| get_event_type!($kind, $variant, value.clone())
        }}
    };
    // Caller supplies the full conversion to `event::Event`.
    ($kind:ident, $origin:ty, $target:ty, $variant:ident, $convert:tt) => {
        impl_as_event! {__inner, $kind, $origin, $target, $variant, $convert}
    };
    // Emits the impl; `$convert` is called with `&self` and must return `event::Event`.
    (__inner, $kind:ident, $origin:ty, $target:ty, $variant:ident, $convert:tt) => {
        impl OriginEventSource<$origin> for $target {
            #[inline]
            fn as_event(&self) -> Event {
                Event {
                    event: Some($convert(self)),
                }
            }
        }
    };
}
479
480
impl<Origin> OriginEventSource<Origin> for TonicStatus {
481
    #[inline]
482
0
    fn as_event(&self) -> Event {
483
0
        Event {
484
0
            event: Some(get_event_type!(
485
0
                Response,
486
0
                Error,
487
0
                tonic_status_to_proto_status(self)
488
0
            )),
489
0
        }
490
0
    }
491
}
492
493
#[inline]
494
0
fn to_batch_update_blobs_request_override(val: &BatchUpdateBlobsRequest) -> event::Event {
495
0
    get_event_type!(
496
0
        Request,
497
0
        BatchUpdateBlobsRequest,
498
0
        BatchUpdateBlobsRequestOverride {
499
0
            instance_name: val.instance_name.clone(),
500
0
            requests: val
501
0
                .requests
502
0
                .iter()
503
0
                .map(|v| batch_update_blobs_request_override::Request {
504
0
                    digest: v.digest.clone(),
505
0
                    compressor: v.compressor,
506
0
                    data_len: u64::try_from(v.data.len()).unwrap_or_default(),
507
0
                })
508
0
                .collect(),
509
0
            digest_function: val.digest_function,
510
0
        }
511
0
    )
512
0
}
513
514
#[inline]
515
0
fn to_batch_read_blobs_response_override(val: &BatchReadBlobsResponse) -> event::Event {
516
0
    get_event_type!(
517
0
        Response,
518
0
        BatchReadBlobsResponse,
519
0
        BatchReadBlobsResponseOverride {
520
0
            responses: val
521
0
                .responses
522
0
                .iter()
523
0
                .map(|v| batch_read_blobs_response_override::Response {
524
0
                    digest: v.digest.clone(),
525
0
                    compressor: v.compressor,
526
0
                    status: v.status.clone(),
527
0
                    data_len: u64::try_from(v.data.len()).unwrap_or_default(),
528
0
                })
529
0
                .collect(),
530
0
        }
531
0
    )
532
0
}
533
534
#[inline]
535
0
fn to_empty_response<T>(_: T) -> event::Event {
536
0
    get_event_type!(Response, Empty, ())
537
0
}
538
539
#[inline]
540
0
fn to_empty_write_request<T>(_: T) -> event::Event {
541
0
    get_event_type!(Request, WriteRequest, ())
542
0
}
543
544
// -- Requests --
545
546
impl_as_event! {Request, (), GetCapabilitiesRequest}
547
impl_as_event! {Request, (), GetActionResultRequest}
548
impl_as_event! {Request, (), UpdateActionResultRequest}
549
impl_as_event! {Request, (), FindMissingBlobsRequest}
550
impl_as_event! {Request, (), BatchReadBlobsRequest}
551
impl_as_event! {Request, (), BatchUpdateBlobsRequest, BatchUpdateBlobsRequest, to_batch_update_blobs_request_override}
552
impl_as_event! {Request, (), GetTreeRequest}
553
impl_as_event! {Request, (), ReadRequest}
554
impl_as_event! {Request, (), Streaming<WriteRequest>, WriteRequest, to_empty_write_request}
555
impl_as_event! {Request, (), QueryWriteStatusRequest}
556
impl_as_event! {Request, (), ExecuteRequest}
557
impl_as_event! {Request, (), WaitExecutionRequest}
558
impl_as_event! {Request, (), StartExecute, SchedulerStartExecute}
559
560
// -- Responses --
561
562
impl_as_event! {Response, GetCapabilitiesRequest, ServerCapabilities}
563
impl_as_event! {Response, GetActionResultRequest, ActionResult}
564
impl_as_event! {Response, UpdateActionResultRequest, ActionResult}
565
impl_as_event! {Response, Streaming<WriteRequest>, WriteResponse}
566
impl_as_event! {Response, ReadRequest, Pin<Box<dyn Stream<Item = Result<ReadResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response}
567
impl_as_event! {Response, QueryWriteStatusRequest, QueryWriteStatusResponse}
568
impl_as_event! {Response, FindMissingBlobsRequest, FindMissingBlobsResponse}
569
impl_as_event! {Response, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse}
570
impl_as_event! {Response, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchReadBlobsResponseOverride, to_batch_read_blobs_response_override}
571
impl_as_event! {Response, GetTreeRequest, Pin<Box<dyn Stream<Item = Result<GetTreeResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response}
572
impl_as_event! {Response, ExecuteRequest, Pin<Box<dyn Stream<Item = Result<Operation, TonicStatus>> + Send + '_>>, Empty, to_empty_response}
573
impl_as_event! {Response, StartExecute, (), Empty}
574
575
// -- Streams --
576
577
impl_as_event! {Stream, ReadRequest, ReadResponse, DataLength, {
578
0
    |val: &ReadResponse| val.data.len() as u64
579
}}
580
impl_as_event! {Stream, Streaming<WriteRequest>, WriteRequest, WriteRequest, {
581
0
    |val: &WriteRequest| WriteRequestOverride {
582
0
        resource_name: val.resource_name.clone(),
583
0
        write_offset: val.write_offset,
584
0
        finish_write: val.finish_write,
585
0
        data_len: u64::try_from(val.data.len()).unwrap_or_default(),
586
0
    }
587
}}
588
impl_as_event! {Stream, GetTreeRequest, GetTreeResponse}
589
impl_as_event! {Stream, ExecuteRequest, Operation}
590
impl_as_event! {Stream, WaitExecutionRequest, Operation}