Coverage Report

Created: 2026-06-24 11:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{BTreeSet, HashMap};
16
use std::sync::Arc;
17
use std::time::{Instant, SystemTime};
18
19
use async_trait::async_trait;
20
use futures::{Future, StreamExt, future};
21
use nativelink_config::schedulers::SimpleSpec;
22
use nativelink_error::{Code, Error, ResultExt};
23
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
24
use nativelink_proto::com::github::trace_machina::nativelink::events::{
25
    Event, OriginEvent, RequestEvent, event, request_event,
26
};
27
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::StartExecute;
28
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
29
use nativelink_util::instant_wrapper::InstantWrapper;
30
use nativelink_util::operation_state_manager::{
31
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
32
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
33
};
34
use nativelink_util::origin_event::{OriginMetadata, get_node_id};
35
use nativelink_util::shutdown_guard::ShutdownGuard;
36
use nativelink_util::spawn;
37
use nativelink_util::task::JoinHandleDropGuard;
38
use opentelemetry::KeyValue;
39
use opentelemetry::baggage::BaggageExt;
40
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
41
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
42
use tokio::sync::{Notify, mpsc};
43
use tokio::time::Duration;
44
use tracing::{debug, error, info, info_span, warn};
45
use uuid::Uuid;
46
47
use crate::api_worker_scheduler::ApiWorkerScheduler;
48
use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
49
use crate::known_platform_property_provider::KnownPlatformPropertyProvider;
50
use crate::platform_property_manager::PlatformPropertyManager;
51
use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
52
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
53
use crate::worker_registry::WorkerRegistry;
54
use crate::worker_scheduler::WorkerScheduler;
55
56
/// Default timeout for workers in seconds.
/// Used by `SimpleScheduler::new_with_callback` when the configured
/// `worker_timeout_s` is 0.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;

/// Mark operations as completed with error if no client has updated them
/// within this duration (in seconds).
/// Used by `SimpleScheduler::new_with_callback` when the configured
/// `client_action_timeout_s` is 0.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_CLIENT_ACTION_TIMEOUT_S: u64 = 60;

/// Default times a job can retry before failing.
/// Used by `SimpleScheduler::new_with_callback` when the configured
/// `max_job_retries` is 0.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;
68
69
/// Adapter around an upstream [`ActionStateResult`] that reports the
/// client-supplied operation id instead of the id used downstream.
struct SimpleSchedulerActionStateResult {
    /// Operation id the client knows this action by; it overwrites
    /// `client_operation_id` on every `ActionState` handed back to the client.
    client_operation_id: OperationId,
    /// The wrapped result returned by the client state manager's `add_action`.
    action_state_result: Box<dyn ActionStateResult>,
}
73
74
impl SimpleSchedulerActionStateResult {
75
33
    fn new(
76
33
        client_operation_id: OperationId,
77
33
        action_state_result: Box<dyn ActionStateResult>,
78
33
    ) -> Self {
79
33
        Self {
80
33
            client_operation_id,
81
33
            action_state_result,
82
33
        }
83
33
    }
84
}
85
86
#[async_trait]
87
impl ActionStateResult for SimpleSchedulerActionStateResult {
88
4
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
89
        let (mut action_state, origin_metadata) = self
90
            .action_state_result
91
            .as_state()
92
            .await
93
            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
94
        // We need to ensure the client is not aware of the downstream
95
        // operation id, so override it before it goes out.
96
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
97
        Ok((action_state, origin_metadata))
98
4
    }
99
100
51
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
101
        let (mut action_state, origin_metadata) = self
102
            .action_state_result
103
            .changed()
104
            .await
105
            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
106
        // We need to ensure the client is not aware of the downstream
107
        // operation id, so override it before it goes out.
108
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
109
        Ok((action_state, origin_metadata))
110
51
    }
111
112
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
113
        self.action_state_result
114
            .as_action_info()
115
            .await
116
            .err_tip(|| "In SimpleSchedulerActionStateResult")
117
0
    }
118
}
119
120
/// Engine used to manage the queued/running tasks and relationship with
/// the worker nodes. All state on how the workers and actions are interacting
/// should be held in this struct.
#[derive(MetricsComponent)]
pub struct SimpleScheduler {
    /// Manager for matching engine side of the state manager.
    /// `new_with_callback` points this and `client_state_manager` at the
    /// same `SimpleSchedulerStateManager` instance.
    #[metric(group = "matching_engine_state_manager")]
    matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,

