Coverage Report

Created: 2026-06-19 15:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/execution_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::convert::Into;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::collections::HashMap;
19
use std::fmt;
20
use std::sync::Arc;
21
use std::time::{SystemTime, UNIX_EPOCH};
22
23
use bytes::Bytes;
24
use futures::stream::unfold;
25
use futures::{Stream, StreamExt};
26
use nativelink_config::cas_server::{ExecutionConfig, InstanceName, WithInstanceName};
27
use nativelink_error::{Error, ResultExt, make_input_err};
28
use nativelink_proto::build::bazel::remote::execution::v2::execution_server::{
29
    Execution, ExecutionServer as Server,
30
};
31
use nativelink_proto::build::bazel::remote::execution::v2::{
32
    Action, Command, ExecuteRequest, WaitExecutionRequest,
33
};
34
use nativelink_proto::google::longrunning::operations_server::{Operations, OperationsServer};
35
use nativelink_proto::google::longrunning::{
36
    CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, ListOperationsRequest,
37
    ListOperationsResponse, Operation, WaitOperationRequest,
38
};
39
use nativelink_proto::google::rpc::{
40
    PreconditionFailure, Status as GrpcStatusProto, precondition_failure,
41
};
42
use nativelink_scheduler::known_platform_property_provider::KnownPlatformPropertyProvider;
43
use nativelink_store::ac_utils::get_and_decode_digest;
44
use nativelink_store::store_manager::StoreManager;
45
use nativelink_util::action_messages::{
46
    ActionInfo, ActionUniqueKey, ActionUniqueQualifier, DEFAULT_EXECUTION_PRIORITY, OperationId,
47
    TypeUrl,
48
};
49
use nativelink_util::common::{self, DigestInfo};
50
use nativelink_util::digest_hasher::{DigestHasherFunc, make_ctx_for_hash_func};
51
use nativelink_util::operation_state_manager::{ActionStateResult, OperationFilter};
52
use nativelink_util::store_trait::{Store, StoreLike};
53
use opentelemetry::context::FutureExt;
54
use prost::Message as _;
55
use tonic::{Code, Request, Response, Status};
56
use tracing::{Instrument, Level, debug, error, error_span, instrument, warn};
57
58
/// Result of a synchronous `Execute` decision before the async
59
/// scheduling stream begins. Stream is the happy path; Reject is a
60
/// client-facing gRPC `Status` returned without going through NL's
61
/// internal Error/instrumentation pipeline.
62
enum ExecuteOutcome<S> {
63
    Stream(S),
64
    Reject(Status),
65
}
66
67
/// Build a tonic [`Status`] of code `FAILED_PRECONDITION` whose details
68
/// carry a `google.rpc.PreconditionFailure` listing the missing CAS
69
/// blobs.
70
///
71
/// The pre-check that calls this is intentionally shallow: it only
72
/// validates the top-level Action proto, `command_digest`, and
73
/// `input_root_digest`. Nested Directory protos and file contents
74
/// under the input root are not walked here — the worker path fetches
75
/// those lazily and reports them via the same mechanism (see
76
/// `action_messages::to_execute_response`, which dispatches on
77
/// `Error::context`).
78
///
79
/// Race note: the pre-check uses `has_many` then the action is
80
/// scheduled; a blob present at check time may be evicted before the
81
/// worker fetches it. That case is intentionally not addressed here —
82
/// the worker path covers it with the same `FAILED_PRECONDITION`
83
/// surfacing, so Bazel retries either way.
84
4
fn missing_blobs_failed_precondition(
85
4
    missing: &[(DigestInfo, &'static str)],
86
4
    summary: &str,
87
4
) -> Status {
88
4
    let pf = PreconditionFailure {
89
4
        violations: missing
90
4
            .iter()
91
4
            .map(|(d, ctx)| precondition_failure::Violation {
92
5
                r#type: common::VIOLATION_TYPE_MISSING.to_string(),
93
                // Per REv2, the subject for a missing-blob violation is
94
                // `blobs/<hash>/<size>` so the client knows exactly
95
                // which digest to re-upload.
96
5
                subject: format!("blobs/{}/{}", d.packed_hash(), d.size_bytes()),
97
5
                description: (*ctx).to_string(),
98
5
            })
99
4
            .collect(),
100
    };
101
102
    // Wrap PreconditionFailure into a google.protobuf.Any.
103
4
    let mut pf_buf: Vec<u8> = Vec::with_capacity(pf.encoded_len());
104
4
    pf.encode(&mut pf_buf)
105
4
        .expect("encoding prost message into Vec<u8> cannot fail");
106
4
    let any = prost_types::Any {
107
4
        type_url: PreconditionFailure::TYPE_URL.to_string(),
108
4
        value: pf_buf,
109
4
    };
110
111
4
    let status_proto = GrpcStatusProto {
112
4
        code: Code::FailedPrecondition as i32,
113
4
        message: summary.to_string(),
114
4
        details: vec![any],
115
4
    };
116
4
    let mut status_buf: Vec<u8> = Vec::with_capacity(status_proto.encoded_len());
117
4
    status_proto
118
4
        .encode(&mut status_buf)
119
4
        .expect("encoding prost message into Vec<u8> cannot fail");
120
121
4
    Status::with_details(
122
4
        Code::FailedPrecondition,
123
4
        summary.to_string(),
124
4
        Bytes::from(status_buf),
125
    )
126
4
}
127
128
type InstanceInfoName = String;
129
130
struct NativelinkOperationId {
131
    instance_name: InstanceInfoName,
132
    client_operation_id: OperationId,
133
}
134
135
impl NativelinkOperationId {
136
6
    const fn new(instance_name: InstanceInfoName, client_operation_id: OperationId) -> Self {
137
6
        Self {
138
6
            instance_name,
139
6
            client_operation_id,
140
6
        }
141
6
    }
142
143
6
    fn from_name(name: &str) -> Result<Self, Error> {
144
6
        let (instance_name, name) = name
145
6
            .rsplit_once('/')
146
6
            .err_tip(|| "Expected instance_name and name to be separated by '/'")
?0
;
147
6
        Ok(Self::new(
148
6
            instance_name.to_string(),
149
6
            OperationId::from(name),
150
6
        ))
151
6
    }
152
}
153
154
impl fmt::Display for NativelinkOperationId {
155
3
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156
3
        write!(f, "{}/{}", self.instance_name, self.client_operation_id)
157
3
    }
158
}
159
160
#[derive(Clone)]
161
struct InstanceInfo {
162
    scheduler: Arc<dyn KnownPlatformPropertyProvider>,
163
    cas_store: Store,
164
}
165
166
impl fmt::Debug for InstanceInfo {
167
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168
0
        f.debug_struct("InstanceInfo")
169
0
            .field("cas_store", &self.cas_store)
170
0
            .finish_non_exhaustive()
171
0
    }
172
}
173
174
impl InstanceInfo {
175
0
    async fn build_action_info(
176
0
        &self,
177
0
        instance_name: String,
178
0
        action_digest: DigestInfo,
179
0
        action: Action,
180
0
        priority: i32,
181
0
        skip_cache_lookup: bool,
182
0
        digest_function: DigestHasherFunc,
183
0
    ) -> Result<ActionInfo, Error> {
184
0
        let command_digest = DigestInfo::try_from(
185
0
            action
186
0
                .command_digest
187
0
                .clone()
188
0
                .err_tip(|| "Expected command_digest to exist")?,
189
        )
190
0
        .err_tip(|| "Could not decode command digest")?;
191
192
0
        let input_root_digest = DigestInfo::try_from(
193
0
            action
194
0
                .clone()
195
0
                .input_root_digest
196
0
                .err_tip(|| "Expected input_digest_root")?,
197
0
        )?;
198
0
        let timeout = action.timeout.map_or(Duration::MAX, |v| {
199
0
            Duration::new(v.seconds as u64, v.nanos as u32)
200
0
        });
201
202
0
        let mut platform_properties = HashMap::new();
203
0
        if let Some(platform) = action.platform {
204
0
            for property in platform.properties {
205
0
                platform_properties.insert(property.name, property.value);
206
0
            }
207
0
        }
208
209
        // Goma puts the properties in the Command.
210
0
        if platform_properties.is_empty() {
211
0
            let command =
212
0
                get_and_decode_digest::<Command>(&self.cas_store, command_digest.into()).await?;
213
0
            if let Some(platform) = command.platform {
214
0
                for property in platform.properties {
215
0
                    platform_properties.insert(property.name, property.value);
216
0
                }
217
0
            }
218
0
        }
219
220
0
        let action_key = ActionUniqueKey {
221
0
            instance_name,
222
0
            digest_function,
223
0
            digest: action_digest,
224
0
        };
225
0
        let unique_qualifier = if skip_cache_lookup {
226
0
            ActionUniqueQualifier::Uncacheable(action_key)
227
        } else {
228
0
            ActionUniqueQualifier::Cacheable(action_key)
229
        };
230
231
0
        Ok(ActionInfo {
232
0
            command_digest,
233
0
            input_root_digest,
234
0
            timeout,
235
0
            platform_properties,
236
0
            priority,
237
0
            load_timestamp: UNIX_EPOCH,
238
0
            insert_timestamp: SystemTime::now(),
239
0
            unique_qualifier,
240
0
        })
241
0
    }
242
}
243
244
#[derive(Debug, Clone)]
245
pub struct ExecutionServer {
246
    instance_infos: HashMap<InstanceName, InstanceInfo>,
247
}
248
249
type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
250
251
impl ExecutionServer {
252
12
    pub fn new(
253
12
        configs: &[WithInstanceName<ExecutionConfig>],
254
12
        scheduler_map: &HashMap<String, Arc<dyn KnownPlatformPropertyProvider>>,
255
12
        store_manager: &StoreManager,
256
12
    ) -> Result<Self, Error> {
257
12
        let mut instance_infos = HashMap::with_capacity(configs.len());
258
12
        for config in configs {
259
12
            let cas_store = store_manager.get_store(&config.cas_store).ok_or_else(|| 
{0
260
0
                make_input_err!("'cas_store': '{}' does not exist", config.cas_store)
261
0
            })?;
262
12
            let scheduler = scheduler_map
263
12
                .get(&config.scheduler)
264
12
                .err_tip(|| 
{0
265
0
                    format!(
266
                        "Scheduler needs config for '{}' because it exists in execution",
267
0
                        config.scheduler
268
                    )
269
0
                })?
270
12
                .clone();
271
272
12
            instance_infos.insert(
273
12
                config.instance_name.clone(),
274
12
                InstanceInfo {
275
12
                    scheduler,
276
12
                    cas_store,
277
12
                },
278
            );
279
        }
280
12
        Ok(Self { instance_infos })
281
12
    }
282
283
0
    pub fn into_service(self) -> Server<Self> {
284
0
        Server::new(self)
285
0
    }
286
287
0
    pub fn into_operations_service(self) -> OperationsServer<Self> {
288
0
        OperationsServer::new(self)
289
0
    }
290
291
3
    fn to_execute_stream(
292
3
        nl_client_operation_id: &NativelinkOperationId,
293
3
        action_listener: Box<dyn ActionStateResult>,
294
3
    ) -> impl Stream<Item = Result<Operation, Status>> + Send + use<> {
295
3
        let client_operation_id = OperationId::from(nl_client_operation_id.to_string());
296
5
        
unfold3
(
Some(action_listener)3
, move |maybe_action_listener| {
297
5
            let client_operation_id = client_operation_id.clone();
298
5
            async move {
299
5
                let mut action_listener = maybe_action_listener
?0
;
300
5
                match action_listener.changed().await {
301
4
                    Ok((action_update, _maybe_origin_metadata)) => {
302
4
                        debug!(?action_update, "Execute Resp Stream");
303
4
                        Some((
304
4
                            Ok(action_update.as_operation(client_operation_id)),
305
4
                            (!action_update.stage.is_finished()).then_some(action_listener),
306
4
                        ))
307
                    }
308
0
                    Err(err) => {
309
0
                        error!(?err, "Error in action_listener stream");
310
0
                        Some((Err(err.into()), None))
311
                    }
312
                }
313
4
            }
314
5
        })
315
3
    }
316
317
5
    async fn inner_execute(
318
5
        &self,
319
5
        request: ExecuteRequest,
320
5
    ) -> Result<ExecuteOutcome<impl Stream<Item = Result<Operation, Status>> + Send + use<>>, Error>
321
5
    {
322
5
        let instance_name = request.instance_name;
323
324
5
        let 
instance_info4
= self
325
5
            .instance_infos
326
5
            .get(&instance_name)
327
5
            .err_tip(|| 
format!1
("'instance_name' not configured for '{instance_name}'"))
?1
;
328
329
4
        let digest = DigestInfo::try_from(
330
4
            request
331
4
                .action_digest
332
4
                .err_tip(|| "Expected action_digest to exist")
?0
,
333
        )
334
4
        .err_tip(|| "Failed to unwrap action cache")
?0
;
335
336
4
        let priority = request
337
4
            .execution_policy
338
4
            .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);
339
340
4
        let 
action3
= match get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into())
341
4
            .await
