~martijnbraam/pts-devctl

a9f4b2905195925d483c7f0866af56d7a5d839ee — Martijn Braam 2 months ago
Initial commit
6 files changed, 597 insertions(+), 0 deletions(-)

A .gitignore
A README.md
A devctl/__init__.py
A devctl/__main__.py
A devctl/enumerator.py
A devctl/util.py
A  => .gitignore +141 -0
@@ 1,141 @@
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

/.idea
\ No newline at end of file

A  => README.md +54 -0
@@ 1,54 @@
# devctl - single-device controller for the test setup

This utility is for directly interfacing with a device hooked up to a testboard without having a web controller in
between. This is for debugging the testfarm or for single-device developer setups.

## Usage

Show the detected test hardware hooked up to the machine
```
$ devctl list
Test devices:
USB port    Serial       Control TTY    Device TTY
----------  -----------  -------------  ------------
1-3         E660D4A0A73  /dev/ttyACM2   /dev/ttyACM3

Disk devices:
USB port    Name             Disk
----------  ---------------  --------
1-8         PinePhone Pro    /dev/sdb
```

This shows one of the Raspberry Pi Pico boards connected to USB port `1-3` and providing ttyACM2 and ttyACM3 for
controlling the device and getting the serial port. It also shows the PinePhone Pro booted into Tow-Boot mass storage
mode on USB port `1-8`. These two USB ports happen to be the tow ports on the left side of my laptop, these unique
port identifiers can be used to specify which devices to use in a more permanent way than the random linux device
assignments

To flash an image and boot into that:

```
$ devctl --picotest 1-3 --diskdevice 1-8 flash example.img
...
```

This will do the following:

* Find the USB devices by the port numbers for this test run
* Hard reset the phone to get into the bootloader
* Launch Tow-Boot mass storage mode
* Wait for a block device to show up from port `1-8` and write example.img to it
* Reset the phone to do a normal boot
* Output all the serial output of the device to stdout, categorized by boot stage.

There are multiple ways to specify the hardware to use. The options are:

```
$ devctl --picotest 1-3 --diskdevice 1-8
$ devctl --controldevice /dev/ttyACM0 --uartdevice /dev/ttyACM1 --diskdevice /dev/sdb
$ devctl --picotest 1-3 --uartdevice /dev/ttyUSB0 --diskdevice 1-8
```

The `--picotest` argument always is used as the base for the information if given. if `--uartdevice` or
`--controldevice` is specified those values are preferred over the picotest argument. All arguments can
take either a path to the device or the usb port identifier.
\ No newline at end of file

A  => devctl/__init__.py +0 -0
A  => devctl/__main__.py +232 -0
@@ 1,232 @@
import argparse
import os.path
import sys
import threading
import time
import subprocess

import colorama
from colorama import Fore
import tabulate
import serial

from devctl.enumerator import find_uart_ports, find_disks, get_device_from_args
from devctl.util import printe, printc


class Flasher(threading.Thread):
    def __init__(self, source, target, control):
        threading.Thread.__init__(self)
        self.source = source
        self.target = target
        self.control = control

    def find_target(self):
        if '-' in self.target:
            disks = find_disks()
            for disk in disks:
                if disk.diskport == self.target:
                    return disk.diskdevice
        elif os.path.exists(self.target):
            return self.target
        return None

    def run(self):
        # Start dd to flash the image to the device
        printe('flasher', f'Write {self.source} to {self.target}')

        while True:
            target = self.find_target()
            if target is not None:
                break
            time.sleep(0.2)

        printe('flasher', f'{target} appeared')

        subprocess.run([
            'sudo', 'dd', f'if={self.source}', f'of={target}',
            'bs=1M', 'oflag=direct,sync', 'status=progress'
        ])

        # Run a filesystem sync to make sure
        printe('flasher', f'Running sync')
        subprocess.run(['sudo', 'sync'])

        # Press the power button
        printe('flasher', f'Flashing complete, shutting down')
        self.control.press_power(True)
        time.sleep(8)
        self.control.press_power(False)
        time.sleep(1)
        printe('flasher', f'Powering on')
        self.control.press_power(True)


class ButtonController:
    def __init__(self, device):
        self.device = device
        self.port = serial.Serial(self.device, 115200)

        # Disable all gpios
        self.port.write(b'pbr')

        self.power_key = False
        self.bootloader_key = False
        self.power = False

    def press_power(self, state):
        self.power_key = state
        self.port.write(b'B' if state else b'b')

    def press_bootloader(self, state):
        self.bootloader_key = state
        self.port.write(b'R' if state else b'r')

    def set_power(self, state):
        self.power = state
        self.port.write(b'P' if state else b'p')


