Coverage Report

Created: 2026-02-23 10:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/default_scheduler_factory.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::SystemTime;
17
18
use nativelink_config::schedulers::{
19
    ExperimentalSimpleSchedulerBackend, SchedulerSpec, SimpleSpec,
20
};
21
use nativelink_config::stores::EvictionPolicy;
22
use nativelink_error::{Error, ResultExt, make_input_err};
23
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
24
use nativelink_store::redis_store::RedisStore;
25
use nativelink_store::store_manager::StoreManager;
26
use nativelink_util::instant_wrapper::InstantWrapper;
27
use nativelink_util::operation_state_manager::ClientStateManager;
28
use redis::aio::{ConnectionManager, PubSub};
29
use tokio::sync::{Notify, mpsc};
30
31
use crate::cache_lookup_scheduler::CacheLookupScheduler;
32
use crate::grpc_scheduler::GrpcScheduler;
33
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
34
use crate::property_modifier_scheduler::PropertyModifierScheduler;
35
use crate::simple_scheduler::SimpleScheduler;
36
use crate::store_awaited_action_db::StoreAwaitedActionDb;
37
use crate::worker_scheduler::WorkerScheduler;
38
39
/// Default timeout for recently completed actions in seconds.
40
/// If this changes, remember to change the documentation in the config.
41
const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60;
42
43
pub type SchedulerFactoryResults = (
44
    Option<Arc<dyn ClientStateManager>>,
45
    Option<Arc<dyn WorkerScheduler>>,
46
);
47
48
0
pub fn scheduler_factory(
49
0
    spec: &SchedulerSpec,
50
0
    store_manager: &StoreManager,
51
0
    maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
52
0
) -> Result<SchedulerFactoryResults, Error> {
53
0
    inner_scheduler_factory(spec, store_manager, maybe_origin_event_tx)
54
0
}
55
56
0
fn inner_scheduler_factory(
57
0
    spec: &SchedulerSpec,
58
0
    store_manager: &StoreManager,
59
0
    maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
60
0
) -> Result<SchedulerFactoryResults, Error> {
61
0
    let scheduler: SchedulerFactoryResults = match spec {
62
0
        SchedulerSpec::Simple(spec) => {
63
0
            simple_scheduler_factory(spec, store_manager, SystemTime::now, maybe_origin_event_tx)?
64
        }
65
0
        SchedulerSpec::Grpc(spec) => (Some(Arc::new(GrpcScheduler::new(spec)?)), None),
66
0
        SchedulerSpec::CacheLookup(spec) => {
67
0
            let ac_store = store_manager
68
0
                .get_store(&spec.ac_store)
69
0
                .err_tip(|| format!("'ac_store': '{}' does not exist", spec.ac_store))?;
70
0
            let (action_scheduler, worker_scheduler) =
71
0
                inner_scheduler_factory(&spec.scheduler, store_manager, maybe_origin_event_tx)
72
0
                    .err_tip(|| "In nested CacheLookupScheduler construction")?;
73
0
            let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
74
0
                ac_store,
75
0
                action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
76
0
            )?);
77
0
            (Some(cache_lookup_scheduler), worker_scheduler)
78
        }
79
0
        SchedulerSpec::PropertyModifier(spec) => {
80
0
            let (action_scheduler, worker_scheduler) =
81
0
                inner_scheduler_factory(&spec.scheduler, store_manager, maybe_origin_event_tx)
82
0
                    .err_tip(|| "In nested PropertyModifierScheduler construction")?;
83
0
            let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
84
0
                spec,
85
0
                action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
86
            ));
87
0
            (Some(property_modifier_scheduler), worker_scheduler)
88
        }
89
    };
90
91
0
    Ok(scheduler)
92
0
}
93
94
0
fn simple_scheduler_factory(
95
0
    spec: &SimpleSpec,
96
0
    store_manager: &StoreManager,
97
0
    now_fn: fn() -> SystemTime,
98
0
    maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
99
0
) -> Result<SchedulerFactoryResults, Error> {
100
0
    match spec
101
0
        .experimental_backend
102
0
        .as_ref()
103
0
        .unwrap_or(&ExperimentalSimpleSchedulerBackend::Memory)
104
    {
105
        ExperimentalSimpleSchedulerBackend::Memory => {
106
0
            let task_change_notify = Arc::new(Notify::new());
107
0
            let awaited_action_db = memory_awaited_action_db_factory(
108
0
                spec.retain_completed_for_s,
109
0
                &task_change_notify,
110
                SystemTime::now,
111
            );
112
0
            let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
113
0
                spec,
114
0
                awaited_action_db,
115
0
                task_change_notify,
116
0
                maybe_origin_event_tx.cloned(),
117
0
            );
118
0
            Ok((Some(action_scheduler), Some(worker_scheduler)))
119
        }
120
0
        ExperimentalSimpleSchedulerBackend::Redis(redis_config) => {
121
0
            let store = store_manager
122
0
                .get_store(redis_config.redis_store.as_ref())
123
0
                .err_tip(|| {
124
0
                    format!(
125
0
                        "'redis_store': '{}' does not exist",
126
                        redis_config.redis_store
127
                    )
128
0
                })?;
129
0
            let task_change_notify = Arc::new(Notify::new());
130
0
            let store = store
131
0
                .into_inner()
132
0
                .as_any_arc()
133
0
                .downcast::<RedisStore<ConnectionManager, PubSub>>()
134
0
                .map_err(|_| {
135
0
                    make_input_err!(
136
                        "Could not downcast to redis store in RedisAwaitedActionDb::new"
137
                    )
138
0
                })?;
139
0
            let awaited_action_db = StoreAwaitedActionDb::new(
140
0
                store,
141
0
                task_change_notify.clone(),
142
0
                now_fn,
143
                Default::default,
144
            )
145
0
            .err_tip(|| "In state_manager_factory::redis_state_manager")?;
146
0
            let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
147
0
                spec,
148
0
                awaited_action_db,
149
0
                task_change_notify,
150
0
                maybe_origin_event_tx.cloned(),
151
0
            );
152
0
            Ok((Some(action_scheduler), Some(worker_scheduler)))
153
        }
154
    }
155
0
}
156
157
23
pub fn memory_awaited_action_db_factory<I, NowFn>(
158
23
    mut retain_completed_for_s: u32,
159
23
    task_change_notify: &Arc<Notify>,
160
23
    now_fn: NowFn,
161
23
) -> MemoryAwaitedActionDb<I, NowFn>
162
23
where
163
23
    I: InstantWrapper,
164
23
    NowFn: Fn() -> I + Clone + Send + Sync + 'static,
165
{
166
23
    if retain_completed_for_s == 0 {
  Branch (166:8): [True: 0, False: 0]
  Branch (166:8): [Folded - Ignored]
  Branch (166:8): [True: 1, False: 0]
  Branch (166:8): [True: 22, False: 0]
167
23
        retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S;
168
23
    
}0
169
23
    MemoryAwaitedActionDb::new(
170
23
        &EvictionPolicy {
171
23
            max_seconds: retain_completed_for_s,
172
23
            ..Default::default()
173
23
        },
174
23
        task_change_notify.clone(),
175
23
        now_fn,
176
    )
177
23
}