Coverage Report

Created: 2025-09-16 19:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/action_messages.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp::Ordering;
16
use core::convert::Into;
17
use core::hash::Hash;
18
use core::time::Duration;
19
use std::collections::HashMap;
20
use std::time::SystemTime;
21
22
use nativelink_error::{Error, ResultExt, error_if, make_input_err};
23
use nativelink_metric::{
24
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, publish,
25
};
26
use nativelink_proto::build::bazel::remote::execution::v2::{
27
    Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest,
28
    ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile,
29
    OutputSymlink, SymlinkNode, execution_stage,
30
};
31
use nativelink_proto::google::longrunning::Operation;
32
use nativelink_proto::google::longrunning::operation::Result as LongRunningResult;
33
use nativelink_proto::google::rpc::Status;
34
use prost::Message;
35
use prost::bytes::Bytes;
36
use prost_types::Any;
37
use serde::ser::Error as SerdeError;
38
use serde::{Deserialize, Serialize};
39
use uuid::Uuid;
40
41
use crate::common::{DigestInfo, HashMapExt, VecExt};
42
use crate::digest_hasher::DigestHasherFunc;
43
44
/// Default priority remote execution jobs will get when not provided.
45
pub const DEFAULT_EXECUTION_PRIORITY: i32 = 0;
46
47
/// Exit code sent if there is an internal error.
48
pub const INTERNAL_ERROR_EXIT_CODE: i32 = -178;
49
50
/// Holds an id that is unique to the client for a requested operation.
51
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
52
pub enum OperationId {
53
    Uuid(Uuid),
54
    String(String),
55
}
56
57
impl OperationId {
58
1
    pub fn into_string(self) -> String {
59
1
        match self {
60
1
            Self::Uuid(uuid) => uuid.to_string(),
61
0
            Self::String(name) => name,
62
        }
63
1
    }
64
}
65
66
impl Default for OperationId {
67
97
    fn default() -> Self {
68
97
        Self::Uuid(Uuid::new_v4())
69
97
    }
70
}
71
72
impl core::fmt::Display for OperationId {
73
157
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
74
157
        match self {
75
87
            Self::Uuid(uuid) => uuid.fmt(f),
76
70
            Self::String(name) => f.write_str(name),
77
        }
78
157
    }
79
}
80
81
impl MetricsComponent for OperationId {
82
0
    fn publish(
83
0
        &self,
84
0
        _kind: MetricKind,
85
0
        _field_metadata: MetricFieldData,
86
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
87
0
        Ok(MetricPublishKnownKindData::String(self.to_string()))
88
0
    }
89
}
90
91
impl From<&str> for OperationId {
92
39
    fn from(value: &str) -> Self {
93
39
        Uuid::parse_str(value).map_or_else(|_| Self::String(
value18
.
to_string18
()), Self::Uuid)
94
39
    }
95
}
96
97
impl From<String> for OperationId {
98
8
    fn from(value: String) -> Self {
99
8
        Uuid::parse_str(&value).map_or(Self::String(value), Self::Uuid)
100
8
    }
101
}
102
103
impl TryFrom<Bytes> for OperationId {
104
    type Error = Error;
105
106
0
    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
107
        // This is an optimized path to attempt to do the conversion in-place
108
        // to avoid an extra allocation/copy.
109
0
        match value.try_into_mut() {
110
            // We are the only reference to the Bytes, so we can convert it into a Vec<u8>
111
            // for free then convert the Vec<u8> to a String for free too.
112
0
            Ok(value) => {
113
0
                let value = String::from_utf8(value.into()).map_err(|e| {
114
0
                    make_input_err!(
115
                        "Failed to convert bytes to string in try_from<Bytes> for OperationId : {e:?}"
116
                    )
117
0
                })?;
118
0
                Ok(Self::from(value))
119
            }
120
            // We could not take ownership of the Bytes, so we may need to copy our data.
121
0
            Err(value) => {
122
0
                let value = core::str::from_utf8(&value).map_err(|e| {
123
0
                    make_input_err!(
124
                        "Failed to convert bytes to string in try_from<Bytes> for OperationId : {e:?}"
125
                    )
126
0
                })?;
127
0
                Ok(Self::from(value))
128
            }
129
        }
130
0
    }
131
}
132
133
/// Unique id of worker.
134
#[derive(Default, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)]
135
pub struct WorkerId(pub String);
136
137
impl MetricsComponent for WorkerId {
138
0
    fn publish(
139
0
        &self,
140
0
        _kind: MetricKind,
141
0
        _field_metadata: MetricFieldData,
142
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
143
0
        Ok(MetricPublishKnownKindData::String(self.0.clone()))
144
0
    }
145
}
146
147
impl core::fmt::Display for WorkerId {
148
27
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
149
27
        f.write_fmt(format_args!("{}", self.0))
150
27
    }
151
}
152
153
impl core::fmt::Debug for WorkerId {
154
6
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
155
6
        core::fmt::Display::fmt(&self, f)
156
6
    }
