~martijnbraam/pyatem

5769f8cdde3082b1cb3bb1e2fdbc5fd6b2bc22ea — Martijn Braam 2 years ago d4c72ae stills-v2
File transfer cleanup
6 files changed, 125 insertions(+), 39 deletions(-)

D pyatem/colorspace.c
M pyatem/command.py
M pyatem/field.py
M pyatem/media.py
M pyatem/protocol.py
M pyatem/transport.py
D pyatem/colorspace.c => pyatem/colorspace.c +0 -11
@@ 1,11 0,0 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>

static PyObject *
pyatem_colorpsace(PyObject *self, PyObject *args)
{

}

PyMODINIT_FUNC
PyInit_pyatem
\ No newline at end of file

M pyatem/command.py => pyatem/command.py +5 -1
@@ 1926,7 1926,11 @@ class TransferDownloadRequestCommand(Command):
        self.slot = slot

    def get_command(self):
        data = struct.pack('>HHII', self.transfer, self.store, self.slot, 0x00965340)
        u1 = 0
        u2 = 0
        u3 = 0
        u4 = 0xff
        data = struct.pack('>HHI 4B', self.transfer, self.store, self.slot, u1, u2, u3, u4)
        return self._make_command('FTSU', data)



M pyatem/field.py => pyatem/field.py +66 -4
@@ 218,9 218,9 @@ class VideoModeField(FieldBase):
    def get_pixels(self):
        lut = {
            525: 525,
            720: 1280*720,
            1080: 1920*1080,
            2160: 3840*2160,
            720: 1280 * 720,
            1080: 1920 * 1080,
            2160: 3840 * 2160,
        }
        return lut[self.resolution]



@@ 2090,8 2090,18 @@ class LockObtainedField(FieldBase):
    """
    Data from the `LKOB`. This signals that a datastore lock has been successfully obtained for
    a specific datastore index. Used for data transfers.

    ====== ==== ====== ===========
    Offset Size Type   Descriptions
    ====== ==== ====== ===========
    0      2    u16    Store id
    2      2    ?      Unknown
    ====== ==== ====== ===========

    """

    CODE = "LKOB"

    def __init__(self, raw):
        self.raw = raw
        self.store, = struct.unpack('>H2x', raw)


@@ 2103,8 2113,18 @@ class LockObtainedField(FieldBase):
class FileTransferDataField(FieldBase):
    """
    Data from the `FTDa`. This is an incoming chunk of data for a running file transfer.

    ====== ==== ====== ===========
    Offset Size Type   Descriptions
    ====== ==== ====== ===========
    0      2    u16    Transfer id
    2      2    u16    Data length
    ?      ?    bytes  The rest of the packet contains [Data length] bytes of data
    ====== ==== ====== ===========
    """

    CODE = "FTDa"

    def __init__(self, raw):
        self.raw = raw
        self.transfer, self.size = struct.unpack('>HH', raw[0:4])


@@ 2116,9 2136,27 @@ class FileTransferDataField(FieldBase):

class FileTransferErrorField(FieldBase):
    """
    Data from the `FTDE`. Somehting went wrong with a file transfer.
    Data from the `FTDE`. Something went wrong with a file transfer and it has been aborted.

    ====== ==== ====== ===========
    Offset Size Type   Descriptions
    ====== ==== ====== ===========
    0      2    u16    Transfer id
    2      1    u8     Error code
    3      1    ?      unknown
    ====== ==== ====== ===========

    ========== ===========
    Error code Description
    ========== ===========
    1          try-again, Try the transfer again
    2          not-found, The requested store/slot index doesn't contain data
    ========== ===========

    """

    CODE = "FTDE"

    def __init__(self, raw):
        self.raw = raw
        self.transfer, self.status = struct.unpack('>HBx', raw)


@@ 2129,3 2167,27 @@ class FileTransferErrorField(FieldBase):
            2: 'not-found',
        }
        return '<file-transfer-error transfer={} status={}>'.format(self.transfer, errors[self.status])


class FileTransferDataCompleteField(FieldBase):
    """
    Data from the `FTDC`. Sent after pushing a file.

    ====== ==== ====== ===========
    Offset Size Type   Descriptions
    ====== ==== ====== ===========
    0      2    u16    Transfer id
    2      1    u8     ? (always 1)
    3      1    u8     ? (always 2 or 6)
    ====== ==== ====== ===========

    """

    CODE = "FTDC"

    def __init__(self, raw):
        self.raw = raw
        self.transfer, self.u1, self.u2 = struct.unpack('>HBB', raw)

    def __repr__(self):
        return '<file-transfer-complete transfer={} u1={} u2={}>'.format(self.transfer, self.u1, self.u2)

M pyatem/media.py => pyatem/media.py +11 -9
@@ 3,18 3,21 @@ import struct

def atem_to_image(data, width, height):
    data = rle_decode(data)
    return data

    with open('/workspace/usb-0-0.data', 'wb') as handle:
        handle.write(data)

def rle_decode(data):
    """
    ATEM frames are compressed with a custom RLE encoding. Data in the frame is grouped in 8 byte chunks since
    that is exactly 2 pixels in the 10-bit YCbCr 4:2:2 data. Most of the data is sent without compression but
    if a 8 byte chunk is fully 0xfe then the following chunk is RLE compressed.

def is_rle_header(data, offset):
    if offset >= len(data):
        return True
    return

    An RLE compressed part is an 64 bit integer setting the repeat count following the 8-byte block of data
    to be repeated. This seems mainly useful to compress solid colors.

def rle_decode(data):
    :param data:
    :return:
    """
    result = bytearray()
    offset = 0
    in_size = len(data)


