Coverage Report

Created: 2026-05-06 18:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/action_messages.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp::Ordering;
16
use core::convert::Into;
17
use core::fmt::Display;
18
use core::hash::Hash;
19
use core::time::Duration;
20
use std::collections::HashMap;
21
use std::time::SystemTime;
22
23
use humantime::format_duration;
24
use nativelink_error::{Error, ResultExt, error_if, make_input_err};
25
use nativelink_metric::{
26
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, publish,
27
};
28
use nativelink_proto::build::bazel::remote::execution::v2::{
29
    Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest,
30
    ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile,
31
    OutputSymlink, SymlinkNode, execution_stage,
32
};
33
use nativelink_proto::google::longrunning::Operation;
34
use nativelink_proto::google::longrunning::operation::Result as LongRunningResult;
35
use nativelink_proto::google::rpc::Status;
36
use prost::Message;
37
use prost::bytes::Bytes;
38
use prost_types::Any;
39
use serde::ser::Error as SerdeError;
40
use serde::{Deserialize, Serialize};
41
use tonic::Code;
42
use uuid::Uuid;
43
44
use crate::common::{DigestInfo, HashMapExt, VecExt};
45
use crate::digest_hasher::DigestHasherFunc;
46
47
/// Default priority remote execution jobs will get when not provided.
///
/// Priorities are plain `i32` values; see `ActionInfo::priority` for how the
/// value is interpreted.
pub const DEFAULT_EXECUTION_PRIORITY: i32 = 0;
49
50
/// Exit code sent if there is an internal error.
///
/// This is also the `exit_code` that `ActionResult::default()` starts out with.
pub const INTERNAL_ERROR_EXIT_CODE: i32 = -178;
52
53
/// Holds an id that is unique to the client for a requested operation.
///
/// The `From<&str>` / `From<String>` conversions in this file pick the variant:
/// text that parses as a UUID becomes [`OperationId::Uuid`], anything else is
/// kept verbatim as [`OperationId::String`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub enum OperationId {
    /// Identifier that is a valid UUID (also what `Default` generates).
    Uuid(Uuid),
    /// Free-form identifier that did not parse as a UUID.
    String(String),
}
59
60
impl OperationId {
61
5
    pub fn into_string(self) -> String {
62
5
        match self {
63
1
            Self::Uuid(uuid) => uuid.to_string(),
64
4
            Self::String(name) => name,
65
        }
66
5
    }
67
}
68
69
impl Default for OperationId {
70
108
    fn default() -> Self {
71
108
        Self::Uuid(Uuid::new_v4())
72
108
    }
73
}
74
75
impl Display for OperationId {
76
438
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
77
438
        match self {
78
263
            Self::Uuid(uuid) => uuid.fmt(f),
79
175
            Self::String(name) => f.write_str(name),
80
        }
81
438
    }
82
}
83
84
impl MetricsComponent for OperationId {
85
0
    fn publish(
86
0
        &self,
87
0
        _kind: MetricKind,
88
0
        _field_metadata: MetricFieldData,
89
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
90
0
        Ok(MetricPublishKnownKindData::String(self.to_string()))
91
0
    }
92
}
93
94
impl From<&str> for OperationId {
95
49
    fn from(value: &str) -> Self {
96
49
        Uuid::parse_str(value).map_or_else(|_| Self::String(
value26
.
to_string26
()), Self::Uuid)
97
49
    }
98
}
99
100
impl From<String> for OperationId {
101
11
    fn from(value: String) -> Self {
102
11
        Uuid::parse_str(&value).map_or(Self::String(value), Self::Uuid)
103
11
    }
104
}
105
106
impl TryFrom<Bytes> for OperationId {
107
    type Error = Error;
108
109
0
    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
110
        // This is an optimized path to attempt to do the conversion in-place
111
        // to avoid an extra allocation/copy.
112
0
        match value.try_into_mut() {
113
            // We are the only reference to the Bytes, so we can convert it into a Vec<u8>
114
            // for free then convert the Vec<u8> to a String for free too.
115
0
            Ok(value) => {
116
0
                let value = String::from_utf8(value.into()).map_err(|e| {
117
0
                    Error::from_std_err(Code::InvalidArgument, &e).append(
118
                        "Failed to convert bytes to string in try_from<Bytes> for OperationId",
119
                    )
120
0
                })?;
121
0
                Ok(Self::from(value))
122
            }
123
            // We could not take ownership of the Bytes, so we may need to copy our data.
124
0
            Err(value) => {
125
0
                let value = core::str::from_utf8(&value).map_err(|e| {
126
0
                    Error::from_std_err(Code::InvalidArgument, &e).append(
127
                        "Failed to convert bytes to string in try_from<Bytes> for OperationId",
128
                    )
129
0
                })?;
130
0
                Ok(Self::from(value))
131
            }
132
        }
133
0
    }
134
}
135
136
/// Unique id of worker.
///
/// Thin newtype around the worker's id string. `Debug` is not derived because
/// this file implements it by hand in terms of `Display`.
#[derive(Default, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)]
pub struct WorkerId(pub String);
139
140
impl MetricsComponent for WorkerId {
141
0
    fn publish(
142
0
        &self,
143
0
        _kind: MetricKind,
144
0
        _field_metadata: MetricFieldData,
145
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
146
0
        Ok(MetricPublishKnownKindData::String(self.0.clone()))
147
0
    }
148
}
149
150
impl Display for WorkerId {
151
217
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
152
217
        f.write_fmt(format_args!("{}", self.0))
153
217
    }
154
}
155
156
impl core::fmt::Debug for WorkerId {
157
130
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
158
130
        Display::fmt(&self, f)
159
130
    }
