Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/action_messages.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::cmp::Ordering;
16
use std::collections::HashMap;
17
use std::convert::Into;
18
use std::hash::Hash;
19
use std::time::{Duration, SystemTime};
20
21
use nativelink_error::{error_if, make_input_err, Error, ResultExt};
22
use nativelink_metric::{
23
    publish, MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
24
};
25
use nativelink_proto::build::bazel::remote::execution::v2::{
26
    execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata,
27
    ExecuteRequest, ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory,
28
    OutputFile, OutputSymlink, SymlinkNode,
29
};
30
use nativelink_proto::google::longrunning::operation::Result as LongRunningResult;
31
use nativelink_proto::google::longrunning::Operation;
32
use nativelink_proto::google::rpc::Status;
33
use prost::bytes::Bytes;
34
use prost::Message;
35
use prost_types::Any;
36
use serde::ser::Error as SerdeError;
37
use serde::{Deserialize, Serialize};
38
use uuid::Uuid;
39
40
use crate::common::{DigestInfo, HashMapExt, VecExt};
41
use crate::digest_hasher::DigestHasherFunc;
42
43
/// Priority given to a remote execution job when none is provided.
pub const DEFAULT_EXECUTION_PRIORITY: i32 = 0;
45
46
/// Exit code reported when an internal error occurs.
pub const INTERNAL_ERROR_EXIT_CODE: i32 = -178;
48
49
/// Holds an id that is unique to the client for a requested operation.
50
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
51
pub enum OperationId {
52
    Uuid(Uuid),
53
    String(String),
54
}
55
56
impl OperationId {
57
1
    pub fn into_string(self) -> String {
58
1
        match self {
59
1
            Self::Uuid(uuid) => uuid.to_string(),
60
0
            Self::String(name) => name,
61
        }
62
1
    }
63
}
64
65
impl Default for OperationId {
66
96
    fn default() -> Self {
67
96
        Self::Uuid(Uuid::new_v4())
68
96
    }
69
}
70
71
impl std::fmt::Display for OperationId {
72
78
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73
78
        match self {
74
71
            Self::Uuid(uuid) => uuid.fmt(f),
75
7
            Self::String(name) => f.write_str(name),
76
        }
77
78
    }
78
}
79
80
impl MetricsComponent for OperationId {
81
0
    fn publish(
82
0
        &self,
83
0
        _kind: MetricKind,
84
0
        _field_metadata: MetricFieldData,
85
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
86
0
        Ok(MetricPublishKnownKindData::String(self.to_string()))
87
0
    }
88
}
89
90
impl From<&str> for OperationId {
91
26
    fn from(value: &str) -> Self {
92
26
        match Uuid::parse_str(value) {
93
21
            Ok(uuid) => Self::Uuid(uuid),
94
5
            Err(_) => Self::String(value.to_string()),
95
        }
96
26
    }
97
}
98
99
impl From<String> for OperationId {
100
8
    fn from(value: String) -> Self {
101
8
        match Uuid::parse_str(&value) {
102
8
            Ok(uuid) => Self::Uuid(uuid),
103
0
            Err(_) => Self::String(value),
104
        }
105
8
    }
106
}
107
108
impl TryFrom<Bytes> for OperationId {
109
    type Error = Error;
110
111
0
    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
112
0
        // This is an optimized path to attempt to do the conversion in-place
113
0
        // to avoid an extra allocation/copy.
114
0
        match value.try_into_mut() {
115
            // We are the only reference to the Bytes, so we can convert it into a Vec<u8>
116
            // for free then convert the Vec<u8> to a String for free too.
117
0
            Ok(value) => {
118
0
                let value = String::from_utf8(value.into()).map_err(|e| {
119
0
                    make_input_err!(
120
0
                        "Failed to convert bytes to string in try_from<Bytes> for OperationId : {e:?}"
121
0
                    )
122
0
                })?;
123
0
                Ok(Self::from(value))
124
            }
125
            // We could not take ownership of the Bytes, so we may need to copy our data.
126
0
            Err(value) => {
127
0
                let value = std::str::from_utf8(&value).map_err(|e| {
128
0
                    make_input_err!(
129
0
                        "Failed to convert bytes to string in try_from<Bytes> for OperationId : {e:?}"
130
0
                    )
131
0
                })?;
132
0
                Ok(Self::from(value))
133
            }
134
        }
135
0
    }
136
}
137
138
/// Unique id of worker.
139
#[derive(Default, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)]
140
pub struct WorkerId(pub String);
141
142
impl MetricsComponent for WorkerId {
143
0
    fn publish(
144
0
        &self,
145
0
        _kind: MetricKind,
146
0
        _field_metadata: MetricFieldData,
147
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
148
0
        Ok(MetricPublishKnownKindData::String(self.0.clone()))
149
0
    }
150
}
151
152
impl std::fmt::Display for WorkerId {
153
25
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154
25
        f.write_fmt(format_args!("{}", self.0))
155
25
    }
156
}
157
158
impl std::fmt::Debug for WorkerId {
159
5
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160
5
        std::fmt::Display::fmt(&self, f)
161
5
    }
