Coverage Report

Created: 2026-02-05 09:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/simple_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{BTreeSet, HashMap};
16
use std::sync::Arc;
17
use std::time::{Instant, SystemTime};
18
19
use async_trait::async_trait;
20
use futures::{Future, StreamExt, future};
21
use nativelink_config::schedulers::SimpleSpec;
22
use nativelink_error::{Code, Error, ResultExt};
23
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
24
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
25
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
26
use nativelink_util::instant_wrapper::InstantWrapper;
27
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
28
use nativelink_util::operation_state_manager::{
29
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
30
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
31
};
32
use nativelink_util::origin_event::OriginMetadata;
33
use nativelink_util::shutdown_guard::ShutdownGuard;
34
use nativelink_util::spawn;
35
use nativelink_util::task::JoinHandleDropGuard;
36
use opentelemetry::KeyValue;
37
use opentelemetry::baggage::BaggageExt;
38
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
39
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
40
use tokio::sync::{Notify, mpsc};
41
use tokio::time::Duration;
42
use tracing::{error, info, info_span, warn};
43
44
use crate::api_worker_scheduler::ApiWorkerScheduler;
45
use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
46
use crate::platform_property_manager::PlatformPropertyManager;
47
use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
48
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
49
use crate::worker_registry::WorkerRegistry;
50
use crate::worker_scheduler::WorkerScheduler;
51
52
/// Default timeout for workers in seconds.
53
/// If this changes, remember to change the documentation in the config.
54
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;
55
56
/// Mark operations as completed with error if no client has updated them
57
/// within this duration.
58
/// If this changes, remember to change the documentation in the config.
59
const DEFAULT_CLIENT_ACTION_TIMEOUT_S: u64 = 60;
60
61
/// Default times a job can retry before failing.
62
/// If this changes, remember to change the documentation in the config.
63
const DEFAULT_MAX_JOB_RETRIES: usize = 3;
64
65
struct SimpleSchedulerActionStateResult {
66
    client_operation_id: OperationId,
67
    action_state_result: Box<dyn ActionStateResult>,
68
}
69
70
impl SimpleSchedulerActionStateResult {
71
30
    fn new(
72
30
        client_operation_id: OperationId,
73
30
        action_state_result: Box<dyn ActionStateResult>,
74
30
    ) -> Self {
75
30
        Self {
76
30
            client_operation_id,
77
30
            action_state_result,
78
30
        }
79
30
    }
80
}
81
82
#[async_trait]
83
impl ActionStateResult for SimpleSchedulerActionStateResult {
84
4
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
85
        let (mut action_state, origin_metadata) = self
86
            .action_state_result
87
            .as_state()
88
            .await
89
            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
90
        // We need to ensure the client is not aware of the downstream
91
        // operation id, so override it before it goes out.
92
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
93
        Ok((action_state, origin_metadata))
94
4
    }
95
96
46
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
97
        let (mut action_state, origin_metadata) = self
98
            .action_state_result
99
            .changed()
100
            .await
101
            .err_tip(|| "In SimpleSchedulerActionStateResult")?;
102
        // We need to ensure the client is not aware of the downstream
103
        // operation id, so override it before it goes out.
104
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
105
        Ok((action_state, origin_metadata))
106
46
    }
107
108
0
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
109
        self.action_state_result
110
            .as_action_info()
111
            .await
112
            .err_tip(|| "In SimpleSchedulerActionStateResult")
113
0
    }
114
}
115
116
/// Engine used to manage the queued/running tasks and relationship with
117
/// the worker nodes. All state on how the workers and actions are interacting
118
/// should be held in this struct.
119
#[derive(MetricsComponent)]
120
pub struct SimpleScheduler {
121
    /// Manager for matching engine side of the state manager.
122
    #[metric(group = "matching_engine_state_manager")]
123
    matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,
124
125
    /// Manager for client state of this scheduler.
126
    #[metric(group = "client_state_manager")]
127
    client_state_manager: Arc<dyn ClientStateManager>,
128
129
    /// Manager for platform of this scheduler.
130
    #[metric(group = "platform_properties")]
131
    platform_property_manager: Arc<PlatformPropertyManager>,
132
133
    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
134
    /// order based on the allocation strategy.
135
    #[metric(group = "worker_scheduler")]
136
    worker_scheduler: Arc<ApiWorkerScheduler>,
137
138
    /// The sender to send origin events to the origin events.
139
    maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
140
141
    /// Background task that tries to match actions to workers. If this struct
142
    /// is dropped the spawn will be cancelled as well.
143
    task_worker_matching_spawn: JoinHandleDropGuard<()>,
144
145
    /// Every duration, do logging of worker matching
146
    /// e.g. "worker busy", "can't find any worker"
147
    /// Set to None to disable. This is quite noisy, so we limit it
148
    worker_match_logging_interval: Option<Duration>,
149
}
150
151
impl core::fmt::Debug for SimpleScheduler {
152
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
153
0
        f.debug_struct("SimpleScheduler")
154
0
            .field("platform_property_manager", &self.platform_property_manager)
155
0
            .field("worker_scheduler", &self.worker_scheduler)
156
0
            .field("maybe_origin_event_tx", &self.maybe_origin_event_tx)
157
0
            .field(
158
0
                "task_worker_matching_spawn",
159
0
                &self.task_worker_matching_spawn,
160
0
            )
161
0
            .finish_non_exhaustive()
162
0
    }
163
}
164
165
impl SimpleScheduler {
166
    /// Attempts to find a worker to execute an action and begins executing it.
167
    /// If an action is already running that is cacheable it may merge this
168
    /// action with the results and state changes of the already running
169
    /// action. If the task cannot be executed immediately it will be queued
170
    /// for execution based on priority and other metrics.
171
    /// All further updates to the action will be provided through the returned
172
    /// value.
173
30
    async fn inner_add_action(
174
30
        &self,
175
30
        client_operation_id: OperationId,
176
30
        action_info: Arc<ActionInfo>,
177
30
    ) -> Result<Box<dyn ActionStateResult>, Error> {
178
30
        let action_state_result = self
179
30
            .client_state_manager
180
30
            .add_action(client_operation_id.clone(), action_info)
181
30
            .await
182
30
            .err_tip(|| "In SimpleScheduler::add_action")
?0
;
183
30
        Ok(Box::new(SimpleSchedulerActionStateResult::new(
184
30
            client_operation_id.clone(),
185
30
            action_state_result,
186
30
        )))
187
30
    }
188
189
504
    async fn inner_filter_operations(
190
504
        &self,
191
504
        filter: OperationFilter,
192
504
    ) -> Result<ActionStateResultStream<'_>, Error> {
193
504
        self.client_state_manager
194
504
            .filter_operations(filter)
195
504
            .await
196
504
            .err_tip(|| "In SimpleScheduler::find_by_client_operation_id getting filter result")
197
504
    }
198
199
111
    async fn get_queued_operations(&self) -> Result<ActionStateResultStream<'_>, Error> {
200
111
        let filter = OperationFilter {
201
111
            stages: OperationStageFlags::Queued,
202
111
            order_by_priority_direction: Some(OrderDirection::Desc),
203
111
            ..Default::default()
204
111
        };
205
111
        self.matching_engine_state_manager
206
111
            .filter_operations(filter)
207
111
            .await
208
111
            .err_tip(|| "In SimpleScheduler::get_queued_operations getting filter result")
209
111
    }
210
211
9
    
pub async fn do_try_match_for_test(&self) -> Result<(), Error>0
{
212
9
        self.do_try_match(true).await
213
9
    }
214
215
    // TODO(palfrey) This is an O(n*m) (aka n^2) algorithm. In theory we
216
    // can create a map of capabilities of each worker and then try and match
217
    // the actions to the worker using the map lookup (ie. map reduce).
218
111
    async fn do_try_match(&self, full_worker_logging: bool) -> Result<(), Error> {
219
50
        async fn match_action_to_worker(
220
50
            action_state_result: &dyn ActionStateResult,
221
50
            workers: &ApiWorkerScheduler,
222
50
            matching_engine_state_manager: &dyn MatchingEngineStateManager,
223
50
            platform_property_manager: &PlatformPropertyManager,
224
50
            full_worker_logging: bool,
225
50
        ) -> Result<(), Error> {
226
50
            let (action_info, maybe_origin_metadata) =
227
50
                action_state_result
228
50
                    .as_action_info()
229
50
                    .await
230
50
                    .err_tip(|| "Failed to get action_info from as_action_info_result stream")
?0
;
231
232
            // TODO(palfrey) We should not compute this every time and instead store
233
            // it with the ActionInfo when we receive it.
234
50
            let platform_properties = platform_property_manager
235
50
                .make_platform_properties(action_info.platform_properties.clone())
236
50
                .err_tip(
237
                    || "Failed to make platform properties in SimpleScheduler::do_try_match",
238
0
                )?;
239
240
50
            let action_info = ActionInfoWithProps {
241
50
                inner: action_info,
242
50
                platform_properties,
243
50
            };
244
245
            // Try to find a worker for the action.
246
32
            let worker_id = {
247
50
                match workers
248
50
                    .find_worker_for_action(&action_info.platform_properties, full_worker_logging)
249
50
                    .await
250
                {
251
32
                    Some(worker_id) => worker_id,
252
                    // If we could not find a worker for the action,
253
                    // we have nothing to do.
254
18
                    None => return Ok(()),
255
                }
256
            };
257
258
32
            let attach_operation_fut = async move {
259
                // Extract the operation_id from the action_state.
260
32
                let operation_id = {
261
32
                    let (action_state, _origin_metadata) = action_state_result
262
32
                        .as_state()
263
32
                        .await
264
32
                        .err_tip(|| "Failed to get action_info from as_state_result stream")
?0
;
265
32
                    action_state.client_operation_id.clone()
266
                };
267
268
                // Tell the matching engine that the operation is being assigned to a worker.
269
32
                let assign_result = matching_engine_state_manager
270
32
                    .assign_operation(&operation_id, Ok(&worker_id))
271
32
                    .await
272
32
                    .err_tip(|| "Failed to assign operation in do_try_match");
273
32
                if let Err(
err1
) = assign_result {
  Branch (273:24): [True: 0, False: 0]
  Branch (273:24): [Folded - Ignored]
  Branch (273:24): [True: 0, False: 4]
  Branch (273:24): [True: 1, False: 27]
274
1
                    if err.code == Code::Aborted {
  Branch (274:24): [True: 0, False: 0]
  Branch (274:24): [Folded - Ignored]
  Branch (274:24): [True: 0, False: 0]
  Branch (274:24): [True: 0, False: 1]
275
                        // If the operation was aborted, it means that the operation was
276
                        // cancelled due to another operation being assigned to the worker.
277
0
                        return Ok(());
278
1
                    }
279
                    // Any other error is a real error.
280
1
                    return Err(err);
281
31
                }
282
283
31
                workers
284
31
                    .worker_notify_run_action(worker_id, operation_id, action_info)
285
31
                    .await
286
31
                    .err_tip(|| 
{1
287
1
                        "Failed to run worker_notify_run_action in SimpleScheduler::do_try_match"
288
1
                    })
289
32
            };
290
32
            tokio::pin!(attach_operation_fut);
291
292
32
            let origin_metadata = maybe_origin_metadata.unwrap_or_default();
293
294
32
            let ctx = Context::current_with_baggage(vec![KeyValue::new(
295
                ENDUSER_ID,
296
32
                origin_metadata.identity,
297
            )]);
298
299
32
            info_span!("do_try_match")
300
32
                .in_scope(|| attach_operation_fut)
301
32
                .with_context(ctx)
302
32
                .await
303
50
        }
304
305
111
        let mut result = Ok(());
306
307
111
        let start = Instant::now();
308
309
111
        let mut stream = self
310
111
            .get_queued_operations()
311
111
            .await
312
111
            .err_tip(|| "Failed to get queued operations in do_try_match")
?0
;
313
314
111
        let query_elapsed = start.elapsed();
315
111
        if query_elapsed > Duration::from_secs(1) {
  Branch (315:12): [True: 0, False: 0]
  Branch (315:12): [Folded - Ignored]
  Branch (315:12): [True: 0, False: 17]
  Branch (315:12): [True: 0, False: 94]
316
0
            warn!(
317
0
                elapsed_ms = query_elapsed.as_millis(),
318
0
                "Slow get_queued_operations query"
319
            );
320
111
        }
321
322
161
        while let Some(
action_state_result50
) = stream.next().await {
  Branch (322:19): [True: 0, False: 0]
  Branch (322:19): [Folded - Ignored]
  Branch (322:19): [True: 5, False: 17]
  Branch (322:19): [True: 45, False: 94]
323
50
            result = result.merge(
324
50
                match_action_to_worker(
325
50
                    action_state_result.as_ref(),
326
50
                    self.worker_scheduler.as_ref(),
327
50
                    self.matching_engine_state_manager.as_ref(),
328
50
                    self.platform_property_manager.as_ref(),
329
50
                    full_worker_logging,
330
50
                )
331
50
                .await,
332
            );
333
        }
334
335
111
        let total_elapsed = start.elapsed();
336
111
        if total_elapsed > Duration::from_secs(5) {
  Branch (336:12): [True: 0, False: 0]
  Branch (336:12): [Folded - Ignored]
  Branch (336:12): [True: 0, False: 17]
  Branch (336:12): [True: 0, False: 94]
337
0
            warn!(
338
0
                total_ms = total_elapsed.as_millis(),
339
0
                query_ms = query_elapsed.as_millis(),
340
0
                "Slow do_try_match cycle"
341
            );
342
111
        }
343
344
111
        result
345
111
    }
346
}
347
348
impl SimpleScheduler {
349
1
    pub fn new<A: AwaitedActionDb>(
350
1
        spec: &SimpleSpec,
351
1
        awaited_action_db: A,
352
1
        task_change_notify: Arc<Notify>,
353
1
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
354
1
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
355
1
        Self::new_with_callback(
356
1
            spec,
357
1
            awaited_action_db,
358
0
            || {
359
                // The cost of running `do_try_match()` is very high, but constant
360
                // in relation to the number of changes that have happened. This
361
                // means that grabbing this lock to process `do_try_match()` should
362
                // always yield to any other tasks that might want the lock. The
363
                // easiest and most fair way to do this is to sleep for a small
364
                // amount of time. Using something like tokio::task::yield_now()
365
                // does not yield as aggressively as we'd like if new futures are
366
                // scheduled within a future.
367
0
                tokio::time::sleep(Duration::from_millis(1))
368
0
            },
369
1
            task_change_notify,
370
            SystemTime::now,
371
1
            maybe_origin_event_tx,
372
        )
373
1
    }
374
375
24
    pub fn new_with_callback<
376
24
        Fut: Future<Output = ()> + Send,
377
24
        F: Fn() -> Fut + Send + Sync + 'static,
378
24
        A: AwaitedActionDb,
379
24
        I: InstantWrapper,
380
24
        NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
381
24
    >(
382
24
        spec: &SimpleSpec,
383
24
        awaited_action_db: A,
384
24
        on_matching_engine_run: F,
385
24
        task_change_notify: Arc<Notify>,
386
24
        now_fn: NowFn,
387
24
        maybe_origin_event_tx: Option<mpsc::Sender<OriginEvent>>,
388
24
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
389
24
        let platform_property_manager = Arc::new(PlatformPropertyManager::new(
390
24
            spec.supported_platform_properties
391
24
                .clone()
392
24
                .unwrap_or_default(),
393
        ));
394
395
24
        let mut worker_timeout_s = spec.worker_timeout_s;
396
24
        if worker_timeout_s == 0 {
  Branch (396:12): [True: 0, False: 0]
  Branch (396:12): [True: 0, False: 0]
  Branch (396:12): [Folded - Ignored]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 0, False: 1]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 0, False: 1]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 0, False: 1]
  Branch (396:12): [True: 0, False: 1]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 0, False: 1]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
  Branch (396:12): [True: 1, False: 0]
397
19
            worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
398
19
        
}5
399
400
24
        let mut client_action_timeout_s = spec.client_action_timeout_s;
401
24
        if client_action_timeout_s == 0 {
  Branch (401:12): [True: 0, False: 0]
  Branch (401:12): [True: 0, False: 0]
  Branch (401:12): [Folded - Ignored]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 0, False: 1]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
  Branch (401:12): [True: 1, False: 0]
402
23
            client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
403
23
        
}1
404
        // This matches the value of CLIENT_KEEPALIVE_DURATION which means that
405
        // tasks are going to be dropped all over the place, this isn't a good
406
        // setting.
407
24
        if client_action_timeout_s <= CLIENT_KEEPALIVE_DURATION.as_secs() {
  Branch (407:12): [True: 0, False: 0]
  Branch (407:12): [True: 0, False: 0]
  Branch (407:12): [Folded - Ignored]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
  Branch (407:12): [True: 0, False: 1]
408
0
            error!(
409
                client_action_timeout_s,
410
0
                "Setting client_action_timeout_s to less than the client keep alive interval is going to cause issues, please set above {}.",
411
0
                CLIENT_KEEPALIVE_DURATION.as_secs()
412
            );
413
24
        }
