implement done packet into master and add safety checks for remote server version

This commit is contained in:
Julian Krings 2024-09-07 12:32:09 +02:00
parent b65b112220
commit d22f49492f
9 changed files with 227 additions and 29 deletions

View File

@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.logging.Level;
@ -163,8 +162,7 @@ public class IrisConnection extends SimpleChannelInboundHandler<Packet> {
.addLast("packet_handler", holder.getConnection());
}
public static <T extends ConnectionHolder> T connect(InetSocketAddress address, Supplier<T> factory) throws InterruptedException {
var holder = factory.get();
public static <T extends ConnectionHolder> T connect(InetSocketAddress address, T holder) throws InterruptedException {
new Bootstrap()
.group(getWorker())
.handler(new ChannelInitializer<>() {

View File

@ -2,18 +2,36 @@ package com.volmit.iris.server.master;
import com.volmit.iris.server.IrisConnection;
import com.volmit.iris.server.packet.Packet;
import com.volmit.iris.server.packet.Packets;
import com.volmit.iris.server.packet.init.InfoPacket;
import com.volmit.iris.server.packet.init.PingPacket;
import com.volmit.iris.server.util.ConnectionHolder;
import com.volmit.iris.server.util.PacketListener;
import com.volmit.iris.server.util.PacketSendListener;
import lombok.Getter;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class IrisMasterClient implements ConnectionHolder, PacketListener {
private final @Getter IrisConnection connection = new IrisConnection(this);
private final IrisMasterSession session;
private final CompletableFuture<PingPacket> pingResponse = new CompletableFuture<>();
private final CompletableFuture<Integer> nodeCount = new CompletableFuture<>();
IrisMasterClient(IrisMasterSession session) {
IrisMasterClient(String version, IrisMasterSession session){
this.session = session;
Packets.PING.newPacket()
.setVersion(version)
.send(connection);
try {
var packet = pingResponse.get();
if (!packet.getVersion().contains(version))
throw new IllegalStateException("Remote server version does not match");
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException("Failed to get ping packet", e);
}
}
protected void send(Packet packet) {
@ -25,8 +43,24 @@ public class IrisMasterClient implements ConnectionHolder, PacketListener {
}
@Override
public void onPacket(Packet packet) throws Exception {
session.onClientPacket(packet);
public void onPacket(Packet raw) {
if (!pingResponse.isDone() && raw instanceof PingPacket packet) {
pingResponse.complete(packet);
return;
}
if (!nodeCount.isDone() && raw instanceof InfoPacket packet && packet.getNodeCount() > 0) {
nodeCount.complete(packet.getNodeCount());
return;
}
session.onClientPacket(this, raw);
}
public int getNodeCount() {
try {
return nodeCount.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
public void disconnect() {

View File

@ -3,21 +3,23 @@ package com.volmit.iris.server.master;
import com.volmit.iris.server.IrisConnection;
import com.volmit.iris.server.node.IrisServer;
import com.volmit.iris.server.util.PregenHolder;
import com.volmit.iris.util.collection.KList;
import com.volmit.iris.util.collection.KMap;
import com.volmit.iris.util.collection.KSet;
import lombok.extern.java.Log;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@Log(topic = "Iris-MasterServer")
public class IrisMasterServer extends IrisServer {
private static final Object VALUE = new Object();
private static IrisMasterServer instance;
private final KMap<UUID, KMap<IrisMasterClient, KMap<UUID, PregenHolder>>> sessions = new KMap<>();
private final KMap<InetSocketAddress, Object> nodes = new KMap<>();
private final KMap<String, KSet<InetSocketAddress>> nodes = new KMap<>();
public IrisMasterServer(int port, String[] remote) throws InterruptedException {
super("Iris-MasterServer", port, IrisMasterSession::new);
@ -40,12 +42,24 @@ public class IrisMasterServer extends IrisServer {
}
}
public void addNode(InetSocketAddress address) {
nodes.put(address, VALUE);
private void addNode(InetSocketAddress address) throws InterruptedException {
var ping = new PingConnection(address);
try {
for (String version : ping.getVersion().get())
addNode(address, version);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
public void addNode(InetSocketAddress address, String version) {
nodes.computeIfAbsent(version, v -> new KSet<>()).add(address);
}
public void removeNode(InetSocketAddress address) {
nodes.remove(address);
for (var set : nodes.values()) {
set.remove(address);
}
}
public static void close(UUID session) {
@ -55,17 +69,21 @@ public class IrisMasterServer extends IrisServer {
map.clear();
}
public static KMap<IrisMasterClient, KMap<UUID, PregenHolder>> getNodes(IrisMasterSession session) {
public static KList<String> getVersions() {
return get().nodes.k();
}
public static KMap<IrisMasterClient, KMap<UUID, PregenHolder>> getNodes(String version, IrisMasterSession session) {
var master = get();
var uuid = session.getUuid();
close(uuid);
master.getLogger().info("Requesting nodes for session " + uuid);
var map = new KMap<IrisMasterClient, KMap<UUID, PregenHolder>>();
for (var address : master.nodes.keySet()) {
for (var address : master.nodes.getOrDefault(version, new KSet<>())) {
try {
map.put(IrisConnection.connect(address, () -> new IrisMasterClient(session)), new KMap<>());
} catch (Exception e) {
map.put(IrisConnection.connect(address, new IrisMasterClient(version, session)), new KMap<>());
} catch (Throwable e) {
master.getLogger().log(Level.WARNING, "Failed to connect to server " + address, e);
master.removeNode(address);
}

View File

@ -7,10 +7,13 @@ import com.volmit.iris.server.packet.Packets;
import com.volmit.iris.server.packet.init.EnginePacket;
import com.volmit.iris.server.packet.init.FilePacket;
import com.volmit.iris.server.packet.init.InfoPacket;
import com.volmit.iris.server.packet.init.PingPacket;
import com.volmit.iris.server.packet.work.ChunkPacket;
import com.volmit.iris.server.packet.work.DonePacket;
import com.volmit.iris.server.packet.work.MantleChunkPacket;
import com.volmit.iris.server.packet.work.PregenPacket;
import com.volmit.iris.server.util.*;
import com.volmit.iris.util.collection.KList;
import com.volmit.iris.util.collection.KMap;
import lombok.Getter;
import lombok.extern.java.Log;
@ -19,35 +22,50 @@ import java.util.Comparator;
import java.util.UUID;
import java.util.logging.Level;
//TODO handle done packet
@Log(topic = "IrisMasterSession")
public class IrisMasterSession implements ConnectionHolder, PacketListener {
private final @Getter IrisConnection connection = new IrisConnection(this);
private final @Getter UUID uuid = UUID.randomUUID();
private final KMap<UUID, IrisMasterClient> map = new KMap<>();
private final CPSLooper cpsLooper = new CPSLooper("IrisMasterSession-" + uuid, connection);
private final KMap<UUID, CompletingHolder> waiting = new KMap<>();
private KMap<IrisMasterClient, KMap<UUID, PregenHolder>> clients;
private int radius = -1;
@Override
public void onPacket(Packet raw) throws Exception {
if (clients == null) {
clients = IrisMasterServer.getNodes(this);
cpsLooper.setNodeCount(clients.size());
if (raw instanceof PingPacket packet) {
var versions = packet.getVersion();
PacketSendListener listener = versions.size() != 1 ? PacketSendListener.thenRun(connection::disconnect) : null;
if (listener == null) {
clients = IrisMasterServer.getNodes(versions.get(0), this);
if (clients.isEmpty()) {
connection.disconnect();
return;
}
var info = Packets.INFO.newPacket();
info.setNodeCount(clients.size());
connection.send(info);
var nodeCount = clients.keySet()
.stream()
.mapToInt(IrisMasterClient::getNodeCount)
.sum();
cpsLooper.setNodeCount(nodeCount);
}
packet.setVersion(IrisMasterServer.getVersions())
.send(connection, listener);
} else throw new RejectedException("Not a ping packet");
}
if (raw instanceof FilePacket packet) {
if (radius != -1)
throw new RejectedException("Engine already setup");
waiting.put(packet.getId(), new CompletingHolder(clients.k()));
clients.keySet().forEach(client -> client.send(packet));
} else if (raw instanceof EnginePacket packet) {
if (radius != -1)
throw new RejectedException("Engine already setup");
radius = packet.getRadius();
waiting.put(packet.getId(), new CompletingHolder(clients.k()));
clients.keySet().forEach(client -> client.send(packet));
} else if (raw instanceof PregenPacket packet) {
if (radius == -1)
@ -64,13 +82,13 @@ public class IrisMasterSession implements ConnectionHolder, PacketListener {
}
}
protected void onClientPacket(Packet raw) {
protected void onClientPacket(IrisMasterClient client, Packet raw) {
if (raw instanceof ErrorPacket packet) {
packet.log(log, Level.SEVERE);
} else if (raw instanceof ChunkPacket) {
connection.send(raw);
} else if (raw instanceof MantleChunkPacket packet) {
var map = clients.get(this.map.get(packet.getPregenId()));
var map = clients.get(client);
if (map.get(packet.getPregenId())
.remove(packet.getX(), packet.getZ()))
map.get(packet.getPregenId());
@ -81,6 +99,12 @@ public class IrisMasterSession implements ConnectionHolder, PacketListener {
} else if (raw instanceof InfoPacket packet) {
int i = packet.getGenerated();
if (i != -1) cpsLooper.addChunks(i);
} else if (raw instanceof DonePacket packet) {
var holder = waiting.get(packet.getId());
if (holder.remove(client)) {
connection.send(Packets.DONE.newPacket().setId(packet.getId()));
waiting.remove(packet.getId());
}
}
}
@ -95,4 +119,12 @@ public class IrisMasterSession implements ConnectionHolder, PacketListener {
.min(Comparator.comparingInt(c -> clients.get(c).size()))
.orElseThrow(() -> new RejectedException("No clients available"));
}
private record CompletingHolder(KList<IrisMasterClient> clients) {
public synchronized boolean remove(IrisMasterClient client) {
clients.remove(client);
return clients.isEmpty();
}
}
}

View File

@ -0,0 +1,32 @@
package com.volmit.iris.server.master;
import com.volmit.iris.server.IrisConnection;
import com.volmit.iris.server.packet.Packet;
import com.volmit.iris.server.packet.Packets;
import com.volmit.iris.server.packet.init.PingPacket;
import com.volmit.iris.server.util.ConnectionHolder;
import com.volmit.iris.server.util.PacketListener;
import com.volmit.iris.util.collection.KList;
import lombok.Getter;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
@Getter
public class PingConnection implements ConnectionHolder, PacketListener {
private final IrisConnection connection = new IrisConnection(this);
private final CompletableFuture<KList<String>> version = new CompletableFuture<>();
public PingConnection(InetSocketAddress address) throws InterruptedException {
IrisConnection.connect(address, this);
Packets.PING.newPacket().send(connection);
}
@Override
public void onPacket(Packet packet) throws Exception {
if (packet instanceof PingPacket p) {
version.complete(p.getVersion());
connection.disconnect();
}
}
}

View File

@ -10,6 +10,7 @@ import com.volmit.iris.server.packet.Packet;
import com.volmit.iris.server.packet.Packets;
import com.volmit.iris.server.packet.init.EnginePacket;
import com.volmit.iris.server.packet.init.FilePacket;
import com.volmit.iris.server.packet.init.PingPacket;
import com.volmit.iris.server.util.CPSLooper;
import com.volmit.iris.server.util.ConnectionHolder;
import com.volmit.iris.server.util.PacketListener;
@ -85,6 +86,8 @@ public class IrisSession implements ConnectionHolder, PacketListener {
chunks.put(packet.getId(), holder);
connection.send(request);
} else if (raw instanceof PingPacket packet) {
packet.setBukkit().send(connection);
} else throw new RejectedException("Unhandled packet: " + raw.getClass().getSimpleName());
}

View File

@ -3,6 +3,7 @@ package com.volmit.iris.server.packet;
import com.volmit.iris.server.packet.init.EnginePacket;
import com.volmit.iris.server.packet.init.FilePacket;
import com.volmit.iris.server.packet.init.InfoPacket;
import com.volmit.iris.server.packet.init.PingPacket;
import com.volmit.iris.server.packet.work.DonePacket;
import com.volmit.iris.server.util.ErrorPacket;
import com.volmit.iris.server.packet.work.ChunkPacket;
@ -23,6 +24,7 @@ public class Packets<T extends Packet> {
public static final Packets<ErrorPacket> ERROR;
public static final Packets<InfoPacket> INFO;
public static final Packets<PingPacket> PING;
public static final Packets<FilePacket> FILE;
public static final Packets<EnginePacket> ENGINE;
@ -69,6 +71,7 @@ public class Packets<T extends Packet> {
static {
ERROR = new Packets<>(ErrorPacket.class, ErrorPacket::new);
INFO = new Packets<>(InfoPacket.class, InfoPacket::new);
PING = new Packets<>(PingPacket.class, PingPacket::new);
FILE = new Packets<>(FilePacket.class, FilePacket::new);
ENGINE = new Packets<>(EnginePacket.class, EnginePacket::new);
@ -78,7 +81,7 @@ public class Packets<T extends Packet> {
MANTLE_CHUNK = new Packets<>(MantleChunkPacket.class, MantleChunkPacket::new);
MANTLE_CHUNK_REQUEST = new Packets<>(MantleChunkPacket.Request.class, MantleChunkPacket.Request::new);
REGISTRY = List.of(ERROR, INFO, FILE, ENGINE, DONE, PREGEN, CHUNK, MANTLE_CHUNK, MANTLE_CHUNK_REQUEST);
REGISTRY = List.of(ERROR, INFO, PING, FILE, ENGINE, DONE, PREGEN, CHUNK, MANTLE_CHUNK, MANTLE_CHUNK_REQUEST);
var map = new HashMap<Class<? extends Packet>, Packets<? extends Packet>>();
for (int i = 0; i < REGISTRY.size(); i++) {

View File

@ -0,0 +1,58 @@
package com.volmit.iris.server.packet.init;
import com.volmit.iris.server.packet.Packet;
import com.volmit.iris.server.util.ByteBufUtil;
import com.volmit.iris.util.collection.KList;
import io.netty.buffer.ByteBuf;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.bukkit.Bukkit;
import java.io.IOException;
import java.util.UUID;
@Data
@Accessors(chain = true)
@NoArgsConstructor
public class PingPacket implements Packet {
private UUID id = UUID.randomUUID();
private KList<String> version = new KList<>();
@Override
public void read(ByteBuf byteBuf) throws IOException {
id = new UUID(byteBuf.readLong(), byteBuf.readLong());
int size = byteBuf.readInt();
version = new KList<>();
for (int i = 0; i < size; i++) {
version.add(ByteBufUtil.readString(byteBuf));
}
}
@Override
public void write(ByteBuf byteBuf) throws IOException {
byteBuf.writeLong(id.getMostSignificantBits());
byteBuf.writeLong(id.getLeastSignificantBits());
byteBuf.writeInt(version.size());
for (String s : version) {
ByteBufUtil.writeString(byteBuf, s);
}
}
public PingPacket setBukkit() {
this.version = new KList<>(Bukkit.getBukkitVersion().split("-")[0]);
return this;
}
public PingPacket setVersion(String version) {
this.version = new KList<>(version);
return this;
}
public PingPacket setVersion(KList<String> version) {
this.version = version;
return this;
}
}

View File

@ -13,6 +13,7 @@ import com.volmit.iris.server.execption.RejectedException;
import com.volmit.iris.server.packet.Packet;
import com.volmit.iris.server.packet.Packets;
import com.volmit.iris.server.packet.init.InfoPacket;
import com.volmit.iris.server.packet.init.PingPacket;
import com.volmit.iris.server.packet.work.ChunkPacket;
import com.volmit.iris.server.packet.work.DonePacket;
import com.volmit.iris.server.packet.work.MantleChunkPacket;
@ -32,7 +33,6 @@ import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@ -45,14 +45,14 @@ public class CloudMethod implements PregeneratorMethod, ConnectionHolder, Packet
private final IHeadless headless;
private final KMap<UUID, PregenHolder> holders = new KMap<>();
private final CompletableFuture<LimitedSemaphore> future = new CompletableFuture<>();
private final KMap<UUID, CompletableFuture<?>> locks = new KMap<>();
private final KMap<UUID, CompletableFuture<Object>> locks = new KMap<>();
public CloudMethod(String address, Engine engine) throws InterruptedException {
var split = address.split(":");
if (split.length != 2 || !split[1].matches("\\d+"))
throw new IllegalArgumentException("Invalid remote server address: " + address);
IrisConnection.connect(new InetSocketAddress(split[0], Integer.parseInt(split[1])), () -> this);
IrisConnection.connect(new InetSocketAddress(split[0], Integer.parseInt(split[1])), this);
this.engine = engine;
this.headless = INMS.get().createHeadless(engine);
@ -68,6 +68,24 @@ public class CloudMethod implements PregeneratorMethod, ConnectionHolder, Packet
var exit = new AtomicBoolean(false);
var limited = new LimitedSemaphore(IrisSettings.getThreadCount(IrisSettings.get().getConcurrency().getParallelism()));
var remoteVersion = new CompletableFuture<>();
var ping = Packets.PING.newPacket()
.setBukkit();
locks.put(ping.getId(), remoteVersion);
ping.send(connection);
try {
var o = remoteVersion.get();
if (!(o instanceof PingPacket packet))
throw new IllegalStateException("Invalid response from remote server");
if (!packet.getVersion().contains(ping.getVersion().get(0)))
throw new IllegalStateException("Remote server version does not match");
} catch (Throwable e) {
connection.disconnect();
throw new IllegalStateException("Failed to connect to remote server", e);
}
log.info(name + ": Uploading pack...");
iterate(engine.getData().getDataFolder(), f -> {
if (exit.get()) return;
@ -94,7 +112,7 @@ public class CloudMethod implements PregeneratorMethod, ConnectionHolder, Packet
log.info(name + ": Done uploading pack");
log.info(name + ": Initializing Engine...");
CompletableFuture<?> future = new CompletableFuture<>();
var future = new CompletableFuture<>();
var packet = Packets.ENGINE.newPacket()
.setDimension(engine.getDimension().getLoadKey())
.setSeed(engine.getWorld().getRawWorldSeed())
@ -120,7 +138,7 @@ public class CloudMethod implements PregeneratorMethod, ConnectionHolder, Packet
long offset = 0;
byte[] data;
while ((data = in.readNBytes(packetSize)).length > 0 && !exit.get()) {
CompletableFuture<?> future = new CompletableFuture<>();
var future = new CompletableFuture<>();
var packet = Packets.FILE.newPacket()
.setPath(path)
.setOffset(offset)
@ -234,6 +252,8 @@ public class CloudMethod implements PregeneratorMethod, ConnectionHolder, Packet
// Iris.info("Cloud CPS: " + packet.getCps());
} else if (raw instanceof DonePacket packet) {
locks.remove(packet.getId()).complete(null);
} else if (raw instanceof PingPacket packet) {
locks.remove(packet.getId()).complete(packet);
} else if (raw instanceof ErrorPacket packet) {
packet.log(log, Level.SEVERE);
} else throw new RejectedException("Unhandled packet: " + raw.getClass().getSimpleName());