Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/shutdown_guard.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
17
use tokio::sync::watch;
18
19
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
20
pub enum Priority {
21
    // The least important priority.
22
    LeastImportant = 2,
23
    // All priorities greater than 1 must be complted.
24
    P1 = 1,
25
    // All other priorities must be completed.
26
    P0 = 0,
27
}
28
29
impl From<usize> for Priority {
30
0
    fn from(value: usize) -> Self {
31
0
        match value {
32
0
            0 => Priority::P0,
33
0
            1 => Priority::P1,
34
0
            _ => Priority::LeastImportant,
35
        }
36
0
    }
37
}
38
39
impl From<Priority> for usize {
40
0
    fn from(priority: Priority) -> usize {
41
0
        match priority {
42
0
            Priority::P0 => 0,
43
0
            Priority::P1 => 1,
44
0
            Priority::LeastImportant => 2,
45
        }
46
0
    }
47
}
48
49
/// Tracks other services that have registered to be notified when
50
/// the process is being shut down.
51
#[derive(Debug)]
52
pub struct ShutdownGuard {
53
    priority: Priority,
54
    tx: watch::Sender<HashMap<Priority, usize>>,
55
    rx: watch::Receiver<HashMap<Priority, usize>>,
56
}
57
58
impl ShutdownGuard {
59
    /// Waits for all priorities less important than the given
60
    /// priority to be completed.
61
0
    pub async fn wait_for(&mut self, priority: Priority) {
62
0
        if priority != self.priority {
  Branch (62:12): [Folded - Ignored]
  Branch (62:12): [Folded - Ignored]
63
            // Promote our priority to the new priority.
64
0
            self.tx.send_modify(|map| {
65
0
                let old_count = map.remove(&self.priority).unwrap_or(0).saturating_sub(1);
66
0
                map.insert(self.priority, old_count);
67
0
68
0
                self.priority = priority;
69
0
70
0
                let new_count = map.get(&priority).unwrap_or(&0).saturating_add(1);
71
0
                map.insert(priority, new_count);
72
0
            });
73
0
        }
74
        // Ignore error because the receiver will never be closed
75
        // if the sender is still alive here.
76
0
        drop(
77
0
            self.rx
78
0
                .wait_for(|map| {
79
0
                    let start = usize::from(priority) + 1;
80
0
                    let end = usize::from(Priority::LeastImportant);
81
0
                    for p in start..=end {
82
0
                        if *map.get(&p.into()).unwrap_or(&0) > 0 {
  Branch (82:28): [Folded - Ignored]
  Branch (82:28): [Folded - Ignored]
83
0
                            return false;
84
0
                        }
85
                    }
86
0
                    true
87
0
                })
88
0
                .await,
89
        );
90
0
    }
91
}
92
93
impl Default for ShutdownGuard {
94
0
    fn default() -> Self {
95
0
        let priority = Priority::LeastImportant;
96
0
        let mut map = HashMap::new();
97
0
        map.insert(priority, 0);
98
0
        let (tx, rx) = watch::channel(map);
99
0
        Self { priority, tx, rx }
100
0
    }
101
}
102
103
impl Clone for ShutdownGuard {
104
0
    fn clone(&self) -> Self {
105
0
        self.tx.send_modify(|map| {
106
0
            map.insert(
107
0
                self.priority,
108
0
                map.get(&Priority::LeastImportant)
109
0
                    .unwrap_or(&0)
110
0
                    .saturating_add(1),
111
0
            );
112
0
        });
113
0
        Self {
114
0
            priority: Priority::LeastImportant,
115
0
            tx: self.tx.clone(),
116
0
            rx: self.rx.clone(),
117
0
        }
118
0
    }
119
}
120
121
impl Drop for ShutdownGuard {
122
0
    fn drop(&mut self) {
123
0
        self.tx.send_modify(|map| {
124
0
            map.insert(
125
0
                self.priority,
126
0
                map.get(&self.priority).unwrap_or(&0).saturating_sub(1),
127
0
            );
128
0
        });
129
0
    }
130
}