Coverage Report

Created: 2026-04-01 18:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/src/bin/nativelink.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::net::SocketAddr;
16
use core::time::Duration;
17
use std::collections::{HashMap, HashSet};
18
use std::sync::Arc;
19
20
use async_lock::Mutex as AsyncMutex;
21
use axum::Router;
22
use axum::http::Uri;
23
use clap::Parser;
24
use futures::FutureExt;
25
use futures::future::{BoxFuture, Either, OptionFuture, TryFutureExt, try_join_all};
26
use hyper::StatusCode;
27
use hyper_util::rt::tokio::TokioIo;
28
use hyper_util::server::conn::auto;
29
use hyper_util::service::TowerToHyperService;
30
use mimalloc::MiMalloc;
31
use nativelink_config::cas_server::{
32
    CasConfig, GlobalConfig, HttpCompressionAlgorithm, ListenerConfig, SchedulerConfig,
33
    ServerConfig, StoreConfig, WorkerConfig,
34
};
35
use nativelink_config::stores::ConfigDigestHashFunction;
36
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
37
use nativelink_scheduler::default_scheduler_factory::scheduler_factory;
38
use nativelink_service::ac_server::AcServer;
39
use nativelink_service::bep_server::BepServer;
40
use nativelink_service::bytestream_server::ByteStreamServer;
41
use nativelink_service::capabilities_server::CapabilitiesServer;
42
use nativelink_service::cas_server::CasServer;
43
use nativelink_service::execution_server::ExecutionServer;
44
use nativelink_service::fetch_server::FetchServer;
45
use nativelink_service::health_server::HealthServer;
46
use nativelink_service::push_server::PushServer;
47
use nativelink_service::worker_api_server::WorkerApiServer;
48
use nativelink_store::default_store_factory::store_factory;
49
use nativelink_store::store_manager::StoreManager;
50
use nativelink_util::common::fs::set_open_file_limit;
51
use nativelink_util::digest_hasher::{DigestHasherFunc, set_default_digest_hasher_func};
52
use nativelink_util::health_utils::HealthRegistryBuilder;
53
use nativelink_util::origin_event_publisher::OriginEventPublisher;
54
#[cfg(target_family = "unix")]
55
use nativelink_util::shutdown_guard::Priority;
56
use nativelink_util::shutdown_guard::ShutdownGuard;
57
use nativelink_util::store_trait::{
58
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG, set_default_digest_size_health_check,
59
};
60
use nativelink_util::task::TaskExecutor;
61
use nativelink_util::telemetry::init_tracing;
62
use nativelink_util::{background_spawn, fs, spawn};
63
use nativelink_worker::local_worker::new_local_worker;
64
use rustls_pki_types::pem::PemObject;
65
use rustls_pki_types::{CertificateRevocationListDer, PrivateKeyDer};
66
use tokio::net::TcpListener;
67
use tokio::select;
68
#[cfg(target_family = "unix")]
69
use tokio::signal::unix::{SignalKind, signal};
70
use tokio::sync::oneshot::Sender;
71
use tokio::sync::{broadcast, mpsc, oneshot};
72
use tokio_rustls::TlsAcceptor;
73
use tokio_rustls::rustls::pki_types::CertificateDer;
74
use tokio_rustls::rustls::server::WebPkiClientVerifier;
75
use tokio_rustls::rustls::{RootCertStore, ServerConfig as TlsServerConfig};
76
use tonic::codec::CompressionEncoding;
77
use tonic::service::Routes;
78
use tracing::{error, error_span, info, trace_span, warn};
79
80
#[global_allocator]
81
static GLOBAL: MiMalloc = MiMalloc;
82
83
/// Note: This must be kept in sync with the documentation in `AdminConfig::path`.
84
const DEFAULT_ADMIN_API_PATH: &str = "/admin";
85
86
// Note: This must be kept in sync with the documentation in `HealthConfig::path`.
87
const DEFAULT_HEALTH_STATUS_CHECK_PATH: &str = "/status";
88
89
// Note: This must be kept in sync with the documentation in
90
// `OriginEventsConfig::max_event_queue_size`.
91
const DEFAULT_MAX_QUEUE_EVENTS: usize = 0x0001_0000;
92
93
/// Broadcast Channel Capacity
94
/// Note: The actual capacity may be greater than the provided capacity.
95
const BROADCAST_CAPACITY: usize = 1;
96
97
/// Backend for bazel remote execution / cache API.
98
#[derive(Parser, Debug)]
99
#[clap(
100
    author = "Trace Machina, Inc. <nativelink@tracemachina.com>",
