Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/execution_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::convert::Into;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::collections::HashMap;
19
use std::fmt;
20
use std::sync::Arc;
21
use std::time::{SystemTime, UNIX_EPOCH};
22
23
use futures::stream::unfold;
24
use futures::{Stream, StreamExt};
25
use nativelink_config::cas_server::{ExecutionConfig, InstanceName, WithInstanceName};
26
use nativelink_error::{Error, ResultExt, make_input_err};
27
use nativelink_proto::build::bazel::remote::execution::v2::execution_server::{
28
    Execution, ExecutionServer as Server,
29
};
30
use nativelink_proto::build::bazel::remote::execution::v2::{
31
    Action, Command, ExecuteRequest, WaitExecutionRequest,
32
};
33
use nativelink_proto::google::longrunning::Operation;
34
use nativelink_store::ac_utils::get_and_decode_digest;
35
use nativelink_store::store_manager::StoreManager;
36
use nativelink_util::action_messages::{
37
    ActionInfo, ActionUniqueKey, ActionUniqueQualifier, DEFAULT_EXECUTION_PRIORITY, OperationId,
38
};
39
use nativelink_util::common::DigestInfo;
40
use nativelink_util::digest_hasher::{DigestHasherFunc, make_ctx_for_hash_func};
41
use nativelink_util::operation_state_manager::{
42
    ActionStateResult, ClientStateManager, OperationFilter,
43
};
44
use nativelink_util::store_trait::Store;
45
use opentelemetry::context::FutureExt;
46
use tonic::{Request, Response, Status};
47
use tracing::{Instrument, Level, debug, error, error_span, instrument};
48
49
/// Name of an execution instance, stored as the `instance_name` field of
/// `NativelinkOperationId`.
type InstanceInfoName = String;
50
51
struct NativelinkOperationId {
52
    instance_name: InstanceInfoName,
53
    client_operation_id: OperationId,
54
}
55
56
impl NativelinkOperationId {
57
2
    const fn new(instance_name: InstanceInfoName, client_operation_id: OperationId) -> Self {
58
2
        Self {
59
2
            instance_name,
60
2
            client_operation_id,
61
2
        }
62
2
    }
63
64
2
    fn from_name(name: &str) -> Result<Self, Error> {
65
2
        let (instance_name, name) = name
66
2
            .rsplit_once('/')
67
2
            .err_tip(|| "Expected instance_name and name to be separated by '/'")
?0
;
68
2
        Ok(Self::new(
69
2
            instance_name.to_string(),
70
2
            OperationId::from(name),
71
2
        ))
72
2
    }
73
}
74
75
impl fmt::Display for NativelinkOperationId {
76
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77
0
        write!(f, "{}/{}", self.instance_name, self.client_operation_id)
78
0
    }
79
}
80
81
struct InstanceInfo {
82
    scheduler: Arc<dyn ClientStateManager>,
83
    cas_store: Store,
84
}
85
86
impl fmt::Debug for InstanceInfo {
87
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88
0
        f.debug_struct("InstanceInfo")
89
0
            .field("cas_store", &self.cas_store)
90
0
            .finish_non_exhaustive()
91
0
    }
92
}
93
94
impl InstanceInfo {
95
0
    async fn build_action_info(
96
0
        &self,
97
0
        instance_name: String,
98
0
        action_digest: DigestInfo,
99
0
        action: Action,
100
0
        priority: i32,
101
0
        skip_cache_lookup: bool,
102
0
        digest_function: DigestHasherFunc,
103
0
    ) -> Result<ActionInfo, Error> {
104
0
        let command_digest = DigestInfo::try_from(
105
0
            action
106
0
                .command_digest
107
0
                .clone()
108
0
                .err_tip(|| "Expected command_digest to exist")?,
109
        )
110
0
        .err_tip(|| "Could not decode command digest")?;
111
112
0
        let input_root_digest = DigestInfo::try_from(
113
0
            action
114
0
                .clone()
115
0
                .input_root_digest
116
0
                .err_tip(|| "Expected input_digest_root")?,
117
0
        )?;
118
0
        let timeout = action.timeout.map_or(Duration::MAX, |v| {
119
0
            Duration::new(v.seconds as u64, v.nanos as u32)
120
0
        });
121
122
0
        let mut platform_properties = HashMap::new();
123
0
        if let Some(platform) = action.platform {
  Branch (123:16): [True: 0, False: 0]
  Branch (123:16): [Folded - Ignored]
124
0
            for property in platform.properties {
125
0
                platform_properties.insert(property.name, property.value);
126
0
            }
127
0
        }
128
129
        // Goma puts the properties in the Command.
130
0
        if platform_properties.is_empty() {
  Branch (130:12): [True: 0, False: 0]
  Branch (130:12): [Folded - Ignored]
131
0
            let command =
132
0
                get_and_decode_digest::<Command>(&self.cas_store, command_digest.into()).await?;
133
0
            if let Some(platform) = command.platform {
  Branch (133:20): [True: 0, False: 0]
  Branch (133:20): [Folded - Ignored]
134
0
                for property in platform.properties {
135
0
                    platform_properties.insert(property.name, property.value);
136
0
                }
137
0
            }
138
0
        }
139
140
0
        let action_key = ActionUniqueKey {
141
0
            instance_name,
142
0
            digest_function,
143
0
            digest: action_digest,
144
0
        };
145
0
        let unique_qualifier = if skip_cache_lookup {
  Branch (145:35): [True: 0, False: 0]
  Branch (145:35): [Folded - Ignored]
146
0
            ActionUniqueQualifier::Uncacheable(action_key)
147
        } else {
148
0
            ActionUniqueQualifier::Cacheable(action_key)
149
        };
150
151
0
        Ok(ActionInfo {
152
0
            command_digest,
153
0
            input_root_digest,
154
0
            timeout,
155
0
            platform_properties,
156
0
            priority,
157
0
            load_timestamp: UNIX_EPOCH,
158
0
            insert_timestamp: SystemTime::now(),
159
0
            unique_qualifier,
160
0
        })
161
0
    }
162
}
163
164
#[derive(Debug)]
165
pub struct ExecutionServer {
166
    instance_infos: HashMap<InstanceName, InstanceInfo>,
167
}
168
169
/// Boxed, pinned, `Send` stream of `Operation` results; used for both the
/// `ExecuteStream` and `WaitExecutionStream` associated types of the
/// `Execution` impl.
type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
170
171
impl ExecutionServer {
172
0
    pub fn new(
173
0
        configs: &[WithInstanceName<ExecutionConfig>],
174
0
        scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>,
175
0
        store_manager: &StoreManager,
176
0
    ) -> Result<Self, Error> {
177
0
        let mut instance_infos = HashMap::with_capacity(configs.len());
178
0
        for config in configs {
179
0
            let cas_store = store_manager.get_store(&config.cas_store).ok_or_else(|| {
180
0
                make_input_err!("'cas_store': '{}' does not exist", config.cas_store)
181
0
            })?;
182
0
            let scheduler = scheduler_map
183
0
                .get(&config.scheduler)
184
0
                .err_tip(|| {
185
0
                    format!(
186
0
                        "Scheduler needs config for '{}' because it exists in execution",
187
0
                        config.scheduler
188
                    )
189
0
                })?
190
0
                .clone();
191
192
0
            instance_infos.insert(
193
0
                config.instance_name.to_string(),
194
0
                InstanceInfo {
195
0
                    scheduler,
196
0
                    cas_store,
197
0
                },
198
            );
199
        }
200
0
        Ok(Self { instance_infos })
201
0
    }
202
203
0
    pub fn into_service(self) -> Server<Self> {
204
0
        Server::new(self)
205
0
    }
206
207
0
    fn to_execute_stream(
208
0
        nl_client_operation_id: &NativelinkOperationId,
209
0
        action_listener: Box<dyn ActionStateResult>,
210
0
    ) -> impl Stream<Item = Result<Operation, Status>> + Send + use<> {
211
0
        let client_operation_id = OperationId::from(nl_client_operation_id.to_string());
212
0
        unfold(Some(action_listener), move |maybe_action_listener| {
213
0
            let client_operation_id = client_operation_id.clone();
214
0
            async move {
215
0
                let mut action_listener = maybe_action_listener?;
216
0
                match action_listener.changed().await {
217
0
                    Ok((action_update, _maybe_origin_metadata)) => {
218
0
                        debug!(?action_update, "Execute Resp Stream");
219
                        // If the action is finished we won't be sending any more updates.
220
0
                        let maybe_action_listener = if action_update.stage.is_finished() {
  Branch (220:56): [True: 0, False: 0]
  Branch (220:56): [Folded - Ignored]
221
0
                            None
222
                        } else {
223
0
                            Some(action_listener)
224
                        };
225
0
                        Some((
226
0
                            Ok(action_update.as_operation(client_operation_id)),
227
0
                            maybe_action_listener,
228
0
                        ))
229
                    }
230
0
                    Err(err) => {
231
0
                        error!(?err, "Error in action_listener stream");
232
0
                        Some((Err(err.into()), None))
233
                    }
234
                }
235
0
            }
236
0
        })
237
0
    }
238
239
0
    async fn inner_execute(
240
0
        &self,
241
0
        request: ExecuteRequest,
242
0
    ) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Error> {
243
0
        let instance_name = request.instance_name;
244
245
0
        let instance_info = self
246
0
            .instance_infos
247
0
            .get(&instance_name)
248
0
            .err_tip(|| "Instance name '{}' not configured")?;
249
250
0
        let digest = DigestInfo::try_from(
251
0
            request
252
0
                .action_digest
253
0
                .err_tip(|| "Expected action_digest to exist")?,
254
        )
255
0
        .err_tip(|| "Failed to unwrap action cache")?;
256
257
0
        let priority = request
258
0
            .execution_policy
259
0
            .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);
260
261
0
        let action =
262
0
            get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
263
0
        let action_info = instance_info
264
0
            .build_action_info(
265
0
                instance_name.clone(),
266
0
                digest,
267
0
                action,
268
0
                priority,
269
0
                request.skip_cache_lookup,
270
0
                request
271
0
                    .digest_function
272
0
                    .try_into()
273
0
                    .err_tip(|| "Could not convert digest function in inner_execute()")?,
274
            )
275
0
            .await?;
276
277
0
        let action_listener = instance_info
278
0
            .scheduler
279
0
            .add_action(OperationId::default(), Arc::new(action_info))
280
0
            .await
281
0
            .err_tip(|| "Failed to schedule task")?;
282
283
0
        Ok(Box::pin(Self::to_execute_stream(
284
0
            &NativelinkOperationId::new(
285
0
                instance_name,
286
0
                action_listener
287
0
                    .as_state()
288
0
                    .await
289
0
                    .err_tip(|| "In ExecutionServer::inner_execute")?
290
                    .0
291
                    .client_operation_id
292
0
                    .clone(),
293
            ),
294
0
            action_listener,
295
        )))
296
0
    }
297
298
0
    async fn inner_wait_execution(
299
0
        &self,
300
0
        request: WaitExecutionRequest,
301
0
    ) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Status> {
302
0
        let nl_operation_id = NativelinkOperationId::from_name(&request.name)
303
0
            .err_tip(|| "Failed to parse operation_id in ExecutionServer::wait_execution")?;
304
0
        let Some(instance_info) = self.instance_infos.get(&nl_operation_id.instance_name) else {
  Branch (304:13): [True: 0, False: 0]
  Branch (304:13): [Folded - Ignored]
305
0
            return Err(Status::not_found(format!(
306
0
                "No scheduler with the instance name {}",
307
0
                nl_operation_id.instance_name,
308
0
            )));
309
        };
310
0
        let Some(rx) = instance_info
  Branch (310:13): [True: 0, False: 0]
  Branch (310:13): [Folded - Ignored]
311
0
            .scheduler
312
0
            .filter_operations(OperationFilter {
313
0
                client_operation_id: Some(nl_operation_id.client_operation_id.clone()),
314
0
                ..Default::default()
315
0
            })
316
0
            .await
317
0
            .err_tip(|| "Error running find_existing_action in ExecutionServer::wait_execution")?
318
0
            .next()
319
0
            .await
320
        else {
321
0
            return Err(Status::not_found("Failed to find existing task"));
322
        };
323
0
        Ok(Self::to_execute_stream(&nl_operation_id, rx))
324
0
    }
325
}
326
327
#[tonic::async_trait]
328
impl Execution for ExecutionServer {
329
    type ExecuteStream = ExecuteStream;
330
    type WaitExecutionStream = ExecuteStream;
331
332
    #[instrument(
333
        err,
334
        level = Level::ERROR,
335
        skip_all,
336
        fields(request = ?grpc_request.get_ref())
337
    )]
338
    async fn execute(
339
        &self,
340
        grpc_request: Request<ExecuteRequest>,
341
0
    ) -> Result<Response<ExecuteStream>, Status> {
342
0
        let request = grpc_request.into_inner();
343
344
0
        let digest_function = request.digest_function;
345
0
        let result = self
346
0
            .inner_execute(request)
347
0
            .instrument(error_span!("execution_server_execute"))
348
0
            .with_context(
349
0
                make_ctx_for_hash_func(digest_function)
350
0
                    .err_tip(|| "In ExecutionServer::execute")?,
351
            )
352
0
            .await
353
0
            .err_tip(|| "Failed on execute() command")?;
354
355
0
        Ok(Response::new(Box::pin(result)))
356
0
    }
357
358
    #[instrument(
359
        err,
360
        level = Level::ERROR,
361
        skip_all,
362
        fields(request = ?grpc_request.get_ref())
363
    )]
364
    async fn wait_execution(
365
        &self,
366
        grpc_request: Request<WaitExecutionRequest>,
367
0
    ) -> Result<Response<ExecuteStream>, Status> {
368
0
        let request = grpc_request.into_inner();
369
370
0
        let stream_result = self
371
0
            .inner_wait_execution(request)
372
0
            .await
373
0
            .err_tip(|| "Failed on wait_execution() command")
374
0
            .map_err(Into::into);
375
0
        let stream = match stream_result {
376
0
            Ok(stream) => stream,
377
0
            Err(e) => return Err(e),
378
        };
379
0
        debug!(return = "Ok(<stream>)");
380
0
        Ok(Response::new(Box::pin(stream)))
381
0
    }
382
}
383
384
/// Checks that `NativelinkOperationId::from_name` splits on the last `/` and
/// rejects names without a separator.
#[cfg(test)]
#[test]
fn test_nl_op_id_from_name() -> Result<(), Box<dyn core::error::Error>> {
    let examples = [("foo/bar", "foo"), ("a/b/c/d", "a/b/c")];

    for (input, expected) in examples {
        let id = NativelinkOperationId::from_name(input)?;
        assert_eq!(id.instance_name, expected);
    }

    // A name without any '/' cannot be split and must be reported as an error.
    assert!(NativelinkOperationId::from_name("no-separator").is_err());

    Ok(())
}