Rename NvController -> ControllerStream. Use a dedicated input thread rather than a thread pool for processing input. Batch analog axis updates and mouse moves.

This commit is contained in:
Cameron Gutman 2014-08-20 22:27:25 -07:00
parent 6556b3eb9b
commit c9eee2e075
4 changed files with 384 additions and 203 deletions

View File

@ -5,9 +5,6 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
@ -25,7 +22,7 @@ import com.limelight.nvstream.http.LimelightCryptoProvider;
import com.limelight.nvstream.http.NvApp;
import com.limelight.nvstream.http.NvHTTP;
import com.limelight.nvstream.http.PairingManager;
import com.limelight.nvstream.input.NvController;
import com.limelight.nvstream.input.ControllerStream;
import com.limelight.nvstream.rtsp.RtspConnection;
public class NvConnection {
@ -37,7 +34,7 @@ public class NvConnection {
private InetAddress hostAddr;
private ControlStream controlStream;
private NvController inputStream;
private ControllerStream inputStream;
private VideoStream videoStream;
private AudioStream audioStream;
@ -50,8 +47,6 @@ public class NvConnection {
private SecretKey riKey;
private int riKeyId;
private ThreadPoolExecutor threadPool;
public NvConnection(String host, String uniqueId, NvConnectionListener listener, StreamConfiguration config, LimelightCryptoProvider cryptoProvider)
{
this.host = host;
@ -69,9 +64,6 @@ public class NvConnection {
}
this.riKeyId = generateRiKeyId();
this.threadPool = new ThreadPoolExecutor(1, 1, Long.MAX_VALUE, TimeUnit.DAYS,
new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());
}
private static SecretKey generateRiAesKey() throws NoSuchAlgorithmException {
@ -89,8 +81,6 @@ public class NvConnection {
public void stop()
{
threadPool.shutdownNow();
if (videoStream != null) {
videoStream.abort();
}
@ -103,7 +93,7 @@ public class NvConnection {
}
if (inputStream != null) {
inputStream.close();
inputStream.abort();
inputStream = null;
}
}
@ -222,8 +212,9 @@ public class NvConnection {
// it to the instance variable once the object is properly initialized.
// This avoids the race where inputStream != null but inputStream.initialize()
// has not returned yet.
NvController tempController = new NvController(hostAddr, riKey, riKeyId);
ControllerStream tempController = new ControllerStream(hostAddr, riKey, riKeyId, listener);
tempController.initialize();
tempController.start();
inputStream = tempController;
return true;
}
@ -306,15 +297,7 @@ public class NvConnection {
if (inputStream == null)
return;
threadPool.execute(new Runnable() {
public void run() {
try {
inputStream.sendMouseMove(deltaX, deltaY);
} catch (IOException e) {
listener.connectionTerminated(e);
}
}
});
inputStream.sendMouseMove(deltaX, deltaY);
}
public void sendMouseButtonDown(final byte mouseButton)
@ -322,15 +305,7 @@ public class NvConnection {
if (inputStream == null)
return;
threadPool.execute(new Runnable() {
public void run() {
try {
inputStream.sendMouseButtonDown(mouseButton);
} catch (IOException e) {
listener.connectionTerminated(e);
}
}
});
inputStream.sendMouseButtonDown(mouseButton);
}
public void sendMouseButtonUp(final byte mouseButton)
@ -338,15 +313,7 @@ public class NvConnection {
if (inputStream == null)
return;
threadPool.execute(new Runnable() {
public void run() {
try {
inputStream.sendMouseButtonUp(mouseButton);
} catch (IOException e) {
listener.connectionTerminated(e);
}
}
});
inputStream.sendMouseButtonUp(mouseButton);
}
public void sendControllerInput(final short buttonFlags,
@ -357,31 +324,15 @@ public class NvConnection {
if (inputStream == null)
return;
threadPool.execute(new Runnable() {
public void run() {
try {
inputStream.sendControllerInput(buttonFlags, leftTrigger,
rightTrigger, leftStickX, leftStickY,
rightStickX, rightStickY);
} catch (IOException e) {
listener.connectionTerminated(e);
}
}
});
inputStream.sendControllerInput(buttonFlags, leftTrigger,
rightTrigger, leftStickX, leftStickY,
rightStickX, rightStickY);
}
public void sendKeyboardInput(final short keyMap, final byte keyDirection, final byte modifier) {
if (inputStream == null)
return;
threadPool.execute(new Runnable() {
public void run() {
try {
inputStream.sendKeyboardInput(keyMap, keyDirection, modifier);
} catch (IOException e) {
listener.connectionTerminated(e);
}
}
});
inputStream.sendKeyboardInput(keyMap, keyDirection, modifier);
}
}

View File

@ -0,0 +1,87 @@
package com.limelight.nvstream.input;
public class ControllerBatchingBlock {
private byte[] axisDirs = new byte[6];
private short buttonFlags;
private byte leftTrigger;
private byte rightTrigger;
private short leftStickX;
private short leftStickY;
private short rightStickX;
private short rightStickY;
public ControllerBatchingBlock(ControllerPacket initialPacket) {
this.buttonFlags = initialPacket.buttonFlags;
this.leftTrigger = initialPacket.leftTrigger;
this.rightTrigger = initialPacket.rightTrigger;
this.leftStickX = initialPacket.leftStickX;
this.leftStickY = initialPacket.leftStickY;
this.rightStickX = initialPacket.rightStickX;
this.rightStickY = initialPacket.rightStickY;
}
private boolean checkDirs(short currentVal, short newVal, int dirIndex) {
if (currentVal == newVal) {
return true;
}
// We want to send a packet if we've now zeroed an axis
if (newVal == 0) {
return false;
}
if (axisDirs[dirIndex] == 0) {
if (newVal < currentVal) {
axisDirs[dirIndex] = -1;
}
else {
axisDirs[dirIndex] = 1;
}
}
else if (axisDirs[dirIndex] == -1) {
return newVal < currentVal;
}
else if (newVal < currentVal) {
return false;
}
return true;
}
// Controller packet batching is far more restricted than mouse move batching.
// We have several restrictions that will cause batching to break up the controller packets.
// 1) Button flags must be the same for all packets in the batch
// 2) The movement direction of all axes must remain the same or be neutral
public boolean submitNewPacket(ControllerPacket packet) {
if (buttonFlags != packet.buttonFlags ||
!checkDirs(leftTrigger, packet.leftTrigger, 0) ||
!checkDirs(rightTrigger, packet.rightTrigger, 1) ||
!checkDirs(leftStickX, packet.leftStickX, 2) ||
!checkDirs(leftStickY, packet.leftStickY, 3) ||
!checkDirs(rightStickX, packet.rightStickX, 4) ||
!checkDirs(rightStickY, packet.rightStickY, 5))
{
return false;
}
this.leftTrigger = packet.leftTrigger;
this.rightTrigger = packet.rightTrigger;
this.leftStickX = packet.leftStickX;
this.leftStickY = packet.leftStickY;
this.rightStickX = packet.rightStickX;
this.rightStickY = packet.rightStickY;
return true;
}
public void reinitializePacket(ControllerPacket packet) {
packet.buttonFlags = buttonFlags;
packet.leftTrigger = leftTrigger;
packet.rightTrigger = rightTrigger;
packet.leftStickX = leftStickX;
packet.leftStickY = leftStickY;
packet.rightStickX = rightStickX;
packet.rightStickY = rightStickY;
}
}

View File

@ -0,0 +1,285 @@
package com.limelight.nvstream.input;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
import com.limelight.nvstream.NvConnectionListener;
public class ControllerStream {
public final static int PORT = 35043;
public final static int CONTROLLER_TIMEOUT = 3000;
private InetAddress host;
private Socket s;
private OutputStream out;
private Cipher riCipher;
private NvConnectionListener listener;
private Thread inputThread;
private LinkedBlockingQueue<InputPacket> inputQueue = new LinkedBlockingQueue<InputPacket>();
public ControllerStream(InetAddress host, SecretKey riKey, int riKeyId, NvConnectionListener listener)
{
this.host = host;
this.listener = listener;
try {
// This cipher is guaranteed to be supported
this.riCipher = Cipher.getInstance("AES/CBC/NoPadding");
ByteBuffer bb = ByteBuffer.allocate(16);
bb.putInt(riKeyId);
this.riCipher.init(Cipher.ENCRYPT_MODE, riKey, new IvParameterSpec(bb.array()));
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (NoSuchPaddingException e) {
e.printStackTrace();
} catch (InvalidKeyException e) {
e.printStackTrace();
} catch (InvalidAlgorithmParameterException e) {
e.printStackTrace();
}
}
public void initialize() throws IOException
{
s = new Socket();
s.connect(new InetSocketAddress(host, PORT), CONTROLLER_TIMEOUT);
s.setTcpNoDelay(true);
out = s.getOutputStream();
}
public void start()
{
inputThread = new Thread() {
@Override
public void run() {
while (!isInterrupted()) {
InputPacket packet;
try {
packet = inputQueue.take();
} catch (InterruptedException e) {
listener.connectionTerminated(e);
return;
}
// Try to batch mouse move packets
if (!inputQueue.isEmpty() && packet instanceof MouseMovePacket) {
MouseMovePacket initialMouseMove = (MouseMovePacket) packet;
int totalDeltaX = initialMouseMove.deltaX;
int totalDeltaY = initialMouseMove.deltaY;
// Combine the deltas with other mouse move packets in the queue
synchronized (inputQueue) {
Iterator<InputPacket> i = inputQueue.iterator();
while (i.hasNext()) {
InputPacket queuedPacket = i.next();
if (queuedPacket instanceof MouseMovePacket) {
MouseMovePacket queuedMouseMove = (MouseMovePacket) queuedPacket;
// Add this packet's deltas to the running total
totalDeltaX += queuedMouseMove.deltaX;
totalDeltaY += queuedMouseMove.deltaY;
// Remove this packet from the queue
i.remove();
}
}
}
// Total deltas could overflow the short so we must split them if required
do {
short partialDeltaX = (short)(totalDeltaX < 0 ?
Math.max(Short.MIN_VALUE, totalDeltaX) :
Math.min(Short.MAX_VALUE, totalDeltaX));
short partialDeltaY = (short)(totalDeltaY < 0 ?
Math.max(Short.MIN_VALUE, totalDeltaY) :
Math.min(Short.MAX_VALUE, totalDeltaY));
initialMouseMove.deltaX = partialDeltaX;
initialMouseMove.deltaY = partialDeltaY;
try {
sendPacket(initialMouseMove);
} catch (IOException e) {
listener.connectionTerminated(e);
return;
}
totalDeltaX -= partialDeltaX;
totalDeltaY -= partialDeltaY;
} while (totalDeltaX != 0 && totalDeltaY != 0);
}
// Try to batch axis changes on controller packets too
else if (!inputQueue.isEmpty() && packet instanceof ControllerPacket) {
ControllerPacket initialControllerPacket = (ControllerPacket) packet;
ControllerBatchingBlock batchingBlock = null;
synchronized (inputQueue) {
Iterator<InputPacket> i = inputQueue.iterator();
while (i.hasNext()) {
InputPacket queuedPacket = i.next();
if (queuedPacket instanceof ControllerPacket) {
// Only initialize the batching block if we got here
if (batchingBlock == null) {
batchingBlock = new ControllerBatchingBlock(initialControllerPacket);
}
if (batchingBlock.submitNewPacket((ControllerPacket) queuedPacket))
{
// Batching was successful, so remove this packet
i.remove();
}
else
{
// Unable to batch so we must stop
break;
}
}
}
}
if (batchingBlock != null) {
// Reinitialize the initial packet with the new values
batchingBlock.reinitializePacket(initialControllerPacket);
}
try {
sendPacket(packet);
} catch (IOException e) {
listener.connectionTerminated(e);
return;
}
}
else {
// Send any other packet as-is
try {
sendPacket(packet);
} catch (IOException e) {
listener.connectionTerminated(e);
return;
}
}
}
}
};
inputThread.setName("Input - Queue");
inputThread.start();
}
public void abort()
{
if (inputThread != null) {
inputThread.interrupt();
try {
inputThread.join();
} catch (InterruptedException e) {}
}
try {
s.close();
} catch (IOException e) {}
}
private static int getPaddedSize(int length) {
return ((length + 15) / 16) * 16;
}
private static byte[] padData(byte[] data) {
// This implements the PKCS7 padding algorithm
if ((data.length % 16) == 0) {
// Already a multiple of 16
return data;
}
byte[] padded = Arrays.copyOf(data, getPaddedSize(data.length));
byte paddingByte = (byte)(16 - (data.length % 16));
for (int i = data.length; i < padded.length; i++) {
padded[i] = paddingByte;
}
return padded;
}
private byte[] encryptAesInputData(byte[] data) throws Exception {
return riCipher.update(padData(data));
}
private void sendPacket(InputPacket packet) throws IOException {
byte[] toWire = packet.toWire();
// Pad to 16 byte chunks
int paddedLength = getPaddedSize(toWire.length);
// Allocate a byte buffer to represent the final packet
ByteBuffer bb = ByteBuffer.allocate(4 + paddedLength);
bb.putInt(paddedLength);
try {
bb.put(encryptAesInputData(toWire));
} catch (Exception e) {
// Should never happen
e.printStackTrace();
return;
}
// Send the packet
out.write(bb.array());
out.flush();
}
private void queuePacket(InputPacket packet) {
synchronized (inputQueue) {
inputQueue.add(packet);
}
}
public void sendControllerInput(short buttonFlags, byte leftTrigger, byte rightTrigger,
short leftStickX, short leftStickY, short rightStickX, short rightStickY)
{
queuePacket(new ControllerPacket(buttonFlags, leftTrigger,
rightTrigger, leftStickX, leftStickY,
rightStickX, rightStickY));
}
public void sendMouseButtonDown(byte mouseButton)
{
queuePacket(new MouseButtonPacket(true, mouseButton));
}
public void sendMouseButtonUp(byte mouseButton)
{
queuePacket(new MouseButtonPacket(false, mouseButton));
}
public void sendMouseMove(short deltaX, short deltaY)
{
queuePacket(new MouseMovePacket(deltaX, deltaY));
}
public void sendKeyboardInput(short keyMap, byte keyDirection, byte modifier)
{
queuePacket(new KeyboardPacket(keyMap, keyDirection, modifier));
}
}

View File

@ -1,142 +0,0 @@
package com.limelight.nvstream.input;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
public class NvController {
public final static int PORT = 35043;
public final static int CONTROLLER_TIMEOUT = 3000;
private InetAddress host;
private Socket s;
private OutputStream out;
private Cipher riCipher;
public NvController(InetAddress host, SecretKey riKey, int riKeyId)
{
this.host = host;
try {
// This cipher is guaranteed to be supported
this.riCipher = Cipher.getInstance("AES/CBC/NoPadding");
ByteBuffer bb = ByteBuffer.allocate(16);
bb.putInt(riKeyId);
this.riCipher.init(Cipher.ENCRYPT_MODE, riKey, new IvParameterSpec(bb.array()));
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (NoSuchPaddingException e) {
e.printStackTrace();
} catch (InvalidKeyException e) {
e.printStackTrace();
} catch (InvalidAlgorithmParameterException e) {
e.printStackTrace();
}
}
public void initialize() throws IOException
{
s = new Socket();
s.connect(new InetSocketAddress(host, PORT), CONTROLLER_TIMEOUT);
s.setTcpNoDelay(true);
out = s.getOutputStream();
}
public void close()
{
try {
s.close();
} catch (IOException e) {}
}
private static int getPaddedSize(int length) {
return ((length + 15) / 16) * 16;
}
private static byte[] padData(byte[] data) {
// This implements the PKCS7 padding algorithm
if ((data.length % 16) == 0) {
// Already a multiple of 16
return data;
}
byte[] padded = Arrays.copyOf(data, getPaddedSize(data.length));
byte paddingByte = (byte)(16 - (data.length % 16));
for (int i = data.length; i < padded.length; i++) {
padded[i] = paddingByte;
}
return padded;
}
private byte[] encryptAesInputData(byte[] data) throws Exception {
return riCipher.update(padData(data));
}
private void sendPacket(InputPacket packet) throws IOException {
byte[] toWire = packet.toWire();
// Pad to 16 byte chunks
int paddedLength = getPaddedSize(toWire.length);
// Allocate a byte buffer to represent the final packet
ByteBuffer bb = ByteBuffer.allocate(4 + paddedLength);
bb.putInt(paddedLength);
try {
bb.put(encryptAesInputData(toWire));
} catch (Exception e) {
// Should never happen
e.printStackTrace();
return;
}
// Send the packet
out.write(bb.array());
out.flush();
}
public void sendControllerInput(short buttonFlags, byte leftTrigger, byte rightTrigger,
short leftStickX, short leftStickY, short rightStickX, short rightStickY) throws IOException
{
sendPacket(new ControllerPacket(buttonFlags, leftTrigger,
rightTrigger, leftStickX, leftStickY,
rightStickX, rightStickY));
}
public void sendMouseButtonDown(byte mouseButton) throws IOException
{
sendPacket(new MouseButtonPacket(true, mouseButton));
}
public void sendMouseButtonUp(byte mouseButton) throws IOException
{
sendPacket(new MouseButtonPacket(false, mouseButton));
}
public void sendMouseMove(short deltaX, short deltaY) throws IOException
{
sendPacket(new MouseMovePacket(deltaX, deltaY));
}
public void sendKeyboardInput(short keyMap, byte keyDirection, byte modifier) throws IOException
{
sendPacket(new KeyboardPacket(keyMap, keyDirection, modifier));
}
}