~rjarry/dlrepo

bd1c23893882423e61ce83752d4357cd1be621d1 — Robin Jarry, 2 years ago
server: add filesystem api

Signed-off-by: Robin Jarry <robin@jarry.cc>
A dlrepo/__init__.py => dlrepo/__init__.py +0 -0
A dlrepo/cleanup.py => dlrepo/cleanup.py +125 -0
@@ 0,0 1,125 @@
# Copyright (c) 2021 Julien Floret
# Copyright (c) 2021 Robin Jarry
# SPDX-License-Identifier: BSD-3-Clause

"""
Automatically clean up old tags according to the branch cleanup policy.
"""

import argparse
import os
import pathlib
import re
import sys
import time

from dlrepo.fs import ArtifactRepository
from dlrepo.fs.tag import Tag


# --------------------------------------------------------------------------------------
def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "-q",
        "--quiet",
        default=False,
        action="store_true",
        help="""
        Only display errors.
        """,
    )
    parser.add_argument(
        "-p",
        "--root-path",
        type=local_dir,
        default=default_root_path(),
        help="""
        The root path of the repository. Defaults to DLREPO_ROOT_PATH from the
        environment or from /etc/default/dlrepo.
        """,
    )
    args = parser.parse_args()
    try:
        cleanup(args)
    except Exception as e:
        print(f"error: {e}", file=sys.stderr)
        return 1
    return 0


# --------------------------------------------------------------------------------------
def cleanup(args):
    repo = ArtifactRepository(args.root_path)
    start = time.time()
    deleted = 0

    for branch in repo.get_branches():
        released_tags = []
        daily_tags = []

        for tag in branch.get_tags():
            if tag.is_locked():
                continue
            if tag.is_released():
                released_tags.append(tag)
            else:
                daily_tags.append(tag)

        released_tags.sort(key=Tag.creation_date, reverse=True)
        daily_tags.sort(key=Tag.creation_date, reverse=True)

        policy = branch.get_cleanup_policy()
        max_daily = policy.get("max_daily_tags", 0)
        if isinstance(max_daily, int) and max_daily > 0:
            for tag in daily_tags[max_daily:]:
                if not args.quiet:
                    print(f"Deleting daily tag {branch.name}/{tag.name} ...")
                tag.delete(cleanup_orphans=False)
                deleted += 1

        max_released = policy.get("max_released_tags", 0)
        if isinstance(max_released, int) and max_released > 0:
            for tag in released_tags[max_released:]:
                if not args.quiet:
                    print(f"Deleting released tag {branch.name}/{tag.name} ...")
                tag.delete(force=True, cleanup_orphans=False)
                deleted += 1

    repo.cleanup_orphan_blobs()

    for user_repo in repo.get_user_repos():
        user_repo.disk_usage_refresh()
        user_repo.disk_usage_save()

    if not args.quiet and deleted > 0:
        print(f"Deleted {deleted} tags in {time.time() - start:.1f}s")


# --------------------------------------------------------------------------------------
def local_dir(value):
    value = pathlib.Path(value)
    if not value.is_dir():
        raise argparse.ArgumentTypeError(f"{value}: No such directory")
    return value


# --------------------------------------------------------------------------------------
def default_root_path():
    if "DLREPO_ROOT_PATH" in os.environ:
        return pathlib.Path(os.environ["DLREPO_ROOT_PATH"])

    default_file = pathlib.Path("/etc/default/dlrepo")
    if default_file.is_file():
        match = re.search(
            r"DLREPO_ROOT_PATH=(.*)", default_file.read_text(encoding="utf-8")
        )
        if match:
            return pathlib.Path(match.group(1).strip().strip("\"'"))

    return pathlib.Path(".")


# --------------------------------------------------------------------------------------
if __name__ == "__main__":
    sys.exit(main())

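For context (not part of the patch): the policy consumed above is the JSON that Branch.set_cleanup_policy() writes into <branch>/.cleanup_policy. A minimal sketch, assuming the dlrepo package and its dependencies are installed and that /srv/dlrepo is a hypothetical, writable repository root:

from dlrepo.fs import ArtifactRepository

repo = ArtifactRepository("/srv/dlrepo")
branch = repo.get_branch("main")
branch.create()
# keep at most 7 unreleased (daily) tags and 2 released tags on this branch
branch.set_cleanup_policy(max_daily_tags=7, max_released_tags=2)
print(branch.get_cleanup_policy())
# -> {'max_daily_tags': 7, 'max_released_tags': 2}

Running "python3 -m dlrepo.cleanup -p /srv/dlrepo" (e.g. from a periodic cron job) then applies the policy: the oldest tags beyond each limit are deleted, locked tags are always skipped, and released tags beyond the limit are deleted with force=True.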
A dlrepo/fs/__init__.py => dlrepo/fs/__init__.py +377 -0
@@ 0,0 1,377 @@
# Copyright (c) 2021 Julien Floret
# Copyright (c) 2021 Robin Jarry
# SPDX-License-Identifier: BSD-3-Clause

import hashlib
import logging
import os
from pathlib import Path
import shutil
import time
from typing import Awaitable, Callable, Iterator, Optional

from cachetools import LFUCache

from .branch import Branch
from .container import ContainerRegistry
from .product import Product
from .util import human_readable, parse_digest


LOG = logging.getLogger(__name__)
CHUNK_SIZE = int(os.getenv("DLREPO_CHUNK_SIZE", str(256 * 1024)))


# --------------------------------------------------------------------------------------
class AbstractRepository:
    def __init__(self, path: str):
        self._path = Path(path)
        self.parent = None
        self.container_registry = ContainerRegistry(self)

    def get_branches(self) -> Iterator[Branch]:
        yield from Branch.all(self)

    def get_branch(self, name: str) -> Branch:
        return Branch(self, name)

    def get_products(self) -> Iterator[Product]:
        yield from Product.all(self)

    def get_product(self, name: str) -> Product:
        return Product(self, name)

    def path(self) -> Path:
        return self._path

    def create(self):
        self._path.mkdir(mode=0o755, parents=True, exist_ok=True)

    def url_bit(self) -> Optional[str]:
        raise NotImplementedError()

    def cleanup_orphan_blobs(self):
        raise NotImplementedError()

    def next_upload(self) -> str:
        raise NotImplementedError()

    def blob_path(self, digest: str, parent: Optional[Path] = None) -> Path:
        raise NotImplementedError()

    def rmtree(self, path: Path):
        raise NotImplementedError()

    async def update_upload(
        self, uuid: str, stream: Callable[[int], Awaitable[bytes]]
    ) -> int:
        raise NotImplementedError()

    def cancel_upload(self, uuid: str):
        raise NotImplementedError()

    def finalize_upload(self, uuid: str, digest: str) -> Path:
        raise NotImplementedError()

    def link_blob(self, digest: str, target: Path):
        raise NotImplementedError()

    def link_blob_ignore_quota(self, digest: str, target: Path):
        raise NotImplementedError()


# --------------------------------------------------------------------------------------
class ArtifactRepository(AbstractRepository):
    def __init__(self, path: str):
        super().__init__(path)
        self.blobs = self._path / ".blobs"
        self.uploads = self._path / ".uploads"
        self.user_repos = LFUCache(maxsize=512)

    def get_user_repo(self, user: str) -> "UserRepository":
        user = user.lower()
        repo = self.user_repos.get(user)
        if repo is None:
            repo = self.user_repos[user] = UserRepository(self, user)
        return repo

    def get_user_repos(self) -> Iterator["UserRepository"]:
        yield from UserRepository.all(self)

    def url_bit(self) -> Optional[str]:
        return None

    def url(self) -> str:
        return "/"

    def cleanup_orphan_blobs(self):
        # TODO: call blocking stuff (open, stat, unlink) in a thread?
        for folder in (self.blobs, self.uploads):
            _cleanup_orphans(folder)

    def next_upload(self) -> str:
        self.uploads.mkdir(mode=0o755, parents=True, exist_ok=True)
        while True:
            try:
                path = self.uploads / os.urandom(16).hex()
                path.touch(mode=0o600, exist_ok=False)
                return path.name
            except FileExistsError:
                continue

    def upload_path(self, uuid: str):
        if "/" in uuid or ".." in uuid:
            raise FileNotFoundError(uuid)
        path = self.uploads / uuid
        if not path.is_file():
            raise FileNotFoundError(uuid)
        return path

    def blob_path(self, digest: str, parent: Optional[Path] = None) -> Path:
        if not digest:
            raise ValueError(f"invalid digest: {digest}")
        if parent is None:
            parent = self.blobs
        algo, digest = parse_digest(digest.lower())
        path = parent / f"{algo}/{digest[:2]}/{digest}"
        if path.is_file():
            path.touch()
        return path

    def rmtree(self, path: Path):
        shutil.rmtree(path, onerror=_rmtree_error_cb)

    async def update_upload(
        self, uuid: str, stream: Callable[[int], Awaitable[bytes]]
    ) -> int:
        return await _stream_to_file(stream, self.upload_path(uuid))

    def cancel_upload(self, uuid: str):
        self.upload_path(uuid).unlink()

    def finalize_upload(self, uuid: str, digest: str) -> Path:
        return _check_and_move(self.upload_path(uuid), self.blob_path(digest), digest)

    def link_blob(self, digest: str, target: Path):
        _hardlink(self.blob_path(digest), target)

    def link_blob_ignore_quota(self, digest: str, target: Path):
        _hardlink(self.blob_path(digest), target)


# --------------------------------------------------------------------------------------
class UserRepository(AbstractRepository):
    def __init__(self, base: ArtifactRepository, user: str):
        self.base = base
        self.user = user
        super().__init__(base.path() / "users" / user)
        self.disk_usage = self._parse_disk_usage_file()
        self.disk_usage_dirty = False

    @classmethod
    def all(cls, base: ArtifactRepository) -> Iterator["UserRepository"]:
        path = base.path() / "users"
        if not path.is_dir():
            return
        for d in path.iterdir():
            if not d.is_dir():
                continue
            yield cls(base, d.name)

    def _parse_disk_usage_file(self) -> int:
        du_path = self.path() / ".disk_usage"
        if du_path.is_file():
            try:
                return int(du_path.read_text(encoding="utf-8"))
            except ValueError:
                pass
        return 0

    def disk_usage_refresh(self):
        self.disk_usage = 0
        for root, _, files in os.walk(self.path()):
            for f in files:
                self.disk_usage += Path(root, f).stat().st_size
        self.disk_usage_dirty = True

    def disk_usage_save(self):
        if not self.disk_usage_dirty:
            return
        self.create()
        du_path = self.path() / ".disk_usage"
        du_path.write_text(str(self.disk_usage), encoding="utf-8")
        self.disk_usage_dirty = False

    # max disk usage per user: 10 GiB by default, override with DLREPO_USER_QUOTA
    QUOTA = int(os.getenv("DLREPO_USER_QUOTA", str(10 * (1024 ** 3))))

    def disk_usage_add(self, usage: int):
        new_usage = self.disk_usage + usage
        if new_usage > self.QUOTA:
            usage = f"{human_readable(self.disk_usage)}/{human_readable(self.QUOTA)}"
            raise PermissionError(
                f"User {self.user} quota exceeded ({usage}). Please make some ménage."
            )
        self.disk_usage = new_usage
        self.disk_usage_dirty = True

    def disk_usage_rm(self, usage: int):
        self.disk_usage = max(0, self.disk_usage - usage)
        self.disk_usage_dirty = True

    def url_bit(self) -> Optional[str]:
        return f"~{self.user}"

    def url(self) -> str:
        return f"/~{self.user}/"

    def cleanup_orphan_blobs(self):
        self.base.cleanup_orphan_blobs()

    def next_upload(self) -> str:
        return self.base.next_upload()

    def cancel_upload(self, uuid: str):
        self.disk_usage_rm(self.base.upload_path(uuid).stat().st_size)
        try:
            self.base.cancel_upload(uuid)
        finally:
            self.disk_usage_save()

    def rmtree(self, path: Path):
        removed = 0
        for root, _, files in os.walk(path):
            for f in files:
                removed += Path(root, f).stat().st_size
        self.disk_usage_rm(removed)
        try:
            self.base.rmtree(path)
        finally:
            self.disk_usage_save()

    async def update_upload(
        self, uuid: str, stream: Callable[[int], Awaitable[bytes]]
    ) -> int:
        try:
            return await _stream_to_file(stream, self.base.upload_path(uuid), self)
        finally:
            self.disk_usage_save()

    def finalize_upload(self, uuid: str, digest: str) -> Path:
        try:
            return _check_and_move(
                self.base.upload_path(uuid), self.base.blob_path(digest), digest, self
            )
        finally:
            self.disk_usage_save()

    def blob_path(self, digest: str, parent: Optional[Path] = None) -> Path:
        return self.base.blob_path(digest, parent=parent)

    def link_blob(self, digest: str, target: Path):
        try:
            _hardlink(self.base.blob_path(digest), target, self)
        finally:
            self.disk_usage_save()

    def link_blob_ignore_quota(self, digest: str, target: Path):
        _hardlink(self.base.blob_path(digest), target)


# --------------------------------------------------------------------------------------
def _rmtree_error_cb(func, path, exc_info):
    # nothing much we can do here, simply log a message
    LOG.error("%s(%r) failed:", func, path, exc_info=exc_info)


async def _stream_to_file(
    stream: Callable[[int], Awaitable[bytes]],
    path: Path,
    user_repo: Optional[UserRepository] = None,
) -> int:
    # TODO: call blocking stuff (open, write, unlink) in a thread?
    try:
        with path.open("ab") as f:
            while True:
                chunk = await stream(CHUNK_SIZE)
                if not chunk:
                    break
                if user_repo is not None:
                    user_repo.disk_usage_add(len(chunk))
                f.write(chunk)
    except:
        if user_repo is not None:
            user_repo.disk_usage_rm(path.stat().st_size)
        path.unlink()
        raise
    return path.stat().st_size


# --------------------------------------------------------------------------------------
def _hardlink(src: Path, dst: Path, user_repo: Optional[UserRepository] = None):
    if not src.is_file():
        raise FileNotFoundError()
    dst.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
    if dst.is_file():
        if user_repo is not None:
            user_repo.disk_usage_rm(dst.stat().st_size)
        dst.unlink()
    if user_repo is not None:
        user_repo.disk_usage_add(src.stat().st_size)
    os.link(src, dst)


# --------------------------------------------------------------------------------------
def _file_digest(algo: str, path: Path) -> str:
    h = hashlib.new(algo)
    buf = bytearray(CHUNK_SIZE)
    view = memoryview(buf)
    # TODO: call blocking stuff (open, readinto) in a thread?
    with path.open("rb") as f:
        while True:
            n = f.readinto(buf)
            if not n:
                break
            h.update(view[:n])
    return h.hexdigest()


# --------------------------------------------------------------------------------------
def _check_and_move(
    src: Path, dst: Path, digest: str, user_repo: Optional[UserRepository] = None
):
    algo, dig = parse_digest(digest.lower())
    if _file_digest(algo, src) != dig:
        if user_repo is not None:
            user_repo.disk_usage_rm(src.stat().st_size)
        src.unlink()
        try:
            os.removedirs(src.parent)
        except OSError:
            pass
        raise ValueError(f"Received data does not match digest: {digest}")
    dst.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
    src.rename(dst)
    dst.chmod(0o644)
    return dst


# --------------------------------------------------------------------------------------
ORPHAN_BLOB_LIFETIME = int(os.getenv("DLREPO_ORPHAN_BLOB_LIFETIME", "600"))


def _cleanup_orphans(folder: Path):
    if not folder.is_dir():
        return
    now = time.time()
    for root, _, files in os.walk(folder):
        for f in files:
            f = Path(root, f)
            if not f.is_file():
                continue
            stat = f.stat()
            if stat.st_nlink > 1:
                continue
            if now - stat.st_mtime < ORPHAN_BLOB_LIFETIME:
                continue
            f.unlink()

A dlrepo/fs/branch.py => dlrepo/fs/branch.py +82 -0
@@ 0,0 1,82 @@
# Copyright (c) 2021 Julien Floret
# Copyright (c) 2021 Robin Jarry
# SPDX-License-Identifier: BSD-3-Clause

import json
import logging
from pathlib import Path
from typing import Callable, Dict, Iterator

from .tag import Tag
from .util import SubDir


LOG = logging.getLogger(__name__)


# --------------------------------------------------------------------------------------
class Branch(SubDir):
    """
    A branch of artifacts under $ROOT/branches/<name>, containing tags.
    """

    ROOT_DIR = "branches"

    @classmethod
    def parent_path(cls, parent: "ArtifactRepository") -> Path:
        return parent.path() / cls.ROOT_DIR

    def url_bit(self) -> str:
        return f"branches/{self.name}"

    def get_tags(self, access_cb: Callable[[str], bool] = None) -> Iterator[Tag]:
        for t in Tag.all(self):
            if access_cb is not None and not access_cb(t.url()):
                continue
            yield t

    def get_tag(self, name: str, access_cb: Callable[[str], bool] = None) -> Tag:
        if name in ("latest", "stable"):
            tags = list(self.get_tags(access_cb))
            tags.sort(key=Tag.creation_date, reverse=True)
            for t in tags:
                if name == "latest":
                    return t
                if t.is_released():
                    return t
            raise FileNotFoundError(name)
        return Tag(self, name)

    def _policy_path(self) -> Path:
        return self._path / ".cleanup_policy"

    def set_cleanup_policy(self, max_daily_tags: int, max_released_tags: int):
        policy = {
            "max_daily_tags": max_daily_tags,
            "max_released_tags": max_released_tags,
        }
        self._policy_path().write_text(json.dumps(policy))

    def get_cleanup_policy(self) -> Dict[str, int]:
        try:
            policy = json.loads(self._policy_path().read_text())
        except (OSError, ValueError):
            policy = {}
        for field in "max_daily_tags", "max_released_tags":
            if field not in policy:
                policy[field] = 0
        return policy

    def delete(self, *, force: bool = False, cleanup_orphans: bool = True):
        if not self.exists():
            raise FileNotFoundError()
        for t in self.get_tags():
            if t.is_locked():
                raise OSError(f"Tag {t.name} is locked")
            if not force and t.is_released():
                raise OSError(f"Tag {t.name} is released, use force")
        for t in self.get_tags():
            t.delete(force=force, cleanup_orphans=False)
        self.root().rmtree(self._path)
        if cleanup_orphans:
            self.root().cleanup_orphan_blobs()

A dlrepo/fs/container.py => dlrepo/fs/container.py +124 -0
@@ 0,0 1,124 @@
# Copyright (c) 2021 Julien Floret
# Copyright (c) 2021 Robin Jarry
# SPDX-License-Identifier: BSD-3-Clause

import hashlib
import json
from pathlib import Path
from typing import Dict, List, Tuple
import weakref

from .util import parse_digest


# --------------------------------------------------------------------------------------
class ContainerRegistry:
    def __init__(self, repo: "AbstractRepository"):
        self.repo = weakref.proxy(repo)

    def repositories(self) -> List[str]:
        repos = set()
        for d in self.repo.path().glob("branches/*/*/*/container"):
            if d.is_dir():
                job = d.parent.name
                branch = d.parent.parent.parent.name
                repos.add(f"{branch}/{job}")
        for d in self.repo.path().glob("products/*/*/*/*/container"):
            if d.is_dir():
                product_branch = d.parent.parent.name
                variant = d.parent.parent.parent.name
                product = d.parent.parent.parent.parent.name
                repos.add(f"{product}/{variant}/{product_branch}")
        return list(repos)

    def product_tags(self, product: str, variant: str, branch: str) -> List[str]:
        tags = set()
        glob = f"products/{product}/{variant}/{branch}/*/container"
        for d in self.repo.path().glob(glob):
            if d.is_dir():
                tags.add(d.parent.name)
        if tags:
            tags.add("latest")
        return sorted(tags)

    def job_tags(self, branch: str, job: str) -> List[str]:
        tags = set()
        glob = f"branches/{branch}/*/{job}/container"
        for d in self.repo.path().glob(glob):
            if d.is_dir():
                tags.add(d.parent.parent.name)
        if tags:
            tags.add("latest")
        return sorted(tags)

    def blob_link_path(self, parent_path: Path, digest: str):
        if not parent_path.is_dir():
            raise FileNotFoundError()
        algo, digest = parse_digest(digest.lower())
        return parent_path / f"container/blobs/{algo}/{digest[:2]}/{digest}"

    def manifest_by_digest(self, digest: str) -> Path:
        path = self.repo.blob_path(digest)
        if not path.is_file():
            raise FileNotFoundError()
        return path

    def manifest_by_parent(self, parent_path: Path) -> Tuple[Path, str]:
        if not parent_path.is_dir():
            raise FileNotFoundError()
        files = list(parent_path.glob("container/manifests/*/*/*"))
        if len(files) != 1:
            raise FileNotFoundError()
        if not files[0].is_file():
            raise FileNotFoundError()
        path = files[0]
        digest = path.name
        algo = path.parent.parent.name
        return (path, f"{algo}:{digest}")

    def _link_blob_to_job(self, digest: str, job: "Job") -> int:
        job_path = self.repo.blob_path(digest, parent=job.path() / "container/blobs")
        self.repo.link_blob(digest, job_path)
        return job_path.stat().st_size

    def new_manifest(self, job: "Job", manifest: Dict) -> str:
        # For an unknown reason, the manifest json file uploaded by "docker push"
        # does not specify a "size" field for every blob.
        # While "docker pull" can perfectly live with that and k8s/dockershim,
        # k8s/containerd on the other hand considers that the blob is of size 0
        # and issues an error like:
        #
        #   failed commit on ref "config-sha256:5b238[...]": commit failed:
        #   unexpected commit digest sha256:e3b0c[...], expected sha256:5b238[...]:
        #   failed precondition
        #
        # Indeed, e3b0c[...] is the sha256 of the empty string.
        #
        # To fix this, we add the missing 'size' fields into the manifest json
        # file right after it is uploaded. Since the blobs are uploaded before
        # the manifest, we can read their sizes from the filesystem.

        # link config blob into the job folder
        config = manifest.get("config", {})
        config["size"] = self._link_blob_to_job(config.get("digest"), job)

        # link layers blobs into the job folder
        for layer in manifest.get("layers", []):
            layer["size"] = self._link_blob_to_job(layer.get("digest"), job)

        data = json.dumps(manifest).encode("utf-8")
        digest = "sha256:" + hashlib.sha256(data).hexdigest()

        manifest_path = self.repo.blob_path(digest)
        if not manifest_path.is_file():
            manifest_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
            manifest_path.write_bytes(data)

        job_manifest_path = self.repo.blob_path(
            digest, parent=job.path() / "container/manifests"
        )
        if job_manifest_path.is_file():
            job_manifest_path.unlink()
        self.repo.link_blob(digest, job_manifest_path)

        return digest

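Illustration only (not part of the patch) of the fix-up that new_manifest() performs: the per-blob "size" fields missing from the pushed manifest are filled in from the blob files already stored, then the manifest digest is recomputed over the amended JSON. Digests and sizes below are invented:

import hashlib
import json

manifest = {
    "config": {"digest": "sha256:5b23..."},    # "size" omitted by docker push
    "layers": [{"digest": "sha256:ab12..."}],  # "size" omitted by docker push
}

# new_manifest() reads these values from <blob file>.stat().st_size instead
blob_sizes = {"sha256:5b23...": 1469, "sha256:ab12...": 2811264}

manifest["config"]["size"] = blob_sizes[manifest["config"]["digest"]]
for layer in manifest["layers"]:
    layer["size"] = blob_sizes[layer["digest"]]

data = json.dumps(manifest).encode("utf-8")
print("sha256:" + hashlib.sha256(data).hexdigest())  # digest served to clients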
A dlrepo/fs/fmt.py => dlrepo/fs/fmt.py +150 -0
@@ 0,0 1,150 @@
# Copyright (c) 2021 Julien Floret
# Copyright (c) 2021 Robin Jarry
# SPDX-License-Identifier: BSD-3-Clause

import json
import logging
import os
from pathlib import Path
from typing import Awaitable, Callable, Dict, Iterator, List, Tuple

from cachetools import LRUCache, cachedmethod

from .util import SubDir


LOG = logging.getLogger(__name__)


# --------------------------------------------------------------------------------------
class ArtifactFormat(SubDir):
    """
    A named format directory holding artifact files, under a job or a product version.
    """

    def url_bit(self) -> str:
        return self.name

    def get_files(self) -> Iterator[str]:
        for root, dirs, files in os.walk(self._path):
            dirs.sort()
            files.sort()
            for f in files:
                f = Path(root, f)
                if self._is_reserved_file(f):
                    continue
                if f.is_file():
                    yield str(f.relative_to(self._path))

    def archive_name(self) -> str:
        return f"{self.parent.archive_name()}-{self.name}"

    _is_reserved_cache = LRUCache(4096)

    @cachedmethod(lambda self: self._is_reserved_cache)
    def _is_reserved_file(self, path, *, resolve=False):
        internal = self._internal_path()
        digests = self._digest_path()
        dirty = self._dirty_path()
        if resolve:
            internal = internal.resolve()
            digests = digests.resolve()
            dirty = dirty.resolve()
        return path in (internal, digests, dirty)

    def list_dir(self, relpath: str) -> Tuple[List[str], List[str]]:
        path = self.get_filepath(relpath)
        if not path.is_dir():
            raise NotADirectoryError(relpath)
        dirs = []
        files = []
        for e in path.iterdir():
            if self._is_reserved_file(e, resolve=True):
                continue
            if e.is_dir():
                dirs.append(e.name)
            elif e.is_file():
                files.append(e.name)
        return dirs, files

    def _check_filepath(self, relpath: str) -> Path:
        if relpath.startswith("/") or any(x in (".", "..") for x in relpath.split("/")):
            raise PermissionError(relpath)
        path = self._path / relpath
        if self._is_reserved_file(path):
            raise PermissionError(relpath)
        return path

    def get_filepath(self, relpath: str) -> Path:
        return self._check_filepath(relpath).resolve(strict=True)

    def get_digests(self) -> Dict[str, str]:
        try:
            return json.loads(self._digest_path().read_text())
        except (OSError, ValueError):
            return {}

    def _digest_path(self) -> Path:
        return self._path / ".digests"

    async def add_file(
        self,
        relpath: str,
        read: Callable[[int], Awaitable[bytes]],
        digest: str,
    ):
        self._check_filepath(relpath)
        uuid = self.root().next_upload()
        await self.root().update_upload(uuid, read)
        self.root().finalize_upload(uuid, digest)
        # avoid counting disk usage twice (already counted in update_upload())
        self.link_file(digest, relpath, ignore_quota=True)

    def link_file(self, digest: str, relpath: str, ignore_quota: bool = False):
        was_dirty = self.is_dirty()
        self.set_dirty(True)
        try:
            path = self._check_filepath(relpath)
            if ignore_quota:
                self.root().link_blob_ignore_quota(digest, path)
            else:
                self.root().link_blob(digest, path)
        except:
            if not was_dirty:
                self.set_dirty(False)
            raise
        # update digests file
        digests = self.get_digests()
        digests[relpath] = digest
        self._digest_path().write_text(json.dumps(digests))

    def _internal_path(self) -> Path:
        return self._path / ".internal"

    def is_internal(self) -> bool:
        return self._internal_path().is_file()

    def set_internal(self, internal: bool):
        path = self._internal_path()
        if internal:
            path.touch()
        elif path.is_file():
            path.unlink()

    def _dirty_path(self) -> Path:
        return self._path / ".dirty"

    def is_dirty(self) -> bool:
        return self._dirty_path().is_file()

    def set_dirty(self, dirty: bool):
        path = self._dirty_path()
        if dirty:
            path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
            path.touch()
        elif path.is_file():
            path.unlink()
            try:
                os.removedirs(path.parent)
            except OSError:
                pass

A dlrepo/fs/job.py => dlrepo/fs/job.py +162 -0
@@ 0,0 1,162 @@
# Copyright (c) 2021 Julien Floret
# Copyright (c) 2021 Robin Jarry
# SPDX-License-Identifier: BSD-3-Clause

import errno
import json
import logging
import os
from typing import Dict, Iterator

from .fmt import ArtifactFormat
from .product import Version
from .util import SubDir


LOG = logging.getLogger(__name__)


# --------------------------------------------------------------------------------------
class Job(SubDir):
    """
    A job under a tag, holding artifact formats and metadata; it may be linked to a product version.
    """

    def get_formats(self) -> Iterator[ArtifactFormat]:
        yield from ArtifactFormat.all(self)

    def get_format(self, name: str) -> ArtifactFormat:
        return ArtifactFormat(self, name)

    def archive_name(self) -> str:
        data = self.get_metadata()
        if {"product", "product_variant", "version"} <= set(data):
            return f"{data['product']}-{data['product_variant']}-v{data['version']}"
        return f"{self.name}-{self.parent.name}"

    def _metadata_path(self):
        return self._path / ".metadata"

    def _product_link_path(self):
        return self._path / ".product"

    @staticmethod
    def update_symlink(dst, link):
        if link.is_symlink():
            # also handles dangling symlinks, for which exists() returns False
            link.unlink()
        elif link.exists():
            raise OSError(errno.ENOTEMPTY, f"{link} exists and is not a symlink")
        link.symlink_to(os.path.relpath(dst, link.parent))

    def _link_to_product(self, version: Version):
        """
        Link the job to its product, according to the job metadata.
        Create the product fs if needed.
        The links are created in both ways:

        - From the job to its product version:
            $ROOT/branches/<branch>/<tag>/<job>/.product ->
                $ROOT/products/<product>/<variant>/<branch>/<version>

        - From the product version to the job(s), for each format:
            $ROOT/products/<product>/<variant>/<branch>/<version>/<format> ->
                $ROOT/branches/<branch>/<tag>/<job>/<format>

        The formats in a product version are not necessarily linked to the same job.
        """
        version.create()
        self.update_symlink(version.path(), self._product_link_path())
        for fmt in self.get_formats():
            self.update_symlink(fmt.path(), version.path() / fmt.name)
        container = self.path() / "container"
        if container.is_dir():
            self.update_symlink(container, version.path() / "container")

    def _cleanup_product_tree(self):
        link = self._product_link_path()
        if not link.is_symlink():
            return
        if not link.is_dir():
            link.unlink()
            return
        product = link.resolve()
        for d in product.iterdir():
            if not d.is_symlink():
                continue
            if not d.is_dir():
                continue
            try:
                if d.resolve().samefile(self.path() / d.name):
                    d.unlink()
            except FileNotFoundError:
                # same product, different jobs (e.g. doc + binaries)
                pass
        try:
            if os.listdir(product) == [".stamp"]:
                (product / ".stamp").unlink()
            # cleanup empty dirs
            os.removedirs(product)
        except OSError:
            # directory not empty, abort
            pass
        link.unlink()

    def get_metadata(self) -> Dict:
        try:
            data = json.loads(self._metadata_path().read_text())
        except (OSError, ValueError):
            data = {}
        data["locked"] = self.is_locked()
        data["name"] = self.name
        return data

    def _lock_path(self):
        return self.path() / ".locked"

    def is_locked(self) -> bool:
        return self._lock_path().is_file()

    def set_locked(self, locked: bool):
        if not self.exists():
            raise FileNotFoundError()
        path = self._lock_path()
        if locked:
            path.touch()
        elif path.is_file():
            path.unlink()

    def add_metadata(self, new_data: Dict):
        if self.is_locked():
            raise FileExistsError("Job is locked")
        self._cleanup_product_tree()
        metadata_path = self._metadata_path()
        try:
            data = json.loads(metadata_path.read_text())
        except (OSError, ValueError):
            data = {}
        for k, v in new_data.items():
            if isinstance(v, str):
                v = v.lower()
            if v == "":
                data.pop(k, None)
            else:
                data[k] = v
        self.create()
        metadata_path.write_text(json.dumps(data))
        if {"product", "version", "product_branch", "product_variant"} <= set(data):
            self._link_to_product(
                self.root()
                .get_product(str(data["product"]))
                .get_variant(str(data["product_variant"]))
                .get_branch(str(data["product_branch"]))
                .get_version(str(data["version"]))
            )

    def delete(self, *, cleanup_orphans: bool = True):
        if not self.exists():
            raise FileNotFoundError()
        self._cleanup_product_tree()
        self.root().rmtree(self._path)
        if cleanup_orphans:
            self.root().cleanup_orphan_blobs()

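A minimal sketch (not part of the patch) of the metadata-driven product linking described in the _link_to_product() docstring: once all four product fields are present, add_metadata() creates the products/ hierarchy and the .product symlink. Names and the repository root are invented:

from dlrepo.fs import ArtifactRepository

repo = ArtifactRepository("/srv/dlrepo")
job = repo.get_branch("main").get_tag("v1.0").get_job("build")
job.create()
job.add_metadata({
    "product": "router",
    "product_variant": "x86_64",
    "product_branch": "stable",
    "version": "1.0",
})
# creates the relative symlink:
#   branches/main/v1.0/build/.product ->
#       ../../../../products/router/x86_64/stable/1.0
# and, for each existing artifact format, a reverse symlink from the product
# version directory back into the job directory.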
A dlrepo/fs/product.py => dlrepo/fs/product.py +125 -0
@@ 0,0 1,125 @@
# Copyright (c) 2021 Julien Floret
# Copyright (c) 2021 Robin Jarry
# SPDX-License-Identifier: BSD-3-Clause

import logging
from pathlib import Path
from typing import Callable, Iterator

from .fmt import ArtifactFormat
from .util import SubDir


LOG = logging.getLogger(__name__)


# --------------------------------------------------------------------------------------
class Product(SubDir):
    """
    A product under $ROOT/products/<name>, containing variants.
    """

    ROOT_DIR = "products"

    @classmethod
    def parent_path(cls, parent: "ArtifactRepository") -> Path:
        return parent.path() / cls.ROOT_DIR

    def url_bit(self) -> str:
        return f"products/{self.name}"

    def get_variants(self) -> Iterator["Variant"]:
        yield from Variant.all(self)

    def get_variant(self, name: str) -> "Variant":
        return Variant(self, name)


# --------------------------------------------------------------------------------------
class Variant(SubDir):
    """
    A variant of a product, containing product branches.
    """

    def get_branches(self) -> Iterator["ProductBranch"]:
        yield from ProductBranch.all(self)

    def get_branch(self, name: str) -> "ProductBranch":
        return ProductBranch(self, name)


# --------------------------------------------------------------------------------------
class ProductBranch(SubDir):
    """
    A branch of a product variant, containing versions.
    """

    def get_versions(
        self, access_cb: Callable[[str], bool] = None
    ) -> Iterator["Version"]:
        for v in Version.all(self):
            if access_cb is not None and not access_cb(v.url()):
                continue
            yield v

    def get_version(
        self, name: str, access_cb: Callable[[str], bool] = None
    ) -> "Version":
        if name in ("latest", "stable"):
            versions = list(self.get_versions(access_cb))
            versions.sort(key=Version.creation_date, reverse=True)
            for v in versions:
                if name == "latest":
                    return v
                if v.is_released():
                    return v
            raise FileNotFoundError(name)
        return Version(self, name)


# --------------------------------------------------------------------------------------
class Version(SubDir):
    """
    A version of a product variant branch; its formats are symlinks to the jobs that produced them.
    """

    def create(self):
        super().create()
        stamp = self.path() / ".stamp"
        if not stamp.exists():
            stamp.touch()

    def archive_name(self) -> str:
        variant = self.parent.parent
        product = variant.parent
        return f"{product.name}-{variant.name}-v{self.name}"

    @classmethod
    def creation_date(cls, v):
        stamp = v.path() / ".stamp"
        if stamp.is_file():
            return stamp.stat().st_ctime
        return 0

    def timestamp(self) -> float:
        return Version.creation_date(self)

    def is_released(self) -> bool:
        for fmt in self.get_formats():
            released_path = fmt.path().resolve().parent.parent / ".released"
            if released_path.is_file():
                return True
        return False

    def is_locked(self) -> bool:
        for fmt in self.get_formats():
            locked_path = fmt.path().resolve().parent.parent / ".locked"
            if locked_path.is_file():
                return True
        return False

    def get_formats(self) -> Iterator[ArtifactFormat]:
        yield from ArtifactFormat.all(self)

    def get_format(self, name: str) -> ArtifactFormat:
        return ArtifactFormat(self, name)

A dlrepo/fs/tag.py => dlrepo/fs/tag.py +190 -0
@@ 0,0 1,190 @@
# Copyright (c) 2021 Julien Floret
# Copyright (c) 2021 Robin Jarry
# SPDX-License-Identifier: BSD-3-Clause

import asyncio
import logging
import os
from pathlib import Path
import socket
from typing import Iterator, Optional

import aiohttp

from .fmt import ArtifactFormat
from .job import Job
from .util import SubDir


LOG = logging.getLogger(__name__)


# --------------------------------------------------------------------------------------
class Tag(SubDir):
    """
    A tag under a branch, containing jobs; it can be locked, released and published to a remote server.
    """

    def create(self):
        super().create()
        stamp = self._path / ".stamp"
        if not stamp.exists():
            stamp.touch()

    @classmethod
    def creation_date(cls, t):
        stamp = t.path() / ".stamp"
        if stamp.is_file():
            return stamp.stat().st_ctime
        return 0

    def timestamp(self) -> float:
        return Tag.creation_date(self)

    def get_jobs(self) -> Iterator[Job]:
        yield from Job.all(self)

    def get_job(self, name: str) -> Job:
        return Job(self, name)

    def _publish_status_path(self) -> Path:
        return self._path / ".publish-status"

    def publish_status(self) -> Optional[str]:
        try:
            return self._publish_status_path().read_text().strip()
        except FileNotFoundError:
            return None

    def _released_path(self) -> Path:
        return self._path / ".released"

    def is_released(self) -> bool:
        return self._released_path().is_file()

    def set_released(self, released: bool):
        if not self._path.is_dir():
            raise FileNotFoundError()
        loop = asyncio.get_running_loop()
        task = loop.create_task(self.do_release(released))
        task.add_done_callback(self.done_cb)

    def done_cb(self, task):
        if task.cancelled():
            return
        exc = task.exception()
        if exc:
            LOG.error("while changing released flag on tag %s", self.name, exc_info=exc)
            self._publish_status_path().write_text(f"error: {exc}\n")

    PUBLISH_URL = os.getenv("DLREPO_PUBLISH_URL")
    PUBLISH_AUTH = os.getenv("DLREPO_PUBLISH_AUTH")
    USER_AGENT = f"dlrepo-server/{socket.gethostname()}"

    def _publish_session(self) -> aiohttp.ClientSession:
        with open(self.PUBLISH_AUTH, "r", encoding="utf-8") as f:
            buf = f.read().strip()
        if ":" not in buf:
            raise ValueError("invalid DLREPO_PUBLISH_AUTH file")
        login, password = buf.split(":", 1)
        auth = aiohttp.BasicAuth(login, password, "utf-8")
        return aiohttp.ClientSession(
            self.PUBLISH_URL,
            auth=auth,
            raise_for_status=True,
            headers={"User-Agent": self.USER_AGENT},
        )

    async def do_release(self, released: bool):
        if self.PUBLISH_URL and self.PUBLISH_AUTH:
            self._publish_status_path().write_text("in progress\n")
            async with self._publish_session() as sess:
                if released:
                    LOG.info(
                        "publishing tag %s/%s to %s",
                        self.parent.name,
                        self.name,
                        self.PUBLISH_URL,
                    )
                    await self._publish(sess)
                else:
                    LOG.info(
                        "deleting tag %s/%s from %s",
                        self.parent.name,
                        self.name,
                        self.PUBLISH_URL,
                    )
                    await sess.delete(self.url(), params={"force": "true"})
        path = self._released_path()
        if released:
            path.touch()
        elif path.is_file():
            path.unlink()

    async def _publish(self, sess: aiohttp.ClientSession):
        for job in self.get_jobs():
            self._publish_status_path().write_text(f"uploading {job.name}\n")
            for fmt in job.get_formats():
                await self._publish_fmt(fmt, sess)
            metadata = job.get_metadata()
            del metadata["name"]
            del metadata["locked"]
            job_url = job.url()
            LOG.debug("publishing job metadata %s", job_url)
            await sess.patch(job_url, json={"job": metadata})
        await sess.put(self.url(), json={"tag": {"released": True}})
        self._publish_status_path().write_text(f"published to {self.PUBLISH_URL}\n")

    async def _publish_fmt(self, fmt: ArtifactFormat, sess: aiohttp.ClientSession):
        fmt_url = fmt.url()

        for file, digest in fmt.get_digests().items():
            file_url = fmt_url + file
            headers = {"Digest": digest}

            resp = await sess.head(file_url, headers=headers, raise_for_status=False)
            if resp.status == 200:
                LOG.debug("publishing file %s (deduplicated)", file_url)
                # file digest already present on the server, do not upload
                # the data again
                headers["X-Dlrepo-Link"] = digest
                await sess.put(file_url, data=None, headers=headers)

            else:
                LOG.debug("publishing file %s", file_url)
                # file digest not on server, proceed with upload
                with open(fmt.path() / file, "rb") as f:
                    await sess.put(file_url, data=f, headers=headers)

        if fmt.is_internal():
            LOG.debug("publishing internal format %s", fmt_url)
            await sess.put(fmt_url, json={"artifact_format": {"internal": True}})

        # clear the dirty flag
        await sess.patch(fmt_url)

    def _locked_path(self) -> Path:
        return self._path / ".locked"

    def is_locked(self) -> bool:
        return self._locked_path().is_file()

    def set_locked(self, locked: bool):
        path = self._locked_path()
        if locked:
            path.touch()
        elif path.is_file():
            path.unlink()

    def delete(self, *, force: bool = False, cleanup_orphans: bool = True):
        if not self.exists():
            raise FileNotFoundError()
        if self.is_locked():
            raise OSError(f"Tag {self.name} is locked")
        if not force and self.is_released():
            raise OSError(f"Tag {self.name} is released, use force")
        for j in self.get_jobs():
            j.delete(cleanup_orphans=False)
        self.root().rmtree(self._path)
        if cleanup_orphans:
            self.root().cleanup_orphan_blobs()

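Illustration only (not part of the patch): the per-file deduplication used by _publish_fmt() when mirroring a tag to DLREPO_PUBLISH_URL. A HEAD request advertises the digest; if the remote server already has the blob, a PUT with the X-Dlrepo-Link header re-links it instead of uploading the data again. The helper name and arguments are hypothetical:

import aiohttp


async def publish_file(sess: aiohttp.ClientSession, url: str, digest: str, path: str):
    headers = {"Digest": digest}
    resp = await sess.head(url, headers=headers, raise_for_status=False)
    if resp.status == 200:
        # blob already known on the remote server: link it, skip the upload
        headers["X-Dlrepo-Link"] = digest
        await sess.put(url, data=None, headers=headers)
    else:
        # blob unknown on the remote server: upload the file contents
        with open(path, "rb") as f:
            await sess.put(url, data=f, headers=headers)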
A dlrepo/fs/util.py => dlrepo/fs/util.py +104 -0
@@ 0,0 1,104 @@
# Copyright (c) 2021 Julien Floret
# Copyright (c) 2021 Robin Jarry
# SPDX-License-Identifier: BSD-3-Clause

import hashlib
from pathlib import Path
import re
from typing import Iterator, Tuple, Union


# --------------------------------------------------------------------------------------
class SubDir:
    def __init__(self, parent: Union["AbstractRepository", "SubDir"], name: str):
        self.parent = parent
        self.name = name.lower()
        self._path = self._resolve_path()

    def root(self) -> "AbstractRepository":
        r = self
        while r.parent is not None:
            r = r.parent
        return r

    def url_bit(self) -> str:
        return self.name

    def url(self) -> str:
        bits = [self.url_bit()]
        r = self
        while r.parent is not None:
            r = r.parent
            if r.url_bit() is not None:
                bits.insert(0, r.url_bit())
        return f"/{'/'.join(bits)}/"

    @classmethod
    def all(cls, parent) -> Iterator["SubDir"]:
        try:
            dirs = list(cls.parent_path(parent).iterdir())
            dirs.sort(key=lambda d: d.name)
            for d in dirs:
                if d.name.startswith(".") or not d.is_dir():
                    continue
                yield cls(parent, d.name)
        except FileNotFoundError:
            pass

    @classmethod
    def parent_path(cls, parent: "SubDir") -> Path:
        return parent.path()

    def _resolve_path(self) -> Path:
        path = self.parent_path(self.parent) / self.name
        if self.name.startswith(".") or "/" in self.name:
            raise FileNotFoundError(path)
        return path

    def exists(self) -> bool:
        return self.path().is_dir()

    def create(self):
        if self.parent is not None:
            self.parent.create()
        self.path().mkdir(mode=0o755, parents=True, exist_ok=True)

    def path(self) -> Path:
        return self._path


# --------------------------------------------------------------------------------------
HASH_ALGOS = "|".join(hashlib.algorithms_guaranteed)
ALGO_RE = re.compile(
    rf"""
    ^
    (?P<algo>{HASH_ALGOS})
    :
    (?P<digest>[A-Fa-f0-9]+)
    $
    """,
    re.VERBOSE,
)


def parse_digest(digest: str) -> Tuple[str, str]:
    match = ALGO_RE.match(digest)
    if not match:
        raise ValueError(f"invalid digest: {digest}")
    return match.groups()


# --------------------------------------------------------------------------------------
def human_readable(value):
    if value == 0:
        return "0"
    units = ("K", "M", "G", "T")
    i = 0
    unit = ""
    while value >= 1000 and i < len(units):
        unit = units[i]
        value /= 1000
        i += 1
    if value < 100:
        return f"{value:.1f}{unit}"
    return f"{value:.0f}{unit}"