Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/origin_event.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::marker::PhantomData;
16
use std::pin::Pin;
17
use std::sync::{Arc, OnceLock};
18
19
use base64::Engine;
20
use base64::prelude::BASE64_STANDARD_NO_PAD;
21
use futures::future::ready;
22
use futures::task::{Context, Poll};
23
use futures::{Future, FutureExt, Stream, StreamExt};
24
use nativelink_proto::build::bazel::remote::asset::v1::{
25
    FetchBlobRequest, FetchBlobResponse, PushBlobRequest, PushBlobResponse,
26
};
27
use nativelink_proto::build::bazel::remote::execution::v2::{
28
    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest,
29
    BatchUpdateBlobsResponse, ExecuteRequest, FindMissingBlobsRequest, FindMissingBlobsResponse,
30
    GetActionResultRequest, GetCapabilitiesRequest, GetTreeRequest, GetTreeResponse,
31
    RequestMetadata, ServerCapabilities, UpdateActionResultRequest, WaitExecutionRequest,
32
};
33
use nativelink_proto::com::github::trace_machina::nativelink::events::{
34
    BatchReadBlobsResponseOverride, BatchUpdateBlobsRequestOverride, Event, OriginEvent,
35
    RequestEvent, ResponseEvent, StreamEvent, WriteRequestOverride,
36
    batch_read_blobs_response_override, batch_update_blobs_request_override, event, request_event,
37
    response_event, stream_event,
38
};
39
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::StartExecute;
40
use nativelink_proto::google::bytestream::{
41
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
42
    WriteResponse,
43
};
44
use nativelink_proto::google::longrunning::Operation;
45
use nativelink_proto::google::rpc::Status;
46
use pin_project_lite::pin_project;
47
use prost::Message;
48
use rand::RngCore;
49
use serde::{Deserialize, Deserializer, Serialize, Serializer};
50
use tokio::sync::mpsc;
51
use tokio::sync::mpsc::error::TrySendError;
52
use tonic::{Response, Status as TonicStatus, Streaming};
53
use uuid::Uuid;
54
55
use crate::origin_context::ActiveOriginContext;
56
use crate::{background_spawn, unsafe_make_symbol};
57
58
const ORIGIN_EVENT_VERSION: u32 = 0;
59
60
static NODE_ID: OnceLock<[u8; 6]> = OnceLock::new();
61
62
/// Returns a unique ID for the given event.
63
/// This ID is used to identify the event type.
64
/// The max value that could be output is 0x0FFF,
65
/// meaning you may use the first nibble for other
66
/// purposes.
67
#[inline]
68
32
pub const fn get_id_for_event(event: &Event) -> [u8; 2] {
69
31
    match &event.event {
70
1
        None => [0x00, 0x00],
71
14
        Some(event::Event::Request(req)) => match 
req.event13
{
72
1
            None => [0x01, 0x00],
73
1
            Some(request_event::Event::GetCapabilitiesRequest(_)) => [0x01, 0x01],
74
1
            Some(request_event::Event::GetActionResultRequest(_)) => [0x01, 0x02],
75
1
            Some(request_event::Event::UpdateActionResultRequest(_)) => [0x01, 0x03],
76
1
            Some(request_event::Event::FindMissingBlobsRequest(_)) => [0x01, 0x04],
77
1
            Some(request_event::Event::BatchReadBlobsRequest(_)) => [0x01, 0x05],
78
1
            Some(request_event::Event::BatchUpdateBlobsRequest(_)) => [0x01, 0x06],
79
1
            Some(request_event::Event::GetTreeRequest(_)) => [0x01, 0x07],
80
1
            Some(request_event::Event::ReadRequest(_)) => [0x01, 0x08],
81
1
            Some(request_event::Event::WriteRequest(())) => [0x01, 0x09],
82
1
            Some(request_event::Event::QueryWriteStatusRequest(_)) => [0x01, 0x0A],
83
1
            Some(request_event::Event::ExecuteRequest(_)) => [0x01, 0x0B],
84
1
            Some(request_event::Event::WaitExecutionRequest(_)) => [0x01, 0x0C],
85
1
            Some(request_event::Event::SchedulerStartExecute(_)) => [0x01, 0x0D],
86
0
            Some(request_event::Event::FetchBlobRequest(_)) => [0x01, 0x0E],
87
0
            Some(request_event::Event::PushBlobRequest(_)) => [0x01, 0x0F],
88
        },
89
10
        Some(event::Event::Response(res)) => match 
res.event9
{
90
1
            None => [0x02, 0x00],
91
1
            Some(response_event::Event::Error(_)) => [0x02, 0x01],
92
1
            Some(response_event::Event::ServerCapabilities(_)) => [0x02, 0x02],
93
1
            Some(response_event::Event::ActionResult(_)) => [0x02, 0x03],
94
1
            Some(response_event::Event::FindMissingBlobsResponse(_)) => [0x02, 0x04],
95
1
            Some(response_event::Event::BatchReadBlobsResponse(_)) => [0x02, 0x05],
96
1
            Some(response_event::Event::BatchUpdateBlobsResponse(_)) => [0x02, 0x06],
97
1
            Some(response_event::Event::WriteResponse(_)) => [0x02, 0x07],
98
1
            Some(response_event::Event::QueryWriteStatusResponse(_)) => [0x02, 0x08],
99
1
            Some(response_event::Event::Empty(())) => [0x02, 0x09],
100
0
            Some(response_event::Event::FetchBlobResponse(_)) => [0x02, 0x0A],
101
0
            Some(response_event::Event::PushBlobResponse(_)) => [0x02, 0x0B],
102
        },
103
7
        Some(event::Event::Stream(stream)) => match 
stream.event6
{
104
1
            None => [0x03, 0x00],
105
1
            Some(stream_event::Event::Error(_)) => [0x03, 0x01],
106
1
            Some(stream_event::Event::GetTreeResponse(_)) => [0x03, 0x02],
107
1
            Some(stream_event::Event::DataLength(_)) => [0x03, 0x03],
108
1
            Some(stream_event::Event::WriteRequest(_)) => [0x03, 0x04],
109
1
            Some(stream_event::Event::Operation(_)) => [0x03, 0x05],
110
1
            Some(stream_event::Event::Closed(())) => [0x03, 0x06], // Special case when stream has terminated.
111
        },
112
    }
113
32
}
114
115
/// Returns a unique node ID for this process.
116
0
pub fn get_node_id(event: Option<&Event>) -> [u8; 6] {
117
0
    let mut node_id = *NODE_ID.get_or_init(|| {
118
0
        let mut out = [0; 6];
119
0
        rand::rng().fill_bytes(&mut out);
120
0
        out
121
0
    });
122
0
    let Some(event) = event else {
  Branch (122:9): [True: 0, False: 0]
  Branch (122:9): [Folded - Ignored]
123
0
        return node_id;
124
    };
125
0
    let event_id = get_id_for_event(event);
126
0
    node_id[0] = (node_id[0] & 0xF0) | event_id[0];
127
0
    node_id[1] = event_id[1];
128
0
    node_id
129
0
}
130
131
0
fn serialize_request_metadata<S>(
132
0
    value: &Option<RequestMetadata>,
133
0
    serializer: S,
134
0
) -> Result<S::Ok, S::Error>
135
0
where
136
0
    S: Serializer,
137
0
{
138
0
    match value {
139
0
        Some(msg) => serializer.serialize_some(&BASE64_STANDARD_NO_PAD.encode(msg.encode_to_vec())),
140
0
        None => serializer.serialize_none(),
141
    }
142
0
}
143
144
0
fn deserialize_request_metadata<'de, D>(
145
0
    deserializer: D,
146
0
) -> Result<Option<RequestMetadata>, D::Error>
147
0
where
148
0
    D: Deserializer<'de>,
149
0
{
150
0
    let opt = Option::<String>::deserialize(deserializer)?;
151
0
    match opt {
152
0
        Some(s) => {
153
0
            let decoded = BASE64_STANDARD_NO_PAD
154
0
                .decode(s.as_bytes())
155
0
                .map_err(serde::de::Error::custom)?;
156
0
            RequestMetadata::decode(&*decoded)
157
0
                .map_err(serde::de::Error::custom)
158
0
                .map(Some)
159
        }
160
0
        None => Ok(None),
161
    }
162
0
}
163
164
0
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
165
pub struct OriginMetadata {
166
    pub identity: String,
167
    #[serde(
168
        serialize_with = "serialize_request_metadata",
169
        deserialize_with = "deserialize_request_metadata"
170
    )]
171
    pub bazel_metadata: Option<RequestMetadata>,
