June 27, 2010

Ordered Java Multi-channel Asynchronous Throttler

Tags: Java, Technical

Some time ago I wrote a post describing a Java Multi-channel Asynchronous Throttler I had written. At the time, I stated it would preserve the order of calls, but as Asa commented on that blog post, this was not always the case. Here is a new version that does preserve order, and passes Asa’s test. As part of this work I also extracted common code into new classes and created a ChannelThrottler interface. It works by placing incoming tasks on an internal queue. All the code detailed in this post (and the other throttler post) is available here.

I’m defining a multi-channel throttler as a class that takes Runnable tasks (together with an optional channel identifier) and throttles the rate at which they are executed. The throttling rules are defined as allowing X calls in a time period Y. Different channels may have different rates, but there is also an overall rate. For more information see the previous blog post. So the interface for a generic channel throttler becomes:

//imports skipped for prettier code display
public interface ChannelThrottler {
	Future<?> submit(Runnable task);
	Future<?> submit(Object channelKey, Runnable task);
}
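
For illustration, given some ChannelThrottler implementation, callers can either submit against a specific channel or against the overall rate alone (a sketch only: throttler, someTask and the "serverA" channel key are arbitrary placeholders, not part of the code in this post):

//'throttler' is any ChannelThrottler implementation; 'someTask' is any Runnable
Future<?> overallOnly = throttler.submit(someTask); //limited by the overall rate only
Future<?> perChannel = throttler.submit("serverA", someTask); //limited by the "serverA" rate and the overall rate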

From the previous work, I extracted Rate as an independent class (previously it was an inner class). It keeps track of the call history for an individual throttle rate, and from that history it can calculate the delay that needs to be applied to a Runnable task for it to meet the throttling requirements.

//imports skipped for prettier code display
public final class Rate {

	private final int numberCalls;
	private final int timeLength;
	private final TimeUnit timeUnit;
	private final LinkedList<Long> callHistory = new LinkedList<Long>();
	
	public Rate(int numberCalls, int timeLength, TimeUnit timeUnit) {
		this.numberCalls = numberCalls;
		this.timeLength = timeLength;
		this.timeUnit = timeUnit;
	}
	
	private long timeInMillis() {
		return timeUnit.toMillis(timeLength);
	}

	/* package */ void addCall(long callTime) {
		callHistory.addLast(callTime);
	}
	
	private void cleanOld(long now) {
		ListIterator<Long> i = callHistory.listIterator();
		long threshold = now-timeInMillis();
		while (i.hasNext()) {
			if (i.next()<=threshold) {
				i.remove();
			} else {
				break;
			}
		}
	}
	
	/* package */ long callTime(long now) {
		cleanOld(now);
		//fewer calls recorded (past or scheduled) than the limit allows, so the new call can run now
		if (callHistory.size()<numberCalls) {
			return now;
		}
		//otherwise find the earliest call within the most recent rate period
		//(the period ending at the last recorded call)
		long lastStart = callHistory.getLast()-timeInMillis();
		long firstPeriodCall=lastStart, call;
		int count = 0;
		Iterator<Long> i = callHistory.descendingIterator();
		while (i.hasNext()) {
			call = i.next();
			if (call<lastStart) {
				break;
			} else {
				count++;
				firstPeriodCall = call;
			}
		}
		if (count<numberCalls) {
			//that period is not yet full, so schedule just after its earliest call
			return firstPeriodCall+1;
		} else {
			//that period is full, so schedule just after its earliest call drops out of the period
			return firstPeriodCall+timeInMillis()+1;
		}
	}
}
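
To make the scheduling behaviour concrete, here is a small illustrative trace for a rate of 2 calls per second (a sketch only: the timestamps are arbitrary, and callTime/addCall are package-private, so it assumes a test in the same package as Rate):

//illustrative only, not part of the library
@Test public void illustrateRateOfTwoCallsPerSecond() {
	Rate rate = new Rate(2, 1, TimeUnit.SECONDS);

	long first = rate.callTime(0);   //history empty -> 0, run immediately
	rate.addCall(first);
	long second = rate.callTime(10); //one call in the last second -> 10, run immediately
	rate.addCall(second);
	long third = rate.callTime(20);  //two calls already in the last second -> 1001,
	rate.addCall(third);             //1ms after the call at t=0 leaves the window

	assertEquals(0, first);
	assertEquals(10, second);
	assertEquals(1001, third);
}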

I also extracted code common to both implementations (this one and the previous one) into an abstract superclass.

//imports skipped for prettier code display
/* package */ abstract class AbstractChannelThrottler implements ChannelThrottler {

	protected final Rate totalRate;
	protected final TimeProvider timeProvider;
	protected final ScheduledExecutorService scheduler;
	protected final Map<Object, Rate> channels = new HashMap<Object, Rate>();
	
	protected AbstractChannelThrottler(Rate totalRate, ScheduledExecutorService scheduler, Map<Object, Rate> channels, TimeProvider timeProvider) {
		this.totalRate = totalRate;
		this.scheduler = scheduler;
		this.channels.putAll(channels);
		this.timeProvider = timeProvider;
	}
	
	protected synchronized long callTime(Rate channel) {
		long now = timeProvider.getCurrentTimeInMillis();
		long callTime = totalRate.callTime(now);
		if (channel!=null) {
			callTime = Math.max(callTime, channel.callTime(now));
			channel.addCall(callTime);
		}
		totalRate.addCall(callTime);
		return callTime;			
	}
	
	protected long getThrottleDelay(Object channelKey) {
		long delay = callTime(channels.get(channelKey))-timeProvider.getCurrentTimeInMillis();
		return delay<0?0:delay;
	}
}
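
The TimeProvider used above is just a small clock abstraction so that tests can control time. It is not listed in this post; a minimal sketch of what it looks like, inferred from how it is used here, would be (the actual version is in the download):

//a sketch inferred from the usage above; the real interface is in the downloadable source
public interface TimeProvider {

	TimeProvider SYSTEM_PROVIDER = new TimeProvider() {
		@Override public long getCurrentTimeInMillis() {
			return System.currentTimeMillis();
		}
	};

	long getCurrentTimeInMillis();
}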

Now for the order-preserving throttler itself. The problem with the previous throttler was that it worked by calculating the delay a Runnable task needed to meet the various throttle rates and then scheduling its execution just after that delay. This meant that if multiple tasks were scheduled for the same time, the JVM could run them in any order it chose. To get around this, tasks are now stored in a FIFO queue. The delay required to satisfy the rate requirements is still calculated, but now it is used to schedule a call that takes the first task off the queue and executes it. Note that the queue contains FutureTask objects: each submitted Runnable is wrapped in a FutureTask. This keeps the interface consistent and allows the code calling the throttler to see the progress of a task (or cancel it).

public final class QueueChannelThrottler extends AbstractChannelThrottler {
	
	private final Runnable processQueueTask = new Runnable() {
		@Override public void run() {
			FutureTask<?> task = tasks.poll();
			if (task!=null && !task.isCancelled()) {
				task.run();
			}
		}		
	};
	//submitting threads add to this queue while the scheduler thread polls it, so it must be thread-safe
	private final Queue<FutureTask<?>> tasks = new ConcurrentLinkedQueue<FutureTask<?>>();

	public QueueChannelThrottler(Rate totalRate) {
		this(totalRate, Executors.newSingleThreadScheduledExecutor(), new HashMap<Object, Rate>(), TimeProvider.SYSTEM_PROVIDER);
	}
	
	public QueueChannelThrottler(Rate totalRate, Map<Object, Rate> channels) {
		this(totalRate, Executors.newSingleThreadScheduledExecutor(), channels, TimeProvider.SYSTEM_PROVIDER);
	}
	
	public QueueChannelThrottler(Rate totalRate, ScheduledExecutorService scheduler, Map<Object, Rate> channels, TimeProvider timeProvider) {
		super(totalRate, scheduler, channels, timeProvider);
	}
	
	@Override public Future<?> submit(Runnable task) {
		return submit(null, task);
	} 
	
	@Override public Future<?> submit(Object channelKey, Runnable task) {
		long throttledTime = channelKey==null?callTime(null):callTime(channels.get(channelKey));
		FutureTask<Void> runTask = new FutureTask<Void>(task, null);
		tasks.add(runTask);
		long now = timeProvider.getCurrentTimeInMillis();
		scheduler.schedule(processQueueTask, throttledTime<now?0:throttledTime-now, TimeUnit.MILLISECONDS);
		return runTask;
	} 

}
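
To show the class in use, here is a usage sketch (the rates, the "serverA" channel key and the ThrottlerExample class name are arbitrary, chosen only for illustration):

//imports skipped for prettier code display
public class ThrottlerExample { //illustrative only, not part of the library

	public static void main(String[] args) throws Exception {
		//at most 2 calls per second overall, and at most 1 per second on the "serverA" channel
		Map<Object, Rate> channelRates = new HashMap<Object, Rate>();
		channelRates.put("serverA", new Rate(1, 1, TimeUnit.SECONDS));
		ChannelThrottler throttler = new QueueChannelThrottler(
				new Rate(2, 1, TimeUnit.SECONDS), channelRates);

		Future<?> result = throttler.submit("serverA", new Runnable() {
			@Override public void run() {
				System.out.println("throttled call at " + System.currentTimeMillis());
			}
		});

		result.get(); //the returned FutureTask lets the caller wait for, inspect or cancel the task
		//note: the scheduler created inside QueueChannelThrottler is not shut down here,
		//so this example keeps a non-daemon thread alive after main returns
	}
}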

All the code detailed in this post (and the other throttler post) is available here. For reference, here are the tests used. They are very similar to the tests used for the previous throttler. However, I have added Asa’s ordered calls test.

//imports skipped for prettier code display
public class QueueChannelThrottlerTest {

	private static final String CHANNEL1 = "CHANNEL1";
	private static final String CHANNEL2 = "CHANNEL2";
	private DeterministicScheduler scheduler;
	private AtomicLong currentTime = new AtomicLong(0);
	private QueueChannelThrottler throttler;
	private AtomicInteger count = new AtomicInteger(0);
	private Runnable countIncrementTask = new Runnable() {@Override public void run() {count.incrementAndGet();}};
	
	@SuppressWarnings("serial")
	@Before public void setupThrottler() {
		scheduler = new DeterministicScheduler();
		currentTime.set(0);	
		Map<Object, Rate> channels = new HashMap<Object, Rate>() {{
			put(CHANNEL1, new Rate(3, 1, TimeUnit.SECONDS));
			put(CHANNEL2, new Rate(1, 1, TimeUnit.SECONDS));
		}};
		throttler = new QueueChannelThrottler(new Rate(2, 1, TimeUnit.SECONDS), scheduler, channels, new TimeProvider() {
			@Override public long getCurrentTimeInMillis() {return currentTime.get();}		
		});
		count = new AtomicInteger(0);
	}	
	
	@Test public void testTotalChannelWithNoDelay() throws Exception {
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
	}
	
	@Test public void testTotalChannelWithDelay() throws Exception {
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1000, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
	}
	
	@Test public void testTotalChannelWithDoubleDelay() throws Exception {
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(500, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(500, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
		scheduler.tick(1000, TimeUnit.MILLISECONDS);
		assertEquals(5, count.get());
	}
	
	@Test public void testTotalChannelWithShortestDelay() throws Exception {
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
	}
	
	@Test public void testChannel() throws Exception {
		throttler.submit(CHANNEL2, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		
		assertEquals(1, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1000, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
	}
	
	@Test public void testChannelAndTotal() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
	}
	
	@Test public void testChannelAffectsTotal() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
	}
	
	private class OrderedTask implements Runnable {
		private final int order;
		public OrderedTask(int order) {this.order=order;}
		@Override public void run() {
			assertEquals(count.incrementAndGet(), order);
		}
	};
	
	@Test public void testChannelCallsAreOrdered() throws Exception {
		throttler.submit(CHANNEL1, new OrderedTask(1));
		throttler.submit(CHANNEL2, new OrderedTask(2));
		throttler.submit(CHANNEL1, new OrderedTask(3));
		throttler.submit(CHANNEL2, new OrderedTask(4));
		throttler.submit(CHANNEL2, new OrderedTask(5));
		throttler.submit(CHANNEL1, new OrderedTask(6));
		throttler.submit(CHANNEL1, new OrderedTask(7));
		scheduler.tick(5000, TimeUnit.MILLISECONDS);
		assertEquals(7, count.get());
	}
	
	@Test public void testMultiChannel() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(123, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(778, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
		scheduler.tick(1001, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(999, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(7, count.get());
	}
	
	@Test public void testMultiChannelWithTotal() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(123, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(778, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
		scheduler.tick(1001, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(999, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(7, count.get());
	}
	
	@Test
	public void scheduledTasksMonotonicallyIncreasing(){
		int numCalls = 50;
		int totalCalls = 1000;
		int ratePeriod = 100;
		final CountDownLatch latch = new CountDownLatch(totalCalls);
		Rate rate = new Rate(numCalls, ratePeriod, TimeUnit.MILLISECONDS);
		QueueChannelThrottler throttler = new QueueChannelThrottler(rate);
		final ConcurrentLinkedQueue<Long> base = new ConcurrentLinkedQueue<Long>();
		
		for(int i = 0; i < totalCalls; i++) {
			throttler.submit(new Runnable() {
				@Override public void run() {
					base.add(System.currentTimeMillis());
					latch.countDown();
				}			
			});
		}
	
		// wait for the tasks to finish, before exiting
		try {
			latch.await((totalCalls/numCalls)*ratePeriod, TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			fail();
		}

		assertEquals(1000, base.size());
		long last = 0;
		for (Long next: base) {
			assertTrue(next >= last);
			last = next;
		}
	}
	
	@Test
	public void scheduledTasksShouldRunInOrder(){
		int numCalls = 50;
		int totalCalls = 1000;
		int ratePeriod = 100;
		CountDownLatch latch = new CountDownLatch(totalCalls);
		Rate rate = new Rate(numCalls, ratePeriod, TimeUnit.MILLISECONDS);
		QueueChannelThrottler throttler = new QueueChannelThrottler(rate);
		ConcurrentLinkedQueue<Integer> base = new ConcurrentLinkedQueue<Integer>();
		ConcurrentLinkedQueue<Integer> toCompare = new ConcurrentLinkedQueue<Integer>();
		
		for(int i = 0; i < totalCalls; i++) {
			throttler.submit(new RunnableImpl(i, toCompare, latch));
			base.add(i);
		}
	
		// wait for the tasks to finish, before exiting
		try {
			latch.await((totalCalls/numCalls)*ratePeriod, TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			fail();
		}

		assertEquals(base.size(), toCompare.size());
		while (!base.isEmpty()) {
			assertEquals(base.poll(), toCompare.poll());
		}
	}

	public class RunnableImpl implements Runnable {
		public final int id;
		private final ConcurrentLinkedQueue<Integer> collection;
		private final CountDownLatch latch;
		
		public RunnableImpl(int id, ConcurrentLinkedQueue<Integer> collection, CountDownLatch latch) {
			this.id = id;
			this.collection = collection;
			this.latch = latch;
		}
		
		public void run() {
			collection.add(id);
			latch.countDown();
		}
	}
}

Update 12 Feb 2012: I was recently asked to specify the license for the code on this page. Searching the web for a license that accurately captured my feelings, I found the WTFPL. The code on this page and at the download link (http://www.cordinc.com/projects/throttler.zip) is available under the WTFPL. Basically, you can do what you want with the code on this page; I place no restrictions on its use, but neither are there any guarantees. It solved the problem I had, but I can’t vouch for any other usage. Use at your own risk (but if you find a bug I’d be interested to hear about it).

Comments: I have removed the commenting system (for privacy concerns); below are the comments that were left on this post before doing so…

Ankush Bhatia @ 2016-01-18 - Hi, I was going through your code and must say that it is very well written. There is only one thing that I am confused about. Why do you reverse iterate in the callTime method of the Rate object when you determine that you can’t add a task in the current window? Can’t we simply use firstPeriodCall+timeInMillis()+1 and schedule it? Is the iteration in the hope that by the time you reach the first element it would have run and hence become invalid, thus creating space for the current task in the current window? If yes, then this task will run before the already waiting tasks; if not, then am I missing something?

Charles @ 2016-01-19 - Hi Ankush, the reverse iteration in callTime is actually to find the earliest time for which the task can be scheduled that will satisfy the Rate limit, thus increasing throughput. The loop finds the first call in the most recent rate period (thus firstPeriodCall). Without the loop, how would it know this starting point? Instead we could just use the last call (lastStart+timeInMillis()+1, which without the loop is the same as firstPeriodCall+timeInMillis()+1), as that is guaranteed to be within the specified rate, but may not be the earliest the next task could run. Does that help?

Ankush Bhatia @ 2016-01-20 - Hi cordinc, thanks for the reply. But I still believe that iterating is not necessary. I might be wrong, but bear with me. Let’s consider a scenario where we have a max limit of 4 tasks in 10 seconds, and when the request comes for the 5th task the linked list has 5 elements, of which 4 fall in the window (current time - duration). Let’s take this starting point as 0 secs and the elements in the linked list to be scheduled at -2, 3, 4, 5, 6 seconds respectively. Let’s look at the code of callTime:

cleanOld(now); // it will delete the fifth element from the start of the linked list since -2 sec is out of the duration window. Elements in list now will be at 3,4,5,6 seconds
if (callHistory.size()< numberCalls) 
  return now;
  
//can't add in this window needs to schedule later
long lastStart = callHistory.getLast()-timeInMillis();
long firstPeriodCall=lastStart, call;
int count = 0;
Iterator<Long> i = callHistory.descendingIterator();
while (i.hasNext()) {
  call = i.next();
  if (call<lastStart) {
    break;
  } else {
    count++;
    firstPeriodCall = call;
  }
}

We start iterating backwards, comparing against the start time of the new window (last call - duration), in this case 6 - 10 = -4 seconds. There will be no element for which call < lastStart (-4 seconds), so you will never break, and count will still be 4. Since this call is synchronized, no other thread will modify the linked list in any way while we iterate. So you will always end at the first element (3 sec), which we could get by callHistory.getFirst(). Please let me know in case my thought process is wrong. Also, one other unrelated query: we are using Executors.newSingleThreadScheduledExecutor(), and the Javadocs say it is a one-thread pool whose scheduled tasks become enabled at the designated time but do not run if the worker thread is still executing a previous task. Since our thread dequeues a task and completes it before dequeuing another task, is there a possibility that the dequeuing of a task may be delayed because of this? Should we be concerned about this, or for all practical purposes will this not happen?

Charles @ 2016-01-21 - Hi, I get a slightly different answer to your example. In your example, when the 5th task arrives at t=6s, the first element (t=-2s) is not cleaned as it is within 10 seconds of the current time (t=6s), so the task is scheduled for t=8.01s, which is the earliest time that satisfies the rate. Even if it was cleaned, the queue should be 3, 4, 5 as the 5th task at t=6s has not been queued yet. Thus the 5th task would be scheduled for t=6s, as the rate has not yet been exceeded. Do you agree? Here is an example of where the iterator is necessary. With the rate set to 2 tasks in 5 seconds, and tasks arriving at 0s, 1s, 2s, they will be scheduled for 0s, 1s, 5.01s. Now what happens when the next task arrives at t=3s? Nothing is cleaned, as 3s-5s is before the first element. The iterator stops at the task scheduled for 1s as 0s<5.01s-5s, thus the task arriving at t=3s is scheduled for 6.01s, again the earliest allowable time. Here taking the front of the queue and adding the rate period would result in it being scheduled for 5.01s and thus violate the rate. You are correct that the dequeue may be delayed if a task runs too long. It is likely to occur if the task time is very large relative to your rate (eg. if tasks can run for 0.5s and the rate is 2 in 1s). If that is your use case, then you will need to consider this possibility. In the original use case for this code the tasks were very, very small relative to the rate, so it was not an issue in the design. Another possible problem that was never an issue for me is: what if the queue grows very large because the rate of tasks coming in consistently exceeds the allowable rate? You may need to consider this if the code will run for a long time and there is no time to recover. Throttles work well at smoothing out bursts, using the time between bursts to recover and empty (or nearly empty) the queue. This was the original use case (plus for me no tasks would come in overnight, so the queue definitely cleared during that time).
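
(As an aside, the 2-tasks-in-5-seconds example above can be checked directly against Rate. The sketch below is illustrative only: it assumes a test in the same package as Rate, since callTime and addCall are package-private.)

//rate of 2 calls per 5 seconds; tasks arrive at t=0s, 1s, 2s and 3s
@Test public void illustrateWhyTheIteratorIsNeeded() {
	Rate rate = new Rate(2, 5, TimeUnit.SECONDS);

	long c1 = rate.callTime(0);    rate.addCall(c1); //0    -> runs at once
	long c2 = rate.callTime(1000); rate.addCall(c2); //1000 -> runs at once
	long c3 = rate.callTime(2000); rate.addCall(c3); //5001 -> the first period is full
	long c4 = rate.callTime(3000); rate.addCall(c4); //6001 -> earliest time after the call
	                                                 //       at t=1s leaves the period
	assertEquals(5001, c3);
	assertEquals(6001, c4);
	//using the front of the queue plus the period would have given 5001 again,
	//putting three calls inside one 5 second period
}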

Ankush Bhatia @ 2016-01-21 - Hi cordinc, thanks for the reply. I made a typo in my post: the request that comes in is for the 6th element at t=10s, and the existing elements were at -2, 3, 4, 5, 6. But it does not matter, since I get what you are saying. I actually overlooked the possibility that there may be tasks scheduled for a time later than the time of the current request handling. Now I agree with the use of the iterator. To avoid the delay, I could probably let the worker thread dequeue the element from this queue and add it to another queue for processing. Also, in my case I can discard messages when the max rate has been reached, rather than scheduling them for later processing, to avoid the queue growing very large.

Charles @ 2016-01-22 - Hi Ankush, thanks for your kind comments. I’m very happy to hear the code doesn’t need to be changed :) Your ideas for adjusting the throttle sound fine to me, so good luck with your work!

Nilanjan @ 2020-07-16 - Hello, I am trying to use your code. I know this was written a long time ago, but I would be glad if you can help me. I am trying to run the code for 100 tasks in a sliding window of 1 sec, but it is not doing that. It is sometimes more and sometimes less than the rate. If I run the test method scheduledTasksMonotonicallyIncreasing while printing like this, it shows that it is sometimes more than 50 in a 100 ms time window. Is my understanding wrong somewhere?

@Test
public void scheduledTasksMonotonicallyIncreasing(){
  int numCalls = 50;
  int totalCalls = 1000;
  int ratePeriod = 100;
  final CountDownLatch latch = new CountDownLatch(totalCalls);
  Rate rate = new Rate(numCalls, ratePeriod, TimeUnit.MILLISECONDS);
  QueueChannelThrottler throttler = new QueueChannelThrottler(rate);
  final ConcurrentLinkedQueue<Long> base = new ConcurrentLinkedQueue<Long>();

  for(int i = 0; i < totalCalls; i++) {
    throttler.submit(new Runnable() {
      @Override public void run() {
        base.add(System.currentTimeMillis());
        latch.countDown();
      }
    });
  }

  // wait for the tasks to finish, before exiting
  try {
    latch.await((totalCalls/numCalls)*ratePeriod, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    fail();
  }
  assertEquals(1000, base.size());
  long last = 0;
  for (Long next: base) {
    System.out.println("QueueChannelThrottlerTest.scheduledTasksMonotonicallyIncreasing next=" + next);
    assertTrue(next >= last);
    last = next;
  }
}

By the way, I don’t know how to post the code in a proper format. I used code tag but it is coming out like this.

Charles @ 2020-07-18 - Hi Nilanjan, I don’t have a Java env to hand at the moment (it’s been a few years), so this response will be theoretical for now at least. Looking at your code, it seems to be the same as the original test, but with the System.out line added. If this is correct, I have two comments/suggestions: 1) A throttle as I understand it (and needed at the time) should guarantee a maximum rate and not a minimum rate. So if the rate through the throttle is a bit less than the max (50 in 100ms) that is ok. If it is a lot less, that is a problem of efficiency. If it is more than the rate, that is a bug. This code was designed to smooth out bursts of data and to not exceed a downstream hard limit. If the actual rate was a bit less than the provided rate, that was fine. This can occur because of long-running tasks (probably not an issue in this test though). Perhaps your needs are different? 2) If sometimes this test shows the limit being exceeded, then that may be because of the test. This test just checks that all the tasks are executed in order. For that reason the exact time a task executes is of less importance. System.currentTimeMillis is a cheap but inaccurate call; it can be wrong by many ms, but repeated calls should be monotonically increasing (ie, it should never go backwards). This works fine for the purposes of the test, but not if you want to be sure the throttle is working correctly (there are other tests for that). Try using System.nanoTime instead, and see if that helps (see: https://stackoverflow.com/q… ). Does that help? If you are still seeing the rate being exceeded, let me know and I’ll see if I can get the code running again.

Nilanjan @ 2020-07-18 - Hello Cordinc, I am assuming your name or some part of it is Cordinc. Apologies if not. First of all, thanks a lot for replying. Yes, it works alright with System.nanoTime(). You were correct. I reread your posts and yes, my needs are a little bit different. For a start, my calls/requests are API calls and may or may not be expensive/time consuming. For the time being, if multichannel is not required I can do with just the Timer class and some supporting code, it seems. I am still testing. I am placing my requests equally distributed across the rate period and starting them. I don’t care if they carry over, but the rate should be exactly the same in each rate period. Your code was a very good starting point for me. But because of the newSingleThreadScheduledExecutor it seems that a lot of calls are being lost/discarded if they are time consuming. I will definitely try with newScheduledThreadPool(int), but first I need to make my test cases thread safe. I know Timer has a single thread but it does not block. I will definitely test and let you know if that can be achieved through your code.

Charles @ 2020-07-20 - Hi Nilanjan, cordinc is my online nickname, so it is fine to call me that :) Good to hear the code is working as intended. It sounds like your needs do not require a throttle, but rather a continuously repeated scheduled execution hooked up to a queue of tasks. Similar, but different enough to require some coding. The way you are going seems right to me, good luck!