博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程并发工具类(Fork-Join,CountDownLatch,CyclicBarrier,Semaphore,Exchange)
阅读量:4125 次
发布时间:2019-05-25

本文共 11977 字,大约阅读时间需要 39 分钟。

文章目录

Fork-Join

分而治之:规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解

WorkStealing:所谓 Work-Stealing,在 ForkJoinPool 中的实现为:线程池中每个线程都有一个互不影响的任务队列(双端队列),线程每次都从自己的任务队列的队头中取出一个任务来运行;如果某个线程对应的队列已空并且处于空闲状态,而其他线程的队列中还有任务需要处理但是该线程处于工作状态,那么空闲的线程可以从其他线程的队列的队尾取一个任务来帮忙运行 —— 感觉就像是空闲的线程去偷人家的任务来运行一样,所以叫 “工作窃取”

ForkJoinTask<V> 用来专门定义 Fork/Join 型任务 —— 完成将大任务分割为小任务以及合并结果的工作。一般我们不需要直接使用 ForkJoinTask,而是通过继承它的子类 RecursiveActionRecursiveTask 并实现对应的抽象方法 —— compute ,来定义我们自己的任务。其中,RecursiveAction 是不带返回值的 Fork/Join 型任务,所以使用此类任务并不产生结果,也就不涉及到结果的合并;而 RecursiveTask 是带返回值的 Fork/Join 型任务,使用此类任务的话,在任务结束前,我们需要进行结果的合并。其中,通过 ForkJoinTask 的 fork 方法,我们可以产生子任务并执行;通过 join 方法,我们可以获得子任务的结果

在这里插入图片描述

Fork-Join同步用法以及有返回值得情况:【统计整形数组元素和】

public class SumArray {
private static class SumTask extends RecursiveTask
{
private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10; private int[] src; //表示我们要实际统计的数组 private int fromIndex;//开始统计的下标 private int toIndex;//统计到哪里结束的下标 public SumTask(int[] src, int fromIndex, int toIndex) {
this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } @Override protected Integer compute() {
if(toIndex-fromIndex < THRESHOLD) {
int count = 0; for(int i=fromIndex;i<=toIndex;i++) {
count = count + src[i]; } return count; }else {
//fromIndex....mid....toIndex //1...................70....100 int mid = (fromIndex+toIndex)/2; SumTask left = new SumTask(src,fromIndex,mid); SumTask right = new SumTask(src,mid+1,toIndex); invokeAll(left,right); return left.join()+right.join(); } } } public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(); //生成随机数组 int[] src = MakeArray.makeArray(); SumTask innerFind = new SumTask(src,0,src.length-1); long start = System.currentTimeMillis(); pool.invoke(innerFind);//同步调用 System.out.println("Task is Running....."); System.out.println("The count is "+innerFind.join() +" spend time:"+(System.currentTimeMillis()-start)+"ms"); }}

Fork-Join异步用法以及无返回值得情况:

public class FindDirsFiles extends RecursiveAction{
private File path;//当前任务需要搜寻的目录 public FindDirsFiles(File path) {
this.path = path; } public static void main(String [] args){
try {
// 用一个 ForkJoinPool 实例调度总任务 ForkJoinPool pool = new ForkJoinPool(); FindDirsFiles task = new FindDirsFiles(new File("F:/")); pool.execute(task);//异步调用 System.out.println("Task is Running......"); Thread.sleep(1); int otherWork = 0; for(int i=0;i<100;i++){
otherWork = otherWork+i; } System.out.println("Main Thread done sth......,otherWork="+otherWork); task.join();//阻塞的方法 System.out.println("Task end"); } catch (Exception e) {
e.printStackTrace(); } } @Override protected void compute() {
List
subTasks = new ArrayList<>(); File[] files = path.listFiles(); if(files!=null) {
for(File file:files) {
if(file.isDirectory()) {
subTasks.add(new FindDirsFiles(file)); }else {
//遇到文件,检查 if(file.getAbsolutePath().endsWith("txt")) {
System.out.println("文件:"+file.getAbsolutePath()); } } } if(!subTasks.isEmpty()) {
for(FindDirsFiles subTask:invokeAll(subTasks)) {
subTask.join();//等待子任务执行完成 } } } }}

CountDownLatch

一个线程或一组线程等待其他的线程完成工作以后再执行

await:用来等待
countDown:负责计数器减1
在这里插入图片描述

//5个线程,6个countDown点public class UseCountDownLatch {
static CountDownLatch latch = new CountDownLatch(6); //初始化线程(只有一步,有4个) private static class InitThread implements Runnable{
@Override public void run() {
System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work......"); latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次; for(int i =0;i<2;i++) {
System.out.println("Thread_"+Thread.currentThread().getId() +" ........continue do its work"); } } } //业务线程 private static class BusiThread implements Runnable{
@Override public void run() {
try {
latch.await(); } catch (InterruptedException e) {
e.printStackTrace(); } for(int i =0;i<3;i++) {
System.out.println("BusiThread_"+Thread.currentThread().getId() +" do business-----"); } } } public static void main(String[] args) throws InterruptedException {
//单独的初始化线程,初始化分为2步,需要扣减两次 new Thread(new Runnable() {
@Override public void run() {
SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 1st......"); latch.countDown();//每完成一步初始化工作,扣减一次 System.out.println("begin step 2nd......."); SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 2nd......"); latch.countDown();//每完成一步初始化工作,扣减一次 } }).start(); new Thread(new BusiThread()).start(); for(int i=0;i<=3;i++){
Thread thread = new Thread(new InitThread()); thread.start(); } latch.await(); System.out.println("Main do ites work........"); }}

