Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/compression_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use std::sync::Arc;
18
19
use async_trait::async_trait;
20
use bincode::serde::{decode_from_slice, encode_to_vec};
21
use byteorder::{ByteOrder, LittleEndian};
22
use bytes::{Buf, BufMut, BytesMut};
23
use futures::future::FutureExt;
24
use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size};
25
use nativelink_config::stores::CompressionSpec;
26
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
27
use nativelink_metric::MetricsComponent;
28
use nativelink_util::buf_channel::{
29
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
30
};
31
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
32
use nativelink_util::spawn;
33
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
34
use serde::{Deserialize, Serialize};
35
36
use crate::cas_utils::is_zero_digest;
37
38
// In the event the bytestream format changes this number should be incremented to prevent
// backwards compatibility issues.
pub const CURRENT_STREAM_FORMAT_VERSION: u8 = 1;

// Default block size that will be used to slice stream into.
pub const DEFAULT_BLOCK_SIZE: u32 = 64 * 1024;

/// Width in bytes of a serialized `u32`, i.e. the size field (`compressed_data_size` /
/// `footer_size`) that follows every frame-type byte. Used together with `+ 1` (the
/// frame-type byte) when estimating the worst-case compressed output size; it must be the
/// width of `u32`, otherwise every frame is under-counted by 3 bytes.
const U32_SZ: u64 = size_of::<u32>() as u64;
46
47
// We use a custom frame format here because I wanted the ability in the future to:
48
// * Read a random part of the data without needing to parse entire file.
49
// * Compress the data on the fly without needing to know the exact input size.
50
//
51
// The frame format that LZ4 uses does not contain an index of where the different
52
// blocks are located. This would mean in the event we only wanted the last byte of
53
// a file, we'd need to seek to the header of each block to find where the next block
54
// offset is until we got to the last block then decompress it.
55
//
56
// By using this custom frame format we keep the ability to have each block reference
57
// the next block (for efficiency), but also after all blocks we have a footer frame
58
// which contains an index of each block in the stream. This means that we can read a
59
// fixed number of bytes of the file, which always contain the size of the index, then
60
// with that size we know the exact size of the footer frame, which contains the entire
61
// index. Once this footer frame is loaded, we could then do some math to figure out
62
// which block each byte is in and the offset of each block within the compressed file.
63
//
64
// The frame format is as follows:
65
// |----------------------------------HEADER-----------------------------------------|
66
// |  version(u8) |  block_size (u32) |  upload_size_type (u32) |  upload_size (u64) |
67
// |----------------------------------BLOCK------------------------------------------|
68
// |  frame_type(u8) 0x00 |  compressed_data_size (u32) |        ...DATA...          |
69
// |                                ...DATA...                                       |
70
// | [Possibly repeat block]                                                         |
71
// |----------------------------------FOOTER-----------------------------------------|
72
// |  frame_type(u8) 0x01 |    footer_size (u32) |           index_count1 (u64)      |
73
// |      ...[pos_from_prev_index (u32) - repeat for count {index_count*}]...        |
74
// |  index_count2 (u32) |    uncompressed_data_sz (u64) |    block_size (u32)       |
75
// | version (u8) |------------------------------------------------------------------|
76
// |---------------------------------------------------------------------------------|
77
//
78
// version              - A constant number used to define what version of this format is being
79
//                        used. Version in header and footer must match.
80
// block_size           - Size of each block uncompressed except for last block. This means that
81
//                        every block uncompressed will be a constant size except last block may
82
//                        be variable size. Block size in header and footer must match.
83
// upload_size_type     - Value of 0 = UploadSizeInfo::ExactSize, 1 = UploadSizeInfo::MaxSize.
84
//                        This is for debug reasons only.
85
// upload_size          - The size of the data. WARNING: Do not rely on this being the uncompressed
86
//                        payload size. It is a debug field and a "best guess" on how large the data
87
//                        is. The exact uncompressed size is stored in the footer. This value is
88
//                        interpreted according to the `upload_size_type` field.
89
// frame_type           - Type of each frame. 0 = BLOCK frame, 1 = FOOTER frame. Header frame will
90
//                        always start with the first byte of the stream, so no magic number for it.
91
// compressed_data_size - The size of this block. The bytes after this field should be read
92
//                        in sequence to get all of the block's data in this block.
93
// footer_size          - Size of the footer for bytes after this field.
94
// index_count1         - Number of items in the index. ({index_count1} * 4) represents the number
95
//                        of bytes that should be read after this field in order to get all index
96
//                        data.
97
// pos_from_prev_index  - Index of an individual block in the stream relative to the previous
98
//                        block. This field repeats {index_count1} times.
99
// index_count2         - Same as {index_count1} and should always be equal. This field might be
100
//                        useful if you want to read a random byte from this stream, have random
101
//                        access to it and know the size of the stream during reading, because
102
//                        this field is always in the exact same place in the stream relative to
103
//                        the last byte.
104
// uncompressed_data_sz - Size of the original uncompressed data.
105
//
106
// Note: All fields are little-endian.
107
108
/// Frame-type tag (`0x00`) written as the first byte of every compressed data block frame.
pub const CHUNK_FRAME_TYPE: u8 = 0x00;