172
}
173
174
#[derive(Debug)]
175
pub struct OriginEventCollector {
176
    sender: mpsc::Sender<OriginEvent>,
177
    pub metadata: OriginMetadata,
178
}
179
180
impl OriginEventCollector {
181
0
    pub const fn new(sender: mpsc::Sender<OriginEvent>, metadata: OriginMetadata) -> Self {
182
0
        Self { sender, metadata }
183
0
    }
184
185
    #[must_use]
186
0
    pub fn clone_with_metadata(&self, metadata: OriginMetadata) -> Self {
187
0
        Self {
188
0
            sender: self.sender.clone(),
189
0
            metadata,
190
0
        }
191
0
    }
192
193
    /// Publishes an event to the origin event collector.
194
0
    async fn publish_origin_event(
195
0
        &self,
196
0
        event: Event,
197
0
        parent_event_id: Option<Uuid>,
198
0
        maybe_event_id: Option<Uuid>,
199
0
    ) -> Uuid {
200
0
        let event_id = maybe_event_id.unwrap_or_else(|| Uuid::now_v6(&get_node_id(Some(&event))));
201
0
        let parent_event_id =
202
0
            parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string());
203
        // Ignore cases when channel is dropped.
204
0
        drop(
205
0
            self.sender
206
0
                .send(OriginEvent {
207
0
                    version: ORIGIN_EVENT_VERSION,
208
0
                    event_id: event_id.as_hyphenated().to_string(),
209
0
                    parent_event_id,
210
0
                    bazel_request_metadata: self.metadata.bazel_metadata.clone(),
211
0
                    identity: self.metadata.identity.clone(),
212
0
                    event: Some(event),
213
0
                })
214
0
                .await,
215
        );
216
0
        event_id
217
0
    }
218
219
    /// Publishes an event to the origin event collector.
220
    /// If the buffer is full, the event will be sent in a background spawn.
221
    /// This is useful for cases where the event is critical and must be sent,
222
    /// but cannot await the send operation.
223
0
    fn publish_origin_event_now_or_in_spawn(&self, event: Event, parent_event_id: Option<Uuid>) {
224
0
        let event_id = Uuid::now_v6(&get_node_id(Some(&event)));
225
0
        let parent_event_id =
226
0
            parent_event_id.map_or_else(String::new, |id| id.as_hyphenated().to_string());
227
0
        self.sender
228
0
            .try_send(OriginEvent {
229
0
                version: ORIGIN_EVENT_VERSION,
230
0
                event_id: event_id.as_hyphenated().to_string(),
231
0
                parent_event_id,
232
0
                bazel_request_metadata: self.metadata.bazel_metadata.clone(),
233
0
                identity: self.metadata.identity.clone(),
234
0
                event: Some(event),
235
0
            })
236
0
            .map_or_else(
237
0
                |e| match e {
238
0
                    TrySendError::Full(event) => {
239
0
                        let sender = self.sender.clone();
240
0
                        background_spawn!("send_end_stream_origin_event", async move {
241
                            // Ignore cases when channel is dropped.
242
0
                            drop(sender.send(event).await);
243
0
                        });
244
                    }
245
                    // Ignore cases when channel is dropped.
246
0
                    TrySendError::Closed(_) => {}
247
0
                },
248
0
                |()| {},
249
            );
250
0
    }
251
}
252
253
pin_project! {
254
    struct CloseEventStream<S> {
255
        #[pin]
256
        inner: S,
257
        ctx_impl: OriginEventContextImpl,
258
    }
259
260
    impl <S> PinnedDrop for CloseEventStream<S> {
261
        #[inline]
262
        fn drop(this: Pin<&mut Self>) {
263
            let event = Event {
264
                event: Some(event::Event::Stream(StreamEvent {
265
                    event: Some(stream_event::Event::Closed(()))
266
                })),
267
            };
268
            // Try to send the event immediately, if we cannot because
269
            // the buffer is full, do it in a background spawn.
270
            this.ctx_impl.origin_event_collector
271
                .publish_origin_event_now_or_in_spawn(event, Some(this.ctx_impl.parent_event_id));
272
        }
273
    }
274
}
275
276
impl<T, S> Stream for CloseEventStream<S>
277
where
278
    S: Stream<Item = T> + Send,