160
}
161
162
impl From<WorkerId> for String {
163
105
    fn from(val: WorkerId) -> Self {
164
105
        val.0
165
105
    }
166
}
167
168
impl From<String> for WorkerId {
169
11
    fn from(s: String) -> Self {
170
11
        Self(s)
171
11
    }
172
}
173
174
/// Holds the information needed to uniquely identify an action
/// and if it is cacheable or not.
///
/// Both variants carry the same [`ActionUniqueKey`]; the variant itself is the
/// cacheable/uncacheable flag. The serde aliases keep data written with the
/// older spellings deserializable.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub enum ActionUniqueQualifier {
    /// The action is cacheable.
    #[serde(alias = "Cachable")] // Pre 0.7.0 spelling
    Cacheable(ActionUniqueKey),
    /// The action is uncacheable.
    #[serde(alias = "Uncachable")] // Pre 0.7.0 spelling
    Uncacheable(ActionUniqueKey),
}
185
186
impl MetricsComponent for ActionUniqueQualifier {
187
0
    fn publish(
188
0
        &self,
189
0
        _kind: MetricKind,
190
0
        field_metadata: MetricFieldData,
191
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
192
0
        let (cacheable, action) = match self {
193
0
            Self::Cacheable(action) => (true, action),
194
0
            Self::Uncacheable(action) => (false, action),
195
        };
196
0
        publish!(
197
0
            cacheable,
198
0
            &cacheable,
199
0
            MetricKind::Default,
200
0
            "If the action is cacheable.",
201
0
            ""
202
        );
203
0
        action.publish(MetricKind::Component, field_metadata)?;
204
0
        Ok(MetricPublishKnownKindData::Component)
205
0
    }
206
}
207
208
impl ActionUniqueQualifier {
209
    /// Get the `instance_name` of the action.
210
47
    pub const fn instance_name(&self) -> &String {
211
47
        match self {
212
47
            Self::Cacheable(action) | Self::Uncacheable(
action0
) => &action.instance_name,
213
        }
214
47
    }
215
216
    /// Get the digest function of the action.
217
12
    pub const fn digest_function(&self) -> DigestHasherFunc {
218
12
        match self {
219
12
            Self::Cacheable(action) | Self::Uncacheable(
action0
) => action.digest_function,
220
        }
221
12
    }
222
223
    /// Get the digest of the action.
224
180
    pub const fn digest(&self) -> DigestInfo {
225
180
        match self {
226
180
            Self::Cacheable(
action172
) | Self::Uncacheable(
action8
) => action.digest,
227
        }
228
180
    }
229
}
230
231
impl Display for ActionUniqueQualifier {
232
19
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
233
19
        let (cacheable, unique_key) = match self {
234
19
            Self::Cacheable(action) => (true, action),
235
0
            Self::Uncacheable(action) => (false, action),
236
        };
237
19
        f.write_fmt(format_args!(
238
            // Note: We use underscores because it makes escaping easier
239
            // for redis.
240
            "{}_{}_{}_{}_{}",
241
            unique_key.instance_name,
242
            unique_key.digest_function,
243
19
            unique_key.digest.packed_hash(),
244
19
            unique_key.digest.size_bytes(),
245
19
            if cacheable { 'c' } else { 
'u'0
},
246
        ))
247
19
    }
248
}
249
250
/// This is a utility struct used to make it easier to match `ActionInfos` in a
/// `HashMap` without needing to construct an entire `ActionInfo`.
///
/// It derives `Eq` and `Hash`, so it can be used directly as a map key.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, MetricsComponent)]
pub struct ActionUniqueKey {
    /// Name of instance group this action belongs to.
    #[metric(help = "Name of instance group this action belongs to.")]
    pub instance_name: String,
    /// The digest function this action expects.
    #[metric(help = "The digest function this action expects.")]
    pub digest_function: DigestHasherFunc,
    /// Digest of the underlying `Action`.
    #[metric(help = "Digest of the underlying Action.")]
    pub digest: DigestInfo,
}
264
265
impl Display for ActionUniqueKey {
266
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
267
0
        f.write_fmt(format_args!(
268
            "{}/{}/{}",
269
            self.instance_name, self.digest_function, self.digest,
270
        ))
271
0
    }
272
}
273
274
/// Information needed to execute an action. This struct is used over bazel's proto `Action`
/// for simplicity. Whether an action may be matched against (and cached with) other
/// actions is carried by `unique_qualifier`.
/// This struct must be 100% compatible with `ExecuteRequest` struct in `remote_execution.proto`.
///
/// NOTE(review): earlier wording of this comment referred to a `salt` field; no such
/// field is declared on this struct.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, MetricsComponent)]
pub struct ActionInfo {
    /// Digest of the underlying `Command`.
    #[metric(help = "Digest of the underlying Command.")]
    pub command_digest: DigestInfo,
    /// Digest of the underlying `Directory`.
    #[metric(help = "Digest of the underlying Directory.")]
    pub input_root_digest: DigestInfo,
    /// Timeout of the action.
    #[metric(help = "Timeout of the action.")]
    pub timeout: Duration,
    /// The properties rules that must be applied when finding a worker that can run this action.
    #[metric(group = "platform_properties")]
    pub platform_properties: HashMap<String, String>,
    /// The priority of the action. Higher value means it should execute faster.
    #[metric(help = "The priority of the action. Higher value means it should execute faster.")]
    pub priority: i32,
    /// When this action started to be loaded from the CAS.
    #[metric(help = "When this action started to be loaded from the CAS.")]
    pub load_timestamp: SystemTime,
    /// When this action was created.
    #[metric(help = "When this action was created.")]
    pub insert_timestamp: SystemTime,
    /// Info used to uniquely identify this `ActionInfo` and if it is cacheable.
    /// This is primarily used to join actions/operations together using this key.
    #[metric(help = "Info used to uniquely identify this ActionInfo and if it is cacheable.")]
    pub unique_qualifier: ActionUniqueQualifier,
}
307
308
impl ActionInfo {
309
    #[inline]
310
0
    pub const fn instance_name(&self) -> &String {
311
0
        self.unique_qualifier.instance_name()
312
0
    }
313
314
    /// Returns the underlying digest of the `Action`.
315
    #[inline]
316
134
    pub const fn digest(&self) -> DigestInfo {
317
134
        self.unique_qualifier.digest()
318
134
    }
319
320
21
    pub fn try_from_action_and_execute_request(
321
21
        execute_request: ExecuteRequest,
322
21
        action: Action,
323
21
        load_timestamp: SystemTime,
324
21
        queued_timestamp: SystemTime,
325
21
    ) -> Result<Self, Error> {
326
21
        let unique_key = ActionUniqueKey {
327
21
            instance_name: execute_request.instance_name,
328
21
            digest_function: DigestHasherFunc::try_from(execute_request.digest_function)
329
21
                .err_tip(|| 
format!0
("Could not find digest_function in try_from_action_and_execute_request {:?}", execute_request.digest_function))
?0
,
330
21
            digest: execute_request
331
21
                .action_digest
332
21
                .err_tip(|| "Expected action_digest to exist on ExecuteRequest")
?0
333
21
                .try_into()
?0
,
334
        };
335
21
        let unique_qualifier = if execute_request.skip_cache_lookup {
336
0
            ActionUniqueQualifier::Uncacheable(unique_key)
337
        } else {
338
21
            ActionUniqueQualifier::Cacheable(unique_key)
339
        };
340
341
21
        let proto_properties = action.platform.unwrap_or_default();
342
21
        let mut platform_properties = HashMap::with_capacity(proto_properties.properties.len());
343
21
        for 
property1
in proto_properties.properties {
344
1
            platform_properties.insert(property.name, property.value);
345
1
        }
346
347
        Ok(Self {
348
21
            command_digest: action
349
21
                .command_digest
350
21
                .err_tip(|| "Expected command_digest to exist on Action")
?0
351
21
                .try_into()
?0
,
352
21
            input_root_digest: action
353
21
                .input_root_digest
354
21
                .err_tip(|| "Expected input_root_digest to exist on Action")
?0
355
21
                .try_into()
?0
,
356
21
            timeout: action
357
21
                .timeout
358
21
                .unwrap_or_default()
359
21
                .try_into()
360
21
                .map_err(|err| 
{0
361
0
                    Error::from_std_err(Code::InvalidArgument, &err)
362
0
                        .append("Failed convert proto duration to system duration")
363
0
                })?,
364
21
            platform_properties,
365
21
            priority: execute_request
366
21
                .execution_policy
367
21
                .unwrap_or_default()
368
21
                .priority,
369
21
            load_timestamp,
370
21
            insert_timestamp: queued_timestamp,
371
21
            unique_qualifier,
372
        })
373
21
    }
374
}
375
376
impl From<&ActionInfo> for ExecuteRequest {
377
39
    fn from(val: &ActionInfo) -> Self {
378
39
        let digest = val.digest().into();
379
39
        let (skip_cache_lookup, unique_qualifier) = match &val.unique_qualifier {
380
31
            ActionUniqueQualifier::Cacheable(unique_qualifier) => (false, unique_qualifier),
381
8
            ActionUniqueQualifier::Uncacheable(unique_qualifier) => (true, unique_qualifier),
382
        };
383
39
        Self {
384
39
            instance_name: unique_qualifier.instance_name.clone(),
385
39
            action_digest: Some(digest),
386
39
            skip_cache_lookup,
387
39
            execution_policy: None,     // Not used in the worker.
388
39
            results_cache_policy: None, // Not used in the worker.
389
39
            digest_function: unique_qualifier.digest_function.proto_digest_func().into(),
390
39
        }
391
39
    }
392
}
393
394
/// Simple utility struct to determine if a string is representing a full path or
/// just the name of the file.
/// This is in order to be able to reuse the same struct instead of building different
/// structs when converting `FileInfo` -> {`OutputFile`, `FileNode`} and other similar
/// structs.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum NameOrPath {
    /// Just a name; this is the variant the `FileNode`/`SymlinkNode` conversions require.
    Name(String),
    /// A path; this is the variant the `OutputFile`/`OutputSymlink` conversions require.
    Path(String),
}
404
405
impl PartialOrd for NameOrPath {
406
0
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
407
0
        Some(self.cmp(other))
408
0
    }
409
}
410
411
impl Ord for NameOrPath {
412
0
    fn cmp(&self, other: &Self) -> Ordering {
413
0
        let self_lexical_name = match self {
414
0
            Self::Name(name) => name,
415
0
            Self::Path(path) => path,
416
        };
417
0
        let other_lexical_name = match other {
418
0
            Self::Name(name) => name,
419
0
            Self::Path(path) => path,
420
        };
421
0
        self_lexical_name.cmp(other_lexical_name)
422
0
    }
423
}
424
425
/// Represents an individual file and associated metadata.
/// This struct must be 100% compatible with `OutputFile` and `FileNode` structs
/// in `remote_execution.proto`.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct FileInfo {
    /// File name (for `FileNode`) or path (for `OutputFile`).
    pub name_or_path: NameOrPath,
    /// Digest associated with the file.
    pub digest: DigestInfo,
    /// Mirrors the `is_executable` field of the proto messages.
    pub is_executable: bool,
}
434
435
impl TryFrom<FileInfo> for FileNode {
436
    type Error = Error;
437
438
3
    fn try_from(val: FileInfo) -> Result<Self, Error> {
439
3
        match val.name_or_path {
440
0
            NameOrPath::Path(_) => Err(make_input_err!(
441
0
                "Cannot return a FileInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()"
442
0
            )),
443
3
            NameOrPath::Name(name) => Ok(Self {
444
3
                name,
445
3
                digest: Some((&val.digest).into()),
446
3
                is_executable: val.is_executable,
447
3
                node_properties: None, // Not supported.
448
3
            }),
449
        }
450
3
    }
451
}
452
453
impl TryFrom<OutputFile> for FileInfo {
454
    type Error = Error;
455
456
2
    fn try_from(output_file: OutputFile) -> Result<Self, Error> {
457
        Ok(Self {
458
2
            name_or_path: NameOrPath::Path(output_file.path),
459
2
            digest: output_file
460
2
                .digest
461
2
                .err_tip(|| "Expected digest to exist on OutputFile")
?0
462
2
                .try_into()
?0
,
463
2
            is_executable: output_file.is_executable,
464
        })
465
2
    }
466
}
467
468
impl TryFrom<FileInfo> for OutputFile {
469
    type Error = Error;
470
471
7
    fn try_from(val: FileInfo) -> Result<Self, Error> {
472
7
        match val.name_or_path {
473
0
            NameOrPath::Name(_) => Err(make_input_err!(
474
0
                "Cannot return a FileInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()"
475
0
            )),
476
7
            NameOrPath::Path(path) => Ok(Self {
477
7
                path,
478
7
                digest: Some((&val.digest).into()),
479
7
                is_executable: val.is_executable,
480
7
                contents: Bytes::default(),
481
7
                node_properties: None, // Not supported.
482
7
            }),
483
        }
484
7
    }
485
}
486
487
/// Represents an individual symlink file and associated metadata.
/// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct SymlinkInfo {
    /// Symlink name (for `SymlinkNode`) or path (for `OutputSymlink`).
    pub name_or_path: NameOrPath,
    /// Mirrors the `target` field of the proto messages.
    pub target: String,
}
494
495
impl TryFrom<SymlinkNode> for SymlinkInfo {
496
    type Error = Error;
497
498
0
    fn try_from(symlink_node: SymlinkNode) -> Result<Self, Error> {
499
0
        Ok(Self {
500
0
            name_or_path: NameOrPath::Name(symlink_node.name),
501
0
            target: symlink_node.target,
502
0
        })
503
0
    }
504
}
505
506
impl TryFrom<SymlinkInfo> for SymlinkNode {
507
    type Error = Error;
508
509
1
    fn try_from(val: SymlinkInfo) -> Result<Self, Error> {
510
1
        match val.name_or_path {
511
0
            NameOrPath::Path(_) => Err(make_input_err!(
512
0
                "Cannot return a SymlinkInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()"
513
0
            )),
514
1
            NameOrPath::Name(name) => Ok(Self {
515
1
                name,
516
1
                target: val.target,
517
1
                node_properties: None, // Not supported.
518
1
            }),
519
        }
520
1
    }
521
}
522
523
impl TryFrom<OutputSymlink> for SymlinkInfo {
524
    type Error = Error;
525
526
2
    fn try_from(output_symlink: OutputSymlink) -> Result<Self, Error> {
527
2
        Ok(Self {
528
2
            name_or_path: NameOrPath::Path(output_symlink.path),
529
2
            target: output_symlink.target,
530
2
        })
531
2
    }
532
}
533
534
impl TryFrom<SymlinkInfo> for OutputSymlink {
535
    type Error = Error;
536
537
2
    fn try_from(val: SymlinkInfo) -> Result<Self, Error> {
538
2
        match val.name_or_path {
539
2
            NameOrPath::Path(path) => {
540
2
                Ok(Self {
541
2
                    path,
542
2
                    target: val.target,
543
2
                    node_properties: None, // Not supported.
544
2
                })
545
            }
546
0
            NameOrPath::Name(_) => Err(make_input_err!(
547
0
                "Cannot return a SymlinkInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()"
548
0
            )),
549
        }
550
2
    }
551
}
552
553
/// Represents an individual output directory and associated metadata.
/// This struct must be 100% compatible with `OutputDirectory`, which is the
/// proto message it is converted to and from in this file.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct DirectoryInfo {
    /// Mirrors the `path` field of `OutputDirectory`.
    pub path: String,
    /// Mirrors the `tree_digest` field of `OutputDirectory`.
    pub tree_digest: DigestInfo,
}
560
561
impl TryFrom<OutputDirectory> for DirectoryInfo {
562
    type Error = Error;
563
564
2
    fn try_from(output_directory: OutputDirectory) -> Result<Self, Error> {
565
        Ok(Self {
566
2
            path: output_directory.path,
567
2
            tree_digest: output_directory
568
2
                .tree_digest
569
2
                .err_tip(|| "Expected tree_digest to exist in OutputDirectory")
?0
570
2
                .try_into()
?0
,
571
        })
572
2
    }
573
}
574
575
impl From<DirectoryInfo> for OutputDirectory {
576
1
    fn from(val: DirectoryInfo) -> Self {
577
1
        Self {
578
1
            path: val.path,
579
1
            tree_digest: Some(val.tree_digest.into()),
580
1
            is_topologically_sorted: false,
581
1
        }
582
1
    }
583
}
584
585
/// Represents the metadata associated with the execution result.
/// This struct must be 100% compatible with `ExecutedActionMetadata`.
///
/// Each field mirrors the `ExecutedActionMetadata` field of the same name;
/// `Default` sets every timestamp to `SystemTime::UNIX_EPOCH`.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionMetadata {
    pub worker: String,
    pub queued_timestamp: SystemTime,
    pub worker_start_timestamp: SystemTime,
    pub worker_completed_timestamp: SystemTime,
    pub input_fetch_start_timestamp: SystemTime,
    pub input_fetch_completed_timestamp: SystemTime,
    pub execution_start_timestamp: SystemTime,
    pub execution_completed_timestamp: SystemTime,
    pub output_upload_start_timestamp: SystemTime,
    pub output_upload_completed_timestamp: SystemTime,
}
600
601
impl Default for ExecutionMetadata {
602
2
    fn default() -> Self {
603
2
        Self {
604
2
            worker: String::new(),
605
2
            queued_timestamp: SystemTime::UNIX_EPOCH,
606
2
            worker_start_timestamp: SystemTime::UNIX_EPOCH,
607
2
            worker_completed_timestamp: SystemTime::UNIX_EPOCH,
608
2
            input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
609
2
            input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
610
2
            execution_start_timestamp: SystemTime::UNIX_EPOCH,
611
2
            execution_completed_timestamp: SystemTime::UNIX_EPOCH,
612
2
            output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
613
2
            output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
614
2
        }
615
2
    }
616
}
617
618
impl From<ExecutionMetadata> for ExecutedActionMetadata {
619
20
    fn from(val: ExecutionMetadata) -> Self {
620
        Self {
621
20
            worker: val.worker,
622
20
            queued_timestamp: Some(val.queued_timestamp.into()),
623
20
            worker_start_timestamp: Some(val.worker_start_timestamp.into()),
624
20
            worker_completed_timestamp: Some(val.worker_completed_timestamp.into()),
625
20
            input_fetch_start_timestamp: Some(val.input_fetch_start_timestamp.into()),
626
20
            input_fetch_completed_timestamp: Some(val.input_fetch_completed_timestamp.into()),
627
20
            execution_start_timestamp: Some(val.execution_start_timestamp.into()),
628
20
            execution_completed_timestamp: Some(val.execution_completed_timestamp.into()),
629
20
            output_upload_start_timestamp: Some(val.output_upload_start_timestamp.into()),
630
20
            output_upload_completed_timestamp: Some(val.output_upload_completed_timestamp.into()),
631
20
            virtual_execution_duration: val
632
20
                .execution_completed_timestamp
633
20
                .duration_since(val.execution_start_timestamp)
634
20
                .ok()
635
20
                .and_then(|duration| prost_types::Duration::try_from(duration).ok()),
636
20
            auxiliary_metadata: Vec::default(),
637
        }
638
20
    }
639
}
640
641
impl TryFrom<ExecutedActionMetadata> for ExecutionMetadata {
642
    type Error = Error;
643
644
3
    fn try_from(eam: ExecutedActionMetadata) -> Result<Self, Error> {
645
        Ok(Self {
646
3
            worker: eam.worker,
647
3
            queued_timestamp: eam
648
3
                .queued_timestamp
649
3
                .err_tip(|| "Expected queued_timestamp to exist in ExecutedActionMetadata")
?0
650
3
                .try_into()
?0
,
651
3
            worker_start_timestamp: eam
652
3
                .worker_start_timestamp
653
3
                .err_tip(|| "Expected worker_start_timestamp to exist in ExecutedActionMetadata")
?0
654
3
                .try_into()
?0
,
655
3
            worker_completed_timestamp: eam
656
3
                .worker_completed_timestamp
657
3
                .err_tip(|| 
{0
658
0
                    "Expected worker_completed_timestamp to exist in ExecutedActionMetadata"
659
0
                })?
660
3
                .try_into()
?0
,
661
3
            input_fetch_start_timestamp: eam
662
3
                .input_fetch_start_timestamp
663
3
                .err_tip(|| 
{0
664
0
                    "Expected input_fetch_start_timestamp to exist in ExecutedActionMetadata"
665
0
                })?
666
3
                .try_into()
?0
,
667
3
            input_fetch_completed_timestamp: eam
668
3
                .input_fetch_completed_timestamp
669
3
                .err_tip(|| 
{0
670
0
                    "Expected input_fetch_completed_timestamp to exist in ExecutedActionMetadata"
671
0
                })?
672
3
                .try_into()
?0
,
673
3
            execution_start_timestamp: eam
674
3
                .execution_start_timestamp
675
3
                .err_tip(|| 
{0
676
0
                    "Expected execution_start_timestamp to exist in ExecutedActionMetadata"
677
0
                })?
678
3
                .try_into()
?0
,
679
3
            execution_completed_timestamp: eam
680
3
                .execution_completed_timestamp
681
3
                .err_tip(|| 
{0
682
0
                    "Expected execution_completed_timestamp to exist in ExecutedActionMetadata"
683
0
                })?
684
3
                .try_into()
?0
,
685
3
            output_upload_start_timestamp: eam
686
3
                .output_upload_start_timestamp
687
3
                .err_tip(|| 
{0
688
0
                    "Expected output_upload_start_timestamp to exist in ExecutedActionMetadata"
689
0
                })?
690
3
                .try_into()
?0
,
691
3
            output_upload_completed_timestamp: eam
692
3
                .output_upload_completed_timestamp
693
3
                .err_tip(|| 
{0
694
0
                    "Expected output_upload_completed_timestamp to exist in ExecutedActionMetadata"
695
0
                })?
696
3
                .try_into()
?0
,
697
        })
698
3
    }
699
}
700
701
/// Represents the results of an execution.
/// This struct must be 100% compatible with `ActionResult` in `remote_execution.proto`.
///
/// `Default` produces an empty result whose `exit_code` is
/// `INTERNAL_ERROR_EXIT_CODE`.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct ActionResult {
    pub output_files: Vec<FileInfo>,
    pub output_folders: Vec<DirectoryInfo>,
    pub output_directory_symlinks: Vec<SymlinkInfo>,
    pub output_file_symlinks: Vec<SymlinkInfo>,
    pub exit_code: i32,
    pub stdout_digest: DigestInfo,
    pub stderr_digest: DigestInfo,
    pub execution_metadata: ExecutionMetadata,
    // Keyed by a string name; presumably the server log name — TODO confirm.
    pub server_logs: HashMap<String, DigestInfo>,
    pub error: Option<Error>,
    pub message: String,
}
717
718
impl Default for ActionResult {
719
14
    fn default() -> Self {
720
14
        Self {
721
14
            output_files: Vec::default(),
722
14
            output_folders: Vec::default(),
723
14
            output_directory_symlinks: Vec::default(),
724
14
            output_file_symlinks: Vec::default(),
725
14
            exit_code: INTERNAL_ERROR_EXIT_CODE,
726
14
            stdout_digest: DigestInfo::new([0u8; 32], 0),
727
14
            stderr_digest: DigestInfo::new([0u8; 32], 0),
728
14
            execution_metadata: ExecutionMetadata {
729
14
                worker: String::new(),
730
14
                queued_timestamp: SystemTime::UNIX_EPOCH,
731
14
                worker_start_timestamp: SystemTime::UNIX_EPOCH,
732
14
                worker_completed_timestamp: SystemTime::UNIX_EPOCH,
733
14
                input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
734
14
                input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
735
14
                execution_start_timestamp: SystemTime::UNIX_EPOCH,
736
14
                execution_completed_timestamp: SystemTime::UNIX_EPOCH,
737
14
                output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
738
14
                output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
739
14
            },
740
14
            server_logs: HashMap::default(),
741
14
            error: None,
742
14
            message: String::new(),
743
14
        }
744
14
    }
745
}
746
747
/// The execution status/stage. This should match `ExecutionStage::Value` in `remote_execution.proto`.
///
/// Both `Completed` and `CompletedFromCache` map to the single proto stage
/// `Completed` (see `From<&ActionStage> for execution_stage::Value`).
748
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
749
#[allow(
750
    clippy::large_enum_variant,
751
    reason = "TODO box the two relevant variants in a breaking release. Unfulfilled on nightly"
752
)]
753
pub enum ActionStage {
754
    /// Stage is unknown.
755
    Unknown,
756
    /// Checking the cache to see if action exists.
757
    CacheCheck,
758
    /// Action has been accepted and waiting for worker to take it.
759
    Queued,
760
    // TODO(palfrey) We need a way to know if the job was sent to a worker, but hasn't begun
761
    // execution yet.
762
    /// Worker is executing the action.
763
    Executing,
764
    /// Worker completed the work with result.
765
    Completed(ActionResult),
766
    /// Result was found from cache, don't decode the proto just to re-encode it.
767
    #[serde(serialize_with = "serialize_proto_result", skip_deserializing)]
768
    // The serialization step decodes this to an ActionResult which is serializable.
769
    // Since it will always be serialized as an ActionResult, we do not need to support
770
    // deserialization on this type at all.
771
    // In theory, serializing this should never happen so performance shouldn't be affected.
772
    CompletedFromCache(ProtoActionResult),
773
}
774
775
0
fn serialize_proto_result<S>(v: &ProtoActionResult, serializer: S) -> Result<S::Ok, S::Error>
776
0
where
777
0
    S: serde::Serializer,
778
{
779
0
    let s = ActionResult::try_from(v.clone()).map_err(S::Error::custom)?;
780
0
    s.serialize(serializer)
781
0
}
782
783
impl ActionStage {
784
128
    pub const fn has_action_result(&self) -> bool {
785
128
        match self {
786
111
            Self::Unknown | Self::CacheCheck | Self::Queued | Self::Executing => false,
787
17
            Self::Completed(_) | Self::CompletedFromCache(_) => true,
788
        }
789
128
    }
790
791
    /// Returns true if the worker considers the action done and no longer needs to be tracked.
792
    // Note: This function is separate from `has_action_result()` to not mix the concept of
793
    //       "finished" with "has a result".
794
123
    pub const fn is_finished(&self) -> bool {
795
123
        self.has_action_result()
796
123
    }
797
798
    /// Returns if the stage enum is the same as the other stage enum, but
799
    /// does not compare the values of the enum.
800
39
    pub const fn is_same_stage(&self, other: &Self) -> bool {
801
39
        matches!(
802
39
            (self, other),
803
            (Self::Unknown, Self::Unknown)
804
                | (Self::CacheCheck, Self::CacheCheck)
805
                | (Self::Queued, Self::Queued)
806
                | (Self::Executing, Self::Executing)
807
                | (Self::Completed(_), Self::Completed(_))
808
                | (Self::CompletedFromCache(_), Self::CompletedFromCache(_))
809
        )
810
39
    }
811
812
0
    pub fn name(&self) -> String {
813
0
        match self {
814
0
            Self::Unknown => "Unknown".to_string(),
815
0
            Self::CacheCheck => "CacheCheck".to_string(),
816
0
            Self::Queued => "Queued".to_string(),
817
0
            Self::Executing => "Executing".to_string(),
818
0
            Self::Completed(_) => "Completed".to_string(),
819
0
            Self::CompletedFromCache(_) => "CompletedFromCache".to_string(),
820
        }
821
0
    }
822
}
823
824
impl MetricsComponent for ActionStage {
825
0
    fn publish(
826
0
        &self,
827
0
        _kind: MetricKind,
828
0
        _field_metadata: MetricFieldData,
829
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
830
0
        Ok(MetricPublishKnownKindData::String(self.name()))
831
0
    }
832
}
833
834
impl From<&ActionStage> for execution_stage::Value {
835
5
    fn from(val: &ActionStage) -> Self {
836
5
        match val {
837
0
            ActionStage::Unknown => Self::Unknown,
838
0
            ActionStage::CacheCheck => Self::CacheCheck,
839
3
            ActionStage::Queued => Self::Queued,
840
0
            ActionStage::Executing => Self::Executing,
841
2
            ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => Self::Completed,
842
        }
843
5
    }
844
}
845
846
14
pub fn to_execute_response(action_result: ActionResult) -> ExecuteResponse {
847
14
    fn logs_from(server_logs: HashMap<String, DigestInfo>) -> HashMap<String, LogFile> {
848
14
        let mut logs = HashMap::with_capacity(server_logs.len());
849
14
        for (
k1
,
v1
) in server_logs {
850
1
            logs.insert(
851
1
                k.clone(),
852
1
                LogFile {
853
1
                    digest: Some(v.into()),
854
1
                    human_readable: false,
855
1
                },
856
1
            );
857
1
        }
858
14
        logs
859
14
    }
860
861
14
    let status = Some(
862
14
        action_result
863
14
            .error
864
14
            .clone()
865
14
            .map_or_else(Status::default, Into::into),
866
14
    );
867
14
    let message = action_result.message.clone();
868
14
    ExecuteResponse {
869
14
        server_logs: logs_from(action_result.server_logs.clone()),
870
14
        result: action_result.try_into().ok(),
871
14
        cached_result: false,
872
14
        status,
873
14
        message,
874
14
    }
875
14
}
876
877
impl From<ActionStage> for ExecuteResponse {
878
9
    fn from(val: ActionStage) -> Self {
879
9
        match val {
880
            // We don't have an execute response if we don't have the results. It is defined
881
            // behavior to return an empty proto struct.
882
            ActionStage::Unknown
883
            | ActionStage::CacheCheck
884
            | ActionStage::Queued
885
0
            | ActionStage::Executing => Self::default(),
886
9
            ActionStage::Completed(action_result) => to_execute_response(action_result),
887
            // Handled separately as there are no server logs and the action
888
            // result is already in Proto format.
889
0
            ActionStage::CompletedFromCache(proto_action_result) => Self {
890
0
                server_logs: HashMap::new(),
891
0
                result: Some(proto_action_result),
892
0
                cached_result: true,
893
0
                status: Some(Status::default()),
894
0
                message: String::new(), // Will be populated later if applicable.
895
0
            },
896
        }
897
9
    }
898
}
899
900
impl TryFrom<ActionResult> for ProtoActionResult {
901
    type Error = Error;
902
903
20
    fn try_from(val: ActionResult) -> Result<Self, Error> {
904
20
        let mut output_symlinks = Vec::with_capacity(
905
20
            val.output_file_symlinks.len() + val.output_directory_symlinks.len(),
906
        );
907
20
        output_symlinks.extend_from_slice(val.output_file_symlinks.as_slice());
908
20
        output_symlinks.extend_from_slice(val.output_directory_symlinks.as_slice());
909
910
        Ok(Self {
911
20
            output_files: val
912
20
                .output_files
913
20
                .into_iter()
914
20
                .map(TryInto::try_into)
915
20
                .collect::<Result<_, _>>()
?0
,
916
20
            output_file_symlinks: val
917
20
                .output_file_symlinks
918
20
                .into_iter()
919
20
                .map(TryInto::try_into)
920
20
                .collect::<Result<_, _>>()
?0
,
921
20
            output_symlinks: output_symlinks
922
20
                .into_iter()
923
20
                .map(TryInto::try_into)
924
20
                .collect::<Result<_, _>>()
?0
,
925
20
            output_directories: val
926
20
                .output_folders
927
20
                .into_iter()
928
20
                .map(TryInto::try_into)
929
20
                .collect::<Result<_, _>>()
?0
,
930
20
            output_directory_symlinks: val
931
20
                .output_directory_symlinks
932
20
                .into_iter()
933
20
                .map(TryInto::try_into)
934
20
                .collect::<Result<_, _>>()
?0
,
935
20
            exit_code: val.exit_code,
936
20
            stdout_raw: Bytes::default(),
937
20
            stdout_digest: Some(val.stdout_digest.into()),
938
20
            stderr_raw: Bytes::default(),
939
20
            stderr_digest: Some(val.stderr_digest.into()),
940
20
            execution_metadata: Some(val.execution_metadata.into()),
941
        })
942
20
    }
943
}
944
945
impl TryFrom<ProtoActionResult> for ActionResult {
946
    type Error = Error;
947
948
0
    fn try_from(val: ProtoActionResult) -> Result<Self, Error> {
949
0
        let output_file_symlinks = val
950
0
            .output_file_symlinks
951
0
            .into_iter()
952
0
            .map(|output_symlink| {
953
0
                SymlinkInfo::try_from(output_symlink)
954
0
                    .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo")
955
0
            })
956
0
            .collect::<Result<Vec<_>, _>>()?;
957
958
0
        let output_directory_symlinks = val
959
0
            .output_directory_symlinks
960
0
            .into_iter()
961
0
            .map(|output_symlink| {
962
0
                SymlinkInfo::try_from(output_symlink)
963
0
                    .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo")
964
0
            })
965
0
            .collect::<Result<Vec<_>, _>>()?;
966
967
0
        let output_files = val
968
0
            .output_files
969
0
            .into_iter()
970
0
            .map(|output_file| {
971
0
                output_file
972
0
                    .try_into()
973
0
                    .err_tip(|| "Output File could not be converted")
974
0
            })
975
0
            .collect::<Result<Vec<_>, _>>()?;
976
977
0
        let output_folders = val
978
0
            .output_directories
979
0
            .into_iter()
980
0
            .map(|output_directory| {
981
0
                output_directory
982
0
                    .try_into()
983
0
                    .err_tip(|| "Output File could not be converted")
984
0
            })
985
0
            .collect::<Result<Vec<_>, _>>()?;
986
987
        Ok(Self {
988
0
            output_files,
989
0
            output_folders,
990
0
            output_file_symlinks,
991
0
            output_directory_symlinks,
992
0
            exit_code: val.exit_code,
993
0
            stdout_digest: val
994
0
                .stdout_digest
995
0
                .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")?
996
0
                .try_into()?,
997
0
            stderr_digest: val
998
0
                .stderr_digest
999
0
                .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")?
1000
0
                .try_into()?,
1001
0
            execution_metadata: val
1002
0
                .execution_metadata
1003
0
                .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")?
1004
0
                .try_into()?,
1005
0
            server_logs: HashMap::default(),
1006
0
            error: None,
1007
0
            message: String::new(),
1008
        })
1009
0
    }
1010
}
1011
1012
impl TryFrom<ExecuteResponse> for ActionStage {
1013
    type Error = Error;
1014
1015
3
    fn try_from(execute_response: ExecuteResponse) -> Result<Self, Error> {
1016
3
        let proto_action_result = execute_response
1017
3
            .result
1018
3
            .err_tip(|| "Expected result to be set on ExecuteResponse msg")
?0
;
1019
3
        let action_result = ActionResult {
1020
3
            output_files: proto_action_result
1021
3
                .output_files
1022
3
                .try_map(TryInto::try_into)
?0
,
1023
3
            output_directory_symlinks: proto_action_result
1024
3
                .output_directory_symlinks
1025
3
                .try_map(TryInto::try_into)
?0
,
1026
3
            output_file_symlinks: proto_action_result
1027
3
                .output_file_symlinks
1028
3
                .try_map(TryInto::try_into)
?0
,
1029
3
            output_folders: proto_action_result
1030
3
                .output_directories
1031
3
                .try_map(TryInto::try_into)
?0
,
1032
3
            exit_code: proto_action_result.exit_code,
1033
1034
3
            stdout_digest: proto_action_result
1035
3
                .stdout_digest
1036
3
                .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")
?0
1037
3
                .try_into()
?0
,
1038
3
            stderr_digest: proto_action_result
1039
3
                .stderr_digest
1040
3
                .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")
?0
1041
3
                .try_into()
?0
,
1042
3
            execution_metadata: proto_action_result
1043
3
                .execution_metadata
1044
3
                .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")
?0
1045
3
                .try_into()
?0
,
1046
3
            server_logs: execute_response.server_logs.try_map(|v| 
{2
1047
2
                v.digest
1048
2
                    .err_tip(|| "Expected digest to be set on LogFile msg")
?0
1049
2
                    .try_into()
1050
2
            })
?0
,
1051
3
            error: execute_response
1052
3
                .status
1053
3
                .clone()
1054
3
                .and_then(|v| if v.code == 0 { 
None1
} else {
Some(v.into())2
}),
1055
3
            message: execute_response.message,
1056
        };
1057
1058
3
        if execute_response.cached_result {
1059
0
            return Ok(Self::CompletedFromCache(action_result.try_into()?));
1060
3
        }
1061
3
        Ok(Self::Completed(action_result))
1062
3
    }
1063
}
1064
1065
/// Associates a prost message type with the type URL written into and checked
/// against `Any::type_url` by `to_any` / `from_any`.
// TODO: Should be able to remove this after tokio-rs/prost#299
1066
trait TypeUrl: Message {
1067
    /// Full type URL of the message.
    const TYPE_URL: &'static str;
1068
}
1069
1070
// Type URL used when an `ExecuteResponse` is packed into or unpacked from an `Any`.
impl TypeUrl for ExecuteResponse {
1071
    const TYPE_URL: &'static str =
1072
        "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse";
1073
}
1074
1075
// Type URL used when `ExecuteOperationMetadata` is packed into or unpacked from an `Any`.
impl TypeUrl for ExecuteOperationMetadata {
1076
    const TYPE_URL: &'static str =
1077
        "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata";
1078
}
1079
1080
2
fn from_any<T>(message: &Any) -> Result<T, Error>
1081
2
where
1082
2
    T: TypeUrl + Default,
1083
{
1084
0
    error_if!(
1085
2
        message.type_url != T::TYPE_URL,
1086
        "Incorrect type when decoding Any. {} != {}",
1087
        message.type_url,
1088
0
        T::TYPE_URL.to_string()
1089
    );
1090
2
    Ok(T::decode(message.value.as_slice())
?0
)
1091
2
}
1092
1093
7
fn to_any<T>(message: &T) -> Any
1094
7
where
1095
7
    T: TypeUrl,
1096
{
1097
7
    Any {
1098
7
        type_url: T::TYPE_URL.to_string(),
1099
7
        value: message.encode_to_vec(),
1100
7
    }
1101
7
}
1102
1103
/// Current state of the action.
/// This must be 100% compatible with `Operation` in `google/longrunning/operations.proto`.
///
/// Equality compares `stage`, `client_operation_id` and `action_digest` and
/// ignores `last_transition_timestamp`. Ordering uses only
/// `last_transition_timestamp`.
1105
#[derive(Debug, Clone, Serialize, Deserialize, MetricsComponent)]
1106
pub struct ActionState {
1107
    #[metric(help = "The current stage of the action.")]
1108
    pub stage: ActionStage,
1109
    // `try_from_operation` sets this to `SystemTime::now()` at decode time.
    #[metric(help = "Last time this action changed stage")]
1110
    pub last_transition_timestamp: SystemTime,
1111
    #[metric(help = "The unique identifier of the action.")]
1112
    pub client_operation_id: OperationId,
1113
    #[metric(help = "The digest of the action.")]
1114
    pub action_digest: DigestInfo,
1115
}
1116
1117
impl Display for ActionState {
1118
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1119
0
        write!(
1120
0
            f,
1121
            "stage={} last_transition={} client_operation_id={} action_digest={}",
1122
0
            self.stage.name(),
1123
0
            self.last_transition_timestamp.elapsed().map_or_else(
1124
0
                |_| "<unknown duration>".to_string(),
1125
0
                |d| { format_duration(d).to_string() }
1126
            ),
1127
            self.client_operation_id,
1128
            self.action_digest
1129
        )
1130
0
    }
