Coverage Report

Created: 2026-04-14 11:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-config/src/schedulers.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
17
#[cfg(feature = "dev-schema")]
18
use schemars::JsonSchema;
19
use serde::{Deserialize, Serialize};
20
21
use crate::serde_utils::{
22
    convert_duration_with_shellexpand, convert_duration_with_shellexpand_and_negative,
23
    convert_numeric_with_shellexpand,
24
};
25
use crate::stores::{GrpcEndpoint, Retry, StoreRefName};
26
27
#[derive(Deserialize, Serialize, Debug)]
28
#[serde(rename_all = "snake_case")]
29
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
30
pub enum SchedulerSpec {
31
    Simple(SimpleSpec),
32
    Grpc(GrpcSpec),
33
    CacheLookup(CacheLookupSpec),
34
    PropertyModifier(PropertyModifierSpec),
35
}
36
37
/// When the scheduler matches tasks to workers that are capable of running
38
/// the task, this value will be used to determine how the property is treated.
39
#[derive(Deserialize, Serialize, Debug, Clone, Copy, Hash, Eq, PartialEq)]
40
#[serde(rename_all = "snake_case")]
41
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
42
pub enum PropertyType {
43
    /// Requires the platform property to be a u64 and when the scheduler looks
44
    /// for appropriate worker nodes that are capable of executing the task,
45
    /// the task will not run on a node that has less than this value.
46
    Minimum,
47
48
    /// Requires the platform property to be a string and when the scheduler
49
    /// looks for appropriate worker nodes that are capable of executing the
50
    /// task, the task will not run on a node that does not have this property
51
    /// set to the value with exact string match.
52
    Exact,
53
54
    /// Does not restrict on this value and instead will be passed to the worker
55
    /// as an informational piece.
56
    /// TODO(palfrey) In the future this will be used by the scheduler and worker
57
    /// to cause the scheduler to prefer certain workers over others, but not
58
    /// restrict them based on these values.
59
    Priority,
60
61
    /// Allows jobs to be requested with said key, but without requiring workers
62
    /// to have that key.
63
    Ignore,
64
}
65
66
/// When a worker is being searched for to run a job, this will be used
67
/// on how to choose which worker should run the job when multiple
68
/// workers are able to run the task.
69
#[derive(Copy, Clone, Deserialize, Serialize, Debug, Default)]
70
#[serde(rename_all = "snake_case")]
71
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
72
pub enum WorkerAllocationStrategy {
73
    /// Prefer workers that have been least recently used to run a job.
74
    #[default]
75
    LeastRecentlyUsed,
76
    /// Prefer workers that have been most recently used to run a job.
77
    MostRecentlyUsed,
78
}
79
80
// defaults to every 10s
81
8
const fn default_worker_match_logging_interval_s() -> i64 {
82
8
    10
83
8
}
84
85
#[derive(Deserialize, Serialize, Debug, Default)]
86
#[serde(deny_unknown_fields)]
87
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
88
pub struct SimpleSpec {
89
    /// A list of supported platform properties mapped to how these properties
90
    /// are used when the scheduler looks for worker nodes capable of running
91
    /// the task.
92
    ///
93
    /// For example, a value of:
94
    /// ```json
95
    /// { "cpu_count": "minimum", "cpu_arch": "exact" }
96
    /// ```
97
    /// With a job that contains:
98
    /// ```json
99
    /// { "cpu_count": "8", "cpu_arch": "arm" }
100
    /// ```
101
    /// Will result in the scheduler filtering out any workers that do not have
102
    /// `"cpu_arch" = "arm"` and filter out any workers that have less than 8 cpu
103
    /// cores available.
104
    ///
105
    /// The property names here must match the property keys provided by the
106
    /// worker nodes when they join the pool. In other words, the workers will
107
    /// publish their capabilities to the scheduler when they join the worker
108
    /// pool. If the worker fails to notify the scheduler of its (for example)
109
    /// `"cpu_arch"`, the scheduler will never send any jobs to it, if all jobs
110
    /// have the `"cpu_arch"` label. There is no special treatment of any platform
111
    /// property labels; they are entirely driven by worker configs and this
112
    /// config.
113
    pub supported_platform_properties: Option<HashMap<String, PropertyType>>,
114
115
    /// The amount of time to retain completed actions in memory for in case
116
    /// a `WaitExecution` is called after the action has completed.
117
    /// Default: 60 (seconds)
118
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
119
    pub retain_completed_for_s: u32,
120
121
    /// Mark operations as completed with error if no client has updated them
122
    /// within this duration.
123
    /// Default: 60 (seconds)
124
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
125
    pub client_action_timeout_s: u64,
126
127
    /// Remove workers from pool once the worker has not responded in this
128
    /// amount of time in seconds.
129
    /// Default: 5 (seconds)
130
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
131
    pub worker_timeout_s: u64,
132
133
    /// Maximum time (seconds) an action can stay in Executing state without
134
    /// any worker update before being timed out and re-queued.
135
    /// This applies regardless of worker keepalive status, catching cases
136
    /// where a worker is alive (sending keepalives) but stuck on a specific
137
    /// action. Set to 0 to disable (relies only on `worker_timeout_s`).
138
    ///
139
    /// Default: 0 (disabled)
140
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
141
    pub max_action_executing_timeout_s: u64,
142
143
    /// If a job returns an internal error or times out this many times when
144
    /// attempting to run on a worker the scheduler will return the last error
145
    /// to the client. Jobs will be retried and this configuration is to help
146
    /// prevent one rogue job from infinitely retrying and taking up a lot of
147
    /// resources when the task itself is the one causing the server to go
148
    /// into a bad state.
149
    /// Default: 3
150
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
151
    pub max_job_retries: usize,
152
153
    /// The strategy used to assign workers jobs.
154
    #[serde(default)]
155
    pub allocation_strategy: WorkerAllocationStrategy,
156
157
    /// The storage backend to use for the scheduler.
158
    /// Default: memory
159
    pub experimental_backend: Option<ExperimentalSimpleSchedulerBackend>,
160
161
    /// Every N seconds, do logging of worker matching
162
    /// e.g. "worker busy", "can't find any worker"
163
    /// Defaults to 10s. Can be set to -1 to disable
164
    #[serde(
165
        default = "default_worker_match_logging_interval_s",
166
        deserialize_with = "convert_duration_with_shellexpand_and_negative"
167
    )]
