Coverage Report

Created: 2025-12-30 06:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/bytestream_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::convert::Into;
16
use core::fmt::{Debug, Formatter};
17
use core::pin::Pin;
18
use core::sync::atomic::{AtomicU64, Ordering};
19
use core::time::Duration;
20
use std::collections::HashMap;
21
use std::collections::hash_map::Entry;
22
use std::sync::Arc;
23
use std::time::{Instant, SystemTime, UNIX_EPOCH};
24
25
use bytes::BytesMut;
26
use futures::future::pending;
27
use futures::stream::unfold;
28
use futures::{Future, Stream, TryFutureExt, try_join};
29
use nativelink_config::cas_server::{ByteStreamConfig, InstanceName, WithInstanceName};
30
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
31
use nativelink_metric::{
32
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, group, publish,
33
};
34
use nativelink_proto::google::bytestream::byte_stream_server::{
35
    ByteStream, ByteStreamServer as Server,
36
};
37
use nativelink_proto::google::bytestream::{
38
    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
39
    WriteResponse,
40
};
41
use nativelink_store::grpc_store::GrpcStore;
42
use nativelink_store::store_manager::StoreManager;
43
use nativelink_util::buf_channel::{
44
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
45
};
46
use nativelink_util::common::DigestInfo;
47
use nativelink_util::digest_hasher::{
48
    DigestHasherFunc, default_digest_hasher_func, make_ctx_for_hash_func,
49
};
50
use nativelink_util::proto_stream_utils::WriteRequestStreamWrapper;
51
use nativelink_util::resource_info::ResourceInfo;
52
use nativelink_util::spawn;
53
use nativelink_util::store_trait::{Store, StoreLike, StoreOptimizations, UploadSizeInfo};
54
use nativelink_util::task::JoinHandleDropGuard;
55
use opentelemetry::context::FutureExt;
56
use parking_lot::Mutex;
57
use tokio::time::sleep;
58
use tonic::{Request, Response, Status, Streaming};
59
use tracing::{Instrument, Level, debug, error, error_span, info, instrument, trace, warn};
60
61
/// If this value changes update the documentation in the config definition.
62
const DEFAULT_PERSIST_STREAM_ON_DISCONNECT_TIMEOUT: Duration = Duration::from_secs(60);
63
64
/// If this value changes update the documentation in the config definition.
65
const DEFAULT_MAX_BYTES_PER_STREAM: usize = 64 * 1024;
66
67
/// Metrics for `ByteStream` server operations.
68
/// Tracks upload/download activity, throughput, and latency.
69
#[derive(Debug, Default)]
70
pub struct ByteStreamMetrics {
71
    /// Number of currently active uploads (includes idle streams waiting for resume)
72
    pub active_uploads: AtomicU64,
73
    /// Total number of write requests received
74
    pub write_requests_total: AtomicU64,
75
    /// Total number of successful write requests
76
    pub write_requests_success: AtomicU64,
77
    /// Total number of failed write requests
78
    pub write_requests_failure: AtomicU64,
79
    /// Total number of read requests received
80
    pub read_requests_total: AtomicU64,
81
    /// Total number of successful read requests
82
    pub read_requests_success: AtomicU64,
83
    /// Total number of failed read requests
84
    pub read_requests_failure: AtomicU64,
85
    /// Total number of `query_write_status` requests
86
    pub query_write_status_total: AtomicU64,
87
    /// Total bytes written via `ByteStream`
88
    pub bytes_written_total: AtomicU64,
89
    /// Total bytes read via `ByteStream`
90
    pub bytes_read_total: AtomicU64,
91
    /// Sum of write durations in nanoseconds (for average latency calculation)
92
    pub write_duration_ns: AtomicU64,
93
    /// Sum of read durations in nanoseconds (for average latency calculation)
94
    pub read_duration_ns: AtomicU64,
95
    /// Number of UUID collisions detected
96
    pub uuid_collisions: AtomicU64,
97
    /// Number of resumed uploads (client reconnected to existing stream)
98
    pub resumed_uploads: AtomicU64,
99
    /// Number of idle streams that timed out
100
    pub idle_stream_timeouts: AtomicU64,
101
}
102
103
impl MetricsComponent for ByteStreamMetrics {
104
0
    fn publish(
105
0
        &self,
106
0
        _kind: MetricKind,
107
0
        field_metadata: MetricFieldData,
108
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
109
0
        let _enter = group!(field_metadata.name).entered();
110
111
0
        publish!(
112
0
            "active_uploads",
113
0
            &self.active_uploads,
114
0
            MetricKind::Counter,
115
0
            "Number of currently active uploads"
116
        );
117
0
        publish!(
118
0
            "write_requests_total",
119
0
            &self.write_requests_total,
120
0
            MetricKind::Counter,
121
0
            "Total write requests received"
122
        );
123
0
        publish!(
124
0
            "write_requests_success",
125
0
            &self.write_requests_success,
126
0
            MetricKind::Counter,
127
0
            "Total successful write requests"
128
        );
129
0
        publish!(
130
0
            "write_requests_failure",
131
0
            &self.write_requests_failure,
132
0
            MetricKind::Counter,
133
0
            "Total failed write requests"
134
        );
135
0
        publish!(
136
0
            "read_requests_total",
137
0
            &self.read_requests_total,
138
0
            MetricKind::Counter,
139
0
            "Total read requests received"
140
        );
141
0
        publish!(
142
0
            "read_requests_success",
143
0
            &self.read_requests_success,
144
0
            MetricKind::Counter,
145
0
            "Total successful read requests"
146
        );
147
0
        publish!(
148
0
            "read_requests_failure",
149
0
            &self.read_requests_failure,
150
0
            MetricKind::Counter,
151
0
            "Total failed read requests"
152
        );
153
0
        publish!(
154
0
            "query_write_status_total",
155
0
            &self.query_write_status_total,
156
0
            MetricKind::Counter,
157
0
            "Total query_write_status requests"
158
        );
159
0
        publish!(
160
0
            "bytes_written_total",
161
0
            &self.bytes_written_total,
162
0
            MetricKind::Counter,
163
0
            "Total bytes written via ByteStream"
164
        );
165
0
        publish!(
166
0
            "bytes_read_total",
167
0
            &self.bytes_read_total,
168
0
            MetricKind::Counter,
169
0
            "Total bytes read via ByteStream"
170
        );
171
0
        publish!(
172
0
            "write_duration_ns",
173
0
            &self.write_duration_ns,
174
0
            MetricKind::Counter,
175
0
            "Sum of write durations in nanoseconds"
176
        );
177
0
        publish!(
178
0
            "read_duration_ns",
179
0
            &self.read_duration_ns,
180
0
            MetricKind::Counter,
181
0
            "Sum of read durations in nanoseconds"
182
        );
183
0
        publish!(
184
0
            "uuid_collisions",
185
0
            &self.uuid_collisions,
186
0
            MetricKind::Counter,
187
0
            "Number of UUID collisions detected"
188
        );
189
0
        publish!(
190
0
            "resumed_uploads",
191
0
            &self.resumed_uploads,
192
0
            MetricKind::Counter,
193
0
            "Number of resumed uploads"
194
        );
195
0
        publish!(
196
0
            "idle_stream_timeouts",
197
0
            &self.idle_stream_timeouts,
198
0
            MetricKind::Counter,
199
0
            "Number of idle streams that timed out"
200
        );
201
202
0
        Ok(MetricPublishKnownKindData::Component)
203
0
    }
204
}
205
206
type BytesWrittenAndIdleStream = (Arc<AtomicU64>, Option<IdleStream>);
207
208
/// Type alias for the UUID key used in `active_uploads` `HashMap`.
209
/// Using u128 instead of String reduces memory allocations and improves
210
/// cache locality for `HashMap` operations.
211
type UuidKey = u128;
212
213
/// Parse a UUID string to a u128 for use as a `HashMap` key.
214
/// This avoids heap allocation for String keys and improves `HashMap` performance.
215
/// Falls back to hashing the string if it's not a valid hex UUID.
216
#[inline]
217
19
fn parse_uuid_to_key(uuid_str: &str) -> UuidKey {
218
    // UUIDs are typically 32 hex chars (128 bits) or 36 chars with dashes.
219
    // We'll try to parse as hex first, then fall back to hashing.
220
19
    let clean: String = uuid_str.chars().filter(char::is_ascii_hexdigit).collect();
221
19
    if clean.len() >= 16 {
  Branch (221:8): [True: 19, False: 0]
  Branch (221:8): [Folded - Ignored]
222
        // Take up to 32 hex chars (128 bits)
223
19
        let hex_str = if clean.len() > 32 {
  Branch (223:26): [True: 0, False: 19]
  Branch (223:26): [Folded - Ignored]
224
0
            &clean[..32]
225
        } else {
226
19
            &clean
227
        };
228
19
        u128::from_str_radix(hex_str, 16).unwrap_or_else(|_| 
{0
229
            // Hash fallback for non-hex strings
230
            use core::hash::{Hash, Hasher};
231
0
            let mut hasher = std::collections::hash_map::DefaultHasher::new();
232
0
            uuid_str.hash(&mut hasher);
233
0
            u128::from(hasher.finish())
234
0
        })
235
    } else {
236
        // Short strings: use hash
237
        use core::hash::{Hash, Hasher};
238
0
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
239
0
        uuid_str.hash(&mut hasher);
240
0
        u128::from(hasher.finish())
241
    }
242
19
}
243
244
pub struct InstanceInfo {
245
    store: Store,
246
    // Max number of bytes to send on each grpc stream chunk.
247
    max_bytes_per_stream: usize,
248
    /// Active uploads keyed by UUID as u128 for better performance.
249
    /// Using u128 keys instead of String reduces heap allocations
250
    /// and improves `HashMap` lookup performance.
251
    active_uploads: Arc<Mutex<HashMap<UuidKey, BytesWrittenAndIdleStream>>>,
252
    /// How long to keep idle streams before timing them out.
253
    idle_stream_timeout: Duration,
254
    metrics: Arc<ByteStreamMetrics>,
255
    /// Handle to the global sweeper task. Kept alive for the lifetime of the instance.
256
    _sweeper_handle: Arc<JoinHandleDropGuard<()>>,
257
}
258
259
impl Debug for InstanceInfo {
260
0
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
261
0
        f.debug_struct("InstanceInfo")
262
0
            .field("store", &self.store)
263
0
            .field("max_bytes_per_stream", &self.max_bytes_per_stream)
264
0
            .field("active_uploads", &self.active_uploads)
265
0
            .field("idle_stream_timeout", &self.idle_stream_timeout)
266
0
            .field("metrics", &self.metrics)
267
0
            .finish()
268
0
    }
269
}
270
271
type ReadStream = Pin<Box<dyn Stream<Item = Result<ReadResponse, Status>> + Send + 'static>>;
272
type StoreUpdateFuture = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
273
274
struct StreamState {
275
    uuid: UuidKey,
276
    tx: DropCloserWriteHalf,
277
    store_update_fut: StoreUpdateFuture,
278
}
279
280
impl Debug for StreamState {
281
0
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
282
0
        f.debug_struct("StreamState")
283
0
            .field("uuid", &format!("{:032x}", self.uuid))
284
0
            .finish()
285
0
    }
286
}
287
288
/// If a stream is in this state, it will automatically be put back into an `IdleStream` and
289
/// placed back into the `active_uploads` map as an `IdleStream` after it is dropped.
290
/// To prevent it from being put back into an `IdleStream` you must call `.graceful_finish()`.
291
struct ActiveStreamGuard {
292
    stream_state: Option<StreamState>,
293
    bytes_received: Arc<AtomicU64>,
294
    active_uploads: Arc<Mutex<HashMap<UuidKey, BytesWrittenAndIdleStream>>>,
295
    metrics: Arc<ByteStreamMetrics>,
296
}
297
298
impl ActiveStreamGuard {
299
    /// Consumes the guard. The stream will be considered "finished", will
300
    /// remove it from the `active_uploads`.
301
6
    fn graceful_finish(mut self) {
302
6
        let stream_state = self.stream_state.take().unwrap();
303
6
        self.active_uploads.lock().remove(&stream_state.uuid);
304
        // Decrement active uploads counter on successful completion
305
6
        self.metrics.active_uploads.fetch_sub(1, Ordering::Relaxed);
306
6
    }
307
}
308
309
impl Drop for ActiveStreamGuard {
310
11
    fn drop(&mut self) {
311
11
        let Some(
stream_state5
) = self.stream_state.take() else {
  Branch (311:13): [True: 5, False: 6]
  Branch (311:13): [Folded - Ignored]
312
6
            return; // If None it means we don't want it put back into an IdleStream.
313
        };
314
5
        let mut active_uploads = self.active_uploads.lock();
315
5
        let uuid = stream_state.uuid; // u128 is Copy, no clone needed
316
5
        let Some(active_uploads_slot) = active_uploads.get_mut(&uuid) else {
  Branch (316:13): [True: 5, False: 0]
  Branch (316:13): [Folded - Ignored]
317
0
            error!(
318
                err = "Failed to find active upload. This should never happen.",
319
0
                uuid = format!("{:032x}", uuid),
320
            );
321
0
            return;
322
        };
323
        // Mark stream as idle with current timestamp.
324
        // The global sweeper will clean it up after idle_stream_timeout.
325
        // This avoids spawning a task per stream, reducing overhead from O(n) to O(1).
326
5
        active_uploads_slot.1 = Some(IdleStream {
327
5
            stream_state,
328
5
            idle_since: Instant::now(),
329
5
        });
330
11
    }
331
}
332
333
/// Represents a stream that is in the "idle" state. this means it is not currently being used
334
/// by a client. If it is not used within a certain amount of time it will be removed from the
335
/// `active_uploads` map automatically by the global sweeper task.
336
#[derive(Debug)]
337
struct IdleStream {
338
    stream_state: StreamState,
339
    /// When this stream became idle. Used by the global sweeper to determine expiration.
340
    idle_since: Instant,
341
}
342
343
impl IdleStream {
344
3
    fn into_active_stream(
345
3
        self,
346
3
        bytes_received: Arc<AtomicU64>,
347
3
        instance_info: &InstanceInfo,
348
3
    ) -> ActiveStreamGuard {
349
3
        ActiveStreamGuard {
350
3
            stream_state: Some(self.stream_state),
351
3
            bytes_received,
352
3
            active_uploads: instance_info.active_uploads.clone(),
353
3
            metrics: instance_info.metrics.clone(),
354
3
        }
355
3
    }
356
}
357
358
#[derive(Debug)]
359
pub struct ByteStreamServer {
360
    instance_infos: HashMap<InstanceName, InstanceInfo>,
361
}
362
363
impl ByteStreamServer {
364
    /// Generate a unique UUID key by `XOR`ing the base key with a nanosecond timestamp.
365
    /// This ensures virtually zero collision probability while being O(1).
366
0
    fn generate_unique_uuid_key(base_key: UuidKey) -> UuidKey {
367
0
        let timestamp = SystemTime::now()
368
0
            .duration_since(UNIX_EPOCH)
369
0
            .unwrap_or_default()
370
0
            .as_nanos();
371
        // XOR with timestamp to create unique key
372
0
        base_key ^ timestamp
373
0
    }
374
375
15
    pub fn new(
376
15
        configs: &[WithInstanceName<ByteStreamConfig>],
377
15
        store_manager: &StoreManager,
378
15
    ) -> Result<Self, Error> {
379
15
        let mut instance_infos: HashMap<String, InstanceInfo> = HashMap::new();
380
30
        for 
config15
in configs {
381
15
            let idle_stream_timeout = if config.persist_stream_on_disconnect_timeout == 0 {
  Branch (381:42): [True: 15, False: 0]
  Branch (381:42): [Folded - Ignored]
382
15
                DEFAULT_PERSIST_STREAM_ON_DISCONNECT_TIMEOUT
383
            } else {
384
0
                Duration::from_secs(config.persist_stream_on_disconnect_timeout as u64)
385
            };
386
15
            let _old_value = instance_infos.insert(
387
15
                config.instance_name.clone(),
388
15
                Self::new_with_timeout(config, store_manager, idle_stream_timeout)
?0
,
389
            );
390
        }
391
15
        Ok(Self { instance_infos })
392
15
    }
393
394
15
    pub fn new_with_timeout(
395
15
        config: &WithInstanceName<ByteStreamConfig>,
396
15
        store_manager: &StoreManager,
397
15
        idle_stream_timeout: Duration,
398
15
    ) -> Result<InstanceInfo, Error> {
399
15
        let store = store_manager
400
15
            .get_store(&config.cas_store)
401
15
            .ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", 
config.cas_store0
))
?0
;
402
15
        let max_bytes_per_stream = if config.max_bytes_per_stream == 0 {
  Branch (402:39): [True: 1, False: 14]
  Branch (402:39): [Folded - Ignored]
403
1
            DEFAULT_MAX_BYTES_PER_STREAM
404
        } else {
405
14
            config.max_bytes_per_stream
406
        };
407
408
15
        let active_uploads: Arc<Mutex<HashMap<UuidKey, BytesWrittenAndIdleStream>>> =
409
15
            Arc::new(Mutex::new(HashMap::new()));
410
15
        let metrics = Arc::new(ByteStreamMetrics::default());
411
412
        // Spawn a single global sweeper task that periodically cleans up expired idle streams.
413
        // This replaces per-stream timeout tasks, reducing task spawn overhead from O(n) to O(1).
414
15
        let sweeper_active_uploads = Arc::downgrade(&active_uploads);
415
15
        let sweeper_metrics = Arc::downgrade(&metrics);
416
15
        let sweep_interval = idle_stream_timeout / 2; // Check every half-timeout period
417
15
        let sweeper_handle = spawn!("bytestream_idle_stream_sweeper", async move 
{12
418
            loop {
419
12
                sleep(sweep_interval).await;
420
421
0
                let Some(active_uploads) = sweeper_active_uploads.upgrade() else {
  Branch (421:21): [True: 0, False: 0]
  Branch (421:21): [Folded - Ignored]
422
                    // InstanceInfo has been dropped, exit the sweeper
423
0
                    break;
424
                };
425
0
                let metrics = sweeper_metrics.upgrade();
426
427
0
                let now = Instant::now();
428
0
                let mut expired_count = 0u64;
429
430
                // Lock and sweep expired entries
431
                {
432
0
                    let mut uploads = active_uploads.lock();
433
0
                    uploads.retain(|uuid, (_, maybe_idle)| {
434
0
                        if let Some(idle_stream) = maybe_idle {
  Branch (434:32): [True: 0, False: 0]
  Branch (434:32): [Folded - Ignored]
435
0
                            if now.duration_since(idle_stream.idle_since) >= idle_stream_timeout {
  Branch (435:32): [True: 0, False: 0]
  Branch (435:32): [Folded - Ignored]
436
0
                                info!(
437
                                    msg = "Sweeping expired idle stream",
438
0
                                    uuid = format!("{:032x}", uuid)
439
                                );
440
0
                                expired_count += 1;
441
0
                                return false; // Remove this entry
442
0
                            }
443
0
                        }
444
0
                        true // Keep this entry
445
0
                    });
446
                }
447
448
                // Update metrics outside the lock
449
0
                if expired_count > 0 {
  Branch (449:20): [True: 0, False: 0]
  Branch (449:20): [Folded - Ignored]
450
0
                    if let Some(m) = &metrics {
  Branch (450:28): [True: 0, False: 0]
  Branch (450:28): [Folded - Ignored]
451
0
                        m.idle_stream_timeouts
452
0
                            .fetch_add(expired_count, Ordering::Relaxed);
453
0
                        m.active_uploads.fetch_sub(expired_count, Ordering::Relaxed);
454
0
                    }
455
0
                    trace!(
456
                        msg = "Sweeper cleaned up expired streams",
457
                        count = expired_count
458
                    );
459
0
                }
460
            }
461
0
        });
462
463
15
        Ok(InstanceInfo {
464
15
            store,
465
15
            max_bytes_per_stream,
466
15
            active_uploads,
467
15
            idle_stream_timeout,
468
15
            metrics,
469
15
            _sweeper_handle: Arc::new(sweeper_handle),
470
15
        })
471
15
    }
472
473
1
    pub fn into_service(self) -> Server<Self> {
474
1
        Server::new(self)
475
1
    }
476
477
    /// Creates or joins an upload stream for the given UUID.
478
    ///
479
    /// This function handles three scenarios:
480
    /// 1. UUID doesn't exist - creates a new upload stream
481
    /// 2. UUID exists but is idle - resumes the existing stream
482
    /// 3. UUID exists and is active - generates a unique UUID by appending a nanosecond
483
    ///    timestamp to avoid collision, then creates a new stream with that UUID
484
    ///
485
    /// The nanosecond timestamp ensures virtually zero probability of collision since
486
    /// two concurrent uploads would need to both collide on the original UUID AND
487
    /// generate the unique UUID in the exact same nanosecond.
488
11
    fn create_or_join_upload_stream(
489
11
        &self,
490
11
        uuid_str: &str,
491
11
        instance: &InstanceInfo,
492
11
        digest: DigestInfo,
493
11
    ) -> ActiveStreamGuard {
494
        // Parse UUID string to u128 key for efficient HashMap operations
495
11
        let uuid_key = parse_uuid_to_key(uuid_str);
496
497
8
        let (uuid, bytes_received, is_collision) =
498
11
            match instance.active_uploads.lock().entry(uuid_key) {
499
3
                Entry::Occupied(mut entry) => {
500
3
                    let maybe_idle_stream = entry.get_mut();
501
3
                    if let Some(idle_stream) = maybe_idle_stream.1.take() {
  Branch (501:28): [True: 3, False: 0]
  Branch (501:28): [Folded - Ignored]
502
                        // Case 2: Stream exists but is idle, we can resume it
503
3
                        let bytes_received = maybe_idle_stream.0.clone();
504
3
                        info!(
505
                            msg = "Joining existing stream",
506
3
                            uuid = format!("{:032x}", entry.key())
507
                        );
508
                        // Track resumed upload
509
3
                        instance
510
3
                            .metrics
511
3
                            .resumed_uploads
512
3
                            .fetch_add(1, Ordering::Relaxed);
513
3
                        return idle_stream.into_active_stream(bytes_received, instance);
514
0
                    }
515
                    // Case 3: Stream is active - generate a unique UUID to avoid collision
516
                    // Using nanosecond timestamp makes collision probability essentially zero
517
0
                    let original_key = *entry.key();
518
0
                    let unique_key = Self::generate_unique_uuid_key(original_key);
519
0
                    warn!(
520
                        msg = "UUID collision detected, generating unique UUID to prevent conflict",
521
0
                        original_uuid = format!("{:032x}", original_key),
522
0
                        unique_uuid = format!("{:032x}", unique_key)
523
                    );
524
                    // Entry goes out of scope here, releasing the lock
525
526
0
                    let bytes_received = Arc::new(AtomicU64::new(0));
527
0
                    let mut active_uploads = instance.active_uploads.lock();
528
                    // Insert with the unique UUID - this should never collide due to nanosecond precision
529
0
                    active_uploads.insert(unique_key, (bytes_received.clone(), None));
530
0
                    (unique_key, bytes_received, true)
531
                }
532
8
                Entry::Vacant(entry) => {
533
                    // Case 1: UUID doesn't exist, create new stream
534
8
                    let bytes_received = Arc::new(AtomicU64::new(0));
535
8
                    let uuid = *entry.key();
536
                    // Our stream is "in use" if the key is in the map, but the value is None.
537
8
                    entry.insert((bytes_received.clone(), None));
538
8
                    (uuid, bytes_received, false)
539
                }
540
            };
541
542
        // Track metrics for new upload
543
8
        instance
544
8
            .metrics
545
8
            .active_uploads
546
8
            .fetch_add(1, Ordering::Relaxed);
547
8
        if is_collision {
  Branch (547:12): [True: 0, False: 8]
  Branch (547:12): [Folded - Ignored]
548
0
            instance
549
0
                .metrics
550
0
                .uuid_collisions
551
0
                .fetch_add(1, Ordering::Relaxed);
552
8
        }
553
554
        // Important: Do not return an error from this point onwards without
555
        // removing the entry from the map, otherwise that UUID becomes
556
        // unusable.
557
558
8
        let (tx, rx) = make_buf_channel_pair();
559
8
        let store = instance.store.clone();
560
8
        let store_update_fut = Box::pin(async move 
{6
561
            // We need to wrap `Store::update()` in a another future because we need to capture
562
            // `store` to ensure its lifetime follows the future and not the caller.
563
6
            store
564
6
                // Bytestream always uses digest size as the actual byte size.
565
6
                .update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes()))
566
6
                .await
567
6
        });
568
8
        ActiveStreamGuard {
569
8
            stream_state: Some(StreamState {
570
8
                uuid,
571
8
                tx,
572
8
                store_update_fut,
573
8
            }),
574
8
            bytes_received,
575
8
            active_uploads: instance.active_uploads.clone(),
576
8
            metrics: instance.metrics.clone(),
577
8
        }
578
11
    }
579
580
3
    async fn inner_read(
581
3
        &self,
582
3
        instance: &InstanceInfo,
583
3
        digest: DigestInfo,
584
3
        read_request: ReadRequest,
585
3
    ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + Send + use<>, Error> {
586
        struct ReaderState {
587
            max_bytes_per_stream: usize,
588
            rx: DropCloserReadHalf,
589
            maybe_get_part_result: Option<Result<(), Error>>,
590
            get_part_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
591
        }
592
593
3
        let read_limit = u64::try_from(read_request.read_limit)
594
3
            .err_tip(|| "Could not convert read_limit to u64")
?0
;
595
596
3
        let (tx, rx) = make_buf_channel_pair();
597
598
3
        let read_limit = if read_limit != 0 {
  Branch (598:29): [True: 3, False: 0]
  Branch (598:29): [Folded - Ignored]
599
3
            Some(read_limit)
600
        } else {
601
0
            None
602
        };
603
604
        // This allows us to call a destructor when the the object is dropped.
605
3
        let store = instance.store.clone();
606
3
        let state = Some(ReaderState {
607
3
            rx,
608
3
            max_bytes_per_stream: instance.max_bytes_per_stream,
609
3
            maybe_get_part_result: None,
610
3
            get_part_fut: Box::pin(async move {
611
3
                store
612
3
                    .get_part(
613
3
                        digest,
614
3
                        tx,
615
3
                        u64::try_from(read_request.read_offset)
616
3
                            .err_tip(|| "Could not convert read_offset to u64")
?0
,
617
3
                        read_limit,
618
                    )
619
3
                    .await
620
3
            }),
621
        });
622
623
3
        let read_stream_span = error_span!("read_stream");
624
625
9.77k
        Ok(
Box::pin3
(
unfold3
(
state3
, move |state| {
626
9.77k
            async {
627
9.77k
            let mut state = state
?0
; // If None our stream is done.
628
9.77k
            let mut response = ReadResponse::default();
629
            {
630
9.77k
                let consume_fut = state.rx.consume(Some(state.max_bytes_per_stream));
631
9.77k
                tokio::pin!(consume_fut);
632
                loop {
633
9.77k
                    tokio::select! {
634
9.77k
                        
read_result9.77k
= &mut consume_fut => {
635
9.77k
                            match read_result {
636
9.76k
                                Ok(bytes) => {
637
9.76k
                                    if bytes.is_empty() {
  Branch (637:40): [True: 2, False: 9.76k]
  Branch (637:40): [Folded - Ignored]
638
                                        // EOF.
639
2
                                        return None;
640
9.76k
                                    }
641
9.76k
                                    if bytes.len() > state.max_bytes_per_stream {
  Branch (641:40): [True: 0, False: 9.76k]
  Branch (641:40): [Folded - Ignored]
642
0
                                        let err = make_err!(Code::Internal, "Returned store size was larger than read size");
643
0
                                        return Some((Err(err.into()), None));
644
9.76k
                                    }
645
9.76k
                                    response.data = bytes;
646
9.76k
                                    trace!(response = ?response);
647
9.76k
                                    debug!(response.data = format!("<redacted len({})>", response.data.len()));
648
9.76k
                                    break;
649
                                }
650
1
                                Err(mut e) => {
651
                                    // We may need to propagate the error from reading the data through first.
652
                                    // For example, the NotFound error will come through `get_part_fut`, and
653
                                    // will not be present in `e`, but we need to ensure we pass NotFound error
654
                                    // code or the client won't know why it failed.
655
1
                                    let get_part_result = if let Some(result) = state.maybe_get_part_result {
  Branch (655:66): [True: 1, False: 0]
  Branch (655:66): [Folded - Ignored]
656
1
                                        result
657
                                    } else {
658
                                        // This should never be `future::pending()` if maybe_get_part_result is
659
                                        // not set.
660
0
                                        state.get_part_fut.await
661
                                    };
662
1
                                    if let Err(err) = get_part_result {
  Branch (662:44): [True: 1, False: 0]
  Branch (662:44): [Folded - Ignored]
663
1
                                        e = err.merge(e);
664
1
                                    
}0
665
1
                                    if e.code == Code::NotFound {
  Branch (665:40): [True: 1, False: 0]
  Branch (665:40): [Folded - Ignored]
666
1
                                        // Trim the error code. Not Found is quite common and we don't want to send a large
667
1
                                        // error (debug) message for something that is common. We resize to just the last
668
1
                                        // message as it will be the most relevant.
669
1
                                        e.messages.truncate(1);
670
1
                                    
}0
671
1
                                    error!(response = ?e);
672
1
                                    return Some((Err(e.into()), None))
673
                                }
674
                            }
675
                        },
676
9.77k
                        
result3
= &mut state.get_part_fut => {
677
3
                            state.maybe_get_part_result = Some(result);
678
3
                            // It is non-deterministic on which future will finish in what order.
679
3
                            // It is also possible that the `state.rx.consume()` call above may not be able to
680
3
                            // respond even though the publishing future is done.
681
3
                            // Because of this we set the writing future to pending so it never finishes.
682
3
                            // The `state.rx.consume()` future will eventually finish and return either the
683
3
                            // data or an error.
684
3
                            // An EOF will terminate the `state.rx.consume()` future, but we are also protected
685
3
                            // because we are dropping the writing future, it will drop the `tx` channel
686
3
                            // which will eventually propagate an error to the `state.rx.consume()` future if
687
3
                            // the EOF was not sent due to some other error.
688
3
                            state.get_part_fut = Box::pin(pending());
689
3
                        },
690
                    }
691
                }
692
            }
693
9.76k
            Some((Ok(response), Some(state)))
694
9.77k
        }.instrument(read_stream_span.clone())
695
9.77k
        })))
696
3
    }
697
698
    // We instrument tracing here as well as below because `stream` has a hash on it
699
    // that is extracted from the first stream message. If we only implemented it below
700
    // we would not have the hash available to us.
701
    // We instrument tracing here as well as below because `stream` has a hash on it
    // that is extracted from the first stream message. If we only implemented it below
    // we would not have the hash available to us.
    #[instrument(
        ret(level = Level::DEBUG),
        level = Level::ERROR,
        skip(self, instance_info),
    )]
    /// Streaming (channel-backed) write path.
    ///
    /// Joins or creates an upload stream keyed by the client-supplied UUID, then
    /// concurrently (a) pumps client chunks into the store's writer half and
    /// (b) drives the store's own update future. Both must succeed; either error
    /// aborts the whole write via `try_join!`.
    ///
    /// Returns a `WriteResponse` whose `committed_size` is the client-declared
    /// `expected_size` (the store update future is responsible for validating
    /// the actual byte count against the digest).
    async fn inner_write(
        &self,
        instance_info: &InstanceInfo,
        digest: DigestInfo,
        stream: WriteRequestStreamWrapper<impl Stream<Item = Result<WriteRequest, Status>> + Unpin>,
    ) -> Result<Response<WriteResponse>, Error> {
        /// Consumes the client's `WriteRequest` stream and forwards payload bytes
        /// into `tx`, handling resumed/duplicate chunks and publishing progress
        /// into `outer_bytes_received` (read by `QueryWriteStatus`).
        async fn process_client_stream(
            mut stream: WriteRequestStreamWrapper<
                impl Stream<Item = Result<WriteRequest, Status>> + Unpin,
            >,
            tx: &mut DropCloserWriteHalf,
            outer_bytes_received: &Arc<AtomicU64>,
            expected_size: u64,
        ) -> Result<(), Error> {
            loop {
                let write_request = match stream.next().await {
                    // Code path for when client tries to gracefully close the stream.
                    // If this happens it means there's a problem with the data sent,
                    // because we always close the stream from our end before this point
                    // by counting the number of bytes sent from the client. If they send
                    // less than the amount they said they were going to send and then
                    // close the stream, we know there's a problem.
                    None => {
                        return Err(make_input_err!(
                            "Client closed stream before sending all data"
                        ));
                    }
                    // Code path for client stream error. Probably client disconnect.
                    Some(Err(err)) => return Err(err),
                    // Code path for received chunk of data.
                    Some(Ok(write_request)) => write_request,
                };

                // The proto uses i64; negative offsets are never valid.
                if write_request.write_offset < 0 {
                    return Err(make_input_err!(
                        "Invalid negative write offset in write request: {}",
                        write_request.write_offset
                    ));
                }
                let write_offset = write_request.write_offset as u64;

                // If we get duplicate data because a client didn't know where
                // it left off from, then we can simply skip it.
                let data = if write_offset < tx.get_bytes_written() {
                    if (write_offset + write_request.data.len() as u64) < tx.get_bytes_written() {
                        // Chunk is entirely behind our high-water mark: either a stale
                        // retransmit (skip it) or a bogus "finished" marker (reject).
                        if write_request.finish_write {
                            return Err(make_input_err!(
                                "Resumed stream finished at {} bytes when we already received {} bytes.",
                                write_offset + write_request.data.len() as u64,
                                tx.get_bytes_written()
                            ));
                        }
                        continue;
                    }
                    // Chunk partially overlaps what we already wrote; keep only the
                    // unseen suffix.
                    write_request.data.slice(
                        usize::try_from(tx.get_bytes_written() - write_offset)
                            .unwrap_or(usize::MAX)..,
                    )
                } else {
                    // A gap (offset ahead of what we've written) is unrecoverable.
                    if write_offset != tx.get_bytes_written() {
                        return Err(make_input_err!(
                            "Received out of order data. Got {}, expected {}",
                            write_offset,
                            tx.get_bytes_written()
                        ));
                    }
                    write_request.data
                };

                // Do not process EOF or weird stuff will happen.
                if !data.is_empty() {
                    // We also need to process the possible EOF branch, so we can't early return.
                    if let Err(mut err) = tx.send(data).await {
                        err.code = Code::Internal;
                        return Err(err);
                    }
                    // Publish progress so QueryWriteStatus can report committed bytes.
                    // Release pairs with the Acquire load in inner_query_write_status.
                    outer_bytes_received.store(tx.get_bytes_written(), Ordering::Release);
                }

                if expected_size < tx.get_bytes_written() {
                    return Err(make_input_err!("Received more bytes than expected"));
                }
                if write_request.finish_write {
                    // Gracefully close our stream.
                    tx.send_eof()
                        .err_tip(|| "Failed to send EOF in ByteStream::write")?;
                    return Ok(());
                }
                // Continue.
            }
            // Unreachable.
        }

        let uuid = stream
            .resource_info
            .uuid
            .as_ref()
            .ok_or_else(|| make_input_err!("UUID must be set if writing data"))?;
        let mut active_stream_guard =
            self.create_or_join_upload_stream(uuid, instance_info, digest);
        let expected_size = stream.resource_info.expected_size as u64;

        // Guard invariant: stream_state is always Some while the guard is live,
        // so unwrap here cannot fail. TODO(review): confirm against
        // create_or_join_upload_stream's contract.
        let active_stream = active_stream_guard.stream_state.as_mut().unwrap();
        // Run the client pump and the store's update future together; either
        // failing cancels the other and fails the write.
        try_join!(
            process_client_stream(
                stream,
                &mut active_stream.tx,
                &active_stream_guard.bytes_received,
                expected_size
            ),
            (&mut active_stream.store_update_fut)
                .map_err(|err| { err.append("Error updating inner store") })
        )?;

        // Close our guard and consider the stream no longer active.
        active_stream_guard.graceful_finish();

        Ok(Response::new(WriteResponse {
            committed_size: expected_size as i64,
        }))
    }
827
828
    /// Fast-path write that bypasses channel overhead for stores that support direct Bytes updates.
    /// This buffers all data in memory and calls `update_oneshot` directly.
    ///
    /// Caller (`ByteStream::write`) guarantees: the store is optimized for
    /// oneshot updates, `expected_size <= 64 MiB`, a UUID is present, the first
    /// message has `finish_write = true`, and the UUID is not already an active
    /// upload — so this path never needs progress tracking or resumption.
    async fn inner_write_oneshot(
        &self,
        instance_info: &InstanceInfo,
        digest: DigestInfo,
        mut stream: WriteRequestStreamWrapper<
            impl Stream<Item = Result<WriteRequest, Status>> + Unpin,
        >,
    ) -> Result<Response<WriteResponse>, Error> {
        let expected_size = stream.resource_info.expected_size as u64;

        // Pre-allocate buffer for expected size (capped at reasonable limit to prevent DoS)
        let capacity =
            usize::try_from(expected_size.min(64 * 1024 * 1024)).unwrap_or(64 * 1024 * 1024);
        let mut buffer = BytesMut::with_capacity(capacity);
        // Running count of deduplicated payload bytes; mirrors
        // `tx.get_bytes_written()` in the streaming path.
        let mut bytes_received: u64 = 0;

        // Collect all data from client stream
        loop {
            let write_request = match stream.next().await {
                // Stream ended without finish_write: client under-sent.
                None => {
                    return Err(make_input_err!(
                        "Client closed stream before sending all data"
                    ));
                }
                // Transport-level error (likely client disconnect).
                Some(Err(err)) => return Err(err),
                Some(Ok(write_request)) => write_request,
            };

            if write_request.write_offset < 0 {
                return Err(make_input_err!(
                    "Invalid negative write offset in write request: {}",
                    write_request.write_offset
                ));
            }
            let write_offset = write_request.write_offset as u64;

            // Handle duplicate/resumed data
            // (Same dedup/overlap logic as process_client_stream in inner_write.)
            let data = if write_offset < bytes_received {
                if (write_offset + write_request.data.len() as u64) < bytes_received {
                    if write_request.finish_write {
                        return Err(make_input_err!(
                            "Resumed stream finished at {} bytes when we already received {} bytes.",
                            write_offset + write_request.data.len() as u64,
                            bytes_received
                        ));
                    }
                    // Entirely-stale retransmit; ignore it.
                    continue;
                }
                // Keep only the suffix we have not seen yet.
                write_request
                    .data
                    .slice(usize::try_from(bytes_received - write_offset).unwrap_or(usize::MAX)..)
            } else {
                if write_offset != bytes_received {
                    return Err(make_input_err!(
                        "Received out of order data. Got {}, expected {}",
                        write_offset,
                        bytes_received
                    ));
                }
                write_request.data
            };

            if !data.is_empty() {
                buffer.extend_from_slice(&data);
                bytes_received += data.len() as u64;
            }

            if expected_size < bytes_received {
                return Err(make_input_err!("Received more bytes than expected"));
            }

            if write_request.finish_write {
                // NOTE(review): a finish_write with bytes_received < expected_size
                // falls through here and still reports committed_size = expected_size;
                // presumably update_oneshot/digest verification rejects short data —
                // confirm.
                break;
            }
        }

        // Direct update without channel overhead
        let store = instance_info.store.clone();
        store
            .update_oneshot(digest, buffer.freeze())
            .await
            .err_tip(|| "Error in update_oneshot")?;

        // Note: bytes_written_total is updated in the caller (bytestream_write) based on result

        Ok(Response::new(WriteResponse {
            committed_size: expected_size as i64,
        }))
    }
919
920
3
    /// Reports upload progress for a resumable write identified by UUID.
    ///
    /// Resolution order:
    /// 1. `GrpcStore` backends proxy the query upstream.
    /// 2. An entry in `active_uploads` means the write is in flight: report the
    ///    bytes received so far, `complete: false`.
    /// 3. Otherwise fall back to `store.has(digest)`: present => complete with
    ///    the stored size; absent => report zero progress so the client restarts.
    async fn inner_query_write_status(
        &self,
        query_request: &QueryWriteStatusRequest,
    ) -> Result<Response<QueryWriteStatusResponse>, Error> {
        let mut resource_info = ResourceInfo::new(&query_request.resource_name, true)?;

        let instance = self
            .instance_infos
            .get(resource_info.instance_name.as_ref())
            .err_tip(|| {
                format!(
                    "'instance_name' not configured for '{}'",
                    &resource_info.instance_name
                )
            })?;
        let store_clone = instance.store.clone();

        let digest = DigestInfo::try_new(resource_info.hash.as_ref(), resource_info.expected_size)?;

        // If we are a GrpcStore we shortcut here, as this is a special store.
        if let Some(grpc_store) = store_clone.downcast_ref::<GrpcStore>(Some(digest.into())) {
            return grpc_store
                .query_write_status(Request::new(query_request.clone()))
                .await;
        }

        let uuid_str = resource_info
            .uuid
            .take()
            .ok_or_else(|| make_input_err!("UUID must be set if querying write status"))?;
        let uuid_key = parse_uuid_to_key(&uuid_str);

        // Scope the lock tightly: `store.has()` below must run without holding it.
        {
            let active_uploads = instance.active_uploads.lock();
            if let Some((received_bytes, _maybe_idle_stream)) = active_uploads.get(&uuid_key) {
                return Ok(Response::new(QueryWriteStatusResponse {
                    // Acquire pairs with the Release store in inner_write's
                    // progress publication.
                    committed_size: received_bytes.load(Ordering::Acquire) as i64,
                    // If we are in the active_uploads map, but the value is None,
                    // it means the stream is not complete.
                    complete: false,
                }));
            }
        }

        let has_fut = store_clone.has(digest);
        let Some(item_size) = has_fut.await.err_tip(|| "Failed to call .has() on store")? else {
            // We lie here and say that the stream needs to start over, even though
            // it was never started. This can happen when the client disconnects
            // before sending the first payload, but the client thinks it did send
            // the payload.
            return Ok(Response::new(QueryWriteStatusResponse {
                committed_size: 0,
                complete: false,
            }));
        };
        Ok(Response::new(QueryWriteStatusResponse {
            committed_size: item_size as i64,
            complete: true,
        }))
    }
980
}
981
982
#[tonic::async_trait]
/// gRPC surface for the ByteStream service. Each handler resolves the
/// configured instance from the resource name, records request/latency/byte
/// metrics, short-circuits `GrpcStore` backends, and delegates the real work
/// to the corresponding `inner_*` method under a tracing span and a digest
/// hashing context.
impl ByteStream for ByteStreamServer {
    type ReadStream = ReadStream;

    #[instrument(
        err,
        level = Level::ERROR,
        skip_all,
        fields(request = ?grpc_request.get_ref())
    )]
    /// Handles `ByteStream.Read`: streams a blob identified by the resource
    /// name's digest back to the client.
    async fn read(
        &self,
        grpc_request: Request<ReadRequest>,
    ) -> Result<Response<Self::ReadStream>, Status> {
        let start_time = Instant::now();

        let read_request = grpc_request.into_inner();
        let resource_info = ResourceInfo::new(&read_request.resource_name, false)?;
        let instance_name = resource_info.instance_name.as_ref();
        let expected_size = resource_info.expected_size as u64;
        let instance = self
            .instance_infos
            .get(instance_name)
            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?;

        // Track read request
        instance
            .metrics
            .read_requests_total
            .fetch_add(1, Ordering::Relaxed);

        let store = instance.store.clone();

        let digest = DigestInfo::try_new(resource_info.hash.as_ref(), resource_info.expected_size)?;

        // If we are a GrpcStore we shortcut here, as this is a special store.
        // Note: the metrics below (duration/success/failure) are skipped on
        // this path.
        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest.into())) {
            let stream = Box::pin(grpc_store.read(Request::new(read_request)).await?);
            return Ok(Response::new(stream));
        }

        // Missing digest_function in the resource name falls back to the
        // configured default hasher.
        let digest_function = resource_info.digest_function.as_deref().map_or_else(
            || Ok(default_digest_hasher_func()),
            DigestHasherFunc::try_from,
        )?;

        let resp = self
            .inner_read(instance, digest, read_request)
            .instrument(error_span!("bytestream_read"))
            .with_context(
                make_ctx_for_hash_func(digest_function).err_tip(|| "In BytestreamServer::read")?,
            )
            .await
            .err_tip(|| "In ByteStreamServer::read")
            .map(|stream| -> Response<Self::ReadStream> { Response::new(Box::pin(stream)) });

        // Track metrics based on result
        #[allow(clippy::cast_possible_truncation)]
        let elapsed_ns = start_time.elapsed().as_nanos() as u64;
        instance
            .metrics
            .read_duration_ns
            .fetch_add(elapsed_ns, Ordering::Relaxed);

        match &resp {
            Ok(_) => {
                instance
                    .metrics
                    .read_requests_success
                    .fetch_add(1, Ordering::Relaxed);
                // Counts the declared size up-front; actual streamed bytes may
                // differ if the stream later errors mid-flight.
                instance
                    .metrics
                    .bytes_read_total
                    .fetch_add(expected_size, Ordering::Relaxed);
                debug!(return = "Ok(<stream>)");
            }
            Err(_) => {
                instance
                    .metrics
                    .read_requests_failure
                    .fetch_add(1, Ordering::Relaxed);
            }
        }

        resp.map_err(Into::into)
    }

    #[instrument(
        err,
        level = Level::ERROR,
        skip_all,
        fields(request = ?grpc_request.get_ref())
    )]
    /// Handles `ByteStream.Write`: uploads a blob, choosing between the
    /// buffered oneshot fast path and the resumable channel-backed path.
    async fn write(
        &self,
        grpc_request: Request<Streaming<WriteRequest>>,
    ) -> Result<Response<WriteResponse>, Status> {
        let start_time = Instant::now();

        let request = grpc_request.into_inner();
        // Wrapper eagerly pulls the first message so resource_info (hash, size,
        // uuid, digest function) is available before streaming begins.
        let stream = WriteRequestStreamWrapper::from(request)
            .await
            .err_tip(|| "Could not unwrap first stream message")
            .map_err(Into::<Status>::into)?;

        let instance_name = stream.resource_info.instance_name.as_ref();
        let expected_size = stream.resource_info.expected_size as u64;
        let instance = self
            .instance_infos
            .get(instance_name)
            .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?;

        // Track write request
        instance
            .metrics
            .write_requests_total
            .fetch_add(1, Ordering::Relaxed);

        let store = instance.store.clone();

        let digest = DigestInfo::try_new(
            &stream.resource_info.hash,
            stream.resource_info.expected_size,
        )
        .err_tip(|| "Invalid digest input in ByteStream::write")?;

        // If we are a GrpcStore we shortcut here, as this is a special store.
        // Note: duration/success/failure metrics below are skipped on this path.
        if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest.into())) {
            let resp = grpc_store.write(stream).await.map_err(Into::into);
            return resp;
        }

        let digest_function = stream
            .resource_info
            .digest_function
            .as_deref()
            .map_or_else(
                || Ok(default_digest_hasher_func()),
                DigestHasherFunc::try_from,
            )?;

        // Check if store supports direct oneshot updates (bypasses channel overhead).
        // Use fast-path only when:
        // 1. Store supports oneshot optimization
        // 2. UUID is provided
        // 3. Size is under 64MB (memory safety)
        // 4. This is a NEW upload (UUID not already in active_uploads)
        // 5. The first message has finish_write=true (single-shot upload)
        //
        // The oneshot path cannot be used for multi-message streams because:
        // - QueryWriteStatus won't work (no progress tracking)
        // - Resumed streams won't work (no partial progress)
        let use_oneshot = if store.optimized_for(StoreOptimizations::SubscribesToUpdateOneshot)
            && expected_size <= 64 * 1024 * 1024
            && stream.resource_info.uuid.is_some()
        {
            // Check if first message completes the upload (single-shot)
            let is_single_shot = stream.is_first_msg_complete();

            if is_single_shot {
                // unwrap is safe: uuid.is_some() was checked in the condition above.
                let uuid_str = stream.resource_info.uuid.as_ref().unwrap();
                let uuid_key = parse_uuid_to_key(uuid_str);
                // Only use oneshot if this UUID is not already being tracked
                !instance.active_uploads.lock().contains_key(&uuid_key)
            } else {
                false
            }
        } else {
            false
        };

        let result = if use_oneshot {
            self.inner_write_oneshot(instance, digest, stream)
                .instrument(error_span!("bytestream_write_oneshot"))
                .with_context(
                    make_ctx_for_hash_func(digest_function)
                        .err_tip(|| "In BytestreamServer::write")?,
                )
                .await
                .err_tip(|| "In ByteStreamServer::write (oneshot)")
        } else {
            self.inner_write(instance, digest, stream)
                .instrument(error_span!("bytestream_write"))
                .with_context(
                    make_ctx_for_hash_func(digest_function)
                        .err_tip(|| "In BytestreamServer::write")?,
                )
                .await
                .err_tip(|| "In ByteStreamServer::write")
        };

        // Track metrics based on result
        #[allow(clippy::cast_possible_truncation)]
        let elapsed_ns = start_time.elapsed().as_nanos() as u64;
        instance
            .metrics
            .write_duration_ns
            .fetch_add(elapsed_ns, Ordering::Relaxed);

        match &result {
            Ok(_) => {
                instance
                    .metrics
                    .write_requests_success
                    .fetch_add(1, Ordering::Relaxed);
                // Declared size, not bytes actually transferred on the wire.
                instance
                    .metrics
                    .bytes_written_total
                    .fetch_add(expected_size, Ordering::Relaxed);
            }
            Err(_) => {
                instance
                    .metrics
                    .write_requests_failure
                    .fetch_add(1, Ordering::Relaxed);
            }
        }

        result.map_err(Into::into)
    }

    #[instrument(
        err,
        ret(level = Level::INFO),
        level = Level::ERROR,
        skip_all,
        fields(request = ?grpc_request.get_ref())
    )]
    /// Handles `ByteStream.QueryWriteStatus`: reports how many bytes of a
    /// resumable upload have been committed and whether it is complete.
    async fn query_write_status(
        &self,
        grpc_request: Request<QueryWriteStatusRequest>,
    ) -> Result<Response<QueryWriteStatusResponse>, Status> {
        let request = grpc_request.into_inner();

        // Track query_write_status request - we need to parse the resource name to get the instance
        // Best-effort: an unparseable resource name or unknown instance skips
        // the metric here; inner_query_write_status will surface the error.
        if let Ok(resource_info) = ResourceInfo::new(&request.resource_name, true) {
            if let Some(instance) = self
                .instance_infos
                .get(resource_info.instance_name.as_ref())
            {
                instance
                    .metrics
                    .query_write_status_total
                    .fetch_add(1, Ordering::Relaxed);
            }
        }

        self.inner_query_write_status(&request)
            .await
            .err_tip(|| "Failed on query_write_status() command")
            .map_err(Into::into)
    }
}