diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 47feec298dc3d011fd910a0b092580a5d7ed2060..7dbfee88079b072a663a6815356bf1e71bf892b2 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -1,7 +1,40 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="ChangeListManager">
-    <list default="true" id="b73c7a62-507c-4776-b197-5964d0935e98" name="Default" comment="" />
+    <list default="true" id="b73c7a62-507c-4776-b197-5964d0935e98" name="Default" comment="final version">
+      <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" afterPath="$PROJECT_DIR$/.idea/workspace.xml" />
+      <change beforePath="$PROJECT_DIR$/README.md" afterPath="$PROJECT_DIR$/README.md" />
+      <change beforePath="$PROJECT_DIR$/pom.xml" afterPath="$PROJECT_DIR$/pom.xml" />
+      <change beforePath="$PROJECT_DIR$/spark.jfr" afterPath="" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafka.java" afterPath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaSync.java" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafkaOptim.java" afterPath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaAsync.java" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/model/Person.java" afterPath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/model/Person.java" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.java" afterPath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.java" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java" afterPath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSpark.java" afterPath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSparkStreaming.java" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStreaming.java" afterPath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStream.java" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java" afterPath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/Config.java" afterPath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/Config.java" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java" afterPath="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java" />
+      <change beforePath="$PROJECT_DIR$/src/main/resources/config.properties" afterPath="$PROJECT_DIR$/src/main/resources/config.properties" />
+      <change beforePath="$PROJECT_DIR$/src/main/resources/log4j.xml" afterPath="$PROJECT_DIR$/src/main/resources/log4j.xml" />
+      <change beforePath="$PROJECT_DIR$/target/classes/config.properties" afterPath="$PROJECT_DIR$/target/classes/config.properties" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/AlimKafka$1.class" afterPath="" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/AlimKafka.class" afterPath="" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/AlimKafkaOptim.class" afterPath="" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.class" afterPath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.class" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink$TimestampsFetcher.class" afterPath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink$TimestampsFetcher.class" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink.class" afterPath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink.class" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaSpark.class" afterPath="" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming$1.class" afterPath="" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming$MyTimestampExtractor.class" afterPath="" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming.class" afterPath="" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.class" afterPath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.class" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/utils/Config.class" afterPath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/utils/Config.class" />
+      <change beforePath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/utils/UtilStats.class" afterPath="$PROJECT_DIR$/target/classes/fr/ippon/kafkaLatency/utils/UtilStats.class" />
+      <change beforePath="$PROJECT_DIR$/target/classes/log4j.xml" afterPath="$PROJECT_DIR$/target/classes/log4j.xml" />
+      <change beforePath="$PROJECT_DIR$/target/test-classes/LatencyTest.class" afterPath="" />
+    </list>
     <ignored path="spark-chess.iws" />
     <ignored path=".idea/workspace.xml" />
     <ignored path="$PROJECT_DIR$/target/" />
@@ -23,8 +56,8 @@
       <file leaf-file-name="KafkaContinuousSpark.java" pinned="false" current-in-tab="false">
         <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.java">
           <provider selected="true" editor-type-id="text-editor">
-            <state relative-caret-position="288">
-              <caret line="33" column="34" lean-forward="false" selection-start-line="33" selection-start-column="34" selection-end-line="33" selection-end-column="34" />
+            <state relative-caret-position="364">
+              <caret line="89" column="24" lean-forward="false" selection-start-line="89" selection-start-column="24" selection-end-line="89" selection-end-column="24" />
               <folding>
                 <element signature="imports" expanded="true" />
               </folding>
@@ -32,11 +65,11 @@
           </provider>
         </entry>
       </file>
-      <file leaf-file-name="AlimKafka.java" pinned="false" current-in-tab="false">
-        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafka.java">
+      <file leaf-file-name="KafkaStructuredSpark.java" pinned="false" current-in-tab="false">
+        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java">
           <provider selected="true" editor-type-id="text-editor">
-            <state relative-caret-position="413">
-              <caret line="78" column="0" lean-forward="false" selection-start-line="78" selection-start-column="0" selection-end-line="78" selection-end-column="0" />
+            <state relative-caret-position="296">
+              <caret line="88" column="22" lean-forward="false" selection-start-line="88" selection-start-column="22" selection-end-line="88" selection-end-column="22" />
               <folding>
                 <element signature="imports" expanded="true" />
               </folding>
@@ -44,22 +77,26 @@
           </provider>
         </entry>
       </file>
-      <file leaf-file-name="config.properties" pinned="false" current-in-tab="true">
-        <entry file="file://$PROJECT_DIR$/src/main/resources/config.properties">
+      <file leaf-file-name="UtilStats.java" pinned="false" current-in-tab="true">
+        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java">
           <provider selected="true" editor-type-id="text-editor">
-            <state relative-caret-position="90">
-              <caret line="6" column="14" lean-forward="false" selection-start-line="6" selection-start-column="14" selection-end-line="6" selection-end-column="14" />
-              <folding />
+            <state relative-caret-position="435">
+              <caret line="29" column="35" lean-forward="false" selection-start-line="29" selection-start-column="35" selection-end-line="29" selection-end-column="35" />
+              <folding>
+                <element signature="imports" expanded="true" />
+              </folding>
             </state>
           </provider>
         </entry>
       </file>
-      <file leaf-file-name="log4j.xml" pinned="false" current-in-tab="false">
-        <entry file="file://$PROJECT_DIR$/src/main/resources/log4j.xml">
+      <file leaf-file-name="AlimKafkaSync.java" pinned="false" current-in-tab="false">
+        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaSync.java">
           <provider selected="true" editor-type-id="text-editor">
-            <state relative-caret-position="60">
-              <caret line="4" column="0" lean-forward="false" selection-start-line="4" selection-start-column="0" selection-end-line="4" selection-end-column="0" />
-              <folding />
+            <state relative-caret-position="690">
+              <caret line="124" column="21" lean-forward="false" selection-start-line="124" selection-start-column="21" selection-end-line="124" selection-end-column="21" />
+              <folding>
+                <element signature="imports" expanded="true" />
+              </folding>
             </state>
           </provider>
         </entry>
@@ -67,18 +104,18 @@
       <file leaf-file-name="pom.xml" pinned="false" current-in-tab="false">
         <entry file="file://$PROJECT_DIR$/pom.xml">
           <provider selected="true" editor-type-id="text-editor">
-            <state relative-caret-position="390">
-              <caret line="26" column="0" lean-forward="true" selection-start-line="26" selection-start-column="0" selection-end-line="26" selection-end-column="0" />
+            <state relative-caret-position="975">
+              <caret line="65" column="21" lean-forward="false" selection-start-line="65" selection-start-column="21" selection-end-line="65" selection-end-column="21" />
               <folding />
             </state>
           </provider>
         </entry>
       </file>
-      <file leaf-file-name="KafkaSpark.java" pinned="false" current-in-tab="false">
-        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSpark.java">
+      <file leaf-file-name="AlimKafkaAsync.java" pinned="false" current-in-tab="false">
+        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaAsync.java">
           <provider selected="true" editor-type-id="text-editor">
-            <state relative-caret-position="-225">
-              <caret line="39" column="73" lean-forward="false" selection-start-line="39" selection-start-column="73" selection-end-line="39" selection-end-column="73" />
+            <state relative-caret-position="394">
+              <caret line="123" column="21" lean-forward="false" selection-start-line="123" selection-start-column="21" selection-end-line="123" selection-end-column="21" />
               <folding>
                 <element signature="imports" expanded="true" />
               </folding>
@@ -86,21 +123,50 @@
           </provider>
         </entry>
       </file>
-      <file leaf-file-name="UtilStats.java" pinned="false" current-in-tab="false">
-        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java">
+      <file leaf-file-name="Config.java" pinned="false" current-in-tab="false">
+        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/Config.java">
           <provider selected="true" editor-type-id="text-editor">
-            <state relative-caret-position="207">
-              <caret line="33" column="45" lean-forward="false" selection-start-line="33" selection-start-column="45" selection-end-line="33" selection-end-column="45" />
-              <folding />
+            <state relative-caret-position="394">
+              <caret line="34" column="53" lean-forward="false" selection-start-line="34" selection-start-column="53" selection-end-line="34" selection-end-column="53" />
+              <folding>
+                <element signature="imports" expanded="true" />
+                <element signature="e#1288#1289#0" expanded="true" />
+                <element signature="e#1343#1344#0" expanded="true" />
+                <element signature="e#1457#1458#0" expanded="true" />
+              </folding>
             </state>
           </provider>
         </entry>
       </file>
-      <file leaf-file-name="KafkaStructuredSpark.java" pinned="false" current-in-tab="false">
-        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java">
+      <file leaf-file-name="KafkaStream.java" pinned="false" current-in-tab="false">
+        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStream.java">
           <provider selected="true" editor-type-id="text-editor">
-            <state relative-caret-position="1340">
-              <caret line="93" column="5" lean-forward="false" selection-start-line="93" selection-start-column="5" selection-end-line="93" selection-end-column="5" />
+            <state relative-caret-position="186">
+              <caret line="76" column="75" lean-forward="false" selection-start-line="76" selection-start-column="75" selection-end-line="76" selection-end-column="75" />
+              <folding>
+                <element signature="imports" expanded="true" />
+              </folding>
+            </state>
+          </provider>
+        </entry>
+      </file>
+      <file leaf-file-name="KafkaFlink.java" pinned="false" current-in-tab="false">
+        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java">
+          <provider selected="true" editor-type-id="text-editor">
+            <state relative-caret-position="193">
+              <caret line="30" column="12" lean-forward="false" selection-start-line="30" selection-start-column="12" selection-end-line="30" selection-end-column="12" />
+              <folding>
+                <element signature="imports" expanded="true" />
+              </folding>
+            </state>
+          </provider>
+        </entry>
+      </file>
+      <file leaf-file-name="KafkaSparkStreaming.java" pinned="false" current-in-tab="false">
+        <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSparkStreaming.java">
+          <provider selected="true" editor-type-id="text-editor">
+            <state relative-caret-position="300">
+              <caret line="72" column="5" lean-forward="false" selection-start-line="72" selection-start-column="5" selection-end-line="72" selection-end-column="5" />
               <folding>
                 <element signature="imports" expanded="true" />
               </folding>
@@ -125,7 +191,12 @@
       <find>UtilStats</find>
       <find>add</find>
       <find>bean</find>
+      <find>total</find>
+      <find>stat</find>
+      <find>producer-topic-metrics.record-send-rate</find>
       <find>Stats total</find>
+      <find>596</find>
+      <find>max=596</find>
     </findStrings>
   </component>
   <component name="Git.Settings">
@@ -139,29 +210,12 @@
   <component name="IdeDocumentHistory">
     <option name="CHANGED_PATHS">
       <list>
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/UtilStats.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTimestamp/KafkaFlink.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTime/KafkaFlink.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTimestamp/kafkaContinuousSpark.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTimestamp/KafkaStreamsTest.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTimestamp/kafkaStructuredSpark.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTimestamp/AlimKafka.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTimestamp/kafkaSpark.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/UtilStats.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/properties/PropertyFileResolver.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/properties/ApplicationProperty.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/properties/ApplicaitonPropertyProducer.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/Person.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/properties/Config.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/KafkaMetrics.java" />
         <option value="$PROJECT_DIR$/chess.log" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/Config.java" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/KafkaStreamsTest.java" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/kafkaSpark.java" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/KafkaFlink.java" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/kafkaStructuredSpark.java" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/kafkaContinuousSpark.java" />
-        <option value="$PROJECT_DIR$/pom.xml" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/StreamingConsumer/kafkaContinuousSpark.java" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/StreamingConsumer/kafkaSpark.java" />
         <option value="$PROJECT_DIR$/src/test/latencyTest.java" />
@@ -181,15 +235,32 @@
         <option value="$PROJECT_DIR$/src/main/resources/log4j.properties" />
         <option value="$PROJECT_DIR$/src/main/resources/-log4j.xml" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/StreamingConsumer/KafkaSpark.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStreamingReceiver.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSparkReceiver.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/StreamWrapperActor.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/StreamKafkaActor.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStreaming.java" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafkaOptim.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafka.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafkaAsync.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/kafkacontinous.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafkaSync.java" />
+        <option value="$PROJECT_DIR$/pom.xml" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSpark.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStreams.java" />
+        <option value="$PROJECT_DIR$/kafka-bench.log" />
+        <option value="$PROJECT_DIR$/src/main/resources/log4j.xml" />
+        <option value="$PROJECT_DIR$/src/main/resources/config.properties" />
         <option value="$PROJECT_DIR$/README.md" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/Config.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStream.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.java" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSpark.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaAsync.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaSync.java" />