279
{
280
    type Item = S::Item;
281
282
0
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
283
0
        let project = self.project();
284
0
        project.inner.poll_next(cx)
285
0
    }
286
287
0
    fn size_hint(&self) -> (usize, Option<usize>) {
288
0
        self.inner.size_hint()
289
0
    }
290
}
291
292
// Safety: There is no other symbol named `ORIGIN_EVENT_COLLECTOR`.
293
unsafe_make_symbol!(ORIGIN_EVENT_COLLECTOR, OriginEventCollector);
294
295
pub struct OriginEventContext<T> {
296
    inner: Option<OriginEventContextImpl>,
297
    _phantom: PhantomData<T>,
298
}
299
300
impl<T> std::fmt::Debug for OriginEventContext<T> {
301
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302
0
        f.debug_struct("OriginEventContext")
303
0
            .field("inner", &self.inner)
304
0
            .field("_phantom", &self._phantom)
305
0
            .finish()
306
0
    }
307
}
308
309
impl<T> Clone for OriginEventContext<T> {
310
0
    fn clone(&self) -> Self {
311
0
        Self {
312
0
            inner: self.inner.clone(),
313
0
            _phantom: PhantomData,
314
0
        }
315
0
    }
316
}
317
318
impl OriginEventContext<()> {
319
68
    pub fn new<'a, T, U>(
320
68
        source_cb: impl Fn() -> &'a T,
321
68
    ) -> impl Future<Output = OriginEventContext<T>> + 'static
322
68
    where
323
68
        T: OriginEventSource<U> + 'static,
324
68
    {
325
0
        let Ok(Some(origin_event_collector)) =
  Branch (325:13): [True: 0, False: 0]
  Branch (325:13): [True: 0, False: 15]
  Branch (325:13): [True: 0, False: 3]
  Branch (325:13): [True: 0, False: 3]
  Branch (325:13): [True: 0, False: 1]
  Branch (325:13): [True: 0, False: 1]
  Branch (325:13): [True: 0, False: 0]
  Branch (325:13): [True: 0, False: 6]
  Branch (325:13): [True: 0, False: 0]
  Branch (325:13): [True: 0, False: 1]
  Branch (325:13): [True: 0, False: 3]
  Branch (325:13): [True: 0, False: 0]
  Branch (325:13): [True: 0, False: 2]
  Branch (325:13): [True: 0, False: 4]
  Branch (325:13): [True: 0, False: 1]
  Branch (325:13): [Folded - Ignored]
  Branch (325:13): [Folded - Ignored]
  Branch (325:13): [True: 0, False: 27]
  Branch (325:13): [True: 0, False: 1]
326
68
            ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
327
        else {
328
68
            return ready(OriginEventContext {
329
68
                inner: None,
330
68
                _phantom: PhantomData,
331
68
            })
332
68
            .left_future();
333
        };
334
0
        let event = source_cb().as_event();
335
0
        async move {
336
0
            let parent_event_id = origin_event_collector
337
0
                .publish_origin_event(event, None, None)
338
0
                .await;
339
0
            OriginEventContext {
340
0
                inner: Some(OriginEventContextImpl {
341
0
                    origin_event_collector,
342
0
                    parent_event_id,
343
0
                }),
344
0
                _phantom: PhantomData,
345
0
            }
346
0
        }
347
0
        .right_future()
348
68
    }
349
}
350
impl<U> OriginEventContext<U> {
351
48
    pub fn emit<'a, T, F>(
352
48
        &self,
353
48
        event_cb: F,
354
48
    ) -> impl Future<Output = ()> + Send + use<'_, 'a, T, F, U>
355
48
    where
356
48
        T: OriginEventSource<U> + 'a,
357
48
        F: Fn() -> &'a T,
