Coverage Report

Created: 2026-03-19 21:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-config/src/stores.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::time::Duration;
16
use std::sync::Arc;
17
18
use rand::Rng;
19
#[cfg(feature = "dev-schema")]
20
use schemars::JsonSchema;
21
use serde::{Deserialize, Serialize};
22
23
use crate::serde_utils::{
24
    convert_boolean_with_shellexpand, convert_data_size_with_shellexpand,
25
    convert_duration_with_shellexpand, convert_numeric_with_shellexpand,
26
    convert_optional_data_size_with_shellexpand, convert_optional_numeric_with_shellexpand,
27
    convert_optional_string_with_shellexpand, convert_string_with_shellexpand,
28
    convert_vec_string_with_shellexpand,
29
};
30
31
/// Name of the store. This type will be used when referencing a store
32
/// in the `CasConfig::stores`'s map key.
33
pub type StoreRefName = String;
34
35
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
36
#[serde(rename_all = "snake_case")]
37
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
38
pub enum ConfigDigestHashFunction {
39
    /// Use the sha256 hash function.
40
    /// <https://en.wikipedia.org/wiki/SHA-2>
41
    Sha256,
42
43
    /// Use the blake3 hash function.
44
    /// <https://en.wikipedia.org/wiki/BLAKE_(hash_function)>
45
    Blake3,
46
}
47
48
#[derive(Serialize, Deserialize, Debug, Clone)]
49
#[serde(rename_all = "snake_case")]
50
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
51
pub enum StoreSpec {
52
    /// Memory store will store all data in a hashmap in memory.
53
    ///
54
    /// **Example JSON Config:**
55
    /// ```json
56
    /// "memory": {
57
    ///   "eviction_policy": {
58
    ///     "max_bytes": "10mb",
59
    ///   }
60
    /// }
61
    /// ```
62
    ///
63
    Memory(MemorySpec),
64
65
    /// A generic blob store that will store files on the cloud
66
    /// provider. This configuration will never delete files, so you are
67
    /// responsible for purging old files in other ways.
68
    /// It supports the following backends:
69
    ///
70
    /// 1. **Amazon S3:**
71
    ///    S3 store will use Amazon's S3 service as a backend to store
72
    ///    the files. This configuration can be used to share files
73
    ///    across multiple instances. Uses system certificates for TLS
74
    ///    verification via `rustls-platform-verifier`.
75
    ///
76
    ///   **Example JSON Config:**
77
    ///   ```json
78
    ///   "experimental_cloud_object_store": {
79
    ///     "provider": "aws",
80
    ///     "region": "eu-north-1",
81
    ///     "bucket": "crossplane-bucket-af79aeca9",
82
    ///     "key_prefix": "test-prefix-index/",
83
    ///     "retry": {
84
    ///       "max_retries": 6,
85
    ///       "delay": 0.3,
86
    ///       "jitter": 0.5
87
    ///     },
88
    ///     "multipart_max_concurrent_uploads": 10
89
    ///   }
90
    ///   ```
91
    ///
92
    /// 2. **Google Cloud Storage:**
93
    ///    GCS store uses Google's GCS service as a backend to store
94
    ///    the files. This configuration can be used to share files
95
    ///    across multiple instances.
96
    ///
97
    ///   **Example JSON Config:**
98
    ///   ```json
99
    ///   "experimental_cloud_object_store": {
100
    ///     "provider": "gcs",
101
    ///     "bucket": "test-bucket",
102
    ///     "key_prefix": "test-prefix-index/",
103
    ///     "retry": {
104
    ///       "max_retries": 6,
105
    ///       "delay": 0.3,
106
    ///       "jitter": 0.5
107
    ///     },
108
    ///     "multipart_max_concurrent_uploads": 10
109
    ///   }
110
    ///   ```
111
    ///
112
    /// 3. **Azure Blob Store:**
113
    ///    Azure Blob store will use Microsoft's Azure Blob service as a
114
    ///    backend to store the files. This configuration can be used to
115
    ///    share files across multiple instances.
116
    ///
117
    ///   **Example JSON Config:**
118
    ///   ```json
119
    ///   "experimental_cloud_object_store": {
120
    ///     "provider": "azure",
121
    ///     "account_name": "cloudshell1393657559",
122
    ///     "container": "simple-test-container",
123
    ///     "key_prefix": "folder/",
124
    ///     "retry": {
125
    ///         "max_retries": 6,
126
    ///         "delay": 0.3,
127
    ///         "jitter": 0.5
128
    ///     },
129
    ///     "multipart_max_concurrent_uploads": 10
130
    ///   }
131
    ///   ```
132
    ///
133
    /// 4. **`NetApp` ONTAP S3**
134
    ///    `NetApp` ONTAP S3 store will use ONTAP's S3-compatible storage as a backend
135
    ///    to store files. This store is specifically configured for ONTAP's S3 requirements
136
    ///    including custom TLS configuration, credentials management, and proper vserver
137
    ///    configuration.
138
    ///
139
    ///    This store uses AWS environment variables for credentials:
140
    ///    - `AWS_ACCESS_KEY_ID`
141
    ///    - `AWS_SECRET_ACCESS_KEY`
142
    ///    - `AWS_DEFAULT_REGION`
143
    ///
144
    ///    **Example JSON Config:**
145
    ///    ```json
146
    ///    "experimental_cloud_object_store": {
147
    ///      "provider": "ontap",
148
    ///      "endpoint": "https://ontap-s3-endpoint:443",
149
    ///      "vserver_name": "your-vserver",
150
    ///      "bucket": "your-bucket",
151
    ///      "root_certificates": "/path/to/certs.pem",  // Optional
152
    ///      "key_prefix": "test-prefix/",               // Optional
153
    ///      "retry": {
154
    ///        "max_retries": 6,
155
    ///        "delay": 0.3,
156
    ///        "jitter": 0.5
157
    ///      },
158
    ///      "multipart_max_concurrent_uploads": 10
159
    ///    }
160
    ///    ```
161
    ExperimentalCloudObjectStore(ExperimentalCloudObjectSpec),
162
163
    /// ONTAP S3 Existence Cache provides a caching layer on top of the ONTAP S3 store
164
    /// to optimize repeated existence checks. It maintains an in-memory cache of object
165
    /// digests and periodically syncs this cache to disk for persistence.
166
    ///
167
    /// The cache helps reduce latency for repeated calls to check object existence,
168
    /// while still ensuring eventual consistency with the underlying ONTAP S3 store.
169
    ///
170
    /// Example JSON Config:
171
    /// ```json
172
    /// "ontap_s3_existence_cache": {
173
    ///   "index_path": "/path/to/cache/index.json",
174
    ///   "sync_interval_seconds": 300,
175
    ///   "backend": {
176
    ///     "endpoint": "https://ontap-s3-endpoint:443",
177
    ///     "vserver_name": "your-vserver",
178
    ///     "bucket": "your-bucket",
179
    ///     "key_prefix": "test-prefix/"
180
    ///   }
181
    /// }
182
    /// ```
183
    ///
184
    OntapS3ExistenceCache(Box<OntapS3ExistenceCacheSpec>),
185
186
    /// Verify store is used to apply verifications to an underlying
187
    /// store implementation. It is strongly encouraged to validate
188
    /// as much data as you can before accepting data from a client,
189
    /// failing to do so may cause the data in the store to be
190
    /// populated with invalid data causing all kinds of problems.
191
    ///
192
    /// The suggested configuration is to have the CAS validate the
193
    /// hash and size and the AC validate nothing.
194
    ///
195
    /// **Example JSON Config:**
196
    /// ```json
197
    /// "verify": {
198
    ///   "backend": {
199
    ///     "memory": {
200
    ///       "eviction_policy": {
201
    ///         "max_bytes": "500mb"
202
    ///       }
203
    ///     },
204
    ///   },
205
    ///   "verify_size": true,
206
    ///   "verify_hash": true
207
    /// }
208
    /// ```
209
    ///
210
    Verify(Box<VerifySpec>),
211
212
    /// Completeness checking store verifies if the
213
    /// output files & folders exist in the CAS before forwarding
214
    /// the request to the underlying store.
215
    /// Note: This store should only be used on AC stores.
216
    ///
217
    /// **Example JSON Config:**
218
    /// ```json
219
    /// "completeness_checking": {
220
    ///   "backend": {
221
    ///     "filesystem": {
222
    ///       "content_path": "~/.cache/nativelink/content_path-ac",
223
    ///       "temp_path": "~/.cache/nativelink/tmp_path-ac",
224
    ///       "eviction_policy": {
225
    ///         "max_bytes": "500mb",
226
    ///       }
227
    ///     }
228
    ///   },
229
    ///   "cas_store": {
230
    ///     "ref_store": {
231
    ///       "name": "CAS_MAIN_STORE"
232
    ///     }
233
    ///   }
234
    /// }
235
    /// ```
236
    ///
237
    CompletenessChecking(Box<CompletenessCheckingSpec>),
238
239
    /// A compression store that will compress the data inbound and
240
    /// outbound. There will be a non-trivial cost to compress and
241
    /// decompress the data, but in many cases if the final store is
242
    /// a store that requires network transport and/or storage space
243
    /// is a concern it is often faster and more efficient to use this
244
    /// store before those stores.
245
    ///
246
    /// **Example JSON Config:**
247
    /// ```json
248
    /// "compression": {
249
    ///   "compression_algorithm": {
250
    ///     "lz4": {}
251
    ///   },
252
    ///   "backend": {
253
    ///     "filesystem": {
254
    ///       "content_path": "/tmp/nativelink/data/content_path-cas",
255
    ///       "temp_path": "/tmp/nativelink/data/tmp_path-cas",
256
    ///       "eviction_policy": {
257
    ///         "max_bytes": "2gb",
258
    ///       }
259
    ///     }
260
    ///   }
261
    /// }
262
    /// ```
263
    ///
264
    Compression(Box<CompressionSpec>),
265
266
    /// A dedup store will take the inputs and run a rolling hash
267
    /// algorithm on them to slice the input into smaller parts then
268
    /// run a sha256 algorithm on the slice and if the object doesn't
269
    /// already exist, upload the slice to the `content_store` using
270
    /// a new digest of just the slice. Once all parts exist, an
271
    /// Action-Cache-like digest will be built and uploaded to the
272
    /// `index_store` which will contain a reference to each
273
    /// chunk/digest of the uploaded file. Downloading a request will
274
    /// first grab the index from the `index_store`, and forward the
275
    /// download content of each chunk as if it were one file.
276
    ///
277
    /// This store is exceptionally good when the following conditions
278
    /// are met:
279
    /// * Content is mostly the same (inserts, updates, deletes are ok)
280
    /// * Content is not compressed or encrypted
281
    /// * Uploading or downloading from `content_store` is the bottleneck.
282
    ///
283
    /// Note: This store pairs well when used with `CompressionSpec` as
284
    /// the `content_store`, but never put `DedupSpec` as the backend of
285
    /// `CompressionSpec` as it will negate all the gains.
286
    ///
287
    /// Note: When running `.has()` on this store, it will only check
288
    /// to see if the entry exists in the `index_store` and not check
289
    /// if the individual chunks exist in the `content_store`.
290
    ///
291
    /// **Example JSON Config:**
292
    /// ```json
293
    /// "dedup": {
294
    ///   "index_store": {
295
    ///     "memory": {
296
    ///       "eviction_policy": {
297
    ///          "max_bytes": "1GB",
298
    ///       }
299
    ///     }
300
    ///   },
301
    ///   "content_store": {
302
    ///     "compression": {
303
    ///       "compression_algorithm": {
304
    ///         "lz4": {}
305
    ///       },
306
    ///       "backend": {
307
    ///         "fast_slow": {
308
    ///           "fast": {
309
    ///             "memory": {
310
    ///               "eviction_policy": {
311
    ///                 "max_bytes": "500MB",
312
    ///               }
313
    ///             }
314
    ///           },
315
    ///           "slow": {
316
    ///             "filesystem": {
317
    ///               "content_path": "/tmp/nativelink/data/content_path-content",
318
    ///               "temp_path": "/tmp/nativelink/data/tmp_path-content",
319
    ///               "eviction_policy": {
320
    ///                 "max_bytes": "2gb"
321
    ///               }
322
    ///             }
323
    ///           }
324
    ///         }
325
    ///       }
326
    ///     }
327
    ///   }
328
    /// }
329
    /// ```
330
    ///
331
    Dedup(Box<DedupSpec>),
332
333
    /// Existence store will wrap around another store and cache calls
334
    /// to has so that subsequent `has_with_results` calls will be
335
    /// faster. This is useful for cases when you have a store that
336
    /// is slow to respond to has calls.
337
    /// Note: This store should only be used on CAS stores.
338
    ///
339
    /// **Example JSON Config:**
340
    /// ```json
341
    /// "existence_cache": {
342
    ///   "backend": {
343
    ///     "memory": {
344
    ///       "eviction_policy": {
345
    ///         "max_bytes": "500mb",
346
    ///       }
347
    ///     }
348
    ///   },
349
    ///   // Note this is the existence store policy, not the backend policy
350
    ///   "eviction_policy": {
351
    ///     "max_seconds": 100,
352
    ///   }
353
    /// }
354
    /// ```
355
    ///
356
    ExistenceCache(Box<ExistenceCacheSpec>),
357
358
    /// `FastSlow` store will first try to fetch the data from the `fast`
359
    /// store and then if it does not exist try the `slow` store.
360
    /// When the object does exist in the `slow` store, it will copy
361
    /// the data to the `fast` store while returning the data.
362
    /// This store should be thought of as a store that "buffers"
363
    /// the data to the `fast` store.
364
    /// On uploads it will mirror data to both `fast` and `slow` stores.
365
    ///
366
    /// WARNING: If you need data to always exist in the `slow` store
367
    /// for something like remote execution, be careful because this
368
    /// store will never check to see if the objects exist in the
369
    /// `slow` store if it exists in the `fast` store (ie: it assumes
370
    /// that if an object exists in the `fast` store it will exist in
371
    /// the `slow` store).
372
    ///
373
    /// ***Example JSON Config:***
374
    /// ```json
375
    /// "fast_slow": {
376
    ///   "fast": {
377
    ///     "filesystem": {
378
    ///       "content_path": "/tmp/nativelink/data/content_path-index",
379
    ///       "temp_path": "/tmp/nativelink/data/tmp_path-index",
380
    ///       "eviction_policy": {
381
    ///         "max_bytes": "500mb",
382
    ///       }
383
    ///     }
384
    ///   },
385
    ///   "slow": {
386
    ///     "filesystem": {
387
    ///       "content_path": "/tmp/nativelink/data/content_path-index",
388
    ///       "temp_path": "/tmp/nativelink/data/tmp_path-index",
389
    ///       "eviction_policy": {
390
    ///         "max_bytes": "500mb",
391
    ///       }
392
    ///     }
393
    ///   }
394
    /// }
395
    /// ```
396
    ///
397
    FastSlow(Box<FastSlowSpec>),
398
399
    /// Shards the data to multiple stores. This is useful for cases
400
    /// when you want to distribute the load across multiple stores.
401
    /// The digest hash is used to determine which store to send the
402
    /// data to.
403
    ///
404
    /// **Example JSON Config:**
405
    /// ```json
406
    /// "shard": {
407
    ///   "stores": [
408
    ///    {
409
    ///     "store": {
410
    ///       "memory": {
411
    ///         "eviction_policy": {
412
    ///             "max_bytes": "10mb"
413
    ///         },
414
    ///       },
415
    ///     },
416
    ///     "weight": 1
417
    ///   }]
418
    /// }
419
    /// ```
420
    ///
421
    Shard(ShardSpec),
422
423
    /// Stores the data on the filesystem. This store is designed for
424
    /// local persistent storage. Restarts of this program should restore
425
    /// the previous state, meaning anything uploaded will be persistent
426
    /// as long as the filesystem integrity holds.
427
    ///
428
    /// **Example JSON Config:**
429
    /// ```json
430
    /// "filesystem": {
431
    ///   "content_path": "/tmp/nativelink/data-worker-test/content_path-cas",
432
    ///   "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-cas",
433
    ///   "eviction_policy": {
434
    ///     "max_bytes": "10gb",
435
    ///   }
436
    /// }
437
    /// ```
438
    ///
439
    Filesystem(FilesystemSpec),
440
441
    /// Store used to reference a store in the root store manager.
442
    /// This is useful for cases when you want to share a store in different
443
    /// nested stores. Example, you may want to share the same memory store
444
    /// used for the action cache, but use a `FastSlowSpec` and have the fast
445
    /// store also share the memory store for efficiency.
446
    ///
447
    /// **Example JSON Config:**
448
    /// ```json
449
    /// "ref_store": {
450
    ///   "name": "FS_CONTENT_STORE"
451
    /// }
452
    /// ```
453
    ///
454
    RefStore(RefSpec),
455
456
    /// Uses the size field of the digest to separate which store to send the
457
    /// data. This is useful for cases when you'd like to put small objects
458
    /// in one store and large objects in another store. This should only be
459
    /// used if the size field is the real size of the content, in other
460
    /// words, don't use on AC (Action Cache) stores. Any store where you can
461
    /// safely use `VerifySpec.verify_size = true`, this store should be safe
462
    /// to use (ie: CAS stores).
463
    ///
464
    /// **Example JSON Config:**
465
    /// ```json
466
    /// "size_partitioning": {
467
    ///   "size": "128mib",
468
    ///   "lower_store": {
469
    ///     "memory": {
470
    ///       "eviction_policy": {
471
    ///         "max_bytes": "${NATIVELINK_CAS_MEMORY_CONTENT_LIMIT:-100mb}"
472
    ///       }
473
    ///     }
474
    ///   },
475
    ///   "upper_store": {
476
    ///     /// This store discards data larger than 128mib.
477
    ///     "noop": {}
478
    ///   }
479
    /// }
480
    /// ```
481
    ///
482
    SizePartitioning(Box<SizePartitioningSpec>),
483
484
    /// This store will pass-through calls to another GRPC store. This store
485
    /// is not designed to be used as a sub-store of another store, but it
486
    /// does satisfy the interface and will likely work.
487
    ///
488
    /// One major GOTCHA is that some stores use a special function on this
489
    /// store to get the size of the underlying object, which is only reliable
490
    ///    when this store is serving a CAS store, not an AC store. If using
491
    /// this store directly without being a child of any store there are no
492
    /// side effects and is the most efficient way to use it.
493
    ///
494
    /// **Example JSON Config:**
495
    /// ```json
496
    /// "grpc": {
497
    ///   "instance_name": "main",
498
    ///   "endpoints": [
499
    ///     {"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
500
    ///   ],
501
    ///   "connections_per_endpoint": "5",
502
    ///   "rpc_timeout_s": "5m",
503
    ///   "store_type": "ac"
504
    /// }
505
    /// ```
506
    ///
507
    Grpc(GrpcSpec),
508
509
    /// Stores data in any stores compatible with Redis APIs.
510
    ///
511
    /// Pairs well with `SizePartitioning` and/or `FastSlow` stores.
512
    /// Ideal for accepting small object sizes as most redis store
513
    /// services have a max file upload of between 256Mb-512Mb.
514
    ///
515
    /// **Example JSON Config:**
516
    /// ```json
517
    /// "redis_store": {
518
    ///   "addresses": [
519
    ///     "redis://127.0.0.1:6379/",
520
    ///   ],
521
    ///   "max_client_permits": 1000,
522
    /// }
523
    /// ```
524
    ///
525
    RedisStore(RedisSpec),
526
527
    /// Noop store is a store that sends streams into the void and all data
528
    /// retrieval will return 404 (`NotFound`). This can be useful for cases
529
    /// where you may need to partition your data and part of your data needs
530
    /// to be discarded.
531
    ///
532
    /// **Example JSON Config:**
533
    /// ```json
534
    /// "noop": {}
535
    /// ```
536
    ///
537
    Noop(NoopSpec),
538
539
    /// Experimental `MongoDB` store implementation.
540
    ///
541
    /// This store uses `MongoDB` as a backend for storing data. It supports
542
    /// both CAS (Content Addressable Storage) and scheduler data with
543
    /// optional change streams for real-time updates.
544
    ///
545
    /// **Example JSON Config:**
546
    /// ```json
547
    /// "experimental_mongo": {
548
    ///     "connection_string": "mongodb://localhost:27017",
549
    ///     "database": "nativelink",
550
    ///     "cas_collection": "cas",
551
    ///     "key_prefix": "cas:",
552
    ///     "read_chunk_size": 65536,
553
    ///     "max_concurrent_uploads": 10,
554
    ///     "enable_change_streams": false
555
    /// }
556
    /// ```
557
    ///
558
    ExperimentalMongo(ExperimentalMongoSpec),
559
}
560
561
/// Configuration for an individual shard of the store.
562
#[derive(Serialize, Deserialize, Debug, Clone)]
563
#[serde(deny_unknown_fields)]
564
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
565
pub struct ShardConfig {
566
    /// Store to shard the data to.
567
    pub store: StoreSpec,
568
569
    /// The weight of the store. This is used to determine how much data
570
    /// should be sent to the store. The actual percentage is the sum of
571
    /// all the store's weights divided by the individual store's weight.
572
    ///
573
    /// Default: 1
574
    #[serde(deserialize_with = "convert_optional_numeric_with_shellexpand")]
575
    pub weight: Option<u32>,
576
}
577
578
#[derive(Serialize, Deserialize, Debug, Clone)]
579
#[serde(deny_unknown_fields)]
580
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
581
pub struct ShardSpec {
582
    /// Stores to shard the data to.
583
    pub stores: Vec<ShardConfig>,
584
}
585
586
#[derive(Serialize, Deserialize, Debug, Clone)]
587
#[serde(deny_unknown_fields)]
588
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
589
pub struct SizePartitioningSpec {
590
    /// Size to partition the data on.
591
    #[serde(deserialize_with = "convert_data_size_with_shellexpand")]
592
    pub size: u64,
593
594
    /// Store to send data when object is < (less than) size.
595
    pub lower_store: StoreSpec,
596
597
    /// Store to send data when object is >= (greater than or equal to) size.
598
    pub upper_store: StoreSpec,
599
}
600
601
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
602
#[serde(deny_unknown_fields)]
603
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
604
pub struct RefSpec {
605
    /// Name of the store under the root "stores" config object.
606
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
607
    pub name: String,
608
}
609
610
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
611
#[serde(deny_unknown_fields)]
612
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
613
pub struct FilesystemSpec {
614
    /// Path on the system where to store the actual content. This is where
615
    /// the bulk of the data will be placed.
616
    /// On service bootup this folder will be scanned and all files will be
617
    /// added to the cache. In the event one of the files doesn't match the
618
    /// criteria, the file will be deleted.
619
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
620
    pub content_path: String,
621
622
    /// A temporary location of where files that are being uploaded or
623
    /// deleted will be placed while the content cannot be guaranteed to be
624
    /// accurate. This location must be on the same block device as
625
    /// `content_path` so atomic moves can happen (ie: move without copy).
626
    /// All files in this folder will be deleted on every startup.
627
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
628
    pub temp_path: String,
629
630
    /// Buffer size to use when reading files. Generally this should be left
631
    /// to the default value except for testing.
632
    /// Default: 32k.
633
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
634
    pub read_buffer_size: u32,
635
636
    /// Policy used to evict items out of the store. Failure to set this
637
    /// value will cause items to never be removed from the store causing
638
    /// infinite memory usage.
639
    pub eviction_policy: Option<EvictionPolicy>,
640
641
    /// The block size of the filesystem for the running machine
642
    /// value is used to determine an entry's actual size on disk consumed
643
    /// For a 4KB block size filesystem, a 1B file actually consumes 4KB
644
    /// Default: 4096
645
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
646
    pub block_size: u64,
647
648
    /// Maximum number of concurrent write operations allowed.
649
    /// Each write involves streaming data to a temp file and calling `sync_all()`,
650
    /// which can saturate disk I/O when many writes happen simultaneously.
651
    /// Limiting concurrency prevents disk saturation from blocking the async
652
    /// runtime.
653
    /// A value of 0 means unlimited (no concurrency limit).
654
    /// Default: 0
655
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
656
    pub max_concurrent_writes: usize,
657
}
658
659
// NetApp ONTAP S3 Spec
660
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
661
#[serde(deny_unknown_fields)]
662
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
663
pub struct ExperimentalOntapS3Spec {
664
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
665
    pub endpoint: String,
666
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
667
    pub vserver_name: String,
668
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
669
    pub bucket: String,
670
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
671
    pub root_certificates: Option<String>,
672
673
    /// Common retry and upload configuration
674
    #[serde(flatten)]
675
    pub common: CommonObjectSpec,
676
}
677
678
#[derive(Serialize, Deserialize, Debug, Clone)]
679
#[serde(deny_unknown_fields)]
680
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
681
pub struct OntapS3ExistenceCacheSpec {
682
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
683
    pub index_path: String,
684
    #[serde(deserialize_with = "convert_numeric_with_shellexpand")]
685
    pub sync_interval_seconds: u32,
686
    pub backend: Box<ExperimentalOntapS3Spec>,
687
}
688
689
#[derive(Serialize, Deserialize, Default, Debug, Clone, Copy, PartialEq, Eq)]
690
#[serde(rename_all = "snake_case")]
691
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
692
pub enum StoreDirection {
693
    /// The store operates normally and all get and put operations are
694
    /// handled by it.
695
    #[default]
696
    Both,
697
    /// Update operations will cause persistence to this store, but Get
698
    /// operations will be ignored.
699
    /// This only makes sense on the fast store as the slow store will
700
    /// never get written to on Get anyway.
701
    Update,
702
    /// Get operations will cause persistence to this store, but Update
703
    /// operations will be ignored.
704
    Get,
705
    /// Operate as a read only store, only really makes sense if there's
706
    /// another way to write to it.
707
    ReadOnly,
708
}
709
710
#[derive(Serialize, Deserialize, Debug, Clone)]
711
#[serde(deny_unknown_fields)]
712
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
713
pub struct FastSlowSpec {
714
    /// Fast store that will be attempted to be contacted before reaching
715
    /// out to the `slow` store.
716
    pub fast: StoreSpec,
717
718
    /// How to handle the fast store.  This can be useful to set to Get for
719
    /// worker nodes such that results are persisted to the slow store only.
720
    #[serde(default)]
721
    pub fast_direction: StoreDirection,
722
723
    /// If the object does not exist in the `fast` store it will try to
724
    /// get it from this store.
725
    pub slow: StoreSpec,
726
727
    /// How to handle the slow store.  This can be useful if creating a diode
728
    /// and you wish to have an upstream read only store.
729
    #[serde(default)]
730
    pub slow_direction: StoreDirection,
731
}
732
733
#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
734
#[serde(deny_unknown_fields)]
735
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
736
pub struct MemorySpec {
737
    /// Policy used to evict items out of the store. Failure to set this
738
    /// value will cause items to never be removed from the store causing
739
    /// infinite memory usage.
740
    pub eviction_policy: Option<EvictionPolicy>,
741
}
742
743
#[derive(Serialize, Deserialize, Debug, Clone)]
744
#[serde(deny_unknown_fields)]
745
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
746
pub struct DedupSpec {
747
    /// Store used to store the index of each dedup slice. This store
748
    /// should generally be fast and small.
749
    pub index_store: StoreSpec,
750
751
    /// The store where the individual chunks will be uploaded. This
752
    /// store should generally be the slower & larger store.
753
    pub content_store: StoreSpec,
754
755
    /// Minimum size that a chunk will be when slicing up the content.
756
    /// Note: This setting can be increased to improve performance
757
    /// because it will actually not check this number of bytes when
758
    /// deciding where to partition the data.
759
    ///
760
    /// Default: 65536 (64k)
761
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
762
    pub min_size: u32,
763
764
    /// A best-effort attempt will be made to keep the average size
765
    /// of the chunks to this number. It is not a guarantee, but a
766
    /// slight attempt will be made.
767
    ///
768
    /// This value will also be about the threshold used to determine
769
    /// if we should even attempt to dedup the entry or just forward
770
    /// it directly to the `content_store` without an index. The actual
771
    /// value will be about `normal_size * 1.3` due to implementation
772
    /// details.
773
    ///
774
    /// Default: 262144 (256k)
775
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
776
    pub normal_size: u32,
777
778
    /// Maximum size a chunk is allowed to be.
779
    ///
780
    /// Default: 524288 (512k)
781
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
782
    pub max_size: u32,
783
784
    /// Due to implementation detail, we want to prefer to download
785
    /// the first chunks of the file so we can stream the content
786
    /// out and free up some of our buffers. This configuration
787
    /// will be used to restrict the number of concurrent chunk
788
    /// downloads at a time per `get()` request.
789
    ///
790
    /// This setting will also affect how much memory might be used
791
    /// per `get()` request. Estimated worst case memory per `get()`
792
    /// request is: `max_concurrent_fetch_per_get * max_size`.
793
    ///
794
    /// Default: 10
795
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
796
    pub max_concurrent_fetch_per_get: u32,
797
}
798
799
/// Configuration for the existence-cache store wrapper, which remembers
/// existence-check results for entries in `backend`.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct ExistenceCacheSpec {
    /// The underlying store wrap around. All content will first flow
    /// through self before forwarding to backend. In the event there
    /// is an error detected in self, the connection to the backend
    /// will be terminated, and early termination should always cause
    /// updates to fail on the backend.
    pub backend: StoreSpec,

    /// Policy used to evict items out of the store. Failure to set this
    /// value will cause items to never be removed from the store causing
    /// infinite memory usage.
    pub eviction_policy: Option<EvictionPolicy>,
}
815
816
/// Configuration for a store wrapper that validates the size and/or hash
/// of uploaded data before forwarding it to `backend`.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct VerifySpec {
    /// The underlying store wrap around. All content will first flow
    /// through self before forwarding to backend. In the event there
    /// is an error detected in self, the connection to the backend
    /// will be terminated, and early termination should always cause
    /// updates to fail on the backend.
    pub backend: StoreSpec,

    /// If set the store will verify the size of the data before accepting
    /// an upload of data.
    ///
    /// This should be set to false for AC, but true for CAS stores.
    #[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
    pub verify_size: bool,

    /// If the data should be hashed and verify that the key matches the
    /// computed hash. The hash function is automatically determined based
    /// on the request and, if not set, will use the global default.
    ///
    /// This should be set to false for AC, but true for CAS stores.
    #[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
    pub verify_hash: bool,
}
842
843
/// Configuration for a store wrapper that verifies all digests referenced
/// by an action result exist in a CAS store before returning the result.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct CompletenessCheckingSpec {
    /// The underlying store that will have its results validated before sending to client.
    pub backend: StoreSpec,

    /// When a request is made, the results are decoded and all output digests/files are verified
    /// to exist in this CAS store before returning success.
    pub cas_store: StoreSpec,
}
854
855
/// Tuning parameters for the LZ4 compression algorithm.
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq, Clone, Copy)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct Lz4Config {
    /// Size of the blocks to compress.
    /// Higher values require more ram, but might yield slightly better
    /// compression ratios.
    ///
    /// Default: 65536 (64k).
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
    pub block_size: u32,

    /// Maximum size allowed to attempt to deserialize data into.
    /// This is needed because the `block_size` is embedded into the data
    /// so if there was a bad actor, they could upload an extremely large
    /// `block_size`'ed entry and we'd allocate a large amount of memory
    /// when retrieving the data. To prevent this from happening, we
    /// allow you to specify the maximum that we'll attempt deserialize.
    ///
    /// Default: value in `block_size`.
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
    pub max_decode_block_size: u32,
}
878
879
/// Compression algorithm selection for `CompressionSpec`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)]
#[serde(rename_all = "snake_case")]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub enum CompressionAlgorithm {
    /// LZ4 compression algorithm is extremely fast for compression and
    /// decompression, however does not perform very well in compression
    /// ratio. In most cases build artifacts are highly compressible, however
    /// lz4 is quite good at aborting early if the data is not deemed very
    /// compressible.
    ///
    /// see: <https://lz4.github.io/lz4/>
    Lz4(Lz4Config),
}
892
893
/// Configuration for a store wrapper that compresses data before
/// forwarding it to `backend`.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct CompressionSpec {
    /// The underlying store wrap around. All content will first flow
    /// through self before forwarding to backend. In the event there
    /// is an error detected in self, the connection to the backend
    /// will be terminated, and early termination should always cause
    /// updates to fail on the backend.
    pub backend: StoreSpec,

    /// The compression algorithm to use.
    pub compression_algorithm: CompressionAlgorithm,
}
907
908
/// Eviction policy always works on LRU (Least Recently Used). Any time an entry
/// is touched it updates the timestamp. Inserts and updates will execute the
/// eviction policy removing any expired entries and/or the oldest entries
/// until the store size becomes smaller than `max_bytes`.
#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct EvictionPolicy {
    /// Maximum number of bytes before eviction takes place.
    /// Default: 0. Zero means never evict based on size.
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
    pub max_bytes: usize,

    /// When eviction starts based on hitting `max_bytes`, continue until
    /// `max_bytes - evict_bytes` is met to create a low watermark.  This stops
    /// operations from thrashing when the store is close to the limit.
    /// Default: 0
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
    pub evict_bytes: usize,

    /// Maximum number of seconds for an entry to live since it was last
    /// accessed before it is evicted.
    /// Default: 0. Zero means never evict based on time.
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub max_seconds: u32,

    /// Maximum number of entries the store may hold before an eviction
    /// takes place.
    /// Default: 0. Zero means never evict based on count.
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub max_count: u64,
}
939
940
/// Cloud object-storage backend selection. The configuration is tagged by
/// the `provider` key (snake_case variant name).
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "provider", rename_all = "snake_case")]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub enum ExperimentalCloudObjectSpec {
    /// Amazon S3 (`provider = "aws"`).
    Aws(ExperimentalAwsSpec),
    /// Google Cloud Storage (`provider = "gcs"`).
    Gcs(ExperimentalGcsSpec),
    /// Azure Blob Storage (`provider = "azure"`).
    Azure(ExperimentalAzureSpec),
    /// NetApp ONTAP S3-compatible storage (`provider = "ontap"`).
    Ontap(ExperimentalOntapS3Spec),
}
949
950
impl Default for ExperimentalCloudObjectSpec {
951
0
    fn default() -> Self {
952
0
        Self::Aws(ExperimentalAwsSpec::default())
953
0
    }
954
}
955
956
/// Configuration for an AWS S3 object-storage backend.
///
/// NOTE(review): serde documents `deny_unknown_fields` as unsupported in
/// combination with `#[serde(flatten)]`; unknown configuration keys may not
/// actually be rejected here — confirm.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct ExperimentalAwsSpec {
    /// S3 region. Usually us-east-1, us-west-2, af-south-1, etc...
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
    pub region: String,

    /// Bucket name to use as the backend.
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
    pub bucket: String,

    /// Common retry and upload configuration
    #[serde(flatten)]
    pub common: CommonObjectSpec,
}
972
973
/// Configuration for a Google Cloud Storage object-storage backend.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct ExperimentalGcsSpec {
    /// Bucket name to use as the backend.
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
    pub bucket: String,

    /// Chunk size for resumable uploads.
    ///
    /// Default: 2MB
    #[serde(
        default,
        deserialize_with = "convert_optional_data_size_with_shellexpand"
    )]
    pub resumable_chunk_size: Option<usize>,

    /// Common retry and upload configuration
    #[serde(flatten)]
    pub common: CommonObjectSpec,

    /// Error if authentication was not found.
    #[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
    pub authentication_required: bool,

    /// Connection timeout.
    /// NOTE(review): previously documented as "milliseconds" with default
    /// 3000, but the `_s` suffix and the duration deserializer imply
    /// seconds — confirm the intended unit and default.
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub connection_timeout_s: u64,

    /// Read timeout.
    /// NOTE(review): previously documented as "milliseconds" with default
    /// 3000, but the `_s` suffix and the duration deserializer imply
    /// seconds — confirm the intended unit and default.
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub read_timeout_s: u64,
}
1008
1009
/// Configuration for an Azure Blob Storage object-storage backend.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct ExperimentalAzureSpec {
    /// The Azure Storage account name.
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
    pub account_name: String,

    /// The container name to use as the backend.
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
    pub container: String,

    /// Common retry and upload configuration.
    #[serde(flatten)]
    pub common: CommonObjectSpec,

    /// Connection timeout.
    /// NOTE(review): previously documented as "milliseconds" with default
    /// 3000, but the `_s` suffix and the duration deserializer imply
    /// seconds — confirm the intended unit and default.
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub connection_timeout_s: u64,

    /// Read timeout.
    /// NOTE(review): previously documented as "milliseconds" with default
    /// 3000, but the `_s` suffix and the duration deserializer imply
    /// seconds — confirm the intended unit and default.
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub read_timeout_s: u64,
}
1035
1036
/// Settings shared by all cloud object-storage backends (S3, GCS, Azure,
/// ONTAP). Flattened into each provider-specific spec.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct CommonObjectSpec {
    /// If you wish to prefix the location in the bucket. If None, no prefix will be used.
    #[serde(default)]
    pub key_prefix: Option<String>,

    /// Retry configuration to use when a network request fails.
    #[serde(default)]
    pub retry: Retry,

    /// If the number of seconds since the `last_modified` time of the object
    /// is greater than this value, the object will not be considered
    /// "existing". This allows for external tools to delete objects that
    /// have not been uploaded in a long time. If a client receives a `NotFound`
    /// the client should re-upload the object.
    ///
    /// There should be sufficient buffer time between how long the expiration
    /// configuration of the external tool is and this value. Keeping items
    /// around for a few days is generally a good idea.
    ///
    /// Default: 0. Zero means never consider an object expired.
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub consider_expired_after_s: u32,

    /// The maximum buffer size to retain in case of a retryable error
    /// during upload. Setting this to zero will disable upload buffering;
    /// this means that in the event of a failure during upload, the entire
    /// upload will be aborted and the client will likely receive an error.
    ///
    /// Default: 5MB.
    #[serde(
        default,
        deserialize_with = "convert_optional_data_size_with_shellexpand"
    )]
    pub max_retry_buffer_per_request: Option<usize>,

    /// Maximum number of concurrent `UploadPart` requests per `MultipartUpload`.
    ///
    /// Default: 10.
    #[serde(
        default,
        deserialize_with = "convert_optional_numeric_with_shellexpand"
    )]
    pub multipart_max_concurrent_uploads: Option<usize>,

    /// Allow unencrypted HTTP connections. Only use this for local testing.
    ///
    /// Default: false
    #[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
    pub insecure_allow_http: bool,

    /// Disable http/2 connections and only use http/1.1. Default client
    /// configuration will have http/1.1 and http/2 enabled for connection
    /// schemes. Http/2 should be disabled if environments have poor support
    /// or performance related to http/2. Safe to keep default unless
    /// underlying network environment, S3, or GCS API servers specify otherwise.
    ///
    /// Default: false
    #[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
    pub disable_http2: bool,
}
1099
1100
/// The kind of remote-execution store an upstream endpoint serves.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
#[serde(rename_all = "snake_case")]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub enum StoreType {
    /// The store is content addressable storage.
    Cas,
    /// The store is an action cache.
    Ac,
}
1109
1110
/// Client-side TLS settings used when connecting to a gRPC endpoint.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct ClientTlsConfig {
    /// Path to the certificate authority to use to validate the remote.
    ///
    /// Default: None
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
    pub ca_file: Option<String>,

    /// Path to the certificate file for client authentication.
    ///
    /// Default: None
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
    pub cert_file: Option<String>,

    /// Path to the private key file for client authentication.
    ///
    /// Default: None
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
    pub key_file: Option<String>,

    /// If set the client will use the native roots for TLS connections.
    ///
    /// Default: false
    #[serde(default)]
    pub use_native_roots: Option<bool>,
}
1137
1138
/// A single upstream gRPC endpoint and its connection tuning parameters.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct GrpcEndpoint {
    /// The endpoint address (i.e. grpc(s)://example.com:443).
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
    pub address: String,
    /// The TLS configuration to use to connect to the endpoint (if grpcs).
    pub tls_config: Option<ClientTlsConfig>,
    /// The maximum concurrency to allow on this endpoint.
    #[serde(
        default,
        deserialize_with = "convert_optional_numeric_with_shellexpand"
    )]
    pub concurrency_limit: Option<usize>,

    /// Timeout for establishing a TCP connection to the endpoint (seconds).
    /// If not set or 0, defaults to 30 seconds.
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub connect_timeout_s: u64,

    /// TCP keepalive interval (seconds). Sends TCP keepalive probes at this
    /// interval to detect dead connections at the OS level.
    /// If not set or 0, defaults to 30 seconds.
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub tcp_keepalive_s: u64,

    /// HTTP/2 keepalive interval (seconds). Sends HTTP/2 PING frames at this
    /// interval to detect dead connections at the application level.
    /// If not set or 0, defaults to 30 seconds.
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub http2_keepalive_interval_s: u64,

    /// HTTP/2 keepalive timeout (seconds). If a PING response is not received
    /// within this duration, the connection is considered dead.
    /// If not set or 0, defaults to 20 seconds.
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub http2_keepalive_timeout_s: u64,
}
1177
1178
/// Configuration for a store that proxies requests to upstream gRPC
/// CAS/AC endpoints.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct GrpcSpec {
    /// Instance name for GRPC calls. Proxy calls will have the `instance_name` changed to this.
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
    pub instance_name: String,

    /// The endpoint of the grpc connection.
    pub endpoints: Vec<GrpcEndpoint>,

    /// The type of the upstream store, this ensures that the correct server calls are made.
    pub store_type: StoreType,

    /// Retry configuration to use when a network request fails.
    #[serde(default)]
    pub retry: Retry,

    /// Limit the number of simultaneous upstream requests to this many.  A
    /// value of zero is treated as unlimited.  If the limit is reached the
    /// request is queued.
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub max_concurrent_requests: usize,

    /// The number of connections to make to each specified endpoint to balance
    /// the load over multiple TCP connections.  Default 1.
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub connections_per_endpoint: usize,

    /// Maximum time (seconds) allowed for a single RPC request (e.g. a
    /// ByteStream.Write call) before it is cancelled.
    ///
    /// A value of 0 (the default) disables the per-RPC timeout. Dead
    /// connections are still detected by the HTTP/2 and TCP keepalive
    /// mechanisms configured on each endpoint.
    ///
    /// For large uploads (multi-GB), either leave this at 0 or set it
    /// large enough to accommodate the full transfer time.
    ///
    /// Default: 0 (disabled)
    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
    pub rpc_timeout_s: u64,
}
1221
1222
/// The possible error codes that might occur on an upstream request.
/// Names and discriminant values mirror the standard gRPC status codes.
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub enum ErrorCode {
    Cancelled = 1,
    Unknown = 2,
    InvalidArgument = 3,
    DeadlineExceeded = 4,
    NotFound = 5,
    AlreadyExists = 6,
    PermissionDenied = 7,
    ResourceExhausted = 8,
    FailedPrecondition = 9,
    Aborted = 10,
    OutOfRange = 11,
    Unimplemented = 12,
    Internal = 13,
    Unavailable = 14,
    DataLoss = 15,
    Unauthenticated = 16,
    // Note: This list is duplicated from nativelink-error/lib.rs.
}
1244
1245
/// Configuration for a Redis-backed store.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct RedisSpec {
    /// The hostname or IP address of the Redis server.
    /// Ex: `["redis://username:password@redis-server-url:6380/99"]`
    /// 99 Represents database ID, 6380 represents the port.
    #[serde(deserialize_with = "convert_vec_string_with_shellexpand")]
    pub addresses: Vec<String>,

    /// DEPRECATED: use `command_timeout_ms`
    /// The response timeout for the Redis connection in seconds.
    ///
    /// Default: 10
    #[serde(default)]
    pub response_timeout_s: u64,

    /// DEPRECATED: use `connection_timeout_ms`
    ///
    /// The connection timeout for the Redis connection in seconds.
    ///
    /// Default: 10
    #[serde(default)]
    pub connection_timeout_s: u64,

    /// An optional and experimental Redis channel to publish write events to.
    ///
    /// If set, every time a write operation is made to a Redis node
    /// then an event will be published to a Redis channel with the given name.
    /// If unset, the writes will still be made,
    /// but the write events will not be published.
    ///
    /// Default: (Empty String / No Channel)
    #[serde(default)]
    pub experimental_pub_sub_channel: Option<String>,

    /// An optional prefix to prepend to all keys in this store.
    ///
    /// Setting this value can make it convenient to query or
    /// organize your data according to the shared prefix.
    ///
    /// Default: (Empty String / No Prefix)
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
    pub key_prefix: String,

    /// Set the mode Redis is operating in.
    ///
    /// Available options are "cluster" for
    /// [cluster mode](https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/),
    /// "sentinel" for [sentinel mode](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/),
    /// or "standard" if Redis is operating in neither cluster nor sentinel mode.
    ///
    /// Default: standard
    #[serde(default)]
    pub mode: RedisMode,

    /// Deprecated as redis-rs doesn't use it
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub broadcast_channel_capacity: usize,

    /// The amount of time in milliseconds until the redis store considers the
    /// command to be timed out. This will trigger a retry of the command and
    /// potentially a reconnection to the redis server.
    ///
    /// Default: 10000 (10 seconds)
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub command_timeout_ms: u64,

    /// The amount of time in milliseconds until the redis store considers the
    /// connection to be unresponsive. This will trigger a reconnection to the
    /// redis server.
    ///
    /// Default: 3000 (3 seconds)
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub connection_timeout_ms: u64,

    /// The amount of data to read from the redis server at a time.
    /// This is used to limit the amount of memory used when reading
    /// large objects from the redis server as well as limiting the
    /// amount of time a single read operation can take.
    ///
    /// IMPORTANT: If this value is too high, the `command_timeout_ms`
    /// might be triggered if the latency or throughput to the redis
    /// server is too low.
    ///
    /// Default: 64KiB
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub read_chunk_size: usize,

    /// The number of connections to keep open to the redis server(s).
    ///
    /// Default: 3
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub connection_pool_size: usize,

    /// The maximum number of upload chunks to allow per update.
    /// This is used to limit the amount of memory used when uploading
    /// large objects to the redis server. A good rule of thumb is to
    /// think of the data as:
    /// `AVAIL_MEMORY / (read_chunk_size * max_chunk_uploads_per_update) = THEORETICAL_MAX_CONCURRENT_UPLOADS`
    /// (note: it is a good idea to divide `AVAIL_MAX_MEMORY` by ~10 to account for other memory usage)
    ///
    /// Default: 10
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub max_chunk_uploads_per_update: usize,

    /// The COUNT value passed when scanning keys in Redis.
    /// This is used to hint the amount of work that should be done per response.
    ///
    /// Default: 10000
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub scan_count: usize,

    /// Retry configuration to use when a network request fails.
    /// See the `Retry` struct for more information.
    ///
    /// ```txt
    /// Default: Retry {
    ///   max_retries: 0, /* unlimited */
    ///   delay: 0.1, /* 100ms */
    ///   jitter: 0.5, /* 50% */
    ///   retry_on_errors: None, /* not used in redis store */
    /// }
    /// ```
    #[serde(default)]
    pub retry: Retry,

    /// Maximum number of permitted actions to the Redis store at any one time.
    /// This stops problems with timeouts due to many, many inflight actions.
    /// Default: 500
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub max_client_permits: usize,

    /// Maximum number of items returned per cursor for the search indexes.
    /// May reduce thundering herd issues with the worker provisioner at higher node counts.
    /// Default: 1500
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub max_count_per_cursor: u64,
}
1383
1384
/// The operating mode of the Redis deployment being connected to.
#[derive(Debug, Default, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub enum RedisMode {
    /// Redis is running in cluster mode.
    Cluster,
    /// Redis is running behind sentinel.
    Sentinel,
    /// A plain standalone Redis server (neither cluster nor sentinel).
    #[default]
    Standard,
}
1393
1394
/// Configuration for the no-op store. It has no settings.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct NoopSpec {}
1397
1398
/// Retry configuration. This configuration is exponential and each iteration
/// a jitter as a percentage is applied of the calculated delay. For example:
/// ```haskell
/// Retry{
///   max_retries: 7,
///   delay: 0.1,
///   jitter: 0.5,
/// }
/// ```
/// will result in:
/// Attempt - Delay
/// 1         0ms
/// 2         75ms - 125ms
/// 3         150ms - 250ms
/// 4         300ms - 500ms
/// 5         600ms - 1s
/// 6         1.2s - 2s
/// 7         2.4s - 4s
/// 8         4.8s - 8s
/// Note that the delays are additive: a single request exhausting all of the
/// above attempts would wait a total of 9.525s - 15.875s.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct Retry {
    /// Maximum number of retries until retrying stops.
    /// Setting this to zero will always attempt 1 time, but not retry.
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub max_retries: usize,

    /// Delay in seconds for exponential back off.
    #[serde(default)]
    pub delay: f32,

    /// Amount of jitter to add as a percentage in decimal form. This will
    /// change the formula like:
    /// ```haskell
    /// random(
    ///    (2 ^ {attempt_number}) * {delay} * (1 - (jitter / 2)),
    ///    (2 ^ {attempt_number}) * {delay} * (1 + (jitter / 2)),
    /// )
    /// ```
    #[serde(default)]
    pub jitter: f32,

    /// A list of error codes to retry on, if this is not set then the default
    /// error codes to retry on are used.  These default codes are the most
    /// likely to be non-permanent.
    ///  - `Unknown`
    ///  - `Cancelled`
    ///  - `DeadlineExceeded`
    ///  - `ResourceExhausted`
    ///  - `Aborted`
    ///  - `Internal`
    ///  - `Unavailable`
    ///  - `DataLoss`
    #[serde(default)]
    pub retry_on_errors: Option<Vec<ErrorCode>>,
}
1457
1458
/// Configuration for `ExperimentalMongoDB` store.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "dev-schema", derive(JsonSchema))]
pub struct ExperimentalMongoSpec {
    /// `ExperimentalMongoDB` connection string.
    /// Example: <mongodb://localhost:27017> or <mongodb+srv://cluster.mongodb.net>
    #[serde(deserialize_with = "convert_string_with_shellexpand")]
    pub connection_string: String,

