跳到主要内容

观察者模式

定义

观察者模式的应用场景非常广泛,小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子。

现在常说的基于事件驱动的架构,其实也是观察者模式的一种最佳实践。当我们观察某一个对象时,对象传递出的每一个行为都被看成是一个事件,观察者通过处理每一个事件来完成自身的操作处理。

生活中也有许多观察者模式的应用,比如,红绿灯与汽车的关系,‘红灯停、绿灯行’,在这个过程中,红绿灯是被观察者,汽车是观察者,等等。

观察者模式(observer pattern),在对象之间定义一个一对多的依赖,这样当一个对象改变状态时,它的所有依赖项都会自动得到通知和更新

观察者模式的别名有 发布-订阅(Publish/Subscribe)模式、模型-视图(Model-View)模式、源-监听(Source-Listener)模式等。

提示

观察者模式是它是用于建立一种对象与对象之间的依赖关系,一个对象发生改变时将自动通知其它对象,其它对象将相应的做出反应。

发生改变的对象称为观察目标(被观察者依赖),而被通知的对象称为观察者,一个观察目标可以应对多个观察者,而这些观察者之间可以没有任何相互联系,可以根据需要增加和删除观察者,使得系统易于扩展。

根据业务场景的不用,观察者模式会对应不同的代码实现方式:有同步阻塞的实现方式,也有异步阻塞的实现方式;有进程内的实现方式,也有跨进程的实现方式。

模式原理

类图

下面是观察者模式的经典实现

定义抽象被观察者

/**
* 抽象观察对象
*/
public interface Subject {

void attach(Observer observer);

void detach(Observer observer);

void notifyObservers(String message);
}

定义抽象观察者

/**
* 抽象的观察者
*
* @author 21129
*/
public interface Observer {

// update 方法:不同的观察者更新行为定义为一个相同的接口,不同的观察者对该接口可以有不同的实现。
public void update(String message);

}

定义具体被观察者

package com.e6yun.project.behavior.observer.example01;

import java.util.ArrayList;
import java.util.List;

/**
* 具体主题:被观察者
*/
public class ConcreteSubject implements Subject {

// 定义集合,存储所有的观察者对象
private List<Observer> observers = new ArrayList<>();
// 注册方法,向观察者集合增加一个观察者
@Override
public void attach(Observer observer) {
observers.add(observer);

}
// 注销方法:用于从观察者集合中移除一个观察者
@Override
public void detach(Observer observer) {
observers.remove(observer);
}

@Override
public void notifyObservers(String message) {
for (Observer observer : observers) {
observer.update(message);
}
}
}

定义多个观察者

package com.e6yun.project.behavior.observer.example01;

public class ConcreteObserver1 implements Observer{

@Override
public void update(String message) {
System.out.println("ConcreteObserver1 得到通知,更新状态: " + message);
}
}



package com.e6yun.project.behavior.observer.example01;

public class ConcreteObserver2 implements Observer{

@Override
public void update(String message) {
System.out.println("ConcreteObserver2 得到通知,更新状态: " + message);
}
}

测试

public class Client {

public static void main(String[] args) {
// 创建被观察者
Subject subject = new ConcreteSubject();

//注册观察者,注册多个
subject.attach(new ConcreteObserver1());
subject.attach(new ConcreteObserver2());

// 具体的主题内部发生改变,给所有注册的观察者发送通知
subject.notifyObservers("状态已更新");
}
}

买房摇号功能示例

使用观察者模式,实现一个买房摇号的功能,摇号结束,需要通过短信告知用户摇号结果,还需要向 MQ 中保存用户本次摇号的信息。

未使用设计模式

/**
* 模拟买房摇号服务
*/
public class DrawHouseService {

// 摇号抽签
public String lots(String uid) {
if (uid.hashCode() % 2 == 0) {
return "恭喜ID为:" + uid + "的用户,在本次摇号中中签!";
} else {
return "很遗憾,ID为:" + uid + "的用户,您本次未中签!";
}
}
}

/**
* 开奖服务接口
*/
public interface LotteryService {

// 开奖之后的业务操作
LotteryResult lottery(String uid) ;

}
import java.util.Date;

public class LotteryServiceImpl implements LotteryService {

// 注入摇号服务
private DrawHouseService drawHouseService = new DrawHouseService();

@Override
public LotteryResult lottery(String uid) {
//1. 摇号操作
String result = drawHouseService.lots(uid);
//2. 发送短息
System.out.println("发送短信通知用户,ID为:" + uid + "你的摇号结果如下:" + result);

//3.记录用户摇号结果到 MQ
System.out.println("记录用户摇号结果到 MQ,ID 为:" + uid + ",摇号结果:" + result);

return new LotteryResult(uid,result,new Date());
}
}

虽然现在 lottery 做了三件事情,摇号、发送短信和发放体验金,违反了单一职责原则,但是,如果没有扩展和修改的需求,现在的代码是可以接受的。 如果非要使用观察者模式,就需要引入更多的类和更加复杂的代码结构,反倒是一种过度设计。

相反,如果需求频繁变动,比如,摇号结束后,不再发送短信,而是改为发送邮件,站内信。这种情况下,就需要频繁修改 lottery 方法内的代码,违反开闭原则。 执行的代码越多, 就越影响代码的可读性和可维护性。

这个时候,观察者模式就派上用场了。

使用设计模式进行优化

上面的摇号业务中,摇号、发短信、发 MQ 消息是一个顺序调用的过程,但是除了摇号这个核心功能以外,发短信与记录信息到 MQ 的操作都不是主链路的功能,需要单独抽取出来,这样才能保证在后面的开发过程中保证代码的可扩展性和可维护性。

import java.util.Date;

/**
* 摇号结果实体类
*/
public class LotteryResult {
private String uId;
private String msg;
private Date date;

public LotteryResult(String uId, String msg, Date date) {
this.uId = uId;
this.msg = msg;
this.date = date;
}

public String getuId() { return uId; }
public String getMsg() { return msg; }
public Date getDate() { return date; }
}

/**
* 事件监听接口
*/
public interface EventListener {
void doEvent(LotteryResult result);
}

/**
* 短信通知事件监听器
*/
public class MessageEventListener implements EventListener {
@Override
public void doEvent(LotteryResult result) {
try {
System.out.println("发送短信通知,用户ID:" + result.getuId() +
",您的摇号结果为:" + result.getMsg());
// 实际业务中这里会调用短信服务
} catch (Exception e) {
System.err.println("短信发送失败: " + e.getMessage());
}
}
}

/**
* MQ 消息发送事件监听器
*/
public class MQEventListener implements EventListener {
@Override
public void doEvent(LotteryResult result) {
try {
System.out.println("记录用户的摇号结果(MQ),用户ID:" + result.getuId() +
",摇号结果:" + result.getMsg());
// 实际业务中这里会发送MQ消息
} catch (Exception e) {
System.err.println("MQ消息发送失败: " + e.getMessage());
}
}
}
import java.util.*;
import java.util.concurrent.*;

/**
* 事件处理类,订阅、取消订阅、通知
*/
public class EventManager {
public enum EventType {
MQ, MESSAGE
}

// 监听器集合,使用线程安全的Map
private final Map<EventType, List<EventListener>> listeners = new ConcurrentHashMap<>();

// 异步执行器,避免阻塞主线程
private final ExecutorService executor = Executors.newFixedThreadPool(4);

public EventManager(EventType... operations) {
for (EventType operation : operations) {
this.listeners.put(operation, new CopyOnWriteArrayList<>());
}
}

public void subscribe(EventType eventType, EventListener eventListener) {
if (eventType == null || eventListener == null) {
throw new IllegalArgumentException("EventType and EventListener cannot be null");
}
listeners.get(eventType).add(eventListener);
}

public void unsubscribe(EventType eventType, EventListener eventListener) {
if (eventType != null && eventListener != null) {
listeners.get(eventType).remove(eventListener);
}
}

// 同步通知
public void notify(EventType eventType, LotteryResult result) {
List<EventListener> eventListeners = listeners.get(eventType);
if (eventListeners != null) {
for (EventListener eventListener : eventListeners) {
try {
eventListener.doEvent(result);
} catch (Exception e) {
System.err.println("事件处理异常: " + e.getMessage());
}
}
}
}

// 异步通知,避免阻塞主线程
public void notifyAsync(EventType eventType, LotteryResult result) {
List<EventListener> eventListeners = listeners.get(eventType);
if (eventListeners != null) {
for (EventListener eventListener : eventListeners) {
executor.submit(() -> {
try {
eventListener.doEvent(result);
} catch (Exception e) {
System.err.println("异步事件处理异常: " + e.getMessage());
}
});
}
}
}

// 获取所有已注册的事件类型
public Set<EventType> getRegisteredEventTypes() {
return new HashSet<>(listeners.keySet());
}

// 检查某个事件类型是否有监听器
public boolean hasListeners(EventType eventType) {
List<EventListener> eventListeners = listeners.get(eventType);
return eventListeners != null && !eventListeners.isEmpty();
}

// 关闭线程池
public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
/**
* 开奖服务抽象类
*/
public abstract class LotteryService {
protected final EventManager eventManager;

public LotteryService() {
// 设置事件类型
eventManager = new EventManager(EventType.MQ, EventType.MESSAGE);

// 订阅事件监听器
eventManager.subscribe(EventType.MQ, new MQEventListener());
eventManager.subscribe(EventType.MESSAGE, new MessageEventListener());
}

public abstract LotteryResult lottery(String uid);

// 同步处理摇号结果通知 - 自动通知所有观察者
public LotteryResult lotteryAndMsg(String uid) {
LotteryResult lottery = lottery(uid);
// 自动通知所有注册的事件类型,无需手动添加
notifyAllObservers(lottery);
return lottery;
}

// 异步处理摇号结果通知,提高性能 - 自动通知所有观察者
public LotteryResult lotteryAndMsgAsync(String uid) {
LotteryResult lottery = lottery(uid);
// 自动异步通知所有注册的事件类型,无需手动添加
notifyAllObserversAsync(lottery);
return lottery;
}

// 自动通知所有注册的观察者
private void notifyAllObservers(LotteryResult result) {
// 遍历所有已注册的事件类型,而不是硬编码枚举值
for (EventType eventType : eventManager.getRegisteredEventTypes()) {
if (eventManager.hasListeners(eventType)) {
eventManager.notify(eventType, result);
}
}
}

// 自动异步通知所有注册的观察者
private void notifyAllObserversAsync(LotteryResult result) {
// 遍历所有已注册的事件类型,而不是硬编码枚举值
for (EventType eventType : eventManager.getRegisteredEventTypes()) {
if (eventManager.hasListeners(eventType)) {
eventManager.notifyAsync(eventType, result);
}
}
}

// 动态添加新的观察者
public void addObserver(EventType eventType, EventListener listener) {
eventManager.subscribe(eventType, listener);
}

// 动态移除观察者
public void removeObserver(EventType eventType, EventListener listener) {
eventManager.unsubscribe(eventType, listener);
}

// 关闭资源
public void shutdown() {
eventManager.shutdown();
}
}

/**
* 摇号服务实现类
*/
public class LotteryServiceImpl extends LotteryService {
private final DrawHouseService drawHouseService;

public LotteryServiceImpl(DrawHouseService drawHouseService) {
super();
this.drawHouseService = drawHouseService;
}

@Override
public LotteryResult lottery(String uid) {
if (uid == null || uid.trim().isEmpty()) {
throw new IllegalArgumentException("用户ID不能为空");
}

// 摇号操作
String result = drawHouseService.lots(uid);
return new LotteryResult(uid, result, new Date());
}
}

/**
* 客户端测试 - 展示动态添加观察者的能力
*/
public class LotteryClient {
public static void main(String[] args) {
DrawHouseService drawHouseService = new DrawHouseService();
LotteryService lotteryService = new LotteryServiceImpl(drawHouseService);

try {
// 动态添加新的观察者(比如邮件通知)
lotteryService.addObserver(EventType.MESSAGE, new EmailEventListener());

// 同步处理 - 会自动通知所有观察者(包括新添加的邮件观察者)
LotteryResult result1 = lotteryService.lotteryAndMsg("user123");
System.out.println("同步处理完成: " + result1.getMsg());

// 异步处理 - 也会自动通知所有观察者
LotteryResult result2 = lotteryService.lotteryAndMsgAsync("user456");
System.out.println("异步处理完成: " + result2.getMsg());

// 等待异步任务完成
Thread.sleep(1000);
} catch (Exception e) {
System.err.println("摇号处理异常: " + e.getMessage());
} finally {
// 关闭资源
lotteryService.shutdown();
}
}
}

/**
* 新增的邮件通知观察者示例
*/
class EmailEventListener implements EventListener {
@Override
public void doEvent(LotteryResult result) {
try {
System.out.println("发送邮件通知,用户ID:" + result.getuId() +
",您的摇号结果为:" + result.getMsg());
// 实际业务中这里会调用邮件服务
} catch (Exception e) {
System.err.println("邮件发送失败: " + e.getMessage());
}
}
}


1. **自动发现观察者**:`notifyAllObservers()` 方法会自动遍历所有已注册的事件类型,无需手动添加通知代码

2. **动态注册机制**:通过 `addObserver()` 和 `removeObserver()` 方法,可以在运行时动态添加/移除观察者

3. **开闭原则**:新增观察者时,只需要:
- 实现 `EventListener` 接口
- 调用 `addObserver()` 注册
- **无需修改** `lotteryAndMsg()` 或 `lotteryAndMsgAsync()` 方法

#### 📈 **使用示例**

```java
// 新增观察者时,只需要这样:
lotteryService.addObserver(EventType.MESSAGE, new EmailEventListener());
lotteryService.addObserver(EventType.MESSAGE, new PushNotificationListener());

// 核心业务方法无需修改,会自动通知所有观察者
LotteryResult result = lotteryService.lotteryAndMsg("user123");

优势

  • 符合开闭原则:对扩展开放,对修改封闭
  • 降低耦合度:被观察者不需要知道具体的观察者实现
  • 提高可维护性:新增观察者不影响核心业务逻辑
  • 支持运行时配置:可以根据配置动态启用/禁用观察者

这样实现后,观察者模式真正发挥了其解耦和扩展的优势!

基于不同应用场景的不同实现方式

在不同的应用场景和需求下, 观察者模式有不同的实现方式,有同步阻塞的方式、也有异步阻塞的方式;有进程内的方式,也有跨进程的方式。

上面的实现方式是一种同步阻塞的实现方式。观察者和被观察者代码在同一个线程内执行,被观察者一直被阻塞,直到所有的观察者代码都执行完之后才执行后续的代码。

如果摇号接口是一个调用比较频繁的接口,对性能比较敏感,那么可以将同步阻塞的方式改为异步阻塞的实现方式(启动一个新的线程来执行观察者代码),以此来减少响应时间。

上面都是进程内的实现方式,如果摇号结束后,需要发送信息给其它系统,这时候就需要用到跨进程的实现方式了。

如果其它系统提供了 PRC 接口,仍然可以沿用之前的实现思路,在观察者中调用PRC 接口,将结果发送给其它系统。还可以基于消息队列(Message Queue,比如 RocketMQ 等)来实现。

这样还是有弊端,比如需要引入一个新的系统(消息队列),增加了维护成本,但是这样被观察者和观察者之间的解耦更加彻底,被观察者完全不知道观察者的存在,观察者也完全不知道被观察者的存在。

实现一个异步非阻塞的 EventBus 框架

在不同应用场景下,观察者模式有 同步阻塞、异步非阻塞、进程内、进程间的实现方式。

同步阻塞是最经典的实现方式,主要是为了代码解耦;异步非阻塞除了能实现代码解耦之外,还能提高代码的执行效率;进程间的观察者模式解耦更加彻底,一般是基于消息队列来实现,用来实现不同进程间的被观察者和观察者之间的交互。

简易实现

有两种实现方式,其中一种是,在每个 update 方法中创建一个新的线程执行代码逻辑;另一种是:在 ConcreteSubject 的 notifyObservers() 方法中使用线程池来执行每个观察者的 update 方法。

对于第一种实现方式,频繁地创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出。第二种实现方式,尽管利用了线程池解决了第一种实现方式的问题,但线程池、异步执行逻辑都耦合在了 ConcreteSubject 类中,这增加了代码的复杂度。

如果需要更加极端一点,需要在同步阻塞和异步非阻塞之间灵活切换,那就要不停地修改 被观察者 中的代码。除此之外,如果在项目中,不止一个业务模块需要用到异步非阻塞观察者模式,那么这样的代码实现也无法做到复用。

框架的作用有:隐藏实现细节、降低开发难度、做到代码复用,解耦业务和非业务代码,让程序员聚焦于业务开发。针对异步非阻塞观察者模式,我们也可以将它抽象成框架来达到这样的效果,而这个框架就可以是 EventBus。

需求整理

EventBus 翻译为 事件总线,它提供了实现观察者模式的骨架代码。我们可以基于此框架,非常容易地在自己的业务场景中实现观察者模式,不需要从零开始开发。其中,Google Guava EventBus 就是一个比较著名的 EventBus 框架,它不仅仅支持异步非阻塞模式,同事也支持同步阻塞模式。

event bus 使用示例
package com.wkq.project.designpattern02.observer.controller;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.wkq.project.designpattern02.observer.observers.Observer;
import com.wkq.project.designpattern02.observer.service.UserService;
import jakarta.annotation.Resource;
import java.util.List;
import java.util.concurrent.Executors;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @author 21129
*/
@RestController
@RequestMapping("/user")
public class UserController {
@Resource
private UserService userService;
private EventBus eventBus;
private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;

public UserController(List<Observer> observers) {
//eventBus = new EventBus(); // 同步阻塞模式
this.eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE));
for (Object observer : observers) {
eventBus.register(observer);
}
}

@RequestMapping("/register")
public String register(String username,String password) {
Long userId = userService.register(username,password);
eventBus.post(userId);
return "success";
}
}
package com.wkq.project.designpattern02.observer.observers;

public interface Observer {
void handleRegSuccess(Long userId);
}
package com.wkq.project.designpattern02.observer.observers;

import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RegNotificationObserver implements Observer{

@Subscribe
@Override
public void handleRegSuccess(Long userId) {
log.info("发送注册成功通知给:{}", userId);
}

}
package com.wkq.project.designpattern02.observer.observers;

import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author 21129
*/
@Slf4j
@Component
public class RegPromotionObserver implements Observer{
@Subscribe
@Override
public void handleRegSuccess(Long userId) {
// 模拟发送邮件
log.info("用户注册成功,发送邮件,用户id:{}", userId);
}
}
package com.wkq.project.designpattern02.observer.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class UserService {

public Long register(String username, String password) {
log.info("用户注册成功,用户名:{},密码:{}", username, password);
return 1L;
}
}

利用 EventBus 框架实现的观察者模式,和从零开始编写的观察者模式相比,实现思路大致一样,都需要定义 Observer,并且通过 register() 函数注册 Observer,也都需要通过某个函数(比如 post() )将 Observer 发布出去,被 Observer 接收到消息后,会执行 Observer 的 handleRegSuccess() 方法。

但在实现细节方面,又有些区别,基于 EventBus ,不需要定义 Observer 接口,任何类型的对象都可以注册到 EventBus 中,通过 @Subscribe 注解来标明类中那个函数可以接受被观察者发送的消息。

EventBus、AsyncEventBus

Guava EventBus 对外暴露的所有可调用接口,都封装在 EventBus 中。其中, EventBus 实现了同步阻塞的观察者模式,AsyncEventBus 继承自 EventBus,提供了异步非阻塞的观察者模式。

register()

EventBus 类提供了 register() 函数用来注册观察者。具体的函数定义如下所示。它可以接受任何类型(Object) 的观察者。而在经典的观察者模式的视线中,register() 必须接受实现了同一 Observer 接口的类对象。

public void register(Object object);

unregister()

EventBus 类提供了 unregister() 函数用来删除观察者。

public void unregister(Object object);

post()

EventBus 类提供了 post() 函数用来给观察者发送消息。

public void post(Object event);

跟经典的观察者模式不同之处在于,当调用 post() 函数发送消息的时候,并非把消息发送给所有的观察者,而是发送给可匹配的观察者,所谓的可匹配指的是,能接受的消息类型是发送消息(post 方法定义中的 event) 类型的父类。

比如,AObserver 能接受的消息类型是 XMsg,BObserver 能接受消息类型是 YMsg,CObserver 能接受消息类型是 ZMsg。其中, XMsg 是 YMsg 的父类。

XMsg xMsg  = new XMsg();
YMsg yMsg = new YMsg();
ZMsg zMsg = new ZMsg();
post(xMsg); // AObserver 接收到消息
post(yMsg); // AObserver 和 BObserver 接收到消息
post(zMsg); // CObserver 接收到消息

@Subscribe 注解

EventBus 通过 @Subscribe 注解来表明,某个函数能接受那种类型的消息。

public DObserver {
//...省略其他属性和方法...

@Subscribe
public void f1(PMsg event) { //... }

@Subscribe
public void f2(QMsg event) { //... }
}

当通过 register 方法将 DObserver 注册到 EventBus 的时候,EventBus 会根据 @Subscribe 注解找到 f1 和 f2,并且将这两个函数能接受的消息类型记录下来(PMsg->f1,QMsg->f2)。

当通过 post 函数发送消息的时候,EventBus 会根据消息的类型,找到对应的函数,并执行该函数。

比如 post(PMsg),那么 EventBus 会调用 AObserver 的 f1() 方法。

实现 EventBus 框架

重点是 EventBus 中两个核心函数 register 和 post 的实现原理,搞懂它们,也就搞懂了整个 EventBus 框架。

从图中可以看出,最关键的一个数据结构是 Observer 注册表,记录了消息类型和可接受消息函数的对应关系。当调用 register 函数注册观察者的时候,EventBus 通过解析 @Subscribe 注解,生成 Observer 注册表。当调用 post() 函数发送消息的时候, EventBus 通过注册表找到相应的可接受消息的函数,然后通过 Java 的反射语法来动态地创建对象、执行函数。

对于同步阻塞模式,EventBus 在一个线程内一次执行相应的函数。对于异步非阻塞模式,EventBus 通过一个线程池来执行相应的函数。

整个小框架包含 5 个类,EventBus、AsyncEventBus、Subscribe、ObserverAction、ObserverRegistry 。

总结

设计模式要干的事情就是解耦、创建型模式时将创建和使用代码解耦,结构型模式是将不同功能代码解耦,行为型模式是将不同行为代码解耦,具体到观察者模式就是将观察者和被观察者解耦满足开闭原则、高内聚松耦合等特性,提高代码的可扩展性。

附录

生产者-消费者模式 vs 观察者模式

基本概念对比

特性观察者模式生产者-消费者模式
核心思想一对多的依赖关系,状态变化时自动通知通过队列解耦生产者和消费者
参与者被观察者(Subject) + 观察者(Observer)生产者(Producer) + 队列(Queue) + 消费者(Consumer)
通信方式直接调用或事件驱动通过中间队列异步通信
耦合度松耦合(通过接口)完全解耦(通过队列)

架构对比

观察者模式架构
被观察者 ──直接调用──> 观察者A

└──直接调用──> 观察者B

└──直接调用──> 观察者C
生产者-消费者模式架构
生产者 ──消息──> [队列] ──消息──> 消费者A

└──消息──> 消费者B

└──消息──> 消费者C

代码实现对比

观察者模式实现
// 被观察者
public class Subject {
private List<Observer> observers = new ArrayList<>();

public void notifyObservers(String data) {
for (Observer observer : observers) {
observer.update(data); // 直接调用
}
}
}

// 观察者
public interface Observer {
void update(String data);
}
生产者-消费者模式实现
// 生产者
public class Producer {
private BlockingQueue<String> queue;

public void produce(String data) {
queue.put(data); // 放入队列
}
}

// 消费者
public class Consumer implements Runnable {
private BlockingQueue<String> queue;

public void run() {
while (true) {
String data = queue.take(); // 从队列取出
process(data);
}
}
}

主要区别

1. 通信机制
  • 观察者模式:同步或异步直接调用
  • 生产者-消费者模式:通过队列异步通信
2. 解耦程度
  • 观察者模式:松耦合,但仍有依赖关系
  • 生产者-消费者模式:完全解耦,互不感知
3. 性能特点
  • 观察者模式:实时性高,但可能阻塞
  • 生产者-消费者模式:异步处理,吞吐量高
4. 适用场景
  • 观察者模式:事件驱动、状态变化通知
  • 生产者-消费者模式:任务处理、数据流处理

联系与转换

1. 观察者模式 → 生产者-消费者模式
// 观察者模式可以演化为生产者-消费者模式
public class EventQueue {
private BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

public void publish(Event event) {
queue.offer(event); // 生产者
}

public void startConsuming() {
new Thread(() -> {
while (true) {
Event event = queue.take(); // 消费者
notifyObservers(event);
}
}).start();
}
}
2. 生产者-消费者模式 → 观察者模式
// 消费者可以作为观察者
public class Consumer implements Observer {
private BlockingQueue<String> queue;

public void update(String data) {
queue.put(data); // 将观察者模式转换为队列处理
}
}

实际应用场景对比

观察者模式应用
// 用户注册事件
public class UserRegistrationEvent {
private String userId;
private String email;
}

// 观察者处理
public class EmailObserver implements Observer {
public void update(UserRegistrationEvent event) {
sendWelcomeEmail(event.getEmail());
}
}

public class LogObserver implements Observer {
public void update(UserRegistrationEvent event) {
logUserRegistration(event.getUserId());
}
}
生产者-消费者模式应用
// 订单处理
public class OrderProducer {
private BlockingQueue<Order> orderQueue;

public void createOrder(Order order) {
orderQueue.put(order); // 生产订单
}
}

public class OrderConsumer implements Runnable {
private BlockingQueue<Order> orderQueue;

public void run() {
while (true) {
Order order = orderQueue.take(); // 消费订单
processOrder(order);
}
}
}

选择建议

选择观察者模式当:
  • 需要实时响应状态变化
  • 观察者数量相对较少
  • 对性能要求不是特别高
  • 需要保持事件处理的顺序
选择生产者-消费者模式当:
  • 需要处理大量数据或任务
  • 生产者和消费者处理速度不匹配
  • 需要削峰填谷
  • 需要完全解耦的系统架构

总结

观察者模式和生产者-消费者模式都是解决组件间通信的重要模式,但各有侧重:

  • 观察者模式:更适合事件驱动的场景,强调实时性和响应性
  • 生产者-消费者模式:更适合数据处理场景,强调吞吐量和解耦性

在实际项目中,两种模式可以结合使用,观察者模式处理实时事件,生产者-消费者模式处理批量任务,形成完整的事件处理架构。