Skip to content

Commit 58ad994

Browse files
committed
Fix the CSV chunk exporter
1 parent 116f548 commit 58ad994

File tree

2 files changed

+26
-19
lines changed

2 files changed

+26
-19
lines changed

src/csv.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,24 @@ use std::fs::File;
22
use std::mem;
33
use std::path::PathBuf;
44

5-
use csv::ByteRecord;
5+
use csv::{ByteRecord, WriterBuilder};
66

77
pub struct CsvChunker {
88
pub(crate) reader: csv::Reader<File>,
99
pub(crate) headers: ByteRecord,
10-
pub(crate) buffer: Vec<u8>,
10+
pub(crate) writer: csv::Writer<Vec<u8>>,
1111
pub(crate) record: ByteRecord,
1212
pub(crate) size: usize,
13+
pub(crate) delimiter: u8,
1314
}
1415

1516
impl CsvChunker {
16-
pub fn new(file: PathBuf, size: usize) -> Self {
17+
pub fn new(file: PathBuf, size: usize, delimiter: u8) -> Self {
1718
let mut reader = csv::Reader::from_path(file).unwrap();
18-
let mut buffer = Vec::new();
19+
let mut writer = WriterBuilder::new().delimiter(delimiter).from_writer(Vec::new());
1920
let headers = reader.byte_headers().unwrap().clone();
20-
buffer.extend_from_slice(headers.as_slice());
21-
buffer.push(b'\n');
22-
Self { reader, headers, buffer, record: ByteRecord::new(), size }
21+
writer.write_byte_record(&headers).unwrap();
22+
Self { reader, headers, writer, record: ByteRecord::new(), size, delimiter }
2323
}
2424
}
2525

@@ -28,30 +28,33 @@ impl Iterator for CsvChunker {
2828

2929
fn next(&mut self) -> Option<Self::Item> {
3030
while self.reader.read_byte_record(&mut self.record).unwrap() {
31-
if self.buffer.len() + self.record.len() >= self.size {
32-
let buffer = mem::take(&mut self.buffer);
31+
if self.writer.get_ref().len() + self.record.len() >= self.size {
32+
let mut writer =
33+
WriterBuilder::new().delimiter(self.delimiter).from_writer(Vec::new());
34+
writer.write_byte_record(&self.headers).unwrap();
35+
let writer = mem::replace(&mut self.writer, writer);
3336

3437
// Insert the headers and the out-of-bounds record
35-
self.buffer.extend_from_slice(self.headers.as_slice());
36-
self.buffer.push(b'\n');
37-
self.buffer.extend_from_slice(self.record.as_slice());
38-
self.buffer.push(b'\n');
38+
self.writer.write_byte_record(&self.headers).unwrap();
39+
self.writer.write_byte_record(&self.record).unwrap();
3940

40-
return Some(buffer);
41+
return Some(writer.into_inner().unwrap());
4142
} else {
4243
// Insert only the record
43-
self.buffer.extend_from_slice(self.record.as_slice());
44-
self.buffer.push(b'\n');
44+
self.writer.write_byte_record(&self.record).unwrap();
4545
}
4646
}
4747
// If the buffer holds no more than the headers and a
4848
// newline character, it means that there are no documents in it.
49-
if self.buffer.len() <= self.headers.len() + 1 {
49+
if self.writer.get_ref().len() <= self.headers.len() + 1 {
5050
None
5151
} else {
52+
let mut writer = WriterBuilder::new().delimiter(self.delimiter).from_writer(Vec::new());
53+
writer.write_byte_record(&self.headers).unwrap();
5254
// We make the buffer empty by doing that and next time we will
5355
// come back to this _if else_ condition to then return None.
54-
Some(mem::take(&mut self.buffer))
56+
let writer = mem::replace(&mut self.writer, writer);
57+
Some(writer.into_inner().unwrap())
5558
}
5659
}
5760
}

src/main.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ struct Opt {
4343
#[structopt(long)]
4444
api_key: Option<String>,
4545

46+
/// The delimiter to use for the CSV files.
47+
#[structopt(long, default_value_t = b',')]
48+
csv_delimiter: u8,
49+
4650
/// A list of file paths that are streamed and sent to Meilisearch in batches.
4751
#[structopt(long, num_args(1..))]
4852
files: Vec<PathBuf>,
@@ -161,7 +165,7 @@ fn main() -> anyhow::Result<()> {
161165
}
162166
}
163167
Mime::Csv => {
164-
for chunk in csv::CsvChunker::new(file, size) {
168+
for chunk in csv::CsvChunker::new(file, size, opt.csv_delimiter) {
165169
if opt.skip_batches.zip(pb.length()).map_or(true, |(s, l)| s > l) {
166170
send_data(&opt, &agent, opt.upload_operation, &pb, &mime, &chunk)?;
167171
}

0 commit comments

Comments
 (0)