diff --git a/moonlight-common/src/com/limelight/nvstream/av/RtpPacket.java b/moonlight-common/src/com/limelight/nvstream/av/RtpPacket.java index 06757e3c..c14b56ad 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/RtpPacket.java +++ b/moonlight-common/src/com/limelight/nvstream/av/RtpPacket.java @@ -3,7 +3,7 @@ package com.limelight.nvstream.av; import java.nio.ByteBuffer; import java.nio.ByteOrder; -public class RtpPacket { +public class RtpPacket implements RtpPacketFields { private byte packetType; private short seqNum; @@ -53,7 +53,7 @@ public class RtpPacket { return packetType; } - public short getSequenceNumber() + public short getRtpSequenceNumber() { return seqNum; } diff --git a/moonlight-common/src/com/limelight/nvstream/av/RtpPacketFields.java b/moonlight-common/src/com/limelight/nvstream/av/RtpPacketFields.java new file mode 100644 index 00000000..c1b4356a --- /dev/null +++ b/moonlight-common/src/com/limelight/nvstream/av/RtpPacketFields.java @@ -0,0 +1,7 @@ +package com.limelight.nvstream.av; + +public interface RtpPacketFields { + public byte getPacketType(); + + public short getRtpSequenceNumber(); +} diff --git a/moonlight-common/src/com/limelight/nvstream/av/RtpReorderQueue.java b/moonlight-common/src/com/limelight/nvstream/av/RtpReorderQueue.java new file mode 100644 index 00000000..1e9fda5f --- /dev/null +++ b/moonlight-common/src/com/limelight/nvstream/av/RtpReorderQueue.java @@ -0,0 +1,230 @@ +package com.limelight.nvstream.av; + +import java.util.Iterator; +import java.util.LinkedList; + +import com.limelight.LimeLog; + +public class RtpReorderQueue { + private final int maxSize; + private final int maxQueueTime; + private final LinkedList queue; + + private short nextRtpSequenceNumber; + + private long oldestQueuedTime; + private RtpQueueEntry oldestQueuedEntry; + + public enum RtpQueueStatus { + HANDLE_IMMEDIATELY, + QUEUED_NOTHING_READY, + QUEUED_PACKETS_READY, + REJECTED + }; + + public RtpReorderQueue() { + this.maxSize = 16; + this.maxQueueTime = 40; + this.queue = new LinkedList(); + + this.oldestQueuedTime = Long.MAX_VALUE; + this.nextRtpSequenceNumber = Short.MAX_VALUE; + } + + public RtpReorderQueue(int maxSize, int maxQueueTime) { + this.maxSize = maxSize; + this.maxQueueTime = maxQueueTime; + this.queue = new LinkedList(); + + this.oldestQueuedTime = Long.MAX_VALUE; + this.nextRtpSequenceNumber = Short.MAX_VALUE; + } + + private boolean queuePacket(boolean head, RtpPacketFields packet) { + short seq = packet.getRtpSequenceNumber(); + + if (nextRtpSequenceNumber != Short.MAX_VALUE) { + // Don't queue packets we're already ahead of + if (seq < nextRtpSequenceNumber) { + return false; + } + + // Don't queue duplicates either + for (RtpQueueEntry existingEntry : queue) { + if (existingEntry.sequenceNumber == seq) { + return false; + } + } + } + + RtpQueueEntry entry = new RtpQueueEntry(); + entry.packet = packet; + entry.queueTime = System.currentTimeMillis(); + entry.sequenceNumber = seq; + + if (oldestQueuedTime == Long.MAX_VALUE) { + oldestQueuedTime = System.currentTimeMillis(); + } + + if (head) { + queue.addFirst(entry); + } + else { + queue.addLast(entry); + } + + return true; + } + + private void updateOldestQueued() { + oldestQueuedTime = Long.MAX_VALUE; + oldestQueuedEntry = null; + for (RtpQueueEntry entry : queue) { + if (entry.queueTime < oldestQueuedTime) { + oldestQueuedEntry = entry; + oldestQueuedTime = entry.queueTime; + } + } + } + + private RtpQueueEntry getEntryByLowestSeq() { + short nextSeq = Short.MAX_VALUE; + RtpQueueEntry lowestSeqEntry = null; + + for (RtpQueueEntry entry : queue) { + if (entry.sequenceNumber < nextSeq) { + lowestSeqEntry = entry; + nextSeq = entry.sequenceNumber; + } + } + + if (nextSeq != Short.MAX_VALUE) { + nextRtpSequenceNumber = nextSeq; + } + + return lowestSeqEntry; + } + + private RtpQueueEntry validateQueueConstraints() { + if (queue.isEmpty()) { + return null; + } + + boolean needsUpdate = false; + + // Check that the queue's time constraint is satisfied + if (System.currentTimeMillis() - oldestQueuedTime > maxQueueTime) { + LimeLog.info("Discarding RTP packet queued for too long"); + queue.remove(oldestQueuedEntry); + needsUpdate = true; + } + + // Check that the queue's size constraint is satisfied + if (queue.size() == maxSize) { + LimeLog.info("Discarding RTP packet after queue overgrowth"); + queue.remove(oldestQueuedEntry); + needsUpdate = true; + } + + if (needsUpdate) { + // Recalculate the oldest entry if needed + updateOldestQueued(); + + // Return the lowest seq queued + return getEntryByLowestSeq(); + } + else { + return null; + } + } + + public RtpQueueStatus addPacket(RtpPacketFields packet) { + if (nextRtpSequenceNumber != Short.MAX_VALUE && + packet.getRtpSequenceNumber() < nextRtpSequenceNumber) { + // Reject packets behind our current sequence number + return RtpQueueStatus.REJECTED; + } + + if (queue.isEmpty()) { + // Return immediately for an exact match with an empty queue + if (nextRtpSequenceNumber == Short.MAX_VALUE || + packet.getRtpSequenceNumber() == nextRtpSequenceNumber) { + nextRtpSequenceNumber = (short) (packet.getRtpSequenceNumber() + 1); + return RtpQueueStatus.HANDLE_IMMEDIATELY; + } + else { + // Queue is empty currently so we'll put this packet on there + if (queuePacket(false, packet)) { + return RtpQueueStatus.QUEUED_NOTHING_READY; + } + else { + return RtpQueueStatus.REJECTED; + } + } + } + else { + // Validate that the queue remains within our contraints + RtpQueueEntry lowestEntry = validateQueueConstraints(); + + // Queue has data inside, so we need to see where this packet fits + if (packet.getRtpSequenceNumber() == nextRtpSequenceNumber) { + // It fits in a hole where we need a packet, now we have some ready + if (queuePacket(true, packet)) { + return RtpQueueStatus.QUEUED_PACKETS_READY; + } + else { + return RtpQueueStatus.REJECTED; + } + } + else { + if (queuePacket(false, packet)) { + // Constraint validation may have changed the oldest packet to one that + // matches the next sequence number + return (lowestEntry != null) ? RtpQueueStatus.QUEUED_PACKETS_READY : + RtpQueueStatus.QUEUED_NOTHING_READY; + } + else { + return RtpQueueStatus.REJECTED; + } + } + } + } + + public RtpPacketFields getQueuedPacket() { + RtpQueueEntry queuedEntry = null; + + System.out.println("Pulling from reordered queue"); + + // Find the matching entry + Iterator i = queue.iterator(); + while (i.hasNext()) { + RtpQueueEntry entry = i.next(); + if (entry.sequenceNumber == nextRtpSequenceNumber) { + nextRtpSequenceNumber++; + queuedEntry = entry; + i.remove(); + break; + } + } + + // Bail if we found nothing + if (queuedEntry == null) { + // Update the oldest queued packet time + updateOldestQueued(); + + return null; + } + + // We don't update the oldest queued entry here, because we know + // the caller will call again until it receives null + + return queuedEntry.packet; + } + + private class RtpQueueEntry { + public RtpPacketFields packet; + + public short sequenceNumber; + public long queueTime; + } +} diff --git a/moonlight-common/src/com/limelight/nvstream/av/audio/AudioDepacketizer.java b/moonlight-common/src/com/limelight/nvstream/av/audio/AudioDepacketizer.java index a9c9144a..0cb3c5bb 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/audio/AudioDepacketizer.java +++ b/moonlight-common/src/com/limelight/nvstream/av/audio/AudioDepacketizer.java @@ -77,12 +77,7 @@ public class AudioDepacketizer { public void decodeInputData(RtpPacket packet) { - short seq = packet.getSequenceNumber(); - - if (packet.getPacketType() != 97) { - // Only type 97 is audio - return; - } + short seq = packet.getRtpSequenceNumber(); // Toss out the current NAL if we receive a packet that is // out of sequence diff --git a/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java b/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java index f43969a0..e80e8d7b 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java +++ b/moonlight-common/src/com/limelight/nvstream/av/audio/AudioStream.java @@ -11,6 +11,7 @@ import java.util.LinkedList; import com.limelight.nvstream.NvConnectionListener; import com.limelight.nvstream.av.ByteBufferDescriptor; import com.limelight.nvstream.av.RtpPacket; +import com.limelight.nvstream.av.RtpReorderQueue; public class AudioStream { public static final int RTP_PORT = 48000; @@ -154,7 +155,9 @@ public class AudioStream { public void run() { byte[] buffer = new byte[MAX_PACKET_SIZE]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); - RtpPacket rtpPacket = new RtpPacket(buffer); + RtpPacket queuedPacket, rtpPacket = new RtpPacket(buffer); + RtpReorderQueue rtpQueue = new RtpReorderQueue(); + RtpReorderQueue.RtpQueueStatus queueStatus; while (!isInterrupted()) { @@ -163,9 +166,38 @@ public class AudioStream { // DecodeInputData() doesn't hold onto the buffer so we are free to reuse it rtpPacket.initializeWithLength(packet.getLength()); - depacketizer.decodeInputData(rtpPacket); - packet.setLength(MAX_PACKET_SIZE); + // Throw away non-audio packets before queuing + if (rtpPacket.getPacketType() != 97) { + // Only type 97 is audio + packet.setLength(MAX_PACKET_SIZE); + continue; + } + + queueStatus = rtpQueue.addPacket(rtpPacket); + if (queueStatus == RtpReorderQueue.RtpQueueStatus.HANDLE_IMMEDIATELY) { + // Send directly to the depacketizer + depacketizer.decodeInputData(rtpPacket); + packet.setLength(MAX_PACKET_SIZE); + } + else { + if (queueStatus != RtpReorderQueue.RtpQueueStatus.REJECTED) { + // The queue consumed our packet, so we must allocate a new one + buffer = new byte[MAX_PACKET_SIZE]; + packet = new DatagramPacket(buffer, buffer.length); + rtpPacket = new RtpPacket(buffer); + } + else { + packet.setLength(MAX_PACKET_SIZE); + } + + // If packets are ready, pull them and send them to the depacketizer + if (queueStatus == RtpReorderQueue.RtpQueueStatus.QUEUED_PACKETS_READY) { + while ((queuedPacket = (RtpPacket) rtpQueue.getQueuedPacket()) != null) { + depacketizer.decodeInputData(queuedPacket); + } + } + } } catch (IOException e) { connListener.connectionTerminated(e); return; diff --git a/moonlight-common/src/com/limelight/nvstream/av/video/VideoPacket.java b/moonlight-common/src/com/limelight/nvstream/av/video/VideoPacket.java index c25192a6..62c1a7f7 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/video/VideoPacket.java +++ b/moonlight-common/src/com/limelight/nvstream/av/video/VideoPacket.java @@ -5,8 +5,9 @@ import java.nio.ByteOrder; import com.limelight.nvstream.av.ByteBufferDescriptor; import com.limelight.nvstream.av.RtpPacket; +import com.limelight.nvstream.av.RtpPacketFields; -public class VideoPacket { +public class VideoPacket implements RtpPacketFields { private ByteBufferDescriptor buffer; private ByteBuffer byteBuffer; @@ -16,6 +17,8 @@ public class VideoPacket { private int flags; private int streamPacketIndex; + private short rtpSequenceNumber; + public static final int FLAG_CONTAINS_PIC_DATA = 0x1; public static final int FLAG_EOF = 0x2; public static final int FLAG_SOF = 0x4; @@ -33,6 +36,8 @@ public class VideoPacket { // Back to beginning byteBuffer.rewind(); + // No sequence number field is present in these packets + // Read the video header fields streamPacketIndex = (byteBuffer.getInt() >> 8) & 0xFFFFFF; frameIndex = byteBuffer.getInt(); @@ -47,7 +52,12 @@ public class VideoPacket { public void initializeWithLength(int length) { - // Skip the RTP header + // Read the RTP sequence number field (big endian) + byteBuffer.position(2); + rtpSequenceNumber = byteBuffer.getShort(); + rtpSequenceNumber = (short)(((rtpSequenceNumber << 8) & 0xFF00) | (((rtpSequenceNumber >> 8) & 0x00FF))); + + // Skip the rest of the RTP header byteBuffer.position(RtpPacket.MAX_HEADER_SIZE); // Read the video header fields @@ -86,4 +96,13 @@ public class VideoPacket { { bb.reinitialize(buffer.data, buffer.offset+dataOffset, buffer.length-dataOffset); } + + public byte getPacketType() { + // No consumers use this field so we don't look it up + return -1; + } + + public short getRtpSequenceNumber() { + return rtpSequenceNumber; + } } diff --git a/moonlight-common/src/com/limelight/nvstream/av/video/VideoStream.java b/moonlight-common/src/com/limelight/nvstream/av/video/VideoStream.java index b9567696..60c3abd4 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/video/VideoStream.java +++ b/moonlight-common/src/com/limelight/nvstream/av/video/VideoStream.java @@ -14,6 +14,7 @@ import com.limelight.nvstream.NvConnectionListener; import com.limelight.nvstream.StreamConfiguration; import com.limelight.nvstream.av.ConnectionStatusListener; import com.limelight.nvstream.av.RtpPacket; +import com.limelight.nvstream.av.RtpReorderQueue; public class VideoStream { public static final int RTP_PORT = 47998; @@ -191,7 +192,10 @@ public class VideoStream { @Override public void run() { VideoPacket ring[] = new VideoPacket[VIDEO_RING_SIZE]; + VideoPacket queuedPacket; int ringIndex = 0; + RtpReorderQueue rtpQueue = new RtpReorderQueue(); + RtpReorderQueue.RtpQueueStatus queueStatus; // Preinitialize the ring buffer int requiredBufferSize = streamConfig.getMaxPacketSize() + RtpPacket.MAX_HEADER_SIZE; @@ -211,9 +215,22 @@ public class VideoStream { packet.setData(buffer, 0, buffer.length); rtp.receive(packet); - // Submit video data to the depacketizer + // Initialize the video packet ring[ringIndex].initializeWithLength(packet.getLength()); - depacketizer.addInputData(ring[ringIndex]); + + queueStatus = rtpQueue.addPacket(ring[ringIndex]); + if (queueStatus == RtpReorderQueue.RtpQueueStatus.HANDLE_IMMEDIATELY) { + // Submit immediately because the packet is in order + depacketizer.addInputData(ring[ringIndex]); + } + else if (queueStatus == RtpReorderQueue.RtpQueueStatus.QUEUED_PACKETS_READY) { + // The packet queue now has packets ready + while ((queuedPacket = (VideoPacket) rtpQueue.getQueuedPacket()) != null) { + depacketizer.addInputData(queuedPacket); + } + } + + // The ring is large enough to account for the maximum queued packets ringIndex = (ringIndex + 1) % VIDEO_RING_SIZE; } catch (IOException e) { listener.connectionTerminated(e);