def flash(args, device):
    boot_to_ums = True

    uart = serial.Serial(device.uartdevice, args.baudrate, timeout=5)
    control = ButtonController(device.controldevice)

    time.sleep(1)

    # Hard reset device and launch tow-boot UMS mode.
    printc('Hard reset (15 seconds)', 'sys')
    control.press_power(True)
    time.sleep(15)
    uart.flushInput()
    control.press_power(False)
    printc('Hard reset completed, starting device...', 'sys')
    time.sleep(1)
    control.press_power(True)
    mode = 'unknown'

    while True:
        oldmode = mode
        line = uart.readline()
        if line.startswith(b'U-Boot SPL '):
            if oldmode == 'spl':
                printe("error", "Boot loop detected")
            mode = 'spl'
            if control.power_key:
                control.press_power(False)
                if boot_to_ums:
                    control.press_bootloader(True)
        elif line.startswith(b'Tow-Boot '):
            mode = 'towboot'
        elif mode == 'towboot' and line.startswith(b'Starting kernel'):
            mode = 'kernel'
        elif mode == 'towboot' and line.startswith(b'Allwinner mUSB OTG'):
            mode = 'ums'
            # Don't hold the bootloader key on the next boot to get into the new kernel
            boot_to_ums = False
            control.press_bootloader(False)

            # Start writing the image in a second thread
            thread = Flasher(args.image, args.diskdevice, control)
            thread.daemon = True
            thread.start()
        elif line.startswith(b'[    0.000000] Booting Linux on physical CPU'):
            mode = 'kernel'

        if mode != oldmode:
            if mode == 'spl':
                printe("state", "Reset detected")
            else:
                printe("state", "Moved from [" + Fore.BLUE + oldmode + Fore.RESET + "] to [" +
                       Fore.BLUE + mode + Fore.RESET + "]")

        if mode == 'towboot' and b'to enter the boot menu' in line:
            printe("state", "Boot menu prompt")

        if mode == 'ums' and len(line) < 4:
            # Don't try to print the spinner animation in UMS mode
            continue

        printc(line, mode=mode)


def cmd_list(args):
    testdevices = []
    uartadapters = []

    ports = find_uart_ports()
    for dev in ports:
        if dev.type == 'pico':
            testdevices.append(dev)
        else:
            uartadapters.append(dev)
    diskdevices = find_disks()

    if len(testdevices) > 0:
        print(Fore.BLUE + "Test devices:" + Fore.RESET)
        rows = []
        for dev in testdevices:
            rows.append([dev.controlport, dev.serial, dev.controldevice, dev.uartdevice])
        print(tabulate.tabulate(rows, ['USB port', 'Serial', 'Control TTY', 'Device TTY']))
        print()

    if len(uartadapters) > 0:
        print(Fore.BLUE + "Uart adapters:" + Fore.RESET)
        rows = []
        for dev in uartadapters:
            rows.append([dev.uartport, dev.name, dev.uartdevice])
        print(tabulate.tabulate(rows, ['USB port', 'Name', 'Device TTY']))
        print()

    if len(diskdevices) > 0:
        print(Fore.BLUE + "Disk devices:" + Fore.RESET)
        rows = []
        for dev in diskdevices:
            rows.append([dev.diskport, dev.name, dev.diskdevice])
        print(tabulate.tabulate(rows, ['USB port', 'Name', 'Disk']))


def cmd_flash(args):
    device = get_device_from_args(args)
    if device is None:
        printe('system', 'No device found')
        sys.exit(1)
    if device.controldevice is None:
        printe('system', 'Control device is not set')
        sys.exit(1)
    if device.uartdevice is None:
        printe('system', 'UART device is not set')
        sys.exit(1)
    if args.diskdevice is None:
        printe('system', 'Disk device is not set')
        sys.exit(1)

    flash(args, device)


def main():
    parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
    subparsers = parser.add_subparsers(title='Subcommands',
                                       description='Pick one of the subcommands to run')
    parser.add_argument('--picotest', help='Serial number or port for the pico board')
    parser.add_argument('--uartdevice', help='Path or port for the UART adapter')
    parser.add_argument('--controldevice', help='Path or port for the control UART adapter')
    parser.add_argument('--diskdevice', help='Path or port to the disk to flash to')
    parser.add_argument('--baudrate', help='Baudrate for the uartdevice', default=115200, type=int)

    parser_list = subparsers.add_parser('list', help='List connected hardware')
    parser_list.set_defaults(func=cmd_list)
    parser_flash = subparsers.add_parser('flash', help='Flash image and connect')
    parser_flash.add_argument('image')
    parser_flash.set_defaults(func=cmd_flash)

    args = parser.parse_args()
    colorama.init()

    # Call the function set by the subparser
    args.func(args)


if __name__ == '__main__':
    main()

A  => devctl/enumerator.py +155 -0
@@ 1,155 @@
import os
import glob
import sys

from devctl.util import printe


class Device:
    def __init__(self, t):
        self.type = t
        self.name = None
        self.uartdevice = None
        self.controldevice = None
        self.diskdevice = None
        self.serial = None
        self.controlport = None
        self.uartport = None
        self.diskport = None


def find_uart_ports():
    testdevices = []
    uartadapters = []

    # Find all the USB test jigs
    for device in glob.glob('/sys/bus/usb/devices/*/product'):
        path = device.replace('/product', '')
        port = path.replace('/sys/bus/usb/devices/', '')

        with open(device, 'r') as handle:
            product = handle.read().strip()

        # Check if it's the raspberry pi pico test device
        if product == 'Test Device':
            with open(os.path.join(path, 'serial'), 'r') as handle:
                serial = handle.read().strip()

            result = Device('pico')
            result.serial = serial
            result.controlport = port
            testdevices.append(result)

    # Find all USB devices that load TTY stuff
    for device in glob.glob('/sys/bus/usb/devices/*/tty*'):
        part = device.split('/')
        ttydir = part[-1]
        port_intf = part[-2]
        port, intf = port_intf.split(':', maxsplit=1)

        if ttydir == 'tty':
            subdir = list(glob.glob(os.path.join(device, '*')))[0]
            sdp = subdir.split('/')[-1]
            tty = os.path.join('/dev', sdp)
        else:
            tty = os.path.join('/dev', ttydir)

        for device in testdevices:
            if device.controlport == port:
                # This is part of a test pico, find the interface name
                with open(os.path.join('/sys/bus/usb/devices', port_intf, 'interface')) as handle:
                    interface = handle.read().strip()

                if interface == 'Control':
                    device.controldevice = tty
                else:
                    device.uartdevice = tty
                break
        else:
            result = Device('uart')
            result.uartport = port
            result.uartdevice = tty
            with open(os.path.join('/sys/bus/usb/devices', port, 'product')) as handle:
                result.name = handle.read().strip()
            uartadapters.append(result)

    return testdevices + uartadapters


def find_disks():
    diskdevices = []
    # Find all USB devices that are SCSI storage
    for device in glob.glob('/sys/bus/usb/devices/*/host*'):
        part = device.split('/')
        port_intf = part[-2]
        port, intf = port_intf.split(':', maxsplit=1)

        disks = list(glob.glob(os.path.join(device, 'target*', '*', 'block', '*')))
        for disk in disks:
            blockdev = disk.split('/')[-1]
            result = Device('disk')
            result.diskport = port
            result.diskdevice = os.path.join('/dev', blockdev)

            if os.path.isfile(os.path.join('/sys/bus/usb/devices', port, 'manufacturer')):
                with open(os.path.join('/sys/bus/usb/devices', port, 'manufacturer')) as handle:
                    result.name = handle.read().strip() + " "
            else:
                result.name = ""

            if os.path.isfile(os.path.join('/sys/bus/usb/devices', port, 'product')):
                with open(os.path.join('/sys/bus/usb/devices', port, 'product')) as handle:
                    result.name += handle.read().strip()
            result.name = result.name.strip()
            diskdevices.append(result)
    return diskdevices


def get_device_from_args(args):
    adapters = find_uart_ports()
    device = None
    if args.picotest is not None:
        if '-' in args.picotest:
            for dev in adapters:
                if dev.type == 'pico' and dev.controlport == args.picotest:
                    device = dev
                    break
        else:
            for dev in adapters:
                if dev.type == 'pico' and dev.serial == args.picotest:
                    device = dev
                    break
        if device is None:
            printe('system', 'Could not find Pico device: ' + args.picotest)
            sys.exit(1)

    if args.uartdevice is not None:
        if '-' in args.uartdevice:
            for dev in adapters:
                if dev.type == 'uart' and dev.uartport == args.uartdevice:
                    if device is None:
                        device = dev
                    else:
                        device.uartport = dev.uartport
                        device.uartdevice = dev.uartdevice
        else:
            device.uartdevice = args.uartdevice

    if args.controldevice is not None:
        if '-' in args.controldevice:
            for dev in adapters:
                if dev.type == 'uart' and dev.uartport == args.controldevice:
                    if device is None:
                        device = dev
                    else:
                        device.controlport = dev.uartport
                        device.controldevice = dev.uartdevice
        else:
            device.uartdevice = args.uartdevice

    if args.diskdevice is not None:
        if '-' in args.diskdevice:
            device.diskport = args.diskdevice
        else:
            device.diskdevice = args.diskdevice
    return device

A  => devctl/util.py +15 -0
@@ 1,15 @@
from colorama import Fore


def printc(line, mode):
    if isinstance(line, bytes):
        try:
            line = line.decode()
        except:
            pass

    print(Fore.BLUE + mode.ljust(10) + Fore.RESET + line.rstrip())


def printe(event, line):
    print(Fore.YELLOW + event.ljust(10) + Fore.RESET + line.rstrip())