Commit 4d6d089d authored by Julien SADAOUI's avatar Julien SADAOUI
Browse files

feat: rxjava

parent c2d51544
# Coding Dojo : RX Java
Bienvenue dans ce **Coding Dojo** ciblé sur l'usage de la programmation réactive, en utilisant le projet **RX Java 2**.
RX Java 2 est une librairie implémentant le paradigme de la programmation réactive. Dans ce **Coding Dojo**, vous apprendrez comment utiliser **RX Java 2**, les concepts, les opérateurs and les bonnes pratiques.
## Frameworks
Les frameworks utilisés pour ce coding dojo sont nombreux. Ci-dessous une liste des principaux :
* Java 11
* RX Java 2.2.4
* JUnit 5
* Lombok 1.18.4
* SLF4J
* Logback
* Etc.
## Enoncé
### A. Level 1
Création de flux de données
La classe **Observable** représente un flux de données. Les méthodes **just** ou **fromIterable** crée un flux à partir d'une collection d'objets. Les flux de données sont des chaînes de caractères dans cet exercice.
Si vous ne souscrivez pas au flux, rien ne se passe. La méthode **subscribe** déclare un **observer** consommant les données passées dans le flux. La souscription déclenche les étapes de traitement du flux.
### B. Level 2
Anatomie d'un flux de données (**stream**)
Un **Stream** est une séquence de données, potentiellement non limitée. Ces données peuvent être connues ou inconnues à sa création. Les flux sont des constructions asynchrones. Lorsque vous observez un flux, vous ne savez pas quand les données vont être émises.
Un **Observer** peut recevoir 3 types d'événements notifié par l'une des 3 méthodes suivantes:
- **onNext**: notifie à chaque fois qu'un élément est passé dans le flux
- **onComplete**: notifie la fin du flux. `onNext ne sera plus appelé
- **onError**: notifie qu'une erreur est arrivée. Le flux n'émettra plus d'éléments, **onComplete** n'est pas appelé.
RxJava met à disposition 3 méthodes à la disposition lors de la souscription.
- notifie les éléments : `stream.subscribe(item -> {});`
- notifie une erreur : `stream.subscribe(item -> {}, error -> {});`
- notifie la fin du flux : `stream.subscribe(item -> {}, error -> {}, () -> {});`
### C. Level 3
Cold Stream vs HotStream.
Nous allons aborder un concept clé dans la programmation. Il s'agit des flux qui peuvent-être froid (**Cold Stream**) ou chaud (**HotStream**).
Un **Cold Stream** recommence au début du flux pour chaque consommateur (**subscriber/observer**). Chaque consommateur obtient la liste complète des éléments.
Contrairement à un **Cold Stream**, un **HotStream** envoie les mêmes éléments à tous les consommateurs. Si un consommateur souscrit après le début de l'émission du flux, il ne receverra pas les éléments précédement émis.
### D. Level 4
Les types de flux
RX Java possède plusieurs types Java afin de spécialiser certains flux :
| Type | Description | Back pressure |
| ------------- | ---------------------------------------------------------------------|:--------------:|
| Single | Un flux émettant une unique valeur | Non |
| Maybe | Un flux émettant 0 ou 1 valeur au plus | Non |
| Completable | Un flux n'émettant pas de valeur | Non |
| Observable | Un flux émettant une séquence de données potentiellement illimitée | Non |
| Flowable | Un flux émettant une séquence de données potentiellement illimitée | Oui |
- **Single** est une version limitée de **Observable**. Nous verrons plus tard que certains opérateurs ne sont pas disponibles avec un **Single**. Les méthodes **doOnNext** et **doOnComplete** sont remplacées par **doOnSuccess**. Dans une application web, **Single** est utilisé lorsqu'un service REST retourne une unique ressource.
- **Single** n'accepte pas la valeur `null`. **Maybe** accepte un flux vide et peut donc avoir une valeur ou aucune.
- **Completable** n'émet aucune valeur, il exécute une action sans retourner de valeur. En conséquence, il ne fournit pas de méthode **doOnNext**. Dans une application web, **Completable** est utilisé lorsqu'un service REST exécute un traitement sans retourner de résultat.
- **Flowable** est une version améliorée de **Observable** supportant la back pressure.
### E. Level 5
Operators
RX Java est riche en opérateurs. Un opérateur permet d'altérer un flux d'événements ainsi obtenir un nouveau flux. Dans cette section, nous allons introduire certains des opérateurs les plus utilisés.
Il existe différents types d'opérateurs classés par catégorie.
- **Création** : `create`, `just`, `range`, `interval`
- **Filtrage** : `filter`, `first`, `last`, `distinct`, `skip`, `take`
- **Transformation** : `map`, `flatMap`, `scan`, `buffer`
- **Combinaison** : `combineLatest`, `merge`, `concat`, `zip`
- **Agregation** : `count`, `max`, `min`, `sum`, `reduce`
### F. Level 6
Schedulers
### G. Level 7
Testing
RX Java fournit un opérateur appelé `test` permettant de retourner une instance `TestObserver`
Cet utilitaire permet de vérifier différents types de résultat attendu
- Que le consommateur `Observer` a bien été souscrit
- Que l'événement de fin du flux a bien été envoyé
- Que le nombre d'éléments est correct
- Que les éléments arrivent bien dans l'ordre
RX Java fournit un deuxième utilitaire très utile pour les tests. Il s'agit de la classe `TestScheduler`, un scheduler conçu pour les tests. Il permet de contrôler l'horloge comme avancer le temps !
\ No newline at end of file
repositories {
jcenter()
mavenCentral()
}
plugins {
id("java")
id("org.springframework.boot") version "2.1.1.RELEASE"
id("idea")
}
apply(plugin = "io.spring.dependency-management")
java {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}
group = "com.ippon.codingdojo"
version = "1.0.0"
dependencies {
implementation("ch.qos.logback:logback-classic")
implementation("org.slf4j:log4j-over-slf4j")
implementation("io.reactivex.rxjava2:rxjava")
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
testImplementation("org.junit.jupiter:junit-jupiter-api")
}
configurations.all {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j", module = "log4j")
}
tasks.withType<Jar> {
enabled = false
}
tasks.withType<Test> {
useJUnitPlatform()
maxHeapSize = "1G"
testLogging.showStandardStreams = true
}
rootProject.name = "reactive-rxjava"
package fr.ippon.codingdojo.reactive;
import fr.ippon.codingdojo.reactive.util.Helpers;
import io.reactivex.Observable;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Level1 {
public static void main(String[] args) throws Exception {
exercise1();
exercise2();
exercise3();
exercise4();
}
/**
* Crée un {@link Observable} de `Flash McQueen`, `Chick Hicks` and `The King` avec la méthode {@link Observable#just}
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise1</li>
* <li>Flash McQueen</li>
* <li>Chick Hicks</li>
* <li>The King</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE:
* la sortie du flux est synchrone !
*/
private static void exercise1() {
log.info("Start Level1/Exercise1");
// TODO Créez un flux de données avec la méthode `just`
}
/**
* Crée un {@link Observable} à partir de la liste `data` avec la méthode {@link Observable#fromIterable(Iterable)}
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise2</li>
* <li>Flash McQueen</li>
* <li>Martin</li>
* <li>Sally Carrera</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE: La sortie du flux est encore synchrone, parce que la liste est consommée de manière synchrone
*/
private static void exercise2() {
var data = List.of("Flash McQueen", "Martin", "Sally Carrera");
log.info("Start Level1/Exercise2");
// TODO Créez un flux de données avec la méthode `fromIterable`
}
/**
* Crée un {@link Observable} à partir d'une tâche {@link Callable}, la tâche émet la valeur après 5 secondes.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise3</li>
* <li>Winner: Flash McQueen</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE: La tâche est exécutée de manière synchrone, le {@link Thread} principal attend le résultat de la tâche.
*/
private static void exercise3() {
log.info("Start Level1/Exercise3");
Callable<String> callable = () -> {
Helpers.sleep(5, TimeUnit.SECONDS);
return "Flash McQueen";
};
// TODO Créez un flux de données avec la méthode `fromCallable`
}
/**
* Crée un {@link Observable} à partir d'un {@link java.util.concurrent.Future}, le résultat est émit par une tâche planifiée après 3 secondes.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise4</li>
* <li>Winner: The king</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE: La tâche est encore exécutée de manière synchrone !
*/
public static void exercise4() {
log.info("Start Level1/Exercise4");
var future = Executors.newSingleThreadScheduledExecutor().schedule(() -> "The king", 3, TimeUnit.SECONDS);
// TODO Créez un flux de données avec la méthode `fromFuture`
}
}
package fr.ippon.codingdojo.reactive;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
public class Level2 {
public static void main(String[] args) {
exercise1();
exercise2();
exercise3();
exercise4();
}
/**
* Affiche un message lorsqu'un élément est émis dans le flux et que le flux est terminé
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise1</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Completed</li>
* </ul>
*/
public static void exercise1() {
var data = List.of("Flash McQueen", "The King", "Martin", "Sally Carrera");
log.info("Start Level2/Exercise1");
Observable.fromIterable(data);
// TODO Affichez les éléments émis `doOnNext`
// TODO Affichez la notification de la fin du flux `doOnComplete`
}
/**
* Utilise les méthodes `doOnNext`, `doOnComplete` et `doOnError` pour afficher les différents événements.
* Une erreur est déclenchée lorsqu'un élément commence par la chaîne `Unknown`.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Error: Unknown Character !</li>
* </ul>
* <p>
* REMARQUE:
* Une fois l'erreur déclenchée, plus aucun élément n'est émis. La méthode `onComplete n'est pas appelée.
*/
public static void exercise2() {
log.info("Start Level2/Exercise2");
var data = List.of("Flash McQueen", "The King", "Unknown Character", "Martin", "Sally Carrera");
Observable.fromIterable(data)
.map(item -> {
if (item.startsWith("Unknown")) {
throw new RuntimeException("Unknown Character !");
}
return item;
});
// TODO Affichez les éléments émis
// TODO Affichez la notification de la fin du flux
// TODO Affichez l'erreur émis pendant le traitement du flux
}
/**
* Affiche les événements directement depuis le `subscriber`. RxJava met 3 méthodes à la disposition lors de la souscription.
*
* <ul>
* <li>notifie les éléments : stream.subscribe(item -> {});</li>
* <li>notifie une erreur : stream.subscribe(item -> {}, error -> {});</li>
* <li>notifie la fin du flux : stream.subscribe(item -> {}, error -> {}, () -> {});</li>
* </ul>
*
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Chick Hicks</li>
* <li>Completed</li>
* </ul>
*/
public static void exercise3() {
log.info("Start Level2/Exercise3");
var data = List.of("Flash McQueen", "The King", "Chick Hicks");
Observable.fromIterable(data);
// TODO Utilisez la méthode `subscribe` pour affichez les différents événements du flux
}
/**
* Utilise la méthodes {@link Observable#create(ObservableOnSubscribe)} pour émettre un flux dynamique avec les valeurs
* `Flash McQueen`, `The King`, `Chick Hicks`, `Martin` et `Sally Carrera`.
*
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Chick Hicks</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Completed</li>
* </ul>
*
*/
public static void exercise4() {
log.info("Start Level2/Exercise4");
// TODO Créez un flux dynamique avec les éléments "Flash McQueen", "The King", "Chick Hicks", "Martin"
// et "Sally Carrera" et notifiez de la fin du flux
Observable<String> observable = null;
}
}
package fr.ippon.codingdojo.reactive;
import fr.ippon.codingdojo.reactive.util.HotStream;
import io.reactivex.Observable;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Level3 {
public static void main(String[] args) {
exercise1();
exercise2();
exercise3();
}
/**
* Souscrit deux consommateurs {@link io.reactivex.Observer} à un `Cold Stream` émettant des valeurs fixes.
* <p><p>
* NOTE: sortie attendue
*
* <ul>
* <li>Start Level3/Exercise1</li>
* <li>[1] Next: Flash McQueen</li>
* <li>[1] Next: The King</li>
* <li>[1] Next: Chick Hicks</li>
* <li>[1] Completed</li>
* <li>[2] Next: Flash McQueen</li>
* <li>[2] Next: The King</li>
* <li>[2] Next: Chick Hicks</li>
* <li>[2] Completed</li>
* </ul>
*
* <p>
* Remarque:
* La souscription des deux consommateurs est synchrone. Chaque consommateur consomme le flux
* de données entierement chacun son tour.
*/
public static void exercise1() {
log.info("Start Level3/Exercise1");
// TODO Créez un `Cold Stream` avec les éléments "Flash McQueen", "The King" et "Chick Hicks"
// TODO Créez une première souscription
// TODO Créez une deuxième souscription
}
/**
* Souscrit deux consommateurs {@link io.reactivex.Observer} à un `Cold Stream` émettant des valeurs dynamiques.
* <p><p>
* NOTE: sortie attendue
*
* <ul>
* <li>Start Level3/Exercise2</li>
* <li>[1] Next: Date1 -> {generated_date}</li>
* <li>[1] Next: Date2 -> {generated_date}</li>
* <li>[1] Next: Date3 -> {generated_date}</li>
* <li>[1] Completed</li>
* <li>[2] Next: Date1 -> {generated_date}</li>
* <li>[2] Next: Date2 -> {generated_date}</li>
* <li>[2] Next: Date3 -> {generated_date}</li>
* <li>[2] Completed</li>
* </ul>
*
* <p>
* Remarque:
* Il s'agit encore d'un `Cold Stream` avec un flux dynamique, parce que chaque consommateur reçoit le flux de
* données depuis le début. Le flux n'est pas partagé entre les consommateurs.
*
*/
public static void exercise2() {
log.info("Start Level3/Exercise2");
// TODO Créez un `Cold Stream` avec la date/heure du jour
// TODO Créez une première souscription
// TODO Créez une deuxième souscription
}
/**
* Souscrit deux consommateurs {@link io.reactivex.Observer} à un `HotStream` {@link HotStream#create()}
* émettant une valeur toutes les secondes.
*
* <ul>
* <li>Le premier consommateur souscrit immédiatement déclanchant le flux de données et stoppe la souscription après 5 secondes</li>
* <li>Le deuxième consommateur souscrit après 3 secondes et stoppe la soucription après 4 secondes</li>
* </ul>
*
* NOTE: sortie attendue
*
* <ul>
* <li>Start Level3/Exercise1</li>
* <li>[1] Next: 1</li>
* <li>[1] Next: 2</li>
* <li>[1] Next: 3</li>
* <li>[2] Next: 3</li>
* <li>[1] Next: 4</li>
* <li>[2] Next: 4</li>
* <li>[2] Next: 5</li>
* <li>[2] Next: 6</li>
* </ul>
*
* Remarque:
* Le flux de données est partagé par les deux consommateurs. La méhode {@link Observable#publish()} partage le flux de données
* à tous les consommateurs. Les données ne sont pas consommées sur le {@link Thread} principal.
*/
public static void exercise3() {
Observable<Long> observable = HotStream.create();
log.info("Start Level3/Exercise3");
// TODO Créez une première souscription immédiatement
// TODO Créez une deuxième souscription après trois secondes
// TODO Annulez la première souscription après cinq secondes
// TODO Annulez la deuxième souscription après sept secondes
}
}
package fr.ippon.codingdojo.reactive;
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Level4 {
public static void main(String[] args) {
exercise1();
exercise2();
exercise3();
exercise4();
}
/**
* Crée un {@link Single} qui émet une unique valeur. Un {@link Single} n'accepte pas la valeur `null`, le flux ne peut pas être vide.
*
* <p><p>
* Note : sortie attendue
* <li>
* <ul>Start Level4/Exercise1</ul>
* <ul>Value: Flash McQueen</ul>
* </li>
* <p>
* Remarque : Les méthodes {@link io.reactivex.Observable#doOnNext} et {@link io.reactivex.Observable#doOnComplete}
* sont remplacées par la méthode {@link Single#doOnSuccess}.
*/
public static void exercise1() {
log.info("Start Level4/exercise1");
// TODO Créez un flux avec un unique élément et affichez l'événment onSuccess
}
/**
* Crée un {@link Maybe} qui émet une valeur. Un {@link Maybe} accepte la valeur `null``
*
* <p><p>
* Note : sortie attendue
* <li>
* <ul>Start Level4/Exercise2</ul>
* <ul>Value: Flash McQueen</ul>
* </li>
* <p>
* Remarque : La méthode {@link Maybe#doOnSuccess} notifie le consommateur de la valeur émise.
*/
public static void exercise2() {
log.info("Start Level4/Exercise2");
// TODO Créez un flux avec un seul élément et affichez les événements onSuccess/onComplete
}
/**
* Crée un {@link Maybe} vide.
*
* <p><p>
* Note : sortie attendue
* <li>
* <ul>Start Level4/Exercise3</ul>
* <ul>Completed</ul>
* </li>
* <p>
* Remarque : La méthode {@link Maybe#doOnComplete} notifie le consommateur de la fin du flux vide. Cette méthode
* est appelée seulement quand un {@link Maybe} est vide.
*/
public static void exercise3() {
log.info("Start Level4/Exercise3");
// TODO Créez un flux vide et affichez les événements onSuccess/onComplete
}