/build/source/nativelink-scheduler/src/mock_scheduler.rs
Line  | Count  | Source  | 
1  |  | // Copyright 2024 The NativeLink Authors. All rights reserved.  | 
2  |  | //  | 
3  |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");  | 
4  |  | // you may not use this file except in compliance with the License.  | 
5  |  | // You may obtain a copy of the License at  | 
6  |  | //  | 
7  |  | //    See LICENSE file for details  | 
8  |  | //  | 
9  |  | // Unless required by applicable law or agreed to in writing, software  | 
10  |  | // distributed under the License is distributed on an "AS IS" BASIS,  | 
11  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
12  |  | // See the License for the specific language governing permissions and  | 
13  |  | // limitations under the License.  | 
14  |  |  | 
15  |  | use std::sync::Arc;  | 
16  |  |  | 
17  |  | use async_trait::async_trait;  | 
18  |  | use nativelink_error::{Error, make_input_err}; | 
19  |  | use nativelink_metric::{MetricsComponent, RootMetricsComponent}; | 
20  |  | use nativelink_util::action_messages::{ActionInfo, OperationId}; | 
21  |  | use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;  | 
22  |  | use nativelink_util::operation_state_manager::{ | 
23  |  |     ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,  | 
24  |  | };  | 
25  |  | use tokio::sync::{Mutex, mpsc}; | 
26  |  |  | 
27  |  | #[allow(  | 
28  |  |     clippy::large_enum_variant,  | 
29  |  |     reason = "Testing, so this doesn't really matter. Doesn't trigger on nightly"  | 
30  |  | )]  | 
31  |  | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]  | 
32  |  | enum ActionSchedulerCalls { | 
33  |  |     GetGetKnownProperties(String),  | 
34  |  |     AddAction((OperationId, ActionInfo)),  | 
35  |  |     FilterOperations(OperationFilter),  | 
36  |  | }  | 
37  |  |  | 
38  |  | #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]  | 
39  |  | enum ActionSchedulerReturns { | 
40  |  |     GetGetKnownProperties(Result<Vec<String>, Error>),  | 
41  |  |     AddAction(Result<Box<dyn ActionStateResult>, Error>),  | 
42  |  |     FilterOperations(Result<ActionStateResultStream<'static>, Error>),  | 
43  |  | }  | 
44  |  |  | 
45  |  | #[derive(MetricsComponent, Debug)]  | 
46  |  | pub struct MockActionScheduler { | 
47  |  |     rx_call: Mutex<mpsc::UnboundedReceiver<ActionSchedulerCalls>>,  | 
48  |  |     tx_call: mpsc::UnboundedSender<ActionSchedulerCalls>,  | 
49  |  |  | 
50  |  |     rx_resp: Mutex<mpsc::UnboundedReceiver<ActionSchedulerReturns>>,  | 
51  |  |     tx_resp: mpsc::UnboundedSender<ActionSchedulerReturns>,  | 
52  |  | }  | 
53  |  |  | 
54  |  | impl Default for MockActionScheduler { | 
55  | 0  |     fn default() -> Self { | 
56  | 0  |         Self::new()  | 
57  | 0  |     }  | 
58  |  | }  | 
59  |  |  | 
60  |  | impl MockActionScheduler { | 
61  | 15  |     pub fn new() -> Self { | 
62  | 15  |         let (tx_call, rx_call) = mpsc::unbounded_channel();  | 
63  | 15  |         let (tx_resp, rx_resp) = mpsc::unbounded_channel();  | 
64  | 15  |         Self { | 
65  | 15  |             rx_call: Mutex::new(rx_call),  | 
66  | 15  |             tx_call,  | 
67  | 15  |             rx_resp: Mutex::new(rx_resp),  | 
68  | 15  |             tx_resp,  | 
69  | 15  |         }  | 
70  | 15  |     }  | 
71  |  |  | 
72  |  |     #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]  | 
73  | 3  |     pub async fn expect_get_known_properties(&self, result: Result<Vec<String>, Error>) -> String { | 
74  | 3  |         let mut rx_call_lock = self.rx_call.lock().await;  | 
75  | 3  |         let ActionSchedulerCalls::GetGetKnownProperties(req) = rx_call_lock   Branch (75:13): [Folded - Ignored]
   Branch (75:13): [Folded - Ignored]
   Branch (75:13): [True: 3, False: 0]
  | 
76  | 3  |             .recv()  | 
77  | 3  |             .await  | 
78  | 3  |             .expect("Could not receive msg in mpsc") | 
79  |  |         else { | 
80  | 0  |             panic!("Got incorrect call waiting for get_known_properties") | 
81  |  |         };  | 
82  | 3  |         self.tx_resp  | 
83  | 3  |             .send(ActionSchedulerReturns::GetGetKnownProperties(result))  | 
84  | 3  |             .map_err(|_| make_input_err!("Could not send request to mpsc")) | 
85  | 3  |             .unwrap();  | 
86  | 3  |         req  | 
87  | 3  |     }  | 
88  |  |  | 
89  |  |     #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]  | 
90  | 0  |     pub async fn expect_add_action(  | 
91  | 0  |         &self,  | 
92  | 0  |         result: Result<Box<dyn ActionStateResult>, Error>,  | 
93  | 9  |     ) -> (OperationId, ActionInfo) { | 
94  | 9  |         let mut rx_call_lock = self.rx_call.lock().await;  | 
95  | 9  |         let ActionSchedulerCalls::AddAction(req) = rx_call_lock   Branch (95:13): [Folded - Ignored]
   Branch (95:13): [True: 1, False: 0]
   Branch (95:13): [Folded - Ignored]
   Branch (95:13): [True: 8, False: 0]
  | 
96  | 9  |             .recv()  | 
97  | 9  |             .await  | 
98  | 9  |             .expect("Could not receive msg in mpsc") | 
99  |  |         else { | 
100  | 0  |             panic!("Got incorrect call waiting for get_known_properties") | 
101  |  |         };  | 
102  | 9  |         self.tx_resp  | 
103  | 9  |             .send(ActionSchedulerReturns::AddAction(result))  | 
104  | 9  |             .map_err(|_| make_input_err!("Could not send request to mpsc")) | 
105  | 9  |             .unwrap();  | 
106  | 9  |         req  | 
107  | 9  |     }  | 
108  |  |  | 
109  |  |     #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]  | 
110  | 2  |     pub async fn expect_filter_operations(  | 
111  | 2  |         &self,  | 
112  | 2  |         result: Result<ActionStateResultStream<'static>, Error>,  | 
113  | 2  |     ) -> OperationFilter { | 
114  | 2  |         let mut rx_call_lock = self.rx_call.lock().await;  | 
115  | 2  |         let ActionSchedulerCalls::FilterOperations(req) = rx_call_lock   Branch (115:13): [Folded - Ignored]
   Branch (115:13): [True: 1, False: 0]
   Branch (115:13): [Folded - Ignored]
   Branch (115:13): [True: 1, False: 0]
  | 
116  | 2  |             .recv()  | 
117  | 2  |             .await  | 
118  | 2  |             .expect("Could not receive msg in mpsc") | 
119  |  |         else { | 
120  | 0  |             panic!("Got incorrect call waiting for find_by_client_operation_id") | 
121  |  |         };  | 
122  | 2  |         self.tx_resp  | 
123  | 2  |             .send(ActionSchedulerReturns::FilterOperations(result))  | 
124  | 2  |             .map_err(|_| make_input_err!("Could not send request to mpsc")) | 
125  | 2  |             .unwrap();  | 
126  | 2  |         req  | 
127  | 2  |     }  | 
128  |  | }  | 
129  |  |  | 
130  |  | #[async_trait]  | 
131  |  | impl KnownPlatformPropertyProvider for MockActionScheduler { | 
132  | 3  |     async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> { | 
133  |  |         self.tx_call  | 
134  |  |             .send(ActionSchedulerCalls::GetGetKnownProperties(  | 
135  |  |                 instance_name.to_string(),  | 
136  |  |             ))  | 
137  |  |             .expect("Could not send request to mpsc"); | 
138  |  |         let mut rx_resp_lock = self.rx_resp.lock().await;  | 
139  |  |         match rx_resp_lock  | 
140  |  |             .recv()  | 
141  |  |             .await  | 
142  |  |             .expect("Could not receive msg in mpsc") | 
143  |  |         { | 
144  |  |             ActionSchedulerReturns::GetGetKnownProperties(result) => result,  | 
145  |  |             _ => panic!("Expected get_known_properties return value"), | 
146  |  |         }  | 
147  | 3  |     }  | 
148  |  | }  | 
149  |  |  | 
150  |  | #[async_trait]  | 
151  |  | impl ClientStateManager for MockActionScheduler { | 
152  |  |     async fn add_action(  | 
153  |  |         &self,  | 
154  |  |         client_operation_id: OperationId,  | 
155  |  |         action_info: Arc<ActionInfo>,  | 
156  | 9  |     ) -> Result<Box<dyn ActionStateResult>, Error> { | 
157  |  |         self.tx_call  | 
158  |  |             .send(ActionSchedulerCalls::AddAction((  | 
159  |  |                 client_operation_id,  | 
160  |  |                 action_info.as_ref().clone(),  | 
161  |  |             )))  | 
162  |  |             .expect("Could not send request to mpsc"); | 
163  |  |         let mut rx_resp_lock = self.rx_resp.lock().await;  | 
164  |  |         match rx_resp_lock  | 
165  |  |             .recv()  | 
166  |  |             .await  | 
167  |  |             .expect("Could not receive msg in mpsc") | 
168  |  |         { | 
169  |  |             ActionSchedulerReturns::AddAction(result) => result,  | 
170  |  |             _ => panic!("Expected add_action return value"), | 
171  |  |         }  | 
172  | 9  |     }  | 
173  |  |  | 
174  |  |     async fn filter_operations<'a>(  | 
175  |  |         &'a self,  | 
176  |  |         filter: OperationFilter,  | 
177  | 2  |     ) -> Result<ActionStateResultStream<'a>, Error> { | 
178  |  |         self.tx_call  | 
179  |  |             .send(ActionSchedulerCalls::FilterOperations(filter))  | 
180  |  |             .expect("Could not send request to mpsc"); | 
181  |  |         let mut rx_resp_lock = self.rx_resp.lock().await;  | 
182  |  |         match rx_resp_lock  | 
183  |  |             .recv()  | 
184  |  |             .await  | 
185  |  |             .expect("Could not receive msg in mpsc") | 
186  |  |         { | 
187  |  |             ActionSchedulerReturns::FilterOperations(result) => result,  | 
188  |  |             _ => panic!("Expected find_by_client_operation_id return value"), | 
189  |  |         }  | 
190  | 2  |     }  | 
191  |  |  | 
192  | 0  |     fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> { | 
193  | 0  |         Some(self)  | 
194  | 0  |     }  | 
195  |  | }  | 
196  |  |  | 
197  |  | impl RootMetricsComponent for MockActionScheduler {} |