/// Frame-type tag (`0x01`) written as the first byte of the trailing footer frame.
pub const FOOTER_FRAME_TYPE: u8 = 0x01;
113
114
/// This is a partial mirror of `nativelink_config::stores::Lz4Config`.
/// We cannot use that natively here because it could cause our
/// serialized format to change if we added more configs.
///
/// It is embedded in both the serialized `Header` and `Footer`, so its field layout
/// is part of the stream format.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default, Copy, Clone)]
pub struct Lz4Config {
    /// Uncompressed size of each block written to the stream (the last block may be
    /// shorter).
    pub block_size: u32,
}
121
122
/// Stream header, serialized with the store's bincode config as the very first bytes of
/// the compressed stream. Field order is the serialized layout.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy)]
pub struct Header {
    /// Stream format version; `get_part` rejects anything but
    /// `CURRENT_STREAM_FORMAT_VERSION`.
    pub version: u8,
    /// Block size used by the writer; `get_part` rejects it when it exceeds the
    /// reader's `max_decode_block_size`.
    pub config: Lz4Config,
    /// Size hint the uploader passed to `update`. Debug information only; the real
    /// uncompressed size is recorded in the footer.
    pub upload_size: UploadSizeInfo,
}
128
129
/// One entry of the footer's block index.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default, Clone, Copy)]
pub struct SliceIndex {
    /// Compressed payload size of the block this entry describes (`update` stores the
    /// `compressed_data_sz` of each block here, excluding the frame-type byte and size field).
    pub position_from_prev_index: u32,
}
133
134
/// Stream footer, serialized inside the FOOTER frame after the last data block.
/// Field order is the serialized layout.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct Footer {
    /// Per-block index entries. `update` keeps one entry for every block except the
    /// last one (it drops the final slot before writing the footer).
    pub indexes: Vec<SliceIndex>,
    /// Redundant copy of `indexes.len()`; `get_part` checks that they agree and that
    /// they match the number of blocks read minus one.
    pub index_count: u32,
    /// Total number of uncompressed bytes in the stream; verified by `get_part`.
    pub uncompressed_data_size: u64,
    /// Copy of the header's config (not cross-checked by `get_part`).
    pub config: Lz4Config,
    /// Must equal the header's version; `get_part` checks this.
    pub version: u8,
}
142
143
/// Worst-case compressed size, in bytes, of an LZ4 block built from `input_size` bytes.
///
/// `lz4_flex::block::get_maximum_output_size()` over-estimates considerably, so this uses
/// the bound from the Linux kernel's lz4 header instead:
/// <https://github.com/torvalds/linux/blob/master/include/linux/lz4.h#L61>
/// Local testing shows this gives quite accurate worst case given random input.
const fn lz4_compress_bound(input_size: u64) -> u64 {
    let overhead = input_size / 255 + 16;
    input_size + overhead
}
149
150
1.24k
fn serialized_size<C>(value: &impl Serialize, config: C) -> Result<u64, Error>
151
1.24k
where
152
1.24k
    C: bincode::config::Config,
