~matthiasbeyer/butido

9ad578dc96cdf7b74d5f58f7798d0327523b07f7 — Matthias Beyer 1 year, 9 months ago 5d98b4b
Add progress wrapper

When creating only one progress bar for all downloads, we still want to know
how many downloads are happening and the remaining number of bytes to be
received.

This patch implements a ProgressWrapper that synchronizes between the download
tasks and the progress bar.

Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
1 files changed, 99 insertions(+), 22 deletions(-)

M src/commands/source.rs
M src/commands/source.rs => src/commands/source.rs +99 -22
@@ 10,10 10,11 @@

//! Implementation of the 'source' subcommand

use std::convert::TryFrom;
use std::io::Write;
use std::path::PathBuf;
use std::convert::TryFrom;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::Context;
use anyhow::Error;


@@ 21,10 22,9 @@ use anyhow::Result;
use anyhow::anyhow;
use clap::ArgMatches;
use colored::Colorize;
use log::{info, trace};
use result_inspect::ResultInspect;
use result_inspect::ResultInspectErr;
use log::{debug, info, trace};
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use tokio_stream::StreamExt;

use crate::config::*;


@@ 226,7 226,68 @@ pub async fn download(
    repo: Repository,
    progressbars: ProgressBars,
) -> Result<()> {
    async fn perform_download(source: &SourceEntry, timeout: Option<u64>) -> Result<()> {
    #[derive(Clone)]
    struct ProgressWrapper {
        download_count: u64,
        finished_downloads: u64,
        current_bytes: usize,
        sum_bytes: u64,
        bar: Arc<Mutex<indicatif::ProgressBar>>,
    }

    impl ProgressWrapper {
        fn new(bar: indicatif::ProgressBar) -> Self {
            Self {
                download_count: 0,
                finished_downloads: 0,
                current_bytes: 0,
                sum_bytes: 0,
                bar: Arc::new(Mutex::new(bar))
            }
        }

        async fn inc_download_count(&mut self) {
            self.download_count += 1;
            self.set_message().await;
        }

        async fn inc_download_bytes(&mut self, bytes: u64) {
            self.sum_bytes += bytes;
            self.set_message().await;
        }

        async fn finish_one_download(&mut self) {
            self.finished_downloads += 1;
            self.bar.lock().await.inc(1);
            self.set_message().await;
        }

        async fn add_bytes(&mut self, len: usize) {
            self.current_bytes += len;
            self.set_message().await;
        }

        async fn set_message(&self) {
            let bar = self.bar.lock().await;
            bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished",
                    current_bytes = self.current_bytes,
                    sum_bytes = self.sum_bytes,
                    dlfinished = self.finished_downloads,
                    dlsum = self.download_count));
        }

        async fn success(&self) {
            let bar = self.bar.lock().await;
            bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count));
        }

        async fn error(&self) {
            let bar = self.bar.lock().await;
            bar.finish_with_message(format!("At least one download of {} failed", self.download_count));
        }
    }

    async fn perform_download(source: &SourceEntry, progress: Arc<Mutex<ProgressWrapper>>, timeout: Option<u64>) -> Result<()> {
        trace!("Creating: {:?}", source);
        let file = source.create().await.with_context(|| {
            anyhow!(


@@ 258,11 319,22 @@ pub async fn download(
            }
        };

        progress.lock()
            .await
            .inc_download_bytes(response.content_length().unwrap_or(0))
            .await;

        let mut stream = response.bytes_stream();
        while let Some(bytes) = stream.next().await {
            file.write_all(bytes?.as_ref()).await?;
            let bytes = bytes?;
            file.write_all(bytes.as_ref()).await?;
            progress.lock()
                .await
                .add_bytes(bytes.len())
                .await
        }

        progress.lock().await.finish_one_download().await;
        file.flush()
            .await
            .map_err(Error::from)


@@ 289,9 361,9 @@ pub async fn download(
        .map(crate::commands::util::mk_package_name_regex)
        .transpose()?;

    let progressbar = progressbars.bar();
    let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar())));

    repo.packages()
    let r = repo.packages()
        .filter(|p| {
            match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) {
                (None, None, None)              => true,


@@ 321,19 393,21 @@ pub async fn download(
                    if source_path_exists && !force {
                        Err(anyhow!("Source exists: {}", source.path().display()))
                    } else {
                        progressbar.lock()
                            .await
                            .inc_download_count()
                            .await;

                        if source_path_exists /* && force is implied by 'if' above*/ {
                            if let Err(e) = source.remove_file().await {
                                progressbar.inc(1);
                                progressbar.lock().await.finish_one_download().await;
                                return Err(e)
                            }
                        }


                        perform_download(&source, timeout)
                            .await
                            .inspect(|_| {
                                progressbar.inc(1);
                            })
                        perform_download(&source, progressbar.clone(), timeout).await?;
                        progressbar.lock().await.finish_one_download().await;
                        Ok(())
                    }
                }
            })


@@ 343,13 417,16 @@ pub async fn download(
        .collect::<Vec<Result<()>>>()
        .await
        .into_iter()
        .collect::<Result<()>>()
        .inspect_err(|_| {
            progressbar.finish_with_message("At least one package failed");
        })
        .inspect(|_| {
            progressbar.finish_with_message("Succeeded");
        })
        .collect::<Result<()>>();

    if r.is_err() {
        progressbar.lock().await.error().await;
    } else {
        progressbar.lock().await.success().await;
    }

    debug!("r = {:?}", r);
    r
}

async fn of(