Coverage Report

Created: 2024-11-20 10:13

/build/source/src/bin/nativelink.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::{HashMap, HashSet};
16
use std::net::SocketAddr;
17
use std::sync::Arc;
18
use std::time::{Duration, SystemTime, UNIX_EPOCH};
19
20
use async_lock::Mutex as AsyncMutex;
21
use axum::Router;
22
use clap::Parser;
23
use futures::future::{try_join_all, BoxFuture, Either, OptionFuture, TryFutureExt};
24
use hyper::{Response, StatusCode};
25
use hyper_util::rt::tokio::TokioIo;
26
use hyper_util::server::conn::auto;
27
use hyper_util::service::TowerToHyperService;
28
use mimalloc::MiMalloc;
29
use nativelink_config::cas_server::{
30
    CasConfig, GlobalConfig, HttpCompressionAlgorithm, ListenerConfig, ServerConfig, WorkerConfig,
31
};
32
use nativelink_config::stores::ConfigDigestHashFunction;
33
use nativelink_error::{make_err, Code, Error, ResultExt};
34
use nativelink_metric::{
35
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, RootMetricsComponent,
36
};
37
use nativelink_metric_collector::{otel_export, MetricsCollectorLayer};
38
use nativelink_scheduler::default_scheduler_factory::scheduler_factory;
39
use nativelink_service::ac_server::AcServer;
40
use nativelink_service::bep_server::BepServer;
41
use nativelink_service::bytestream_server::ByteStreamServer;
42
use nativelink_service::capabilities_server::CapabilitiesServer;
43
use nativelink_service::cas_server::CasServer;
44
use nativelink_service::execution_server::ExecutionServer;
45
use nativelink_service::health_server::HealthServer;
46
use nativelink_service::worker_api_server::WorkerApiServer;
47
use nativelink_store::default_store_factory::store_factory;
48
use nativelink_store::store_manager::StoreManager;
49
use nativelink_util::action_messages::WorkerId;
50
use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
51
use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
52
use nativelink_util::health_utils::HealthRegistryBuilder;
53
use nativelink_util::metrics_utils::{set_metrics_enabled_for_this_thread, Counter};
54
use nativelink_util::operation_state_manager::ClientStateManager;
55
use nativelink_util::origin_context::OriginContext;
56
use nativelink_util::store_trait::{
57
    set_default_digest_size_health_check, DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG,
58
};
59
use nativelink_util::task::TaskExecutor;
60
use nativelink_util::{background_spawn, init_tracing, spawn, spawn_blocking};
61
use nativelink_worker::local_worker::new_local_worker;
62
use opentelemetry::metrics::MeterProvider;
63
use opentelemetry_sdk::metrics::SdkMeterProvider;
64
use parking_lot::{Mutex, RwLock};
65
use prometheus::{Encoder, TextEncoder};
66
use rustls_pemfile::{certs as extract_certs, crls as extract_crls};
67
use scopeguard::guard;
68
use tokio::net::TcpListener;
69
use tokio::select;
70
#[cfg(target_family = "unix")]
71
use tokio::signal::unix::{signal, SignalKind};
72
use tokio::sync::{broadcast, oneshot};
73
use tokio_rustls::rustls::pki_types::{CertificateDer, CertificateRevocationListDer};
74
use tokio_rustls::rustls::server::WebPkiClientVerifier;
75
use tokio_rustls::rustls::{RootCertStore, ServerConfig as TlsServerConfig};
76
use tokio_rustls::TlsAcceptor;
77
use tonic::codec::CompressionEncoding;
78
use tonic::transport::Server as TonicServer;
79
use tracing::{error_span, event, trace_span, Level};
80
use tracing_subscriber::layer::SubscriberExt;
81
82
#[global_allocator]
83
static GLOBAL: MiMalloc = MiMalloc;
84
85
/// Note: This must be kept in sync with the documentation in `PrometheusConfig::path`.
86
const DEFAULT_PROMETHEUS_METRICS_PATH: &str = "/metrics";
87
88
/// Note: This must be kept in sync with the documentation in `AdminConfig::path`.
89
const DEFAULT_ADMIN_API_PATH: &str = "/admin";
90
91
/// Note: This must be kept in sync with the documentation in `HealthConfig::path`.
92
const DEFAULT_HEALTH_STATUS_CHECK_PATH: &str = "/status";
93
94
/// Name of environment variable to disable metrics.
95
const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS";
96
97
/// Broadcast Channel Capacity
98
/// Note: The actual capacity may be greater than the provided capacity.
99
const BROADCAST_CAPACITY: usize = 1;
100
101
/// Backend for Bazel remote execution / cache API.
102
#[derive(Parser, Debug)]
103
#[clap(
104
    author = "Trace Machina, Inc. <nativelink@tracemachina.com>",
105
    version,
106
    about,
107
    long_about = None
108
)]
109
struct Args {
110
    /// Config file to use.
111
    #[clap(value_parser)]
112
0
    config_file: String,
113
}
114
115
/// The root metrics collector struct. All metrics will be
116
/// collected from this struct traversing down each child
117
/// component.
118
0
#[derive(MetricsComponent)]
119
struct RootMetrics {
120
    #[metric(group = "stores")]
121
    stores: Arc<dyn RootMetricsComponent>,
122
    #[metric(group = "servers")]
123
    servers: HashMap<String, Arc<dyn RootMetricsComponent>>,
124
    #[metric(group = "workers")]
125
    workers: HashMap<String, Arc<dyn RootMetricsComponent>>,
126
    // TODO(allada) We cannot upcast these to RootMetricsComponent because
127
    // of https://github.com/rust-lang/rust/issues/65991.
128
    // TODO(allada) To prevent output from being too verbose we only
129
    // print the action_schedulers.
130
    #[metric(group = "action_schedulers")]
131
    schedulers: HashMap<String, Arc<dyn ClientStateManager>>,
132
}
133
134
impl RootMetricsComponent for RootMetrics {}
135
136
/// Wrapper to allow us to hash `SocketAddr` for metrics.
137
#[derive(Hash, PartialEq, Eq)]
138
struct SocketAddrWrapper(SocketAddr);
139
140
impl MetricsComponent for SocketAddrWrapper {
141
0
    fn publish(
142
0
        &self,
143
0
        _kind: MetricKind,
144
0
        _field_metadata: MetricFieldData,
145
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
146
0
        Ok(MetricPublishKnownKindData::String(self.0.to_string()))
147
0
    }
148
}
149
150
impl RootMetricsComponent for SocketAddrWrapper {}
151
152
/// Simple wrapper to enable us to register the HashMap so it can
153
/// report metrics about which clients are connected.
154
0
#[derive(MetricsComponent)]
155
struct ConnectedClientsMetrics {
156
    #[metric(group = "currently_connected_clients")]
157
    inner: Mutex<HashSet<SocketAddrWrapper>>,
158
    #[metric(help = "Total client connections since server started")]
159
    counter: Counter,
160
    #[metric(help = "Timestamp when the server started")]
161
    server_start_ts: u64,
162
}
163
164
impl RootMetricsComponent for ConnectedClientsMetrics {}
165
166
0
async fn inner_main(
167
0
    cfg: CasConfig,
168
0
    server_start_timestamp: u64,
169
0
    shutdown_tx: broadcast::Sender<Arc<oneshot::Sender<()>>>,
170
0
) -> Result<(), Box<dyn std::error::Error>> {
171
0
    fn into_encoding(from: HttpCompressionAlgorithm) -> Option<CompressionEncoding> {
172
0
        match from {
173
0
            HttpCompressionAlgorithm::gzip => Some(CompressionEncoding::Gzip),
174
0
            HttpCompressionAlgorithm::none => None,
175
        }
176
0
    }
177
178
0
    let health_registry_builder =
179
0
        Arc::new(AsyncMutex::new(HealthRegistryBuilder::new("nativelink")));
180
0
181
0
    let store_manager = Arc::new(StoreManager::new());
182
    {
183
0
        let mut health_registry_lock = health_registry_builder.lock().await;
184
185
0
        for (name, store_cfg) in cfg.stores {
186
0
            let health_component_name = format!("stores/{name}");
187
0
            let mut health_register_store =
188
0
                health_registry_lock.sub_builder(&health_component_name);
189
0
            let store = store_factory(&store_cfg, &store_manager, Some(&mut health_register_store))
190
0
                .await
191
0
                .err_tip(|| format!("Failed to create store '{name}'"))?;
192
0
            store_manager.add_store(&name, store);
193
        }
194
    }
195
196
0
    let mut action_schedulers = HashMap::new();
197
0
    let mut worker_schedulers = HashMap::new();
198
0
    if let Some(schedulers_cfg) = cfg.schedulers {
  Branch (198:12): [Folded - Ignored]
199
0
        for (name, scheduler_cfg) in schedulers_cfg {
200
0
            let (maybe_action_scheduler, maybe_worker_scheduler) =
201
0
                scheduler_factory(&scheduler_cfg, &store_manager)
202
0
                    .err_tip(|| format!("Failed to create scheduler '{name}'"))?;
203
0
            if let Some(action_scheduler) = maybe_action_scheduler {
  Branch (203:20): [Folded - Ignored]
204
0
                action_schedulers.insert(name.clone(), action_scheduler.clone());
205
0
            }
206
0
            if let Some(worker_scheduler) = maybe_worker_scheduler {
  Branch (206:20): [Folded - Ignored]
207
0
                worker_schedulers.insert(name.clone(), worker_scheduler.clone());
208
0
            }
209
        }
210
0
    }
211
212
0
    let mut server_metrics: HashMap<String, Arc<dyn RootMetricsComponent>> = HashMap::new();
213
0
    // Registers all the ConnectedClientsMetrics to the registries
214
0
    // and zips them in. It is done this way to get around the need
215
0
    // for `root_metrics_registry` to become immutable in the loop.
216
0
    let servers_and_clients: Vec<(ServerConfig, _)> = cfg
217
0
        .servers
218
0
        .into_iter()
219
0
        .enumerate()
220
0
        .map(|(i, server_cfg)| {
221
0
            let name = if server_cfg.name.is_empty() {
  Branch (221:27): [Folded - Ignored]
222
0
                format!("{i}")
223
            } else {
224
0
                server_cfg.name.clone()
225
            };
226
0
            let connected_clients_mux = Arc::new(ConnectedClientsMetrics {
227
0
                inner: Mutex::new(HashSet::new()),
228
0
                counter: Counter::default(),
229
0
                server_start_ts: server_start_timestamp,
230
0
            });
231
0
            server_metrics.insert(name.clone(), connected_clients_mux.clone());
232
0
233
0
            (server_cfg, connected_clients_mux)
234
0
        })
235
0
        .collect();
236
0
237
0
    let mut root_futures: Vec<BoxFuture<Result<(), Error>>> = Vec::new();
238
0
239
0
    let root_metrics = Arc::new(RwLock::new(RootMetrics {
240
0
        stores: store_manager.clone(),
241
0
        servers: server_metrics,
242
0
        workers: HashMap::new(), // Will be filled in later.
243
0
        schedulers: action_schedulers.clone(),
244
0
    }));
245
246
0
    for (server_cfg, connected_clients_mux) in servers_and_clients {
247
0
        let services = server_cfg.services.ok_or("'services' must be configured")?;
248
249
        // Currently we only support http as our socket type.
250
0
        let ListenerConfig::http(http_config) = server_cfg.listener;
251
252
0
        let tonic_services = TonicServer::builder()
253
0
            .add_optional_service(
254
0
                services
255
0
                    .ac
256
0
                    .map_or(Ok(None), |cfg| {
257
0
                        AcServer::new(&cfg, &store_manager).map(|v| {
258
0
                            let mut service = v.into_service();
259
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
260
0
                            if let Some(encoding) =
  Branch (260:36): [Folded - Ignored]
261
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
262
0
                            {
263
0
                                service = service.send_compressed(encoding);
264
0
                            }
265
0
                            for encoding in http_config
266
0
                                .compression
267
0
                                .accepted_compression_algorithms
268
0
                                .iter()
269
0
                                // Filter None values.
270
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
271
0
                            {
272
0
                                service = service.accept_compressed(encoding);
273
0
                            }
274
0
                            Some(service)
275
0
                        })
276
0
                    })
277
0
                    .err_tip(|| "Could not create AC service")?,
278
            )
279
            .add_optional_service(
280
0
                services
281
0
                    .cas
282
0
                    .map_or(Ok(None), |cfg| {
283
0
                        CasServer::new(&cfg, &store_manager).map(|v| {
284
0
                            let mut service = v.into_service();
285
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
286
0
                            if let Some(encoding) =
  Branch (286:36): [Folded - Ignored]
287
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
288
0
                            {
289
0
                                service = service.send_compressed(encoding);
290
0
                            }
291
0
                            for encoding in http_config
292
0
                                .compression
293
0
                                .accepted_compression_algorithms
294
0
                                .iter()
295
0
                                // Filter None values.
296
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
297
0
                            {
298
0
                                service = service.accept_compressed(encoding);
299
0
                            }
300
0
                            Some(service)
301
0
                        })
302
0
                    })
303
0
                    .err_tip(|| "Could not create CAS service")?,
304
            )
305
            .add_optional_service(
306
0
                services
307
0
                    .execution
308
0
                    .map_or(Ok(None), |cfg| {
309
0
                        ExecutionServer::new(&cfg, &action_schedulers, &store_manager).map(|v| {
310
0
                            let mut service = v.into_service();
311
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
312
0
                            if let Some(encoding) =
  Branch (312:36): [Folded - Ignored]
313
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
314
0
                            {
315
0
                                service = service.send_compressed(encoding);
316
0
                            }
317
0
                            for encoding in http_config
318
0
                                .compression
319
0
                                .accepted_compression_algorithms
320
0
                                .iter()
321
0
                                // Filter None values.
322
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
323
0
                            {
324
0
                                service = service.accept_compressed(encoding);
325
0
                            }
326
0
                            Some(service)
327
0
                        })
328
0
                    })
329
0
                    .err_tip(|| "Could not create Execution service")?,
330
            )
331
            .add_optional_service(
332
0
                services
333
0
                    .bytestream
334
0
                    .map_or(Ok(None), |cfg| {
335
0
                        ByteStreamServer::new(&cfg, &store_manager).map(|v| {
336
0
                            let mut service = v.into_service();
337
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
338
0
                            if let Some(encoding) =
  Branch (338:36): [Folded - Ignored]
339
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
340
0
                            {
341
0
                                service = service.send_compressed(encoding);
342
0
                            }
343
0
                            for encoding in http_config
344
0
                                .compression
345
0
                                .accepted_compression_algorithms
346
0
                                .iter()
347
0
                                // Filter None values.
348
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
349
0
                            {
350
0
                                service = service.accept_compressed(encoding);
351
0
                            }
352
0
                            Some(service)
353
0
                        })
354
0
                    })
355
0
                    .err_tip(|| "Could not create ByteStream service")?,
356
            )
357
            .add_optional_service(
358
0
                OptionFuture::from(
359
0
                    services
360
0
                        .capabilities
361
0
                        .as_ref()
362
0
                        // Borrow checker fighting here...
363
0
                        .map(|_| {
364
0
                            CapabilitiesServer::new(
365
0
                                services.capabilities.as_ref().unwrap(),
366
0
                                &action_schedulers,
367
0
                            )
368
0
                        }),
369
0
                )
370
0
                .await
371
0
                .map_or(Ok::<Option<CapabilitiesServer>, Error>(None), |server| {
372
0
                    Ok(Some(server?))
373
0
                })
374
0
                .err_tip(|| "Could not create Capabilities service")?
375
0
                .map(|v| {
376
0
                    let mut service = v.into_service();
377
0
                    let send_algo = &http_config.compression.send_compression_algorithm;
378
0
                    if let Some(encoding) =
  Branch (378:28): [Folded - Ignored]
379
0
                        into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
380
0
                    {
381
0
                        service = service.send_compressed(encoding);
382
0
                    }
383
0
                    for encoding in http_config
384
0
                        .compression
385
0
                        .accepted_compression_algorithms
386
0
                        .iter()
387
0
                        // Filter None values.
388
0
                        .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
389
0
                    {
390
0
                        service = service.accept_compressed(encoding);
391
0
                    }
392
0
                    service
393
0
                }),
394
0
            )
395
0
            .add_optional_service(
396
0
                services
397
0
                    .worker_api
398
0
                    .map_or(Ok(None), |cfg| {
399
0
                        WorkerApiServer::new(&cfg, &worker_schedulers).map(|v| {
400
0
                            let mut service = v.into_service();
401
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
402
0
                            if let Some(encoding) =
  Branch (402:36): [Folded - Ignored]
403
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
404
0
                            {
405
0
                                service = service.send_compressed(encoding);
406
0
                            }
407
0
                            for encoding in http_config
408
0
                                .compression
409
0
                                .accepted_compression_algorithms
410
0
                                .iter()
411
0
                                // Filter None values.
412
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
413
0
                            {
414
0
                                service = service.accept_compressed(encoding);
415
0
                            }
416
0
                            Some(service)
417
0
                        })
418
0
                    })
419
0
                    .err_tip(|| "Could not create WorkerApi service")?,
420
            )
421
            .add_optional_service(
422
0
                services
423
0
                    .experimental_bep
424
0
                    .map_or(Ok(None), |cfg| {
425
0
                        BepServer::new(&cfg, &store_manager).map(|v| {
426
0
                            let mut service = v.into_service();
427
0
                            let send_algo = &http_config.compression.send_compression_algorithm;
428
0
                            if let Some(encoding) =
  Branch (428:36): [Folded - Ignored]
429
0
                                into_encoding(send_algo.unwrap_or(HttpCompressionAlgorithm::none))
430
0
                            {
431
0
                                service = service.send_compressed(encoding);
432
0
                            }
433
0
                            for encoding in http_config
434
0
                                .compression
435
0
                                .accepted_compression_algorithms
436
0
                                .iter()
437
0
                                // Filter None values.
438
0
                                .filter_map(|from: &HttpCompressionAlgorithm| into_encoding(*from))
439
0
                            {
440
0
                                service = service.accept_compressed(encoding);
441
0
                            }
442
0
                            Some(service)
443
0
                        })
444
0
                    })
445
0
                    .err_tip(|| "Could not create BEP service")?,
446
            );
447
448
0
        let health_registry = health_registry_builder.lock().await.build();
449
0
450
0
        let mut svc = Router::new()
451
0
            .merge(tonic_services.into_service().into_axum_router())
452
0
            // This is the default service that executes if no other endpoint matches.
453
0
            .fallback((StatusCode::NOT_FOUND, "Not Found"));
454
455
0
        if let Some(health_cfg) = services.health {
  Branch (455:16): [Folded - Ignored]
456
0
            let path = if health_cfg.path.is_empty() {
  Branch (456:27): [Folded - Ignored]
457
0
                DEFAULT_HEALTH_STATUS_CHECK_PATH
458
            } else {
459
0
                &health_cfg.path
460
            };
461
0
            svc = svc.route_service(path, HealthServer::new(health_registry));
462
0
        }
463
464
0
        if let Some(prometheus_cfg) = services.experimental_prometheus {
  Branch (464:16): [Folded - Ignored]
465
0
            fn error_to_response<E: std::error::Error>(e: E) -> Response<axum::body::Body> {
466
0
                let mut response = Response::new(format!("Error: {e:?}").into());
467
0
                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
468
0
                response
469
0
            }
470
0
            let path = if prometheus_cfg.path.is_empty() {
  Branch (470:27): [Folded - Ignored]
471
0
                DEFAULT_PROMETHEUS_METRICS_PATH
472
            } else {
473
0
                &prometheus_cfg.path
474
            };
475
476
0
            let root_metrics_clone = root_metrics.clone();
477
0
478
0
            svc = svc.route_service(
479
0
                path,
480
0
                axum::routing::get(move |request: hyper::Request<axum::body::Body>| {
481
0
                    Arc::new(OriginContext::new()).wrap_async(
482
0
                        trace_span!("prometheus_ctx"),
483
0
                        async move {
484
0
                            // We spawn on a thread that can block to give more freedom to our metrics
485
0
                            // collection. This allows it to call functions like `tokio::block_in_place`
486
0
                            // if it needs to wait on a future.
487
0
                            spawn_blocking!("prometheus_metrics", move || {
488
0
                                let (layer, output_metrics) = MetricsCollectorLayer::new();
489
0
490
0
                                // Traverse all the MetricsComponent's. The `MetricsCollectorLayer` will
491
0
                                // collect all the metrics and store them in `output_metrics`.
492
0
                                tracing::subscriber::with_default(
493
0
                                    tracing_subscriber::registry().with(layer),
494
0
                                    || {
495
0
                                        let metrics_component = root_metrics_clone.read();
496
0
                                        MetricsComponent::publish(
497
0
                                            &*metrics_component,
498
0
                                            MetricKind::Component,
499
0
                                            MetricFieldData::default(),
500
0
                                        )
501
0
                                    },
502
0
                                )
503
0
                                .map_err(|e| make_err!(Code::Internal, "{e}"))
504
0
                                .err_tip(|| "While processing prometheus metrics")?;
505
506
                                // Convert the collected metrics into OpenTelemetry metrics then
507
                                // encode them into Prometheus format and populate them into a
508
                                // hyper::Response.
509
0
                                let response = {
510
0
                                    let registry = prometheus::Registry::new();
511
0
                                    let exporter = opentelemetry_prometheus::exporter()
512
0
                                        .with_registry(registry.clone())
513
0
                                        .without_counter_suffixes()
514
0
                                        .without_scope_info()
515
0
                                        .build()
516
0
                                        .map_err(|e| make_err!(Code::Internal, "{e}"))
517
0
                                        .err_tip(|| {
518
0
                                            "While creating OpenTelemetry Prometheus exporter"
519
0
                                        })?;
520
521
                                    // Prepare our OpenTelemetry collector/exporter.
522
0
                                    let provider =
523
0
                                        SdkMeterProvider::builder().with_reader(exporter).build();
524
0
                                    let meter = provider.meter("nativelink");
525
526
                                    // TODO(allada) We should put this as part of the config instead of a magic
527
                                    // request header.
528
0
                                    if let Some(json_type) =
  Branch (528:44): [Folded - Ignored]
529
0
                                        request.headers().get("x-nativelink-json")
530
                                    {
531
0
                                        let json_data = if json_type == "pretty" {
  Branch (531:60): [Folded - Ignored]
532
0
                                            serde_json::to_string_pretty(&*output_metrics.lock())
533
0
                                                .map_err(|e| {
534
0
                                                    make_err!(
535
0
                                                        Code::Internal,
536
0
                                                        "Could not convert to json {e:?}"
537
0
                                                    )
538
0
                                                })?
539
                                        } else {
540
0
                                            serde_json::to_string(&*output_metrics.lock()).map_err(
541
0
                                                |e| {
542
0
                                                    make_err!(
543
0
                                                        Code::Internal,
544
0
                                                        "Could not convert to json {e:?}"
545
0
                                                    )
546
0
                                                },
547
0
                                            )?
548
                                        };
549
0
                                        let mut response =
550
0
                                            Response::new(axum::body::Body::from(json_data));
551
0
                                        response.headers_mut().insert(
552
0
                                            hyper::header::CONTENT_TYPE,
553
0
                                            hyper::header::HeaderValue::from_static(
554
0
                                                "application/json",
555
0
                                            ),
556
0
                                        );
557
0
                                        return Ok(response);
558
0
                                    }
559
0
560
0
                                    // Export the metrics to OpenTelemetry.
561
0
                                    otel_export(
562
0
                                        "nativelink".to_string(),
563
0
                                        &meter,
564
0
                                        &output_metrics.lock(),
565
0
                                    );
566
0
567
0
                                    // Translate the OpenTelemetry metrics to Prometheus format and encode
568
0
                                    // them into a hyper::Response.
569
0
                                    let mut result = vec![];
570
0
                                    TextEncoder::new()
571
0
                                        .encode(&registry.gather(), &mut result)
572
0
                                        .unwrap();
573
0
                                    let mut response =
574
0
                                        Response::new(axum::body::Body::from(result));
575
0
                                    // Per spec we should probably use `application/openmetrics-text; version=1.0.0; charset=utf-8`
576
0
                                    // https://github.com/OpenObservability/OpenMetrics/blob/1386544931307dff279688f332890c31b6c5de36/specification/OpenMetrics.md#overall-structure
577
0
                                    // However, this makes debugging more difficult, so we use the old text/plain instead.
578
0
                                    response.headers_mut().insert(
579
0
                                        hyper::header::CONTENT_TYPE,
580
0
                                        hyper::header::HeaderValue::from_static(
581
0
                                            "text/plain; version=0.0.4; charset=utf-8",
582
0
                                        ),
583
0
                                    );
584
0
                                    Result::<_, Error>::Ok(response)
585
0
                                };
586
0
                                response
587
0
                            })
588
0
                            .await
589
0
                            .unwrap_or_else(|e| Ok(error_to_response(e)))
590
0
                            .unwrap_or_else(error_to_response)
591
0
                        },
592
0
                    )
593
0
                }),
594
0
            );
595
0
        }
596
597
0
        if let Some(admin_config) = services.admin {
  Branch (597:16): [Folded - Ignored]
598
0
            let path = if admin_config.path.is_empty() {
  Branch (598:27): [Folded - Ignored]
599
0
                DEFAULT_ADMIN_API_PATH
600
            } else {
601
0
                &admin_config.path
602
            };
603
0
            let worker_schedulers = Arc::new(worker_schedulers.clone());
604
0
            svc = svc.nest_service(
605
0
                path,
606
0
                Router::new().route(
607
0
                    "/scheduler/:instance_name/set_drain_worker/:worker_id/:is_draining",
608
0
                    axum::routing::post(
609
0
                        move |params: axum::extract::Path<(String, String, String)>| async move {
610
0
                            let (instance_name, worker_id, is_draining) = params.0;
611
0
                            (async move {
612
0
                                let is_draining = match is_draining.as_str() {
613
0
                                    "0" => false,
614
0
                                    "1" => true,
615
                                    _ => {
616
0
                                        return Err(make_err!(
617
0
                                            Code::Internal,
618
0
                                            "{} is neither 0 nor 1",
619
0
                                            is_draining
620
0
                                        ))
621
                                    }
622
                                };
623
0
                                worker_schedulers
624
0
                                    .get(&instance_name)
625
0
                                    .err_tip(|| {
626
0
                                        format!(
627
0
                                            "Can not get an instance with the name of '{}'",
628
0
                                            &instance_name
629
0
                                        )
630
0
                                    })?
631
0
                                    .clone()
632
0
                                    .set_drain_worker(
633
0
                                        &WorkerId::try_from(worker_id.clone())?,
634
0
                                        is_draining,
635
                                    )
636
0
                                    .await?;
637
0
                                Ok::<_, Error>(format!("Draining worker {worker_id}"))
638
                            })
639
0
                            .await
640
0
                            .map_err(|e| {
641
0
                                Err::<String, _>((
642
0
                                    axum::http::StatusCode::INTERNAL_SERVER_ERROR,
643
0
                                    format!("Error: {e:?}"),
644
0
                                ))
645
0
                            })
646
0
                        },
647
0
                    ),
648
0
                ),
649
0
            );
650
0
        }
651
652
        // Configure our TLS acceptor if we have TLS configured.
653
0
        let maybe_tls_acceptor = http_config.tls.map_or(Ok(None), |tls_config| {
654
0
            fn read_cert(cert_file: &str) -> Result<Vec<CertificateDer<'static>>, Error> {
655
0
                let mut cert_reader = std::io::BufReader::new(
656
0
                    std::fs::File::open(cert_file)
657
0
                        .err_tip(|| format!("Could not open cert file {cert_file}"))?,
658
                );
659
0
                let certs = extract_certs(&mut cert_reader)
660
0
                    .map(|certificate| certificate.map(CertificateDer::from))
661
0
                    .collect::<Result<Vec<CertificateDer<'_>>, _>>()
662
0
                    .err_tip(|| format!("Could not extract certs from file {cert_file}"))?;
663
0
                Ok(certs)
664
0
            }
665
0
            let certs = read_cert(&tls_config.cert_file)?;
666
0
            let mut key_reader = std::io::BufReader::new(
667
0
                std::fs::File::open(&tls_config.key_file)
668
0
                    .err_tip(|| format!("Could not open key file {}", tls_config.key_file))?,
669
            );
670
0
            let key = match rustls_pemfile::read_one(&mut key_reader)
671
0
                .err_tip(|| format!("Could not extract key(s) from file {}", tls_config.key_file))?
672
            {
673
0
                Some(rustls_pemfile::Item::Pkcs8Key(key)) => key.into(),
674
0
                Some(rustls_pemfile::Item::Sec1Key(key)) => key.into(),
675
0
                Some(rustls_pemfile::Item::Pkcs1Key(key)) => key.into(),
676
                _ => {
677
0
                    return Err(make_err!(
678
0
                        Code::Internal,
679
0
                        "No keys found in file {}",
680
0
                        tls_config.key_file
681
0
                    ))
682
                }
683
            };
684
0
            if let Ok(Some(_)) = rustls_pemfile::read_one(&mut key_reader) {
  Branch (684:20): [Folded - Ignored]
685
0
                return Err(make_err!(
686
0
                    Code::InvalidArgument,
687
0
                    "Expected 1 key in file {}",
688
0
                    tls_config.key_file
689
0
                ));
690
0
            }
691
0
            let verifier = if let Some(client_ca_file) = &tls_config.client_ca_file {
  Branch (691:35): [Folded - Ignored]
692
0
                let mut client_auth_roots = RootCertStore::empty();
693
0
                for cert in read_cert(client_ca_file)? {
694
0
                    client_auth_roots.add(cert).map_err(|e| {
695
0
                        make_err!(Code::Internal, "Could not read client CA: {e:?}")
696
0
                    })?;
697
                }
698
0
                let crls = if let Some(client_crl_file) = &tls_config.client_crl_file {
  Branch (698:35): [Folded - Ignored]
699
0
                    let mut crl_reader = std::io::BufReader::new(
700
0
                        std::fs::File::open(client_crl_file)
701
0
                            .err_tip(|| format!("Could not open CRL file {client_crl_file}"))?,
702
                    );
703
0
                    extract_crls(&mut crl_reader)
704
0
                        .map(|crl| crl.map(CertificateRevocationListDer::from))
705
0
                        .collect::<Result<_, _>>()
706
0
                        .err_tip(|| format!("Could not extract CRLs from file {client_crl_file}"))?
707
                } else {
708
0
                    Vec::new()
709
                };
710
0
                WebPkiClientVerifier::builder(Arc::new(client_auth_roots))
711
0
                    .with_crls(crls)
712
0
                    .build()
713
0
                    .map_err(|e| {
714
0
                        make_err!(
715
0
                            Code::Internal,
716
0
                            "Could not create WebPkiClientVerifier: {e:?}"
717
0
                        )
718
0
                    })?
719
            } else {
720
0
                WebPkiClientVerifier::no_client_auth()
721
            };
722
0
            let mut config = TlsServerConfig::builder()
723
0
                .with_client_cert_verifier(verifier)
724
0
                .with_single_cert(certs, key)
725
0
                .map_err(|e| {
726
0
                    make_err!(Code::Internal, "Could not create TlsServerConfig : {e:?}")
727
0
                })?;
728
729
0
            config.alpn_protocols.push("h2".into());
730
0
            Ok(Some(TlsAcceptor::from(Arc::new(config))))
731
0
        })?;
732
733
0
        let socket_addr = http_config.socket_address.parse::<SocketAddr>()?;
734
0
        let tcp_listener = TcpListener::bind(&socket_addr).await?;
735
0
        let mut http = auto::Builder::new(TaskExecutor::default());
736
0
737
0
        let http_config = &http_config.advanced_http;
738
0
        if let Some(value) = http_config.http2_keep_alive_interval {
  Branch (738:16): [Folded - Ignored]
739
0
            http.http2()
740
0
                .keep_alive_interval(Duration::from_secs(u64::from(value)));
741
0
        }
742
743
0
        if let Some(value) = http_config.experimental_http2_max_pending_accept_reset_streams {
  Branch (743:16): [Folded - Ignored]
744
0
            http.http2()
745
0
                .max_pending_accept_reset_streams(usize::try_from(value).err_tip(|| {
746
0
                    "Could not convert experimental_http2_max_pending_accept_reset_streams"
747
0
                })?);
748
0
        }
749
0
        if let Some(value) = http_config.experimental_http2_initial_stream_window_size {
  Branch (749:16): [Folded - Ignored]
750
0
            http.http2().initial_stream_window_size(value);
751
0
        }
752
0
        if let Some(value) = http_config.experimental_http2_initial_connection_window_size {
  Branch (752:16): [Folded - Ignored]
753
0
            http.http2().initial_connection_window_size(value);
754
0
        }
755
0
        if let Some(value) = http_config.experimental_http2_adaptive_window {
  Branch (755:16): [Folded - Ignored]
756
0
            http.http2().adaptive_window(value);
757
0
        }
758
0
        if let Some(value) = http_config.experimental_http2_max_frame_size {
  Branch (758:16): [Folded - Ignored]
759
0
            http.http2().max_frame_size(value);
760
0
        }
761
0
        if let Some(value) = http_config.experimental_http2_max_concurrent_streams {
  Branch (761:16): [Folded - Ignored]
762
0
            http.http2().max_concurrent_streams(value);
763
0
        }
764
0
        if let Some(value) = http_config.experimental_http2_keep_alive_timeout {
  Branch (764:16): [Folded - Ignored]
765
0
            http.http2()
766
0
                .keep_alive_timeout(Duration::from_secs(u64::from(value)));
767
0
        }
768
0
        if let Some(value) = http_config.experimental_http2_max_send_buf_size {
  Branch (768:16): [Folded - Ignored]
769
0
            http.http2().max_send_buf_size(
770
0
                usize::try_from(value).err_tip(|| "Could not convert http2_max_send_buf_size")?,
771
            );
772
0
        }
773
0
        if let Some(true) = http_config.experimental_http2_enable_connect_protocol {
  Branch (773:16): [Folded - Ignored]
774
0
            http.http2().enable_connect_protocol();
775
0
        }
776
0
        if let Some(value) = http_config.experimental_http2_max_header_list_size {
  Branch (776:16): [Folded - Ignored]
777
0
            http.http2().max_header_list_size(value);
778
0
        }
779
0
        event!(Level::WARN, "Ready, listening on {socket_addr}",);
780
0
        root_futures.push(Box::pin(async move {
781
            loop {
782
0
                select! {
783
0
                    accept_result = tcp_listener.accept() => {
784
0
                        match accept_result {
785
0
                            Ok((tcp_stream, remote_addr)) => {
786
0
                                event!(
787
                                    target: "nativelink::services",
788
0
                                    Level::INFO,
789
                                    ?remote_addr,
790
                                    ?socket_addr,
791
0
                                    "Client connected"
792
                                );
793
0
                                connected_clients_mux
794
0
                                    .inner
795
0
                                    .lock()
796
0
                                    .insert(SocketAddrWrapper(remote_addr));
797
0
                                connected_clients_mux.counter.inc();
798
0
799
0
                                // This is the safest way to guarantee that if our future
800
0
                                // is ever dropped we will clean up our data.
801
0
                                let scope_guard = guard(
802
0
                                    Arc::downgrade(&connected_clients_mux),
803
0
                                    move |weak_connected_clients_mux| {
804
0
                                        event!(
805
                                            target: "nativelink::services",
806
0
                                            Level::INFO,
807
                                            ?remote_addr,
808
                                            ?socket_addr,
809
0
                                            "Client disconnected"
810
                                        );
811
0
                                        if let Some(connected_clients_mux) = weak_connected_clients_mux.upgrade() {
  Branch (811:48): [Folded - Ignored]
812
0
                                            connected_clients_mux
813
0
                                                .inner
814
0
                                                .lock()
815
0
                                                .remove(&SocketAddrWrapper(remote_addr));
816
0
                                        }
817
0
                                    },
818
0
                                );
819
0
820
0
                                let (http, svc, maybe_tls_acceptor) =
821
0
                                    (http.clone(), svc.clone(), maybe_tls_acceptor.clone());
822
0
                                Arc::new(OriginContext::new()).background_spawn(
823
0
                                    error_span!(
824
0
                                        target: "nativelink::services",
825
0
                                        "http_connection",
826
0
                                        ?remote_addr,
827
0
                                        ?socket_addr
828
0
                                    ),
829
0
                                    async move {},
830
0
                                );
831
0
                                background_spawn!(
832
0
                                    name: "http_connection",
833
0
                                    fut: async move {
834
0
                                        // Move it into our spawn, so if our spawn dies the cleanup happens.
835
0
                                        let _guard = scope_guard;
836
0
                                        let serve_connection = if let Some(tls_acceptor) = maybe_tls_acceptor {
  Branch (836:71): [Folded - Ignored]
837
0
                                            match tls_acceptor.accept(tcp_stream).await {
838
0
                                                Ok(tls_stream) => Either::Left(http.serve_connection(
839
0
                                                    TokioIo::new(tls_stream),
840
0
                                                    TowerToHyperService::new(svc),
841
0
                                                )),
842
0
                                                Err(err) => {
843
0
                                                    event!(Level::ERROR, ?err, "Failed to accept tls stream");
844
0
                                                    return;
845
                                                }
846
                                            }
847
                                        } else {
848
0
                                            Either::Right(http.serve_connection(
849
0
                                                TokioIo::new(tcp_stream),
850
0
                                                TowerToHyperService::new(svc),
851
0
                                            ))
852
                                        };
853
854
0
                                        if let Err(err) = serve_connection.await {
  Branch (854:48): [Folded - Ignored]
855
0
                                            event!(
856
                                                target: "nativelink::services",
857
0
                                                Level::ERROR,
858
                                                ?err,
859
0
                                                "Failed running service"
860
                                            );
861
0
                                        }
862
0
                                    },
863
0
                                    target: "nativelink::services",
864
0
                                    ?remote_addr,
865
0
                                    ?socket_addr,
866
0
                                );
867
                            },
868
0
                            Err(err) => {
869
0
                                event!(Level::ERROR, ?err, "Failed to accept tcp connection");
870
0
                                continue;
871
                            }
872
                        }
873
                    },
874
                }
875
            }
876
            // Unreachable
877
0
        }));
878
0
    }
879
880
    {
881
        // We start workers after our TcpListener is set up so if our worker connects to one
882
        // of these services it will be able to connect.
883
0
        let worker_cfgs = cfg.workers.unwrap_or_default();
884
0
        let mut worker_names = HashSet::with_capacity(worker_cfgs.len());
885
0
        let mut worker_metrics: HashMap<String, Arc<dyn RootMetricsComponent>> = HashMap::new();
886
0
        for (i, worker_cfg) in worker_cfgs.into_iter().enumerate() {
887
0
            let spawn_fut = match worker_cfg {
888
0
                WorkerConfig::local(local_worker_cfg) => {
889
0
                    let fast_slow_store = store_manager
890
0
                        .get_store(&local_worker_cfg.cas_fast_slow_store)
891
0
                        .err_tip(|| {
892
0
                            format!(
893
0
                                "Failed to find store for cas_store_ref in worker config : {}",
894
0
                                local_worker_cfg.cas_fast_slow_store
895
0
                            )
896
0
                        })?;
897
898
0
                    let maybe_ac_store = if let Some(ac_store_ref) =
  Branch (898:49): [Folded - Ignored]
899
0
                        &local_worker_cfg.upload_action_result.ac_store
900
                    {
901
0
                        Some(store_manager.get_store(ac_store_ref).err_tip(|| {
902
0
                            format!("Failed to find store for ac_store in worker config : {ac_store_ref}")
903
0
                        })?)
904
                    } else {
905
0
                        None
906
                    };
907
                    // Note: Defaults to fast_slow_store if not specified. If this ever changes it must
908
                    // be updated in the config documentation for the `historical_results_store` field.
909
0
                    let historical_store = if let Some(cas_store_ref) = &local_worker_cfg
  Branch (909:51): [Folded - Ignored]
910
0
                        .upload_action_result
911
0
                        .historical_results_store
912
                    {
913
0
                        store_manager.get_store(cas_store_ref).err_tip(|| {
914
0
                                format!(
915
0
                                "Failed to find store for historical_results_store in worker config : {cas_store_ref}"
916
0
                            )
917
0
                            })?
918
                    } else {
919
0
                        fast_slow_store.clone()
920
                    };
921
0
                    let (local_worker, metrics) = new_local_worker(
922
0
                        Arc::new(local_worker_cfg),
923
0
                        fast_slow_store,
924
0
                        maybe_ac_store,
925
0
                        historical_store,
926
0
                    )
927
0
                    .await
928
0
                    .err_tip(|| "Could not make LocalWorker")?;
929
930
0
                    let name = if local_worker.name().is_empty() {
  Branch (930:35): [Folded - Ignored]
931
0
                        format!("worker_{i}")
932
                    } else {
933
0
                        local_worker.name().clone()
934
                    };
935
936
0
                    if worker_names.contains(&name) {
  Branch (936:24): [Folded - Ignored]
937
0
                        Err(Box::new(make_err!(
938
0
                            Code::InvalidArgument,
939
0
                            "Duplicate worker name '{}' found in config",
940
0
                            name
941
0
                        )))?;
942
0
                    }
943
0
                    worker_names.insert(name.clone());
944
0
                    worker_metrics.insert(name.clone(), metrics);
945
0
                    let shutdown_rx = shutdown_tx.subscribe();
946
0
                    let fut = Arc::new(OriginContext::new())
947
0
                        .wrap_async(trace_span!("worker_ctx"), local_worker.run(shutdown_rx));
948
0
                    spawn!("worker", fut, ?name)
949
                }
950
            };
951
0
            root_futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
952
0
        }
953
0
        root_metrics.write().workers = worker_metrics;
954
    }
955
956
0
    if let Err(e) = try_join_all(root_futures).await {
  Branch (956:12): [Folded - Ignored]
957
0
        panic!("{e:?}");
958
0
    };
959
0
960
0
    Ok(())
961
0
}
962
963
0
async fn get_config() -> Result<CasConfig, Box<dyn std::error::Error>> {
964
0
    let args = Args::parse();
965
0
    let json_contents = String::from_utf8(
966
0
        std::fs::read(&args.config_file)
967
0
            .err_tip(|| format!("Could not open config file {}", args.config_file))?,
968
0
    )?;
969
0
    Ok(serde_json5::from_str(&json_contents)?)
970
0
}
971
972
0
fn main() -> Result<(), Box<dyn std::error::Error>> {
973
0
    init_tracing()?;
974
975
0
    let mut cfg = futures::executor::block_on(get_config())?;
976
977
0
    let (mut metrics_enabled, max_blocking_threads) = {
978
0
        // Note: If the default changes make sure you update the documentation in
979
0
        // `config/cas_server.rs`.
980
0
        const DEFAULT_MAX_OPEN_FILES: usize = 512;
981
0
        // Note: If the default changes make sure you update the documentation in
982
0
        // `config/cas_server.rs`.
983
0
        const DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS: u64 = 1000;
984
0
        let global_cfg = if let Some(global_cfg) = &mut cfg.global {
  Branch (984:33): [Folded - Ignored]
985
0
            if global_cfg.max_open_files == 0 {
  Branch (985:16): [Folded - Ignored]
986
0
                global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES;
987
0
            }
988
0
            if global_cfg.idle_file_descriptor_timeout_millis == 0 {
  Branch (988:16): [Folded - Ignored]
989
0
                global_cfg.idle_file_descriptor_timeout_millis =
990
0
                    DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS;
991
0
            }
992
0
            if global_cfg.default_digest_size_health_check == 0 {
  Branch (992:16): [Folded - Ignored]
993
0
                global_cfg.default_digest_size_health_check = DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG;
994
0
            }
995
996
0
            *global_cfg
997
        } else {
998
0
            GlobalConfig {
999
0
                max_open_files: DEFAULT_MAX_OPEN_FILES,
1000
0
                idle_file_descriptor_timeout_millis: DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS,
1001
0
                disable_metrics: cfg.servers.iter().all(|v| {
1002
0
                    let Some(service) = &v.services else {
  Branch (1002:25): [Folded - Ignored]
1003
0
                        return true;
1004
                    };
1005
0
                    service.experimental_prometheus.is_none()
1006
0
                }),
1007
0
                default_digest_hash_function: None,
1008
0
                default_digest_size_health_check: DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG,
1009
0
            }
1010
        };
1011
0
        set_open_file_limit(global_cfg.max_open_files);
1012
0
        set_idle_file_descriptor_timeout(Duration::from_millis(
1013
0
            global_cfg.idle_file_descriptor_timeout_millis,
1014
0
        ))?;
1015
0
        set_default_digest_hasher_func(DigestHasherFunc::from(
1016
0
            global_cfg
1017
0
                .default_digest_hash_function
1018
0
                .unwrap_or(ConfigDigestHashFunction::sha256),
1019
0
        ))?;
1020
0
        set_default_digest_size_health_check(global_cfg.default_digest_size_health_check)?;
1021
        // TODO (#513): prevent deadlocks by setting max blocking threads to the number of open files * 10
1022
0
        (!global_cfg.disable_metrics, global_cfg.max_open_files * 10)
1023
0
    };
1024
0
    // Override metrics enabled if the environment variable is set.
1025
0
    if std::env::var(METRICS_DISABLE_ENV).is_ok() {
  Branch (1025:8): [Folded - Ignored]
1026
0
        metrics_enabled = false;
1027
0
    }
1028
0
    let server_start_time = SystemTime::now()
1029
0
        .duration_since(UNIX_EPOCH)
1030
0
        .unwrap()
1031
0
        .as_secs();
1032
    #[allow(clippy::disallowed_methods)]
1033
    {
1034
0
        let runtime = tokio::runtime::Builder::new_multi_thread()
1035
0
            .max_blocking_threads(max_blocking_threads)
1036
0
            .enable_all()
1037
0
            .on_thread_start(move || set_metrics_enabled_for_this_thread(metrics_enabled))
1038
0
            .build()?;
1039
1040
        // Initiates the shutdown process by broadcasting the shutdown signal via the `oneshot::Sender` to all listeners.
1041
        // Each listener will perform its cleanup and then drop its `oneshot::Sender`, signaling completion.
1042
        // Once all `oneshot::Sender` instances are dropped, the worker knows it can safely terminate.
1043
0
        let (shutdown_tx, _) = broadcast::channel::<Arc<oneshot::Sender<()>>>(BROADCAST_CAPACITY);
1044
0
        let shutdown_tx_clone = shutdown_tx.clone();
1045
0
        let (complete_tx, complete_rx) = oneshot::channel::<()>();
1046
0
1047
0
        runtime.spawn(async move {
1048
0
            tokio::signal::ctrl_c()
1049
0
                .await
1050
0
                .expect("Failed to listen to SIGINT");
1051
0
            eprintln!("User terminated process via SIGINT");
1052
0
            std::process::exit(130);
1053
0
        });
1054
0
1055
0
        #[cfg(target_family = "unix")]
1056
0
        {
1057
0
            let complete_tx = Arc::new(complete_tx);
1058
0
            runtime.spawn(async move {
1059
0
                signal(SignalKind::terminate())
1060
0
                    .expect("Failed to listen to SIGTERM")
1061
0
                    .recv()
1062
0
                    .await;
1063
0
                event!(Level::WARN, "Process terminated via SIGTERM",);
1064
0
                let _ = shutdown_tx_clone.send(complete_tx);
1065
0
                let _ = complete_rx.await;
1066
0
                event!(Level::WARN, "Successfully shut down nativelink.",);
1067
0
                std::process::exit(143);
1068
0
            });
1069
0
        }
1070
0
1071
0
        let _ = runtime.block_on(Arc::new(OriginContext::new()).wrap_async(
1072
0
            trace_span!("main"),
1073
0
            inner_main(cfg, server_start_time, shutdown_tx),
1074
0
        ));
1075
0
    }
1076
0
    Ok(())
1077
0
}
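
The shutdown sequence in `main` above fans an `Arc<oneshot::Sender<()>>` out over a `broadcast` channel: each listener performs its cleanup and drops its clone of the `Arc`, and the terminating task waits on the paired `oneshot::Receiver`, which resolves once every clone is gone. Below is a minimal, self-contained sketch of that pattern, assuming only the `tokio` crate; the task names and structure are illustrative and not part of the covered source.

use std::sync::Arc;

use tokio::sync::{broadcast, oneshot};

#[tokio::main]
async fn main() {
    // Capacity 1 mirrors BROADCAST_CAPACITY in the source above.
    let (shutdown_tx, _) = broadcast::channel::<Arc<oneshot::Sender<()>>>(1);

    // A listener (e.g. a worker) waits for the shutdown signal, performs its
    // cleanup, and then drops its Arc clone to acknowledge completion.
    let mut shutdown_rx = shutdown_tx.subscribe();
    let listener = tokio::spawn(async move {
        if let Ok(complete_tx) = shutdown_rx.recv().await {
            // ... cleanup would happen here ...
            drop(complete_tx); // Dropping the last Arc drops the oneshot sender.
        }
    });

    // The terminating task broadcasts the signal, then waits on the oneshot
    // receiver; it resolves (with a RecvError) once every Arc clone is dropped.
    let (complete_tx, complete_rx) = oneshot::channel::<()>();
    let _ = shutdown_tx.send(Arc::new(complete_tx));
    let _ = complete_rx.await;

    listener.await.expect("listener task panicked");
}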