~mdkcore/jackplug

a70e6ba408e20566050aefa19605f9e75e1933db — Rodrigo Oliveira 3 years ago e17be9c + 9274b7c
Merged in feature/PA-2589-move-jackplug-codebase-to-its-ow (pull request #1)

Feature/PA-2589 Move JackPlug codebase to its own repository

Approved-by: Mathias Hillesheim
M .gitignore => .gitignore +1 -0
@@ 4,3 4,4 @@
JackPlug.egg-info/
build/
dist/
WORK-*

M README.md => README.md +43 -2
@@ 1,6 1,47 @@
# JackPlug
ZMQ based microservice communications library

Two transports are supported: **ipc** (though *IPCEndpoint*, using the following path
as default: */tmp/jack.plug*), and **tcp** (*TCPEndpoint*, using *3559* as the default
port number).

## Requirements
    * simb.pilsner
    * pyzmq
* Tornado version 4.5.3 is currently needed, as newer versions does not play
  well on latest pyzmq packages.

### Development requirements:
* python 2.7 (or 3.8)
* pyenv 1.2.20 (optional)

## Installation instructions for development:
- Install pyenv (optional):
    - macos:

            $ brew install pyenv

    - linux:
        - Install pyenv from your package manager, or follow [these instructions](https://github.com/pyenv/pyenv#basic-github-checkout)

- Create a new python virtualenv (optional):

        $ pyenv install 2.7.18
        $ pyenv virtualenv 2.7.18 jackplug
        $ pyenv activate jackplug

- Install python requirements:

        $ pip install -r requirements.txt

## Examples
An example of the use of this library can be found on the *examples* folder, and
running them is pretty straightforward:

Run *examples/jack.py* and *examples/plug.py*, in different terminals. You
should set the same transport argument on both of them (**ipc** or **tcp**).

```bash
$ python examples/jack.py ipc
```
```bash
$ python examples/plug.py ipc
```

A examples/jack.py => examples/jack.py +75 -0
@@ 0,0 1,75 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""
Copyright (c) 2016 BYNE. All rights reserved.

This example subclasses JackBase to create a new class and send some messages
to the "server".
"""

import sys
from random import randint

from zmq.eventloop import ioloop

from jackplug.jack import JackBase
from jackplug.utils import IPCEndpoint
from jackplug.utils import TCPEndpoint


class JackTest(JackBase):
    __send_loop = None

    def __init__(self, *args, **kwargs):
        super(JackTest, self).__init__(*args, **kwargs)

        print("Starting send loop")
        self._send_loop = ioloop.PeriodicCallback(self.send, 2000)
        self._send_loop.start()

        self.on_timeout(self._timeout_occurred)
        self.start()

    def _timeout_occurred(self):
        print("Timeout occurred!")

    def recv(self, message):
        print("Recv: %s" % message)

    def send(self):
        message = {"event": "message",
                   "data": "ABC"
                   }
        JackBase.send(self, message)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Thou shalt run as: %s [ipc | tcp]" % sys.argv[0])
        sys.exit()

    use_ipc = False
    use_tcp = False

    if sys.argv[1] == 'ipc':
        print("Using IPC transport")
        use_ipc = True
    elif sys.argv[1] == 'tcp':
        print("Using TCP transport")
        use_tcp = True
    else:
        print("Unknown argument: %s" % sys.argv[1])
        sys.exit()

    ident = "Jack%s" % (randint(0, 1000))
    print("Acting as Jack (%s)" % ident)

    endpoint = None
    if use_ipc:
        endpoint = IPCEndpoint(pathname="/tmp/jack.plug.test")
    elif use_tcp:
        endpoint = TCPEndpoint(address="127.0.0.1", port="1234")

    jack = JackTest(service=ident, endpoint=endpoint)
    print("Exiting Jack...")

A examples/plug.py => examples/plug.py +61 -0
@@ 0,0 1,61 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""
Copyright (c) 2016 BYNE. All rights reserved.

This example subclass PlugBase to act as a plug ("server").
"""

import sys
from jackplug.plug import PlugBase
from jackplug.utils import IPCEndpoint
from jackplug.utils import TCPEndpoint


class PlugTest(PlugBase):
    def __init__(self, *args, **kwargs):
        super(PlugTest, self).__init__(*args, **kwargs)

        self.on_timeout(self._timeout_occurred)
        self.start()

    def _timeout_occurred(self, service):
        print("Timeout occurred on service (plug) %s!" % service)

    def recv(self, service, message):

        print("Recv (%s): %s" % (message, service))
        print("Ok, answering :)")

        self.send(service, message)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Thou shalt run as: %s [ipc | tcp]" % sys.argv[0])
        sys.exit()

    use_ipc = False
    use_tcp = False

    if sys.argv[1] == 'ipc':
        print("Using IPC transport")
        use_ipc = True
    elif sys.argv[1] == 'tcp':
        print("Using TCP transport")
        use_tcp = True
    else:
        print("Unknown argument: %s" % sys.argv[1])
        sys.exit()

    print("Acting as Plug")

    endpoint = None
    if use_ipc:
        endpoint = IPCEndpoint(pathname="/tmp/jack.plug.test")
    elif use_tcp:
        endpoint = TCPEndpoint(address="*", port="1234")

    plug = PlugTest(endpoint=endpoint)
    print("Exiting Plug...")

M jackplug/jack.py => jackplug/jack.py +14 -8
@@ 14,9 14,12 @@ import signal
import uuid

import zmq
from zmq.utils import jsonapi
from zmq.eventloop import ioloop, zmqstream

from utils import Configuration
from .utils import Configuration
from .utils import IPCEndpoint

from simb.pilsner import log as logging

log = logging.getLogger('Service')


@@ 25,21 28,21 @@ log = logging.getLogger('Service')
class JackBase(object):
    """Base Jack class

    This handles low level communication, plus heartbeat"""
    This handles low level communication (plus heartbeat), server side"""
    _timeout = None

    # on linux, len(pathname) == 107
    def __init__(self, service, pathname="/tmp/jack.plug"):
    def __init__(self, service, endpoint=IPCEndpoint()):
        """Class contructor

        :param service: name (or identifier) of this jack
        :param pathname: IPC pathname to be used (default: /tmp/jack.plug)
        :param endpoint: IPCEndpoint(pathname) or TCPEndpoint(address, port) to
        connect (default pathname: /tmp/jack.plug, default port: 3559)
        """
        self.context = zmq.Context.instance()

        self.socket = self.context.socket(zmq.DEALER)
        self._identity = str(uuid.uuid4())
        self.socket.identity = service.encode("ascii")
        self.socket.identity = service

        # use with flags=zmq.DONTWAIT on send; also, any new message sent after
        # reaching HWM will be discarded (dealer)


@@ 54,10 57,10 @@ class JackBase(object):
        # do not queue message if connection not completed (zmq level)
        self.socket.setsockopt(zmq.IMMEDIATE, 1)

        self.socket.connect("ipc://" + pathname)
        self.socket.connect(endpoint.endpoint)

        self.socket_stream = zmqstream.ZMQStream(self.socket)
        self.socket_stream.on_recv(self.recv)
        self.socket_stream.on_recv(self._recv)

        self._conf = Configuration.instance()
        self._liveness = self._conf.ping_max_liveness


@@ 83,6 86,9 @@ class JackBase(object):
        """Send a ping message to the other endpoint"""
        JackBase.send(self, {"event": "ping", "data": {'id': self._identity}})

    def _recv(self, message):
        self.recv(jsonapi.loads(message[0]))

    def recv(self, message):
        """Receive a message


M jackplug/plug.py => jackplug/plug.py +13 -10
@@ 18,7 18,9 @@ import zmq
from zmq.utils import jsonapi
from zmq.eventloop import ioloop, zmqstream

from utils import Configuration
from .utils import Configuration
from .utils import IPCEndpoint

from simb.pilsner import log as logging

log = logging.getLogger('Service')


@@ 27,12 29,12 @@ log = logging.getLogger('Service')
class PlugBase(object):
    """Base Plug class

    This handles low level communication, plus hearbeat"""
    This handles low level communication (plus hearbeat), microservice side"""
    _services_ping = dict()
    _timeout_callback = None
    _connection_callback = None

    # on linux, len(pathname) == 107
    def __init__(self, pathname="/tmp/jack.plug"):
    def __init__(self, endpoint=IPCEndpoint()):
        """Class constructor

        :param pathname: IPC pathname to be used (default: /tmp/jack.plug)


@@ 40,7 42,8 @@ class PlugBase(object):
        self.context = zmq.Context.instance()

        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.bind("ipc://" + pathname)

        self.socket.bind(endpoint.endpoint)

        # XXX check zmq.asyncio.Socket with recv_multipart
        self.socket_stream = zmqstream.ZMQStream(self.socket)


@@ 65,7 68,8 @@ class PlugBase(object):

    def heartbeat(self):
        """Check if known jacks are alive (pinging us)"""
        for service in self._services_ping.keys():
        services = list(self._services_ping.keys())
        for service in services:
            last_ping = self._services_ping[service]['last_ping']
            liveness = self._services_ping[service]['liveness']



@@ 90,7 94,7 @@ class PlugBase(object):
                        self._services_ping[service]['alive'] = False

                        if self._timeout_callback:
                            self._timeout_callback(service)
                            self._timeout_callback(service.decode())
                elif liveness < 0:
                    del self._services_ping[service]
                else:


@@ 134,7 138,7 @@ class PlugBase(object):
                self._services_ping[service]['id'] = identity

                if self._connection_callback:
                    self._connection_callback(service, True)
                    self._connection_callback(service.decode(), True)

            return



@@ 159,8 163,7 @@ class PlugBase(object):
        if self.socket.closed:
            return

        self.socket.send_multipart([service.encode("ascii"),
                                    jsonapi.dumps(message)])
        self.socket.send_multipart([service, jsonapi.dumps(message)])

    def start(self):
        """Initialize all plug loopers"""

M jackplug/utils.py => jackplug/utils.py +47 -0
@@ 6,6 6,9 @@ class Configuration(object):
    """Configuration class for Jack and Plug"""
    _instance = None

    DEFAULT_IPC_PATHNAME = "/tmp/jack.plug"
    DEFAULT_TCP_PORT = "3559"

    # XXX read from config file
    ping_interval = 3000.0
    ping_max_liveness = 3


@@ 19,3 22,47 @@ class Configuration(object):
            self._instance = self()

        return self._instance


class IPCEndpoint():
    """IPCEndpoint

    A IPC transport wrapper
    """
    pathname = ""
    endpoint = ""

    # on linux, len(pathname) == 107
    def __init__(self, pathname=Configuration.DEFAULT_IPC_PATHNAME):
        """Class constructor

        :param pathname: IPC pathname to connect (default: /tmp/jack.plug)
        """
        self.pathname = pathname

        self.endpoint = "ipc://%s" % self.pathname


class TCPEndpoint():
    """TCPEndpoint

    A TCP transport wrapper
    """
    address = ""
    port = ""
    endpoint = ""

    def __init__(self, address, port=Configuration.DEFAULT_TCP_PORT):
        """Class constructor

        :param address: TCP IP address or interface to connect. Example:
            "127.0.0.1"
            "eth0"
            "*"
        A wildcard will bind the socket to all available interfaces.
        :param port: TCP PORT number to connect (default: 3559)
        """
        self.address = address
        self.port = port

        self.endpoint = "tcp://%s:%s" % (self.address, self.port)

A requirements.txt => requirements.txt +2 -0
@@ 0,0 1,2 @@
pyzmq==19.0.2
tornado==4.5.3

M setup.py => setup.py +5 -2
@@ 7,7 7,10 @@ setup(
    author='Rodrigo Oliveira',
    author_email='rodrigo@byne.com.br',
    packages=find_packages(),
    install_requires=['simb.pilsner',
                      'pyzmq==15.4.0'],
    install_requires=[
        'simb.pilsner',
        'pyzmq==19.0.2',
        'tornado==4.5.3'
    ],
    long_description=open('README.md').read(),
)