Olá a todos, meu nome é Ivan, sou desenvolvedor Android. Hoje quero compartilhar minha experiência com o RxJava2 e contar como ocorre a inicialização da cadeia. Por que decidi trazer isso à tona? Depois de conversar com outros desenvolvedores, percebi que nem todo mundo que usa essa ferramenta entende como ela funciona. E então decidi descobrir como as assinaturas são organizadas no RxJava2 e em que sequência todo o trabalho é inicializado. Não encontrei um único artigo explicando isso. Diante disso, fui até o código-fonte para ver como tudo funciona e esbocei para mim uma pequena folha de dicas, que se transformou neste artigo.
Neste artigo, não vou descrever o que é Observable
, Observer
e todas as outras entidades que são usadas no RxJava2. Se você decidir ler este artigo, presumo que já esteja familiarizado com essas informações. E se você ainda não está familiarizado com esses conceitos, recomendo que se familiarize com eles antes de ler.
Veja como começar:
Explorando RxJava 2 para Android
Vamos ver como funciona a cadeia mais simples:
Observable.just (1, 2, 3, 4, 5)
.map {…}
.filter {…}
.subscribe();
Em cima
Em primeiro lugar, descreverei brevemente cada etapa que percorremos nesta cadeia (as etapas começam de cima para baixo):
Um objeto é criado na declaração justa
ObservableFromArray
.
Um objeto é criado na instrução map
ObservableMap
, que leva no construtor uma referência ao objeto criado anteriormente na instrução just.
filter
ObservableFilter
, map, just.
Observable
’Observable
subscribe()
(ObservableFilter
filter)Observer
, .
ObservableFilter.subscribe()
ObservableFilter.subscribeActual()
,Observer
, filter,FilterObserver
.Observer
Observer
ObservableFilter.subscribe()
.
ObservableMap.subscribe()
ObservableMap.subscribeActual()
Observer,
map,MapObserver
,FilterObserver
.
ObservableFromArray.subscribe()
ObservableFromArray.subscribeActual()
,onSubscribe()
ObservableFromArray.subscribeActual()
Observer
’.
onSubscribe()
Observer
’ .
ObservableFromArray
onNext()
Observer
’.
, just()
null, fromArray(),
Observable
.
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {
ObjectHelper.requireNonNull(item1, "item1 is null");
ObjectHelper.requireNonNull(item2, "item2 is null");
ObjectHelper.requireNonNull(item3, "item3 is null");
ObjectHelper.requireNonNull(item4, "item4 is null");
ObjectHelper.requireNonNull(item5, "item5 is null");
return fromArray(item1, item2, item3, item4, item5);
}
fromArray()
, .
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
}
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
ObservableFromArray
, .
onAssembly()
, - Observable
, , .
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
onAssembly()
Observable
- , :
RxJavaPlugins.setOnObservableAssembly(o -> {
if (o instanceof ObservableFromArray) {
return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
}
return o;
});
Observable.just(1, 2, 3)
.filter(v -> v > 3)
.test()
.assertResult(4, 5, 6);
map()
. , . null, ObservableMap
.
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
, ObservableMap
mapper, , this (source). this ObservableFromArray
. ObservableMap
AbstractObservableWithUpstream
, source.
AbstractObservableWithUpstream
, Observable
.
onAssembly()
Observable
.
filter()
. , , ObservableFilter
this ObservableMap
( ObservableFromArray
, ) .
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
subscribe()
, . onNext()
. subscribe()
ObservableFilter
, Observable
.
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
null, LambdaObserver
.
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
, .
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) {
......
}
}
subscribeActual()
LambdaObserver
. subscribeActual()
ObservableFilter
. .
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate));
}
FilterObserver
, LambdaObserver
, ObservableFilter
.
FilterObserver
BasicFuseableObserver
, onSubscribe()
. BasicFuseableObserver
, Observer
’. , 6 , FilterObserver
MapObserver
. BasicFuseableObserver.onSubscribe()
onSubscribe()
Observer
’, . :
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
if (beforeDownstream()) {
downstream.onSubscribe(this);
afterDownstream();
}
}
}
, ObservableFilter
FilterObserver
, source.subscribe()
. , source ObservableMap
, . ObservableMap
subscribe()
.
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) {
......
}
}
, subscribe()
subscribeActual()
, ObservableMap
. subscribeActual()
MapObserver
FilterObserver
mapper
’.
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
Observer
’ BasicFuseableObserver
, onSubscribe()
, Observer
, onSubscribe()
.
subscribeActual()
run()
, Observer
’.
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
onNext()
Observer
’, onComplete()
onError()
, .
Observable
’ callback’ Observer
’, .
onSubscribe()
, doOnSubscribe()
.
3 :
Observable
Observer
Portanto, ao usar operadores, deve-se ter em mente que cada operador aloca memória para vários objetos e não se deve adicionar operadores à cadeia, apenas porque é “possível”.
RxJava é uma ferramenta poderosa, mas você precisa entender como funciona e para que usar. Se você só precisa executar uma solicitação de rede em um thread de segundo plano e depois executar o resultado no thread principal, é como “atirar em pardais com um canhão”, você pode ser pego, mas as consequências podem ser sérias.