深入解读 EventBus 3.0 源码

EventBus 是一款非常优秀的 Android 事件订阅/发布框架,可以非常快速、简洁地实现组件间的通信,并且高度解耦。大致思想就是一个发布者在发布了事件后,所有订阅了这个类型事件的订阅者都可以接收到事件。本人本着刨根问底的优良传统,果断下载了一份源码(源码地址:https://github.com/greenrobot/EventBus/)进行学习,下面将按照使用的过程来解读源码。

使用方法

首先在对应模块的 build.gradle 里面配置:

1
2
3
dependencies {
compile 'org.greenrobot:eventbus:3.0.0'
}

获取实例

在使用之前需要获取一个 EventBus 的实例,代码如下:

1
EventBus.getDefault();

订阅

订阅者需要订阅操作才能接受到事件,订阅代码如下:

1
EventBus.getDefault().register(this);

在合适的地方调用此方法,注意要和取消订阅方法成对出现。

订阅方法

订阅者在订阅后需要实现一个订阅方法来接收发布者发布的事件,使用 @Subscribe 注解即可,代码如下:

1
2
3
4
@Subscribe
public void event(String str) {
// 这里处理业务逻辑
}

对于 @Subscribe 注解有三个参数可以设置,也可以不设置使用默认值:

  • threadMode:一个枚举类型,表示方法执行的线程,有四个值可选:ThreadMode.POSTING(默认值)、ThreadMode.MAIN、ThreadMode.BACKGROUND、ThreadMode.ASYNC。ThreadMode.POSTING 表示方法执行的线程和发布者发布消息时所在线程一致;ThreadMode.MAIN 表示无论发布者发布消息时是哪个线程,方法都将在 Android 主线程(也即UI线程)中被调用;ThreadMode.BACKGROUND 表示当发布者发布消息的线程不是主线程时,方法将会在一个単例的后台线程执行;ThreadMode.ASYNC 表示方法将会在一个新的后台线程执行,既不是主线程,也不是发布者发布消息的线程
  • sticky:表示是否接受粘性事件,默认为 false,如果设置为 true,则可以在发送粘性事件之后在进行订阅操作,依然可以接收事件
  • priority:表示事件的优先级,默认为 0,值越大优先级越大
  • 方法参数:表示事件的类型,有且仅有一个参数

事件发布

发布者在需要的地方发布事件,订阅了相同事件类型的订阅者便可以接收到,代码如下:

1
EventBus.getDefault().post("Test");

也可以发送粘性事件,如果在注解中指定 sticky 为 true,则可以延迟接收事件(先发布事件再订阅,在订阅的时候接收到事件),代码如下:

1
EventBus.getDefault().postSticky("Test");

移除粘性事件

如果不想接收某个粘性事件而粘性事件已经发布,则可以通过移除操作将其移除,代码如下(MessageEvent 是某个事件类型):

1
2
3
4
MessageEvent stickyEvent = EventBus.getDefault().getStickyEvent(MessageEvent.class);
if(stickyEvent != null) {
EventBus.getDefault().removeStickyEvent(stickyEvent);
}

取消订阅

订阅者如果不想再接收事件,需要进行取消订阅操作,代码如下:

1
EventBus.getDefault().unregister(this);

运行加速

EventBus 提供了一个选项用于加速,首先在对应模块的 build.gradle 里面配置:

1
2
3
4
5
6
7
8
9
dependencies {
annotationProcessor 'org.greenrobot:eventbus-annotation-processor:3.0.0'
}

apt {
arguments {
eventBusIndex "com.example.myapp.MyEventBusIndex"
}
}

其中 com.example.myapp.MyEventBusIndex 是一个自动生成的索引类,类名可以自定义,然后在获取实例的时候改成如下代码便可实现加速:

1
EventBus.builder().addIndex(new MyEventBusIndex()).build();

获取实例源码解析

在使用 EventBus 之前要获取实例,我们先来看看 getDefault() 方法及其相关方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder(); // EventBusBuilder 的默认对象
static volatile EventBus defaultInstance; // 单例模式的对象,使用 volatile 关键字保证多线程同时操作 defaultInstance 时其成员变量不会出现不一致的问题

public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}

public EventBus() {
this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}

这里使用了一个単例模式获取实例,并且最终会调用一个带有 EventBusBuilder 参数的构造方法,里面进行各种参数初始化。我们先不管这些参数的具体含义是什么,首先去看一看 EventBusBuilder 是什么样的,有这样两个方法返回 EventBus 的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
public EventBus installDefaultEventBus() {
synchronized (EventBus.class) {
if (EventBus.defaultInstance != null) {
throw new EventBusException("Default instance already exists." + " It may be only set once before it's used the first time to ensure consistent behavior.");
}
EventBus.defaultInstance = build();
return EventBus.defaultInstance;
}
}

public EventBus build() {
return new EventBus(this);
}

这里我们大概知道了它使用了建造者模式,通过 EventBusBuilder 初始化各种参数,然后给 EventBus 赋值。因此获取实例其实有很好几种方式,如下:

  • 获取默认配置的単例
1
EventBus.getDefault();
  • 获取默认配置的新的实例,这种方式不能执行两次
1
EventBus.builder().installDefaultEventBus();
  • 获取新的实例,并且可以根据需求进行参数设置
1
EventBus.builder().build();

订阅源码解析

订阅者要想接收发布者发布的事件,首先要进行订阅操作,首先我们来看看 register() 方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private final SubscriberMethodFinder subscriberMethodFinder;

public void register(Object subscriber) {
// 找到指定订阅者的订阅方法,转换为 SubscriberMethod 对象集合
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);

// 对每个 SubscriberMethod 对象进行订阅操作
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

主要做了两件事情:寻找订阅方法并转换为 SubscriberMethod 对象集合、遍历 SubscriberMethod 对象集合进行订阅操作。

这里出现了一个名为 SubscriberMethod 的类,它其实就是对应订阅方法的实体类,里面保存了订阅方法的信息,其主要属性如下:

1
2
3
4
5
6
final Method method; // 订阅方法的 Method 对象
final ThreadMode threadMode; // 线程模型,对应 @Subscriber 注解中的 ThreadMode 的值
final Class<?> eventType; // 事件类型,订阅方法的参数的 Class 对象
final int priority; // 事件的优先级,对应 @Subscriber 注解中的 priority 的值
final boolean sticky; // 是否接受粘性事件,对应 @Subscriber 注解中的 sticky 的值
String methodString; // 订阅方法的名称

查找订阅方法

订阅的第一步是查找订阅方法,这里使用了一个叫 SubscriberMethodFinder 的对象方法 findSubscriberMethods() 进行查找操作,可以发现之前在获取 EventBus 实例的时候对 SubscriberMethodFinder 进行了初始化操作:

1
2
3
4
5
EventBus(EventBusBuilder builder) {
...
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex);
...
}

这里的三个参数取的是 EventBuilder 的参数,代表的含义如下:

  • subscriberInfoIndexes:List——订阅对象的索引集合,默认值为 null
  • strictMethodVerification:是否进行严格的方法验证,默认值为 false
  • ignoreGeneratedIndex:是否忽略注解处理器生成的自定义类,默认值为 false

这些参数都可以在创建 EventBus 实例的时候通过 EventBusBuilder 设置,我们先跳过具体的用途,到了该用到的时候自然就会明白。

接下来看看 findSubscriberMethods() 方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 从缓存从读取要找的对象集合,如果有直接返回
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

// 是否忽略注解处理器生成的自定义类
if (ignoreGeneratedIndex) {
// 利用反射机制获取订阅者的 SubscriberMethod 集合
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 从注解处理器生成的自定义类中获取订阅者的 SubscriberMethod 集合
subscriberMethods = findUsingInfo(subscriberClass);
}

// 如果订阅者的 SubscriberMethod 集合为空,则抛出异常
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 否则存入缓存
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

这个方法的目的很明确,即把订阅者的所有订阅方法转换成 SubscriberMethod 对象集合,转换过程通过 ignoreGeneratedIndex 字段判断使用哪种方式。从代码中还可以看出我们在使用过程中,如果一个类里面进行了注册操作而没有实现订阅方法话就会抛出异常。并且代码中使用了缓存,提高了程序运行效率。

反射查找

我们先看忽略注解处理器的情况,利用反射机制获取订阅者的 SubscriberMethod 集合,findUsingReflection() 方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
// 获取 FindState 实例并初始化
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);

// 这里有一个循环,只要 FindState 的 clazz 对象不为 null,将会一直循环,其实这里就是把包括 clazz 本身及其所有父类都遍历了
while (findState.clazz != null) {
// 通过反射查找订阅方法
findUsingReflectionInSingleClass(findState);
// 移动至父类查找
findState.moveToSuperclass();
}

// 返回找到的 SubscriberMethod 对象集合并做了一些释放资源的操作
return getMethodsAndRelease(findState);
}

这个方法虽然精简,但却包含了几个关键的方法,这几个方法是整个查找订阅方法过程中的主要逻辑判断和处理。

首先我们来看一看 FindState,这是一个内部类,作用是保存查找到的 SubscriberMethod 对象集合的状态,通俗地说就是记录查找 SubscriberMethod 对象集合过程中产生的一些数据。这里我们先看两个方法,这两个方法是相互呼应的,一个是生成 FindState 实例,一个是 FindState 中保存的 SubscriberMethod 对象集合返回并且还原 FindState 的状态,下面来看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static final int POOL_SIZE = 4;
private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];

private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}

private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
// FindState 状态还原
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}

这里使用了对象池的思维,需要 FindState 对象时从对象池去取,没有对象则新建实例返回。同样地,当 FindState 对象用完后把其状态还原放入对象池中。这样做,可以在并发编程时减少对象实例化的次数,达到对象复用的目的。

接着看 findUsingReflection() 方法,首先调用了 FindState 的 initForSubscriber() 方法,代码如下:

1
2
3
4
5
6
7
8
9
10
Class<?> subscriberClass; // 不明觉厉
Class<?> clazz; // 订阅者的 Class 对象
boolean skipSuperClasses; // 是否跳过父类的查找
SubscriberInfo subscriberInfo; // 订阅方法消息的接口,此参数与注解处理器生成的索引类有关

void initForSubscriber(Class<?> subscriberClass) {
this.subscriberClass = clazz = subscriberClass;
skipSuperClasses = false;
subscriberInfo = null;
}

这里我一直有个没明白的地方,就是 subscriberClass 这个参数有什么用,我发现代码里面没有什么地方用到了,并且将其注释再运行程序没有任何问题,所以我们先忽略这个参数。初始化完成后就是循环查找订阅者本身及其所有父类的订阅方法了,有两个方法:findUsingReflectionInSingleClass() 和 FindState 的 moveToSuperclass()。从字面意思大概可以明白第一个方法就是关键所在,它是查找 SubscriberMethod 对象集合的核心代码,传入一个 FindState 对象,把查找过程中产生的数据保存。我们先看第二个方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void moveToSuperclass() {
// 如果 skipSuperClasses 那么循环走完
if (skipSuperClasses) {
clazz = null;
} else {
// 否则获取其父类的 Class 对象
clazz = clazz.getSuperclass();
// 如果父类是 java 或者 android 包下的,则循环走完
String clazzName = clazz.getName();
if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
clazz = null;
}
}
}

这个方法逻辑很简单,有两种情况会终止循环,至于 skipSuperClasses 何时为 true 在后面分析。从这个方法还可以看出使用 EventBus 的时候,如果某个类没有进行注册操作,但其子类进行了注册操作,那么它依然可以使用实现订阅方法接收事件。

知道什么时候循环会完成了,我们回去看一看真正的核心方法 findUsingReflectionInSingleClass(),代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 使用 getDeclaredMethods() 获取订阅者 Class 对象本身的方法,不包含继承的方法,效率快
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// 如果无法获取则使用 getMethods(0 获取订阅者 Class 对象的所有方法,包含其继承的所有方法,也就是所有父类的方法,效率低
methods = findState.clazz.getMethods();
// 如果获取到了父类的方法,则不需要查找父类了,字段设为 true
findState.skipSuperClasses = true;
}

for (Method method : methods) {
int modifiers = method.getModifiers();
// 保证订阅方法的修饰符不能为 private、abstract、static,并且如果方法名是通过注解处理器生成的,则必须为 public
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
// 保证订阅方法只有一个参数
if (parameterTypes.length == 1) {
// 保证订阅方法必须使用 @Subscribe 注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// 检查这个方法能否被加入
if (findState.checkAdd(method, eventType)) {
// 找到一个订阅方法,则生成对应的 SubscriberMethod 对象,并保存入 FindState 中
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

这里逻辑处理较多,注释在代码里面,只有满足以下条件才会被当做订阅方法处理:

  • 方法必须带有 @Subscribe 注解
  • 方法必须有且只有一个参数
  • 方法的修饰符不能为 private、abstract、static,如果方法是通过注解处理器生成的,则必须是 public 的
  • 方法不能被重复查找

这里还有一个小逻辑,就是对于字段 strictMethodVerification 的判断,如果为 true,那么在不满足上述条件后,对应的带有 @Subscribe 注解的方法(普通方法当然不会报错)会提前抛出异常,如果开发过程中需要非常精确地定位错误,那么建议可以设置这个字段为 true。

逻辑中还有一个 FindState 的对象方法 checkAdd()用于检测是否重复,来看一看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
final Map<Class, Object> anyMethodByEventType = new HashMap<>(); // 某个类型的事件对应的方法
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>(); // 方法签名对应的方法定义所在类
final StringBuilder methodKeyBuilder = new StringBuilder(128); // 作为方法签名的字符串缓存

boolean checkAdd(Method method, Class<?> eventType) {
// 将某个类型的事件对应的 Method 对象放入 Map 中,如果之前没有放入过任何对象,直接返回 true
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
// 如果之前存在 Method 对象,则去检查方法的签名
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
throw new IllegalStateException();
}
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}

private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
// 拼接方法签名,内容是(方法名 + > + 事件的类型名称)
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = methodKeyBuilder.toString();

// 将某个方法签名对应的定义该方法所在的类的 Class 对象放入 Map 中
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
// 如果之前没有放入过任何 Class 对象,或者之前的 Class 对象是现在 的 Class 对象的子类或者就是其本身,则表明可以被添加
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
return true;
} else {
// 否则这个方法不能被添加,并且还原之前的 Class 对象
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}

这里的两个方法比较绕,具体有什么作用呢?其实是做了两层判断:第一层是判断同一类型的事件有多少个订阅方法,第二层是判断方法签名,具体作者的用意我也没太搞明白,但我理解了大概有两种情况:

  • 第一种情况:一个订阅者有多个订阅方法,方法名不同,但它们的参数类型(事件类型)都是相同的,那么遍历这些方法的时候,会多次调用到 checkAdd() 方法,由于 existing 不为 null,那么会进而调用 checkAddWithMethodSignature 方法,但是由于每个方法的名字都不同,因此 methodClassOld 会一直为 null,因此都会返回 true。也就是说,允许一个订阅者有多个参数类型(事件类型)相同的订阅方法。
  • 第二种情况:继承的情况,每个订阅者都是有相同订阅方法,换句话说,继承者的订阅方法继承并重写,它们都有着一样的方法签名。方法的遍历会从继承者开始。在 checkAddWithMethodSignature 方法中,methodClassOld 为 null,那么继承者的订阅方法会被添加到列表中。接着,向上找到被继承者的订阅方法,由于 methodClassOld 不为 null 而且显然继承者不是被继承者的父类,methodClassOld.isAssignableFrom() 也会返回 false。也就是说,如果一个订阅者是继承并重写了订阅方法,那么只会允许添加继承者的订阅方法,被继承者的订阅方法会被忽略。

索引查找

对于反射查找订阅方法的相关源码已经分析完了,接下来看看通过注解处理器生成的自定义索引类查找订阅方法的过程。我们回到 findSubscriberMethods() 方法,如果 ignoreGeneratedIndex 为 false 则会调用 findUsingInfo() 方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// 获取 FindState 实例并初始化
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);

// 这里有一个循环,只要 FindState 的 clazz 对象不为 null,将会一直循环,其实这里就是把包括 clazz 本身及其所有父类都遍历了
while (findState.clazz != null) {
// 获取 SubscriberInfo 接口对象,如果不为 null 则通过这个接口获取 SubscriberMethod 对象集合
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 否则通过反射查找订阅方法l
findUsingReflectionInSingleClass(findState);
}
// 移动至父类查找
findState.moveToSuperclass();
}

// 返回找到的 SubscriberMethod 集合并做了一些释放资源的操作
return getMethodsAndRelease(findState);
}

可以看到这个方法和之前的 findUsingReflection() 方法的结构相似,只是循环的内容不一样,这部分内容就是通过一个索引类查找订阅方法(索引类通过注解处理器生成),要搞明白这部分内容,需要先了解几个新出现的接口和类:

  • 接口 SubscriberInfo:一个可以获取订阅方法相关信息的接口,有四个方法,分别是 getSubscriberClass()(获取订阅者的 Class 对象)、getSubscriberMethods()(获取 SubscriberMethod 对象集合)、getSuperSubscriberInfo()(获取订阅者父类的 SubscriberInfo 接口对象)、shouldCheckSuperclass()(获取是否应该检查父类的布尔值)
  • 抽象类 AbstractSubscriberInfo:实现了 SubscriberInfo 接口,并且重写了其中 getSubscriberClass()、getSuperSubscriberInfo()、shouldCheckSuperclass() 方法
  • 类 SimpleSubscriberInfo:继承自 AbstractSubscriberInfo,并且重写了 getSubscriberMethods() 方法
  • 类 SubscriberMethodInfo:对于订阅方法封装的实体类,和之前的 SubscriberMethod 的区别是没有 Method 对象这个成员变量,它和 SubscriberMethod 之间是可以转换的
  • 接口 SubscriberInfoIndex:可以获取 SubscriberInfo 接口对象,只有一个方法 getSubscriberInfo()
  • 索引类 MyEventBusIndex:通过注解处理器自动生成的(如何生成文章开头已经提及),实现了 SubscriberInfoIndex 接口

我们先来看看 getSubscriberInfo() 这个方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private SubscriberInfo getSubscriberInfo(FindState findState) {
// 检查此类不能是其父类
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}

// 检查 subscriberInfoIndexes 这个参数是否为空,不为空则获取 SubscriberInfo 接口对象
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}

// 若没有获取到 SubscriberInfo 接口对象返回 null
return null;
}

这个方法的作用是获取 SubscriberInfo 接口对象,并且做了两个判断,这里有个不明白的地方,根据代码跟踪发现 getSuperSubscriberInfo() 永远都是 null,所以主要是检查 subscriberInfoIndexes 这个参数是否为空,那这个参数是怎么来的呢?还记得文章开头提到的运行加速,代码如下:

1
EventBus.builder().addIndex(new MyEventBusIndex()).build();

如果加入了这行代码,会对这里的 subscriberInfoIndexes 参数进行赋值,类型是 SubscriberInfoIndex 接口,我们来看看这个索引类的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MyEventBusIndex implements SubscriberInfoIndex {

private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();

// 这里我在 MainActivity 里面定了一个 event() 方法,参数类型为 String
putIndex(new SimpleSubscriberInfo(MainActivity.class, true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("event", String.class),
}));
}

private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}

@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}

}

这个类对于不同的使用方法会有所不同,可以看出它会记录订阅方法的相关信息,实现很简单,不再赘述。

对于索引查找订阅方法,有下面几个关系可以总结:

  • SubscriberInfoIndex 接口可以获取 SubscriberInfo 接口对象
  • SubscriberInfo 接口可以获取 SubscriberMethod 对象(订阅方法对象)集合
  • MyEventBusIndex 类中有一个 Map,记录了订阅者的 SubscriberInfo 接口对象(实际上记录的是 SimpleSubscriberInfo,而 SimpleSubscriberInfo 继承于 AbstractSubscriberInfo,AbstractSubscriberInfo 实现了 SubscriberInfo 接口)

从刚刚分析的代码可以看出如果不去设置运行加速的配置,那么 subscriberInfoIndexes 这个参数永远为 null,FindState 的成员变量 subscriberInfo 也会永远为 null,循环永远都是走 findUsingReflectionInSingleClass() 这个方法,也就是通过反射查找,这也解释了为什么叫加速,因为反射的性能远远不如普通代码。

如果设置了运行加速的配置,则会通过 SubscriberInfoIndex 接口获取 SubscriberInfo 的接口对象,然后通过 SubscriberInfo 获取 SubscriberMethod 对象集合,其中也进行了同反射查找相同的检查。

订阅

到了这里寻找订阅方法就完成,接下来就是进行真正的订阅操作,回到之前的代码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private final SubscriberMethodFinder subscriberMethodFinder;

public void register(Object subscriber) {
// 找到指定订阅者的订阅方法,转换为 SubscriberMethod 对象集合
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);

// 对每个 SubscriberMethod 对象进行订阅操作
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

订阅操作主要是调用了 subscribe() 方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; // 事件类型对应的订阅方法
private final Map<Object, List<Class<?>>> typesBySubscriber; // 订阅者对应的事件类型

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// Key 是事件类型,Value 是 Subscription 对象集合,把它们装进 Map 中
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
// 如果集合中已经存在这个订阅方法,则抛出异常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}

// 装入 Map 的操作,注意这里进行了排序处理,priority 值越高会排在前面,事件发布时也会优先
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

// Key 是订阅者的 Class 对象,Value 是事件类型的集合
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);

// 省略一部分代码
...
}

这里只截取了一段代码,剩下的代码是处理粘性事件的,暂时不作讨论。这个方法主要用到了两个成员变量,subscriptionsByEventType 和 typesBySubscriber,在构造函数中对这两个参数进行了初始化:

1
2
3
4
5
EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
...
}

回到订阅操作的方法,其主要作用就是对两个成员变量(subscriptionsByEventType 和 typesBySubscriber)进行赋值操作,需要注意的是,如果对于某个类型的事件的订阅方法已经订阅,再次订阅会产生错误,也就是同一个订阅者不能调用两次 register() 方法(当然是在没有取消订阅的情况下)。

这里还出现了一个新的类: Subscription,这是类是对于 SubscriberMethod 对象(订阅方法的实体类)的进一步封装,增加了两个属性,其中一个表明这个订阅方法属于哪一个订阅者,成员变量代码如下:

1
2
3
final Object subscriber; // 订阅者
final SubscriberMethod subscriberMethod; // 订阅方法
volatile boolean active; // 是否是活动的

事件发布源码解析

当订阅者订阅了一个事件后,任何角色都可以发送事件,如果发送的事件和订阅的事件类型一致,则订阅者可以接收到这个事件。接下来看看事件发送的源码,首先看看 post() 方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 使用 ThreadLocal 让不同线程拥有不同的 PostingThreadState 对象副本,线程之间互不影响
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};

public void post(Object event) {
// 取当前线程的 PostingThreadState 对象,并把要发送的事件加入队列
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

// 开始事件的发布
if (!postingState.isPosting) {
// 判断当前线程是否为主线程
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 循环发布事件队列中的事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 状态重置
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

这里有一个新出现的类 PostingThreadState,这个类主要记录事件发布的状态,其字段如下:

1
2
3
4
5
6
final List<Object> eventQueue = new ArrayList<Object>(); // 要发布事件的对象集合,作为事件队列
boolean isPosting; // 是否在发布中
boolean isMainThread; // 发布事件所在的线程是否主线程
Subscription subscription; // 要接收事件的订阅方法
Object event; // 要发布的事件
boolean canceled; // 是否取消事件发布

回到 post() 方法,首先把 PostingThreadState 对象装进了 ThreadLocal,这是由于 post() 方法可以在任意线程并发调用,这样做可以有效隔离各线程事件发布的状态(其实内部机制就是对于不同的线程各自创建了一个新的 PostingThreadState 实例副本)。然后把事件放入了一个队列,通过循环发布每一个事件,调用了 postSingleEvent() 方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private final boolean logNoSubscriberMessages; // 是否打印未找到订阅方法的消息,默认为 true
private final boolean sendNoSubscriberEvent; // 是否发布 NoSubscriberEvent 事件,默认为 true
private final boolean eventInheritance; // 发布事件是否具有继承性,默认为 true

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;

// 发布事件是否具有继承性
if (eventInheritance) {
// 如果具有继承性,则找到该事件的所有父类事件,然后逐一发布
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 如果没有继承性,则只发布该事件本身
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}

// 如果未找到对应事件的订阅方法,会根据 sendNoSubscriberEvent 发布一个 EventBus 定义的事件 NoSubscriberEvent
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

这个方法主要是确认真正要发布的事件,判断了 eventInheritance 这个字段,默认为 true,从这里可以看出在使用的时候如果 eventInheritance(在构造 EventBus 实例时可以进行初始化,前面已经提及)设为 true,则会把发布事件的所有父类事件全部发布,例如:事件 A 继承于 B,那么发布一个 A 事件,订阅方法的事件类型为 A 和 B 都可以接收到这个事件。查找要发布的事件调用了以下的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>(); // 某个事件对应的所有父类事件

private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}

static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}

代码比较简单,就是找其父类,不再详细分析。在确认了要发布的事件后,调用了 postSingleEventForEventType() 方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; // 事件类型对应的订阅方法
private final Map<Object, List<Class<?>>> typesBySubscriber; // 订阅者对应的事件类型

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
// 根据事件类型获取对应的 Subscription 对象集合(进行过订阅操作的订阅方法)
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}

// 进行真正的事件发布动作,发布完成后则重置 PostingThreadState 对象的状态
if (subscriptions != null && !subscriptions.isEmpty()) {
// 对于每一个订阅方法都进行调用
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

这段代码也较简单,用到了之前分析过的两个成员变量,直接看 postToSubscription() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private final HandlerPoster mainThreadPoster; // 用于在主线程中调用订阅方法
private final BackgroundPoster backgroundPoster; // 用于在子线程中调用订阅方法
private final AsyncPoster asyncPoster; // 用于在新的线程中调用订阅方法

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 根据订阅方法的线程模型来确定订阅方法调用的方式(线程模型的概念和使用前面已经提及)
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 不管发布事件处于什么线程,都在发布事件的线程调用订阅方法
invokeSubscriber(subscription, event);
break;
case MAIN:
// 如果发布事件的线程是主线程则直接调用订阅方法
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
// 否则会使用 mainThreadPoster 调用订阅方法
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
// 如果发布事件的线程是主线程则使用 backgroundPoster 调用订阅方法
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
// 否则直接调用订阅方法
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 不管发布事件处于什么线程,都使用 asyncPoster 调用订阅方法
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

这段代码可以说是 EventBus 的核心了,一个一个看,先看看在当前线程直接调用订阅方法的 invokeSubscriber() 方法,代码如下:

1
2
3
4
5
6
7
8
9
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

这个代码很简单了,直接拿上对应订阅方法的 Method 对象进行反射调用,如果抛出 InvocationTargetException 错误则调用一个 handleSubscriberException() 方法来处理错误,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private final boolean throwSubscriberException; // 是否抛出 EventBusException 异常,默认为 true
private final boolean logSubscriberExceptions; // 是否打印调用方法失败的异常消息,默认为 true
private final boolean sendSubscriberExceptionEvent; // 是否发布 SubscriberExceptionEvent 事件,默认为 true

private void handleSubscriberException(Subscription subscription, Object event, Throwable cause) {
// 如果这个事件是由异常引起的则打印一些消息
if (event instanceof SubscriberExceptionEvent) {
if (logSubscriberExceptions) {
Log.e(TAG, "SubscriberExceptionEvent subscriber " + subscription.subscriber.getClass() + " threw an exception", cause);
SubscriberExceptionEvent exEvent = (SubscriberExceptionEvent) event;
Log.e(TAG, "Initial event " + exEvent.causingEvent + " caused exception in " + exEvent.causingSubscriber, exEvent.throwable);
}
} else {
// 否则根据各个字段进行相应的处理
if (throwSubscriberException) {
throw new EventBusException("Invoking subscriber failed", cause);
}
if (logSubscriberExceptions) {
Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), cause);
}
if (sendSubscriberExceptionEvent) {
SubscriberExceptionEvent exEvent = new SubscriberExceptionEvent(this, cause, event, subscription.subscriber);
post(exEvent);
}
}
}

具体的解释都在注释里了,很简单。在阅读各种线程模式下调用订阅方法的代码之前,要先看一看与之相关的两个类:PendingPost 和 PendingPostQueue。

首先是 PendingPost,我把它理解为一个即将发布的事件的实体类,里面包含了要发布的事件,对应的订阅方法,以及下一个 PendingPost 对象,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final class PendingPost {

private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>(); // 作为对象复用池

Object event; // 要发布的事件
Subscription subscription; // 发布事件对应的订阅方法
PendingPost next; // 下一个 PendingPost 对象

private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}

// 这个方法用来获取一个 PendingPost 对象,会判断对象池里面是否有对象,如果有取出来进行初始化操作,否则新实例化一个对象
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}

// 这个方法是用来释放 PendingPost 对象的,当它用完后调用此方法,释放资源并将其放入对象池中,达到对象复用的目的
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;

// 加上同步锁避免并发问题,并且控制对象池的容量最大值是 10000
synchronized (pendingPostPool) {
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}

}

代码很简单,主要有两个方法,这两个方法应该对应出现,标志着一次事件的发布过程。接着来看一看 PendingPostQueue,这个类看名字就知道是 PendingPost 的队列实现,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
final class PendingPostQueue {

private PendingPost head; // 队列的头部
private PendingPost tail; // 队列的尾部

// 将一个 PendingPost 对象放入队列
synchronized void enqueue(PendingPost pendingPost) {
// 新放入的 PendingPost 对象不能为空
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}

// 进行队列头部和尾部的判断
if (tail != null) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}

// 唤醒被 wait() 阻塞的线程
notifyAll();
}

// 取出队列中的一个 PendingPost 对象
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}

// 等待指定时间后取出队列中的一个 PendingPost 对象,或者在等待过程中如果调用了 enqueue()方法也会取出一个 PendingPost 对象
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}

}

这个类主要是实现 PendingPost 队列的功能,提供入列和出列两个方法,出列有两个重载方法,一个是直接出列,另一个是可以在不清楚队列内是否为空的情况下使用。

分析完了 PendingPost 和 PendingPostQueue,我们回到 postToSubscription() 方法,根据不同的线程模式来调用订阅方法。之前分析过在当前线程直接调用订阅方法的情况,接着看看在 ThreadMode.MAIN 模式下的情况,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private final HandlerPoster mainThreadPoster; // 用于在主线程中调用订阅方法

EventBus(EventBusBuilder builder) {
...
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10); // 进行初始化操作
...
}

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
...
case MAIN:
// 如果发布事件的线程是主线程则直接调用订阅方法
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
// 否则会使用 mainThreadPoster 调用订阅方法
mainThreadPoster.enqueue(subscription, event);
}
break;
...
}
}

以上代码都是与 ThreadMode.MAIN 模式下调用订阅方法相关的(其它的省略了),这里出现了一个新的类 HandlerPoster,它继承自 Handler 类,来看看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
final class HandlerPoster extends Handler {

private final PendingPostQueue queue; // PendingPost 的队列实现
private final int maxMillisInsideHandleMessage; // 在 Handler 消息处理函数中的最大毫秒数,用以标记消息是否重发
private final EventBus eventBus; // EventBus 对象
private boolean handlerActive; // Handler 是否活跃,这个标志主要是控制在上一个消息没有处理结束前无法进行消息处理

HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
// 这里指定消息回调方法 handleMessage() 的回调线程
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}

// 调用订阅方法的流程开始执行
void enqueue(Subscription subscription, Object event) {
// 新建一个 PendingPost 对象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 将 PendingPost 放入队列中
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
// 使用 Handler 发送一个空消息,在回调方法 handleMessage() 中处理逻辑,如果发送失败,抛出异常
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
// 从队列中取出一个 PendingPost 对象,如果为 null,则直接返回终止循环,不会进行接下来操作
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}

// 调用 EventBus 的 invokeSubscriber() 方法,这个稍后再看
eventBus.invokeSubscriber(pendingPost);

// 判断方法调用是否超时,如果超时则重发一个消息,并且终止循环
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
// 如果进行了消息重发,则表明处理仍在进行中,不可进行下一个消息的处理
handlerActive = rescheduled;
}
}

}

这里有个要注意的地方,整个方法调用是一个循环,如果没有达到终止循环的条件(队列中已经没有 PendingPost 对象或者方法调用超时)则会取出下一个 PendingPost 对象继续进行方法的调用,直到取完为止。这里可以看出如果快速连续发布相同的事件,则订阅方法是按照顺序接收事件的,不会出现同时接收或者丢失的情况。

这个类主要使用了 Handler 的特性,调用方法所在的线程关键在于构造器中的 Looper 参数,从之前的代码可以看出使用了 Looper.getMainLooper(),那么不管当前线程情况如何,回调总会发生在主线程中,然后调用了 EventBus 的方法,来看看代码:

1
2
3
4
5
6
7
8
9
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
// 取出需要的参数后进行资源释放
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}

这里代码很简单,其实就是获取 PendingPost 中保存的参数,然后调用 invokeSubscriber() 方法在当前线程调用订阅方法。到这里,如何在 ThreadMode.MAIN 模式下调用订阅方法就分析完了。至于如何在 Thread.BACKGROUND 模式和 Thread.ASYNC 模式下调用订阅方法就不作分析了,因为大部分原理和内容都相似,只不过是使用 Runnable 和 ExecutorService 进行线程的切换。

取消订阅源码解析

如果一个订阅者不想再接收任何事件了,可以取消订阅,其方法 unregister() 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private final Map<Object, List<Class<?>>> typesBySubscriber; // 订阅者对应的事件类型

public synchronized void unregister(Object subscriber) {
// 获取订阅者对应的事件类型
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
// 遍历事件,把事件类型对应的该订阅者的订阅方法删除
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
// 删除这个订阅者的所有事件
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

代码很简单,直接看 unsubscribeByEventType() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; // 事件类型对应的订阅方法

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 获取事件类型对应的订阅方法
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);

// 删除指定订阅者的订阅方法
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
// 这里指定设为 false,是防止并发操作正在调用订阅方法,而取消订阅操作又没有完成
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

这两个方法其实就是把两个 Map 中和要取消订阅的订阅者相关的信息移除,很容易理解。到了这里,EventBus 的一般操作的源码已经全部分析完毕。

关于粘性事件

粘性事件的使用方法文章开头已经提及,由于本文篇幅已经很长了,不再作出详细分析,其原理其实很简单。首先在订阅过程中,会读取一个名为 stickyEvents 的 Map,如果 Map 中有对象并且当前订阅方法的 sticky 属性为 true,则经过一系列检查(主要是检查这个粘性事件是否已经被订阅者接收过)根据情况进行事件发布。而如果发布时使用粘性事件发布,则会先把事件加入 stickyEvents 中,再进行普通事件的发布。

对于粘性事件,其订阅方法和事件发布会有以下几个逻辑关系:

  • 如果使用 post() 方法发布事件,则订阅者必须在之前先进行订阅操作才可以接收事件,订阅方法的 sticky 属性是否为 true 不影响
  • 如果使用 postSticky() 方法发布事件,订阅者在之前进行过订阅操作,则都可以接收事件,订阅方法的 sticky 属性是否为 true 不影响
  • 如果使用 postSticky() 方法发布事件,订阅者在之前没有进行过订阅操作,此时订阅方法的 sticky 属性起到作用,设为 true,当订阅者之后任意时刻进行订阅操作时都可以接收事件,而设为 false,此后订阅者再进行订阅操作也无法接收事件

关于注解处理器

EventBus 可以通过配置进行加速,其原理是通过注解处理器在编译期间生成了一个索引类,这样查找订阅方法阶段不需要使用发射,运行效率大大提高。注解处理器本文不作介绍,其用途也很广泛,可以参考我的另一篇博客:编写一个 Android 编译时注解框架的一般套路——以 ButterKnife 为例

总结

EventBus 是一款非常值得学习的开源框架,其优秀的代码架构和穿插代码间的设计模式都是可以学习的技巧,代码中大量地方用到了对象复用池这个概念,可以减少实例化对象的次数,提高了运行效率。代码中还对并发操作会出现的各种情况都作了相应处理,比如使用 ThreadLocal 类保证単例模式下对象的成员变量能在不同的线程中各自使用,互不影响,这类处理还有很多。

文章很长,从开始读源码到写完博客花了两个星期时间,不过在过程确实学到了不少,这也是对自己学习过程中的一个记录。道路尚远,且行且珍惜!

坚持原创技术分享,您的支持将鼓励我继续创作!