本文最后更新于:2023年6月18日 晚上
EventBus 学习
在项目中偶尔会使用到 guava 框架,这个框架归纳了一些很有用的工具类和功能方便开发,我在一次项目里就使用到了 guava 的 EventBus 功能,但是并没有继续了解过这个功能,此笔记就是对此功能进行复习。
一、从发布订阅模式/观察者模式说起
1. 观察者模式
观察者模式(Observer Pattern)是一种行为设计模式,它定义了一种一对多的依赖关系,让多个观察对象同时监听某一个主题对象。当主题对象发生变化时,它的所有观察者都会收到通知并进行相应的更新操作。
在观察者模式中,有两个核心角色:Subject(主题)和 Observer(观察者)。Subject 维护着一个观察者列表,而 Observer 注册到 Subject 中,并在 Subject 状态改变时接收通知。
具体来说,观察者模式中包含以下几个要素:
- 抽象主题(Subject):定义了被观察对象的基本行为,它维护着所有观察者对象的引用,提供了增加和删除观察者对象的方法。
- 具体主题(ConcreteSubject):继承或实现了抽象主题,实现了被观察对象状态改变的业务逻辑,并通知所有注册的观察者对象。
- 抽象观察者(Observer):定义了观察者对象的基本行为,它提供了被通知时所需执行的方法。
- 具体观察者(ConcreteObserver):继承或实现了抽象观察者,实现了观察者接收到主题通知后需要执行的业务逻辑。
观察者模式的优点在于当主题对象的状态发生改变时,所有依赖它的观察者对象都会得到通知并自动更新。这样就避免了对象之间耦合度过高的情况,同时也提高了系统的可维护性和扩展性。
下面是一个简单的观察者模式实现,先定义一个观察者接口,可以有不同的具体观察者实现:
1 2 3
| public interface Observer { public void update(String message); }
|
定义 Subject 接口,并且定义注册观察者,移除观察者,通知所有观察者的方法:
1 2 3 4 5
| public interface Subject { public void registerObserver(Observer observer); public void removeObserver(Observer observer); public void notifyObservers(String message); }
|
实现一个 Subject 接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class ConcreteSubject implements Subject {
private List<Observer> observers = new ArrayList<>();
@Override public void registerObserver(Observer observer) { observers.add(observer); }
@Override public void removeObserver(Observer observer) { observers.remove(observer); }
@Override public void notifyObservers(String message) { for (Observer observer : observers) { observer.update(message); } } }
|
实现一个观察者:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class ConcreteObserver implements Observer {
private String name;
public ConcreteObserver(String name) { this.name = name; }
@Override public void update(String message) { System.out.println(name + " received message: " + message); } }
|
进行测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class Main { public static void main(String[] args) { ConcreteSubject subject = new ConcreteSubject(); ConcreteObserver observer1 = new ConcreteObserver("Alice"); ConcreteObserver observer2 = new ConcreteObserver("Bob");
subject.registerObserver(observer1); subject.registerObserver(observer2);
subject.notifyObservers("Hello world!");
subject.removeObserver(observer1);
subject.notifyObservers("Goodbye!"); } }
|
2. 发布-订阅模式
发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,其中发送者(被称为发布者)不会直接将消息发送给接收者(被称为订阅者),而是将消息发送到一个中间代理,称为消息代理(Message Broker),订阅者则从该代理订阅自己关心的消息。
在这种模式下,发布者和订阅者不需要知道彼此的存在,它们只需要通过消息代理进行通信。发布者将消息发送到消息代理,订阅者从消息代理订阅它们感兴趣的消息。当消息代理接收到消息时,它将根据订阅者的订阅信息将消息传递给相应的订阅者。这种方式可以实现高度解耦,同时也方便扩展和维护。
发布订阅模式是一种常用的设计模式,在分布式系统、事件驱动系统、消息队列系统等领域都有广泛的应用。
下面是一个简单的例子,定义一个订阅者接口,负责发布通知:
1 2 3
| interface Subscriber { <T> void post(T message); }
|
然后实现一个发布者,因为不同的订阅者会订阅不同的消息(topic),所以使用 map 进行存储。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| class Publisher {
private Map<String, List<Subscriber>> subscribers = new HashMap<>();
public void subscribe(String topic, Subscriber subscriber) { List<Subscriber> subscriberList = subscribers.get(topic); if (subscriberList == null) { subscriberList = new ArrayList<>(); subscribers.put(topic, subscriberList); } subscriberList.add(subscriber); }
public void unsubscribe(String topic, Subscriber subscriber) { List<Subscriber> subscriberList = subscribers.get(topic); if (subscriberList != null) { subscriberList.remove(subscriber); } }
public <T> void post(String topic, T message) { List<Subscriber> subscriberList = subscribers.get(topic); if (subscriberList != null) { for (Subscriber subscriber : subscriberList) { subscriber.post(message); } } } }
|
创建两个实现订阅者接口的具体类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| class WeatherSubscriber implements Subscriber {
@Override public <T> void post(T message) { System.out.println("receive message:" + message); } }
class NewsSubscriber implements Subscriber {
@Override public <T> void post(T message) { System.out.println("receive message:" + message); } }
|
写一个简单的测试方法查看效果:
1 2 3 4 5 6 7 8 9 10 11
| @Test public void test() { Publisher publisher = new Publisher(); publisher.subscribe("weather", new WeatherSubscriber()); publisher.subscribe("news", new WeatherSubscriber()); publisher.post("news", "send news"); publisher.post("weather", "send weather"); }
|
二、EventBus
的使用
下面是 EventBus
的简单使用:
一个监听器可以定义多个监听方法(事件),同一个事件可以在一个监听器内定义多次(可以但是没意义),监听器方法只能有一个参数,不同的监听器的监听方法可以监听同一种事件(也就是参数是同一种类型的)。
使用 EventBus
主要使用到三个方法:
register(Object)
:注册一个监听器
unRegister(Object)
:删除已注册的监听器
post(Object)
:发布事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| class News { private String message;
public News(String message) { this.message = message; }
@Override public String toString() { return "News{" + "message='" + message + '\'' + '}'; } }
class Weather { private String message;
public Weather(String message) { this.message = message; }
@Override public String toString() { return "Weather{" + "message='" + message + '\'' + '}'; } }
class MultiListener { @Subscribe public void listenNews(News news) { System.out.println("receive news: " + news); }
@Subscribe public void listenWeather(Weather weather) { System.out.println("receive weather: " + weather); } }
class TypeListener { @Subscribe public void listenString(String message) { System.out.println("receive String message: " + message); }
@Subscribe public void listenNumber(Integer number) { System.out.println("receive number: " + number); } }
|
创建测试方法并且查看结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Test public void test() { EventBus eventBus = new EventBus(); eventBus.register(new MultiListener()); eventBus.register(new TypeListener()); eventBus.post(new News("today no news")); eventBus.post(new Weather("today weather is good")); eventBus.post("hello"); eventBus.post(1); }
|
三、深入 EventBus
1、EventBus
组成
Event
:事件,可以理解成 kafka
订阅的 topic
,在 guava
中则有两种事件类型
- 由任意非基础类型定义的对象类型
DeadEvent
,没有任何订阅者订阅的异常事件
EventBus
:负责将事件分发给注册了相关事件处理方法的订阅者的对象,包含了注册、注销、发送事件等功能。主要有两种实现类
EventBus
:默认的事件总线
AsyncEventBus
:异步事件总线,提供异步分发事件的能力
SubscriberRegistry
:订阅注册中心,发布订阅机制就是中间有一个注册中心负责事件的分发,而 SubscriberRegistry
就是干这个事情的,每个 EventBus
都会绑定一个注册中心
EventBus
创建时会创建一个注册中心,并且把自身注入到注册中心中,通过调用 EventBus
的 register
方法,实则是调用注册中心的 register
方法把要监听器注册到注册中心,并且转换成一个事件与订阅类型的 Map
映射。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class EventBus { private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
public void register(Object object) { subscribers.register(object); } }
final class SubscriberRegistry {
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap(); void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } } }
|
Subscriber
:订阅者,指对事件感兴趣并注册了事件处理方法的对象,通常是POJO类。订阅者需要提供一个或多个处理该事件的方法(即事件处理方法),主要有两种实现类。
Subscriber
:默认的订阅器实现类
SynchronizedSubscriber
:同步订阅器,让事件处理同步串行执行
通过前面的注册中心,注册中心会把对应的监听器包括监听的事件封装成订阅器,所以最终事件被处理是被封装成订阅器来处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| class Subscriber { @Weak private EventBus bus; @VisibleForTesting final Object target; private final Method method; private final Executor executor; final void dispatchEvent(Object event) { executor.execute( () -> { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } }); } }
|
2、线程安全
在 EventBus 中提供了一个 @AllowConcurrentEvents
注解,这个注解标明监听器的监听方法是否线程安全,此方法需要配合 @Subscribe
一起用。
- 如果被
@Subscribe
标注的监听方法没有使用 @AllowConcurrentEvents
注解,那么默认使用的是 SynchronizedSubscriber
,在多线程的情况下同一时间下 EventBus
多次发布同一事件时,会进行加锁操作
- 如果标注了此注解,则说明执行时可以并发执行(线程安全),内部不会进行加锁操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| class Subscriber{ static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); }
private static boolean isDeclaredThreadSafe(Method method) { return method.getAnnotation(AllowConcurrentEvents.class) != null; }
static final class SynchronizedSubscriber extends Subscriber { private SynchronizedSubscriber(EventBus bus, Object target, Method method) { super(bus, target, method); }
@Override void invokeSubscriberMethod(Object event) throws InvocationTargetException { synchronized (this) { super.invokeSubscriberMethod(event); } } } }
|
3、父子事件
当被监听的事件(监听器方法参数)之间存在父子关系时,发布子事件,父事件也会被触发,请看下面的例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public static void main(String[] args) { EventBus eventBus = new EventBus(); eventBus.register(new TypeListener()); eventBus.post(111); }
static class TypeListener {
@Subscribe private void readObject(Object object) { System.out.println("receive object message: " + object); }
@Subscribe private void readInteger(Integer number) { System.out.println("receive integer message: " + number); } }
|
三、最后
使用 EventBus
可以在项目中对逻辑进行解耦,例如很多地方登录注册,日志警告,可能都需要发送右键,如果每个地方都需要重写一份发送邮件的逻辑,或者是后面需要从发送邮件改成发送手机短信,那么所有地方都需要进行重写。使用 EventBus
的好处就是可以把这些逻辑进行事件驱动化,并且以事件的形式,支持逻辑的扩展。
当然 EventBus
也有一些不足之处,例如,事件,消息都存储与本地的内存中,如果服务宕机,那么消息将会丢失,如果是相对复杂的逻辑,或是消息需要持久化,那么建议还是使用诸如 Kafka
或 RabbitMQ
等更合适。