声明

本篇文章除部分引用外,均为原创内容,如有雷同纯属巧合,引用转载请附上原文链接与声明。

阅读条件

读本篇文章需掌握多线程编程,线程池方面知识

注意

本文若包含部分下载内容,本着一站式阅读的想法,本站提供其对应软件的直接下载方式,但是由于带宽原因下载缓慢是必然的,建立读者去相关官网进行下载,若某些软件禁止三方传播,请在主页上通过联系作者的方式将相关项目进行取消。

文章大纲
  • 问题产生
  • 问题分析
  • 解决方案
问题产生

笔者负责一个数据采集功能的实现,当需要进行数据采集时,将创建一个采集任务并提交到线程池中执行,线程池所采用的任务缓存队列是LinkBlockingQueue,即任务先到就先执行,后到后执行。
后来需求发生变化,要求对提交的数据采集任务的执行顺序进行控制,使某些特殊的任务可以优先执行。第一时间想到的解决方案是将LinkBlockingQueue更换为PriorityBlockingQueue,然后将相关的任务继承Comparable接口并实现,这样便可以做到对执行顺序进行控制的目的。

动手开干

首先新建任务类,用于执行任务,代码如下,该任务类初始化时传入一个int类型参数表示优先级,值越大则优先级越高。休眠1000ms模拟采集过程耗时。

public class Task implements Runnable, Comparable<Task> {

    private int priority;

    public int getPriority() {
        return this.priority;
    }

    public Task(int priority) {
        this.priority = priority;
    }

    @Override
    public int compareTo(Task anotherTask) {
        return anotherTask.getPriority() - getPriority();
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("collect data successfully. the priority of this task is : " + priority);
    }
}

创建线程池并测试,可以看到为了测试排队功能,线程池初始化时给的coresize,maxSize均是1,这样模拟任务串行执行,任务缓存队列使用的PriorityBlockingQueue。

public class Application {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>(), Thread::new, new ThreadPoolExecutor.AbortPolicy());
        executor.submit(new Task(100));
        executor.submit(new Task(2));
        executor.submit(new Task(3));
        executor.submit(new Task(4));
    }
}

点击执行,执行结果并没有像期望的那样按照顺序执行,而是抛出了以下的异常信息,该异常的意思是无法将FutureTask转换为Comparable。

Exception in thread "main" java.lang.ClassCastException: class java.util.concurrent.FutureTask cannot be cast to class java.lang.Comparable (java.util.concurrent.FutureTask and java.lang.Comparable are in module java.base of loader 'bootstrap')
	at java.base/java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:360)
	at java.base/java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:486)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1347)
	at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
	at tech.chenx.core.Application.main(Application.java:19)
问题分析

看到这个错误,首先是一种云里雾里的感觉,起初是怀疑PriorityBlockingQueue的排队是需要任务去实现特殊的接口而不是Comparable接口,于是翻阅了PriorityBlockingQueue源码,并看到头部的简述注释如下

/**
 * An unbounded {@linkplain java.util.concurrent.BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link java.util.PriorityQueue} and supplies
 * blocking retrieval operations.  While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing {@code OutOfMemoryError}). This class does not permit
 * {@code null} elements.  A priority queue relying on {@linkplain
 * Comparable natural ordering} also does not permit insertion of
 * non-comparable objects (doing so results in
 * {@code ClassCastException}).
 */

大概的意思是该类使用Comparable接口进行排序,所以也不允许插入没有实现Comparable接口的实例,如果插入将会报类转换异常:ClassCastException。
该异常就是上文测试中所遇到的异常,可是观察代码,我们所提交的Task实例是实现了Comparable接口的,还是不明白为什么会抛出该异常,并且异常信息中的FutureTask也不知道从何而来。于是只有翻阅ThreadPoolExecutor的submit源码,这里选择debug启动,进入得到如下断点信息

threadpool-deb2

可以看到,确实我们提交的是Task任务,而在submit方法中,该任务却被包装成了FutureTask类实例,接下来才调用的execute方法。

问题原因

这也就意味着进入线程池执行的任务最终并不是我们定义的Task实例,而是FutureTask实例,而FutureTask类未实现Comparable接口,所以在执行时,线程池将FutureTask任务尝试放到PriorityBlockingQueue中,就出现了PriorityBlockingQueue注释中的说的情况:插入一个未实现Comparable接口的实例,也抛出了ClassCastException。

解决方案

在问题分析中已经找到了原因,那现在就开始着手修改。我们仍然是要实现任务按照优先级进行执行,那么就不能通过ThreadPoolExecutor.submit()方法来提交任务,因为该方法始终要将任务包装为FutureTask。那么我们可以跳过这一步,因为submit方法本质最后还是调用的ThreadPoolExecutor.execute()方法,所以有了第一种解决方案

方案一

采用execute来提交任务,而非submit来提交任务

public class Application {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>(), Thread::new, new ThreadPoolExecutor.AbortPolicy());
        executor.execute(new Task(100));
        executor.execute(new Task(2));
        executor.execute(new Task(3));
        executor.execute(new Task(4));
    }
}

测试结果如下,任务按照优先级从大到小进行执行,执行正确且无异常信息

collect data successfully. the priority of this task is : 100
collect data successfully. the priority of this task is : 4
collect data successfully. the priority of this task is : 3
collect data successfully. the priority of this task is : 2

但是在实际线上场景中,需求往往更加复杂,导致方案一不可行,观察源码可以发现,submit()和execute()方法的区别是前者是一个返回值,即返回包装的FutureTask,而后者是没有返回值的,没有返回值就意味着:一旦这个任务被提交了,那么这个任务就不再受管控了,导致无法取消,也无法获取结果,也就体现了submit方法的用处,可以通过返回值对过程进行控制。

就拿数据采集来举例,数据采集属于资源消耗较大的操作,那么这个操作应该是支持临时停止的和控制的,所以我们不能直接通过调用execute方法来执行。

那么就引出了新的问题:如何即满足任务按照优先级顺序执行,又能得到其对应的Future实例以便对其执行流程进行管控

方案二

首先,思路是自己创建一个实现Comparable接口的FutureTask类,采用Task作为参数即可,然后调用ThreadPoolExecutor.execute()方法执行该FutureTask实例,这样既能有执行任务对应的FutureTask实例,又能使任务按照优先级执行。

自定义FutureTask如下
public class ComparableFutureTask extends FutureTask<Object> implements Comparable<ComparableFutureTask> {

    private Task task;
    
    public Task getTask() {
        return task;
    }

    public ComparableFutureTask(Task task) {
        super(task, null);
        this.task = task;
    }

    @Override
    public int compareTo(ComparableFutureTask comparableFutureTask) {
        return this.task.compareTo(comparableFutureTask.getTask());
    }
}

修改测试类

public class Application {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>(), Thread::new, new ThreadPoolExecutor.AbortPolicy());
        executor.execute(new ComparableFutureTask(new Task(100)));
        executor.execute(new ComparableFutureTask(new Task(2)));
        executor.execute(new ComparableFutureTask(new Task(3)));
        executor.execute(new ComparableFutureTask(new Task(4)));
    }
}

测试结果如下,结果正确且无错误

collect data successfully. the priority of this task is : 100
collect data successfully. the priority of this task is : 4
collect data successfully. the priority of this task is : 3
collect data successfully. the priority of this task is : 2