package com.ippon.dojo.driver

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.sql.SparkSession

/** Spark Structured Streaming driver that reads complaint records from a Kafka topic,
  * splits each record on commas, and prints a running count of every distinct field
  * value to the console (complete output mode).
  *
  * Usage: `KafkaStructuredStreamingDriver [bootstrapServers] [topic]`. Both arguments are
  * optional and fall back to the defaults below.
  */
object KafkaStructuredStreamingDriver {

  /** Kafka bootstrap servers used when none is passed as the first program argument. */
  private val DefaultBootstrapServers = "ec2-54-174-179-236.compute-1.amazonaws.com:9092"

  /** Kafka topic subscribed to when none is passed as the second program argument. */
  private val DefaultTopic = "dojo"

  /** Splits one complaint record into its comma-separated fields.
    *
    * A Kafka record may have a null value. `CAST(value AS STRING)` then produces null,
    * and calling `split` on it would throw and kill the streaming query, so such a
    * record contributes no fields.
    *
    * @param record the record value as a string, possibly null
    * @return the fields of the record, or an empty sequence for a null record
    */
  private[driver] def splitComplaint(record: String): Seq[String] =
    Option(record).toSeq.flatMap(_.split(","))

  /** Starts the streaming word-count query and blocks until it terminates.
    *
    * @param args optional `bootstrapServers` (index 0) and `topic` (index 1)
    */
  def main(args: Array[String]): Unit = {
    val bootstrapServers = args.lift(0).getOrElse(DefaultBootstrapServers)
    val topic = args.lift(1).getOrElse(DefaultTopic)

    val spark = SparkSession
      .builder
      .appName("KafkaStructuredStreamingDojoApp")
      .getOrCreate()

    import spark.implicits._

    try {
      val complaints = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topic)
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as[String]

      // A flatMap over Dataset[String] yields a single column named "value", hence the groupBy key.
      val wordCounts = complaints
        .flatMap(record => splitComplaint(record))
        .groupBy("value")
        .count()

      val query = wordCounts.writeStream
        .outputMode("complete")
        .format("console")
        .start()

      query.awaitTermination()
    } finally {
      // Release cluster resources even if the query fails.
      spark.stop()
    }
  }
}