Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-scheduler/src/simple_scheduler.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::SystemTime;
17
18
use async_trait::async_trait;
19
use futures::Future;
20
use nativelink_error::{Code, Error, ResultExt};
21
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
22
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId};
23
use nativelink_util::instant_wrapper::InstantWrapper;
24
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
25
use nativelink_util::operation_state_manager::{
26
    ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
27
    OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
28
};
29
use nativelink_util::spawn;
30
use nativelink_util::task::JoinHandleDropGuard;
31
use tokio::sync::Notify;
32
use tokio::time::Duration;
33
use tokio_stream::StreamExt;
34
use tracing::{event, Level};
35
36
use crate::api_worker_scheduler::ApiWorkerScheduler;
37
use crate::awaited_action_db::AwaitedActionDb;
38
use crate::platform_property_manager::PlatformPropertyManager;
39
use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
40
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
41
use crate::worker_scheduler::WorkerScheduler;
42
43
/// Worker timeout, in seconds, used when the config leaves
/// `worker_timeout_s` at 0.
/// Keep the config documentation in sync if this value changes.
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;

/// Number of retries an action gets before it is failed, used when the
/// config leaves `max_job_retries` at 0.
/// Keep the config documentation in sync if this value changes.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;
50
51
struct SimpleSchedulerActionStateResult {
52
    client_operation_id: OperationId,
53
    action_state_result: Box<dyn ActionStateResult>,
54
}
55
56
impl SimpleSchedulerActionStateResult {
57
25
    fn new(
58
25
        client_operation_id: OperationId,
59
25
        action_state_result: Box<dyn ActionStateResult>,
60
25
    ) -> Self {
61
25
        Self {
62
25
            client_operation_id,
63
25
            action_state_result,
64
25
        }
65
25
    }
66
}
67
68
#[async_trait]
69
impl ActionStateResult for SimpleSchedulerActionStateResult {
70
3
    async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
71
3
        let mut action_state = self
72
3
            .action_state_result
73
3
            .as_state()
74
0
            .await
75
3
            .err_tip(|| 
"In SimpleSchedulerActionStateResult"0
)
?0
;
76
        // We need to ensure the client is not aware of the downstream
77
        // operation id, so override it before it goes out.
78
3
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
79
3
        Ok(action_state)
80
6
    }
81
82
40
    async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
83
40
        let 
mut action_state39
= self
84
40
            .action_state_result
85
40
            .changed()
86
992
            .await
87
39
            .err_tip(|| 
"In SimpleSchedulerActionStateResult"0
)
?0
;
88
        // We need to ensure the client is not aware of the downstream
89
        // operation id, so override it before it goes out.
90
39
        Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
91
39
        Ok(action_state)
92
79
    }
93
94
0
    async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
95
0
        self.action_state_result
96
0
            .as_action_info()
97
0
            .await
98
0
            .err_tip(|| "In SimpleSchedulerActionStateResult")
99
0
    }
