Glide篇RxJava2篇OkHttp篇Retrofit篇后的又一篇源码分析,要点如下:

  • EventBus中用到的APT技术(annotationProcessor)
  • EventBus的注册与反注册过程(register、unregister)
  • EventBus事件的发送与接收(HandlerPoster、BackgroundPoster、AsyncPoster)

EventBus中用到的APT技术

EventBus作为事件总线,是观察者模式的典型实现。事件是被观察者,而观察者就是我们用@Subscribe注解的方法了。EventBus3.0允许以两种方式生成订阅表,既可以依靠APT技术在编译期通过事先生成的代码来构建订阅表,也可以在运行期通过反射来生成订阅表,显然前者在性能上要更好一些。

如果要采用编译期构建订阅表的方式,首先在build.gradle中声明编译期生成类的完整包名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
android {
defaultConfig {
javaCompileOptions {
annotationProcessorOptions {
arguments = [ eventBusIndex : 'com.example.myapp.MyEventBusIndex' ]
}
}
}
}
dependencies {
compile 'org.greenrobot:eventbus:3.0.0'
annotationProcessor 'org.greenrobot:eventbus-annotation-processor:3.0.1'
}

然后在初始化时把编译期生成的MyEventBusIndex实例插入到DefaultBus中

1
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();

负责APT的模块是EventBusAnootationProcessor,首先进入它的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
//EventBusAnnotationProcessor.java
private void createInfoIndexFile(String index) {
BufferedWriter writer = null;
try {
//根据从build.gradle中获得的完整包名创建订阅表类
JavaFileObject sourceFile = processingEnv.getFiler().createSourceFile(index);
int period = index.lastIndexOf('.');
String myPackage = period > 0 ? index.substring(0, period) : null;
String clazz = index.substring(period + 1);
writer = new BufferedWriter(sourceFile.openWriter());
if (myPackage != null) {
writer.write("package " + myPackage + ";\n\n");
}
writer.write("import org.greenrobot.eventbus.meta.SimpleSubscriberInfo;\n");
writer.write("import org.greenrobot.eventbus.meta.SubscriberMethodInfo;\n");
writer.write("import org.greenrobot.eventbus.meta.SubscriberInfo;\n");
writer.write("import org.greenrobot.eventbus.meta.SubscriberInfoIndex;\n\n");
writer.write("import org.greenrobot.eventbus.ThreadMode;\n\n");
writer.write("import java.util.HashMap;\n");
writer.write("import java.util.Map;\n\n");
writer.write("/** This class is generated by EventBus, do not edit. */\n");
writer.write("public class " + clazz + " implements SubscriberInfoIndex {\n");
writer.write(" private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;\n\n");
writer.write(" static {\n");
//在这里创建一个HashMap对象做索引表
writer.write(" SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();\n\n");
writeIndexLines(writer, myPackage);
writer.write(" }\n\n");
writer.write(" private static void putIndex(SubscriberInfo info) {\n");
writer.write(" SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);\n");
writer.write(" }\n\n");
//对外提供的方法,EventBus运行时通过该方法获取需要的SubscribeInfo
writer.write(" @Override\n");
writer.write(" public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {\n");
writer.write(" SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);\n");
writer.write(" if (info != null) {\n");
writer.write(" return info;\n");
writer.write(" } else {\n");
writer.write(" return null;\n");
writer.write(" }\n");
writer.write(" }\n");
writer.write("}\n");
} catch (IOException e) {
throw new RuntimeException("Could not write source for " + index, e);
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
//Silent
}
}
}
}
//核心方法,将一个Class类作为key,将包含了该类里所有标记了Subscribe注解的Method和其他信息的SubscriberInfo作为value,放入到订阅表中,由此生成订阅表
private void writeIndexLines(BufferedWriter writer, String myPackage) throws IOException {
for (TypeElement subscriberTypeElement : methodsByClass.keySet()) {
if (classesToSkip.contains(subscriberTypeElement)) {
continue;
}
String subscriberClass = getClassString(subscriberTypeElement, myPackage);
if (isVisible(myPackage, subscriberTypeElement)) {
writeLine(writer, 2,
"putIndex(new SimpleSubscriberInfo(" + subscriberClass + ".class,",
"true,", "new SubscriberMethodInfo[] {");
List<ExecutableElement> methods = methodsByClass.get(subscriberTypeElement);
//解析Subscribe注解包含的信息,得到SubscriberMethodInfo(包含每个Method的MethodName、Thread、isSticky等重要信息)
writeCreateSubscriberMethods(writer, methods, "new SubscriberMethodInfo", myPackage);
writer.write(" }));\n\n");
} else {
writer.write(" // Subscriber not visible to index: " + subscriberClass + "\n");
}
}
}

