Coverage Report

Created: 2024-12-03 16:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/default_scheduler_factory.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::Arc;
16
use std::time::SystemTime;
17
18
use nativelink_config::schedulers::{
19
    ExperimentalSimpleSchedulerBackend, SchedulerSpec, SimpleSpec,
20
};
21
use nativelink_config::stores::EvictionPolicy;
22
use nativelink_error::{make_input_err, Error, ResultExt};
23
use nativelink_store::redis_store::RedisStore;
24
use nativelink_store::store_manager::StoreManager;
25
use nativelink_util::instant_wrapper::InstantWrapper;
26
use nativelink_util::operation_state_manager::ClientStateManager;
27
use tokio::sync::Notify;
28
29
use crate::cache_lookup_scheduler::CacheLookupScheduler;
30
use crate::grpc_scheduler::GrpcScheduler;
31
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
32
use crate::property_modifier_scheduler::PropertyModifierScheduler;
33
use crate::simple_scheduler::SimpleScheduler;
34
use crate::store_awaited_action_db::StoreAwaitedActionDb;
35
use crate::worker_scheduler::WorkerScheduler;
36
37
/// Default timeout for recently completed actions in seconds.
38
/// If this changes, remember to change the documentation in the config.
39
const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60;
40
41
pub type SchedulerFactoryResults = (
42
    Option<Arc<dyn ClientStateManager>>,
43
    Option<Arc<dyn WorkerScheduler>>,
44
);
45
46
0
pub fn scheduler_factory(
47
0
    spec: &SchedulerSpec,
48
0
    store_manager: &StoreManager,
49
0
) -> Result<SchedulerFactoryResults, Error> {
50
0
    inner_scheduler_factory(spec, store_manager)
51
0
}
52
53
0
fn inner_scheduler_factory(
54
0
    spec: &SchedulerSpec,
55
0
    store_manager: &StoreManager,
56
0
) -> Result<SchedulerFactoryResults, Error> {
57
0
    let scheduler: SchedulerFactoryResults = match spec {
58
0
        SchedulerSpec::simple(spec) => {
59
0
            simple_scheduler_factory(spec, store_manager, SystemTime::now)?
60
        }
61
0
        SchedulerSpec::grpc(spec) => (Some(Arc::new(GrpcScheduler::new(spec)?)), None),
62
0
        SchedulerSpec::cache_lookup(spec) => {
63
0
            let ac_store = store_manager
64
0
                .get_store(&spec.ac_store)
65
0
                .err_tip(|| format!("'ac_store': '{}' does not exist", spec.ac_store))?;
66
0
            let (action_scheduler, worker_scheduler) =
67
0
                inner_scheduler_factory(&spec.scheduler, store_manager)
68
0
                    .err_tip(|| "In nested CacheLookupScheduler construction")?;
69
0
            let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
70
0
                ac_store,
71
0
                action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
72
0
            )?);
73
0
            (Some(cache_lookup_scheduler), worker_scheduler)
74
        }
75
0
        SchedulerSpec::property_modifier(spec) => {
76
0
            let (action_scheduler, worker_scheduler) =
77
0
                inner_scheduler_factory(&spec.scheduler, store_manager)
78
0
                    .err_tip(|| "In nested PropertyModifierScheduler construction")?;
79
0
            let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
80
0
                spec,
81
0
                action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
82
            ));
83
0
            (Some(property_modifier_scheduler), worker_scheduler)
84
        }
85
    };
86
87
0
    Ok(scheduler)
88
0
}
89
90
0
fn simple_scheduler_factory(
91
0
    spec: &SimpleSpec,
92
0
    store_manager: &StoreManager,
93
0
    now_fn: fn() -> SystemTime,
94
0
) -> Result<SchedulerFactoryResults, Error> {
95
0
    match spec
96
0
        .experimental_backend
97
0
        .as_ref()
98
0
        .unwrap_or(&ExperimentalSimpleSchedulerBackend::memory)
99
    {
100
        ExperimentalSimpleSchedulerBackend::memory => {
101
0
            let task_change_notify = Arc::new(Notify::new());
102
0
            let awaited_action_db = memory_awaited_action_db_factory(
103
0
                spec.retain_completed_for_s,
104
0
                &task_change_notify.clone(),
105
0
                SystemTime::now,
106
0
            );
107
0
            let (action_scheduler, worker_scheduler) =
108
0
                SimpleScheduler::new(spec, awaited_action_db, task_change_notify);
109
0
            Ok((Some(action_scheduler), Some(worker_scheduler)))
110
        }
111
0
        ExperimentalSimpleSchedulerBackend::redis(redis_config) => {
112
0
            let store = store_manager
113
0
                .get_store(redis_config.redis_store.as_ref())
114
0
                .err_tip(|| {
115
0
                    format!(
116
0
                        "'redis_store': '{}' does not exist",
117
0
                        redis_config.redis_store
118
0
                    )
119
0
                })?;
120
0
            let task_change_notify = Arc::new(Notify::new());
121
0
            let store = store
122
0
                .into_inner()
123
0
                .as_any_arc()
124
0
                .downcast::<RedisStore>()
125
0
                .map_err(|_| {
126
0
                    make_input_err!(
127
0
                        "Could not downcast to redis store in RedisAwaitedActionDb::new"
128
0
                    )
129
0
                })?;
130
0
            let awaited_action_db = StoreAwaitedActionDb::new(
131
0
                store,
132
0
                task_change_notify.clone(),
133
0
                now_fn,
134
0
                Default::default,
135
0
            )
136
0
            .err_tip(|| "In state_manager_factory::redis_state_manager")?;
137
0
            let (action_scheduler, worker_scheduler) =
138
0
                SimpleScheduler::new(spec, awaited_action_db, task_change_notify);
139
0
            Ok((Some(action_scheduler), Some(worker_scheduler)))
140
        }
141
    }
142
0
}
143
144
19
pub fn memory_awaited_action_db_factory<I, NowFn>(
145
19
    mut retain_completed_for_s: u32,
146
19
    task_change_notify: &Arc<Notify>,
147
19
    now_fn: NowFn,
148
19
) -> MemoryAwaitedActionDb<I, NowFn>
149
19
where
150
19
    I: InstantWrapper,
151
19
    NowFn: Fn() -> I + Clone + Send + Sync + 'static,
152
19
{
153
19
    if retain_completed_for_s == 0 {
  Branch (153:8): [True: 0, False: 0]
  Branch (153:8): [Folded - Ignored]
  Branch (153:8): [True: 19, False: 0]
154
19
        retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S;
155
19
    }
156
19
    MemoryAwaitedActionDb::new(
157
19
        &EvictionPolicy {
158
19
            max_seconds: retain_completed_for_s,
159
19
            ..Default::default()
160
19
        },
161
19
        task_change_notify.clone(),
162
19
        now_fn,
163
19
    )
164
19
}