Flowable class는 팩토리 메서드와 중간 오퍼레이터, reactive dataflow를 소비할 수 있는 기능을 제공하고, Reactive Stream패턴을 구현한 클래스입니다.
Reactive Stream은 Flowable 이 확장한 Publisher 와 함께 작동합니다.
따라서 오퍼레이터들은 일반 Publisher 들을 직접 수용하고 다른 Reactive Stream과 직접적인 상호작용을 허용합니다.
Flowable은 오퍼레이터를 위한 128개의 기본 버퍼 사이즈를 호스팅합니다.
오퍼레이터는 bufferSize() (시스템 매개변수 rx2.buffer-size 를 통해 gloabal하게 override 될수 있음)를 통해 접근할수 있습니다.
그러나 대부분의 오퍼레이터들은 내부 버퍼 사이즈를 명시적으로 설정할 수 있는 overloads를 가지고 있습니다.
Floawable 은 다음과 같은 프로토콜을 따릅니다.
onSubscribe onNext* (onError | onComplete)?
Subscriber.onSubscribe(Subscription) 을 통해 소비자에게 제공되는 Subscription 인스턴스를 통해 stream을 폐기할 수 있습니다.
버전 1.x의 Observable.subscribe() 와 달리, subscribe(Subscriber) 는 구독의 외부 취소를 허용하지 않으며 필요한 경우 Subscriber 인스턴스가 이러한 기능을 노출해야 합니다.
Flowable은 backpressure를 지원하고 Subscription.request(long) 을 통해 Subscriber 가 요구사항을 신호하도록 요구합니다.
Example:
Disposable d = Flowable.just("Hello world!")
.delay(1, TimeUnit.SECONDS)
.subscribeWith(new DisposableSubscriber<String>() {
@Override public void onStart() {
System.out.println("Start!");
request(1);
}
@Override public void onNext(String t) {
System.out.println(t);
request(1);
}
@Override public void onError(Throwable t) {
t.printStackTrace();
}
@Override public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(500);
// the sequence can now be cancelled via dispose()
d.dispose();
Reactive Stream은 Publisher 와 Subscriber 사이의 상호작용을 정의할 때 상대적으로 엄격하기 때문에, 특정 타이밍요건으로 인해 상당한 성능 저하가 발생하고, Subscription.request(long) 을 통해 잘못된 요청을 준비해야할 필요가 있습니다.
따라서, RxJava는 완화된 규칙으로 소비자가 구동될 수 있음을 나타내는 FlowableSubscriber 인터페이스를 도입했습니다.
모든 RxJava 오퍼레이터는 이러한 완화된 규칙을 염두에 두고 구현됩니다.
만약 Subscriber 를 구독하는것이 이 인터페이스를 구현하지 않을 경우(예를 들어, 다른 리액티브 스트림 호환 라이브러리에서 가져온 경우), Flowable은 자동적으로 그 주변에 compliance wrapper를 적용할것입니다.
Flowable은 추상 클래스이지만, 글자 그대로 따라야할 Reactive Stream 규칙이 많기 때문에 클래스를 직접 확장하여 source 및 custom operator를 구현하지 않는 것이 좋습니다.
이러한 custom 구현이 필요한 경우 wiki를 참조하세요.
create(FlowableOnSubscribe, BackpressureStrategy) factory method를 사용하여 custom Flowable을 만드는 권장 방법 :
Flowable<String> source = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// signal an item
emitter.onNext("Hello");
// could be some blocking operation
Thread.sleep(1000);
// the consumer might have cancelled the flow
if (emitter.isCancelled() {
return;
}
emitter.onNext("World");
Thread.sleep(1000);
// the end-of-sequence has to be signaled, otherwise the
// consumers may never finish
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER);
System.out.println("Subscribe!");
source.subscribe(System.out::println);
System.out.println("Done!");
Floawble 과 같은 RxJava reactive source는, 일반적으로 동기이고 순차적입니다.
ReactiveX 디자인에서, operator가 실행되는 위치(스레드)는 오퍼레이터가 데이터를 작업할수 있는 시간과 직교합니다.
이 의미는 비동기 및 병렬화가 subscribeOn(Scheduler) , observeOn(Scheduler) , parallel() 과 같은 연산자를 통해 명시적으로 표현되어야함을 의미합니다. 일반적으로 Scheduler 매개변수를 사용하는 오퍼레이터는 이러한 유형의 비동기화를 flow에 도입하고 있습니다.
'Android > 이론' 카테고리의 다른 글
[Java/kotlin] Hex String의 Int 변환 (0) | 2021.03.31 |
---|---|
[안드로이드] Koin으로 주입된 viewModel 쉽게 Unit 테스트 하기 (0) | 2021.03.19 |
[안드로이드] Cold flows, Hot channels(번역) (0) | 2021.03.17 |
[안드로이드] Fragment 안의 Fragment 처리(교체, 백스택, Back 버튼 클릭 등) ― parentFragmentManager vs childFragmentManager (1) | 2021.03.02 |