Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-scheduler/src/property_modifier_scheduler.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{HashMap, HashSet};
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_config::schedulers::PropertyModification;
20
use nativelink_error::{Error, ResultExt};
21
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
22
use nativelink_util::action_messages::{ActionInfo, OperationId};
23
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
24
use nativelink_util::operation_state_manager::{
25
    ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
26
};
27
use parking_lot::Mutex;
28
29
0
#[derive(MetricsComponent)]
30
pub struct PropertyModifierScheduler {
31
    modifications: Vec<nativelink_config::schedulers::PropertyModification>,
32
    #[metric(group = "scheduler")]
33
    scheduler: Arc<dyn ClientStateManager>,
34
    #[metric(group = "property_manager")]
35
    known_properties: Mutex<HashMap<String, Vec<String>>>,
36
}
37
38
impl PropertyModifierScheduler {
39
8
    pub fn new(
40
8
        config: &nativelink_config::schedulers::PropertyModifierScheduler,
41
8
        scheduler: Arc<dyn ClientStateManager>,
42
8
    ) -> Self {
43
8
        Self {
44
8
            modifications: config.modifications.clone(),
45
8
            scheduler,
46
8
            known_properties: Mutex::new(HashMap::new()),
47
8
        }
48
8
    }
49
50
2
    async fn inner_get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
51
2
        {
52
2
            let known_properties = self.known_properties.lock();
53
2
            if let Some(
property_manager0
) = known_properties.get(instance_name) {
  Branch (53:20): [True: 0, False: 2]
  Branch (53:20): [Folded - Ignored]
54
0
                return Ok(property_manager.clone());
55
2
            }
56
        }
57
2
        let known_platform_property_provider = self
58
2
            .scheduler
59
2
            .as_known_platform_property_provider()
60
2
            .err_tip(|| 
"Inner scheduler does not implement KnownPlatformPropertyProvider for PropertyModifierScheduler"0
)
?0
;
61
2
        let mut known_properties = HashSet::<String>::from_iter(
62
2
            known_platform_property_provider
63
2
                .get_known_properties(instance_name)
64
2
                .await
?0
,
65
        );
66
4
        for 
modification2
in &self.modifications {
67
2
            match modification {
68
2
                PropertyModification::remove(name) => {
69
2
                    known_properties.insert(name.clone());
70
2
                }
71
0
                PropertyModification::add(_) => (),
72
            }
73
        }
74
2
        let final_known_properties: Vec<String> = known_properties.into_iter().collect();
75
2
        self.known_properties
76
2
            .lock()
77
2
            .insert(instance_name.to_string(), final_known_properties.clone());
78
2
79
2
        Ok(final_known_properties)
80
2
    }
81
82
5
    async fn inner_add_action(
83
5
        &self,
84
5
        client_operation_id: OperationId,
85
5
        mut action_info: Arc<ActionInfo>,
86
5
    ) -> Result<Box<dyn ActionStateResult>, Error> {
87
5
        let action_info_mut = Arc::make_mut(&mut action_info);
88
12
        for 
modification7
in &self.modifications {
89
7
            match modification {
90
4
                PropertyModification::add(addition) => action_info_mut
91
4
                    .platform_properties
92
4
                    .insert(addition.name.clone(), addition.value.clone()),
93
3
                PropertyModification::remove(name) => {
94
3
                    action_info_mut.platform_properties.remove(name)
95
                }
96
            };
97
        }
98
5
        self.scheduler
99
5
            .add_action(client_operation_id, action_info)
100
5
            .await
101
5
    }
102
103
1
    async fn inner_filter_operations(
104
1
        &self,
105
1
        filter: OperationFilter,
106
1
    ) -> Result<ActionStateResultStream, Error> {
107
1
        self.scheduler.filter_operations(filter).await
108
1
    }
109
}
110
111
#[async_trait]
112
impl KnownPlatformPropertyProvider for PropertyModifierScheduler {
113
2
    async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
114
2
        self.inner_get_known_properties(instance_name).await
115
4
    }
116
}
117
118
#[async_trait]
119
impl ClientStateManager for PropertyModifierScheduler {
120
    async fn add_action(
121
        &self,
122
        client_operation_id: OperationId,
123
        action_info: Arc<ActionInfo>,
124
5
    ) -> Result<Box<dyn ActionStateResult>, Error> {
125
5
        self.inner_add_action(client_operation_id, action_info)
126
5
            .await
127
10
    }
128
129
    async fn filter_operations<'a>(
130
        &'a self,
131
        filter: OperationFilter,
132
1
    ) -> Result<ActionStateResultStream<'a>, Error> {
133
1
        self.inner_filter_operations(filter).await
134
2
    }
135
136
0
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
137
0
        Some(self)
138
0
    }
139
}
140
141
impl RootMetricsComponent for PropertyModifierScheduler {}