Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-scheduler/src/default_scheduler_factory.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::SystemTime;
17
18
use nativelink_config::schedulers::{ExperimentalSimpleSchedulerBackend, SchedulerConfig};
19
use nativelink_config::stores::EvictionPolicy;
20
use nativelink_error::{make_input_err, Error, ResultExt};
21
use nativelink_store::redis_store::RedisStore;
22
use nativelink_store::store_manager::StoreManager;
23
use nativelink_util::instant_wrapper::InstantWrapper;
24
use nativelink_util::operation_state_manager::ClientStateManager;
25
use tokio::sync::Notify;
26
27
use crate::cache_lookup_scheduler::CacheLookupScheduler;
28
use crate::grpc_scheduler::GrpcScheduler;
29
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
30
use crate::property_modifier_scheduler::PropertyModifierScheduler;
31
use crate::simple_scheduler::SimpleScheduler;
32
use crate::store_awaited_action_db::StoreAwaitedActionDb;
33
use crate::worker_scheduler::WorkerScheduler;
34
35
/// How long, in seconds, recently completed actions are retained by default.
///
/// Applied when the configured `retain_completed_for_s` is `0`.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60;
38
39
pub type SchedulerFactoryResults = (
40
    Option<Arc<dyn ClientStateManager>>,
41
    Option<Arc<dyn WorkerScheduler>>,
42
);
43
44
0
pub fn scheduler_factory(
45
0
    scheduler_type_cfg: &SchedulerConfig,
46
0
    store_manager: &StoreManager,
47
0
) -> Result<SchedulerFactoryResults, Error> {
48
0
    inner_scheduler_factory(scheduler_type_cfg, store_manager)
49
0
}
50
51
0
fn inner_scheduler_factory(
52
0
    scheduler_type_cfg: &SchedulerConfig,
53
0
    store_manager: &StoreManager,
54
0
) -> Result<SchedulerFactoryResults, Error> {
55
0
    let scheduler: SchedulerFactoryResults = match scheduler_type_cfg {
56
0
        SchedulerConfig::simple(config) => {
57
0
            simple_scheduler_factory(config, store_manager, SystemTime::now)?
58
        }
59
0
        SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None),
60
0
        SchedulerConfig::cache_lookup(config) => {
61
0
            let ac_store = store_manager
62
0
                .get_store(&config.ac_store)
63
0
                .err_tip(|| format!("'ac_store': '{}' does not exist", config.ac_store))?;
64
0
            let (action_scheduler, worker_scheduler) =
65
0
                inner_scheduler_factory(&config.scheduler, store_manager)
66
0
                    .err_tip(|| "In nested CacheLookupScheduler construction")?;
67
0
            let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
68
0
                ac_store,
69
0
                action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
70
0
            )?);
71
0
            (Some(cache_lookup_scheduler), worker_scheduler)
72
        }
73
0
        SchedulerConfig::property_modifier(config) => {
74
0
            let (action_scheduler, worker_scheduler) =
75
0
                inner_scheduler_factory(&config.scheduler, store_manager)
76
0
                    .err_tip(|| "In nested PropertyModifierScheduler construction")?;
77
0
            let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
78
0
                config,
79
0
                action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
80
            ));
81
0
            (Some(property_modifier_scheduler), worker_scheduler)
82
        }
83
    };
84
85
0
    Ok(scheduler)
86
0
}
87
88
0
fn simple_scheduler_factory(
89
0
    config: &nativelink_config::schedulers::SimpleScheduler,
90
0
    store_manager: &StoreManager,
91
0
    now_fn: fn() -> SystemTime,
92
0
) -> Result<SchedulerFactoryResults, Error> {
93
0
    match config
94
0
        .experimental_backend
95
0
        .as_ref()
96
0
        .unwrap_or(&ExperimentalSimpleSchedulerBackend::memory)
97
    {
98
        ExperimentalSimpleSchedulerBackend::memory => {
99
0
            let task_change_notify = Arc::new(Notify::new());
100
0
            let awaited_action_db = memory_awaited_action_db_factory(
101
0
                config.retain_completed_for_s,
102
0
                &task_change_notify.clone(),
103
0
                SystemTime::now,
104
0
            );
105
0
            let (action_scheduler, worker_scheduler) =
106
0
                SimpleScheduler::new(config, awaited_action_db, task_change_notify);
107
0
            Ok((Some(action_scheduler), Some(worker_scheduler)))
108
        }
109
0
        ExperimentalSimpleSchedulerBackend::redis(redis_config) => {
110
0
            let store = store_manager
111
0
                .get_store(redis_config.redis_store.as_ref())
112
0
                .err_tip(|| {
113
0
                    format!(
114
0
                        "'redis_store': '{}' does not exist",
115
0
                        redis_config.redis_store
116
0
                    )
117
0
                })?;
118
0
            let task_change_notify = Arc::new(Notify::new());
119
0
            let store = store
120
0
                .into_inner()
121
0
                .as_any_arc()
122
0
                .downcast::<RedisStore>()
123
0
                .map_err(|_| {
124
0
                    make_input_err!(
125
0
                        "Could not downcast to redis store in RedisAwaitedActionDb::new"
126
0
                    )
127
0
                })?;
128
0
            let awaited_action_db = StoreAwaitedActionDb::new(
129
0
                store,
130
0
                task_change_notify.clone(),
131
0
                now_fn,
132
0
                Default::default,
133
0
            )
134
0
            .err_tip(|| "In state_manager_factory::redis_state_manager")?;
135
0
            let (action_scheduler, worker_scheduler) =
136
0
                SimpleScheduler::new(config, awaited_action_db, task_change_notify);
137
0
            Ok((Some(action_scheduler), Some(worker_scheduler)))
138
        }
139
    }
140
0
}
141
142
19
pub fn memory_awaited_action_db_factory<I, NowFn>(
143
19
    mut retain_completed_for_s: u32,
144
19
    task_change_notify: &Arc<Notify>,
145
19
    now_fn: NowFn,
146
19
) -> MemoryAwaitedActionDb<I, NowFn>
147
19
where
148
19
    I: InstantWrapper,
149
19
    NowFn: Fn() -> I + Clone + Send + Sync + 'static,
150
19
{
151
19
    if retain_completed_for_s == 0 {
  Branch (151:8): [True: 0, False: 0]
  Branch (151:8): [Folded - Ignored]
  Branch (151:8): [True: 19, False: 0]
152
19
        retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S;
153
19
    }
0
154
19
    MemoryAwaitedActionDb::new(
155
19
        &EvictionPolicy {
156
19
            max_seconds: retain_completed_for_s,
157
19
            ..Default::default()
158
19
        },
159
19
        task_change_notify.clone(),
160
19
        now_fn,
161
19
    )
162
19
}