+        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSparkStreaming.java" />
         <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java" />
-        <option value="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafka.java" />
-        <option value="$PROJECT_DIR$/src/main/resources/log4j.xml" />
-        <option value="$PROJECT_DIR$/src/main/resources/config.properties" />
       </list>
     </option>
   </component>
@@ -261,7 +332,9 @@
       <foldersAlwaysOnTop value="true" />
     </navigator>
     <panes>
+      <pane id="PackagesPane" />
       <pane id="Scope" />
+      <pane id="Scratches" />
       <pane id="ProjectPane">
         <subPane>
           <expand>
@@ -302,7 +375,7 @@
               <item name="main" type="462c0819:PsiDirectoryNode" />
               <item name="java" type="462c0819:PsiDirectoryNode" />
               <item name="kafkaLatency" type="462c0819:PsiDirectoryNode" />
-              <item name="streaming" type="462c0819:PsiDirectoryNode" />
+              <item name="producer" type="462c0819:PsiDirectoryNode" />
             </path>
             <path>
               <item name="kafka-bench" type="b2602c69:ProjectViewProjectNode" />
@@ -311,27 +384,28 @@
               <item name="main" type="462c0819:PsiDirectoryNode" />
               <item name="java" type="462c0819:PsiDirectoryNode" />
               <item name="kafkaLatency" type="462c0819:PsiDirectoryNode" />
-              <item name="utils" type="462c0819:PsiDirectoryNode" />
+              <item name="streaming" type="462c0819:PsiDirectoryNode" />
             </path>
             <path>
               <item name="kafka-bench" type="b2602c69:ProjectViewProjectNode" />
               <item name="Streaming-Kafka-Bench" type="462c0819:PsiDirectoryNode" />
               <item name="src" type="462c0819:PsiDirectoryNode" />
               <item name="main" type="462c0819:PsiDirectoryNode" />
-              <item name="resources" type="462c0819:PsiDirectoryNode" />
+              <item name="java" type="462c0819:PsiDirectoryNode" />
+              <item name="kafkaLatency" type="462c0819:PsiDirectoryNode" />
+              <item name="utils" type="462c0819:PsiDirectoryNode" />
             </path>
             <path>
               <item name="kafka-bench" type="b2602c69:ProjectViewProjectNode" />
               <item name="Streaming-Kafka-Bench" type="462c0819:PsiDirectoryNode" />
               <item name="src" type="462c0819:PsiDirectoryNode" />
-              <item name="test" type="462c0819:PsiDirectoryNode" />
+              <item name="main" type="462c0819:PsiDirectoryNode" />
+              <item name="resources" type="462c0819:PsiDirectoryNode" />
             </path>
           </expand>
           <select />
         </subPane>
       </pane>
-      <pane id="Scratches" />
-      <pane id="PackagesPane" />
     </panes>
   </component>
   <component name="PropertiesComponent">
@@ -346,31 +420,32 @@
     <property name="options.splitter.details.proportions" value="0.2" />
     <property name="options.searchVisible" value="true" />
     <property name="last_opened_file_path" value="$PROJECT_DIR$" />
-    <property name="settings.editor.selected.configurable" value="Debugger_Data_Views_Java" />
+    <property name="settings.editor.selected.configurable" value="preferences.sourceCode.Other File Types" />
     <property name="project.structure.last.edited" value="Project" />
     <property name="project.structure.proportion" value="0.15" />
     <property name="project.structure.side.proportion" value="0.2" />
+    <property name="run.code.analysis.last.selected.profile" value="pProject Default" />
   </component>
   <component name="RecentsManager">
-    <key name="MoveFile.RECENT_KEYS">
-      <recent name="$PROJECT_DIR$/src" />
-    </key>
     <key name="MoveClassesOrPackagesDialog.RECENTS_KEY">
       <recent name="fr.ippon.kafkaLatency.StreamingConsumer" />
     </key>
+    <key name="CopyFile.RECENT_KEYS">
+      <recent name="$PROJECT_DIR$/src/main" />
+      <recent name="$PROJECT_DIR$/src" />
+      <recent name="$PROJECT_DIR$/src/main/resources" />
+      <recent name="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/properties" />
+      <recent name="$PROJECT_DIR$/src/main/java" />
+    </key>
     <key name="CopyClassDialog.RECENTS_KEY">
+      <recent name="fr.ippon.kafkaLatency.streaming" />
       <recent name="fr.ippon.kafkaLatency" />
       <recent name="fr.ippon.kafkaLatency.StreamingConsumer" />
       <recent name="fr.ippon.kafkaTimestamp" />
       <recent name="kafka" />
-      <recent name="fr.ippon.kafka" />
     </key>
-    <key name="CopyFile.RECENT_KEYS">
-      <recent name="$PROJECT_DIR$/src/main" />
+    <key name="MoveFile.RECENT_KEYS">
       <recent name="$PROJECT_DIR$/src" />
-      <recent name="$PROJECT_DIR$/src/main/resources" />
-      <recent name="$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/properties" />
-      <recent name="$PROJECT_DIR$/src/main/java" />
     </key>
   </component>
   <component name="RunDashboard">
@@ -385,7 +460,7 @@
       </list>
     </option>
   </component>
-  <component name="RunManager" selected="JUnit.LatencyTest.sparkContinuousTest">
+  <component name="RunManager" selected="Application.KafkaContinuousSpark">
     <configuration default="true" type="Applet" factoryName="Applet">
       <option name="HTML_USED" value="false" />
       <option name="WIDTH" value="400" />
@@ -393,16 +468,36 @@
       <option name="POLICY_FILE" value="$APPLICATION_HOME_DIR$/bin/appletviewer.policy" />
       <module />
     </configuration>
-    <configuration name="AlimKafka" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+    <configuration name="AlimKafkaAsync" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea">
+        <pattern>
+          <option name="PATTERN" value="fr.ippon.kafkaLatency.producer.*" />
+          <option name="ENABLED" value="true" />
+        </pattern>
+      </extension>
+      <extension name="recording" autostart="false" />
+      <option name="MAIN_CLASS_NAME" value="fr.ippon.kafkaLatency.producer.AlimKafkaAsync" />
+      <option name="VM_PARAMETERS" />
+      <option name="PROGRAM_PARAMETERS" />
+      <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
+      <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
+      <option name="ALTERNATIVE_JRE_PATH" />
+      <option name="ENABLE_SWING_INSPECTOR" value="false" />
+      <option name="ENV_VARIABLES" />
+      <option name="PASS_PARENT_ENVS" value="true" />
+      <module name="kafka-bench" />
+      <envs />
+    </configuration>
+    <configuration name="AlimKafkaSync" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
       <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea">
         <pattern>
-          <option name="PATTERN" value="fr.ippon.kafkaLatency.*" />
+          <option name="PATTERN" value="fr.ippon.kafkaLatency.producer.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
       <extension name="recording" autostart="false" />
-      <option name="MAIN_CLASS_NAME" value="fr.ippon.kafkaLatency.AlimKafka" />
-      <option name="VM_PARAMETERS" value="-XX:+UseG1GC" />
+      <option name="MAIN_CLASS_NAME" value="fr.ippon.kafkaLatency.producer.AlimKafkaSync" />
+      <option name="VM_PARAMETERS" value="" />
       <option name="PROGRAM_PARAMETERS" value="" />
       <option name="WORKING_DIRECTORY" value="file://$PROJECT_DIR$" />
       <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
@@ -413,15 +508,15 @@
       <module name="kafka-bench" />
       <envs />
     </configuration>
-    <configuration name="AlimKafkaOptim" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+    <configuration name="KafkaContinuousSpark" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
       <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea">
         <pattern>
-          <option name="PATTERN" value="fr.ippon.kafkaLatency.*" />
+          <option name="PATTERN" value="fr.ippon.kafkaLatency.streaming.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
       <extension name="recording" autostart="false" />
-      <option name="MAIN_CLASS_NAME" value="fr.ippon.kafkaLatency.AlimKafkaOptim" />
+      <option name="MAIN_CLASS_NAME" value="fr.ippon.kafkaLatency.streaming.KafkaContinuousSpark" />
       <option name="VM_PARAMETERS" />
       <option name="PROGRAM_PARAMETERS" />
       <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
@@ -433,7 +528,7 @@
       <module name="kafka-bench" />
       <envs />
     </configuration>
-    <configuration name="KafkaSpark" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+    <configuration name="KafkaFlink" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
       <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea">
         <pattern>
           <option name="PATTERN" value="fr.ippon.kafkaLatency.streaming.*" />
@@ -441,10 +536,10 @@
         </pattern>
       </extension>
       <extension name="recording" autostart="false" />
-      <option name="MAIN_CLASS_NAME" value="fr.ippon.kafkaLatency.streaming.KafkaSpark" />
-      <option name="VM_PARAMETERS" value="-XX:+UnlockCommercialFeatures -XX:+FlightRecorder" />
-      <option name="PROGRAM_PARAMETERS" value="" />
-      <option name="WORKING_DIRECTORY" value="file://$PROJECT_DIR$" />
+      <option name="MAIN_CLASS_NAME" value="fr.ippon.kafkaLatency.streaming.KafkaFlink" />
+      <option name="VM_PARAMETERS" />
+      <option name="PROGRAM_PARAMETERS" />
+      <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
       <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
       <option name="ALTERNATIVE_JRE_PATH" />
       <option name="ENABLE_SWING_INSPECTOR" value="false" />
@@ -453,7 +548,7 @@
       <module name="kafka-bench" />
       <envs />
     </configuration>
-    <configuration name="KafkaStructuredSpark" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+    <configuration name="KafkaStream" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
       <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea">
         <pattern>
           <option name="PATTERN" value="fr.ippon.kafkaLatency.streaming.*" />
@@ -461,7 +556,7 @@
         </pattern>
       </extension>
       <extension name="recording" autostart="false" />
-      <option name="MAIN_CLASS_NAME" value="fr.ippon.kafkaLatency.streaming.KafkaStructuredSpark" />
+      <option name="MAIN_CLASS_NAME" value="fr.ippon.kafkaLatency.streaming.KafkaStream" />
       <option name="VM_PARAMETERS" />
       <option name="PROGRAM_PARAMETERS" />
       <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
@@ -488,27 +583,6 @@
       <module name="" />
       <envs />
     </configuration>
-    <configuration name="LatencyTest.sparkContinuousTest" type="JUnit" factoryName="JUnit" temporary="true" nameIsGenerated="true">
-      <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
-      <extension name="recording" autostart="true" />
-      <module name="kafka-bench" />
-      <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
-      <option name="ALTERNATIVE_JRE_PATH" />
-      <option name="PACKAGE_NAME" value="" />
-      <option name="MAIN_CLASS_NAME" value="LatencyTest" />
-      <option name="METHOD_NAME" value="sparkContinuousTest" />
-      <option name="TEST_OBJECT" value="method" />
-      <option name="VM_PARAMETERS" value="-ea" />
-      <option name="PARAMETERS" />
-      <option name="WORKING_DIRECTORY" value="$MODULE_DIR$" />
-      <option name="ENV_VARIABLES" />
-      <option name="PASS_PARENT_ENVS" value="true" />
-      <option name="TEST_SEARCH_SCOPE">
-        <value defaultName="singleModule" />
-      </option>
-      <envs />
-      <patterns />
-    </configuration>
     <configuration default="true" type="JUnit" factoryName="JUnit">
       <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
       <extension name="recording" autostart="true" />
@@ -791,19 +865,18 @@
     </configuration>
     <list size="6">
       <item index="0" class="java.lang.String" itemvalue="JUnit.fr.ippon.kafkaLatency.streaming in kafka-bench" />
-      <item index="1" class="java.lang.String" itemvalue="Application.KafkaSpark" />
-      <item index="2" class="java.lang.String" itemvalue="Application.AlimKafka" />
-      <item index="3" class="java.lang.String" itemvalue="Application.KafkaStructuredSpark" />
-      <item index="4" class="java.lang.String" itemvalue="Application.AlimKafkaOptim" />
-      <item index="5" class="java.lang.String" itemvalue="JUnit.LatencyTest.sparkContinuousTest" />
+      <item index="1" class="java.lang.String" itemvalue="Application.KafkaStream" />
+      <item index="2" class="java.lang.String" itemvalue="Application.AlimKafkaSync" />
+      <item index="3" class="java.lang.String" itemvalue="Application.KafkaFlink" />
+      <item index="4" class="java.lang.String" itemvalue="Application.AlimKafkaAsync" />
+      <item index="5" class="java.lang.String" itemvalue="Application.KafkaContinuousSpark" />
     </list>
     <recent_temporary>
