EventBus 学习

本文最后更新于:2023年6月18日 晚上

EventBus 学习

在项目中偶尔会使用到 guava 框架,这个框架归纳了一些很有用的工具类和功能方便开发,我在一次项目里就使用到了 guava 的 EventBus 功能,但是并没有继续了解过这个功能,此笔记就是对此功能进行复习。

一、从发布订阅模式/观察者模式说起

1. 观察者模式

观察者模式(Observer Pattern)是一种行为设计模式,它定义了一种一对多的依赖关系,让多个观察对象同时监听某一个主题对象。当主题对象发生变化时,它的所有观察者都会收到通知并进行相应的更新操作。

在观察者模式中,有两个核心角色:Subject(主题)和 Observer(观察者)。Subject 维护着一个观察者列表,而 Observer 注册到 Subject 中,并在 Subject 状态改变时接收通知。

具体来说,观察者模式中包含以下几个要素:

  • 抽象主题(Subject):定义了被观察对象的基本行为,它维护着所有观察者对象的引用,提供了增加和删除观察者对象的方法。
  • 具体主题(ConcreteSubject):继承或实现了抽象主题,实现了被观察对象状态改变的业务逻辑,并通知所有注册的观察者对象。
  • 抽象观察者(Observer):定义了观察者对象的基本行为,它提供了被通知时所需执行的方法。
  • 具体观察者(ConcreteObserver):继承或实现了抽象观察者,实现了观察者接收到主题通知后需要执行的业务逻辑。

观察者模式的优点在于当主题对象的状态发生改变时,所有依赖它的观察者对象都会得到通知并自动更新。这样就避免了对象之间耦合度过高的情况,同时也提高了系统的可维护性和扩展性。

下面是一个简单的观察者模式实现,先定义一个观察者接口,可以有不同的具体观察者实现:

1
2
3
public interface Observer {
public void update(String message);
}

定义 Subject 接口,并且定义注册观察者,移除观察者,通知所有观察者的方法:

1
2
3
4
5
public interface Subject {
public void registerObserver(Observer observer);
public void removeObserver(Observer observer);
public void notifyObservers(String message);
}

实现一个 Subject 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConcreteSubject implements Subject {

private List<Observer> observers = new ArrayList<>();

@Override
public void registerObserver(Observer observer) {
observers.add(observer);
}

@Override
public void removeObserver(Observer observer) {
observers.remove(observer);
}

@Override
public void notifyObservers(String message) {
for (Observer observer : observers) {
observer.update(message);
}
}
}

实现一个观察者:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ConcreteObserver implements Observer {

private String name;

public ConcreteObserver(String name) {
this.name = name;
}

@Override
public void update(String message) {
System.out.println(name + " received message: " + message);
}
}

进行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Main {
public static void main(String[] args) {
ConcreteSubject subject = new ConcreteSubject();
ConcreteObserver observer1 = new ConcreteObserver("Alice");
ConcreteObserver observer2 = new ConcreteObserver("Bob");

subject.registerObserver(observer1);
subject.registerObserver(observer2);

subject.notifyObservers("Hello world!");

subject.removeObserver(observer1);

subject.notifyObservers("Goodbye!");
}
}

2. 发布-订阅模式

发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,其中发送者(被称为发布者)不会直接将消息发送给接收者(被称为订阅者),而是将消息发送到一个中间代理,称为消息代理(Message Broker),订阅者则从该代理订阅自己关心的消息。

在这种模式下,发布者和订阅者不需要知道彼此的存在,它们只需要通过消息代理进行通信。发布者将消息发送到消息代理,订阅者从消息代理订阅它们感兴趣的消息。当消息代理接收到消息时,它将根据订阅者的订阅信息将消息传递给相应的订阅者。这种方式可以实现高度解耦,同时也方便扩展和维护。

发布订阅模式是一种常用的设计模式,在分布式系统、事件驱动系统、消息队列系统等领域都有广泛的应用。

下面是一个简单的例子,定义一个订阅者接口,负责发布通知:

1
2
3
interface Subscriber {
<T> void post(T message);
}

然后实现一个发布者,因为不同的订阅者会订阅不同的消息(topic),所以使用 map 进行存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Publisher {

private Map<String, List<Subscriber>> subscribers = new HashMap<>();

public void subscribe(String topic, Subscriber subscriber) {
List<Subscriber> subscriberList = subscribers.get(topic);
if (subscriberList == null) {
subscriberList = new ArrayList<>();
subscribers.put(topic, subscriberList);
}
subscriberList.add(subscriber);
}

public void unsubscribe(String topic, Subscriber subscriber) {
List<Subscriber> subscriberList = subscribers.get(topic);
if (subscriberList != null) {
subscriberList.remove(subscriber);
}
}

public <T> void post(String topic, T message) {
List<Subscriber> subscriberList = subscribers.get(topic);
if (subscriberList != null) {
for (Subscriber subscriber : subscriberList) {
subscriber.post(message);
}
}
}
}

创建两个实现订阅者接口的具体类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class WeatherSubscriber implements Subscriber {

@Override
public <T> void post(T message) {
System.out.println("receive message:" + message);
}
}

class NewsSubscriber implements Subscriber {

@Override
public <T> void post(T message) {
System.out.println("receive message:" + message);
}
}

写一个简单的测试方法查看效果:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void test() {
Publisher publisher = new Publisher();
publisher.subscribe("weather", new WeatherSubscriber());
publisher.subscribe("news", new WeatherSubscriber());
publisher.post("news", "send news");
publisher.post("weather", "send weather");
}
// result
// receive message:send news
// receive message:send weather

二、EventBus 的使用

下面是 EventBus 的简单使用:

一个监听器可以定义多个监听方法(事件),同一个事件可以在一个监听器内定义多次(可以但是没意义),监听器方法只能有一个参数,不同的监听器的监听方法可以监听同一种事件(也就是参数是同一种类型的)。

使用 EventBus 主要使用到三个方法:

  • register(Object):注册一个监听器
  • unRegister(Object):删除已注册的监听器
  • post(Object):发布事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class News {
private String message;

public News(String message) {
this.message = message;
}

@Override
public String toString() {
return "News{" +
"message='" + message + '\'' +
'}';
}
}

class Weather {
private String message;

public Weather(String message) {
this.message = message;
}

@Override
public String toString() {
return "Weather{" +
"message='" + message + '\'' +
'}';
}
}

class MultiListener {

@Subscribe
public void listenNews(News news) {
System.out.println("receive news: " + news);
}

@Subscribe
public void listenWeather(Weather weather) {
System.out.println("receive weather: " + weather);
}
}

class TypeListener {

@Subscribe
public void listenString(String message) {
System.out.println("receive String message: " + message);
}

@Subscribe
public void listenNumber(Integer number) {
System.out.println("receive number: " + number);
}
}

创建测试方法并且查看结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void test() {
EventBus eventBus = new EventBus();
eventBus.register(new MultiListener());
eventBus.register(new TypeListener());
eventBus.post(new News("today no news"));
eventBus.post(new Weather("today weather is good"));
eventBus.post("hello");
eventBus.post(1);
}

/*
receive news: News{message='today no news'}
receive weather: Weather{message='today weather is good'}
receive String message: hello
receive number: 1
*/

三、深入 EventBus

1、EventBus 组成

Event:事件,可以理解成 kafka 订阅的 topic,在 guava 中则有两种事件类型

  • 由任意非基础类型定义的对象类型
  • DeadEvent,没有任何订阅者订阅的异常事件

EventBus:负责将事件分发给注册了相关事件处理方法的订阅者的对象,包含了注册、注销、发送事件等功能。主要有两种实现类

  • EventBus:默认的事件总线
  • AsyncEventBus:异步事件总线,提供异步分发事件的能力

SubscriberRegistry:订阅注册中心,发布订阅机制就是中间有一个注册中心负责事件的分发,而 SubscriberRegistry 就是干这个事情的,每个 EventBus 都会绑定一个注册中心

  • EventBus创建时会创建一个注册中心,并且把自身注入到注册中心中,通过调用 EventBusregister 方法,实则是调用注册中心的 register方法把要监听器注册到注册中心,并且转换成一个事件与订阅类型的 Map 映射。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class EventBus {

private final SubscriberRegistry subscribers = new SubscriberRegistry(this);

//....

public void register(Object object) {
subscribers.register(object);
}
}

final class SubscriberRegistry {

// 线程安全集合,保存事件类型和对应的监听器的所有监听方法
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();

void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();

CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
}

Subscriber:订阅者,指对事件感兴趣并注册了事件处理方法的对象,通常是POJO类。订阅者需要提供一个或多个处理该事件的方法(即事件处理方法),主要有两种实现类。

  • Subscriber:默认的订阅器实现类
  • SynchronizedSubscriber:同步订阅器,让事件处理同步串行执行

通过前面的注册中心,注册中心会把对应的监听器包括监听的事件封装成订阅器,所以最终事件被处理是被封装成订阅器来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Subscriber {
// 绑定的 EventBus
@Weak private EventBus bus;

// 绑定的监听器,也就是 EventBus register 传递的参数
@VisibleForTesting final Object target;

// 绑定的监听器内的监听方法
private final Method method;

// EventBus 所使用的执行器
private final Executor executor;

// 进行事件的处理与分发
final void dispatchEvent(Object event) {
executor.execute(
() -> {
try {
// 使用反射调用监听器内对应事件的监听方法
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
});
}
}

2、线程安全

在 EventBus 中提供了一个 @AllowConcurrentEvents 注解,这个注解标明监听器的监听方法是否线程安全,此方法需要配合 @Subscribe 一起用。

  • 如果被 @Subscribe 标注的监听方法没有使用 @AllowConcurrentEvents 注解,那么默认使用的是 SynchronizedSubscriber ,在多线程的情况下同一时间下 EventBus 多次发布同一事件时,会进行加锁操作
  • 如果标注了此注解,则说明执行时可以并发执行(线程安全),内部不会进行加锁操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Subscriber{

static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}

// 判断方法是否需要线程安全
private static boolean isDeclaredThreadSafe(Method method) {
return method.getAnnotation(AllowConcurrentEvents.class) != null;
}

static final class SynchronizedSubscriber extends Subscriber {
private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
super(bus, target, method);
}

@Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
synchronized (this) {
super.invokeSubscriberMethod(event);
}
}
}
}

3、父子事件

当被监听的事件(监听器方法参数)之间存在父子关系时,发布子事件,父事件也会被触发,请看下面的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) {
EventBus eventBus = new EventBus();
eventBus.register(new TypeListener());
eventBus.post(111);
}

static class TypeListener {

@Subscribe
private void readObject(Object object) {
System.out.println("receive object message: " + object);
}

@Subscribe
private void readInteger(Integer number) {
System.out.println("receive integer message: " + number);
}
}
// result:
// receive integer message: 111
// receive object message: 111

三、最后

使用 EventBus 可以在项目中对逻辑进行解耦,例如很多地方登录注册,日志警告,可能都需要发送右键,如果每个地方都需要重写一份发送邮件的逻辑,或者是后面需要从发送邮件改成发送手机短信,那么所有地方都需要进行重写。使用 EventBus 的好处就是可以把这些逻辑进行事件驱动化,并且以事件的形式,支持逻辑的扩展。

当然 EventBus 也有一些不足之处,例如,事件,消息都存储与本地的内存中,如果服务宕机,那么消息将会丢失,如果是相对复杂的逻辑,或是消息需要持久化,那么建议还是使用诸如 KafkaRabbitMQ 等更合适。


EventBus 学习
http://aim467.github.io/2023/06/18/EventBus-学习/
作者
Dedsec2z
发布于
2023年6月18日
更新于
2023年6月18日
许可协议