Coverage Report

Created: 2026-03-06 12:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{BTreeSet, HashMap};
16
use std::sync::Arc;
17
use std::time::{Instant, SystemTime};
18
19
use async_trait::async_trait;
20
use futures::{Future, StreamExt, future};
21
use nativelink_config::schedulers::SimpleSpec;
22
use nativelink_error::{Code, Error, ResultExt};
23
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
24
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
25
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
26
use nativelink_util::instant_wrapper::InstantWrapper;
27
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
28
use nativelink_util::operation_state_manager::{
29
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
30
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
31
};
32
use nativelink_util::origin_event::OriginMetadata;
33
use nativelink_util::shutdown_guard::ShutdownGuard;
34
use nativelink_util::spawn;
35
use nativelink_util::task::JoinHandleDropGuard;
36
use opentelemetry::KeyValue;
37
use opentelemetry::baggage::BaggageExt;
38
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
39
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
40
use tokio::sync::{Notify, mpsc};
41
use tokio::time::Duration;
42
use tracing::{debug, error, info, info_span, warn};
43
44
use crate::api_worker_scheduler::ApiWorkerScheduler;
45
use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
46
use crate::platform_property_manager::PlatformPropertyManager;
47
use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
48
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
49
use crate::worker_registry::WorkerRegistry;
50
use crate::worker_scheduler::WorkerScheduler;
51
52
/// Default timeout for workers in seconds.
53
/// If this changes, remember to change the documentation in the config.
54
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;
55
56
/// Mark operations as completed with error if no client has updated them
57
/// within this duration.
58
/// If this changes, remember to change the documentation in the config.
59
const DEFAULT_CLIENT_ACTION_TIMEOUT_S: u64 = 60;
60
61
/// Default times a job can retry before failing.
62
/// If this changes, remember to change the documentation in the config.
63
const DEFAULT_MAX_JOB_RETRIES: usize = 3;
64
65
struct SimpleSchedulerActionStateResult {
66
    client_operation_id: OperationId,
67
    action_state_result: Box<dyn ActionStateResult>,
68
}
69
70
impl SimpleSchedulerActionStateResult {
71
31
    fn new(
72
31
        client_operation_id: OperationId,
73
31
        action_state_result: Box<dyn ActionStateResult>,
74
31
    ) -> Self {
75
31
        Self {
76
31
            client_operation_id,
77
31
            action_state_result,
78
31
        }
79
31
    }
80
}
81
82
#[async_trait]
83
impl ActionStateResult for SimpleSchedulerActionStateResult {
84
4
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
85
        let (mut action_state, origin_metadata) = self
86
            .action_state_result
87
            .as_state()
88
            .await
89
            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
90
        // We need to ensure the client is not aware of the downstream
91
        // operation id, so override it before it goes out.
92
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
93
        Ok((action_state, origin_metadata))
94
4
    }
95
96
46
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
97
        let (mut action_state, origin_metadata) = self
98
            .action_state_result
99
            .changed()
100
            .await
101
            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
102
        // We need to ensure the client is not aware of the downstream
103
        // operation id, so override it before it goes out.
104
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
105
        Ok((action_state, origin_metadata))
106
46
    }
107
108
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
109
        self.action_state_result
110
            .as_action_info()
111
            .await
112
            .err_tip(|| "In SimpleSchedulerActionStateResult")
113
0
    }
114
}
115
116
/// Engine used to manage the queued/running tasks and relationship with
117
/// the worker nodes. All state on how the workers and actions are interacting
118
/// should be held in this struct.
119
#[derive(MetricsComponent)]
120
pub struct SimpleScheduler {
121
    /// Manager for matching engine side of the state manager.
122
    #[metric(group = "matching_engine_state_manager")]
123
    matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,
124
125
    /// Manager for client state of this scheduler.
126
    #[metric(group = "client_state_manager")]
127
    client_state_manager: Arc<dyn ClientStateManager>,
128
129
    /// Manager for platform of this scheduler.
130
    #[metric(group = "platform_properties")]
131
    platform_property_manager: Arc<PlatformPropertyManager>,
132
133
    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
134
    /// order based on the allocation strategy.
135
    #[metric(group = "worker_scheduler")]
136
    worker_scheduler: Arc<ApiWorkerScheduler>,
137
138
    /// The sender to send origin events to the origin events.
139
    maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
140
141
    /// Background task that tries to match actions to workers. If this struct
142
    /// is dropped the spawn will be cancelled as well.
143
    task_worker_matching_spawn: JoinHandleDropGuard<()>,
144
145
    /// Every duration, do logging of worker matching
146
    /// e.g. "worker busy", "can't find any worker"
147
    /// Set to None to disable. This is quite noisy, so we limit it
148
    worker_match_logging_interval: Option<Duration>,
149
}
150
151
impl core::fmt::Debug for SimpleScheduler {
152
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
153
0
        f.debug_struct("SimpleScheduler")
154
0
            .field("platform_property_manager", &self.platform_property_manager)
155
0
            .field("worker_scheduler", &self.worker_scheduler)
156
0
            .field("maybe_origin_event_tx", &self.maybe_origin_event_tx)
157
0
            .field(
158
0
                "task_worker_matching_spawn",
159
0
                &self.task_worker_matching_spawn,
160
0
            )
161
0
            .finish_non_exhaustive()
162
0
    }
163
}
164
165
impl SimpleScheduler {
166
    /// Attempts to find a worker to execute an action and begins executing it.
167
    /// If an action is already running that is cacheable it may merge this
168
    /// action with the results and state changes of the already running
169
    /// action. If the task cannot be executed immediately it will be queued
170
    /// for execution based on priority and other metrics.
171
    /// All further updates to the action will be provided through the returned
172
    /// value.
173
31
    async fn inner_add_action(
174
31
        &self,
175
31
        client_operation_id: OperationId,
176
31
        action_info: Arc<ActionInfo>,
177
31
    ) -> Result<Box<dyn ActionStateResult>, Error> {
178
31
        let action_state_result = self
179
31
            .client_state_manager
180
31
            .add_action(client_operation_id.clone(), action_info)
181
31
            .await
182
31
            .err_tip(|| "In SimpleScheduler::add_action")
?0
;
183
31
        Ok(Box::new(SimpleSchedulerActionStateResult::new(
184
31
            client_operation_id.clone(),
185
31
            action_state_result,
186
31
        )))
187
31
    }
188
189
504
    async fn inner_filter_operations(
190
504
        &self,
191
504
        filter: OperationFilter,
192
504
    ) -> Result<ActionStateResultStream<'_>, Error> {
193
504
        self.client_state_manager
194
504
            .filter_operations(filter)
195
504
            .await
196
504
            .err_tip(|| "In SimpleScheduler::find_by_client_operation_id getting filter result")
197
504
    }
198
199
114
    async fn get_queued_operations(&self) -> Result<ActionStateResultStream<'_>, Error> {
200
114
        let filter = OperationFilter {
201
114
            stages: OperationStageFlags::Queued,
202
114
            order_by_priority_direction: Some(OrderDirection::Desc),
203
114
            ..Default::default()
204
114
        };
205
114
        self.matching_engine_state_manager
206
114
            .filter_operations(filter)
207
114
            .await
208
114
            .err_tip(|| "In SimpleScheduler::get_queued_operations getting filter result")
209
114
    }
210
211
10
    
pub async fn do_try_match_for_test(&self) -> Result<(), Error>0
{
212
10
        self.do_try_match(true).await
213
10
    }
214
215
    // TODO(palfrey) This is an O(n*m) (aka n^2) algorithm. In theory we
216
    // can create a map of capabilities of each worker and then try and match
217
    // the actions to the worker using the map lookup (ie. map reduce).
218
114
    async fn do_try_match(&self, full_worker_logging: bool) -> Result<(), Error> {
219
55
        async fn match_action_to_worker(
220
55
            action_state_result: &dyn ActionStateResult,
221
55
            workers: &ApiWorkerScheduler,
222
55
            matching_engine_state_manager: &dyn MatchingEngineStateManager,
223
55
            platform_property_manager: &PlatformPropertyManager,
224
55
            full_worker_logging: bool,
225
55
        ) -> Result<(), Error> {
226
55
            let (action_info, maybe_origin_metadata) =
227
55
                action_state_result
228
55
                    .as_action_info()
229
55
                    .await
230
55
                    .err_tip(|| "Failed to get action_info from as_action_info_result stream")
?0
;
231
232
            // TODO(palfrey) We should not compute this every time and instead store
233
            // it with the ActionInfo when we receive it.
234
55
            let platform_properties = platform_property_manager
235
55
                .make_platform_properties(action_info.platform_properties.clone())
236
55
                .err_tip(
237
                    || "Failed to make platform properties in SimpleScheduler::do_try_match",
238
0
                )?;
239
240
55
            let action_info = ActionInfoWithProps {
241
55
                inner: action_info,
242
55
                platform_properties,
243
55
            };
244
245
            // Try to find a worker for the action.
246
36
            let worker_id = {
247
55
                match workers
248
55
                    .find_worker_for_action(&action_info.platform_properties, full_worker_logging)
249
55
                    .await
250
                {
251
36
                    Some(worker_id) => worker_id,
252
                    // If we could not find a worker for the action,
253
                    // we have nothing to do.
254
19
                    None => return Ok(()),
255
                }
256
            };
257
258
36
            let attach_operation_fut = async move {
259
                // Extract the operation_id from the action_state.
260
36
                let operation_id = {
261
36
                    let (action_state, _origin_metadata) = action_state_result
262
36
                        .as_state()
263
36
                        .await
264
36
                        .err_tip(|| "Failed to get action_info from as_state_result stream")
?0
;
265
36
                    action_state.client_operation_id.clone()
266
                };
267
268
                // Tell the matching engine that the operation is being assigned to a worker.
269
36
                let assign_result = matching_engine_state_manager
270
36
                    .assign_operation(&operation_id, Ok(&worker_id))
271
36
                    .await
272
36
                    .err_tip(|| "Failed to assign operation in do_try_match");
273
36
                if let Err(
err5
) = assign_result {
  Branch (273:24): [Folded - Ignored]
  Branch (273:24): [Folded - Ignored]
  Branch (273:24): [True: 4, False: 4]
  Branch (273:24): [True: 1, False: 27]
274
5
                    if err.code == Code::Aborted {
  Branch (274:24): [Folded - Ignored]
  Branch (274:24): [Folded - Ignored]
  Branch (274:24): [True: 4, False: 0]
  Branch (274:24): [True: 0, False: 1]
275
                        // If the operation was aborted, it means that the operation was
276
                        // cancelled due to another operation being assigned to the worker.
277
4
                        return Ok(());
278
1
                    }
279
                    // Any other error is a real error.
280
1
                    return Err(err);
281
31
                }
282
283
31
                debug!(%worker_id, %operation_id, ?action_info, "Notifying worker of operation");
284
31
                workers
285
31
                    .worker_notify_run_action(worker_id, operation_id, action_info)
286
31
                    .await
287
31
                    .err_tip(|| 
{1
288
1
                        "Failed to run worker_notify_run_action in SimpleScheduler::do_try_match"
289
1
                    })
290
36
            };
291
36
            tokio::pin!(attach_operation_fut);
292
293
36
            let origin_metadata = maybe_origin_metadata.unwrap_or_default();
294
295
36
            let ctx = Context::current_with_baggage(vec![KeyValue::new(
296
                ENDUSER_ID,
297
36
                origin_metadata.identity,
298
            )]);
299
300
36
            info_span!("do_try_match")
301
36
                .in_scope(|| attach_operation_fut)
302
36
                .with_context(ctx)
303
36
                .await
304
55
        }
305
306
114
        let mut result = Ok(());
307
308
114
        let start = Instant::now();
309
310
114
        let mut stream = self
311
114
            .get_queued_operations()
312
114
            .await
313
114
            .err_tip(|| "Failed to get queued operations in do_try_match")
?0
;
314
315
114
        let query_elapsed = start.elapsed();
316
114
        if query_elapsed > Duration::from_secs(1) {
  Branch (316:12): [Folded - Ignored]
  Branch (316:12): [Folded - Ignored]
  Branch (316:12): [True: 0, False: 17]
  Branch (316:12): [True: 0, False: 97]
317
0
            warn!(
318
0
                elapsed_ms = query_elapsed.as_millis(),
319
0
                "Slow get_queued_operations query"
320
            );
321
114
        }
322
323
169
        while let Some(
action_state_result55
) = stream.next().await {
  Branch (323:19): [Folded - Ignored]
  Branch (323:19): [Folded - Ignored]
  Branch (323:19): [True: 8, False: 17]
  Branch (323:19): [True: 47, False: 97]
324
55
            result = result.merge(
325
55
                match_action_to_worker(
326
55
                    action_state_result.as_ref(),
327
55
                    self.worker_scheduler.as_ref(),
328
55
                    self.matching_engine_state_manager.as_ref(),
329
55
                    self.platform_property_manager.as_ref(),
330
55
                    full_worker_logging,
331
55
                )
332
55
                .await,
333
            );
334
        }
335
336
114
        let total_elapsed = start.elapsed();
337
114
        if total_elapsed > Duration::from_secs(5) {
  Branch (337:12): [Folded - Ignored]
  Branch (337:12): [Folded - Ignored]
  Branch (337:12): [True: 0, False: 17]
  Branch (337:12): [True: 0, False: 97]
338
0
            warn!(
339
0
                total_ms = total_elapsed.as_millis(),
340
0
                query_ms = query_elapsed.as_millis(),
341
0
                "Slow do_try_match cycle"
342
            );
343
114
        }
344
345
114
        result
346
114
    }