100
}
101
102
/// Engine used to manage the queued/running tasks and relationship with
103
/// the worker nodes. All state on how the workers and actions are interacting
104
/// should be held in this struct.
105
0
#[derive(MetricsComponent)]
106
pub struct SimpleScheduler {
107
    /// Manager for matching engine side of the state manager.
108
    #[metric(group = "matching_engine_state_manager")]
109
    matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,
110
111
    /// Manager for client state of this scheduler.
112
    #[metric(group = "client_state_manager")]
113
    client_state_manager: Arc<dyn ClientStateManager>,
114
115
    /// Manager for platform of this scheduler.
116
    #[metric(group = "platform_properties")]
117
    platform_property_manager: Arc<PlatformPropertyManager>,
118
119
    /// A `Workers` pool that contains all workers that are available to execute actions in a priority
120
    /// order based on the allocation strategy.
121
    #[metric(group = "worker_scheduler")]
122
    worker_scheduler: Arc<ApiWorkerScheduler>,
123
124
    /// Background task that tries to match actions to workers. If this struct
125
    /// is dropped the spawn will be cancelled as well.
126
    _task_worker_matching_spawn: JoinHandleDropGuard<()>,
127
}
128
129
impl SimpleScheduler {
130
    /// Attempts to find a worker to execute an action and begins executing it.
131
    /// If an action is already running that is cacheable it may merge this
132
    /// action with the results and state changes of the already running
133
    /// action. If the task cannot be executed immediately it will be queued
134
    /// for execution based on priority and other metrics.
135
    /// All further updates to the action will be provided through the returned
136
    /// value.
137
25
    async fn inner_add_action(
138
25
        &self,
139
25
        client_operation_id: OperationId,
140
25
        action_info: Arc<ActionInfo>,
141
25
    ) -> Result<Box<dyn ActionStateResult>, Error> {
142
25
        let action_state_result = self
143
25
            .client_state_manager
144
25
            .add_action(client_operation_id.clone(), action_info)
145
0
            .await
146
25
            .err_tip(|| 
"In SimpleScheduler::add_action"0
)
?0
;
147
25
        Ok(Box::new(SimpleSchedulerActionStateResult::new(
148
25
            client_operation_id.clone(),
149
25
            action_state_result,
150
25
        )))
151
25
    }
152
153
503
    async fn inner_filter_operations(
154
503
        &self,
155
503
        filter: OperationFilter,
156
503
    ) -> Result<ActionStateResultStream, Error> {
157
503
        self.client_state_manager
158
503
            .filter_operations(filter)
159
0
            .await
160
503
            .err_tip(|| 
"In SimpleScheduler::find_by_client_operation_id getting filter result"0
)
161
503
    }
162
163
91
    async fn get_queued_operations(&self) -> Result<ActionStateResultStream, Error> {
164
91
        let filter = OperationFilter {
165
91
            stages: OperationStageFlags::Queued,
166
91
            order_by_priority_direction: Some(OrderDirection::Desc),
167
91
            ..Default::default()
168
91
        };
169
91
        self.matching_engine_state_manager
170
91
            .filter_operations(filter)
171
0
            .await
172
91
            .err_tip(|| 
"In SimpleScheduler::get_queued_operations getting filter result"0
)
173
91
    }
174
175
3
    pub async fn do_try_match_for_test(&self) -> Result<(), Error> {
176
3
        self.do_try_match().
await0
177
3
    }
178
179
    // TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we
180
    // can create a map of capabilities of each worker and then try and match
181
    // the actions to the worker using the map lookup (ie. map reduce).
182
91
    async fn do_try_match(&self) -> Result<(), Error> {
183
42
        async fn match_action_to_worker(
184
42
            action_state_result: &dyn ActionStateResult,
185
42
            workers: &ApiWorkerScheduler,
186
42
            matching_engine_state_manager: &dyn MatchingEngineStateManager,
187
42
            platform_property_manager: &PlatformPropertyManager,
188
42
        ) -> Result<(), Error> {
189
42
            let action_info = action_state_result
190
42
                .as_action_info()
191
0
                .await
192
42
                .err_tip(|| 
"Failed to get action_info from as_action_info_result stream"0
)
?0
;
193
194
            // TODO(allada) We should not compute this every time and instead store
195
            // it with the ActionInfo when we receive it.
196
42
            let platform_properties = platform_property_manager
197
42
                .make_platform_properties(action_info.platform_properties.clone())
198
42
                .err_tip(|| {
199
0
                    "Failed to make platform properties in SimpleScheduler::do_try_match"
200
42
                })
?0
;
201
202
42
            let action_info = ActionInfoWithProps {
203
42
                inner: action_info,
204
42
                platform_properties,
205
42
            };
206
207
            // Try to find a worker for the action.
208
28
            let worker_id = {
209
42
                match workers
210
42
                    .find_worker_for_action(&action_info.platform_properties)
211
0
                    .await
212
                {
213
28
                    Some(worker_id) => worker_id,
214
                    // If we could not find a worker for the action,
215
                    // we have nothing to do.
216
14
                    None => return Ok(()),
217
                }
218
            };
219
220
            // Extract the operation_id from the action_state.
221
28
            let operation_id = {
222
28
                let action_state = action_state_result
223
28
                    .as_state()
224
0
                    .await
225
28
                    .err_tip(|| 
"Failed to get action_info from as_state_result stream"0
)
?0
;
226
28
                action_state.client_operation_id.clone()
227
            };
228
229
            // Tell the matching engine that the operation is being assigned to a worker.
230
28
            let assign_result = matching_engine_state_manager
231
28
                .assign_operation(&operation_id, Ok(&worker_id))
232
0
                .await
233
28
                .err_tip(|| 
"Failed to assign operation in do_try_match"1
);
234
28
            if let Err(
err1
) = assign_result {
  Branch (234:20): [True: 0, False: 0]
  Branch (234:20): [Folded - Ignored]
  Branch (234:20): [True: 1, False: 27]
235
1
                if err.code == Code::Aborted {
  Branch (235:20): [True: 0, False: 0]
  Branch (235:20): [Folded - Ignored]
  Branch (235:20): [True: 0, False: 1]
236
                    // If the operation was aborted, it means that the operation was
237
                    // cancelled due to another operation being assigned to the worker.
238
0
                    return Ok(());
239
1
                }
240
1
                // Any other error is a real error.
241
1
                return Err(err);
242
27
            }
243
27
244
27
            // Notify the worker to run the action.
245
27
            {
246
27
                workers
247
27
                    .worker_notify_run_action(worker_id, operation_id, action_info)
248
0
                    .await
249
27
                    .err_tip(|| {
250
1
                        "Failed to run worker_notify_run_action in SimpleScheduler::do_try_match"
251
27
                    })
252
            }
253
42
        }
254
255
91
        let mut result = Ok(());
256
257
91
        let mut stream = self
258
91
            .get_queued_operations()
259
0
            .await
260
91
            .err_tip(|| 
"Failed to get queued operations in do_try_match"0
)
?0
;
261
262
133
        while let Some(
action_state_result42
) = stream.next().
await0
{
  Branch (262:19): [True: 0, False: 0]
  Branch (262:19): [Folded - Ignored]
  Branch (262:19): [True: 42, False: 91]
263
42
            result = result.merge(
264
42
                match_action_to_worker(
265
42
                    action_state_result.as_ref(),
266
42
                    self.worker_scheduler.as_ref(),
267
42
                    self.matching_engine_state_manager.as_ref(),
268
42
                    self.platform_property_manager.as_ref(),
269
42
                )
270
0
                .await,
271
            );
272
        }
273
91
        result
274
91
    }
275
}
276
277
impl SimpleScheduler {
278
0
    pub fn new<A: AwaitedActionDb>(
279
0
        scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler,
280
0
        awaited_action_db: A,
281
0
        task_change_notify: Arc<Notify>,
282
0
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
283
0
        Self::new_with_callback(
284
0
            scheduler_cfg,
285
0
            awaited_action_db,
286
0
            || {
287
0
                // The cost of running `do_try_match()` is very high, but constant
288
0
                // in relation to the number of changes that have happened. This
289
0
                // means that grabbing this lock to process `do_try_match()` should
290
0
                // always yield to any other tasks that might want the lock. The
291
0
                // easiest and most fair way to do this is to sleep for a small
292
0
                // amount of time. Using something like tokio::task::yield_now()
293
0
                // does not yield as aggresively as we'd like if new futures are
294
0
                // scheduled within a future.
295
0
                tokio::time::sleep(Duration::from_millis(1))
296
0
            },
297
0
            task_change_notify,
298
0
            SystemTime::now,
299
0
        )
300
0
    }
301
302
21
    pub fn new_with_callback<
303
21
        Fut: Future<Output = ()> + Send,
304
21
        F: Fn() -> Fut + Send + Sync + 'static,
305
21
        A: AwaitedActionDb,
306
21
        I: InstantWrapper,
307
21
        NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
308
21
    >(
309
21
        scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler,
310
21
        awaited_action_db: A,
311
21
        on_matching_engine_run: F,
312
21
        task_change_notify: Arc<Notify>,
313
21
        now_fn: NowFn,
314
21
    ) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
315
21
        let platform_property_manager = Arc::new(PlatformPropertyManager::new(
316
21
            scheduler_cfg
317
21
                .supported_platform_properties
318
21
                .clone()
319
21
                .unwrap_or_default(),
320
21
        ));
321
21
322
21
        let mut worker_timeout_s = scheduler_cfg.worker_timeout_s;
323
21
        if worker_timeout_s == 0 {
  Branch (323:12): [True: 0, False: 0]
  Branch (323:12): [True: 0, False: 0]
  Branch (323:12): [Folded - Ignored]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 0, False: 1]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 0, False: 1]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 0, False: 1]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 0, False: 1]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
  Branch (323:12): [True: 1, False: 0]
