Coverage Report

Created: 2026-02-23 10:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-config/src/schedulers.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
17
use serde::{Deserialize, Serialize};
18
19
use crate::serde_utils::{
20
    convert_duration_with_shellexpand, convert_duration_with_shellexpand_and_negative,
21
    convert_numeric_with_shellexpand,
22
};
23
use crate::stores::{GrpcEndpoint, Retry, StoreRefName};
24
25
#[derive(Deserialize, Serialize, Debug)]
26
#[serde(rename_all = "snake_case")]
27
pub enum SchedulerSpec {
28
    Simple(SimpleSpec),
29
    Grpc(GrpcSpec),
30
    CacheLookup(CacheLookupSpec),
31
    PropertyModifier(PropertyModifierSpec),
32
}
33
34
/// When the scheduler matches tasks to workers that are capable of running
35
/// the task, this value will be used to determine how the property is treated.
36
#[derive(Deserialize, Serialize, Debug, Clone, Copy, Hash, Eq, PartialEq)]
37
#[serde(rename_all = "snake_case")]
38
pub enum PropertyType {
39
    /// Requires the platform property to be a u64 and when the scheduler looks
40
    /// for appropriate worker nodes that are capable of executing the task,
41
    /// the task will not run on a node that has less than this value.
42
    Minimum,
43
44
    /// Requires the platform property to be a string and when the scheduler
45
    /// looks for appropriate worker nodes that are capable of executing the
46
    /// task, the task will not run on a node that does not have this property
47
    /// set to the value with exact string match.
48
    Exact,
49
50
    /// Does not restrict on this value and instead will be passed to the worker
51
    /// as an informational piece.
52
    /// TODO(palfrey) In the future this will be used by the scheduler and worker
53
    /// to cause the scheduler to prefer certain workers over others, but not
54
    /// restrict them based on these values.
55
    Priority,
56
57
    /// Allows jobs to be requested with said key, but without requiring workers
58
    /// to have that key.
59
    Ignore,
60
}
61
62
/// When a worker is being searched for to run a job, this will be used
63
/// to choose which worker should run the job when multiple
64
/// workers are able to run the task.
65
#[derive(Copy, Clone, Deserialize, Serialize, Debug, Default)]
66
#[serde(rename_all = "snake_case")]
67
pub enum WorkerAllocationStrategy {
68
    /// Prefer workers that have been least recently used to run a job.
69
    #[default]
70
    LeastRecentlyUsed,
71
    /// Prefer workers that have been most recently used to run a job.
72
    MostRecentlyUsed,
73
}
74
75
// defaults to every 10s
76
6
const fn default_worker_match_logging_interval_s() -> i64 {
77
6
    10
78
6
}
79
80
#[derive(Deserialize, Serialize, Debug, Default)]
81
#[serde(deny_unknown_fields)]
82
pub struct SimpleSpec {
83
    /// A list of supported platform properties mapped to how these properties
84
    /// are used when the scheduler looks for worker nodes capable of running
85
    /// the task.
86
    ///
87
    /// For example, a value of:
88
    /// ```json
89
    /// { "cpu_count": "minimum", "cpu_arch": "exact" }
90
    /// ```
91
    /// With a job that contains:
92
    /// ```json
93
    /// { "cpu_count": "8", "cpu_arch": "arm" }
94
    /// ```
95
    /// Will result in the scheduler filtering out any workers that do not have
96
    /// `"cpu_arch" = "arm"` and filter out any workers that have less than 8 cpu
97
    /// cores available.
98
    ///
99
    /// The property names here must match the property keys provided by the
100
    /// worker nodes when they join the pool. In other words, the workers will
101
    /// publish their capabilities to the scheduler when they join the worker
102
    /// pool. If the worker fails to notify the scheduler of its (for example)
103
    /// `"cpu_arch"`, the scheduler will never send any jobs to it, if all jobs
104
    /// have the `"cpu_arch"` label. There is no special treatment of any platform
105
    /// property labels; they are entirely driven by worker configs and this
106
    /// config.
107
    pub supported_platform_properties: Option<HashMap<String, PropertyType>>,
108
109
    /// The amount of time to retain completed actions in memory in case
110
    /// a `WaitExecution` is called after the action has completed.
111
    /// Default: 60 (seconds)
112
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
113
    pub retain_completed_for_s: u32,
114
115
    /// Mark operations as completed with error if no client has updated them
116
    /// within this duration.
117
    /// Default: 60 (seconds)
118
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
119
    pub client_action_timeout_s: u64,
120
121
    /// Remove workers from pool once the worker has not responded in this
122
    /// amount of time in seconds.
123
    /// Default: 5 (seconds)
124
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
125
    pub worker_timeout_s: u64,
126
127
    /// Maximum time (seconds) an action can stay in Executing state without
128
    /// any worker update before being timed out and re-queued.
129
    /// This applies regardless of worker keepalive status, catching cases
130
    /// where a worker is alive (sending keepalives) but stuck on a specific
131
    /// action. Set to 0 to disable (relies only on `worker_timeout_s`).
132
    ///
133
    /// Default: 0 (disabled)
134
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
135
    pub max_action_executing_timeout_s: u64,
136
137
    /// If a job returns an internal error or times out this many times when
138
    /// attempting to run on a worker the scheduler will return the last error
139
    /// to the client. Jobs will be retried and this configuration is to help
140
    /// prevent one rogue job from infinitely retrying and taking up a lot of
141
    /// resources when the task itself is the one causing the server to go
142
    /// into a bad state.
143
    /// Default: 3
144
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
145
    pub max_job_retries: usize,
146
147
    /// The strategy used to assign workers jobs.
148
    #[serde(default)]
149
    pub allocation_strategy: WorkerAllocationStrategy,
150
151
    /// The storage backend to use for the scheduler.
152
    /// Default: memory
153
    pub experimental_backend: Option<ExperimentalSimpleSchedulerBackend>,
154
155
    /// Every N seconds, do logging of worker matching
156
    /// e.g. "worker busy", "can't find any worker"
157
    /// Defaults to 10s. Can be set to -1 to disable
158
    #[serde(
159
        default = "default_worker_match_logging_interval_s",
160
        deserialize_with = "convert_duration_with_shellexpand_and_negative"
161
    )]