    /// Manager for client state of this scheduler.
    #[metric(group = "client_state_manager")]
    client_state_manager: Arc<dyn ClientStateManager>,

    /// Manager for platform properties of this scheduler; used to build each
    /// queued action's platform properties before matching it to a worker.
    #[metric(group = "platform_properties")]
    platform_property_manager: Arc<PlatformPropertyManager>,

    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
    /// order based on the allocation strategy.
    #[metric(group = "worker_scheduler")]
    worker_scheduler: Arc<ApiWorkerScheduler>,

    /// Optional channel on which origin events (e.g. the scheduler
    /// start-execute event) are published. `None` disables publishing.
    maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,

    /// Background task that tries to match actions to workers. If this struct
    /// is dropped the spawn will be cancelled as well.
    task_worker_matching_spawn: JoinHandleDropGuard<()>,

    /// How often the matching loop emits verbose worker-matching logs
    /// (e.g. "worker busy", "can't find any worker") plus a summary of the
    /// oldest actions per stage. `None` disables it; this is quite noisy, so
    /// it is rate limited by this interval.
    worker_match_logging_interval: Option<Duration>,
}
154
155
impl core::fmt::Debug for SimpleScheduler {
156
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
157
0
        f.debug_struct("SimpleScheduler")
158
0
            .field("platform_property_manager", &self.platform_property_manager)
159
0
            .field("worker_scheduler", &self.worker_scheduler)
160
0
            .field("maybe_origin_event_tx", &self.maybe_origin_event_tx)
161
0
            .field(
162
0
                "task_worker_matching_spawn",
163
0
                &self.task_worker_matching_spawn,
164
0
            )
165
0
            .finish_non_exhaustive()
166
0
    }
167
}
168
169
impl SimpleScheduler {
170
1
    fn origin_event_id(event: &Event) -> String {
171
1
        Uuid::now_v6(&get_node_id(Some(event)))
172
1
            .hyphenated()
173
1
            .to_string()
174
1
    }
175
176
1
    fn scheduler_start_execute_event(
177
1
        worker_id: &WorkerId,
178
1
        operation_id: &OperationId,
179
1
        action_info: &ActionInfoWithProps,
180
1
    ) -> Event {
181
1
        let start_execute = StartExecute {
182
1
            execute_request: Some(action_info.inner.as_ref().into()),
183
1
            operation_id: operation_id.to_string(),
184
1
            queued_timestamp: Some(action_info.inner.insert_timestamp.into()),
185
1
            platform: Some((&action_info.platform_properties).into()),
186
1
            worker_id: worker_id.to_string(),
187
1
        };
188
1
        Event {
189
1
            event: Some(event::Event::Request(RequestEvent {
190
1
                event: Some(request_event::Event::SchedulerStartExecute(start_execute)),
191
1
            })),
192
1
        }
193
1
    }
194
195
1
    async fn publish_scheduler_start_execute(
196
1
        maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
197
1
        origin_metadata: &OriginMetadata,
198
1
        event_id: String,
199
1
        event: Event,
200
1
    ) {
201
1
        let Some(origin_event_tx) = maybe_origin_event_tx else {
202
0
            return;
203
        };
204
205
1
        let origin_event = OriginEvent {
206
1
            version: 0,
207
1
            event_id,
208
1
            parent_event_id: String::new(),
209
1
            bazel_request_metadata: origin_metadata.bazel_metadata.clone(),
210
1
            identity: origin_metadata.identity.clone(),
211
1
            event: Some(event),
212
1
        };
213
214
        // Awaited send (not try_send): backpressure rather than drop, so the
215
        // start-execute event that later resource-usage events reference as
216
        // their parent isn't silently lost when the queue is full.
217
1
        if let Err(
err0
) = origin_event_tx.send(origin_event).await {
218
0
            warn!(
219
                ?err,
220
                "Failed to publish scheduler start execute origin event"
221
            );
222
1
        }
223
1
    }
224
225
    /// Attempts to find a worker to execute an action and begins executing it.
226
    /// If an action is already running that is cacheable it may merge this
227
    /// action with the results and state changes of the already running
228
    /// action. If the task cannot be executed immediately it will be queued
229
    /// for execution based on priority and other metrics.
230
    /// All further updates to the action will be provided through the returned
231
    /// value.
232
33
    async fn inner_add_action(
233
33
        &self,
234
33
        client_operation_id: OperationId,
235
33
        action_info: Arc<ActionInfo>,
236
33
    ) -> Result<Box<dyn ActionStateResult>, Error> {
237
33
        let action_state_result = self
238
33
            .client_state_manager
239
33
            .add_action(client_operation_id.clone(), action_info)
240
33
            .await
241
33
            .err_tip(|| "In SimpleScheduler::add_action")
?0
;
242
33
        Ok(Box::new(SimpleSchedulerActionStateResult::new(
243
33
            client_operation_id.clone(),
244
33
            action_state_result,
245
33
        )))
246
33
    }
247
248
504
    async fn inner_filter_operations(
249
504
        &self,
250
504
        filter: OperationFilter,
251
504
    ) -> Result<ActionStateResultStream<'_>, Error> {
252
504
        self.client_state_manager
253
504
            .filter_operations(filter)
254
504
            .await
255
504
            .err_tip(|| "In SimpleScheduler::find_by_client_operation_id getting filter result")
256
504
    }
