Coverage Report

Created: 2025-04-19 16:54

/build/source/nativelink-scheduler/src/simple_scheduler.rs
  Line|  Count|Source
  (Branch annotations below show totals across all instantiations; "[Folded - Ignored]" copies are omitted.)
     1|       |// Copyright 2024 The NativeLink Authors. All rights reserved.
     2|       |//
     3|       |// Licensed under the Apache License, Version 2.0 (the "License");
     4|       |// you may not use this file except in compliance with the License.
     5|       |// You may obtain a copy of the License at
     6|       |//
     7|       |//    http://www.apache.org/licenses/LICENSE-2.0
     8|       |//
     9|       |// Unless required by applicable law or agreed to in writing, software
    10|       |// distributed under the License is distributed on an "AS IS" BASIS,
    11|       |// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12|       |// See the License for the specific language governing permissions and
    13|       |// limitations under the License.
    14|       |
    15|       |use std::sync::Arc;
    16|       |use std::time::SystemTime;
    17|       |
    18|       |use async_trait::async_trait;
    19|       |use futures::{Future, FutureExt};
    20|       |use nativelink_config::schedulers::SimpleSpec;
    21|       |use nativelink_error::{Code, Error, ResultExt};
    22|       |use nativelink_metric::{MetricsComponent, RootMetricsComponent};
    23|       |use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
    24|       |use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
    25|       |use nativelink_util::instant_wrapper::InstantWrapper;
    26|       |use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
    27|       |use nativelink_util::operation_state_manager::{
    28|       |    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
    29|       |    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
    30|       |};
    31|       |use nativelink_util::origin_context::ActiveOriginContext;
    32|       |use nativelink_util::origin_event::{ORIGIN_EVENT_COLLECTOR, OriginEventCollector, OriginMetadata};
    33|       |use nativelink_util::spawn;
    34|       |use nativelink_util::task::JoinHandleDropGuard;
    35|       |use tokio::sync::{Notify, mpsc};
    36|       |use tokio::time::Duration;
    37|       |use tokio_stream::StreamExt;
    38|       |use tracing::{Level, event, info_span};
    39|       |
    40|       |use crate::api_worker_scheduler::ApiWorkerScheduler;
    41|       |use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
    42|       |use crate::platform_property_manager::PlatformPropertyManager;
    43|       |use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
    44|       |use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
    45|       |use crate::worker_scheduler::WorkerScheduler;
    46|       |
    47|       |/// Default timeout for workers in seconds.
    48|       |/// If this changes, remember to change the documentation in the config.
    49|       |const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;
    50|       |
    51|       |/// Mark operations as completed with error if no client has updated them
    52|       |/// within this duration.
    53|       |/// If this changes, remember to change the documentation in the config.
    54|       |const DEFAULT_CLIENT_ACTION_TIMEOUT_S: u64 = 60;
    55|       |
    56|       |/// Default number of times a job can retry before failing.
    57|       |/// If this changes, remember to change the documentation in the config.
    58|       |const DEFAULT_MAX_JOB_RETRIES: usize = 3;
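
  Note: in `new_with_callback` further down, a value of 0 in the SimpleSpec config means "use the default" for each of these constants. A minimal sketch of that fallback, assuming nothing beyond this file (`resolve_worker_timeout` and `spec_value` are illustrative names, not part of this crate):

    // Illustrative only: the zero-means-default fallback applied below to
    // worker_timeout_s, client_action_timeout_s and max_job_retries.
    const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;

    fn resolve_worker_timeout(spec_value: u64) -> u64 {
        if spec_value == 0 {
            DEFAULT_WORKER_TIMEOUT_S // fall back to the documented default
        } else {
            spec_value
        }
    }
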
    59|       |
    60|       |struct SimpleSchedulerActionStateResult {
    61|       |    client_operation_id: OperationId,
    62|       |    action_state_result: Box<dyn ActionStateResult>,
    63|       |}
    64|       |
    65|       |impl SimpleSchedulerActionStateResult {
    66|     27|    fn new(
    67|     27|        client_operation_id: OperationId,
    68|     27|        action_state_result: Box<dyn ActionStateResult>,
    69|     27|    ) -> Self {
    70|     27|        Self {
    71|     27|            client_operation_id,
    72|     27|            action_state_result,
    73|     27|        }
    74|     27|    }
    75|       |}
    76|       |
    77|       |#[async_trait]
    78|       |impl ActionStateResult for SimpleSchedulerActionStateResult {
    79|      6|    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
    80|      3|        let (mut action_state, origin_metadata) = self
    81|      3|            .action_state_result
    82|      3|            .as_state()
    83|      3|            .await
    84|      3|            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
    85|       |        // We need to ensure the client is not aware of the downstream
    86|       |        // operation id, so override it before it goes out.
    87|      3|        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
    88|      3|        Ok((action_state, origin_metadata))
    89|      6|    }
    90|       |
    91|     88|    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
    92|     44|        let (mut action_state, origin_metadata) = self
    93|     44|            .action_state_result
    94|     44|            .changed()
    95|     44|            .await
    96|     41|            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
    97|       |        // We need to ensure the client is not aware of the downstream
    98|       |        // operation id, so override it before it goes out.
    99|     41|        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
   100|     41|        Ok((action_state, origin_metadata))
   101|     85|    }
   102|       |
   103|      0|    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
   104|      0|        self.action_state_result
   105|      0|            .as_action_info()
   106|      0|            .await
   107|      0|            .err_tip(|| "In SimpleSchedulerActionStateResult")
   108|      0|    }
   109|       |}
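
  Note: `as_state()` and `changed()` above rewrite `client_operation_id` through `Arc::make_mut`, which is copy-on-write: if the `ActionState` Arc is shared, the value is cloned before mutation, so other holders never observe the client-facing id. A self-contained sketch of the pattern using only std (`State` is a stand-in type, not from this crate):

    use std::sync::Arc;

    #[derive(Clone)]
    struct State {
        client_operation_id: String,
    }

    // If `state` is uniquely owned, make_mut mutates in place; if it is
    // shared, make_mut clones first, leaving other Arc holders untouched.
    fn override_id(mut state: Arc<State>, client_id: &str) -> Arc<State> {
        Arc::make_mut(&mut state).client_operation_id = client_id.to_owned();
        state
    }
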
   110|       |
   111|       |/// Engine used to manage the queued/running tasks and relationship with
   112|       |/// the worker nodes. All state on how the workers and actions are interacting
   113|       |/// should be held in this struct.
   114|       |#[derive(MetricsComponent)]
   115|       |pub struct SimpleScheduler {
   116|       |    /// Manager for matching engine side of the state manager.
   117|       |    #[metric(group = "matching_engine_state_manager")]
   118|       |    matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,
   119|       |
   120|       |    /// Manager for client state of this scheduler.
   121|       |    #[metric(group = "client_state_manager")]
   122|       |    client_state_manager: Arc<dyn ClientStateManager>,
   123|       |
   124|       |    /// Manager for platform properties of this scheduler.
   125|       |    #[metric(group = "platform_properties")]
   126|       |    platform_property_manager: Arc<PlatformPropertyManager>,
   127|       |
   128|       |    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
   129|       |    /// order based on the allocation strategy.
   130|       |    #[metric(group = "worker_scheduler")]
   131|       |    worker_scheduler: Arc<ApiWorkerScheduler>,
   132|       |
   133|       |    /// The sender used to publish origin events.
   134|       |    maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
   135|       |
   136|       |    /// Background task that tries to match actions to workers. If this struct
   137|       |    /// is dropped the spawn will be cancelled as well.
   138|       |    _task_worker_matching_spawn: JoinHandleDropGuard<()>,
   139|       |}
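
  Note: `_task_worker_matching_spawn` is a `JoinHandleDropGuard`, so dropping `SimpleScheduler` also cancels the background matching task. A minimal sketch of such a guard, assuming only tokio (this is not the `nativelink_util` implementation):

    use tokio::task::JoinHandle;

    // Aborts the wrapped task when the guard (and thus its owner) is dropped.
    struct JoinHandleDropGuardSketch<T>(JoinHandle<T>);

    impl<T> Drop for JoinHandleDropGuardSketch<T> {
        fn drop(&mut self) {
            self.0.abort();
        }
    }
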
   140|       |
   141|       |impl std::fmt::Debug for SimpleScheduler {
   142|      0|    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
   143|      0|        f.debug_struct("SimpleScheduler")
   144|      0|            .field("platform_property_manager", &self.platform_property_manager)
   145|      0|            .field("worker_scheduler", &self.worker_scheduler)
   146|      0|            .field("maybe_origin_event_tx", &self.maybe_origin_event_tx)
   147|      0|            .field(
   148|      0|                "_task_worker_matching_spawn",
   149|      0|                &self._task_worker_matching_spawn,
   150|      0|            )
   151|      0|            .finish_non_exhaustive()
   152|      0|    }
   153|       |}
   154|       |
   155|       |impl SimpleScheduler {
   156|       |    /// Attempts to find a worker to execute an action and begins executing it.
   157|       |    /// If an action is already running that is cacheable it may merge this
   158|       |    /// action with the results and state changes of the already running
   159|       |    /// action. If the task cannot be executed immediately it will be queued
   160|       |    /// for execution based on priority and other metrics.
   161|       |    /// All further updates to the action will be provided through the returned
   162|       |    /// value.
   163|     27|    async fn inner_add_action(
   164|     27|        &self,
   165|     27|        client_operation_id: OperationId,
   166|     27|        action_info: Arc<ActionInfo>,
   167|     27|    ) -> Result<Box<dyn ActionStateResult>, Error> {
   168|     27|        let action_state_result = self
   169|     27|            .client_state_manager
   170|     27|            .add_action(client_operation_id.clone(), action_info)
   171|     27|            .await
   172|     27|            .err_tip(|| "In SimpleScheduler::add_action")?;
   173|     27|        Ok(Box::new(SimpleSchedulerActionStateResult::new(
   174|     27|            client_operation_id.clone(),
   175|     27|            action_state_result,
   176|     27|        )))
   177|     27|    }
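
  Note: per the doc comment on `inner_add_action`, two submissions of the same cacheable action may be merged into one underlying operation while each caller keeps its own `client_operation_id`. A hypothetical usage sketch, assuming the imports at the top of this file with the `ClientStateManager` trait in scope (`submit_twice` and the ids are illustrative, not from this repo):

    // Hypothetical: both calls may end up tracking the same queued
    // operation, but each returned ActionStateResult reports its own id.
    async fn submit_twice(
        scheduler: &SimpleScheduler,
        id_a: OperationId,
        id_b: OperationId,
        action_info: Arc<ActionInfo>,
    ) -> Result<(), Error> {
        let _a = scheduler.add_action(id_a, action_info.clone()).await?;
        let _b = scheduler.add_action(id_b, action_info).await?;
        Ok(())
    }
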
   178|       |
   179|    503|    async fn inner_filter_operations(
   180|    503|        &self,
   181|    503|        filter: OperationFilter,
   182|    503|    ) -> Result<ActionStateResultStream, Error> {
   183|    503|        self.client_state_manager
   184|    503|            .filter_operations(filter)
   185|    503|            .await
   186|    503|            .err_tip(|| "In SimpleScheduler::find_by_client_operation_id getting filter result")
   187|    503|    }
   188|       |
   189|     94|    async fn get_queued_operations(&self) -> Result<ActionStateResultStream, Error> {
   190|     94|        let filter = OperationFilter {
   191|     94|            stages: OperationStageFlags::Queued,
   192|     94|            order_by_priority_direction: Some(OrderDirection::Desc),
   193|     94|            ..Default::default()
   194|     94|        };
   195|     94|        self.matching_engine_state_manager
   196|     94|            .filter_operations(filter)
   197|     94|            .await
   198|     94|            .err_tip(|| "In SimpleScheduler::get_queued_operations getting filter result")
   199|     94|    }
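
  Note: the matching engine's view of the queue is just a filter: Queued operations, highest priority first. The same filter built by `get_queued_operations`, shown standalone (assuming the `nativelink_util::operation_state_manager` imports from the top of this file):

    fn queued_filter() -> OperationFilter {
        OperationFilter {
            // Only operations still waiting for a worker.
            stages: OperationStageFlags::Queued,
            // Highest priority first.
            order_by_priority_direction: Some(OrderDirection::Desc),
            // Everything else stays at its default (no extra filtering).
            ..Default::default()
        }
    }
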
   200|       |
   201|      4|    pub async fn do_try_match_for_test(&self) -> Result<(), Error> {
   202|      4|        self.do_try_match().await
   203|      4|    }
   204|       |
   205|       |    // TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we
   206|       |    // can create a map of capabilities of each worker and then try and match
   207|       |    // the actions to the worker using the map lookup (ie. map reduce).
   208|     94|    async fn do_try_match(&self) -> Result<(), Error> {
   209|     45|        async fn match_action_to_worker(
   210|     45|            action_state_result: &dyn ActionStateResult,
   211|     45|            workers: &ApiWorkerScheduler,
   212|     45|            matching_engine_state_manager: &dyn MatchingEngineStateManager,
   213|     45|            platform_property_manager: &PlatformPropertyManager,
   214|     45|            maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
   215|     45|        ) -> Result<(), Error> {
   216|     45|            let (action_info, maybe_origin_metadata) =
   217|     45|                action_state_result
   218|     45|                    .as_action_info()
   219|     45|                    .await
   220|     45|                    .err_tip(|| "Failed to get action_info from as_action_info_result stream")?;
   221|       |
   222|       |            // TODO(allada) We should not compute this every time and instead store
   223|       |            // it with the ActionInfo when we receive it.
   224|     45|            let platform_properties = platform_property_manager
   225|     45|                .make_platform_properties(action_info.platform_properties.clone())
   226|     45|                .err_tip(
   227|      0|                    || "Failed to make platform properties in SimpleScheduler::do_try_match",
   228|      0|                )?;
   229|       |
   230|     45|            let action_info = ActionInfoWithProps {
   231|     45|                inner: action_info,
   232|     45|                platform_properties,
   233|     45|            };
   234|       |
   235|       |            // Try to find a worker for the action.
   236|     28|            let worker_id = {
   237|     45|                match workers
   238|     45|                    .find_worker_for_action(&action_info.platform_properties)
   239|     45|                    .await
   240|       |                {
   241|     28|                    Some(worker_id) => worker_id,
   242|       |                    // If we could not find a worker for the action,
   243|       |                    // we have nothing to do.
   244|     17|                    None => return Ok(()),
   245|       |                }
   246|       |            };
   247|       |
   248|     28|            let attach_operation_fut = async move {
   249|       |                // Extract the operation_id from the action_state.
   250|     28|                let operation_id = {
   251|     28|                    let (action_state, _origin_metadata) = action_state_result
   252|     28|                        .as_state()
   253|     28|                        .await
   254|     28|                        .err_tip(|| "Failed to get action_info from as_state_result stream")?;
   255|     28|                    action_state.client_operation_id.clone()
   256|       |                };
   257|       |
   258|       |                // Tell the matching engine that the operation is being assigned to a worker.
   259|     28|                let assign_result = matching_engine_state_manager
   260|     28|                    .assign_operation(&operation_id, Ok(&worker_id))
   261|     28|                    .await
   262|     28|                    .err_tip(|| "Failed to assign operation in do_try_match");
   263|     28|                if let Err(err) = assign_result {
      |       |  Branch (263:24): [True: 1, False: 27]
   264|      1|                    if err.code == Code::Aborted {
      |       |  Branch (264:24): [True: 0, False: 1]
   265|       |                        // If the operation was aborted, it means that the operation was
   266|       |                        // cancelled due to another operation being assigned to the worker.
   267|      0|                        return Ok(());
   268|      1|                    }
   269|      1|                    // Any other error is a real error.
   270|      1|                    return Err(err);
   271|     27|                }
   272|     27|
   273|     27|                workers
   274|     27|                    .worker_notify_run_action(worker_id, operation_id, action_info)
   275|     27|                    .await
   276|     27|                    .err_tip(|| {
   277|      1|                        "Failed to run worker_notify_run_action in SimpleScheduler::do_try_match"
   278|      1|                    })
   279|     28|            };
   280|     28|            tokio::pin!(attach_operation_fut);
   281|       |
   282|     28|            let attach_operation_fut = if let Some(origin_event_tx) = maybe_origin_event_tx {
      |       |  Branch (282:47): [True: 0, False: 28]
   283|      0|                let mut ctx = ActiveOriginContext::fork().unwrap_or_default();
   284|      0|
   285|      0|                // Populate our origin event collector with the origin metadata
   286|      0|                // associated with the action.
   287|      0|                ctx.replace_value(&ORIGIN_EVENT_COLLECTOR, move |maybe_old_collector| {
   288|      0|                    let origin_metadata = maybe_origin_metadata.unwrap_or_default();
   289|      0|                    let Some(old_collector) = maybe_old_collector else {
      |       |  Branch (289:25): [True: 0, False: 0]
   290|      0|                        return Some(Arc::new(OriginEventCollector::new(
   291|      0|                            origin_event_tx.clone(),
   292|      0|                            origin_metadata,
   293|      0|                        )));
   294|       |                    };
   295|      0|                    Some(Arc::new(old_collector.clone_with_metadata(origin_metadata)))
   296|      0|                });
   297|      0|                Arc::new(ctx)
   298|      0|                    .wrap_async(info_span!("do_try_match"), attach_operation_fut)
   299|      0|                    .left_future()
   300|       |            } else {
   301|     28|                attach_operation_fut.right_future()
   302|       |            };
   303|     28|            attach_operation_fut.await
   304|     45|        }
   305|       |
   306|     94|        let mut result = Ok(());
   307|       |
   308|     94|        let mut stream = self
   309|     94|            .get_queued_operations()
   310|     94|            .await
   311|     94|            .err_tip(|| "Failed to get queued operations in do_try_match")?;
   312|       |
   313|    139|        while let Some(action_state_result) = stream.next().await {
      |       |  Branch (313:19): [True: 45, False: 94]
   314|     45|            result = result.merge(
   315|     45|                match_action_to_worker(
   316|     45|                    action_state_result.as_ref(),
   317|     45|                    self.worker_scheduler.as_ref(),
   318|     45|                    self.matching_engine_state_manager.as_ref(),
   319|     45|                    self.platform_property_manager.as_ref(),
   320|     45|                    self.maybe_origin_event_tx.as_ref(),
   321|     45|                )
   322|     45|                .await,
   323|       |            );
   324|       |        }
   325|     94|        result
   326|     94|    }
   327|       |}
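
  Note: the TODO above flags that `do_try_match` is O(n*m): every queued action is checked against the worker pool. The suggested map-based alternative would index workers by their property set so each action becomes a lookup. A rough sketch of the idea, not the repo's implementation (`PropKey` and `WorkerIndex` are hypothetical, and an exact-match map only helps when properties must match exactly rather than by minimum capability):

    use std::collections::HashMap;

    // Hypothetical canonical encoding of a platform property set.
    type PropKey = Vec<(String, String)>;

    struct WorkerIndex {
        by_props: HashMap<PropKey, Vec<u64>>, // property set -> worker ids
    }

    impl WorkerIndex {
        // One hash lookup per queued action instead of scanning all workers.
        fn find_worker(&self, key: &PropKey) -> Option<u64> {
            self.by_props.get(key).and_then(|ids| ids.first().copied())
        }
    }
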
   328|       |
   329|       |impl SimpleScheduler {
   330|      0|    pub fn new<A: AwaitedActionDb>(
   331|      0|        spec: &SimpleSpec,
   332|      0|        awaited_action_db: A,
   333|      0|        task_change_notify: Arc<Notify>,
   334|      0|        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
   335|      0|    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
   336|      0|        Self::new_with_callback(
   337|      0|            spec,
   338|      0|            awaited_action_db,
   339|      0|            || {
   340|      0|                // The cost of running `do_try_match()` is very high, but constant
   341|      0|                // in relation to the number of changes that have happened. This
   342|      0|                // means that grabbing this lock to process `do_try_match()` should
   343|      0|                // always yield to any other tasks that might want the lock. The
   344|      0|                // easiest and most fair way to do this is to sleep for a small
   345|      0|                // amount of time. Using something like tokio::task::yield_now()
   346|      0|                // does not yield as aggressively as we'd like if new futures are
   347|      0|                // scheduled within a future.
   348|      0|                tokio::time::sleep(Duration::from_millis(1))
   349|      0|            },
   350|      0|            task_change_notify,
   351|      0|            SystemTime::now,
   352|      0|            maybe_origin_event_tx,
   353|      0|        )
   354|      0|    }
   355|       |
   356|     22|    pub fn new_with_callback<
   357|     22|        Fut: Future<Output = ()> + Send,
   358|     22|        F: Fn() -> Fut + Send + Sync + 'static,
   359|     22|        A: AwaitedActionDb,
   360|     22|        I: InstantWrapper,
   361|     22|        NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
   362|     22|    >(
   363|     22|        spec: &SimpleSpec,
   364|     22|        awaited_action_db: A,
   365|     22|        on_matching_engine_run: F,
   366|     22|        task_change_notify: Arc<Notify>,
   367|     22|        now_fn: NowFn,
   368|     22|        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
   369|     22|    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
   370|     22|        let platform_property_manager = Arc::new(PlatformPropertyManager::new(
   371|     22|            spec.supported_platform_properties
   372|     22|                .clone()
   373|     22|                .unwrap_or_default(),
   374|     22|        ));
   375|     22|
   376|     22|        let mut worker_timeout_s = spec.worker_timeout_s;
   377|     22|        if worker_timeout_s == 0 {
      |       |  Branch (377:12): [True: 17, False: 5]
   378|     17|            worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
   379|     17|        }
   380|       |
   381|     22|        let mut client_action_timeout_s = spec.client_action_timeout_s;
   382|     22|        if client_action_timeout_s == 0 {
      |       |  Branch (382:12): [True: 21, False: 1]
   383|     21|            client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
   384|     21|        }
   385|       |        // If this matches or falls below CLIENT_KEEPALIVE_DURATION, tasks are
   386|       |        // going to be dropped all over the place; this isn't a good
   387|       |        // setting.
   388|     22|        if client_action_timeout_s <= CLIENT_KEEPALIVE_DURATION.as_secs() {
      |       |  Branch (388:12): [True: 0, False: 22]
   389|      0|            event!(
   390|      0|                Level::ERROR,
   391|       |                client_action_timeout_s,
   392|      0|                "Setting client_action_timeout_s to less than the client keep alive interval is going to cause issues, please set above {}.",
   393|      0|                CLIENT_KEEPALIVE_DURATION.as_secs()
   394|       |            );
   395|     22|        }
   396|       |
   397|     22|        let mut max_job_retries = spec.max_job_retries;
   398|     22|        if max_job_retries == 0 {
      |       |  Branch (398:12): [True: 21, False: 1]
   399|     21|            max_job_retries = DEFAULT_MAX_JOB_RETRIES;
   400|     21|        }
   401|       |
   402|     22|        let worker_change_notify = Arc::new(Notify::new());
   403|     22|        let state_manager = SimpleSchedulerStateManager::new(
   404|     22|            max_job_retries,
   405|     22|            Duration::from_secs(worker_timeout_s),
   406|     22|            Duration::from_secs(client_action_timeout_s),
   407|     22|            awaited_action_db,
   408|     22|            now_fn,
   409|     22|        );
   410|     22|
   411|     22|        let worker_scheduler = ApiWorkerScheduler::new(
   412|     22|            state_manager.clone(),
   413|     22|            platform_property_manager.clone(),
   414|     22|            spec.allocation_strategy,
   415|     22|            worker_change_notify.clone(),
   416|     22|            worker_timeout_s,
   417|     22|        );
   418|     22|
   419|     22|        let worker_scheduler_clone = worker_scheduler.clone();
   420|     22|
   421|     22|        let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
   422|     22|            let weak_inner = weak_self.clone();
   423|     22|            let task_worker_matching_spawn =
   424|     22|                spawn!("simple_scheduler_task_worker_matching", async move {
   425|       |                    // Break out of the loop only when the inner is dropped.
   426|       |                    loop {
   427|    111|                        let task_change_fut = task_change_notify.notified();
   428|    111|                        let worker_change_fut = worker_change_notify.notified();
   429|    111|                        tokio::pin!(task_change_fut);
   430|    111|                        tokio::pin!(worker_change_fut);
   431|    111|                        // Wait for either of these futures to be ready.
   432|    111|                        let _ = futures::future::select(task_change_fut, worker_change_fut).await;
   433|     90|                        let result = match weak_inner.upgrade() {
   434|     90|                            Some(scheduler) => scheduler.do_try_match().await,
   435|       |                            // If the inner went away it means the scheduler is shutting
   436|       |                            // down, so we need to resolve our future.
   437|      0|                            None => return,
   438|       |                        };
   439|     90|                        if let Err(err) = result {
      |       |  Branch (439:32): [True: 1, False: 89]
   440|      1|                            event!(Level::ERROR, ?err, "Error while running do_try_match");
   441|     89|                        }
   442|       |
   443|     90|                        on_matching_engine_run().await;
   444|       |                    }
   445|       |                    // Unreachable.
   446|      0|                });
   447|     22|            SimpleScheduler {
   448|     22|                matching_engine_state_manager: state_manager.clone(),
   449|     22|                client_state_manager: state_manager.clone(),
   450|     22|                worker_scheduler,
   451|     22|                platform_property_manager,
   452|     22|                maybe_origin_event_tx,
   453|     22|                _task_worker_matching_spawn: task_worker_matching_spawn,
   454|     22|            }
   455|     22|        });
   456|     22|        (action_scheduler, worker_scheduler_clone)
   457|     22|    }
   458|       |}
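
  Note: `new_with_callback` wires the background loop to the scheduler with `Arc::new_cyclic`: the task only holds a `Weak`, so once every strong Arc is dropped, `upgrade()` fails and the loop returns (and the drop guard aborts the task anyway). A self-contained sketch of the pattern using plain tokio, which must run inside a tokio runtime (`Owner` and `spawn_loop` are illustrative names):

    use std::sync::{Arc, Weak};
    use tokio::sync::Notify;

    struct Owner {
        notify: Arc<Notify>,
    }

    fn spawn_loop(notify: Arc<Notify>) -> Arc<Owner> {
        Arc::new_cyclic(|weak_self: &Weak<Owner>| {
            let weak = weak_self.clone();
            let task_notify = notify.clone();
            tokio::spawn(async move {
                loop {
                    // Wait for a change notification first...
                    task_notify.notified().await;
                    // ...then try to upgrade; if the owner is gone, exit.
                    let Some(owner) = weak.upgrade() else { return };
                    // ... one round of matching would happen here ...
                    drop(owner); // don't hold a strong ref across the next wait
                }
            });
            Owner { notify }
        })
    }
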
   459|       |
   460|       |#[async_trait]
   461|       |impl ClientStateManager for SimpleScheduler {
   462|       |    async fn add_action(
   463|       |        &self,
   464|       |        client_operation_id: OperationId,
   465|       |        action_info: Arc<ActionInfo>,
   466|     54|    ) -> Result<Box<dyn ActionStateResult>, Error> {
   467|     27|        self.inner_add_action(client_operation_id, action_info)
   468|     27|            .await
   469|     54|    }
   470|       |
   471|       |    async fn filter_operations<'a>(
   472|       |        &'a self,
   473|       |        filter: OperationFilter,
   474|  1.00k|    ) -> Result<ActionStateResultStream<'a>, Error> {
   475|    503|        self.inner_filter_operations(filter).await
   476|  1.00k|    }
   477|       |
   478|      0|    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
   479|      0|        Some(self)
   480|      0|    }
   481|       |}
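
  Note: a client drives an action through the `ClientStateManager` surface above: `add_action`, then await `changed()` until the state reports completion. A hypothetical flow sketch (`wait_for_result` is an illustrative helper; it assumes `ActionState` exposes a `stage` field of type `ActionStage` from `nativelink_util::action_messages`, which is not imported in this file):

    use nativelink_util::action_messages::ActionStage;

    async fn wait_for_result(
        client: &dyn ClientStateManager,
        id: OperationId,
        action_info: Arc<ActionInfo>,
    ) -> Result<Arc<ActionState>, Error> {
        let mut result = client.add_action(id, action_info).await?;
        loop {
            // changed() resolves each time the action's state advances.
            let (state, _origin_metadata) = result.changed().await?;
            if matches!(state.stage, ActionStage::Completed(_)) {
                return Ok(state);
            }
        }
    }
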
   482|       |
   483|       |#[async_trait]
   484|       |impl KnownPlatformPropertyProvider for SimpleScheduler {
   485|      0|    async fn get_known_properties(&self, _instance_name: &str) -> Result<Vec<String>, Error> {
   486|      0|        Ok(self
   487|      0|            .worker_scheduler
   488|      0|            .get_platform_property_manager()
   489|      0|            .get_known_properties()
   490|      0|            .keys()
   491|      0|            .cloned()
   492|      0|            .collect())
   493|      0|    }
   494|       |}
   495|       |
   496|       |#[async_trait]
   497|       |impl WorkerScheduler for SimpleScheduler {
   498|      0|    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
   499|      0|        self.worker_scheduler.get_platform_property_manager()
   500|      0|    }
   501|       |
   502|     50|    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
   503|     25|        self.worker_scheduler.add_worker(worker).await
   504|     50|    }
   505|       |
   506|       |    async fn update_action(
   507|       |        &self,
   508|       |        worker_id: &WorkerId,
   509|       |        operation_id: &OperationId,
   510|       |        update: UpdateOperationType,
   511|     18|    ) -> Result<(), Error> {
   512|      9|        self.worker_scheduler
   513|      9|            .update_action(worker_id, operation_id, update)
   514|      9|            .await
   515|     18|    }
   516|       |
   517|       |    async fn worker_keep_alive_received(
   518|       |        &self,
   519|       |        worker_id: &WorkerId,
   520|       |        timestamp: WorkerTimestamp,
   521|      2|    ) -> Result<(), Error> {
   522|      1|        self.worker_scheduler
   523|      1|            .worker_keep_alive_received(worker_id, timestamp)
   524|      1|            .await
   525|      2|    }
   526|       |
   527|      2|    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
   528|      1|        self.worker_scheduler.remove_worker(worker_id).await
   529|      2|    }
   530|       |
   531|      2|    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
   532|      1|        self.worker_scheduler
   533|      1|            .remove_timedout_workers(now_timestamp)
   534|      1|            .await
   535|      2|    }
   536|       |
   537|      4|    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
   538|      2|        self.worker_scheduler
   539|      2|            .set_drain_worker(worker_id, is_draining)
   540|      2|            .await
   541|      4|    }
   542|       |}
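
  Note: every `WorkerScheduler` method above is a thin delegation to `ApiWorkerScheduler`. A hypothetical lifecycle walk-through using only this trait (`worker_lifecycle` is an illustrative helper; `now` is passed twice on the assumption that `WorkerTimestamp` is a Copy type):

    async fn worker_lifecycle(
        scheduler: &dyn WorkerScheduler,
        worker: Worker,
        worker_id: &WorkerId,
        now: WorkerTimestamp,
    ) -> Result<(), Error> {
        // Register the worker with the pool.
        scheduler.add_worker(worker).await?;
        // Keep-alives refresh the worker's timestamp...
        scheduler.worker_keep_alive_received(worker_id, now).await?;
        // ...and workers that stop refreshing are evicted here.
        scheduler.remove_timedout_workers(now).await?;
        Ok(())
    }
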
   543|       |
   544|       |impl RootMetricsComponent for SimpleScheduler {}