/build/source/nativelink-config/src/schedulers.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::HashMap; |
16 | | |
17 | | #[cfg(feature = "dev-schema")] |
18 | | use schemars::JsonSchema; |
19 | | use serde::{Deserialize, Serialize}; |
20 | | |
21 | | use crate::serde_utils::{ |
22 | | convert_duration_with_shellexpand, convert_duration_with_shellexpand_and_negative, |
23 | | convert_numeric_with_shellexpand, |
24 | | }; |
25 | | use crate::stores::{GrpcEndpoint, Retry, StoreRefName}; |
26 | | |
27 | | #[derive(Deserialize, Serialize, Debug)] |
28 | | #[serde(rename_all = "snake_case")] |
29 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
30 | | pub enum SchedulerSpec { |
31 | | Simple(SimpleSpec), |
32 | | Grpc(GrpcSpec), |
33 | | CacheLookup(CacheLookupSpec), |
34 | | PropertyModifier(PropertyModifierSpec), |
35 | | } |
36 | | |
37 | | /// When the scheduler matches tasks to workers that are capable of running |
38 | | /// the task, this value will be used to determine how the property is treated. |
39 | | #[derive(Deserialize, Serialize, Debug, Clone, Copy, Hash, Eq, PartialEq)] |
40 | | #[serde(rename_all = "snake_case")] |
41 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
42 | | pub enum PropertyType { |
43 | | /// Requires the platform property to be a u64 and when the scheduler looks |
44 | | /// for appropriate worker nodes that are capable of executing the task, |
45 | | /// the task will not run on a node that has less than this value. |
46 | | Minimum, |
47 | | |
48 | | /// Requires the platform property to be a string and when the scheduler |
49 | | /// looks for appropriate worker nodes that are capable of executing the |
50 | | /// task, the task will not run on a node that does not have this property |
51 | | /// set to the value with exact string match. |
52 | | Exact, |
53 | | |
54 | | /// Does not restrict on this value and instead will be passed to the worker |
55 | | /// as an informational piece. |
56 | | /// TODO(palfrey) In the future this will be used by the scheduler and worker |
57 | | /// to cause the scheduler to prefer certain workers over others, but not |
58 | | /// restrict them based on these values. |
59 | | Priority, |
60 | | |
61 | | /// Allows jobs to be requested with said key, but without requiring workers |
62 | | /// to have that key. |
63 | | Ignore, |
64 | | } |
65 | | |
66 | | /// When a worker is being searched for to run a job, this will be used |
67 | | /// on how to choose which worker should run the job when multiple |
68 | | /// workers are able to run the task. |
69 | | #[derive(Copy, Clone, Deserialize, Serialize, Debug, Default)] |
70 | | #[serde(rename_all = "snake_case")] |
71 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
72 | | pub enum WorkerAllocationStrategy { |
73 | | /// Prefer workers that have been least recently used to run a job. |
74 | | #[default] |
75 | | LeastRecentlyUsed, |
76 | | /// Prefer workers that have been most recently used to run a job. |
77 | | MostRecentlyUsed, |
78 | | } |
79 | | |
80 | | // defaults to every 10s |
81 | 8 | const fn default_worker_match_logging_interval_s() -> i64 { |
82 | 8 | 10 |
83 | 8 | } |
84 | | |
85 | | #[derive(Deserialize, Serialize, Debug, Default)] |
86 | | #[serde(deny_unknown_fields)] |
87 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
88 | | pub struct SimpleSpec { |
89 | | /// A list of supported platform properties mapped to how these properties |
90 | | /// are used when the scheduler looks for worker nodes capable of running |
91 | | /// the task. |
92 | | /// |
93 | | /// For example, a value of: |
94 | | /// ```json |
95 | | /// { "cpu_count": "minimum", "cpu_arch": "exact" } |
96 | | /// ``` |
97 | | /// With a job that contains: |
98 | | /// ```json |
99 | | /// { "cpu_count": "8", "cpu_arch": "arm" } |
100 | | /// ``` |
101 | | /// Will result in the scheduler filtering out any workers that do not have |
102 | | /// `"cpu_arch" = "arm"` and filter out any workers that have less than 8 cpu |
103 | | /// cores available. |
104 | | /// |
105 | | /// The property names here must match the property keys provided by the |
106 | | /// worker nodes when they join the pool. In other words, the workers will |
107 | | /// publish their capabilities to the scheduler when they join the worker |
108 | | /// pool. If the worker fails to notify the scheduler of its (for example) |
109 | | /// `"cpu_arch"`, the scheduler will never send any jobs to it, if all jobs |
110 | | /// have the `"cpu_arch"` label. There is no special treatment of any platform |
111 | | /// property label; matching is entirely driven by worker configs and this |
112 | | /// config. |
113 | | pub supported_platform_properties: Option<HashMap<String, PropertyType>>, |
114 | | |
115 | | /// The amount of time to retain completed actions in memory for in case |
116 | | /// a `WaitExecution` is called after the action has completed. |
117 | | /// Default: 60 (seconds) |
118 | | #[serde(default, deserialize_with = "convert_duration_with_shellexpand")] |
119 | | pub retain_completed_for_s: u32, |
120 | | |
121 | | /// Mark operations as completed with error if no client has updated them |
122 | | /// within this duration. |
123 | | /// Default: 60 (seconds) |
124 | | #[serde(default, deserialize_with = "convert_duration_with_shellexpand")] |
125 | | pub client_action_timeout_s: u64, |
126 | | |
127 | | /// Remove workers from pool once the worker has not responded in this |
128 | | /// amount of time in seconds. |
129 | | /// Default: 5 (seconds) |
130 | | #[serde(default, deserialize_with = "convert_duration_with_shellexpand")] |
131 | | pub worker_timeout_s: u64, |
132 | | |
133 | | /// Maximum time (seconds) an action can stay in Executing state without |
134 | | /// any worker update before being timed out and re-queued. |
135 | | /// This applies regardless of worker keepalive status, catching cases |
136 | | /// where a worker is alive (sending keepalives) but stuck on a specific |
137 | | /// action. Set to 0 to disable (relies only on `worker_timeout_s`). |
138 | | /// |
139 | | /// Default: 0 (disabled) |
140 | | #[serde(default, deserialize_with = "convert_duration_with_shellexpand")] |
141 | | pub max_action_executing_timeout_s: u64, |
142 | | |
143 | | /// If a job returns an internal error or times out this many times when |
144 | | /// attempting to run on a worker the scheduler will return the last error |
145 | | /// to the client. Jobs will be retried and this configuration is to help |
146 | | /// prevent one rogue job from infinitely retrying and taking up a lot of |
147 | | /// resources when the task itself is the one causing the server to go |
148 | | /// into a bad state. |
149 | | /// Default: 3 |
150 | | #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] |
151 | | pub max_job_retries: usize, |
152 | | |
153 | | /// The strategy used to assign workers jobs. |
154 | | #[serde(default)] |
155 | | pub allocation_strategy: WorkerAllocationStrategy, |
156 | | |
157 | | /// The storage backend to use for the scheduler. |
158 | | /// Default: memory |
159 | | pub experimental_backend: Option<ExperimentalSimpleSchedulerBackend>, |
160 | | |
161 | | /// Every N seconds, do logging of worker matching |
162 | | /// e.g. "worker busy", "can't find any worker" |
163 | | /// Defaults to 10s. Can be set to -1 to disable |
164 | | #[serde( |
165 | | default = "default_worker_match_logging_interval_s", |
166 | | deserialize_with = "convert_duration_with_shellexpand_and_negative" |
167 | | )] |
168 | | pub worker_match_logging_interval_s: i64, |
169 | | } |
170 | | |
171 | | #[derive(Deserialize, Serialize, Debug)] |
172 | | #[serde(rename_all = "snake_case")] |
173 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
174 | | pub enum ExperimentalSimpleSchedulerBackend { |
175 | | /// Use an in-memory store for the scheduler. |
176 | | Memory, |
177 | | /// Use a redis store for the scheduler. |
178 | | Redis(ExperimentalRedisSchedulerBackend), |
179 | | } |
180 | | |
181 | | #[derive(Deserialize, Serialize, Debug, Default)] |
182 | | #[serde(deny_unknown_fields)] |
183 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
184 | | pub struct ExperimentalRedisSchedulerBackend { |
185 | | /// A reference to the redis store to use for the scheduler. |
186 | | /// Note: This MUST resolve to a `RedisSpec`. |
187 | | pub redis_store: StoreRefName, |
188 | | } |
189 | | |
190 | | /// A scheduler that simply forwards requests to an upstream scheduler. This |
191 | | /// is useful to use when doing some kind of local action cache or CAS away from |
192 | | /// the main cluster of workers. In general, it's more efficient to point the |
193 | | /// build at the main scheduler directly though. |
194 | | #[derive(Deserialize, Serialize, Debug)] |
195 | | #[serde(deny_unknown_fields)] |
196 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
197 | | pub struct GrpcSpec { |
198 | | /// The upstream scheduler to forward requests to. |
199 | | pub endpoint: GrpcEndpoint, |
200 | | |
201 | | /// Retry configuration to use when a network request fails. |
202 | | #[serde(default)] |
203 | | pub retry: Retry, |
204 | | |
205 | | /// Limit the number of simultaneous upstream requests to this many. A |
206 | | /// value of zero is treated as unlimited. If the limit is reached the |
207 | | /// request is queued. |
208 | | #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] |
209 | | pub max_concurrent_requests: usize, |
210 | | |
211 | | /// The number of connections to make to each specified endpoint to balance |
212 | | /// the load over multiple TCP connections. Default 1. |
213 | | #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] |
214 | | pub connections_per_endpoint: usize, |
215 | | } |
216 | | |
217 | | #[derive(Deserialize, Serialize, Debug)] |
218 | | #[serde(deny_unknown_fields)] |
219 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
220 | | pub struct CacheLookupSpec { |
221 | | /// The reference to the action cache store used to return cached |
222 | | /// actions from rather than running them again. |
223 | | /// To prevent unintended issues, this store should probably be a `CompletenessCheckingSpec`. |
224 | | pub ac_store: StoreRefName, |
225 | | |
226 | | /// The nested scheduler to use if cache lookup fails. |
227 | | pub scheduler: Box<SchedulerSpec>, |
228 | | } |
229 | | |
230 | | #[derive(Deserialize, Serialize, Debug, Clone)] |
231 | | #[serde(deny_unknown_fields)] |
232 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
233 | | pub struct PlatformPropertyAddition { |
234 | | /// The name of the property to add. |
235 | | pub name: String, |
236 | | /// The value to assign to the property. |
237 | | pub value: String, |
238 | | } |
239 | | |
240 | | #[derive(Deserialize, Serialize, Debug, Clone)] |
241 | | #[serde(deny_unknown_fields)] |
242 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
243 | | pub struct PlatformPropertyReplacement { |
244 | | /// The name of the property to replace. |
245 | | pub name: String, |
246 | | /// The value to match against; if unset then any instance matches. |
247 | | #[serde(default)] |
248 | | pub value: Option<String>, |
249 | | /// The new name of the property. |
250 | | pub new_name: String, |
251 | | /// The value to assign to the property, if unset will remain the same. |
252 | | #[serde(default)] |
253 | | pub new_value: Option<String>, |
254 | | } |
255 | | |
256 | | #[derive(Deserialize, Serialize, Debug, Clone)] |
257 | | #[serde(rename_all = "snake_case")] |
258 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
259 | | pub enum PropertyModification { |
260 | | /// Add a property to the action properties. |
261 | | Add(PlatformPropertyAddition), |
262 | | /// Remove a named property from the action. |
263 | | Remove(String), |
264 | | /// If a property is found, then replace it with another one. |
265 | | Replace(PlatformPropertyReplacement), |
266 | | } |
267 | | |
268 | | #[derive(Deserialize, Serialize, Debug)] |
269 | | #[serde(deny_unknown_fields)] |
270 | | #[cfg_attr(feature = "dev-schema", derive(JsonSchema))] |
271 | | pub struct PropertyModifierSpec { |
272 | | /// A list of modifications to perform to incoming actions for the nested |
273 | | /// scheduler. These are performed in order and blindly, so removing a |
274 | | /// property that doesn't exist is fine and overwriting an existing property |
275 | | /// is also fine. If adding properties that do not exist in the nested |
276 | | /// scheduler is not supported and will likely cause unexpected behaviour. |
277 | | pub modifications: Vec<PropertyModification>, |
278 | | |
279 | | /// The nested scheduler to use after modifying the properties. |
280 | | pub scheduler: Box<SchedulerSpec>, |
281 | | } |