Increase resilience to packet loss. IDR frames are no longer requested if error correction data was lost. A maximum of one IDR frame is requested per corrupt frame. Error correction data is used to recover from the loss of a single-packet frame.

This commit is contained in:
Cameron Gutman 2014-02-24 12:54:03 -05:00
parent ccc3eeebe8
commit bc2ca0b386
4 changed files with 142 additions and 75 deletions

View File

@ -3,7 +3,7 @@ package com.limelight.nvstream.av;
public interface ConnectionStatusListener { public interface ConnectionStatusListener {
public void connectionTerminated(); public void connectionTerminated();
public void connectionDetectedPacketLoss(); public void connectionDetectedFrameLoss(int firstLostFrame, int lastLostFrame);
public void connectionSinkTooSlow(); public void connectionSinkTooSlow(int firstLostFrame, int lastLostFrame);
} }

View File

@ -11,13 +11,15 @@ public class DecodeUnit {
private List<ByteBufferDescriptor> bufferList; private List<ByteBufferDescriptor> bufferList;
private int dataLength; private int dataLength;
private int flags; private int flags;
private int frameNumber;
public DecodeUnit(int type, List<ByteBufferDescriptor> bufferList, int dataLength, int flags) public DecodeUnit(int type, List<ByteBufferDescriptor> bufferList, int dataLength, int flags, int frameNumber)
{ {
this.type = type; this.type = type;
this.bufferList = bufferList; this.bufferList = bufferList;
this.dataLength = dataLength; this.dataLength = dataLength;
this.flags = flags; this.flags = flags;
this.frameNumber = frameNumber;
} }
public int getType() public int getType()
@ -39,4 +41,9 @@ public class DecodeUnit {
{ {
return dataLength; return dataLength;
} }
public int getFrameNumber()
{
return frameNumber;
}
} }

View File

