From d22f49492fdeef896a0dc239be94cc1d2e3083a6 Mon Sep 17 00:00:00 2001 From: Julian Krings Date: Sat, 7 Sep 2024 12:32:09 +0200 Subject: [PATCH] implement done packet into master and add safety checks for remote server version --- .../volmit/iris/server/IrisConnection.java | 4 +- .../iris/server/master/IrisMasterClient.java | 40 ++++++++++++- .../iris/server/master/IrisMasterServer.java | 36 +++++++++--- .../iris/server/master/IrisMasterSession.java | 48 ++++++++++++--- .../iris/server/master/PingConnection.java | 32 ++++++++++ .../volmit/iris/server/node/IrisSession.java | 3 + .../volmit/iris/server/packet/Packets.java | 5 +- .../iris/server/packet/init/PingPacket.java | 58 +++++++++++++++++++ .../iris/server/pregen/CloudMethod.java | 30 ++++++++-- 9 files changed, 227 insertions(+), 29 deletions(-) create mode 100644 core/src/main/java/com/volmit/iris/server/master/PingConnection.java create mode 100644 core/src/main/java/com/volmit/iris/server/packet/init/PingPacket.java diff --git a/core/src/main/java/com/volmit/iris/server/IrisConnection.java b/core/src/main/java/com/volmit/iris/server/IrisConnection.java index 68189936b..e87ef1de0 100644 --- a/core/src/main/java/com/volmit/iris/server/IrisConnection.java +++ b/core/src/main/java/com/volmit/iris/server/IrisConnection.java @@ -24,7 +24,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import java.util.logging.Level; @@ -163,8 +162,7 @@ public class IrisConnection extends SimpleChannelInboundHandler { .addLast("packet_handler", holder.getConnection()); } - public static T connect(InetSocketAddress address, Supplier factory) throws InterruptedException { - var holder = factory.get(); + public static T connect(InetSocketAddress address, T holder) throws InterruptedException { new Bootstrap() .group(getWorker()) .handler(new ChannelInitializer<>() { diff --git a/core/src/main/java/com/volmit/iris/server/master/IrisMasterClient.java b/core/src/main/java/com/volmit/iris/server/master/IrisMasterClient.java index da65a1aec..af4ed90aa 100644 --- a/core/src/main/java/com/volmit/iris/server/master/IrisMasterClient.java +++ b/core/src/main/java/com/volmit/iris/server/master/IrisMasterClient.java @@ -2,18 +2,36 @@ package com.volmit.iris.server.master; import com.volmit.iris.server.IrisConnection; import com.volmit.iris.server.packet.Packet; +import com.volmit.iris.server.packet.Packets; +import com.volmit.iris.server.packet.init.InfoPacket; +import com.volmit.iris.server.packet.init.PingPacket; import com.volmit.iris.server.util.ConnectionHolder; import com.volmit.iris.server.util.PacketListener; import com.volmit.iris.server.util.PacketSendListener; import lombok.Getter; import org.jetbrains.annotations.Nullable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + public class IrisMasterClient implements ConnectionHolder, PacketListener { private final @Getter IrisConnection connection = new IrisConnection(this); private final IrisMasterSession session; + private final CompletableFuture pingResponse = new CompletableFuture<>(); + private final CompletableFuture nodeCount = new CompletableFuture<>(); - IrisMasterClient(IrisMasterSession session) { + IrisMasterClient(String version, IrisMasterSession session){ this.session = session; + Packets.PING.newPacket() + .setVersion(version) + .send(connection); + try { + var packet = pingResponse.get(); + if (!packet.getVersion().contains(version)) + throw new IllegalStateException("Remote server version does not match"); + } catch (ExecutionException | InterruptedException e) { + throw new IllegalStateException("Failed to get ping packet", e); + } } protected void send(Packet packet) { @@ -25,8 +43,24 @@ public class IrisMasterClient implements ConnectionHolder, PacketListener { } @Override - public void onPacket(Packet packet) throws Exception { - session.onClientPacket(packet); + public void onPacket(Packet raw) { + if (!pingResponse.isDone() && raw instanceof PingPacket packet) { + pingResponse.complete(packet); + return; + } + if (!nodeCount.isDone() && raw instanceof InfoPacket packet && packet.getNodeCount() > 0) { + nodeCount.complete(packet.getNodeCount()); + return; + } + session.onClientPacket(this, raw); + } + + public int getNodeCount() { + try { + return nodeCount.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } } public void disconnect() { diff --git a/core/src/main/java/com/volmit/iris/server/master/IrisMasterServer.java b/core/src/main/java/com/volmit/iris/server/master/IrisMasterServer.java index bd1b6b130..e1346005f 100644 --- a/core/src/main/java/com/volmit/iris/server/master/IrisMasterServer.java +++ b/core/src/main/java/com/volmit/iris/server/master/IrisMasterServer.java @@ -3,21 +3,23 @@ package com.volmit.iris.server.master; import com.volmit.iris.server.IrisConnection; import com.volmit.iris.server.node.IrisServer; import com.volmit.iris.server.util.PregenHolder; +import com.volmit.iris.util.collection.KList; import com.volmit.iris.util.collection.KMap; +import com.volmit.iris.util.collection.KSet; import lombok.extern.java.Log; import java.net.InetSocketAddress; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.logging.Level; import java.util.logging.Logger; @Log(topic = "Iris-MasterServer") public class IrisMasterServer extends IrisServer { - private static final Object VALUE = new Object(); private static IrisMasterServer instance; private final KMap>> sessions = new KMap<>(); - private final KMap nodes = new KMap<>(); + private final KMap> nodes = new KMap<>(); public IrisMasterServer(int port, String[] remote) throws InterruptedException { super("Iris-MasterServer", port, IrisMasterSession::new); @@ -40,12 +42,24 @@ public class IrisMasterServer extends IrisServer { } } - public void addNode(InetSocketAddress address) { - nodes.put(address, VALUE); + private void addNode(InetSocketAddress address) throws InterruptedException { + var ping = new PingConnection(address); + try { + for (String version : ping.getVersion().get()) + addNode(address, version); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + public void addNode(InetSocketAddress address, String version) { + nodes.computeIfAbsent(version, v -> new KSet<>()).add(address); } public void removeNode(InetSocketAddress address) { - nodes.remove(address); + for (var set : nodes.values()) { + set.remove(address); + } } public static void close(UUID session) { @@ -55,17 +69,21 @@ public class IrisMasterServer extends IrisServer { map.clear(); } - public static KMap> getNodes(IrisMasterSession session) { + public static KList getVersions() { + return get().nodes.k(); + } + + public static KMap> getNodes(String version, IrisMasterSession session) { var master = get(); var uuid = session.getUuid(); close(uuid); master.getLogger().info("Requesting nodes for session " + uuid); var map = new KMap>(); - for (var address : master.nodes.keySet()) { + for (var address : master.nodes.getOrDefault(version, new KSet<>())) { try { - map.put(IrisConnection.connect(address, () -> new IrisMasterClient(session)), new KMap<>()); - } catch (Exception e) { + map.put(IrisConnection.connect(address, new IrisMasterClient(version, session)), new KMap<>()); + } catch (Throwable e) { master.getLogger().log(Level.WARNING, "Failed to connect to server " + address, e); master.removeNode(address); } diff --git a/core/src/main/java/com/volmit/iris/server/master/IrisMasterSession.java b/core/src/main/java/com/volmit/iris/server/master/IrisMasterSession.java index feb81106d..985555c6b 100644 --- a/core/src/main/java/com/volmit/iris/server/master/IrisMasterSession.java +++ b/core/src/main/java/com/volmit/iris/server/master/IrisMasterSession.java @@ -7,10 +7,13 @@ import com.volmit.iris.server.packet.Packets; import com.volmit.iris.server.packet.init.EnginePacket; import com.volmit.iris.server.packet.init.FilePacket; import com.volmit.iris.server.packet.init.InfoPacket; +import com.volmit.iris.server.packet.init.PingPacket; import com.volmit.iris.server.packet.work.ChunkPacket; +import com.volmit.iris.server.packet.work.DonePacket; import com.volmit.iris.server.packet.work.MantleChunkPacket; import com.volmit.iris.server.packet.work.PregenPacket; import com.volmit.iris.server.util.*; +import com.volmit.iris.util.collection.KList; import com.volmit.iris.util.collection.KMap; import lombok.Getter; import lombok.extern.java.Log; @@ -19,35 +22,50 @@ import java.util.Comparator; import java.util.UUID; import java.util.logging.Level; -//TODO handle done packet @Log(topic = "IrisMasterSession") public class IrisMasterSession implements ConnectionHolder, PacketListener { private final @Getter IrisConnection connection = new IrisConnection(this); private final @Getter UUID uuid = UUID.randomUUID(); private final KMap map = new KMap<>(); private final CPSLooper cpsLooper = new CPSLooper("IrisMasterSession-" + uuid, connection); + private final KMap waiting = new KMap<>(); private KMap> clients; private int radius = -1; @Override public void onPacket(Packet raw) throws Exception { if (clients == null) { - clients = IrisMasterServer.getNodes(this); - cpsLooper.setNodeCount(clients.size()); + if (raw instanceof PingPacket packet) { + var versions = packet.getVersion(); + PacketSendListener listener = versions.size() != 1 ? PacketSendListener.thenRun(connection::disconnect) : null; + if (listener == null) { + clients = IrisMasterServer.getNodes(versions.get(0), this); + if (clients.isEmpty()) { + connection.disconnect(); + return; + } - var info = Packets.INFO.newPacket(); - info.setNodeCount(clients.size()); - connection.send(info); + var nodeCount = clients.keySet() + .stream() + .mapToInt(IrisMasterClient::getNodeCount) + .sum(); + cpsLooper.setNodeCount(nodeCount); + } + packet.setVersion(IrisMasterServer.getVersions()) + .send(connection, listener); + } else throw new RejectedException("Not a ping packet"); } if (raw instanceof FilePacket packet) { if (radius != -1) throw new RejectedException("Engine already setup"); + waiting.put(packet.getId(), new CompletingHolder(clients.k())); clients.keySet().forEach(client -> client.send(packet)); } else if (raw instanceof EnginePacket packet) { if (radius != -1) throw new RejectedException("Engine already setup"); radius = packet.getRadius(); + waiting.put(packet.getId(), new CompletingHolder(clients.k())); clients.keySet().forEach(client -> client.send(packet)); } else if (raw instanceof PregenPacket packet) { if (radius == -1) @@ -64,13 +82,13 @@ public class IrisMasterSession implements ConnectionHolder, PacketListener { } } - protected void onClientPacket(Packet raw) { + protected void onClientPacket(IrisMasterClient client, Packet raw) { if (raw instanceof ErrorPacket packet) { packet.log(log, Level.SEVERE); } else if (raw instanceof ChunkPacket) { connection.send(raw); } else if (raw instanceof MantleChunkPacket packet) { - var map = clients.get(this.map.get(packet.getPregenId())); + var map = clients.get(client); if (map.get(packet.getPregenId()) .remove(packet.getX(), packet.getZ())) map.get(packet.getPregenId()); @@ -81,6 +99,12 @@ public class IrisMasterSession implements ConnectionHolder, PacketListener { } else if (raw instanceof InfoPacket packet) { int i = packet.getGenerated(); if (i != -1) cpsLooper.addChunks(i); + } else if (raw instanceof DonePacket packet) { + var holder = waiting.get(packet.getId()); + if (holder.remove(client)) { + connection.send(Packets.DONE.newPacket().setId(packet.getId())); + waiting.remove(packet.getId()); + } } } @@ -95,4 +119,12 @@ public class IrisMasterSession implements ConnectionHolder, PacketListener { .min(Comparator.comparingInt(c -> clients.get(c).size())) .orElseThrow(() -> new RejectedException("No clients available")); } + + private record CompletingHolder(KList clients) { + + public synchronized boolean remove(IrisMasterClient client) { + clients.remove(client); + return clients.isEmpty(); + } + } } diff --git a/core/src/main/java/com/volmit/iris/server/master/PingConnection.java b/core/src/main/java/com/volmit/iris/server/master/PingConnection.java new file mode 100644 index 000000000..800549a28 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/master/PingConnection.java @@ -0,0 +1,32 @@ +package com.volmit.iris.server.master; + +import com.volmit.iris.server.IrisConnection; +import com.volmit.iris.server.packet.Packet; +import com.volmit.iris.server.packet.Packets; +import com.volmit.iris.server.packet.init.PingPacket; +import com.volmit.iris.server.util.ConnectionHolder; +import com.volmit.iris.server.util.PacketListener; +import com.volmit.iris.util.collection.KList; +import lombok.Getter; + +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; + +@Getter +public class PingConnection implements ConnectionHolder, PacketListener { + private final IrisConnection connection = new IrisConnection(this); + private final CompletableFuture> version = new CompletableFuture<>(); + + public PingConnection(InetSocketAddress address) throws InterruptedException { + IrisConnection.connect(address, this); + Packets.PING.newPacket().send(connection); + } + + @Override + public void onPacket(Packet packet) throws Exception { + if (packet instanceof PingPacket p) { + version.complete(p.getVersion()); + connection.disconnect(); + } + } +} diff --git a/core/src/main/java/com/volmit/iris/server/node/IrisSession.java b/core/src/main/java/com/volmit/iris/server/node/IrisSession.java index 960326945..c9242a514 100644 --- a/core/src/main/java/com/volmit/iris/server/node/IrisSession.java +++ b/core/src/main/java/com/volmit/iris/server/node/IrisSession.java @@ -10,6 +10,7 @@ import com.volmit.iris.server.packet.Packet; import com.volmit.iris.server.packet.Packets; import com.volmit.iris.server.packet.init.EnginePacket; import com.volmit.iris.server.packet.init.FilePacket; +import com.volmit.iris.server.packet.init.PingPacket; import com.volmit.iris.server.util.CPSLooper; import com.volmit.iris.server.util.ConnectionHolder; import com.volmit.iris.server.util.PacketListener; @@ -85,6 +86,8 @@ public class IrisSession implements ConnectionHolder, PacketListener { chunks.put(packet.getId(), holder); connection.send(request); + } else if (raw instanceof PingPacket packet) { + packet.setBukkit().send(connection); } else throw new RejectedException("Unhandled packet: " + raw.getClass().getSimpleName()); } diff --git a/core/src/main/java/com/volmit/iris/server/packet/Packets.java b/core/src/main/java/com/volmit/iris/server/packet/Packets.java index ed1baa858..59f7efb29 100644 --- a/core/src/main/java/com/volmit/iris/server/packet/Packets.java +++ b/core/src/main/java/com/volmit/iris/server/packet/Packets.java @@ -3,6 +3,7 @@ package com.volmit.iris.server.packet; import com.volmit.iris.server.packet.init.EnginePacket; import com.volmit.iris.server.packet.init.FilePacket; import com.volmit.iris.server.packet.init.InfoPacket; +import com.volmit.iris.server.packet.init.PingPacket; import com.volmit.iris.server.packet.work.DonePacket; import com.volmit.iris.server.util.ErrorPacket; import com.volmit.iris.server.packet.work.ChunkPacket; @@ -23,6 +24,7 @@ public class Packets { public static final Packets ERROR; public static final Packets INFO; + public static final Packets PING; public static final Packets FILE; public static final Packets ENGINE; @@ -69,6 +71,7 @@ public class Packets { static { ERROR = new Packets<>(ErrorPacket.class, ErrorPacket::new); INFO = new Packets<>(InfoPacket.class, InfoPacket::new); + PING = new Packets<>(PingPacket.class, PingPacket::new); FILE = new Packets<>(FilePacket.class, FilePacket::new); ENGINE = new Packets<>(EnginePacket.class, EnginePacket::new); @@ -78,7 +81,7 @@ public class Packets { MANTLE_CHUNK = new Packets<>(MantleChunkPacket.class, MantleChunkPacket::new); MANTLE_CHUNK_REQUEST = new Packets<>(MantleChunkPacket.Request.class, MantleChunkPacket.Request::new); - REGISTRY = List.of(ERROR, INFO, FILE, ENGINE, DONE, PREGEN, CHUNK, MANTLE_CHUNK, MANTLE_CHUNK_REQUEST); + REGISTRY = List.of(ERROR, INFO, PING, FILE, ENGINE, DONE, PREGEN, CHUNK, MANTLE_CHUNK, MANTLE_CHUNK_REQUEST); var map = new HashMap, Packets>(); for (int i = 0; i < REGISTRY.size(); i++) { diff --git a/core/src/main/java/com/volmit/iris/server/packet/init/PingPacket.java b/core/src/main/java/com/volmit/iris/server/packet/init/PingPacket.java new file mode 100644 index 000000000..8921547fe --- /dev/null +++ b/core/src/main/java/com/volmit/iris/server/packet/init/PingPacket.java @@ -0,0 +1,58 @@ +package com.volmit.iris.server.packet.init; + +import com.volmit.iris.server.packet.Packet; +import com.volmit.iris.server.util.ByteBufUtil; +import com.volmit.iris.util.collection.KList; +import io.netty.buffer.ByteBuf; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; +import org.bukkit.Bukkit; + +import java.io.IOException; +import java.util.UUID; + +@Data +@Accessors(chain = true) +@NoArgsConstructor +public class PingPacket implements Packet { + private UUID id = UUID.randomUUID(); + private KList version = new KList<>(); + + @Override + public void read(ByteBuf byteBuf) throws IOException { + id = new UUID(byteBuf.readLong(), byteBuf.readLong()); + int size = byteBuf.readInt(); + + version = new KList<>(); + for (int i = 0; i < size; i++) { + version.add(ByteBufUtil.readString(byteBuf)); + } + } + + @Override + public void write(ByteBuf byteBuf) throws IOException { + byteBuf.writeLong(id.getMostSignificantBits()); + byteBuf.writeLong(id.getLeastSignificantBits()); + + byteBuf.writeInt(version.size()); + for (String s : version) { + ByteBufUtil.writeString(byteBuf, s); + } + } + + public PingPacket setBukkit() { + this.version = new KList<>(Bukkit.getBukkitVersion().split("-")[0]); + return this; + } + + public PingPacket setVersion(String version) { + this.version = new KList<>(version); + return this; + } + + public PingPacket setVersion(KList version) { + this.version = version; + return this; + } +} diff --git a/core/src/main/java/com/volmit/iris/server/pregen/CloudMethod.java b/core/src/main/java/com/volmit/iris/server/pregen/CloudMethod.java index 137c6356c..54134a588 100644 --- a/core/src/main/java/com/volmit/iris/server/pregen/CloudMethod.java +++ b/core/src/main/java/com/volmit/iris/server/pregen/CloudMethod.java @@ -13,6 +13,7 @@ import com.volmit.iris.server.execption.RejectedException; import com.volmit.iris.server.packet.Packet; import com.volmit.iris.server.packet.Packets; import com.volmit.iris.server.packet.init.InfoPacket; +import com.volmit.iris.server.packet.init.PingPacket; import com.volmit.iris.server.packet.work.ChunkPacket; import com.volmit.iris.server.packet.work.DonePacket; import com.volmit.iris.server.packet.work.MantleChunkPacket; @@ -32,7 +33,6 @@ import java.util.ArrayDeque; import java.util.Arrays; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -45,14 +45,14 @@ public class CloudMethod implements PregeneratorMethod, ConnectionHolder, Packet private final IHeadless headless; private final KMap holders = new KMap<>(); private final CompletableFuture future = new CompletableFuture<>(); - private final KMap> locks = new KMap<>(); + private final KMap> locks = new KMap<>(); public CloudMethod(String address, Engine engine) throws InterruptedException { var split = address.split(":"); if (split.length != 2 || !split[1].matches("\\d+")) throw new IllegalArgumentException("Invalid remote server address: " + address); - IrisConnection.connect(new InetSocketAddress(split[0], Integer.parseInt(split[1])), () -> this); + IrisConnection.connect(new InetSocketAddress(split[0], Integer.parseInt(split[1])), this); this.engine = engine; this.headless = INMS.get().createHeadless(engine); @@ -68,6 +68,24 @@ public class CloudMethod implements PregeneratorMethod, ConnectionHolder, Packet var exit = new AtomicBoolean(false); var limited = new LimitedSemaphore(IrisSettings.getThreadCount(IrisSettings.get().getConcurrency().getParallelism())); + var remoteVersion = new CompletableFuture<>(); + var ping = Packets.PING.newPacket() + .setBukkit(); + locks.put(ping.getId(), remoteVersion); + ping.send(connection); + + try { + var o = remoteVersion.get(); + if (!(o instanceof PingPacket packet)) + throw new IllegalStateException("Invalid response from remote server"); + if (!packet.getVersion().contains(ping.getVersion().get(0))) + throw new IllegalStateException("Remote server version does not match"); + } catch (Throwable e) { + connection.disconnect(); + throw new IllegalStateException("Failed to connect to remote server", e); + } + + log.info(name + ": Uploading pack..."); iterate(engine.getData().getDataFolder(), f -> { if (exit.get()) return; @@ -94,7 +112,7 @@ public class CloudMethod implements PregeneratorMethod, ConnectionHolder, Packet log.info(name + ": Done uploading pack"); log.info(name + ": Initializing Engine..."); - CompletableFuture future = new CompletableFuture<>(); + var future = new CompletableFuture<>(); var packet = Packets.ENGINE.newPacket() .setDimension(engine.getDimension().getLoadKey()) .setSeed(engine.getWorld().getRawWorldSeed()) @@ -120,7 +138,7 @@ public class CloudMethod implements PregeneratorMethod, ConnectionHolder, Packet long offset = 0; byte[] data; while ((data = in.readNBytes(packetSize)).length > 0 && !exit.get()) { - CompletableFuture future = new CompletableFuture<>(); + var future = new CompletableFuture<>(); var packet = Packets.FILE.newPacket() .setPath(path) .setOffset(offset) @@ -234,6 +252,8 @@ public class CloudMethod implements PregeneratorMethod, ConnectionHolder, Packet // Iris.info("Cloud CPS: " + packet.getCps()); } else if (raw instanceof DonePacket packet) { locks.remove(packet.getId()).complete(null); + } else if (raw instanceof PingPacket packet) { + locks.remove(packet.getId()).complete(packet); } else if (raw instanceof ErrorPacket packet) { packet.log(log, Level.SEVERE); } else throw new RejectedException("Unhandled packet: " + raw.getClass().getSimpleName());