Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-util/src/action_messages.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::cmp::Ordering;
16
use std::collections::HashMap;
17
use std::convert::Into;
18
use std::hash::Hash;
19
use std::time::{Duration, SystemTime};
20
21
use nativelink_error::{error_if, make_input_err, Error, ResultExt};
22
use nativelink_metric::{
23
    publish, MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
24
};
25
use nativelink_proto::build::bazel::remote::execution::v2::{
26
    execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata,
27
    ExecuteRequest, ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory,
28
    OutputFile, OutputSymlink, SymlinkNode,
29
};
30
use nativelink_proto::google::longrunning::operation::Result as LongRunningResult;
31
use nativelink_proto::google::longrunning::Operation;
32
use nativelink_proto::google::rpc::Status;
33
use prost::bytes::Bytes;
34
use prost::Message;
35
use prost_types::Any;
36
use serde::ser::Error as SerdeError;
37
use serde::{Deserialize, Serialize};
38
use uuid::Uuid;
39
40
use crate::common::{DigestInfo, HashMapExt, VecExt};
41
use crate::digest_hasher::DigestHasherFunc;
42
43
/// Priority given to a remote execution job when the request does not provide one.
pub const DEFAULT_EXECUTION_PRIORITY: i32 = 0;

/// Exit code sent when an internal error occurred.
pub const INTERNAL_ERROR_EXIT_CODE: i32 = -178;
48
49
/// Holds an id that is unique to the client for a requested operation.
50
8
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
51
pub enum OperationId {
52
    Uuid(Uuid),
53
    String(String),
54
}
55
56
impl OperationId {
57
1
    pub fn into_string(self) -> String {
58
1
        match self {
59
1
            Self::Uuid(uuid) => uuid.to_string(),
60
0
            Self::String(name) => name,
61
        }
62
1
    }
63
}
64
65
impl Default for OperationId {
66
84
    fn default() -> Self {
67
84
        Self::Uuid(Uuid::new_v4())
68
84
    }
69
}
70
71
impl std::fmt::Display for OperationId {
72
60
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73
60
        match self {
74
53
            Self::Uuid(uuid) => uuid.fmt(f),
75
7
            Self::String(name) => f.write_str(name),
76
        }
77
60
    }
78
}
79
80
impl MetricsComponent for OperationId {
81
0
    fn publish(
82
0
        &self,
83
0
        _kind: MetricKind,
84
0
        _field_metadata: MetricFieldData,
85
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
86
0
        Ok(MetricPublishKnownKindData::String(self.to_string()))
87
0
    }
88
}
89
90
0
fn uuid_to_string(uuid: &Uuid) -> String {
91
0
    uuid.hyphenated().to_string()
92
0
}
93
94
impl From<&str> for OperationId {
95
517
    fn from(value: &str) -> Self {
96
517
        match Uuid::parse_str(value) {
97
12
            Ok(uuid) => Self::Uuid(uuid),
98
505
            Err(_) => Self::String(value.to_string()),
99
        }
100
517
    }
101
}
102
103
impl From<String> for OperationId {
104
8
    fn from(value: String) -> Self {
105
8
        match Uuid::parse_str(&value) {
106
8
            Ok(uuid) => Self::Uuid(uuid),
107
0
            Err(_) => Self::String(value),
108
        }
109
8
    }
110
}
111
112
impl TryFrom<Bytes> for OperationId {
113
    type Error = Error;
114
115
0
    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
116
0
        // This is an optimized path to attempt to do the conversion in-place
117
0
        // to avoid an extra allocation/copy.
118
0
        match value.try_into_mut() {
119
            // We are the only reference to the Bytes, so we can convert it into a Vec<u8>
120
            // for free then convert the Vec<u8> to a String for free too.
121
0
            Ok(value) => {
122
0
                let value = String::from_utf8(value.into()).map_err(|e| {
123
0
                    make_input_err!(
124
0
                        "Failed to convert bytes to string in try_from<Bytes> for OperationId : {e:?}"
125
0
                    )
126
0
                })?;
127
0
                Ok(Self::from(value))
128
            }
129
            // We could not take ownership of the Bytes, so we may need to copy our data.
130
0
            Err(value) => {
131
0
                let value = std::str::from_utf8(&value).map_err(|e| {
132
0
                    make_input_err!(
133
0
                        "Failed to convert bytes to string in try_from<Bytes> for OperationId : {e:?}"
134
0
                    )
135
0
                })?;
136
0
                Ok(Self::from(value))
137
            }
138
        }
139
0
    }
140
}
141
142
/// Unique id of worker.
143
0
#[derive(Default, Eq, PartialEq, Hash, Copy, Clone, Serialize, Deserialize)]
144
pub struct WorkerId(pub Uuid);
145
146
impl MetricsComponent for WorkerId {
147
0
    fn publish(
148
0
        &self,
149
0
        _kind: MetricKind,
150
0
        _field_metadata: MetricFieldData,
151
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
152
0
        Ok(MetricPublishKnownKindData::String(uuid_to_string(&self.0)))
153
0
    }
154
}
155
156
impl std::fmt::Display for WorkerId {
157
74
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158
74
        let mut buf = Uuid::encode_buffer();
159
74
        let worker_id_str = self.0.hyphenated().encode_lower(&mut buf);
160
74
        f.write_fmt(format_args!("{worker_id_str}"))
161
74
    }
162
}
163
164
impl std::fmt::Debug for WorkerId {
165
5
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166
5
        std::fmt::Display::fmt(&self, f)
167
5
    }
