IQRF Gateway Daemon
TaskQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2016-2017 MICRORISC s.r.o.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <functional>
20 #include <thread>
21 #include <mutex>
22 #include <atomic>
23 #include <condition_variable>
24 #include <queue>
25 
31 template <class T>
32 class TaskQueue
33 {
34 public:
36  typedef std::function<void(T)> ProcessTaskFunc;
37 
43  TaskQueue(ProcessTaskFunc processTaskFunc)
44  :m_processTaskFunc(processTaskFunc)
45  {
46  m_taskPushed = false;
47  m_runWorkerThread = true;
48  m_workerThread = std::thread(&TaskQueue::worker, this);
49  }
50 
54  virtual ~TaskQueue()
55  {
56  {
57  std::unique_lock<std::mutex> lck(m_taskQueueMutex);
58  m_runWorkerThread = false;
59  m_taskPushed = true;
60  }
61  m_conditionVariable.notify_all();
62 
63  if (m_workerThread.joinable())
64  m_workerThread.join();
65  }
66 
73  int pushToQueue(const T& task)
74  {
75  int retval = 0;
76  {
77  std::unique_lock<std::mutex> lck(m_taskQueueMutex);
78  m_taskQueue.push(task);
79  retval = m_taskQueue.size();
80  m_taskPushed = true;
81  }
82  m_conditionVariable.notify_all();
83  return retval;
84  }
85 
89  void stopQueue()
90  {
91  {
92  std::unique_lock<std::mutex> lck(m_taskQueueMutex);
93  m_runWorkerThread = false;
94  m_taskPushed = true;
95  }
96  m_conditionVariable.notify_all();
97  }
98 
101  size_t size()
102  {
103  size_t retval = 0;
104  {
105  std::unique_lock<std::mutex> lck(m_taskQueueMutex);
106  retval = m_taskQueue.size();
107  }
108  return retval;
109  }
110 
111 private:
113  void worker()
114  {
115  std::unique_lock<std::mutex> lck(m_taskQueueMutex, std::defer_lock);
116 
117  while (m_runWorkerThread) {
118 
119  //wait for something in the queue
120  lck.lock();
121  m_conditionVariable.wait(lck, [&] { return m_taskPushed; }); //lock is released in wait
122  //lock is reacquired here
123  m_taskPushed = false;
124 
125  while (m_runWorkerThread) {
126  if (!m_taskQueue.empty()) {
127  auto task = m_taskQueue.front();
128  m_taskQueue.pop();
129  lck.unlock();
130  m_processTaskFunc(task);
131  }
132  else {
133  lck.unlock();
134  break;
135  }
136  lck.lock(); //lock for next iteration
137  }
138  }
139  }
140 
141  std::mutex m_taskQueueMutex;
142  std::condition_variable m_conditionVariable;
143  std::queue<T> m_taskQueue;
144  bool m_taskPushed;
145  bool m_runWorkerThread;
146  std::thread m_workerThread;
147 
148  ProcessTaskFunc m_processTaskFunc;
149 };
int pushToQueue(const T &task)
Push task to queue.
Definition: TaskQueue.h:73
TaskQueue(ProcessTaskFunc processTaskFunc)
constructor
Definition: TaskQueue.h:43
virtual ~TaskQueue()
destructor
Definition: TaskQueue.h:54
size_t size()
Get actual queue size.
Definition: TaskQueue.h:101
void stopQueue()
Stop queue.
Definition: TaskQueue.h:89
std::function< void(T)> ProcessTaskFunc
Processing function type.
Definition: TaskQueue.h:36
Maintain queue of tasks and invoke sequential processing.
Definition: TaskQueue.h:32