101
    version,
102
    about,
103
    long_about = None
104
)]
105
struct Args {
106
    /// Config file to use.
107
    #[clap(value_parser)]
108
    config_file: String,
109
}
110
111
trait RoutesExt {
112
    fn add_optional_service<S>(self, svc: Option<S>) -> Self
113
    where
114
        S: tower::Service<
115
                axum::http::Request<tonic::body::Body>,
116
                Error = core::convert::Infallible,
117
            > + tonic::server::NamedService
118
            + Clone
119
            + Send
120
            + Sync
121
            + 'static,
122
        S::Response: axum::response::IntoResponse,
123
        S::Future: Send + 'static;
124
}
125
126
impl RoutesExt for Routes {
127
0
    fn add_optional_service<S>(mut self, svc: Option<S>) -> Self
128
0
    where
129
0
        S: tower::Service<
130
0
                axum::http::Request<tonic::body::Body>,
131
0
                Error = core::convert::Infallible,
132
0
            > + tonic::server::NamedService
133
0
            + Clone
134
0
            + Send
135
0
            + Sync
136
0
            + 'static,
137
0
        S::Response: axum::response::IntoResponse,
138
0
        S::Future: Send + 'static,
139
    {
140
0
        if let Some(svc) = svc {
141
0
            self = self.add_service(svc);
142
0
        }
143
0
        self
144
0
    }
145
}
146
147
/// If this value changes update the documentation in the config definition.
148
const DEFAULT_MAX_DECODING_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
149
150
macro_rules! service_setup {
151
    ($service: expr, $http_config: ident) => {{
152
        let mut service = $service;
153
        let max_decoding_message_size = if $http_config.max_decoding_message_size == 0 {
154
            DEFAULT_MAX_DECODING_MESSAGE_SIZE
155
        } else {
156
            $http_config.max_decoding_message_size
157
        };
158
        service = service.max_decoding_message_size(max_decoding_message_size);
159
        let send_algo = &$http_config.compression.send_compression_algorithm;
160
        if let Some(encoding) = into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::None)) {
161
            service = service.send_compressed(encoding);
162
        }
163
        for encoding in $http_config
164
            .compression
165
            .accepted_compression_algorithms
166
            .iter()
167
            // Filter None values.
168
0
            .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
169
        {
170
            service = service.accept_compressed(encoding);
171
        }
172
        service
173
    }};
