Commit 153b2d7d authored by Julien SADAOUI's avatar Julien SADAOUI
Browse files

feat: rxjava - Level 3

parent 56331c4d
package fr.ippon.codingdojo.reactive;
import fr.ippon.codingdojo.reactive.util.Helpers;
import fr.ippon.codingdojo.reactive.util.HotStream;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Level3 {
public static void main(String[] args) {
exercise1();
exercise2();
exercise3();
}
/**
* Souscrit deux consommateurs (subscriber/observer) à un `Cold Stream` émettant des valeurs fixes.
*
* NOTE: sortie attendue
*
* Start Level3/Exercise1
* [1] Next: Flash McQueen
* [1] Next: The King
* [1] Next: Chick Hicks
* [1] Completed
* [2] Next: Flash McQueen
* [2] Next: The King
* [2] Next: Chick Hicks
* [2] Completed
* Stop
*
* Remarque:
* La souscription des deux consommateurs est synchrone. Chaque consommateur consomme le flux
* de données depuis le début de celui-ci.
*/
public static void exercise1() {
log.info("Start Level3/Exercise1");
Observable<String> observable = Observable.just("Flash McQueen", "The King", "Chick Hicks");
observable.subscribe(
item -> log.info("[1] Next: {}", item),
error -> log.error("[1] Error: {}", error.getMessage()),
() -> log.info("[1] Completed")
);
observable.subscribe(
item -> log.info("[2] Next: {}", item),
error -> log.error("[2] Error: {}", error.getMessage()),
() -> log.info("[2] Completed")
);
}
/**
* Souscrit deux consommateurs (subscriber/observer) à un `Cold Stream` émettant des valeurs dynamiques.
*
* NOTE: sortie attendue
*
* Start Level3/Exercise2
* [1] Next: Date1 -> {generated_date}
* [1] Next: Date2 -> {generated_date}
* [1] Next: Date3 -> {generated_date}
* [1] Completed
* [2] Next: Date1 -> {generated_date}
* [2] Next: Date2 -> {generated_date}
* [2] Next: Date3 -> {generated_date}
* [2] Completed
* Stop
*
* Remarque:
* Il s'agit encore d'un `Cold Stream` avec un flux dynamique, parce que chaque consommateur reçoit le flux de
* données depuis le début. Le flux n'est pas partagé entre les consommateurs.
*
*/
public static void exercise2() {
Observable<String> observable = Observable.create(subscriber -> {
subscriber.onNext("Date1 -> " + LocalDateTime.now());
subscriber.onNext("Date2 -> " + LocalDateTime.now());
subscriber.onNext("Date3 -> " + LocalDateTime.now());
subscriber.onComplete();
});
observable.subscribe(
item -> log.info("[1] Next: {}", item),
error -> log.error("[1] Error: {}", error.getMessage()),
() -> log.info("[1] Completed")
);
observable.subscribe(
item -> log.info("[2] Next: {}", item),
error -> log.error("[2] Error: {}", error.getMessage()),
() -> System.out.println("[2] Completed")
);
}
/**
* Souscrit deux consommateurs (subscriber/observer) à un `HotStream` émettant une valeur toutes les secondes.
*
* - Le premier consommateur souscrit immédiatement déclanchant le flux de données. Il stoppe la souscription après 5 secondes.
* - Le deuxième consommateur souscrit après 3 secondes. Il stoppe la soucription après 4 secondes.
*
* NOTE: sortie attendue
*
* Start Level3/Exercise2
* [1] Next: 1
* [1] Next: 2
* [1] Next: 3
* [2] Next: 3
* [1] Next: 4
* [2] Next: 4
* [2] Next: 5
* [2] Next: 6
* [2] Next: Date2 -> {generated_date}
* [2] Next: Date3 -> {generated_date}
* [2] Completed
* Stop
*
* Remarque:
* Le flux de données est partagé par les deux consommateurs. La méhode `Observable.publish` partage le flux de données
* à tous les consommateurs.
*/
public static void exercise3() {
Observable<Long> observable = HotStream.create();
Disposable disposable1 = observable.subscribe(
item -> log.info("[1] Next: {}", item),
error -> log.error("[1] Error: {}", error.getMessage()),
() -> log.info("[1] Completed")
);
Helpers.sleep(3, TimeUnit.SECONDS);
Disposable disposable2 = observable.subscribe(
item -> log.info("[2] Next: {}", item),
error -> log.error("[2] Error: {}", error.getMessage()),
() -> System.out.println("[2] Completed")
);
Helpers.sleep(2, TimeUnit.SECONDS);
disposable1.dispose();
Helpers.sleep(2, TimeUnit.SECONDS);
disposable2.dispose();
}
}
package fr.ippon.codingdojo.reactive.util;
import lombok.experimental.UtilityClass;
import java.util.concurrent.TimeUnit;
@UtilityClass
public class Helpers {
public static void sleep(long delay, TimeUnit timeUnit) {
try {
Thread.sleep(timeUnit.toMillis(delay));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
package fr.ippon.codingdojo.reactive.util;
import io.reactivex.Observable;
import lombok.experimental.UtilityClass;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@UtilityClass
public class HotStream {
public static Observable<Long> create() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
AtomicInteger subscribers = new AtomicInteger();
AtomicLong counter = new AtomicLong();
return Observable.<Long>create(subscriber -> executorService
.scheduleWithFixedDelay(() -> subscriber.onNext(counter.incrementAndGet()), 1, 1, TimeUnit.SECONDS))
.publish()
.autoConnect()
.doOnSubscribe(s -> subscribers.incrementAndGet())
.doOnDispose(() -> {
int size = subscribers.decrementAndGet();
if (size == 0) executorService.shutdown();
});
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment