Coverage Report

Created: 2025-12-04 21:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{BTreeSet, HashMap};
16
use std::sync::Arc;
17
use std::time::{Instant, SystemTime};
18
19
use async_trait::async_trait;
20
use futures::{Future, StreamExt, future};
21
use nativelink_config::schedulers::SimpleSpec;
22
use nativelink_error::{Code, Error, ResultExt};
23
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
24
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
25
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
26
use nativelink_util::instant_wrapper::InstantWrapper;
27
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
28
use nativelink_util::operation_state_manager::{
29
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
30
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
31
};
32
use nativelink_util::origin_event::OriginMetadata;
33
use nativelink_util::shutdown_guard::ShutdownGuard;
34
use nativelink_util::spawn;
35
use nativelink_util::task::JoinHandleDropGuard;
36
use opentelemetry::KeyValue;
37
use opentelemetry::baggage::BaggageExt;
38
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
39
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
40
use tokio::sync::{Notify, mpsc};
41
use tokio::time::Duration;
42
use tracing::{error, info, info_span};
43
44
use crate::api_worker_scheduler::ApiWorkerScheduler;
45
use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
46
use crate::platform_property_manager::PlatformPropertyManager;
47
use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
48
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
49
use crate::worker_scheduler::WorkerScheduler;
50
51
/// Default timeout for workers in seconds.
52
/// If this changes, remember to change the documentation in the config.
53
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;
54
55
/// Mark operations as completed with error if no client has updated them
56
/// within this duration.
57
/// If this changes, remember to change the documentation in the config.
58
const DEFAULT_CLIENT_ACTION_TIMEOUT_S: u64 = 60;
59
60
/// Default times a job can retry before failing.
61
/// If this changes, remember to change the documentation in the config.
62
const DEFAULT_MAX_JOB_RETRIES: usize = 3;
63
64
struct SimpleSchedulerActionStateResult {
65
    client_operation_id: OperationId,
66
    action_state_result: Box<dyn ActionStateResult>,
67
}
68
69
impl SimpleSchedulerActionStateResult {
70
30
    fn new(
71
30
        client_operation_id: OperationId,
72
30
        action_state_result: Box<dyn ActionStateResult>,
73
30
    ) -> Self {
74
30
        Self {
75
30
            client_operation_id,
76
30
            action_state_result,
77
30
        }
78
30
    }
79
}
80
81
#[async_trait]
82
impl ActionStateResult for SimpleSchedulerActionStateResult {
83
4
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
84
        let (mut action_state, origin_metadata) = self
85
            .action_state_result
86
            .as_state()
87
            .await
88
            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
89
        // We need to ensure the client is not aware of the downstream
90
        // operation id, so override it before it goes out.
91
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
92
        Ok((action_state, origin_metadata))
93
4
    }
94
95
46
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
96
        let (mut action_state, origin_metadata) = self
97
            .action_state_result
98
            .changed()
99
            .await
100
            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
101
        // We need to ensure the client is not aware of the downstream
102
        // operation id, so override it before it goes out.
103
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
104
        Ok((action_state, origin_metadata))
105
46
    }
106
107
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
108
        self.action_state_result
109
            .as_action_info()
110
            .await
111
            .err_tip(|| "In SimpleSchedulerActionStateResult")
112
0
    }
113
}
114
115
/// Engine used to manage the queued/running tasks and relationship with
116
/// the worker nodes. All state on how the workers and actions are interacting
117
/// should be held in this struct.
118
#[derive(MetricsComponent)]
119
pub struct SimpleScheduler {
120
    /// Manager for matching engine side of the state manager.
121
    #[metric(group = "matching_engine_state_manager")]
122
    matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,
123
124
    /// Manager for client state of this scheduler.
125
    #[metric(group = "client_state_manager")]
126
    client_state_manager: Arc<dyn ClientStateManager>,
127
128
    /// Manager for platform of this scheduler.
129
    #[metric(group = "platform_properties")]
130
    platform_property_manager: Arc<PlatformPropertyManager>,
131
132
    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
133
    /// order based on the allocation strategy.
134
    #[metric(group = "worker_scheduler")]
135
    worker_scheduler: Arc<ApiWorkerScheduler>,
136
137
    /// The sender to send origin events to the origin events.
138
    maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
139
140
    /// Background task that tries to match actions to workers. If this struct
141
    /// is dropped the spawn will be cancelled as well.
142
    task_worker_matching_spawn: JoinHandleDropGuard<()>,
143
144
    /// Every duration, do logging of worker matching
145
    /// e.g. "worker busy", "can't find any worker"
146
    /// Set to None to disable. This is quite noisy, so we limit it
147
    worker_match_logging_interval: Option<Duration>,
148
}
149
150
impl core::fmt::Debug for SimpleScheduler {
151
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
152
0
        f.debug_struct("SimpleScheduler")
153
0
            .field("platform_property_manager", &self.platform_property_manager)
154
0
            .field("worker_scheduler", &self.worker_scheduler)
155
0
            .field("maybe_origin_event_tx", &self.maybe_origin_event_tx)
156
0
            .field(
157
0
                "task_worker_matching_spawn",
158
0
                &self.task_worker_matching_spawn,
159
0
            )
160
0
            .finish_non_exhaustive()
161
0
    }
162
}
163
164
impl SimpleScheduler {
165
    /// Attempts to find a worker to execute an action and begins executing it.
166
    /// If an action is already running that is cacheable it may merge this
167
    /// action with the results and state changes of the already running
168
    /// action. If the task cannot be executed immediately it will be queued
169
    /// for execution based on priority and other metrics.
170
    /// All further updates to the action will be provided through the returned
171
    /// value.
172
30
    async fn inner_add_action(
173
30
        &self,
174
30
        client_operation_id: OperationId,
175
30
        action_info: Arc<ActionInfo>,
176
30
    ) -> Result<Box<dyn ActionStateResult>, Error> {
177
30
        let action_state_result = self
178
30
            .client_state_manager
179
30
            .add_action(client_operation_id.clone(), action_info)
180
30
            .await
181
30
            .err_tip(|| "In SimpleScheduler::add_action")
?0
;
182
30
        Ok(Box::new(SimpleSchedulerActionStateResult::new(
183
30
            client_operation_id.clone(),
184
30
            action_state_result,
185
30
        )))
186
30
    }
187
188
504
    async fn inner_filter_operations(
189
504
        &self,
190
504
        filter: OperationFilter,
191
504
    ) -> Result<ActionStateResultStream<'_>, Error> {
192
504
        self.client_state_manager
193
504
            .filter_operations(filter)
194
504
            .await
195
504
            .err_tip(|| "In SimpleScheduler::find_by_client_operation_id getting filter result")
196
504
    }
197
198
111
    async fn get_queued_operations(&self) -> Result<ActionStateResultStream<'_>, Error> {
199
111
        let filter = OperationFilter {
200
111
            stages: OperationStageFlags::Queued,
201
111
            order_by_priority_direction: Some(OrderDirection::Desc),
202
111
            ..Default::default()
203
111
        };
204
111
        self.matching_engine_state_manager
205
111
            .filter_operations(filter)
206
111
            .await
207
111
            .err_tip(|| "In SimpleScheduler::get_queued_operations getting filter result")
208
111
    }
209
210
9
    
pub async fn do_try_match_for_test(&self) -> Result<(), Error>0
{
211
9
        self.do_try_match(true).await
212
9
    }
213
214
    // TODO(palfrey) This is an O(n*m) (aka n^2) algorithm. In theory we
215
    // can create a map of capabilities of each worker and then try and match
216
    // the actions to the worker using the map lookup (ie. map reduce).
217
111
    async fn do_try_match(&self, full_worker_logging: bool) -> Result<(), Error> {
218
50
        async fn match_action_to_worker(
219
50
            action_state_result: &dyn ActionStateResult,
220
50
            workers: &ApiWorkerScheduler,
221
50
            matching_engine_state_manager: &dyn MatchingEngineStateManager,
222
50
            platform_property_manager: &PlatformPropertyManager,
223
50
            full_worker_logging: bool,
224
50
        ) -> Result<(), Error> {
225
50
            let (action_info, maybe_origin_metadata) =
226
50
                action_state_result
227
50
                    .as_action_info()
228
50
                    .await
229
50
                    .err_tip(|| "Failed to get action_info from as_action_info_result stream")
?0
;
230
231
            // TODO(palfrey) We should not compute this every time and instead store
232
            // it with the ActionInfo when we receive it.
233
50
            let platform_properties = platform_property_manager
234
50
                .make_platform_properties(action_info.platform_properties.clone())
235
50
                .err_tip(
236
                    || "Failed to make platform properties in SimpleScheduler::do_try_match",
237
0
                )?;
238
239
50
            let action_info = ActionInfoWithProps {
240
50
                inner: action_info,
241
50
                platform_properties,
242
50
            };
243
244
            // Try to find a worker for the action.
245
32
            let worker_id = {
246
50
                match workers
247
50
                    .find_worker_for_action(&action_info.platform_properties, full_worker_logging)
248
50
                    .await
249
                {
250
32
                    Some(worker_id) => worker_id,
251
                    // If we could not find a worker for the action,
252
                    // we have nothing to do.
253
18
                    None => return Ok(()),
254
                }
255
            };
256
257
32
            let attach_operation_fut = async move {
258
                // Extract the operation_id from the action_state.
259
32
                let operation_id = {
260
32
                    let (action_state, _origin_metadata) = action_state_result
261
32
                        .as_state()
262
32
                        .await
263
32
                        .err_tip(|| "Failed to get action_info from as_state_result stream")
?0
;
264
32
                    action_state.client_operation_id.clone()
265
                };
266
267
                // Tell the matching engine that the operation is being assigned to a worker.
268
32
                let assign_result = matching_engine_state_manager
269
32
                    .assign_operation(&operation_id, Ok(&worker_id))
270
32
                    .await
271
32
                    .err_tip(|| "Failed to assign operation in do_try_match");
272
32
                if let Err(
err1
) = assign_result {
  Branch (272:24): [True: 0, False: 0]
  Branch (272:24): [Folded - Ignored]
  Branch (272:24): [True: 0, False: 4]
  Branch (272:24): [True: 1, False: 27]
273
1
                    if err.code == Code::Aborted {
  Branch (273:24): [True: 0, False: 0]
  Branch (273:24): [Folded - Ignored]
  Branch (273:24): [True: 0, False: 0]
  Branch (273:24): [True: 0, False: 1]
274
                        // If the operation was aborted, it means that the operation was
275
                        // cancelled due to another operation being assigned to the worker.
276
0
                        return Ok(());
277
1
                    }
278
                    // Any other error is a real error.
279
1
                    return Err(err);
280
31
                }
281
282
31
                workers
283
31
                    .worker_notify_run_action(worker_id, operation_id, action_info)
284
31
                    .await
285
31
                    .err_tip(|| 
{1
286
1
                        "Failed to run worker_notify_run_action in SimpleScheduler::do_try_match"
287
1
                    })
288
32
            };
289
32
            tokio::pin!(attach_operation_fut);
290
291
32
            let origin_metadata = maybe_origin_metadata.unwrap_or_default();
292
293
32
            let ctx = Context::current_with_baggage(vec![KeyValue::new(
294
                ENDUSER_ID,
295
32
                origin_metadata.identity,
296
            )]);
297
298
32
            info_span!("do_try_match")
299
32
                .in_scope(|| attach_operation_fut)
300
32
                .with_context(ctx)
301
32
                .await
302
50
        }
303
304
111
        let mut result = Ok(());
305
306
111
        let mut stream = self
307
111
            .get_queued_operations()
308
111
            .await
309
111
            .err_tip(|| "Failed to get queued operations in do_try_match")
?0
;
310
311
161
        while let Some(
action_state_result50
) = stream.next().await {
  Branch (311:19): [True: 0, False: 0]
  Branch (311:19): [Folded - Ignored]
  Branch (311:19): [True: 5, False: 17]
  Branch (311:19): [True: 45, False: 94]
312
50
            result = result.merge(
313
50
                match_action_to_worker(
314
50
                    action_state_result.as_ref(),
315
50
                    self.worker_scheduler.as_ref(),
316
50
                    self.matching_engine_state_manager.as_ref(),
317
50
                    self.platform_property_manager.as_ref(),
318
50
                    full_worker_logging,
319
50
                )
320
50
                .await,
321
            );
322
        }
323
324
111
        result
325
111
    }