157
}
158
159
impl From<WorkerId> for String {
160
101
    fn from(val: WorkerId) -> Self {
161
101
        val.0
162
101
    }
163
}
164
165
impl From<String> for WorkerId {
166
8
    fn from(s: String) -> Self {
167
8
        Self(s)
168
8
    }
169
}
170
171
/// Holds the information needed to uniquely identify an action
172
/// and if it is cacheable or not.
173
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
174
pub enum ActionUniqueQualifier {
175
    /// The action is cacheable.
176
    #[serde(alias = "Cachable")] // Pre 0.7.0 spelling
177
    Cacheable(ActionUniqueKey),
178
    /// The action is uncacheable.
179
    #[serde(alias = "Uncachable")] // Pre 0.7.0 spelling
180
    Uncacheable(ActionUniqueKey),
181
}
182
183
impl MetricsComponent for ActionUniqueQualifier {
184
0
    fn publish(
185
0
        &self,
186
0
        _kind: MetricKind,
187
0
        field_metadata: MetricFieldData,
188
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
189
0
        let (cacheable, action) = match self {
190
0
            Self::Cacheable(action) => (true, action),
191
0
            Self::Uncacheable(action) => (false, action),
192
        };
193
0
        publish!(
194
0
            cacheable,
195
0
            &cacheable,
196
0
            MetricKind::Default,
197
0
            "If the action is cacheable.",
198
0
            ""
199
        );
200
0
        action.publish(MetricKind::Component, field_metadata)?;
201
0
        Ok(MetricPublishKnownKindData::Component)
202
0
    }
203
}
204
205
impl ActionUniqueQualifier {
206
    /// Get the `instance_name` of the action.
207
0
    pub const fn instance_name(&self) -> &String {
208
0
        match self {
209
0
            Self::Cacheable(action) | Self::Uncacheable(action) => &action.instance_name,
210
        }
211
0
    }
212
213
    /// Get the digest function of the action.
214
11
    pub const fn digest_function(&self) -> DigestHasherFunc {
215
11
        match self {
216
11
            Self::Cacheable(action) | Self::Uncacheable(
action0
) => action.digest_function,
217
        }
218
11
    }
219
220
    /// Get the digest of the action.
221
133
    pub const fn digest(&self) -> DigestInfo {
222
133
        match self {
223
133
            Self::Cacheable(
action128
) | Self::Uncacheable(
action5
) => action.digest,
224
        }
225
133
    }
226
}
227
228
impl core::fmt::Display for ActionUniqueQualifier {
229
17
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
230
17
        let (cacheable, unique_key) = match self {
231
17
            Self::Cacheable(action) => (true, action),
232
0
            Self::Uncacheable(action) => (false, action),
233
        };
234
17
        f.write_fmt(format_args!(
235
            // Note: We use underscores because it makes escaping easier
236
            // for redis.
237
17
            "{}_{}_{}_{}_{}",
238
            unique_key.instance_name,
239
            unique_key.digest_function,
240
17
            unique_key.digest.packed_hash(),
241
17
            unique_key.digest.size_bytes(),
242
17
            if cacheable { 'c' } else { 
'u'0
},
  Branch (242:16): [True: 17, False: 0]
  Branch (242:16): [Folded - Ignored]
243
        ))
244
17
    }
245
}
246
247
/// This is a utility struct used to make it easier to match `ActionInfos` in a
248
/// `HashMap` without needing to construct an entire `ActionInfo`.
249
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, MetricsComponent)]
250
pub struct ActionUniqueKey {
251
    /// Name of instance group this action belongs to.
252
    #[metric(help = "Name of instance group this action belongs to.")]
253
    pub instance_name: String,
254
    /// The digest function this action expects.
255
    #[metric(help = "The digest function this action expects.")]
256
    pub digest_function: DigestHasherFunc,
257
    /// Digest of the underlying `Action`.
258
    #[metric(help = "Digest of the underlying Action.")]
259
    pub digest: DigestInfo,
260
}
261
262
impl core::fmt::Display for ActionUniqueKey {
263
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
264
0
        f.write_fmt(format_args!(
265
0
            "{}/{}/{}",
266
            self.instance_name, self.digest_function, self.digest,
267
        ))
268
0
    }