上面截取了EventBusAnootationProcessor中最核心的生成代码的部分:

  • 首先根据build.gradle中事先声明的完整包名和类名创建一个索引类
  • 在类中创建一个HashMap做订阅表,并对外提供getSubscriberInfo(Class<?> subscriberClass)方法,以便EventBus运行时从表中查阅需要的SubscribeInfo
  • writeIndexLines是填充订阅表的主要方法,它将一个Class类作为key,将包含了该类里所有标记了Subscribe注解的Method和其他信息的SubscriberInfo作为value,put到订阅表中。
  • writeCreateSubscriberMethods方法的作用是解析每个Subscribe注解的方法以及注解中包含的信息,它在解析后获得SubscribeMethodInfo对象,该对象用来创建作为订阅表value的SimpleSubscribeInfo

EventBus的注册与反注册过程

上面已经分析过编译期生成订阅表的过程了,有了订阅表之后注册与反注册就十分简单了。先看注册部分的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
//通过当前注册类的Class类型获取该类中标注了Subscribe的Methods
//整个过程在SubscriberMethodFinder类中完成,该类会判断从编译期
//生成的订阅表中获取,还是通过反射动态的获取
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
//将一个Method与当前注册传入的实例组合成一个Subscription对象
//这是因为通过反射invoke方法时需要指明是调用哪个实例的该方法,所以实例也是必要的
//Subscription是接收事件的基本单位
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
//根据Method形参的Class类型取出对应的subscriptions,将刚生成的subscription加入其中
//然后再放回subscriptionsByEventType
//这样当有事件到来时,就能获取到事件类型对应的所有subscription
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//这里更新typesBySubscriber这个map,把当前注册的实例作为key,把实例中所有订阅方法的参数Class类型组成的List作为value,存入其中
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//最后处理sticky事件,如果订阅方法中有isSticky为true的方法,则取该方法形参类型,检索之前存储的StickyEvent事件,如果存在相应类型的事件则直接发送该事件到该方法
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

上面的注释已经标注的很详细了,从上面的代码中可以看到,在我们平时调用EventBus.getDefault().register(this)后执行了如下操作:

  • 首先从SubscriberMethodFinder实例中根据注册实例的Class类型获取该注册类中的所有注解了Subscribe的方法List<SubscriberMethod>,至于SubscriberMethodFinder中是如何解析获得这些订阅方法信息的问题我们后面再分析
  • 接着第一步是更新subscriptionsByEventType这个map对象,该对象是EventBus中最重要的容器,保存了当前所有注册了的订阅方法。这一步将每一个subscriberMethod和当前注册的实例subscriber组成了一个新对象Subscription,它是接收事件的基本单位。再依据subscriberMethod中保存的当前方法的形参类型(即意图接收的事件类型),从subscriptionsByEventType中取到对应的subscriptions,将新的Subscription存入其中,完成注册事件的登记。
  • 然后第二步是更新typesBySubscriber这个map对象,该对象把当前注册的实例作为key,把实例中所有订阅方法的参数Class类型组成的List作为value,保存在其中。这么做一方面可以方便的查询当前实例是否注册过,另一方当反注册时也能根据取到的types从subscriptionsByEventType取到所有需要做remove的subscriptions,进而分别从中移除不再需要监听事件的方法。
  • 最后一步是处理sticky事件。当一个订阅方法的isSticky标注为true时,显然在注册的那一刻就应该检索目前所有的stickyEvents找出匹配类型的事件,并发射事件到该订阅方法。

接下来进入另一个重要的类SubscriberMethodFinder看下它是如何根据注册实例的Class类型获取该类所有订阅方法的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//SubscriberMethodFinder.java
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//先从缓存取
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//如果指定不使用编译生成的索引表,则直接用反射去获取类中的订阅方法
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//否则依靠索引表来获取订阅方法
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//将本次解析结果存入缓存
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
//该方法优先尝试从编译期生成的索引表中取需要的SubscriberInfo,未取到则走反射的方法取
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//尝试从索引表中取
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//未取到则以反射的方式获取
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
//获取一个FindState对象用来保存本次解析的各种信息
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}
//该方法从编译期生成的索引表`subscriberInfoIndexes`中取需要的SubscriberInfo
private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}
//该方法通过反射来获取订阅方法,并解析Subscribe注解中包含的信息,生成SubscriberMethod
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//反射获取所有方法并检查是否有Subscribe注解,是否添加过,是否只有一次参数
//满足所有条件后,则将注解中的信息和当前Method封装成一个subscriberMethod保存起来
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

从以上代码可以分析出如下内容:

  • FindStateSubscriberMethodFinder的一个静态内部类,专门用来保存本次解析中获得的各种信息,为了节约内存开销由一个FindState数组充当对象池,默认大小为4
  • 当外部调用findSubscriberMethods方法时,首先会尝试从缓存METHOD_CACHE中根据Class类型获取订阅方法信息。未取到则进入findUsingInfo方法,尝试从编译期生成的索引表subscriberInfoIndexes中取,如果仍未取到则走开销较大的反射方式findUsingReflection方法获取。最终取到的subscriberMethods再返回前会先存入缓存
  • findUsingReflection方法中会反射获取所有方法并检查是否有Subscribe注解,是否添加过,是否只有一次参数,满足所有条件后,则将注解中的信息和当前Method封装成一个subscriberMethod保存起来

经过以上步骤,将所有注册类中的订阅方法都封装成了Subscription并更新到了subscriptionsByEventType中,注册过程结束,接下来看下反注册方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//EventBus.java
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
//先从subscribedTypes中获取所有types
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//然后根据types获取所有subscriptions
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
//最后根据subscriber从所有subscriptions中移除掉该类当初注册进去的方法
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

反注册过程很简单,大致可以视为3个步骤:

  • 首先根据反注册实例的Class类型从subscribedTypes中取到所有types
  • 然后分别根据每个type从subscriptionsByEventType中取到所有subscriptions
  • 最后根据反注册的实例subscriber,从每个subscriptions中移除当初注册进去的方法

至此,subscriptionsByEventType中不再包含被反注册了的类中的方法,反注册完成。

EventBus事件的发送与接收

先从post方法的源码开始分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
//EventBus.java
//发射一个事件,首先获取发射时的线程环境,然后事件入队
//然后在while循环中取事件进入postSingleEvent方法
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
//发射sticky类型的事件,相比post唯一多出的步骤就是将本次事件添加到stickyEvents
//至于stickyEvent的用法在上面介绍SubscriberMethodFinder已介绍,不再赘述
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
post(event);
}
//该方法主要判断本次事件是否涉及超类
//如果涉及超类的话超类和当前类都会发出事件
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
//该方法中根据本次事件的Class类型,即type,从subscriptionsByEventType中取到所有订阅者subscription
//然后进入postToSubscription执行线程调度环节
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
//该方法根据当前线程环境,以及事件的目标接收环境,选择适合的poster来真正发射事件
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

