Java 线程间通信(wait / notify)
线程间通信是多线程编程中的重要概念。理解 wait/notify 机制是实现线程协调的基础。本章将详细介绍 Java 中的线程间通信。
wait/notify 机制
wait() 方法
**wait()**使当前线程进入等待状态,直到其他线程调用 notify() 或 notifyAll()。
public final void wait() throws InterruptedException;
public final void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException;
重要规则:
- 必须在
synchronized代码块中调用 - 调用
wait()会释放锁 - 被唤醒后需要重新获取锁
synchronized (lock) {
while (condition) {
lock.wait(); // 释放锁,进入等待
}
// 被唤醒后,重新获取锁,继续执行
}
notify() 方法
**notify()**唤醒一个等待该对象锁的线程。
public final void notify();
特点:
- 必须在
synchronized代码块中调用 - 随机唤醒一个等待的线程
- 不释放锁,需要执行完同步块才释放
synchronized (lock) {
// 修改条件
condition = true;
lock.notify(); // 唤醒一个等待的线程
}
notifyAll() 方法
**notifyAll()**唤醒所有等待该对象锁的线程。
public final void notifyAll();
特点:
- 必须在
synchronized代码块中调用 - 唤醒所有等待的线程
- 线程竞争获取锁
synchronized (lock) {
condition = true;
lock.notifyAll(); // 唤醒所有等待的线程
}
wait/notify 的使用模式
// 等待线程
synchronized (lock) {
while (!condition) { // 使用 while 而不是 if
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
// 条件满足,执行操作
}
// 通知线程
synchronized (lock) {
condition = true;
lock.notify(); // 或 notifyAll()
}
为什么使用 while 而不是 if:
- 防止虚假唤醒(spurious wakeup)
- 确保条件真正满足
生产者-消费者模型
基本实现
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumer {
private final Queue<Integer> queue = new LinkedList<>();
private final int CAPACITY = 10;
private final Object lock = new Object();
// 生产者
public void produce(int item) throws InterruptedException {
synchronized (lock) {
while (queue.size() == CAPACITY) { // 队列满,等待
System.out.println("队列满,生产者等待");
lock.wait();
}
queue.offer(item);
System.out.println("生产:" + item);
lock.notifyAll(); // 通知消费者
}
}
// 消费者
public int consume() throws InterruptedException {
synchronized (lock) {
while (queue.isEmpty()) { // 队列空,等待
System.out.println("队列空,消费者等待");
lock.wait();
}
int item = queue.poll();
System.out.println("消费:" + item);
lock.notifyAll(); // 通知生产者
return item;
}
}
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
pc.produce(i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
pc.consume();
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
多生产者多消费者
public class MultiProducerConsumer {
private final Queue<Integer> queue = new LinkedList<>();
private final int CAPACITY = 10;
private final Object lock = new Object();
public void produce(int item) throws InterruptedException {
synchronized (lock) {
while (queue.size() == CAPACITY) {
lock.wait();
}
queue.offer(item);
System.out.println(Thread.currentThread().getName() + " 生产:" + item);
lock.notifyAll();
}
}
public int consume() throws InterruptedException {
synchronized (lock) {
while (queue.isEmpty()) {
lock.wait();
}
int item = queue.poll();
System.out.println(Thread.currentThread().getName() + " 消费:" + item);
lock.notifyAll();
return item;
}
}
public static void main(String[] args) {
MultiProducerConsumer pc = new MultiProducerConsumer();
// 多个生产者
for (int i = 0; i < 3; i++) {
final int producerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 10; j++) {
pc.produce(producerId * 10 + j);
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "生产者" + producerId).start();
}
// 多个消费者
for (int i = 0; i < 2; i++) {
final int consumerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 15; j++) {
pc.consume();
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "消费者" + consumerId).start();
}
}
}
示例:多线程消息队列
示例 1:简单消息队列
import java.util.LinkedList;
import java.util.Queue;
public class MessageQueue {
private final Queue<String> messages = new LinkedList<>();
private final int MAX_SIZE = 10;
private final Object lock = new Object();
private volatile boolean running = true;
// 发送消息
public void send(String message) throws InterruptedException {
synchronized (lock) {
while (messages.size() >= MAX_SIZE && running) {
System.out.println("队列满,等待发送");
lock.wait();
}
if (running) {
messages.offer(message);
System.out.println("发送消息:" + message);
lock.notifyAll();
}
}
}
// 接收消息
public String receive() throws InterruptedException {
synchronized (lock) {
while (messages.isEmpty() && running) {
System.out.println("队列空,等待接收");
lock.wait();
}
if (!running && messages.isEmpty()) {
return null;
}
String message = messages.poll();
System.out.println("接收消息:" + message);
lock.notifyAll();
return message;
}
}
// 停止
public void stop() {
synchronized (lock) {
running = false;
lock.notifyAll();
}
}
public static void main(String[] args) throws InterruptedException {
MessageQueue queue = new MessageQueue();
// 发送者
Thread sender = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
queue.send("消息" + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 接收者
Thread receiver = new Thread(() -> {
try {
String message;
while ((message = queue.receive()) != null) {
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
sender.start();
receiver.start();
sender.join();
Thread.sleep(1000);
queue.stop();
receiver.join();
}
}