326
}
327
328
impl SimpleScheduler {
329
1
    pub fn new<A: AwaitedActionDb>(
330
1
        spec: &SimpleSpec,
331
1
        awaited_action_db: A,
332
1
        task_change_notify: Arc<Notify>,
333
1
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
334
1
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
335
1
        Self::new_with_callback(
336
1
            spec,
337
1
            awaited_action_db,
338
0
            || {
339
                // The cost of running `do_try_match()` is very high, but constant
340
                // in relation to the number of changes that have happened. This
341
                // means that grabbing this lock to process `do_try_match()` should
342
                // always yield to any other tasks that might want the lock. The
343
                // easiest and most fair way to do this is to sleep for a small
344
                // amount of time. Using something like tokio::task::yield_now()
345
                // does not yield as aggressively as we'd like if new futures are
346
                // scheduled within a future.
347
0
                tokio::time::sleep(Duration::from_millis(1))
348
0
            },
349
1
            task_change_notify,
350
            SystemTime::now,
351
1
            maybe_origin_event_tx,
352
        )
353
1
    }
354
355
24
    pub fn new_with_callback<
356
24
        Fut: Future<Output = ()> + Send,
357
24
        F: Fn() -> Fut + Send + Sync + 'static,
358
24
        A: AwaitedActionDb,
359
24
        I: InstantWrapper,
360
24
        NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
361
24
    >(
362
24
        spec: &SimpleSpec,
363
24
        awaited_action_db: A,
364
24
        on_matching_engine_run: F,
365
24
        task_change_notify: Arc<Notify>,
366
24
        now_fn: NowFn,
367
24
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
368
24
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
369
24
        let platform_property_manager = Arc::new(PlatformPropertyManager::new(
370
24
            spec.supported_platform_properties
371
24
                .clone()
372
24
                .unwrap_or_default(),
373
        ));
374
375
24
        let mut worker_timeout_s = spec.worker_timeout_s;
376
24
        if worker_timeout_s == 0 {
  Branch (376:12): [True: 0, False: 0]
  Branch (376:12): [True: 0, False: 0]
  Branch (376:12): [Folded - Ignored]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 0, False: 1]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 0, False: 1]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 0, False: 1]
  Branch (376:12): [True: 0, False: 1]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 0, False: 1]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
  Branch (376:12): [True: 1, False: 0]
377
19
            worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
378
19
        
}5
379
380
24
        let mut client_action_timeout_s = spec.client_action_timeout_s;
381
24
        if client_action_timeout_s == 0 {
  Branch (381:12): [True: 0, False: 0]
  Branch (381:12): [True: 0, False: 0]
  Branch (381:12): [Folded - Ignored]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 0, False: 1]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
  Branch (381:12): [True: 1, False: 0]
382
23
            client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
383
23
        
}1
384
        // This matches the value of CLIENT_KEEPALIVE_DURATION which means that
385
        // tasks are going to be dropped all over the place, this isn't a good
386
        // setting.
387
24
        if client_action_timeout_s <= CLIENT_KEEPALIVE_DURATION.as_secs() {
  Branch (387:12): [True: 0, False: 0]
  Branch (387:12): [True: 0, False: 0]
  Branch (387:12): [Folded - Ignored]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
  Branch (387:12): [True: 0, False: 1]
388
0
            error!(
389
                client_action_timeout_s,
390
0
                "Setting client_action_timeout_s to less than the client keep alive interval is going to cause issues, please set above {}.",
391
0
                CLIENT_KEEPALIVE_DURATION.as_secs()
392
            );
393
24
        }
394
395
24
        let mut max_job_retries = spec.max_job_retries;
396
24
        if max_job_retries == 0 {
  Branch (396:12): [True: 0, False: 0]
  Branch (396:12): [True: 0, False: 0]
  Branch (396:12): [Folded - Ignored]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 0, False: 1]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
397
23
            max_job_retries = DEFAULT_MAX_JOB_RETRIES;
398
23
        
}1
399
400
24
        let worker_change_notify = Arc::new(Notify::new());
401
24
        let state_manager = SimpleSchedulerStateManager::new(
402
24
            max_job_retries,
403
24
            Duration::from_secs(worker_timeout_s),
404
24
            Duration::from_secs(client_action_timeout_s),
405
24
            awaited_action_db,
406
24
            now_fn,
407
        );
408
409
24
        let worker_scheduler = ApiWorkerScheduler::new(
410
24
            state_manager.clone(),
411
24
            platform_property_manager.clone(),
412
24
            spec.allocation_strategy,
413
24
            worker_change_notify.clone(),
414
24
            worker_timeout_s,
415
        );
