~mdkcore/jackplug

569f8b81d06d60f5999c33d3ecdc01e3a05c6979 — Rodrigo Oliveira 3 years ago 2b8b8a4
Blacken JackPlug codebase
6 files changed, 76 insertions(+), 57 deletions(-)

M examples/jack.py
M examples/plug.py
M jackplug/jack.py
M jackplug/plug.py
M jackplug/utils.py
M setup.py
M examples/jack.py => examples/jack.py +4 -6
@@ 35,13 35,11 @@ class JackTest(JackBase):
        print("Recv: %s" % message)

    def send(self):
        message = {"event": "message",
                   "data": "ABC"
                   }
        message = {"event": "message", "data": "ABC"}
        JackBase.send(self, message)


if __name__ == '__main__':
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Thou shalt run as: %s [ipc | tcp]" % sys.argv[0])
        sys.exit()


@@ 49,10 47,10 @@ if __name__ == '__main__':
    use_ipc = False
    use_tcp = False

    if sys.argv[1] == 'ipc':
    if sys.argv[1] == "ipc":
        print("Using IPC transport")
        use_ipc = True
    elif sys.argv[1] == 'tcp':
    elif sys.argv[1] == "tcp":
        print("Using TCP transport")
        use_tcp = True
    else:

M examples/plug.py => examples/plug.py +3 -3
@@ 28,7 28,7 @@ class PlugTest(PlugBase):
        self.send(service, message)


if __name__ == '__main__':
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Thou shalt run as: %s [ipc | tcp]" % sys.argv[0])
        sys.exit()


@@ 36,10 36,10 @@ if __name__ == '__main__':
    use_ipc = False
    use_tcp = False

    if sys.argv[1] == 'ipc':
    if sys.argv[1] == "ipc":
        print("Using IPC transport")
        use_ipc = True
    elif sys.argv[1] == 'tcp':
    elif sys.argv[1] == "tcp":
        print("Using TCP transport")
        use_tcp = True
    else:

M jackplug/jack.py => jackplug/jack.py +12 -7
@@ 24,6 24,7 @@ class JackBase(object):
    """Base Jack class

    This handles low level communication (plus heartbeat), server side"""

    _timeout = None

    def __init__(self, service, endpoint=IPCEndpoint()):


@@ 61,7 62,8 @@ class JackBase(object):
        self._liveness = self._conf.ping_max_liveness

        self._heartbeat_loop = ioloop.PeriodicCallback(
            self.heartbeat, self._conf.ping_interval)
            self.heartbeat, self._conf.ping_interval
        )

        self._heartbeat_loop.start()



@@ 86,7 88,7 @@ class JackBase(object):

    def heartbeat(self):
        """Send a ping message to the other endpoint"""
        JackBase.send(self, {"event": "ping", "data": {'id': self._identity}})
        JackBase.send(self, {"event": "ping", "data": {"id": self._identity}})

    def _recv(self, message):
        self.recv(jsonapi.loads(message[0]))


@@ 110,13 112,14 @@ class JackBase(object):
        try:
            self.socket.send_json(message)
        except zmq.Again as e:
            if message.get('event', None) == 'ping':
            if message.get("event", None) == "ping":
                self._liveness = self._liveness - 1

                max_str = ""
                if self._liveness + 1 == self._conf.ping_max_liveness:
                    max_str = " (MAX) | interval: %sms" %\
                            self._conf.ping_interval
                    max_str = (
                        " (MAX) | interval: %sms" % self._conf.ping_interval
                    )

                if self._liveness >= 0:
                    log.info("Jack: liveness: %s%s", self._liveness, max_str)


@@ 136,8 139,10 @@ class JackBase(object):
        """Initialize all service loopers"""
        loop = ioloop.IOLoop.instance()

        signal.signal(signal.SIGINT, lambda sig, frame:
                      loop.add_callback_from_signal(self.close))
        signal.signal(
            signal.SIGINT,
            lambda sig, frame: loop.add_callback_from_signal(self.close),
        )

        try:
            loop.start()

M jackplug/plug.py => jackplug/plug.py +46 -30
@@ 27,6 27,7 @@ class PlugBase(object):
    """Base Plug class

    This handles low level communication (plus hearbeat), microservice side"""

    _services_ping = dict()
    _timeout_callback = None
    _connection_callback = None


@@ 48,7 49,8 @@ class PlugBase(object):

        self._conf = Configuration.instance()
        self._heartbeat_loop = ioloop.PeriodicCallback(
            self.heartbeat, self._conf.ping_interval)
            self.heartbeat, self._conf.ping_interval
        )

        self._heartbeat_loop.start()



