工作多线程等待问题
Nuyoah 问题
在工作中遇到一个函数执行周期非常长,可以采用多线程来解决,优化处理速度
问题:需要判断多个服务是否在线,返回服务在线状态
解决
刚开始想到的是采用多线程来解决此问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public List<Data> SetStatus(List<Data> datas) { ExecutorService executorService = Executors.newFixedThreadPool(10);
for (Data data : datas){ Runnable runnable = new Runnable() { @Override public void run() { setStatus(data) } }; executorService.execute(runnable); } executorService.shutdown(); return datas; }
|
但是上述方法问题存在于,线程池中的函数还没有全部执行完毕就返回结果了,导致返回的数据中有许多数据在线状态还没有检测完成
随后想到需要等线程池中的所有线程执行完毕之后才进行返回
解决方法:使用CountDownLatch计数,计数个数是我们需要处理的数据个数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public List<Data> SetStatus(List<Data> datas) { CountDownLatch countDownLatch = new CountDownLatch(datas.size()); ExecutorService executorService = Executors.newFixedThreadPool(10);
for (Data data : datas){ Runnable runnable = new Runnable() { @Override public void run() { setStatus(data); countDownLatch.countDown(); } }; executorService.execute(runnable); } countDownLatch.await() executorService.shutdown(); return datas; }
|
上面方法二完美的解决了无法等待线程池全部完成之后在进行后面操作的问题
但是最后实行的时候会有问题,因为需要给每一个服务来判断该服务是否在线,需要去访问对应服务,这导致即使使用线程池,若服务不通依旧会很慢,所以最后解决办法是在上述方法的基础上采用两次访问接口的操作,第一次访问返回服务主体信息,并不会进行服务在线状态的判断,第二次访问是访问服务在线状态。这样不会导致前台页面出现长时间空白。
创建线程池的方式
转载:线程池的创建方式有几种
线程池的创建方式总共包含以下 7 种(其中 6 种是通过 Executors 创建的,1 种是通过ThreadPoolExecutor 创建的)
Executors
newFixedThreadPool
创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static void fixedThreadPool() { ExecutorService threadPool = Executors.newFixedThreadPool(2); Runnable runnable = new Runnable() { @Override public void run() { System.out.println("任务被执行,线程:" + Thread.currentThread().getName()); } }; threadPool.submit(runnable); threadPool.execute(runnable); threadPool.execute(runnable); threadPool.execute(runnable); }
|
CachedThreadPool
创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static void cachedThreadPool() { ExecutorService threadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { threadPool.execute(() -> { System.out.println("任务被执行,线程:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } }); } }
|
SingleThreadExecutor
创建单个线程数的线程池,它可以保证先进先出的执行顺序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void singleThreadExecutor() { ExecutorService threadPool = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; threadPool.execute(() -> { System.out.println(index + ":任务被执行"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } }); } }
|
ScheduledThreadPool
创建一个可以执行延迟任务的线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public static void scheduledThreadPool() { ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5); System.out.println("添加任务,时间:" + new Date()); threadPool.schedule(() -> { System.out.println("任务被执行,时间:" + new Date()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } }, 1, TimeUnit.SECONDS); }
|
SingleThreadScheduledExecutor
创建一个单线程的可以执行延迟任务的线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public static void SingleThreadScheduledExecutor() { ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor(); System.out.println("添加任务,时间:" + new Date()); threadPool.schedule(() -> { System.out.println("任务被执行,时间:" + new Date()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } }, 2, TimeUnit.SECONDS); }
|
newWorkStealingPool
创建一个抢占式执行的线程池(任务执行顺序不确定),注意此方法只有在 JDK 1.8+ 版本中才能使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static void workStealingPool() { ExecutorService threadPool = Executors.newWorkStealingPool(); for (int i = 0; i < 10; i++) { final int index = i; threadPool.execute(() -> { System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName()); }); } while (!threadPool.isTerminated()) { } }
|
ThreadPoolExecutor
最原始的创建线程池的方式,它包含了 7 个参数可供设置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public static void myThreadPoolExecutor() { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10)); for (int i = 0; i < 10; i++) { final int index = i; threadPool.execute(() -> { System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } }
|
七个参数分别是:
1.corePoolSize
核心线程数,线程池中始终存活的线程数。
2.maximumPoolSize
最大线程数,线程池中允许的最大线程数,当线程池的任务队列满了之后可以创建的最大线程数。
3.keepAliveTime
最大线程数可以存活的时间,当线程中没有任务执行时,最大线程就会销毁一部分,最终保持核心线程数量的线程。
4.unit:
单位是和参数 3 存活时间配合使用的,合在一起用于设定线程的存活时间 ,参数 keepAliveTime 的时间单位有以下 7 种可选:
TimeUnit.DAYS:天
TimeUnit.HOURS:小时
TimeUnit.MINUTES:分
TimeUnit.SECONDS:秒
TimeUnit.MILLISECONDS:毫秒
TimeUnit.MICROSECONDS:微妙
TimeUnit.NANOSECONDS:纳秒
5.workQueue
一个阻塞队列,用来存储线程池等待执行的任务,均为线程安全,它包含以下 7 种类型:
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
较常用的是 LinkedBlockingQueue 和 Synchronous,线程池的排队策略与 BlockingQueue有关。
6.threadFactory
线程工厂,主要用来创建线程,默认为正常优先级、非守护线程。
7.handler
拒绝策略,拒绝处理任务时的策略,系统提供了 4 种可选:
AbortPolicy:拒绝并抛出异常。
CallerRunsPolicy:使用当前调用的线程来执行此任务。
DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务。
DiscardPolicy:忽略并抛弃当前任务。
默认策略为 AbortPolicy。
线程池的执行流程
- 当线程数小于核心线程数时,创建线程。
- 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
- 当线程数大于等于核心线程数,且任务队列已满:若线程数小于最大线程数,创建线程;若线程数等于最大线程数,抛出异常,拒绝任务。
等待线程池执行完毕方法
CountDownLatch
计数器方法:上面以写过
Future.get()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public List<Data> SetStatus(List<Data> datas) { ExecutorService executorService = Executors.newFixedThreadPool(10); List<Future> futures = new ArrayList<>(); for (Data data : datas){ Runnable runnable = new Runnable() { @Override public void run() { setStatus(data); } }; Future<Data> submit = executorService.submit(runnable); futures.add(submit); } for (Future future : futures) { try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } executorService.shutdown(); return datas; }
|