@@ 32,7 35,6 @@ def rle_decode(data):
            # Got an RLE block
            offset += 8
            count, = struct.unpack_from('>Q', data, offset)
            block = data[offset:offset + 8]
            offset += 8
            block = data[offset:offset + 8]
            result += block * count

M pyatem/protocol.py => pyatem/protocol.py +30 -14
@@ 1,6 1,7 @@
import logging
import struct

from pyatem.media import atem_to_image, rle_decode
from pyatem.transport import UdpProtocol, Packet, UsbProtocol, TcpProtocol
import pyatem.field as fieldmodule



@@ 28,6 29,7 @@ class AtemProtocol:
        self.transfer_queue = {}
        self.transfer_id = 42
        self.transfer_buffer = b''
        self.transfer_buffer2 = []
        self.transfer_store = None
        self.transfer_slot = None
        self.transfer_requested = False


@@ 148,7 150,8 @@ class AtemProtocol:
            'LKOB': 'lock-obtained',
            'FTDa': 'file-transfer-data',
            'LKST': 'lock-state',
            'FTDE': 'file-transfer-error'
            'FTDE': 'file-transfer-error',
            'FTDC': 'file-transfer-data-complete',
        }

        fieldname_to_unique = {


@@ 204,12 207,10 @@ class AtemProtocol:
            self._transfer_trigger(contents.store)
            return
        elif key == 'lock-state':
            logging.info('lock state changed')
            logging.debug('lock state changed')
        elif key == 'file-transfer-data':
            print(contents)
            if contents.transfer == self.transfer_id:
                self.transfer_packets += 1
                print("packet", self.transfer_packets)
                self.transfer_buffer += contents.data
                total_size = self.mixerstate['video-mode'].get_pixels() * 4
                transfer_progress = len(self.transfer_buffer) / total_size


@@ 217,23 218,34 @@ class AtemProtocol:
                # The 0 should be the transfer slot, but it seems it's always 0 in practice
                self.send_commands([TransferAckCommand(self.transfer_id, 0)])
            else:
                print("Hmm")
                logging.error('Got file transfer data for wrong transfer id')
            return
        elif key == 'file-transfer-error':
            self.transfer_requested = False
            if contents.status == 1:
                # Status is try-again
                logging.debug('Retrying transfer')
                self._transfer_trigger(self.transfer_store, retry=True)
                return
        elif key == 'FTDC':
        elif key == 'file-transfer-data-complete':
            logging.debug('Transfer complete')
            # Remove current item from the transfer queue
            queue = self.transfer_queue[self.transfer_store]
            self.transfer_queue[self.transfer_store] = queue[1:]

            # Assemble the buffer
            data = self.transfer_buffer
            self.transfer_buffer = b''
            self.transfer_id += 1
            self._transfer_trigger(self.transfer_store)
            self.transfer_requested = False

            # Decompress the buffer if needed
            if self.transfer_store == 0:
                data = rle_decode(data)

            self._raise('download-done', self.transfer_store, self.transfer_slot, data)

            # Start next transfer in the queue
            self._transfer_trigger(self.transfer_store)
            return

        if key in fieldname_to_unique:


@@ 352,10 364,8 @@ if __name__ == '__main__':

    logging.basicConfig(level=logging.INFO)

    # testmixer = AtemProtocol('192.168.2.84')

    testmixer = AtemProtocol(usb='auto')

    testmixer = AtemProtocol('192.168.2.84')
    # testmixer = AtemProtocol(usb='auto')

    def changed(key, contents):
        if key == 'time':


@@ 367,11 377,17 @@ if __name__ == '__main__':


    def connected():
        testmixer.download(0, 0)
        for sid in testmixer.mixerstate['mediaplayer-file-info']:
            still = testmixer.mixerstate['mediaplayer-file-info'][sid]
            if not still.is_used:
                continue
            print("Fetching {}".format(still.name))
            testmixer.download(0, still.index)


    def downloaded(store, slot, data):
        print(f"Downloaded {store}-{slot}")
        logging.info('Downloaded {}:{}'.format(store, slot))

        with open(f'/workspace/usb-{store}-{slot}.bin', 'wb') as handle:
            handle.write(data)


M pyatem/transport.py => pyatem/transport.py +13 -0
@@ 3,6 3,7 @@ import struct
import logging
import time
from queue import Queue, Empty
import collections
from urllib.parse import urlparse

import usb.core


@@ 108,6 109,7 @@ class UdpProtocol:

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.settimeout(5)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024 * 16)

        self.local_sequence_number = 0
        self.local_ack_number = 0


@@ 120,6 122,8 @@ class UdpProtocol:
        self.enable_ack = False
        self.had_traffic = False

        self.received_packets = collections.deque(maxlen=128)

    def _send_packet(self, packet):
        packet.session = self.session_id
        if not packet.flags & UdpProtocol.FLAG_ACK:


@@ 156,6 160,13 @@ class UdpProtocol:
        if self.session_id is None:
            self.session_id = packet.session

        is_retransmissions = packet.flags & UdpProtocol.FLAG_RETRANSMISSION
        if is_retransmissions:
            if self.remote_sequence_number in self.received_packets:
                return True

        self.received_packets.append(self.remote_sequence_number)

        if packet.flags & UdpProtocol.FLAG_RELIABLE and self.enable_ack:
            # This packet needs an ACK
            ack = Packet()


@@ 215,6 226,8 @@ class UdpProtocol:
    def receive_packet(self):
        while True:
            packet = self._receive_packet()
            if packet is True:
                continue
            if packet is None and not self.had_traffic:
                continue
            if packet is None and self.state == UdpProtocol.STATE_SYN_SENT: