Coverage Report

Created: 2026-04-14 11:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/mock_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
17
use async_trait::async_trait;
18
use nativelink_error::Error;
19
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
20
use nativelink_util::action_messages::{ActionInfo, OperationId};
21
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
22
use nativelink_util::operation_state_manager::{
23
    ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
24
};
25
use tokio::sync::{Mutex, mpsc};
26
use tonic::Code;
27
28
#[allow(
29
    clippy::large_enum_variant,
30
    reason = "Testing, so this doesn't really matter. Doesn't trigger on nightly"
31
)]
32
#[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]
33
enum ActionSchedulerCalls {
34
    GetGetKnownProperties(String),
35
    AddAction((OperationId, ActionInfo)),
36
    FilterOperations(OperationFilter),
37
}
38
39
#[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]
40
enum ActionSchedulerReturns {
41
    GetGetKnownProperties(Result<Vec<String>, Error>),
42
    AddAction(Result<Box<dyn ActionStateResult>, Error>),
43
    FilterOperations(Result<ActionStateResultStream<'static>, Error>),
44
}
45
46
#[derive(MetricsComponent, Debug)]
47
pub struct MockActionScheduler {
48
    rx_call: Mutex<mpsc::UnboundedReceiver<ActionSchedulerCalls>>,
49
    tx_call: mpsc::UnboundedSender<ActionSchedulerCalls>,
50
51
    rx_resp: Mutex<mpsc::UnboundedReceiver<ActionSchedulerReturns>>,
52
    tx_resp: mpsc::UnboundedSender<ActionSchedulerReturns>,
53
}
54
55
impl Default for MockActionScheduler {
56
0
    fn default() -> Self {
57
0
        Self::new()
58
0
    }
59
}
60
61
impl MockActionScheduler {
62
22
    pub fn new() -> Self {
63
22
        let (tx_call, rx_call) = mpsc::unbounded_channel();
64
22
        let (tx_resp, rx_resp) = mpsc::unbounded_channel();
65
22
        Self {
66
22
            rx_call: Mutex::new(rx_call),
67
22
            tx_call,
68
22
            rx_resp: Mutex::new(rx_resp),
69
22
            tx_resp,
70
22
        }
71
22
    }
72
73
    #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]
74
3
    pub async fn expect_get_known_properties(&self, result: Result<Vec<String>, Error>) -> String {
75
3
        let mut rx_call_lock = self.rx_call.lock().await;
76
3
        let ActionSchedulerCalls::GetGetKnownProperties(req) = rx_call_lock
77
3
            .recv()
78
3
            .await
79
3
            .expect("Could not receive msg in mpsc")
80
        else {
81
0
            panic!("Got incorrect call waiting for get_known_properties")
82
        };
83
3
        self.tx_resp
84
3
            .send(ActionSchedulerReturns::GetGetKnownProperties(result))
85
3
            .map_err(|err| 
{0
86
0
                Error::from_std_err(Code::InvalidArgument, &err)
87
0
                    .append("Could not send request to mpsc")
88
0
            })
89
3
            .unwrap();
90
3
        req
91
3
    }
92
93
    #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]
94
9
    pub async fn expect_add_action(
95
9
        &self,
96
9
        result: Result<Box<dyn ActionStateResult>, Error>,
97
9
    ) -> (OperationId, ActionInfo) {
98
9
        let mut rx_call_lock = self.rx_call.lock().await;
99
9
        let ActionSchedulerCalls::AddAction(req) = rx_call_lock
100
9
            .recv()
101
9
            .await
102
9
            .expect("Could not receive msg in mpsc")
103
        else {
104
0
            panic!("Got incorrect call waiting for get_known_properties")
105
        };
106
9
        self.tx_resp
107
9
            .send(ActionSchedulerReturns::AddAction(result))
108
9
            .map_err(|err| 
{0
109
0
                Error::from_std_err(Code::InvalidArgument, &err)
110
0
                    .append("Could not send request to mpsc")
111
0
            })
112
9
            .unwrap();
113
9
        req
114
9
    }
115
116
    #[allow(dead_code, reason = "https://github.com/rust-lang/rust/issues/46379")]
117
6
    pub async fn expect_filter_operations(
118
6
        &self,
119
6
        result: Result<ActionStateResultStream<'static>, Error>,
120
6
    ) -> OperationFilter {
121
6
        let mut rx_call_lock = self.rx_call.lock().await;
122
6
        let ActionSchedulerCalls::FilterOperations(req) = rx_call_lock
123
6
            .recv()
124
6
            .await
125
6
            .expect("Could not receive msg in mpsc")
126
        else {
127
0
            panic!("Got incorrect call waiting for find_by_client_operation_id")
128
        };
129
6
        self.tx_resp
130
6
            .send(ActionSchedulerReturns::FilterOperations(result))
131
6
            .map_err(|err| 
{0
132
0
                Error::from_std_err(Code::InvalidArgument, &err)
133
0
                    .append("Could not send request to mpsc")
134
0
            })
135
6
            .unwrap();
136
6
        req
137
6
    }
138
}
139
140
#[async_trait]
141
impl KnownPlatformPropertyProvider for MockActionScheduler {
142
3
    async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
143
        self.tx_call
144
            .send(ActionSchedulerCalls::GetGetKnownProperties(
145
                instance_name.to_string(),
146
            ))
147
            .expect("Could not send request to mpsc");
148
        let mut rx_resp_lock = self.rx_resp.lock().await;
149
        match rx_resp_lock
150
            .recv()
151
            .await
152
            .expect("Could not receive msg in mpsc")
153
        {
154
            ActionSchedulerReturns::GetGetKnownProperties(result) => result,
155
            _ => panic!("Expected get_known_properties return value"),
156
        }
157
3
    }
158
}
159
160
#[async_trait]
161
impl ClientStateManager for MockActionScheduler {
162
    async fn add_action(
163
        &self,
164
        client_operation_id: OperationId,
165
        action_info: Arc<ActionInfo>,
166
9
    ) -> Result<Box<dyn ActionStateResult>, Error> {
167
        self.tx_call
168
            .send(ActionSchedulerCalls::AddAction((
169
                client_operation_id,
170
                action_info.as_ref().clone(),
171
            )))
172
            .expect("Could not send request to mpsc");
173
        let mut rx_resp_lock = self.rx_resp.lock().await;
174
        match rx_resp_lock
175
            .recv()
176
            .await
177
            .expect("Could not receive msg in mpsc")
178
        {
179
            ActionSchedulerReturns::AddAction(result) => result,
180
            _ => panic!("Expected add_action return value"),
181
        }
182
9
    }
183
184
    async fn filter_operations<'a>(
185
        &'a self,
186
        filter: OperationFilter,
187
6
    ) -> Result<ActionStateResultStream<'a>, Error> {
188
        self.tx_call
189
            .send(ActionSchedulerCalls::FilterOperations(filter))
190
            .expect("Could not send request to mpsc");
191
        let mut rx_resp_lock = self.rx_resp.lock().await;
192
        match rx_resp_lock
193
            .recv()
194
            .await
195
            .expect("Could not receive msg in mpsc")
196
        {
197
            ActionSchedulerReturns::FilterOperations(result) => result,
198
            _ => panic!("Expected find_by_client_operation_id return value"),
199
        }
200
6
    }
201
202
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
203
0
        Some(self)
204
0
    }
205
}
206
207
// Empty impl: the mock overrides no `RootMetricsComponent` items.
impl RootMetricsComponent for MockActionScheduler {}