    /// The database name to use.
    /// Default: "nativelink"
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
    pub database: String,

    /// The collection name for CAS data.
    /// Default: "cas"
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
    pub cas_collection: String,

    /// The collection name for scheduler data.
    /// Default: "scheduler"
    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
    pub scheduler_collection: String,

    /// Prefix to prepend to all keys stored in `MongoDB`.
    /// Default: ""
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
    pub key_prefix: Option<String>,

    /// The maximum amount of data to read from `MongoDB` in a single chunk (in bytes).
    /// Default: 65536 (64KB)
    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
    pub read_chunk_size: usize,

    /// Maximum number of concurrent uploads allowed.
    /// Default: 10
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub max_concurrent_uploads: usize,

    /// Connection timeout in milliseconds.
    /// Default: 3000
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub connection_timeout_ms: u64,

    /// Command timeout in milliseconds.
    /// Default: 10000
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub command_timeout_ms: u64,

    /// Enable `MongoDB` change streams for real-time updates.
    /// Required for scheduler subscriptions.
    /// Default: false
    #[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
    pub enable_change_streams: bool,

    /// Write concern 'w' parameter.
    /// Can be a number (e.g., 1) or string (e.g., "majority").
    /// Default: None (uses `MongoDB` default)
    #[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
    pub write_concern_w: Option<String>,

    /// Write concern 'j' parameter (journal acknowledgment).
    /// Default: None (uses `MongoDB` default)
    #[serde(default)]
    pub write_concern_j: Option<bool>,

    /// Write concern timeout in milliseconds.
    /// Default: None (uses `MongoDB` default)
    #[serde(
        default,
        deserialize_with = "convert_optional_numeric_with_shellexpand"
    )]
    pub write_concern_timeout_ms: Option<u32>,
}
1533
1534
impl Retry {
1535
10
    pub fn make_jitter_fn(&self) -> Arc<dyn Fn(Duration) -> Duration + Send + Sync> {
1536
10
        if self.jitter == 0f32 {
  Branch (1536:12): [True: 10, False: 0]
  Branch (1536:12): [Folded - Ignored]
1537
10
            Arc::new(move |delay: Duration| delay)
1538
        } else {
1539
0
            let local_jitter = self.jitter;
1540
0
            Arc::new(move |delay: Duration| {
1541
0
                delay.mul_f32(local_jitter.mul_add(rand::rng().random::<f32>() - 0.5, 1.))
1542
0
            })
1543
        }
1544
10
    }
1545
}