168
    pub worker_match_logging_interval_s: i64,
169
}
170
171
#[derive(Deserialize, Serialize, Debug)]
172
#[serde(rename_all = "snake_case")]
173
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
174
pub enum ExperimentalSimpleSchedulerBackend {
175
    /// Use an in-memory store for the scheduler.
176
    Memory,
177
    /// Use a redis store for the scheduler.
178
    Redis(ExperimentalRedisSchedulerBackend),
179
}
180
181
#[derive(Deserialize, Serialize, Debug, Default)]
182
#[serde(deny_unknown_fields)]
183
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
184
pub struct ExperimentalRedisSchedulerBackend {
185
    /// A reference to the redis store to use for the scheduler.
186
    /// Note: This MUST resolve to a `RedisSpec`.
187
    pub redis_store: StoreRefName,
188
}
189
190
/// A scheduler that simply forwards requests to an upstream scheduler.  This
191
/// is useful to use when doing some kind of local action cache or CAS away from
192
/// the main cluster of workers.  In general, it's more efficient to point the
193
/// build at the main scheduler directly though.
194
#[derive(Deserialize, Serialize, Debug)]
195
#[serde(deny_unknown_fields)]
196
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
197
pub struct GrpcSpec {
198
    /// The upstream scheduler to forward requests to.
199
    pub endpoint: GrpcEndpoint,
200
201
    /// Retry configuration to use when a network request fails.
202
    #[serde(default)]
203
    pub retry: Retry,
204
205
    /// Limit the number of simultaneous upstream requests to this many.  A
206
    /// value of zero is treated as unlimited.  If the limit is reached the
207
    /// request is queued.
208
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
209
    pub max_concurrent_requests: usize,
210
211
    /// The number of connections to make to each specified endpoint to balance
212
    /// the load over multiple TCP connections.  Default 1.
213
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
214
    pub connections_per_endpoint: usize,
215
}
216
217
#[derive(Deserialize, Serialize, Debug)]
218
#[serde(deny_unknown_fields)]
219
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
220
pub struct CacheLookupSpec {
221
    /// The reference to the action cache store used to return cached
222
    /// actions from rather than running them again.
223
    /// To prevent unintended issues, this store should probably be a `CompletenessCheckingSpec`.
224
    pub ac_store: StoreRefName,
225
226
    /// The nested scheduler to use if cache lookup fails.
227
    pub scheduler: Box<SchedulerSpec>,
228
}
229
230
#[derive(Deserialize, Serialize, Debug, Clone)]
231
#[serde(deny_unknown_fields)]
232
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
233
pub struct PlatformPropertyAddition {
234
    /// The name of the property to add.
235
    pub name: String,
236
    /// The value to assign to the property.
237
    pub value: String,
238
}
239
240
#[derive(Deserialize, Serialize, Debug, Clone)]
241
#[serde(deny_unknown_fields)]
242
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
243
pub struct PlatformPropertyReplacement {
244
    /// The name of the property to replace.
245
    pub name: String,
246
    /// The value to match against; if unset then any instance matches.
247
    #[serde(default)]
248
    pub value: Option<String>,
249
    /// The new name of the property.
250
    pub new_name: String,
251
    /// The value to assign to the property, if unset will remain the same.
252
    #[serde(default)]
253
    pub new_value: Option<String>,
254
}
255
256
#[derive(Deserialize, Serialize, Debug, Clone)]
257
#[serde(rename_all = "snake_case")]
258
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
259
pub enum PropertyModification {
260
    /// Add a property to the action properties.
261
    Add(PlatformPropertyAddition),
262
    /// Remove a named property from the action.
263
    Remove(String),
264
    /// If a property is found, then replace it with another one.
265
    Replace(PlatformPropertyReplacement),
266
}
267
268
#[derive(Deserialize, Serialize, Debug)]
269
#[serde(deny_unknown_fields)]
270
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
271
pub struct PropertyModifierSpec {
272
    /// A list of modifications to perform to incoming actions for the nested
273
    /// scheduler.  These are performed in order and blindly, so removing a
274
    /// property that doesn't exist is fine and overwriting an existing property
275
    /// is also fine.  Adding properties that do not exist in the nested
276
    /// scheduler is not supported and will likely cause unexpected behaviour.
277
    pub modifications: Vec<PropertyModification>,
278
279
    /// The nested scheduler to use after modifying the properties.
280
    pub scheduler: Box<SchedulerSpec>,
281
}