Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/operation_state_manager.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
use std::time::SystemTime;
18
19
use async_trait::async_trait;
20
use bitflags::bitflags;
21
use futures::Stream;
22
use nativelink_error::Error;
23
use nativelink_metric::MetricsComponent;
24
25
use crate::action_messages::{
26
    ActionInfo, ActionStage, ActionState, ActionUniqueKey, OperationId, WorkerId,
27
};
28
use crate::common::DigestInfo;
29
use crate::known_platform_property_provider::KnownPlatformPropertyProvider;
30
use crate::origin_event::OriginMetadata;
31
32
bitflags! {
    /// Set of operation stages, used by `OperationFilter::stages` to select
    /// which operations a query should match.
    ///
    /// Bit 0 is not assigned to any named stage; `Any` sets every bit.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct OperationStageFlags: u32 {
        /// Matches operations in the cache-check stage.
        const CacheCheck = 0b0_0010;
        /// Matches operations in the queued stage.
        const Queued     = 0b0_0100;
        /// Matches operations in the executing stage.
        const Executing  = 0b0_1000;
        /// Matches operations in the completed stage.
        const Completed  = 0b1_0000;
        /// All 32 bits set, so it intersects every other flag.
        const Any        = u32::MAX;
    }
}
42
43
impl Default for OperationStageFlags {
44
601
    fn default() -> Self {
45
601
        Self::Any
46
601
    }
47
}
48
49
#[async_trait]
50
pub trait ActionStateResult: Send + Sync + 'static {
51
    /// Provides the current state of the action.
52
    async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error>;
53
    /// Waits for the state of the action to change.
54
    async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error>;
55
    /// Provide result as action info. This behavior will not be supported by all implementations.
56
    async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error>;
57
}
58
59
/// Sort order applied to query results, e.g. via
/// `OperationFilter::order_by_priority_direction`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OrderDirection {
    /// Ascending order.
    Asc,
    /// Descending order.
    Desc,
}
65
66
/// The filters used to query operations from the state manager.
67
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
68
pub struct OperationFilter {
69
    // TODO(adams): create rust builder pattern?
70
    /// The stage(s) that the operation must be in.
71
    pub stages: OperationStageFlags,
72
73
    /// The client operation id.
74
    pub client_operation_id: Option<OperationId>,
75
76
    /// The operation id.
77
    pub operation_id: Option<OperationId>,
78
79
    /// The worker that the operation must be assigned to.
80
    pub worker_id: Option<WorkerId>,
81
82
    /// The digest of the action that the operation must have.
83
    pub action_digest: Option<DigestInfo>,
84
85
    /// The operation must have its worker timestamp before this time.
86
    pub worker_update_before: Option<SystemTime>,
87
88
    /// The operation must have been completed before this time.
89
    pub completed_before: Option<SystemTime>,
90
91
    /// The unique key for filtering specific action results.
92
    pub unique_key: Option<ActionUniqueKey>,
93
94
    /// If the results should be ordered by priority and in which direction.
95
    pub order_by_priority_direction: Option<OrderDirection>,
96
}
97
98
pub type ActionStateResultStream<'a> =
99
    Pin<Box<dyn Stream<Item = Box<dyn ActionStateResult>> + Send + 'a>>;
100
101
#[async_trait]
102
pub trait ClientStateManager: Sync + Send + Unpin + MetricsComponent + 'static {
103
    /// Add a new action to the queue or joins an existing action.
104
    async fn add_action(
105
        &self,
106
        client_operation_id: OperationId,
107
        action_info: Arc<ActionInfo>,
108
    ) -> Result<Box<dyn ActionStateResult>, Error>;
109
110
    /// Returns a stream of operations that match the filter.
111
    async fn filter_operations(
112
        &self,
113
        filter: OperationFilter,
114
    ) -> Result<ActionStateResultStream, Error>;
115
116
    /// Returns the known platform property provider for the given instance
117
    /// if this implementation supports it.
118
    // TODO(https://github.com/rust-lang/rust/issues/65991) When this lands we can
119
    // remove this and have the call sites instead try to cast the ClientStateManager
120
    // into a KnownPlatformPropertyProvider instead. Rust currently does not support
121
    // casting traits to other traits.
122
    fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider>;
123
}
124
125
/// The type of update to perform on an operation.
126
#[derive(Debug, PartialEq)]
127
#[allow(
128
    clippy::large_enum_variant,
129
    reason = "TODO Fix this. Breaks on stable, but not on nightly"
130
)]
131
pub enum UpdateOperationType {
132
    /// Notification that the operation is still alive.
133
    KeepAlive,
134
135
    /// Notification that the operation has been updated.
136
    UpdateWithActionStage(ActionStage),
137
138
    /// Notification that the operation has been completed.
139
    UpdateWithError(Error),
140
}
141
142
#[async_trait]
143
pub trait WorkerStateManager: Sync + Send + MetricsComponent {
144
    /// Update that state of an operation.
145
    /// The worker must also send periodic updates even if the state
146
    /// did not change with a modified timestamp in order to prevent
147
    /// the operation from being considered stale and being rescheduled.
148
    async fn update_operation(
149
        &self,
150
        operation_id: &OperationId,
151
        worker_id: &WorkerId,
152
        update: UpdateOperationType,
153
    ) -> Result<(), Error>;
154
}
155
156
#[async_trait]
157
pub trait MatchingEngineStateManager: Sync + Send + MetricsComponent {
158
    /// Returns a stream of operations that match the filter.
159
    async fn filter_operations<'a>(
160
        &'a self,
161
        filter: OperationFilter,
162
    ) -> Result<ActionStateResultStream<'a>, Error>;
163
164
    /// Assign an operation to a worker or unassign it.
165
    async fn assign_operation(
166
        &self,
167
        operation_id: &OperationId,
168
        worker_id_or_reason_for_unassign: Result<&WorkerId, Error>,
169
    ) -> Result<(), Error>;
170
}