관리 메뉴

hyeals study

RxJava 사용해보기 본문

안드로이드

RxJava 사용해보기

hyeals 2021. 2. 7. 22:56

지난 포스팅에서 RxJava에 대한 개념을 정리했으니 이번에는 본격적으로 코드를 작성해보면서 공부할 예정이다.

 

일단 RxJava를 위해 알아야 할 것이 5가지 정도 있다.

 

1. Observable: 데이터 소스

지속적으로 변경이 가능한 데이터의 집합이다.

RxJava에서는 Observable을 관찰하는 Observer가 존재하고, Observable이 순차적으로 발행하는 데이터에 대해서 반응한다. 다시말해서 Observer pattern이 구현되어있는 클래스이다.

 

* Observer pattern: 객체의 상태 변화를 관찰하는 관찰자들(옵저버 목록)을 객체에 등록해서 상태 변화가 있을 때 마다 메서드 등을 통해 객체가 직접 목록의 각 옵저버에게 통지하도록 하는 패턴

 

Observable에는 3가지 이벤트가 있다.

1. onNext(): 하나의 소스 Observable에서 Observer까지 한 번에 하나씩 순차적으로 데이터를 발행한다.

2. onComplete(): 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달해서 onNext()를 더 호출하지 않음을 나타낸다.

3. onError(): 오류가 발생했음을 Observer에게 전달한다. 만약 이 이벤트가 발생했다면 그 이후에는 onNext와 onComplete이벤트가 발생하지 않고 Observable이 종료한다.

 

이 이벤트들은 Emitter라는 인터페이스에 의해 선언되며, 데이터나 오류 내용을 발행할 때 null은 발행할 수 없다.

Observable생성하는 법

RxJava에서는 연산자(operator)를 이용해서 Observable을 생성할 수 있다.

Observable을 생성하는 함수는 아래와 같다.

 

1. just()

- 인자로 넣은 데이터를 차례대로 발행한다.

- 인자로는 최대 10개를 넣을 수 있다. (이 때 인자들은 타입이 서로 같아야 한다.)

 

import io.reactivex.rxjava3.core.Observable;

public class RxjavaTest {
    public static void main(String[] args){
        Observable<String> observable = Observable.just("Hi(1)", "RxJava(1)", "!!!(1)");
        observable.subscribe(data -> System.out.println(data));
        //observable.subscribe(System.out::println);

        Observable.just("Hi(2)", "RxJava(2)", "!!!(2)").subscribe(System.out::println);
    }
}

[결과]

 

 

2. create()

- just()와 달리 onNext(), onComplete(), onError()같은 이벤트를 개발자가 직접 호출해야한다.

- 등록된 콜백들을 제대로 처리하지 않으면 메모리 누수가 발생하기 때문에 신경써서 사용해야 한다.

 

import io.reactivex.rxjava3.core.Observable;

public class RxjavaTest_create {
    public static void main(String[] args){
        Observable <String> observable = Observable.create(emitter ->{
            emitter.onNext("Hello");
            emitter.onNext("Observable");
            emitter.onNext("Create!");
            emitter.onComplete();
                });

        observable.subscribe(System.out::println);
    }
}

[결과]

 

 

onError를 처리하고 싶은 경우에는 아래와 같이 코드를 작성하면 된다.

 

import io.reactivex.rxjava3.core.Observable;

public class RxjavaTest_create {
    public static void main(String[] args){
        Observable <String> observable = Observable.create(emitter ->{
            emitter.onNext("Hello");
            emitter.onNext("Observable");
            emitter.onError(new Throwable());
            emitter.onNext("Create!");
            emitter.onComplete();
        });

        observable.subscribe(System.out::println, throwable -> System.out.println("Error!"));
    }
}

[결과]

 

 

3. empty()

- 아무런 데이터를 발행하지 않는 Observable을 생성하고 싶을 때 사용하는 연산자이다.


2. 리액티브 연산자(Operators): 데이터 소스를 처리하는 함수

이 리액티브 연산자를 통해서 데이터 소스로부터 전달받은 데이터를 가공해서 최종적인 결과 데이터를 만들어낸다.

 

예) Debounce, Filter, Distinct, ElementAt, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, and TakeLast

 

아래 예시와 같이 필터 Opreator를 이용해서 데이터를 가공할 수 있다.

 

import io.reactivex.rxjava3.core.Observable;

public class RxjavaTest {
    public static void main(String[] args){
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
        observable.filter(data -> data > 3)
                  .subscribe(System.out::println);
    }
}

 

[결과]

 


3. 스케쥴러(Scheduler): 스레드 관리자

리액티브 프로그래밍 자체가 비동기 프로그래밍을 위한 기법이기 때문에 스레드를 관리하는 관리자가 반드시 필요하다.

 

아래의 코드는 실행될 스레드를 따로 처리하지 않아서 메인 스레드에서 실행이 됨을 확인할 수 있다.

import io.reactivex.rxjava3.core.Observable;

public class RxjavaTest {
    public static void main(String[] args) throws InterruptedException{
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
        observable.filter(data -> data > 3)
                  .subscribe(data -> System.out.println(Thread.currentThread().getName()  
                  + " : " + "Result: " + data));

        Thread.sleep(500);
    }
}

[결과]

 

 

하지만 스케쥴러를 사용하면 아래와 같이 다른 스레드에서 실행할 수 있도록 관리를 해줄 수 있다.

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class RxjavaTest {
    public static void main(String[] args) throws InterruptedException{
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
        observable.filter(data -> data > 3)
                  .subscribeOn(Schedulers.io())
                  .subscribe(data -> System.out.println(Thread.currentThread().getName() 
                  + " : " + "Result: " + data));

        Thread.sleep(500);
    }
}

[결과]

 

 

[+]

 

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class RxjavaTest {
    public static void main(String[] args) throws InterruptedException{
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
        observable.doOnNext(data -> System.out.println(Thread.currentThread().getName() 
        					+ " : " + "doOnNext: " + data))
                  .subscribeOn(Schedulers.io())
                  .observeOn(Schedulers.computation())
                  .filter(data -> data > 3)
                  .subscribe(data -> System.out.println(Thread.currentThread().getName()  
                  			+ " : " + "Result: " + data));

        Thread.sleep(500);
    }
}

[결과]

 

* subscribeOn함수는 데이터의 발행 및 흐름을 관리하는 스레드를 정의한다.

* observeOn함수는 데이터의 가공 및 처리를 관리하는 스레드를 정의한다.


4. Subscriber: Observable이 발행하는 데이터를 구독하는 구독자

구독자가 구독하지 않으면 Observable이 발행한 데이터를 전달받을 수 없다.

 

따라서 subscribe함수를 호출해야 실제 데이터가 발행된다.


5. 함수형 프로그래밍: Rxjava에서 제공하는 연산자(Operator)함수를 사용

전통적인 스레드 기반의 프로그래밍은 여러 스레드가 동시에 실행이 됐었다. 때문에 개발자가 예상하지 못한 문제가 발생하고 그 문제를 재현하는 것조차 어려운 상황이 발생했다. 리액티브 프로그래밍에서는 이러한 문제를 해결하기 위해서 부수 효과가 없는 순수함수를 지향하는 함수형 프로그래밍 기법을 도입했고, 함수형 프로그래밍 기법으로 작성된 함수가 바로 리액티브 연산자 함수이다.

 

 

blog.yena.io/studynote/2020/10/23/Android-RxJava(2).html

 

[Android] RxJava Observable 옵저버블

지난 포스트 [Android] RxJava 시작하기에서는 반응형 프로그래밍에 대한 개념을 설명하고 명령형 프로그램과의 차이를 서술했다. 이번 포스트에서는 Observable이 어떻게 동작하는지 알아보자. 그리

blog.yena.io

medium.com/@sunminlee89/rxjava-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-1-99ddae13c1c7

 

RxJava 프로그래밍 1

RxJava 스터디에서 사용될 교재 ‘RxJava 프로그래밍’을 공부하며 정리할 내용을 포스팅할 예정이다.

medium.com

reactivex.io/

 

ReactiveX

CROSS-PLATFORM Available for idiomatic Java, Scala, C#, C++, Clojure, JavaScript, Python, Groovy, JRuby, and others

reactivex.io

 

다음 포스팅에서는 RxJava를 안드로이드에 적용해서 공부할 계획이다!! 

Comments