414
415
24
        let mut max_job_retries = spec.max_job_retries;
416
24
        if max_job_retries == 0 {
  Branch (416:12): [True: 0, False: 0]
  Branch (416:12): [True: 0, False: 0]
  Branch (416:12): [Folded - Ignored]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 0, False: 1]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
  Branch (416:12): [True: 1, False: 0]
417
23
            max_job_retries = DEFAULT_MAX_JOB_RETRIES;
418
23
        
}1
419
420
24
        let worker_change_notify = Arc::new(Notify::new());
421
422
        // Create shared worker registry for single heartbeat per worker.
423
24
        let worker_registry = Arc::new(WorkerRegistry::new());
424
425
24
        let state_manager = SimpleSchedulerStateManager::new(
426
24
            max_job_retries,
427
24
            Duration::from_secs(worker_timeout_s),
428
24
            Duration::from_secs(client_action_timeout_s),
429
24
            awaited_action_db,
430
24
            now_fn,
431
24
            Some(worker_registry.clone()),
432
        );
433
434
24
        let worker_scheduler = ApiWorkerScheduler::new(
435
24
            state_manager.clone(),
436
24
            platform_property_manager.clone(),
437
24
            spec.allocation_strategy,
438
24
            worker_change_notify.clone(),
439
24
            worker_timeout_s,
440
24
            worker_registry,
441
        );
442
443
24
        let worker_scheduler_clone = worker_scheduler.clone();
444
445
24
        let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
446
24
            let weak_inner = weak_self.clone();
447
24
            let task_worker_matching_spawn =
