前言
定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。
观察者模式(Observer): 又称发布-订阅模式(Publish-Subscribe:Pub/Sub)。它是一种通知机制,让发送通知的一方(被观察方)和接收通知的一方(观察者)能彼此分离,互不影响。
Java标准库虽然提供了java.util.Observer和java.util.Observable这两个类用于实现观察者模式,但是Java9开始已经废弃java.util.Observer和java.util.Observable这两个类, 实现观察者模式的时候不推荐使用:
此类和Observer接口已被弃用。 Observer和Observable支持的事件模型非常有限,Observable传递的通知顺序未指定,并且状态更改与通知不一一对应。 对于更丰富的事件模型,请考虑使用java.beans包。 为了在线程之间进行可靠且有序的消息传递,请考虑使用java.util.concurrent包中的并发数据结构之一。 有关反应式流样式的编程,请参阅Flow API。
实现 java.bean实现 从java.beans包使用PropertyChangeEvent和PropertyChangeListener(Listeners,类型很多,它们都有回调方法,不需要强制转换)
PropertyChangeSupport
addPropertyChangeListener(PropertyChangeListener listener) 顾名思义,添加对bean的监听。
removePropertyChangeListener(PropertyChangeListener listener) 移除监听。
firePropertyChange(String propertyName, int oldValue, int newValue) 添加对bean内某个变量的监听,第一个参数最好是变量名,第二个是变量改变前的值,第二个是变量改变后的值
PropertyChangeEvent
getPropertyName() 获取发生改变的变量名。
getSource() 获取改变的bean对象
getOldValue() 获取发生改变的变量的旧值。
getNewValue() 获取发生改变的变量的新值
当bean很多的时候特别好用,用propertyChangeEvent.getSource()就能区分是哪个bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class Product { private Integer id; private String name; private final PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport (this ); public Integer getId () { return id; } public void setId (Integer id) { Integer oldValue = this .id; this .id = id; propertyChangeSupport.firePropertyChange("id" , oldValue, id); } public String getName () { return name; } public void setName (String name) { String oldValue = this .name; this .name = name; propertyChangeSupport.firePropertyChange("name" , oldValue, name); } public PropertyChangeSupport getPropertyChangeSupport () { return propertyChangeSupport; } } public class ObserverTest { public static void main (String[] args) { Product product = new Product (); product.getPropertyChangeSupport().addPropertyChangeListener(evt -> System.out.println("发生了变化: " + evt.getPropertyName() + " 旧值: " + evt.getOldValue() + " 新值: " + evt.getNewValue()) ); product.getPropertyChangeSupport().addPropertyChangeListener("name" , evt -> { System.out.println("name发生了变化: " + evt.getPropertyName() + " 旧值: " + evt.getOldValue() + " 新值: " + evt.getNewValue()); }); product.setId(1 ); product.setName("admin" ); product.setName("user1" ); } }
需要注意的是初次赋值时oldvalue是null,记得判空,否则会导致后续监听失败
Flow实现 Java9提供了java.util.concurrent.Flow(熟悉RxJava库的朋友对于这种用法应该非常熟悉)
Flow是一类在Java中9中引入并具有4个相互关联的接口:
Publisher:发布者,负责发布消息;
Subscriber:订阅者,负责订阅处理消息;
Subscription:订阅控制类,可用于发布者和订阅者之间通信;
Processor:处理者,同时充当Publisher和Subscriber的角色
Flow类还包含defaultBufferSize()静态方法,它返回发布者和订阅者使用的缓冲区的默认大小。 目前,它返回256。
另外还有SubmissionPublisher类是Flow.Publisher接口的实现类。 该类实现了AutoCloseable接口,因此可以使用try-with-resources块来管理其实例。 SubmissionPublisher是Flow.Publisher的实现,她可以灵活的生产数据,同时与Reactive Stream兼容:
1 2 3 SubmissionPublisher() SubmissionPublisher(Executor executor, int maxBufferCapacity) SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 //简单的例子 SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); publisher.subscribe(new Flow.Subscriber<String>() { @Override public void onSubscribe(Flow.Subscription subscription) { logger.debug("onSubscribe"); //反向控制获取数据个数 subscription.request(10); } @Override public void onNext(String item) { logger.debug("onNext: " + item); } @Override public void onError(Throwable throwable) { logger.debug("onError: " + throwable); } @Override public void onComplete() { logger.debug("onComplete"); } }); // 发布单个数据 publisher.submit("11111"); //发布多个数据 String[] items = {"1", "x", "2", "x", "3", "x"}; Arrays.stream(items).forEach(publisher::submit); //关闭发布, 关闭publisher,没有该函数则Subscriber.onComplete()不会被调用 publisher.close(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
注意Flow是异步的流处理, 故可以结合线程池一起处理, 有关Flow的文章后续整理下(尽管Flow API允许程序员开始编写响应式程序,但是生态系统仍然需要发展)
自定义实现 当然也可以自己来实现,不过需要注意的是:
如果设计成各个观察者是依次获得的同步通知,如果上一个观察者处理太慢,会导致下一个观察者不能及时获得通知
如果观察者在处理通知的时候,发生了异常,还需要被观察者处理异常,才能保证继续通知下一个观察者
注意实际使用观察者模式需关注背压问题(即消费速度赶不上生产速度)
Spring中使用 如果是使用的Spring框架,推荐直接使用Spring中实现的观察者模式:
自定义需要发布的事件类,需要继承 ApplicationEvent 类或 PayloadApplicationEvent (该类也仅仅是对 ApplicationEvent 的一层封装)
使用 @EventListener 来监听事件或者实现 ApplicationListener 接口。
使用 ApplicationEventPublisher 来发布自定义事件(@Autowired注入即可)
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import org.springframework.context.ApplicationEvent;public class MyApplicationEvent extends ApplicationEvent { public MyApplicationEvent (Object source) { super (source); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.context.ApplicationListener;public class MyApplicationListener implements ApplicationListener <MyApplicationEvent> { private final Logger logger = LoggerFactory.getLogger(this .getClass()); private String name; public MyApplicationListener (String name) { this .name = name; } @Override public void onApplicationEvent (MyApplicationEvent event) { String source = (String) event.getSource(); logger.info("我是: {}, 收到更新数据为:{}s\n" , this .name, source); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class ObserverConfiguration { @Bean public MyApplicationListener readerListener1 () { return new MyApplicationListener ("张三" ); } @Bean public MyApplicationListener readerListener2 () { return new MyApplicationListener ("李四" ); } @Bean public MyApplicationListener readerListener3 () { return new MyApplicationListener ("王五" ); } }
1 2 3 4 5 6 7 8 @SpringBootTest class SpringObserverTest extends AbstractJUnit4SpringContextTests { @Test void publishEventTest () { applicationContext.publishEvent(new MyApplicationEvent ("Hello World" )); } }
运行测试用例,可以在控制台中看到打印了
1 2 3 4 5 我是: 张三, 收到更新数据为:Hello Worlds 我是: 李四, 收到更新数据为:Hello Worlds 我是: 王五, 收到更新数据为:Hello Worlds
如果业务逻辑中需要发送事件,可以实现ApplicationEventPublisherAware接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Service public class UserService implements ApplicationEventPublisherAware { private Logger logger = LoggerFactory.getLogger(getClass()); private ApplicationEventPublisher applicationEventPublisher; @Override public void setApplicationEventPublisher (ApplicationEventPublisher applicationEventPublisher) { this .applicationEventPublisher = applicationEventPublisher; } public void register (String username) { logger.info("[register][执行用户({}) 的注册逻辑]" , username); applicationEventPublisher.publishEvent(new UserRegisterEvent (this , username)); } }
spring的事件驱动模型使用的是 观察者模式 ,Spring中Observer模式常用的地方是listener的实现。