162
}
163
164
impl From<WorkerId> for String {
165
88
    fn from(val: WorkerId) -> Self {
166
88
        val.0
167
88
    }
168
}
169
170
impl From<String> for WorkerId {
171
8
    fn from(s: String) -> Self {
172
8
        WorkerId(s)
173
8
    }
174
}
175
176
/// Holds the information needed to uniquely identify an action
177
/// and if it is cachable or not.
178
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
179
pub enum ActionUniqueQualifier {
180
    /// The action is cachable.
181
    Cachable(ActionUniqueKey),
182
    /// The action is uncachable.
183
    Uncachable(ActionUniqueKey),
184
}
185
186
impl MetricsComponent for ActionUniqueQualifier {
187
0
    fn publish(
188
0
        &self,
189
0
        _kind: MetricKind,
190
0
        field_metadata: MetricFieldData,
191
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
192
0
        let (cachable, action) = match self {
193
0
            Self::Cachable(action) => (true, action),
194
0
            Self::Uncachable(action) => (false, action),
195
        };
196
0
        publish!(
197
0
            cachable,
198
0
            &cachable,
199
0
            MetricKind::Default,
200
0
            "If the action is cachable.",
201
0
            ""
202
        );
203
0
        action.publish(MetricKind::Component, field_metadata)?;
204
0
        Ok(MetricPublishKnownKindData::Component)
205
0
    }
206
}
207
208
impl ActionUniqueQualifier {
209
    /// Get the `instance_name` of the action.
210
0
    pub const fn instance_name(&self) -> &String {
211
0
        match self {
212
0
            Self::Cachable(action) | Self::Uncachable(action) => &action.instance_name,
213
0
        }
214
0
    }
215
216
    /// Get the digest function of the action.
217
11
    pub const fn digest_function(&self) -> DigestHasherFunc {
218
11
        match self {
219
11
            Self::Cachable(action) | Self::Uncachable(
action0
) => action.digest_function,
220
11
        }
221
11
    }
222
223
    /// Get the digest of the action.
224
115
    pub const fn digest(&self) -> DigestInfo {
225
115
        match self {
226
115
            Self::Cachable(
action110
) | Self::Uncachable(
action5
) => action.digest,
227
115
        }
228
115
    }
229
}
230
231
impl std::fmt::Display for ActionUniqueQualifier {
232
3
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233
3
        let (cachable, unique_key) = match self {
234
3
            Self::Cachable(action) => (true, action),
235
0
            Self::Uncachable(action) => (false, action),
236
        };
237
3
        f.write_fmt(format_args!(
238
3
            // Note: We use underscores because it makes escaping easier
239
3
            // for redis.
240
3
            "{}_{}_{}_{}_{}",
241
3
            unique_key.instance_name,
242
3
            unique_key.digest_function,
243
3
            unique_key.digest.packed_hash(),
244
3
            unique_key.digest.size_bytes(),
245
3
            if cachable { 'c' } else { 
'u'0
},
  Branch (245:16): [True: 3, False: 0]
  Branch (245:16): [Folded - Ignored]
246
        ))
247
3
    }
248
}
249
250
/// This is a utility struct used to make it easier to match `ActionInfos` in a
251
/// `HashMap` without needing to construct an entire `ActionInfo`.
252
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, MetricsComponent)]
253
pub struct ActionUniqueKey {
254
    /// Name of instance group this action belongs to.
255
    #[metric(help = "Name of instance group this action belongs to.")]
256
    pub instance_name: String,
257
    /// The digest function this action expects.
258
    #[metric(help = "The digest function this action expects.")]
259
    pub digest_function: DigestHasherFunc,
260
    /// Digest of the underlying `Action`.
261
    #[metric(help = "Digest of the underlying Action.")]
262
    pub digest: DigestInfo,
263
}
264
265
impl std::fmt::Display for ActionUniqueKey {
266
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267
0
        f.write_fmt(format_args!(
268
0
            "{}/{}/{}",
269
0
            self.instance_name, self.digest_function, self.digest,
270
0
        ))
271
0
    }
