Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-store/src/default_store_factory.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
use std::time::SystemTime;
18
19
use futures::stream::FuturesOrdered;
20
use futures::{Future, TryStreamExt};
21
use nativelink_config::stores::StoreConfig;
22
use nativelink_error::Error;
23
use nativelink_util::health_utils::HealthRegistryBuilder;
24
use nativelink_util::store_trait::{Store, StoreDriver};
25
26
use crate::completeness_checking_store::CompletenessCheckingStore;
27
use crate::compression_store::CompressionStore;
28
use crate::dedup_store::DedupStore;
29
use crate::existence_cache_store::ExistenceCacheStore;
30
use crate::fast_slow_store::FastSlowStore;
31
use crate::filesystem_store::FilesystemStore;
32
use crate::grpc_store::GrpcStore;
33
use crate::memory_store::MemoryStore;
34
use crate::noop_store::NoopStore;
35
use crate::redis_store::RedisStore;
36
use crate::ref_store::RefStore;
37
use crate::s3_store::S3Store;
38
use crate::shard_store::ShardStore;
39
use crate::size_partitioning_store::SizePartitioningStore;
40
use crate::store_manager::StoreManager;
41
use crate::verify_store::VerifyStore;
42
43
type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + 'a>;
44
45
32
pub fn store_factory<'a>(
46
32
    backend: &'a StoreConfig,
47
32
    store_manager: &'a Arc<StoreManager>,
48
32
    maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>,
49
32
) -> Pin<FutureMaybeStore<'a>> {
50
32
    Box::pin(async move {
51
32
        let store: Arc<dyn StoreDriver> = match backend {
52
32
            StoreConfig::memory(config) => MemoryStore::new(config),
53
0
            StoreConfig::experimental_s3_store(config) => {
54
0
                S3Store::new(config, SystemTime::now).await?
55
            }
56
0
            StoreConfig::redis_store(config) => RedisStore::new(config)?,
57
0
            StoreConfig::verify(config) => VerifyStore::new(
58
0
                config,
59
0
                store_factory(&config.backend, store_manager, None).await?,
60
            ),
61
0
            StoreConfig::compression(config) => CompressionStore::new(
62
0
                &config.clone(),
63
0
                store_factory(&config.backend, store_manager, None).await?,
64
0
            )?,
65
0
            StoreConfig::dedup(config) => DedupStore::new(
66
0
                config,
67
0
                store_factory(&config.index_store, store_manager, None).await?,
68
0
                store_factory(&config.content_store, store_manager, None).await?,
69
0
            )?,
70
0
            StoreConfig::existence_cache(config) => ExistenceCacheStore::new(
71
0
                config,
72
0
                store_factory(&config.backend, store_manager, None).await?,
73
            ),
74
0
            StoreConfig::completeness_checking(config) => CompletenessCheckingStore::new(
75
0
                store_factory(&config.backend, store_manager, None).await?,
76
0
                store_factory(&config.cas_store, store_manager, None).await?,
77
            ),
78
0
            StoreConfig::fast_slow(config) => FastSlowStore::new(
79
0
                config,
80
0
                store_factory(&config.fast, store_manager, None).await?,
81
0
                store_factory(&config.slow, store_manager, None).await?,
82
            ),
83
0
            StoreConfig::filesystem(config) => <FilesystemStore>::new(config).await?,
84
0
            StoreConfig::ref_store(config) => RefStore::new(config, Arc::downgrade(store_manager)),
85
0
            StoreConfig::size_partitioning(config) => SizePartitioningStore::new(
86
0
                config,
87
0
                store_factory(&config.lower_store, store_manager, None).await?,
88
0
                store_factory(&config.upper_store, store_manager, None).await?,
89
            ),
90
0
            StoreConfig::grpc(config) => GrpcStore::new(config).await?,
91
0
            StoreConfig::noop => NoopStore::new(),
92
0
            StoreConfig::shard(config) => {
93
0
                let stores = config
94
0
                    .stores
95
0
                    .iter()
96
0
                    .map(|store_config| store_factory(&store_config.store, store_manager, None))
97
0
                    .collect::<FuturesOrdered<_>>()
98
0
                    .try_collect::<Vec<_>>()
99
0
                    .await?;
100
0
                ShardStore::new(config, stores)?
101
            }
102
        };
103
104
32
        if let Some(
health_registry_builder0
) = maybe_health_registry_builder {
  Branch (104:16): [True: 0, False: 32]
  Branch (104:16): [Folded - Ignored]
105
0
            store.clone().register_health(health_registry_builder);
106
32
        }
107
108
32
        Ok(Store::new(store))
109
32
    })
110
32
}