358
48
    {
359
48
        let Some(
inner0
) = &self.inner else {
  Branch (359:13): [True: 0, False: 9]
  Branch (359:13): [True: 0, False: 0]
  Branch (359:13): [True: 0, False: 0]
  Branch (359:13): [True: 0, False: 14]
  Branch (359:13): [True: 0, False: 0]
  Branch (359:13): [True: 0, False: 3]
  Branch (359:13): [True: 0, False: 0]
  Branch (359:13): [True: 0, False: 0]
  Branch (359:13): [True: 0, False: 3]
  Branch (359:13): [True: 0, False: 1]
  Branch (359:13): [True: 0, False: 1]
  Branch (359:13): [True: 0, False: 0]
  Branch (359:13): [True: 0, False: 0]
  Branch (359:13): [True: 0, False: 6]
  Branch (359:13): [True: 0, False: 0]
  Branch (359:13): [True: 0, False: 0]
  Branch (359:13): [True: 0, False: 1]
  Branch (359:13): [True: 0, False: 3]
  Branch (359:13): [True: 0, False: 0]
  Branch (359:13): [True: 0, False: 2]
  Branch (359:13): [True: 0, False: 4]
  Branch (359:13): [True: 0, False: 1]
  Branch (359:13): [Folded - Ignored]
  Branch (359:13): [Folded - Ignored]
360
48
            return ready(()).left_future();
361
        };
362
0
        let v = (event_cb)();
363
0
        v.publish(inner).right_future()
364
48
    }
365
366
24
    pub fn wrap_stream<'a, O, S>(&self, stream: S) -> Pin<Box<dyn Stream<Item = O> + Send + 'a>>
367
24
    where
368
24
        U: Send + 'a,
369
24
        O: OriginEventSource<U> + Send + 'a,
370
24
        S: Stream<Item = O> + Send + 'a,
371
24
    {
372
24
        let Some(
ctx_impl0
) = self.inner.clone() else {
  Branch (372:13): [True: 0, False: 15]
  Branch (372:13): [True: 0, False: 3]
  Branch (372:13): [True: 0, False: 0]
  Branch (372:13): [True: 0, False: 0]
  Branch (372:13): [True: 0, False: 6]
  Branch (372:13): [True: 0, False: 0]
  Branch (372:13): [Folded - Ignored]
  Branch (372:13): [Folded - Ignored]
373
24
            return Box::pin(stream);
374
        };
375
0
        let ctx = self.clone();
376
0
        Box::pin(CloseEventStream {
377
0
            inner: stream.then(move |item| {
378
0
                let ctx = ctx.clone();
379
0
                async move {
380
0
                    ctx.emit(|| &item).await;
381
0
                    item
382
0
                }
383
0
            }),
384
0
            ctx_impl,
385
        })
386
24
    }
387
}
388
389
#[derive(Debug, Clone)]
390
pub struct OriginEventContextImpl {
391
    origin_event_collector: Arc<OriginEventCollector>,
392
    parent_event_id: Uuid,
393
}
394
395
pub trait OriginEventSource<Source>: Sized {
396
    fn as_event(&self) -> Event;
397
398
0
    fn publish<'a>(&self, ctx: &'a OriginEventContextImpl) -> impl Future<Output = ()> + Send + 'a {
399
0
        let event = self.as_event();
400
0
        ctx.origin_event_collector
401
0
            .publish_origin_event(event, Some(ctx.parent_event_id), None)
402
0
            // We don't need the Uuid here.
403
0
            .map(|_| ())
404
0
    }
405
}
406
407
impl<'a, T, U> OriginEventSource<U> for &'a Response<T>
408
where
409
    T: OriginEventSource<U> + 'a,
410
{
411
    #[inline]
412
0
    fn as_event(&self) -> Event {
413
0
        self.get_ref().as_event()
414
0
    }
415
}
416
417
impl<T, U> OriginEventSource<U> for Result<Response<T>, TonicStatus>
418
where
419
    T: OriginEventSource<U>,
420
    TonicStatus: OriginEventSource<U>,
421
{
422
    #[inline]
423
0
    fn as_event(&self) -> Event {
424
0
        match self {
425
0
            Ok(v) => v.as_event(),
426
0
            Err(v) => v.as_event(),
427
        }
428
0
    }
429
}
430
431
0
fn tonic_status_to_proto_status(tonic_status: &TonicStatus) -> Status {
432
0
    Status {
433
0
        code: tonic_status.code().into(),
434
0
        message: tonic_status.message().into(),
435
0
        details: Vec::new(),
436
0
    }
437
0
}
438
439
macro_rules! get_event_type {
440
    (Request, $variant:ident, $data:expr) => {
441
        event::Event::Request(RequestEvent {
442
            event: Some(request_event::Event::$variant($data)),
443
        })
444
    };
445
    (Response, $variant:ident, $data:expr) => {
446
        event::Event::Response(ResponseEvent {
447
            event: Some(response_event::Event::$variant($data)),
448
        })
449
    };
450
    (Stream, $variant:ident, $data:expr) => {
451
        event::Event::Stream(StreamEvent {
452
            event: Some(stream_event::Event::$variant($data)),
453
        })
454
    };
455
}
456
457
macro_rules! impl_as_event {
458
    (Stream, $origin:ident, $type:ident) => {
459
        impl_as_event! {Stream, $origin, $type, $type, {
460
            #[inline]
461
0
            |val: &$type| val.clone()
462
        }}
463
    };
464
    (Stream, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => {
465
        impl_as_event! {__inner, Stream, $origin, Result<$type, TonicStatus>, $variant, {
466
            #[inline]
467
0
            |val: &Result<$type, TonicStatus>| match val {
468
0
                Ok(v) => get_event_type!(Stream, $variant, $data_fn(v)),
469
0
                Err(e) => get_event_type!(Stream, Error, tonic_status_to_proto_status(e)),
470
0
            }
471
        }}
472
    };
473
    ($event_type:ident, $origin:ty, $type:ident) => {
474
        impl_as_event!(__inner, $event_type, $origin, $type, $type, {
475
            #[inline]
476
0
            |val: &$type| get_event_type!($event_type, $type, val.clone())
477
        });
478
    };
479
    ($event_type:ident, $origin:ty, $type:ty, $variant:ident) => {
480
        impl_as_event!(__inner, $event_type, $origin, $type, $variant, {
481
            #[inline]
482
0
            |val: &$type| get_event_type!($event_type, $variant, val.clone())
483
        });
484
    };
485
    ($event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => {
486
        impl_as_event! {__inner, $event_type, $origin, $type, $variant, $data_fn}
487
    };
488
    (__inner, $event_type:ident, $origin:ty, $type:ty, $variant:ident, $data_fn:tt) => {
489
        impl OriginEventSource<$origin> for $type {
490
            #[inline]
491
0
            fn as_event(&self) -> Event {
492
0
                Event {
493
0
                    event: Some($data_fn(self)),
494
0
                }
495
0
            }
496
        }
497
    };
498
}
499
500
impl<Origin> OriginEventSource<Origin> for TonicStatus {
501
    #[inline]
502
0
    fn as_event(&self) -> Event {
503
0
        Event {
504
0
            event: Some(get_event_type!(
505
0
                Response,
506
0
                Error,
507
0
                tonic_status_to_proto_status(self)
508
0
            )),
509
0
        }
510
0
    }
511
}
512
513
#[inline]
514
0
fn to_batch_update_blobs_request_override(val: &BatchUpdateBlobsRequest) -> event::Event {
515
0
    get_event_type!(
516
0
        Request,
517
0
        BatchUpdateBlobsRequest,
518
0
        BatchUpdateBlobsRequestOverride {
519
0
            instance_name: val.instance_name.clone(),
520
0
            requests: val
521
0
                .requests
522
0
                .iter()
523
0
                .map(|v| batch_update_blobs_request_override::Request {
524
0
                    digest: v.digest.clone(),
525
0
                    compressor: v.compressor,
526
0
                    data_len: u64::try_from(v.data.len()).unwrap_or_default(),
527
0
                })
528
0
                .collect(),
529
0
            digest_function: val.digest_function,
530
0
        }
531
0
    )
532
0
}
533
534
#[inline]
535
0
fn to_batch_read_blobs_response_override(val: &BatchReadBlobsResponse) -> event::Event {
536
0
    get_event_type!(
537
0
        Response,
538
0
        BatchReadBlobsResponse,
539
0
        BatchReadBlobsResponseOverride {
540
0
            responses: val
541
0
                .responses
542
0
                .iter()
543
0
                .map(|v| batch_read_blobs_response_override::Response {
544
0
                    digest: v.digest.clone(),
545
0
                    compressor: v.compressor,
546
0
                    status: v.status.clone(),
547
0
                    data_len: u64::try_from(v.data.len()).unwrap_or_default(),
548
0
                })
549
0
                .collect(),
550
0
        }
551
0
    )
552
0
}
553
554
#[inline]
555
0
fn to_empty_response<T>(_: T) -> event::Event {
556
0
    get_event_type!(Response, Empty, ())
557
0
}
558
559
#[inline]
560
0
fn to_empty_write_request<T>(_: T) -> event::Event {
561
0
    get_event_type!(Request, WriteRequest, ())
562
0
}
563
564
// -- Requests --
565
566
impl_as_event! {Request, (), GetCapabilitiesRequest}
567
impl_as_event! {Request, (), GetActionResultRequest}
568
impl_as_event! {Request, (), UpdateActionResultRequest}
569
impl_as_event! {Request, (), FindMissingBlobsRequest}
570
impl_as_event! {Request, (), BatchReadBlobsRequest}
571
impl_as_event! {Request, (), BatchUpdateBlobsRequest, BatchUpdateBlobsRequest, to_batch_update_blobs_request_override}
572
impl_as_event! {Request, (), FetchBlobRequest}
573
impl_as_event! {Request, (), PushBlobRequest}
574
impl_as_event! {Request, (), GetTreeRequest}
575
impl_as_event! {Request, (), ReadRequest}
576
impl_as_event! {Request, (), Streaming<WriteRequest>, WriteRequest, to_empty_write_request}
577
impl_as_event! {Request, (), QueryWriteStatusRequest}
578
impl_as_event! {Request, (), ExecuteRequest}
579
impl_as_event! {Request, (), WaitExecutionRequest}
580
impl_as_event! {Request, (), StartExecute, SchedulerStartExecute}
581
582
// -- Responses --
583
584
impl_as_event! {Response, GetCapabilitiesRequest, ServerCapabilities}
585
impl_as_event! {Response, GetActionResultRequest, ActionResult}
586
impl_as_event! {Response, UpdateActionResultRequest, ActionResult}
587
impl_as_event! {Response, Streaming<WriteRequest>, WriteResponse}
588
impl_as_event! {Response, ReadRequest, Pin<Box<dyn Stream<Item = Result<ReadResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response}
589
impl_as_event! {Response, QueryWriteStatusRequest, QueryWriteStatusResponse}
590
impl_as_event! {Response, FindMissingBlobsRequest, FindMissingBlobsResponse}
591
impl_as_event! {Response, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse}
592
impl_as_event! {Response, FetchBlobRequest, FetchBlobResponse}
593
impl_as_event! {Response, PushBlobRequest, PushBlobResponse}
594
impl_as_event! {Response, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchReadBlobsResponseOverride, to_batch_read_blobs_response_override}
595
impl_as_event! {Response, GetTreeRequest, Pin<Box<dyn Stream<Item = Result<GetTreeResponse, TonicStatus>> + Send + '_>>, Empty, to_empty_response}
596
impl_as_event! {Response, ExecuteRequest, Pin<Box<dyn Stream<Item = Result<Operation, TonicStatus>> + Send + '_>>, Empty, to_empty_response}
597
impl_as_event! {Response, StartExecute, (), Empty}
598
599
// -- Streams --
600
601
impl_as_event! {Stream, ReadRequest, ReadResponse, DataLength, {
602
0
    |val: &ReadResponse| val.data.len() as u64
603
}}
604
impl_as_event! {Stream, Streaming<WriteRequest>, WriteRequest, WriteRequest, {
605
0
    |val: &WriteRequest| WriteRequestOverride {
606
0
        resource_name: val.resource_name.clone(),
607
0
        write_offset: val.write_offset,
608
0
        finish_write: val.finish_write,
609
0
        data_len: u64::try_from(val.data.len()).unwrap_or_default(),
610
0
    }
611
}}
612
impl_as_event! {Stream, GetTreeRequest, GetTreeResponse}
613
impl_as_event! {Stream, ExecuteRequest, Operation}
614
impl_as_event! {Stream, WaitExecutionRequest, Operation}