Eliya/src/Reader/ThreadSafeDeque.h

102 lines
2.4 KiB
C++

//
// Created by versustune on 29.02.20.
//
#ifndef ELIYA_THREADSAFEDEQUE_H
#define ELIYA_THREADSAFEDEQUE_H
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>
/* Bounded FIFO queue shared between threads.
 * All access to the underlying std::queue is serialized through one mutex;
 * two condition variables are used to park threads waiting for free space
 * (cv_full) or for data (cv_empty). */
template<typename T>
struct Tsqueue {
    /* Create Tsqueue object. Set maximum size of the queue to max_size.
     * The default is the largest value representable by size_t. */
    explicit Tsqueue(size_t max_size = static_cast<size_t>(-1)) : maxsize(max_size), end(false) {}

    /* Push T to the queue. Many threads can push at the same time.
     * If the queue is full, the calling thread is suspended until
     * some other thread removes an element. */
    void push(const T &);
    void push(T &&);

    /* Close the queue.
     * Be sure all writing threads have finished their writes before calling this.
     * Pushing data to a closed queue is forbidden. */
    void close();

    /* Non-blocking pop. Many threads can pop at the same time.
     * Returns the front element, or nullopt immediately if the queue is empty. */
    std::optional<T> pop();

    /* Blocking pop. Many threads can pop at the same time.
     * If the queue is empty, the calling thread is suspended.
     * If the queue is empty and closed, nullopt is returned. */
    std::optional<T> waitAndPop();

    /* Number of elements currently queued.
     * Takes the mutex so the read does not race with concurrent push/pop;
     * the value may already be stale by the time the caller uses it. */
    int getSize() {
        std::lock_guard<std::mutex> lck(mtx);
        return static_cast<int>(que.size());
    }

private:
    std::queue<T> que;                          // guarded by mtx
    std::mutex mtx;
    std::condition_variable cv_empty, cv_full;  // "data available" / "space available"
    const size_t maxsize;                       // push() blocks while que.size() == maxsize
    std::atomic<bool> end;                      // set by close()
};
/* Move an element into the queue.
 * Blocks while the queue holds exactly maxsize elements and has not been
 * closed; asserts that the queue is still open before inserting, then
 * signals one thread waiting for data. */
template<typename T>
void Tsqueue<T>::push(T &&t) {
    std::unique_lock<std::mutex> guard(mtx);
    // Proceed once there is room, or once close() has been called.
    cv_full.wait(guard, [this] { return que.size() != maxsize || end.load(); });
    assert(!end);
    que.push(std::move(t));
    cv_empty.notify_one();
}
/* Copy an element into the queue.
 * Blocks while the queue holds exactly maxsize elements and has not been
 * closed; asserts that the queue is still open before inserting, then
 * signals one thread waiting for data. */
template<typename T>
void Tsqueue<T>::push(const T &t) {
    std::unique_lock<std::mutex> lck(mtx);
    while (que.size() == maxsize && !end)
        cv_full.wait(lck);
    assert(!end);
    // t is a const reference, so it can only be copied; std::move here
    // would be a no-op that merely looks like a move.
    que.push(t);
    cv_empty.notify_one();
}
/* Non-blocking pop.
 * Returns an empty optional right away when nothing is queued; otherwise
 * removes the front element, signals one thread waiting for free space,
 * and returns the element. */
template<typename T>
std::optional<T> Tsqueue<T>::pop() {
    std::unique_lock<std::mutex> guard(mtx);
    if (que.empty()) {
        return std::nullopt;
    }
    T item = std::move(que.front());
    que.pop();
    cv_full.notify_one();
    return item;
}
/* Blocking pop.
 * Waits until the queue has data or has been closed. Returns an empty
 * optional if it is still empty after waking (i.e. closed and drained);
 * otherwise removes the front element, signals one thread waiting for
 * free space, and returns the element. */
template<typename T>
std::optional<T> Tsqueue<T>::waitAndPop() {
    std::unique_lock<std::mutex> guard(mtx);
    // Proceed once an element is available, or once close() has been called.
    cv_empty.wait(guard, [this] { return !que.empty() || end.load(); });
    if (que.empty()) {
        return std::nullopt;
    }
    T item = std::move(que.front());
    que.pop();
    cv_full.notify_one();
    return item;
}
/* Mark the queue as closed and wake every thread blocked on it.
 * The flag is set while holding the mutex so that no waiter can miss it
 * between checking its condition and going to sleep. */
template<typename T>
void Tsqueue<T>::close() {
    std::lock_guard<std::mutex> lck(mtx);
    end = true;
    // notify_all, not notify_one: a consumer that wakes to find the queue
    // empty and closed returns without signalling anyone else, so waking a
    // single waiter would leave the remaining ones blocked forever.
    cv_empty.notify_all();
    cv_full.notify_all();
}
#endif //ELIYA_THREADSAFEDEQUE_H