347
}
348
349
impl SimpleScheduler {
350
1
    pub fn new<A: AwaitedActionDb>(
351
1
        spec: &SimpleSpec,
352
1
        awaited_action_db: A,
353
1
        task_change_notify: Arc<Notify>,
354
1
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
355
1
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
356
1
        Self::new_with_callback(
357
1
            spec,
358
1
            awaited_action_db,
359
0
            || {
360
                // The cost of running `do_try_match()` is very high, but constant
361
                // in relation to the number of changes that have happened. This
362
                // means that grabbing this lock to process `do_try_match()` should
363
                // always yield to any other tasks that might want the lock. The
364
                // easiest and most fair way to do this is to sleep for a small
365
                // amount of time. Using something like tokio::task::yield_now()
366
                // does not yield as aggressively as we'd like if new futures are
367
                // scheduled within a future.
368
0
                tokio::time::sleep(Duration::from_millis(1))
369
0
            },
370
1
            task_change_notify,
371
            SystemTime::now,
372
1
            maybe_origin_event_tx,
373
        )
374
1
    }
375
376
25
    pub fn new_with_callback<
377
25
        Fut: Future<Output = ()> + Send,
378
25
        F: Fn() -> Fut + Send + Sync + 'static,
379
25
        A: AwaitedActionDb,
380
25
        I: InstantWrapper,
381
25
        NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
382
25
    >(
383
25
        spec: &SimpleSpec,
384
25
        awaited_action_db: A,
385
25
        on_matching_engine_run: F,
386
25
        task_change_notify: Arc<Notify>,
387
25
        now_fn: NowFn,
388
25
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
389
25
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
390
25
        let platform_property_manager = Arc::new(PlatformPropertyManager::new(
391
25
            spec.supported_platform_properties
392
25
                .clone()
393
25
                .unwrap_or_default(),
394
        ));
395
396
25
        let mut worker_timeout_s = spec.worker_timeout_s;
397
25
        if worker_timeout_s == 0 {
  Branch (397:12): [Folded - Ignored]
  Branch (397:12): [Folded - Ignored]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 0, False: 1]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 0, False: 1]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 0, False: 1]
  Branch (397:12): [True: 0, False: 1]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 0, False: 1]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
  Branch (397:12): [True: 1, False: 0]
398
20
            worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
399
20
        
}5
400
401
25
        let mut client_action_timeout_s = spec.client_action_timeout_s;
402
25
        if client_action_timeout_s == 0 {
  Branch (402:12): [Folded - Ignored]
  Branch (402:12): [Folded - Ignored]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 0, False: 1]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
  Branch (402:12): [True: 1, False: 0]
403
24
            client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
404
24
        
}1
405
        // This matches the value of CLIENT_KEEPALIVE_DURATION which means that
406
        // tasks are going to be dropped all over the place, this isn't a good
407
        // setting.
408
25
        if client_action_timeout_s <= CLIENT_KEEPALIVE_DURATION.as_secs() {
  Branch (408:12): [Folded - Ignored]
  Branch (408:12): [Folded - Ignored]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
  Branch (408:12): [True: 0, False: 1]
409
0
            error!(
410
                client_action_timeout_s,
411
0
                "Setting client_action_timeout_s to less than the client keep alive interval is going to cause issues, please set above {}.",
412
0
                CLIENT_KEEPALIVE_DURATION.as_secs()
413
            );
414
25
        }
415
416
25
        let mut max_job_retries = spec.max_job_retries;
417
25
        if max_job_retries == 0 {
  Branch (417:12): [Folded - Ignored]
  Branch (417:12): [Folded - Ignored]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 0, False: 1]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
  Branch (417:12): [True: 1, False: 0]
418
24
            max_job_retries = DEFAULT_MAX_JOB_RETRIES;
419
24
        
}1
420
421
25
        let worker_change_notify = Arc::new(Notify::new());
422
423
        // Create shared worker registry for single heartbeat per worker.
424
25
        let worker_registry = Arc::new(WorkerRegistry::new());
425
426
25
        let state_manager = SimpleSchedulerStateManager::new(
427
25
            max_job_retries,
428
25
            Duration::from_secs(worker_timeout_s),
429
25
            Duration::from_secs(client_action_timeout_s),
430
25
            Duration::from_secs(spec.max_action_executing_timeout_s),
431
25
            awaited_action_db,
432
25
            now_fn,
433
25
            Some(worker_registry.clone()),
434
        );
435
436
25
        let worker_scheduler = ApiWorkerScheduler::new(
437
25
            state_manager.clone(),
438
25
            platform_property_manager.clone(),
439
25
            spec.allocation_strategy,
440
25
            worker_change_notify.clone(),
441
25
            worker_timeout_s,
442
25
            worker_registry,
443
        );
444
445
25
        let worker_scheduler_clone = worker_scheduler.clone();
446
447
25
        let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
448
25
            let weak_inner = weak_self.clone();
449
25
            let task_worker_matching_spawn =
