Coverage Report

Created: 2026-06-04 10:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/compression_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use std::sync::Arc;
18
19
use async_trait::async_trait;
20
use byteorder::{ByteOrder, LittleEndian};
21
use bytes::{Buf, BufMut, BytesMut};
22
use futures::future::FutureExt;
23
use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size};
24
use nativelink_config::stores::CompressionSpec;
25
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
26
use nativelink_metric::MetricsComponent;
27
use nativelink_util::buf_channel::{
28
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
29
};
30
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
31
use nativelink_util::spawn;
32
use nativelink_util::store_trait::{
33
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
34
};
35
use serde::{Deserialize, Serialize};
36
use wincode::{SchemaRead, SchemaWrite};
37
38
use crate::cas_utils::is_zero_digest;
39
40
// Version tag written into both the header and the footer of every compressed stream.
// Bump it whenever the byte layout changes, so that readers reject streams they cannot parse.
pub const CURRENT_STREAM_FORMAT_VERSION: u8 = 0x01;
43
44
// Uncompressed block size (64 KiB) used to split the input stream when the config leaves it at 0.
pub const DEFAULT_BLOCK_SIZE: u32 = 1 << 16;
46
47
// Byte width of a serialized `u32` (e.g. the frame-size field). Every frame starts with a
// 1-byte frame type followed by a u32 size, so `U32_SZ + 1` is the 5-byte frame prefix
// accounted for in the upload size estimate.
const U32_SZ: u64 = size_of::<u32>() as u64;
48
49
// We use a custom frame format here because I wanted the ability in the future to:
50
// * Read a random part of the data without needing to parse entire file.
51
// * Compress the data on the fly without needing to know the exact input size.
52
//
53
// The frame format that LZ4 uses does not contain an index of where the different
54
// blocks are located. This would mean in the event we only wanted the last byte of
55
// a file, we'd need to seek to the header of each block to find where the next block
56
// offset is until we got to the last block then decompress it.
57
//
58
// By using this custom frame format we keep the ability to have each block reference
59
// the next block (for efficiency), but also after all blocks we have a footer frame
60
// which contains an index of each block in the stream. This means that we can read a
61
// fixed number of bytes of the file, which always contain the size of the index, then
62
// with that size we know the exact size of the footer frame, which contains the entire
63
// index. Once this footer frame is loaded, we could then do some math to figure out
64
// which block each byte is in and the offset of each block within the compressed file.
65
//
66
// The frame format is as follows:
67
// |----------------------------------HEADER-----------------------------------------|
68
// |  version(u8) |  block_size (u32) |  upload_size_type (u32) |  upload_size (u64) |
69
// |----------------------------------BLOCK------------------------------------------|
70
// |  frame_type(u8) 0x00 |  compressed_data_size (u32) |        ...DATA...          |
71
// |                                ...DATA...                                       |
72
// | [Possibly repeat block]                                                         |
73
// |----------------------------------FOOTER-----------------------------------------|
74
// |  frame_type(u8) 0x01 |    footer_size (u32) |           index_count1 (u64)      |
75
// |      ...[pos_from_prev_index (u32) - repeat for count {index_count*}]...        |
76
// |  index_count2 (u32) |    uncompressed_data_sz (u64) |    block_size (u32)       |
77
// | version (u8) |------------------------------------------------------------------|
78
// |---------------------------------------------------------------------------------|
79
//
80
// version              - A constant number used to define what version of this format is being
81
//                        used. Version in header and footer must match.
82
// block_size           - Size of each block uncompressed except for last block. This means that
83
//                        every block uncompressed will be a constant size except last block may
84
//                        be variable size. Block size in header and footer must match.
85
// upload_size_type     - Value of 0 = UploadSizeInfo::ExactSize, 1 = UploadSizeInfo::MaxSize.
86
//                        This is for debug reasons only.
87
// upload_size          - The size of the data. WARNING: Do not rely on this being the uncompressed
88
//                        payload size. It is a debug field and a "best guess" on how large the data
89
//                        is. The header does not contain the upload data size. This value is the
90
//                        value counterpart to the `upload_size_type` field.
91
// frame_type           - Type of each frame. 0 = BLOCK frame, 1 = FOOTER frame. Header frame will
92
//                        always start with the first byte of the stream, so no magic number for it.
93
// compressed_data_size - The size of this block. The bytes after this field should be read
94
//                        in sequence to get all of this block's data.
95
// footer_size          - Size of the footer for bytes after this field.
96
// index_count1         - Number of items in the index. ({index_count1} * 4) represents the number
97
//                        of bytes that should be read after this field in order to get all index
98
//                        data.
99
// pos_from_prev_index  - Index of an individual block in the stream relative to the previous
100
//                        block. This field may repeat {index_count} times.
101
// index_count2         - Same as {index_count1} and should always be equal. This field might be
102
//                        useful if you want to read a random byte from this stream, have random
103
//                        access to it and know the size of the stream during reading, because
104
//                        this field is always in the exact same place in the stream relative to
105
//                        the last byte.
106
// uncompressed_data_sz - Size of the original uncompressed data.
107
//
108
// Note: All fields are little-endian.
109
110
/// Frame-type tag that precedes every compressed data (BLOCK) frame.
pub const CHUNK_FRAME_TYPE: u8 = 0x00;

