/build/source/nativelink-util/src/action_messages.rs
| Line | Count | Source | 
| 1 |  | // Copyright 2024 The NativeLink Authors. All rights reserved. | 
| 2 |  | // | 
| 3 |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); | 
| 4 |  | // you may not use this file except in compliance with the License. | 
| 5 |  | // You may obtain a copy of the License at | 
| 6 |  | // | 
| 7 |  | //    See LICENSE file for details | 
| 8 |  | // | 
| 9 |  | // Unless required by applicable law or agreed to in writing, software | 
| 10 |  | // distributed under the License is distributed on an "AS IS" BASIS, | 
| 11 |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
| 12 |  | // See the License for the specific language governing permissions and | 
| 13 |  | // limitations under the License. | 
| 14 |  |  | 
| 15 |  | use core::cmp::Ordering; | 
| 16 |  | use core::convert::Into; | 
| 17 |  | use core::hash::Hash; | 
| 18 |  | use core::time::Duration; | 
| 19 |  | use std::collections::HashMap; | 
| 20 |  | use std::time::SystemTime; | 
| 21 |  |  | 
| 22 |  | use nativelink_error::{Error, ResultExt, error_if, make_input_err}; | 
| 23 |  | use nativelink_metric::{ | 
| 24 |  |     MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, publish, | 
| 25 |  | }; | 
| 26 |  | use nativelink_proto::build::bazel::remote::execution::v2::{ | 
| 27 |  |     Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest, | 
| 28 |  |     ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile, | 
| 29 |  |     OutputSymlink, SymlinkNode, execution_stage, | 
| 30 |  | }; | 
| 31 |  | use nativelink_proto::google::longrunning::Operation; | 
| 32 |  | use nativelink_proto::google::longrunning::operation::Result as LongRunningResult; | 
| 33 |  | use nativelink_proto::google::rpc::Status; | 
| 34 |  | use prost::Message; | 
| 35 |  | use prost::bytes::Bytes; | 
| 36 |  | use prost_types::Any; | 
| 37 |  | use serde::ser::Error as SerdeError; | 
| 38 |  | use serde::{Deserialize, Serialize}; | 
| 39 |  | use uuid::Uuid; | 
| 40 |  |  | 
| 41 |  | use crate::common::{DigestInfo, HashMapExt, VecExt}; | 
| 42 |  | use crate::digest_hasher::DigestHasherFunc; | 
| 43 |  |  | 
| 44 |  | /// Default priority remote execution jobs will get when not provided. | 
| 45 |  | pub const DEFAULT_EXECUTION_PRIORITY: i32 = 0; | 
| 46 |  |  | 
| 47 |  | /// Exit code sent if there is an internal error. | 
| 48 |  | pub const INTERNAL_ERROR_EXIT_CODE: i32 = -178; | 
| 49 |  |  | 
| 50 |  | /// Holds an id that is unique to the client for a requested operation. | 
| 51 |  | #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] | 
| 52 |  | pub enum OperationId { | 
| 53 |  |     Uuid(Uuid), | 
| 54 |  |     String(String), | 
| 55 |  | } | 
| 56 |  |  | 
| 57 |  | impl OperationId { | 
| 58 | 1 |     pub fn into_string(self) -> String { | 
| 59 | 1 |         match self { | 
| 60 | 1 |             Self::Uuid(uuid) => uuid.to_string(), | 
| 61 | 0 |             Self::String(name) => name, | 
| 62 |  |         } | 
| 63 | 1 |     } | 
| 64 |  | } | 
| 65 |  |  | 
| 66 |  | impl Default for OperationId { | 
| 67 | 103 |     fn default() -> Self { | 
| 68 | 103 |         Self::Uuid(Uuid::new_v4()) | 
| 69 | 103 |     } | 
| 70 |  | } | 
| 71 |  |  | 
| 72 |  | impl core::fmt::Display for OperationId { | 
| 73 | 172 |     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | 
| 74 | 172 |         match self { | 
| 75 | 87 |             Self::Uuid(uuid) => uuid.fmt(f), | 
| 76 | 85 |             Self::String(name) => f.write_str(name), | 
| 77 |  |         } | 
| 78 | 172 |     } | 
| 79 |  | } | 
| 80 |  |  | 
| 81 |  | impl MetricsComponent for OperationId { | 
| 82 | 0 |     fn publish( | 
| 83 | 0 |         &self, | 
| 84 | 0 |         _kind: MetricKind, | 
| 85 | 0 |         _field_metadata: MetricFieldData, | 
| 86 | 0 |     ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { | 
| 87 | 0 |         Ok(MetricPublishKnownKindData::String(self.to_string())) | 
| 88 | 0 |     } | 
| 89 |  | } | 
| 90 |  |  | 
| 91 |  | impl From<&str> for OperationId { | 
| 92 | 42 |     fn from(value: &str) -> Self { | 
| 93 | 42 |         Uuid::parse_str(value).map_or_else(|_| Self::String(value21. to_string21()), Self::Uuid) | 
| 94 | 42 |     } | 
| 95 |  | } | 
| 96 |  |  | 
| 97 |  | impl From<String> for OperationId { | 
| 98 | 8 |     fn from(value: String) -> Self { | 
| 99 | 8 |         Uuid::parse_str(&value).map_or(Self::String(value), Self::Uuid) | 
| 100 | 8 |     } | 
| 101 |  | } | 
| 102 |  |  | 
| 103 |  | impl TryFrom<Bytes> for OperationId { | 
| 104 |  |     type Error = Error; | 
| 105 |  |  | 
| 106 | 0 |     fn try_from(value: Bytes) -> Result<Self, Self::Error> { | 
| 107 |  |         // This is an optimized path to attempt to do the conversion in-place | 
| 108 |  |         // to avoid an extra allocation/copy. | 
| 109 | 0 |         match value.try_into_mut() { | 
| 110 |  |             // We are the only reference to the Bytes, so we can convert it into a Vec<u8> | 
| 111 |  |             // for free then convert the Vec<u8> to a String for free too. | 
| 112 | 0 |             Ok(value) => { | 
| 113 | 0 |                 let value = String::from_utf8(value.into()).map_err(|e| { | 
| 114 | 0 |                     make_input_err!( | 
| 115 |  |                         "Failed to convert bytes to string in try_from<Bytes> for OperationId : {e:?}" | 
| 116 |  |                     ) | 
| 117 | 0 |                 })?; | 
| 118 | 0 |                 Ok(Self::from(value)) | 
| 119 |  |             } | 
| 120 |  |             // We could not take ownership of the Bytes, so we may need to copy our data. | 
| 121 | 0 |             Err(value) => { | 
| 122 | 0 |                 let value = core::str::from_utf8(&value).map_err(|e| { | 
| 123 | 0 |                     make_input_err!( | 
| 124 |  |                         "Failed to convert bytes to string in try_from<Bytes> for OperationId : {e:?}" | 
| 125 |  |                     ) | 
| 126 | 0 |                 })?; | 
| 127 | 0 |                 Ok(Self::from(value)) | 
| 128 |  |             } | 
| 129 |  |         } | 
| 130 | 0 |     } | 
| 131 |  | } | 
| 132 |  |  | 
| 133 |  | /// Unique id of worker. | 
| 134 |  | #[derive(Default, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)] | 
| 135 |  | pub struct WorkerId(pub String); | 
| 136 |  |  | 
| 137 |  | impl MetricsComponent for WorkerId { | 
| 138 | 0 |     fn publish( | 
| 139 | 0 |         &self, | 
| 140 | 0 |         _kind: MetricKind, | 
| 141 | 0 |         _field_metadata: MetricFieldData, | 
| 142 | 0 |     ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { | 
| 143 | 0 |         Ok(MetricPublishKnownKindData::String(self.0.clone())) | 
| 144 | 0 |     } | 
| 145 |  | } | 
| 146 |  |  | 
| 147 |  | impl core::fmt::Display for WorkerId { | 
| 148 | 25 |     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | 
| 149 | 25 |         f.write_fmt(format_args!("{}", self.0)) | 
| 150 | 25 |     } | 
| 151 |  | } | 
| 152 |  |  | 
| 153 |  | impl core::fmt::Debug for WorkerId { | 
| 154 | 6 |     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | 
| 155 | 6 |         core::fmt::Display::fmt(&self, f) | 
| 156 | 6 |     } | 
| 157 |  | } | 
| 158 |  |  | 
| 159 |  | impl From<WorkerId> for String { | 
| 160 | 101 |     fn from(val: WorkerId) -> Self { | 
| 161 | 101 |         val.0 | 
| 162 | 101 |     } | 
| 163 |  | } | 
| 164 |  |  | 
| 165 |  | impl From<String> for WorkerId { | 
| 166 | 6 |     fn from(s: String) -> Self { | 
| 167 | 6 |         Self(s) | 
| 168 | 6 |     } | 
| 169 |  | } | 
| 170 |  |  | 
| 171 |  | /// Holds the information needed to uniquely identify an action | 
| 172 |  | /// and if it is cacheable or not. | 
| 173 |  | #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] | 
| 174 |  | pub enum ActionUniqueQualifier { | 
| 175 |  |     /// The action is cacheable. | 
| 176 |  |     #[serde(alias = "Cachable")] // Pre 0.7.0 spelling | 
| 177 |  |     Cacheable(ActionUniqueKey), | 
| 178 |  |     /// The action is uncacheable. | 
| 179 |  |     #[serde(alias = "Uncachable")] // Pre 0.7.0 spelling | 
| 180 |  |     Uncacheable(ActionUniqueKey), | 
| 181 |  | } | 
| 182 |  |  | 
| 183 |  | impl MetricsComponent for ActionUniqueQualifier { | 
| 184 | 0 |     fn publish( | 
| 185 | 0 |         &self, | 
| 186 | 0 |         _kind: MetricKind, | 
| 187 | 0 |         field_metadata: MetricFieldData, | 
| 188 | 0 |     ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { | 
| 189 | 0 |         let (cacheable, action) = match self { | 
| 190 | 0 |             Self::Cacheable(action) => (true, action), | 
| 191 | 0 |             Self::Uncacheable(action) => (false, action), | 
| 192 |  |         }; | 
| 193 | 0 |         publish!( | 
| 194 | 0 |             cacheable, | 
| 195 | 0 |             &cacheable, | 
| 196 | 0 |             MetricKind::Default, | 
| 197 | 0 |             "If the action is cacheable.", | 
| 198 | 0 |             "" | 
| 199 |  |         ); | 
| 200 | 0 |         action.publish(MetricKind::Component, field_metadata)?; | 
| 201 | 0 |         Ok(MetricPublishKnownKindData::Component) | 
| 202 | 0 |     } | 
| 203 |  | } | 
| 204 |  |  | 
| 205 |  | impl ActionUniqueQualifier { | 
| 206 |  |     /// Get the `instance_name` of the action. | 
| 207 | 0 |     pub const fn instance_name(&self) -> &String { | 
| 208 | 0 |         match self { | 
| 209 | 0 |             Self::Cacheable(action) | Self::Uncacheable(action) => &action.instance_name, | 
| 210 |  |         } | 
| 211 | 0 |     } | 
| 212 |  |  | 
| 213 |  |     /// Get the digest function of the action. | 
| 214 | 11 |     pub const fn digest_function(&self) -> DigestHasherFunc { | 
| 215 | 11 |         match self { | 
| 216 | 11 |             Self::Cacheable(action) | Self::Uncacheable(action0) => action.digest_function, | 
| 217 |  |         } | 
| 218 | 11 |     } | 
| 219 |  |  | 
| 220 |  |     /// Get the digest of the action. | 
| 221 | 137 |     pub const fn digest(&self) -> DigestInfo { | 
| 222 | 137 |         match self { | 
| 223 | 137 |             Self::Cacheable(action132) | Self::Uncacheable( action5) => action.digest, | 
| 224 |  |         } | 
| 225 | 137 |     } | 
| 226 |  | } | 
| 227 |  |  | 
| 228 |  | impl core::fmt::Display for ActionUniqueQualifier { | 
| 229 | 19 |     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | 
| 230 | 19 |         let (cacheable, unique_key) = match self { | 
| 231 | 19 |             Self::Cacheable(action) => (true, action), | 
| 232 | 0 |             Self::Uncacheable(action) => (false, action), | 
| 233 |  |         }; | 
| 234 | 19 |         f.write_fmt(format_args!( | 
| 235 |  |             // Note: We use underscores because it makes escaping easier | 
| 236 |  |             // for redis. | 
| 237 | 19 |             "{}_{}_{}_{}_{}", | 
| 238 |  |             unique_key.instance_name, | 
| 239 |  |             unique_key.digest_function, | 
| 240 | 19 |             unique_key.digest.packed_hash(), | 
| 241 | 19 |             unique_key.digest.size_bytes(), | 
| 242 | 19 |             if cacheable { 'c' } else { 'u'0},  Branch (242:16): [True: 19, False: 0]
  Branch (242:16): [Folded - Ignored]
 | 
| 243 |  |         )) | 
| 244 | 19 |     } | 
| 245 |  | } | 
| 246 |  |  | 
| 247 |  | /// This is a utility struct used to make it easier to match `ActionInfos` in a | 
| 248 |  | /// `HashMap` without needing to construct an entire `ActionInfo`. | 
| 249 |  | #[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, MetricsComponent)] | 
| 250 |  | pub struct ActionUniqueKey { | 
| 251 |  |     /// Name of instance group this action belongs to. | 
| 252 |  |     #[metric(help = "Name of instance group this action belongs to.")] | 
| 253 |  |     pub instance_name: String, | 
| 254 |  |     /// The digest function this action expects. | 
| 255 |  |     #[metric(help = "The digest function this action expects.")] | 
| 256 |  |     pub digest_function: DigestHasherFunc, | 
| 257 |  |     /// Digest of the underlying `Action`. | 
| 258 |  |     #[metric(help = "Digest of the underlying Action.")] | 
| 259 |  |     pub digest: DigestInfo, | 
| 260 |  | } | 
| 261 |  |  | 
| 262 |  | impl core::fmt::Display for ActionUniqueKey { | 
| 263 | 0 |     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | 
| 264 | 0 |         f.write_fmt(format_args!( | 
| 265 | 0 |             "{}/{}/{}", | 
| 266 |  |             self.instance_name, self.digest_function, self.digest, | 
| 267 |  |         )) | 
| 268 | 0 |     } | 
| 269 |  | } | 
| 270 |  |  | 
| 271 |  | /// Information needed to execute an action. This struct is used over bazel's proto `Action` | 
| 272 |  | /// for simplicity and offers a `salt`, which is useful to ensure during hashing (for dicts) | 
| 273 |  | /// to ensure we never match against another `ActionInfo` (when a task should never be cached). | 
| 274 |  | /// This struct must be 100% compatible with `ExecuteRequest` struct in `remote_execution.proto` | 
| 275 |  | /// except for the salt field. | 
| 276 |  | #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, MetricsComponent)] | 
| 277 |  | pub struct ActionInfo { | 
| 278 |  |     /// Digest of the underlying `Command`. | 
| 279 |  |     #[metric(help = "Digest of the underlying Command.")] | 
| 280 |  |     pub command_digest: DigestInfo, | 
| 281 |  |     /// Digest of the underlying `Directory`. | 
| 282 |  |     #[metric(help = "Digest of the underlying Directory.")] | 
| 283 |  |     pub input_root_digest: DigestInfo, | 
| 284 |  |     /// Timeout of the action. | 
| 285 |  |     #[metric(help = "Timeout of the action.")] | 
| 286 |  |     pub timeout: Duration, | 
| 287 |  |     /// The properties rules that must be applied when finding a worker that can run this action. | 
| 288 |  |     #[metric(group = "platform_properties")] | 
| 289 |  |     pub platform_properties: HashMap<String, String>, | 
| 290 |  |     /// The priority of the action. Higher value means it should execute faster. | 
| 291 |  |     #[metric(help = "The priority of the action. Higher value means it should execute faster.")] | 
| 292 |  |     pub priority: i32, | 
| 293 |  |     /// When this action started to be loaded from the CAS. | 
| 294 |  |     #[metric(help = "When this action started to be loaded from the CAS.")] | 
| 295 |  |     pub load_timestamp: SystemTime, | 
| 296 |  |     /// When this action was created. | 
| 297 |  |     #[metric(help = "When this action was created.")] | 
| 298 |  |     pub insert_timestamp: SystemTime, | 
| 299 |  |     /// Info used to uniquely identify this `ActionInfo` and if it is cacheable. | 
| 300 |  |     /// This is primarily used to join actions/operations together using this key. | 
| 301 |  |     #[metric(help = "Info used to uniquely identify this ActionInfo and if it is cacheable.")] | 
| 302 |  |     pub unique_qualifier: ActionUniqueQualifier, | 
| 303 |  | } | 
| 304 |  |  | 
| 305 |  | impl ActionInfo { | 
| 306 |  |     #[inline] | 
| 307 | 0 |     pub const fn instance_name(&self) -> &String { | 
| 308 | 0 |         self.unique_qualifier.instance_name() | 
| 309 | 0 |     } | 
| 310 |  |  | 
| 311 |  |     /// Returns the underlying digest of the `Action`. | 
| 312 |  |     #[inline] | 
| 313 | 90 |     pub const fn digest(&self) -> DigestInfo { | 
| 314 | 90 |         self.unique_qualifier.digest() | 
| 315 | 90 |     } | 
| 316 |  |  | 
| 317 | 19 |     pub fn try_from_action_and_execute_request( | 
| 318 | 19 |         execute_request: ExecuteRequest, | 
| 319 | 19 |         action: Action, | 
| 320 | 19 |         load_timestamp: SystemTime, | 
| 321 | 19 |         queued_timestamp: SystemTime, | 
| 322 | 19 |     ) -> Result<Self, Error> { | 
| 323 | 19 |         let unique_key = ActionUniqueKey { | 
| 324 | 19 |             instance_name: execute_request.instance_name, | 
| 325 | 19 |             digest_function: DigestHasherFunc::try_from(execute_request.digest_function) | 
| 326 | 19 |                 .err_tip(|| format!("Could not find digest_function in try_from_action_and_execute_request {:?}"0, execute_request.digest_function)) ?0, | 
| 327 | 19 |             digest: execute_request | 
| 328 | 19 |                 .action_digest | 
| 329 | 19 |                 .err_tip(|| "Expected action_digest to exist on ExecuteRequest")?0 | 
| 330 | 19 |                 .try_into()?0, | 
| 331 |  |         }; | 
| 332 | 19 |         let unique_qualifier = if execute_request.skip_cache_lookup {  Branch (332:35): [True: 0, False: 19]
  Branch (332:35): [Folded - Ignored]
 | 
| 333 | 0 |             ActionUniqueQualifier::Uncacheable(unique_key) | 
| 334 |  |         } else { | 
| 335 | 19 |             ActionUniqueQualifier::Cacheable(unique_key) | 
| 336 |  |         }; | 
| 337 |  |  | 
| 338 | 19 |         let proto_properties = action.platform.unwrap_or_default(); | 
| 339 | 19 |         let mut platform_properties = HashMap::with_capacity(proto_properties.properties.len()); | 
| 340 | 20 |         for property1in proto_properties.properties { | 
| 341 | 1 |             platform_properties.insert(property.name, property.value); | 
| 342 | 1 |         } | 
| 343 |  |  | 
| 344 |  |         Ok(Self { | 
| 345 | 19 |             command_digest: action | 
| 346 | 19 |                 .command_digest | 
| 347 | 19 |                 .err_tip(|| "Expected command_digest to exist on Action")?0 | 
| 348 | 19 |                 .try_into()?0, | 
| 349 | 19 |             input_root_digest: action | 
| 350 | 19 |                 .input_root_digest | 
| 351 | 19 |                 .err_tip(|| "Expected input_root_digest to exist on Action")?0 | 
| 352 | 19 |                 .try_into()?0, | 
| 353 | 19 |             timeout: action | 
| 354 | 19 |                 .timeout | 
| 355 | 19 |                 .unwrap_or_default() | 
| 356 | 19 |                 .try_into() | 
| 357 | 19 |                 .map_err(|_| make_input_err!("Failed convert proto duration to system duration"))?0, | 
| 358 | 19 |             platform_properties, | 
| 359 | 19 |             priority: execute_request | 
| 360 | 19 |                 .execution_policy | 
| 361 | 19 |                 .unwrap_or_default() | 
| 362 | 19 |                 .priority, | 
| 363 | 19 |             load_timestamp, | 
| 364 | 19 |             insert_timestamp: queued_timestamp, | 
| 365 | 19 |             unique_qualifier, | 
| 366 |  |         }) | 
| 367 | 19 |     } | 
| 368 |  | } | 
| 369 |  |  | 
| 370 |  | impl From<&ActionInfo> for ExecuteRequest { | 
| 371 | 36 |     fn from(val: &ActionInfo) -> Self { | 
| 372 | 36 |         let digest = val.digest().into(); | 
| 373 | 36 |         let (skip_cache_lookup, unique_qualifier) = match &val.unique_qualifier { | 
| 374 | 31 |             ActionUniqueQualifier::Cacheable(unique_qualifier) => (false, unique_qualifier), | 
| 375 | 5 |             ActionUniqueQualifier::Uncacheable(unique_qualifier) => (true, unique_qualifier), | 
| 376 |  |         }; | 
| 377 | 36 |         Self { | 
| 378 | 36 |             instance_name: unique_qualifier.instance_name.clone(), | 
| 379 | 36 |             action_digest: Some(digest), | 
| 380 | 36 |             skip_cache_lookup, | 
| 381 | 36 |             execution_policy: None,     // Not used in the worker. | 
| 382 | 36 |             results_cache_policy: None, // Not used in the worker. | 
| 383 | 36 |             digest_function: unique_qualifier.digest_function.proto_digest_func().into(), | 
| 384 | 36 |         } | 
| 385 | 36 |     } | 
| 386 |  | } | 
| 387 |  |  | 
| 388 |  | /// Simple utility struct to determine if a string is representing a full path or | 
| 389 |  | /// just the name of the file. | 
| 390 |  | /// This is in order to be able to reuse the same struct instead of building different | 
| 391 |  | /// structs when converting `FileInfo` -> {`OutputFile`, `FileNode`} and other similar | 
| 392 |  | /// structs. | 
| 393 |  | #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] | 
| 394 |  | pub enum NameOrPath { | 
| 395 |  |     Name(String), | 
| 396 |  |     Path(String), | 
| 397 |  | } | 
| 398 |  |  | 
| 399 |  | impl PartialOrd for NameOrPath { | 
| 400 | 0 |     fn partial_cmp(&self, other: &Self) -> Option<Ordering> { | 
| 401 | 0 |         Some(self.cmp(other)) | 
| 402 | 0 |     } | 
| 403 |  | } | 
| 404 |  |  | 
| 405 |  | impl Ord for NameOrPath { | 
| 406 | 0 |     fn cmp(&self, other: &Self) -> Ordering { | 
| 407 | 0 |         let self_lexical_name = match self { | 
| 408 | 0 |             Self::Name(name) => name, | 
| 409 | 0 |             Self::Path(path) => path, | 
| 410 |  |         }; | 
| 411 | 0 |         let other_lexical_name = match other { | 
| 412 | 0 |             Self::Name(name) => name, | 
| 413 | 0 |             Self::Path(path) => path, | 
| 414 |  |         }; | 
| 415 | 0 |         self_lexical_name.cmp(other_lexical_name) | 
| 416 | 0 |     } | 
| 417 |  | } | 
| 418 |  |  | 
| 419 |  | /// Represents an individual file and associated metadata. | 
| 420 |  | /// This struct must be 100% compatible with `OutputFile` and `FileNode` structs | 
| 421 |  | /// in `remote_execution.proto`. | 
| 422 |  | #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] | 
| 423 |  | pub struct FileInfo { | 
| 424 |  |     pub name_or_path: NameOrPath, | 
| 425 |  |     pub digest: DigestInfo, | 
| 426 |  |     pub is_executable: bool, | 
| 427 |  | } | 
| 428 |  |  | 
| 429 |  | impl TryFrom<FileInfo> for FileNode { | 
| 430 |  |     type Error = Error; | 
| 431 |  |  | 
| 432 | 3 |     fn try_from(val: FileInfo) -> Result<Self, Error> { | 
| 433 | 3 |         match val.name_or_path { | 
| 434 | 0 |             NameOrPath::Path(_) => Err(make_input_err!( | 
| 435 | 0 |                 "Cannot return a FileInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()" | 
| 436 | 0 |             )), | 
| 437 | 3 |             NameOrPath::Name(name) => Ok(Self { | 
| 438 | 3 |                 name, | 
| 439 | 3 |                 digest: Some((&val.digest).into()), | 
| 440 | 3 |                 is_executable: val.is_executable, | 
| 441 | 3 |                 node_properties: None, // Not supported. | 
| 442 | 3 |             }), | 
| 443 |  |         } | 
| 444 | 3 |     } | 
| 445 |  | } | 
| 446 |  |  | 
| 447 |  | impl TryFrom<OutputFile> for FileInfo { | 
| 448 |  |     type Error = Error; | 
| 449 |  |  | 
| 450 | 2 |     fn try_from(output_file: OutputFile) -> Result<Self, Error> { | 
| 451 |  |         Ok(Self { | 
| 452 | 2 |             name_or_path: NameOrPath::Path(output_file.path), | 
| 453 | 2 |             digest: output_file | 
| 454 | 2 |                 .digest | 
| 455 | 2 |                 .err_tip(|| "Expected digest to exist on OutputFile")?0 | 
| 456 | 2 |                 .try_into()?0, | 
| 457 | 2 |             is_executable: output_file.is_executable, | 
| 458 |  |         }) | 
| 459 | 2 |     } | 
| 460 |  | } | 
| 461 |  |  | 
| 462 |  | impl TryFrom<FileInfo> for OutputFile { | 
| 463 |  |     type Error = Error; | 
| 464 |  |  | 
| 465 | 7 |     fn try_from(val: FileInfo) -> Result<Self, Error> { | 
| 466 | 7 |         match val.name_or_path { | 
| 467 | 0 |             NameOrPath::Name(_) => Err(make_input_err!( | 
| 468 | 0 |                 "Cannot return a FileInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()" | 
| 469 | 0 |             )), | 
| 470 | 7 |             NameOrPath::Path(path) => Ok(Self { | 
| 471 | 7 |                 path, | 
| 472 | 7 |                 digest: Some((&val.digest).into()), | 
| 473 | 7 |                 is_executable: val.is_executable, | 
| 474 | 7 |                 contents: Bytes::default(), | 
| 475 | 7 |                 node_properties: None, // Not supported. | 
| 476 | 7 |             }), | 
| 477 |  |         } | 
| 478 | 7 |     } | 
| 479 |  | } | 
| 480 |  |  | 
| 481 |  | /// Represents an individual symlink file and associated metadata. | 
| 482 |  | /// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`. | 
| 483 |  | #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] | 
| 484 |  | pub struct SymlinkInfo { | 
| 485 |  |     pub name_or_path: NameOrPath, | 
| 486 |  |     pub target: String, | 
| 487 |  | } | 
| 488 |  |  | 
| 489 |  | impl TryFrom<SymlinkNode> for SymlinkInfo { | 
| 490 |  |     type Error = Error; | 
| 491 |  |  | 
| 492 | 0 |     fn try_from(symlink_node: SymlinkNode) -> Result<Self, Error> { | 
| 493 | 0 |         Ok(Self { | 
| 494 | 0 |             name_or_path: NameOrPath::Name(symlink_node.name), | 
| 495 | 0 |             target: symlink_node.target, | 
| 496 | 0 |         }) | 
| 497 | 0 |     } | 
| 498 |  | } | 
| 499 |  |  | 
| 500 |  | impl TryFrom<SymlinkInfo> for SymlinkNode { | 
| 501 |  |     type Error = Error; | 
| 502 |  |  | 
| 503 | 1 |     fn try_from(val: SymlinkInfo) -> Result<Self, Error> { | 
| 504 | 1 |         match val.name_or_path { | 
| 505 | 0 |             NameOrPath::Path(_) => Err(make_input_err!( | 
| 506 | 0 |                 "Cannot return a SymlinkInfo that uses a NameOrPath::Path(), it must be a NameOrPath::Name()" | 
| 507 | 0 |             )), | 
| 508 | 1 |             NameOrPath::Name(name) => Ok(Self { | 
| 509 | 1 |                 name, | 
| 510 | 1 |                 target: val.target, | 
| 511 | 1 |                 node_properties: None, // Not supported. | 
| 512 | 1 |             }), | 
| 513 |  |         } | 
| 514 | 1 |     } | 
| 515 |  | } | 
| 516 |  |  | 
| 517 |  | impl TryFrom<OutputSymlink> for SymlinkInfo { | 
| 518 |  |     type Error = Error; | 
| 519 |  |  | 
| 520 | 2 |     fn try_from(output_symlink: OutputSymlink) -> Result<Self, Error> { | 
| 521 | 2 |         Ok(Self { | 
| 522 | 2 |             name_or_path: NameOrPath::Path(output_symlink.path), | 
| 523 | 2 |             target: output_symlink.target, | 
| 524 | 2 |         }) | 
| 525 | 2 |     } | 
| 526 |  | } | 
| 527 |  |  | 
| 528 |  | impl TryFrom<SymlinkInfo> for OutputSymlink { | 
| 529 |  |     type Error = Error; | 
| 530 |  |  | 
| 531 | 2 |     fn try_from(val: SymlinkInfo) -> Result<Self, Error> { | 
| 532 | 2 |         match val.name_or_path { | 
| 533 | 2 |             NameOrPath::Path(path) => { | 
| 534 | 2 |                 Ok(Self { | 
| 535 | 2 |                     path, | 
| 536 | 2 |                     target: val.target, | 
| 537 | 2 |                     node_properties: None, // Not supported. | 
| 538 | 2 |                 }) | 
| 539 |  |             } | 
| 540 | 0 |             NameOrPath::Name(_) => Err(make_input_err!( | 
| 541 | 0 |                 "Cannot return a SymlinkInfo that uses a NameOrPath::Name(), it must be a NameOrPath::Path()" | 
| 542 | 0 |             )), | 
| 543 |  |         } | 
| 544 | 2 |     } | 
| 545 |  | } | 
| 546 |  |  | 
| 547 |  | /// Represents an individual directory file and associated metadata. | 
| 548 |  | /// This struct must be 100% compatible with `SymlinkNode` and `OutputSymlink`. | 
| 549 |  | #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] | 
| 550 |  | pub struct DirectoryInfo { | 
| 551 |  |     pub path: String, | 
| 552 |  |     pub tree_digest: DigestInfo, | 
| 553 |  | } | 
| 554 |  |  | 
| 555 |  | impl TryFrom<OutputDirectory> for DirectoryInfo { | 
| 556 |  |     type Error = Error; | 
| 557 |  |  | 
| 558 | 2 |     fn try_from(output_directory: OutputDirectory) -> Result<Self, Error> { | 
| 559 |  |         Ok(Self { | 
| 560 | 2 |             path: output_directory.path, | 
| 561 | 2 |             tree_digest: output_directory | 
| 562 | 2 |                 .tree_digest | 
| 563 | 2 |                 .err_tip(|| "Expected tree_digest to exist in OutputDirectory")?0 | 
| 564 | 2 |                 .try_into()?0, | 
| 565 |  |         }) | 
| 566 | 2 |     } | 
| 567 |  | } | 
| 568 |  |  | 
| 569 |  | impl From<DirectoryInfo> for OutputDirectory { | 
| 570 | 1 |     fn from(val: DirectoryInfo) -> Self { | 
| 571 | 1 |         Self { | 
| 572 | 1 |             path: val.path, | 
| 573 | 1 |             tree_digest: Some(val.tree_digest.into()), | 
| 574 | 1 |             is_topologically_sorted: false, | 
| 575 | 1 |         } | 
| 576 | 1 |     } | 
| 577 |  | } | 
| 578 |  |  | 
| 579 |  | /// Represents the metadata associated with the execution result. | 
| 580 |  | /// This struct must be 100% compatible with `ExecutedActionMetadata`. | 
| 581 |  | #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] | 
| 582 |  | pub struct ExecutionMetadata { | 
| 583 |  |     pub worker: String, | 
| 584 |  |     pub queued_timestamp: SystemTime, | 
| 585 |  |     pub worker_start_timestamp: SystemTime, | 
| 586 |  |     pub worker_completed_timestamp: SystemTime, | 
| 587 |  |     pub input_fetch_start_timestamp: SystemTime, | 
| 588 |  |     pub input_fetch_completed_timestamp: SystemTime, | 
| 589 |  |     pub execution_start_timestamp: SystemTime, | 
| 590 |  |     pub execution_completed_timestamp: SystemTime, | 
| 591 |  |     pub output_upload_start_timestamp: SystemTime, | 
| 592 |  |     pub output_upload_completed_timestamp: SystemTime, | 
| 593 |  | } | 
| 594 |  |  | 
| 595 |  | impl Default for ExecutionMetadata { | 
| 596 | 2 |     fn default() -> Self { | 
| 597 | 2 |         Self { | 
| 598 | 2 |             worker: String::new(), | 
| 599 | 2 |             queued_timestamp: SystemTime::UNIX_EPOCH, | 
| 600 | 2 |             worker_start_timestamp: SystemTime::UNIX_EPOCH, | 
| 601 | 2 |             worker_completed_timestamp: SystemTime::UNIX_EPOCH, | 
| 602 | 2 |             input_fetch_start_timestamp: SystemTime::UNIX_EPOCH, | 
| 603 | 2 |             input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH, | 
| 604 | 2 |             execution_start_timestamp: SystemTime::UNIX_EPOCH, | 
| 605 | 2 |             execution_completed_timestamp: SystemTime::UNIX_EPOCH, | 
| 606 | 2 |             output_upload_start_timestamp: SystemTime::UNIX_EPOCH, | 
| 607 | 2 |             output_upload_completed_timestamp: SystemTime::UNIX_EPOCH, | 
| 608 | 2 |         } | 
| 609 | 2 |     } | 
| 610 |  | } | 
| 611 |  |  | 
| 612 |  | impl From<ExecutionMetadata> for ExecutedActionMetadata { | 
| 613 | 17 |     fn from(val: ExecutionMetadata) -> Self { | 
| 614 |  |         Self { | 
| 615 | 17 |             worker: val.worker, | 
| 616 | 17 |             queued_timestamp: Some(val.queued_timestamp.into()), | 
| 617 | 17 |             worker_start_timestamp: Some(val.worker_start_timestamp.into()), | 
| 618 | 17 |             worker_completed_timestamp: Some(val.worker_completed_timestamp.into()), | 
| 619 | 17 |             input_fetch_start_timestamp: Some(val.input_fetch_start_timestamp.into()), | 
| 620 | 17 |             input_fetch_completed_timestamp: Some(val.input_fetch_completed_timestamp.into()), | 
| 621 | 17 |             execution_start_timestamp: Some(val.execution_start_timestamp.into()), | 
| 622 | 17 |             execution_completed_timestamp: Some(val.execution_completed_timestamp.into()), | 
| 623 | 17 |             output_upload_start_timestamp: Some(val.output_upload_start_timestamp.into()), | 
| 624 | 17 |             output_upload_completed_timestamp: Some(val.output_upload_completed_timestamp.into()), | 
| 625 | 17 |             virtual_execution_duration: val | 
| 626 | 17 |                 .execution_completed_timestamp | 
| 627 | 17 |                 .duration_since(val.execution_start_timestamp) | 
| 628 | 17 |                 .ok() | 
| 629 | 17 |                 .and_then(|duration| prost_types::Duration::try_from(duration).ok()), | 
| 630 | 17 |             auxiliary_metadata: Vec::default(), | 
| 631 |  |         } | 
| 632 | 17 |     } | 
| 633 |  | } | 
| 634 |  |  | 
| 635 |  | impl TryFrom<ExecutedActionMetadata> for ExecutionMetadata { | 
| 636 |  |     type Error = Error; | 
| 637 |  |  | 
| 638 | 3 |     fn try_from(eam: ExecutedActionMetadata) -> Result<Self, Error> { | 
| 639 |  |         Ok(Self { | 
| 640 | 3 |             worker: eam.worker, | 
| 641 | 3 |             queued_timestamp: eam | 
| 642 | 3 |                 .queued_timestamp | 
| 643 | 3 |                 .err_tip(|| "Expected queued_timestamp to exist in ExecutedActionMetadata")?0 | 
| 644 | 3 |                 .try_into()?0, | 
| 645 | 3 |             worker_start_timestamp: eam | 
| 646 | 3 |                 .worker_start_timestamp | 
| 647 | 3 |                 .err_tip(|| "Expected worker_start_timestamp to exist in ExecutedActionMetadata")?0 | 
| 648 | 3 |                 .try_into()?0, | 
| 649 | 3 |             worker_completed_timestamp: eam | 
| 650 | 3 |                 .worker_completed_timestamp | 
| 651 | 3 |                 .err_tip(|| {0 | 
| 652 | 0 |                     "Expected worker_completed_timestamp to exist in ExecutedActionMetadata" | 
| 653 | 0 |                 })? | 
| 654 | 3 |                 .try_into()?0, | 
| 655 | 3 |             input_fetch_start_timestamp: eam | 
| 656 | 3 |                 .input_fetch_start_timestamp | 
| 657 | 3 |                 .err_tip(|| {0 | 
| 658 | 0 |                     "Expected input_fetch_start_timestamp to exist in ExecutedActionMetadata" | 
| 659 | 0 |                 })? | 
| 660 | 3 |                 .try_into()?0, | 
| 661 | 3 |             input_fetch_completed_timestamp: eam | 
| 662 | 3 |                 .input_fetch_completed_timestamp | 
| 663 | 3 |                 .err_tip(|| {0 | 
| 664 | 0 |                     "Expected input_fetch_completed_timestamp to exist in ExecutedActionMetadata" | 
| 665 | 0 |                 })? | 
| 666 | 3 |                 .try_into()?0, | 
| 667 | 3 |             execution_start_timestamp: eam | 
| 668 | 3 |                 .execution_start_timestamp | 
| 669 | 3 |                 .err_tip(|| {0 | 
| 670 | 0 |                     "Expected execution_start_timestamp to exist in ExecutedActionMetadata" | 
| 671 | 0 |                 })? | 
| 672 | 3 |                 .try_into()?0, | 
| 673 | 3 |             execution_completed_timestamp: eam | 
| 674 | 3 |                 .execution_completed_timestamp | 
| 675 | 3 |                 .err_tip(|| {0 | 
| 676 | 0 |                     "Expected execution_completed_timestamp to exist in ExecutedActionMetadata" | 
| 677 | 0 |                 })? | 
| 678 | 3 |                 .try_into()?0, | 
| 679 | 3 |             output_upload_start_timestamp: eam | 
| 680 | 3 |                 .output_upload_start_timestamp | 
| 681 | 3 |                 .err_tip(|| {0 | 
| 682 | 0 |                     "Expected output_upload_start_timestamp to exist in ExecutedActionMetadata" | 
| 683 | 0 |                 })? | 
| 684 | 3 |                 .try_into()?0, | 
| 685 | 3 |             output_upload_completed_timestamp: eam | 
| 686 | 3 |                 .output_upload_completed_timestamp | 
| 687 | 3 |                 .err_tip(|| {0 | 
| 688 | 0 |                     "Expected output_upload_completed_timestamp to exist in ExecutedActionMetadata" | 
| 689 | 0 |                 })? | 
| 690 | 3 |                 .try_into()?0, | 
| 691 |  |         }) | 
| 692 | 3 |     } | 
| 693 |  | } | 
| 694 |  |  | 
| 695 |  | /// Represents the results of an execution. | 
| 696 |  | /// This struct must be 100% compatible with `ActionResult` in `remote_execution.proto`. | 
| 697 |  | #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] | 
| 698 |  | pub struct ActionResult { | 
| 699 |  |     pub output_files: Vec<FileInfo>, | 
| 700 |  |     pub output_folders: Vec<DirectoryInfo>, | 
| 701 |  |     pub output_directory_symlinks: Vec<SymlinkInfo>, | 
| 702 |  |     pub output_file_symlinks: Vec<SymlinkInfo>, | 
| 703 |  |     pub exit_code: i32, | 
| 704 |  |     pub stdout_digest: DigestInfo, | 
| 705 |  |     pub stderr_digest: DigestInfo, | 
| 706 |  |     pub execution_metadata: ExecutionMetadata, | 
| 707 |  |     pub server_logs: HashMap<String, DigestInfo>, | 
| 708 |  |     pub error: Option<Error>, | 
| 709 |  |     pub message: String, | 
| 710 |  | } | 
| 711 |  |  | 
| 712 |  | impl Default for ActionResult { | 
| 713 | 8 |     fn default() -> Self { | 
| 714 | 8 |         Self { | 
| 715 | 8 |             output_files: Vec::default(), | 
| 716 | 8 |             output_folders: Vec::default(), | 
| 717 | 8 |             output_directory_symlinks: Vec::default(), | 
| 718 | 8 |             output_file_symlinks: Vec::default(), | 
| 719 | 8 |             exit_code: INTERNAL_ERROR_EXIT_CODE, | 
| 720 | 8 |             stdout_digest: DigestInfo::new([0u8; 32], 0), | 
| 721 | 8 |             stderr_digest: DigestInfo::new([0u8; 32], 0), | 
| 722 | 8 |             execution_metadata: ExecutionMetadata { | 
| 723 | 8 |                 worker: String::new(), | 
| 724 | 8 |                 queued_timestamp: SystemTime::UNIX_EPOCH, | 
| 725 | 8 |                 worker_start_timestamp: SystemTime::UNIX_EPOCH, | 
| 726 | 8 |                 worker_completed_timestamp: SystemTime::UNIX_EPOCH, | 
| 727 | 8 |                 input_fetch_start_timestamp: SystemTime::UNIX_EPOCH, | 
| 728 | 8 |                 input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH, | 
| 729 | 8 |                 execution_start_timestamp: SystemTime::UNIX_EPOCH, | 
| 730 | 8 |                 execution_completed_timestamp: SystemTime::UNIX_EPOCH, | 
| 731 | 8 |                 output_upload_start_timestamp: SystemTime::UNIX_EPOCH, | 
| 732 | 8 |                 output_upload_completed_timestamp: SystemTime::UNIX_EPOCH, | 
| 733 | 8 |             }, | 
| 734 | 8 |             server_logs: HashMap::default(), | 
| 735 | 8 |             error: None, | 
| 736 | 8 |             message: String::new(), | 
| 737 | 8 |         } | 
| 738 | 8 |     } | 
| 739 |  | } | 
| 740 |  |  | 
| 741 |  | /// The execution status/stage. This should match `ExecutionStage::Value` in `remote_execution.proto`. | 
| 742 |  | #[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] | 
| 743 |  | #[allow( | 
| 744 |  |     clippy::large_enum_variant, | 
| 745 |  |     reason = "TODO box the two relevant variants in a breaking release. Unfulfilled on nightly" | 
| 746 |  | )] | 
| 747 |  | pub enum ActionStage { | 
| 748 |  |     /// Stage is unknown. | 
| 749 |  |     Unknown, | 
| 750 |  |     /// Checking the cache to see if action exists. | 
| 751 |  |     CacheCheck, | 
| 752 |  |     /// Action has been accepted and waiting for worker to take it. | 
| 753 |  |     Queued, | 
| 754 |  |     // TODO(palfrey) We need a way to know if the job was sent to a worker, but hasn't begun | 
| 755 |  |     // execution yet. | 
| 756 |  |     /// Worker is executing the action. | 
| 757 |  |     Executing, | 
| 758 |  |     /// Worker completed the work with result. | 
| 759 |  |     Completed(ActionResult), | 
| 760 |  |     /// Result was found from cache, don't decode the proto just to re-encode it. | 
| 761 |  |     #[serde(serialize_with = "serialize_proto_result", skip_deserializing)] | 
| 762 |  |     // The serialization step decodes this to an ActionResult which is serializable. | 
| 763 |  |     // Since it will always be serialized as an ActionResult, we do not need to support | 
| 764 |  |     // deserialization on this type at all. | 
| 765 |  |     // In theory, serializing this should never happen so performance shouldn't be affected. | 
| 766 |  |     CompletedFromCache(ProtoActionResult), | 
| 767 |  | } | 
| 768 |  |  | 
| 769 | 0 | fn serialize_proto_result<S>(v: &ProtoActionResult, serializer: S) -> Result<S::Ok, S::Error> | 
| 770 | 0 | where | 
| 771 | 0 |     S: serde::Serializer, | 
| 772 |  | { | 
| 773 | 0 |     let s = ActionResult::try_from(v.clone()).map_err(S::Error::custom)?; | 
| 774 | 0 |     s.serialize(serializer) | 
| 775 | 0 | } | 
| 776 |  |  | 
| 777 |  | impl ActionStage { | 
| 778 | 100 |     pub const fn has_action_result(&self) -> bool { | 
| 779 | 100 |         match self { | 
| 780 | 86 |             Self::Unknown | Self::CacheCheck | Self::Queued | Self::Executing => false, | 
| 781 | 14 |             Self::Completed(_) | Self::CompletedFromCache(_) => true, | 
| 782 |  |         } | 
| 783 | 100 |     } | 
| 784 |  |  | 
| 785 |  |     /// Returns true if the worker considers the action done and no longer needs to be tracked. | 
| 786 |  |     // Note: This function is separate from `has_action_result()` to not mix the concept of | 
| 787 |  |     //       "finished" with "has a result". | 
| 788 | 99 |     pub const fn is_finished(&self) -> bool { | 
| 789 | 99 |         self.has_action_result() | 
| 790 | 99 |     } | 
| 791 |  |  | 
| 792 |  |     /// Returns if the stage enum is the same as the other stage enum, but | 
| 793 |  |     /// does not compare the values of the enum. | 
| 794 | 39 |     pub const fn is_same_stage(&self, other: &Self) -> bool { | 
| 795 | 39 |         matches!( | 
| 796 | 39 |             (self, other), | 
| 797 |  |             (Self::Unknown, Self::Unknown) | 
| 798 |  |                 | (Self::CacheCheck, Self::CacheCheck) | 
| 799 |  |                 | (Self::Queued, Self::Queued) | 
| 800 |  |                 | (Self::Executing, Self::Executing) | 
| 801 |  |                 | (Self::Completed(_), Self::Completed(_)) | 
| 802 |  |                 | (Self::CompletedFromCache(_), Self::CompletedFromCache(_)) | 
| 803 |  |         ) | 
| 804 | 39 |     } | 
| 805 |  | } | 
| 806 |  |  | 
| 807 |  | impl MetricsComponent for ActionStage { | 
| 808 | 0 |     fn publish( | 
| 809 | 0 |         &self, | 
| 810 | 0 |         _kind: MetricKind, | 
| 811 | 0 |         _field_metadata: MetricFieldData, | 
| 812 | 0 |     ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { | 
| 813 | 0 |         let value = match self { | 
| 814 | 0 |             Self::Unknown => "Unknown".to_string(), | 
| 815 | 0 |             Self::CacheCheck => "CacheCheck".to_string(), | 
| 816 | 0 |             Self::Queued => "Queued".to_string(), | 
| 817 | 0 |             Self::Executing => "Executing".to_string(), | 
| 818 | 0 |             Self::Completed(_) => "Completed".to_string(), | 
| 819 | 0 |             Self::CompletedFromCache(_) => "CompletedFromCache".to_string(), | 
| 820 |  |         }; | 
| 821 | 0 |         Ok(MetricPublishKnownKindData::String(value)) | 
| 822 | 0 |     } | 
| 823 |  | } | 
| 824 |  |  | 
| 825 |  | impl From<&ActionStage> for execution_stage::Value { | 
| 826 | 1 |     fn from(val: &ActionStage) -> Self { | 
| 827 | 1 |         match val { | 
| 828 | 0 |             ActionStage::Unknown => Self::Unknown, | 
| 829 | 0 |             ActionStage::CacheCheck => Self::CacheCheck, | 
| 830 | 0 |             ActionStage::Queued => Self::Queued, | 
| 831 | 0 |             ActionStage::Executing => Self::Executing, | 
| 832 | 1 |             ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => Self::Completed, | 
| 833 |  |         } | 
| 834 | 1 |     } | 
| 835 |  | } | 
| 836 |  |  | 
| 837 | 11 | pub fn to_execute_response(action_result: ActionResult) -> ExecuteResponse { | 
| 838 | 11 |     fn logs_from(server_logs: HashMap<String, DigestInfo>) -> HashMap<String, LogFile> { | 
| 839 | 11 |         let mut logs = HashMap::with_capacity(server_logs.len()); | 
| 840 | 12 |         for (k1, v1) in server_logs { | 
| 841 | 1 |             logs.insert( | 
| 842 | 1 |                 k.clone(), | 
| 843 | 1 |                 LogFile { | 
| 844 | 1 |                     digest: Some(v.into()), | 
| 845 | 1 |                     human_readable: false, | 
| 846 | 1 |                 }, | 
| 847 | 1 |             ); | 
| 848 | 1 |         } | 
| 849 | 11 |         logs | 
| 850 | 11 |     } | 
| 851 |  |  | 
| 852 | 11 |     let status = Some( | 
| 853 | 11 |         action_result | 
| 854 | 11 |             .error | 
| 855 | 11 |             .clone() | 
| 856 | 11 |             .map_or_else(Status::default, Into::into), | 
| 857 | 11 |     ); | 
| 858 | 11 |     let message = action_result.message.clone(); | 
| 859 | 11 |     ExecuteResponse { | 
| 860 | 11 |         server_logs: logs_from(action_result.server_logs.clone()), | 
| 861 | 11 |         result: action_result.try_into().ok(), | 
| 862 | 11 |         cached_result: false, | 
| 863 | 11 |         status, | 
| 864 | 11 |         message, | 
| 865 | 11 |     } | 
| 866 | 11 | } | 
| 867 |  |  | 
| 868 |  | impl From<ActionStage> for ExecuteResponse { | 
| 869 | 6 |     fn from(val: ActionStage) -> Self { | 
| 870 | 6 |         match val { | 
| 871 |  |             // We don't have an execute response if we don't have the results. It is defined | 
| 872 |  |             // behavior to return an empty proto struct. | 
| 873 |  |             ActionStage::Unknown | 
| 874 |  |             | ActionStage::CacheCheck | 
| 875 |  |             | ActionStage::Queued | 
| 876 | 0 |             | ActionStage::Executing => Self::default(), | 
| 877 | 6 |             ActionStage::Completed(action_result) => to_execute_response(action_result), | 
| 878 |  |             // Handled separately as there are no server logs and the action | 
| 879 |  |             // result is already in Proto format. | 
| 880 | 0 |             ActionStage::CompletedFromCache(proto_action_result) => Self { | 
| 881 | 0 |                 server_logs: HashMap::new(), | 
| 882 | 0 |                 result: Some(proto_action_result), | 
| 883 | 0 |                 cached_result: true, | 
| 884 | 0 |                 status: Some(Status::default()), | 
| 885 | 0 |                 message: String::new(), // Will be populated later if applicable. | 
| 886 | 0 |             }, | 
| 887 |  |         } | 
| 888 | 6 |     } | 
| 889 |  | } | 
| 890 |  |  | 
| 891 |  | impl TryFrom<ActionResult> for ProtoActionResult { | 
| 892 |  |     type Error = Error; | 
| 893 |  |  | 
| 894 | 17 |     fn try_from(val: ActionResult) -> Result<Self, Error> { | 
| 895 | 17 |         let mut output_symlinks = Vec::with_capacity( | 
| 896 | 17 |             val.output_file_symlinks.len() + val.output_directory_symlinks.len(), | 
| 897 |  |         ); | 
| 898 | 17 |         output_symlinks.extend_from_slice(val.output_file_symlinks.as_slice()); | 
| 899 | 17 |         output_symlinks.extend_from_slice(val.output_directory_symlinks.as_slice()); | 
| 900 |  |  | 
| 901 |  |         Ok(Self { | 
| 902 | 17 |             output_files: val | 
| 903 | 17 |                 .output_files | 
| 904 | 17 |                 .into_iter() | 
| 905 | 17 |                 .map(TryInto::try_into) | 
| 906 | 17 |                 .collect::<Result<_, _>>()?0, | 
| 907 | 17 |             output_file_symlinks: val | 
| 908 | 17 |                 .output_file_symlinks | 
| 909 | 17 |                 .into_iter() | 
| 910 | 17 |                 .map(TryInto::try_into) | 
| 911 | 17 |                 .collect::<Result<_, _>>()?0, | 
| 912 | 17 |             output_symlinks: output_symlinks | 
| 913 | 17 |                 .into_iter() | 
| 914 | 17 |                 .map(TryInto::try_into) | 
| 915 | 17 |                 .collect::<Result<_, _>>()?0, | 
| 916 | 17 |             output_directories: val | 
| 917 | 17 |                 .output_folders | 
| 918 | 17 |                 .into_iter() | 
| 919 | 17 |                 .map(TryInto::try_into) | 
| 920 | 17 |                 .collect::<Result<_, _>>()?0, | 
| 921 | 17 |             output_directory_symlinks: val | 
| 922 | 17 |                 .output_directory_symlinks | 
| 923 | 17 |                 .into_iter() | 
| 924 | 17 |                 .map(TryInto::try_into) | 
| 925 | 17 |                 .collect::<Result<_, _>>()?0, | 
| 926 | 17 |             exit_code: val.exit_code, | 
| 927 | 17 |             stdout_raw: Bytes::default(), | 
| 928 | 17 |             stdout_digest: Some(val.stdout_digest.into()), | 
| 929 | 17 |             stderr_raw: Bytes::default(), | 
| 930 | 17 |             stderr_digest: Some(val.stderr_digest.into()), | 
| 931 | 17 |             execution_metadata: Some(val.execution_metadata.into()), | 
| 932 |  |         }) | 
| 933 | 17 |     } | 
| 934 |  | } | 
| 935 |  |  | 
| 936 |  | impl TryFrom<ProtoActionResult> for ActionResult { | 
| 937 |  |     type Error = Error; | 
| 938 |  |  | 
| 939 | 0 |     fn try_from(val: ProtoActionResult) -> Result<Self, Error> { | 
| 940 | 0 |         let output_file_symlinks = val | 
| 941 | 0 |             .output_file_symlinks | 
| 942 | 0 |             .into_iter() | 
| 943 | 0 |             .map(|output_symlink| { | 
| 944 | 0 |                 SymlinkInfo::try_from(output_symlink) | 
| 945 | 0 |                     .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo") | 
| 946 | 0 |             }) | 
| 947 | 0 |             .collect::<Result<Vec<_>, _>>()?; | 
| 948 |  |  | 
| 949 | 0 |         let output_directory_symlinks = val | 
| 950 | 0 |             .output_directory_symlinks | 
| 951 | 0 |             .into_iter() | 
| 952 | 0 |             .map(|output_symlink| { | 
| 953 | 0 |                 SymlinkInfo::try_from(output_symlink) | 
| 954 | 0 |                     .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo") | 
| 955 | 0 |             }) | 
| 956 | 0 |             .collect::<Result<Vec<_>, _>>()?; | 
| 957 |  |  | 
| 958 | 0 |         let output_files = val | 
| 959 | 0 |             .output_files | 
| 960 | 0 |             .into_iter() | 
| 961 | 0 |             .map(|output_file| { | 
| 962 | 0 |                 output_file | 
| 963 | 0 |                     .try_into() | 
| 964 | 0 |                     .err_tip(|| "Output File could not be converted") | 
| 965 | 0 |             }) | 
| 966 | 0 |             .collect::<Result<Vec<_>, _>>()?; | 
| 967 |  |  | 
| 968 | 0 |         let output_folders = val | 
| 969 | 0 |             .output_directories | 
| 970 | 0 |             .into_iter() | 
| 971 | 0 |             .map(|output_directory| { | 
| 972 | 0 |                 output_directory | 
| 973 | 0 |                     .try_into() | 
| 974 | 0 |                     .err_tip(|| "Output File could not be converted") | 
| 975 | 0 |             }) | 
| 976 | 0 |             .collect::<Result<Vec<_>, _>>()?; | 
| 977 |  |  | 
| 978 |  |         Ok(Self { | 
| 979 | 0 |             output_files, | 
| 980 | 0 |             output_folders, | 
| 981 | 0 |             output_file_symlinks, | 
| 982 | 0 |             output_directory_symlinks, | 
| 983 | 0 |             exit_code: val.exit_code, | 
| 984 | 0 |             stdout_digest: val | 
| 985 | 0 |                 .stdout_digest | 
| 986 | 0 |                 .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")? | 
| 987 | 0 |                 .try_into()?, | 
| 988 | 0 |             stderr_digest: val | 
| 989 | 0 |                 .stderr_digest | 
| 990 | 0 |                 .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")? | 
| 991 | 0 |                 .try_into()?, | 
| 992 | 0 |             execution_metadata: val | 
| 993 | 0 |                 .execution_metadata | 
| 994 | 0 |                 .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")? | 
| 995 | 0 |                 .try_into()?, | 
| 996 | 0 |             server_logs: HashMap::default(), | 
| 997 | 0 |             error: None, | 
| 998 | 0 |             message: String::new(), | 
| 999 |  |         }) | 
| 1000 | 0 |     } | 
| 1001 |  | } | 
| 1002 |  |  | 
| 1003 |  | impl TryFrom<ExecuteResponse> for ActionStage { | 
| 1004 |  |     type Error = Error; | 
| 1005 |  |  | 
| 1006 | 3 |     fn try_from(execute_response: ExecuteResponse) -> Result<Self, Error> { | 
| 1007 | 3 |         let proto_action_result = execute_response | 
| 1008 | 3 |             .result | 
| 1009 | 3 |             .err_tip(|| "Expected result to be set on ExecuteResponse msg")?0; | 
| 1010 | 3 |         let action_result = ActionResult { | 
| 1011 | 3 |             output_files: proto_action_result | 
| 1012 | 3 |                 .output_files | 
| 1013 | 3 |                 .try_map(TryInto::try_into)?0, | 
| 1014 | 3 |             output_directory_symlinks: proto_action_result | 
| 1015 | 3 |                 .output_directory_symlinks | 
| 1016 | 3 |                 .try_map(TryInto::try_into)?0, | 
| 1017 | 3 |             output_file_symlinks: proto_action_result | 
| 1018 | 3 |                 .output_file_symlinks | 
| 1019 | 3 |                 .try_map(TryInto::try_into)?0, | 
| 1020 | 3 |             output_folders: proto_action_result | 
| 1021 | 3 |                 .output_directories | 
| 1022 | 3 |                 .try_map(TryInto::try_into)?0, | 
| 1023 | 3 |             exit_code: proto_action_result.exit_code, | 
| 1024 |  |  | 
| 1025 | 3 |             stdout_digest: proto_action_result | 
| 1026 | 3 |                 .stdout_digest | 
| 1027 | 3 |                 .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")?0 | 
| 1028 | 3 |                 .try_into()?0, | 
| 1029 | 3 |             stderr_digest: proto_action_result | 
| 1030 | 3 |                 .stderr_digest | 
| 1031 | 3 |                 .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")?0 | 
| 1032 | 3 |                 .try_into()?0, | 
| 1033 | 3 |             execution_metadata: proto_action_result | 
| 1034 | 3 |                 .execution_metadata | 
| 1035 | 3 |                 .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")?0 | 
| 1036 | 3 |                 .try_into()?0, | 
| 1037 | 3 |             server_logs: execute_response.server_logs.try_map(|v| {2 | 
| 1038 | 2 |                 v.digest | 
| 1039 | 2 |                     .err_tip(|| "Expected digest to be set on LogFile msg")?0 | 
| 1040 | 2 |                     .try_into() | 
| 1041 | 2 |             })?0, | 
| 1042 | 3 |             error: execute_response | 
| 1043 | 3 |                 .status | 
| 1044 | 3 |                 .clone() | 
| 1045 | 3 |                 .and_then(|v| if v.code == 0 { None1} else { Some(v.into())2}),  Branch (1045:34): [True: 1, False: 2]
  Branch (1045:34): [Folded - Ignored]
 | 
| 1046 | 3 |             message: execute_response.message, | 
| 1047 |  |         }; | 
| 1048 |  |  | 
| 1049 | 3 |         if execute_response.cached_result {  Branch (1049:12): [True: 0, False: 3]
  Branch (1049:12): [Folded - Ignored]
 | 
| 1050 | 0 |             return Ok(Self::CompletedFromCache(action_result.try_into()?)); | 
| 1051 | 3 |         } | 
| 1052 | 3 |         Ok(Self::Completed(action_result)) | 
| 1053 | 3 |     } | 
| 1054 |  | } | 
| 1055 |  |  | 
| 1056 |  | // TODO: Should be able to remove this after tokio-rs/prost#299 | 
| 1057 |  | trait TypeUrl: Message { | 
| 1058 |  |     const TYPE_URL: &'static str; | 
| 1059 |  | } | 
| 1060 |  |  | 
| 1061 |  | impl TypeUrl for ExecuteResponse { | 
| 1062 |  |     const TYPE_URL: &'static str = | 
| 1063 |  |         "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteResponse"; | 
| 1064 |  | } | 
| 1065 |  |  | 
| 1066 |  | impl TypeUrl for ExecuteOperationMetadata { | 
| 1067 |  |     const TYPE_URL: &'static str = | 
| 1068 |  |         "type.googleapis.com/build.bazel.remote.execution.v2.ExecuteOperationMetadata"; | 
| 1069 |  | } | 
| 1070 |  |  | 
| 1071 | 2 | fn from_any<T>(message: &Any) -> Result<T, Error> | 
| 1072 | 2 | where | 
| 1073 | 2 |     T: TypeUrl + Default, | 
| 1074 |  | { | 
| 1075 | 0 |     error_if!( | 
| 1076 | 2 |         message.type_url != T::TYPE_URL,   Branch (1076:9): [True: 0, False: 1]
  Branch (1076:9): [True: 0, False: 1]
  Branch (1076:9): [Folded - Ignored]
 | 
| 1077 |  |         "Incorrect type when decoding Any. {} != {}", | 
| 1078 |  |         message.type_url, | 
| 1079 | 0 |         T::TYPE_URL.to_string() | 
| 1080 |  |     ); | 
| 1081 | 2 |     Ok(T::decode(message.value.as_slice())?0) | 
| 1082 | 2 | } | 
| 1083 |  |  | 
| 1084 | 2 | fn to_any<T>(message: &T) -> Any | 
| 1085 | 2 | where | 
| 1086 | 2 |     T: TypeUrl, | 
| 1087 |  | { | 
| 1088 | 2 |     Any { | 
| 1089 | 2 |         type_url: T::TYPE_URL.to_string(), | 
| 1090 | 2 |         value: message.encode_to_vec(), | 
| 1091 | 2 |     } | 
| 1092 | 2 | } | 
| 1093 |  |  | 
| 1094 |  | /// Current state of the action. | 
| 1095 |  | /// This must be 100% compatible with `Operation` in `google/longrunning/operations.proto`. | 
| 1096 |  | #[derive(PartialEq, Debug, Clone, Serialize, Deserialize, MetricsComponent)] | 
| 1097 |  | pub struct ActionState { | 
| 1098 |  |     #[metric(help = "The current stage of the action.")] | 
| 1099 |  |     pub stage: ActionStage, | 
| 1100 |  |     #[metric(help = "The unique identifier of the action.")] | 
| 1101 |  |     pub client_operation_id: OperationId, | 
| 1102 |  |     #[metric(help = "The digest of the action.")] | 
| 1103 |  |     pub action_digest: DigestInfo, | 
| 1104 |  | } | 
| 1105 |  |  | 
| 1106 |  | impl ActionState { | 
| 1107 | 1 |     pub fn try_from_operation( | 
| 1108 | 1 |         operation: Operation, | 
| 1109 | 1 |         client_operation_id: OperationId, | 
| 1110 | 1 |     ) -> Result<Self, Error> { | 
| 1111 | 1 |         let metadata = from_any::<ExecuteOperationMetadata>( | 
| 1112 | 1 |             &operation | 
| 1113 | 1 |                 .metadata | 
| 1114 | 1 |                 .err_tip(|| "No metadata in upstream operation")?0, | 
| 1115 |  |         ) | 
| 1116 | 1 |         .err_tip(|| "Could not decode metadata in upstream operation")?0; | 
| 1117 |  |  | 
| 1118 | 1 |         let stage = match execution_stage::Value::try_from(metadata.stage).err_tip(|| {0 | 
| 1119 | 0 |             format!( | 
| 1120 | 0 |                 "Could not convert {} to execution_stage::Value", | 
| 1121 |  |                 metadata.stage | 
| 1122 |  |             ) | 
| 1123 | 0 |         })? { | 
| 1124 | 0 |             execution_stage::Value::Unknown => ActionStage::Unknown, | 
| 1125 | 0 |             execution_stage::Value::CacheCheck => ActionStage::CacheCheck, | 
| 1126 | 0 |             execution_stage::Value::Queued => ActionStage::Queued, | 
| 1127 | 0 |             execution_stage::Value::Executing => ActionStage::Executing, | 
| 1128 |  |             execution_stage::Value::Completed => { | 
| 1129 | 1 |                 let execute_response = operation | 
| 1130 | 1 |                     .result | 
| 1131 | 1 |                     .err_tip(|| "No result data for completed upstream action")?0; | 
| 1132 | 1 |                 match execute_response { | 
| 1133 | 0 |                     LongRunningResult::Error(error) => ActionStage::Completed(ActionResult { | 
| 1134 | 0 |                         error: Some(error.into()), | 
| 1135 | 0 |                         ..ActionResult::default() | 
| 1136 | 0 |                     }), | 
| 1137 | 1 |                     LongRunningResult::Response(response) => { | 
| 1138 |  |                         // Could be Completed, CompletedFromCache or Error. | 
| 1139 | 1 |                         from_any::<ExecuteResponse>(&response) | 
| 1140 | 1 |                             .err_tip(|| {0 | 
| 1141 | 0 |                                 "Could not decode result structure for completed upstream action" | 
| 1142 | 0 |                             })? | 
| 1143 | 1 |                             .try_into()?0 | 
| 1144 |  |                     } | 
| 1145 |  |                 } | 
| 1146 |  |             } | 
| 1147 |  |         }; | 
| 1148 |  |  | 
| 1149 | 1 |         let action_digest = metadata | 
| 1150 | 1 |             .action_digest | 
| 1151 | 1 |             .err_tip(|| "No action_digest in upstream operation")?0 | 
| 1152 | 1 |             .try_into() | 
| 1153 | 1 |             .err_tip(|| "Could not convert action_digest into DigestInfo")?0; | 
| 1154 |  |  | 
| 1155 | 1 |         Ok(Self { | 
| 1156 | 1 |             stage, | 
| 1157 | 1 |             client_operation_id, | 
| 1158 | 1 |             action_digest, | 
| 1159 | 1 |         }) | 
| 1160 | 1 |     } | 
| 1161 |  |  | 
| 1162 | 1 |     pub fn as_operation(&self, client_operation_id: OperationId) -> Operation { | 
| 1163 | 1 |         let stage = Into::<execution_stage::Value>::into(&self.stage) as i32; | 
| 1164 | 1 |         let name = client_operation_id.into_string(); | 
| 1165 |  |  | 
| 1166 | 1 |         let result = if self.stage.has_action_result() {  Branch (1166:25): [True: 1, False: 0]
  Branch (1166:25): [Folded - Ignored]
 | 
| 1167 | 1 |             let execute_response: ExecuteResponse = self.stage.clone().into(); | 
| 1168 | 1 |             Some(LongRunningResult::Response(to_any(&execute_response))) | 
| 1169 |  |         } else { | 
| 1170 | 0 |             None | 
| 1171 |  |         }; | 
| 1172 | 1 |         let digest = Some(self.action_digest.into()); | 
| 1173 |  |  | 
| 1174 | 1 |         let metadata = ExecuteOperationMetadata { | 
| 1175 | 1 |             stage, | 
| 1176 | 1 |             action_digest: digest, | 
| 1177 | 1 |             // TODO(palfrey) We should support stderr/stdout streaming. | 
| 1178 | 1 |             stdout_stream_name: String::default(), | 
| 1179 | 1 |             stderr_stream_name: String::default(), | 
| 1180 | 1 |             partial_execution_metadata: None, | 
| 1181 | 1 |         }; | 
| 1182 |  |  | 
| 1183 | 1 |         Operation { | 
| 1184 | 1 |             name, | 
| 1185 | 1 |             metadata: Some(to_any(&metadata)), | 
| 1186 | 1 |             done: result.is_some(), | 
| 1187 | 1 |             result, | 
| 1188 | 1 |         } | 
| 1189 | 1 |     } | 
| 1190 |  | } |