diff --git a/build.gradle b/build.gradle index c49d5bb4d..2f0920924 100644 --- a/build.gradle +++ b/build.gradle @@ -65,6 +65,7 @@ def JVM_VERSION = Map.of( "v1_21_R1", 21, "v1_20_R4", 21, ) +def entryPoint = 'com.volmit.iris.server.EntryPoint' NMS_BINDINGS.each { nms -> project(":nms:${nms.key}") { apply plugin: 'java' @@ -95,6 +96,10 @@ shadowJar { relocate 'io.papermc.lib', 'com.volmit.iris.util.paper' relocate 'net.kyori', 'com.volmit.iris.util.kyori' archiveFileName.set("Iris-${project.version}.jar") + + manifest { + attributes 'Main-Class': entryPoint + } } dependencies { @@ -156,6 +161,7 @@ allprojects { compileOnly 'net.bytebuddy:byte-buddy-agent:1.12.8' compileOnly 'org.bytedeco:javacpp:1.5.10' compileOnly 'org.bytedeco:cuda-platform:12.3-8.9-1.5.10' + compileOnly 'io.netty:netty-all:4.1.112.Final' } /** diff --git a/core/src/main/java/com/volmit/iris/Iris.java b/core/src/main/java/com/volmit/iris/Iris.java index ce32385aa..a4215738a 100644 --- a/core/src/main/java/com/volmit/iris/Iris.java +++ b/core/src/main/java/com/volmit/iris/Iris.java @@ -41,6 +41,8 @@ import com.volmit.iris.engine.object.IrisDimension; import com.volmit.iris.engine.object.IrisWorld; import com.volmit.iris.engine.platform.BukkitChunkGenerator; import com.volmit.iris.engine.platform.DummyChunkGenerator; +import com.volmit.iris.server.master.IrisMasterServer; +import com.volmit.iris.server.node.IrisServer; import com.volmit.iris.util.collection.KList; import com.volmit.iris.util.collection.KMap; import com.volmit.iris.util.exceptions.IrisException; @@ -91,10 +93,7 @@ import org.jetbrains.annotations.Nullable; import java.io.*; import java.lang.annotation.Annotation; import java.net.URL; -import java.util.Collection; -import java.util.Date; -import java.util.Map; -import java.util.Properties; +import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -112,6 +111,7 @@ public class Iris extends VolmitPlugin implements Listener { public static MythicMobsLink linkMythicMobs; public static IrisCompat compat; public static FileWatcher configWatcher; + private static IrisServer server; private static VolmitSender sender; static { @@ -475,6 +475,29 @@ public class Iris extends VolmitPlugin implements Listener { services.values().forEach(this::registerListener); ServerConfigurator.setupDataPack(); installMainDimension(); + try { + info("Starting server..."); + try { + int port = Integer.parseInt(System.getProperty("com.volmit.iris.server.port")); + String[] remote = Optional.ofNullable(System.getProperty("com.volmit.iris.server.remote")) + .map(String::trim) + .map(s -> s.isBlank() ? null : s.split(",")) + .orElse(new String[0]); + server = remote.length > 0 ? new IrisMasterServer(port, remote) : new IrisServer(port); + } catch (NullPointerException | NumberFormatException ignored) { + var serverSettings = IrisSettings.get().getServer(); + if (serverSettings.isActive()) { + server = serverSettings.isRemote() ? + new IrisMasterServer(serverSettings.getPort(), serverSettings.remote) : + new IrisServer(serverSettings.getPort()); + } + } + } catch (InterruptedException ignored) { + } catch (Throwable e) { + error("Failed to start server: " + e.getClass().getSimpleName()); + e.printStackTrace(); + } + if (!IrisSafeguard.instance.acceptUnstable && IrisSafeguard.instance.unstablemode) { Iris.info(C.RED + "World loading has been disabled until the incompatibility is resolved."); Iris.info(C.DARK_RED + "Alternatively, go to plugins/iris/settings.json and set ignoreBootMode to true."); diff --git a/core/src/main/java/com/volmit/iris/core/IrisSettings.java b/core/src/main/java/com/volmit/iris/core/IrisSettings.java index c17c8f0a2..f3cd720dd 100644 --- a/core/src/main/java/com/volmit/iris/core/IrisSettings.java +++ b/core/src/main/java/com/volmit/iris/core/IrisSettings.java @@ -44,6 +44,7 @@ public class IrisSettings { private IrisSettingsPerformance performance = new IrisSettingsPerformance(); private IrisWorldDump worldDump = new IrisWorldDump(); private IrisWorldSettings irisWorldSettings = new IrisWorldSettings(); + private IrisServerSettings server = new IrisServerSettings(); public static int getThreadCount(int c) { return switch (c) { @@ -202,6 +203,17 @@ public class IrisSettings { public int mcaCacheSize = 3; } + @Data + public static class IrisServerSettings { + public boolean active = false; + public int port = 1337; + public String[] remote = new String[0]; + + public boolean isRemote() { + return remote.length != 0; + } + } + // todo: Goal:Have these as the default world settings and when put in bukkit.yml it will again overwrite that world from these. @Data public static class IrisWorldSettings { diff --git a/core/src/main/java/com/volmit/iris/core/commands/CommandDeveloper.java b/core/src/main/java/com/volmit/iris/core/commands/CommandDeveloper.java index 20296b8b3..7207af562 100644 --- a/core/src/main/java/com/volmit/iris/core/commands/CommandDeveloper.java +++ b/core/src/main/java/com/volmit/iris/core/commands/CommandDeveloper.java @@ -102,8 +102,10 @@ public class CommandDeveloper implements DecreeExecutor { @Decree(description = "Test") public void packBenchmark( - @Param(description = "The pack to bench", aliases = {"pack"}) + @Param(description = "The pack to bench", defaultValue = "overworld", aliases = {"pack"}) IrisDimension dimension, + @Param(description = "The address to use", defaultValue = "-") + String address, @Param(description = "Headless", defaultValue = "true") boolean headless, @Param(description = "GUI", defaultValue = "false") @@ -113,7 +115,7 @@ public class CommandDeveloper implements DecreeExecutor { ) { int rb = diameter << 9; Iris.info("Benchmarking pack " + dimension.getName() + " with diameter: " + rb + "(" + diameter + ")"); - IrisPackBenchmarking benchmark = new IrisPackBenchmarking(dimension, diameter, headless, gui); + IrisPackBenchmarking benchmark = new IrisPackBenchmarking(dimension, address.replace("-", "").trim(), diameter, headless, gui); benchmark.runBenchmark(); } diff --git a/core/src/main/java/com/volmit/iris/core/nms/IHeadless.java b/core/src/main/java/com/volmit/iris/core/nms/IHeadless.java index 90d56c0de..1c27b5a36 100644 --- a/core/src/main/java/com/volmit/iris/core/nms/IHeadless.java +++ b/core/src/main/java/com/volmit/iris/core/nms/IHeadless.java @@ -19,22 +19,30 @@ package com.volmit.iris.core.nms; import com.volmit.iris.core.pregenerator.PregenListener; +import com.volmit.iris.server.node.IrisSession; +import com.volmit.iris.server.packet.work.ChunkPacket; import com.volmit.iris.util.documentation.ChunkCoordinates; import com.volmit.iris.util.documentation.RegionCoordinates; import com.volmit.iris.util.parallel.MultiBurst; import java.io.Closeable; +import java.util.concurrent.CompletableFuture; public interface IHeadless extends Closeable { + void setSession(IrisSession session); + int getLoadedChunks(); @ChunkCoordinates boolean exists(int x, int z); @RegionCoordinates - void generateRegion(MultiBurst burst, int x, int z, PregenListener listener); + CompletableFuture generateRegion(MultiBurst burst, int x, int z, int maxConcurrent, PregenListener listener); @ChunkCoordinates void generateChunk(int x, int z); + + @ChunkCoordinates + void addChunk(ChunkPacket packet); } diff --git a/core/src/main/java/com/volmit/iris/core/pregenerator/PregenTask.java b/core/src/main/java/com/volmit/iris/core/pregenerator/PregenTask.java index 08307b28c..eea5ed08b 100644 --- a/core/src/main/java/com/volmit/iris/core/pregenerator/PregenTask.java +++ b/core/src/main/java/com/volmit/iris/core/pregenerator/PregenTask.java @@ -23,6 +23,8 @@ import com.volmit.iris.util.collection.KMap; import com.volmit.iris.util.math.Position2; import com.volmit.iris.util.math.Spiraled; import com.volmit.iris.util.math.Spiraler; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -30,6 +32,7 @@ import java.util.Comparator; @Builder @Data +@AllArgsConstructor(access = AccessLevel.PROTECTED) public class PregenTask { private static final Position2 ZERO = new Position2(0, 0); private static final KList ORDER_CENTER = computeChunkOrder(); diff --git a/core/src/main/java/com/volmit/iris/core/pregenerator/methods/HeadlessPregenMethod.java b/core/src/main/java/com/volmit/iris/core/pregenerator/methods/HeadlessPregenMethod.java index d4ef57f3f..7e42de852 100644 --- a/core/src/main/java/com/volmit/iris/core/pregenerator/methods/HeadlessPregenMethod.java +++ b/core/src/main/java/com/volmit/iris/core/pregenerator/methods/HeadlessPregenMethod.java @@ -65,6 +65,7 @@ public class HeadlessPregenMethod implements PregeneratorMethod { Iris.error("Failed to close headless"); e.printStackTrace(); } + burst.close(); } @Override diff --git a/core/src/main/java/com/volmit/iris/core/tools/IrisPackBenchmarking.java b/core/src/main/java/com/volmit/iris/core/tools/IrisPackBenchmarking.java index c8495a338..3cc9a2944 100644 --- a/core/src/main/java/com/volmit/iris/core/tools/IrisPackBenchmarking.java +++ b/core/src/main/java/com/volmit/iris/core/tools/IrisPackBenchmarking.java @@ -31,6 +31,8 @@ import com.volmit.iris.engine.framework.Engine; import com.volmit.iris.engine.framework.EngineTarget; import com.volmit.iris.engine.object.IrisDimension; import com.volmit.iris.engine.object.IrisWorld; +import com.volmit.iris.server.pregen.CloudMethod; +import com.volmit.iris.server.pregen.CloudTask; import com.volmit.iris.util.collection.KList; import com.volmit.iris.util.collection.KMap; import com.volmit.iris.util.exceptions.IrisException; @@ -65,10 +67,12 @@ public class IrisPackBenchmarking { private int radius; private boolean finished = false; private Engine engine; + private String address; - public IrisPackBenchmarking(IrisDimension dimension, int r, boolean headless, boolean gui) { + public IrisPackBenchmarking(IrisDimension dimension, String address, int r, boolean headless, boolean gui) { instance = this; this.IrisDimension = dimension; + this.address = address; this.radius = r; this.headless = headless; this.gui = gui; @@ -90,7 +94,13 @@ public class IrisPackBenchmarking { } Iris.info("Starting Benchmark!"); stopwatch.begin(); - startBenchmark(); + try { + if (address != null && !address.isBlank()) + startCloudBenchmark(); + else startBenchmark(); + } catch (Throwable e) { + e.printStackTrace(); + } }, "PackBenchmarking").start(); } @@ -197,6 +207,19 @@ public class IrisPackBenchmarking { IrisSettings.getThreadCount(IrisSettings.get().getConcurrency().getParallelism())), engine); } + private void startCloudBenchmark() throws InterruptedException { + int x = 0; + int z = 0; + IrisToolbelt.pregenerate(CloudTask + .couldBuilder() + .gui(gui) + .center(new Position2(x, z)) + .width(radius) + .height(radius) + .distance(engine.getMantle().getRadius() * 2) + .build(), new CloudMethod(address, engine), engine); + } + private double calculateAverage(KList list) { double sum = 0; for (int num : list) { diff --git a/core/src/main/java/com/volmit/iris/server/EntryPoint.java b/core/src/main/java/com/volmit/iris/server/EntryPoint.java new file mode 100644 index 000000000..13ab1c90f --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/EntryPoint.java @@ -0,0 +1,116 @@ +/* + * Iris is a World Generator for Minecraft Bukkit Servers + * Copyright (c) 2024 Arcane Arts (Volmit Software) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.volmit.iris.server; + +import lombok.extern.java.Log; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; + +@Log(topic = "Iris-Server") +public class EntryPoint { + + public static void main(String[] args) throws Throwable { + if (args.length < 4) { + log.info("Usage: java -jar Iris.jar [nodes]"); + System.exit(-1); + return; + } + + String[] nodes = new String[args.length - 4]; + System.arraycopy(args, 4, nodes, 0, nodes.length); + try { + runServer(args[0], Integer.parseInt(args[1]), Integer.parseInt(args[2]), Integer.parseInt(args[3]), nodes); + } catch (Throwable e) { + log.log(Level.SEVERE, "Failed to start server", e); + System.exit(-1); + } + } + + private static void runServer(String version, int minMemory, int maxMemory, int serverPort, String[] nodes) throws IOException { + File serverJar = new File("cache", "spigot-"+version+".jar"); + if (!serverJar.getParentFile().exists() && !serverJar.getParentFile().mkdirs()) { + log.severe("Failed to create cache directory"); + System.exit(-1); + return; + } + + + if (!serverJar.exists()) { + try (var in = new URL("https://download.getbukkit.org/spigot/spigot-"+ version+".jar").openStream()) { + Files.copy(in, serverJar.toPath()); + } + log.info("Downloaded spigot-"+version+".jar to "+serverJar.getAbsolutePath()); + } + + File pluginFile = new File("plugins/Iris.jar"); + if (pluginFile.exists()) pluginFile.delete(); + if (!pluginFile.getParentFile().exists() && !pluginFile.getParentFile().mkdirs()) { + log.severe("Failed to create plugins directory"); + System.exit(-1); + return; + } + + boolean windows = System.getProperty("os.name").toLowerCase().contains("win"); + String path = System.getProperty("java.home") + File.separator + "bin" + File.separator + (windows ? "java.exe" : "java"); + + try { + File irisFile = new File(EntryPoint.class.getProtectionDomain().getCodeSource().getLocation().toURI()); + if (!irisFile.isFile()) { + log.severe("Failed to locate the Iris plugin jar"); + System.exit(-1); + return; + } + Files.createSymbolicLink(pluginFile.toPath(), irisFile.toPath()); + } catch (URISyntaxException ignored) {} + + List cmd = new ArrayList<>(List.of( + path, + "-Xms" + minMemory + "M", + "-Xmx" + maxMemory + "M", + "-XX:+AlwaysPreTouch", + "-XX:+HeapDumpOnOutOfMemoryError", + "-Ddisable.watchdog=true", + "-Dcom.mojang.eula.agree=true", + "-Dcom.volmit.iris.server.port="+serverPort + )); + if (nodes.length > 0) + cmd.add("-Dcom.volmit.iris.server.remote=" + String.join(",", nodes)); + cmd.addAll(List.of("-jar", serverJar.getAbsolutePath(), "nogui")); + + var process = new ProcessBuilder(cmd) + .inheritIO() + .start(); + Runtime.getRuntime().addShutdownHook(new Thread(process::destroy)); + + + while (true) { + try { + process.waitFor(); + break; + } catch (InterruptedException ignored) {} + } + } +} diff --git a/core/src/main/java/com/volmit/iris/server/IrisConnection.java b/core/src/main/java/com/volmit/iris/server/IrisConnection.java new file mode 100644 index 000000000..68189936b --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/IrisConnection.java @@ -0,0 +1,201 @@ +package com.volmit.iris.server; + +import com.volmit.iris.server.execption.RejectedException; +import com.volmit.iris.server.packet.Packet; +import com.volmit.iris.server.packet.handle.Decoder; +import com.volmit.iris.server.packet.handle.Encoder; +import com.volmit.iris.server.packet.handle.Prepender; +import com.volmit.iris.server.packet.handle.Splitter; +import com.volmit.iris.server.util.ErrorPacket; +import com.volmit.iris.server.util.ConnectionHolder; +import com.volmit.iris.server.util.PacketListener; +import com.volmit.iris.server.util.PacketSendListener; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.TimeoutException; +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; + +import javax.annotation.Nullable; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; +import java.util.logging.Level; + +@RequiredArgsConstructor +@Log(topic = "IrisConnection") +public class IrisConnection extends SimpleChannelInboundHandler { + private static EventLoopGroup WORKER; + + private Channel channel; + private SocketAddress address; + private final PacketListener listener; + private final Queue queue = new ConcurrentLinkedQueue<>(); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception { + if (!channel.isOpen() || listener == null || !listener.isAccepting()) return; + + try { + listener.onPacket(packet); + } catch (RejectedException e) { + send(new ErrorPacket("Rejected: " + e.getMessage())); + } + } + + public void send(Packet packet) { + this.send(packet, null); + } + + public void send(Packet packet, @Nullable PacketSendListener listener) { + if (!isConnected()) { + queue.add(new PacketHolder(packet, listener)); + return; + } + + flushQueue(); + sendPacket(packet, listener); + } + + public boolean isConnected() { + return channel != null && channel.isOpen(); + } + + public void disconnect() { + try { + if (channel != null && channel.isOpen()) { + log.info("Closed on " + address); + channel.close(); + } + if (listener != null) + listener.onDisconnect(); + } catch (Throwable e) { + log.log(Level.SEVERE, "Failed to close on " + address, e); + } + } + + public void execute(Runnable runnable) { + if (channel == null || !channel.isOpen()) return; + channel.eventLoop().execute(runnable); + } + + private void flushQueue() { + if (this.channel != null && this.channel.isOpen()) { + synchronized(this.queue) { + PacketHolder packetHolder; + while((packetHolder = this.queue.poll()) != null) { + sendPacket(packetHolder.packet, packetHolder.listener); + } + } + } + } + + private void sendPacket(Packet packet, @Nullable PacketSendListener listener) { + if (!channel.eventLoop().inEventLoop()) { + channel.eventLoop().execute(() -> sendPacket(packet, listener)); + return; + } + + ChannelFuture channelFuture = channel.writeAndFlush(packet); + + if (listener != null) { + channelFuture.addListener(future -> { + if (future.isSuccess()) { + listener.onSuccess(); + } else { + Packet fallback = listener.onFailure(); + if (fallback == null) return; + channel.writeAndFlush(fallback) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } + }); + } + channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + + channel = ctx.channel(); + address = channel.remoteAddress(); + log.info("Opened on " + channel.remoteAddress()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + disconnect(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!channel.isOpen()) return; + ErrorPacket error; + if (cause instanceof TimeoutException) { + error = new ErrorPacket("Timed out"); + } else { + error = new ErrorPacket("Internal Exception: " + cause.getMessage()); + log.log(Level.SEVERE, "Failed to send packet", cause); + } + + sendPacket(error, PacketSendListener.thenRun(this::disconnect)); + channel.config().setAutoRead(false); + } + + @Override + public String toString() { + return "IrisConnection{address=%s}".formatted(address); + } + + public static void configureSerialization(Channel channel, ConnectionHolder holder) { + channel.pipeline() + .addLast("timeout", new ReadTimeoutHandler(30)) + .addLast("splitter", new Splitter()) + .addLast("decoder", new Decoder()) + .addLast("prepender", new Prepender()) + .addLast("encoder", new Encoder()) + .addLast("packet_handler", holder.getConnection()); + } + + public static T connect(InetSocketAddress address, Supplier factory) throws InterruptedException { + var holder = factory.get(); + new Bootstrap() + .group(getWorker()) + .handler(new ChannelInitializer<>() { + @Override + protected void initChannel(Channel channel) { + channel.config().setOption(ChannelOption.TCP_NODELAY, true); + IrisConnection.configureSerialization(channel, holder); + } + }) + .channel(NioSocketChannel.class) + .connect(address) + .sync(); + return holder; + } + + private static class PacketHolder { + private final Packet packet; + @Nullable + private final PacketSendListener listener; + + public PacketHolder(Packet packet, @Nullable PacketSendListener listener) { + this.packet = packet; + this.listener = listener; + } + } + + private static EventLoopGroup getWorker() { + if (WORKER == null) { + WORKER = new NioEventLoopGroup(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> WORKER.shutdownGracefully())); + } + return WORKER; + } +} diff --git a/core/src/main/java/com/volmit/iris/server/execption/RejectedException.java b/core/src/main/java/com/volmit/iris/server/execption/RejectedException.java new file mode 100644 index 000000000..e4f1579bf --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/execption/RejectedException.java @@ -0,0 +1,8 @@ +package com.volmit.iris.server.execption; + +public class RejectedException extends Exception { + + public RejectedException(String message) { + super(message); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/master/IrisMasterClient.java b/core/src/main/java/com/volmit/iris/server/master/IrisMasterClient.java new file mode 100644 index 000000000..da65a1aec --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/master/IrisMasterClient.java @@ -0,0 +1,35 @@ +package com.volmit.iris.server.master; + +import com.volmit.iris.server.IrisConnection; +import com.volmit.iris.server.packet.Packet; +import com.volmit.iris.server.util.ConnectionHolder; +import com.volmit.iris.server.util.PacketListener; +import com.volmit.iris.server.util.PacketSendListener; +import lombok.Getter; +import org.jetbrains.annotations.Nullable; + +public class IrisMasterClient implements ConnectionHolder, PacketListener { + private final @Getter IrisConnection connection = new IrisConnection(this); + private final IrisMasterSession session; + + IrisMasterClient(IrisMasterSession session) { + this.session = session; + } + + protected void send(Packet packet) { + connection.send(packet); + } + + protected void send(Packet packet, @Nullable PacketSendListener listener) { + connection.send(packet, listener); + } + + @Override + public void onPacket(Packet packet) throws Exception { + session.onClientPacket(packet); + } + + public void disconnect() { + connection.disconnect(); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/master/IrisMasterServer.java b/core/src/main/java/com/volmit/iris/server/master/IrisMasterServer.java new file mode 100644 index 000000000..bd1b6b130 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/master/IrisMasterServer.java @@ -0,0 +1,107 @@ +package com.volmit.iris.server.master; + +import com.volmit.iris.server.IrisConnection; +import com.volmit.iris.server.node.IrisServer; +import com.volmit.iris.server.util.PregenHolder; +import com.volmit.iris.util.collection.KMap; +import lombok.extern.java.Log; + +import java.net.InetSocketAddress; +import java.util.Set; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; + +@Log(topic = "Iris-MasterServer") +public class IrisMasterServer extends IrisServer { + private static final Object VALUE = new Object(); + private static IrisMasterServer instance; + private final KMap>> sessions = new KMap<>(); + private final KMap nodes = new KMap<>(); + + public IrisMasterServer(int port, String[] remote) throws InterruptedException { + super("Iris-MasterServer", port, IrisMasterSession::new); + if (instance != null && !instance.isRunning()) + close("Server already running"); + instance = this; + + for (var address : remote) { + try { + var split = address.split(":"); + if (split.length != 2 || !split[1].matches("\\d+")) { + log.warning("Invalid remote server address: " + address); + continue; + } + + addNode(new InetSocketAddress(split[0], Integer.parseInt(split[1]))); + } catch (Throwable e) { + log.log(Level.WARNING, "Failed to parse address: " + address, e); + } + } + } + + public void addNode(InetSocketAddress address) { + nodes.put(address, VALUE); + } + + public void removeNode(InetSocketAddress address) { + nodes.remove(address); + } + + public static void close(UUID session) { + var map = get().sessions.remove(session); + if (map == null) return; + map.keySet().forEach(IrisMasterClient::disconnect); + map.clear(); + } + + public static KMap> getNodes(IrisMasterSession session) { + var master = get(); + var uuid = session.getUuid(); + close(uuid); + + master.getLogger().info("Requesting nodes for session " + uuid); + var map = new KMap>(); + for (var address : master.nodes.keySet()) { + try { + map.put(IrisConnection.connect(address, () -> new IrisMasterClient(session)), new KMap<>()); + } catch (Exception e) { + master.getLogger().log(Level.WARNING, "Failed to connect to server " + address, e); + master.removeNode(address); + } + } + + master.sessions.put(uuid, map); + return map; + } + + @Override + public void close() throws Exception { + log.info("Closing!"); + super.close(); + sessions.values() + .stream() + .map(KMap::keySet) + .flatMap(Set::stream) + .forEach(IrisMasterClient::disconnect); + sessions.clear(); + } + + @Override + protected Logger getLogger() { + return log; + } + + private void close(String message) throws IllegalStateException { + try { + close(); + } catch (Exception ignored) {} + throw new IllegalStateException(message); + } + + public static IrisMasterServer get() { + if (instance == null) + throw new IllegalStateException("IrisMasterServer not running"); + return instance; + } +} diff --git a/core/src/main/java/com/volmit/iris/server/master/IrisMasterSession.java b/core/src/main/java/com/volmit/iris/server/master/IrisMasterSession.java new file mode 100644 index 000000000..feb81106d --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/master/IrisMasterSession.java @@ -0,0 +1,98 @@ +package com.volmit.iris.server.master; + +import com.volmit.iris.server.IrisConnection; +import com.volmit.iris.server.execption.RejectedException; +import com.volmit.iris.server.packet.Packet; +import com.volmit.iris.server.packet.Packets; +import com.volmit.iris.server.packet.init.EnginePacket; +import com.volmit.iris.server.packet.init.FilePacket; +import com.volmit.iris.server.packet.init.InfoPacket; +import com.volmit.iris.server.packet.work.ChunkPacket; +import com.volmit.iris.server.packet.work.MantleChunkPacket; +import com.volmit.iris.server.packet.work.PregenPacket; +import com.volmit.iris.server.util.*; +import com.volmit.iris.util.collection.KMap; +import lombok.Getter; +import lombok.extern.java.Log; + +import java.util.Comparator; +import java.util.UUID; +import java.util.logging.Level; + +//TODO handle done packet +@Log(topic = "IrisMasterSession") +public class IrisMasterSession implements ConnectionHolder, PacketListener { + private final @Getter IrisConnection connection = new IrisConnection(this); + private final @Getter UUID uuid = UUID.randomUUID(); + private final KMap map = new KMap<>(); + private final CPSLooper cpsLooper = new CPSLooper("IrisMasterSession-" + uuid, connection); + private KMap> clients; + private int radius = -1; + + @Override + public void onPacket(Packet raw) throws Exception { + if (clients == null) { + clients = IrisMasterServer.getNodes(this); + cpsLooper.setNodeCount(clients.size()); + + var info = Packets.INFO.newPacket(); + info.setNodeCount(clients.size()); + connection.send(info); + } + + if (raw instanceof FilePacket packet) { + if (radius != -1) + throw new RejectedException("Engine already setup"); + clients.keySet().forEach(client -> client.send(packet)); + } else if (raw instanceof EnginePacket packet) { + if (radius != -1) + throw new RejectedException("Engine already setup"); + radius = packet.getRadius(); + clients.keySet().forEach(client -> client.send(packet)); + } else if (raw instanceof PregenPacket packet) { + if (radius == -1) + throw new RejectedException("Engine not setup"); + var client = pick(); + map.put(packet.getId(), client); + new PregenHolder(packet, radius, true, null) + .put(clients.get(client)); + + client.send(packet); + } else if (raw instanceof MantleChunkPacket packet) { + var client = map.get(packet.getPregenId()); + client.send(packet); + } + } + + protected void onClientPacket(Packet raw) { + if (raw instanceof ErrorPacket packet) { + packet.log(log, Level.SEVERE); + } else if (raw instanceof ChunkPacket) { + connection.send(raw); + } else if (raw instanceof MantleChunkPacket packet) { + var map = clients.get(this.map.get(packet.getPregenId())); + if (map.get(packet.getPregenId()) + .remove(packet.getX(), packet.getZ())) + map.get(packet.getPregenId()); + + connection.send(packet); + } else if (raw instanceof MantleChunkPacket.Request packet) { + connection.send(packet); + } else if (raw instanceof InfoPacket packet) { + int i = packet.getGenerated(); + if (i != -1) cpsLooper.addChunks(i); + } + } + + @Override + public void onDisconnect() { + IrisMasterServer.close(uuid); + } + + private IrisMasterClient pick() throws RejectedException { + return clients.keySet() + .stream() + .min(Comparator.comparingInt(c -> clients.get(c).size())) + .orElseThrow(() -> new RejectedException("No clients available")); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/node/IrisServer.java b/core/src/main/java/com/volmit/iris/server/node/IrisServer.java new file mode 100644 index 000000000..fd4f647ec --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/node/IrisServer.java @@ -0,0 +1,68 @@ +package com.volmit.iris.server.node; + +import com.volmit.iris.server.IrisConnection; +import com.volmit.iris.server.util.ConnectionHolder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; + +import java.util.function.Supplier; +import java.util.logging.Logger; + +@Log(topic = "Iris-Server") +public class IrisServer implements AutoCloseable { + private final NioEventLoopGroup bossGroup, workerGroup; + private final Channel channel; + private @Getter boolean running = true; + + public IrisServer(int port) throws InterruptedException { + this("Iris-Server", port, IrisSession::new); + } + + protected IrisServer(String name, int port, Supplier factory) throws InterruptedException { + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(name, LogLevel.DEBUG)) + .childHandler(new Initializer(factory)); + + channel = bootstrap.bind(port).sync().channel(); + + getLogger().info("Started on port " + port); + } + + @Override + public void close() throws Exception { + if (!running) return; + running = false; + channel.close().sync(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + + getLogger().info("Stopped"); + } + + protected Logger getLogger() { + return log; + } + + @RequiredArgsConstructor + private static class Initializer extends ChannelInitializer { + private final Supplier factory; + + @Override + protected void initChannel(Channel ch) { + IrisConnection.configureSerialization(ch, factory.get()); + } + } +} diff --git a/core/src/main/java/com/volmit/iris/server/node/IrisSession.java b/core/src/main/java/com/volmit/iris/server/node/IrisSession.java new file mode 100644 index 000000000..960326945 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/node/IrisSession.java @@ -0,0 +1,110 @@ +package com.volmit.iris.server.node; + +import com.volmit.iris.core.nms.IHeadless; +import com.volmit.iris.core.nms.INMS; +import com.volmit.iris.engine.data.cache.Cache; +import com.volmit.iris.engine.framework.Engine; +import com.volmit.iris.server.IrisConnection; +import com.volmit.iris.server.execption.RejectedException; +import com.volmit.iris.server.packet.Packet; +import com.volmit.iris.server.packet.Packets; +import com.volmit.iris.server.packet.init.EnginePacket; +import com.volmit.iris.server.packet.init.FilePacket; +import com.volmit.iris.server.util.CPSLooper; +import com.volmit.iris.server.util.ConnectionHolder; +import com.volmit.iris.server.util.PacketListener; +import com.volmit.iris.server.packet.work.ChunkPacket; +import com.volmit.iris.server.packet.work.MantleChunkPacket; +import com.volmit.iris.server.packet.work.PregenPacket; +import com.volmit.iris.server.util.PregenHolder; +import com.volmit.iris.util.collection.KMap; +import com.volmit.iris.util.io.IO; +import com.volmit.iris.util.parallel.MultiBurst; +import lombok.Getter; + +import java.io.File; +import java.util.UUID; + +public class IrisSession implements ConnectionHolder, PacketListener { + private static final MultiBurst burst = new MultiBurst("IrisHeadless", 8); + private final @Getter IrisConnection connection = new IrisConnection(this); + private final File base = new File("cache/" + UUID.randomUUID()); + private final KMap chunks = new KMap<>(); + private final KMap pregens = new KMap<>(); + private final CPSLooper cpsLooper = new CPSLooper("IrisSession-"+base.getName(), connection); + + private Engine engine; + private IHeadless headless; + + @Override + public void onPacket(Packet raw) throws Exception { + cpsLooper.setNodeCount(1); + if (raw instanceof FilePacket packet) { + if (engine != null) throw new RejectedException("Engine already setup"); + + packet.write(base); + Packets.DONE.newPacket() + .setId(packet.getId()) + .send(connection); + } else if (raw instanceof EnginePacket packet) { + if (engine != null) throw new RejectedException("Engine already setup"); + engine = packet.getEngine(base); + headless = INMS.get().createHeadless(engine); + headless.setSession(this); + + Packets.DONE.newPacket() + .setId(packet.getId()) + .send(connection); + } else if (raw instanceof MantleChunkPacket packet) { + if (engine == null) throw new RejectedException("Engine not setup"); + packet.set(engine.getMantle().getMantle()); + + var holder = chunks.get(packet.getPregenId()); + if (holder.remove(packet.getX(), packet.getZ())) { + headless.generateRegion(burst, holder.getX(), holder.getZ(), 20, null) + .thenRun(() -> { + holder.iterate(chunkPos -> { + var resp = Packets.MANTLE_CHUNK.newPacket(); + resp.setPregenId(holder.getId()); + resp.read(chunkPos, engine.getMantle().getMantle()); + connection.send(resp); + }); + chunks.remove(holder.getId()); + }); + } + } else if (raw instanceof PregenPacket packet) { + if (engine == null) throw new RejectedException("Engine not setup"); + var radius = engine.getMantle().getRadius(); + + var holder = new PregenHolder(packet, radius, true, null); + var request = Packets.MANTLE_CHUNK_REQUEST.newPacket() + .setPregenId(packet.getId()); + holder.iterate(request::add); + var pregen = new PregenHolder(packet, 0, true, null); + pregens.put(Cache.key(pregen.getX(), pregen.getZ()), pregen); + + chunks.put(packet.getId(), holder); + connection.send(request); + } else throw new RejectedException("Unhandled packet: " + raw.getClass().getSimpleName()); + } + + public void completeChunk(int x, int z, byte[] data) { + cpsLooper.addChunks(1); + long id = Cache.key(x >> 5, z >> 5); + var pregen = pregens.get(id); + if (pregen.remove(x, z)) + pregens.remove(id); + connection.send(new ChunkPacket(pregen.getId(), x, z, data)); + } + + @Override + public void onDisconnect() { + if (engine != null) { + engine.close(); + engine = null; + headless = null; + } + cpsLooper.exit(); + IO.delete(base); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/Packet.java b/core/src/main/java/com/volmit/iris/server/packet/Packet.java new file mode 100644 index 000000000..103420093 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/Packet.java @@ -0,0 +1,24 @@ +package com.volmit.iris.server.packet; + +import com.volmit.iris.server.IrisConnection; +import com.volmit.iris.server.util.PacketSendListener; +import io.netty.buffer.ByteBuf; + +import java.io.IOException; + +public interface Packet { + void read(ByteBuf byteBuf) throws IOException; + void write(ByteBuf byteBuf) throws IOException; + + default Packets getType() { + return Packets.get(getClass()); + } + + default void send(IrisConnection connection) { + send(connection, null); + } + + default void send(IrisConnection connection, PacketSendListener listener) { + connection.send(this, listener); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/Packets.java b/core/src/main/java/com/volmit/iris/server/packet/Packets.java new file mode 100644 index 000000000..ed1baa858 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/Packets.java @@ -0,0 +1,91 @@ +package com.volmit.iris.server.packet; + +import com.volmit.iris.server.packet.init.EnginePacket; +import com.volmit.iris.server.packet.init.FilePacket; +import com.volmit.iris.server.packet.init.InfoPacket; +import com.volmit.iris.server.packet.work.DonePacket; +import com.volmit.iris.server.util.ErrorPacket; +import com.volmit.iris.server.packet.work.ChunkPacket; +import com.volmit.iris.server.packet.work.MantleChunkPacket; +import com.volmit.iris.server.packet.work.PregenPacket; +import lombok.AccessLevel; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import org.jetbrains.annotations.NotNull; + +import java.util.*; +import java.util.function.Supplier; + +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public class Packets { + private static final List> REGISTRY; + private static final Map, Packets> MAP; + + public static final Packets ERROR; + public static final Packets INFO; + public static final Packets FILE; + public static final Packets ENGINE; + + public static final Packets DONE; + public static final Packets PREGEN; + public static final Packets CHUNK; + public static final Packets MANTLE_CHUNK; + public static final Packets MANTLE_CHUNK_REQUEST; + + private final Class type; + private final Supplier factory; + private int id = -1; + + public int getId() { + if (id == -1) throw new IllegalStateException("Unknown packet type: " + this); + return id; + } + + public T newPacket() { + return factory.get(); + } + + @NotNull + public static Packets get(int id) { + return REGISTRY.get(id); + } + + @NotNull + public static Packet newPacket(int id) { + return get(id).newPacket(); + } + + @NotNull + public static Packets get(Class type) { + var t = MAP.get(type); + if (t == null) throw new IllegalArgumentException("Unknown packet type: " + type); + return (Packets) t; + } + + public static int getId(Class type) { + return get(type).getId(); + } + + static { + ERROR = new Packets<>(ErrorPacket.class, ErrorPacket::new); + INFO = new Packets<>(InfoPacket.class, InfoPacket::new); + FILE = new Packets<>(FilePacket.class, FilePacket::new); + ENGINE = new Packets<>(EnginePacket.class, EnginePacket::new); + + DONE = new Packets<>(DonePacket.class, DonePacket::new); + PREGEN = new Packets<>(PregenPacket.class, PregenPacket::new); + CHUNK = new Packets<>(ChunkPacket.class, ChunkPacket::new); + MANTLE_CHUNK = new Packets<>(MantleChunkPacket.class, MantleChunkPacket::new); + MANTLE_CHUNK_REQUEST = new Packets<>(MantleChunkPacket.Request.class, MantleChunkPacket.Request::new); + + REGISTRY = List.of(ERROR, INFO, FILE, ENGINE, DONE, PREGEN, CHUNK, MANTLE_CHUNK, MANTLE_CHUNK_REQUEST); + + var map = new HashMap, Packets>(); + for (int i = 0; i < REGISTRY.size(); i++) { + var entry = REGISTRY.get(i); + entry.id = i; + map.put(entry.type, entry); + } + MAP = Collections.unmodifiableMap(map); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/handle/Decoder.java b/core/src/main/java/com/volmit/iris/server/packet/handle/Decoder.java new file mode 100644 index 000000000..a3cf54c85 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/handle/Decoder.java @@ -0,0 +1,17 @@ +package com.volmit.iris.server.packet.handle; + +import com.volmit.iris.server.packet.Packets; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class Decoder extends ByteToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List list) throws Exception { + var packet = Packets.newPacket(byteBuf.readByte()); + packet.read(byteBuf); + list.add(packet); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/handle/Encoder.java b/core/src/main/java/com/volmit/iris/server/packet/handle/Encoder.java new file mode 100644 index 000000000..0ca3b11bf --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/handle/Encoder.java @@ -0,0 +1,14 @@ +package com.volmit.iris.server.packet.handle; + +import com.volmit.iris.server.packet.Packet; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +public class Encoder extends MessageToByteEncoder { + @Override + protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf byteBuf) throws Exception { + byteBuf.writeByte(packet.getType().getId()); + packet.write(byteBuf); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/handle/Prepender.java b/core/src/main/java/com/volmit/iris/server/packet/handle/Prepender.java new file mode 100644 index 000000000..3c7af8bb1 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/handle/Prepender.java @@ -0,0 +1,14 @@ +package com.volmit.iris.server.packet.handle; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +public class Prepender extends MessageToByteEncoder { + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + int i = in.readableBytes(); + out.writeInt(i); + out.writeBytes(in, in.readerIndex(), i); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/handle/Splitter.java b/core/src/main/java/com/volmit/iris/server/packet/handle/Splitter.java new file mode 100644 index 000000000..cdfd75582 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/handle/Splitter.java @@ -0,0 +1,33 @@ +package com.volmit.iris.server.packet.handle; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class Splitter extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List list) throws Exception { + if (!byteBuf.isReadable(4)) + return; + byteBuf.markReaderIndex(); + + byte[] bytes = new byte[4]; + byteBuf.readBytes(bytes); + var buffer = Unpooled.wrappedBuffer(bytes); + try { + int j = buffer.readInt(); + if (byteBuf.readableBytes() >= j) { + list.add(byteBuf.readBytes(j)); + return; + } + + byteBuf.resetReaderIndex(); + } finally { + buffer.release(); + } + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/init/EnginePacket.java b/core/src/main/java/com/volmit/iris/server/packet/init/EnginePacket.java new file mode 100644 index 000000000..8a6ba80f9 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/init/EnginePacket.java @@ -0,0 +1,60 @@ +package com.volmit.iris.server.packet.init; + +import com.volmit.iris.core.loader.IrisData; +import com.volmit.iris.engine.IrisEngine; +import com.volmit.iris.engine.framework.Engine; +import com.volmit.iris.engine.framework.EngineTarget; +import com.volmit.iris.engine.object.IrisWorld; +import com.volmit.iris.server.util.ByteBufUtil; +import com.volmit.iris.server.packet.Packet; +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +@Data +@Accessors(chain = true) +@NoArgsConstructor +public class EnginePacket implements Packet { + private UUID id = UUID.randomUUID(); + private String dimension; + private long seed; + private int radius; + + @Override + public void read(ByteBuf byteBuf) throws IOException { + id = new UUID(byteBuf.readLong(), byteBuf.readLong()); + dimension = ByteBufUtil.readString(byteBuf); + seed = byteBuf.readLong(); + radius = byteBuf.readInt(); + } + + @Override + public void write(ByteBuf byteBuf) throws IOException { + byteBuf.writeLong(id.getMostSignificantBits()); + byteBuf.writeLong(id.getLeastSignificantBits()); + ByteBufUtil.writeString(byteBuf, dimension); + byteBuf.writeLong(seed); + byteBuf.writeInt(radius); + } + + public Engine getEngine(File base) { + var data = IrisData.get(new File(base, "iris/pack")); + var type = data.getDimensionLoader().load(dimension); + var world = IrisWorld.builder() + .name(base.getName()) + .seed(seed) + .worldFolder(base) + .minHeight(type.getMinHeight()) + .maxHeight(type.getMaxHeight()) + .environment(type.getEnvironment()) + .build(); + + return new IrisEngine(new EngineTarget(world, type, data), false); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/init/FilePacket.java b/core/src/main/java/com/volmit/iris/server/packet/init/FilePacket.java new file mode 100644 index 000000000..e43f5949a --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/init/FilePacket.java @@ -0,0 +1,56 @@ +package com.volmit.iris.server.packet.init; + +import com.volmit.iris.server.util.ByteBufUtil; +import com.volmit.iris.server.packet.Packet; +import io.netty.buffer.ByteBuf; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.io.*; +import java.util.UUID; + +@Data +@Accessors(chain = true) +@NoArgsConstructor +public class FilePacket implements Packet { + private UUID id = UUID.randomUUID(); + private String path; + private long offset; + private long length; + private byte[] data; + + @Override + public void read(ByteBuf byteBuf) throws IOException { + id = new UUID(byteBuf.readLong(), byteBuf.readLong()); + path = ByteBufUtil.readString(byteBuf); + offset = byteBuf.readLong(); + length = byteBuf.readLong(); + data = ByteBufUtil.readBytes(byteBuf); + } + + @Override + public void write(ByteBuf byteBuf) throws IOException { + byteBuf.writeLong(id.getMostSignificantBits()); + byteBuf.writeLong(id.getLeastSignificantBits()); + ByteBufUtil.writeString(byteBuf, path); + byteBuf.writeLong(offset); + byteBuf.writeLong(length); + ByteBufUtil.writeBytes(byteBuf, data); + } + + public void write(File base) throws IOException { + File f = new File(base, path); + if (!f.getAbsolutePath().startsWith(base.getAbsolutePath())) + throw new IOException("Invalid path " + path); + if (!f.getParentFile().exists() && !f.getParentFile().mkdirs()) + throw new IOException("Failed to create directory " + f.getParentFile()); + + try (var raf = new RandomAccessFile(f, "rws")) { + if (raf.length() < length) + raf.setLength(length); + raf.seek(offset); + raf.write(data); + } + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/init/InfoPacket.java b/core/src/main/java/com/volmit/iris/server/packet/init/InfoPacket.java new file mode 100644 index 000000000..cd688d9e0 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/init/InfoPacket.java @@ -0,0 +1,33 @@ +package com.volmit.iris.server.packet.init; + +import com.volmit.iris.server.packet.Packet; +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.io.IOException; + +@Data +@Accessors(chain = true) +@NoArgsConstructor +public class InfoPacket implements Packet { + private int nodeCount = -1; + private int cps = -1; + private int generated = -1; + + @Override + public void read(ByteBuf byteBuf) throws IOException { + nodeCount = byteBuf.readInt(); + cps = byteBuf.readInt(); + generated = byteBuf.readInt(); + } + + @Override + public void write(ByteBuf byteBuf) throws IOException { + byteBuf.writeInt(nodeCount); + byteBuf.writeInt(cps); + byteBuf.writeInt(generated); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/work/ChunkPacket.java b/core/src/main/java/com/volmit/iris/server/packet/work/ChunkPacket.java new file mode 100644 index 000000000..fdef5d0c4 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/work/ChunkPacket.java @@ -0,0 +1,37 @@ +package com.volmit.iris.server.packet.work; + +import com.volmit.iris.server.util.ByteBufUtil; +import com.volmit.iris.server.packet.Packet; +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.IOException; +import java.util.UUID; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ChunkPacket implements Packet { + private UUID pregenId; + private int x, z; + private byte[] data; + + @Override + public void read(ByteBuf byteBuf) throws IOException { + pregenId = new UUID(byteBuf.readLong(), byteBuf.readLong()); + x = byteBuf.readInt(); + z = byteBuf.readInt(); + data = ByteBufUtil.readBytes(byteBuf); + } + + @Override + public void write(ByteBuf byteBuf) throws IOException { + byteBuf.writeLong(pregenId.getMostSignificantBits()); + byteBuf.writeLong(pregenId.getLeastSignificantBits()); + byteBuf.writeInt(x); + byteBuf.writeInt(z); + ByteBufUtil.writeBytes(byteBuf, data); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/work/DonePacket.java b/core/src/main/java/com/volmit/iris/server/packet/work/DonePacket.java new file mode 100644 index 000000000..f07f6fc4a --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/work/DonePacket.java @@ -0,0 +1,28 @@ +package com.volmit.iris.server.packet.work; + +import com.volmit.iris.server.packet.Packet; +import io.netty.buffer.ByteBuf; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.io.IOException; +import java.util.UUID; + +@Data +@Accessors(chain = true) +@NoArgsConstructor +public class DonePacket implements Packet { + private UUID id = UUID.randomUUID(); + + @Override + public void read(ByteBuf byteBuf) throws IOException { + id = new UUID(byteBuf.readLong(), byteBuf.readLong()); + } + + @Override + public void write(ByteBuf byteBuf) throws IOException { + byteBuf.writeLong(id.getMostSignificantBits()); + byteBuf.writeLong(id.getLeastSignificantBits()); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/work/MantleChunkPacket.java b/core/src/main/java/com/volmit/iris/server/packet/work/MantleChunkPacket.java new file mode 100644 index 000000000..e92b482d6 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/work/MantleChunkPacket.java @@ -0,0 +1,103 @@ +package com.volmit.iris.server.packet.work; + +import com.volmit.iris.server.util.ByteBufUtil; +import com.volmit.iris.server.packet.Packet; +import com.volmit.iris.util.collection.KList; +import com.volmit.iris.util.documentation.ChunkCoordinates; +import com.volmit.iris.util.mantle.Mantle; +import com.volmit.iris.util.mantle.MantleChunk; +import com.volmit.iris.util.math.Position2; +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; +import net.jpountz.lz4.LZ4BlockInputStream; +import net.jpountz.lz4.LZ4BlockOutputStream; + +import java.io.*; +import java.util.UUID; + +@Data +@Accessors(chain = true) +@NoArgsConstructor +public class MantleChunkPacket implements Packet { + private UUID pregenId; + private int x, z; + private MantleChunk chunk; + + @Override + public void read(ByteBuf byteBuf) throws IOException { + pregenId = new UUID(byteBuf.readLong(), byteBuf.readLong()); + x = byteBuf.readInt(); + z = byteBuf.readInt(); + int sectionHeight = byteBuf.readInt(); + try (var din = new DataInputStream(new BufferedInputStream(new LZ4BlockInputStream(new ByteArrayInputStream(ByteBufUtil.readBytes(byteBuf)))))) { + chunk = new MantleChunk(sectionHeight, din); + } catch (ClassNotFoundException e) { + throw new IOException("Failed to read chunk", e); + } + } + + @Override + public void write(ByteBuf byteBuf) throws IOException { + byteBuf.writeLong(pregenId.getMostSignificantBits()); + byteBuf.writeLong(pregenId.getLeastSignificantBits()); + byteBuf.writeInt(x); + byteBuf.writeInt(z); + byteBuf.writeInt(chunk.getSectionHeight()); + var out = new ByteArrayOutputStream(); + try (var dos = new DataOutputStream(new LZ4BlockOutputStream(out))) { + chunk.write(dos); + } + ByteBufUtil.writeBytes(byteBuf, out.toByteArray()); + } + + @ChunkCoordinates + public MantleChunkPacket read(Position2 pos, Mantle mantle) { + this.x = pos.getX(); + this.z = pos.getZ(); + this.chunk = mantle.getChunk(x, z); + return this; + } + + public void set(Mantle mantle) { + mantle.setChunk(x, z, chunk); + } + + @Data + @Accessors(chain = true) + @NoArgsConstructor + public static class Request implements Packet { + private UUID pregenId; + private KList positions = new KList<>(); + + @Override + public void read(ByteBuf byteBuf) throws IOException { + pregenId = new UUID(byteBuf.readLong(), byteBuf.readLong()); + var count = byteBuf.readInt(); + positions = new KList<>(count); + for (int i = 0; i < count; i++) { + positions.add(new Position2(byteBuf.readInt(), byteBuf.readInt())); + } + } + + @Override + public void write(ByteBuf byteBuf) throws IOException { + byteBuf.writeLong(pregenId.getMostSignificantBits()); + byteBuf.writeLong(pregenId.getLeastSignificantBits()); + byteBuf.writeInt(positions.size()); + for (Position2 p : positions) { + byteBuf.writeInt(p.getX()); + byteBuf.writeInt(p.getZ()); + } + } + + @ChunkCoordinates + public void add(Position2 chunkPos) { + if (positions == null) + positions = new KList<>(); + positions.add(chunkPos); + } + } +} diff --git a/core/src/main/java/com/volmit/iris/server/packet/work/PregenPacket.java b/core/src/main/java/com/volmit/iris/server/packet/work/PregenPacket.java new file mode 100644 index 000000000..d8f7fc241 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/work/PregenPacket.java @@ -0,0 +1,35 @@ +package com.volmit.iris.server.packet.work; + +import com.volmit.iris.server.packet.Packet; +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.io.IOException; +import java.util.UUID; + +@Data +@Accessors(chain = true) +@NoArgsConstructor +@AllArgsConstructor +public class PregenPacket implements Packet { + private UUID id = UUID.randomUUID(); + private int x, z; + + @Override + public void read(ByteBuf byteBuf) throws IOException { + id = new UUID(byteBuf.readLong(), byteBuf.readLong()); + x = byteBuf.readInt(); + z = byteBuf.readInt(); + } + + @Override + public void write(ByteBuf byteBuf) throws IOException { + byteBuf.writeLong(id.getMostSignificantBits()); + byteBuf.writeLong(id.getLeastSignificantBits()); + byteBuf.writeInt(x); + byteBuf.writeInt(z); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/pregen/CloudMethod.java b/core/src/main/java/com/volmit/iris/server/pregen/CloudMethod.java new file mode 100644 index 000000000..137c6356c --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/pregen/CloudMethod.java @@ -0,0 +1,264 @@ +package com.volmit.iris.server.pregen; + +import com.volmit.iris.Iris; +import com.volmit.iris.core.IrisSettings; +import com.volmit.iris.core.gui.PregeneratorJob; +import com.volmit.iris.core.nms.IHeadless; +import com.volmit.iris.core.nms.INMS; +import com.volmit.iris.core.pregenerator.PregenListener; +import com.volmit.iris.core.pregenerator.PregeneratorMethod; +import com.volmit.iris.engine.framework.Engine; +import com.volmit.iris.server.IrisConnection; +import com.volmit.iris.server.execption.RejectedException; +import com.volmit.iris.server.packet.Packet; +import com.volmit.iris.server.packet.Packets; +import com.volmit.iris.server.packet.init.InfoPacket; +import com.volmit.iris.server.packet.work.ChunkPacket; +import com.volmit.iris.server.packet.work.DonePacket; +import com.volmit.iris.server.packet.work.MantleChunkPacket; +import com.volmit.iris.server.util.*; +import com.volmit.iris.util.collection.KMap; +import com.volmit.iris.util.mantle.Mantle; +import com.volmit.iris.util.parallel.MultiBurst; +import lombok.Getter; +import lombok.extern.java.Log; +import org.bukkit.World; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.logging.Level; + +@Log(topic = "CloudPregen") +public class CloudMethod implements PregeneratorMethod, ConnectionHolder, PacketListener { + private final @Getter IrisConnection connection = new IrisConnection(this); + private final Engine engine; + private final IHeadless headless; + private final KMap holders = new KMap<>(); + private final CompletableFuture future = new CompletableFuture<>(); + private final KMap> locks = new KMap<>(); + + public CloudMethod(String address, Engine engine) throws InterruptedException { + var split = address.split(":"); + if (split.length != 2 || !split[1].matches("\\d+")) + throw new IllegalArgumentException("Invalid remote server address: " + address); + + IrisConnection.connect(new InetSocketAddress(split[0], Integer.parseInt(split[1])), () -> this); + + this.engine = engine; + this.headless = INMS.get().createHeadless(engine); + } + + @Override + public void init() { + var studio = engine.isStudio(); + var base = studio ? + engine.getData().getDataFolder() : + engine.getWorld().worldFolder(); + var name = engine.getWorld().name(); + var exit = new AtomicBoolean(false); + var limited = new LimitedSemaphore(IrisSettings.getThreadCount(IrisSettings.get().getConcurrency().getParallelism())); + + log.info(name + ": Uploading pack..."); + iterate(engine.getData().getDataFolder(), f -> { + if (exit.get()) return; + + try { + limited.acquire(); + } catch (InterruptedException e) { + exit.set(true); + return; + } + + MultiBurst.burst.complete(() -> { + try { + upload(exit, base, f, studio, 8192); + } finally { + limited.release(); + } + }); + }); + + try { + limited.acquireAll(); + } catch (InterruptedException ignored) {} + + log.info(name + ": Done uploading pack"); + log.info(name + ": Initializing Engine..."); + CompletableFuture future = new CompletableFuture<>(); + var packet = Packets.ENGINE.newPacket() + .setDimension(engine.getDimension().getLoadKey()) + .setSeed(engine.getWorld().getRawWorldSeed()) + .setRadius(engine.getMantle().getRadius()); + + locks.put(packet.getId(), future); + packet.send(connection); + + try { + future.get(); + } catch (Throwable ignored) {} + log.info(name + ": Done initializing Engine"); + } + + private void upload(AtomicBoolean exit, File base, File f, boolean studio, int packetSize) { + if (exit.get() || (!studio && !f.getAbsolutePath().startsWith(base.getAbsolutePath()))) + return; + + String path = studio ? "iris/pack/" : ""; + path += f.getAbsolutePath().substring(base.getAbsolutePath().length() + 1); + + try (FileInputStream in = new FileInputStream(f)) { + long offset = 0; + byte[] data; + while ((data = in.readNBytes(packetSize)).length > 0 && !exit.get()) { + CompletableFuture future = new CompletableFuture<>(); + var packet = Packets.FILE.newPacket() + .setPath(path) + .setOffset(offset) + .setLength(f.length()) + .setData(data); + + locks.put(packet.getId(), future); + packet.send(connection); + future.get(); + + offset += data.length; + } + } catch (IOException | ExecutionException | InterruptedException e) { + Iris.error("Failed to upload " + f); + e.printStackTrace(); + exit.set(true); + } + } + + private void iterate(File file, Consumer consumer) { + var queue = new ArrayDeque(); + queue.add(file); + + while (!queue.isEmpty()) { + var f = queue.remove(); + if (f.isFile()) + consumer.accept(f); + if (f.isDirectory()) { + var files = f.listFiles(); + if (files == null) continue; + queue.addAll(Arrays.asList(files)); + } + } + } + + @Override + public void close() { + close0(); + connection.disconnect(); + } + + @Override + public void save() {} + + @Override + public boolean supportsRegions(int x, int z, PregenListener listener) { + return true; + } + + @Override + public void generateRegion(int x, int z, PregenListener listener) { + var semaphore = future.join(); + try { + semaphore.acquire(); + } catch (InterruptedException e) { + semaphore.release(); + return; + } + + var p = Packets.PREGEN.newPacket() + .setX(x) + .setZ(z); + new PregenHolder(p, engine.getMantle().getRadius(), true, listener) + .put(holders); + p.send(connection); + } + + @Override + public void generateChunk(int x, int z, PregenListener listener) {} + + @Override + public String getMethod(int x, int z) { + return "Cloud"; + } + + @Override + public Mantle getMantle() { + return engine.getMantle().getMantle(); + } + + @Override + public World getWorld() { + return engine.getWorld().realWorld(); + } + + @Override + public void onPacket(Packet raw) throws Exception { + if (raw instanceof ChunkPacket packet) { + headless.addChunk(packet); + holders.get(packet.getPregenId()) + .getListener() + .onChunkGenerated(packet.getX(), packet.getZ()); + } else if (raw instanceof MantleChunkPacket packet) { + if (holders.get(packet.getPregenId()) + .remove(packet.getX(), packet.getZ())) { + future.join().release(); + } + packet.set(getMantle()); + } else if (raw instanceof MantleChunkPacket.Request packet) { + var mantle = getMantle(); + for (var chunk : packet.getPositions()) { + Packets.MANTLE_CHUNK.newPacket() + .setPregenId(packet.getPregenId()) + .read(chunk, mantle) + .send(connection); + } + } else if (raw instanceof InfoPacket packet) { + if (packet.getNodeCount() > 0 && !future.isDone()) + future.complete(new LimitedSemaphore(packet.getNodeCount())); + //if (packet.getCps() >= 0) + // Iris.info("Cloud CPS: " + packet.getCps()); + } else if (raw instanceof DonePacket packet) { + locks.remove(packet.getId()).complete(null); + } else if (raw instanceof ErrorPacket packet) { + packet.log(log, Level.SEVERE); + } else throw new RejectedException("Unhandled packet: " + raw.getClass().getSimpleName()); + } + + @Override + public void onDisconnect() { + try { + if (!future.isDone()) + future.cancel(false); + } catch (Throwable ignored) {} + PregeneratorJob.shutdownInstance(); + } + + private void close0() { + if (!future.isCancelled()) { + try { + future.join().acquireAll(); + } catch (InterruptedException ignored) {} + } + + try { + headless.close(); + } catch (IOException e) { + log.log(Level.SEVERE, "Failed to close headless", e); + } + } +} diff --git a/core/src/main/java/com/volmit/iris/server/pregen/CloudTask.java b/core/src/main/java/com/volmit/iris/server/pregen/CloudTask.java new file mode 100644 index 000000000..18a3895cc --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/pregen/CloudTask.java @@ -0,0 +1,70 @@ +package com.volmit.iris.server.pregen; + +import com.volmit.iris.core.pregenerator.PregenTask; +import com.volmit.iris.util.collection.KList; +import com.volmit.iris.util.math.Position2; +import com.volmit.iris.util.math.Spiraled; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.util.Comparator; +import java.util.Map; + +@ToString +@Builder(builderMethodName = "couldBuilder") +@EqualsAndHashCode(callSuper = true) +public class CloudTask extends PregenTask { + @Builder.Default + private boolean resetCache = false; + @Builder.Default + private boolean gui = false; + @Builder.Default + private Position2 center = new Position2(0, 0); + @Builder.Default + private int width = 1; + @Builder.Default + private int height = 1; + private int distance; + + private CloudTask(boolean resetCache, boolean gui, Position2 center, int width, int height, int distance) { + super(resetCache, gui, center, width, height); + this.resetCache = resetCache; + this.gui = gui; + this.center = center; + this.width = width; + this.height = height; + + int d = distance & 31; + if (d > 0) d = 32 - d; + this.distance = 32 + d + distance >> 5; + } + + @Override + public void iterateRegions(Spiraled s) { + var c = Comparator.comparingInt(DPos2::distance); + for (int oX = 0; oX < distance; oX++) { + for (int oZ = 0; oZ < distance; oZ++) { + var p = new KList(); + for (int x = -width; x <= width - oX; x+=distance) { + for (int z = -height; z <= height - oZ; z+=distance) { + s.on(x + oX, z + oZ); + //p.add(new DPos2(x + oX, z + oZ)); + } + } + p.sort(c); + p.forEach(i -> i.on(s)); + } + } + } + + private record DPos2(int x, int z, int distance) { + private DPos2(int x, int z) { + this(x, z, x * x + z * z); + } + + public void on(Spiraled s) { + s.on(x, z); + } + } +} diff --git a/core/src/main/java/com/volmit/iris/server/util/ByteBufUtil.java b/core/src/main/java/com/volmit/iris/server/util/ByteBufUtil.java new file mode 100644 index 000000000..22c75db0f --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/util/ByteBufUtil.java @@ -0,0 +1,27 @@ +package com.volmit.iris.server.util; + +import io.netty.buffer.ByteBuf; + +import java.nio.charset.StandardCharsets; + +public class ByteBufUtil { + + public static String readString(ByteBuf byteBuf) { + return new String(readBytes(byteBuf), StandardCharsets.UTF_8); + } + + public static void writeString(ByteBuf byteBuf, String s) { + writeBytes(byteBuf, s.getBytes(StandardCharsets.UTF_8)); + } + + public static byte[] readBytes(ByteBuf byteBuf) { + byte[] bytes = new byte[byteBuf.readInt()]; + byteBuf.readBytes(bytes); + return bytes; + } + + public static void writeBytes(ByteBuf byteBuf, byte[] bytes) { + byteBuf.writeInt(bytes.length); + byteBuf.writeBytes(bytes); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/util/CPSLooper.java b/core/src/main/java/com/volmit/iris/server/util/CPSLooper.java new file mode 100644 index 000000000..a5f56ee24 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/util/CPSLooper.java @@ -0,0 +1,67 @@ +package com.volmit.iris.server.util; + +import com.volmit.iris.server.IrisConnection; +import com.volmit.iris.server.packet.Packets; +import com.volmit.iris.util.math.M; +import com.volmit.iris.util.math.RollingSequence; +import com.volmit.iris.util.scheduling.Looper; +import lombok.Setter; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class CPSLooper extends Looper { + private final RollingSequence chunksPerSecond = new RollingSequence(10); + private final AtomicInteger generated = new AtomicInteger(); + private final AtomicInteger generatedLast = new AtomicInteger(); + private final AtomicBoolean running = new AtomicBoolean(true); + private final IrisConnection connection; + private int nodeCount = 0; + + public CPSLooper(String name, IrisConnection connection) { + this.connection = connection; + setName(name); + setPriority(Thread.MAX_PRIORITY); + start(); + } + + public void addChunks(int count) { + generated.addAndGet(count); + } + + public void exit() { + running.set(false); + } + + public synchronized void setNodeCount(int count) { + if (nodeCount != 0 || count < 1) + return; + nodeCount = count; + Packets.INFO.newPacket() + .setNodeCount(nodeCount) + .send(connection); + } + + @Override + protected long loop() { + if (!running.get()) + return -1; + long t = M.ms(); + + int secondGenerated = generated.get() - generatedLast.get(); + generatedLast.set(generated.get()); + chunksPerSecond.put(secondGenerated); + + if (secondGenerated > 0 && nodeCount > 0) { + Packets.INFO.newPacket() + .setNodeCount(nodeCount) + .setCps((int) Math.round(chunksPerSecond.getAverage())) + .setGenerated(secondGenerated) + .send(connection); + } + + return Math.max(5000 - (M.ms() - t), 0); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/util/ConnectionHolder.java b/core/src/main/java/com/volmit/iris/server/util/ConnectionHolder.java new file mode 100644 index 000000000..acb40d465 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/util/ConnectionHolder.java @@ -0,0 +1,8 @@ +package com.volmit.iris.server.util; + +import com.volmit.iris.server.IrisConnection; + +public interface ConnectionHolder { + + IrisConnection getConnection(); +} diff --git a/core/src/main/java/com/volmit/iris/server/util/ErrorPacket.java b/core/src/main/java/com/volmit/iris/server/util/ErrorPacket.java new file mode 100644 index 000000000..4236246fd --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/util/ErrorPacket.java @@ -0,0 +1,54 @@ +package com.volmit.iris.server.util; + +import com.volmit.iris.server.packet.Packet; +import io.netty.buffer.ByteBuf; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +@Data +@NoArgsConstructor +public class ErrorPacket implements Packet { + private String message; + private String stackTrace; + + public ErrorPacket(String message) { + this.message = message; + } + + public ErrorPacket(String message, Throwable cause) { + this.message = message; + StringWriter writer = new StringWriter(); + cause.printStackTrace(new PrintWriter(writer)); + stackTrace = writer.toString(); + } + + @Override + public void read(ByteBuf byteBuf) throws IOException { + message = ByteBufUtil.readString(byteBuf); + if (byteBuf.readBoolean()) { + stackTrace = ByteBufUtil.readString(byteBuf); + } + } + + @Override + public void write(ByteBuf byteBuf) throws IOException { + ByteBufUtil.writeString(byteBuf, message); + byteBuf.writeBoolean(stackTrace != null); + if (stackTrace != null) { + ByteBufUtil.writeString(byteBuf, stackTrace); + } + } + + public void log(Logger logger, Level level) { + if (stackTrace == null) { + logger.log(level, message); + return; + } + logger.log(level, message + "\n" + stackTrace); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/util/LimitedSemaphore.java b/core/src/main/java/com/volmit/iris/server/util/LimitedSemaphore.java new file mode 100644 index 000000000..ecca55dc4 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/util/LimitedSemaphore.java @@ -0,0 +1,41 @@ +package com.volmit.iris.server.util; + +import lombok.Getter; + +import java.util.concurrent.Semaphore; + +@Getter +public class LimitedSemaphore extends Semaphore { + private final int permits; + + public LimitedSemaphore(int permits) { + super(permits); + this.permits = permits; + } + + public void runBlocking(Runnable runnable) throws InterruptedException { + try { + acquire(); + runnable.run(); + } finally { + release(); + } + } + + public void runAllBlocking(Runnable runnable) throws InterruptedException { + try { + acquireAll(); + runnable.run(); + } finally { + releaseAll(); + } + } + + public void acquireAll() throws InterruptedException { + acquire(permits); + } + + public void releaseAll() { + release(permits); + } +} diff --git a/core/src/main/java/com/volmit/iris/server/util/PacketListener.java b/core/src/main/java/com/volmit/iris/server/util/PacketListener.java new file mode 100644 index 000000000..2a0923a06 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/util/PacketListener.java @@ -0,0 +1,14 @@ +package com.volmit.iris.server.util; + +import com.volmit.iris.server.packet.Packet; + +public interface PacketListener { + + void onPacket(Packet packet) throws Exception; + + default void onDisconnect() {} + + default boolean isAccepting() { + return true; + } +} diff --git a/core/src/main/java/com/volmit/iris/server/util/PacketSendListener.java b/core/src/main/java/com/volmit/iris/server/util/PacketSendListener.java new file mode 100644 index 000000000..000b0790e --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/util/PacketSendListener.java @@ -0,0 +1,26 @@ +package com.volmit.iris.server.util; + +import com.volmit.iris.server.packet.Packet; + +import javax.annotation.Nullable; + +public interface PacketSendListener { + static PacketSendListener thenRun(Runnable runnable) { + return new PacketSendListener() { + public void onSuccess() { + runnable.run(); + } + + @Nullable + public Packet onFailure() { + runnable.run(); + return null; + } + }; + } + + default void onSuccess() {} + + @Nullable + default Packet onFailure() { return null; } +} diff --git a/core/src/main/java/com/volmit/iris/server/util/PregenHolder.java b/core/src/main/java/com/volmit/iris/server/util/PregenHolder.java new file mode 100644 index 000000000..76b1cabb3 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/util/PregenHolder.java @@ -0,0 +1,62 @@ +package com.volmit.iris.server.util; + +import com.volmit.iris.core.pregenerator.PregenListener; +import com.volmit.iris.server.packet.work.PregenPacket; +import com.volmit.iris.util.collection.KList; +import com.volmit.iris.util.collection.KMap; +import com.volmit.iris.util.math.Position2; +import lombok.AccessLevel; +import lombok.Getter; + +import java.util.UUID; +import java.util.function.Consumer; + +@Getter +public class PregenHolder { + private final UUID id; + private final int x, z, r; + @Getter(AccessLevel.NONE) + private final KList chunks = new KList<>(); + private final PregenListener listener; + + public PregenHolder(PregenPacket packet, int r, boolean fill, PregenListener listener) { + this.id = packet.getId(); + this.x = packet.getX(); + this.z = packet.getZ(); + this.r = r; + this.listener = listener; + + if (fill) + iterate(chunks::add); + } + + public void put(KMap holders) { + holders.put(id, this); + } + + public synchronized boolean remove(int x, int z) { + chunks.remove(new Position2(x, z)); + boolean b = chunks.isEmpty(); + if (b && listener != null) listener.onRegionGenerated(x, z); + return b; + } + + public void iterate(Consumer consumer) { + int cX = x << 5; + int cZ = z << 5; + for (int x = -r; x <= r; x++) { + for (int z = -r; z <= r; z++) { + if (x == 0 && z == 0) { + for (int xx = 0; xx < 32; xx++) { + for (int zz = 0; zz < 32; zz++) { + consumer.accept(new Position2(x + cX + xx, z + cZ + zz)); + } + } + continue; + } + + consumer.accept(new Position2(x + cX + x < 0 ? 0 : 32, z + cZ + z < 0 ? 0 : 32)); + } + } + } +} diff --git a/core/src/main/java/com/volmit/iris/util/mantle/Mantle.java b/core/src/main/java/com/volmit/iris/util/mantle/Mantle.java index e4dcbe650..4f607dfb5 100644 --- a/core/src/main/java/com/volmit/iris/util/mantle/Mantle.java +++ b/core/src/main/java/com/volmit/iris/util/mantle/Mantle.java @@ -195,6 +195,11 @@ public class Mantle { return get(x >> 5, z >> 5).getOrCreate(x & 31, z & 31); } + @ChunkCoordinates + public void setChunk(int x, int z, MantleChunk chunk) { + get(x >> 5, z >> 5).set(x & 31, z & 31, chunk); + } + /** * Flag or unflag a chunk * diff --git a/core/src/main/java/com/volmit/iris/util/mantle/MantleChunk.java b/core/src/main/java/com/volmit/iris/util/mantle/MantleChunk.java index 38278b762..d00d38f3b 100644 --- a/core/src/main/java/com/volmit/iris/util/mantle/MantleChunk.java +++ b/core/src/main/java/com/volmit/iris/util/mantle/MantleChunk.java @@ -100,6 +100,10 @@ public class MantleChunk { return flags.get(flag.ordinal()) == 1; } + public int getSectionHeight() { + return sections.length(); + } + /** * Check if a section exists (same as get(section) != null) * diff --git a/core/src/main/java/com/volmit/iris/util/mantle/TectonicPlate.java b/core/src/main/java/com/volmit/iris/util/mantle/TectonicPlate.java index a32645115..9212aaae2 100644 --- a/core/src/main/java/com/volmit/iris/util/mantle/TectonicPlate.java +++ b/core/src/main/java/com/volmit/iris/util/mantle/TectonicPlate.java @@ -158,6 +158,20 @@ public class TectonicPlate { return chunk; } + /** + * Set a tectonic plate + * + * @param x the chunk relative x (0-31) + * @param z the chunk relative z (0-31) + * @param chunk the chunk + */ + @ChunkCoordinates + public void set(int x, int z, MantleChunk chunk) { + if (x != chunk.getX() || z != chunk.getZ()) + throw new IllegalArgumentException("X/Z of chunk must match the plate"); + chunks.set(index(x, z), chunk); + } + @ChunkCoordinates private int index(int x, int z) { return Cache.to1D(x, z, 0, 32, 32); diff --git a/core/src/main/resources/plugin.yml b/core/src/main/resources/plugin.yml index bca7e4638..3ebee0f20 100644 --- a/core/src/main/resources/plugin.yml +++ b/core/src/main/resources/plugin.yml @@ -24,6 +24,7 @@ libraries: - rhino:js:1.7R2 - bsf:bsf:2.4.0 - org.lz4:lz4-java:1.8.0 + - io.netty:netty-all:4.1.112.Final commands: iris: aliases: [ ir, irs ] diff --git a/nms/v1_20_R3/src/main/java/com/volmit/iris/core/nms/v1_20_R3/Headless.java b/nms/v1_20_R3/src/main/java/com/volmit/iris/core/nms/v1_20_R3/Headless.java index d171aa022..f0c5ee5aa 100644 --- a/nms/v1_20_R3/src/main/java/com/volmit/iris/core/nms/v1_20_R3/Headless.java +++ b/nms/v1_20_R3/src/main/java/com/volmit/iris/core/nms/v1_20_R3/Headless.java @@ -30,6 +30,8 @@ import com.volmit.iris.engine.framework.Engine; import com.volmit.iris.engine.framework.EngineStage; import com.volmit.iris.engine.framework.WrongEngineBroException; import com.volmit.iris.engine.object.IrisBiome; +import com.volmit.iris.server.node.IrisSession; +import com.volmit.iris.server.packet.work.ChunkPacket; import com.volmit.iris.util.collection.KList; import com.volmit.iris.util.collection.KMap; import com.volmit.iris.util.context.ChunkContext; @@ -61,6 +63,7 @@ import net.minecraft.world.level.chunk.storage.RegionFile; import org.bukkit.Material; import org.bukkit.block.data.BlockData; +import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; @@ -82,6 +85,8 @@ public class Headless implements IHeadless, LevelHeightAccessor { private final RNG BIOME_RNG; private final @Getter int minBuildHeight; private final @Getter int height; + private IrisSession session; + private CompletingThread regionThread; private boolean closed = false; public Headless(NMSBinding binding, Engine engine) { @@ -127,6 +132,12 @@ public class Headless implements IHeadless, LevelHeightAccessor { cleaner.start(); } + public void setSession(IrisSession session) { + if (this.session != null) + throw new IllegalStateException("Session already set"); + this.session = session; + } + @Override public int getLoadedChunks() { return loadedChunks.get(); @@ -153,21 +164,42 @@ public class Headless implements IHeadless, LevelHeightAccessor { } @Override - public void generateRegion(MultiBurst burst, int x, int z, PregenListener listener) { - if (closed) return; - boolean listening = listener != null; - if (listening) listener.onRegionGenerating(x, z); - CountDownLatch latch = new CountDownLatch(1024); - iterateRegion(x, z, pos -> burst.complete(() -> { - if (listening) listener.onChunkGenerating(pos.x, pos.z); - generateChunk(pos.x, pos.z); - if (listening) listener.onChunkGenerated(pos.x, pos.z); - latch.countDown(); - })); - try { - latch.await(); - } catch (InterruptedException ignored) {} - if (listening) listener.onRegionGenerated(x, z); + public synchronized CompletableFuture generateRegion(MultiBurst burst, int x, int z, int maxConcurrent, PregenListener listener) { + if (closed) return CompletableFuture.completedFuture(null); + if (regionThread != null && !regionThread.future.isDone()) + throw new IllegalStateException("Region generation already in progress"); + + regionThread = new CompletingThread(() -> { + boolean listening = listener != null; + Semaphore semaphore = new Semaphore(maxConcurrent); + CountDownLatch latch = new CountDownLatch(1024); + + iterateRegion(x, z, pos -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + semaphore.release(); + return; + } + + burst.complete(() -> { + try { + if (listening) listener.onChunkGenerating(pos.x, pos.z); + generateChunk(pos.x, pos.z); + if (listening) listener.onChunkGenerated(pos.x, pos.z); + } finally { + semaphore.release(); + latch.countDown(); + } + }); + }); + try { + latch.await(); + } catch (InterruptedException ignored) {} + if (listening) listener.onRegionGenerated(x, z); + }, "Region Generator - " + x + "," + z, Thread.MAX_PRIORITY); + + return regionThread.future; } @RegionCoordinates @@ -199,6 +231,12 @@ public class Headless implements IHeadless, LevelHeightAccessor { inject(engine, chunk, ctx); chunk.setStatus(ChunkStatus.FULL); + if (session != null) { + session.completeChunk(x, z, write(chunk)); + loadedChunks.decrementAndGet(); + return; + } + long key = Cache.key(pos.getRegionX(), pos.getRegionZ()); regions.computeIfAbsent(key, Region::new) .add(chunk); @@ -209,6 +247,16 @@ public class Headless implements IHeadless, LevelHeightAccessor { } } + @Override + public void addChunk(ChunkPacket packet) { + if (closed) return; + if (session != null) throw new IllegalStateException("Headless running as Server"); + var pos = new ChunkPos(packet.getX(), packet.getZ()); + regions.computeIfAbsent(Cache.key(pos.getRegionX(), pos.getRegionZ()), Region::new) + .add(packet); + loadedChunks.incrementAndGet(); + } + @BlockCoordinates private ChunkContext generate(Engine engine, int x, int z, Hunk vblocks, Hunk vbiomes) throws WrongEngineBroException { if (engine.isClosed()) { @@ -284,6 +332,11 @@ public class Headless implements IHeadless, LevelHeightAccessor { public void close() throws IOException { if (closed) return; try { + if (regionThread != null) { + regionThread.future.join(); + regionThread = null; + } + regions.values().forEach(Region::submit); Iris.info("Waiting for " + loadedChunks.get() + " chunks to unload..."); while (loadedChunks.get() > 0 || !regions.isEmpty()) @@ -304,6 +357,7 @@ public class Headless implements IHeadless, LevelHeightAccessor { private final int x, z; private final long key; private final KList chunks = new KList<>(1024); + private final KList remoteChunks = new KList<>(1024); private final AtomicBoolean full = new AtomicBoolean(); private long lastEntry = M.ms(); @@ -334,13 +388,31 @@ public class Headless implements IHeadless, LevelHeightAccessor { } loadedChunks.decrementAndGet(); } + for (var chunk : remoteChunks) { + var pos = new ChunkPos(chunk.getX(), chunk.getZ()); + try (DataOutputStream dos = regionFile.getChunkDataOutputStream(pos)) { + dos.write(chunk.getData()); + } catch (Throwable e) { + Iris.error("Failed to save remote chunk " + pos.x + ", " + pos.z); + e.printStackTrace(); + } + loadedChunks.decrementAndGet(); + } regions.remove(key); } public synchronized void add(ProtoChunk chunk) { chunks.add(chunk); lastEntry = M.ms(); - if (chunks.size() < 1024) + if (chunks.size() + remoteChunks.size() < 1024) + return; + submit(); + } + + public synchronized void add(ChunkPacket packet) { + remoteChunks.add(packet); + lastEntry = M.ms(); + if (chunks.size() + remoteChunks.size() < 1024) return; submit(); } @@ -350,4 +422,31 @@ public class Headless implements IHeadless, LevelHeightAccessor { executor.submit(this); } } + + private byte[] write(ProtoChunk chunk) throws IOException { + try (ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out)) { + NbtIo.write(binding.serializeChunk(chunk, Headless.this), dos); + return out.toByteArray(); + } + } + + private static class CompletingThread extends Thread { + private final CompletableFuture future = new CompletableFuture<>(); + + private CompletingThread(Runnable task, String name, int priority) { + super(task, name); + setPriority(priority); + start(); + } + + @Override + public void run() { + try { + super.run(); + } finally { + future.complete(null); + } + } + } }