Opérateurs de programmation réactifs dans RxJava 2

Si votre application Android doit accumuler ces critiques cinq étoiles sur Google Play, elle doit pouvoir effectuer plusieurs tâches à la fois..

Au minimum, les utilisateurs mobiles d'aujourd'hui s'attendent à pouvoir encore interagir avec votre application pendant qu'elle travaille en arrière-plan. Cela peut sembler simple, mais Android est mono-thread par défaut. Si vous voulez répondre aux attentes de votre public, vous devrez tôt ou tard créer des threads supplémentaires..

Dans le précédent article de cette série, nous avons présenté RxJava, une bibliothèque réactive pour la machine virtuelle qui peut vous aider à créer des applications Android qui réagissent aux données et aux événements lorsqu'ils se produisent. Mais vous pouvez également utiliser cette bibliothèque pour réagir aux données et aux événements. simultanément.

Dans cet article, je vais vous montrer comment utiliser les opérateurs de RxJava pour enfin faire de la concurrence sur Android une expérience sans douleur. À la fin de cet article, vous saurez comment utiliser les opérateurs RxJava pour créer des threads supplémentaires, spécifier le travail à effectuer sur ces threads, puis publier les résultats sur le thread principal de l'interface utilisateur d'Android avec un simple quelques lignes de code.

Et, comme aucune technologie n'est parfaite, je vais également vous parler d'un piège potentiel majeur consistant à ajouter la bibliothèque RxJava à vos projets, avant de vous montrer comment utiliser des opérateurs pour résoudre ce problème. jamais se produit dans vos propres projets Android.

Présentation des opérateurs

RxJava possède une énorme collection d'opérateurs principalement destinés à vous aider à modifier, filtrer, fusionner et transformer les données émises par votre ordinateur. Observables. Vous trouverez la liste complète des opérateurs RxJava dans la documentation officielle, et bien que personne ne s’attende à ce que vous mémorisiez chaque opérateur, cela vaut la peine de passer un peu de temps à parcourir cette liste pour vous donner une idée générale des différentes transformations de données que vous pouvez effectuer..

La liste des opérateurs de RxJava est déjà assez exhaustive, mais si vous ne trouvez pas l'opérateur idéal pour la transformation de données que vous aviez à l'esprit, vous pouvez toujours chaîner plusieurs opérateurs ensemble. Appliquer un opérateur à un Observable renvoie généralement un autre Observable, vous pouvez donc continuer à appliquer les opérateurs jusqu'à ce que vous obteniez les résultats souhaités.

Il y a beaucoup trop d'opérateurs RxJava à couvrir dans un seul article, et la documentation officielle de RxJava fait déjà un bon travail en introduisant tous les opérateurs que vous pouvez utiliser pour les transformations de données. Je vais donc me concentrer sur deux opérateurs disposant du plus de potentiel pour vous faciliter la vie en tant que développeur Android: subscribeOn () et observerOn ()

Multithreading avec des opérateurs RxJava

Si votre application doit offrir la meilleure expérience utilisateur possible, elle doit être en mesure d'exécuter des tâches intensives ou de longue durée et d'effectuer plusieurs tâches simultanément, sans bloquer le fil de l'interface utilisateur principal d'Android..

Par exemple, imaginez que votre application doit extraire des informations de deux bases de données différentes. Si vous effectuez ces deux tâches l'une après l'autre sur le fil principal d'Android, non seulement cela prendra beaucoup de temps, mais l'interface utilisateur ne répondra pas jusqu'à ce que votre application ait fini de récupérer chacune des informations des deux bases de données. . Pas vraiment une bonne expérience utilisateur!

Une solution bien meilleure consiste à créer deux threads supplémentaires où vous pouvez effectuer ces deux tâches simultanément sans que celles-ci ne bloquent le thread principal de l'interface utilisateur. Cette approche signifie que le travail sera accompli deux fois plus vite, et l'utilisateur pourra continuer à interagir avec l'interface utilisateur de votre application tout au long. Potentiellement, vos utilisateurs peuvent même ne pas savoir que votre application effectue un travail intensif et de longue durée en arrière-plan. Toutes les informations de la base de données apparaîtront simplement dans l'interface utilisateur de votre application, comme par magie.!

