Prevent a decoder stall from causing corruption of queued decode units

This commit is contained in:
Cameron Gutman 2014-10-10 21:15:50 -07:00
parent 4b93207def
commit abc7f135f3
6 changed files with 83 additions and 8 deletions

View File

@ -1,7 +1,10 @@
package com.limelight.nvstream.av; package com.limelight.nvstream.av;
import java.util.HashSet;
import java.util.List; import java.util.List;
import com.limelight.nvstream.av.video.VideoPacket;
public class DecodeUnit { public class DecodeUnit {
public static final int TYPE_UNKNOWN = 0; public static final int TYPE_UNKNOWN = 0;
public static final int TYPE_H264 = 1; public static final int TYPE_H264 = 1;
@ -16,11 +19,13 @@ public class DecodeUnit {
private int frameNumber; private int frameNumber;
private long receiveTimestamp; private long receiveTimestamp;
private int flags; private int flags;
private HashSet<VideoPacket> backingPackets;
public DecodeUnit() { public DecodeUnit() {
} }
public void initialize(int type, List<ByteBufferDescriptor> bufferList, int dataLength, int frameNumber, long receiveTimestamp, int flags) public void initialize(int type, List<ByteBufferDescriptor> bufferList, int dataLength,
int frameNumber, long receiveTimestamp, int flags, HashSet<VideoPacket> backingPackets)
{ {
this.type = type; this.type = type;
this.bufferList = bufferList; this.bufferList = bufferList;
@ -28,6 +33,7 @@ public class DecodeUnit {
this.frameNumber = frameNumber; this.frameNumber = frameNumber;
this.receiveTimestamp = receiveTimestamp; this.receiveTimestamp = receiveTimestamp;
this.flags = flags; this.flags = flags;
this.backingPackets = backingPackets;
} }
public int getType() public int getType()
@ -59,4 +65,14 @@ public class DecodeUnit {
{ {
return flags; return flags;
} }
// Internal use only
public HashSet<VideoPacket> getBackingPackets() {
return backingPackets;
}
// Internal use only
public void clearBackingPackets() {
backingPackets.clear();
}
} }

View File

@ -6,8 +6,12 @@ public class PopulatedBufferList<T> {
private ArrayBlockingQueue<T> populatedList; private ArrayBlockingQueue<T> populatedList;
private ArrayBlockingQueue<T> freeList; private ArrayBlockingQueue<T> freeList;
private BufferFactory factory;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public PopulatedBufferList(int maxQueueSize, BufferFactory factory) { public PopulatedBufferList(int maxQueueSize, BufferFactory factory) {
this.factory = factory;
this.populatedList = new ArrayBlockingQueue<T>(maxQueueSize, false); this.populatedList = new ArrayBlockingQueue<T>(maxQueueSize, false);
this.freeList = new ArrayBlockingQueue<T>(maxQueueSize, false); this.freeList = new ArrayBlockingQueue<T>(maxQueueSize, false);
@ -25,13 +29,14 @@ public class PopulatedBufferList<T> {
} }
public void freePopulatedObject(T object) { public void freePopulatedObject(T object) {
factory.cleanupObject(object);
freeList.add(object); freeList.add(object);
} }
public void clearPopulatedObjects() { public void clearPopulatedObjects() {
T object; T object;
while ((object = populatedList.poll()) != null) { while ((object = populatedList.poll()) != null) {
freeList.add(object); freePopulatedObject(object);
} }
} }
@ -49,5 +54,6 @@ public class PopulatedBufferList<T> {
public static interface BufferFactory { public static interface BufferFactory {
public Object createFreeBuffer(); public Object createFreeBuffer();
public void cleanupObject(Object o);
} }
} }

View File

@ -32,6 +32,10 @@ public class AudioDepacketizer {
public Object createFreeBuffer() { public Object createFreeBuffer() {
return new ByteBufferDescriptor(new byte[OpusDecoder.getMaxOutputShorts()*2], 0, OpusDecoder.getMaxOutputShorts()*2); return new ByteBufferDescriptor(new byte[OpusDecoder.getMaxOutputShorts()*2], 0, OpusDecoder.getMaxOutputShorts()*2);
} }
public void cleanupObject(Object o) {
// Nothing to do
}
}); });
} }
} }

View File

