/build/source/nativelink-store/src/default_store_factory.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | use std::time::SystemTime; |
18 | | |
19 | | use futures::stream::FuturesOrdered; |
20 | | use futures::{Future, TryStreamExt}; |
21 | | use nativelink_config::stores::{ExperimentalCloudObjectSpec, StoreSpec}; |
22 | | use nativelink_error::Error; |
23 | | use nativelink_util::health_utils::HealthRegistryBuilder; |
24 | | use nativelink_util::store_trait::{Store, StoreDriver}; |
25 | | |
26 | | use crate::completeness_checking_store::CompletenessCheckingStore; |
27 | | use crate::compression_store::CompressionStore; |
28 | | use crate::dedup_store::DedupStore; |
29 | | use crate::existence_cache_store::ExistenceCacheStore; |
30 | | use crate::fast_slow_store::FastSlowStore; |
31 | | use crate::filesystem_store::FilesystemStore; |
32 | | use crate::gcs_store::GcsStore; |
33 | | use crate::grpc_store::GrpcStore; |
34 | | use crate::memory_store::MemoryStore; |
35 | | use crate::noop_store::NoopStore; |
36 | | use crate::redis_store::RedisStore; |
37 | | use crate::ref_store::RefStore; |
38 | | use crate::s3_store::S3Store; |
39 | | use crate::shard_store::ShardStore; |
40 | | use crate::size_partitioning_store::SizePartitioningStore; |
41 | | use crate::store_manager::StoreManager; |
42 | | use crate::verify_store::VerifyStore; |
43 | | |
44 | | type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + Send + 'a>; |
45 | | |
46 | 34 | pub fn store_factory<'a>( |
47 | 34 | backend: &'a StoreSpec, |
48 | 34 | store_manager: &'a Arc<StoreManager>, |
49 | 34 | maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, |
50 | 34 | ) -> Pin<FutureMaybeStore<'a>> { |
51 | 34 | Box::pin(async move { |
52 | 34 | let store: Arc<dyn StoreDriver> = match backend { |
53 | 34 | StoreSpec::Memory(spec) => MemoryStore::new(spec), |
54 | 0 | StoreSpec::ExperimentalCloudObjectStore(spec) => match spec { |
55 | 0 | ExperimentalCloudObjectSpec::Aws(aws_config) => { |
56 | 0 | S3Store::new(aws_config, SystemTime::now).await? |
57 | | } |
58 | 0 | ExperimentalCloudObjectSpec::Gcs(gcs_config) => { |
59 | 0 | GcsStore::new(gcs_config, SystemTime::now).await? |
60 | | } |
61 | | }, |
62 | 0 | StoreSpec::RedisStore(spec) => RedisStore::new(spec.clone())?, |
63 | 0 | StoreSpec::Verify(spec) => VerifyStore::new( |
64 | 0 | spec, |
65 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
66 | | ), |
67 | 0 | StoreSpec::Compression(spec) => CompressionStore::new( |
68 | 0 | &spec.clone(), |
69 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
70 | 0 | )?, |
71 | 0 | StoreSpec::Dedup(spec) => DedupStore::new( |
72 | 0 | spec, |
73 | 0 | store_factory(&spec.index_store, store_manager, None).await?, |
74 | 0 | store_factory(&spec.content_store, store_manager, None).await?, |
75 | 0 | )?, |
76 | 0 | StoreSpec::ExistenceCache(spec) => ExistenceCacheStore::new( |
77 | 0 | spec, |
78 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
79 | | ), |
80 | 0 | StoreSpec::CompletenessChecking(spec) => CompletenessCheckingStore::new( |
81 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
82 | 0 | store_factory(&spec.cas_store, store_manager, None).await?, |
83 | | ), |
84 | 0 | StoreSpec::FastSlow(spec) => FastSlowStore::new( |
85 | 0 | spec, |
86 | 0 | store_factory(&spec.fast, store_manager, None).await?, |
87 | 0 | store_factory(&spec.slow, store_manager, None).await?, |
88 | | ), |
89 | 0 | StoreSpec::Filesystem(spec) => <FilesystemStore>::new(spec).await?, |
90 | 0 | StoreSpec::RefStore(spec) => RefStore::new(spec, Arc::downgrade(store_manager)), |
91 | 0 | StoreSpec::SizePartitioning(spec) => SizePartitioningStore::new( |
92 | 0 | spec, |
93 | 0 | store_factory(&spec.lower_store, store_manager, None).await?, |
94 | 0 | store_factory(&spec.upper_store, store_manager, None).await?, |
95 | | ), |
96 | 0 | StoreSpec::Grpc(spec) => GrpcStore::new(spec).await?, |
97 | 0 | StoreSpec::Noop(_) => NoopStore::new(), |
98 | 0 | StoreSpec::Shard(spec) => { |
99 | 0 | let stores = spec |
100 | 0 | .stores |
101 | 0 | .iter() |
102 | 0 | .map(|store_spec| store_factory(&store_spec.store, store_manager, None)) |
103 | 0 | .collect::<FuturesOrdered<_>>() |
104 | 0 | .try_collect::<Vec<_>>() |
105 | 0 | .await?; |
106 | 0 | ShardStore::new(spec, stores)? |
107 | | } |
108 | | }; |
109 | | |
110 | 34 | if let Some(health_registry_builder0 ) = maybe_health_registry_builder { Branch (110:16): [True: 0, False: 34]
Branch (110:16): [Folded - Ignored]
|
111 | 0 | store.clone().register_health(health_registry_builder); |
112 | 34 | } |
113 | | |
114 | 34 | Ok(Store::new(store)) |
115 | 34 | }) |
116 | 34 | } |