1131
}
1132
1133
impl PartialOrd for ActionState {
1134
0
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1135
0
        Some(self.cmp(other))
1136
0
    }
1137
}
1138
1139
impl Ord for ActionState {
1140
0
    fn cmp(&self, other: &Self) -> Ordering {
1141
0
        self.last_transition_timestamp
1142
0
            .cmp(&other.last_transition_timestamp)
1143
0
    }
1144
}
1145
1146
impl PartialEq for ActionState {
1147
24
    fn eq(&self, other: &Self) -> bool {
1148
        // Ignore last_transition_timestamp as the actions can still be the same even if they happened at different times
1149
24
        self.stage == other.stage
1150
24
            && self.client_operation_id == other.client_operation_id
1151
24
            && self.action_digest == other.action_digest
1152
24
    }
1153
}
1154
1155
// Marker impl; equality itself is defined by the manual `PartialEq` impl.
impl Eq for ActionState {}
1156
1157
impl ActionState {
1158
1
    pub fn try_from_operation(
1159
1
        operation: Operation,
1160
1
        client_operation_id: OperationId,
1161
1
    ) -> Result<Self, Error> {
1162
1
        let metadata = from_any::<ExecuteOperationMetadata>(
1163
1
            &operation
1164
1
                .metadata
1165
1
                .err_tip(|| "No metadata in upstream operation")
?0
,
1166
        )
1167
1
        .err_tip(|| "Could not decode metadata in upstream operation")
?0
;
1168
1169
1
        let stage = match execution_stage::Value::try_from(metadata.stage).err_tip(|| 
{0
1170
0
            format!(
1171
                "Could not convert {} to execution_stage::Value",
1172
                metadata.stage
1173
            )
1174
0
        })? {
1175
0
            execution_stage::Value::Unknown => ActionStage::Unknown,
1176
0
            execution_stage::Value::CacheCheck => ActionStage::CacheCheck,
1177
0
            execution_stage::Value::Queued => ActionStage::Queued,
1178
0
            execution_stage::Value::Executing => ActionStage::Executing,
1179
            execution_stage::Value::Completed => {
1180
1
                let execute_response = operation
1181
1
                    .result
1182
1
                    .err_tip(|| "No result data for completed upstream action")
?0
;
1183
1
                match execute_response {
1184
0
                    LongRunningResult::Error(error) => ActionStage::Completed(ActionResult {
1185
0
                        error: Some(error.into()),
1186
0
                        ..ActionResult::default()
1187
0
                    }),
1188
1
                    LongRunningResult::Response(response) => {
1189
                        // Could be Completed, CompletedFromCache or Error.
1190
1
                        from_any::<ExecuteResponse>(&response)
1191
1
                            .err_tip(|| 
{0
1192
0
                                "Could not decode result structure for completed upstream action"
1193
0
                            })?
1194
1
                            .try_into()
?0
1195
                    }
1196
                }
1197
            }
1198
        };
1199
1200
1
        let action_digest = metadata
1201
1
            .action_digest
1202
1
            .err_tip(|| "No action_digest in upstream operation")
?0
1203
1
            .try_into()
1204
1
            .err_tip(|| "Could not convert action_digest into DigestInfo")
?0
;
1205
1206
1
        Ok(Self {
1207
1
            stage,
1208
1
            client_operation_id,
1209
1
            action_digest,
1210
1
            last_transition_timestamp: SystemTime::now(),
1211
1
        })
1212
1
    }
1213
1214
5
    pub fn as_operation(&self, client_operation_id: OperationId) -> Operation {
1215
5
        let stage = Into::<execution_stage::Value>::into(&self.stage) as i32;
1216
5
        let name = client_operation_id.into_string();
1217
1218
5
        let result = if self.stage.has_action_result() {
1219
2
            let execute_response: ExecuteResponse = self.stage.clone().into();
1220
2
            Some(LongRunningResult::Response(to_any(&execute_response)))
1221
        } else {
1222
3
            None
1223
        };
1224
5
        let digest = Some(self.action_digest.into());
1225
1226
5
        let metadata = ExecuteOperationMetadata {
1227
5
            stage,
1228
5
            action_digest: digest,
1229
5
            // TODO(palfrey) We should support stderr/stdout streaming.
1230
5
            stdout_stream_name: String::default(),
1231
5
            stderr_stream_name: String::default(),
1232
5
            partial_execution_metadata: None,
1233
5
        };
1234
1235
5
        Operation {
1236
5
            name,
1237
5
            metadata: Some(to_any(&metadata)),
1238
5
            done: result.is_some(),
1239
5
            result,
1240
5
        }
1241
5
    }
1242
}