Custom thread safe wrapper for std::queue

This commit is contained in:
Anonymous275 2022-07-25 10:02:57 +03:00
parent 16c8f0a052
commit 2ccd17fb33
2 changed files with 63 additions and 4 deletions

56
include/atomic_queue.h Normal file
View File

@ -0,0 +1,56 @@
//
// Created by Anonymous275 on 24/07/22.
//
#pragma once
#include <semaphore>
#include <queue>
template <class T, size_t Size>
class atomic_queue {
public:
bool try_pop(T& val) {
lock_guard guard(semaphore);
if(queue.empty())return false;
val = queue.front();
queue.pop();
full.release();
return true;
}
void push(const T& val) {
check_full();
lock_guard guard(semaphore);
queue.push(val);
}
size_t size() {
lock_guard guard(semaphore);
return queue.size();
}
bool empty() {
lock_guard guard(semaphore);
return queue.empty();
}
private:
void check_full() {
if(size() >= Size) {
full.acquire();
}
}
private:
struct lock_guard {
explicit lock_guard(std::binary_semaphore& lock) : lock(lock){
lock.acquire();
}
~lock_guard() {
lock.release();
}
private:
std::binary_semaphore& lock;
};
std::binary_semaphore semaphore{1}, full{0};
std::queue<T> queue{};
};

View File

@ -4,11 +4,13 @@
///
#include "atomic_queue/atomic_queue.h"
#include "atomic_queue.h"
#include "Memory/BeamNG.h"
#include "Memory/Memory.h"
atomic_queue::AtomicQueue2<std::string, 1000> AtomicQueue;
//atomic_queue::AtomicQueue2<std::string, 1000> AtomicQueue;
std::unique_ptr<atomic_queue<std::string, 1000>> Queue;
int BeamNG::lua_open_jit_D(lua_State* State) {
Memory::Print("Got lua State");
@ -18,6 +20,7 @@ int BeamNG::lua_open_jit_D(lua_State* State) {
}
void BeamNG::EntryPoint() {
Queue = std::make_unique<atomic_queue<std::string, 1000>>();
auto status = MH_Initialize();
if(status != MH_OK)Memory::Print(std::string("MH Error -> ") + MH_StatusToString(status));
Memory::Print("PID : " + std::to_string(Memory::GetPID()));
@ -55,7 +58,7 @@ int Game(lua_State* L) {
int LuaPop(lua_State* L) {
std::string MSG;
if (AtomicQueue.try_pop(MSG)) {
if (Queue->try_pop(MSG)) {
GELua::lua_push_fstring(L, "%s", MSG.c_str());
return 1;
}
@ -82,7 +85,7 @@ void BeamNG::IPCListener() {
IPCFromLauncher->receive();
if (!IPCFromLauncher->receive_timed_out()) {
TimeOuts = 0;
AtomicQueue.push(IPCFromLauncher->msg());
Queue->push(IPCFromLauncher->msg());
IPCFromLauncher->confirm_receive();
} else TimeOuts++;
}