Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-service/src/execution_server.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::convert::Into;
16
use core::pin::Pin;
17
use core::time::Duration;
18
use std::collections::HashMap;
19
use std::fmt;
20
use std::sync::Arc;
21
use std::time::{SystemTime, UNIX_EPOCH};
22
23
use bytes::Bytes;
24
use futures::stream::unfold;
25
use futures::{Stream, StreamExt};
26
use nativelink_config::cas_server::{ExecutionConfig, InstanceName, WithInstanceName};
27
use nativelink_error::{Error, ResultExt, make_input_err};
28
use nativelink_proto::build::bazel::remote::execution::v2::execution_server::{
29
    Execution, ExecutionServer as Server,
30
};
31
use nativelink_proto::build::bazel::remote::execution::v2::{
32
    Action, Command, ExecuteRequest, WaitExecutionRequest,
33
};
34
use nativelink_proto::google::longrunning::operations_server::{Operations, OperationsServer};
35
use nativelink_proto::google::longrunning::{
36
    CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, ListOperationsRequest,
37
    ListOperationsResponse, Operation, WaitOperationRequest,
38
};
39
use nativelink_proto::google::rpc::{
40
    PreconditionFailure, Status as GrpcStatusProto, precondition_failure,
41
};
42
use nativelink_store::ac_utils::get_and_decode_digest;
43
use nativelink_store::store_manager::StoreManager;
44
use nativelink_util::action_messages::{
45
    ActionInfo, ActionUniqueKey, ActionUniqueQualifier, DEFAULT_EXECUTION_PRIORITY, OperationId,
46
    TypeUrl,
47
};
48
use nativelink_util::common::{self, DigestInfo};
49
use nativelink_util::digest_hasher::{DigestHasherFunc, make_ctx_for_hash_func};
50
use nativelink_util::operation_state_manager::{
51
    ActionStateResult, ClientStateManager, OperationFilter,
52
};
53
use nativelink_util::store_trait::{Store, StoreLike};
54
use opentelemetry::context::FutureExt;
55
use prost::Message as _;
56
use tonic::{Code, Request, Response, Status};
57
use tracing::{Instrument, Level, debug, error, error_span, instrument, warn};
58
59
/// Result of a synchronous `Execute` decision before the async
60
/// scheduling stream begins. Stream is the happy path; Reject is a
61
/// client-facing gRPC `Status` returned without going through NL's
62
/// internal Error/instrumentation pipeline.
63
enum ExecuteOutcome<S> {
64
    Stream(S),
65
    Reject(Status),
66
}
67
68
/// Build a tonic [`Status`] of code `FAILED_PRECONDITION` whose details
69
/// carry a `google.rpc.PreconditionFailure` listing the missing CAS
70
/// blobs.
71
///
72
/// The pre-check that calls this is intentionally shallow: it only
73
/// validates the top-level Action proto, `command_digest`, and
74
/// `input_root_digest`. Nested Directory protos and file contents
75
/// under the input root are not walked here — the worker path fetches
76
/// those lazily and reports them via the same mechanism (see
77
/// `action_messages::to_execute_response`, which dispatches on
78
/// `Error::context`).
79
///
80
/// Race note: the pre-check uses `has_many` then the action is
81
/// scheduled; a blob present at check time may be evicted before the
82
/// worker fetches it. That case is intentionally not addressed here —
83
/// the worker path covers it with the same `FAILED_PRECONDITION`
84
/// surfacing, so Bazel retries either way.
85
4
fn missing_blobs_failed_precondition(
86
4
    missing: &[(DigestInfo, &'static str)],
87
4
    summary: &str,
88
4
) -> Status {
89
4
    let pf = PreconditionFailure {
90
4
        violations: missing
91
4
            .iter()
92
4
            .map(|(d, ctx)| precondition_failure::Violation {
93
5
                r#type: common::VIOLATION_TYPE_MISSING.to_string(),
94
                // Per REv2, the subject for a missing-blob violation is
95
                // `blobs/<hash>/<size>` so the client knows exactly
96
                // which digest to re-upload.
97
5
                subject: format!("blobs/{}/{}", d.packed_hash(), d.size_bytes()),
98
5
                description: (*ctx).to_string(),
99
5
            })
100
4
            .collect(),
101
    };
102
103
    // Wrap PreconditionFailure into a google.protobuf.Any.
104
4
    let mut pf_buf: Vec<u8> = Vec::with_capacity(pf.encoded_len());
105
4
    pf.encode(&mut pf_buf)
106
4
        .expect("encoding prost message into Vec<u8> cannot fail");
107
4
    let any = prost_types::Any {
108
4
        type_url: PreconditionFailure::TYPE_URL.to_string(),
109
4
        value: pf_buf,
110
4
    };
111
112
4
    let status_proto = GrpcStatusProto {
113
4
        code: Code::FailedPrecondition as i32,
114
4
        message: summary.to_string(),
115
4
        details: vec![any],
116
4
    };
117
4
    let mut status_buf: Vec<u8> = Vec::with_capacity(status_proto.encoded_len());
118
4
    status_proto
119
4
        .encode(&mut status_buf)
120
4
        .expect("encoding prost message into Vec<u8> cannot fail");
121
122
4
    Status::with_details(
123
4
        Code::FailedPrecondition,
124
4
        summary.to_string(),
125
4
        Bytes::from(status_buf),
126
    )
127
4
}
128
129
type InstanceInfoName = String;
130
131
struct NativelinkOperationId {
132
    instance_name: InstanceInfoName,
133
    client_operation_id: OperationId,
134
}
135
136
impl NativelinkOperationId {
137
6
    const fn new(instance_name: InstanceInfoName, client_operation_id: OperationId) -> Self {
138
6
        Self {
139
6
            instance_name,
140
6
            client_operation_id,
141
6
        }
142
6
    }
143
144
6
    fn from_name(name: &str) -> Result<Self, Error> {
145
6
        let (instance_name, name) = name
146
6
            .rsplit_once('/')
147
6
            .err_tip(|| "Expected instance_name and name to be separated by '/'")
?0
;
148
6
        Ok(Self::new(
149
6
            instance_name.to_string(),
150
6
            OperationId::from(name),
151
6
        ))
152
6
    }
153
}
154
155
impl fmt::Display for NativelinkOperationId {
156
3
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157
3
        write!(f, "{}/{}", self.instance_name, self.client_operation_id)
158
3
    }
159
}
160
161
#[derive(Clone)]
162
struct InstanceInfo {
163
    scheduler: Arc<dyn ClientStateManager>,
164
    cas_store: Store,
165
}
166
167
impl fmt::Debug for InstanceInfo {
168
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169
0
        f.debug_struct("InstanceInfo")
170
0
            .field("cas_store", &self.cas_store)
171
0
            .finish_non_exhaustive()
172
0
    }
173
}
174
175
impl InstanceInfo {
176
0
    async fn build_action_info(
177
0
        &self,
178
0
        instance_name: String,
179
0
        action_digest: DigestInfo,
180
0
        action: Action,
181
0
        priority: i32,
182
0
        skip_cache_lookup: bool,
183
0
        digest_function: DigestHasherFunc,
184
0
    ) -> Result<ActionInfo, Error> {
185
0
        let command_digest = DigestInfo::try_from(
186
0
            action
187
0
                .command_digest
188
0
                .clone()
189
0
                .err_tip(|| "Expected command_digest to exist")?,
190
        )
191
0
        .err_tip(|| "Could not decode command digest")?;
192
193
0
        let input_root_digest = DigestInfo::try_from(
194
0
            action
195
0
                .clone()
196
0
                .input_root_digest
197
0
                .err_tip(|| "Expected input_digest_root")?,
198
0
        )?;
199
0
        let timeout = action.timeout.map_or(Duration::MAX, |v| {
200
0
            Duration::new(v.seconds as u64, v.nanos as u32)
201
0
        });
202
203
0
        let mut platform_properties = HashMap::new();
204
0
        if let Some(platform) = action.platform {
205
0
            for property in platform.properties {
206
0
                platform_properties.insert(property.name, property.value);
207
0
            }
208
0
        }
209
210
        // Goma puts the properties in the Command.
211
0
        if platform_properties.is_empty() {
212
0
            let command =
213
0
                get_and_decode_digest::<Command>(&self.cas_store, command_digest.into()).await?;
214
0
            if let Some(platform) = command.platform {
215
0
                for property in platform.properties {
216
0
                    platform_properties.insert(property.name, property.value);
217
0
                }
218
0
            }
219
0
        }
220
221
0
        let action_key = ActionUniqueKey {
222
0
            instance_name,
223
0
            digest_function,
224
0
            digest: action_digest,
225
0
        };
226
0
        let unique_qualifier = if skip_cache_lookup {
227
0
            ActionUniqueQualifier::Uncacheable(action_key)
228
        } else {
229
0
            ActionUniqueQualifier::Cacheable(action_key)
230
        };
231
232
0
        Ok(ActionInfo {
233
0
            command_digest,
234
0
            input_root_digest,
235
0
            timeout,
236
0
            platform_properties,
237
0
            priority,
238
0
            load_timestamp: UNIX_EPOCH,
239
0
            insert_timestamp: SystemTime::now(),
240
0
            unique_qualifier,
241
0
        })
242
0
    }
243
}
244
245
#[derive(Debug, Clone)]
246
pub struct ExecutionServer {
247
    instance_infos: HashMap<InstanceName, InstanceInfo>,
248
}
249
250
type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
251
252
impl ExecutionServer {
253
12
    pub fn new(
254
12
        configs: &[WithInstanceName<ExecutionConfig>],
255
12
        scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>,
256
12
        store_manager: &StoreManager,
257
12
    ) -> Result<Self, Error> {
258
12
        let mut instance_infos = HashMap::with_capacity(configs.len());
259
12
        for config in configs {
260
12
            let cas_store = store_manager.get_store(&config.cas_store).ok_or_else(|| 
{0
261
0
                make_input_err!("'cas_store': '{}' does not exist", config.cas_store)
262
0
            })?;
263
12
            let scheduler = scheduler_map
264
12
                .get(&config.scheduler)
265
12
                .err_tip(|| 
{0
266
0
                    format!(
267
                        "Scheduler needs config for '{}' because it exists in execution",
268
0
                        config.scheduler
269
                    )
270
0
                })?
271
12
                .clone();
272
273
12
            instance_infos.insert(
274
12
                config.instance_name.clone(),
275
12
                InstanceInfo {
276
12
                    scheduler,
277
12
                    cas_store,
278
12
                },
279
            );
280
        }
281
12
        Ok(Self { instance_infos })
282
12
    }
283
284
0
    pub fn into_service(self) -> Server<Self> {
285
0
        Server::new(self)
286
0
    }
287
288
0
    pub fn into_operations_service(self) -> OperationsServer<Self> {
289
0
        OperationsServer::new(self)
290
0
    }
291
292
3
    fn to_execute_stream(
293
3
        nl_client_operation_id: &NativelinkOperationId,
294
3
        action_listener: Box<dyn ActionStateResult>,
295
3
    ) -> impl Stream<Item = Result<Operation, Status>> + Send + use<> {
296
3
        let client_operation_id = OperationId::from(nl_client_operation_id.to_string());
297
5
        
unfold3
(
Some(action_listener)3
, move |maybe_action_listener| {
298
5
            let client_operation_id = client_operation_id.clone();
299
5
            async move {
300
5
                let mut action_listener = maybe_action_listener
?0
;
301
5
                match action_listener.changed().await {
302
4
                    Ok((action_update, _maybe_origin_metadata)) => {
303
4
                        debug!(?action_update, "Execute Resp Stream");
304
4
                        Some((
305
4
                            Ok(action_update.as_operation(client_operation_id)),
306
4
                            (!action_update.stage.is_finished()).then_some(action_listener),
307
4
                        ))
308
                    }
309
0
                    Err(err) => {
310
0
                        error!(?err, "Error in action_listener stream");
311
0
                        Some((Err(err.into()), None))
312
                    }
313
                }
314
4
            }
315
5
        })
316
3
    }
317
318
5
    async fn inner_execute(
319
5
        &self,
320
5
        request: ExecuteRequest,
321
5
    ) -> Result<ExecuteOutcome<impl Stream<Item = Result<Operation, Status>> + Send + use<>>, Error>
322
5
    {
323
5
        let instance_name = request.instance_name;
324
325
5
        let 
instance_info4
= self
326
5
            .instance_infos
327
5
            .get(&instance_name)
328
5
            .err_tip(|| 
format!1
("'instance_name' not configured for '{instance_name}'"))
?1
;
329
330
4
        let digest = DigestInfo::try_from(
331
4
            request
332
4
                .action_digest
333
4
                .err_tip(|| "Expected action_digest to exist")
?0
,
334
        )
335
4
        .err_tip(|| "Failed to unwrap action cache")
?0
;
336
337
4
        let priority = request
338
4
            .execution_policy
339
4
            .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);
340
341
4
        let 
action3
= match get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into())
342
4
            .await
