Coverage Report

Created: 2025-12-17 22:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-config/src/schedulers.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
17
use serde::{Deserialize, Serialize};
18
19
use crate::serde_utils::{
20
    convert_duration_with_shellexpand, convert_duration_with_shellexpand_and_negative,
21
    convert_numeric_with_shellexpand,
22
};
23
use crate::stores::{GrpcEndpoint, Retry, StoreRefName};
24
25
#[derive(Deserialize, Serialize, Debug)]
26
#[serde(rename_all = "snake_case")]
27
pub enum SchedulerSpec {
28
    Simple(SimpleSpec),
29
    Grpc(GrpcSpec),
30
    CacheLookup(CacheLookupSpec),
31
    PropertyModifier(PropertyModifierSpec),
32
}
33
34
/// When the scheduler matches tasks to workers that are capable of running
35
/// the task, this value will be used to determine how the property is treated.
36
#[derive(Deserialize, Serialize, Debug, Clone, Copy, Hash, Eq, PartialEq)]
37
#[serde(rename_all = "snake_case")]
38
pub enum PropertyType {
39
    /// Requires the platform property to be a u64 and when the scheduler looks
40
    /// for appropriate worker nodes that are capable of executing the task,
41
    /// the task will not run on a node that has less than this value.
42
    Minimum,
43
44
    /// Requires the platform property to be a string and when the scheduler
45
    /// looks for appropriate worker nodes that are capable of executing the
46
    /// task, the task will not run on a node that does not have this property
47
    /// set to the value with exact string match.
48
    Exact,
49
50
    /// Does not restrict on this value and instead will be passed to the worker
51
    /// as an informational piece.
52
    /// TODO(palfrey) In the future this will be used by the scheduler and worker
53
    /// to cause the scheduler to prefer certain workers over others, but not
54
    /// restrict them based on these values.
55
    Priority,
56
}
57
58
/// When a worker is being searched for to run a job, this will be used
59
/// on how to choose which worker should run the job when multiple
60
/// workers are able to run the task.
61
#[derive(Copy, Clone, Deserialize, Serialize, Debug, Default)]
62
#[serde(rename_all = "snake_case")]
63
pub enum WorkerAllocationStrategy {
64
    /// Prefer workers that have been least recently used to run a job.
65
    #[default]
66
    LeastRecentlyUsed,
67
    /// Prefer workers that have been most recently used to run a job.
68
    MostRecentlyUsed,
69
}
70
71
// defaults to every 10s
72
6
const fn default_worker_match_logging_interval_s() -> i64 {
73
6
    10
74
6
}
75
76
#[derive(Deserialize, Serialize, Debug, Default)]
77
#[serde(deny_unknown_fields)]
78
pub struct SimpleSpec {
79
    /// A list of supported platform properties mapped to how these properties
80
    /// are used when the scheduler looks for worker nodes capable of running
81
    /// the task.
82
    ///
83
    /// For example, a value of:
84
    /// ```json
85
    /// { "cpu_count": "minimum", "cpu_arch": "exact" }
86
    /// ```
87
    /// With a job that contains:
88
    /// ```json
89
    /// { "cpu_count": "8", "cpu_arch": "arm" }
90
    /// ```
91
    /// Will result in the scheduler filtering out any workers that do not have
92
    /// `"cpu_arch" = "arm"` and filter out any workers that have less than 8 cpu
93
    /// cores available.
94
    ///
95
    /// The property names here must match the property keys provided by the
96
    /// worker nodes when they join the pool. In other words, the workers will
97
    /// publish their capabilities to the scheduler when they join the worker
98
    /// pool. If the worker fails to notify the scheduler of its (for example)
99
    /// `"cpu_arch"`, the scheduler will never send any jobs to it, if all jobs
100
    /// have the `"cpu_arch"` label. There is no special treatment of any platform
101
    /// property labels other and entirely driven by worker configs and this
102
    /// config.
103
    pub supported_platform_properties: Option<HashMap<String, PropertyType>>,
104
105
    /// The amount of time to retain completed actions in memory for in case
106
    /// a `WaitExecution` is called after the action has completed.
107
    /// Default: 60 (seconds)
108
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
109
    pub retain_completed_for_s: u32,
110
111
    /// Mark operations as completed with error if no client has updated them
112
    /// within this duration.
113
    /// Default: 60 (seconds)
114
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
115
    pub client_action_timeout_s: u64,
116
117
    /// Remove workers from pool once the worker has not responded in this
118
    /// amount of time in seconds.
119
    /// Default: 5 (seconds)
120
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
121
    pub worker_timeout_s: u64,
122
123
    /// If a job returns an internal error or times out this many times when
124
    /// attempting to run on a worker the scheduler will return the last error
125
    /// to the client. Jobs will be retried and this configuration is to help
126
    /// prevent one rogue job from infinitely retrying and taking up a lot of
127
    /// resources when the task itself is the one causing the server to go
128
    /// into a bad state.
129
    /// Default: 3
130
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
131
    pub max_job_retries: usize,
132
133
    /// The strategy used to assign workers jobs.
134
    #[serde(default)]
135
    pub allocation_strategy: WorkerAllocationStrategy,
136
137
    /// The storage backend to use for the scheduler.
138
    /// Default: memory
139
    pub experimental_backend: Option<ExperimentalSimpleSchedulerBackend>,
140
141
    /// Every N seconds, do logging of worker matching
142
    /// e.g. "worker busy", "can't find any worker"
143
    /// Defaults to 10s. Can be set to -1 to disable
144
    #[serde(
145
        default = "default_worker_match_logging_interval_s",
146
        deserialize_with = "convert_duration_with_shellexpand_and_negative"
147
    )]
148
    pub worker_match_logging_interval_s: i64,
149
}
150
151
#[derive(Deserialize, Serialize, Debug)]
152
#[serde(rename_all = "snake_case")]
153
pub enum ExperimentalSimpleSchedulerBackend {
154
    /// Use an in-memory store for the scheduler.
155
    Memory,
156
    /// Use a redis store for the scheduler.
157
    Redis(ExperimentalRedisSchedulerBackend),
158
}
159
160
#[derive(Deserialize, Serialize, Debug, Default)]
161
#[serde(deny_unknown_fields)]
162
pub struct ExperimentalRedisSchedulerBackend {
163
    /// A reference to the redis store to use for the scheduler.
164
    /// Note: This MUST resolve to a `RedisSpec`.
165
    pub redis_store: StoreRefName,
166
}
167
168
/// A scheduler that simply forwards requests to an upstream scheduler.  This
169
/// is useful to use when doing some kind of local action cache or CAS away from
170
/// the main cluster of workers.  In general, it's more efficient to point the
171
/// build at the main scheduler directly though.
172
#[derive(Deserialize, Serialize, Debug)]
173
#[serde(deny_unknown_fields)]
174
pub struct GrpcSpec {
175
    /// The upstream scheduler to forward requests to.
176
    pub endpoint: GrpcEndpoint,
177
178
    /// Retry configuration to use when a network request fails.
179
    #[serde(default)]
180
    pub retry: Retry,
181
182
    /// Limit the number of simultaneous upstream requests to this many.  A
183
    /// value of zero is treated as unlimited.  If the limit is reached the
184
    /// request is queued.
185
    #[serde(default)]
186
    pub max_concurrent_requests: usize,
187
188
    /// The number of connections to make to each specified endpoint to balance
189
    /// the load over multiple TCP connections.  Default 1.
190
    #[serde(default)]
191
    pub connections_per_endpoint: usize,
192
}
193
194
#[derive(Deserialize, Serialize, Debug)]
195
#[serde(deny_unknown_fields)]
196
pub struct CacheLookupSpec {
197
    /// The reference to the action cache store used to return cached
198
    /// actions from rather than running them again.
199
    /// To prevent unintended issues, this store should probably be a `CompletenessCheckingSpec`.
200
    pub ac_store: StoreRefName,
201
202
    /// The nested scheduler to use if cache lookup fails.
203
    pub scheduler: Box<SchedulerSpec>,
204
}
205
206
#[derive(Deserialize, Serialize, Debug, Clone)]
207
#[serde(deny_unknown_fields)]
208
pub struct PlatformPropertyAddition {
209
    /// The name of the property to add.
210
    pub name: String,
211
    /// The value to assign to the property.
212
    pub value: String,
213
}
214
215
#[derive(Deserialize, Serialize, Debug, Clone)]
216
#[serde(deny_unknown_fields)]
217
pub struct PlatformPropertyReplacement {
218
    /// The name of the property to replace.
219
    pub name: String,
220
    /// The the value to match against, if unset then any instance matches.
221
    #[serde(default)]
222
    pub value: Option<String>,
223
    /// The new name of the property.
224
    pub new_name: String,
225
    /// The value to assign to the property, if unset will remain the same.
226
    #[serde(default)]
227
    pub new_value: Option<String>,
228
}
229
230
#[derive(Deserialize, Serialize, Debug, Clone)]
231
#[serde(rename_all = "snake_case")]
232
pub enum PropertyModification {
233
    /// Add a property to the action properties.
234
    Add(PlatformPropertyAddition),
235
    /// Remove a named property from the action.
236
    Remove(String),
237
    /// If a property is found, then replace it with another one.
238
    Replace(PlatformPropertyReplacement),
239
}
240
241
#[derive(Deserialize, Serialize, Debug)]
242
#[serde(deny_unknown_fields)]
243
pub struct PropertyModifierSpec {
244
    /// A list of modifications to perform to incoming actions for the nested
245
    /// scheduler.  These are performed in order and blindly, so removing a
246
    /// property that doesn't exist is fine and overwriting an existing property
247
    /// is also fine.  If adding properties that do not exist in the nested
248
    /// scheduler is not supported and will likely cause unexpected behaviour.
249
    pub modifications: Vec<PropertyModification>,
250
251
    /// The nested scheduler to use after modifying the properties.
252
    pub scheduler: Box<SchedulerSpec>,
253
}