Une application multithread comprend deux parties ou plus pouvant être exécutées en parallèle. Cela permet à l'application de mieux utiliser les cœurs à l'intérieur du processeur de l'appareil. Cela lui permet d’accomplir les tâches plus rapidement et offre une expérience plus fluide et plus réactive pour l’utilisateur.
Le codage pour la simultanéité en Java peut être douloureux, mais grâce à RxJava
, c'est maintenant beaucoup plus facile à faire. Avec RxJava
, il vous suffit de déclarer le thread sur lequel vous voulez que la tâche soit exécutée (de manière déclarative) au lieu de créer et de gérer les threads (de manière impérative).
RxJava
fait usage de Planificateurs
avec le subscribeOn ()
et observerOn ()
opérateurs de simultanéité pour y parvenir. Dans ce tutoriel, vous en apprendrez plus sur Planificateurs
, la subscribeOn ()
opérateur, le observerOn ()
opérateur, et aussi comment tirer parti de la flatMap ()
opérateur pour obtenir la simultanéité. Mais d'abord, commençons par Planificateurs
dans RxJava
.
Pour suivre ce tutoriel, vous devez être familiarisé avec:
Consultez nos autres articles pour vous familiariser avec les bases de RxJava et des expressions lambda.
Planificateurs
dans RxJava sont utilisés pour exécuter une unité de travail sur un thread. UNE Planificateur
fournit une abstraction au mécanisme de threading Android et Java. Lorsque vous voulez exécuter une tâche et que vous utilisez un Planificateur
pour exécuter cette tâche, le Planificateur
accède à son pool de threads (une collection de threads prêts à être utilisés) puis exécute la tâche dans un thread disponible.
Vous pouvez également spécifier qu'une tâche doit être exécutée dans un thread spécifique. (Il y a deux opérateurs, subscribeOn ()
et observerOn ()
, qui peut être utilisé pour spécifier sur quel fil de la Planificateur
pool de threads, la tâche doit être exécutée.)
Comme vous le savez, dans Android, les processus longs ou les tâches gourmandes en ressources processeur ne doivent pas être exécutés sur le thread principal. Si un abonnement par un Observateur
à un Observable
est effectuée sur le thread principal, tout opérateur associé s'exécutera également sur le thread principal. Dans le cas d'une tâche de longue durée (par exemple une requête réseau) ou d'une tâche gourmande en ressources processeur (par exemple une transformation d'image), ceci bloque l'interface utilisateur jusqu'à la fin de la tâche, ce qui conduit à la terrible boîte de dialogue ANR (Application Not Responding). et l'application se brisant. Ces opérateurs peuvent à la place être basculés vers un autre thread avec le observerOn ()
opérateur.
Dans la section suivante, nous allons explorer les différents types de Planificateurs
et leurs utilisations.
Voici quelques types de Planificateurs
disponible en RxJava
et RxAndroid
pour indiquer le type de thread sur lequel exécuter des tâches.
Schedulers.immediate ()
: retourne un Planificateur
qui exécute le travail instantanément dans le fil actuel. Sachez que ceci bloquera le thread en cours, il devrait donc être utilisé avec prudence. Schedulers.trampoline ()
: planifie les tâches dans le fil actuel. Ces tâches ne sont pas exécutées immédiatement mais sont exécutées une fois que le thread a terminé ses tâches en cours. Ceci est différent de Schedulers.immediate ()
car au lieu d'exécuter une tâche immédiatement, il attend que les tâches en cours se terminent. Schedulers.newThread ()
: déclenche un nouveau thread et retourne un Planificateur
exécuter la tâche dans le nouveau fil pour chaque Observateur
. Vous devriez faire attention en utilisant ceci car le nouveau thread n'est pas réutilisé par la suite mais est détruit. Schedulers.computation ()
: cela nous donne un Planificateur
destiné au travail intensif en calcul, tel que la transformation d’images, les calculs complexes, etc. Ce Planificateur
utilise une taille de pool de threads fixe qui dépend des cœurs de la CPU pour une utilisation optimale. Veillez à ne pas créer plus de threads que les cœurs de processeur disponibles, car cela pourrait réduire les performances.. Schedulers.io ()
: crée et retourne un Planificateur
désigné pour les travaux liés aux E / S, tels que les appels réseau asynchrones ou la lecture et l'écriture dans la base de données. Ces tâches ne nécessitent pas beaucoup de temps processeur, sinon elles utilisent Schedulers.computation ()
.Schedulers.single ()
: crée et retourne un Planificateur
et exécute plusieurs tâches séquentiellement dans un seul thread. Schedulers.from (Executor executor)
: cela va créer un Planificateur
qui exécutera une tâche ou une unité de travail sur la donnée Exécuteur
. AndroidSchedulers.mainThread ()
: cela va créer un Planificateur
qui exécute la tâche sur le fil principal de l'application Android. Ce type de planificateur est fourni par le RxAndroid
bibliothèque. subscribeOn ()
OpérateurEn utilisant le subscribeOn ()
opérateur de simultanéité, vous spécifiez que le Planificateur
devrait effectuer l'opération dans le Observable
en amont. Il va ensuite pousser les valeurs à la Observateurs
en utilisant le même fil. Voyons maintenant un exemple pratique:
importer android.os.Bundle; importer android.support.v7.app.AppCompatActivity; importer android.util.Log; importer io.reactivex.Observable; importer io.reactivex.ObservableOnSubscribe; importer io.reactivex.disposables.Disposable; importer io.reactivex.schedulers.Schedulers; Classe publique MainActivity étend AppCompatActivity privé statique final String [] STATES = "Lagos", "Abuja", "Abia", "Edo", "Enugu", "Niger", "Anambra"; private Disposable mDisposable = null; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Observableobservable = observable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .doOnComplete () -> Log.d ("MainActivity", "Complete")); mDisposable = observable.subscribe (s -> Log.d ("MainActivity", "reçu" + s + "sur le fil" + Thread.currentThread (). getName ()); private ObservableOnSubscribe dataSource () return (emitter -> for (Etat de la chaîne: STATES) emitter.onNext (state); Log.d ("MainActivity", "emitting" + state + "sur le thread" + Thread.currentThread (). getName ()); Thread.sleep (600); emitter.onComplete ();); @Override protected void onDestroy () if (mDisposable! = Null &&! MDisposable.isDisposed ()) mDisposable.dispose (); super.onDestroy ();
Dans le code ci-dessus, nous avons un statique ArrayList
qui contient certains états au Nigeria. Nous avons aussi un champ qui est de type Jetable
. Nous obtenons le Jetable
par exemple en appelant Observable.subscribe ()
, et nous l'utilisons plus tard lorsque nous appelons la disposer()
méthode pour libérer toutes les ressources qui ont été utilisées. Cela aide à prévenir les fuites de mémoire. Notre la source de données()
méthode (qui peut renvoyer des données d’une source de base de données distante ou locale) renverra ObservableOnSubscribe
: cela est nécessaire pour créer notre propre Observable
plus tard en utilisant la méthode Observable.create ()
.
À l'intérieur de la source de données()
méthode, nous parcourons le tableau en émettant chaque élément vers le Observateurs
en appelant emitter.onNext ()
. Après que chaque valeur est émise, nous mettons le thread en veille afin de simuler un travail intensif. Enfin, nous appelons le onComplete ()
méthode pour signaler au Observateurs
que nous avons fini de transmettre des valeurs et qu'ils ne devraient plus en attendre.
Maintenant, notre la source de données()
La méthode ne doit pas être exécutée sur le fil principal de l'interface utilisateur. Mais comment est-ce spécifié? Dans l'exemple ci-dessus, nous avons fourni Schedulers.newThread ()
comme argument pour subscribeOn ()
. Cela signifie que le la source de données()
l'opération sera exécutée dans un nouveau thread. Notez également que dans l'exemple ci-dessus, nous avons juste un Observateur
. Si nous avions plusieurs Observateurs
, chacun d'eux aurait son propre fil.
Pour que nous puissions voir cela fonctionner, notre Observateur
imprime les valeurs qu'il obtient dans son onNext ()
méthode de la Observable
.
Lorsque nous exécutons cela et visualisons notre logcat sur Android Studio, vous pouvez voir que les émissions provenant du la source de données()
méthode à la Observateur
est arrivé sur le même fil-RxNewThreadScheduler-1
-dans lequel la Observateur
les a reçus.
Si vous ne spécifiez pas le .subscribeOn ()
méthode après la Observable.create ()
méthode, il sera exécuté sur le thread actuel, qui dans notre cas est le thread principal, bloquant ainsi l'interface utilisateur de l'application.
Vous devez connaître certains détails importants concernant la subscribeOn ()
opérateur. Vous ne devriez en avoir qu'un subscribeOn ()
dans le Observable
chaîne; l'ajout d'un autre élément n'importe où dans la chaîne n'aura aucun effet. L'emplacement recommandé pour placer cet opérateur est le plus près possible de la source pour des raisons de clarté. En d'autres termes, placez-le en premier dans la chaîne d'opérateur.
Observable.create (dataSource ()) .subscribeOn (Schedulers.computation ()) // cela a un effet .subscribeOn (Schedulers.io ()) // n'a aucun effet .doOnNext (s -> saveToCache (s); // exécuté sur Schedulers.computation ())
observerOn ()
OpérateurComme nous l'avons vu, le subscribeOn ()
opérateur de simultanéité va instruire le Observable
lequel Planificateur
à utiliser pour faire avancer les émissions le long du Observable
chaîne au Observateurs
.
Le métier de observerOn ()
opérateur de simultanéité, d'autre part, consiste à basculer les émissions ultérieures vers un autre thread ou Planificateur
. Nous utilisons cet opérateur pour contrôler sur quel fil les consommateurs en aval recevront les émissions. Voyons un exemple pratique.
importer android.os.Bundle; importer android.support.v7.app.AppCompatActivity; importer android.util.Log; importer android.widget.TextView; importer io.reactivex.Observable; importer io.reactivex.ObservableOnSubscribe; importer io.reactivex.android.schedulers.AndroidSchedulers; importer io.reactivex.disposables.Disposable; importer io.reactivex.schedulers.Schedulers; public class ObserveOnActivity étend AppCompatActivity privé Disposable mDisposable = null; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); TextView textView = (TextView) findViewById (R.id.tv_main); Observableobservable = observable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .observeOn (AndroidSchedulers.mainThread ()) .doOnComplete (() -> Log.d ("ObserveOnActivity", "Complete")); mDisposable = observable.subscribe (s -> Log.d ("ObserveOnActivity", "reçu" + s + "sur le fil" + Thread.currentThread (). getName ()); textView.setText (s);); private ObservableOnSubscribe dataSource () return (emitter -> Thread.sleep (800); emitter.onNext ("Value"); Log.d ("ObserveOnActivity", "dataSource () sur le thread" + Thread.currentThread (). getName ( ));; emitter.onComplete ();); //…
Dans le code ci-dessus, nous avons utilisé le observerOn ()
opérateur et ensuite passé le AndroidSchedulers.mainThread ()
à cela. Ce que nous avons fait est de changer le fil de Schedulers.newThread ()
au fil principal Android. Cela est nécessaire car nous souhaitons mettre à jour le Affichage
widget, et ne peut le faire que depuis le fil principal de l'interface utilisateur. Notez que si vous ne passez pas au thread principal lorsque vous essayez de mettre à jour le Affichage
widget, l'application va planter et jeter un CalledFromWrongThreadException
.
Contrairement à la subscribeOn ()
opérateur, le observerOn ()
opérateur peut être appliqué plusieurs fois dans la chaîne d'opérateurs, modifiant ainsi le Planificateur
plus d'une fois.
Observableobservable = Observable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .observeOn (Schedulers.io ()) .doOnNext (s -> saveToCache (s); Log.d ("ObserveOnActivity", "doOnNext" () sur le fil "+ Thread.currentThread (). getName ());) .observeOn (AndroidSchedulers.mainThread ()) .doOnComplete (() -> Log.d (" ObserveOnActivity "," Complet "));
Ce code a deux observerOn ()
les opérateurs. Le premier utilise le Schedulers.io ()
, ce qui signifie que le saveToCache ()
la méthode sera exécutée sur le Schedulers.io ()
fil. Après cela, il passe ensuite à la AndroidSchedulers.mainThread ()
où Observateurs
recevra les émissions de l'amont.
le flatMap ()
opérateur est un autre opérateur très puissant et important qui peut être utilisé pour obtenir une concurrence. La définition selon la documentation officielle est la suivante:
Transformez les éléments émis par un observable en observables, puis réglez les émissions de ceux-ci en un seul observable..
Jetons un coup d'oeil à un exemple pratique qui utilise cet opérateur:
//… @Override protected void onCreate (Bundle savedInstanceState) //… final String [] states = "Lagos", "Abuja", "Imo", "Enugu"; ObservablestatesObservable = Observable.fromArray (états); statesObservable.flatMap (s -> Observable.create (getPopulation (s))) .subscribe (pair -> Log.d ("MainActivity", pair.first + "la population est" + pair.second)); private ObservableOnSubscribe getPopulation (String state) return (emitter -> Random r = new Random (); Log.d ("MainActivity", "getPopulation () pour" + state + "est appelé" + Thread.currentThread (). getName ( )); emitter.onNext (new Pair (état, r.nextInt (300000 - 10000) + 10000)); emitter.onComplete (););
Ceci imprimera ce qui suit sur Android Studio logcat:
getPopulation () pour Lagos appelée sur la population principale de Lagos est 80362 getPopulation () pour Abuja appelée sur la population principale d'Abuja est 132559 getPopulation () pour Imo appelée sur la population principale d'Imo est 34106 getPopulation () pour Enugu appelée sur la population principale d'Enugu est 220301
D'après le résultat ci-dessus, vous pouvez voir que les résultats que nous avons obtenus étaient dans le même ordre que dans le tableau. Également getPopulation ()
méthode pour chaque état a été traitée sur le même thread, le thread principal. Cela ralentit le résultat en sortie car ils ont été traités séquentiellement sur le thread principal..
Maintenant, pour que nous puissions obtenir une concurrence avec cet opérateur, nous voulons que le getPopulation ()
méthode pour chaque état (émissions de la États observables
) à traiter sur différents threads. Cela accélérera le traitement. Nous allons utiliser le flatMap ()
opérateur de le faire, car il crée un nouveau Observable
pour chaque émission. Nous appliquons ensuite le subscribeOn ()
opérateur de simultanéité à chacun, en passant un Planificateur
à cela.
statesObservable.flatMap (s -> Observable.create (getPopulation (s)) .subscribeOn (Schedulers.io ())) .subscribe (pair -> Log.d ("MainActivity", pair.first + "population is" + pair .seconde));
Chaque émission produisant un Observable
, la flatMap ()
Le travail de l'opérateur consiste à les fusionner ensemble, puis à les envoyer en un seul flux.
getPopulation () pour Lagos appelé sur RxCachedThreadScheduler-1 La population de Lagos est 143965 getPopulation () pour Abuja appelé sur RxCachedThreadScheduler-2 getPopulation () pour Enugu appelé sur RxCachedThreadScheduler-4 population d'Abuja est 158363 La population Enugu est 271420 -3 La population d'Imo est 81564
Dans le résultat ci-dessus, nous pouvons observer que chaque état getPopulation ()
La méthode a été traitée sur différents threads. Cela rend le traitement beaucoup plus rapide, mais observe également que les émissions du flatMap ()
opérateur qui ont été reçus par le Observateur
ne sont pas dans le même ordre que les émissions d'origine en amont.
Dans ce tutoriel, vous avez appris à gérer la concurrence d’accès à l’aide de RxJava 2: c’est quoi, les différents Planificateurs
disponible, et comment utiliser le subscribeOn ()
et observerOn ()
opérateurs de simultanéité. Je vous ai également montré comment utiliser le flatMap ()
opérateur pour obtenir la simultanéité.
En attendant, découvrez certains de nos autres cours et tutoriels sur le développement d'applications Java et Android.!