【初】线程池最优使用策略【Java线程池学习一】
大约 6 分钟
本篇博客的主要目的是指导如何在Java中优雅的使用线程池。这篇博客的内容是截止一周前我对线程池的理解,简单说就是工作了三年的人对线程池的理解。
一、前言
初学者对于多线程会有一种莫名的恐惧,会害怕出错,但其实多线程没想象中的那么复杂,主要两点:
- 对于线程的参数熟记于心(刚开始记不住很正常也没啥,收藏文章看几次就好了)
- 使用多线程的时候要注意 全局变量 和 异常处理
对我以往的面试来说(一年多工作经验的时候),掌握下面的理论知识就很充足了。
二、理论
2-1、创建线程池的方式
在实际的工作中我们一般有两种创建线程池的方式:
方式一:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1,2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
方式二:不推荐、不推荐, 了解即可
// 创建一个单线程的线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 创建一个定时执行的线程池
ScheduledExecutorService executorService1 = Executors.newScheduledThreadPool(1);
// 创建一个可缓存的线程池
ExecutorService executorService2 = Executors.newCachedThreadPool();
// 创建一个定长的线程池
ExecutorService executorService3 = Executors.newFixedThreadPool(1);
方式二点进各个方法,其实底层还是调用的 new ThreadPoolExecutor()
只是传递的参数不一致罢了。
2-2、核心参数(重点)
new ThreadPoolExecutor(1,2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
创建线程池的时候会传递一堆参数,当你理解了这些参数再来使用线程池将畅通无阻。
ThreadPoolExecutor 的构造方法好有几个重载的,直接来看参数最全的一个:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ...
}
参数名称 | 含义 |
---|---|
corePoolSize(核心线程数量) | 线程池里面的常驻线程数 |
maximumPoolSize(最大线程数量) | 当核心线程数处理不完,并且阻塞队列满了之后,就会开启新的线程来执行 |
unit(超时回收线程时间的单位) | 超时时间的单位,配合上面使用 |
workQueue(阻塞队列) | 当任务超过了核心线程能处理的范围后,新的任务会被放进阻塞队列 常见的队列: 1. ArrayBlockingQueue 基于数组实现的一个有界队列 2. LinkedBlockingQueue 基于链表实现的无界队列(使用的时候要设置大小) 3. SynchronousQueue 一个特殊的队列,不存储数据,一个任务进来直接给到消费者,它有公平模式(先进先出),和非公平模式(先进后出) 4. PriorityBlockingQueue 无界可扩容可排序的队列 5. DelayQueue 里面的元素必须实现Delayed接口,重写里面的getDelay、compareTo方法。 |
threadFactory(线程工厂) | 用来创建线程的工厂 |
handler(拒绝策略) | 当阻塞队列满了之后,并且线程数达到了最大线程数,这时候如果还有任务进来,就会被拒绝。 常见的拒绝策略 1. AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 (默认的策略) 2. DiscardPolicy:也是丢弃任务,但是不抛出异常。 3. DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) 4. CallerRunsPolicy:由调用线程处理该任务 |
注:
- 使用阻塞队列的时候要注意 LinkedBlockingQueue 是基于链表实现的一个无界队列,在使用的时候一定要设置一个大小,不然如果任务生产过快,会导致内存溢出。
- 超出核心线程数的任务,先进队列再开新线程(最大线程数)。
- 如果我们可以接受消费慢,其实我们可以考虑把拒绝策略设置成 CallerRunsPolicy (由调用线程处理该任务)
三、实践
3-1、场景一
有一个任务列表,每次最少选择一个任务,最多选择10个任务,绝大部分的时候是选择一个任务的,平均每个任务执行需要 0.3s。
最开始我们接收前端的参数是一个 List集合,每次从循环去处理数据,绝大部分的时候很快,但偶尔用户选择多个的时候就很慢了。
然后我们进行优化,使用多线程去处理,伪代码如下:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 10, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
public String fun(List<String> ids) throws InterruptedException {
// 计数器,我们需要在多线程处理完毕后再来对结果进行汇总返回
CountDownLatch countDownLatch = new CountDownLatch(ids.size());
// 用来存放多线程的结果
LinkedBlockingQueue queue = new LinkedBlockingQueue(10);
// 循环处理结果
for (String id : ids) {
executor.execute(() -> {
// 业务逻辑处理, 并把处理的结果放入阻塞队列
queue.add(funTest(id));
countDownLatch.countDown();
});
}
// 等待线程都执行完
countDownLatch.await();
// 处理全部的返回值 queue
// ...
return "ok";
}
3-2、场景二
一个查询列表接口,但因为前期表结构设计的问题,现在需要去三张表里面获取数据,然后在把结果按照时间排序返回。
这个场景侧重点对于每个表的数据处理,最终返回给前端的是一个List,但是表数据是不一样的,所以我们查询到数据后需要进行特殊处理,伪代码如下:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
public String fun() throws ExecutionException, InterruptedException {
// 不同的任务去处理
Future<Object> resultOne = executor.submit(funOne(), Object.class);
Future<Object> resultTwo = executor.submit(funTwo(), Object.class);
Future<Object> resultThree = executor.submit(funThree(), Object.class);
// get 方法会阻塞直到返回结果
Object one = resultOne.get();
Object two = resultTwo.get();
Object three = resultThree.get();
// 数据集合组合返回
// ...
return "ok";
}
3-3、场景三
某个接口提供了查询功能,但数据量有100w需要按每页1w来查询,并把数据插入数据库。
上面两个的伪代码主要是基于业务的实现,这个代码我们着重来看一下使用多线程需要注意那些因素。
/**
* 100w每次1w 只需要10次即可,但考虑到后续数据量的一个增长,我们这里还是设置了最大线程数为20,阻塞队列是10,这样做多可以容纳300w的数据,还超过的话会让当前线程直执行
*/
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy());
public String fun() {
final int pageSize = 10000;
BaseResult tmpBaseResult = baseRequest(1, pageSize);
if (tmpBaseResult.code != 200) {
return "fail";
}
CountDownLatch countDownLatch = new CountDownLatch(tmpBaseResult.totalCount);
for (int i = 1;i <= tmpBaseResult.totalCount; i++) {
Integer page = new Integer(i);
executor.execute(()->{
try {
BaseResult baseResult = baseRequest(page, pageSize);
if (tmpBaseResult.code == 200) {
try {
baseMapper.inserts(baseResult.lists);
}catch (Exception e) {
log.error("数据处理异常:数据库异常:{}", e);
}
}
}catch (Exception e) {
log.error("数据处理异常:多线程处理异常:{}", e);
}finally {
countDownLatch.countDown();
}
});
}
// 等待线程都执行完
countDownLatch.await();
log.info("执行成功");
return "ok";
}
private BaseResult baseRequest(int page, int pageSize) {
BaseResult baseResult = new BaseResult();
try {
Map map = http.execute(page, pageSize);
if (map.get("code") != 200) {
baseResult.code = -1;
log.error("数据处理异常:接口返回异常:{}", e);
return baseResult;
}
baseResult.code = 200;
baseResult.totalCount = map.get("allCount") % pageSize == 0 ? map.get("allCount") / pageSize : map.get("allCount") / pageSize + 1;
baseResult.lists = (List) map.get("data");
}catch (Exception e) {
baseResult.code = -1;
log.error("数据处理异常:请求接口异常:{}", e);
}
return baseResult;
}
public class BaseResult {
/**
* 200 正常
*/
private Integer code;
/**
* 总条数
*/
private Integer totalCount;
/**
* 数据结果
*/
private List<Object> lists;
}
注:
- 一定要注意全局变量的使用,尽可能的不要使用全局变量
- 不要害怕代码多,一定要做好异常处理,不然多线程出了问题你都不知道