现在梳理一下发射事件的过程,这个过程包含很多函数调用,环环相扣:

  • 先额外提一下postSticky方法,该方法相比post唯一多出的步骤就是将本次事件添加到stickyEvents,至于stickyEvent的用法在上面介绍SubscriberMethodFinder已介绍,不再赘述
  • post方法是发射事件的起点,首先获取本次发射时的线程环境,然后事件入队,之后while循环中取出所有事件依次进入postSingleEvent方法
  • postSingleEvent方法主要判断本次事件是否涉及超类接收者,如果超类订阅方法也允许接收到事件,则分别发出超类事件和当前类事件
  • postSingleEventForEventType方法根据本次事件的Class类型,即type,从subscriptionsByEventType中取到对应的事件订阅者subscription,用于之后接收事件
  • postToSubscription方法是最终发出事件的地方,这里会根据事件发射时的线程环境和订阅者指定的接收环境来选择合适的poster发出事件,并触发subscription来接收事件

下面来分别分析几种事件发送者:

  • HandlerPoster:主线程发送者,本质是持有MainLooper的Handler
  • BackgroundPoster: 单一子线程发送者,本质是Runnable,由线程池调度,同步块控制串行发送事件
  • AsyncPoster:异步子线程发送者,本质是Runnable,由线程池调度,由对应数量的线程并行发送事件

首先进入HandlerPoster的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//HandlerPoster.java
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}

由以上代码可见HandlerPoster中持有一个缓冲队列PendingPostQueue,当有事件传入时首先将事件和订阅者组合封装成一个PendingPost对象并入队。其实不仅仅是HandlerPoster,在BackgroundPoster和AsyncPoster中也有这么一个PendingPostQueue。先进入它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//PendingPostQueue.java
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;
synchronized void enqueue(PendingPost pendingPost) {
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}
if (tail != null) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}
notifyAll();
}
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}
}
//PendingPost.java
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}

可见它只是一个简单的单向链表实现的队列结构。当有对象入队时队尾向后移动移一位指向新对象,当有对象出队时则将当前表头返回,并将表头向后移一位。同时还用到了wait和notifyAll,支持队列阻塞等待

再看PendingPost,它只是一个简单的实体类,持有event和subscription,并且为了节省内存开销,在类第一次加载时会创建一个pendingPostPool做对象池,对外提供obtainPendingPost来复用PendingPost。

再回到HandlerPoster,当PendingPoster入队后紧接着会发一个空Message到Handler,从而触发handleMessage,保证在主线程执行最终的事件接收eventBus.invokeSubscriber(pendingPost),那么这个invokeSubscriber方法又是如何实现的呢?进入源码:

1
2
3
4
5
6
7
8
9
10
11
//EventBus.java
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber,
event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

只有一行代码,直接从subscription中取出当初注册时存入的实例和方法,将event作为参数,以反射的形式触发该实例的该方法,由此订阅方法就接收到了事件,整个流程结束。

最后再看一下BackgroundPoster和AsyncPoster的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//BackgroundPoster.java
final class BackgroundPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//AsyncPoster.java
class AsyncPoster implements Runnable {
private final PendingPosterQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPoster pendingPoster = PendingPoster.obtainPendingPoster(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No Pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}

两者在实现上很像,都实现了Runnable接口,由线程池来执行,确保在子线程中完成事件的接收与触发。所使用的线程池默认是newCachedThreadPool,该线程池的特点是按需动态创建与回收线程,正好兼容上面串行执行和并行执行两种场景。在BackgroundPoster中由于加了同步锁,线程池只需要提供单线程串行发送事件即可,在AsyncPoster中线程池会提供与并发事件数相当的线程来并发发送事件,需要注意控制并发量。

最后

到此EventBus源码分析就结束了,EventBus除了实现了经典的观察者模式以外,还提供了Sticky,Inheritance,线程调度等便利的功能,帮助我们减少大量的回调,充分解耦各模块。但要注意不要忘记反注册,否则注册的实例会一直保存在EventBus中造成内存泄漏。EventBus相比其他框架需要记录、处理大量数据,且这些数据间还维持着各种映射关系,它对数据的合理封装也是很值得我们借鉴的一点(ゝ∀・)b

声明:本站所有文章均为原创或翻译,遵循署名-非商业性使用-禁止演绎 4.0 国际许可协议,如需转载请确保您对该协议有足够了解,并附上作者名(Est)及原贴地址