From 6c35165447e022f527901ef116ff619737507574 Mon Sep 17 00:00:00 2001 From: Jeannine Tondreau <jtondreau@ipponusa.com> Date: Fri, 3 Mar 2017 10:27:42 -0500 Subject: [PATCH] pushing initial kafka streaming code --- .gitignore | 3 + README.md | 1 + dependency-reduced-pom.xml | 50 +++++++++++++ pom.xml | 73 +++++++++++++++++++ .../dojo/driver/KafkaStreamingDriver.scala | 49 +++++++++++++ .../KafkaStructuredStreamingDriver.scala | 39 ++++++++++ 6 files changed, 215 insertions(+) create mode 100644 .gitignore create mode 100644 dependency-reduced-pom.xml create mode 100644 pom.xml create mode 100644 src/main/scala/com/ippon/dojo/driver/KafkaStreamingDriver.scala create mode 100644 src/main/scala/com/ippon/dojo/driver/KafkaStructuredStreamingDriver.scala diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d0ae19d --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +./project +target +./classpath diff --git a/README.md b/README.md index e69de29..abea153 100644 --- a/README.md +++ b/README.md @@ -0,0 +1 @@ +Kafka Streaming Examples \ No newline at end of file diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml new file mode 100644 index 0000000..a969229 --- /dev/null +++ b/dependency-reduced-pom.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>com.ippon.dojo</groupId> + <artifactId>KafkaStreaming</artifactId> + <version>0.0.1-SNAPSHOT</version> + <build> + <sourceDirectory>src/main/scala</sourceDirectory> + <plugins> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + 
<exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.1</version> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..129cd52 --- /dev/null +++ b/pom.xml @@ -0,0 +1,73 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>com.ippon.dojo</groupId> + <artifactId>KafkaStreaming</artifactId> + <version>0.0.1-SNAPSHOT</version> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.11</artifactId> + <version>2.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> + <version>2.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql-kafka-0-10_2.11</artifactId> + <version>2.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.11</artifactId> + <version>2.1.0</version> + </dependency> + </dependencies> + <build> + <sourceDirectory>src/main/scala</sourceDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + 
package com.ippon.dojo.driver

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

/**
 * Spark Streaming (DStream) driver that consumes the "dojo" Kafka topic and
 * prints the number of records received in each 1-second micro-batch.
 *
 * Uses the Kafka 0.10 direct-stream integration (new consumer API).
 * Fixes vs. original: removed the duplicate `SparkConf` import and the unused
 * legacy 0.8-API `kafka.serializer.StringDecoder` import (it mixes old and new
 * consumer APIs and is not on the classpath of the declared 0-10 dependencies),
 * and reuses `kafkaTopic` instead of duplicating the "dojo" literal.
 */
object KafkaStreamingDriver {

  def main(args: Array[String]): Unit = {
    // One SparkContext per JVM; the 1-second interval drives micro-batching.
    val sparkConf = new SparkConf().setAppName("KafkaStreamingDojoApp")
    val sparkContext = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sparkContext, Seconds(1))

    val kafkaTopic = "dojo"

    // Kafka consumer configuration (0.10 consumer API).
    // enable.auto.commit=false: offsets are tracked by Spark, not the consumer,
    // so records are not marked consumed before the batch is processed.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "ec2-54-174-179-236.compute-1.amazonaws.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "dojo",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // Was Array("dojo") — now reuses the val so the topic is defined once.
    val topics = Array(kafkaTopic)
    val complaints = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    // Count the records of every micro-batch and report it with the batch time.
    complaints.foreachRDD { (rdd, time) =>
      val count = rdd.count()
      println(s"$count complaints were collected at $time")
    }

    // Start the streaming job and block until it is stopped or fails.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
package com.ippon.dojo.driver

import org.apache.spark.sql.SparkSession

/**
 * Structured Streaming driver: reads the "dojo" Kafka topic, splits each
 * message on commas, and maintains a running count per token, printed to the
 * console after every trigger.
 *
 * Fix vs. original: removed four unused imports (`SparkConf`, `SparkContext`,
 * `StreamingContext`, `Seconds`) left over from the DStream example — the
 * Structured Streaming API only needs `SparkSession`.
 */
object KafkaStructuredStreamingDriver {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("KafkaStructuredStreamingDojoApp")
      .getOrCreate()

    // Brings in the encoders needed by `.as[String]` and `flatMap` below.
    import spark.implicits._

    // The Kafka source exposes a binary `value` column; cast it to String
    // so the payload can be treated as text.
    val complaints = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "ec2-54-174-179-236.compute-1.amazonaws.com:9092")
      .option("subscribe", "dojo")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Split each message on commas and count occurrences of every token.
    // After flatMap the single column is still named "value", hence groupBy("value").
    val wordCounts = complaints.flatMap(_.split(",")).groupBy("value").count()

    // "complete" output mode re-emits the full counts table on each trigger,
    // which is required for streaming aggregations written to the console sink.
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // Block until the streaming query is stopped or fails.
    query.awaitTermination()
  }
}