Coverage Report

Created: 2025-12-17 22:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/metrics.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Business Source License, Version 1.1 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may request a copy of the License by emailing contact@nativelink.com.
6
//
7
// Use of this module requires an enterprise license agreement, which can be
8
// attained by emailing contact@nativelink.com or signing up for Nativelink
9
// Cloud at app.nativelink.com.
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
17
use std::sync::LazyLock;
18
19
use opentelemetry::{InstrumentationScope, KeyValue, Value, global, metrics};
20
21
use crate::action_messages::ActionStage;
22
23
// Attribute keys attached to cache-operation metrics.

/// Attribute key for the cache type.
pub const CACHE_TYPE: &str = "cache.type";
/// Attribute key whose value is a `CacheOperationName` label.
pub const CACHE_OPERATION: &str = "cache.operation.name";
/// Attribute key whose value is a `CacheOperationResult` label.
pub const CACHE_RESULT: &str = "cache.operation.result";
27
28
// Attribute keys attached to remote-execution metrics.

/// Attribute key whose value is an `ExecutionStage` label.
pub const EXECUTION_STAGE: &str = "execution.stage";
/// Attribute key whose value is an `ExecutionResult` label.
pub const EXECUTION_RESULT: &str = "execution.result";
/// Attribute key for the instance name an action belongs to.
pub const EXECUTION_INSTANCE: &str = "execution.instance";
/// Attribute key for the action priority (recorded as an `i64`).
pub const EXECUTION_PRIORITY: &str = "execution.priority";
/// Attribute key for the ID of the worker handling an action.
pub const EXECUTION_WORKER_ID: &str = "execution.worker_id";
/// Attribute key for an execution's exit code.
pub const EXECUTION_EXIT_CODE: &str = "execution.exit_code";
/// Attribute key for an action's digest.
pub const EXECUTION_ACTION_DIGEST: &str = "execution.action_digest";
36
37
/// Cache operation types for metrics classification.
///
/// Recorded as the value of the `cache.operation.name` attribute using the
/// lowercase labels `read`, `write`, `delete` and `evict`.
///
/// Derives `PartialEq`/`Eq`/`Hash` (like `ExecutionStage`) so values can be
/// compared and used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheOperationName {
    /// Data retrieval operations (get, peek, contains, etc.)
    Read,
    /// Data storage operations (insert, update, replace, etc.)
    Write,
    /// Explicit data removal operations
    Delete,
    /// Automatic cache maintenance (evictions, TTL cleanup, etc.)
    Evict,
}
49
50
impl From<CacheOperationName> for Value {
51
10
    fn from(op: CacheOperationName) -> Self {
52
10
        match op {
53
3
            CacheOperationName::Read => Self::from("read"),
54
2
            CacheOperationName::Write => Self::from("write"),
55
3
            CacheOperationName::Delete => Self::from("delete"),
56
2
            CacheOperationName::Evict => Self::from("evict"),
57
        }
58
10
    }
59
}
60
61
/// Results of cache operations.
///
/// Recorded as the value of the `cache.operation.result` attribute using the
/// lowercase labels `hit`, `miss`, `expired`, `success` and `error`.
///
/// Result semantics vary by operation type:
/// - Read: `Hit`/`Miss`/`Expired` indicate data availability
/// - Write: `Success`/`Error` indicate completion status
/// - Delete: `Success`/`Error` indicate completion status; `Miss` indicates
///   there was no data to delete
/// - Evict: `Success` indicates completion; `Expired` classifies evictions of
///   expired data
/// - `Error` may be used with any operation type
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheOperationResult {
    /// Data found and valid (Read operations)
    Hit,
    /// Data not found (Read operations, and Delete operations with nothing to delete)
    Miss,
    /// Data found but invalid/expired (Read operations, and Evict classification)
    Expired,
    /// Operation completed successfully (Write/Delete/Evict operations)
    Success,
    /// Operation failed (any operation type)
    Error,
}
79
80
impl From<CacheOperationResult> for Value {
81
10
    fn from(result: CacheOperationResult) -> Self {
82
10
        match result {
83
1
            CacheOperationResult::Hit => Self::from("hit"),
84
2
            CacheOperationResult::Miss => Self::from("miss"),
85
2
            CacheOperationResult::Expired => Self::from("expired"),
86
3
            CacheOperationResult::Success => Self::from("success"),
87
2
            CacheOperationResult::Error => Self::from("error"),
88
        }
89
10
    }
90
}
91
92
/// Remote execution stages for metrics classification.
///
/// Recorded as the value of the `execution.stage` attribute. When converted
/// from an `ActionStage`, both `Completed` and `CompletedFromCache` map to
/// [`ExecutionStage::Completed`].
///
/// Derives `Hash` (in addition to `Eq`) so stages can key per-stage maps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ExecutionStage {
    /// Unknown stage
    Unknown,
    /// Checking cache for existing results
    CacheCheck,
    /// Action is queued waiting for execution
    Queued,
    /// Action is being executed by a worker
    Executing,
    /// Action execution completed
    Completed,
}
106
107
impl From<ExecutionStage> for Value {
108
158
    fn from(stage: ExecutionStage) -> Self {
109
158
        match stage {
110
1
            ExecutionStage::Unknown => Self::from("unknown"),
111
1
            ExecutionStage::CacheCheck => Self::from("cache_check"),
112
68
            ExecutionStage::Queued => Self::from("queued"),
113
70
            ExecutionStage::Executing => Self::from("executing"),
114
18
            ExecutionStage::Completed => Self::from("completed"),
115
        }
116
158
    }
117
}
118
119
impl From<ActionStage> for ExecutionStage {
    /// Classifies an owned `ActionStage`.
    ///
    /// Delegates to the by-reference conversion so the stage mapping lives in
    /// exactly one place.
    fn from(stage: ActionStage) -> Self {
        Self::from(&stage)
    }
}
130
131
impl From<&ActionStage> for ExecutionStage {
132
10.1k
    fn from(stage: &ActionStage) -> Self {
133
10.1k
        match stage {
134
1
            ActionStage::Unknown => Self::Unknown,
135
1
            ActionStage::CacheCheck => Self::CacheCheck,
136
44
            ActionStage::Queued => Self::Queued,
137
70
            ActionStage::Executing => Self::Executing,
138
10.0k
            ActionStage::Completed(_) | ActionStage::CompletedFromCache(_) => Self::Completed,
139
        }
140
10.1k
    }
141
}
142
143
/// Results of remote execution operations.
///
/// Recorded as the value of the `execution.result` attribute using the labels
/// `success`, `failure`, `cancelled`, `timeout` and `cache_hit`.
///
/// Derives `PartialEq`/`Eq`/`Hash` (consistent with `ExecutionStage`) so
/// results can be compared and used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ExecutionResult {
    /// Execution completed successfully
    Success,
    /// Execution failed
    Failure,
    /// Execution was cancelled
    Cancelled,
    /// Execution timed out
    Timeout,
    /// Result was found in cache
    CacheHit,
}
157
158
impl From<ExecutionResult> for Value {
159
18
    fn from(result: ExecutionResult) -> Self {
160
18
        match result {
161
11
            ExecutionResult::Success => Self::from("success"),
162
4
            ExecutionResult::Failure => Self::from("failure"),
163
1
            ExecutionResult::Cancelled => Self::from("cancelled"),
164
1
            ExecutionResult::Timeout => Self::from("timeout"),
165
1
            ExecutionResult::CacheHit => Self::from("cache_hit"),
166
        }
167
18
    }
168
}
169
170
/// Pre-allocated attribute combinations for efficient cache metrics collection.
171
///
172
/// Avoids runtime allocation by pre-computing common attribute combinations
173
/// for cache operations and results.
174
#[derive(Debug)]
175
pub struct CacheMetricAttrs {
176
    // Read operation attributes
177
    read_hit: Vec<KeyValue>,
178
    read_miss: Vec<KeyValue>,
179
    read_expired: Vec<KeyValue>,
180
181
    // Write operation attributes
182
    write_success: Vec<KeyValue>,
183
    write_error: Vec<KeyValue>,
184
185
    // Delete operation attributes
186
    delete_success: Vec<KeyValue>,
187
    delete_miss: Vec<KeyValue>,
188
    delete_error: Vec<KeyValue>,
189
190
    // Evict operation attributes
191
    evict_success: Vec<KeyValue>,
192
    evict_expired: Vec<KeyValue>,
193
}
194
195
impl CacheMetricAttrs {
196
    /// Creates a new set of pre-computed attributes.
197
    ///
198
    /// The `base_attrs` are included in all attribute combinations (e.g., cache
199
    /// type, instance ID).
200
    #[must_use]
201
1
    pub fn new(base_attrs: &[KeyValue]) -> Self {
202
10
        let 
make_attrs1
= |op: CacheOperationName, result: CacheOperationResult| {
203
10
            let mut attrs = base_attrs.to_vec();
204
10
            attrs.push(KeyValue::new(CACHE_OPERATION, op));
205
10
            attrs.push(KeyValue::new(CACHE_RESULT, result));
206
10
            attrs
207
10
        };
208
209
1
        Self {
210
1
            read_hit: make_attrs(CacheOperationName::Read, CacheOperationResult::Hit),
211
1
            read_miss: make_attrs(CacheOperationName::Read, CacheOperationResult::Miss),
212
1
            read_expired: make_attrs(CacheOperationName::Read, CacheOperationResult::Expired),
213
1
214
1
            write_success: make_attrs(CacheOperationName::Write, CacheOperationResult::Success),
215
1
            write_error: make_attrs(CacheOperationName::Write, CacheOperationResult::Error),
216
1
217
1
            delete_success: make_attrs(CacheOperationName::Delete, CacheOperationResult::Success),
218
1
            delete_miss: make_attrs(CacheOperationName::Delete, CacheOperationResult::Miss),
219
1
            delete_error: make_attrs(CacheOperationName::Delete, CacheOperationResult::Error),
220
1
221
1
            evict_success: make_attrs(CacheOperationName::Evict, CacheOperationResult::Success),
222
1
            evict_expired: make_attrs(CacheOperationName::Evict, CacheOperationResult::Expired),
223
1
        }
224
1
    }
225
226
    // Attribute accessors
227
    #[must_use]
228
0
    pub fn read_hit(&self) -> &[KeyValue] {
229
0
        &self.read_hit
230
0
    }
231
    #[must_use]
232
0
    pub fn read_miss(&self) -> &[KeyValue] {
233
0
        &self.read_miss
234
0
    }
235
    #[must_use]
236
0
    pub fn read_expired(&self) -> &[KeyValue] {
237
0
        &self.read_expired
238
0
    }
239
    #[must_use]
240
0
    pub fn write_success(&self) -> &[KeyValue] {
241
0
        &self.write_success
242
0
    }
243
    #[must_use]
244
0
    pub fn write_error(&self) -> &[KeyValue] {
245
0
        &self.write_error
246
0
    }
247
    #[must_use]
248
0
    pub fn delete_success(&self) -> &[KeyValue] {
249
0
        &self.delete_success
250
0
    }
251
    #[must_use]
252
0
    pub fn delete_miss(&self) -> &[KeyValue] {
253
0
        &self.delete_miss
254
0
    }
255
    #[must_use]
256
0
    pub fn delete_error(&self) -> &[KeyValue] {
257
0
        &self.delete_error
258
0
    }
259
    #[must_use]
260
0
    pub fn evict_success(&self) -> &[KeyValue] {
261
0
        &self.evict_success
262
0
    }
263
    #[must_use]
264
0
    pub fn evict_expired(&self) -> &[KeyValue] {
265
0
        &self.evict_expired
266
0
    }
267
}
268
269
/// Pre-allocated attribute combinations for efficient remote execution metrics collection.
270
#[derive(Debug)]
271
pub struct ExecutionMetricAttrs {
272
    // Stage transition attributes
273
    unknown: Vec<KeyValue>,
274
    cache_check: Vec<KeyValue>,
275
    queued: Vec<KeyValue>,
276
    executing: Vec<KeyValue>,
277
    completed_success: Vec<KeyValue>,
278
    completed_failure: Vec<KeyValue>,
279
    completed_cancelled: Vec<KeyValue>,
280
    completed_timeout: Vec<KeyValue>,
281
    completed_cache_hit: Vec<KeyValue>,
282
}
283
284
impl ExecutionMetricAttrs {
285
    /// Creates a new set of pre-computed attributes.
286
    ///
287
    /// The `base_attrs` are included in all attribute combinations (e.g., instance
288
    /// name, worker ID).
289
    #[must_use]
290
1
    pub fn new(base_attrs: &[KeyValue]) -> Self {
291
9
        let 
make_attrs1
= |stage: ExecutionStage, result: Option<ExecutionResult>| {
292
9
            let mut attrs = base_attrs.to_vec();
293
9
            attrs.push(KeyValue::new(EXECUTION_STAGE, stage));
294
9
            if let Some(
result5
) = result {
  Branch (294:20): [True: 5, False: 4]
  Branch (294:20): [Folded - Ignored]
295
5
                attrs.push(KeyValue::new(EXECUTION_RESULT, result));
296
5
            
}4
297
9
            attrs
298
9
        };
299
300
1
        Self {
301
1
            unknown: make_attrs(ExecutionStage::Unknown, None),
302
1
            cache_check: make_attrs(ExecutionStage::CacheCheck, None),
303
1
            queued: make_attrs(ExecutionStage::Queued, None),
304
1
            executing: make_attrs(ExecutionStage::Executing, None),
305
1
            completed_success: make_attrs(
306
1
                ExecutionStage::Completed,
307
1
                Some(ExecutionResult::Success),
308
1
            ),
309
1
            completed_failure: make_attrs(
310
1
                ExecutionStage::Completed,
311
1
                Some(ExecutionResult::Failure),
312
1
            ),
313
1
            completed_cancelled: make_attrs(
314
1
                ExecutionStage::Completed,
315
1
                Some(ExecutionResult::Cancelled),
316
1
            ),
317
1
            completed_timeout: make_attrs(
318
1
                ExecutionStage::Completed,
319
1
                Some(ExecutionResult::Timeout),
320
1
            ),
321
1
            completed_cache_hit: make_attrs(
322
1
                ExecutionStage::Completed,
323
1
                Some(ExecutionResult::CacheHit),
324
1
            ),
325
1
        }
326
1
    }
327
328
    // Attribute accessors
329
    #[must_use]
330
0
    pub fn unknown(&self) -> &[KeyValue] {
331
0
        &self.unknown
332
0
    }
333
    #[must_use]
334
0
    pub fn cache_check(&self) -> &[KeyValue] {
335
0
        &self.cache_check
336
0
    }
337
    #[must_use]
338
1
    pub fn queued(&self) -> &[KeyValue] {
339
1
        &self.queued
340
1
    }
341
    #[must_use]
342
0
    pub fn executing(&self) -> &[KeyValue] {
343
0
        &self.executing
344
0
    }
345
    #[must_use]
346
1
    pub fn completed_success(&self) -> &[KeyValue] {
347
1
        &self.completed_success
348
1
    }
349
    #[must_use]
350
0
    pub fn completed_failure(&self) -> &[KeyValue] {
351
0
        &self.completed_failure
352
0
    }
353
    #[must_use]
354
0
    pub fn completed_cancelled(&self) -> &[KeyValue] {
355
0
        &self.completed_cancelled
356
0
    }
357
    #[must_use]
358
0
    pub fn completed_timeout(&self) -> &[KeyValue] {
359
0
        &self.completed_timeout
360
0
    }
361
    #[must_use]
362
0
    pub fn completed_cache_hit(&self) -> &[KeyValue] {
363
0
        &self.completed_cache_hit
364
0
    }
365
}
366
367
/// Global cache metrics instruments.
368
1
pub static CACHE_METRICS: LazyLock<CacheMetrics> = LazyLock::new(|| {
369
1
    let meter = global::meter_with_scope(InstrumentationScope::builder("nativelink").build());
370
371
1
    CacheMetrics {
372
1
        cache_operation_duration: meter
373
1
            .f64_histogram("cache.operation.duration")
374
1
            .with_description("Duration of cache operations in milliseconds")
375
1
            .with_unit("ms")
376
1
            // The range of these is quite large as a cache might be backed by
377
1
            // memory, a filesystem, or network storage. The current values were
378
1
            // determined empirically and might need adjustment.
379
1
            .with_boundaries(vec![
380
1
                // Microsecond range
381
1
                0.001, // 1μs
382
1
                0.005, // 5μs
383
1
                0.01,  // 10μs
384
1
                0.05,  // 50μs
385
1
                0.1,   // 100μs
386
1
                // Sub-millisecond range
387
1
                0.2, // 200μs
388
1
                0.5, // 500μs
389
1
                1.0, // 1ms
390
1
                // Low millisecond range
391
1
                2.0,   // 2ms
392
1
                5.0,   // 5ms
393
1
                10.0,  // 10ms
394
1
                20.0,  // 20ms
395
1
                50.0,  // 50ms
396
1
                100.0, // 100ms
397
1
                // Higher latency range
398
1
                200.0,  // 200ms
399
1
                500.0,  // 500ms
400
1
                1000.0, // 1 second
401
1
                2000.0, // 2 seconds
402
1
                5000.0, // 5 seconds
403
1
            ])
404
1
            .build(),
405
1
406
1
        cache_operations: meter
407
1
            .u64_counter("cache.operations")
408
1
            .with_description("Total cache operations by type and result")
409
1
            .build(),
410
1
411
1
        cache_io: meter
412
1
            .u64_counter("cache.io")
413
1
            .with_description("Total bytes processed by cache operations")
414
1
            .with_unit("By")
415
1
            .build(),
416
1
417
1
        cache_size: meter
418
1
            .i64_up_down_counter("cache.size")
419
1
            .with_description("Current total size of cached data")
420
1
            .with_unit("By")
421
1
            .build(),
422
1
423
1
        cache_entries: meter
424
1
            .i64_up_down_counter("cache.entries")
425
1
            .with_description("Current number of cached entries")
426
1
            .with_unit("{entry}")
427
1
            .build(),
428
1
429
1
        cache_entry_size: meter
430
1
            .u64_histogram("cache.item.size")
431
1
            .with_description("Size distribution of cached entries")
432
1
            .with_unit("By")
433
1
            .build(),
434
1
    }
435
1
});
436
437
/// OpenTelemetry metrics instruments for cache monitoring.
438
#[derive(Debug)]
439
pub struct CacheMetrics {
440
    /// Histogram of cache operation durations in milliseconds
441
    pub cache_operation_duration: metrics::Histogram<f64>,
442
    /// Counter of cache operations by type and result
443
    pub cache_operations: metrics::Counter<u64>,
444
    /// Counter of bytes read/written during cache operations
445
    pub cache_io: metrics::Counter<u64>,
446
    /// Current total size of all cached data in bytes
447
    pub cache_size: metrics::UpDownCounter<i64>,
448
    /// Current number of entries in cache
449
    pub cache_entries: metrics::UpDownCounter<i64>,
450
    /// Histogram of individual cache entry sizes in bytes
451
    pub cache_entry_size: metrics::Histogram<u64>,
452
}
453
454
/// Global remote execution metrics instruments.
455
3
pub static EXECUTION_METRICS: LazyLock<ExecutionMetrics> = LazyLock::new(|| {
456
3
    let meter = global::meter_with_scope(InstrumentationScope::builder("nativelink").build());
457
458
3
    ExecutionMetrics {
459
3
        execution_stage_duration: meter
460
3
            .f64_histogram("execution.stage.duration")
461
3
            .with_description("Duration of each execution stage in seconds")
462
3
            .with_unit("s")
463
3
            .with_boundaries(vec![
464
3
                // Sub-second range
465
3
                0.001, // 1ms
466
3
                0.01,  // 10ms
467
3
                0.1,   // 100ms
468
3
                0.5,   // 500ms
469
3
                1.0,   // 1s
470
3
                // Multi-second range
471
3
                2.0,    // 2s
472
3
                5.0,    // 5s
473
3
                10.0,   // 10s
474
3
                30.0,   // 30s
475
3
                60.0,   // 1 minute
476
3
                120.0,  // 2 minutes
477
3
                300.0,  // 5 minutes
478
3
                600.0,  // 10 minutes
479
3
                1800.0, // 30 minutes
480
3
                3600.0, // 1 hour
481
3
            ])
482
3
            .build(),
483
3
484
3
        execution_total_duration: meter
485
3
            .f64_histogram("execution.total.duration")
486
3
            .with_description(
487
3
                "Total duration of action execution from submission to completion in seconds",
488
3
            )
489
3
            .with_unit("s")
490
3
            .with_boundaries(vec![
491
3
                // Sub-second range
492
3
                0.01, // 10ms
493
3
                0.1,  // 100ms
494
3
                0.5,  // 500ms
495
3
                1.0,  // 1s
496
3
                // Multi-second range
497
3
                5.0,    // 5s
498
3
                10.0,   // 10s
499
3
                30.0,   // 30s
500
3
                60.0,   // 1 minute
501
3
                300.0,  // 5 minutes
502
3
                600.0,  // 10 minutes
503
3
                1800.0, // 30 minutes
504
3
                3600.0, // 1 hour
505
3
                7200.0, // 2 hours
506
3
            ])
507
3
            .build(),
508
3
509
3
        execution_queue_time: meter
510
3
            .f64_histogram("execution.queue.time")
511
3
            .with_description("Time spent waiting in queue before execution in seconds")
512
3
            .with_unit("s")
513
3
            .with_boundaries(vec![
514
3
                0.001, // 1ms
515
3
                0.01,  // 10ms
516
3
                0.1,   // 100ms
517
3
                0.5,   // 500ms
518
3
                1.0,   // 1s
519
3
                2.0,   // 2s
520
3
                5.0,   // 5s
521
3
                10.0,  // 10s
522
3
                30.0,  // 30s
523
3
                60.0,  // 1 minute
524
3
                300.0, // 5 minutes
525
3
                600.0, // 10 minutes
526
3
            ])
527
3
            .build(),
528
3
529
3
        execution_active_count: meter
530
3
            .i64_up_down_counter("execution.active.count")
531
3
            .with_description("Number of actions currently in each stage")
532
3
            .with_unit("{action}")
533
3
            .build(),
534
3
535
3
        execution_completed_count: meter
536
3
            .u64_counter("execution.completed.count")
537
3
            .with_description("Total number of completed executions by result")
538
3
            .with_unit("{action}")
539
3
            .build(),
540
3
541
3
        execution_stage_transitions: meter
542
3
            .u64_counter("execution.stage.transitions")
543
3
            .with_description("Number of stage transitions")
544
3
            .with_unit("{transition}")
545
3
            .build(),
546
3
547
3
        execution_output_size: meter
548
3
            .u64_histogram("execution.output.size")
549
3
            .with_description("Size of execution outputs in bytes")
550
3
            .with_unit("By")
551
3
            .with_boundaries(vec![
552
3
                1_024.0,          // 1KB
553
3
                10_240.0,         // 10KB
554
3
                102_400.0,        // 100KB
555
3
                1_048_576.0,      // 1MB
556
3
                10_485_760.0,     // 10MB
557
3
                104_857_600.0,    // 100MB
558
3
                1_073_741_824.0,  // 1GB
559
3
                10_737_418_240.0, // 10GB
560
3
            ])
561
3
            .build(),
562
3
563
3
        execution_cpu_time: meter
564
3
            .f64_histogram("execution.cpu.time")
565
3
            .with_description("CPU time consumed by action execution in seconds")
566
3
            .with_unit("s")
567
3
            .with_boundaries(vec![
568
3
                0.01,   // 10ms
569
3
                0.1,    // 100ms
570
3
                1.0,    // 1s
571
3
                10.0,   // 10s
572
3
                60.0,   // 1 minute
573
3
                300.0,  // 5 minutes
574
3
                600.0,  // 10 minutes
575
3
                1800.0, // 30 minutes
576
3
                3600.0, // 1 hour
577
3
            ])
578
3
            .build(),
579
3
580
3
        execution_memory_usage: meter
581
3
            .u64_histogram("execution.memory.usage")
582
3
            .with_description("Peak memory usage during execution in bytes")
583
3
            .with_unit("By")
584
3
            .with_boundaries(vec![
585
3
                1_048_576.0,      // 1MB
586
3
                10_485_760.0,     // 10MB
587
3
                104_857_600.0,    // 100MB
588
3
                524_288_000.0,    // 500MB
589
3
                1_073_741_824.0,  // 1GB
590
3
                5_368_709_120.0,  // 5GB
591
3
                10_737_418_240.0, // 10GB
592
3
                53_687_091_200.0, // 50GB
593
3
            ])
594
3
            .build(),
595
3
596
3
        execution_retry_count: meter
597
3
            .u64_counter("execution.retry.count")
598
3
            .with_description("Number of execution retries")
599
3
            .with_unit("{retry}")
600
3
            .build(),
601
3
    }
602
3
});
603
604
/// OpenTelemetry metrics instruments for remote execution monitoring.
605
#[derive(Debug)]
606
pub struct ExecutionMetrics {
607
    /// Histogram of stage durations in seconds
608
    pub execution_stage_duration: metrics::Histogram<f64>,
609
    /// Histogram of total execution durations in seconds
610
    pub execution_total_duration: metrics::Histogram<f64>,
611
    /// Histogram of queue wait times in seconds
612
    pub execution_queue_time: metrics::Histogram<f64>,
613
    /// Current number of actions in each stage
614
    pub execution_active_count: metrics::UpDownCounter<i64>,
615
    /// Total number of completed executions
616
    pub execution_completed_count: metrics::Counter<u64>,
617
    /// Number of stage transitions
618
    pub execution_stage_transitions: metrics::Counter<u64>,
619
    /// Histogram of output sizes in bytes
620
    pub execution_output_size: metrics::Histogram<u64>,
621
    /// Histogram of CPU time in seconds
622
    pub execution_cpu_time: metrics::Histogram<f64>,
623
    /// Histogram of peak memory usage in bytes
624
    pub execution_memory_usage: metrics::Histogram<u64>,
625
    /// Counter for execution retries
626
    pub execution_retry_count: metrics::Counter<u64>,
627
}
628
629
/// Helper function to create attributes for execution metrics
630
#[must_use]
631
111
pub fn make_execution_attributes(
632
111
    instance_name: &str,
633
111
    worker_id: Option<&str>,
634
111
    priority: Option<i32>,
635
111
) -> Vec<KeyValue> {
636
111
    let mut attrs = vec![KeyValue::new(EXECUTION_INSTANCE, instance_name.to_string())];
637
638
111
    if let Some(
worker_id38
) = worker_id {
  Branch (638:12): [True: 38, False: 73]
  Branch (638:12): [Folded - Ignored]
639
38
        attrs.push(KeyValue::new(EXECUTION_WORKER_ID, worker_id.to_string()));
640
73
    }
641
642
111
    if let Some(priority) = priority {
  Branch (642:12): [True: 111, False: 0]
  Branch (642:12): [Folded - Ignored]
643
111
        attrs.push(KeyValue::new(EXECUTION_PRIORITY, i64::from(priority)));
644
111
    
}0
645
646
111
    attrs
647
111
}