168
}
169
170
impl TryFrom<String> for WorkerId {
171
    type Error = Error;
172
8
    fn try_from(s: String) -> Result<Self, Self::Error> {
173
8
        match Uuid::parse_str(&s) {
174
0
            Err(e) => Err(make_input_err!(
175
0
                "Failed to convert string to WorkerId : {s} : {e:?}",
176
0
            )),
177
8
            Ok(my_uuid) => Ok(WorkerId(my_uuid)),
178
        }
179
8
    }
180
}
181
182
/// Holds the information needed to uniquely identify an action
183
/// and if it is cachable or not.
184
4
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
185
pub enum ActionUniqueQualifier {
186
    /// The action is cachable.
187
    Cachable(ActionUniqueKey),
188
    /// The action is uncachable.
189
    Uncachable(ActionUniqueKey),
190
}
191
192
impl MetricsComponent for ActionUniqueQualifier {
193
0
    fn publish(
194
0
        &self,
195
0
        _kind: MetricKind,
196
0
        field_metadata: MetricFieldData,
197
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
198
0
        let (cachable, action) = match self {
199
0
            Self::Cachable(action) => (true, action),
200
0
            Self::Uncachable(action) => (false, action),
201
        };
202
0
        publish!(
203
0
            cachable,
204
0
            &cachable,
205
0
            MetricKind::Default,
206
0
            "If the action is cachable.",
207
0
            ""
208
        );
209
0
        action.publish(MetricKind::Component, field_metadata)?;
210
0
        Ok(MetricPublishKnownKindData::Component)
211
0
    }
212
}
213
214
impl ActionUniqueQualifier {
215
    /// Get the instance_name of the action.
216
0
    pub const fn instance_name(&self) -> &String {
217
0
        match self {
218
0
            Self::Cachable(action) => &action.instance_name,
219
0
            Self::Uncachable(action) => &action.instance_name,
220
        }
221
0
    }
222
223
    /// Get the digest function of the action.
224
5
    pub const fn digest_function(&self) -> DigestHasherFunc {
225
5
        match self {
226
5
            Self::Cachable(action) => action.digest_function,
227
0
            Self::Uncachable(action) => action.digest_function,
228
        }
229
5
    }
230
231
    /// Get the digest of the action.
232
114
    pub const fn digest(&self) -> DigestInfo {
233
114
        match self {
234
109
            Self::Cachable(action) => action.digest,
235
5
            Self::Uncachable(action) => action.digest,
236
        }
237
114
    }
238
}
239
240
impl std::fmt::Display for ActionUniqueQualifier {
241
3
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242
3
        let (cachable, unique_key) = match self {
243
3
            Self::Cachable(action) => (true, action),
244
0
            Self::Uncachable(action) => (false, action),
245
        };
246
3
        f.write_fmt(format_args!(
247
3
            // Note: We use underscores because it makes escaping easier
248
3
            // for redis.
249
3
            "{}_{}_{}_{}_{}",
250
3
            unique_key.instance_name,
251
3
            unique_key.digest_function,
252
3
            unique_key.digest.packed_hash(),
253
3
            unique_key.digest.size_bytes(),
254
3
            if cachable { 'c' } else { 
'u'0
},
  Branch (254:16): [True: 3, False: 0]
  Branch (254:16): [Folded - Ignored]
255
        ))
256
3
    }
257
}
258
259
/// This is a utility struct used to make it easier to match `ActionInfos` in a
260
/// `HashMap` without needing to construct an entire `ActionInfo`.
261
8
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, 
M0
etricsComponen
t0
)]
262
pub struct ActionUniqueKey {
263
    /// Name of instance group this action belongs to.
264
    #[metric(help = "Name of instance group this action belongs to.")]
265
    pub instance_name: String,
266
    /// The digest function this action expects.
267
    #[metric(help = "The digest function this action expects.")]
268
    pub digest_function: DigestHasherFunc,
269
    /// Digest of the underlying `Action`.
270
    #[metric(help = "Digest of the underlying Action.")]
271
    pub digest: DigestInfo,
272
}
273
274
impl std::fmt::Display for ActionUniqueKey {
275
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276
0
        f.write_fmt(format_args!(
277
0
            "{}/{}/{}",
278
0
            self.instance_name, self.digest_function, self.digest,
279
0
        ))
280
0
    }
