Some time ago I wrote a post describing a Java multi-channel throttler
I had written. At the time, I stated it would preserve the order of
calls, but as Asa commented on that blog post, this was not always the
case. Here is a new version that does preserve order, and passes Asa’s
test. As part of this work I also extracted common code into new classes
and created a ChannelThrottler interface. The new throttler works by placing incoming
tasks on an internal queue. All the code detailed in this post (and the
other throttler post) is available at the download link below.
I’m defining a multi-channel throttler as a class that takes
Runnable tasks (together with an optional channel identifier) and
throttles the rate they are executed. The throttling depends on rules
defined as allowing X calls in Y time period. Different channels may
have different rates, but there is also an overall rate. For more
information, see the previous blog post.
So the interface for a generic channel throttler becomes the following:
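The original interface listing did not survive here, so this is a hedged reconstruction of what it looks like based on the description above; the exact method names and signatures in the downloadable code may differ. A task is submitted with an optional channel key, and a Future is returned so the caller can track or cancel it:

```java
import java.util.concurrent.Future;

// A sketch of the throttler interface described above; the method names are
// assumptions, not necessarily those used in the downloadable code.
interface ChannelThrottler {
    // Submit a task on the default channel, throttled by the overall rate.
    Future<?> submit(Runnable task);

    // Submit a task on the given channel, throttled by that channel's rate
    // as well as the overall rate.
    Future<?> submit(Object channelKey, Runnable task);
}
```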
From the previous work, I extracted the Rate as an independent class
(previously it was an inner class). It keeps track of an individual
channel’s throttle rate. Using this information it can calculate the delay
that needs to be applied to a Runnable task for it to meet that rate.
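The Rate listing itself did not survive here, so here is a minimal sketch of the idea. The names callTime and callHistory, and the "one period plus 1 ms after the first call of the last full period" rule, are taken from the comment discussion below; the downloadable code may differ in detail. The class records the scheduled time of each call and, when a window is full, reverse-iterates the history to find the earliest time the next call may run:

```java
import java.util.Iterator;
import java.util.LinkedList;

// A sketch of a single throttle rate: at most numberCalls calls per
// timeInMillis milliseconds.
class Rate {
    private final int numberCalls;
    private final long timeInMillis;
    private final LinkedList<Long> callHistory = new LinkedList<>();

    Rate(int numberCalls, long timeInMillis) {
        this.numberCalls = numberCalls;
        this.timeInMillis = timeInMillis;
    }

    // Record a call arriving at 'now' (ms) and return the delay after which
    // it may run without exceeding the rate.
    synchronized long callTime(long now) {
        // Clean out calls scheduled more than one period before now.
        while (!callHistory.isEmpty() && callHistory.getFirst() < now - timeInMillis) {
            callHistory.removeFirst();
        }
        long start = now;
        if (!callHistory.isEmpty()) {
            // Reverse-iterate to find the first call in the most recent
            // rate period (firstPeriodCall).
            long lastStart = callHistory.getLast() - timeInMillis;
            long firstPeriodCall = callHistory.getLast();
            int count = 0;
            for (Iterator<Long> it = callHistory.descendingIterator(); it.hasNext(); ) {
                long call = it.next();
                if (call < lastStart) {
                    break;
                }
                firstPeriodCall = call;
                count++;
            }
            // If that period is full, the next call may run one period
            // (plus 1 ms) after its first call -- the earliest legal time.
            if (count >= numberCalls) {
                start = firstPeriodCall + timeInMillis + 1;
            }
        }
        callHistory.addLast(start);
        return start - now;
    }
}
```

This reproduces the worked example from the comments: with a rate of 2 calls per 5 seconds and calls arriving at 0s, 1s, 2s and 3s, the third and fourth calls are pushed out to just after 5s and 6s respectively.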
I also extracted some code common to both implementations (this one and
the previous one) into an abstract superclass.
Now for the order-preserving throttler itself. The problem with the
previous throttler was that it calculated the delay a Runnable task
needed to meet the various throttle rates and then scheduled its
execution just after that delay. This meant that if multiple tasks
were scheduled for the same time, the JVM would run them in any order it
chose. To get around this, the tasks are now stored in a
FIFO queue. The appropriate delay
to fulfill the rate requirements is still calculated, but now it is used
to schedule a call that takes the first task off the queue and executes
it. Note that the queue contains FutureTask
objects: the input Runnable tasks are converted to FutureTasks. This
is to maintain the proper interface and allow the process calling the
throttler to see the progress of the task (or cancel it).
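As an illustration of the queue-and-schedule mechanism just described, here is a stripped-down sketch: a single rate, no channels, and a hypothetical DelayPolicy standing in for the Rate calculation. The real class in the download is structured differently; this only shows why polling the head of a FIFO queue preserves order.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stands in for Rate.callTime: how long a call arriving at 'now' must wait.
interface DelayPolicy {
    long delayMillis(long now);
}

// A sketch of the order-preserving mechanism: tasks are wrapped in
// FutureTasks and queued FIFO; each computed delay schedules a job that
// runs whatever task is at the *head* of the queue, so tasks always
// execute in submission order.
class OrderedThrottlerSketch {
    private final Queue<FutureTask<?>> queue = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final DelayPolicy policy;

    OrderedThrottlerSketch(DelayPolicy policy) {
        this.policy = policy;
    }

    // Returns a Future so the caller can track or cancel the task.
    Future<?> submit(Runnable task) {
        FutureTask<?> future = new FutureTask<>(task, null);
        queue.add(future);
        long delay = policy.delayMillis(System.currentTimeMillis());
        // Note: the scheduled job polls the head of the queue, which is not
        // necessarily the task submitted in this call.
        scheduler.schedule(() -> queue.poll().run(), delay, TimeUnit.MILLISECONDS);
        return future;
    }

    void shutdown() {
        scheduler.shutdown();
    }
}
```

A cancelled FutureTask simply does nothing when its run() is eventually invoked, so cancellation wastes a queue slot but is otherwise safe.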
All the code detailed in this post (and the other throttler post) is
available here. For
reference, here are the tests used. They are very similar to the tests
used for the previous throttler. However, I have added Asa’s ordered test.
Update 12 Feb 2012: I was recently asked to specify the license for
the code on this page. Searching the web for a license that accurately
captured my feelings, I found the
WTFPL. The code on this page and
at the download link
is available under the WTFPL. Basically, you can do what you want with
the code on this page; I place no restrictions on its use, but neither
are there any guarantees. It solved the problem I had, but I can’t vouch
for any other usage. Use at your own risk (but if you find a bug I’d be
interested to hear about it).
Comments: I have removed the commenting system (for privacy concerns); below are the comments that were left on this post before doing so…
Ankush Bhatia @ 2016-01-18 - Hi, I was going through your code and must say that it is very well written. There is only one thing that I am confused about. Why do you reverse iterate in the callTime method of the Rate object when you determine that you can’t add a task in the current window? Can’t we simply say firstPeriodCall+timeInMillis()+1 and schedule it? Is the iteration in the hope that by the time you reach the first element it would have run and hence become invalid, thus creating a space for the current task in the current window? If yes, then this task will run before the already-waiting tasks; if not, then am I missing something?
Charles @ 2016-01-19 - Hi Ankush, the reverse iteration in callTime is actually to find the earliest time for which the task can be scheduled that will satisfy the Rate limit, thus increasing throughput. The loop finds the first call in the most recent rate period (thus firstPeriodCall). Without the loop, how would it know this starting point? Instead we could just use the last call (lastStart+timeInMillis()+1, which without the loop is the same as firstPeriodCall+timeInMillis()+1), as that is guaranteed to be within the specified rate, but may not be the earliest the next task could run. Does that help?
Ankush Bhatia @ 2016-01-20 - Hi cordinc, thanks for the reply. But I guess I still believe that iterating is not necessary. I might be wrong, but bear with me. Let’s consider a scenario where we have a max limit of 4 tasks in 10 seconds, and when the request comes for the 5th task the linked list has 5 elements, of which 4 fall in the window (current time - duration). Let’s consider this starting point as 0 secs and the elements in the linked list to be scheduled at 2, 3, 4, 5, 6 seconds respectively. Let’s walk through the code of callTime.
We start iterating backwards, comparing against the start time of the new window (last call - duration), in this case 6 - 10 = -4 seconds. There will be no element for which call < lastStart (-4 seconds), so you will never break and count will still be 4. Since this call is synchronized, no other thread will modify the linked list in any way while we iterate. So you will always end at the first element (3 sec), which we can get by callHistory.getFirst(). Please let me know in case my thought process is wrong. Also, another unrelated query: we are using Executors.newSingleThreadScheduledExecutor(), and the Javadocs say it is a one-thread pool and scheduled tasks will become enabled at the designated time but not run if the worker thread is still executing a previous task. Since our thread dequeues a task and completes it before dequeuing another task, is there a possibility that dequeuing of a task may be delayed due to this? Should we be concerned about this, or for all practical purposes may this not happen?
Charles @ 2016-01-21 - Hi, I get a slightly different answer to your example. In your example, when the 5th task arrives at t=6s, the first element (t=-2s) is not cleaned as it is within 10 seconds of the current time (t=6s), so the task is scheduled for t=8.01s, which is the earliest time that satisfies the rate. Even if it was cleaned, the queue should be 3, 4, 5, as the 5th task at t=6s has not been queued yet. Thus the 5th task would be scheduled for t=6s, as the rate has not yet been exceeded. Do you agree? Here is an example of where the iterator is necessary. With the rate set to 2 tasks in 5 seconds, and tasks arriving at 0s, 1s, 2s, they will be scheduled for 0s, 1s, 5.01s. Now what happens when the next task arrives at t=3s? Nothing is cleaned, as 3s-5s is before the first element. The iterator stops at the task scheduled for 1s as 0s<5.01s-5s; thus the task arriving at t=3s is scheduled for 6.01s, again the earliest allowable time. Here, taking the front of the queue and adding the rate period would result in it being scheduled for 5.01s and thus violate the rate. You are correct that the dequeue may be delayed if the tasks run too long. It is likely to occur if the task time can be very large relative to your rate (e.g. if tasks can run for 0.5s and the rate is 2 in 1s). If that is your use case, then you will need to consider this possibility. In the original use case for this code the tasks were very, very small relative to the rate, so it was not an issue in the design. Another possible problem that was never an issue for me is: what if the queue grows very large because the rate of tasks coming in consistently exceeds the allowable rate? You may need to consider this if the code will run for a long time and there is no time to recover. Throttles work well at smoothing out bursts, using the time between bursts to recover and empty (or nearly empty) the queue.
This was the original use case (plus for me no tasks would come in overnight so the queue definitely cleared during that time).
Ankush Bhatia @ 2016-01-21 - Hi cordinc, thanks for the reply. I made a typo in my post: the request that comes in is for the 6th element at t=10s, and the existing elements were at -2, 3, 4, 5, 6. But it does not matter, since I get what you are saying. I guess I actually overlooked the possibility that there may be tasks scheduled for a time later than the time of the current request handling. Now I agree with the use of the iterator. For avoiding the delay, I could probably let the worker thread dequeue the element from this queue and add it to another queue for processing. Also, in my case I can discard messages when the max rate has been reached, rather than scheduling them for later processing, to avoid the queue growing very large.
Charles @ 2016-01-22 - Hi Ankush, thanks for your kind comments. I’m very happy to hear the code doesn’t need to be changed :) Your ideas for adjusting the throttle sound fine to me, so good luck with your work!
Nilanjan @ 2020-07-16 - Hello, I am trying to use your code. I know this was written a long time ago, but I will be glad if you can help me. I am trying to run the code for 100 tasks in a sliding window of 1 sec, but it is not doing that. It is sometimes more and sometimes less than the rate. If I run the test method scheduledTasksMonotonicallyIncreasing while printing the times, it shows that there are sometimes more than 50 in a 100 ms time window. Is my understanding wrong somewhere?
By the way, I don’t know how to post the code in a proper format. I used code tag but it is coming out like this.
Charles @ 2020-07-18 - Hi Nilanjan, I don’t have a Java env to hand at the moment (it’s been a few years), so this response will be theoretical for now at least. Looking at your code, it seems to be the same as the original test, but with the System.out line added. If this is correct, I have two comments/suggestions: 1) A throttle, as I understand (and needed at the time), should guarantee a maximum rate and not a minimum rate. So if the rate through the throttle is a bit less than the max (50 in 100ms), that is ok. If it is a lot less, that is a problem of efficiency. If it is more than the rate, that is a bug. This code was designed to smooth out bursts of data and to not exceed a downstream hard limit. If the actual rate was a bit less than the provided rate, that was fine. This can occur because of long-running tasks (probably not an issue in this test though). Perhaps your needs are different? 2) If sometimes this test shows the limit being exceeded, then that may be because of the test. This test just checks that all the tasks are executed in order. For that reason the exact time a task executes is of less importance. System.currentTimeMillis is a cheap but inaccurate call; it can be wrong by many ms, but repeated calls should be monotonically increasing (i.e. it should never go backwards). This works fine for the purposes of the test, but not if you want to be sure the throttle is working correctly (there are other tests for that). Try using System.nanoTime instead, and see if that helps (see: https://stackoverflow.com/q… ). Does that help? If you are still seeing the rate being exceeded, let me know and I’ll see if I can get the code running again.
Nilanjan @ 2020-07-18 - Hello Cordinc, I am assuming your name or some part of it is Cordinc; apologies if not. First of all, thanks a lot for replying. Yes, it works alright with System.nanoTime(). You were correct. I reread your posts, and yes, my needs are a little bit different. For a start, my calls/requests are API calls and may or may not be expensive/time-consuming. For the time being, if multichannel is not required, I can do with just the Timer class and some supporting code, it seems. I am still testing. I am placing my requests equally distributed across the rate period and starting them. I don’t care if they carry over; the rate should be exactly the same in each rate period. Your code was a very good starting point for me. But because of the newSingleThreadScheduledExecutor it seems that a lot of calls are being lost/discarded if they are time-consuming. I will definitely try with newScheduledThreadPool(int), but first I need to make my test cases thread-safe. I know Timer has a single thread, but it does not block. I will definitely test and let you know if that can be achieved through your code.
Charles @ 2020-07-20 - Hi Nilanjan, cordinc is my online nickname, so it is fine to call me that :) Good to hear the code is working as intended. It sounds like your needs do not require a throttle, but instead a continuously repeated scheduled execution hooked up to a queue of tasks. Similar, but different enough to require some coding. The way you are going seems right to me, good luck!