Coverage Report

Created: 2025-12-04 21:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/action_messages.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp::Ordering;
16
use core::convert::Into;
17
use core::fmt::Display;
18
use core::hash::Hash;
19
use core::time::Duration;
20
use std::collections::HashMap;
21
use std::time::SystemTime;
22
23
use humantime::format_duration;
24
use nativelink_error::{Error, ResultExt, error_if, make_input_err};
25
use nativelink_metric::{
26
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, publish,
27
};
28
use nativelink_proto::build::bazel::remote::execution::v2::{
29
    Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest,
30
    ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile,
31
    OutputSymlink, SymlinkNode, execution_stage,
32
};
33
use nativelink_proto::google::longrunning::Operation;
34
use nativelink_proto::google::longrunning::operation::Result as LongRunningResult;
35
use nativelink_proto::google::rpc::Status;
36
use prost::Message;
37
use prost::bytes::Bytes;
38
use prost_types::Any;
39
use serde::ser::Error as SerdeError;
40
use serde::{Deserialize, Serialize};
41
use uuid::Uuid;
42
43
use crate::common::{DigestInfo, HashMapExt, VecExt};
44
use crate::digest_hasher::DigestHasherFunc;
45
46
/// Default priority remote execution jobs will get when not provided.
47
pub const DEFAULT_EXECUTION_PRIORITY: i32 = 0;
48
49
/// Exit code sent if there is an internal error.
50
pub const INTERNAL_ERROR_EXIT_CODE: i32 = -178;
51
52
/// Holds an id that is unique to the client for a requested operation.
53
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
54
pub enum OperationId {
55
    Uuid(Uuid),
56
    String(String),
57
}
58
59
impl OperationId {
60
1
    pub fn into_string(self) -> String {
61
1
        match self {
62
1
            Self::Uuid(uuid) => uuid.to_string(),
63
0
            Self::String(name) => name,
64
        }
65
1
    }
66
}
67
68
impl Default for OperationId {
69
103
    fn default() -> Self {
70
103
        Self::Uuid(Uuid::new_v4())
71
103
    }
72
}
73
74
impl Display for OperationId {
75
345
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
76
345
        match self {
77
224
            Self::Uuid(uuid) => uuid.fmt(f),
78
121
            Self::String(name) => f.write_str(name),
79
        }
80
345
    }
81
}
82
83
impl MetricsComponent for OperationId {
84
0
    fn publish(
85
0
        &self,
86
0
        _kind: MetricKind,
87
0
        _field_metadata: MetricFieldData,
88
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
89
0
        Ok(MetricPublishKnownKindData::String(self.to_string()))
90
0
    }
91
}
92
93
impl From<&str> for OperationId {
94
42
    fn from(value: &str) -> Self {
95
42
        Uuid::parse_str(value).map_or_else(|_| Self::String(
value21
.
to_string21
()), Self::Uuid)
96
42
    }
97
}
98
99
impl From<String> for OperationId {
100
8
    fn from(value: String) -> Self {
101
8
        Uuid::parse_str(&value).map_or(Self::String(value), Self::Uuid)
102
8
    }
103
}
104
105
impl TryFrom<Bytes> for OperationId {
106
    type Error = Error;
107
108
0
    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
109
        // This is an optimized path to attempt to do the conversion in-place
110
        // to avoid an extra allocation/copy.
111
0
        match value.try_into_mut() {
112
            // We are the only reference to the Bytes, so we can convert it into a Vec<u8>
113
            // for free then convert the Vec<u8> to a String for free too.
114
0
            Ok(value) => {
115
0
                let value = String::from_utf8(value.into()).map_err(|e| {
116
0
                    make_input_err!(
117
                        "Failed to convert bytes to string in try_from<Bytes> for OperationId : {e:?}"
118
                    )
119
0
                })?;
120
0
                Ok(Self::from(value))
121
            }
122
            // We could not take ownership of the Bytes, so we may need to copy our data.
123
0
            Err(value) => {
124
0
                let value = core::str::from_utf8(&value).map_err(|e| {
125
0
                    make_input_err!(
126
                        "Failed to convert bytes to string in try_from<Bytes> for OperationId : {e:?}"
127
                    )
128
0
                })?;
129
0
                Ok(Self::from(value))
130
            }
131
        }
132
0
    }
133
}
134
135
/// Unique id of worker.
136
#[derive(Default, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)]
137
pub struct WorkerId(pub String);
138
139
impl MetricsComponent for WorkerId {
140
0
    fn publish(
141
0
        &self,
142
0
        _kind: MetricKind,
143
0
        _field_metadata: MetricFieldData,
144
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
145
0
        Ok(MetricPublishKnownKindData::String(self.0.clone()))
146
0
    }
147
}
148
149
impl Display for WorkerId {
150
31
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
151
31
        f.write_fmt(format_args!("{}", self.0))
152
31
    }
153
}
154
155
impl core::fmt::Debug for WorkerId {
156
6
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
157
6
        Display::fmt(&self, f)
158
6
    }
159
}
160
161
impl From<WorkerId> for String {
162
101
    fn from(val: WorkerId) -> Self {
163
101
        val.0
164
101
    }
165
}
166
167
impl From<String> for WorkerId {
168
0
    fn from(s: String) -> Self {
169
0
        Self(s)
170
0
    }
171
}
172
173
/// Holds the information needed to uniquely identify an action
174
/// and if it is cacheable or not.
175
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
176
pub enum ActionUniqueQualifier {
177
    /// The action is cacheable.
178
    #[serde(alias = "Cachable")] // Pre 0.7.0 spelling
179
    Cacheable(ActionUniqueKey),
180
    /// The action is uncacheable.
181
    #[serde(alias = "Uncachable")] // Pre 0.7.0 spelling
182
    Uncacheable(ActionUniqueKey),
183
}
184
185
impl MetricsComponent for ActionUniqueQualifier {
186
0
    fn publish(
187
0
        &self,
188
0
        _kind: MetricKind,
189
0
        field_metadata: MetricFieldData,
190
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
191
0
        let (cacheable, action) = match self {
192
0
            Self::Cacheable(action) => (true, action),
193
0
            Self::Uncacheable(action) => (false, action),
194
        };
195
0
        publish!(
196
0
            cacheable,
197
0
            &cacheable,
198
0
            MetricKind::Default,
199
0
            "If the action is cacheable.",
200
0
            ""
201
        );
202
0
        action.publish(MetricKind::Component, field_metadata)?;
203
0
        Ok(MetricPublishKnownKindData::Component)
204
0
    }
205
}
206
207
impl ActionUniqueQualifier {
208
    /// Get the `instance_name` of the action.
209
0
    pub const fn instance_name(&self) -> &String {
210
0
        match self {
211
0
            Self::Cacheable(action) | Self::Uncacheable(action) => &action.instance_name,
212
        }
213
0
    }
214
215
    /// Get the digest function of the action.
216
11
    pub const fn digest_function(&self) -> DigestHasherFunc {
217
11
        match self {
218
11
            Self::Cacheable(action) | Self::Uncacheable(
action0
) => action.digest_function,
219
        }
220
11
    }
221
222
    /// Get the digest of the action.
223
137
    pub const fn digest(&self) -> DigestInfo {
224
137
        match self {
225
137
            Self::Cacheable(
action132
) | Self::Uncacheable(
action5
) => action.digest,
226
        }
227
137
    }
228
}
229
230
impl Display for ActionUniqueQualifier {
231
19
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
232
19
        let (cacheable, unique_key) = match self {
233
19
            Self::Cacheable(action) => (true, action),
234
0
            Self::Uncacheable(action) => (false, action),
235
        };
236
19
        f.write_fmt(format_args!(
237
            // Note: We use underscores because it makes escaping easier
238
            // for redis.
239
19
            "{}_{}_{}_{}_{}",
240
            unique_key.instance_name,
241
            unique_key.digest_function,
242
19
            unique_key.digest.packed_hash(),
243
19
            unique_key.digest.size_bytes(),
244
19
            if cacheable { 'c' } else { 
'u'0
},
  Branch (244:16): [True: 19, False: 0]
  Branch (244:16): [Folded - Ignored]
245
        ))
246
19
    }
247
}
248
249
/// This is a utility struct used to make it easier to match `ActionInfos` in a
250
/// `HashMap` without needing to construct an entire `ActionInfo`.
251
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, MetricsComponent)]
252
pub struct ActionUniqueKey {
253
    /// Name of instance group this action belongs to.
254
    #[metric(help = "Name of instance group this action belongs to.")]
255
    pub instance_name: String,
256
    /// The digest function this action expects.
257
    #[metric(help = "The digest function this action expects.")]
258
    pub digest_function: DigestHasherFunc,
259
    /// Digest of the underlying `Action`.
260
    #[metric(help = "Digest of the underlying Action.")]
261
    pub digest: DigestInfo,
262
}
263
264
impl Display for ActionUniqueKey {
265
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
266
0
        f.write_fmt(format_args!(
267
0
            "{}/{}/{}",
268
            self.instance_name, self.digest_function, self.digest,
269
        ))
270
0
    }
271
}
272
273
/// Information needed to execute an action. This struct is used over bazel's proto `Action`
274
/// for simplicity and offers a `salt`, which is useful to ensure during hashing (for dicts)
275
/// to ensure we never match against another `ActionInfo` (when a task should never be cached).
276
/// This struct must be 100% compatible with `ExecuteRequest` struct in `remote_execution.proto`
277
/// except for the salt field.
278
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, MetricsComponent)]
279
pub struct ActionInfo {
280
    /// Digest of the underlying `Command`.
281
    #[metric(help = "Digest of the underlying Command.")]
282
    pub command_digest: DigestInfo,
283
    /// Digest of the underlying `Directory`.
284
    #[metric(help = "Digest of the underlying Directory.")]
285
    pub input_root_digest: DigestInfo,
286
    /// Timeout of the action.
287
    #[metric(help = "Timeout of the action.")]
288
    pub timeout: Duration,
289
    /// The properties rules that must be applied when finding a worker that can run this action.
290
    #[metric(group = "platform_properties")]
291
    pub platform_properties: HashMap<String, String>,
292
    /// The priority of the action. Higher value means it should execute faster.
293
    #[metric(help = "The priority of the action. Higher value means it should execute faster.")]
294
    pub priority: i32,
295
    /// When this action started to be loaded from the CAS.
296
    #[metric(help = "When this action started to be loaded from the CAS.")]
297
    pub load_timestamp: SystemTime,
298
    /// When this action was created.
299
    #[metric(help = "When this action was created.")]
300
    pub insert_timestamp: SystemTime,
301
    /// Info used to uniquely identify this `ActionInfo` and if it is cacheable.
302
    /// This is primarily used to join actions/operations together using this key.
303
    #[metric(help = "Info used to uniquely identify this ActionInfo and if it is cacheable.")]
304
    pub unique_qualifier: ActionUniqueQualifier,
305
}
306
307
impl ActionInfo {
308
    #[inline]
309
0
    pub const fn instance_name(&self) -> &String {
310
0
        self.unique_qualifier.instance_name()
311
0
    }
312
313
    /// Returns the underlying digest of the `Action`.
314
    #[inline]
315
90
    pub const fn digest(&self) -> DigestInfo {
316
90
        self.unique_qualifier.digest()
317
90
    }
318
319
19
    pub fn try_from_action_and_execute_request(
320
19
        execute_request: ExecuteRequest,
321
19
        action: Action,
322
19
        load_timestamp: SystemTime,
323
19
        queued_timestamp: SystemTime,
324
19
    ) -> Result<Self, Error> {
325
19
        let unique_key = ActionUniqueKey {
326
19
            instance_name: execute_request.instance_name,
327
19
            digest_function: DigestHasherFunc::try_from(execute_request.digest_function)
328
19
                .err_tip(|| format!(
"Could not find digest_function in try_from_action_and_execute_request {:?}"0
, execute_request.digest_function))
?0
,
329
19
            digest: execute_request
330
19
                .action_digest
331
19
                .err_tip(|| "Expected action_digest to exist on ExecuteRequest")
?0
332
19
                .try_into()
?0
,
333
        };
334
19
        let unique_qualifier = if execute_request.skip_cache_lookup {
  Branch (334:35): [True: 0, False: 19]
  Branch (334:35): [Folded - Ignored]
335
0
            ActionUniqueQualifier::Uncacheable(unique_key)
336
        } else {
337
19
            ActionUniqueQualifier::Cacheable(unique_key)
338
        };
339
340
19
        let proto_properties = action.platform.unwrap_or_default();
341
19
        let mut platform_properties = HashMap::with_capacity(proto_properties.properties.len());
342
20
        for 
property1
in proto_properties.properties {
343
1
            platform_properties.insert(property.name, property.value);
344
1
        }
345
346
        Ok(Self {
347
19
            command_digest: action
348
19
                .command_digest
349
19
                .err_tip(|| "Expected command_digest to exist on Action")
?0
350
19
                .try_into()
?0
,
351
19
            input_root_digest: action
352
19
                .input_root_digest
353
19
                .err_tip(|| "Expected input_root_digest to exist on Action")
?0
354
19
                .try_into()
?0
,
355
19
            timeout: action
356
19
                .timeout
357
19
                .unwrap_or_default()
358
19
                .try_into()
359
19
                .map_err(|_| make_input_err!("Failed convert proto duration to system duration"))
?0
,
360
19
            platform_properties,
361
19
            priority: execute_request
362
19
                .execution_policy
363
19
                .unwrap_or_default()
364
19
                .priority,
365
19
            load_timestamp,
366
19
            insert_timestamp: queued_timestamp,
367
19
            unique_qualifier,
368
        })
369
19
    }
370
}
371
372
impl From<&ActionInfo> for ExecuteRequest {
373
36
    fn from(val: &ActionInfo) -> Self {
374
36
        let digest = val.digest().into();
375
36
        let (skip_cache_lookup, unique_qualifier) = match &val.unique_qualifier {
376
31
            ActionUniqueQualifier::Cacheable(unique_qualifier) => (false, unique_qualifier),
377
5
            ActionUniqueQualifier::Uncacheable(unique_qualifier) => (true, unique_qualifier),
378
        };
379
36
        Self {
380
36
            instance_name: unique_qualifier.instance_name.clone(),
381
36
            action_digest: Some(digest),
382
36
            skip_cache_lookup,
383
36
            execution_policy: None,     // Not used in the worker.
384
36
            results_cache_policy: None, // Not used in the worker.
385
36
            digest_function: unique_qualifier.digest_function.proto_digest_func().into(),
386
36
        }
387
36
    }
388
}
389
390
/// Simple utility struct to determine if a string is representing a full path or
391
/// just the name of the file.
392
/// This is in order to be able to reuse the same struct instead of building different
393
/// structs when converting `FileInfo` -> {`OutputFile`, `FileNode`} and other similar
394
/// structs.
395
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
396
pub enum NameOrPath {
397
    Name(String),
398
    Path(String),
399
}
400
401
impl PartialOrd for NameOrPath {
402
0
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
403
0
        Some(self.cmp(other))
404
0
    }
405
}
406
407
impl Ord for NameOrPath {
408
0
    fn cmp(&self, other: &Self) -> Ordering {
409
0
        let self_lexical_name = match self {
410
0
            Self::Name(name) => name,
411
0
            Self::Path(path) => path,
412
        };
413
0
        let other_lexical_name = match other {
414
0
            Self::Name(name) => name,
415
0
            Self::Path(path) => path,
416
        };
417
0
        self_lexical_name.cmp(other_lexical_name)
418
0
    }
419
}
420
421
/// Represents an individual file and associated metadata.
422
/// This struct must be 100% compatible with `OutputFile` and `FileNode` structs
423
/// in `remote_execution.proto`.
424
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
425
pub struct FileInfo {
426
    pub name_or_path: NameOrPath,
427
    pub digest: DigestInfo,
428
    pub is_executable: bool,
429
}
430
431
impl TryFrom<FileInfo> for FileNode {
432
    type Error = Error;
433
434
3
    fn try_from(val: FileInfo) -> Result<Self, Error> {
435
3
        match val.name_or_path {
436
0
            NameOrPath::Path(_) => Err(make_input_err!(
437
0
                "Cannot return a FileInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()"
438
0
            )),
439
3
            NameOrPath::Name(name) => Ok(Self {
440
3
                name,
441
3
                digest: Some((&val.digest).into()),
442
3
                is_executable: val.is_executable,
443
3
                node_properties: None, // Not supported.
444
3
            }),
445
        }
446
3
    }
447
}
448
449
impl TryFrom<OutputFile> for FileInfo {
450
    type Error = Error;
451
452
2
    fn try_from(output_file: OutputFile) -> Result<Self, Error> {
453
        Ok(Self {
454
2
            name_or_path: NameOrPath::Path(output_file.path),
455
2
            digest: output_file
456
2
                .digest
457
2
                .err_tip(|| "Expected digest to exist on OutputFile")
?0
458
2
                .try_into()
?0
,
459
2
            is_executable: output_file.is_executable,
460
        })
461
2
    }
462
}
463
464
impl TryFrom<FileInfo> for OutputFile {
465
    type Error = Error;
466
467
7
    fn try_from(val: FileInfo) -> Result<Self, Error> {
468
7
        match val.name_or_path {
469
0
            NameOrPath::Name(_) => Err(make_input_err!(
470
0
                "Cannot return a FileInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()"
471
0
            )),
472
7
            NameOrPath::Path(path) => Ok(Self {
473
7
                path,
474
7
                digest: Some((&val.digest).into()),
475
7
                is_executable: val.is_executable,
476
7
                contents: Bytes::default(),
477
7
                node_properties: None, // Not supported.
478
7
            }),
479
        }
480
7
    }
481
}
482
483
/// Represents an individual symlink file and associated metadata.
484
/// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`.
485
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
486
pub struct SymlinkInfo {
487
    pub name_or_path: NameOrPath,
488
    pub target: String,
489
}
490
491
impl TryFrom<SymlinkNode> for SymlinkInfo {
492
    type Error = Error;
493
494
0
    fn try_from(symlink_node: SymlinkNode) -> Result<Self, Error> {
495
0
        Ok(Self {
496
0
            name_or_path: NameOrPath::Name(symlink_node.name),
497
0
            target: symlink_node.target,
498
0
        })
499
0
    }
500
}
501
502
impl TryFrom<SymlinkInfo> for SymlinkNode {
503
    type Error = Error;
504
505
1
    fn try_from(val: SymlinkInfo) -> Result<Self, Error> {
506
1
        match val.name_or_path {
507
0
            NameOrPath::Path(_) => Err(make_input_err!(
508
0
                "Cannot return a SymlinkInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()"
509
0
            )),
510
1
            NameOrPath::Name(name) => Ok(Self {
511
1
                name,
512
1
                target: val.target,
513
1
                node_properties: None, // Not supported.
514
1
            }),
515
        }
516
1
    }
517
}
518
519
impl TryFrom<OutputSymlink> for SymlinkInfo {
520
    type Error = Error;
521
522
2
    fn try_from(output_symlink: OutputSymlink) -> Result<Self, Error> {
523
2
        Ok(Self {
524
2
            name_or_path: NameOrPath::Path(output_symlink.path),
525
2
            target: output_symlink.target,
526
2
        })
527
2
    }
528
}
529
530
impl TryFrom<SymlinkInfo> for OutputSymlink {
531
    type Error = Error;
532
533
2
    fn try_from(val: SymlinkInfo) -> Result<Self, Error> {
534
2
        match val.name_or_path {
535
2
            NameOrPath::Path(path) => {
536
2
                Ok(Self {
537
2
                    path,
538
2
                    target: val.target,
539
2
                    node_properties: None, // Not supported.
540
2
                })
541
            }
542
0
            NameOrPath::Name(_) => Err(make_input_err!(
543
0
                "Cannot return a SymlinkInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()"
544
0
            )),
545
        }
546
2
    }
547
}
548
549
/// Represents an individual directory file and associated metadata.
550
/// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`.
551
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
552
pub struct DirectoryInfo {
553
    pub path: String,
554
    pub tree_digest: DigestInfo,
555
}
556
557
impl TryFrom<OutputDirectory> for DirectoryInfo {
558
    type Error = Error;
559
560
2
    fn try_from(output_directory: OutputDirectory) -> Result<Self, Error> {
561
        Ok(Self {
562
2
            path: output_directory.path,
563
2
            tree_digest: output_directory
564
2
                .tree_digest
565
2
                .err_tip(|| "Expected tree_digest to exist in OutputDirectory")
?0
566
2
                .try_into()
?0
,
567
        })
568
2
    }
569
}
570
571
impl From<DirectoryInfo> for OutputDirectory {
572
1
    fn from(val: DirectoryInfo) -> Self {
573
1
        Self {
574
1
            path: val.path,
575
1
            tree_digest: Some(val.tree_digest.into()),
576
1
            is_topologically_sorted: false,
577
1
        }
578
1
    }
579
}
580
581
/// Represents the metadata associated with the execution result.
582
/// This struct must be 100% compatible with `ExecutedActionMetadata`.
583
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
584
pub struct ExecutionMetadata {
585
    pub worker: String,
586
    pub queued_timestamp: SystemTime,
587
    pub worker_start_timestamp: SystemTime,
588
    pub worker_completed_timestamp: SystemTime,
589
    pub input_fetch_start_timestamp: SystemTime,
590
    pub input_fetch_completed_timestamp: SystemTime,
591
    pub execution_start_timestamp: SystemTime,
592
    pub execution_completed_timestamp: SystemTime,
593
    pub output_upload_start_timestamp: SystemTime,
594
    pub output_upload_completed_timestamp: SystemTime,
595
}
596
597
impl Default for ExecutionMetadata {
598
2
    fn default() -> Self {
599
2
        Self {
600
2
            worker: String::new(),
601
2
            queued_timestamp: SystemTime::UNIX_EPOCH,
602
2
            worker_start_timestamp: SystemTime::UNIX_EPOCH,
603
2
            worker_completed_timestamp: SystemTime::UNIX_EPOCH,
604
2
            input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
605
2
            input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
606
2
            execution_start_timestamp: SystemTime::UNIX_EPOCH,
607
2
            execution_completed_timestamp: SystemTime::UNIX_EPOCH,
608
2
            output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
609
2
            output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
610
2
        }
611
2
    }
612
}
613
614
impl From<ExecutionMetadata> for ExecutedActionMetadata {
615
17
    fn from(val: ExecutionMetadata) -> Self {
616
        Self {
617
17
            worker: val.worker,
618
17
            queued_timestamp: Some(val.queued_timestamp.into()),
619
17
            worker_start_timestamp: Some(val.worker_start_timestamp.into()),
620
17
            worker_completed_timestamp: Some(val.worker_completed_timestamp.into()),
621
17
            input_fetch_start_timestamp: Some(val.input_fetch_start_timestamp.into()),
622
17
            input_fetch_completed_timestamp: Some(val.input_fetch_completed_timestamp.into()),
623
17
            execution_start_timestamp: Some(val.execution_start_timestamp.into()),
624
17
            execution_completed_timestamp: Some(val.execution_completed_timestamp.into()),
625
17
            output_upload_start_timestamp: Some(val.output_upload_start_timestamp.into()),
626
17
            output_upload_completed_timestamp: Some(val.output_upload_completed_timestamp.into()),
627
17
            virtual_execution_duration: val
628
17
                .execution_completed_timestamp
629
17
                .duration_since(val.execution_start_timestamp)
630
17
                .ok()
631
17
                .and_then(|duration| prost_types::Duration::try_from(duration).ok()),
632
17
            auxiliary_metadata: Vec::default(),
633
        }
634
17
    }
635
}
636
637
impl TryFrom<ExecutedActionMetadata> for ExecutionMetadata {
638
    type Error = Error;
639
640
3
    fn try_from(eam: ExecutedActionMetadata) -> Result<Self, Error> {
641
        Ok(Self {
642
3
            worker: eam.worker,
643
3
            queued_timestamp: eam
644
3
                .queued_timestamp
645
3
                .err_tip(|| "Expected queued_timestamp to exist in ExecutedActionMetadata")
?0
646
3
                .try_into()
?0
,
647
3
            worker_start_timestamp: eam
648
3
                .worker_start_timestamp
649
3
                .err_tip(|| "Expected worker_start_timestamp to exist in ExecutedActionMetadata")
?0
650
3
                .try_into()
?0
,
651
3
            worker_completed_timestamp: eam
652
3
                .worker_completed_timestamp
653
3
                .err_tip(|| 
{0
654
0
                    "Expected worker_completed_timestamp to exist in ExecutedActionMetadata"
655
0
                })?
656
3
                .try_into()
?0
,
657
3
            input_fetch_start_timestamp: eam
658
3
                .input_fetch_start_timestamp
659
3
                .err_tip(|| 
{0
660
0
                    "Expected input_fetch_start_timestamp to exist in ExecutedActionMetadata"
661
0
                })?
662
3
                .try_into()
?0
,
663
3
            input_fetch_completed_timestamp: eam
664
3
                .input_fetch_completed_timestamp
665
3
                .err_tip(|| 
{0
666
0
                    "Expected input_fetch_completed_timestamp to exist in ExecutedActionMetadata"
667
0
                })?
668
3
                .try_into()
?0
,
669
3
            execution_start_timestamp: eam
670
3
                .execution_start_timestamp
671
3
                .err_tip(|| 
{0
672
0
                    "Expected execution_start_timestamp to exist in ExecutedActionMetadata"
673
0
                })?
674
3
                .try_into()
?0
,
675
3
            execution_completed_timestamp: eam
676
3
                .execution_completed_timestamp
677
3
                .err_tip(|| 
{0
678
0
                    "Expected execution_completed_timestamp to exist in ExecutedActionMetadata"
679
0
                })?
680
3
                .try_into()
?0
,
681
3
            output_upload_start_timestamp: eam
682
3
                .output_upload_start_timestamp
683
3
                .err_tip(|| 
{0
684
0
                    "Expected output_upload_start_timestamp to exist in ExecutedActionMetadata"
685
0
                })?
686
3
                .try_into()
?0
,
687
3
            output_upload_completed_timestamp: eam
688
3
                .output_upload_completed_timestamp
689
3
                .err_tip(|| 
{0
690
0
                    "Expected output_upload_completed_timestamp to exist in ExecutedActionMetadata"
691
0
                })?
692
3
                .try_into()
?0
,
693
        })
694
3
    }
695
}
696
697
/// Represents the results of an execution.
698
/// This struct must be 100% compatible with `ActionResult` in `remote_execution.proto`.
699
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
700
pub struct ActionResult {
701
    pub output_files: Vec<FileInfo>,
702
    pub output_folders: Vec<DirectoryInfo>,
703
    pub output_directory_symlinks: Vec<SymlinkInfo>,
704
    pub output_file_symlinks: Vec<SymlinkInfo>,
705
    pub exit_code: i32,
706
    pub stdout_digest: DigestInfo,
707
    pub stderr_digest: DigestInfo,
708
    pub execution_metadata: ExecutionMetadata,
709
    pub server_logs: HashMap<String, DigestInfo>,
710
    pub error: Option<Error>,
711
    pub message: String,
712
}
713
714
impl Default for ActionResult {
715
8
    fn default() -> Self {
716
8
        Self {
717
8
            output_files: Vec::default(),
718
8
            output_folders: Vec::default(),
719
8
            output_directory_symlinks: Vec::default(),
720
8
            output_file_symlinks: Vec::default(),
721
8
            exit_code: INTERNAL_ERROR_EXIT_CODE,
722
8
            stdout_digest: DigestInfo::new([0u8; 32], 0),
723
8
            stderr_digest: DigestInfo::new([0u8; 32], 0),
724
8
            execution_metadata: ExecutionMetadata {
725
8
                worker: String::new(),
726
8
                queued_timestamp: SystemTime::UNIX_EPOCH,
727
8
                worker_start_timestamp: SystemTime::UNIX_EPOCH,
728
8
                worker_completed_timestamp: SystemTime::UNIX_EPOCH,
729
8
                input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
730
8
                input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
731
8
                execution_start_timestamp: SystemTime::UNIX_EPOCH,
732
8
                execution_completed_timestamp: SystemTime::UNIX_EPOCH,
733
8
                output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
734
8
                output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
735
8
            },
736
8
            server_logs: HashMap::default(),
737
8
            error: None,
738
8
            message: String::new(),
739
8
        }
740
8
    }
741
}
742
743
/// The execution status/stage. This should match `ExecutionStage::Value` in `remote_execution.proto`.
744
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
745
#[allow(
746
    clippy::large_enum_variant,
