~martijnbraam/pyatem

ba730106fdf058c715b891901c3e4c114ec0174b — Martijn Braam 10 months ago 2955a9d transfer-support
WIP file transfer support

TODO:
- Image format conversion
- Software -> Hardware transfers
- Bug where the atem only retransmits me packets I already have
5 files changed, 349 insertions(+), 20 deletions(-)

M pyatem/command.py
M pyatem/field.py
A pyatem/media.py
M pyatem/protocol.py
M pyatem/transport.py
M pyatem/command.py => pyatem/command.py +86 -0
@@ 1,6 1,8 @@
import colorsys
import struct

from hexdump import hexdump


class Command:
    def get_command(self):


@@ 919,3 921,87 @@ class KeyFillCommand(Command):
    def get_command(self):
        data = struct.pack('>BBH', self.index, self.keyer, self.source)
        return self._make_command('CKeF', data)


class LockCommand(Command):
    """
    Implementation of the `LOCK` command. This requests a new lock, used for data transfers.

    ====== ==== ====== ===========
    Offset Size Type   Description
    ====== ==== ====== ===========
    0      1    u16    Store index
    2      1    bool   Lock state
    3      1    ?      unknown
    ====== ==== ====== ===========

    """

    def __init__(self, store, state):
        """
        :param index: 0-indexed DSK number to trigger
        :param keyer: 0-indexed keyer number
        :param source: Source index for the keyer fill
        """
        self.store = store
        self.state = state

    def get_command(self):
        data = struct.pack('>H?x', self.store, self.state)
        return self._make_command('LOCK', data)


class TransferDownloadRequestCommand(Command):
    """
    Implementation of the `FTSU` command. This requests download from the switcher to the client.

    ====== ==== ====== ===========
    Offset Size Type   Description
    ====== ==== ====== ===========
    0      2    u16    Tranfer id
    2      2    u16    Store
    4      2    u16    Slot
    8      4    u32    unknown
    ====== ==== ====== ===========

    """

    def __init__(self, transfer, store, slot):
        """
        :param transfer: Unique transfer number
        :param store: Store index
        :param slot: Slot index
        """
        self.transfer = transfer
        self.store = store
        self.slot = slot

    def get_command(self):
        data = struct.pack('>HHH2xI', self.transfer, self.store, self.slot, 0x00965340)
        return self._make_command('FTSU', data)


class TransferAckCommand(Command):
    """
    Implementation of the `FTUA` command. This is an acknowledgement for FTDa packets.

    ====== ==== ====== ===========
    Offset Size Type   Description
    ====== ==== ====== ===========
    0      2    u16    Tranfer id
    2      2    u16    Slot
    ====== ==== ====== ===========

    """

    def __init__(self, transfer, slot):
        """
        :param transfer: Unique transfer number
        :param slot: Slot index
        """
        self.transfer = transfer
        self.slot = slot

    def get_command(self):
        data = struct.pack('>HH', self.transfer, self.slot)
        return self._make_command('FTUA', data)

M pyatem/field.py => pyatem/field.py +45 -0
@@ 1345,3 1345,48 @@ class KeyPropertiesBaseField(FieldBase):

    def __repr__(self):
        return '<key-properties-base me={}, key={}, type={}>'.format(self.index, self.keyer, self.type)


class LockObtainedField(FieldBase):
    """
    Data from the `LKOB`. This signals that a datastore lock has been successfully obtained for
    a specific datastore index. Used for data transfers.
    """

    def __init__(self, raw):
        self.raw = raw
        self.store, = struct.unpack('>H2x', raw)

    def __repr__(self):
        return '<lock-obtained store={}>'.format(self.store)


class FileTransferDataField(FieldBase):
    """
    Data from the `FTDa`. This is an incoming chunk of data for a running file transfer.
    """

    def __init__(self, raw):
        self.raw = raw
        self.transfer, self.size = struct.unpack('>HH', raw[0:4])
        self.data = raw[4:(4 + self.size)]

    def __repr__(self):
        return '<file-transfer-data transfer={} size={}>'.format(self.transfer, self.size)


class FileTransferErrorField(FieldBase):
    """
    Data from the `FTDE`. Somehting went wrong with a file transfer.
    """

    def __init__(self, raw):
        self.raw = raw
        self.transfer, self.status = struct.unpack('>HBx', raw)

    def __repr__(self):
        errors = {
            1: 'try-again',
            2: 'not-found',
        }
        return '<file-transfer-error transfer={} status={}>'.format(self.transfer, errors[self.status])