@@ 74,36 76,42 @@ class PlugBase(object):
        """Check if known jacks are alive (pinging us)"""
        services = list(self._services_ping.keys())
        for service in services:
            last_ping = self._services_ping[service]['last_ping']
            liveness = self._services_ping[service]['liveness']
            last_ping = self._services_ping[service]["last_ping"]
            liveness = self._services_ping[service]["liveness"]

            now = self._now()
            if now - last_ping > self._conf.ping_interval:
                liveness = liveness - 1
                max_str = ""
                if liveness + 1 == self._conf.ping_max_liveness:
                    max_str = " (MAX) | interval: %sms" %\
                            self._conf.ping_interval
                    max_str = (
                        " (MAX) | interval: %sms" % self._conf.ping_interval
                    )

                log.debug("Plug: Service '%s' liveness: %s%s", service,
                          liveness, max_str)
                log.debug(
                    "Plug: Service '%s' liveness: %s%s",
                    service,
                    liveness,
                    max_str,
                )

                if liveness == 0:
                    if self._services_ping[service]['alive']:
                        log.debug("Plug: Service '%s' seems unavailable now",
                                  service)
                    if self._services_ping[service]["alive"]:
                        log.debug(
                            "Plug: Service '%s' seems unavailable now", service
                        )

                        self._services_ping[service]['last_ping'] = now
                        self._services_ping[service]['liveness'] = liveness
                        self._services_ping[service]['alive'] = False
                        self._services_ping[service]["last_ping"] = now
                        self._services_ping[service]["liveness"] = liveness
                        self._services_ping[service]["alive"] = False

                        if self._timeout_callback:
                            self._timeout_callback(service.decode())
                elif liveness < 0:
                    del self._services_ping[service]
                else:
                    self._services_ping[service]['last_ping'] = now
                    self._services_ping[service]['liveness'] = liveness
                    self._services_ping[service]["last_ping"] = now
                    self._services_ping[service]["liveness"] = liveness

    def _recv(self, message):
        """Receive a message from any jack


@@ 118,28 126,32 @@ class PlugBase(object):
        # setup heartbeat settings for this service (or update it)
        # any message is treated as ping
        service_ping = {
            'last_ping': self._now(),
            'liveness': self._conf.ping_max_liveness,
            "last_ping": self._now(),
            "liveness": self._conf.ping_max_liveness,
        }

        if service in self._services_ping:
            self._services_ping[service].update(service_ping)
        else:
            service_ping['alive'] = False
            service_ping['id'] = -1
            service_ping["alive"] = False
            service_ping["id"] = -1
            self._services_ping[service] = service_ping

        # do not propagate ping messages
        if message.get('event', None) == 'ping':
            identity = message.get('data')['id']
        if message.get("event", None) == "ping":
            identity = message.get("data")["id"]

            if 'alive' in self._services_ping[service] and\
               not self._services_ping[service]['alive']:
                self._services_ping[service]['alive'] = True
            if (
                "alive" in self._services_ping[service]
                and not self._services_ping[service]["alive"]
            ):
                self._services_ping[service]["alive"] = True

            if 'id' in self._services_ping[service] and\
               self._services_ping[service]['id'] != identity:
                self._services_ping[service]['id'] = identity
            if (
                "id" in self._services_ping[service]
                and self._services_ping[service]["id"] != identity
            ):
                self._services_ping[service]["id"] = identity

                if self._connection_callback:
                    self._connection_callback(service.decode(), True)


@@ 175,8 187,10 @@ class PlugBase(object):

        # handle signal if, and only, if we are running on the main thread
        if isinstance(threading.current_thread(), threading._MainThread):
            signal.signal(signal.SIGINT, lambda sig, frame:
                          loop.add_callback_from_signal(self.close))
            signal.signal(
                signal.SIGINT,
                lambda sig, frame: loop.add_callback_from_signal(self.close),
            )

        try:
            loop.start()


@@ 194,7 208,8 @@ class PlugBase(object):
        self.socket_stream.on_recv(self._recv)

        self._heartbeat_loop = ioloop.PeriodicCallback(
            self.heartbeat, self._conf.ping_interval)
            self.heartbeat, self._conf.ping_interval
        )

        self._heartbeat_loop.start()



@@ 224,6 239,7 @@ class PlugBase(object):

class Plug(PlugBase):
    """Plug class, ready for use python object"""

    _listener = None

    def __init__(self, listener=None, **kwargs):

M jackplug/utils.py => jackplug/utils.py +5 -2
@@ 4,6 4,7 @@

class Configuration(object):
    """Configuration class for Jack and Plug"""

    _instance = None

    DEFAULT_IPC_PATHNAME = "/tmp/jack.plug"


@@ 24,11 25,12 @@ class Configuration(object):
        return self._instance


class IPCEndpoint():
class IPCEndpoint:
    """IPCEndpoint

    A IPC transport wrapper
    """

    pathname = ""
    endpoint = ""



@@ 43,11 45,12 @@ class IPCEndpoint():
        self.endpoint = "ipc://%s" % self.pathname


class TCPEndpoint():
class TCPEndpoint:
    """TCPEndpoint

    A TCP transport wrapper
    """

    address = ""
    port = ""
    endpoint = ""

M setup.py => setup.py +6 -9
@@ 2,14 2,11 @@ from setuptools import setup
from setuptools import find_packages

setup(
    name='JackPlug',
    version='0.1',
    author='Rodrigo Oliveira',
    author_email='rodrigo@byne.com.br',
    name="JackPlug",
    version="0.1",
    author="Rodrigo Oliveira",
    author_email="rodrigo@byne.com.br",
    packages=find_packages(),
    install_requires=[
        'pyzmq==19.0.2',
        'tornado==4.5.3'
    ],
    long_description=open('README.md').read(),
    install_requires=["pyzmq==19.0.2", "tornado==4.5.3"],
    long_description=open("README.md").read(),
)