281
}
282
283
/// Information needed to execute an action. This struct is used over bazel's proto `Action`
284
/// for simplicity and offers a `salt`, which is useful to ensure during hashing (for dicts)
285
/// to ensure we never match against another `ActionInfo` (when a task should never be cached).
286
/// This struct must be 100% compatible with `ExecuteRequest` struct in `remote_execution.proto`
287
/// except for the salt field.
288
18
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, 
M0
etricsComponen
t0
)]
289
pub struct ActionInfo {
290
    /// Digest of the underlying `Command`.
291
    #[metric(help = "Digest of the underlying Command.")]
292
    pub command_digest: DigestInfo,
293
    /// Digest of the underlying `Directory`.
294
    #[metric(help = "Digest of the underlying Directory.")]
295
    pub input_root_digest: DigestInfo,
296
    /// Timeout of the action.
297
    #[metric(help = "Timeout of the action.")]
298
    pub timeout: Duration,
299
    /// The properties rules that must be applied when finding a worker that can run this action.
300
    #[metric(group = "platform_properties")]
301
    pub platform_properties: HashMap<String, String>,
302
    /// The priority of the action. Higher value means it should execute faster.
303
    #[metric(help = "The priority of the action. Higher value means it should execute faster.")]
304
    pub priority: i32,
305
    /// When this action started to be loaded from the CAS.
306
    #[metric(help = "When this action started to be loaded from the CAS.")]
307
    pub load_timestamp: SystemTime,
308
    /// When this action was created.
309
    #[metric(help = "When this action was created.")]
310
    pub insert_timestamp: SystemTime,
311
    /// Info used to uniquely identify this ActionInfo and if it is cachable.
312
    /// This is primarily used to join actions/operations together using this key.
313
    #[metric(help = "Info used to uniquely identify this ActionInfo and if it is cachable.")]
314
    pub unique_qualifier: ActionUniqueQualifier,
315
}
316
317
impl ActionInfo {
318
    #[inline]
319
0
    pub const fn instance_name(&self) -> &String {
320
0
        self.unique_qualifier.instance_name()
321
0
    }
322
323
    /// Returns the underlying digest of the `Action`.
324
    #[inline]
325
74
    pub const fn digest(&self) -> DigestInfo {
326
74
        self.unique_qualifier.digest()
327
74
    }
328
329
7
    pub fn try_from_action_and_execute_request(
330
7
        execute_request: ExecuteRequest,
331
7
        action: Action,
332
7
        load_timestamp: SystemTime,
333
7
        queued_timestamp: SystemTime,
334
7
    ) -> Result<Self, Error> {
335
7
        let unique_key = ActionUniqueKey {
336
7
            instance_name: execute_request.instance_name,
337
7
            digest_function: DigestHasherFunc::try_from(execute_request.digest_function)
338
7
                .err_tip(|| 
format!("Could not find digest_function in try_from_action_and_execute_request {:?}", execute_request.digest_function)0
)
?0
,
339
7
            digest: execute_request
340
7
                .action_digest
341
7
                .err_tip(|| 
"Expected action_digest to exist on ExecuteRequest"0
)
?0
342
7
                .try_into()
?0
,
343
        };
344
7
        let unique_qualifier = if execute_request.skip_cache_lookup {
  Branch (344:35): [True: 0, False: 7]
  Branch (344:35): [Folded - Ignored]
345
0
            ActionUniqueQualifier::Uncachable(unique_key)
346
        } else {
347
7
            ActionUniqueQualifier::Cachable(unique_key)
348
        };
349
350
7
        let proto_properties = action.platform.unwrap_or_default();
351
7
        let mut platform_properties = HashMap::with_capacity(proto_properties.properties.len());
352
7
        for 
property0
in proto_properties.properties {
353
0
            platform_properties.insert(property.name, property.value);
354
0
        }
355
356
        Ok(Self {
357
7
            command_digest: action
358
7
                .command_digest
359
7
                .err_tip(|| 
"Expected command_digest to exist on Action"0
)
?0
360
7
                .try_into()
?0
,
361
7
            input_root_digest: action
362
7
                .input_root_digest
363
7
                .err_tip(|| 
"Expected input_root_digest to exist on Action"0
)
?0
364
7
                .try_into()
?0
,
365
7
            timeout: action
366
7
                .timeout
367
7
                .unwrap_or_default()
368
7
                .try_into()
369
7
                .map_err(|_| 
make_input_err!("Failed convert proto duration to system duration")0
)
?0
,
370
7
            platform_properties,
371
7
            priority: execute_request
372
7
                .execution_policy
373
7
                .unwrap_or_default()
374
7
                .priority,
375
7
            load_timestamp,
376
7
            insert_timestamp: queued_timestamp,
377
7
            unique_qualifier,
378
        })
379
7
    }
380
}
381
382
impl From<&ActionInfo> for ExecuteRequest {
383
32
    fn from(val: &ActionInfo) -> Self {
384
32
        let digest = val.digest().into();
385
32
        let (skip_cache_lookup, unique_qualifier) = match &val.unique_qualifier {
386
27
            ActionUniqueQualifier::Cachable(unique_qualifier) => (false, unique_qualifier),
387
5
            ActionUniqueQualifier::Uncachable(unique_qualifier) => (true, unique_qualifier),
388
        };
389
32
        Self {
390
32
            instance_name: unique_qualifier.instance_name.clone(),
391
32
            action_digest: Some(digest),
392
32
            skip_cache_lookup,
393
32
            execution_policy: None,     // Not used in the worker.
394
32
            results_cache_policy: None, // Not used in the worker.
395
32
            digest_function: unique_qualifier.digest_function.proto_digest_func().into(),
396
32
        }
397
32
    }
398
}
399
400
/// Simple utility struct to determine if a string is representing a full path or
401
/// just the name of the file.
402
/// This is in order to be able to reuse the same struct instead of building different
403
/// structs when converting `FileInfo` -> {`OutputFile`, `FileNode`} and other similar
404
/// structs.
405
0
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
406
pub enum NameOrPath {
407
    Name(String),
408
    Path(String),
409
}
410
411
impl PartialOrd for NameOrPath {
412
0
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
413
0
        Some(self.cmp(other))
414
0
    }
415
}
416
417
impl Ord for NameOrPath {
418
0
    fn cmp(&self, other: &Self) -> Ordering {
419
0
        let self_lexical_name = match self {
420
0
            Self::Name(name) => name,
421
0
            Self::Path(path) => path,
422
        };
423
0
        let other_lexical_name = match other {
424
0
            Self::Name(name) => name,
425
0
            Self::Path(path) => path,
426
        };
427
0
        self_lexical_name.cmp(other_lexical_name)
428
0
    }
429
}
430
431
/// Represents an individual file and associated metadata.
432
/// This struct must be 100% compatible with `OutputFile` and `FileNode` structs
433
/// in `remote_execution.proto`.
434
0
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
435
pub struct FileInfo {
436
    pub name_or_path: NameOrPath,
437
    pub digest: DigestInfo,
438
    pub is_executable: bool,
439
}
440
441
//TODO: Make this TryFrom.
442
impl From<FileInfo> for FileNode {
443
0
    fn from(val: FileInfo) -> Self {
444
0
        let NameOrPath::Name(name) = val.name_or_path else {
  Branch (444:13): [True: 0, False: 0]
  Branch (444:13): [Folded - Ignored]
445
0
            panic!("Cannot return a FileInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()");
446
        };
447
0
        Self {
448
0
            name,
449
0
            digest: Some((&val.digest).into()),
450
0
            is_executable: val.is_executable,
451
0
            node_properties: Option::default(), // Not supported.
452
0
        }
453
0
    }
454
}
455
456
impl TryFrom<OutputFile> for FileInfo {
457
    type Error = Error;
458
459
2
    fn try_from(output_file: OutputFile) -> Result<Self, Error> {
460
2
        Ok(Self {
461
2
            name_or_path: NameOrPath::Path(output_file.path),
462
2
            digest: output_file
463
2
                .digest
464
2
                .err_tip(|| 
"Expected digest to exist on OutputFile"0
)
?0
465
2
                .try_into()
?0
,
466
2
            is_executable: output_file.is_executable,
467
        })
468
2
    }
469
}
470
471
//TODO: Make this TryFrom.
472
impl From<FileInfo> for OutputFile {
473
7
    fn from(val: FileInfo) -> Self {
474
7
        let NameOrPath::Path(path) = val.name_or_path else {
  Branch (474:13): [True: 7, False: 0]
  Branch (474:13): [Folded - Ignored]
475
0
            panic!("Cannot return a FileInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()");
476
        };
477
7
        Self {
478
7
            path,
479
7
            digest: Some((&val.digest).into()),
480
7
            is_executable: val.is_executable,
481
7
            contents: Bytes::default(),
482
7
            node_properties: Option::default(), // Not supported.
483
7
        }
484
7
    }
485
}
486
487
/// Represents an individual symlink file and associated metadata.
488
/// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`.
489
0
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
490
pub struct SymlinkInfo {
491
    pub name_or_path: NameOrPath,
492
    pub target: String,
493
}
494
495
impl TryFrom<SymlinkNode> for SymlinkInfo {
496
    type Error = Error;
497
498
0
    fn try_from(symlink_node: SymlinkNode) -> Result<Self, Error> {
499
0
        Ok(Self {
500
0
            name_or_path: NameOrPath::Name(symlink_node.name),
501
0
            target: symlink_node.target,
502
0
        })
503
0
    }
504
}
505
506
// TODO: Make this TryFrom.
507
impl From<SymlinkInfo> for SymlinkNode {
508
0
    fn from(val: SymlinkInfo) -> Self {
509
0
        let NameOrPath::Name(name) = val.name_or_path else {
  Branch (509:13): [True: 0, False: 0]
  Branch (509:13): [Folded - Ignored]
510
0
            panic!("Cannot return a SymlinkInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()");
511
        };
512
0
        Self {
513
0
            name,
514
0
            target: val.target,
515
0
            node_properties: Option::default(), // Not supported.
516
0
        }
517
0
    }
518
}
519
520
impl TryFrom<OutputSymlink> for SymlinkInfo {
521
    type Error = Error;
522
523
2
    fn try_from(output_symlink: OutputSymlink) -> Result<Self, Error> {
524
2
        Ok(Self {
525
2
            name_or_path: NameOrPath::Path(output_symlink.path),
526
2
            target: output_symlink.target,
527
2
        })
528
2
    }
529
}
530
531
// TODO: Make this TryFrom.
532
impl From<SymlinkInfo> for OutputSymlink {
533
2
    fn from(val: SymlinkInfo) -> Self {
534
2
        let NameOrPath::Path(path) = val.name_or_path else {
  Branch (534:13): [True: 2, False: 0]
  Branch (534:13): [Folded - Ignored]
535
0
            panic!("Cannot return a SymlinkInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()");
536
        };
537
2
        Self {
538
2
            path,
539
2
            target: val.target,
540
2
            node_properties: Option::default(), // Not supported.
541
2
        }
542
2
    }
543
}
544
545
/// Represents an individual directory file and associated metadata.
546
/// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`.
547
0
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
548
pub struct DirectoryInfo {
549
    pub path: String,
550
    pub tree_digest: DigestInfo,
551
}
552
553
impl TryFrom<OutputDirectory> for DirectoryInfo {
554
    type Error = Error;
555
556
2
    fn try_from(output_directory: OutputDirectory) -> Result<Self, Error> {
557
2
        Ok(Self {
558
2
            path: output_directory.path,
559
2
            tree_digest: output_directory
560
2
                .tree_digest
561
2
                .err_tip(|| 
"Expected tree_digest to exist in OutputDirectory"0
)
?0
562
2
                .try_into()
?0
,
563
        })
564
2
    }
565
}
566
567
impl From<DirectoryInfo> for OutputDirectory {
568
1
    fn from(val: DirectoryInfo) -> Self {
569
1
        Self {
570
1
            path: val.path,
571
1
            tree_digest: Some(val.tree_digest.into()),
572
1
            is_topologically_sorted: false,
573
1
        }
574
1
    }
575
}
576
577
/// Represents the metadata associated with the execution result.
578
/// This struct must be 100% compatible with `ExecutedActionMetadata`.
579
0
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
580
pub struct ExecutionMetadata {
581
    pub worker: String,
582
    pub queued_timestamp: SystemTime,
583
    pub worker_start_timestamp: SystemTime,
584
    pub worker_completed_timestamp: SystemTime,
585
    pub input_fetch_start_timestamp: SystemTime,
586
    pub input_fetch_completed_timestamp: SystemTime,
587
    pub execution_start_timestamp: SystemTime,
588
    pub execution_completed_timestamp: SystemTime,
589
    pub output_upload_start_timestamp: SystemTime,
590
    pub output_upload_completed_timestamp: SystemTime,
591
}
592
593
impl Default for ExecutionMetadata {
594
1
    fn default() -> Self {
595
1
        Self {
596
1
            worker: String::new(),
597
1
            queued_timestamp: SystemTime::UNIX_EPOCH,
598
1
            worker_start_timestamp: SystemTime::UNIX_EPOCH,
599
1
            worker_completed_timestamp: SystemTime::UNIX_EPOCH,
600
1
            input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
601
1
            input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
602
1
            execution_start_timestamp: SystemTime::UNIX_EPOCH,
603
1
            execution_completed_timestamp: SystemTime::UNIX_EPOCH,
604
1
            output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
605
1
            output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
606
1
        }
607
1
    }
608
}
609
610
impl From<ExecutionMetadata> for ExecutedActionMetadata {
611
17
    fn from(val: ExecutionMetadata) -> Self {
612
17
        Self {
613
17
            worker: val.worker,
614
17
            queued_timestamp: Some(val.queued_timestamp.into()),
615
17
            worker_start_timestamp: Some(val.worker_start_timestamp.into()),
616
17
            worker_completed_timestamp: Some(val.worker_completed_timestamp.into()),
617
17
            input_fetch_start_timestamp: Some(val.input_fetch_start_timestamp.into()),
618
17
            input_fetch_completed_timestamp: Some(val.input_fetch_completed_timestamp.into()),
619
17
            execution_start_timestamp: Some(val.execution_start_timestamp.into()),
620
17
            execution_completed_timestamp: Some(val.execution_completed_timestamp.into()),
621
17
            output_upload_start_timestamp: Some(val.output_upload_start_timestamp.into()),
622
17
            output_upload_completed_timestamp: Some(val.output_upload_completed_timestamp.into()),
623
17
            virtual_execution_duration: val
624
17
                .execution_completed_timestamp
625
17
                .duration_since(val.execution_start_timestamp)
626
17
                .ok()
627
17
                .and_then(|duration| prost_types::Duration::try_from(duration).ok()),
628
17
            auxiliary_metadata: Vec::default(),
629
17
        }
630
17
    }
631
}
632
633
impl TryFrom<ExecutedActionMetadata> for ExecutionMetadata {
634
    type Error = Error;
635
636
3
    fn try_from(eam: ExecutedActionMetadata) -> Result<Self, Error> {
637
3
        Ok(Self {
638
3
            worker: eam.worker,
639
3
            queued_timestamp: eam
640
3
                .queued_timestamp
641
3
                .err_tip(|| 
"Expected queued_timestamp to exist in ExecutedActionMetadata"0
)
?0
642
3
                .try_into()
?0
,
643
3
            worker_start_timestamp: eam
644
3
                .worker_start_timestamp
645
3
                .err_tip(|| 
"Expected worker_start_timestamp to exist in ExecutedActionMetadata"0
)
?0
646
3
                .try_into()
?0
,
647
3
            worker_completed_timestamp: eam
648
3
                .worker_completed_timestamp
649
3
                .err_tip(|| {
650
0
                    "Expected worker_completed_timestamp to exist in ExecutedActionMetadata"
651
3
                })
?0
652
3
                .try_into()
?0
,
653
3
            input_fetch_start_timestamp: eam
654
3
                .input_fetch_start_timestamp
655
3
                .err_tip(|| {
656
0
                    "Expected input_fetch_start_timestamp to exist in ExecutedActionMetadata"
657
3
                })
?0
658
3
                .try_into()
?0
,
659
3
            input_fetch_completed_timestamp: eam
660
3
                .input_fetch_completed_timestamp
661
3
                .err_tip(|| {
662
0
                    "Expected input_fetch_completed_timestamp to exist in ExecutedActionMetadata"
663
3
                })
?0
664
3
                .try_into()
?0
,
665
3
            execution_start_timestamp: eam
666
3
                .execution_start_timestamp
667
3
                .err_tip(|| {
668
0
                    "Expected execution_start_timestamp to exist in ExecutedActionMetadata"
669
3
                })
?0
670
3
                .try_into()
?0
,
671
3
            execution_completed_timestamp: eam
672
3
                .execution_completed_timestamp
673
3
                .err_tip(|| {
674
0
                    "Expected execution_completed_timestamp to exist in ExecutedActionMetadata"
675
3
                })
?0
676
3
                .try_into()
?0
,
677
3
            output_upload_start_timestamp: eam
678
3
                .output_upload_start_timestamp
679
3
                .err_tip(|| {
680
0
                    "Expected output_upload_start_timestamp to exist in ExecutedActionMetadata"
681
3
                })
?0
682
3
                .try_into()
?0
,
683
3
            output_upload_completed_timestamp: eam
684
3
                .output_upload_completed_timestamp
685
3
                .err_tip(|| {
686
0
                    "Expected output_upload_completed_timestamp to exist in ExecutedActionMetadata"
687
3
                })
?0
688
3
                .try_into()
?0
,
689
        })
690
3
    }
691
}
692
693
/// Represents the results of an execution.
694
/// This struct must be 100% compatible with `ActionResult` in `remote_execution.proto`.
695
0
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
696
pub struct ActionResult {
697
    pub output_files: Vec<FileInfo>,
698
    pub output_folders: Vec<DirectoryInfo>,
699
    pub output_directory_symlinks: Vec<SymlinkInfo>,
700
    pub output_file_symlinks: Vec<SymlinkInfo>,
701
    pub exit_code: i32,
702
    pub stdout_digest: DigestInfo,
703
    pub stderr_digest: DigestInfo,
704
    pub execution_metadata: ExecutionMetadata,
705
    pub server_logs: HashMap<String, DigestInfo>,
706
    pub error: Option<Error>,
707
    pub message: String,
708
}
709
710
impl Default for ActionResult {
711
7
    fn default() -> Self {
712
7
        ActionResult {
713
7
            output_files: Default::default(),
714
7
            output_folders: Default::default(),
715
7
            output_directory_symlinks: Default::default(),
716
7
            output_file_symlinks: Default::default(),
717
7
            exit_code: INTERNAL_ERROR_EXIT_CODE,
718
7
            stdout_digest: DigestInfo::new([0u8; 32], 0),
719
7
            stderr_digest: DigestInfo::new([0u8; 32], 0),
720
7
            execution_metadata: ExecutionMetadata {
721
7
                worker: String::new(),
722
7
                queued_timestamp: SystemTime::UNIX_EPOCH,
723
7
                worker_start_timestamp: SystemTime::UNIX_EPOCH,
724
7
                worker_completed_timestamp: SystemTime::UNIX_EPOCH,
725
7
                input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
726
7
                input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
727
7
                execution_start_timestamp: SystemTime::UNIX_EPOCH,
728
7
                execution_completed_timestamp: SystemTime::UNIX_EPOCH,
729
7
                output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
730
7
                output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
731
7
            },
732
7
            server_logs: Default::default(),
733
7
            error: None,
734
7
            message: String::new(),
735
7
        }
736
7
    }
737
}
738
739
// TODO(allada) Remove the need for clippy argument by making the ActionResult and ProtoActionResult
740
// a Box.
741
/// The execution status/stage. This should match `ExecutionStage::Value` in `remote_execution.proto`.
742
4
#[derive(PartialEq, Debug, Clone, 
Serialize0
, Deserialize)]
743
#[allow(clippy::large_enum_variant)]
744
pub enum ActionStage {
745
    /// Stage is unknown.
746
    Unknown,
747
    /// Checking the cache to see if action exists.
748
    CacheCheck,
749
    /// Action has been accepted and waiting for worker to take it.
750
    Queued,
751
    // TODO(allada) We need a way to know if the job was sent to a worker, but hasn't begun
752
    // execution yet.
753
    /// Worker is executing the action.
754
    Executing,
755
    /// Worker completed the work with result.
756
    Completed(ActionResult),
757
    /// Result was found from cache, don't decode the proto just to re-encode it.
758
    #[serde(serialize_with = "serialize_proto_result", skip_deserializing)]
759
    // The serialization step decodes this to an ActionResult which is serializable.
760
    // Since it will always be serialized as an ActionResult, we do not need to support
761
    // deserialization on this type at all.
762
    // In theory, serializing this should never happen so performance shouldn't be affected.
763
    CompletedFromCache(ProtoActionResult),
764
}
765
766
0
fn serialize_proto_result<S>(v: &ProtoActionResult, serializer: S) -> Result<S::Ok, S::Error>
767
0
where
768
0
    S: serde::Serializer,
