Implement ENet for control and input streams

This commit is contained in:
Cameron Gutman 2016-03-05 17:56:42 -06:00
parent 306c2d143b
commit 5fd105c9a9
6 changed files with 200 additions and 41 deletions

View File

@ -292,7 +292,7 @@ public class NvConnection {
// This avoids the race where inputStream != null but inputStream.initialize()
// has not returned yet.
ControllerStream tempController = new ControllerStream(context);
tempController.initialize();
tempController.initialize(controlStream);
tempController.start();
inputStream = tempController;
return true;

View File

@ -13,11 +13,13 @@ import com.limelight.LimeLog;
import com.limelight.nvstream.ConnectionContext;
import com.limelight.nvstream.av.ConnectionStatusListener;
import com.limelight.nvstream.av.video.VideoDecoderRenderer;
import com.limelight.nvstream.enet.EnetConnection;
import com.limelight.utils.TimeHelper;
public class ControlStream implements ConnectionStatusListener {
public class ControlStream implements ConnectionStatusListener, InputPacketSender {
private static final int PORT = 47995;
private static final int TCP_PORT = 47995;
private static final int UDP_PORT = 47999;
private static final int CONTROL_TIMEOUT = 10000;
@ -117,6 +119,10 @@ public class ControlStream implements ConnectionStatusListener {
private int slowSinkCount;
// Used on Gen 5 servers and above
private EnetConnection enetConnection;
// Used on Gen 4 servers and below
private Socket s;
private InputStream in;
private OutputStream out;
@ -163,26 +169,43 @@ public class ControlStream implements ConnectionStatusListener {
public void initialize() throws IOException
{
if (context.serverGeneration >= ConnectionContext.SERVER_GENERATION_5) {
enetConnection = EnetConnection.connect(context.serverAddress.getHostAddress(), UDP_PORT, CONTROL_TIMEOUT);
}
else {
s = new Socket();
s.setTcpNoDelay(true);
s.connect(new InetSocketAddress(context.serverAddress, PORT), CONTROL_TIMEOUT);
s.connect(new InetSocketAddress(context.serverAddress, TCP_PORT), CONTROL_TIMEOUT);
in = s.getInputStream();
out = s.getOutputStream();
}
}
private void sendPacket(NvCtlPacket packet) throws IOException
{
// Prevent multiple clients from writing to the stream at the same time
synchronized (this) {
if (context.serverGeneration >= ConnectionContext.SERVER_GENERATION_5) {
packet.write(enetConnection);
}
else {
packet.write(out);
out.flush();
}
}
}
private ControlStream.NvCtlResponse sendAndGetReply(NvCtlPacket packet) throws IOException
private void sendAndDiscardReply(NvCtlPacket packet) throws IOException
{
synchronized (this) {
sendPacket(packet);
return new NvCtlResponse(in);
if (context.serverGeneration >= ConnectionContext.SERVER_GENERATION_5) {
enetConnection.readPacket(CONTROL_TIMEOUT);
}
else {
new NvCtlResponse(in);
}
}
}
private void sendLossStats(ByteBuffer bb) throws IOException
@ -200,6 +223,10 @@ public class ControlStream implements ConnectionStatusListener {
payloadLengths[IDX_LOSS_STATS], bb.array()));
}
public void sendInputPacket(byte[] data, short length) throws IOException {
sendPacket(new NvCtlPacket((short) 0x0207, length, data));
}
public void abort()
{
if (aborting) {
@ -208,9 +235,17 @@ public class ControlStream implements ConnectionStatusListener {
aborting = true;
if (s != null) {
try {
s.close();
} catch (IOException e) {}
}
if (enetConnection != null) {
try {
enetConnection.close();
} catch (IOException e) {}
}
if (lossStatsThread != null) {
lossStatsThread.interrupt();
@ -232,13 +267,17 @@ public class ControlStream implements ConnectionStatusListener {
public void start() throws IOException
{
// Use a finite timeout during the handshake process
if (s != null) {
s.setSoTimeout(CONTROL_TIMEOUT);
}
doStartA();
doStartB();
// Return to an infinte read timeout after the initial control handshake
if (s != null) {
s.setSoTimeout(0);
}
lossStatsThread = new Thread() {
@Override
@ -333,14 +372,14 @@ public class ControlStream implements ConnectionStatusListener {
resyncThread.start();
}
private ControlStream.NvCtlResponse doStartA() throws IOException
private void doStartA() throws IOException
{
return sendAndGetReply(new NvCtlPacket(packetTypes[IDX_START_A],
sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_START_A],
(short) preconstructedPayloads[IDX_START_A].length,
preconstructedPayloads[IDX_START_A]));
}
private ControlStream.NvCtlResponse doStartB() throws IOException
private void doStartB() throws IOException
{
// Gen 3 and 5 both use a packet of this form
if (context.serverGeneration != ConnectionContext.SERVER_GENERATION_4) {
@ -351,11 +390,11 @@ public class ControlStream implements ConnectionStatusListener {
payload.putInt(0);
payload.putInt(0xa);
return sendAndGetReply(new NvCtlPacket(packetTypes[IDX_START_B],
sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_START_B],
payloadLengths[IDX_START_B], payload.array()));
}
else {
return sendAndGetReply(new NvCtlPacket(packetTypes[IDX_START_B],
sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_START_B],
(short) preconstructedPayloads[IDX_START_B].length,
preconstructedPayloads[IDX_START_B]));
}
@ -386,11 +425,11 @@ public class ControlStream implements ConnectionStatusListener {
}
conf.putLong(0);
sendAndGetReply(new NvCtlPacket(packetTypes[IDX_INVALIDATE_REF_FRAMES],
sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_INVALIDATE_REF_FRAMES],
payloadLengths[IDX_INVALIDATE_REF_FRAMES], conf.array()));
}
else {
sendAndGetReply(new NvCtlPacket(packetTypes[IDX_REQUEST_IDR_FRAME],
sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_REQUEST_IDR_FRAME],
(short) preconstructedPayloads[IDX_REQUEST_IDR_FRAME].length,
preconstructedPayloads[IDX_REQUEST_IDR_FRAME]));
}
@ -407,7 +446,7 @@ public class ControlStream implements ConnectionStatusListener {
conf.putLong(nextSuccessfulFrame);
conf.putLong(0);
sendAndGetReply(new NvCtlPacket(packetTypes[IDX_INVALIDATE_REF_FRAMES],
sendAndDiscardReply(new NvCtlPacket(packetTypes[IDX_INVALIDATE_REF_FRAMES],
payloadLengths[IDX_INVALIDATE_REF_FRAMES], conf.array()));
LimeLog.warning("Reference frame invalidation sent");
@ -419,7 +458,7 @@ public class ControlStream implements ConnectionStatusListener {
public byte[] payload;
private static final ByteBuffer headerBuffer = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
private static final ByteBuffer serializationBuffer = ByteBuffer.allocate(128).order(ByteOrder.LITTLE_ENDIAN);
private static final ByteBuffer serializationBuffer = ByteBuffer.allocate(256).order(ByteOrder.LITTLE_ENDIAN);
public NvCtlPacket(InputStream in) throws IOException
{
@ -529,6 +568,18 @@ public class ControlStream implements ConnectionStatusListener {
out.write(serializationBuffer.array(), 0, serializationBuffer.position());
}
}
public void write(EnetConnection conn) throws IOException
{
// Use the class's serialization buffer to construct the wireform to send
synchronized (serializationBuffer) {
serializationBuffer.rewind();
serializationBuffer.putShort(type);
serializationBuffer.put(payload);
conn.writePacket(serializationBuffer);
}
}
}
class NvCtlResponse extends NvCtlPacket {

View File

@ -0,0 +1,7 @@
package com.limelight.nvstream.control;
import java.io.IOException;
public interface InputPacketSender {
void sendInputPacket(byte[] data, short length) throws IOException;
}

View File

@ -0,0 +1,79 @@
package com.limelight.nvstream.enet;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
public class EnetConnection implements Closeable {
private long enetPeer;
private long enetClient;
private static final int ENET_PACKET_FLAG_RELIABLE = 1;
static {
System.loadLibrary("jnienet");
initializeEnet();
}
private EnetConnection() {}
public static EnetConnection connect(String host, int port, int timeout) throws IOException {
EnetConnection conn = new EnetConnection();
conn.enetClient = createClient();
if (conn.enetClient == 0) {
throw new IOException("Unable to create ENet client");
}
conn.enetPeer = connectToPeer(conn.enetClient, host, port, timeout);
if (conn.enetPeer == 0) {
try {
conn.close();
} catch (IOException e) {}
throw new IOException("Unable to connect to UDP port "+port);
}
return conn;
}
public ByteBuffer readPacket(int timeout) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(128);
int readLength = readPacket(enetClient, buffer.array(), buffer.limit(), timeout);
if (readLength <= 0) {
throw new IOException("Failed to receive ENet packet");
}
buffer.limit(readLength);
return buffer;
}
public void writePacket(ByteBuffer buffer) throws IOException {
if (!writePacket(enetClient, enetPeer, buffer.array(), buffer.position(), ENET_PACKET_FLAG_RELIABLE)) {
throw new IOException("Failed to send ENet packet");
}
}
@Override
public void close() throws IOException {
if (enetPeer != 0) {
disconnectPeer(enetPeer);
enetPeer = 0;
}
if (enetClient != 0) {
destroyClient(enetClient);
enetClient = 0;
}
}
private static native int initializeEnet();
private static native long createClient();
private static native long connectToPeer(long client, String host, int port, int timeout);
private static native int readPacket(long client, byte[] data, int length, int timeout);
private static native boolean writePacket(long client, long peer, byte[] data, int length, int packetFlags);
private static native void destroyClient(long client);
private static native void disconnectPeer(long peer);
}

View File

@ -17,6 +17,7 @@ import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.IvParameterSpec;
import com.limelight.nvstream.ConnectionContext;
import com.limelight.nvstream.control.InputPacketSender;
public class ControllerStream {
@ -26,8 +27,13 @@ public class ControllerStream {
private ConnectionContext context;
// Only used on Gen 4 or below servers
private Socket s;
private OutputStream out;
// Used on Gen 5+ servers
private InputPacketSender controlSender;
private Cipher riCipher;
private Thread inputThread;
@ -58,13 +64,20 @@ public class ControllerStream {
}
}
public void initialize() throws IOException
public void initialize(InputPacketSender controlSender) throws IOException
{
if (context.serverGeneration >= ConnectionContext.SERVER_GENERATION_5) {
// Gen 5 sends input over the control stream
this.controlSender = controlSender;
}
else {
// Gen 4 and below uses a separate TCP connection for input
s = new Socket();
s.connect(new InetSocketAddress(context.serverAddress, PORT), CONTROLLER_TIMEOUT);
s.setTcpNoDelay(true);
out = s.getOutputStream();
}
}
public void start()
{
@ -197,10 +210,12 @@ public class ControllerStream {
} catch (InterruptedException e) {}
}
if (s != null) {
try {
s.close();
} catch (IOException e) {}
}
}
private static int getPaddedSize(int length) {
return ((length + 15) / 16) * 16;
@ -249,10 +264,16 @@ public class ControllerStream {
return;
}
// Send the packet
// Send the packet over the control stream on Gen 5+
if (context.serverGeneration >= ConnectionContext.SERVER_GENERATION_5) {
controlSender.sendInputPacket(sendBuffer.array(), (short) (paddedLength + 4));
}
else {
// Send the packet over the TCP connection on Gen 4 and below
out.write(sendBuffer.array(), 0, paddedLength + 4);
out.flush();
}
}
private void queuePacket(InputPacket packet) {
synchronized (inputQueue) {

View File

@ -58,10 +58,11 @@ public class SdpGenerator {
}
private static void addGen5Attributes(StringBuilder config, ConnectionContext context) {
// We want to use the legacy TCP connections for control and input rather than the new UDP stuff
addSessionAttribute(config, "x-nv-general.useReliableUdp", "0");
addSessionAttribute(config, "x-nv-ri.useControlChannel", "0");
// We want to use the new ENet connections for control and input
addSessionAttribute(config, "x-nv-general.useReliableUdp", "1");
addSessionAttribute(config, "x-nv-ri.useControlChannel", "1");
// Disable dynamic resolution switching
addSessionAttribute(config, "x-nv-vqos[0].drc.enable", "0");
}