Commit 94bdf3b4 authored by Julien SADAOUI's avatar Julien SADAOUI
Browse files

feat: rxjava - level6

parent 31f9a66a
......@@ -93,6 +93,14 @@ RX Java possède plusieurs types Java afin de spécialiser certains flux :
Schedulers
Par défault, les flux fonctionne sur le `thread` courant, c'est-à-dire le `thread` qui a souscrit le flux. Il s'agit du `thread` appelé `main` dans les exercices précédents. La majorité de nos exemples était des traitements synchrones, excepté certains exercices. Lorsqu'une opération a de la latence, l'émission synchrone impose cette latence au `thread` qui a souscrit.
Nous allons étudier les émissions de flux asynchrone, une émission asynchrone arrive quand un producteur émet un flux dans un `thread` différent du `thread` qui a souscrit.
Les `Schedulers` de RX Java peuvent changer le comportement des `thread` d'émission et de souscription. Par exemple, nous pouvons choisir le `thread` qui émet le flux de données. Un `Scheduler` est très similaire à un `Executor` du JDK.
RX Java gére la concurrence à notre place avec deux méthodes : `subscribeOn` pour changer le `thread` de l'émetteur et `observerOn` pour changer le `thread` du consommateur.
### G. Level 7
Testing
......
package fr.ippon.codingdojo.reactive;
import fr.ippon.codingdojo.reactive.util.Helpers;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@Slf4j
public class Level6 {
public static void main(String[] args) throws Exception {
exercise1();
exercise2();
exercise3();
}
/**
* Crée un flux dans une tâche asynchrone à l'aide d'un {@link java.util.concurrent.Executor}.
* <p><p>
*
* Note: Résultat attendue
* <ul>
* <li>Start Level6/Exercise1</li>
* <li><[main] Subscribing ...</li>
* <li>[main] Subscribed</li>
* <li>[pool-1-thread-1] Sending: 1</li>
* <li>[pool-1-thread-1] Received: 1</li>
* <li>[pool-1-thread-1] Sending: 2</li>
* <li>[pool-1-thread-1] Received: 2</li>
* <li>[pool-1-thread-1] Sending: 3</li>
* <li>[pool-1-thread-1] Received: 3</li>
* <li>[pool-1-thread-1] Sending: 4</li>
* <li>[pool-1-thread-1] Received: 4</li>
* <li>[pool-1-thread-1] Sending: 5</li>
* <li>[pool-1-thread-1] Received: 5</li>
* <li>[pool-1-thread-1] Completed</li>
* </ul>
*
* Remarque: Il y a deux {@link Thread} démarrés, `main` et `pool-1-thread-1` crée en utilisant un {@link java.util.concurrent.Executor}.
* Le {@link Thread} `pool-1-thread-1` est utilisé pour émettre les données. Chaque émission est suivi par la correspondant émission parce
* qu'il reçoit dans le même {@link Thread} que l'émetteur.
* <p><p>
* Dans ce cas de figure, une latence chez le consommateur perturbera l'émission du flux par le producteur.
*/
public static void exercise1() throws Exception {
log.info("Start Level6/Exercise1");
CountDownLatch latch = new CountDownLatch(1);
ExecutorService executorService = Executors.newSingleThreadExecutor();
Observable<Integer> observable = Observable.create(subscriber -> executorService.submit(() -> {
IntStream.range(1, 5).forEach(value -> {
log.info("Sending: {}", value);
subscriber.onNext(value);
Helpers.sleep(500, TimeUnit.MILLISECONDS);
});
subscriber.onComplete();
}));
log.info("Subscribing ...");
observable
.doOnNext(value -> Helpers.sleep(1, TimeUnit.SECONDS))
.doOnComplete(() -> {
executorService.shutdown();
latch.countDown();
})
.subscribe(
value -> log.info("Received: {}", value),
Helpers::log,
() -> log.info("Completed"));
log.info("Subscribed.");
latch.await();
}
/**
* Crée un flux dans une tâche asynchrone à l'aide de la méthode {@link Observable#subscribeOn}.
* <p><p>
*
* Note: Résultat attendue
* <ul>
* <li>Start Level6/Exercise1</li>
* <li><[main] Subscribing ...</li>
* <li>[main] Subscribed</li>
* <li>[RxSingleScheduler-1] Sending: 1</li>
* <li>[RxSingleScheduler-1] Received: 1</li>
* <li>[RxSingleScheduler-1] Sending: 2</li>
* <li>[RxSingleScheduler-1] Received: 2</li>
* <li>[RxSingleScheduler-1] Sending: 3</li>
* <li>[RxSingleScheduler-1] Received: 3</li>
* <li>[RxSingleScheduler-1] Sending: 4</li>
* <li>[RxSingleScheduler-1] Received: 4</li>
* <li>[RxSingleScheduler-1] Sending: 5</li>
* <li>[RxSingleScheduler-1] Received: 5</li>
* <li>[RxSingleScheduler-1] Completed</li>
* </ul>
*
* Remarque: La méthode {@link Observable#subscribeOn} permet de changer le {@link Thread} de l'émetteur. L'émission
* fonctionne de la manière que l'exercice précédent mais la gestion du {@link Thread} est faite par RX Java.
*/
public static void exercise2() throws Exception {
log.info("Start Level6/Exercise2");
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> observable = Observable.create(subscriber -> {
IntStream.range(1, 5).forEach(value -> {
log.info("Sending: {}", value);
subscriber.onNext(value);
Helpers.sleep(200, TimeUnit.MILLISECONDS);
});
subscriber.onComplete();
});
log.info("Subscribing ...");
observable
.subscribeOn(Schedulers.single())
.doOnComplete(() -> latch.countDown())
.subscribe(
value -> log.info("Received: {}", value),
Helpers::log,
() -> log.info("Completed"));
log.info("Subscribed");
latch.await();
}
/**
* Crée un flux synchrone et receptionne les données dans une tâche asynchrone à l'aide
* de la méthode {@link Observable#observeOn}.
* <p><p>
*
* Note: Résultat attendue
* <ul>
* <li>Start Level6/Exercise1</li>
* <li>[main] Subscribing ...</li>
* <li>[main] Subscribed</li>
* <li>[RxSingleScheduler-1] Sending: 1</li>
* <li>[main] Received: 1</li>
* <li>[RxSingleScheduler-1] Sending: 2</li>
* <li>[main] Received: 2</li>
* <li>[RxSingleScheduler-1] Sending: 3</li>
* <li>[main] Received: 3</li>
* <li>[main] Sending: 4</li>
* <li>[RxSingleScheduler-1] Received: 4</li>
* <li>[main] Sending: 5</li>
* <li>[RxSingleScheduler-1] Received: 5</li>
* <li>[RxSingleScheduler-1] Completed</li>
* <li>[main] Subscribed</li>
* </ul>
*
* Remarque: La méthode {@link Observable#subscribeOn} permet de changer le {@link Thread} du consommateur.
*/
public static void exercise3() throws Exception {
log.info("Start Level6/Exercise3");
Observable<Integer> observable = Observable.create(subscriber -> {
IntStream.range(1, 5).forEach(value -> {
log.info("Sending: {}", value);
subscriber.onNext(value);
Helpers.sleep(100, TimeUnit.MILLISECONDS);
});
subscriber.onComplete();
});
log.info("Subscribing ...");
observable
.observeOn(Schedulers.single())
.subscribe(
value -> log.info("Received: {}", value),
Helpers::log,
() -> log.info("Completed"));
log.info("Subscribed");
}
}
package fr.ippon.codingdojo.reactive.util;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@UtilityClass
@Slf4j
public class Helpers {
public static void sleep(long delay, TimeUnit timeUnit) {
......@@ -14,4 +16,8 @@ public class Helpers {
Thread.currentThread().interrupt();
}
}
public static void log(Throwable throwable) {
log.error("Error: {}", throwable.getMessage(), throwable);
}
}
......@@ -4,7 +4,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d [%thread] %-5level - %msg%n</pattern>
<pattern>%d [%thread] %msg%n</pattern>
</encoder>
</appender>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment