setup functions and libs
WarFox committed Mar 2, 2024
1 parent 9b3da3a commit 3b4de0b
Showing 5 changed files with 186 additions and 12 deletions.
59 changes: 59 additions & 0 deletions README.md
Sandbox project for playing around with Spark

PRs welcome!

- TODO: Transactional topic with no compaction (consecutive offsets)
- TODO: Transactional topic with compaction (non-consecutive offsets)

## Build

## Kafka Avro data

Structured Streaming on Databricks / Confluent Platform provides a `from_avro`
function that accepts a Confluent Schema Registry URL as a parameter and
handles the Confluent Avro wire format automatically.

https://docs.databricks.com/structured-streaming/avro-dataframe.html
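
Outside Databricks, the ABRiS library (`za.co.absa:abris`, added in build.sbt below) offers a similar `from_avro`. A minimal sketch, assuming a local Schema Registry, a `pageviews` topic, and a `kafkaDf` DataFrame already read from Kafka:

```scala
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

// Reader schema is fetched from the registry; topic-name strategy => subject "pageviews-value"
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("pageviews")
  .usingSchemaRegistry("http://localhost:8081")

val decoded = kafkaDf.select(from_avro(col("value"), abrisConfig) as "data")
```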

## Submit

```sh
spark-submit --master local[*] \
--jars <comma-separated-jars> \
--class <main-class> \
<application-jar> \
[application-arguments]
```

```sh
sbt package && \
spark-submit --packages \
org.apache.spark:spark-sql_2.12:3.3.1,\
org.apache.spark:spark-streaming_2.12:3.3.1,\
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 \
--class com.github.warfox.sparksandbox.StructuredStreamingKafkaExample \
--name StructuredStreamingKafkaExample target/scala-2.12/spark-sandbox_2.12-0.0.1.jar
```


For jobs that read from or write to S3, append the Hadoop AWS packages to the `--packages` list and, if needed, point Spark at a custom Ivy settings file:

```sh
org.apache.hadoop:hadoop-common:3.3.4,\
org.apache.hadoop:hadoop-aws:3.3.4,\
com.fasterxml.jackson.core:jackson-databind:2.12.7 \
--conf spark.jars.ivySettings=./ivysettings.xml \
```
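
For illustration, a sketch of how those extras might be folded into the earlier command (the local `ivysettings.xml` path is an assumption about your setup):

```sh
sbt package && \
spark-submit --packages \
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,\
org.apache.hadoop:hadoop-common:3.3.4,\
org.apache.hadoop:hadoop-aws:3.3.4,\
com.fasterxml.jackson.core:jackson-databind:2.12.7 \
--conf spark.jars.ivySettings=./ivysettings.xml \
--class com.github.warfox.sparksandbox.StructuredStreamingKafkaExample \
--name StructuredStreamingKafkaExample target/scala-2.12/spark-sandbox_2.12-0.0.1.jar
```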

## References ##

- https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-libraries.html
- https://spark.apache.org/docs/latest/quick-start.html
- https://bzhangusc.wordpress.com/2015/11/20/use-sbt-console-as-spark-shell/
- https://github.com/MrPowers/spark-examples/blob/master/src/main/scala/com/github/mrpowers/spark/examples/SparkSummit.scala
- https://sparkbyexamples.com/spark/different-ways-to-create-a-spark-dataframe/

```sh
export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -Xmx2G -Xms1G"
```


* How to set the batch size? Time? (see the trigger sketch at the end of this section)

* Using Spark with Schema Registry:
https://github.com/xebia-france/spark-structured-streaming-blog/blob/master/src/main/scala/AvroConsumer.scala

https://datachef.co/blog/deserialzing-confluent-avro-record-kafka-spark/

https://www.dremio.com/blog/streaming-data-into-apache-iceberg-tables-using-aws-kinesis-and-aws-glue/
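
Re the batch-size question above: in Structured Streaming the micro-batch cadence is set by the trigger, and for the Kafka source the per-batch volume can be capped with `maxOffsetsPerTrigger`. A minimal sketch (topic, brokers and the console sink are illustrative; assumes a `spark` session is in scope):

```scala
import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "pageview")
  .option("maxOffsetsPerTrigger", "1000") // cap records pulled per micro-batch
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds")) // start a micro-batch every 10 seconds
  .start()
```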
52 changes: 42 additions & 10 deletions build.sbt
name := "spark-sandbox"

version := "0.0.1"

scalaVersion := "2.13.6"
scalaVersion := "2.12.15"

val sparkVersion = "3.3.1"
val hadoopVersion = "3.3.4"
val confluentVersion = "7.3.1"

resolvers += "Confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion, "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion, "provided",
"org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion,
/** Provided Section **/

// Spark libraries with the same version MUST be available in the cluster where the jobs run
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion % "provided",
"org.apache.spark" %% "spark-avro" % sparkVersion % "provided" ,

"org.apache.hudi" %% "hudi-spark3.3-bundle" % "0.12.1" % "provided",

// Hadoop libraries with the same version MUST be available in the cluster where the jobs run
"org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided",

/** End of Provided Section - libraries in the provided section are not included in the assembly jar **/

// third-party library for using Confluent Schema Registry with Spark
"za.co.absa" % "abris_2.12" % "6.3.0",
"io.confluent" % "kafka-schema-registry-client" % confluentVersion excludeAll(
ExclusionRule(organization = "com.fasterxml.jackson.module", name = "jackson-module-scala")
),
"io.confluent" % "kafka-avro-serializer" % confluentVersion excludeAll(
ExclusionRule(organization = "com.fasterxml.jackson.module", name = "jackson-module-scala")
),

"com.github.mrpowers" %% "spark-daria" % "1.2.3",

// jackson-module-scala is required for jackson-databind
Test / fork := true

javaOptions ++= Seq(
"-Xms512M",
"-Xmx2048M",
"-XX:MaxPermSize=2048M",
"-Xms1G",
"-Xmx2G",
"-XX:+CMSClassUnloadingEnabled"
)
// The above is equivalent to: export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -Xmx2G -Xms1G"

// Show runtime of tests
Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oD")

// JAR file settings

ThisBuild / assemblyShadeRules := Seq(
ShadeRule.rename(("com.fasterxml.jackson.**") -> "shadejackson.@1").inAll
)

// don't include Scala in the JAR file
// assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// assembly / assemblyOption in := (assembly / assemblyOption).value.copy(includeScala = false)
// ThisBuild / assemblyOption := assembly / assemblyOption ~= { _.withIncludeScala(false) }
// ThisBuild / assemblyPackageScala := false

// Add the JAR file naming conventions described here: https://github.com/MrPowers/spark-style-guide#jar-files
// You can add the JAR file naming conventions by running the shell script

// When you run sbt console - spark, sc and sqlContext will be ready for you!
console / initialCommands := s"""
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
.master("local[*]")
.appName("shell")
.config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
.config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") // only needed for dealing with public S3 buckets
.getOrCreate()

// to use the default provider credential chain, set "com.amazonaws.auth.DefaultAWSCredentialsProviderChain" instead
6 changes: 6 additions & 0 deletions project/plugins.sbt
logLevel := Level.Warn

addDependencyTreePlugin

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0")

addSbtPlugin("com.github.sbt" % "sbt-avro" % "3.4.2")

libraryDependencies += "org.apache.avro" % "avro-compiler" % "1.11.1"
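
With these plugins in place, the following tasks should be available from the sbt shell (a quick sketch of typical usage):

```sh
sbt assembly        # build the fat jar via sbt-assembly (with the jackson shading from build.sbt)
sbt dependencyTree  # inspect the dependency graph (from addDependencyTreePlugin)
sbt compile         # sbt-avro generates record classes from the Avro schema sources before compiling
```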
src/main/scala/com/github/warfox/sparksandbox/SparkSessionWrapper.scala
package com.github.warfox.sparksandbox

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

trait SparkSessionWrapper extends Serializable {

.getOrCreate()

lazy val sc: SparkContext = spark.sparkContext

}

trait StreamingSessionWrapper extends SparkSessionWrapper {
lazy val ssc = new StreamingContext(sc, Seconds(1)) // DStream StreamingContext with a 1-second batch interval
}
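
For context, a minimal sketch of a job object mixing in the wrapper (the object name and toy data are made up; `isEven` comes from functions.scala below):

```scala
package com.github.warfox.sparksandbox

import org.apache.spark.sql.functions.col

object EvenNumbersJob extends SparkSessionWrapper {
  def main(args: Array[String]): Unit = {
    import spark.implicits._ // spark comes from SparkSessionWrapper

    Seq(1, 2, 3, 4).toDF("n")
      .withColumn("even", functions.isEven(col("n")))
      .show()
  }
}
```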
73 changes: 72 additions & 1 deletion src/main/scala/com/github/warfox/sparksandbox/functions.scala
package com.github.warfox.sparksandbox
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

import org.apache.spark.sql.expressions.UserDefinedFunction

import org.apache.avro.specific.SpecificDatumReader
import java.nio.ByteBuffer

import io.confluent.kafka.schemaregistry.client.{
CachedSchemaRegistryClient,
SchemaRegistryClient
}
import org.apache.kafka.common.errors.SerializationException
import org.apache.avro.io.DecoderFactory

import sandbox.avro.Pageview
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericDatumReader

// Deserializes Confluent wire-format Avro bytes into the record's JSON string form, using the Schema Registry for the writer schema
class AvroDeserializer extends AbstractKafkaAvroDeserializer {
def this(client: SchemaRegistryClient) {
this()
this.schemaRegistry = client
}
override def deserialize(bytes: Array[Byte]): String = {
val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
if (genericRecord == null) {
""
} else {
genericRecord.toString
}
}
}

object functions {

def isEven(col: Column): Column = {
col % 2 === lit(0)
}

// https://datachef.co/blog/deserialzing-confluent-avro-record-kafka-spark//
def deserializeFromConfluentAvro(bytes: Array[Byte]): Pageview = {
val schemaRegistryUrl = Config.schemaRegistryUrl;
val schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)

val buffer: ByteBuffer = ByteBuffer.wrap(bytes)

// The first byte is the magic byte
if (buffer.get != 0)
throw new SerializationException(
"Unknown magic byte!. Expected 0 for Confluent bytes"
)

// The next 4 bytes are the schema id (read as an Int)
val writeSchemaId = buffer.getInt()
val writerSchema = schemaRegistry.getByID(writeSchemaId)

// we want to deserialize with the latest (reader) schema for the subject
val subject = "pageview" + "-value"
val readerSchemaId = schemaRegistry.getLatestSchemaMetadata(subject).getId
val readerSchema = schemaRegistry.getByID(readerSchemaId)

// this logic is quite specific to the Confluent Avro wire format, which stores a schema id (not the schema itself) in each record
val length = buffer.limit() - 1 - 4 // total - magic byte - 4-byte schema id
val start = buffer.position() + buffer.arrayOffset()
val decoder = DecoderFactory.get().binaryDecoder(buffer.array(), start, length, null)

// work with GenericRecord
// val pageViewSchema = Pageview.getClassSchema().toString()
// val datumReader = new GenericDatumReader[GenericRecord](pageViewSchema)

// SpecificDatumReader needs a type
val datumReader = new SpecificDatumReader[Pageview](writerSchema, readerSchema)
datumReader.read(null, decoder)
}

val deserializeAvro: UserDefinedFunction = udf((bytes: Array[Byte]) => {
deserializeFromConfluentAvro(bytes)
})

}
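
And a sketch of wiring the UDF above to a Kafka source (brokers and topic are assumptions; it also assumes Spark can derive a schema for the generated Pageview class):

```scala
import org.apache.spark.sql.functions.col

// assumes a SparkSession named `spark`, e.g. from SparkSessionWrapper
val pageviews = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "pageview")
  .load()
  .select(functions.deserializeAvro(col("value")).as("pageview"))

pageviews.writeStream.format("console").start().awaitTermination()
```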
