본문 바로가기

Android/이론

[안드로이드] RxJava Flowable 알아보기 (위키 번역)

728x90
반응형

 

 

 

 

 

Flowable (RxJava Javadoc 2.2.21)

Returns a Flowable that emits the results of a specified combiner function applied to combinations of five items emitted, in sequence, by five other Publishers. zip applies this function in strict sequence, so the first item emitted by the new Publisher wi

reactivex.io

 

Flowable class는 팩토리 메서드와 중간 오퍼레이터, reactive dataflow를 소비할 수 있는 기능을 제공하고, Reactive Stream패턴을 구현한 클래스입니다.

Reactive Stream은  Flowable 이 확장한  Publisher 와 함께 작동합니다.

따라서 오퍼레이터들은 일반  Publisher 들을 직접 수용하고 다른 Reactive Stream과 직접적인 상호작용을 허용합니다.

Flowable은 오퍼레이터를 위한 128개의 기본 버퍼 사이즈를 호스팅합니다.

오퍼레이터는  bufferSize() (시스템 매개변수  rx2.buffer-size 를 통해 gloabal하게 override 될수 있음)를 통해 접근할수 있습니다.

그러나 대부분의 오퍼레이터들은 내부 버퍼 사이즈를 명시적으로 설정할 수 있는 overloads를 가지고 있습니다.

 

 

 

Floawable 은 다음과 같은 프로토콜을 따릅니다.

 onSubscribe    onNext*    (onError | onComplete)? 

 

 Subscriber.onSubscribe(Subscription) 을 통해 소비자에게 제공되는  Subscription  인스턴스를 통해 stream을 폐기할 수 있습니다.

버전 1.x의  Observable.subscribe()  와 달리,  subscribe(Subscriber)  구독의 외부 취소를 허용하지 않으며 필요한 경우   Subscriber  인스턴스가 이러한 기능을 노출해야 합니다.

Flowable은 backpressure를 지원하고  Subscription.request(long) 을 통해  Subscriber 가 요구사항을 신호하도록 요구합니다.

Example:

Disposable d = Flowable.just("Hello world!")
     .delay(1, TimeUnit.SECONDS)
     .subscribeWith(new DisposableSubscriber<String>() {
         @Override public void onStart() {
             System.out.println("Start!");
             request(1);
         }
         @Override public void onNext(String t) {
             System.out.println(t);
             request(1);
         }
         @Override public void onError(Throwable t) {
             t.printStackTrace();
         }
         @Override public void onComplete() {
             System.out.println("Done!");
         }
     });

 Thread.sleep(500);
 // the sequence can now be cancelled via dispose()
 d.dispose();

 

Reactive Stream은  Publisher 와  Subscriber  사이의 상호작용을 정의할 때 상대적으로 엄격하기 때문에, 특정 타이밍요건으로 인해 상당한 성능 저하가 발생하고,  Subscription.request(long) 을 통해 잘못된 요청을 준비해야할 필요가 있습니다.

따라서, RxJava는 완화된 규칙으로 소비자가 구동될 수 있음을 나타내는  FlowableSubscriber  인터페이스를 도입했습니다. 

모든 RxJava 오퍼레이터는 이러한 완화된 규칙을 염두에 두고 구현됩니다.

만약  Subscriber 를 구독하는것이 이 인터페이스를 구현하지 않을 경우(예를 들어, 다른 리액티브 스트림 호환 라이브러리에서 가져온 경우), Flowable은 자동적으로 그 주변에 compliance wrapper를 적용할것입니다. 

Flowable은 추상 클래스이지만, 글자 그대로 따라야할 Reactive Stream 규칙이 많기 때문에 클래스를 직접 확장하여 source 및 custom operator를 구현하지 않는 것이 좋습니다.

이러한 custom 구현이 필요한 경우 wiki를 참조하세요.

 create(FlowableOnSubscribe, BackpressureStrategy)  factory method를 사용하여 custom Flowable을 만드는 권장 방법 :

Flowable<String> source = Flowable.create(new FlowableOnSubscribe<String>() {
     @Override
     public void subscribe(FlowableEmitter<String> emitter) throws Exception {

         // signal an item
         emitter.onNext("Hello");

         // could be some blocking operation
         Thread.sleep(1000);

         // the consumer might have cancelled the flow
         if (emitter.isCancelled() {
             return;
         }

         emitter.onNext("World");

         Thread.sleep(1000);

         // the end-of-sequence has to be signaled, otherwise the
         // consumers may never finish
         emitter.onComplete();
     }
 }, BackpressureStrategy.BUFFER);

 System.out.println("Subscribe!");
 
 source.subscribe(System.out::println);
 
 System.out.println("Done!");

 

 Floawble 과 같은 RxJava reactive source는, 일반적으로 동기이고 순차적입니다. 

ReactiveX 디자인에서, operator가 실행되는 위치(스레드)는 오퍼레이터가 데이터를 작업할수 있는 시간과 직교합니다. 

이 의미는 비동기 및 병렬화가  subscribeOn(Scheduler) ,  observeOn(Scheduler) parallel() 과 같은 연산자를 통해 명시적으로 표현되어야함을 의미합니다. 일반적으로 Scheduler 매개변수를 사용하는 오퍼레이터는 이러한 유형의 비동기화를 flow에 도입하고 있습니다.

728x90
반응형