Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-service/src/execution_server.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::convert::Into;
17
use std::fmt;
18
use std::pin::Pin;
19
use std::sync::Arc;
20
use std::time::{Duration, SystemTime, UNIX_EPOCH};
21
22
use futures::stream::unfold;
23
use futures::{Stream, StreamExt};
24
use nativelink_config::cas_server::{ExecutionConfig, InstanceName};
25
use nativelink_error::{make_input_err, Error, ResultExt};
26
use nativelink_proto::build::bazel::remote::execution::v2::execution_server::{
27
    Execution, ExecutionServer as Server,
28
};
29
use nativelink_proto::build::bazel::remote::execution::v2::{
30
    Action, Command, ExecuteRequest, WaitExecutionRequest,
31
};
32
use nativelink_proto::google::longrunning::Operation;
33
use nativelink_store::ac_utils::get_and_decode_digest;
34
use nativelink_store::store_manager::StoreManager;
35
use nativelink_util::action_messages::{
36
    ActionInfo, ActionUniqueKey, ActionUniqueQualifier, OperationId, DEFAULT_EXECUTION_PRIORITY,
37
};
38
use nativelink_util::common::DigestInfo;
39
use nativelink_util::digest_hasher::{make_ctx_for_hash_func, DigestHasherFunc};
40
use nativelink_util::operation_state_manager::{
41
    ActionStateResult, ClientStateManager, OperationFilter,
42
};
43
use nativelink_util::store_trait::Store;
44
use tonic::{Request, Response, Status};
45
use tracing::{error_span, event, instrument, Level};
46
47
type InstanceInfoName = String;
48
49
struct NativelinkOperationId {
50
    instance_name: InstanceInfoName,
51
    client_operation_id: OperationId,
52
}
53
54
impl NativelinkOperationId {
55
0
    fn new(instance_name: InstanceInfoName, client_operation_id: OperationId) -> Self {
56
0
        Self {
57
0
            instance_name,
58
0
            client_operation_id,
59
0
        }
60
0
    }
61
62
0
    fn from_name(name: &str) -> Result<Self, Error> {
63
0
        let (instance_name, name) = name
64
0
            .split_once('/')
65
0
            .err_tip(|| "Expected instance_name and name to be separated by '/'")?;
66
0
        Ok(NativelinkOperationId::new(
67
0
            instance_name.to_string(),
68
0
            OperationId::from(name),
69
0
        ))
70
0
    }
71
}
72
73
impl fmt::Display for NativelinkOperationId {
74
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75
0
        write!(f, "{}/{}", self.instance_name, self.client_operation_id)
76
0
    }
