解耦Spring之后该如何高效的进行事件通知

Posted by klaus_turbo on 2022-08-21
发布订阅模型

解耦Spring之后该如何高效的进行事件通知

Spring 作为重量级生产级的框架,在日常的企业开发过程中被广泛应用,其提供的 IOC ,AOP ,WebMVC等给开发带来了极大的便利。但在使用的过程中,逐渐会发现,随着开发迭代的不断深入,Spring 逐渐变成了开发过程中不可缺少的一个环节,Spring 所提供的类,方法在项目中遍地都是。慢慢的,框架的使用似乎逐渐成了业务逻辑一部分,想要脱离 Spring 变得及其的困难,甚至可行性变成了零。不光如此,相信很多做过框架升级的伙伴也发现了其中难以察觉的坑,一旦某一个版本出现不确定,项目就可能直接运行失败。等等诸如此类的问题,不得不思考,Spring 真的这么需要吗?(后续会有关于 简单实现一个 Spring 的分享出来,知己知彼,方能百战不殆)

参考很多框架中间件的实现,就不难发现,基本很少有中间件会在实现的时候直接去使用 Spring ,而是在通过后续的开发,独立出一个专门用于适配Spring的版本。我想作为这些中间件的开发创作者也大都有上面类似的顾虑吧,还有就是 Spring 的启动时间确实是难以接受,对于一些需要快速启动,低故障恢复时延的项目,Spring的接入启动好像并不是很能让人满意。

好了,说了这么多拙见,似乎离我们的标题越来越远了…言归正传,Spring大家既然都用过了,那它的事件通知应该也都用过的吧。不知道有没有发现一个问题,就是当你使用 ApplicationPublisher 的时候,你首先得知道这个事件是通过谁发送出去的,然后接收者得知道自己接受的是什么事件。但是我们所用的事件通知,所关注的本身是事件,对于接收者来说没有一点问题,但是要把一个事件发送出去,我们得先知道知道发送者是谁,是不是觉得有些奇怪?那要是当事件多了之后呢?

我们希望的是,既然是使用的事件通知这么一个东西,那么在开发的时候我想关注的只是事件本身,并不希望使用一个事件,就要显示的再去使用这个事件的发送者,同时回到我们前面说的,如果不使用 Spring ,那我们该怎么去实现事件通知呢?

可能有熟悉GOF23的小伙伴会脱口而出:观察者模式。没错,最根本额设计实现思想就是观察者模式,再结合我们前面说到的,要在使用的时候只关注事件本身,而不需要知道什么事件由什么发送者发出的,来看看我是怎么实现的。

事件通知的三板斧我们先定义好:事件,发送者,订阅者。

首先是事件的定义,我们使用一个标记类去定义这个事件:

1
2
public abstract class Event implements Serializable {
}

事件基类定义好了之后我们去定义事件的发送者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public interface EventPublisher extends Closeable {

/**
* 定义好当前${@link EventPublisher}发送的是什么类型的事件.
* @return {@link Event}
*/
Class<? extends Event> getEventType();

/**
* 给当前的发布者添加对其发布事件感兴趣的订阅者(通过一个订阅者列表实现事件广播).
*
* @param subscriber {@link Subscriber}
*/
void addSubscriber(Subscriber subscriber);

/**
* 从订阅者列表中删除一个订阅者.
*
* @param subscriber {@link Subscriber}
*/
void removeSubscriber(Subscriber subscriber);

/**
* 发布事件.
*
* @param event {@link Event}
* @return publish event is success
*/
boolean publish(Event event);

/**
* Notify listener.
*
* @param subscriber {@link Subscriber}
* @param event {@link Event}
*/
void notifySubscriber(Subscriber subscriber, Event event);
}

然后定义这个事件的订阅者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public abstract class Subscriber<T extends Event> {

/**
* 事件通知的回调.
*
* @param event {@link Event}
*/
public abstract void onEvent(T event);

/**
* 当前订阅者所关注的事件类型.
*
* @return Class which extends {@link Event}
*/
public abstract Class<? extends Event> subscribeType();

/**
* 处理事件的线程池(用于提高订阅者事件的处理能力).
*
* @return {@link Executor}
*/
public Executor executor() {
return null;
}

}

这样我们就定义好了事件,发布者,订阅者三个角色的基类。通过我们定义的 EventPublisher 来对 Event 事件进行发布,同时再运行的时候再把 Subscriber 通过 EventPublisher 中定义的 addSubscriber 方法注册到对应的 EventPublisher 中的订阅者列表中去,这样基本的观察者模式就成型了,同时,由于我们使用的是订阅者列表,实现了订阅者和发布者之间 N:1 的映射关系,以及当事件来临时,使得广播成为了可能。

