~mdkcore/jackplug

aab8fefd640e22af0f971d7973de27642a255aca — Rodrigo Oliveira 4 years ago a70e6ba + 569f8b8
Merged in remove_simb_pilsner (pull request #2)

Remove simb.pilsner dependency

Approved-by: Gustavo Heinz
M .gitignore => .gitignore +3 -0
@@ 4,4 4,7 @@
JackPlug.egg-info/
build/
dist/

.python-version
WORK-*
__pycache__

A .pre-commit-config.yaml => .pre-commit-config.yaml +6 -0
@@ 0,0 1,6 @@
repos:
  - repo: https://github.com/psf/black
    rev: 20.8b1
    hooks:
      - id: black
        language_version: python3

M README.md => README.md +3 -3
@@ 10,7 10,7 @@ port number).
  well on latest pyzmq packages.

### Development requirements:
* python 2.7 (or 3.8)
* python 3.8
* pyenv 1.2.20 (optional)

## Installation instructions for development:


@@ 24,8 24,8 @@ port number).

- Create a new python virtualenv (optional):

        $ pyenv install 2.7.18
        $ pyenv virtualenv 2.7.18 jackplug
        $ pyenv install 3.8.5
        $ pyenv virtualenv 3.8.5 jackplug
        $ pyenv activate jackplug

- Install python requirements:

M examples/jack.py => examples/jack.py +5 -10
@@ 1,6 1,3 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""
Copyright (c) 2016 BYNE. All rights reserved.



@@ 38,13 35,11 @@ class JackTest(JackBase):
        print("Recv: %s" % message)

    def send(self):
        message = {"event": "message",
                   "data": "ABC"
                   }
        message = {"event": "message", "data": "ABC"}
        JackBase.send(self, message)


if __name__ == '__main__':
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Thou shalt run as: %s [ipc | tcp]" % sys.argv[0])
        sys.exit()


@@ 52,10 47,10 @@ if __name__ == '__main__':
    use_ipc = False
    use_tcp = False

    if sys.argv[1] == 'ipc':
    if sys.argv[1] == "ipc":
        print("Using IPC transport")
        use_ipc = True
    elif sys.argv[1] == 'tcp':
    elif sys.argv[1] == "tcp":
        print("Using TCP transport")
        use_tcp = True
    else:


@@ 71,5 66,5 @@ if __name__ == '__main__':
    elif use_tcp:
        endpoint = TCPEndpoint(address="127.0.0.1", port="1234")

    jack = JackTest(service=ident, endpoint=endpoint)
    jack = JackTest(service=ident.encode(), endpoint=endpoint)
    print("Exiting Jack...")

M examples/plug.py => examples/plug.py +3 -6
@@ 1,6 1,3 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""
Copyright (c) 2016 BYNE. All rights reserved.



@@ 31,7 28,7 @@ class PlugTest(PlugBase):
        self.send(service, message)


if __name__ == '__main__':
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Thou shalt run as: %s [ipc | tcp]" % sys.argv[0])
        sys.exit()


@@ 39,10 36,10 @@ if __name__ == '__main__':
    use_ipc = False
    use_tcp = False

    if sys.argv[1] == 'ipc':
    if sys.argv[1] == "ipc":
        print("Using IPC transport")
        use_ipc = True
    elif sys.argv[1] == 'tcp':
    elif sys.argv[1] == "tcp":
        print("Using TCP transport")
        use_tcp = True
    else:

M jackplug/jack.py => jackplug/jack.py +21 -14
@@ 1,6 1,3 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""
Copyright (c) 2016 BYNE. All rights reserved.



