Coverage Report

Created: 2025-07-10 19:59

/build/source/nativelink-store/src/mongo_store.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::ops::Bound;
17
use core::pin::Pin;
18
use core::time::Duration;
19
use std::borrow::Cow;
20
use std::sync::{Arc, Weak};
21
22
use async_trait::async_trait;
23
use bytes::Bytes;
24
use futures::stream::{Stream, StreamExt, TryStreamExt};
25
use mongodb::bson::{Bson, Document, doc};
26
use mongodb::options::{ClientOptions, FindOptions, IndexOptions, ReturnDocument, WriteConcern};
27
use mongodb::{Client as MongoClient, Collection, Database, IndexModel};
28
use nativelink_config::stores::ExperimentalMongoSpec;
29
use nativelink_error::{Code, Error, make_err, make_input_err};
30
use nativelink_metric::MetricsComponent;
31
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
32
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
33
use nativelink_util::spawn;
34
use nativelink_util::store_trait::{
35
    BoolValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore,
36
    SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider,
37
    SchedulerSubscription, SchedulerSubscriptionManager, StoreDriver, StoreKey, UploadSizeInfo,
38
};
39
use nativelink_util::task::JoinHandleDropGuard;
40
use parking_lot::{Mutex, RwLock};
41
use patricia_tree::StringPatriciaMap;
42
use tokio::sync::watch;
43
use tokio::time::sleep;
44
use tracing::{error, info, warn};
45
46
use crate::cas_utils::is_zero_digest;
47
48
/// The default database name if not specified.
49
const DEFAULT_DB_NAME: &str = "nativelink";
50
51
/// The default collection name if not specified.
52
const DEFAULT_COLLECTION_NAME: &str = "cas";
53
54
/// The default scheduler collection name if not specified.
55
const DEFAULT_SCHEDULER_COLLECTION_NAME: &str = "scheduler";
56
57
/// The default size of the read chunk when reading data from `MongoDB`.
58
const DEFAULT_READ_CHUNK_SIZE: usize = 64 * 1024;
59
60
/// The default connection timeout in milliseconds if not specified.
61
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
62
63
/// The default command timeout in milliseconds if not specified.
64
const DEFAULT_COMMAND_TIMEOUT_MS: u64 = 10_000;
65
66
/// The default maximum number of concurrent uploads.
67
const DEFAULT_MAX_CONCURRENT_UPLOADS: usize = 10;
68
69
/// The name of the field in `MongoDB` documents that stores the key.
70
const KEY_FIELD: &str = "_id";
71
72
/// The name of the field in `MongoDB` documents that stores the data.
73
const DATA_FIELD: &str = "data";
74
75
/// The name of the field in `MongoDB` documents that stores the version.
76
const VERSION_FIELD: &str = "version";
77
78
/// The name of the field in `MongoDB` documents that stores the size.
79
const SIZE_FIELD: &str = "size";
80
81
/// A [`StoreDriver`] implementation that uses `MongoDB` as a backing store.
82
#[derive(Debug, MetricsComponent)]
83
pub struct ExperimentalMongoStore {
84
    /// The `MongoDB` client.
85
    #[allow(dead_code)]
86
    client: MongoClient,
87
88
    /// The database to use.
89
    database: Database,
90
91
    /// The collection for CAS data.
92
    cas_collection: Collection<Document>,
93
94
    /// The collection for scheduler data.
95
    scheduler_collection: Collection<Document>,
96
97
    /// A common prefix to append to all keys before they are sent to `MongoDB`.
98
    #[metric(help = "Prefix to append to all keys before sending to MongoDB")]
99
    key_prefix: String,
100
101
    /// The amount of data to read from `MongoDB` at a time.
102
    #[metric(help = "The amount of data to read from MongoDB at a time")]
103
    read_chunk_size: usize,
104
105
    /// The maximum number of concurrent uploads.
106
    #[metric(help = "The maximum number of concurrent uploads")]
107
    max_concurrent_uploads: usize,
108
109
    /// Enable change streams for real-time updates.
110
    #[metric(help = "Whether change streams are enabled")]
111
    enable_change_streams: bool,
112
113
    /// A manager for subscriptions to keys in `MongoDB`.
114
    subscription_manager: Mutex<Option<Arc<ExperimentalMongoSubscriptionManager>>>,
115
}
116
117
impl ExperimentalMongoStore {
118
    /// Create a new `ExperimentalMongoStore` from the given configuration.
119
3
    pub async fn new(mut spec: ExperimentalMongoSpec) -> Result<Arc<Self>, Error> {
120
        // Set defaults
121
3
        if spec.connection_string.is_empty() {
  Branch (121:12): [True: 0, False: 0]
  Branch (121:12): [True: 1, False: 2]
  Branch (121:12): [Folded - Ignored]
122
1
            return Err(make_err!(
123
1
                Code::InvalidArgument,
124
1
                "No connection string was specified in mongo store configuration."
125
1
            ));
126
2
        }
127
128
2
        if spec.database.is_empty() {
  Branch (128:12): [True: 0, False: 0]
  Branch (128:12): [True: 0, False: 2]
  Branch (128:12): [Folded - Ignored]
129
0
            spec.database = DEFAULT_DB_NAME.to_string();
130
2
        }
131
132
2
        if spec.cas_collection.is_empty() {
  Branch (132:12): [True: 0, False: 0]
  Branch (132:12): [True: 0, False: 2]
  Branch (132:12): [Folded - Ignored]
133
0
            spec.cas_collection = DEFAULT_COLLECTION_NAME.to_string();
134
2
        }
135
136
2
        if spec.scheduler_collection.is_empty() {
  Branch (136:12): [True: 0, False: 0]
  Branch (136:12): [True: 0, False: 2]
  Branch (136:12): [Folded - Ignored]
137
0
            spec.scheduler_collection = DEFAULT_SCHEDULER_COLLECTION_NAME.to_string();
138
2
        }
139
140
2
        if spec.read_chunk_size == 0 {
  Branch (140:12): [True: 0, False: 0]
  Branch (140:12): [True: 0, False: 2]
  Branch (140:12): [Folded - Ignored]
141
0
            spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
142
2
        }
143
144
2
        if spec.connection_timeout_ms == 0 {
  Branch (144:12): [True: 0, False: 0]
  Branch (144:12): [True: 0, False: 2]
  Branch (144:12): [Folded - Ignored]
145
0
            spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS;
146
2
        }
147
148
2
        if spec.command_timeout_ms == 0 {
  Branch (148:12): [True: 0, False: 0]
  Branch (148:12): [True: 0, False: 2]
  Branch (148:12): [Folded - Ignored]
149
0
            spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS;
150
2
        }
151
152
2
        if spec.max_concurrent_uploads == 0 {
  Branch (152:12): [True: 0, False: 0]
  Branch (152:12): [True: 0, False: 2]
  Branch (152:12): [Folded - Ignored]
153
0
            spec.max_concurrent_uploads = DEFAULT_MAX_CONCURRENT_UPLOADS;
154
2
        }
155
156
        // Configure client options
157
2
        let mut client_options = ClientOptions::parse(&spec.connection_string)
158
2
            .await
159
2
            .map_err(|e| {
160
1
                make_err!(
161
1
                    Code::InvalidArgument,
162
                    "Failed to parse MongoDB connection string: {e}"
163
                )
164
1
            })?;
165
166
1
        client_options.server_selection_timeout =
167
1
            Some(Duration::from_millis(spec.connection_timeout_ms));
168
1
        client_options.connect_timeout = Some(Duration::from_millis(spec.connection_timeout_ms));
169
170
        // Set write concern if specified
171
1
        if let Some(w) = spec.write_concern_w {
  Branch (171:16): [True: 0, False: 0]
  Branch (171:16): [True: 0, False: 1]
  Branch (171:16): [Folded - Ignored]
172
            // Parse write concern w - can be a number or string like "majority"
173
0
            let w_value = if let Ok(num) = w.parse::<u32>() {
  Branch (173:34): [True: 0, False: 0]
  Branch (173:34): [True: 0, False: 0]
  Branch (173:34): [Folded - Ignored]
174
0
                Some(num.into())
175
            } else {
176
0
                Some(w.into())
177
            };
178
179
            // Build write concern based on which options are set
180
0
            let write_concern = match (w_value, spec.write_concern_j, spec.write_concern_timeout_ms)
181
            {
182
0
                (Some(w), Some(j), Some(timeout)) => WriteConcern::builder()
183
0
                    .w(Some(w))
184
0
                    .journal(j)
185
0
                    .w_timeout(Some(Duration::from_millis(u64::from(timeout))))
186
0
                    .build(),
187
0
                (Some(w), Some(j), None) => WriteConcern::builder().w(Some(w)).journal(j).build(),
188
0
                (Some(w), None, Some(timeout)) => WriteConcern::builder()
189
0
                    .w(Some(w))
190
0
                    .w_timeout(Some(Duration::from_millis(u64::from(timeout))))
191
0
                    .build(),
192
0
                (Some(w), None, None) => WriteConcern::builder().w(Some(w)).build(),
193
0
                _ => unreachable!(), // We know w is Some because we're in the if let Some(w) block
194
            };
195
196
0
            client_options.write_concern = Some(write_concern);
197
1
        } else if spec.write_concern_j.is_some() || spec.write_concern_timeout_ms.is_some() {
  Branch (197:19): [True: 0, False: 0]
  Branch (197:53): [True: 0, False: 0]
  Branch (197:19): [True: 1, False: 0]
  Branch (197:53): [True: 0, False: 0]
  Branch (197:19): [Folded - Ignored]
  Branch (197:53): [Folded - Ignored]
198
1
            return Err(make_err!(
199
1
                Code::InvalidArgument,
200
1
                "write_concern_w not set, but j and/or timeout set. Please set 'write_concern_w' to a non-default value. See https://www.mongodb.com/docs/manual/reference/write-concern/#w-option for options."
201
1
            ));
202
0
        }
203
204
        // Create client
205
0
        let client = MongoClient::with_options(client_options).map_err(|e| {
206
0
            make_err!(
207
0
                Code::InvalidArgument,
208
                "Failed to create MongoDB client: {e}"
209
            )
210
0
        })?;
211
212
        // Get database and collections
213
0
        let database = client.database(&spec.database);
214
0
        let cas_collection = database.collection::<Document>(&spec.cas_collection);
215
0
        let scheduler_collection = database.collection::<Document>(&spec.scheduler_collection);
216
217
        // Create indexes
218
0
        Self::create_indexes(&cas_collection, &scheduler_collection).await?;
219
220
0
        let store = Self {
221
0
            client,
222
0
            database,
223
0
            cas_collection,
224
0
            scheduler_collection,
225
0
            key_prefix: spec.key_prefix.clone().unwrap_or_default(),
226
0
            read_chunk_size: spec.read_chunk_size,
227
0
            max_concurrent_uploads: spec.max_concurrent_uploads,
228
0
            enable_change_streams: spec.enable_change_streams,
229
0
            subscription_manager: Mutex::new(None),
230
0
        };
231
232
0
        Ok(Arc::new(store))
233
3
    }
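
A usage sketch (not part of the covered source): construction needs only a spec with a connection string, since empty fields fall back to the defaults above. Assumes `ExperimentalMongoSpec` implements `Default` and a MongoDB instance is reachable at the placeholder URI.

    use nativelink_config::stores::ExperimentalMongoSpec;

    async fn build_store() -> Result<(), nativelink_error::Error> {
        // Assumption: ExperimentalMongoSpec implements Default; the URI is a
        // placeholder. Field names match those read in new().
        let spec = ExperimentalMongoSpec {
            connection_string: "mongodb://localhost:27017".to_string(),
            ..Default::default()
        };
        // Empty database/collection names are replaced with the DEFAULT_*
        // constants before the client is created.
        let _store = ExperimentalMongoStore::new(spec).await?;
        Ok(())
    }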
234
235
    /// Create necessary indexes for efficient operations.
236
0
    async fn create_indexes(
237
0
        cas_collection: &Collection<Document>,
238
0
        _scheduler_collection: &Collection<Document>,
239
0
    ) -> Result<(), Error> {
240
        // CAS collection indexes
241
0
        cas_collection
242
0
            .create_index(IndexModel::builder().keys(doc! { SIZE_FIELD: 1 }).build())
243
0
            .await
244
0
            .map_err(|e| {
245
0
                make_err!(
246
0
                    Code::Internal,
247
                    "Failed to create size index on CAS collection: {e}"
248
                )
249
0
            })?;
250
251
        // Scheduler collection will have dynamic indexes created as needed
252
253
0
        Ok(())
254
0
    }
255
256
    /// Encode a [`StoreKey`] so it can be sent to `MongoDB`.
257
0
    fn encode_key<'a>(&self, key: &'a StoreKey<'a>) -> Cow<'a, str> {
258
0
        let key_body = key.as_str();
259
0
        if self.key_prefix.is_empty() {
  Branch (259:12): [True: 0, False: 0]
  Branch (259:12): [Folded - Ignored]
260
0
            key_body
261
        } else {
262
0
            match key_body {
263
0
                Cow::Owned(mut encoded_key) => {
264
0
                    encoded_key.insert_str(0, &self.key_prefix);
265
0
                    Cow::Owned(encoded_key)
266
                }
267
0
                Cow::Borrowed(body) => {
268
0
                    let mut encoded_key = String::with_capacity(self.key_prefix.len() + body.len());
269
0
                    encoded_key.push_str(&self.key_prefix);
270
0
                    encoded_key.push_str(body);
271
0
                    Cow::Owned(encoded_key)
272
                }
273
            }
274
        }
275
0
    }
276
277
    /// Decode a key from `MongoDB` by removing the prefix.
278
0
    fn decode_key(&self, key: &str) -> Option<String> {
279
0
        if self.key_prefix.is_empty() {
  Branch (279:12): [True: 0, False: 0]
  Branch (279:12): [Folded - Ignored]
280
0
            Some(key.to_string())
281
        } else {
282
0
            key.strip_prefix(&self.key_prefix).map(ToString::to_string)
283
        }
284
0
    }
285
}
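
A standalone sketch of the prefix round-trip that `encode_key`/`decode_key` implement (plain free functions for illustration; the store versions additionally avoid allocating when no prefix is configured):

    fn encode(prefix: &str, key: &str) -> String {
        let mut encoded = String::with_capacity(prefix.len() + key.len());
        encoded.push_str(prefix);
        encoded.push_str(key);
        encoded
    }

    fn decode(prefix: &str, stored: &str) -> Option<String> {
        if prefix.is_empty() {
            return Some(stored.to_string());
        }
        // Keys written under a different (or no) prefix decode to None.
        stored.strip_prefix(prefix).map(str::to_string)
    }

For example, encode("nl:", "abc") yields "nl:abc" and decode("nl:", "other") yields None, which is how foreign keys get filtered out of list().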
286
287
#[async_trait]
288
impl StoreDriver for ExperimentalMongoStore {
289
    async fn has_with_results(
290
        self: Pin<&Self>,
291
        keys: &[StoreKey<'_>],
292
        results: &mut [Option<u64>],
293
0
    ) -> Result<(), Error> {
294
0
        for (key, result) in keys.iter().zip(results.iter_mut()) {
295
            // Handle zero digest specially
296
0
            if is_zero_digest(key.borrow()) {
  Branch (296:16): [True: 0, False: 0]
  Branch (296:16): [Folded - Ignored]
297
0
                *result = Some(0);
298
0
                continue;
299
0
            }
300
301
0
            let encoded_key = self.encode_key(key);
302
0
            let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
303
304
0
            match self.cas_collection.find_one(filter).await {
305
0
                Ok(Some(doc)) => {
306
0
                    *result = doc.get_i64(SIZE_FIELD).ok().map(|v| v as u64);
307
                }
308
0
                Ok(None) => {
309
0
                    *result = None;
310
0
                }
311
0
                Err(e) => {
312
0
                    return Err(make_err!(
313
0
                        Code::Internal,
314
0
                        "MongoDB error in has_with_results: {e}"
315
0
                    ));
316
                }
317
            }
318
        }
319
0
        Ok(())
320
0
    }
321
322
    async fn list(
323
        self: Pin<&Self>,
324
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
325
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
326
0
    ) -> Result<u64, Error> {
327
0
        let mut filter = Document::new();
328
329
        // Build range query
330
0
        let mut key_filter = Document::new();
331
0
        match &range.0 {
332
0
            Bound::Included(start) => {
333
0
                let encoded = self.encode_key(start);
334
0
                key_filter.insert("$gte", encoded.as_ref());
335
0
            }
336
0
            Bound::Excluded(start) => {
337
0
                let encoded = self.encode_key(start);
338
0
                key_filter.insert("$gt", encoded.as_ref());
339
0
            }
340
0
            Bound::Unbounded => {}
341
        }
342
343
0
        match &range.1 {
344
0
            Bound::Included(end) => {
345
0
                let encoded = self.encode_key(end);
346
0
                key_filter.insert("$lte", encoded.as_ref());
347
0
            }
348
0
            Bound::Excluded(end) => {
349
0
                let encoded = self.encode_key(end);
350
0
                key_filter.insert("$lt", encoded.as_ref());
351
0
            }
352
0
            Bound::Unbounded => {}
353
        }
354
355
0
        if !key_filter.is_empty() {
  Branch (355:12): [True: 0, False: 0]
  Branch (355:12): [Folded - Ignored]
356
0
            filter.insert(KEY_FIELD, key_filter);
357
0
        }
358
359
        // Add prefix filter if needed
360
0
        if !self.key_prefix.is_empty() {
  Branch (360:12): [True: 0, False: 0]
  Branch (360:12): [Folded - Ignored]
361
0
            let regex_filter = doc! {
362
                KEY_FIELD: {
363
0
                    "$regex": format!("^{}", regex::escape(&self.key_prefix)),
364
                }
365
            };
366
0
            if filter.is_empty() {
  Branch (366:16): [True: 0, False: 0]
  Branch (366:16): [Folded - Ignored]
367
0
                filter = regex_filter;
368
0
            } else {
369
0
                filter = doc! { "$and": [filter, regex_filter] };
370
0
            }
371
0
        }
372
373
0
        let mut cursor = self
374
0
            .cas_collection
375
0
            .find(filter)
376
0
            .projection(doc! { KEY_FIELD: 1 })
377
0
            .await
378
0
            .map_err(|e| make_err!(Code::Internal, "Failed to create cursor in list: {e}"))?;
379
380
0
        let mut count = 0u64;
381
0
        while let Some(doc) = cursor
  Branch (381:19): [True: 0, False: 0]
  Branch (381:19): [Folded - Ignored]
382
0
            .try_next()
383
0
            .await
384
0
            .map_err(|e| make_err!(Code::Internal, "Failed to get next document in list: {e}"))?
385
        {
386
0
            if let Ok(key) = doc.get_str(KEY_FIELD) {
  Branch (386:20): [True: 0, False: 0]
  Branch (386:20): [Folded - Ignored]
387
0
                if let Some(decoded_key) = self.decode_key(key) {
  Branch (387:24): [True: 0, False: 0]
  Branch (387:24): [Folded - Ignored]
388
0
                    let store_key = StoreKey::new_str(&decoded_key);
389
0
                    count += 1;
390
0
                    if !handler(&store_key) {
  Branch (390:24): [True: 0, False: 0]
  Branch (390:24): [Folded - Ignored]
391
0
                        break;
392
0
                    }
393
0
                }
394
0
            }
395
        }
396
397
0
        Ok(count)
398
0
    }
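
The Bound-to-operator translation above is mechanical; isolated as a sketch (assuming plain &str keys that are already encoded):

    use core::ops::Bound;
    use mongodb::bson::Document;

    fn bound_filter(lower: Bound<&str>, upper: Bound<&str>) -> Document {
        let mut key_filter = Document::new();
        match lower {
            Bound::Included(k) => { key_filter.insert("$gte", k); }
            Bound::Excluded(k) => { key_filter.insert("$gt", k); }
            Bound::Unbounded => {}
        }
        match upper {
            Bound::Included(k) => { key_filter.insert("$lte", k); }
            Bound::Excluded(k) => { key_filter.insert("$lt", k); }
            Bound::Unbounded => {}
        }
        key_filter // empty when both bounds are Unbounded
    }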
399
400
    async fn update(
401
        self: Pin<&Self>,
402
        key: StoreKey<'_>,
403
        mut reader: DropCloserReadHalf,
404
        upload_size: UploadSizeInfo,
405
0
    ) -> Result<(), Error> {
406
0
        let encoded_key = self.encode_key(&key);
407
408
        // Handle zero digest
409
0
        if is_zero_digest(key.borrow()) {
  Branch (409:12): [True: 0, False: 0]
  Branch (409:12): [Folded - Ignored]
410
0
            let chunk = reader.peek().await.map_err(|e| {
411
0
                make_err!(
412
0
                    Code::Internal,
413
                    "Failed to peek in ExperimentalMongoStore::update: {e}"
414
                )
415
0
            })?;
416
0
            if chunk.is_empty() {
  Branch (416:16): [True: 0, False: 0]
  Branch (416:16): [Folded - Ignored]
417
0
                reader.drain().await.map_err(|e| {
418
0
                    make_err!(
419
0
                        Code::Internal,
420
                        "Failed to drain in ExperimentalMongoStore::update: {e}"
421
                    )
422
0
                })?;
423
0
                return Ok(());
424
0
            }
425
0
        }
426
427
        // Special handling for MaxSize(0) - this should error if stream is closed
428
0
        if upload_size == UploadSizeInfo::MaxSize(0) {
  Branch (428:12): [True: 0, False: 0]
  Branch (428:12): [Folded - Ignored]
429
            // Try to read from the stream - if it's closed immediately, this should error
430
0
            match reader.recv().await {
431
0
                Ok(_chunk) => {
432
0
                    return Err(make_input_err!(
433
0
                        "Received data when MaxSize is 0 in MongoDB store"
434
0
                    ));
435
                }
436
0
                Err(e) => {
437
0
                    return Err(make_err!(
438
0
                        Code::InvalidArgument,
439
0
                        "Stream closed for MaxSize(0) upload: {e}"
440
0
                    ));
441
                }
442
            }
443
0
        }
444
445
        // Read all data into memory with proper EOF handling
446
0
        let mut data = Vec::new();
447
0
        while let Ok(chunk) = reader.recv().await {
  Branch (447:19): [True: 0, False: 0]
  Branch (447:19): [Folded - Ignored]
448
0
            if chunk.is_empty() {
  Branch (448:16): [True: 0, False: 0]
  Branch (448:16): [Folded - Ignored]
449
0
                break; // Empty chunk signals EOF
450
0
            }
451
0
            data.extend_from_slice(&chunk);
452
        }
453
454
0
        let size = data.len() as i64;
455
456
        // Create document
457
0
        let doc = doc! {
458
0
            KEY_FIELD: encoded_key.as_ref(),
459
0
            DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
460
0
                subtype: mongodb::bson::spec::BinarySubtype::Generic,
461
0
                bytes: data,
462
0
            }),
463
0
            SIZE_FIELD: size,
464
        };
465
466
        // Upsert the document
467
0
        self.cas_collection
468
0
            .update_one(
469
0
                doc! { KEY_FIELD: encoded_key.as_ref() },
470
0
                doc! { "$set": doc },
471
0
            )
472
0
            .upsert(true)
473
0
            .await
474
0
            .map_err(|e| make_err!(Code::Internal, "Failed to update document in MongoDB: {e}"))?;
475
476
0
        Ok(())
477
0
    }
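
For reference, a sketch of the document shape that update() upserts, mirroring the doc! built above (the literal field names correspond to the KEY_FIELD, DATA_FIELD, and SIZE_FIELD constants):

    use mongodb::bson::{doc, spec::BinarySubtype, Binary, Bson, Document};

    fn cas_document(encoded_key: &str, data: Vec<u8>) -> Document {
        let size = data.len() as i64; // computed before `data` is moved
        doc! {
            "_id": encoded_key,
            "data": Bson::Binary(Binary {
                subtype: BinarySubtype::Generic,
                bytes: data,
            }),
            "size": size,
        }
    }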
478
479
    async fn get_part(
480
        self: Pin<&Self>,
481
        key: StoreKey<'_>,
482
        writer: &mut DropCloserWriteHalf,
483
        offset: u64,
484
        length: Option<u64>,
485
0
    ) -> Result<(), Error> {
486
        // Handle zero digest
487
0
        if is_zero_digest(key.borrow()) {
  Branch (487:12): [True: 0, False: 0]
  Branch (487:12): [Folded - Ignored]
488
0
            return writer.send_eof().map_err(|e| {
489
0
                make_err!(
490
0
                    Code::Internal,
491
                    "Failed to send zero EOF in mongo store get_part: {e}"
492
                )
493
0
            });
494
0
        }
495
496
0
        let encoded_key = self.encode_key(&key);
497
0
        let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
498
499
0
        let doc = self
500
0
            .cas_collection
501
0
            .find_one(filter)
502
0
            .await
503
0
            .map_err(|e| make_err!(Code::Internal, "Failed to find document in get_part: {e}"))?
504
0
            .ok_or_else(|| {
505
0
                make_err!(
506
0
                    Code::NotFound,
507
                    "Data not found in MongoDB store for digest: {key:?}"
508
                )
509
0
            })?;
510
511
0
        let data = match doc.get(DATA_FIELD) {
512
0
            Some(Bson::Binary(binary)) => &binary.bytes,
513
            _ => {
514
0
                return Err(make_err!(
515
0
                    Code::Internal,
516
0
                    "Invalid data field in MongoDB document"
517
0
                ));
518
            }
519
        };
520
521
0
        let offset = offset as usize;
522
0
        let data_len = data.len();
523
524
0
        if offset > data_len {
  Branch (524:12): [True: 0, False: 0]
  Branch (524:12): [Folded - Ignored]
525
0
            return writer.send_eof().map_err(|e| {
526
0
                make_err!(
527
0
                    Code::Internal,
528
                    "Failed to send EOF in mongo store get_part: {e}"
529
                )
530
0
            });
531
0
        }
532
533
0
        let end = if let Some(len) = length {
  Branch (533:26): [True: 0, False: 0]
  Branch (533:26): [Folded - Ignored]
534
0
            cmp::min(offset + len as usize, data_len)
535
        } else {
536
0
            data_len
537
        };
538
539
0
        if offset < end {
  Branch (539:12): [True: 0, False: 0]
  Branch (539:12): [Folded - Ignored]
540
0
            let chunk = &data[offset..end];
541
542
            // Send data in chunks
543
0
            for chunk_data in chunk.chunks(self.read_chunk_size) {
544
0
                writer.send(chunk_data.to_vec().into()).await.map_err(|e| {
545
0
                    make_err!(
546
0
                        Code::Internal,
547
                        "Failed to write data in ExperimentalMongoStore::get_part: {e}"
548
                    )
549
0
                })?;
550
            }
551
0
        }
552
553
0
        writer.send_eof().map_err(|e| {
554
0
            make_err!(
555
0
                Code::Internal,
556
                "Failed to write EOF in mongo store get_part: {e}"
557
            )
558
0
        })
559
0
    }
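
The offset/length arithmetic in get_part() reduces to a small clamp; sketched as a hypothetical helper:

    use core::cmp;

    /// Returns the byte window to stream, or None when the offset lies past
    /// the end of the blob (get_part() then only sends EOF).
    fn window(data_len: usize, offset: usize, length: Option<usize>) -> Option<(usize, usize)> {
        if offset > data_len {
            return None;
        }
        let end = length.map_or(data_len, |len| cmp::min(offset + len, data_len));
        Some((offset, end))
    }

For example, window(10, 2, Some(4)) is Some((2, 6)) and window(10, 12, None) is None; the chosen slice is then written out in read_chunk_size pieces.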
560
561
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
562
0
        self
563
0
    }
564
565
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send) {
566
0
        self
567
0
    }
568
569
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send> {
570
0
        self
571
0
    }
572
573
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
574
0
        registry.register_indicator(self);
575
0
    }
576
}
577
578
#[async_trait]
579
impl HealthStatusIndicator for ExperimentalMongoStore {
580
0
    fn get_name(&self) -> &'static str {
581
0
        "ExperimentalMongoStore"
582
0
    }
583
584
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
585
0
        match self.database.run_command(doc! { "ping": 1 }).await {
586
0
            Ok(_) => HealthStatus::new_ok(self, "Connection healthy".into()),
587
0
            Err(e) => HealthStatus::new_failed(
588
0
                self,
589
0
                format!("{namespace} - MongoDB connection error: {e}").into(),
590
            ),
591
        }
592
0
    }
593
}
594
595
// -------------------------------------------------------------------
596
//
597
// `ExperimentalMongoDB` scheduler implementation. Likely to change.
598
// -------------------------------------------------------------------
599
600
/// An individual subscription to a key in `ExperimentalMongoDB`.
601
#[derive(Debug)]
602
pub struct ExperimentalMongoSubscription {
603
    receiver: Option<watch::Receiver<String>>,
604
    weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>,
605
}
606
607
impl SchedulerSubscription for ExperimentalMongoSubscription {
608
0
    async fn changed(&mut self) -> Result<(), Error> {
609
0
        let receiver = self.receiver.as_mut().ok_or_else(|| {
610
0
            make_err!(
611
0
                Code::Internal,
612
                "In ExperimentalMongoSubscription::changed::as_mut"
613
            )
614
0
        })?;
615
0
        receiver.changed().await.map_err(|_| {
616
0
            make_err!(
617
0
                Code::Internal,
618
                "In ExperimentalMongoSubscription::changed::changed"
619
            )
620
0
        })
621
0
    }
622
}
623
624
impl Drop for ExperimentalMongoSubscription {
625
0
    fn drop(&mut self) {
626
0
        let Some(receiver) = self.receiver.take() else {
  Branch (626:13): [True: 0, False: 0]
  Branch (626:13): [Folded - Ignored]
627
0
            warn!("ExperimentalMongoSubscription has already been dropped, nothing to do.");
628
0
            return;
629
        };
630
0
        let key = receiver.borrow().clone();
631
0
        drop(receiver);
632
633
0
        let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else {
  Branch (633:13): [True: 0, False: 0]
  Branch (633:13): [Folded - Ignored]
634
0
            return;
635
        };
636
0
        let mut subscribed_keys = subscribed_keys.write();
637
0
        let Some(value) = subscribed_keys.get(&key) else {
  Branch (637:13): [True: 0, False: 0]
  Branch (637:13): [Folded - Ignored]
638
0
            error!(
639
0
                "Key {key} was not found in subscribed keys when checking if it should be removed."
640
            );
641
0
            return;
642
        };
643
644
0
        if value.receiver_count() == 0 {
  Branch (644:12): [True: 0, False: 0]
  Branch (644:12): [Folded - Ignored]
645
0
            subscribed_keys.remove(key);
646
0
        }
647
0
    }
648
}
649
650
/// A publisher for a key in `ExperimentalMongoDB`.
651
#[derive(Debug)]
652
struct ExperimentalMongoSubscriptionPublisher {
653
    sender: Mutex<watch::Sender<String>>,
654
}
655
656
impl ExperimentalMongoSubscriptionPublisher {
657
0
    fn new(
658
0
        key: String,
659
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
660
0
    ) -> (Self, ExperimentalMongoSubscription) {
661
0
        let (sender, receiver) = watch::channel(key);
662
0
        let publisher = Self {
663
0
            sender: Mutex::new(sender),
664
0
        };
665
0
        let subscription = ExperimentalMongoSubscription {
666
0
            receiver: Some(receiver),
667
0
            weak_subscribed_keys,
668
0
        };
669
0
        (publisher, subscription)
670
0
    }
671
672
0
    fn subscribe(
673
0
        &self,
674
0
        weak_subscribed_keys: Weak<RwLock<StringPatriciaMap<Self>>>,
675
0
    ) -> ExperimentalMongoSubscription {
676
0
        let receiver = self.sender.lock().subscribe();
677
0
        ExperimentalMongoSubscription {
678
0
            receiver: Some(receiver),
679
0
            weak_subscribed_keys,
680
0
        }
681
0
    }
682
683
0
    fn receiver_count(&self) -> usize {
684
0
        self.sender.lock().receiver_count()
685
0
    }
686
687
0
    fn notify(&self) {
688
0
        self.sender.lock().send_modify(|_| {});
689
0
    }
690
}
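
The publisher is a thin wrapper over tokio's watch channel; a minimal standalone sketch of the notification pattern, where send_modify with a no-op closure wakes every receiver without replacing the value:

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        let (sender, mut receiver) = watch::channel(String::from("some-key"));
        let waiter = tokio::spawn(async move {
            // Resolves once notify() runs, as in SchedulerSubscription::changed().
            receiver.changed().await.expect("sender still alive");
        });
        sender.send_modify(|_| {}); // equivalent to notify() above
        waiter.await.unwrap();
    }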
691
692
#[derive(Debug)]
693
pub struct ExperimentalMongoSubscriptionManager {
694
    subscribed_keys: Arc<RwLock<StringPatriciaMap<ExperimentalMongoSubscriptionPublisher>>>,
695
    _subscription_spawn: JoinHandleDropGuard<()>,
696
}
697
698
impl ExperimentalMongoSubscriptionManager {
699
0
    pub fn new(database: Database, collection_name: String, key_prefix: String) -> Self {
700
0
        let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
701
0
        let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
702
703
        Self {
704
0
            subscribed_keys,
705
0
            _subscription_spawn: spawn!("mongo_subscribe_spawn", async move {
706
0
                let collection = database.collection::<Document>(&collection_name);
707
708
                loop {
709
                    // Try to create change stream
710
0
                    let change_stream_result = collection.watch().await;
711
712
0
                    match change_stream_result {
713
0
                        Ok(mut change_stream) => {
714
0
                            info!("MongoDB change stream connected");
715
716
0
                            while let Some(event_result) = change_stream.next().await {
  Branch (716:39): [True: 0, False: 0]
  Branch (716:39): [Folded - Ignored]
717
0
                                match event_result {
718
0
                                    Ok(event) => {
719
                                        use mongodb::change_stream::event::OperationType;
720
0
                                        match event.operation_type {
721
                                            OperationType::Insert
722
                                            | OperationType::Update
723
                                            | OperationType::Replace
724
                                            | OperationType::Delete => {
725
0
                                                if let Some(doc_key) = event.document_key {
  Branch (725:56): [True: 0, False: 0]
  Branch (725:56): [Folded - Ignored]
726
0
                                                    if let Ok(key) = doc_key.get_str(KEY_FIELD) {
  Branch (726:60): [True: 0, False: 0]
  Branch (726:60): [Folded - Ignored]
727
                                                        // Remove prefix if present
728
0
                                                        let key = if key_prefix.is_empty() {
  Branch (728:70): [True: 0, False: 0]
  Branch (728:70): [Folded - Ignored]
729
0
                                                            key
730
                                                        } else {
731
0
                                                            key.strip_prefix(&key_prefix)
732
0
                                                                .unwrap_or(key)
733
                                                        };
734
735
0
                                                        let Some(subscribed_keys) =
  Branch (735:61): [True: 0, False: 0]
  Branch (735:61): [Folded - Ignored]
736
0
                                                            subscribed_keys_weak.upgrade()
737
                                                        else {
738
0
                                                            warn!(
739
0
                                                                "Parent dropped, exiting ExperimentalMongoSubscriptionManager"
740
                                                            );
741
0
                                                            return;
742
                                                        };
743
744
0
                                                        let subscribed_keys_mux =
745
0
                                                            subscribed_keys.read();
746
0
                                                        subscribed_keys_mux
747
0
                                                                .common_prefix_values(key)
748
0
                                                                .for_each(ExperimentalMongoSubscriptionPublisher::notify);
749
0
                                                    }
750
0
                                                }
751
                                            }
752
0
                                            _ => {}
753
                                        }
754
                                    }
755
0
                                    Err(e) => {
756
0
                                        error!("Error in change stream: {e}");
757
0
                                        break;
758
                                    }
759
                                }
760
                            }
761
                        }
762
0
                        Err(e) => {
763
0
                            warn!("Failed to create change stream: {e}. Will retry in 5 seconds.");
764
                        }
765
                    }
766
767
                    // Check if parent is still alive
768
0
                    if subscribed_keys_weak.upgrade().is_none() {
  Branch (768:24): [True: 0, False: 0]
  Branch (768:24): [Folded - Ignored]
769
0
                        warn!("Parent dropped, exiting ExperimentalMongoSubscriptionManager");
770
0
                        return;
771
0
                    }
772
773
                    // Sleep before retry
774
0
                    sleep(Duration::from_secs(5)).await;
775
776
                    // Notify all subscribers on reconnection
777
0
                    if let Some(subscribed_keys) = subscribed_keys_weak.upgrade() {
  Branch (777:28): [True: 0, False: 0]
  Branch (777:28): [Folded - Ignored]
778
0
                        let subscribed_keys_mux = subscribed_keys.read();
779
0
                        for publisher in subscribed_keys_mux.values() {
780
0
                            publisher.notify();
781
0
                        }
782
0
                    }
783
                }
784
0
            }),
785
        }
786
0
    }
787
}
788
789
impl SchedulerSubscriptionManager for ExperimentalMongoSubscriptionManager {
790
    type Subscription = ExperimentalMongoSubscription;
791
792
0
    fn notify_for_test(&self, value: String) {
793
0
        let subscribed_keys_mux = self.subscribed_keys.read();
794
0
        subscribed_keys_mux
795
0
            .common_prefix_values(&value)
796
0
            .for_each(ExperimentalMongoSubscriptionPublisher::notify);
797
0
    }
798
799
0
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
800
0
    where
801
0
        K: SchedulerStoreKeyProvider,
802
    {
803
0
        let weak_subscribed_keys = Arc::downgrade(&self.subscribed_keys);
804
0
        let mut subscribed_keys = self.subscribed_keys.write();
805
0
        let key = key.get_key();
806
0
        let key_str = key.as_str();
807
808
0
        let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
  Branch (808:39): [Folded - Ignored]
  Branch (808:39): [Folded - Ignored]
809
0
            publisher.subscribe(weak_subscribed_keys)
810
        } else {
811
0
            let (publisher, subscription) = ExperimentalMongoSubscriptionPublisher::new(
812
0
                key_str.to_string(),
813
0
                weak_subscribed_keys,
814
0
            );
815
0
            subscribed_keys.insert(key_str, publisher);
816
0
            subscription
817
        };
818
819
0
        subscription
820
0
            .receiver
821
0
            .as_mut()
822
0
            .ok_or_else(|| {
823
0
                make_err!(
824
0
                    Code::Internal,
825
                    "Receiver should be set in ExperimentalMongoSubscriptionManager::subscribe"
826
                )
827
0
            })?
828
0
            .mark_changed();
829
830
0
        Ok(subscription)
831
0
    }
832
}
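
notify_for_test() makes the fan-out rule visible: every publisher whose key is a prefix of the changed key fires. A sketch against patricia_tree directly, with strings standing in for publishers:

    use patricia_tree::StringPatriciaMap;

    fn main() {
        let mut subscribed: StringPatriciaMap<&str> = StringPatriciaMap::new();
        subscribed.insert("job", "publisher-a");
        subscribed.insert("job/123", "publisher-b");
        subscribed.insert("other", "publisher-c");

        // "job" and "job/123" are both prefixes of the changed key, so the
        // first two publishers are notified; "other" stays silent.
        for publisher in subscribed.common_prefix_values("job/123/update") {
            println!("notify {publisher}");
        }
    }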
833
834
impl SchedulerStore for ExperimentalMongoStore {
835
    type SubscriptionManager = ExperimentalMongoSubscriptionManager;
836
837
0
    fn subscription_manager(&self) -> Result<Arc<ExperimentalMongoSubscriptionManager>, Error> {
838
0
        let mut subscription_manager = self.subscription_manager.lock();
839
0
        if let Some(subscription_manager) = &*subscription_manager {
  Branch (839:16): [True: 0, False: 0]
  Branch (839:16): [Folded - Ignored]
840
0
            Ok(subscription_manager.clone())
841
        } else {
842
0
            if !self.enable_change_streams {
  Branch (842:16): [True: 0, False: 0]
  Branch (842:16): [Folded - Ignored]
843
0
                return Err(make_input_err!(
844
0
                    "ExperimentalMongoStore must have change streams enabled for scheduler subscriptions"
845
0
                ));
846
0
            }
847
848
0
            let sub = Arc::new(ExperimentalMongoSubscriptionManager::new(
849
0
                self.database.clone(),
850
0
                self.scheduler_collection.name().to_string(),
851
0
                self.key_prefix.clone(),
852
            ));
853
0
            *subscription_manager = Some(sub.clone());
854
0
            Ok(sub)
855
        }
856
0
    }
857
858
0
    async fn update_data<T>(&self, data: T) -> Result<Option<u64>, Error>
859
0
    where
860
0
        T: SchedulerStoreDataProvider
861
0
            + SchedulerStoreKeyProvider
862
0
            + SchedulerCurrentVersionProvider
863
0
            + Send,
864
0
    {
865
0
        let key = data.get_key();
866
0
        let encoded_key = self.encode_key(&key);
867
0
        let maybe_index = data.get_indexes().map_err(|e| {
868
0
            make_err!(
869
0
                Code::Internal,
870
                "Error getting indexes in ExperimentalMongoStore::update_data: {e}"
871
            )
872
0
        })?;
873
874
0
        if <T as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (874:12): [Folded - Ignored]
  Branch (874:12): [True: 0, False: 0]
  Branch (874:12): [Folded - Ignored]
875
0
            let current_version = data.current_version();
876
0
            let data_bytes = data.try_into_bytes().map_err(|e| {
877
0
                make_err!(
878
0
                    Code::Internal,
879
                    "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}"
880
                )
881
0
            })?;
882
883
0
            let mut update_doc = doc! {
884
                "$set": {
885
0
                    DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
886
0
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
887
0
                        bytes: data_bytes.to_vec(),
888
0
                    }),
889
                },
890
                "$inc": {
891
                    VERSION_FIELD: 1i64,
892
                }
893
            };
894
895
            // Add indexes
896
0
            for (name, value) in maybe_index {
897
0
                update_doc.get_document_mut("$set").unwrap().insert(
898
0
                    name,
899
0
                    Bson::Binary(mongodb::bson::Binary {
900
0
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
901
0
                        bytes: value.to_vec(),
902
0
                    }),
903
0
                );
904
0
            }
905
906
0
            let filter = doc! {
907
0
                KEY_FIELD: encoded_key.as_ref(),
908
0
                VERSION_FIELD: current_version as i64,
909
            };
910
911
0
            match self
912
0
                .scheduler_collection
913
0
                .find_one_and_update(filter, update_doc)
914
0
                .upsert(true)
915
0
                .return_document(ReturnDocument::After)
916
0
                .await
917
            {
918
0
                Ok(Some(doc)) => {
919
0
                    let new_version = doc.get_i64(VERSION_FIELD).unwrap_or(1) as u64;
920
0
                    Ok(Some(new_version))
921
                }
922
0
                Ok(None) => Ok(None),
923
0
                Err(e) => Err(make_err!(
924
0
                    Code::Internal,
925
0
                    "MongoDB error in update_data: {e}"
926
0
                )),
927
            }
928
        } else {
929
0
            let data_bytes = data.try_into_bytes().map_err(|e| {
930
0
                make_err!(
931
0
                    Code::Internal,
932
                    "Could not convert value to bytes in ExperimentalMongoStore::update_data: {e}"
933
                )
934
0
            })?;
935
936
0
            let mut doc = doc! {
937
0
                KEY_FIELD: encoded_key.as_ref(),
938
0
                DATA_FIELD: Bson::Binary(mongodb::bson::Binary {
939
0
                    subtype: mongodb::bson::spec::BinarySubtype::Generic,
940
0
                    bytes: data_bytes.to_vec(),
941
0
                }),
942
            };
943
944
            // Add indexes
945
0
            for (name, value) in maybe_index {
946
0
                doc.insert(
947
0
                    name,
948
0
                    Bson::Binary(mongodb::bson::Binary {
949
0
                        subtype: mongodb::bson::spec::BinarySubtype::Generic,
950
0
                        bytes: value.to_vec(),
951
0
                    }),
952
0
                );
953
0
            }
954
955
0
            self.scheduler_collection
956
0
                .update_one(
957
0
                    doc! { KEY_FIELD: encoded_key.as_ref() },
958
0
                    doc! { "$set": doc },
959
0
                )
960
0
                .upsert(true)
961
0
                .await
962
0
                .map_err(|e| {
963
0
                    make_err!(Code::Internal, "Failed to update scheduler document: {e}")
964
0
                })?;
965
966
0
            Ok(Some(0))
967
        }
968
0
    }
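
The versioned branch of update_data() is optimistic concurrency control: the filter pins the version that was read earlier and $inc bumps it atomically, so a concurrent writer causes find_one_and_update to match nothing (the Ok(None) arm). The filter/update pair, sketched with placeholder field values:

    use mongodb::bson::{doc, spec::BinarySubtype, Binary, Bson, Document};

    fn versioned_update(
        encoded_key: &str,
        current_version: i64,
        payload: Vec<u8>,
    ) -> (Document, Document) {
        let filter = doc! {
            "_id": encoded_key,
            "version": current_version, // must still match what was read
        };
        let update = doc! {
            "$set": {
                "data": Bson::Binary(Binary {
                    subtype: BinarySubtype::Generic,
                    bytes: payload,
                }),
            },
            "$inc": { "version": 1i64 },
        };
        (filter, update)
    }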
969
970
0
    async fn search_by_index_prefix<K>(
971
0
        &self,
972
0
        index: K,
973
0
    ) -> Result<
974
0
        impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
975
0
        Error,
976
0
    >
977
0
    where
978
0
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
979
0
    {
980
0
        let index_value = index.index_value();
981
982
        // Create index if it doesn't exist
983
0
        let index_name = format!("{}_{}", K::KEY_PREFIX, K::INDEX_NAME);
984
0
        self.scheduler_collection
985
0
            .create_index(
986
0
                IndexModel::builder()
987
0
                    .keys(doc! { K::INDEX_NAME: 1 })
988
0
                    .options(IndexOptions::builder().name(index_name).build())
989
0
                    .build(),
990
0
            )
991
0
            .await
992
0
            .map_err(|e| make_err!(Code::Internal, "Failed to create scheduler index: {e}"))?;
993
994
        // Build filter
995
0
        let filter = doc! {
996
            K::INDEX_NAME: {
997
0
                "$regex": format!("^{}", regex::escape(index_value.as_ref())),
998
            }
999
        };
1000
1001
        // Add sort if specified
1002
0
        let find_options = if let Some(sort_key) = K::MAYBE_SORT_KEY {
  Branch (1002:35): [Folded - Ignored]
  Branch (1002:35): [True: 0, False: 0]
  Branch (1002:35): [True: 0, False: 0]
  Branch (1002:35): [Folded - Ignored]
1003
0
            FindOptions::builder().sort(doc! { sort_key: 1 }).build()
1004
        } else {
1005
0
            FindOptions::default()
1006
        };
1007
1008
0
        let cursor = self
1009
0
            .scheduler_collection
1010
0
            .find(filter)
1011
0
            .with_options(find_options)
1012
0
            .await
1013
0
            .map_err(|e| {
1014
0
                make_err!(
1015
0
                    Code::Internal,
1016
                    "Failed to create cursor in search_by_index_prefix: {e}"
1017
                )
1018
0
            })?;
1019
1020
0
        Ok(cursor.map(move |result| {
1021
0
            let doc = result.map_err(|e| {
1022
0
                make_err!(
1023
0
                    Code::Internal,
1024
                    "Error reading document in search_by_index_prefix: {e}"
1025
                )
1026
0
            })?;
1027
1028
0
            let data = match doc.get(DATA_FIELD) {
1029
0
                Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()),
1030
                _ => {
1031
0
                    return Err(make_err!(
1032
0
                        Code::Internal,
1033
0
                        "Missing or invalid data field in search_by_index_prefix"
1034
0
                    ));
1035
                }
1036
            };
1037
1038
0
            let version = if <K as SchedulerIndexProvider>::Versioned::VALUE {
  Branch (1038:30): [Folded - Ignored]
  Branch (1038:30): [True: 0, False: 0]
  Branch (1038:30): [True: 0, False: 0]
  Branch (1038:30): [Folded - Ignored]
1039
0
                doc.get_i64(VERSION_FIELD).unwrap_or(0) as u64
1040
            } else {
1041
0
                0
1042
            };
1043
1044
0
            K::decode(version, data).map_err(|e| {
1045
0
                make_err!(
1046
0
                    Code::Internal,
1047
                    "Failed to decode in search_by_index_prefix: {e}"
1048
                )
1049
0
            })
1050
0
        }))
1051
0
    }
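
The anchored-regex construction used by both prefix scans, isolated (regex::escape makes the prefix match literally rather than as a pattern):

    /// Builds the $regex value for a literal prefix match.
    fn prefix_regex(prefix: &str) -> String {
        format!("^{}", regex::escape(prefix))
    }

For example, prefix_regex("a.b") returns "^a\\.b", so the dot cannot match arbitrary characters.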
1052
1053
0
    async fn get_and_decode<K>(
1054
0
        &self,
1055
0
        key: K,
1056
0
    ) -> Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>
1057
0
    where
1058
0
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send,
1059
0
    {
1060
0
        let key = key.get_key();
1061
0
        let encoded_key = self.encode_key(&key);
1062
0
        let filter = doc! { KEY_FIELD: encoded_key.as_ref() };
1063
1064
0
        let doc = self
1065
0
            .scheduler_collection
1066
0
            .find_one(filter)
1067
0
            .await
1068
0
            .map_err(|e| {
1069
0
                make_err!(
1070
0
                    Code::Internal,
1071
                    "Failed to find document in get_and_decode: {e}"
1072
                )
1073
0
            })?;
1074
1075
0
        let Some(doc) = doc else {
  Branch (1075:13): [Folded - Ignored]
  Branch (1075:13): [True: 0, False: 0]
  Branch (1075:13): [Folded - Ignored]
1076
0
            return Ok(None);
1077
        };
1078
1079
0
        let data = match doc.get(DATA_FIELD) {
1080
0
            Some(Bson::Binary(binary)) => Bytes::from(binary.bytes.clone()),
1081
0
            _ => return Ok(None),
1082
        };
1083
1084
0
        let version = if <K as SchedulerStoreKeyProvider>::Versioned::VALUE {
  Branch (1084:26): [Folded - Ignored]
  Branch (1084:26): [True: 0, False: 0]
  Branch (1084:26): [Folded - Ignored]
1085
0
            doc.get_i64(VERSION_FIELD).unwrap_or(0) as u64
1086
        } else {
1087
0
            0
1088
        };
1089
1090
0
        Ok(Some(K::decode(version, data).map_err(|e| {
1091
0
            make_err!(Code::Internal, "Failed to decode in get_and_decode: {e}")
1092
0
        })?))
1093
0
    }
1094
}