java 线程同步工具类

2016-08-04 18:01:25   最后更新: 2016-08-04 18:01:25   访问数量:490




上一篇日志中,我们介绍了 java 并发编程中的线程安全容器类

java 中线程安全的容器类

本篇日志中,我们介绍一下 java 中除此之外的其他同步工具类

 

上一篇日志中介绍的阻塞队列就是一种同步工具类,它用来协调线程的控制流,其他还包括信号量、栅栏以及闭锁

当然,用户也可以自己创建自己的同步工具类

所有的同步工具类都包含一些特定的结构化属性,他们根据自己封装的一些状态值来决定执行同步工具类的线程是继续执行还是等待

 

闭锁是一种同步工具类,可以延迟线程的进度直到达到某些状态条件,他的作用相当于一扇门,当门打开时才允许线程通过,并且门一旦打开将不再改变,永远保持打开状态,通常,我们用闭所来确保某些活动直到其他活动结束后才继续进行

在此前的日志中,我们曾经使用过闭锁:

java 线程并发中的等待与唤醒

在第二个例子中,静态的 start 成员变量就是一个闭锁,如果他为 false,那么创建的所有线程都将进入 waiting 状态,直到所有线程都创建完毕并处于 waiting 状态,主线程才会打开 start,允许所有线程并发执行,这就是一个典型的二元闭锁的例子

 

java5 原生提供了一种灵活的闭锁实现 -- CountDownLatch,通常用于让一个或多个线程等待一组事件的发生

CountDownLatch 类包含一个闭锁状态计数器,在构造 CountDownLatch 时,计数器被初始化为一个正数,表示需要等待的事件数,他的 await 方法让线程等待计数器达到 0,使用 countDown 方法递减计数器,表示有一个事件已经发生

 

下面我们使用 CountDownLatch 来改进上面提到的 PrintSortedThreadName 类,按照 threadid 从小到大的顺序依次输出每个 Thread 的 name:

package com.techlog.test.service; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * just for test * Created by techlog on 16/5/20. */ @Service public class PrintSortedThreadName { private static List<Long> threadIds; private static int index; private static final Object lock = new Object(); public static void main(String[] argv) { threadIds = new ArrayList<Long>(); int concurrentcount = 10; final CountDownLatch startGate = new CountDownLatch(concurrentcount+1); index = 0; for (int i = 0; i < concurrentcount; i++) { new Thread(new Process(startGate)).start(); startGate.countDown(); } synchronized (lock) { Collections.sort(threadIds); } startGate.countDown(); } private static class Process implements Runnable { private final CountDownLatch startGate; public Process(CountDownLatch startGate) { this.startGate = startGate; } @Override public void run() { try { synchronized (lock) { threadIds.add(Thread.currentThread().getId()); } startGate.await(); synchronized (lock) { while (threadIds.get(index) != Thread.currentThread().getId()) { lock.wait(); } System.out.println(Thread.currentThread().getId() + " : " + Thread.currentThread().getName()); index++; lock.notifyAll(); } } catch (InterruptedException e) { System.out.println("wait interrupted"); } } } }

 

 

我们使用 CountDownLatch 对象 startGate 来控制线程等待所有线程都创建完成、将 id 添加到 threadIds 并且主线程对 threadIds 完成排序,使用 lock 锁实现 threadIds 的保护和线程同步,最终,打印出了:

11 : Thread-0

12 : Thread-1

13 : Thread-2

14 : Thread-3

15 : Thread-4

16 : Thread-5

17 : Thread-6

18 : Thread-7

19 : Thread-8

20 : Thread-9

 

此前我们介绍过 Future,它用来收集 Callable 接口的运算结果

java 创建多线程的三种方式 -- Runnable、Thread、Callable

 

FutureTask 实现了每个任务结束后的处理

public class Test { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); for(int i=0; i<5; i++) { Callable<String> c = new Task(); MyFutureTask ft = new MyFutureTask(c); executor.submit(ft); } executor.shutdown(); } } class MyFutureTask extends FutureTask<String> { public MyFutureTask(Callable<String> callable) { super(callable); } @Override protected void done() { try { System.out.println(get() + " 线程执行完毕!~"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } class Task implements Callable<String> { @Override public String call() throws Exception { Random rand = new Random(); TimeUnit.SECONDS.sleep(rand.nextInt(12)); return Thread.currentThread().getName(); } }

 

 

一旦某个线程执行完成,Future 接收到线程返回,FutureTask 的 done 方法就会被调用,因此,你会看到在不同的时间间隔内依次打出了:

pool-1-thread-1 线程执行完毕!~

pool-1-thread-4 线程执行完毕!~

pool-1-thread-5 线程执行完毕!~

pool-1-thread-3 线程执行完毕!~

pool-1-thread-2 线程执行完毕!~

 

上面的例子中,FutureTask 实际上实现了一个闭锁的功能

 

在 linux 系统调用中,我们曾经介绍过信号量的相关概念,实际上他是带有计数功能的互斥锁,用来保护某个只允许指定数量操作的资源

信号量

java 也提供了计数信号量用来控制同步访问某个特定资源的操作数量,计数信号量还常被用于实现某种资源池,或者对容器施加边界限定

Semaphore 对象通过创建时的构造方法指定了计数的大小,通常是资源的数量,通过对象的 acquire 方法获取资源的许可,计数会相应减1,通过对象的 release 释放资源的许可,计数会相应的加1,当计数值为 0 时,则 acquire 会阻塞直到计数大于 0,通过 availablePermits 方法可以查看当前计数

 

下面的代码展示了一个模拟限制客户端并发请求数的示例:

package com.techlog.test.service; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class ConcurrentCall { public static void main(String[] args) { // 线程池 ExecutorService exec = Executors.newCachedThreadPool(); // 只能5个线程同时访问 final Semaphore semp = new Semaphore(5); // 模拟20个客户端访问 for (int index = 0; index < 10; index++) { final int NO = index; Runnable run = () -> { try { System.out.println("Acquire: " + NO); // 获取许可 semp.acquire(); System.out.println("Accessing: " + NO); Thread.sleep((long) (Math.random() * 10000)); System.out.println("Release: " + NO); // 释放许可 semp.release(); } catch (InterruptedException e) { e.printStackTrace(); } }; exec.execute(run); } exec.shutdown(); } }

 

 

输出了:

Acquire: 0

Acquire: 3

Acquire: 2

Acquire: 1

Acquire: 5

Accessing: 2

Accessing: 3

Acquire: 4

Acquire: 7

Accessing: 0

Acquire: 6

Accessing: 5

Accessing: 1

Acquire: 9

Acquire: 8

Release: 0

Accessing: 4

Release: 2

Accessing: 7

Release: 7

Accessing: 6

Release: 3

Accessing: 9

Release: 5

Accessing: 8

Release: 1

Release: 4

Release: 6

Release: 9

Release: 8

 

可以看到,每一时刻最多只有允许占用资源的 5 个线程被调用,其他线程则在等待资源,直到某个线程 release,立即有新的线程获得资源

 

栅栏类似于闭锁,他能阻塞一组线程直到某个事件发生

 

他与闭锁关键的区别在于,所有线程必须同时到达栅栏位置,才能继续执行,闭锁用于等待事件的发生,栅栏则用于等待其他线程

CycleBarrier 对象创建时传入所有需要等待的线程数,当线程调用 CycleBarrier 对象的 await 方法表示已经到达栅栏,并进入 waiting 状态,直到所有线程都调用 await 方法到达栅栏,则栅栏会立即开启,所有线程同步执行

每当栅栏开启并让所有线程通过以后,栅栏将再次关闭,直到指定数目的线程再次到齐

 

下面的代码模拟了一个赛跑的示例:

public class CyclicBarrierTest { public static void main(String[] args) throws IOException, InterruptedException { CyclicBarrier barrier = new CyclicBarrier(3); ExecutorService executor = Executors.newFixedThreadPool(3); executor.submit(new Thread(new Runner(barrier, "1号选手"))); executor.submit(new Thread(new Runner(barrier, "2号选手"))); executor.submit(new Thread(new Runner(barrier, "3号选手"))); executor.shutdown(); } } class Runner implements Runnable { private CyclicBarrier barrier; private String name; public Runner(CyclicBarrier barrier, String name) { super(); this.barrier = barrier; this.name = name; } @Override public void run() { try { Thread.sleep(1000 * (new Random()).nextInt(8)); System.out.println(name + " 准备好了..."); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + " 起跑!"); } }

 

 

Exchanger 用于实现两方在栅栏位置上交换数据

Exchanger 对象具有 exchange 方法,当一个线程调用 exchange 方法后,他将处于 waiting 状态,直到另一个线程调用 exchange 方法

通常 Exchanger 用来实现两个线程间满缓冲区与空缓冲区的交换,维护空缓冲区的线程调用 exchange 方法等待另一个线程缓冲区满,另一个线程缓冲区满后,则两个线程开始交换数据

下面的代码模拟了数据的交换示例:

public class ThreadLocalTest { public static void main(String[] args) { Exchanger<List<Integer>> exchanger = new Exchanger<>(); new Consumer(exchanger).start(); new Producer(exchanger).start(); } } class Producer extends Thread { List<Integer> list = new ArrayList<>(); Exchanger<List<Integer>> exchanger = null; public Producer(Exchanger<List<Integer>> exchanger) { super(); this.exchanger = exchanger; } @Override public void run() { Random rand = new Random(); for(int i=0; i<10; i++) { list.clear(); list.add(rand.nextInt(10000)); list.add(rand.nextInt(10000)); list.add(rand.nextInt(10000)); list.add(rand.nextInt(10000)); list.add(rand.nextInt(10000)); try { list = exchanger.exchange(list); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer extends Thread { List<Integer> list = new ArrayList<>(); Exchanger<List<Integer>> exchanger = null; public Consumer(Exchanger<List<Integer>> exchanger) { super(); this.exchanger = exchanger; } @Override public void run() { for(int i=0; i<10; i++) { try { list = exchanger.exchange(list); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print(list.get(0)+", "); System.out.print(list.get(1)+", "); System.out.print(list.get(2)+", "); System.out.print(list.get(3)+", "); System.out.println(list.get(4)+", "); } } }

 

 






技术帖      龙潭书斋      线程      thread      java      信号量      semaphore      java并发编程实战      同步      barrier      闭锁      栅栏     


京ICP备15018585号