观察者模式 - Observer

背景

观察者模式(Observer Pattern)用来处理某个值得关注的状态的对象状态变更,进而执行相应的操作

将自身的状态改变通知给其他对象, 我们也将其称为 发布者 (publisher)

所有希望关注发布者状态变化的其他对象被称为 订阅者 (subscribers)

所有订阅者都必须实现同样的接口, 发布者仅通过该接口与订阅者交互, 接口中必须声明通知方法及其参数, 这样发布者在发出通知时还能传递一些上下文数据

目的

实现一种订阅机制,可在对象事件发生时通知多个 “观察” 该对象的其他对象;订阅者能在不与具体发布者类耦合的情况下通过接口观察发布者的状态

真实世界类比

商店缺货时,用户向商店登记联系电话,当补货后商店通过电话通知对该商品(事件)感兴趣的用户;而不是:

  • 用户频繁到达商店查询是否到货
  • 商品通知所有用户缺货商品到货

实践

模拟一个场景,实现一个日志装饰,当日志信息打印异常时,进行相应日志等级的处理:

  • warn 级别发送邮件
  • error 级别电话通知

问题

可以对日志实现进行包装,在对应的日志级别方法进行增强

例如,实现一个 MyLogger 的装饰类,将增强的方法写在相关装饰的方法上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MyLogger {

private TinyLog log = new TinyLog("MyLogger");

public void info(String format, Throwable t, Object... arguments) {
log.info(format, arguments, t);
}

public void warn(String format, Throwable t, Object... arguments) {
log.warn(format, arguments, t);
if (Objects.nonNull(t)) {
System.out.println("邮件通知管理员");
}
}

public void error(String format, Throwable t, Object... arguments) {
log.error(format, arguments, t);
if (Objects.nonNull(t)) {
System.out.println("电话通知管理员");
}
}
}

这里就会存在如下问题:

  • 关注日志的逻辑需要硬编码在日志打印的核心逻辑中,监听行为和发布者、甚至和功能的主要实现耦合
  • 无法动态增加、移除订阅者

这种情况下使用观察者模式解决以上问题

实现

监听者接口

所有的监听者实现该接口,当事件触发时通过该接口的实现告诉发布者需要执行的操作

1
2
3
4
5
public interface LogEventListener {

void call(Throwable t);

}

发布者

维护了监听者列表,提供了供事件触发的通知行为以及订阅、移除订阅等操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class LogEventManager {

private Map<Level, List<LogEventListener>> listeners = new HashMap<>();

public LogEventManager() {
for (Level level : Level.values()) {
listeners.put(level, new ArrayList<>());
}
}

public void subscribe(Level level, LogEventListener listener) {
List<LogEventListener> listenerList = listeners.get(level);
listenerList.add(listener);
}

public void unsubscribe(Level level, LogEventListener listener) {
List<LogEventListener> listenerList = listeners.get(level);
listenerList.remove(listener);
}

public void call(Level level, Throwable t) {
List<LogEventListener> listenerList = listeners.get(level);
for (LogEventListener listener : listenerList) {
listener.notify(t);
}
}
}

日志装饰接入

只需要在具体的行为(事件)调用发布者

至于发布者的逻辑、发布者维护的监听者,不需要核心业务逻辑关注

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MyLogger {

private TinyLog log = new TinyLog("MyLogger");

private LogEventManager logEventManager;

public MyLogger(final LogEventManager logEventManager) {
this.logEventManager = logEventManager;
}

public void info(String format, Throwable t, Object... arguments) {
log.info(format, arguments, t);
}

public void warn(String format, Throwable t, Object... arguments) {
log.warn(format, arguments, t);

if (Objects.nonNull(t)) {
logEventManager.call(Level.WARN, t);
}
}

public void error(String format, Throwable t, Object... arguments) {
log.error(format, arguments, t);

if (Objects.nonNull(t)) {
logEventManager.call(Level.ERROR, t);
}
}
}

监听者实现

邮件通知实现

1
2
3
4
5
6
public class EmailCallListener implements LogEventListener {
@Override
public void call(final Throwable t) {
System.out.println("邮件通知管理员 " + t.getMessage());
}
}

电话通知实现

1
2
3
4
5
6
public class PhoneCallListener implements LogEventListener {
@Override
public void call(final Throwable t) {
System.out.println("电话通知管理员 " + t.getMessage());
}
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Run {

public static void main(String[] args) {
// 发布者(管理)
LogEventManager logEventManager = new LogEventManager();
logEventManager.subscribe(Level.WARN, new EmailCallListener());
logEventManager.subscribe(Level.ERROR, new PhoneCallListener());

// 日志装饰类
MyLogger myLogger = new MyLogger(logEventManager);

// 打印日志
RuntimeException exception = new RuntimeException("an error occurred");
myLogger.warn("get warn", exception);
myLogger.error("get error", exception);
}
}
1
2
3
4
5
6
七月 27, 2023 12:09:08 上午 design.behavioral.observer.MyLogger warn
警告: get warn
七月 27, 2023 12:09:08 上午 design.behavioral.observer.MyLogger error
严重: get error
邮件通知管理员 an error occurred
电话通知管理员 an error occurred

变更订阅

业务调整,对于 error 日志也需要进行邮件通知,那么只需要调整订阅关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Run {

public static void main(String[] args) {
LogEventManager logEventManager = new LogEventManager();
logEventManager.subscribe(Level.WARN, new EmailCallListener());
logEventManager.subscribe(Level.ERROR, new PhoneCallListener());
// 对 error 增加实现
logEventManager.subscribe(Level.ERROR, new EmailCallListener());

MyLogger myLogger = new MyLogger(logEventManager);

RuntimeException exception = new RuntimeException("an error occurred");
myLogger.warn("get warn", exception);
myLogger.error("get error", exception);
}
}

实际应用

Apache 的 commons.io,可以有功能来监听本地文件的变更

观察者 FileAlterationObserver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected FileAlterationObserver(final FileEntry rootEntry, final FileFilter fileFilter,
final IOCase caseSensitivity) {
if (rootEntry == null) {
throw new IllegalArgumentException("Root entry is missing");
}
if (rootEntry.getFile() == null) {
throw new IllegalArgumentException("Root directory is missing");
}
this.rootEntry = rootEntry;
this.fileFilter = fileFilter;
if (caseSensitivity == null || caseSensitivity.equals(IOCase.SYSTEM)) {
this.comparator = NameFileComparator.NAME_SYSTEM_COMPARATOR;
} else if (caseSensitivity.equals(IOCase.INSENSITIVE)) {
this.comparator = NameFileComparator.NAME_INSENSITIVE_COMPARATOR;
} else {
this.comparator = NameFileComparator.NAME_COMPARATOR;
}
}

构造方法中传入一个文件、过滤器等对象

监听者 FileAlterationListener

FileAlterationListener 是一个接口,用于实现文件相关事件的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public interface FileAlterationListener {

/**
* File system observer started checking event.
*
* @param observer The file system observer
*/
void onStart(final FileAlterationObserver observer);

/**
* Directory created Event.
*
* @param directory The directory created
*/
void onDirectoryCreate(final File directory);

/**
* Directory changed Event.
*
* @param directory The directory changed
*/
void onDirectoryChange(final File directory);

/**
* Directory deleted Event.
*
* @param directory The directory deleted
*/
void onDirectoryDelete(final File directory);

/**
* File created Event.
*
* @param file The file created
*/
void onFileCreate(final File file);

/**
* File changed Event.
*
* @param file The file changed
*/
void onFileChange(final File file);

/**
* File deleted Event.
*
* @param file The file deleted
*/
void onFileDelete(final File file);

/**
* File system observer finished checking event.
*
* @param observer The file system observer
*/
void onStop(final FileAlterationObserver observer);
}

注册监听者

调用 FileAlterationObserveraddListener 方法添加监听者

1
2
3
4
5
6
7
8
9
10
/**
* Adds a file system listener.
*
* @param listener The file system listener
*/
public void addListener(final FileAlterationListener listener) {
if (listener != null) {
listeners.add(listener);
}
}

最终监听者被保存在 private final List<FileAlterationListener> listeners = new CopyOnWriteArrayList<>() 属性中

监控器 FileAlterationMonitor

监控器 FileAlterationMonitor 包装了一系列观察者,以及监控间隔时间等参数

1
2
3
4
5
6
7
8
public FileAlterationMonitor(final long interval, final FileAlterationObserver... observers) {
this(interval);
if (observers != null) {
for (final FileAlterationObserver observer : observers) {
addObserver(observer);
}
}
}

启动监控

调用 FileAlterationMonitorstart 方法开启监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public synchronized void start() throws Exception {
if (running) {
throw new IllegalStateException("Monitor is already running");
}
for (final FileAlterationObserver observer : observers) {
observer.initialize();
}
running = true;
if (threadFactory != null) {
thread = threadFactory.newThread(this);
} else {
thread = new Thread(this);
}
thread.start();
}

FileAlterationMonitor 本身是一个 Runnable 实现

run 方法的实现,使用 while 循环判断,调用 FileAlterationObservercheckAndNotify 来观察和通知监听者

Thread.sleep(interval) 来进行时间间隔

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Runs this monitor.
*/
@Override
public void run() {
while (running) {
for (final FileAlterationObserver observer : observers) {
observer.checkAndNotify();
}
if (!running) {
break;
}
try {
Thread.sleep(interval);
} catch (final InterruptedException ignored) {
// ignore
}
}
}

FileAlterationObservercheckAndNotify 就是发布者的核心方法,比较文件的变化,调用已注册监听者的相关处理方法

总结

观察者模式的优点:

  • 满足了开闭原则;无需修改发布者代码就能引入新的订阅者类 (如果是发布者接口则可轻松引入发布者类)
  • 可以在运行时建立对象之间的联系(动态订阅、解除订阅)

责任链、 命令、 中介者和观察者用于处理请求发送者和接收者之间的不同连接方式:

  • 责任链按照顺序将请求动态传递给一系列的潜在接收者, 直至其中一名接收者对请求进行处理
  • 命令在发送者和请求者之间建立单向连接
  • 中介者清除了发送者和请求者之间的直接连接, 强制它们通过一个中介对象进行间接沟通
  • 观察者允许接收者动态地订阅或取消接收请求

参考

观察者设计模式 (refactoringguru.cn)