416
417
24
        let worker_scheduler_clone = worker_scheduler.clone();
418
419
24
        let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
420
24
            let weak_inner = weak_self.clone();
421
24
            let task_worker_matching_spawn =
422
24
                spawn!("simple_scheduler_task_worker_matching", async move 
{22
423
22
                    let mut last_match_successful = true;
424
22
                    let mut worker_match_logging_last: Option<Instant> = None;
425
                    // Break out of the loop only when the inner is dropped.
426
                    loop {
427
124
                        let task_change_fut = task_change_notify.notified();
428
124
                        let worker_change_fut = worker_change_notify.notified();
429
124
                        tokio::pin!(task_change_fut);
430
124
                        tokio::pin!(worker_change_fut);
431
                        // Wait for either of these futures to be ready.
432
124
                        let state_changed = future::select(task_change_fut, worker_change_fut);
433
124
                        if last_match_successful {
  Branch (433:28): [True: 0, False: 0]
  Branch (433:28): [True: 0, False: 0]
  Branch (433:28): [Folded - Ignored]
  Branch (433:28): [True: 13, False: 0]
  Branch (433:28): [True: 4, False: 0]
  Branch (433:28): [True: 2, False: 0]
  Branch (433:28): [True: 2, False: 0]
  Branch (433:28): [True: 0, False: 0]
  Branch (433:28): [True: 2, False: 0]
  Branch (433:28): [True: 4, False: 0]
  Branch (433:28): [True: 4, False: 0]
  Branch (433:28): [True: 5, False: 0]
  Branch (433:28): [True: 6, False: 0]
  Branch (433:28): [True: 8, False: 0]
  Branch (433:28): [True: 3, False: 0]
  Branch (433:28): [True: 5, False: 0]
  Branch (433:28): [True: 8, False: 0]
  Branch (433:28): [True: 7, False: 0]
  Branch (433:28): [True: 7, False: 0]
  Branch (433:28): [True: 10, False: 0]
  Branch (433:28): [True: 4, False: 0]
  Branch (433:28): [True: 4, False: 0]
  Branch (433:28): [True: 5, False: 0]
  Branch (433:28): [True: 8, False: 0]
  Branch (433:28): [True: 4, False: 1]
  Branch (433:28): [True: 8, False: 0]
  Branch (433:28): [True: 0, False: 0]
434
123
                            let _ = state_changed.await;
435
                        } else {
436
                            // If the last match failed, then run again after a short sleep.
437
                            // This resolves issues where we tried to re-schedule a job to
438
                            // a disconnected worker.  The sleep ensures we don't enter a
439
                            // hard loop if there's something wrong inside do_try_match.
440
1
                            let sleep_fut = tokio::time::sleep(Duration::from_millis(100));
441
1
                            tokio::pin!(sleep_fut);
442
1
                            let _ = future::select(state_changed, sleep_fut).await;
443
                        }
444
445
102
                        let result = match weak_inner.upgrade() {
446
102
                            Some(scheduler) => {
447
102
                                let now = Instant::now();
448
102
                                let full_worker_logging = {
449
102
                                    match scheduler.worker_match_logging_interval {
450
0
                                        None => false,
451
102
                                        Some(duration) => match worker_match_logging_last {
452
22
                                            None => true,
453
80
                                            Some(when) => now.duration_since(when) >= duration,
454
                                        },
455
                                    }
456
                                };
457
458
102
                                let res = scheduler.do_try_match(full_worker_logging).await;
459
102
                                if full_worker_logging {
  Branch (459:36): [True: 0, False: 0]
  Branch (459:36): [True: 0, False: 0]
  Branch (459:36): [Folded - Ignored]
  Branch (459:36): [True: 12, False: 0]
  Branch (459:36): [True: 3, False: 0]
  Branch (459:36): [True: 1, False: 0]
  Branch (459:36): [True: 1, False: 0]
  Branch (459:36): [True: 0, False: 0]
  Branch (459:36): [True: 1, False: 0]
  Branch (459:36): [True: 3, False: 0]
  Branch (459:36): [True: 3, False: 0]
  Branch (459:36): [True: 4, False: 0]
  Branch (459:36): [True: 5, False: 0]
  Branch (459:36): [True: 7, False: 0]
  Branch (459:36): [True: 2, False: 0]
  Branch (459:36): [True: 4, False: 0]
  Branch (459:36): [True: 7, False: 0]
  Branch (459:36): [True: 6, False: 0]
  Branch (459:36): [True: 6, False: 0]
  Branch (459:36): [True: 9, False: 0]
  Branch (459:36): [True: 3, False: 0]
  Branch (459:36): [True: 3, False: 0]
  Branch (459:36): [True: 4, False: 0]
  Branch (459:36): [True: 7, False: 0]
  Branch (459:36): [True: 4, False: 0]
  Branch (459:36): [True: 7, False: 0]
  Branch (459:36): [True: 0, False: 0]
460
102
                                    let operations_stream = scheduler
461
102
                                        .matching_engine_state_manager
462
102
                                        .filter_operations(OperationFilter::default())
463
102
                                        .await
464
102
                                        .err_tip(|| "In action_scheduler getting filter result");
465
466
102
                                    let mut oldest_actions_in_state: HashMap<
467
102
                                        String,
468
102
                                        BTreeSet<Arc<ActionState>>,
469
102
                                    > = HashMap::new();
470
102
                                    let max_items = 5;
471
472
102
                                    match operations_stream {
473
102
                                        Ok(stream) => {
474
102
                                            let actions = stream
475
105
                                                .
filter_map102
(|item| async move {
476
105
                                                    match item.as_ref().as_state().await {
477
105
                                                        Ok((action_state, _origin_metadata)) => {
478
105
                                                            Some(action_state)
479
                                                        }
480
0
                                                        Err(e) => {
481
0
                                                            error!(
482
                                                                ?e,
483
0
                                                                "Failed to get action state!"
484
                                                            );
485
0
                                                            None
486
                                                        }
487
                                                    }
488
210
                                                })
489
102
                                                .collect::<Vec<_>>()
490
102
                                                .await;
491
207
                                            for 
action_state105
in &actions {
492
105
                                                let name = action_state.stage.name();
493
9
                                                if let Some(values) =
  Branch (493:56): [True: 0, False: 0]
  Branch (493:56): [True: 0, False: 0]
  Branch (493:56): [Folded - Ignored]
  Branch (493:56): [True: 0, False: 12]
  Branch (493:56): [True: 0, False: 2]
  Branch (493:56): [True: 0, False: 0]
  Branch (493:56): [True: 0, False: 0]
  Branch (493:56): [True: 0, False: 0]
  Branch (493:56): [True: 0, False: 1]
  Branch (493:56): [True: 0, False: 2]
  Branch (493:56): [True: 0, False: 2]
  Branch (493:56): [True: 1, False: 6]
  Branch (493:56): [True: 0, False: 5]
  Branch (493:56): [True: 0, False: 6]
  Branch (493:56): [True: 0, False: 2]
  Branch (493:56): [True: 0, False: 3]
  Branch (493:56): [True: 2, False: 7]
  Branch (493:56): [True: 0, False: 5]
  Branch (493:56): [True: 0, False: 9]
  Branch (493:56): [True: 6, False: 8]
  Branch (493:56): [True: 0, False: 2]
  Branch (493:56): [True: 0, False: 2]
  Branch (493:56): [True: 0, False: 3]
  Branch (493:56): [True: 0, False: 6]
  Branch (493:56): [True: 0, False: 3]
  Branch (493:56): [True: 0, False: 10]
  Branch (493:56): [True: 0, False: 0]
494
105
                                                    oldest_actions_in_state.get_mut(&name)
495
                                                {
496
9
                                                    values.insert(action_state.clone());
497
9
                                                    if values.len() > max_items {
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [Folded - Ignored]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 1]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 2]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 6]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
  Branch (497:56): [True: 0, False: 0]
498
0
                                                        values.pop_first();
499
9
                                                    }
500
96
                                                } else {
501
96
                                                    let mut values = BTreeSet::new();
502
96
                                                    values.insert(action_state.clone());
503
96
                                                    oldest_actions_in_state.insert(name, values);
504
96
                                                }
505
                                            }
506
                                        }
507
0
                                        Err(e) => {
508
0
                                            error!(?e, "Failed to get operations list!");
509
                                        }
510
                                    }
511
512
102
                                    for 
value96
in oldest_actions_in_state.values() {
513
96
                                        let mut items = vec![];
514
192
                                        for 
item96
in value {
515
96
                                            items.push(item.to_string());
516
96
                                        }
517
96
                                        info!(?items, "Oldest actions in state");
518
                                    }
519
520
102
                                    worker_match_logging_last.replace(now);
521
0
                                }
522
102
                                res
523
                            }
524
                            // If the inner went away it means the scheduler is shutting
525
                            // down, so we need to resolve our future.
526
0
                            None => return,
527
                        };
528
102
                        last_match_successful = result.is_ok();
529
102
                        if let Err(
err1
) = result {
  Branch (529:32): [True: 0, False: 0]
  Branch (529:32): [True: 0, False: 0]
  Branch (529:32): [Folded - Ignored]
  Branch (529:32): [True: 0, False: 12]
  Branch (529:32): [True: 0, False: 3]
  Branch (529:32): [True: 0, False: 1]
  Branch (529:32): [True: 0, False: 1]
  Branch (529:32): [True: 0, False: 0]
  Branch (529:32): [True: 0, False: 1]
  Branch (529:32): [True: 0, False: 3]
  Branch (529:32): [True: 0, False: 3]
  Branch (529:32): [True: 0, False: 4]
  Branch (529:32): [True: 0, False: 5]
  Branch (529:32): [True: 0, False: 7]
  Branch (529:32): [True: 0, False: 2]
  Branch (529:32): [True: 0, False: 4]
  Branch (529:32): [True: 0, False: 7]
  Branch (529:32): [True: 0, False: 6]
  Branch (529:32): [True: 0, False: 6]
  Branch (529:32): [True: 0, False: 9]
  Branch (529:32): [True: 0, False: 3]
  Branch (529:32): [True: 0, False: 3]
  Branch (529:32): [True: 0, False: 4]
  Branch (529:32): [True: 0, False: 7]
  Branch (529:32): [True: 1, False: 3]
  Branch (529:32): [True: 0, False: 7]
  Branch (529:32): [True: 0, False: 0]
530
1
                            error!(?err, "Error while running do_try_match");
531
101
                        }