@ -1,5 +1,6 @@
package com.limelight.nvstream.av.video; package com.limelight.nvstream.av.video;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import com.limelight.LimeLog; import com.limelight.LimeLog;
@ -14,6 +15,7 @@ public class VideoDepacketizer {
// Current frame state // Current frame state
private LinkedList<ByteBufferDescriptor> avcFrameDataChain = null; private LinkedList<ByteBufferDescriptor> avcFrameDataChain = null;
private int avcFrameDataLength = 0; private int avcFrameDataLength = 0;
private HashSet<VideoPacket> packetSet = null;
// Sequencing state // Sequencing state
private int lastPacketInStream = 0; private int lastPacketInStream = 0;
@ -43,6 +45,16 @@ public class VideoDepacketizer {
public Object createFreeBuffer() { public Object createFreeBuffer() {
return new DecodeUnit(); return new DecodeUnit();
} }
public void cleanupObject(Object o) {
DecodeUnit du = (DecodeUnit) o;
// Disassociate video packets from this DU
for (VideoPacket pkt : du.getBackingPackets()) {
pkt.decodeUnitRefCount.decrementAndGet();
}
du.clearBackingPackets();
}
}); });
} }
@ -54,6 +66,13 @@ public class VideoDepacketizer {
private void cleanupAvcFrameState() private void cleanupAvcFrameState()
{ {
if (packetSet != null) {
for (VideoPacket pkt : packetSet) {
pkt.decodeUnitRefCount.decrementAndGet();
}
packetSet = null;
}
avcFrameDataChain = null; avcFrameDataChain = null;
avcFrameDataLength = 0; avcFrameDataLength = 0;
} }
@ -95,7 +114,10 @@ public class VideoDepacketizer {
// Initialize the free DU // Initialize the free DU
du.initialize(DecodeUnit.TYPE_H264, avcFrameDataChain, du.initialize(DecodeUnit.TYPE_H264, avcFrameDataChain,
avcFrameDataLength, frameNumber, frameStartTime, flags); avcFrameDataLength, frameNumber, frameStartTime, flags, packetSet);
// Packets now owned by the DU
packetSet = null;
controlListener.connectionReceivedFrame(frameNumber); controlListener.connectionReceivedFrame(frameNumber);
@ -136,6 +158,7 @@ public class VideoDepacketizer {
// Setup state for the new NAL // Setup state for the new NAL
avcFrameDataChain = new LinkedList<ByteBufferDescriptor>(); avcFrameDataChain = new LinkedList<ByteBufferDescriptor>();
avcFrameDataLength = 0; avcFrameDataLength = 0;
packetSet = new HashSet<VideoPacket>();
if (cachedSpecialDesc.data[cachedSpecialDesc.offset+cachedSpecialDesc.length] == 0x65) { if (cachedSpecialDesc.data[cachedSpecialDesc.offset+cachedSpecialDesc.length] == 0x65) {
// This is the NALU code for I-frame data // This is the NALU code for I-frame data
@ -191,6 +214,10 @@ public class VideoDepacketizer {
{ {
ByteBufferDescriptor data = new ByteBufferDescriptor(location.data, start, location.offset-start); ByteBufferDescriptor data = new ByteBufferDescriptor(location.data, start, location.offset-start);
if (packetSet.add(packet)) {
packet.decodeUnitRefCount.incrementAndGet();
}
// Add a buffer descriptor describing the NAL data in this packet // Add a buffer descriptor describing the NAL data in this packet
avcFrameDataChain.add(data); avcFrameDataChain.add(data);
avcFrameDataLength += location.offset-start; avcFrameDataLength += location.offset-start;
@ -205,11 +232,17 @@ public class VideoDepacketizer {
frameStartTime = System.currentTimeMillis(); frameStartTime = System.currentTimeMillis();
avcFrameDataChain = new LinkedList<ByteBufferDescriptor>(); avcFrameDataChain = new LinkedList<ByteBufferDescriptor>();
avcFrameDataLength = 0; avcFrameDataLength = 0;
packetSet = new HashSet<VideoPacket>();
} }
// Add the payload data to the chain // Add the payload data to the chain
avcFrameDataChain.add(new ByteBufferDescriptor(location)); avcFrameDataChain.add(new ByteBufferDescriptor(location));
avcFrameDataLength += location.length; avcFrameDataLength += location.length;
// The receive thread can't use this until we're done with it
if (packetSet.add(packet)) {
packet.decodeUnitRefCount.incrementAndGet();
}
} }
private static boolean isFirstPacket(int flags) { private static boolean isFirstPacket(int flags) {

View File

@ -2,6 +2,7 @@ package com.limelight.nvstream.av.video;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicInteger;
import com.limelight.nvstream.av.ByteBufferDescriptor; import com.limelight.nvstream.av.ByteBufferDescriptor;
import com.limelight.nvstream.av.RtpPacket; import com.limelight.nvstream.av.RtpPacket;
@ -19,6 +20,8 @@ public class VideoPacket implements RtpPacketFields {
private short rtpSequenceNumber; private short rtpSequenceNumber;
AtomicInteger decodeUnitRefCount = new AtomicInteger();
public static final int FLAG_CONTAINS_PIC_DATA = 0x1; public static final int FLAG_CONTAINS_PIC_DATA = 0x1;
public static final int FLAG_EOF = 0x2; public static final int FLAG_EOF = 0x2;
public static final int FLAG_SOF = 0x4; public static final int FLAG_SOF = 0x4;

View File

@ -10,6 +10,7 @@ import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.util.LinkedList; import java.util.LinkedList;
import com.limelight.LimeLog;
import com.limelight.nvstream.NvConnectionListener; import com.limelight.nvstream.NvConnectionListener;
import com.limelight.nvstream.StreamConfiguration; import com.limelight.nvstream.StreamConfiguration;
import com.limelight.nvstream.av.ConnectionStatusListener; import com.limelight.nvstream.av.ConnectionStatusListener;
@ -203,6 +204,7 @@ public class VideoStream {
byte[] buffer; byte[] buffer;
DatagramPacket packet = new DatagramPacket(new byte[1], 1); // Placeholder array DatagramPacket packet = new DatagramPacket(new byte[1], 1); // Placeholder array
int iterationStart;
while (!isInterrupted()) while (!isInterrupted())
{ {
try { try {
@ -228,8 +230,19 @@ public class VideoStream {
} }
} }
// The ring is large enough to account for the maximum queued packets // Go to the next free element in the ring
ringIndex = (ringIndex + 1) % VIDEO_RING_SIZE; iterationStart = ringIndex;
do {
ringIndex = (ringIndex + 1) % VIDEO_RING_SIZE;
if (ringIndex == iterationStart) {
// Reinitialize the video ring since they're all being used
LimeLog.warning("Packet ring wrapped around!");
for (int i = 0; i < VIDEO_RING_SIZE; i++) {
ring[i] = new VideoPacket(new byte[requiredBufferSize]);
}
break;
}
} while (ring[ringIndex].decodeUnitRefCount.get() != 0);
} catch (IOException e) { } catch (IOException e) {
listener.connectionTerminated(e); listener.connectionTerminated(e);
return; return;