Coverage Report

Created: 2025-05-30 16:37

/build/source/src/bin/nativelink.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::net::SocketAddr;
16
use core::time::Duration;
17
use std::collections::{HashMap, HashSet};
18
use std::sync::Arc;
19
20
use async_lock::Mutex as AsyncMutex;
21
use axum::Router;
22
use clap::Parser;
23
use futures::FutureExt;
24
use futures::future::{BoxFuture, Either, OptionFuture, TryFutureExt, try_join_all};
25
use hyper::StatusCode;
26
use hyper_util::rt::tokio::TokioIo;
27
use hyper_util::server::conn::auto;
28
use hyper_util::service::TowerToHyperService;
29
use mimalloc::MiMalloc;
30
use nativelink_config::cas_server::{
31
    CasConfig, GlobalConfig, HttpCompressionAlgorithm, ListenerConfig, SchedulerConfig,
32
    ServerConfig, StoreConfig, WorkerConfig,
33
};
34
use nativelink_config::stores::ConfigDigestHashFunction;
35
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
36
use nativelink_scheduler::default_scheduler_factory::scheduler_factory;
37
use nativelink_service::ac_server::AcServer;
38
use nativelink_service::bep_server::BepServer;
39
use nativelink_service::bytestream_server::ByteStreamServer;
40
use nativelink_service::capabilities_server::CapabilitiesServer;
41
use nativelink_service::cas_server::CasServer;
42
use nativelink_service::execution_server::ExecutionServer;
43
use nativelink_service::fetch_server::FetchServer;
44
use nativelink_service::health_server::HealthServer;
45
use nativelink_service::push_server::PushServer;
46
use nativelink_service::worker_api_server::WorkerApiServer;
47
use nativelink_store::default_store_factory::store_factory;
48
use nativelink_store::store_manager::StoreManager;
49
use nativelink_util::common::fs::set_open_file_limit;
50
use nativelink_util::digest_hasher::{DigestHasherFunc, set_default_digest_hasher_func};
51
use nativelink_util::health_utils::HealthRegistryBuilder;
52
use nativelink_util::origin_event_publisher::OriginEventPublisher;
53
#[cfg(target_family = "unix")]
54
use nativelink_util::shutdown_guard::Priority;
55
use nativelink_util::shutdown_guard::ShutdownGuard;
56
use nativelink_util::store_trait::{
57
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG, set_default_digest_size_health_check,
58
};
59
use nativelink_util::task::TaskExecutor;
60
use nativelink_util::telemetry::init_tracing;
61
use nativelink_util::{background_spawn, fs, spawn};
62
use nativelink_worker::local_worker::new_local_worker;
63
use rustls_pemfile::{certs as extract_certs, crls as extract_crls};
64
use tokio::net::TcpListener;
65
use tokio::select;
66
#[cfg(target_family = "unix")]
67
use tokio::signal::unix::{SignalKind, signal};
68
use tokio::sync::{broadcast, mpsc};
69
use tokio_rustls::TlsAcceptor;
70
use tokio_rustls::rustls::pki_types::CertificateDer;
71
use tokio_rustls::rustls::server::WebPkiClientVerifier;
72
use tokio_rustls::rustls::{RootCertStore, ServerConfig as TlsServerConfig};
73
use tonic::codec::CompressionEncoding;
74
use tonic::service::Routes;
75
#[cfg(target_family = "unix")]
76
use tracing::warn;
77
use tracing::{error, error_span, info, trace_span};
78
79
#[global_allocator]
80
static GLOBAL: MiMalloc = MiMalloc;
81
82
/// Note: This must be kept in sync with the documentation in `AdminConfig::path`.
83
const DEFAULT_ADMIN_API_PATH: &str = "/admin";
84
85
/// Note: This must be kept in sync with the documentation in `HealthConfig::path`.
86
const DEFAULT_HEALTH_STATUS_CHECK_PATH: &str = "/status";
87
88
/// Note: This must be kept in sync with the documentation in
89
/// `OriginEventsConfig::max_event_queue_size`.
90
const DEFAULT_MAX_QUEUE_EVENTS: usize = 0x0001_0000;
91
92
/// Broadcast Channel Capacity
93
/// Note: The actual capacity may be greater than the provided capacity.
94
const BROADCAST_CAPACITY: usize = 1;
95
96
/// Backend for the Bazel remote execution / cache API.
97
#[derive(Parser, Debug)]
98
#[clap(
99
    author = "Trace Machina, Inc. <nativelink@tracemachina.com>",
100
    version,
101
    about,
102
    long_about = None
103
)]
104
struct Args {
105
    /// Config file to use.
106
    #[clap(value_parser)]
107
    config_file: String,
108
}
109
110
trait RoutesExt {
111
    fn add_optional_service<S>(self, svc: Option<S>) -> Self
112
    where
113
        S: tower::Service<
114
                axum::http::Request<tonic::body::Body>,
115
                Error = core::convert::Infallible,
116
            > + tonic::server::NamedService
117
            + Clone
118
            + Send
119
            + Sync
120
            + 'static,
121
        S::Response: axum::response::IntoResponse,
122
        S::Future: Send + 'static;
123
}
124
125
impl RoutesExt for Routes {
126
0
    fn add_optional_service<S>(mut self, svc: Option<S>) -> Self
127
0
    where
128
0
        S: tower::Service<
129
0
                axum::http::Request<tonic::body::Body>,
130
0
                Error = core::convert::Infallible,
131
0
            > + tonic::server::NamedService
132
0
            + Clone
133
0
            + Send
134
0
            + Sync
135
0
            + 'static,
136
0
        S::Response: axum::response::IntoResponse,
137
0
        S::Future: Send + 'static,
138
    {
139
0
        if let Some(svc) = svc {
  Branch (139:16): [Folded - Ignored]
140
0
            self = self.add_service(svc);
141
0
        }
142
0
        self
143
0
    }
144
}
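
The `RoutesExt` trait above is the usual extension-trait pattern: declare the extra combinator in a local trait and implement it for the foreign `tonic::service::Routes` type, so `add_optional_service` chains like a built-in builder method and absent services need no branching at the call site. A minimal standalone sketch of the same pattern on a hypothetical `Builder` type (the names below are illustrative, not part of tonic or NativeLink):

struct Builder {
    parts: Vec<String>,
}

trait BuilderExt {
    /// Adds `part` only when it is `Some`, keeping the call chain linear.
    fn add_optional_part(self, part: Option<String>) -> Self;
}

impl BuilderExt for Builder {
    fn add_optional_part(mut self, part: Option<String>) -> Self {
        if let Some(part) = part {
            self.parts.push(part);
        }
        self
    }
}

fn main() {
    let builder = Builder { parts: Vec::new() }
        .add_optional_part(Some("cas".to_string()))
        .add_optional_part(None); // skipped without an if/else at the call site
    assert_eq!(builder.parts, vec!["cas".to_string()]);
}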
145
146
0
async fn inner_main(
147
0
    cfg: CasConfig,
148
0
    shutdown_tx: broadcast::Sender<ShutdownGuard>,
149
0
) -> Result<(), Error> {
150
0
    const fn into_encoding(from: HttpCompressionAlgorithm) -> Option<CompressionEncoding> {
151
0
        match from {
152
0
            HttpCompressionAlgorithm::Gzip => Some(CompressionEncoding::Gzip),
153
0
            HttpCompressionAlgorithm::None => None,
154
        }
155
0
    }
156
157
0
    let health_registry_builder =
158
0
        Arc::new(AsyncMutex::new(HealthRegistryBuilder::new("nativelink")));
159
160
0
    let store_manager = Arc::new(StoreManager::new());
161
    {
162
0
        let mut health_registry_lock = health_registry_builder.lock().await;
163
164
0
        for StoreConfig { name, spec } in cfg.stores {
165
0
            let health_component_name = format!("stores/{name}");
166
0
            let mut health_register_store =
167
0
                health_registry_lock.sub_builder(&health_component_name);
168
0
            let store = store_factory(&spec, &store_manager, Some(&mut health_register_store))
169
0
                .await
170
0
                .err_tip(|| format!("Failed to create store '{name}'"))?;
171
0
            store_manager.add_store(&name, store);
172
        }
173
    }
174
175
0
    let mut root_futures: Vec<BoxFuture<Result<(), Error>>> = Vec::new();
176
177
0
    let maybe_origin_event_tx = cfg
178
0
        .experimental_origin_events
179
0
        .as_ref()
180
0
        .map(|origin_events_cfg| {
181
0
            let mut max_queued_events = origin_events_cfg.max_event_queue_size;
182
0
            if max_queued_events == 0 {
  Branch (182:16): [Folded - Ignored]
183
0
                max_queued_events = DEFAULT_MAX_QUEUE_EVENTS;
184
0
            }
185
0
            let (tx, rx) = mpsc::channel(max_queued_events);
186
0
            let store_name = origin_events_cfg.publisher.store.as_str();
187
0
            let store = store_manager.get_store(store_name).err_tip(|| {
188
0
                format!("Could not get store {store_name} for origin event publisher")
189
0
            })?;
190
191
0
            root_futures.push(Box::pin(
192
0
                OriginEventPublisher::new(store, rx, shutdown_tx.clone())
193
0
                    .run()
194
0
                    .map(Ok),
195
0
            ));
196
197
0
            Ok::<_, Error>(tx)
198
0
        })
199
0
        .transpose()?;
200
201
0
    let mut action_schedulers = HashMap::new();
202
0
    let mut worker_schedulers = HashMap::new();
203
0
    for SchedulerConfig { name, spec } in cfg.schedulers.iter().flatten() {
204
0
        let (maybe_action_scheduler, maybe_worker_scheduler) =
205
0
            scheduler_factory(spec, &store_manager, maybe_origin_event_tx.as_ref())
206
0
                .err_tip(|| format!("Failed to create scheduler '{name}'"))?;
207
0
        if let Some(action_scheduler) = maybe_action_scheduler {
  Branch (207:16): [Folded - Ignored]
208
0
            action_schedulers.insert(name.clone(), action_scheduler.clone());
209
0
        }
210
0
        if let Some(worker_scheduler) = maybe_worker_scheduler {
  Branch (210:16): [Folded - Ignored]
211
0
            worker_schedulers.insert(name.clone(), worker_scheduler.clone());
212
0
        }
213
    }
214
215
0
    let server_cfgs: Vec<ServerConfig> = cfg.servers.into_iter().collect();
216
217
0
    for server_cfg in server_cfgs {
218
0
        let services = server_cfg
219
0
            .services
220
0
            .err_tip(|| "'services' must be configured")?;
221
222
        // Currently we only support HTTP as our socket type.
223
0
        let ListenerConfig::Http(http_config) = server_cfg.listener;
224
225
0
        let tonic_services = Routes::builder()
226
0
            .routes()
227
0
            .add_optional_service(
228
0
                services
229
0
                    .ac
230
0
                    .map_or(Ok(None), |cfg| {
231
0
                        AcServer::new(&cfg, &store_manager).map(|v| {
232
0
                            let mut service = v.into_service();
233
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
234
0
                            if let Some(encoding) =
  Branch (234:36): [Folded - Ignored]
235
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::None))
236
0
                            {
237
0
                                service = service.send_compressed(encoding);
238
0
                            }
239
0
                            for encoding in http_config
240
0
                                .compression
241
0
                                .accepted_compression_algorithms
242
0
                                .iter()
243
                                // Filter None values.
244
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
245
0
                            {
246
0
                                service = service.accept_compressed(encoding);
247
0
                            }
248
0
                            Some(service)
249
0
                        })
250
0
                    })
251
0
                    .err_tip(|| "Could not create AC service")?,
252
            )
253
0
            .add_optional_service(
254
0
                services
255
0
                    .cas
256
0
                    .map_or(Ok(None), |cfg| {
257
0
                        CasServer::new(&cfg, &store_manager).map(|v| {
258
0
                            let mut service = v.into_service();
259
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
260
0
                            if let Some(encoding) =
  Branch (260:36): [Folded - Ignored]
261
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::None))
262
0
                            {
263
0
                                service = service.send_compressed(encoding);
264
0
                            }
265
0
                            for encoding in http_config
266
0
                                .compression
267
0
                                .accepted_compression_algorithms
268
0
                                .iter()
269
                                // Filter None values.
270
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
271
0
                            {
272
0
                                service = service.accept_compressed(encoding);
273
0
                            }
274
0
                            Some(service)
275
0
                        })
276
0
                    })
277
0
                    .err_tip(|| "Could not create CAS service")?,
278
            )
279
0
            .add_optional_service(
280
0
                services
281
0
                    .execution
282
0
                    .map_or(Ok(None), |cfg| {
283
0
                        ExecutionServer::new(&cfg, &action_schedulers, &store_manager).map(|v| {
284
0
                            let mut service = v.into_service();
285
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
286
0
                            if let Some(encoding) =
  Branch (286:36): [Folded - Ignored]
287
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::None))
288
0
                            {
289
0
                                service = service.send_compressed(encoding);
290
0
                            }
291
0
                            for encoding in http_config
292
0
                                .compression
293
0
                                .accepted_compression_algorithms
294
0
                                .iter()
295
                                // Filter None values.
296
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
297
0
                            {
298
0
                                service = service.accept_compressed(encoding);
299
0
                            }
300
0
                            Some(service)
301
0
                        })
302
0
                    })
303
0
                    .err_tip(|| "Could not create Execution service")?,
304
            )
305
0
            .add_optional_service(
306
0
                services
307
0
                    .fetch
308
0
                    .map_or(Ok(None), |cfg| {
309
0
                        FetchServer::new(&cfg, &store_manager).map(|v| {
310
0
                            let mut service = v.into_service();
311
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
312
0
                            if let Some(encoding) =
  Branch (312:36): [Folded - Ignored]
313
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::None))
314
0
                            {
315
0
                                service = service.send_compressed(encoding);
316
0
                            }
317
0
                            for encoding in http_config
318
0
                                .compression
319
0
                                .accepted_compression_algorithms
320
0
                                .iter()
321
                                // Filter None values.
322
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
323
0
                            {
324
0
                                service = service.accept_compressed(encoding);
325
0
                            }
326
0
                            Some(service)
327
0
                        })
328
0
                    })
329
0
                    .err_tip(|| "Could not create Fetch service")?,
330
            )
331
0
            .add_optional_service(
332
0
                services
333
0
                    .push
334
0
                    .map_or(Ok(None), |cfg| {
335
0
                        PushServer::new(&cfg, &store_manager).map(|v| {
336
0
                            let mut service = v.into_service();
337
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
338
0
                            if let Some(encoding) =
  Branch (338:36): [Folded - Ignored]
339
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::None))
340
0
                            {
341
0
                                service = service.send_compressed(encoding);
342
0
                            }
343
0
                            for encoding in http_config
344
0
                                .compression
345
0
                                .accepted_compression_algorithms
346
0
                                .iter()
347
                                // Filter None values.
348
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
349
0
                            {
350
0
                                service = service.accept_compressed(encoding);
351
0
                            }
352
0
                            Some(service)
353
0
                        })
354
0
                    })
355
0
                    .err_tip(|| "Could not create Push service")?,
356
            )
357
0
            .add_optional_service(
358
0
                services
359
0
                    .bytestream
360
0
                    .map_or(Ok(None), |cfg| {
361
0
                        ByteStreamServer::new(&cfg, &store_manager).map(|v| {
362
0
                            let mut service = v.into_service();
363
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
364
0
                            if let Some(encoding) =
  Branch (364:36): [Folded - Ignored]
365
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::None))
366
0
                            {
367
0
                                service = service.send_compressed(encoding);
368
0
                            }
369
0
                            for encoding in http_config
370
0
                                .compression
371
0
                                .accepted_compression_algorithms
372
0
                                .iter()
373
                                // Filter None values.
374
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
375
0
                            {
376
0
                                service = service.accept_compressed(encoding);
377
0
                            }
378
0
                            Some(service)
379
0
                        })
380
0
                    })
381
0
                    .err_tip(|| "Could not create ByteStream service")?,
382
            )
383
0
            .add_optional_service(
384
0
                OptionFuture::from(
385
0
                    services
386
0
                        .capabilities
387
0
                        .as_ref()
388
                        // Working around the borrow checker here: the closure ignores its argument and re-borrows `services.capabilities`.
389
0
                        .map(|_| {
390
0
                            CapabilitiesServer::new(
391
0
                                services.capabilities.as_ref().unwrap(),
392
0
                                &action_schedulers,
393
                            )
394
0
                        }),
395
                )
396
0
                .await
397
0
                .map_or(Ok::<Option<CapabilitiesServer>, Error>(None), |server| {
398
0
                    Ok(Some(server?))
399
0
                })
400
0
                .err_tip(|| "Could not create Capabilities service")?
401
0
                .map(|v| {
402
0
                    let mut service = v.into_service();
403
0
                    let send_algo = &http_config.compression.send_compression_algorithm;
404
0
                    if let Some(encoding) =
  Branch (404:28): [Folded - Ignored]
405
0
                        into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::None))
406
0
                    {
407
0
                        service = service.send_compressed(encoding);
408
0
                    }
409
0
                    for encoding in http_config
410
0
                        .compression
411
0
                        .accepted_compression_algorithms
412
0
                        .iter()
413
                        // Filter None values.
414
0
                        .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
415
0
                    {
416
0
                        service = service.accept_compressed(encoding);
417
0
                    }
418
0
                    service
419
0
                }),
420
            )
421
0
            .add_optional_service(
422
0
                services
423
0
                    .worker_api
424
0
                    .map_or(Ok(None), |cfg| {
425
0
                        WorkerApiServer::new(&cfg, &worker_schedulers).map(|v| {
426
0
                            let mut service = v.into_service();
427
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
428
0
                            if let Some(encoding) =
  Branch (428:36): [Folded - Ignored]
429
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::None))
430
0
                            {
431
0
                                service = service.send_compressed(encoding);
432
0
                            }
433
0
                            for encoding in http_config
434
0
                                .compression
435
0
                                .accepted_compression_algorithms
436
0
                                .iter()
437
                                // Filter None values.
438
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
439
0
                            {
440
0
                                service = service.accept_compressed(encoding);
441
0
                            }
442
0
                            Some(service)
443
0
                        })
444
0
                    })
445
0
                    .err_tip(|| "Could not create WorkerApi service")?,
446
            )
447
0
            .add_optional_service(
448
0
                services
449
0
                    .experimental_bep
450
0
                    .map_or(Ok(None), |cfg| {
451
0
                        BepServer::new(&cfg, &store_manager).map(|v| {
452
0
                            let mut service = v.into_service();
453
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
454
0
                            if let Some(encoding) =
  Branch (454:36): [Folded - Ignored]
455
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::None))
456
0
                            {
457
0
                                service = service.send_compressed(encoding);
458
0
                            }
459
0
                            for encoding in http_config
460
0
                                .compression
461
0
                                .accepted_compression_algorithms
462
0
                                .iter()
463
                                // Filter None values.
464
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
465
0
                            {
466
0
                                service = service.accept_compressed(encoding);
467
0
                            }
468
0
                            Some(service)
469
0
                        })
470
0
                    })
471
0
                    .err_tip(|| "Could not create BEP service")?,
472
            );
473
474
0
        let health_registry = health_registry_builder.lock().await.build();
475
476
0
        let mut svc =
477
0
            tonic_services
478
0
                .into_axum_router()
479
0
                .layer(nativelink_util::telemetry::OtlpLayer::new(
480
0
                    server_cfg.experimental_identity_header.required,
481
                ));
482
483
0
        if let Some(health_cfg) = services.health {
  Branch (483:16): [Folded - Ignored]
484
0
            let path = if health_cfg.path.is_empty() {
  Branch (484:27): [Folded - Ignored]
485
0
                DEFAULT_HEALTH_STATUS_CHECK_PATH
486
            } else {
487
0
                &health_cfg.path
488
            };
489
0
            svc = svc.route_service(path, HealthServer::new(health_registry));
490
0
        }
491
492
0
        if let Some(admin_config) = services.admin {
  Branch (492:16): [Folded - Ignored]
493
0
            let path = if admin_config.path.is_empty() {
  Branch (493:27): [Folded - Ignored]
494
0
                DEFAULT_ADMIN_API_PATH
495
            } else {
496
0
                &admin_config.path
497
            };
498
0
            let worker_schedulers = Arc::new(worker_schedulers.clone());
499
0
            svc = svc.nest_service(
500
0
                path,
501
0
                Router::new().route(
502
0
                    "/scheduler/{instance_name}/set_drain_worker/{worker_id}/{is_draining}",
503
0
                    axum::routing::post(
504
0
                        move |params: axum::extract::Path<(String, String, String)>| async move {
505
0
                            let (instance_name, worker_id, is_draining) = params.0;
506
0
                            (async move {
507
0
                                let is_draining = match is_draining.as_str() {
508
0
                                    "0" => false,
509
0
                                    "1" => true,
510
                                    _ => {
511
0
                                        return Err(make_err!(
512
0
                                            Code::Internal,
513
0
                                            "{} is neither 0 nor 1",
514
0
                                            is_draining
515
0
                                        ));
516
                                    }
517
                                };
518
0
                                worker_schedulers
519
0
                                    .get(&instance_name)
520
0
                                    .err_tip(|| {
521
0
                                        format!(
522
0
                                            "Can not get an instance with the name of '{}'",
523
0
                                            &instance_name
524
                                        )
525
0
                                    })?
526
0
                                    .clone()
527
0
                                    .set_drain_worker(&worker_id.clone().into(), is_draining)
528
0
                                    .await?;
529
0
                                Ok::<_, Error>(format!("Draining worker {worker_id}"))
530
                            })
531
0
                            .await
532
0
                            .map_err(|e| {
533
0
                                Err::<String, _>((
534
0
                                    StatusCode::INTERNAL_SERVER_ERROR,
535
0
                                    format!("Error: {e:?}"),
536
0
                                ))
537
0
                            })
538
0
                        },
539
                    ),
540
                ),
541
            );
542
0
        }
543
544
0
        svc = svc
545
            // This is the default service that executes if no other endpoint matches.
546
0
            .fallback((StatusCode::NOT_FOUND, "Not Found"));
547
548
        // Configure our TLS acceptor if we have TLS configured.
549
0
        let maybe_tls_acceptor = http_config.tls.map_or(Ok(None), |tls_config| {
550
0
            fn read_cert(cert_file: &str) -> Result<Vec<CertificateDer<'static>>, Error> {
551
0
                let mut cert_reader = std::io::BufReader::new(
552
0
                    std::fs::File::open(cert_file)
553
0
                        .err_tip(|| format!("Could not open cert file {cert_file}"))?,
554
                );
555
0
                let certs = extract_certs(&mut cert_reader)
556
0
                    .collect::<Result<Vec<CertificateDer<'_>>, _>>()
557
0
                    .err_tip(|| format!("Could not extract certs from file {cert_file}"))?;
558
0
                Ok(certs)
559
0
            }
560
0
            let certs = read_cert(&tls_config.cert_file)?;
561
0
            let mut key_reader = std::io::BufReader::new(
562
0
                std::fs::File::open(&tls_config.key_file)
563
0
                    .err_tip(|| format!("Could not open key file {}", tls_config.key_file))?,
564
            );
565
0
            let key = match rustls_pemfile::read_one(&mut key_reader)
566
0
                .err_tip(|| format!("Could not extract key(s) from file {}", tls_config.key_file))?
567
            {
568
0
                Some(rustls_pemfile::Item::Pkcs8Key(key)) => key.into(),
569
0
                Some(rustls_pemfile::Item::Sec1Key(key)) => key.into(),
570
0
                Some(rustls_pemfile::Item::Pkcs1Key(key)) => key.into(),
571
                _ => {
572
0
                    return Err(make_err!(
573
0
                        Code::Internal,
574
0
                        "No keys found in file {}",
575
0
                        tls_config.key_file
576
0
                    ));
577
                }
578
            };
579
0
            if let Ok(Some(_)) = rustls_pemfile::read_one(&mut key_reader) {
  Branch (579:20): [Folded - Ignored]
580
0
                return Err(make_err!(
581
0
                    Code::InvalidArgument,
582
0
                    "Expected 1 key in file {}",
583
0
                    tls_config.key_file
584
0
                ));
585
0
            }
586
0
            let verifier = if let Some(client_ca_file) = &tls_config.client_ca_file {
  Branch (586:35): [Folded - Ignored]
587
0
                let mut client_auth_roots = RootCertStore::empty();
588
0
                for cert in read_cert(client_ca_file)? {
589
0
                    client_auth_roots.add(cert).map_err(|e| {
590
0
                        make_err!(Code::Internal, "Could not read client CA: {e:?}")
591
0
                    })?;
592
                }
593
0
                let crls = if let Some(client_crl_file) = &tls_config.client_crl_file {
  Branch (593:35): [Folded - Ignored]
594
0
                    let mut crl_reader = std::io::BufReader::new(
595
0
                        std::fs::File::open(client_crl_file)
596
0
                            .err_tip(|| format!("Could not open CRL file {client_crl_file}"))?,
597
                    );
598
0
                    extract_crls(&mut crl_reader)
599
0
                        .collect::<Result<_, _>>()
600
0
                        .err_tip(|| format!("Could not extract CRLs from file {client_crl_file}"))?
601
                } else {
602
0
                    Vec::new()
603
                };
604
0
                WebPkiClientVerifier::builder(Arc::new(client_auth_roots))
605
0
                    .with_crls(crls)
606
0
                    .build()
607
0
                    .map_err(|e| {
608
0
                        make_err!(
609
0
                            Code::Internal,
610
                            "Could not create WebPkiClientVerifier: {e:?}"
611
                        )
612
0
                    })?
613
            } else {
614
0
                WebPkiClientVerifier::no_client_auth()
615
            };
616
0
            let mut config = TlsServerConfig::builder()
617
0
                .with_client_cert_verifier(verifier)
618
0
                .with_single_cert(certs, key)
619
0
                .map_err(|e| {
620
0
                    make_err!(Code::Internal, "Could not create TlsServerConfig : {e:?}")
621
0
                })?;
622
623
0
            config.alpn_protocols.push("h2".into());
624
0
            Ok(Some(TlsAcceptor::from(Arc::new(config))))
625
0
        })?;
626
627
0
        let socket_addr = http_config
628
0
            .socket_address
629
0
            .parse::<SocketAddr>()
630
0
            .map_err(|e| {
631
0
                make_input_err!("Invalid address '{}' - {e:?}", http_config.socket_address)
632
0
            })?;
633
0
        let tcp_listener = TcpListener::bind(&socket_addr).await?;
634
0
        let mut http = auto::Builder::new(TaskExecutor::default());
635
636
0
        let http_config = &http_config.advanced_http;
637
0
        if let Some(value) = http_config.http2_keep_alive_interval {
  Branch (637:16): [Folded - Ignored]
638
0
            http.http2()
639
0
                .keep_alive_interval(Duration::from_secs(u64::from(value)));
640
0
        }
641
642
0
        if let Some(value) = http_config.experimental_http2_max_pending_accept_reset_streams {
  Branch (642:16): [Folded - Ignored]
643
0
            http.http2()
644
0
                .max_pending_accept_reset_streams(usize::try_from(value).err_tip(
645
                    || "Could not convert experimental_http2_max_pending_accept_reset_streams",
646
0
                )?);
647
0
        }
648
0
        if let Some(value) = http_config.experimental_http2_initial_stream_window_size {
  Branch (648:16): [Folded - Ignored]
649
0
            http.http2().initial_stream_window_size(value);
650
0
        }
651
0
        if let Some(value) = http_config.experimental_http2_initial_connection_window_size {
  Branch (651:16): [Folded - Ignored]
652
0
            http.http2().initial_connection_window_size(value);
653
0
        }
654
0
        if let Some(value) = http_config.experimental_http2_adaptive_window {
  Branch (654:16): [Folded - Ignored]
655
0
            http.http2().adaptive_window(value);
656
0
        }
657
0
        if let Some(value) = http_config.experimental_http2_max_frame_size {
  Branch (657:16): [Folded - Ignored]
658
0
            http.http2().max_frame_size(value);
659
0
        }
660
0
        if let Some(value) = http_config.experimental_http2_max_concurrent_streams {
  Branch (660:16): [Folded - Ignored]
661
0
            http.http2().max_concurrent_streams(value);
662
0
        }
663
0
        if let Some(value) = http_config.experimental_http2_keep_alive_timeout {
  Branch (663:16): [Folded - Ignored]
664
0
            http.http2()
665
0
                .keep_alive_timeout(Duration::from_secs(u64::from(value)));
666
0
        }
667
0
        if let Some(value) = http_config.experimental_http2_max_send_buf_size {
  Branch (667:16): [Folded - Ignored]
668
0
            http.http2().max_send_buf_size(
669
0
                usize::try_from(value).err_tip(|| "Could not convert http2_max_send_buf_size")?,
670
            );
671
0
        }
672
0
        if http_config.experimental_http2_enable_connect_protocol == Some(true) {
  Branch (672:12): [Folded - Ignored]
673
0
            http.http2().enable_connect_protocol();
674
0
        }
675
0
        if let Some(value) = http_config.experimental_http2_max_header_list_size {
  Branch (675:16): [Folded - Ignored]
676
0
            http.http2().max_header_list_size(value);
677
0
        }
678
0
        info!("Ready, listening on {socket_addr}",);
679
0
        root_futures.push(Box::pin(async move {
680
            loop {
681
0
                select! {
682
0
                    accept_result = tcp_listener.accept() => {
683
0
                        match accept_result {
684
0
                            Ok((tcp_stream, remote_addr)) => {
685
0
                                info!(
686
                                    target: "nativelink::services",
687
                                    ?remote_addr,
688
                                    ?socket_addr,
689
0
                                    "Client connected"
690
                                );
691
692
0
                                let (http, svc, maybe_tls_acceptor) =
693
0
                                    (http.clone(), svc.clone(), maybe_tls_acceptor.clone());
694
695
0
                                background_spawn!(
696
                                    name: "http_connection",
697
0
                                    fut: error_span!(
698
                                        "http_connection",
699
                                        remote_addr = %remote_addr,
700
                                        socket_addr = %socket_addr,
701
0
                                    ).in_scope(|| async move {
702
0
                                        let serve_connection = if let Some(tls_acceptor) = maybe_tls_acceptor {
  Branch (702:71): [Folded - Ignored]
703
0
                                            match tls_acceptor.accept(tcp_stream).await {
704
0
                                                Ok(tls_stream) => Either::Left(http.serve_connection(
705
0
                                                    TokioIo::new(tls_stream),
706
0
                                                    TowerToHyperService::new(svc),
707
0
                                                )),
708
0
                                                Err(err) => {
709
0
                                                    error!(?err, "Failed to accept tls stream");
710
0
                                                    return;
711
                                                }
712
                                            }
713
                                        } else {
714
0
                                            Either::Right(http.serve_connection(
715
0
                                                TokioIo::new(tcp_stream),
716
0
                                                TowerToHyperService::new(svc),
717
0
                                            ))
718
                                        };
719
720
0
                                        if let Err(err) = serve_connection.await {
  Branch (720:48): [Folded - Ignored]
721
0
                                            error!(
722
                                                target: "nativelink::services",
723
                                                ?err,
724
0
                                                "Failed running service"
725
                                            );
726
0
                                        }
727
0
                                    }),
728
                                    target: "nativelink::services",
729
                                    ?remote_addr,
730
                                    ?socket_addr,
731
                                );
732
                            },
733
0
                            Err(err) => {
734
0
                                error!(?err, "Failed to accept tcp connection");
735
                            }
736
                        }
737
                    },
738
                }
739
            }
740
            // Unreachable
741
        }));
742
    }
743
744
    {
745
        // We start workers after our TcpListener is set up, so if a worker connects to one
746
        // of these services it will be able to connect.
747
0
        let worker_cfgs = cfg.workers.unwrap_or_default();
748
0
        let mut worker_names = HashSet::with_capacity(worker_cfgs.len());
749
0
        for (i, worker_cfg) in worker_cfgs.into_iter().enumerate() {
750
0
            let spawn_fut = match worker_cfg {
751
0
                WorkerConfig::Local(local_worker_cfg) => {
752
0
                    let fast_slow_store = store_manager
753
0
                        .get_store(&local_worker_cfg.cas_fast_slow_store)
754
0
                        .err_tip(|| {
755
0
                            format!(
756
0
                                "Failed to find store for cas_store_ref in worker config : {}",
757
                                local_worker_cfg.cas_fast_slow_store
758
                            )
759
0
                        })?;
760
761
0
                    let maybe_ac_store = if let Some(ac_store_ref) =
  Branch (761:49): [Folded - Ignored]
762
0
                        &local_worker_cfg.upload_action_result.ac_store
763
                    {
764
0
                        Some(store_manager.get_store(ac_store_ref).err_tip(|| {
765
0
                            format!("Failed to find store for ac_store in worker config : {ac_store_ref}")
766
0
                        })?)
767
                    } else {
768
0
                        None
769
                    };
770
                    // Note: Defaults to fast_slow_store if not specified. If this ever changes it must
771
                    // be updated in the config documentation for the `historical_results_store` field.
772
0
                    let historical_store = if let Some(cas_store_ref) = &local_worker_cfg
  Branch (772:51): [Folded - Ignored]
773
0
                        .upload_action_result
774
0
                        .historical_results_store
775
                    {
776
0
                        store_manager.get_store(cas_store_ref).err_tip(|| {
777
0
                                format!(
778
0
                                "Failed to find store for historical_results_store in worker config : {cas_store_ref}"
779
                            )
780
0
                            })?
781
                    } else {
782
0
                        fast_slow_store.clone()
783
                    };
784
0
                    let local_worker = new_local_worker(
785
0
                        Arc::new(local_worker_cfg),
786
0
                        fast_slow_store,
787
0
                        maybe_ac_store,
788
0
                        historical_store,
789
0
                    )
790
0
                    .await
791
0
                    .err_tip(|| "Could not make LocalWorker")?;
792
793
0
                    let name = if local_worker.name().is_empty() {
  Branch (793:35): [Folded - Ignored]
794
0
                        format!("worker_{i}")
795
                    } else {
796
0
                        local_worker.name().clone()
797
                    };
798
799
0
                    if worker_names.contains(&name) {
  Branch (799:24): [Folded - Ignored]
800
0
                        Err(make_input_err!(
801
0
                            "Duplicate worker name '{}' found in config",
802
0
                            name
803
0
                        ))?;
804
0
                    }
805
0
                    worker_names.insert(name.clone());
806
0
                    let shutdown_rx = shutdown_tx.subscribe();
807
0
                    let fut = trace_span!("worker_ctx", worker_name = %name)
808
0
                        .in_scope(|| local_worker.run(shutdown_rx));
809
0
                    spawn!("worker", fut, ?name)
810
                }
811
            };
812
0
            root_futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
813
        }
814
    }
815
816
0
    if let Err(e) = try_join_all(root_futures).await {
  Branch (816:12): [Folded - Ignored]
817
0
        panic!("{e:?}");
818
0
    }
819
820
0
    Ok(())
821
0
}
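
`inner_main` collects every long-running job above (the origin event publisher, one accept loop per listener, and one future per worker) into `root_futures` and drives them with `try_join_all`, so the first job that returns an error surfaces it, which the code above turns into a panic. A minimal sketch of that fan-out pattern with `String` as a stand-in error type and simulated tasks (illustrative only, not NativeLink code):

use futures::FutureExt;
use futures::future::{BoxFuture, try_join_all};

#[tokio::main]
async fn main() {
    // Heterogeneous long-running jobs are erased to one BoxFuture type so
    // they can live in a single Vec and be awaited together.
    let mut root_futures: Vec<BoxFuture<'static, Result<(), String>>> = Vec::new();

    root_futures.push(async { Ok(()) }.boxed());
    root_futures.push(
        async {
            // Simulated failure: try_join_all resolves to the first error it observes.
            Err("simulated task failure".to_string())
        }
        .boxed(),
    );

    match try_join_all(root_futures).await {
        Ok(_) => println!("all tasks finished"),
        Err(e) => eprintln!("a task failed: {e}"),
    }
}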
822
823
0
fn get_config() -> Result<CasConfig, Box<dyn core::error::Error>> {
824
0
    let args = Args::parse();
825
0
    let json_contents = String::from_utf8(
826
0
        std::fs::read(&args.config_file)
827
0
            .err_tip(|| format!("Could not open config file {}", args.config_file))?,
828
0
    )?;
829
0
    Ok(serde_json5::from_str(&json_contents)?)
830
0
}
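
`get_config` reads the file named on the command line and parses it with `serde_json5`, so the config may use JSON5 conveniences such as comments, unquoted keys, and trailing commas. A small self-contained sketch of that parsing step, using a hypothetical `DemoConfig` in place of `CasConfig`:

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct DemoConfig {
    name: String,
    #[serde(default)]
    max_open_files: u64,
}

fn main() -> Result<(), Box<dyn core::error::Error>> {
    // JSON5 accepts comments, unquoted keys, and trailing commas,
    // all of which plain JSON would reject.
    let cfg: DemoConfig = serde_json5::from_str(
        r#"{
            name: "example", // comment inside the config
            max_open_files: 512,
        }"#,
    )?;
    println!("{cfg:?}");
    Ok(())
}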
831
832
0
fn main() -> Result<(), Box<dyn core::error::Error>> {
833
0
    let mut cfg = get_config()?;
834
835
0
    let global_cfg = if let Some(global_cfg) = &mut cfg.global {
  Branch (835:29): [Folded - Ignored]
836
0
        if global_cfg.max_open_files == 0 {
  Branch (836:12): [Folded - Ignored]
837
0
            global_cfg.max_open_files = fs::DEFAULT_OPEN_FILE_LIMIT;
838
0
        }
839
0
        if global_cfg.default_digest_size_health_check == 0 {
  Branch (839:12): [Folded - Ignored]
840
0
            global_cfg.default_digest_size_health_check = DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG;
841
0
        }
842
843
0
        *global_cfg
844
    } else {
845
0
        GlobalConfig {
846
0
            max_open_files: fs::DEFAULT_OPEN_FILE_LIMIT,
847
0
            default_digest_hash_function: None,
848
0
            default_digest_size_health_check: DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG,
849
0
        }
850
    };
851
0
    set_open_file_limit(global_cfg.max_open_files);
852
0
    set_default_digest_hasher_func(DigestHasherFunc::from(
853
0
        global_cfg
854
0
            .default_digest_hash_function
855
0
            .unwrap_or(ConfigDigestHashFunction::Sha256),
856
0
    ))?;
857
0
    set_default_digest_size_health_check(global_cfg.default_digest_size_health_check)?;
858
859
    #[expect(clippy::disallowed_methods, reason = "starting main runtime")]
860
0
    let runtime = tokio::runtime::Builder::new_multi_thread()
861
0
        .enable_all()
862
0
        .build()?;
863
864
    // The OTLP exporters need to run in a Tokio context.
865
    #[expect(clippy::disallowed_methods, reason = "tracing init on main runtime")]
866
0
    runtime.block_on(async { tokio::spawn(async { init_tracing() }).await? })?;
867
868
    // Initiates the shutdown process by broadcasting a `ShutdownGuard` via the `broadcast::Sender` to all listeners.
869
    // Each listener will perform its cleanup and then drop its `ShutdownGuard`, signaling completion.
870
    // Once all `ShutdownGuard` instances are dropped, the process knows it can safely terminate.
871
0
    let (shutdown_tx, _) = broadcast::channel::<ShutdownGuard>(BROADCAST_CAPACITY);
872
    #[cfg(target_family = "unix")]
873
0
    let shutdown_tx_clone = shutdown_tx.clone();
874
    #[cfg(target_family = "unix")]
875
0
    let mut shutdown_guard = ShutdownGuard::default();
876
877
    #[expect(clippy::disallowed_methods, reason = "signal handler on main runtime")]
878
0
    runtime.spawn(async move {
879
0
        tokio::signal::ctrl_c()
880
0
            .await
881
0
            .expect("Failed to listen to SIGINT");
882
0
        eprintln!("User terminated process via SIGINT");
883
0
        std::process::exit(130);
884
    });
885
886
    #[cfg(target_family = "unix")]
887
    #[expect(clippy::disallowed_methods, reason = "signal handler on main runtime")]
888
0
    runtime.spawn(async move {
889
0
        signal(SignalKind::terminate())
890
0
            .expect("Failed to listen to SIGTERM")
891
0
            .recv()
892
0
            .await;
893
0
        warn!("Process terminated via SIGTERM",);
894
0
        drop(shutdown_tx_clone.send(shutdown_guard.clone()));
895
0
        let () = shutdown_guard.wait_for(Priority::P0).await;
896
0
        warn!("Successfully shut down nativelink.",);
897
0
        std::process::exit(143);
898
    });
899
900
    #[expect(clippy::disallowed_methods, reason = "waiting on everything to finish")]
901
0
    runtime
902
0
        .block_on(async {
903
0
            trace_span!("main")
904
0
                .in_scope(|| async { inner_main(cfg, shutdown_tx).await })
905
0
                .await
906
0
        })
907
0
        .err_tip(|| "main() function failed")?;
908
0
    Ok(())
909
0
}
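
`main` wires graceful shutdown through a tokio broadcast channel: the SIGTERM handler broadcasts a `ShutdownGuard`, long-running tasks subscribe via `shutdown_tx.subscribe()`, and the handler waits for the guards before exiting. A minimal self-contained sketch of the same broadcast-shutdown shape, using a unit message instead of `ShutdownGuard` and sleeps in place of real work (task count and timings are illustrative only):

use core::time::Duration;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Capacity 1 mirrors BROADCAST_CAPACITY above; tokio may round it up.
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    let mut handles = Vec::new();
    for i in 0..3 {
        let mut shutdown_rx = shutdown_tx.subscribe();
        handles.push(tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Real work would go here; a short sleep stands in for it.
                    () = tokio::time::sleep(Duration::from_millis(100)) => {}
                    // Any broadcast (or the sender being dropped) ends the loop.
                    _ = shutdown_rx.recv() => {
                        println!("task {i} shutting down");
                        break;
                    }
                }
            }
        }));
    }

    // Simulate a termination signal arriving after a short delay.
    tokio::time::sleep(Duration::from_millis(300)).await;
    drop(shutdown_tx.send(()));

    for handle in handles {
        let _ = handle.await;
    }
}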