RxJava fromAction()과 andThen() 으로 구현하는 순차적 실행 코드
오늘은 RxJava의 연산자인 Completable.fromAction()과 andThen() 연산자를 이용해서,
순차적으로 실행이 보장되는 코드를 구현하는 방법을 정리하였습니다.
1. Completable.fromAction()
1-1. Completable.fromAction()
Completable.fromAction() 메서드는,
주어진 Action을 실행하고,
그 결과를 Completable 형태로 반환합니다.
본격적으로 이해하기 전에 이 메소드의 특징을 간단히 알아보겠습니다.
- Completable이 구독될 때까지 실행되지 않음.
- 각 구독마다 Action이 새로 실행됨
- 기본적으로 구독이 발생한 스레드에서 실행됨 ("subscribe()"를 호출한 스레드에서 실행)
- 하지만 subscribeOn()을 통해 실행 스레드 변경가능
1-2. Action과 Completable
Completable.fromAction()을 이해하기 전에,
여기에 사용된 용어먼저 이해해 보겠습니다.
Action과 Completable은 무엇을 의미할까요?
먼저 Action은,
매개변수를 받지 않고 반환값도 없는(void) 작업을 의미합니다.
그래서 Action 인터페이스는 run() 메소드 하나만 가지고 있습니다
Completable.fromAction(() -> {
System.out.println("Performing some action");
})
두번째로 Completable은,
onComplete 또는 onError 이벤트만 발생시켜주는,
특별한 유형의 Observable 을 말합니다.
Completable.fromAction()을 실행하면,
성공시에는 onComplete 신호를 발생시키고요.
예외가 발생하면 onError 신호를 발생시킵니다.
1-3. 순차적 실행
Completable.fromAction() 내부의 코드는,
동기적으로 위에서 아래로,
순차적으로 실행됩니다.
그런데 아래와 같은 경우는 주의해야 합니다.
비동기 작업이 시작되고,
그것이 끝나기 전에 2번째 줄이 시작될 수 있기 때문입니다.
Completable.fromAction(() -> {
asyncOperation1(); // 비동기 작업 시작
asyncOperation2(); // 첫 번째 작업이 끝나기 전에 시작될 수 있음
})
바로 이런 것까지 순차적으로 실행시키고자 할때,
아래에서 다룰 andThen()을 사용해주면 됩니다.
1-4. 에러 처리의 용이성
Completable.fromAction으로 래핑한 코드는,
예외가 발생할 경우,
Completable의 onError 신호로 전환되고,
Completable의 구독자에게 전달되게 됩니다.
Completable.fromAction(() -> {
throw new IOException("Error occurred");
})
.doOnError(e -> System.out.println("에러: " + e.getMessage()))
.onErrorComplete(e -> e instanceof IOException)
.subscribe(
() -> System.out.println("Completed"),
error -> System.err.println("에러: " + error.getMessage())
);
참고로 위에서 사용한 다음 메소드들을 알아두면,
좀 더 유연하게 다양한 에러처리가 가능해 집니다.
- doOnError(): 에러 발생 시 부수 효과를 실행가능
- onErrorComplete(): 특정 조건의 에러를 무시하고 완료 신호로 변환가능
1-5. Completable.fromAction() 안에 Single을 return하는 코드를 넣는 경우 주의
Database를 처리하는 Room의 dao코드를 실행하다 보면,
Single을 리턴하는 경우가 많이 있습니다.
그런데, Single을 반환하는 것은,
그 작업의 시작을 의미할 뿐 완료를 의미하는 것이 아닙니다.
따라서 Single을 리턴하는 코드가 fromAction()안에 들어갈 경우,
Single의 실행을 기다리지 않게 되어버립니다.
이 글의 핵심인 순차적 실행이 무너져 버릴 수 있다는 의미입니다.
게다가 Completable은 완료 또는 에러 신호만을 다루므로,
Single의 결과 값도 사용되지 않습니다.
따라서 순차적으로 진행한다고,
아무 코드나 마구 넣어서는 않되구요.
특히 Single을 리턴해주는 비동기 코드라면,
반드시 이 안에 넣어서는 안됩니다.
그럼 어떻게 하냐고요?
아래에서 볼 andThen()함수안에 넣어서 사용해 주어야 합니다.
참고로, Single을 Completable로 변환해서 사용할수도 있는데요.
아래와 같이, Single을 리턴하는 코드에, ignoreElement() 메소드를 사용해주면 됩니다.
Single<String> single = Single.just("Hello");
Completable completable = single.ignoreElement();
2. andThen()
너무 오래 기다렸네요.
andThen()을 알아볼까요?
andThen()은 비동기 코드의 순차적인 작업 실행을 위해 사용되는 중요한 연산자입니다.
Completable, Single, Maybe, Observable 등,
다양한 리액티브 타입에 대해 순차적 실행을 지원해주는 메소드입니다.
위에서 보았던 케이스인,
RxJava타입이 아닌 2개의 비동기 코드를 순차적으로 실행한다면,
다음과 같이 구현이 가능합니다.
Completable.fromAction(() -> asyncOperation1())
.andThen(Completable.fromAction(() -> asyncOperation2()))
andThen()은 복잡한 비동기 작업 흐름을 구성할 때 매우 유용합니다.
작업의 순서가 보이기도 하고,
실제로 이를 정확하게 제어할 수 있게 해주기 때문입니다.
조금 더 복잡한 코드를 보면서 이해해 보겠습니다.
아래 코드는 다음의 흐름에 따라서 작성된 코드인데요.
- 데이터베이스에 데이터 삽입
- 로그 파일에 기록 쓰기
- API를 통해 알림 보내기
- 결과 처리 또는 오류 로깅
먼저 데이터 베이스에 insert작업이 완료가 되고 나면,
그 다음 순서로 로그를 기록하고,
API를 통해서 노티피케이션(알림)을 보냅니다.
그리고 나서 결과를 가지고 무언가를 하거나,
에러가 나면 이를 로깅합니다.
Completable.fromAction(() -> {
database.insertData(data);
})
.andThen(Completable.fromAction(() -> {
fileSystem.writeLog(logEntry);
}).ignoreElements())
.andThen(Observable.fromCallable(() -> {
return api.sendNotification();
}))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
result -> doSomethingWithResult(result),
error -> Log.e("Process", "Error occurred", error)
);
DB나 fileIO같은 비동기 코드들이 순차적으로 동작하도록 하기위해,
Completable.fromAction()과 andThen()함수를 이용하였습니다.