Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/api_worker_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::ops::{Deref, DerefMut};
16
use std::sync::Arc;
17
18
use async_lock::Mutex;
19
use lru::LruCache;
20
use nativelink_config::schedulers::WorkerAllocationStrategy;
21
use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
22
use nativelink_metric::{
23
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
24
    RootMetricsComponent, group,
25
};
26
use nativelink_util::action_messages::{OperationId, WorkerId};
27
use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
28
use nativelink_util::platform_properties::PlatformProperties;
29
use nativelink_util::spawn;
30
use nativelink_util::task::JoinHandleDropGuard;
31
use tokio::sync::Notify;
32
use tokio::sync::mpsc::{self, UnboundedSender};
33
use tonic::async_trait;
34
use tracing::{Level, event};
35
36
use crate::platform_property_manager::PlatformPropertyManager;
37
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate};
38
use crate::worker_scheduler::WorkerScheduler;
39
40
#[derive(Debug)]
41
struct Workers(LruCache<WorkerId, Worker>);
42
43
impl Deref for Workers {
44
    type Target = LruCache<WorkerId, Worker>;
45
46
57
    fn deref(&self) -> &Self::Target {
47
57
        &self.0
48
57
    }
49
}
50
51
impl DerefMut for Workers {
52
109
    fn deref_mut(&mut self) -> &mut Self::Target {
53
109
        &mut self.0
54
109
    }
55
}
56
57
// Note: This could not be a derive macro because this derive-macro
58
// does n ot support LruCache and nameless field structs.
59
impl MetricsComponent for Workers {
60
0
    fn publish(
61
0
        &self,
62
0
        _kind: MetricKind,
63
0
        _field_metadata: MetricFieldData,
64
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
65
0
        let _enter = group!("workers").entered();
66
0
        for (worker_id, worker) in self.iter() {
67
0
            let _enter = group!(worker_id).entered();
68
0
            worker.publish(MetricKind::Component, MetricFieldData::default())?;
69
        }
70
0
        Ok(MetricPublishKnownKindData::Component)
71
0
    }
72
}
73
74
/// A collection of workers that are available to run tasks.
75
#[derive(MetricsComponent)]
76
struct ApiWorkerSchedulerImpl {
77
    /// A `LruCache` of workers availabled based on `allocation_strategy`.
78
    #[metric(group = "workers")]
79
    workers: Workers,
80
81
    /// The worker state manager.
82
    #[metric(group = "worker_state_manager")]
83
    worker_state_manager: Arc<dyn WorkerStateManager>,
84
    /// The allocation strategy for workers.
85
    allocation_strategy: WorkerAllocationStrategy,
86
    /// A channel to notify the matching engine that the worker pool has changed.
87
    worker_change_notify: Arc<Notify>,
88
    /// A channel to notify that an operation is still alive.
89
    operation_keep_alive_tx: UnboundedSender<(OperationId, WorkerId)>,
90
}
91
92
impl std::fmt::Debug for ApiWorkerSchedulerImpl {
93
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94
0
        f.debug_struct("ApiWorkerSchedulerImpl")
95
0
            .field("workers", &self.workers)
96
0
            .field("allocation_strategy", &self.allocation_strategy)
97
0
            .field("worker_change_notify", &self.worker_change_notify)
98
0
            .field("operation_keep_alive_tx", &self.operation_keep_alive_tx)
99
0
            .finish_non_exhaustive()
100
0
    }
101
}
102
103
impl ApiWorkerSchedulerImpl {
104
    /// Refreshes the lifetime of the worker with the given timestamp.
105
2
    fn refresh_lifetime(
106
2
        &mut self,
107
2
        worker_id: &WorkerId,
108
2
        timestamp: WorkerTimestamp,
109
2
    ) -> Result<(), Error> {
110
2
        let worker = self.workers.0.peek_mut(worker_id).ok_or_else(|| {
111
0
            make_input_err!(
112
0
                "Worker not found in worker map in refresh_lifetime() {}",
113
0
                worker_id
114
0
            )
115
0
        })?;
116
0
        error_if!(
117
2
            worker.last_update_timestamp > timestamp,
  Branch (117:13): [True: 0, False: 2]
  Branch (117:13): [Folded - Ignored]
118
            "Worker already had a timestamp of {}, but tried to update it with {}",
119
            worker.last_update_timestamp,
120
            timestamp
121
        );
122
2
        worker.last_update_timestamp = timestamp;
123
2
        for 
operation_id0
in worker.running_action_infos.keys() {
124
0
            if self
  Branch (124:16): [True: 0, False: 0]
  Branch (124:16): [Folded - Ignored]
125
0
                .operation_keep_alive_tx
126
0
                .send((operation_id.clone(), worker_id.clone()))
127
0
                .is_err()
128
            {
129
0
                event!(
130
0
                    Level::ERROR,
131
                    ?operation_id,
132
                    ?worker_id,
133
0
                    "OperationKeepAliveTx stream closed"
134
                );
135
0
            }
136
        }
137
2
        Ok(())
138
2
    }
139
140
    /// Adds a worker to the pool.
141
    /// Note: This function will not do any task matching.
142
31
    fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
143
31
        let worker_id = worker.id.clone();
144
31
        self.workers.put(worker_id.clone(), worker);
145
31
146
31
        // Worker is not cloneable, and we do not want to send the initial connection results until
147
31
        // we have added it to the map, or we might get some strange race conditions due to the way
148
31
        // the multi-threaded runtime works.
149
31
        let worker = self.workers.peek_mut(&worker_id).unwrap();
150
31
        let res = worker
151
31
            .send_initial_connection_result()
152
31
            .err_tip(|| 
"Failed to send initial connection result to worker"0
);
153
31
        if let Err(
err0
) = &res {
  Branch (153:16): [True: 0, False: 31]
  Branch (153:16): [Folded - Ignored]
154
0
            event!(
155
0
                Level::ERROR,
156
                ?worker_id,
157
                ?err,
158
0
                "Worker connection appears to have been closed while adding to pool"
159
            );
160
31
        }
161
31
        self.worker_change_notify.notify_one();
162
31
        res
163
31
    }
164
165
    /// Removes worker from pool.
166
    /// Note: The caller is responsible for any rescheduling of any tasks that might be
167
    /// running.
168
6
    fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
169
6
        let result = self.workers.pop(worker_id);
170
6
        self.worker_change_notify.notify_one();
171
6
        result
172
6
    }
173
174
    /// Sets if the worker is draining or not.
175
2
    async fn set_drain_worker(
176
2
        &mut self,
177
2
        worker_id: &WorkerId,
178
2
        is_draining: bool,
179
2
    ) -> Result<(), Error> {
180
2
        let worker = self
181
2
            .workers
182
2
            .get_mut(worker_id)
183
2
            .err_tip(|| 
format!("Worker {worker_id} doesn't exist in the pool")0
)
?0
;
184
2
        worker.is_draining = is_draining;
185
2
        self.worker_change_notify.notify_one();
186
2
        Ok(())
187
2
    }
188
189
45
    fn inner_find_worker_for_action(
190
45
        &self,
191
45
        platform_properties: &PlatformProperties,
192
45
    ) -> Option<WorkerId> {
193
45
        let mut workers_iter = self.workers.iter();
194
45
        let workers_iter = match self.allocation_strategy {
195
            // Use rfind to get the least recently used that satisfies the properties.
196
45
            WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter.rfind(|(_, w)| {
197
34
                w.can_accept_work() && 
platform_properties.is_satisfied_by(&w.platform_properties33
)
  Branch (197:17): [True: 33, False: 1]
  Branch (197:17): [Folded - Ignored]
198
34
            }),
199
            // Use find to get the most recently used that satisfies the properties.
200
0
            WorkerAllocationStrategy::MostRecentlyUsed => workers_iter.find(|(_, w)| {
201
0
                w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
  Branch (201:17): [True: 0, False: 0]
  Branch (201:17): [Folded - Ignored]
202
0
            }),
203
        };
204
45
        workers_iter.map(|(_, w)| 
w.id.clone()28
)
205
45
    }
