Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-service/src/execution_server.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::convert::Into;
17
use std::fmt;
18
use std::pin::Pin;
19
use std::sync::Arc;
20
use std::time::{Duration, SystemTime, UNIX_EPOCH};
21
22
use futures::stream::unfold;
23
use futures::{Stream, StreamExt};
24
use nativelink_config::cas_server::{ExecutionConfig, InstanceName};
25
use nativelink_error::{make_input_err, Error, ResultExt};
26
use nativelink_proto::build::bazel::remote::execution::v2::execution_server::{
27
    Execution, ExecutionServer as Server,
28
};
29
use nativelink_proto::build::bazel::remote::execution::v2::{
30
    Action, Command, ExecuteRequest, WaitExecutionRequest,
31
};
32
use nativelink_proto::google::longrunning::Operation;
33
use nativelink_store::ac_utils::get_and_decode_digest;
34
use nativelink_store::store_manager::StoreManager;
35
use nativelink_util::action_messages::{
36
    ActionInfo, ActionUniqueKey, ActionUniqueQualifier, OperationId, DEFAULT_EXECUTION_PRIORITY,
37
};
38
use nativelink_util::common::DigestInfo;
39
use nativelink_util::digest_hasher::{make_ctx_for_hash_func, DigestHasherFunc};
40
use nativelink_util::operation_state_manager::{
41
    ActionStateResult, ClientStateManager, OperationFilter,
42
};
43
use nativelink_util::store_trait::Store;
44
use tonic::{Request, Response, Status};
45
use tracing::{error_span, event, instrument, Level};
46
47
// Name of an instance, as stored in `NativelinkOperationId::instance_name`.
type InstanceInfoName = String;
48
49
struct NativelinkOperationId {
50
    instance_name: InstanceInfoName,
51
    client_operation_id: OperationId,
52
}
53
54
impl NativelinkOperationId {
55
0
    fn new(instance_name: InstanceInfoName, client_operation_id: OperationId) -> Self {
56
0
        Self {
57
0
            instance_name,
58
0
            client_operation_id,
59
0
        }
60
0
    }
61
62
0
    fn from_name(name: &str) -> Result<Self, Error> {
63
0
        let (instance_name, name) = name
64
0
            .split_once('/')
65
0
            .err_tip(|| "Expected instance_name and name to be separated by '/'")?;
66
0
        Ok(NativelinkOperationId::new(
67
0
            instance_name.to_string(),
68
0
            OperationId::from(name),
69
0
        ))
70
0
    }
71
}
72
73
impl fmt::Display for NativelinkOperationId {
74
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75
0
        write!(f, "{}/{}", self.instance_name, self.client_operation_id)
76
0
    }
77
}
78
79
struct InstanceInfo {
80
    scheduler: Arc<dyn ClientStateManager>,
81
    cas_store: Store,
82
}
83
84
impl InstanceInfo {
85
0
    async fn build_action_info(
86
0
        &self,
87
0
        instance_name: String,
88
0
        action_digest: DigestInfo,
89
0
        action: Action,
90
0
        priority: i32,
91
0
        skip_cache_lookup: bool,
92
0
        digest_function: DigestHasherFunc,
93
0
    ) -> Result<ActionInfo, Error> {
94
0
        let command_digest = DigestInfo::try_from(
95
0
            action
96
0
                .command_digest
97
0
                .clone()
98
0
                .err_tip(|| "Expected command_digest to exist")?,
99
        )
100
0
        .err_tip(|| "Could not decode command digest")?;
101
102
0
        let input_root_digest = DigestInfo::try_from(
103
0
            action
104
0
                .clone()
105
0
                .input_root_digest
106
0
                .err_tip(|| "Expected input_digest_root")?,
107
0
        )?;
108
0
        let timeout = action.timeout.map_or(Duration::MAX, |v| {
109
0
            Duration::new(v.seconds as u64, v.nanos as u32)
110
0
        });
111
0
112
0
        let mut platform_properties = HashMap::new();
113
0
        if let Some(platform) = action.platform {
  Branch (113:16): [True: 0, False: 0]
  Branch (113:16): [Folded - Ignored]
114
0
            for property in platform.properties {
115
0
                platform_properties.insert(property.name, property.value);
116
0
            }
117
0
        }
118
119
        // Goma puts the properties in the Command.
120
0
        if platform_properties.is_empty() {
  Branch (120:12): [True: 0, False: 0]
  Branch (120:12): [Folded - Ignored]
121
0
            let command =
122
0
                get_and_decode_digest::<Command>(&self.cas_store, command_digest.into()).await?;
123
0
            if let Some(platform) = command.platform {
  Branch (123:20): [True: 0, False: 0]
  Branch (123:20): [Folded - Ignored]
124
0
                for property in platform.properties {
125
0
                    platform_properties.insert(property.name, property.value);
126
0
                }
127
0
            }
128
0
        }
129
130
0
        let action_key = ActionUniqueKey {
131
0
            instance_name,
132
0
            digest_function,
133
0
            digest: action_digest,
134
0
        };
135
0
        let unique_qualifier = if skip_cache_lookup {
  Branch (135:35): [True: 0, False: 0]
  Branch (135:35): [Folded - Ignored]
136
0
            ActionUniqueQualifier::Uncachable(action_key)
137
        } else {
138
0
            ActionUniqueQualifier::Cachable(action_key)
139
        };
140
141
0
        Ok(ActionInfo {
142
0
            command_digest,
143
0
            input_root_digest,
144
0
            timeout,
145
0
            platform_properties,
146
0
            priority,
147
0
            load_timestamp: UNIX_EPOCH,
148
0
            insert_timestamp: SystemTime::now(),
149
0
            unique_qualifier,
150
0
        })
151
0
    }
152
}
153
154
pub struct ExecutionServer {
155
    instance_infos: HashMap<InstanceName, InstanceInfo>,
156
}
157
158
// Boxed, pinned stream of `Operation` updates (or a `Status` error); used as
// the response stream type for both `execute` and `wait_execution`.
type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + 'static>>;
159
160
impl ExecutionServer {
161
0
    pub fn new(
162
0
        config: &HashMap<InstanceName, ExecutionConfig>,
163
0
        scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>,
164
0
        store_manager: &StoreManager,
165
0
    ) -> Result<Self, Error> {
166
0
        let mut instance_infos = HashMap::with_capacity(config.len());
167
0
        for (instance_name, exec_cfg) in config {
168
0
            let cas_store = store_manager
169
0
                .get_store(&exec_cfg.cas_store)
170
0
                .ok_or_else(|| {
171
0
                    make_input_err!("'cas_store': '{}' does not exist", exec_cfg.cas_store)
172
0
                })?;
173
0
            let scheduler = scheduler_map
174
0
                .get(&exec_cfg.scheduler)
175
0
                .err_tip(|| {
176
0
                    format!(
177
0
                        "Scheduler needs config for '{}' because it exists in execution",
178
0
                        exec_cfg.scheduler
179
0
                    )
180
0
                })?
181
0
                .clone();
182
0
183
0
            instance_infos.insert(
184
0
                instance_name.to_string(),
185
0
                InstanceInfo {
186
0
                    scheduler,
187
0
                    cas_store,
188
0
                },
189
0
            );
190
        }
191
0
        Ok(Self { instance_infos })
192
0
    }
193
194
0
    pub fn into_service(self) -> Server<ExecutionServer> {
195
0
        Server::new(self)
196
0
    }
197
198
0
    fn to_execute_stream(
199
0
        nl_client_operation_id: &NativelinkOperationId,
200
0
        action_listener: Box<dyn ActionStateResult>,
201
0
    ) -> Response<ExecuteStream> {
202
0
        let client_operation_id = OperationId::from(nl_client_operation_id.to_string());
203
0
        let receiver_stream = Box::pin(unfold(
204
0
            Some(action_listener),
205
0
            move |maybe_action_listener| {
206
0
                let client_operation_id = client_operation_id.clone();
207
0
                async move {
208
0
                    let mut action_listener = maybe_action_listener?;
209
0
                    match action_listener.changed().await {
210
0
                        Ok(action_update) => {
211
0
                            event!(Level::INFO, ?action_update, "Execute Resp Stream");
212
                            // If the action is finished we won't be sending any more updates.
213
0
                            let maybe_action_listener = if action_update.stage.is_finished() {
  Branch (213:60): [True: 0, False: 0]
  Branch (213:60): [Folded - Ignored]
214
0
                                None
215
                            } else {
216
0
                                Some(action_listener)
217
                            };
218
0
                            Some((
219
0
                                Ok(action_update.as_operation(client_operation_id)),
220
0
                                maybe_action_listener,
221
0
                            ))
222
                        }
223
0
                        Err(err) => {
224
0
                            event!(Level::ERROR, ?err, "Error in action_listener stream");
225
0
                            Some((Err(err.into()), None))
226
                        }
227
                    }
228
0
                }
229
0
            },
230
0
        ));
231
0
        tonic::Response::new(receiver_stream)
232
0
    }
233
234
0
    async fn inner_execute(
235
0
        &self,
236
0
        request: ExecuteRequest,
237
0
    ) -> Result<Response<ExecuteStream>, Error> {
238
0
        let instance_name = request.instance_name;
239
240
0
        let instance_info = self
241
0
            .instance_infos
242
0
            .get(&instance_name)
243
0
            .err_tip(|| "Instance name '{}' not configured")?;
244
245
0
        let digest = DigestInfo::try_from(
246
0
            request
247
0
                .action_digest
248
0
                .err_tip(|| "Expected action_digest to exist")?,
249
        )
250
0
        .err_tip(|| "Failed to unwrap action cache")?;
251
252
0
        let priority = request
253
0
            .execution_policy
254
0
            .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);
255
256
0
        let action =
257
0
            get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
258
0
        let action_info = instance_info
259
0
            .build_action_info(
260
0
                instance_name.clone(),
261
0
                digest,
262
0
                action,
263
0
                priority,
264
0
                request.skip_cache_lookup,
265
0
                request
266
0
                    .digest_function
267
0
                    .try_into()
268
0
                    .err_tip(|| "Could not convert digest function in inner_execute()")?,
269
            )
270
0
            .await?;
271
272
0
        let action_listener = instance_info
273
0
            .scheduler
274
0
            .add_action(OperationId::default(), Arc::new(action_info))
275
0
            .await
276
0
            .err_tip(|| "Failed to schedule task")?;
277
278
        Ok(Self::to_execute_stream(
279
            &NativelinkOperationId::new(
280
0
                instance_name,
281
0
                action_listener
282
0
                    .as_state()
283
0
                    .await
284
0
                    .err_tip(|| "In ExecutionServer::inner_execute")?
285
                    .client_operation_id
286
0
                    .clone(),
287
0
            ),
288
0
            action_listener,
289
        ))
290
0
    }
