From c1a28ecdeedc922cc65f1e5667e7b099ee55b0c2 Mon Sep 17 00:00:00 2001
From: qdequele
Date: Thu, 5 Sep 2024 16:13:14 +0200
Subject: [PATCH 1/3] add csv delimiter

---
 src/csv.rs  | 20 +++++++++++++++++---
 src/main.rs | 15 ++++++++++++++-
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/src/csv.rs b/src/csv.rs
index 9ab6179..e94c886 100644
--- a/src/csv.rs
+++ b/src/csv.rs
@@ -3,6 +3,7 @@ use std::mem;
 use std::path::PathBuf;
 
 use csv::ByteRecord;
+use csv::ReaderBuilder;
 
 pub struct CsvChunker {
     pub(crate) reader: csv::Reader<File>,
@@ -10,16 +11,29 @@ pub struct CsvChunker {
     pub(crate) buffer: Vec<u8>,
     pub(crate) record: ByteRecord,
     pub(crate) size: usize,
+    pub(crate) delimiter: Option<u8>,
 }
 
 impl CsvChunker {
-    pub fn new(file: PathBuf, size: usize) -> Self {
-        let mut reader = csv::Reader::from_path(file).unwrap();
+    pub fn new(file: PathBuf, size: usize, delimiter: Option<char>) -> Self {
+        let mut reader_builder = ReaderBuilder::new();
+        if let Some(delim) = delimiter {
+            reader_builder = reader_builder.delimiter(delim as u8);
+        }
+        let mut reader = reader_builder.from_path(file).unwrap();
+
         let mut buffer = Vec::new();
         let headers = reader.byte_headers().unwrap().clone();
         buffer.extend_from_slice(headers.as_slice());
         buffer.push(b'\n');
-        Self { reader, headers, buffer, record: ByteRecord::new(), size }
+        Self {
+            reader,
+            headers,
+            buffer,
+            record: ByteRecord::new(),
+            size,
+            delimiter: delimiter.map(|d| d as u8),
+        }
     }
 }
diff --git a/src/main.rs b/src/main.rs
index 04b6b49..daf2fb0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -18,6 +18,8 @@ mod csv;
 mod mime;
 mod nd_json;
 
+use csv::ReaderBuilder;
+
 /// A tool to import massive datasets into Meilisearch by sending them in batches.
 #[derive(Debug, Parser, Clone)]
 #[command(name = "meilisearch-importer")]
@@ -64,6 +66,10 @@ struct Opt {
         value_enum
     )]
     upload_operation: DocumentOperation,
+
+    /// The CSV delimiter to use when reading CSV files. If not specified, the default comma (,) will be used.
+    #[structopt(long)]
+    csv_delimiter: Option<char>,
 }
 
 #[derive(ValueEnum, Copy, Clone, Debug, PartialEq, Eq)]
@@ -85,6 +91,13 @@ fn send_data(
     if let Some(primary_key) = &opt.primary_key {
         url = format!("{}?primaryKey={}", url, primary_key);
     }
+
+    // Add CSV delimiter to the URL if the mime type is CSV and delimiter is specified
+    if mime == &Mime::Csv {
+        if let Some(delimiter) = opt.csv_delimiter {
+            url = format!("{}&csvDelimiter={}", url, delimiter);
+        }
+    }
 
     let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
     encoder.write_all(data)?;
@@ -161,7 +174,7 @@ fn main() -> anyhow::Result<()> {
             }
         }
         Mime::Csv => {
-            for chunk in csv::CsvChunker::new(file, size) {
+            for chunk in csv::CsvChunker::new(file, size, opt.csv_delimiter) {
                 if opt.skip_batches.zip(pb.length()).map_or(true, |(s, l)| s > l) {
                     send_data(&opt, &agent, opt.upload_operation, &pb, &mime, &chunk)?;
                 }

From f3c3de0bb48c8175f88aa7427bf311280d8fb9a3 Mon Sep 17 00:00:00 2001
From: qdequele
Date: Thu, 5 Sep 2024 16:23:43 +0200
Subject: [PATCH 2/3] add csv delimiter

---
 README.md   | 119 ++++++++++++++++++++++++++++++++++++++++++----------
 src/csv.rs  |  67 ++++++++++++++++++-----------------
 src/main.rs |  11 ++---
 3 files changed, 139 insertions(+), 58 deletions(-)

diff --git a/README.md b/README.md
index 25f39d4..963d968 100644
--- a/README.md
+++ b/README.md
@@ -1,47 +1,120 @@
 # Meilisearch Importer
 
-The most efficient CLI tool to import massive CSVs, NSJSON or JSON (array of objects) into Meilisearch.
+The most efficient CLI tool to import massive CSVs, NDJSON, or JSON (array of objects) into Meilisearch.
 
-This tool has been tested with multiple datasets from hundreds of thousand documents to some with more than forty millions documents. The progress bar is very handy in this case.
+This tool has been tested with datasets ranging from hundreds of thousands to over forty million documents. The progress bar is particularly useful for monitoring large imports.
 
 ## Features
 
- - Uploads millions of documents to Meilisearch.
- - Automatically retries on error.
- - Shows the upload progress along with the estimated time of arrival (ETA).
- - [Works on the Cloud](https://www.meilisearch.com/pricing) and on self-hosted instances.
+- Uploads millions of documents to Meilisearch
+- Automatically retries on error with exponential backoff
+- Shows upload progress with estimated time of arrival (ETA)
+- Works with [Meilisearch Cloud](https://www.meilisearch.com/cloud) and self-hosted instances
+- Supports CSV, NDJSON, and JSON file formats
+- Configurable batch size for optimized imports
+- Optional CSV delimiter specification
+- Ability to skip batches for resuming interrupted imports
+- Support for both "add or replace" and "add or update" operations
 
 ## Installation
 
-You can download the latest version of this tool [on the release page](https://github.com/meilisearch/meilisearch-importer/releases).
+Download the latest version of this tool from the [releases page](https://github.com/meilisearch/meilisearch-importer/releases).
 
-## Example Usage
+## Command-line Options
 
-### Send Documents to the Cloud
+- `--url`: Meilisearch instance URL (required)
+- `--index`: Index name to send documents to (required)
+- `--files`: List of file paths to import (required, supports multiple files)
+- `--primary-key`: Field to use as the primary key
+- `--api-key`: API key for authentication
+- `--batch-size`: Size of document batches (default: 20 MiB)
+- `--csv-delimiter`: Custom delimiter for CSV files
+- `--skip-batches`: Number of batches to skip (for resuming imports)
+- `--upload-operation`: Choose between `add-or-replace` (default) and `add-or-update`
 
-It's straightforward to [create a project on the Cloud](https://www.meilisearch.com/pricing) and send your documents into it.
-If you cannot send your dataset directly from the website by drag-and-dropping it, this tool is perfect for you. You can send them by running the following command:
+## Usage Examples
+
+### Import to Meilisearch Cloud
+
+```bash
+meilisearch-importer \
+--url 'https://ms-************.sfo.meilisearch.io' \
+--index products \
+--primary-key uuid \
+--api-key 'D2jkS***************' \
+--files products.csv
+```
+
+### Import Large CSV File with Custom Delimiter
 
 ```bash
 meilisearch-importer \
-  --url 'https://ms-************.sfo.meilisearch.io'
-  --index crunchbase \
-  --primary-key uuid \
-  --api-key 'D2jkS***************' \
-  --files ./dataset/organizations.csv
+--url 'https://ms-************.sfo.meilisearch.io' \
+--api-key 'D2jkS***************' \
+--index products \
+--primary-key uuid \
+--files large_product_list.csv \
+--csv-delimiter ';' \
+--batch-size 50MB
 ```
 
-### Send Documents to a Local Instance
+### Import Multiple Files
 
-This tool is also useful when you want to test Meilisearch locally. The only mandatory parameters to define are the URL, the index name and your dataset.
+```bash
+meilisearch-importer \
+--url 'https://ms-************.sfo.meilisearch.io' \
+--api-key 'D2jkS***************' \
+--index library \
+--files books.json authors.json publishers.ndjson
+```
+
+### Use Add or Update Operation
+
+```bash
+meilisearch-importer \
+--url 'https://ms-************.sfo.meilisearch.io' \
+--api-key 'D2jkS***************' \
+--index users \
+--files users_update.json \
+--upload-operation add-or-update
+```
 
-However, you can also increase the batch size to make meilisearch index faster.
+### Resume Interrupted Import
 
 ```bash
 meilisearch-importer \
-  --url 'http://localhost:7700'
-  --index movies \
-  --files movies.json \
-  --batch-size 100MB
+--url 'https://ms-************.sfo.meilisearch.io' \
+--api-key 'D2jkS***************' \
+--index large_dataset \
+--files huge_file.ndjson \
+--skip-batches 100
 ```
+
+## Error Handling and Retries
+
+The importer automatically retries on errors using an exponential backoff strategy:
+- Starts with a 100ms delay
+- Increases delay up to a maximum of 1 hour
+- Makes up to 20 retry attempts before giving up
+
+## Supported File Formats
+
+- JSON: Must contain an array of objects
+- NDJSON: Each line should be a valid JSON object
+- CSV: Can specify custom delimiters with `--csv-delimiter`
+
+## Troubleshooting
+
+- "Too many errors": Check your network connection and Meilisearch instance status.
+- "File does not exist": Verify file paths and permissions.
+- "Failed to read CSV headers": Ensure your CSV file is properly formatted and uses the correct delimiter.
+- If uploads are slow, try increasing the `--batch-size` or check your network speed.
+
+## Contributing
+
+We welcome contributions to the Meilisearch Importer! Please check out our [Contributing Guide](CONTRIBUTING.md) for more information on how to get started.
+
+## License
+
+Meilisearch Importer is released under the MIT License. See the [LICENSE](LICENSE) file for details.
\ No newline at end of file
diff --git a/src/csv.rs b/src/csv.rs
index e94c886..28b8a27 100644
--- a/src/csv.rs
+++ b/src/csv.rs
@@ -2,8 +2,8 @@ use std::fs::File;
 use std::mem;
 use std::path::PathBuf;
 
-use csv::ByteRecord;
-use csv::ReaderBuilder;
+use anyhow::{Context, Result};
+use csv::{ByteRecord, ReaderBuilder};
 
 pub struct CsvChunker {
     pub(crate) reader: csv::Reader<File>,
@@ -15,57 +15,64 @@ pub struct CsvChunker {
 }
 
 impl CsvChunker {
-    pub fn new(file: PathBuf, size: usize, delimiter: Option<char>) -> Self {
+    pub fn new(file: PathBuf, size: usize, delimiter: Option<char>) -> Result<Self> {
         let mut reader_builder = ReaderBuilder::new();
         if let Some(delim) = delimiter {
             reader_builder = reader_builder.delimiter(delim as u8);
         }
-        let mut reader = reader_builder.from_path(file).unwrap();
+        let mut reader = reader_builder.from_path(&file)
+            .with_context(|| format!("Failed to create CSV reader for file {:?}", file))?;
 
         let mut buffer = Vec::new();
-        let headers = reader.byte_headers().unwrap().clone();
+        let headers = reader.byte_headers()
+            .with_context(|| "Failed to read CSV headers")?
+            .clone();
         buffer.extend_from_slice(headers.as_slice());
         buffer.push(b'\n');
-        Self {
+        Ok(Self {
             reader,
             headers,
             buffer,
             record: ByteRecord::new(),
             size,
             delimiter: delimiter.map(|d| d as u8),
-        }
+        })
     }
 }
 
 impl Iterator for CsvChunker {
-    type Item = Vec<u8>;
+    type Item = Result<Vec<u8>>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        while self.reader.read_byte_record(&mut self.record).unwrap() {
-            if self.buffer.len() + self.record.len() >= self.size {
-                let buffer = mem::take(&mut self.buffer);
+        loop {
+            match self.reader.read_byte_record(&mut self.record) {
+                Ok(true) => {
+                    if self.buffer.len() + self.record.len() >= self.size {
+                        let buffer = mem::take(&mut self.buffer);
 
-                // Insert the header and out of bound record
-                self.buffer.extend_from_slice(self.headers.as_slice());
-                self.buffer.push(b'\n');
-                self.buffer.extend_from_slice(self.record.as_slice());
-                self.buffer.push(b'\n');
+                        // Insert the header and out of bound record
+                        self.buffer.extend_from_slice(self.headers.as_slice());
+                        self.buffer.push(b'\n');
+                        self.buffer.extend_from_slice(self.record.as_slice());
+                        self.buffer.push(b'\n');
 
-                return Some(buffer);
-            } else {
-                // Insert only the record
-                self.buffer.extend_from_slice(self.record.as_slice());
-                self.buffer.push(b'\n');
+                        return Some(Ok(buffer));
+                    } else {
+                        // Insert only the record
+                        self.buffer.extend_from_slice(self.record.as_slice());
+                        self.buffer.push(b'\n');
+                    }
+                },
+                Ok(false) => {
+                    // End of file reached
+                    if self.buffer.len() <= self.headers.len() + 1 {
+                        return None;
+                    } else {
+                        return Some(Ok(mem::take(&mut self.buffer)));
+                    }
+                },
+                Err(e) => return Some(Err(e.into())),
             }
         }
-        // If there only less than or the headers in the buffer and a
-        // newline character it means that there are no documents in it.
-        if self.buffer.len() <= self.headers.len() + 1 {
-            None
-        } else {
-            // We make the buffer empty by doing that and next time we will
-            // come back to this _if else_ condition to then return None.
-            Some(mem::take(&mut self.buffer))
-        }
     }
 }
diff --git a/src/main.rs b/src/main.rs
index daf2fb0..d81feaa 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -138,14 +138,12 @@ fn send_data(
     anyhow::bail!("Too many errors. Stopping the retries.")
 }
 
-fn main() -> anyhow::Result<()> {
+fn main() -> Result<()> {
     let opt = Opt::parse();
     let agent = AgentBuilder::new().timeout(Duration::from_secs(30)).build();
     let files = opt.files.clone();
-    // for each files present in the argument
     for file in files {
-        // check if the file exists
         if !file.exists() {
             anyhow::bail!("The file {:?} does not exist", file);
         }
@@ -166,7 +164,8 @@ fn main() -> anyhow::Result<()> {
             pb.inc(1);
         }
         Mime::NdJson => {
-            for chunk in nd_json::NdJsonChunker::new(file, size) {
+            for chunk in nd_json::NdJsonChunker::new(file, size)? {
+                let chunk = chunk?;
                 if opt.skip_batches.zip(pb.length()).map_or(true, |(s, l)| s > l) {
                     send_data(&opt, &agent, opt.upload_operation, &pb, &mime, &chunk)?;
                 }
@@ -174,7 +173,9 @@ fn main() -> anyhow::Result<()> {
             }
         }
         Mime::Csv => {
-            for chunk in csv::CsvChunker::new(file, size, opt.csv_delimiter) {
+            let chunker = csv::CsvChunker::new(file, size, opt.csv_delimiter)?;
+            for chunk_result in chunker {
+                let chunk = chunk_result?;
                 if opt.skip_batches.zip(pb.length()).map_or(true, |(s, l)| s > l) {
                     send_data(&opt, &agent, opt.upload_operation, &pb, &mime, &chunk)?;
                 }

From 1e2ef2c729ed7a0be34938a2271f60c520706b27 Mon Sep 17 00:00:00 2001
From: qdequele
Date: Thu, 5 Sep 2024 16:48:20 +0200
Subject: [PATCH 3/3] Cleanup code

---
 Cargo.lock     | 63 +++++++++++++++++++++++++-------------------------
 Cargo.toml     | 17 +++++++++-----
 src/csv.rs     |  4 +---
 src/main.rs    | 10 ++++----
 src/mime.rs    |  1 +
 src/nd_json.rs | 30 ++++++++++++++++--------
 6 files changed, 69 insertions(+), 56 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4ee9d04..b5a107a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3,10 +3,10 @@
 version = 3
 
 [[package]]
-name = "adler"
-version = "1.0.2"
+name = "adler2"
+version = "2.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
+checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
 
 [[package]]
 name = "ahash"
@@ -35,9 +35,9 @@ dependencies = [
 
 [[package]]
 name = "anstyle"
-version = "1.0.6"
+version = "1.0.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc"
+checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1"
 
 [[package]]
 name = "anstyle-parse"
@@ -69,9 +69,9 @@ dependencies = [
 
 [[package]]
 name = "anyhow"
-version = "1.0.81"
+version = "1.0.86"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247"
+checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
 
 [[package]]
 name = "arrayvec"
@@ -87,9 +87,9 @@ checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80"
 
 [[package]]
 name = "base64"
-version = "0.21.7"
+version = "0.22.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
+checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
 
 [[package]]
 name = "bitvec"
@@ -186,9 +186,9 @@ checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e"
 
 [[package]]
 name = "clap"
-version = "4.5.4"
+version = "4.5.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0"
+checksum = "3e5a21b8495e732f1b3c364c9949b201ca7bae518c502c80256c96ad79eaf6ac"
 dependencies = [
  "clap_builder",
  "clap_derive",
@@ -196,9 +196,9 @@ dependencies = [
 
 [[package]]
 name = "clap_builder"
-version = "4.5.2"
+version = "4.5.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4"
+checksum = "8cf2dd12af7a047ad9d6da2b6b249759a22a7abc0f474c1dae1777afa4b21a73"
 dependencies = [
  "anstream",
  "anstyle",
@@ -208,9 +208,9 @@ dependencies = [
 
 [[package]]
 name = "clap_derive"
-version = "4.5.4"
+version = "4.5.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64"
+checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0"
 dependencies = [
  "heck",
  "proc-macro2",
@@ -296,9 +296,9 @@ dependencies = [
 
 [[package]]
 name = "flate2"
-version = "1.0.28"
+version = "1.0.33"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e"
+checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253"
 dependencies = [
  "crc32fast",
  "miniz_oxide",
@@ -440,11 +440,11 @@ checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149"
 
 [[package]]
 name = "miniz_oxide"
-version = "0.7.2"
+version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7"
+checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
 dependencies = [
- "adler",
+ "adler2",
 ]
 
 [[package]]
@@ -663,11 +663,12 @@ dependencies = [
 
 [[package]]
 name = "rustls"
-version = "0.22.3"
+version = "0.23.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "99008d7ad0bbbea527ec27bddbc0e432c5b87d8175178cee68d2eec9c4a1813c"
+checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044"
 dependencies = [
  "log",
+ "once_cell",
  "ring",
  "rustls-pki-types",
  "rustls-webpki",
@@ -677,15 +678,15 @@ dependencies = [
 
 [[package]]
 name = "rustls-pki-types"
-version = "1.4.0"
+version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "868e20fada228fefaf6b652e00cc73623d54f8171e7352c18bb281571f2d92da"
+checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0"
 
 [[package]]
 name = "rustls-webpki"
-version = "0.102.2"
+version = "0.102.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610"
+checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56"
 dependencies = [
  "ring",
  "rustls-pki-types",
@@ -726,12 +727,13 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.115"
+version = "1.0.128"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "12dc5c46daa8e9fdf4f5e71b6cf9a53f2487da0e86e55808e2d35539666497dd"
+checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8"
 dependencies = [
  "indexmap",
  "itoa",
+ "memchr",
  "ryu",
  "serde",
 ]
@@ -867,9 +869,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
 
 [[package]]
 name = "ureq"
-version = "2.9.6"
+version = "2.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "11f214ce18d8b2cbe84ed3aa6486ed3f5b285cf8d8fbdbce9f3f767a724adc35"
+checksum = "b74fc6b57825be3373f7054754755f03ac3a8f5d70015ccad699ba2029956f4a"
 dependencies = [
  "base64",
  "flate2",
@@ -877,7 +879,6 @@ dependencies = [
  "once_cell",
  "rustls",
  "rustls-pki-types",
- "rustls-webpki",
 "url",
 "webpki-roots",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index d7cdc2b..d694ffa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,15 +7,15 @@ license = "MIT"
 edition = "2021"
 
 [dependencies]
-anyhow = "1.0.81"
+anyhow = "1.0.86"
 byte-unit = { version = "5.1.4", features = ["byte", "serde"] }
-clap = { version = "4.5.3", features = ["derive"] }
+clap = { version = "4.5.17", features = ["derive"] }
 csv = "1.3.0"
 exponential-backoff = "1.2.0"
-flate2 = "1.0"
+flate2 = "1.0.33"
 indicatif = "0.17.8"
-serde_json = { version = "1.0.114", features = ["preserve_order"] }
-ureq = "2.9.6"
+serde_json = { version = "1.0.128", features = ["preserve_order"] }
+ureq = "2.10.1"
 
 # The profile that 'cargo dist' will build with
 [profile.dist]
@@ -29,7 +29,12 @@ cargo-dist-version = "0.11.1"
 # The installers to generate for each app
 installers = []
 # Target platforms to build apps for (Rust target-triple syntax)
-targets = ["aarch64-apple-darwin", "x86_64-apple-darwin", "x86_64-unknown-linux-gnu", "x86_64-pc-windows-msvc"]
+targets = [
+    "aarch64-apple-darwin",
+    "x86_64-apple-darwin",
+    "x86_64-unknown-linux-gnu",
+    "x86_64-pc-windows-msvc",
+]
 # CI backends to support
 ci = ["github"]
 # Publish jobs to run in CI
diff --git a/src/csv.rs b/src/csv.rs
index 28b8a27..293d7ea 100644
--- a/src/csv.rs
+++ b/src/csv.rs
@@ -11,14 +11,13 @@ pub struct CsvChunker {
     pub(crate) buffer: Vec<u8>,
     pub(crate) record: ByteRecord,
     pub(crate) size: usize,
-    pub(crate) delimiter: Option<u8>,
 }
 
 impl CsvChunker {
     pub fn new(file: PathBuf, size: usize, delimiter: Option<char>) -> Result<Self> {
         let mut reader_builder = ReaderBuilder::new();
         if let Some(delim) = delimiter {
-            reader_builder = reader_builder.delimiter(delim as u8);
+            reader_builder.delimiter(delim as u8);
         }
         let mut reader = reader_builder.from_path(&file)
             .with_context(|| format!("Failed to create CSV reader for file {:?}", file))?;
@@ -35,7 +34,6 @@ impl CsvChunker {
             buffer,
             record: ByteRecord::new(),
             size,
-            delimiter: delimiter.map(|d| d as u8),
         })
     }
 }
diff --git a/src/main.rs b/src/main.rs
index d81feaa..c41c5ee 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -18,8 +18,6 @@ mod csv;
 mod mime;
 mod nd_json;
 
-use csv::ReaderBuilder;
-
 /// A tool to import massive datasets into Meilisearch by sending them in batches.
 #[derive(Debug, Parser, Clone)]
 #[command(name = "meilisearch-importer")]
@@ -93,7 +91,7 @@ fn send_data(
     }
 
     // Add CSV delimiter to the URL if the mime type is CSV and delimiter is specified
-    if mime == &Mime::Csv {
+    if *mime == Mime::Csv {
         if let Some(delimiter) = opt.csv_delimiter {
             url = format!("{}&csvDelimiter={}", url, delimiter);
         }
@@ -138,7 +136,7 @@ fn send_data(
     anyhow::bail!("Too many errors. Stopping the retries.")
 }
 
-fn main() -> Result<()> {
+fn main() -> anyhow::Result<()> {
     let opt = Opt::parse();
     let agent = AgentBuilder::new().timeout(Duration::from_secs(30)).build();
     let files = opt.files.clone();
@@ -164,8 +162,8 @@ fn main() -> Result<()> {
             pb.inc(1);
         }
         Mime::NdJson => {
-            for chunk in nd_json::NdJsonChunker::new(file, size)? {
-                let chunk = chunk?;
+            for chunk_result in nd_json::NdJsonChunker::new(file, size)? {
+                let chunk = chunk_result?;
                 if opt.skip_batches.zip(pb.length()).map_or(true, |(s, l)| s > l) {
                     send_data(&opt, &agent, opt.upload_operation, &pb, &mime, &chunk)?;
                 }
diff --git a/src/mime.rs b/src/mime.rs
index 831abd7..b4a1992 100644
--- a/src/mime.rs
+++ b/src/mime.rs
@@ -1,5 +1,6 @@
 use std::path::Path;
 
+#[derive(PartialEq, Eq)]
 pub enum Mime {
     Json,
     NdJson,
diff --git a/src/nd_json.rs b/src/nd_json.rs
index 259c217..e6148a1 100644
--- a/src/nd_json.rs
+++ b/src/nd_json.rs
@@ -6,6 +6,7 @@ use serde_json::de::IoRead;
 use serde_json::{to_writer, Deserializer, Map, StreamDeserializer, Value};
 
 use crate::byte_count::ByteCount;
+use anyhow::{Result};
 
 pub struct NdJsonChunker {
     pub reader: StreamDeserializer<'static, IoRead<io::BufReader<File>>, Map<String, Value>>,
@@ -14,37 +15,46 @@ pub struct NdJsonChunker {
     pub buffer: Vec<u8>,
     pub size: usize,
 }
 
 impl NdJsonChunker {
-    pub fn new(file: PathBuf, size: usize) -> Self {
-        let reader = io::BufReader::new(File::open(file).unwrap());
-        Self { reader: Deserializer::from_reader(reader).into_iter(), buffer: Vec::new(), size }
+    pub fn new(file: PathBuf, size: usize) -> Result<Self> {
+        let reader = io::BufReader::new(File::open(file)?);
+        Ok(Self { reader: Deserializer::from_reader(reader).into_iter(), buffer: Vec::new(), size })
     }
 }
 
 impl Iterator for NdJsonChunker {
-    type Item = Vec<u8>;
+    type Item = Result<Vec<u8>>;
 
     fn next(&mut self) -> Option<Self::Item> {
         for result in self.reader.by_ref() {
-            let object = result.unwrap();
+            let object = match result {
+                Ok(obj) => obj,
+                Err(e) => return Some(Err(e.into())),
+            };
             // Evaluate the size it will take if we serialize it in the buffer
             let mut counter = ByteCount::new();
-            to_writer(&mut counter, &object).unwrap();
+            if let Err(e) = to_writer(&mut counter, &object) {
+                return Some(Err(e.into()));
+            }
             if self.buffer.len() + counter.count() >= self.size {
                 let buffer = mem::take(&mut self.buffer);
                 // Insert the record but after we sent the buffer
-                to_writer(&mut self.buffer, &object).unwrap();
-                return Some(buffer);
+                if let Err(e) = to_writer(&mut self.buffer, &object) {
+                    return Some(Err(e.into()));
+                }
+                return Some(Ok(buffer));
             } else {
                 // Insert the record
-                to_writer(&mut self.buffer, &object).unwrap();
+                if let Err(e) = to_writer(&mut self.buffer, &object) {
+                    return Some(Err(e.into()));
+                }
             }
         }
         if self.buffer.is_empty() {
             None
         } else {
-            Some(mem::take(&mut self.buffer))
+            Some(Ok(mem::take(&mut self.buffer)))
         }
     }
 }
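
A quick usage sketch (not part of the patch series): after patch 3, both chunkers are fallible — `new` returns `anyhow::Result<Self>` and the iterators yield `Result<Vec<u8>>` — so callers construct them with `?` and handle each yielded batch. The sketch below assumes it lives inside this crate, where the patched `src/csv.rs` module is in scope; the file name, batch size, and delimiter are illustrative placeholders.

```rust
use std::path::PathBuf;

use anyhow::Result;

use crate::csv::CsvChunker; // the module reworked in the patches above

fn chunk_csv_sketch() -> Result<()> {
    // 20 MiB batches (the importer's default --batch-size), semicolon-delimited input.
    let chunker = CsvChunker::new(PathBuf::from("products.csv"), 20 * 1024 * 1024, Some(';'))?;
    for chunk in chunker {
        // Each item is a Result<Vec<u8>>: one batch of CSV bytes, re-prefixed with the header row.
        let batch = chunk?;
        println!("batch ready: {} bytes", batch.len());
    }
    Ok(())
}
```

Construction errors (missing file, unreadable headers) surface once from `new`, while per-record parse errors surface lazily from the iterator — the same split `src/main.rs` relies on when it sends each batch with `send_data`.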