~mdkcore/jackplug

eecc5cd0ca89bb50d8320e4ed0d6f9ebdda75785 — Rodrigo Oliveira 3 years ago 80c6456
Use asyncio instead of ioloop (tornado) as event loop
2 files changed, 73 insertions(+), 117 deletions(-)

M jackplug/jack.py
M jackplug/plug.py
M jackplug/jack.py => jackplug/jack.py +42 -54
@@ 7,25 7,27 @@ We are calling ZMQ's 'identity' as 'service', to ease the understanding of
services.
"""

import asyncio
import logging
import signal
import uuid

import zmq
import zmq.asyncio
from zmq.utils import jsonapi
from zmq.eventloop import ioloop, zmqstream

from .utils import Configuration
from .utils import IPCEndpoint
from .utils import PeriodicCall

log = logging.getLogger("JackPlug")


class JackBase(object):
class JackBase:
    """Base Jack class

    This handles low level communication (plus heartbeat), server side"""

    _timeout = None
    _timeout_callback = None

    def __init__(self, service, endpoint=IPCEndpoint()):
        """Class contructor


@@ 34,38 36,40 @@ class JackBase(object):
        :param endpoint: IPCEndpoint(pathname) or TCPEndpoint(address, port) to
        connect (default pathname: /tmp/jack.plug, default port: 3559)
        """
        self.context = zmq.Context.instance()
        self.socket = None
        self._service = service
        self._endpoint = endpoint

        self.socket = self.context.socket(zmq.DEALER)
        self._identity = str(uuid.uuid4())
        self.socket.identity = service

        self._conf = Configuration.instance()
        self._liveness = self._conf.ping_max_liveness

        self._heartbeat_loop = PeriodicCall(
            self._conf.ping_interval, self.heartbeat
        )

    async def _recv(self):
        context = zmq.asyncio.Context.instance()
        self.socket = context.socket(zmq.DEALER)

        self.socket.identity = self._service

        # use with flags=zmq.DONTWAIT on send; also, any new message sent after
        # reaching HWM will be discarded (dealer)
        self.socket.setsockopt(zmq.SNDHWM, 3)

        # use with close to discard all messages
        # self.socket.setsockopt(zmq.LINGER, 0)

        # return immediately if message cannot be sent
        self.socket.setsockopt(zmq.SNDTIMEO, 0)

        # do not queue message if connection not completed (zmq level)
        self.socket.setsockopt(zmq.IMMEDIATE, 1)

        self.socket.connect(endpoint.endpoint)

        self.socket_stream = zmqstream.ZMQStream(self.socket)
        self.socket_stream.on_recv(self._recv)

        self._conf = Configuration.instance()
        self._liveness = self._conf.ping_max_liveness

        self._heartbeat_loop = ioloop.PeriodicCallback(
            self.heartbeat, self._conf.ping_interval
        )
        self.socket.connect(self._endpoint.endpoint)

        self._heartbeat_loop.start()
        while True:
            message = await self.socket.recv_multipart()
            await self.recv(jsonapi.loads(message[0]))

    def close(self):
        """Do close Jack with its socket and loopers


@@ 74,9 78,6 @@ class JackBase(object):
        this base function to proper cleanup.
        """
        self._heartbeat_loop.stop()

        ioloop.IOLoop.instance().stop()
        self.socket_stream.close()
        self.socket.close()

    @staticmethod


@@ 86,14 87,14 @@ class JackBase(object):
        log.propagate = False
        log = logger

    def heartbeat(self):
    async def heartbeat(self):
        """Send a ping message to the other endpoint"""
        JackBase.send(self, {"event": "ping", "data": {"id": self._identity}})

    def _recv(self, message):
        self.recv(jsonapi.loads(message[0]))
        await JackBase.send(
            self,
            {"event": "ping", "data": {"id": self._identity}}
        )

    def recv(self, message):
    async def recv(self, message):
        """Receive a message

        Should be reimplemented on the derived class.


@@ 102,7 103,7 @@ class JackBase(object):
        """
        raise NotImplementedError

    def send(self, message):
    async def send(self, message):
        """Send a message

        Tries to send a message to the other endpoint. Also, check service


@@ 110,24 111,22 @@ class JackBase(object):
        :param message: message to be sent
        """
        try:
            self.socket.send_json(message)
            await self.socket.send_json(message)
        except zmq.Again as e:
            if message.get("event", None) == "ping":
                self._liveness = self._liveness - 1

                max_str = ""
                if self._liveness + 1 == self._conf.ping_max_liveness:
                    max_str = (
                        " (MAX) | interval: %sms" % self._conf.ping_interval
                    )
                    max_str = f" (MAX) | interval: {self._conf.ping_interval}ms"

                if self._liveness >= 0:
                    log.info("Jack: liveness: %s%s", self._liveness, max_str)

                if self._liveness == 0:
                    log.error("Jack: Plug seems unavailable now")
                    if self._timeout:
                        self._timeout()
                    if self._timeout_callback:
                        await self._timeout_callback()
            else:
                log.error("Jack: Could not send message: %s", e)
        except Exception as e:


@@ 135,29 134,18 @@ class JackBase(object):
        else:
            self._liveness = self._conf.ping_max_liveness

    def start(self):
    async def start(self):
        """Initialize all service loopers"""
        loop = ioloop.IOLoop.instance()

        signal.signal(
            signal.SIGINT,
            lambda sig, frame: loop.add_callback_from_signal(self.close),
        await asyncio.gather(
            self._heartbeat_loop.start(),
            self._recv()
        )

        try:
            loop.start()
        except RuntimeError as e:
            log.debug("Jack: %s", e)
        except zmq.ZMQError as e:
            if e.errno != zmq.ENOTSOCK:
                log.error("Error starting IOLoop: %s", e)

    def on_timeout(self, timeout_callback=None):
        """Register a callback to be called when a timeout occurs


        :param timeout_callback: function to be set as callback. If 'None', it
        will unregister the callback.
        """

        self._timeout = timeout_callback
        self._timeout_callback = timeout_callback

M jackplug/plug.py => jackplug/plug.py +31 -63
@@ 7,23 7,23 @@ We are calling ZMQ's 'identity' as 'service', to ease the understanding of the
services.
"""

import asyncio
import logging
import signal
import threading
import time

import zmq
import zmq.asyncio
from zmq.utils import jsonapi
from zmq.eventloop import ioloop, zmqstream

from .utils import Configuration
from .utils import IPCEndpoint
from .utils import PeriodicCall


log = logging.getLogger("JackPlug")


class PlugBase(object):
class PlugBase:
    """Base Plug class

    This handles low level communication (plus hearbeat), microservice side"""


