Properly terminate threads when the game activity exits.

This commit is contained in:
Cameron Gutman 2013-11-10 01:30:55 -05:00
parent 61ae83337c
commit 0a445b9add
6 changed files with 228 additions and 103 deletions

View File

@ -64,9 +64,13 @@ public class Game extends Activity implements OnGenericMotionListener, OnTouchLi
@Override @Override
public void onPause() { public void onPause() {
finish();
super.onPause(); super.onPause();
}
System.exit(0); public void onDestroy() {
conn.stop();
super.onDestroy();
} }
@Override @Override

View File

@ -4,6 +4,7 @@ import java.io.IOException;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.SocketException; import java.net.SocketException;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import jlibrtp.Participant; import jlibrtp.Participant;
@ -33,8 +34,40 @@ public class NvAudioStream {
private AvAudioDepacketizer depacketizer = new AvAudioDepacketizer(); private AvAudioDepacketizer depacketizer = new AvAudioDepacketizer();
private LinkedList<Thread> threads = new LinkedList<Thread>();
private AvByteBufferPool pool = new AvByteBufferPool(1500); private AvByteBufferPool pool = new AvByteBufferPool(1500);
private boolean aborting = false;
public void abort()
{
if (aborting) {
return;
}
aborting = true;
for (Thread t : threads) {
t.interrupt();
}
// Close the socket to interrupt the receive thread
rtp.close();
// Wait for threads to terminate
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) { }
}
//session.endSession();
track.release();
threads.clear();
}
public void startAudioStream(final String host) public void startAudioStream(final String host)
{ {
new Thread(new Runnable() { new Thread(new Runnable() {
@ -111,17 +144,17 @@ public class NvAudioStream {
{ {
// This thread lessens the work on the receive thread // This thread lessens the work on the receive thread
// so it can spend more time waiting for data // so it can spend more time waiting for data
new Thread(new Runnable() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
for (;;) while (!isInterrupted())
{ {
AvRtpPacket packet; AvRtpPacket packet;
try { try {
packet = packets.take(); packet = packets.take();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); abort();
return; return;
} }
@ -129,23 +162,25 @@ public class NvAudioStream {
depacketizer.decodeInputData(packet); depacketizer.decodeInputData(packet);
} }
} }
}).start(); };
threads.add(t);
t.start();
} }
private void startDecoderThread() private void startDecoderThread()
{ {
// Decoder thread // Decoder thread
new Thread(new Runnable() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
for (;;) while (!isInterrupted())
{ {
AvShortBufferDescriptor samples; AvShortBufferDescriptor samples;
try { try {
samples = depacketizer.getNextDecodedData(); samples = depacketizer.getNextDecodedData();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); abort();
return; return;
} }
@ -154,24 +189,26 @@ public class NvAudioStream {
depacketizer.releaseBuffer(samples); depacketizer.releaseBuffer(samples);
} }
} }
}).start(); };
threads.add(t);
t.start();
} }
private void startReceiveThread() private void startReceiveThread()
{ {
// Receive thread // Receive thread
new Thread(new Runnable() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
DatagramPacket packet = new DatagramPacket(pool.allocate(), 1500); DatagramPacket packet = new DatagramPacket(pool.allocate(), 1500);
AvByteBufferDescriptor desc = new AvByteBufferDescriptor(null, 0, 0); AvByteBufferDescriptor desc = new AvByteBufferDescriptor(null, 0, 0);
for (;;) while (!isInterrupted())
{ {
try { try {
rtp.receive(packet); rtp.receive(packet);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); abort();
return; return;
} }
@ -186,13 +223,15 @@ public class NvAudioStream {
packet.setData(pool.allocate(), 0, 1500); packet.setData(pool.allocate(), 0, 1500);
} }
} }
}).start(); };
threads.add(t);
t.start();
} }
private void startUdpPingThread() private void startUdpPingThread()
{ {
// Ping thread // Ping thread
new Thread(new Runnable() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
// PING in ASCII // PING in ASCII
@ -202,17 +241,20 @@ public class NvAudioStream {
session.payloadType(127); session.payloadType(127);
// Send PING every 100 ms // Send PING every 100 ms
for (;;) while (!isInterrupted())
{ {
session.sendData(pingPacket); session.sendData(pingPacket);
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
break; abort();
return;
} }
} }
} }
}).start(); };
threads.add(t);
t.start();
} }
} }

View File