747
    reason = "TODO box the two relevant variants in a breaking release. Unfulfilled on nightly"
748
)]
749
pub enum ActionStage {
750
    /// Stage is unknown.
751
    Unknown,
752
    /// Checking the cache to see if action exists.
753
    CacheCheck,
754
    /// Action has been accepted and waiting for worker to take it.
755
    Queued,
756
    // TODO(palfrey) We need a way to know if the job was sent to a worker, but hasn't begun
757
    // execution yet.
758
    /// Worker is executing the action.
759
    Executing,
760
    /// Worker completed the work with result.
761
    Completed(ActionResult),
762
    /// Result was found from cache, don't decode the proto just to re-encode it.
763
    #[serde(serialize_with = "serialize_proto_result", skip_deserializing)]
764
    // The serialization step decodes this to an ActionResult which is serializable.
765
    // Since it will always be serialized as an ActionResult, we do not need to support
766
    // deserialization on this type at all.
767
    // In theory, serializing this should never happen so performance shouldn't be affected.
768
    CompletedFromCache(ProtoActionResult),
769
}
770
771
0
fn serialize_proto_result<S>(v: &ProtoActionResult, serializer: S) -> Result<S::Ok, S::Error>
772
0
where
773
0
    S: serde::Serializer,
774
{
775
0
    let s = ActionResult::try_from(v.clone()).map_err(S::Error::custom)?;
776
0
    s.serialize(serializer)
777
0
}
778
779
impl ActionStage {
780
100
    pub const fn has_action_result(&self) -> bool {
781
100
        match self {
782
86
            Self::Unknown | Self::CacheCheck | Self::Queued | Self::Executing => false,
783
14
            Self::Completed(_) | Self::CompletedFromCache(_) => true,
784
        }
785
100
    }
786
787
    /// Returns true if the worker considers the action done and no longer needs to be tracked.
788
    // Note: This function is separate from `has_action_result()` to not mix the concept of
789
    //       "finished" with "has a result".
790
99
    pub const fn is_finished(&self) -> bool {
791
99
        self.has_action_result()
792
99
    }
793
794
    /// Returns if the stage enum is the same as the other stage enum, but
795
    /// does not compare the values of the enum.
796
39
    pub const fn is_same_stage(&self, other: &Self) -> bool {
797
39
        matches!(
798
39
            (self, other),
799
            (Self::Unknown, Self::Unknown)
800
                | (Self::CacheCheck, Self::CacheCheck)
801
                | (Self::Queued, Self::Queued)
802
                | (Self::Executing, Self::Executing)
803
                | (Self::Completed(_), Self::Completed(_))
804
                | (Self::CompletedFromCache(_), Self::CompletedFromCache(_))
805
        )
806
39
    }
807
808
201
    pub fn name(&self) -> String {
809
201
        match self {
810
0
            Self::Unknown => "Unknown".to_string(),
811
0
            Self::CacheCheck => "CacheCheck".to_string(),
812
35
            Self::Queued => "Queued".to_string(),
813
152
            Self::Executing => "Executing".to_string(),
814
14
            Self::Completed(_) => "Completed".to_string(),
815
0
            Self::CompletedFromCache(_) => "CompletedFromCache".to_string(),
816
        }
817
201
    }
818
}
819
820
impl MetricsComponent for ActionStage {
821
0
    fn publish(
822
0
        &self,
823
0
        _kind: MetricKind,
824
0
        _field_metadata: MetricFieldData,
825
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
826
0
        Ok(MetricPublishKnownKindData::String(self.name()))
827
0
    }
828
}
829
830
impl From<&ActionStage> for execution_stage::Value {
831
1
    fn from(val: &ActionStage) -> Self {
832
1
        match val {
833
0
            ActionStage::Unknown => Self::Unknown,
834
0
            ActionStage::CacheCheck => Self::CacheCheck,
835
0
            ActionStage::Queued => Self::Queued,
836
0
            ActionStage::Executing => Self::Executing,
837
1
            ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => Self::Completed,
838
        }
839
1
    }
840
}
841
842
11
pub fn to_execute_response(action_result: ActionResult) -> ExecuteResponse {
843
11
    fn logs_from(server_logs: HashMap<String, DigestInfo>) -> HashMap<String, LogFile> {
844
11
        let mut logs = HashMap::with_capacity(server_logs.len());
845
12
        for (
k1
,
v1
) in server_logs {
846
1
            logs.insert(
847
1
                k.clone(),
848
1
                LogFile {
849
1
                    digest: Some(v.into()),
850
1
                    human_readable: false,
851
1
                },
852
1
            );
853
1
        }
854
11
        logs
855
11
    }
856
857
11
    let status = Some(
858
11
        action_result
859
11
            .error
860
11
            .clone()
861
11
            .map_or_else(Status::default, Into::into),
862
11
    );
863
11
    let message = action_result.message.clone();
864
11
    ExecuteResponse {
865
11
        server_logs: logs_from(action_result.server_logs.clone()),
866
11
        result: action_result.try_into().ok(),
867
11
        cached_result: false,
868
11
        status,
869
11
        message,
870
11
    }
871
11
}
872
873
impl From<ActionStage> for ExecuteResponse {
874
6
    fn from(val: ActionStage) -> Self {
875
6
        match val {
876
            // We don't have an execute response if we don't have the results. It is defined
877
            // behavior to return an empty proto struct.
878
            ActionStage::Unknown
879
            | ActionStage::CacheCheck
880
            | ActionStage::Queued
881
0
            | ActionStage::Executing => Self::default(),
882
6
            ActionStage::Completed(action_result) => to_execute_response(action_result),
883
            // Handled separately as there are no server logs and the action
884
            // result is already in Proto format.
885
0
            ActionStage::CompletedFromCache(proto_action_result) => Self {
886
0
                server_logs: HashMap::new(),
887
0
                result: Some(proto_action_result),
888
0
                cached_result: true,
889
0
                status: Some(Status::default()),
890
0
                message: String::new(), // Will be populated later if applicable.
891
0
            },
892
        }
893
6
    }
894
}
895
896
impl TryFrom<ActionResult> for ProtoActionResult {
897
    type Error = Error;
898
899
17
    fn try_from(val: ActionResult) -> Result<Self, Error> {
900
17
        let mut output_symlinks = Vec::with_capacity(
901
17
            val.output_file_symlinks.len() + val.output_directory_symlinks.len(),
902
        );
903
17
        output_symlinks.extend_from_slice(val.output_file_symlinks.as_slice());
904
17
        output_symlinks.extend_from_slice(val.output_directory_symlinks.as_slice());
905
906
        Ok(Self {
907
17
            output_files: val
908
17
                .output_files
909
17
                .into_iter()
910
17
                .map(TryInto::try_into)
911
17
                .collect::<Result<_, _>>()
?0
,
912
17
            output_file_symlinks: val
913
17
                .output_file_symlinks
914
17
                .into_iter()
915
17
                .map(TryInto::try_into)
916
17
                .collect::<Result<_, _>>()
?0
,
917
17
            output_symlinks: output_symlinks
918
17
                .into_iter()
919
17
                .map(TryInto::try_into)
920
17
                .collect::<Result<_, _>>()
?0
,
921
17
            output_directories: val
922
17
                .output_folders
923
17
                .into_iter()
924
17
                .map(TryInto::try_into)
925
17
                .collect::<Result<_, _>>()
?0
,
926
17
            output_directory_symlinks: val
927
17
                .output_directory_symlinks
928
17
                .into_iter()
929
17
                .map(TryInto::try_into)
930
17
                .collect::<Result<_, _>>()
?0
,
931
17
            exit_code: val.exit_code,
932
17
            stdout_raw: Bytes::default(),
933
17
            stdout_digest: Some(val.stdout_digest.into()),
934
17
            stderr_raw: Bytes::default(),
935
17
            stderr_digest: Some(val.stderr_digest.into()),
936
17
            execution_metadata: Some(val.execution_metadata.into()),
937
        })
938
17
    }
939
}
940
941
impl TryFrom<ProtoActionResult> for ActionResult {
942
    type Error = Error;
943
944
0
    fn try_from(val: ProtoActionResult) -> Result<Self, Error> {
945
0
        let output_file_symlinks = val
946
0
            .output_file_symlinks
947
0
            .into_iter()
948
0
            .map(|output_symlink| {
949
0
                SymlinkInfo::try_from(output_symlink)
950
0
                    .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo")
951
0
            })
952
0
            .collect::<Result<Vec<_>, _>>()?;
953
954
0
        let output_directory_symlinks = val
955
0
            .output_directory_symlinks
956
0
            .into_iter()
957
0
            .map(|output_symlink| {
958
0
                SymlinkInfo::try_from(output_symlink)
959
0
                    .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo")
960
0
            })
961
0
            .collect::<Result<Vec<_>, _>>()?;
962
963
0
        let output_files = val
964
0
            .output_files
965
0
            .into_iter()
966
0
            .map(|output_file| {
967
0
                output_file
968
0
                    .try_into()
969
0
                    .err_tip(|| "Output File could not be converted")
970
0
            })
971
0
            .collect::<Result<Vec<_>, _>>()?;
972
973
0
        let output_folders = val
974
0
            .output_directories
975
0
            .into_iter()
976
0
            .map(|output_directory| {
977
0
                output_directory
978
0
                    .try_into()
979
0
                    .err_tip(|| "Output File could not be converted")
980
0
            })
981
0
            .collect::<Result<Vec<_>, _>>()?;
982
983
        Ok(Self {
984
0
            output_files,
985
0
            output_folders,
986
0
            output_file_symlinks,
987
0
            output_directory_symlinks,
988
0
            exit_code: val.exit_code,
989
0
            stdout_digest: val
990
0
                .stdout_digest
991
0
                .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")?
992
0
                .try_into()?,
993
0
            stderr_digest: val
994
0
                .stderr_digest
995
0
                .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")?
996
0
                .try_into()?,
997
0
            execution_metadata: val
998
0
                .execution_metadata
999
0
                .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")?
1000
0
                .try_into()?,
1001
0
            server_logs: HashMap::default(),
1002
0
            error: None,
1003
0
            message: String::new(),
1004
        })
1005
0
    }
1006
}
1007
1008
impl TryFrom<ExecuteResponse> for ActionStage {
1009
    type Error = Error;
1010
1011
3
    fn try_from(execute_response: ExecuteResponse) -> Result<Self, Error> {
1012
3
        let proto_action_result = execute_response
1013
3
            .result
1014
3
            .err_tip(|| "Expected result to be set on ExecuteResponse msg")
?0
;
1015
3
        let action_result = ActionResult {
1016
3
            output_files: proto_action_result
1017
3
                .output_files
1018
3
                .try_map(TryInto::try_into)
?0
,
1019
3
            output_directory_symlinks: proto_action_result
1020
3
                .output_directory_symlinks
1021
3
                .try_map(TryInto::try_into)
?0
,
1022
3
            output_file_symlinks: proto_action_result
1023
3
                .output_file_symlinks
1024
3
                .try_map(TryInto::try_into)
?0
,
1025
3
            output_folders: proto_action_result
1026
3
                .output_directories
1027
3
                .try_map(TryInto::try_into)
?0
,
1028
3
            exit_code: proto_action_result.exit_code,
1029
1030
3
            stdout_digest: proto_action_result
1031
3
                .stdout_digest
1032
3
                .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")
?0
1033
3
                .try_into()
?0
,
1034
3
            stderr_digest: proto_action_result
1035
3
                .stderr_digest
1036
3
                .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")
?0
1037
3
                .try_into()
?0
,
1038
3
            execution_metadata: proto_action_result
1039
3
                .execution_metadata
1040
3
                .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")
?0
1041
3
                .try_into()
?0
,
1042
3
            server_logs: execute_response.server_logs.try_map(|v| 
{2
1043
2
                v.digest
1044
2
                    .err_tip(|| "Expected digest to be set on LogFile msg")
?0
1045
2
                    .try_into()
1046
2
            })
?0
,
1047
3
            error: execute_response
1048
3
                .status
1049
3
                .clone()
1050
3
                .and_then(|v| if v.code == 0 { 
None1
} else {
Some(v.into())2
}),
  Branch (1050:34): [True: 1, False: 2]
  Branch (1050:34): [Folded - Ignored]
1051
3
            message: execute_response.message,
1052
        };
1053
1054
3
        if execute_response.cached_result {
  Branch (1054:12): [True: 0, False: 3]
  Branch (1054:12): [Folded - Ignored]
1055
0
            return Ok(Self::CompletedFromCache(action_result.try_into()?));
1056
3
        }
1057
3
        Ok(Self::Completed(action_result))
1058
3
    }
1059
}
1060
1061
// TODO: Should be able to remove this after tokio-rs/prost#299
1062
trait TypeUrl: Message {
1063
    const TYPE_URL: &'static str;
1064
}
1065
1066
impl TypeUrl for ExecuteResponse {
1067
    const TYPE_URL: &'static str =
1068
        "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse";
1069
}
1070
1071
impl TypeUrl for ExecuteOperationMetadata {
1072
    const TYPE_URL: &'static str =
1073
        "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata";
1074
}
1075
1076
2
fn from_any<T>(message: &Any) -> Result<T, Error>
1077
2
where
1078
2
    T: TypeUrl + Default,
1079
{
1080
0
    error_if!(
1081
2
        message.type_url != T::TYPE_URL,
  Branch (1081:9): [True: 0, False: 1]
  Branch (1081:9): [True: 0, False: 1]
  Branch (1081:9): [Folded - Ignored]
1082
        "Incorrect type when decoding Any. {} != {}",
1083
        message.type_url,
1084
0
        T::TYPE_URL.to_string()
1085
    );
1086
2
    Ok(T::decode(message.value.as_slice())
?0
)
1087
2
}
1088
1089
2
fn to_any<T>(message: &T) -> Any
1090
2
where
1091
2
    T: TypeUrl,
1092
{
1093
2
    Any {
1094
2
        type_url: T::TYPE_URL.to_string(),
1095
2
        value: message.encode_to_vec(),
1096
2
    }
1097
2
}
1098
1099
/// Current state of the action.
1100
/// This must be 100% compatible with `Operation` in `google/longrunning/operations.proto`.
1101
#[derive(Debug, Clone, Serialize, Deserialize, MetricsComponent)]
1102
pub struct ActionState {
1103
    #[metric(help = "The current stage of the action.")]
1104
    pub stage: ActionStage,
1105
    #[metric(help = "Last time this action changed stage")]
1106
    pub last_transition_timestamp: SystemTime,
1107
    #[metric(help = "The unique identifier of the action.")]
1108
    pub client_operation_id: OperationId,
1109
    #[metric(help = "The digest of the action.")]
1110
    pub action_digest: DigestInfo,
1111
}
1112
1113
impl Display for ActionState {
1114
96
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1115
96
        write!(
1116
96
            f,
1117
96
            "stage={} last_transition={} client_operation_id={} action_digest={}",
1118
96
            self.stage.name(),
1119
96
            self.last_transition_timestamp.elapsed().map_or_else(
1120
0
                |_| "<unknown duration>".to_string(),
1121
96
                |d| { format_duration(d).to_string() }
1122
            ),
1123
            self.client_operation_id,
1124
            self.action_digest
1125
        )
1126
96
    }