162
    pub worker_match_logging_interval_s: i64,
163
}
164
165
#[derive(Deserialize, Serialize, Debug)]
166
#[serde(rename_all = "snake_case")]
167
pub enum ExperimentalSimpleSchedulerBackend {
168
    /// Use an in-memory store for the scheduler.
169
    Memory,
170
    /// Use a redis store for the scheduler.
171
    Redis(ExperimentalRedisSchedulerBackend),
172
}
173
174
#[derive(Deserialize, Serialize, Debug, Default)]
175
#[serde(deny_unknown_fields)]
176
pub struct ExperimentalRedisSchedulerBackend {
177
    /// A reference to the redis store to use for the scheduler.
178
    /// Note: This MUST resolve to a `RedisSpec`.
179
    pub redis_store: StoreRefName,
180
}
181
182
/// A scheduler that simply forwards requests to an upstream scheduler.  This
183
/// is useful to use when doing some kind of local action cache or CAS away from
184
/// the main cluster of workers.  In general, it's more efficient to point the
185
/// build at the main scheduler directly though.
186
#[derive(Deserialize, Serialize, Debug)]
187
#[serde(deny_unknown_fields)]
188
pub struct GrpcSpec {
189
    /// The upstream scheduler to forward requests to.
190
    pub endpoint: GrpcEndpoint,
191
192
    /// Retry configuration to use when a network request fails.
193
    #[serde(default)]
194
    pub retry: Retry,
195
196
    /// Limit the number of simultaneous upstream requests to this many.  A
197
    /// value of zero is treated as unlimited.  If the limit is reached the
198
    /// request is queued.
199
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
200
    pub max_concurrent_requests: usize,
201
202
    /// The number of connections to make to each specified endpoint to balance
203
    /// the load over multiple TCP connections.  Default 1.
204
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
205
    pub connections_per_endpoint: usize,
206
}
207
208
#[derive(Deserialize, Serialize, Debug)]
209
#[serde(deny_unknown_fields)]
210
pub struct CacheLookupSpec {
211
    /// The reference to the action cache store used to return cached
212
    /// actions from rather than running them again.
213
    /// To prevent unintended issues, this store should probably be a `CompletenessCheckingSpec`.
214
    pub ac_store: StoreRefName,
215
216
    /// The nested scheduler to use if cache lookup fails.
217
    pub scheduler: Box<SchedulerSpec>,
218
}
219
220
#[derive(Deserialize, Serialize, Debug, Clone)]
221
#[serde(deny_unknown_fields)]
222
pub struct PlatformPropertyAddition {
223
    /// The name of the property to add.
224
    pub name: String,
225
    /// The value to assign to the property.
226
    pub value: String,
227
}
228
229
#[derive(Deserialize, Serialize, Debug, Clone)]
230
#[serde(deny_unknown_fields)]
231
pub struct PlatformPropertyReplacement {
232
    /// The name of the property to replace.
233
    pub name: String,
234
    /// The value to match against, if unset then any instance matches.
235
    #[serde(default)]
236
    pub value: Option<String>,
237
    /// The new name of the property.
238
    pub new_name: String,
239
    /// The value to assign to the property, if unset will remain the same.
240
    #[serde(default)]
241
    pub new_value: Option<String>,
242
}
243
244
#[derive(Deserialize, Serialize, Debug, Clone)]
245
#[serde(rename_all = "snake_case")]
246
pub enum PropertyModification {
247
    /// Add a property to the action properties.
248
    Add(PlatformPropertyAddition),
249
    /// Remove a named property from the action.
250
    Remove(String),
251
    /// If a property is found, then replace it with another one.
252
    Replace(PlatformPropertyReplacement),
253
}
254
255
#[derive(Deserialize, Serialize, Debug)]
256
#[serde(deny_unknown_fields)]
257
pub struct PropertyModifierSpec {
258
    /// A list of modifications to perform to incoming actions for the nested
259
    /// scheduler.  These are performed in order and blindly, so removing a
260
    /// property that doesn't exist is fine and overwriting an existing property
261
    /// is also fine.  Adding properties that do not exist in the nested
262
    /// scheduler is not supported and will likely cause unexpected behaviour.
263
    pub modifications: Vec<PropertyModification>,
264
265
    /// The nested scheduler to use after modifying the properties.
266
    pub scheduler: Box<SchedulerSpec>,
267
}