~fnux/machines.sr.ht

8c0f380f11ca529e0dbf720558e900a9e81e079e — Timothée Floure a month ago 4618e03
Import & wire VMM from uncloud
3 files changed, 303 insertions(+), 0 deletions(-)

A run-worker.py
A worker/__init__.py
A worker/vmm.py
A run-worker.py => run-worker.py +19 -0
@@ 0,0 1,19 @@
#!/usr/bin/env python

from socket import gethostname
from worker.vmm import VMM

def main():
    print("Running sourcehut machines worker as {}.".format(gethostname()))

    # Initialize VMM.
    vmm = VMM()
    vms = vmm.discover()

    print("VMM initialized. Found {} running VMs.".format(len(vms)))

    # TODO: read srht configuration and link to master.


if __name__ == '__main__':
    main()

A worker/__init__.py => worker/__init__.py +0 -0

A worker/vmm.py => worker/vmm.py +284 -0
@@ 0,0 1,284 @@
import os
import subprocess as sp
import logging
import socket
import json
import tempfile
import time

from contextlib import suppress
from multiprocessing import Process
from os.path import join as join_path
from os.path import isdir

logger = logging.getLogger(__name__)


class VMQMPHandles:
    def __init__(self, path):
        self.path = path
        self.sock = socket.socket(socket.AF_UNIX)
        self.file = self.sock.makefile()

    def __enter__(self):
        self.sock.connect(self.path)

        # eat qmp greetings
        self.file.readline()

        # init qmp
        self.sock.sendall(b'{ "execute": "qmp_capabilities" }')
        self.file.readline()

        return self.sock, self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.close()
        self.sock.close()

        if exc_type:
            logger.error(
                "Couldn't get handle for VM.", exc_type, exc_val, exc_tb
            )
            raise exc_type("Couldn't get handle for VM.") from exc_type


class TransferVM(Process):
    def __init__(self, src_uuid, dest_sock_path, host, socket_dir):
        self.src_uuid = src_uuid
        self.host = host
        self.src_sock_path = os.path.join(socket_dir, self.src_uuid)
        self.dest_sock_path = dest_sock_path

        super().__init__()

    def run(self):
     .   with suppress(FileNotFoundError):
            os.remove(self.src_sock_path)

        command = [
            "ssh",
            "-nNT",
            "-L",
            "{}:{}".format(self.src_sock_path, self.dest_sock_path),
            "root@{}".format(self.host),
        ]

        try:
            p = sp.Popen(command)
        except Exception as e:
            logger.error(
                "Couldn' forward unix socks over ssh.", exc_info=e
            )
        else:
            time.sleep(2)
            vmm = VMM()
            logger.debug("Executing: ssh forwarding command: %s", command)
            vmm.execute_command(
                self.src_uuid,
                command="migrate",
                arguments={"uri": "unix:{}".format(self.src_sock_path)},
            )

            while p.poll() is None:
                success, output = vmm.execute_command(self.src_uuid, command="query-migrate")
                if success:
                    status = output["return"]["status"]
                    logger.info('Migration Status: {}'.format(status))
                    if status == "completed":
                        vmm.stop(self.src_uuid)
                        return
                    elif status in ['failed', 'cancelled']:
                        return
                else:
                    logger.error("Couldn't be able to query VM {} that was in migration".format(self.src_uuid))
                    return

                time.sleep(2)


class VMM:
    # Virtual Machine Manager
    def __init__(
            self,
            qemu_path="/usr/bin/qemu-system-x86_64",
            vmm_backend=os.path.expanduser("~/uncloud/vmm/"),
    ):
        self.qemu_path = qemu_path
        self.vmm_backend = vmm_backend
        self.socket_dir = os.path.join(self.vmm_backend, "sock")

        if not os.path.isdir(self.vmm_backend):
            logger.info(
                "{} does not exists. Creating it...".format(
                    self.vmm_backend
                )
            )
            os.makedirs(self.vmm_backend, exist_ok=True)

        if not os.path.isdir(self.socket_dir):
            logger.info(
                "{} does not exists. Creating it...".format(
                    self.socket_dir
                )
            )
            os.makedirs(self.socket_dir, exist_ok=True)

    def is_running(self, uuid):
        sock_path = os.path.join(self.socket_dir, uuid)
        try:
            sock = socket.socket(socket.AF_UNIX)
            sock.connect(sock_path)
            recv = sock.recv(4096)
        except Exception as err:
            # unix sock doesn't exists or it is closed
            logger.debug(
                "VM {} sock either don' exists or it is closed. It mean VM is stopped.".format(
                    uuid
                ),
                exc_info=err,
            )
        else:
            # if we receive greetings from qmp it mean VM is running
            if len(recv) > 0:
                return True

        with suppress(FileNotFoundError):
            os.remove(sock_path)

        return False

    def start(self, *args, uuid, migration=False):
        # start --> sucess?
        migration_args = ()
        if migration:
            migration_args = (
                "-incoming",
                "unix:{}".format(os.path.join(self.socket_dir, uuid)),
            )

        if self.is_running(uuid):
            logger.warning("Cannot start VM. It is already running.")
        else:
            qmp_arg = (
                "-qmp",
                "unix:{},server,nowait".format(
                    join_path(self.socket_dir, uuid)
                ),
            )
            vnc_arg = (
                "-vnc",
                "unix:{}".format(tempfile.NamedTemporaryFile().name),
            )

            command = [
                "sudo",
                "-p",
                "Enter password to start VM {}: ".format(uuid),
                self.qemu_path,
                *args,
                *qmp_arg,
                *migration_args,
                *vnc_arg,
                "-daemonize",
            ]
            try:
                sp.check_output(command, stderr=sp.PIPE)
            except sp.CalledProcessError as err:
                logger.exception(
                    "Error occurred while starting VM.\nDetail %s",
                    err.stderr.decode("utf-8"),
                )
            else:
                sp.check_output(
                    ["sudo", "-p", "Enter password to correct permission for uncloud-vmm's directory",
                     "chmod", "-R", "o=rwx,g=rwx", self.vmm_backend]
                )

                # TODO: Find some good way to check whether the virtual machine is up and
                #       running without relying on non-guarenteed ways.
                for _ in range(10):
                    time.sleep(2)
                    status = self.get_status(uuid)
                    if status in ["running", "inmigrate"]:
                        return status
                logger.warning(
                    "Timeout on VM's status. Shutting down VM %s", uuid
                )
                self.stop(uuid)
                # TODO: What should we do more. VM can still continue to run in background.
                #       If we have pid of vm we can kill it using OS.

    def execute_command(self, uuid, command, **kwargs):
        # execute_command -> sucess?, output
        try:
            with VMQMPHandles(os.path.join(self.socket_dir, uuid)) as (
                sock_handle,
                file_handle,
            ):
                command_to_execute = {"execute": command, **kwargs}
                sock_handle.sendall(
                    json.dumps(command_to_execute).encode("utf-8")
                )
                output = file_handle.readline()
        except Exception:
            logger.exception(
                "Error occurred while executing command and getting valid output from qmp"
            )
        else:
            try:
                output = json.loads(output)
            except Exception:
                logger.exception(
                    "QMP Output isn't valid JSON. %s", output
                )
            else:
                return "return" in output, output
        return False, None

    def stop(self, uuid):
        success, output = self.execute_command(
            command="quit", uuid=uuid
        )
        return success

    def get_status(self, uuid):
        success, output = self.execute_command(
            command="query-status", uuid=uuid
        )
        if success:
            return output["return"]["status"]
        else:
            # TODO: Think about this for a little more
            return "STOPPED"

    def discover(self):
        vms = [
            uuid
            for uuid in os.listdir(self.socket_dir)
            if not isdir(join_path(self.socket_dir, uuid))
        ]
        return vms

    def get_vnc(self, uuid):
        success, output = self.execute_command(
            uuid, command="query-vnc"
        )
        if success:
            return output["return"]["service"]
        return None

    def transfer(self, src_uuid, destination_sock_path, host):
        p = TransferVM(
            src_uuid,
            destination_sock_path,
            socket_dir=self.socket_dir,
            host=host,
        )
        p.start()

    # TODO: the following method should clean things that went wrong
    #       e.g If VM migration fails or didn't start for long time
    #       i.e 15 minutes we should stop the waiting VM.
    def maintenace(self):
        pass