贫瘠之地

华北无浪漫,死海扬起帆
多少个夜晚,独自望着天

0%

Guava EventBus

介绍

EventBus 允许组件之间通过发布订阅方式进行交互,而不需要组件之间进行显式的注册(也意味着互相了解其存在)

它被设计专门用于使用显式注册来替代传统的 Java 进程内事件分发

它不是通用的发布订阅系统,也不是用于进程间通信的系统

文档

摘自 Github Guava Document EventBusExplained · google/guava Wiki · GitHub

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 定义出类和订阅的方法
class EventBusChangeRecorder {
@Subscribe public void recordCustomerChange(ChangeEvent e) {
recordChange(e.getChange());
}
}

// 在任意地方进行初始化
eventBus.register(new EventBusChangeRecorder());

// 之后进行消息的发送
public void changeCustomer()
ChangeEvent event = getChangeEvent();
eventBus.post(event);
}

一分钟指南

转换一个已经存在的以事件订阅为基础的系统来使用 EventBus 是容易的

订阅者(For Listeners)

订阅特定类型的事件(例如 CustomerChangeEvent

  • 传统的方式:实现一个和事件相关的接口,例如 CustomerChangeEventListener
  • EventBus:创建一个方法,CustomerChangeEvent 作为这个方法唯一的参数,然后在方法上标记 @Subscribe 注解

向事件生产者注册你的订阅者方法

  • 传统的方式:将对象传递给每个生产者的 registerCustomerChangeEventListener 方法,这些方法很少在公共接口中定义,因此除了了解每个可能的生产者,还必须了解其类型
  • EventBus:将方法所在对象通过 EventBus.register(Object) 方法注册在一个 EventBus 对象上,需要确保消息生产者使用的 EventBus 实例和进行方法注册的是同一个

订阅一个普通事件的超类(supertype;例如 EventObject 或者 Object

  • 传统的方式:不容易实现
  • EventBus:事件会自动分配给超类的订阅者,允许针对接口(Interface)的订阅者或者针对 Object 的 “通配符订阅者”

要订阅或检测在没有合适事件订阅者存在的事件

  • 传统的方式:在每个事件调度方法上添加代码(可能使用 AOP)
  • EventBus:订阅 DeadEvent,随后 EventBus 将会通知你任何已经已经生产但是没有消费交付的事件(便与调试)

生产者(For Producers)

跟踪订阅者处理事件

  • 传统的方式:写代码管理订阅者列表,包括同步,或者使用公共类例如 EventListenerList
  • EventBus:EventBus 已经为你实现了

派发事件给订阅者

  • 传统的方式:显式实现一个向各个事件订阅者分配事件的方法,包括错误隔离和异步(如果希望如此)
  • EventBus:传递消息对象给一个 EventBus 实例的 EventBus.post(Object) 方法

术语

EventBus 系统和代码使用以下术语来讨论事件分发

术语 描述
Event 任何可能会被传递给一个总线的对象
Subcribing 注册一个 listener 给 EventBus 的行为,以便其处理方法将会接收事件
Listener 一个希望接收事件的对象,暴露处理方法(handler method)
Handler method 一个 public 方法,EventBus 可以使用该方法处理事件;使用 @Subcribe 注解进行标记
Posting an event 确保事件通过 EventBus 被任意订阅者接收

FAQ

为什么我必须创建自己的 EventBus 而不是使用单例

EventBus 并没有明确指出你如何使用它;对于针对每个组件的事件总线并没有禁止使用各自独立的实例,也没有禁止针对不同上下文、主题的时间使用独立的实例;如果进行拆分,会使得在测试中创建、删除 EventBus 对象更加容易、

当然,如果你想有一个大而全逻辑的 EventBus 单例,并不会阻止你这样做,只需让您的容器(例如 Guice)在全局范围内将 EventBus 创建为一个单例(或者将其存储为一个静态变量,如果想这样做的话)

总而言之,EventBus 不是一个单例是因为我们不想做出这种决定,你可以根据喜好决定如何使用

我可以向 EventBus 取消注册某个订阅者吗

可以,使用 EventBus.unregister;只不过我们发现这种需求很少见:

  • 大部分订阅者都在启动时进行注册或者懒加载,并在应用程序的生命周期中持续存在
  • 特殊范围(scope-specific)的 EventBus 实例可以处理临时的事件分发(例如在请求范围的对象之间分发事件)
  • 对于测试,EventBus 实例可以很容易地创建或丢弃,从而无需显式注销

为什么使用注解来标记订阅方法,而不是订阅者实现接口

我们认为 @Subscribe 注解可以传达真实的意图,就像实现接口一样明确(或者更明确),同时让您可以自由地将事件订阅者方法放置在任何您希望的位置,并为它们提供需要的名称

传统的 JavaEvents 使用侦听器接口,该接口通常只运行少数方法——通常是一个,这有许多缺点:

  • 任何一个类都只能实现对给定事件的单个响应
  • 订阅者接口方法可能发生冲突
  • 方法必须以事件命名(例如 handleChangeEvent ),而不是以其目的(例如 recordChangeInJournal )命名
  • 每种事件都有他们自己的接口,没有一个针对事件族的通用父类接口

干净清晰地实现会存在困难,解决困难导致了一种模式,即使用简单的匿名类来实现订阅器接口

比较如下两个 case

1
2
3
4
5
6
7
8
9
class ChangeRecorder {
void setCustomer(Customer cust) {
cust.addChangeListener(new ChangeListener() {
public void customerChanged(ChangeEvent e) {
recordChange(e.getChange());
}
};
}
}

对比

1
2
3
4
5
6
// Class is typically registered by the container.
class EventBusChangeRecorder {
@Subscribe public void recordCustomerChange(ChangeEvent e) {
recordChange(e.getChange());
}
}

事实上在第二种情况下,意图更清晰:冗余代码(noise code)更少,事件订阅者有一个清晰而有意义的名称

如果我注册了一个没有任何订阅者方法的侦听器,会发生什么?

什么都不会发生

EventBus 总是设计和容器、模块系统集成,在这些情况下,让容器、工厂、环境将每个创建的对象传递给EventBus 的 register 方法是很方便的

这样,容器、工厂、环境创建的任何对象都可以通过公开订阅者方法挂接到系统的事件模型中

在编译时可以检测到哪些 Event Bus 问题?

Java 的类型系统可以明确检测到的任何问题

例如为不存在的事件类型定义订阅者方法

哪些 Event Bus 问题可以在注册时立即检测到?

调用 register 后,将立即检查正在注册的侦听器的订阅者方法的格式是否正确

特别是,任何用 @Subscribe 标记的方法都必须只接受一个参数

任何违反此规则的行为都将引发 IllegalArgumentException(这个检查可以转移到使用 APT 的编译时间,这是我们正在研究的一个解决方案)

哪些 Event Bus 问题只能在稍后的运行时检测到?

如果组件 post 了没有注册侦听器的事件,则可能指示错误(通常指示您错过了 @Subscribe 注释,或者侦听组件未加载)

(请注意,这并不一定意味着有问题;在许多情况下,应用程序会故意忽略发布的事件,尤其是当事件来自您无法控制的代码时)

要处理此类事件,请为 DeadEvent 类注册一个订阅者方法,每当 EventBus 接收到没有注册订阅者的事件时,它都会将其转换为 DeadEvent 并按您的方式传递,从而允许您记录它或以其他方式恢复

1
2
3
4
5
6
7
8
9
public class EventListener {
/**
* 死信事件:接收没有订阅者的消息
*/
@Subscribe
public void listenerDeadEvent(DeadEvent deadEvent){
System.out.println("EventListener#listenerDeadEvent -> " + deadEvent.getEvent());
}
}

我如何测试消息订阅者和他们的订阅方法?

因为侦听器类上的订阅者方法是普通方法,所以您可以简单地从测试代码中调用它们来模拟 EventBus

源码

EventBus

EventBus 承载了订阅信息和 post 事件的能力,实例和实例之间的信息互相隔离

1
2
3
4
5
6
7
8
9
10
EventBus(
String identifier,
Executor executor,
Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {
this.identifier = checkNotNull(identifier);
this.executor = checkNotNull(executor);
this.dispatcher = checkNotNull(dispatcher);
this.exceptionHandler = checkNotNull(exceptionHandler);
}

从构造器来看,主要有以下主要的初始属性:

  • identifier:该 EventBus 实例的简称标识
  • executor:执行器;默认是一个直接执行器
  • dispatcher:调度器;默认是单线程队列调度
  • exceptionHandler:异常处理器

EventBus 还有一个子类 AsyncEventBus

区别在于可以设置 ExecutorDispatcher 默认为 LegacyAsyncDispatcher

1
2
3
public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
}

register 注册订阅方法

1
2
3
4
5
6
public class Run {
public static void main(String[] args) {
EventBus eventBus = new EventBus();
eventBus.register(new EventListener());
}
}

该方法用于注册订阅方法,即处理事件的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void register(Object listener) {
// 拿到所有包装的 Subscriber
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();

CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

// 保存到 eventSubscribers
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
// key 是 event type
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}

这里使用了 Guava 包下的 cache 工具

直接看扫描订阅方法的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
// 遍历类型
for (Class<?> supertype : supertypes) {
// 遍历方法
for (Method method : supertype.getDeclaredMethods()) {
// 扫描注解
if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
// TODO(cgdecker): Should check for a generic parameter type and error out
Class<?>[] parameterTypes = method.getParameterTypes();
// check 方法参数数量
checkArgument(
parameterTypes.length == 1,
"Method %s has @Subscribe annotation but has %s parameters. "
+ "Subscriber methods must have exactly 1 parameter.",
method,
parameterTypes.length);
// check 方法参数是否是原始类型
checkArgument(
!parameterTypes[0].isPrimitive(),
"@Subscribe method %s's parameter is %s. "
+ "Subscriber methods cannot accept primitives. "
+ "Consider changing the parameter to %s.",
method,
parameterTypes[0].getName(),
Primitives.wrap(parameterTypes[0]).getSimpleName());

// 包装反射出的方法
MethodIdentifier ident = new MethodIdentifier(method);
if (!identifiers.containsKey(ident)) {
// 保存
identifiers.put(ident, method);
}
}
}
}
return ImmutableList.copyOf(identifiers.values());
}

主要流程:

  • 获取 Object 对象的类型列表
  • 遍历方法
  • 扫描出 Subscribe 方法,校验参数列表
  • 包装为 MethodIdentifier 保存

post 发布事件

1
2
3
4
5
6
7
8
9
public class Run {
public static void main(String[] args) {
EventBus eventBus = new EventBus();
eventBus.register(new EventListener());
eventBus.post(1);
eventBus.post("1");
eventBus.post(new Object());
}
}

发布事件的操作其实就包含了对订阅者的调用

1
2
3
4
5
6
7
8
9
10
11
12
public void post(Object event) {
// 根据 event type 查询订阅者列表
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
// 如果有订阅方法,则调用 dispatch
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// 如果对应事件没有订阅者,则包装为死信消息
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}

不同的调度器有不同的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));

if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}

最终调用 SubscriberdispatchEvent 方法,通过反射进行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}

Dispatcher 调度器

调度器用来改变事件发布后调用订阅方法的顺序

有三种实现:

  • PerThreadQueuedDispatcher:每个线程内的事件都按照发布顺序调用订阅者;广度优先
  • LegacyAsyncDispatcher:单个全局队列中发布的事件进行排队;广度优先
  • ImmediateDispatcher:事件发布后立即将事件调度给订阅者,而不使用中间队列来更改调度顺序;深度优先

PerThreadQueuedDispatcher

每个线程内的事件都按照发布顺序调用订阅者;广度优先

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private static final class PerThreadQueuedDispatcher extends Dispatcher {

// This dispatcher matches the original dispatch behavior of EventBus.

/** Per-thread queue of events to dispatch. */
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};

/** Per-thread dispatch state, used to avoid reentrant event dispatching. */
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};

@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
// 获取当前线程的 event 队列
Queue<Event> queueForThread = queue.get();
// 将当前发布的事件 offer 进队列
queueForThread.offer(new Event(event, subscribers));

// 判断调度标记
if (!dispatching.get()) {
// 调度标记,设置为 true
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
// 调度订阅方法
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
// 清理 ThreadLocal
dispatching.remove();
queue.remove();
}
}
}
...
}

PerThreadQueuedDispatcher 隔离了线程,每个线程的事件都在一个队列下保证调度

同时使用一个线程隔离的标记 dispatching 当前线程的事件队列是否正在调度中

这样就达到了广度优先的效果,事件 A 和事件 B => [a1,a2,a3,b1,b2,b3]

LegacyAsyncDispatcher

单个全局队列中发布的事件进行排队;广度优先

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static final class LegacyAsyncDispatcher extends Dispatcher {
/** Global event queue. */
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();

@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}

EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
...
}

使用了一个全局事件队列 ConcurrentLinkedQueue 来保证顺序

以此在异步的过程中同样达到消息订阅广度的效果

事件 A 和事件 B => [a1,a2,a3,b1,b2,b3]

ImmediateDispatcher

事件发布后立即将事件调度给订阅者,而不使用中间队列来更改调度顺序;深度优先

1
2
3
4
5
6
7
8
9
10
11
private static final class ImmediateDispatcher extends Dispatcher {
private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();

@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
}

其实就是事件发布后拿到订阅列表立即调度

区别排队的机制,消息和消息之间不保证先后顺序

事件 A 和事件 B 之间有可能是 [a1,a2,a3,b1,b2,b3],也有可能是 [a1,a2,b1,b2,a3,b3],也可能是 [a1,b1,b2,b3,a2,a3]

参考

EventBusExplained · google/guava Wiki · GitHub