450
25
                spawn!("simple_scheduler_task_worker_matching", async move 
{23
451
23
                    let mut last_match_successful = true;
452
23
                    let mut worker_match_logging_last: Option<Instant> = None;
453
                    // Break out of the loop only when the inner is dropped.
454
                    loop {
455
127
                        let task_change_fut = task_change_notify.notified();
456
127
                        let worker_change_fut = worker_change_notify.notified();
457
127
                        tokio::pin!(task_change_fut);
458
127
                        tokio::pin!(worker_change_fut);
459
                        // Wait for either of these futures to be ready.
460
127
                        let state_changed = future::select(task_change_fut, worker_change_fut);
461
127
                        if last_match_successful {
  Branch (461:28): [Folded - Ignored]
  Branch (461:28): [Folded - Ignored]
  Branch (461:28): [True: 13, False: 0]
  Branch (461:28): [True: 4, False: 0]
  Branch (461:28): [True: 3, False: 0]
  Branch (461:28): [True: 2, False: 0]
  Branch (461:28): [True: 2, False: 0]
  Branch (461:28): [True: 0, False: 0]
  Branch (461:28): [True: 2, False: 0]
  Branch (461:28): [True: 4, False: 0]
  Branch (461:28): [True: 4, False: 0]
  Branch (461:28): [True: 5, False: 0]
  Branch (461:28): [True: 6, False: 0]
  Branch (461:28): [True: 8, False: 0]
  Branch (461:28): [True: 3, False: 0]
  Branch (461:28): [True: 5, False: 0]
  Branch (461:28): [True: 8, False: 0]
  Branch (461:28): [True: 7, False: 0]
  Branch (461:28): [True: 7, False: 0]
  Branch (461:28): [True: 10, False: 0]
  Branch (461:28): [True: 4, False: 0]
  Branch (461:28): [True: 4, False: 0]
  Branch (461:28): [True: 5, False: 0]
  Branch (461:28): [True: 8, False: 0]
  Branch (461:28): [True: 4, False: 1]
  Branch (461:28): [True: 8, False: 0]
  Branch (461:28): [True: 0, False: 0]
462
126
                            let _ = state_changed.await;
463
                        } else {
464
                            // If the last match failed, then run again after a short sleep.
465
                            // This resolves issues where we tried to re-schedule a job to
466
                            // a disconnected worker.  The sleep ensures we don't enter a
467
                            // hard loop if there's something wrong inside do_try_match.
468
1
                            let sleep_fut = tokio::time::sleep(Duration::from_millis(100));
469
1
                            tokio::pin!(sleep_fut);
470
1
                            let _ = future::select(state_changed, sleep_fut).await;
471
                        }
472
473
104
                        let result = match weak_inner.upgrade() {
474
104
                            Some(scheduler) => {
475
104
                                let now = Instant::now();
476
104
                                let full_worker_logging = {
477
104
                                    match scheduler.worker_match_logging_interval {
478
99
                                        None => false,
479
5
                                        Some(duration) => match worker_match_logging_last {
480
2
                                            None => true,
481
3
                                            Some(when) => now.duration_since(when) >= duration,
482
                                        },
483
                                    }
484
                                };
485
486
104
                                let res = scheduler.do_try_match(full_worker_logging).await;
487
104
                                if full_worker_logging {
  Branch (487:36): [Folded - Ignored]
  Branch (487:36): [Folded - Ignored]
  Branch (487:36): [True: 0, False: 12]
  Branch (487:36): [True: 0, False: 3]
  Branch (487:36): [True: 1, False: 1]
  Branch (487:36): [True: 0, False: 1]
  Branch (487:36): [True: 0, False: 1]
  Branch (487:36): [True: 0, False: 0]
  Branch (487:36): [True: 0, False: 1]
  Branch (487:36): [True: 0, False: 3]
  Branch (487:36): [True: 1, False: 2]
  Branch (487:36): [True: 0, False: 4]
  Branch (487:36): [True: 0, False: 5]
  Branch (487:36): [True: 0, False: 7]
  Branch (487:36): [True: 0, False: 2]
  Branch (487:36): [True: 0, False: 4]
  Branch (487:36): [True: 0, False: 7]
  Branch (487:36): [True: 0, False: 6]
  Branch (487:36): [True: 0, False: 6]
  Branch (487:36): [True: 0, False: 9]
  Branch (487:36): [True: 0, False: 3]
  Branch (487:36): [True: 0, False: 3]
  Branch (487:36): [True: 0, False: 4]
  Branch (487:36): [True: 0, False: 7]
  Branch (487:36): [True: 0, False: 4]
  Branch (487:36): [True: 0, False: 7]
  Branch (487:36): [True: 0, False: 0]
488
2
                                    let operations_stream = scheduler
489
2
                                        .matching_engine_state_manager
490
2
                                        .filter_operations(OperationFilter::default())
491
2
                                        .await
492
2
                                        .err_tip(|| "In action_scheduler getting filter result");
493
494
2
                                    let mut oldest_actions_in_state: HashMap<
495
2
                                        String,
496
2
                                        BTreeSet<Arc<ActionState>>,
497
2
                                    > = HashMap::new();
498
2
                                    let max_items = 5;
499
500
2
                                    match operations_stream {
501
2
                                        Ok(stream) => {
502
2
                                            let actions = stream
503
2
                                                .filter_map(|item| async move 
{0
504
0
                                                    match item.as_ref().as_state().await {
505
0
                                                        Ok((action_state, _origin_metadata)) => {
506
0
                                                            Some(action_state)
507
                                                        }
508
0
                                                        Err(e) => {
509
0
                                                            error!(
510
                                                                ?e,
511
0
                                                                "Failed to get action state!"
512
                                                            );
513
0
                                                            None
514
                                                        }
515
                                                    }
516
0
                                                })
517
2
                                                .collect::<Vec<_>>()
518
2
                                                .await;
519
2
                                            for 
action_state0
in &actions {
520
0
                                                let name = action_state.stage.name();
521
0
                                                if let Some(values) =
  Branch (521:56): [Folded - Ignored]
  Branch (521:56): [Folded - Ignored]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
  Branch (521:56): [True: 0, False: 0]
522
0
                                                    oldest_actions_in_state.get_mut(&name)
523
                                                {
524
0
                                                    values.insert(action_state.clone());
525
0
                                                    if values.len() > max_items {
  Branch (525:56): [Folded - Ignored]
  Branch (525:56): [Folded - Ignored]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
  Branch (525:56): [True: 0, False: 0]
526
0
                                                        values.pop_first();
527
0
                                                    }
528
0
                                                } else {
529
0
                                                    let mut values = BTreeSet::new();
530
0
                                                    values.insert(action_state.clone());
531
0
                                                    oldest_actions_in_state.insert(name, values);
532
0
                                                }
533
                                            }
534
                                        }
535
0
                                        Err(e) => {
536
0
                                            error!(?e, "Failed to get operations list!");
537
                                        }
538
                                    }
539
540
2
                                    for 
value0
in oldest_actions_in_state.values() {
541
0
                                        let mut items = vec![];
542
0
                                        for item in value {
543
0
                                            items.push(item.to_string());
544
0
                                        }
545
0
                                        info!(?items, "Oldest actions in state");
546
                                    }
547
548
2
                                    worker_match_logging_last.replace(now);
549
102
                                }
550
104
                                res
551
                            }
552
                            // If the inner went away it means the scheduler is shutting
553
                            // down, so we need to resolve our future.
554
0
                            None => return,
555
                        };
556
104
                        last_match_successful = result.is_ok();
557
104
                        if let Err(
err1
) = result {
  Branch (557:32): [Folded - Ignored]
  Branch (557:32): [Folded - Ignored]
  Branch (557:32): [True: 0, False: 12]
  Branch (557:32): [True: 0, False: 3]
  Branch (557:32): [True: 0, False: 2]
  Branch (557:32): [True: 0, False: 1]
  Branch (557:32): [True: 0, False: 1]
  Branch (557:32): [True: 0, False: 0]
  Branch (557:32): [True: 0, False: 1]
  Branch (557:32): [True: 0, False: 3]
  Branch (557:32): [True: 0, False: 3]
  Branch (557:32): [True: 0, False: 4]
  Branch (557:32): [True: 0, False: 5]
  Branch (557:32): [True: 0, False: 7]
  Branch (557:32): [True: 0, False: 2]
  Branch (557:32): [True: 0, False: 4]
  Branch (557:32): [True: 0, False: 7]
  Branch (557:32): [True: 0, False: 6]
  Branch (557:32): [True: 0, False: 6]
  Branch (557:32): [True: 0, False: 9]
  Branch (557:32): [True: 0, False: 3]
  Branch (557:32): [True: 0, False: 3]
  Branch (557:32): [True: 0, False: 4]
  Branch (557:32): [True: 0, False: 7]
  Branch (557:32): [True: 1, False: 3]
  Branch (557:32): [True: 0, False: 7]
  Branch (557:32): [True: 0, False: 0]
558
1
                            error!(?err, "Error while running do_try_match");
559
103
                        }
560
561
104
                        on_matching_engine_run().await;
562
                    }
563
                    // Unreachable.
564
0
                });
