Forked from
sparkdojo / KafkaStreaming
1 commit behind the upstream repository.
-
Jeannine Tondreau authored
KafkaStructuredStreamingDriver.scala 1004 B
package com.ippon.dojo.driver
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.sql.SparkSession
object KafkaStructuredStreamingDriver {

  /** Kafka bootstrap servers used when none are passed as the first argument. */
  private val DefaultBootstrapServers = "ec2-54-174-179-236.compute-1.amazonaws.com:9092"

  /** Kafka topic subscribed to when none is passed as the second argument. */
  private val DefaultTopic = "dojo"

  /**
   * Streams complaint records from a Kafka topic, splits each record on commas,
   * and prints a running count per token to the console.
   *
   * @param args optional positional arguments:
   *             `args(0)` = Kafka bootstrap servers (defaults to `DefaultBootstrapServers`),
   *             `args(1)` = topic to subscribe to (defaults to `DefaultTopic`).
   *             With no arguments the driver behaves as it always did.
   */
  def main(args: Array[String]): Unit = {
    val bootstrapServers = args.lift(0).getOrElse(DefaultBootstrapServers)
    val topic = args.lift(1).getOrElse(DefaultTopic)

    val spark = SparkSession
      .builder
      .appName("KafkaStructuredStreamingDojoApp")
      .getOrCreate()

    import spark.implicits._

    val complaints = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      // Kafka record values are nullable (e.g. tombstones). Splitting a null
      // string below would throw and fail the whole streaming query.
      .where("value IS NOT NULL")
      .as[String]

    // Split each complaint on commas. A Dataset[String] exposes its single
    // column as "value", which is what groupBy refers to.
    val wordCounts = complaints.flatMap(_.split(",")).groupBy("value").count()

    // "complete" output mode re-emits the whole aggregated table on every
    // trigger; it is required here because the query is an aggregation
    // written to the console sink without a watermark.
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // Block the driver until the streaming query stops or fails.
    query.awaitTermination()
  }
}