269
}
270
271
/// Information needed to execute an action. This struct is used over bazel's proto `Action`
272
/// for simplicity and offers a `salt`, which is useful to ensure during hashing (for dicts)
273
/// to ensure we never match against another `ActionInfo` (when a task should never be cached).
274
/// This struct must be 100% compatible with `ExecuteRequest` struct in `remote_execution.proto`
275
/// except for the salt field.
276
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, MetricsComponent)]
277
pub struct ActionInfo {
278
    /// Digest of the underlying `Command`.
279
    #[metric(help = "Digest of the underlying Command.")]
280
    pub command_digest: DigestInfo,
281
    /// Digest of the underlying `Directory`.
282
    #[metric(help = "Digest of the underlying Directory.")]
283
    pub input_root_digest: DigestInfo,
284
    /// Timeout of the action.
285
    #[metric(help = "Timeout of the action.")]
286
    pub timeout: Duration,
287
    /// The properties rules that must be applied when finding a worker that can run this action.
288
    #[metric(group = "platform_properties")]
289
    pub platform_properties: HashMap<String, String>,
290
    /// The priority of the action. Higher value means it should execute faster.
291
    #[metric(help = "The priority of the action. Higher value means it should execute faster.")]
292
    pub priority: i32,
293
    /// When this action started to be loaded from the CAS.
294
    #[metric(help = "When this action started to be loaded from the CAS.")]
295
    pub load_timestamp: SystemTime,
296
    /// When this action was created.
297
    #[metric(help = "When this action was created.")]
298
    pub insert_timestamp: SystemTime,
299
    /// Info used to uniquely identify this `ActionInfo` and if it is cacheable.
300
    /// This is primarily used to join actions/operations together using this key.
301
    #[metric(help = "Info used to uniquely identify this ActionInfo and if it is cacheable.")]
302
    pub unique_qualifier: ActionUniqueQualifier,
303
}
304
305
impl ActionInfo {
306
    #[inline]
307
0
    pub const fn instance_name(&self) -> &String {
308
0
        self.unique_qualifier.instance_name()
309
0
    }
310
311
    /// Returns the underlying digest of the `Action`.
312
    #[inline]
313
90
    pub const fn digest(&self) -> DigestInfo {
314
90
        self.unique_qualifier.digest()
315
90
    }
316
317
19
    pub fn try_from_action_and_execute_request(
318
19
        execute_request: ExecuteRequest,
319
19
        action: Action,
320
19
        load_timestamp: SystemTime,
321
19
        queued_timestamp: SystemTime,
322
19
    ) -> Result<Self, Error> {
323
19
        let unique_key = ActionUniqueKey {
324
19
            instance_name: execute_request.instance_name,
325
19
            digest_function: DigestHasherFunc::try_from(execute_request.digest_function)
326
19
                .err_tip(|| format!(
"Could not find digest_function in try_from_action_and_execute_request {:?}"0
, execute_request.digest_function))
?0
,
327
19
            digest: execute_request
328
19
                .action_digest
329
19
                .err_tip(|| "Expected action_digest to exist on ExecuteRequest")
?0
330
19
                .try_into()
?0
,
331
        };
332
19
        let unique_qualifier = if execute_request.skip_cache_lookup {
  Branch (332:35): [True: 0, False: 19]
  Branch (332:35): [Folded - Ignored]
333
0
            ActionUniqueQualifier::Uncacheable(unique_key)
334
        } else {
335
19
            ActionUniqueQualifier::Cacheable(unique_key)
336
        };
337
338
19
        let proto_properties = action.platform.unwrap_or_default();
339
19
        let mut platform_properties = HashMap::with_capacity(proto_properties.properties.len());
340
20
        for 
property1
in proto_properties.properties {
341
1
            platform_properties.insert(property.name, property.value);
342
1
        }
343
344
        Ok(Self {
345
19
            command_digest: action
346
19
                .command_digest
347
19
                .err_tip(|| "Expected command_digest to exist on Action")
?0
348
19
                .try_into()
?0
,
349
19
            input_root_digest: action
350
19
                .input_root_digest
351
19
                .err_tip(|| "Expected input_root_digest to exist on Action")
?0
352
19
                .try_into()
?0
,
353
19
            timeout: action
354
19
                .timeout
355
19
                .unwrap_or_default()
356
19
                .try_into()
357
19
                .map_err(|_| make_input_err!("Failed convert proto duration to system duration"))
?0
,
358
19
            platform_properties,
359
19
            priority: execute_request
360
19
                .execution_policy
361
19
                .unwrap_or_default()
362
19
                .priority,
363
19
            load_timestamp,
364
19
            insert_timestamp: queued_timestamp,
365
19
            unique_qualifier,
366
        })
367
19
    }
368
}
369
370
impl From<&ActionInfo> for ExecuteRequest {
371
36
    fn from(val: &ActionInfo) -> Self {
372
36
        let digest = val.digest().into();
373
36
        let (skip_cache_lookup, unique_qualifier) = match &val.unique_qualifier {
374
31
            ActionUniqueQualifier::Cacheable(unique_qualifier) => (false, unique_qualifier),
375
5
            ActionUniqueQualifier::Uncacheable(unique_qualifier) => (true, unique_qualifier),
376
        };
377
36
        Self {
378
36
            instance_name: unique_qualifier.instance_name.clone(),
379
36
            action_digest: Some(digest),
380
36
            skip_cache_lookup,
381
36
            execution_policy: None,     // Not used in the worker.
382
36
            results_cache_policy: None, // Not used in the worker.
383
36
            digest_function: unique_qualifier.digest_function.proto_digest_func().into(),
384
36
        }
385
36
    }
386
}
387
388
/// Simple utility struct to determine if a string is representing a full path or
389
/// just the name of the file.
390
/// This is in order to be able to reuse the same struct instead of building different
391
/// structs when converting `FileInfo` -> {`OutputFile`, `FileNode`} and other similar
392
/// structs.
393
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
394
pub enum NameOrPath {
395
    Name(String),
396
    Path(String),
397
}
398
399
impl PartialOrd for NameOrPath {
400
0
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
401
0
        Some(self.cmp(other))
402
0
    }
