diff --git a/moonlight-common/src/com/limelight/nvstream/av/DecodeUnit.java b/moonlight-common/src/com/limelight/nvstream/av/DecodeUnit.java index 207af908..72e9e604 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/DecodeUnit.java +++ b/moonlight-common/src/com/limelight/nvstream/av/DecodeUnit.java @@ -17,7 +17,10 @@ public class DecodeUnit { private long receiveTimestamp; private int flags; - public DecodeUnit(int type, List bufferList, int dataLength, int frameNumber, long receiveTimestamp, int flags) + public DecodeUnit() { + } + + public void initialize(int type, List bufferList, int dataLength, int frameNumber, long receiveTimestamp, int flags) { this.type = type; this.bufferList = bufferList; diff --git a/moonlight-common/src/com/limelight/nvstream/av/PopulatedBufferList.java b/moonlight-common/src/com/limelight/nvstream/av/PopulatedBufferList.java new file mode 100644 index 00000000..5c7a3ace --- /dev/null +++ b/moonlight-common/src/com/limelight/nvstream/av/PopulatedBufferList.java @@ -0,0 +1,53 @@ +package com.limelight.nvstream.av; + +import java.util.concurrent.ArrayBlockingQueue; + +public class PopulatedBufferList { + private ArrayBlockingQueue populatedList; + private ArrayBlockingQueue freeList; + + @SuppressWarnings("unchecked") + public PopulatedBufferList(int maxQueueSize, BufferFactory factory) { + this.populatedList = new ArrayBlockingQueue(maxQueueSize, false); + this.freeList = new ArrayBlockingQueue(maxQueueSize, false); + + for (int i = 0; i < maxQueueSize; i++) { + freeList.add((T) factory.createFreeBuffer()); + } + } + + public T pollFreeObject() { + return freeList.poll(); + } + + public void addPopulatedObject(T object) { + populatedList.add(object); + } + + public void freePopulatedObject(T object) { + freeList.add(object); + } + + public void clearPopulatedObjects() { + T object; + while ((object = populatedList.poll()) != null) { + freeList.add(object); + } + } + + public T pollPopulatedObject() { + return populatedList.poll(); + } + + public T peekPopulatedObject() { + return populatedList.peek(); + } + + public T takePopulatedObject() throws InterruptedException { + return populatedList.take(); + } + + public static interface BufferFactory { + public Object createFreeBuffer(); + } +} diff --git a/moonlight-common/src/com/limelight/nvstream/av/audio/AudioDepacketizer.java b/moonlight-common/src/com/limelight/nvstream/av/audio/AudioDepacketizer.java index d8a6a707..72396f64 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/audio/AudioDepacketizer.java +++ b/moonlight-common/src/com/limelight/nvstream/av/audio/AudioDepacketizer.java @@ -1,25 +1,19 @@ package com.limelight.nvstream.av.audio; -import java.util.concurrent.LinkedBlockingQueue; - import com.limelight.LimeLog; import com.limelight.nvstream.av.ByteBufferDescriptor; +import com.limelight.nvstream.av.PopulatedBufferList; import com.limelight.nvstream.av.RtpPacket; public class AudioDepacketizer { private static final int DU_LIMIT = 15; - private LinkedBlockingQueue decodedUnits = - new LinkedBlockingQueue(DU_LIMIT); + private PopulatedBufferList decodedUnits; // Direct submit state private AudioRenderer directSubmitRenderer; private byte[] directSubmitData; - // Non-direct submit state - private byte[][] pcmRing; - private int ringIndex; - // Cached objects private ByteBufferDescriptor cachedDesc = new ByteBufferDescriptor(null, 0, 0); @@ -33,22 +27,35 @@ public class AudioDepacketizer { this.directSubmitData = new byte[OpusDecoder.getMaxOutputShorts()*2]; } else { - pcmRing = new byte[DU_LIMIT][OpusDecoder.getMaxOutputShorts()*2]; - } + decodedUnits = new PopulatedBufferList(DU_LIMIT, new PopulatedBufferList.BufferFactory() { + public Object createFreeBuffer() { + return new ByteBufferDescriptor(new byte[OpusDecoder.getMaxOutputShorts()*2], 0, OpusDecoder.getMaxOutputShorts()*2); + } + }); + } } private void decodeData(byte[] data, int off, int len) { // Submit this data to the decoder int decodeLen; - byte[] pcmData; + ByteBufferDescriptor bb; if (directSubmitData != null) { - pcmData = null; + bb = null; decodeLen = OpusDecoder.decode(data, off, len, directSubmitData); } else { - pcmData = pcmRing[ringIndex]; - decodeLen = OpusDecoder.decode(data, off, len, pcmData); + bb = decodedUnits.pollFreeObject(); + if (bb == null) { + LimeLog.warning("Audio player too slow! Forced to drop decoded samples"); + decodedUnits.clearPopulatedObjects(); + bb = decodedUnits.pollFreeObject(); + if (bb == null) { + LimeLog.severe("Audio player is leaking buffers!"); + return; + } + } + decodeLen = OpusDecoder.decode(data, off, len, bb.data); } if (decodeLen > 0) { @@ -58,14 +65,9 @@ public class AudioDepacketizer { if (directSubmitRenderer != null) { directSubmitRenderer.playDecodedAudio(directSubmitData, 0, decodeLen); } - else if (!decodedUnits.offer(new ByteBufferDescriptor(pcmData, 0, decodeLen))) { - LimeLog.warning("Audio player too slow! Forced to drop decoded samples"); - // Clear out the queue - decodedUnits.clear(); - } else { - // Frame successfully submitted for playback - ringIndex = (ringIndex + 1) % DU_LIMIT; + bb.length = decodeLen; + decodedUnits.addPopulatedObject(bb); } } } @@ -95,8 +97,11 @@ public class AudioDepacketizer { decodeData(cachedDesc.data, cachedDesc.offset, cachedDesc.length); } - public ByteBufferDescriptor getNextDecodedData() throws InterruptedException - { - return decodedUnits.take(); + public ByteBufferDescriptor getNextDecodedData() throws InterruptedException { + return decodedUnits.takePopulatedObject(); + } + + public void freeDecodedData(ByteBufferDescriptor data) { + decodedUnits.freePopulatedObject(data); } } diff --git a/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java b/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java index 5aa3fe15..f43969a0 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java +++ b/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java @@ -137,7 +137,7 @@ public class AudioStream { } streamListener.playDecodedAudio(samples.data, samples.offset, samples.length); - + depacketizer.freeDecodedData(samples); } } }; diff --git a/moonlight-common/src/com/limelight/nvstream/av/video/VideoDepacketizer.java b/moonlight-common/src/com/limelight/nvstream/av/video/VideoDepacketizer.java index bd672043..af07c3ca 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/video/VideoDepacketizer.java +++ b/moonlight-common/src/com/limelight/nvstream/av/video/VideoDepacketizer.java @@ -1,12 +1,12 @@ package com.limelight.nvstream.av.video; import java.util.LinkedList; -import java.util.concurrent.LinkedBlockingQueue; import com.limelight.LimeLog; import com.limelight.nvstream.av.ByteBufferDescriptor; import com.limelight.nvstream.av.DecodeUnit; import com.limelight.nvstream.av.ConnectionStatusListener; +import com.limelight.nvstream.av.PopulatedBufferList; public class VideoDepacketizer { @@ -32,12 +32,18 @@ public class VideoDepacketizer { private int nominalPacketSize; private static final int DU_LIMIT = 15; - private LinkedBlockingQueue decodedUnits = new LinkedBlockingQueue(DU_LIMIT); + private PopulatedBufferList decodedUnits; public VideoDepacketizer(ConnectionStatusListener controlListener, int nominalPacketSize) { this.controlListener = controlListener; this.nominalPacketSize = nominalPacketSize; + + decodedUnits = new PopulatedBufferList(DU_LIMIT, new PopulatedBufferList.BufferFactory() { + public Object createFreeBuffer() { + return new DecodeUnit(); + } + }); } private void clearAvcFrameState() @@ -66,22 +72,32 @@ public class VideoDepacketizer { } // Construct the H264 decode unit - DecodeUnit du = new DecodeUnit(DecodeUnit.TYPE_H264, avcFrameDataChain, - avcFrameDataLength, frameNumber, frameStartTime, flags); - if (!decodedUnits.offer(du)) { + DecodeUnit du = decodedUnits.pollFreeObject(); + if (du == null) { LimeLog.warning("Video decoder is too slow! Forced to drop decode units"); - + // Invalidate all frames from the start of the DU queue - controlListener.connectionSinkTooSlow(decodedUnits.remove().getFrameNumber(), frameNumber); + controlListener.connectionSinkTooSlow(decodedUnits.pollPopulatedObject().getFrameNumber(), frameNumber); // Remove existing frames - decodedUnits.clear(); + decodedUnits.clearPopulatedObjects(); - // Add this frame - decodedUnits.add(du); + // Try again + du = decodedUnits.pollFreeObject(); + if (du == null) { + LimeLog.warning("Video decoder is leaking decode units!"); + return; + } } + // Initialize the free DU + du.initialize(DecodeUnit.TYPE_H264, avcFrameDataChain, + avcFrameDataLength, frameNumber, frameStartTime, flags); + controlListener.connectionReceivedFrame(frameNumber); + + // Submit the DU to the consumer + decodedUnits.addPopulatedObject(du); // Clear old state clearAvcFrameState(); @@ -335,12 +351,17 @@ public class VideoDepacketizer { public DecodeUnit takeNextDecodeUnit() throws InterruptedException { - return decodedUnits.take(); + return decodedUnits.takePopulatedObject(); } public DecodeUnit pollNextDecodeUnit() { - return decodedUnits.poll(); + return decodedUnits.pollPopulatedObject(); + } + + public void freeDecodeUnit(DecodeUnit du) + { + decodedUnits.freePopulatedObject(du); } }