Java 并发工具类(JUC)
Java 并发工具包(java.util.concurrent,JUC)提供了丰富的并发工具类。理解这些工具类的使用是进行高效并发编程的关键。本章将详细介绍 Java 中的并发工具类。
CountDownLatch / CyclicBarrier / Semaphore / ConcurrentHashMap
CountDownLatch(倒计时门闩)
**CountDownLatch**用于等待一个或多个线程完成操作。
特点:
- 计数器只能减,不能增
- 计数器为 0 时,等待的线程被唤醒
- 一次性使用
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
// 创建多个线程
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadId + " 开始执行");
Thread.sleep(2000);
System.out.println("线程 " + threadId + " 执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 计数减 1
}
}).start();
}
latch.await(); // 等待所有线程完成
System.out.println("所有线程执行完成");
}
}
CyclicBarrier(循环屏障)
**CyclicBarrier**用于多个线程相互等待,到达屏障点后继续执行。
特点:
- 可以重复使用
- 所有线程到达屏障点后继续执行
- 可以指定到达屏障点后的动作
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程到达屏障点");
});
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadId + " 到达屏障点");
barrier.await(); // 等待其他线程
System.out.println("线程 " + threadId + " 继续执行");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore(信号量)
**Semaphore**用于控制同时访问资源的线程数量。
特点:
- 可以控制并发数
- 可以获取和释放许可
- 适合资源池场景
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3); // 允许 3 个线程同时访问
for (int i = 0; i < 10; i++) {
final int threadId = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("线程 " + threadId + " 获取许可");
Thread.sleep(2000);
System.out.println("线程 " + threadId + " 释放许可");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
}
ConcurrentHashMap(并发哈希表)
**ConcurrentHashMap**是线程安全的 HashMap。
特点:
- 线程安全
- 分段锁机制,性能好
- 支持并发读写
import java.util.concurrent.ConcurrentHashMap;
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 线程安全的操作
map.put("key1", 1);
map.put("key2", 2);
Integer value = map.get("key1");
// 原子操作
map.putIfAbsent("key3", 3);
map.replace("key1", 1, 10);
map.compute("key1", (k, v) -> v + 1);
原子类(AtomicInteger 等)
原子类概述
原子类提供了无锁的线程安全操作,基于 CAS(Compare-And-Swap)实现。
常用原子类:
AtomicInteger:原子整数AtomicLong:原子长整数AtomicBoolean:原子布尔值AtomicReference:原子引用
AtomicInteger
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerExample {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 原子操作
}
public void decrement() {
count.decrementAndGet();
}
public int get() {
return count.get();
}
// 其他常用方法
public void add(int delta) {
count.addAndGet(delta);
}
public boolean compareAndSet(int expect, int update) {
return count.compareAndSet(expect, update);
}
}
原子类的优势
无锁编程:
- 性能更好(无锁)
- 避免死锁
- 适合高并发场景
// 使用 synchronized(有锁)
public class SynchronizedCounter {
private int count = 0;
public synchronized void increment() {
count++;
}
}
// 使用 AtomicInteger(无锁)
public class AtomicCounter {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
}
示例:并发任务协调
示例 1:CountDownLatch 协调
import java.util.concurrent.CountDownLatch;
public class CountDownLatchCoordinator {
public static void main(String[] args) throws InterruptedException {
int taskCount = 5;
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch finishLatch = new CountDownLatch(taskCount);
// 创建多个任务
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
startLatch.await(); // 等待开始信号
System.out.println("任务 " + taskId + " 执行");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
finishLatch.countDown();
}
}).start();
}
System.out.println("准备开始...");
Thread.sleep(1000);
startLatch.countDown(); // 发出开始信号
finishLatch.await(); // 等待所有任务完成
System.out.println("所有任务完成");
}
}
示例 2:CyclicBarrier 协调
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierCoordinator {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程完成第一阶段");
});
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 第一阶段
System.out.println("线程 " + threadId + " 第一阶段");
barrier.await();
// 第二阶段
System.out.println("线程 " + threadId + " 第二阶段");
barrier.await();
// 第三阶段
System.out.println("线程 " + threadId + " 第三阶段");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
示例 3:Semaphore 控制并发
import java.util.concurrent.Semaphore;
public class SemaphoreCoordinator {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3); // 最多 3 个并发
for (int i = 0; i < 10; i++) {
final int taskId = i;
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("任务 " + taskId + " 开始执行");
Thread.sleep(2000);
System.out.println("任务 " + taskId + " 执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
}
示例 4:ConcurrentHashMap 使用
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 多线程写入
for (int i = 0; i < 10; i++) {
final int value = i;
new Thread(() -> {
map.put("key" + value, value);
}).start();
}
// 原子操作
map.compute("key1", (k, v) -> v == null ? 1 : v + 1);
map.merge("key2", 1, Integer::sum);
// 线程安全的遍历
map.forEach((k, v) -> System.out.println(k + " = " + v));
}
}
示例 5:原子类使用
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicExample {
private static final AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
// 多个线程并发增加
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.incrementAndGet();
}
});
threads[i].start();
}
// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终计数:" + counter.get()); // 10000
}
}
示例 6:综合使用
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ConcurrencyUtilsExample {
private static final AtomicInteger taskCounter = new AtomicInteger(0);
private static final ConcurrentHashMap<String, Integer> results = new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
Semaphore semaphore = new Semaphore(3); // 最多 3 个并发
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
executor.submit(() -> {
try {
semaphore.acquire();
int taskNum = taskCounter.incrementAndGet();
System.out.println("任务 " + taskId + " 执行(任务编号:" + taskNum + ")");
Thread.sleep(2000);
results.put("task" + taskId, taskId * 10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
latch.countDown();
}
});
}
latch.await();
System.out.println("所有任务完成,结果:" + results);
executor.shutdown();
}
}
并发工具类的选择
CountDownLatch
- 等待一个或多个线程完成
- 一次性使用
- 主线程等待子线程
CyclicBarrier
- 多个线程相互等待
- 可以重复使用
- 分阶段执行
Semaphore
- 控制并发数
- 资源池场景
- 限流
ConcurrentHashMap
- 线程安全的 Map
- 高并发读写
- 替代 Hashtable
原子类
- 无锁编程
- 高并发计数
- 简单操作
小结
Java 并发工具类要点:
- CountDownLatch:等待线程完成,一次性使用
- CyclicBarrier:线程相互等待,可重复使用
- Semaphore:控制并发数,资源池
- ConcurrentHashMap:线程安全的 Map
- 原子类:无锁线程安全操作
关键要点:
- CountDownLatch 用于等待
- CyclicBarrier 用于同步
- Semaphore 用于限流
- ConcurrentHashMap 替代 Hashtable
- 原子类提供无锁操作
理解了并发工具类,你就能高效地协调并发任务。在下一章,我们将学习死锁与并发问题分析。