Commit 349e093b authored by Julien SADAOUI's avatar Julien SADAOUI
Browse files

feat: switch Level1/Level2

parent a4f7ff95
......@@ -6,52 +6,58 @@ RX Java 2 est une librairie implémentant le paradigme de la programmation réac
## Frameworks
Les frameworks utilisés pour ce coding dojo sont nombreux. Ci-dessous une liste des principaux :
Ci-dessous la liste des Frameworks utilisés :
* Java 11
* RX Java 2.2.4
* JUnit 5
* Lombok 1.18.4
* SLF4J
* Logback
* Etc.
## Enoncé
### A. Level 1
Création de flux de données
La classe **Observable** représente un flux de données. Les méthodes **just** ou **fromIterable** crée un flux à partir d'une collection d'objets. Les flux de données sont des chaînes de caractères dans cet exercice.
Si vous ne souscrivez pas au flux, rien ne se passe. La méthode **subscribe** déclare un **observer** consommant les données passées dans le flux. La souscription déclenche les étapes de traitement du flux.
### B. Level 2
### B. Level 1
Anatomie d'un flux de données (**stream**)
Un **Stream** est une séquence de données, potentiellement non limitée. Ces données peuvent être connues ou inconnues à sa création. Les flux sont des constructions asynchrones. Lorsque vous observez un flux, vous ne savez pas quand les données vont être émises.
Un **Stream**, représenté par la classe **Observable**, est une séquence de données, potentiellement non limitée. Ces données peuvent être connues ou inconnues à sa création. Les flux sont des constructions asynchrones. Lorsque vous observez un flux, vous ne savez pas quand les données vont être émises.
Si vous ne souscrivez pas au flux, rien ne se passe. La méthode **subscribe** déclare un **Observer** consommant les données passées dans le flux. La souscription déclenche les étapes de traitement du flux.
Un **Observer** peut recevoir 3 types d'événements notifié par l'une des 3 méthodes suivantes:
- **onNext**: notifie à chaque fois qu'un élément est passé dans le flux
- **onComplete**: notifie la fin du flux. `onNext ne sera plus appelé
- **onError**: notifie qu'une erreur est arrivée. Le flux n'émettra plus d'éléments, **onComplete** n'est pas appelé.
- **onComplete**: notifie la fin du flux. **onNext** ne sera plus appelée
- **onError**: notifie qu'une erreur est arrivée. Le flux n'émettra plus d'éléments, **onComplete** n'est pas appelée.
RxJava met à disposition 3 méthodes à la disposition lors de la souscription.
- notifie les éléments : `stream.subscribe(item -> {});`
- notifie une erreur : `stream.subscribe(item -> {}, error -> {});`
- notifie la fin du flux : `stream.subscribe(item -> {}, error -> {}, () -> {});`
### A. Level 2
Création de flux de données
Les méthodes **just** ou **fromIterable** de la classe **Observable** crée un flux à partir d'une collection d'objets. Nous pouvons également créer des flux à partir d'un **Callable** avec la méthode **fromCallable**, d'un **Future** avec la méthode **fromFuture** ou d'un intervalle de valeurs avec la méthodes **range**.
La méthode **Observable.create** offre la possibilité de créer un flux dynamique.
`Observable.create(subscriber -> {
subscriber.onNext("a");
subscriber.onNext("b");
subscriber.onNext("c");
subscriber.onComplete();
})
`
### C. Level 3
Cold Stream vs HotStream.
Nous allons aborder un concept clé dans la programmation. Il s'agit des flux qui peuvent-être froid (**Cold Stream**) ou chaud (**HotStream**).
Nous allons aborder un concept clé dans la programmation réactive, les **Cold Stream** ou **HotStream**.
Un **Cold Stream** recommence au début du flux pour chaque consommateur (**subscriber/observer**). Chaque consommateur obtient la liste complète des éléments.
Contrairement à un **Cold Stream**, un **HotStream** envoie les mêmes éléments à tous les consommateurs. Si un consommateur souscrit après le début de l'émission du flux, il ne recevera pas les éléments précédemment émis.
Contrairement à un **Cold Stream**, un **HotStream** envoie les mêmes éléments à tous les consommateurs. Si un consommateur souscrit après le début de l'émission du flux, il ne receverra pas les éléments précédement émis.
### D. Level 4
......@@ -68,21 +74,21 @@ RX Java possède plusieurs types Java afin de spécialiser certains flux :
| Flowable | Un flux émettant une séquence de données potentiellement illimitée | Oui |
- **Single** est une version limitée de **Observable**. Nous verrons plus tard que certains opérateurs ne sont pas disponibles avec un **Single**. Les méthodes **doOnNext** et **doOnComplete** sont remplacées par **doOnSuccess**. Dans une application web, **Single** est utilisé lorsqu'un service REST retourne une unique ressource.
- **Single** n'accepte pas la valeur `null`. **Maybe** accepte un flux vide et peut donc avoir une valeur ou aucune.
- **Completable** n'émet aucune valeur, il exécute une action sans retourner de valeur. En conséquence, il ne fournit pas de méthode **doOnNext**. Dans une application web, **Completable** est utilisé lorsqu'un service REST exécute un traitement sans retourner de résultat.
- **Flowable** est une version améliorée de **Observable** supportant la back pressure.
### E. Level 5
Operators
RX Java est riche en opérateurs. Un opérateur permet d'altérer un flux d'événements ainsi obtenir un nouveau flux. Dans cette section, nous allons introduire certains des opérateurs les plus utilisés.
Il existe différents types d'opérateurs classés par catégorie.
- **Création** : `create`, `just`, `range`, `interval`
- **Filtrage** : `filter`, `first`, `last`, `distinct`, `skip`, `take`
- **Transformation** : `map`, `flatMap`, `scan`, `buffer`
......@@ -93,13 +99,13 @@ Il existe différents types d'opérateurs classés par catégorie.
Schedulers
Par défaut, les flux fonctionne sur le `thread` courant, c'est-à-dire le `thread` qui a souscrit le flux. Il s'agit du `thread` appelé `main` dans les exercices précédents. La majorité de nos exemples était des traitements synchrones, excepté certains exercices. Lorsqu'une opération a de la latence, l'émission synchrone impose cette latence au `thread` qui a souscrit.
Par défault, les flux fonctionne sur le `thread` courant, c'est-à-dire le `thread` qui a souscrit le flux. Il s'agit du `thread` appelé `main` dans les exercices précédents. La majorité de nos exemples était des traitements synchrones, excepté certains exercices. Lorsqu'une opération a de la latence, l'émission synchrone impose cette latence au `thread` qui a souscrit.
Nous allons étudier les émissions de flux asynchrone, une émission asynchrone arrive quand un producteur émet un flux dans un `thread` différent du `thread` qui a souscrit.
Les **Scheduler** de RX Java peuvent changer le comportement des `thread` d'émission et de souscription. Par exemple, nous pouvons choisir le `thread` qui émet le flux de données. Un **Scheduler** est très similaire à un **Executor** du JDK.
RX Java gère la concurrence à notre place avec deux méthodes : `subscribeOn` pour changer le `thread` de l'émetteur et `observerOn` pour changer le `thread` du consommateur.
RX Java gére la concurrence à notre place avec deux méthodes : `subscribeOn` pour changer le `thread` de l'émetteur et `observerOn` pour changer le `thread` du consommateur.
### G. Level 7
......@@ -113,5 +119,5 @@ Cet utilitaire permet de vérifier différents types de résultat attendu
- Que l'événement de fin du flux a bien été envoyé
- Que le nombre d'éléments est correct
- Que les éléments arrivent bien dans l'ordre
RX Java fournit un deuxième utilitaire très utile pour les tests. Il s'agit de la classe **TestScheduler**, un scheduler conçu pour les tests. Il permet de contrôler l'horloge comme avancer le temps !
\ No newline at end of file
package fr.ippon.codingdojo.reactive;
import fr.ippon.codingdojo.reactive.util.Helpers;
import io.reactivex.Observable;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Level1 {
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
exercise1();
exercise2();
exercise3();
exercise4();
}
/**
* Crée un {@link Observable} de `Flash McQueen`, `Chick Hicks` and `The King` avec la méthode {@link Observable#just}
* Affiche un message lorsqu'un élément est émis dans le flux et que le flux est terminé
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise1</li>
* <li>Flash McQueen</li>
* <li>Chick Hicks</li>
* <li>The King</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE:
* la sortie du flux est synchrone !
*/
private static void exercise1() {
public static void exercise1() {
log.info("Start Level1/Exercise1");
// TODO Créez un flux de données avec la méthode `just`
var data = List.of("Flash McQueen", "The King", "Martin", "Sally Carrera");
Observable.fromIterable(data);
// TODO Affichez les éléments émis `doOnNext`
// TODO Affichez la notification de la fin du flux `doOnComplete`
}
/**
* Crée un {@link Observable} à partir de la liste `data` avec la méthode {@link Observable#fromIterable(Iterable)}
* Utilise les méthodes `doOnNext`, `doOnComplete` et `doOnError` pour afficher les différents événements.
* Une erreur est déclenchée lorsqu'un élément commence par la chaîne `Unknown`.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise2</li>
* <li>Flash McQueen</li>
* <li>Martin</li>
* <li>Sally Carrera</li>
* <li>Completed</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Error: Unknown Character !</li>
* </ul>
* <p>
* REMARQUE: La sortie du flux est encore synchrone, parce que la liste est consommée de manière synchrone
* L'exception est affichée dans le console parce que le `Subscriber` ne gère pas les événements. Il souscrit
* sans prendre en compte les éléments émis.
* <p><p>
* REMARQUE:
* Une fois l'erreur déclenchée, plus aucun élément n'est émis. La méthode `onComplete n'est pas appelée.
*/
private static void exercise2() {
var data = List.of("Flash McQueen", "Martin", "Sally Carrera");
public static void exercise2() {
log.info("Start Level1/Exercise2");
// TODO Créez un flux de données avec la méthode `fromIterable`
var data = List.of("Flash McQueen", "The King", "Unknown Character", "Martin", "Sally Carrera");
Observable.fromIterable(data)
.map(item -> {
if (item.startsWith("Unknown")) {
throw new RuntimeException("Unknown Character !");
}
return item;
});
// TODO Affichez les éléments émis
// TODO Affichez la notification de la fin du flux
// TODO Affichez l'erreur émis pendant le traitement du flux
}
/**
* Crée un {@link Observable} à partir d'une tâche {@link Callable}, la tâche émet la valeur après 5 secondes.
* <p><p>
* NOTE: sortie attendue
* Affiche les événements directement depuis le `Subscriber`. RxJava met 3 méthodes à la disposition lors de la souscription.
*
* <ul>
* <li>Start Level1/Exercise3</li>
* <li>Winner: Flash McQueen</li>
* <li>Completed</li>
* <li>notifie les éléments : stream.subscribe(item -> {});</li>
* <li>notifie une erreur : stream.subscribe(item -> {}, error -> {});</li>
* <li>notifie la fin du flux : stream.subscribe(item -> {}, error -> {}, () -> {});</li>
* </ul>
* <p>
* REMARQUE: La tâche est exécutée de manière synchrone, le {@link Thread} principal attend le résultat de la tâche.
*/
private static void exercise3() {
log.info("Start Level1/Exercise3");
Callable<String> callable = () -> {
Helpers.sleep(5, TimeUnit.SECONDS);
return "Flash McQueen";
};
// TODO Créez un flux de données avec la méthode `fromCallable`
}
/**
* Crée un {@link Observable} à partir d'un {@link java.util.concurrent.Future}, le résultat est émit par une tâche planifiée après 3 secondes.
*
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise4</li>
* <li>Winner: The king</li>
* <li>Start Level1/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Chick Hicks</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE: La tâche est encore exécutée de manière synchrone !
*/
public static void exercise4() {
log.info("Start Level1/Exercise4");
public static void exercise3() {
log.info("Start Level1/Exercise3");
var future = Executors.newSingleThreadScheduledExecutor().schedule(() -> "The king", 3, TimeUnit.SECONDS);
var data = List.of("Flash McQueen", "The King", "Chick Hicks");
// TODO Créez un flux de données avec la méthode `fromFuture`
Observable.fromIterable(data);
// TODO Utilisez la méthode `subscribe` pour afficher les différents événements du flux
}
}
package fr.ippon.codingdojo.reactive;
import fr.ippon.codingdojo.reactive.util.Helpers;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Level2 {
......@@ -15,118 +20,125 @@ public class Level2 {
exercise2();
exercise3();
exercise4();
exercise5();
}
/**
* Affiche un message lorsqu'un élément est émis dans le flux et que le flux est terminé
* Crée un {@link Observable} de `Flash McQueen`, `Chick Hicks` and `The King` avec la méthode {@link Observable#just}
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise1</li>
* <li>Next: Flash McQueen</li>
* <li>Next: Chick Hicks</li>
* <li>Next: The King</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE:
* la sortie du flux est synchrone !
*/
public static void exercise1() {
var data = List.of("Flash McQueen", "The King", "Martin", "Sally Carrera");
log.info("Start Level2/Exercise1");
Observable.fromIterable(data);
// TODO Affichez les éléments émis `doOnNext`
// TODO Affichez la notification de la fin du flux `doOnComplete`
// TODO Créez un flux avec la méthode `just`
}
/**
* Utilise les méthodes `doOnNext`, `doOnComplete` et `doOnError` pour afficher les différents événements.
* Une erreur est déclenchée lorsqu'un élément commence par la chaîne `Unknown`.
* Crée un {@link Observable} à partir de la liste `data` avec la méthode {@link Observable#fromIterable(Iterable)}
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Error: Unknown Character !</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE:
* Une fois l'erreur déclenchée, plus aucun élément n'est émis. La méthode `onComplete n'est pas appelée.
* REMARQUE: La sortie du flux est encore synchrone, parce que la liste est consommée de manière synchrone
*/
public static void exercise2() {
log.info("Start Level2/Exercise2");
var data = List.of("Flash McQueen", "Martin", "Sally Carrera");
var data = List.of("Flash McQueen", "The King", "Unknown Character", "Martin", "Sally Carrera");
Observable.fromIterable(data)
.map(item -> {
if (item.startsWith("Unknown")) {
throw new RuntimeException("Unknown Character !");
}
return item;
});
log.info("Start Level2/Exercise2");
// TODO Affichez les éléments émis
// TODO Affichez la notification de la fin du flux
// TODO Affichez l'erreur émis pendant le traitement du flux
// TODO Créez un flux avec la méthode `fromIterable`
}
/**
* Affiche les événements directement depuis le `subscriber`. RxJava met 3 méthodes à la disposition lors de la souscription.
*
* <ul>
* <li>notifie les éléments : stream.subscribe(item -> {});</li>
* <li>notifie une erreur : stream.subscribe(item -> {}, error -> {});</li>
* <li>notifie la fin du flux : stream.subscribe(item -> {}, error -> {}, () -> {});</li>
* </ul>
*
* Crée un {@link Observable} à partir d'une tâche {@link Callable}, la tâche émet la valeur après 5 secondes.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Start Level2/Exercise3</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Chick Hicks</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE: La tâche est exécutée de manière synchrone, le {@link Thread} principal attend le résultat de la tâche.
*/
public static void exercise3() {
log.info("Start Level2/Exercise3");
var data = List.of("Flash McQueen", "The King", "Chick Hicks");
Observable.fromIterable(data);
Callable<String> callable = () -> {
Helpers.sleep(5, TimeUnit.SECONDS);
return "Flash McQueen";
};
// TODO Créez un flux avec la méthode `fromCallable`
// TODO Utilisez la méthode `subscribe` pour affichez les différents événements du flux
}
/**
* Utilise la méthodes {@link Observable#create(ObservableOnSubscribe)} pour émettre un flux dynamique avec les valeurs
* `Flash McQueen`, `The King`, `Chick Hicks`, `Martin` et `Sally Carrera`.
*
* Crée un {@link Observable} à partir d'un {@link java.util.concurrent.Future}, le résultat est émit par une tâche planifiée après 3 secondes.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Chick Hicks</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Start Level2/Exercise4</li>
* <li>Next: The king</li>
* <li>Completed</li>
* </ul>
*
* <p>
* REMARQUE: La tâche est encore exécutée de manière synchrone !
*/
public static void exercise4() {
log.info("Start Level2/Exercise4");
// TODO Créez un flux dynamique avec les éléments "Flash McQueen", "The King", "Chick Hicks", "Martin"
// et "Sally Carrera" et notifiez de la fin du flux
Observable<String> observable = null;
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
var future = executorService.schedule(() -> "The king", 3, TimeUnit.SECONDS);
// TODO Créez un flux avec la méthode `fromFuture`
executorService.shutdown();
}
/**
* Utilise la méthode {@link Observable#create} pour émettre des valeurs saisies par l'utilisateur.
* <ul>
* <li>Une entrée avec la valeur "complete" déclenche la fin du flux</li>
* <li>Une entrée contenant la valeur "error" déclenche une erreur</li>
* </ul>
*
* <p><p>
* NOTE: sortie attendue
* <p>
* En fonction de la saisie de l'utilisateur !
*
*/
public static void exercise5() {
log.info("Start Level2/Exercise5");
Scanner scan = new Scanner(System.in);
// TODO Créez un flux avec la méthode `create`, chaque saisie de l'utilisateur est émis dans le flux.
// La saisie de la valeur "complete" déclenche la fin du flux
// La saisie contenant la valeur "error" déclenche une erreur
scan.close();
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment