Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/origin_event_publisher.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use bytes::BytesMut;
16
use futures::{FutureExt, future};
17
use nativelink_proto::com::github::trace_machina::nativelink::events::{OriginEvent, OriginEvents};
18
use prost::Message;
19
use tokio::sync::{broadcast, mpsc};
20
use tracing::error;
21
use uuid::Uuid;
22
23
use crate::origin_event::get_node_id;
24
use crate::shutdown_guard::{Priority, ShutdownGuard};
25
use crate::store_trait::{Store, StoreLike};
26
27
/// Publishes origin events to the store.
28
#[derive(Debug)]
29
pub struct OriginEventPublisher {
30
    store: Store,
31
    rx: mpsc::Receiver<OriginEvent>,
32
    shutdown_tx: broadcast::Sender<ShutdownGuard>,
33
}
34
35
impl OriginEventPublisher {
36
0
    pub const fn new(
37
0
        store: Store,
38
0
        rx: mpsc::Receiver<OriginEvent>,
39
0
        shutdown_tx: broadcast::Sender<ShutdownGuard>,
40
0
    ) -> Self {
41
0
        Self {
42
0
            store,
43
0
            rx,
44
0
            shutdown_tx,
45
0
        }
46
0
    }
47
48
    /// Runs the origin event publisher.
49
0
    pub async fn run(mut self) {
50
        const MAX_EVENTS_PER_BATCH: usize = 1024;
51
0
        let mut batch: Vec<OriginEvent> = Vec::with_capacity(MAX_EVENTS_PER_BATCH);
52
0
        let mut shutdown_rx = self.shutdown_tx.subscribe();
53
0
        let shutdown_fut = shutdown_rx.recv().fuse();
54
0
        tokio::pin!(shutdown_fut);
55
0
        let shutdown_guard = future::pending().left_future();
56
0
        tokio::pin!(shutdown_guard);
57
        loop {
58
0
            tokio::select! {
59
                biased;
60
0
                _ = self.rx.recv_many(&mut batch, MAX_EVENTS_PER_BATCH) => {
61
0
                    self.handle_batch(&mut batch).await;
62
                }
63
0
                shutdown_guard_res = &mut shutdown_fut => {
64
0
                    tracing::info!("Received shutdown down in origin event publisher");
65
0
                    let Ok(mut local_shutdown_guard) = shutdown_guard_res else {
  Branch (65:25): [Folded - Ignored]
  Branch (65:25): [Folded - Ignored]
66
0
                        tracing::error!("Received shutdown down in origin event publisher but failed to get shutdown guard");
67
0
                        return;
68
                    };
69
0
                    shutdown_guard.set(async move {
70
0
                        local_shutdown_guard.wait_for(Priority::P0).await;
71
0
                    }
72
0
                    .right_future());
73
                }
74
0
                () = &mut shutdown_guard => {
75
                    // All other services with less priority have completed.
76
                    // We may still need to process any remaining events.
77
0
                    while !self.rx.is_empty() {
  Branch (77:27): [Folded - Ignored]
  Branch (77:27): [Folded - Ignored]
78
0
                        self.rx.recv_many(&mut batch, MAX_EVENTS_PER_BATCH).await;
79
0
                        self.handle_batch(&mut batch).await;
80
                    }
81
0
                    return;
82
                }
83
            }
84
        }
85
0
    }
86
87
0
    async fn handle_batch(&self, batch: &mut Vec<OriginEvent>) {
88
0
        let uuid = Uuid::now_v6(&get_node_id(None));
89
0
        let events = OriginEvents {
90
0
            #[expect(
91
0
                clippy::drain_collect,
92
0
                reason = "Clippy wants us to use use `mem::take`, but this would move all capacity \
93
0
                    as well to the new vector. Since it is much more likely that we will have a \
94
0
                    small number of events in the batch, we prefer to use `drain` and `collect` \
95
0
                    here, so we only need to allocate the exact amount of memory needed and let \
96
0
                    the batch vector's capacity be reused."
97
0
            )]
98
0
            events: batch.drain(..).collect(),
99
0
        };
100
0
        let mut data = BytesMut::new();
101
0
        if let Err(e) = events.encode(&mut data) {
  Branch (101:16): [Folded - Ignored]
  Branch (101:16): [Folded - Ignored]
102
0
            error!("Failed to encode origin events: {}", e);
103
0
            return;
104
0
        }
105
0
        let update_result = self
106
0
            .store
107
0
            .as_store_driver_pin()
108
0
            .update_oneshot(
109
0
                format!("OriginEvents:{}", uuid.hyphenated()).into(),
110
0
                data.freeze(),
111
0
            )
112
0
            .await;
113
0
        if let Err(err) = update_result {
  Branch (113:16): [Folded - Ignored]
  Branch (113:16): [Folded - Ignored]
114
0
            error!("Failed to upload origin events: {}", err);
115
0
        }
116
0
    }
117
}