342
        {
343
3
            Ok(a) => a,
344
1
            Err(e) if e.code == Code::NotFound => {
345
1
                warn!(
346
                    %digest,
347
                    %e,
348
                    "Execute: Action proto missing from CAS; returning FAILED_PRECONDITION with PreconditionFailure detail so Bazel can re-upload"
349
                );
350
1
                let summary = format!(
351
                    "Action {digest} is missing from CAS; client should re-upload it and retry"
352
                );
353
1
                return Ok(ExecuteOutcome::Reject(missing_blobs_failed_precondition(
354
1
                    &[(digest, "Action")],
355
1
                    &summary,
356
1
                )));
357
            }
358
0
            Err(e) => return Err(e).err_tip(|| "Decoding Action proto in Execute")?,
359
        };
360
361
3
        let action_command_digest = action
362
3
            .command_digest
363
3
            .as_ref()
364
3
            .map(|d| DigestInfo::try_from(d.clone()))
365
3
            .transpose()
366
3
            .err_tip(|| "Failed to parse command_digest from Action")
?0
;
367
3
        let action_input_root_digest = action
368
3
            .input_root_digest
369
3
            .as_ref()
370
3
            .map(|d| DigestInfo::try_from(d.clone()))
371
3
            .transpose()
372
3
            .err_tip(|| "Failed to parse input_root_digest from Action")