执行结果:

Thread_16 read init work......Thread_14 read init work......Thread_16 ........continue do its workThread_15 read init work......Thread_15 ........continue do its workThread_13 ........continue do its workThread_13 ........continue do its workThread_15 ........continue do its workThread_16 ........continue do its workThread_14 ........continue do its workThread_14 ........continue do its workThread_11 ready init work step 1nd......begin step 2nd.......Thread_11 ready init work step 2nd......Main do ites work........BusiThread_12 do business-----BusiThread_12 do business-----BusiThread_12 do business-----

CyclicBarrier

让一组线程到达某个屏障,被阻塞,一直到组内最后一个线程到达屏障时,屏障开放,所有阻塞的线程会继续执行

public class UseCyclicBarrier {
private static CyclicBarrier barrier = new CyclicBarrier(5,new CollectThread()); private static ConcurrentHashMap
resultMap = new ConcurrentHashMap<>();//存放子线程工作结果的容器 public static void main(String[] args) {
for(int i=0;i<=4;i++){
Thread thread = new Thread(new SubThread()); thread.start(); } } //负责屏障开放以后的工作 private static class CollectThread implements Runnable{
@Override public void run() {
StringBuilder result = new StringBuilder(); for(Map.Entry
workResult:resultMap.entrySet()){
result.append("["+workResult.getValue()+"]"); } System.out.println(" the result = "+ result); System.out.println("do other business........"); } } //工作线程 private static class SubThread implements Runnable{
@Override public void run() {
long id = Thread.currentThread().getId();//线程本身的处理结果 resultMap.put(Thread.currentThread().getId()+"",id); Random r = new Random();//随机决定工作线程的是否睡眠 try {
if(r.nextBoolean()) {
Thread.sleep(2000+id); System.out.println("Thread_"+id+" ....do something "); } System.out.println(id+"....is await"); barrier.await(); Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do its business "); } catch (Exception e) {
e.printStackTrace(); } } }}

执行结果:

11....is await13....is await14....is await15....is awaitThread_12 ....do something12....is await the result = [11][12][13][14][15]do other business........Thread_11 ....do its businessThread_12 ....do its businessThread_13 ....do its businessThread_14 ....do its businessThread_15 ....do its business

CountDownLatch 是由外部线程决定是否可以继续执行,而CyclicBarrier是由内部线程决定大家是否可以执行

Semaphore

控制同时访问某个特定资源的线程数量,用在流量控制

使用semaphore演示数据库连接池:

//使用semaphore实现数据库连接池public class DBPoolSemaphore {
private final static int POOL_SIZE = 10; private final Semaphore useful;//useful表示可用的数据库连接 public DBPoolSemaphore() {
this.useful = new Semaphore(POOL_SIZE); } //存放数据库连接的容器 private static LinkedList
pool = new LinkedList
(); //初始化池 static {
for (int i = 0; i < POOL_SIZE; i++) {
//SqlConnectImpl.fetchConnection() //class SqlConnectImpl implements Connection //返回一个自定义实现的Connection pool.addLast(SqlConnectImpl.fetchConnection()); } } /*归还连接*/ public void returnConnect(Connection connection) throws InterruptedException {
if(connection!=null) {
System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!" +"可用连接数:"+useful.availablePermits()); synchronized (pool) {
pool.addLast(connection); } useful.release(); } } /*从池子拿连接*/ public Connection takeConnect() throws InterruptedException {
useful.acquire(); Connection conn; synchronized (pool) {
conn = pool.removeFirst(); } return conn; }}

测试数据库连接池:

public class AppTest {
private static DBPoolSemaphore dbPool = new DBPoolSemaphore(); //业务线程 private static class BusiThread extends Thread{
@Override public void run() {
Random r = new Random();//让每个线程持有连接的时间不一样 long start = System.currentTimeMillis(); try {
Connection connect = dbPool.takeConnect(); System.out.println("Thread_"+Thread.currentThread().getId() +"_获取数据库连接共耗时【"+(System.currentTimeMillis()-start)+"】ms."); Thread.sleep(100+r.nextInt(100));//模拟业务操作,线程持有连接查询数据 System.out.println("查询数据完成,归还连接!"); dbPool.returnConnect(connect); } catch (InterruptedException e) {
} } } public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
Thread thread = new BusiThread(); thread.start(); } } }

执行结果:

Thread_11_获取数据库连接共耗时【0】ms.Thread_12_获取数据库连接共耗时【0】ms.Thread_13_获取数据库连接共耗时【0】ms.Thread_14_获取数据库连接共耗时【0】ms.Thread_15_获取数据库连接共耗时【0】ms.Thread_16_获取数据库连接共耗时【0】ms.Thread_17_获取数据库连接共耗时【0】ms.Thread_18_获取数据库连接共耗时【0】ms.Thread_19_获取数据库连接共耗时【0】ms.Thread_20_获取数据库连接共耗时【0】ms.查询数据完成,归还连接!当前有40个线程等待数据库连接!!可用连接数:0Thread_21_获取数据库连接共耗时【101】ms.查询数据完成,归还连接!当前有39个线程等待数据库连接!!可用连接数:0Thread_22_获取数据库连接共耗时【101】ms.查询数据完成,归还连接!当前有38个线程等待数据库连接!!可用连接数:0Thread_23_获取数据库连接共耗时【104】ms.查询数据完成,归还连接!当前有37个线程等待数据库连接!!可用连接数:0Thread_24_获取数据库连接共耗时【125】ms.查询数据完成,归还连接!当前有36个线程等待数据库连接!!可用连接数:0Thread_25_获取数据库连接共耗时【142】ms.查询数据完成,归还连接!当前有35个线程等待数据库连接!!可用连接数:0Thread_26_获取数据库连接共耗时【163】ms.查询数据完成,归还连接!当前有34个线程等待数据库连接!!可用连接数:0Thread_27_获取数据库连接共耗时【164】ms.查询数据完成,归还连接!当前有33个线程等待数据库连接!!可用连接数:0Thread_28_获取数据库连接共耗时【166】ms.查询数据完成,归还连接!当前有32个线程等待数据库连接!!可用连接数:0Thread_29_获取数据库连接共耗时【174】ms.查询数据完成,归还连接!当前有31个线程等待数据库连接!!可用连接数:0Thread_30_获取数据库连接共耗时【176】ms.查询数据完成,归还连接!

Exchange

用于两个线程间的数据交换

public class UseExchange {
private static final Exchanger
> exchange = new Exchanger
>(); public static void main(String[] args) {
//第一个线程 new Thread(new Runnable() {
@Override public void run() {
Set
setA = new HashSet
();//存放数据的容器 try { /*添加数据 * set.add(.....) * */ setA = exchange.exchange(setA);//交换set /*处理交换后的数据*/ } catch (InterruptedException e) { } } }).start(); //第二个线程 new Thread(new Runnable() { @Override public void run() { Set
setB = new HashSet
();//存放数据的容器 try { /*添加数据 * set.add(.....) * set.add(.....) * */ setB = exchange.exchange(setB);//交换set /*处理交换后的数据*/ } catch (InterruptedException e) { } } }).start(); }}

转载地址:http://sfhpi.baihongyu.com/

你可能感兴趣的文章
【JavaScript 教程】浏览器—History 对象
查看>>
还不会正则表达式?看这篇!
查看>>
100道+ JavaScript 面试题,助你查漏补缺
查看>>
JavaScript深入理解之闭包
查看>>
这才是学习Vite2的正确姿势!
查看>>
7 个适用于所有前端开发人员的很棒API,你需要了解一下
查看>>
25个构建Web项目的HTML建议,你需要了解一下!
查看>>
【web素材】02-10款大气的购物商城网站模板
查看>>
6种方式实现JavaScript数组扁平化(flat)方法的总结
查看>>
如何实现a===1 && a===2 && a===3返回true?
查看>>
49个在工作中常用且容易遗忘的CSS样式清单整理
查看>>
20种在学习编程的同时也可以在线赚钱的方法
查看>>
隐藏搜索框:CSS 动画正反向序列
查看>>
12 个JavaScript 特性技巧你可能从未使用过
查看>>
127个超级实用的JavaScript 代码片段,你千万要收藏好(上)
查看>>
【视频教程】Javascript ES6 教程27—ES6 构建一个Promise
查看>>
【5分钟代码练习】01—导航栏鼠标悬停效果的实现
查看>>
127个超级实用的JavaScript 代码片段,你千万要收藏好(中)
查看>>
8种ES6中扩展运算符的用法
查看>>
【视频教程】Javascript ES6 教程28—ES6 Promise 实例应用
查看>>