324
17
            worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
325
17
        }
4
326
327
21
        let mut max_job_retries = scheduler_cfg.max_job_retries;
328
21
        if max_job_retries == 0 {
  Branch (328:12): [True: 0, False: 0]
  Branch (328:12): [True: 0, False: 0]
  Branch (328:12): [Folded - Ignored]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 0, False: 1]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
  Branch (328:12): [True: 1, False: 0]
329
20
            max_job_retries = DEFAULT_MAX_JOB_RETRIES;
330
20
        }
1
331
332
21
        let worker_change_notify = Arc::new(Notify::new());
333
21
        let state_manager = SimpleSchedulerStateManager::new(
334
21
            max_job_retries,
335
21
            Duration::from_secs(worker_timeout_s),
336
21
            awaited_action_db,
337
21
            now_fn,
338
21
        );
339
21
340
21
        let worker_scheduler = ApiWorkerScheduler::new(
341
21
            state_manager.clone(),
342
21
            platform_property_manager.clone(),
343
21
            scheduler_cfg.allocation_strategy,
344
21
            worker_change_notify.clone(),
345
21
            worker_timeout_s,
346
21
        );
347
21
348
21
        let worker_scheduler_clone = worker_scheduler.clone();
349
21
350
21
        let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
351
21
            let weak_inner = weak_self.clone();
352
21
            let task_worker_matching_spawn =
353
21
                spawn!("simple_scheduler_task_worker_matching", async move 
{20
354
                    // Break out of the loop only when the inner is dropped.
355
                    loop {
356
108
                        let task_change_fut = task_change_notify.notified();
357
108
                        let worker_change_fut = worker_change_notify.notified();
358
108
                        tokio::pin!(task_change_fut);
359
108
                        tokio::pin!(worker_change_fut);
360
108
                        // Wait for either of these futures to be ready.
361
108
                        let _ = futures::future::select(task_change_fut, worker_change_fut).
await42
;
362
88
                        let result = match weak_inner.upgrade() {
363
88
                            Some(scheduler) => scheduler.do_try_match().
await0
,
364
                            // If the inner went away it means the scheduler is shutting
365
                            // down, so we need to resolve our future.
366
0
                            None => return,
367
                        };
368
88
                        if let Err(
err1
) = result {
  Branch (368:32): [True: 0, False: 0]
  Branch (368:32): [True: 0, False: 0]
  Branch (368:32): [Folded - Ignored]
  Branch (368:32): [True: 0, False: 3]
  Branch (368:32): [True: 0, False: 1]
  Branch (368:32): [True: 0, False: 1]
  Branch (368:32): [True: 0, False: 0]
  Branch (368:32): [True: 0, False: 1]
  Branch (368:32): [True: 0, False: 3]
  Branch (368:32): [True: 0, False: 3]
  Branch (368:32): [True: 0, False: 4]
  Branch (368:32): [True: 0, False: 5]
  Branch (368:32): [True: 0, False: 7]
  Branch (368:32): [True: 0, False: 4]
  Branch (368:32): [True: 0, False: 7]
  Branch (368:32): [True: 0, False: 6]
  Branch (368:32): [True: 0, False: 6]
  Branch (368:32): [True: 0, False: 9]
  Branch (368:32): [True: 0, False: 3]
  Branch (368:32): [True: 0, False: 3]
  Branch (368:32): [True: 0, False: 4]
  Branch (368:32): [True: 0, False: 7]
  Branch (368:32): [True: 1, False: 3]
  Branch (368:32): [True: 0, False: 7]
369
1
                            event!(Level::ERROR, ?err, "Error while running do_try_match");
370
87
                        }
371
372
88
                        on_matching_engine_run().
await0
;
373
                    }
374
                    // Unreachable.
375
21
                
}0
);
376
21
            SimpleScheduler {
377
21
                matching_engine_state_manager: state_manager.clone(),
378
21
                client_state_manager: state_manager.clone(),
379
21
                worker_scheduler,
380
21
                platform_property_manager,
381
21
                _task_worker_matching_spawn: task_worker_matching_spawn,
382
21
            }
