/build/source/nativelink-store/src/default_store_factory.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | use std::time::SystemTime; |
18 | | |
19 | | use futures::stream::FuturesOrdered; |
20 | | use futures::{Future, TryStreamExt}; |
21 | | use nativelink_config::stores::StoreConfig; |
22 | | use nativelink_error::Error; |
23 | | use nativelink_util::health_utils::HealthRegistryBuilder; |
24 | | use nativelink_util::store_trait::{Store, StoreDriver}; |
25 | | |
26 | | use crate::completeness_checking_store::CompletenessCheckingStore; |
27 | | use crate::compression_store::CompressionStore; |
28 | | use crate::dedup_store::DedupStore; |
29 | | use crate::existence_cache_store::ExistenceCacheStore; |
30 | | use crate::fast_slow_store::FastSlowStore; |
31 | | use crate::filesystem_store::FilesystemStore; |
32 | | use crate::grpc_store::GrpcStore; |
33 | | use crate::memory_store::MemoryStore; |
34 | | use crate::noop_store::NoopStore; |
35 | | use crate::redis_store::RedisStore; |
36 | | use crate::ref_store::RefStore; |
37 | | use crate::s3_store::S3Store; |
38 | | use crate::shard_store::ShardStore; |
39 | | use crate::size_partitioning_store::SizePartitioningStore; |
40 | | use crate::store_manager::StoreManager; |
41 | | use crate::verify_store::VerifyStore; |
42 | | |
43 | | type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + 'a>; |
44 | | |
45 | 32 | pub fn store_factory<'a>( |
46 | 32 | backend: &'a StoreConfig, |
47 | 32 | store_manager: &'a Arc<StoreManager>, |
48 | 32 | maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, |
49 | 32 | ) -> Pin<FutureMaybeStore<'a>> { |
50 | 32 | Box::pin(async move { |
51 | 32 | let store: Arc<dyn StoreDriver> = match backend { |
52 | 32 | StoreConfig::memory(config) => MemoryStore::new(config), |
53 | 0 | StoreConfig::experimental_s3_store(config) => { |
54 | 0 | S3Store::new(config, SystemTime::now).await? |
55 | | } |
56 | 0 | StoreConfig::redis_store(config) => RedisStore::new(config)?, |
57 | 0 | StoreConfig::verify(config) => VerifyStore::new( |
58 | 0 | config, |
59 | 0 | store_factory(&config.backend, store_manager, None).await?, |
60 | | ), |
61 | 0 | StoreConfig::compression(config) => CompressionStore::new( |
62 | 0 | &config.clone(), |
63 | 0 | store_factory(&config.backend, store_manager, None).await?, |
64 | 0 | )?, |
65 | 0 | StoreConfig::dedup(config) => DedupStore::new( |
66 | 0 | config, |
67 | 0 | store_factory(&config.index_store, store_manager, None).await?, |
68 | 0 | store_factory(&config.content_store, store_manager, None).await?, |
69 | 0 | )?, |
70 | 0 | StoreConfig::existence_cache(config) => ExistenceCacheStore::new( |
71 | 0 | config, |
72 | 0 | store_factory(&config.backend, store_manager, None).await?, |
73 | | ), |
74 | 0 | StoreConfig::completeness_checking(config) => CompletenessCheckingStore::new( |
75 | 0 | store_factory(&config.backend, store_manager, None).await?, |
76 | 0 | store_factory(&config.cas_store, store_manager, None).await?, |
77 | | ), |
78 | 0 | StoreConfig::fast_slow(config) => FastSlowStore::new( |
79 | 0 | config, |
80 | 0 | store_factory(&config.fast, store_manager, None).await?, |
81 | 0 | store_factory(&config.slow, store_manager, None).await?, |
82 | | ), |
83 | 0 | StoreConfig::filesystem(config) => <FilesystemStore>::new(config).await?, |
84 | 0 | StoreConfig::ref_store(config) => RefStore::new(config, Arc::downgrade(store_manager)), |
85 | 0 | StoreConfig::size_partitioning(config) => SizePartitioningStore::new( |
86 | 0 | config, |
87 | 0 | store_factory(&config.lower_store, store_manager, None).await?, |
88 | 0 | store_factory(&config.upper_store, store_manager, None).await?, |
89 | | ), |
90 | 0 | StoreConfig::grpc(config) => GrpcStore::new(config).await?, |
91 | 0 | StoreConfig::noop => NoopStore::new(), |
92 | 0 | StoreConfig::shard(config) => { |
93 | 0 | let stores = config |
94 | 0 | .stores |
95 | 0 | .iter() |
96 | 0 | .map(|store_config| store_factory(&store_config.store, store_manager, None)) |
97 | 0 | .collect::<FuturesOrdered<_>>() |
98 | 0 | .try_collect::<Vec<_>>() |
99 | 0 | .await?; |
100 | 0 | ShardStore::new(config, stores)? |
101 | | } |
102 | | }; |
103 | | |
104 | 32 | if let Some(health_registry_builder) = maybe_health_registry_builder { |
| | Branch (104:16): [True: 0, False: 32] |
| | Branch (104:16): [Folded - Ignored] |
105 | 0 | store.clone().register_health(health_registry_builder); |
106 | 32 | } |
107 | | |
108 | 32 | Ok(Store::new(store)) |
109 | 32 | }) |
110 | 32 | } |