~alextee/zrythm

0e677b84b22603f98ef3ba200389270ea7e354da — Alexandros Theodotou 3 years ago d48de9f concurrent_queue
benchmark concurrent queue

https://github.com/cameron314/concurrentqueue
A inc/utils/concurrent_queue.h => inc/utils/concurrent_queue.h +99 -0
@@ 0,0 1,99 @@
/*
 * Copyright (C) 2021 Alexandros Theodotou <alex at zrythm dot org>
 *
 * This file is part of Zrythm
 *
 * Zrythm is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Zrythm is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with Zrythm.  If not, see <https://www.gnu.org/licenses/>.
 *
 * This file incorporates work covered by the following copyright and
 * permission notice:
 *
 * Simplified BSD License:
 *
 * Copyright (c) 2013-2016, Cameron Desrochers. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
 *
 *Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
 * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * \file
 *
 * Multiple Producer Multiple Consumer lock-free
 * queue.
 */

#ifndef __UTILS_CONCURRENT_QUEUE_H__
#define __UTILS_CONCURRENT_QUEUE_H__

#include <stdbool.h>
#include <stdlib.h>

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @addtogroup utils
 *
 * @{
 */

/**
 * @param num_elements Estimated number of elements.
 */
NONNULL
bool
concurrent_queue_create (
  void ** handle,
  size_t  num_elements);

NONNULL
bool
concurrent_queue_destroy (
  void * handle);

NONNULL
bool
concurrent_queue_enqueue (
  void * handle,
  void * value);

NONNULL
HOT
bool
concurrent_queue_try_enqueue (
  void * handle,
  void * value);

NONNULL
HOT
bool
concurrent_queue_try_dequeue (
  void * handle,
  void ** value);

/**
 * @}
 */

#ifdef __cplusplus
}
#endif

#endif

M meson.build => meson.build +14 -1
@@ 37,7 37,10 @@ gnome = import ('gnome')
fs = import ('fs')
cmake = import ('cmake')
cmake_opts = cmake.subproject_options ()
cmake_opts.add_cmake_defines ({'CMAKE_POSITION_INDEPENDENT_CODE': 'ON'})
cmake_opts.add_cmake_defines ({
  'CMAKE_POSITION_INDEPENDENT_CODE': 'ON',
  #'CMAKE_BUILD_TYPE': 'Release',
  })

prefix = get_option ('prefix')
bindir = prefix / get_option ('bindir')


@@ 1054,6 1057,15 @@ if not libbacktrace_dep.found ()
endif
cdata.set ('HAVE_LIBBACKTRACE', 1)

concurrentqueue_dep = []
if not cc.has_header('concurrentqueue/concurrentqueue.h')
  concurrentqueue_subproject = cmake.subproject (
    'concurrentqueue', options: cmake_opts)
  concurrentqueue_dep = concurrentqueue_subproject.dependency (
    'concurrentqueue')
  cdata.set ('CONCURRENT_QUEUE_SUBPROJECT', 1)
endif

# TODO add Cantarell font as dependency