403
}
404
405
impl Ord for NameOrPath {
406
0
    fn cmp(&self, other: &Self) -> Ordering {
407
0
        let self_lexical_name = match self {
408
0
            Self::Name(name) => name,
409
0
            Self::Path(path) => path,
410
        };
411
0
        let other_lexical_name = match other {
412
0
            Self::Name(name) => name,
413
0
            Self::Path(path) => path,
414
        };
415
0
        self_lexical_name.cmp(other_lexical_name)
416
0
    }
417
}
418
419
/// Represents an individual file and associated metadata.
420
/// This struct must be 100% compatible with `OutputFile` and `FileNode` structs
421
/// in `remote_execution.proto`.
422
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
423
pub struct FileInfo {
424
    pub name_or_path: NameOrPath,
425
    pub digest: DigestInfo,
426
    pub is_executable: bool,
427
}
428
429
impl TryFrom<FileInfo> for FileNode {
430
    type Error = Error;
431
432
3
    fn try_from(val: FileInfo) -> Result<Self, Error> {
433
3
        match val.name_or_path {
434
0
            NameOrPath::Path(_) => Err(make_input_err!(
435
0
                "Cannot return a FileInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()"
436
0
            )),
437
3
            NameOrPath::Name(name) => Ok(Self {
438
3
                name,
439
3
                digest: Some((&val.digest).into()),
440
3
                is_executable: val.is_executable,
441
3
                node_properties: None, // Not supported.
442
3
            }),
443
        }
444
3
    }
445
}
446
447
impl TryFrom<OutputFile> for FileInfo {
448
    type Error = Error;
449
450
2
    fn try_from(output_file: OutputFile) -> Result<Self, Error> {
451
        Ok(Self {
452
2
            name_or_path: NameOrPath::Path(output_file.path),
453
2
            digest: output_file
454
2
                .digest
455
2
                .err_tip(|| "Expected digest to exist on OutputFile")
?0
456
2
                .try_into()
?0
,
457
2
            is_executable: output_file.is_executable,
458
        })
459
2
    }
460
}
461
462
impl TryFrom<FileInfo> for OutputFile {
463
    type Error = Error;
464
465
7
    fn try_from(val: FileInfo) -> Result<Self, Error> {
466
7
        match val.name_or_path {
467
0
            NameOrPath::Name(_) => Err(make_input_err!(
468
0
                "Cannot return a FileInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()"
469
0
            )),
470
7
            NameOrPath::Path(path) => Ok(Self {
471
7
                path,
472
7
                digest: Some((&val.digest).into()),
473
7
                is_executable: val.is_executable,
474
7
                contents: Bytes::default(),
475
7
                node_properties: None, // Not supported.
476
7
            }),
477
        }
478
7
    }
479
}
480
481
/// Represents an individual symlink file and associated metadata.
482
/// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`.
483
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
484
pub struct SymlinkInfo {
485
    pub name_or_path: NameOrPath,
486
    pub target: String,
487
}
488
489
impl TryFrom<SymlinkNode> for SymlinkInfo {
490
    type Error = Error;
491
492
0
    fn try_from(symlink_node: SymlinkNode) -> Result<Self, Error> {
493
0
        Ok(Self {
494
0
            name_or_path: NameOrPath::Name(symlink_node.name),
495
0
            target: symlink_node.target,
496
0
        })
497
0
    }
498
}
499
500
impl TryFrom<SymlinkInfo> for SymlinkNode {
501
    type Error = Error;
502
503
1
    fn try_from(val: SymlinkInfo) -> Result<Self, Error> {
504
1
        match val.name_or_path {
505
0
            NameOrPath::Path(_) => Err(make_input_err!(
506
0
                "Cannot return a SymlinkInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()"
507
0
            )),
508
1
            NameOrPath::Name(name) => Ok(Self {
509
1
                name,
510
1
                target: val.target,
511
1
                node_properties: None, // Not supported.
512
1
            }),
513
        }
514
1
    }
515
}
516
517
impl TryFrom<OutputSymlink> for SymlinkInfo {
518
    type Error = Error;
519
520
2
    fn try_from(output_symlink: OutputSymlink) -> Result<Self, Error> {
521
2
        Ok(Self {
522
2
            name_or_path: NameOrPath::Path(output_symlink.path),
523
2
            target: output_symlink.target,
524
2
        })
525
2
    }
526
}
527
528
impl TryFrom<SymlinkInfo> for OutputSymlink {
529
    type Error = Error;
530
531
2
    fn try_from(val: SymlinkInfo) -> Result<Self, Error> {
532
2
        match val.name_or_path {
533
2
            NameOrPath::Path(path) => {
534
2
                Ok(Self {
535
2
                    path,
536
2
                    target: val.target,
537
2
                    node_properties: None, // Not supported.
538
2
                })
539
            }
540
0
            NameOrPath::Name(_) => Err(make_input_err!(
541
0
                "Cannot return a SymlinkInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()"
542
0
            )),
543
        }
544
2
    }
545
}
546
547
/// Represents an individual directory file and associated metadata.
548
/// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`.
549
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
550
pub struct DirectoryInfo {
551
    pub path: String,
552
    pub tree_digest: DigestInfo,
553
}
554
555
impl TryFrom<OutputDirectory> for DirectoryInfo {
556
    type Error = Error;
557
558
2
    fn try_from(output_directory: OutputDirectory) -> Result<Self, Error> {
559
        Ok(Self {
560
2
            path: output_directory.path,
561
2
            tree_digest: output_directory
562
2
                .tree_digest
563
2
                .err_tip(|| "Expected tree_digest to exist in OutputDirectory")
?0
564
2
                .try_into()
?0
,
565
        })
566
2
    }
567
}
568
569
impl From<DirectoryInfo> for OutputDirectory {
570
1
    fn from(val: DirectoryInfo) -> Self {
571
1
        Self {
572
1
            path: val.path,
573
1
            tree_digest: Some(val.tree_digest.into()),
574
1
            is_topologically_sorted: false,
575
1
        }
576
1
    }
577
}
578
579
/// Represents the metadata associated with the execution result.
580
/// This struct must be 100% compatible with `ExecutedActionMetadata`.
581
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
582
pub struct ExecutionMetadata {
583
    pub worker: String,
584
    pub queued_timestamp: SystemTime,
585
    pub worker_start_timestamp: SystemTime,
586
    pub worker_completed_timestamp: SystemTime,
587
    pub input_fetch_start_timestamp: SystemTime,
588
    pub input_fetch_completed_timestamp: SystemTime,
589
    pub execution_start_timestamp: SystemTime,
590
    pub execution_completed_timestamp: SystemTime,
591
    pub output_upload_start_timestamp: SystemTime,
592
    pub output_upload_completed_timestamp: SystemTime,
593
}
594
595
impl Default for ExecutionMetadata {
596
2
    fn default() -> Self {
597
2
        Self {
598
2
            worker: String::new(),
599
2
            queued_timestamp: SystemTime::UNIX_EPOCH,
600
2
            worker_start_timestamp: SystemTime::UNIX_EPOCH,
601
2
            worker_completed_timestamp: SystemTime::UNIX_EPOCH,
602
2
            input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
603
2
            input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
604
2
            execution_start_timestamp: SystemTime::UNIX_EPOCH,
605
2
            execution_completed_timestamp: SystemTime::UNIX_EPOCH,
606
2
            output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
607
2
            output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
608
2
        }
609
2
    }
610
}
611
612
impl From<ExecutionMetadata> for ExecutedActionMetadata {
613
17
    fn from(val: ExecutionMetadata) -> Self {
614
        Self {
615
17
            worker: val.worker,
616
17
            queued_timestamp: Some(val.queued_timestamp.into()),
617
17
            worker_start_timestamp: Some(val.worker_start_timestamp.into()),
618
17
            worker_completed_timestamp: Some(val.worker_completed_timestamp.into()),
619
17
            input_fetch_start_timestamp: Some(val.input_fetch_start_timestamp.into()),
620
17
            input_fetch_completed_timestamp: Some(val.input_fetch_completed_timestamp.into()),
621
17
            execution_start_timestamp: Some(val.execution_start_timestamp.into()),
622
17
            execution_completed_timestamp: Some(val.execution_completed_timestamp.into()),
623
17
            output_upload_start_timestamp: Some(val.output_upload_start_timestamp.into()),
624
17
            output_upload_completed_timestamp: Some(val.output_upload_completed_timestamp.into()),
625
17
            virtual_execution_duration: val
626
17
                .execution_completed_timestamp
627
17
                .duration_since(val.execution_start_timestamp)
628
17
                .ok()
629
17
                .and_then(|duration| prost_types::Duration::try_from(duration).ok()),
630
17
            auxiliary_metadata: Vec::default(),
631
        }
632
17
    }
633
}
634
635
impl TryFrom<ExecutedActionMetadata> for ExecutionMetadata {
636
    type Error = Error;
637
638
3
    fn try_from(eam: ExecutedActionMetadata) -> Result<Self, Error> {
639
        Ok(Self {
640
3
            worker: eam.worker,
641
3
            queued_timestamp: eam
642
3
                .queued_timestamp
643
3
                .err_tip(|| "Expected queued_timestamp to exist in ExecutedActionMetadata")
?0
644
3
                .try_into()
?0
,
645
3
            worker_start_timestamp: eam
646
3
                .worker_start_timestamp
647
3
                .err_tip(|| "Expected worker_start_timestamp to exist in ExecutedActionMetadata")
?0
648
3
                .try_into()
?0
,
649
3
            worker_completed_timestamp: eam
650
3
                .worker_completed_timestamp
651
3
                .err_tip(|| 
{0
652
0
                    "Expected worker_completed_timestamp to exist in ExecutedActionMetadata"
653
0
                })?
654
3
                .try_into()
?0
,
655
3
            input_fetch_start_timestamp: eam
656
3
                .input_fetch_start_timestamp
657
3
                .err_tip(|| 
{0
658
0
                    "Expected input_fetch_start_timestamp to exist in ExecutedActionMetadata"
659
0
                })?
660
3
                .try_into()
?0
,
661
3
            input_fetch_completed_timestamp: eam
662
3
                .input_fetch_completed_timestamp
663
3
                .err_tip(|| 
{0
664
0
                    "Expected input_fetch_completed_timestamp to exist in ExecutedActionMetadata"
665
0
                })?
666
3
                .try_into()
?0
,
667
3
            execution_start_timestamp: eam
668
3
                .execution_start_timestamp
669
3
                .err_tip(|| 
{0
670
0
                    "Expected execution_start_timestamp to exist in ExecutedActionMetadata"
671
0
                })?
672
3
                .try_into()
?0
,
673
3
            execution_completed_timestamp: eam
674
3
                .execution_completed_timestamp
675
3
                .err_tip(|| 
{0
676
0
                    "Expected execution_completed_timestamp to exist in ExecutedActionMetadata"
677
0
                })?
678
3
                .try_into()
?0
,
679
3
            output_upload_start_timestamp: eam
680
3
                .output_upload_start_timestamp
681
3
                .err_tip(|| 
{0
682
0
                    "Expected output_upload_start_timestamp to exist in ExecutedActionMetadata"
683
0
                })?
684
3
                .try_into()
?0
,
685
3
            output_upload_completed_timestamp: eam
686
3
                .output_upload_completed_timestamp
687
3
                .err_tip(|| 
{0
688
0
                    "Expected output_upload_completed_timestamp to exist in ExecutedActionMetadata"
689
0
                })?
690
3
                .try_into()
?0
,
691
        })
692
3
    }
693
}
694
695
/// Represents the results of an execution.
696
/// This struct must be 100% compatible with `ActionResult` in `remote_execution.proto`.
697
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
698
pub struct ActionResult {
699
    pub output_files: Vec<FileInfo>,
700
    pub output_folders: Vec<DirectoryInfo>,
701
    pub output_directory_symlinks: Vec<SymlinkInfo>,
702
    pub output_file_symlinks: Vec<SymlinkInfo>,
703
    pub exit_code: i32,
704
    pub stdout_digest: DigestInfo,
705
    pub stderr_digest: DigestInfo,
706
    pub execution_metadata: ExecutionMetadata,
707
    pub server_logs: HashMap<String, DigestInfo>,
708
    pub error: Option<Error>,
709
    pub message: String,
710
}
711
712
impl Default for ActionResult {
713
8
    fn default() -> Self {
714
8
        Self {
715
8
            output_files: Vec::default(),
716
8
            output_folders: Vec::default(),
717
8
            output_directory_symlinks: Vec::default(),
718
8
            output_file_symlinks: Vec::default(),
719
8
            exit_code: INTERNAL_ERROR_EXIT_CODE,
720
8
            stdout_digest: DigestInfo::new([0u8; 32], 0),
721
8
            stderr_digest: DigestInfo::new([0u8; 32], 0),
722
8
            execution_metadata: ExecutionMetadata {
723
8
                worker: String::new(),
724
8
                queued_timestamp: SystemTime::UNIX_EPOCH,
725
8
                worker_start_timestamp: SystemTime::UNIX_EPOCH,
726
8
                worker_completed_timestamp: SystemTime::UNIX_EPOCH,
727
8
                input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
728
8
                input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
729
8
                execution_start_timestamp: SystemTime::UNIX_EPOCH,
730
8
                execution_completed_timestamp: SystemTime::UNIX_EPOCH,
731
8
                output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
732
8
                output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
733
8
            },
734
8
            server_logs: HashMap::default(),
735
8
            error: None,
736
8
            message: String::new(),
737
8
        }
738
8
    }
739
}
740
741
/// The execution status/stage. This should match `ExecutionStage::Value` in `remote_execution.proto`.
742
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
743
#[allow(
744
    clippy::large_enum_variant,
