Coverage Report

Created: 2025-09-16 19:42

/build/source/nativelink-config/src/stores.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::time::Duration;
16
use std::sync::Arc;
17
18
use rand::Rng;
19
use serde::{Deserialize, Serialize};
20
21
use crate::serde_utils::{
22
    convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
23
    convert_numeric_with_shellexpand, convert_optional_numeric_with_shellexpand,
24
    convert_optional_string_with_shellexpand, convert_string_with_shellexpand,
25
    convert_vec_string_with_shellexpand,
26
};
27
28
/// Name of the store. This type will be used when referencing a store
29
/// in the `CasConfig::stores`'s map key.
30
pub type StoreRefName = String;
31
32
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
33
#[serde(rename_all = "snake_case")]
34
pub enum ConfigDigestHashFunction {
35
    /// Use the sha256 hash function.
36
    /// <https://en.wikipedia.org/wiki/SHA-2>
37
    Sha256,
38
39
    /// Use the blake3 hash function.
40
    /// <https://en.wikipedia.org/wiki/BLAKE_(hash_function)>
41
    Blake3,
42
}
43
44
#[derive(Serialize, Deserialize, Debug, Clone)]
45
#[serde(rename_all = "snake_case")]
46
pub enum StoreSpec {
47
    /// Memory store will store all data in a hashmap in memory.
48
    ///
49
    /// **Example JSON Config:**
50
    /// ```json
51
    /// "memory": {
52
    ///     "eviction_policy": {
53
    ///       // 10mb.
54
    ///       "max_bytes": 10000000,
55
    ///     }
56
    /// }
57
    ///
58
    /// ```
59
    ///
60
    Memory(MemorySpec),
61
62
    /// A generic blob store that will store files on the cloud
63
    /// provider. This configuration will never delete files, so you are
64
    /// responsible for purging old files in other ways.
65
    /// It supports the following backends:
66
    ///
67
    /// 1. **Amazon S3:**
68
    ///    S3 store will use Amazon's S3 service as a backend to store
69
    ///    the files. This configuration can be used to share files
70
    ///    across multiple instances. Uses system certificates for TLS
71
    ///    verification via `rustls-platform-verifier`.
72
    ///
73
    ///   **Example JSON Config:**
74
    ///   ```json
75
    ///   "experimental_cloud_object_store": {
76
    ///     "provider": "aws",
77
    ///     "region": "eu-north-1",
78
    ///     "bucket": "crossplane-bucket-af79aeca9",
79
    ///     "key_prefix": "test-prefix-index/",
80
    ///     "retry": {
81
    ///       "max_retries": 6,
82
    ///       "delay": 0.3,
83
    ///       "jitter": 0.5
84
    ///     },
85
    ///     "multipart_max_concurrent_uploads": 10
86
    ///   }
87
    ///   ```
88
    ///
89
    /// 2. **Google Cloud Storage:**
90
    ///    GCS store uses Google's GCS service as a backend to store
91
    ///    the files. This configuration can be used to share files
92
    ///    across multiple instances.
93
    ///
94
    ///   **Example JSON Config:**
95
    ///   ```json
96
    ///   "experimental_cloud_object_store": {
97
    ///     "provider": "gcs",
98
    ///     "bucket": "test-bucket",
99
    ///     "key_prefix": "test-prefix-index/",
100
    ///     "retry": {
101
    ///       "max_retries": 6,
102
    ///       "delay": 0.3,
103
    ///       "jitter": 0.5
104
    ///     },
105
    ///     "multipart_max_concurrent_uploads": 10
106
    ///   }
107
    ///   ```
108
    ///
109
    /// 3. **`NetApp` ONTAP S3**
110
    ///    `NetApp` ONTAP S3 store will use ONTAP's S3-compatible storage as a backend
111
    ///    to store files. This store is specifically configured for ONTAP's S3 requirements
112
    ///    including custom TLS configuration, credentials management, and proper vserver
113
    ///    configuration.
114
    ///
115
    ///    This store uses AWS environment variables for credentials:
116
    ///    - `AWS_ACCESS_KEY_ID`
117
    ///    - `AWS_SECRET_ACCESS_KEY`
118
    ///    - `AWS_DEFAULT_REGION`
119
    ///
120
    ///    **Example JSON Config:**
121
    ///    ```json
122
    ///    "experimental_cloud_object_store": {
123
    ///      "provider": "ontap"
124
    ///      "endpoint": "https://ontap-s3-endpoint:443",
125
    ///      "vserver_name": "your-vserver",
126
    ///      "bucket": "your-bucket",
127
    ///      "root_certificates": "/path/to/certs.pem",  // Optional
128
    ///      "key_prefix": "test-prefix/",               // Optional
129
    ///      "retry": {
130
    ///        "max_retries": 6,
131
    ///        "delay": 0.3,
132
    ///        "jitter": 0.5
133
    ///      },
134
    ///      "multipart_max_concurrent_uploads": 10
135
    ///    }
136
    ///    ```
137
    ExperimentalCloudObjectStore(ExperimentalCloudObjectSpec),
138
139
    /// ONTAP S3 Existence Cache provides a caching layer on top of the ONTAP S3 store
140
    /// to optimize repeated existence checks. It maintains an in-memory cache of object
141
    /// digests and periodically syncs this cache to disk for persistence.
142
    ///
143
    /// The cache helps reduce latency for repeated calls to check object existence,
144
    /// while still ensuring eventual consistency with the underlying ONTAP S3 store.
145
    ///
146
    /// Example JSON Config:
147
    /// ```json
148
    /// "ontap_s3_existence_cache": {
149
    ///   "index_path": "/path/to/cache/index.json",
150
    ///   "sync_interval_seconds": 300,
151
    ///   "backend": {
152
    ///     "endpoint": "https://ontap-s3-endpoint:443",
153
    ///     "vserver_name": "your-vserver",
154
    ///     "bucket": "your-bucket",
155
    ///     "key_prefix": "test-prefix/"
156
    ///   }
157
    /// }
158
    /// ```
159
    ///
160
    OntapS3ExistenceCache(Box<OntapS3ExistenceCacheSpec>),
161
162
    /// Verify store is used to apply verifications to an underlying
163
    /// store implementation. It is strongly encouraged to validate
164
    /// as much data as you can before accepting data from a client;
165
    /// failing to do so may cause the data in the store to be
166
    /// populated with invalid data causing all kinds of problems.
167
    ///
168
    /// The suggested configuration is to have the CAS validate the
169
    /// hash and size and the AC validate nothing.
170
    ///
171
    /// **Example JSON Config:**
172
    /// ```json
173
    /// "verify": {
174
    ///   "memory": {
175
    ///     "eviction_policy": {
176
    ///       "max_bytes": 500000000 // 500mb.
177
    ///     }
178
    ///   },
179
    ///   "verify_size": true,
180
    ///   "hash_verification_function": "sha256"
181
    /// }
182
    /// ```
183
    ///
184
    Verify(Box<VerifySpec>),
185
186
    /// Completeness checking store verifies if the
187
    /// output files & folders exist in the CAS before forwarding
188
    /// the request to the underlying store.
189
    /// Note: This store should only be used on AC stores.
190
    ///
191
    /// **Example JSON Config:**
192
    /// ```json
193
    /// "completeness_checking": {
194
    ///     "backend": {
195
    ///       "filesystem": {
196
    ///         "content_path": "~/.cache/nativelink/content_path-ac",
197
    ///         "temp_path": "~/.cache/nativelink/tmp_path-ac",
198
    ///         "eviction_policy": {
199
    ///           // 500mb.
200
    ///           "max_bytes": 500000000,
201
    ///         }
202
    ///       }
203
    ///     },
204
    ///     "cas_store": {
205
    ///       "ref_store": {
206
    ///         "name": "CAS_MAIN_STORE"
207
    ///       }
208
    ///     }
209
    ///   }
210
    /// ```
211
    ///
212
    CompletenessChecking(Box<CompletenessCheckingSpec>),
213
214
    /// A compression store that will compress the data inbound and
215
    /// outbound. There will be a non-trivial cost to compress and
216
    /// decompress the data, but in many cases if the final store is
217
    /// a store that requires network transport and/or storage space
218
    /// is a concern it is often faster and more efficient to use this
219
    /// store before those stores.
220
    ///
221
    /// **Example JSON Config:**
222
    /// ```json
223
    /// "compression": {
224
    ///     "compression_algorithm": {
225
    ///       "lz4": {}
226
    ///     },
227
    ///     "backend": {
228
    ///       "filesystem": {
229
    ///         "content_path": "/tmp/nativelink/data/content_path-cas",
230
    ///         "temp_path": "/tmp/nativelink/data/tmp_path-cas",
231
    ///         "eviction_policy": {
232
    ///           // 2gb.
233
    ///           "max_bytes": 2000000000,
234
    ///         }
235
    ///       }
236
    ///     }
237
    ///   }
238
    /// ```
239
    ///
240
    Compression(Box<CompressionSpec>),
241
242
    /// A dedup store will take the inputs and run a rolling hash
243
    /// algorithm on them to slice the input into smaller parts, then
244
    /// run a sha256 algorithm on each slice and, if the object doesn't
245
    /// already exist, upload the slice to the `content_store` using
246
    /// a new digest of just the slice. Once all parts exist, an
247
    /// Action-Cache-like digest will be built and uploaded to the
248
    /// `index_store` which will contain a reference to each
249
    /// chunk/digest of the uploaded file. A download request will
250
    /// first grab the index from the `index_store`, then stream the
251
    /// content of each chunk back as if it were one file.
252
    ///
253
    /// This store is exceptionally good when the following conditions
254
    /// are met:
255
    /// * Content is mostly the same (inserts, updates, deletes are ok)
256
    /// * Content is not compressed or encrypted
257
    /// * Uploading or downloading from `content_store` is the bottleneck.
258
    ///
259
    /// Note: This store pairs well when used with `CompressionSpec` as
260
    /// the `content_store`, but never put `DedupSpec` as the backend of
261
    /// `CompressionSpec` as it will negate all the gains.
262
    ///
263
    /// Note: When running `.has()` on this store, it will only check
264
    /// to see if the entry exists in the `index_store` and not check
265
    /// if the individual chunks exist in the `content_store`.
266
    ///
267
    /// **Example JSON Config:**
268
    /// ```json
269
    /// "dedup": {
270
    ///     "index_store": {
271
    ///       "memory_store": {
272
    ///         "max_size": 1000000000, // 1GB
273
    ///         "eviction_policy": "LeastRecentlyUsed"
274
    ///       }
275
    ///     },
276
    ///     "content_store": {
277
    ///       "compression": {
278
    ///         "compression_algorithm": {
279
    ///           "lz4": {}
280
    ///         },
281
    ///         "backend": {
282
    ///           "fast_slow": {
283
    ///             "fast": {
284
    ///               "memory": {
285
    ///                 // 500MB limit.
286
    ///                 "eviction_policy": { "max_bytes": 500000000 }
287
    ///               }
288
    ///             },
289
    ///             "slow": {
290
    ///               "filesystem": {
291
    ///                 "content_path": "/tmp/nativelink/data/content_path-content",
292
    ///                 "temp_path": "/tmp/nativelink/data/tmp_path-content",
293
    ///                 "eviction_policy": {
294
    ///                   "max_bytes": 2000000000 // 2gb.
295
    ///                 }
296
    ///               }
297
    ///             }
298
    ///           }
299
    ///         }
300
    ///       }
301
    ///     }
302
    ///   }
303
    /// ```
304
    ///
305
    Dedup(Box<DedupSpec>),
306
307
    /// Existence store will wrap around another store and cache calls
308
    /// to `.has()` so that subsequent `has_with_results` calls will be
309
    /// faster. This is useful for cases when you have a store that
310
    /// is slow to respond to `.has()` calls.
311
    /// Note: This store should only be used on CAS stores.
312
    ///
313
    /// **Example JSON Config:**
314
    /// ```json
315
    /// "existence_cache": {
316
    ///     "backend": {
317
    ///       "memory": {
318
    ///         "eviction_policy": {
319
    ///           // 500mb.
320
    ///           "max_bytes": 500000000,
321
    ///         }
322
    ///       }
323
    ///     },
324
    ///     "cas_store": {
325
    ///       "ref_store": {
326
    ///         "name": "CAS_MAIN_STORE"
327
    ///       }
328
    ///     }
329
    ///   }
330
    /// ```
331
    ///
332
    ExistenceCache(Box<ExistenceCacheSpec>),
333
334
    /// `FastSlow` store will first try to fetch the data from the `fast`
335
    /// store and then if it does not exist try the `slow` store.
336
    /// When the object does exist in the `slow` store, it will copy
337
    /// the data to the `fast` store while returning the data.
338
    /// This store should be thought of as a store that "buffers"
339
    /// the data to the `fast` store.
340
    /// On uploads it will mirror data to both `fast` and `slow` stores.
341
    ///
342
    /// WARNING: If you need data to always exist in the `slow` store
343
    /// for something like remote execution, be careful because this
344
    /// store will never check to see if the objects exist in the
345
    /// `slow` store if it exists in the `fast` store (ie: it assumes
346
    /// that if an object exists in the `fast` store it will exist in the `slow`
347
    /// store).
348
    ///
349
    /// **Example JSON Config:**
350
    /// ```json
351
    /// "fast_slow": {
352
    ///     "fast": {
353
    ///       "filesystem": {
354
    ///         "content_path": "/tmp/nativelink/data/content_path-index",
355
    ///         "temp_path": "/tmp/nativelink/data/tmp_path-index",
356
    ///         "eviction_policy": {
357
    ///           // 500mb.
358
    ///           "max_bytes": 500000000,
359
    ///         }
360
    ///       }
361
    ///     },
362
    ///     "slow": {
363
    ///       "filesystem": {
364
    ///         "content_path": "/tmp/nativelink/data/content_path-index",
365
    ///         "temp_path": "/tmp/nativelink/data/tmp_path-index",
366
    ///         "eviction_policy": {
367
    ///           // 500mb.
368
    ///           "max_bytes": 500000000,
369
    ///         }
370
    ///       }
371
    ///     }
372
    ///   }
373
    /// ```
374
    ///
375
    FastSlow(Box<FastSlowSpec>),
376
377
    /// Shards the data to multiple stores. This is useful for cases
378
    /// when you want to distribute the load across multiple stores.
379
    /// The digest hash is used to determine which store to send the
380
    /// data to.
381
    ///
382
    /// **Example JSON Config:**
383
    /// ```json
384
    /// "shard": {
385
    ///     "stores": [
386
    ///         "memory": {
387
    ///             "eviction_policy": {
388
    ///                 // 10mb.
389
    ///                 "max_bytes": 10000000
390
    ///             },
391
    ///             "weight": 1
392
    ///         }
393
    ///     ]
394
    /// }
395
    /// ```
396
    ///
397
    Shard(ShardSpec),
398
399
    /// Stores the data on the filesystem. This store is designed for
400
    /// local persistent storage. Restarts of this program should restore
401
    /// the previous state, meaning anything uploaded will be persistent
402
    /// as long as the filesystem integrity holds.
403
    ///
404
    /// **Example JSON Config:**
405
    /// ```json
406
    /// "filesystem": {
407
    ///     "content_path": "/tmp/nativelink/data-worker-test/content_path-cas",
408
    ///     "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-cas",
409
    ///     "eviction_policy": {
410
    ///       // 10gb.
411
    ///       "max_bytes": 10000000000,
412
    ///     }
413
    /// }
414
    /// ```
415
    ///
416
    Filesystem(FilesystemSpec),
417
418
    /// Store used to reference a store in the root store manager.
419
    /// This is useful for cases when you want to share a store in different
420
    /// nested stores. Example, you may want to share the same memory store
421
    /// used for the action cache, but use a `FastSlowSpec` and have the fast
422
    /// store also share the memory store for efficiency.
423
    ///
424
    /// **Example JSON Config:**
425
    /// ```json
426
    /// "ref_store": {
427
    ///     "name": "FS_CONTENT_STORE"
428
    /// }
429
    /// ```
430
    ///
431
    RefStore(RefSpec),
432
433
    /// Uses the size field of the digest to determine which store to send the
434
    /// data. This is useful for cases when you'd like to put small objects
435
    /// in one store and large objects in another store. This should only be
436
    /// used if the size field is the real size of the content, in other
437
    /// words, don't use on AC (Action Cache) stores. Any store where you can
438
    /// safely use `VerifySpec.verify_size = true` should be safe to use with
439
    /// this store (ie: CAS stores).
440
    ///
441
    /// **Example JSON Config:**
442
    /// ```json
443
    /// "size_partitioning": {
444
    ///     "size": 134217728, // 128MiB.
445
    ///     "lower_store": {
446
    ///       "memory": {
447
    ///         "eviction_policy": {
448
    ///           "max_bytes": "${NATIVELINK_CAS_MEMORY_CONTENT_LIMIT:-100000000}"
449
    ///         }
450
    ///       }
451
    ///     },
452
    ///     "upper_store": {
453
    ///       // This store discards data larger than 128MiB.
454
    ///       "noop": {}
455
    ///     }
456
    ///   }
457
    /// ```
458
    ///
459
    SizePartitioning(Box<SizePartitioningSpec>),
460
461
    /// This store will pass through calls to another GRPC store. This store
462
    /// is not designed to be used as a sub-store of another store, but it
463
    /// does satisfy the interface and will likely work.
464
    ///
465
    /// One major GOTCHA is that some stores use a special function on this
466
    /// store to get the size of the underlying object, which is only reliable
467
    /// when this store is serving a CAS store, not an AC store. If using
468
    /// this store directly, without it being a child of any other store, there
469
    /// are no side effects and this is the most efficient way to use it.
470
    ///
471
    /// **Example JSON Config:**
472
    /// ```json
473
    /// "grpc": {
474
    ///     "instance_name": "main",
475
    ///     "endpoints": [
476
    ///       {"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
477
    ///     ],
478
    ///     "store_type": "ac"
479
    ///   }
480
    /// ```
481
    ///
482
    Grpc(GrpcSpec),
483
484
    /// Stores data in any stores compatible with Redis APIs.
485
    ///
486
    /// Pairs well with `SizePartitioning` and/or `FastSlow` stores.
487
    /// Ideal for accepting small object sizes as most redis store
488
    /// services have a maximum object size of between 256MB and 512MB.
489
    ///
490
    /// **Example JSON Config:**
491
    /// ```json
492
    /// "redis_store": {
493
    ///     "addresses": [
494
    ///         "redis://127.0.0.1:6379/",
495
    ///     ]
496
    /// }
497
    /// ```
498
    ///
499
    RedisStore(RedisSpec),
500
501
    /// Noop store is a store that sends streams into the void and all data
502
    /// retrieval will return 404 (`NotFound`). This can be useful for cases
503
    /// where you may need to partition your data and part of your data needs
504
    /// to be discarded.
505
    ///
506
    /// **Example JSON Config:**
507
    /// ```json
508
    /// "noop": {}
509
    /// ```
510
    ///
511
    Noop(NoopSpec),
512
513
    /// Experimental `MongoDB` store implementation.
514
    ///
515
    /// This store uses `MongoDB` as a backend for storing data. It supports
516
    /// both CAS (Content Addressable Storage) and scheduler data with
517
    /// optional change streams for real-time updates.
518
    ///
519
    /// **Example JSON Config:**
520
    /// ```json
521
    /// "experimental_mongo": {
522
    ///     "connection_string": "mongodb://localhost:27017",
523
    ///     "database": "nativelink",
524
    ///     "cas_collection": "cas",
525
    ///     "key_prefix": "cas:",
526
    ///     "read_chunk_size": 65536,
527
    ///     "max_concurrent_uploads": 10,
528
    ///     "enable_change_streams": false
529
    /// }
530
    /// ```
531
    ///
532
    ExperimentalMongo(ExperimentalMongoSpec),
533
}
534
535
/// Configuration for an individual shard of the store.
536
#[derive(Serialize, Deserialize, Debug, Clone)]
537
#[serde(deny_unknown_fields)]
538
pub struct ShardConfig {
539
    /// Store to shard the data to.
540
    pub store: StoreSpec,
541
542
    /// The weight of the store. This is used to determine how much data
543
    /// should be sent to the store. The actual percentage is the individual
544
    /// store's weight divided by the sum of all the stores' weights.
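    ///
    /// For illustration (a hedged example, not from the source): with two shards
    /// configured with weights 1 and 3, the first store receives roughly
    /// `1 / (1 + 3) = 25%` of the data and the second roughly `75%`.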
545
    ///
546
    /// Default: 1
547
    pub weight: Option<u32>,
548
}
549
550
#[derive(Serialize, Deserialize, Debug, Clone)]
551
#[serde(deny_unknown_fields)]
552
pub struct ShardSpec {
553
    /// Stores to shard the data to.
554
    pub stores: Vec<ShardConfig>,
555
}
556
557
#[derive(Serialize, Deserialize, Debug, Clone)]
558
#[serde(deny_unknown_fields)]
559
pub struct SizePartitioningSpec {
560
    /// Size to partition the data on.
561
    #[serde(deserialize_with = "convert_data_size_with_shellexpand")]
562
    pub size: u64,
563
564
    /// Store to send data when object is < (less than) size.
565
    pub lower_store: StoreSpec,
566
567
    /// Store to send data when object is >= (greater than or equal to) size.
568
    pub upper_store: StoreSpec,
569
}
570
571
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
572
#[serde(deny_unknown_fields)]
573
pub struct RefSpec {
574
    /// Name of the store under the root "stores" config object.
575
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
576
    pub name: String,
577
}
578
579
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
580
#[serde(deny_unknown_fields)]
581
pub struct FilesystemSpec {
582
    /// Path on the system where to store the actual content. This is where
583
    /// the bulk of the data will be placed.
584
    /// On service bootup this folder will be scanned and all files will be
585
    /// added to the cache. In the event one of the files doesn't match the
586
    /// criteria, the file will be deleted.
587
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
588
    pub content_path: String,
589
590
    /// A temporary location where files that are being uploaded or
591
    /// deleted will be placed while the content cannot be guaranteed to be
592
    /// accurate. This location must be on the same block device as
593
    /// `content_path` so atomic moves can happen (ie: move without copy).
594
    /// All files in this folder will be deleted on every startup.
595
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
596
    pub temp_path: String,
597
598
    /// Buffer size to use when reading files. Generally this should be left
599
    /// to the default value except for testing.
600
    /// Default: 32k.
601
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
602
    pub read_buffer_size: u32,
603
604
    /// Policy used to evict items out of the store. Failure to set this
605
    /// value will cause items to never be removed from the store causing
606
    /// infinite memory usage.
607
    pub eviction_policy: Option<EvictionPolicy>,
608
609
    /// The block size of the filesystem for the running machine. This
610
    /// value is used to determine an entry's actual size consumed on disk.
611
    /// For a 4KB block size filesystem, a 1B file actually consumes 4KB.
612
    /// Default: 4096
613
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
614
    pub block_size: u64,
615
}
616
617
// NetApp ONTAP S3 Spec
618
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
619
#[serde(deny_unknown_fields)]
620
pub struct ExperimentalOntapS3Spec {
621
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
622
    pub endpoint: String,
623
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
624
    pub vserver_name: String,
625
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
626
    pub bucket: String,
627
    #[serde(default)]
628
    pub root_certificates: Option<String>,
629
630
    /// Common retry and upload configuration
631
    #[serde(flatten)]
632
    pub common: CommonObjectSpec,
633
}
634
635
#[derive(Serialize, Deserialize, Debug, Clone)]
636
#[serde(deny_unknown_fields)]
637
pub struct OntapS3ExistenceCacheSpec {
638
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
639
    pub index_path: String,
640
    #[serde(deserialize_with = "convert_numeric_with_shellexpand")]
641
    pub sync_interval_seconds: u32,
642
    pub backend: Box<ExperimentalOntapS3Spec>,
643
}
644
645
#[derive(Serialize, Deserialize, Debug, Clone)]
646
#[serde(deny_unknown_fields)]
647
pub struct FastSlowSpec {
648
    /// Fast store that will be tried first before reaching
649
    /// out to the `slow` store.
650
    pub fast: StoreSpec,
651
652
    /// If the object does not exist in the `fast` store it will try to
653
    /// get it from this store.
654
    pub slow: StoreSpec,
655
}
656
657
#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
658
#[serde(deny_unknown_fields)]
659
pub struct MemorySpec {
660
    /// Policy used to evict items out of the store. Failure to set this
661
    /// value will cause items to never be removed from the store causing
662
    /// infinite memory usage.
663
    pub eviction_policy: Option<EvictionPolicy>,
664
}
665
666
#[derive(Serialize, Deserialize, Debug, Clone)]
667
#[serde(deny_unknown_fields)]
668
pub struct DedupSpec {
669
    /// Store used to store the index of each dedup slice. This store
670
    /// should generally be fast and small.
671
    pub index_store: StoreSpec,
672
673
    /// The store where the individual chunks will be uploaded. This
674
    /// store should generally be the slower & larger store.
675
    pub content_store: StoreSpec,
676
677
    /// Minimum size that a chunk will be when slicing up the content.
678
    /// Note: This setting can be increased to improve performance
679
    /// because it will actually not check this number of bytes when
680
    /// deciding where to partition the data.
681
    ///
682
    /// Default: 65536 (64k)
683
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
684
    pub min_size: u32,
685
686
    /// A best-effort attempt will be made to keep the average size
687
    /// of the chunks to this number. It is not a guarantee, only a
688
    /// best effort.
689
    ///
690
    /// This value is also roughly the threshold used to determine
691
    /// if we should even attempt to dedup the entry or just forward
692
    /// it directly to the `content_store` without an index. The actual
693
    /// value will be about `normal_size * 1.3` due to implementation
694
    /// details.
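    ///
    /// For example (illustrative arithmetic only): with the default `normal_size`
    /// of 262144, entries smaller than roughly `262144 * 1.3 ≈ 340KB` are
    /// forwarded directly to the `content_store` without an index.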
695
    ///
696
    /// Default: 262144 (256k)
697
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
698
    pub normal_size: u32,
699
700
    /// Maximum size a chunk is allowed to be.
701
    ///
702
    /// Default: 524288 (512k)
703
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
704
    pub max_size: u32,
705
706
    /// Due to an implementation detail, we prefer to download
707
    /// the first chunks of the file so we can stream the content
708
    /// out and free up some of our buffers. This configuration
709
    /// will be used to restrict the number of concurrent chunk
710
    /// downloads at a time per `get()` request.
711
    ///
712
    /// This setting will also affect how much memory might be used
713
    /// per `get()` request. Estimated worst case memory per `get()`
714
    /// request is: `max_concurrent_fetch_per_get * max_size`.
715
    ///
716
    /// Default: 10
717
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
718
    pub max_concurrent_fetch_per_get: u32,
719
}
720
721
#[derive(Serialize, Deserialize, Debug, Clone)]
722
#[serde(deny_unknown_fields)]
723
pub struct ExistenceCacheSpec {
724
    /// The underlying store to wrap around. All content will first flow
725
    /// through self before forwarding to backend. In the event there
726
    /// is an error detected in self, the connection to the backend
727
    /// will be terminated, and early termination should always cause
728
    /// updates to fail on the backend.
729
    pub backend: StoreSpec,
730
731
    /// Policy used to evict items out of the store. Failure to set this
732
    /// value will cause items to never be removed from the store causing
733
    /// infinite memory usage.
734
    pub eviction_policy: Option<EvictionPolicy>,
735
}
736
737
#[derive(Serialize, Deserialize, Debug, Clone)]
738
#[serde(deny_unknown_fields)]
739
pub struct VerifySpec {
740
    /// The underlying store to wrap around. All content will first flow
741
    /// through self before forwarding to backend. In the event there
742
    /// is an error detected in self, the connection to the backend
743
    /// will be terminated, and early termination should always cause
744
    /// updates to fail on the backend.
745
    pub backend: StoreSpec,
746
747
    /// If set, the store will verify the size of the data before accepting
748
    /// an upload of data.
749
    ///
750
    /// This should be set to false for AC, but true for CAS stores.
751
    #[serde(default)]
752
    pub verify_size: bool,
753
754
    /// If set, the data will be hashed and the key verified against the
755
    /// computed hash. The hash function is automatically determined based on
756
    /// the request and if not set will use the global default.
757
    ///
758
    /// This should be set to false for AC, but true for CAS stores.
759
    #[serde(default)]
760
    pub verify_hash: bool,
761
}
762
763
#[derive(Serialize, Deserialize, Debug, Clone)]
764
#[serde(deny_unknown_fields)]
765
pub struct CompletenessCheckingSpec {
766
    /// The underlying store that will have its results validated before sending to the client.
767
    pub backend: StoreSpec,
768
769
    /// When a request is made, the results are decoded and all output digests/files are verified
770
    /// to exist in this CAS store before returning success.
771
    pub cas_store: StoreSpec,
772
}
773
774
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq, Clone, Copy)]
775
#[serde(deny_unknown_fields)]
776
pub struct Lz4Config {
777
    /// Size of the blocks to compress.
778
    /// Higher values require more ram, but might yield slightly better
779
    /// compression ratios.
780
    ///
781
    /// Default: 65536 (64k).
782
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
783
    pub block_size: u32,
784
785
    /// Maximum size allowed to attempt to deserialize data into.
786
    /// This is needed because the `block_size` is embedded into the data
787
    /// so if there was a bad actor, they could upload an extremely large
788
    /// `block_size`'ed entry and we'd allocate a large amount of memory
789
    /// when retrieving the data. To prevent this from happening, we
790
    /// allow you to specify the maximum that we'll attempt to deserialize.
791
    ///
792
    /// Default: value in `block_size`.
793
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
794
    pub max_decode_block_size: u32,
795
}
796
797
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)]
798
#[serde(rename_all = "snake_case")]
799
pub enum CompressionAlgorithm {
800
    /// LZ4 compression algorithm is extremely fast for compression and
801
    /// decompression, however it does not achieve a very good compression
802
    /// ratio. In most cases build artifacts are highly compressible, however
803
    /// lz4 is quite good at aborting early if the data is not deemed very
804
    /// compressible.
805
    ///
806
    /// see: <https://lz4.github.io/lz4/>
807
    Lz4(Lz4Config),
808
}
809
810
#[derive(Serialize, Deserialize, Debug, Clone)]
811
#[serde(deny_unknown_fields)]
812
pub struct CompressionSpec {
813
    /// The underlying store to wrap around. All content will first flow
814
    /// through self before forwarding to backend. In the event there
815
    /// is an error detected in self, the connection to the backend
816
    /// will be terminated, and early termination should always cause
817
    /// updates to fail on the backend.
818
    pub backend: StoreSpec,
819
820
    /// The compression algorithm to use.
821
    pub compression_algorithm: CompressionAlgorithm,
822
}
823
824
/// Eviction policy always works on LRU (Least Recently Used). Any time an entry
825
/// is touched it updates the timestamp. Inserts and updates will execute the
826
/// eviction policy removing any expired entries and/or the oldest entries
827
/// until the store size becomes smaller than `max_bytes`.
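///
/// A minimal example policy (a sketch; the values are illustrative only):
/// ```json
/// "eviction_policy": {
///   "max_bytes": 10000000000,  // start evicting once the store exceeds 10gb
///   "evict_bytes": 1000000000, // keep evicting until it drops to 9gb
///   "max_seconds": 604800,     // also evict entries unused for 7 days
///   "max_count": 0             // never evict based on entry count
/// }
/// ```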
828
#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
829
#[serde(deny_unknown_fields)]
830
pub struct EvictionPolicy {
831
    /// Maximum number of bytes before eviction takes place.
832
    /// Default: 0. Zero means never evict based on size.
833
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
834
    pub max_bytes: usize,
835
836
    /// When eviction starts based on hitting `max_bytes`, continue until
837
    /// `max_bytes - evict_bytes` is met to create a low watermark.  This stops
838
    /// operations from thrashing when the store is close to the limit.
839
    /// Default: 0
840
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
841
    pub evict_bytes: usize,
842
843
    /// Maximum number of seconds for an entry to live since it was last
844
    /// accessed before it is evicted.
845
    /// Default: 0. Zero means never evict based on time.
846
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
847
    pub max_seconds: u32,
848
849
    /// Maximum number of entries in the store before an eviction takes place.
850
    /// Default: 0. Zero means never evict based on count.
851
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
852
    pub max_count: u64,
853
}
854
855
#[derive(Serialize, Deserialize, Debug, Clone)]
856
#[serde(tag = "provider", rename_all = "snake_case")]
857
pub enum ExperimentalCloudObjectSpec {
858
    Aws(ExperimentalAwsSpec),
859
    Gcs(ExperimentalGcsSpec),
860
    Ontap(ExperimentalOntapS3Spec),
861
}
862
863
impl Default for ExperimentalCloudObjectSpec {
864
0
    fn default() -> Self {
865
0
        Self::Aws(ExperimentalAwsSpec::default())
866
0
    }
867
}
868
869
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
870
#[serde(deny_unknown_fields)]
871
pub struct ExperimentalAwsSpec {
872
    /// S3 region. Usually us-east-1, us-west-2, af-south-1, etc.
873
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
874
    pub region: String,
875
876
    /// Bucket name to use as the backend.
877
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
878
    pub bucket: String,
879
880
    /// Common retry and upload configuration
881
    #[serde(flatten)]
882
    pub common: CommonObjectSpec,
883
}
884
885
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
886
#[serde(deny_unknown_fields)]
887
pub struct ExperimentalGcsSpec {
888
    /// Bucket name to use as the backend.
889
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
890
    pub bucket: String,
891
892
    /// Chunk size for resumable uploads.
893
    ///
894
    /// Default: 2MB
895
    pub resumable_chunk_size: Option<usize>,
896
897
    /// Common retry and upload configuration
898
    #[serde(flatten)]
899
    pub common: CommonObjectSpec,
900
}
901
902
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
903
pub struct CommonObjectSpec {
904
    /// Prefix to prepend to the object location in the bucket. If None, no prefix will be used.
905
    #[serde(default)]
906
    pub key_prefix: Option<String>,
907
908
    /// Retry configuration to use when a network request fails.
909
    #[serde(default)]
910
    pub retry: Retry,
911
912
    /// If the number of seconds since the `last_modified` time of the object
913
    /// is greater than this value, the object will not be considered
914
    /// "existing". This allows for external tools to delete objects that
915
    /// have not been uploaded in a long time. If a client receives a `NotFound`
916
    /// the client should re-upload the object.
917
    ///
918
    /// There should be a sufficient buffer between the external tool's
919
    /// expiration configuration and this value. Keeping items
920
    /// around for a few days is generally a good idea.
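    ///
    /// For example (an illustrative sketch): if an external lifecycle rule
    /// deletes objects that have not been modified for 14 days, setting this
    /// to 10 days (864000 seconds) leaves a ~4 day buffer.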
921
    ///
922
    /// Default: 0. Zero means never consider an object expired.
923
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
924
    pub consider_expired_after_s: u32,
925
926
    /// The maximum buffer size to retain in case of a retryable error
927
    /// during upload. Setting this to zero will disable upload buffering;
928
    /// this means that in the event of a failure during upload, the entire
929
    /// upload will be aborted and the client will likely receive an error.
930
    ///
931
    /// Default: 5MB.
932
    pub max_retry_buffer_per_request: Option<usize>,
933
934
    /// Maximum number of concurrent `UploadPart` requests per `MultipartUpload`.
935
    ///
936
    /// Default: 10.
937
    pub multipart_max_concurrent_uploads: Option<usize>,
938
939
    /// Allow unencrypted HTTP connections. Only use this for local testing.
940
    ///
941
    /// Default: false
942
    #[serde(default)]
943
    pub insecure_allow_http: bool,
944
945
    /// Disable http/2 connections and only use http/1.1. Default client
946
    /// configuration will have http/1.1 and http/2 enabled for connection
947
    /// schemes. Http/2 should be disabled if environments have poor support
948
    /// or performance related to http/2. Safe to keep default unless
949
    /// underlying network environment, S3, or GCS API servers specify otherwise.
950
    ///
951
    /// Default: false
952
    #[serde(default)]
953
    pub disable_http2: bool,
954
}
955
956
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
957
#[serde(rename_all = "snake_case")]
958
pub enum StoreType {
959
    /// The store is content addressable storage.
960
    Cas,
961
    /// The store is an action cache.
962
    Ac,
963
}
964
965
#[derive(Serialize, Deserialize, Debug, Clone)]
966
pub struct ClientTlsConfig {
967
    /// Path to the certificate authority to use to validate the remote.
968
    ///
969
    /// Default: None
970
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
971
    pub ca_file: Option<String>,
972
973
    /// Path to the certificate file for client authentication.
974
    ///
975
    /// Default: None
976
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
977
    pub cert_file: Option<String>,
978
979
    /// Path to the private key file for client authentication.
980
    ///
981
    /// Default: None
982
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
983
    pub key_file: Option<String>,
984
985
    /// If set, the client will use the native roots for TLS connections.
986
    ///
987
    /// Default: false
988
    #[serde(default)]
989
    pub use_native_roots: Option<bool>,
990
}
991
992
#[derive(Serialize, Deserialize, Debug, Clone)]
993
#[serde(deny_unknown_fields)]
994
pub struct GrpcEndpoint {
995
    /// The endpoint address (e.g. grpc(s)://example.com:443).
996
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
997
    pub address: String,
998
    /// The TLS configuration to use to connect to the endpoint (if grpcs).
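    ///
    /// Example (a hedged sketch; the paths are placeholders):
    /// ```json
    /// "tls_config": {
    ///   "ca_file": "/path/to/ca.pem",
    ///   "cert_file": "/path/to/client.pem",
    ///   "key_file": "/path/to/client.key"
    /// }
    /// ```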
999
    pub tls_config: Option<ClientTlsConfig>,
1000
    /// The maximum concurrency to allow on this endpoint.
1001
    pub concurrency_limit: Option<usize>,
1002
}
1003
1004
#[derive(Serialize, Deserialize, Debug, Clone)]
1005
#[serde(deny_unknown_fields)]
1006
pub struct GrpcSpec {
1007
    /// Instance name for GRPC calls. Proxy calls will have the `instance_name` changed to this.
1008
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
1009
    pub instance_name: String,
1010
1011
    /// The endpoint of the grpc connection.
1012
    pub endpoints: Vec<GrpcEndpoint>,
1013
1014
    /// The type of the upstream store; this ensures that the correct server calls are made.
1015
    pub store_type: StoreType,
1016
1017
    /// Retry configuration to use when a network request fails.
1018
    #[serde(default)]
1019
    pub retry: Retry,
1020
1021
    /// Limit the number of simultaneous upstream requests to this many.  A
1022
    /// value of zero is treated as unlimited.  If the limit is reached the
1023
    /// request is queued.
1024
    #[serde(default)]
1025
    pub max_concurrent_requests: usize,
1026
1027
    /// The number of connections to make to each specified endpoint to balance
1028
    /// the load over multiple TCP connections.  Default 1.
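    ///
    /// Example (an illustrative sketch of these two tuning fields only):
    /// ```json
    /// "max_concurrent_requests": 256,
    /// "connections_per_endpoint": 4
    /// ```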
1029
    #[serde(default)]
1030
    pub connections_per_endpoint: usize,
1031
}
1032
1033
/// The possible error codes that might occur on an upstream request.
1034
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
1035
pub enum ErrorCode {
1036
    Cancelled = 1,
1037
    Unknown = 2,
1038
    InvalidArgument = 3,
1039
    DeadlineExceeded = 4,
1040
    NotFound = 5,
1041
    AlreadyExists = 6,
1042
    PermissionDenied = 7,
1043
    ResourceExhausted = 8,
1044
    FailedPrecondition = 9,
1045
    Aborted = 10,
1046
    OutOfRange = 11,
1047
    Unimplemented = 12,
1048
    Internal = 13,
1049
    Unavailable = 14,
1050
    DataLoss = 15,
1051
    Unauthenticated = 16,
1052
    // Note: This list is duplicated from nativelink-error/lib.rs.
1053
}
1054
1055
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
1056
pub struct RedisSpec {
1057
    /// The addresses of the Redis server(s).
1058
    /// Ex: `["redis://username:password@redis-server-url:6380/99"]`
1059
    /// 99 represents the database ID, 6380 represents the port.
1060
    #[serde(deserialize_with = "convert_vec_string_with_shellexpand")]
1061
    pub addresses: Vec<String>,
1062
1063
    /// The response timeout for the Redis connection in seconds.
1064
    ///
1065
    /// Default: 10
1066
    #[serde(default)]
1067
    pub response_timeout_s: u64,
1068
1069
    /// The connection timeout for the Redis connection in seconds.
1070
    ///
1071
    /// Default: 10
1072
    #[serde(default)]
1073
    pub connection_timeout_s: u64,
1074
1075
    /// An optional and experimental Redis channel to publish write events to.
1076
    ///
1077
    /// If set, every time a write operation is made to a Redis node
1078
    /// then an event will be published to a Redis channel with the given name.
1079
    /// If unset, the writes will still be made,
1080
    /// but the write events will not be published.
1081
    ///
1082
    /// Default: (Empty String / No Channel)
1083
    #[serde(default)]
1084
    pub experimental_pub_sub_channel: Option<String>,
1085
1086
    /// An optional prefix to prepend to all keys in this store.
1087
    ///
1088
    /// Setting this value can make it convenient to query or
1089
    /// organize your data according to the shared prefix.
1090
    ///
1091
    /// Default: (Empty String / No Prefix)
1092
    #[serde(default)]
1093
    pub key_prefix: String,
1094
1095
    /// Set the mode Redis is operating in.
1096
    ///
1097
    /// Available options are "cluster" for
1098
    /// [cluster mode](https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/),
1099
    /// "sentinel" for [sentinel mode](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/),
1100
    /// or "standard" if Redis is operating in neither cluster nor sentinel mode.
1101
    ///
1102
    /// Default: standard
1103
    #[serde(default)]
1104
    pub mode: RedisMode,
1105
1106
    /// When using pubsub interface, this is the maximum number of items to keep
1107
    /// queued up before dropping old items.
1108
    ///
1109
    /// Default: 4096
1110
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1111
    pub broadcast_channel_capacity: usize,
1112
1113
    /// The amount of time in milliseconds until the redis store considers the
1114
    /// command to be timed out. This will trigger a retry of the command and
1115
    /// potentially a reconnection to the redis server.
1116
    ///
1117
    /// Default: 10000 (10 seconds)
1118
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1119
    pub command_timeout_ms: u64,
1120
1121
    /// The amount of time in milliseconds until the redis store considers the
1122
    /// connection to be unresponsive. This will trigger a reconnection to the
1123
    /// redis server.
1124
    ///
1125
    /// Default: 3000 (3 seconds)
1126
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1127
    pub connection_timeout_ms: u64,
1128
1129
    /// The amount of data to read from the redis server at a time.
1130
    /// This is used to limit the amount of memory used when reading
1131
    /// large objects from the redis server as well as limiting the
1132
    /// amount of time a single read operation can take.
1133
    ///
1134
    /// IMPORTANT: If this value is too high, the `command_timeout_ms`
1135
    /// might be triggered if the latency or throughput to the redis
1136
    /// server is too low.
1137
    ///
1138
    /// Default: 64KiB
1139
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1140
    pub read_chunk_size: usize,
1141
1142
    /// The number of connections to keep open to the redis server(s).
1143
    ///
1144
    /// Default: 3
1145
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1146
    pub connection_pool_size: usize,
1147
1148
    /// The maximum number of upload chunks to allow per update.
1149
    /// This is used to limit the amount of memory used when uploading
1150
    /// large objects to the redis server. A good rule of thumb is to
1151
    /// think of the data as:
1152
    /// `AVAIL_MEMORY / (read_chunk_size * max_chunk_uploads_per_update) = THEORETICAL_MAX_CONCURRENT_UPLOADS`
1153
    /// (note: it is a good idea to divide `AVAIL_MEMORY` by ~10 to account for other memory usage)
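    ///
    /// Worked example (illustrative numbers only): with `read_chunk_size` of
    /// 64KiB and `max_chunk_uploads_per_update` of 10, each in-flight update
    /// may buffer up to ~640KiB, so ~1GiB of usable memory supports on the
    /// order of 1,600 concurrent uploads.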
1154
    ///
1155
    /// Default: 10
1156
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1157
    pub max_chunk_uploads_per_update: usize,
1158
1159
    /// The COUNT value passed when scanning keys in Redis.
1160
    /// This is used to hint the amount of work that should be done per response.
1161
    ///
1162
    /// Default: 10000
1163
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1164
    pub scan_count: u32,
1165
1166
    /// Retry configuration to use when a network request fails.
1167
    /// See the `Retry` struct for more information.
1168
    ///
1169
    /// ```txt
1170
    /// Default: Retry {
1171
    ///   max_retries: 0, /* unlimited */
1172
    ///   delay: 0.1, /* 100ms */
1173
    ///   jitter: 0.5, /* 50% */
1174
    ///   retry_on_errors: None, /* not used in redis store */
1175
    /// }
1176
    /// ```
1177
    #[serde(default)]
1178
    pub retry: Retry,
1179
}
1180
1181
#[derive(Debug, Default, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
1182
#[serde(rename_all = "snake_case")]
1183
pub enum RedisMode {
1184
    Cluster,
1185
    Sentinel,
1186
    #[default]
1187
    Standard,
1188
}
1189
1190
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
1191
pub struct NoopSpec {}
1192
1193
/// Retry configuration. The retry delay grows exponentially and on each iteration
1194
/// a jitter percentage is applied to the calculated delay. For example:
1195
/// ```haskell
1196
/// Retry{
1197
///   max_retries: 7,
1198
///   delay: 0.1,
1199
///   jitter: 0.5,
1200
/// }
1201
/// ```
1202
/// will result in:
1203
/// Attempt - Delay
1204
/// 1         0ms
1205
/// 2         75ms - 125ms
1206
/// 3         150ms - 250ms
1207
/// 4         300ms - 500ms
1208
/// 5         600ms - 1s
1209
/// 6         1.2s - 2s
1210
/// 7         2.4s - 4s
1211
/// 8         4.8s - 8s
1212
/// Remember that the total delay is additive, meaning the above delays
1213
/// would give a single request a total delay of 9.525s - 15.875s.
1214
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
1215
#[serde(deny_unknown_fields)]
1216
pub struct Retry {
1217
    /// Maximum number of retries until retrying stops.
1218
    /// Setting this to zero will always attempt 1 time, but not retry.
1219
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1220
    pub max_retries: usize,
1221
1222
    /// Delay in seconds for exponential back off.
1223
    #[serde(default)]
1224
    pub delay: f32,
1225
1226
    /// Amount of jitter to add as a percentage in decimal form. This will
1227
    /// change the formula like:
1228
    /// ```haskell
1229
    /// random(
1230
    ///    (2 ^ {attempt_number}) * {delay} * (1 - (jitter / 2)),
1231
    ///    (2 ^ {attempt_number}) * {delay} * (1 + (jitter / 2)),
1232
    /// )
1233
    /// ```
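    ///
    /// For example (using the illustrative numbers above): with `delay = 0.1`
    /// and `jitter = 0.5`, an `attempt_number` of 3 yields a delay drawn from
    /// `random(2^3 * 0.1 * 0.75, 2^3 * 0.1 * 1.25) = random(0.6s, 1.0s)`,
    /// matching the 600ms - 1s row in the table above.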
1234
    #[serde(default)]
1235
    pub jitter: f32,
1236
1237
    /// A list of error codes to retry on, if this is not set then the default
1238
    /// error codes to retry on are used.  These default codes are the most
1239
    /// likely to be non-permanent.
1240
    ///  - `Unknown`
1241
    ///  - `Cancelled`
1242
    ///  - `DeadlineExceeded`
1243
    ///  - `ResourceExhausted`
1244
    ///  - `Aborted`
1245
    ///  - `Internal`
1246
    ///  - `Unavailable`
1247
    ///  - `DataLoss`
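    ///
    /// Example (an illustrative sketch) restricting retries to two codes:
    /// ```json
    /// "retry_on_errors": ["Unavailable", "ResourceExhausted"]
    /// ```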
1248
    #[serde(default)]
1249
    pub retry_on_errors: Option<Vec<ErrorCode>>,
1250
}
1251
1252
/// Configuration for `ExperimentalMongoDB` store.
1253
#[derive(Serialize, Deserialize, Debug, Clone)]
1254
#[serde(deny_unknown_fields)]
1255
pub struct ExperimentalMongoSpec {
1256
    /// `ExperimentalMongoDB` connection string.
1257
    /// Example: <mongodb://localhost:27017> or <mongodb+srv://cluster.mongodb.net>
1258
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
1259
    pub connection_string: String,
1260
1261
    /// The database name to use.
1262
    /// Default: "nativelink"
1263
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
1264
    pub database: String,
1265
1266
    /// The collection name for CAS data.
1267
    /// Default: "cas"
1268
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
1269
    pub cas_collection: String,
1270
1271
    /// The collection name for scheduler data.
1272
    /// Default: "scheduler"
1273
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
1274
    pub scheduler_collection: String,
1275
1276
    /// Prefix to prepend to all keys stored in `MongoDB`.
1277
    /// Default: ""
1278
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
1279
    pub key_prefix: Option<String>,
1280
1281
    /// The maximum amount of data to read from `MongoDB` in a single chunk (in bytes).
1282
    /// Default: 65536 (64KB)
1283
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
1284
    pub read_chunk_size: usize,
1285
1286
    /// Maximum number of concurrent uploads allowed.
1287
    /// Default: 10
1288
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1289
    pub max_concurrent_uploads: usize,
1290
1291
    /// Connection timeout in milliseconds.
1292
    /// Default: 3000
1293
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1294
    pub connection_timeout_ms: u64,
1295
1296
    /// Command timeout in milliseconds.
1297
    /// Default: 10000
1298
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1299
    pub command_timeout_ms: u64,
1300
1301
    /// Enable `MongoDB` change streams for real-time updates.
1302
    /// Required for scheduler subscriptions.
1303
    /// Default: false
1304
    #[serde(default)]
1305
    pub enable_change_streams: bool,
1306
1307
    /// Write concern 'w' parameter.
1308
    /// Can be a number (e.g., 1) or string (e.g., "majority").
1309
    /// Default: None (uses `MongoDB` default)
1310
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
1311
    pub write_concern_w: Option<String>,
1312
1313
    /// Write concern 'j' parameter (journal acknowledgment).
1314
    /// Default: None (uses `MongoDB` default)
1315
    #[serde(default)]
1316
    pub write_concern_j: Option<bool>,
1317
1318
    /// Write concern timeout in milliseconds.
1319
    /// Default: None (uses `MongoDB` default)
1320
    #[serde(
1321
        default,
1322
        deserialize_with = "convert_optional_numeric_with_shellexpand"
1323
    )]
1324
    pub write_concern_timeout_ms: Option<u32>,
1325
}
1326
1327
impl Retry {
1328
9
    pub fn make_jitter_fn(&self) -> Arc<dyn Fn(Duration) -> Duration + Send + Sync> {
1329
9
        if self.jitter == 0f32 {
  Branch (1329:12): [True: 9, False: 0]
  Branch (1329:12): [Folded - Ignored]
1330
9
            Arc::new(move |delay: Duration| delay)
1331
        } else {
1332
0
            let local_jitter = self.jitter;
1333
0
            Arc::new(move |delay: Duration| {
1334
0
                delay.mul_f32(local_jitter.mul_add(rand::rng().random::<f32>() - 0.5, 1.))
1335
0
            })
1336
        }
1337
9
    }
1338
}