Skip to content
Snippets Groups Projects
KafkaStructuredStreamingDriver.scala 1004 B
Newer Older
package com.ippon.dojo.driver

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.sql.SparkSession

/**
 * Spark Structured Streaming job that reads messages from a Kafka topic,
 * splits each message on commas and prints a running count of every token
 * to the console.
 *
 * Usage: `KafkaStructuredStreamingDriver [bootstrapServers] [topic]`
 *
 * Both arguments are optional; any argument that is missing (or empty) falls
 * back to [[KafkaStructuredStreamingDriver.DefaultBootstrapServers]] /
 * [[KafkaStructuredStreamingDriver.DefaultTopic]].
 */
object KafkaStructuredStreamingDriver {

  /** Kafka bootstrap servers used when none is given on the command line. */
  val DefaultBootstrapServers: String = "ec2-54-174-179-236.compute-1.amazonaws.com:9092"

  /** Kafka topic subscribed to when none is given on the command line. */
  val DefaultTopic: String = "dojo"

  /**
   * Resolves the Kafka connection settings from the command-line arguments.
   *
   * @param args raw program arguments; `args(0)` is the bootstrap servers list,
   *             `args(1)` the topic. Missing or empty entries use the defaults.
   * @return the `(bootstrapServers, topic)` pair to connect with
   */
  private[driver] def resolveKafkaSettings(args: Array[String]): (String, String) = {
    val bootstrapServers = args.lift(0).filter(_.nonEmpty).getOrElse(DefaultBootstrapServers)
    val topic = args.lift(1).filter(_.nonEmpty).getOrElse(DefaultTopic)
    (bootstrapServers, topic)
  }

  /**
   * Entry point: starts the streaming word count and blocks until the query
   * terminates.
   *
   * @param args optional `[bootstrapServers] [topic]`, see [[resolveKafkaSettings]]
   */
  def main(args: Array[String]): Unit = {
    val (bootstrapServers, topic) = resolveKafkaSettings(args)

    val spark = SparkSession
      .builder
      .appName("KafkaStructuredStreamingDojoApp")
      .getOrCreate()

    import spark.implicits._

    // Keep only the Kafka record payload, decoded as a String.
    val complaints = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Split every message on commas. flatMap over a Dataset[String] yields a
    // Dataset[String] whose single column is named "value", which is the
    // column groupBy refers to.
    val wordCounts = complaints.flatMap(_.split(",")).groupBy("value").count()

    // "complete" mode re-emits the whole aggregated table on every trigger,
    // so the console always shows the full running counts.
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}