task-scheduler
请实现一个定时任务调度器,有很多任务,每个任务都有一个时间戳,任务会在该时间点开始执行。
定时执行任务是一个很常见的需求,例如
方案1: PriorityBlockingQueue + Polling
我们很快可以想到第一个办法:
- 用一个
java.util.concurrent.PriorityBlockingQueue
来作为优先队列。因为我们需要一个优先队列,又需要线程安全,用PriorityBlockingQueue
再合适不过了。你也可以手工实现一个自己的PriorityBlockingQueue
,用java.util.PriorityQueue
+ ReentrantLock
,用一把锁把这个队列保护起来,就是线程安全的啦 - 对于生产者,可以用一个
while(true)
,造一些随机任务塞进去 - 对于消费者,起一个线程,在
while(true)
里每隔几秒检查一下队列,如果有任务,则取出来执行。
这个方案的确可行,总结起来就是轮询
方案2: PriorityBlockingQueue + 时间差
可以把方案while(true)
里的逻辑变成:
- 偷看一下堆顶的元素,但并不取出来,如果该任务过期了,则取出来
- 如果没过期,则计算一下时间差,然后
sleep() 该时间差
不再是
稍等!这个方案其实有个致命的缺陷,导致它比 PiorityBlockingQueue + Polling
更加不可用,这个缺点是什么呢?。。。假设当前堆顶的任务在
方案3: DelayQueue
方案PriorityBlockingQueue
,它把计算时间差并让消费者等待该时间差的功能集成进了队列,消费者不需要关心时间差的事情了,直接在while(true)
里不断take()
就行了。
import java.util.PriorityQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class DelayQueue<E extends Delayed> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();
private Thread leader = null;
public DelayQueue() {}
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean put(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
}
这个代码中有几个要点要注意一下。
if (q.peek() == e) {
leader = null;
available.signal();
}
如果第一个元素等于刚刚插入进去的元素,说明刚才队列是空的。现在队列里有了一个任务,那么就应该唤醒所有在等待的消费者线程,避免了方案leader
重置为
leader
这个成员有啥作用?leader
就是指向
想象一下有个多个消费者线程用
if (leader != null)
available.await();
如果为空说明没有其他消费者去取任务
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
下次循环会走如下分支,取到任务结束,
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
我们可以看到
如果删除这行代码,会发生什么呢?假设现在有
- 线程
A 进来获取first, 然后进入else 的else , 设置了leader 为当前线程A ,并让A 等待一段时间 - 线程
B 进来获取first, 进入else 的阻塞操作, 然后无限期等待,这时线程B 是持有first 引用的 - 线程
A 等待指定时间后被唤醒,获取对象成功,出队,这个对象理应被GC 回收,但是它还被线程B 持有着,GC 链可达,所以不能回收这个first - 只要线程
B 无限期的睡眠,那么这个本该被回收的对象就不能被GC 销毁掉,那么就会造成内存泄露
Task 对象
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Task implements Delayed {
private String name;
private long startTime; // milliseconds
public Task(String name, long delay) {
this.name = name;
this.startTime = System.currentTimeMillis() + delay;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int)(this.startTime - ((Task) o).startTime);
}
@Override
public String toString() {
return "task " + name + " at " + startTime;
}
}
java.util.concurrent.Delayed
,可以用于表示具有过期时间的元素,刚好可以拿来表示任务这个概念。
生产者
import java.util.Random;
import java.util.UUID;
public class TaskProducer implements Runnable {
private final Random random = new Random();
private DelayQueue<Task> q;
public TaskProducer(DelayQueue<Task> q) {
this.q = q;
}
@Override
public void run() {
while (true) {
try {
int delay = random.nextInt(10000);
Task task = new Task(UUID.randomUUID().toString(), delay);
System.out.println("Put " + task);
q.put(task);
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
生产者很简单,就是一个死循环,不断地产生一些是时间随机的任务。
消费者
public class TaskConsumer implements Runnable {
private DelayQueue<Task> q;
public TaskConsumer(DelayQueue<Task> q) {
this.q = q;
}
@Override
public void run() {
while (true) {
try {
Task task = q.take();
System.out.println("Take " + task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
当TaskConsumer
会无限等待,直到被唤醒,因此它不会消耗
定时任务调度器
public class TaskScheduler {
public static void main(String[] args) {
DelayQueue<Task> queue = new DelayQueue<>();
new Thread(new TaskProducer(queue), "Producer Thread").start();
new Thread(new TaskConsumer(queue), "Consumer Thread").start();
}
}
方案4: 时间轮(HashedWheelTimer)
时间轮

上图是一个长度为
举个例子,假设指针当前正指向格子
怎么实现时间轮呢?
时间轮的优点是性能高,插入和删除的时间复杂度都是
参考资料
- java.util.concurrent.DelayQueue
- HashedWheelTimer.java - Github
delayQueue 原理理解之源码解析- 简书- 细说延时任务的处理
- 简书 - 延迟任务的实现总结
- nick hao - 博客园 - 定时器(Timer)的实现
- java.util.concurrent.DelayQueue Example
HashedWheelTimer 原理- ZimZz - 博客园Hash 算法系列- 具体算法(HashedWheelTimer) - CSDNjava Disruptor 工作原理,谁能用一个比喻形容下? - 知乎1 分钟实现“延迟消息”功能- 58 沈剑Linux 下定时器的实现方式分析- IBM 1 分钟了解Leader-Follower 线程模型- 58 沈剑