272
}
273
274
/// Information needed to execute an action. This struct is used over bazel's proto `Action`
275
/// for simplicity and offers a `salt`, which is useful to ensure during hashing (for dicts)
276
/// to ensure we never match against another `ActionInfo` (when a task should never be cached).
277
/// This struct must be 100% compatible with `ExecuteRequest` struct in `remote_execution.proto`
278
/// except for the salt field.
279
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, MetricsComponent)]
280
pub struct ActionInfo {
281
    /// Digest of the underlying `Command`.
282
    #[metric(help = "Digest of the underlying Command.")]
283
    pub command_digest: DigestInfo,
284
    /// Digest of the underlying `Directory`.
285
    #[metric(help = "Digest of the underlying Directory.")]
286
    pub input_root_digest: DigestInfo,
287
    /// Timeout of the action.
288
    #[metric(help = "Timeout of the action.")]
289
    pub timeout: Duration,
290
    /// The properties rules that must be applied when finding a worker that can run this action.
291
    #[metric(group = "platform_properties")]
292
    pub platform_properties: HashMap<String, String>,
293
    /// The priority of the action. Higher value means it should execute faster.
294
    #[metric(help = "The priority of the action. Higher value means it should execute faster.")]
295
    pub priority: i32,
296
    /// When this action started to be loaded from the CAS.
297
    #[metric(help = "When this action started to be loaded from the CAS.")]
298
    pub load_timestamp: SystemTime,
299
    /// When this action was created.
300
    #[metric(help = "When this action was created.")]
301
    pub insert_timestamp: SystemTime,
302
    /// Info used to uniquely identify this `ActionInfo` and if it is cachable.
303
    /// This is primarily used to join actions/operations together using this key.
304
    #[metric(help = "Info used to uniquely identify this ActionInfo and if it is cachable.")]
305
    pub unique_qualifier: ActionUniqueQualifier,
306
}
307
308
impl ActionInfo {
309
    #[inline]
310
0
    pub const fn instance_name(&self) -> &String {
311
0
        self.unique_qualifier.instance_name()
312
0
    }
313
314
    /// Returns the underlying digest of the `Action`.
315
    #[inline]
316
74
    pub const fn digest(&self) -> DigestInfo {
317
74
        self.unique_qualifier.digest()
318
74
    }
319
320
16
    pub fn try_from_action_and_execute_request(
321
16
        execute_request: ExecuteRequest,
322
16
        action: Action,
323
16
        load_timestamp: SystemTime,
324
16
        queued_timestamp: SystemTime,
325
16
    ) -> Result<Self, Error> {
326
16
        let unique_key = ActionUniqueKey {
327
16
            instance_name: execute_request.instance_name,
328
16
            digest_function: DigestHasherFunc::try_from(execute_request.digest_function)
329
16
                .err_tip(|| 
format!("Could not find digest_function in try_from_action_and_execute_request {:?}", execute_request.digest_function)0
)
?0
,
330
16
            digest: execute_request
331
16
                .action_digest
332
16
                .err_tip(|| 
"Expected action_digest to exist on ExecuteRequest"0
)
?0
333
16
                .try_into()
?0
,
334
        };
335
16
        let unique_qualifier = if execute_request.skip_cache_lookup {
  Branch (335:35): [True: 0, False: 16]
  Branch (335:35): [Folded - Ignored]
336
0
            ActionUniqueQualifier::Uncachable(unique_key)
337
        } else {
338
16
            ActionUniqueQualifier::Cachable(unique_key)
339
        };
340
341
16
        let proto_properties = action.platform.unwrap_or_default();
342
16
        let mut platform_properties = HashMap::with_capacity(proto_properties.properties.len());
343
17
        for 
property1
in proto_properties.properties {
344
1
            platform_properties.insert(property.name, property.value);
345
1
        }
346
347
        Ok(Self {
348
16
            command_digest: action
349
16
                .command_digest
350
16
                .err_tip(|| 
"Expected command_digest to exist on Action"0
)
?0
351
16
                .try_into()
?0
,
352
16
            input_root_digest: action
353
16
                .input_root_digest
354
16
                .err_tip(|| 
"Expected input_root_digest to exist on Action"0
)
?0
355
16
                .try_into()
?0
,
356
16
            timeout: action
357
16
                .timeout
358
16
                .unwrap_or_default()
359
16
                .try_into()
360
16
                .map_err(|_| 
make_input_err!("Failed convert proto duration to system duration")0
)
?0
,
361
16
            platform_properties,
362
16
            priority: execute_request
363
16
                .execution_policy
364
16
                .unwrap_or_default()
365
16
                .priority,
366
16
            load_timestamp,
367
16
            insert_timestamp: queued_timestamp,
368
16
            unique_qualifier,
369
        })
370
16
    }
371
}
372
373
impl From<&ActionInfo> for ExecuteRequest {
374
32
    fn from(val: &ActionInfo) -> Self {
375
32
        let digest = val.digest().into();
376
32
        let (skip_cache_lookup, unique_qualifier) = match &val.unique_qualifier {
377
27
            ActionUniqueQualifier::Cachable(unique_qualifier) => (false, unique_qualifier),
378
5
            ActionUniqueQualifier::Uncachable(unique_qualifier) => (true, unique_qualifier),
379
        };
380
32
        Self {
381
32
            instance_name: unique_qualifier.instance_name.clone(),
382
32
            action_digest: Some(digest),
383
32
            skip_cache_lookup,
384
32
            execution_policy: None,     // Not used in the worker.
385
32
            results_cache_policy: None, // Not used in the worker.
386
32
            digest_function: unique_qualifier.digest_function.proto_digest_func().into(),
387
32
        }
388
32
    }
389
}
390
391
/// Simple utility struct to determine if a string is representing a full path or
392
/// just the name of the file.
393
/// This is in order to be able to reuse the same struct instead of building different
394
/// structs when converting `FileInfo` -> {`OutputFile`, `FileNode`} and other similar
395
/// structs.
396
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
397
pub enum NameOrPath {
398
    Name(String),
399
    Path(String),
400
}
401
402
impl PartialOrd for NameOrPath {
403
0
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
404
0
        Some(self.cmp(other))
405
0
    }