好了,这样问题就来了,按照现在的逻辑,那是不是在发布事件的时候还是得显示的使用事件相关的 EventPublisher 实现呢?是的,按照现在的逻辑确实需要这么做,那我们怎么去解决这个问题呢?

既然说我们不想每次在发布一个事件的时候就要去显示的声明当前事件的发布者,那我们是不是可以想办法,在一个中转站把事件和事件发布者关联起来,将他们之间的一对一的映射关系保存在其中,这样我们每次发布事件的时候就可以通过同一个中转站去发送事件,发送的时候只需要通过映射关系去找到对应的 EventPublisher 就好了。同样的,由于 Subscriber 和 Event 的映射关系也是一对一的,所以我们便可以通过中转站将 Subscriber 注册到对应的 EventPublisher 中去了,我们来看一下这个所谓的中转站如何设计实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class UnifiedNotifyCenter   {

private static final Logger LOGGER = LoggerFactory.getLogger(UnifiedNotifyCenter.class);

private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);

public UnifiedNotifyCenter(Collection<EventPublisher> publishers) {
// 可以在此初始化 publisherMap
}

/**
* 往对应的${@link EventPublisher} 中注册订阅者.
* @param subscriber
*/
public void registerSubscriber(final Subscriber subscriber) {
String topic = subscriber.subscribeType().getCanonicalName();
EventPublisher publisher = publisherMap.get(topic);
if (Objects.nonNull(publisher)) {
publisher.addSubscriber(subscriber);
}
}

/**
* 发布事件.
* @param event
* @return
*/
public boolean publishEvent(final Event event) {
try {
return publishEvent(event.getClass(), event);
} catch (Throwable ex) {
LOGGER.error("There was an exception to the message publishing : ", ex);
return false;
}
}

/**
* 发布事件.
* @param eventType
* @param event
* @return
*/
public boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
String topic = eventType.getCanonicalName();
EventPublisher publisher = publisherMap.get(topic);
if (!Objects.isNull(publisher)) {
return publisher.publish(event);
}
LOGGER.warn("[UnifiedNotifyCenter] There is no [{}] publisher for this event.", topic);
return false;
}

}

这就是我定义的 中转站,通过一个Hash映射 Event 和 EventPublisher ,然后把注册和事件发布的操作都委托给这里的 UnifiedNotifyCenter。这样,我们就解决了每次事件发布都需要声明事件对应发布者的问题,并将注册订阅者和事件发布的操作委托到了 UnifiedNotifyCenter 中去。同时,由于 Subscriber 必须要注册到 EventPublisher 中才能进行事件通知,若没有中间这个 UnifiedNotifyCenter ,那么将会由于 EventPublisher 必须执行的注册操作而导致 EventPublisher 与 Subscriber 耦合到一起,在引入了 UnifiedNotifyCenter 之后,解决了两者之间的必然的耦合关系。

由于事件与事件发布者之间的关系需要提前维护,所以我们就需要通过某种方式将两者维护起来,可以通过 JDk 自带的 ServiceLoader 去在构造函数中初始化彼此的映射关系。或者你若想在 Spring 中使用此设计,那你便可以在构造函数中通过依赖注入的方式去初始化这个映射。

下面以 JDK 的ServiceLoader 为例,展示一下如何通过 ServiceLoader 去初始化:

首先为了方便演示,我们先定义一个通用的 事件:

1
2
3
4
public class GeneralEvent extends Event{

}

然后再定一个发送此类事件的发布者:

1
2
3
4
5
6
7
8
9
10
11
12
public class GeneralEventPublisher extends BaseEventPublisher {

@Override
public void shutdown() {
// do nothing
}

@Override
public Class<? extends Event> getEventType() {
return GeneralEvent.class;
}
}

既然要使用 JDK 的ServiceLoader, 那我们要先在 resouces 的META-INF.services 下面新建一个用于查找的文档,名称为我们的 EventPublisher 基类:

com.example.demo.event.publisher.EventPublisher

其中我们在文档中定义我们的 GeneralEventPublisher :

com.example.demo.event.publisher.GeneralEventPublisher

最后,我们在 UnifiedNotifyCenter 的无参构造函数中通过 ServiceLoader 去加载并初始化映射关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public UnifiedNotifyCenter() {
ServiceLoader<EventPublisher> eventPublisherServiceLoader = ServiceLoader.load(EventPublisher.class);
for (EventPublisher publisher : eventPublisherServiceLoader) {
System.out.println( publisher.getEventType().getCanonicalName());
publisherMap.putIfAbsent(publisher.getEventType().getCanonicalName(), publisher);
}

// 用于资源回收
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (!publisherMap.isEmpty()) {
publisherMap.forEach((key, value) -> value.shutdown());
}
}));
}

这样我们的一整套的时间通知机制就完成了。

为了解决事件多的时候事件处理时间的问题,我们可以采用一种类似 Reactor 模式的方式,在事件发布者内部维护一个监听线程,以及一个用于缓冲的队列,将事件产生的线程和事件分发的线程解耦开。同时,我们在前面定义 Subscriber 的时候,定义了一个 executor() 方法用于获取定义好的处理事件的线程池,来看看我是如何定义这么一个 类 Reactor模式的事件发布者的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
public abstract class WorkerBasedEventPublisher extends Thread implements EventPublisher {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkerBasedEventPublisher.class);

private final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>();

private final BlockingQueue<Event> queue = new ArrayBlockingQueue<>(1024);

private volatile boolean shutdown = false;

public WorkerBasedEventPublisher() {
start();
}

public ConcurrentHashSet<Subscriber> getSubscribers() {
return subscribers;
}

@Override
public void addSubscriber(Subscriber subscriber) {
subscribers.add(subscriber);
}

@Override
public void removeSubscriber(Subscriber subscriber) {
subscribers.remove(subscriber);
}

@Override
public boolean publish(Event event) {
boolean success = this.queue.offer(event);
if (!success) {
System.out.println("添加发布时间失败");
LOGGER.warn("[WorkerBasedEventPublisher # publish] Error to publish event : [{}]",event.toString());
}
return true;
}

@Override
public void notifySubscriber(Subscriber subscriber, Event event) {

LOGGER.debug("[ UnifiedNotifyCenter ] the {} will received by {}", event, subscriber);
final Runnable job = () -> subscriber.onEvent(event);

final Executor executor = subscriber.executor();

if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Exception e) {
LOGGER.error("Event callback exception: ", e);
}
}
}

@Override
public void run() {
openEventHandler();
}

void openEventHandler() {
try {
int waitTimes = 60;
for (; ; ) {
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
// 防止 CPU 空转
TimeUnit.SECONDS.sleep(1);
waitTimes--;
}

for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take();
receiveEvent(event);
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}

void receiveEvent(Event event) {
if (!hasSubscriber()) {
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event.toString());
return;
}
for (Subscriber subscriber : subscribers) {
notifySubscriber(subscriber, event);
}
}

private boolean hasSubscriber() {
return CollectionUtils.isNotEmpty(subscribers);
}

@Override
public void shutdown() {
this.shutdown = true;
// 这边只是简单的将事件队列清空了,其实可以在调用销毁的钩子的时候将事件处理结束,当// 然这需要看具体的业务需求
queue.clear();
}
}

这样,当需要提高处理事件的能力的时候我们就可以直接继承 WorkerBasedEventPublisher 这个事件发布者了。

好了,至此一个属于我们自己的发布订阅模型就实现结束了,在使用的时候我们只需要做如下几步:

  1. 定义好事件
  2. 根据需求选择合适的 EventPublisher 定义好事件发布者
  3. 定义好事件订阅者
  4. 将事件订阅者通过统一通知中心注册到对应事件的发布者中去
  5. 通过JDK的spi或者其他的扩展机制,将事件与事件发布者的映射关系初始化到统一通知中心中去
  6. 通过统一通知中心发布事件

当然,本篇文章虽然说的是解耦 Spring 之后事件通知如何进行,但在 Spring 项目中我们也是可以使用这个发布订阅模型的实现的。

关于事件通知实现的内容讲到这里就结束了,肯定还有更好的实现方式存在,这不重要,重要的是你是否有想法,是否有跳出框架后该怎么做的想法。

好的,谢谢您花了很多时间看完这篇文章,有建议的欢迎一起来探讨。

本人关于图片作品版权的声明:

  1. 本人在此刊载的原创作品,其版权归属本人所有。

  2. 任何传统媒体、商业公司或其他网站未经本人的授权许可,不得擅自从本人转载、转贴或者以任何其他方式复制、使用上述作品。

  3. 传统媒体、商业公司或其他网站对上述作品的任何使用,均须事先与本人联系。

  4. 对于侵犯本人的合法权益的公司、媒体、网站和人员,本人聘请的律师受本人的委托,将采取必要的措施,通过包括法律诉讼在内的途径来维护本人的合法权益。

特此声明,敬请合作。