Coverage Report

Created: 2025-07-10 19:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/mock_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
17
use async_trait::async_trait;
18
use nativelink_error::{Error, make_input_err};
19
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
20
use nativelink_util::action_messages::{ActionInfo, OperationId};
21
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
22
use nativelink_util::operation_state_manager::{
23
    ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
24
};
25
use tokio::sync::{Mutex, mpsc};
26
27
#[allow(
28
    clippy::large_enum_variant,
29
    reason = "Testing, so this doesn't really matter. Doesn't trigger on nightly"
30
)]
31
#[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]
32
enum ActionSchedulerCalls {
33
    GetGetKnownProperties(String),
34
    AddAction((OperationId, ActionInfo)),
35
    FilterOperations(OperationFilter),
36
}
37
38
#[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]
39
enum ActionSchedulerReturns {
40
    GetGetKnownProperties(Result<Vec<String>, Error>),
41
    AddAction(Result<Box<dyn ActionStateResult>, Error>),
42
    FilterOperations(Result<ActionStateResultStream<'static>, Error>),
43
}
44
45
#[derive(MetricsComponent, Debug)]
46
pub struct MockActionScheduler {
47
    rx_call: Mutex<mpsc::UnboundedReceiver<ActionSchedulerCalls>>,
48
    tx_call: mpsc::UnboundedSender<ActionSchedulerCalls>,
49
50
    rx_resp: Mutex<mpsc::UnboundedReceiver<ActionSchedulerReturns>>,
51
    tx_resp: mpsc::UnboundedSender<ActionSchedulerReturns>,
52
}
53
54
impl Default for MockActionScheduler {
55
0
    fn default() -> Self {
56
0
        Self::new()
57
0
    }
58
}
59
60
impl MockActionScheduler {
61
11
    pub fn new() -> Self {
62
11
        let (tx_call, rx_call) = mpsc::unbounded_channel();
63
11
        let (tx_resp, rx_resp) = mpsc::unbounded_channel();
64
11
        Self {
65
11
            rx_call: Mutex::new(rx_call),
66
11
            tx_call,
67
11
            rx_resp: Mutex::new(rx_resp),
68
11
            tx_resp,
69
11
        }
70
11
    }
71
72
    #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]
73
2
    pub async fn expect_get_known_properties(&self, result: Result<Vec<String>, Error>) -> String {
74
2
        let mut rx_call_lock = self.rx_call.lock().await;
75
2
        let ActionSchedulerCalls::GetGetKnownProperties(req) = rx_call_lock
  Branch (75:13): [Folded - Ignored]
  Branch (75:13): [Folded - Ignored]
  Branch (75:13): [True: 2, False: 0]
76
2
            .recv()
77
2
            .await
78
2
            .expect("Could not receive msg in mpsc")
79
        else {
80
0
            panic!("Got incorrect call waiting for get_known_properties")
81
        };
82
2
        self.tx_resp
83
2
            .send(ActionSchedulerReturns::GetGetKnownProperties(result))
84
2
            .map_err(|_| make_input_err!("Could not send request to mpsc"))
85
2
            .unwrap();
86
2
        req
87
2
    }
88
89
    #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]
90
0
    pub async fn expect_add_action(
91
0
        &self,
92
0
        result: Result<Box<dyn ActionStateResult>, Error>,
93
6
    ) -> (OperationId, ActionInfo) {
94
6
        let mut rx_call_lock = self.rx_call.lock().await;
95
6
        let ActionSchedulerCalls::AddAction(req) = rx_call_lock
  Branch (95:13): [Folded - Ignored]
  Branch (95:13): [True: 1, False: 0]
  Branch (95:13): [Folded - Ignored]
  Branch (95:13): [True: 5, False: 0]
96
6
            .recv()
97
6
            .await
98
6
            .expect("Could not receive msg in mpsc")
99
        else {
100
0
            panic!("Got incorrect call waiting for get_known_properties")
101
        };
102
6
        self.tx_resp
103
6
            .send(ActionSchedulerReturns::AddAction(result))
104
6
            .map_err(|_| make_input_err!("Could not send request to mpsc"))
105
6
            .unwrap();
106
6
        req
107
6
    }
108
109
    #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]
110
2
    pub async fn expect_filter_operations(
111
2
        &self,
112
2
        result: Result<ActionStateResultStream<'static>, Error>,
113
2
    ) -> OperationFilter {
114
2
        let mut rx_call_lock = self.rx_call.lock().await;
115
2
        let ActionSchedulerCalls::FilterOperations(req) = rx_call_lock
  Branch (115:13): [Folded - Ignored]
  Branch (115:13): [True: 1, False: 0]
  Branch (115:13): [Folded - Ignored]
  Branch (115:13): [True: 1, False: 0]
116
2
            .recv()
117
2
            .await
118
2
            .expect("Could not receive msg in mpsc")
119
        else {
120
0
            panic!("Got incorrect call waiting for find_by_client_operation_id")
121
        };
122
2
        self.tx_resp
123
2
            .send(ActionSchedulerReturns::FilterOperations(result))
124
2
            .map_err(|_| make_input_err!("Could not send request to mpsc"))
125
2
            .unwrap();
126
2
        req
127
2
    }
128
}
129
130
#[async_trait]
131
impl KnownPlatformPropertyProvider for MockActionScheduler {
132
4
    async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
133
2
        self.tx_call
134
2
            .send(ActionSchedulerCalls::GetGetKnownProperties(
135
2
                instance_name.to_string(),
136
2
            ))
137
2
            .expect("Could not send request to mpsc");
138
2
        let mut rx_resp_lock = self.rx_resp.lock().await;
139
2
        match rx_resp_lock
140
2
            .recv()
141
2
            .await
142
2
            .expect("Could not receive msg in mpsc")
143
        {
144
2
            ActionSchedulerReturns::GetGetKnownProperties(result) => result,
145
0
            _ => panic!("Expected get_known_properties return value"),
146
        }
147
4
    }
148
}
149
150
#[async_trait]
151
impl ClientStateManager for MockActionScheduler {
152
    async fn add_action(
153
        &self,
154
        client_operation_id: OperationId,
155
        action_info: Arc<ActionInfo>,
156
12
    ) -> Result<Box<dyn ActionStateResult>, Error> {
157
6
        self.tx_call
158
6
            .send(ActionSchedulerCalls::AddAction((
159
6
                client_operation_id,
160
6
                action_info.as_ref().clone(),
161
6
            )))
162
6
            .expect("Could not send request to mpsc");
163
6
        let mut rx_resp_lock = self.rx_resp.lock().await;
164
6
        match rx_resp_lock
165
6
            .recv()
166
6
            .await
167
6
            .expect("Could not receive msg in mpsc")
168
        {
169
6
            ActionSchedulerReturns::AddAction(result) => result,
170
0
            _ => panic!("Expected add_action return value"),
171
        }
172
12
    }
173
174
    async fn filter_operations<'a>(
175
        &'a self,
176
        filter: OperationFilter,
177
4
    ) -> Result<ActionStateResultStream<'a>, Error> {
178
2
        self.tx_call
179
2
            .send(ActionSchedulerCalls::FilterOperations(filter))
180
2
            .expect("Could not send request to mpsc");
181
2
        let mut rx_resp_lock = self.rx_resp.lock().await;
182
2
        match rx_resp_lock
183
2
            .recv()
184
2
            .await
185
2
            .expect("Could not receive msg in mpsc")
186
        {
187
2
            ActionSchedulerReturns::FilterOperations(result) => result,
188
0
            _ => panic!("Expected find_by_client_operation_id return value"),
189
        }
190
4
    }
191
192
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
193
0
        Some(self)
194
0
    }
195
}
196
197
impl RootMetricsComponent for MockActionScheduler {}