diff --git a/codecs/opus/src/main/java/org/restcomm/media/codec/opus/Decoder.java b/codecs/opus/src/main/java/org/restcomm/media/codec/opus/Decoder.java index 593c6e233..5ffc62aa0 100644 --- a/codecs/opus/src/main/java/org/restcomm/media/codec/opus/Decoder.java +++ b/codecs/opus/src/main/java/org/restcomm/media/codec/opus/Decoder.java @@ -52,7 +52,7 @@ public class Decoder implements Codec { private final int OPUS_SAMPLE_RATE = 8000; private final int MAX_FRAME_SIZE = 160; - private final int SAMPLE_LENGTH = 1000000 / OPUS_SAMPLE_RATE; // 1s / 8Khz ~ 125000ns / sample + private final int SAMPLE_LENGTH = 1000 / OPUS_SAMPLE_RATE; // 1s / 8Khz ~ 125000ns / sample private short[] decodedBuff = new short[MAX_FRAME_SIZE]; diff --git a/component/src/main/java/org/mobicents/media/server/impl/AbstractSource.java b/component/src/main/java/org/mobicents/media/server/impl/AbstractSource.java index e67ddcd6a..f0ff44c0d 100644 --- a/component/src/main/java/org/mobicents/media/server/impl/AbstractSource.java +++ b/component/src/main/java/org/mobicents/media/server/impl/AbstractSource.java @@ -390,14 +390,15 @@ public long perform() { readCount++; frame = evolve(timestamp); if (frame == null) { - if(readCount==1) - { - //stop if frame was not generated - isSynchronized = false; - return 0; - } - else +// if(readCount==1) +// { +// //stop if frame was not generated +// isSynchronized = false; +// return 0; +// } +// else { + timestamp += 20000000; //frame was generated so continue scheduler.submit(this,queueNumber); return 0; diff --git a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/BufferListener.java b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/BufferListener.java deleted file mode 100644 index 8ad29df0c..000000000 --- a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/BufferListener.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2011, Red Hat, Inc. and individual contributors - * by the @authors tag. See the copyright.txt in the distribution for a - * full listing of individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.mobicents.media.server.impl.rtp; - -/** - * - * @author kulikov - */ -public interface BufferListener { - public void onFill(); -} diff --git a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/JitterBuffer.java b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/JitterBuffer.java index 25fd3af6e..0fa2f9a53 100644 --- a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/JitterBuffer.java +++ b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/JitterBuffer.java @@ -25,6 +25,7 @@ import java.io.Serializable; import java.nio.file.Path; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -32,7 +33,6 @@ import org.apache.logging.log4j.Logger; import org.mobicents.media.server.io.sdp.format.RTPFormat; -import org.mobicents.media.server.io.sdp.format.RTPFormats; import org.mobicents.media.server.scheduler.PriorityQueueScheduler; import org.mobicents.media.server.spi.memory.Frame; @@ -60,11 +60,21 @@ public class JitterBuffer implements Serializable { private final ReentrantLock LOCK = new ReentrantLock(); - private final double JC_BETA = .01d; - private final double JC_GAMMA = .01d; + private final int BUFFER_SIZE_MAX = 6; + private final int BUFFER_SIZE_NOR = 3; + private final int BUFFER_SIZE_MIN = 1; + + private final double SPEED_FAST = 0.3; + private final double SPEED_NOR = 0.8; + private final double SPEED_SLOW = 1.5; + private final int NUM_FRAME_TIME_HISTORY = 60; + + private int avgFrameRate; + private int lastFrameRate; + private LinkedList decodedFrameTime = new LinkedList<>(); //The underlying buffer size - private static final int QUEUE_SIZE = 10; + private static final int QUEUE_SIZE = 20; //the underlying buffer private ArrayList queue = new ArrayList(QUEUE_SIZE); @@ -73,43 +83,11 @@ public class JitterBuffer implements Serializable { //first received sequence number private long isn = -1; - //allowed jitter - private long jitterBufferSize; - - //packet arrival dead line measured on RTP clock. - //initial value equals to infinity - private long arrivalDeadLine = 0; - //packet arrival dead line measured on RTP clock. //initial value equals to infinity - private long droppedInRaw = 0; - - //The number of dropped packets - private int dropCount; - - //known duration of media wich contains in this buffer. - private volatile long duration; + private long arrivalDeadLine = -1; - //buffer's monitor - private BufferListener listener; - private volatile boolean ready; - - /** - * used to calculate network jitter. - * currentTransit measures the relative time it takes for an RTP packet - * to arrive from the remote server to MMS - */ - private long currentTransit = 0; - - /** - * continuously updated value of network jitter - */ - private long currentJitter = 0; - - //transmission formats - private RTPFormats rtpFormats = new RTPFormats(); - //currently used format private RTPFormat format; @@ -117,11 +95,6 @@ public class JitterBuffer implements Serializable { private final static Logger logger = org.apache.logging.log4j.LogManager.getLogger(JitterBuffer.class); - private long clockOffset = 0; - private int adaptJittCompTimestamp = 0; - private long jittCompTimestamp = 0; - private double jitter = 0d; - private PriorityQueueScheduler scheduler; private static AtomicLong recordingIndex = new AtomicLong(); @@ -131,6 +104,8 @@ public class JitterBuffer implements Serializable { private Path dumpDir; private List dumpConfig; + private long syncSource = -1; + /** * Creates new instance of jitter. * @@ -138,8 +113,10 @@ public class JitterBuffer implements Serializable { */ public JitterBuffer(RtpClock clock, int jitterBufferSize, PriorityQueueScheduler scheduler, Path dumpDir) { this.rtpClock = clock; - this.jitterBufferSize = jitterBufferSize; this.scheduler = scheduler; + this.lastFrameRate = 50; + this.avgFrameRate = 50; + if (dumpDir != null) { this.dumpDir = dumpDir; this.dumpConfig = JitterBufferRTPDump.getDumpConfig(dumpDir); @@ -149,110 +126,11 @@ public JitterBuffer(RtpClock clock, int jitterBufferSize, PriorityQueueScheduler } } - private void initJitter(RtpPacket firstPacket) { - long arrival = rtpClock.getLocalRtpTime(); - long firstPacketTimestamp = firstPacket.getTimestamp(); - currentTransit = arrival - firstPacketTimestamp; - currentJitter = 0; - clockOffset = currentTransit; - } - - /** - * Calculates the current network jitter, which is an estimate of the - * statistical variance of the RTP data packet interarrival time: - * http://tools.ietf.org/html/rfc3550#appendix-A.8 - */ - private void estimateJitter(RtpPacket newPacket) { - long arrival = rtpClock.getLocalRtpTime(); - long newPacketTimestamp = newPacket.getTimestamp(); - long transit = arrival - newPacketTimestamp; - long d = transit - currentTransit; - if (d < 0) { - d = -d; - } - - currentTransit = transit; - currentJitter += d - ((currentJitter + 8) >> 4); - - long diff = newPacketTimestamp - arrival; - double slide = (double)clockOffset*(1-JC_BETA) + (diff*JC_BETA); - double gap = diff - slide; - - gap = gap < 0 ? -gap : 0; - jitter = jitter*(1-JC_GAMMA) + (gap*JC_GAMMA); - - if (newPacket.getSeqNumber()%50 == 0) { - adaptJittCompTimestamp = Math.max((int)jittCompTimestamp, (int)(2*jitter)); - } - - clockOffset = (long)slide; - } - - /** - * - * @return the current value of the network RTP jitter. The value is in normalized form as specified in RFC 3550 - * http://tools.ietf.org/html/rfc3550#appendix-A.8 - */ - public long getEstimatedJitter() { - long jitterEstimate = currentJitter >> 4; - // logger.info(String.format("Jitter estimated at %d. Current transit time is %d.", jitterEstimate, currentTransit)); - return jitterEstimate; - } - - public void setFormats(RTPFormats rtpFormats) { - this.rtpFormats = rtpFormats; - } - - /** - * Gets the interarrival jitter. - * - * @return the current jitter value. - */ - public double getJitter() { - return 0; - } - - /** - * Gets the maximum interarrival jitter. - * - * @return the jitter value. - */ - public double getMaxJitter() { - return 0; - } - - /** - * Get the number of dropped packets. - * - * @return the number of dropped packets. - */ - public int getDropped() { - return dropCount; - } - - public boolean bufferInUse() - { - return this.useBuffer; - } - public void setBufferInUse(boolean useBuffer) { this.useBuffer=useBuffer; } - /** - * Assigns listener for this buffer. - * - * @param listener the listener object. - */ - public void setListener(BufferListener listener) { - this.listener = listener; - } - - private long compensatedTimestamp(long userTimestamp) { - return userTimestamp+clockOffset-adaptJittCompTimestamp; - } - /** * Accepts specified packet * @@ -267,8 +145,6 @@ public void write(RtpPacket packet, RTPFormat format) { return; } - - if (this.format == null || this.format.getID() != format.getID()) { logger.info( "Format has been changed: " + @@ -284,43 +160,17 @@ public void write(RtpPacket packet, RTPFormat format) { // update clock rate rtpClock.setClockRate(this.format.getClockRate()); - jittCompTimestamp = rtpClock.convertToRtpTime(60); } // if this is first packet then synchronize clock if (isn == -1) { rtpClock.synchronize(packet.getTimestamp()); isn = packet.getSeqNumber(); - initJitter(packet); - } else { - estimateJitter(packet); - } - - // drop outstanding packets - // packet is outstanding if its timestamp of arrived packet is less - // then consumer media time - if (packet.getTimestamp() < this.arrivalDeadLine) { - logger.warn( - "drop packet: dead line=" + arrivalDeadLine + - ", packet time=" + packet.getTimestamp() + - ", seq=" + packet.getSeqNumber() + - ", payload length=" + packet.getPayloadLength() + - ", format=" + this.format.toString() + - ", csrc: " + packet.getContributingSource() - ); - dropCount++; - - // checking if not dropping too much - droppedInRaw++; - if (droppedInRaw == QUEUE_SIZE / 2 || queue.size() == 0) { - arrivalDeadLine = 0; - } else { - return; - } + syncSource = packet.getSyncSource(); } Frame f = packet.toFrame(rtpClock, this.format); -f.setDuration(rtpClock.convertToAbsoluteTime(f.getLength())); + f.setDuration(rtpClock.convertToAbsoluteTime(f.getLength())); // dump the packet to capture if enabled so if (this.dumpConfig != null) { @@ -328,8 +178,6 @@ public void write(RtpPacket packet, RTPFormat format) { if (dump != null) dump.dump(packet, queue.size()); } - droppedInRaw = 0; - // find correct position to insert a packet // use timestamp since its always positive int currIndex = queue.size() - 1; @@ -344,63 +192,57 @@ public void write(RtpPacket packet, RTPFormat format) { ", seq=" + packet.getSeqNumber() + ", payload length=" + packet.getPayloadLength() + ", format=" + this.format.toString() + - ", csrc: " + packet.getContributingSource() + ", ssrc: " + packet.getSyncSource() ); return; } - queue.add(currIndex + 1, f); - - // recalculate duration of each frame in queue and overall duration - // since we could insert the frame in the middle of the queue - duration = 0; - if (queue.size() > 1) { - duration = queue.get(queue.size() - 1).getTimestamp() - queue.get(0).getTimestamp(); - } - -// for (int i = 0; i < queue.size() - 1; i++) { -// // duration measured by wall clock -// long d = queue.get(i + 1).getTimestamp() - queue.get(i).getTimestamp(); -// // in case of RFC2833 event timestamp remains same -// queue.get(i).setDuration(d > 0 ? d : 0); -// } - - // if overall duration is negative we have some mess here,try to - // reset - if (duration < 0 && queue.size() > 1) { - logger.warn("Something messy happened. Reseting jitter buffer!"); - reset(); - return; + if (currIndex == -1 && arrivalDeadLine != -1) { + // drop outstanding packets + // packet is outstanding if its timestamp of arrived packet is less + // then consumer media time + long arrivalDiff = packet.getTimestamp() - this.arrivalDeadLine; + int maxDiff = packet.getPayloadLength() * 50; //1 second + if (arrivalDiff < 0) { + if (Math.abs(arrivalDiff) > maxDiff) { + currIndex = queue.size() - 1; + } else { + logger.warn( + "drop packet: dead line=" + arrivalDeadLine + + ", packet time=" + packet.getTimestamp() + + ", seq=" + packet.getSeqNumber() + + ", payload length=" + packet.getPayloadLength() + + ", format=" + this.format.toString() + + ", ssrc: " + packet.getSyncSource() + + ", arrivalDiff: " + arrivalDiff + + ", maxDiff: " + maxDiff + ); + + return; + } + } } - // overflow? - // only now remove packet if overflow , possibly the same packet we just received - if (queue.size() > QUEUE_SIZE) { - logger.warn("Buffer overflow!" + - " queue: " + queue.size() + - ", localPeer: " + (packet.getLocalPeer() != null ? packet.getLocalPeer().toString() : "null") + - ", remotePeer: " + (packet.getRemotePeer() != null ? packet.getRemotePeer().toString() : "null") + - ", seq: " + packet.getSeqNumber() + - ", timestamp: " + packet.getTimestamp() + - ", csrc: " + packet.getContributingSource() + if (syncSource != packet.getSyncSource()) { + logger.warn("New SyncSource: " + packet.getSyncSource() + + ", old SyncSource: " + syncSource + + ", arrivalDeadline: " + arrivalDeadLine + + ", timestamp: " + packet.getTimestamp() ); - dropCount++; - queue.remove(0); + syncSource = packet.getSyncSource(); } - // check if this buffer already full - if (!ready) { - ready = !useBuffer || (queue.size() > 1); - if (ready && listener != null) { - listener.onFill(); - } - } + queue.add(currIndex + 1, f); } finally { LOCK.unlock(); } } + public int getBufferSize() { + return queue.size(); + } + /** * Polls packet from buffer's head. * @@ -410,49 +252,110 @@ public void write(RtpPacket packet, RTPFormat format) { public Frame read(long timestamp) { try { LOCK.lock(); - if (queue.size() == 0) { - this.ready = false; - return null; - } + if (!useBuffer) { + if (queue.isEmpty()) { + arrivalDeadLine = -1; - Frame frame = null; - long rtpTime; + return null; + } else { + Frame frame = queue.remove(0); - long comp = compensatedTimestamp(rtpClock.getLocalRtpTime()); + arrivalDeadLine = rtpClock.convertToRtpTime(frame.getTimestamp() + frame.getDuration()); - while (queue.size() != 0) { - frame = queue.remove(0); - rtpTime = rtpClock.convertToRtpTime(frame.getTimestamp()); + //convert duration to nanoseconds + frame.setDuration(frame.getDuration() * 1000000L); + frame.setTimestamp(frame.getTimestamp() * 1000000L); - if (comp <= rtpTime) { - break; + return frame; } - } - if (this.dumpConfig != null) { - JitterBufferRTPDump dump = rtpDump.get(); - if (dump != null) { - long seq = frame != null ? frame.getSequenceNumber() : -1; - dump.suppliedDump(seq, queue.size()); + } else { + + int size = queue.size(); + + long currentTime = timestamp / 1000000 + 20; + long currentTimeDiff = 20; + + if (!decodedFrameTime.isEmpty()) { + currentTimeDiff = currentTime - decodedFrameTime.peekFirst(); } - } - if (frame == null) { - return null; - } + if (size < BUFFER_SIZE_MIN) { +// System.out.println("SKIP MIN"); + return null; + } - //buffer empty now? - change ready flag. - if (queue.size() == 0) { - this.ready = false; - } + else if (size < BUFFER_SIZE_NOR) { + if (currentTimeDiff < (1000 * SPEED_SLOW / avgFrameRate)) { +// System.out.println("SKIP NOR: " + currentTimeDiff + " " + (1000 * SPEED_SLOW / avgFrameRate)); + return null; + } + + } else if (size < BUFFER_SIZE_MAX) { + if (currentTimeDiff < (1000 * SPEED_NOR / avgFrameRate) && + currentTimeDiff < (1000 * SPEED_NOR / lastFrameRate)) { +// System.out.println("SKIP < MAX: " + currentTimeDiff + " " + (1000 * SPEED_NOR / avgFrameRate) + " " + (1000 * SPEED_NOR / lastFrameRate)); + return null; + } + } else { + if (currentTimeDiff < (1000 * SPEED_FAST / avgFrameRate) && + currentTimeDiff < (1000 * SPEED_FAST / lastFrameRate)) { +// System.out.println("SKIP >= MAX: " + currentTimeDiff + " " + (1000 * SPEED_FAST / avgFrameRate) + " " + (1000 * SPEED_FAST / lastFrameRate)); + return null; + } + + if (avgFrameRate > 49 && (currentTime % 200) == 0) { + queue.remove(0); + } + + } - arrivalDeadLine = rtpClock.convertToRtpTime(frame.getTimestamp() + frame.getDuration()); + Frame frame = queue.remove(0); - //convert duration to nanoseconds - frame.setDuration(frame.getDuration() * 1000000L); - frame.setTimestamp(frame.getTimestamp() * 1000000L); + if (this.dumpConfig != null) { + JitterBufferRTPDump dump = rtpDump.get(); + if (dump != null) { + long seq = frame != null ? frame.getSequenceNumber() : -1; + dump.suppliedDump(seq, queue.size()); + } + } + + //convert duration to nanoseconds + frame.setDuration(frame.getDuration() * 1000000L); + frame.setTimestamp(frame.getTimestamp() * 1000000L); + + lastFrameRate = (int) (1000.0 / currentTimeDiff); + decodedFrameTime.push(currentTime); + long frameRateDiff = (currentTime - decodedFrameTime.peekLast()); + + int dftSize = decodedFrameTime.size()-1 ; + if (dftSize == 0) dftSize = 1; + + if (frameRateDiff == 0) avgFrameRate = 50; + else avgFrameRate = (int) (dftSize * 1000 / frameRateDiff); + + // AVG framerate or last framerate is 0, this would result in a division by zero and the jitter would get stuck + // This case can only happen after long times of inactivity, so we reset the values + if (avgFrameRate == 0 || lastFrameRate == 0) { + avgFrameRate = 50; + lastFrameRate = 50; + decodedFrameTime.clear(); + queue.clear(); + } - return frame; + if (queue.isEmpty()) { + arrivalDeadLine = -1; + } else { + //set arrival deadline for the next frame (in rtp time + arrivalDeadLine = rtpClock.convertToRtpTime(frame.getTimestamp() + frame.getDuration()); + } + + if (decodedFrameTime.size() >= NUM_FRAME_TIME_HISTORY) { + decodedFrameTime.removeLast(); + } + + return frame; + } } finally { LOCK.unlock(); } @@ -485,18 +388,15 @@ private void restartRecording() { public void restart() { reset(); - this.ready=false; - arrivalDeadLine = 0; - dropCount=0; - droppedInRaw=0; + arrivalDeadLine = -1; format=null; isn=-1; + syncSource=-1; + + lastFrameRate = 50; + avgFrameRate = 50; - // - clockOffset = 0; - adaptJittCompTimestamp = 0; - jittCompTimestamp = 0; - jitter = 0d; + decodedFrameTime.clear(); restartRecording(); } diff --git a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RTPInput.java b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RTPInput.java index cc11ab34e..d84edde8a 100644 --- a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RTPInput.java +++ b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RTPInput.java @@ -40,7 +40,7 @@ * * The Media source of RTP data. */ -public class RTPInput extends AbstractSource implements BufferListener { +public class RTPInput extends AbstractSource { private static final long serialVersionUID = -737259897530641186L; @@ -53,10 +53,7 @@ public class RTPInput extends AbstractSource implements BufferListener { //digital signaling processor private Processor dsp; - - protected Integer preEvolveCount=0; - protected Integer evolveCount=0; - + private static final Logger logger = org.apache.logging.log4j.LogManager.getLogger(RTPInput.class); private AudioInput input; @@ -100,10 +97,6 @@ public Processor getDsp() { return this.dsp; } - protected int getPacketsLost() { - return 0; - } - @Override public Frame evolve(long timestamp) { Frame currFrame=rxBuffer.read(timestamp); @@ -127,14 +120,5 @@ public Frame evolve(long timestamp) { return currFrame; } - - /** - * RX buffer's call back method. - * - * This method is called when rxBuffer is full and it is time to start - * transmission to the consumer. - */ - public void onFill() { - this.wakeup(); - } + } \ No newline at end of file diff --git a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpHandler.java b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpHandler.java index f3be05b5d..98106f44c 100644 --- a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpHandler.java +++ b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpHandler.java @@ -77,7 +77,6 @@ public RtpHandler(PriorityQueueScheduler scheduler, RtpClock clock, RtpClock oob this.jitterBuffer = new JitterBuffer(this.rtpClock, this.jitterBufferSize, scheduler, dumpDir); this.rtpInput = new RTPInput(scheduler, jitterBuffer); - this.jitterBuffer.setListener(this.rtpInput); this.dtmfInput = new DtmfInput(scheduler, oobClock); this.rtpFormats = new RTPFormats(); @@ -103,11 +102,7 @@ public RTPInput getRtpInput() { public DtmfInput getDtmfInput() { return dtmfInput; } - - public boolean isLoopable() { - return loopable; - } - + public void setLoopable(boolean loopable) { this.loopable = loopable; } @@ -132,7 +127,6 @@ public void useJitterBuffer(boolean useBuffer) { */ public void setFormatMap(final RTPFormats rtpFormats) { this.rtpFormats = rtpFormats; - this.jitterBuffer.setFormats(rtpFormats); } public RTPFormats getFormatMap() { diff --git a/io/rtp/src/test/java/org/mobicents/media/server/impl/rtp/JitterBufferTest.java b/io/rtp/src/test/java/org/mobicents/media/server/impl/rtp/JitterBufferTest.java index cfe4a49c0..85c25cfc7 100644 --- a/io/rtp/src/test/java/org/mobicents/media/server/impl/rtp/JitterBufferTest.java +++ b/io/rtp/src/test/java/org/mobicents/media/server/impl/rtp/JitterBufferTest.java @@ -23,10 +23,10 @@ package org.mobicents.media.server.impl.rtp; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.LinkedList; import java.util.Random; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import java.util.HashMap; import org.junit.After; import org.junit.AfterClass; @@ -37,6 +37,8 @@ import org.mobicents.media.server.scheduler.PriorityQueueScheduler; import org.mobicents.media.server.spi.memory.Frame; +import static org.junit.Assert.*; + /** * * kulikov @@ -64,7 +66,6 @@ public static void tearDownClass() throws Exception { @Before public void setUp() { - jitterBuffer.setFormats(AVProfile.audio); rtpClock.setClockRate(8000); jitterBuffer.reset(); } @@ -75,208 +76,211 @@ public void tearDown() { @Test - public void testNormalReadWrite() throws Exception { + public void testNoPacketsAfter3Packets() throws Exception { RtpPacket[] stream = createStream(100); Frame[] media = new Frame[stream.length]; for (int i = 0; i < stream.length; i++) { - wallClock.tick(20000000L); - jitterBuffer.write(stream[i],AVProfile.audio.find(8)); + if (i == 3) { + // Wait for 3020ms while reading and not writing any packets. + for (int j = 0; j < 50 * 3; j++) { + wallClock.tick(20000000L); + media[i] = jitterBuffer.read(wallClock.getTime()); + } + wallClock.tick(20000000L); + } else { + wallClock.tick(20000000L); + } + jitterBuffer.write(stream[i], AVProfile.audio.find(8)); media[i] = jitterBuffer.read(wallClock.getTime()); } - this.checkSequence(media); - assertEquals(0, 0); + + for (int i = 0; i < media.length; i++) { + Frame f = media[i]; + if (i == 0 || i == 1 || i == 4 || i == 5) { + assertNull("Frames should be missing", f); + } else { + assertNotNull("Frames should be present", f); + } + } } @Test - public void testInnerSort() throws Exception { - // todo fix - RtpPacket p1 = RtpPacket.outgoing(local,remote,false, 8, 1, 160 * 1, 123, new byte[160], 0, 160); - RtpPacket p2 = RtpPacket.outgoing(local,remote,false, 8, 2, 160 * 2, 123, new byte[160], 0, 160); - RtpPacket p3 = RtpPacket.outgoing(local,remote,false, 8, 3, 160 * 3, 123, new byte[160], 0, 160); - RtpPacket p4 = RtpPacket.outgoing(local,remote,false, 8, 4, 160 * 4, 123, new byte[160], 0, 160); - RtpPacket p5 = RtpPacket.outgoing(local,remote,false, 8, 5, 160 * 5, 123, new byte[160], 0, 160); - - jitterBuffer.write(p1,AVProfile.audio.find(8)); - jitterBuffer.write(p2,AVProfile.audio.find(8)); - jitterBuffer.write(p4,AVProfile.audio.find(8)); - jitterBuffer.write(p3,AVProfile.audio.find(8)); - - Frame buffer = jitterBuffer.read(wallClock.getTime()); - assertEquals(1, buffer.getSequenceNumber()); - - buffer = jitterBuffer.read(wallClock.getTime()); - assertEquals(2, buffer.getSequenceNumber()); + public void failingOutOforder() throws Exception { + RtpPacket[] stream = createStream(100); + HashMap> packets = reorderWithDelay(10, 10, stream); - buffer = jitterBuffer.read(wallClock.getTime()); - assertEquals(3, buffer.getSequenceNumber()); + Frame[] media = new Frame[stream.length]; + int[] bufferSize = new int[stream.length]; + for (int i = 0; i < stream.length; i++) { + if (packets.containsKey(i)) { + for (RtpPacket rtpPacket : packets.get(i)) { + System.out.println("Packet: " + rtpPacket.getSeqNumber()); + jitterBuffer.write(rtpPacket, AVProfile.audio.find(8)); + } + } - buffer = jitterBuffer.read(wallClock.getTime()); - assertEquals(4, buffer.getSequenceNumber()); + wallClock.tick(20000000L); + media[i] = jitterBuffer.read(wallClock.getTime()); + bufferSize[i] = jitterBuffer.getBufferSize(); + } +// this.checkMaxBufferSize(bufferSize, 9); + this.checkSequence(media); + assertEquals(0, 0); } @Test - public void testOutstanding() throws Exception { - RtpPacket p1 = RtpPacket.outgoing(local,remote,false, 8, 1, 160 * 1, 123, new byte[160], 0, 160); - RtpPacket p2 = RtpPacket.outgoing(local,remote,false, 8, 2, 160 * 2, 123, new byte[160], 0, 160); - RtpPacket p3 = RtpPacket.outgoing(local,remote,false, 8, 3, 160 * 3, 123, new byte[160], 0, 160); - RtpPacket p4 = RtpPacket.outgoing(local,remote,false, 8, 4, 160 * 4, 123, new byte[160], 0, 160); - RtpPacket p5 =RtpPacket.outgoing(local,remote,false, 8, 5, 160 * 5, 123, new byte[160], 0, 160); - - jitterBuffer.write(p1,AVProfile.audio.find(8)); - jitterBuffer.write(p3,AVProfile.audio.find(8)); - jitterBuffer.write(p5,AVProfile.audio.find(8)); - - assertEquals(0, jitterBuffer.getDropped()); - - //60ms + 40ms - wallClock.tick(100000000L); - - Frame buffer = jitterBuffer.read(wallClock.getTime()); - assertEquals(1, buffer.getSequenceNumber()); - - buffer = jitterBuffer.read(wallClock.getTime()); - assertEquals(3, buffer.getSequenceNumber()); + public void testBuffering() throws Exception { + RtpPacket[] stream = createStream(1000); - jitterBuffer.write(p2,AVProfile.audio.find(8)); - assertEquals(1, jitterBuffer.getDropped()); - - - -// buffer = jitterBuffer.read(wallClock.getTime()); -// assertEquals(3, buffer.getSequenceNumber()); - -// buffer = jitterBuffer.read(wallClock.getTime()); -// assertEquals(null, buffer); + Frame[] media = new Frame[stream.length]; + int[] bufferSize = new int[stream.length]; + for (int i = 0; i < stream.length; i++) { + wallClock.tick(20000000L); + if (i%5 == 0) { + jitterBuffer.write(stream[i], AVProfile.audio.find(8)); + jitterBuffer.write(stream[i+1], AVProfile.audio.find(8)); + jitterBuffer.write(stream[i+2], AVProfile.audio.find(8)); + jitterBuffer.write(stream[i+3], AVProfile.audio.find(8)); + jitterBuffer.write(stream[i+4], AVProfile.audio.find(8)); + } + media[i] = jitterBuffer.read(wallClock.getTime()); + bufferSize[i] = jitterBuffer.getBufferSize(); + } + this.checkMaxBufferSize(bufferSize, 6); + this.checkSequence(media); + assertEquals(0, 0); } @Test - public void testEmpty() throws Exception { - RtpPacket p1 = RtpPacket.outgoing(local,remote,false, 8, 1, 160 * 1, 123, new byte[160], 0, 160); //new RtpPacket(172, false); - RtpPacket p2 = RtpPacket.outgoing(local,remote,false, 8, 2, 160 * 2, 123, new byte[160], 0, 160); //new RtpPacket(172, false); - RtpPacket p3 = RtpPacket.outgoing(local,remote,false, 8, 3, 160 * 3, 123, new byte[160], 0, 160); //new RtpPacket(172, false); - - jitterBuffer.write(p1,AVProfile.audio.find(8)); - jitterBuffer.write(p2,AVProfile.audio.find(8)); - jitterBuffer.write(p3,AVProfile.audio.find(8)); - - Frame buffer = jitterBuffer.read(wallClock.getTime()); - assertEquals(1, buffer.getSequenceNumber()); - - buffer = jitterBuffer.read(wallClock.getTime()); - assertEquals(2, buffer.getSequenceNumber()); - - buffer = jitterBuffer.read(wallClock.getTime()); - assertEquals(3, buffer.getSequenceNumber()); + public void testNormalReadWrite() throws Exception { + RtpPacket[] stream = createStream(1000); - buffer = jitterBuffer.read(wallClock.getTime()); - assertEquals(null, buffer); + Frame[] media = new Frame[stream.length]; + for (int i = 0; i < stream.length; i++) { + wallClock.tick(20000000L); + jitterBuffer.write(stream[i], AVProfile.audio.find(8)); + media[i] = jitterBuffer.read(wallClock.getTime()); + } + this.checkSequence(media); + assertEquals(0, 0); } @Test - public void testOverflow() { - RtpPacket[] stream = createStream(5); - for (int i = 0; i < stream.length; i++) { - jitterBuffer.write(stream[i],AVProfile.audio.find(8)); + public void testOrdering() throws Exception { + RtpPacket[] stream = createStream(1000); + shuffle(stream); + + for (RtpPacket rtpPacket : stream) { + jitterBuffer.write(rtpPacket, AVProfile.audio.find(8)); } - Frame data = jitterBuffer.read(wallClock.getTime()); - assertEquals(1, data.getSequenceNumber()); - } + Frame[] media = new Frame[stream.length]; + for (int i = 0; i < stream.length; i++) { + wallClock.tick(20000000L); + media[i] = jitterBuffer.read(wallClock.getTime()); + } - @Test - /** - * - * Test that network jitter for RTP packets is estimated correctly - * - * http://tools.ietf.org/html/rfc3550#appendix-A.8 - */ - public void testJitter() { - // the timestamp for each packet increases by 10ms=160 timestamp units for sampling rate 8KHz - RtpPacket p1 = RtpPacket.outgoing(local,remote,false, 8, 1, 160 * 1, 123, new byte[160], 0, 160); - RtpPacket p2 = RtpPacket.outgoing(local,remote,false, 8, 2, 160 * 2, 123, new byte[160], 0, 160); - RtpPacket p3 = RtpPacket.outgoing(local,remote,false, 8, 2, 160 * 3, 123, new byte[160], 0, 160); - RtpPacket p4 = RtpPacket.outgoing(local,remote,false, 8, 3, 160 * 4, 123, new byte[160], 0, 160); - RtpPacket p5 = RtpPacket.outgoing(local,remote,false, 8, 3, 160 * 5, 123, new byte[160], 0, 160); - - - long jitterDeltaLimit = 1; // 1 sampling units delta for timing and rounding errors , i.e. 1/8ms - - //write first packet, expected jitter = 0 - jitterBuffer.write(p1,AVProfile.audio.find(8)); - assertEquals(0, jitterBuffer.getEstimatedJitter(), jitterDeltaLimit); - - // move time forward by 20ms and write the second packet - // the transit time should remain approximately the same - near 0ms. - // expected jitter = 0; - wallClock.tick(20000000L); - jitterBuffer.write(p2,AVProfile.audio.find(8)); - assertEquals(0, jitterBuffer.getEstimatedJitter(), jitterDeltaLimit); - - // move time forward by 30ms and write the next packet - // the transit time should increase by 10ms, - // as suggested by the difference in the third packet timestamp (160*3) and the 20ms delay for the server to receive the second packet - // expected jitter should be close to the 10ms delay in timestamp units/16, i.e. 80/16. - wallClock.tick(30000000L); - jitterBuffer.write(p3,AVProfile.audio.find(8)); - assertEquals(5, jitterBuffer.getEstimatedJitter(), jitterDeltaLimit); - - //move time forward by 20ms and write the next packet - //the transit time does not change from the previous packet. - // The jitter should stay approximately the same. - wallClock.tick(20000000L); - jitterBuffer.write(p4,AVProfile.audio.find(8)); - assertEquals(4, jitterBuffer.getEstimatedJitter(), jitterDeltaLimit); - - //move time forward by 30ms and write the next packet - //packet was delayed 10ms again. - // The estimated jitter should increase significantly, by nearly 5ms (80/16) - wallClock.tick(30000000L); - jitterBuffer.write(p5,AVProfile.audio.find(8)); - assertEquals(9, jitterBuffer.getEstimatedJitter(), jitterDeltaLimit); - + this.checkSequence(media); + assertEquals(0, 0); } private RtpPacket[] createStream(int size) { RtpPacket[] stream = new RtpPacket[size]; - int it = 12345; + int it = 1234500000; + int it2 = 0; + int it3 = 1234560000; + int segment = stream.length/3; + for (int i = 0; i < segment; i++) { + stream[i] = RtpPacket.outgoing(local,remote,false, 8, i + 1, 160 * (i+1) + it, 123, new byte[160], 0, 160); + } + + for (int i = segment; i < 2*segment; i++) { + stream[i] = RtpPacket.outgoing(local,remote,false, 8, i + 1, 160 * (i+1) + it2, 123, new byte[160], 0, 160); + } - for (int i = 0; i < stream.length; i++) { - stream[i] = RtpPacket.outgoing(local,remote,false, 8, i + 1, 160 * (i+1) + it, 123, new byte[160], 0, 160); + for (int i = 2*segment; i < stream.length; i++) { + stream[i] = RtpPacket.outgoing(local,remote,false, 8, i + 1, 160 * (i+1) + it3, 123, new byte[160], 0, 160); } + return stream; } + private void checkMaxBufferSize(int[] buffer, int maxSize) throws Exception { + for (int j : buffer) { + assertTrue("Max buffer size exceeded " + j + " > " + maxSize, j <= maxSize); + } + } + private void checkSequence(Frame[] media) throws Exception { + int loss = 0; boolean res = true; for (int i = 0; i < media.length - 1; i++) { - if (media[i] == null) { - throw new Exception("Null data at position: " + i); + if (media[i] == null) { + loss++; + continue; } - if (media[i + 1] == null) { - throw new Exception("Null data at position: " + (i+1)); + if (media[i + 1] == null) { + continue; } res &= (media[i + 1].getSequenceNumber() - media[i].getSequenceNumber() == 1); } + System.out.println("Loss: " + ((100 * loss) / media.length)); + int lossPercent = (100 * loss) / media.length; + assertTrue("Loss is too high " + lossPercent, lossPercent < 10); assertTrue("Wrong sequence ", res); } private void shuffle(RtpPacket[] stream) { Random rnd = new Random(); - for (int k = 0; k < 5; k++) { + for (int k = 0; k < stream.length; k++) { int i = rnd.nextInt(stream.length - 1); + int j = rnd.nextInt(stream.length - 1); RtpPacket tmp = stream[i]; - stream[i] = stream[i + 1]; - stream[i + 1] = tmp; + stream[i] = stream[j]; + stream[j] = tmp; } } + + private HashMap> reorderWithDelay(int delay, int jitter, RtpPacket[] stream) { + HashMap> result = new HashMap>(); + + Random rnd = new Random(); + + for (int i = 0; i < stream.length; i++) { + int key = i + delay; + if (jitter > 0) { + if (rnd.nextBoolean()) { + key += rnd.nextInt(jitter); + } else { + key -= rnd.nextInt(jitter); + } + + if (key < 0) key = 0; + } + + + LinkedList list; + if (!result.containsKey(key)) { + list = new LinkedList(); + } else { + list = result.get(key); + } + + list.push(stream[i]); + result.put(key, list); + } + + return result; + } + }