From 2d5083179c28c67f020b0b0e8dd156f72aa3710c Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Mon, 17 Feb 2014 16:14:03 -0500 Subject: [PATCH] Revert "Remove depacketizer thread" This reverts commit a2a4463c0b684fa54212fe497ac2a8931ebd8821. --- .../nvstream/av/audio/AudioStream.java | 45 ++++++++++++++++--- .../nvstream/av/video/VideoStream.java | 44 ++++++++++++++++-- 2 files changed, 79 insertions(+), 10 deletions(-) diff --git a/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java b/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java index d1617dbc..fb7604e1 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java +++ b/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java @@ -7,6 +7,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketException; import java.util.LinkedList; +import java.util.concurrent.LinkedBlockingQueue; import com.limelight.nvstream.NvConnectionListener; import com.limelight.nvstream.av.ByteBufferDescriptor; @@ -18,6 +19,8 @@ public class AudioStream { public static final int RTP_RECV_BUFFER = 64 * 1024; + private LinkedBlockingQueue packets = new LinkedBlockingQueue(100); + private DatagramSocket rtp; private AudioDepacketizer depacketizer = new AudioDepacketizer(); @@ -74,6 +77,8 @@ public class AudioStream { startReceiveThread(); + startDepacketizerThread(); + startDecoderThread(); startUdpPingThread(); @@ -99,13 +104,39 @@ public class AudioStream { streamListener.streamInitialized(OpusDecoder.getChannelCount(), OpusDecoder.getSampleRate()); } + private void startDepacketizerThread() + { + // This thread lessens the work on the receive thread + // so it can spend more time waiting for data + Thread t = new Thread() { + @Override + public void run() { + while (!isInterrupted()) + { + RtpPacket packet; + + try { + packet = packets.take(); + } catch (InterruptedException e) { + connListener.connectionTerminated(e); + return; + } + + depacketizer.decodeInputData(packet); + } + } + }; + threads.add(t); + t.setName("Audio - Depacketizer"); + t.start(); + } + private void startDecoderThread() { // Decoder thread Thread t = new Thread() { @Override public void run() { - while (!isInterrupted()) { ByteBufferDescriptor samples; @@ -118,7 +149,6 @@ public class AudioStream { } streamListener.playDecodedAudio(samples.data, samples.offset, samples.length); - } } }; @@ -140,14 +170,17 @@ public class AudioStream { { try { rtp.receive(packet); - desc.length = packet.getLength(); - depacketizer.decodeInputData(new RtpPacket(desc)); - desc.reinitialize(new byte[1500], 0, 1500); - packet.setData(desc.data, desc.offset, desc.length); } catch (IOException e) { connListener.connectionTerminated(e); return; } + + // Give the packet to the depacketizer thread + desc.length = packet.getLength(); + if (packets.offer(new RtpPacket(desc))) { + desc.reinitialize(new byte[1500], 0, 1500); + packet.setData(desc.data, desc.offset, desc.length); + } } } }; diff --git a/moonlight-common/src/com/limelight/nvstream/av/video/VideoStream.java b/moonlight-common/src/com/limelight/nvstream/av/video/VideoStream.java index e69fe31a..7414e0ef 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/video/VideoStream.java +++ b/moonlight-common/src/com/limelight/nvstream/av/video/VideoStream.java @@ -9,6 +9,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.util.LinkedList; +import java.util.concurrent.LinkedBlockingQueue; import com.limelight.nvstream.NvConnectionListener; import com.limelight.nvstream.StreamConfiguration; @@ -25,6 +26,8 @@ public class VideoStream { public static final int FIRST_FRAME_TIMEOUT = 5000; public static final int RTP_RECV_BUFFER = 128 * 1024; + private LinkedBlockingQueue packets = new LinkedBlockingQueue(100); + private InetAddress host; private DatagramSocket rtp; private Socket firstFrameSocket; @@ -158,6 +161,9 @@ public class VideoStream { // early packets startReceiveThread(); + // Start the depacketizer thread to deal with the RTP data + startDepacketizerThread(); + // Start decoding the data we're receiving startDecoderThread(); @@ -194,6 +200,34 @@ public class VideoStream { t.start(); } + private void startDepacketizerThread() + { + // This thread lessens the work on the receive thread + // so it can spend more time waiting for data + Thread t = new Thread() { + @Override + public void run() { + while (!isInterrupted()) + { + RtpPacket packet; + + try { + packet = packets.take(); + } catch (InterruptedException e) { + listener.connectionTerminated(e); + return; + } + + // !!! We no longer own the data buffer at this point !!! + depacketizer.addInputData(packet); + } + } + }; + threads.add(t); + t.setName("Video - Depacketizer"); + t.start(); + } + private void startReceiveThread() { // Receive thread @@ -207,15 +241,17 @@ public class VideoStream { { try { rtp.receive(packet); - desc.length = packet.getLength(); - depacketizer.addInputData(new RtpPacket(desc)); - desc.reinitialize(new byte[1500], 0, 1500); - packet.setData(desc.data, desc.offset, desc.length); } catch (IOException e) { listener.connectionTerminated(e); return; } + // Give the packet to the depacketizer thread + desc.length = packet.getLength(); + if (packets.offer(new RtpPacket(desc))) { + desc.reinitialize(new byte[1500], 0, 1500); + packet.setData(desc.data, desc.offset, desc.length); + } } } };