383
21
        });
384
21
        (action_scheduler, worker_scheduler_clone)
385
21
    }
386
}
387
388
#[async_trait]
389
impl ClientStateManager for SimpleScheduler {
390
    async fn add_action(
391
        &self,
392
        client_operation_id: OperationId,
393
        action_info: Arc<ActionInfo>,
394
25
    ) -> Result<Box<dyn ActionStateResult>, Error> {
395
25
        self.inner_add_action(client_operation_id, action_info)
396
0
            .await
397
50
    }
398
399
    async fn filter_operations<'a>(
400
        &'a self,
401
        filter: OperationFilter,
402
503
    ) -> Result<ActionStateResultStream<'a>, Error> {
403
503
        self.inner_filter_operations(filter).
await0
404
1.00k
    }
405
406
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
407
0
        Some(self)
408
0
    }
409
}
410
411
#[async_trait]
412
impl KnownPlatformPropertyProvider for SimpleScheduler {
413
0
    async fn get_known_properties(&self, _instance_name: &str) -> Result<Vec<String>, Error> {
414
0
        Ok(self
415
0
            .worker_scheduler
416
0
            .get_platform_property_manager()
417
0
            .get_known_properties()
418
0
            .keys()
419
0
            .cloned()
420
0
            .collect())
421
0
    }
422
}
423
424
#[async_trait]
425
impl WorkerScheduler for SimpleScheduler {
426
0
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
427
0
        self.worker_scheduler.get_platform_property_manager()
428
0
    }
429
430
25
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
431
25
        self.worker_scheduler.add_worker(worker).
await0
432
50
    }
433
434
    async fn update_action(
435
        &self,
436
        worker_id: &WorkerId,
437
        operation_id: &OperationId,
438
        update: UpdateOperationType,
439
9
    ) -> Result<(), Error> {
440
9
        self.worker_scheduler
441
9
            .update_action(worker_id, operation_id, update)
442
0
            .await
443
18
    }
444
445
    async fn worker_keep_alive_received(
446
        &self,
447
        worker_id: &WorkerId,
448
        timestamp: WorkerTimestamp,
449
1
    ) -> Result<(), Error> {
450
1
        self.worker_scheduler
451
1
            .worker_keep_alive_received(worker_id, timestamp)
452
0
            .await
453
2
    }
454
455
1
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
456
1
        self.worker_scheduler.remove_worker(worker_id).
await0
457
2
    }
458
459
1
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
460
1
        self.worker_scheduler
461
1
            .remove_timedout_workers(now_timestamp)
462
0
            .await
463
2
    }
464
465
2
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
466
2
        self.worker_scheduler
467
2
            .set_drain_worker(worker_id, is_draining)
468
0
            .await
469
4
    }
470
}
471
472
// Marker impl: lets `SimpleScheduler` act as a top-level (root) metrics
// component. No trait items are overridden here.
impl RootMetricsComponent for SimpleScheduler {}