343
        {
344
3
            Ok(a) => a,
345
1
            Err(e) if e.code == Code::NotFound => {
346
1
                warn!(
347
                    %digest,
348
                    %e,
349
                    "Execute: Action proto missing from CAS; returning FAILED_PRECONDITION with PreconditionFailure detail so Bazel can re-upload"
350
                );
351
1
                let summary = format!(
352
                    "Action {digest} is missing from CAS; client should re-upload it and retry"
353
                );
354
1
                return Ok(ExecuteOutcome::Reject(missing_blobs_failed_precondition(
355
1
                    &[(digest, "Action")],
356
1
                    &summary,
357
1
                )));
358
            }
359
0
            Err(e) => return Err(e).err_tip(|| "Decoding Action proto in Execute")?,
360
        };
361
362
3
        let action_command_digest = action
363
3
            .command_digest
364
3
            .as_ref()
365
3
            .map(|d| DigestInfo::try_from(d.clone()))
366
3
            .transpose()
367
3
            .err_tip(|| "Failed to parse command_digest from Action")
?0
;
368
3
        let action_input_root_digest = action
369
3
            .input_root_digest
370
3
            .as_ref()
371
3
            .map(|d| DigestInfo::try_from(d.clone()))
372
3
            .transpose()
373
3
            .err_tip(|| "Failed to parse input_root_digest from Action")