@ -2,7 +2,6 @@ package com.limelight.nvstream;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
@ -17,7 +16,6 @@ import android.app.Activity;
import android.view.Surface; import android.view.Surface;
import android.widget.Toast; import android.widget.Toast;
import com.limelight.Game;
import com.limelight.nvstream.input.NvController; import com.limelight.nvstream.input.NvController;
public class NvConnection { public class NvConnection {
@ -27,6 +25,8 @@ public class NvConnection {
private NvControl controlStream; private NvControl controlStream;
private NvController inputStream; private NvController inputStream;
private Surface video; private Surface video;
private NvVideoStream videoStream = new NvVideoStream();
private NvAudioStream audioStream = new NvAudioStream();
private ThreadPoolExecutor threadPool; private ThreadPoolExecutor threadPool;
@ -65,6 +65,20 @@ public class NvConnection {
return null; return null;
} }
public void stop()
{
videoStream.abort();
audioStream.abort();
if (controlStream != null) {
controlStream.abort();
}
if (inputStream != null) {
inputStream.close();
}
}
public void start() public void start()
{ {
new Thread(new Runnable() { new Thread(new Runnable() {
@ -81,8 +95,8 @@ public class NvConnection {
try { try {
startSteamBigPicture(); startSteamBigPicture();
performHandshake(); performHandshake();
startVideo(video); videoStream.startVideoStream(host, video);
startAudio(); audioStream.startAudioStream(host);
beginControlStream(); beginControlStream();
controlStream.startJitterPackets(); controlStream.startJitterPackets();
startController(); startController();
@ -97,16 +111,6 @@ public class NvConnection {
}).start(); }).start();
} }
public void startVideo(Surface surface)
{
new NvVideoStream().startVideoStream(host, surface);
}
public void startAudio()
{
new NvAudioStream().startAudioStream(host);
}
public void sendMouseMove(final short deltaX, final short deltaY) public void sendMouseMove(final short deltaX, final short deltaY)
{ {
if (inputStream == null) if (inputStream == null)
@ -224,7 +228,7 @@ public class NvConnection {
controlStream = new NvControl(host); controlStream = new NvControl(host);
System.out.println("Starting control"); System.out.println("Starting control");
controlStream.beginControl(); controlStream.start();
} }
private void startController() throws UnknownHostException, IOException private void startController() throws UnknownHostException, IOException

View File

@ -161,6 +161,10 @@ public class NvControl {
private InputStream in; private InputStream in;
private OutputStream out; private OutputStream out;
private Thread heartbeatThread;
private Thread jitterThread;
private boolean aborting = false;
public NvControl(String host) throws UnknownHostException, IOException public NvControl(String host) throws UnknownHostException, IOException
{ {
s = new Socket(host, PORT); s = new Socket(host, PORT);
@ -192,7 +196,28 @@ public class NvControl {
sendPacket(new NvCtlPacket(PTYPE_JITTER, PPAYLEN_JITTER, bb.array())); sendPacket(new NvCtlPacket(PTYPE_JITTER, PPAYLEN_JITTER, bb.array()));
} }
public void beginControl() throws IOException public void abort()
{
if (aborting) {
return;
}
aborting = true;
if (jitterThread != null) {
jitterThread.interrupt();
}
if (heartbeatThread != null) {
heartbeatThread.interrupt();
}
try {
s.close();
} catch (IOException e) {}
}
public void start() throws IOException
{ {
System.out.println("CTL: Sending hello"); System.out.println("CTL: Sending hello");
sendHello(); sendHello();
@ -206,55 +231,54 @@ public class NvControl {
//send1404(); //send1404();
System.out.println("CTL: Launching heartbeat thread"); System.out.println("CTL: Launching heartbeat thread");
new Thread(new Runnable() { heartbeatThread = new Thread() {
@Override @Override
public void run() { public void run() {
for (;;) while (!isInterrupted())
{ {
try { try {
sendHeartbeat(); sendHeartbeat();
} catch (IOException e1) { } catch (IOException e1) {
e1.printStackTrace(); abort();
break; return;
} }
try { try {
Thread.sleep(3000); Thread.sleep(3000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
break; abort();
return;
} }
} }
} }
}).start(); };
heartbeatThread.start();
} }
public void startJitterPackets() public void startJitterPackets()
{ {
new Thread(new Runnable() { jitterThread = new Thread() {
@Override @Override
public void run() { public void run() {
for (;;) while (!isInterrupted())
{ {
try { try {
sendJitter(); sendJitter();
} catch (IOException e1) { } catch (IOException e1) {
e1.printStackTrace(); abort();
break; return;
} }
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
break; abort();
return;
} }
} }
} }
}).start(); };
} jitterThread.start();
public void endControl() throws IOException
{
s.close();
} }
private NvControl.NvCtlResponse send1405AndGetResponse() throws IOException private NvControl.NvCtlResponse send1405AndGetResponse() throws IOException
@ -262,11 +286,6 @@ public class NvControl {
return sendAndGetReply(new NvCtlPacket(PTYPE_1405, PPAYLEN_1405)); return sendAndGetReply(new NvCtlPacket(PTYPE_1405, PPAYLEN_1405));
} }
private void send1404() throws IOException
{
sendPacket(new NvCtlPacket(PTYPE_1404, PPAYLEN_1404));
}
private void sendHello() throws IOException private void sendHello() throws IOException
{ {
sendPacket(new NvCtlPacket(PTYPE_HELLO, PPAYLEN_HELLO, PPAYLOAD_HELLO)); sendPacket(new NvCtlPacket(PTYPE_HELLO, PPAYLEN_HELLO, PPAYLOAD_HELLO));

View File

@ -8,6 +8,7 @@ import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import com.limelight.nvstream.av.AvByteBufferDescriptor; import com.limelight.nvstream.av.AvByteBufferDescriptor;
@ -37,10 +38,43 @@ public class NvVideoStream {
private RTPSession session; private RTPSession session;
private DatagramSocket rtp; private DatagramSocket rtp;
private LinkedList<Thread> threads = new LinkedList<Thread>();
private AvByteBufferPool pool = new AvByteBufferPool(1500); private AvByteBufferPool pool = new AvByteBufferPool(1500);
private AvVideoDepacketizer depacketizer = new AvVideoDepacketizer(); private AvVideoDepacketizer depacketizer = new AvVideoDepacketizer();
private boolean aborting = false;
public void abort()
{
if (aborting) {
return;
}
aborting = true;
// Interrupt threads
for (Thread t : threads) {
t.interrupt();
}
// Close the socket to interrupt the receive thread
rtp.close();
// Wait for threads to terminate
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) { }
}
//session.endSession();
videoDecoder.release();
threads.clear();
}
private InputStream openFirstFrameInputStream(String host) throws UnknownHostException, IOException private InputStream openFirstFrameInputStream(String host) throws UnknownHostException, IOException
{ {
Socket s = new Socket(host, FIRST_FRAME_PORT); Socket s = new Socket(host, FIRST_FRAME_PORT);
@ -104,8 +138,8 @@ public class NvVideoStream {
public void startVideoStream(final String host, final Surface surface) public void startVideoStream(final String host, final Surface surface)
{ {
new Thread(new Runnable() { // This thread becomes the output display thread
Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
// Setup the decoder context // Setup the decoder context
@ -137,29 +171,32 @@ public class NvVideoStream {
readFirstFrame(host); readFirstFrame(host);
} catch (IOException e2) { } catch (IOException e2) {
e2.printStackTrace(); e2.printStackTrace();
abort();
return; return;
} }
// Render the frames that are coming out of the decoder // Render the frames that are coming out of the decoder
outputDisplayLoop(); outputDisplayLoop(this);
} }
}).start(); };
threads.add(t);
t.start();
} }
private void startDecoderThread() private void startDecoderThread()
{ {
// Decoder thread // Decoder thread
new Thread(new Runnable() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
// Read the decode units generated from the RTP stream // Read the decode units generated from the RTP stream
for (;;) while (!isInterrupted())
{ {
AvDecodeUnit du; AvDecodeUnit du;
try { try {
du = depacketizer.getNextDecodeUnit(); du = depacketizer.getNextDecodeUnit();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); abort();
return; return;
} }
@ -167,7 +204,10 @@ public class NvVideoStream {
{ {
case AvDecodeUnit.TYPE_H264: case AvDecodeUnit.TYPE_H264:
{ {
int inputIndex = videoDecoder.dequeueInputBuffer(-1); // Wait for an input buffer or thread termination
while (!isInterrupted())
{
int inputIndex = videoDecoder.dequeueInputBuffer(100);
if (inputIndex >= 0) if (inputIndex >= 0)
{ {
ByteBuffer buf = videoDecoderInputBuffers[inputIndex]; ByteBuffer buf = videoDecoderInputBuffers[inputIndex];
@ -187,36 +227,42 @@ public class NvVideoStream {
videoDecoder.queueInputBuffer(inputIndex, videoDecoder.queueInputBuffer(inputIndex,
0, du.getDataLength(), 0, du.getDataLength(),
0, du.getFlags()); 0, du.getFlags());
break;
}
} }
} }
break; break;
default: default:
{ {
System.out.println("Unknown decode unit type"); System.err.println("Unknown decode unit type");
} abort();
break; return;
} }
} }
} }
}).start(); }
};
threads.add(t);
t.start();
} }
private void startDepacketizerThread() private void startDepacketizerThread()
{ {
// This thread lessens the work on the receive thread // This thread lessens the work on the receive thread
// so it can spend more time waiting for data // so it can spend more time waiting for data
new Thread(new Runnable() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
for (;;) while (!isInterrupted())
{ {
AvRtpPacket packet; AvRtpPacket packet;
try { try {
packet = packets.take(); packet = packets.take();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); abort();
return; return;
} }
@ -224,24 +270,26 @@ public class NvVideoStream {
depacketizer.addInputData(packet); depacketizer.addInputData(packet);
} }
} }
}).start(); };
threads.add(t);
t.start();
} }
private void startReceiveThread() private void startReceiveThread()
{ {
// Receive thread // Receive thread
new Thread(new Runnable() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
DatagramPacket packet = new DatagramPacket(pool.allocate(), 1500); DatagramPacket packet = new DatagramPacket(pool.allocate(), 1500);
AvByteBufferDescriptor desc = new AvByteBufferDescriptor(null, 0, 0); AvByteBufferDescriptor desc = new AvByteBufferDescriptor(null, 0, 0);
for (;;) while (!isInterrupted())
{ {
try { try {
rtp.receive(packet); rtp.receive(packet);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); abort();
return; return;
} }
@ -256,13 +304,15 @@ public class NvVideoStream {
packet.setData(pool.allocate(), 0, 1500); packet.setData(pool.allocate(), 0, 1500);
} }
} }
}).start(); };
threads.add(t);
t.start();
} }
private void startUdpPingThread() private void startUdpPingThread()
{ {
// Ping thread // Ping thread
new Thread(new Runnable() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
// PING in ASCII // PING in ASCII
@ -272,26 +322,29 @@ public class NvVideoStream {
session.payloadType(127); session.payloadType(127);
// Send PING every 100 ms // Send PING every 100 ms
for (;;) while (!isInterrupted())
{ {
session.sendData(pingPacket); session.sendData(pingPacket);
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
break; abort();
return;
} }
} }
} }
}).start(); };
threads.add(t);
t.start();
} }
private void outputDisplayLoop() private void outputDisplayLoop(Thread t)
{ {
for (;;) while (!t.isInterrupted())
{ {
BufferInfo info = new BufferInfo(); BufferInfo info = new BufferInfo();
int outIndex = videoDecoder.dequeueOutputBuffer(info, -1); int outIndex = videoDecoder.dequeueOutputBuffer(info, 100);
switch (outIndex) { switch (outIndex) {
case MediaCodec.INFO_OUTPUT_BUFFERS_CHANGED: case MediaCodec.INFO_OUTPUT_BUFFERS_CHANGED:
System.out.println("Output buffers changed"); System.out.println("Output buffers changed");
@ -300,16 +353,12 @@ public class NvVideoStream {
System.out.println("Output format changed"); System.out.println("Output format changed");
System.out.println("New output Format: " + videoDecoder.getOutputFormat()); System.out.println("New output Format: " + videoDecoder.getOutputFormat());
break; break;
case MediaCodec.INFO_TRY_AGAIN_LATER:
System.out.println("Try again later");
break;
default: default:
break; break;
} }
if (outIndex >= 0) { if (outIndex >= 0) {
videoDecoder.releaseOutputBuffer(outIndex, true); videoDecoder.releaseOutputBuffer(outIndex, true);
} }
} }
} }
} }

View File

@ -19,6 +19,13 @@ public class NvController {
out = s.getOutputStream(); out = s.getOutputStream();
} }
public void close()
{
try {
s.close();
} catch (IOException e) {}
}
public void sendControllerInput(short buttonFlags, byte leftTrigger, byte rightTrigger, public void sendControllerInput(short buttonFlags, byte leftTrigger, byte rightTrigger,
short leftStickX, short leftStickY, short rightStickX, short rightStickY) throws IOException short leftStickX, short leftStickY, short rightStickX, short rightStickY) throws IOException
{ {