448
24
                spawn!("simple_scheduler_task_worker_matching", async move 
{22
449
22
                    let mut last_match_successful = true;
450
22
                    let mut worker_match_logging_last: Option<Instant> = None;
451
                    // Break out of the loop only when the inner is dropped.
452
                    loop {
453
124
                        let task_change_fut = task_change_notify.notified();
454
124
                        let worker_change_fut = worker_change_notify.notified();
455
124
                        tokio::pin!(task_change_fut);
456
124
                        tokio::pin!(worker_change_fut);
457
                        // Wait for either of these futures to be ready.
458
124
                        let state_changed = future::select(task_change_fut, worker_change_fut);
459
124
                        if last_match_successful {
  Branch (459:28): [True: 0, False: 0]
  Branch (459:28): [True: 0, False: 0]
  Branch (459:28): [Folded - Ignored]
  Branch (459:28): [True: 13, False: 0]
  Branch (459:28): [True: 4, False: 0]
  Branch (459:28): [True: 2, False: 0]
  Branch (459:28): [True: 2, False: 0]
  Branch (459:28): [True: 0, False: 0]
  Branch (459:28): [True: 2, False: 0]
  Branch (459:28): [True: 4, False: 0]
  Branch (459:28): [True: 4, False: 0]
  Branch (459:28): [True: 5, False: 0]
  Branch (459:28): [True: 6, False: 0]
  Branch (459:28): [True: 8, False: 0]
  Branch (459:28): [True: 3, False: 0]
  Branch (459:28): [True: 5, False: 0]
  Branch (459:28): [True: 8, False: 0]
  Branch (459:28): [True: 7, False: 0]
  Branch (459:28): [True: 7, False: 0]
  Branch (459:28): [True: 10, False: 0]
  Branch (459:28): [True: 4, False: 0]
  Branch (459:28): [True: 4, False: 0]
  Branch (459:28): [True: 5, False: 0]
  Branch (459:28): [True: 8, False: 0]
  Branch (459:28): [True: 4, False: 1]
  Branch (459:28): [True: 8, False: 0]
  Branch (459:28): [True: 0, False: 0]
460
123
                            let _ = state_changed.await;
461
                        } else {
462
                            // If the last match failed, then run again after a short sleep.
463
                            // This resolves issues where we tried to re-schedule a job to
464
                            // a disconnected worker.  The sleep ensures we don't enter a
465
                            // hard loop if there's something wrong inside do_try_match.
466
1
                            let sleep_fut = tokio::time::sleep(Duration::from_millis(100));
467
1
                            tokio::pin!(sleep_fut);
468
1
                            let _ = future::select(state_changed, sleep_fut).await;
469
                        }
470
471
102
                        let result = match weak_inner.upgrade() {
472
102
                            Some(scheduler) => {
473
102
                                let now = Instant::now();
474
102
                                let full_worker_logging = {
475
102
                                    match scheduler.worker_match_logging_interval {
476
99
                                        None => false,
477
3
                                        Some(duration) => match worker_match_logging_last {
478
1
                                            None => true,
479
2
                                            Some(when) => now.duration_since(when) >= duration,
480
                                        },
481
                                    }
482
                                };
483
484
102
                                let res = scheduler.do_try_match(full_worker_logging).await;
485
102
                                if full_worker_logging {
  Branch (485:36): [True: 0, False: 0]
  Branch (485:36): [True: 0, False: 0]
  Branch (485:36): [Folded - Ignored]
  Branch (485:36): [True: 0, False: 12]
  Branch (485:36): [True: 0, False: 3]
  Branch (485:36): [True: 0, False: 1]
  Branch (485:36): [True: 0, False: 1]
  Branch (485:36): [True: 0, False: 0]
  Branch (485:36): [True: 0, False: 1]
  Branch (485:36): [True: 0, False: 3]
  Branch (485:36): [True: 1, False: 2]
  Branch (485:36): [True: 0, False: 4]
  Branch (485:36): [True: 0, False: 5]
  Branch (485:36): [True: 0, False: 7]
  Branch (485:36): [True: 0, False: 2]
  Branch (485:36): [True: 0, False: 4]
  Branch (485:36): [True: 0, False: 7]
  Branch (485:36): [True: 0, False: 6]
  Branch (485:36): [True: 0, False: 6]
  Branch (485:36): [True: 0, False: 9]
  Branch (485:36): [True: 0, False: 3]
  Branch (485:36): [True: 0, False: 3]
  Branch (485:36): [True: 0, False: 4]
  Branch (485:36): [True: 0, False: 7]
  Branch (485:36): [True: 0, False: 4]
  Branch (485:36): [True: 0, False: 7]
  Branch (485:36): [True: 0, False: 0]
486
1
                                    let operations_stream = scheduler
487
1
                                        .matching_engine_state_manager
488
1
                                        .filter_operations(OperationFilter::default())
489
1
                                        .await
490
1
                                        .err_tip(|| "In action_scheduler getting filter result");
491
492
1
                                    let mut oldest_actions_in_state: HashMap<
493
1
                                        String,
494
1
                                        BTreeSet<Arc<ActionState>>,
495
1
                                    > = HashMap::new();
496
1
                                    let max_items = 5;
497
498
1
                                    match operations_stream {
499
1
                                        Ok(stream) => {
500
1
                                            let actions = stream
501
1
                                                .filter_map(|item| async move 
{0
502
0
                                                    match item.as_ref().as_state().await {
503
0
                                                        Ok((action_state, _origin_metadata)) => {
504
0
                                                            Some(action_state)
505
                                                        }
506
0
                                                        Err(e) => {
507
0
                                                            error!(
508
                                                                ?e,
509
0
                                                                "Failed to get action state!"
510
                                                            );
511
0
                                                            None
512
                                                        }
513
                                                    }
514
0
                                                })
515
1
                                                .collect::<Vec<_>>()
516
1
                                                .await;
517
1
                                            for 
action_state0
in &actions {
518
0
                                                let name = action_state.stage.name();
519
0
                                                if let Some(values) =
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [Folded - Ignored]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
  Branch (519:56): [True: 0, False: 0]
520
0
                                                    oldest_actions_in_state.get_mut(&name)
521
                                                {
522
0
                                                    values.insert(action_state.clone());
523
0
                                                    if values.len() > max_items {
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [Folded - Ignored]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
  Branch (523:56): [True: 0, False: 0]
524
0
                                                        values.pop_first();
525
0
                                                    }
526
0
                                                } else {
527
0
                                                    let mut values = BTreeSet::new();
528
0
                                                    values.insert(action_state.clone());
529
0
                                                    oldest_actions_in_state.insert(name, values);
530
0
                                                }
531
                                            }
532
                                        }
533
0
                                        Err(e) => {
534
0
                                            error!(?e, "Failed to get operations list!");
535
                                        }
536
                                    }
537
538
1
                                    for 
value0
in oldest_actions_in_state.values() {
539
0
                                        let mut items = vec![];
540
0
                                        for item in value {
541
0
                                            items.push(item.to_string());
542
0
                                        }
543
0
                                        info!(?items, "Oldest actions in state");
544
                                    }
545
546
1
                                    worker_match_logging_last.replace(now);
547
101
                                }
548
102
                                res
549
                            }
550
                            // If the inner went away it means the scheduler is shutting
551
                            // down, so we need to resolve our future.
552
0
                            None => return,
553
                        };
554
102
                        last_match_successful = result.is_ok();
555
102
                        if let Err(
err1
) = result {
  Branch (555:32): [True: 0, False: 0]
  Branch (555:32): [True: 0, False: 0]
  Branch (555:32): [Folded - Ignored]
  Branch (555:32): [True: 0, False: 12]
  Branch (555:32): [True: 0, False: 3]
  Branch (555:32): [True: 0, False: 1]
  Branch (555:32): [True: 0, False: 1]
  Branch (555:32): [True: 0, False: 0]
  Branch (555:32): [True: 0, False: 1]
  Branch (555:32): [True: 0, False: 3]
  Branch (555:32): [True: 0, False: 3]
  Branch (555:32): [True: 0, False: 4]
  Branch (555:32): [True: 0, False: 5]
  Branch (555:32): [True: 0, False: 7]
  Branch (555:32): [True: 0, False: 2]
  Branch (555:32): [True: 0, False: 4]
  Branch (555:32): [True: 0, False: 7]
  Branch (555:32): [True: 0, False: 6]
  Branch (555:32): [True: 0, False: 6]
  Branch (555:32): [True: 0, False: 9]
  Branch (555:32): [True: 0, False: 3]
  Branch (555:32): [True: 0, False: 3]
  Branch (555:32): [True: 0, False: 4]
  Branch (555:32): [True: 0, False: 7]
  Branch (555:32): [True: 1, False: 3]
  Branch (555:32): [True: 0, False: 7]
  Branch (555:32): [True: 0, False: 0]
556
1
                            error!(?err, "Error while running do_try_match");
557
101
                        }
558
559
102
                        on_matching_engine_run().await;
560
                    }
561
                    // Unreachable.
562
0
                });
