From 5fd105c9a90a34b27706bb7a0b715a757f5d3008 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Sat, 5 Mar 2016 17:56:42 -0600 Subject: [PATCH] Implement ENet for control and input streams --- .../com/limelight/nvstream/NvConnection.java | 2 +- .../nvstream/control/ControlStream.java | 103 +++++++++++++----- .../nvstream/control/InputPacketSender.java | 7 ++ .../nvstream/enet/EnetConnection.java | 79 ++++++++++++++ .../nvstream/input/ControllerStream.java | 43 ++++++-- .../limelight/nvstream/rtsp/SdpGenerator.java | 7 +- 6 files changed, 200 insertions(+), 41 deletions(-) create mode 100644 moonlight-common/src/com/limelight/nvstream/control/InputPacketSender.java create mode 100644 moonlight-common/src/com/limelight/nvstream/enet/EnetConnection.java diff --git a/moonlight-common/src/com/limelight/nvstream/NvConnection.java b/moonlight-common/src/com/limelight/nvstream/NvConnection.java index 02080fba..1744d372 100644 --- a/moonlight-common/src/com/limelight/nvstream/NvConnection.java +++ b/moonlight-common/src/com/limelight/nvstream/NvConnection.java @@ -292,7 +292,7 @@ public class NvConnection { // This avoids the race where inputStream != null but inputStream.initialize() // has not returned yet. ControllerStream tempController = new ControllerStream(context); - tempController.initialize(); + tempController.initialize(controlStream); tempController.start(); inputStream = tempController; return true; diff --git a/moonlight-common/src/com/limelight/nvstream/control/ControlStream.java b/moonlight-common/src/com/limelight/nvstream/control/ControlStream.java index 63b256d0..4d5659e3 100644 --- a/moonlight-common/src/com/limelight/nvstream/control/ControlStream.java +++ b/moonlight-common/src/com/limelight/nvstream/control/ControlStream.java @@ -13,11 +13,13 @@ import com.limelight.LimeLog; import com.limelight.nvstream.ConnectionContext; import com.limelight.nvstream.av.ConnectionStatusListener; import com.limelight.nvstream.av.video.VideoDecoderRenderer; +import com.limelight.nvstream.enet.EnetConnection; import com.limelight.utils.TimeHelper; -public class ControlStream implements ConnectionStatusListener { +public class ControlStream implements ConnectionStatusListener, InputPacketSender { - private static final int PORT = 47995; + private static final int TCP_PORT = 47995; + private static final int UDP_PORT = 47999; private static final int CONTROL_TIMEOUT = 10000; @@ -117,6 +119,10 @@ public class ControlStream implements ConnectionStatusListener { private int slowSinkCount; + // Used on Gen 5 servers and above + private EnetConnection enetConnection; + + // Used on Gen 4 servers and below private Socket s; private InputStream in; private OutputStream out; @@ -163,26 +169,43 @@ public class ControlStream implements ConnectionStatusListener { public void initialize() throws IOException { - s = new Socket(); - s.setTcpNoDelay(true); - s.connect(new InetSocketAddress(context.serverAddress, PORT), CONTROL_TIMEOUT); - in = s.getInputStream(); - out = s.getOutputStream(); + if (context.serverGeneration >= ConnectionContext.SERVER_GENERATION_5) { + enetConnection = EnetConnection.connect(context.serverAddress.getHostAddress(), UDP_PORT, CONTROL_TIMEOUT); + } + else { + s = new Socket(); + s.setTcpNoDelay(true); + s.connect(new InetSocketAddress(context.serverAddress, TCP_PORT), CONTROL_TIMEOUT); + in = s.getInputStream(); + out = s.getOutputStream(); + } } private void sendPacket(NvCtlPacket packet) throws IOException { // Prevent multiple clients from writing to the stream at the same time synchronized (this) { - packet.write(out); - out.flush(); + if (context.serverGeneration >= ConnectionContext.SERVER_GENERATION_5) { + packet.write(enetConnection); + } + else { + packet.write(out); + out.flush(); + } } } - private ControlStream.NvCtlResponse sendAndGetReply(NvCtlPacket packet) throws IOException + private void sendAndDiscardReply(NvCtlPacket packet) throws IOException { - sendPacket(packet); - return new NvCtlResponse(in); + synchronized (this) { + sendPacket(packet); + if (context.serverGeneration >= ConnectionContext.SERVER_GENERATION_5) { + enetConnection.readPacket(CONTROL_TIMEOUT); + } + else { + new NvCtlResponse(in); + } + } } private void sendLossStats(ByteBuffer bb) throws IOException @@ -200,6 +223,10 @@ public class ControlStream implements ConnectionStatusListener { payloadLengths[IDX_LOSS_STATS], bb.array())); } + public void sendInputPacket(byte[] data, short length) throws IOException { + sendPacket(new NvCtlPacket((short) 0x0207, length, data)); + } + public void abort() { if (aborting) { @@ -208,9 +235,17 @@ public class ControlStream implements ConnectionStatusListener { aborting = true; - try { - s.close(); - } catch (IOException e) {} + if (s != null) { + try { + s.close(); + } catch (IOException e) {} + } + + if (enetConnection != null) { + try { + enetConnection.close(); + } catch (IOException e) {} + } if (lossStatsThread != null) { lossStatsThread.interrupt(); @@ -232,13 +267,17 @@ public class ControlStream implements ConnectionStatusListener { public void start() throws IOException { // Use a finite timeout during the handshake process - s.setSoTimeout(CONTROL_TIMEOUT); + if (s != null) { + s.setSoTimeout(CONTROL_TIMEOUT); + } doStartA(); doStartB(); // Return to an infinte read timeout after the initial control handshake - s.setSoTimeout(0); + if (s != null) { + s.setSoTimeout(0); + } lossStatsThread = new Thread() { @Override @@ -333,14 +372,14 @@ public class ControlStream implements ConnectionStatusListener { resyncThread.start(); } - private ControlStream.NvCtlResponse doStartA() throws IOException + private void doStartA() throws IOException { - return sendAndGetReply(new NvCtlPacket(packetTypes[IDX_START_A], + sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_START_A], (short) preconstructedPayloads[IDX_START_A].length, preconstructedPayloads[IDX_START_A])); } - private ControlStream.NvCtlResponse doStartB() throws IOException + private void doStartB() throws IOException { // Gen 3 and 5 both use a packet of this form if (context.serverGeneration != ConnectionContext.SERVER_GENERATION_4) { @@ -351,11 +390,11 @@ public class ControlStream implements ConnectionStatusListener { payload.putInt(0); payload.putInt(0xa); - return sendAndGetReply(new NvCtlPacket(packetTypes[IDX_START_B], + sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_START_B], payloadLengths[IDX_START_B], payload.array())); } else { - return sendAndGetReply(new NvCtlPacket(packetTypes[IDX_START_B], + sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_START_B], (short) preconstructedPayloads[IDX_START_B].length, preconstructedPayloads[IDX_START_B])); } @@ -386,11 +425,11 @@ public class ControlStream implements ConnectionStatusListener { } conf.putLong(0); - sendAndGetReply(new NvCtlPacket(packetTypes[IDX_INVALIDATE_REF_FRAMES], + sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_INVALIDATE_REF_FRAMES], payloadLengths[IDX_INVALIDATE_REF_FRAMES], conf.array())); } else { - sendAndGetReply(new NvCtlPacket(packetTypes[IDX_REQUEST_IDR_FRAME], + sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_REQUEST_IDR_FRAME], (short) preconstructedPayloads[IDX_REQUEST_IDR_FRAME].length, preconstructedPayloads[IDX_REQUEST_IDR_FRAME])); } @@ -407,7 +446,7 @@ public class ControlStream implements ConnectionStatusListener { conf.putLong(nextSuccessfulFrame); conf.putLong(0); - sendAndGetReply(new NvCtlPacket(packetTypes[IDX_INVALIDATE_REF_FRAMES], + sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_INVALIDATE_REF_FRAMES], payloadLengths[IDX_INVALIDATE_REF_FRAMES], conf.array())); LimeLog.warning("Reference frame invalidation sent"); @@ -419,7 +458,7 @@ public class ControlStream implements ConnectionStatusListener { public byte[] payload; private static final ByteBuffer headerBuffer = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN); - private static final ByteBuffer serializationBuffer = ByteBuffer.allocate(128).order(ByteOrder.LITTLE_ENDIAN); + private static final ByteBuffer serializationBuffer = ByteBuffer.allocate(256).order(ByteOrder.LITTLE_ENDIAN); public NvCtlPacket(InputStream in) throws IOException { @@ -529,6 +568,18 @@ public class ControlStream implements ConnectionStatusListener { out.write(serializationBuffer.array(), 0, serializationBuffer.position()); } } + + public void write(EnetConnection conn) throws IOException + { + // Use the class's serialization buffer to construct the wireform to send + synchronized (serializationBuffer) { + serializationBuffer.rewind(); + serializationBuffer.putShort(type); + serializationBuffer.put(payload); + + conn.writePacket(serializationBuffer); + } + } } class NvCtlResponse extends NvCtlPacket { diff --git a/moonlight-common/src/com/limelight/nvstream/control/InputPacketSender.java b/moonlight-common/src/com/limelight/nvstream/control/InputPacketSender.java new file mode 100644 index 00000000..186dbf76 --- /dev/null +++ b/moonlight-common/src/com/limelight/nvstream/control/InputPacketSender.java @@ -0,0 +1,7 @@ +package com.limelight.nvstream.control; + +import java.io.IOException; + +public interface InputPacketSender { + void sendInputPacket(byte[] data, short length) throws IOException; +} diff --git a/moonlight-common/src/com/limelight/nvstream/enet/EnetConnection.java b/moonlight-common/src/com/limelight/nvstream/enet/EnetConnection.java new file mode 100644 index 00000000..d7eb6602 --- /dev/null +++ b/moonlight-common/src/com/limelight/nvstream/enet/EnetConnection.java @@ -0,0 +1,79 @@ +package com.limelight.nvstream.enet; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class EnetConnection implements Closeable { + private long enetPeer; + private long enetClient; + + private static final int ENET_PACKET_FLAG_RELIABLE = 1; + + static { + System.loadLibrary("jnienet"); + + initializeEnet(); + } + + private EnetConnection() {} + + public static EnetConnection connect(String host, int port, int timeout) throws IOException { + EnetConnection conn = new EnetConnection(); + + conn.enetClient = createClient(); + if (conn.enetClient == 0) { + throw new IOException("Unable to create ENet client"); + } + + conn.enetPeer = connectToPeer(conn.enetClient, host, port, timeout); + if (conn.enetPeer == 0) { + try { + conn.close(); + } catch (IOException e) {} + throw new IOException("Unable to connect to UDP port "+port); + } + + return conn; + } + + public ByteBuffer readPacket(int timeout) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(128); + + int readLength = readPacket(enetClient, buffer.array(), buffer.limit(), timeout); + if (readLength <= 0) { + throw new IOException("Failed to receive ENet packet"); + } + + buffer.limit(readLength); + + return buffer; + } + + public void writePacket(ByteBuffer buffer) throws IOException { + if (!writePacket(enetClient, enetPeer, buffer.array(), buffer.position(), ENET_PACKET_FLAG_RELIABLE)) { + throw new IOException("Failed to send ENet packet"); + } + } + + @Override + public void close() throws IOException { + if (enetPeer != 0) { + disconnectPeer(enetPeer); + enetPeer = 0; + } + + if (enetClient != 0) { + destroyClient(enetClient); + enetClient = 0; + } + } + + private static native int initializeEnet(); + private static native long createClient(); + private static native long connectToPeer(long client, String host, int port, int timeout); + private static native int readPacket(long client, byte[] data, int length, int timeout); + private static native boolean writePacket(long client, long peer, byte[] data, int length, int packetFlags); + private static native void destroyClient(long client); + private static native void disconnectPeer(long peer); +} diff --git a/moonlight-common/src/com/limelight/nvstream/input/ControllerStream.java b/moonlight-common/src/com/limelight/nvstream/input/ControllerStream.java index c566b5ed..ea506659 100644 --- a/moonlight-common/src/com/limelight/nvstream/input/ControllerStream.java +++ b/moonlight-common/src/com/limelight/nvstream/input/ControllerStream.java @@ -17,6 +17,7 @@ import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.IvParameterSpec; import com.limelight.nvstream.ConnectionContext; +import com.limelight.nvstream.control.InputPacketSender; public class ControllerStream { @@ -26,8 +27,13 @@ public class ControllerStream { private ConnectionContext context; + // Only used on Gen 4 or below servers private Socket s; private OutputStream out; + + // Used on Gen 5+ servers + private InputPacketSender controlSender; + private Cipher riCipher; private Thread inputThread; @@ -58,12 +64,19 @@ public class ControllerStream { } } - public void initialize() throws IOException + public void initialize(InputPacketSender controlSender) throws IOException { - s = new Socket(); - s.connect(new InetSocketAddress(context.serverAddress, PORT), CONTROLLER_TIMEOUT); - s.setTcpNoDelay(true); - out = s.getOutputStream(); + if (context.serverGeneration >= ConnectionContext.SERVER_GENERATION_5) { + // Gen 5 sends input over the control stream + this.controlSender = controlSender; + } + else { + // Gen 4 and below uses a separate TCP connection for input + s = new Socket(); + s.connect(new InetSocketAddress(context.serverAddress, PORT), CONTROLLER_TIMEOUT); + s.setTcpNoDelay(true); + out = s.getOutputStream(); + } } public void start() @@ -197,9 +210,11 @@ public class ControllerStream { } catch (InterruptedException e) {} } - try { - s.close(); - } catch (IOException e) {} + if (s != null) { + try { + s.close(); + } catch (IOException e) {} + } } private static int getPaddedSize(int length) { @@ -249,9 +264,15 @@ public class ControllerStream { return; } - // Send the packet - out.write(sendBuffer.array(), 0, paddedLength + 4); - out.flush(); + // Send the packet over the control stream on Gen 5+ + if (context.serverGeneration >= ConnectionContext.SERVER_GENERATION_5) { + controlSender.sendInputPacket(sendBuffer.array(), (short) (paddedLength + 4)); + } + else { + // Send the packet over the TCP connection on Gen 4 and below + out.write(sendBuffer.array(), 0, paddedLength + 4); + out.flush(); + } } private void queuePacket(InputPacket packet) { diff --git a/moonlight-common/src/com/limelight/nvstream/rtsp/SdpGenerator.java b/moonlight-common/src/com/limelight/nvstream/rtsp/SdpGenerator.java index 26ea44fe..3d7c85aa 100644 --- a/moonlight-common/src/com/limelight/nvstream/rtsp/SdpGenerator.java +++ b/moonlight-common/src/com/limelight/nvstream/rtsp/SdpGenerator.java @@ -58,10 +58,11 @@ public class SdpGenerator { } private static void addGen5Attributes(StringBuilder config, ConnectionContext context) { - // We want to use the legacy TCP connections for control and input rather than the new UDP stuff - addSessionAttribute(config, "x-nv-general.useReliableUdp", "0"); - addSessionAttribute(config, "x-nv-ri.useControlChannel", "0"); + // We want to use the new ENet connections for control and input + addSessionAttribute(config, "x-nv-general.useReliableUdp", "1"); + addSessionAttribute(config, "x-nv-ri.useControlChannel", "1"); + // Disable dynamic resolution switching addSessionAttribute(config, "x-nv-vqos[0].drc.enable", "0"); }