Commit 28662cfb authored by Julien SADAOUI's avatar Julien SADAOUI
Browse files

fix: javadocs

parent 3357f896
package fr.ippon.codingdojo.reactive;
import fr.ippon.codingdojo.reactive.util.Helpers;
import fr.ippon.codingdojo.reactive.util.Log;
import io.reactivex.Observable;
import lombok.extern.slf4j.Slf4j;
......@@ -13,116 +12,128 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class Level1 {
public static void main(String[] args) throws Exception {
exercise1();
exercise2();
exercice3();
exercise4();
}
/**
* Crée un `Observable` de `Flash McQueen`, `Chick Hicks` and `The King` avec la méthode `Observable.just`
*
* NOTE: sortie attendue
*
* Start Level1/Exercise1
* Flash McQueen
* Chick Hicks
* The King
* Completed
* Stop
*
* REMARQUE:
* la sortie du flux est synchrone !
*/
private static void exercise1() {
log.info("Start Level1/Exercise1");
Observable.just("Flash McQueen", "Chick Hicks", "The King")
.subscribe(log::info, Log::error, Log::completed);
log.info("Stop");
}
/**
* Crée un `Observable` à partir de la liste `data`
*
* NOTE: sortie attendue
*
* Start Level1/Exercise2
* Flash McQueen
* Martin
* Sally Carrera
* Completed
* Stop
*
* REMARQUE:
* La sortie du flux est encore synchrone, parce que la liste est consommée de manière synchrone
*/
private static void exercise2() {
var data = List.of("Flash McQueen", "Martin", "Sally Carrera");
log.info("Start Level1/Exercise2");
Observable.fromIterable(data)
.subscribe(log::info, Log::error, Log::completed);
log.info("Stop");
}
/**
* Crée un flux à partir d'une tâche `Callable`, la tâche émet la valeur après 5 secondes.
*
* NOTE: sortie attendue
*
* Start Level1/Exercise3
* Winner: Flash McQueen
* Completed
* Stop
*
* REMARQUE:
* La tâche est exécutée de manière synchrone, le `Thread` principal attend le résultat de la tâche.
*/
private static void exercice3() {
Callable<String> callable = () -> {
Helpers.sleep(5, TimeUnit.SECONDS);
return "Flash McQueen";
};
log.info("Start Level1/Exercise3");
Observable.fromCallable(callable)
.subscribe(winner -> log.info("Winner: {}", winner), Log::error, Log::completed);
log.info("Stop");
}
/**
* Crée un `Observable` à partir d'un objet `Future`, le résultat est émit par une tâche planifiée après 3 secondes.
*
* NOTE: sortie attendue
*
* Start Level1/Exercise4
* Winner: The king
* Completed
* Stop
*
* REMARQUE:
* La tâche est encore exécutée de manière synchrone !
*/
public static void exercise4() {
var future = Executors.newSingleThreadScheduledExecutor().schedule(() -> "The king", 3, TimeUnit.SECONDS);
log.info("Start Level1/Exercise4");
Observable.fromFuture(future)
.subscribe(winner -> log.info("Winner: {}", winner), Log::error, Log::completed);
log.info("stop");
}
public static void main(String[] args) throws Exception {
exercise1();
exercise2();
exercise3();
exercise4();
}
/**
* Crée un {@link Observable} de `Flash McQueen`, `Chick Hicks` and `The King` avec la méthode {@link Observable#just}
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise1</li>
* <li>Flash McQueen</li>
* <li>Chick Hicks</li>
* <li>The King</li>
* <li>Completed</li>
* <li>Stop</li>
* </ul>
* <p>
* REMARQUE:
* la sortie du flux est synchrone !
*/
private static void exercise1() {
log.info("Start Level1/Exercise1");
Observable.just("Flash McQueen", "Chick Hicks", "The King")
.subscribe(
log::info,
(error) -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
log.info("Stop");
}
/**
* Crée un {@link Observable} à partir de la liste `data` avec la méthode {@link Observable#fromIterable(Iterable)}
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise2</li>
* <li>Flash McQueen</li>
* <li>Martin</li>
* <li>Sally Carrera</li>
* <li>Completed</li>
* <li>Stop</li>
* </ul>
* <p>
* REMARQUE: La sortie du flux est encore synchrone, parce que la liste est consommée de manière synchrone
*/
private static void exercise2() {
var data = List.of("Flash McQueen", "Martin", "Sally Carrera");
log.info("Start Level1/Exercise2");
Observable.fromIterable(data)
.subscribe(
log::info,
(error) -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
log.info("Stop");
}
/**
* Crée un {@link Observable} à partir d'une tâche {@link Callable}, la tâche émet la valeur après 5 secondes.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise3</li>
* <li>Winner: Flash McQueen</li>
* <li>Completed</li>
* <li>Stop</li>
* </ul>
* <p>
* REMARQUE: La tâche est exécutée de manière synchrone, le {@link Thread} principal attend le résultat de la tâche.
*/
private static void exercise3() {
Callable<String> callable = () -> {
Helpers.sleep(5, TimeUnit.SECONDS);
return "Flash McQueen";
};
log.info("Start Level1/Exercise3");
Observable.fromCallable(callable)
.subscribe(
winner -> log.info("Winner: {}", winner),
(error) -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
log.info("Stop");
}
/**
* Crée un {@link Observable} à partir d'un {@link java.util.concurrent.Future}, le résultat est émit par une tâche planifiée après 3 secondes.
* <p><p>
* NOTE: sortie attendue
* <ul>
* <li>Start Level1/Exercise4</li>
* <li>Winner: The king</li>
* <li>Completed</li>
* <li>Stop</li>
* </ul>
* <p>
* REMARQUE: La tâche est encore exécutée de manière synchrone !
*/
public static void exercise4() {
var future = Executors.newSingleThreadScheduledExecutor().schedule(() -> "The king", 3, TimeUnit.SECONDS);
log.info("Start Level1/Exercise4");
Observable.fromFuture(future)
.subscribe(
winner -> log.info("Winner: {}", winner),
(error) -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed"));
log.info("stop");
}
}
package fr.ippon.codingdojo.reactive;
import fr.ippon.codingdojo.reactive.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* Anatomie d'un flux de données (stream).
*
* Un `Stream` est une séquence données, potentiellement non limitée. Ces données peuvent être connues ou inconnues
* à son création. Les flux sont des constructions asynchrones. Lorsque vous observez un flux, vous ne savez pas
* quand les données vont être émises.
*
* Un `Observer` peut recevoir 3 types d'événements notifié par l'une des 3 méthodes suivantes:
* - onNext: notifie à chaque fois qu'un élément est passé dans le flux
* - onComplete: notifie la fin du flux. `onNext ne sera plus appelé
* - onError: notifie qu'une erreur est arrivée. Le flux n'émettra plus d'éléments, `onComplete`n'est pas appelé.
*
*/
@Slf4j
public class Level2 {
......@@ -32,20 +19,18 @@ public class Level2 {
}
/**
* TODO: Afficher un message quand
* - un élément est émis dans le flux
* - le flux est terminé
*
* Affiche un message lorsqu'un élément est émis dans le flux et que le flux est terminé
* <p><p>
* NOTE: sortie attendue
*
* Start Level2/Exercise1
* Next: Flash McQueen
* Next: The King
* Next: Martin
* Next: Sally Carrera
* Completed
* Stop
*
* <ul>
* <li>Start Level2/Exercise1</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Completed</li>
* <li>Stop</li>
* </ul>
*/
public static void exercise1() {
......@@ -54,26 +39,26 @@ public class Level2 {
log.info("Start Level2/Exercise1");
Observable.fromIterable(data)
.doOnNext(item -> log.info("Next: " + item))
.doOnComplete(Log::completed)
.doOnComplete(() -> log.info("Completed"))
.subscribe();
log.info("Stop");
}
/**
* TODO: Déclencher une erreur quand l'élément commence par la chaîne `Unknown`. Utilisez les méthodes
* `doOnNext`, `doOnComplete` et `doOnError` pour afficher les différents événements.
*
* Utilise les méthodes `doOnNext`, `doOnComplete` et `doOnError` pour afficher les différents événements.
* Une erreur est déclenchée lorsqu'un élément commence par la chaîne `Unknown`.
* <p><p>
* NOTE: sortie attendue
*
* Start Level2/Exercise2
* Next: Flash McQueen
* Next: The King
* Error: Unknown Character !
* Stop
*
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Error: Unknown Character !</li>
* <li>Stop</li>
* </ul>
* <p>
* REMARQUE:
* Une fois l'erreur déclenchée, plus aucun élément n'est émis. La méthode `onComplete n'est pas appelée.
*
* Une fois l'erreur déclenchée, plus aucun élément n'est émis. La méthode `onComplete n'est pas appelée.
*/
public static void exercise2() {
......@@ -96,22 +81,24 @@ public class Level2 {
}
/**
* TODO: Nous allons maintenant recevoir ces événements directement dans le `subscriber`. RxJava met
* 3 méthodes à la disposition lors de la souscription.
* Affiche les événements directement depuis le `subscriber`. RxJava met 3 méthodes à la disposition lors de la souscription.
*
* - notifie les éléments : stream.subscribe(item -> {});
* - notifie une erreur : stream.subscribe(item -> {}, error -> {});
* - notifie la fin du flux : stream.subscribe(item -> {}, error -> {}, () -> {});
* <ul>
* <li>notifie les éléments : stream.subscribe(item -> {});</li>
* <li>notifie une erreur : stream.subscribe(item -> {}, error -> {});</li>
* <li>notifie la fin du flux : stream.subscribe(item -> {}, error -> {}, () -> {});</li>
* </ul>
*
* <p><p>
* NOTE: sortie attendue
*
* Start Level2/Exercise2
* Next: Flash McQueen
* Next: The King
* Next: Chick Hicks
* Completed
* Stop
*
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Chick Hicks</li>
* <li>Completed</li>
* <li>Stop</li>
* </ul>
*/
public static void exercise3() {
......@@ -121,27 +108,29 @@ public class Level2 {
Observable.fromIterable(data)
.subscribe(
item -> log.info("Next: " + item),
Log::error,
Log::completed
error -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed")
);
log.info("Stop");
}
/**
* TODO: Depuis le début, nous utilisons des flux de données avec des valeurs fixes. Utiliser la méthodes `create`
* pour émettre un flux avec les valeurs `Flash McQueen`, `The King`, `Chick Hicks`, `Martin` et `Sally Carrera`.
* Utilise la méthodes {@link Observable#create(ObservableOnSubscribe)} pour émettre un flux dynamique avec les valeurs
* `Flash McQueen`, `The King`, `Chick Hicks`, `Martin` et `Sally Carrera`.
*
* <p><p>
* NOTE: sortie attendue
*
* Start Level2/Exercise2
* Next: Flash McQueen
* Next: The King
* Next: Chick Hicks
* Next: Martin
* Next: Sally Carrera
* Completed
* Stop
* <ul>
* <li>Start Level2/Exercise2</li>
* <li>Next: Flash McQueen</li>
* <li>Next: The King</li>
* <li>Next: Chick Hicks</li>
* <li>Next: Martin</li>
* <li>Next: Sally Carrera</li>
* <li>Completed</li>
* <li>Stop</li>
* </ul>
*
*/
public static void exercise4() {
......@@ -159,9 +148,9 @@ public class Level2 {
});
observable.subscribe(
item -> log.info("Next: " + item),
Log::error,
Log::completed
item -> log.info("Next: {}", item),
error -> log.error("Error: {}", error.getMessage()),
() -> log.info("Completed")
);
log.info("Stop");
......
......@@ -21,31 +21,34 @@ public class Level3 {
}
/**
* Souscrit deux consommateurs (subscriber/observer) à un `Cold Stream` émettant des valeurs fixes.
*
* Souscrit deux consommateurs {@link io.reactivex.Observer} à un `Cold Stream` émettant des valeurs fixes.
* <p><p>
* NOTE: sortie attendue
*
* Start Level3/Exercise1
* [1] Next: Flash McQueen
* [1] Next: The King
* [1] Next: Chick Hicks
* [1] Completed
* [2] Next: Flash McQueen
* [2] Next: The King
* [2] Next: Chick Hicks
* [2] Completed
* Stop
* <ul>
* <li>Start Level3/Exercise1</li>
* <li>Stop</li>
* <li>[1] Next: Flash McQueen</li>
* <li>[1] Next: The King</li>
* <li>[1] Next: Chick Hicks</li>
* <li>[1] Completed</li>
* <li>[2] Next: Flash McQueen</li>
* <li>[2] Next: The King</li>
* <li>[2] Next: Chick Hicks</li>
* <li>[2] Completed</li>
* </ul>
*
* <p>
* Remarque:
* La souscription des deux consommateurs est synchrone. Chaque consommateur consomme le flux
* de données depuis le début de celui-ci.
* de données entierement chacun son tour.
*/
public static void exercise1() {
log.info("Start Level3/Exercise1");
Observable<String> observable = Observable.just("Flash McQueen", "The King", "Chick Hicks");
log.info("Stop");
observable.subscribe(
item -> log.info("[1] Next: {}", item),
......@@ -58,24 +61,28 @@ public class Level3 {
error -> log.error("[2] Error: {}", error.getMessage()),
() -> log.info("[2] Completed")
);
}
/**
* Souscrit deux consommateurs (subscriber/observer) à un `Cold Stream` émettant des valeurs dynamiques.
*
* Souscrit deux consommateurs {@link io.reactivex.Observer} à un `Cold Stream` émettant des valeurs dynamiques.
* <p><p>
* NOTE: sortie attendue
*
* Start Level3/Exercise2
* [1] Next: Date1 -> {generated_date}
* [1] Next: Date2 -> {generated_date}
* [1] Next: Date3 -> {generated_date}
* [1] Completed
* [2] Next: Date1 -> {generated_date}
* [2] Next: Date2 -> {generated_date}
* [2] Next: Date3 -> {generated_date}
* [2] Completed
* Stop
* <ul>
* <li>Start Level3/Exercise2</li>
* <li>Stop</li>
* <li>[1] Next: Date1 -> {generated_date}</li>
* <li>[1] Next: Date2 -> {generated_date}</li>
* <li>[1] Next: Date3 -> {generated_date}</li>
* <li>[1] Completed</li>
* <li>[2] Next: Date1 -> {generated_date}</li>
* <li>[2] Next: Date2 -> {generated_date}</li>
* <li>[2] Next: Date3 -> {generated_date}</li>
* <li>[2] Completed</li>
* </ul>
*
* <p>
* Remarque:
* Il s'agit encore d'un `Cold Stream` avec un flux dynamique, parce que chaque consommateur reçoit le flux de
* données depuis le début. Le flux n'est pas partagé entre les consommateurs.
......@@ -83,12 +90,14 @@ public class Level3 {
*/
public static void exercise2() {
log.info("Start Level3/Exercise2");
Observable<String> observable = Observable.create(subscriber -> {
subscriber.onNext("Date1 -> " + LocalDateTime.now());
subscriber.onNext("Date2 -> " + LocalDateTime.now());
subscriber.onNext("Date3 -> " + LocalDateTime.now());
subscriber.onComplete();
});
log.info("Stop");
observable.subscribe(
item -> log.info("[1] Next: {}", item),
......@@ -99,52 +108,57 @@ public class Level3 {
observable.subscribe(
item -> log.info("[2] Next: {}", item),
error -> log.error("[2] Error: {}", error.getMessage()),
() -> System.out.println("[2] Completed")
() -> log.info("[2] Completed")
);
}
/**
* Souscrit deux consommateurs (subscriber/observer) à un `HotStream` émettant une valeur toutes les secondes.
* Souscrit deux consommateurs {@link io.reactivex.Observer} à un `HotStream` {@link HotStream#create()}
* émettant une valeur toutes les secondes.
*
* - Le premier consommateur souscrit immédiatement déclanchant le flux de données. Il stoppe la souscription après 5 secondes.
* - Le deuxième consommateur souscrit après 3 secondes. Il stoppe la soucription après 4 secondes.
* <ul>
* <li>Le premier consommateur souscrit immédiatement déclanchant le flux de données et stoppe la souscription après 5 secondes</li>
* <li>Le deuxième consommateur souscrit après 3 secondes et stoppe la soucription après 4 secondes</li>
* </ul>
*
* NOTE: sortie attendue
*
* Start Level3/Exercise2
* [1] Next: 1
* [1] Next: 2
* [1] Next: 3
* [2] Next: 3
* [1] Next: 4
* [2] Next: 4
* [2] Next: 5
* [2] Next: 6
* [2] Next: Date2 -> {generated_date}
* [2] Next: Date3 -> {generated_date}
* [2] Completed
* Stop
* <ul>
* <li>Start Level3/Exercise1</li>
* <li>Stop</li>
* <li>[1] Next: 1</li>
* <li>[1] Next: 2</li>
* <li>[1] Next: 3</li>
* <li>[2] Next: 3</li>
* <li>[1] Next: 4</li>
* <li>[2] Next: 4</li>
* <li>[2] Next: 5</li>
* <li>[2] Next: 6</li>
* </ul>
*
* Remarque:
* Le flux de données est partagé par les deux consommateurs. La méhode `Observable.publish` partage le flux de données
* à tous les consommateurs.
* Le flux de données est partagé par les deux consommateurs. La méhode {@link Observable#publish()} partage le flux de données
* à tous les consommateurs. Les données ne sont pas consommées sur le {@link Thread} principal.
*/
public static void exercise3() {
Observable<Long> observable = HotStream.create();
log.info("Start Level3/Exercise3");
Disposable disposable1 = observable.subscribe(
item -> log.info("[1] Next: {}", item),
error -> log.error("[1] Error: {}", error.getMessage()),
()