-      <list size="5">
-        <item index="0" class="java.lang.String" itemvalue="JUnit.LatencyTest.sparkContinuousTest" />
-        <item index="1" class="java.lang.String" itemvalue="Application.AlimKafka" />
-        <item index="2" class="java.lang.String" itemvalue="Application.KafkaSpark" />
-        <item index="3" class="java.lang.String" itemvalue="Application.AlimKafkaOptim" />
-        <item index="4" class="java.lang.String" itemvalue="Application.KafkaStructuredSpark" />
+      <list size="4">
+        <item index="0" class="java.lang.String" itemvalue="Application.AlimKafkaAsync" />
+        <item index="1" class="java.lang.String" itemvalue="Application.KafkaContinuousSpark" />
+        <item index="2" class="java.lang.String" itemvalue="Application.KafkaFlink" />
+        <item index="3" class="java.lang.String" itemvalue="Application.AlimKafkaSync" />
       </list>
     </recent_temporary>
   </component>
@@ -826,6 +899,14 @@
       <option name="presentableId" value="Default" />
       <updated>1482932185202</updated>
     </task>
+    <task id="LOCAL-00001" summary="Update formating">
+      <created>1522422883816</created>
+      <option name="number" value="00001" />
+      <option name="presentableId" value="LOCAL-00001" />
+      <option name="project" value="LOCAL" />
+      <updated>1522422883816</updated>
+    </task>
+    <option name="localTasksCounter" value="2" />
     <servers />
   </component>
   <component name="TestHistory">
@@ -878,11 +959,13 @@
       <window_info id="Palette&#9;" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="7" side_tool="false" content_ui="tabs" />
       <window_info id="Event Log" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.25856355" sideWeight="0.36324787" order="7" side_tool="true" content_ui="tabs" />
       <window_info id="Maven Projects" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.47557998" sideWeight="0.5" order="9" side_tool="false" content_ui="tabs" />
-      <window_info id="Run" active="true" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" show_stripe_button="true" weight="0.25856355" sideWeight="0.6367521" order="2" side_tool="false" content_ui="tabs" />
-      <window_info id="Version Control" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.32975295" sideWeight="0.5" order="8" side_tool="false" content_ui="tabs" />
+      <window_info id="Run" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.25856355" sideWeight="0.6367521" order="2" side_tool="false" content_ui="tabs" />
+      <window_info id="Version Control" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.32928178" sideWeight="0.5" order="8" side_tool="false" content_ui="tabs" />
       <window_info id="Terminal" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.32975295" sideWeight="0.4859585" order="9" side_tool="false" content_ui="tabs" />
       <window_info id="Designer" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="5" side_tool="false" content_ui="tabs" />
-      <window_info id="Project" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" show_stripe_button="true" weight="0.21001221" sideWeight="0.5" order="0" side_tool="false" content_ui="tabs" />
+      <window_info id="Project" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" show_stripe_button="true" weight="0.21245421" sideWeight="0.5" order="0" side_tool="false" content_ui="tabs" />
+      <window_info id="Inspection Results" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" show_stripe_button="true" weight="0.3281768" sideWeight="0.49761903" order="11" side_tool="false" content_ui="tabs" />
+      <window_info id="Find" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.3281768" sideWeight="0.495116" order="1" side_tool="false" content_ui="tabs" />
       <window_info id="Structure" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.25" sideWeight="0.5" order="1" side_tool="false" content_ui="tabs" />
       <window_info id="Ant Build" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.24969475" sideWeight="0.5" order="1" side_tool="false" content_ui="tabs" />
       <window_info id="UI Designer" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="2" side_tool="false" content_ui="tabs" />
@@ -897,11 +980,9 @@
       <window_info id="Dependency Viewer" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.32928178" sideWeight="0.5" order="11" side_tool="false" content_ui="tabs" />
       <window_info id="Capture Tool" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="4" side_tool="false" content_ui="tabs" />
       <window_info id="Hierarchy" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.25" sideWeight="0.5" order="2" side_tool="false" content_ui="combo" />
-      <window_info id="Inspection Results" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.32914045" sideWeight="0.49761903" order="11" side_tool="false" content_ui="tabs" />
       <window_info id="Image Layers" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="5" side_tool="false" content_ui="tabs" />
       <window_info id="Capture Analysis" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="7" side_tool="false" content_ui="tabs" />
       <window_info id="Inspection" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.4" sideWeight="0.5" order="5" side_tool="false" content_ui="tabs" />
-      <window_info id="Find" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.3281768" sideWeight="0.495116" order="1" side_tool="false" content_ui="tabs" />
       <window_info id="Theme Preview" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="8" side_tool="false" content_ui="tabs" />
     </layout>
     <layout-to-restore>
@@ -974,6 +1055,9 @@
     <ignored-roots>
       <path value="$PROJECT_DIR$" />
     </ignored-roots>
+    <MESSAGE value="Update formating" />
+    <MESSAGE value="final version" />
+    <option name="LAST_COMMIT_MESSAGE" value="final version" />
   </component>
   <component name="XDebuggerManager">
     <breakpoint-manager>
@@ -996,12 +1080,6 @@
           <properties />
           <option name="timeStamp" value="103" />
         </line-breakpoint>
-        <line-breakpoint enabled="true" type="java-line">
-          <url>file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStreaming.java</url>
-          <line>43</line>
-          <properties />
-          <option name="timeStamp" value="144" />
-        </line-breakpoint>
         <line-breakpoint enabled="true" type="java-line">
           <url>file://$PROJECT_DIR$/src/test/LatencyTest.java</url>
           <line>64</line>
@@ -1009,22 +1087,16 @@
           <option name="timeStamp" value="150" />
         </line-breakpoint>
         <line-breakpoint enabled="true" type="java-line">
-          <url>file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafkaOptim.java</url>
-          <line>66</line>
+          <url>file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java</url>
+          <line>47</line>
           <properties />
-          <option name="timeStamp" value="161" />
+          <option name="timeStamp" value="179" />
         </line-breakpoint>
         <line-breakpoint enabled="true" type="java-line">
-          <url>file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafkaOptim.java</url>
-          <line>62</line>
+          <url>file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java</url>
+          <line>75</line>
           <properties />
-          <option name="timeStamp" value="162" />
-        </line-breakpoint>
-        <line-breakpoint enabled="true" type="java-line">
-          <url>file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafka.java</url>
-          <line>63</line>
-          <properties />
-          <option name="timeStamp" value="163" />
+          <option name="timeStamp" value="182" />
         </line-breakpoint>
         <breakpoint enabled="true" type="java-exception">
           <properties class="java.lang.NullPointerException" package="java.lang" />
@@ -1034,7 +1106,7 @@
       <breakpoints-dialog>
         <breakpoints-dialog />
       </breakpoints-dialog>
-      <option name="time" value="164" />
+      <option name="time" value="186" />
     </breakpoint-manager>
     <watches-manager />
   </component>
@@ -1064,48 +1136,6 @@
     </expressions>
   </component>
   <component name="editorHistoryManager">
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTime/KafkaFlink.java" />
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTime/KafkaMetrics.java" />
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTime/kafkaSpark.java" />
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTime/KafkaStreamsTest.java" />
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaTime/kafkaStructuredSpark.java" />
-    <entry file="file://$PROJECT_DIR$/chess.log.2018-03-15" />
-    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/flink/flink-core/1.4.2/flink-core-1.4.2.jar!/org/apache/flink/api/common/functions/MapFunction.class">
-      <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="120">
-          <caret line="12" column="6" lean-forward="false" selection-start-line="12" selection-start-column="6" selection-end-line="12" selection-end-column="6" />
-        </state>
-      </provider>
-    </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/properties/PropertyFileResolver.java" />
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/properties/ApplicationProperty.java" />
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/properties/ApplicaitonPropertyProducer.java" />
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/JsonPOJOSerde.java" />
-    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-configuration2/2.2/commons-configuration2-2.2.jar!/org/apache/commons/configuration2/builder/fluent/Parameters.class">
-      <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="187">
-          <caret line="91" column="0" lean-forward="false" selection-start-line="91" selection-start-column="0" selection-end-line="91" selection-end-column="0" />
-        </state>
-      </provider>
-    </entry>
-    <entry file="jar://$MAVEN_REPOSITORY$/commons-beanutils/commons-beanutils/1.9.3/commons-beanutils-1.9.3.jar!/org/apache/commons/beanutils/PropertyUtilsBean.class">
-      <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="296">
-          <caret line="26" column="30" lean-forward="false" selection-start-line="26" selection-start-column="13" selection-end-line="26" selection-end-column="30" />
-        </state>
-      </provider>
-    </entry>
-    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-configuration2/2.1/commons-configuration2-2.1.jar!/org/apache/commons/configuration2/builder/BasicConfigurationBuilder.class">
-      <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="214">
-          <caret line="100" column="0" lean-forward="false" selection-start-line="100" selection-start-column="0" selection-end-line="100" selection-end-column="0" />
-        </state>
-      </provider>
-    </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/KafkaMetrics.java" />
-    <entry file="file://$PROJECT_DIR$/chess.log" />
-    <entry file="file://$PROJECT_DIR$/test.json" />
-    <entry file="file://$PROJECT_DIR$/src/main/resources/persons.json" />
     <entry file="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-streams/0.11.0.2/kafka-streams-0.11.0.2.jar!/org/apache/kafka/streams/processor/TimestampExtractor.class">
       <provider selected="true" editor-type-id="text-editor">
         <state relative-caret-position="120">
@@ -1130,25 +1160,6 @@
     </entry>
     <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafkaMain.java" />
     <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/StreamingConsumer/KafkaContinuousSparkMain.java" />
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java">
-      <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="-225">
-          <caret line="26" column="19" lean-forward="false" selection-start-line="26" selection-start-column="19" selection-end-line="26" selection-end-column="19" />
-          <folding>
-            <element signature="imports" expanded="false" />
-            <element signature="e#1873#1874#0" expanded="false" />
-            <element signature="e#1920#1921#0" expanded="false" />
-          </folding>
-        </state>
-      </provider>
-    </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/resources/listPersons.json">
-      <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="0">
-          <caret line="0" column="0" lean-forward="false" selection-start-line="0" selection-start-column="0" selection-end-line="0" selection-end-column="0" />
-        </state>
-      </provider>
-    </entry>
     <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/model/Person.java">
       <provider selected="true" editor-type-id="text-editor">
         <state relative-caret-position="180">
@@ -1171,16 +1182,6 @@
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStreaming.java">
-      <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="1012">
-          <caret line="76" column="87" lean-forward="true" selection-start-line="76" selection-start-column="87" selection-end-line="76" selection-end-column="87" />
-          <folding>
-            <element signature="imports" expanded="true" />
-          </folding>
-        </state>
-      </provider>
-    </entry>
     <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/StreamingConsumer/KafkaSparkTuning.java" />
     <entry file="file://$PROJECT_DIR$/src/log4j.xml" />
     <entry file="file://$PROJECT_DIR$/src/main/log4j.xml" />