532
533
102
                        on_matching_engine_run().await;
534
                    }
535
                    // Unreachable.
536
0
                });
537
538
24
            let worker_match_logging_interval = match spec.worker_match_logging_interval_s {
539
0
                -1 => None,
540
24
                signed_secs => {
541
24
                    if let Ok(
secs23
) = TryInto::<u64>::try_into(signed_secs) {
  Branch (541:28): [True: 0, False: 0]
  Branch (541:28): [True: 0, False: 0]
  Branch (541:28): [Folded - Ignored]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 1, False: 0]
  Branch (541:28): [True: 0, False: 1]
542
23
                        Some(Duration::from_secs(secs))
543
                    } else {
544
1
                        error!(
545
                            worker_match_logging_interval_s = spec.worker_match_logging_interval_s,
546
1
                            "Valid values for worker_match_logging_interval_s are -1 or a positive integer, setting to -1 (disabled)",
547
                        );
548
1
                        None
549
                    }
550
                }
551
            };
552
24
            Self {
553
24
                matching_engine_state_manager: state_manager.clone(),
554
24
                client_state_manager: state_manager.clone(),
555
24
                worker_scheduler,
556
24
                platform_property_manager,
557
24
                maybe_origin_event_tx,
558
24
                task_worker_matching_spawn,
559
24
                worker_match_logging_interval,
560
24
            }
561
24
        });
562
24
        (action_scheduler, worker_scheduler_clone)
563
24
    }
564
}
565
566
// `ClientStateManager` implementation for `SimpleScheduler`.
// Each async method here is a thin wrapper that forwards to an `inner_*`
// method on `self` and returns that method's result unchanged.
#[async_trait]
567
impl ClientStateManager for SimpleScheduler {
568
    /// Forwards `client_operation_id` and `action_info` to
    /// `self.inner_add_action` and returns whatever it returns.
    ///
    /// # Errors
    /// Returns the error from `inner_add_action` as-is; no extra handling is
    /// done here.
    async fn add_action(
569
        &self,
570
        client_operation_id: OperationId,
571
        action_info: Arc<ActionInfo>,
572
30
    ) -> Result<Box<dyn ActionStateResult>, Error> {
573
        self.inner_add_action(client_operation_id, action_info)
574
            .await
575
30
    }
576
577
    /// Forwards `filter` to `self.inner_filter_operations` and returns the
    /// resulting stream, whose lifetime is tied to the borrow of `self`.
    ///
    /// # Errors
    /// Returns the error from `inner_filter_operations` as-is.
    async fn filter_operations<'a>(
578
        &'a self,
579
        filter: OperationFilter,
580
504
    ) -> Result<ActionStateResultStream<'a>, Error> {
581
        self.inner_filter_operations(filter).await
582
504
    }
583
584
0
    /// Always returns `Some(self)`: this type is itself usable as a
    /// `KnownPlatformPropertyProvider`.
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
585
0
        Some(self)
586
0
    }
587
}
588
589
// `KnownPlatformPropertyProvider` implementation for `SimpleScheduler`.
#[async_trait]
590
impl KnownPlatformPropertyProvider for SimpleScheduler {
591
0
    /// Returns the names of the known platform properties.
    ///
    /// The names are the keys of the map returned by
    /// `get_known_properties()` on the worker scheduler's platform property
    /// manager, cloned into a new `Vec<String>`. `_instance_name` is not
    /// used, so the answer does not depend on the instance.
    ///
    /// This body only ever produces `Ok(..)`. The order of the returned
    /// names is whatever `.keys()` yields.
    // NOTE(review): presumably the underlying map is unordered; confirm
    // before relying on a stable order.
    async fn get_known_properties(&self, _instance_name: &str) -> Result<Vec<String>, Error> {
592
        Ok(self
593
            .worker_scheduler
594
            .get_platform_property_manager()
595
            .get_known_properties()
596
            .keys()
597
            .cloned()
598
            .collect())
599
0
    }
600
}
601
602
// `WorkerScheduler` implementation for `SimpleScheduler`.
// Every method is a pure pass-through: it calls the method of the same name
// on the `worker_scheduler` field with the same arguments and returns that
// result unchanged. No extra logic or error mapping happens at this level.
#[async_trait]
603
impl WorkerScheduler for SimpleScheduler {
604
0
    /// Returns the platform property manager of the wrapped
    /// `worker_scheduler`.
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
605
0
        self.worker_scheduler.get_platform_property_manager()
606
0
    }
607
608
29
    /// Forwards `worker` to `worker_scheduler.add_worker` and returns its
    /// result.
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
609
        self.worker_scheduler.add_worker(worker).await
610
29
    }
611
612
    /// Forwards `worker_id`, `operation_id` and `update` to
    /// `worker_scheduler.update_action` and returns its result.
    async fn update_action(
613
        &self,
614
        worker_id: &WorkerId,
615
        operation_id: &OperationId,
616
        update: UpdateOperationType,
617
9
    ) -> Result<(), Error> {
618
        self.worker_scheduler
619
            .update_action(worker_id, operation_id, update)
620
            .await
621
9
    }
622
623
    /// Forwards `worker_id` and `timestamp` to
    /// `worker_scheduler.worker_keep_alive_received` and returns its result.
    async fn worker_keep_alive_received(
624
        &self,
625
        worker_id: &WorkerId,
626
        timestamp: WorkerTimestamp,
627
1
    ) -> Result<(), Error> {
628
        self.worker_scheduler
629
            .worker_keep_alive_received(worker_id, timestamp)
630
            .await
631
1
    }
632
633
5
    /// Forwards `worker_id` to `worker_scheduler.remove_worker` and returns
    /// its result.
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
634
        self.worker_scheduler.remove_worker(worker_id).await
635
5
    }
636
637
0
    /// Hands `shutdown_guard` to `worker_scheduler.shutdown` and waits for
    /// that call to finish. Returns nothing.
    async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
638
        self.worker_scheduler.shutdown(shutdown_guard).await;
639
0
    }
640
641
1
    /// Forwards `now_timestamp` to
    /// `worker_scheduler.remove_timedout_workers` and returns its result.
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
642
        self.worker_scheduler
643
            .remove_timedout_workers(now_timestamp)
644
            .await
645
1
    }
646
647
2
    /// Forwards `worker_id` and `is_draining` to
    /// `worker_scheduler.set_drain_worker` and returns its result.
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
648
        self.worker_scheduler
649
            .set_drain_worker(worker_id, is_draining)
650
            .await
651
2
    }
652
}
653
654
// Empty `RootMetricsComponent` impl: no items are overridden here, so any
// behaviour comes from the trait's own defaults.
impl RootMetricsComponent for SimpleScheduler {}