Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/execution_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::convert::Into;
17
use std::fmt;
18
use std::pin::Pin;
19
use std::sync::Arc;
20
use std::time::{Duration, SystemTime, UNIX_EPOCH};
21
22
use futures::stream::unfold;
23
use futures::{Stream, StreamExt};
24
use nativelink_config::cas_server::{ExecutionConfig, InstanceName};
25
use nativelink_error::{Error, ResultExt, make_input_err};
26
use nativelink_proto::build::bazel::remote::execution::v2::execution_server::{
27
    Execution, ExecutionServer as Server,
28
};
29
use nativelink_proto::build::bazel::remote::execution::v2::{
30
    Action, Command, ExecuteRequest, WaitExecutionRequest,
31
};
32
use nativelink_proto::google::longrunning::Operation;
33
use nativelink_store::ac_utils::get_and_decode_digest;
34
use nativelink_store::store_manager::StoreManager;
35
use nativelink_util::action_messages::{
36
    ActionInfo, ActionUniqueKey, ActionUniqueQualifier, DEFAULT_EXECUTION_PRIORITY, OperationId,
37
};
38
use nativelink_util::common::DigestInfo;
39
use nativelink_util::digest_hasher::{DigestHasherFunc, make_ctx_for_hash_func};
40
use nativelink_util::operation_state_manager::{
41
    ActionStateResult, ClientStateManager, OperationFilter,
42
};
43
use nativelink_util::origin_event::OriginEventContext;
44
use nativelink_util::store_trait::Store;
45
use tonic::{Request, Response, Status};
46
use tracing::{Level, error_span, event, instrument};
47
48
type InstanceInfoName = String;
49
50
struct NativelinkOperationId {
51
    instance_name: InstanceInfoName,
52
    client_operation_id: OperationId,
53
}
54
55
impl NativelinkOperationId {
56
2
    const fn new(instance_name: InstanceInfoName, client_operation_id: OperationId) -> Self {
57
2
        Self {
58
2
            instance_name,
59
2
            client_operation_id,
60
2
        }
61
2
    }
62
63
2
    fn from_name(name: &str) -> Result<Self, Error> {
64
2
        let (instance_name, name) = name
65
2
            .rsplit_once('/')
66
2
            .err_tip(|| 
"Expected instance_name and name to be separated by '/'"0
)
?0
;
67
2
        Ok(NativelinkOperationId::new(
68
2
            instance_name.to_string(),
69
2
            OperationId::from(name),
70
2
        ))
71
2
    }
72
}
73
74
impl fmt::Display for NativelinkOperationId {
75
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76
0
        write!(f, "{}/{}", self.instance_name, self.client_operation_id)
77
0
    }
78
}
79
80
struct InstanceInfo {
81
    scheduler: Arc<dyn ClientStateManager>,
82
    cas_store: Store,
83
}
84
85
impl fmt::Debug for InstanceInfo {
86
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87
0
        f.debug_struct("InstanceInfo")
88
0
            .field("cas_store", &self.cas_store)
89
0
            .finish_non_exhaustive()
90
0
    }
