~science-computing/butido

889649ac16367fe671ce61363bb6ce82531e5a6b — Matthias Beyer 9 months ago 4cb9ace
Reimplement: Orchestrator::run()

This patch reimplements the running of the computed jobs.

The old implementation was structured as follows:

1. Compute a Tree of dependencies for the requested package
2. Make sets of this tree (see below)
3. For each set
    3.1. Run set in parallel by submitting each job in the set to the scheduler
    3.2. collect outputs and errors
    3.3. Record outputs and return errors (if any)

The complexity here lay both in computing the JobSets and in running each job
of a set in parallel.
The code was non-trivial to understand.

But that's not even the biggest concern with this approach.
Consider the following tree of jobs:

        A
       / \
      B   E
     / \   \
    C   D   F
           / \
          G   H
               \
                I

Each node here represents a package; each edge represents a dependency on the
lower-hanging package.

This tree would result in 5 sets of jobs:

    [
        [ I ]
        [ G, H ]
        [ C, D, F ]
        [ B, E ]
        [ A ]
    ]

because each "layer" in the tree would be run in parallel.
It is easy to see that, in the tree above, the jobs for [ I, G, D, C ] could
run in parallel right away, because none of them has dependencies.
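The difference can be sketched in a few lines of Rust. The edge list below is
taken from the example tree; the ready-set computation is illustrative and not
butido code:

```rust
use std::collections::{BTreeMap, BTreeSet};

fn main() {
    // Edges from the example tree: package -> packages it depends on.
    let deps: BTreeMap<&str, Vec<&str>> = BTreeMap::from([
        ("A", vec!["B", "E"]),
        ("B", vec!["C", "D"]),
        ("C", vec![]),
        ("D", vec![]),
        ("E", vec!["F"]),
        ("F", vec!["G", "H"]),
        ("G", vec![]),
        ("H", vec!["I"]),
        ("I", vec![]),
    ]);

    // Nothing has been built yet.
    let done: BTreeSet<&str> = BTreeSet::new();

    // A job is ready as soon as all of its dependencies are done.
    let ready: Vec<&str> = deps
        .iter()
        .filter(|(name, ds)| !done.contains(*name) && ds.iter().all(|d| done.contains(d)))
        .map(|(name, _)| *name)
        .collect();

    // All four leaves can start at once, while the layered approach
    // would begin with the set [ I ] alone.
    assert_eq!(ready, vec!["C", "D", "G", "I"]);
    println!("first wave: {:?}", ready);
}
```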

The reimplementation also has another (crucial) benefit: The implementation does
not depend on a structure of artifact path names anymore.

Before, the artifacts needed to have a name as follows:

    <name of the package>-<version of the package>.<something>

which was extremely restrictive.

With the changes from this patch, the implementation does not depend on such
a format anymore.
Instead, dependencies are associated with a job via the outputs of the jobs
run for its dependency packages.

That means that, considering the above tree of packages:

    deps_of(B) = outputs_of(job_for(C)) + outputs_of(job_for(D))

in text:
    The dependencies of package B are the outputs of the job run for package C
    plus the outputs of the job run for package D.

With that change in place, the outputs of a job run for a package can have
arbitrary file names; as long as the build script for the package can process
them, everything is fine.
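As a minimal sketch of this association (the job IDs and file names below are
made up, and the map is a stand-in for the real job/output bookkeeping):

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical job outputs; the file names follow no naming scheme at all.
    let mut outputs: HashMap<&str, Vec<&str>> = HashMap::new();
    outputs.insert("job-C", vec!["libfoo.tar.xz"]);
    outputs.insert("job-D", vec!["whatever-42.bin", "headers.zip"]);

    // deps_of(B) = outputs_of(job_for(C)) + outputs_of(job_for(D))
    let deps_of_b: Vec<&str> = ["job-C", "job-D"]
        .iter()
        .flat_map(|j| outputs[*j].iter().copied())
        .collect();

    // No file name is ever parsed; the association is purely by job.
    assert_eq!(deps_of_b, vec!["libfoo.tar.xz", "whatever-42.bin", "headers.zip"]);
}
```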

The new algorithm that solves this issue is rather simple:

1. Hold a list of errors
2. Hold a list of artifacts that were built
3. Hold a list of jobs that were run
4. Iterate over all jobs, filtered by
    - If the job appears in the "already run jobs" list, ignore it
    - If a job has dependencies (on outputs of other jobs) that do not appear in
      the "already run jobs", ignore it (for now)
5. Run these jobs, and for each job:
    5.1. Take the job UUID and put it in the "already run jobs" list.
    5.2. Take the result of the job,
        5.2.1. if it is an error, put it in the "list of errors"
        5.2.2. if it is ok, put the artifact in the "list of artifacts"
6. if the list of errors is not empty, goto 9
7. if all jobs are in the "already run jobs" list, goto 9
8. goto 4
9. return all artifacts and all errors
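Steps 1 to 9 can be sketched in a self-contained way. This uses toy types and
numeric IDs instead of butido's `Job`/`Uuid` types, reduces a job run to a
success flag, and assumes (like the commit does) that the job graph is
complete and acyclic:

```rust
use std::collections::HashMap;

// A toy job definition: the jobs whose outputs this job needs, plus a
// flag standing in for the actual job run result.
struct JobDef {
    dependencies: Vec<u32>,
    fails: bool,
}

/// Sketch of the loop: repeatedly run every job whose dependencies have
/// already run, until all jobs ran or an error occurred.
fn run_tree(jobs: &HashMap<u32, JobDef>) -> (Vec<u32>, Vec<u32>) {
    let mut already_run: Vec<u32> = vec![]; // step 3
    let mut artifacts: Vec<u32> = vec![];   // step 2
    let mut errors: Vec<u32> = vec![];      // step 1

    loop {
        // Step 4: all jobs not yet run whose dependencies all ran already.
        let runnable: Vec<u32> = jobs
            .iter()
            .filter(|(uuid, def)| {
                !already_run.contains(*uuid)
                    && def.dependencies.iter().all(|d| already_run.contains(d))
            })
            .map(|(uuid, _)| *uuid)
            .collect();

        // Step 5: "run" the jobs and sort their results.
        for uuid in runnable {
            already_run.push(uuid);
            if jobs[&uuid].fails {
                errors.push(uuid);
            } else {
                artifacts.push(uuid);
            }
        }

        // Steps 6-9: stop on error or when everything ran.
        if !errors.is_empty() || already_run.len() == jobs.len() {
            return (artifacts, errors);
        }
    }
}

fn main() {
    // 1 and 2 are leaves, 3 depends on both.
    let jobs = HashMap::from([
        (1, JobDef { dependencies: vec![], fails: false }),
        (2, JobDef { dependencies: vec![], fails: false }),
        (3, JobDef { dependencies: vec![1, 2], fails: false }),
    ]);
    let (artifacts, errors) = run_tree(&jobs);
    assert_eq!(artifacts.len(), 3);
    assert!(errors.is_empty());
}
```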

Because this approach is fundamentally different than the previous approach, a
lot of things had to be rewritten:

- The `JobSet` type was completely removed

- There is a new type `crate::job::Tree` that gets built from the
  `crate::package::Tree`.
  It is a mapping of a UUID (the job UUID) to a `JobDefinition`.
  The `JobDefinition` type is
    - A Job
    - A list of UUIDs of other jobs on whose outputs this job depends
  It is therefore a mapping of `Job -> outputs(jobs_of(dependencies))`.
  The `crate::job::Tree` type is now responsible for building a `Job` object for
  each `crate::package::Package` object from the `crate::package::Tree` object.

  Because the `crate::package::Tree` object contains all required packages for
  the complete build, the implementation of `crate::job::Tree::build_tree()`
  performs no sanity checking.
  It is assumed that the input tree to the function contains all mappings.

  Despite the name `crate::job::Tree` ("Tree"), the actual structure stored in
  the type is not a real tree.

- The `MergedStores::get_artifact_by_path()` function was adapted because the
  previous implementation used `StagingStore::load_from_path()`, which tried
  to load the file from the filesystem and put it into the internal map, and
  failed if it was already there.
  The adaptation checks whether the artifact already exists in the internal map
  and returns that object instead.
  (Analogously for the release store.)
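The get-or-load behaviour can be illustrated with a toy store type (the names
`Store` and `get_or_load` are illustrative, not butido's API):

```rust
use std::collections::HashMap;

// Minimal stand-in for a store: a map from path to loaded artifact.
struct Store(HashMap<String, String>);

impl Store {
    // Return the cached artifact if present; otherwise "load" it once
    // and cache it. Loading twice can no longer fail.
    fn get_or_load(&mut self, path: &str) -> &String {
        self.0
            .entry(path.to_string())
            .or_insert_with(|| format!("loaded from {}", path))
    }
}

fn main() {
    let mut store = Store(HashMap::new());
    store.0.insert("a.tar".into(), "already cached".into());

    // Already in the map: returned as-is, no second load, no error.
    assert_eq!(store.get_or_load("a.tar"), "already cached");
    // Not in the map yet: loaded once, then served from the map.
    assert_eq!(store.get_or_load("b.tar"), "loaded from b.tar");
    assert_eq!(store.get_or_load("b.tar"), "loaded from b.tar");
}
```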

- The interface of the `RunnableJob::build_from_job()` function was adapted, as
  this function no longer needs to access the `MergedStores` object to load
  dependency artifacts from the filesystem.
  Instead, these artifacts are now passed to the function.

- The Orchestrator code
    - Got a type alias `JobResult` which represents the result of a job run,
      which is either
        - a number of artifacts (paired, for optimization reasons, with their
          associated database artifact entries)
        - or an error together with the UUID of the failed job (again, for
          optimization reasons)
    - Got an implementation of the algorithm described above
    - Got a new implementation of run_job(), which
        - Fetches the paths of dependency artifacts from the database, using
          the job UUIDs from the JobDefinition object
        - Creates the RunnableJob object for them
        - Schedules the RunnableJob object in the scheduler
        - For each output artifact (the database object representing it)
            - gets the filesystem Artifact object for it

Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Tested-by: Matthias Beyer <matthias.beyer@atos.net>
M src/commands/build.rs => src/commands/build.rs +2 -3
@@ 33,7 33,6 @@ use crate::filestore::path::StoreRoot;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobResource;
use crate::job::JobSet;
use crate::log::LogItem;
use crate::orchestrator::OrchestratorSetup;
use crate::package::PackageName;


@@ 306,7 305,7 @@ pub async fn build(

    trace!("Setting up job sets");
    let resources: Vec<JobResource> = additional_env.into_iter().map(JobResource::from).collect();
    let jobsets = JobSet::sets_from_tree(tree, shebang, image_name, phases.clone(), resources)?;
    let jobtree = crate::job::Tree::from_package_tree(tree, shebang, image_name, phases.clone(), resources);
    trace!("Setting up job sets finished successfully");

    trace!("Setting up Orchestrator");


@@ 324,7 323,7 @@ pub async fn build(
        } else {
            None
        })
        .jobsets(jobsets)
        .jobtree(jobtree)
        .config(config)
        .build()
        .setup()

M src/filestore/merged.rs => src/filestore/merged.rs +20 -48
@@ 9,18 9,17 @@
//

use std::sync::Arc;

use log::trace;
use tokio::sync::RwLock;
use std::path::Path;

use anyhow::Result;
use getset::Getters;
use log::trace;
use tokio::sync::RwLock;

use crate::filestore::Artifact;
use crate::filestore::path::ArtifactPath;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::package::PackageName;
use crate::package::PackageVersionConstraint;

/// A type that merges the release store and the staging store
///


@@ 40,55 39,22 @@ impl MergedStores {
        MergedStores { release, staging }
    }

    pub async fn get_artifact_by_name_and_version(
        &self,
        name: &PackageName,
        version: &PackageVersionConstraint,
    ) -> Result<Vec<Artifact>> {
        let v = self
            .staging
            .read()
            .await
            .0
            .values()
            .filter(|a| {
                trace!(
                    "Checking {:?} == {:?} && {:?} == {:?}",
                    a.name(),
                    name,
                    version,
                    a.version()
                );
                a.name() == name && version.matches(a.version())
            })
            .cloned()
            .collect::<Vec<_>>();

        if v.is_empty() {
            Ok({
                self.release
                    .read()
                    .await
                    .0
                    .values()
                    .filter(|a| a.name() == name && version.matches(a.version()))
                    .cloned()
                    .collect()
            })
        } else {
            Ok(v)
        }
    }

    pub async fn get_artifact_by_path(&self, p: &Path) -> Result<Option<Artifact>> {
        trace!("Fetching artifact from path: {:?}", p.display());
        let artifact_path = ArtifactPath::new(p.to_path_buf())?;

        let staging = &mut self.staging.write().await.0;
        let staging_path = staging.root_path().join(&artifact_path)?;
        trace!("staging_path = {:?}", staging_path.display());

        if staging_path.exists() {
            let art_path = ArtifactPath::new(p.to_path_buf())?;
            let art = staging.load_from_path(&artifact_path)?;
            let art = if let Some(art) = staging.get(&artifact_path) {
                art
            } else {
                trace!("Loading path from staging store: {:?}", artifact_path.display());
                staging.load_from_path(&artifact_path)?
            };

            return Ok(Some(art.clone()))
        }



@@ 96,9 62,15 @@ impl MergedStores {

        let release = &mut self.release.write().await.0;
        let release_path = release.root_path().join(&artifact_path)?;
        trace!("release_path = {:?}", release_path);

        if release_path.exists() {
            let art = release.load_from_path(&artifact_path)?;
            let art = if let Some(art) = release.get(&artifact_path) {
                art
            } else {
                trace!("Loading path from release store: {:?}", artifact_path.display());
                release.load_from_path(&artifact_path)?
            };
            return Ok(Some(art.clone()))
        }


M src/filestore/path.rs => src/filestore/path.rs +0 -4
@@ 73,10 73,6 @@ impl StoreRoot {
        FullArtifactPath(self, ap)
    }

    pub(in crate::filestore) fn is_file(&self, subpath: &Path) -> bool {
        self.0.join(subpath).is_file()
    }

    pub(in crate::filestore) fn is_dir(&self, subpath: &Path) -> bool {
        self.0.join(subpath).is_dir()
    }

M src/filestore/staging.rs => src/filestore/staging.rs +0 -5
@@ 9,7 9,6 @@
//

use std::fmt::Debug;
use std::path::Path;

use anyhow::anyhow;
use anyhow::Context;


@@ 101,8 100,4 @@ impl StagingStore {
    pub fn root_path(&self) -> &StoreRoot {
        self.0.root_path()
    }

    pub fn path_exists_in_store_root(&self, path: &Path) -> bool {
        self.0.path_exists_in_store_root(path)
    }
}

M src/filestore/util.rs => src/filestore/util.rs +2 -7
@@ 12,7 12,6 @@
//!

use std::collections::BTreeMap;
use std::path::Path;

use anyhow::anyhow;
use anyhow::Result;


@@ 51,12 50,8 @@ impl FileStoreImpl {
        &self.root
    }

    pub fn path_exists_in_store_root(&self, p: &Path) -> bool {
        self.root.is_file(p)
    }

    pub(in crate::filestore) fn values(&self) -> impl Iterator<Item = &Artifact> {
        self.store.values()
    pub fn get(&self, artifact_path: &ArtifactPath) -> Option<&Artifact> {
        self.store.get(artifact_path)
    }

    pub(in crate::filestore) fn load_from_path(

M src/job/mod.rs => src/job/mod.rs +2 -2
@@ 12,8 12,8 @@
mod job;
pub use job::*;

mod set;
pub use set::*;
mod tree;
pub use tree::*;

mod resource;
pub use resource::*;

M src/job/runnable.rs => src/job/runnable.rs +15 -62
@@ 12,25 12,22 @@ use std::collections::HashMap;

use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use getset::Getters;
use log::{debug, trace, warn};
use tokio::stream::StreamExt;
use log::debug;
use uuid::Uuid;

use crate::config::Configuration;
use crate::filestore::MergedStores;
use crate::filestore::Artifact;
use crate::job::Job;
use crate::job::JobResource;
use crate::package::Package;
use crate::package::ParseDependency;
use crate::package::Script;
use crate::package::ScriptBuilder;
use crate::source::SourceCache;
use crate::source::SourceEntry;
use crate::util::docker::ImageName;
use crate::util::EnvironmentVariableName;
use crate::util::docker::ImageName;

/// A job configuration that can be run. All inputs are clear here.
#[derive(Debug, Getters)]


@@ 56,39 53,22 @@ pub struct RunnableJob {

impl RunnableJob {
    pub async fn build_from_job(
        job: Job,
        merged_stores: &MergedStores,
        job: &Job,
        source_cache: &SourceCache,
        config: &Configuration,
        dependencies: Vec<Artifact>,
    ) -> Result<Self> {
        trace!("Preparing build dependencies");
        let resources = {
            let mut resources = job
                .package()
                .dependencies()
                .build()
                .iter()
                .map(|dep| Self::build_resource(dep, merged_stores))
                .chain({
                    job.package()
                        .dependencies()
                        .runtime()
                        .iter()
                        .map(|dep| Self::build_resource(dep, merged_stores))
                })
                .collect::<futures::stream::FuturesUnordered<_>>()
                .collect::<Result<Vec<JobResource>>>()
                .await?;

            resources.extend({
        // Add the environment from the original Job object to the resources
        let resources = dependencies
            .into_iter()
            .map(JobResource::from)
            .chain({
                job.resources()
                    .iter()
                    .filter(|jr| jr.env().is_some())
                    .cloned()
            });

            resources
        };
            })
            .collect();

        if config.containers().check_env_names() {
            debug!("Checking environment if all variables are allowed!");


@@ 121,9 101,9 @@ impl RunnableJob {
        )?;

        Ok(RunnableJob {
            uuid: job.uuid,
            package: job.package,
            image: job.image,
            uuid: job.uuid.clone(),
            package: job.package.clone(),
            image: job.image.clone(),
            resources,
            source_cache: source_cache.clone(),



@@ 179,31 159,4 @@ impl RunnableJob {
        ]
    }

    async fn build_resource(
        dep: &dyn ParseDependency,
        merged_stores: &MergedStores,
    ) -> Result<JobResource> {
        let (name, vers) = dep.parse_as_name_and_version()?;
        trace!("Copying dep: {:?} {:?}", name, vers);
        let mut a = merged_stores
            .get_artifact_by_name_and_version(&name, &vers)
            .await?;

        if a.is_empty() {
            Err(anyhow!("Cannot find dependency: {:?} {:?}", name, vers))
                .context("Building a runnable job")
                .map_err(Error::from)
        } else {
            a.sort();
            let a_len = a.len();
            let found_dependency = a.into_iter().next().unwrap(); // save by above check
            if a_len > 1 {
                warn!("Found more than one dependency for {:?} {:?}", name, vers);
                warn!("Using: {:?}", found_dependency);
                warn!("Please investigate, this might be a BUG");
            }

            Ok(JobResource::Artifact(found_dependency))
        }
    }
}

D src/job/set.rs => src/job/set.rs +0 -351
@@ 1,351 0,0 @@
//
// Copyright (c) 2020-2021 science+computing ag and other contributors
//
// This program and the accompanying materials are made
// available under the terms of the Eclipse Public License 2.0
// which is available at https://www.eclipse.org/legal/epl-2.0/
//
// SPDX-License-Identifier: EPL-2.0
//

use anyhow::Result;
use log::{debug, trace};
use tokio::stream::StreamExt;

use crate::config::Configuration;
use crate::filestore::MergedStores;
use crate::job::Job;
use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::package::PhaseName;
use crate::package::Shebang;
use crate::package::Tree;
use crate::source::SourceCache;
use crate::util::docker::ImageName;

/// A set of jobs that could theoretically be run in parallel
#[derive(Debug)]
pub struct JobSet {
    set: Vec<Job>,
}

impl JobSet {
    pub fn sets_from_tree(
        t: Tree,
        shebang: Shebang,
        image: ImageName,
        phases: Vec<PhaseName>,
        resources: Vec<JobResource>,
    ) -> Result<Vec<JobSet>> {
        tree_into_jobsets(t, shebang, image, phases, resources)
    }

    fn is_empty(&self) -> bool {
        self.set.is_empty()
    }

    pub async fn into_runables<'a>(
        self,
        merged_stores: &'a MergedStores,
        source_cache: &'a SourceCache,
        config: &Configuration,
    ) -> Result<Vec<RunnableJob>> {
        self.set
            .into_iter()
            .map(move |j| RunnableJob::build_from_job(j, merged_stores, source_cache, config))
            .collect::<futures::stream::FuturesUnordered<_>>()
            .collect::<Result<Vec<RunnableJob>>>()
            .await
    }
}

/// Get the tree as sets of jobs, the deepest level of the tree first
fn tree_into_jobsets(
    tree: Tree,
    shebang: Shebang,
    image: ImageName,
    phases: Vec<PhaseName>,
    resources: Vec<JobResource>,
) -> Result<Vec<JobSet>> {
    fn inner(
        tree: Tree,
        shebang: &Shebang,
        image: &ImageName,
        phases: &[PhaseName],
        resources: &[JobResource],
    ) -> Result<Vec<JobSet>> {
        trace!("Creating jobsets for tree: {:?}", tree);

        let mut sets = vec![];
        let mut current_set = vec![];

        for (package, dep) in tree.into_iter() {
            trace!("Recursing for package: {:?}", package);
            let mut sub_sets = inner(dep, shebang, image, phases, resources)?; // recursion!
            sets.append(&mut sub_sets);
            current_set.push(package);
        }

        debug!("Jobset for set: {:?}", current_set);
        let jobset = JobSet {
            set: current_set
                .into_iter()
                .map(|package| {
                    Job::new(
                        package,
                        shebang.clone(),
                        image.clone(),
                        phases.to_vec(),
                        resources.to_vec(),
                    )
                })
                .collect(),
        };
        debug!("Jobset = {:?}", jobset);

        // make sure the current recursion is added _before_ all other recursions
        // which yields the highest level in the tree as _first_ element of the resulting vector
        let mut result = Vec::new();
        if !jobset.is_empty() {
            debug!("Adding jobset: {:?}", jobset);
            result.push(jobset)
        }
        result.append(&mut sets);
        debug!("Result =  {:?}", result);
        Ok(result)
    }

    inner(tree, &shebang, &image, &phases, &resources).map(|mut v| {
        // reverse, because the highest level in the tree is added as first element in the vector
        // and the deepest level is last.
        //
        // After reversing, we have a chain of things to build. Awesome, huh?
        v.reverse();
        v
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::BTreeMap;

    use crate::package::tests::package;
    use crate::package::tests::pname;
    use crate::package::tests::pversion;
    use crate::package::Dependencies;
    use crate::package::Dependency;
    use crate::package::PhaseName;
    use crate::repository::Repository;
    use crate::util::docker::ImageName;

    use indicatif::ProgressBar;

    fn setup_logging() {
        let _ = ::env_logger::try_init();
    }

    #[test]
    fn test_one_element_tree_to_jobsets() {
        setup_logging();
        let mut btree = BTreeMap::new();

        let p1 = {
            let name = "a";
            let vers = "1";
            let pack = package(name, vers, "https://rust-lang.org", "123");
            btree.insert((pname(name), pversion(vers)), pack.clone());
            pack
        };

        let repo = Repository::from(btree);
        let progress = ProgressBar::hidden();

        let mut tree = Tree::default();
        let r = tree.add_package(p1, &repo, progress);
        assert!(r.is_ok());

        let image = ImageName::from(String::from("test"));
        let phases = vec![PhaseName::from(String::from("testphase"))];
        let shebang = Shebang::from(String::from("#!/bin/bash"));

        let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
        assert!(js.is_ok());
        let js = js.unwrap();

        assert_eq!(js.len(), 1, "There should be only one jobset if there is only one element in the dependency tree: {:?}", js);

        let js = js.get(0).unwrap();
        assert_eq!(
            js.set.len(),
            1,
            "The jobset should contain exactly one job: {:?}",
            js
        );

        let job = js.set.get(0).unwrap();
        assert_eq!(
            *job.package.name(),
            pname("a"),
            "The job should be for the package 'a': {:?}",
            job
        );
    }

    #[test]
    fn test_two_element_tree_to_jobsets() {
        setup_logging();
        let mut btree = BTreeMap::new();

        let p1 = {
            let name = "a";
            let vers = "1";
            let pack = package(name, vers, "https://rust-lang.org", "123");
            btree.insert((pname(name), pversion(vers)), pack.clone());
            pack
        };

        let p2 = {
            let name = "b";
            let vers = "2";
            let pack = package(name, vers, "https://rust-lang.org", "124");
            btree.insert((pname(name), pversion(vers)), pack.clone());
            pack
        };

        let repo = Repository::from(btree);
        let progress = ProgressBar::hidden();

        let mut tree = Tree::default();
        let r = tree.add_package(p1, &repo, progress.clone());
        assert!(r.is_ok());

        let r = tree.add_package(p2, &repo, progress);
        assert!(r.is_ok());

        let image = ImageName::from(String::from("test"));
        let phases = vec![PhaseName::from(String::from("testphase"))];
        let shebang = Shebang::from(String::from("#!/bin/bash"));

        let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
        assert!(js.is_ok());
        let js = js.unwrap();

        assert_eq!(
            js.len(),
            1,
            "There should be one set of jobs for two packages on the same level in the tree: {:?}",
            js
        );

        let js = js.get(0).unwrap();
        assert_eq!(
            js.set.len(),
            2,
            "The jobset should contain exactly two jobs: {:?}",
            js
        );

        let job = js.set.get(0).unwrap();
        assert_eq!(
            *job.package.name(),
            pname("a"),
            "The job should be for the package 'a': {:?}",
            job
        );

        let job = js.set.get(1).unwrap();
        assert_eq!(
            *job.package.name(),
            pname("b"),
            "The job should be for the package 'a': {:?}",
            job
        );
    }

    #[test]
    fn test_two_dependent_elements_to_jobsets() {
        setup_logging();
        let mut btree = BTreeMap::new();

        let p1 = {
            let name = "a";
            let vers = "1";
            let mut pack = package(name, vers, "https://rust-lang.org", "123");
            {
                let d1 = Dependency::from(String::from("b =2"));
                let ds = Dependencies::with_runtime_dependencies(vec![d1]);
                pack.set_dependencies(ds);
            }
            btree.insert((pname(name), pversion(vers)), pack.clone());
            pack
        };

        let _ = {
            let name = "b";
            let vers = "2";
            let pack = package(name, vers, "https://rust-lang.org", "124");
            btree.insert((pname(name), pversion(vers)), pack.clone());
            pack
        };

        let repo = Repository::from(btree);
        let progress = ProgressBar::hidden();

        let mut tree = Tree::default();
        let r = tree.add_package(p1, &repo, progress);
        assert!(r.is_ok());

        let image = ImageName::from(String::from("test"));
        let phases = vec![PhaseName::from(String::from("testphase"))];
        let shebang = Shebang::from(String::from("#!/bin/bash"));

        let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
        assert!(js.is_ok());
        let js = js.unwrap();

        assert_eq!(
            js.len(),
            2,
            "There should be two set of jobs for two packages where one depends on the other: {:?}",
            js
        );

        {
            let first_js = js.get(0).unwrap();
            assert_eq!(
                first_js.set.len(),
                1,
                "The first jobset should contain exactly one job: {:?}",
                js
            );

            let job = first_js.set.get(0).unwrap();
            assert_eq!(
                *job.package.name(),
                pname("b"),
                "The job from the first set should be for the package 'b': {:?}",
                job
            );
        }

        {
            let second_js = js.get(1).unwrap();
            assert_eq!(
                second_js.set.len(),
                1,
                "The second jobset should contain exactly one job: {:?}",
                js
            );

            let job = second_js.set.get(0).unwrap();
            assert_eq!(
                *job.package.name(),
                pname("a"),
                "The job should be for the package 'a': {:?}",
                job
            );
        }
    }
}

A src/job/tree.rs => src/job/tree.rs +80 -0
@@ 0,0 1,80 @@
//
// Copyright (c) 2020-2021 science+computing ag and other contributors
//
// This program and the accompanying materials are made
// available under the terms of the Eclipse Public License 2.0
// which is available at https://www.eclipse.org/legal/epl-2.0/
//
// SPDX-License-Identifier: EPL-2.0
//

use std::collections::BTreeMap;

use uuid::Uuid;
use getset::Getters;

use crate::job::Job;
use crate::job::JobResource;
use crate::package::PhaseName;
use crate::package::Shebang;
use crate::util::docker::ImageName;

#[derive(Debug, Getters)]
pub struct Tree {
    #[getset(get = "pub")]
    inner: BTreeMap<Uuid, JobDefinition>,
}

impl Tree {
    pub fn from_package_tree(pt: crate::package::Tree,
        script_shebang: Shebang,
        image: ImageName,
        phases: Vec<PhaseName>,
        resources: Vec<JobResource>,
    ) -> Self {
        Tree { inner: Self::build_tree(pt, script_shebang, image, phases, resources) }
    }

    fn build_tree(pt: crate::package::Tree,
        script_shebang: Shebang,
        image: ImageName,
        phases: Vec<PhaseName>,
        resources: Vec<JobResource>,
    ) -> BTreeMap<Uuid, JobDefinition> {
        let mut tree = BTreeMap::new();

        for (package, dependencies) in pt.into_iter() {
            let mut deps = Self::build_tree(dependencies,
                script_shebang.clone(),
                image.clone(),
                phases.clone(),
                resources.clone());

            let deps_uuids = deps.keys().cloned().collect();
            tree.append(&mut deps);

            let job = Job::new(package,
                script_shebang.clone(),
                image.clone(),
                phases.clone(),
                resources.clone());

            let job_uuid = job.uuid().clone();
            let jdef = JobDefinition { job, dependencies: deps_uuids };

            tree.insert(job_uuid, jdef);
        }

        tree
    }

}

/// A job definition is the job itself and all UUIDs from jobs this job depends on.
#[derive(Debug)]
pub struct JobDefinition {
    pub job: Job,

    /// Uuids of the jobs where this job depends on the outputs
    pub dependencies: Vec<Uuid>,
}

M src/orchestrator/orchestrator.rs => src/orchestrator/orchestrator.rs +151 -111
@@ 12,25 12,27 @@ use std::path::PathBuf;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use diesel::PgConnection;
use indicatif::ProgressBar;
use log::trace;
use tokio::sync::RwLock;
use tokio::stream::StreamExt;
use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::config::Configuration;
use crate::db::models::Artifact;
use crate::db::models::Submit;
use crate::db::models as dbmodels;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
use crate::filestore::Artifact;
use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobSet;
use crate::job::JobDefinition;
use crate::job::RunnableJob;
use crate::job::Tree as JobTree;
use crate::source::SourceCache;
use crate::util::progress::ProgressBars;



@@ 39,8 41,9 @@ pub struct Orchestrator<'a> {
    progress_generator: ProgressBars,
    merged_stores: MergedStores,
    source_cache: SourceCache,
    jobsets: Vec<JobSet>,
    jobtree: JobTree,
    config: &'a Configuration,
    database: Arc<PgConnection>,
}

#[derive(TypedBuilder)]


@@ 50,9 53,9 @@ pub struct OrchestratorSetup<'a> {
    staging_store: Arc<RwLock<StagingStore>>,
    release_store: Arc<RwLock<ReleaseStore>>,
    source_cache: SourceCache,
    jobsets: Vec<JobSet>,
    jobtree: JobTree,
    database: Arc<PgConnection>,
    submit: Submit,
    submit: dbmodels::Submit,
    log_dir: Option<PathBuf>,
    config: &'a Configuration,
}


@@ 62,7 65,7 @@ impl<'a> OrchestratorSetup<'a> {
        let scheduler = EndpointScheduler::setup(
            self.endpoint_config,
            self.staging_store.clone(),
            self.database,
            self.database.clone(),
            self.submit.clone(),
            self.log_dir,
        )


@@ 73,125 76,162 @@ impl<'a> OrchestratorSetup<'a> {
            progress_generator: self.progress_generator,
            merged_stores: MergedStores::new(self.release_store, self.staging_store),
            source_cache: self.source_cache,
            jobsets: self.jobsets,
            jobtree: self.jobtree,
            config: self.config,
            database: self.database,
        })
    }
}

/// Helper type
///
/// Represents a result that came from the run of a job inside a container
///
/// It is either a list of artifacts (with their respective database artifact objects)
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
type JobResult = std::result::Result<Vec<(Artifact, dbmodels::Artifact)>, (Uuid, Error)>;

impl<'a> Orchestrator<'a> {
    pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, anyhow::Error)>> {
        for jobset in self.jobsets.into_iter() {
            let errs = Self::run_jobset(
                &self.scheduler,
                &self.merged_stores,
                &self.source_cache,
                &self.config,
                &self.progress_generator,
                jobset,
                output,
            )
            .await?;
    pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> {
        let (results, errors) = self.run_tree().await?;
        output.extend(results.into_iter().map(|(_, dba)| dba));
        Ok(errors)
    }

    async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> {
        use futures::FutureExt;

        let mut already_built = vec![];
        let mut artifacts = vec![];
        let mut errors = vec![];

        loop {
            // loop{}
            //  until for all elements of self.jobtree, the uuid exists in already_built
            //
            //  for each element in jobtree
            //      where dependencies(element) all in already_built
            //      run_job_for(element)
            //
            //  for results from run_job_for calls
            //      remember UUID in already_built
            //      put built artifacts in artifacts
            //      if error, abort everything
            //
            //
            let multibar = Arc::new(indicatif::MultiProgress::new());
            let build_results = self.jobtree
                .inner()
                .iter()
                .filter(|(uuid, jobdef)| { // select all jobs where all dependencies are in `already_built`
                    trace!("Filtering job definition: {:?}", jobdef);
                    jobdef.dependencies.iter().all(|d| already_built.contains(d)) && !already_built.contains(uuid)
                })
                .map(|(uuid, jobdef)| {
                    trace!("Running job {}", uuid);
                    let bar = multibar.add(self.progress_generator.bar());
                    let uuid = *uuid;
                    self.run_job(jobdef, bar).map(move |r| (uuid, r))
                })
                .collect::<futures::stream::FuturesUnordered<_>>()
                .collect::<Vec<(_, Result<JobResult>)>>();

            let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
            let (_, build_results) = tokio::join!(multibar_block, build_results);

            for (uuid, artifact_result) in build_results.into_iter() {
                already_built.push(uuid);

                match artifact_result {
                    Ok(Ok(mut arts)) => artifacts.append(&mut arts),
                    Ok(Err((uuid, e))) => { // error during job running
                        log::error!("Error for job {} = {}", uuid, e);
                        errors.push((uuid, e));
                    },

                    Err(e) => return Err(e), // error during container execution
                }
            }

            if !errors.is_empty() {
                break
            }

            // already_built.sort(); // TODO: optimization for binary search in
            // above and below contains() clause

            if self.jobtree.inner().iter().all(|(uuid, _)| already_built.contains(uuid)) {
                break
            }
        }

        Ok((artifacts, errors))
    }
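The loop above realizes the scheduling strategy described in the commit message: instead of precomputed layer sets, every round starts each job whose dependencies are already built. A minimal standalone sketch of that selection loop, using plain `char` job names and a `HashMap` instead of butido's `JobDefinition`/`Uuid` types (all names here are illustrative, not butido's API):

```rust
use std::collections::HashMap;

/// Returns the jobs grouped by the round in which they become runnable.
/// In each round, all returned jobs could be started in parallel.
fn schedule(jobs: &HashMap<char, Vec<char>>) -> Vec<Vec<char>> {
    let mut already_built: Vec<char> = Vec::new();
    let mut rounds = Vec::new();

    while already_built.len() < jobs.len() {
        // select all jobs where all dependencies are in `already_built`
        let mut runnable: Vec<char> = jobs
            .iter()
            .filter(|&(name, deps)| {
                !already_built.contains(name)
                    && deps.iter().all(|d| already_built.contains(d))
            })
            .map(|(name, _)| *name)
            .collect();
        runnable.sort();
        already_built.extend(&runnable);
        rounds.push(runnable);
    }
    rounds
}

fn main() {
    // The example tree from the commit message: A depends on B and E, etc.
    let jobs: HashMap<char, Vec<char>> = [
        ('A', vec!['B', 'E']),
        ('B', vec!['C', 'D']),
        ('C', vec![]),
        ('D', vec![]),
        ('E', vec!['F']),
        ('F', vec!['G', 'H']),
        ('G', vec![]),
        ('H', vec!['I']),
        ('I', vec![]),
    ]
    .into_iter()
    .collect();

    for round in schedule(&jobs) {
        println!("{:?}", round);
    }
}
```

For this tree the rounds are [C, D, G, I], [B, H], [F], [E], [A]: all four leaf jobs start in parallel immediately, rather than waiting for a layer-by-layer schedule.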

    async fn run_job(&self, jobdef: &JobDefinition, bar: ProgressBar) -> Result<JobResult> {
        let dependency_artifacts = self.get_dependency_artifacts_for_jobs(&jobdef.dependencies).await?;
        bar.set_message("Preparing...");

        let runnable = RunnableJob::build_from_job(
            &jobdef.job,
            &self.source_cache,
            &self.config,
            dependency_artifacts)
            .await?;

        bar.set_message("Scheduling...");
        let job_uuid = *jobdef.job.uuid();
        match self.scheduler.schedule_job(runnable, bar).await?.run().await {
            Err(e) => return Ok(Err((job_uuid, e))),
            Ok(db_artifacts) => {
                db_artifacts.into_iter()
                    .map(|db_artifact| async {
                        trace!("Getting store Artifact for db Artifact: {:?}", db_artifact);
                        let art = self.get_store_artifact_for(&db_artifact).await?;
                        trace!("Store Artifact: {:?}", art);
                        Ok(Ok((art, db_artifact)))
                    })
                    .collect::<futures::stream::FuturesUnordered<_>>()
                    .collect::<Result<JobResult>>()
                    .await
            },
        }
    }

    /// Get all dependency artifacts for the job from the database
    ///
    /// Use the JobDefinition object and find all dependency outputs in the database
    async fn get_dependency_artifacts_for_jobs(&self, uuids: &[Uuid]) -> Result<Vec<Artifact>> {
        use crate::schema;
        use crate::diesel::ExpressionMethods;
        use crate::diesel::QueryDsl;
        use crate::diesel::RunQueryDsl;

        // Pseudo code:
        //
        // * return for uuid in uuids:
        //      self.database.get(job).get_artifacts()

        schema::artifacts::table
            .left_outer_join(schema::jobs::table)
            .filter(schema::jobs::uuid.eq_any(uuids))
            .select(schema::artifacts::all_columns)
            .load::<dbmodels::Artifact>(&*self.database)?
            .iter()
            .map(|dbart| self.get_store_artifact_for(dbart))
            .collect::<futures::stream::FuturesUnordered<_>>()
            .collect()
            .await
    }
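This query is what removes the old filename-format requirement: dependency artifacts are looked up through the jobs table, not parsed out of `<name>-<version>.<something>` paths. The association itself reduces to a flat-map over the recorded outputs of the dependency jobs; a sketch with stand-in types (`u32` for `Uuid`, `&str` for artifact paths, function and variable names all assumed for illustration):

```rust
use std::collections::HashMap;

type Uuid = u32; // stand-in for uuid::Uuid
type ArtifactPath = &'static str; // stand-in for a store path

/// The dependencies of a job are the recorded outputs of the jobs run for
/// its dependency packages -- no artifact naming convention involved.
fn dependency_artifacts(
    outputs: &HashMap<Uuid, Vec<ArtifactPath>>, // outputs recorded per finished job
    dependencies: &[Uuid],
) -> Vec<ArtifactPath> {
    dependencies
        .iter()
        .flat_map(|uuid| outputs[uuid].iter().copied())
        .collect()
}

fn main() {
    let mut outputs = HashMap::new();
    outputs.insert(3, vec!["C-output.pkg"]); // job for package C
    outputs.insert(4, vec!["D-output.pkg"]); // job for package D

    // deps_of(B) = outputs_of(job_for(C)) + outputs_of(job_for(D))
    let deps = dependency_artifacts(&outputs, &[3, 4]);
    assert_eq!(deps, vec!["C-output.pkg", "D-output.pkg"]);
}
```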

    async fn get_store_artifact_for(&self, db_artifact: &dbmodels::Artifact) -> Result<Artifact> {
        let p = PathBuf::from(&db_artifact.path);
        self.merged_stores
            .get_artifact_by_path(&p)
            .await?
            .ok_or_else(|| {
                anyhow!("Artifact not found in {}", p.display())
            })
    }
}