Coverage Report

Created: 2025-07-30 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/origin_event_publisher.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use bytes::BytesMut;
16
use futures::{FutureExt, future};
17
use nativelink_proto::com::github::trace_machina::nativelink::events::{OriginEvent, OriginEvents};
18
use prost::Message;
19
use tokio::sync::{broadcast, mpsc};
20
use tracing::error;
21
use uuid::{Timestamp, Uuid};
22
23
use crate::shutdown_guard::{Priority, ShutdownGuard};
24
use crate::store_trait::{Store, StoreLike};
25
26
/// Publishes origin events to the store.
27
#[derive(Debug)]
28
pub struct OriginEventPublisher {
29
    store: Store,
30
    rx: mpsc::Receiver<OriginEvent>,
31
    shutdown_tx: broadcast::Sender<ShutdownGuard>,
32
    node_id: [u8; 6],
33
}
34
35
impl OriginEventPublisher {
36
0
    pub fn new(
37
0
        store: Store,
38
0
        rx: mpsc::Receiver<OriginEvent>,
39
0
        shutdown_tx: broadcast::Sender<ShutdownGuard>,
40
0
    ) -> Self {
41
        // Generate a random node_id for this instance
42
        use rand::Rng;
43
0
        let mut rng = rand::rng();
44
0
        let mut node_id = [0u8; 6];
45
0
        rng.fill(&mut node_id);
46
47
0
        Self {
48
0
            store,
49
0
            rx,
50
0
            shutdown_tx,
51
0
            node_id,
52
0
        }
53
0
    }
54
55
    /// Runs the origin event publisher.
56
0
    pub async fn run(mut self) {
57
        const MAX_EVENTS_PER_BATCH: usize = 1024;
58
0
        let mut batch: Vec<OriginEvent> = Vec::with_capacity(MAX_EVENTS_PER_BATCH);
59
0
        let mut shutdown_rx = self.shutdown_tx.subscribe();
60
0
        let shutdown_fut = shutdown_rx.recv().fuse();
61
0
        tokio::pin!(shutdown_fut);
62
0
        let shutdown_guard = future::pending().left_future();
63
0
        tokio::pin!(shutdown_guard);
64
        loop {
65
0
            tokio::select! {
66
                biased;
67
0
                _ = self.rx.recv_many(&mut batch, MAX_EVENTS_PER_BATCH) => {
68
0
                    self.handle_batch(&mut batch).await;
69
                }
70
0
                shutdown_guard_res = &mut shutdown_fut => {
71
0
                    tracing::info!("Received shutdown down in origin event publisher");
72
0
                    let Ok(mut local_shutdown_guard) = shutdown_guard_res else {
  Branch (72:25): [Folded - Ignored]
  Branch (72:25): [Folded - Ignored]
73
0
                        tracing::error!("Received shutdown down in origin event publisher but failed to get shutdown guard");
74
0
                        return;
75
                    };
76
0
                    shutdown_guard.set(async move {
77
0
                        local_shutdown_guard.wait_for(Priority::P0).await;
78
0
                    }
79
0
                    .right_future());
80
                }
81
0
                () = &mut shutdown_guard => {
82
                    // All other services with less priority have completed.
83
                    // We may still need to process any remaining events.
84
0
                    while !self.rx.is_empty() {
  Branch (84:27): [Folded - Ignored]
  Branch (84:27): [Folded - Ignored]
85
0
                        self.rx.recv_many(&mut batch, MAX_EVENTS_PER_BATCH).await;
86
0
                        self.handle_batch(&mut batch).await;
87
                    }
88
0
                    return;
89
                }
90
            }
91
        }
92
0
    }
93
94
0
    async fn handle_batch(&self, batch: &mut Vec<OriginEvent>) {
95
        // UUID v6 requires a timestamp and node ID
96
        // Create timestamp from current system time with nanosecond precision
97
0
        let now = std::time::SystemTime::now()
98
0
            .duration_since(std::time::UNIX_EPOCH)
99
0
            .unwrap();
100
0
        let ts = Timestamp::from_unix(
101
0
            uuid::timestamp::context::NoContext,
102
0
            now.as_secs(),
103
0
            now.subsec_nanos(),
104
        );
105
0
        let uuid = Uuid::new_v6(ts, &self.node_id);
106
0
        let events = OriginEvents {
107
0
            #[expect(
108
0
                clippy::drain_collect,
109
0
                reason = "Clippy wants us to use use `mem::take`, but this would move all capacity \
110
0
                    as well to the new vector. Since it is much more likely that we will have a \
111
0
                    small number of events in the batch, we prefer to use `drain` and `collect` \
112
0
                    here, so we only need to allocate the exact amount of memory needed and let \
113
0
                    the batch vector's capacity be reused."
114
0
            )]
115
0
            events: batch.drain(..).collect(),
116
0
        };
117
0
        let mut data = BytesMut::new();
118
0
        if let Err(e) = events.encode(&mut data) {
  Branch (118:16): [Folded - Ignored]
  Branch (118:16): [Folded - Ignored]
119
0
            error!("Failed to encode origin events: {}", e);
120
0
            return;
121
0
        }
122
0
        let update_result = self
123
0
            .store
124
0
            .as_store_driver_pin()
125
0
            .update_oneshot(
126
0
                format!("OriginEvents:{}", uuid.hyphenated()).into(),
127
0
                data.freeze(),
128
0
            )
129
0
            .await;
130
0
        if let Err(err) = update_result {
  Branch (130:16): [Folded - Ignored]
  Branch (130:16): [Folded - Ignored]
131
0
            error!("Failed to upload origin events: {}", err);
132
0
        }
133
0
    }
134
}