/// Frame-type tag that precedes the trailing FOOTER frame.
pub const FOOTER_FRAME_TYPE: u8 = 0x01;
115
116
/// This is a partial mirror of `nativelink_config::stores::Lz4Config`.
117
/// We cannot use that natively here because it could cause our
118
/// serialized format to change if we added more configs.
119
#[derive(
120
0
    Serialize, Deserialize, PartialEq, Eq, Debug, Default, Copy, Clone, SchemaWrite, SchemaRead,
121
)]
122
pub struct Lz4Config {
123
    pub block_size: u32,
124
}
125
126
0
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy, SchemaWrite, SchemaRead)]
127
pub struct Header {
128
    pub version: u8,
129
    pub config: Lz4Config,
130
    pub upload_size: UploadSizeInfo,
131
}
132
133
#[derive(
134
0
    Serialize, Deserialize, PartialEq, Eq, Debug, Default, Clone, Copy, SchemaWrite, SchemaRead,
135
)]
136
pub struct SliceIndex {
137
    pub position_from_prev_index: u32,
138
}
139
140
0
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default, SchemaWrite, SchemaRead)]
141
pub struct Footer {
142
    pub indexes: Vec<SliceIndex>,
143
    pub index_count: u32,
144
    pub uncompressed_data_size: u64,
145
    pub config: Lz4Config,
146
    pub version: u8,
147
}
148
149
/// Worst-case LZ4 compressed size for `input_size` bytes.
///
/// This is tighter than `lz4_flex::block::get_maximum_output_size()`, which overestimates
/// heavily. The formula follows `LZ4_COMPRESSBOUND` from
/// <https://github.com/torvalds/linux/blob/master/include/linux/lz4.h#L61>. Local testing
/// showed it to be an accurate worst case for random input.
const fn lz4_compress_bound(input_size: u64) -> u64 {
    let overhead = input_size / 255 + 16;
    input_size + overhead
}
155
156
1.26k
fn serialized_size<C, V>(value: &V, config: C) -> Result<u64, Error>
157
1.26k
where
158
1.26k
    C: wincode::config::Config,
159
1.26k
    V: SchemaWrite<C, Src = V>,
160
{
161
1.26k
    wincode::config::serialized_size(value, config).map_err(|e| 
{0
162
0
        make_err!(
163
0
            Code::Internal,
164
            "Failed to calculate serialized size: {:?}",
165
            e
166
        )
167
0
    })
168
1.26k
}
169
170
struct UploadState {
171
    header: Header,
172
    footer: Footer,
173
    max_output_size: u64,
174
    input_max_size: u64,
175
}
176
177
impl UploadState {
178
7
    pub(crate) fn new(
179
7
        store: &CompressionStore,
180
7
        upload_size: UploadSizeInfo,
181
7
    ) -> Result<Self, Error> {
182
7
        let input_max_size = match upload_size {
183
7
            UploadSizeInfo::MaxSize(
sz1
) | UploadSizeInfo::ExactSize(
sz6
) => sz,
184
        };
185
186
7
        let max_index_count = (input_max_size / u64::from(store.config.block_size)) + 1;
187
188
7
        let header = Header {
189
7
            version: CURRENT_STREAM_FORMAT_VERSION,
190
7
            config: Lz4Config {
191
7
                block_size: store.config.block_size,
192
7
            },
193
7
            upload_size,
194
7
        };
195
7
        let footer = Footer {
196
7
            indexes: vec![
197
7
                SliceIndex::default();
198
7
                usize::try_from(max_index_count)
199
7
                    .err_tip(|| "Could not convert max_index_count to usize")
?0
200
            ],
201
7
            index_count: u32::try_from(max_index_count)
202
7
                .err_tip(|| "Could not convert max_index_count to u32")
?0
,
203
            uncompressed_data_size: 0, // Updated later.
204
7
            config: header.config,
205
            version: CURRENT_STREAM_FORMAT_VERSION,
206
        };
207
208
        // This is more accurate of an estimate than what get_maximum_output_size calculates.
209
7
        let max_block_size = lz4_compress_bound(u64::from(store.config.block_size)) + U32_SZ + 1;
210
211
7
        let max_output_size = {
212
7
            let header_size = serialized_size(&header, store.wincode_config)
?0
;
213
7
            let max_content_size = max_block_size * max_index_count;
214
7
            let max_footer_size = U32_SZ + 1 + serialized_size(&footer, store.wincode_config)
?0
;
215
7
            header_size + max_content_size + max_footer_size
216
        };
217
218
7
        Ok(Self {
219
7
            header,
220
7
            footer,
221
7
            max_output_size,
222
7
            input_max_size,
223
7
        })
224
7
    }
225
}
226
227
// Based off of an older Bincode config
228
pub type WincodeConfig = wincode::config::Configuration<
229
    true,
230
    { wincode::config::PREALLOCATION_SIZE_LIMIT_DISABLED },
231
    wincode::len::BincodeLen,
232
    wincode::int_encoding::LittleEndian,
233
    wincode::int_encoding::FixInt,
234
    u32,
235
>;
236
237
/// This store will compress data before sending it on to the inner store.
238
/// Note: Currently using `get_part()` and trying to read part of the data will
239
/// result in the entire contents being read from the inner store but will
240
/// only send the contents requested.
241
#[derive(MetricsComponent)]
242
pub struct CompressionStore {
243
    #[metric(group = "inner_store")]
244
    inner_store: Store,
245
    config: nativelink_config::stores::Lz4Config,
246
    wincode_config: WincodeConfig,
247
}
248
249
impl core::fmt::Debug for CompressionStore {
250
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
251
0
        f.debug_struct("CompressionStore")
252
0
            .field("inner_store", &self.inner_store)
253
0
            .field("config", &self.config)
254
0
            .finish_non_exhaustive()
255
0
    }