745
    reason = "TODO box the two relevant variants in a breaking release. Unfulfilled on nightly"
746
)]
747
pub enum ActionStage {
748
    /// Stage is unknown.
749
    Unknown,
750
    /// Checking the cache to see if action exists.
751
    CacheCheck,
752
    /// Action has been accepted and waiting for worker to take it.
753
    Queued,
754
    // TODO(palfrey) We need a way to know if the job was sent to a worker, but hasn't begun
755
    // execution yet.
756
    /// Worker is executing the action.
757
    Executing,
758
    /// Worker completed the work with result.
759
    Completed(ActionResult),
760
    /// Result was found from cache, don't decode the proto just to re-encode it.
761
    #[serde(serialize_with = "serialize_proto_result", skip_deserializing)]
762
    // The serialization step decodes this to an ActionResult which is serializable.
763
    // Since it will always be serialized as an ActionResult, we do not need to support
764
    // deserialization on this type at all.
765
    // In theory, serializing this should never happen so performance shouldn't be affected.
766
    CompletedFromCache(ProtoActionResult),
767
}
768
769
0
fn serialize_proto_result<S>(v: &ProtoActionResult, serializer: S) -> Result<S::Ok, S::Error>
770
0
where
771
0
    S: serde::Serializer,
772
{
773
0
    let s = ActionResult::try_from(v.clone()).map_err(S::Error::custom)?;
774
0
    s.serialize(serializer)
775
0
}
776
777
impl ActionStage {
778
100
    pub const fn has_action_result(&self) -> bool {
779
100
        match self {
780
86
            Self::Unknown | Self::CacheCheck | Self::Queued | Self::Executing => false,
781
14
            Self::Completed(_) | Self::CompletedFromCache(_) => true,
782
        }
783
100
    }
784
785
    /// Returns true if the worker considers the action done and no longer needs to be tracked.
786
    // Note: This function is separate from `has_action_result()` to not mix the concept of
787
    //       "finished" with "has a result".
788
99
    pub const fn is_finished(&self) -> bool {
789
99
        self.has_action_result()
790
99
    }
791
792
    /// Returns if the stage enum is the same as the other stage enum, but
793
    /// does not compare the values of the enum.
794
39
    pub const fn is_same_stage(&self, other: &Self) -> bool {
795
39
        matches!(
796
39
            (self, other),
797
            (Self::Unknown, Self::Unknown)
798
                | (Self::CacheCheck, Self::CacheCheck)
799
                | (Self::Queued, Self::Queued)
800
                | (Self::Executing, Self::Executing)
801
                | (Self::Completed(_), Self::Completed(_))
802
                | (Self::CompletedFromCache(_), Self::CompletedFromCache(_))
803
        )
804
39
    }
805
}
806
807
impl MetricsComponent for ActionStage {
808
0
    fn publish(
809
0
        &self,
810
0
        _kind: MetricKind,
811
0
        _field_metadata: MetricFieldData,
812
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
813
0
        let value = match self {
814
0
            Self::Unknown => "Unknown".to_string(),
815
0
            Self::CacheCheck => "CacheCheck".to_string(),
816
0
            Self::Queued => "Queued".to_string(),
817
0
            Self::Executing => "Executing".to_string(),
818
0
            Self::Completed(_) => "Completed".to_string(),
819
0
            Self::CompletedFromCache(_) => "CompletedFromCache".to_string(),
820
        };
821
0
        Ok(MetricPublishKnownKindData::String(value))
822
0
    }
823
}
824
825
impl From<&ActionStage> for execution_stage::Value {
826
1
    fn from(val: &ActionStage) -> Self {
827
1
        match val {
828
0
            ActionStage::Unknown => Self::Unknown,
829
0
            ActionStage::CacheCheck => Self::CacheCheck,
830
0
            ActionStage::Queued => Self::Queued,
831
0
            ActionStage::Executing => Self::Executing,
832
1
            ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => Self::Completed,
833
        }
834
1
    }
835
}
836
837
11
pub fn to_execute_response(action_result: ActionResult) -> ExecuteResponse {
838
11
    fn logs_from(server_logs: HashMap<String, DigestInfo>) -> HashMap<String, LogFile> {
839
11
        let mut logs = HashMap::with_capacity(server_logs.len());
840
12
        for (
k1
,
v1
) in server_logs {
841
1
            logs.insert(
842
1
                k.clone(),
843
1
                LogFile {
844
1
                    digest: Some(v.into()),
845
1
                    human_readable: false,
846
1
                },
847
1
            );
848
1
        }
849
11
        logs
850
11
    }
851
852
11
    let status = Some(
853
11
        action_result
854
11
            .error
855
11
            .clone()
856
11
            .map_or_else(Status::default, Into::into),
857
11
    );
858
11
    let message = action_result.message.clone();
859
11
    ExecuteResponse {
860
11
        server_logs: logs_from(action_result.server_logs.clone()),
861
11
        result: action_result.try_into().ok(),
862
11
        cached_result: false,
863
11
        status,
864
11
        message,
865
11
    }
866
11
}
867
868
impl From<ActionStage> for ExecuteResponse {
869
6
    fn from(val: ActionStage) -> Self {
870
6
        match val {
871
            // We don't have an execute response if we don't have the results. It is defined
872
            // behavior to return an empty proto struct.
873
            ActionStage::Unknown
874
            | ActionStage::CacheCheck
875
            | ActionStage::Queued
876
0
            | ActionStage::Executing => Self::default(),
877
6
            ActionStage::Completed(action_result) => to_execute_response(action_result),
878
            // Handled separately as there are no server logs and the action
879
            // result is already in Proto format.
880
0
            ActionStage::CompletedFromCache(proto_action_result) => Self {
881
0
                server_logs: HashMap::new(),
882
0
                result: Some(proto_action_result),
883
0
                cached_result: true,
884
0
                status: Some(Status::default()),
885
0
                message: String::new(), // Will be populated later if applicable.
886
0
            },
887
        }
888
6
    }
889
}
890
891
impl TryFrom<ActionResult> for ProtoActionResult {
892
    type Error = Error;
893
894
17
    fn try_from(val: ActionResult) -> Result<Self, Error> {
895
17
        let mut output_symlinks = Vec::with_capacity(
896
17
            val.output_file_symlinks.len() + val.output_directory_symlinks.len(),
897
        );
898
17
        output_symlinks.extend_from_slice(val.output_file_symlinks.as_slice());
899
17
        output_symlinks.extend_from_slice(val.output_directory_symlinks.as_slice());
900
901
        Ok(Self {
902
17
            output_files: val
903
17
                .output_files
904
17
                .into_iter()
905
17
                .map(TryInto::try_into)
906
17
                .collect::<Result<_, _>>()
?0
,
907
17
            output_file_symlinks: val
908
17
                .output_file_symlinks
909
17
                .into_iter()
910
17
                .map(TryInto::try_into)
911
17
                .collect::<Result<_, _>>()
?0
,
912
17
            output_symlinks: output_symlinks
913
17
                .into_iter()
914
17
                .map(TryInto::try_into)
915
17
                .collect::<Result<_, _>>()
?0
,
916
17
            output_directories: val
917
17
                .output_folders
918
17
                .into_iter()
919
17
                .map(TryInto::try_into)
920
17
                .collect::<Result<_, _>>()
?0
,
921
17
            output_directory_symlinks: val
922
17
                .output_directory_symlinks
923
17
                .into_iter()
924
17
                .map(TryInto::try_into)
925
17
                .collect::<Result<_, _>>()
?0
,
926
17
            exit_code: val.exit_code,
927
17
            stdout_raw: Bytes::default(),
928
17
            stdout_digest: Some(val.stdout_digest.into()),
929
17
            stderr_raw: Bytes::default(),
930
17
            stderr_digest: Some(val.stderr_digest.into()),
931
17
            execution_metadata: Some(val.execution_metadata.into()),
932
        })
933
17
    }
934
}
935
936
impl TryFrom<ProtoActionResult> for ActionResult {
937
    type Error = Error;
938
939
0
    fn try_from(val: ProtoActionResult) -> Result<Self, Error> {
940
0
        let output_file_symlinks = val
941
0
            .output_file_symlinks
942
0
            .into_iter()
943
0
            .map(|output_symlink| {
944
0
                SymlinkInfo::try_from(output_symlink)
945
0
                    .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo")
946
0
            })
947
0
            .collect::<Result<Vec<_>, _>>()?;
948
949
0
        let output_directory_symlinks = val
950
0
            .output_directory_symlinks
951
0
            .into_iter()
952
0
            .map(|output_symlink| {
953
0
                SymlinkInfo::try_from(output_symlink)
954
0
                    .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo")
955
0
            })
956
0
            .collect::<Result<Vec<_>, _>>()?;
957
958
0
        let output_files = val
959
0
            .output_files
960
0
            .into_iter()
961
0
            .map(|output_file| {
962
0
                output_file
963
0
                    .try_into()
964
0
                    .err_tip(|| "Output File could not be converted")
965
0
            })
966
0
            .collect::<Result<Vec<_>, _>>()?;
967
968
0
        let output_folders = val
969
0
            .output_directories
970
0
            .into_iter()
971
0
            .map(|output_directory| {
972
0
                output_directory
973
0
                    .try_into()
974
0
                    .err_tip(|| "Output File could not be converted")
975
0
            })
976
0
            .collect::<Result<Vec<_>, _>>()?;
977
978
        Ok(Self {
979
0
            output_files,
980
0
            output_folders,
981
0
            output_file_symlinks,
982
0
            output_directory_symlinks,
983
0
            exit_code: val.exit_code,
984
0
            stdout_digest: val
985
0
                .stdout_digest
986
0
                .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")?
987
0
                .try_into()?,
988
0
            stderr_digest: val
989
0
                .stderr_digest
990
0
                .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")?
991
0
                .try_into()?,
992
0
            execution_metadata: val
993
0
                .execution_metadata
994
0
                .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")?
995
0
                .try_into()?,
996
0
            server_logs: HashMap::default(),
997
0
            error: None,
998
0
            message: String::new(),
999
        })
1000
0
    }
1001
}
1002
1003
impl TryFrom<ExecuteResponse> for ActionStage {
1004
    type Error = Error;
1005
1006
3
    fn try_from(execute_response: ExecuteResponse) -> Result<Self, Error> {
1007
3
        let proto_action_result = execute_response
1008
3
            .result
1009
3
            .err_tip(|| "Expected result to be set on ExecuteResponse msg")
?0
;
1010
3
        let action_result = ActionResult {
1011
3
            output_files: proto_action_result
1012
3
                .output_files
1013
3
                .try_map(TryInto::try_into)
?0
,
1014
3
            output_directory_symlinks: proto_action_result
1015
3
                .output_directory_symlinks
1016
3
                .try_map(TryInto::try_into)
?0
,
1017
3
            output_file_symlinks: proto_action_result
1018
3
                .output_file_symlinks
1019
3
                .try_map(TryInto::try_into)
?0
,
1020
3
            output_folders: proto_action_result
1021
3
                .output_directories
1022
3
                .try_map(TryInto::try_into)
?0
,
1023
3
            exit_code: proto_action_result.exit_code,
1024
1025
3
            stdout_digest: proto_action_result
1026
3
                .stdout_digest
1027
3
                .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")
?0
1028
3
                .try_into()
?0
,
1029
3
            stderr_digest: proto_action_result
1030
3
                .stderr_digest
1031
3
                .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")
?0
1032
3
                .try_into()
?0
,
1033
3
            execution_metadata: proto_action_result
1034
3
                .execution_metadata
1035
3
                .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")
?0
1036
3
                .try_into()
?0
,
1037
3
            server_logs: execute_response.server_logs.try_map(|v| 
{2
1038
2
                v.digest
1039
2
                    .err_tip(|| "Expected digest to be set on LogFile msg")
?0
1040
2
                    .try_into()
1041
2
            })
?0
,
1042
3
            error: execute_response
1043
3
                .status
1044
3
                .clone()
1045
3
                .and_then(|v| if v.code == 0 { 
None1
} else {
Some(v.into())2
}),
  Branch (1045:34): [True: 1, False: 2]
  Branch (1045:34): [Folded - Ignored]
1046
3
            message: execute_response.message,
1047
        };
1048
1049
3
        if execute_response.cached_result {
  Branch (1049:12): [True: 0, False: 3]
  Branch (1049:12): [Folded - Ignored]
1050
0
            return Ok(Self::CompletedFromCache(action_result.try_into()?));
1051
3
        }
1052
3
        Ok(Self::Completed(action_result))
1053
3
    }
1054
}
1055
1056
// TODO: Should be able to remove this after tokio-rs/prost#299
1057
trait TypeUrl: Message {
1058
    const TYPE_URL: &'static str;
1059
}
1060
1061
impl TypeUrl for ExecuteResponse {
1062
    const TYPE_URL: &'static str =
1063
        "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse";
1064
}
1065
1066
impl TypeUrl for ExecuteOperationMetadata {
1067
    const TYPE_URL: &'static str =
1068
        "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata";
1069
}
1070
1071
2
fn from_any<T>(message: &Any) -> Result<T, Error>
1072
2
where
1073
2
    T: TypeUrl + Default,