174
}
175
176
0
async fn inner_main(
177
0
    cfg: CasConfig,
178
0
    shutdown_tx: broadcast::Sender<ShutdownGuard>,
179
0
    scheduler_shutdown_tx: Sender<()>,
180
0
) -> Result<(), Error> {
181
0
    const fn into_encoding(from: HttpCompressionAlgorithm) -> Option<CompressionEncoding> {
182
0
        match from {
183
0
            HttpCompressionAlgorithm::Gzip => Some(CompressionEncoding::Gzip),
184
0
            HttpCompressionAlgorithm::None => None,
185
        }
186
0
    }
187
188
0
    let health_registry_builder =
189
0
        Arc::new(AsyncMutex::new(HealthRegistryBuilder::new("nativelink")));
190
191
0
    let store_manager = Arc::new(StoreManager::new());
192
    {
193
0
        let mut health_registry_lock = health_registry_builder.lock().await;
194
195
0
        for StoreConfig { name, spec } in cfg.stores {
196
0
            let health_component_name = format!("stores/{name}");
197
0
            let mut health_register_store =
198
0
                health_registry_lock.sub_builder(&health_component_name);
199
0
            let store = store_factory(&spec, &store_manager, Some(&mut health_register_store))
200
0
                .await
201
0
                .err_tip(|| format!("Failed to create store '{name}'"))?;
202
0
            store_manager.add_store(&name, store);
203
        }
204
    }
205
206
0
    let mut root_futures: Vec<BoxFuture<Result<(), Error>>> = Vec::new();
207
208
0
    let maybe_origin_event_tx = cfg
209
0
        .experimental_origin_events
210
0
        .as_ref()
211
0
        .map(|origin_events_cfg| {
212
0
            let mut max_queued_events = origin_events_cfg.max_event_queue_size;
213
0
            if max_queued_events == 0 {
214
0
                max_queued_events = DEFAULT_MAX_QUEUE_EVENTS;
215
0
            }
216
0
            let (tx, rx) = mpsc::channel(max_queued_events);
217
0
            let store_name = origin_events_cfg.publisher.store.as_str();
218
0
            let store = store_manager.get_store(store_name).err_tip(|| {
219
0
                format!("Could not get store {store_name} for origin event publisher")
220
0
            })?;
221
222
0
            root_futures.push(Box::pin(
223
0
                OriginEventPublisher::new(store, rx, shutdown_tx.clone())
224
0
                    .run()
225
0
                    .map(Ok),
226
0
            ));
227
228
0
            Ok::<_, Error>(tx)
229
0
        })
230
0
        .transpose()?;
231
232
0
    let mut action_schedulers = HashMap::new();
233
0
    let mut worker_schedulers = HashMap::new();
234
0
    for SchedulerConfig { name, spec } in cfg.schedulers.iter().flatten() {
235
0
        let (maybe_action_scheduler, maybe_worker_scheduler) =
236
0
            scheduler_factory(spec, &store_manager, maybe_origin_event_tx.as_ref())
237
0
                .await
238
0
                .err_tip(|| format!("Failed to create scheduler '{name}'"))?;
239
0
        if let Some(action_scheduler) = maybe_action_scheduler {
240
0
            action_schedulers.insert(name.clone(), action_scheduler.clone());
241
0
        }
242
0
        if let Some(worker_scheduler) = maybe_worker_scheduler {
243
0
            worker_schedulers.insert(name.clone(), worker_scheduler.clone());
244
0
        }
245
    }
246
247
0
    let server_cfgs: Vec<ServerConfig> = cfg.servers.into_iter().collect();
248
249
0
    for server_cfg in server_cfgs {
250
0
        let services = server_cfg
251
0
            .services
252
0
            .err_tip(|| "'services' must be configured")?;
253
254
        // Currently we only support http as our socket type.
255
0
        let ListenerConfig::Http(http_config) = server_cfg.listener;
256
257
0
        let execution_server = services
258
0
            .execution
259
0
            .as_ref()
260
0
            .map(|cfg| ExecutionServer::new(cfg, &action_schedulers, &store_manager))
261
0
            .transpose()
262
0
            .err_tip(|| "Could not create Execution service")?;
263
264
0
        let tonic_services = Routes::builder()
265
0
            .routes()
266
0
            .add_optional_service(
267
0
                services
268
0
                    .ac
269
0
                    .map_or(Ok(None), |cfg| {
270
0
                        AcServer::new(&cfg, &store_manager)
271
0
                            .map(|v| Some(service_setup!(v.into_service(), http_config)))
272
0
                    })
273
0
                    .err_tip(|| "Could not create AC service")?,
274
            )
275
0
            .add_optional_service(
276
0
                services
277
0
                    .cas
278
0
                    .map_or(Ok(None), |cfg| {
279
0
                        CasServer::new(&cfg, &store_manager)
280
0
                            .map(|v| Some(service_setup!(v.into_service(), http_config)))
281
0
                    })
282
0
                    .err_tip(|| "Could not create CAS service")?,
283
            )
284
0
            .add_optional_service(
285
0
                execution_server
286
0
                    .clone()
287
0
                    .map(|v| service_setup!(v.into_service(), http_config)),
288
            )
289
0
            .add_optional_service(
290
0
                execution_server.map(|v| service_setup!(v.into_operations_service(), http_config)),
291
            )
292
0
            .add_optional_service(
293
0
                services
294
0
                    .fetch
295
0
                    .map_or(Ok(None), |cfg| {
296
0
                        FetchServer::new(&cfg, &store_manager)
297
0
                            .map(|v| Some(service_setup!(v.into_service(), http_config)))
298
0
                    })
299
0
                    .err_tip(|| "Could not create Fetch service")?,
300
            )
301
0
            .add_optional_service(
302
0
                services
303
0
                    .push
304
0
                    .map_or(Ok(None), |cfg| {
305
0
                        PushServer::new(&cfg, &store_manager)
306
0
                            .map(|v| Some(service_setup!(v.into_service(), http_config)))
307
0
                    })
308
0
                    .err_tip(|| "Could not create Push service")?,
309
            )
310
0
            .add_optional_service(
311
0
                services
312
0
                    .bytestream
313
0
                    .map_or(Ok(None), |cfg| {
314
0
                        ByteStreamServer::new(&cfg, &store_manager)
315
0
                            .map(|v| Some(service_setup!(v.into_service(), http_config)))
316
0
                    })
317
0
                    .err_tip(|| "Could not create ByteStream service")?,
318
            )
319
0
            .add_optional_service(
320
0
                OptionFuture::from(
321
0
                    services
322
0
                        .capabilities
323
0
                        .as_ref()
324
0
                        .map(|cfg| CapabilitiesServer::new(cfg, &action_schedulers)),
325
                )
326
0
                .await
327
0
                .map_or(Ok::<Option<CapabilitiesServer>, Error>(None), |server| {
328
0
                    Ok(Some(server?))
329
0
                })
330
0
                .err_tip(|| "Could not create Capabilities service")?
331
0
                .map(|v| service_setup!(v.into_service(), http_config)),
332
            )
333
0
            .add_optional_service(
334
0
                services
335
0
                    .worker_api
336
0
                    .map_or(Ok(None), |cfg| {
337
0
                        WorkerApiServer::new(&cfg, &worker_schedulers)
338
0
                            .map(|v| Some(service_setup!(v.into_service(), http_config)))
339
0
                    })
340
0
                    .err_tip(|| "Could not create WorkerApi service")?,
341
            )
342
0
            .add_optional_service(
343
0
                services
344
0
                    .experimental_bep
345
0
                    .map_or(Ok(None), |cfg| {
346
0
                        BepServer::new(&cfg, &store_manager)
347
0
                            .map(|v| Some(service_setup!(v.into_service(), http_config)))
348
0
                    })
349
0
                    .err_tip(|| "Could not create BEP service")?,
350
            );
351
352
0
        let health_registry = health_registry_builder.lock().await.build();
353
354
0
        let mut svc =
355
0
            tonic_services
356
0
                .into_axum_router()
357
0
                .layer(nativelink_util::telemetry::OtlpLayer::new(
358
0
                    server_cfg.experimental_identity_header.required,
359
                ));
360
361
0
        if let Some(health_cfg) = services.health {
362
0
            let path = if health_cfg.path.is_empty() {
363
0
                DEFAULT_HEALTH_STATUS_CHECK_PATH
364
            } else {
365
0
                &health_cfg.path
366
            };
367
0
            svc = svc.route_service(path, HealthServer::new(health_registry, &health_cfg));
368
0
        }
369
370
0
        if let Some(admin_config) = services.admin {
371
0
            let path = if admin_config.path.is_empty() {
372
0
                DEFAULT_ADMIN_API_PATH
373
            } else {
374
0
                &admin_config.path
375
            };
376
0
            let worker_schedulers = Arc::new(worker_schedulers.clone());
377
0
            svc = svc.nest_service(
378
0
                path,
379
0
                Router::new().route(
380
0
                    "/scheduler/{instance_name}/set_drain_worker/{worker_id}/{is_draining}",
381
0
                    axum::routing::post(
382
0
                        move |params: axum::extract::Path<(String, String, String)>| async move {
383
0
                            let (instance_name, worker_id, is_draining) = params.0;
384
0
                            (async move {
385
0
                                let is_draining = match is_draining.as_str() {
386
0
                                    "0" => false,
387
0
                                    "1" => true,
388
                                    _ => {
389
0
                                        return Err(make_err!(
390
0
                                            Code::Internal,
391
0
                                            "{} is neither 0 nor 1",
392
0
                                            is_draining
393
0
                                        ));
394
                                    }
395
                                };
396
0
                                worker_schedulers
397
0
                                    .get(&instance_name)
398
0
                                    .err_tip(|| {
399
0
                                        format!(
400
                                            "Can not get an instance with the name of '{}'",
401
0
                                            &instance_name
402
                                        )
403
0
                                    })?
404
0
                                    .clone()
405
0
                                    .set_drain_worker(&worker_id.clone().into(), is_draining)
406
0
                                    .await?;
407
0
                                Ok::<_, Error>(format!("Draining worker {worker_id}"))
408
                            })
409
0
                            .await
410
0
                            .map_err(|e| {
411
0
                                Err::<String, _>((
412
0
                                    StatusCode::INTERNAL_SERVER_ERROR,
413
0
                                    format!("Error: {e:?}"),
414
0
                                ))
415
0
                            })
416
0
                        },
417
                    ),
418
                ),
419
            );
420
0
        }
421
422
        // This is the default service that executes if no other endpoint matches.
423
0
        svc = svc.fallback(|uri: Uri| async move {
424
0
            warn!("No route for {uri}");
425
0
            (StatusCode::NOT_FOUND, format!("No route for {uri}"))
426
0
        });
427
428
        // Configure our TLS acceptor if we have TLS configured.
429
0
        let maybe_tls_acceptor = http_config.tls.map_or(Ok(None), |tls_config| {
430
0
            fn read_cert(cert_file: &str) -> Result<Vec<CertificateDer<'static>>, Error> {
431
0
                let mut cert_reader = std::io::BufReader::new(
432
0
                    std::fs::File::open(cert_file)
433
0
                        .err_tip(|| format!("Could not open cert file {cert_file}"))?,
434
                );
435
0
                let certs = CertificateDer::pem_reader_iter(&mut cert_reader)
436
0
                    .collect::<Result<Vec<CertificateDer<'_>>, _>>()
437
0
                    .err_tip(|| format!("Could not extract certs from file {cert_file}"))?;
438
0
                Ok(certs)
439
0
            }
440
0
            let certs = read_cert(&tls_config.cert_file)?;
441
0
            let mut key_reader = std::io::BufReader::new(
442
0
                std::fs::File::open(&tls_config.key_file)
443
0
                    .err_tip(|| format!("Could not open key file {}", tls_config.key_file))?,
444
            );
445
0
            let key = match PrivateKeyDer::from_pem_reader(&mut key_reader)
446
0
                .err_tip(|| format!("Could not extract key(s) from file {}", tls_config.key_file))?
447
            {
448
0
                PrivateKeyDer::Pkcs8(key) => key.into(),
449
0
                PrivateKeyDer::Sec1(key) => key.into(),
450
0
                PrivateKeyDer::Pkcs1(key) => key.into(),
451
                _ => {
452
0
                    return Err(make_err!(
453
0
                        Code::Internal,
454
0
                        "No keys found in file {}",
455
0
                        tls_config.key_file
456
0
                    ));
457
                }
458
            };
459
0
            if PrivateKeyDer::from_pem_reader(&mut key_reader).is_ok() {
460
0
                return Err(make_err!(
461
0
                    Code::InvalidArgument,
462
0
                    "Expected 1 key in file {}",
463
0
                    tls_config.key_file
464
0
                ));
465
0
            }
466
0
            let verifier = if let Some(client_ca_file) = &tls_config.client_ca_file {
467
0
                let mut client_auth_roots = RootCertStore::empty();
468
0
                for cert in read_cert(client_ca_file)? {
469
0
                    client_auth_roots.add(cert).map_err(|e| {
470
0
                        Error::from_std_err(Code::Internal, &e).append("Could not read client CA")
471
0
                    })?;
472
                }
473
0
                let crls = if let Some(client_crl_file) = &tls_config.client_crl_file {
474
0
                    let mut crl_reader = std::io::BufReader::new(
475
0
                        std::fs::File::open(client_crl_file)
476
0
                            .err_tip(|| format!("Could not open CRL file {client_crl_file}"))?,
477
                    );
478
0
                    CertificateRevocationListDer::pem_reader_iter(&mut crl_reader)
479
0
                        .collect::<Result<_, _>>()
480
0
                        .err_tip(|| format!("Could not extract CRLs from file {client_crl_file}"))?
481
                } else {
482
0
                    Vec::new()
483
                };
484
0
                WebPkiClientVerifier::builder(Arc::new(client_auth_roots))
485
0
                    .with_crls(crls)
486
0
                    .build()
487
0
                    .map_err(|e| {
488
0
                        Error::from_std_err(Code::Internal, &e)
489
0
                            .append("Could not create WebPkiClientVerifier")
490
0
                    })?
491
            } else {
492
0
                WebPkiClientVerifier::no_client_auth()
493
            };
494
0
            let mut config = TlsServerConfig::builder()
495
0
                .with_client_cert_verifier(verifier)
496
0
                .with_single_cert(certs, key)
497
0
                .map_err(|e| {
498
0
                    Error::from_std_err(Code::Internal, &e)
499
0
                        .append("Could not create TlsServerConfig")
500
0
                })?;
501
502
0
            config.alpn_protocols.push("h2".into());
503
0
            Ok(Some(TlsAcceptor::from(Arc::new(config))))
504
0
        })?;
505
506
0
        let socket_addr = http_config
507
0
            .socket_address
508
0
            .parse::<SocketAddr>()
509
0
            .map_err(|e| {
510
0
                Error::from_std_err(Code::InvalidArgument, &e)
511
0
                    .append(format!("Invalid address '{}'", http_config.socket_address))
512
0
            })?;
513
0
        let tcp_listener = TcpListener::bind(&socket_addr).await?;
514
0
        let mut http = auto::Builder::new(TaskExecutor::default());
515
516
0
        let http_config = &http_config.advanced_http;
517
0
        if let Some(value) = http_config.http2_keep_alive_interval {
518
0
            http.http2()
519
0
                .keep_alive_interval(Duration::from_secs(u64::from(value)));
520
0
        }
521
522
0
        if let Some(value) = http_config.experimental_http2_max_pending_accept_reset_streams {
523
0
            http.http2()
524
0
                .max_pending_accept_reset_streams(usize::try_from(value).err_tip(
525
                    || "Could not convert experimental_http2_max_pending_accept_reset_streams",
526
0
                )?);
527
0
        }
528
0
        if let Some(value) = http_config.experimental_http2_initial_stream_window_size {
529
0
            http.http2().initial_stream_window_size(value);
530
0
        }
531
0
        if let Some(value) = http_config.experimental_http2_initial_connection_window_size {
532
0
            http.http2().initial_connection_window_size(value);
533
0
        }
534
0
        if let Some(value) = http_config.experimental_http2_adaptive_window {
535
0
            http.http2().adaptive_window(value);
536
0
        }
537
0
        if let Some(value) = http_config.experimental_http2_max_frame_size {
538
0
            http.http2().max_frame_size(value);
539
0
        }
540
0
        if let Some(value) = http_config.experimental_http2_max_concurrent_streams {
541
0
            http.http2().max_concurrent_streams(value);
542
0
        }
543
0
        if let Some(value) = http_config.experimental_http2_keep_alive_timeout {
544
0
            http.http2()
545
0
                .keep_alive_timeout(Duration::from_secs(u64::from(value)));
546
0
        }
547
0
        if let Some(value) = http_config.experimental_http2_max_send_buf_size {
548
0
            http.http2().max_send_buf_size(
549
0
                usize::try_from(value).err_tip(|| "Could not convert http2_max_send_buf_size")?,
550
            );
551
0
        }
552
0
        if http_config.experimental_http2_enable_connect_protocol == Some(true) {
553
0
            http.http2().enable_connect_protocol();
554
0
        }
555
0
        if let Some(value) = http_config.experimental_http2_max_header_list_size {
556
0
            http.http2().max_header_list_size(value);
557
0
        }
558
0
        info!("Ready, listening on {socket_addr}",);
559
0
        root_futures.push(Box::pin(async move {
560
            loop {
561
0
                select! {
562
0
                    accept_result = tcp_listener.accept() => {
563
0
                        match accept_result {
564
0
                            Ok((tcp_stream, remote_addr)) => {
565
0
                                info!(
566
                                    target: "nativelink::services",
567
                                    ?remote_addr,
568
                                    ?socket_addr,
569
                                    "Client connected"
570
                                );
571
572
0
                                let (http, svc, maybe_tls_acceptor) =
573
0
                                    (http.clone(), svc.clone(), maybe_tls_acceptor.clone());
574
575
0
                                background_spawn!(
576
                                    name: "http_connection",
577
0
                                    fut: error_span!(
578
                                        "http_connection",
579
                                        remote_addr = %remote_addr,
580
                                        socket_addr = %socket_addr,
581
0
                                    ).in_scope(|| async move {
582
0
                                        let serve_connection = if let Some(tls_acceptor) = maybe_tls_acceptor {
583
0
                                            match tls_acceptor.accept(tcp_stream).await {
584
0
                                                Ok(tls_stream) => Either::Left(http.serve_connection(
585
0
                                                    TokioIo::new(tls_stream),
586
0
                                                    TowerToHyperService::new(svc),
587
0
                                                )),
588
0
                                                Err(err) => {
589
0
                                                    error!(?err, "Failed to accept tls stream");
590
0
                                                    return;
591
                                                }
592
                                            }
593
                                        } else {
594
0
                                            Either::Right(http.serve_connection(
595
0
                                                TokioIo::new(tcp_stream),
596
0
                                                TowerToHyperService::new(svc),
597
0
                                            ))
598
                                        };
599
600
0
                                        if let Err(err) = serve_connection.await {
601
0
                                            error!(
602
                                                target: "nativelink::services",
603
                                                ?err,
604
                                                "Failed running service"
605
                                            );
606
0
                                        }
607
0
                                    }),
608
                                    target: "nativelink::services",
609
                                    ?remote_addr,
610
                                    ?socket_addr,
611
                                );
612
                            },
613
0
                            Err(err) => {
614
0
                                error!(?err, "Failed to accept tcp connection");
615
                            }
616
                        }
617
                    },
618
                }
619
            }
620
            // Unreachable
621
        }));
622
    }