153
{
154
1.24k
    let mut size_writer = bincode::enc::write::SizeWriter { bytes_written: 0 };
155
1.24k
    bincode::serde::encode_into_writer(value, &mut size_writer, config).map_err(|e| 
{0
156
0
        make_err!(
157
0
            Code::Internal,
158
            "Failed to calculate serialized size: {:?}",
159
            e
160
        )
161
0
    })?;
162
1.24k
    Ok(size_writer.bytes_written as u64)
163
1.24k
}
164
165
struct UploadState {
166
    header: Header,
167
    footer: Footer,
168
    max_output_size: u64,
169
    input_max_size: u64,
170
}
171
172
impl UploadState {
173
6
    pub(crate) fn new(
174
6
        store: &CompressionStore,
175
6
        upload_size: UploadSizeInfo,
176
6
    ) -> Result<Self, Error> {
177
6
        let input_max_size = match upload_size {
178
6
            UploadSizeInfo::MaxSize(
sz1
) | UploadSizeInfo::ExactSize(
sz5
) => sz,
179
        };
180
181
6
        let max_index_count = (input_max_size / u64::from(store.config.block_size)) + 1;
182
183
6
        let header = Header {
184
6
            version: CURRENT_STREAM_FORMAT_VERSION,
185
6
            config: Lz4Config {
186
6
                block_size: store.config.block_size,
187
6
            },
188
6
            upload_size,
189
6
        };
190
6
        let footer = Footer {
191
6
            indexes: vec![
192
6
                SliceIndex::default();
193
6
                usize::try_from(max_index_count)
194
6
                    .err_tip(|| "Could not convert max_index_count to usize")
?0
195
            ],
196
6
            index_count: max_index_count as u32,
197
            uncompressed_data_size: 0, // Updated later.
198
6
            config: header.config,
199
            version: CURRENT_STREAM_FORMAT_VERSION,
200
        };
201
202
        // This is more accurate of an estimate than what get_maximum_output_size calculates.
203
6
        let max_block_size = lz4_compress_bound(u64::from(store.config.block_size)) + U32_SZ + 1;
204
205
6
        let max_output_size = {
206
6
            let header_size = serialized_size(&header, store.bincode_config)
?0
;
207
6
            let max_content_size = max_block_size * max_index_count;
208
6
            let max_footer_size = U32_SZ + 1 + serialized_size(&footer, store.bincode_config)
?0
;
209
6
            header_size + max_content_size + max_footer_size
210
        };
211
212
6
        Ok(Self {
213
6
            header,
214
6
            footer,
215
6
            max_output_size,
216
6
            input_max_size,
217
6
        })
218
6
    }
219
}
220
221
// TODO(jaroeichler): Use the default `standard` config.
/// Bincode configuration used to (de)serialize the stream header and footer:
/// little-endian, fixed-width integers, no size limit. `CompressionStore::new`
/// initializes it with `bincode::config::legacy()`. Changing it changes the serialized
/// stream format and breaks reading previously written data.
type LegacyBincodeConfig = bincode::config::Configuration<
    bincode::config::LittleEndian,
    bincode::config::Fixint,
    bincode::config::NoLimit,
>;
227
228
/// This store will compress data before sending it on to the inner store.
/// Note: Currently using `get_part()` and trying to read part of the data will
/// result in the entire contents being read from the inner store but will
/// only send the contents requested.
#[derive(MetricsComponent)]
pub struct CompressionStore {
    /// Store that receives (and later returns) the compressed, framed stream.
    #[metric(group = "inner_store")]
    inner_store: Store,
    /// LZ4 settings after `new` has applied defaults to `block_size` and
    /// `max_decode_block_size`.
    config: nativelink_config::stores::Lz4Config,
    /// Bincode configuration used for the header and footer.
    bincode_config: LegacyBincodeConfig,
}
239
240
impl core::fmt::Debug for CompressionStore {
241
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
242
0
        f.debug_struct("CompressionStore")
243
0
            .field("inner_store", &self.inner_store)
244
0
            .field("config", &self.config)
245
0
            .finish_non_exhaustive()
246
0
    }