@@ 10,9 7,9 @@ We are calling ZMQ's 'identity' as 'service', to ease the understanding of
services.
"""

import logging
import signal
import uuid

import zmq
from zmq.utils import jsonapi
from zmq.eventloop import ioloop, zmqstream


@@ 20,15 17,14 @@ from zmq.eventloop import ioloop, zmqstream
from .utils import Configuration
from .utils import IPCEndpoint

from simb.pilsner import log as logging

log = logging.getLogger('Service')
log = logging.getLogger("JackPlug")


class JackBase(object):
    """Base Jack class

    This handles low level communication (plus heartbeat), server side"""

    _timeout = None

    def __init__(self, service, endpoint=IPCEndpoint()):


@@ 66,7 62,8 @@ class JackBase(object):
        self._liveness = self._conf.ping_max_liveness

        self._heartbeat_loop = ioloop.PeriodicCallback(
            self.heartbeat, self._conf.ping_interval)
            self.heartbeat, self._conf.ping_interval
        )

        self._heartbeat_loop.start()



@@ 82,9 79,16 @@ class JackBase(object):
        self.socket_stream.close()
        self.socket.close()

    @staticmethod
    def set_logger(logger):
        global log

        log.propagate = False
        log = logger

    def heartbeat(self):
        """Send a ping message to the other endpoint"""
        JackBase.send(self, {"event": "ping", "data": {'id': self._identity}})
        JackBase.send(self, {"event": "ping", "data": {"id": self._identity}})

    def _recv(self, message):
        self.recv(jsonapi.loads(message[0]))


@@ 108,13 112,14 @@ class JackBase(object):
        try:
            self.socket.send_json(message)
        except zmq.Again as e:
            if message.get('event', None) == 'ping':
            if message.get("event", None) == "ping":
                self._liveness = self._liveness - 1

                max_str = ""
                if self._liveness + 1 == self._conf.ping_max_liveness:
                    max_str = " (MAX) | interval: %sms" %\
                            self._conf.ping_interval
                    max_str = (
                        " (MAX) | interval: %sms" % self._conf.ping_interval
                    )

                if self._liveness >= 0:
                    log.info("Jack: liveness: %s%s", self._liveness, max_str)


@@ 134,8 139,10 @@ class JackBase(object):
        """Initialize all service loopers"""
        loop = ioloop.IOLoop.instance()

        signal.signal(signal.SIGINT, lambda sig, frame:
                      loop.add_callback_from_signal(self.close))
        signal.signal(
            signal.SIGINT,
            lambda sig, frame: loop.add_callback_from_signal(self.close),
        )

        try:
            loop.start()

M jackplug/plug.py => jackplug/plug.py +56 -36
@@ 1,6 1,3 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""
Copyright (c) 2016 BYNE. All rights reserved.



@@ 10,9 7,10 @@ We are calling ZMQ's 'identity' as 'service', to ease the understanding of the
services.
"""

import time
import logging
import signal
import threading
import time

import zmq
from zmq.utils import jsonapi


@@ 21,15 19,15 @@ from zmq.eventloop import ioloop, zmqstream
from .utils import Configuration
from .utils import IPCEndpoint

from simb.pilsner import log as logging

log = logging.getLogger('Service')
log = logging.getLogger("JackPlug")


class PlugBase(object):
    """Base Plug class

    This handles low level communication (plus hearbeat), microservice side"""

    _services_ping = dict()
    _timeout_callback = None
    _connection_callback = None


@@ 51,7 49,8 @@ class PlugBase(object):

        self._conf = Configuration.instance()
        self._heartbeat_loop = ioloop.PeriodicCallback(
            self.heartbeat, self._conf.ping_interval)
            self.heartbeat, self._conf.ping_interval
        )

        self._heartbeat_loop.start()



