Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.ippon.dojo.driver
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.sql.SparkSession
/**
 * Structured Streaming driver that consumes comma-separated records from a
 * Kafka topic and prints a running count of every distinct field value to the
 * console.
 *
 * Usage: `KafkaStructuredStreamingDriver [bootstrapServers] [topic]`
 *
 * Both arguments are optional; when omitted the driver falls back to the
 * broker address and topic it was originally hard-wired to.
 */
object KafkaStructuredStreamingDriver {

  /** Broker list used when no first argument is supplied. */
  private val DefaultBootstrapServers = "ec2-54-174-179-236.compute-1.amazonaws.com:9092"

  /** Topic subscribed to when no second argument is supplied. */
  private val DefaultTopic = "dojo"

  /**
   * Starts the streaming query and blocks until it terminates.
   *
   * @param args optional positional arguments: `args(0)` is the Kafka
   *             bootstrap server list, `args(1)` is the topic to subscribe to
   */
  def main(args: Array[String]): Unit = {
    val bootstrapServers = args.headOption.getOrElse(DefaultBootstrapServers)
    val topic = args.drop(1).headOption.getOrElse(DefaultTopic)

    val spark = SparkSession
      .builder
      .appName("KafkaStructuredStreamingDojoApp")
      .getOrCreate()

    import spark.implicits._

    val complaints = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .load()
      // Kafka records may carry a null value (e.g. tombstones); casting those
      // yields a null String and splitting it would throw inside the task.
      .where("value IS NOT NULL")
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Split each record on commas and count occurrences of every field value.
    val wordCounts = complaints
      .flatMap(_.split(","))
      .groupBy("value")
      .count()

    // "complete" mode re-emits the whole aggregated table on every trigger.
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}