/build/source/nativelink-store/src/default_store_factory.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | use std::time::SystemTime; |
18 | | |
19 | | use futures::stream::FuturesOrdered; |
20 | | use futures::{Future, TryStreamExt}; |
21 | | use nativelink_config::stores::StoreSpec; |
22 | | use nativelink_error::Error; |
23 | | use nativelink_util::health_utils::HealthRegistryBuilder; |
24 | | use nativelink_util::store_trait::{Store, StoreDriver}; |
25 | | |
26 | | use crate::completeness_checking_store::CompletenessCheckingStore; |
27 | | use crate::compression_store::CompressionStore; |
28 | | use crate::dedup_store::DedupStore; |
29 | | use crate::existence_cache_store::ExistenceCacheStore; |
30 | | use crate::fast_slow_store::FastSlowStore; |
31 | | use crate::filesystem_store::FilesystemStore; |
32 | | use crate::grpc_store::GrpcStore; |
33 | | use crate::memory_store::MemoryStore; |
34 | | use crate::noop_store::NoopStore; |
35 | | use crate::redis_store::RedisStore; |
36 | | use crate::ref_store::RefStore; |
37 | | use crate::s3_store::S3Store; |
38 | | use crate::shard_store::ShardStore; |
39 | | use crate::size_partitioning_store::SizePartitioningStore; |
40 | | use crate::store_manager::StoreManager; |
41 | | use crate::verify_store::VerifyStore; |
42 | | |
43 | | type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + 'a>; |
44 | | |
45 | 32 | pub fn store_factory<'a>( |
46 | 32 | backend: &'a StoreSpec, |
47 | 32 | store_manager: &'a Arc<StoreManager>, |
48 | 32 | maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, |
49 | 32 | ) -> Pin<FutureMaybeStore<'a>> { |
50 | 32 | Box::pin(async move { |
51 | 32 | let store: Arc<dyn StoreDriver> = match backend { |
52 | 32 | StoreSpec::memory(spec) => MemoryStore::new(spec), |
53 | 0 | StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?, |
54 | 0 | StoreSpec::redis_store(spec) => RedisStore::new(spec.clone())?, |
55 | 0 | StoreSpec::verify(spec) => VerifyStore::new( |
56 | 0 | spec, |
57 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
58 | | ), |
59 | 0 | StoreSpec::compression(spec) => CompressionStore::new( |
60 | 0 | &spec.clone(), |
61 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
62 | 0 | )?, |
63 | 0 | StoreSpec::dedup(spec) => DedupStore::new( |
64 | 0 | spec, |
65 | 0 | store_factory(&spec.index_store, store_manager, None).await?, |
66 | 0 | store_factory(&spec.content_store, store_manager, None).await?, |
67 | 0 | )?, |
68 | 0 | StoreSpec::existence_cache(spec) => ExistenceCacheStore::new( |
69 | 0 | spec, |
70 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
71 | | ), |
72 | 0 | StoreSpec::completeness_checking(spec) => CompletenessCheckingStore::new( |
73 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
74 | 0 | store_factory(&spec.cas_store, store_manager, None).await?, |
75 | | ), |
76 | 0 | StoreSpec::fast_slow(spec) => FastSlowStore::new( |
77 | 0 | spec, |
78 | 0 | store_factory(&spec.fast, store_manager, None).await?, |
79 | 0 | store_factory(&spec.slow, store_manager, None).await?, |
80 | | ), |
81 | 0 | StoreSpec::filesystem(spec) => <FilesystemStore>::new(spec).await?, |
82 | 0 | StoreSpec::ref_store(spec) => RefStore::new(spec, Arc::downgrade(store_manager)), |
83 | 0 | StoreSpec::size_partitioning(spec) => SizePartitioningStore::new( |
84 | 0 | spec, |
85 | 0 | store_factory(&spec.lower_store, store_manager, None).await?, |
86 | 0 | store_factory(&spec.upper_store, store_manager, None).await?, |
87 | | ), |
88 | 0 | StoreSpec::grpc(spec) => GrpcStore::new(spec).await?, |
89 | 0 | StoreSpec::noop(_) => NoopStore::new(), |
90 | 0 | StoreSpec::shard(spec) => { |
91 | 0 | let stores = spec |
92 | 0 | .stores |
93 | 0 | .iter() |
94 | 0 | .map(|store_spec| store_factory(&store_spec.store, store_manager, None)) |
95 | 0 | .collect::<FuturesOrdered<_>>() |
96 | 0 | .try_collect::<Vec<_>>() |
97 | 0 | .await?; |
98 | 0 | ShardStore::new(spec, stores)? |
99 | | } |
100 | | }; |
101 | | |
102 | 32 | if let Some(health_registry_builder) = maybe_health_registry_builder {
Branch (102:16): [True: 0, False: 32]
Branch (102:16): [Folded - Ignored]
|
103 | 0 | store.clone().register_health(health_registry_builder); |
104 | 32 | } |
105 | | |
106 | 32 | Ok(Store::new(store)) |
107 | 32 | }) |
108 | 32 | } |