use scheduled thread pool instead of loopers for the EngineSVC

This commit is contained in:
Julian Krings 2025-06-02 17:25:49 +02:00
parent adb7188eb9
commit 2c60192de3
No known key found for this signature in database
GPG Key ID: 208C6E08C3B718D2
2 changed files with 131 additions and 123 deletions

View File

@ -155,6 +155,7 @@ public class IrisSettings {
@Data
public static class IrisSettingsPerformance {
private IrisSettingsEngineSVC engineSVC = new IrisSettingsEngineSVC();
public boolean trimMantleInStudio = false;
public int mantleKeepAlive = 30;
public int cacheSize = 4_096;
@ -242,4 +243,14 @@ public class IrisSettings {
public boolean disableTimeAndWeather = true;
public boolean autoStartDefaultStudio = false;
}
@Data
public static class IrisSettingsEngineSVC {
public boolean useVirtualThreads = true;
public int priority = Thread.NORM_PRIORITY;
public int getPriority() {
return Math.max(Math.min(priority, Thread.MAX_PRIORITY), Thread.MIN_PRIORITY);
}
}
}

View File

@ -4,67 +4,72 @@ import com.google.common.util.concurrent.AtomicDouble;
import com.volmit.iris.Iris;
import com.volmit.iris.core.IrisSettings;
import com.volmit.iris.core.loader.IrisData;
import com.volmit.iris.core.nms.container.Pair;
import com.volmit.iris.core.tools.IrisToolbelt;
import com.volmit.iris.engine.framework.Engine;
import com.volmit.iris.engine.platform.PlatformChunkGenerator;
import com.volmit.iris.util.collection.KMap;
import com.volmit.iris.util.format.C;
import com.volmit.iris.util.format.Form;
import com.volmit.iris.util.math.RNG;
import com.volmit.iris.util.plugin.IrisService;
import com.volmit.iris.util.plugin.VolmitSender;
import com.volmit.iris.util.scheduling.Looper;
import lombok.Synchronized;
import org.bukkit.Bukkit;
import org.bukkit.World;
import org.bukkit.event.EventHandler;
import org.bukkit.event.world.WorldLoadEvent;
import org.bukkit.event.world.WorldUnloadEvent;
import org.jetbrains.annotations.Nullable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
public class IrisEngineSVC implements IrisService {
private final AtomicInteger tectonicLimit = new AtomicInteger(30);
private final AtomicInteger tectonicPlates = new AtomicInteger();
private final AtomicInteger queuedTectonicPlates = new AtomicInteger();
private final AtomicInteger trimmerAlive = new AtomicInteger();
private final AtomicInteger unloaderAlive = new AtomicInteger();
private final AtomicInteger totalWorlds = new AtomicInteger();
private final AtomicDouble maxIdleDuration = new AtomicDouble();
private final AtomicDouble minIdleDuration = new AtomicDouble();
private final AtomicLong loadedChunks = new AtomicLong();
private final List<Pair<World, PlatformChunkGenerator>> worlds = new CopyOnWriteArrayList<>();
private Looper trimTicker;
private Looper unloadTicker;
private final KMap<World, Registered> worlds = new KMap<>();
private ScheduledExecutorService service;
private Looper updateTicker;
@Override
public void onEnable() {
tectonicLimit.set(IrisSettings.get().getPerformance().getTectonicPlateSize());
for (World world : Bukkit.getWorlds()) {
var access = IrisToolbelt.access(world);
if (access == null) return;
worlds.add(new Pair<>(world, access));
}
trimLogic();
unloadLogic();
var settings = IrisSettings.get().getPerformance();
var engine = settings.getEngineSVC();
service = Executors.newScheduledThreadPool(0,
(engine.isUseVirtualThreads()
? Thread.ofVirtual()
: Thread.ofPlatform().priority(engine.getPriority()))
.name("Iris EngineSVC-", 0)
.factory());
tectonicLimit.set(settings.getTectonicPlateSize());
Bukkit.getWorlds().forEach(this::add);
setup();
}
@Override
public void onDisable() {
service.shutdown();
updateTicker.interrupt();
trimTicker.interrupt();
unloadTicker.interrupt();
worlds.keySet().forEach(this::remove);
worlds.clear();
}
public void engineStatus(VolmitSender sender) {
sender.sendMessage(C.DARK_PURPLE + "-------------------------");
sender.sendMessage(C.DARK_PURPLE + "Status:");
sender.sendMessage(C.DARK_PURPLE + "- Trim: " + C.LIGHT_PURPLE + trimTicker.isAlive());
sender.sendMessage(C.DARK_PURPLE + "- Unload: " + C.LIGHT_PURPLE + unloadTicker.isAlive());
sender.sendMessage(C.DARK_PURPLE + "- Update: " + C.LIGHT_PURPLE + updateTicker.isAlive());
sender.sendMessage(C.DARK_PURPLE + "- Service: " + C.LIGHT_PURPLE + (service.isShutdown() ? "Shutdown" : "Running"));
sender.sendMessage(C.DARK_PURPLE + "- Updater: " + C.LIGHT_PURPLE + (updateTicker.isAlive() ? "Running" : "Stopped"));
sender.sendMessage(C.DARK_PURPLE + "- Trimmers: " + C.LIGHT_PURPLE + trimmerAlive.get());
sender.sendMessage(C.DARK_PURPLE + "- Unloaders: " + C.LIGHT_PURPLE + unloaderAlive.get());
sender.sendMessage(C.DARK_PURPLE + "Tectonic Plates:");
sender.sendMessage(C.DARK_PURPLE + "- Limit: " + C.LIGHT_PURPLE + tectonicLimit.get());
sender.sendMessage(C.DARK_PURPLE + "- Total: " + C.LIGHT_PURPLE + tectonicPlates.get());
@ -72,7 +77,7 @@ public class IrisEngineSVC implements IrisService {
sender.sendMessage(C.DARK_PURPLE + "- Max Idle Duration: " + C.LIGHT_PURPLE + Form.duration(maxIdleDuration.get(), 2));
sender.sendMessage(C.DARK_PURPLE + "- Min Idle Duration: " + C.LIGHT_PURPLE + Form.duration(minIdleDuration.get(), 2));
sender.sendMessage(C.DARK_PURPLE + "Other:");
sender.sendMessage(C.DARK_PURPLE + "- Iris Worlds: " + C.LIGHT_PURPLE + worlds.size());
sender.sendMessage(C.DARK_PURPLE + "- Iris Worlds: " + C.LIGHT_PURPLE + totalWorlds.get());
sender.sendMessage(C.DARK_PURPLE + "- Loaded Chunks: " + C.LIGHT_PURPLE + loadedChunks.get());
sender.sendMessage(C.DARK_PURPLE + "- Cache Size: " + C.LIGHT_PURPLE + Form.f(IrisData.cacheSize()));
sender.sendMessage(C.DARK_PURPLE + "-------------------------");
@ -80,15 +85,24 @@ public class IrisEngineSVC implements IrisService {
@EventHandler
public void onWorldUnload(WorldUnloadEvent event) {
worlds.removeIf(p -> p.getA() == event.getWorld());
remove(event.getWorld());
}
@EventHandler
public void onWorldLoad(WorldLoadEvent event) {
var world = event.getWorld();
add(event.getWorld());
}
private void remove(World world) {
var entry = worlds.remove(world);
if (entry == null) return;
entry.close();
}
private void add(World world) {
var access = IrisToolbelt.access(world);
if (access == null) return;
worlds.add(new Pair<>(world, access));
worlds.put(world, new Registered(world.getName(), access));
}
private synchronized void setup() {
@ -102,16 +116,26 @@ public class IrisEngineSVC implements IrisService {
queuedTectonicPlates.set(0);
tectonicPlates.set(0);
loadedChunks.set(0);
unloaderAlive.set(0);
trimmerAlive.set(0);
totalWorlds.set(0);
double maxDuration = Long.MIN_VALUE;
double minDuration = Long.MAX_VALUE;
for (var pair : worlds) {
var engine = pair.getB().getEngine();
for (var entry : worlds.entrySet()) {
var registered = entry.getValue();
if (registered.closed) continue;
totalWorlds.incrementAndGet();
unloaderAlive.addAndGet(registered.unloaderAlive() ? 1 : 0);
trimmerAlive.addAndGet(registered.trimmerAlive() ? 1 : 0);
var engine = registered.getEngine();
if (engine == null) continue;
queuedTectonicPlates.addAndGet((int) engine.getMantle().getUnloadRegionCount());
tectonicPlates.addAndGet(engine.getMantle().getLoadedRegionCount());
loadedChunks.addAndGet(pair.getA().getLoadedChunks().length);
loadedChunks.addAndGet(entry.getKey().getLoadedChunks().length);
double duration = engine.getMantle().getAdjustedIdleDuration();
if (duration > maxDuration) maxDuration = duration;
@ -120,25 +144,7 @@ public class IrisEngineSVC implements IrisService {
maxIdleDuration.set(maxDuration);
minIdleDuration.set(minDuration);
if (!trimTicker.isAlive()) {
Iris.error("TrimTicker found dead! Booting it up!");
try {
trimLogic();
} catch (Exception e) {
Iris.error("What happened?");
e.printStackTrace();
}
}
if (!unloadTicker.isAlive()) {
Iris.error("UnloadTicker found dead! Booting it up!");
try {
unloadLogic();
} catch (Exception e) {
Iris.error("What happened?");
e.printStackTrace();
}
}
worlds.values().forEach(Registered::update);
} catch (Throwable e) {
e.printStackTrace();
}
@ -148,98 +154,89 @@ public class IrisEngineSVC implements IrisService {
updateTicker.start();
}
private synchronized void trimLogic() {
if (trimTicker != null && trimTicker.isAlive())
return;
private final class Registered {
private final String name;
private final PlatformChunkGenerator access;
private transient ScheduledFuture<?> trimmer;
private transient ScheduledFuture<?> unloader;
private transient boolean closed;
trimTicker = new Looper() {
private final Supplier<Engine> supplier = createSupplier();
private Registered(String name, PlatformChunkGenerator access) {
this.name = name;
this.access = access;
update();
}
@Override
protected long loop() {
long start = System.currentTimeMillis();
try {
Engine engine = supplier.get();
if (engine != null) {
private boolean unloaderAlive() {
return unloader != null && !unloader.isDone() && !unloader.isCancelled();
}
private boolean trimmerAlive() {
return trimmer != null && !trimmer.isDone() && !trimmer.isCancelled();
}
@Synchronized
private void update() {
if (closed || service == null || service.isShutdown())
return;
if (trimmer == null || trimmer.isDone() || trimmer.isCancelled()) {
trimmer = service.scheduleAtFixedRate(() -> {
Engine engine = getEngine();
if (engine == null || !engine.getMantle().getMantle().shouldReduce(engine))
return;
try {
engine.getMantle().trim(tectonicLimit.get() / worlds.size());
} catch (Throwable e) {
Iris.reportError(e);
Iris.error("EngineSVC: Failed to trim for " + name);
e.printStackTrace();
}
} catch (Throwable e) {
Iris.reportError(e);
Iris.error("EngineSVC: Failed to trim.");
e.printStackTrace();
return -1;
}
int size = worlds.size();
long time = (size > 0 ? 1000 / size : 1000) - (System.currentTimeMillis() - start);
if (time <= 0)
return 0;
return time;
}, RNG.r.nextInt(1000), 1000, TimeUnit.MILLISECONDS);
}
};
trimTicker.start();
}
private synchronized void unloadLogic() {
if (unloadTicker != null && unloadTicker.isAlive())
return;
if (unloader == null || unloader.isDone() || unloader.isCancelled()) {
unloader = service.scheduleAtFixedRate(() -> {
Engine engine = getEngine();
if (engine == null || !engine.getMantle().getMantle().shouldReduce(engine))
return;
unloadTicker = new Looper() {
private final Supplier<Engine> supplier = createSupplier();
@Override
protected long loop() {
long start = System.currentTimeMillis();
try {
Engine engine = supplier.get();
if (engine != null) {
try {
long unloadStart = System.currentTimeMillis();
int count = engine.getMantle().unloadTectonicPlate(tectonicLimit.get() / worlds.size());
if (count > 0) {
Iris.debug(C.GOLD + "Unloaded " + C.YELLOW + count + " TectonicPlates in " + C.RED + Form.duration(System.currentTimeMillis() - unloadStart, 2));
}
} catch (Throwable e) {
Iris.reportError(e);
Iris.error("EngineSVC: Failed to unload for " + name);
e.printStackTrace();
}
} catch (Throwable e) {
Iris.reportError(e);
Iris.error("EngineSVC: Failed to unload.");
e.printStackTrace();
return -1;
}
int size = worlds.size();
long time = (size > 0 ? 1000 / size : 1000) - (System.currentTimeMillis() - start);
if (time <= 0)
return 0;
return time;
}, RNG.r.nextInt(1000), 1000, TimeUnit.MILLISECONDS);
}
};
unloadTicker.start();
}
}
private Supplier<Engine> createSupplier() {
AtomicInteger i = new AtomicInteger();
return () -> {
if (i.get() >= worlds.size()) {
i.set(0);
}
try {
for (int j = 0; j < worlds.size(); j++) {
var pair = worlds.get(i.getAndIncrement());
if (i.get() >= worlds.size()) {
i.set(0);
}
@Synchronized
private void close() {
if (closed) return;
closed = true;
var engine = pair.getB().getEngine();
if (engine != null && !engine.isClosed() && engine.getMantle().getMantle().shouldReduce(engine)) {
return engine;
}
}
} catch (Throwable e) {
Iris.error("EngineSVC: Failed to create supplier.");
e.printStackTrace();
Iris.reportError(e);
if (trimmer != null) {
trimmer.cancel(false);
trimmer = null;
}
return null;
};
if (unloader != null) {
unloader.cancel(false);
unloader = null;
}
}
@Nullable
private Engine getEngine() {
if (closed) return null;
return access.getEngine();
}
}
}