77
}
78
79
struct InstanceInfo {
80
    scheduler: Arc<dyn ClientStateManager>,
81
    cas_store: Store,
82
}
83
84
impl InstanceInfo {
85
0
    async fn build_action_info(
86
0
        &self,
87
0
        instance_name: String,
88
0
        action_digest: DigestInfo,
89
0
        action: Action,
90
0
        priority: i32,
91
0
        skip_cache_lookup: bool,
92
0
        digest_function: DigestHasherFunc,
93
0
    ) -> Result<ActionInfo, Error> {
94
0
        let command_digest = DigestInfo::try_from(
95
0
            action
96
0
                .command_digest
97
0
                .clone()
98
0
                .err_tip(|| "Expected command_digest to exist")?,
99
        )
100
0
        .err_tip(|| "Could not decode command digest")?;
101
102
0
        let input_root_digest = DigestInfo::try_from(
103
0
            action
104
0
                .clone()
105
0
                .input_root_digest
106
0
                .err_tip(|| "Expected input_digest_root")?,
107
0
        )?;
108
0
        let timeout = action
109
0
            .timeout
110
0
            .map(|v| Duration::new(v.seconds as u64, v.nanos as u32))
111
0
            .unwrap_or(Duration::MAX);
112
0
113
0
        let mut platform_properties = HashMap::new();
114
0
        if let Some(platform) = action.platform {
  Branch (114:16): [True: 0, False: 0]
  Branch (114:16): [Folded - Ignored]
115
0
            for property in platform.properties {
116
0
                platform_properties.insert(property.name, property.value);
117
0
            }
118
0
        }
119
120
        // Goma puts the properties in the Command.
121
0
        if platform_properties.is_empty() {
  Branch (121:12): [True: 0, False: 0]
  Branch (121:12): [Folded - Ignored]
122
0
            let command =
123
0
                get_and_decode_digest::<Command>(&self.cas_store, command_digest.into()).await?;
124
0
            if let Some(platform) = command.platform {
  Branch (124:20): [True: 0, False: 0]
  Branch (124:20): [Folded - Ignored]
125
0
                for property in platform.properties {
126
0
                    platform_properties.insert(property.name, property.value);
127
0
                }
128
0
            }
129
0
        }
130
131
0
        let action_key = ActionUniqueKey {
132
0
            instance_name,
133
0
            digest_function,
134
0
            digest: action_digest,
135
0
        };
136
0
        let unique_qualifier = if skip_cache_lookup {
  Branch (136:35): [True: 0, False: 0]
  Branch (136:35): [Folded - Ignored]
137
0
            ActionUniqueQualifier::Uncachable(action_key)
138
        } else {
139
0
            ActionUniqueQualifier::Cachable(action_key)
140
        };
141
142
0
        Ok(ActionInfo {
143
0
            command_digest,
144
0
            input_root_digest,
145
0
            timeout,
146
0
            platform_properties,
147
0
            priority,
148
0
            load_timestamp: UNIX_EPOCH,
149
0
            insert_timestamp: SystemTime::now(),
150
0
            unique_qualifier,
151
0
        })
152
0
    }
153
}
154
155
pub struct ExecutionServer {
156
    instance_infos: HashMap<InstanceName, InstanceInfo>,
157
}
158
159
type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + 'static>>;
160
161
impl ExecutionServer {
162
0
    pub fn new(
163
0
        config: &HashMap<InstanceName, ExecutionConfig>,
164
0
        scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>,
165
0
        store_manager: &StoreManager,
166
0
    ) -> Result<Self, Error> {
167
0
        let mut instance_infos = HashMap::with_capacity(config.len());
168
0
        for (instance_name, exec_cfg) in config {
169
0
            let cas_store = store_manager
170
0
                .get_store(&exec_cfg.cas_store)
171
0
                .ok_or_else(|| {
172
0
                    make_input_err!("'cas_store': '{}' does not exist", exec_cfg.cas_store)
173
0
                })?;
174
0
            let scheduler = scheduler_map
175
0
                .get(&exec_cfg.scheduler)
176
0
                .err_tip(|| {
177
0
                    format!(
178
0
                        "Scheduler needs config for '{}' because it exists in execution",
179
0
                        exec_cfg.scheduler
180
0
                    )
181
0
                })?
182
0
                .clone();
183
0
184
0
            instance_infos.insert(
185
0
                instance_name.to_string(),
186
0
                InstanceInfo {
187
0
                    scheduler,
188
0
                    cas_store,
189
0
                },
190
0
            );
191
        }
192
0
        Ok(Self { instance_infos })
193
0
    }
194
195
0
    pub fn into_service(self) -> Server<ExecutionServer> {
196
0
        Server::new(self)
197
0
    }
198
199
0
    fn to_execute_stream(
200
0
        nl_client_operation_id: &NativelinkOperationId,
201
0
        action_listener: Box<dyn ActionStateResult>,
202
0
    ) -> Response<ExecuteStream> {
203
0
        let client_operation_id = OperationId::from(nl_client_operation_id.to_string());
204
0
        let receiver_stream = Box::pin(unfold(
205
0
            Some(action_listener),
206
0
            move |maybe_action_listener| {
207
0
                let client_operation_id = client_operation_id.clone();
208
0
                async move {
209
0
                    let mut action_listener = maybe_action_listener?;
210
0
                    match action_listener.changed().await {
211
0
                        Ok(action_update) => {
212
0
                            event!(Level::INFO, ?action_update, "Execute Resp Stream");
213
                            // If the action is finished we won't be sending any more updates.
214
0
                            let maybe_action_listener = if action_update.stage.is_finished() {
  Branch (214:60): [True: 0, False: 0]
  Branch (214:60): [Folded - Ignored]
215
0
                                None
216
                            } else {
217
0
                                Some(action_listener)
218
                            };
219
0
                            Some((
220
0
                                Ok(action_update.as_operation(client_operation_id)),
221
0
                                maybe_action_listener,
222
0
                            ))
223
                        }
224
0
                        Err(err) => {
225
0
                            event!(Level::ERROR, ?err, "Error in action_listener stream");
226
0
                            Some((Err(err.into()), None))
227
                        }
228
                    }
229
0
                }
230
0
            },
231
0
        ));
232
0
        tonic::Response::new(receiver_stream)
233
0
    }
234
235
0
    async fn inner_execute(
236
0
        &self,
237
0
        request: ExecuteRequest,
238
0
    ) -> Result<Response<ExecuteStream>, Error> {
239
0
        let instance_name = request.instance_name;
240
241
0
        let instance_info = self
242
0
            .instance_infos
243
0
            .get(&instance_name)
244
0
            .err_tip(|| "Instance name '{}' not configured")?;
245
246
0
        let digest = DigestInfo::try_from(
247
0
            request
248
0
                .action_digest
249
0
                .err_tip(|| "Expected action_digest to exist")?,
250
        )
251
0
        .err_tip(|| "Failed to unwrap action cache")?;
252
253
0
        let priority = request
254
0
            .execution_policy
255
0
            .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);
256
257
0
        let action =
258
0
            get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
259
0
        let action_info = instance_info
260
0
            .build_action_info(
261
0
                instance_name.clone(),
262
0
                digest,
263
0
                action,
264
0
                priority,
265
0
                request.skip_cache_lookup,
266
0
                request
267
0
                    .digest_function
268
0
                    .try_into()
269
0
                    .err_tip(|| "Could not convert digest function in inner_execute()")?,
270
            )
271
0
            .await?;
272
273
0
        let action_listener = instance_info
274
0
            .scheduler
275
0
            .add_action(OperationId::default(), Arc::new(action_info))
276
0
            .await
277
0
            .err_tip(|| "Failed to schedule task")?;
278
279
        Ok(Self::to_execute_stream(
280
            &NativelinkOperationId::new(
281
0
                instance_name,
282
0
                action_listener
283
0
                    .as_state()
284
0
                    .await
285
0
                    .err_tip(|| "In ExecutionServer::inner_execute")?
286
                    .client_operation_id
287
0
                    .clone(),
288
0
            ),
289
0
            action_listener,
290
        ))
291
0
    }