@@ 66,40 65,53 @@ class PlugBase(object):
        except Exception as e:
            log.error("An error occurred while closing socket: %s", e)

    @staticmethod
    def set_logger(logger):
        global log

        log.propagate = False
        log = logger

    def heartbeat(self):
        """Check if known jacks are alive (pinging us)"""
        services = list(self._services_ping.keys())
        for service in services:
            last_ping = self._services_ping[service]['last_ping']
            liveness = self._services_ping[service]['liveness']
            last_ping = self._services_ping[service]["last_ping"]
            liveness = self._services_ping[service]["liveness"]

            now = self._now()
            if now - last_ping > self._conf.ping_interval:
                liveness = liveness - 1
                max_str = ""
                if liveness + 1 == self._conf.ping_max_liveness:
                    max_str = " (MAX) | interval: %sms" %\
                            self._conf.ping_interval
                    max_str = (
                        " (MAX) | interval: %sms" % self._conf.ping_interval
                    )

                log.debug("Plug: Service '%s' liveness: %s%s", service,
                          liveness, max_str)
                log.debug(
                    "Plug: Service '%s' liveness: %s%s",
                    service,
                    liveness,
                    max_str,
                )

                if liveness == 0:
                    if self._services_ping[service]['alive']:
                        log.debug("Plug: Service '%s' seems unavailable now",
                                  service)
                    if self._services_ping[service]["alive"]:
                        log.debug(
                            "Plug: Service '%s' seems unavailable now", service
                        )

                        self._services_ping[service]['last_ping'] = now
                        self._services_ping[service]['liveness'] = liveness
                        self._services_ping[service]['alive'] = False
                        self._services_ping[service]["last_ping"] = now
                        self._services_ping[service]["liveness"] = liveness
                        self._services_ping[service]["alive"] = False

                        if self._timeout_callback:
                            self._timeout_callback(service.decode())
                elif liveness < 0:
                    del self._services_ping[service]
                else:
                    self._services_ping[service]['last_ping'] = now
                    self._services_ping[service]['liveness'] = liveness
                    self._services_ping[service]["last_ping"] = now
                    self._services_ping[service]["liveness"] = liveness

    def _recv(self, message):
        """Receive a message from any jack


@@ 114,28 126,32 @@ class PlugBase(object):
        # setup heartbeat settings for this service (or update it)
        # any message is treated as ping
        service_ping = {
            'last_ping': self._now(),
            'liveness': self._conf.ping_max_liveness,
            "last_ping": self._now(),
            "liveness": self._conf.ping_max_liveness,
        }

        if service in self._services_ping:
            self._services_ping[service].update(service_ping)
        else:
            service_ping['alive'] = False
            service_ping['id'] = -1
            service_ping["alive"] = False
            service_ping["id"] = -1
            self._services_ping[service] = service_ping

        # do not propagate ping messages
        if message.get('event', None) == 'ping':
            identity = message.get('data')['id']
        if message.get("event", None) == "ping":
            identity = message.get("data")["id"]

            if 'alive' in self._services_ping[service] and\
               not self._services_ping[service]['alive']:
                self._services_ping[service]['alive'] = True
            if (
                "alive" in self._services_ping[service]
                and not self._services_ping[service]["alive"]
            ):
                self._services_ping[service]["alive"] = True

            if 'id' in self._services_ping[service] and\
               self._services_ping[service]['id'] != identity:
                self._services_ping[service]['id'] = identity
            if (
                "id" in self._services_ping[service]
                and self._services_ping[service]["id"] != identity
            ):
                self._services_ping[service]["id"] = identity

                if self._connection_callback:
                    self._connection_callback(service.decode(), True)


@@ 171,8 187,10 @@ class PlugBase(object):

        # handle signal if, and only, if we are running on the main thread
        if isinstance(threading.current_thread(), threading._MainThread):
            signal.signal(signal.SIGINT, lambda sig, frame:
                          loop.add_callback_from_signal(self.close))
            signal.signal(
                signal.SIGINT,
                lambda sig, frame: loop.add_callback_from_signal(self.close),
            )

        try:
            loop.start()


@@ 190,7 208,8 @@ class PlugBase(object):
        self.socket_stream.on_recv(self._recv)

        self._heartbeat_loop = ioloop.PeriodicCallback(
            self.heartbeat, self._conf.ping_interval)
            self.heartbeat, self._conf.ping_interval
        )

        self._heartbeat_loop.start()



@@ 220,6 239,7 @@ class PlugBase(object):

class Plug(PlugBase):
    """Plug class, ready for use python object"""

    _listener = None

    def __init__(self, listener=None, **kwargs):

M jackplug/utils.py => jackplug/utils.py +5 -2
@@ 4,6 4,7 @@

class Configuration(object):
    """Configuration class for Jack and Plug"""

    _instance = None

    DEFAULT_IPC_PATHNAME = "/tmp/jack.plug"


@@ 24,11 25,12 @@ class Configuration(object):
        return self._instance


class IPCEndpoint():
class IPCEndpoint:
    """IPCEndpoint

    A IPC transport wrapper
    """

    pathname = ""
    endpoint = ""



@@ 43,11 45,12 @@ class IPCEndpoint():
        self.endpoint = "ipc://%s" % self.pathname


class TCPEndpoint():
class TCPEndpoint:
    """TCPEndpoint

    A TCP transport wrapper
    """

    address = ""
    port = ""
    endpoint = ""

A pyproject.toml => pyproject.toml +3 -0
@@ 0,0 1,3 @@
[tool.black]
line-length = 79
target-version = ['py38']

A requirements.development.txt => requirements.development.txt +5 -0
@@ 0,0 1,5 @@
-r requirements.txt

pylint==2.6.0
black==20.8b1
pre-commit==2.7.1

M setup.py => setup.py +6 -10
@@ 2,15 2,11 @@ from setuptools import setup
from setuptools import find_packages

setup(
    name='JackPlug',
    version='0.1',
    author='Rodrigo Oliveira',
    author_email='rodrigo@byne.com.br',
    name="JackPlug",
    version="0.1",
    author="Rodrigo Oliveira",
    author_email="rodrigo@byne.com.br",
    packages=find_packages(),
    install_requires=[
        'simb.pilsner',
        'pyzmq==19.0.2',
        'tornado==4.5.3'
    ],
    long_description=open('README.md').read(),
    install_requires=["pyzmq==19.0.2", "tornado==4.5.3"],
    long_description=open("README.md").read(),
)