Coverage Report

Created: 2025-06-24 08:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/push_server.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::convert::Into;
16
use std::collections::HashMap;
17
18
use nativelink_config::cas_server::{PushConfig, WithInstanceName};
19
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
20
use nativelink_proto::build::bazel::remote::asset::v1::push_server::{Push, PushServer as Server};
21
use nativelink_proto::build::bazel::remote::asset::v1::{
22
    PushBlobRequest, PushBlobResponse, PushDirectoryRequest, PushDirectoryResponse,
23
};
24
use nativelink_store::store_manager::StoreManager;
25
use nativelink_util::digest_hasher::make_ctx_for_hash_func;
26
use nativelink_util::store_trait::{Store, StoreLike};
27
use opentelemetry::context::FutureExt;
28
use tonic::{Request, Response, Status};
29
use tracing::{Instrument, Level, error_span, info, instrument};
30
31
use crate::remote_asset_proto::RemoteAssetArtifact;
32
33
#[derive(Debug, Clone)]
34
pub struct PushStoreInfo {
35
    store: Store,
36
    read_only: bool,
37
}
38
39
#[derive(Debug, Clone)]
40
pub struct PushServer {
41
    stores: HashMap<String, PushStoreInfo>,
42
}
43
44
impl PushServer {
45
1
    pub fn new(
46
1
        configs: &[WithInstanceName<PushConfig>],
47
1
        store_manager: &StoreManager,
48
1
    ) -> Result<Self, Error> {
49
1
        let mut stores = HashMap::with_capacity(configs.len());
50
2
        for 
config1
in configs {
51
1
            let store = store_manager.get_store(&config.push_store).ok_or_else(|| 
{0
52
0
                make_input_err!("'push_store': '{}' does not exist", config.push_store)
53
0
            })?;
54
1
            stores.insert(
55
1
                config.instance_name.to_string(),
56
1
                PushStoreInfo {
57
1
                    store,
58
1
                    read_only: config.read_only,
59
1
                },
60
            );
61
        }
62
1
        Ok(Self {
63
1
            stores: stores.clone(),
64
1
        })
65
1
    }
66
67
0
    pub fn into_service(self) -> Server<Self> {
68
0
        Server::new(self)
69
0
    }
70
71
1
    async fn inner_push_blob(
72
1
        &self,
73
1
        request: PushBlobRequest,
74
1
    ) -> Result<Response<PushBlobResponse>, Error> {
75
1
        let instance_name = &request.instance_name;
76
1
        let store_info = self
77
1
            .stores
78
1
            .get(instance_name)
79
1
            .err_tip(|| format!(
"'instance_name' not configured for '{instance_name}'"0
))
?0
;
80
81
1
        if store_info.read_only {
  Branch (81:12): [True: 0, False: 1]
  Branch (81:12): [Folded - Ignored]
82
0
            return Err(make_err!(
83
0
                Code::PermissionDenied,
84
0
                "The store '{instance_name}' is read only on this endpoint",
85
0
            ));
86
1
        }
87
88
1
        let blob_digest = request.blob_digest.ok_or_else(|| 
{0
89
0
            Error::new(
90
0
                Code::InvalidArgument,
91
0
                "Missing blob digest in push".to_owned(),
92
            )
93
0
        })?;
94
1
        if request.uris.is_empty() {
  Branch (94:12): [True: 0, False: 1]
  Branch (94:12): [Folded - Ignored]
95
0
            return Err(Error::new(
96
0
                Code::InvalidArgument,
97
0
                "No uris in push request".to_owned(),
98
0
            ));
99
1
        }
100
2
        for 
uri1
in request.uris {
101
1
            let asset = RemoteAssetArtifact::new(
102
1
                uri.clone(),
103
1
                request.qualifiers.clone(),
104
1
                blob_digest.clone(),
105
1
                request.expire_at,
106
1
                request.digest_function,
107
            );
108
1
            let asset_digest = asset.digest();
109
1
            store_info
110
1
                .store
111
1
                .update_oneshot(asset_digest, asset.as_bytes())
112
1
                .await
113
1
                .err_tip(|| "Failed to update in push server")
?0
;
114
1
            info!(
115
                uri = uri,
116
1
                digest = format!("{}", asset_digest),
117
1
                "Pushed asset metadata with hash"
118
            );
119
        }
120
121
1
        Ok(Response::new(PushBlobResponse {}))
122
1
    }
123
}
124
125
#[tonic::async_trait]
126
impl Push for PushServer {
127
    #[allow(clippy::blocks_in_conditions)]
128
    #[instrument(
129
        err(level = Level::WARN),
130
        ret(level = Level::INFO),
131
        skip_all,
132
        fields(request = ?grpc_request.get_ref())
133
    )]
134
    async fn push_blob(
135
        &self,
136
        grpc_request: Request<PushBlobRequest>,
137
2
    ) -> Result<Response<PushBlobResponse>, Status> {
138
1
        let request = grpc_request.into_inner();
139
1
        let digest_function = request.digest_function;
140
1
        self.inner_push_blob(request)
141
1
            .instrument(error_span!("push_push_blob"))
142
1
            .with_context(
143
1
                make_ctx_for_hash_func(digest_function).err_tip(|| "In PushServer::push_blob")
?0
,
144
            )
145
1
            .await
146
1
            .err_tip(|| "Failed on push_blob() command")
147
1
            .map_err(Into::into)
148
2
    }
149
150
    #[allow(clippy::blocks_in_conditions)]
151
    #[instrument(
152
        err(level = Level::WARN),
153
        ret(level = Level::INFO),
154
        skip_all,
155
        fields(request = ?_grpc_request.get_ref())
156
    )]
157
    async fn push_directory(
158
        &self,
159
        _grpc_request: Request<PushDirectoryRequest>,
160
0
    ) -> Result<Response<PushDirectoryResponse>, Status> {
161
0
        todo!()
162
0
    }
163
}