247
}
248
249
impl CompressionStore {
250
7
    pub fn new(spec: &CompressionSpec, inner_store: Store) -> Result<Arc<Self>, Error> {
251
7
        let lz4_config = match spec.compression_algorithm {
252
7
            nativelink_config::stores::CompressionAlgorithm::Lz4(mut lz4_config) => {
253
7
                if lz4_config.block_size == 0 {
  Branch (253:20): [True: 3, False: 4]
  Branch (253:20): [Folded - Ignored]
254
3
                    lz4_config.block_size = DEFAULT_BLOCK_SIZE;
255
4
                }
256
7
                if lz4_config.max_decode_block_size == 0 {
  Branch (256:20): [True: 7, False: 0]
  Branch (256:20): [Folded - Ignored]
257
7
                    lz4_config.max_decode_block_size = lz4_config.block_size;
258
7
                
}0
259
7
                lz4_config
260
            }
261
        };
262
7
        Ok(Arc::new(Self {
263
7
            inner_store,
264
7
            config: lz4_config,
265
7
            bincode_config: bincode::config::legacy(),
266
7
        }))
267
7
    }
268
}
269
270
#[async_trait]
271
impl StoreDriver for CompressionStore {
272
    async fn has_with_results(
273
        self: Pin<&Self>,
274
        digests: &[StoreKey<'_>],
275
        results: &mut [Option<u64>],
276
0
    ) -> Result<(), Error> {
277
0
        self.inner_store.has_with_results(digests, results).await
278
0
    }
279
280
    async fn update(
281
        self: Pin<&Self>,
282
        key: StoreKey<'_>,
283
        mut reader: DropCloserReadHalf,
284
        upload_size: UploadSizeInfo,
285
12
    ) -> Result<(), Error> {
286
6
        let mut output_state = UploadState::new(&self, upload_size)
?0
;
287
288
6
        let (mut tx, rx) = make_buf_channel_pair();
289
290
6
        let inner_store = self.inner_store.clone();
291
6
        let key = key.into_owned();
292
6
        let update_fut = spawn!("compression_store_update_spawn", async move {
293
6
            inner_store
294
6
                .update(
295
6
                    key,
296
6
                    rx,
297
6
                    UploadSizeInfo::MaxSize(output_state.max_output_size),
298
6
                )
299
6
                .await
300
6
                .err_tip(|| "Inner store update in compression store failed")
301
6
        })
302
6
        .map(
303
6
            |result| match result.err_tip(|| "Failed to run compression update spawn") {
304
6
                Ok(inner_result) => {
305
6
                    inner_result.err_tip(|| "Compression underlying store update failed")
306
                }
307
0
                Err(e) => Err(e),
308
6
            },
309
        );
310
311
6
        let write_fut = async move {
312
            {
313
                // Write Header.
314
6
                let serialized_header = encode_to_vec(output_state.header, self.bincode_config)
315
6
                    .map_err(|e| 
{0
316
0
                        make_err!(Code::Internal, "Failed to serialize header : {:?}", e)
317
0
                    })?;
318
6
                tx.send(serialized_header.into())
319
6
                    .await
320
6
                    .err_tip(|| "Failed to write compression header on upload")
?0
;
321
            }
322
323
6
            let mut received_amt = 0;
324
6
            let mut index_count: u32 = 0;
325
99
            for 
index98
in &mut output_state.footer.indexes {
326
98
                let chunk = reader
327
98
                    .consume(Some(self.config.block_size as usize))
328
98
                    .await
329
98
                    .err_tip(|| "Failed to read take in update in compression store")
?0
;
330
98
                if chunk.is_empty() {
  Branch (330:20): [True: 5, False: 93]
  Branch (330:20): [Folded - Ignored]
331
5
                    break; // EOF.
332
93
                }
333
334
93
                received_amt += u64::try_from(chunk.len())
335
93
                    .err_tip(|| "Could not convert chunk.len() to u64")
?0
;
336
0
                error_if!(
337
93
                    received_amt > output_state.input_max_size,
  Branch (337:21): [True: 0, False: 93]
  Branch (337:21): [Folded - Ignored]
338
                    "Got more data than stated in compression store upload request"
339
                );
340
341
93
                let max_output_size = get_maximum_output_size(self.config.block_size as usize);
342
93
                let mut compressed_data_buf = BytesMut::with_capacity(max_output_size);
343
93
                compressed_data_buf.put_u8(CHUNK_FRAME_TYPE);
344
93
                compressed_data_buf.put_u32_le(0); // Filled later.
345
346
                // For efficiency reasons we do some raw slice manipulation so we can write directly
347
                // into our buffer instead of having to do another allocation.
348
93
                let raw_compressed_data = unsafe {
349
93
                    core::slice::from_raw_parts_mut(
350
93
                        compressed_data_buf.chunk_mut().as_mut_ptr(),
351
93
                        max_output_size,
352
93
                    )
353
                };
354
355
93
                let compressed_data_sz = compress_into(&chunk, raw_compressed_data)
356
93
                    .map_err(|e| make_err!(
Code::Internal0
, "Compression error {:?}", e))
?0
;
357
93
                unsafe {
358
93
                    compressed_data_buf.advance_mut(compressed_data_sz);
359
93
                }
360
361
                // Now fill the size in our slice.
362
93
                LittleEndian::write_u32(&mut compressed_data_buf[1..5], compressed_data_sz as u32);
363
364
                // Now send our chunk.
365
93
                tx.send(compressed_data_buf.freeze())
366
93
                    .await
367
93
                    .err_tip(|| "Failed to write chunk to inner store in compression store")
?0
;
368
369
93
                index.position_from_prev_index = compressed_data_sz as u32;
370
371
93
                index_count += 1;
372
            }
373
            // Index 0 is actually a pointer to the second chunk. This is because we don't need
374
            // an index for the first item, since it starts at position `{header_len}`.
375
            // The code above causes us to create 1 more index than we actually need, so we
376
            // remove the last index from our vector here, because at this point we are always
377
            // one index too many.
378
            // Note: We need to be careful that if we don't have any data (zero bytes) it
379
            // doesn't go to -1.
380
6
            index_count = index_count.saturating_sub(1);
381
6
            output_state
382
6
                .footer
383
6
                .indexes
384
6
                .resize(index_count as usize, SliceIndex::default());
385
6
            output_state.footer.index_count = output_state.footer.indexes.len() as u32;
386
6
            output_state.footer.uncompressed_data_size = received_amt;
387
            {
388
                // Write Footer.
389
6
                let serialized_footer = encode_to_vec(output_state.footer, self.bincode_config)
390
6
                    .map_err(|e| 
{0
391
0
                        make_err!(Code::Internal, "Failed to serialize header : {:?}", e)
392
0
                    })?;
393
394
6
                let mut footer = BytesMut::with_capacity(1 + 4 + serialized_footer.len());
395
6
                footer.put_u8(FOOTER_FRAME_TYPE);
396
6
                footer.put_u32_le(serialized_footer.len() as u32);
397
6
                footer.extend_from_slice(&serialized_footer);
398
399
6
                tx.send(footer.freeze())
400
6
                    .await
401
6
                    .err_tip(|| "Failed to write footer to inner store in compression store")
?0
;
402
6
                tx.send_eof()
403
6
                    .err_tip(|| "Failed writing EOF in compression store update")
?0
;
404
            }
405
406
6
            Result::<(), Error>::Ok(())
407
6
        };
408
6
        let (write_result, update_result) = tokio::join!(write_fut, update_fut);
409
6
        write_result.merge(update_result)
410
12
    }
411
412
    async fn get_part(
413
        self: Pin<&Self>,
414
        key: StoreKey<'_>,
415
        writer: &mut DropCloserWriteHalf,
416
        offset: u64,
417
        length: Option<u64>,
418
2.45k
    ) -> Result<(), Error> {
419
1.22k
        if is_zero_digest(key.borrow()) {
  Branch (419:12): [True: 1, False: 1.22k]
  Branch (419:12): [Folded - Ignored]
420
1
            writer
421
1
                .send_eof()
422
1
                .err_tip(|| "Failed to send zero EOF in filesystem store get_part")
?0
;
423
1
            return Ok(());
424
1.22k
        }
425
426
1.22k
        let (tx, mut rx) = make_buf_channel_pair();
427
428
1.22k
        let inner_store = self.inner_store.clone();
429
1.22k
        let key = key.into_owned();
430
1.22k
        let get_part_fut = spawn!("compression_store_get_part_spawn", async move {
431
1.22k
            inner_store
432
1.22k
                .get_part(key, tx, 0, None)
433
1.22k
                .await
434
1.22k
                .err_tip(|| "Inner store get in compression store failed")
435
1.22k
        })
436
1.22k
        .map(
437
1.22k
            |result| match result.err_tip(|| "Failed to run compression get spawn") {
438
1.22k
                Ok(inner_result) => {
439
1.22k
                    inner_result.err_tip(|| "Compression underlying store get failed")
440
                }
441
0
                Err(e) => Err(e),
442
1.22k
            },
443
        );
444
1.22k
        let read_fut = async move {
445
1.22k
            let header = {
446
                // Read header.
447
                const EMPTY_HEADER: Header = Header {
448
                    version: CURRENT_STREAM_FORMAT_VERSION,
449
                    config: Lz4Config { block_size: 0 },
450
                    upload_size: UploadSizeInfo::ExactSize(0),
451
                };
452
1.22k
                let header_size = serialized_size(&EMPTY_HEADER, self.bincode_config)
?0
;
453
1.22k
                let chunk = rx
454
1.22k
                    .consume(Some(header_size as usize))
455
1.22k
                    .await
456
1.22k
                    .err_tip(|| "Failed to read header in get_part compression store")
?0
;
457
0
                error_if!(
458
1.22k
                    chunk.len() as u64 != header_size,
  Branch (458:21): [True: 0, False: 1.22k]
  Branch (458:21): [Folded - Ignored]
459
                    "Expected inner store to return the proper amount of data in compression store {} != {}",
460
0
                    chunk.len(),
461
                    header_size,
462
                );
463
464
1.22k
                let (header, _) = decode_from_slice::<Header, _>(&chunk, self.bincode_config)
465
1.22k
                    .map_err(|e| 
{0
466
0
                        make_err!(Code::Internal, "Failed to deserialize header : {:?}", e)
467
0
                    })?;
468
1.22k
                header
469
            };
470
471
0
            error_if!(
472
1.22k
                header.version != CURRENT_STREAM_FORMAT_VERSION,
  Branch (472:17): [True: 0, False: 1.22k]
  Branch (472:17): [Folded - Ignored]
473
                "Expected header version to match in get compression, got {}, want {}",
474
                header.version,
475
                CURRENT_STREAM_FORMAT_VERSION
476
            );
477
0
            error_if!(
478
1.22k
                header.config.block_size > self.config.max_decode_block_size,
  Branch (478:17): [True: 0, False: 1.22k]
  Branch (478:17): [Folded - Ignored]
479
                "Block size is too large in compression, got {} > {}",
480
                header.config.block_size,
481
0
                self.config.max_decode_block_size
482
            );
483
484
1.22k
            let mut chunk = rx
485
1.22k
                .consume(Some(1 + 4))
486
1.22k
                .await
487
1.22k
                .err_tip(|| "Failed to read init frame info in compression store")
?0
;
488
0
            error_if!(
489
1.22k
                chunk.len() < 1 + 4,
  Branch (489:17): [True: 0, False: 1.22k]
  Branch (489:17): [Folded - Ignored]
490
                "Received EOF too early while reading init frame info in compression store"
491
            );
492
493
1.22k
            let mut frame_type = chunk.get_u8();
494
1.22k
            let mut frame_sz = chunk.get_u32_le();
495
496
1.22k
            let mut uncompressed_data_sz: u64 = 0;
497
1.22k
            let mut remaining_bytes_to_send: u64 = length.unwrap_or(u64::MAX);
498
1.22k
            let mut chunks_count: u32 = 0;
499
4.98k
            while frame_type != FOOTER_FRAME_TYPE {
  Branch (499:19): [True: 3.75k, False: 1.22k]
  Branch (499:19): [Folded - Ignored]
500
0
                error_if!(
501
3.75k
                    frame_type != CHUNK_FRAME_TYPE,
  Branch (501:21): [True: 0, False: 3.75k]
  Branch (501:21): [Folded - Ignored]
502
                    "Expected frame to be BODY in compression store, got {} at {}",
503
                    frame_type,
504
                    chunks_count
505
                );
506
507
3.75k
                let chunk = rx
508
3.75k
                    .consume(Some(frame_sz as usize))
509
3.75k
                    .await
510
3.75k
                    .err_tip(|| "Failed to read chunk in get_part compression store")
?0
;
511
3.75k
                if chunk.len() < frame_sz as usize {
  Branch (511:20): [True: 0, False: 3.75k]
  Branch (511:20): [Folded - Ignored]
512
0
                    return Err(make_err!(
513
0
                        Code::Internal,
514
0
                        "Got EOF earlier than expected. Maybe the data is not compressed or different format?"
515
0
                    ));
516
3.75k
                }
517
                {
518
3.75k
                    let max_output_size =
519
3.75k
                        get_maximum_output_size(header.config.block_size as usize);
520
3.75k
                    let mut uncompressed_data = BytesMut::with_capacity(max_output_size);
521
522
                    // For efficiency reasons we do some raw slice manipulation so we can write directly
523
                    // into our buffer instead of having to do another allocation.
524
3.75k
                    let raw_decompressed_data = unsafe {
525
3.75k
                        core::slice::from_raw_parts_mut(
526
3.75k
                            uncompressed_data.chunk_mut().as_mut_ptr(),
527
3.75k
                            max_output_size,
528
3.75k
                        )
529
                    };
530
531
3.75k
                    let uncompressed_chunk_sz = decompress_into(&chunk, raw_decompressed_data)
532
3.75k
                        .map_err(|e| make_err!(
Code::Internal0
, "Decompression error {:?}", e))
?0
;
533
3.75k
                    unsafe { uncompressed_data.advance_mut(uncompressed_chunk_sz) };
534
3.75k
                    let new_uncompressed_data_sz =
535
3.75k
                        uncompressed_data_sz + uncompressed_chunk_sz as u64;
536
3.75k
                    if new_uncompressed_data_sz >= offset && 
remaining_bytes_to_send > 02.28k
{
  Branch (536:24): [True: 2.28k, False: 1.47k]
  Branch (536:62): [True: 1.95k, False: 328]
  Branch (536:24): [Folded - Ignored]
  Branch (536:62): [Folded - Ignored]
537
1.95k
                        let start_pos = offset.saturating_sub(uncompressed_data_sz) as usize;
538
1.95k
                        let end_pos = cmp::min(
539
1.95k
                            start_pos + remaining_bytes_to_send as usize,
540
1.95k
                            uncompressed_chunk_sz,
541
                        );
542
1.95k
                        if end_pos != start_pos {
  Branch (542:28): [True: 1.85k, False: 102]
  Branch (542:28): [Folded - Ignored]
543
                            // Make sure we don't send an EOF by accident.
544
1.85k
                            writer
545
1.85k
                                .send(uncompressed_data.freeze().slice(start_pos..end_pos))
546
1.85k
                                .await
547
1.85k
                                .err_tip(|| "Failed sending chunk in compression store")
?0
;
548
102
                        }
549
1.95k
                        remaining_bytes_to_send -= (end_pos - start_pos) as u64;
550
1.79k
                    }
551
3.75k
                    uncompressed_data_sz = new_uncompressed_data_sz;
552
                }
553
3.75k
                chunks_count += 1;
554
555
3.75k
                let mut chunk = rx
556
3.75k
                    .consume(Some(1 + 4))
557
3.75k
                    .await
558
3.75k
                    .err_tip(|| "Failed to read frame info in compression store")
?0
;
559
0
                error_if!(
560
3.75k
                    chunk.len() < 1 + 4,
  Branch (560:21): [True: 0, False: 3.75k]
  Branch (560:21): [Folded - Ignored]
561
                    "Received EOF too early while reading frame info in compression store"
562
                );
563
564
3.75k
                frame_type = chunk.get_u8();
565
3.75k
                frame_sz = chunk.get_u32_le();
566
            }
567
            // Index count will always be +1 (unless it is zero bytes long).
568
1.22k
            chunks_count = chunks_count.saturating_sub(1);
569
            {
570
                // Read and validate footer.
571
1.22k
                let chunk = rx
572
1.22k
                    .consume(Some(frame_sz as usize))
573
1.22k
                    .await
574
1.22k
                    .err_tip(|| "Failed to read chunk in get_part compression store")
?0
;
575
0
                error_if!(
576
1.22k
                    chunk.len() < frame_sz as usize,
  Branch (576:21): [True: 0, False: 1.22k]
  Branch (576:21): [Folded - Ignored]
577
                    "Unexpected EOF when reading footer in compression store get_part"
578
                );
579
580
1.22k
                let (footer, _) = decode_from_slice::<Footer, _>(&chunk, self.bincode_config)
581
1.22k
                    .map_err(|e| 
{0
582
0
                        make_err!(Code::Internal, "Failed to deserialize footer : {:?}", e)
583
0
                    })?;
584
585
0
                error_if!(
586
1.22k
                    header.version != footer.version,
  Branch (586:21): [True: 0, False: 1.22k]
  Branch (586:21): [Folded - Ignored]
587
                    "Expected header and footer versions to match compression store get_part, {} != {}",
588
                    header.version,
589
                    footer.version
590
                );
591
0
                error_if!(
592
1.22k
                    footer.indexes.len() != footer.index_count as usize,
  Branch (592:21): [True: 0, False: 1.22k]
  Branch (592:21): [Folded - Ignored]
593
                    "Expected index counts to match in compression store footer in get_part, {} != {}",
594
0
                    footer.indexes.len(),
595
                    footer.index_count
596
                );
597
0
                error_if!(
598
1.22k
                    footer.index_count != chunks_count,
  Branch (598:21): [True: 0, False: 1.22k]
  Branch (598:21): [Folded - Ignored]
599
                    concat!(
600
                        "Expected index counts to match received chunks count ",
601
                        "in compression store footer in get_part, {} != {}"
602
                    ),
603
                    footer.index_count,
604
                    chunks_count
605
                );
606
0
                error_if!(
607
1.22k
                    footer.uncompressed_data_size != uncompressed_data_sz,
  Branch (607:21): [True: 0, False: 1.22k]
  Branch (607:21): [Folded - Ignored]
608
                    "Expected uncompressed data sizes to match in compression store footer in get_part, {} != {}",
609
                    footer.uncompressed_data_size,
610
                    uncompressed_data_sz
611
                );
612
            }
613
614
1.22k
            writer
615
1.22k
                .send_eof()
616
1.22k
                .err_tip(|| "Failed to send eof in compression store write")
?0
;
617
1.22k
            Ok(())
618
1.22k
        };
619
620
1.22k
        let (read_result, get_part_fut_result) = tokio::join!(read_fut, get_part_fut);
621
1.22k
        if let Err(
mut e0
) = read_result {
  Branch (621:16): [True: 0, False: 1.22k]
  Branch (621:16): [Folded - Ignored]
622
            // We may need to propagate the error from reading the data through first.
623
0
            if let Err(err) = get_part_fut_result {
  Branch (623:20): [True: 0, False: 0]
  Branch (623:20): [Folded - Ignored]
624
0
                e = err.merge(e);
625
0
            }
626
0
            return Err(e);
627
1.22k
        }
628
1.22k
        Ok(())
629
2.45k
    }
630
631
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
632
0
        self
633
0
    }
634
635
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) {
636
0
        self
637
0
    }
638
639
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
640
0
        self
641
0
    }
642
}
643
644
// Provides the default health-status indicator for `CompressionStore` via the macro
// imported from `nativelink_util::health_utils` (its expansion is not visible here).
default_health_status_indicator!(CompressionStore);