Coverage Report

Created: 2025-05-30 16:37

/build/source/nativelink-scheduler/src/simple_scheduler.rs
Branch annotations below give totals summed across all test runs; folded/ignored entries are omitted.

 Line| Count|Source
    1|      |// Copyright 2024 The NativeLink Authors. All rights reserved.
    2|      |//
    3|      |// Licensed under the Apache License, Version 2.0 (the "License");
    4|      |// you may not use this file except in compliance with the License.
    5|      |// You may obtain a copy of the License at
    6|      |//
    7|      |//    http://www.apache.org/licenses/LICENSE-2.0
    8|      |//
    9|      |// Unless required by applicable law or agreed to in writing, software
   10|      |// distributed under the License is distributed on an "AS IS" BASIS,
   11|      |// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12|      |// See the License for the specific language governing permissions and
   13|      |// limitations under the License.
   14|      |
   15|      |use std::sync::Arc;
   16|      |use std::time::SystemTime;
   17|      |
   18|      |use async_trait::async_trait;
   19|      |use futures::Future;
   20|      |use nativelink_config::schedulers::SimpleSpec;
   21|      |use nativelink_error::{Code, Error, ResultExt};
   22|      |use nativelink_metric::{MetricsComponent, RootMetricsComponent};
   23|      |use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
   24|      |use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
   25|      |use nativelink_util::instant_wrapper::InstantWrapper;
   26|      |use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
   27|      |use nativelink_util::operation_state_manager::{
   28|      |    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
   29|      |    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
   30|      |};
   31|      |use nativelink_util::origin_event::OriginMetadata;
   32|      |use nativelink_util::spawn;
   33|      |use nativelink_util::task::JoinHandleDropGuard;
   34|      |use opentelemetry::KeyValue;
   35|      |use opentelemetry::baggage::BaggageExt;
   36|      |use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
   37|      |use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
   38|      |use tokio::sync::{Notify, mpsc};
   39|      |use tokio::time::Duration;
   40|      |use tokio_stream::StreamExt;
   41|      |use tracing::{error, info_span};
   42|      |
   43|      |use crate::api_worker_scheduler::ApiWorkerScheduler;
   44|      |use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
   45|      |use crate::platform_property_manager::PlatformPropertyManager;
   46|      |use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
   47|      |use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
   48|      |use crate::worker_scheduler::WorkerScheduler;
   49|      |
   50|      |/// Default timeout for workers in seconds.
   51|      |/// If this changes, remember to change the documentation in the config.
   52|      |const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;
   53|      |
   54|      |/// Mark operations as completed with error if no client has updated them
   55|      |/// within this duration.
   56|      |/// If this changes, remember to change the documentation in the config.
   57|      |const DEFAULT_CLIENT_ACTION_TIMEOUT_S: u64 = 60;
   58|      |
   59|      |/// Default times a job can retry before failing.
   60|      |/// If this changes, remember to change the documentation in the config.
   61|      |const DEFAULT_MAX_JOB_RETRIES: usize = 3;
   62|      |
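These three constants are only fallbacks: `new_with_callback` (line 346 below) substitutes them whenever the corresponding field of the `SimpleSpec` is zero. A minimal sketch of that convention, with `effective_worker_timeout_s` as an invented helper name rather than anything in this file:

    // Hedged sketch of the zero-means-default convention used at lines 366-389.
    const DEFAULT_WORKER_TIMEOUT_S: u64 = 5; // mirrors the constant above

    fn effective_worker_timeout_s(configured: u64) -> u64 {
        // A zero in the config means "unset", so fall back to the default.
        if configured == 0 {
            DEFAULT_WORKER_TIMEOUT_S
        } else {
            configured
        }
    }

    fn main() {
        assert_eq!(effective_worker_timeout_s(0), 5); // unset: default applies
        assert_eq!(effective_worker_timeout_s(30), 30); // explicit value wins
    }
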
   63|      |struct SimpleSchedulerActionStateResult {
   64|      |    client_operation_id: OperationId,
   65|      |    action_state_result: Box<dyn ActionStateResult>,
   66|      |}
   67|      |
   68|      |impl SimpleSchedulerActionStateResult {
   69|    27|    fn new(
   70|    27|        client_operation_id: OperationId,
   71|    27|        action_state_result: Box<dyn ActionStateResult>,
   72|    27|    ) -> Self {
   73|    27|        Self {
   74|    27|            client_operation_id,
   75|    27|            action_state_result,
   76|    27|        }
   77|    27|    }
   78|      |}
   79|      |
   80|      |#[async_trait]
   81|      |impl ActionStateResult for SimpleSchedulerActionStateResult {
   82|     6|    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
   83|     3|        let (mut action_state, origin_metadata) = self
   84|     3|            .action_state_result
   85|     3|            .as_state()
   86|     3|            .await
   87|     3|            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
   88|      |        // We need to ensure the client is not aware of the downstream
   89|      |        // operation id, so override it before it goes out.
   90|     3|        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
   91|     3|        Ok((action_state, origin_metadata))
   92|     6|    }
   93|      |
   94|    88|    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
   95|    44|        let (mut action_state, origin_metadata) = self
   96|    44|            .action_state_result
   97|    44|            .changed()
   98|    44|            .await
   99|    41|            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
  100|      |        // We need to ensure the client is not aware of the downstream
  101|      |        // operation id, so override it before it goes out.
  102|    41|        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
  103|    41|        Ok((action_state, origin_metadata))
  104|    85|    }
  105|      |
  106|     0|    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
  107|     0|        self.action_state_result
  108|     0|            .as_action_info()
  109|     0|            .await
  110|     0|            .err_tip(|| "In SimpleSchedulerActionStateResult")
  111|     0|    }
  112|      |}
  113|      |
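The `client_operation_id` override in `as_state` and `changed` relies on `Arc::make_mut`, which clones the underlying value only if other `Arc`s still point at it. A self-contained sketch of that clone-on-write behavior (standard library only, not NativeLink code):

    use std::sync::Arc;

    fn main() {
        let shared = Arc::new(String::from("downstream-operation-id"));
        let mut local = Arc::clone(&shared);

        // Two Arcs point at the allocation, so make_mut clones the String
        // before handing back a mutable reference.
        *Arc::make_mut(&mut local) = String::from("client-operation-id");

        assert_eq!(*shared, "downstream-operation-id"); // other holders unchanged
        assert_eq!(*local, "client-operation-id");
    }
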
  114|      |/// Engine used to manage the queued/running tasks and relationship with
  115|      |/// the worker nodes. All state on how the workers and actions are interacting
  116|      |/// should be held in this struct.
  117|      |#[derive(MetricsComponent)]
  118|      |pub struct SimpleScheduler {
  119|      |    /// Manager for matching engine side of the state manager.
  120|      |    #[metric(group = "matching_engine_state_manager")]
  121|      |    matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,
  122|      |
  123|      |    /// Manager for client state of this scheduler.
  124|      |    #[metric(group = "client_state_manager")]
  125|      |    client_state_manager: Arc<dyn ClientStateManager>,
  126|      |
  127|      |    /// Manager for platform properties of this scheduler.
  128|      |    #[metric(group = "platform_properties")]
  129|      |    platform_property_manager: Arc<PlatformPropertyManager>,
  130|      |
  131|      |    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
  132|      |    /// order based on the allocation strategy.
  133|      |    #[metric(group = "worker_scheduler")]
  134|      |    worker_scheduler: Arc<ApiWorkerScheduler>,
  135|      |
  136|      |    /// The sender used to publish origin events.
  137|      |    maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
  138|      |
  139|      |    /// Background task that tries to match actions to workers. If this struct
  140|      |    /// is dropped the spawn will be cancelled as well.
  141|      |    _task_worker_matching_spawn: JoinHandleDropGuard<()>,
  142|      |}
  143|      |
  144|      |impl core::fmt::Debug for SimpleScheduler {
  145|     0|    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
  146|     0|        f.debug_struct("SimpleScheduler")
  147|     0|            .field("platform_property_manager", &self.platform_property_manager)
  148|     0|            .field("worker_scheduler", &self.worker_scheduler)
  149|     0|            .field("maybe_origin_event_tx", &self.maybe_origin_event_tx)
  150|     0|            .field(
  151|     0|                "_task_worker_matching_spawn",
  152|     0|                &self._task_worker_matching_spawn,
  153|     0|            )
  154|     0|            .finish_non_exhaustive()
  155|     0|    }
  156|      |}
  157|      |
  158|      |impl SimpleScheduler {
  159|      |    /// Attempts to find a worker to execute an action and begins executing it.
  160|      |    /// If a cacheable action is already running, this action may be merged
  161|      |    /// with the results and state changes of the already running action.
  162|      |    /// If the task cannot be executed immediately it will be queued for
  163|      |    /// execution based on priority and other metrics.
  164|      |    /// All further updates to the action will be provided through the returned
  165|      |    /// value.
  166|    27|    async fn inner_add_action(
  167|    27|        &self,
  168|    27|        client_operation_id: OperationId,
  169|    27|        action_info: Arc<ActionInfo>,
  170|    27|    ) -> Result<Box<dyn ActionStateResult>, Error> {
  171|    27|        let action_state_result = self
  172|    27|            .client_state_manager
  173|    27|            .add_action(client_operation_id.clone(), action_info)
  174|    27|            .await
  175|    27|            .err_tip(|| "In SimpleScheduler::add_action")?;
  176|    27|        Ok(Box::new(SimpleSchedulerActionStateResult::new(
  177|    27|            client_operation_id.clone(),
  178|    27|            action_state_result,
  179|    27|        )))
  180|    27|    }
  181|      |
  182|   503|    async fn inner_filter_operations(
  183|   503|        &self,
  184|   503|        filter: OperationFilter,
  185|   503|    ) -> Result<ActionStateResultStream, Error> {
  186|   503|        self.client_state_manager
  187|   503|            .filter_operations(filter)
  188|   503|            .await
  189|   503|            .err_tip(|| "In SimpleScheduler::find_by_client_operation_id getting filter result")
  190|   503|    }
  191|      |
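Nearly every fallible call in this file ends with `.err_tip(...)`, which comes from the `ResultExt` import at line 21 and lazily appends context to an error as it propagates. A minimal sketch of that style of extension trait, illustrative only and not NativeLink's actual implementation:

    // Hedged sketch of an err_tip-style extension trait over Result.
    trait ResultExt<T> {
        fn err_tip<F: FnOnce() -> &'static str>(self, tip: F) -> Result<T, String>;
    }

    impl<T, E: std::fmt::Display> ResultExt<T> for Result<T, E> {
        fn err_tip<F: FnOnce() -> &'static str>(self, tip: F) -> Result<T, String> {
            // The closure only runs on the error path, so the happy path pays nothing.
            self.map_err(|e| format!("{}: {e}", tip()))
        }
    }

    fn main() {
        let r: Result<u32, String> = "abc"
            .parse::<u32>()
            .err_tip(|| "In SimpleScheduler::add_action");
        assert!(r.unwrap_err().starts_with("In SimpleScheduler::add_action"));
    }
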
  192|    94|    async fn get_queued_operations(&self) -> Result<ActionStateResultStream, Error> {
  193|    94|        let filter = OperationFilter {
  194|    94|            stages: OperationStageFlags::Queued,
  195|    94|            order_by_priority_direction: Some(OrderDirection::Desc),
  196|    94|            ..Default::default()
  197|    94|        };
  198|    94|        self.matching_engine_state_manager
  199|    94|            .filter_operations(filter)
  200|    94|            .await
  201|    94|            .err_tip(|| "In SimpleScheduler::get_queued_operations getting filter result")
  202|    94|    }
  203|      |
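`get_queued_operations` fills in only the two filter fields it cares about and lets `..Default::default()` supply the rest. A self-contained sketch of that struct update pattern; `StageFilter` and its fields are invented stand-ins for `OperationFilter`:

    // Hedged sketch: hypothetical filter type, used only to show the
    // `..Default::default()` struct update pattern from line 196.
    #[derive(Default, Debug)]
    struct StageFilter {
        queued_only: bool,
        order_descending: bool,
        operation_id: Option<String>,
    }

    fn main() {
        let filter = StageFilter {
            queued_only: true,
            order_descending: true,
            ..Default::default() // operation_id stays None
        };
        println!("{filter:?}");
    }
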
  204|     4|    pub async fn do_try_match_for_test(&self) -> Result<(), Error> {
  205|     4|        self.do_try_match().await
  206|     4|    }
  207|      |
  208|      |    // TODO(aaronmondal) This is an O(n*m) (aka n^2) algorithm. In theory we
  209|      |    // can create a map of capabilities of each worker and then try and match
  210|      |    // the actions to the worker using the map lookup (ie. map reduce).
  211|    94|    async fn do_try_match(&self) -> Result<(), Error> {
  212|    45|        async fn match_action_to_worker(
  213|    45|            action_state_result: &dyn ActionStateResult,
  214|    45|            workers: &ApiWorkerScheduler,
  215|    45|            matching_engine_state_manager: &dyn MatchingEngineStateManager,
  216|    45|            platform_property_manager: &PlatformPropertyManager,
  217|    45|        ) -> Result<(), Error> {
  218|    45|            let (action_info, maybe_origin_metadata) =
  219|    45|                action_state_result
  220|    45|                    .as_action_info()
  221|    45|                    .await
  222|    45|                    .err_tip(|| "Failed to get action_info from as_action_info_result stream")?;
  223|      |
  224|      |            // TODO(aaronmondal) We should not compute this every time and instead store
  225|      |            // it with the ActionInfo when we receive it.
  226|    45|            let platform_properties = platform_property_manager
  227|    45|                .make_platform_properties(action_info.platform_properties.clone())
  228|    45|                .err_tip(
  229|      |                    || "Failed to make platform properties in SimpleScheduler::do_try_match",
  230|     0|                )?;
  231|      |
  232|    45|            let action_info = ActionInfoWithProps {
  233|    45|                inner: action_info,
  234|    45|                platform_properties,
  235|    45|            };
  236|      |
  237|      |            // Try to find a worker for the action.
  238|    28|            let worker_id = {
  239|    45|                match workers
  240|    45|                    .find_worker_for_action(&action_info.platform_properties)
  241|    45|                    .await
  242|      |                {
  243|    28|                    Some(worker_id) => worker_id,
  244|      |                    // If we could not find a worker for the action,
  245|      |                    // we have nothing to do.
  246|    17|                    None => return Ok(()),
  247|      |                }
  248|      |            };
  249|      |
  250|    28|            let attach_operation_fut = async move {
  251|      |                // Extract the operation_id from the action_state.
  252|    28|                let operation_id = {
  253|    28|                    let (action_state, _origin_metadata) = action_state_result
  254|    28|                        .as_state()
  255|    28|                        .await
  256|    28|                        .err_tip(|| "Failed to get action_info from as_state_result stream")?;
  257|    28|                    action_state.client_operation_id.clone()
  258|      |                };
  259|      |
  260|      |                // Tell the matching engine that the operation is being assigned to a worker.
  261|    28|                let assign_result = matching_engine_state_manager
  262|    28|                    .assign_operation(&operation_id, Ok(&worker_id))
  263|    28|                    .await
  264|    28|                    .err_tip(|| "Failed to assign operation in do_try_match");
  265|    28|                if let Err(err) = assign_result {
     |      |                    Branch (265:24): [True: 1, False: 27]
  266|     1|                    if err.code == Code::Aborted {
     |      |                        Branch (266:24): [True: 0, False: 1]
  267|      |                        // If the operation was aborted, it means that the operation was
  268|      |                        // cancelled due to another operation being assigned to the worker.
  269|     0|                        return Ok(());
  270|     1|                    }
  271|      |                    // Any other error is a real error.
  272|     1|                    return Err(err);
  273|    27|                }
  274|      |
  275|    27|                workers
  276|    27|                    .worker_notify_run_action(worker_id, operation_id, action_info)
  277|    27|                    .await
  278|    27|                    .err_tip(|| {
  279|     1|                        "Failed to run worker_notify_run_action in SimpleScheduler::do_try_match"
  280|     1|                    })
  281|    28|            };
  282|    28|            tokio::pin!(attach_operation_fut);
  283|      |
  284|    28|            let origin_metadata = maybe_origin_metadata.unwrap_or_default();
  285|      |
  286|    28|            let ctx = Context::current_with_baggage(vec![KeyValue::new(
  287|      |                ENDUSER_ID,
  288|    28|                origin_metadata.identity,
  289|      |            )]);
  290|      |
  291|    28|            info_span!("do_try_match")
  292|    28|                .in_scope(|| attach_operation_fut)
  293|    28|                .with_context(ctx)
  294|    28|                .await
  295|    45|        }
  296|      |
  297|    94|        let mut result = Ok(());
  298|      |
  299|    94|        let mut stream = self
  300|    94|            .get_queued_operations()
  301|    94|            .await
  302|    94|            .err_tip(|| "Failed to get queued operations in do_try_match")?;
  303|      |
  304|   139|        while let Some(action_state_result) = stream.next().await {
     |      |            Branch (304:19): [True: 45, False: 94]
  305|    45|            result = result.merge(
  306|    45|                match_action_to_worker(
  307|    45|                    action_state_result.as_ref(),
  308|    45|                    self.worker_scheduler.as_ref(),
  309|    45|                    self.matching_engine_state_manager.as_ref(),
  310|    45|                    self.platform_property_manager.as_ref(),
  311|    45|                )
  312|    45|                .await,
  313|      |            );
  314|      |        }
  315|    94|        result
  316|    94|    }
  317|      |}
  318|      |
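The TODO at lines 208-210 points at the main scaling hazard here: every matching pass walks all queued actions and, for each one, scans the worker pool. A rough illustration of the suggested map lookup, with all names invented and exact-match keys only (a real matcher would also need subset matching of platform properties, which a plain HashMap key does not give you):

    use std::collections::HashMap;

    // Hypothetical: a worker's capabilities reduced to a sorted, hashable key.
    type CapabilityKey = Vec<(String, String)>;

    #[derive(Default)]
    struct WorkerPool {
        // Capability key -> ids of workers able to run matching actions.
        by_capability: HashMap<CapabilityKey, Vec<u64>>,
    }

    impl WorkerPool {
        fn add_worker(&mut self, worker_id: u64, mut props: CapabilityKey) {
            props.sort(); // normalize so equal capability sets collide
            self.by_capability.entry(props).or_default().push(worker_id);
        }

        // O(1) average lookup instead of scanning every worker per action.
        fn find_worker(&self, mut wanted: CapabilityKey) -> Option<u64> {
            wanted.sort();
            self.by_capability.get(&wanted)?.first().copied()
        }
    }

    fn main() {
        let mut pool = WorkerPool::default();
        pool.add_worker(1, vec![("os".into(), "linux".into())]);
        assert_eq!(pool.find_worker(vec![("os".into(), "linux".into())]), Some(1));
    }
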
  319|      |impl SimpleScheduler {
  320|     0|    pub fn new<A: AwaitedActionDb>(
  321|     0|        spec: &SimpleSpec,
  322|     0|        awaited_action_db: A,
  323|     0|        task_change_notify: Arc<Notify>,
  324|     0|        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
  325|     0|    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
  326|     0|        Self::new_with_callback(
  327|     0|            spec,
  328|     0|            awaited_action_db,
  329|     0|            || {
  330|      |                // The cost of running `do_try_match()` is very high, but constant
  331|      |                // in relation to the number of changes that have happened. This
  332|      |                // means that grabbing this lock to process `do_try_match()` should
  333|      |                // always yield to any other tasks that might want the lock. The
  334|      |                // easiest and most fair way to do this is to sleep for a small
  335|      |                // amount of time. Using something like tokio::task::yield_now()
  336|      |                // does not yield as aggressively as we'd like if new futures are
  337|      |                // scheduled within a future.
  338|     0|                tokio::time::sleep(Duration::from_millis(1))
  339|     0|            },
  340|     0|            task_change_notify,
  341|      |            SystemTime::now,
  342|     0|            maybe_origin_event_tx,
  343|      |        )
  344|     0|    }
  345|      |
  346|    22|    pub fn new_with_callback<
  347|    22|        Fut: Future<Output = ()> + Send,
  348|    22|        F: Fn() -> Fut + Send + Sync + 'static,
  349|    22|        A: AwaitedActionDb,
  350|    22|        I: InstantWrapper,
  351|    22|        NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
  352|    22|    >(
  353|    22|        spec: &SimpleSpec,
  354|    22|        awaited_action_db: A,
  355|    22|        on_matching_engine_run: F,
  356|    22|        task_change_notify: Arc<Notify>,
  357|    22|        now_fn: NowFn,
  358|    22|        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
  359|    22|    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
  360|    22|        let platform_property_manager = Arc::new(PlatformPropertyManager::new(
  361|    22|            spec.supported_platform_properties
  362|    22|                .clone()
  363|    22|                .unwrap_or_default(),
  364|      |        ));
  365|      |
  366|    22|        let mut worker_timeout_s = spec.worker_timeout_s;
  367|    22|        if worker_timeout_s == 0 {
     |      |            Branch (367:12): [True: 17, False: 5]
  368|    17|            worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
  369|    17|        }
  370|      |
  371|    22|        let mut client_action_timeout_s = spec.client_action_timeout_s;
  372|    22|        if client_action_timeout_s == 0 {
     |      |            Branch (372:12): [True: 21, False: 1]
  373|    21|            client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
  374|    21|        }
  375|      |        // If this is at or below CLIENT_KEEPALIVE_DURATION, tasks are going
  376|      |        // to be dropped all over the place, so warn loudly; this is not a
  377|      |        // usable setting.
  378|    22|        if client_action_timeout_s <= CLIENT_KEEPALIVE_DURATION.as_secs() {
     |      |            Branch (378:12): [True: 0, False: 22]
  379|     0|            error!(
  380|      |                client_action_timeout_s,
  381|     0|                "Setting client_action_timeout_s to less than the client keep alive interval is going to cause issues, please set above {}.",
  382|     0|                CLIENT_KEEPALIVE_DURATION.as_secs()
  383|      |            );
  384|    22|        }
  385|      |
  386|    22|        let mut max_job_retries = spec.max_job_retries;
  387|    22|        if max_job_retries == 0 {
     |      |            Branch (387:12): [True: 21, False: 1]
  388|    21|            max_job_retries = DEFAULT_MAX_JOB_RETRIES;
  389|    21|        }
  390|      |
  391|    22|        let worker_change_notify = Arc::new(Notify::new());
  392|    22|        let state_manager = SimpleSchedulerStateManager::new(
  393|    22|            max_job_retries,
  394|    22|            Duration::from_secs(worker_timeout_s),
  395|    22|            Duration::from_secs(client_action_timeout_s),
  396|    22|            awaited_action_db,
  397|    22|            now_fn,
  398|      |        );
  399|      |
  400|    22|        let worker_scheduler = ApiWorkerScheduler::new(
  401|    22|            state_manager.clone(),
  402|    22|            platform_property_manager.clone(),
  403|    22|            spec.allocation_strategy,
  404|    22|            worker_change_notify.clone(),
  405|    22|            worker_timeout_s,
  406|      |        );
  407|      |
  408|    22|        let worker_scheduler_clone = worker_scheduler.clone();
  409|      |
  410|    22|        let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
  411|    22|            let weak_inner = weak_self.clone();
  412|    22|            let task_worker_matching_spawn =
  413|    22|                spawn!("simple_scheduler_task_worker_matching", async move {
  414|      |                    // Break out of the loop only when the inner is dropped.
  415|      |                    loop {
  416|   111|                        let task_change_fut = task_change_notify.notified();
  417|   111|                        let worker_change_fut = worker_change_notify.notified();
  418|   111|                        tokio::pin!(task_change_fut);
  419|   111|                        tokio::pin!(worker_change_fut);
  420|      |                        // Wait for either of these futures to be ready.
  421|   111|                        let _ = futures::future::select(task_change_fut, worker_change_fut).await;
  422|    90|                        let result = match weak_inner.upgrade() {
  423|    90|                            Some(scheduler) => scheduler.do_try_match().await,
  424|      |                            // If the inner went away it means the scheduler is shutting
  425|      |                            // down, so we need to resolve our future.
  426|     0|                            None => return,
  427|      |                        };
  428|    90|                        if let Err(err) = result {
     |      |                            Branch (428:32): [True: 1, False: 89]
  429|     1|                            error!(?err, "Error while running do_try_match");
  430|    89|                        }
  431|      |
  432|    90|                        on_matching_engine_run().await;
  433|      |                    }
  434|      |                    // Unreachable.
  435|     0|                });
  436|    22|            Self {
  437|    22|                matching_engine_state_manager: state_manager.clone(),
  438|    22|                client_state_manager: state_manager.clone(),
  439|    22|                worker_scheduler,
  440|    22|                platform_property_manager,
  441|    22|                maybe_origin_event_tx,
  442|    22|                _task_worker_matching_spawn: task_worker_matching_spawn,
  443|    22|            }
  444|    22|        });
  445|    22|        (action_scheduler, worker_scheduler_clone)
  446|    22|    }
  447|      |}
  448|      |
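`Arc::new_cyclic` (line 410) lets the matching task hold a `Weak` handle to the scheduler it lives inside: each pass upgrades the handle, and the loop exits once the scheduler is dropped. A self-contained sketch of that lifecycle pattern, using a plain thread instead of the `spawn!` macro purely for illustration:

    use std::sync::{Arc, Weak};
    use std::thread;
    use std::time::Duration;

    struct Scheduler {
        name: String,
    }

    fn main() {
        let scheduler = Arc::new_cyclic(|weak_self: &Weak<Scheduler>| {
            let weak = weak_self.clone();
            // The task only holds a Weak, so it never keeps the scheduler alive.
            thread::spawn(move || loop {
                match weak.upgrade() {
                    Some(s) => println!("matching pass for {}", s.name),
                    None => return, // scheduler dropped: shut the loop down
                }
                thread::sleep(Duration::from_millis(10));
            });
            Scheduler { name: "simple".into() }
        });

        thread::sleep(Duration::from_millis(25));
        drop(scheduler); // the loop observes this via upgrade() == None
        thread::sleep(Duration::from_millis(25));
    }
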
  449|      |#[async_trait]
  450|      |impl ClientStateManager for SimpleScheduler {
  451|      |    async fn add_action(
  452|      |        &self,
  453|      |        client_operation_id: OperationId,
  454|      |        action_info: Arc<ActionInfo>,
  455|    54|    ) -> Result<Box<dyn ActionStateResult>, Error> {
  456|    27|        self.inner_add_action(client_operation_id, action_info)
  457|    27|            .await
  458|    54|    }
  459|      |
  460|      |    async fn filter_operations<'a>(
  461|      |        &'a self,
  462|      |        filter: OperationFilter,
  463| 1.00k|    ) -> Result<ActionStateResultStream<'a>, Error> {
  464|   503|        self.inner_filter_operations(filter).await
  465| 1.00k|    }
  466|      |
  467|     0|    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
  468|     0|        Some(self)
  469|     0|    }
  470|      |}
  471|      |
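From a client's perspective the `Box<dyn ActionStateResult>` returned by `add_action` acts as a subscription: poll `changed()` until the action reaches a terminal stage. A rough sketch of that consumption loop; `Stage` and `Subscription` are invented stand-ins, since the real stage enum lives in `nativelink_util::action_messages`:

    // Hedged sketch with stand-in types; only the poll-until-terminal
    // pattern is the point.
    #[derive(Clone, Copy, PartialEq, Debug)]
    enum Stage {
        Queued,
        Executing,
        Completed,
    }

    struct Subscription {
        script: Vec<Stage>, // stand-in for the scheduler's state changes
    }

    impl Subscription {
        // Stand-in for ActionStateResult::changed().
        async fn changed(&mut self) -> Stage {
            self.script.remove(0)
        }
    }

    #[tokio::main]
    async fn main() {
        let mut sub = Subscription {
            script: vec![Stage::Queued, Stage::Executing, Stage::Completed],
        };
        loop {
            let stage = sub.changed().await;
            println!("action moved to {stage:?}");
            if stage == Stage::Completed {
                break; // terminal stage: stop waiting for updates
            }
        }
    }
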
  472|      |#[async_trait]
  473|      |impl KnownPlatformPropertyProvider for SimpleScheduler {
  474|     0|    async fn get_known_properties(&self, _instance_name: &str) -> Result<Vec<String>, Error> {
  475|     0|        Ok(self
  476|     0|            .worker_scheduler
  477|     0|            .get_platform_property_manager()
  478|     0|            .get_known_properties()
  479|     0|            .keys()
  480|     0|            .cloned()
  481|     0|            .collect())
  482|     0|    }
  483|      |}
  484|      |
  485|      |#[async_trait]
  486|      |impl WorkerScheduler for SimpleScheduler {
  487|     0|    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
  488|     0|        self.worker_scheduler.get_platform_property_manager()
  489|     0|    }
  490|      |
  491|    50|    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
  492|    25|        self.worker_scheduler.add_worker(worker).await
  493|    50|    }
  494|      |
  495|      |    async fn update_action(
  496|      |        &self,
  497|      |        worker_id: &WorkerId,
  498|      |        operation_id: &OperationId,
  499|      |        update: UpdateOperationType,
  500|    18|    ) -> Result<(), Error> {
  501|     9|        self.worker_scheduler
  502|     9|            .update_action(worker_id, operation_id, update)
  503|     9|            .await
  504|    18|    }
  505|      |
  506|      |    async fn worker_keep_alive_received(
  507|      |        &self,
  508|      |        worker_id: &WorkerId,
  509|      |        timestamp: WorkerTimestamp,
  510|     2|    ) -> Result<(), Error> {
  511|     1|        self.worker_scheduler
  512|     1|            .worker_keep_alive_received(worker_id, timestamp)
  513|     1|            .await
  514|     2|    }
  515|      |
  516|     2|    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
  517|     1|        self.worker_scheduler.remove_worker(worker_id).await
  518|     2|    }
  519|      |
  520|     2|    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
  521|     1|        self.worker_scheduler
  522|     1|            .remove_timedout_workers(now_timestamp)
  523|     1|            .await
  524|     2|    }
  525|      |
  526|     4|    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
  527|     2|        self.worker_scheduler
  528|     2|            .set_drain_worker(worker_id, is_draining)
  529|     2|            .await
  530|     4|    }
  531|      |}
  532|      |
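`remove_timedout_workers` takes the current timestamp as an argument, so a caller outside this impl has to drive it periodically. A hypothetical driver loop (the five-second interval and the `WorkerReaper` type are assumptions, not NativeLink's actual wiring):

    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    // Hedged sketch: `WorkerReaper` stands in for whatever owns a
    // `dyn WorkerScheduler`; only the periodic-drive pattern is the point.
    struct WorkerReaper;

    impl WorkerReaper {
        async fn remove_timedout_workers(&self, now_timestamp: u64) {
            println!("reaping workers idle since before {now_timestamp}");
        }
    }

    #[tokio::main]
    async fn main() {
        let reaper = WorkerReaper;
        let mut ticker = tokio::time::interval(Duration::from_secs(5));
        for _ in 0..3 {
            ticker.tick().await;
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("clock before epoch")
                .as_secs();
            reaper.remove_timedout_workers(now).await;
        }
    }
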
  533|      |impl RootMetricsComponent for SimpleScheduler {}