~hww3/org.openhab.binding.appletv

4c10a68a87284dc4c8d9cf1a739387b85326dfc1 — H. William Welliver III 1 year, 10 months ago 6378faa
Rework external pyatv process handling. Close all streams from pipe to pyatv, should fix a major fd leak.

Also, update README and add start of command queue
M README.md => README.md +14 -6
@@ 22,6 22,9 @@ New features:
* Artwork channel (read-only) added to provide artwork Image data
* App Name channel (read-only) added to display name of app playing (which is not necessarily the app currently on screen)
* App Id channel (read-write) added to display the bundle identifier of the app playing. Can be written to in order to request a given application be launched. There seem to be certain constraints on this, it /seems/ that the play state must be stopped in order for the new application to be launched. This is likely an AppleTV side limitation.
* Support for HomePods and other devices
* Binding now understands which protocols must be paired and will indicate this during thing configuration.
* Binding has very primitive support for using HomePods and AppleTV devices as audio sinks. This is preliminary and has a number of restrictions

Installing:



@@ 87,19 90,24 @@ atvremote pair -i C869CD8002EA --protocol companion

Once you've paired and entered the authentication strings into your thing configuration, the binding should connect and start receiving updates automatically. The binding currently outputs a good deal of information about the process, so if things don't seem to be working, you should check your openhab log file. 

UPDATES:

* Added a number of new remote keys, such as Home, Home Hold and Skip Forward/Backward
* Shuffle mode, Repeat mode and Position are wired up for setting
* App Name and App ID channels are now updated properly

TODO:

* Better error handling when pyATV has a problem
* Need to control restarting of pyATV tools so that if the tool fails to start repeatedly, we back off.
* Need to control restarting of pyATV tools so that if the tool fails to start repeatedly, we back off. (in progress)
* Ought to queue commands so that they aren't lost if the pyATV connection is temporarily down.
* Ought to set thing to offline if our connections to pyATV are not up.
* Out to figure out why killing a pyATV subprocess causes openhab to restart.
* Need to permit location of pyATV tools to be specified
* Need to permit location of pyATV tools to be specified (python venv?)
* Should detect available features (in progress)
* Need to submit pyATV enhancements upstream
* Broader support for more ATV features: sending streams to airplay, text to speech, etc.
* Easier installation
* Broader support for more ATV features: sending streams to airplay, text to speech, etc. (in progress)
* Easier installation (perhaps automate the python setup process?)
* Easier pairing (maybe?)
* Support for HomePods and other devices (maybe?)

----
Notes from previous version of the binding:

M src/main/java/org/openhab/binding/appletv/internal/AppleTVHandler.java => src/main/java/org/openhab/binding/appletv/internal/AppleTVHandler.java +6 -2
@@ 221,7 221,7 @@ public class AppleTVHandler extends BaseThingHandler {
    private void verifyPairing() {
        String[] args = makeArgs("atvremote", "mac");

        String result = pyATV.executeCommandLineAndWaitResponse(Duration.ofSeconds(7), args);
        String result = pyATV.executeCommandLineAndWaitResponse(Duration.ofSeconds(15), args);

        if (result == null || !result.trim().equalsIgnoreCase(getThing().getProperties().get(PROPERTY_MAC_ADDRESS))) {
            logger.info("Pairing failed: " + result);


@@ 350,13 350,16 @@ public class AppleTVHandler extends BaseThingHandler {
                    if ("complete".equals(jsonObject.get("state").getAsString())) {
                        airPlaying = false;
                        logger.info("Setting airPlaying to false");
                        if (true == true)
                            return true;
                        if (streamingFiles.size() > 1)
                            logger.warn("Have more than one streaming file in flight. We shouldn't really get here.");
                        for (Path path : streamingFiles) {
                            try {
                                logger.warn("Deleting " + path);
                                Files.deleteIfExists(path);
                            } catch (IOException e) {
                            } catch (Exception e) {
                                // TODO this may have undesirable effects on the scanner
                                logger.warn("Unable to remove streamed file path " + path.toString());
                                throw new RuntimeException(e);
                            }


@@ 914,6 917,7 @@ public class AppleTVHandler extends BaseThingHandler {
    public void stopPlaying(OnOffType on) {
        // if we're playing a url, stopping the command loop should cause this to stop playing.
        // perhaps it would be best to start a separate process just for the player.
        logger.error("STOP PLAYING CALLED");
        commandScanner.start();
    }
}

M src/main/java/org/openhab/binding/appletv/internal/PyATV.java => src/main/java/org/openhab/binding/appletv/internal/PyATV.java +252 -84
@@ 24,10 24,7 @@ import org.openhab.binding.appletv.internal.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonStreamParser;
import com.google.gson.*;

/**
 * The {@link PyATV} wraps the PyATV library


@@ 36,7 33,7 @@ import com.google.gson.JsonStreamParser;
 */
public class PyATV {

    private final Logger logger = LoggerFactory.getLogger(PyATV.class);
    private static final Logger logger = LoggerFactory.getLogger(PyATV.class);

    private ExecutorService executor;



@@ 146,7 143,7 @@ public class PyATV {

    <T> T exec(Class<T> returnType, String... args) {
        Gson g = getGson();
        String res = executeCommandLineAndWaitResponse(Duration.ofSeconds(10), args);
        String res = executeCommandLineAndWaitResponse(Duration.ofSeconds(15), args);
        if (res == null)
            throw new RuntimeException("Error executing " + Arrays.toString(args));
        return g.fromJson(res, returnType);


@@ 206,21 203,30 @@ public class PyATV {
        Future<String> outputFuture = null;
        cleanup: try {
            Process process = processTemp = new ProcessBuilder(commandLine).redirectErrorStream(true).start();

            logger.info("Started process " + process.pid());
            outputFuture = executor.submit(() -> {
                logger.info("Running " + Arrays.toString(commandLine));
                try (InputStream inputStream = process.getInputStream();
                        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
                    StringWriter output = new StringWriter();
                InputStream inputStream;
                StringWriter output;
                try {
                    inputStream = process.getInputStream();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
                    output = new StringWriter();
                    reader.transferTo(output);
                    return output.toString();
                } catch (IOException e) {
                    logger.warn("error reading command output", e);
                    closeStreams(process);
                    throw new RuntimeException(e);
                }
                closeStreams(process);
                return output.toString();
            });

            if (timeout == null) {
                process.waitFor();
            } else if (!process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                logger.warn("Timeout occurred when executing commandLine '{}'", Arrays.toString(commandLine));
                logger.warn("Timeout occurred when executing commandLine '{}', timeout={}",
                        Arrays.toString(commandLine), timeout.toMillis());
                break cleanup;
            }
            return outputFuture.get();


@@ 241,7 247,9 @@ public class PyATV {
            }
        }
        if (processTemp != null && processTemp.isAlive()) {
            logger.info("forcibly destroying " + processTemp.pid());
            processTemp.destroyForcibly();
            closeStreams(processTemp);
        }
        if (outputFuture != null) {
            outputFuture.cancel(true);


@@ 254,13 262,16 @@ public class PyATV {

        String name;
        Process process;
        Thread thread;
        Thread readThread;
        Thread writeThread;

        Function<JsonElement, Boolean> func;
        String[] args;

        // Queue<String> commandQueue = new ConcurrentLinkedQueue();

        boolean isStarted = false;

        private boolean keepRunning = true;

        public Scanner(String name, Function<JsonElement, Boolean> func, String[] args) {


@@ 269,6 280,12 @@ public class PyATV {
            this.func = func;
        }

        @Override
        public String toString() {
            return "Scanner{" + "name='" + name + '\'' + ", process=" + process + ", isStarted=" + isStarted
                    + ", expectedCommandLine='" + expectedCommandLine + '\'' + '}';
        }

        public boolean isRunning() {
            return (process != null && process.isAlive());
        }


@@ 277,51 294,46 @@ public class PyATV {
            sendCommand(s, true);
        }

        BlockingQueue<String> commandQueue = new LinkedBlockingDeque<>();

        public void sendCommand(String s, boolean canRetry) {
            s = s.trim();
            logger.info("sending command: " + s);
            // TODO
            // this is a pretty terrible approach, we should use condition variables or switch to having commands
            // read from a queue. Or something...
            if (process == null) {
                logger.info(name + ": Process is not running, waiting before sending.");
                try {
                    Thread.sleep(600);
                } catch (InterruptedException e) {
                    logger.info("Thread interrupted.", e);
                }
            }
            OutputStream os = new BufferedOutputStream(process.getOutputStream());
            try {
                os.write((s + "\n\n").getBytes(StandardCharsets.UTF_8));
                os.flush();
            } catch (Exception e) {
                logger.info(name + ": Got exception sending command", e);
                if (process == null || !process.isAlive()) {
                    logger.info(name + ": Restarting process.");
                    stop();
                    start();
                    if (canRetry)
                        sendCommand(s, false); // don't get stuck in a retry loop
                }
            if (commandQueue.size() > 5) {
                logger.error("commandQueue is getting too full. dropping this command.");
            } else {
                logger.info("queueing command: " + s);
                commandQueue.add(s);
            }
        }

        public void start() {
            if (thread != null && thread.isAlive())
            logger.info(name + ": start()");

            if ((readThread != null && readThread.isAlive()) || (writeThread != null && writeThread.isAlive()))
                stop();

            thread = new Thread(this::runThread, name);
            thread.start();
            logger.info(name + ": starting handler threads");

            isStarted = true;
            keepRunning = true;
            readThread = new Thread(this::runReaderThread, name);
            readThread.start();
            writeThread = new Thread(this::runWriterThread, name);
            writeThread.start();
        }

        void runThread() {
        void runReaderThread() {
            while (keepRunning == true) {
                logger.info(name + ": Scanner starting");
                executeCommandLineAndParseJson(func, args);
                logger.info(name + ": Scanner exited");
                if (!keepRunning)
                    return;
                try {
                    executeCommandLineAndParseJson(func, args);
                } catch (Throwable th) {
                    logger.error("Throwable from json response parser.", th);
                } finally {
                    closeStreams(process);
                }
                logger.info(name + ": Scanner pid=" + process.pid() + " exited.");

                // TODO should implement backoff in event of repeated failures.
                try {
                    Thread.sleep(500); // half a second


@@ 329,53 341,187 @@ public class PyATV {
                    logger.warn(name + ": Error thrown while sleeping", e);
                }
            }

            logger.info(name + ": runReaderThread exiting");
        }

        void runWriterThread() {

            try {
                while (keepRunning == true) {
                    logger.info(name + ": Commander starting");
                    executeCommand();
                    logger.info(name + ": Commander pid=" + process.pid() + " exited.");
                    if (!keepRunning) {
                        logger.info(name + ": Commander KeepRunning is false, so returning.");
                        return;
                    }
                    // TODO should implement backoff in event of repeated failures.
                    try {

                        Thread.sleep(500); // half a second
                    } catch (InterruptedException e) {
                        logger.warn(name + ": Commander Error thrown while sleeping", e);
                    }
                }
            } catch (Throwable th) {
                logger.error(name + ": Exception thrown in runWriterThread", th);
            } finally {
                closeStreams(process);
            }
        }

        OutputStream os;

        void executeCommand() {
            while (keepRunning) {
                logger.info(name + " polling for command");
                if (process == null || !process.isAlive()) {
                    logger.info("aieee");
                    if (process == null)
                        logger.warn(name + " process not running. waiting 500ms. " + this.toString());
                    else if (!process.isAlive())
                        logger.warn(
                                name + " process " + process.pid() + " not alive. waiting 500ms. " + this.toString());

                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        logger.info("Thread interrupted.", e);
                    }
                }
                String s = null;
                logger.info(name + " polling commandQueue");
                try {
                    s = commandQueue.poll(10, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    logger.info(name + ": Attempt to read command queue was interrupted.");
                    if (!keepRunning) {
                        logger.info(name + ": exiting command poller");
                        closeStreams(process);
                        return;
                    }
                }
                logger.info(name + ": got command value " + s);
                if (s != null) {
                    logger.info(name + ": Sending command " + s);
                    if (process == null) {
                        logger.info(name + ": Process is not running, waiting before sending.");
                        try {
                            Thread.sleep(600);
                        } catch (InterruptedException e) {
                            logger.info("Thread interrupted.", e);
                            if (!keepRunning) {
                                logger.info(name + ": exiting command poller");
                                closeStreams(process);
                                return;
                            }
                        }
                    }

                    try {
                        os.write((s + "\n\n").getBytes(StandardCharsets.UTF_8));
                        os.flush();
                    } catch (Exception e) {
                        logger.info(name + ": Got exception sending command", e);
                        if (process == null || !process.isAlive()) {
                            logger.info(name + ": Process check failed.");
                            closeStreams(process);
                            if (!keepRunning) {
                                logger.info(name + ": exiting command poller");
                                return;
                            }
                        }
                    }

                }
            }
        }

        String expectedCommandLine;

        public void executeCommandLineAndParseJson(Function<JsonElement, Boolean> func, String... commandLine) {
            Process processTemp = null;
            cleanup: try {
                process = processTemp = new ProcessBuilder(commandLine).redirectErrorStream(true).start();
            do {
                Process processTemp = null;
                InputStream inputStream = null;
                cleanup: try {
                    expectedCommandLine = StringUtils.join(commandLine, " ");
                    processTemp = new ProcessBuilder(commandLine).redirectErrorStream(true).start();
                    process = processTemp;
                    logger.info("Started process " + process.pid());
                    logger.info("Process " + process.pid() + " isRunning=" + process.isAlive());
                    logger.info(name + ": Running " + expectedCommandLine);

                    os = new BufferedOutputStream(process.getOutputStream());
                    inputStream = process.getInputStream();

                    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

                logger.info(name + ": Running " + StringUtils.join(commandLine, " "));
                try (InputStream inputStream = process.getInputStream();
                        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
                    Gson gson = new GsonBuilder().create();
                    JsonStreamParser p = new JsonStreamParser(reader);

                    while (p.hasNext()) {
                        JsonElement e = p.next();
                        if (e.isJsonObject()) {
                            try {
                                Boolean b = func.apply(e);
                                if (!b) {
                                    break;
                    while (keepRunning && processTemp != null && processTemp.isAlive()) {
                        // we should maybe attempt to read more than one object
                        try {
                            logger.info(name + " looking for json");
                            JsonElement e;
                            if (p.hasNext()) {
                                e = p.next();
                                logger.info(name + ": got some json");
                                if (e.isJsonObject()) {
                                    try {
                                        Boolean b = func.apply(e);
                                        if (!b) {
                                            logger.info(name + ": response handler returned false.");
                                            break;
                                        }
                                    } catch (Throwable th) {
                                        logger.info(name + ": Exception occurred while running callback.", th);
                                    }
                                }
                            } catch (Throwable th) {
                                logger.info(name + ": Exception occurred while running callback.", th);
                                /* handle other JSON data structures */
                            }
                        } catch (JsonIOException e) {
                            if (keepRunning == false)
                                ; // do nothing, we expect this when shutting down.
                            else
                                logger.warn(name + " received exception from subprocess.", e);
                        }
                        /* handle other JSON data structures */
                    }
                    logger.info(name + ": No more JSON");
                } catch (Throwable e) {
                    logger.warn(name + ": Error occurred in '{}'", StringUtils.join(commandLine, " "), e);
                }

            } catch (Throwable e) {
                logger.warn(name + ": Failed to execute commandLine '{}'", StringUtils.join(commandLine, " "), e);
            }
                logger.info(name + ": Command process=" + process.pid() + " exited.");

            logger.info(name + ": Command exited.");
                if (processTemp != null) {
                    logger.info("Process " + processTemp.pid() + " exists.");
                    if (processTemp.isAlive()) {
                        logger.info("destroying process " + processTemp.pid() + " forcibly.");
                        processTemp.destroyForcibly();
                    }
                }

            if (processTemp != null && processTemp.isAlive()) {
                processTemp.destroyForcibly();
            }
                logger.info("process clear: " + process.info());

            process = null;
                closeStreams(process);

            return;
                process = null;
            } while (keepRunning);

            logger.info("returning because !keepRunning");
        }

        public void stop() {
            logger.info(name + ": stop()");
            keepRunning = false;
            isStarted = false;

            if (readThread != null)
                readThread.interrupt();
            if (writeThread != null)
                writeThread.interrupt();

            if (process == null) {
                logger.info(name + ": Scanner not running, will not attempt to stop.");


@@ 384,25 530,47 @@ public class PyATV {

            logger.info(name + ": Sending exit command to scanner process.");

            OutputStream os = process.getOutputStream();
            try {
                os.write("\r\n".getBytes());
                os.flush();
            } catch (IOException e) {
                logger.info(name + ": Got exception ending scanner process", e);
            if (os != null) {
                try {
                    os.write("\r\n".getBytes());
                    os.flush();
                    os.close();
                } catch (IOException e) {
                    logger.info(name + ": Got exception ending scanner process", e);
                }
            }

            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                logger.info(name + ": Got exception waiting for scanner process to end", e);
                if (logger != null)
                    logger.info(name + ": Got exception waiting for scanner process to end", e);
            }
            process.destroyForcibly();
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                logger.info(name + ": Got exception waiting for scanner process to end", e);
            if (logger != null && process != null) {
                logger.info("forcibly destroying " + process.pid());

                process.destroyForcibly();
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    if (logger != null)
                        logger.info(name + ": Got exception waiting for scanner process to end", e);
                }
                closeStreams(process);
                process = null;
            }
            process = null;
        }
    }

    static void closeStreams(Process p) {
        if (p == null)
            return;
        try {
            p.getInputStream().close();
            p.getOutputStream().close();
            p.getErrorStream().close();
        } catch (IOException ioException) {
            logger.warn("Error closing streams for process {}.", p.info(), ioException);
        }
    }
}