/build/source/nativelink-scheduler/src/default_scheduler_factory.rs
Line  | Count  | Source  | 
1  |  | // Copyright 2024 The NativeLink Authors. All rights reserved.  | 
2  |  | //  | 
3  |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");  | 
4  |  | // you may not use this file except in compliance with the License.  | 
5  |  | // You may obtain a copy of the License at  | 
6  |  | //  | 
7  |  | //    See LICENSE file for details  | 
8  |  | //  | 
9  |  | // Unless required by applicable law or agreed to in writing, software  | 
10  |  | // distributed under the License is distributed on an "AS IS" BASIS,  | 
11  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
12  |  | // See the License for the specific language governing permissions and  | 
13  |  | // limitations under the License.  | 
14  |  |  | 
15  |  | use std::sync::Arc;  | 
16  |  | use std::time::SystemTime;  | 
17  |  |  | 
18  |  | use nativelink_config::schedulers::{ | 
19  |  |     ExperimentalSimpleSchedulerBackend, SchedulerSpec, SimpleSpec,  | 
20  |  | };  | 
21  |  | use nativelink_config::stores::EvictionPolicy;  | 
22  |  | use nativelink_error::{Error, ResultExt, make_input_err}; | 
23  |  | use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;  | 
24  |  | use nativelink_store::redis_store::RedisStore;  | 
25  |  | use nativelink_store::store_manager::StoreManager;  | 
26  |  | use nativelink_util::instant_wrapper::InstantWrapper;  | 
27  |  | use nativelink_util::operation_state_manager::ClientStateManager;  | 
28  |  | use tokio::sync::{Notify, mpsc}; | 
29  |  |  | 
30  |  | use crate::cache_lookup_scheduler::CacheLookupScheduler;  | 
31  |  | use crate::grpc_scheduler::GrpcScheduler;  | 
32  |  | use crate::memory_awaited_action_db::MemoryAwaitedActionDb;  | 
33  |  | use crate::property_modifier_scheduler::PropertyModifierScheduler;  | 
34  |  | use crate::simple_scheduler::SimpleScheduler;  | 
35  |  | use crate::store_awaited_action_db::StoreAwaitedActionDb;  | 
36  |  | use crate::worker_scheduler::WorkerScheduler;  | 
37  |  |  | 
/// Fallback retention period, in seconds, for recently completed actions.
///
/// `memory_awaited_action_db_factory` substitutes this value when it is asked
/// for a retention of `0`.
/// Keep the configuration documentation in sync if this value changes.
const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60;
41  |  |  | 
42  |  | pub type SchedulerFactoryResults = (  | 
43  |  |     Option<Arc<dyn ClientStateManager>>,  | 
44  |  |     Option<Arc<dyn WorkerScheduler>>,  | 
45  |  | );  | 
46  |  |  | 
47  | 0  | pub fn scheduler_factory(  | 
48  | 0  |     spec: &SchedulerSpec,  | 
49  | 0  |     store_manager: &StoreManager,  | 
50  | 0  |     maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,  | 
51  | 0  | ) -> Result<SchedulerFactoryResults, Error> { | 
52  | 0  |     inner_scheduler_factory(spec, store_manager, maybe_origin_event_tx)  | 
53  | 0  | }  | 
54  |  |  | 
55  | 0  | fn inner_scheduler_factory(  | 
56  | 0  |     spec: &SchedulerSpec,  | 
57  | 0  |     store_manager: &StoreManager,  | 
58  | 0  |     maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,  | 
59  | 0  | ) -> Result<SchedulerFactoryResults, Error> { | 
60  | 0  |     let scheduler: SchedulerFactoryResults = match spec { | 
61  | 0  |         SchedulerSpec::Simple(spec) => { | 
62  | 0  |             simple_scheduler_factory(spec, store_manager, SystemTime::now, maybe_origin_event_tx)?  | 
63  |  |         }  | 
64  | 0  |         SchedulerSpec::Grpc(spec) => (Some(Arc::new(GrpcScheduler::new(spec)?)), None),  | 
65  | 0  |         SchedulerSpec::CacheLookup(spec) => { | 
66  | 0  |             let ac_store = store_manager  | 
67  | 0  |                 .get_store(&spec.ac_store)  | 
68  | 0  |                 .err_tip(|| format!("'ac_store': '{}' does not exist", spec.ac_store))?; | 
69  | 0  |             let (action_scheduler, worker_scheduler) =  | 
70  | 0  |                 inner_scheduler_factory(&spec.scheduler, store_manager, maybe_origin_event_tx)  | 
71  | 0  |                     .err_tip(|| "In nested CacheLookupScheduler construction")?;  | 
72  | 0  |             let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(  | 
73  | 0  |                 ac_store,  | 
74  | 0  |                 action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,  | 
75  | 0  |             )?);  | 
76  | 0  |             (Some(cache_lookup_scheduler), worker_scheduler)  | 
77  |  |         }  | 
78  | 0  |         SchedulerSpec::PropertyModifier(spec) => { | 
79  | 0  |             let (action_scheduler, worker_scheduler) =  | 
80  | 0  |                 inner_scheduler_factory(&spec.scheduler, store_manager, maybe_origin_event_tx)  | 
81  | 0  |                     .err_tip(|| "In nested PropertyModifierScheduler construction")?;  | 
82  | 0  |             let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(  | 
83  | 0  |                 spec,  | 
84  | 0  |                 action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,  | 
85  |  |             ));  | 
86  | 0  |             (Some(property_modifier_scheduler), worker_scheduler)  | 
87  |  |         }  | 
88  |  |     };  | 
89  |  |  | 
90  | 0  |     Ok(scheduler)  | 
91  | 0  | }  | 
92  |  |  | 
93  | 0  | fn simple_scheduler_factory(  | 
94  | 0  |     spec: &SimpleSpec,  | 
95  | 0  |     store_manager: &StoreManager,  | 
96  | 0  |     now_fn: fn() -> SystemTime,  | 
97  | 0  |     maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,  | 
98  | 0  | ) -> Result<SchedulerFactoryResults, Error> { | 
99  | 0  |     match spec  | 
100  | 0  |         .experimental_backend  | 
101  | 0  |         .as_ref()  | 
102  | 0  |         .unwrap_or(&ExperimentalSimpleSchedulerBackend::Memory)  | 
103  |  |     { | 
104  |  |         ExperimentalSimpleSchedulerBackend::Memory => { | 
105  | 0  |             let task_change_notify = Arc::new(Notify::new());  | 
106  | 0  |             let awaited_action_db = memory_awaited_action_db_factory(  | 
107  | 0  |                 spec.retain_completed_for_s,  | 
108  | 0  |                 &task_change_notify,  | 
109  |  |                 SystemTime::now,  | 
110  |  |             );  | 
111  | 0  |             let (action_scheduler, worker_scheduler) = SimpleScheduler::new(  | 
112  | 0  |                 spec,  | 
113  | 0  |                 awaited_action_db,  | 
114  | 0  |                 task_change_notify,  | 
115  | 0  |                 maybe_origin_event_tx.cloned(),  | 
116  | 0  |             );  | 
117  | 0  |             Ok((Some(action_scheduler), Some(worker_scheduler)))  | 
118  |  |         }  | 
119  | 0  |         ExperimentalSimpleSchedulerBackend::Redis(redis_config) => { | 
120  | 0  |             let store = store_manager  | 
121  | 0  |                 .get_store(redis_config.redis_store.as_ref())  | 
122  | 0  |                 .err_tip(|| { | 
123  | 0  |                     format!(  | 
124  | 0  |                         "'redis_store': '{}' does not exist", | 
125  |  |                         redis_config.redis_store  | 
126  |  |                     )  | 
127  | 0  |                 })?;  | 
128  | 0  |             let task_change_notify = Arc::new(Notify::new());  | 
129  | 0  |             let store = store  | 
130  | 0  |                 .into_inner()  | 
131  | 0  |                 .as_any_arc()  | 
132  | 0  |                 .downcast::<RedisStore>()  | 
133  | 0  |                 .map_err(|_| { | 
134  | 0  |                     make_input_err!(  | 
135  |  |                         "Could not downcast to redis store in RedisAwaitedActionDb::new"  | 
136  |  |                     )  | 
137  | 0  |                 })?;  | 
138  | 0  |             let awaited_action_db = StoreAwaitedActionDb::new(  | 
139  | 0  |                 store,  | 
140  | 0  |                 task_change_notify.clone(),  | 
141  | 0  |                 now_fn,  | 
142  |  |                 Default::default,  | 
143  |  |             )  | 
144  | 0  |             .err_tip(|| "In state_manager_factory::redis_state_manager")?;  | 
145  | 0  |             let (action_scheduler, worker_scheduler) = SimpleScheduler::new(  | 
146  | 0  |                 spec,  | 
147  | 0  |                 awaited_action_db,  | 
148  | 0  |                 task_change_notify,  | 
149  | 0  |                 maybe_origin_event_tx.cloned(),  | 
150  | 0  |             );  | 
151  | 0  |             Ok((Some(action_scheduler), Some(worker_scheduler)))  | 
152  |  |         }  | 
153  |  |     }  | 
154  | 0  | }  | 
155  |  |  | 
156  | 20  | pub fn memory_awaited_action_db_factory<I, NowFn>(  | 
157  | 20  |     mut retain_completed_for_s: u32,  | 
158  | 20  |     task_change_notify: &Arc<Notify>,  | 
159  | 20  |     now_fn: NowFn,  | 
160  | 20  | ) -> MemoryAwaitedActionDb<I, NowFn>  | 
161  | 20  | where  | 
162  | 20  |     I: InstantWrapper,  | 
163  | 20  |     NowFn: Fn() -> I + Clone + Send + Sync + 'static,  | 
164  |  | { | 
165  | 20  |     if retain_completed_for_s == 0 {  Branch (165:8): [True: 0, False: 0]
  Branch (165:8): [Folded - Ignored]
  Branch (165:8): [True: 20, False: 0]
  | 
166  | 20  |         retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S;  | 
167  | 20  |     }0   | 
168  | 20  |     MemoryAwaitedActionDb::new(  | 
169  | 20  |         &EvictionPolicy { | 
170  | 20  |             max_seconds: retain_completed_for_s,  | 
171  | 20  |             ..Default::default()  | 
172  | 20  |         },  | 
173  | 20  |         task_change_notify.clone(),  | 
174  | 20  |         now_fn,  | 
175  |  |     )  | 
176  | 20  | }  |