/build/source/nativelink-store/src/default_store_factory.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | use std::time::SystemTime; |
18 | | |
19 | | use futures::stream::FuturesOrdered; |
20 | | use futures::{Future, TryStreamExt}; |
21 | | use nativelink_config::stores::{ExperimentalCloudObjectSpec, RedisMode, StoreSpec}; |
22 | | use nativelink_error::Error; |
23 | | use nativelink_util::health_utils::HealthRegistryBuilder; |
24 | | use nativelink_util::store_trait::{Store, StoreDriver}; |
25 | | |
26 | | use crate::azure_blob_store::AzureBlobStore; |
27 | | use crate::completeness_checking_store::CompletenessCheckingStore; |
28 | | use crate::compression_store::CompressionStore; |
29 | | use crate::dedup_store::DedupStore; |
30 | | use crate::existence_cache_store::ExistenceCacheStore; |
31 | | use crate::fast_slow_store::FastSlowStore; |
32 | | use crate::filesystem_store::FilesystemStore; |
33 | | use crate::gcs_store::GcsStore; |
34 | | use crate::grpc_store::GrpcStore; |
35 | | use crate::memory_store::MemoryStore; |
36 | | use crate::mongo_store::ExperimentalMongoStore; |
37 | | use crate::noop_store::NoopStore; |
38 | | use crate::ontap_s3_existence_cache_store::OntapS3ExistenceCache; |
39 | | use crate::ontap_s3_store::OntapS3Store; |
40 | | use crate::r2_store::R2Store; |
41 | | use crate::redis_store::RedisStore; |
42 | | use crate::ref_store::RefStore; |
43 | | use crate::s3_store::S3Store; |
44 | | use crate::shard_store::ShardStore; |
45 | | use crate::size_partitioning_store::SizePartitioningStore; |
46 | | use crate::store_manager::StoreManager; |
47 | | use crate::verify_store::VerifyStore; |
48 | | |
49 | | type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + Send + 'a>; |
50 | | |
51 | 51 | pub fn store_factory<'a>( |
52 | 51 | backend: &'a StoreSpec, |
53 | 51 | store_manager: &'a Arc<StoreManager>, |
54 | 51 | maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, |
55 | 51 | ) -> Pin<FutureMaybeStore<'a>> { |
56 | 51 | Box::pin(async move { |
57 | 51 | let store: Arc<dyn StoreDriver> = match backend { |
58 | 48 | StoreSpec::Memory(spec) => MemoryStore::new(spec), |
59 | 0 | StoreSpec::ExperimentalCloudObjectStore(spec) => match spec { |
60 | 0 | ExperimentalCloudObjectSpec::Aws(aws_config) => { |
61 | 0 | S3Store::new(aws_config, SystemTime::now).await? |
62 | | } |
63 | 0 | ExperimentalCloudObjectSpec::Ontap(ontap_config) => { |
64 | 0 | OntapS3Store::new(ontap_config, SystemTime::now).await? |
65 | | } |
66 | 0 | ExperimentalCloudObjectSpec::Gcs(gcs_config) => { |
67 | 0 | GcsStore::new(gcs_config, SystemTime::now).await? |
68 | | } |
69 | 0 | ExperimentalCloudObjectSpec::Azure(azure_config) => { |
70 | 0 | AzureBlobStore::new(azure_config, SystemTime::now).await? |
71 | | } |
72 | 0 | ExperimentalCloudObjectSpec::R2(r2_config) => { |
73 | 0 | R2Store::new(r2_config, SystemTime::now).await? |
74 | | } |
75 | | }, |
76 | 0 | StoreSpec::RedisStore(spec) => { |
77 | 0 | if spec.mode == RedisMode::Cluster { |
78 | 0 | RedisStore::new_cluster(spec.clone()).await? |
79 | | } else { |
80 | 0 | RedisStore::new_standard(spec.clone()).await? |
81 | | } |
82 | | } |
83 | 0 | StoreSpec::Verify(spec) => VerifyStore::new( |
84 | 0 | spec, |
85 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
86 | | ), |
87 | 0 | StoreSpec::Compression(spec) => CompressionStore::new( |
88 | 0 | &spec.clone(), |
89 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
90 | 0 | )?, |
91 | 0 | StoreSpec::Dedup(spec) => DedupStore::new( |
92 | 0 | spec, |
93 | 0 | store_factory(&spec.index_store, store_manager, None).await?, |
94 | 0 | store_factory(&spec.content_store, store_manager, None).await?, |
95 | 0 | )?, |
96 | 0 | StoreSpec::ExistenceCache(spec) => ExistenceCacheStore::new( |
97 | 0 | spec, |
98 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
99 | | ), |
100 | 3 | StoreSpec::OntapS3ExistenceCache(spec) => { |
101 | 3 | OntapS3ExistenceCache::new(spec, SystemTime::now).await?0 |
102 | | } |
103 | 0 | StoreSpec::CompletenessChecking(spec) => CompletenessCheckingStore::new( |
104 | 0 | store_factory(&spec.backend, store_manager, None).await?, |
105 | 0 | store_factory(&spec.cas_store, store_manager, None).await?, |
106 | | ), |
107 | 0 | StoreSpec::FastSlow(spec) => FastSlowStore::new( |
108 | 0 | spec, |
109 | 0 | store_factory(&spec.fast, store_manager, None).await?, |
110 | 0 | store_factory(&spec.slow, store_manager, None).await?, |
111 | | ), |
112 | 0 | StoreSpec::Filesystem(spec) => <FilesystemStore>::new(spec).await?, |
113 | 0 | StoreSpec::RefStore(spec) => RefStore::new(spec, Arc::downgrade(store_manager)), |
114 | 0 | StoreSpec::SizePartitioning(spec) => SizePartitioningStore::new( |
115 | 0 | spec, |
116 | 0 | store_factory(&spec.lower_store, store_manager, None).await?, |
117 | 0 | store_factory(&spec.upper_store, store_manager, None).await?, |
118 | | ), |
119 | 0 | StoreSpec::Grpc(spec) => GrpcStore::new(spec).await?, |
120 | 0 | StoreSpec::Noop(_) => NoopStore::new(), |
121 | 0 | StoreSpec::ExperimentalMongo(spec) => ExperimentalMongoStore::new(spec.clone()).await?, |
122 | 0 | StoreSpec::Shard(spec) => { |
123 | 0 | let stores = spec |
124 | 0 | .stores |
125 | 0 | .iter() |
126 | 0 | .map(|store_spec| store_factory(&store_spec.store, store_manager, None)) |
127 | 0 | .collect::<FuturesOrdered<_>>() |
128 | 0 | .try_collect::<Vec<_>>() |
129 | 0 | .await?; |
130 | 0 | ShardStore::new(spec, stores)? |
131 | | } |
132 | | }; |
133 | | |
134 | 51 | if let Some(health_registry_builder0 ) = maybe_health_registry_builder { |
135 | 0 | store.clone().register_health(health_registry_builder); |
136 | 51 | } |
137 | | |
138 | 51 | Ok(Store::new(store)) |
139 | 51 | }) |
140 | 51 | } |