/build/source/nativelink-scheduler/src/mock_scheduler.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::sync::Arc; |
16 | | |
17 | | use async_trait::async_trait; |
18 | | use nativelink_error::Error; |
19 | | use nativelink_metric::{MetricsComponent, RootMetricsComponent}; |
20 | | use nativelink_util::action_messages::{ActionInfo, OperationId}; |
21 | | use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; |
22 | | use nativelink_util::operation_state_manager::{ |
23 | | ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, |
24 | | }; |
25 | | use tokio::sync::{Mutex, mpsc}; |
26 | | use tonic::Code; |
27 | | |
28 | | #[allow( |
29 | | clippy::large_enum_variant, |
30 | | reason = "Testing, so this doesn't really matter. Doesn't trigger on nightly" |
31 | | )] |
32 | | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")] |
33 | | enum ActionSchedulerCalls { |
34 | | GetGetKnownProperties(String), |
35 | | AddAction((OperationId, ActionInfo)), |
36 | | FilterOperations(OperationFilter), |
37 | | } |
38 | | |
39 | | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")] |
40 | | enum ActionSchedulerReturns { |
41 | | GetGetKnownProperties(Result<Vec<String>, Error>), |
42 | | AddAction(Result<Box<dyn ActionStateResult>, Error>), |
43 | | FilterOperations(Result<ActionStateResultStream<'static>, Error>), |
44 | | } |
45 | | |
46 | | #[derive(MetricsComponent, Debug)] |
47 | | pub struct MockActionScheduler { |
48 | | rx_call: Mutex<mpsc::UnboundedReceiver<ActionSchedulerCalls>>, |
49 | | tx_call: mpsc::UnboundedSender<ActionSchedulerCalls>, |
50 | | |
51 | | rx_resp: Mutex<mpsc::UnboundedReceiver<ActionSchedulerReturns>>, |
52 | | tx_resp: mpsc::UnboundedSender<ActionSchedulerReturns>, |
53 | | } |
54 | | |
55 | | impl Default for MockActionScheduler { |
56 | 0 | fn default() -> Self { |
57 | 0 | Self::new() |
58 | 0 | } |
59 | | } |
60 | | |
61 | | impl MockActionScheduler { |
62 | 22 | pub fn new() -> Self { |
63 | 22 | let (tx_call, rx_call) = mpsc::unbounded_channel(); |
64 | 22 | let (tx_resp, rx_resp) = mpsc::unbounded_channel(); |
65 | 22 | Self { |
66 | 22 | rx_call: Mutex::new(rx_call), |
67 | 22 | tx_call, |
68 | 22 | rx_resp: Mutex::new(rx_resp), |
69 | 22 | tx_resp, |
70 | 22 | } |
71 | 22 | } |
72 | | |
73 | | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")] |
74 | 3 | pub async fn expect_get_known_properties(&self, result: Result<Vec<String>, Error>) -> String { |
75 | 3 | let mut rx_call_lock = self.rx_call.lock().await; |
76 | 3 | let ActionSchedulerCalls::GetGetKnownProperties(req) = rx_call_lock |
77 | 3 | .recv() |
78 | 3 | .await |
79 | 3 | .expect("Could not receive msg in mpsc") |
80 | | else { |
81 | 0 | panic!("Got incorrect call waiting for get_known_properties") |
82 | | }; |
83 | 3 | self.tx_resp |
84 | 3 | .send(ActionSchedulerReturns::GetGetKnownProperties(result)) |
85 | 3 | .map_err(|err| {0 |
86 | 0 | Error::from_std_err(Code::InvalidArgument, &err) |
87 | 0 | .append("Could not send request to mpsc") |
88 | 0 | }) |
89 | 3 | .unwrap(); |
90 | 3 | req |
91 | 3 | } |
92 | | |
93 | | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")] |
94 | 9 | pub async fn expect_add_action( |
95 | 9 | &self, |
96 | 9 | result: Result<Box<dyn ActionStateResult>, Error>, |
97 | 9 | ) -> (OperationId, ActionInfo) { |
98 | 9 | let mut rx_call_lock = self.rx_call.lock().await; |
99 | 9 | let ActionSchedulerCalls::AddAction(req) = rx_call_lock |
100 | 9 | .recv() |
101 | 9 | .await |
102 | 9 | .expect("Could not receive msg in mpsc") |
103 | | else { |
104 | 0 | panic!("Got incorrect call waiting for get_known_properties") |
105 | | }; |
106 | 9 | self.tx_resp |
107 | 9 | .send(ActionSchedulerReturns::AddAction(result)) |
108 | 9 | .map_err(|err| {0 |
109 | 0 | Error::from_std_err(Code::InvalidArgument, &err) |
110 | 0 | .append("Could not send request to mpsc") |
111 | 0 | }) |
112 | 9 | .unwrap(); |
113 | 9 | req |
114 | 9 | } |
115 | | |
116 | | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")] |
117 | 6 | pub async fn expect_filter_operations( |
118 | 6 | &self, |
119 | 6 | result: Result<ActionStateResultStream<'static>, Error>, |
120 | 6 | ) -> OperationFilter { |
121 | 6 | let mut rx_call_lock = self.rx_call.lock().await; |
122 | 6 | let ActionSchedulerCalls::FilterOperations(req) = rx_call_lock |
123 | 6 | .recv() |
124 | 6 | .await |
125 | 6 | .expect("Could not receive msg in mpsc") |
126 | | else { |
127 | 0 | panic!("Got incorrect call waiting for find_by_client_operation_id") |
128 | | }; |
129 | 6 | self.tx_resp |
130 | 6 | .send(ActionSchedulerReturns::FilterOperations(result)) |
131 | 6 | .map_err(|err| {0 |
132 | 0 | Error::from_std_err(Code::InvalidArgument, &err) |
133 | 0 | .append("Could not send request to mpsc") |
134 | 0 | }) |
135 | 6 | .unwrap(); |
136 | 6 | req |
137 | 6 | } |
138 | | } |
139 | | |
140 | | #[async_trait] |
141 | | impl KnownPlatformPropertyProvider for MockActionScheduler { |
142 | 3 | async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> { |
143 | | self.tx_call |
144 | | .send(ActionSchedulerCalls::GetGetKnownProperties( |
145 | | instance_name.to_string(), |
146 | | )) |
147 | | .expect("Could not send request to mpsc"); |
148 | | let mut rx_resp_lock = self.rx_resp.lock().await; |
149 | | match rx_resp_lock |
150 | | .recv() |
151 | | .await |
152 | | .expect("Could not receive msg in mpsc") |
153 | | { |
154 | | ActionSchedulerReturns::GetGetKnownProperties(result) => result, |
155 | | _ => panic!("Expected get_known_properties return value"), |
156 | | } |
157 | 3 | } |
158 | | } |
159 | | |
160 | | #[async_trait] |
161 | | impl ClientStateManager for MockActionScheduler { |
162 | | async fn add_action( |
163 | | &self, |
164 | | client_operation_id: OperationId, |
165 | | action_info: Arc<ActionInfo>, |
166 | 9 | ) -> Result<Box<dyn ActionStateResult>, Error> { |
167 | | self.tx_call |
168 | | .send(ActionSchedulerCalls::AddAction(( |
169 | | client_operation_id, |
170 | | action_info.as_ref().clone(), |
171 | | ))) |
172 | | .expect("Could not send request to mpsc"); |
173 | | let mut rx_resp_lock = self.rx_resp.lock().await; |
174 | | match rx_resp_lock |
175 | | .recv() |
176 | | .await |
177 | | .expect("Could not receive msg in mpsc") |
178 | | { |
179 | | ActionSchedulerReturns::AddAction(result) => result, |
180 | | _ => panic!("Expected add_action return value"), |
181 | | } |
182 | 9 | } |
183 | | |
184 | | async fn filter_operations<'a>( |
185 | | &'a self, |
186 | | filter: OperationFilter, |
187 | 6 | ) -> Result<ActionStateResultStream<'a>, Error> { |
188 | | self.tx_call |
189 | | .send(ActionSchedulerCalls::FilterOperations(filter)) |
190 | | .expect("Could not send request to mpsc"); |
191 | | let mut rx_resp_lock = self.rx_resp.lock().await; |
192 | | match rx_resp_lock |
193 | | .recv() |
194 | | .await |
195 | | .expect("Could not receive msg in mpsc") |
196 | | { |
197 | | ActionSchedulerReturns::FilterOperations(result) => result, |
198 | | _ => panic!("Expected find_by_client_operation_id return value"), |
199 | | } |
200 | 6 | } |
201 | | |
202 | 0 | fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> { |
203 | 0 | Some(self) |
204 | 0 | } |
205 | | } |
206 | | |
207 | | impl RootMetricsComponent for MockActionScheduler {} |