406
}
407
408
impl Ord for NameOrPath {
409
0
    fn cmp(&self, other: &Self) -> Ordering {
410
0
        let self_lexical_name = match self {
411
0
            Self::Name(name) => name,
412
0
            Self::Path(path) => path,
413
        };
414
0
        let other_lexical_name = match other {
415
0
            Self::Name(name) => name,
416
0
            Self::Path(path) => path,
417
        };
418
0
        self_lexical_name.cmp(other_lexical_name)
419
0
    }
420
}
421
422
/// Represents an individual file and associated metadata.
423
/// This struct must be 100% compatible with `OutputFile` and `FileNode` structs
424
/// in `remote_execution.proto`.
425
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
426
pub struct FileInfo {
427
    pub name_or_path: NameOrPath,
428
    pub digest: DigestInfo,
429
    pub is_executable: bool,
430
}
431
432
//TODO: Make this TryFrom.
433
impl From<FileInfo> for FileNode {
434
3
    fn from(val: FileInfo) -> Self {
435
3
        let NameOrPath::Name(name) = val.name_or_path else {
  Branch (435:13): [True: 3, False: 0]
  Branch (435:13): [Folded - Ignored]
436
0
            panic!("Cannot return a FileInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()");
437
        };
438
3
        Self {
439
3
            name,
440
3
            digest: Some((&val.digest).into()),
441
3
            is_executable: val.is_executable,
442
3
            node_properties: Option::default(), // Not supported.
443
3
        }
444
3
    }
445
}
446
447
impl TryFrom<OutputFile> for FileInfo {
448
    type Error = Error;
449
450
2
    fn try_from(output_file: OutputFile) -> Result<Self, Error> {
451
2
        Ok(Self {
452
2
            name_or_path: NameOrPath::Path(output_file.path),
453
2
            digest: output_file
454
2
                .digest
455
2
                .err_tip(|| 
"Expected digest to exist on OutputFile"0
)
?0
456
2
                .try_into()
?0
,
457
2
            is_executable: output_file.is_executable,
458
        })
459
2
    }
460
}
461
462
//TODO: Make this TryFrom.
463
impl From<FileInfo> for OutputFile {
464
7
    fn from(val: FileInfo) -> Self {
465
7
        let NameOrPath::Path(path) = val.name_or_path else {
  Branch (465:13): [True: 7, False: 0]
  Branch (465:13): [Folded - Ignored]
466
0
            panic!("Cannot return a FileInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()");
467
        };
468
7
        Self {
469
7
            path,
470
7
            digest: Some((&val.digest).into()),
471
7
            is_executable: val.is_executable,
472
7
            contents: Bytes::default(),
473
7
            node_properties: Option::default(), // Not supported.
474
7
        }
475
7
    }
476
}
477
478
/// Represents an individual symlink file and associated metadata.
479
/// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`.
480
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
481
pub struct SymlinkInfo {
482
    pub name_or_path: NameOrPath,
483
    pub target: String,
484
}
485
486
impl TryFrom<SymlinkNode> for SymlinkInfo {
487
    type Error = Error;
488
489
0
    fn try_from(symlink_node: SymlinkNode) -> Result<Self, Error> {
490
0
        Ok(Self {
491
0
            name_or_path: NameOrPath::Name(symlink_node.name),
492
0
            target: symlink_node.target,
493
0
        })
494
0
    }
495
}
496
497
// TODO: Make this TryFrom.
498
impl From<SymlinkInfo> for SymlinkNode {
499
1
    fn from(val: SymlinkInfo) -> Self {
500
1
        let NameOrPath::Name(name) = val.name_or_path else {
  Branch (500:13): [True: 1, False: 0]
  Branch (500:13): [Folded - Ignored]
501
0
            panic!("Cannot return a SymlinkInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()");
502
        };
503
1
        Self {
504
1
            name,
505
1
            target: val.target,
506
1
            node_properties: Option::default(), // Not supported.
507
1
        }
508
1
    }
509
}
510
511
impl TryFrom<OutputSymlink> for SymlinkInfo {
512
    type Error = Error;
513
514
2
    fn try_from(output_symlink: OutputSymlink) -> Result<Self, Error> {
515
2
        Ok(Self {
516
2
            name_or_path: NameOrPath::Path(output_symlink.path),
517
2
            target: output_symlink.target,
518
2
        })
519
2
    }
520
}
521
522
// TODO: Make this TryFrom.
523
impl From<SymlinkInfo> for OutputSymlink {
524
2
    fn from(val: SymlinkInfo) -> Self {
525
2
        let NameOrPath::Path(path) = val.name_or_path else {
  Branch (525:13): [True: 2, False: 0]
  Branch (525:13): [Folded - Ignored]
526
0
            panic!("Cannot return a SymlinkInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()");
527
        };
528
2
        Self {
529
2
            path,
530
2
            target: val.target,
531
2
            node_properties: Option::default(), // Not supported.
532
2
        }
533
2
    }
534
}
535
536
/// Represents an individual directory file and associated metadata.
537
/// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`.
538
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
539
pub struct DirectoryInfo {
540
    pub path: String,
541
    pub tree_digest: DigestInfo,
542
}
543
544
impl TryFrom<OutputDirectory> for DirectoryInfo {
545
    type Error = Error;
546
547
2
    fn try_from(output_directory: OutputDirectory) -> Result<Self, Error> {
548
2
        Ok(Self {
549
2
            path: output_directory.path,
550
2
            tree_digest: output_directory
551
2
                .tree_digest
552
2
                .err_tip(|| 
"Expected tree_digest to exist in OutputDirectory"0
)
?0
553
2
                .try_into()
?0
,
554
        })
555
2
    }
556
}
557
558
impl From<DirectoryInfo> for OutputDirectory {
559
1
    fn from(val: DirectoryInfo) -> Self {
560
1
        Self {
561
1
            path: val.path,
562
1
            tree_digest: Some(val.tree_digest.into()),
563
1
            is_topologically_sorted: false,
564
1
        }
565
1
    }
566
}
567
568
/// Represents the metadata associated with the execution result.
569
/// This struct must be 100% compatible with `ExecutedActionMetadata`.
570
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
571
pub struct ExecutionMetadata {
572
    pub worker: String,
573
    pub queued_timestamp: SystemTime,
574
    pub worker_start_timestamp: SystemTime,
575
    pub worker_completed_timestamp: SystemTime,
576
    pub input_fetch_start_timestamp: SystemTime,
577
    pub input_fetch_completed_timestamp: SystemTime,
578
    pub execution_start_timestamp: SystemTime,
579
    pub execution_completed_timestamp: SystemTime,
580
    pub output_upload_start_timestamp: SystemTime,
581
    pub output_upload_completed_timestamp: SystemTime,
582
}
583
584
impl Default for ExecutionMetadata {
585
1
    fn default() -> Self {
586
1
        Self {
587
1
            worker: String::new(),
588
1
            queued_timestamp: SystemTime::UNIX_EPOCH,
589
1
            worker_start_timestamp: SystemTime::UNIX_EPOCH,
590
1
            worker_completed_timestamp: SystemTime::UNIX_EPOCH,
591
1
            input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
592
1
            input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
593
1
            execution_start_timestamp: SystemTime::UNIX_EPOCH,
594
1
            execution_completed_timestamp: SystemTime::UNIX_EPOCH,
595
1
            output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
596
1
            output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
597
1
        }
598
1
    }
599
}
600
601
impl From<ExecutionMetadata> for ExecutedActionMetadata {
602
17
    fn from(val: ExecutionMetadata) -> Self {
603
17
        Self {
604
17
            worker: val.worker,
605
17
            queued_timestamp: Some(val.queued_timestamp.into()),
606
17
            worker_start_timestamp: Some(val.worker_start_timestamp.into()),
607
17
            worker_completed_timestamp: Some(val.worker_completed_timestamp.into()),
608
17
            input_fetch_start_timestamp: Some(val.input_fetch_start_timestamp.into()),
609
17
            input_fetch_completed_timestamp: Some(val.input_fetch_completed_timestamp.into()),
610
17
            execution_start_timestamp: Some(val.execution_start_timestamp.into()),
611
17
            execution_completed_timestamp: Some(val.execution_completed_timestamp.into()),
612
17
            output_upload_start_timestamp: Some(val.output_upload_start_timestamp.into()),
613
17
            output_upload_completed_timestamp: Some(val.output_upload_completed_timestamp.into()),
614
17
            virtual_execution_duration: val
615
17
                .execution_completed_timestamp
616
17
                .duration_since(val.execution_start_timestamp)
617
17
                .ok()
618
17
                .and_then(|duration| prost_types::Duration::try_from(duration).ok()),
619
17
            auxiliary_metadata: Vec::default(),
620
17
        }
621
17
    }
622
}
623
624
impl TryFrom<ExecutedActionMetadata> for ExecutionMetadata {
625
    type Error = Error;
626
627
3
    fn try_from(eam: ExecutedActionMetadata) -> Result<Self, Error> {
628
3
        Ok(Self {
629
3
            worker: eam.worker,
630
3
            queued_timestamp: eam
631
3
                .queued_timestamp
632
3
                .err_tip(|| 
"Expected queued_timestamp to exist in ExecutedActionMetadata"0
)
?0
633
3
                .try_into()
?0
,
634
3
            worker_start_timestamp: eam
635
3
                .worker_start_timestamp
636
3
                .err_tip(|| 
"Expected worker_start_timestamp to exist in ExecutedActionMetadata"0
)
?0
637
3
                .try_into()
?0
,
638
3
            worker_completed_timestamp: eam
639
3
                .worker_completed_timestamp
640
3
                .err_tip(|| {
641
0
                    "Expected worker_completed_timestamp to exist in ExecutedActionMetadata"
642
3
                })
?0
643
3
                .try_into()
?0
,
644
3
            input_fetch_start_timestamp: eam
645
3
                .input_fetch_start_timestamp
646
3
                .err_tip(|| {
647
0
                    "Expected input_fetch_start_timestamp to exist in ExecutedActionMetadata"
648
3
                })
?0
649
3
                .try_into()
?0
,
650
3
            input_fetch_completed_timestamp: eam
651
3
                .input_fetch_completed_timestamp
652
3
                .err_tip(|| {
653
0
                    "Expected input_fetch_completed_timestamp to exist in ExecutedActionMetadata"
654
3
                })
?0
655
3
                .try_into()
?0
,
656
3
            execution_start_timestamp: eam
657
3
                .execution_start_timestamp
658
3
                .err_tip(|| {
659
0
                    "Expected execution_start_timestamp to exist in ExecutedActionMetadata"
660
3
                })
?0
661
3
                .try_into()
?0
,
662
3
            execution_completed_timestamp: eam
663
3
                .execution_completed_timestamp
664
3
                .err_tip(|| {
665
0
                    "Expected execution_completed_timestamp to exist in ExecutedActionMetadata"
666
3
                })
?0
667
3
                .try_into()
?0
,
668
3
            output_upload_start_timestamp: eam
669
3
                .output_upload_start_timestamp
670
3
                .err_tip(|| {
671
0
                    "Expected output_upload_start_timestamp to exist in ExecutedActionMetadata"
672
3
                })
?0
673
3
                .try_into()
?0
,
674
3
            output_upload_completed_timestamp: eam
675
3
                .output_upload_completed_timestamp
676
3
                .err_tip(|| {
677
0
                    "Expected output_upload_completed_timestamp to exist in ExecutedActionMetadata"
678
3
                })
?0
679
3
                .try_into()
?0
,
680
        })
681
3
    }
682
}
683
684
/// Represents the results of an execution.
685
/// This struct must be 100% compatible with `ActionResult` in `remote_execution.proto`.
686
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
687
pub struct ActionResult {
688
    pub output_files: Vec<FileInfo>,
689
    pub output_folders: Vec<DirectoryInfo>,
690
    pub output_directory_symlinks: Vec<SymlinkInfo>,
691
    pub output_file_symlinks: Vec<SymlinkInfo>,
692
    pub exit_code: i32,
693
    pub stdout_digest: DigestInfo,
694
    pub stderr_digest: DigestInfo,
695
    pub execution_metadata: ExecutionMetadata,
696
    pub server_logs: HashMap<String, DigestInfo>,
697
    pub error: Option<Error>,
698
    pub message: String,
699
}
700
701
impl Default for ActionResult {
702
7
    fn default() -> Self {
703
7
        ActionResult {
704
7
            output_files: Vec::default(),
705
7
            output_folders: Vec::default(),
706
7
            output_directory_symlinks: Vec::default(),
707
7
            output_file_symlinks: Vec::default(),
708
7
            exit_code: INTERNAL_ERROR_EXIT_CODE,
709
7
            stdout_digest: DigestInfo::new([0u8; 32], 0),
710
7
            stderr_digest: DigestInfo::new([0u8; 32], 0),
711
7
            execution_metadata: ExecutionMetadata {
712
7
                worker: String::new(),
713
7
                queued_timestamp: SystemTime::UNIX_EPOCH,
714
7
                worker_start_timestamp: SystemTime::UNIX_EPOCH,
715
7
                worker_completed_timestamp: SystemTime::UNIX_EPOCH,
716
7
                input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
717
7
                input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
718
7
                execution_start_timestamp: SystemTime::UNIX_EPOCH,
719
7
                execution_completed_timestamp: SystemTime::UNIX_EPOCH,
720
7
                output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
721
7
                output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
722
7
            },
723
7
            server_logs: HashMap::default(),
724
7
            error: None,
725
7
            message: String::new(),
726
7
        }
727
7
    }
728
}
729
730
// TODO(allada) Remove the need for clippy argument by making the ActionResult and ProtoActionResult
731
// a Box.
732
/// The execution status/stage. This should match `ExecutionStage::Value` in `remote_execution.proto`.
733
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
734
#[allow(clippy::large_enum_variant)]
735
pub enum ActionStage {
736
    /// Stage is unknown.
737
    Unknown,
738
    /// Checking the cache to see if action exists.
739
    CacheCheck,
740
    /// Action has been accepted and waiting for worker to take it.
741
    Queued,
742
    // TODO(allada) We need a way to know if the job was sent to a worker, but hasn't begun
743
    // execution yet.
744
    /// Worker is executing the action.
745
    Executing,
746
    /// Worker completed the work with result.
747
    Completed(ActionResult),
748
    /// Result was found from cache, don't decode the proto just to re-encode it.
749
    #[serde(serialize_with = "serialize_proto_result", skip_deserializing)]
750
    // The serialization step decodes this to an ActionResult which is serializable.
751
    // Since it will always be serialized as an ActionResult, we do not need to support
752
    // deserialization on this type at all.
753
    // In theory, serializing this should never happen so performance shouldn't be affected.
754
    CompletedFromCache(ProtoActionResult),
755
}
756
757
0
fn serialize_proto_result<S>(v: &ProtoActionResult, serializer: S) -> Result<S::Ok, S::Error>
758
0
where
759
0
    S: serde::Serializer,
