Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/default_scheduler_factory.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::SystemTime;
17
18
use nativelink_config::schedulers::{
19
    ExperimentalSimpleSchedulerBackend, SchedulerSpec, SimpleSpec,
20
};
21
use nativelink_config::stores::EvictionPolicy;
22
use nativelink_error::{make_input_err, Error, ResultExt};
23
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
24
use nativelink_store::redis_store::RedisStore;
25
use nativelink_store::store_manager::StoreManager;
26
use nativelink_util::instant_wrapper::InstantWrapper;
27
use nativelink_util::operation_state_manager::ClientStateManager;
28
use tokio::sync::{mpsc, Notify};
29
30
use crate::cache_lookup_scheduler::CacheLookupScheduler;
31
use crate::grpc_scheduler::GrpcScheduler;
32
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
33
use crate::property_modifier_scheduler::PropertyModifierScheduler;
34
use crate::simple_scheduler::SimpleScheduler;
35
use crate::store_awaited_action_db::StoreAwaitedActionDb;
36
use crate::worker_scheduler::WorkerScheduler;
37
38
/// Default timeout for recently completed actions in seconds.
39
/// If this changes, remember to change the documentation in the config.
40
const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60;
41
42
pub type SchedulerFactoryResults = (
43
    Option<Arc<dyn ClientStateManager>>,
44
    Option<Arc<dyn WorkerScheduler>>,
45
);
46
47
0
pub fn scheduler_factory(
48
0
    spec: &SchedulerSpec,
49
0
    store_manager: &StoreManager,
50
0
    maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
51
0
) -> Result<SchedulerFactoryResults, Error> {
52
0
    inner_scheduler_factory(spec, store_manager, maybe_origin_event_tx)
53
0
}
54
55
0
fn inner_scheduler_factory(
56
0
    spec: &SchedulerSpec,
57
0
    store_manager: &StoreManager,
58
0
    maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
59
0
) -> Result<SchedulerFactoryResults, Error> {
60
0
    let scheduler: SchedulerFactoryResults = match spec {
61
0
        SchedulerSpec::simple(spec) => {
62
0
            simple_scheduler_factory(spec, store_manager, SystemTime::now, maybe_origin_event_tx)?
63
        }
64
0
        SchedulerSpec::grpc(spec) => (Some(Arc::new(GrpcScheduler::new(spec)?)), None),
65
0
        SchedulerSpec::cache_lookup(spec) => {
66
0
            let ac_store = store_manager
67
0
                .get_store(&spec.ac_store)
68
0
                .err_tip(|| format!("'ac_store': '{}' does not exist", spec.ac_store))?;
69
0
            let (action_scheduler, worker_scheduler) =
70
0
                inner_scheduler_factory(&spec.scheduler, store_manager, maybe_origin_event_tx)
71
0
                    .err_tip(|| "In nested CacheLookupScheduler construction")?;
72
0
            let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
73
0
                ac_store,
74
0
                action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
75
0
            )?);
76
0
            (Some(cache_lookup_scheduler), worker_scheduler)
77
        }
78
0
        SchedulerSpec::property_modifier(spec) => {
79
0
            let (action_scheduler, worker_scheduler) =
80
0
                inner_scheduler_factory(&spec.scheduler, store_manager, maybe_origin_event_tx)
81
0
                    .err_tip(|| "In nested PropertyModifierScheduler construction")?;
82
0
            let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
83
0
                spec,
84
0
                action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
85
            ));
86
0
            (Some(property_modifier_scheduler), worker_scheduler)
87
        }
88
    };
89
90
0
    Ok(scheduler)
91
0
}
92
93
0
fn simple_scheduler_factory(
94
0
    spec: &SimpleSpec,
95
0
    store_manager: &StoreManager,
96
0
    now_fn: fn() -> SystemTime,
97
0
    maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
98
0
) -> Result<SchedulerFactoryResults, Error> {
99
0
    match spec
100
0
        .experimental_backend
101
0
        .as_ref()
102
0
        .unwrap_or(&ExperimentalSimpleSchedulerBackend::memory)
103
    {
104
        ExperimentalSimpleSchedulerBackend::memory => {
105
0
            let task_change_notify = Arc::new(Notify::new());
106
0
            let awaited_action_db = memory_awaited_action_db_factory(
107
0
                spec.retain_completed_for_s,
108
0
                &task_change_notify.clone(),
109
0
                SystemTime::now,
110
0
            );
111
0
            let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
112
0
                spec,
113
0
                awaited_action_db,
114
0
                task_change_notify,
115
0
                maybe_origin_event_tx.cloned(),
116
0
            );
117
0
            Ok((Some(action_scheduler), Some(worker_scheduler)))
118
        }
119
0
        ExperimentalSimpleSchedulerBackend::redis(redis_config) => {
120
0
            let store = store_manager
121
0
                .get_store(redis_config.redis_store.as_ref())
122
0
                .err_tip(|| {
123
0
                    format!(
124
0
                        "'redis_store': '{}' does not exist",
125
0
                        redis_config.redis_store
126
0
                    )
127
0
                })?;
128
0
            let task_change_notify = Arc::new(Notify::new());
129
0
            let store = store
130
0
                .into_inner()
131
0
                .as_any_arc()
132
0
                .downcast::<RedisStore>()
133
0
                .map_err(|_| {
134
0
                    make_input_err!(
135
0
                        "Could not downcast to redis store in RedisAwaitedActionDb::new"
136
0
                    )
137
0
                })?;
138
0
            let awaited_action_db = StoreAwaitedActionDb::new(
139
0
                store,
140
0
                task_change_notify.clone(),
141
0
                now_fn,
142
0
                Default::default,
143
0
            )
144
0
            .err_tip(|| "In state_manager_factory::redis_state_manager")?;
145
0
            let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
146
0
                spec,
147
0
                awaited_action_db,
148
0
                task_change_notify,
149
0
                maybe_origin_event_tx.cloned(),
150
0
            );
151
0
            Ok((Some(action_scheduler), Some(worker_scheduler)))
152
        }
153
    }
154
0
}
155
156
20
pub fn memory_awaited_action_db_factory<I, NowFn>(
157
20
    mut retain_completed_for_s: u32,
158
20
    task_change_notify: &Arc<Notify>,
159
20
    now_fn: NowFn,
160
20
) -> MemoryAwaitedActionDb<I, NowFn>
161
20
where
162
20
    I: InstantWrapper,
163
20
    NowFn: Fn() -> I + Clone + Send + Sync + 'static,
164
20
{
165
20
    if retain_completed_for_s == 0 {
  Branch (165:8): [True: 0, False: 0]
  Branch (165:8): [Folded - Ignored]
  Branch (165:8): [True: 20, False: 0]
166
20
        retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S;
167
20
    }
168
20
    MemoryAwaitedActionDb::new(
169
20
        &EvictionPolicy {
170
20
            max_seconds: retain_completed_for_s,
171
20
            ..Default::default()
172
20
        },
173
20
        task_change_notify.clone(),
174
20
        now_fn,
175
20
    )
176
20
}