257
258
124
    async fn get_queued_operations(&self) -> Result<ActionStateResultStream<'_>, Error> {
259
124
        let filter = OperationFilter {
260
124
            stages: OperationStageFlags::Queued,
261
124
            order_by_priority_direction: Some(OrderDirection::Desc),
262
124
            ..Default::default()
263
124
        };
264
124
        self.matching_engine_state_manager
265
124
            .filter_operations(filter)
266
124
            .await
267
124
            .err_tip(|| "In SimpleScheduler::get_queued_operations getting filter result")
268
124
    }
269
270
11
    pub async fn do_try_match_for_test(&self) -> Result<(), Error> {
271
11
        self.do_try_match(true).await
272
11
    }
273
274
    // TODO(palfrey) This is an O(n*m) (aka n^2) algorithm. In theory we
275
    // can create a map of capabilities of each worker and then try and match
276
    // the actions to the worker using the map lookup (ie. map reduce).
277
124
    async fn do_try_match(&self, full_worker_logging: bool) -> Result<(), Error> {
278
58
        async fn match_action_to_worker(
279
58
            action_state_result: &dyn ActionStateResult,
280
58
            workers: &ApiWorkerScheduler,
281
58
            matching_engine_state_manager: &dyn MatchingEngineStateManager,
282
58
            platform_property_manager: &PlatformPropertyManager,
283
58
            maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
284
58
            full_worker_logging: bool,
285
58
        ) -> Result<(), Error> {
286
58
            let (action_info, maybe_origin_metadata) =
287
58
                action_state_result
288
58
                    .as_action_info()
289
58
                    .await
290
58
                    .err_tip(|| "Failed to get action_info from as_action_info_result stream")
?0
;
291
292
            // TODO(palfrey) We should not compute this every time and instead store
293
            // it with the ActionInfo when we receive it.
294
58
            let platform_properties = platform_property_manager
295
58
                .make_platform_properties(action_info.platform_properties.clone())
296
58
                .err_tip(
297
                    || "Failed to make platform properties in SimpleScheduler::do_try_match",
298
0
                )?;
299
300
58
            let origin_metadata = maybe_origin_metadata.unwrap_or_default();
301
58
            let action_info = ActionInfoWithProps {
302
58
                inner: action_info,
303
58
                platform_properties,
304
58
                origin_metadata: origin_metadata.clone(),
305
58
                scheduler_start_execute_event_id: None,
306
58
            };
307
308
            // Try to find a worker for the action.
309
39
            let worker_id = {
310
58
                match workers
311
58
                    .find_worker_for_action(&action_info.platform_properties, full_worker_logging)
312
58
                    .await
313
                {
314
39
                    Some(worker_id) => worker_id,
315
                    // If we could not find a worker for the action,
316
                    // we have nothing to do.
317
19
                    None => return Ok(()),
318
                }
319
            };
320
321
39
            let event_origin_metadata = origin_metadata.clone();
322
39
            let attach_operation_fut = async move {
323
                // Extract the operation_id from the action_state.
324
39
                let operation_id = {
325
39
                    let (action_state, _origin_metadata) = action_state_result
326
39
                        .as_state()
327
39
                        .await
328
39
                        .err_tip(|| "Failed to get action_info from as_state_result stream")
?0
;
329
39
                    action_state.client_operation_id.clone()
330
                };
331
332
                // Tell the matching engine that the operation is being assigned to a worker.
333
39
                let assign_result = matching_engine_state_manager
334
39
                    .assign_operation(&operation_id, Ok(&worker_id))
335
39
                    .await
336
39
                    .err_tip(|| "Failed to assign operation in do_try_match");
337
39
                if let Err(
err5
) = assign_result {
338
5
                    if err.code == Code::Aborted {
339
                        // If the operation was aborted, it means that the operation was
340
                        // cancelled due to another operation being assigned to the worker.
341
4
                        return Ok(());
342
1
                    }
343
                    // Any other error is a real error.
344
1
                    return Err(err);
345
34
                }
346
347
34
                let mut action_info = action_info;
348
34
                let scheduler_start_execute_event = maybe_origin_event_tx.map(|_| 
{1
349
1
                    let event = SimpleScheduler::scheduler_start_execute_event(
350
1
                        &worker_id,
351
1
                        &operation_id,
352
1
                        &action_info,
353
                    );
354
1
                    let event_id = SimpleScheduler::origin_event_id(&event);
355
1
                    action_info.scheduler_start_execute_event_id = Some(event_id.clone());
356
1
                    (event_id, event)
357
1
                });
358
359
34
                debug!(%worker_id, %operation_id, ?action_info, "Notifying worker of operation");
360
34
                workers
361
34
                    .worker_notify_run_action(worker_id, operation_id, action_info)
362
34
                    .await
363
34
                    .err_tip(|| 
{1
364
1
                        "Failed to run worker_notify_run_action in SimpleScheduler::do_try_match"
365
1
                    })?;
366
367
33
                if let Some((
event_id1
,
event1
)) = scheduler_start_execute_event {
368
1
                    SimpleScheduler::publish_scheduler_start_execute(
369
1
                        maybe_origin_event_tx,
370
1
                        &event_origin_metadata,
371
1
                        event_id,
372
1
                        event,
373
1
                    )
374
1
                    .await;
375
32
                }
376
377
33
                Ok(())
378
39
            };
379
39
            tokio::pin!(attach_operation_fut);
380
381
39
            let ctx = Context::current_with_baggage(vec![KeyValue::new(
382
                ENDUSER_ID,
383
39
                origin_metadata.identity,
384
            )]);
385
386
39
            info_span!("do_try_match")
387
39
                .in_scope(|| attach_operation_fut)
388
39
                .with_context(ctx)
389
39
                .await
390
58
        }
391
392
124
        let mut result = Ok(());
393
394
124
        let start = Instant::now();
395
396
124
        let mut stream = self
397
124
            .get_queued_operations()
398
124
            .await
399
124
            .err_tip(|| "Failed to get queued operations in do_try_match")
?0
;
400
401
124
        let query_elapsed = start.elapsed();
402
124
        if query_elapsed > Duration::from_secs(1) {
403
0
            warn!(
404
0
                elapsed_ms = query_elapsed.as_millis(),
405
                "Slow get_queued_operations query"
406
            );
407
124
        }
408
409
182
        while let Some(
action_state_result58
) = stream.next().await {
410
58
            result = result.merge(
411
58
                match_action_to_worker(
412
58
                    action_state_result.as_ref(),
413
58
                    self.worker_scheduler.as_ref(),
414
58
                    self.matching_engine_state_manager.as_ref(),
415
58
                    self.platform_property_manager.as_ref(),
416
58
                    self.maybe_origin_event_tx.as_ref(),
417
58
                    full_worker_logging,
418
58
                )
419
58
                .await,
420
            );
421
        }
422
423
124
        let total_elapsed = start.elapsed();
424
124
        if total_elapsed > Duration::from_secs(5) {
425
0
            warn!(
426
0
                total_ms = total_elapsed.as_millis(),
427
0
                query_ms = query_elapsed.as_millis(),
428
                "Slow do_try_match cycle"
429
            );
430
124
        }
431
432
124
        result
433
124
    }
434
}
435
436
impl SimpleScheduler {
437
1
    pub fn new<A: AwaitedActionDb>(
438
1
        spec: &SimpleSpec,
439
1
        awaited_action_db: A,
440
1
        task_change_notify: Arc<Notify>,
441
1
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
442
1
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
443
1
        Self::new_with_callback(
444
1
            spec,
445
1
            awaited_action_db,
446
0
            || {
447
                // The cost of running `do_try_match()` is very high, but constant
448
                // in relation to the number of changes that have happened. This
449
                // means that grabbing this lock to process `do_try_match()` should
450
                // always yield to any other tasks that might want the lock. The
451
                // easiest and most fair way to do this is to sleep for a small
452
                // amount of time. Using something like tokio::task::yield_now()
453
                // does not yield as aggressively as we'd like if new futures are
454
                // scheduled within a future.
455
0
                tokio::time::sleep(Duration::from_millis(1))
456
0
            },
457
1
            task_change_notify,
458
            SystemTime::now,
459
1
            maybe_origin_event_tx,
460
        )
461
1
    }
462
463
27
    pub fn new_with_callback<
464
27
        Fut: Future<Output = ()> + Send,
465
27
        F: Fn() -> Fut + Send + Sync + 'static,
466
27
        A: AwaitedActionDb,
467
27
        I: InstantWrapper,
468
27
        NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
469
27
    >(
470
27
        spec: &SimpleSpec,
471
27
        awaited_action_db: A,
472
27
        on_matching_engine_run: F,
473
27
        task_change_notify: Arc<Notify>,
474
27
        now_fn: NowFn,
475
27
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
476
27
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
477
27
        let platform_property_manager = Arc::new(PlatformPropertyManager::new(
478
27
            spec.supported_platform_properties
479
27
                .clone()
480
27
                .unwrap_or_default(),
481
        ));
482
483
27
        let mut worker_timeout_s = spec.worker_timeout_s;
484
27
        if worker_timeout_s == 0 {
485
22
            worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
486
22
        
}5
487
488
27
        let mut client_action_timeout_s = spec.client_action_timeout_s;
489
27
        if client_action_timeout_s == 0 {
490
26
            client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
491
26
        
}1
492
        // This matches the value of CLIENT_KEEPALIVE_DURATION which means that
493
        // tasks are going to be dropped all over the place, this isn't a good
494
        // setting.
495
27
        if client_action_timeout_s <= CLIENT_KEEPALIVE_DURATION.as_secs() {
496
0
            error!(
497
                client_action_timeout_s,
498
                "Setting client_action_timeout_s to less than the client keep alive interval is going to cause issues, please set above {}.",
499
0
                CLIENT_KEEPALIVE_DURATION.as_secs()
500
            );
501
27
        }
502
503
27
        let mut max_job_retries = spec.max_job_retries;
504
27
        if max_job_retries == 0 {
505
25
            max_job_retries = DEFAULT_MAX_JOB_RETRIES;
506
25
        
}2
507
508
27
        let worker_change_notify = Arc::new(Notify::new());
509
510
        // Create shared worker registry for single heartbeat per worker.
511
27
        let worker_registry = Arc::new(WorkerRegistry::new());
512
513
27
        let state_manager = SimpleSchedulerStateManager::new(
514
27
            max_job_retries,
515
27
            Duration::from_secs(worker_timeout_s),
516
27
            Duration::from_secs(client_action_timeout_s),
517
27
            Duration::from_secs(spec.max_action_executing_timeout_s),
518
27
            awaited_action_db,
519
27
            now_fn,
520
27
            Some(worker_registry.clone()),
521
        );
522
523
27
        let worker_scheduler = ApiWorkerScheduler::new(
524
27
            state_manager.clone(),
525
27
            platform_property_manager.clone(),
526
27
            spec.allocation_strategy,
527
27
            worker_change_notify.clone(),
528
27
            worker_timeout_s,
529
27
            worker_registry,
530
27
            maybe_origin_event_tx.clone(),
531
        );
532
533
27
        let worker_scheduler_clone = worker_scheduler.clone();
534
535
27
        let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
536
27
            let weak_inner = weak_self.clone();
537
27
            let task_worker_matching_spawn =
538
27
                spawn!("simple_scheduler_task_worker_matching", async move 
{25
539
25
                    let mut last_match_successful = true;
540
25
                    let mut worker_match_logging_last: Option<Instant> = None;
541
                    // Break out of the loop only when the inner is dropped.
542
                    loop {
543
138
                        let task_change_fut = task_change_notify.notified();
544
138
                        let worker_change_fut = worker_change_notify.notified();
545
138
                        tokio::pin!(task_change_fut);
546
138
                        tokio::pin!(worker_change_fut);
547
                        // Wait for either of these futures to be ready.
548
138
                        let state_changed = future::select(task_change_fut, worker_change_fut);
549
138
                        if last_match_successful {
550
137
                            let _ = state_changed.await;
551
                        } else {
552
                            // If the last match failed, then run again after a short sleep.
553
                            // This resolves issues where we tried to re-schedule a job to
554
                            // a disconnected worker.  The sleep ensures we don't enter a
555
                            // hard loop if there's something wrong inside do_try_match.
556
1
                            let sleep_fut = tokio::time::sleep(Duration::from_millis(100));
557
1
                            tokio::pin!(sleep_fut);
558
1
                            let _ = future::select(state_changed, sleep_fut).await;
559
                        }
560
561
113
                        let result = match weak_inner.upgrade() {
562
113
                            Some(scheduler) => {
563
113
                                let now = Instant::now();
564
113
                                let full_worker_logging = {
565
113
                                    match scheduler.worker_match_logging_interval {
566
108
                                        None => false,
567
5
                                        Some(duration) => match worker_match_logging_last {
568
2
                                            None => true,
569
3
                                            Some(when) => now.duration_since(when) >= duration,
570
                                        },
571
                                    }
572
                                };
573
574
113
                                let res = scheduler.do_try_match(full_worker_logging).await;
575
113
                                if full_worker_logging {
576
2
                                    let operations_stream = scheduler
577
2
                                        .matching_engine_state_manager
578
2
                                        .filter_operations(OperationFilter::default())
579
2
                                        .await
580
2
                                        .err_tip(|| "In action_scheduler getting filter result");
581
582
2
                                    let mut oldest_actions_in_state: HashMap<
583
2
                                        String,
584
2
                                        BTreeSet<Arc<ActionState>>,
585
2
                                    > = HashMap::new();
586
2
                                    let max_items = 5;
587
588
2
                                    match operations_stream {
589
2
                                        Ok(stream) => {
590
2
                                            let actions = stream
591
2
                                                .filter_map(|item| async move 
{0
592
0
                                                    match item.as_ref().as_state().await {
593
0
                                                        Ok((action_state, _origin_metadata)) => {
594
0
                                                            Some(action_state)
595
                                                        }
596
0
                                                        Err(e) => {
597
0
                                                            error!(
598
                                                                ?e,
599
                                                                "Failed to get action state!"
600
                                                            );
601
0
                                                            None
602
                                                        }
603
                                                    }
604
0
                                                })
605
2
                                                .collect::<Vec<_>>()
606
2
                                                .await;
607
2
                                            for 
action_state0
in &actions {
608
0
                                                let name = action_state.stage.name();
609
0
                                                if let Some(values) =
610
0
                                                    oldest_actions_in_state.get_mut(&name)
611
                                                {
612
0
                                                    values.insert(action_state.clone());
613
0
                                                    if values.len() > max_items {
614
0
                                                        values.pop_first();
615
0
                                                    }
616
0
                                                } else {
617
0
                                                    let mut values = BTreeSet::new();
618
0
                                                    values.insert(action_state.clone());
619
0
                                                    oldest_actions_in_state.insert(name, values);
620
0
                                                }
621
                                            }
622
                                        }
623
0
                                        Err(e) => {
624
0
                                            error!(?e, "Failed to get operations list!");
625
                                        }
626
                                    }
627
628
2
                                    for 
value0
in oldest_actions_in_state.values() {
629
0
                                        let mut items = vec![];
630
0
                                        for item in value {
631
0
                                            items.push(item.to_string());
632
0
                                        }
633
0
                                        info!(?items, "Oldest actions in state");
634
                                    }
635
636
2
                                    worker_match_logging_last.replace(now);
637
111
                                }
638
113
                                res
639
                            }
640
                            // If the inner went away it means the scheduler is shutting
641
                            // down, so we need to resolve our future.
642
0
                            None => return,
643
                        };
644
113
                        last_match_successful = result.is_ok();
645
113
                        if let Err(
err1
) = result {
646
1
                            error!(?err, "Error while running do_try_match");
647
112
                        }
648
649
113
                        on_matching_engine_run().await;
650
                    }
651
                    // Unreachable.
652
0
                });
653
654
27
            let worker_match_logging_interval = match spec.worker_match_logging_interval_s {
655
                // -1 or 0 means disabled (0 used to cause expensive logging on every call)
656
24
                -1 | 0 => None,
657
3
                signed_secs => {
658
3
                    if let Ok(
secs2
) = TryInto::<u64>::try_into(signed_secs) {
659
2
                        Some(Duration::from_secs(secs))
660
                    } else {
661
1
                        error!(
662
                            worker_match_logging_interval_s = spec.worker_match_logging_interval_s,
663
                            "Valid values for worker_match_logging_interval_s are -1, 0, or a positive integer, setting to disabled",
664
                        );
665
1
                        None
666
                    }
667
                }
668
            };
669
27
            Self {
670
27
                matching_engine_state_manager: state_manager.clone(),
671
27
                client_state_manager: state_manager.clone(),
672
27
                worker_scheduler,
673
27
                platform_property_manager,
674
27
                maybe_origin_event_tx,
675
27
                task_worker_matching_spawn,
676
27
                worker_match_logging_interval,
677
27
            }
678
27
        });