?0
;
374
3
        let mut blobs_to_check: Vec<DigestInfo> = Vec::with_capacity(2);
375
3
        if let Some(d) = action_command_digest {
376
3
            blobs_to_check.push(d);
377
3
        
}0
378
3
        if let Some(d) = action_input_root_digest {
379
3
            blobs_to_check.push(d);
380
3
        
}0
381
3
        if !blobs_to_check.is_empty() {
382
6
            let 
store_keys3
:
Vec<_>3
=
blobs_to_check.iter()3
.
map3
(|d| (*d).into()).
collect3
();
383
3
            let sizes = instance_info
384
3
                .cas_store
385
3
                .has_many(&store_keys)
386
3
                .await
387
3
                .err_tip(|| "Validating Action input blobs in CAS")
?0
;
388
3
            let mut missing: Vec<(DigestInfo, &'static str)> = Vec::new();
389
6
            for ((digest, present), label) in blobs_to_check
390
3
                .iter()
391
3
                .zip(sizes.iter())
392
3
                .zip(["Action.command_digest", "Action.input_root_digest"].iter())
393
            {
394
6
                if present.is_none() {
395
4
                    missing.push((*digest, label));
396
4
                
}2
397
            }
398
3
            if !missing.is_empty() {
399
3
                warn!(
400
                    ?missing,
401
                    %digest,
402
                    "Execute pre-check found missing CAS blobs; returning FAILED_PRECONDITION with PreconditionFailure detail so Bazel can re-upload"
403
                );
404
3
                let summary = format!(
405
                    "{} CAS blob(s) referenced by action {} are missing; client should re-upload them and retry",
406
3
                    missing.len(),
407
                    digest,
408
                );
409
3
                return Ok(ExecuteOutcome::Reject(missing_blobs_failed_precondition(
410
3
                    &missing, &summary,
411
3
                )));
412
0
            }
413
0
        }
414
415
0
        let action_info = instance_info
416
0
            .build_action_info(
417
0
                instance_name.clone(),
418
0
                digest,
419
0
                action,
420
0
                priority,
421
0
                request.skip_cache_lookup,
422
0
                request
423
0
                    .digest_function
424
0
                    .try_into()
425
0
                    .err_tip(|| "Could not convert digest function in inner_execute()")?,
426
            )
427
0
            .await?;
428
429
0
        let action_listener = instance_info
430
0
            .scheduler
431
0
            .add_action(OperationId::default(), Arc::new(action_info))
432
0
            .await
433
0
            .err_tip(|| "Failed to schedule task")?;
434
435
0
        Ok(ExecuteOutcome::Stream(Self::to_execute_stream(
436
0
            &NativelinkOperationId::new(
437
0
                instance_name,
438
0
                action_listener
439
0
                    .as_state()
440
0
                    .await
441
0
                    .err_tip(|| "In ExecutionServer::inner_execute")?
442
                    .0
443
                    .client_operation_id
444
0
                    .clone(),
445
            ),
446
0
            action_listener,
447
        )))
448
5
    }
449
450
4
    async fn inner_wait_execution(
451
4
        &self,
452
4
        request: WaitExecutionRequest,
453
4
    ) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Status> {
454
4
        let nl_operation_id = NativelinkOperationId::from_name(&request.name)
455
4
            .err_tip(|| "Failed to parse operation_id in ExecutionServer::wait_execution")
?0
;
456
4
        let Some(instance_info) = self.instance_infos.get(&nl_operation_id.instance_name) else {
457
0
            return Err(Status::not_found(format!(
458
0
                "No scheduler with the instance name {}",
459
0
                nl_operation_id.instance_name,
460
0
            )));
461
        };
462
4
        let Some(
rx3
) = instance_info
463
4
            .scheduler
464
4
            .filter_operations(OperationFilter {
465
4
                client_operation_id: Some(nl_operation_id.client_operation_id.clone()),
466
4
                ..Default::default()
467
4
            })
468
4
            .await
469
4
            .err_tip(|| "Error running find_existing_action in ExecutionServer::wait_execution")
?0
470
4
            .next()
471
4
            .await
472
        else {
473
1
            return Err(Status::not_found("Failed to find existing task"));
474
        };
475
3
        Ok(Self::to_execute_stream(&nl_operation_id, rx))
476
4
    }
477
}
478
479
#[tonic::async_trait]
480
impl Execution for ExecutionServer {
481
    type ExecuteStream = ExecuteStream;
482
    type WaitExecutionStream = ExecuteStream;
483
484
    #[instrument(
485
        err,
486
        level = Level::ERROR,
487
        skip_all,
488
        fields(request = ?grpc_request.get_ref())
489
    )]
490
    async fn execute(
491
        &self,
492
        grpc_request: Request<ExecuteRequest>,
493
    ) -> Result<Response<ExecuteStream>, Status> {
494
        let request = grpc_request.into_inner();
495
496
        let digest_function = request.digest_function;
497
        let result = self
498
            .inner_execute(request)
499
            .instrument(error_span!("execution_server_execute"))
500
            .with_context(
501
                make_ctx_for_hash_func(digest_function)
502
                    .err_tip(|| "In ExecutionServer::execute")?,
503
            )
504
            .await
505
            .err_tip(|| "Failed on execute() command")?;
506
507
        match result {
508
            ExecuteOutcome::Stream(stream) => Ok(Response::new(Box::pin(stream))),
509
            ExecuteOutcome::Reject(status) => Err(status),
510
        }
511
    }
512
513
    #[instrument(
514
        err,
515
        level = Level::ERROR,
516
        skip_all,
517
        fields(request = ?grpc_request.get_ref())
518
    )]
519
    async fn wait_execution(
520
        &self,
521
        grpc_request: Request<WaitExecutionRequest>,
522
    ) -> Result<Response<ExecuteStream>, Status> {
523
        let request = grpc_request.into_inner();
524
525
        let stream_result = self
526
            .inner_wait_execution(request)
527
            .await
528
            .err_tip(|| "Failed on wait_execution() command")
529
            .map_err(Into::into);
530
        let stream = match stream_result {
531
            Ok(stream) => stream,
532
            Err(e) => return Err(e),
533
        };
534
        debug!(return = "Ok(<stream>)");
535
        Ok(Response::new(Box::pin(stream)))
536
    }
