21. RxJava with Spring MVC
Spring Cloud Netflix includes RxJava.
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
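Spring Cloud Netflix supports returning rx.Single objects from Spring MVC controllers. It also supports using rx.Observable objects for Server-sent events (SSE). This can be very convenient if your internal APIs are already built with RxJava (for an example, see Section 17.4, "Feign Hystrix Support").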
The following examples show how to use rx.Single:
    @RequestMapping(method = RequestMethod.GET, value = "/single")
    public Single<String> single() {
        return Single.just("single value");
    }

    @RequestMapping(method = RequestMethod.GET, value = "/singleWithResponse")
    public ResponseEntity<Single<String>> singleWithResponse() {
        return new ResponseEntity<>(Single.just("single value"),
                HttpStatus.NOT_FOUND);
    }

    @RequestMapping(method = RequestMethod.GET, value = "/singleCreatedWithResponse")
    public Single<ResponseEntity<String>> singleOuterWithResponse() {
        return Single.just(new ResponseEntity<>("single value", HttpStatus.CREATED));
    }

    @RequestMapping(method = RequestMethod.GET, value = "/throw")
    public Single<Object> error() {
        return Single.error(new RuntimeException("Unexpected"));
    }
If you have an Observable rather than a Single, you can use .toSingle() or .toList().toSingle(). The following examples show how:
    @RequestMapping(method = RequestMethod.GET, value = "/single")
    public Single<String> single() {
        return Observable.just("single value").toSingle();
    }

    @RequestMapping(method = RequestMethod.GET, value = "/multiple")
    public Single<List<String>> multiple() {
        return Observable.just("multiple", "values").toList().toSingle();
    }

    @RequestMapping(method = RequestMethod.GET, value = "/responseWithObservable")
    public ResponseEntity<Single<String>> responseWithObservable() {
        Observable<String> observable = Observable.just("single value");
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(APPLICATION_JSON_UTF8);
        return new ResponseEntity<>(observable.toSingle(), headers, HttpStatus.CREATED);
    }

    @RequestMapping(method = RequestMethod.GET, value = "/timeout")
    public Observable<String> timeout() {
        return Observable.timer(1, TimeUnit.MINUTES).map(new Func1<Long, String>() {
            @Override
            public String call(Long aLong) {
                return "single value";
            }
        });
    }
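The choice between .toSingle() and .toList().toSingle() matters once the source can emit more than one item. According to the RxJava 1.x Javadoc, toSingle() signals a NoSuchElementException when the source emits nothing and an IllegalArgumentException when it emits more than one item, whereas toList() first collects the whole sequence into one List, which then becomes the single value. The sketch below is a plain-Java model of that contract, not RxJava code: the class ToSingleContractSketch, its methods, and the exception messages are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;

public class ToSingleContractSketch {

    // Models toSingle(): succeeds only when exactly one item was emitted.
    // (Hypothetical helper; the List stands in for the emitted sequence.)
    public static <T> T toSingle(List<T> emitted) {
        if (emitted.isEmpty()) {
            throw new NoSuchElementException("source emitted no items");
        }
        if (emitted.size() > 1) {
            throw new IllegalArgumentException("source emitted too many items");
        }
        return emitted.get(0);
    }

    // Models toList().toSingle(): the collected list is itself the one item,
    // so the conversion succeeds for any number of emitted elements.
    public static <T> List<T> toListThenSingle(List<T> emitted) {
        return toSingle(Collections.singletonList(emitted));
    }

    public static void main(String[] args) {
        System.out.println(toSingle(Arrays.asList("single value")));
        System.out.println(toListThenSingle(Arrays.asList("multiple", "values")));
        try {
            toSingle(Arrays.asList("multiple", "values"));
        } catch (IllegalArgumentException e) {
            System.out.println("toSingle rejected a multi-item source: " + e.getMessage());
        }
    }
}
```

Under this model, an endpoint such as /multiple needs the toList() step because its source emits two values; calling toSingle() directly on that source would produce an error instead of a response body.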
If you have a streaming endpoint and client, SSE could be an option. To convert an rx.Observable to a Spring SseEmitter, use RxResponse.sse(). The following examples show how:
    @RequestMapping(method = RequestMethod.GET, value = "/sse")
    public SseEmitter single() {
        return RxResponse.sse(Observable.just("single value"));
    }

    @RequestMapping(method = RequestMethod.GET, value = "/messages")
    public SseEmitter messages() {
        return RxResponse.sse(Observable.just("message 1", "message 2", "message 3"));
    }

    @RequestMapping(method = RequestMethod.GET, value = "/events")
    public SseEmitter event() {
        return RxResponse.sse(APPLICATION_JSON_UTF8,
                Observable.just(new EventDto("Spring io", getDate(2016, 5, 19)),
                        new EventDto("SpringOnePlatform", getDate(2016, 8, 1))));
    }
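To make the SSE examples above more concrete, the following plain-Java sketch shows roughly how a sequence of items is framed on the wire in the text/event-stream format: each event carries its payload in one or more "data:" fields and ends with a blank line. This is only a model of the wire format, not the code that RxResponse.sse() or SseEmitter runs; the class SseFramingSketch and its methods are hypothetical, and the exact whitespace and headers produced by Spring may differ.

```java
import java.util.Arrays;
import java.util.List;

public class SseFramingSketch {

    // Frames one payload as a single SSE event: every line of the payload
    // gets its own "data:" field, and an empty line terminates the event.
    public static String frame(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    // Frames each emitted item in order, the way a stream of events
    // would appear one after another in the response body.
    public static String frameAll(List<String> payloads) {
        StringBuilder sb = new StringBuilder();
        for (String payload : payloads) {
            sb.append(frame(payload));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Mirrors the items emitted by the /messages example.
        System.out.print(frameAll(Arrays.asList("message 1", "message 2", "message 3")));
    }
}
```

A client that understands text/event-stream, such as a browser EventSource, would treat each blank-line-terminated block as one message, so the three items in the /messages example would arrive as three separate events.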