256
}
257
258
impl CompressionStore {
259
8
    pub fn new(spec: &CompressionSpec, inner_store: Store) -> Result<Arc<Self>, Error> {
260
8
        let lz4_config = match spec.compression_algorithm {
261
8
            nativelink_config::stores::CompressionAlgorithm::Lz4(mut lz4_config) => {
262
8
                if lz4_config.block_size == 0 {
263
3
                    lz4_config.block_size = DEFAULT_BLOCK_SIZE;
264
5
                }
265
8
                if lz4_config.max_decode_block_size == 0 {
266
8
                    lz4_config.max_decode_block_size = lz4_config.block_size;
267
8
                
}0
268
8
                lz4_config
269
            }
270
        };
271
8
        Ok(Arc::new(Self {
272
8
            inner_store,
273
8
            config: lz4_config,
274
8
            wincode_config: WincodeConfig::new(),
275
8
        }))
276
8
    }
277
}
278
279
#[async_trait]
280
impl StoreDriver for CompressionStore {
281
    async fn has_with_results(
282
        self: Pin<&Self>,
283
        digests: &[StoreKey<'_>],
284
        results: &mut [Option<u64>],
285
0
    ) -> Result<(), Error> {
286
        self.inner_store.has_with_results(digests, results).await
287
0
    }
288
289
    async fn update(
290
        self: Pin<&Self>,
291
        key: StoreKey<'_>,
292
        mut reader: DropCloserReadHalf,
293
        upload_size: UploadSizeInfo,
294
7
    ) -> Result<u64, Error> {
295
        let mut output_state = UploadState::new(&self, upload_size)?;
296
297
        let (mut tx, rx) = make_buf_channel_pair();
298
299
        let inner_store = self.inner_store.clone();
300
        let key = key.into_owned();
301
7
        let update_fut = spawn!("compression_store_update_spawn", async move {
302
7
            inner_store
303
7
                .update(
304
7
                    key,
305
7
                    rx,
306
7
                    UploadSizeInfo::MaxSize(output_state.max_output_size),
307
7
                )
308
7
                .await
309
7
                .err_tip(|| "Inner store update in compression store failed")
310
7
        })
311
        .map(
312
7
            |result| match result.err_tip(|| "Failed to run compression update spawn") {
313
7
                Ok(inner_result) => {
314
7
                    inner_result.err_tip(|| "Compression underlying store update failed")
315
                }
316
0
                Err(e) => Err(e),
317
7
            },
318
        );
319
320
7
        let write_fut = async move {
321
            {
322
                // Write Header.
323
7
                let serialized_header =
324
7
                    wincode::config::serialize(&output_state.header, self.wincode_config).map_err(
325
0
                        |e| make_err!(Code::Internal, "Failed to serialize header : {:?}", e),
326
0
                    )?;
327
7
                tx.send(serialized_header.into())
328
7
                    .await
329
7
                    .err_tip(|| "Failed to write compression header on upload")
?0
;
330
            }
331
332
7
            let mut received_amt = 0;
333
7
            let mut index_count: u32 = 0;
334
102
            for index in 
&mut output_state.footer.indexes7
{
335
102
                let chunk = reader
336
102
                    .consume(Some(self.config.block_size as usize))
337
102
                    .await
338
102
                    .err_tip(|| "Failed to read take in update in compression store")
?0
;
339
102
                if chunk.is_empty() {
340
6
                    break; // EOF.
341
96
                }
342
343
96
                received_amt += u64::try_from(chunk.len())
344
96
                    .err_tip(|| "Could not convert chunk.len() to u64")
?0
;
345
0
                error_if!(
346
96
                    received_amt > output_state.input_max_size,
347
                    "Got more data than stated in compression store upload request"
348
                );
349
350
96
                let max_output_size = get_maximum_output_size(self.config.block_size as usize);
351
96
                let mut compressed_data_buf = BytesMut::with_capacity(max_output_size);
352
96
                compressed_data_buf.put_u8(CHUNK_FRAME_TYPE);
353
96
                compressed_data_buf.put_u32_le(0); // Filled later.
354
355
                // For efficiency reasons we do some raw slice manipulation so we can write directly
356
                // into our buffer instead of having to do another allocation.
357
96
                let raw_compressed_data = unsafe {
358
96
                    core::slice::from_raw_parts_mut(
359
96
                        compressed_data_buf.chunk_mut().as_mut_ptr(),
360
96
                        max_output_size,
361
96
                    )
362
                };
363
364
96
                let compressed_data_sz = compress_into(&chunk, raw_compressed_data)
365
96
                    .map_err(|e| 
make_err!0
(
Code::Internal0
, "Compression error {:?}", e))
?0
;
366
96
                unsafe {
367
96
                    compressed_data_buf.advance_mut(compressed_data_sz);
368
96
                }
369
370
                // Now fill the size in our slice.
371
96
                LittleEndian::write_u32(
372
96
                    &mut compressed_data_buf[1..5],
373
96
                    u32::try_from(compressed_data_sz).unwrap_or(u32::MAX),
374
                );
375
376
                // Now send our chunk.
377
96
                tx.send(compressed_data_buf.freeze())
378
96
                    .await
379
96
                    .err_tip(|| "Failed to write chunk to inner store in compression store")
?0
;
380
381
96
                index.position_from_prev_index =
382
96
                    u32::try_from(compressed_data_sz).unwrap_or(u32::MAX);
383
384
96
                index_count += 1;
385
            }
386
            // Index 0 is actually a pointer to the second chunk. This is because we don't need
387
            // an index for the first item, since it starts at position `{header_len}`.
388
            // The code above causes us to create 1 more index than we actually need, so we
389
            // remove the last index from our vector here, because at this point we are always
390
            // one index too many.
391
            // Note: We need to be careful that if we don't have any data (zero bytes) it
392
            // doesn't go to -1.
393
7
            index_count = index_count.saturating_sub(1);
394
7
            output_state
395
7
                .footer
396
7
                .indexes
397
7
                .resize(index_count as usize, SliceIndex::default());
398
7
            output_state.footer.index_count =
399
7
                u32::try_from(output_state.footer.indexes.len()).unwrap_or(u32::MAX);
400
7
            output_state.footer.uncompressed_data_size = received_amt;
401
            {
402
                // Write Footer.
403
7
                let serialized_footer =
404
7
                    wincode::config::serialize(&output_state.footer, self.wincode_config).map_err(
405
0
                        |e| make_err!(Code::Internal, "Failed to serialize header : {:?}", e),
406
0
                    )?;
407
408
7
                let mut footer = BytesMut::with_capacity(1 + 4 + serialized_footer.len());
409
7
                footer.put_u8(FOOTER_FRAME_TYPE);
410
7
                footer.put_u32_le(u32::try_from(serialized_footer.len()).unwrap_or(u32::MAX));
411
7
                footer.extend_from_slice(&serialized_footer);
412
413
7
                tx.send(footer.freeze())
414
7
                    .await
415
7
                    .err_tip(|| "Failed to write footer to inner store in compression store")
?0
;
416
7
                tx.send_eof()
417
7
                    .err_tip(|| "Failed writing EOF in compression store update")
?0
;
418
            }
419
420
7
            Result::<u64, Error>::Ok(received_amt)
421
7
        };
422
        let (write_result, update_result) = tokio::join!(write_fut, update_fut);
423
        match (write_result, update_result) {
424
            (Ok(size), Ok(_)) => Ok(size),
425
            (Err(e), _) | (_, Err(e)) => Err(e),
426
        }
427
7
    }
428
429
    async fn get_part(
430
        self: Pin<&Self>,
431
        key: StoreKey<'_>,
432
        writer: &mut DropCloserWriteHalf,
433
        offset: u64,
434
        length: Option<u64>,
435
1.24k
    ) -> Result<(), Error> {
436
        if is_zero_digest(key.borrow()) {
437
            writer
438
                .send_eof()
439
                .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
440
            return Ok(());
441
        }
442
443
        let (tx, mut rx) = make_buf_channel_pair();
444
445
        let inner_store = self.inner_store.clone();
446
        let key = key.into_owned();
447
1.24k
        let get_part_fut = spawn!("compression_store_get_part_spawn", async move {
448
1.24k
            inner_store
449
1.24k
                .get_part(key, tx, 0, None)
450
1.24k
                .await
451
1.24k
                .err_tip(|| "Inner store get in compression store failed")
452
1.24k
        })
453
        .map(
454
1.24k
            |result| match result.err_tip(|| "Failed to run compression get spawn") {
455
1.24k
                Ok(inner_result) => {
456
1.24k
                    inner_result.err_tip(|| "Compression underlying store get failed")
457
                }
458
0
                Err(e) => Err(e),
459
1.24k
            },
460
        );
461
1.24k
        let read_fut = async move {
462
1.24k
            let header = {
463
                // Read header.
464
                const EMPTY_HEADER: Header = Header {
465
                    version: CURRENT_STREAM_FORMAT_VERSION,
466
                    config: Lz4Config { block_size: 0 },
467
                    upload_size: UploadSizeInfo::ExactSize(0),
468
                };
469
1.24k
                let header_size = serialized_size(&EMPTY_HEADER, self.wincode_config)
?0
;
470
1.24k
                let chunk = rx
471
1.24k
                    .consume(Some(usize::try_from(header_size).unwrap_or(usize::MAX)))
472
1.24k
                    .await
473
1.24k
                    .err_tip(|| "Failed to read header in get_part compression store")
?0
;
474
0
                error_if!(
475
1.24k
                    chunk.len() as u64 != header_size,
476
                    "Expected inner store to return the proper amount of data in compression store {} != {}",
477
0
                    chunk.len(),
478
                    header_size,
479
                );
480
481
1.24k
                wincode::config::deserialize::<Header, WincodeConfig>(
482
1.24k
                    &chunk,
483
1.24k
                    self.wincode_config,
484
                )
485
1.24k
                .map_err(|e| 
make_err!0
(
Code::Internal0
, "Failed to deserialize header : {:?}", e))
?0
486
            };
487
488
0
            error_if!(
489
1.24k
                header.version != CURRENT_STREAM_FORMAT_VERSION,
490
                "Expected header version to match in get compression, got {}, want {}",
491
                header.version,
492
                CURRENT_STREAM_FORMAT_VERSION
493
            );
494
0
            error_if!(
495
1.24k
                header.config.block_size > self.config.max_decode_block_size,
496
                "Block size is too large in compression, got {} > {}",
497
                header.config.block_size,
498
0
                self.config.max_decode_block_size
499
            );
500
501
1.24k
            let mut chunk = rx
502
1.24k
                .consume(Some(1 + 4))
503
1.24k
                .await
504
1.24k
                .err_tip(|| "Failed to read init frame info in compression store")
?0
;
505
0
            error_if!(
506
1.24k
                chunk.len() < 1 + 4,
507
                "Received EOF too early while reading init frame info in compression store"
508
            );
509
510
1.24k
            let mut frame_type = chunk.get_u8();
511
1.24k
            let mut frame_sz = chunk.get_u32_le();
512
513
1.24k
            let mut uncompressed_data_sz: u64 = 0;
514
1.24k
            let mut remaining_bytes_to_send: u64 = length.unwrap_or(u64::MAX);
515
1.24k
            let mut chunks_count: u32 = 0;
516
5.05k
            while frame_type != FOOTER_FRAME_TYPE {
517
0
                error_if!(
518
3.81k
                    frame_type != CHUNK_FRAME_TYPE,
519
                    "Expected frame to be BODY in compression store, got {} at {}",
520
                    frame_type,
521
                    chunks_count
522
                );
523
524
3.81k
                let chunk = rx
525
3.81k
                    .consume(Some(frame_sz as usize))
526
3.81k
                    .await
527
3.81k
                    .err_tip(|| "Failed to read chunk in get_part compression store")
?0
;
528
3.81k
                if chunk.len() < frame_sz as usize {
529
0
                    return Err(make_err!(
530
0
                        Code::Internal,
531
0
                        "Got EOF earlier than expected. Maybe the data is not compressed or different format?"
532
0
                    ));
533
3.81k
                }
534
                {
535
3.81k
                    let max_output_size =
536
3.81k
                        get_maximum_output_size(header.config.block_size as usize);
537
3.81k
                    let mut uncompressed_data = BytesMut::with_capacity(max_output_size);
538
539
                    // For efficiency reasons we do some raw slice manipulation so we can write directly
540
                    // into our buffer instead of having to do another allocation.
541
3.81k
                    let raw_decompressed_data = unsafe {
542
3.81k
                        core::slice::from_raw_parts_mut(
543
3.81k
                            uncompressed_data.chunk_mut().as_mut_ptr(),
544
3.81k
                            max_output_size,
545
3.81k
                        )
546
                    };
547
548
3.81k
                    let uncompressed_chunk_sz = decompress_into(&chunk, raw_decompressed_data)
549
3.81k
                        .map_err(|e| 
make_err!0
(
Code::Internal0
, "Decompression error {:?}", e))
?0
;
550
3.81k
                    unsafe { uncompressed_data.advance_mut(uncompressed_chunk_sz) };
551
3.81k
                    let new_uncompressed_data_sz =
552
3.81k
                        uncompressed_data_sz + uncompressed_chunk_sz as u64;
553
3.81k
                    if new_uncompressed_data_sz >= offset && 
remaining_bytes_to_send > 02.33k
{
554
1.98k
                        let start_pos =
555
1.98k
                            usize::try_from(offset.saturating_sub(uncompressed_data_sz))
556
1.98k
                                .unwrap_or(usize::MAX);
557
1.98k
                        let end_pos = cmp::min(
558
1.98k
                            start_pos.saturating_add(
559
1.98k
                                usize::try_from(remaining_bytes_to_send).unwrap_or(usize::MAX),
560
                            ),
561
1.98k
                            uncompressed_chunk_sz,
562
                        );
563
1.98k
                        if end_pos != start_pos {
564
                            // Make sure we don't send an EOF by accident.
565
1.87k
                            writer
566
1.87k
                                .send(uncompressed_data.freeze().slice(start_pos..end_pos))
567
1.87k
                                .await
568
1.87k
                                .err_tip(|| "Failed sending chunk in compression store")
?0
;
569
112
                        }
570
1.98k
                        remaining_bytes_to_send -= (end_pos - start_pos) as u64;
571
1.82k
                    }
572
3.81k
                    uncompressed_data_sz = new_uncompressed_data_sz;
573
                }
574
3.81k
                chunks_count += 1;
575
576
3.81k
                let mut chunk = rx
577
3.81k
                    .consume(Some(1 + 4))
578
3.81k
                    .await
579
3.81k
                    .err_tip(|| "Failed to read frame info in compression store")
?0
;
580
0
                error_if!(
581
3.81k
                    chunk.len() < 1 + 4,
582
                    "Received EOF too early while reading frame info in compression store"
583
                );
584
585
3.81k
                frame_type = chunk.get_u8();
586
3.81k
                frame_sz = chunk.get_u32_le();
587
            }
588
            // Index count will always be +1 (unless it is zero bytes long).
589
1.24k
            chunks_count = chunks_count.saturating_sub(1);
590
            {
591
                // Read and validate footer.
592
1.24k
                let chunk = rx
593
1.24k
                    .consume(Some(frame_sz as usize))
594
1.24k
                    .await
595
1.24k
                    .err_tip(|| "Failed to read chunk in get_part compression store")
?0
;
596
0
                error_if!(
597
1.24k
                    chunk.len() < frame_sz as usize,
598
                    "Unexpected EOF when reading footer in compression store get_part"
599
                );
600
601
1.24k
                let footer = wincode::config::deserialize::<Footer, WincodeConfig>(
602
1.24k
                    &chunk,
603
1.24k
                    self.wincode_config,
604
                )
605
1.24k
                .map_err(|e| 
make_err!0
(
Code::Internal0
, "Failed to deserialize footer : {:?}", e))
?0
;
606
607
0
                error_if!(
608
1.24k
                    header.version != footer.version,
609
                    "Expected header and footer versions to match compression store get_part, {} != {}",
610
                    header.version,
611
                    footer.version
612
                );
613
0
                error_if!(
614
1.24k
                    footer.indexes.len() != footer.index_count as usize,
615
                    "Expected index counts to match in compression store footer in get_part, {} != {}",
616
0
                    footer.indexes.len(),
617
                    footer.index_count
618
                );
619
0
                error_if!(
620
1.24k
                    footer.index_count != chunks_count,
621
                    concat!(
622
                        "Expected index counts to match received chunks count ",
623
                        "in compression store footer in get_part, {} != {}"
624
                    ),
625
                    footer.index_count,
626
                    chunks_count
627
                );
628
0
                error_if!(
629
1.24k
                    footer.uncompressed_data_size != uncompressed_data_sz,
630
                    "Expected uncompressed data sizes to match in compression store footer in get_part, {} != {}",
631
                    footer.uncompressed_data_size,
632
                    uncompressed_data_sz
633
                );
634
            }
635
636
1.24k
            writer
637
1.24k
                .send_eof()
638
1.24k
                .err_tip(|| "Failed to send eof in compression store write")
?0
;
639
1.24k
            Ok(())
640
1.24k
        };
641
642
        let (read_result, get_part_fut_result) = tokio::join!(read_fut, get_part_fut);
643
        if let Err(mut e) = read_result {
644
            // We may need to propagate the error from reading the data through first.
645
            if let Err(err) = get_part_fut_result {
646
                e = err.merge(e);
647
            }
648
            return Err(e);
649
        }
650
        Ok(())
651
1.24k
    }
652
653
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
654
0
        self
655
0
    }
656
657
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) {
658
0
        self
659
0
    }
660
661
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
662
0
        self
663
0
    }
664
665
0
    fn register_remove_callback(
666
0
        self: Arc<Self>,
667
0
        callback: Arc<dyn RemoveItemCallback>,
668
0
    ) -> Result<(), Error> {
669
0
        self.inner_store.register_remove_callback(callback)
670
0
    }
671
}
672
673
// Provides the health-status indicator for `CompressionStore` via the shared
// `default_health_status_indicator!` macro from `nativelink_util::health_utils`.
default_health_status_indicator!(CompressionStore);