Par défaut, Android fournit quelques outils que vous pouvez utiliser pour créer des threads supplémentaires, notamment Un servicele sable IntentServices, mais ces solutions sont difficiles à mettre en œuvre et peuvent rapidement aboutir à un code complexe et détaillé. De plus, si vous n'implémentez pas correctement le multithreading, vous risquez de vous retrouver avec une application qui perd de la mémoire et génère toutes sortes d'erreurs..

Pour que le multithreading sur Android provoque encore plus de maux de tête, le principal thread d'interface utilisateur d'Android est le seul qui puisse mettre à jour l'interface utilisateur de votre application. Si vous souhaitez mettre à jour l'interface utilisateur de votre application avec le résultat du travail effectué sur tout autre fil, alors vous aurez généralement besoin de créer un Gestionnaire sur le fil principal de l'interface utilisateur, puis utilisez cette Gestionnaire pour transférer des données de votre fil d’arrière-plan au fil principal. Cela signifie plus de code, plus de complexité et plus d'opportunités pour que des erreurs se glissent dans votre projet.

Mais RxJava comporte deux opérateurs qui peuvent vous aider à éviter une grande partie de cette complexité et de ce potentiel d'erreur..

Notez que vous utilisez ces opérateurs avec Planificateurs, qui sont essentiellement des composants qui vous permettent de spécifier les fils. Pour l'instant, il suffit de penser à planificateur comme étant synonyme du mot fil.

  • subscribeOn (planificateur): Par défaut, un Observable émet ses données sur le fil où l’abonnement a été déclaré, c’est-à-dire où vous avez appelé le .souscrire méthode. Sous Android, il s’agit généralement du fil principal de l’interface utilisateur. Vous pouvez utiliser le subscribeOn () opérateur pour définir un autre Planificateur où le Observable devrait exécuter et émettre ses données.
  • observeOn (planificateur): Vous pouvez utiliser cet opérateur pour rediriger votre Observableles émissions de à un autre Planificateur, changer efficacement le fil où le ObservableLes notifications de sont envoyées et, par extension, le thread sur lequel ses données sont utilisées.

RxJava est livré avec un certain nombre de planificateurs que vous pouvez utiliser pour créer différents threads, notamment:

  • Schedulers.io (): Conçu pour être utilisé pour des tâches liées aux entrées / sorties. 
  • Schedulers.computation (): Conçu pour être utilisé pour des tâches de calcul. Par défaut, le nombre de threads dans le planificateur de calcul est limité au nombre de processeurs disponibles sur votre périphérique..
  • Schedulers.newThread (): Crée un nouveau fil.

Maintenant, vous avez un aperçu de toutes les pièces mobiles, regardons quelques exemples de la façon dont subscribeOn () et observerOn () sont utilisés et voir certains ordonnanceurs en action.

subscribeOn ()

Dans Android, vous utiliserez généralement subscribeOn () et un accompagnant Planificateur pour changer le thread sur lequel un travail long ou intensif est effectué, il n'y a donc aucun risque de blocage du thread principal de l'interface utilisateur. Par exemple, vous pouvez décider d’importer une grande quantité de données sur le io () planificateur ou effectuer des calculs sur la calcul() planificateur.

Dans le code suivant, nous créons un nouveau thread où le Observable va exécuter ses opérations et émettre les valeurs 1, 2, et 3.

Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer); 

Bien que ce soit tout ce dont vous avez besoin pour créer un thread et commencer à émettre des données sur ce thread, vous voudrez peut-être avoir la confirmation que cet observable fonctionne réellement sur un nouveau thread. Une méthode consiste à imprimer le nom du fil que votre application utilise actuellement, dans Android Studio.Moniteur Logcat.

Dans le post précédent, Get Started With RxJava, nous avons créé une application qui envoie des messages à Logcat Monitor à différentes étapes du cycle de vie de l'Observable, afin de pouvoir réutiliser une grande partie de ce code..

Ouvrez le projet que vous avez créé dans cet article et modifiez votre code pour qu'il utilise ce qui précède. Observable comme source Observable. Puis ajoutez le subscribeOn () Opérateur et spécifiez que les messages envoyés à Logcat doivent inclure le nom du thread actuel.

Votre projet fini devrait ressembler à ceci:

importer android.support.v7.app.AppCompatActivity; importer android.os.Bundle; importer android.util.Log; importer io.reactivex.Observable; importer io.reactivex.Observer; importer io.reactivex.disposables.Disposable; importer io.reactivex.schedulers.Schedulers; classe publique MainActivity étend AppCompatActivity public statique final String TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer);  Observateur Observateur = nouvel observateur() @Override public void onSubscribe (Disposable d) Log.e (TAG, "onSubscribe" + Thread.currentThread (). GetName ());  @Override public void onNext (valeur entière) Log.e (TAG, "onNext:" + valeur + Thread.currentThread (). GetName ());  @Override public void onError (Throwable e) Log.e (TAG, "onError:");  @Override public void onComplete () Log.e (TAG, "onComplete: tout est fait!" + Thread.currentThread (). GetName ()); ; 

Assurez-vous que Logcat Monitor d’Android Studio est ouvert (en sélectionnant le Moniteur Android onglet, suivi de Logcat), puis exécutez votre projet sur un périphérique Android physique ou sur un AVD. Vous devriez voir la sortie suivante dans le moniteur Logcat:

Ici, vous pouvez voir que .souscrire est appelé sur le thread principal de l'interface utilisateur, mais l'observable fonctionne sur un thread complètement différent.

le subscribeOn () l’opérateur aura le même effet peu importe où vous le placerez dans la chaîne observable; cependant, vous ne pouvez pas utiliser plusieurs subscribeOn () opérateurs de la même chaîne. Si vous incluez plus d'un subscribeOn (), alors votre chaîne va seulement Utilisez le subscribeOn () c'est le plus proche de la source observable.

observerOn ()

contrairement à subscribeOn (), où vous placez observerOn () dans ta chaine Est-ce que la matière, car cet opérateur ne change que le fil qui est utilisé par les observables qui apparaissent en aval

Par exemple, si vous avez inséré ce qui suit dans votre chaîne, chaque observable qui apparaît dans la chaîne à partir de ce moment utilisera le nouveau fil..

.observeOn (Schedulers.newThread ())

Cette chaîne continuera à fonctionner sur le nouveau thread jusqu'à ce qu'elle rencontre un autre observerOn () opérateur, à quel point il passera au thread spécifié par cet opérateur. Vous pouvez contrôler le fil où des observables spécifiques envoient leurs notifications en insérant plusieurs observerOn () opérateurs dans votre chaîne.

Lors du développement d'applications Android, vous utiliserez généralement observerOn () envoyer le résultat du travail effectué sur les threads d'arrière-plan au thread principal de l'interface utilisateur d'Android. Le moyen le plus simple de rediriger les émissions vers le fil principal de l'interface utilisateur d'Android est d'utiliser le AndroidSchedulers.mainThread Scheduler, qui est inclus dans la bibliothèque RxAndroid plutôt que dans la bibliothèque RxJava. 

La bibliothèque RxAndroid inclut des liaisons spécifiques à Android pour RxJava 2, ce qui en fait une ressource supplémentaire précieuse pour les développeurs Android (et nous y reviendrons plus en détail dans le prochain article de cette série)..

Pour ajouter RxAndroid à votre projet, ouvrez votre module build.gradle fichier et ajoutez la dernière version de la bibliothèque à la section des dépendances. Au moment de la rédaction, la dernière version de RxAndroid était la version 2.0.1, j'ajoute donc ce qui suit:

dépendances … compile 'io.reactivex.rxjava2: rxandroid: 2.0.1'

Après avoir ajouté cette bibliothèque à votre projet, vous pouvez spécifier que les résultats d'un observable doivent être envoyés au principal thread d'interface utilisateur de votre application, à l'aide d'une seule ligne de code:

.observeOn (AndroidSchedulers.mainThread ())

Considérant que la communication avec le fil principal de l'interface utilisateur de votre application occupe une page complète de la documentation Android officielle, il s'agit d'une amélioration considérable qui pourrait vous faire gagner beaucoup de temps lors de la création d'applications Android multithreads..

Inconvénient majeur de RxJava

Alors que RxJava a beaucoup à offrir aux développeurs Android, aucune technologie n'est parfaite, et RxJava présente un piège majeur qui risque de bloquer votre application..

Par défaut, RxJava utilise un workflow basé sur le push: les données sont produites en amont par un Observable, et est ensuite poussé en aval à la assignée Observateur. Le problème majeur avec un flux de travail basé sur le push est sa facilité pour le producteur (dans ce cas, le Observable) d’émettre des articles trop rapidement pour le consommateur (Observateur) procéder.

Un bavard Observable et un lent Observateur peut entraîner rapidement un arriéré d’articles non consommés, ce qui va engloutir les ressources système et peut même entraîner un OutOfMemoryException. Ce problème est connu sous le nom contrepression.

Si vous soupçonnez qu'une contre-pression se produit dans votre application, plusieurs solutions sont possibles, notamment l'utilisation d'un opérateur pour réduire le nombre d'articles en cours de production..

Création de périodes d'échantillonnage avec échantillon() et throttlefirst ()

Si un Observable émet un grand nombre d’éléments, il n’est peut-être pas nécessaire que le destinataire Observateur recevoir chaque un seul de ces articles.

Si vous pouvez en toute sécurité ignorer certains Observablevous pouvez utiliser quelques opérateurs pour créer des périodes d’échantillonnage, puis sélectionner des valeurs spécifiques émises au cours de ces périodes:

  • le échantillon() L'opérateur vérifie la sortie de l'observable à des intervalles spécifiés par vous, puis prend l'élément le plus récent qui a été émis au cours de cette période d'échantillonnage. Par exemple, si vous incluez .échantillon (5, SECONDES) dans votre projet, l'observateur recevra la dernière valeur émise au cours de chaque intervalle de cinq secondes. 
  • le throttleFirst () L'opérateur prend la première valeur émise pendant la période d'échantillonnage. Par exemple, si vous incluez .throttlefirst (5, SECONDES) alors l'observateur recevra la première valeur émise pendant chaque intervalle de cinq secondes.  

Émissions en lots avec tampon()

Si vous ne pouvez ignorer aucune émission en toute sécurité, vous pourrez peut-être encore soulager un peu Observateur en regroupant les émissions en lots, puis en les envoyant en masse. Le traitement des émissions par lots est généralement plus efficace que le traitement de plusieurs émissions séparément. Cette approche devrait donc améliorer le taux de consommation..

Vous pouvez créer des émissions par lots en utilisant le tampon() opérateur. Ici, nous utilisons tampon() pour regrouper tous les éléments émis sur une période de trois secondes:

Observable.range (0, 10) .buffer (3, SECONDS) .subscribe (System.out :: println);

Alternativement, vous pouvez utiliser tampon() créer un lot composé d’un nombre spécifique d’émissions. Par exemple, ici nous disons tampon() regrouper les émissions en groupes de quatre:

Observable.range (0, 10) .buffer (4) .subscribe (System.out :: println);

Remplacer les observables par des fluides

Une autre méthode de réduction du nombre d’émissions consiste à remplacer le Observable cela vous cause des problèmes avec un Fluide.

Dans RxJava 2, l’équipe RxJava a décidé de scinder le standard Observable en deux types: le type que nous avons examiné tout au long de cette série, et Fluides.

Fluides fonctionnent de la même manière que Observables, mais avec une différence majeure: Fluides n’envoyez que le nombre d’éléments requis par les observateurs. Si vous avez un Observable émet plus d’articles que son observateur ne peut en consommer, vous pouvez envisager de passer à un système Fluide au lieu.

Avant de pouvoir commencer à utiliser Fluides dans vos projets, vous devez ajouter la déclaration d'importation suivante:

importer io.reactivex.Flowable;

Vous pouvez alors créer Fluides utilisant exactement les mêmes techniques utilisées pour créer Observables. Par exemple, chacun des extraits de code suivants créera un Fluide c'est capable d'émettre des données:

Fluide flowable = Flowable.fromArray (nouvelle chaîne [] "sud", "nord", "ouest", "est");… flowable.subscribe ()
Fluide fluidable = plage de flux (0, 20);… flux.abonnement ()

À ce stade, vous vous demandez peut-être pourquoi je voudrais utiliser Observables quand je peux juste utiliser Fluides et ne pas avoir à se soucier de la contre-pression? La réponse est qu'un Fluide encourt plus d'une surcharge qu'un habitué Observable, donc dans l’intérêt de créer une application très performante, vous devriez vous en tenir à Observables sauf si vous soupçonnez que votre application a des problèmes de contre-pression.

Simple

UNE Fluide n'est pas la seule variation sur Observable que vous trouverez dans RxJava, car la bibliothèque inclut également le Unique classe.

Simple sont utiles lorsque vous avez juste besoin d’émettre une valeur. Dans ces scénarios, créer un Observable peut sembler exagéré, mais un Unique est conçu pour simplement émettre une seule valeur et ensuite compléter, soit en appelant:

  • onSuccès (): Le Unique émet sa seule valeur.  
  • onError (): Si la Unique est incapable d'émettre son article, alors il passera cette méthode le résultat Jetable.

UNE Unique n'appellera qu'une de ces méthodes, puis se terminera immédiatement.

Regardons un exemple de Unique en action, encore une fois, pour gagner du temps, nous réutilisons du code:

importer android.os.Bundle; importer android.support.v7.app.AppCompatActivity; importer android.util.Log; importer io.reactivex.Single; importer io.reactivex.SingleObserver; importer io.reactivex.disposables.Disposable; classe publique MainActivity étend AppCompatActivity public statique final String TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Single.just ("Hello World") .subscribe (getSingleObserver ());  SingleObserver privé getSingleObserver () retourne le nouveau SingleObserver() @Override public null onSubscribe (Jetable d) Log.e (TAG, "onSubscribe");  @Override public void onSuccess (Valeur de chaîne) Log.e (TAG, "onSuccess:" + value);  @Override public void onError (Throwable e) Log.e (TAG, "onError:"); ; 

Exécutez votre projet sur un AVD ou un périphérique Android physique et vous verrez la sortie suivante dans Logcat Monitor d'Android Studio:

Si vous changez d'avis et que vous souhaitez convertir un Unique dans un Observable à tout moment, une fois encore, RxJava a tous les opérateurs dont vous avez besoin, y compris:

  • fusionner avec(): Fusionne plusieurs Simple en un seul Observable
  • concatWith (): Chaîne les éléments émis par plusieurs Simple ensemble, pour former un Observable émission. 
  • toObservable (): Convertit un Unique dans un Observable qui émet l'élément qui a été initialement émis par le Single, puis termine.

Résumé

Dans cet article, nous avons exploré quelques opérateurs de RxJava que vous pouvez utiliser pour créer et gérer plusieurs threads, sans la complexité et le risque d'erreur inhérents au multithreading sur Android. Nous avons également vu comment vous pouvez utiliser la bibliothèque RxAndroid pour communiquer avec le principal thread d'interface utilisateur d'Android à l'aide d'une seule ligne de code et comment vous assurer que la contre-pression ne devient pas un problème dans votre application..

Nous avons abordé la bibliothèque RxAndroid à quelques reprises tout au long de cette série, mais cette bibliothèque regorge de liaisons RxJava spécifiques à Android qui peuvent s'avérer inestimables lorsque vous utilisez RxJava sur la plate-forme Android. regarder la librairie RxAndroid plus en détail.

Jusque-là, consultez certains de nos autres articles sur le codage pour Android!