Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-scheduler/src/property_modifier_scheduler.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{HashMap, HashSet};
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_config::schedulers::{PropertyModification, PropertyModifierSpec};
20
use nativelink_error::{Error, ResultExt};
21
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
22
use nativelink_util::action_messages::{ActionInfo, OperationId};
23
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
24
use nativelink_util::operation_state_manager::{
25
    ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
26
};
27
use parking_lot::Mutex;
28
29
#[derive(MetricsComponent)]
30
pub struct PropertyModifierScheduler {
31
    modifications: Vec<PropertyModification>,
32
    #[metric(group = "scheduler")]
33
    scheduler: Arc<dyn ClientStateManager>,
34
    #[metric(group = "property_manager")]
35
    known_properties: Mutex<HashMap<String, Vec<String>>>,
36
}
37
38
impl std::fmt::Debug for PropertyModifierScheduler {
39
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40
0
        f.debug_struct("PropertyModifierScheduler")
41
0
            .field("modifications", &self.modifications)
42
0
            .field("known_properties", &self.known_properties)
43
0
            .finish_non_exhaustive()
44
0
    }
45
}
46
47
impl PropertyModifierScheduler {
48
8
    pub fn new(spec: &PropertyModifierSpec, scheduler: Arc<dyn ClientStateManager>) -> Self {
49
8
        Self {
50
8
            modifications: spec.modifications.clone(),
51
8
            scheduler,
52
8
            known_properties: Mutex::new(HashMap::new()),
53
8
        }
54
8
    }
55
56
2
    async fn inner_get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
57
2
        {
58
2
            let known_properties = self.known_properties.lock();
59
2
            if let Some(
property_manager0
) = known_properties.get(instance_name) {
  Branch (59:20): [True: 0, False: 2]
  Branch (59:20): [Folded - Ignored]
60
0
                return Ok(property_manager.clone());
61
2
            }
62
        }
63
2
        let known_platform_property_provider = self
64
2
            .scheduler
65
2
            .as_known_platform_property_provider()
66
2
            .err_tip(|| 
"Inner scheduler does not implement KnownPlatformPropertyProvider for PropertyModifierScheduler"0
)
?0
;
67
2
        let mut known_properties = HashSet::<String>::from_iter(
68
2
            known_platform_property_provider
69
2
                .get_known_properties(instance_name)
70
2
                .await
?0
,
71
        );
72
4
        for 
modification2
in &self.modifications {
73
2
            match modification {
74
2
                PropertyModification::Remove(name) => {
75
2
                    known_properties.insert(name.clone());
76
2
                }
77
0
                PropertyModification::Add(_) => (),
78
            }
79
        }
80
2
        let final_known_properties: Vec<String> = known_properties.into_iter().collect();
81
2
        self.known_properties
82
2
            .lock()
83
2
            .insert(instance_name.to_string(), final_known_properties.clone());
84
2
85
2
        Ok(final_known_properties)
86
2
    }
87
88
5
    async fn inner_add_action(
89
5
        &self,
90
5
        client_operation_id: OperationId,
91
5
        mut action_info: Arc<ActionInfo>,
92
5
    ) -> Result<Box<dyn ActionStateResult>, Error> {
93
5
        let action_info_mut = Arc::make_mut(&mut action_info);
94
12
        for 
modification7
in &self.modifications {
95
7
            match modification {
96
4
                PropertyModification::Add(addition) => action_info_mut
97
4
                    .platform_properties
98
4
                    .insert(addition.name.clone(), addition.value.clone()),
99
3
                PropertyModification::Remove(name) => {
100
3
                    action_info_mut.platform_properties.remove(name)
101
                }
102
            };
103
        }
104
5
        self.scheduler
105
5
            .add_action(client_operation_id, action_info)
106
5
            .await
107
5
    }
108
109
1
    async fn inner_filter_operations(
110
1
        &self,
111
1
        filter: OperationFilter,
112
1
    ) -> Result<ActionStateResultStream, Error> {
113
1
        self.scheduler.filter_operations(filter).await
114
1
    }
115
}
116
117
#[async_trait]
118
impl KnownPlatformPropertyProvider for PropertyModifierScheduler {
119
4
    async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
120
2
        self.inner_get_known_properties(instance_name).await
121
4
    }
122
}
123
124
#[async_trait]
125
impl ClientStateManager for PropertyModifierScheduler {
126
    async fn add_action(
127
        &self,
128
        client_operation_id: OperationId,
129
        action_info: Arc<ActionInfo>,
130
10
    ) -> Result<Box<dyn ActionStateResult>, Error> {
131
5
        self.inner_add_action(client_operation_id, action_info)
132
5
            .await
133
10
    }
134
135
    async fn filter_operations<'a>(
136
        &'a self,
137
        filter: OperationFilter,
138
2
    ) -> Result<ActionStateResultStream<'a>, Error> {
139
1
        self.inner_filter_operations(filter).await
140
2
    }
141
142
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
143
0
        Some(self)
144
0
    }
145
}
146
147
impl RootMetricsComponent for PropertyModifierScheduler {}