Making N sequential API calls using RxJava and Retrofit

I have a list of files that I would like to upload to the backend from an Android device. Because of memory constraints, I would like to make the second API call only after the first one has finished, the third only after the second has finished, and so on.

I wrote something like this:

     private Observable<Integer> uploadFiles(List<File> files) {
         return Observable.<Integer>create(subscriber -> {
             for (int i = 0, size = files.size(); i < size; i++) {
                 // Upload the i-th file; this call blocks until the upload returns.
                 UploadModel uploadModel = new UploadModel(files.get(i));
                 int uploadResult = retrofitApi.uploadSynchronously(uploadModel);
                 subscriber.onNext(uploadResult);
             }
             subscriber.onCompleted();
         }).subscribeOn(Schedulers.newThread());
     }

But I feel this might be going against the spirit of Rx, and the saying goes that if you are using Observable.create, you are probably doing it wrong... Is this a reasonable approach? Is there a better way to achieve this with Retrofit's RxJava integration?

3 solutions collected from the web for "Making N sequential API calls using RxJava and Retrofit"

    Naively, I would do this (it does not work, though; see below):

     return Observable.from(files)
             .concatMap(file -> retrofitApi.upload(new UploadModel(file)));

    Now, the issue is that there is no way to tell Retrofit to use only one thread for these calls.

    reduce, however, passes the result of one function call to the next, along with the next value emitted by the original observable. That would work, but the function passed to reduce needs to be synchronous. Not good.

    Another approach would be to modify the observable recursively:

     void getNextFile(int i) {
         return retrofit.upload(i)
                 .onNext(result -> getNextFile(i + 1));
     }

    roughly. But I am not sure how to clean it up to make it more readable.
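
    The recursive idea above can be sketched without RxJava or Retrofit, using only CompletableFuture from the JDK as a stand-in for the asynchronous call. This is a minimal sketch under stated assumptions, not the Retrofit API: RecursiveChain, runFrom, fakeUpload and demo are hypothetical names introduced for illustration, and fakeUpload only records when a call starts and ends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class RecursiveChain {

    /**
     * Starts the call for items.get(index) and, only once it has completed,
     * recurses to index + 1. The returned future completes after the last call.
     */
    static <T, R> CompletableFuture<Void> runFrom(
            List<T> items, int index, Function<T, CompletableFuture<R>> call) {
        if (index >= items.size()) {
            return CompletableFuture.completedFuture(null);
        }
        return call.apply(items.get(index))
                .thenCompose(ignored -> runFrom(items, index + 1, call));
    }

    /** Hypothetical stand-in for an asynchronous upload; it only logs start and end. */
    static CompletableFuture<Integer> fakeUpload(String name, List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            log.add("start " + name);
            log.add("end " + name);
            return name.length();
        });
    }

    /** Runs three fake uploads one after another and returns the recorded order. */
    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<String> files = Arrays.asList("a.jpg", "b.jpg", "c.jpg");
        runFrom(files, 0, name -> fakeUpload(name, log)).join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Because the next call is only created inside thenCompose, no upload starts before the previous one has finished. If any call fails, the chain stops there and the returned future completes exceptionally.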

    The cleanest solution I can think of would be something like:

     Observable.from(files).map(file -> retrofitApi.uploadSynchronously(new UploadModel(file))); 

    Out of the box, RxJava emits all items from Observable.from(...) as if in parallel; that is the best way to think about it, as parallel emission. However, some cases require truly consecutive execution of the whole chain. I came up with the following solution. It is probably not the best one, but it works.

     import rx.Observable;
     import rx.Subscriber;

     import java.util.Iterator;
     import java.util.function.Function;

     public class Rx {
         public static void ignore(Object arg) {
         }

         public static <E, R> Observable<Void> sequential(Iterator<E> iterator, Function<E, Observable<R>> action) {
             return Observable.create(collectorSubscriber ->
                     Observable.<Void>create(producerSubscriber ->
                             producerSubscriber.setProducer(ignoredCount -> {
                                 if (!iterator.hasNext()) {
                                     producerSubscriber.onCompleted();
                                     return;
                                 }
                                 E model = iterator.next();
                                 action.apply(model)
                                         .subscribe(
                                                 Rx::ignore,
                                                 producerSubscriber::onError,
                                                 () -> producerSubscriber.onNext(null));
                             }))
                             .subscribe(new Subscriber<Void>() {
                                 @Override
                                 public void onStart() {
                                     request(1);
                                 }

                                 @Override
                                 public void onCompleted() {
                                     collectorSubscriber.onNext(null);
                                     collectorSubscriber.onCompleted();
                                 }

                                 @Override
                                 public void onError(Throwable e) {
                                     collectorSubscriber.onError(e);
                                 }

                                 @Override
                                 public void onNext(Void aVoid) {
                                     request(1);
                                 }
                             }));
         }
     }

    Example usage:

     Iterator<? extends Model> iterator = models.iterator();
     Rx.sequential(iterator, model -> someFunctionReturnsObservable(model))
             .subscribe(...);

    This method guarantees chained execution of

    Observable<Dummy> someFunctionReturnsObservable(Model model)

    Currently, the preferred way of creating observables is with fromAsync:

     Observable.fromAsync(new Action1<AsyncEmitter<Object>>() {
         @Override
         public void call(final AsyncEmitter<Object> emitter) {
             emitter.onNext(object);
             emitter.onCompleted();
             emitter.setCancellation(new AsyncEmitter.Cancellable() {
                 @Override
                 public void cancel() throws Exception {
                     // on unSubscribe() callback
                 }
             });
         }
     }, AsyncEmitter.BackpressureMode.BUFFER);