April 3, 2010

Java Multi-channel Asynchronous Throttler

In another of my recent series of generic utility Java classes, I present a multi-channel asynchronous throttler. My project connected to an external webservice which imposed draconian restrictions if usage went above a certain level, defined as more than X service calls in Y seconds. I was determined to stay under this level. Furthermore, some types of service call were more expensive than others and had their own limits in addition to the overall limit. That is, for certain types of service call I had to stay under two limits: a call-specific limit and the overall limit that applied to all calls. I refer to this as multi-channel throttling. I also wanted the throttler to be asynchronous, that is, I did not want to stop processing while waiting for the webservice to respond to my call. Looking on the web I found a number of throttlers, but none that matched my multi-channel, asynchronous requirements.

The class I coded is below. The interface allows for both synchronous (with the submitSync methods) and asynchronous (with the submit methods, returning a Future object) Runnable code to be executed, and throttled if necessary. Using Runnable allows the throttler to be generic – there is no application specific code in it. The throttles are defined using the Rate inner class. At least one Rate, the overall throttle, must be defined. Extra channel throttles can be passed into the constructor as a Map of Rates keyed by any object. Then calls to submit(Object channelKey, Runnable task) will throttle based on both the overall throttle and the channel throttle with the specified key if it exists.
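
The asynchronous path relies on nothing more exotic than a ScheduledExecutorService: a Runnable is scheduled with a delay and the caller gets a Future back straight away. As a minimal sketch of that mechanism alone (DelayedSubmitSketch and runDelayed are made-up names for illustration and are not part of the throttler, and the delay here is supplied by hand rather than computed from a Rate):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of the throttler.
final class DelayedSubmitSketch {

    /** Schedules one counting task after delayMillis and returns how many times it ran. */
    static int runDelayed(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            // Declared as a Runnable so the schedule(Runnable, ...) overload is chosen.
            Runnable task = calls::incrementAndGet;
            Future<?> pending = scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            // The caller could carry on with other work here;
            // get() blocks only at the point where completion actually matters.
            pending.get(5, TimeUnit.SECONDS);
            return calls.get();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("delayed task did not complete", e);
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

In the throttler the delay passed to schedule is whatever the Rate objects say is needed, and zero when the call is not throttled.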

The Rate is specified in terms of X calls in Y time period, with the time period defined using the Quantity pattern (a scandalously underused pattern in business code: use it!). The throttle uses a rolling time window rather than quantising time into discrete blocks, so in continuous time the throttle is never exceeded (a mistake I saw in a few implementations). The ordering of service calls may not be preserved when multiple calls are scheduled for the start of the next period: the delayed calls will execute in a random order. Apart from this (and across channels), the class will attempt to maintain the order of the service calls, although it may not always use the optimal ordering of throttled calls. While the time period can be specified down to nanoseconds, this throttle is only millisecond accurate as that is all I needed. I created the TimeProvider interface for the testing reasons detailed below.
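
To make the rolling window point concrete, here is a small sketch, separate from the class in this post. RollingWindowSketch, tryAcquire and admittedByFixedBlocks are hypothetical names invented for the illustration, and unlike the real throttler this version simply rejects a call instead of working out when it may run. With a limit of 2 calls per 1000 ms, discrete blocks let four calls through at 998, 999, 1000 and 1001 ms, because the counter resets at the block boundary; the rolling window admits only the first two.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration class, not part of the throttler.
final class RollingWindowSketch {
    private final int maxCalls;
    private final long windowMillis;
    // Timestamps of admitted calls, oldest first.
    private final Deque<Long> history = new ArrayDeque<>();

    RollingWindowSketch(int maxCalls, long windowMillis) {
        this.maxCalls = maxCalls;
        this.windowMillis = windowMillis;
    }

    /** Rolling window: admit only if fewer than maxCalls lie in (now - window, now]. */
    boolean tryAcquire(long nowMillis) {
        // Forget calls that have dropped out of the window ending at nowMillis.
        while (!history.isEmpty() && history.peekFirst() <= nowMillis - windowMillis) {
            history.removeFirst();
        }
        if (history.size() >= maxCalls) {
            return false;
        }
        history.addLast(nowMillis);
        return true;
    }

    /**
     * Discrete blocks, for contrast: the counter resets whenever a call falls
     * into a new block. Expects sorted, non-negative times; returns how many
     * of the calls are admitted.
     */
    static int admittedByFixedBlocks(long[] sortedCallTimes, int maxCalls, long windowMillis) {
        int admitted = 0;
        int inBlock = 0;
        long block = -1;
        for (long t : sortedCallTimes) {
            long b = t / windowMillis;
            if (b != block) {
                block = b;
                inBlock = 0;
            }
            if (inBlock < maxCalls) {
                inBlock++;
                admitted++;
            }
        }
        return admitted;
    }
}
```

The class below goes one step further than this sketch: rather than answering yes or no, it computes the earliest time at which the call fits inside the window and schedules it for then.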

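import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
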
public class ChannelThrottler {

	public static interface TimeProvider {
		public static final TimeProvider SYSTEM_PROVIDER = new TimeProvider() {
			@Override public long getCurrentTimeInMillis() {return System.currentTimeMillis();}
		};
	
		public long getCurrentTimeInMillis();
	}
	
	public static class Rate {
		private final int numberCalls;
		private final int timeLength;
		private final TimeUnit timeUnit;
		private final LinkedList<Long> callHistory = new LinkedList<Long>();
		public Rate(int numberCalls, int timeLength, TimeUnit timeUnit) {
			this.numberCalls = numberCalls;
			this.timeLength = timeLength;
			this.timeUnit = timeUnit;
		}
		private long timeInMillis() {
			return timeUnit.toMillis(timeLength);
		}
		private void addCall(long callTime) {
			callHistory.addLast(callTime);
		}
		private void cleanOld(long now) {
			ListIterator<Long> i = callHistory.listIterator();
			long threshold = now-timeInMillis();
			while (i.hasNext()) {
				if (i.next()<=threshold) {
					i.remove();
				} else {
					break;
				}
			}
		}
		private long callTime(long now) {
			cleanOld(now);
			if (callHistory.size()<numberCalls) {
				return now;
			}
			long lastStart = callHistory.getLast()-timeInMillis();
			long firstPeriodCall=lastStart, call;
			int count = 0;
			Iterator<Long> i = callHistory.descendingIterator();
			while (i.hasNext()) {
				call = i.next();
				if (call<lastStart) {
					break;
				} else {
					count++;
					firstPeriodCall = call;
				}
			}
			if (count<numberCalls) {
				return firstPeriodCall+1;
			} else {
				return firstPeriodCall+timeInMillis()+1;
			}
		}
	}
	
	private final Rate totalRate;
	private final TimeProvider timeProvider;
	private final ScheduledExecutorService scheduler;
	private final Map<Object, Rate> channels = new HashMap<Object, Rate>();
	
	public ChannelThrottler(Rate totalRate) {
		this(totalRate, Executors.newSingleThreadScheduledExecutor(), new HashMap<Object, Rate>(), TimeProvider.SYSTEM_PROVIDER);
	}
	
	public ChannelThrottler(Rate totalRate, Map<Object, Rate> channels) {
		this(totalRate, Executors.newSingleThreadScheduledExecutor(), channels, TimeProvider.SYSTEM_PROVIDER);
	}
	
	public ChannelThrottler(Rate totalRate, ScheduledExecutorService scheduler, Map<Object, Rate> channels, TimeProvider timeProvider) {
		this.totalRate = totalRate;
		this.scheduler = scheduler;
		this.channels.putAll(channels);
		this.timeProvider = timeProvider;
	}	
	
	private synchronized long callTime(Rate channel) {
		long now = timeProvider.getCurrentTimeInMillis();
		long callTime = totalRate.callTime(now);
		if (channel!=null) {
			callTime = Math.max(callTime, channel.callTime(now));
			channel.addCall(callTime);
		}
		totalRate.addCall(callTime);
		return callTime;			
	}
	
	private long getThrottleDelay(Object channelKey) {
		long delay = callTime(channels.get(channelKey))-timeProvider.getCurrentTimeInMillis();
		return delay<0?0:delay;
	}
		
	public void submitSync(Object channelKey, Runnable task) throws InterruptedException {
		Thread.sleep(getThrottleDelay(channelKey)); 
		task.run();
	}
	
	public void submitSync(Runnable task) throws InterruptedException {
		long delay = callTime(null)-timeProvider.getCurrentTimeInMillis();
		// delay was computed above against the overall rate only; never sleep for a negative time
		Thread.sleep(delay<0?0:delay);
		task.run();
	}
	
	public Future<?> submit(Runnable task) {
		long delay = callTime(null)-timeProvider.getCurrentTimeInMillis();
		return scheduler.schedule(task, delay<0?0:delay, TimeUnit.MILLISECONDS);
	} 
	
	public Future<?> submit(Object channelKey, Runnable task) {
		return scheduler.schedule(task, getThrottleDelay(channelKey), TimeUnit.MILLISECONDS);
	} 
}

As I’m trying to use TDD more, I also have a test class below (using JUnit 4). Making the throttler testable required a few changes. Firstly, the scheduler used to delay throttled calls can be passed in, so I use the DeterministicScheduler from the JMock project (a discussion of that class is here). Furthermore, as it is not practically possible to mock the Java System class and thus control internal time, I have had to create a TimeProvider interface. I can use this to properly mock time, and I provide a default implementation that just redirects to System.currentTimeMillis().

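//Rate and TimeProvider are nested in ChannelThrottler; import them from wherever that class lives
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.jmock.lib.concurrent.DeterministicScheduler;
import org.junit.Before;
import org.junit.Test;
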
@SuppressWarnings("serial")
public class ChannelThrottlerTest {
	
	private static final String CHANNEL1 = "CHANNEL1";
	private static final String CHANNEL2 = "CHANNEL2";
	private DeterministicScheduler scheduler;
	private AtomicLong currentTime = new AtomicLong(0);
	private ChannelThrottler throttler;
	private AtomicInteger count = new AtomicInteger(0);
	private Runnable countIncrementTask = new Runnable() {@Override public void run() {count.incrementAndGet();}};
		
	@Before public void setupThrottler() {
		scheduler = new DeterministicScheduler();
		currentTime.set(0);	
		Map<Object, Rate> channels = new HashMap<Object, Rate>();
		channels.put(CHANNEL1, new Rate(3, 1, TimeUnit.SECONDS));
		channels.put(CHANNEL2, new Rate(1, 1, TimeUnit.SECONDS)); 
		throttler = new ChannelThrottler(new Rate(2, 1, TimeUnit.SECONDS), scheduler, channels, new TimeProvider() {
			@Override public long getCurrentTimeInMillis() {return currentTime.get();}		
		});
		count = new AtomicInteger(0);
	}	
	@Test public void testTotalChannelWithNoDelay() throws Exception {
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
	}
	
	@Test public void testTotalChannelWithDelay() throws Exception {
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1000, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
	}
	
	@Test public void testTotalChannelWithDoubleDelay() throws Exception {
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(500, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(500, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
		scheduler.tick(1000, TimeUnit.MILLISECONDS);
		assertEquals(5, count.get());
	}
	
	@Test public void testTotalChannelWithShortestDelay() throws Exception {
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(countIncrementTask);
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
	}
	
	@Test public void testChannel() throws Exception {
		throttler.submit(CHANNEL2, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		
		assertEquals(1, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1000, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
	}
	
	@Test public void testChannelAndTotal() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
	}
	
	@Test public void testChannelAffectsTotal() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(124, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
	}
	
	private class OrderedTask implements Runnable {
		private final int order;
		public OrderedTask(int order) {this.order=order;}
		@Override public void run() {
			assertEquals(count.incrementAndGet(), order);
		}
	};
	
	@Test public void testChannelCallsAreOrdered() throws Exception {
		throttler.submit(CHANNEL1, new OrderedTask(1));
		throttler.submit(CHANNEL2, new OrderedTask(2));
		throttler.submit(CHANNEL1, new OrderedTask(3));
		throttler.submit(CHANNEL2, new OrderedTask(4));
		throttler.submit(CHANNEL2, new OrderedTask(5));
		throttler.submit(CHANNEL1, new OrderedTask(6));
		throttler.submit(CHANNEL1, new OrderedTask(7));
		scheduler.tick(5000, TimeUnit.MILLISECONDS);
		assertEquals(7, count.get());
	}
	
	@Test public void testMultiChannel() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(123, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(778, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
		scheduler.tick(1001, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(999, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(7, count.get());
	}
	
	@Test public void testMultiChannelWithTotal() throws Exception {
		throttler.submit(CHANNEL1, countIncrementTask);
		currentTime = new AtomicLong(777);	
		scheduler.tick(777, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(countIncrementTask);
		currentTime = new AtomicLong(877);
		scheduler.tick(100, TimeUnit.MILLISECONDS);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(CHANNEL2, countIncrementTask);
		throttler.submit(countIncrementTask);
		throttler.submit(CHANNEL1, countIncrementTask);
		
		assertEquals(2, count.get());
		scheduler.tick(123, TimeUnit.MILLISECONDS);
		assertEquals(2, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(3, count.get());
		scheduler.tick(778, TimeUnit.MILLISECONDS);
		assertEquals(4, count.get());
		scheduler.tick(1001, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(999, TimeUnit.MILLISECONDS);
		assertEquals(6, count.get());
		scheduler.tick(1, TimeUnit.MILLISECONDS);
		assertEquals(7, count.get());
	}
	
	@Test public void testSync() throws Exception {
		Map<Object, Rate> channels = new HashMap<Object, Rate>();
		channels.put(CHANNEL1, new Rate(3, 250, TimeUnit.MILLISECONDS)); 
		channels.put(CHANNEL2, new Rate(1, 250, TimeUnit.MILLISECONDS)); 
		throttler = new ChannelThrottler(new Rate(2, 250, TimeUnit.MILLISECONDS), channels);
		
		long start = System.nanoTime()/1000000;
		throttler.submitSync(CHANNEL2, countIncrementTask);
		assertEquals(1, count.get());
		long call1 = System.nanoTime()/1000000;
		assertTrue(call1-start<32);
		
		throttler.submitSync(CHANNEL2, countIncrementTask);
		assertEquals(2, count.get());
		long call2 = System.nanoTime()/1000000;
		assertTrue(call2-call1>200 && call2-call1<300);
		
		throttler.submitSync(CHANNEL2, countIncrementTask);
		assertEquals(3, count.get());
		long call3 = System.nanoTime()/1000000;
		assertTrue(call3-call2>200 && call3-call2<300);
	}
}
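
The tests above control time by swapping the AtomicLong that the anonymous TimeProvider reads from. One possible tidy-up, offered only as a sketch, is a small named clock that can be advanced explicitly. ManualClock and advance are hypothetical names that do not appear in the test class, and the TimeProvider interface is repeated here as a local stand-in purely so that the snippet compiles on its own; in real use it would be ChannelThrottler.TimeProvider.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in mirroring ChannelThrottler.TimeProvider so the sketch is self-contained.
interface TimeProvider {
    long getCurrentTimeInMillis();
}

// Hypothetical test helper, not part of the original test class.
final class ManualClock implements TimeProvider {
    private final AtomicLong nowMillis = new AtomicLong(0);

    @Override public long getCurrentTimeInMillis() {
        return nowMillis.get();
    }

    /** Moves the fake time forward and returns the new reading in milliseconds. */
    long advance(long amount, TimeUnit unit) {
        return nowMillis.addAndGet(unit.toMillis(amount));
    }
}
```

A test would then call something like clock.advance(777, TimeUnit.MILLISECONDS) next to the matching scheduler.tick(777, TimeUnit.MILLISECONDS), which keeps the two notions of time visibly in step.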
