Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-store/src/compression_store.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::cmp;
16
use std::pin::Pin;
17
use std::sync::Arc;
18
19
use async_trait::async_trait;
20
use bincode::config::{FixintEncoding, WithOtherIntEncoding};
21
use bincode::{DefaultOptions, Options};
22
use byteorder::{ByteOrder, LittleEndian};
23
use bytes::{Buf, BufMut, BytesMut};
24
use futures::future::FutureExt;
25
use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size};
26
use nativelink_config::stores::CompressionSpec;
27
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
28
use nativelink_metric::MetricsComponent;
29
use nativelink_util::buf_channel::{
30
    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
31
};
32
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
33
use nativelink_util::spawn;
34
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
35
use serde::{Deserialize, Serialize};
36
37
use crate::cas_utils::is_zero_digest;
38
39
// In the event the bytestream format changes this number should be incremented to prevent
40
// backwards compatibility issues.
41
pub const CURRENT_STREAM_FORMAT_VERSION: u8 = 1;
42
43
// Default block size that will be used to slice stream into.
44
pub const DEFAULT_BLOCK_SIZE: u32 = 64 * 1024;
45
46
const U32_SZ: u64 = std::mem::size_of::<u8>() as u64;
47
48
type BincodeOptions = WithOtherIntEncoding<DefaultOptions, FixintEncoding>;
49
50
// We use a custom frame format here because I wanted the ability in the future to:
51
// * Read a random part of the data without needing to parse entire file.
52
// * Compress the data on the fly without needing to know the exact input size.
53
//
54
// The frame format that LZ4 uses does not contain an index of where the different
55
// blocks are located. This would mean in the event we only wanted the last byte of
56
// a file, we'd need to seek to the header of each block to find where the next block
57
// offset is until we got to the last block then decompress it.
58
//
59
// By using this custom frame format we keep the ability to have each block reference
60
// the next block (for efficiency), but also after all blocks we have a footer frame
61
// which contains an index of each block in the stream. This means that we can read a
62
// fixed number of bytes of the file, which always contain the size of the index, then
63
// with that size we know the exact size of the footer frame, which contains the entire
64
// index. Once this footer frame is loaded, we could then do some math to figure out
65
// which block each byte is in and the offset of each block within the compressed file.
66
//
67
// The frame format is as follows:
68
// |----------------------------------HEADER-----------------------------------------|
69
// |  version(u8) |  block_size (u32) |  upload_size_type (u32) |  upload_size (u32) |
70
// |----------------------------------BLOCK------------------------------------------|
71
// |  frame_type(u8) 0x00 |  compressed_data_size (u32) |        ...DATA...          |
72
// |                                ...DATA...                                       |
73
// | [Possibly repeat block]                                                         |
74
// |----------------------------------FOOTER-----------------------------------------|
75
// |  frame_type(u8) 0x01 |    footer_size (u32) |           index_count1 (u64)      |
76
// |      ...[pos_from_prev_index (u32) - repeat for count {index_count*}]...        |
77
// |  index_count2 (u32) |    uncompressed_data_sz (u64) |    block_size (u32)       |
78
// | version (u8) |------------------------------------------------------------------|
79
// |---------------------------------------------------------------------------------|
80
//
81
// version              - A constant number used to define what version of this format is being
82
//                        used. Version in header and footer must match.
83
// block_size           - Size of each block uncompressed except for last block. This means that
84
//                        every block uncompressed will be a constant size except last block may
85
//                        be variable size. Block size in header and footer must match.
86
// upload_size_type     - Value of 0 = UploadSizeInfo::ExactSize, 1 = UploadSizeInfo::MaxSize.
87
//                        This is for debug reasons only.
88
// upload_size          - The size of the data. WARNING: Do not rely on this being the uncompressed
89
//                        payload size. It is a debug field and a "best guess" on how large the data
90
//                        is. The header does not contain the upload data size. This value is the
91
//                        value counter part to what the `upload_size_type` field.
92
// frame_type           - Type of each frame. 0 = BLOCK frame, 1 = FOOTER frame. Header frame will
93
//                        always start with the first byte of the stream, so no magic number for it.
94
// compressed_data_size - The size of this block. The bytes after this field should be read
95
//                        in sequence to get all of the block's data in this block.
96
// footer_size          - Size of the footer for bytes after this field.
97
// index_count1         - Number of items in the index. ({index_count1} * 4) represents the number
98
//                        of bytes that should be read after this field in order to get all index
99
//                        data.
100
// pos_from_prev_index  - Index of an individual block in the stream relative to the previous
101
//                        block. This field may repeat {index_count} times.
102
// index_count2         - Same as {index_count1} and should always be equal. This field might be
103
//                        useful if you want to read a random byte from this stream, have random
104
//                        access to it and know the size of the stream during reading, because
105
//                        this field is always in the exact same place in the stream relative to
106
//                        the last byte.
107
// uncompressed_data_sz - Size of the original uncompressed data.
108
//
109
// Note: All fields fields little-endian.
110
111
/// Number representing a chunk.
112
pub const CHUNK_FRAME_TYPE: u8 = 0;
113
114
/// Number representing the footer.
115
pub const FOOTER_FRAME_TYPE: u8 = 1;
116
117
/// This is a partial mirror of nativelink_config::stores::Lz4Config.
118
/// We cannot use that natively here because it could cause our
119
/// serialized format to change if we added more configs.
120
2.45k
#[derive(Serialize, Deserialize, PartialEq, Debug, Default, Copy, Clone)]
121
pub struct Lz4Config {
122
    pub block_size: u32,
123
}
124
125
1.22k
#[derive(Serialize, Deserialize, PartialEq, Debug)]
126
pub struct Header {
127
    pub version: u8,
128
    pub config: Lz4Config,
129
    pub upload_size: UploadSizeInfo,
130
}
131
132
2.53k
#[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone, Copy)]
133
pub struct SliceIndex {
134
    pub position_from_prev_index: u32,
135
}
136
137
1.23k
#[derive(Serialize, Deserialize, PartialEq, Debug, Default)]
138
pub struct Footer {
139
    pub indexes: Vec<SliceIndex>,
140
    pub index_count: u32,
141
    pub uncompressed_data_size: u64,
142
    pub config: Lz4Config,
143
    pub version: u8,
144
}
145
146
/// lz4_flex::block::get_maximum_output_size() way over estimates, so we use the
147
/// one provided here: https://github.com/torvalds/linux/blob/master/include/linux/lz4.h#L61
148
/// Local testing shows this gives quite accurate worst case given random input.
149
6
fn lz4_compress_bound(input_size: u64) -> u64 {
150
6
    input_size + (input_size / 255) + 16
151
6
}
152
153
struct UploadState {
154
    header: Header,
155
    footer: Footer,
156
    max_output_size: u64,
157
    input_max_size: u64,
158
}
159
160
impl UploadState {
161
6
    pub fn new(store: &CompressionStore, upload_size: UploadSizeInfo) -> Result<Self, Error> {
162
6
        let input_max_size = match upload_size {
163
6
            UploadSizeInfo::MaxSize(
sz1
) | UploadSizeInfo::ExactSize(
sz5
) => sz,
164
6
        };
165
6
166
6
        let max_index_count = (input_max_size / u64::from(store.config.block_size)) + 1;
167
6
168
6
        let header = Header {
169
6
            version: CURRENT_STREAM_FORMAT_VERSION,
170
6
            config: Lz4Config {
171
6
                block_size: store.config.block_size,
172
6
            },
173
6
            upload_size,
174
6
        };
175
6
        let footer = Footer {
176
6
            indexes: vec![
177
6
                SliceIndex {
178
6
                    ..Default::default()
179
6
                };
180
6
                usize::try_from(max_index_count)
181
6
                    .err_tip(|| 
"Could not convert max_index_count to usize"0
)
?0
182
            ],
183
6
            index_count: max_index_count as u32,
184
6
            uncompressed_data_size: 0, // Updated later.
185
6
            config: header.config,
186
6
            version: CURRENT_STREAM_FORMAT_VERSION,
187
6
        };
188
6
189
6
        // This is more accurate of an estimate than what get_maximum_output_size calculates.
190
6
        let max_block_size = lz4_compress_bound(u64::from(store.config.block_size)) + U32_SZ + 1;
191
6
192
6
        let max_output_size = {
193
6
            let header_size = store.bincode_options.serialized_size(&header).unwrap();
194
6
            let max_content_size = max_block_size * max_index_count;
195
6
            let max_footer_size =
196
6
                U32_SZ + 1 + store.bincode_options.serialized_size(&footer).unwrap();
197
6
            header_size + max_content_size + max_footer_size
198
6
        };
199
6
200
6
        Ok(Self {
201
6
            header,
202
6
            footer,
203
6
            max_output_size,
204
6
            input_max_size,
205
6
        })
206
6
    }
207
}
208
209
/// This store will compress data before sending it on to the inner store.
210
/// Note: Currently using get_part() and trying to read part of the data will
211
/// result in the entire contents being read from the inner store but will
212
/// only send the contents requested.
213
0
#[derive(MetricsComponent)]
214
pub struct CompressionStore {
215
    #[metric(group = "inner_store")]
216
    inner_store: Store,
217
    config: nativelink_config::stores::Lz4Config,
218
    bincode_options: BincodeOptions,
219
}
220
221
impl CompressionStore {
222
7
    pub fn new(spec: &CompressionSpec, inner_store: Store) -> Result<Arc<Self>, Error> {
223
7
        let lz4_config = match spec.compression_algorithm {
224
7
            nativelink_config::stores::CompressionAlgorithm::lz4(mut lz4_config) => {
225
7
                if lz4_config.block_size == 0 {
  Branch (225:20): [True: 3, False: 4]
  Branch (225:20): [Folded - Ignored]
226
3
                    lz4_config.block_size = DEFAULT_BLOCK_SIZE;
227
4
                }
228
7
                if lz4_config.max_decode_block_size == 0 {
  Branch (228:20): [True: 7, False: 0]
  Branch (228:20): [Folded - Ignored]
229
7
                    lz4_config.max_decode_block_size = lz4_config.block_size;
230
7
                }
0
231
7
                lz4_config
232
7
            }
233
7
        };
234
7
        Ok(Arc::new(CompressionStore {
235
7
            inner_store,
236
7
            config: lz4_config,
237
7
            bincode_options: DefaultOptions::new().with_fixint_encoding(),
238
7
        }))
239
7
    }
240
}
241
242
#[async_trait]
243
impl StoreDriver for CompressionStore {
244
    async fn has_with_results(
245
        self: Pin<&Self>,
246
        digests: &[StoreKey<'_>],
247
        results: &mut [Option<u64>],
248
0
    ) -> Result<(), Error> {
249
0
        self.inner_store.has_with_results(digests, results).await
250
0
    }
251
252
    async fn update(
253
        self: Pin<&Self>,
254
        key: StoreKey<'_>,
255
        mut reader: DropCloserReadHalf,
256
        upload_size: UploadSizeInfo,
257
6
    ) -> Result<(), Error> {
258
6
        let mut output_state = UploadState::new(&self, upload_size)
?0
;
259
260
6
        let (mut tx, rx) = make_buf_channel_pair();
261
6
262
6
        let inner_store = self.inner_store.clone();
263
6
        let key = key.into_owned();
264
6
        let update_fut = spawn!("compression_store_update_spawn", async move {
265
6
            inner_store
266
6
                .update(
267
6
                    key,
268
6
                    rx,
269
6
                    UploadSizeInfo::MaxSize(output_state.max_output_size),
270
6
                )
271
48
                .await
272
6
                .err_tip(|| 
"Inner store update in compression store failed"0
)
273
6
        })
274
6
        .map(
275
6
            |result| match result.err_tip(|| 
"Failed to run compression update spawn"0
) {
276
6
                Ok(inner_result) => {
277
6
                    inner_result.err_tip(|| 
"Compression underlying store update failed"0
)
278
                }
279
0
                Err(e) => Err(e),
280
6
            },
281
6
        );
282
6
283
6
        let write_fut = async move {
284
            {
285
                // Write Header.
286
6
                let serialized_header = self
287
6
                    .bincode_options
288
6
                    .serialize(&output_state.header)
289
6
                    .map_err(|e| {
290
0
                        make_err!(Code::Internal, "Failed to serialize header : {:?}", e)
291
6
                    })
?0
;
292
6
                tx.send(serialized_header.into())
293
0
                    .await
294
6
                    .err_tip(|| 
"Failed to write compression header on upload"0
)
?0
;
295
            }
296
297
6
            let mut received_amt = 0;
298
6
            let mut index_count: u32 = 0;
299
99
            for 
index98
in &mut output_state.footer.indexes {
300
98
                let chunk = reader
301
98
                    .consume(Some(self.config.block_size as usize))
302
0
                    .await
303
98
                    .err_tip(|| 
"Failed to read take in update in compression store"0
)
?0
;
304
98
                if chunk.is_empty() {
  Branch (304:20): [True: 5, False: 93]
  Branch (304:20): [Folded - Ignored]
305
5
                    break; // EOF.
306
93
                }
307
93
308
93
                received_amt += u64::try_from(chunk.len())
309
93
                    .err_tip(|| 
"Could not convert chunk.len() to u64"0
)
?0
;
310
0
                error_if!(
311
93
                    received_amt > output_state.input_max_size,
  Branch (311:21): [True: 0, False: 93]
  Branch (311:21): [Folded - Ignored]
312
                    "Got more data than stated in compression store upload request"
313
                );
314
315
93
                let max_output_size = get_maximum_output_size(self.config.block_size as usize);
316
93
                let mut compressed_data_buf = BytesMut::with_capacity(max_output_size);
317
93
                compressed_data_buf.put_u8(CHUNK_FRAME_TYPE);
318
93
                compressed_data_buf.put_u32_le(0); // Filled later.
319
93
320
93
                // For efficiency reasons we do some raw slice manipulation so we can write directly
321
93
                // into our buffer instead of having to do another allocation.
322
93
                let raw_compressed_data = unsafe {
323
93
                    std::slice::from_raw_parts_mut(
324
93
                        compressed_data_buf.chunk_mut().as_mut_ptr(),
325
93
                        max_output_size,
326
93
                    )
327
                };
328
329
93
                let compressed_data_sz = compress_into(&chunk, raw_compressed_data)
330
93
                    .map_err(|e| 
make_err!(Code::Internal, "Compression error {:?}", e)0
)
?0
;
331
93
                unsafe {
332
93
                    compressed_data_buf.advance_mut(compressed_data_sz);
333
93
                }
334
93
335
93
                // Now fill the size in our slice.
336
93
                LittleEndian::write_u32(&mut compressed_data_buf[1..5], compressed_data_sz as u32);
337
93
338
93
                // Now send our chunk.
339
93
                tx.send(compressed_data_buf.freeze())
340
45
                    .await
341
93
                    .err_tip(|| 
"Failed to write chunk to inner store in compression store"0
)
?0
;
342
343
93
                index.position_from_prev_index = compressed_data_sz as u32;
344
93
345
93
                index_count += 1;
346
            }
347
            // Index 0 is actually a pointer to the second chunk. This is because we don't need
348
            // an index for the first item, since it starts at position `{header_len}`.
349
            // The code above causes us to create 1 more index than we actually need, so we
350
            // remove the last index from our vector here, because at this point we are always
351
            // one index too many.
352
            // Note: We need to be careful that if we don't have any data (zero bytes) it
353
            // doesn't go to -1.
354
6
            index_count = index_count.saturating_sub(1);
355
6
            output_state.footer.indexes.resize(
356
6
                index_count as usize,
357
6
                SliceIndex {
358
6
                    ..Default::default()
359
6
                },
360
6
            );
361
6
            output_state.footer.index_count = output_state.footer.indexes.len() as u32;
362
6
            output_state.footer.uncompressed_data_size = received_amt;
363
            {
364
                // Write Footer.
365
6
                let serialized_footer = self
366
6
                    .bincode_options
367
6
                    .serialize(&output_state.footer)
368
6
                    .map_err(|e| {
369
0
                        make_err!(Code::Internal, "Failed to serialize header : {:?}", e)
370
6
                    })
?0
;
371
372
6
                let mut footer = BytesMut::with_capacity(1 + 4 + serialized_footer.len());
373
6
                footer.put_u8(FOOTER_FRAME_TYPE);
374
6
                footer.put_u32_le(serialized_footer.len() as u32);
375
6
                footer.extend_from_slice(&serialized_footer);
376
6
377
6
                tx.send(footer.freeze())
378
3
                    .await
379
6
                    .err_tip(|| 
"Failed to write footer to inner store in compression store"0
)
?0
;
380
6
                tx.send_eof()
381
6
                    .err_tip(|| 
"Failed writing EOF in compression store update"0
)
?0
;
382
            }
383
384
6
            Result::<(), Error>::Ok(())
385
6
        };
386
6
        let (write_result, update_result) = tokio::join!(write_fut, update_fut);
387
6
        write_result.merge(update_result)
388
12
    }
389
390
    async fn get_part(
391
        self: Pin<&Self>,
392
        key: StoreKey<'_>,
393
        writer: &mut DropCloserWriteHalf,
394
        offset: u64,
395
        length: Option<u64>,
396
1.22k
    ) -> Result<(), Error> {
397
1.22k
        if is_zero_digest(key.borrow()) {
  Branch (397:12): [True: 1, False: 1.22k]
  Branch (397:12): [Folded - Ignored]
398
1
            writer
399
1
                .send_eof()
400
1
                .err_tip(|| 
"Failed to send zero EOF in filesystem store get_part"0
)
?0
;
401
1
            return Ok(());
402
1.22k
        }
403
1.22k
404
1.22k
        let (tx, mut rx) = make_buf_channel_pair();
405
1.22k
406
1.22k
        let inner_store = self.inner_store.clone();
407
1.22k
        let key = key.into_owned();
408
1.22k
        let get_part_fut = spawn!("compression_store_get_part_spawn", async move {
409
1.22k
            inner_store
410
1.22k
                .get_part(key, tx, 0, None)
411
0
                .await
412
1.22k
                .err_tip(|| 
"Inner store get in compression store failed"0
)
413
1.22k
        })
414
1.22k
        .map(
415
1.22k
            |result| match result.err_tip(|| 
"Failed to run compression get spawn"0
) {
416
1.22k
                Ok(inner_result) => {
417
1.22k
                    inner_result.err_tip(|| 
"Compression underlying store get failed"0
)
418
                }
419
0
                Err(e) => Err(e),
420
1.22k
            },
421
1.22k
        );
422
1.22k
        let read_fut = async move {
423
1.22k
            let header = {
424
                // Read header.
425
                static EMPTY_HEADER: Header = Header {
426
                    version: CURRENT_STREAM_FORMAT_VERSION,
427
                    config: Lz4Config { block_size: 0 },
428
                    upload_size: UploadSizeInfo::ExactSize(0),
429
                };
430
1.22k
                let header_size = self.bincode_options.serialized_size(&EMPTY_HEADER).unwrap();
431
1.22k
                let chunk = rx
432
1.22k
                    .consume(Some(header_size as usize))
433
1.22k
                    .await
434
1.22k
                    .err_tip(|| 
"Failed to read header in get_part compression store"0
)
?0
;
435
0
                error_if!(
436
1.22k
                    chunk.len() as u64 != header_size,
  Branch (436:21): [True: 0, False: 1.22k]
  Branch (436:21): [Folded - Ignored]
437
                    "Expected inner store to return the proper amount of data in compression store {} != {}",
438
0
                    chunk.len(),
439
                    header_size,
440
                );
441
442
1.22k
                self.bincode_options
443
1.22k
                    .deserialize::<Header>(&chunk)
444
1.22k
                    .map_err(|e| {
445
0
                        make_err!(Code::Internal, "Failed to deserialize header : {:?}", e)
446
1.22k
                    })
?0
447
            };
448
449
0
            error_if!(
450
1.22k
                header.version != CURRENT_STREAM_FORMAT_VERSION,
  Branch (450:17): [True: 0, False: 1.22k]
  Branch (450:17): [Folded - Ignored]
451
                "Expected header version to match in get compression, got {}, want {}",
452
                header.version,
453
                CURRENT_STREAM_FORMAT_VERSION
454
            );
455
0
            error_if!(
456
1.22k
                header.config.block_size > self.config.max_decode_block_size,
  Branch (456:17): [True: 0, False: 1.22k]
  Branch (456:17): [Folded - Ignored]
457
                "Block size is too large in compression, got {} > {}",
458
                header.config.block_size,
459
0
                self.config.max_decode_block_size
460
            );
461
462
1.22k
            let mut chunk = rx
463
1.22k
                .consume(Some(1 + 4))
464
0
                .await
465
1.22k
                .err_tip(|| 
"Failed to read init frame info in compression store"0
)
?0
;
466
0
            error_if!(
467
1.22k
                chunk.len() < 1 + 4,
  Branch (467:17): [True: 0, False: 1.22k]
  Branch (467:17): [Folded - Ignored]
468
                "Received EOF too early while reading init frame info in compression store"
469
            );
470
471
1.22k
            let mut frame_type = chunk.get_u8();
472
1.22k
            let mut frame_sz = chunk.get_u32_le();
473
1.22k
474
1.22k
            let mut uncompressed_data_sz: u64 = 0;
475
1.22k
            let mut remaining_bytes_to_send: u64 = length.unwrap_or(u64::MAX);
476
1.22k
            let mut chunks_count: u32 = 0;
477
4.98k
            while frame_type != FOOTER_FRAME_TYPE {
  Branch (477:19): [True: 3.75k, False: 1.22k]
  Branch (477:19): [Folded - Ignored]
478
0
                error_if!(
479
3.75k
                    frame_type != CHUNK_FRAME_TYPE,
  Branch (479:21): [True: 0, False: 3.75k]
  Branch (479:21): [Folded - Ignored]
480
                    "Expected frame to be BODY in compression store, got {} at {}",
481
                    frame_type,
482
                    chunks_count
483
                );
484
485
3.75k
                let chunk = rx
486
3.75k
                    .consume(Some(frame_sz as usize))
487
0
                    .await
488
3.75k
                    .err_tip(|| 
"Failed to read chunk in get_part compression store"0
)
?0
;
489
3.75k
                if chunk.len() < frame_sz as usize {
  Branch (489:20): [True: 0, False: 3.75k]
  Branch (489:20): [Folded - Ignored]
490
0
                    return Err(make_err!(
491
0
                        Code::Internal,
492
0
                        "Got EOF earlier than expected. Maybe the data is not compressed or different format?"
493
0
                    ));
494
3.75k
                }
495
3.75k
                {
496
3.75k
                    let max_output_size =
497
3.75k
                        get_maximum_output_size(header.config.block_size as usize);
498
3.75k
                    let mut uncompressed_data = BytesMut::with_capacity(max_output_size);
499
3.75k
500
3.75k
                    // For efficiency reasons we do some raw slice manipulation so we can write directly
501
3.75k
                    // into our buffer instead of having to do another allocation.
502
3.75k
                    let raw_decompressed_data = unsafe {
503
3.75k
                        std::slice::from_raw_parts_mut(
504
3.75k
                            uncompressed_data.chunk_mut().as_mut_ptr(),
505
3.75k
                            max_output_size,
506
3.75k
                        )
507
                    };
508
509
3.75k
                    let uncompressed_chunk_sz = decompress_into(&chunk, raw_decompressed_data)
510
3.75k
                        .map_err(|e| 
make_err!(Code::Internal, "Decompression error {:?}", e)0
)
?0
;
511
3.75k
                    unsafe { uncompressed_data.advance_mut(uncompressed_chunk_sz) };
512
3.75k
                    let new_uncompressed_data_sz =
513
3.75k
                        uncompressed_data_sz + uncompressed_chunk_sz as u64;
514
3.75k
                    if new_uncompressed_data_sz >= offset && 
remaining_bytes_to_send > 02.28k
{
  Branch (514:24): [True: 2.28k, False: 1.47k]
  Branch (514:62): [True: 1.95k, False: 328]
  Branch (514:24): [Folded - Ignored]
  Branch (514:62): [Folded - Ignored]
515
1.95k
                        let start_pos = if offset <= uncompressed_data_sz {
  Branch (515:44): [True: 938, False: 1.02k]
  Branch (515:44): [Folded - Ignored]
516
938
                            0
517
                        } else {
518
1.02k
                            offset - uncompressed_data_sz
519
                        } as usize;
520
1.95k
                        let end_pos = cmp::min(
521
1.95k
                            start_pos + remaining_bytes_to_send as usize,
522
1.95k
                            uncompressed_chunk_sz,
523
1.95k
                        );
524
1.95k
                        if end_pos != start_pos {
  Branch (524:28): [True: 1.85k, False: 102]
  Branch (524:28): [Folded - Ignored]
525
                            // Make sure we don't send an EOF by accident.
526
1.85k
                            writer
527
1.85k
                                .send(uncompressed_data.freeze().slice(start_pos..end_pos))
528
224
                                .await
529
1.85k
                                .err_tip(|| 
"Failed sending chunk in compression store"0
)
?0
;
530
102
                        }
531
1.95k
                        remaining_bytes_to_send -= (end_pos - start_pos) as u64;
532
1.79k
                    }
533
3.75k
                    uncompressed_data_sz = new_uncompressed_data_sz;
534
3.75k
                }
535
3.75k
                chunks_count += 1;
536
537
3.75k
                let mut chunk = rx
538
3.75k
                    .consume(Some(1 + 4))
539
0
                    .await
540
3.75k
                    .err_tip(|| 
"Failed to read frame info in compression store"0
)
?0
;
541
0
                error_if!(
542
3.75k
                    chunk.len() < 1 + 4,
  Branch (542:21): [True: 0, False: 3.75k]
  Branch (542:21): [Folded - Ignored]
543
                    "Received EOF too early while reading frame info in compression store"
544
                );
545
546
3.75k
                frame_type = chunk.get_u8();
547
3.75k
                frame_sz = chunk.get_u32_le();
548
            }
549
            // Index count will always be +1 (unless it is zero bytes long).
550
1.22k
            chunks_count = chunks_count.saturating_sub(1);
551
            {
552
                // Read and validate footer.
553
1.22k
                let chunk = rx
554
1.22k
                    .consume(Some(frame_sz as usize))
555
0
                    .await
556
1.22k
                    .err_tip(|| 
"Failed to read chunk in get_part compression store"0
)
?0
;
557
0
                error_if!(
558
1.22k
                    chunk.len() < frame_sz as usize,
  Branch (558:21): [True: 0, False: 1.22k]
  Branch (558:21): [Folded - Ignored]
559
                    "Unexpected EOF when reading footer in compression store get_part"
560
                );
561
562
1.22k
                let footer = self
563
1.22k
                    .bincode_options
564
1.22k
                    .deserialize::<Footer>(&chunk)
565
1.22k
                    .map_err(|e| {
566
0
                        make_err!(Code::Internal, "Failed to deserialize footer : {:?}", e)
567
1.22k
                    })
?0
;
568
569
0
                error_if!(
570
1.22k
                    header.version != footer.version,
  Branch (570:21): [True: 0, False: 1.22k]
  Branch (570:21): [Folded - Ignored]
571
                    "Expected header and footer versions to match compression store get_part, {} != {}",
572
                    header.version,
573
                    footer.version
574
                );
575
0
                error_if!(
576
1.22k
                    footer.indexes.len() != footer.index_count as usize,
  Branch (576:21): [True: 0, False: 1.22k]
  Branch (576:21): [Folded - Ignored]
577
                    "Expected index counts to match in compression store footer in get_part, {} != {}",
578
0
                    footer.indexes.len(),
579
                    footer.index_count
580
                );
581
0
                error_if!(
582
1.22k
                    footer.index_count != chunks_count,
  Branch (582:21): [True: 0, False: 1.22k]
  Branch (582:21): [Folded - Ignored]
583
                    concat!(
584
                        "Expected index counts to match received chunks count ",
585
                        "in compression store footer in get_part, {} != {}"
586
                    ),
587
                    footer.index_count,
588
                    chunks_count
589
                );
590
0
                error_if!(
591
1.22k
                    footer.uncompressed_data_size != uncompressed_data_sz,
  Branch (591:21): [True: 0, False: 1.22k]
  Branch (591:21): [Folded - Ignored]
592
                    "Expected uncompressed data sizes to match in compression store footer in get_part, {} != {}",
593
                    footer.uncompressed_data_size,
594
                    uncompressed_data_sz
595
                );
596
            }
597
598
1.22k
            writer
599
1.22k
                .send_eof()
600
1.22k
                .err_tip(|| 
"Failed to send eof in compression store write"0
)
?0
;
601
1.22k
            Ok(())
602
1.22k
        };
603
604
1.22k
        let (read_result, get_part_fut_result) = tokio::join!(read_fut, get_part_fut);
605
1.22k
        if let Err(
mut e0
) = read_result {
  Branch (605:16): [True: 0, False: 1.22k]
  Branch (605:16): [Folded - Ignored]
606
            // We may need to propagate the error from reading the data through first.
607
0
            if let Err(err) = get_part_fut_result {
  Branch (607:20): [True: 0, False: 0]
  Branch (607:20): [Folded - Ignored]
608
0
                e = err.merge(e);
609
0
            }
610
0
            return Err(e);
611
1.22k
        }
612
1.22k
        Ok(())
613
2.45k
    }
614
615
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
616
0
        self
617
0
    }
618
619
0
    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) {
620
0
        self
621
0
    }
622
623
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
624
0
        self
625
0
    }
626
}
627
628
default_health_status_indicator!(CompressionStore);