Coverage Report

Created: 2026-07-01 11:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/property_modifier_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{HashMap, HashSet};
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_config::schedulers::{
20
    PlatformPropertyReplacement, PropertyModification, PropertyModifierSpec,
21
};
22
use nativelink_error::{Error, ResultExt};
23
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
24
use nativelink_util::action_messages::{ActionInfo, OperationId};
25
use nativelink_util::operation_state_manager::{
26
    ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
27
};
28
use parking_lot::Mutex;
29
30
use crate::known_platform_property_provider::KnownPlatformPropertyProvider;
31
32
#[derive(MetricsComponent)]
33
pub struct PropertyModifierScheduler {
34
    modifications: Vec<PropertyModification>,
35
    #[metric(group = "scheduler")]
36
    scheduler: Option<Arc<dyn KnownPlatformPropertyProvider>>,
37
    #[metric(group = "property_manager")]
38
    known_properties: Mutex<HashMap<String, Vec<String>>>,
39
}
40
41
impl core::fmt::Debug for PropertyModifierScheduler {
42
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
43
0
        f.debug_struct("PropertyModifierScheduler")
44
0
            .field("modifications", &self.modifications)
45
0
            .field("known_properties", &self.known_properties)
46
0
            .finish_non_exhaustive()
47
0
    }
48
}
49
50
impl PropertyModifierScheduler {
51
12
    pub fn new(
52
12
        spec: &PropertyModifierSpec,
53
12
        scheduler: Arc<dyn KnownPlatformPropertyProvider>,
54
12
    ) -> Self {
55
12
        Self {
56
12
            modifications: spec.modifications.clone(),
57
12
            scheduler: Some(scheduler),
58
12
            known_properties: Mutex::new(HashMap::new()),
59
12
        }
60
12
    }
61
62
3
    async fn inner_get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
63
        {
64
3
            let known_properties = self.known_properties.lock();
65
3
            if let Some(
property_manager0
) = known_properties.get(instance_name) {
66
0
                return Ok(property_manager.clone());
67
3
            }
68
        }
69
3
        let known_platform_property_provider = self
70
3
            .scheduler
71
3
            .as_ref()
72
3
            .err_tip(|| "Inner scheduler does not implement KnownPlatformPropertyProvider for PropertyModifierScheduler")
?0
;
73
3
        let mut known_properties = HashSet::<String>::from_iter(
74
3
            known_platform_property_provider
75
3
                .get_known_properties(instance_name)
76
3
                .await
?0
,
77
        );
78
3
        for modification in &self.modifications {
79
3
            match modification {
80
2
                PropertyModification::Remove(name)
81
3
                | PropertyModification::Replace(PlatformPropertyReplacement { 
name1
, .. }) => {
82
3
                    known_properties.insert(name.clone());
83
3
                }
84
0
                PropertyModification::Add(_) => (),
85
            }
86
        }
87
3
        let final_known_properties: Vec<String> = known_properties.into_iter().collect();
88
3
        self.known_properties
89
3
            .lock()
90
3
            .insert(instance_name.to_string(), final_known_properties.clone());
91
92
3
        Ok(final_known_properties)
93
3
    }
94
95
8
    async fn inner_add_action(
96
8
        &self,
97
8
        client_operation_id: OperationId,
98
8
        mut action_info: Arc<ActionInfo>,
99
8
    ) -> Result<Box<dyn ActionStateResult>, Error> {
100
8
        let action_info_mut = Arc::make_mut(&mut action_info);
101
10
        for modification in 
&self.modifications8
{
102
10
            match modification {
103
4
                PropertyModification::Add(addition) => {
104
4
                    action_info_mut
105
4
                        .platform_properties
106
4
                        .insert(addition.name.clone(), addition.value.clone());
107
4
                }
108
3
                PropertyModification::Remove(name) => {
109
3
                    action_info_mut.platform_properties.remove(name);
110
3
                }
111
3
                PropertyModification::Replace(replacement) => {
112
3
                    if let Some((existing_name, existing_value)) = action_info_mut
113
3
                        .platform_properties
114
3
                        .remove_entry(&replacement.name)
115
                    {
116
3
                        if replacement
117
3
                            .value
118
3
                            .as_ref()
119
3
                            .is_none_or(|value| 
*value1
==
existing_value1
)
120
3
                        {
121
3
                            action_info_mut.platform_properties.insert(
122
3
                                replacement.new_name.clone(),
123
3
                                replacement.new_value.clone().unwrap_or(existing_value),
124
3
                            );
125
3
                        } else {
126
0
                            action_info_mut
127
0
                                .platform_properties
128
0
                                .insert(existing_name, existing_value);
129
0
                        }
130
0
                    }
131
                }
132
            }
133
        }
134
8
        self.scheduler
135
8
            .as_ref()
136
8
            .err_tip(|| "Inner scheduler not available for PropertyModifierScheduler")
?0
137
8
            .add_action(client_operation_id, action_info)
138
8
            .await
139
8
    }
140
141
1
    async fn inner_filter_operations(
142
1
        &self,
143
1
        filter: OperationFilter,
144
1
    ) -> Result<ActionStateResultStream<'_>, Error> {
145
1
        self.scheduler
146
1
            .as_ref()
147
1
            .err_tip(|| "Inner scheduler not available for PropertyModifierScheduler")
?0
148
1
            .filter_operations(filter)
149
1
            .await
150
1
    }
151
}
152
153
#[async_trait]
154
impl KnownPlatformPropertyProvider for PropertyModifierScheduler {
155
3
    async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
156
        self.inner_get_known_properties(instance_name).await
157
3
    }
158
}
159
160
#[async_trait]
161
impl ClientStateManager for PropertyModifierScheduler {
162
    async fn add_action(
163
        &self,
164
        client_operation_id: OperationId,
165
        action_info: Arc<ActionInfo>,
166
8
    ) -> Result<Box<dyn ActionStateResult>, Error> {
167
        self.inner_add_action(client_operation_id, action_info)
168
            .await
169
8
    }
170
171
    async fn filter_operations<'a>(
172
        &'a self,
173
        filter: OperationFilter,
174
1
    ) -> Result<ActionStateResultStream<'a>, Error> {
175
        self.inner_filter_operations(filter).await
176
1
    }
177
}
178
179
// Marker impl: no items are overridden, so any behaviour comes from the
// trait's own definitions.
impl RootMetricsComponent for PropertyModifierScheduler {}