M AsynchronousService/CMakeLists.txt => AsynchronousService/CMakeLists.txt +1 -0
@@ 5,6 5,7 @@ if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
endif()
include_directories(
+ ${PROJECT_SOURCE_DIR}/BlockingQueue/include
${PROJECT_SOURCE_DIR}/AsynchronousExecutor/include
src
)
M AsynchronousService/src/MessageQueue.h => AsynchronousService/src/MessageQueue.h +1 -0
@@ 5,6 5,7 @@ template<typename T>
class MessageQueue
{
public:
+ virtual ~MessageQueue() {}
virtual void put(T message) = 0;
virtual T get() = 0;
};
A AsynchronousService/src/MessageQueueImpl.h => AsynchronousService/src/MessageQueueImpl.h +40 -0
@@ 0,0 1,40 @@
+#ifndef MESSAGEQUEUEIMPL_H
+#define MESSAGEQUEUEIMPL_H
+
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include "Message.h"
+#include "MessageQueue.h"
+#include "blocking_queue.h"
+
+class MessageQueueImpl : public MessageQueue<Message>
+{
+public:
+ MessageQueueImpl()
+ : blockingQueue(taskQueue, mutex, conditionVariable)
+ {}
+
+ void put(Message message)
+ {
+ blockingQueue.push_and_notify_one(message);
+ }
+
+ Message get()
+ {
+ return blockingQueue.wait_and_pop();
+ }
+
+private:
+ std::queue<Message> taskQueue;
+ std::mutex mutex;
+ std::condition_variable conditionVariable;
+
+ blocking_queue<Message,
+ std::queue<Message>,
+ std::mutex,
+ std::condition_variable,
+ std::unique_lock<std::mutex> > blockingQueue;
+};
+
+#endif // MESSAGEQUEUEIMPL_H
M AsynchronousService/test/CMakeLists.txt => AsynchronousService/test/CMakeLists.txt +1 -0
@@ 17,6 17,7 @@ add_executable(asyncService-ut
target_link_libraries(asyncService-ut
${CMAKE_THREAD_LIBS_INIT}
${GTEST_GMOCK_MAIN_LIBRARY}
+ asyncExecutor
)
add_custom_target(asyncService-ut-run
A AsynchronousService/test/ut/IntegrationTests.cpp => AsynchronousService/test/ut/IntegrationTests.cpp +90 -0
@@ 0,0 1,90 @@
+#include <gtest/gtest.h>
+#include "MessageQueueImpl.h"
+#include "MockTaskWithDie.h"
+#include "NotifierImpl.h"
+#include "AsynchronousExecutor.h"
+#include "SynchronousCompletion.h"
+#include "DispatcherImpl.h"
+#include "MainMessageLoop.h"
+#include <unistd.h>
+
+using std::unique_ptr;
+using testing::InSequence;
+
+class TaskController
+{
+public:
+ TaskController(AsynchronousExecutor& asynchronousExecutor)
+ : asynchronousExecutor(asynchronousExecutor)
+ {}
+
+ void addTaskToExecute(Task* task)
+ {
+ taskToExecute = task;
+ }
+
+ void runParallelTasks()
+ {
+ asynchronousExecutor.execute(taskToExecute);
+ }
+
+private:
+ AsynchronousExecutor& asynchronousExecutor;
+ Task* taskToExecute;
+};
+
+ACTION_P(ShutDownMessageQueue, messageQueue)
+{
+ messageQueue->put( {MessageId::ShutDownMessage, nullptr} );
+}
+
+struct IntegrationTests : testing::Test
+{
+ IntegrationTests()
+ : notifier(messageQueue),
+ asynchronousExecutor(notifier),
+ mainMessageLoop(messageQueue, synchronousCompletion, dispatcher)
+ {}
+
+ void SetUp()
+ {
+ asynchronousExecutor.createAndStartTaskExecutors(1);
+ }
+
+ void TearDown()
+ {
+ asynchronousExecutor.joinAndReleaseTaskExecutors();
+ }
+
+ MessageQueueImpl messageQueue;
+ NotifierImpl notifier;
+ AsynchronousExecutor asynchronousExecutor;
+ SynchronousCompletion synchronousCompletion;
+ DispatcherImpl dispatcher;
+ MainMessageLoop mainMessageLoop;
+};
+
+TEST_F(IntegrationTests, OneTaskExecutionWithOneWorker)
+{
+ TaskController taskController(asynchronousExecutor);
+
+ dispatcher.bind(MessageId::RunParallelTasksMessage, [&] { taskController.runParallelTasks(); });
+
+ MockTaskWithDie* mockTask = new MockTaskWithDie();
+
+ {
+ InSequence seq;
+
+ EXPECT_CALL(*mockTask, onExecute());
+
+ EXPECT_CALL(*mockTask, onComplete());
+
+ EXPECT_CALL(*mockTask, Die()).WillOnce(ShutDownMessageQueue(&messageQueue));
+ }
+
+ taskController.addTaskToExecute(mockTask);
+
+ messageQueue.put( {MessageId::RunParallelTasksMessage, nullptr});
+
+ mainMessageLoop.dispatchMessagesOrCompleteTasksUntilShutDown();
+}