@@ -1200,30 +1201,139 @@
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/Config.java">
+    <entry file="file://$PROJECT_DIR$/src/test/LatencyTest.java" />
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStreamingReceiver.java" />
+    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/spark/spark-core_2.11/2.3.0/spark-core_2.11-2.3.0.jar!/org/apache/spark/api/java/function/VoidFunction.class">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="240">
-          <caret line="22" column="0" lean-forward="false" selection-start-line="22" selection-start-column="0" selection-end-line="22" selection-end-column="0" />
-          <folding>
-            <element signature="imports" expanded="false" />
-          </folding>
+        <state relative-caret-position="120">
+          <caret line="11" column="9" lean-forward="false" selection-start-line="11" selection-start-column="9" selection-end-line="11" selection-end-column="9" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="jar://$MAVEN_REPOSITORY$/dibbhatt/kafka-spark-consumer/1.0.14/kafka-spark-consumer-1.0.14.jar!/consumer/kafka/ReceiverLauncher.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="287">
+          <caret line="43" column="52" lean-forward="false" selection-start-line="43" selection-start-column="34" selection-end-line="43" selection-end-column="52" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSparkReceiver.java" />
+    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/spark/spark-streaming-kafka-0-10_2.11/2.3.0/spark-streaming-kafka-0-10_2.11-2.3.0.jar!/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="15">
+          <caret line="1" column="21" lean-forward="false" selection-start-line="1" selection-start-column="21" selection-end-line="1" selection-end-column="21" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/spark/spark-streaming-kafka-0-10_2.11/2.3.0/spark-streaming-kafka-0-10_2.11-2.3.0.jar!/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="15">
+          <caret line="1" column="24" lean-forward="false" selection-start-line="1" selection-start-column="24" selection-end-line="1" selection-end-column="24" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/spark/spark-streaming_2.11/2.3.0/spark-streaming_2.11-2.3.0.jar!/org/apache/spark/streaming/package.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="15">
+          <caret line="1" column="15" lean-forward="false" selection-start-line="1" selection-start-column="15" selection-end-line="1" selection-end-column="15" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/spark/spark-streaming-kafka-0-10_2.11/2.3.0/spark-streaming-kafka-0-10_2.11-2.3.0.jar!/org/apache/spark/streaming/kafka010/KafkaRDD.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="15">
+          <caret line="1" column="21" lean-forward="false" selection-start-line="1" selection-start-column="21" selection-end-line="1" selection-end-column="21" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/spark/spark-streaming-kafka-0-10_2.11/2.3.0/spark-streaming-kafka-0-10_2.11-2.3.0.jar!/org/apache/spark/streaming/kafka010/KafkaRDDPartition.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="15">
+          <caret line="1" column="24" lean-forward="false" selection-start-line="1" selection-start-column="24" selection-end-line="1" selection-end-column="24" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/spark/spark-streaming-kafka-0-10_2.11/2.3.0/spark-streaming-kafka-0-10_2.11-2.3.0.jar!/org/apache/spark/streaming/kafka010/package.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="15">
+          <caret line="1" column="15" lean-forward="false" selection-start-line="1" selection-start-column="15" selection-end-line="1" selection-end-column="15" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/spark/spark-streaming-kafka-0-10_2.11/2.3.0/spark-streaming-kafka-0-10_2.11-2.3.0.jar!/org/apache/spark/streaming/kafka010/KafkaUtils.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="30">
+          <caret line="2" column="7" lean-forward="false" selection-start-line="2" selection-start-column="7" selection-end-line="2" selection-end-column="7" />
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafkaOptim.java">
+    <entry file="jar://$MAVEN_REPOSITORY$/dibbhatt/kafka-spark-consumer/1.0.14/kafka-spark-consumer-1.0.14.jar!/consumer/kafka/MessageAndMetadata.class">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="735">
-          <caret line="49" column="51" lean-forward="false" selection-start-line="49" selection-start-column="51" selection-end-line="49" selection-end-column="51" />
+        <state relative-caret-position="90">
+          <caret line="9" column="13" lean-forward="false" selection-start-line="9" selection-start-column="13" selection-end-line="9" selection-end-column="13" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="jar://$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.6.7.1/jackson-databind-2.6.7.1.jar!/com/fasterxml/jackson/databind/node/ObjectNode.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="196">
+          <caret line="185" column="0" lean-forward="false" selection-start-line="185" selection-start-column="0" selection-end-line="185" selection-end-column="0" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="jar://$MAVEN_REPOSITORY$/com/typesafe/akka/akka-actor_2.11/2.5.9/akka-actor_2.11-2.5.9.jar!/akka/japi/function/Procedure.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="60">
+          <caret line="4" column="6" lean-forward="false" selection-start-line="4" selection-start-column="6" selection-end-line="4" selection-end-column="6" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="jar://$MAVEN_REPOSITORY$/org/scala-lang/scala-library/2.11.8/scala-library-2.11.8.jar!/scala/collection/convert/Wrappers.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="214">
+          <caret line="41" column="0" lean-forward="false" selection-start-line="41" selection-start-column="0" selection-end-line="41" selection-end-column="0" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/StreamKafkaActor.java" />
+    <entry file="file://$PROJECT_DIR$/src/main/resources/listPersons.json">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="15">
+          <caret line="1" column="5" lean-forward="false" selection-start-line="1" selection-start-column="1" selection-end-line="1" selection-end-column="5" />
+          <folding />
+        </state>
+      </provider>
+    </entry>
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/kafkacontinous.java" />
+    <entry file="jar://$MAVEN_REPOSITORY$/org/apache/spark/spark-core_2.11/2.3.0/spark-core_2.11-2.3.0.jar!/org/apache/spark/api/java/function/MapFunction.class">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="120">
+          <caret line="11" column="6" lean-forward="false" selection-start-line="11" selection-start-column="6" selection-end-line="11" selection-end-column="6" />
+        </state>
+      </provider>
+    </entry>
+    <entry file="file://$PROJECT_DIR$/kafka-bench.log">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="-90217">
+          <caret line="1670" column="149" lean-forward="false" selection-start-line="1670" selection-start-column="142" selection-end-line="1670" selection-end-column="149" />
+          <folding />
+        </state>
+      </provider>
+    </entry>
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/MetricsProducerReporter.java">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="630">
+          <caret line="44" column="26" lean-forward="false" selection-start-line="44" selection-start-column="18" selection-end-line="44" selection-end-column="26" />
           <folding>
-            <element signature="imports" expanded="false" />
+            <element signature="imports" expanded="true" />
           </folding>
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/MetricsProducerReporter.java">
+    <entry file="file://$PROJECT_DIR$/src/main/resources/log4j.xml">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="105">
-          <caret line="17" column="36" lean-forward="false" selection-start-line="17" selection-start-column="13" selection-end-line="17" selection-end-column="36" />
+        <state relative-caret-position="255">
+          <caret line="17" column="45" lean-forward="false" selection-start-line="17" selection-start-column="45" selection-end-line="17" selection-end-column="45" />
           <folding />
         </state>
       </provider>
@@ -1231,8 +1341,8 @@
     <entry file="file://$PROJECT_DIR$/README.md">
       <provider selected="true" editor-type-id="split-provider[text-editor;markdown-preview-editor]">
         <state split_layout="SPLIT">
-          <first_editor relative-caret-position="690">
-            <caret line="46" column="6" lean-forward="false" selection-start-line="46" selection-start-column="6" selection-end-line="46" selection-end-column="6" />
+          <first_editor relative-caret-position="720">
+            <caret line="48" column="22" lean-forward="true" selection-start-line="48" selection-start-column="22" selection-end-line="48" selection-end-column="22" />
             <folding />
           </first_editor>
           <second_editor />
@@ -1245,91 +1355,112 @@
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.java">
+    <entry file="file://$PROJECT_DIR$/src/main/resources/config.properties">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="288">
-          <caret line="33" column="34" lean-forward="false" selection-start-line="33" selection-start-column="34" selection-end-line="33" selection-end-column="34" />
-          <folding>
-            <element signature="imports" expanded="true" />
-          </folding>
+        <state relative-caret-position="15">
+          <caret line="1" column="32" lean-forward="false" selection-start-line="1" selection-start-column="32" selection-end-line="1" selection-end-column="32" />
+          <folding />
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java">
+    <entry file="file://$PROJECT_DIR$/pom.xml">
+      <provider selected="true" editor-type-id="text-editor">
+        <state relative-caret-position="975">
+          <caret line="65" column="21" lean-forward="false" selection-start-line="65" selection-start-column="21" selection-end-line="65" selection-end-column="21" />
+          <folding />
+        </state>
+      </provider>
+    </entry>
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/Config.java">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="1340">
-          <caret line="93" column="5" lean-forward="false" selection-start-line="93" selection-start-column="5" selection-end-line="93" selection-end-column="5" />
+        <state relative-caret-position="394">
+          <caret line="34" column="53" lean-forward="false" selection-start-line="34" selection-start-column="53" selection-end-line="34" selection-end-column="53" />
           <folding>
             <element signature="imports" expanded="true" />
+            <element signature="e#1288#1289#0" expanded="true" />
+            <element signature="e#1343#1344#0" expanded="true" />
+            <element signature="e#1457#1458#0" expanded="true" />
           </folding>
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSpark.java">
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStream.java">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="-225">
-          <caret line="39" column="73" lean-forward="false" selection-start-line="39" selection-start-column="73" selection-end-line="39" selection-end-column="73" />
+        <state relative-caret-position="186">
+          <caret line="76" column="75" lean-forward="false" selection-start-line="76" selection-start-column="75" selection-end-line="76" selection-end-column="75" />
           <folding>
             <element signature="imports" expanded="true" />
           </folding>
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java">
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="207">
-          <caret line="33" column="45" lean-forward="false" selection-start-line="33" selection-start-column="45" selection-end-line="33" selection-end-column="45" />
-          <folding />
+        <state relative-caret-position="193">
+          <caret line="30" column="12" lean-forward="false" selection-start-line="30" selection-start-column="12" selection-end-line="30" selection-end-column="12" />
+          <folding>
+            <element signature="imports" expanded="true" />
+          </folding>
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/AlimKafka.java">
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.java">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="413">
-          <caret line="78" column="0" lean-forward="false" selection-start-line="78" selection-start-column="0" selection-end-line="78" selection-end-column="0" />
+        <state relative-caret-position="364">
+          <caret line="89" column="24" lean-forward="false" selection-start-line="89" selection-start-column="24" selection-end-line="89" selection-end-column="24" />
           <folding>
             <element signature="imports" expanded="true" />
           </folding>
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/resources/log4j.xml">
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="60">
-          <caret line="4" column="0" lean-forward="false" selection-start-line="4" selection-start-column="0" selection-end-line="4" selection-end-column="0" />
-          <folding />
+        <state relative-caret-position="296">
+          <caret line="88" column="22" lean-forward="false" selection-start-line="88" selection-start-column="22" selection-end-line="88" selection-end-column="22" />
+          <folding>
+            <element signature="imports" expanded="true" />
+          </folding>
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/kafka-bench.log">
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaAsync.java">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="-26569">
-          <caret line="1938" column="42" lean-forward="false" selection-start-line="1938" selection-start-column="32" selection-end-line="1938" selection-end-column="42" />
-          <folding />
+        <state relative-caret-position="394">
+          <caret line="123" column="21" lean-forward="false" selection-start-line="123" selection-start-column="21" selection-end-line="123" selection-end-column="21" />
+          <folding>
+            <element signature="imports" expanded="true" />
+          </folding>
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/test/LatencyTest.java">
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaSync.java">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="246">
-          <caret line="25" column="0" lean-forward="true" selection-start-line="25" selection-start-column="0" selection-end-line="25" selection-end-column="0" />
-          <folding />
+        <state relative-caret-position="690">
+          <caret line="124" column="21" lean-forward="false" selection-start-line="124" selection-start-column="21" selection-end-line="124" selection-end-column="21" />
+          <folding>
+            <element signature="imports" expanded="true" />
+          </folding>
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/pom.xml">
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSparkStreaming.java">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="390">
-          <caret line="26" column="0" lean-forward="true" selection-start-line="26" selection-start-column="0" selection-end-line="26" selection-end-column="0" />
-          <folding />
+        <state relative-caret-position="300">
+          <caret line="72" column="5" lean-forward="false" selection-start-line="72" selection-start-column="5" selection-end-line="72" selection-end-column="5" />
+          <folding>
+            <element signature="imports" expanded="true" />
+          </folding>
         </state>
       </provider>
     </entry>
-    <entry file="file://$PROJECT_DIR$/src/main/resources/config.properties">
+    <entry file="file://$PROJECT_DIR$/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java">
       <provider selected="true" editor-type-id="text-editor">
-        <state relative-caret-position="90">
-          <caret line="6" column="14" lean-forward="false" selection-start-line="6" selection-start-column="14" selection-end-line="6" selection-end-column="14" />
-          <folding />
+        <state relative-caret-position="435">
+          <caret line="29" column="35" lean-forward="false" selection-start-line="29" selection-start-column="35" selection-end-line="29" selection-end-column="35" />
+          <folding>
+            <element signature="imports" expanded="true" />
+          </folding>
         </state>
       </provider>
     </entry>
diff --git a/README.md b/README.md
index c1e4943a1a4e8dcadccd1741197e47e046d6bc2b..d36835f691e89747a545ea10a64c409ca0bc90ec 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # Experiment with Apache Kafka and streaming framework
 
-Mesure latency between record insertion in Kafka and Streaming processing using Kafka automated timestamp of messages.
+Measure latency between record insertion in Kafka and Streaming processing using automated timestamping of Kafka messages.
 
 ## Frameworks tested
 - Kafka Stream