565
566
25
            let worker_match_logging_interval = match spec.worker_match_logging_interval_s {
567
                // -1 or 0 means disabled (0 used to cause expensive logging on every call)
568
22
                -1 | 0 => None,
569
3
                signed_secs => {
570
3
                    if let Ok(
secs2
) = TryInto::<u64>::try_into(signed_secs) {
  Branch (570:28): [Folded - Ignored]
  Branch (570:28): [Folded - Ignored]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 1, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 1, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 0]
  Branch (570:28): [True: 0, False: 1]
571
2
                        Some(Duration::from_secs(secs))
572
                    } else {
573
1
                        error!(
574
                            worker_match_logging_interval_s = spec.worker_match_logging_interval_s,
575
1
                            "Valid values for worker_match_logging_interval_s are -1, 0, or a positive integer, setting to disabled",
576
                        );
577
1
                        None
578
                    }
579
                }
580
            };
581
25
            Self {
582
25
                matching_engine_state_manager: state_manager.clone(),
583
25
                client_state_manager: state_manager.clone(),
584
25
                worker_scheduler,
585
25
                platform_property_manager,
586
25
                maybe_origin_event_tx,
587
25
                task_worker_matching_spawn,
588
25
                worker_match_logging_interval,
589
25
            }
590
25
        });
591
25
        (action_scheduler, worker_scheduler_clone)
592
25
    }
593
}
594
595
#[async_trait]
596
impl ClientStateManager for SimpleScheduler {
597
    async fn add_action(
598
        &self,
599
        client_operation_id: OperationId,
600
        action_info: Arc<ActionInfo>,
601
31
    ) -> Result<Box<dyn ActionStateResult>, Error> {
602
        self.inner_add_action(client_operation_id, action_info)
603
            .await
604
31
    }
605
606
    async fn filter_operations<'a>(
607
        &'a self,
608
        filter: OperationFilter,
609
504
    ) -> Result<ActionStateResultStream<'a>, Error> {
610
        self.inner_filter_operations(filter).await
611
504
    }
612
613
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
614
0
        Some(self)
615
0
    }
616
}
617
618
#[async_trait]
619
impl KnownPlatformPropertyProvider for SimpleScheduler {
620
0
    async fn get_known_properties(&self, _instance_name: &str) -> Result<Vec<String>, Error> {
621
        Ok(self
622
            .worker_scheduler
623
            .get_platform_property_manager()
624
            .get_known_properties()
625
            .keys()
626
            .cloned()
627
            .collect())
628
0
    }
629
}
630
631
#[async_trait]
632
impl WorkerScheduler for SimpleScheduler {
633
0
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
634
0
        self.worker_scheduler.get_platform_property_manager()
635
0
    }
636
637
30
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
638
        self.worker_scheduler.add_worker(worker).await
639
30
    }
640
641
    async fn update_action(
642
        &self,
643
        worker_id: &WorkerId,
644
        operation_id: &OperationId,
645
        update: UpdateOperationType,
646
9
    ) -> Result<(), Error> {
647
        self.worker_scheduler
648
            .update_action(worker_id, operation_id, update)
649
            .await
650
9
    }
651
652
    async fn worker_keep_alive_received(
653
        &self,
654
        worker_id: &WorkerId,
655
        timestamp: WorkerTimestamp,
656
1
    ) -> Result<(), Error> {
657
        self.worker_scheduler
658
            .worker_keep_alive_received(worker_id, timestamp)
659
            .await
660
1
    }
661
662
5
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
663
        self.worker_scheduler.remove_worker(worker_id).await
664
5
    }
665
666
0
    async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
667
        self.worker_scheduler.shutdown(shutdown_guard).await;
668
0
    }
669
670
1
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
671
        self.worker_scheduler
672
            .remove_timedout_workers(now_timestamp)
673
            .await
674
1
    }
675
676
2
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
677
        self.worker_scheduler
678
            .set_drain_worker(worker_id, is_draining)
679
            .await
680
2
    }
681
}
682
683
impl RootMetricsComponent for SimpleScheduler {}