/build/source/nativelink-service/src/bytestream_server.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::convert::Into; |
16 | | use core::fmt::{Debug, Formatter}; |
17 | | use core::pin::Pin; |
18 | | use core::sync::atomic::{AtomicU64, Ordering}; |
19 | | use core::time::Duration; |
20 | | use std::collections::HashMap; |
21 | | use std::collections::hash_map::Entry; |
22 | | use std::sync::Arc; |
23 | | use std::time::{Instant, SystemTime, UNIX_EPOCH}; |
24 | | |
25 | | use bytes::BytesMut; |
26 | | use futures::future::pending; |
27 | | use futures::stream::unfold; |
28 | | use futures::{Future, Stream, TryFutureExt, try_join}; |
29 | | use nativelink_config::cas_server::{ByteStreamConfig, InstanceName, WithInstanceName}; |
30 | | use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err}; |
31 | | use nativelink_metric::{ |
32 | | MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, group, publish, |
33 | | }; |
34 | | use nativelink_proto::google::bytestream::byte_stream_server::{ |
35 | | ByteStream, ByteStreamServer as Server, |
36 | | }; |
37 | | use nativelink_proto::google::bytestream::{ |
38 | | QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest, |
39 | | WriteResponse, |
40 | | }; |
41 | | use nativelink_store::grpc_store::GrpcStore; |
42 | | use nativelink_store::store_manager::StoreManager; |
43 | | use nativelink_util::buf_channel::{ |
44 | | DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair, |
45 | | }; |
46 | | use nativelink_util::common::DigestInfo; |
47 | | use nativelink_util::digest_hasher::{ |
48 | | DigestHasherFunc, default_digest_hasher_func, make_ctx_for_hash_func, |
49 | | }; |
50 | | use nativelink_util::proto_stream_utils::WriteRequestStreamWrapper; |
51 | | use nativelink_util::resource_info::ResourceInfo; |
52 | | use nativelink_util::spawn; |
53 | | use nativelink_util::store_trait::{Store, StoreLike, StoreOptimizations, UploadSizeInfo}; |
54 | | use nativelink_util::task::JoinHandleDropGuard; |
55 | | use opentelemetry::context::FutureExt; |
56 | | use parking_lot::Mutex; |
57 | | use tokio::time::sleep; |
58 | | use tonic::{Request, Response, Status, Streaming}; |
59 | | use tracing::{Instrument, Level, debug, error, error_span, info, instrument, trace, warn}; |
60 | | |
61 | | /// If this value changes, update the documentation in the config definition.
62 | | const DEFAULT_PERSIST_STREAM_ON_DISCONNECT_TIMEOUT: Duration = Duration::from_secs(60); |
63 | | |
64 | | /// If this value changes, update the documentation in the config definition.
65 | | const DEFAULT_MAX_BYTES_PER_STREAM: usize = 64 * 1024; |
66 | | |
67 | | /// Metrics for `ByteStream` server operations. |
68 | | /// Tracks upload/download activity, throughput, and latency. |
69 | | #[derive(Debug, Default)] |
70 | | pub struct ByteStreamMetrics { |
71 | | /// Number of currently active uploads (includes idle streams waiting for resume) |
72 | | pub active_uploads: AtomicU64, |
73 | | /// Total number of write requests received |
74 | | pub write_requests_total: AtomicU64, |
75 | | /// Total number of successful write requests |
76 | | pub write_requests_success: AtomicU64, |
77 | | /// Total number of failed write requests |
78 | | pub write_requests_failure: AtomicU64, |
79 | | /// Total number of read requests received |
80 | | pub read_requests_total: AtomicU64, |
81 | | /// Total number of successful read requests |
82 | | pub read_requests_success: AtomicU64, |
83 | | /// Total number of failed read requests |
84 | | pub read_requests_failure: AtomicU64, |
85 | | /// Total number of `query_write_status` requests |
86 | | pub query_write_status_total: AtomicU64, |
87 | | /// Total bytes written via `ByteStream` |
88 | | pub bytes_written_total: AtomicU64, |
89 | | /// Total bytes read via `ByteStream` |
90 | | pub bytes_read_total: AtomicU64, |
91 | | /// Sum of write durations in nanoseconds (for average latency calculation) |
92 | | pub write_duration_ns: AtomicU64, |
93 | | /// Sum of read durations in nanoseconds (for average latency calculation) |
94 | | pub read_duration_ns: AtomicU64, |
95 | | /// Number of UUID collisions detected |
96 | | pub uuid_collisions: AtomicU64, |
97 | | /// Number of resumed uploads (client reconnected to existing stream) |
98 | | pub resumed_uploads: AtomicU64, |
99 | | /// Number of idle streams that timed out |
100 | | pub idle_stream_timeouts: AtomicU64, |
101 | | } |
102 | | |
103 | | impl MetricsComponent for ByteStreamMetrics { |
104 | 0 | fn publish( |
105 | 0 | &self, |
106 | 0 | _kind: MetricKind, |
107 | 0 | field_metadata: MetricFieldData, |
108 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
109 | 0 | let _enter = group!(field_metadata.name).entered(); |
110 | | |
111 | 0 | publish!( |
112 | 0 | "active_uploads", |
113 | 0 | &self.active_uploads, |
114 | 0 | MetricKind::Counter, |
115 | 0 | "Number of currently active uploads" |
116 | | ); |
117 | 0 | publish!( |
118 | 0 | "write_requests_total", |
119 | 0 | &self.write_requests_total, |
120 | 0 | MetricKind::Counter, |
121 | 0 | "Total write requests received" |
122 | | ); |
123 | 0 | publish!( |
124 | 0 | "write_requests_success", |
125 | 0 | &self.write_requests_success, |
126 | 0 | MetricKind::Counter, |
127 | 0 | "Total successful write requests" |
128 | | ); |
129 | 0 | publish!( |
130 | 0 | "write_requests_failure", |
131 | 0 | &self.write_requests_failure, |
132 | 0 | MetricKind::Counter, |
133 | 0 | "Total failed write requests" |
134 | | ); |
135 | 0 | publish!( |
136 | 0 | "read_requests_total", |
137 | 0 | &self.read_requests_total, |
138 | 0 | MetricKind::Counter, |
139 | 0 | "Total read requests received" |
140 | | ); |
141 | 0 | publish!( |
142 | 0 | "read_requests_success", |
143 | 0 | &self.read_requests_success, |
144 | 0 | MetricKind::Counter, |
145 | 0 | "Total successful read requests" |
146 | | ); |
147 | 0 | publish!( |
148 | 0 | "read_requests_failure", |
149 | 0 | &self.read_requests_failure, |
150 | 0 | MetricKind::Counter, |
151 | 0 | "Total failed read requests" |
152 | | ); |
153 | 0 | publish!( |
154 | 0 | "query_write_status_total", |
155 | 0 | &self.query_write_status_total, |
156 | 0 | MetricKind::Counter, |
157 | 0 | "Total query_write_status requests" |
158 | | ); |
159 | 0 | publish!( |
160 | 0 | "bytes_written_total", |
161 | 0 | &self.bytes_written_total, |
162 | 0 | MetricKind::Counter, |
163 | 0 | "Total bytes written via ByteStream" |
164 | | ); |
165 | 0 | publish!( |
166 | 0 | "bytes_read_total", |
167 | 0 | &self.bytes_read_total, |
168 | 0 | MetricKind::Counter, |
169 | 0 | "Total bytes read via ByteStream" |
170 | | ); |
171 | 0 | publish!( |
172 | 0 | "write_duration_ns", |
173 | 0 | &self.write_duration_ns, |
174 | 0 | MetricKind::Counter, |
175 | 0 | "Sum of write durations in nanoseconds" |
176 | | ); |
177 | 0 | publish!( |
178 | 0 | "read_duration_ns", |
179 | 0 | &self.read_duration_ns, |
180 | 0 | MetricKind::Counter, |
181 | 0 | "Sum of read durations in nanoseconds" |
182 | | ); |
183 | 0 | publish!( |
184 | 0 | "uuid_collisions", |
185 | 0 | &self.uuid_collisions, |
186 | 0 | MetricKind::Counter, |
187 | 0 | "Number of UUID collisions detected" |
188 | | ); |
189 | 0 | publish!( |
190 | 0 | "resumed_uploads", |
191 | 0 | &self.resumed_uploads, |
192 | 0 | MetricKind::Counter, |
193 | 0 | "Number of resumed uploads" |
194 | | ); |
195 | 0 | publish!( |
196 | 0 | "idle_stream_timeouts", |
197 | 0 | &self.idle_stream_timeouts, |
198 | 0 | MetricKind::Counter, |
199 | 0 | "Number of idle streams that timed out" |
200 | | ); |
201 | | |
202 | 0 | Ok(MetricPublishKnownKindData::Component) |
203 | 0 | } |
204 | | } |
205 | | |
206 | | type BytesWrittenAndIdleStream = (Arc<AtomicU64>, Option<IdleStream>); |
207 | | |
208 | | /// Type alias for the UUID key used in `active_uploads` `HashMap`. |
209 | | /// Using u128 instead of String reduces memory allocations and improves |
210 | | /// cache locality for `HashMap` operations. |
211 | | type UuidKey = u128; |
212 | | |
213 | | /// Parse a UUID string to a u128 for use as a `HashMap` key. |
214 | | /// This avoids heap allocation for String keys and improves `HashMap` performance. |
215 | | /// Falls back to hashing the string if it's not a valid hex UUID. |
216 | | #[inline] |
217 | 19 | fn parse_uuid_to_key(uuid_str: &str) -> UuidKey { |
218 | | // UUIDs are typically 32 hex chars (128 bits) or 36 chars with dashes. |
219 | | // We'll try to parse as hex first, then fall back to hashing. |
220 | 19 | let clean: String = uuid_str.chars().filter(char::is_ascii_hexdigit).collect(); |
221 | 19 | if clean.len() >= 16 {
  Branch (221:8): [True: 19, False: 0]
  Branch (221:8): [Folded - Ignored]
222 | | // Take up to 32 hex chars (128 bits)
223 | 19 | let hex_str = if clean.len() > 32 {
  Branch (223:26): [True: 0, False: 19]
  Branch (223:26): [Folded - Ignored]
224 | 0 | &clean[..32] |
225 | | } else { |
226 | 19 | &clean |
227 | | }; |
228 | 19 | u128::from_str_radix(hex_str, 16).unwrap_or_else(|_| {
229 | | // Hash fallback for non-hex strings |
230 | | use core::hash::{Hash, Hasher}; |
231 | 0 | let mut hasher = std::collections::hash_map::DefaultHasher::new(); |
232 | 0 | uuid_str.hash(&mut hasher); |
233 | 0 | u128::from(hasher.finish()) |
234 | 0 | }) |
235 | | } else { |
236 | | // Short strings: use hash |
237 | | use core::hash::{Hash, Hasher}; |
238 | 0 | let mut hasher = std::collections::hash_map::DefaultHasher::new(); |
239 | 0 | uuid_str.hash(&mut hasher); |
240 | 0 | u128::from(hasher.finish()) |
241 | | } |
242 | 19 | } |
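
A usage sketch (not part of the coverage run; assumes the parse_uuid_to_key above is in scope). Dashed and undashed renderings of the same UUID map to the same key because non-hex characters are filtered out before parsing, while short non-hex strings take the hash fallback:

    fn demo() {
        let dashed = "550e8400-e29b-41d4-a716-446655440000";
        let plain = "550e8400e29b41d4a716446655440000";
        // Dashes are stripped, so both spellings parse to the same u128.
        assert_eq!(parse_uuid_to_key(dashed), parse_uuid_to_key(plain));
        // Fewer than 16 hex digits: 64-bit hash fallback, widened to u128.
        assert!(parse_uuid_to_key("not-a-uuid!") <= u128::from(u64::MAX));
    }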
243 | | |
244 | | pub struct InstanceInfo { |
245 | | store: Store, |
246 | | // Maximum number of bytes to send in each gRPC stream chunk.
247 | | max_bytes_per_stream: usize, |
248 | | /// Active uploads keyed by UUID as u128 for better performance. |
249 | | /// Using u128 keys instead of String reduces heap allocations |
250 | | /// and improves `HashMap` lookup performance. |
251 | | active_uploads: Arc<Mutex<HashMap<UuidKey, BytesWrittenAndIdleStream>>>, |
252 | | /// How long to keep idle streams before timing them out. |
253 | | idle_stream_timeout: Duration, |
254 | | metrics: Arc<ByteStreamMetrics>, |
255 | | /// Handle to the global sweeper task. Kept alive for the lifetime of the instance. |
256 | | _sweeper_handle: Arc<JoinHandleDropGuard<()>>, |
257 | | } |
258 | | |
259 | | impl Debug for InstanceInfo { |
260 | 0 | fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { |
261 | 0 | f.debug_struct("InstanceInfo") |
262 | 0 | .field("store", &self.store) |
263 | 0 | .field("max_bytes_per_stream", &self.max_bytes_per_stream) |
264 | 0 | .field("active_uploads", &self.active_uploads) |
265 | 0 | .field("idle_stream_timeout", &self.idle_stream_timeout) |
266 | 0 | .field("metrics", &self.metrics) |
267 | 0 | .finish() |
268 | 0 | } |
269 | | } |
270 | | |
271 | | type ReadStream = Pin<Box<dyn Stream<Item = Result<ReadResponse, Status>> + Send + 'static>>; |
272 | | type StoreUpdateFuture = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>; |
273 | | |
274 | | struct StreamState { |
275 | | uuid: UuidKey, |
276 | | tx: DropCloserWriteHalf, |
277 | | store_update_fut: StoreUpdateFuture, |
278 | | } |
279 | | |
280 | | impl Debug for StreamState { |
281 | 0 | fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { |
282 | 0 | f.debug_struct("StreamState") |
283 | 0 | .field("uuid", &format!("{:032x}", self.uuid)) |
284 | 0 | .finish() |
285 | 0 | } |
286 | | } |
287 | | |
288 | | /// If a stream is in this state, it will automatically be wrapped in an `IdleStream` and
289 | | /// placed back into the `active_uploads` map when it is dropped.
290 | | /// To prevent it from being put back into an `IdleStream`, you must call `.graceful_finish()`.
291 | | struct ActiveStreamGuard { |
292 | | stream_state: Option<StreamState>, |
293 | | bytes_received: Arc<AtomicU64>, |
294 | | active_uploads: Arc<Mutex<HashMap<UuidKey, BytesWrittenAndIdleStream>>>, |
295 | | metrics: Arc<ByteStreamMetrics>, |
296 | | } |
297 | | |
298 | | impl ActiveStreamGuard { |
299 | | /// Consumes the guard. The stream will be considered "finished" and
300 | | /// will be removed from `active_uploads`.
301 | 6 | fn graceful_finish(mut self) { |
302 | 6 | let stream_state = self.stream_state.take().unwrap(); |
303 | 6 | self.active_uploads.lock().remove(&stream_state.uuid); |
304 | | // Decrement active uploads counter on successful completion |
305 | 6 | self.metrics.active_uploads.fetch_sub(1, Ordering::Relaxed); |
306 | 6 | } |
307 | | } |
308 | | |
309 | | impl Drop for ActiveStreamGuard { |
310 | 11 | fn drop(&mut self) { |
311 | 11 | let Some(stream_state) = self.stream_state.take() else {
  Branch (311:13): [True: 5, False: 6]
  Branch (311:13): [Folded - Ignored]
312 | 6 | return; // If None it means we don't want it put back into an IdleStream. |
313 | | }; |
314 | 5 | let mut active_uploads = self.active_uploads.lock(); |
315 | 5 | let uuid = stream_state.uuid; // u128 is Copy, no clone needed |
316 | 5 | let Some(active_uploads_slot) = active_uploads.get_mut(&uuid) else {
  Branch (316:13): [True: 5, False: 0]
  Branch (316:13): [Folded - Ignored]
317 | 0 | error!( |
318 | | err = "Failed to find active upload. This should never happen.", |
319 | 0 | uuid = format!("{:032x}", uuid), |
320 | | ); |
321 | 0 | return; |
322 | | }; |
323 | | // Mark stream as idle with current timestamp. |
324 | | // The global sweeper will clean it up after idle_stream_timeout. |
325 | | // This avoids spawning a task per stream, reducing overhead from O(n) to O(1). |
326 | 5 | active_uploads_slot.1 = Some(IdleStream { |
327 | 5 | stream_state, |
328 | 5 | idle_since: Instant::now(), |
329 | 5 | }); |
330 | 11 | } |
331 | | } |
332 | | |
333 | | /// Represents a stream that is in the "idle" state. This means it is not currently being used
334 | | /// by a client. If it is not used within a certain amount of time it will be removed from the |
335 | | /// `active_uploads` map automatically by the global sweeper task. |
336 | | #[derive(Debug)] |
337 | | struct IdleStream { |
338 | | stream_state: StreamState, |
339 | | /// When this stream became idle. Used by the global sweeper to determine expiration. |
340 | | idle_since: Instant, |
341 | | } |
342 | | |
343 | | impl IdleStream { |
344 | 3 | fn into_active_stream( |
345 | 3 | self, |
346 | 3 | bytes_received: Arc<AtomicU64>, |
347 | 3 | instance_info: &InstanceInfo, |
348 | 3 | ) -> ActiveStreamGuard { |
349 | 3 | ActiveStreamGuard { |
350 | 3 | stream_state: Some(self.stream_state), |
351 | 3 | bytes_received, |
352 | 3 | active_uploads: instance_info.active_uploads.clone(), |
353 | 3 | metrics: instance_info.metrics.clone(), |
354 | 3 | } |
355 | 3 | } |
356 | | } |
357 | | |
358 | | #[derive(Debug)] |
359 | | pub struct ByteStreamServer { |
360 | | instance_infos: HashMap<InstanceName, InstanceInfo>, |
361 | | } |
362 | | |
363 | | impl ByteStreamServer { |
364 | | /// Generate a unique UUID key by `XOR`ing the base key with a nanosecond timestamp. |
365 | | /// This ensures virtually zero collision probability while being O(1). |
366 | 0 | fn generate_unique_uuid_key(base_key: UuidKey) -> UuidKey { |
367 | 0 | let timestamp = SystemTime::now() |
368 | 0 | .duration_since(UNIX_EPOCH) |
369 | 0 | .unwrap_or_default() |
370 | 0 | .as_nanos(); |
371 | | // XOR with timestamp to create unique key |
372 | 0 | base_key ^ timestamp |
373 | 0 | } |
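
A standalone sketch of the same re-keying idea (remint is a hypothetical stand-in, not part of this file). XOR preserves the full 128-bit spread of the base key while shifting it by a value that changes every nanosecond:

    use std::time::{SystemTime, UNIX_EPOCH};

    // Hypothetical equivalent of generate_unique_uuid_key.
    fn remint(base: u128) -> u128 {
        let ns = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        base ^ ns // differs from `base` whenever the timestamp is non-zero
    }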
374 | | |
375 | 15 | pub fn new( |
376 | 15 | configs: &[WithInstanceName<ByteStreamConfig>], |
377 | 15 | store_manager: &StoreManager, |
378 | 15 | ) -> Result<Self, Error> { |
379 | 15 | let mut instance_infos: HashMap<String, InstanceInfo> = HashMap::new(); |
380 | 30 | for config in configs {
381 | 15 | let idle_stream_timeout = if config.persist_stream_on_disconnect_timeout == 0 {
  Branch (381:42): [True: 15, False: 0]
  Branch (381:42): [Folded - Ignored]
382 | 15 | DEFAULT_PERSIST_STREAM_ON_DISCONNECT_TIMEOUT |
383 | | } else { |
384 | 0 | Duration::from_secs(config.persist_stream_on_disconnect_timeout as u64) |
385 | | }; |
386 | 15 | let _old_value = instance_infos.insert( |
387 | 15 | config.instance_name.clone(), |
388 | 15 | Self::new_with_timeout(config, store_manager, idle_stream_timeout)?,
389 | | ); |
390 | | } |
391 | 15 | Ok(Self { instance_infos }) |
392 | 15 | } |
393 | | |
394 | 15 | pub fn new_with_timeout( |
395 | 15 | config: &WithInstanceName<ByteStreamConfig>, |
396 | 15 | store_manager: &StoreManager, |
397 | 15 | idle_stream_timeout: Duration, |
398 | 15 | ) -> Result<InstanceInfo, Error> { |
399 | 15 | let store = store_manager |
400 | 15 | .get_store(&config.cas_store) |
401 | 15 | .ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", config.cas_store))?;
402 | 15 | let max_bytes_per_stream = if config.max_bytes_per_stream == 0 {
  Branch (402:39): [True: 1, False: 14]
  Branch (402:39): [Folded - Ignored]
403 | 1 | DEFAULT_MAX_BYTES_PER_STREAM |
404 | | } else { |
405 | 14 | config.max_bytes_per_stream |
406 | | }; |
407 | | |
408 | 15 | let active_uploads: Arc<Mutex<HashMap<UuidKey, BytesWrittenAndIdleStream>>> = |
409 | 15 | Arc::new(Mutex::new(HashMap::new())); |
410 | 15 | let metrics = Arc::new(ByteStreamMetrics::default()); |
411 | | |
412 | | // Spawn a single global sweeper task that periodically cleans up expired idle streams. |
413 | | // This replaces per-stream timeout tasks, reducing task spawn overhead from O(n) to O(1). |
414 | 15 | let sweeper_active_uploads = Arc::downgrade(&active_uploads); |
415 | 15 | let sweeper_metrics = Arc::downgrade(&metrics); |
416 | 15 | let sweep_interval = idle_stream_timeout / 2; // Check every half-timeout period |
417 | 15 | let sweeper_handle = spawn!("bytestream_idle_stream_sweeper", async move {
418 | | loop { |
419 | 12 | sleep(sweep_interval).await; |
420 | | |
421 | 0 | let Some(active_uploads) = sweeper_active_uploads.upgrade() else {
  Branch (421:21): [True: 0, False: 0]
  Branch (421:21): [Folded - Ignored]
422 | | // InstanceInfo has been dropped, exit the sweeper |
423 | 0 | break; |
424 | | }; |
425 | 0 | let metrics = sweeper_metrics.upgrade(); |
426 | | |
427 | 0 | let now = Instant::now(); |
428 | 0 | let mut expired_count = 0u64; |
429 | | |
430 | | // Lock and sweep expired entries |
431 | | { |
432 | 0 | let mut uploads = active_uploads.lock(); |
433 | 0 | uploads.retain(|uuid, (_, maybe_idle)| { |
434 | 0 | if let Some(idle_stream) = maybe_idle {
  Branch (434:32): [True: 0, False: 0]
  Branch (434:32): [Folded - Ignored]
435 | 0 | if now.duration_since(idle_stream.idle_since) >= idle_stream_timeout {
  Branch (435:32): [True: 0, False: 0]
  Branch (435:32): [Folded - Ignored]
436 | 0 | info!( |
437 | | msg = "Sweeping expired idle stream", |
438 | 0 | uuid = format!("{:032x}", uuid) |
439 | | ); |
440 | 0 | expired_count += 1; |
441 | 0 | return false; // Remove this entry |
442 | 0 | } |
443 | 0 | } |
444 | 0 | true // Keep this entry |
445 | 0 | }); |
446 | | } |
447 | | |
448 | | // Update metrics outside the lock |
449 | 0 | if expired_count > 0 {
  Branch (449:20): [True: 0, False: 0]
  Branch (449:20): [Folded - Ignored]
450 | 0 | if let Some(m) = &metrics {
  Branch (450:28): [True: 0, False: 0]
  Branch (450:28): [Folded - Ignored]
451 | 0 | m.idle_stream_timeouts |
452 | 0 | .fetch_add(expired_count, Ordering::Relaxed); |
453 | 0 | m.active_uploads.fetch_sub(expired_count, Ordering::Relaxed); |
454 | 0 | } |
455 | 0 | trace!( |
456 | | msg = "Sweeper cleaned up expired streams", |
457 | | count = expired_count |
458 | | ); |
459 | 0 | } |
460 | | } |
461 | 0 | }); |
462 | | |
463 | 15 | Ok(InstanceInfo { |
464 | 15 | store, |
465 | 15 | max_bytes_per_stream, |
466 | 15 | active_uploads, |
467 | 15 | idle_stream_timeout, |
468 | 15 | metrics, |
469 | 15 | _sweeper_handle: Arc::new(sweeper_handle), |
470 | 15 | }) |
471 | 15 | } |
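
The sweeper deliberately holds only Weak handles, so the background task can never keep InstanceInfo alive. A minimal sketch of that upgrade-or-exit lifecycle under plain Tokio (names and the Vec<u64> payload are hypothetical):

    use std::sync::Weak;
    use std::time::Duration;
    use parking_lot::Mutex;

    // Each tick: upgrade the Weak handle or exit; then sweep expired entries.
    async fn sweep_loop(map: Weak<Mutex<Vec<u64>>>, interval: Duration) {
        loop {
            tokio::time::sleep(interval).await;
            let Some(map) = map.upgrade() else { break }; // owner dropped: stop
            map.lock().retain(|&age_secs| age_secs < 60); // stand-in expiry check
        }
    }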
472 | | |
473 | 1 | pub fn into_service(self) -> Server<Self> { |
474 | 1 | Server::new(self) |
475 | 1 | } |
476 | | |
477 | | /// Creates or joins an upload stream for the given UUID. |
478 | | /// |
479 | | /// This function handles three scenarios: |
480 | | /// 1. UUID doesn't exist - creates a new upload stream |
481 | | /// 2. UUID exists but is idle - resumes the existing stream |
482 | | /// 3. UUID exists and is active - generates a unique UUID by XORing a nanosecond
483 | | /// timestamp into the key to avoid collision, then creates a new stream with that UUID
484 | | /// |
485 | | /// The nanosecond timestamp ensures virtually zero probability of collision since |
486 | | /// two concurrent uploads would need to both collide on the original UUID AND |
487 | | /// generate the unique UUID in the exact same nanosecond. |
488 | 11 | fn create_or_join_upload_stream( |
489 | 11 | &self, |
490 | 11 | uuid_str: &str, |
491 | 11 | instance: &InstanceInfo, |
492 | 11 | digest: DigestInfo, |
493 | 11 | ) -> ActiveStreamGuard { |
494 | | // Parse UUID string to u128 key for efficient HashMap operations |
495 | 11 | let uuid_key = parse_uuid_to_key(uuid_str); |
496 | | |
497 | 8 | let (uuid, bytes_received, is_collision) = |
498 | 11 | match instance.active_uploads.lock().entry(uuid_key) { |
499 | 3 | Entry::Occupied(mut entry) => { |
500 | 3 | let maybe_idle_stream = entry.get_mut(); |
501 | 3 | if let Some(idle_stream) = maybe_idle_stream.1.take() {
  Branch (501:28): [True: 3, False: 0]
  Branch (501:28): [Folded - Ignored]
502 | | // Case 2: Stream exists but is idle, we can resume it |
503 | 3 | let bytes_received = maybe_idle_stream.0.clone(); |
504 | 3 | info!( |
505 | | msg = "Joining existing stream", |
506 | 3 | uuid = format!("{:032x}", entry.key()) |
507 | | ); |
508 | | // Track resumed upload |
509 | 3 | instance |
510 | 3 | .metrics |
511 | 3 | .resumed_uploads |
512 | 3 | .fetch_add(1, Ordering::Relaxed); |
513 | 3 | return idle_stream.into_active_stream(bytes_received, instance); |
514 | 0 | } |
515 | | // Case 3: Stream is active - generate a unique UUID to avoid collision |
516 | | // Using nanosecond timestamp makes collision probability essentially zero |
517 | 0 | let original_key = *entry.key(); |
518 | 0 | let unique_key = Self::generate_unique_uuid_key(original_key); |
519 | 0 | warn!( |
520 | | msg = "UUID collision detected, generating unique UUID to prevent conflict", |
521 | 0 | original_uuid = format!("{:032x}", original_key), |
522 | 0 | unique_uuid = format!("{:032x}", unique_key) |
523 | | ); |
524 | | // Entry goes out of scope here, releasing the lock |
525 | | |
526 | 0 | let bytes_received = Arc::new(AtomicU64::new(0)); |
527 | 0 | let mut active_uploads = instance.active_uploads.lock(); |
528 | | // Insert with the unique UUID - this should never collide due to nanosecond precision |
529 | 0 | active_uploads.insert(unique_key, (bytes_received.clone(), None)); |
530 | 0 | (unique_key, bytes_received, true) |
531 | | } |
532 | 8 | Entry::Vacant(entry) => { |
533 | | // Case 1: UUID doesn't exist, create new stream |
534 | 8 | let bytes_received = Arc::new(AtomicU64::new(0)); |
535 | 8 | let uuid = *entry.key(); |
536 | | // Our stream is "in use" if the key is in the map, but the value is None. |
537 | 8 | entry.insert((bytes_received.clone(), None)); |
538 | 8 | (uuid, bytes_received, false) |
539 | | } |
540 | | }; |
541 | | |
542 | | // Track metrics for new upload |
543 | 8 | instance |
544 | 8 | .metrics |
545 | 8 | .active_uploads |
546 | 8 | .fetch_add(1, Ordering::Relaxed); |
547 | 8 | if is_collision {
  Branch (547:12): [True: 0, False: 8]
  Branch (547:12): [Folded - Ignored]
548 | 0 | instance |
549 | 0 | .metrics |
550 | 0 | .uuid_collisions |
551 | 0 | .fetch_add(1, Ordering::Relaxed); |
552 | 8 | } |
553 | | |
554 | | // Important: Do not return an error from this point onwards without |
555 | | // removing the entry from the map, otherwise that UUID becomes |
556 | | // unusable. |
557 | | |
558 | 8 | let (tx, rx) = make_buf_channel_pair(); |
559 | 8 | let store = instance.store.clone(); |
560 | 8 | let store_update_fut = Box::pin(async move {
561 | | // We need to wrap `Store::update()` in another future because we need to capture
562 | | // `store` to ensure its lifetime follows the future and not the caller. |
563 | 6 | store |
564 | 6 | // Bytestream always uses digest size as the actual byte size. |
565 | 6 | .update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes())) |
566 | 6 | .await |
567 | 6 | }); |
568 | 8 | ActiveStreamGuard { |
569 | 8 | stream_state: Some(StreamState { |
570 | 8 | uuid, |
571 | 8 | tx, |
572 | 8 | store_update_fut, |
573 | 8 | }), |
574 | 8 | bytes_received, |
575 | 8 | active_uploads: instance.active_uploads.clone(), |
576 | 8 | metrics: instance.metrics.clone(), |
577 | 8 | } |
578 | 11 | } |
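
A reduced sketch of the three-way branch above (hypothetical types; the idle stream is modeled as Option<()>): resume when an idle value is present, re-key when the slot is occupied but active, create otherwise:

    use std::collections::HashMap;
    use std::collections::hash_map::Entry;

    enum Outcome { Resumed, Rekeyed(u128), Created }

    fn create_or_join(map: &mut HashMap<u128, Option<()>>, key: u128) -> Outcome {
        let collided = match map.entry(key) {
            Entry::Occupied(mut e) => {
                if e.get_mut().take().is_some() {
                    return Outcome::Resumed; // case 2: reclaim the idle stream
                }
                true // case 3: slot is active; the entry drops here
            }
            Entry::Vacant(e) => {
                e.insert(None); // case 1: "in use" = key present, value None
                false
            }
        };
        if collided {
            let new_key = key ^ 1; // stand-in for the nanosecond-timestamp XOR
            map.insert(new_key, None);
            return Outcome::Rekeyed(new_key);
        }
        Outcome::Created
    }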
579 | | |
580 | 3 | async fn inner_read( |
581 | 3 | &self, |
582 | 3 | instance: &InstanceInfo, |
583 | 3 | digest: DigestInfo, |
584 | 3 | read_request: ReadRequest, |
585 | 3 | ) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + Send + use<>, Error> { |
586 | | struct ReaderState { |
587 | | max_bytes_per_stream: usize, |
588 | | rx: DropCloserReadHalf, |
589 | | maybe_get_part_result: Option<Result<(), Error>>, |
590 | | get_part_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>, |
591 | | } |
592 | | |
593 | 3 | let read_limit = u64::try_from(read_request.read_limit) |
594 | 3 | .err_tip(|| "Could not convert read_limit to u64")?;
595 | | |
596 | 3 | let (tx, rx) = make_buf_channel_pair(); |
597 | | |
598 | 3 | let read_limit = if read_limit != 0 {
  Branch (598:29): [True: 3, False: 0]
  Branch (598:29): [Folded - Ignored]
599 | 3 | Some(read_limit) |
600 | | } else { |
601 | 0 | None |
602 | | }; |
603 | | |
604 | | // This allows us to call a destructor when the object is dropped.
605 | 3 | let store = instance.store.clone(); |
606 | 3 | let state = Some(ReaderState { |
607 | 3 | rx, |
608 | 3 | max_bytes_per_stream: instance.max_bytes_per_stream, |
609 | 3 | maybe_get_part_result: None, |
610 | 3 | get_part_fut: Box::pin(async move { |
611 | 3 | store |
612 | 3 | .get_part( |
613 | 3 | digest, |
614 | 3 | tx, |
615 | 3 | u64::try_from(read_request.read_offset) |
616 | 3 | .err_tip(|| "Could not convert read_offset to u64")?,
617 | 3 | read_limit, |
618 | | ) |
619 | 3 | .await |
620 | 3 | }), |
621 | | }); |
622 | | |
623 | 3 | let read_stream_span = error_span!("read_stream"); |
624 | | |
625 | 9.77k | Ok(Box::pin(unfold(state, move |state| {
626 | 9.77k | async {
627 | 9.77k | let mut state = state?; // If None our stream is done.
628 | 9.77k | let mut response = ReadResponse::default(); |
629 | | { |
630 | 9.77k | let consume_fut = state.rx.consume(Some(state.max_bytes_per_stream)); |
631 | 9.77k | tokio::pin!(consume_fut); |
632 | | loop { |
633 | 9.77k | tokio::select! { |
634 | 9.77k | read_result = &mut consume_fut => {
635 | 9.77k | match read_result { |
636 | 9.76k | Ok(bytes) => { |
637 | 9.76k | if bytes.is_empty() {
  Branch (637:40): [True: 2, False: 9.76k]
  Branch (637:40): [Folded - Ignored]
638 | | // EOF. |
639 | 2 | return None; |
640 | 9.76k | } |
641 | 9.76k | if bytes.len() > state.max_bytes_per_stream {
  Branch (641:40): [True: 0, False: 9.76k]
  Branch (641:40): [Folded - Ignored]
642 | 0 | let err = make_err!(Code::Internal, "Returned store size was larger than read size"); |
643 | 0 | return Some((Err(err.into()), None)); |
644 | 9.76k | } |
645 | 9.76k | response.data = bytes; |
646 | 9.76k | trace!(response = ?response); |
647 | 9.76k | debug!(response.data = format!("<redacted len({})>", response.data.len())); |
648 | 9.76k | break; |
649 | | } |
650 | 1 | Err(mut e) => { |
651 | | // We may need to propagate the error from reading the data through first. |
652 | | // For example, the NotFound error will come through `get_part_fut`, and |
653 | | // will not be present in `e`, but we need to ensure we pass NotFound error |
654 | | // code or the client won't know why it failed. |
655 | 1 | let get_part_result = if let Some(result) = state.maybe_get_part_result {
  Branch (655:66): [True: 1, False: 0]
  Branch (655:66): [Folded - Ignored]
656 | 1 | result |
657 | | } else { |
658 | | // This should never be `future::pending()` if maybe_get_part_result is |
659 | | // not set. |
660 | 0 | state.get_part_fut.await |
661 | | }; |
662 | 1 | if let Err(err) = get_part_result {
  Branch (662:44): [True: 1, False: 0]
  Branch (662:44): [Folded - Ignored]
663 | 1 | e = err.merge(e);
664 | 1 | }
665 | 1 | if e.code == Code::NotFound {
  Branch (665:40): [True: 1, False: 0]
  Branch (665:40): [Folded - Ignored]
666 | 1 | // Trim the error code. Not Found is quite common and we don't want to send a large |
667 | 1 | // error (debug) message for something that is common. We resize to just the last |
668 | 1 | // message as it will be the most relevant. |
669 | 1 | e.messages.truncate(1); |
670 | 1 | }
671 | 1 | error!(response = ?e); |
672 | 1 | return Some((Err(e.into()), None)) |
673 | | } |
674 | | } |
675 | | }, |
676 | 9.77k | result3 = &mut state.get_part_fut => { |
677 | 3 | state.maybe_get_part_result = Some(result); |
678 | 3 | // It is non-deterministic on which future will finish in what order. |
679 | 3 | // It is also possible that the `state.rx.consume()` call above may not be able to |
680 | 3 | // respond even though the publishing future is done. |
681 | 3 | // Because of this we set the writing future to pending so it never finishes. |
682 | 3 | // The `state.rx.consume()` future will eventually finish and return either the |
683 | 3 | // data or an error. |
684 | 3 | // An EOF will terminate the `state.rx.consume()` future, but we are also protected |
685 | 3 | // because we are dropping the writing future, it will drop the `tx` channel |
686 | 3 | // which will eventually propagate an error to the `state.rx.consume()` future if |
687 | 3 | // the EOF was not sent due to some other error. |
688 | 3 | state.get_part_fut = Box::pin(pending()); |
689 | 3 | }, |
690 | | } |
691 | | } |
692 | | } |
693 | 9.76k | Some((Ok(response), Some(state))) |
694 | 9.77k | }.instrument(read_stream_span.clone()) |
695 | 9.77k | }))) |
696 | 3 | } |
697 | | |
698 | | // We instrument tracing here as well as below because `stream` has a hash on it |
699 | | // that is extracted from the first stream message. If we only implemented it below |
700 | | // we would not have the hash available to us. |
701 | | #[instrument( |
702 | | ret(level = Level::DEBUG), |
703 | | level = Level::ERROR, |
704 | | skip(self, instance_info), |
705 | | )] |
706 | 11 | async fn inner_write( |
707 | 11 | &self, |
708 | 11 | instance_info: &InstanceInfo, |
709 | 11 | digest: DigestInfo, |
710 | 11 | stream: WriteRequestStreamWrapper<impl Stream<Item = Result<WriteRequest, Status>> + Unpin>, |
711 | 11 | ) -> Result<Response<WriteResponse>, Error> { |
712 | 11 | async fn process_client_stream( |
713 | 11 | mut stream: WriteRequestStreamWrapper< |
714 | 11 | impl Stream<Item = Result<WriteRequest, Status>> + Unpin, |
715 | 11 | >, |
716 | 11 | tx: &mut DropCloserWriteHalf, |
717 | 11 | outer_bytes_received: &Arc<AtomicU64>, |
718 | 11 | expected_size: u64, |
719 | 11 | ) -> Result<(), Error> { |
720 | | loop { |
721 | 21 | let write_request = match stream.next().await {
722 | | // Code path for when client tries to gracefully close the stream. |
723 | | // If this happens it means there's a problem with the data sent, |
724 | | // because we always close the stream from our end before this point |
725 | | // by counting the number of bytes sent from the client. If they send |
726 | | // less than the amount they said they were going to send and then |
727 | | // close the stream, we know there's a problem. |
728 | | None => { |
729 | 0 | return Err(make_input_err!( |
730 | 0 | "Client closed stream before sending all data" |
731 | 0 | )); |
732 | | } |
733 | | // Code path for client stream error. Probably client disconnect. |
734 | 4 | Some(Err(err)) => return Err(err), |
735 | | // Code path for received chunk of data. |
736 | 17 | Some(Ok(write_request)) => write_request, |
737 | | }; |
738 | | |
739 | 17 | if write_request.write_offset < 0 {
  Branch (739:20): [True: 0, False: 17]
  Branch (739:20): [Folded - Ignored]
740 | 0 | return Err(make_input_err!( |
741 | 0 | "Invalid negative write offset in write request: {}", |
742 | 0 | write_request.write_offset |
743 | 0 | )); |
744 | 17 | } |
745 | 17 | let write_offset = write_request.write_offset as u64; |
746 | | |
747 | | // If we get duplicate data because a client didn't know where |
748 | | // it left off from, then we can simply skip it. |
749 | 17 | let data = if write_offset < tx.get_bytes_written() {
  Branch (749:31): [True: 2, False: 15]
  Branch (749:31): [Folded - Ignored]
750 | 2 | if (write_offset + write_request.data.len() as u64) < tx.get_bytes_written() {
  Branch (750:24): [True: 0, False: 2]
  Branch (750:24): [Folded - Ignored]
751 | 0 | if write_request.finish_write {
  Branch (751:28): [True: 0, False: 0]
  Branch (751:28): [Folded - Ignored]
752 | 0 | return Err(make_input_err!( |
753 | 0 | "Resumed stream finished at {} bytes when we already received {} bytes.", |
754 | 0 | write_offset + write_request.data.len() as u64, |
755 | 0 | tx.get_bytes_written() |
756 | 0 | )); |
757 | 0 | } |
758 | 0 | continue; |
759 | 2 | } |
760 | 2 | write_request.data.slice( |
761 | 2 | usize::try_from(tx.get_bytes_written() - write_offset) |
762 | 2 | .unwrap_or(usize::MAX).., |
763 | | ) |
764 | | } else { |
765 | 15 | if write_offset != tx.get_bytes_written() {
  Branch (765:24): [True: 1, False: 14]
  Branch (765:24): [Folded - Ignored]
766 | 1 | return Err(make_input_err!( |
767 | 1 | "Received out of order data. Got {}, expected {}", |
768 | 1 | write_offset, |
769 | 1 | tx.get_bytes_written() |
770 | 1 | )); |
771 | 14 | } |
772 | 14 | write_request.data |
773 | | }; |
774 | | |
775 | | // Do not process EOF or weird stuff will happen. |
776 | 16 | if !data.is_empty() {
  Branch (776:20): [True: 12, False: 4]
  Branch (776:20): [Folded - Ignored]
777 | | // We also need to process the possible EOF branch, so we can't early return.
778 | 12 | if let Err(mut err) = tx.send(data).await {
  Branch (778:28): [True: 0, False: 12]
  Branch (778:28): [Folded - Ignored]
779 | 0 | err.code = Code::Internal; |
780 | 0 | return Err(err); |
781 | 12 | } |
782 | 12 | outer_bytes_received.store(tx.get_bytes_written(), Ordering::Release); |
783 | 4 | } |
784 | | |
785 | 16 | if expected_size < tx.get_bytes_written() {
  Branch (785:20): [True: 0, False: 16]
  Branch (785:20): [Folded - Ignored]
786 | 0 | return Err(make_input_err!("Received more bytes than expected"));
787 | 16 | }
788 | 16 | if write_request.finish_write {
  Branch (788:20): [True: 6, False: 10]
  Branch (788:20): [Folded - Ignored]
789 | | // Gracefully close our stream. |
790 | 6 | tx.send_eof() |
791 | 6 | .err_tip(|| "Failed to send EOF in ByteStream::write")?;
792 | 6 | return Ok(()); |
793 | 10 | } |
794 | | // Continue. |
795 | | } |
796 | | // Unreachable. |
797 | 11 | } |
798 | | |
799 | | let uuid = stream |
800 | | .resource_info |
801 | | .uuid |
802 | | .as_ref() |
803 | | .ok_or_else(|| make_input_err!("UUID must be set if writing data"))?; |
804 | | let mut active_stream_guard = |
805 | | self.create_or_join_upload_stream(uuid, instance_info, digest); |
806 | | let expected_size = stream.resource_info.expected_size as u64; |
807 | | |
808 | | let active_stream = active_stream_guard.stream_state.as_mut().unwrap(); |
809 | | try_join!( |
810 | | process_client_stream( |
811 | | stream, |
812 | | &mut active_stream.tx, |
813 | | &active_stream_guard.bytes_received, |
814 | | expected_size |
815 | | ), |
816 | | (&mut active_stream.store_update_fut) |
817 | 0 | .map_err(|err| { err.append("Error updating inner store") }) |
818 | | )?; |
819 | | |
820 | | // Close our guard and consider the stream no longer active. |
821 | | active_stream_guard.graceful_finish(); |
822 | | |
823 | | Ok(Response::new(WriteResponse { |
824 | | committed_size: expected_size as i64, |
825 | | })) |
826 | 11 | } |
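
The offset handling in process_client_stream reduces to one rule: of a replayed chunk, keep only the suffix that extends past what is already committed. A sketch of that arithmetic (unseen_suffix is a hypothetical helper):

    use bytes::Bytes;

    fn unseen_suffix(data: Bytes, write_offset: u64, committed: u64) -> Bytes {
        if write_offset >= committed {
            return data; // no overlap; the caller still enforces exact ordering
        }
        let skip = usize::try_from(committed - write_offset).unwrap_or(usize::MAX);
        data.slice(skip.min(data.len())..) // may be empty if fully replayed
    }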
827 | | |
828 | | /// Fast-path write that bypasses channel overhead for stores that support direct Bytes updates. |
829 | | /// This buffers all data in memory and calls `update_oneshot` directly. |
830 | 4 | async fn inner_write_oneshot( |
831 | 4 | &self, |
832 | 4 | instance_info: &InstanceInfo, |
833 | 4 | digest: DigestInfo, |
834 | 4 | mut stream: WriteRequestStreamWrapper< |
835 | 4 | impl Stream<Item = Result<WriteRequest, Status>> + Unpin, |
836 | 4 | >, |
837 | 4 | ) -> Result<Response<WriteResponse>, Error> { |
838 | 4 | let expected_size = stream.resource_info.expected_size as u64; |
839 | | |
840 | | // Pre-allocate buffer for expected size (capped at reasonable limit to prevent DoS) |
841 | 4 | let capacity = |
842 | 4 | usize::try_from(expected_size.min(64 * 1024 * 1024)).unwrap_or(64 * 1024 * 1024); |
843 | 4 | let mut buffer = BytesMut::with_capacity(capacity); |
844 | 4 | let mut bytes_received: u64 = 0; |
845 | | |
846 | | // Collect all data from client stream |
847 | | loop { |
848 | 4 | let write_request = match stream.next().await {
849 | | None => { |
850 | 0 | return Err(make_input_err!( |
851 | 0 | "Client closed stream before sending all data" |
852 | 0 | )); |
853 | | } |
854 | 1 | Some(Err(err)) => return Err(err), |
855 | 3 | Some(Ok(write_request)) => write_request, |
856 | | }; |
857 | | |
858 | 3 | if write_request.write_offset < 0 {
  Branch (858:16): [True: 1, False: 2]
  Branch (858:16): [Folded - Ignored]
859 | 1 | return Err(make_input_err!( |
860 | 1 | "Invalid negative write offset in write request: {}", |
861 | 1 | write_request.write_offset |
862 | 1 | )); |
863 | 2 | } |
864 | 2 | let write_offset = write_request.write_offset as u64; |
865 | | |
866 | | // Handle duplicate/resumed data |
867 | 2 | let data = if write_offset < bytes_received {
  Branch (867:27): [True: 0, False: 2]
  Branch (867:27): [Folded - Ignored]
868 | 0 | if (write_offset + write_request.data.len() as u64) < bytes_received {
  Branch (868:20): [True: 0, False: 0]
  Branch (868:20): [Folded - Ignored]
869 | 0 | if write_request.finish_write {
  Branch (869:24): [True: 0, False: 0]
  Branch (869:24): [Folded - Ignored]
870 | 0 | return Err(make_input_err!( |
871 | 0 | "Resumed stream finished at {} bytes when we already received {} bytes.", |
872 | 0 | write_offset + write_request.data.len() as u64, |
873 | 0 | bytes_received |
874 | 0 | )); |
875 | 0 | } |
876 | 0 | continue; |
877 | 0 | } |
878 | 0 | write_request |
879 | 0 | .data |
880 | 0 | .slice(usize::try_from(bytes_received - write_offset).unwrap_or(usize::MAX)..) |
881 | | } else { |
882 | 2 | if write_offset != bytes_received {
  Branch (882:20): [True: 0, False: 2]
  Branch (882:20): [Folded - Ignored]
883 | 0 | return Err(make_input_err!( |
884 | 0 | "Received out of order data. Got {}, expected {}", |
885 | 0 | write_offset, |
886 | 0 | bytes_received |
887 | 0 | )); |
888 | 2 | } |
889 | 2 | write_request.data |
890 | | }; |
891 | | |
892 | 2 | if !data.is_empty() {
  Branch (892:16): [True: 1, False: 1]
  Branch (892:16): [Folded - Ignored]
893 | 1 | buffer.extend_from_slice(&data); |
894 | 1 | bytes_received += data.len() as u64; |
895 | 1 | } |
896 | | |
897 | 2 | if expected_size < bytes_received {
  Branch (897:16): [True: 0, False: 2]
  Branch (897:16): [Folded - Ignored]
898 | 0 | return Err(make_input_err!("Received more bytes than expected"));
899 | 2 | }
900 | |
901 | 2 | if write_request.finish_write {
  Branch (901:16): [True: 2, False: 0]
  Branch (901:16): [Folded - Ignored]
902 | 2 | break; |
903 | 0 | } |
904 | | } |
905 | | |
906 | | // Direct update without channel overhead |
907 | 2 | let store = instance_info.store.clone(); |
908 | 2 | store |
909 | 2 | .update_oneshot(digest, buffer.freeze()) |
910 | 2 | .await |
911 | 2 | .err_tip(|| "Error in update_oneshot")?;
912 | | |
913 | | // Note: bytes_written_total is updated in the caller (bytestream_write) based on result |
914 | | |
915 | 2 | Ok(Response::new(WriteResponse { |
916 | 2 | committed_size: expected_size as i64, |
917 | 2 | })) |
918 | 4 | } |
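
For reference, the request shape that qualifies for this fast path is a single message carrying every byte with finish_write set. A sketch using the WriteRequest type imported above (resource name layout follows the usual "<instance>/uploads/<uuid>/blobs/<hash>/<size>" convention; values illustrative):

    fn single_shot_request() -> WriteRequest {
        WriteRequest {
            resource_name: "main/uploads/<uuid>/blobs/<hash>/5".to_string(),
            write_offset: 0,
            finish_write: true, // the first message completes the upload
            data: bytes::Bytes::from_static(b"hello"),
        }
    }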
919 | | |
920 | 3 | async fn inner_query_write_status( |
921 | 3 | &self, |
922 | 3 | query_request: &QueryWriteStatusRequest, |
923 | 3 | ) -> Result<Response<QueryWriteStatusResponse>, Error> { |
924 | 3 | let mut resource_info = ResourceInfo::new(&query_request.resource_name, true)?;
925 | | |
926 | 3 | let instance = self |
927 | 3 | .instance_infos |
928 | 3 | .get(resource_info.instance_name.as_ref()) |
929 | 3 | .err_tip(|| {
930 | 0 | format!( |
931 | 0 | "'instance_name' not configured for '{}'", |
932 | 0 | &resource_info.instance_name |
933 | | ) |
934 | 0 | })?; |
935 | 3 | let store_clone = instance.store.clone(); |
936 | | |
937 | 3 | let digest = DigestInfo::try_new(resource_info.hash.as_ref(), resource_info.expected_size)?;
938 | | |
939 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
940 | 3 | if let Some(grpc_store) = store_clone.downcast_ref::<GrpcStore>(Some(digest.into())) {
  Branch (940:16): [True: 0, False: 3]
  Branch (940:16): [Folded - Ignored]
941 | 0 | return grpc_store |
942 | 0 | .query_write_status(Request::new(query_request.clone())) |
943 | 0 | .await; |
944 | 3 | } |
945 | | |
946 | 3 | let uuid_str = resource_info |
947 | 3 | .uuid |
948 | 3 | .take() |
949 | 3 | .ok_or_else(|| make_input_err!("UUID must be set if querying write status"))?;
950 | 3 | let uuid_key = parse_uuid_to_key(&uuid_str); |
951 | | |
952 | | { |
953 | 3 | let active_uploads = instance.active_uploads.lock(); |
954 | 3 | if let Some((received_bytes, _maybe_idle_stream)) = active_uploads.get(&uuid_key) {
  Branch (954:20): [True: 1, False: 2]
  Branch (954:20): [Folded - Ignored]
955 | 1 | return Ok(Response::new(QueryWriteStatusResponse { |
956 | 1 | committed_size: received_bytes.load(Ordering::Acquire) as i64, |
957 | 1 | // If we are in the active_uploads map, but the value is None, |
958 | 1 | // it means the stream is not complete. |
959 | 1 | complete: false, |
960 | 1 | })); |
961 | 2 | } |
962 | | } |
963 | | |
964 | 2 | let has_fut = store_clone.has(digest); |
965 | 2 | let Some(item_size) = has_fut.await.err_tip(|| "Failed to call .has() on store")? else {
  Branch (965:13): [True: 1, False: 1]
  Branch (965:13): [Folded - Ignored]
966 | | // We lie here and say that the stream needs to start over, even though |
967 | | // it was never started. This can happen when the client disconnects |
968 | | // before sending the first payload, but the client thinks it did send |
969 | | // the payload. |
970 | 1 | return Ok(Response::new(QueryWriteStatusResponse { |
971 | 1 | committed_size: 0, |
972 | 1 | complete: false, |
973 | 1 | })); |
974 | | }; |
975 | 1 | Ok(Response::new(QueryWriteStatusResponse { |
976 | 1 | committed_size: item_size as i64, |
977 | 1 | complete: true, |
978 | 1 | })) |
979 | 3 | } |
980 | | } |
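
inner_query_write_status produces exactly three reply shapes, sketched here with the response type imported above (sizes illustrative):

    fn reply_shapes() -> [QueryWriteStatusResponse; 3] {
        [
            // Upload found in active_uploads: partial size, not complete.
            QueryWriteStatusResponse { committed_size: 1024, complete: false },
            // Blob already present in the store: full size, complete.
            QueryWriteStatusResponse { committed_size: 4096, complete: true },
            // Unknown UUID and blob absent: reported as restartable from zero.
            QueryWriteStatusResponse { committed_size: 0, complete: false },
        ]
    }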
981 | | |
982 | | #[tonic::async_trait] |
983 | | impl ByteStream for ByteStreamServer { |
984 | | type ReadStream = ReadStream; |
985 | | |
986 | | #[instrument( |
987 | | err, |
988 | | level = Level::ERROR, |
989 | | skip_all, |
990 | | fields(request = ?grpc_request.get_ref()) |
991 | | )] |
992 | | async fn read( |
993 | | &self, |
994 | | grpc_request: Request<ReadRequest>, |
995 | | ) -> Result<Response<Self::ReadStream>, Status> { |
996 | | let start_time = Instant::now(); |
997 | | |
998 | | let read_request = grpc_request.into_inner(); |
999 | | let resource_info = ResourceInfo::new(&read_request.resource_name, false)?; |
1000 | | let instance_name = resource_info.instance_name.as_ref(); |
1001 | | let expected_size = resource_info.expected_size as u64; |
1002 | | let instance = self |
1003 | | .instance_infos |
1004 | | .get(instance_name) |
1005 | 0 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?; |
1006 | | |
1007 | | // Track read request |
1008 | | instance |
1009 | | .metrics |
1010 | | .read_requests_total |
1011 | | .fetch_add(1, Ordering::Relaxed); |
1012 | | |
1013 | | let store = instance.store.clone(); |
1014 | | |
1015 | | let digest = DigestInfo::try_new(resource_info.hash.as_ref(), resource_info.expected_size)?; |
1016 | | |
1017 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
1018 | | if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest.into())) { |
1019 | | let stream = Box::pin(grpc_store.read(Request::new(read_request)).await?); |
1020 | | return Ok(Response::new(stream)); |
1021 | | } |
1022 | | |
1023 | | let digest_function = resource_info.digest_function.as_deref().map_or_else( |
1024 | 3 | || Ok(default_digest_hasher_func()), |
1025 | | DigestHasherFunc::try_from, |
1026 | | )?; |
1027 | | |
1028 | | let resp = self |
1029 | | .inner_read(instance, digest, read_request) |
1030 | | .instrument(error_span!("bytestream_read")) |
1031 | | .with_context( |
1032 | | make_ctx_for_hash_func(digest_function).err_tip(|| "In BytestreamServer::read")?, |
1033 | | ) |
1034 | | .await |
1035 | | .err_tip(|| "In ByteStreamServer::read") |
1036 | 3 | .map(|stream| -> Response<Self::ReadStream> { Response::new(Box::pin(stream)) }); |
1037 | | |
1038 | | // Track metrics based on result |
1039 | | #[allow(clippy::cast_possible_truncation)] |
1040 | | let elapsed_ns = start_time.elapsed().as_nanos() as u64; |
1041 | | instance |
1042 | | .metrics |
1043 | | .read_duration_ns |
1044 | | .fetch_add(elapsed_ns, Ordering::Relaxed); |
1045 | | |
1046 | | match &resp { |
1047 | | Ok(_) => { |
1048 | | instance |
1049 | | .metrics |
1050 | | .read_requests_success |
1051 | | .fetch_add(1, Ordering::Relaxed); |
1052 | | instance |
1053 | | .metrics |
1054 | | .bytes_read_total |
1055 | | .fetch_add(expected_size, Ordering::Relaxed); |
1056 | | debug!(return = "Ok(<stream>)"); |
1057 | | } |
1058 | | Err(_) => { |
1059 | | instance |
1060 | | .metrics |
1061 | | .read_requests_failure |
1062 | | .fetch_add(1, Ordering::Relaxed); |
1063 | | } |
1064 | | } |
1065 | | |
1066 | | resp.map_err(Into::into) |
1067 | | } |
1068 | | |
1069 | | #[instrument( |
1070 | | err, |
1071 | | level = Level::ERROR, |
1072 | | skip_all, |
1073 | | fields(request = ?grpc_request.get_ref()) |
1074 | | )] |
1075 | | async fn write( |
1076 | | &self, |
1077 | | grpc_request: Request<Streaming<WriteRequest>>, |
1078 | | ) -> Result<Response<WriteResponse>, Status> { |
1079 | | let start_time = Instant::now(); |
1080 | | |
1081 | | let request = grpc_request.into_inner(); |
1082 | | let stream = WriteRequestStreamWrapper::from(request) |
1083 | | .await |
1084 | | .err_tip(|| "Could not unwrap first stream message") |
1085 | | .map_err(Into::<Status>::into)?; |
1086 | | |
1087 | | let instance_name = stream.resource_info.instance_name.as_ref(); |
1088 | | let expected_size = stream.resource_info.expected_size as u64; |
1089 | | let instance = self |
1090 | | .instance_infos |
1091 | | .get(instance_name) |
1092 | 0 | .err_tip(|| format!("'instance_name' not configured for '{instance_name}'"))?; |
1093 | | |
1094 | | // Track write request |
1095 | | instance |
1096 | | .metrics |
1097 | | .write_requests_total |
1098 | | .fetch_add(1, Ordering::Relaxed); |
1099 | | |
1100 | | let store = instance.store.clone(); |
1101 | | |
1102 | | let digest = DigestInfo::try_new( |
1103 | | &stream.resource_info.hash, |
1104 | | stream.resource_info.expected_size, |
1105 | | ) |
1106 | | .err_tip(|| "Invalid digest input in ByteStream::write")?; |
1107 | | |
1108 | | // If we are a GrpcStore we shortcut here, as this is a special store. |
1109 | | if let Some(grpc_store) = store.downcast_ref::<GrpcStore>(Some(digest.into())) { |
1110 | | let resp = grpc_store.write(stream).await.map_err(Into::into); |
1111 | | return resp; |
1112 | | } |
1113 | | |
1114 | | let digest_function = stream |
1115 | | .resource_info |
1116 | | .digest_function |
1117 | | .as_deref() |
1118 | | .map_or_else( |
1119 | 15 | || Ok(default_digest_hasher_func()), |
1120 | | DigestHasherFunc::try_from, |
1121 | | )?; |
1122 | | |
1123 | | // Check if store supports direct oneshot updates (bypasses channel overhead). |
1124 | | // Use fast-path only when: |
1125 | | // 1. Store supports oneshot optimization |
1126 | | // 2. UUID is provided |
1127 | | // 3. Size is under 64MB (memory safety) |
1128 | | // 4. This is a NEW upload (UUID not already in active_uploads) |
1129 | | // 5. The first message has finish_write=true (single-shot upload) |
1130 | | // |
1131 | | // The oneshot path cannot be used for multi-message streams because: |
1132 | | // - QueryWriteStatus won't work (no progress tracking) |
1133 | | // - Resumed streams won't work (no partial progress) |
1134 | | let use_oneshot = if store.optimized_for(StoreOptimizations::SubscribesToUpdateOneshot) |
1135 | | && expected_size <= 64 * 1024 * 1024 |
1136 | | && stream.resource_info.uuid.is_some() |
1137 | | { |
1138 | | // Check if first message completes the upload (single-shot) |
1139 | | let is_single_shot = stream.is_first_msg_complete(); |
1140 | | |
1141 | | if is_single_shot { |
1142 | | let uuid_str = stream.resource_info.uuid.as_ref().unwrap(); |
1143 | | let uuid_key = parse_uuid_to_key(uuid_str); |
1144 | | // Only use oneshot if this UUID is not already being tracked |
1145 | | !instance.active_uploads.lock().contains_key(&uuid_key) |
1146 | | } else { |
1147 | | false |
1148 | | } |
1149 | | } else { |
1150 | | false |
1151 | | }; |
1152 | | |
1153 | | let result = if use_oneshot { |
1154 | | self.inner_write_oneshot(instance, digest, stream) |
1155 | | .instrument(error_span!("bytestream_write_oneshot")) |
1156 | | .with_context( |
1157 | | make_ctx_for_hash_func(digest_function) |
1158 | | .err_tip(|| "In BytestreamServer::write")?, |
1159 | | ) |
1160 | | .await |
1161 | | .err_tip(|| "In ByteStreamServer::write (oneshot)") |
1162 | | } else { |
1163 | | self.inner_write(instance, digest, stream) |
1164 | | .instrument(error_span!("bytestream_write")) |
1165 | | .with_context( |
1166 | | make_ctx_for_hash_func(digest_function) |
1167 | | .err_tip(|| "In BytestreamServer::write")?, |
1168 | | ) |
1169 | | .await |
1170 | | .err_tip(|| "In ByteStreamServer::write") |
1171 | | }; |
1172 | | |
1173 | | // Track metrics based on result |
1174 | | #[allow(clippy::cast_possible_truncation)] |
1175 | | let elapsed_ns = start_time.elapsed().as_nanos() as u64; |
1176 | | instance |
1177 | | .metrics |
1178 | | .write_duration_ns |
1179 | | .fetch_add(elapsed_ns, Ordering::Relaxed); |
1180 | | |
1181 | | match &result { |
1182 | | Ok(_) => { |
1183 | | instance |
1184 | | .metrics |
1185 | | .write_requests_success |
1186 | | .fetch_add(1, Ordering::Relaxed); |
1187 | | instance |
1188 | | .metrics |
1189 | | .bytes_written_total |
1190 | | .fetch_add(expected_size, Ordering::Relaxed); |
1191 | | } |
1192 | | Err(_) => { |
1193 | | instance |
1194 | | .metrics |
1195 | | .write_requests_failure |
1196 | | .fetch_add(1, Ordering::Relaxed); |
1197 | | } |
1198 | | } |
1199 | | |
1200 | | result.map_err(Into::into) |
1201 | | } |
1202 | | |
1203 | | #[instrument( |
1204 | | err, |
1205 | | ret(level = Level::INFO), |
1206 | | level = Level::ERROR, |
1207 | | skip_all, |
1208 | | fields(request = ?grpc_request.get_ref()) |
1209 | | )] |
1210 | | async fn query_write_status( |
1211 | | &self, |
1212 | | grpc_request: Request<QueryWriteStatusRequest>, |
1213 | | ) -> Result<Response<QueryWriteStatusResponse>, Status> { |
1214 | | let request = grpc_request.into_inner(); |
1215 | | |
1216 | | // Track query_write_status request - we need to parse the resource name to get the instance |
1217 | | if let Ok(resource_info) = ResourceInfo::new(&request.resource_name, true) { |
1218 | | if let Some(instance) = self |
1219 | | .instance_infos |
1220 | | .get(resource_info.instance_name.as_ref()) |
1221 | | { |
1222 | | instance |
1223 | | .metrics |
1224 | | .query_write_status_total |
1225 | | .fetch_add(1, Ordering::Relaxed); |
1226 | | } |
1227 | | } |
1228 | | |
1229 | | self.inner_query_write_status(&request) |
1230 | | .await |
1231 | | .err_tip(|| "Failed on query_write_status() command") |
1232 | | .map_err(Into::into) |
1233 | | } |
1234 | | } |
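
End to end, the server above is meant to be mounted on a tonic transport. A wiring sketch under assumed inputs (address, variable names, and error mapping are illustrative):

    async fn serve(
        configs: Vec<WithInstanceName<ByteStreamConfig>>,
        store_manager: StoreManager,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let bytestream = ByteStreamServer::new(&configs, &store_manager)
            .map_err(|e| format!("{e:?}"))?;
        tonic::transport::Server::builder()
            .add_service(bytestream.into_service())
            .serve("127.0.0.1:50051".parse()?)
            .await?;
        Ok(())
    }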