537
}
538
539
#[tonic::async_trait]
540
impl Operations for ExecutionServer {
541
    async fn list_operations(
542
        &self,
543
        _request: Request<ListOperationsRequest>,
544
1
    ) -> Result<Response<ListOperationsResponse>, Status> {
545
        Err(Status::unimplemented("list_operations not implemented"))
546
1
    }
547
548
    async fn delete_operation(
549
        &self,
550
        _request: Request<DeleteOperationRequest>,
551
1
    ) -> Result<Response<()>, Status> {
552
        Err(Status::unimplemented("delete_operation not implemented"))
553
1
    }
554
555
    async fn cancel_operation(
556
        &self,
557
        _request: Request<CancelOperationRequest>,
558
1
    ) -> Result<Response<()>, Status> {
559
        Err(Status::unimplemented("cancel_operation not implemented"))
560
1
    }
561
562
    async fn get_operation(
563
        &self,
564
        request: Request<GetOperationRequest>,
565
2
    ) -> Result<Response<Operation>, Status> {
566
        let inner_request = request.into_inner();
567
568
        let mut stream = Box::pin(
569
            self.inner_wait_execution(WaitExecutionRequest {
570
                name: inner_request.name,
571
            })
572
            .await?,
573
        );
574
575
        let operation = stream
576
            .next()
577
            .await
578
0
            .ok_or_else(|| Status::not_found("Operation not found"))??;
579
580
        Ok(Response::new(operation))
581
2
    }
582
583
    async fn wait_operation(
584
        &self,
585
        request: Request<WaitOperationRequest>,
586
2
    ) -> Result<Response<Operation>, Status> {
587
        let inner_request = request.into_inner();
588
1
        let timeout_opt = inner_request.timeout.map(|d| {
589
1
            let secs = u64::try_from(d.seconds).unwrap_or(0);
590
1
            let nanos = u32::try_from(d.nanos).unwrap_or(0);
591
1
            Duration::new(secs, nanos)
592
1
        });
593
594
        let mut stream = Box::pin(
595
            self.inner_wait_execution(WaitExecutionRequest {
596
                name: inner_request.name,
597
            })
598
            .await?,
599
        );
600
601
        let mut last_operation = stream
602
            .next()
603
            .await
604
0
            .ok_or_else(|| Status::not_found("Operation not found"))??;
605
606
        if last_operation.done {
607
            return Ok(Response::new(last_operation));
608
        }
609
610
1
        let end_time = timeout_opt.map(|t| tokio::time::Instant::now() + t);
611
612
        loop {
613
            let next_fut = stream.next();
614
            let next_res = if let Some(end) = end_time {
615
                match tokio::time::timeout_at(end, next_fut).await {
616
                    Ok(res) => res,
617
                    Err(_) => break,
618
                }
619
            } else {
620
                next_fut.await
621
            };
622
623
            match next_res {
624
                Some(Ok(operation)) => {
625
                    let is_done = operation.done;
626
                    last_operation = operation;
627
                    if is_done {
628
                        break;
629
                    }
630
                }
631
                Some(Err(e)) => return Err(e),
632
                None => break,
633
            }
634
        }
635
636
        Ok(Response::new(last_operation))
637
2
    }
638
}
639
640
/// `from_name` must split at the last `/`, keeping any earlier slashes in
/// the instance name.
#[cfg(test)]
#[test]
fn test_nl_op_id_from_name() -> Result<(), Box<dyn core::error::Error>> {
    const CASES: [(&str, &str); 2] = [("foo/bar", "foo"), ("a/b/c/d", "a/b/c")];

    for (name, expected_instance) in CASES {
        let parsed = NativelinkOperationId::from_name(name)?;
        assert_eq!(parsed.instance_name, expected_instance);
    }

    Ok(())
}