@@ -14,23 +14,25 @@ Mesure latency between record insertion in Kafka and Streaming processing using
 
 ## Sources
 ### Producer
-- fr.ippon.kafkaLatency.AlimKafka : Kafka Producer (100 000 messages)
+- fr.ippon.kafkaLatency.producer.AlimKafkaSync : Kafka Producer (100 000 messages) : sync API
+- fr.ippon.kafkaLatency.producer.AlimKafkaAsync : Kafka Producer (100 000 messages) : Async API
 
 ### Consummer
-- fr.ippon.kafkaLatency.streaming.KafkaSpark : Spark streaming consummer
+- fr.ippon.kafkaLatency.streaming.KafkaSparkStreaming : Spark streaming consummer
 - fr.ippon.kafkaLatency.streaming.KafkaStructuredSpark : Spark Structured streaming consummer
 - fr.ippon.kafkaLatency.streaming.KafkaContinuousSpark : Spark Continous processing consummer
-- fr.ippon.kafkaLatency.streaming.KafkaStreaming : Kafka Streams consummer
+- fr.ippon.kafkaLatency.streaming.KafkaStream : Kafka Streams consummer
 - fr.ippon.kafkaLatency.streaming.KafkaFlink : Flink consummer
 
 ### Utils
 - fr.ippon.kafkaLatency.utils.UtilStats : Streaming statistics
 - fr.ippon.kafkaLatency.utils.MetricsProducerReporter : Kafka Producer Metrics
+- fr.ippon.kafkaLatency.utils.Config : Helper to fetch properties
 
 ## Resources
 ### config.properties :
  - Kafka
-    - topic : Kafka topic name to use
+    - topic : Kafka topic to use
     - numPartitions : Number of partitions for topic
     - replication : Replication Factor for topic
     - bootstrap.servers: List of Kafka servers
@@ -38,10 +40,10 @@ Mesure latency between record insertion in Kafka and Streaming processing using
  - Streaming
     - stats.step : Number of message to group before calculate stats 
     - stats.total : Total number of messages
-    - max.poll.records : Max records to fetch in Kafka
+    - max.poll.records : Max records to fetch in Kafka in a micro-batch
     - spark.continuous.interval : Trigger interval in spark continous processing
     - spark.structured.interval : Trigger interval in spark structured streaming
     - spark.streaming.interval : Interval of micro batch
 
  - data source
-    - listPersonns.json
\ No newline at end of file
+    - listPersons.json
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 1fceb8778ec9c7e3f9034f0eade2d3d0fb969afe..267cf573228096656c5c586f0a2aad5dd930ce1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,16 +15,6 @@
 		<junit.version>4.12</junit.version>
 	</properties>
 
-    <repositories>
-        <repository>
-            <id>apache.snapshots</id>
-            <name>Apache Development Snapshot Repository</name>
-            <url>https://repository.apache.org/content/repositories/snapshots/</url>
-            <releases><enabled>false</enabled></releases>
-            <snapshots><enabled>true</enabled></snapshots>
-        </repository>
-    </repositories>
-
 	<build>
 
 		<pluginManagement>
diff --git a/src/main/java/fr/ippon/kafkaLatency/model/Person.java b/src/main/java/fr/ippon/kafkaLatency/model/Person.java
index 6010811d103d70fe8b2dd73690c89ed08b9a97b9..9fb9b85ec4016016fc573fc49063765e1898755e 100644
--- a/src/main/java/fr/ippon/kafkaLatency/model/Person.java
+++ b/src/main/java/fr/ippon/kafkaLatency/model/Person.java
@@ -10,33 +10,35 @@ import java.util.List;
  */
 public class Person {
 
-    Integer id;
+    private Integer id;
 
-    Boolean isActive;
+    private Boolean isActive;
 
-    Integer age;
+    private Integer age;
 
-    String name;
+    private String name;
 
-    String gender;
+    private String gender;
 
-    String company;
+    private String company;
 
-    String email;
+    private String email;
 
-    String phone;
+    private String phone;
 
-    String address;
+    private String address;
 
-    String placeOfBirth;
+    private String placeOfBirth;
 
     @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "YYYY-MM-dd", timezone = "CET")
+    private
     Date dateOfBirth;
 
     @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "YYYY-MM-dd'T'hh:mm:ss", timezone = "CET")
+    private
     Date registered;
 
-    List<Friend> friends;
+    private List<Friend> friends;
 
     public Person() {
     }
@@ -161,7 +163,7 @@ public class Person {
         this.friends = friends;
     }
 
-    public static class Friend {
+    static class Friend {
         Integer id;
 
         public Friend() {
diff --git a/src/main/java/fr/ippon/kafkaLatency/AlimKafkaOptim.java b/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaAsync.java
similarity index 76%
rename from src/main/java/fr/ippon/kafkaLatency/AlimKafkaOptim.java
rename to src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaAsync.java
index 27909b513d140fccd0b476af08a9294dc274d0ca..53c96bdb9c7b03402f7f65e14295b34034e80f9f 100644
--- a/src/main/java/fr/ippon/kafkaLatency/AlimKafkaOptim.java
+++ b/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaAsync.java
@@ -1,123 +1,128 @@
-package fr.ippon.kafkaLatency;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import fr.ippon.kafkaLatency.utils.Config;
-import fr.ippon.kafkaLatency.utils.MetricsProducerReporter;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.DeleteTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.internals.Topic;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.connect.json.JsonSerializer;
-import org.apache.log4j.Logger;
-
-import java.beans.IntrospectionException;
-import java.io.File;
-import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.temporal.ChronoUnit;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-/**
- * IPPON 2018.
- */
-public class AlimKafkaOptim {
-
-    private static ClassLoader classLoader = AlimKafkaOptim.class.getClassLoader();
-
-    private final static Logger logger = Logger.getLogger(AlimKafkaOptim.class);
-
-    public static void main(String[] args)  throws InterruptedException, IntrospectionException {
-
-        StringBuilder stats = new StringBuilder();
-
-        Properties props = new Properties();
-
-        Producer<String, JsonNode> producer = null;
-
-        String brokers = Config.getProperty("bootstrap.servers");
-        String topic = Config.getProperty("topic");
-        String numPartitions = Config.getProperty("numPartitions");
-        String replication = Config.getProperty("replication");
-        int total = Integer.valueOf(Config.getProperty("stats.total"));
-
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
-
-        //Create topic if not exist
-        AdminClient adminClient = AdminClient.create(props);
-
-        NewTopic newTopic = new NewTopic(topic, Integer.valueOf(numPartitions), Integer.valueOf(replication).shortValue());
-        CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
-
-        try {
-            createTopicsResult.all().get();
-            // real failure cause is wrapped inside the raised ExecutionException
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof TopicExistsException) {
-                logger.info("Topic already exists (delete manually to change settings) !!");
-            } else if (e.getCause() instanceof TimeoutException) {
-                logger.error("Timeout !!");
-            } else {
-                logger.error("Error !!" + e.getCause());
-            }
-        } finally {
-            adminClient.close();
-        }
-
-        try {
-
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
-            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                    StringSerializer.class.getName());
-            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                    JsonSerializer.class.getName());
-
-            ObjectMapper mapper = new ObjectMapper();
-
-            producer = new KafkaProducer<String, JsonNode>(props);
-            File file = new File(classLoader.getResource("listPersons.json").getFile());
-
-            long i = 0;
-
-            JsonNode allNodes = mapper.readTree(file);
-
-            while (i < total) {
-                Iterator<JsonNode> it = allNodes.iterator();
-                LocalDateTime start = LocalDateTime.now();
-
-                Producer<String, JsonNode> finalProducer = producer;
-
-                while (it.hasNext()) {
-                    finalProducer.send(new ProducerRecord<String, JsonNode>(topic, it.next()));
-                    i++;
-                }
-
-                LocalDateTime end = LocalDateTime.now();
-
-                long millis = ChronoUnit.MILLIS.between(start, end);
-
-                stats.append(allNodes.size()).append(" messages inserted in ").append(millis).append(" ms.").append(System.lineSeparator());
-            }
-        } catch (IOException e) {
-            logger.error("Error IO" + e.getMessage());
-        } finally {
-            Thread.sleep(3000);
-            logger.info(stats);
-            MetricsProducerReporter metrics = new MetricsProducerReporter(producer);
-            metrics.displayMetrics();
-            producer.close();
-        }
-    }
-
-}
+package fr.ippon.kafkaLatency.producer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import fr.ippon.kafkaLatency.utils.Config;
+import fr.ippon.kafkaLatency.utils.MetricsProducerReporter;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * IPPON 2018.
+ */
+public class AlimKafkaAsync {
+
+    private static final ClassLoader classLoader = AlimKafkaAsync.class.getClassLoader();
+
+    private final static Logger logger = Logger.getLogger(AlimKafkaAsync.class);
+
+    public static void main(String[] args) throws InterruptedException {
+
+        StringBuilder stats = new StringBuilder();
+
+        Properties props = new Properties();
+
+        Producer<String, JsonNode> producer = null;
+
+        String brokers = Config.getProperty("bootstrap.servers");
+        String topic = Config.getProperty("topic");
+        String numPartitions = Config.getProperty("numPartitions");
+        String replication = Config.getProperty("replication");
+        int total = Integer.valueOf(Config.getProperty("stats.total"));
+
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+
+        //Create topic if not exist
+        AdminClient adminClient = AdminClient.create(props);
+
+        NewTopic newTopic = new NewTopic(topic, Integer.valueOf(numPartitions), Integer.valueOf(replication).shortValue());
+        CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
+
+        try {
+            createTopicsResult.all().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TopicExistsException) {
+                logger.info("Topic already exists (delete manually to change settings) !!");
+            } else if (e.getCause() instanceof TimeoutException) {
+                logger.error("Timeout !!");
+            } else {
+                logger.error("Error !!" + e.getCause());
+            }
+        } finally {
+            adminClient.close();
+        }
+
+        try {
+
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerAsync");
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                    StringSerializer.class.getName());
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                    JsonSerializer.class.getName());
+
+            ObjectMapper mapper = new ObjectMapper();
+
+            producer = new KafkaProducer<String, JsonNode>(props);
+            File file = new File(Objects.requireNonNull(classLoader.getResource("listPersons.json")).getFile());
+
+            long i = 0;
+
+            List<JsonNode> allNodes = mapper.readValue(file, new TypeReference<List<JsonNode>>() {
+            });
+
+            Producer<String, JsonNode> finalProducer = producer;
+
+            while (i < total) {
+                LocalDateTime start = LocalDateTime.now();
+
+                long size = allNodes.stream().map(node -> {
+                    try {
+                        // synchronous API
+                        finalProducer.send(new ProducerRecord<String, JsonNode>(topic, node));
+                    } catch (Exception e) {
+                        logger.error("Error IO" + e.getMessage());
+                    }
+                    return null;
+                }).count();
+
+                i = i + size;
+
+                LocalDateTime end = LocalDateTime.now();
+
+                long millis = ChronoUnit.MILLIS.between(start, end);
+
+                stats.append(allNodes.size()).append(" messages inserted in ").append(millis).append(" ms.").append(System.lineSeparator());
+            }
+        } catch (IOException e) {
+            logger.error("Error IO" + e.getMessage());
+        } finally {
+            logger.info(stats);
+            MetricsProducerReporter metrics = new MetricsProducerReporter(producer);
+            metrics.displayMetrics();
+            assert producer != null;
+            producer.close();
+        }
+    }
+
+}
diff --git a/src/main/java/fr/ippon/kafkaLatency/AlimKafka.java b/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaSync.java
similarity index 78%
rename from src/main/java/fr/ippon/kafkaLatency/AlimKafka.java
rename to src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaSync.java
index 4b0caba12b517292af3b83ca359c9039fac6d5e6..f5678652b180ee9efb62fa4398d8b5274b96e1a5 100644
--- a/src/main/java/fr/ippon/kafkaLatency/AlimKafka.java
+++ b/src/main/java/fr/ippon/kafkaLatency/producer/AlimKafkaSync.java
@@ -1,129 +1,129 @@
-package fr.ippon.kafkaLatency;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import fr.ippon.kafkaLatency.model.Person;
-import fr.ippon.kafkaLatency.utils.Config;
-import fr.ippon.kafkaLatency.utils.MetricsProducerReporter;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.connect.json.JsonSerializer;
-import org.apache.log4j.Logger;
-
-import java.beans.IntrospectionException;
-import java.io.File;
-import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.temporal.ChronoUnit;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-/**
- * IPPON 2018.
- */
-public class AlimKafka {
-
-    private static ClassLoader classLoader = AlimKafka.class.getClassLoader();
-
-    private final static Logger logger = Logger.getLogger(AlimKafka.class);
-
-    public static void main(String[] args)  throws InterruptedException, IntrospectionException {
-
-        StringBuilder stats = new StringBuilder();
-
-        Properties props = new Properties();
-
-        Producer<String, JsonNode> producer = null;
-
-        String brokers = Config.getProperty("bootstrap.servers");
-        String topic = Config.getProperty("topic");
-        String numPartitions = Config.getProperty("numPartitions");
-        String replication = Config.getProperty("replication");
-        int total = Integer.valueOf(Config.getProperty("stats.total"));
-
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
-
-        //Create topic if not exist
-        AdminClient adminClient = AdminClient.create(props);
-
-        NewTopic newTopic = new NewTopic(topic, Integer.valueOf(numPartitions), Integer.valueOf(replication).shortValue());
-        CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
-
-        try {
-            createTopicsResult.all().get();
-            // real failure cause is wrapped inside the raised ExecutionException
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof TopicExistsException) {
-                logger.info("Topic already exists (delete manually to change settings) !!");
-            } else if (e.getCause() instanceof TimeoutException) {
-                logger.error("Timeout !!");
-            } else {
-                logger.error("Error !!" + e.getCause());
-            }
-        } finally {
-            adminClient.close();
-        }
-
-        try {
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
-            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                    StringSerializer.class.getName());
-            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                    JsonSerializer.class.getName());
-            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 4);
-            ObjectMapper mapper = new ObjectMapper();
-
-            producer = new KafkaProducer<String, JsonNode>(props);
-            List<Person> persons = null;
-
-            File file = new File(classLoader.getResource("listPersons.json").getFile());
-
-            long i = 0;
-
-            persons = mapper.readValue(file, new TypeReference<List<Person>>() {
-            });
-
-            while (i < total) {
-
-                LocalDateTime start = LocalDateTime.now();
-
-                Producer<String, JsonNode> finalProducer = producer;
-                long size = persons.parallelStream().map(person -> {
-                    try {
-                        finalProducer.send(new ProducerRecord<String, JsonNode>(topic, mapper.valueToTree(person)));
-                    } catch (Exception e) {
-                        logger.error("Error IO" + e.getMessage());
-                    }
-                    return null;
-                }).count();
-
-                i = i + size;
-                LocalDateTime end = LocalDateTime.now();
-
-                long millis = ChronoUnit.MILLIS.between(start, end);
-
-                stats.append(persons.size()).append(" messages inserted in ").append(millis).append(" ms.").append(System.lineSeparator());
-            }
-        } catch (IOException e) {
-            logger.error("Error IO" + e.getMessage());
-        } finally {
-            Thread.sleep(5000);
-            logger.info(stats);
-            MetricsProducerReporter metrics = new MetricsProducerReporter(producer);
-            metrics.displayMetrics();
-            producer.close();
-        }
-    }
-
-}
+package fr.ippon.kafkaLatency.producer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import fr.ippon.kafkaLatency.model.Person;
+import fr.ippon.kafkaLatency.utils.Config;
+import fr.ippon.kafkaLatency.utils.MetricsProducerReporter;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * IPPON 2018.
+ */
+public class AlimKafkaSync {
+
+    private static final ClassLoader classLoader = AlimKafkaSync.class.getClassLoader();
+
+    private final static Logger logger = Logger.getLogger(AlimKafkaSync.class);
+
+    public static void main(String[] args)  throws InterruptedException {
+
+        StringBuilder stats = new StringBuilder();
+
+        Properties props = new Properties();
+
+        Producer<String, JsonNode> producer = null;
+
+        String brokers = Config.getProperty("bootstrap.servers");
+        String topic = Config.getProperty("topic");
+        String numPartitions = Config.getProperty("numPartitions");
+        String replication = Config.getProperty("replication");
+        int total = Integer.valueOf(Config.getProperty("stats.total"));
+
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+
+        //Create topic if not exist
+        AdminClient adminClient = AdminClient.create(props);
+
+        NewTopic newTopic = new NewTopic(topic, Integer.valueOf(numPartitions), Integer.valueOf(replication).shortValue());
+        CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
+
+        try {
+            createTopicsResult.all().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TopicExistsException) {
+                logger.info("Topic already exists (delete manually to change settings) !!");
+            } else if (e.getCause() instanceof TimeoutException) {
+                logger.error("Timeout !!");
+            } else {
+                logger.error("Error !!" + e.getCause());
+            }
+        } finally {
+            adminClient.close();
+        }
+
+        try {
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerSync");
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                    StringSerializer.class.getName());
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                    JsonSerializer.class.getName());
+            ObjectMapper mapper = new ObjectMapper();
+
+            producer = new KafkaProducer<String, JsonNode>(props);
+            List<Person> persons = null;
+
+            File file = new File(Objects.requireNonNull(classLoader.getResource("listPersons.json")).getFile());
+
+            long i = 0;
+
+            List<JsonNode> allNodes = mapper.readValue(file, new TypeReference<List<JsonNode>>() {
+            });
+
+            Producer<String, JsonNode> localProducer = producer;
+
+            while (i < total) {
+
+                LocalDateTime start = LocalDateTime.now();
+
+                long size = allNodes.stream().map(node -> {
+                    try {
+                        // synchronous API
+                        localProducer.send(new ProducerRecord<String, JsonNode>(topic, node)).get();
+                    } catch (Exception e) {
+                        logger.error("Error IO" + e.getMessage());
+                    }
+                    return null;
+                }).count();
+
+                i = i + size;
+                LocalDateTime end = LocalDateTime.now();
+
+                long millis = ChronoUnit.MILLIS.between(start, end);
+
+                stats.append(allNodes.size()).append(" messages inserted in ").append(millis).append(" ms.").append(System.lineSeparator());
+            }
+        } catch (IOException e) {
+            logger.error("Error IO" + e.getMessage());
+        } finally {
+            logger.info(stats);
+            MetricsProducerReporter metrics = new MetricsProducerReporter(producer);
+            metrics.displayMetrics();
+            assert producer != null;
+            producer.close();
+        }
+    }
+
+}
diff --git a/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.java b/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.java
index 2f1461fb325942c9f78072c70d9ef40d1cf241aa..959ad76614d7e573de4a1c2cbae478ce87563638 100644
--- a/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.java
+++ b/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.java
@@ -5,8 +5,6 @@ import fr.ippon.kafkaLatency.model.Person;
 import fr.ippon.kafkaLatency.utils.Config;
 import fr.ippon.kafkaLatency.utils.UtilStats;
 import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -16,6 +14,7 @@ import org.apache.spark.sql.streaming.StreamingQuery;
 import org.apache.spark.sql.streaming.Trigger;
 import scala.Tuple2;
 