760
0
{
761
0
    let s = ActionResult::try_from(v.clone()).map_err(S::Error::custom)?;
762
0
    s.serialize(serializer)
763
0
}
764
765
impl ActionStage {
766
90
    pub const fn has_action_result(&self) -> bool {
767
90
        match self {
768
77
            Self::Unknown | Self::CacheCheck | Self::Queued | Self::Executing => false,
769
13
            Self::Completed(_) | Self::CompletedFromCache(_) => true,
770
        }
771
90
    }
772
773
    /// Returns true if the worker considers the action done and no longer needs to be tracked.
774
    // Note: This function is separate from `has_action_result()` to not mix the concept of
775
    //       "finished" with "has a result".
776
89
    pub const fn is_finished(&self) -> bool {
777
89
        self.has_action_result()
778
89
    }
779
780
    /// Returns if the stage enum is the same as the other stage enum, but
781
    /// does not compare the values of the enum.
782
39
    pub const fn is_same_stage(&self, other: &Self) -> bool {
783
39
        matches!(
784
39
            (self, other),
785
            (Self::Unknown, Self::Unknown)
786
                | (Self::CacheCheck, Self::CacheCheck)
787
                | (Self::Queued, Self::Queued)
788
                | (Self::Executing, Self::Executing)
789
                | (Self::Completed(_), Self::Completed(_))
790
                | (Self::CompletedFromCache(_), Self::CompletedFromCache(_))
791
        )
792
39
    }
793
}
794
795
impl MetricsComponent for ActionStage {
796
0
    fn publish(
797
0
        &self,
798
0
        _kind: MetricKind,
799
0
        _field_metadata: MetricFieldData,
800
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
801
0
        let value = match self {
802
0
            ActionStage::Unknown => "Unknown".to_string(),
803
0
            ActionStage::CacheCheck => "CacheCheck".to_string(),
804
0
            ActionStage::Queued => "Queued".to_string(),
805
0
            ActionStage::Executing => "Executing".to_string(),
806
0
            ActionStage::Completed(_) => "Completed".to_string(),
807
0
            ActionStage::CompletedFromCache(_) => "CompletedFromCache".to_string(),
808
        };
809
0
        Ok(MetricPublishKnownKindData::String(value))
810
0
    }
811
}
812
813
impl From<&ActionStage> for execution_stage::Value {
814
1
    fn from(val: &ActionStage) -> Self {
815
1
        match val {
816
0
            ActionStage::Unknown => Self::Unknown,
817
0
            ActionStage::CacheCheck => Self::CacheCheck,
818
0
            ActionStage::Queued => Self::Queued,
819
0
            ActionStage::Executing => Self::Executing,
820
1
            ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => Self::Completed,
821
        }
822
1
    }
823
}
824
825
11
pub fn to_execute_response(action_result: ActionResult) -> ExecuteResponse {
826
11
    fn logs_from(server_logs: HashMap<String, DigestInfo>) -> HashMap<String, LogFile> {
827
11
        let mut logs = HashMap::with_capacity(server_logs.len());
828
12
        for (
k, v1
) in server_logs {
829
1
            logs.insert(
830
1
                k.clone(),
831
1
                LogFile {
832
1
                    digest: Some(v.into()),
833
1
                    human_readable: false,
834
1
                },
835
1
            );
836
1
        }
837
11
        logs
838
11
    }
839
840
11
    let status = Some(
841
11
        action_result
842
11
            .error
843
11
            .clone()
844
11
            .map_or_else(Status::default, Into::into),
845
11
    );
846
11
    let message = action_result.message.clone();
847
11
    ExecuteResponse {
848
11
        server_logs: logs_from(action_result.server_logs.clone()),
849
11
        result: Some(action_result.into()),
850
11
        cached_result: false,
851
11
        status,
852
11
        message,
853
11
    }
854
11
}
855
856
impl From<ActionStage> for ExecuteResponse {
857
6
    fn from(val: ActionStage) -> Self {
858
6
        match val {
859
            // We don't have an execute response if we don't have the results. It is defined
860
            // behavior to return an empty proto struct.
861
            ActionStage::Unknown
862
            | ActionStage::CacheCheck
863
            | ActionStage::Queued
864
0
            | ActionStage::Executing => Self::default(),
865
6
            ActionStage::Completed(action_result) => to_execute_response(action_result),
866
            // Handled separately as there are no server logs and the action
867
            // result is already in Proto format.
868
0
            ActionStage::CompletedFromCache(proto_action_result) => Self {
869
0
                server_logs: HashMap::new(),
870
0
                result: Some(proto_action_result),
871
0
                cached_result: true,
872
0
                status: Some(Status::default()),
873
0
                message: String::new(), // Will be populated later if applicable.
874
0
            },
875
        }
876
6
    }
877
}
878
879
impl From<ActionResult> for ProtoActionResult {
880
17
    fn from(val: ActionResult) -> Self {
881
17
        let mut output_symlinks = Vec::with_capacity(
882
17
            val.output_file_symlinks.len() + val.output_directory_symlinks.len(),
883
17
        );
884
17
        output_symlinks.extend_from_slice(val.output_file_symlinks.as_slice());
885
17
        output_symlinks.extend_from_slice(val.output_directory_symlinks.as_slice());
886
17
887
17
        Self {
888
17
            output_files: val.output_files.into_iter().map(Into::into).collect(),
889
17
            output_file_symlinks: val
890
17
                .output_file_symlinks
891
17
                .into_iter()
892
17
                .map(Into::into)
893
17
                .collect(),
894
17
            output_symlinks: output_symlinks.into_iter().map(Into::into).collect(),
895
17
            output_directories: val.output_folders.into_iter().map(Into::into).collect(),
896
17
            output_directory_symlinks: val
897
17
                .output_directory_symlinks
898
17
                .into_iter()
899
17
                .map(Into::into)
900
17
                .collect(),
901
17
            exit_code: val.exit_code,
902
17
            stdout_raw: Bytes::default(),
903
17
            stdout_digest: Some(val.stdout_digest.into()),
904
17
            stderr_raw: Bytes::default(),
905
17
            stderr_digest: Some(val.stderr_digest.into()),
906
17
            execution_metadata: Some(val.execution_metadata.into()),
907
17
        }
908
17
    }
909
}
910
911
impl TryFrom<ProtoActionResult> for ActionResult {
912
    type Error = Error;
913
914
0
    fn try_from(val: ProtoActionResult) -> Result<Self, Error> {
915
0
        let output_file_symlinks = val
916
0
            .output_file_symlinks
917
0
            .into_iter()
918
0
            .map(|output_symlink| {
919
0
                SymlinkInfo::try_from(output_symlink)
920
0
                    .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo")
921
0
            })
922
0
            .collect::<Result<Vec<_>, _>>()?;
923
924
0
        let output_directory_symlinks = val
925
0
            .output_directory_symlinks
926
0
            .into_iter()
927
0
            .map(|output_symlink| {
928
0
                SymlinkInfo::try_from(output_symlink)
929
0
                    .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo")
930
0
            })
931
0
            .collect::<Result<Vec<_>, _>>()?;
932
933
0
        let output_files = val
934
0
            .output_files
935
0
            .into_iter()
936
0
            .map(|output_file| {
937
0
                output_file
938
0
                    .try_into()
939
0
                    .err_tip(|| "Output File could not be converted")
940
0
            })
941
0
            .collect::<Result<Vec<_>, _>>()?;
942
943
0
        let output_folders = val
944
0
            .output_directories
945
0
            .into_iter()
946
0
            .map(|output_directory| {
947
0
                output_directory
948
0
                    .try_into()
949
0
                    .err_tip(|| "Output File could not be converted")
950
0
            })
951
0
            .collect::<Result<Vec<_>, _>>()?;
952
953
        Ok(Self {
954
0
            output_files,
955
0
            output_folders,
956
0
            output_file_symlinks,
957
0
            output_directory_symlinks,
958
0
            exit_code: val.exit_code,
959
0
            stdout_digest: val
960
0
                .stdout_digest
961
0
                .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")?
962
0
                .try_into()?,
963
0
            stderr_digest: val
964
0
                .stderr_digest
965
0
                .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")?
966
0
                .try_into()?,
967
0
            execution_metadata: val
968
0
                .execution_metadata
969
0
                .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")?
970
0
                .try_into()?,
971
0
            server_logs: HashMap::default(),
972
0
            error: None,
973
0
            message: String::new(),
974
        })
975
0
    }
976
}
977
978
impl TryFrom<ExecuteResponse> for ActionStage {
979
    type Error = Error;
980
981
3
    fn try_from(execute_response: ExecuteResponse) -> Result<Self, Error> {
982
3
        let proto_action_result = execute_response
983
3
            .result
984
3
            .err_tip(|| 
"Expected result to be set on ExecuteResponse msg"0
)
?0
;
985
3
        let action_result = ActionResult {
986
3
            output_files: proto_action_result
987
3
                .output_files
988
3
                .try_map(TryInto::try_into)
?0
,
989
3
            output_directory_symlinks: proto_action_result
990
3
                .output_directory_symlinks
991
3
                .try_map(TryInto::try_into)
?0
,
992
3
            output_file_symlinks: proto_action_result
993
3
                .output_file_symlinks
994
3
                .try_map(TryInto::try_into)
?0
,
995
3
            output_folders: proto_action_result
996
3
                .output_directories
997
3
                .try_map(TryInto::try_into)
?0
,
998
3
            exit_code: proto_action_result.exit_code,
999
3
1000
3
            stdout_digest: proto_action_result
1001
3
                .stdout_digest
1002
3
                .err_tip(|| 
"Expected stdout_digest to be set on ExecuteResponse msg"0
)
?0
1003
3
                .try_into()
?0
,
1004
3
            stderr_digest: proto_action_result
1005
3
                .stderr_digest
1006
3
                .err_tip(|| 
"Expected stderr_digest to be set on ExecuteResponse msg"0
)
?0
1007
3
                .try_into()
?0
,
1008
3
            execution_metadata: proto_action_result
1009
3
                .execution_metadata
1010
3
                .err_tip(|| 
"Expected execution_metadata to be set on ExecuteResponse msg"0
)
?0
1011
3
                .try_into()
?0
,
1012
3
            server_logs: execute_response.server_logs.try_map(|v| {
1013
2
                v.digest
1014
2
                    .err_tip(|| 
"Expected digest to be set on LogFile msg"0
)
?0
1015
2
                    .try_into()
1016
3
            
}2
)
?0
,
1017
3
            error: execute_response.status.clone().and_then(|v| {
1018
3
                if v.code == 0 {
  Branch (1018:20): [True: 1, False: 2]
  Branch (1018:20): [Folded - Ignored]
1019
1
                    None
1020
                } else {
1021
2
                    Some(v.into())
1022
                }
1023
3
            }),
1024
3
            message: execute_response.message,
1025
3
        };
1026
3
1027
3
        if execute_response.cached_result {
  Branch (1027:12): [True: 0, False: 3]
  Branch (1027:12): [Folded - Ignored]
1028
0
            return Ok(Self::CompletedFromCache(action_result.into()));
1029
3
        }
1030
3
        Ok(Self::Completed(action_result))
1031
3
    }
1032
}
1033
1034
// TODO: Should be able to remove this after tokio-rs/prost#299
1035
trait TypeUrl: Message {
1036
    const TYPE_URL: &'static str;
1037
}
1038
1039
/// Type URL for the remote execution v2 `ExecuteResponse` message.
impl TypeUrl for ExecuteResponse {
    const TYPE_URL: &'static str =
        "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse";
}
1043
1044
/// Type URL for the remote execution v2 `ExecuteOperationMetadata` message.
impl TypeUrl for ExecuteOperationMetadata {
    const TYPE_URL: &'static str =
        "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata";
}
1048
1049
2
fn from_any<T>(message: &Any) -> Result<T, Error>
1050
2
where
1051
2
    T: TypeUrl + Default,