291
292
0
    async fn inner_wait_execution(
293
0
        &self,
294
0
        request: Request<WaitExecutionRequest>,
295
0
    ) -> Result<Response<ExecuteStream>, Status> {
296
0
        let nl_operation_id = NativelinkOperationId::from_name(&request.into_inner().name)
297
0
            .err_tip(|| "Failed to parse operation_id in ExecutionServer::wait_execution")?;
298
0
        let Some(instance_info) = self.instance_infos.get(&nl_operation_id.instance_name) else {
  Branch (298:13): [True: 0, False: 0]
  Branch (298:13): [Folded - Ignored]
299
0
            return Err(Status::not_found(format!(
300
0
                "No scheduler with the instance name {}",
301
0
                nl_operation_id.instance_name,
302
0
            )));
303
        };
304
0
        let Some(rx) = instance_info
  Branch (304:13): [True: 0, False: 0]
  Branch (304:13): [Folded - Ignored]
305
0
            .scheduler
306
0
            .filter_operations(OperationFilter {
307
0
                client_operation_id: Some(nl_operation_id.client_operation_id.clone()),
308
0
                ..Default::default()
309
0
            })
310
0
            .await
311
0
            .err_tip(|| "Error running find_existing_action in ExecutionServer::wait_execution")?
312
0
            .next()
313
0
            .await
314
        else {
315
0
            return Err(Status::not_found("Failed to find existing task"));
316
        };
317
0
        Ok(Self::to_execute_stream(&nl_operation_id, rx))
318
0
    }
319
}
320
321
#[tonic::async_trait]
322
impl Execution for ExecutionServer {
323
    type ExecuteStream = ExecuteStream;
324
    type WaitExecutionStream = ExecuteStream;
325
326
    #[allow(clippy::blocks_in_conditions)]
327
0
    #[instrument(
328
        err,
329
        level = Level::ERROR,
330
        skip_all,
331
        fields(request = ?grpc_request.get_ref())
332
0
    )]
333
    async fn execute(
334
        &self,
335
        grpc_request: Request<ExecuteRequest>,
336
0
    ) -> Result<Response<ExecuteStream>, Status> {
337
0
        let request = grpc_request.into_inner();
338
0
        make_ctx_for_hash_func(request.digest_function)
339
0
            .err_tip(|| "In ExecutionServer::execute")?
340
            .wrap_async(
341
0
                error_span!("execution_server_execute"),
342
0
                self.inner_execute(request),
343
            )
344
0
            .await
345
0
            .err_tip(|| "Failed on execute() command")
346
0
            .map_err(Into::into)
347
0
    }
348
349
    #[allow(clippy::blocks_in_conditions)]
350
0
    #[instrument(
351
        err,
352
        level = Level::ERROR,
353
        skip_all,
354
        fields(request = ?grpc_request.get_ref())
355
0
    )]
356
    async fn wait_execution(
357
        &self,
358
        grpc_request: Request<WaitExecutionRequest>,
359
0
    ) -> Result<Response<ExecuteStream>, Status> {
360
0
        let resp = self
361
0
            .inner_wait_execution(grpc_request)
362
0
            .await
363
0
            .err_tip(|| "Failed on wait_execution() command")
364
0
            .map_err(Into::into);
365
0
366
0
        if resp.is_ok() {
  Branch (366:12): [True: 0, False: 0]
  Branch (366:12): [Folded - Ignored]
367
0
            event!(Level::DEBUG, return = "Ok(<stream>)");
368
0
        }
369
0
        resp
370
0
    }
371
}