Skip to content

Commit a588fa7

Browse files
bors[bot]Razaloc
andauthored
Merge #41
41: Download compressed gz image from remote source r=obbardc a=Razaloc Accept url arguments for remote image download and copy. Implements async support for use of reqwest. Bmap file is searched as in local option in the current file with same name and the extension ".bmap" Closes: #9 Closes: #46 Closes: #8 Signed-off-by: Rafael Garcia Ruiz <rafael.garcia@collabora.com> Co-authored-by: Rafael Garcia Ruiz <rafael.garcia@collabora.com>
2 parents 9e31765 + 5a6e346 commit a588fa7

File tree

5 files changed

+265
-33
lines changed

5 files changed

+265
-33
lines changed

bmap-rs/Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,10 @@ bmap = { path = "../bmap" }
1111
anyhow = "1.0.66"
1212
nix = "0.26.1"
1313
flate2 = "1.0.24"
14-
clap = { version = "4.0.18", features = ["derive"] }
15-
indicatif = "0.17.1"
14+
clap = { version = "4.0.18", features = ["cargo"] }
15+
indicatif = { version = "0.17.1", features = ["tokio"] }
16+
async-compression = { version = "0.3.15", features = ["gzip", "futures-io"] }
17+
tokio = { version = "1.21.2", features = ["rt", "macros", "fs", "rt-multi-thread"] }
18+
reqwest = { version = "0.11.12", features = ["stream"] }
19+
tokio-util = { version = "0.7.4", features = ["compat"] }
20+
futures = "0.3.25"

bmap-rs/src/main.rs

Lines changed: 143 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,75 @@
1-
use anyhow::{anyhow, bail, Context, Result};
2-
use bmap::{Bmap, Discarder, SeekForward};
3-
use clap::Parser;
1+
use anyhow::{anyhow, bail, ensure, Context, Result};
2+
use async_compression::futures::bufread::GzipDecoder;
3+
use bmap::{AsyncDiscarder, Bmap, Discarder, SeekForward};
4+
use clap::{arg, command, Command};
45
use flate2::read::GzDecoder;
6+
use futures::TryStreamExt;
57
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
68
use nix::unistd::ftruncate;
9+
use reqwest::{Response, Url};
710
use std::ffi::OsStr;
811
use std::fmt::Write;
912
use std::fs::File;
1013
use std::io::Read;
1114
use std::os::unix::io::AsRawFd;
1215
use std::path::{Path, PathBuf};
16+
use tokio_util::compat::TokioAsyncReadCompatExt;
1317

14-
#[derive(Parser, Debug)]
18+
#[derive(Debug)]
19+
enum Image {
20+
Path(PathBuf),
21+
Url(Url),
22+
}
23+
24+
#[derive(Debug)]
1525
struct Copy {
16-
image: PathBuf,
26+
image: Image,
1727
dest: PathBuf,
1828
}
1929

20-
#[derive(Parser, Debug)]
30+
#[derive(Debug)]
2131

22-
enum Command {
32+
enum Subcommand {
2333
Copy(Copy),
2434
}
2535

26-
#[derive(Parser, Debug)]
36+
#[derive(Debug)]
2737
struct Opts {
28-
#[command(subcommand)]
29-
command: Command,
38+
command: Subcommand,
39+
}
40+
41+
impl Opts {
42+
fn parser() -> Opts {
43+
let matches = command!()
44+
.propagate_version(true)
45+
.subcommand_required(true)
46+
.arg_required_else_help(true)
47+
.subcommand(
48+
Command::new("copy")
49+
.about("Copy image to block device or file")
50+
.arg(arg!([IMAGE]).required(true))
51+
.arg(arg!([DESTINATION]).required(true)),
52+
)
53+
.get_matches();
54+
match matches.subcommand() {
55+
Some(("copy", sub_matches)) => Opts {
56+
command: Subcommand::Copy({
57+
Copy {
58+
image: match Url::parse(sub_matches.get_one::<String>("IMAGE").unwrap()) {
59+
Ok(url) => Image::Url(url),
60+
Err(_) => Image::Path(PathBuf::from(
61+
sub_matches.get_one::<String>("IMAGE").unwrap(),
62+
)),
63+
},
64+
dest: PathBuf::from(sub_matches.get_one::<String>("DESTINATION").unwrap()),
65+
}
66+
}),
67+
},
68+
_ => unreachable!(
69+
"Exhausted list of subcommands and subcommand_required prevents `None`"
70+
),
71+
}
72+
}
3073
}
3174

3275
fn append(path: PathBuf) -> PathBuf {
@@ -51,6 +94,13 @@ fn find_bmap(img: &Path) -> Option<PathBuf> {
5194
}
5295
}
5396

97+
fn find_remote_bmap(mut url: Url) -> Result<Url> {
98+
let mut path = PathBuf::from(url.path());
99+
path.set_extension("bmap");
100+
url.set_path(path.to_str().unwrap());
101+
Ok(url)
102+
}
103+
54104
trait ReadSeekForward: SeekForward + Read {}
55105
impl<T: Read + SeekForward> ReadSeekForward for T {}
56106

@@ -78,7 +128,7 @@ impl SeekForward for Decoder {
78128
}
79129
}
80130

