Skip to content
Snippets Groups Projects
Commit 6c351654 authored by Jeannine Tondreau's avatar Jeannine Tondreau
Browse files

pushing initial kafka streaming code

parent 8b0844c3
No related branches found
No related tags found
No related merge requests found
./project
target
./classpath
Kafka Streaming Examples
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ippon.dojo</groupId>
<artifactId>KafkaStreaming</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
pom.xml 0 → 100644
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ippon.dojo</groupId>
<artifactId>KafkaStreaming</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.ippon.dojo.driver
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object KafkaStreamingDriver {
def main(args: Array[String]): Unit = {
//configure the Streaming Context
val sparkConf = new SparkConf().setAppName("KafkaStreamingDojoApp")
val sparkContext = new SparkContext(sparkConf)
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val kafkaTopic = "dojo"
// Create direct kafka stream with brokers and topics
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "ec2-54-174-179-236.compute-1.amazonaws.com:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "dojo",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean))
val topics = Array("dojo")
val complaints = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
//perform action on the stream
complaints.foreachRDD((rdd, time) => {
val count = rdd.count()
System.out.println(count + " complaints were collected at " + time)
})
//start the stream
streamingContext.start()
streamingContext.awaitTermination()
}
}
\ No newline at end of file
package com.ippon.dojo.driver
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.sql.SparkSession
object KafkaStructuredStreamingDriver {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("KafkaStructuredStreamingDojoApp")
.getOrCreate()
import spark.implicits._
val complaints = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "ec2-54-174-179-236.compute-1.amazonaws.com:9092")
.option("subscribe", "dojo")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
// split complaints
val wordCounts = complaints.flatMap(_.split(",")).groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment