Coverage Report

Created: 2025-12-17 22:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/worker_capability_index.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
//! Worker capability index for fast worker matching.
16
//!
17
//! This module provides an index that accelerates worker matching by property.
18
//! Instead of iterating all workers for each action, we maintain an inverted index
19
//! that maps property values to sets of workers that have those values.
20
//!
21
//! ## Complexity Analysis
22
//!
23
//! Without index: O(W × P) where W = workers, P = properties per action
24
//! With index: O(P × log(W)) for exact properties + O(W' × P') for minimum properties
25
//!   where W' = filtered workers, P' = minimum property count (typically small)
26
//!
27
//! For typical workloads (few minimum properties), this reduces matching from
28
//! O(n × m) to approximately O(log n).
29
30
use std::collections::{HashMap, HashSet};
31
32
use nativelink_util::action_messages::WorkerId;
33
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
34
35
/// A property key-value pair used for indexing.
36
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
37
struct PropertyKey {
38
    name: String,
39
    value: PlatformPropertyValue,
40
}
41
42
/// Index structure for fast worker capability lookup.
43
///
44
/// Maintains an inverted index from property values to worker IDs.
45
/// Only indexes `Exact` and `Priority` properties since `Minimum` properties
46
/// are dynamic and require runtime comparison.
47
#[derive(Debug, Default)]
48
pub struct WorkerCapabilityIndex {
49
    /// Maps `(property_name, property_value)` -> Set of worker IDs with that property.
50
    /// Only contains `Exact` and `Priority` properties.
51
    exact_index: HashMap<PropertyKey, HashSet<WorkerId>>,
52
53
    /// Maps `property_name` -> Set of worker IDs that have this property (any value).
54
    /// Used for fast "has property" checks for `Priority` and `Minimum` properties.
55
    property_presence: HashMap<String, HashSet<WorkerId>>,
56
57
    /// Set of all indexed worker IDs.
58
    all_workers: HashSet<WorkerId>,
59
}
60
61
impl WorkerCapabilityIndex {
62
    /// Creates a new empty capability index.
63
37
    pub fn new() -> Self {
64
37
        Self::default()
65
37
    }
66
67
    /// Adds a worker to the index with their platform properties.
68
48
    pub fn add_worker(&mut self, worker_id: &WorkerId, properties: &PlatformProperties) {
69
48
        self.all_workers.insert(worker_id.clone());
70
71
67
        for (
name19
,
value19
) in &properties.properties {
72
            // Track property presence
73
19
            self.property_presence
74
19
                .entry(name.clone())
75
19
                .or_default()
76
19
                .insert(worker_id.clone());
77
78
19
            match value {
79
                PlatformPropertyValue::Exact(_)
80
                | PlatformPropertyValue::Priority(_)
81
12
                | PlatformPropertyValue::Unknown(_) => {
82
12
                    // Index exact-match properties
83
12
                    let key = PropertyKey {
84
12
                        name: name.clone(),
85
12
                        value: value.clone(),
86
12
                    };
87
12
                    self.exact_index
88
12
                        .entry(key)
89
12
                        .or_default()
90
12
                        .insert(worker_id.clone());
91
12
                }
92
7
                PlatformPropertyValue::Minimum(_) => {
93
7
                    // Minimum properties are tracked via property_presence only.
94
7
                    // Their actual values are checked at runtime since they're dynamic.
95
7
                }
96
            }
97
        }
98
48
    }
99
100
    /// Removes a worker from the index.
101
11
    pub fn remove_worker(&mut self, worker_id: &WorkerId) {
102
11
        self.all_workers.remove(worker_id);
103
104
        // Remove from exact index
105
11
        self.exact_index.retain(|_, workers| 
{1
106
1
            workers.remove(worker_id);
107
1
            !workers.is_empty()
108
1
        });
109
110
        // Remove from presence index
111
11
        self.property_presence.retain(|_, workers| 
{1
112
1
            workers.remove(worker_id);
113
1
            !workers.is_empty()
114
1
        });
115
11
    }
116
117
    /// Finds workers that can satisfy the given action properties.
118
    ///
119
    /// Returns a set of worker IDs that match all required properties.
120
    /// The caller should apply additional filtering (e.g., worker availability).
121
    ///
122
    /// IMPORTANT: This method returns candidates based on STATIC properties only.
123
    /// - Exact and Unknown properties are fully matched
124
    /// - Priority properties just require the key to exist
125
    /// - Minimum properties return workers that HAVE the property (presence check only)
126
    ///
127
    /// The caller MUST still verify Minimum property values at runtime because
128
    /// worker resources change dynamically as jobs are assigned/completed.
129
59
    pub fn find_matching_workers(
130
59
        &self,
131
59
        action_properties: &PlatformProperties,
132
59
    ) -> HashSet<WorkerId> {
133
59
        if action_properties.properties.is_empty() {
  Branch (133:12): [True: 41, False: 18]
  Branch (133:12): [Folded - Ignored]
134
            // No properties required, all workers match
135
41
            return self.all_workers.clone();
136
18
        }
137
138
18
        let mut candidates: Option<HashSet<WorkerId>> = None;
139
140
32
        for (
name19
,
value19
) in &action_properties.properties {
141
19
            match value {
142
                PlatformPropertyValue::Exact(_) | PlatformPropertyValue::Unknown(_) => {
143
                    // Look up workers with exact match
144
6
                    let key = PropertyKey {
145
6
                        name: name.clone(),
146
6
                        value: value.clone(),
147
6
                    };
148
149
6
                    let matching = self.exact_index.get(&key).cloned().unwrap_or_default();
150
151
6
                    candidates = Some(match candidates {
152
0
                        Some(existing) => existing.intersection(&matching).cloned().collect(),
153
6
                        None => matching,
154
                    });
155
156
                    // Early exit if no candidates
157
6
                    if candidates.as_ref().is_some_and(HashSet::is_empty) {
  Branch (157:24): [True: 2, False: 4]
  Branch (157:24): [Folded - Ignored]
158
2
                        return HashSet::new();
159
4
                    }
160
                }
161
                PlatformPropertyValue::Priority(_) | PlatformPropertyValue::Minimum(_) => {
162
                    // Priority: just requires the key to exist
163
                    // Minimum: worker must have the property (value checked at runtime by caller)
164
                    // We only check presence here because Minimum values are DYNAMIC -
165
                    // they change as jobs are assigned to workers.
166
13
                    let workers_with_property = self
167
13
                        .property_presence
168
13
                        .get(name)
169
13
                        .cloned()
170
13
                        .unwrap_or_default();
171
172
13
                    candidates = Some(match candidates {
173
1
                        Some(existing) => existing
174
1
                            .intersection(&workers_with_property)
175
1
                            .cloned()
176
1
                            .collect(),
177
12
                        None => workers_with_property,
178
                    });
179
180
13
                    if candidates.as_ref().is_some_and(HashSet::is_empty) {
  Branch (180:24): [True: 3, False: 10]
  Branch (180:24): [Folded - Ignored]
181
3
                        return HashSet::new();
182
10
                    }
183
                }
184
            }
185
        }
186
187
13
        candidates.unwrap_or_else(|| 
self.all_workers0
.
clone0
())
188
59
    }
189
190
    /// Returns the number of indexed workers.
191
2
    pub fn worker_count(&self) -> usize {
192
2
        self.all_workers.len()
193
2
    }
194
195
    /// Returns true if the index is empty.
196
0
    pub fn is_empty(&self) -> bool {
197
0
        self.all_workers.is_empty()
198
0
    }
199
}