Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::SystemTime;
17
18
use async_trait::async_trait;
19
use futures::{Future, FutureExt};
20
use nativelink_config::schedulers::SimpleSpec;
21
use nativelink_error::{Code, Error, ResultExt};
22
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
23
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
24
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
25
use nativelink_util::instant_wrapper::InstantWrapper;
26
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
27
use nativelink_util::operation_state_manager::{
28
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
29
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
30
};
31
use nativelink_util::origin_context::ActiveOriginContext;
32
use nativelink_util::origin_event::{OriginEventCollector, OriginMetadata, ORIGIN_EVENT_COLLECTOR};
33
use nativelink_util::spawn;
34
use nativelink_util::task::JoinHandleDropGuard;
35
use tokio::sync::{mpsc, Notify};
36
use tokio::time::Duration;
37
use tokio_stream::StreamExt;
38
use tracing::{event, info_span, Level};
39
40
use crate::api_worker_scheduler::ApiWorkerScheduler;
41
use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
42
use crate::platform_property_manager::PlatformPropertyManager;
43
use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
44
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
45
use crate::worker_scheduler::WorkerScheduler;
46
47
/// Default timeout for workers in seconds.
48
/// If this changes, remember to change the documentation in the config.
49
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;
50
51
/// Mark operations as completed with error if no client has updated them
52
/// within this duration.
53
/// If this changes, remember to change the documentation in the config.
54
const DEFAULT_CLIENT_ACTION_TIMEOUT_S: u64 = 60;
55
56
/// Default times a job can retry before failing.
57
/// If this changes, remember to change the documentation in the config.
58
const DEFAULT_MAX_JOB_RETRIES: usize = 3;
59
60
struct SimpleSchedulerActionStateResult {
61
    client_operation_id: OperationId,
62
    action_state_result: Box<dyn ActionStateResult>,
63
}
64
65
impl SimpleSchedulerActionStateResult {
66
27
    fn new(
67
27
        client_operation_id: OperationId,
68
27
        action_state_result: Box<dyn ActionStateResult>,
69
27
    ) -> Self {
70
27
        Self {
71
27
            client_operation_id,
72
27
            action_state_result,
73
27
        }
74
27
    }
75
}
76
77
#[async_trait]
78
impl ActionStateResult for SimpleSchedulerActionStateResult {
79
3
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
80
3
        let (mut action_state, origin_metadata) = self
81
3
            .action_state_result
82
3
            .as_state()
83
3
            .await
84
3
            .err_tip(|| 
"In SimpleSchedulerActionStateResult"0
)
?0
;
85
        // We need to ensure the client is not aware of the downstream
86
        // operation id, so override it before it goes out.
87
3
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
88
3
        Ok((action_state, origin_metadata))
89
6
    }
90
91
44
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
92
44
        let (
mut action_state, origin_metadata41
) = self
93
44
            .action_state_result
94
44
            .changed()
95
44
            .await
96
41
            .err_tip(|| 
"In SimpleSchedulerActionStateResult"0
)
?0
;
97
        // We need to ensure the client is not aware of the downstream
98
        // operation id, so override it before it goes out.
99
41
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
100
41
        Ok((action_state, origin_metadata))
101
85
    }
102
103
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
104
0
        self.action_state_result
105
0
            .as_action_info()
106
0
            .await
107
0
            .err_tip(|| "In SimpleSchedulerActionStateResult")
108
0
    }
109
}
110
111
/// Engine used to manage the queued/running tasks and relationship with
112
/// the worker nodes. All state on how the workers and actions are interacting
113
/// should be held in this struct.
114
#[derive(MetricsComponent)]
115
pub struct SimpleScheduler {
116
    /// Manager for matching engine side of the state manager.
117
    #[metric(group = "matching_engine_state_manager")]
118
    matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,
119
120
    /// Manager for client state of this scheduler.
121
    #[metric(group = "client_state_manager")]
122
    client_state_manager: Arc<dyn ClientStateManager>,
123
124
    /// Manager for platform of this scheduler.
125
    #[metric(group = "platform_properties")]
126
    platform_property_manager: Arc<PlatformPropertyManager>,
127
128
    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
129
    /// order based on the allocation strategy.
130
    #[metric(group = "worker_scheduler")]
131
    worker_scheduler: Arc<ApiWorkerScheduler>,
132
133
    /// The sender to send origin events to the origin events.
134
    maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
135
136
    /// Background task that tries to match actions to workers. If this struct
137
    /// is dropped the spawn will be cancelled as well.
138
    _task_worker_matching_spawn: JoinHandleDropGuard<()>,
139
}
140
141
impl SimpleScheduler {
142
    /// Attempts to find a worker to execute an action and begins executing it.
143
    /// If an action is already running that is cacheable it may merge this
144
    /// action with the results and state changes of the already running
145
    /// action. If the task cannot be executed immediately it will be queued
146
    /// for execution based on priority and other metrics.
147
    /// All further updates to the action will be provided through the returned
148
    /// value.
149
27
    async fn inner_add_action(
150
27
        &self,
151
27
        client_operation_id: OperationId,
152
27
        action_info: Arc<ActionInfo>,
153
27
    ) -> Result<Box<dyn ActionStateResult>, Error> {
154
27
        let action_state_result = self
155
27
            .client_state_manager
156
27
            .add_action(client_operation_id.clone(), action_info)
157
27
            .await
158
27
            .err_tip(|| 
"In SimpleScheduler::add_action"0
)
?0
;
159
27
        Ok(Box::new(SimpleSchedulerActionStateResult::new(
160
27
            client_operation_id.clone(),
161
27
            action_state_result,
162
27
        )))
163
27
    }
164
165
503
    async fn inner_filter_operations(
166
503
        &self,
167
503
        filter: OperationFilter,
168
503
    ) -> Result<ActionStateResultStream, Error> {
169
503
        self.client_state_manager
170
503
            .filter_operations(filter)
171
503
            .await
172
503
            .err_tip(|| 
"In SimpleScheduler::find_by_client_operation_id getting filter result"0
)
173
503
    }
174
175
94
    async fn get_queued_operations(&self) -> Result<ActionStateResultStream, Error> {
176
94
        let filter = OperationFilter {
177
94
            stages: OperationStageFlags::Queued,
178
94
            order_by_priority_direction: Some(OrderDirection::Desc),
179
94
            ..Default::default()
180
94
        };
181
94
        self.matching_engine_state_manager
182
94
            .filter_operations(filter)
183
94
            .await
184
94
            .err_tip(|| 
"In SimpleScheduler::get_queued_operations getting filter result"0
)
185
94
    }
186
187
4
    
pub async fn do_try_match_for_test(&self) -> Result<(), Error> 0
{
188
4
        self.do_try_match().await
189
4
    }
190
191
    // TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we
192
    // can create a map of capabilities of each worker and then try and match
193
    // the actions to the worker using the map lookup (ie. map reduce).
194
94
    async fn do_try_match(&self) -> Result<(), Error> {
195
45
        async fn match_action_to_worker(
196
45
            action_state_result: &dyn ActionStateResult,
197
45
            workers: &ApiWorkerScheduler,
198
45
            matching_engine_state_manager: &dyn MatchingEngineStateManager,
199
45
            platform_property_manager: &PlatformPropertyManager,
200
45
            maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
201
45
        ) -> Result<(), Error> {
202
45
            let (action_info, maybe_origin_metadata) =
203
45
                action_state_result
204
45
                    .as_action_info()
205
45
                    .await
206
45
                    .err_tip(|| 
"Failed to get action_info from as_action_info_result stream"0
)
?0
;
207
208
            // TODO(allada) We should not compute this every time and instead store
209
            // it with the ActionInfo when we receive it.
210
45
            let platform_properties = platform_property_manager
211
45
                .make_platform_properties(action_info.platform_properties.clone())
212
45
                .err_tip(|| {
213
0
                    "Failed to make platform properties in SimpleScheduler::do_try_match"
214
45
                })
?0
;
215
216
45
            let action_info = ActionInfoWithProps {
217
45
                inner: action_info,
218
45
                platform_properties,
219
45
            };
220
221
            // Try to find a worker for the action.
222
28
            let worker_id = {
223
45
                match workers
224
45
                    .find_worker_for_action(&action_info.platform_properties)
225
45
                    .await
226
                {
227
28
                    Some(worker_id) => worker_id,
228
                    // If we could not find a worker for the action,
229
                    // we have nothing to do.
230
17
                    None => return Ok(()),
231
                }
232
            };
233
234
28
            let attach_operation_fut = async move {
235
                // Extract the operation_id from the action_state.
236
28
                let operation_id = {
237
28
                    let (action_state, _origin_metadata) = action_state_result
238
28
                        .as_state()
239
28
                        .await
240
28
                        .err_tip(|| 
"Failed to get action_info from as_state_result stream"0
)
?0
;
241
28
                    action_state.client_operation_id.clone()
242
                };
243
244
                // Tell the matching engine that the operation is being assigned to a worker.
245
28
                let assign_result = matching_engine_state_manager
246
28
                    .assign_operation(&operation_id, Ok(&worker_id))
247
28
                    .await
248
28
                    .err_tip(|| 
"Failed to assign operation in do_try_match"1
);
249
28
                if let Err(
err1
) = assign_result {
  Branch (249:24): [True: 0, False: 0]
  Branch (249:24): [Folded - Ignored]
  Branch (249:24): [True: 1, False: 27]
250
1
                    if err.code == Code::Aborted {
  Branch (250:24): [True: 0, False: 0]
  Branch (250:24): [Folded - Ignored]
  Branch (250:24): [True: 0, False: 1]
251
                        // If the operation was aborted, it means that the operation was
252
                        // cancelled due to another operation being assigned to the worker.
253
0
                        return Ok(());
254
1
                    }
255
1
                    // Any other error is a real error.
256
1
                    return Err(err);
257
27
                }
258
27
259
27
                workers
260
27
                    .worker_notify_run_action(worker_id, operation_id, action_info)
261
27
                    .await
262
27
                    .err_tip(|| {
263
1
                        "Failed to run worker_notify_run_action in SimpleScheduler::do_try_match"
264
27
                    })
265
28
            };
266
28
            tokio::pin!(attach_operation_fut);
267
268
28
            let attach_operation_fut = if let Some(
origin_event_tx0
) = maybe_origin_event_tx {
  Branch (268:47): [True: 0, False: 0]
  Branch (268:47): [Folded - Ignored]
  Branch (268:47): [True: 0, False: 28]
269
0
                let mut ctx = ActiveOriginContext::fork().unwrap_or_default();
270
0
271
0
                // Populate our origin event collector with the origin metadata
272
0
                // associated with the action.
273
0
                ctx.replace_value(&ORIGIN_EVENT_COLLECTOR, move |maybe_old_collector| {
274
0
                    let origin_metadata = maybe_origin_metadata.unwrap_or_default();
275
0
                    let Some(old_collector) = maybe_old_collector else {
  Branch (275:25): [True: 0, False: 0]
  Branch (275:25): [Folded - Ignored]
  Branch (275:25): [True: 0, False: 0]
276
0
                        return Some(Arc::new(OriginEventCollector::new(
277
0
                            origin_event_tx.clone(),
278
0
                            origin_metadata,
279
0
                        )));
280
                    };
281
0
                    Some(Arc::new(old_collector.clone_with_metadata(origin_metadata)))
282
0
                });
283
0
                Arc::new(ctx)
284
0
                    .wrap_async(info_span!("do_try_match"), attach_operation_fut)
285
0
                    .left_future()
286
            } else {
287
28
                attach_operation_fut.right_future()
288
            };
289
28
            attach_operation_fut.await
290
45
        }
291
292
94
        let mut result = Ok(());
293
294
94
        let mut stream = self
295
94
            .get_queued_operations()
296
94
            .await
297
94
            .err_tip(|| 
"Failed to get queued operations in do_try_match"0
)
?0
;
298
299
139
        while let Some(
action_state_result45
) = stream.next().await {
  Branch (299:19): [True: 0, False: 0]
  Branch (299:19): [Folded - Ignored]
  Branch (299:19): [True: 45, False: 94]
300
45
            result = result.merge(
301
45
                match_action_to_worker(
302
45
                    action_state_result.as_ref(),
303
45
                    self.worker_scheduler.as_ref(),
304
45
                    self.matching_engine_state_manager.as_ref(),
305
45
                    self.platform_property_manager.as_ref(),
306
45
                    self.maybe_origin_event_tx.as_ref(),
307
45
                )
308
45
                .await,
309
            );
310
        }
311
94
        result
312
94
    }
313
}
314
315
impl SimpleScheduler {
316
0
    pub fn new<A: AwaitedActionDb>(
317
0
        spec: &SimpleSpec,
318
0
        awaited_action_db: A,
319
0
        task_change_notify: Arc<Notify>,
320
0
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
321
0
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
322
0
        Self::new_with_callback(
323
0
            spec,
324
0
            awaited_action_db,
325
0
            || {
326
0
                // The cost of running `do_try_match()` is very high, but constant
327
0
                // in relation to the number of changes that have happened. This
328
0
                // means that grabbing this lock to process `do_try_match()` should
329
0
                // always yield to any other tasks that might want the lock. The
330
0
                // easiest and most fair way to do this is to sleep for a small
331
0
                // amount of time. Using something like tokio::task::yield_now()
332
0
                // does not yield as aggresively as we'd like if new futures are
333
0
                // scheduled within a future.
334
0
                tokio::time::sleep(Duration::from_millis(1))
335
0
            },
336
0
            task_change_notify,
337
0
            SystemTime::now,
338
0
            maybe_origin_event_tx,
339
0
        )
340
0
    }
341
342
22
    pub fn new_with_callback<
343
22
        Fut: Future<Output = ()> + Send,
344
22
        F: Fn() -> Fut + Send + Sync + 'static,
345
22
        A: AwaitedActionDb,
346
22
        I: InstantWrapper,
347
22
        NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
348
22
    >(
349
22
        spec: &SimpleSpec,
350
22
        awaited_action_db: A,
351
22
        on_matching_engine_run: F,
352
22
        task_change_notify: Arc<Notify>,
353
22
        now_fn: NowFn,
354
22
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
355
22
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
356
22
        let platform_property_manager = Arc::new(PlatformPropertyManager::new(
357
22
            spec.supported_platform_properties
358
22
                .clone()
359
22
                .unwrap_or_default(),
360
22
        ));
361
22
362
22
        let mut worker_timeout_s = spec.worker_timeout_s;
363
22
        if worker_timeout_s == 0 {
  Branch (363:12): [True: 0, False: 0]
  Branch (363:12): [True: 0, False: 0]
  Branch (363:12): [Folded - Ignored]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 0, False: 1]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 0, False: 1]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 0, False: 1]
  Branch (363:12): [True: 0, False: 1]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 0, False: 1]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
  Branch (363:12): [True: 1, False: 0]
364
17
            worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
365
17
        
}5
366
367
22
        let mut client_action_timeout_s = spec.client_action_timeout_s;
368
22
        if client_action_timeout_s == 0 {
  Branch (368:12): [True: 0, False: 0]
  Branch (368:12): [True: 0, False: 0]
  Branch (368:12): [Folded - Ignored]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 0, False: 1]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
369
21
            client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
370
21
        
}1
371
        // This matches the value of CLIENT_KEEPALIVE_DURATION which means that
372
        // tasks are going to be dropped all over the place, this isn't a good
373
        // setting.
374
22
        if client_action_timeout_s <= CLIENT_KEEPALIVE_DURATION.as_secs() {
  Branch (374:12): [True: 0, False: 0]
  Branch (374:12): [True: 0, False: 0]
  Branch (374:12): [Folded - Ignored]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
  Branch (374:12): [True: 0, False: 1]
375
0
            event!(Level::ERROR, client_action_timeout_s, "Setting client_action_timeout_s to less than the client keep alive interval is going to cause issues, please set above {}.", CLIENT_KEEPALIVE_DURATION.as_secs());
376
22
        }
377
378
22
        let mut max_job_retries = spec.max_job_retries;
379
22
        if max_job_retries == 0 {
  Branch (379:12): [True: 0, False: 0]
  Branch (379:12): [True: 0, False: 0]
  Branch (379:12): [Folded - Ignored]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
  Branch (379:12): [True: 1, False: 0]
380
21
            max_job_retries = DEFAULT_MAX_JOB_RETRIES;
381
21
        
}1
382
383
22
        let worker_change_notify = Arc::new(Notify::new());