+import java.io.Serializable;
 import java.sql.Timestamp;
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
@@ -24,12 +23,13 @@ import java.util.concurrent.TimeUnit;
 /**
  * Ippon Tech 2018.
  */
-public class KafkaContinuousSpark {
+public class KafkaContinuousSpark implements Serializable {
 
-    private final static ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper mapper = new ObjectMapper();
     private final static Logger logger = Logger.getLogger(KafkaContinuousSpark.class);
 
     public static void main(String[] args) {
+
         StreamingQuery query = null;
         SparkSession spark = null;
 
@@ -38,20 +38,18 @@ public class KafkaContinuousSpark {
             String topic = Config.getProperty("topic");
             Integer SparkContinuousInterval = Config.getIntProperty("spark.continuous.interval");
             Integer maxPollRecords = Config.getIntProperty("max.poll.records");
+            Integer numPartitions = Config.getIntProperty("numPartitions");
 
-            UtilStats stats = new UtilStats();
 
-            SparkConf sparkConf = new SparkConf()
-                    .setMaster("local[*]")
-                    .set("spark.streaming.stopGracefullyOnShutdown", "true")
-                    .set("spark.ui.showConsoleProgress", "false")
-                    .set("spark.eventLog.enabled", "false")
-                    .set("spark.streaming.backpressure.enabled", "true")
-                    .setAppName("kafkaContinuous");
+            UtilStats stats = new UtilStats("KafkaContinuousSpark");
 
-            spark = new SparkSession(new SparkContext(sparkConf));
+            // !!! number of kafka partitions must not exceed number of cores
+            spark = SparkSession.builder()
+                    .appName("KafkaContinuousSpark")
+                    .master("local[*]")
+                    .getOrCreate();
 
-            Dataset<Tuple2<String, Timestamp>> line = spark.readStream().format("kafka")
+            Dataset<Tuple2<String, Timestamp>> source = spark.readStream().format("kafka")
                     .option("kafka.bootstrap.servers", brokers)
                     .option("subscribe", topic)
                     .option("max.poll.records", maxPollRecords)
@@ -60,42 +58,40 @@ public class KafkaContinuousSpark {
                     .select("value", "timestamp")
                     .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
 
-            Dataset<Long> json = line.map((MapFunction<Tuple2<String, Timestamp>, Long>) t -> {
+            Dataset<Person> ds = source.map((MapFunction<Tuple2<String, Timestamp>, Person>) t -> {
                 LocalDateTime now = LocalDateTime.now();
 
                 LocalDateTime parsedDate = t._2.toLocalDateTime();
 
                 long millis = ChronoUnit.MILLIS.between(parsedDate, now);
 
-                mapper.readValue(t._1, Person.class);
+                Person p = mapper.readValue(t._1, Person.class);
 
                 stats.treatStats(millis);
 
-                return millis;
-            }, Encoders.bean(Long.class));
+                return p;
+            }, Encoders.bean(Person.class));
 
 
-            query = json.writeStream()
+            query = ds.writeStream()
                     .outputMode(OutputMode.Append())
                     .format("memory").queryName("test")
                     .trigger(Trigger.Continuous(SparkContinuousInterval, TimeUnit.MILLISECONDS))
                     .start();
 
-            //await
             query.awaitTermination();
 
         } catch (Exception e) {
             logger.error("Error " + e.getMessage());
+
+            assert query != null;
             if (query.isActive()) {
                 query.stop();
-                spark.close();
+                spark.stop();
             }
-        }
-    }
-}
-
-
-
 
+        }
 
+    }
 
+}
\ No newline at end of file
diff --git a/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java b/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java
index 7731d37cca7dacaa4e3fe88871010c218d1dd7b5..7f3aa4a4cccced93197a6899c33626e37512f7c0 100644
--- a/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java
+++ b/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaFlink.java
@@ -12,7 +12,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
-import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.log4j.Logger;
 
 import java.time.Instant;
@@ -28,9 +28,10 @@ public class KafkaFlink {
 
     private final static Logger logger = Logger.getLogger(KafkaFlink.class);
 
+    private static final ObjectMapper mapper = new ObjectMapper();
 
     public static class TimestampsFetcher implements AssignerWithPunctuatedWatermarks<String> {
-        UtilStats stats = new UtilStats();
+        final UtilStats stats = new UtilStats("KafkaFlink");
 
         @Override
         public long extractTimestamp(String element, long previousElementTimestamp) {
@@ -61,13 +62,12 @@ public class KafkaFlink {
             // set up the streaming execution environment
             final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-            env.enableCheckpointing(100);
+            env.enableCheckpointing(150);
             Properties properties = new Properties();
             properties.setProperty("bootstrap.servers", brokers);
             properties.setProperty("auto.offset.reset", "latest");
 
-            properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "flink");
-
+            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "Flink");
 
             FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), properties);
 
