libcluon  0.0.148
NotifyingPipeline.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2017-2018 Christian Berger
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 #ifndef CLUON_NOTIFYINGPIPELINE_HPP
10 #define CLUON_NOTIFYINGPIPELINE_HPP
11 
12 #include "cluon/cluon.hpp"
13 
14 #include <atomic>
15 #include <condition_variable>
16 #include <deque>
17 #include <functional>
18 #include <mutex>
19 #include <thread>
20 
21 namespace cluon {
22 
23 template <class T>
25  private:
26  NotifyingPipeline(const NotifyingPipeline &) = delete;
28  NotifyingPipeline &operator=(const NotifyingPipeline &) = delete;
29  NotifyingPipeline &operator=(NotifyingPipeline &&) = delete;
30 
31  public:
32  NotifyingPipeline(std::function<void(T &&)> delegate)
33  : m_delegate(delegate) {
34  m_pipelineThread = std::thread(&NotifyingPipeline::processPipeline, this);
35 
36  // Let the operating system spawn the thread.
37  using namespace std::literals::chrono_literals; // NOLINT
38  do { std::this_thread::sleep_for(1ms); } while (!m_pipelineThreadRunning.load());
39  }
40 
42  m_pipelineThreadRunning.store(false);
43 
44  // Wake any waiting threads.
45  m_pipelineCondition.notify_all();
46 
47  // Joining the thread could fail.
48  try {
49  if (m_pipelineThread.joinable()) {
50  m_pipelineThread.join();
51  }
52  } catch (...) {} // LCOV_EXCL_LINE
53  }
54 
55  public:
56  inline void add(T &&entry) noexcept {
57  std::unique_lock<std::mutex> lck(m_pipelineMutex);
58  m_pipeline.emplace_back(entry);
59  }
60 
61  inline void notifyAll() noexcept { m_pipelineCondition.notify_all(); }
62 
63  inline bool isRunning() noexcept { return m_pipelineThreadRunning.load(); }
64 
65  private:
66  inline void processPipeline() noexcept {
67  // Indicate to caller that we are ready.
68  m_pipelineThreadRunning.store(true);
69 
70  while (m_pipelineThreadRunning.load()) {
71  std::unique_lock<std::mutex> lck(m_pipelineMutex);
72  // Wait until the thread should stop or data is available.
73  m_pipelineCondition.wait(lck, [this] { return (!this->m_pipelineThreadRunning.load() || !this->m_pipeline.empty()); });
74 
75  // The condition will automatically lock the mutex after waking up.
76  // As we are locking per entry, we need to unlock the mutex first.
77  lck.unlock();
78 
79  uint32_t entries{0};
80  {
81  lck.lock();
82  entries = static_cast<uint32_t>(m_pipeline.size());
83  lck.unlock();
84  }
85  for (uint32_t i{0}; i < entries; i++) {
86  T entry;
87  {
88  lck.lock();
89  entry = m_pipeline.front();
90  lck.unlock();
91  }
92 
93  if (nullptr != m_delegate) {
94  m_delegate(std::move(entry));
95  }
96 
97  {
98  lck.lock();
99  m_pipeline.pop_front();
100  lck.unlock();
101  }
102  }
103  }
104  }
105 
106  private:
107  std::function<void(T &&)> m_delegate;
108 
109  std::atomic<bool> m_pipelineThreadRunning{false};
110  std::thread m_pipelineThread{};
111  std::mutex m_pipelineMutex{};
112  std::condition_variable m_pipelineCondition{};
113 
114  std::deque<T> m_pipeline{};
115 };
116 } // namespace cluon
117 
118 #endif
void notifyAll() noexcept
Definition: NotifyingPipeline.hpp:61
Definition: NotifyingPipeline.hpp:24
~NotifyingPipeline()
Definition: NotifyingPipeline.hpp:41
Definition: cluon.hpp:65
bool isRunning() noexcept
Definition: NotifyingPipeline.hpp:63
#define LIBCLUON_API
Definition: cluon.hpp:56
void add(T &&entry) noexcept
Definition: NotifyingPipeline.hpp:56
NotifyingPipeline(std::function< void(T &&)> delegate)
Definition: NotifyingPipeline.hpp:32