384
22
        let state_manager = SimpleSchedulerStateManager::new(
385
22
            max_job_retries,
386
22
            Duration::from_secs(worker_timeout_s),
387
22
            Duration::from_secs(client_action_timeout_s),
388
22
            awaited_action_db,
389
22
            now_fn,
390
22
        );
391
22
392
22
        let worker_scheduler = ApiWorkerScheduler::new(
393
22
            state_manager.clone(),
394
22
            platform_property_manager.clone(),
395
22
            spec.allocation_strategy,
396
22
            worker_change_notify.clone(),
397
22
            worker_timeout_s,
398
22
        );
399
22
400
22
        let worker_scheduler_clone = worker_scheduler.clone();
401
22
402
22
        let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
403
22
            let weak_inner = weak_self.clone();
404
22
            let task_worker_matching_spawn =
405
22
                spawn!("simple_scheduler_task_worker_matching", async move 
{21
406
                    // Break out of the loop only when the inner is dropped.
407
                    loop {
408
111
                        let task_change_fut = task_change_notify.notified();
409
111
                        let worker_change_fut = worker_change_notify.notified();
410
111
                        tokio::pin!(task_change_fut);
411
111
                        tokio::pin!(worker_change_fut);
412
111
                        // Wait for either of these futures to be ready.
413
111
                        let _ = futures::future::select(task_change_fut, worker_change_fut).await;
414
90
                        let result = match weak_inner.upgrade() {
415
90
                            Some(scheduler) => scheduler.do_try_match().await,
416
                            // If the inner went away it means the scheduler is shutting
417
                            // down, so we need to resolve our future.
418
0
                            None => return,
419
                        };
420
90
                        if let Err(
err1
) = result {
  Branch (420:32): [True: 0, False: 0]
  Branch (420:32): [True: 0, False: 0]
  Branch (420:32): [Folded - Ignored]
  Branch (420:32): [True: 0, False: 3]
  Branch (420:32): [True: 0, False: 1]
  Branch (420:32): [True: 0, False: 1]
  Branch (420:32): [True: 0, False: 0]
  Branch (420:32): [True: 0, False: 1]
  Branch (420:32): [True: 0, False: 3]
  Branch (420:32): [True: 0, False: 3]
  Branch (420:32): [True: 0, False: 4]
  Branch (420:32): [True: 0, False: 5]
  Branch (420:32): [True: 0, False: 7]
  Branch (420:32): [True: 0, False: 2]
  Branch (420:32): [True: 0, False: 4]
  Branch (420:32): [True: 0, False: 7]
  Branch (420:32): [True: 0, False: 6]
  Branch (420:32): [True: 0, False: 6]
  Branch (420:32): [True: 0, False: 9]
  Branch (420:32): [True: 0, False: 3]
  Branch (420:32): [True: 0, False: 3]
  Branch (420:32): [True: 0, False: 4]
  Branch (420:32): [True: 0, False: 7]
  Branch (420:32): [True: 1, False: 3]
  Branch (420:32): [True: 0, False: 7]
421
1
                            event!(Level::ERROR, ?err, "Error while running do_try_match");
422
89
                        }
423
424
90
                        on_matching_engine_run().await;
425
                    }
426
                    // Unreachable.
427
22
                
}0
);
428
22
            SimpleScheduler {
429
22
                matching_engine_state_manager: state_manager.clone(),
430
22
                client_state_manager: state_manager.clone(),
431
22
                worker_scheduler,
432
22
                platform_property_manager,
433
22
                maybe_origin_event_tx,
434
22
                _task_worker_matching_spawn: task_worker_matching_spawn,
435
22
            }