@@ -75,11 +75,9 @@ public class KafkaFlink {
 
             myConsumer.assignTimestampsAndWatermarks(new TimestampsFetcher());
 
-            input.map((MapFunction<String, Person>) (String value) -> {
-                ObjectMapper mapper = new ObjectMapper();
-                return mapper.readValue(value, Person.class);
-
-            }).setParallelism(numPartitions).countWindowAll(1000);
+            input.map((MapFunction<String, Person>) (String value) ->
+                    mapper.readValue(value, Person.class)
+            ).setParallelism(numPartitions).countWindowAll(1000);
 
             // execute program
             env.execute("Flink Streaming Kafka");
diff --git a/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSpark.java b/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSparkStreaming.java
similarity index 73%
rename from src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSpark.java
rename to src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSparkStreaming.java
index 11bb330a39fd8f0cb91956fdb15b6cfec73d989e..d7e383953f50f0327135e5a9694272afc7420d72 100644
--- a/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSpark.java
+++ b/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaSparkStreaming.java
@@ -18,7 +18,6 @@ import org.apache.spark.streaming.kafka010.ConsumerStrategies;
 import org.apache.spark.streaming.kafka010.KafkaUtils;
 import org.apache.spark.streaming.kafka010.LocationStrategies;
 
-import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
@@ -27,22 +26,25 @@ import java.util.*;
 /**
  * Ippon Tech 2018.
  */
-public class KafkaSpark {
-    static ObjectMapper mapper = new ObjectMapper();
+public class KafkaSparkStreaming {
+    private static final ObjectMapper mapper = new ObjectMapper();
 
-    private final static Logger logger = Logger.getLogger(KafkaContinuousSpark.class);
+    private final static Logger logger = Logger.getLogger(KafkaSparkStreaming.class);
 
-    static Function function = (Function<ConsumerRecord<String, String>, Person>) cr -> {
-            LocalDateTime kafkaTimestamp = LocalDateTime.ofInstant(Instant.ofEpochMilli(cr.timestamp()), TimeZone
-                    .getDefault().toZoneId());
-            LocalDateTime now = LocalDateTime.now();
+    private final static UtilStats stats = new UtilStats("KafkaSparkStreaming");
 
-            long millis = ChronoUnit.MILLIS.between(kafkaTimestamp, now);
-            Person p = mapper.readValue(cr.value(), Person.class);
+    private static final Function function = (Function<ConsumerRecord<String, String>, Long>) cr -> {
+        LocalDateTime kafkaTimestamp = LocalDateTime.ofInstant(Instant.ofEpochMilli(cr.timestamp()), TimeZone
+                .getDefault().toZoneId());
+        LocalDateTime now = LocalDateTime.now();
 
-            UtilStats.treatStats(millis);
+        long millis = ChronoUnit.MILLIS.between(kafkaTimestamp, now);
+        Person p = mapper.readValue(cr.value(), Person.class);
+        // Compute data here
 
-        return p;
+        stats.treatStats(millis);
+
+        return millis;
 
     };
 
@@ -54,27 +56,24 @@ public class KafkaSpark {
             Integer SparkStreamingInterval = Config.getIntProperty("spark.streaming.interval");
             Integer maxPollRecords = Config.getIntProperty("max.poll.records");
 
+            // !!! number of kafka partitions must not exceed number of cores
             SparkConf sparkConf = new SparkConf()
                     .setMaster("local[*]")
                     .set("spark.streaming.stopGracefullyOnShutdown", "true")
-                    .set("spark.ui.enabled", "false")
-                    .set("spark.eventLog.enabled", "false")
-                    .set("spark.streaming.backpressure.enabled", "true")
                     .setAppName("kafkaSparkStreaming");
 
             sc = new JavaStreamingContext(sparkConf, Durations.milliseconds(SparkStreamingInterval));
 
-
             Map<String, Object> kafkaParams = new HashMap<>();
 
             kafkaParams.put("bootstrap.servers", brokers);
             kafkaParams.put("key.deserializer", StringDeserializer.class);
             kafkaParams.put("value.deserializer", StringDeserializer.class);
-            kafkaParams.put("group.id", "KafkaSpark");
+            kafkaParams.put("group.id", "KafkaSparkStreaming");
             kafkaParams.put("auto.offset.reset", "latest");
             kafkaParams.put("max.poll.records", maxPollRecords);
 
-            Collection<String> topics = Arrays.asList(topic);
+            Collection<String> topics = Collections.singletonList(topic);
 
             final JavaInputDStream<ConsumerRecord<String, String>> stream =
                     KafkaUtils.createDirectStream(
@@ -90,6 +89,7 @@ public class KafkaSpark {
 
         } catch (Exception e) {
             logger.error("Error " + e.getMessage());
+            assert sc != null;
             sc.stop(true, true);
         }
     }
diff --git a/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStreaming.java b/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStream.java
similarity index 56%
rename from src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStreaming.java
rename to src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStream.java
index f1bbff87d0ffedadcd72b91fefc0ee3cb7d59782..281f8207f977195105af94ade89a2fa961d342df 100644
--- a/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStreaming.java
+++ b/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStream.java
@@ -11,7 +11,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.log4j.Logger;
 
@@ -25,14 +24,14 @@ import java.util.TimeZone;
 /**
  * Ippon Tech 2018.
  */
-public class KafkaStreaming  {
-    static ObjectMapper mapper = new ObjectMapper();
+public class KafkaStream {
+    private static final ObjectMapper mapper = new ObjectMapper();
 
-    private final static Logger logger = Logger.getLogger(KafkaStreaming.class);
+    private final static Logger logger = Logger.getLogger(KafkaStream.class);
 
     public static class MyTimestampExtractor implements TimestampExtractor {
 
-        UtilStats stats = new UtilStats();
+        final UtilStats stats = new UtilStats("KafkaStream");
 
         public MyTimestampExtractor() {
             super();
@@ -55,43 +54,38 @@ public class KafkaStreaming  {
     }
 
     public static void main(String[] args) {
+        KafkaStreams streams;
         try {
 
 
-        String brokers = Config.getProperty("bootstrap.servers");
-        String topic = Config.getProperty("topic");
-        String numPartitions = Config.getProperty("numPartitions");
+            String brokers = Config.getProperty("bootstrap.servers");
+            String topic = Config.getProperty("topic");
+            String numPartitions = Config.getProperty("numPartitions");
 
-        Properties props = new Properties();
+            Properties props = new Properties();
 
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka stream");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numPartitions);
+            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka stream");
+            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);
+            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numPartitions);
 
-        KStreamBuilder builder = new KStreamBuilder();
+            KStreamBuilder builder = new KStreamBuilder();
 
-        builder.stream(Serdes.String(), Serdes.String(), topic).filter(new Predicate<String, String>() {
-            @Override
-            public boolean test(String key, String value) {
+            builder.stream(Serdes.String(), Serdes.String(), topic).filter((key, value) -> {
                 try {
-
-                    mapper.readValue(value, Person.class);
-
+                    Person p = mapper.readValue(value, Person.class);
+                    // Compute data here
                 } catch (IOException e) {
                     logger.error("Error " + e.getMessage());
                 }
                 return false;
-            }
-        }).print();
-
+            }).print();
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
-
-        streams.start();
+            streams = new KafkaStreams(builder, props);
+            streams.start();
 
         } catch (Exception e) {
             logger.error("Error " + e.getMessage());
@@ -99,5 +93,4 @@ public class KafkaStreaming  {
 
     }
 
-
 }
diff --git a/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java b/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java
index 96879953fb00e146e008452e0f6f5be2b77488d8..d148d658de8c1533e9baa350cb9dd4798b4a7aa5 100644
--- a/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java
+++ b/src/main/java/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class KafkaStructuredSpark {
 
-    private static ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper mapper = new ObjectMapper();
 
     private final static Logger logger = Logger.getLogger(KafkaStructuredSpark.class);
 
@@ -41,28 +41,27 @@ public class KafkaStructuredSpark {
             Integer maxPollRecords = Config.getIntProperty("max.poll.records");
             Integer interval = Config.getIntProperty("spark.structured.interval");
 
-            UtilStats stats = new UtilStats();
+            UtilStats stats = new UtilStats("KafkaStructuredSpark");
 
+            // !!! number of kafka partitions must not exceed number of cores
             SparkConf sparkConf = new SparkConf()
                     .setMaster("local[*]")
                     .set("spark.streaming.stopGracefullyOnShutdown", "true")
-                    .set("spark.ui.showConsoleProgress", "false")
-                    .set("spark.eventLog.enabled", "false")
-                    .set("spark.streaming.backpressure.enabled", "true")
                     .setAppName("kafkaStructured");
 
             spark = new SparkSession(new SparkContext(sparkConf));
 
-            Dataset<Tuple2<String, Timestamp>> line = spark.readStream().format("kafka")
+            Dataset<Tuple2<String, Timestamp>> source = spark.readStream()
+                    .format("kafka")
                     .option("kafka.bootstrap.servers", brokers)
                     .option("subscribe", topic)
                     .option("max.poll.records", maxPollRecords)
                     .option("startingOffsets", "latest")
-                    .load()
+                     .load()
                     .select("value", "timestamp")
                     .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
 
-            Dataset<Person> json = line.map((MapFunction<Tuple2<String, Timestamp>, Person>) t -> {
+            Dataset<Person> json = source.map((MapFunction<Tuple2<String, Timestamp>, Person>) t -> {
                 LocalDateTime now = LocalDateTime.now();
 
                 LocalDateTime parsedDate = t._2.toLocalDateTime();
@@ -78,7 +77,7 @@ public class KafkaStructuredSpark {
 
             query = json.writeStream()
                     .outputMode(OutputMode.Append())
-                    .format("memory").queryName("test")
+                    .format("memory").queryName("structured")
                     .trigger(Trigger.ProcessingTime(interval, TimeUnit.MILLISECONDS))
                     .start();
 
@@ -86,6 +85,7 @@ public class KafkaStructuredSpark {
 
         } catch (Exception e) {
             logger.error("Error " + e.getMessage());
+            assert query != null;
             if (query.isActive()) {
                 query.stop();
                 spark.close();
diff --git a/src/main/java/fr/ippon/kafkaLatency/utils/Config.java b/src/main/java/fr/ippon/kafkaLatency/utils/Config.java
index 1efe43f8d67443c08a2b9141ebe10937af2ff98d..fee37ab50d8e4fd3049a91f5252ff60273d83e49 100644
--- a/src/main/java/fr/ippon/kafkaLatency/utils/Config.java
+++ b/src/main/java/fr/ippon/kafkaLatency/utils/Config.java
@@ -1,16 +1,16 @@
 package fr.ippon.kafkaLatency.utils;
 
-import org.apache.commons.beanutils.PropertyUtilsBean;
+import fr.ippon.kafkaLatency.streaming.KafkaContinuousSpark;
 import org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
 import org.apache.commons.configuration2.builder.fluent.Parameters;
 import org.apache.commons.configuration2.ex.ConfigurationException;
-
-import java.beans.IntrospectionException;
+import org.apache.log4j.Logger;
 
 public class Config {
 
-    private static PropertiesConfiguration config;
+    private static final PropertiesConfiguration config;
+    private final static Logger logger = Logger.getLogger(KafkaContinuousSpark.class);
 
     static {
         try {
@@ -23,16 +23,16 @@ public class Config {
             config = builder.getConfiguration();
 
         } catch (ConfigurationException cex) {
-
+            logger.error("Error " + cex.getMessage());
         }
     }
 
-    static public String getProperty(String key) throws IntrospectionException {
-        return (String)config.getProperty(key);
+    static public String getProperty(String key) {
+        return (String) config.getProperty(key);
     }
 
 
-    static public Integer getIntProperty(String key) throws IntrospectionException {
+    static public Integer getIntProperty(String key)  {
         return Integer.valueOf(getProperty(key));
     }
 
diff --git a/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java b/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java
index 63c0d4960dbaa96c39c4556c0381f150aed86c9c..8ceef9e128966c8cac83fe89d0eac6c48273cd88 100644
--- a/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java
+++ b/src/main/java/fr/ippon/kafkaLatency/utils/UtilStats.java
@@ -2,7 +2,6 @@ package fr.ippon.kafkaLatency.utils;
 
 import org.apache.log4j.Logger;
 
-import java.beans.IntrospectionException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -13,48 +12,49 @@ import java.util.LongSummaryStatistics;
  * IPPON 2018.
  */
 public class UtilStats implements Serializable {
-    static List<Long> lStep = Collections.synchronizedList(new ArrayList());
-    static List<Long> lTotal = Collections.synchronizedList(new ArrayList());
 
-    private static long step;
-    private static long total;
+    private static final List<Long> lStep = Collections.synchronizedList(new ArrayList());
+    private static final List<Long> lTotal = Collections.synchronizedList(new ArrayList());
+
+    private final long step;
+    private long total;
+    private String benchName;
 
     private final static Logger logger = Logger.getLogger(UtilStats.class);
 
-    static {
-        try {
-            step = Integer.valueOf(Config.getProperty("stats.step"));
-            total = Integer.valueOf(Config.getProperty("stats.total"));
-        } catch (IntrospectionException e) {
-            logger.error("Error " + e.getMessage());
-        }
+    public UtilStats(String benchName) {
 
+        step = Integer.valueOf(Config.getProperty("stats.step"));
+        total = Integer.valueOf(Config.getProperty("stats.total"));
+
+        this.benchName = benchName;
     }
 
-    public static void treatStats(long millis) {
-        synchronized (lTotal) {
-            synchronized (lStep) {
-                lStep.add(millis);
+    public void treatStats(long millis) {
+        synchronized (lStep) {
+            lStep.add(millis);
 
-                if (lStep.size() > (step - 1)) {
-                    LongSummaryStatistics stats = lStep.stream()
-                            .mapToLong((x) -> x)
-                            .summaryStatistics();
+            if (lStep.size() > (step - 1)) {
+                LongSummaryStatistics stats = lStep.parallelStream()
+                        .mapToLong((x) -> x)
+                        .summaryStatistics();
 
-                    logger.info("Stats : " + stats);
-                    lStep.clear();
-                }
+                logger.info("Stats (" + benchName + ") : " + stats);
+                lStep.clear();
             }
+
             lTotal.add(millis);
             if (lTotal.size() > (total - 1)) {
-                LongSummaryStatistics stats = lTotal.stream()
+                LongSummaryStatistics stats = lTotal.parallelStream()
                         .mapToLong((x) -> x)
                         .summaryStatistics();
 
-                logger.info("Stats total: " + stats);
+                logger.info("Stats total (" + benchName + ") : " + stats);
+
                 lTotal.clear();
             }
 
         }
     }
+
 }
diff --git a/src/main/resources/config.properties b/src/main/resources/config.properties
index 6169112abe307c52c1b7ffd95514793c98c6e530..36d058c0a09ce9b708f1b2bd75ab1514f5ceb122 100644
--- a/src/main/resources/config.properties
+++ b/src/main/resources/config.properties
@@ -1,13 +1,10 @@
 topic=latency-tests
 bootstrap.servers=localhost:9092
-#bootstrap.servers=192.168.99.100:32400
-numPartitions=1
-#numPartitions=3
+numPartitions=3
 replication=1
-#replication=3
-stats.step=1000
+stats.step=10000
 stats.total=100000
 max.poll.records=1000
 spark.continuous.interval=300
-spark.structured.interval=150
+spark.structured.interval=200
 spark.streaming.interval=50
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
index 1d493b1c0284c5d7bcced299cd2de553847ba386..ba331cbfd64f8d8716e709149e70ac711627ad67 100644
--- a/src/main/resources/log4j.xml
+++ b/src/main/resources/log4j.xml
@@ -8,22 +8,22 @@
 	<appender name="console" class="org.apache.log4j.ConsoleAppender">
 		<param name="Target" value="System.out"/>
 		<layout class="org.apache.log4j.PatternLayout">
-			<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %t [%-5p] %c{2} - %m%n"/>
+			<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %t [%-5p] - %m%n"/>
 		</layout>
 	</appender>
 
 	<appender name="file" class="org.apache.log4j.RollingFileAppender">
 		<param name="append" value="true"/>
 		<param name="file" value="./kafka-bench.log"/>
-		<param name="maxBackupIndex" value="10"/>
-		<param name="maxFileSize" value="10MB"/>
+		<param name="maxBackupIndex" value="6"/>
+		<param name="maxFileSize" value="2MB"/>
 		<layout class="org.apache.log4j.PatternLayout">
-			<param name="ConversionPattern" value="[%-5p] - %d{dd/MM/yyyy-HH:mm:ss} - %c{2} - %m%n"/>
+			<param name="ConversionPattern" value="[%-5p] - %d{dd/MM/yyyy-HH:mm:ss} - %m%n"/>
 		</layout>
 	</appender>
 	<appender name="async_file" class="org.apache.log4j.AsyncAppender">
-		<param name="BufferSize" value="128"/>
-		<param name="Blocking" value="false"/>
+		<param name="BufferSize" value="60"/>
+		<param name="Blocking" value="true"/>
 		<appender-ref ref="file"/>
 	</appender>
 
@@ -38,12 +38,22 @@
 		<appender-ref ref="console"/>
 		<appender-ref ref="async_file"/>
 	</logger>
+    <logger name="org.apache.spark" additivity="false">
+        <level value="warn"/>
+        <appender-ref ref="console"/>
+        <appender-ref ref="async_file"/>
+    </logger>
 	<logger name="org.apache.spark.sql.kafka010" additivity="false">
 		<level value="error"/>
 		<appender-ref ref="console"/>
 		<appender-ref ref="async_file"/>
-
 	</logger>
+    <logger name="fr.ippon.kafkaLatency" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="console"/>
+        <appender-ref ref="async_file"/>
+    </logger>
+
 	<logger name="org.apache.commons.beanutils" additivity="false">
 		<level value="error"/>
 		<appender-ref ref="console"/>
diff --git a/target/classes/config.properties b/target/classes/config.properties
index 95908d605f5d3772c02df6814bc3fc4e6e28941c..36d058c0a09ce9b708f1b2bd75ab1514f5ceb122 100644
--- a/target/classes/config.properties
+++ b/target/classes/config.properties
@@ -1,11 +1,10 @@
-topic=realtime.persons
+topic=latency-tests
 bootstrap.servers=localhost:9092
-#bootstrap.servers=192.168.99.100:32400
-numPartitions=1
+numPartitions=3
 replication=1
-stats.step=1000
+stats.step=10000
 stats.total=100000
 max.poll.records=1000
 spark.continuous.interval=300
-spark.structured.interval=150
+spark.structured.interval=200
 spark.streaming.interval=50
diff --git a/target/classes/fr/ippon/kafkaLatency/AlimKafka$1.class b/target/classes/fr/ippon/kafkaLatency/AlimKafka$1.class
deleted file mode 100644
index 7bbe62deab2c5288a0aad9c26592d01782042363..0000000000000000000000000000000000000000
Binary files a/target/classes/fr/ippon/kafkaLatency/AlimKafka$1.class and /dev/null differ
diff --git a/target/classes/fr/ippon/kafkaLatency/AlimKafka.class b/target/classes/fr/ippon/kafkaLatency/AlimKafka.class
deleted file mode 100644
index aeecd2848f161a9a90a0d1b0e3a08ddba224f57a..0000000000000000000000000000000000000000
Binary files a/target/classes/fr/ippon/kafkaLatency/AlimKafka.class and /dev/null differ
diff --git a/target/classes/fr/ippon/kafkaLatency/AlimKafkaOptim.class b/target/classes/fr/ippon/kafkaLatency/AlimKafkaOptim.class
deleted file mode 100644
index 1439273dea8f00455c74de0177224f66c1cc129b..0000000000000000000000000000000000000000
Binary files a/target/classes/fr/ippon/kafkaLatency/AlimKafkaOptim.class and /dev/null differ
diff --git a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.class b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.class
index b2464663c15ae3bbb221f3f782ace675c11788cf..a6317b7995d5b7d09db197b57ad90b88e4d9a09c 100644
Binary files a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.class and b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaContinuousSpark.class differ
diff --git a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink$TimestampsFetcher.class b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink$TimestampsFetcher.class
index 835236c17466f4faa0b2be211e4225ebd1d628e8..a58defc0b73e8ea532ed154f55b8ead11fe34cc0 100644
Binary files a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink$TimestampsFetcher.class and b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink$TimestampsFetcher.class differ
diff --git a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink.class b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink.class
index 6bc17a3b98ce0584b917fced8ef49f79b394e0fb..9b69329ca0b5229070d5ba269e8b682f04dc75c9 100644
Binary files a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink.class and b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaFlink.class differ
diff --git a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaSpark.class b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaSpark.class
deleted file mode 100644
index d052941901b78184b025bf440021bed52eb83f4b..0000000000000000000000000000000000000000
Binary files a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaSpark.class and /dev/null differ
diff --git a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming$1.class b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming$1.class
deleted file mode 100644
index fd737482c754f89b60314bc7a25b2ca7f050cb6f..0000000000000000000000000000000000000000
Binary files a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming$1.class and /dev/null differ
diff --git a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming$MyTimestampExtractor.class b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming$MyTimestampExtractor.class
deleted file mode 100644
index 442061a4ff662de5b36a8e55a9f2a3469920e257..0000000000000000000000000000000000000000
Binary files a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming$MyTimestampExtractor.class and /dev/null differ
diff --git a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming.class b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming.class
deleted file mode 100644
index 14242125efeb12bc2e5e1dd0eb51d5bcdf5be457..0000000000000000000000000000000000000000
Binary files a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStreaming.class and /dev/null differ
diff --git a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.class b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.class
index 190c61aa05a58407bc59461b627469ee90fe92c2..f8bb5b8853e1cf81e60d8805713fe09a3f2a4fbe 100644
Binary files a/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.class and b/target/classes/fr/ippon/kafkaLatency/streaming/KafkaStructuredSpark.class differ
diff --git a/target/classes/fr/ippon/kafkaLatency/utils/Config.class b/target/classes/fr/ippon/kafkaLatency/utils/Config.class
index 7ff3e9bee42d46c890d56db9610ed75f5af7cc30..9c3079bca01c40a5e6318cca263fd81431806462 100644
Binary files a/target/classes/fr/ippon/kafkaLatency/utils/Config.class and b/target/classes/fr/ippon/kafkaLatency/utils/Config.class differ
diff --git a/target/classes/fr/ippon/kafkaLatency/utils/UtilStats.class b/target/classes/fr/ippon/kafkaLatency/utils/UtilStats.class
index 8c0ccf74357c4e3d12b30fd5431088d035f1e4a5..6be66ccbd5daeef0e5b19e9eead1c455acb966e1 100644
Binary files a/target/classes/fr/ippon/kafkaLatency/utils/UtilStats.class and b/target/classes/fr/ippon/kafkaLatency/utils/UtilStats.class differ
diff --git a/target/classes/log4j.xml b/target/classes/log4j.xml
index 1d493b1c0284c5d7bcced299cd2de553847ba386..ba331cbfd64f8d8716e709149e70ac711627ad67 100644
--- a/target/classes/log4j.xml
+++ b/target/classes/log4j.xml
@@ -8,22 +8,22 @@
 	<appender name="console" class="org.apache.log4j.ConsoleAppender">
 		<param name="Target" value="System.out"/>
 		<layout class="org.apache.log4j.PatternLayout">
-			<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %t [%-5p] %c{2} - %m%n"/>
+			<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %t [%-5p] - %m%n"/>
 		</layout>
 	</appender>
 
 	<appender name="file" class="org.apache.log4j.RollingFileAppender">
 		<param name="append" value="true"/>
 		<param name="file" value="./kafka-bench.log"/>
-		<param name="maxBackupIndex" value="10"/>
-		<param name="maxFileSize" value="10MB"/>
+		<param name="maxBackupIndex" value="6"/>
+		<param name="maxFileSize" value="2MB"/>
 		<layout class="org.apache.log4j.PatternLayout">
-			<param name="ConversionPattern" value="[%-5p] - %d{dd/MM/yyyy-HH:mm:ss} - %c{2} - %m%n"/>
+			<param name="ConversionPattern" value="[%-5p] - %d{dd/MM/yyyy-HH:mm:ss} - %m%n"/>
 		</layout>
 	</appender>
 	<appender name="async_file" class="org.apache.log4j.AsyncAppender">
-		<param name="BufferSize" value="128"/>
-		<param name="Blocking" value="false"/>
+		<param name="BufferSize" value="60"/>
+		<param name="Blocking" value="true"/>
 		<appender-ref ref="file"/>
 	</appender>
 
@@ -38,12 +38,22 @@
 		<appender-ref ref="console"/>
 		<appender-ref ref="async_file"/>
 	</logger>
+    <logger name="org.apache.spark" additivity="false">
+        <level value="warn"/>
+        <appender-ref ref="console"/>
+        <appender-ref ref="async_file"/>
+    </logger>
 	<logger name="org.apache.spark.sql.kafka010" additivity="false">
 		<level value="error"/>
 		<appender-ref ref="console"/>
 		<appender-ref ref="async_file"/>
-
 	</logger>
+    <logger name="fr.ippon.kafkaLatency" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="console"/>
+        <appender-ref ref="async_file"/>
+    </logger>
+
 	<logger name="org.apache.commons.beanutils" additivity="false">
 		<level value="error"/>
 		<appender-ref ref="console"/>
diff --git a/target/test-classes/LatencyTest.class b/target/test-classes/LatencyTest.class
deleted file mode 100644
index 0d04f7fa1ec9af1fc5e9c4c977065b33607b54de..0000000000000000000000000000000000000000
Binary files a/target/test-classes/LatencyTest.class and /dev/null differ