Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-util/src/operation_state_manager.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
use std::time::SystemTime;
18
19
use async_trait::async_trait;
20
use bitflags::bitflags;
21
use futures::Stream;
22
use nativelink_error::Error;
23
use nativelink_metric::MetricsComponent;
24
25
use crate::action_messages::{
26
    ActionInfo, ActionStage, ActionState, ActionUniqueKey, OperationId, WorkerId,
27
};
28
use crate::common::DigestInfo;
29
use crate::known_platform_property_provider::KnownPlatformPropertyProvider;
30
31
bitflags! {
    /// Bit set of operation stages, used by [`OperationFilter::stages`] to
    /// select operations that are in one or more stages.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct OperationStageFlags: u32 {
        // Bit 0 is not assigned to any stage.
        /// The cache-check stage.
        const CacheCheck = 0b0000_0010;
        /// The queued stage.
        const Queued     = 0b0000_0100;
        /// The executing stage.
        const Executing  = 0b0000_1000;
        /// The completed stage.
        const Completed  = 0b0001_0000;
        /// All bits set, so this value contains every other flag.
        const Any        = u32::MAX;
    }
}
41
42
impl Default for OperationStageFlags {
43
598
    fn default() -> Self {
44
598
        Self::Any
45
598
    }
46
}
47
48
#[async_trait]
49
pub trait ActionStateResult: Send + Sync + 'static {
50
    // Provides the current state of the action.
51
    async fn as_state(&self) -> Result<Arc<ActionState>, Error>;
52
    // Waits for the state of the action to change.
53
    async fn changed(&mut self) -> Result<Arc<ActionState>, Error>;
54
    // Provide result as action info. This behavior will not be supported by all implementations.
55
    async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error>;
56
}
57
58
/// Sort direction applied when results are ordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OrderDirection {
    /// Ascending order.
    Asc,
    /// Descending order.
    Desc,
}
64
65
/// The filters used to query operations from the state manager.
66
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
67
pub struct OperationFilter {
68
    // TODO(adams): create rust builder pattern?
69
    /// The stage(s) that the operation must be in.
70
    pub stages: OperationStageFlags,
71
72
    /// The client operation id.
73
    pub client_operation_id: Option<OperationId>,
74
75
    /// The operation id.
76
    pub operation_id: Option<OperationId>,
77
78
    /// The worker that the operation must be assigned to.
79
    pub worker_id: Option<WorkerId>,
80
81
    /// The digest of the action that the operation must have.
82
    pub action_digest: Option<DigestInfo>,
83
84
    /// The operation must have its worker timestamp before this time.
85
    pub worker_update_before: Option<SystemTime>,
86
87
    /// The operation must have been completed before this time.
88
    pub completed_before: Option<SystemTime>,
89
90
    /// The unique key for filtering specific action results.
91
    pub unique_key: Option<ActionUniqueKey>,
92
93
    /// If the results should be ordered by priority and in which direction.
94
    pub order_by_priority_direction: Option<OrderDirection>,
95
}
96
97
pub type ActionStateResultStream<'a> =
98
    Pin<Box<dyn Stream<Item = Box<dyn ActionStateResult>> + Send + 'a>>;
99
100
#[async_trait]
101
pub trait ClientStateManager: Sync + Send + Unpin + MetricsComponent + 'static {
102
    /// Add a new action to the queue or joins an existing action.
103
    async fn add_action(
104
        &self,
105
        client_operation_id: OperationId,
106
        action_info: Arc<ActionInfo>,
107
    ) -> Result<Box<dyn ActionStateResult>, Error>;
108
109
    /// Returns a stream of operations that match the filter.
110
    async fn filter_operations(
111
        &self,
112
        filter: OperationFilter,
113
    ) -> Result<ActionStateResultStream, Error>;
114
115
    /// Returns the known platform property provider for the given instance
116
    /// if this implementation supports it.
117
    // TODO(https://github.com/rust-lang/rust/issues/65991) When this lands we can
118
    // remove this and have the call sites instead try to cast the ClientStateManager
119
    // into a KnownPlatformPropertyProvider instead. Rust currently does not support
120
    // casting traits to other traits.
121
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider>;
122
}
123
124
/// The type of update to perform on an operation.
125
#[derive(Debug, PartialEq)]
126
pub enum UpdateOperationType {
127
    /// Notification that the operation is still alive.
128
    KeepAlive,
129
130
    /// Notification that the operation has been updated.
131
    UpdateWithActionStage(ActionStage),
132
133
    /// Notification that the operation has been completed.
134
    UpdateWithError(Error),
135
}
136
137
#[async_trait]
138
pub trait WorkerStateManager: Sync + Send + MetricsComponent {
139
    /// Update that state of an operation.
140
    /// The worker must also send periodic updates even if the state
141
    /// did not change with a modified timestamp in order to prevent
142
    /// the operation from being considered stale and being rescheduled.
143
    async fn update_operation(
144
        &self,
145
        operation_id: &OperationId,
146
        worker_id: &WorkerId,
147
        update: UpdateOperationType,
148
    ) -> Result<(), Error>;
149
}
150
151
#[async_trait]
152
pub trait MatchingEngineStateManager: Sync + Send + MetricsComponent {
153
    /// Returns a stream of operations that match the filter.
154
    async fn filter_operations<'a>(
155
        &'a self,
156
        filter: OperationFilter,
157
    ) -> Result<ActionStateResultStream<'a>, Error>;
158
159
    /// Assign an operation to a worker or unassign it.
160
    async fn assign_operation(
161
        &self,
162
        operation_id: &OperationId,
163
        worker_id_or_reason_for_unassign: Result<&WorkerId, Error>,
164
    ) -> Result<(), Error>;
165
}