?0
;
373
3
        let mut blobs_to_check: Vec<DigestInfo> = Vec::with_capacity(2);
374
3
        if let Some(d) = action_command_digest {
375
3
            blobs_to_check.push(d);
376
3
        
}0
377
3
        if let Some(d) = action_input_root_digest {
378
3
            blobs_to_check.push(d);
379
3
        
}0
380
3
        if !blobs_to_check.is_empty() {
381
6
            let 
store_keys3
:
Vec<_>3
=
blobs_to_check.iter()3
.
map3
(|d| (*d).into()).
collect3
();
382
3
            let sizes = instance_info
383
3
                .cas_store
384
3
                .has_many(&store_keys)
385
3
                .await
386
3
                .err_tip(|| "Validating Action input blobs in CAS")
?0
;
387
3
            let mut missing: Vec<(DigestInfo, &'static str)> = Vec::new();
388
6
            for ((digest, present), label) in blobs_to_check
389
3
                .iter()
390
3
                .zip(sizes.iter())
391
3
                .zip(["Action.command_digest", "Action.input_root_digest"].iter())
392
            {
393
6
                if present.is_none() {
394
4
                    missing.push((*digest, label));
395
4
                
}2
396
            }
397
3
            if !missing.is_empty() {
398
3
                warn!(
399
                    ?missing,
400
                    %digest,
401
                    "Execute pre-check found missing CAS blobs; returning FAILED_PRECONDITION with PreconditionFailure detail so Bazel can re-upload"
402
                );
403
3
                let summary = format!(
404
                    "{} CAS blob(s) referenced by action {} are missing; client should re-upload them and retry",
405
3
                    missing.len(),
406
                    digest,
407
                );
408
3
                return Ok(ExecuteOutcome::Reject(missing_blobs_failed_precondition(
409
3
                    &missing, &summary,
410
3
                )));
411
0
            }
412
0
        }
413
414
0
        let action_info = instance_info
415
0
            .build_action_info(
416
0
                instance_name.clone(),
417
0
                digest,
418
0
                action,
419
0
                priority,
420
0
                request.skip_cache_lookup,
421
0
                request
422
0
                    .digest_function
423
0
                    .try_into()
424
0
                    .err_tip(|| "Could not convert digest function in inner_execute()")?,
425
            )
426
0
            .await?;
427
428
0
        let action_listener = instance_info
429
0
            .scheduler
430
0
            .add_action(OperationId::default(), Arc::new(action_info))
431
0
            .await
432
0
            .err_tip(|| "Failed to schedule task")?;
433
434
0
        Ok(ExecuteOutcome::Stream(Self::to_execute_stream(
435
0
            &NativelinkOperationId::new(
436
0
                instance_name,
437
0
                action_listener
438
0
                    .as_state()
439
0
                    .await
440
0
                    .err_tip(|| "In ExecutionServer::inner_execute")?
441
                    .0
442
                    .client_operation_id
443
0
                    .clone(),
444
            ),
445
0
            action_listener,
446
        )))
447
5
    }
448
449
4
    async fn inner_wait_execution(
450
4
        &self,
451
4
        request: WaitExecutionRequest,
452
4
    ) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Status> {
453
4
        let nl_operation_id = NativelinkOperationId::from_name(&request.name)
454
4
            .err_tip(|| "Failed to parse operation_id in ExecutionServer::wait_execution")
?0
;
455
4
        let Some(instance_info) = self.instance_infos.get(&nl_operation_id.instance_name) else {
456
0
            return Err(Status::not_found(format!(
457
0
                "No scheduler with the instance name {}",
458
0
                nl_operation_id.instance_name,
459
0
            )));
460
        };
461
4
        let Some(
rx3
) = instance_info
462
4
            .scheduler
463
4
            .filter_operations(OperationFilter {
464
4
                client_operation_id: Some(nl_operation_id.client_operation_id.clone()),
465
4
                ..Default::default()
466
4
            })
467
4
            .await
468
4
            .err_tip(|| "Error running find_existing_action in ExecutionServer::wait_execution")
?0
469
4
            .next()
470
4
            .await
471
        else {
472
1
            return Err(Status::not_found("Failed to find existing task"));
473
        };
474
3
        Ok(Self::to_execute_stream(&nl_operation_id, rx))
475
4
    }
476
}
477
478
#[tonic::async_trait]
479
impl Execution for ExecutionServer {
480
    type ExecuteStream = ExecuteStream;
481
    type WaitExecutionStream = ExecuteStream;
482
483
    #[instrument(
484
        err,
485
        level = Level::ERROR,
486
        skip_all,
487
        fields(request = ?grpc_request.get_ref())
488
    )]
489
    async fn execute(
490
        &self,
491
        grpc_request: Request<ExecuteRequest>,
492
    ) -> Result<Response<ExecuteStream>, Status> {
493
        let request = grpc_request.into_inner();
494
495
        let digest_function = request.digest_function;
496
        let result = self
497
            .inner_execute(request)
498
            .instrument(error_span!("execution_server_execute"))
499
            .with_context(
500
                make_ctx_for_hash_func(digest_function)
501
                    .err_tip(|| "In ExecutionServer::execute")?,
502
            )
503
            .await
504
            .err_tip(|| "Failed on execute() command")?;
505
506
        match result {
507
            ExecuteOutcome::Stream(stream) => Ok(Response::new(Box::pin(stream))),
508
            ExecuteOutcome::Reject(status) => Err(status),
509
        }
510
    }
511
512
    #[instrument(
513
        err,
514
        level = Level::ERROR,
515
        skip_all,
516
        fields(request = ?grpc_request.get_ref())
517
    )]
518
    async fn wait_execution(
519
        &self,
520
        grpc_request: Request<WaitExecutionRequest>,
521
    ) -> Result<Response<ExecuteStream>, Status> {
522
        let request = grpc_request.into_inner();
523
524
        let stream_result = self
525
            .inner_wait_execution(request)
526
            .await
527
            .err_tip(|| "Failed on wait_execution() command")
528
            .map_err(Into::into);
529
        let stream = match stream_result {
530
            Ok(stream) => stream,
531
            Err(e) => return Err(e),
532
        };
533
        debug!(return = "Ok(<stream>)");
534
        Ok(Response::new(Box::pin(stream)))
535
    }
536
}
537
538
#[tonic::async_trait]
539
impl Operations for ExecutionServer {
540
    async fn list_operations(
541
        &self,
542
        _request: Request<ListOperationsRequest>,
543
1
    ) -> Result<Response<ListOperationsResponse>, Status> {
544
        Err(Status::unimplemented("list_operations not implemented"))
545
1
    }
546
547
    async fn delete_operation(
548
        &self,
549
        _request: Request<DeleteOperationRequest>,
550
1
    ) -> Result<Response<()>, Status> {
551
        Err(Status::unimplemented("delete_operation not implemented"))
552
1
    }
553
554
    async fn cancel_operation(
555
        &self,
556
        _request: Request<CancelOperationRequest>,
557
1
    ) -> Result<Response<()>, Status> {
558
        Err(Status::unimplemented("cancel_operation not implemented"))
559
1
    }
560
561
    async fn get_operation(
562
        &self,
563
        request: Request<GetOperationRequest>,
564
2
    ) -> Result<Response<Operation>, Status> {
565
        let inner_request = request.into_inner();
566
567
        let mut stream = Box::pin(
568
            self.inner_wait_execution(WaitExecutionRequest {
569
                name: inner_request.name,
570
            })
571
            .await?,
572
        );
573
574
        let operation = stream
575
            .next()
576
            .await
577
0
            .ok_or_else(|| Status::not_found("Operation not found"))??;
578
579
        Ok(Response::new(operation))
580
2
    }
581
582
    async fn wait_operation(
583
        &self,
584
        request: Request<WaitOperationRequest>,
585
2
    ) -> Result<Response<Operation>, Status> {
586
        let inner_request = request.into_inner();
587
1
        let timeout_opt = inner_request.timeout.map(|d| {
588
1
            let secs = u64::try_from(d.seconds).unwrap_or(0);
589
1
            let nanos = u32::try_from(d.nanos).unwrap_or(0);
590
1
            Duration::new(secs, nanos)
591
1
        });
592
593
        let mut stream = Box::pin(
594
            self.inner_wait_execution(WaitExecutionRequest {
595
                name: inner_request.name,
596
            })
597
            .await?,
598
        );
599
600
        let mut last_operation = stream
601
            .next()
602
            .await
603
0
            .ok_or_else(|| Status::not_found("Operation not found"))??;
604
605
        if last_operation.done {
606
            return Ok(Response::new(last_operation));
607
        }
608
609
1
        let end_time = timeout_opt.map(|t| tokio::time::Instant::now() + t);
610
611
        loop {
612
            let next_fut = stream.next();
613
            let next_res = if let Some(end) = end_time {
614
                match tokio::time::timeout_at(end, next_fut).await {
615
                    Ok(res) => res,
616
                    Err(_) => break,
617
                }
618
            } else {
619
                next_fut.await
620
            };
621
622
            match next_res {
623
                Some(Ok(operation)) => {
624
                    let is_done = operation.done;
625
                    last_operation = operation;
626
                    if is_done {
627
                        break;
628
                    }
629
                }
630
                Some(Err(e)) => return Err(e),
631
                None => break,
632
            }
633
        }
634
635
        Ok(Response::new(last_operation))
636
2
    }
637
}
638
639
/// `from_name` must split at the last `/`, so an instance name may
/// itself contain slashes.
#[cfg(test)]
#[test]
fn test_nl_op_id_from_name() -> Result<(), Box<dyn core::error::Error>> {
    let cases = [("foo/bar", "foo"), ("a/b/c/d", "a/b/c")];

    for (operation_name, want_instance) in cases {
        let parsed = NativelinkOperationId::from_name(operation_name)?;
        assert_eq!(parsed.instance_name, want_instance);
    }

    Ok(())
}