Commit abe380b2 authored by Julien SADAOUI's avatar Julien SADAOUI
Browse files

fix: switch Level1/Level2

parent 15b414c0
......@@ -6,7 +6,7 @@ RX Java 2 est une librairie implémentant le paradigme de la programmation réac
## Frameworks
Les frameworks utilisés pour ce coding dojo sont nombreux. Ci-dessous une liste des principaux :
Ci-dessous la liste des Frameworks utilisés :
* Java 11
* RX Java 2.2.4
......@@ -18,36 +18,44 @@ Les frameworks utilisés pour ce coding dojo sont nombreux. Ci-dessous une liste
## Enoncé
### A. Level 1
Création de flux de données
La classe **Observable** représente un flux de données. Les méthodes **just** ou **fromIterable** crée un flux à partir d'une collection d'objets. Les flux de données sont des chaînes de caractères dans cet exercice.
Si vous ne souscrivez pas au flux, rien ne se passe. La méthode **subscribe** déclare un **observer** consommant les données passées dans le flux. La souscription déclenche les étapes de traitement du flux.
### B. Level 2
### B. Level 1
Anatomie d'un flux de données (**stream**)
Un **Stream** est une séquence de données, potentiellement non limitée. Ces données peuvent être connues ou inconnues à sa création. Les flux sont des constructions asynchrones. Lorsque vous observez un flux, vous ne savez pas quand les données vont être émises.
Un **Stream**, représenté par la classe **Observable**, est une séquence de données, potentiellement non limitée. Ces données peuvent être connues ou inconnues à sa création. Les flux sont des constructions asynchrones. Lorsque vous observez un flux, vous ne savez pas quand les données vont être émises.
Si vous ne souscrivez pas au flux, rien ne se passe. La méthode **subscribe** déclare un **Observer** consommant les données passées dans le flux. La souscription déclenche les étapes de traitement du flux.
Un **Observer** peut recevoir 3 types d'événements notifié par l'une des 3 méthodes suivantes:
- **onNext**: notifie à chaque fois qu'un élément est passé dans le flux
- **onComplete**: notifie la fin du flux. `onNext ne sera plus appelé
- **onError**: notifie qu'une erreur est arrivée. Le flux n'émettra plus d'éléments, **onComplete** n'est pas appelé.
- **onComplete**: notifie la fin du flux. **onNext** ne sera plus appelée
- **onError**: notifie qu'une erreur est arrivée. Le flux n'émettra plus d'éléments, **onComplete** n'est pas appelée.
RxJava met à disposition 3 méthodes à la disposition lors de la souscription.
- notifie les éléments : `stream.subscribe(item -> {});`
- notifie une erreur : `stream.subscribe(item -> {}, error -> {});`
- notifie la fin du flux : `stream.subscribe(item -> {}, error -> {}, () -> {});`
### A. Level 2
Création de flux de données
Les méthodes **just** ou **fromIterable** de la classe **Observable** crée un flux à partir d'une collection d'objets. Nous pouvons également créer des flux à partir d'un **Callable** avec la méthode **fromCallable**, d'un **Future** avec la méthode **fromFuture** ou d'un intervalle de valeurs avec la méthodes **range**.
La méthode **Observable.create** offre la possibilité de créer un flux dynamique.
`Observable.create(subscriber -> {
subscriber.onNext("a");
subscriber.onNext("b");
subscriber.onNext("c");
subscriber.onComplete();
})
`
### C. Level 3
Cold Stream vs HotStream.
Nous allons aborder un concept clé dans la programmation. Il s'agit des flux qui peuvent-être froid (**Cold Stream**) ou chaud (**HotStream**).
Nous allons aborder un concept clé dans la programmation réactive, les **Cold Stream** ou **HotStream**.
Un **Cold Stream** recommence au début du flux pour chaque consommateur (**subscriber/observer**). Chaque consommateur obtient la liste complète des éléments.
......
package fr.ippon.codingdojo.reactive;
import fr.ippon.codingdojo.reactive.util.Helpers;
import io.reactivex.Observable;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Level1 {
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
exercise1();
exercise2();
exercise3();
exercise4();
}
/**
* Crée un {@link Observable} de `Flash McQueen`, `Chick Hicks` and `The King` avec la méthode {@link Observable#just}
* Affiche un message lorsqu'un élément est émis dans le flux et que le flux est terminé
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise1</li>
* <li>Flash McQueen</li>
* <li>Chick Hicks</li>
* <li>The King</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE:
* la sortie du flux est synchrone !
*/
private static void exercise1() {
public static void exercise1() {
log.info("Start Level1/Exercise1");
Observable.just("Flash McQueen", "Chick Hicks", "The King")
.subscribe(
log::info,
(error) -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
var data = List.of("Flash McQueen", "The King", "Martin", "Sally Carrera");
Observable.fromIterable(data)
.doOnNext(item -> log.info("Next: " + item))
.doOnComplete(() -> log.info("Completed"))
.subscribe();
}
/**
* Crée un {@link Observable} à partir de la liste `data` avec la méthode {@link Observable#fromIterable(Iterable)}
* Utilise les méthodes `doOnNext`, `doOnComplete` et `doOnError` pour afficher les différents événements.
* Une erreur est déclenchée lorsqu'un élément commence par la chaîne `Unknown`.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise2</li>
* <li>Flash McQueen</li>
* <li>Martin</li>
* <li>Sally Carrera</li>
* <li>Completed</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Error: Unknown Character !</li>
* </ul>
* <p>
* REMARQUE: La sortie du flux est encore synchrone, parce que la liste est consommée de manière synchrone
* L'exception est affichée dans le console parce que le `Subscriber` ne gère pas les événements. Il souscrit
* sans prendre en compte les éléments émis.
* <p><p>
* REMARQUE:
* Une fois l'erreur déclenchée, plus aucun élément n'est émis. La méthode `onComplete n'est pas appelée.
*/
private static void exercise2() {
var data = List.of("Flash McQueen", "Martin", "Sally Carrera");
public static void exercise2() {
log.info("Start Level1/Exercise2");
var data = List.of("Flash McQueen", "The King", "Unknown Character", "Martin", "Sally Carrera");
Observable.fromIterable(data)
.subscribe(
log::info,
(error) -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
.map(item -> {
if (item.startsWith("Unknown")) {
throw new RuntimeException("Unknown Character !");
}
return item;
})
.doOnNext(s -> log.info("Next: " + s))
.doOnComplete(() -> log.info("Never called !"))
.doOnError(err -> log.error("Error: " + err.getMessage()))
.subscribe();
}
/**
* Crée un {@link Observable} à partir d'une tâche {@link Callable}, la tâche émet la valeur après 5 secondes.
* <p><p>
* NOTE: sortie attendue
* Affiche les événements directement depuis le `Subscriber`. RxJava met 3 méthodes à la disposition lors de la souscription.
*
* <ul>
* <li>Start Level1/Exercise3</li>
* <li>Winner: Flash McQueen</li>
* <li>Completed</li>
* <li>notifie les éléments : stream.subscribe(item -> {});</li>
* <li>notifie une erreur : stream.subscribe(item -> {}, error -> {});</li>
* <li>notifie la fin du flux : stream.subscribe(item -> {}, error -> {}, () -> {});</li>
* </ul>
* <p>
* REMARQUE: La tâche est exécutée de manière synchrone, le {@link Thread} principal attend le résultat de la tâche.
*/
private static void exercise3() {
log.info("Start Level1/Exercise3");
Callable<String> callable = () -> {
Helpers.sleep(5, TimeUnit.SECONDS);
return "Flash McQueen";
};
Observable.fromCallable(callable)
.subscribe(
winner -> log.info("Winner: {}", winner),
(error) -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
}
/**
* Crée un {@link Observable} à partir d'un {@link java.util.concurrent.Future}, le résultat est émit par une tâche planifiée après 3 secondes.
*
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise4</li>
* <li>Winner: The king</li>
* <li>Start Level1/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Chick Hicks</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE: La tâche est encore exécutée de manière synchrone !
*/
public static void exercise4() {
log.info("Start Level1/Exercise4");
var future = Executors.newSingleThreadScheduledExecutor().schedule(() -> "The king", 3, TimeUnit.SECONDS);
public static void exercise3() {
log.info("Start Level1/Exercise3");
Observable.fromFuture(future)
var data = List.of("Flash McQueen", "The King", "Chick Hicks");
Observable.fromIterable(data)
.subscribe(
winner -> log.info("Winner: {}", winner),
(error) -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
item -> log.info("Next: " + item),
error -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed")
);
}
}
package fr.ippon.codingdojo.reactive;
import fr.ippon.codingdojo.reactive.util.Helpers;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Level2 {
......@@ -15,131 +20,153 @@ public class Level2 {
exercise2();
exercise3();
exercise4();
exercise5();
}
/**
* Affiche un message lorsqu'un élément est émis dans le flux et que le flux est terminé
* Crée un {@link Observable} de `Flash McQueen`, `Chick Hicks` and `The King` avec la méthode {@link Observable#just}
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise1</li>
* <li>Next: Flash McQueen</li>
* <li>Next: Chick Hicks</li>
* <li>Next: The King</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE:
* la sortie du flux est synchrone !
*/
public static void exercise1() {
var data = List.of("Flash McQueen", "The King", "Martin", "Sally Carrera");
log.info("Start Level2/Exercise1");
Observable.fromIterable(data)
.doOnNext(item -> log.info("Next: " + item))
.doOnComplete(() -> log.info("Completed"))
.subscribe();
Observable.just("Flash McQueen", "Chick Hicks", "The King")
.subscribe(
item -> log.info("Next: {}", item),
error -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
}
/**
* Utilise les méthodes `doOnNext`, `doOnComplete` et `doOnError` pour afficher les différents événements.
* Une erreur est déclenchée lorsqu'un élément commence par la chaîne `Unknown`.
* Crée un {@link Observable} à partir de la liste `data` avec la méthode {@link Observable#fromIterable(Iterable)}
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Error: Unknown Character !</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE:
* Une fois l'erreur déclenchée, plus aucun élément n'est émis. La méthode `onComplete n'est pas appelée.
* REMARQUE: La sortie du flux est encore synchrone, parce que la liste est consommée de manière synchrone
*/
public static void exercise2() {
log.info("Start Level2/Exercise2");
var data = List.of("Flash McQueen", "Martin", "Sally Carrera");
var data = List.of("Flash McQueen", "The King", "Unknown Character", "Martin", "Sally Carrera");
log.info("Start Level2/Exercise2");
Observable.fromIterable(data)
.map(item -> {
if (item.startsWith("Unknown")) {
throw new RuntimeException("Unknown Character !");
}
return item;
})
.doOnNext(s -> log.info("Next: " + s))
.doOnComplete(() -> log.info("Never called !"))
.doOnError(err -> log.error("Error: " + err.getMessage()))
.subscribe();
.subscribe(
item -> log.info("Next: {}", item),
error -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
}
/**
* Affiche les événements directement depuis le `subscriber`. RxJava met 3 méthodes à la disposition lors de la souscription.
*
* <ul>
* <li>notifie les éléments : stream.subscribe(item -> {});</li>
* <li>notifie une erreur : stream.subscribe(item -> {}, error -> {});</li>
* <li>notifie la fin du flux : stream.subscribe(item -> {}, error -> {}, () -> {});</li>
* </ul>
*
* Crée un {@link Observable} à partir d'une tâche {@link Callable}, la tâche émet la valeur après 5 secondes.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Start Level2/Exercise3</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Chick Hicks</li>
* <li>Completed</li>
* </ul>
* <p>
* REMARQUE: La tâche est exécutée de manière synchrone, le {@link Thread} principal attend le résultat de la tâche.
*/
public static void exercise3() {
log.info("Start Level2/Exercise3");
var data = List.of("Flash McQueen", "The King", "Chick Hicks");
Observable.fromIterable(data)
Callable<String> callable = () -> {
Helpers.sleep(5, TimeUnit.SECONDS);
return "Flash McQueen";
};
Observable.fromCallable(callable)
.subscribe(
item -> log.info("Next: " + item),
item -> log.info("Next: {}", item),
error -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed")
);
() -> log.info("Completed"));
}
/**
* Utilise la méthodes {@link Observable#create(ObservableOnSubscribe)} pour émettre un flux dynamique avec les valeurs
* `Flash McQueen`, `The King`, `Chick Hicks`, `Martin` et `Sally Carrera`.
*
* Crée un {@link Observable} à partir d'un {@link java.util.concurrent.Future}, le résultat est émit par une tâche planifiée après 3 secondes.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Chick Hicks</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Start Level2/Exercise4</li>
* <li>Next: The king</li>
* <li>Completed</li>
* </ul>
*
* <p>
* REMARQUE: La tâche est encore exécutée de manière synchrone !
*/
public static void exercise4() {
log.info("Start Level2/Exercise4");
Observable<String> observable = Observable.create(subscriber -> {
subscriber.onNext("Flash McQueen");
subscriber.onNext("The King");
subscriber.onNext("Chick Hicks");
subscriber.onNext("Martin");
subscriber.onNext("Sally Carrera");
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
var future = executorService.schedule(() -> "The king", 3, TimeUnit.SECONDS);
Observable.fromFuture(future)
.subscribe(
item -> log.info("Next: {}", item),
error -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
executorService.shutdown();
}
subscriber.onComplete();
/**
* Utilise la méthode {@link Observable#create} pour émettre des valeurs saisies par l'utilisateur.
* <ul>
* <li>Une entrée avec la valeur "complete" déclenche la fin du flux</li>
* <li>Une entrée contenant la valeur "error" déclenche une erreur</li>
* </ul>
*
* <p><p>
* NOTE: sortie attendue
* <p>
* En fonction de la saisie de l'utilisateur !
*
*/
public static void exercise5() {
log.info("Start Level2/Exercise5");
Scanner scan = new Scanner(System.in);
Observable<String> observable = Observable.create(subscriber -> {
while (true) {
String input = scan.next();
if (input.equals("complete")) {
subscriber.onComplete();
break;
} else if (input.contains("error")) {
subscriber.onError(new Exception(input));
break;
} else {
subscriber.onNext(input);
}
}
});
observable.subscribe(
item -> log.info("Next: {}", item),
error -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed")
);
() -> log.info("Completed"));
scan.close();
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment