Coverage Report

Created: 2025-11-07 13:29

/build/source/nativelink-config/src/stores.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::time::Duration;
16
use std::sync::Arc;
17
18
use rand::Rng;
19
use serde::{Deserialize, Serialize};
20
21
use crate::serde_utils::{
22
    convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
23
    convert_numeric_with_shellexpand, convert_optional_numeric_with_shellexpand,
24
    convert_optional_string_with_shellexpand, convert_string_with_shellexpand,
25
    convert_vec_string_with_shellexpand,
26
};
27
28
/// Name of the store. This type will be used when referencing a store
29
/// in the `CasConfig::stores` map.
30
pub type StoreRefName = String;
31
32
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
33
#[serde(rename_all = "snake_case")]
34
pub enum ConfigDigestHashFunction {
35
    /// Use the sha256 hash function.
36
    /// <https://en.wikipedia.org/wiki/SHA-2>
37
    Sha256,
38
39
    /// Use the blake3 hash function.
40
    /// <https://en.wikipedia.org/wiki/BLAKE_(hash_function)>
41
    Blake3,
42
}
43
44
#[derive(Serialize, Deserialize, Debug, Clone)]
45
#[serde(rename_all = "snake_case")]
46
pub enum StoreSpec {
47
    /// Memory store will store all data in a hashmap in memory.
48
    ///
49
    /// **Example JSON Config:**
50
    /// ```json
51
    /// "memory": {
52
    ///   "eviction_policy": {
53
    ///     "max_bytes": "10mb",
54
    ///   }
55
    /// }
56
    /// ```
57
    ///
58
    Memory(MemorySpec),
59
60
    /// A generic blob store that will store files on the cloud
61
    /// provider. This configuration will never delete files, so you are
62
    /// responsible for purging old files in other ways.
63
    /// It supports the following backends:
64
    ///
65
    /// 1. **Amazon S3:**
66
    ///    S3 store will use Amazon's S3 service as a backend to store
67
    ///    the files. This configuration can be used to share files
68
    ///    across multiple instances. Uses system certificates for TLS
69
    ///    verification via `rustls-platform-verifier`.
70
    ///
71
    ///   **Example JSON Config:**
72
    ///   ```json
73
    ///   "experimental_cloud_object_store": {
74
    ///     "provider": "aws",
75
    ///     "region": "eu-north-1",
76
    ///     "bucket": "crossplane-bucket-af79aeca9",
77
    ///     "key_prefix": "test-prefix-index/",
78
    ///     "retry": {
79
    ///       "max_retries": 6,
80
    ///       "delay": 0.3,
81
    ///       "jitter": 0.5
82
    ///     },
83
    ///     "multipart_max_concurrent_uploads": 10
84
    ///   }
85
    ///   ```
86
    ///
87
    /// 2. **Google Cloud Storage:**
88
    ///    GCS store uses Google's GCS service as a backend to store
89
    ///    the files. This configuration can be used to share files
90
    ///    across multiple instances.
91
    ///
92
    ///   **Example JSON Config:**
93
    ///   ```json
94
    ///   "experimental_cloud_object_store": {
95
    ///     "provider": "gcs",
96
    ///     "bucket": "test-bucket",
97
    ///     "key_prefix": "test-prefix-index/",
98
    ///     "retry": {
99
    ///       "max_retries": 6,
100
    ///       "delay": 0.3,
101
    ///       "jitter": 0.5
102
    ///     },
103
    ///     "multipart_max_concurrent_uploads": 10
104
    ///   }
105
    ///   ```
106
    ///
107
    /// 3. **`NetApp` ONTAP S3**
108
    ///    `NetApp` ONTAP S3 store will use ONTAP's S3-compatible storage as a backend
109
    ///    to store files. This store is specifically configured for ONTAP's S3 requirements
110
    ///    including custom TLS configuration, credentials management, and proper vserver
111
    ///    configuration.
112
    ///
113
    ///    This store uses AWS environment variables for credentials:
114
    ///    - `AWS_ACCESS_KEY_ID`
115
    ///    - `AWS_SECRET_ACCESS_KEY`
116
    ///    - `AWS_DEFAULT_REGION`
117
    ///
118
    ///    **Example JSON Config:**
119
    ///    ```json
120
    ///    "experimental_cloud_object_store": {
121
    ///      "provider": "ontap",
122
    ///      "endpoint": "https://ontap-s3-endpoint:443",
123
    ///      "vserver_name": "your-vserver",
124
    ///      "bucket": "your-bucket",
125
    ///      "root_certificates": "/path/to/certs.pem",  // Optional
126
    ///      "key_prefix": "test-prefix/",               // Optional
127
    ///      "retry": {
128
    ///        "max_retries": 6,
129
    ///        "delay": 0.3,
130
    ///        "jitter": 0.5
131
    ///      },
132
    ///      "multipart_max_concurrent_uploads": 10
133
    ///    }
134
    ///    ```
135
    ExperimentalCloudObjectStore(ExperimentalCloudObjectSpec),
136
137
    /// ONTAP S3 Existence Cache provides a caching layer on top of the ONTAP S3 store
138
    /// to optimize repeated existence checks. It maintains an in-memory cache of object
139
    /// digests and periodically syncs this cache to disk for persistence.
140
    ///
141
    /// The cache helps reduce latency for repeated calls to check object existence,
142
    /// while still ensuring eventual consistency with the underlying ONTAP S3 store.
143
    ///
144
    /// Example JSON Config:
145
    /// ```json
146
    /// "ontap_s3_existence_cache": {
147
    ///   "index_path": "/path/to/cache/index.json",
148
    ///   "sync_interval_seconds": 300,
149
    ///   "backend": {
150
    ///     "endpoint": "https://ontap-s3-endpoint:443",
151
    ///     "vserver_name": "your-vserver",
152
    ///     "bucket": "your-bucket",
153
    ///     "key_prefix": "test-prefix/"
154
    ///   }
155
    /// }
156
    /// ```
157
    ///
158
    OntapS3ExistenceCache(Box<OntapS3ExistenceCacheSpec>),
159
160
    /// Verify store is used to apply verifications to an underlying
161
    /// store implementation. It is strongly encouraged to validate
162
    /// as much data as you can before accepting data from a client;
163
    /// failing to do so may cause the data in the store to be
164
    /// populated with invalid data, causing all kinds of problems.
165
    ///
166
    /// The suggested configuration is to have the CAS validate the
167
    /// hash and size and the AC validate nothing.
168
    ///
169
    /// **Example JSON Config:**
170
    /// ```json
171
    /// "verify": {
172
    ///   "backend": {
173
    ///     "memory": {
174
    ///       "eviction_policy": {
175
    ///         "max_bytes": "500mb"
176
    ///       }
177
    ///     },
178
    ///   },
179
    ///   "verify_size": true,
180
    ///   "verify_hash": true
181
    /// }
182
    /// ```
183
    ///
184
    Verify(Box<VerifySpec>),
185
186
    /// Completeness checking store verifies if the
187
    /// output files & folders exist in the CAS before forwarding
188
    /// the request to the underlying store.
189
    /// Note: This store should only be used on AC stores.
190
    ///
191
    /// **Example JSON Config:**
192
    /// ```json
193
    /// "completeness_checking": {
194
    ///   "backend": {
195
    ///     "filesystem": {
196
    ///       "content_path": "~/.cache/nativelink/content_path-ac",
197
    ///       "temp_path": "~/.cache/nativelink/tmp_path-ac",
198
    ///       "eviction_policy": {
199
    ///         "max_bytes": "500mb",
200
    ///       }
201
    ///     }
202
    ///   },
203
    ///   "cas_store": {
204
    ///     "ref_store": {
205
    ///       "name": "CAS_MAIN_STORE"
206
    ///     }
207
    ///   }
208
    /// }
209
    /// ```
210
    ///
211
    CompletenessChecking(Box<CompletenessCheckingSpec>),
212
213
    /// A compression store that will compress the data inbound and
214
    /// outbound. There will be a non-trivial cost to compress and
215
    /// decompress the data, but in many cases, if the final store
216
    /// requires network transport and/or storage space is a concern,
217
    /// it is often faster and more efficient to place this store
218
    /// in front of those stores.
219
    ///
220
    /// **Example JSON Config:**
221
    /// ```json
222
    /// "compression": {
223
    ///   "compression_algorithm": {
224
    ///     "lz4": {}
225
    ///   },
226
    ///   "backend": {
227
    ///     "filesystem": {
228
    ///       "content_path": "/tmp/nativelink/data/content_path-cas",
229
    ///       "temp_path": "/tmp/nativelink/data/tmp_path-cas",
230
    ///       "eviction_policy": {
231
    ///         "max_bytes": "2gb",
232
    ///       }
233
    ///     }
234
    ///   }
235
    /// }
236
    /// ```
237
    ///
238
    Compression(Box<CompressionSpec>),
239
240
    /// A dedup store will take the inputs and run a rolling hash
241
    /// algorithm on them to slice the input into smaller parts, then
242
    /// run a sha256 algorithm on each slice and, if the object doesn't
243
    /// already exist, upload the slice to the `content_store` using
244
    /// a new digest of just the slice. Once all parts exist, an
245
    /// Action-Cache-like digest will be built and uploaded to the
246
    /// `index_store` which will contain a reference to each
247
    /// chunk/digest of the uploaded file. Downloading a request will
248
    /// first grab the index from the `index_store`, then download and
249
    /// forward the content of each chunk as if it were one file.
250
    ///
251
    /// This store is exceptionally good when the following conditions
252
    /// are met:
253
    /// * Content is mostly the same (inserts, updates, deletes are ok)
254
    /// * Content is not compressed or encrypted
255
    /// * Uploading or downloading from `content_store` is the bottleneck.
256
    ///
257
    /// Note: This store pairs well when used with `CompressionSpec` as
258
    /// the `content_store`, but never put `DedupSpec` as the backend of
259
    /// `CompressionSpec` as it will negate all the gains.
260
    ///
261
    /// Note: When running `.has()` on this store, it will only check
262
    /// to see if the entry exists in the `index_store` and not check
263
    /// if the individual chunks exist in the `content_store`.
264
    ///
265
    /// **Example JSON Config:**
266
    /// ```json
267
    /// "dedup": {
268
    ///   "index_store": {
269
    ///     "memory": {
270
    ///       "eviction_policy": {
271
    ///          "max_bytes": "1GB",
272
    ///       }
273
    ///     }
274
    ///   },
275
    ///   "content_store": {
276
    ///     "compression": {
277
    ///       "compression_algorithm": {
278
    ///         "lz4": {}
279
    ///       },
280
    ///       "backend": {
281
    ///         "fast_slow": {
282
    ///           "fast": {
283
    ///             "memory": {
284
    ///               "eviction_policy": {
285
    ///                 "max_bytes": "500MB",
286
    ///               }
287
    ///             }
288
    ///           },
289
    ///           "slow": {
290
    ///             "filesystem": {
291
    ///               "content_path": "/tmp/nativelink/data/content_path-content",
292
    ///               "temp_path": "/tmp/nativelink/data/tmp_path-content",
293
    ///               "eviction_policy": {
294
    ///                 "max_bytes": "2gb"
295
    ///               }
296
    ///             }
297
    ///           }
298
    ///         }
299
    ///       }
300
    ///     }
301
    ///   }
302
    /// }
303
    /// ```
304
    ///
305
    Dedup(Box<DedupSpec>),
306
307
    /// Existence store will wrap around another store and cache calls
308
    /// to `.has()` so that subsequent `has_with_results` calls will be
309
    /// faster. This is useful for cases when you have a store that
310
    /// is slow to respond to `.has()` calls.
311
    /// Note: This store should only be used on CAS stores.
312
    ///
313
    /// **Example JSON Config:**
314
    /// ```json
315
    /// "existence_cache": {
316
    ///   "backend": {
317
    ///     "memory": {
318
    ///       "eviction_policy": {
319
    ///         "max_bytes": "500mb",
320
    ///       }
321
    ///     }
322
    ///   },
323
    ///   // Note this is the existence store policy, not the backend policy
324
    ///   "eviction_policy": {
325
    ///     "max_seconds": 100,
326
    ///   }
327
    /// }
328
    /// ```
329
    ///
330
    ExistenceCache(Box<ExistenceCacheSpec>),
331
332
    /// `FastSlow` store will first try to fetch the data from the `fast`
333
    /// store and then if it does not exist try the `slow` store.
334
    /// When the object does exist in the `slow` store, it will copy
335
    /// the data to the `fast` store while returning the data.
336
    /// This store should be thought of as a store that "buffers"
337
    /// the data to the `fast` store.
338
    /// On uploads it will mirror data to both `fast` and `slow` stores.
339
    ///
340
    /// WARNING: If you need data to always exist in the `slow` store
341
    /// for something like remote execution, be careful because this
342
    /// store will never check to see if the objects exist in the
343
    /// `slow` store if they exist in the `fast` store (ie: it assumes
344
    /// that if an object exists in the `fast` store it will exist in
345
    /// the `slow` store).
346
    ///
347
    /// **Example JSON Config:**
348
    /// ```json
349
    /// "fast_slow": {
350
    ///   "fast": {
351
    ///     "filesystem": {
352
    ///       "content_path": "/tmp/nativelink/data/content_path-index",
353
    ///       "temp_path": "/tmp/nativelink/data/tmp_path-index",
354
    ///       "eviction_policy": {
355
    ///         "max_bytes": "500mb",
356
    ///       }
357
    ///     }
358
    ///   },
359
    ///   "slow": {
360
    ///     "filesystem": {
361
    ///       "content_path": "/tmp/nativelink/data/content_path-index",
362
    ///       "temp_path": "/tmp/nativelink/data/tmp_path-index",
363
    ///       "eviction_policy": {
364
    ///         "max_bytes": "500mb",
365
    ///       }
366
    ///     }
367
    ///   }
368
    /// }
369
    /// ```
370
    ///
371
    FastSlow(Box<FastSlowSpec>),
372
373
    /// Shards the data to multiple stores. This is useful for cases
374
    /// when you want to distribute the load across multiple stores.
375
    /// The digest hash is used to determine which store to send the
376
    /// data to.
377
    ///
378
    /// **Example JSON Config:**
379
    /// ```json
380
    /// "shard": {
381
    ///   "stores": [
382
    ///    {
383
    ///     "store": {
384
    ///       "memory": {
385
    ///         "eviction_policy": {
386
    ///             "max_bytes": "10mb"
387
    ///         },
388
    ///       },
389
    ///     },
390
    ///     "weight": 1
391
    ///   }]
392
    /// }
393
    /// ```
394
    ///
395
    Shard(ShardSpec),
396
397
    /// Stores the data on the filesystem. This store is designed for
398
    /// local persistent storage. Restarts of this program should restore
399
    /// the previous state, meaning anything uploaded will be persistent
400
    /// as long as the filesystem integrity holds.
401
    ///
402
    /// **Example JSON Config:**
403
    /// ```json
404
    /// "filesystem": {
405
    ///   "content_path": "/tmp/nativelink/data-worker-test/content_path-cas",
406
    ///   "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-cas",
407
    ///   "eviction_policy": {
408
    ///     "max_bytes": "10gb",
409
    ///   }
410
    /// }
411
    /// ```
412
    ///
413
    Filesystem(FilesystemSpec),
414
415
    /// Store used to reference a store in the root store manager.
416
    /// This is useful for cases when you want to share a store in different
417
    /// nested stores. For example, you may want to share the same memory store
418
    /// used for the action cache, but use a `FastSlowSpec` and have the fast
419
    /// store also share the memory store for efficiency.
420
    ///
421
    /// **Example JSON Config:**
422
    /// ```json
423
    /// "ref_store": {
424
    ///   "name": "FS_CONTENT_STORE"
425
    /// }
426
    /// ```
427
    ///
428
    RefStore(RefSpec),
429
430
    /// Uses the size field of the digest to determine which store to send
431
    /// the data to. This is useful for cases when you'd like to put small objects
432
    /// in one store and large objects in another store. This should only be
433
    /// used if the size field is the real size of the content, in other
434
    /// words, don't use it on AC (Action Cache) stores. Any store where you
435
    /// can safely use `VerifySpec.verify_size = true` (ie: CAS stores) should
436
    /// be safe to use with this store.
437
    ///
438
    /// **Example JSON Config:**
439
    /// ```json
440
    /// "size_partitioning": {
441
    ///   "size": "128mib",
442
    ///   "lower_store": {
443
    ///     "memory": {
444
    ///       "eviction_policy": {
445
    ///         "max_bytes": "${NATIVELINK_CAS_MEMORY_CONTENT_LIMIT:-100mb}"
446
    ///       }
447
    ///     }
448
    ///   },
449
    ///   "upper_store": {
450
    ///     /// This store discards data larger than 128mib.
451
    ///     "noop": {}
452
    ///   }
453
    /// }
454
    /// ```
455
    ///
456
    SizePartitioning(Box<SizePartitioningSpec>),
457
458
    /// This store will pass through calls to another GRPC store. This store
459
    /// is not designed to be used as a sub-store of another store, but it
460
    /// does satisfy the interface and will likely work.
461
    ///
462
    /// One major GOTCHA is that some stores use a special function on this
463
    /// store to get the size of the underlying object, which is only reliable
464
    /// when this store is serving a CAS store, not an AC store. If using
465
    /// this store directly, without it being a child of any other store,
466
    /// there are no side effects and it is the most efficient way to use it.
467
    ///
468
    /// **Example JSON Config:**
469
    /// ```json
470
    /// "grpc": {
471
    ///   "instance_name": "main",
472
    ///   "endpoints": [
473
    ///     {"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
474
    ///   ],
475
    ///   "store_type": "ac"
476
    /// }
477
    /// ```
478
    ///
479
    Grpc(GrpcSpec),
480
481
    /// Stores data in any store compatible with Redis APIs.
482
    ///
483
    /// Pairs well with `SizePartitioning` and/or `FastSlow` stores.
484
    /// Ideal for small objects, as most Redis-compatible services
485
    /// have a maximum upload size of between 256MB and 512MB.
486
    ///
487
    /// **Example JSON Config:**
488
    /// ```json
489
    /// "redis_store": {
490
    ///   "addresses": [
491
    ///     "redis://127.0.0.1:6379/",
492
    ///   ]
493
    /// }
494
    /// ```
495
    ///
496
    RedisStore(RedisSpec),
497
498
    /// Noop store is a store that sends streams into the void and all data
499
    /// retrieval will return 404 (`NotFound`). This can be useful for cases
500
    /// where you may need to partition your data and part of your data needs
501
    /// to be discarded.
502
    ///
503
    /// **Example JSON Config:**
504
    /// ```json
505
    /// "noop": {}
506
    /// ```
507
    ///
508
    Noop(NoopSpec),
509
510
    /// Experimental `MongoDB` store implementation.
511
    ///
512
    /// This store uses `MongoDB` as a backend for storing data. It supports
513
    /// both CAS (Content Addressable Storage) and scheduler data with
514
    /// optional change streams for real-time updates.
515
    ///
516
    /// **Example JSON Config:**
517
    /// ```json
518
    /// "experimental_mongo": {
519
    ///     "connection_string": "mongodb://localhost:27017",
520
    ///     "database": "nativelink",
521
    ///     "cas_collection": "cas",
522
    ///     "key_prefix": "cas:",
523
    ///     "read_chunk_size": 65536,
524
    ///     "max_concurrent_uploads": 10,
525
    ///     "enable_change_streams": false
526
    /// }
527
    /// ```
528
    ///
529
    ExperimentalMongo(ExperimentalMongoSpec),
530
}
531
532
/// Configuration for an individual shard of the store.
533
#[derive(Serialize, Deserialize, Debug, Clone)]
534
#[serde(deny_unknown_fields)]
535
pub struct ShardConfig {
536
    /// Store to shard the data to.
537
    pub store: StoreSpec,
538
539
    /// The weight of the store. This is used to determine how much data
540
    /// should be sent to the store. The actual fraction is the individual
541
    /// store's weight divided by the sum of all the stores' weights.
542
    ///
543
    /// Default: 1
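    ///
    /// For example (a hypothetical two-shard setup): weights of 1 and 3
    /// would send roughly 1/(1+3) = 25% of objects to the first store and
    /// 3/(1+3) = 75% to the second.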
544
    pub weight: Option<u32>,
545
}
546
547
#[derive(Serialize, Deserialize, Debug, Clone)]
548
#[serde(deny_unknown_fields)]
549
pub struct ShardSpec {
550
    /// Stores to shard the data to.
551
    pub stores: Vec<ShardConfig>,
552
}
553
554
#[derive(Serialize, Deserialize, Debug, Clone)]
555
#[serde(deny_unknown_fields)]
556
pub struct SizePartitioningSpec {
557
    /// Size to partition the data on.
558
    #[serde(deserialize_with = "convert_data_size_with_shellexpand")]
559
    pub size: u64,
560
561
    /// Store to send data when object is < (less than) size.
562
    pub lower_store: StoreSpec,
563
564
    /// Store to send data when object is >= (greater than or equal to) size.
565
    pub upper_store: StoreSpec,
566
}
567
568
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
569
#[serde(deny_unknown_fields)]
570
pub struct RefSpec {
571
    /// Name of the store under the root "stores" config object.
572
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
573
    pub name: String,
574
}
575
576
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
577
#[serde(deny_unknown_fields)]
578
pub struct FilesystemSpec {
579
    /// Path on the system where to store the actual content. This is where
580
    /// the bulk of the data will be placed.
581
    /// On service bootup this folder will be scanned and all files will be
582
    /// added to the cache. In the event one of the files doesn't match the
583
    /// criteria, the file will be deleted.
584
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
585
    pub content_path: String,
586
587
    /// A temporary location of where files that are being uploaded or
588
    /// deleted will be placed while the content cannot be guaranteed to be
589
    /// accurate. This location must be on the same block device as
590
    /// `content_path` so atomic moves can happen (ie: move without copy).
591
    /// All files in this folder will be deleted on every startup.
592
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
593
    pub temp_path: String,
594
595
    /// Buffer size to use when reading files. Generally this should be left
596
    /// to the default value except for testing.
597
    /// Default: 32k.
598
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
599
    pub read_buffer_size: u32,
600
601
    /// Policy used to evict items out of the store. Failure to set this
602
    /// value will cause items to never be removed from the store causing
603
    /// infinite memory usage.
604
    pub eviction_policy: Option<EvictionPolicy>,
605
606
    /// The block size of the filesystem of the running machine. This
607
    /// value is used to determine an entry's actual size consumed on disk.
608
    /// For a filesystem with a 4KB block size, a 1B file actually consumes 4KB.
609
    /// Default: 4096
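    ///
    /// For example (hypothetical numbers): with the default block size of
    /// 4096, a 6000-byte entry is accounted as 8192 bytes (two blocks).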
610
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
611
    pub block_size: u64,
612
}
613
614
// NetApp ONTAP S3 Spec
615
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
616
#[serde(deny_unknown_fields)]
617
pub struct ExperimentalOntapS3Spec {
618
    /// The ONTAP S3 endpoint URL (e.g. `https://ontap-s3-endpoint:443`).
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
619
    pub endpoint: String,
620
    /// Name of the ONTAP vserver that owns the bucket.
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
621
    pub vserver_name: String,
622
    /// Bucket name to use as the backend.
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
623
    pub bucket: String,
624
    /// Optional path to a PEM file of root certificates to use for TLS
    /// verification of the endpoint.
    #[serde(default)]
625
    pub root_certificates: Option<String>,
626
627
    /// Common retry and upload configuration
628
    #[serde(flatten)]
629
    pub common: CommonObjectSpec,
630
}
631
632
#[derive(Serialize, Deserialize, Debug, Clone)]
633
#[serde(deny_unknown_fields)]
634
pub struct OntapS3ExistenceCacheSpec {
635
    /// Path on disk where the existence-cache index is persisted.
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
636
    pub index_path: String,
637
    /// How often, in seconds, the in-memory cache is synced to disk.
    #[serde(deserialize_with = "convert_numeric_with_shellexpand")]
638
    pub sync_interval_seconds: u32,
639
    /// The underlying ONTAP S3 store whose existence checks are cached.
    pub backend: Box<ExperimentalOntapS3Spec>,
640
}
641
642
#[derive(Serialize, Deserialize, Default, Debug, Clone, Copy, PartialEq)]
643
#[serde(rename_all = "snake_case")]
644
pub enum StoreDirection {
645
    /// The store operates normally and all get and put operations are
646
    /// handled by it.
647
    #[default]
648
    Both,
649
    /// Update operations will cause persistence to this store, but Get
650
    /// operations will be ignored.
651
    /// This only makes sense on the fast store as the slow store will
652
    /// never get written to on Get anyway.
653
    Update,
654
    /// Get operations will cause persistence to this store, but Update
655
    /// operations will be ignored.
656
    Get,
657
    /// Operate as a read-only store; this only really makes sense if there's
658
    /// another way to write to it.
659
    ReadOnly,
660
}
661
662
#[derive(Serialize, Deserialize, Debug, Clone)]
663
#[serde(deny_unknown_fields)]
664
pub struct FastSlowSpec {
665
    /// Fast store that will be attempted first, before reaching
666
    /// out to the `slow` store.
667
    pub fast: StoreSpec,
668
669
    /// How to handle the fast store.  This can be useful to set to Get for
670
    /// worker nodes such that results are persisted to the slow store only.
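    ///
    /// For example (a hypothetical worker setup): setting
    /// `"fast_direction": "get"` means uploads skip the fast store and are
    /// persisted only to the slow store, while downloads still populate it.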
671
    #[serde(default)]
672
    pub fast_direction: StoreDirection,
673
674
    /// If the object does not exist in the `fast` store it will try to
675
    /// get it from this store.
676
    pub slow: StoreSpec,
677
678
    /// How to handle the slow store.  This can be useful if creating a diode
679
    /// and you wish to have an upstream read only store.
680
    #[serde(default)]
681
    pub slow_direction: StoreDirection,
682
}
683
684
#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
685
#[serde(deny_unknown_fields)]
686
pub struct MemorySpec {
687
    /// Policy used to evict items out of the store. Failure to set this
688
    /// value will cause items to never be removed from the store causing
689
    /// infinite memory usage.
690
    pub eviction_policy: Option<EvictionPolicy>,
691
}
692
693
#[derive(Serialize, Deserialize, Debug, Clone)]
694
#[serde(deny_unknown_fields)]
695
pub struct DedupSpec {
696
    /// Store used to store the index of each dedup slice. This store
697
    /// should generally be fast and small.
698
    pub index_store: StoreSpec,
699
700
    /// The store where the individual chunks will be uploaded. This
701
    /// store should generally be the slower & larger store.
702
    pub content_store: StoreSpec,
703
704
    /// Minimum size that a chunk will be when slicing up the content.
705
    /// Note: Increasing this setting can improve performance, because
706
    /// this number of bytes is not checked when deciding where to
707
    /// partition the data.
708
    ///
709
    /// Default: 65536 (64k)
710
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
711
    pub min_size: u32,
712
713
    /// A best-effort attempt will be made to keep the average size
714
    /// of the chunks close to this number. It is not a guarantee, but
715
    /// a best effort will be made.
716
    ///
717
    /// This value is also roughly the threshold used to determine
718
    /// if we should even attempt to dedup the entry or just forward
719
    /// it directly to the `content_store` without an index. The actual
720
    /// value will be about `normal_size * 1.3` due to implementation
721
    /// details.
722
    ///
723
    /// Default: 262144 (256k)
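    ///
    /// For example (hypothetical numbers): with the default of 262144,
    /// entries around or below 262144 * 1.3 ≈ 340k bytes may be forwarded
    /// directly to the `content_store` without an index.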
724
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
725
    pub normal_size: u32,
726
727
    /// Maximum size a chunk is allowed to be.
728
    ///
729
    /// Default: 524288 (512k)
730
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
731
    pub max_size: u32,
732
733
    /// Due to implementation details, we prefer to download
734
    /// the first chunks of the file so we can stream the content
735
    /// out and free up some of our buffers. This configuration
736
    /// will be used to restrict the number of concurrent chunk
737
    /// downloads at a time per `get()` request.
738
    ///
739
    /// This setting will also affect how much memory might be used
740
    /// per `get()` request. Estimated worst case memory per `get()`
741
    /// request is: `max_concurrent_fetch_per_get * max_size`.
742
    ///
743
    /// Default: 10
744
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
745
    pub max_concurrent_fetch_per_get: u32,
746
}
747
748
#[derive(Serialize, Deserialize, Debug, Clone)]
749
#[serde(deny_unknown_fields)]
750
pub struct ExistenceCacheSpec {
751
    /// The underlying store to wrap around. All content will first flow
752
    /// through self before forwarding to backend. In the event there
753
    /// is an error detected in self, the connection to the backend
754
    /// will be terminated, and early termination should always cause
755
    /// updates to fail on the backend.
756
    pub backend: StoreSpec,
757
758
    /// Policy used to evict items out of the store. Failure to set this
759
    /// value will cause items to never be removed from the store causing
760
    /// infinite memory usage.
761
    pub eviction_policy: Option<EvictionPolicy>,
762
}
763
764
#[derive(Serialize, Deserialize, Debug, Clone)]
765
#[serde(deny_unknown_fields)]
766
pub struct VerifySpec {
767
    /// The underlying store to wrap around. All content will first flow
768
    /// through self before forwarding to backend. In the event there
769
    /// is an error detected in self, the connection to the backend
770
    /// will be terminated, and early termination should always cause
771
    /// updates to fail on the backend.
772
    pub backend: StoreSpec,
773
774
    /// If set the store will verify the size of the data before accepting
775
    /// an upload of data.
776
    ///
777
    /// This should be set to false for AC, but true for CAS stores.
778
    #[serde(default)]
779
    pub verify_size: bool,
780
781
    /// If set, the data will be hashed and verified that the key matches the
782
    /// computed hash. The hash function is automatically determined based on
783
    /// the request and, if not specified, the global default is used.
784
    ///
785
    /// This should be set to false for AC, but true for CAS stores.
786
    #[serde(default)]
787
    pub verify_hash: bool,
788
}
789
790
#[derive(Serialize, Deserialize, Debug, Clone)]
791
#[serde(deny_unknown_fields)]
792
pub struct CompletenessCheckingSpec {
793
    /// The underlying store that will have its results validated before sending to the client.
794
    pub backend: StoreSpec,
795
796
    /// When a request is made, the results are decoded and all output digests/files are verified
797
    /// to exist in this CAS store before returning success.
798
    pub cas_store: StoreSpec,
799
}
800
801
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq, Clone, Copy)]
802
#[serde(deny_unknown_fields)]
803
pub struct Lz4Config {
804
    /// Size of the blocks to compress.
805
    /// Higher values require more ram, but might yield slightly better
806
    /// compression ratios.
807
    ///
808
    /// Default: 65536 (64k).
809
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
810
    pub block_size: u32,
811
812
    /// Maximum size allowed to attempt to deserialize data into.
813
    /// This is needed because the `block_size` is embedded into the data
814
    /// so if there was a bad actor, they could upload an extremely large
815
    /// `block_size`'ed entry and we'd allocate a large amount of memory
816
    /// when retrieving the data. To prevent this from happening, we
817
    /// allow you to specify the maximum that we'll attempt to deserialize.
818
    ///
819
    /// Default: value in `block_size`.
820
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
821
    pub max_decode_block_size: u32,
822
}
823
824
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)]
825
#[serde(rename_all = "snake_case")]
826
pub enum CompressionAlgorithm {
827
    /// LZ4 compression algorithm is extremely fast for compression and
828
    /// decompression, however it does not achieve a very high compression
829
    /// ratio. In most cases build artifacts are highly compressible, however
830
    /// lz4 is quite good at aborting early if the data is not deemed very
831
    /// compressible.
832
    ///
833
    /// see: <https://lz4.github.io/lz4/>
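    ///
    /// A hypothetical example overriding the defaults (values for
    /// illustration only):
    /// ```json
    /// "compression_algorithm": {
    ///   "lz4": {
    ///     "block_size": "64kb",
    ///     "max_decode_block_size": "128kb"
    ///   }
    /// }
    /// ```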
834
    Lz4(Lz4Config),
835
}
836
837
#[derive(Serialize, Deserialize, Debug, Clone)]
838
#[serde(deny_unknown_fields)]
839
pub struct CompressionSpec {
840
    /// The underlying store to wrap around. All content will first flow
841
    /// through self before forwarding to backend. In the event there
842
    /// is an error detected in self, the connection to the backend
843
    /// will be terminated, and early termination should always cause
844
    /// updates to fail on the backend.
845
    pub backend: StoreSpec,
846
847
    /// The compression algorithm to use.
848
    pub compression_algorithm: CompressionAlgorithm,
849
}
850
851
/// Eviction policy always works on LRU (Least Recently Used). Any time an entry
852
/// is touched it updates the timestamp. Inserts and updates will execute the
853
/// eviction policy, removing any expired entries and/or the oldest entries
854
/// until the store size becomes smaller than `max_bytes`.
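///
/// A minimal example (hypothetical values, for illustration only):
/// ```json
/// "eviction_policy": {
///   "max_bytes": "2gb",
///   "evict_bytes": "200mb",
///   "max_seconds": 86400
/// }
/// ```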
855
#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
856
#[serde(deny_unknown_fields)]
857
pub struct EvictionPolicy {
858
    /// Maximum number of bytes before eviction takes place.
859
    /// Default: 0. Zero means never evict based on size.
860
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
861
    pub max_bytes: usize,
862
863
    /// When eviction starts based on hitting `max_bytes`, continue until
864
    /// `max_bytes - evict_bytes` is met to create a low watermark.  This stops
865
    /// operations from thrashing when the store is close to the limit.
866
    /// Default: 0
867
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
868
    pub evict_bytes: usize,
869
870
    /// Maximum number of seconds for an entry to live since it was last
871
    /// accessed before it is evicted.
872
    /// Default: 0. Zero means never evict based on time.
873
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
874
    pub max_seconds: u32,
875
876
    /// Maximum number of entries in the store before an eviction takes place.
877
    /// Default: 0. Zero means never evict based on count.
878
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
879
    pub max_count: u64,
880
}
881
882
#[derive(Serialize, Deserialize, Debug, Clone)]
883
#[serde(tag = "provider", rename_all = "snake_case")]
884
pub enum ExperimentalCloudObjectSpec {
885
    Aws(ExperimentalAwsSpec),
886
    Gcs(ExperimentalGcsSpec),
887
    Ontap(ExperimentalOntapS3Spec),
888
}
889
890
impl Default for ExperimentalCloudObjectSpec {
891
0
    fn default() -> Self {
892
0
        Self::Aws(ExperimentalAwsSpec::default())
893
0
    }
894
}
895
896
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
897
#[serde(deny_unknown_fields)]
898
pub struct ExperimentalAwsSpec {
899
    /// S3 region. Usually us-east-1, us-west-2, af-south-1, etc.
900
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
901
    pub region: String,
902
903
    /// Bucket name to use as the backend.
904
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
905
    pub bucket: String,
906
907
    /// Common retry and upload configuration
908
    #[serde(flatten)]
909
    pub common: CommonObjectSpec,
910
}
911
912
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
913
#[serde(deny_unknown_fields)]
914
pub struct ExperimentalGcsSpec {
915
    /// Bucket name to use as the backend.
916
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
917
    pub bucket: String,
918
919
    /// Chunk size for resumable uploads.
920
    ///
921
    /// Default: 2MB
922
    pub resumable_chunk_size: Option<usize>,
923
924
    /// Common retry and upload configuration
925
    #[serde(flatten)]
926
    pub common: CommonObjectSpec,
927
928
    /// Error if authentication was not found.
929
    #[serde(default)]
930
    pub authentication_required: bool,
931
932
    /// Connection timeout in milliseconds.
933
    /// Default: 3000
934
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
935
    pub connection_timeout_s: u64,
936
937
    /// Read timeout in milliseconds.
938
    /// Default: 3000
939
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
940
    pub read_timeout_s: u64,
941
}
942
943
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
944
pub struct CommonObjectSpec {
945
    /// Optional prefix to prepend to keys in the bucket. If None, no prefix will be used.
946
    #[serde(default)]
947
    pub key_prefix: Option<String>,
948
949
    /// Retry configuration to use when a network request fails.
950
    #[serde(default)]
951
    pub retry: Retry,
952
953
    /// If the number of seconds since the `last_modified` time of the object
954
    /// is greater than this value, the object will not be considered
955
    /// "existing". This allows for external tools to delete objects that
956
    /// have not been uploaded in a long time. If a client receives a `NotFound`
957
    /// the client should re-upload the object.
958
    ///
959
    /// There should be a sufficient buffer between the expiration configured
960
    /// in the external tool and this value. Keeping items
961
    /// around for a few days is generally a good idea.
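    ///
    /// For example (hypothetical numbers): if an external lifecycle rule
    /// deletes objects 30 days after upload, a value of 2332800 (27 days)
    /// leaves a three-day buffer.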
962
    ///
963
    /// Default: 0. Zero means never consider an object expired.
964
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
965
    pub consider_expired_after_s: u32,
966
967
    /// The maximum buffer size to retain in case of a retryable error
968
    /// during upload. Setting this to zero will disable upload buffering;
969
    /// this means that in the event of a failure during upload, the entire
970
    /// upload will be aborted and the client will likely receive an error.
971
    ///
972
    /// Default: 5MB.
973
    pub max_retry_buffer_per_request: Option<usize>,
974
975
    /// Maximum number of concurrent `UploadPart` requests per `MultipartUpload`.
976
    ///
977
    /// Default: 10.
978
    pub multipart_max_concurrent_uploads: Option<usize>,
979
980
    /// Allow unencrypted HTTP connections. Only use this for local testing.
981
    ///
982
    /// Default: false
983
    #[serde(default)]
984
    pub insecure_allow_http: bool,
985
986
    /// Disable http/2 connections and only use http/1.1. Default client
987
    /// configuration will have http/1.1 and http/2 enabled for connection
988
    /// schemes. Http/2 should be disabled if environments have poor support
989
    /// or performance related to http/2. Safe to keep default unless
990
    /// underlying network environment, S3, or GCS API servers specify otherwise.
991
    ///
992
    /// Default: false
993
    #[serde(default)]
994
    pub disable_http2: bool,
995
}
996
997
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
998
#[serde(rename_all = "snake_case")]
999
pub enum StoreType {
1000
    /// The store is content addressable storage.
1001
    Cas,
1002
    /// The store is an action cache.
1003
    Ac,
1004
}
1005
1006
#[derive(Serialize, Deserialize, Debug, Clone)]
1007
pub struct ClientTlsConfig {
1008
    /// Path to the certificate authority to use to validate the remote.
1009
    ///
1010
    /// Default: None
1011
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
1012
    pub ca_file: Option<String>,
1013
1014
    /// Path to the certificate file for client authentication.
1015
    ///
1016
    /// Default: None
1017
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
1018
    pub cert_file: Option<String>,
1019
1020
    /// Path to the private key file for client authentication.
1021
    ///
1022
    /// Default: None
1023
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
1024
    pub key_file: Option<String>,
1025
1026
    /// If set the client will use the native roots for TLS connections.
1027
    ///
1028
    /// Default: false
1029
    #[serde(default)]
1030
    pub use_native_roots: Option<bool>,
1031
}
1032
1033
#[derive(Serialize, Deserialize, Debug, Clone)]
1034
#[serde(deny_unknown_fields)]
1035
pub struct GrpcEndpoint {
1036
    /// The endpoint address (e.g. grpc(s)://example.com:443).
1037
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
1038
    pub address: String,
1039
    /// The TLS configuration to use to connect to the endpoint (if grpcs).
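    ///
    /// A hypothetical example (paths for illustration only):
    /// ```json
    /// "tls_config": {
    ///   "ca_file": "/path/to/ca.pem",
    ///   "cert_file": "/path/to/client.pem",
    ///   "key_file": "/path/to/client.key"
    /// }
    /// ```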
1040
    pub tls_config: Option<ClientTlsConfig>,
1041
    /// The maximum concurrency to allow on this endpoint.
1042
    pub concurrency_limit: Option<usize>,
1043
}
1044
1045
#[derive(Serialize, Deserialize, Debug, Clone)]
1046
#[serde(deny_unknown_fields)]
1047
pub struct GrpcSpec {
1048
    /// Instance name for GRPC calls. Proxy calls will have the `instance_name` changed to this.
1049
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
1050
    pub instance_name: String,
1051
1052
    /// The endpoint of the grpc connection.
1053
    pub endpoints: Vec<GrpcEndpoint>,
1054
1055
    /// The type of the upstream store, this ensures that the correct server calls are made.
1056
    pub store_type: StoreType,
1057
1058
    /// Retry configuration to use when a network request fails.
1059
    #[serde(default)]
1060
    pub retry: Retry,
1061
1062
    /// Limit the number of simultaneous upstream requests to this many.  A
1063
    /// value of zero is treated as unlimited.  If the limit is reached the
1064
    /// request is queued.
1065
    #[serde(default)]
1066
    pub max_concurrent_requests: usize,
1067
1068
    /// The number of connections to make to each specified endpoint to balance
1069
    /// the load over multiple TCP connections.  Default 1.
1070
    #[serde(default)]
1071
    pub connections_per_endpoint: usize,
1072
}
1073
1074
/// The possible error codes that might occur on an upstream request.
1075
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
1076
pub enum ErrorCode {
1077
    Cancelled = 1,
1078
    Unknown = 2,
1079
    InvalidArgument = 3,
1080
    DeadlineExceeded = 4,
1081
    NotFound = 5,
1082
    AlreadyExists = 6,
1083
    PermissionDenied = 7,
1084
    ResourceExhausted = 8,
1085
    FailedPrecondition = 9,
1086
    Aborted = 10,
1087
    OutOfRange = 11,
1088
    Unimplemented = 12,
1089
    Internal = 13,
1090
    Unavailable = 14,
1091
    DataLoss = 15,
1092
    Unauthenticated = 16,
1093
    // Note: This list is duplicated from nativelink-error/lib.rs.
1094
}
1095
1096
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
1097
pub struct RedisSpec {
1098
    /// The hostname or IP address of the Redis server.
1099
    /// Ex: `["redis://username:password@redis-server-url:6380/99"]`
1100
    /// 99 represents the database ID, 6380 represents the port.
1101
    #[serde(deserialize_with = "convert_vec_string_with_shellexpand")]
1102
    pub addresses: Vec<String>,
1103
1104
    /// The response timeout for the Redis connection in seconds.
1105
    ///
1106
    /// Default: 10
1107
    #[serde(default)]
1108
    pub response_timeout_s: u64,
1109
1110
    /// The connection timeout for the Redis connection in seconds.
1111
    ///
1112
    /// Default: 10
1113
    #[serde(default)]
1114
    pub connection_timeout_s: u64,
1115
1116
    /// An optional and experimental Redis channel to publish write events to.
1117
    ///
1118
    /// If set, every time a write operation is made to a Redis node
1119
    /// then an event will be published to a Redis channel with the given name.
1120
    /// If unset, the writes will still be made,
1121
    /// but the write events will not be published.
1122
    ///
1123
    /// Default: (Empty String / No Channel)
1124
    #[serde(default)]
1125
    pub experimental_pub_sub_channel: Option<String>,
1126
1127
    /// An optional prefix to prepend to all keys in this store.
1128
    ///
1129
    /// Setting this value can make it convenient to query or
1130
    /// organize your data according to the shared prefix.
1131
    ///
1132
    /// Default: (Empty String / No Prefix)
1133
    #[serde(default)]
1134
    pub key_prefix: String,
1135
1136
    /// Set the mode Redis is operating in.
1137
    ///
1138
    /// Available options are "cluster" for
1139
    /// [cluster mode](https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/),
1140
    /// "sentinel" for [sentinel mode](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/),
1141
    /// or "standard" if Redis is operating in neither cluster nor sentinel mode.
1142
    ///
1143
    /// Default: standard
1144
    #[serde(default)]
1145
    pub mode: RedisMode,
1146
1147
    /// When using the pubsub interface, this is the maximum number of items to keep
1148
    /// queued up before dropping old items.
1149
    ///
1150
    /// Default: 4096
1151
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1152
    pub broadcast_channel_capacity: usize,
1153
1154
    /// The amount of time in milliseconds until the redis store considers the
1155
    /// command to be timed out. This will trigger a retry of the command and
1156
    /// potentially a reconnection to the redis server.
1157
    ///
1158
    /// Default: 10000 (10 seconds)
1159
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1160
    pub command_timeout_ms: u64,
1161
1162
    /// The amount of time in milliseconds until the redis store considers the
1163
    /// connection to be unresponsive. This will trigger a reconnection to the
1164
    /// redis server.
1165
    ///
1166
    /// Default: 3000 (3 seconds)
1167
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1168
    pub connection_timeout_ms: u64,
1169
1170
    /// The amount of data to read from the redis server at a time.
1171
    /// This is used to limit the amount of memory used when reading
1172
    /// large objects from the redis server as well as limiting the
1173
    /// amount of time a single read operation can take.
1174
    ///
1175
    /// IMPORTANT: If this value is too high, the `command_timeout_ms`
1176
    /// might be triggered if the latency or throughput to the redis
1177
    /// server is too low.
1178
    ///
1179
    /// Default: 64KiB
1180
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1181
    pub read_chunk_size: usize,
1182
1183
    /// The number of connections to keep open to the redis server(s).
1184
    ///
1185
    /// Default: 3
1186
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1187
    pub connection_pool_size: usize,
1188
1189
    /// The maximum number of upload chunks to allow per update.
1190
    /// This is used to limit the amount of memory used when uploading
1191
    /// large objects to the redis server. A good rule of thumb is to
1192
    /// think of the data as:
1193
    /// `AVAIL_MEMORY / (read_chunk_size * max_chunk_uploads_per_update) = THEORETICAL_MAX_CONCURRENT_UPLOADS`
1194
    /// (note: it is a good idea to divide `AVAIL_MEMORY` by ~10 to account for other memory usage)
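    ///
    /// For example (hypothetical numbers): with `read_chunk_size` = 65536 and
    /// `max_chunk_uploads_per_update` = 10, each update buffers at most ~640KiB,
    /// so ~8GiB of memory supports roughly 8GiB / 640KiB ≈ 13k concurrent
    /// updates in theory (divide by ~10 for a safer estimate).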
1195
    ///
1196
    /// Default: 10
1197
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1198
    pub max_chunk_uploads_per_update: usize,
1199
1200
    /// The COUNT value passed when scanning keys in Redis.
1201
    /// This is used to hint the amount of work that should be done per response.
1202
    ///
1203
    /// Default: 10000
1204
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1205
    pub scan_count: u32,
1206
1207
    /// Retry configuration to use when a network request fails.
1208
    /// See the `Retry` struct for more information.
1209
    ///
1210
    /// ```txt
1211
    /// Default: Retry {
1212
    ///   max_retries: 0, /* unlimited */
1213
    ///   delay: 0.1, /* 100ms */
1214
    ///   jitter: 0.5, /* 50% */
1215
    ///   retry_on_errors: None, /* not used in redis store */
1216
    /// }
1217
    /// ```
1218
    #[serde(default)]
1219
    pub retry: Retry,
1220
}
1221
1222
#[derive(Debug, Default, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
1223
#[serde(rename_all = "snake_case")]
1224
pub enum RedisMode {
1225
    Cluster,
1226
    Sentinel,
1227
    #[default]
1228
    Standard,
1229
}
1230
1231
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
1232
pub struct NoopSpec {}
1233
1234
/// Retry configuration. The delay is exponential and, on each iteration,
1235
/// a jitter percentage is applied to the calculated delay. For example:
1236
/// ```haskell
1237
/// Retry{
1238
///   max_retries: 7,
1239
///   delay: 0.1,
1240
///   jitter: 0.5,
1241
/// }
1242
/// ```
1243
/// will result in:
1244
/// Attempt - Delay
1245
/// 1         0ms
1246
/// 2         75ms - 125ms
1247
/// 3         150ms - 250ms
1248
/// 4         300ms - 500ms
1249
/// 5         600ms - 1s
1250
/// 6         1.2s - 2s
1251
/// 7         2.4s - 4s
1252
/// 8         4.8s - 8s
1253
/// Remember that the delays are additive, meaning that with the above settings
1254
/// a single request could see a total delay of 9.525s - 15.875s.
1255
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
1256
#[serde(deny_unknown_fields)]
1257
pub struct Retry {
1258
    /// Maximum number of retries until retrying stops.
1259
    /// Setting this to zero will always attempt 1 time, but not retry.
1260
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1261
    pub max_retries: usize,
1262
1263
    /// Delay in seconds for exponential back off.
1264
    #[serde(default)]
1265
    pub delay: f32,
1266
1267
    /// Amount of jitter to add as a percentage in decimal form. This will
1268
    /// change the formula like:
1269
    /// ```haskell
1270
    /// random(
1271
    ///    (2 ^ {attempt_number}) * {delay} * (1 - (jitter / 2)),
1272
    ///    (2 ^ {attempt_number}) * {delay} * (1 + (jitter / 2)),
1273
    /// )
1274
    /// ```
1275
    #[serde(default)]
1276
    pub jitter: f32,
1277
1278
    /// A list of error codes to retry on; if this is not set then the default
1279
    /// error codes to retry on are used (see the example after this list).
1280
    /// These default codes are the most likely to be non-permanent:
1281
    ///  - `Unknown`
1282
    ///  - `Cancelled`
1283
    ///  - `DeadlineExceeded`
1284
    ///  - `ResourceExhausted`
1285
    ///  - `Aborted`
1286
    ///  - `Internal`
1287
    ///  - `Unavailable`
1288
    ///  - `DataLoss`
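    ///
    /// A hypothetical example restricting retries to a few transient errors
    /// (variant names are used verbatim, as `ErrorCode` has no serde rename):
    /// ```json
    /// "retry_on_errors": ["Unavailable", "DeadlineExceeded", "ResourceExhausted"]
    /// ```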
1289
    #[serde(default)]
1290
    pub retry_on_errors: Option<Vec<ErrorCode>>,
1291
}
1292
1293
/// Configuration for `ExperimentalMongoDB` store.
1294
#[derive(Serialize, Deserialize, Debug, Clone)]
1295
#[serde(deny_unknown_fields)]
1296
pub struct ExperimentalMongoSpec {
1297
    /// `ExperimentalMongoDB` connection string.
1298
    /// Example: <mongodb://localhost:27017> or <mongodb+srv://cluster.mongodb.net>
1299
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
1300
    pub connection_string: String,
1301
1302
    /// The database name to use.
1303
    /// Default: "nativelink"
1304
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
1305
    pub database: String,
1306
1307
    /// The collection name for CAS data.
1308
    /// Default: "cas"
1309
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
1310
    pub cas_collection: String,
1311
1312
    /// The collection name for scheduler data.
1313
    /// Default: "scheduler"
1314
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
1315
    pub scheduler_collection: String,
1316
1317
    /// Prefix to prepend to all keys stored in `MongoDB`.
1318
    /// Default: ""
1319
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
1320
    pub key_prefix: Option<String>,
1321
1322
    /// The maximum amount of data to read from `MongoDB` in a single chunk (in bytes).
1323
    /// Default: 65536 (64KB)
1324
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
1325
    pub read_chunk_size: usize,
1326
1327
    /// Maximum number of concurrent uploads allowed.
1328
    /// Default: 10
1329
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1330
    pub max_concurrent_uploads: usize,
1331
1332
    /// Connection timeout in milliseconds.
1333
    /// Default: 3000
1334
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1335
    pub connection_timeout_ms: u64,
1336
1337
    /// Command timeout in milliseconds.
1338
    /// Default: 10000
1339
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1340
    pub command_timeout_ms: u64,
1341
1342
    /// Enable `MongoDB` change streams for real-time updates.
1343
    /// Required for scheduler subscriptions.
1344
    /// Default: false
1345
    #[serde(default)]
1346
    pub enable_change_streams: bool,
1347
1348
    /// Write concern 'w' parameter.
1349
    /// Can be a number (e.g., 1) or string (e.g., "majority").
1350
    /// Default: None (uses `MongoDB` default)
1351
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
1352
    pub write_concern_w: Option<String>,
1353
1354
    /// Write concern 'j' parameter (journal acknowledgment).
1355
    /// Default: None (uses `MongoDB` default)
1356
    #[serde(default)]
1357
    pub write_concern_j: Option<bool>,
1358
1359
    /// Write concern timeout in milliseconds.
1360
    /// Default: None (uses `MongoDB` default)
1361
    #[serde(
1362
        default,
1363
        deserialize_with = "convert_optional_numeric_with_shellexpand"
1364
    )]
1365
    pub write_concern_timeout_ms: Option<u32>,
1366
}
1367
1368
impl Retry {
1369
9
    pub fn make_jitter_fn(&self) -> Arc<dyn Fn(Duration) -> Duration + Send + Sync> {
1370
9
        if self.jitter == 0f32 {
  Branch (1370:12): [True: 9, False: 0]
  Branch (1370:12): [Folded - Ignored]
1371
9
            Arc::new(move |delay: Duration| delay)
1372
        } else {
1373
0
            let local_jitter = self.jitter;
1374
0
            Arc::new(move |delay: Duration| {
1375
0
                delay.mul_f32(local_jitter.mul_add(rand::rng().random::<f32>() - 0.5, 1.))
1376
0
            })
1377
        }
1378
9
    }
1379
}