/build/source/nativelink-store/src/default_store_factory.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | use std::time::SystemTime; |
18 | | |
19 | | use futures::stream::FuturesOrdered; |
20 | | use futures::{Future, TryStreamExt}; |
21 | | use nativelink_config::stores::{ExperimentalCloudObjectSpec, RedisMode, StoreSpec}; |
22 | | use nativelink_error::Error; |
23 | | use nativelink_util::health_utils::HealthRegistryBuilder; |
24 | | use nativelink_util::store_trait::{Store, StoreDriver}; |
25 | | |
26 | | use crate::azure_blob_store::AzureBlobStore; |
27 | | use crate::cache_metrics_store::CacheMetricsStore; |
28 | | use crate::completeness_checking_store::CompletenessCheckingStore; |
29 | | use crate::compression_store::CompressionStore; |
30 | | use crate::dedup_store::DedupStore; |
31 | | use crate::existence_cache_store::ExistenceCacheStore; |
32 | | use crate::fast_slow_store::FastSlowStore; |
33 | | use crate::filesystem_store::FilesystemStore; |
34 | | use crate::gcs_store::GcsStore; |
35 | | use crate::grpc_store::GrpcStore; |
36 | | use crate::memory_store::MemoryStore; |
37 | | use crate::mongo_store::ExperimentalMongoStore; |
38 | | use crate::noop_store::NoopStore; |
39 | | use crate::ontap_s3_existence_cache_store::OntapS3ExistenceCache; |
40 | | use crate::ontap_s3_store::OntapS3Store; |
41 | | use crate::r2_store::R2Store; |
42 | | use crate::redis_store::RedisStore; |
43 | | use crate::ref_store::RefStore; |
44 | | use crate::s3_store::S3Store; |
45 | | use crate::shard_store::ShardStore; |
46 | | use crate::size_partitioning_store::SizePartitioningStore; |
47 | | use crate::store_manager::StoreManager; |
48 | | use crate::verify_store::VerifyStore; |
49 | | |
50 | | type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + Send + 'a>; |
51 | | |
52 | 57 | pub fn store_factory<'a>( |
53 | 57 | backend: &'a StoreSpec, |
54 | 57 | store_manager: &'a Arc<StoreManager>, |
55 | 57 | maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, |
56 | 57 | ) -> Pin<FutureMaybeStore<'a>> { |
57 | 57 | Box::pin(async move { |
58 | 57 | let store: Arc<dyn StoreDriver> = match backend { |
59 | 2 | StoreSpec::CacheMetrics(spec) => CacheMetricsStore::new( |
60 | 2 | spec, |
61 | 2 | store_factory(&spec.backend, store_manager, None).await?0 , |
62 | | ), |
63 | 52 | StoreSpec::Memory(spec) => MemoryStore::new(spec), |
64 | 0 | StoreSpec::ExperimentalCloudObjectStore(spec) => match spec { |
65 | 0 | ExperimentalCloudObjectSpec::Aws(aws_config) => { |
66 | 0 | S3Store::new(aws_config, SystemTime::now).await? |
67 | | } |
68 | 0 | ExperimentalCloudObjectSpec::Ontap(ontap_config) => { |
69 | 0 | OntapS3Store::new(ontap_config, SystemTime::now).await? |
70 | | } |
71 | 0 | ExperimentalCloudObjectSpec::Gcs(gcs_config) => { |
72 | 0 | GcsStore::new(gcs_config, SystemTime::now).await? |
73 | | } |
74 | 0 | ExperimentalCloudObjectSpec::Azure(azure_config) => { |
75 | 0 | AzureBlobStore::new(azure_config, SystemTime::now).await? |
76 | | } |
77 | 0 | ExperimentalCloudObjectSpec::R2(r2_config) => { |
78 | 0 | R2Store::new(r2_config, SystemTime::now).await? |
79 | | } |
80 | | }, |
81 | 0 | StoreSpec::RedisStore(spec) => { |
82 | 0 | if spec.mode == RedisMode::Cluster { |
83 | 0 | RedisStore::new_cluster(spec.clone()).await? |
84 | | } else { |
85 | 0 | RedisStore::new_standard(spec.clone()).await? |
86 | | } |
87 | | } |
88 | 0 | StoreSpec::Verify(spec) => VerifyStore::new( |
89 | 0 | spec, |
90 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
91 | | ), |
92 | 0 | StoreSpec::Compression(spec) => CompressionStore::new( |
93 | 0 | &spec.clone(), |
94 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
95 | 0 | )?, |
96 | 0 | StoreSpec::Dedup(spec) => DedupStore::new( |
97 | 0 | spec, |
98 | 0 | store_factory(&spec.index_store, store_manager, None).await?, |
99 | 0 | store_factory(&spec.content_store, store_manager, None).await?, |
100 | 0 | )?, |
101 | 0 | StoreSpec::ExistenceCache(spec) => ExistenceCacheStore::new( |
102 | 0 | spec, |
103 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
104 | | ), |
105 | 3 | StoreSpec::OntapS3ExistenceCache(spec) => { |
106 | 3 | OntapS3ExistenceCache::new(spec, SystemTime::now).await?0 |
107 | | } |
108 | 0 | StoreSpec::CompletenessChecking(spec) => CompletenessCheckingStore::new( |
109 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
110 | 0 | store_factory(&spec.cas_store, store_manager, None).await?, |
111 | | ), |
112 | 0 | StoreSpec::FastSlow(spec) => FastSlowStore::new( |
113 | 0 | spec, |
114 | 0 | store_factory(&spec.fast, store_manager, None).await?, |
115 | 0 | store_factory(&spec.slow, store_manager, None).await?, |
116 | | ), |
117 | 0 | StoreSpec::Filesystem(spec) => <FilesystemStore>::new(spec).await?, |
118 | 0 | StoreSpec::RefStore(spec) => RefStore::new(spec, Arc::downgrade(store_manager)), |
119 | 0 | StoreSpec::SizePartitioning(spec) => SizePartitioningStore::new( |
120 | 0 | spec, |
121 | 0 | store_factory(&spec.lower_store, store_manager, None).await?, |
122 | 0 | store_factory(&spec.upper_store, store_manager, None).await?, |
123 | | ), |
124 | 0 | StoreSpec::Grpc(spec) => GrpcStore::new(spec).await?, |
125 | 0 | StoreSpec::Noop(_) => NoopStore::new(), |
126 | 0 | StoreSpec::ExperimentalMongo(spec) => ExperimentalMongoStore::new(spec.clone()).await?, |
127 | 0 | StoreSpec::Shard(spec) => { |
128 | 0 | let stores = spec |
129 | 0 | .stores |
130 | 0 | .iter() |
131 | 0 | .map(|store_spec| store_factory(&store_spec.store, store_manager, None)) |
132 | 0 | .collect::<FuturesOrdered<_>>() |
133 | 0 | .try_collect::<Vec<_>>() |
134 | 0 | .await?; |
135 | 0 | ShardStore::new(spec, stores)? |
136 | | } |
137 | | }; |
138 | | |
139 | 57 | if let Some(health_registry_builder0 ) = maybe_health_registry_builder { |
140 | 0 | store.clone().register_health(health_registry_builder); |
141 | 57 | } |
142 | | |
143 | 57 | Ok(Store::new(store)) |
144 | 57 | }) |
145 | 57 | } |