1074
{
1075
0
    error_if!(
1076
2
        message.type_url != T::TYPE_URL,
  Branch (1076:9): [True: 0, False: 1]
  Branch (1076:9): [True: 0, False: 1]
  Branch (1076:9): [Folded - Ignored]
1077
        "Incorrect type when decoding Any. {} != {}",
1078
        message.type_url,
1079
0
        T::TYPE_URL.to_string()
1080
    );
1081
2
    Ok(T::decode(message.value.as_slice())
?0
)
1082
2
}
1083
1084
2
fn to_any<T>(message: &T) -> Any
1085
2
where
1086
2
    T: TypeUrl,
1087
{
1088
2
    Any {
1089
2
        type_url: T::TYPE_URL.to_string(),
1090
2
        value: message.encode_to_vec(),
1091
2
    }
1092
2
}
1093
1094
/// Current state of the action.
1095
/// This must be 100% compatible with `Operation` in `google/longrunning/operations.proto`.
1096
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize, MetricsComponent)]
1097
pub struct ActionState {
1098
    #[metric(help = "The current stage of the action.")]
1099
    pub stage: ActionStage,
1100
    #[metric(help = "The unique identifier of the action.")]
1101
    pub client_operation_id: OperationId,
1102
    #[metric(help = "The digest of the action.")]
1103
    pub action_digest: DigestInfo,
1104
}
1105
1106
impl ActionState {
1107
1
    pub fn try_from_operation(
1108
1
        operation: Operation,
1109
1
        client_operation_id: OperationId,
1110
1
    ) -> Result<Self, Error> {
1111
1
        let metadata = from_any::<ExecuteOperationMetadata>(
1112
1
            &operation
1113
1
                .metadata
1114
1
                .err_tip(|| "No metadata in upstream operation")
?0
,
1115
        )
1116
1
        .err_tip(|| "Could not decode metadata in upstream operation")
?0
;
1117
1118
1
        let stage = match execution_stage::Value::try_from(metadata.stage).err_tip(|| 
{0
1119
0
            format!(
1120
0
                "Could not convert {} to execution_stage::Value",
1121
                metadata.stage
1122
            )
1123
0
        })? {
1124
0
            execution_stage::Value::Unknown => ActionStage::Unknown,
1125
0
            execution_stage::Value::CacheCheck => ActionStage::CacheCheck,
1126
0
            execution_stage::Value::Queued => ActionStage::Queued,
1127
0
            execution_stage::Value::Executing => ActionStage::Executing,
1128
            execution_stage::Value::Completed => {
1129
1
                let execute_response = operation
1130
1
                    .result
1131
1
                    .err_tip(|| "No result data for completed upstream action")
?0
;
1132
1
                match execute_response {
1133
0
                    LongRunningResult::Error(error) => ActionStage::Completed(ActionResult {
1134
0
                        error: Some(error.into()),
1135
0
                        ..ActionResult::default()
1136
0
                    }),
1137
1
                    LongRunningResult::Response(response) => {
1138
                        // Could be Completed, CompletedFromCache or Error.
1139
1
                        from_any::<ExecuteResponse>(&response)
1140
1
                            .err_tip(|| 
{0
1141
0
                                "Could not decode result structure for completed upstream action"
1142
0
                            })?
1143
1
                            .try_into()
?0
1144
                    }
1145
                }
1146
            }
1147
        };
1148
1149
1
        let action_digest = metadata
1150
1
            .action_digest
1151
1
            .err_tip(|| "No action_digest in upstream operation")
?0
1152
1
            .try_into()
1153
1
            .err_tip(|| "Could not convert action_digest into DigestInfo")
?0
;
1154
1155
1
        Ok(Self {
1156
1
            stage,
1157
1
            client_operation_id,
1158
1
            action_digest,
1159
1
        })
1160
1
    }
1161
1162
1
    pub fn as_operation(&self, client_operation_id: OperationId) -> Operation {
1163
1
        let stage = Into::<execution_stage::Value>::into(&self.stage) as i32;
1164
1
        let name = client_operation_id.into_string();
1165
1166
1
        let result = if self.stage.has_action_result() {
  Branch (1166:25): [True: 1, False: 0]
  Branch (1166:25): [Folded - Ignored]
1167
1
            let execute_response: ExecuteResponse = self.stage.clone().into();
1168
1
            Some(LongRunningResult::Response(to_any(&execute_response)))
1169
        } else {
1170
0
            None
1171
        };
1172
1
        let digest = Some(self.action_digest.into());
1173
1174
1
        let metadata = ExecuteOperationMetadata {
1175
1
            stage,
1176
1
            action_digest: digest,
1177
1
            // TODO(palfrey) We should support stderr/stdout streaming.
1178
1
            stdout_stream_name: String::default(),
1179
1
            stderr_stream_name: String::default(),
1180
1
            partial_execution_metadata: None,
1181
1
        };
1182
1183
1
        Operation {
1184
1
            name,
1185
1
            metadata: Some(to_any(&metadata)),
1186
1
            done: result.is_some(),
1187
1
            result,
1188
1
        }
1189
1
    }
1190
}