Implement an RTP queue to handle out of order video and audio packets

This commit is contained in:
Cameron Gutman 2014-08-03 17:59:10 -07:00
parent 2d55562dd3
commit 0f0e41d5a4
7 changed files with 315 additions and 15 deletions

View File

@ -3,7 +3,7 @@ package com.limelight.nvstream.av;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public class RtpPacket {
public class RtpPacket implements RtpPacketFields {
private byte packetType;
private short seqNum;
@ -53,7 +53,7 @@ public class RtpPacket {
return packetType;
}
public short getSequenceNumber()
public short getRtpSequenceNumber()
{
return seqNum;
}

View File

@ -0,0 +1,7 @@
package com.limelight.nvstream.av;
public interface RtpPacketFields {
public byte getPacketType();
public short getRtpSequenceNumber();
}

View File

@ -0,0 +1,230 @@
package com.limelight.nvstream.av;
import java.util.Iterator;
import java.util.LinkedList;
import com.limelight.LimeLog;
public class RtpReorderQueue {
private final int maxSize;
private final int maxQueueTime;
private final LinkedList<RtpQueueEntry> queue;
private short nextRtpSequenceNumber;
private long oldestQueuedTime;
private RtpQueueEntry oldestQueuedEntry;
public enum RtpQueueStatus {
HANDLE_IMMEDIATELY,
QUEUED_NOTHING_READY,
QUEUED_PACKETS_READY,
REJECTED
};
public RtpReorderQueue() {
this.maxSize = 16;
this.maxQueueTime = 40;
this.queue = new LinkedList<RtpQueueEntry>();
this.oldestQueuedTime = Long.MAX_VALUE;
this.nextRtpSequenceNumber = Short.MAX_VALUE;
}
public RtpReorderQueue(int maxSize, int maxQueueTime) {
this.maxSize = maxSize;
this.maxQueueTime = maxQueueTime;
this.queue = new LinkedList<RtpQueueEntry>();
this.oldestQueuedTime = Long.MAX_VALUE;
this.nextRtpSequenceNumber = Short.MAX_VALUE;
}
private boolean queuePacket(boolean head, RtpPacketFields packet) {
short seq = packet.getRtpSequenceNumber();
if (nextRtpSequenceNumber != Short.MAX_VALUE) {
// Don't queue packets we're already ahead of
if (seq < nextRtpSequenceNumber) {
return false;
}
// Don't queue duplicates either
for (RtpQueueEntry existingEntry : queue) {
if (existingEntry.sequenceNumber == seq) {
return false;
}
}
}
RtpQueueEntry entry = new RtpQueueEntry();
entry.packet = packet;
entry.queueTime = System.currentTimeMillis();
entry.sequenceNumber = seq;
if (oldestQueuedTime == Long.MAX_VALUE) {
oldestQueuedTime = System.currentTimeMillis();
}
if (head) {
queue.addFirst(entry);
}
else {
queue.addLast(entry);
}
return true;
}
private void updateOldestQueued() {
oldestQueuedTime = Long.MAX_VALUE;
oldestQueuedEntry = null;
for (RtpQueueEntry entry : queue) {
if (entry.queueTime < oldestQueuedTime) {
oldestQueuedEntry = entry;
oldestQueuedTime = entry.queueTime;
}
}
}
private RtpQueueEntry getEntryByLowestSeq() {
short nextSeq = Short.MAX_VALUE;
RtpQueueEntry lowestSeqEntry = null;
for (RtpQueueEntry entry : queue) {
if (entry.sequenceNumber < nextSeq) {
lowestSeqEntry = entry;
nextSeq = entry.sequenceNumber;
}
}
if (nextSeq != Short.MAX_VALUE) {
nextRtpSequenceNumber = nextSeq;
}
return lowestSeqEntry;
}
private RtpQueueEntry validateQueueConstraints() {
if (queue.isEmpty()) {
return null;
}
boolean needsUpdate = false;
// Check that the queue's time constraint is satisfied
if (System.currentTimeMillis() - oldestQueuedTime > maxQueueTime) {
LimeLog.info("Discarding RTP packet queued for too long");
queue.remove(oldestQueuedEntry);
needsUpdate = true;
}
// Check that the queue's size constraint is satisfied
if (queue.size() == maxSize) {
LimeLog.info("Discarding RTP packet after queue overgrowth");
queue.remove(oldestQueuedEntry);
needsUpdate = true;
}
if (needsUpdate) {
// Recalculate the oldest entry if needed
updateOldestQueued();
// Return the lowest seq queued
return getEntryByLowestSeq();
}
else {
return null;
}
}
public RtpQueueStatus addPacket(RtpPacketFields packet) {
if (nextRtpSequenceNumber != Short.MAX_VALUE &&
packet.getRtpSequenceNumber() < nextRtpSequenceNumber) {
// Reject packets behind our current sequence number
return RtpQueueStatus.REJECTED;
}
if (queue.isEmpty()) {
// Return immediately for an exact match with an empty queue
if (nextRtpSequenceNumber == Short.MAX_VALUE ||
packet.getRtpSequenceNumber() == nextRtpSequenceNumber) {
nextRtpSequenceNumber = (short) (packet.getRtpSequenceNumber() + 1);
return RtpQueueStatus.HANDLE_IMMEDIATELY;
}
else {
// Queue is empty currently so we'll put this packet on there
if (queuePacket(false, packet)) {
return RtpQueueStatus.QUEUED_NOTHING_READY;
}
else {
return RtpQueueStatus.REJECTED;
}
}
}
else {
// Validate that the queue remains within our contraints
RtpQueueEntry lowestEntry = validateQueueConstraints();
// Queue has data inside, so we need to see where this packet fits
if (packet.getRtpSequenceNumber() == nextRtpSequenceNumber) {
// It fits in a hole where we need a packet, now we have some ready
if (queuePacket(true, packet)) {
return RtpQueueStatus.QUEUED_PACKETS_READY;
}
else {
return RtpQueueStatus.REJECTED;
}
}
else {
if (queuePacket(false, packet)) {
// Constraint validation may have changed the oldest packet to one that
// matches the next sequence number
return (lowestEntry != null) ? RtpQueueStatus.QUEUED_PACKETS_READY :
RtpQueueStatus.QUEUED_NOTHING_READY;
}
else {
return RtpQueueStatus.REJECTED;
}
}
}
}
public RtpPacketFields getQueuedPacket() {
RtpQueueEntry queuedEntry = null;
System.out.println("Pulling from reordered queue");
// Find the matching entry
Iterator<RtpQueueEntry> i = queue.iterator();
while (i.hasNext()) {
RtpQueueEntry entry = i.next();
if (entry.sequenceNumber == nextRtpSequenceNumber) {
nextRtpSequenceNumber++;
queuedEntry = entry;
i.remove();
break;
}
}
// Bail if we found nothing
if (queuedEntry == null) {
// Update the oldest queued packet time
updateOldestQueued();
return null;
}
// We don't update the oldest queued entry here, because we know
// the caller will call again until it receives null
return queuedEntry.packet;
}
private class RtpQueueEntry {
public RtpPacketFields packet;
public short sequenceNumber;
public long queueTime;
}
}

View File

@ -77,12 +77,7 @@ public class AudioDepacketizer {
public void decodeInputData(RtpPacket packet)
{
short seq = packet.getSequenceNumber();
if (packet.getPacketType() != 97) {
// Only type 97 is audio
return;
}
short seq = packet.getRtpSequenceNumber();
// Toss out the current NAL if we receive a packet that is
// out of sequence

View File

@ -11,6 +11,7 @@ import java.util.LinkedList;
import com.limelight.nvstream.NvConnectionListener;
import com.limelight.nvstream.av.ByteBufferDescriptor;
import com.limelight.nvstream.av.RtpPacket;
import com.limelight.nvstream.av.RtpReorderQueue;
public class AudioStream {
public static final int RTP_PORT = 48000;
@ -154,7 +155,9 @@ public class AudioStream {
public void run() {
byte[] buffer = new byte[MAX_PACKET_SIZE];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
RtpPacket rtpPacket = new RtpPacket(buffer);
RtpPacket queuedPacket, rtpPacket = new RtpPacket(buffer);
RtpReorderQueue rtpQueue = new RtpReorderQueue();
RtpReorderQueue.RtpQueueStatus queueStatus;
while (!isInterrupted())
{
@ -163,9 +166,38 @@ public class AudioStream {
// DecodeInputData() doesn't hold onto the buffer so we are free to reuse it
rtpPacket.initializeWithLength(packet.getLength());
depacketizer.decodeInputData(rtpPacket);
packet.setLength(MAX_PACKET_SIZE);
// Throw away non-audio packets before queuing
if (rtpPacket.getPacketType() != 97) {
// Only type 97 is audio
packet.setLength(MAX_PACKET_SIZE);
continue;
}
queueStatus = rtpQueue.addPacket(rtpPacket);
if (queueStatus == RtpReorderQueue.RtpQueueStatus.HANDLE_IMMEDIATELY) {
// Send directly to the depacketizer
depacketizer.decodeInputData(rtpPacket);
packet.setLength(MAX_PACKET_SIZE);
}
else {
if (queueStatus != RtpReorderQueue.RtpQueueStatus.REJECTED) {
// The queue consumed our packet, so we must allocate a new one
buffer = new byte[MAX_PACKET_SIZE];
packet = new DatagramPacket(buffer, buffer.length);
rtpPacket = new RtpPacket(buffer);
}
else {
packet.setLength(MAX_PACKET_SIZE);
}
// If packets are ready, pull them and send them to the depacketizer
if (queueStatus == RtpReorderQueue.RtpQueueStatus.QUEUED_PACKETS_READY) {
while ((queuedPacket = (RtpPacket) rtpQueue.getQueuedPacket()) != null) {
depacketizer.decodeInputData(queuedPacket);
}
}
}
} catch (IOException e) {
connListener.connectionTerminated(e);
return;

View File

@ -5,8 +5,9 @@ import java.nio.ByteOrder;
import com.limelight.nvstream.av.ByteBufferDescriptor;
import com.limelight.nvstream.av.RtpPacket;
import com.limelight.nvstream.av.RtpPacketFields;
public class VideoPacket {
public class VideoPacket implements RtpPacketFields {
private ByteBufferDescriptor buffer;
private ByteBuffer byteBuffer;
@ -16,6 +17,8 @@ public class VideoPacket {
private int flags;
private int streamPacketIndex;
private short rtpSequenceNumber;
public static final int FLAG_CONTAINS_PIC_DATA = 0x1;
public static final int FLAG_EOF = 0x2;
public static final int FLAG_SOF = 0x4;
@ -33,6 +36,8 @@ public class VideoPacket {
// Back to beginning
byteBuffer.rewind();
// No sequence number field is present in these packets
// Read the video header fields
streamPacketIndex = (byteBuffer.getInt() >> 8) & 0xFFFFFF;
frameIndex = byteBuffer.getInt();
@ -47,7 +52,12 @@ public class VideoPacket {
public void initializeWithLength(int length)
{
// Skip the RTP header
// Read the RTP sequence number field (big endian)
byteBuffer.position(2);
rtpSequenceNumber = byteBuffer.getShort();
rtpSequenceNumber = (short)(((rtpSequenceNumber << 8) & 0xFF00) | (((rtpSequenceNumber >> 8) & 0x00FF)));
// Skip the rest of the RTP header
byteBuffer.position(RtpPacket.MAX_HEADER_SIZE);
// Read the video header fields
@ -86,4 +96,13 @@ public class VideoPacket {
{
bb.reinitialize(buffer.data, buffer.offset+dataOffset, buffer.length-dataOffset);
}
public byte getPacketType() {
// No consumers use this field so we don't look it up
return -1;
}
public short getRtpSequenceNumber() {
return rtpSequenceNumber;
}
}

View File

@ -14,6 +14,7 @@ import com.limelight.nvstream.NvConnectionListener;
import com.limelight.nvstream.StreamConfiguration;
import com.limelight.nvstream.av.ConnectionStatusListener;
import com.limelight.nvstream.av.RtpPacket;
import com.limelight.nvstream.av.RtpReorderQueue;
public class VideoStream {
public static final int RTP_PORT = 47998;
@ -191,7 +192,10 @@ public class VideoStream {
@Override
public void run() {
VideoPacket ring[] = new VideoPacket[VIDEO_RING_SIZE];
VideoPacket queuedPacket;
int ringIndex = 0;
RtpReorderQueue rtpQueue = new RtpReorderQueue();
RtpReorderQueue.RtpQueueStatus queueStatus;
// Preinitialize the ring buffer
int requiredBufferSize = streamConfig.getMaxPacketSize() + RtpPacket.MAX_HEADER_SIZE;
@ -211,9 +215,22 @@ public class VideoStream {
packet.setData(buffer, 0, buffer.length);
rtp.receive(packet);
// Submit video data to the depacketizer
// Initialize the video packet
ring[ringIndex].initializeWithLength(packet.getLength());
depacketizer.addInputData(ring[ringIndex]);
queueStatus = rtpQueue.addPacket(ring[ringIndex]);
if (queueStatus == RtpReorderQueue.RtpQueueStatus.HANDLE_IMMEDIATELY) {
// Submit immediately because the packet is in order
depacketizer.addInputData(ring[ringIndex]);
}
else if (queueStatus == RtpReorderQueue.RtpQueueStatus.QUEUED_PACKETS_READY) {
// The packet queue now has packets ready
while ((queuedPacket = (VideoPacket) rtpQueue.getQueuedPacket()) != null) {
depacketizer.addInputData(queuedPacket);
}
}
// The ring is large enough to account for the maximum queued packets
ringIndex = (ringIndex + 1) % VIDEO_RING_SIZE;
} catch (IOException e) {
listener.connectionTerminated(e);