A pyatem/media.py => pyatem/media.py +50 -0
@@ 0,0 1,50 @@
import struct


def atem_to_image(data, width, height):
    data = rle_decode(data)

    with open('test.bin', 'wb') as handle:
        handle.write(data)


def is_rle_header(data, offset):
    if offset >= len(data):
        return True
    return


def rle_decode(data):
    result = bytearray()
    offset = 0
    in_size = len(data)

    while offset < in_size:
        block = data[offset:offset + 8]
        if data[offset] == 0xfe \
                and data[offset + 1] == 0xfe \
                and data[offset + 2] == 0xfe \
                and data[offset + 3] == 0xfe \
                and data[offset + 4] == 0xfe \
                and data[offset + 5] == 0xfe \
                and data[offset + 6] == 0xfe \
                and data[offset + 7] == 0xfe:
            # Got an RLE block
            offset += 8
            count, = struct.unpack_from('>Q', data, offset)
            block = data[offset:offset + 8]
            offset += 8
            block = data[offset:offset + 8]
            result += block * count
            offset += 8
        else:
            # Raw data block
            result += block
            offset += 8
    return result


if __name__ == '__main__':
    with open('download-0-0.bin', 'rb') as handle:
        raw = handle.read()
    atem_to_image(raw, 1920, 1080)

M pyatem/protocol.py => pyatem/protocol.py +104 -10
@@ 11,6 11,14 @@ class AtemProtocol:

        self.mixerstate = {}
        self.callbacks = {}
        self.locks = {}
        self.mode = None
        self.transfer_queue = {}
        self.transfer_id = 42
        self.transfer_buffer = b''
        self.transfer_store = None
        self.transfer_slot = None
        self.transfer_requested = False

    def connect(self):
        logging.debug('Starting connection')


@@ 23,6 31,57 @@ class AtemProtocol:
        for fieldname, data in self.decode_packet(packet.data):
            self.save_field_data(fieldname, data)

    def download(self, store, index):
        if store not in self.transfer_queue:
            self.transfer_queue[store] = []
        self.transfer_queue[store].append(index)
        self._transfer_trigger(store)

    def _transfer_trigger(self, store, retry=False):
        next = None

        # Try the preferred queue
        if store in self.transfer_queue:
            if len(self.transfer_queue[store]) > 0:
                next = (store, self.transfer_queue[store][0])

        # Try any queue
        if next is None:
            for store in self.transfer_queue:
                if len(self.transfer_queue[store]) > 0:
                    next = (store, self.transfer_queue[store][0])
                    break

        # All transfers done, clean locks
        if next is None:
            for lock in self.locks:
                if self.locks[lock]:
                    logging.debug('Releasing lock {}'.format(lock))
                    cmd = LockCommand(lock, False)
                    self.send_commands([cmd])
            return

        # Request a lock if needed
        if next[0] not in self.locks or not self.locks[next[0]]:
            logging.debug('Requesting lock for {}'.format(next[0]))
            cmd = LockCommand(next[0], True)
            self.send_commands([cmd])
            return

        # A transfer request is already running, don't start a new one
        if self.transfer_requested:
            logging.debug('Request already submitted, do nothing')
            return

        # Assign a transfer id and start the transfer
        if not retry:
            self.transfer_id += 1
        self.transfer_store, self.transfer_slot = next
        cmd = TransferDownloadRequestCommand(self.transfer_id, next[0], next[1])
        logging.debug('Requesting download of {}:{}'.format(*next))
        self.transfer_requested = True
        self.send_commands([cmd])

    def on(self, event, callback):
        if event not in self.callbacks:
            self.callbacks[event] = []