206
207
10
    async fn update_action(
208
10
        &mut self,
209
10
        worker_id: &WorkerId,
210
10
        operation_id: &OperationId,
211
10
        update: UpdateOperationType,
212
10
    ) -> Result<(), Error> {
213
10
        let worker = self.workers.get_mut(worker_id).err_tip(|| {
214
0
            format!("Worker {worker_id} does not exist in SimpleScheduler::update_action")
215
0
        })?;
216
217
        // Ensure the worker is supposed to be running the operation.
218
10
        if !worker.running_action_infos.contains_key(operation_id) {
  Branch (218:12): [True: 1, False: 9]
  Branch (218:12): [Folded - Ignored]
219
1
            let err = make_err!(
220
1
                Code::Internal,
221
1
                "Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
222
1
            );
223
1
            return Result::<(), _>::Err(err.clone())
224
1
                .merge(self.immediate_evict_worker(worker_id, err).await);
225
9
        }
226
227
9
        let (is_finished, due_to_backpressure) = match &update {
228
6
            UpdateOperationType::UpdateWithActionStage(action_stage) => {
229
6
                (action_stage.is_finished(), false)
230
            }
231
0
            UpdateOperationType::KeepAlive => (false, false),
232
3
            UpdateOperationType::UpdateWithError(err) => {
233
3
                (true, err.code == Code::ResourceExhausted)
234
            }
235
        };
236
237
        // Update the operation in the worker state manager.
238
        {
239
9
            let update_operation_res = self
240
9
                .worker_state_manager
241
9
                .update_operation(operation_id, worker_id, update)
242
9
                .await
243
9
                .err_tip(|| 
"in update_operation on SimpleScheduler::update_action"0
);
244
9
            if let Err(
err0
) = update_operation_res {
  Branch (244:20): [True: 0, False: 9]
  Branch (244:20): [Folded - Ignored]
245
0
                event!(
246
0
                    Level::ERROR,
247
                    ?operation_id,
248
                    ?worker_id,
249
                    ?err,
250
0
                    "Failed to update_operation on update_action"
251
                );
252
0
                return Err(err);
253
9
            }
254
9
        }
255
9
256
9
        if !is_finished {
  Branch (256:12): [True: 0, False: 9]
  Branch (256:12): [Folded - Ignored]
257
0
            return Ok(());
258
9
        }
259
260
        // Clear this action from the current worker if finished.
261
9
        let complete_action_res = {
262
9
            let was_paused = !worker.can_accept_work();
263
264
            // Note: We need to run this before dealing with backpressure logic.
265
9
            let complete_action_res = worker.complete_action(operation_id).await;
266
267
            // Only pause if there's an action still waiting that will unpause.
268
9
            if (was_paused || due_to_backpressure) && 
worker.has_actions0
() {
  Branch (268:17): [True: 0, False: 9]
  Branch (268:31): [True: 0, False: 9]
  Branch (268:55): [True: 0, False: 0]
  Branch (268:17): [Folded - Ignored]
  Branch (268:31): [Folded - Ignored]
  Branch (268:55): [Folded - Ignored]
269
0
                worker.is_paused = true;
270
9
            }
271
9
            complete_action_res
272
9
        };
273
9
274
9
        self.worker_change_notify.notify_one();
275
9
276
9
        complete_action_res
277
10
    }
278
279
    /// Notifies the specified worker to run the given action and handles errors by evicting
280
    /// the worker if the notification fails.
281
28
    async fn worker_notify_run_action(
282
28
        &mut self,
283
28
        worker_id: WorkerId,
284
28
        operation_id: OperationId,
285
28
        action_info: ActionInfoWithProps,
286
28
    ) -> Result<(), Error> {
287
28
        if let Some(worker) = self.workers.get_mut(&worker_id) {
  Branch (287:16): [True: 0, False: 0]
  Branch (287:16): [Folded - Ignored]
  Branch (287:16): [True: 27, False: 0]
  Branch (287:16): [True: 1, False: 0]
288
28
            let notify_worker_result = worker
289
28
                .notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
290
28
                .await;
291
292
28
            if notify_worker_result.is_err() {
  Branch (292:16): [True: 0, False: 0]
  Branch (292:16): [Folded - Ignored]
  Branch (292:16): [True: 1, False: 26]
  Branch (292:16): [True: 0, False: 1]
293
1
                event!(
294
1
                    Level::WARN,
295
                    ?worker_id,
296
                    ?action_info,
297
                    ?notify_worker_result,
298
1
                    "Worker command failed, removing worker",
299
                );
300
301
1
                let err = make_err!(
302
1
                    Code::Internal,
303
1
                    "Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
304
1
                );
305
1
306
1
                return Result::<(), _>::Err(err.clone())
307
1
                    .merge(self.immediate_evict_worker(&worker_id, err).await);
308
27
            }
309
        } else {
310
0
            event!(
311
0
                Level::WARN,
312
                ?worker_id,
313
                ?operation_id,
314
                ?action_info,
315
0
                "Worker not found in worker map in worker_notify_run_action"
316
            );
317
        }
318
27
        Ok(())
319
28
    }
320
321
    /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
322
6
    async fn immediate_evict_worker(
323
6
        &mut self,
324
6
        worker_id: &WorkerId,
325
6
        err: Error,
326
6
    ) -> Result<(), Error> {
327
6
        let mut result = Ok(());
328
6
        if let Some(mut worker) = self.remove_worker(worker_id) {
  Branch (328:16): [True: 5, False: 0]
  Branch (328:16): [Folded - Ignored]
  Branch (328:16): [True: 1, False: 0]
  Branch (328:16): [True: 0, False: 0]
329
            // We don't care if we fail to send message to worker, this is only a best attempt.
330
6
            drop(worker.notify_update(WorkerUpdate::Disconnect).await);
331
6
            for (
operation_id4
, _) in worker.running_action_infos.drain() {
332
4
                result = result.merge(
333
4
                    self.worker_state_manager
334
4
                        .update_operation(
335
4
                            &operation_id,
336
4
                            worker_id,
337
4
                            UpdateOperationType::UpdateWithError(err.clone()),
338
4
                        )
339
4
                        .await,
340
                );
341
            }
342
0
        }
343
        // Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
344
        // TODO(allada) This should be moved to inside the Workers struct.
345
6
        self.worker_change_notify.notify_one();
346
6
        result
347
6
    }
348
}
349
350
#[derive(Debug, MetricsComponent)]
351
pub struct ApiWorkerScheduler {
352
    #[metric]
353
    inner: Mutex<ApiWorkerSchedulerImpl>,
354
    #[metric(group = "platform_property_manager")]
355
    platform_property_manager: Arc<PlatformPropertyManager>,
356
357
    #[metric(
358
        help = "Timeout of how long to evict workers if no response in this given amount of time in seconds."
359
    )]
360
    worker_timeout_s: u64,
361
    _operation_keep_alive_spawn: JoinHandleDropGuard<()>,
362
}
363
364
impl ApiWorkerScheduler {
365
28
    pub fn new(
366
28
        worker_state_manager: Arc<dyn WorkerStateManager>,
367
28
        platform_property_manager: Arc<PlatformPropertyManager>,
368
28
        allocation_strategy: WorkerAllocationStrategy,
369
28
        worker_change_notify: Arc<Notify>,
370
28
        worker_timeout_s: u64,
371
28
    ) -> Arc<Self> {
372
28
        let (operation_keep_alive_tx, mut operation_keep_alive_rx) = mpsc::unbounded_channel();
373
28
        Arc::new(Self {
374
28
            inner: Mutex::new(ApiWorkerSchedulerImpl {
375
28
                workers: Workers(LruCache::unbounded()),
376
28
                worker_state_manager: worker_state_manager.clone(),
377
28
                allocation_strategy,
378
28
                worker_change_notify,
379
28
                operation_keep_alive_tx,
380
28
            }),
381
28
            platform_property_manager,
382
28
            worker_timeout_s,
383
28
            _operation_keep_alive_spawn: spawn!(
384
                "simple_scheduler_operation_keep_alive",
385
23
                async move {
386
                    const RECV_MANY_LIMIT: usize = 256;
387
23
                    let mut messages = Vec::with_capacity(RECV_MANY_LIMIT);
388
                    loop {
389
23
                        messages.clear();
390
23
                        operation_keep_alive_rx
391
23
                            .recv_many(&mut messages, RECV_MANY_LIMIT)
392
23
                            .await;
393
0
                        if messages.is_empty() {
  Branch (393:28): [True: 0, False: 0]
  Branch (393:28): [Folded - Ignored]
394
0
                            return; // Looks like our sender has been dropped.
395
0
                        }
396
0
                        for (operation_id, worker_id) in messages.drain(..) {
397
0
                            let update_operation_res = worker_state_manager
398
0
                                .update_operation(
399
0
                                    &operation_id,
400
0
                                    &worker_id,
401
0
                                    UpdateOperationType::KeepAlive,
402
0
                                )
403
0
                                .await;
404
0
                            if let Err(err) = update_operation_res {
  Branch (404:36): [True: 0, False: 0]
  Branch (404:36): [Folded - Ignored]
405
0
                                event!(
406
0
                                    Level::WARN,
407
                                    ?err,
408
0
                                    "Error while running worker_keep_alive_received, maybe job is done?"
409
                                );
410
0
                            }
411
                        }
412
                    }
413
0
                }
414
            ),
415
        })
416
28
    }
417
418
28
    pub async fn worker_notify_run_action(
419
28
        &self,
420
28
        worker_id: WorkerId,
421
28
        operation_id: OperationId,
422
28
        action_info: ActionInfoWithProps,
423
28
    ) -> Result<(), Error> {
424
28
        let mut inner = self.inner.lock().await;
425
28
        inner
426
28
            .worker_notify_run_action(worker_id, operation_id, action_info)
427
28
            .await
428
28
    }
429
430
    /// Attempts to find a worker that is capable of running this action.
431
    // TODO(blaise.bruer) This algorithm is not very efficient. Simple testing using a tree-like
432
    // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
433
    // simulation of worst cases in a single threaded environment.
434
45
    pub async fn find_worker_for_action(
435
45
        &self,
436
45
        platform_properties: &PlatformProperties,
437
45
    ) -> Option<WorkerId> {
438
45
        let inner = self.inner.lock().await;
439
45
        inner.inner_find_worker_for_action(platform_properties)
440
45
    }
441
442
    /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
443
    #[must_use]
444
7
    pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
445
7
        let inner = self.inner.lock().await;
446
7
        inner.workers.contains(worker_id)
447
7
    }
448
449
    /// A unit test function used to send the keep alive message to the worker from the server.
450
0
    pub async fn send_keep_alive_to_worker_for_test(
451
0
        &self,
452
0
        worker_id: &WorkerId,
453
1
    ) -> Result<(), Error> {
454
1
        let mut inner = self.inner.lock().await;
455
1
        let worker = inner.workers.get_mut(worker_id).ok_or_else(|| {
456
0
            make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
457
0
        })?;
458
1
        worker.keep_alive()
459
1
    }