292
293
0
    async fn inner_wait_execution(
294
0
        &self,
295
0
        request: Request<WaitExecutionRequest>,
296
0
    ) -> Result<Response<ExecuteStream>, Status> {
297
0
        let nl_operation_id = NativelinkOperationId::from_name(&request.into_inner().name)
298
0
            .err_tip(|| "Failed to parse operation_id in ExecutionServer::wait_execution")?;
299
0
        let Some(instance_info) = self.instance_infos.get(&nl_operation_id.instance_name) else {
  Branch (299:13): [True: 0, False: 0]
  Branch (299:13): [Folded - Ignored]
300
0
            return Err(Status::not_found(format!(
301
0
                "No scheduler with the instance name {}",
302
0
                nl_operation_id.instance_name,
303
0
            )));
304
        };
305
0
        let Some(rx) = instance_info
  Branch (305:13): [True: 0, False: 0]
  Branch (305:13): [Folded - Ignored]
306
0
            .scheduler
307
0
            .filter_operations(OperationFilter {
308
0
                client_operation_id: Some(nl_operation_id.client_operation_id.clone()),
309
0
                ..Default::default()
310
0
            })
311
0
            .await
312
0
            .err_tip(|| "Error running find_existing_action in ExecutionServer::wait_execution")?
313
0
            .next()
314
0
            .await
315
        else {
316
0
            return Err(Status::not_found("Failed to find existing task"));
317
        };
318
0
        Ok(Self::to_execute_stream(&nl_operation_id, rx))
319
0
    }
320
}
321
322
#[tonic::async_trait]
323
impl Execution for ExecutionServer {
324
    type ExecuteStream = ExecuteStream;
325
    type WaitExecutionStream = ExecuteStream;
326
327
    #[allow(clippy::blocks_in_conditions)]
328
0
    #[instrument(
329
        err,
330
        level = Level::ERROR,
331
        skip_all,
332
        fields(request = ?grpc_request.get_ref())
333
0
    )]
334
    async fn execute(
335
        &self,
336
        grpc_request: Request<ExecuteRequest>,
337
0
    ) -> Result<Response<ExecuteStream>, Status> {
338
0
        let request = grpc_request.into_inner();
339
0
        make_ctx_for_hash_func(request.digest_function)
340
0
            .err_tip(|| "In ExecutionServer::execute")?
341
            .wrap_async(
342
0
                error_span!("execution_server_execute"),
343
0
                self.inner_execute(request),
344
            )
345
0
            .await
346
0
            .err_tip(|| "Failed on execute() command")
347
0
            .map_err(Into::into)
348
0
    }
349
350
    #[allow(clippy::blocks_in_conditions)]
351
0
    #[instrument(
352
        err,
353
        level = Level::ERROR,
354
        skip_all,
355
        fields(request = ?grpc_request.get_ref())
356
0
    )]
357
    async fn wait_execution(
358
        &self,
359
        grpc_request: Request<WaitExecutionRequest>,
360
0
    ) -> Result<Response<ExecuteStream>, Status> {
361
0
        let resp = self
362
0
            .inner_wait_execution(grpc_request)
363
0
            .await
364
0
            .err_tip(|| "Failed on wait_execution() command")
365
0
            .map_err(Into::into);
366
0
367
0
        if resp.is_ok() {
  Branch (367:12): [True: 0, False: 0]
  Branch (367:12): [Folded - Ignored]
368
0
            event!(Level::DEBUG, return = "Ok(<stream>)");
369
0
        }
370
0
        resp
371
0
    }
372
}