zrythm_deps = [


@@ 1091,6 1103,7 @@ zrythm_deps = [
  lsp_dsp_dep,
  valgrind_dep,
  libbacktrace_dep,
  concurrentqueue_dep,

  libm,
]

A src/utils/concurrent_queue.cpp => src/utils/concurrent_queue.cpp +114 -0
@@ 0,0 1,114 @@
/*
 * Copyright (C) 2021 Alexandros Theodotou <alex at zrythm dot org>
 *
 * This file is part of Zrythm
 *
 * Zrythm is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Zrythm is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with Zrythm.  If not, see <https://www.gnu.org/licenses/>.
 *
 * This file incorporates work covered by the following copyright and
 * permission notice:
 *
 * Simplified BSD License:
 *
 * Copyright (c) 2013-2016, Cameron Desrochers. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
 *
 *Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
 * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "zrythm-config.h"

#include "utils/concurrent_queue.h"

#ifdef CONCURRENT_QUEUE_SUBPROJECT
#include <concurrentqueue.h>
#else
#include <concurrentqueue/concurrentqueue.h>
#endif

struct traits : public moodycamel::ConcurrentQueueDefaultTraits
{
	// Use a slightly larger default block size; the default offers
	// a good trade off between speed and memory usage, but a bigger
	// block size will improve throughput (which is mostly what
	// we're after with these benchmarks).
	static const size_t BLOCK_SIZE = 128;
};

typedef moodycamel::ConcurrentQueue<void*, traits> MoodycamelCQType, *MoodycamelCQPtr;

extern "C" {

static size_t
power_of_two_size (
  size_t sz)
{
  int32_t power_of_two;
  for (power_of_two = 1;
       1U << power_of_two < sz; ++power_of_two);
  return 1U << power_of_two;
}

bool
concurrent_queue_create (
  void ** handle,
  size_t  num_elements)
{
  num_elements = power_of_two_size (num_elements);
  MoodycamelCQPtr retval =
    new MoodycamelCQType (num_elements);
  if (retval == nullptr) {
    return false;
  }
  *handle = retval;
  return true;
}

bool
concurrent_queue_destroy (
  void * handle)
{
  delete reinterpret_cast<MoodycamelCQPtr>(handle);
  return true;
}

bool
concurrent_queue_enqueue (
  void * handle,
  void * value)
{
  return reinterpret_cast<MoodycamelCQPtr>(handle)->enqueue(value);
}

bool
concurrent_queue_try_enqueue (
  void * handle,
  void * value)
{
  return reinterpret_cast<MoodycamelCQPtr>(handle)->try_enqueue(value);
}

bool
concurrent_queue_try_dequeue (
  void * handle,
  void ** value)
{
  return reinterpret_cast<MoodycamelCQPtr>(handle)->try_dequeue(*value);
}

} /* extern "C" */

M src/utils/meson.build => src/utils/meson.build +1 -0
@@ 21,6 21,7 @@ util_srcs = [
  'backtrace.c',
  'cairo.c',
  'color.c',
  'concurrent_queue.cpp',
  'cpu_windows.cpp',
  'curl.c',
  'datetime.c',

A subprojects/concurrentqueue.wrap => subprojects/concurrentqueue.wrap +3 -0
@@ 0,0 1,3 @@
[wrap-git]
url = https://github.com/cameron314/concurrentqueue
revision = v1.0.3

A tests/benchmarks/queues.c => tests/benchmarks/queues.c +266 -0
@@ 0,0 1,266 @@
/*
 * Copyright (C) 2021 Alexandros Theodotou <alex at zrythm dot org>
 *
 * This file is part of Zrythm
 *
 * Zrythm is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Zrythm is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with Zrythm.  If not, see <https://www.gnu.org/licenses/>.
 */

#include "zrythm-test-config.h"

#include "utils/mpmc_queue.h"
#include "utils/concurrent_queue.h"
#include "zrythm.h"

#include "tests/helpers/plugin_manager.h"
#include "tests/helpers/project.h"
#include "tests/helpers/zrythm.h"

typedef struct DspBenchmark
{
  /* function called */
  const char * func_name;
  /* microseconds taken */
  long         concurrent_usec;
  long         mpmc_usec;
} DspBenchmark;

static DspBenchmark benchmarks[400];
static int num_benchmarks = 0;
static size_t num_nodes = 40000;
static MPMCQueue * mpmc_queue;
static void * concurrent_queue;
static bool finished_enqueuing = false;

static DspBenchmark *
benchmark_find (
  const char * func_name)
{
  for (int i = 0; i < num_benchmarks; i++)
    {
      DspBenchmark * benchmark = &benchmarks[i];
      if (string_is_equal (
            benchmark->func_name, func_name))
        {
          return benchmark;
        }
    }
  return NULL;
}

static void *
benchmark_thread (void * arg)
{
  bool concurrent = *((bool *) arg);

  const char * fname = "thread";
  DspBenchmark * benchmark = benchmark_find (fname);
  if (!benchmark)
    {
      benchmark = &benchmarks[num_benchmarks];
      num_benchmarks++;
    }
  benchmark->func_name = fname;

  /*g_message ("start thread");*/

  gint64 start = g_get_monotonic_time ();

  void * el;
  if (concurrent)
    {
      while (concurrent_queue_try_dequeue (
               concurrent_queue, &el) ||
             !finished_enqueuing)
        {
          g_usleep (1);
        }
    }
  else
    {
      while (mpmc_queue_dequeue (
               mpmc_queue, &el) ||
             !finished_enqueuing)
        {
          g_usleep (1);
        }
    }

  gint64 end = g_get_monotonic_time ();

  /*g_message ("end thread");*/
  if (concurrent)
    {
      benchmark->concurrent_usec = end - start;
      g_warn_if_fail (end > start);
    }
  else
    {
      benchmark->mpmc_usec = end - start;
      g_warn_if_fail (end > start);
    }

  return 0;
}

static void
_test_enqueue_dequeue (
  bool concurrent)
{
  /*test_helper_zrythm_init ();*/

  int num_threads = 16;
  pthread_t threads[num_threads];
  memset (
    threads, 0,
    (size_t) num_threads * sizeof (pthread_t));
  void * nodes[num_nodes];

  mpmc_queue = mpmc_queue_new ();
  mpmc_queue_reserve (mpmc_queue, num_nodes);
  bool ret =
    concurrent_queue_create (
      &concurrent_queue, num_nodes);
  g_assert_true (ret);

  finished_enqueuing = false;

  for (int i = 0; i < num_threads; i++)
    {
      int iret =
        pthread_create (
          &threads[i], NULL, &benchmark_thread,
          &concurrent);
      if (iret)
        {
          g_message ("%s", strerror (iret));
        }
      g_assert_cmpint (iret, ==, 0);
    }

  /*DspBenchmark * benchmark;*/
  /*gint64 start, end;*/

  if (concurrent)
    {
      for (size_t i = 0; i < num_nodes; i++)
        {
          g_assert_true (
            concurrent_queue_try_enqueue (
              concurrent_queue, &nodes[i]));
          if (i % 6 == 0)
            g_usleep (1);
        }
    }
  else
    {
      for (size_t i = 0; i < num_nodes; i++)
        {
          g_assert_true (
            mpmc_queue_push_back (
              mpmc_queue, &nodes[i]));
          if (i % 6 == 0)
            g_usleep (1);
        }
    }

  g_message ("finished enqueueing");
  finished_enqueuing = true;

  for (int i = 0; i < num_threads; i++)
    {
      g_assert_false (
        pthread_join (threads[i], NULL));
    }

  mpmc_queue_free (mpmc_queue);
  concurrent_queue_destroy (concurrent_queue);

  /*test_helper_zrythm_cleanup ();*/
}

static void
test_enqueue_dequeue ()
{
  DspBenchmark * benchmark;
  gint64 start, end;

#define LOOP_START \
  start = g_get_monotonic_time (); \
  for (size_t i = 0; i < 3; i++) \
    {

#define LOOP_END(fname,is_concurrent) \
    } \
  end = g_get_monotonic_time (); \
  benchmark = benchmark_find (fname); \
  if (!benchmark) \
    { \
      benchmark = &benchmarks[num_benchmarks]; \
      num_benchmarks++; \
    } \
  benchmark->func_name = fname; \
  if (is_concurrent) \
    { \
      benchmark->concurrent_usec = end - start; \
    } \
  else \
    { \
      benchmark->mpmc_usec = end - start; \
    }

  LOOP_START
  _test_enqueue_dequeue (true);
  LOOP_END ("full", true);
  LOOP_START
  _test_enqueue_dequeue (false);
  LOOP_END ("full", false);
}

static void
print_benchmark_results ()
{
  for (int i = 0; i < num_benchmarks; i++)
    {
      DspBenchmark * benchmark = &benchmarks[i];
      fprintf (
        stderr,
        "---- %s ----\n"
        "concurrent queue: %ld.%ldms\n"
        "mpmc queue: %ld.%ldms\n",
        benchmark->func_name,
        benchmark->concurrent_usec / 1000,
        benchmark->concurrent_usec % 1000,
        benchmark->mpmc_usec / 1000,
        benchmark->mpmc_usec % 1000);
    }
}

int
main (int argc, char *argv[])
{
  g_test_init (&argc, &argv, NULL);

#define TEST_PREFIX "/benchmarks/queues/"

  g_test_add_func (
    TEST_PREFIX "test enqueue dequeue",
    (GTestFunc) test_enqueue_dequeue);
  g_test_add_func (
    TEST_PREFIX "print benchmark results",
    (GTestFunc) print_benchmark_results);

  return g_test_run ();
}


M tests/meson.build => tests/meson.build +2 -2
@@ 202,8 202,8 @@ if get_option ('tests')
        parallel: false },
      'actions/tracklist_selections_edit': {
        parallel: false },
      'benchmarks/dsp': {
        parallel: true },
      'benchmarks/dsp': { parallel: true },
      'benchmarks/queues': { parallel: true },
      'integration/midi_file': {
        parallel: false },
      # cannot be parallel because it needs multiple