CreateEventTimer: Implement CallStrategy

There are two CallStrategies:

- BestEffort (default): Will try to get your event to trigger at the specified
  interval, but will refuse to queue handlers if a handler takes too
  long.
- Precise: Will enqueue event handlers at the exact interval specified.
  Can lead to the queue filling up if the handler takes longer than the
  interval.
This commit is contained in:
Lion Kortlepel
2022-03-31 20:13:59 +02:00
parent 81dbf747d5
commit d4b30a2583
3 changed files with 90 additions and 23 deletions
+17 -3
View File
@@ -5,8 +5,8 @@
#include <any> #include <any>
#include <condition_variable> #include <condition_variable>
#include <filesystem> #include <filesystem>
#include <list>
#include <initializer_list> #include <initializer_list>
#include <list>
#include <lua.hpp> #include <lua.hpp>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@@ -73,6 +73,18 @@ private:
class TLuaEngine : IThreaded { class TLuaEngine : IThreaded {
public: public:
enum CallStrategy : int {
BestEffort,
Precise,
};
struct QueuedFunction {
std::string FunctionName;
std::shared_ptr<TLuaResult> Result;
std::vector<TLuaArgTypes> Args;
std::string EventName; // optional, may be empty
};
TLuaEngine(); TLuaEngine();
~TLuaEngine() noexcept { ~TLuaEngine() noexcept {
beammp_debug("Lua Engine terminated"); beammp_debug("Lua Engine terminated");
@@ -146,7 +158,7 @@ public:
return Results; // return Results; //
} }
std::set<std::string> GetEventHandlersForState(const std::string& EventName, TLuaStateId StateId); std::set<std::string> GetEventHandlersForState(const std::string& EventName, TLuaStateId StateId);
void CreateEventTimer(const std::string& EventName, TLuaStateId StateId, size_t IntervalMS); void CreateEventTimer(const std::string& EventName, TLuaStateId StateId, size_t IntervalMS, CallStrategy Strategy);
void CancelEventTimers(const std::string& EventName, TLuaStateId StateId); void CancelEventTimers(const std::string& EventName, TLuaStateId StateId);
sol::state_view GetStateForPlugin(const fs::path& PluginPath); sol::state_view GetStateForPlugin(const fs::path& PluginPath);
TLuaStateId GetStateIDForPlugin(const fs::path& PluginPath); TLuaStateId GetStateIDForPlugin(const fs::path& PluginPath);
@@ -167,6 +179,7 @@ private:
~StateThreadData() noexcept { beammp_debug("\"" + mStateId + "\" destroyed"); } ~StateThreadData() noexcept { beammp_debug("\"" + mStateId + "\" destroyed"); }
[[nodiscard]] std::shared_ptr<TLuaResult> EnqueueScript(const TLuaChunk& Script); [[nodiscard]] std::shared_ptr<TLuaResult> EnqueueScript(const TLuaChunk& Script);
[[nodiscard]] std::shared_ptr<TLuaResult> EnqueueFunctionCall(const std::string& FunctionName, const std::vector<TLuaArgTypes>& Args); [[nodiscard]] std::shared_ptr<TLuaResult> EnqueueFunctionCall(const std::string& FunctionName, const std::vector<TLuaArgTypes>& Args);
[[nodiscard]] std::shared_ptr<TLuaResult> EnqueueFunctionCallFromCustomEvent(const std::string& FunctionName, const std::vector<TLuaArgTypes>& Args, const std::string& EventName, CallStrategy Strategy);
void RegisterEvent(const std::string& EventName, const std::string& FunctionName); void RegisterEvent(const std::string& EventName, const std::string& FunctionName);
void AddPath(const fs::path& Path); // to be added to path and cpath void AddPath(const fs::path& Path); // to be added to path and cpath
void operator()() override; void operator()() override;
@@ -189,7 +202,7 @@ private:
std::thread mThread; std::thread mThread;
std::queue<std::pair<TLuaChunk, std::shared_ptr<TLuaResult>>> mStateExecuteQueue; std::queue<std::pair<TLuaChunk, std::shared_ptr<TLuaResult>>> mStateExecuteQueue;
std::recursive_mutex mStateExecuteQueueMutex; std::recursive_mutex mStateExecuteQueueMutex;
std::queue<std::tuple<std::string, std::shared_ptr<TLuaResult>, std::vector<TLuaArgTypes>>> mStateFunctionQueue; std::vector<QueuedFunction> mStateFunctionQueue;
std::mutex mStateFunctionQueueMutex; std::mutex mStateFunctionQueueMutex;
std::condition_variable mStateFunctionQueueCond; std::condition_variable mStateFunctionQueueCond;
TLuaEngine* mEngine; TLuaEngine* mEngine;
@@ -203,6 +216,7 @@ private:
std::chrono::high_resolution_clock::time_point LastCompletion {}; std::chrono::high_resolution_clock::time_point LastCompletion {};
std::string EventName; std::string EventName;
TLuaStateId StateId; TLuaStateId StateId;
CallStrategy Strategy;
bool Expired(); bool Expired();
void Reset(); void Reset();
}; };
+72 -20
View File
@@ -81,22 +81,31 @@ void TLuaEngine::operator()() {
std::unique_lock Lock(mTimedEventsMutex); std::unique_lock Lock(mTimedEventsMutex);
for (auto& Timer : mTimedEvents) { for (auto& Timer : mTimedEvents) {
if (Timer.Expired()) { if (Timer.Expired()) {
auto LastCompletionBeforeReset = Timer.LastCompletion;
Timer.Reset(); Timer.Reset();
auto Handlers = GetEventHandlersForState(Timer.EventName, Timer.StateId); auto Handlers = GetEventHandlersForState(Timer.EventName, Timer.StateId);
std::unique_lock StateLock(mLuaStatesMutex); std::unique_lock StateLock(mLuaStatesMutex);
std::unique_lock Lock2(mResultsToCheckMutex); std::unique_lock Lock2(mResultsToCheckMutex);
for (auto& Handler : Handlers) { for (auto& Handler : Handlers) {
auto Res = mLuaStates[Timer.StateId]->EnqueueFunctionCall(Handler, {}); auto Res = mLuaStates[Timer.StateId]->EnqueueFunctionCallFromCustomEvent(Handler, {}, Timer.EventName, Timer.Strategy);
mResultsToCheck.push_back(Res); if (Res) {
mResultsToCheckCond.notify_one(); mResultsToCheck.push_back(Res);
mResultsToCheckCond.notify_one();
} else {
// "revert" reset
Timer.LastCompletion = LastCompletionBeforeReset;
// beammp_trace("Reverted reset of \"" + Timer.EventName + "\" timer");
// no need to try to enqueue more handlers for this event (they will all fail)
break;
}
} }
} }
} }
} }
std::chrono::high_resolution_clock::duration Diff; const auto Expected = std::chrono::milliseconds(10);
if ((Diff = std::chrono::high_resolution_clock::now() - Before) if (auto Diff = std::chrono::high_resolution_clock::now() - Before;
< std::chrono::milliseconds(10)) { Diff < Expected) {
std::this_thread::sleep_for(Diff); std::this_thread::sleep_for(Expected - Diff);
} else { } else {
beammp_trace("Event loop cannot keep up! Running " + std::to_string(Diff.count()) + "s behind"); beammp_trace("Event loop cannot keep up! Running " + std::to_string(Diff.count()) + "s behind");
} }
@@ -155,9 +164,8 @@ void TLuaEngine::WaitForAll(std::vector<std::shared_ptr<TLuaResult>>& Results, c
if (Max.has_value() && std::chrono::milliseconds(ms) > Max.value()) { if (Max.has_value() && std::chrono::milliseconds(ms) > Max.value()) {
beammp_trace("'" + Result->Function + "' in '" + Result->StateId + "' did not finish executing in time (took: " + std::to_string(ms) + "ms)."); beammp_trace("'" + Result->Function + "' in '" + Result->StateId + "' did not finish executing in time (took: " + std::to_string(ms) + "ms).");
Cancelled = true; Cancelled = true;
} } else if (ms > 1000 * 60) {
else if (ms > 1000 * 60) { beammp_lua_warn("'" + Result->Function + "' in '" + Result->StateId + "' is taking very long. The event it's handling is too important to discard the result of this handler, but may block this event and possibly the whole lua state.");
beammp_lua_warn("'" + Result->Function +"' in '" + Result->StateId + "' is taking very long. The event it's handling is too important to discard the result of this handler, but may block this event and possibly the whole lua state.");
} }
} }
if (Cancelled) { if (Cancelled) {
@@ -504,11 +512,27 @@ TLuaEngine::StateThreadData::StateThreadData(const std::string& Name, std::atomi
return Lua_GetPlayerIdentifiers(ID); return Lua_GetPlayerIdentifiers(ID);
}); });
MPTable.set_function("Sleep", &LuaAPI::MP::Sleep); MPTable.set_function("Sleep", &LuaAPI::MP::Sleep);
MPTable.set_function("CreateEventTimer", [&](const std::string& EventName, size_t IntervalMS) { // const std::string& EventName, size_t IntervalMS, int strategy
MPTable.set_function("CreateEventTimer", [&](sol::variadic_args Args) {
if (Args.size() < 2 || Args.size() > 3) {
beammp_lua_error("CreateEventTimer expects 2 or 3 arguments.");
}
if (Args.get_type(0) != sol::type::string) {
beammp_lua_error("CreateEventTimer expects 1st argument to be a string");
}
if (Args.get_type(1) != sol::type::number) {
beammp_lua_error("CreateEventTimer expects 2nd argument to be a number");
}
if (Args.size() == 3 && Args.get_type(2) != sol::type::number) {
beammp_lua_error("CreateEventTimer expects 3rd argument to be a number (MP.CallStrategy)");
}
auto EventName = Args.get<std::string>(0);
auto IntervalMS = Args.get<size_t>(1);
CallStrategy Strategy = Args.size() > 2 ? Args.get<CallStrategy>(2) : CallStrategy::BestEffort;
if (IntervalMS < 25) { if (IntervalMS < 25) {
beammp_warn("Timer for \"" + EventName + "\" on \"" + mStateId + "\" is set to trigger at <25ms, which is likely too fast and won't cancel properly."); beammp_warn("Timer for \"" + EventName + "\" on \"" + mStateId + "\" is set to trigger at <25ms, which is likely too fast and won't cancel properly.");
} }
mEngine->CreateEventTimer(EventName, mStateId, IntervalMS); mEngine->CreateEventTimer(EventName, mStateId, IntervalMS, Strategy);
}); });
MPTable.set_function("CancelEventTimer", [&](const std::string& EventName) { MPTable.set_function("CancelEventTimer", [&](const std::string& EventName) {
mEngine->CancelEventTimers(EventName, mStateId); mEngine->CancelEventTimers(EventName, mStateId);
@@ -528,6 +552,10 @@ TLuaEngine::StateThreadData::StateThreadData(const std::string& Name, std::atomi
"Name", 5, "Name", 5,
"Description", 6); "Description", 6);
MPTable.create_named("CallStrategy",
"BestEffort", CallStrategy::BestEffort,
"Precise", CallStrategy::Precise);
auto FSTable = StateView.create_named_table("FS"); auto FSTable = StateView.create_named_table("FS");
FSTable.set_function("CreateDirectory", &LuaAPI::FS::CreateDirectory); FSTable.set_function("CreateDirectory", &LuaAPI::FS::CreateDirectory);
FSTable.set_function("Exists", &LuaAPI::FS::Exists); FSTable.set_function("Exists", &LuaAPI::FS::Exists);
@@ -550,12 +578,34 @@ std::shared_ptr<TLuaResult> TLuaEngine::StateThreadData::EnqueueScript(const TLu
return Result; return Result;
} }
std::shared_ptr<TLuaResult> TLuaEngine::StateThreadData::EnqueueFunctionCallFromCustomEvent(const std::string& FunctionName, const std::vector<TLuaArgTypes>& Args, const std::string& EventName, CallStrategy Strategy) {
// TODO: Document all this
decltype(mStateFunctionQueue)::iterator Iter = mStateFunctionQueue.end();
if (Strategy == CallStrategy::BestEffort) {
Iter = std::find_if(mStateFunctionQueue.begin(), mStateFunctionQueue.end(),
[&EventName](const QueuedFunction& Element) {
return Element.EventName == EventName;
});
}
if (Iter == mStateFunctionQueue.end()) {
auto Result = std::make_shared<TLuaResult>();
Result->StateId = mStateId;
Result->Function = FunctionName;
std::unique_lock Lock(mStateFunctionQueueMutex);
mStateFunctionQueue.push_back({ FunctionName, Result, Args, EventName });
mStateFunctionQueueCond.notify_all();
return Result;
} else {
return nullptr;
}
}
std::shared_ptr<TLuaResult> TLuaEngine::StateThreadData::EnqueueFunctionCall(const std::string& FunctionName, const std::vector<TLuaArgTypes>& Args) { std::shared_ptr<TLuaResult> TLuaEngine::StateThreadData::EnqueueFunctionCall(const std::string& FunctionName, const std::vector<TLuaArgTypes>& Args) {
auto Result = std::make_shared<TLuaResult>(); auto Result = std::make_shared<TLuaResult>();
Result->StateId = mStateId; Result->StateId = mStateId;
Result->Function = FunctionName; Result->Function = FunctionName;
std::unique_lock Lock(mStateFunctionQueueMutex); std::unique_lock Lock(mStateFunctionQueueMutex);
mStateFunctionQueue.push({ FunctionName, Result, Args }); mStateFunctionQueue.push_back({ FunctionName, Result, Args, "" });
mStateFunctionQueueCond.notify_all(); mStateFunctionQueueCond.notify_all();
return Result; return Result;
} }
@@ -618,12 +668,13 @@ void TLuaEngine::StateThreadData::operator()() {
std::chrono::milliseconds(500), std::chrono::milliseconds(500),
[&]() -> bool { return !mStateFunctionQueue.empty(); }); [&]() -> bool { return !mStateFunctionQueue.empty(); });
if (NotExpired) { if (NotExpired) {
auto FnNameResultPair = std::move(mStateFunctionQueue.front()); auto TheQueuedFunction = std::move(mStateFunctionQueue.front());
mStateFunctionQueue.pop(); mStateFunctionQueue.erase(mStateFunctionQueue.begin());
Lock.unlock(); Lock.unlock();
auto& FnName = std::get<0>(FnNameResultPair); auto& FnName = TheQueuedFunction.FunctionName;
auto& Result = std::get<1>(FnNameResultPair); auto& Result = TheQueuedFunction.Result;
auto Args = std::get<2>(FnNameResultPair); auto Args = TheQueuedFunction.Args;
// TODO: Use TheQueuedFunction.EventName for errors, warnings, etc
Result->StateId = mStateId; Result->StateId = mStateId;
sol::state_view StateView(mState); sol::state_view StateView(mState);
auto Fn = StateView[FnName]; auto Fn = StateView[FnName];
@@ -671,13 +722,14 @@ void TLuaEngine::StateThreadData::operator()() {
} }
} }
void TLuaEngine::CreateEventTimer(const std::string& EventName, TLuaStateId StateId, size_t IntervalMS) { void TLuaEngine::CreateEventTimer(const std::string& EventName, TLuaStateId StateId, size_t IntervalMS, CallStrategy Strategy) {
std::unique_lock Lock(mTimedEventsMutex); std::unique_lock Lock(mTimedEventsMutex);
TimedEvent Event { TimedEvent Event {
std::chrono::high_resolution_clock::duration { std::chrono::milliseconds(IntervalMS) }, std::chrono::high_resolution_clock::duration { std::chrono::milliseconds(IntervalMS) },
std::chrono::high_resolution_clock::now(), std::chrono::high_resolution_clock::now(),
EventName, EventName,
StateId StateId,
Strategy
}; };
mTimedEvents.push_back(std::move(Event)); mTimedEvents.push_back(std::move(Event));
beammp_trace("created event timer for \"" + EventName + "\" on \"" + StateId + "\" with " + std::to_string(IntervalMS) + "ms interval"); beammp_trace("created event timer for \"" + EventName + "\" on \"" + StateId + "\" with " + std::to_string(IntervalMS) + "ms interval");
+1
View File
@@ -514,6 +514,7 @@ void TNetwork::Looper(const std::weak_ptr<TClient>& c) {
} }
} }
} }
void TNetwork::TCPClient(const std::weak_ptr<TClient>& c) { void TNetwork::TCPClient(const std::weak_ptr<TClient>& c) {
// TODO: the c.expired() might cause issues here, remove if you end up here with your debugger // TODO: the c.expired() might cause issues here, remove if you end up here with your debugger
if (c.expired() || c.lock()->GetTCPSock() == -1) { if (c.expired() || c.lock()->GetTCPSock() == -1) {