436
22
        });
437
22
        (action_scheduler, worker_scheduler_clone)
438
22
    }
439
}
440
441
#[async_trait]
442
impl ClientStateManager for SimpleScheduler {
443
    async fn add_action(
444
        &self,
445
        client_operation_id: OperationId,
446
        action_info: Arc<ActionInfo>,
447
27
    ) -> Result<Box<dyn ActionStateResult>, Error> {
448
27
        self.inner_add_action(client_operation_id, action_info)
449
27
            .await
450
54
    }
451
452
    async fn filter_operations<'a>(
453
        &'a self,
454
        filter: OperationFilter,
455
503
    ) -> Result<ActionStateResultStream<'a>, Error> {
456
503
        self.inner_filter_operations(filter).await
457
1.00k
    }
458
459
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
460
0
        Some(self)
461
0
    }
462
}
463
464
#[async_trait]
465
impl KnownPlatformPropertyProvider for SimpleScheduler {
466
0
    async fn get_known_properties(&self, _instance_name: &str) -> Result<Vec<String>, Error> {
467
0
        Ok(self
468
0
            .worker_scheduler
469
0
            .get_platform_property_manager()
470
0
            .get_known_properties()
471
0
            .keys()
472
0
            .cloned()
473
0
            .collect())
474
0
    }
475
}
476
477
#[async_trait]
478
impl WorkerScheduler for SimpleScheduler {
479
0
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
480
0
        self.worker_scheduler.get_platform_property_manager()
481
0
    }
482
483
25
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
484
25
        self.worker_scheduler.add_worker(worker).await
485
50
    }
486
487
    async fn update_action(
488
        &self,
489
        worker_id: &WorkerId,
490
        operation_id: &OperationId,
491
        update: UpdateOperationType,
492
9
    ) -> Result<(), Error> {
493
9
        self.worker_scheduler
494
9
            .update_action(worker_id, operation_id, update)
495
9
            .await
496
18
    }
497
498
    async fn worker_keep_alive_received(
499
        &self,
500
        worker_id: &WorkerId,
501
        timestamp: WorkerTimestamp,
502
1
    ) -> Result<(), Error> {
503
1
        self.worker_scheduler
504
1
            .worker_keep_alive_received(worker_id, timestamp)
505
1
            .await
506
2
    }
507
508
1
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
509
1
        self.worker_scheduler.remove_worker(worker_id).await
510
2
    }
511
512
1
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
513
1
        self.worker_scheduler
514
1
            .remove_timedout_workers(now_timestamp)
515
1
            .await
516
2
    }
517
518
2
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
519
2
        self.worker_scheduler
520
2
            .set_drain_worker(worker_id, is_draining)
521
2
            .await
522
4
    }
523
}
524
525
impl RootMetricsComponent for SimpleScheduler {}