Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-config/src/schedulers.rs
Coverage note: the only instrumented lines in this file are the `#[derive(...)]` attribute lines, and each reports a count of 0 (uncovered).

// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use serde::Deserialize;

use crate::serde_utils::{convert_duration_with_shellexpand, convert_numeric_with_shellexpand};
use crate::stores::{GrpcEndpoint, Retry, StoreRefName};

#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug)]
pub enum SchedulerConfig {
    simple(SimpleScheduler),
    grpc(GrpcScheduler),
    cache_lookup(CacheLookupScheduler),
    property_modifier(PropertyModifierScheduler),
}
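
Because `SchedulerConfig` derives `Deserialize` with no tag attribute, serde uses the externally tagged representation: the lowercase variant name is the JSON key that selects the scheduler implementation. A minimal sketch of a named scheduler entry as it might appear in a NativeLink JSON config (the surrounding `schedulers` map and the name `MAIN_SCHEDULER` are illustrative assumptions, not taken from this file):

```json
{
  "schedulers": {
    "MAIN_SCHEDULER": {
      "simple": {}
    }
  }
}
```

An empty `simple` object deserializes successfully because every `SimpleScheduler` field is either an `Option` or carries a serde default.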

/// When the scheduler matches tasks to workers that are capable of running
/// the task, this value determines how the property is treated.
#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub enum PropertyType {
    /// Requires the platform property to be a u64. When the scheduler looks
    /// for worker nodes capable of executing the task, the task will not run
    /// on a node whose value for this property is less than the task's value.
    minimum,

    /// Requires the platform property to be a string. When the scheduler
    /// looks for worker nodes capable of executing the task, the task will
    /// not run on a node that does not have this property set to the value
    /// with an exact string match.
    exact,

    /// Does not restrict on this value; it is instead passed to the worker
    /// as an informational piece.
    /// TODO(allada) In the future this will be used by the scheduler and worker
    /// to cause the scheduler to prefer certain workers over others, but not
    /// restrict them based on these values.
    priority,
}
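
A short sketch of how these property types combine in a scheduler's property map (the names `cpu_count`, `cpu_arch`, and `build_note` are hypothetical):

```json
{
  "cpu_count": "minimum",
  "cpu_arch": "exact",
  "build_note": "priority"
}
```

Under this mapping, a job requesting `"cpu_count": "8"` may run on a worker publishing `"cpu_count": "16"` (16 >= 8 satisfies `minimum`), but only on workers whose `cpu_arch` string matches exactly; `build_note` is forwarded to the worker without restricting placement.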

/// When a worker is being searched for to run a job, this determines how to
/// choose which worker should run the job when multiple workers are capable
/// of running the task.
#[allow(non_camel_case_types)]
#[derive(Copy, Clone, Deserialize, Debug, Default)]
pub enum WorkerAllocationStrategy {
    /// Prefer workers that have been least recently used to run a job.
    #[default]
    least_recently_used,
    /// Prefer workers that have been most recently used to run a job.
    most_recently_used,
}

#[derive(Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct SimpleScheduler {
    /// A list of supported platform properties mapped to how these properties
    /// are used when the scheduler looks for worker nodes capable of running
    /// the task.
    ///
    /// For example, a value of:
    /// ```json
    /// { "cpu_count": "minimum", "cpu_arch": "exact" }
    /// ```
    /// with a job that contains:
    /// ```json
    /// { "cpu_count": "8", "cpu_arch": "arm" }
    /// ```
    /// will result in the scheduler filtering out any workers that do not have
    /// "cpu_arch" = "arm" and any workers that have fewer than 8 cpu cores
    /// available.
    ///
    /// The property names here must match the property keys provided by the
    /// worker nodes when they join the pool. In other words, the workers
    /// publish their capabilities to the scheduler when they join the worker
    /// pool. If a worker fails to notify the scheduler of (for example) its
    /// "cpu_arch", and all jobs carry the "cpu_arch" label, the scheduler will
    /// never send any jobs to that worker. There is no special treatment of
    /// any platform property label; matching is driven entirely by the worker
    /// configs and this config.
    pub supported_platform_properties: Option<HashMap<String, PropertyType>>,

    /// The amount of time to retain completed actions in memory in case
    /// a WaitExecution is called after the action has completed.
    /// Default: 60 (seconds)
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub retain_completed_for_s: u32,

    /// Remove workers from the pool once a worker has not responded in this
    /// amount of time in seconds.
    /// Default: 5 (seconds)
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub worker_timeout_s: u64,

    /// If a job returns an internal error or times out this many times when
    /// attempting to run on a worker, the scheduler will return the last error
    /// to the client. Jobs will be retried, and this configuration helps
    /// prevent one rogue job from retrying indefinitely and consuming a lot of
    /// resources when the task itself is what is driving the server into a
    /// bad state.
    /// Default: 3
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub max_job_retries: usize,

    /// The strategy used to assign jobs to workers.
    #[serde(default)]
    pub allocation_strategy: WorkerAllocationStrategy,

    /// The storage backend to use for the scheduler.
    /// Default: memory
    pub experimental_backend: Option<ExperimentalSimpleSchedulerBackend>,
}
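
Putting the fields together, a sketch of a fully specified `simple` scheduler entry, with values mirroring the documented defaults (the property names are hypothetical; judging by the deserializer names, the `*_with_shellexpand` fields presumably also accept environment-expandable strings such as `"${WORKER_TIMEOUT:-5}"`):

```json
{
  "simple": {
    "supported_platform_properties": {
      "cpu_count": "minimum",
      "cpu_arch": "exact"
    },
    "retain_completed_for_s": 60,
    "worker_timeout_s": 5,
    "max_job_retries": 3,
    "allocation_strategy": "least_recently_used"
  }
}
```

`allocation_strategy` is written as a bare string because `WorkerAllocationStrategy` is an enum of unit variants.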

#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug)]
pub enum ExperimentalSimpleSchedulerBackend {
    /// Use an in-memory store for the scheduler.
    memory,
    /// Use a redis store for the scheduler.
    redis(ExperimentalRedisSchedulerBackend),
}

#[derive(Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ExperimentalRedisSchedulerBackend {
    /// A reference to the redis store to use for the scheduler.
    /// Note: This MUST resolve to a RedisStore.
    pub redis_store: StoreRefName,
}
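
In serde's externally tagged encoding the unit variant `memory` is a bare string, while `redis` nests its payload. A sketch of the redis form (the name `SCHEDULER_REDIS_STORE` is a hypothetical reference into the stores config, which must resolve to a RedisStore):

```json
{
  "simple": {
    "experimental_backend": {
      "redis": { "redis_store": "SCHEDULER_REDIS_STORE" }
    }
  }
}
```

The in-memory default can be requested explicitly with `"experimental_backend": "memory"`, or simply by omitting the field.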

/// A scheduler that simply forwards requests to an upstream scheduler.  This
/// is useful when running some kind of local action cache or CAS away from
/// the main cluster of workers.  In general, though, it is more efficient to
/// point the build at the main scheduler directly.
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct GrpcScheduler {
    /// The upstream scheduler to forward requests to.
    pub endpoint: GrpcEndpoint,

    /// Retry configuration to use when a network request fails.
    #[serde(default)]
    pub retry: Retry,

    /// Limit the number of simultaneous upstream requests to this many.  A
    /// value of zero is treated as unlimited.  If the limit is reached, the
    /// request is queued.
    #[serde(default)]
    pub max_concurrent_requests: usize,

    /// The number of connections to make to each specified endpoint to balance
    /// the load over multiple TCP connections.  Default: 1.
    #[serde(default)]
    pub connections_per_endpoint: usize,
}
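
A sketch of a `grpc` scheduler entry. The shape of `endpoint` (a single `address` field) is an assumption about the `GrpcEndpoint` type imported from `crate::stores`, and the address itself is illustrative:

```json
{
  "grpc": {
    "endpoint": { "address": "grpc://upstream-scheduler:50052" },
    "max_concurrent_requests": 0,
    "connections_per_endpoint": 1
  }
}
```

Here `max_concurrent_requests: 0` leaves upstream requests unlimited, per the field documentation above.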

#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct CacheLookupScheduler {
    /// The reference to the action cache store used to return cached
    /// actions from, rather than running them again.
    /// To prevent unintended issues, this store should probably be a
    /// CompletenessCheckingStore.
    pub ac_store: StoreRefName,

    /// The nested scheduler to use if cache lookup fails.
    pub scheduler: Box<SchedulerConfig>,
}
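
Because `scheduler` is a boxed `SchedulerConfig`, cache lookup composes with any other scheduler variant. A sketch (the store name `AC_MAIN_STORE` is hypothetical):

```json
{
  "cache_lookup": {
    "ac_store": "AC_MAIN_STORE",
    "scheduler": { "simple": {} }
  }
}
```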

#[derive(Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct PlatformPropertyAddition {
    /// The name of the property to add.
    pub name: String,
    /// The value to assign to the property.
    pub value: String,
}

#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug, Clone)]
pub enum PropertyModification {
    /// Add a property to the action properties.
    add(PlatformPropertyAddition),
    /// Remove a named property from the action.
    remove(String),
}

#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct PropertyModifierScheduler {
    /// A list of modifications to perform on incoming actions for the nested
    /// scheduler.  These are performed in order and blindly, so removing a
    /// property that doesn't exist is fine and overwriting an existing property
    /// is also fine.  Adding properties that do not exist in the nested
    /// scheduler is not supported and will likely cause unexpected behaviour.
    pub modifications: Vec<PropertyModification>,

    /// The nested scheduler to use after modifying the properties.
    pub scheduler: Box<SchedulerConfig>,
}
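
A sketch of a `property_modifier` entry showing both `PropertyModification` variants in their externally tagged form (the property names and values are hypothetical):

```json
{
  "property_modifier": {
    "modifications": [
      { "add": { "name": "cpu_arch", "value": "x86_64" } },
      { "remove": "os_version" }
    ],
    "scheduler": { "simple": {} }
  }
}
```

The modifications are applied in order to each incoming action before it is handed to the nested scheduler.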