1052
2
{
1053
0
    error_if!(
1054
2
        message.type_url != T::TYPE_URL,
  Branch (1054:9): [True: 0, False: 1]
  Branch (1054:9): [True: 0, False: 1]
  Branch (1054:9): [Folded - Ignored]
1055
        "Incorrect type when decoding Any. {} != {}",
1056
        message.type_url,
1057
0
        T::TYPE_URL.to_string()
1058
    );
1059
2
    Ok(T::decode(message.value.as_slice())
?0
)
1060
2
}
1061
1062
2
fn to_any<T>(message: &T) -> Any
1063
2
where
1064
2
    T: TypeUrl,
1065
2
{
1066
2
    Any {
1067
2
        type_url: T::TYPE_URL.to_string(),
1068
2
        value: message.encode_to_vec(),
1069
2
    }
1070
2
}
1071
1072
/// Current state of the action.
1073
/// This must be 100% compatible with `Operation` in `google/longrunning/operations.proto`.
1074
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize, MetricsComponent)]
1075
pub struct ActionState {
1076
    #[metric(help = "The current stage of the action.")]
1077
    pub stage: ActionStage,
1078
    #[metric(help = "The unique identifier of the action.")]
1079
    pub client_operation_id: OperationId,
1080
    #[metric(help = "The digest of the action.")]
1081
    pub action_digest: DigestInfo,
1082
}
1083
1084
impl ActionState {
1085
1
    pub fn try_from_operation(
1086
1
        operation: Operation,
1087
1
        client_operation_id: OperationId,
1088
1
    ) -> Result<Self, Error> {
1089
1
        let metadata = from_any::<ExecuteOperationMetadata>(
1090
1
            &operation
1091
1
                .metadata
1092
1
                .err_tip(|| 
"No metadata in upstream operation"0
)
?0
,
1093
        )
1094
1
        .err_tip(|| 
"Could not decode metadata in upstream operation"0
)
?0
;
1095
1096
1
        let stage = match execution_stage::Value::try_from(metadata.stage).err_tip(|| {
1097
0
            format!(
1098
0
                "Could not convert {} to execution_stage::Value",
1099
0
                metadata.stage
1100
0
            )
1101
1
        })
?0
{
1102
0
            execution_stage::Value::Unknown => ActionStage::Unknown,
1103
0
            execution_stage::Value::CacheCheck => ActionStage::CacheCheck,
1104
0
            execution_stage::Value::Queued => ActionStage::Queued,
1105
0
            execution_stage::Value::Executing => ActionStage::Executing,
1106
            execution_stage::Value::Completed => {
1107
1
                let execute_response = operation
1108
1
                    .result
1109
1
                    .err_tip(|| 
"No result data for completed upstream action"0
)
?0
;
1110
1
                match execute_response {
1111
0
                    LongRunningResult::Error(error) => ActionStage::Completed(ActionResult {
1112
0
                        error: Some(error.into()),
1113
0
                        ..ActionResult::default()
1114
0
                    }),
1115
1
                    LongRunningResult::Response(response) => {
1116
1
                        // Could be Completed, CompletedFromCache or Error.
1117
1
                        from_any::<ExecuteResponse>(&response)
1118
1
                            .err_tip(|| {
1119
0
                                "Could not decode result structure for completed upstream action"
1120
1
                            })
?0
1121
1
                            .try_into()
?0
1122
                    }
1123
                }
1124
            }
1125
        };
1126
1127
1
        let action_digest = metadata
1128
1
            .action_digest
1129
1
            .err_tip(|| 
"No action_digest in upstream operation"0
)
?0
1130
1
            .try_into()
1131
1
            .err_tip(|| 
"Could not convert action_digest into DigestInfo"0
)
?0
;
1132
1133
1
        Ok(Self {
1134
1
            stage,
1135
1
            client_operation_id,
1136
1
            action_digest,
1137
1
        })
1138
1
    }
1139
1140
1
    pub fn as_operation(&self, client_operation_id: OperationId) -> Operation {
1141
1
        let stage = Into::<execution_stage::Value>::into(&self.stage) as i32;
1142
1
        let name = client_operation_id.into_string();
1143
1144
1
        let result = if self.stage.has_action_result() {
  Branch (1144:25): [True: 1, False: 0]
  Branch (1144:25): [Folded - Ignored]
1145
1
            let execute_response: ExecuteResponse = self.stage.clone().into();
1146
1
            Some(LongRunningResult::Response(to_any(&execute_response)))
1147
        } else {
1148
0
            None
1149
        };
1150
1
        let digest = Some(self.action_digest.into());
1151
1
1152
1
        let metadata = ExecuteOperationMetadata {
1153
1
            stage,
1154
1
            action_digest: digest,
1155
1
            // TODO(blaise.bruer) We should support stderr/stdout streaming.
1156
1
            stdout_stream_name: String::default(),
1157
1
            stderr_stream_name: String::default(),
1158
1
            partial_execution_metadata: None,
1159
1
        };
1160
1
1161
1
        Operation {
1162
1
            name,
1163
1
            metadata: Some(to_any(&metadata)),
1164
1
            done: result.is_some(),
1165
1
            result,
1166
1
        }
1167
1
    }
1168
}