~science-computing/butido

2d82860bbd867a328fa1ab21e77705439ba4636b — Matthias Beyer 3 years ago 66c1fc7 + 7a5f0f9
Merge branch 'test-diamond-dependencies'

This merge changes the data structure used to organize the JobTask runs
from a Tree to a DAG.

We're close to 0.1.0 with this!
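
As a rough illustration of why the diamond case wants a DAG rather than a Tree:
in a DAG the shared dependency is a single node reachable from several parents,
whereas a Tree has to duplicate it under each parent. A minimal sketch (not part
of this change) using the daggy crate added in this merge; the &str node weights
stand in for the real Package type:

    use daggy::Dag;

    fn main() {
        // The example repo's diamond: d depends on b and c, both depend on a.
        let mut dag: Dag<&str, i8> = Dag::new();
        let a = dag.add_node("a");
        let b = dag.add_node("b");
        let c = dag.add_node("c");
        let d = dag.add_node("d");

        // Edges point from a package to its dependency.
        dag.add_edge(d, b, 0).unwrap();
        dag.add_edge(d, c, 0).unwrap();
        dag.add_edge(b, a, 0).unwrap();
        dag.add_edge(c, a, 0).unwrap();

        // "a" exists exactly once, even though it is reachable via two paths.
        assert_eq!(dag.node_count(), 4);
    }
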
M Cargo.toml => Cargo.toml +1 -0
@@ 21,6 21,7 @@ clap_generate  = "3.0.0-beta.2"
colored        = "2"
config         = "0.10"
csv            = "1.1"
daggy          = { version = "0.7", features = [ "serde" ] }
diesel         = { version = "1.4", features = ["postgres", "chrono", "uuid", "serde_json"] }
env_logger     = "0.8"
filters        = "0.4.0"

A examples/packages/diamond-dependencies/Makefile => examples/packages/diamond-dependencies/Makefile +33 -0
@@ 0,0 1,33 @@
export BUTIDO_RELEASES="/tmp/butido-test-diamond-dependencies-releases"
export BUTIDO_STAGING="/tmp/butido-test-diamond-dependencies-staging"
export BUTIDO_SOURCE_CACHE="/tmp/butido-test-diamond-dependencies-sources"
export BUTIDO_LOG_DIR="/tmp/butido-test-diamond-dependencies-logs"
export BUTIDO_REPO="/tmp/butido-test-diamond-dependencies-repo"

.PHONY: all
all: directories copyrepo copysrc

directories: ${BUTIDO_RELEASES} ${BUTIDO_STAGING} ${BUTIDO_SOURCE_CACHE} ${BUTIDO_LOG_DIR} ${BUTIDO_REPO}

copyrepo: ${BUTIDO_REPO}
	cp -rv ./repo/* ${BUTIDO_REPO}/
	cd ${BUTIDO_REPO}/ && git init && git add . && git commit -m init

copysrc: ${BUTIDO_SOURCE_CACHE}
	cp -rv ./sources/* ${BUTIDO_SOURCE_CACHE}/

${BUTIDO_RELEASES}:
	mkdir -p "${BUTIDO_RELEASES}"

${BUTIDO_STAGING}:
	mkdir -p "${BUTIDO_STAGING}"

${BUTIDO_SOURCE_CACHE}:
	mkdir -p "${BUTIDO_SOURCE_CACHE}"

${BUTIDO_LOG_DIR}:
	mkdir -p "${BUTIDO_LOG_DIR}"

${BUTIDO_REPO}:
	mkdir -p "${BUTIDO_REPO}"


A examples/packages/diamond-dependencies/README.md => examples/packages/diamond-dependencies/README.md +20 -0
@@ 0,0 1,20 @@
# Test diamond dependencies

This subtree provides a testcase for the following situation:

If we have a package graph that looks like this:

      .-> C -.
     /        \
    D          > A
     \        /
      `-> B -´

(an arrow means "depends on"), then building D must resolve the shared
dependency A, which is reachable via both B and C, exactly once.


## Note

To reproduce the issue, make sure to adapt the ./repo/config.toml as appropriate
for your environment.


A examples/packages/diamond-dependencies/repo/a/pkg.toml => examples/packages/diamond-dependencies/repo/a/pkg.toml +7 -0
@@ 0,0 1,7 @@
name = "a"
version = "1"

[sources.src]
url = "https://example.com"
hash.hash = "e5fa44f2b31c1fb553b6021e7360d07d5d91ff5e"


A examples/packages/diamond-dependencies/repo/b/pkg.toml => examples/packages/diamond-dependencies/repo/b/pkg.toml +9 -0
@@ 0,0 1,9 @@
name = "b"
version = "2"

[dependencies]
build = ["a =1"]

[sources.src]
url = "https://example.com"
hash.hash = "7448d8798a4380162d4b56f9b452e2f6f9e24e7a"

A examples/packages/diamond-dependencies/repo/c/pkg.toml => examples/packages/diamond-dependencies/repo/c/pkg.toml +9 -0
@@ 0,0 1,9 @@
name = "c"
version = "3"

[dependencies]
build = ["a =1"]

[sources.src]
url = "https://example.com"
hash.hash = "a3db5c13ff90a36963278c6a39e4ee3c22e2a436"

A examples/packages/diamond-dependencies/repo/config.toml => examples/packages/diamond-dependencies/repo/config.toml +52 -0
@@ 0,0 1,52 @@
# Example configuration file for butido
compatibility = "0.1.0"
script_highlight_theme = "Solarized (dark)"

releases     = "/tmp/butido-test-diamond-dependencies-releases"
staging      = "/tmp/butido-test-diamond-dependencies-staging"
source_cache = "/tmp/butido-test-diamond-dependencies-sources"
log_dir      = "/tmp/butido-test-diamond-dependencies-logs"


strict_script_interpolation = true


#
#
# Log database configuration
#
#

# Database configuration should be self-explanatory
database_host     = "localhost"
database_port     = 5432
database_user     = "pgdev"
database_password = "password"
database_name     = "butido"

available_phases = [ "dummy" ]


[docker]

# Images which can be used to build
# images not listed here are automatically rejected
images = [ "debian:bullseye" ]
verify_images_present = true

#
# List of docker endpoints
#

[[docker.endpoints]]
name          = "testhostname"
uri           = "http://0.0.0.0:8095" # the URI of the endpoint. Either http or socket path
endpoint_type = "http" # either "http" or "socket"
speed         = 1 # currently ignored, but required to be present
maxjobs       = 1 # currently ignored, but required to be present


[containers]
check_env_names = true
allowed_env     = [ ]


A examples/packages/diamond-dependencies/repo/d/pkg.toml => examples/packages/diamond-dependencies/repo/d/pkg.toml +9 -0
@@ 0,0 1,9 @@
name = "d"
version = "4"

[dependencies]
build = ["b =2", "c =3"]

[sources.src]
url = "https://example.com"
hash.hash = "9c6b057a2b9d96a4067a749ee3b3b0158d390cf1"

A examples/packages/diamond-dependencies/repo/pkg.toml => examples/packages/diamond-dependencies/repo/pkg.toml +20 -0
@@ 0,0 1,20 @@
version_is_semver = false
patches = []

[dependencies]
build = []
runtime = []

[sources.src]
hash.type = "sha1"
download_manually = false

[phases]

dummy.script = '''
    echo "Dummy"

    mkdir /outputs
    touch /outputs/{{this.name}}-{{this.version}}.pkg
'''


A examples/packages/diamond-dependencies/sources/a-1/src-e5fa44f2b31c1fb553b6021e7360d07d5d91ff5e.source => examples/packages/diamond-dependencies/sources/a-1/src-e5fa44f2b31c1fb553b6021e7360d07d5d91ff5e.source +1 -0
@@ 0,0 1,1 @@
1

A examples/packages/diamond-dependencies/sources/b-2/src-7448d8798a4380162d4b56f9b452e2f6f9e24e7a.source => examples/packages/diamond-dependencies/sources/b-2/src-7448d8798a4380162d4b56f9b452e2f6f9e24e7a.source +1 -0
@@ 0,0 1,1 @@
2

A examples/packages/diamond-dependencies/sources/c-3/src-a3db5c13ff90a36963278c6a39e4ee3c22e2a436.source => examples/packages/diamond-dependencies/sources/c-3/src-a3db5c13ff90a36963278c6a39e4ee3c22e2a436.source +1 -0
@@ 0,0 1,1 @@
3

A examples/packages/diamond-dependencies/sources/d-4/src-9c6b057a2b9d96a4067a749ee3b3b0158d390cf1.source => examples/packages/diamond-dependencies/sources/d-4/src-9c6b057a2b9d96a4067a749ee3b3b0158d390cf1.source +1 -0
@@ 0,0 1,1 @@
4
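
Side note on the example data: the hash.hash values in the pkg.toml files above
appear to be the SHA-1 sums of the corresponding one-line source files, matching
hash.type = "sha1" in repo/pkg.toml. A quick standalone check for package "a",
assuming the sha1 crate (not necessarily what butido uses internally):

    use sha1::{Digest, Sha1};

    fn main() {
        // The a-1 source file contains the single line "1".
        let mut hasher = Sha1::new();
        hasher.update(b"1\n");
        let hex: String = hasher
            .finalize()
            .iter()
            .map(|byte| format!("{:02x}", byte))
            .collect();
        assert_eq!(hex, "e5fa44f2b31c1fb553b6021e7360d07d5d91ff5e");
    }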

M src/commands/build.rs => src/commands/build.rs +10 -13
@@ 38,7 38,7 @@ use crate::orchestrator::OrchestratorSetup;
use crate::package::PackageName;
use crate::package::PackageVersion;
use crate::package::Shebang;
use crate::package::Tree;
use crate::package::Dag;
use crate::repository::Repository;
use crate::schema;
use crate::source::SourceCache;


@@ 195,15 195,12 @@ pub async fn build(
        r.map(RwLock::new).map(Arc::new).map(|store| (store, p))?
    };

    let tree = {
    let dag = {
        let bar_tree_building = progressbars.bar();
        bar_tree_building.set_length(max_packages);

        let mut tree = Tree::default();
        tree.add_package(package.clone(), &repo, bar_tree_building.clone())?;

        bar_tree_building.finish_with_message("Finished loading Tree");
        tree
        let dag = Dag::for_root_package(package.clone(), &repo, bar_tree_building.clone())?;
        bar_tree_building.finish_with_message("Finished loading Dag");
        dag
    };

    let source_cache = SourceCache::new(config.source_cache_root().clone());


@@ 212,7 209,7 @@ pub async fn build(
        warn!("No hash verification will be performed");
    } else {
        crate::commands::source::verify_impl(
            tree.all_packages().into_iter(),
            dag.all_packages().into_iter(),
            &source_cache,
            &progressbars,
        )


@@ 223,7 220,7 @@ pub async fn build(
    if matches.is_present("no_lint") {
        warn!("No script linting will be performed!");
    } else if let Some(linter) = crate::ui::find_linter_command(repo_root, config)? {
        let all_packages = tree.all_packages();
        let all_packages = dag.all_packages();
        let bar = progressbars.bar();
        bar.set_length(all_packages.len() as u64);
        bar.set_message("Linting package scripts...");


@@ 234,7 231,7 @@ pub async fn build(
        warn!("No linter set in configuration, no script linting will be performed!");
    } // linting

    tree.all_packages()
    dag.all_packages()
        .into_iter()
        .map(|pkg| {
            if let Some(allowlist) = pkg.allowed_images() {


@@ 304,7 301,7 @@ pub async fn build(

    trace!("Setting up job sets");
    let resources: Vec<JobResource> = additional_env.into_iter().map(JobResource::from).collect();
    let jobtree = crate::job::Tree::from_package_tree(tree, shebang, image_name, phases.clone(), resources);
    let jobdag = crate::job::Dag::from_package_dag(dag, shebang, image_name, phases.clone(), resources);
    trace!("Setting up job sets finished successfully");

    trace!("Setting up Orchestrator");


@@ 322,7 319,7 @@ pub async fn build(
        } else {
            None
        })
        .jobtree(jobtree)
        .jobdag(jobdag)
        .config(config)
        .build()
        .setup()

M src/commands/tree_of.rs => src/commands/tree_of.rs +3 -6
@@ 15,7 15,7 @@ use resiter::AndThen;

use crate::package::PackageName;
use crate::package::PackageVersionConstraint;
use crate::package::Tree;
use crate::package::Dag;
use crate::repository::Repository;
use crate::util::progress::ProgressBars;



@@ 45,8 45,7 @@ pub async fn tree_of(
        })
        .map(|package| {
            let bar_tree_building = progressbars.bar();
            let mut tree = Tree::default();
            let _ = tree.add_package(package.clone(), &repo, bar_tree_building.clone())?;
            let tree = Dag::for_root_package(package.clone(), &repo, bar_tree_building.clone())?;
            bar_tree_building.finish_with_message("Finished loading Tree");
            Ok(tree)
        })


@@ 54,9 53,7 @@ pub async fn tree_of(
            let stdout = std::io::stdout();
            let mut outlock = stdout.lock();

            tree.display()
                .iter()
                .try_for_each(|d| ptree::write_tree(d, &mut outlock).map_err(Error::from))
            ptree::write_tree(&tree.display(), &mut outlock).map_err(Error::from)
        })
        .collect::<Result<()>>()
}

A src/job/dag.rs => src/job/dag.rs +86 -0
@@ 0,0 1,86 @@
//
// Copyright (c) 2020-2021 science+computing ag and other contributors
//
// This program and the accompanying materials are made
// available under the terms of the Eclipse Public License 2.0
// which is available at https://www.eclipse.org/legal/epl-2.0/
//
// SPDX-License-Identifier: EPL-2.0
//

use daggy::Dag as DaggyDag;
use daggy::NodeIndex;
use daggy::Walker;
use getset::Getters;
use uuid::Uuid;

use crate::job::Job;
use crate::job::JobResource;
use crate::package::Package;
use crate::package::PhaseName;
use crate::package::Shebang;
use crate::util::docker::ImageName;

#[derive(Debug, Getters)]
pub struct Dag {
    #[getset(get = "pub")]
    dag: DaggyDag<Job, i8>,

    #[getset(get = "pub")]
    root_idx: NodeIndex,
}

impl Dag {
    pub fn from_package_dag(
        dag: crate::package::Dag,
        script_shebang: Shebang,
        image: ImageName,
        phases: Vec<PhaseName>,
        resources: Vec<JobResource>,
    ) -> Self {
        let build_job = |_, p: &Package| {
            Job::new(
                p.clone(),
                script_shebang.clone(),
                image.clone(),
                phases.clone(),
                resources.clone(),
            )
        };

        Dag {
            dag: dag.dag().map(build_job, |_, e| *e),
            root_idx: *dag.root_idx(),
        }
    }

    pub fn iter<'a>(&'a self) -> impl Iterator<Item = JobDefinition> + 'a {
        self.dag
            .graph()
            .node_indices()
            .map(move |idx| {
                let job = self.dag.graph().node_weight(idx).unwrap(); // TODO
                let children = self.dag.children(idx);
                let children_uuids = children.iter(&self.dag)
                    .filter_map(|(_, node_idx)| {
                        self.dag.graph().node_weight(node_idx)
                    })
                    .map(Job::uuid)
                    .cloned()
                    .collect();

                JobDefinition {
                    job,
                    dependencies: children_uuids
                }
            })
    }

}

#[derive(Debug)]
pub struct JobDefinition<'a> {
    pub job: &'a Job,
    pub dependencies: Vec<Uuid>,
}
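
A note on the daggy::Dag::map call in from_package_dag above: map rebuilds the
graph with the same edges and node indices but new node weights, which is how
the package DAG becomes a job DAG here. A minimal standalone sketch of that
pattern (toy &str/usize weights, not butido's types):

    use daggy::Dag;

    fn main() {
        let mut dag: Dag<&str, i8> = Dag::new();
        let root = dag.add_node("root");
        let (_edge, _leaf) = dag.add_child(root, 0, "leaf");

        // Map node weights (&str -> usize) while keeping the structure intact.
        let lengths: Dag<usize, i8> = dag.map(|_idx, name| name.len(), |_idx, e| *e);

        assert_eq!(lengths.node_count(), 2);
        assert_eq!(lengths.node_weight(root), Some(&4)); // "root".len()
    }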


M src/job/mod.rs => src/job/mod.rs +2 -2
@@ 12,8 12,8 @@
mod job;
pub use job::*;

mod tree;
pub use tree::*;
mod dag;
pub use dag::*;

mod resource;
pub use resource::*;

D src/job/tree.rs => src/job/tree.rs +0 -80
@@ 1,80 0,0 @@
//
// Copyright (c) 2020-2021 science+computing ag and other contributors
//
// This program and the accompanying materials are made
// available under the terms of the Eclipse Public License 2.0
// which is available at https://www.eclipse.org/legal/epl-2.0/
//
// SPDX-License-Identifier: EPL-2.0
//

use std::collections::BTreeMap;

use uuid::Uuid;
use getset::Getters;

use crate::job::Job;
use crate::job::JobResource;
use crate::package::PhaseName;
use crate::package::Shebang;
use crate::util::docker::ImageName;

#[derive(Debug, Getters)]
pub struct Tree {
    #[getset(get = "pub")]
    inner: BTreeMap<Uuid, JobDefinition>,
}

impl Tree {
    pub fn from_package_tree(pt: crate::package::Tree,
        script_shebang: Shebang,
        image: ImageName,
        phases: Vec<PhaseName>,
        resources: Vec<JobResource>,
    ) -> Self {
        Tree { inner: Self::build_tree(pt, script_shebang, image, phases, resources) }
    }

    fn build_tree(pt: crate::package::Tree,
        script_shebang: Shebang,
        image: ImageName,
        phases: Vec<PhaseName>,
        resources: Vec<JobResource>,
    ) -> BTreeMap<Uuid, JobDefinition> {
        let mut tree = BTreeMap::new();

        for (package, dependencies) in pt.into_iter() {
            let mut deps = Self::build_tree(dependencies,
                script_shebang.clone(),
                image.clone(),
                phases.clone(),
                resources.clone());

            let deps_uuids = deps.keys().cloned().collect();
            tree.append(&mut deps);

            let job = Job::new(package,
                script_shebang.clone(),
                image.clone(),
                phases.clone(),
                resources.clone());

            let job_uuid = *job.uuid();
            let jdef = JobDefinition { job, dependencies: deps_uuids };

            tree.insert(job_uuid, jdef);
        }

        tree
    }

}

/// A job definition is the job itself and all UUIDs from jobs this job depends on.
#[derive(Debug)]
pub struct JobDefinition {
    pub job: Job,

    /// Uuids of the jobs where this job depends on the outputs
    pub dependencies: Vec<Uuid>,
}

M src/orchestrator/orchestrator.rs => src/orchestrator/orchestrator.rs +63 -40
@@ 10,6 10,7 @@

#![allow(unused)]

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;



@@ 37,7 38,7 @@ use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobDefinition;
use crate::job::RunnableJob;
use crate::job::Tree as JobTree;
use crate::job::Dag;
use crate::source::SourceCache;
use crate::util::progress::ProgressBars;



@@ 45,7 46,7 @@ use crate::util::progress::ProgressBars;
/// The Orchestrator
///
/// The Orchestrator is used to orchestrate the work on one submit.
/// On a very high level: It uses a [JobTree](crate::job::Tree) to build a number (list) of
/// On a very high level: It uses a [Dag](crate::job::Dag) to build a number (list) of
/// [JobTasks](crate::orchestrator::JobTask) that is then run concurrently.
///
/// Because of the implementation of [JobTask], the work happens in


@@ 153,7 154,7 @@ pub struct Orchestrator<'a> {
    progress_generator: ProgressBars,
    merged_stores: MergedStores,
    source_cache: SourceCache,
    jobtree: JobTree,
    jobdag: Dag,
    config: &'a Configuration,
    database: Arc<PgConnection>,
}


@@ 165,7 166,7 @@ pub struct OrchestratorSetup<'a> {
    staging_store: Arc<RwLock<StagingStore>>,
    release_store: Arc<RwLock<ReleaseStore>>,
    source_cache: SourceCache,
    jobtree: JobTree,
    jobdag: Dag,
    database: Arc<PgConnection>,
    submit: dbmodels::Submit,
    log_dir: Option<PathBuf>,


@@ 188,7 189,7 @@ impl<'a> OrchestratorSetup<'a> {
            progress_generator: self.progress_generator,
            merged_stores: MergedStores::new(self.release_store, self.staging_store),
            source_cache: self.source_cache,
            jobtree: self.jobtree,
            jobdag: self.jobdag,
            config: self.config,
            database: self.database,
        })


@@ 202,19 203,19 @@ impl<'a> OrchestratorSetup<'a> {
/// It is either a list of artifacts with the UUID of the job they were produced by,
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
type JobResult = std::result::Result<Vec<(Uuid, Vec<Artifact>)>, Vec<(Uuid, Error)>>;
type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, HashMap<Uuid, Error>>;

impl<'a> Orchestrator<'a> {
    pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> {
    pub async fn run(self, output: &mut Vec<Artifact>) -> Result<HashMap<Uuid, Error>> {
        let (results, errors) = self.run_tree().await?;
        output.extend(results.into_iter());
        Ok(errors)
    }

    async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> {
    async fn run_tree(self) -> Result<(Vec<Artifact>, HashMap<Uuid, Error>)> {
        let multibar = Arc::new(indicatif::MultiProgress::new());

        // For each job in the jobtree, built a tuple with
        // For each job in the jobdag, build a tuple with
        //
        // 1. The receiver that is used by the task to receive results from dependency tasks from
        // 2. The task itself (as a TaskPreparation object)


@@ 223,16 224,15 @@ impl<'a> Orchestrator<'a> {
        //    This is an Option<> because we need to set it later and the root of the tree needs a
        //    special handling, as this very function will wait on a receiver that gets the results
        //    of the root task
        let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobtree
            .inner()
        let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobdag
            .iter()
            .map(|(uuid, jobdef)| {
            .map(|jobdef| {
                // We initialize the channel with 100 elements here, as there is unlikely a task
                // that depends on 100 other tasks.
                // Either way, this might be increased in future.
                let (sender, receiver) = tokio::sync::mpsc::channel(100);

                trace!("Creating TaskPreparation object for job {}", uuid);
                trace!("Creating TaskPreparation object for job {}", jobdef.job.uuid());
                let bar = self.progress_generator.bar();
                let bar = multibar.add(bar);
                bar.set_length(100);


@@ 247,7 247,7 @@ impl<'a> Orchestrator<'a> {
                    database: self.database.clone(),
                };

                (receiver, tp, sender, std::cell::RefCell::new(None as Option<Sender<JobResult>>))
                (receiver, tp, sender, std::cell::RefCell::new(None as Option<Vec<Sender<JobResult>>>))
            })
            .collect();



@@ 262,9 262,26 @@ impl<'a> Orchestrator<'a> {
        //      find the job that depends on this job
        //      use the sender of the found job and set it as sender for this job
        for job in jobs.iter() {
            *job.3.borrow_mut() = jobs.iter()
                .find(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
                .map(|j| j.2.clone());
            if let Some(mut v) = job.3.borrow_mut().as_mut() {
                v.extend({
                jobs.iter()
                    .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
                    .map(|j| j.2.clone())
                });
            } else {
                *job.3.borrow_mut() = {
                    let depending_on_job = jobs.iter()
                        .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
                        .map(|j| j.2.clone())
                        .collect::<Vec<Sender<JobResult>>>();

                    if depending_on_job.is_empty() {
                        None
                    } else {
                        Some(depending_on_job)
                    }
                };
            }
        }

        // Find the id of the root task


@@ 309,7 326,7 @@ impl<'a> Orchestrator<'a> {
                    receiver: prep.0,

                    // the sender is set or we need to use the root sender
                    sender: prep.3.into_inner().unwrap_or(root_sender),
                    sender: prep.3.into_inner().unwrap_or_else(|| vec![root_sender]),
                }
            })
            .map(|task| task.run())


@@ 323,7 340,7 @@ impl<'a> Orchestrator<'a> {
            None                     => Err(anyhow!("No result received...")),
            Some(Ok(results)) => {
                let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect();
                Ok((results, vec![]))
                Ok((results, HashMap::with_capacity(0)))
            },
            Some(Err(errors))        => Ok((vec![], errors)),
        }


@@ 337,8 354,7 @@ impl<'a> Orchestrator<'a> {
///
/// This simply holds data and does not contain any more functionality
struct TaskPreparation<'a> {
    /// The UUID of this job
    jobdef: &'a JobDefinition,
    jobdef: JobDefinition<'a>,

    bar: ProgressBar,



@@ 353,8 369,7 @@ struct TaskPreparation<'a> {
///
/// This type represents a task for a job that can immediately be executed (see `JobTask::run()`).
struct JobTask<'a> {
    /// The UUID of this job
    jobdef: &'a JobDefinition,
    jobdef: JobDefinition<'a>,

    bar: ProgressBar,



@@ 368,7 383,7 @@ struct JobTask<'a> {
    receiver: Receiver<JobResult>,

    /// Channel to send the own build outputs to
    sender: Sender<JobResult>,
    sender: Vec<Sender<JobResult>>,
}

impl<'a> JobTask<'a> {


@@ 385,15 400,15 @@ impl<'a> JobTask<'a> {

        // A list of job run results from dependencies that were received from the tasks for the
        // dependencies
        let mut received_dependencies: Vec<(Uuid, Vec<Artifact>)> = vec![];
        let mut received_dependencies: HashMap<Uuid, Vec<Artifact>> = HashMap::new();

        // A list of errors that were received from the tasks for the dependencies
        let mut received_errors: Vec<(Uuid, Error)> = vec![];
        let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len());

        // Helper function to check whether all UUIDs are in a list of UUIDs
        let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &[(Uuid, Vec<_>)]| {
        let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &HashMap<Uuid, Vec<_>>| {
            dependency_uuids.iter().all(|dependency_uuid| {
                list.iter().map(|tpl| tpl.0).any(|id| id == *dependency_uuid)
                list.keys().any(|id| id == dependency_uuid)
            })
        };



@@ 421,7 436,10 @@ impl<'a> JobTask<'a> {
            // if there are any errors from child tasks
            if !received_errors.is_empty() {
                // send them to the parent,...
                self.sender.send(Err(received_errors)).await;
                //
                // We only send to one parent, because it doesn't matter
                // And we know that we have at least one sender
                self.sender[0].send(Err(received_errors)).await;

                // ... and stop operation, because the whole tree will fail anyways.
                return Ok(())


@@ 447,8 465,8 @@ impl<'a> JobTask<'a> {
        // to
        //      Vec<Artifact>
        let dependency_artifacts = received_dependencies
            .iter()
            .map(|tpl| tpl.1.iter())
            .values()
            .map(|v| v.iter())
            .flatten()
            .cloned()
            .collect();


@@ 480,17 498,22 @@ impl<'a> JobTask<'a> {
            Err(e) => {
                trace!("[{}]: Scheduler returned error = {:?}", self.jobdef.job.uuid(), e);
                // ... and we send that to our parent
                self.sender.send(Err(vec![(job_uuid, e)])).await?;
                //
                // We only send to one parent, because it doesn't matter anymore
                // We know that we have at least one sender available
                let mut errormap = HashMap::with_capacity(1);
                errormap.insert(job_uuid, e);
                self.sender[0].send(Err(errormap)).await?;
            },

            // if the scheduler run reports success,
            // it returns the database artifact objects it created!
            Ok(artifacts) => {
                trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts);
                received_dependencies.push((*self.jobdef.job.uuid(), artifacts));
                self.sender
                    .send(Ok(received_dependencies))
                    .await?;
                received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);
                for s in self.sender {
                    s.send(Ok(received_dependencies.clone())).await?;
                }
            },
        }



@@ 505,20 528,20 @@ impl<'a> JobTask<'a> {
    ///
    /// Return Ok(true) if we should continue operation
    /// Return Ok(false) if the channel is empty and we're done receiving
    async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> {
    async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
        match self.receiver.recv().await {
            Some(Ok(mut v)) => {
                // The task we depend on succeeded and returned an
                // (uuid of the job, [Artifact])
                trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), v);
                received_dependencies.append(&mut v);
                received_dependencies.extend(v);
                Ok(true)
            },
            Some(Err(mut e)) => {
                // The task we depend on failed
                // we log that error for now
                trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), e);
                received_errors.append(&mut e);
                received_errors.extend(e);
                Ok(true)
            },
            None => {


@@ 526,7 549,7 @@ impl<'a> JobTask<'a> {
                trace!("[{}]: Received nothing, channel seems to be empty", self.jobdef.job.uuid());

                // Find all dependencies that we need but which are not received
                let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>();
                let received = received_dependencies.keys().collect::<Vec<_>>();
                let missing_deps: Vec<_> = self.jobdef
                    .dependencies
                    .iter()
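
The orchestrator change above turns the single parent sender into a
Vec<Sender<JobResult>>, so a job that several other jobs depend on sends a clone
of its result to every one of them. A minimal sketch of that fan-out pattern
with tokio mpsc channels (toy String results instead of butido's JobResult):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // Two "parent" jobs, each with its own receiving end.
        let (tx_b, mut rx_b) = mpsc::channel::<String>(10);
        let (tx_c, mut rx_c) = mpsc::channel::<String>(10);

        // The shared dependency holds one sender per parent and fans out
        // a clone of its result to each of them.
        let senders = vec![tx_b, tx_c];
        tokio::spawn(async move {
            let result = String::from("artifacts of a");
            for s in &senders {
                s.send(result.clone()).await.unwrap();
            }
        });

        assert_eq!(rx_b.recv().await.as_deref(), Some("artifacts of a"));
        assert_eq!(rx_c.recv().await.as_deref(), Some("artifacts of a"));
    }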

R src/package/tree.rs => src/package/dag.rs +211 -208
@@ 9,108 9,105 @@
//

use std::borrow::Cow;
use std::collections::HashMap;
use std::io::Result as IoResult;
use std::io::Write;

use anyhow::anyhow;
use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use daggy::Walker;
use indicatif::ProgressBar;
use log::trace;
use ptree::Style;
use ptree::TreeItem;
use resiter::AndThen;
use serde::Deserialize;
use serde::Serialize;
use getset::Getters;

use crate::package::Package;
use crate::repository::Repository;

#[derive(Debug, Serialize, Deserialize)]
pub struct Tree {
    root: Vec<Mapping>,
}
#[derive(Debug, Getters)]
pub struct Dag {
    #[getset(get = "pub")]
    dag: daggy::Dag<Package, i8>,

/// Helper type
///
/// This helper type is required so that the serialized JSON is a bit more readable.
#[derive(Debug, Serialize, Deserialize)]
struct Mapping {
    package: Package,
    dependencies: Tree,
    #[getset(get = "pub")]
    root_idx: daggy::NodeIndex,
}

impl Tree {
    pub fn add_package(
        &mut self,
impl Dag {
    pub fn for_root_package(
        p: Package,
        repo: &Repository,
        progress: ProgressBar,
    ) -> Result<()> {
        macro_rules! mk_add_package_tree {
            ($this:ident, $pack:ident, $repo:ident, $root:ident, $progress:ident) => {{
                let mut subtree = Tree::default();
                ($pack)
                    .get_self_packaged_dependencies()
    ) -> Result<Self> {
        fn add_sub_packages<'a>(
            repo: &'a Repository,
            mappings: &mut HashMap<&'a Package, daggy::NodeIndex>,
            dag: &mut daggy::Dag<&'a Package, i8>,
            p: &'a Package,
            progress: &ProgressBar
        ) -> Result<()> {
            p.get_self_packaged_dependencies()
                .and_then_ok(|(name, constr)| {
                    trace!("Dependency for {} {} found: {:?}", p.name(), p.version(), name);
                    let packs = repo.find_with_version(&name, &constr);
                    trace!("Found in repo: {:?}", packs);

                    // If we didn't check that dependency already
                    if !mappings.keys().any(|p| packs.iter().any(|pk| pk.name() == p.name() && pk.version() == p.version())) {
                        // recurse
                        packs.into_iter()
                            .try_for_each(|p| {
                                progress.tick();

                                let idx = dag.add_node(p);
                                mappings.insert(p, idx);

                                trace!("Recursing for: {:?}", p);
                                add_sub_packages(repo, mappings, dag, p, progress)
                            })
                    } else {
                        Ok(())
                    }
                })
                .collect::<Result<()>>()
        }

        fn add_edges(mappings: &HashMap<&Package, daggy::NodeIndex>, dag: &mut daggy::Dag<&Package, i8>) -> Result<()> {
            for (package, idx) in mappings {
                package.get_self_packaged_dependencies()
                    .and_then_ok(|(name, constr)| {
                        trace!("Dependency: {:?}", name);
                        let pack = ($repo).find_with_version(&name, &constr);
                        trace!("Found: {:?}", pack);

                        if pack.iter().any(|p| ($root).has_package(p)) {
                            return Err(anyhow!(
                                "Duplicate version of some package in {:?} found",
                                pack
                            ));
                        }
                        trace!("All dependecies available...");

                        pack.into_iter()
                            .map(|p| {
                                ($progress).tick();
                                trace!("Following dependecy: {:?}", p);
                                add_package_tree(
                                    &mut subtree,
                                    p.clone(),
                                    ($repo),
                                    ($root),
                                    ($progress).clone(),
                                )
                        mappings
                            .iter()
                            .filter(|(package, _)| *package.name() == name && constr.matches(package.version()))
                            .try_for_each(|(_, dep_idx)| {
                                dag.add_edge(*idx, *dep_idx, 0)
                                    .map(|_| ())
                                    .map_err(Error::from)
                            })
                            .collect()
                    })
                    .collect::<Result<Vec<()>>>()?;

                trace!("Inserting subtree: {:?} -> {:?}", ($pack), subtree);
                ($this).root.push(Mapping {
                    package: ($pack),
                    dependencies: subtree,
                });
                Ok(())
            }};
        };
                    .collect::<Result<()>>()?
            }

        fn add_package_tree(
            this: &mut Tree,
            p: Package,
            repo: &Repository,
            root: &mut Tree,
            progress: ProgressBar,
        ) -> Result<()> {
            mk_add_package_tree!(this, p, repo, root, progress)
            Ok(())
        }

        let mut dag: daggy::Dag<&Package, i8> = daggy::Dag::new();
        let mut mappings = HashMap::new();

        trace!("Making package Tree for {:?}", p);
        let r = mk_add_package_tree!(self, p, repo, self, progress);
        let root_idx = dag.add_node(&p);
        mappings.insert(&p, root_idx);
        add_sub_packages(repo, &mut mappings, &mut dag, &p, &progress)?;
        add_edges(&mappings, &mut dag)?;
        trace!("Finished makeing package Tree");
        r
    }

    /// Get packages of the tree
    ///
    /// This does not yield packages which are dependencies of this tree node.
    /// It yields only packages for this particular Tree instance.
    pub fn packages(&self) -> impl Iterator<Item = &Package> {
        self.root.iter().map(|mapping| &mapping.package)
        Ok(Dag {
            dag: dag.map(|_, p: &&Package| -> Package { (*p).clone() }, |_, e| *e),
            root_idx
        })
    }

    /// Get all packages in the tree by reference


@@ 119,63 116,40 @@ impl Tree {
    ///
    /// The order of the packages is _NOT_ guaranteed by the implementation
    pub fn all_packages(&self) -> Vec<&Package> {
        self.root
            .iter()
            .map(|m| m.dependencies.all_packages())
            .flatten()
            .chain(self.root.iter().map(|m| &m.package))
        self.dag
            .graph()
            .node_indices()
            .filter_map(|idx| self.dag.graph().node_weight(idx))
            .collect()
    }

    /// Get dependencies stored in this tree
    pub fn dependencies(&self) -> impl Iterator<Item = &Tree> {
        self.root.iter().map(|mapping| &mapping.dependencies)
    }

    pub fn into_iter(self) -> impl IntoIterator<Item = (Package, Tree)> {
        self.root.into_iter().map(|m| (m.package, m.dependencies))
    }

    pub fn has_package(&self, p: &Package) -> bool {
        let name_eq = |k: &Package| k.name() == p.name();
        self.packages().any(name_eq) || self.dependencies().any(|t| t.has_package(p))
    }

    pub fn display(&self) -> Vec<DisplayTree> {
        self.root.iter().map(DisplayTree).collect()
    pub fn display(&self) -> DagDisplay {
        DagDisplay(self, self.root_idx)
    }
}

#[derive(Clone)]
pub struct DisplayTree<'a>(&'a Mapping);
pub struct DagDisplay<'a>(&'a Dag, daggy::NodeIndex);

impl<'a> TreeItem for DisplayTree<'a> {
impl<'a> TreeItem for DagDisplay<'a> {
    type Child = Self;

    fn write_self<W: Write>(&self, f: &mut W, _: &Style) -> IoResult<()> {
        write!(f, "{} {}", self.0.package.name(), self.0.package.version())
        let p = self.0.dag.graph().node_weight(self.1)
            .ok_or_else(|| anyhow!("Error finding node: {:?}", self.1))
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
        write!(f, "{} {}", p.name(), p.version())
    }

    fn children(&self) -> Cow<[Self::Child]> {
        Cow::from(
            self.0
                .dependencies
                .root
                .iter()
                .map(DisplayTree)
                .collect::<Vec<_>>(),
        let c = self.0.dag.children(self.1);
        Cow::from(c.iter(&self.0.dag)
            .map(|(_, idx)| DagDisplay(self.0, idx))
            .collect::<Vec<_>>()
        )
    }
}

impl Default for Tree {
    fn default() -> Tree {
        Tree {
            root: Vec::default(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;


@@ 205,38 179,7 @@ mod tests {
        let repo = Repository::from(btree);
        let progress = ProgressBar::hidden();

        let mut tree = Tree::default();
        let r = tree.add_package(p1, &repo, progress);
        assert!(r.is_ok());
    }

    #[test]
    fn test_add_two_packages() {
        let mut btree = BTreeMap::new();

        let p1 = {
            let name = "a";
            let vers = "1";
            let pack = package(name, vers, "https://rust-lang.org", "123");
            btree.insert((pname(name), pversion(vers)), pack.clone());
            pack
        };
        let p2 = {
            let name = "b";
            let vers = "2";
            let pack = package(name, vers, "https://rust-lang.org", "124");
            btree.insert((pname(name), pversion(vers)), pack.clone());
            pack
        };

        let repo = Repository::from(btree);
        let progress = ProgressBar::hidden();

        let mut tree = Tree::default();
        let r = tree.add_package(p1, &repo, progress.clone());
        assert!(r.is_ok());

        let r = tree.add_package(p2, &repo, progress);
        let r = Dag::for_root_package(p1, &repo, progress);
        assert!(r.is_ok());
    }



@@ 268,17 211,15 @@ mod tests {
        let repo = Repository::from(btree);
        let progress = ProgressBar::hidden();

        let mut tree = Tree::default();
        let r = tree.add_package(p1, &repo, progress);
        assert!(r.is_ok());
        assert!(tree.packages().all(|p| *p.name() == pname("a")));
        assert!(tree.packages().all(|p| *p.version() == pversion("1")));

        let subtree: Vec<&Tree> = tree.dependencies().collect();
        assert_eq!(subtree.len(), 1);
        let subtree = subtree[0];
        assert!(subtree.packages().all(|p| *p.name() == pname("b")));
        assert!(subtree.packages().all(|p| *p.version() == pversion("2")));
        let dag = Dag::for_root_package(p1, &repo, progress);
        assert!(dag.is_ok());
        let dag = dag.unwrap();
        let ps = dag.all_packages();

        assert!(ps.iter().any(|p| *p.name() == pname("a")));
        assert!(ps.iter().any(|p| *p.version() == pversion("1")));
        assert!(ps.iter().any(|p| *p.name() == pname("b")));
        assert!(ps.iter().any(|p| *p.version() == pversion("2")));
    }

    #[test]


@@ 359,37 300,16 @@ mod tests {
        let repo = Repository::from(btree);
        let progress = ProgressBar::hidden();

        let mut tree = Tree::default();
        let r = tree.add_package(p1, &repo, progress);
        let r = Dag::for_root_package(p1, &repo, progress);
        assert!(r.is_ok());
        assert!(tree.packages().all(|p| *p.name() == pname("p1")));
        assert!(tree.packages().all(|p| *p.version() == pversion("1")));

        let subtrees: Vec<&Tree> = tree.dependencies().collect();
        assert_eq!(subtrees.len(), 1);

        let subtree = subtrees[0];
        assert_eq!(subtree.packages().count(), 2);

        assert!(subtree
            .packages()
            .all(|p| { *p.name() == pname("p2") || *p.name() == pname("p4") }));

        let subsubtrees: Vec<&Tree> = subtree.dependencies().collect();
        assert_eq!(subsubtrees.len(), 2);

        assert!(subsubtrees.iter().any(|st| { st.packages().count() == 1 }));

        assert!(subsubtrees.iter().any(|st| { st.packages().count() == 2 }));

        assert!(subsubtrees
            .iter()
            .any(|st| { st.packages().all(|p| *p.name() == pname("p3")) }));

        assert!(subsubtrees.iter().any(|st| {
            st.packages()
                .all(|p| *p.name() == pname("p5") || *p.name() == pname("p6"))
        }));
        let r = r.unwrap();
        let ps = r.all_packages();
        assert!(ps.iter().any(|p| *p.name() == pname("p1") && *p.version() == pversion("1")));
        assert!(ps.iter().any(|p| *p.name() == pname("p2")));
        assert!(ps.iter().any(|p| *p.name() == pname("p4")));
        assert!(ps.iter().any(|p| *p.name() == pname("p3")));
        assert!(ps.iter().any(|p| *p.name() == pname("p5")));
        assert!(ps.iter().any(|p| *p.name() == pname("p6")));
    }

    #[test]


@@ 523,36 443,119 @@ mod tests {
        let repo = Repository::from(btree);
        let progress = ProgressBar::hidden();

        let mut tree = Tree::default();
        let r = tree.add_package(p1, &repo, progress);
        let r = Dag::for_root_package(p1, &repo, progress);
        assert!(r.is_ok());
        assert!(tree.packages().all(|p| *p.name() == pname("p1")));
        assert!(tree.packages().all(|p| *p.version() == pversion("1")));
        let r = r.unwrap();
        let ps = r.all_packages();
        assert!(ps.iter().any(|p| *p.name() == pname("p1") && *p.version() == pversion("1")));
        assert!(ps.iter().any(|p| *p.name() == pname("p2")));
        assert!(ps.iter().any(|p| *p.name() == pname("p3")));
        assert!(ps.iter().any(|p| *p.name() == pname("p4")));
        assert!(ps.iter().any(|p| *p.name() == pname("p5")));
        assert!(ps.iter().any(|p| *p.name() == pname("p6")));
    }

    #[test]
    fn test_add_dag() {
        let mut btree = BTreeMap::new();

        //
        // Test the following (made up) tree:
        //
        //  p1
        //   - p2
        //     - p3
        //   - p4
        //     - p3
        //
        // where "p3" is referenced from "p2" and "p4"
        //
        // The tree also contains a few irrelevant packages.
        //

        let p1 = {
            let name = "p1";
            let vers = "1";
            let mut pack = package(name, vers, "https://rust-lang.org", "123");
            {
                let d1 = Dependency::from(String::from("p2 =2"));
                let d2 = Dependency::from(String::from("p4 =4"));
                let ds = Dependencies::with_runtime_dependencies(vec![d1, d2]);
                pack.set_dependencies(ds);
            }
            btree.insert((pname(name), pversion(vers)), pack.clone());
            pack
        };

        let subtrees: Vec<&Tree> = tree.dependencies().collect();
        assert_eq!(subtrees.len(), 1);
        {
            let name = "p1";
            let vers = "2";
            let mut pack = package(name, vers, "https://rust-lang.org", "123");
            {
                let d1 = Dependency::from(String::from("p2 =2"));
                let d2 = Dependency::from(String::from("p4 =5"));
                let ds = Dependencies::with_runtime_dependencies(vec![d1, d2]);
                pack.set_dependencies(ds);
            }
            btree.insert((pname(name), pversion(vers)), pack);
        }

        let subtree = subtrees[0];
        assert_eq!(subtree.packages().count(), 2);
        {
            let name = "p2";
            let vers = "2";
            let mut pack = package(name, vers, "https://rust-lang.org", "124");
            {
                let d1 = Dependency::from(String::from("p3 =3"));
                let ds = Dependencies::with_runtime_dependencies(vec![d1]);
                pack.set_dependencies(ds);
            }
            btree.insert((pname(name), pversion(vers)), pack);
        }

        assert!(subtree
            .packages()
            .all(|p| { *p.name() == pname("p2") || *p.name() == pname("p4") }));
        {
            let name = "p3";
            let vers = "3";
            let pack = package(name, vers, "https://rust-lang.org", "125");
            btree.insert((pname(name), pversion(vers)), pack);
        }

        let subsubtrees: Vec<&Tree> = subtree.dependencies().collect();
        assert_eq!(subsubtrees.len(), 2);
        {
            let name = "p3";
            let vers = "1";
            let pack = package(name, vers, "https://rust-lang.org", "128");
            btree.insert((pname(name), pversion(vers)), pack);
        }

        assert!(subsubtrees.iter().any(|st| { st.packages().count() == 1 }));
        {
            let name = "p3";
            let vers = "3.1";
            let pack = package(name, vers, "https://rust-lang.org", "118");
            btree.insert((pname(name), pversion(vers)), pack);
        }

        assert!(subsubtrees.iter().any(|st| { st.packages().count() == 2 }));
        {
            let name = "p4";
            let vers = "4";
            let mut pack = package(name, vers, "https://rust-lang.org", "125");
            {
                let d1 = Dependency::from(String::from("p3 =3"));
                let ds = Dependencies::with_runtime_dependencies(vec![d1]);
                pack.set_dependencies(ds);
            }
            btree.insert((pname(name), pversion(vers)), pack);
        }

        assert!(subsubtrees
            .iter()
            .any(|st| { st.packages().all(|p| *p.name() == pname("p3")) }));
        let repo = Repository::from(btree);
        let progress = ProgressBar::hidden();

        assert!(subsubtrees.iter().any(|st| {
            st.packages()
                .all(|p| *p.name() == pname("p5") || *p.name() == pname("p6"))
        }));
        let r = Dag::for_root_package(p1, &repo, progress);
        assert!(r.is_ok());
        let r = r.unwrap();
        let ps = r.all_packages();
        assert!(ps.iter().any(|p| *p.name() == pname("p1") && *p.version() == pversion("1")));
        assert!(ps.iter().any(|p| *p.name() == pname("p2")));
        assert!(ps.iter().any(|p| *p.name() == pname("p3")));
        assert!(ps.iter().any(|p| *p.name() == pname("p4")));
    }
}
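
A note on the DagDisplay impl above: it renders the DAG as a tree by walking
children via daggy's Walker, so a shared node is printed once per path leading
to it. A minimal standalone sketch of the children(..).iter(..) traversal (plain
println instead of ptree, toy &str weights):

    use daggy::{Dag, NodeIndex, Walker};

    fn print_subdag(dag: &Dag<&str, i8>, idx: NodeIndex, depth: usize) {
        let name = dag.node_weight(idx).unwrap();
        println!("{}{}", "  ".repeat(depth), name);

        // children(idx) is a Walker; iter(dag) yields (EdgeIndex, NodeIndex) pairs.
        for (_edge, child) in dag.children(idx).iter(dag) {
            print_subdag(dag, child, depth + 1);
        }
    }

    fn main() {
        let mut dag: Dag<&str, i8> = Dag::new();
        let d = dag.add_node("d");
        let b = dag.add_node("b");
        let c = dag.add_node("c");
        let a = dag.add_node("a");
        for (from, to) in [(d, b), (d, c), (b, a), (c, a)] {
            dag.add_edge(from, to, 0).unwrap();
        }

        // "a" shows up below both b and c in the output,
        // even though it is a single node in the DAG.
        print_subdag(&dag, d, 0);
    }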


M src/package/mod.rs => src/package/mod.rs +2 -2
@@ 29,8 29,8 @@ pub use script::*;
mod source;
pub use source::*;

mod tree;
pub use tree::*;
mod dag;
pub use dag::*;

mod version;
pub use version::*;

M src/package/package.rs => src/package/package.rs +7 -0
@@ 75,6 75,13 @@ pub struct Package {
    meta: Option<HashMap<String, String>>,
}

impl std::hash::Hash for Package {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.name.hash(state);
        self.version.hash(state);
    }
}

impl Package {
    #[cfg(test)]
    pub fn new(