April 3, 2010

Java Multi-channel Asynchronous Throttler

Tags: Java, Technical

In another of my recent series of generic utility Java classes, I present a multi-channel asynchronous throttler. My project connected to an external webservice which imposed draconian restrictions if usage went above a certain level, defined as more than X service calls in Y seconds. I was determined to stay under this level. Furthermore, some types of service calls were more expensive than others and had their own limits in addition to the overall limit. That is, for certain types of service call I had to stay under two limits: a call-specific limit and the overall limit that applied to all calls. I refer to this as multi-channel throttling. Also, I wanted the throttler to be asynchronous; that is, I did not want to stop processing while waiting for the webservice to respond to my call. Looking on the web I found a number of throttlers, but none that matched my multi-channel, asynchronous requirements.

The class I coded is below. The interface allows for both synchronous (with the submitSync methods) and asynchronous (with the submit methods, returning a Future object) Runnable code to be executed, and throttled if necessary. Using Runnable allows the throttler to be generic - there is no application specific code in it. The throttles are defined using the Rate inner class. At least one Rate, the overall throttle, must be defined. Extra channel throttles can be passed into the constructor as a Map of Rates keyed by any object. Then calls to submit(Object channelKey, Runnable task) will throttle based on both the overall throttle and the channel throttle with the specified key if it exists.
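To make the two-limit idea concrete before the full listing, here is a minimal standalone sketch. It is not the ChannelThrottler class: the names TwoLimitSketch, Limit, earliest, record and schedule are invented for illustration, and it assumes a half-open window (t - period, t] with all times in plain milliseconds. Each limit reports the earliest time it would accept one more call, and a channel call is scheduled at the later of the overall time and the channel time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; these names are not part of ChannelThrottler.
class TwoLimitSketch {

    /** One "maxCalls per periodMillis" limit that remembers scheduled call times. */
    static class Limit {
        private final int maxCalls;
        private final long periodMillis;
        // Kept sorted ascending; never pruned, to keep the sketch short.
        private final List<Long> times = new ArrayList<Long>();

        Limit(int maxCalls, long periodMillis) {
            this.maxCalls = maxCalls;
            this.periodMillis = periodMillis;
        }

        /** Earliest time >= now at which one more call still respects this limit. */
        long earliest(long now) {
            if (times.size() < maxCalls) {
                return now;
            }
            // The maxCalls-th latest call must be a full period in the past.
            long kthLatest = times.get(times.size() - maxCalls);
            return Math.max(now, kthLatest + periodMillis);
        }

        void record(long time) {
            times.add(time);
            Collections.sort(times);
        }
    }

    /** Schedules one call against the overall limit and, if given, a channel limit. */
    static long schedule(Limit overall, Limit channel, long now) {
        long time = overall.earliest(now);
        if (channel != null) {
            time = Math.max(time, channel.earliest(now)); // both limits must agree
            channel.record(time);
        }
        overall.record(time);
        return time;
    }

    public static void main(String[] args) {
        Limit overall = new Limit(2, 1000);   // 2 calls per second in total
        Limit expensive = new Limit(1, 1000); // 1 expensive call per second
        System.out.println(schedule(overall, expensive, 0)); // 0
        System.out.println(schedule(overall, expensive, 0)); // 1000
        System.out.println(schedule(overall, null, 0));      // 1000
        System.out.println(schedule(overall, null, 0));      // 2000
    }
}
```

Note that the third call is pushed to 1000 even though the overall limit had room for it at time 0. The sketch is deliberately conservative in the same spirit as the class below, which also does not always find the optimal slot.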

The Rate is specified in terms of X calls in Y time period, with the time period defined using a Quantity pattern (an amount plus a TimeUnit), a scandalously underused pattern in business code - use it! The throttle uses a rolling time window rather than quantising time into discrete blocks, so in continuous time the throttle is never exceeded (a mistake I saw in a few implementations). The ordering of service calls may not be preserved when multiple calls are scheduled for the start of the next period - the delayed calls will execute in a random order. Apart from this (and across channels) the class will attempt to maintain the order of the service calls, although it may not always use the optimal ordering of throttled calls. While the time period can be specified down to nanoseconds, this throttle is only millisecond accurate as that is all I needed. I created the TimeProvider interface for the testing reasons detailed below.
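The difference between a rolling window and discrete blocks is easiest to see with a burst that straddles a block boundary. The following is a small sketch under my own assumptions, not code from the throttler: the class and method names are made up, and for brevity both limiters simply reject a call rather than delay it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration; none of these names come from ChannelThrottler.
class WindowComparison {

    /** Limiter that quantises time into discrete blocks of periodMillis. */
    static class FixedWindow {
        private final int maxCalls;
        private final long periodMillis;
        private long currentBlock = -1;
        private int callsInBlock = 0;

        FixedWindow(int maxCalls, long periodMillis) {
            this.maxCalls = maxCalls;
            this.periodMillis = periodMillis;
        }

        boolean tryCall(long now) {
            long block = now / periodMillis;
            if (block != currentBlock) { // entering a new block resets the counter
                currentBlock = block;
                callsInBlock = 0;
            }
            if (callsInBlock < maxCalls) {
                callsInBlock++;
                return true;
            }
            return false;
        }
    }

    /** Limiter that looks back over the last periodMillis from each call. */
    static class RollingWindow {
        private final int maxCalls;
        private final long periodMillis;
        private final Deque<Long> history = new ArrayDeque<Long>();

        RollingWindow(int maxCalls, long periodMillis) {
            this.maxCalls = maxCalls;
            this.periodMillis = periodMillis;
        }

        boolean tryCall(long now) {
            // Forget calls that are a full period or more in the past.
            while (!history.isEmpty() && history.peekFirst() <= now - periodMillis) {
                history.removeFirst();
            }
            if (history.size() < maxCalls) {
                history.addLast(now);
                return true;
            }
            return false;
        }
    }

    /** Number of calls accepted by a 2-per-second block limiter. */
    static int countFixed(long[] times) {
        FixedWindow limiter = new FixedWindow(2, 1000);
        int accepted = 0;
        for (long t : times) {
            if (limiter.tryCall(t)) accepted++;
        }
        return accepted;
    }

    /** Number of calls accepted by a 2-per-second rolling limiter. */
    static int countRolling(long[] times) {
        RollingWindow limiter = new RollingWindow(2, 1000);
        int accepted = 0;
        for (long t : times) {
            if (limiter.tryCall(t)) accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        long[] burst = {900, 950, 1000, 1050}; // four calls within 150 ms
        System.out.println("fixed blocks accept " + countFixed(burst));
        System.out.println("rolling window accepts " + countRolling(burst));
    }
}
```

With a limit of 2 calls per second, the block-based limiter lets all four calls through because the counter resets at t=1000, while the rolling window accepts only the first two.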

//imports skipped for prettier code display
public class ChannelThrottler {

	public static interface TimeProvider {
		public static final TimeProvider SYSTEM_PROVIDER = new TimeProvider() {
			@Override public long getCurrentTimeInMillis() {return System.currentTimeMillis();}
		};
	
		public long getCurrentTimeInMillis();
	}
	
	public static class Rate {
		private final int numberCalls;
		private final int timeLength;
		private final TimeUnit timeUnit;
		private final LinkedList<Long> callHistory = new LinkedList<Long>();
		public Rate(int numberCalls, int timeLength, TimeUnit timeUnit) {
			this.numberCalls = numberCalls;
			this.timeLength = timeLength;
			this.timeUnit = timeUnit;
		}
		private long timeInMillis() {
			return timeUnit.toMillis(timeLength);
		}
		private void addCall(long callTime) {
			callHistory.addLast(callTime);
		}
		private void cleanOld(long now) {
			ListIterator<Long> i = callHistory.listIterator();
			long threshold = now-timeInMillis();
			while (i.hasNext()) {
				if (i.next()<=threshold) {
					i.remove();
				} else {
					break;
				}
			}
		}
		private long callTime(long now) {
			cleanOld(now);
			if (callHistory.size()<numberCalls) {
				return now;
			}
			long lastStart = callHistory.getLast()-timeInMillis();
			long firstPeriodCall=lastStart, call;
			int count = 0;
			Iterator<Long> i = callHistory.descendingIterator();
			while (i.hasNext()) {
				call = i.next();
				if (call<lastStart) {
					break;
				} else {
					count++;
					firstPeriodCall = call;
				}
			}
			if (count<numberCalls) {
				return firstPeriodCall+1;
			} else {
				return firstPeriodCall+timeInMillis()+1;
			}
		}
	}
	
	private final Rate totalRate;
	private final TimeProvider timeProvider;
	private final ScheduledExecutorService scheduler;
	private final Map<Object, Rate> channels = new HashMap<Object, Rate>();
	
	public ChannelThrottler(Rate totalRate) {
		this(totalRate, Executors.newSingleThreadScheduledExecutor(), new HashMap<Object, Rate>(), TimeProvider.SYSTEM_PROVIDER);
	}
	
	public ChannelThrottler(Rate totalRate, Map<Object, Rate> channels) {
		this(totalRate, Executors.newSingleThreadScheduledExecutor(), channels, TimeProvider.SYSTEM_PROVIDER);
	}
	
	public ChannelThrottler(Rate totalRate, ScheduledExecutorService scheduler, Map<Object, Rate> channels, TimeProvider timeProvider) {
		this.totalRate = totalRate;
		this.scheduler = scheduler;
		this.channels.putAll(channels);
		this.timeProvider = timeProvider;
	}	
	
	private synchronized long callTime(Rate channel) {
		long now = timeProvider.getCurrentTimeInMillis();
		long callTime = totalRate.callTime(now);
		if (channel!=null) {
			callTime = Math.max(callTime, channel.callTime(now));
			channel.addCall(callTime);
		}
		totalRate.addCall(callTime);
		return callTime;			
	}
	
	private long getThrottleDelay(Object channelKey) {
		long delay = callTime(channels.get(channelKey))-timeProvider.getCurrentTimeInMillis();
		return delay<0?0:delay;
	}
		
	public void submitSync(Object channelKey, Runnable task) throws InterruptedException {
		Thread.sleep(getThrottleDelay(channelKey)); 
		task.run();
	}
	
	public void submitSync(Runnable task) throws InterruptedException {
		long delay = callTime(null)-timeProvider.getCurrentTimeInMillis();
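		// delay is already the throttle delay; passing it to getThrottleDelay would treat it as a channel key and register a second call
		Thread.sleep(delay<0?0:delay);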
		task.run();
	}
	
	public Future<?> submit(Runnable task) {
		long delay = callTime(null)-timeProvider.getCurrentTimeInMillis();
		return scheduler.schedule(task, delay<0?0:delay, TimeUnit.MILLISECONDS);
	} 
	
	public Future<?> submit(Object channelKey, Runnable task) {
		return scheduler.schedule(task, getThrottleDelay(channelKey), TimeUnit.MILLISECONDS);
	} 
}

As I’m trying to use TDD more, I also have a test class below (using JUnit 4 annotations). Making the throttler testable required a few changes. Firstly, the scheduler used to delay throttled calls can be passed in, so I use the DeterministicScheduler from the JMock project (a discussion of that class is here). Furthermore, as it is not practically possible to mock the Java System class and thus control internal time, I have had to create a TimeProvider interface. I can use this to properly mock time, while the default implementation just redirects to System.currentTimeMillis().
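The TimeProvider idea can be shown in isolation with a small sketch. Everything here is hypothetical (Clock, ManualClock and Stopwatch are names I made up for the illustration); only the pattern of injecting the time source mirrors what the throttler does.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical names throughout; only the idea mirrors TimeProvider.
class ClockSketch {

    interface Clock {
        long millis();
    }

    /** Production clock: delegates to the system time. */
    static final Clock SYSTEM = new Clock() {
        @Override public long millis() { return System.currentTimeMillis(); }
    };

    /** Test clock: time only moves when the test says so. */
    static class ManualClock implements Clock {
        private final AtomicLong now = new AtomicLong(0);
        @Override public long millis() { return now.get(); }
        void advance(long millis) { now.addAndGet(millis); }
    }

    /** Example code under test: reports the time elapsed since construction. */
    static class Stopwatch {
        private final Clock clock;
        private final long start;

        Stopwatch(Clock clock) {
            this.clock = clock;
            this.start = clock.millis();
        }

        long elapsedMillis() {
            return clock.millis() - start;
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        Stopwatch stopwatch = new Stopwatch(clock);
        clock.advance(777); // no sleeping needed
        System.out.println(stopwatch.elapsedMillis());
    }
}
```

Because the test owns the clock, a test can jump forward 777 ms instantly and assert on exact values, which is what the tests below do with their own TimeProvider.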

//imports skipped for prettier code display
@SuppressWarnings("serial")
public class ChannelThrottlerTest {
	
	private static final String CHANNEL1 = "CHANNEL1";
	private static final String CHANNEL2 = "CHANNEL2";
	private DeterministicScheduler scheduler;
	private AtomicLong currentTime = new AtomicLong(0);
	private ChannelThrottler throttler;
	private AtomicInteger count = new AtomicInteger(0);
	private Runnable countIncrementTask = new Runnable() {@Override public void run() {count.incrementAndGet();}};
		
	@Before public void setupThrottler() {
		scheduler = new DeterministicScheduler();
		currentTime.set(0);	
		Map<Object, Rate> channels = new HashMap<Object, Rate>();
		channels.put(CHANNEL1, new Rate(3, 1, TimeUnit.SECONDS));
		channels.put(CHANNEL2, new Rate(1, 1, TimeUnit.SECONDS)); 
		throttler = new ChannelThrottler(new Rate(2, 1, TimeUnit.SECONDS), scheduler, channels, new TimeProvider() {
			@Override public long getCurrentTimeInMillis() {return currentTime.get();}		
		});
		count = new AtomicInteger(0);
	}	
	@Test public void testTotalChannelWithNoDelay() throws Exception {
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
	}
	
	@Test public void testTotalChannelWithDelay() throws Exception {
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1000, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
	}
	
	@Test public void testTotalChannelWithDoubleDelay() throws Exception {
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(500, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(500, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
		scheduler.tick(1000, TimeUnit.MILLISECONDS);
		assertEquals(5, count.get());
	}
	
	@Test public void testTotalChannelWithShortestDelay() throws Exception {
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
	}
	
	@Test public void testChannel() throws Exception {
		throttler.submit(CHANNEL2, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		
		assertEquals(1, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1000, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
	}
	
	@Test public void testChannelAndTotal() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
	}
	
	@Test public void testChannelAffectsTotal() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
	}
	
	private class OrderedTask implements Runnable {
		private final int order;
		public OrderedTask(int order) {this.order=order;}
		@Override public void run() {
			assertEquals(order, count.incrementAndGet()); // expected value first, then actual
		}
	};
	
	@Test public void testChannelCallsAreOrdered() throws Exception {
		throttler.submit(CHANNEL1, new OrderedTask(1));
		throttler.submit(CHANNEL2, new OrderedTask(2));
		throttler.submit(CHANNEL1, new OrderedTask(3));
		throttler.submit(CHANNEL2, new OrderedTask(4));
		throttler.submit(CHANNEL2, new OrderedTask(5));
		throttler.submit(CHANNEL1, new OrderedTask(6));
		throttler.submit(CHANNEL1, new OrderedTask(7));
		scheduler.tick(5000, TimeUnit.MILLISECONDS);
		assertEquals(7, count.get());
	}
	
	@Test public void testMultiChannel() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(123, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(778, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
		scheduler.tick(1001, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(999, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(7, count.get());
	}
	
	@Test public void testMultiChannelWithTotal() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(123, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(778, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
		scheduler.tick(1001, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(999, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(7, count.get());
	}
	
	@Test public void testSync() throws Exception {
		Map<Object, Rate> channels = new HashMap<Object, Rate>();
		channels.put(CHANNEL1, new Rate(3, 250, TimeUnit.MILLISECONDS)); 
		channels.put(CHANNEL2, new Rate(1, 250, TimeUnit.MILLISECONDS)); 
		throttler = new ChannelThrottler(new Rate(2, 250, TimeUnit.MILLISECONDS), channels);
		
		long start = System.nanoTime()/1000000;
		throttler.submitSync(CHANNEL2, countIncrementTask);
		assertEquals(1, count.get());
		long call1 = System.nanoTime()/1000000;
		assertTrue(call1-start<32);
		
		throttler.submitSync(CHANNEL2, countIncrementTask);
		assertEquals(2, count.get());
		long call2 = System.nanoTime()/1000000;
		assertTrue(call2-call1>200 && call2-call1<300);
		
		throttler.submitSync(CHANNEL2, countIncrementTask);
		assertEquals(3, count.get());
		long call3 = System.nanoTime()/1000000;
		assertTrue(call3-call2>200 && call3-call2<300);
	}
}

Comments: I have removed the commenting system (for privacy concerns), below are the comments that were left on this post before doing so…

asa @ 2010-05-21 - Hi Charles, I was researching on implementing a throttler in a similar circumstance as yours, and ended up in your excellent blog. Going through your code, I noticed that although you mentioned that you needed to retain the order of the calls, it does not appear to do so under all circumstances. The following test was written in TestNG (sorry :)), and it fails for me. Apologies for not knowing how to paste this as a code block.

public class ThrottlerTest {
  @Test
  public void scheduledTasksShouldRunInOrder(){
    int numCalls = 50;
	int totalCalls = 1000;
	ChannelThrottler.Rate rate = new ChannelThrottler.Rate(numCalls, 1000, TimeUnit.MILLISECONDS);
	
	ChannelThrottler throttler = new ChannelThrottler(rate);
	
	ConcurrentLinkedQueue base = new ConcurrentLinkedQueue();
	ConcurrentLinkedQueue toCompare = new ConcurrentLinkedQueue();
	
	for(int i = 1; i  collection;

      RunnableImpl(int id, ConcurrentLinkedQueue collection) {
        this.id = id;
        this.collection = collection;
      }

      public void run() {
        System.out.println("ID:" +  id + " Time:" + System.currentTimeMillis() + " Thread:" + Thread.currentThread().getName());
        collection.add(id);
      }
  }
}

Charles @ 2010-05-23 - Hi Asa, You are right, when a number of calls are delayed together to the beginning of the next period then the order of those calls is not preserved. This doesn’t happen in my application and my tests were inadequate to spot it. Thanks for sending your test. Looking at the issue, it seems to run quite deep in the design of the code - it won’t be a quick fix. It seems that Windows machines have a timer resolution of around 10ms (http://www.javatuning.com/t…), so just delaying subsequent tasks won’t work (and wouldn’t be exactly correct anyway). So delayed calls will probably have to be queued. Unfortunately I don’t have time to fix this right now. Instead I have changed the text of the blog to say order won’t always be preserved. Then in a week or so I can hopefully fix it, though I will probably do this as a different class. Many thanks for finding this and pointing it out, Charles.

Charles @ 2010-05-27 - Hi Asa, I have written a new Throttler that does preserve the order of calls. The details are at http://www.cordinc.com/blog/2010/06/ordered-java-multichannel-asyn.html Charles.