1127
}
1128
1129
impl PartialOrd for ActionState {
1130
0
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1131
0
        Some(self.cmp(other))
1132
0
    }
1133
}
1134
1135
impl Ord for ActionState {
1136
9
    fn cmp(&self, other: &Self) -> Ordering {
1137
9
        self.last_transition_timestamp
1138
9
            .cmp(&other.last_transition_timestamp)
1139
9
    }
1140
}
1141
1142
impl PartialEq for ActionState {
1143
24
    fn eq(&self, other: &Self) -> bool {
1144
        // Ignore last_transition_timestamp as the actions can still be the same even if they happened at different times
1145
24
        self.stage == other.stage
  Branch (1145:9): [True: 24, False: 0]
  Branch (1145:9): [Folded - Ignored]
1146
24
            && self.client_operation_id == other.client_operation_id
  Branch (1146:16): [True: 24, False: 0]
  Branch (1146:16): [Folded - Ignored]
1147
24
            && self.action_digest == other.action_digest
1148
24
    }
1149
}
1150
1151
impl Eq for ActionState {}
1152
1153
impl ActionState {
1154
1
    pub fn try_from_operation(
1155
1
        operation: Operation,
1156
1
        client_operation_id: OperationId,
1157
1
    ) -> Result<Self, Error> {
1158
1
        let metadata = from_any::<ExecuteOperationMetadata>(
1159
1
            &operation
1160
1
                .metadata
1161
1
                .err_tip(|| "No metadata in upstream operation")
?0
,
1162
        )
1163
1
        .err_tip(|| "Could not decode metadata in upstream operation")
?0
;
1164
1165
1
        let stage = match execution_stage::Value::try_from(metadata.stage).err_tip(|| 
{0
1166
0
            format!(
1167
0
                "Could not convert {} to execution_stage::Value",
1168
                metadata.stage
1169
            )
1170
0
        })? {
1171
0
            execution_stage::Value::Unknown => ActionStage::Unknown,
1172
0
            execution_stage::Value::CacheCheck => ActionStage::CacheCheck,
1173
0
            execution_stage::Value::Queued => ActionStage::Queued,
1174
0
            execution_stage::Value::Executing => ActionStage::Executing,
1175
            execution_stage::Value::Completed => {
1176
1
                let execute_response = operation
1177
1
                    .result
1178
1
                    .err_tip(|| "No result data for completed upstream action")
?0
;
1179
1
                match execute_response {
1180
0
                    LongRunningResult::Error(error) => ActionStage::Completed(ActionResult {
1181
0
                        error: Some(error.into()),
1182
0
                        ..ActionResult::default()
1183
0
                    }),
1184
1
                    LongRunningResult::Response(response) => {
1185
                        // Could be Completed, CompletedFromCache or Error.
1186
1
                        from_any::<ExecuteResponse>(&response)
1187
1
                            .err_tip(|| 
{0
1188
0
                                "Could not decode result structure for completed upstream action"
1189
0
                            })?
1190
1
                            .try_into()
?0
1191
                    }
1192
                }
1193
            }
1194
        };
1195
1196
1
        let action_digest = metadata
1197
1
            .action_digest
1198
1
            .err_tip(|| "No action_digest in upstream operation")
?0
1199
1
            .try_into()
1200
1
            .err_tip(|| "Could not convert action_digest into DigestInfo")
?0
;
1201
1202
1
        Ok(Self {
1203
1
            stage,
1204
1
            client_operation_id,
1205
1
            action_digest,
1206
1
            last_transition_timestamp: SystemTime::now(),
1207
1
        })
1208
1
    }
1209
1210
1
    pub fn as_operation(&self, client_operation_id: OperationId) -> Operation {
1211
1
        let stage = Into::<execution_stage::Value>::into(&self.stage) as i32;
1212
1
        let name = client_operation_id.into_string();
1213
1214
1
        let result = if self.stage.has_action_result() {
  Branch (1214:25): [True: 1, False: 0]
  Branch (1214:25): [Folded - Ignored]
1215
1
            let execute_response: ExecuteResponse = self.stage.clone().into();
1216
1
            Some(LongRunningResult::Response(to_any(&execute_response)))
1217
        } else {
1218
0
            None
1219
        };
1220
1
        let digest = Some(self.action_digest.into());
1221
1222
1
        let metadata = ExecuteOperationMetadata {
1223
1
            stage,
1224
1
            action_digest: digest,
1225
1
            // TODO(palfrey) We should support stderr/stdout streaming.
1226
1
            stdout_stream_name: String::default(),
1227
1
            stderr_stream_name: String::default(),
1228
1
            partial_execution_metadata: None,
1229
1
        };
1230
1231
1
        Operation {
1232
1
            name,
1233
1
            metadata: Some(to_any(&metadata)),
1234
1
            done: result.is_some(),
1235
1
            result,
1236
1
        }
1237
1
    }
1238
}