diff --git a/moonlight-common/src/com/limelight/nvstream/av/ConnectionStatusListener.java b/moonlight-common/src/com/limelight/nvstream/av/ConnectionStatusListener.java index fe58cf63..2111f56b 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/ConnectionStatusListener.java +++ b/moonlight-common/src/com/limelight/nvstream/av/ConnectionStatusListener.java @@ -3,7 +3,7 @@ package com.limelight.nvstream.av; public interface ConnectionStatusListener { public void connectionTerminated(); - public void connectionDetectedPacketLoss(); + public void connectionDetectedFrameLoss(int firstLostFrame, int lastLostFrame); - public void connectionSinkTooSlow(); + public void connectionSinkTooSlow(int firstLostFrame, int lastLostFrame); } diff --git a/moonlight-common/src/com/limelight/nvstream/av/DecodeUnit.java b/moonlight-common/src/com/limelight/nvstream/av/DecodeUnit.java index f58510e0..369de8da 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/DecodeUnit.java +++ b/moonlight-common/src/com/limelight/nvstream/av/DecodeUnit.java @@ -11,13 +11,15 @@ public class DecodeUnit { private List bufferList; private int dataLength; private int flags; + private int frameNumber; - public DecodeUnit(int type, List bufferList, int dataLength, int flags) + public DecodeUnit(int type, List bufferList, int dataLength, int flags, int frameNumber) { this.type = type; this.bufferList = bufferList; this.dataLength = dataLength; this.flags = flags; + this.frameNumber = frameNumber; } public int getType() @@ -39,4 +41,9 @@ public class DecodeUnit { { return dataLength; } + + public int getFrameNumber() + { + return frameNumber; + } } diff --git a/moonlight-common/src/com/limelight/nvstream/av/video/VideoDepacketizer.java b/moonlight-common/src/com/limelight/nvstream/av/video/VideoDepacketizer.java index 4215ae6b..7a8a867a 100644 --- a/moonlight-common/src/com/limelight/nvstream/av/video/VideoDepacketizer.java +++ b/moonlight-common/src/com/limelight/nvstream/av/video/VideoDepacketizer.java @@ -10,13 +10,16 @@ import com.limelight.nvstream.av.ConnectionStatusListener; public class VideoDepacketizer { - // Current NAL state - private LinkedList avcNalDataChain = null; - private int avcNalDataLength = 0; + // Current frame state + private LinkedList avcFrameDataChain = null; + private int avcFrameDataLength = 0; private int currentlyDecoding = DecodeUnit.TYPE_UNKNOWN; // Sequencing state - private short lastSequenceNumber; + private int nextFrameNumber = 1; + private int nextPacketNumber; + private int startFrameNumber = 1; + private boolean waitingForFrameStart; // Cached objects private ByteBufferDescriptor cachedDesc = new ByteBufferDescriptor(null, 0, 0); @@ -33,31 +36,31 @@ public class VideoDepacketizer { this.controlListener = controlListener; } - private void clearAvcNalState() + private void clearAvcFrameState() { - avcNalDataChain = null; - avcNalDataLength = 0; + avcFrameDataChain = null; + avcFrameDataLength = 0; } - private void reassembleAvcNal() + private void reassembleAvcFrame(int frameNumber) { - // This is the start of a new NAL - if (avcNalDataChain != null && avcNalDataLength != 0) { + // This is the start of a new frame + if (avcFrameDataChain != null && avcFrameDataLength != 0) { // Construct the H264 decode unit - DecodeUnit du = new DecodeUnit(DecodeUnit.TYPE_H264, avcNalDataChain, avcNalDataLength, 0); + DecodeUnit du = new DecodeUnit(DecodeUnit.TYPE_H264, avcFrameDataChain, avcFrameDataLength, 0, frameNumber); if (directSubmitDr != null) { // Submit directly to the decoder directSubmitDr.submitDecodeUnit(du); } else if (!decodedUnits.offer(du)) { - // We need a new IDR frame since we're discarding data now System.out.println("Video decoder is too slow! Forced to drop decode units"); + // Invalidate all frames from the start of the DU queue to this frame number + controlListener.connectionSinkTooSlow(decodedUnits.remove().getFrameNumber(), frameNumber); decodedUnits.clear(); - controlListener.connectionSinkTooSlow(); } // Clear old state - clearAvcNalState(); + clearAvcFrameState(); } } @@ -80,11 +83,11 @@ public class VideoDepacketizer { if (NAL.isAvcFrameStart(cachedDesc)) { // Reassemble any pending AVC NAL - reassembleAvcNal(); + reassembleAvcFrame(packet.getFrameIndex()); // Setup state for the new NAL - avcNalDataChain = new LinkedList(); - avcNalDataLength = 0; + avcFrameDataChain = new LinkedList(); + avcFrameDataLength = 0; } // Skip the start sequence @@ -97,7 +100,7 @@ public class VideoDepacketizer { if (currentlyDecoding == DecodeUnit.TYPE_H264 && NAL.isPadding(cachedDesc)) { // The decode unit is complete - reassembleAvcNal(); + reassembleAvcFrame(packet.getFrameIndex()); } // Not decoding AVC @@ -133,13 +136,13 @@ public class VideoDepacketizer { location.length--; } - if (currentlyDecoding == DecodeUnit.TYPE_H264 && avcNalDataChain != null) + if (currentlyDecoding == DecodeUnit.TYPE_H264 && avcFrameDataChain != null) { ByteBufferDescriptor data = new ByteBufferDescriptor(location.data, start, location.offset-start); // Add a buffer descriptor describing the NAL data in this packet - avcNalDataChain.add(data); - avcNalDataLength += location.offset-start; + avcFrameDataChain.add(data); + avcFrameDataLength += location.offset-start; } } } @@ -148,15 +151,13 @@ public class VideoDepacketizer { { if (firstPacket) { // Setup state for the new frame - avcNalDataChain = new LinkedList(); - avcNalDataLength = 0; + avcFrameDataChain = new LinkedList(); + avcFrameDataLength = 0; } // Add the payload data to the chain - if (avcNalDataChain != null) { - avcNalDataChain.add(location); - avcNalDataLength += location.length; - } + avcFrameDataChain.add(location); + avcFrameDataLength += location.length; } public void addInputData(VideoPacket packet) @@ -164,29 +165,87 @@ public class VideoDepacketizer { ByteBufferDescriptor location = packet.getNewPayloadDescriptor(); // Runt packets get decoded using the slow path + // These packets stand alone so there's no need to verify + // sequencing before submitting if (location.length < 968) { addInputDataSlow(packet, location); return; } + int frameIndex = packet.getFrameIndex(); int packetIndex = packet.getPacketIndex(); int packetsInFrame = packet.getTotalPackets(); + // We can use FEC to correct single packet errors + // on single packet frames because we just get a + // duplicate of the original packet + if (packetsInFrame == 1 && packetIndex == 1 && + nextPacketNumber == 0 && frameIndex == nextFrameNumber) { + System.out.println("Using FEC for error correction"); + nextPacketNumber = 1; + } // Discard FEC data early - if (packetIndex >= packetsInFrame) { + else if (packetIndex >= packetsInFrame) { return; } + // Check that this is the next frame + boolean firstPacket = (packet.getFlags() & VideoPacket.FLAG_SOF) != 0; + if (firstPacket && waitingForFrameStart) { + // This is the next frame after a loss event + controlListener.connectionDetectedFrameLoss(startFrameNumber, frameIndex - 1); + startFrameNumber = nextFrameNumber = frameIndex; + nextPacketNumber = 0; + waitingForFrameStart = false; + clearAvcFrameState(); + } + else if (frameIndex > nextFrameNumber) { + // Nope, but we can still work with it if it's + // the start of the next frame + if (firstPacket) { + System.out.println("Got start of frame "+frameIndex+ + " when expecting packet "+nextPacketNumber+ + " of frame "+nextFrameNumber); + controlListener.connectionDetectedFrameLoss(startFrameNumber, frameIndex - 1); + startFrameNumber = nextFrameNumber = frameIndex; + nextPacketNumber = 0; + clearAvcFrameState(); + } + else { + System.out.println("Got packet "+packetIndex+" of frame "+frameIndex+ + " when expecting packet "+nextPacketNumber+ + " of frame "+nextFrameNumber); + // We dropped the start of this frame too, so pick up on the next frame + waitingForFrameStart = true; + return; + } + } + else if (frameIndex < nextFrameNumber) { + System.out.println("Frame "+frameIndex+" is behind our current frame number "+nextFrameNumber); + // Discard the frame silently if it's behind our current sequence number + return; + } + + // We know it's the right frame, now check the packet number + if (packetIndex != nextPacketNumber) { + System.out.println("Frame "+frameIndex+": expected packet "+nextPacketNumber+" but got "+packetIndex); + // At this point, we're guaranteed that it's not FEC data that we lost + waitingForFrameStart = true; + return; + } + + nextPacketNumber++; + // Remove extra padding location.length = packet.getPayloadLength(); - - boolean firstPacket = (packet.getFlags() & VideoPacket.FLAG_SOF) != 0; + if (firstPacket) { if (NAL.getSpecialSequenceDescriptor(location, cachedDesc) && NAL.isAvcFrameStart(cachedDesc) && cachedDesc.data[cachedDesc.offset+cachedDesc.length] == 0x67) { // SPS and PPS prefix is padded between NALs, so we must decode it with the slow path + clearAvcFrameState(); addInputDataSlow(packet, location); return; } @@ -194,32 +253,21 @@ public class VideoDepacketizer { addInputDataFast(packet, location, firstPacket); + // We can't use the EOF flag here because real frames can be split across + // multiple "frames" when packetized to fit under the bandwidth ceiling + if (packetIndex + 1 >= packetsInFrame) { + nextFrameNumber++; + nextPacketNumber = 0; + } + if ((packet.getFlags() & VideoPacket.FLAG_EOF) != 0) { - reassembleAvcNal(); + reassembleAvcFrame(packet.getFrameIndex()); + startFrameNumber = nextFrameNumber; } } public void addInputData(RtpPacket packet) { - short seq = packet.getSequenceNumber(); - - // Toss out the current NAL if we receive a packet that is - // out of sequence - if (lastSequenceNumber != 0 && - (short)(lastSequenceNumber + 1) != seq) - { - System.out.println("Received OOS video data (expected "+(lastSequenceNumber + 1)+", got "+seq+")"); - - // Reset the depacketizer state - clearAvcNalState(); - - // Request an IDR frame - controlListener.connectionDetectedPacketLoss(); - } - - lastSequenceNumber = seq; - - // Pass the payload to the non-sequencing parser ByteBufferDescriptor rtpPayload = packet.getNewPayloadDescriptor(); addInputData(new VideoPacket(rtpPayload)); } diff --git a/moonlight-common/src/com/limelight/nvstream/control/ControlStream.java b/moonlight-common/src/com/limelight/nvstream/control/ControlStream.java index ea473f36..259ef2a2 100644 --- a/moonlight-common/src/com/limelight/nvstream/control/ControlStream.java +++ b/moonlight-common/src/com/limelight/nvstream/control/ControlStream.java @@ -8,6 +8,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.concurrent.LinkedBlockingQueue; import com.limelight.nvstream.NvConnectionListener; import com.limelight.nvstream.StreamConfiguration; @@ -60,7 +61,7 @@ public class ControlStream implements ConnectionStatusListener { private Thread heartbeatThread; private Thread jitterThread; private Thread resyncThread; - private Object resyncNeeded = new Object(); + private LinkedBlockingQueue invalidReferenceFrameTuples = new LinkedBlockingQueue(); private boolean aborting = false; public ControlStream(InetAddress host, NvConnectionListener listener, StreamConfiguration streamConfig) @@ -140,12 +141,6 @@ public class ControlStream implements ConnectionStatusListener { } } - public void requestResync() throws IOException - { - System.out.println("CTL: Requesting IDR frame"); - sendResync(); - } - public void start() throws IOException { // Use a finite timeout during the handshake process @@ -188,18 +183,36 @@ public class ControlStream implements ConnectionStatusListener { public void run() { while (!isInterrupted()) { + int[] tuple; + + // Wait for a tuple try { - // Wait for notification of a resync needed - synchronized (resyncNeeded) { - resyncNeeded.wait(); - } + tuple = invalidReferenceFrameTuples.take(); } catch (InterruptedException e) { listener.connectionTerminated(e); return; } + // Aggregate all lost frames into one range + int[] lastTuple = null; + for (;;) { + int[] nextTuple = lastTuple = invalidReferenceFrameTuples.poll(); + if (nextTuple == null) { + break; + } + + lastTuple = nextTuple; + } + + // Update the end of the range to the latest tuple + if (lastTuple != null) { + tuple[1] = lastTuple[1]; + } + try { - requestResync(); + System.err.println("Invalidating reference frames from "+tuple[0]+" to "+tuple[1]); + ControlStream.this.sendResync(tuple[0], tuple[1]); + System.err.println("Frames invalidated"); } catch (IOException e) { listener.connectionTerminated(e); return; @@ -243,12 +256,14 @@ public class ControlStream implements ConnectionStatusListener { return sendAndGetReply(new NvCtlPacket(PTYPE_1405, PPAYLEN_1405)); } - private void sendResync() throws IOException + private void sendResync(int firstLostFrame, int lastLostFrame) throws IOException { ByteBuffer conf = ByteBuffer.wrap(new byte[PPAYLEN_RESYNC]).order(ByteOrder.LITTLE_ENDIAN); conf.putLong(0); conf.putLong(0xFFFFF); + //conf.putLong(firstLostFrame); + //conf.putLong(lastLostFrame); sendAndGetReply(new NvCtlPacket(PTYPE_RESYNC, PPAYLEN_RESYNC, conf.array())); } @@ -413,14 +428,11 @@ public class ControlStream implements ConnectionStatusListener { abort(); } - private void resyncConnection() { - synchronized (resyncNeeded) { - // Wake up the resync thread - resyncNeeded.notify(); - } + private void resyncConnection(int firstLostFrame, int lastLostFrame) { + invalidReferenceFrameTuples.add(new int[]{firstLostFrame, lastLostFrame}); } - public void connectionDetectedPacketLoss() { + public void connectionDetectedFrameLoss(int firstLostFrame, int lastLostFrame) { if (System.currentTimeMillis() > LOSS_PERIOD_MS + lossTimestamp) { lossCount++; lossTimestamp = System.currentTimeMillis(); @@ -433,15 +445,15 @@ public class ControlStream implements ConnectionStatusListener { } } - resyncConnection(); + resyncConnection(firstLostFrame, lastLostFrame); } - public void connectionSinkTooSlow() { + public void connectionSinkTooSlow(int firstLostFrame, int lastLostFrame) { if (++slowSinkCount == MAX_SLOW_SINK_COUNT) { listener.displayTransientMessage("Your device is processing the A/V data too slowly. Try lowering stream settings."); slowSinkCount = -MAX_SLOW_SINK_COUNT * MESSAGE_DELAY_FACTOR; } - resyncConnection(); + resyncConnection(firstLostFrame, lastLostFrame); } }