介绍
EventBus
允许组件之间通过发布订阅方式进行交互,而不需要组件之间进行显式的注册(也意味着互相了解其存在)
它被设计专门用于使用显式注册来替代传统的 Java 进程内事件分发
它不是通用的发布订阅系统,也不是用于进程间通信的系统
文档
摘自 Github Guava Document EventBusExplained
· google/guava Wiki · GitHub
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class EventBusChangeRecorder { @Subscribe public void recordCustomerChange (ChangeEvent e) { recordChange(e.getChange()); } } eventBus.register(new EventBusChangeRecorder ()); public void changeCustomer () ChangeEvent event = getChangeEvent(); eventBus.post(event); }
一分钟指南
转换一个已经存在的以事件订阅为基础的系统来使用 EventBus 是容易的
订阅者(For Listeners)
订阅特定类型的事件(例如 CustomerChangeEvent
)
传统的方式:实现一个和事件相关的接口,例如
CustomerChangeEventListener
EventBus:创建一个方法,CustomerChangeEvent
作为这个方法唯一的参数,然后在方法上标记 @Subscribe
注解
向事件生产者注册你的订阅者方法
传统的方式:将对象传递给每个生产者的
registerCustomerChangeEventListener
方法,这些方法很少在公共接口中定义,因此除了了解每个可能的生产者,还必须了解其类型
EventBus:将方法所在对象通过 EventBus.register(Object)
方法注册在一个 EventBus
对象上,需要确保消息生产者使用的
EventBus
实例和进行方法注册的是同一个
订阅一个普通事件的超类(supertype;例如 EventObject
或者
Object
)
传统的方式:不容易实现
EventBus:事件会自动分配给超类的订阅者,允许针对接口(Interface)的订阅者或者针对
Object
的 “通配符订阅者”
要订阅或检测在没有合适事件订阅者存在的事件
传统的方式:在每个事件调度方法上添加代码(可能使用 AOP)
EventBus:订阅 DeadEvent
,随后 EventBus
将会通知你任何已经已经生产但是没有消费交付的事件(便与调试)
生产者(For Producers)
跟踪订阅者处理事件
传统的方式:写代码管理订阅者列表,包括同步,或者使用公共类例如
EventListenerList
EventBus:EventBus
已经为你实现了
派发事件给订阅者
传统的方式:显式实现一个向各个事件订阅者分配事件的方法,包括错误隔离和异步(如果希望如此)
EventBus:传递消息对象给一个 EventBus
实例的
EventBus.post(Object)
方法
术语
EventBus 系统和代码使用以下术语来讨论事件分发
术语
描述
Event
任何可能会被传递给一个总线的对象
Subcribing
注册一个 listener 给
EventBus
的行为,以便其处理方法将会接收事件
Listener
一个希望接收事件的对象,暴露处理方法(handler
method)
Handler method
一个 public 方法,EventBus
可以使用该方法处理事件;使用 @Subcribe
注解进行标记
Posting an event
确保事件通过 EventBus
被任意订阅者接收
FAQ
为什么我必须创建自己的 EventBus
而不是使用单例
EventBus
并没有明确指出你如何使用它;对于针对每个组件的事件总线并没有禁止使用各自独立的实例,也没有禁止针对不同上下文、主题的时间使用独立的实例;如果进行拆分,会使得在测试中创建、删除
EventBus
对象更加容易、
当然,如果你想有一个大而全逻辑的 EventBus
单例,并不会阻止你这样做,只需让您的容器(例如 Guice)在全局范围内将
EventBus
创建为一个单例(或者将其存储为一个静态变量,如果想这样做的话)
总而言之,EventBus
不是一个单例是因为我们不想做出这种决定,你可以根据喜好决定如何使用
我可以向 EventBus
取消注册某个订阅者吗
可以,使用
EventBus.unregister
;只不过我们发现这种需求很少见:
大部分订阅者都在启动时进行注册或者懒加载,并在应用程序的生命周期中持续存在
特殊范围(scope-specific)的 EventBus
实例可以处理临时的事件分发(例如在请求范围的对象之间分发事件)
对于测试,EventBus
实例可以很容易地创建或丢弃,从而无需显式注销
为什么使用注解来标记订阅方法,而不是订阅者实现接口
我们认为 @Subscribe
注解可以传达真实的意图,就像实现接口一样明确(或者更明确),同时让您可以自由地将事件订阅者方法放置在任何您希望的位置,并为它们提供需要的名称
传统的 JavaEvents
使用侦听器接口,该接口通常只运行少数方法——通常是一个,这有许多缺点:
任何一个类都只能实现对给定事件的单个响应
订阅者接口方法可能发生冲突
方法必须以事件命名(例如 handleChangeEvent
),而不是以其目的(例如 recordChangeInJournal
)命名
每种事件都有他们自己的接口,没有一个针对事件族的通用父类接口
干净清晰地实现会存在困难,解决困难导致了一种模式,即使用简单的匿名类来实现订阅器接口
比较如下两个 case
1 2 3 4 5 6 7 8 9 class ChangeRecorder { void setCustomer (Customer cust) { cust.addChangeListener(new ChangeListener () { public void customerChanged (ChangeEvent e) { recordChange(e.getChange()); } }; } }
对比
1 2 3 4 5 6 class EventBusChangeRecorder { @Subscribe public void recordCustomerChange (ChangeEvent e) { recordChange(e.getChange()); } }
事实上在第二种情况下,意图更清晰:冗余代码(noise
code)更少,事件订阅者有一个清晰而有意义的名称
如果我注册了一个没有任何订阅者方法的侦听器,会发生什么?
什么都不会发生
EventBus
总是设计和容器、模块系统集成,在这些情况下,让容器、工厂、环境将每个创建的对象传递给EventBus
的 register
方法是很方便的
这样,容器、工厂、环境创建的任何对象都可以通过公开订阅者方法挂接到系统的事件模型中
在编译时可以检测到哪些 Event Bus 问题?
Java 的类型系统可以明确检测到的任何问题
例如为不存在的事件类型定义订阅者方法
哪些 Event Bus 问题可以在注册时立即检测到?
调用 register
后,将立即检查正在注册的侦听器的订阅者方法的格式是否正确
特别是,任何用 @Subscribe
标记的方法都必须只接受一个参数
任何违反此规则的行为都将引发
IllegalArgumentException
(这个检查可以转移到使用 APT
的编译时间,这是我们正在研究的一个解决方案)
哪些 Event Bus 问题只能在稍后的运行时检测到?
如果组件 post
了没有注册侦听器的事件,则可能指示错误(通常指示您错过了
@Subscribe
注释,或者侦听组件未加载)
(请注意,这并不一定意味着有问题;在许多情况下,应用程序会故意忽略发布的事件,尤其是当事件来自您无法控制的代码时)
要处理此类事件,请为 DeadEvent
类注册一个订阅者方法,每当 EventBus
接收到没有注册订阅者的事件时,它都会将其转换为 DeadEvent
并按您的方式传递,从而允许您记录它或以其他方式恢复
1 2 3 4 5 6 7 8 9 public class EventListener { @Subscribe public void listenerDeadEvent (DeadEvent deadEvent) { System.out.println("EventListener#listenerDeadEvent -> " + deadEvent.getEvent()); } }
我如何测试消息订阅者和他们的订阅方法?
因为侦听器类上的订阅者方法是普通方法,所以您可以简单地从测试代码中调用它们来模拟
EventBus
源码
EventBus
EventBus
承载了订阅信息和 post
事件的能力,实例和实例之间的信息互相隔离
1 2 3 4 5 6 7 8 9 10 EventBus( String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) { this .identifier = checkNotNull(identifier); this .executor = checkNotNull(executor); this .dispatcher = checkNotNull(dispatcher); this .exceptionHandler = checkNotNull(exceptionHandler); }
从构造器来看,主要有以下主要的初始属性:
identifier:该 EventBus
实例的简称标识
executor:执行器;默认是一个直接执行器
dispatcher:调度器;默认是单线程队列调度
exceptionHandler:异常处理器
EventBus
还有一个子类 AsyncEventBus
区别在于可以设置 Executor
,Dispatcher
默认为 LegacyAsyncDispatcher
1 2 3 public AsyncEventBus (Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) { super ("default" , executor, Dispatcher.legacyAsync(), subscriberExceptionHandler); }
register 注册订阅方法
1 2 3 4 5 6 public class Run { public static void main (String[] args) { EventBus eventBus = new EventBus (); eventBus.register(new EventListener ()); } }
该方法用于注册订阅方法,即处理事件的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void register (Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null ) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet <>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
这里使用了 Guava 包下的 cache 工具
直接看扫描订阅方法的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private static ImmutableList<Method> getAnnotatedMethodsNotCached (Class<?> clazz) { Set<? extends Class <?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { Class<?>[] parameterTypes = method.getParameterTypes(); checkArgument( parameterTypes.length == 1 , "Method %s has @Subscribe annotation but has %s parameters. " + "Subscriber methods must have exactly 1 parameter." , method, parameterTypes.length); checkArgument( !parameterTypes[0 ].isPrimitive(), "@Subscribe method %s's parameter is %s. " + "Subscriber methods cannot accept primitives. " + "Consider changing the parameter to %s." , method, parameterTypes[0 ].getName(), Primitives.wrap(parameterTypes[0 ]).getSimpleName()); MethodIdentifier ident = new MethodIdentifier (method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }
主要流程:
获取 Object
对象的类型列表
遍历方法
扫描出 Subscribe
方法,校验参数列表
包装为 MethodIdentifier
保存
post 发布事件
1 2 3 4 5 6 7 8 9 public class Run { public static void main (String[] args) { EventBus eventBus = new EventBus (); eventBus.register(new EventListener ()); eventBus.post(1 ); eventBus.post("1" ); eventBus.post(new Object ()); } }
发布事件的操作其实就包含了对订阅者的调用
1 2 3 4 5 6 7 8 9 10 11 12 public void post (Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { post(new DeadEvent (this , event)); } }
不同的调度器有不同的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override void dispatch (Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event (event, subscribers)); if (!dispatching.get()) { dispatching.set(true ); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null ) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } }
最终调用 Subscriber
的 dispatchEvent
方法,通过反射进行执行
1 2 3 4 5 6 7 8 9 10 11 12 13 final void dispatchEvent (final Object event) { executor.execute( new Runnable () { @Override public void run () { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
Dispatcher 调度器
调度器用来改变事件发布后调用订阅方法的顺序
有三种实现:
PerThreadQueuedDispatcher:每个线程内的事件都按照发布顺序调用订阅者;广度优先
LegacyAsyncDispatcher:单个全局队列中发布的事件进行排队;广度优先
ImmediateDispatcher:事件发布后立即将事件调度给订阅者,而不使用中间队列来更改调度顺序;深度优先
PerThreadQueuedDispatcher
每个线程内的事件都按照发布顺序调用订阅者;广度优先
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 private static final class PerThreadQueuedDispatcher extends Dispatcher { private final ThreadLocal<Queue<Event>> queue = new ThreadLocal <Queue<Event>>() { @Override protected Queue<Event> initialValue () { return Queues.newArrayDeque(); } }; private final ThreadLocal<Boolean> dispatching = new ThreadLocal <Boolean>() { @Override protected Boolean initialValue () { return false ; } }; @Override void dispatch (Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event (event, subscribers)); if (!dispatching.get()) { dispatching.set(true ); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null ) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } ... }
PerThreadQueuedDispatcher
隔离了线程,每个线程的事件都在一个队列下保证调度
同时使用一个线程隔离的标记 dispatching
当前线程的事件队列是否正在调度中
这样就达到了广度优先的效果,事件 A 和事件 B =>
[a1,a2,a3,b1,b2,b3]
LegacyAsyncDispatcher
单个全局队列中发布的事件进行排队;广度优先
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private static final class LegacyAsyncDispatcher extends Dispatcher { private final ConcurrentLinkedQueue<EventWithSubscriber> queue = Queues.newConcurrentLinkedQueue(); @Override void dispatch (Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber (event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null ) { e.subscriber.dispatchEvent(e.event); } } ... }
使用了一个全局事件队列 ConcurrentLinkedQueue
来保证顺序
以此在异步的过程中同样达到消息订阅广度的效果
事件 A 和事件 B => [a1,a2,a3,b1,b2,b3]
事件发布后立即将事件调度给订阅者,而不使用中间队列来更改调度顺序;深度优先
1 2 3 4 5 6 7 8 9 10 11 private static final class ImmediateDispatcher extends Dispatcher { private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher (); @Override void dispatch (Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { subscribers.next().dispatchEvent(event); } } }
其实就是事件发布后拿到订阅列表立即调度
区别排队的机制,消息和消息之间不保证先后顺序
事件 A 和事件 B 之间有可能是 [a1,a2,a3,b1,b2,b3],也有可能是
[a1,a2,b1,b2,a3,b3],也可能是 [a1,b1,b2,b3,a2,a3]
参考
EventBusExplained
· google/guava Wiki · GitHub