Coverage Report

Created: 2025-10-30 00:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/property_modifier_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{HashMap, HashSet};
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_config::schedulers::{
20
    PlatformPropertyReplacement, PropertyModification, PropertyModifierSpec,
21
};
22
use nativelink_error::{Error, ResultExt};
23
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
24
use nativelink_util::action_messages::{ActionInfo, OperationId};
25
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
26
use nativelink_util::operation_state_manager::{
27
    ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
28
};
29
use parking_lot::Mutex;
30
31
#[derive(MetricsComponent)]
32
pub struct PropertyModifierScheduler {
33
    modifications: Vec<PropertyModification>,
34
    #[metric(group = "scheduler")]
35
    scheduler: Arc<dyn ClientStateManager>,
36
    #[metric(group = "property_manager")]
37
    known_properties: Mutex<HashMap<String, Vec<String>>>,
38
}
39
40
impl core::fmt::Debug for PropertyModifierScheduler {
41
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
42
0
        f.debug_struct("PropertyModifierScheduler")
43
0
            .field("modifications", &self.modifications)
44
0
            .field("known_properties", &self.known_properties)
45
0
            .finish_non_exhaustive()
46
0
    }
47
}
48
49
impl PropertyModifierScheduler {
50
12
    pub fn new(spec: &PropertyModifierSpec, scheduler: Arc<dyn ClientStateManager>) -> Self {
51
12
        Self {
52
12
            modifications: spec.modifications.clone(),
53
12
            scheduler,
54
12
            known_properties: Mutex::new(HashMap::new()),
55
12
        }
56
12
    }
57
58
3
    async fn inner_get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
59
        {
60
3
            let known_properties = self.known_properties.lock();
61
3
            if let Some(
property_manager0
) = known_properties.get(instance_name) {
  Branch (61:20): [True: 0, False: 3]
  Branch (61:20): [Folded - Ignored]
62
0
                return Ok(property_manager.clone());
63
3
            }
64
        }
65
3
        let known_platform_property_provider = self
66
3
            .scheduler
67
3
            .as_known_platform_property_provider()
68
3
            .err_tip(|| "Inner scheduler does not implement KnownPlatformPropertyProvider for PropertyModifierScheduler")
?0
;
69
3
        let mut known_properties = HashSet::<String>::from_iter(
70
3
            known_platform_property_provider
71
3
                .get_known_properties(instance_name)
72
3
                .await
?0
,
73
        );
74
6
        for 
modification3
in &self.modifications {
75
3
            match modification {
76
2
                PropertyModification::Remove(name)
77
3
                | PropertyModification::Replace(PlatformPropertyReplacement { 
name1
, .. }) => {
78
3
                    known_properties.insert(name.clone());
79
3
                }
80
0
                PropertyModification::Add(_) => (),
81
            }
82
        }
83
3
        let final_known_properties: Vec<String> = known_properties.into_iter().collect();
84
3
        self.known_properties
85
3
            .lock()
86
3
            .insert(instance_name.to_string(), final_known_properties.clone());
87
88
3
        Ok(final_known_properties)
89
3
    }
90
91
8
    async fn inner_add_action(
92
8
        &self,
93
8
        client_operation_id: OperationId,
94
8
        mut action_info: Arc<ActionInfo>,
95
8
    ) -> Result<Box<dyn ActionStateResult>, Error> {
96
8
        let action_info_mut = Arc::make_mut(&mut action_info);
97
18
        for 
modification10
in &self.modifications {
98
10
            match modification {
99
4
                PropertyModification::Add(addition) => {
100
4
                    action_info_mut
101
4
                        .platform_properties
102
4
                        .insert(addition.name.clone(), addition.value.clone());
103
4
                }
104
3
                PropertyModification::Remove(name) => {
105
3
                    action_info_mut.platform_properties.remove(name);
106
3
                }
107
3
                PropertyModification::Replace(replacement) => {
108
3
                    if let Some((existing_name, existing_value)) = action_info_mut
  Branch (108:28): [True: 3, False: 0]
  Branch (108:28): [Folded - Ignored]
109
3
                        .platform_properties
110
3
                        .remove_entry(&replacement.name)
111
                    {
112
3
                        if replacement
  Branch (112:28): [True: 3, False: 0]
  Branch (112:28): [Folded - Ignored]
113
3
                            .value
114
3
                            .as_ref()
115
3
                            .is_none_or(|value| 
*value1
==
existing_value1
)
116
3
                        {
117
3
                            action_info_mut.platform_properties.insert(
118
3
                                replacement.new_name.clone(),
119
3
                                replacement.new_value.clone().unwrap_or(existing_value),
120
3
                            );
121
3
                        } else {
122
0
                            action_info_mut
123
0
                                .platform_properties
124
0
                                .insert(existing_name, existing_value);
125
0
                        }
126
0
                    }
127
                }
128
            }
129
        }
130
8
        self.scheduler
131
8
            .add_action(client_operation_id, action_info)
132
8
            .await
133
8
    }
134
135
1
    async fn inner_filter_operations(
136
1
        &self,
137
1
        filter: OperationFilter,
138
1
    ) -> Result<ActionStateResultStream<'_>, Error> {
139
1
        self.scheduler.filter_operations(filter).await
140
1
    }
141
}
142
143
#[async_trait]
144
impl KnownPlatformPropertyProvider for PropertyModifierScheduler {
145
3
    async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
146
        self.inner_get_known_properties(instance_name).await
147
3
    }
148
}
149
150
#[async_trait]
151
impl ClientStateManager for PropertyModifierScheduler {
152
    async fn add_action(
153
        &self,
154
        client_operation_id: OperationId,
155
        action_info: Arc<ActionInfo>,
156
8
    ) -> Result<Box<dyn ActionStateResult>, Error> {
157
        self.inner_add_action(client_operation_id, action_info)
158
            .await
159
8
    }
160
161
    async fn filter_operations<'a>(
162
        &'a self,
163
        filter: OperationFilter,
164
1
    ) -> Result<ActionStateResultStream<'a>, Error> {
165
        self.inner_filter_operations(filter).await
166
1
    }
167
168
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
169
0
        Some(self)
170
0
    }
171
}
172
173
impl RootMetricsComponent for PropertyModifierScheduler {}