679
27
        (action_scheduler, worker_scheduler_clone)
680
27
    }
681
}
682
683
#[async_trait]
684
impl ClientStateManager for SimpleScheduler {
685
    async fn add_action(
686
        &self,
687
        client_operation_id: OperationId,
688
        action_info: Arc<ActionInfo>,
689
33
    ) -> Result<Box<dyn ActionStateResult>, Error> {
690
        self.inner_add_action(client_operation_id, action_info)
691
            .await
692
33
    }
693
694
    async fn filter_operations<'a>(
695
        &'a self,
696
        filter: OperationFilter,
697
504
    ) -> Result<ActionStateResultStream<'a>, Error> {
698
        self.inner_filter_operations(filter).await
699
504
    }
700
}
701
702
#[async_trait]
703
impl KnownPlatformPropertyProvider for SimpleScheduler {
704
0
    async fn get_known_properties(&self, _instance_name: &str) -> Result<Vec<String>, Error> {
705
        Ok(self
706
            .worker_scheduler
707
            .get_platform_property_manager()
708
            .get_known_properties()
709
            .keys()
710
            .cloned()
711
            .collect())
712
0
    }
713
}
714
715
#[async_trait]
716
impl WorkerScheduler for SimpleScheduler {
717
0
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
718
0
        self.worker_scheduler.get_platform_property_manager()
719
0
    }
720
721
33
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
722
        self.worker_scheduler.add_worker(worker).await
723
33
    }
724
725
    async fn update_action(
726
        &self,
727
        worker_id: &WorkerId,
728
        operation_id: &OperationId,
729
        update: UpdateOperationType,
730
11
    ) -> Result<(), Error> {
731
        self.worker_scheduler
732
            .update_action(worker_id, operation_id, update)
733
            .await
734
11
    }
735
736
    async fn worker_keep_alive_received(
737
        &self,
738
        worker_id: &WorkerId,
739
        timestamp: WorkerTimestamp,
740
1
    ) -> Result<(), Error> {
741
        self.worker_scheduler
742
            .worker_keep_alive_received(worker_id, timestamp)
743
            .await
744
1
    }
745
746
5
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
747
        self.worker_scheduler.remove_worker(worker_id).await
748
5
    }
749
750
0
    async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
751
        self.worker_scheduler.shutdown(shutdown_guard).await;
752
0
    }
753
754
1
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
755
        self.worker_scheduler
756
            .remove_timedout_workers(now_timestamp)
757
            .await
758
1
    }
759
760
2
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
761
        self.worker_scheduler
762
            .set_drain_worker(worker_id, is_draining)
763
            .await
764
2
    }
765
}
766
767
// Opts `SimpleScheduler` into the `RootMetricsComponent` trait from
// `nativelink_metric`. The body is empty, so no trait items are overridden
// here; only the trait's provided items (if any) apply.
impl RootMetricsComponent for SimpleScheduler {}