623
624
    {
625
        // We start workers after our TcpListener is setup so if our worker connects to one
626
        // of these services it will be able to connect.
627
0
        let worker_cfgs = cfg.workers.unwrap_or_default();
628
0
        let mut worker_names = HashSet::with_capacity(worker_cfgs.len());
629
0
        for (i, worker_cfg) in worker_cfgs.into_iter().enumerate() {
630
0
            let spawn_fut = match worker_cfg {
631
0
                WorkerConfig::Local(local_worker_cfg) => {
632
0
                    let fast_slow_store = store_manager
633
0
                        .get_store(&local_worker_cfg.cas_fast_slow_store)
634
0
                        .err_tip(|| {
635
0
                            format!(
636
                                "Failed to find store for cas_store_ref in worker config : {}",
637
                                local_worker_cfg.cas_fast_slow_store
638
                            )
639
0
                        })?;
640
641
0
                    let maybe_ac_store = if let Some(ac_store_ref) =
642
0
                        &local_worker_cfg.upload_action_result.ac_store
643
                    {
644
0
                        Some(store_manager.get_store(ac_store_ref).err_tip(|| {
645
0
                            format!("Failed to find store for ac_store in worker config : {ac_store_ref}")
646
0
                        })?)
647
                    } else {
648
0
                        None
649
                    };
650
                    // Note: Defaults to fast_slow_store if not specified. If this ever changes it must
651
                    // be updated in config documentation for the `historical_results_store` the field.
652
0
                    let historical_store = if let Some(cas_store_ref) = &local_worker_cfg
653
0
                        .upload_action_result
654
0
                        .historical_results_store
655
                    {
656
0
                        store_manager.get_store(cas_store_ref).err_tip(|| {
657
0
                                format!(
658
                                "Failed to find store for historical_results_store in worker config : {cas_store_ref}"
659
                            )
660
0
                            })?
661
                    } else {
662
0
                        fast_slow_store.clone()
663
                    };
664
0
                    let local_worker = new_local_worker(
665
0
                        Arc::new(local_worker_cfg),
666
0
                        fast_slow_store,
667
0
                        maybe_ac_store,
668
0
                        historical_store,
669
0
                    )
670
0
                    .await
671
0
                    .err_tip(|| "Could not make LocalWorker")?;
672
673
0
                    let name = if local_worker.name().is_empty() {
674
0
                        format!("worker_{i}")
675
                    } else {
676
0
                        local_worker.name().clone()
677
                    };
678
679
0
                    if worker_names.contains(&name) {
680
0
                        Err(make_input_err!(
681
0
                            "Duplicate worker name '{}' found in config",
682
0
                            name
683
0
                        ))?;
684
0
                    }
685
0
                    worker_names.insert(name.clone());
686
0
                    let shutdown_rx = shutdown_tx.subscribe();
687
0
                    let fut = trace_span!("worker_ctx", worker_name = %name)
688
0
                        .in_scope(|| local_worker.run(shutdown_rx));
689
0
                    spawn!("worker", fut, ?name)
690
                }
691
            };
692
0
            root_futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
693
        }
694
    }
695
696
    // Set up a shutdown handler for the worker schedulers.
697
0
    let mut shutdown_rx = shutdown_tx.subscribe();
698
0
    root_futures.push(Box::pin(async move {
699
0
        if let Ok(shutdown_guard) = shutdown_rx.recv().await {
700
0
            let _ = scheduler_shutdown_tx.send(());
701
0
            for (_name, scheduler) in worker_schedulers {
702
0
                scheduler.shutdown(shutdown_guard.clone()).await;
703
            }
704
0
        }
705
0
        Ok(())
706
0
    }));
707
708
0
    if let Err(e) = try_join_all(root_futures).await {
709
0
        panic!("{e:?}");
710
0
    }
711
712
0
    Ok(())
713
0
}
714
715
0
fn get_config() -> Result<CasConfig, Error> {
716
0
    let args = Args::parse();
717
0
    CasConfig::try_from_json5_file(&args.config_file)
718
0
}
719
720
0
/// Process entry point: builds the Tokio runtime, initializes tracing,
/// normalizes the global configuration, installs SIGINT/SIGTERM handlers,
/// and then drives `inner_main` to completion on the runtime.
fn main() -> Result<(), Box<dyn core::error::Error>> {
    #[expect(clippy::disallowed_methods, reason = "starting main runtime")]
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;

    // The OTLP exporters need to run in a Tokio context
    // Do this first so all the other logging works
    #[expect(clippy::disallowed_methods, reason = "tracing init on main runtime")]
    runtime.block_on(async { tokio::spawn(async { init_tracing() }).await? })?;

    let mut cfg = get_config()?;

    // Normalize the global config: fill zero-valued fields with defaults when a
    // `global` section exists, otherwise synthesize a fully-defaulted one.
    let global_cfg = if let Some(global_cfg) = &mut cfg.global {
        if global_cfg.max_open_files == 0 {
            global_cfg.max_open_files = fs::DEFAULT_OPEN_FILE_LIMIT;
        }
        if global_cfg.default_digest_size_health_check == 0 {
            global_cfg.default_digest_size_health_check = DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG;
        }

        *global_cfg
    } else {
        GlobalConfig {
            max_open_files: fs::DEFAULT_OPEN_FILE_LIMIT,
            default_digest_hash_function: None,
            default_digest_size_health_check: DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG,
        }
    };
    // Apply the normalized global settings process-wide. SHA-256 is the digest
    // function used when the config does not specify one.
    set_open_file_limit(global_cfg.max_open_files);
    set_default_digest_hasher_func(DigestHasherFunc::from(
        global_cfg
            .default_digest_hash_function
            .unwrap_or(ConfigDigestHashFunction::Sha256),
    ))?;
    set_default_digest_size_health_check(global_cfg.default_digest_size_health_check)?;

    // Initiates the shutdown process by broadcasting the shutdown signal via the `oneshot::Sender` to all listeners.
    // Each listener will perform its cleanup and then drop its `oneshot::Sender`, signaling completion.
    // Once all `oneshot::Sender` instances are dropped, the worker knows it can safely terminate.
    let (shutdown_tx, _) = broadcast::channel::<ShutdownGuard>(BROADCAST_CAPACITY);
    // The SIGTERM handler below is unix-only, so its captured state is too.
    #[cfg(target_family = "unix")]
    let shutdown_tx_clone = shutdown_tx.clone();
    #[cfg(target_family = "unix")]
    let mut shutdown_guard = ShutdownGuard::default();

    // SIGINT (Ctrl-C): immediate hard exit with the conventional 130 status —
    // no graceful shutdown is attempted on this path.
    #[expect(clippy::disallowed_methods, reason = "signal handler on main runtime")]
    runtime.spawn(async move {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to listen to SIGINT");
        eprintln!("User terminated process via SIGINT");
        std::process::exit(130);
    });

    #[allow(unused_variables)]
    let (scheduler_shutdown_tx, scheduler_shutdown_rx) = oneshot::channel();

    // SIGTERM (unix only): graceful shutdown. Broadcast the shutdown guard,
    // wait for the schedulers to confirm their shutdown, then wait for all
    // P0-priority holders of the guard before exiting with 143 (128 + SIGTERM).
    #[cfg(target_family = "unix")]
    #[expect(clippy::disallowed_methods, reason = "signal handler on main runtime")]
    runtime.spawn(async move {
        signal(SignalKind::terminate())
            .expect("Failed to listen to SIGTERM")
            .recv()
            .await;
        warn!("Process terminated via SIGTERM",);
        // `send` only fails if there are no receivers; ignore that case.
        drop(shutdown_tx_clone.send(shutdown_guard.clone()));
        scheduler_shutdown_rx
            .await
            .expect("Failed to receive scheduler shutdown");
        let () = shutdown_guard.wait_for(Priority::P0).await;
        warn!("Successfully shut down nativelink.",);
        std::process::exit(143);
    });

    // Run the actual server until it finishes (or a signal handler exits first).
    #[expect(clippy::disallowed_methods, reason = "waiting on everything to finish")]
    runtime
        .block_on(async {
            trace_span!("main")
                .in_scope(|| async { inner_main(cfg, shutdown_tx, scheduler_shutdown_tx).await })
                .await
        })
        .err_tip(|| "main() function failed")?;
    Ok(())
}