Java线程通信 #
一、wait/notify机制 #
1.1 基本用法 #
java
public class Message {
private String content;
private boolean empty = true;
public synchronized String take() throws InterruptedException {
while (empty) {
wait(); // 等待消息
}
empty = true;
notifyAll(); // 通知生产者
return content;
}
public synchronized void put(String content) throws InterruptedException {
while (!empty) {
wait(); // 等待消费
}
empty = false;
this.content = content;
notifyAll(); // 通知消费者
}
}
1.2 注意事项 #
- 必须在synchronized块内调用
- wait()释放锁,notify()不释放锁
- 建议使用notifyAll()而非notify()
- 使用while而非if检查条件
二、生产者消费者 #
2.1 使用wait/notify #
java
public class Buffer {
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity = 10;
public synchronized void produce(int value) throws InterruptedException {
while (queue.size() == capacity) {
wait();
}
queue.offer(value);
System.out.println("生产: " + value);
notifyAll();
}
public synchronized int consume() throws InterruptedException {
while (queue.isEmpty()) {
wait();
}
int value = queue.poll();
System.out.println("消费: " + value);
notifyAll();
return value;
}
}
2.2 使用BlockingQueue #
java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
// 生产者
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 100; i++) {
queue.put(i); // 队列满时阻塞
System.out.println("生产: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者
Thread consumer = new Thread(() -> {
try {
while (true) {
int value = queue.take(); // 队列空时阻塞
System.out.println("消费: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
三、Condition #
3.1 基本使用 #
java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Buffer {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity = 10;
public void produce(int value) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 等待不满
}
queue.offer(value);
notEmpty.signal(); // 通知不空
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待不空
}
int value = queue.poll();
notFull.signal(); // 通知不满
return value;
} finally {
lock.unlock();
}
}
}
3.2 多条件变量 #
java
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
// 线程A等待
lock.lock();
try {
conditionA.await();
} finally {
lock.unlock();
}
// 线程B唤醒A
lock.lock();
try {
conditionA.signal();
} finally {
lock.unlock();
}
四、BlockingQueue #
4.1 常用实现 #
java
// 无界队列
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
// 有界队列
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);
// 数组队列
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(100);
// 优先级队列
BlockingQueue<String> priorityQueue = new PriorityBlockingQueue<>();
// 延迟队列
BlockingQueue<Delayed> delayedQueue = new DelayQueue<>();
// 同步队列
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
4.2 常用方法 #
| 方法 | 抛异常 | 返回值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add() | offer() | put() | offer(e, timeout) |
| 移除 | remove() | poll() | take() | poll(timeout) |
| 检查 | element() | peek() | - | - |
五、CountDownLatch #
5.1 基本使用 #
java
import java.util.concurrent.CountDownLatch;
CountDownLatch latch = new CountDownLatch(3);
// 工作线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println("任务完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 计数减1
}
}).start();
}
// 等待所有任务完成
latch.await();
System.out.println("所有任务完成");
六、CyclicBarrier #
6.1 基本使用 #
java
import java.util.concurrent.CyclicBarrier;
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程到达屏障");
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
System.out.println("线程到达");
barrier.await(); // 等待其他线程
System.out.println("继续执行");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
七、Semaphore #
7.1 基本使用 #
java
import java.util.concurrent.Semaphore;
Semaphore semaphore = new Semaphore(3); // 3个许可
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("获取资源");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
八、总结 #
| 工具 | 用途 |
|---|---|
| wait/notify | 基本线程通信 |
| Condition | 灵活的条件等待 |
| BlockingQueue | 生产者消费者 |
| CountDownLatch | 一次性等待 |
| CyclicBarrier | 循环等待 |
| Semaphore | 限流 |
线程通信要点:
- 使用while检查等待条件
- BlockingQueue简化生产者消费者
- CountDownLatch用于一次性等待
- Semaphore用于限流
最后更新:2026-03-26