Coverage Report

Created: 2026-02-23 10:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/worker_capability_index.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
//! Worker capability index for fast worker matching.
16
//!
17
//! This module provides an index that accelerates worker matching by property.
18
//! Instead of iterating all workers for each action, we maintain an inverted index
19
//! that maps property values to sets of workers that have those values.
20
//!
21
//! ## Complexity Analysis
22
//!
23
//! Without index: O(W × P) where W = workers, P = properties per action
24
//! With index: O(P × log(W)) for exact properties + O(W' × P') for minimum properties
25
//!   where W' = filtered workers, P' = minimum property count (typically small)
26
//!
27
//! For typical workloads (few minimum properties), this reduces matching from
28
//! O(n × m) to approximately O(log n).
29
30
use std::collections::{HashMap, HashSet};
31
32
use nativelink_util::action_messages::WorkerId;
33
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
34
use tracing::info;
35
36
/// A property key-value pair used for indexing.
37
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
38
struct PropertyKey {
39
    name: String,
40
    value: PlatformPropertyValue,
41
}
42
43
/// Index structure for fast worker capability lookup.
44
///
45
/// Maintains an inverted index from property values to worker IDs.
46
/// Only indexes `Exact` and `Priority` properties since `Minimum` properties
47
/// are dynamic and require runtime comparison.
48
#[derive(Debug, Default)]
49
pub struct WorkerCapabilityIndex {
50
    /// Maps `(property_name, property_value)` -> Set of worker IDs with that property.
51
    /// Only contains `Exact` and `Priority` properties.
52
    exact_index: HashMap<PropertyKey, HashSet<WorkerId>>,
53
54
    /// Maps `property_name` -> Set of worker IDs that have this property (any value).
55
    /// Used for fast "has property" checks for `Priority` and `Minimum` properties.
56
    property_presence: HashMap<String, HashSet<WorkerId>>,
57
58
    /// Set of all indexed worker IDs.
59
    all_workers: HashSet<WorkerId>,
60
}
61
62
impl WorkerCapabilityIndex {
63
    /// Creates a new empty capability index.
64
42
    pub fn new() -> Self {
65
42
        Self::default()
66
42
    }
67
68
    /// Adds a worker to the index with their platform properties.
69
54
    pub fn add_worker(&mut self, worker_id: &WorkerId, properties: &PlatformProperties) {
70
54
        self.all_workers.insert(worker_id.clone());
71
72
77
        for (
name23
,
value23
) in &properties.properties {
73
            // Track property presence
74
23
            self.property_presence
75
23
                .entry(name.clone())
76
23
                .or_default()
77
23
                .insert(worker_id.clone());
78
79
23
            match value {
80
                PlatformPropertyValue::Exact(_)
81
                | PlatformPropertyValue::Priority(_)
82
15
                | PlatformPropertyValue::Unknown(_) => {
83
15
                    // Index exact-match properties
84
15
                    let key = PropertyKey {
85
15
                        name: name.clone(),
86
15
                        value: value.clone(),
87
15
                    };
88
15
                    self.exact_index
89
15
                        .entry(key)
90
15
                        .or_default()
91
15
                        .insert(worker_id.clone());
92
15
                }
93
8
                PlatformPropertyValue::Minimum(_) | PlatformPropertyValue::Ignore(_) => {
94
8
                    // Minimum properties are tracked via property_presence only.
95
8
                    // Their actual values are checked at runtime since they're dynamic.
96
8
97
8
                    // Ignore properties we just drop
98
8
                }
99
            }
100
        }
101
54
    }
102
103
    /// Removes a worker from the index.
104
11
    pub fn remove_worker(&mut self, worker_id: &WorkerId) {
105
11
        self.all_workers.remove(worker_id);
106
107
        // Remove from exact index
108
11
        self.exact_index.retain(|_, workers| 
{1
109
1
            workers.remove(worker_id);
110
1
            !workers.is_empty()
111
1
        });
112
113
        // Remove from presence index
114
11
        self.property_presence.retain(|_, workers| 
{1
115
1
            workers.remove(worker_id);
116
1
            !workers.is_empty()
117
1
        });
118
11
    }
119
120
    /// Finds workers that can satisfy the given action properties.
121
    ///
122
    /// Returns a set of worker IDs that match all required properties.
123
    /// The caller should apply additional filtering (e.g., worker availability).
124
    ///
125
    /// IMPORTANT: This method returns candidates based on STATIC properties only.
126
    /// - Exact and Unknown properties are fully matched
127
    /// - Priority properties just require the key to exist
128
    /// - Minimum properties return workers that HAVE the property (presence check only)
129
    ///
130
    /// The caller MUST still verify Minimum property values at runtime because
131
    /// worker resources change dynamically as jobs are assigned/completed.
132
55
    pub fn find_matching_workers(
133
55
        &self,
134
55
        action_properties: &PlatformProperties,
135
55
        full_worker_logging: bool,
136
55
    ) -> HashSet<WorkerId> {
137
55
        if self.all_workers.is_empty() {
  Branch (137:12): [True: 2, False: 53]
  Branch (137:12): [Folded - Ignored]
138
2
            if full_worker_logging {
  Branch (138:16): [True: 2, False: 0]
  Branch (138:16): [Folded - Ignored]
139
2
                info!("No workers available to match!");
140
0
            }
141
2
            return HashSet::new();
142
53
        }
143
144
53
        if action_properties.properties.is_empty() {
  Branch (144:12): [True: 34, False: 19]
  Branch (144:12): [Folded - Ignored]
145
            // No properties required, all workers match
146
34
            return self.all_workers.clone();
147
19
        }
148
149
19
        let mut candidates: Option<HashSet<WorkerId>> = None;
150
151
36
        for (
name20
,
value20
) in &action_properties.properties {
152
20
            match value {
153
                PlatformPropertyValue::Exact(_) | PlatformPropertyValue::Unknown(_) => {
154
                    // Look up workers with exact match
155
6
                    let key = PropertyKey {
156
6
                        name: name.clone(),
157
6
                        value: value.clone(),
158
6
                    };
159
160
6
                    let matching = self.exact_index.get(&key).cloned().unwrap_or_default();
161
162
6
                    let internal_candidates = match candidates {
163
0
                        Some(existing) => existing.intersection(&matching).cloned().collect(),
164
6
                        None => matching,
165
                    };
166
167
                    // Early exit if no candidates
168
6
                    if internal_candidates.is_empty() {
  Branch (168:24): [True: 2, False: 4]
  Branch (168:24): [Folded - Ignored]
169
2
                        if full_worker_logging {
  Branch (169:28): [True: 1, False: 1]
  Branch (169:28): [Folded - Ignored]
170
1
                            let values: Vec<_> = self
171
1
                                .exact_index
172
1
                                .iter()
173
1
                                .filter(|pk| &pk.0.name == name)
174
1
                                .map(|pk| pk.0.value.clone())
175
1
                                .collect();
176
1
                            info!(
177
1
                                "No candidate workers due to a lack of matching '{name}' = {value:?}. Workers have: {values:?}"
178
                            );
179
1
                        }
180
2
                        return HashSet::new();
181
4
                    }
182
4
                    candidates = Some(internal_candidates);
183
                }
184
                PlatformPropertyValue::Priority(_) | PlatformPropertyValue::Minimum(_) => {
185
                    // Priority: just requires the key to exist
186
                    // Minimum: worker must have the property (value checked at runtime by caller)
187
                    // We only check presence here because Minimum values are DYNAMIC -
188
                    // they change as jobs are assigned to workers.
189
13
                    let workers_with_property = self
190
13
                        .property_presence
191
13
                        .get(name)
192
13
                        .cloned()
193
13
                        .unwrap_or_default();
194
195
13
                    let internal_candidates = match candidates {
196
1
                        Some(existing) => existing
197
1
                            .intersection(&workers_with_property)
198
1
                            .cloned()
199
1
                            .collect(),
200
12
                        None => workers_with_property,
201
                    };
202
203
13
                    if internal_candidates.is_empty() {
  Branch (203:24): [True: 1, False: 12]
  Branch (203:24): [Folded - Ignored]
204
1
                        if full_worker_logging {
  Branch (204:28): [True: 1, False: 0]
  Branch (204:28): [Folded - Ignored]
205
1
                            info!(
206
1
                                "No candidate workers due to a lack of key '{name}'. Job asked for {value:?}"
207
                            );
208
0
                        }
209
1
                        return HashSet::new();
210
12
                    }
211
12
                    candidates = Some(internal_candidates);
212
                }
213
1
                PlatformPropertyValue::Ignore(_) => {}
214
            }
215
        }
216
217
16
        candidates.unwrap_or_else(|| 
self.all_workers1
.
clone1
())
218
55
    }
219
220
    /// Returns the number of indexed workers.
221
2
    pub fn worker_count(&self) -> usize {
222
2
        self.all_workers.len()
223
2
    }
224
225
    /// Returns true if the index is empty.
226
0
    pub fn is_empty(&self) -> bool {
227
0
        self.all_workers.is_empty()
228
0
    }
229
}