PriorityBlockingQueue在线程池中使用的异常
声明
本篇文章除部分引用外,均为原创内容,如有雷同纯属巧合,引用转载请附上原文链接与声明。
阅读条件
读本篇文章需掌握多线程编程,线程池方面知识
注意
本文若包含部分下载内容,本着一站式阅读的想法,本站提供其对应软件的直接下载方式,但是由于带宽原因下载缓慢是必然的,建立读者去相关官网进行下载,若某些软件禁止三方传播,请在主页上通过联系作者的方式将相关项目进行取消。
文章大纲
- 问题产生
- 问题分析
- 解决方案
问题产生
笔者负责一个数据采集功能的实现,当需要进行数据采集时,将创建一个采集任务并提交到线程池中执行,线程池所采用的任务缓存队列是LinkBlockingQueue,即任务先到就先执行,后到后执行。
后来需求发生变化,要求对提交的数据采集任务的执行顺序进行控制,使某些特殊的任务可以优先执行。第一时间想到的解决方案是将LinkBlockingQueue更换为PriorityBlockingQueue,然后将相关的任务继承Comparable接口并实现,这样便可以做到对执行顺序进行控制的目的。
动手开干
首先新建任务类,用于执行任务,代码如下,该任务类初始化时传入一个int类型参数表示优先级,值越大则优先级越高。休眠1000ms模拟采集过程耗时。
public class Task implements Runnable, Comparable<Task> {
private int priority;
public int getPriority() {
return this.priority;
}
public Task(int priority) {
this.priority = priority;
}
@Override
public int compareTo(Task anotherTask) {
return anotherTask.getPriority() - getPriority();
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("collect data successfully. the priority of this task is : " + priority);
}
}
创建线程池并测试,可以看到为了测试排队功能,线程池初始化时给的coresize,maxSize均是1,这样模拟任务串行执行,任务缓存队列使用的PriorityBlockingQueue。
public class Application {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<>(), Thread::new, new ThreadPoolExecutor.AbortPolicy());
executor.submit(new Task(100));
executor.submit(new Task(2));
executor.submit(new Task(3));
executor.submit(new Task(4));
}
}
点击执行,执行结果并没有像期望的那样按照顺序执行,而是抛出了以下的异常信息,该异常的意思是无法将FutureTask转换为Comparable。
Exception in thread "main" java.lang.ClassCastException: class java.util.concurrent.FutureTask cannot be cast to class java.lang.Comparable (java.util.concurrent.FutureTask and java.lang.Comparable are in module java.base of loader 'bootstrap')
at java.base/java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:360)
at java.base/java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:486)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1347)
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
at tech.chenx.core.Application.main(Application.java:19)
问题分析
看到这个错误,首先是一种云里雾里的感觉,起初是怀疑PriorityBlockingQueue的排队是需要任务去实现特殊的接口而不是Comparable接口,于是翻阅了PriorityBlockingQueue源码,并看到头部的简述注释如下
/**
* An unbounded {@linkplain java.util.concurrent.BlockingQueue blocking queue} that uses
* the same ordering rules as class {@link java.util.PriorityQueue} and supplies
* blocking retrieval operations. While this queue is logically
* unbounded, attempted additions may fail due to resource exhaustion
* (causing {@code OutOfMemoryError}). This class does not permit
* {@code null} elements. A priority queue relying on {@linkplain
* Comparable natural ordering} also does not permit insertion of
* non-comparable objects (doing so results in
* {@code ClassCastException}).
*/
大概的意思是该类使用Comparable接口进行排序,所以也不允许插入没有实现Comparable接口的实例,如果插入将会报类转换异常:ClassCastException。
该异常就是上文测试中所遇到的异常,可是观察代码,我们所提交的Task实例是实现了Comparable接口的,还是不明白为什么会抛出该异常,并且异常信息中的FutureTask也不知道从何而来。于是只有翻阅ThreadPoolExecutor的submit源码,这里选择debug启动,进入得到如下断点信息
可以看到,确实我们提交的是Task任务,而在submit方法中,该任务却被包装成了FutureTask类实例,接下来才调用的execute方法。
问题原因
这也就意味着进入线程池执行的任务最终并不是我们定义的Task实例,而是FutureTask实例,而FutureTask类未实现Comparable接口,所以在执行时,线程池将FutureTask任务尝试放到PriorityBlockingQueue中,就出现了PriorityBlockingQueue注释中的说的情况:插入一个未实现Comparable接口的实例,也抛出了ClassCastException。
解决方案
在问题分析中已经找到了原因,那现在就开始着手修改。我们仍然是要实现任务按照优先级进行执行,那么就不能通过ThreadPoolExecutor.submit()方法来提交任务,因为该方法始终要将任务包装为FutureTask。那么我们可以跳过这一步,因为submit方法本质最后还是调用的ThreadPoolExecutor.execute()方法,所以有了第一种解决方案
方案一
采用execute来提交任务,而非submit来提交任务
public class Application {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<>(), Thread::new, new ThreadPoolExecutor.AbortPolicy());
executor.execute(new Task(100));
executor.execute(new Task(2));
executor.execute(new Task(3));
executor.execute(new Task(4));
}
}
测试结果如下,任务按照优先级从大到小进行执行,执行正确且无异常信息
collect data successfully. the priority of this task is : 100
collect data successfully. the priority of this task is : 4
collect data successfully. the priority of this task is : 3
collect data successfully. the priority of this task is : 2
但是在实际线上场景中,需求往往更加复杂,导致方案一不可行,观察源码可以发现,submit()和execute()方法的区别是前者是一个返回值,即返回包装的FutureTask,而后者是没有返回值的,没有返回值就意味着:一旦这个任务被提交了,那么这个任务就不再受管控了,导致无法取消,也无法获取结果,也就体现了submit方法的用处,可以通过返回值对过程进行控制。
就拿数据采集来举例,数据采集属于资源消耗较大的操作,那么这个操作应该是支持临时停止的和控制的,所以我们不能直接通过调用execute方法来执行。
那么就引出了新的问题:如何即满足任务按照优先级顺序执行,又能得到其对应的Future实例以便对其执行流程进行管控
方案二
首先,思路是自己创建一个实现Comparable接口的FutureTask类,采用Task作为参数即可,然后调用ThreadPoolExecutor.execute()方法执行该FutureTask实例,这样既能有执行任务对应的FutureTask实例,又能使任务按照优先级执行。
自定义FutureTask如下
public class ComparableFutureTask extends FutureTask<Object> implements Comparable<ComparableFutureTask> {
private Task task;
public Task getTask() {
return task;
}
public ComparableFutureTask(Task task) {
super(task, null);
this.task = task;
}
@Override
public int compareTo(ComparableFutureTask comparableFutureTask) {
return this.task.compareTo(comparableFutureTask.getTask());
}
}
修改测试类
public class Application {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<>(), Thread::new, new ThreadPoolExecutor.AbortPolicy());
executor.execute(new ComparableFutureTask(new Task(100)));
executor.execute(new ComparableFutureTask(new Task(2)));
executor.execute(new ComparableFutureTask(new Task(3)));
executor.execute(new ComparableFutureTask(new Task(4)));
}
}
测试结果如下,结果正确且无错误
collect data successfully. the priority of this task is : 100
collect data successfully. the priority of this task is : 4
collect data successfully. the priority of this task is : 3
collect data successfully. the priority of this task is : 2