diff --git a/nativelink-store/src/compression_store.rs b/nativelink-store/src/compression_store.rs
index 60a59f8ee..5d7ba961a 100644
--- a/nativelink-store/src/compression_store.rs
+++ b/nativelink-store/src/compression_store.rs
@@ -521,8 +521,9 @@ impl StoreDriver for CompressionStore {
                     uncompressed_data_sz + uncompressed_chunk_sz as u64;
                 if new_uncompressed_data_sz >= offset && remaining_bytes_to_send > 0 {
                     let start_pos = offset.saturating_sub(uncompressed_data_sz) as usize;
+                    // Use saturating_add to prevent overflow when remaining_bytes_to_send is very large
                     let end_pos = cmp::min(
-                        start_pos + remaining_bytes_to_send as usize,
+                        start_pos.saturating_add(remaining_bytes_to_send as usize),
                         uncompressed_chunk_sz,
                     );
                     if end_pos != start_pos {
diff --git a/nativelink-store/tests/compression_store_test.rs b/nativelink-store/tests/compression_store_test.rs
index c396b1897..f289df344 100644
--- a/nativelink-store/tests/compression_store_test.rs
+++ b/nativelink-store/tests/compression_store_test.rs
@@ -510,3 +510,134 @@ async fn get_part_is_zero_digest() -> Result<(), Error> {
 
     Ok(())
 }
+
+// Regression test for the bug where start_pos > end_pos in the slice operation
+#[nativelink_test]
+async fn regression_test_range_start_not_greater_than_end() -> Result<(), Error> {
+    // Create a store with a small block size to trigger multiple blocks
+    const BLOCK_SIZE: u32 = 64 * 1024; // 64KB, same as DEFAULT_BLOCK_SIZE
+
+    let inner_store = MemoryStore::new(&MemorySpec::default());
+    let store_owned = CompressionStore::new(
+        &CompressionSpec {
+            backend: StoreSpec::Memory(MemorySpec::default()),
+            compression_algorithm: nativelink_config::stores::CompressionAlgorithm::Lz4(
+                nativelink_config::stores::Lz4Config {
+                    block_size: BLOCK_SIZE,
+                    ..Default::default()
+                },
+            ),
+        },
+        Store::new(inner_store.clone()),
+    )
+    .err_tip(|| "Failed to create compression store")?;
+    let store = Pin::new(&store_owned);
+
+    // Create a large buffer that spans multiple blocks
+    let data_size = BLOCK_SIZE as usize * 3; // 3 blocks
+    let mut data = vec![0u8; data_size];
+    let mut rng = SmallRng::seed_from_u64(42);
+    rng.fill(&mut data[..]);
+
+    let digest = DigestInfo::try_new(VALID_HASH, data_size).unwrap();
+    store.update_oneshot(digest, data.clone().into()).await?;
+
+    // Try to read exactly at block boundaries with various offsets
+    let boundary = u64::from(BLOCK_SIZE);
+
+    // These specific offsets test the case in the bug report where
+    // start_pos was 65536 and end_pos was 65535
+    for (offset, length) in &[
+        (boundary - 1, Some(2u64)),  // Read across block boundary
+        (boundary, Some(1u64)),      // Read exactly at block boundary
+        (boundary + 1, Some(10u64)), // Read just after block boundary
+        // Specifically test the case where offset >= block size
+        (u64::from(BLOCK_SIZE), Some(20u64)),
+        // Specifically test the case that caused the bug (65536 and 65535)
+        (u64::from(BLOCK_SIZE), Some(u64::from(BLOCK_SIZE) - 1)),
+        // More edge cases around the block boundary to thoroughly test the issue
+        (u64::from(BLOCK_SIZE) - 1, Some(1u64)), // Just before boundary
+        (u64::from(BLOCK_SIZE), Some(0u64)),     // Zero length at boundary
+        (u64::from(BLOCK_SIZE), Some(u64::MAX)), // Unlimited length at boundary
+        (u64::from(BLOCK_SIZE) * 2, Some(u64::from(BLOCK_SIZE) - 1)), // Same issue at next block
+    ] {
+        // First test with get_part_unchunked
+        let result = store.get_part_unchunked(digest, *offset, *length).await;
+
+        // The bug was causing a panic, so just checking that it doesn't panic
+        // means the fix is working
+        assert!(
+            result.is_ok(),
+            "Reading with get_part_unchunked at offset {offset} with length {length:?} should not fail"
+        );
+
+        let store_data = result.unwrap();
+
+        // Verify the data matches what we expect
+        let expected_len = cmp::min(
+            length.unwrap_or(u64::MAX) as usize,
+            data.len().saturating_sub(*offset as usize),
+        );
+        assert_eq!(
+            store_data.len(),
+            expected_len,
+            "Expected data length to match when reading at offset {} with length {:?}",
+            offset,
+            length
+        );
+
+        if expected_len > 0 {
+            let start = *offset as usize;
+            let end = start + expected_len;
+            assert_eq!(
+                &store_data[..],
+                &data[start..end],
+                "Expected data content to match when reading at offset {} with length {:?}",
+                offset,
+                length
+            );
+        }
+
+        // Now also test with the lower-level get_part method to ensure it doesn't panic
+        // This is closer to what the bytestream server would call
+        let (mut tx, mut rx) = make_buf_channel_pair();
+
+        // The error was happening in this method call
+        let get_part_result = store.get_part(digest, &mut tx, *offset, *length).await;
+        assert!(
+            get_part_result.is_ok(),
+            "Reading with get_part at offset {offset} with length {length:?} should not fail"
+        );
+
+        // Just to consume the stream and ensure it behaves as expected
+        let mut received_data = Vec::new();
+        while let Ok(chunk) = rx.consume(Some(1024)).await {
+            if chunk.is_empty() {
+                break;
+            }
+            received_data.extend_from_slice(&chunk);
+        }
+
+        assert_eq!(
+            received_data.len(),
+            expected_len,
+            "Expected get_part received data length to match when reading at offset {} with length {:?}",
+            offset,
+            length
+        );
+
+        if expected_len > 0 {
+            let start = *offset as usize;
+            let end = start + expected_len;
+            assert_eq!(
+                &received_data[..],
+                &data[start..end],
+                "Expected get_part data content to match when reading at offset {} with length {:?}",
+                offset,
+                length
+            );
+        }
+    }
+
+    Ok(())
+}
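
For context on the one-line store change: when a caller requests an effectively unlimited read (the test's `Some(u64::MAX)` case above), the old unchecked `start_pos + remaining_bytes_to_send as usize` overflows, panicking in debug builds and wrapping in release builds, where the wrapped sum can make `end_pos` smaller than `start_pos` and the subsequent slice panic. Below is a minimal standalone sketch of that failure mode and of the clamping behavior of `saturating_add`; the values are illustrative and this is not code from the patch:

```rust
use std::cmp;

fn main() {
    // Illustrative values: a read starting one byte into a 64 KiB chunk with
    // an effectively unlimited length, mirroring the test's u64::MAX case.
    let start_pos: usize = 1;
    let remaining_bytes_to_send: u64 = u64::MAX;
    let uncompressed_chunk_sz: usize = 64 * 1024;

    // The old `start_pos + remaining_bytes_to_send as usize` overflows here:
    // it panics in debug builds and wraps to 0 in release builds, which would
    // make end_pos = min(0, uncompressed_chunk_sz) = 0 < start_pos, so the
    // later `[start_pos..end_pos]` slice panics.
    assert!(start_pos
        .checked_add(remaining_bytes_to_send as usize)
        .is_none());

    // The fixed form clamps at usize::MAX, so cmp::min yields the chunk size
    // and end_pos >= start_pos holds.
    let end_pos = cmp::min(
        start_pos.saturating_add(remaining_bytes_to_send as usize),
        uncompressed_chunk_sz,
    );
    assert_eq!(end_pos, uncompressed_chunk_sz);
}
```

Saturation is sufficient here because the sum is immediately clamped by `cmp::min(..., uncompressed_chunk_sz)`, so saturating at `usize::MAX` never changes the final `end_pos`.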