/build/source/nativelink-scheduler/src/mock_scheduler.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::sync::Arc; |
16 | | |
17 | | use async_trait::async_trait; |
18 | | use nativelink_error::{Error, make_input_err}; |
19 | | use nativelink_metric::{MetricsComponent, RootMetricsComponent}; |
20 | | use nativelink_util::action_messages::{ActionInfo, OperationId}; |
21 | | use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; |
22 | | use nativelink_util::operation_state_manager::{ |
23 | | ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, |
24 | | }; |
25 | | use tokio::sync::{Mutex, mpsc}; |
26 | | |
27 | | #[allow( |
28 | | clippy::large_enum_variant, |
29 | | reason = "Testing, so this doesn't really matter. Doesn't trigger on nightly" |
30 | | )] |
31 | | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")] |
32 | | enum ActionSchedulerCalls { |
33 | | GetGetKnownProperties(String), |
34 | | AddAction((OperationId, ActionInfo)), |
35 | | FilterOperations(OperationFilter), |
36 | | } |
37 | | |
38 | | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")] |
39 | | enum ActionSchedulerReturns { |
40 | | GetGetKnownProperties(Result<Vec<String>, Error>), |
41 | | AddAction(Result<Box<dyn ActionStateResult>, Error>), |
42 | | FilterOperations(Result<ActionStateResultStream<'static>, Error>), |
43 | | } |
44 | | |
45 | | #[derive(MetricsComponent, Debug)] |
46 | | pub struct MockActionScheduler { |
47 | | rx_call: Mutex<mpsc::UnboundedReceiver<ActionSchedulerCalls>>, |
48 | | tx_call: mpsc::UnboundedSender<ActionSchedulerCalls>, |
49 | | |
50 | | rx_resp: Mutex<mpsc::UnboundedReceiver<ActionSchedulerReturns>>, |
51 | | tx_resp: mpsc::UnboundedSender<ActionSchedulerReturns>, |
52 | | } |
53 | | |
54 | | impl Default for MockActionScheduler { |
55 | 0 | fn default() -> Self { |
56 | 0 | Self::new() |
57 | 0 | } |
58 | | } |
59 | | |
60 | | impl MockActionScheduler { |
61 | 11 | pub fn new() -> Self { |
62 | 11 | let (tx_call, rx_call) = mpsc::unbounded_channel(); |
63 | 11 | let (tx_resp, rx_resp) = mpsc::unbounded_channel(); |
64 | 11 | Self { |
65 | 11 | rx_call: Mutex::new(rx_call), |
66 | 11 | tx_call, |
67 | 11 | rx_resp: Mutex::new(rx_resp), |
68 | 11 | tx_resp, |
69 | 11 | } |
70 | 11 | } |
71 | | |
72 | | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")] |
73 | 2 | pub async fn expect_get_known_properties(&self, result: Result<Vec<String>, Error>) -> String { |
74 | 2 | let mut rx_call_lock = self.rx_call.lock().await; |
75 | 2 | let ActionSchedulerCalls::GetGetKnownProperties(req) = rx_call_lock Branch (75:13): [Folded - Ignored]
Branch (75:13): [Folded - Ignored]
Branch (75:13): [True: 2, False: 0]
|
76 | 2 | .recv() |
77 | 2 | .await |
78 | 2 | .expect("Could not receive msg in mpsc") |
79 | | else { |
80 | 0 | panic!("Got incorrect call waiting for get_known_properties") |
81 | | }; |
82 | 2 | self.tx_resp |
83 | 2 | .send(ActionSchedulerReturns::GetGetKnownProperties(result)) |
84 | 2 | .map_err(|_| make_input_err!("Could not send request to mpsc")) |
85 | 2 | .unwrap(); |
86 | 2 | req |
87 | 2 | } |
88 | | |
89 | | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")] |
90 | 0 | pub async fn expect_add_action( |
91 | 0 | &self, |
92 | 0 | result: Result<Box<dyn ActionStateResult>, Error>, |
93 | 6 | ) -> (OperationId, ActionInfo) { |
94 | 6 | let mut rx_call_lock = self.rx_call.lock().await; |
95 | 6 | let ActionSchedulerCalls::AddAction(req) = rx_call_lock Branch (95:13): [Folded - Ignored]
Branch (95:13): [True: 1, False: 0]
Branch (95:13): [Folded - Ignored]
Branch (95:13): [True: 5, False: 0]
|
96 | 6 | .recv() |
97 | 6 | .await |
98 | 6 | .expect("Could not receive msg in mpsc") |
99 | | else { |
100 | 0 | panic!("Got incorrect call waiting for get_known_properties") |
101 | | }; |
102 | 6 | self.tx_resp |
103 | 6 | .send(ActionSchedulerReturns::AddAction(result)) |
104 | 6 | .map_err(|_| make_input_err!("Could not send request to mpsc")) |
105 | 6 | .unwrap(); |
106 | 6 | req |
107 | 6 | } |
108 | | |
109 | | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")] |
110 | 2 | pub async fn expect_filter_operations( |
111 | 2 | &self, |
112 | 2 | result: Result<ActionStateResultStream<'static>, Error>, |
113 | 2 | ) -> OperationFilter { |
114 | 2 | let mut rx_call_lock = self.rx_call.lock().await; |
115 | 2 | let ActionSchedulerCalls::FilterOperations(req) = rx_call_lock Branch (115:13): [Folded - Ignored]
Branch (115:13): [True: 1, False: 0]
Branch (115:13): [Folded - Ignored]
Branch (115:13): [True: 1, False: 0]
|
116 | 2 | .recv() |
117 | 2 | .await |
118 | 2 | .expect("Could not receive msg in mpsc") |
119 | | else { |
120 | 0 | panic!("Got incorrect call waiting for find_by_client_operation_id") |
121 | | }; |
122 | 2 | self.tx_resp |
123 | 2 | .send(ActionSchedulerReturns::FilterOperations(result)) |
124 | 2 | .map_err(|_| make_input_err!("Could not send request to mpsc")) |
125 | 2 | .unwrap(); |
126 | 2 | req |
127 | 2 | } |
128 | | } |
129 | | |
130 | | #[async_trait] |
131 | | impl KnownPlatformPropertyProvider for MockActionScheduler { |
132 | 4 | async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> { |
133 | 2 | self.tx_call |
134 | 2 | .send(ActionSchedulerCalls::GetGetKnownProperties( |
135 | 2 | instance_name.to_string(), |
136 | 2 | )) |
137 | 2 | .expect("Could not send request to mpsc"); |
138 | 2 | let mut rx_resp_lock = self.rx_resp.lock().await; |
139 | 2 | match rx_resp_lock |
140 | 2 | .recv() |
141 | 2 | .await |
142 | 2 | .expect("Could not receive msg in mpsc") |
143 | | { |
144 | 2 | ActionSchedulerReturns::GetGetKnownProperties(result) => result, |
145 | 0 | _ => panic!("Expected get_known_properties return value"), |
146 | | } |
147 | 4 | } |
148 | | } |
149 | | |
150 | | #[async_trait] |
151 | | impl ClientStateManager for MockActionScheduler { |
152 | | async fn add_action( |
153 | | &self, |
154 | | client_operation_id: OperationId, |
155 | | action_info: Arc<ActionInfo>, |
156 | 12 | ) -> Result<Box<dyn ActionStateResult>, Error> { |
157 | 6 | self.tx_call |
158 | 6 | .send(ActionSchedulerCalls::AddAction(( |
159 | 6 | client_operation_id, |
160 | 6 | action_info.as_ref().clone(), |
161 | 6 | ))) |
162 | 6 | .expect("Could not send request to mpsc"); |
163 | 6 | let mut rx_resp_lock = self.rx_resp.lock().await; |
164 | 6 | match rx_resp_lock |
165 | 6 | .recv() |
166 | 6 | .await |
167 | 6 | .expect("Could not receive msg in mpsc") |
168 | | { |
169 | 6 | ActionSchedulerReturns::AddAction(result) => result, |
170 | 0 | _ => panic!("Expected add_action return value"), |
171 | | } |
172 | 12 | } |
173 | | |
174 | | async fn filter_operations<'a>( |
175 | | &'a self, |
176 | | filter: OperationFilter, |
177 | 4 | ) -> Result<ActionStateResultStream<'a>, Error> { |
178 | 2 | self.tx_call |
179 | 2 | .send(ActionSchedulerCalls::FilterOperations(filter)) |
180 | 2 | .expect("Could not send request to mpsc"); |
181 | 2 | let mut rx_resp_lock = self.rx_resp.lock().await; |
182 | 2 | match rx_resp_lock |
183 | 2 | .recv() |
184 | 2 | .await |
185 | 2 | .expect("Could not receive msg in mpsc") |
186 | | { |
187 | 2 | ActionSchedulerReturns::FilterOperations(result) => result, |
188 | 0 | _ => panic!("Expected find_by_client_operation_id return value"), |
189 | | } |
190 | 4 | } |
191 | | |
192 | 0 | fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> { |
193 | 0 | Some(self) |
194 | 0 | } |
195 | | } |
196 | | |
197 | | impl RootMetricsComponent for MockActionScheduler {} |