@ -10,13 +10,16 @@ import com.limelight.nvstream.av.ConnectionStatusListener;
public class VideoDepacketizer { public class VideoDepacketizer {
// Current NAL state // Current frame state
private LinkedList<ByteBufferDescriptor> avcNalDataChain = null; private LinkedList<ByteBufferDescriptor> avcFrameDataChain = null;
private int avcNalDataLength = 0; private int avcFrameDataLength = 0;
private int currentlyDecoding = DecodeUnit.TYPE_UNKNOWN; private int currentlyDecoding = DecodeUnit.TYPE_UNKNOWN;
// Sequencing state // Sequencing state
private short lastSequenceNumber; private int nextFrameNumber = 1;
private int nextPacketNumber;
private int startFrameNumber = 1;
private boolean waitingForFrameStart;
// Cached objects // Cached objects
private ByteBufferDescriptor cachedDesc = new ByteBufferDescriptor(null, 0, 0); private ByteBufferDescriptor cachedDesc = new ByteBufferDescriptor(null, 0, 0);
@ -33,31 +36,31 @@ public class VideoDepacketizer {
this.controlListener = controlListener; this.controlListener = controlListener;
} }
private void clearAvcNalState() private void clearAvcFrameState()
{ {
avcNalDataChain = null; avcFrameDataChain = null;
avcNalDataLength = 0; avcFrameDataLength = 0;
} }
private void reassembleAvcNal() private void reassembleAvcFrame(int frameNumber)
{ {
// This is the start of a new NAL // This is the start of a new frame
if (avcNalDataChain != null && avcNalDataLength != 0) { if (avcFrameDataChain != null && avcFrameDataLength != 0) {
// Construct the H264 decode unit // Construct the H264 decode unit
DecodeUnit du = new DecodeUnit(DecodeUnit.TYPE_H264, avcNalDataChain, avcNalDataLength, 0); DecodeUnit du = new DecodeUnit(DecodeUnit.TYPE_H264, avcFrameDataChain, avcFrameDataLength, 0, frameNumber);
if (directSubmitDr != null) { if (directSubmitDr != null) {
// Submit directly to the decoder // Submit directly to the decoder
directSubmitDr.submitDecodeUnit(du); directSubmitDr.submitDecodeUnit(du);
} }
else if (!decodedUnits.offer(du)) { else if (!decodedUnits.offer(du)) {
// We need a new IDR frame since we're discarding data now
System.out.println("Video decoder is too slow! Forced to drop decode units"); System.out.println("Video decoder is too slow! Forced to drop decode units");
// Invalidate all frames from the start of the DU queue to this frame number
controlListener.connectionSinkTooSlow(decodedUnits.remove().getFrameNumber(), frameNumber);
decodedUnits.clear(); decodedUnits.clear();
controlListener.connectionSinkTooSlow();
} }
// Clear old state // Clear old state
clearAvcNalState(); clearAvcFrameState();
} }
} }
@ -80,11 +83,11 @@ public class VideoDepacketizer {
if (NAL.isAvcFrameStart(cachedDesc)) if (NAL.isAvcFrameStart(cachedDesc))
{ {
// Reassemble any pending AVC NAL // Reassemble any pending AVC NAL
reassembleAvcNal(); reassembleAvcFrame(packet.getFrameIndex());
// Setup state for the new NAL // Setup state for the new NAL
avcNalDataChain = new LinkedList<ByteBufferDescriptor>(); avcFrameDataChain = new LinkedList<ByteBufferDescriptor>();
avcNalDataLength = 0; avcFrameDataLength = 0;
} }
// Skip the start sequence // Skip the start sequence
@ -97,7 +100,7 @@ public class VideoDepacketizer {
if (currentlyDecoding == DecodeUnit.TYPE_H264 && if (currentlyDecoding == DecodeUnit.TYPE_H264 &&
NAL.isPadding(cachedDesc)) { NAL.isPadding(cachedDesc)) {
// The decode unit is complete // The decode unit is complete
reassembleAvcNal(); reassembleAvcFrame(packet.getFrameIndex());
} }
// Not decoding AVC // Not decoding AVC
@ -133,13 +136,13 @@ public class VideoDepacketizer {
location.length--; location.length--;
} }
if (currentlyDecoding == DecodeUnit.TYPE_H264 && avcNalDataChain != null) if (currentlyDecoding == DecodeUnit.TYPE_H264 && avcFrameDataChain != null)
{ {
ByteBufferDescriptor data = new ByteBufferDescriptor(location.data, start, location.offset-start); ByteBufferDescriptor data = new ByteBufferDescriptor(location.data, start, location.offset-start);
// Add a buffer descriptor describing the NAL data in this packet // Add a buffer descriptor describing the NAL data in this packet
avcNalDataChain.add(data); avcFrameDataChain.add(data);
avcNalDataLength += location.offset-start; avcFrameDataLength += location.offset-start;
} }
} }
} }
@ -148,15 +151,13 @@ public class VideoDepacketizer {
{ {
if (firstPacket) { if (firstPacket) {
// Setup state for the new frame // Setup state for the new frame
avcNalDataChain = new LinkedList<ByteBufferDescriptor>(); avcFrameDataChain = new LinkedList<ByteBufferDescriptor>();
avcNalDataLength = 0; avcFrameDataLength = 0;
} }
// Add the payload data to the chain // Add the payload data to the chain
if (avcNalDataChain != null) { avcFrameDataChain.add(location);
avcNalDataChain.add(location); avcFrameDataLength += location.length;
avcNalDataLength += location.length;
}
} }
public void addInputData(VideoPacket packet) public void addInputData(VideoPacket packet)
@ -164,29 +165,87 @@ public class VideoDepacketizer {
ByteBufferDescriptor location = packet.getNewPayloadDescriptor(); ByteBufferDescriptor location = packet.getNewPayloadDescriptor();
// Runt packets get decoded using the slow path // Runt packets get decoded using the slow path
// These packets stand alone so there's no need to verify
// sequencing before submitting
if (location.length < 968) { if (location.length < 968) {
addInputDataSlow(packet, location); addInputDataSlow(packet, location);
return; return;
} }
int frameIndex = packet.getFrameIndex();
int packetIndex = packet.getPacketIndex(); int packetIndex = packet.getPacketIndex();
int packetsInFrame = packet.getTotalPackets(); int packetsInFrame = packet.getTotalPackets();
// We can use FEC to correct single packet errors
// on single packet frames because we just get a
// duplicate of the original packet
if (packetsInFrame == 1 && packetIndex == 1 &&
nextPacketNumber == 0 && frameIndex == nextFrameNumber) {
System.out.println("Using FEC for error correction");
nextPacketNumber = 1;
}
// Discard FEC data early // Discard FEC data early
if (packetIndex >= packetsInFrame) { else if (packetIndex >= packetsInFrame) {
return; return;
} }
// Check that this is the next frame
boolean firstPacket = (packet.getFlags() & VideoPacket.FLAG_SOF) != 0;
if (firstPacket && waitingForFrameStart) {
// This is the next frame after a loss event
controlListener.connectionDetectedFrameLoss(startFrameNumber, frameIndex - 1);
startFrameNumber = nextFrameNumber = frameIndex;
nextPacketNumber = 0;
waitingForFrameStart = false;
clearAvcFrameState();
}
else if (frameIndex > nextFrameNumber) {
// Nope, but we can still work with it if it's
// the start of the next frame
if (firstPacket) {
System.out.println("Got start of frame "+frameIndex+
" when expecting packet "+nextPacketNumber+
" of frame "+nextFrameNumber);
controlListener.connectionDetectedFrameLoss(startFrameNumber, frameIndex - 1);
startFrameNumber = nextFrameNumber = frameIndex;
nextPacketNumber = 0;
clearAvcFrameState();
}
else {
System.out.println("Got packet "+packetIndex+" of frame "+frameIndex+
" when expecting packet "+nextPacketNumber+
" of frame "+nextFrameNumber);
// We dropped the start of this frame too, so pick up on the next frame
waitingForFrameStart = true;
return;
}
}
else if (frameIndex < nextFrameNumber) {
System.out.println("Frame "+frameIndex+" is behind our current frame number "+nextFrameNumber);
// Discard the frame silently if it's behind our current sequence number
return;
}
// We know it's the right frame, now check the packet number
if (packetIndex != nextPacketNumber) {
System.out.println("Frame "+frameIndex+": expected packet "+nextPacketNumber+" but got "+packetIndex);
// At this point, we're guaranteed that it's not FEC data that we lost
waitingForFrameStart = true;
return;
}
nextPacketNumber++;
// Remove extra padding // Remove extra padding
location.length = packet.getPayloadLength(); location.length = packet.getPayloadLength();
boolean firstPacket = (packet.getFlags() & VideoPacket.FLAG_SOF) != 0;
if (firstPacket) if (firstPacket)
{ {
if (NAL.getSpecialSequenceDescriptor(location, cachedDesc) && NAL.isAvcFrameStart(cachedDesc) if (NAL.getSpecialSequenceDescriptor(location, cachedDesc) && NAL.isAvcFrameStart(cachedDesc)
&& cachedDesc.data[cachedDesc.offset+cachedDesc.length] == 0x67) && cachedDesc.data[cachedDesc.offset+cachedDesc.length] == 0x67)
{ {
// SPS and PPS prefix is padded between NALs, so we must decode it with the slow path // SPS and PPS prefix is padded between NALs, so we must decode it with the slow path
clearAvcFrameState();
addInputDataSlow(packet, location); addInputDataSlow(packet, location);
return; return;
} }
@ -194,32 +253,21 @@ public class VideoDepacketizer {
addInputDataFast(packet, location, firstPacket); addInputDataFast(packet, location, firstPacket);
// We can't use the EOF flag here because real frames can be split across
// multiple "frames" when packetized to fit under the bandwidth ceiling
if (packetIndex + 1 >= packetsInFrame) {
nextFrameNumber++;
nextPacketNumber = 0;
}
if ((packet.getFlags() & VideoPacket.FLAG_EOF) != 0) { if ((packet.getFlags() & VideoPacket.FLAG_EOF) != 0) {
reassembleAvcNal(); reassembleAvcFrame(packet.getFrameIndex());
startFrameNumber = nextFrameNumber;
} }
} }
public void addInputData(RtpPacket packet) public void addInputData(RtpPacket packet)
{ {
short seq = packet.getSequenceNumber();
// Toss out the current NAL if we receive a packet that is
// out of sequence
if (lastSequenceNumber != 0 &&
(short)(lastSequenceNumber + 1) != seq)
{
System.out.println("Received OOS video data (expected "+(lastSequenceNumber + 1)+", got "+seq+")");
// Reset the depacketizer state
clearAvcNalState();
// Request an IDR frame
controlListener.connectionDetectedPacketLoss();
}
lastSequenceNumber = seq;
// Pass the payload to the non-sequencing parser
ByteBufferDescriptor rtpPayload = packet.getNewPayloadDescriptor(); ByteBufferDescriptor rtpPayload = packet.getNewPayloadDescriptor();
addInputData(new VideoPacket(rtpPayload)); addInputData(new VideoPacket(rtpPayload));
} }

View File

@ -8,6 +8,7 @@ import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.concurrent.LinkedBlockingQueue;
import com.limelight.nvstream.NvConnectionListener; import com.limelight.nvstream.NvConnectionListener;
import com.limelight.nvstream.StreamConfiguration; import com.limelight.nvstream.StreamConfiguration;
@ -60,7 +61,7 @@ public class ControlStream implements ConnectionStatusListener {
private Thread heartbeatThread; private Thread heartbeatThread;
private Thread jitterThread; private Thread jitterThread;
private Thread resyncThread; private Thread resyncThread;
private Object resyncNeeded = new Object(); private LinkedBlockingQueue<int[]> invalidReferenceFrameTuples = new LinkedBlockingQueue<int[]>();
private boolean aborting = false; private boolean aborting = false;
public ControlStream(InetAddress host, NvConnectionListener listener, StreamConfiguration streamConfig) public ControlStream(InetAddress host, NvConnectionListener listener, StreamConfiguration streamConfig)
@ -140,12 +141,6 @@ public class ControlStream implements ConnectionStatusListener {
} }
} }
public void requestResync() throws IOException
{
System.out.println("CTL: Requesting IDR frame");
sendResync();
}
public void start() throws IOException public void start() throws IOException
{ {
// Use a finite timeout during the handshake process // Use a finite timeout during the handshake process
@ -188,18 +183,36 @@ public class ControlStream implements ConnectionStatusListener {
public void run() { public void run() {
while (!isInterrupted()) while (!isInterrupted())
{ {
int[] tuple;
// Wait for a tuple
try { try {
// Wait for notification of a resync needed tuple = invalidReferenceFrameTuples.take();
synchronized (resyncNeeded) {
resyncNeeded.wait();
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
listener.connectionTerminated(e); listener.connectionTerminated(e);
return; return;
} }
// Aggregate all lost frames into one range
int[] lastTuple = null;
for (;;) {
int[] nextTuple = lastTuple = invalidReferenceFrameTuples.poll();
if (nextTuple == null) {
break;
}
lastTuple = nextTuple;
}
// Update the end of the range to the latest tuple
if (lastTuple != null) {
tuple[1] = lastTuple[1];
}
try { try {
requestResync(); System.err.println("Invalidating reference frames from "+tuple[0]+" to "+tuple[1]);
ControlStream.this.sendResync(tuple[0], tuple[1]);
System.err.println("Frames invalidated");
} catch (IOException e) { } catch (IOException e) {
listener.connectionTerminated(e); listener.connectionTerminated(e);
return; return;
@ -243,12 +256,14 @@ public class ControlStream implements ConnectionStatusListener {
return sendAndGetReply(new NvCtlPacket(PTYPE_1405, PPAYLEN_1405)); return sendAndGetReply(new NvCtlPacket(PTYPE_1405, PPAYLEN_1405));
} }
private void sendResync() throws IOException private void sendResync(int firstLostFrame, int lastLostFrame) throws IOException
{ {
ByteBuffer conf = ByteBuffer.wrap(new byte[PPAYLEN_RESYNC]).order(ByteOrder.LITTLE_ENDIAN); ByteBuffer conf = ByteBuffer.wrap(new byte[PPAYLEN_RESYNC]).order(ByteOrder.LITTLE_ENDIAN);
conf.putLong(0); conf.putLong(0);
conf.putLong(0xFFFFF); conf.putLong(0xFFFFF);
//conf.putLong(firstLostFrame);
//conf.putLong(lastLostFrame);
sendAndGetReply(new NvCtlPacket(PTYPE_RESYNC, PPAYLEN_RESYNC, conf.array())); sendAndGetReply(new NvCtlPacket(PTYPE_RESYNC, PPAYLEN_RESYNC, conf.array()));
} }
@ -413,14 +428,11 @@ public class ControlStream implements ConnectionStatusListener {
abort(); abort();
} }
private void resyncConnection() { private void resyncConnection(int firstLostFrame, int lastLostFrame) {
synchronized (resyncNeeded) { invalidReferenceFrameTuples.add(new int[]{firstLostFrame, lastLostFrame});
// Wake up the resync thread
resyncNeeded.notify();
}
} }
public void connectionDetectedPacketLoss() { public void connectionDetectedFrameLoss(int firstLostFrame, int lastLostFrame) {
if (System.currentTimeMillis() > LOSS_PERIOD_MS + lossTimestamp) { if (System.currentTimeMillis() > LOSS_PERIOD_MS + lossTimestamp) {
lossCount++; lossCount++;
lossTimestamp = System.currentTimeMillis(); lossTimestamp = System.currentTimeMillis();
@ -433,15 +445,15 @@ public class ControlStream implements ConnectionStatusListener {
} }
} }
resyncConnection(); resyncConnection(firstLostFrame, lastLostFrame);
} }
public void connectionSinkTooSlow() { public void connectionSinkTooSlow(int firstLostFrame, int lastLostFrame) {
if (++slowSinkCount == MAX_SLOW_SINK_COUNT) { if (++slowSinkCount == MAX_SLOW_SINK_COUNT) {
listener.displayTransientMessage("Your device is processing the A/V data too slowly. Try lowering stream settings."); listener.displayTransientMessage("Your device is processing the A/V data too slowly. Try lowering stream settings.");
slowSinkCount = -MAX_SLOW_SINK_COUNT * MESSAGE_DELAY_FACTOR; slowSinkCount = -MAX_SLOW_SINK_COUNT * MESSAGE_DELAY_FACTOR;
} }
resyncConnection(); resyncConnection(firstLostFrame, lastLostFrame);
} }
} }