From 0a445b9add121bbe86ce63b8841d613a5621b7ca Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Sun, 10 Nov 2013 01:30:55 -0500 Subject: [PATCH] Properly terminate threads when the game activity exits. --- src/com/limelight/Game.java | 8 +- src/com/limelight/nvstream/NvAudioStream.java | 76 +++++++--- src/com/limelight/nvstream/NvConnection.java | 34 +++-- src/com/limelight/nvstream/NvControl.java | 65 +++++--- src/com/limelight/nvstream/NvVideoStream.java | 141 ++++++++++++------ .../nvstream/input/NvController.java | 7 + 6 files changed, 228 insertions(+), 103 deletions(-) diff --git a/src/com/limelight/Game.java b/src/com/limelight/Game.java index a799374e..fbdc13b4 100644 --- a/src/com/limelight/Game.java +++ b/src/com/limelight/Game.java @@ -64,9 +64,13 @@ public class Game extends Activity implements OnGenericMotionListener, OnTouchLi @Override public void onPause() { + finish(); super.onPause(); - - System.exit(0); + } + + public void onDestroy() { + conn.stop(); + super.onDestroy(); } @Override diff --git a/src/com/limelight/nvstream/NvAudioStream.java b/src/com/limelight/nvstream/NvAudioStream.java index 733b4e2f..3d653f30 100644 --- a/src/com/limelight/nvstream/NvAudioStream.java +++ b/src/com/limelight/nvstream/NvAudioStream.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; +import java.util.LinkedList; import java.util.concurrent.LinkedBlockingQueue; import jlibrtp.Participant; @@ -33,8 +34,40 @@ public class NvAudioStream { private AvAudioDepacketizer depacketizer = new AvAudioDepacketizer(); + private LinkedList threads = new LinkedList(); + private AvByteBufferPool pool = new AvByteBufferPool(1500); + private boolean aborting = false; + + public void abort() + { + if (aborting) { + return; + } + + aborting = true; + + for (Thread t : threads) { + t.interrupt(); + } + + // Close the socket to interrupt the receive thread + rtp.close(); + + // Wait for threads to terminate + for (Thread t : threads) { + try { + t.join(); + } catch (InterruptedException e) { } + } + + //session.endSession(); + track.release(); + + threads.clear(); + } + public void startAudioStream(final String host) { new Thread(new Runnable() { @@ -96,7 +129,7 @@ public class NvAudioStream { System.err.println("Unsupported channel count"); return; } - + track = new AudioTrack(AudioManager.STREAM_MUSIC, OpusDecoder.getSampleRate(), channelConfig, @@ -111,17 +144,17 @@ public class NvAudioStream { { // This thread lessens the work on the receive thread // so it can spend more time waiting for data - new Thread(new Runnable() { + Thread t = new Thread() { @Override public void run() { - for (;;) + while (!isInterrupted()) { AvRtpPacket packet; try { packet = packets.take(); } catch (InterruptedException e) { - e.printStackTrace(); + abort(); return; } @@ -129,23 +162,25 @@ public class NvAudioStream { depacketizer.decodeInputData(packet); } } - }).start(); + }; + threads.add(t); + t.start(); } private void startDecoderThread() { // Decoder thread - new Thread(new Runnable() { + Thread t = new Thread() { @Override public void run() { - for (;;) + while (!isInterrupted()) { AvShortBufferDescriptor samples; try { samples = depacketizer.getNextDecodedData(); } catch (InterruptedException e) { - e.printStackTrace(); + abort(); return; } @@ -154,24 +189,26 @@ public class NvAudioStream { depacketizer.releaseBuffer(samples); } } - }).start(); + }; + threads.add(t); + t.start(); } private void startReceiveThread() { // Receive thread - new Thread(new Runnable() { + Thread t = new Thread() { @Override public void run() { DatagramPacket packet = new DatagramPacket(pool.allocate(), 1500); AvByteBufferDescriptor desc = new AvByteBufferDescriptor(null, 0, 0); - for (;;) + while (!isInterrupted()) { try { rtp.receive(packet); } catch (IOException e) { - e.printStackTrace(); + abort(); return; } @@ -186,13 +223,15 @@ public class NvAudioStream { packet.setData(pool.allocate(), 0, 1500); } } - }).start(); + }; + threads.add(t); + t.start(); } private void startUdpPingThread() { // Ping thread - new Thread(new Runnable() { + Thread t = new Thread() { @Override public void run() { // PING in ASCII @@ -202,17 +241,20 @@ public class NvAudioStream { session.payloadType(127); // Send PING every 100 ms - for (;;) + while (!isInterrupted()) { session.sendData(pingPacket); try { Thread.sleep(100); } catch (InterruptedException e) { - break; + abort(); + return; } } } - }).start(); + }; + threads.add(t); + t.start(); } } diff --git a/src/com/limelight/nvstream/NvConnection.java b/src/com/limelight/nvstream/NvConnection.java index e20f2206..23beb487 100644 --- a/src/com/limelight/nvstream/NvConnection.java +++ b/src/com/limelight/nvstream/NvConnection.java @@ -2,7 +2,6 @@ package com.limelight.nvstream; import java.io.IOException; import java.net.InetAddress; -import java.net.InterfaceAddress; import java.net.NetworkInterface; import java.net.SocketException; import java.net.UnknownHostException; @@ -17,7 +16,6 @@ import android.app.Activity; import android.view.Surface; import android.widget.Toast; -import com.limelight.Game; import com.limelight.nvstream.input.NvController; public class NvConnection { @@ -27,6 +25,8 @@ public class NvConnection { private NvControl controlStream; private NvController inputStream; private Surface video; + private NvVideoStream videoStream = new NvVideoStream(); + private NvAudioStream audioStream = new NvAudioStream(); private ThreadPoolExecutor threadPool; @@ -64,6 +64,20 @@ public class NvConnection { return null; } + + public void stop() + { + videoStream.abort(); + audioStream.abort(); + + if (controlStream != null) { + controlStream.abort(); + } + + if (inputStream != null) { + inputStream.close(); + } + } public void start() { @@ -81,8 +95,8 @@ public class NvConnection { try { startSteamBigPicture(); performHandshake(); - startVideo(video); - startAudio(); + videoStream.startVideoStream(host, video); + audioStream.startAudioStream(host); beginControlStream(); controlStream.startJitterPackets(); startController(); @@ -97,16 +111,6 @@ public class NvConnection { }).start(); } - public void startVideo(Surface surface) - { - new NvVideoStream().startVideoStream(host, surface); - } - - public void startAudio() - { - new NvAudioStream().startAudioStream(host); - } - public void sendMouseMove(final short deltaX, final short deltaY) { if (inputStream == null) @@ -224,7 +228,7 @@ public class NvConnection { controlStream = new NvControl(host); System.out.println("Starting control"); - controlStream.beginControl(); + controlStream.start(); } private void startController() throws UnknownHostException, IOException diff --git a/src/com/limelight/nvstream/NvControl.java b/src/com/limelight/nvstream/NvControl.java index e506700a..f9cb81f5 100644 --- a/src/com/limelight/nvstream/NvControl.java +++ b/src/com/limelight/nvstream/NvControl.java @@ -161,6 +161,10 @@ public class NvControl { private InputStream in; private OutputStream out; + private Thread heartbeatThread; + private Thread jitterThread; + private boolean aborting = false; + public NvControl(String host) throws UnknownHostException, IOException { s = new Socket(host, PORT); @@ -192,7 +196,28 @@ public class NvControl { sendPacket(new NvCtlPacket(PTYPE_JITTER, PPAYLEN_JITTER, bb.array())); } - public void beginControl() throws IOException + public void abort() + { + if (aborting) { + return; + } + + aborting = true; + + if (jitterThread != null) { + jitterThread.interrupt(); + } + + if (heartbeatThread != null) { + heartbeatThread.interrupt(); + } + + try { + s.close(); + } catch (IOException e) {} + } + + public void start() throws IOException { System.out.println("CTL: Sending hello"); sendHello(); @@ -206,55 +231,54 @@ public class NvControl { //send1404(); System.out.println("CTL: Launching heartbeat thread"); - new Thread(new Runnable() { + heartbeatThread = new Thread() { @Override public void run() { - for (;;) + while (!isInterrupted()) { try { sendHeartbeat(); } catch (IOException e1) { - e1.printStackTrace(); - break; + abort(); + return; } try { Thread.sleep(3000); } catch (InterruptedException e) { - break; + abort(); + return; } } } - }).start(); + }; + heartbeatThread.start(); } public void startJitterPackets() { - new Thread(new Runnable() { + jitterThread = new Thread() { @Override public void run() { - for (;;) + while (!isInterrupted()) { try { sendJitter(); } catch (IOException e1) { - e1.printStackTrace(); - break; + abort(); + return; } try { Thread.sleep(100); } catch (InterruptedException e) { - break; + abort(); + return; } } } - }).start(); - } - - public void endControl() throws IOException - { - s.close(); + }; + jitterThread.start(); } private NvControl.NvCtlResponse send1405AndGetResponse() throws IOException @@ -262,11 +286,6 @@ public class NvControl { return sendAndGetReply(new NvCtlPacket(PTYPE_1405, PPAYLEN_1405)); } - private void send1404() throws IOException - { - sendPacket(new NvCtlPacket(PTYPE_1404, PPAYLEN_1404)); - } - private void sendHello() throws IOException { sendPacket(new NvCtlPacket(PTYPE_HELLO, PPAYLEN_HELLO, PPAYLOAD_HELLO)); diff --git a/src/com/limelight/nvstream/NvVideoStream.java b/src/com/limelight/nvstream/NvVideoStream.java index 2b1a8786..9c5b018d 100644 --- a/src/com/limelight/nvstream/NvVideoStream.java +++ b/src/com/limelight/nvstream/NvVideoStream.java @@ -8,6 +8,7 @@ import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.util.LinkedList; import java.util.concurrent.LinkedBlockingQueue; import com.limelight.nvstream.av.AvByteBufferDescriptor; @@ -37,10 +38,43 @@ public class NvVideoStream { private RTPSession session; private DatagramSocket rtp; + private LinkedList threads = new LinkedList(); + private AvByteBufferPool pool = new AvByteBufferPool(1500); private AvVideoDepacketizer depacketizer = new AvVideoDepacketizer(); + private boolean aborting = false; + + public void abort() + { + if (aborting) { + return; + } + + aborting = true; + + // Interrupt threads + for (Thread t : threads) { + t.interrupt(); + } + + // Close the socket to interrupt the receive thread + rtp.close(); + + // Wait for threads to terminate + for (Thread t : threads) { + try { + t.join(); + } catch (InterruptedException e) { } + } + + //session.endSession(); + videoDecoder.release(); + + threads.clear(); + } + private InputStream openFirstFrameInputStream(String host) throws UnknownHostException, IOException { Socket s = new Socket(host, FIRST_FRAME_PORT); @@ -103,9 +137,9 @@ public class NvVideoStream { } public void startVideoStream(final String host, final Surface surface) - { - new Thread(new Runnable() { - + { + // This thread becomes the output display thread + Thread t = new Thread() { @Override public void run() { // Setup the decoder context @@ -137,29 +171,32 @@ public class NvVideoStream { readFirstFrame(host); } catch (IOException e2) { e2.printStackTrace(); + abort(); return; } // Render the frames that are coming out of the decoder - outputDisplayLoop(); + outputDisplayLoop(this); } - }).start(); + }; + threads.add(t); + t.start(); } private void startDecoderThread() { // Decoder thread - new Thread(new Runnable() { + Thread t = new Thread() { @Override public void run() { // Read the decode units generated from the RTP stream - for (;;) + while (!isInterrupted()) { AvDecodeUnit du; try { du = depacketizer.getNextDecodeUnit(); } catch (InterruptedException e) { - e.printStackTrace(); + abort(); return; } @@ -167,56 +204,65 @@ public class NvVideoStream { { case AvDecodeUnit.TYPE_H264: { - int inputIndex = videoDecoder.dequeueInputBuffer(-1); - if (inputIndex >= 0) + // Wait for an input buffer or thread termination + while (!isInterrupted()) { - ByteBuffer buf = videoDecoderInputBuffers[inputIndex]; - - // Clear old input data - buf.clear(); - - // Copy data from our buffer list into the input buffer - for (AvByteBufferDescriptor desc : du.getBufferList()) + int inputIndex = videoDecoder.dequeueInputBuffer(100); + if (inputIndex >= 0) { - buf.put(desc.data, desc.offset, desc.length); + ByteBuffer buf = videoDecoderInputBuffers[inputIndex]; - // Release the buffer back to the buffer pool - pool.free(desc.data); - } + // Clear old input data + buf.clear(); + + // Copy data from our buffer list into the input buffer + for (AvByteBufferDescriptor desc : du.getBufferList()) + { + buf.put(desc.data, desc.offset, desc.length); + + // Release the buffer back to the buffer pool + pool.free(desc.data); + } - videoDecoder.queueInputBuffer(inputIndex, - 0, du.getDataLength(), - 0, du.getFlags()); + videoDecoder.queueInputBuffer(inputIndex, + 0, du.getDataLength(), + 0, du.getFlags()); + + break; + } } } break; default: { - System.out.println("Unknown decode unit type"); + System.err.println("Unknown decode unit type"); + abort(); + return; } - break; } } } - }).start(); + }; + threads.add(t); + t.start(); } private void startDepacketizerThread() { // This thread lessens the work on the receive thread // so it can spend more time waiting for data - new Thread(new Runnable() { + Thread t = new Thread() { @Override public void run() { - for (;;) + while (!isInterrupted()) { AvRtpPacket packet; try { packet = packets.take(); } catch (InterruptedException e) { - e.printStackTrace(); + abort(); return; } @@ -224,24 +270,26 @@ public class NvVideoStream { depacketizer.addInputData(packet); } } - }).start(); + }; + threads.add(t); + t.start(); } private void startReceiveThread() { // Receive thread - new Thread(new Runnable() { + Thread t = new Thread() { @Override public void run() { DatagramPacket packet = new DatagramPacket(pool.allocate(), 1500); AvByteBufferDescriptor desc = new AvByteBufferDescriptor(null, 0, 0); - for (;;) + while (!isInterrupted()) { try { rtp.receive(packet); } catch (IOException e) { - e.printStackTrace(); + abort(); return; } @@ -256,13 +304,15 @@ public class NvVideoStream { packet.setData(pool.allocate(), 0, 1500); } } - }).start(); + }; + threads.add(t); + t.start(); } private void startUdpPingThread() { // Ping thread - new Thread(new Runnable() { + Thread t = new Thread() { @Override public void run() { // PING in ASCII @@ -272,26 +322,29 @@ public class NvVideoStream { session.payloadType(127); // Send PING every 100 ms - for (;;) + while (!isInterrupted()) { session.sendData(pingPacket); try { Thread.sleep(100); } catch (InterruptedException e) { - break; + abort(); + return; } } } - }).start(); + }; + threads.add(t); + t.start(); } - private void outputDisplayLoop() + private void outputDisplayLoop(Thread t) { - for (;;) + while (!t.isInterrupted()) { BufferInfo info = new BufferInfo(); - int outIndex = videoDecoder.dequeueOutputBuffer(info, -1); + int outIndex = videoDecoder.dequeueOutputBuffer(info, 100); switch (outIndex) { case MediaCodec.INFO_OUTPUT_BUFFERS_CHANGED: System.out.println("Output buffers changed"); @@ -300,16 +353,12 @@ public class NvVideoStream { System.out.println("Output format changed"); System.out.println("New output Format: " + videoDecoder.getOutputFormat()); break; - case MediaCodec.INFO_TRY_AGAIN_LATER: - System.out.println("Try again later"); - break; default: break; } if (outIndex >= 0) { videoDecoder.releaseOutputBuffer(outIndex, true); } - } } } diff --git a/src/com/limelight/nvstream/input/NvController.java b/src/com/limelight/nvstream/input/NvController.java index f4616652..f762f1da 100644 --- a/src/com/limelight/nvstream/input/NvController.java +++ b/src/com/limelight/nvstream/input/NvController.java @@ -19,6 +19,13 @@ public class NvController { out = s.getOutputStream(); } + public void close() + { + try { + s.close(); + } catch (IOException e) {} + } + public void sendControllerInput(short buttonFlags, byte leftTrigger, byte rightTrigger, short leftStickX, short leftStickY, short rightStickX, short rightStickY) throws IOException {