460
}
461
462
#[async_trait]
463
impl WorkerScheduler for ApiWorkerScheduler {
464
1
    fn get_platform_property_manager(&self) -> &PlatformPropertyManager {
465
1
        self.platform_property_manager.as_ref()
466
1
    }
467
468
62
    async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
469
31
        let mut inner = self.inner.lock().await;
470
31
        let worker_id = worker.id.clone();
471
31
        let result = inner
472
31
            .add_worker(worker)
473
31
            .err_tip(|| 
"Error while adding worker, removing from pool"0
);
474
31
        if let Err(
err0
) = result {
  Branch (474:16): [True: 0, False: 31]
  Branch (474:16): [Folded - Ignored]
475
0
            return Result::<(), _>::Err(err.clone())
476
0
                .merge(inner.immediate_evict_worker(&worker_id, err).await);
477
31
        }
478
31
        Ok(())
479
62
    }
480
481
    async fn update_action(
482
        &self,
483
        worker_id: &WorkerId,
484
        operation_id: &OperationId,
485
        update: UpdateOperationType,
486
20
    ) -> Result<(), Error> {
487
10
        let mut inner = self.inner.lock().await;
488
10
        inner.update_action(worker_id, operation_id, update).await
489
20
    }
490
491
    async fn worker_keep_alive_received(
492
        &self,
493
        worker_id: &WorkerId,
494
        timestamp: WorkerTimestamp,
495
4
    ) -> Result<(), Error> {
496
2
        let mut inner = self.inner.lock().await;
497
2
        inner
498
2
            .refresh_lifetime(worker_id, timestamp)
499
2
            .err_tip(|| 
"Error refreshing lifetime in worker_keep_alive_received()"0
)
500
4
    }
501
502
4
    async fn remove_worker(&self, worker_id: &WorkerId) -> Result<(), Error> {
503
2
        let mut inner = self.inner.lock().await;
504
2
        inner
505
2
            .immediate_evict_worker(
506
2
                worker_id,
507
2
                make_err!(Code::Internal, "Received request to remove worker"),
508
2
            )
509
2
            .await
510
4
    }
511
512
10
    async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
513
5
        let mut inner = self.inner.lock().await;
514
515
5
        let mut result = Ok(());
516
5
        // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
517
5
        // map most of the time.
518
5
        let worker_ids_to_remove: Vec<WorkerId> = inner
519
5
            .workers
520
5
            .iter()
521
5
            .rev()
522
6
            .map_while(|(worker_id, worker)| {
523
6
                if worker.last_update_timestamp <= now_timestamp - self.worker_timeout_s {
  Branch (523:20): [True: 2, False: 4]
  Branch (523:20): [Folded - Ignored]
524
2
                    Some(worker_id.clone())
525
                } else {
526
4
                    None
527
                }
528
6
            })
529
5
            .collect();
530
7
        for 
worker_id2
in &worker_ids_to_remove {
531
2
            event!(
532
2
                Level::WARN,
533
                ?worker_id,
534
2
                "Worker timed out, removing from pool"
535
            );
536
2
            result = result.merge(
537
2
                inner
538
2
                    .immediate_evict_worker(
539
2
                        worker_id,
540
2
                        make_err!(
541
2
                            Code::Internal,
542
2
                            "Worker {worker_id} timed out, removing from pool"
543
2
                        ),
544
2
                    )
545
2
                    .await,
546
            );
547
        }
548
549
5
        result
550
10
    }
551
552
4
    async fn set_drain_worker(&self, worker_id: &WorkerId, is_draining: bool) -> Result<(), Error> {
553
2
        let mut inner = self.inner.lock().await;
554
2
        inner.set_drain_worker(worker_id, is_draining).await
555
4
    }
556
}
557
558
impl RootMetricsComponent for ApiWorkerScheduler {}