769
0
{
770
0
    let s = ActionResult::try_from(v.clone()).map_err(S::Error::custom)?;
771
0
    s.serialize(serializer)
772
0
}
773
774
impl ActionStage {
775
89
    pub const fn has_action_result(&self) -> bool {
776
89
        match self {
777
76
            Self::Unknown | Self::CacheCheck | Self::Queued | Self::Executing => false,
778
13
            Self::Completed(_) | Self::CompletedFromCache(_) => true,
779
        }
780
89
    }
781
782
    /// Returns true if the worker considers the action done and no longer needs to be tracked.
783
    // Note: This function is separate from `has_action_result()` to not mix the concept of
784
    //       "finished" with "has a result".
785
88
    pub const fn is_finished(&self) -> bool {
786
88
        self.has_action_result()
787
88
    }
788
789
    /// Returns if the stage enum is the same as the other stage enum, but
790
    /// does not compare the values of the enum.
791
39
    pub const fn is_same_stage(&self, other: &Self) -> bool {
792
39
        matches!(
793
39
            (self, other),
794
            (Self::Unknown, Self::Unknown)
795
                | (Self::CacheCheck, Self::CacheCheck)
796
                | (Self::Queued, Self::Queued)
797
                | (Self::Executing, Self::Executing)
798
                | (Self::Completed(_), Self::Completed(_))
799
                | (Self::CompletedFromCache(_), Self::CompletedFromCache(_))
800
        )
801
39
    }
802
}
803
804
impl MetricsComponent for ActionStage {
805
0
    fn publish(
806
0
        &self,
807
0
        _kind: MetricKind,
808
0
        _field_metadata: MetricFieldData,
809
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
810
0
        let value = match self {
811
0
            ActionStage::Unknown => "Unknown".to_string(),
812
0
            ActionStage::CacheCheck => "CacheCheck".to_string(),
813
0
            ActionStage::Queued => "Queued".to_string(),
814
0
            ActionStage::Executing => "Executing".to_string(),
815
0
            ActionStage::Completed(_) => "Completed".to_string(),
816
0
            ActionStage::CompletedFromCache(_) => "CompletedFromCache".to_string(),
817
        };
818
0
        Ok(MetricPublishKnownKindData::String(value))
819
0
    }
820
}
821
822
impl From<&ActionStage> for execution_stage::Value {
823
1
    fn from(val: &ActionStage) -> Self {
824
1
        match val {
825
0
            ActionStage::Unknown => Self::Unknown,
826
0
            ActionStage::CacheCheck => Self::CacheCheck,
827
0
            ActionStage::Queued => Self::Queued,
828
0
            ActionStage::Executing => Self::Executing,
829
1
            ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => Self::Completed,
830
        }
831
1
    }
832
}
833
834
11
pub fn to_execute_response(action_result: ActionResult) -> ExecuteResponse {
835
11
    fn logs_from(server_logs: HashMap<String, DigestInfo>) -> HashMap<String, LogFile> {
836
11
        let mut logs = HashMap::with_capacity(server_logs.len());
837
12
        for (
k, v1
) in server_logs {
838
1
            logs.insert(
839
1
                k.clone(),
840
1
                LogFile {
841
1
                    digest: Some(v.into()),
842
1
                    human_readable: false,
843
1
                },
844
1
            );
845
1
        }
846
11
        logs
847
11
    }
848
849
11
    let status = Some(
850
11
        action_result
851
11
            .error
852
11
            .clone()
853
11
            .map_or_else(Status::default, Into::into),
854
11
    );
855
11
    let message = action_result.message.clone();
856
11
    ExecuteResponse {
857
11
        server_logs: logs_from(action_result.server_logs.clone()),
858
11
        result: Some(action_result.into()),
859
11
        cached_result: false,
860
11
        status,
861
11
        message,
862
11
    }
863
11
}
864
865
impl From<ActionStage> for ExecuteResponse {
866
6
    fn from(val: ActionStage) -> Self {
867
6
        match val {
868
            // We don't have an execute response if we don't have the results. It is defined
869
            // behavior to return an empty proto struct.
870
            ActionStage::Unknown
871
            | ActionStage::CacheCheck
872
            | ActionStage::Queued
873
0
            | ActionStage::Executing => Self::default(),
874
6
            ActionStage::Completed(action_result) => to_execute_response(action_result),
875
            // Handled separately as there are no server logs and the action
876
            // result is already in Proto format.
877
0
            ActionStage::CompletedFromCache(proto_action_result) => Self {
878
0
                server_logs: HashMap::new(),
879
0
                result: Some(proto_action_result),
880
0
                cached_result: true,
881
0
                status: Some(Status::default()),
882
0
                message: String::new(), // Will be populated later if applicable.
883
0
            },
884
        }
885
6
    }
886
}
887
888
impl From<ActionResult> for ProtoActionResult {
889
17
    fn from(val: ActionResult) -> Self {
890
17
        let mut output_symlinks = Vec::with_capacity(
891
17
            val.output_file_symlinks.len() + val.output_directory_symlinks.len(),
892
17
        );
893
17
        output_symlinks.extend_from_slice(val.output_file_symlinks.as_slice());
894
17
        output_symlinks.extend_from_slice(val.output_directory_symlinks.as_slice());
895
17
896
17
        Self {
897
17
            output_files: val.output_files.into_iter().map(Into::into).collect(),
898
17
            output_file_symlinks: val
899
17
                .output_file_symlinks
900
17
                .into_iter()
901
17
                .map(Into::into)
902
17
                .collect(),
903
17
            output_symlinks: output_symlinks.into_iter().map(Into::into).collect(),
904
17
            output_directories: val.output_folders.into_iter().map(Into::into).collect(),
905
17
            output_directory_symlinks: val
906
17
                .output_directory_symlinks
907
17
                .into_iter()
908
17
                .map(Into::into)
909
17
                .collect(),
910
17
            exit_code: val.exit_code,
911
17
            stdout_raw: Bytes::default(),
912
17
            stdout_digest: Some(val.stdout_digest.into()),
913
17
            stderr_raw: Bytes::default(),
914
17
            stderr_digest: Some(val.stderr_digest.into()),
915
17
            execution_metadata: Some(val.execution_metadata.into()),
916
17
        }
917
17
    }
918
}
919
920
impl TryFrom<ProtoActionResult> for ActionResult {
921
    type Error = Error;
922
923
0
    fn try_from(val: ProtoActionResult) -> Result<Self, Error> {
924
0
        let output_file_symlinks = val
925
0
            .output_file_symlinks
926
0
            .into_iter()
927
0
            .map(|output_symlink| {
928
0
                SymlinkInfo::try_from(output_symlink)
929
0
                    .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo")
930
0
            })
931
0
            .collect::<Result<Vec<_>, _>>()?;
932
933
0
        let output_directory_symlinks = val
934
0
            .output_directory_symlinks
935
0
            .into_iter()
936
0
            .map(|output_symlink| {
937
0
                SymlinkInfo::try_from(output_symlink)
938
0
                    .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo")
939
0
            })
940
0
            .collect::<Result<Vec<_>, _>>()?;
941
942
0
        let output_files = val
943
0
            .output_files
944
0
            .into_iter()
945
0
            .map(|output_file| {
946
0
                output_file
947
0
                    .try_into()
948
0
                    .err_tip(|| "Output File could not be converted")
949
0
            })
950
0
            .collect::<Result<Vec<_>, _>>()?;
951
952
0
        let output_folders = val
953
0
            .output_directories
954
0
            .into_iter()
955
0
            .map(|output_directory| {
956
0
                output_directory
957
0
                    .try_into()
958
0
                    .err_tip(|| "Output File could not be converted")
959
0
            })
960
0
            .collect::<Result<Vec<_>, _>>()?;
961
962
        Ok(Self {
963
0
            output_files,
964
0
            output_folders,
965
0
            output_file_symlinks,
966
0
            output_directory_symlinks,
967
0
            exit_code: val.exit_code,
968
0
            stdout_digest: val
969
0
                .stdout_digest
970
0
                .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")?
971
0
                .try_into()?,
972
0
            stderr_digest: val
973
0
                .stderr_digest
974
0
                .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")?
975
0
                .try_into()?,
976
0
            execution_metadata: val
977
0
                .execution_metadata
978
0
                .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")?
979
0
                .try_into()?,
980
0
            server_logs: Default::default(),
981
0
            error: None,
982
0
            message: String::new(),
983
        })
984
0
    }
985
}
986
987
impl TryFrom<ExecuteResponse> for ActionStage {
988
    type Error = Error;
989
990
3
    fn try_from(execute_response: ExecuteResponse) -> Result<Self, Error> {
991
3
        let proto_action_result = execute_response
992
3
            .result
993
3
            .err_tip(|| 
"Expected result to be set on ExecuteResponse msg"0
)
?0
;
994
3
        let action_result = ActionResult {
995
3
            output_files: proto_action_result
996
3
                .output_files
997
3
                .try_map(TryInto::try_into)
?0
,
998
3
            output_directory_symlinks: proto_action_result
999
3
                .output_directory_symlinks
1000
3
                .try_map(TryInto::try_into)
?0
,
1001
3
            output_file_symlinks: proto_action_result
1002
3
                .output_file_symlinks
1003
3
                .try_map(TryInto::try_into)
?0
,
1004
3
            output_folders: proto_action_result
1005
3
                .output_directories
1006
3
                .try_map(TryInto::try_into)
?0
,
1007
3
            exit_code: proto_action_result.exit_code,
1008
3
1009
3
            stdout_digest: proto_action_result
1010
3
                .stdout_digest
1011
3
                .err_tip(|| 
"Expected stdout_digest to be set on ExecuteResponse msg"0
)
?0
1012
3
                .try_into()
?0
,
1013
3
            stderr_digest: proto_action_result
1014
3
                .stderr_digest
1015
3
                .err_tip(|| 
"Expected stderr_digest to be set on ExecuteResponse msg"0
)
?0
1016
3
                .try_into()
?0
,
1017
3
            execution_metadata: proto_action_result
1018
3
                .execution_metadata
1019
3
                .err_tip(|| 
"Expected execution_metadata to be set on ExecuteResponse msg"0
)
?0
1020
3
                .try_into()
?0
,
1021
3
            server_logs: execute_response.server_logs.try_map(|v| {
1022
2
                v.digest
1023
2
                    .err_tip(|| 
"Expected digest to be set on LogFile msg"0
)
?0
1024
2
                    .try_into()
1025
3
            
}2
)
?0
,
1026
3
            error: execute_response.status.clone().and_then(|v| {
1027
3
                if v.code == 0 {
  Branch (1027:20): [True: 1, False: 2]
  Branch (1027:20): [Folded - Ignored]
1028
1
                    None
1029
                } else {
1030
2
                    Some(v.into())
1031
                }
1032
3
            }),
1033
3
            message: execute_response.message,
1034
3
        };
1035
3
1036
3
        if execute_response.cached_result {
  Branch (1036:12): [True: 0, False: 3]
  Branch (1036:12): [Folded - Ignored]
1037
0
            return Ok(Self::CompletedFromCache(action_result.into()));
1038
3
        }
1039
3
        Ok(Self::Completed(action_result))
1040
3
    }
1041
}
1042
1043
// TODO: Should be able to remove this after tokio-rs/prost#299
1044
trait TypeUrl: Message {
1045
    const TYPE_URL: &'static str;
1046
}
1047
1048
impl TypeUrl for ExecuteResponse {
1049
    const TYPE_URL: &'static str =
1050
        "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse";
1051
}
1052
1053
impl TypeUrl for ExecuteOperationMetadata {
1054
    const TYPE_URL: &'static str =
1055
        "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata";
1056
}
1057
1058
2
fn from_any<T>(message: &Any) -> Result<T, Error>
1059
2
where
1060
2
    T: TypeUrl + Default,
1061
2
{
1062
0
    error_if!(
1063
2
        message.type_url != T::TYPE_URL,
  Branch (1063:9): [True: 0, False: 1]
  Branch (1063:9): [True: 0, False: 1]
  Branch (1063:9): [Folded - Ignored]
1064
        "Incorrect type when decoding Any. {} != {}",
1065
        message.type_url,
1066
0
        T::TYPE_URL.to_string()
1067
    );
1068
2
    Ok(T::decode(message.value.as_slice())
?0
)
1069
2
}
1070
1071
2
fn to_any<T>(message: &T) -> Any
1072
2
where
1073
2
    T: TypeUrl,
1074
2
{
1075
2
    Any {
1076
2
        type_url: T::TYPE_URL.to_string(),
1077
2
        value: message.encode_to_vec(),
1078
2
    }
1079
2
}
1080
1081
/// Current state of the action.
1082
/// This must be 100% compatible with `Operation` in `google/longrunning/operations.proto`.
1083
8
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize, 
M0
etricsComponen
t0
)]
1084
pub struct ActionState {
1085
    #[metric(help = "The current stage of the action.")]
1086
    pub stage: ActionStage,
1087
    #[metric(help = "The unique identifier of the action.")]
1088
    pub client_operation_id: OperationId,
1089
    #[metric(help = "The digest of the action.")]
1090
    pub action_digest: DigestInfo,
1091
}
1092
1093
impl ActionState {
1094
1
    pub fn try_from_operation(
1095
1
        operation: Operation,
1096
1
        client_operation_id: OperationId,
1097
1
    ) -> Result<Self, Error> {
1098
1
        let metadata = from_any::<ExecuteOperationMetadata>(
1099
1
            &operation
1100
1
                .metadata
1101
1
                .err_tip(|| 
"No metadata in upstream operation"0
)
?0
,
1102
        )
1103
1
        .err_tip(|| 
"Could not decode metadata in upstream operation"0
)
?0
;
1104
1105
1
        let stage = match execution_stage::Value::try_from(metadata.stage).err_tip(|| {
1106
0
            format!(
1107
0
                "Could not convert {} to execution_stage::Value",
1108
0
                metadata.stage
1109
0
            )
1110
1
        })
?0
{
1111
0
            execution_stage::Value::Unknown => ActionStage::Unknown,
1112
0
            execution_stage::Value::CacheCheck => ActionStage::CacheCheck,
1113
0
            execution_stage::Value::Queued => ActionStage::Queued,
1114
0
            execution_stage::Value::Executing => ActionStage::Executing,
1115
            execution_stage::Value::Completed => {
1116
1
                let execute_response = operation
1117
1
                    .result
1118
1
                    .err_tip(|| 
"No result data for completed upstream action"0
)
?0
;
1119
1
                match execute_response {
1120
0
                    LongRunningResult::Error(error) => ActionStage::Completed(ActionResult {
1121
0
                        error: Some(error.into()),
1122
0
                        ..ActionResult::default()
1123
0
                    }),
1124
1
                    LongRunningResult::Response(response) => {
1125
1
                        // Could be Completed, CompletedFromCache or Error.
1126
1
                        from_any::<ExecuteResponse>(&response)
1127
1
                            .err_tip(|| {
1128
0
                                "Could not decode result structure for completed upstream action"
1129
1
                            })
?0
1130
1
                            .try_into()
?0
1131
                    }
1132
                }
1133
            }
1134
        };
1135
1136
1
        let action_digest = metadata
1137
1
            .action_digest
1138
1
            .err_tip(|| 
"No action_digest in upstream operation"0
)
?0
1139
1
            .try_into()
1140
1
            .err_tip(|| 
"Could not convert action_digest into DigestInfo"0
)
?0
;
1141
1142
1
        Ok(Self {
1143
1
            stage,
1144
1
            client_operation_id,
1145
1
            action_digest,
1146
1
        })
1147
1
    }
1148
1149
1
    pub fn as_operation(&self, client_operation_id: OperationId) -> Operation {
1150
1
        let stage = Into::<execution_stage::Value>::into(&self.stage) as i32;
1151
1
        let name = client_operation_id.into_string();
1152
1153
1
        let result = if self.stage.has_action_result() {
  Branch (1153:25): [True: 1, False: 0]
  Branch (1153:25): [Folded - Ignored]
1154
1
            let execute_response: ExecuteResponse = self.stage.clone().into();
1155
1
            Some(LongRunningResult::Response(to_any(&execute_response)))
1156
        } else {
1157
0
            None
1158
        };
1159
1
        let digest = Some(self.action_digest.into());
1160
1
1161
1
        let metadata = ExecuteOperationMetadata {
1162
1
            stage,
1163
1
            action_digest: digest,
1164
1
            // TODO(blaise.bruer) We should support stderr/stdout streaming.
1165
1
            stdout_stream_name: String::default(),
1166
1
            stderr_stream_name: String::default(),
1167
1
            partial_execution_metadata: None,
1168
1
        };
1169
1
1170
1
        Operation {
1171
1
            name,
1172
1
            metadata: Some(to_any(&metadata)),
1173
1
            done: result.is_some(),
1174
1
            result,
1175
1
        }
1176
1
    }
1177
}