~hedgepigdaniel/dewobble

27d9311920472b15007cbd1859d1e8a0e6bda750 — Daniel Playfair Cal 2 years ago e4db608
fix: also recycle output frames
M README.md => README.md +22 -4
@@ 98,13 98,22 @@ dewobble::FilterThreaded filter(
);

while (...) {
    cl_mem input_frame = filter.get_input_frame_buffer();

    // ... put data in input frame

    filter.push_frame(input_frame);
}
filter.end_input();

while (filter.frame_ready()) {
    cl_mem output_frame = NULL;
    frame = filter.pull_frame(&output_frame, NULL);
    cl_mem output_frame = NULL, input_frame;
    filter.pull_frame(&output_frame, &input_frame, NULL);

    // ... retrieve data from output frame

    filter.release_input_frame_buffer(&input_frame);
    filter.release_output_frame_buffer(&output_frame);
}

```


@@ 150,13 159,22 @@ dewobble_filter_config_set_opencl_context(config, device);
filter = dewobble_filter_create_threaded(config);

while (...) {
    cl_mem input_frame = dewobble_filter_get_input_frame_buffer(filter);

    // ... put data in input frame

    dewobble_filter_push_frame(filter, input_frame, NULL);
}
dewobble_filter_end_input(filter);

while (dewobble_filter_frame_ready(filter)) {
    cl_mem output_frame = NULL;
    frame = dewobble_filter_pull_frame(&output_frame, NULL);
    cl_mem output_frame = NULL, input_frame;
    frame = dewobble_filter_pull_frame(filter, &output_frame, &input_frame, NULL);

    // ... retrieve data from output frame

    dewobble_filter_release_output_frame_buffer(filter, &output_frame);
    dewobble_filter_release_input_frame_buffer(filter, &input_frame);
}
```


M include/c_bindings/filter.h => include/c_bindings/filter.h +16 -3
@@ 32,13 32,25 @@ cl_mem dewobble_filter_get_input_frame_buffer(
    cl_int *errcode_ret);

/**
 * Release an input frame buffer previously returned from @ref
 * dewobble_filter_get_input_frame_buffer. The buffer may not necessarily be
 * released immediately, and may be reused for subsequent frames with @ref
 * dewobble_filter_get_input_frame_buffer.
 * @param input_buffer pointer to the buffer to release. Will be set
 * to NULL.
 */
void dewobble_filter_release_input_frame_buffer(
    DewobbleFilter c_filter,
    cl_mem *input_buffer);

/**
 * Push an input frame into the filter
 * @param input_buffer OpenCL buffer containing input frame in NV12 format
 * @param extra Opaque pointer to extra data for this frame
 */
int dewobble_filter_push_frame(
    DewobbleFilter c_filter,
    cl_mem *input_buffer,
    cl_mem input_buffer,
    void *extra);

/**


@@ 55,13 67,14 @@ int dewobble_filter_frame_ready(DewobbleFilter c_filter);

/**
 * Pull an output frame from the filter
 * @param output_buffer will be set to the OpenCL buffer containing the output
 * frame
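 * @param output_buffer Pointer to an OpenCL buffer which will be set to point
 * to the output frame
 * @param input_buffer Pointer to an OpenCL buffer which will be set to point
 * to the input frame that was passed to @ref dewobble_filter_push_frame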
 * @param extra will be set to the opaque pointer to extra data about this frame
 */
int dewobble_filter_pull_frame(
    DewobbleFilter c_filter,
    cl_mem *output_buffer,
    cl_mem *input_buffer,
    void **extra);

/**

M include/filter_base.hpp => include/filter_base.hpp +50 -22
@@ 17,28 17,53 @@ namespace dewobble
class FilterBase
{
    FilterConfig m_config;
    std::queue<cl_mem> m_preallocated_buffers;
    std::queue<cl_mem> m_preallocated_input_buffers;
    std::queue<cl_mem> m_preallocated_output_buffers;
    cl_mem get_frame_buffer(
        std::queue<cl_mem> &preallocated_buffers,
        cl_mem_flags flags);
    void free_preallocated_buffers(std::queue<cl_mem> &preallocated_buffers);
    void release_frame_buffer(
        std::queue<cl_mem> &preallocated_buffers,
        cl_mem *output_buffer);

  protected:
    /**
     * Get an OpenCL buffer usable with @ref pull_frame. The buffer should be
     * freed using @ref release_output_frame_buffer.
     * @return the buffer
     */
    cl_mem get_output_frame_buffer();

  public:
    FilterBase(FilterConfig config);
    virtual ~FilterBase();
    /**
     * Get an OpenCL buffer to be used for an input frame. As much as
     * possible, the buffers previously released by @ref
     * release_output_frame_buffer are used instead of allocating new buffers.
     * Get an OpenCL buffer usable by @ref push_frame. The buffer should be
     * freed using @ref release_input_frame_buffer.
     * @return the buffer
     */
    cl_mem get_input_frame_buffer();

    /**
     * Release an OpenCL buffer that is usable with @ref push_frame. As a
     * performance optimization, the same buffer may be returned again by @ref
     * get_input_frame_buffer
     * @param input_buffer a pointer to the buffer, which will be set to NULL
     */
    void release_input_frame_buffer(cl_mem *input_buffer);

    /**
     * Push an input frame into the filter
     * @param input_buffer Pointer to an OpenCL buffer containing input
     * frame in NV12 format. The filter takes ownership of the buffer and the
     * pointer will be set to NULL.
     * @param extra Opaque pointer to extra data for this frame. The caller
     * retains ownership of extra.
     * @param input_buffer An OpenCL buffer containing input
     * frame in NV12 format. The buffer should be readable from within an OpenCL
     * kernel (e.g. CL_MEM_READ_ONLY). The contents will not be modified, but
     * must remain accessible until the same buffer is returned from @ref
     * pull_frame.
     * @param extra Opaque pointer to related data for this frame which can be
     * retrieved when the frame is output in @ref pull_frame.
     */
    virtual void push_frame(cl_mem *input_buffer, void *extra) = 0;
    virtual void push_frame(cl_mem input_buffer, void *extra) = 0;

    /**
     * Notify the filter that there will not be more input frames (and therefore


@@ 53,22 78,25 @@ class FilterBase
    virtual bool frame_ready() = 0;

    /**
     * Pull an output frame from the filter
     * @param output_buffer pointer to be set to the OpenCl buffer containing
     * the output frame
     * @param extra will be set to the opaque pointer to extra data about this
     * frame
     */
    virtual void pull_frame(cl_mem *output_buffer, void **extra) = 0;

    /**
     * Release an output frame buffer previously returned from pull_frame.
     * The buffer may not necessarily be released immediately, and may be
     * reused for subsequent frames with @ref get_input_frame_buffer
     * Release an output frame buffer previously returned from @ref pull_frame.
     * The buffer may not necessarily be freed immediately, and may be later
     * returned again from @ref pull_frame
     * @param output_buffer pointer to the buffer to release. Will be set
     * to NULL.
     */
    void release_output_frame_buffer(cl_mem *output_buffer);

    /**
     * Pull an output frame from the filter
     * @param output_buffer will be set to point to a buffer containing the
     * output frame
     * @param input_buffer Will be set to point to the input buffer for this
     * frame that was passed into @ref push_frame
     * @param extra will be set to the opaque pointer to extra data about this
     * frame
     */
    virtual void
    pull_frame(cl_mem *output_buffer, cl_mem *input_buffer, void **extra) = 0;
};
} // namespace dewobble


M include/filter_sync.hpp => include/filter_sync.hpp +4 -2
@@ 56,13 56,15 @@ class FilterSync : public FilterBase
  public:
    FilterSync(FilterConfig config);

    void push_frame(cl_mem *input_buffer, void *extra);
    void push_frame(cl_mem input_buffer, void *extra);

    void end_input();

    bool frame_ready();

    void pull_frame(cl_mem *output_buffer, void **extra);
    void pull_frame(cl_mem *output_buffer, cl_mem *input_buffer, void **extra);

    cl_mem get_output_frame_buffer();
};
} // namespace dewobble


M include/filter_threaded.hpp => include/filter_threaded.hpp +37 -9
@@ 22,12 22,21 @@ class MessageInit
    MessageInit(FilterConfig filter_config) : filter_config(filter_config) {}
};

class MessageFrame
class MessageInputFrame
{
  public:
    const cl_mem buffer;
    void *extra;
    MessageFrame(cl_mem buffer, void *extra) : buffer(buffer), extra(extra) {}
    MessageInputFrame(cl_mem buffer, void *extra) : buffer(buffer), extra(extra)
    {
    }
};

class MessageReleaseOutputFrameBuffer
{
  public:
    const cl_mem buffer;
    MessageReleaseOutputFrameBuffer(cl_mem buffer) : buffer(buffer) {}
};

class MessageEndInput


@@ 42,10 51,26 @@ class MessageAcknowledgeFrame
{
};

typedef std::
    variant<MessageInit, MessageFrame, MessageEndInput, MessageTerminate>
        MessageInput;
typedef std::variant<MessageFrame, MessageAcknowledgeFrame> MessageOutput;
class MessageOutputFrame
{
  public:
    const cl_mem output_buffer;
    const cl_mem input_buffer;
    void *extra;
    MessageOutputFrame(cl_mem output_buffer, cl_mem input_buffer, void *extra)
        : output_buffer(output_buffer), input_buffer(input_buffer), extra(extra)
    {
    }
};

typedef std::variant<
    MessageInit,
    MessageInputFrame,
    MessageReleaseOutputFrameBuffer,
    MessageEndInput,
    MessageTerminate>
    MessageInput;
typedef std::variant<MessageOutputFrame, MessageAcknowledgeFrame> MessageOutput;

/**
 * A filter which works synchronously but delegates all processing to a


@@ 61,20 86,23 @@ class FilterThreaded : public FilterBase
    int m_num_unacknowledged_frames = 0;
    int m_num_frames_in_flight = 0;
    std::queue<void *> m_extra_queue;
    std::queue<cl_mem> m_buffer_queue;
    std::queue<cl_mem> m_output_buffer_queue;
    std::queue<cl_mem> m_input_buffer_queue;
    void read_from_worker();

  public:
    ~FilterThreaded();
    FilterThreaded(FilterConfig filter_config);

    void push_frame(cl_mem *input_buffer, void *extra);
    void push_frame(cl_mem input_buffer, void *extra);

    void end_input();

    bool frame_ready();

    void pull_frame(cl_mem *output_buffer, void **extra);
    void pull_frame(cl_mem *output_buffer, cl_mem *input_buffer, void **extra);

    void release_output_frame_buffer(cl_mem *output_buffer);
};
} // namespace dewobble


M src/c_bindings/filter.cpp => src/c_bindings/filter.cpp +13 -2
@@ 66,9 66,19 @@ cl_mem dewobble_filter_get_input_frame_buffer(
    return buffer;
}

/**
 * C binding that forwards to FilterBase::release_input_frame_buffer.
 * @param c_filter handle pointing at a shared_ptr<dewobble::FilterBase>
 * @param input_buffer passed through unchanged to the filter
 */
void dewobble_filter_release_input_frame_buffer(
    DewobbleFilter c_filter,
    cl_mem *input_buffer)
{
    auto *holder = (shared_ptr<dewobble::FilterBase> *)c_filter;
    // Take a copy of the shared_ptr stored behind the handle.
    shared_ptr<dewobble::FilterBase> filter(*holder);
    filter->release_input_frame_buffer(input_buffer);
}

int dewobble_filter_push_frame(
    DewobbleFilter c_filter,
    cl_mem *input_buffer,
    cl_mem input_buffer,
    void *extra)
{
    shared_ptr<dewobble::FilterBase> filter =


@@ 105,13 115,14 @@ int dewobble_filter_frame_ready(DewobbleFilter c_filter)
int dewobble_filter_pull_frame(
    DewobbleFilter c_filter,
    cl_mem *output_buffer,
    cl_mem *input_buffer,
    void **extra)
{
    shared_ptr<dewobble::FilterBase> filter =
        *((shared_ptr<dewobble::FilterBase> *)c_filter);
    int err = 0;
    try {
        filter->pull_frame(output_buffer, extra);
        filter->pull_frame(output_buffer, input_buffer, extra);
    } catch (...) {
        err = -1;
    }

M src/filter/filter_base.cpp => src/filter/filter_base.cpp +42 -11
@@ 11,25 11,34 @@ FilterBase::FilterBase(FilterConfig config) : m_config(config) {}

FilterBase::~FilterBase()
{
    while (m_preallocated_buffers.size()) {
        cl_mem buffer = m_preallocated_buffers.front();
    free_preallocated_buffers(m_preallocated_input_buffers);
    free_preallocated_buffers(m_preallocated_output_buffers);
}

void FilterBase::free_preallocated_buffers(
    std::queue<cl_mem> &preallocated_buffers)
{
    while (preallocated_buffers.size()) {
        cl_mem buffer = preallocated_buffers.front();
        clReleaseMemObject(buffer);
        m_preallocated_buffers.pop();
        preallocated_buffers.pop();
    }
}

cl_mem FilterBase::get_input_frame_buffer()
cl_mem FilterBase::get_frame_buffer(
    std::queue<cl_mem> &preallocated_buffers,
    cl_mem_flags flags)
{
    cl_mem buffer;
    if (m_preallocated_buffers.size()) {
        buffer = m_preallocated_buffers.front();
        m_preallocated_buffers.pop();
    if (preallocated_buffers.size()) {
        buffer = preallocated_buffers.front();
        preallocated_buffers.pop();
        return buffer;
    }
    cl_int err;
    buffer = clCreateBuffer(
        m_config.get_opencl_context(),
        CL_MEM_READ_ONLY,
        flags,
        m_config.get_input_camera().m_width *
            m_config.get_input_camera().m_height * 3 / 2,
        NULL,


@@ 41,12 50,34 @@ cl_mem FilterBase::get_input_frame_buffer()
    return buffer;
}

void FilterBase::release_output_frame_buffer(cl_mem *output_buffer)
cl_mem FilterBase::get_output_frame_buffer()
{
    if (m_preallocated_buffers.size() < MAX_BUFFERS_TO_KEEP) {
        m_preallocated_buffers.push(*output_buffer);
    return get_frame_buffer(m_preallocated_output_buffers, CL_MEM_WRITE_ONLY);
}

/**
 * Get an input frame buffer via get_frame_buffer, using the input buffer
 * pool and the CL_MEM_READ_ONLY flag.
 */
cl_mem FilterBase::get_input_frame_buffer()
{
    cl_mem input_buffer =
        get_frame_buffer(m_preallocated_input_buffers, CL_MEM_READ_ONLY);
    return input_buffer;
}

/**
 * Hand a buffer back to a pool for later reuse, or release it with
 * clReleaseMemObject when the pool already holds MAX_BUFFERS_TO_KEEP buffers.
 * @param preallocated_buffers pool the buffer is returned to
 * @param output_buffer pointer to the buffer; set to NULL on return. A NULL
 * pointer, or a pointer to a NULL buffer, is ignored.
 */
void FilterBase::release_frame_buffer(
    std::queue<cl_mem> &preallocated_buffers,
    cl_mem *output_buffer)
{
    // Never pool a NULL handle: get_frame_buffer() would later hand it out
    // as if it were a usable buffer.
    if (output_buffer == NULL || *output_buffer == NULL) {
        return;
    }
    if (preallocated_buffers.size() < MAX_BUFFERS_TO_KEEP) {
        preallocated_buffers.push(*output_buffer);
    } else {
        clReleaseMemObject(*output_buffer);
    }
    *output_buffer = NULL;
}

/**
 * Release an output frame buffer into the output buffer pool.
 * @param output_buffer pointer to the buffer, forwarded to
 * release_frame_buffer
 */
void FilterBase::release_output_frame_buffer(cl_mem *output_buffer)
{
    std::queue<cl_mem> &pool = m_preallocated_output_buffers;
    release_frame_buffer(pool, output_buffer);
}

/**
 * Release an input frame buffer into the input buffer pool.
 * @param input_buffer pointer to the buffer, forwarded to
 * release_frame_buffer
 */
void FilterBase::release_input_frame_buffer(cl_mem *input_buffer)
{
    std::queue<cl_mem> &pool = m_preallocated_input_buffers;
    release_frame_buffer(pool, input_buffer);
}
\ No newline at end of file

M src/filter/filter_sync.cpp => src/filter/filter_sync.cpp +17 -16
@@ 341,18 341,17 @@ void FilterSync::add_debug_info(UMat output, StabilizedFrame frame)
        2);
}

void FilterSync::push_frame(cl_mem *input_buffer, void *extra)
void FilterSync::push_frame(cl_mem input_buffer, void *extra)
{
    UMat input_frame;
    ocl::convertFromBuffer(
        *input_buffer,
        input_buffer,
        m_config.get_input_camera().m_width,
        m_config.get_input_camera().m_height * 3 / 2,
        m_config.get_input_camera().m_width,
        CV_8U,
        input_frame);
    Frame frame(input_frame, extra);
    *input_buffer = NULL;

    m_config.get_stabilizer()->push_frame(frame);
    flush_frames();


@@ 366,7 365,10 @@ void FilterSync::end_input()

bool FilterSync::frame_ready() { return m_buffered_frames.size() > 0; }

void FilterSync::pull_frame(cl_mem *output_buffer, void **extra)
void FilterSync::pull_frame(
    cl_mem *output_buffer,
    cl_mem *input_buffer,
    void **extra)
{
    StabilizedFrame frame = m_buffered_frames.front();
    m_buffered_frames.pop();


@@ 381,28 383,27 @@ void FilterSync::pull_frame(cl_mem *output_buffer, void **extra)

    ocl::OpenCLExecutionContext context =
        ocl::OpenCLExecutionContext::getCurrentRef();
    cl_mem buffer = clCreateBuffer(
        (cl_context)context.getContext().ptr(),
        CL_MEM_WRITE_ONLY,
        m_config.get_output_camera().m_width *
            m_config.get_output_camera().m_height * 3 / 2,
        NULL,
        NULL);
    if (buffer == NULL) {
        throw -1;
    }

    UMat output_frame_yuv;
    if (*output_buffer == NULL) {
        *output_buffer = get_output_frame_buffer();
    }
    ocl::convertFromBuffer(
        buffer,
        *output_buffer,
        m_config.get_output_camera().m_width,
        m_config.get_output_camera().m_height * 3 / 2,
        m_config.get_output_camera().m_width,
        CV_8U,
        output_frame_yuv);
    cvtColorBgrToNv12(m_warped_frame, output_frame_yuv);
    *output_buffer = buffer;
    if (extra != NULL) {
        *extra = frame.extra;
    }

    *input_buffer = (cl_mem)frame.frame.handle(ACCESS_READ);
}

/**
 * Forwards to FilterBase::get_output_frame_buffer.
 * @return the buffer returned by the base class
 */
cl_mem FilterSync::get_output_frame_buffer()
{
    cl_mem output_buffer = FilterBase::get_output_frame_buffer();
    return output_buffer;
}
\ No newline at end of file

M src/filter/filter_threaded.cpp => src/filter/filter_threaded.cpp +41 -15
@@ 1,5 1,7 @@
#include "filter_threaded.hpp"

#include <iostream>
#include <stdexcept>

#include "filter_sync.hpp"

using namespace dewobble;


@@ 9,11 11,14 @@ using namespace std;
void flush_frames(SafeQueue<MessageOutput> &output_queue, FilterSync &filter)
{
    while (filter.frame_ready()) {
        cl_mem buffer;
        cl_mem output_buffer = NULL, input_buffer;
        void *extra;
        filter.pull_frame(&buffer, &extra);
        filter.pull_frame(&output_buffer, &input_buffer, &extra);
        output_queue.push(make_unique<MessageOutput>(
            in_place_type<MessageFrame>, buffer, extra));
            in_place_type<MessageOutputFrame>,
            output_buffer,
            input_buffer,
            extra));
    }
    output_queue.push(
        make_unique<MessageOutput>(in_place_type<MessageAcknowledgeFrame>));


@@ 30,10 35,13 @@ void worker_thread(

    while (true) {
        auto message = *input_queue.pop();
        if (auto request = get_if<MessageFrame>(&message)) {
            cl_mem buffer = request->buffer;
            filter.push_frame(&buffer, request->extra);
        if (auto request = get_if<MessageInputFrame>(&message)) {
            filter.push_frame(request->buffer, request->extra);
            flush_frames(output_queue, filter);
        } else if (
            auto request = get_if<MessageReleaseOutputFrameBuffer>(&message)) {
            cl_mem buffer = request->buffer;
            filter.release_output_frame_buffer(&buffer);
        } else if (holds_alternative<MessageEndInput>(message)) {
            filter.end_input();
            flush_frames(output_queue, filter);


@@ 58,11 66,10 @@ FilterThreaded::~FilterThreaded()
    m_worker_thread.join();
}

void FilterThreaded::push_frame(cl_mem *input_buffer, void *extra)
void FilterThreaded::push_frame(cl_mem input_buffer, void *extra)
{
    m_input_queue.push(make_unique<MessageInput>(
        in_place_type<MessageFrame>, *input_buffer, extra));
    *input_buffer = NULL;
        in_place_type<MessageInputFrame>, input_buffer, extra));
    ++m_num_unacknowledged_frames;
    ++m_num_frames_in_flight;
}


@@ 77,16 84,34 @@ void FilterThreaded::end_input()
bool FilterThreaded::frame_ready()
{
    read_from_worker();
    return m_buffer_queue.size() > 0;
    return m_output_buffer_queue.size() > 0;
}

void FilterThreaded::pull_frame(cl_mem *output_buffer, void **extra)
/**
 * Pull the next finished frame received from the worker thread.
 * @param output_buffer must point to a NULL buffer; it is set to the buffer
 * containing the output frame
 * @param input_buffer if not NULL, set to the input buffer queued together
 * with this output frame
 * @param extra if not NULL, set to the opaque extra pointer queued together
 * with this output frame
 * @throws std::invalid_argument if output_buffer is NULL or *output_buffer is
 * not NULL
 * @throws std::logic_error if no output frame is queued
 */
void FilterThreaded::pull_frame(
    cl_mem *output_buffer,
    cl_mem *input_buffer,
    void **extra)
{
    read_from_worker();
    if (output_buffer == NULL || *output_buffer != NULL) {
        cerr << "FilterThreaded::pull_frame(): *output_buffer must be NULL!"
             << endl;
        // A bare `throw;` with no exception in flight calls std::terminate,
        // so throw a real exception that callers can catch.
        throw std::invalid_argument(
            "FilterThreaded::pull_frame(): *output_buffer must be NULL");
    }
    if (m_output_buffer_queue.empty()) {
        // front() on an empty queue is undefined behaviour.
        throw std::logic_error(
            "FilterThreaded::pull_frame(): no frame is ready");
    }
    *output_buffer = m_output_buffer_queue.front();
    m_output_buffer_queue.pop();

    // NULL out-parameters are tolerated; the entries are still popped so the
    // three queues stay in step with each other.
    if (extra != NULL) {
        *extra = m_extra_queue.front();
    }
    m_extra_queue.pop();
    if (input_buffer != NULL) {
        *input_buffer = m_input_buffer_queue.front();
    }
    m_input_buffer_queue.pop();
}

/**
 * Queue a message for the worker thread asking it to release an output frame
 * buffer.
 * @param output_buffer pointer to the buffer to release; set to NULL once the
 * message has been queued
 */
void FilterThreaded::release_output_frame_buffer(cl_mem *output_buffer)
{
    const cl_mem released = *output_buffer;
    m_input_queue.push(make_unique<MessageInput>(
        in_place_type<MessageReleaseOutputFrameBuffer>, released));
    *output_buffer = NULL;
}

void FilterThreaded::read_from_worker()


@@ 97,8 122,9 @@ void FilterThreaded::read_from_worker()
        if (holds_alternative<MessageAcknowledgeFrame>(message)) {
            --m_num_unacknowledged_frames;
            continue;
        } else if (auto frame_message = get_if<MessageFrame>(&message)) {
            m_buffer_queue.push(frame_message->buffer);
        } else if (auto frame_message = get_if<MessageOutputFrame>(&message)) {
            m_output_buffer_queue.push(frame_message->output_buffer);
            m_input_buffer_queue.push(frame_message->input_buffer);
            m_extra_queue.push(frame_message->extra);
            --m_num_frames_in_flight;
        }