@@ 37,30 37,19 @@ class PlugBase(object):

        :param pathname: IPC pathname to be used (default: /tmp/jack.plug)
        """
        self.context = zmq.Context.instance()

        self.socket = self.context.socket(zmq.ROUTER)

        self.socket.bind(endpoint.endpoint)

        # XXX check zmq.asyncio.Socket with recv_multipart
        self.socket_stream = zmqstream.ZMQStream(self.socket)
        self.socket_stream.on_recv(self._recv)
        self.socket = None
        self._endpoint = endpoint

        self._conf = Configuration.instance()
        self._heartbeat_loop = ioloop.PeriodicCallback(
            self.heartbeat, self._conf.ping_interval
        )

        self._heartbeat_loop.start()
        self._heartbeat_loop = PeriodicCall(
            self._conf.ping_interval, self.heartbeat
        )

    def close(self):
        """Do close Plug with its socket and loopers"""
        ioloop.IOLoop.instance().stop()

        try:
            self.socket.setsockopt(zmq.LINGER, 0)
            self.socket_stream.close()
            self.socket.close()
        except Exception as e:
            log.error("An error occurred while closing socket: %s", e)


@@ 72,7 61,7 @@ class PlugBase(object):
        log.propagate = False
        log = logger

    def heartbeat(self):
    async def heartbeat(self):
        """Check if known jacks are alive (pinging us)"""
        services = list(self._services_ping.keys())
        for service in services:


@@ 84,9 73,7 @@ class PlugBase(object):
                liveness = liveness - 1
                max_str = ""
                if liveness + 1 == self._conf.ping_max_liveness:
                    max_str = (
                        " (MAX) | interval: %sms" % self._conf.ping_interval
                    )
                    max_str = f" (MAX) | interval: {self._conf.ping_interval}ms"

                log.debug(
                    "Plug: Service '%s' liveness: %s%s",


@@ 106,14 93,23 @@ class PlugBase(object):
                        self._services_ping[service]["alive"] = False

                        if self._timeout_callback:
                            self._timeout_callback(service.decode())
                            await self._timeout_callback(service.decode())
                elif liveness < 0:
                    del self._services_ping[service]
                else:
                    self._services_ping[service]["last_ping"] = now
                    self._services_ping[service]["liveness"] = liveness

    def _recv(self, message):
    async def _recv(self):
        context = zmq.asyncio.Context.instance()
        self.socket = context.socket(zmq.ROUTER)
        self.socket.bind(self._endpoint.endpoint)

        while True:
            message = await self.socket.recv_multipart()
            await self._process_message(message)

    async def _process_message(self, message):
        """Receive a message from any jack

        Internally handles messages from jacks and prepare them to be consumed


@@ 154,13 150,13 @@ class PlugBase(object):
                self._services_ping[service]["id"] = identity

                if self._connection_callback:
                    self._connection_callback(service.decode(), True)
                    await self._connection_callback(service.decode(), True)

            return

        self.recv(service, message)
        await self.recv(service, message)

    def recv(self, service, message):
    async def recv(self, service, message):
        """Receive a message

        Should be reimplemented on the derived class.


@@ 169,7 165,7 @@ class PlugBase(object):
        """
        raise NotImplementedError

    def send(self, service, message):
    async def send(self, service, message):
        """Send a message

        Tries to send a message to a given service.


@@ 179,42 175,15 @@ class PlugBase(object):
        if self.socket.closed:
            return

        self.socket.send_multipart([service, jsonapi.dumps(message)])
        await self.socket.send_multipart([service, jsonapi.dumps(message)])

    def start(self):
    async def start(self):
        """Initialize all plug loopers"""
        loop = ioloop.IOLoop.instance()

        # handle signal if, and only, if we are running on the main thread
        if isinstance(threading.current_thread(), threading._MainThread):
            signal.signal(
                signal.SIGINT,
                lambda sig, frame: loop.add_callback_from_signal(self.close),
            )

        try:
            loop.start()
        except RuntimeError as e:
            log.debug("Plug: %s", e)
        except zmq.ZMQError as e:
            if e.errno != zmq.ENOTSOCK:
                log.error("Error starting IOLoop: %s", e)

    def restart(self):
        del self.socket_stream
        self._heartbeat_loop.stop()

        self.socket_stream = zmqstream.ZMQStream(self.socket)
        self.socket_stream.on_recv(self._recv)

        self._heartbeat_loop = ioloop.PeriodicCallback(
            self.heartbeat, self._conf.ping_interval
        await asyncio.gather(
            self._heartbeat_loop.start(),
            self._recv()
        )

        self._heartbeat_loop.start()

        self.start()

    # apocalypse
    def _now(self):
        """Helper function to get current time as milliseconds"""


@@ 251,7 220,6 @@ class Plug(PlugBase):
        super(Plug, self).__init__(**kwargs)

        self._listener = listener
        self.start()

    def recv(self, service, message):
        """PlugBase.recv reimplementation