Coverage Report

Created: 2025-09-16 19:42

/build/source/nativelink-scheduler/src/simple_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::SystemTime;
17
18
use async_trait::async_trait;
19
use futures::Future;
20
use nativelink_config::schedulers::SimpleSpec;
21
use nativelink_error::{Code, Error, ResultExt};
22
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
23
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
24
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
25
use nativelink_util::instant_wrapper::InstantWrapper;
26
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
27
use nativelink_util::operation_state_manager::{
28
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
29
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
30
};
31
use nativelink_util::origin_event::OriginMetadata;
32
use nativelink_util::shutdown_guard::ShutdownGuard;
33
use nativelink_util::spawn;
34
use nativelink_util::task::JoinHandleDropGuard;
35
use opentelemetry::KeyValue;
36
use opentelemetry::baggage::BaggageExt;
37
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
38
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
39
use tokio::sync::{Notify, mpsc};
40
use tokio::time::Duration;
41
use tokio_stream::StreamExt;
42
use tracing::{error, info_span};
43
44
use crate::api_worker_scheduler::ApiWorkerScheduler;
45
use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
46
use crate::platform_property_manager::PlatformPropertyManager;
47
use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
48
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
49
use crate::worker_scheduler::WorkerScheduler;
50
51
/// Default timeout for workers in seconds.
52
/// If this changes, remember to change the documentation in the config.
53
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;
54
55
/// Mark operations as completed with error if no client has updated them
56
/// within this duration.
57
/// If this changes, remember to change the documentation in the config.
58
const DEFAULT_CLIENT_ACTION_TIMEOUT_S: u64 = 60;
59
60
/// Default times a job can retry before failing.
61
/// If this changes, remember to change the documentation in the config.
62
const DEFAULT_MAX_JOB_RETRIES: usize = 3;
63
64
struct SimpleSchedulerActionStateResult {
65
    client_operation_id: OperationId,
66
    action_state_result: Box<dyn ActionStateResult>,
67
}
68
69
impl SimpleSchedulerActionStateResult {
70
30
    fn new(
71
30
        client_operation_id: OperationId,
72
30
        action_state_result: Box<dyn ActionStateResult>,
73
30
    ) -> Self {
74
30
        Self {
75
30
            client_operation_id,
76
30
            action_state_result,
77
30
        }
78
30
    }
79
}
80
81
#[async_trait]
82
impl ActionStateResult for SimpleSchedulerActionStateResult {
83
8
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
84
4
        let (mut action_state, origin_metadata) = self
85
4
            .action_state_result
86
4
            .as_state()
87
4
            .await
88
4
            .err_tip(|| "In SimpleSchedulerActionStateResult")
?0
;
89
        // We need to ensure the client is not aware of the downstream
90
        // operation id, so override it before it goes out.
91
4
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
92
4
        Ok((action_state, origin_metadata))
93
8
    }
94
95
92
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
96
46
        let (mut action_state, origin_metadata) = self
97
46
            .action_state_result
98
46
            .changed()
99
46
            .await
100
43
            .err_tip(|| "In SimpleSchedulerActionStateResult")
?0
;
101
        // We need to ensure the client is not aware of the downstream
102
        // operation id, so override it before it goes out.
103
43
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
104
43
        Ok((action_state, origin_metadata))
105
89
    }
106
107
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
108
0
        self.action_state_result
109
0
            .as_action_info()
110
0
            .await
111
0
            .err_tip(|| "In SimpleSchedulerActionStateResult")
112
0
    }
113
}
114
115
/// Engine used to manage the queued/running tasks and relationship with
116
/// the worker nodes. All state on how the workers and actions are interacting
117
/// should be held in this struct.
118
#[derive(MetricsComponent)]
119
pub struct SimpleScheduler {
120
    /// Manager for matching engine side of the state manager.
121
    #[metric(group = "matching_engine_state_manager")]
122
    matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,
123
124
    /// Manager for client state of this scheduler.
125
    #[metric(group = "client_state_manager")]
126
    client_state_manager: Arc<dyn ClientStateManager>,
127
128
    /// Manager for platform of this scheduler.
129
    #[metric(group = "platform_properties")]
130
    platform_property_manager: Arc<PlatformPropertyManager>,
131
132
    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
133
    /// order based on the allocation strategy.
134
    #[metric(group = "worker_scheduler")]
135
    worker_scheduler: Arc<ApiWorkerScheduler>,
136
137
    /// The sender to send origin events to.
138
    maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
139
140
    /// Background task that tries to match actions to workers. If this struct
141
    /// is dropped the spawn will be cancelled as well.
142
    task_worker_matching_spawn: JoinHandleDropGuard<()>,
143
}
144
145
impl core::fmt::Debug for SimpleScheduler {
146
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
147
0
        f.debug_struct("SimpleScheduler")
148
0
            .field("platform_property_manager", &self.platform_property_manager)
149
0
            .field("worker_scheduler", &self.worker_scheduler)
150
0
            .field("maybe_origin_event_tx", &self.maybe_origin_event_tx)
151
0
            .field(
152
0
                "task_worker_matching_spawn",
153
0
                &self.task_worker_matching_spawn,
154
0
            )
155
0
            .finish_non_exhaustive()
156
0
    }
157
}
158
159
impl SimpleScheduler {
160
    /// Attempts to find a worker to execute an action and begins executing it.
161
    /// If an action is already running that is cacheable it may merge this
162
    /// action with the results and state changes of the already running
163
    /// action. If the task cannot be executed immediately it will be queued
164
    /// for execution based on priority and other metrics.
165
    /// All further updates to the action will be provided through the returned
166
    /// value.
167
30
    async fn inner_add_action(
168
30
        &self,
169
30
        client_operation_id: OperationId,
170
30
        action_info: Arc<ActionInfo>,
171
30
    ) -> Result<Box<dyn ActionStateResult>, Error> {
172
30
        let action_state_result = self
173
30
            .client_state_manager
174
30
            .add_action(client_operation_id.clone(), action_info)
175
30
            .await
176
30
            .err_tip(|| "In SimpleScheduler::add_action")
?0
;
177
30
        Ok(Box::new(SimpleSchedulerActionStateResult::new(
178
30
            client_operation_id.clone(),
179
30
            action_state_result,
180
30
        )))
181
30
    }
182
183
504
    async fn inner_filter_operations(
184
504
        &self,
185
504
        filter: OperationFilter,
186
504
    ) -> Result<ActionStateResultStream<'_>, Error> {
187
504
        self.client_state_manager
188
504
            .filter_operations(filter)
189
504
            .await
190
504
            .err_tip(|| "In SimpleScheduler::find_by_client_operation_id getting filter result")
191
504
    }
192
193
111
    async fn get_queued_operations(&self) -> Result<ActionStateResultStream<'_>, Error> {
194
111
        let filter = OperationFilter {
195
111
            stages: OperationStageFlags::Queued,
196
111
            order_by_priority_direction: Some(OrderDirection::Desc),
197
111
            ..Default::default()
198
111
        };
199
111
        self.matching_engine_state_manager
200
111
            .filter_operations(filter)
201
111
            .await
202
111
            .err_tip(|| "In SimpleScheduler::get_queued_operations getting filter result")
203
111
    }
204
205
9
    pub async fn do_try_match_for_test(&self) -> Result<(), Error> {
206
9
        self.do_try_match().await
207
9
    }
208
209
    // TODO(palfrey) This is an O(n*m) (aka n^2) algorithm. In theory we
210
    // can create a map of capabilities of each worker and then try and match
211
    // the actions to the worker using the map lookup (ie. map reduce).
212
111
    async fn do_try_match(&self) -> Result<(), Error> {
213
50
        async fn match_action_to_worker(
214
50
            action_state_result: &dyn ActionStateResult,
215
50
            workers: &ApiWorkerScheduler,
216
50
            matching_engine_state_manager: &dyn MatchingEngineStateManager,
217
50
            platform_property_manager: &PlatformPropertyManager,
218
50
        ) -> Result<(), Error> {
219
50
            let (action_info, maybe_origin_metadata) =
220
50
                action_state_result
221
50
                    .as_action_info()
222
50
                    .await
223
50
                    .err_tip(|| "Failed to get action_info from as_action_info_result stream")
?0
;
224
225
            // TODO(palfrey) We should not compute this every time and instead store
226
            // it with the ActionInfo when we receive it.
227
50
            let platform_properties = platform_property_manager
228
50
                .make_platform_properties(action_info.platform_properties.clone())
229
50
                .err_tip(
230
                    || "Failed to make platform properties in SimpleScheduler::do_try_match",
231
0
                )?;
232
233
50
            let action_info = ActionInfoWithProps {
234
50
                inner: action_info,
235
50
                platform_properties,
236
50
            };
237
238
            // Try to find a worker for the action.
239
32
            let worker_id = {
240
50
                match workers
241
50
                    .find_worker_for_action(&action_info.platform_properties)
242
50
                    .await
243
                {
244
32
                    Some(worker_id) => worker_id,
245
                    // If we could not find a worker for the action,
246
                    // we have nothing to do.
247
18
                    None => return Ok(()),
248
                }
249
            };
250
251
32
            let attach_operation_fut = async move {
252
                // Extract the operation_id from the action_state.
253
32
                let operation_id = {
254
32
                    let (action_state, _origin_metadata) = action_state_result
255
32
                        .as_state()
256
32
                        .await
257
32
                        .err_tip(|| "Failed to get action_info from as_state_result stream")
?0
;
258
32
                    action_state.client_operation_id.clone()
259
                };
260
261
                // Tell the matching engine that the operation is being assigned to a worker.
262
32
                let assign_result = matching_engine_state_manager
263
32
                    .assign_operation(&operation_id, Ok(&worker_id))
264
32
                    .await
265
32
                    .err_tip(|| "Failed to assign operation in do_try_match");
266
32
                if let Err(err) = assign_result {
  Branch (266:24): [True: 0, False: 0]
  Branch (266:24): [Folded - Ignored]
  Branch (266:24): [True: 0, False: 4]
  Branch (266:24): [True: 1, False: 27]
267
1
                    if err.code == Code::Aborted {
  Branch (267:24): [True: 0, False: 0]
  Branch (267:24): [Folded - Ignored]
  Branch (267:24): [True: 0, False: 0]
  Branch (267:24): [True: 0, False: 1]
268
                        // If the operation was aborted, it means that the operation was
269
                        // cancelled due to another operation being assigned to the worker.
270
0
                        return Ok(());
271
1
                    }
272
                    // Any other error is a real error.
273
1
                    return Err(err);
274
31
                }
275
276
31
                workers
277
31
                    .worker_notify_run_action(worker_id, operation_id, action_info)
278
31
                    .await
279
31
                    .err_tip(|| {
280
1
                        "Failed to run worker_notify_run_action in SimpleScheduler::do_try_match"
281
1
                    })
282
32
            };
283
32
            tokio::pin!(attach_operation_fut);
284
285
32
            let origin_metadata = maybe_origin_metadata.unwrap_or_default();
286
287
32
            let ctx = Context::current_with_baggage(vec![KeyValue::new(
288
                ENDUSER_ID,
289
32
                origin_metadata.identity,
290
            )]);
291
292
32
            info_span!("do_try_match")
293
32
                .in_scope(|| attach_operation_fut)
294
32
                .with_context(ctx)
295
32
                .await
296
50
        }
297
298
111
        let mut result = Ok(());
299
300
111
        let mut stream = self
301
111
            .get_queued_operations()
302
111
            .await
303
111
            .err_tip(|| "Failed to get queued operations in do_try_match")
?0
;
304
305
161
        while let Some(action_state_result) = stream.next().await {
  Branch (305:19): [True: 0, False: 0]
  Branch (305:19): [Folded - Ignored]
  Branch (305:19): [True: 5, False: 17]
  Branch (305:19): [True: 45, False: 94]
306
50
            result = result.merge(
307
50
                match_action_to_worker(
308
50
                    action_state_result.as_ref(),
309
50
                    self.worker_scheduler.as_ref(),
310
50
                    self.matching_engine_state_manager.as_ref(),
311
50
                    self.platform_property_manager.as_ref(),
312
50
                )
313
50
                .await,
314
            );
315
        }
316
111
        result
317
111
    }
318
}
319
320
impl SimpleScheduler {
321
0
    pub fn new<A: AwaitedActionDb>(
322
0
        spec: &SimpleSpec,
323
0
        awaited_action_db: A,
324
0
        task_change_notify: Arc<Notify>,
325
0
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
326
0
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
327
0
        Self::new_with_callback(
328
0
            spec,
329
0
            awaited_action_db,
330
0
            || {
331
                // The cost of running `do_try_match()` is very high, but constant
332
                // in relation to the number of changes that have happened. This
333
                // means that grabbing this lock to process `do_try_match()` should
334
                // always yield to any other tasks that might want the lock. The
335
                // easiest and most fair way to do this is to sleep for a small
336
                // amount of time. Using something like tokio::task::yield_now()
337
                // does not yield as aggressively as we'd like if new futures are
338
                // scheduled within a future.
339
0
                tokio::time::sleep(Duration::from_millis(1))
340
0
            },
341
0
            task_change_notify,
342
            SystemTime::now,
343
0
            maybe_origin_event_tx,
344
        )
345
0
    }
346
347
23
    pub fn new_with_callback<
348
23
        Fut: Future<Output = ()> + Send,
349
23
        F: Fn() -> Fut + Send + Sync + 'static,
350
23
        A: AwaitedActionDb,
351
23
        I: InstantWrapper,
352
23
        NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
353
23
    >(
354
23
        spec: &SimpleSpec,
355
23
        awaited_action_db: A,
356
23
        on_matching_engine_run: F,
357
23
        task_change_notify: Arc<Notify>,
358
23
        now_fn: NowFn,
359
23
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
360
23
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
361
23
        let platform_property_manager = Arc::new(PlatformPropertyManager::new(
362
23
            spec.supported_platform_properties
363
23
                .clone()
364
23
                .unwrap_or_default(),
365
        ));
366
367
23
        let mut worker_timeout_s = spec.worker_timeout_s;
368
23
        if worker_timeout_s == 0 {
  Branch (368:12): [True: 0, False: 0]
  Branch (368:12): [True: 0, False: 0]
  Branch (368:12): [Folded - Ignored]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 0, False: 1]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 0, False: 1]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 0, False: 1]
  Branch (368:12): [True: 0, False: 1]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 0, False: 1]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
  Branch (368:12): [True: 1, False: 0]
369
18
            worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
370
18
        }
371
372
23
        let mut client_action_timeout_s = spec.client_action_timeout_s;
373
23
        if client_action_timeout_s == 0 {
  Branch (373:12): [True: 0, False: 0]
  Branch (373:12): [True: 0, False: 0]
  Branch (373:12): [Folded - Ignored]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 0, False: 1]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
  Branch (373:12): [True: 1, False: 0]
374
22
            client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
375
22
        }
376
        // This matches the value of CLIENT_KEEPALIVE_DURATION which means that
377
        // tasks are going to be dropped all over the place, this isn't a good
378
        // setting.
379
23
        if client_action_timeout_s <= CLIENT_KEEPALIVE_DURATION.as_secs() {
  Branch (379:12): [True: 0, False: 0]
  Branch (379:12): [True: 0, False: 0]
  Branch (379:12): [Folded - Ignored]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
  Branch (379:12): [True: 0, False: 1]
380
0
            error!(
381
                client_action_timeout_s,
382
0
                "Setting client_action_timeout_s to less than the client keep alive interval is going to cause issues, please set above {}.",
383
0
                CLIENT_KEEPALIVE_DURATION.as_secs()
384
            );
385
23
        }
386
387
23
        let mut max_job_retries = spec.max_job_retries;
388
23
        if max_job_retries == 0 {
  Branch (388:12): [True: 0, False: 0]
  Branch (388:12): [True: 0, False: 0]
  Branch (388:12): [Folded - Ignored]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 0, False: 1]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
  Branch (388:12): [True: 1, False: 0]
389
22
            max_job_retries = DEFAULT_MAX_JOB_RETRIES;
390
22
        }
391
392
23
        let worker_change_notify = Arc::new(Notify::new());
393
23
        let state_manager = SimpleSchedulerStateManager::new(
394
23
            max_job_retries,
395
23
            Duration::from_secs(worker_timeout_s),
396
23
            Duration::from_secs(client_action_timeout_s),
397
23
            awaited_action_db,
398
23
            now_fn,
399
        );
400
401
23
        let worker_scheduler = ApiWorkerScheduler::new(
402
23
            state_manager.clone(),
403
23
            platform_property_manager.clone(),
404
23
            spec.allocation_strategy,
405
23
            worker_change_notify.clone(),
406
23
            worker_timeout_s,
407
        );
408
409
23
        let worker_scheduler_clone = worker_scheduler.clone();
410
411
23
        let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
412
23
            let weak_inner = weak_self.clone();
413
23
            let task_worker_matching_spawn =
414
23
                spawn!("simple_scheduler_task_worker_matching", async move {
415
22
                    let mut last_match_successful = true;
416
                    // Break out of the loop only when the inner is dropped.
417
                    loop {
418
124
                        let task_change_fut = task_change_notify.notified();
419
124
                        let worker_change_fut = worker_change_notify.notified();
420
124
                        tokio::pin!(task_change_fut);
421
124
                        tokio::pin!(worker_change_fut);
422
                        // Wait for either of these futures to be ready.
423
124
                        let state_changed =
424
124
                            futures::future::select(task_change_fut, worker_change_fut);
425
124
                        if last_match_successful {
  Branch (425:28): [True: 0, False: 0]
  Branch (425:28): [True: 0, False: 0]
  Branch (425:28): [Folded - Ignored]
  Branch (425:28): [True: 13, False: 0]
  Branch (425:28): [True: 4, False: 0]
  Branch (425:28): [True: 2, False: 0]
  Branch (425:28): [True: 2, False: 0]
  Branch (425:28): [True: 0, False: 0]
  Branch (425:28): [True: 2, False: 0]
  Branch (425:28): [True: 4, False: 0]
  Branch (425:28): [True: 4, False: 0]
  Branch (425:28): [True: 5, False: 0]
  Branch (425:28): [True: 6, False: 0]
  Branch (425:28): [True: 8, False: 0]
  Branch (425:28): [True: 3, False: 0]
  Branch (425:28): [True: 5, False: 0]
  Branch (425:28): [True: 8, False: 0]
  Branch (425:28): [True: 7, False: 0]
  Branch (425:28): [True: 7, False: 0]
  Branch (425:28): [True: 10, False: 0]
  Branch (425:28): [True: 4, False: 0]
  Branch (425:28): [True: 4, False: 0]
  Branch (425:28): [True: 5, False: 0]
  Branch (425:28): [True: 8, False: 0]
  Branch (425:28): [True: 4, False: 1]
  Branch (425:28): [True: 8, False: 0]
426
123
                            let _ = state_changed.await;
427
                        } else {
428
                            // If the last match failed, then run again after a short sleep.
429
                            // This resolves issues where we tried to re-schedule a job to
430
                            // a disconnected worker.  The sleep ensures we don't enter a
431
                            // hard loop if there's something wrong inside do_try_match.
432
1
                            let sleep_fut = tokio::time::sleep(Duration::from_millis(100));
433
1
                            tokio::pin!(sleep_fut);
434
1
                            let _ = futures::future::select(state_changed, sleep_fut).await;
435
                        }
436
102
                        let result = match weak_inner.upgrade() {
437
102
                            Some(scheduler) => scheduler.do_try_match().await,
438
                            // If the inner went away it means the scheduler is shutting
439
                            // down, so we need to resolve our future.
440
0
                            None => return,
441
                        };
442
102
                        last_match_successful = result.is_ok();
443
102
                        if let Err(err) = result {
  Branch (443:32): [True: 0, False: 0]
  Branch (443:32): [True: 0, False: 0]
  Branch (443:32): [Folded - Ignored]
  Branch (443:32): [True: 0, False: 12]
  Branch (443:32): [True: 0, False: 3]
  Branch (443:32): [True: 0, False: 1]
  Branch (443:32): [True: 0, False: 1]
  Branch (443:32): [True: 0, False: 0]
  Branch (443:32): [True: 0, False: 1]
  Branch (443:32): [True: 0, False: 3]
  Branch (443:32): [True: 0, False: 3]
  Branch (443:32): [True: 0, False: 4]
  Branch (443:32): [True: 0, False: 5]
  Branch (443:32): [True: 0, False: 7]
  Branch (443:32): [True: 0, False: 2]
  Branch (443:32): [True: 0, False: 4]
  Branch (443:32): [True: 0, False: 7]
  Branch (443:32): [True: 0, False: 6]
  Branch (443:32): [True: 0, False: 6]
  Branch (443:32): [True: 0, False: 9]
  Branch (443:32): [True: 0, False: 3]
  Branch (443:32): [True: 0, False: 3]
  Branch (443:32): [True: 0, False: 4]
  Branch (443:32): [True: 0, False: 7]
  Branch (443:32): [True: 1, False: 3]
  Branch (443:32): [True: 0, False: 7]
444
1
                            error!(?err, "Error while running do_try_match");
445
101
                        }
446
447
102
                        on_matching_engine_run().await;
448
                    }
449
                    // Unreachable.
450
0
                });
451
23
            Self {
452
23
                matching_engine_state_manager: state_manager.clone(),
453
23
                client_state_manager: state_manager.clone(),
454
23
                worker_scheduler,
455
23
                platform_property_manager,
456
23
                maybe_origin_event_tx,
457
23
                task_worker_matching_spawn,
458
23
            }
459
23
        });
460
23
        (action_scheduler, worker_scheduler_clone)
461
23
    }
462
}
463
464
#[async_trait]
465
impl ClientStateManager for SimpleScheduler {
466
    async fn add_action(
467
        &self,
468
        client_operation_id: OperationId,
469
        action_info: Arc<ActionInfo>,
470
60
    ) -> Result<Box<dyn ActionStateResult>, Error> {
471
30
        self.inner_add_action(client_operation_id, action_info)
472
30
            .await
473
60
    }
474
475
    async fn filter_operations<'a>(
476
        &'a self,
477
        filter: OperationFilter,
478
1.00k
    ) -> Result<ActionStateResultStream<'a>, Error> {
479
504
        self.inner_filter_operations(filter).await
480
1.00k
    }
481
482
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
483
0
        Some(self)
484
0
    }
485
}
486
487
#[async_trait]
488
impl KnownPlatformPropertyProvider for SimpleScheduler {
489
0
    async fn get_known_properties(&self, _instance_name: &str) -> Result<Vec<String>, Error> {
490
0
        Ok(self
491
0
            .worker_scheduler
492
0
            .get_platform_property_manager()
493
0
            .get_known_properties()
494
0
            .keys()
495
0
            .cloned()
496
0
            .collect())
497
0
    }
498
}
499
500
#[async_trait]
501
impl WorkerScheduler for SimpleScheduler {
502
0
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
503
0
        self.worker_scheduler.get_platform_property_manager()
504
0
    }
505
506
58
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
507
29
        self.worker_scheduler.add_worker(worker).await
508
58
    }
509
510
    async fn update_action(
511
        &self,
512
        worker_id: &WorkerId,
513
        operation_id: &OperationId,
514
        update: UpdateOperationType,
515
18
    ) -> Result<(), Error> {
516
9
        self.worker_scheduler
517
9
            .update_action(worker_id, operation_id, update)
518
9
            .await
519
18
    }
520
521
    async fn notify_complete(
522
        &self,
523
        worker_id: &WorkerId,
524
        operation_id: &OperationId,
525
0
    ) -> Result<(), Error> {
526
0
        self.worker_scheduler
527
0
            .notify_complete(worker_id, operation_id)
528
0
            .await
529
0
    }
530
531
    async fn worker_keep_alive_received(
532
        &self,
533
        worker_id: &WorkerId,
534
        timestamp: WorkerTimestamp,
535
2
    ) -> Result<(), Error> {
536
1
        self.worker_scheduler
537
1
            .worker_keep_alive_received(worker_id, timestamp)
538
1
            .await
539
2
    }
540
541
10
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
542
5
        self.worker_scheduler.remove_worker(worker_id).await
543
10
    }
544
545
0
    async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
546
0
        self.worker_scheduler.shutdown(shutdown_guard).await;
547
0
    }
548
549
2
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
550
1
        self.worker_scheduler
551
1
            .remove_timedout_workers(now_timestamp)
552
1
            .await
553
2
    }
554
555
4
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
556
2
        self.worker_scheduler
557
2
            .set_drain_worker(worker_id, is_draining)
558
2
            .await
559
4
    }
560
}
561
562
impl RootMetricsComponent for SimpleScheduler {}