91
}
92
93
impl InstanceInfo {
94
0
    async fn build_action_info(
95
0
        &self,
96
0
        instance_name: String,
97
0
        action_digest: DigestInfo,
98
0
        action: Action,
99
0
        priority: i32,
100
0
        skip_cache_lookup: bool,
101
0
        digest_function: DigestHasherFunc,
102
0
    ) -> Result<ActionInfo, Error> {
103
0
        let command_digest = DigestInfo::try_from(
104
0
            action
105
0
                .command_digest
106
0
                .clone()
107
0
                .err_tip(|| "Expected command_digest to exist")?,
108
        )
109
0
        .err_tip(|| "Could not decode command digest")?;
110
111
0
        let input_root_digest = DigestInfo::try_from(
112
0
            action
113
0
                .clone()
114
0
                .input_root_digest
115
0
                .err_tip(|| "Expected input_digest_root")?,
116
0
        )?;
117
0
        let timeout = action.timeout.map_or(Duration::MAX, |v| {
118
0
            Duration::new(v.seconds as u64, v.nanos as u32)
119
0
        });
120
121
0
        let mut platform_properties = HashMap::new();
122
0
        if let Some(platform) = action.platform {
  Branch (122:16): [True: 0, False: 0]
  Branch (122:16): [Folded - Ignored]
123
0
            for property in platform.properties {
124
0
                platform_properties.insert(property.name, property.value);
125
0
            }
126
0
        }
127
128
        // Goma puts the properties in the Command.
129
0
        if platform_properties.is_empty() {
  Branch (129:12): [True: 0, False: 0]
  Branch (129:12): [Folded - Ignored]
130
0
            let command =
131
0
                get_and_decode_digest::<Command>(&self.cas_store, command_digest.into()).await?;
132
0
            if let Some(platform) = command.platform {
  Branch (132:20): [True: 0, False: 0]
  Branch (132:20): [Folded - Ignored]
133
0
                for property in platform.properties {
134
0
                    platform_properties.insert(property.name, property.value);
135
0
                }
136
0
            }
137
0
        }
138
139
0
        let action_key = ActionUniqueKey {
140
0
            instance_name,
141
0
            digest_function,
142
0
            digest: action_digest,
143
0
        };
144
0
        let unique_qualifier = if skip_cache_lookup {
  Branch (144:35): [True: 0, False: 0]
  Branch (144:35): [Folded - Ignored]
145
0
            ActionUniqueQualifier::Uncachable(action_key)
146
        } else {
147
0
            ActionUniqueQualifier::Cachable(action_key)
148
        };
149
150
0
        Ok(ActionInfo {
151
0
            command_digest,
152
0
            input_root_digest,
153
0
            timeout,
154
0
            platform_properties,
155
0
            priority,
156
0
            load_timestamp: UNIX_EPOCH,
157
0
            insert_timestamp: SystemTime::now(),
158
0
            unique_qualifier,
159
0
        })
160
0
    }
161
}
162
163
#[derive(Debug)]
164
pub struct ExecutionServer {
165
    instance_infos: HashMap<InstanceName, InstanceInfo>,
166
}
167
168
type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
169
170
impl ExecutionServer {
171
0
    pub fn new(
172
0
        config: &HashMap<InstanceName, ExecutionConfig>,
173
0
        scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>,
174
0
        store_manager: &StoreManager,
175
0
    ) -> Result<Self, Error> {
176
0
        let mut instance_infos = HashMap::with_capacity(config.len());
177
0
        for (instance_name, exec_cfg) in config {
178
0
            let cas_store = store_manager
179
0
                .get_store(&exec_cfg.cas_store)
180
0
                .ok_or_else(|| {
181
0
                    make_input_err!("'cas_store': '{}' does not exist", exec_cfg.cas_store)
182
0
                })?;
183
0
            let scheduler = scheduler_map
184
0
                .get(&exec_cfg.scheduler)
185
0
                .err_tip(|| {
186
0
                    format!(
187
0
                        "Scheduler needs config for '{}' because it exists in execution",
188
0
                        exec_cfg.scheduler
189
0
                    )
190
0
                })?
191
0
                .clone();
192
0
193
0
            instance_infos.insert(
194
0
                instance_name.to_string(),
195
0
                InstanceInfo {
196
0
                    scheduler,
197
0
                    cas_store,
198
0
                },
199
            );
200
        }
201
0
        Ok(Self { instance_infos })
202
0
    }
203
204
0
    pub fn into_service(self) -> Server<ExecutionServer> {
205
0
        Server::new(self)
206
0
    }
207
208
0
    fn to_execute_stream(
209
0
        nl_client_operation_id: &NativelinkOperationId,
210
0
        action_listener: Box<dyn ActionStateResult>,
211
0
    ) -> impl Stream<Item = Result<Operation, Status>> + Send + use<> {
212
0
        let client_operation_id = OperationId::from(nl_client_operation_id.to_string());
213
0
        unfold(Some(action_listener), move |maybe_action_listener| {
214
0
            let client_operation_id = client_operation_id.clone();
215
0
            async move {
216
0
                let mut action_listener = maybe_action_listener?;
217
0
                match action_listener.changed().await {
218
0
                    Ok((action_update, _maybe_origin_metadata)) => {
219
0
                        event!(Level::INFO, ?action_update, "Execute Resp Stream");
220
                        // If the action is finished we won't be sending any more updates.
221
0
                        let maybe_action_listener = if action_update.stage.is_finished() {
  Branch (221:56): [True: 0, False: 0]
  Branch (221:56): [Folded - Ignored]
222
0
                            None
223
                        } else {
224
0
                            Some(action_listener)
225
                        };
226
0
                        Some((
227
0
                            Ok(action_update.as_operation(client_operation_id)),
228
0
                            maybe_action_listener,
229
0
                        ))
230
                    }
231
0
                    Err(err) => {
232
0
                        event!(Level::ERROR, ?err, "Error in action_listener stream");
233
0
                        Some((Err(err.into()), None))
234
                    }
235
                }
236
0
            }
237
0
        })
238
0
    }
239
240
0
    async fn inner_execute(
241
0
        &self,
242
0
        request: ExecuteRequest,
243
0
    ) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Error> {
244
0
        let instance_name = request.instance_name;
245
246
0
        let instance_info = self
247
0
            .instance_infos
248
0
            .get(&instance_name)
249
0
            .err_tip(|| "Instance name '{}' not configured")?;
250
251
0
        let digest = DigestInfo::try_from(
252
0
            request
253
0
                .action_digest
254
0
                .err_tip(|| "Expected action_digest to exist")?,
255
        )
256
0
        .err_tip(|| "Failed to unwrap action cache")?;
257
258
0
        let priority = request
259
0
            .execution_policy
260
0
            .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);
261
262
0
        let action =
263
0
            get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
264
0
        let action_info = instance_info
265
0
            .build_action_info(
266
0
                instance_name.clone(),
267
0
                digest,
268
0
                action,
269
0
                priority,
270
0
                request.skip_cache_lookup,
271
0
                request
272
0
                    .digest_function
273
0
                    .try_into()
274
0
                    .err_tip(|| "Could not convert digest function in inner_execute()")?,
275
            )
276
0
            .await?;
277
278
0
        let action_listener = instance_info
279
0
            .scheduler
280
0
            .add_action(OperationId::default(), Arc::new(action_info))
281
0
            .await
282
0
            .err_tip(|| "Failed to schedule task")?;
283
284
0
        Ok(Box::pin(Self::to_execute_stream(
285
0
            &NativelinkOperationId::new(
286
0
                instance_name,
287
0
                action_listener
288
0
                    .as_state()
289
0
                    .await
290
0
                    .err_tip(|| "In ExecutionServer::inner_execute")?
291
                    .0
292
                    .client_operation_id
293
0
                    .clone(),
294
0
            ),
295
0
            action_listener,
296
        )))
297
0
    }
298
299
0
    async fn inner_wait_execution(
300
0
        &self,
301
0
        request: WaitExecutionRequest,
302
0
    ) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Status> {
303
0
        let nl_operation_id = NativelinkOperationId::from_name(&request.name)
304
0
            .err_tip(|| "Failed to parse operation_id in ExecutionServer::wait_execution")?;
305
0
        let Some(instance_info) = self.instance_infos.get(&nl_operation_id.instance_name) else {
  Branch (305:13): [True: 0, False: 0]
  Branch (305:13): [Folded - Ignored]
306
0
            return Err(Status::not_found(format!(
307
0
                "No scheduler with the instance name {}",
308
0
                nl_operation_id.instance_name,
309
0
            )));
310
        };
311
0
        let Some(rx) = instance_info
  Branch (311:13): [True: 0, False: 0]
  Branch (311:13): [Folded - Ignored]
312
0
            .scheduler
313
0
            .filter_operations(OperationFilter {
314
0
                client_operation_id: Some(nl_operation_id.client_operation_id.clone()),
315
0
                ..Default::default()
316
0
            })
317
0
            .await
318
0
            .err_tip(|| "Error running find_existing_action in ExecutionServer::wait_execution")?
319
0
            .next()
320
0
            .await
321
        else {
322
0
            return Err(Status::not_found("Failed to find existing task"));
323
        };
324
0
        Ok(Self::to_execute_stream(&nl_operation_id, rx))
325
0
    }
326
}
327
328
#[tonic::async_trait]
329
impl Execution for ExecutionServer {
330
    type ExecuteStream = ExecuteStream;
331
    type WaitExecutionStream = ExecuteStream;
332
333
    #[instrument(
334
        err,
335
        level = Level::ERROR,
336
        skip_all,
337
        fields(request = ?grpc_request.get_ref())
338
    )]
339
    async fn execute(
340
        &self,
341
        grpc_request: Request<ExecuteRequest>,
342
0
    ) -> Result<Response<ExecuteStream>, Status> {
343
0
        let request = grpc_request.into_inner();
344
0
        let ctx = OriginEventContext::new(|| &request).await;
345
0
        let resp = make_ctx_for_hash_func(request.digest_function)
346
0
            .err_tip(|| "In ExecutionServer::execute")?
347
0
            .wrap_async(
348
0
                error_span!("execution_server_execute"),
349
0
                self.inner_execute(request),
350
0
            )
351
0
            .await
352
0
            .map(|stream| ctx.wrap_stream(stream))
353
0
            .map(Response::new)
354
0
            .err_tip(|| "Failed on execute() command")
355
0
            .map_err(Into::into);
356
0
        ctx.emit(|| &resp).await;
357
0
        resp
358
0
    }
359
360
    #[instrument(
361
        err,
362
        level = Level::ERROR,
363
        skip_all,
364
        fields(request = ?grpc_request.get_ref())
365
    )]
366
    async fn wait_execution(
367
        &self,
368
        grpc_request: Request<WaitExecutionRequest>,
369
0
    ) -> Result<Response<ExecuteStream>, Status> {
370
0
        let request = grpc_request.into_inner();
371
0
        let ctx = OriginEventContext::new(|| &request).await;
372
0
        let resp = self
373
0
            .inner_wait_execution(request)
374
0
            .await
375
0
            .err_tip(|| "Failed on wait_execution() command")
376
0
            .map(|stream| ctx.wrap_stream(stream))
377
0
            .map(Response::new)
378
0
            .map_err(Into::into);
379
0
380
0
        if resp.is_ok() {
  Branch (380:12): [True: 0, False: 0]
  Branch (380:12): [Folded - Ignored]
381
0
            event!(Level::DEBUG, return = "Ok(<stream>)");
382
0
        }
383
0
        resp
384
0
    }
385
}
386
387
#[cfg(test)]
388
#[test]
389
1
fn test_nl_op_id_from_name() -> Result<(), Box<dyn std::error::Error>> {
390
1
    let examples = [("foo/bar", "foo"), ("a/b/c/d", "a/b/c")];
391
392
3
    for (
input, expected2
) in examples {
393
2
        let id = NativelinkOperationId::from_name(input)
?0
;
394
2
        assert_eq!(id.instance_name, expected);
395
    }
396
397
1
    Ok(())
398
1
}