Java Thread Communication

1. The wait/notify Mechanism

1.1 Basic Usage

java
// A one-slot mailbox: put() blocks while the slot is full, take() blocks while it is empty.
public class Message {
    private String content;
    private boolean empty = true;
    
    public synchronized String take() throws InterruptedException {
        while (empty) {
            wait();  // wait for a message to arrive
        }
        empty = true;
        notifyAll();  // wake the producer
        return content;
    }
    
    public synchronized void put(String content) throws InterruptedException {
        while (!empty) {
            wait();  // wait for the previous message to be consumed
        }
        empty = false;
        this.content = content;
        notifyAll();  // wake the consumer
    }
}

1.2 Caveats

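  • Call wait(), notify(), and notifyAll() only while holding the object's monitor, that is, inside a synchronized method or block on that object; otherwise they throw IllegalMonitorStateException.
  • wait() releases the monitor while the thread waits and reacquires it before returning; notify() does not release it, so a woken thread can proceed only after the notifier leaves its synchronized block.
  • Prefer notifyAll() to notify(): when different kinds of waiters share one monitor, notify() may wake a thread whose condition is still false and leave the right one asleep.
  • Check the condition in a while loop rather than an if, because a thread can wake spuriously or find the condition false again by the time it reacquires the monitor.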
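
The first rule above can be checked directly. The following is a minimal sketch; the class and method names (`MonitorRuleDemo`, `notifyWithoutLockFails`, `notifyWithLockSucceeds`) are made up for illustration. It calls `notify()` once without holding the monitor and `notifyAll()` once inside a `synchronized` block.

```java
public class MonitorRuleDemo {

    // Hypothetical helper: returns true if notify() is rejected
    // when the calling thread does not own the object's monitor.
    static boolean notifyWithoutLockFails() {
        Object lock = new Object();
        try {
            lock.notify();  // not inside synchronized (lock)
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Hypothetical helper: the same kind of call is legal once the monitor is held.
    static boolean notifyWithLockSucceeds() {
        Object lock = new Object();
        synchronized (lock) {
            lock.notifyAll();  // no waiters, so this simply returns
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("without lock rejected: " + notifyWithoutLockFails());
        System.out.println("with lock accepted: " + notifyWithLockSucceeds());
    }
}
```

The same exception is thrown by `wait()` in that situation, which is why the methods of `Message` are declared `synchronized`.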

2. Producer-Consumer

2.1 Using wait/notify

java
import java.util.LinkedList;
import java.util.Queue;

public class Buffer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;
    
    public synchronized void produce(int value) throws InterruptedException {
        while (queue.size() == capacity) {
            wait();  // buffer full: wait for a consumer to make room
        }
        queue.offer(value);
        System.out.println("Produced: " + value);
        notifyAll();  // wake waiting consumers
    }
    
    public synchronized int consume() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();  // buffer empty: wait for a producer to add an item
        }
        int value = queue.poll();
        System.out.println("Consumed: " + value);
        notifyAll();  // wake waiting producers
        return value;
    }
}

2.2 Using BlockingQueue

java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

// Producer
Thread producer = new Thread(() -> {
    try {
        for (int i = 0; i < 100; i++) {
            queue.put(i);  // blocks while the queue is full
            System.out.println("Produced: " + i);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

// Consumer: runs until interrupted
Thread consumer = new Thread(() -> {
    try {
        while (true) {
            int value = queue.take();  // blocks while the queue is empty
            System.out.println("Consumed: " + value);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

producer.start();
consumer.start();

3. Condition

3.1 Basic Usage

java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Buffer {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;
    
    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await();  // wait until the buffer is not full
            }
            queue.offer(value);
            notEmpty.signal();  // signal that the buffer is not empty
        } finally {
            lock.unlock();
        }
    }
    
    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();  // wait until the buffer is not empty
            }
            int value = queue.poll();
            notFull.signal();  // signal that the buffer is not full
            return value;
        } finally {
            lock.unlock();
        }
    }
}

3.2 Multiple Condition Variables

java
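Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
boolean readyA = false;  // guarded by lock; in real code, a field shared by both threads

// Thread A: wait until readyA is set
lock.lock();
try {
    while (!readyA) {  // the loop guards against spurious wakeups and early signals
        conditionA.await();
    }
} finally {
    lock.unlock();
}

// Thread B: set the flag, then wake A
lock.lock();
try {
    readyA = true;
    conditionA.signal();  // wakes a thread waiting on conditionA only, not conditionB
} finally {
    lock.unlock();
}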

4. BlockingQueue

4.1 Common Implementations

java
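// Effectively unbounded linked queue (capacity defaults to Integer.MAX_VALUE)
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

// Bounded linked queue
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);

// Bounded array-backed queue
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(100);

// Unbounded priority queue (elements ordered by natural ordering or a Comparator)
BlockingQueue<String> priorityQueue = new PriorityBlockingQueue<>();

// Delay queue (an element can be taken only after its delay has expired)
BlockingQueue<Delayed> delayedQueue = new DelayQueue<>();

// Synchronous queue (no capacity: each put waits for a matching take)
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();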

4.2 Common Methods

Operation   Throws exception   Returns special value   Blocks    Times out
Insert      add(e)             offer(e)                put(e)    offer(e, timeout, unit)
Remove      remove()           poll()                  take()    poll(timeout, unit)
Examine     element()          peek()                  -         -
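
The difference between the columns is easiest to see on a queue that is already full or empty. The sketch below is illustrative only: the class `QueueMethodsDemo` and its two helper methods are hypothetical names, and the 50 ms timeout is an arbitrary choice. It uses an `ArrayBlockingQueue` of capacity 1 and leaves out the blocking `put()`/`take()` column, since those calls would simply wait.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueMethodsDemo {

    // Tries the three non-blocking insert styles on a full queue.
    static String insertIntoFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("a");  // the queue is now full
        String addResult;
        try {
            queue.add("b");
            addResult = "ok";
        } catch (IllegalStateException e) {
            addResult = "IllegalStateException";
        }
        boolean offerResult = queue.offer("b");  // returns false immediately
        boolean timedResult;
        try {
            timedResult = queue.offer("b", 50, TimeUnit.MILLISECONDS);  // gives up after 50 ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timedResult = false;
        }
        return "add=" + addResult + " offer=" + offerResult + " timedOffer=" + timedResult;
    }

    // Tries the non-blocking remove and examine styles on an empty queue.
    static String removeFromEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String removeResult;
        try {
            removeResult = queue.remove();
        } catch (NoSuchElementException e) {
            removeResult = "NoSuchElementException";
        }
        String timedResult;
        try {
            timedResult = queue.poll(50, TimeUnit.MILLISECONDS);  // null after 50 ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timedResult = null;
        }
        return "remove=" + removeResult + " poll=" + queue.poll()
                + " timedPoll=" + timedResult + " peek=" + queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFullQueue());
        System.out.println(removeFromEmptyQueue());
    }
}
```

Because `null` is the "special value" for the remove and examine methods, `BlockingQueue` implementations do not accept `null` elements.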

5. CountDownLatch

5.1 Basic Usage

java
import java.util.concurrent.CountDownLatch;

CountDownLatch latch = new CountDownLatch(3);

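// Worker threads
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        try {
            Thread.sleep(1000);
            System.out.println("Task finished");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
        } finally {
            latch.countDown();  // decrement the count, even if the task failed
        }
    }).start();
}

// Block until the count reaches zero
latch.await();
System.out.println("All tasks finished");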

6. CyclicBarrier

6.1 Basic Usage

java
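import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// The barrier action runs once per trip, after the last thread arrives
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("All threads reached the barrier");
});

for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        try {
            System.out.println("Thread arrived");
            barrier.await();  // wait for the other threads
            System.out.println("Continuing");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            // another waiting thread was interrupted or timed out
            e.printStackTrace();
        }
    }).start();
}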

7. Semaphore

7.1 Basic Usage

java
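import java.util.concurrent.Semaphore;

Semaphore semaphore = new Semaphore(3);  // 3 permits

for (int i = 0; i < 10; i++) {
    new Thread(() -> {
        try {
            semaphore.acquire();  // blocks until a permit is available
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;  // no permit was acquired, so none may be released
        }
        try {
            System.out.println("Resource acquired");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            semaphore.release();  // return the permit acquired above
        }
    }).start();
}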

8. Summary

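Tool             Use
wait/notify      Basic thread communication on an object monitor
Condition        Flexible condition waits, with several conditions per lock
BlockingQueue    Producer-consumer hand-off
CountDownLatch   One-shot wait
CyclicBarrier    Reusable (cyclic) wait
Semaphore        Limiting concurrent access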
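
The "one-shot" versus "reusable" distinction in the table can be made concrete with a small sketch. The names below (`LatchVsBarrierDemo`, `latchCountAfterExtraCountDown`, `barrierTrips`) are hypothetical and chosen only for this illustration. A latch that has reached zero stays at zero, while the same barrier can be tripped again in every round.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchVsBarrierDemo {

    // A latch cannot be reset: counting down past zero has no effect.
    static long latchCountAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();  // count goes from 1 to 0
        latch.countDown();  // ignored, the count stays at 0
        return latch.getCount();
    }

    // Lets `parties` threads meet at one barrier `rounds` times and
    // returns how often the barrier action ran (once per trip).
    static int barrierTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await();  // the same barrier is reused each round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party failed; stop participating
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("latch count: " + latchCountAfterExtraCountDown());
        System.out.println("barrier trips: " + barrierTrips(2, 3));
    }
}
```

With two parties and three rounds the barrier action runs three times, whereas a new `CountDownLatch` would be needed for each additional wait.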

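Key points for thread communication:

  • Check wait conditions in a while loop
  • BlockingQueue simplifies producer-consumer code
  • CountDownLatch is for one-shot waits
  • Semaphore is for limiting concurrent access

Last updated: 2026-03-26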