Coverage Report

Created: 2026-04-14 11:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/default_scheduler_factory.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::SystemTime;
17
18
use nativelink_config::schedulers::{
19
    ExperimentalSimpleSchedulerBackend, SchedulerSpec, SimpleSpec,
20
};
21
use nativelink_config::stores::EvictionPolicy;
22
use nativelink_error::{Error, ResultExt, make_input_err};
23
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
24
use nativelink_store::redis_store::{RedisStore, StandardRedisManager};
25
use nativelink_store::store_manager::StoreManager;
26
use nativelink_util::instant_wrapper::InstantWrapper;
27
use nativelink_util::operation_state_manager::ClientStateManager;
28
use redis::aio::ConnectionManager;
29
use tokio::sync::{Notify, mpsc};
30
31
use crate::cache_lookup_scheduler::CacheLookupScheduler;
32
use crate::grpc_scheduler::GrpcScheduler;
33
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
34
use crate::property_modifier_scheduler::PropertyModifierScheduler;
35
use crate::simple_scheduler::SimpleScheduler;
36
use crate::store_awaited_action_db::StoreAwaitedActionDb;
37
use crate::worker_scheduler::WorkerScheduler;
38
39
/// Default timeout for recently completed actions in seconds.
40
/// If this changes, remember to change the documentation in the config.
41
const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60;
42
43
pub type SchedulerFactoryResults = (
44
    Option<Arc<dyn ClientStateManager>>,
45
    Option<Arc<dyn WorkerScheduler>>,
46
);
47
48
0
pub async fn scheduler_factory(
49
0
    spec: &SchedulerSpec,
50
0
    store_manager: &StoreManager,
51
0
    maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
52
0
) -> Result<SchedulerFactoryResults, Error> {
53
    inner_scheduler_factory(spec, store_manager, maybe_origin_event_tx).await
54
}
55
56
0
async fn inner_scheduler_factory(
57
0
    spec: &SchedulerSpec,
58
0
    store_manager: &StoreManager,
59
0
    maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
60
0
) -> Result<SchedulerFactoryResults, Error> {
61
0
    let scheduler: SchedulerFactoryResults = match spec {
62
0
        SchedulerSpec::Simple(spec) => {
63
0
            simple_scheduler_factory(spec, store_manager, SystemTime::now, maybe_origin_event_tx)
64
0
                .await?
65
        }
66
0
        SchedulerSpec::Grpc(spec) => (Some(Arc::new(GrpcScheduler::new(spec)?)), None),
67
0
        SchedulerSpec::CacheLookup(spec) => {
68
0
            let ac_store = store_manager
69
0
                .get_store(&spec.ac_store)
70
0
                .err_tip(|| format!("'ac_store': '{}' does not exist", spec.ac_store))?;
71
0
            let (action_scheduler, worker_scheduler) = Box::pin(inner_scheduler_factory(
72
0
                &spec.scheduler,
73
0
                store_manager,
74
0
                maybe_origin_event_tx,
75
0
            ))
76
0
            .await
77
0
            .err_tip(|| "In nested CacheLookupScheduler construction")?;
78
0
            let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
79
0
                ac_store,
80
0
                action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
81
0
            )?);
82
0
            (Some(cache_lookup_scheduler), worker_scheduler)
83
        }
84
0
        SchedulerSpec::PropertyModifier(spec) => {
85
0
            let (action_scheduler, worker_scheduler) = Box::pin(inner_scheduler_factory(
86
0
                &spec.scheduler,
87
0
                store_manager,
88
0
                maybe_origin_event_tx,
89
0
            ))
90
0
            .await
91
0
            .err_tip(|| "In nested PropertyModifierScheduler construction")?;
92
0
            let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
93
0
                spec,
94
0
                action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
95
            ));
96
0
            (Some(property_modifier_scheduler), worker_scheduler)
97
        }
98
    };
99
100
0
    Ok(scheduler)
101
0
}
102
103
0
async fn simple_scheduler_factory(
104
0
    spec: &SimpleSpec,
105
0
    store_manager: &StoreManager,
106
0
    now_fn: fn() -> SystemTime,
107
0
    maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
108
0
) -> Result<SchedulerFactoryResults, Error> {
109
0
    match spec
110
0
        .experimental_backend
111
0
        .as_ref()
112
0
        .unwrap_or(&ExperimentalSimpleSchedulerBackend::Memory)
113
    {
114
        ExperimentalSimpleSchedulerBackend::Memory => {
115
0
            let task_change_notify = Arc::new(Notify::new());
116
0
            let awaited_action_db = memory_awaited_action_db_factory(
117
0
                spec.retain_completed_for_s,
118
0
                &task_change_notify,
119
                SystemTime::now,
120
            );
121
0
            let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
122
0
                spec,
123
0
                awaited_action_db,
124
0
                task_change_notify,
125
0
                maybe_origin_event_tx.cloned(),
126
0
            );
127
0
            Ok((Some(action_scheduler), Some(worker_scheduler)))
128
        }
129
0
        ExperimentalSimpleSchedulerBackend::Redis(redis_config) => {
130
0
            let store = store_manager
131
0
                .get_store(redis_config.redis_store.as_ref())
132
0
                .err_tip(|| {
133
0
                    format!(
134
                        "'redis_store': '{}' does not exist",
135
                        redis_config.redis_store
136
                    )
137
0
                })?;
138
0
            let task_change_notify = Arc::new(Notify::new());
139
0
            let store = store
140
0
                .into_inner()
141
0
                .as_any_arc()
142
0
                .downcast::<RedisStore<ConnectionManager, StandardRedisManager<ConnectionManager>>>(
143
                )
144
0
                .map_err(|_| {
145
0
                    make_input_err!(
146
                        "Could not downcast to redis store in RedisAwaitedActionDb::new"
147
                    )
148
0
                })?;
149
0
            let awaited_action_db = StoreAwaitedActionDb::new(
150
0
                store,
151
0
                task_change_notify.clone(),
152
0
                now_fn,
153
0
                Default::default,
154
0
            )
155
0
            .await
156
0
            .err_tip(|| "In state_manager_factory::redis_state_manager")?;
157
0
            let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
158
0
                spec,
159
0
                awaited_action_db,
160
0
                task_change_notify,
161
0
                maybe_origin_event_tx.cloned(),
162
0
            );
163
0
            Ok((Some(action_scheduler), Some(worker_scheduler)))
164
        }
165
    }
166
0
}
167
168
23
pub fn memory_awaited_action_db_factory<I, NowFn>(
169
23
    mut retain_completed_for_s: u32,
170
23
    task_change_notify: &Arc<Notify>,
171
23
    now_fn: NowFn,
172
23
) -> MemoryAwaitedActionDb<I, NowFn>
173
23
where
174
23
    I: InstantWrapper,
175
23
    NowFn: Fn() -> I + Clone + Send + Sync + 'static,
176
{
177
23
    if retain_completed_for_s == 0 {
178
23
        retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S;
179
23
    
}0
180
23
    MemoryAwaitedActionDb::new(
181
23
        &EvictionPolicy {
182
23
            max_seconds: retain_completed_for_s,
183
23
            ..Default::default()
184
23
        },
185
23
        task_change_notify.clone(),
186
23
        now_fn,
187
    )
188
23
}