VWeb/Includes/Queue.h

87 lines
2.1 KiB
C++

#pragma once
#include <mutex>
#include <queue>
#include <optional>
#include <condition_variable>
#include <atomic>
#include <cassert>
namespace VWeb {
template <typename T> struct SafeQueue {
explicit SafeQueue(size_t maxSize = -1UL)
: m_MaxSize(maxSize),
m_End(false){};
void Push(const T &t) {
std::unique_lock<std::mutex> lck(m_Mutex);
while (m_Queue.size() == m_MaxSize && !m_End)
m_CVFull.wait(lck);
assert(!m_End);
m_Queue.push(std::move(t));
m_CVEmpty.notify_one();
};
void Push(T &&t) {
std::unique_lock<std::mutex> lck(m_Mutex);
while (m_Queue.size() == m_MaxSize && !m_End)
m_CVFull.wait(lck);
assert(!m_End);
m_Queue.push(std::move(t));
m_CVEmpty.notify_one();
};
void Open() {
m_End = false;
std::lock_guard<std::mutex> lck(m_Mutex);
m_CVEmpty.notify_all();
m_CVFull.notify_all();
};
void Close() {
m_End = true;
std::lock_guard<std::mutex> lck(m_Mutex);
m_CVEmpty.notify_all();
m_CVFull.notify_all();
};
void Clear() {
std::unique_lock<std::mutex> lck(m_Mutex);
std::queue<T> empty;
std::swap(m_Queue, empty);
m_CVEmpty.notify_all();
m_CVFull.notify_all();
};
std::optional<T> Pop() {
std::unique_lock<std::mutex> lck(m_Mutex);
if (m_Queue.empty() || m_End)
return {};
T t = std::move(m_Queue.front());
m_Queue.pop();
m_CVFull.notify_one();
return t;
};
std::optional<T> WaitAndPop() {
std::unique_lock<std::mutex> lck(m_Mutex);
while (m_Queue.empty() && !m_End)
m_CVEmpty.wait(lck);
if (m_Queue.empty() || m_End)
return {};
T t = std::move(m_Queue.front());
m_Queue.pop();
m_CVFull.notify_one();
return t;
};
bool IsClosed() { return m_End; }
int Size() { return m_Queue.size(); }
std::queue<T> &GetQueue() { return m_Queue; }
void Flush() {
while (!m_Queue.empty())
m_CVEmpty.notify_one();
m_End = true;
m_CVEmpty.notify_all();
}
private:
std::queue<T> m_Queue;
std::mutex m_Mutex;
std::condition_variable m_CVEmpty, m_CVFull;
const size_t m_MaxSize;
std::atomic<bool> m_End;
};
}