563
564
24
            let worker_match_logging_interval = match spec.worker_match_logging_interval_s {
565
                // -1 or 0 means disabled (0 used to cause expensive logging on every call)
566
22
                -1 | 0 => None,
567
2
                signed_secs => {
568
2
                    if let Ok(
secs1
) = TryInto::<u64>::try_into(signed_secs) {
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [Folded - Ignored]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 1, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 0]
  Branch (568:28): [True: 0, False: 1]
569
1
                        Some(Duration::from_secs(secs))
570
                    } else {
571
1
                        error!(
572
                            worker_match_logging_interval_s = spec.worker_match_logging_interval_s,
573
1
                            "Valid values for worker_match_logging_interval_s are -1, 0, or a positive integer, setting to disabled",
574
                        );
575
1
                        None
576
                    }
577
                }
578
            };
579
24
            Self {
580
24
                matching_engine_state_manager: state_manager.clone(),
581
24
                client_state_manager: state_manager.clone(),
582
24
                worker_scheduler,
583
24
                platform_property_manager,
584
24
                maybe_origin_event_tx,
585
24
                task_worker_matching_spawn,
586
24
                worker_match_logging_interval,
587
24
            }
588
24
        });
589
24
        (action_scheduler, worker_scheduler_clone)
590
24
    }
591
}
592
593
// `ClientStateManager` implementation for `SimpleScheduler`.
// The two async methods below forward to inherent `inner_*` helpers on `self`
// (defined elsewhere in this file) and return their results unchanged.
#[async_trait]
594
impl ClientStateManager for SimpleScheduler {
595
    /// Adds an action under `client_operation_id`.
    ///
    /// Forwards both arguments to `self.inner_add_action` and returns whatever
    /// it yields: a boxed `ActionStateResult` on success, or its `Error`.
    async fn add_action(
596
        &self,
597
        client_operation_id: OperationId,
598
        action_info: Arc<ActionInfo>,
599
30
    ) -> Result<Box<dyn ActionStateResult>, Error> {
600
        self.inner_add_action(client_operation_id, action_info)
601
            .await
602
30
    }
603
604
    /// Returns a stream of operations selected by `filter`.
    ///
    /// Forwards to `self.inner_filter_operations`; the returned stream is tied
    /// to the `'a` borrow of `self`.
    async fn filter_operations<'a>(
605
        &'a self,
606
        filter: OperationFilter,
607
504
    ) -> Result<ActionStateResultStream<'a>, Error> {
608
        self.inner_filter_operations(filter).await
609
504
    }
610
611
0
    /// Always returns `Some(self)`: this type also implements
    /// `KnownPlatformPropertyProvider`, so it hands itself out as the provider.
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
612
0
        Some(self)
613
0
    }
614
}
615
616
// `KnownPlatformPropertyProvider` implementation for `SimpleScheduler`.
#[async_trait]
617
impl KnownPlatformPropertyProvider for SimpleScheduler {
618
0
    /// Returns the names of the known platform properties.
    ///
    /// The names are the cloned keys of the collection returned by
    /// `get_known_properties()` on the worker scheduler's platform property
    /// manager. `_instance_name` is not read, so the result is the same for
    /// every instance name. This method never produces an `Err` itself.
    async fn get_known_properties(&self, _instance_name: &str) -> Result<Vec<String>, Error> {
619
        Ok(self
620
            .worker_scheduler
621
            .get_platform_property_manager()
622
            .get_known_properties()
623
            .keys()
624
            .cloned()
625
            .collect())
626
0
    }
627
}
628
629
// `WorkerScheduler` implementation for `SimpleScheduler`.
// Every method forwards its arguments unchanged to the same-named method on
// the wrapped `self.worker_scheduler` and returns that call's result.
#[async_trait]
630
impl WorkerScheduler for SimpleScheduler {
631
0
    /// Returns the platform property manager owned by the wrapped worker scheduler.
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
632
0
        self.worker_scheduler.get_platform_property_manager()
633
0
    }
634
635
29
    /// Forwards `worker` to the wrapped worker scheduler's `add_worker`.
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
636
        self.worker_scheduler.add_worker(worker).await
637
29
    }
638
639
    /// Forwards an `update` for `operation_id`, attributed to `worker_id`, to
    /// the wrapped worker scheduler's `update_action`.
    async fn update_action(
640
        &self,
641
        worker_id: &WorkerId,
642
        operation_id: &OperationId,
643
        update: UpdateOperationType,
644
9
    ) -> Result<(), Error> {
645
        self.worker_scheduler
646
            .update_action(worker_id, operation_id, update)
647
            .await
648
9
    }
649
650
    /// Forwards a keep-alive for `worker_id` with the given `timestamp` to the
    /// wrapped worker scheduler.
    async fn worker_keep_alive_received(
651
        &self,
652
        worker_id: &WorkerId,
653
        timestamp: WorkerTimestamp,
654
1
    ) -> Result<(), Error> {
655
        self.worker_scheduler
656
            .worker_keep_alive_received(worker_id, timestamp)
657
            .await
658
1
    }
659
660
5
    /// Forwards removal of `worker_id` to the wrapped worker scheduler.
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
661
        self.worker_scheduler.remove_worker(worker_id).await
662
5
    }
663
664
0
    /// Forwards `shutdown_guard` to the wrapped worker scheduler's `shutdown`
    /// and waits for that call to finish; nothing is returned.
    async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
665
        self.worker_scheduler.shutdown(shutdown_guard).await;
666
0
    }
667
668
1
    /// Forwards `now_timestamp` to the wrapped worker scheduler's
    /// `remove_timedout_workers`.
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
669
        self.worker_scheduler
670
            .remove_timedout_workers(now_timestamp)
671
            .await
672
1
    }
673
674
2
    /// Forwards the `is_draining` flag for `worker_id` to the wrapped worker
    /// scheduler's `set_drain_worker`.
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
675
        self.worker_scheduler
676
            .set_drain_worker(worker_id, is_draining)
677
            .await
678
2
    }
679
}
680
681
// Empty impl: no items are overridden here, so `SimpleScheduler` uses whatever
// `RootMetricsComponent` provides by default.
// NOTE(review): presumably a marker trait for top-level metrics roots — confirm
// against `nativelink_metric`.
impl RootMetricsComponent for SimpleScheduler {}