@@ 88,6 147,10 @@ class AtemProtocol:
            'FASD': 'fairlight-strip-ding',
            'FAIP': 'fairlight-audio-input',
            'AMIP': 'audio-input',
            'LKOB': 'lock-obtained',
            'FTDa': 'file-transfer-data',
            'LKST': 'lock-state',
            'FTDE': 'file-transfer-error',
        }

        fieldname_to_unique = {


@@ 118,6 181,35 @@ class AtemProtocol:
            if hasattr(fieldmodule, classname):
                contents = getattr(fieldmodule, classname)(contents)

        if key == 'lock-obtained':
            logging.debug('Got lock for {}'.format(contents.store))
            self.locks[contents.store] = True
            self._transfer_trigger(contents.store)
            return
        elif key == 'lock-state':
            logging.debug('Lock state changed')
        elif key == 'file-transfer-data':
            if contents.transfer == self.transfer_id:
                self.transfer_buffer += contents.data
                self.send_commands([TransferAckCommand(self.transfer_id, 0)])
            return
        elif key == 'file-transfer-error':
            if contents.status == 1:
                # Status is try-again
                logging.debug("Retrying transfer")
                self._transfer_trigger(self.transfer_store, retry=True)
                return
        elif key == 'FTDC':
            logging.debug('Transfer complete')
            queue = self.transfer_queue[self.transfer_store]
            self.transfer_queue[self.transfer_store] = queue[1:]
            self._transfer_trigger(self.transfer_store)
            self._raise('download-done', self.transfer_store, self.transfer_slot, self.transfer_buffer)
            return
        elif key == 'video-mode':
            # Store the current video mode for image processing
            self.mode = contents

        if key in fieldname_to_unique:
            idx, = struct.unpack_from(fieldname_to_unique[key], raw, 0)
            if key not in self.mixerstate:


@@ 145,11 237,11 @@ class AtemProtocol:


if __name__ == '__main__':
    from pyatem.command import CutCommand
    from pyatem.command import CutCommand, TransferDownloadRequestCommand, LockCommand, TransferAckCommand

    logging.basicConfig(level=logging.INFO)

    testmixer = AtemProtocol('192.168.2.17')
    testmixer = AtemProtocol('192.168.2.84')

    waiter = 5
    waiting = False


@@ 161,16 253,9 @@ if __name__ == '__main__':
        global waiting
        global done

        if waiting and not done:
            waiter -= 1
            if waiter == 0:
                logging.debug('SENDING CUT')
                done = True
                cmd = CutCommand(index=0)
                testmixer.send_commands([cmd])

        if key == 'InCm':
            waiting = True
            testmixer.download(0, 0)

        if key == 'time':
            return


@@ 180,7 265,16 @@ if __name__ == '__main__':
            print(key)


    def downloaded(store, slot, data):
        filename = 'download-{}-{}.bin'.format(store, slot)
        print("Saving data to {}".format(filename))
        with open(filename, 'wb') as handle:
            handle.write(data)
        exit(0)


    testmixer.on('change', changed)
    testmixer.on('download-done', downloaded)

    testmixer.connect()
    while True:

M pyatem/transport.py => pyatem/transport.py +64 -10
@@ 11,6 11,7 @@ class Packet:
        self.sequence_number = 0
        self.acknowledgement_number = 0
        self.remote_sequence_number = 0
        self.retransmission_number = 0
        self.data = None
        self.debug = False
        self.original = None


@@ 20,7 21,7 @@ class Packet:
    def from_bytes(cls, packet):
        res = cls()
        res.original = packet
        fields = struct.unpack('>HHH 2x HH', packet[0:12])
        fields = struct.unpack('>HHH H HH', packet[0:12])
        res.length = fields[0] & ~(0x1f << 11)
        res.flags = (fields[0] & (0x1f << 11)) >> 11



@@ 31,8 32,9 @@ class Packet:

        res.session = fields[1]
        res.acknowledgement_number = fields[2]
        res.remote_sequence_number = fields[3]
        res.sequence_number = fields[4]
        res.retransmission_number = fields[3]
        res.remote_sequence_number = fields[4]
        res.sequence_number = fields[5]

        res.data = packet[12:]
        return res


@@ 41,10 43,11 @@ class Packet:
        header_len = 12
        data_len = len(self.data) if self.data is not None else 0
        packet_len = header_len + data_len
        result = struct.pack('!HHH 2x HH',
        result = struct.pack('!HHH H HH',
                             packet_len + (self.flags << 11),
                             self.session,
                             self.acknowledgement_number,
                             self.retransmission_number,
                             self.remote_sequence_number,
                             self.sequence_number)



@@ 64,7 67,7 @@ class Packet:
            flags += ' RETRANSMISSION'
        if self.flags & UdpProtocol.FLAG_REQUEST_RETRANSMISSION:
            flags += ' REQ-RETRANSMISSION'
            extra = ' req={}'.format(self.remote_sequence_number)
            extra = ' req={}'.format(self.retransmission_number)
        if self.flags & UdpProtocol.FLAG_ACK:
            flags += ' ACK'
            extra = ' ack={}'.format(self.acknowledgement_number)


@@ 102,12 105,19 @@ class UdpProtocol:
        self.state = UdpProtocol.STATE_CLOSED
        self.session_id = 0x1337

        self.packetlog = {}
        self.reorder_buffer = {}
        self.missing = []

        self.enable_ack = False

    def _send_packet(self, packet):
        packet.session = self.session_id
        if not packet.flags & UdpProtocol.FLAG_ACK:
            packet.sequence_number = (self.local_sequence_number + 1) % 2 ** 16
        if packet.flags & UdpProtocol.FLAG_REQUEST_RETRANSMISSION:
            print(packet)
        self.packetlog[packet.sequence_number] = packet
        raw = packet.to_bytes()
        self.sock.sendto(raw, (self.ip, self.port))
        logging.debug('> {}'.format(packet))


@@ 118,18 128,51 @@ class UdpProtocol:
            self.local_sequence_number = (self.local_sequence_number + 1) % 2 ** 16

    def _receive_packet(self):

        if len(self.missing) == 0 and len(self.reorder_buffer) > 0:
            print("Flusing reorder buffer")
            next_seq = list(sorted(self.reorder_buffer.keys()))[0]
            packet = self.reorder_buffer[next_seq]
            del self.reorder_buffer[next_seq]
            return packet

        data, address = self.sock.recvfrom(2048)
        packet = Packet.from_bytes(data)
        logging.debug('< {}'.format(packet))

        if packet.flags & UdpProtocol.FLAG_REQUEST_RETRANSMISSION:
            pass
            # hexdump(data)
            logging.debug("ATEM requested a retransmission")
            requested = self.packetlog[packet.retransmission_number]
            requested.flags |= UdpProtocol.FLAG_RETRANSMISSION
            self._send_packet(requested)
            return

        if packet.flags & UdpProtocol.FLAG_ACK:
            keys = list(self.packetlog.keys())
            for i in keys:
                if i <= packet.acknowledgement_number:
                    del self.packetlog[i]

        new_sequence_number = packet.sequence_number
        if new_sequence_number > self.remote_sequence_number + 1:

        if packet.flags & UdpProtocol.FLAG_RETRANSMISSION:
            if packet.sequence_number in self.missing:
                self.reorder_buffer[packet.sequence_number] = packet
                self.missing.remove(packet.sequence_number)
                print(self.missing)
            else:
                print("Somehow got {}".format(packet.sequence_number))
        elif new_sequence_number == 0:
            pass
        self.remote_sequence_number = new_sequence_number
        else:
            if new_sequence_number != 0 and new_sequence_number > 14 and new_sequence_number > self.remote_sequence_number + 1:
                num_missing = new_sequence_number - self.remote_sequence_number - 1
                first_missing = self.remote_sequence_number + 1
                last_missing = first_missing + num_missing - 1
                print("Missed packet {}..{}".format(first_missing, last_missing))
                for i in range(first_missing, last_missing):
                    self.missing.append(i)
            self.remote_sequence_number = new_sequence_number

        if self.session_id is None:
            self.session_id = packet.session


@@ 138,10 181,19 @@ class UdpProtocol:
            # This packet needs an ACK
            ack = Packet()
            ack.flags = UdpProtocol.FLAG_ACK
            ack.acknowledgement_number = self.remote_sequence_number
            ack.acknowledgement_number = packet.sequence_number
            ack.remote_sequence_number = 0x61
            print("ACK {}".format(packet.sequence_number))
            self._send_packet(ack)

        if len(self.missing) > 0:
            # Store packets until the missing ones are received
            self.reorder_buffer[packet.sequence_number] = packet
            return

        if len(self.reorder_buffer) > 0:
            return

        return packet

    def _handshake(self, packet):


@@ 185,6 237,8 @@ class UdpProtocol:
    def receive_packet(self):
        while True:
            packet = self._receive_packet()
            if packet is None:
                continue
            if self.state == UdpProtocol.STATE_SYN_SENT:
                # Got response for the first handshake packet
                self._handshake(packet)