9 #ifndef CLUON_NOTIFYINGPIPELINE_HPP 10 #define CLUON_NOTIFYINGPIPELINE_HPP 15 #include <condition_variable> 33 : m_delegate(delegate) {
34 m_pipelineThread = std::thread(&NotifyingPipeline::processPipeline,
this);
37 using namespace std::literals::chrono_literals;
38 do { std::this_thread::sleep_for(1ms); }
while (!m_pipelineThreadRunning.load());
42 m_pipelineThreadRunning.store(
false);
45 m_pipelineCondition.notify_all();
49 if (m_pipelineThread.joinable()) {
50 m_pipelineThread.join();
56 inline void add(T &&entry) noexcept {
57 std::unique_lock<std::mutex> lck(m_pipelineMutex);
58 m_pipeline.emplace_back(entry);
61 inline void notifyAll() noexcept { m_pipelineCondition.notify_all(); }
63 inline bool isRunning() noexcept {
return m_pipelineThreadRunning.load(); }
66 inline void processPipeline() noexcept {
68 m_pipelineThreadRunning.store(
true);
70 while (m_pipelineThreadRunning.load()) {
71 std::unique_lock<std::mutex> lck(m_pipelineMutex);
73 m_pipelineCondition.wait(lck, [
this] {
return (!this->m_pipelineThreadRunning.load() || !this->m_pipeline.empty()); });
82 entries =
static_cast<uint32_t
>(m_pipeline.size());
85 for (uint32_t i{0}; i < entries; i++) {
89 entry = m_pipeline.front();
93 if (
nullptr != m_delegate) {
94 m_delegate(std::move(entry));
99 m_pipeline.pop_front();
107 std::function<void(T &&)> m_delegate;
109 std::atomic<bool> m_pipelineThreadRunning{
false};
110 std::thread m_pipelineThread{};
111 std::mutex m_pipelineMutex{};
112 std::condition_variable m_pipelineCondition{};
114 std::deque<T> m_pipeline{};
void notifyAll() noexcept
Definition: NotifyingPipeline.hpp:61
Definition: NotifyingPipeline.hpp:24
~NotifyingPipeline()
Definition: NotifyingPipeline.hpp:41
bool isRunning() noexcept
Definition: NotifyingPipeline.hpp:63
#define LIBCLUON_API
Definition: cluon.hpp:56
void add(T &&entry) noexcept
Definition: NotifyingPipeline.hpp:56
NotifyingPipeline(std::function< void(T &&)> delegate)
Definition: NotifyingPipeline.hpp:32