Reference count packets in the RTP queue so they don't get overwritten while queued

This commit is contained in:
Cameron Gutman 2015-10-31 15:27:33 -07:00
parent 8a465edad9
commit df59c99f80
6 changed files with 41 additions and 5 deletions

View File

@ -67,4 +67,22 @@ public class RtpPacket implements RtpPacketFields {
{ {
bb.reinitialize(buffer.data, buffer.offset+headerSize, buffer.length-headerSize); bb.reinitialize(buffer.data, buffer.offset+headerSize, buffer.length-headerSize);
} }
@Override
public int referencePacket() {
// There's no circular buffer for audio packets so this is a no-op
return 0;
}
@Override
public int dereferencePacket() {
// There's no circular buffer for audio packets so this is a no-op
return 0;
}
@Override
public int getRefCount() {
// There's no circular buffer for audio packets so this is a no-op
return 0;
}
} }

View File

@ -4,4 +4,10 @@ public interface RtpPacketFields {
public byte getPacketType(); public byte getPacketType();
public short getRtpSequenceNumber(); public short getRtpSequenceNumber();
public int referencePacket();
public int dereferencePacket();
public int getRefCount();
} }

View File

@ -68,6 +68,9 @@ public class RtpReorderQueue {
oldestQueuedEntry = entry; oldestQueuedEntry = entry;
} }
// Add a reference to the packet while it's in the queue
packet.referencePacket();
if (head) { if (head) {
queue.addFirst(entry); queue.addFirst(entry);
} }
@ -111,6 +114,11 @@ public class RtpReorderQueue {
return lowestSeqEntry; return lowestSeqEntry;
} }
private void removeEntry(RtpQueueEntry entry) {
queue.remove(entry);
entry.packet.dereferencePacket();
}
private RtpQueueEntry validateQueueConstraints() { private RtpQueueEntry validateQueueConstraints() {
if (queue.isEmpty()) { if (queue.isEmpty()) {
return null; return null;
@ -121,14 +129,14 @@ public class RtpReorderQueue {
// Check that the queue's time constraint is satisfied // Check that the queue's time constraint is satisfied
if (TimeHelper.getMonotonicMillis() - oldestQueuedTime > maxQueueTime) { if (TimeHelper.getMonotonicMillis() - oldestQueuedTime > maxQueueTime) {
LimeLog.info("Discarding RTP packet queued for too long: "+(TimeHelper.getMonotonicMillis() - oldestQueuedTime)); LimeLog.info("Discarding RTP packet queued for too long: "+(TimeHelper.getMonotonicMillis() - oldestQueuedTime));
queue.remove(oldestQueuedEntry); removeEntry(oldestQueuedEntry);
needsUpdate = true; needsUpdate = true;
} }
// Check that the queue's size constraint is satisfied // Check that the queue's size constraint is satisfied
if (queue.size() == maxSize) { if (queue.size() == maxSize) {
LimeLog.info("Discarding RTP packet after queue overgrowth"); LimeLog.info("Discarding RTP packet after queue overgrowth");
queue.remove(oldestQueuedEntry); removeEntry(oldestQueuedEntry);
needsUpdate = true; needsUpdate = true;
} }
@ -203,6 +211,8 @@ public class RtpReorderQueue {
} }
} }
// This function returns a referenced packet. The caller must dereference
// the packet when it is finished.
public RtpPacketFields getQueuedPacket() { public RtpPacketFields getQueuedPacket() {
RtpQueueEntry queuedEntry = null; RtpQueueEntry queuedEntry = null;

View File

@ -219,6 +219,7 @@ public class AudioStream {
if (queueStatus == RtpReorderQueue.RtpQueueStatus.QUEUED_PACKETS_READY) { if (queueStatus == RtpReorderQueue.RtpQueueStatus.QUEUED_PACKETS_READY) {
while ((queuedPacket = (RtpPacket) rtpQueue.getQueuedPacket()) != null) { while ((queuedPacket = (RtpPacket) rtpQueue.getQueuedPacket()) != null) {
depacketizer.decodeInputData(queuedPacket); depacketizer.decodeInputData(queuedPacket);
queuedPacket.dereferencePacket();
} }
} }
} }

View File

@ -115,7 +115,7 @@ public class VideoPacket implements RtpPacketFields {
return rtpSequenceNumber; return rtpSequenceNumber;
} }
int referencePacket() { public int referencePacket() {
if (useAtomicRefCount) { if (useAtomicRefCount) {
return duAtomicRefCount.incrementAndGet(); return duAtomicRefCount.incrementAndGet();
} }
@ -124,7 +124,7 @@ public class VideoPacket implements RtpPacketFields {
} }
} }
int dereferencePacket() { public int dereferencePacket() {
if (useAtomicRefCount) { if (useAtomicRefCount) {
return duAtomicRefCount.decrementAndGet(); return duAtomicRefCount.decrementAndGet();
} }
@ -133,7 +133,7 @@ public class VideoPacket implements RtpPacketFields {
} }
} }
int getRefCount() { public int getRefCount() {
if (useAtomicRefCount) { if (useAtomicRefCount) {
return duAtomicRefCount.get(); return duAtomicRefCount.get();
} }

View File

@ -222,6 +222,7 @@ public class VideoStream {
// The packet queue now has packets ready // The packet queue now has packets ready
while ((queuedPacket = (VideoPacket) rtpQueue.getQueuedPacket()) != null) { while ((queuedPacket = (VideoPacket) rtpQueue.getQueuedPacket()) != null) {
depacketizer.addInputData(queuedPacket); depacketizer.addInputData(queuedPacket);
queuedPacket.dereferencePacket();
} }
} }