81-
fn setup_input(path: &Path) -> Result<Decoder> {
131+
fn setup_local_input(path: &Path) -> Result<Decoder> {
82132
let f = File::open(path)?;
83133
match path.extension().and_then(OsStr::to_str) {
84134
Some("gz") => {
@@ -89,12 +139,44 @@ fn setup_input(path: &Path) -> Result<Decoder> {
89139
}
90140
}
91141

92-
fn copy(c: Copy) -> Result<()> {
93-
if !c.image.exists() {
94-
bail!("Image file doesn't exist")
142+
async fn setup_remote_input(url: Url) -> Result<Response> {
143+
match PathBuf::from(url.path())
144+
.extension()
145+
.and_then(OsStr::to_str)
146+
{
147+
Some("gz") => reqwest::get(url).await.map_err(anyhow::Error::new),
148+
None => bail!("No file extension found"),
149+
_ => bail!("Image file format not implemented"),
95150
}
151+
}
96152

97-
let bmap = find_bmap(&c.image).ok_or_else(|| anyhow!("Couldn't find bmap file"))?;
153+
fn setup_progress_bar(bmap: &Bmap) -> ProgressBar {
154+
let pb = ProgressBar::new(bmap.total_mapped_size());
155+
pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
156+
.unwrap()
157+
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
158+
.progress_chars("#>-"));
159+
pb
160+
}
161+
162+
fn setup_output<T: AsRawFd>(output: &T, bmap: &Bmap, metadata: std::fs::Metadata) -> Result<()> {
163+
if metadata.is_file() {
164+
ftruncate(output.as_raw_fd(), bmap.image_size() as i64)
165+
.context("Failed to truncate file")?;
166+
}
167+
Ok(())
168+
}
169+
170+
async fn copy(c: Copy) -> Result<()> {
171+
match c.image {
172+
Image::Path(path) => copy_local_input(path, c.dest),
173+
Image::Url(url) => copy_remote_input(url, c.dest).await,
174+
}
175+
}
176+
177+
fn copy_local_input(source: PathBuf, destination: PathBuf) -> Result<()> {
178+
ensure!(source.exists(), "Image file doesn't exist");
179+
let bmap = find_bmap(&source).ok_or_else(|| anyhow!("Couldn't find bmap file"))?;
98180
println!("Found bmap file: {}", bmap.display());
99181

100182
let mut b = File::open(&bmap).context("Failed to open bmap file")?;
@@ -105,32 +187,63 @@ fn copy(c: Copy) -> Result<()> {
105187
let output = std::fs::OpenOptions::new()
106188
.write(true)
107189
.create(true)
108-
.open(c.dest)?;
190+
.open(destination)?;
109191

110-
if output.metadata()?.is_file() {
111-
ftruncate(output.as_raw_fd(), bmap.image_size() as i64)
112-
.context("Failed to truncate file")?;
113-
}
192+
setup_output(&output, &bmap, output.metadata()?)?;
114193

115-
let mut input = setup_input(&c.image)?;
116-
let pb = ProgressBar::new(bmap.total_mapped_size());
117-
pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
118-
.unwrap()
119-
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
120-
.progress_chars("#>-"));
194+
let mut input = setup_local_input(&source)?;
195+
let pb = setup_progress_bar(&bmap);
121196
bmap::copy(&mut input, &mut pb.wrap_write(&output), &bmap)?;
122197
pb.finish_and_clear();
123198

124199
println!("Done: Syncing...");
125-
output.sync_all().expect("Sync failure");
200+
output.sync_all()?;
201+
202+
Ok(())
203+
}
204+
205+
async fn copy_remote_input(source: Url, destination: PathBuf) -> Result<()> {
206+
let bmap_url = find_remote_bmap(source.clone())?;
207+
208+
let xml = reqwest::get(bmap_url.clone()).await?.text().await?;
209+
println!("Found bmap file: {}", bmap_url);
210+
211+
let bmap = Bmap::from_xml(&xml)?;
212+
let mut output = tokio::fs::OpenOptions::new()
213+
.write(true)
214+
.create(true)
215+
.open(destination)
216+
.await?;
217+
218+
setup_output(&output, &bmap, output.metadata().await?)?;
219+
220+
let res = setup_remote_input(source).await?;
221+
let stream = res
222+
.bytes_stream()
223+
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
224+
.into_async_read();
225+
let reader = GzipDecoder::new(stream);
226+
let mut input = AsyncDiscarder::new(reader);
227+
let pb = setup_progress_bar(&bmap);
228+
bmap::copy_async(
229+
&mut input,
230+
&mut pb.wrap_async_write(&mut output).compat(),
231+
&bmap,
232+
)
233+
.await?;
234+
pb.finish_and_clear();
235+
236+
println!("Done: Syncing...");
237+
output.sync_all().await?;
126238

127239
Ok(())
128240
}
129241

130-
fn main() -> Result<()> {
131-
let opts = Opts::parse();
242+
#[tokio::main]
243+
async fn main() -> Result<()> {
244+
let opts = Opts::parser();
132245

133246
match opts.command {
134-
Command::Copy(c) => copy(c),
247+
Subcommand::Copy(c) => copy(c).await,
135248
}
136249
}

bmap/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,5 @@ sha2 = { version = "0.10.6", features = [ "asm" ] }
1515
strum = { version = "0.24.1", features = [ "derive"] }
1616
digest = "0.10.5"
1717
flate2 = "1.0.20"
18+
async-trait = "0.1.58"
19+
futures = "0.3.25"

bmap/src/discarder.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1-
use crate::SeekForward;
1+
use crate::{AsyncSeekForward, SeekForward};
2+
use async_trait::async_trait;
3+
use futures::io::{AsyncRead, AsyncReadExt};
24
use std::io::Read;
35
use std::io::Result as IOResult;
6+
use std::pin::Pin;
7+
use std::task::{Context, Poll};
48

59
/// Adaptor that implements SeekForward on types only implementing Read by discarding data
610
pub struct Discarder<R: Read> {
@@ -36,6 +40,44 @@ impl<R: Read> SeekForward for Discarder<R> {
3640
}
3741
}
3842

43+
pub struct AsyncDiscarder<R: AsyncRead> {
44+
reader: R,
45+
}
46+
47+
impl<R: AsyncRead> AsyncDiscarder<R> {
48+
pub fn new(reader: R) -> Self {
49+
Self { reader }
50+
}
51+
52+
pub fn into_inner(self) -> R {
53+
self.reader
54+
}
55+
}
56+
57+
impl<R: AsyncRead + Unpin> AsyncRead for AsyncDiscarder<R> {
58+
fn poll_read(
59+
mut self: Pin<&mut Self>,
60+
cx: &mut Context<'_>,
61+
buf: &mut [u8],
62+
) -> Poll<IOResult<usize>> {
63+
Pin::new(&mut self.reader).poll_read(cx, buf)
64+
}
65+
}
66+
67+
#[async_trait(?Send)]
68+
impl<R: AsyncRead + Unpin> AsyncSeekForward for AsyncDiscarder<R> {
69+
async fn async_seek_forward(&mut self, forward: u64) -> IOResult<()> {
70+
let mut buf = [0; 4096];
71+
let mut left = forward as usize;
72+
while left > 0 {
73+
let toread = left.min(buf.len());
74+
let r = self.read(&mut buf[0..toread]).await?;
75+
left -= r;
76+
}
77+
Ok(())
78+
}
79+
}
80+
3981
#[cfg(test)]
4082
mod test {
4183
use super::*;

0 commit comments

Comments
 (0)