Commit e1d33c9

Initial Commit
1 parent 6aff376 commit e1d33c9

File tree

3 files changed: +111 -0 lines changed

Diff for: 19-OuterJoinDemo/build.sbt

+15
@@ -0,0 +1,15 @@
name := "OuterJoinDemo"
organization := "guru.learningjournal"
version := "0.1"
scalaVersion := "2.12.10"

autoScalaLibrary := false
val sparkVersion = "3.0.0"

val sparkDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion
)

libraryDependencies ++= sparkDependencies
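
An aside, not part of the commit: the %% operator appends the project's Scala binary version to the artifact name, so each dependency above is shorthand for the explicit single-% form:

// Equivalent explicit form of the spark-sql dependency above;
// "%%" resolves to the "_2.12" suffix for scalaVersion 2.12.10.
"org.apache.spark" % "spark-sql_2.12" % sparkVersion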

Diff for: 19-OuterJoinDemo/log4j.properties

+40
@@ -0,0 +1,40 @@
# Set everything to be logged to the console
log4j.rootCategory=WARN, console

# define console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# application log
log4j.logger.guru.learningjournal.spark.examples=INFO, console, file
log4j.additivity.guru.learningjournal.spark.examples=false

# define rolling file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log
# define the following as Java system properties:
# -Dlog4j.configuration=file:log4j.properties
# -Dlogfile.name=hello-spark
# -Dspark.yarn.app.container.log.dir=app-logs
log4j.appender.file.ImmediateFlush=true
log4j.appender.file.Append=false
log4j.appender.file.MaxFileSize=500MB
log4j.appender.file.MaxBackupIndex=2
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n


# Recommendations from Spark template
log4j.logger.org.apache.hadoop=ERROR
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
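
The file appender builds its output path from the JVM system properties listed in the comments above. A minimal launcher sketch for running the demo from an IDE (hypothetical, not in the commit; LocalLogSetup is an invented name) that sets those properties before the first log4j call:

package guru.learningjournal.spark.examples

// Hypothetical helper, not part of the commit: set the properties the log4j
// config expects, then delegate to the real entry point. This works because
// OuterJoinDemo's logger is lazy, so log4j has not initialized yet.
object LocalLogSetup {
  def main(args: Array[String]): Unit = {
    System.setProperty("log4j.configuration", "file:log4j.properties")
    System.setProperty("logfile.name", "hello-spark")
    System.setProperty("spark.yarn.app.container.log.dir", "app-logs")
    OuterJoinDemo.main(args)
  }
}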

Diff for: 19-OuterJoinDemo/src/main/scala/guru/learningjournal/spark/examples/OuterJoinDemo.scala

+56
@@ -0,0 +1,56 @@
package guru.learningjournal.spark.examples

import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object OuterJoinDemo extends Serializable {
  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Spark Join Demo")
      .master("local[3]")
      .getOrCreate()

    // Orders: (order_id, prod_id, unit_price, qty)
    val ordersList = List(
      ("01", "02", 350, 1),
      ("01", "04", 580, 1),
      ("01", "07", 320, 2),
      ("02", "03", 450, 1),
      ("02", "06", 220, 1),
      ("03", "01", 195, 1),
      ("04", "09", 270, 3),
      ("04", "08", 410, 2),
      ("05", "02", 350, 1)
    )
    val orderDF = spark.createDataFrame(ordersList).toDF("order_id", "prod_id", "unit_price", "qty")

    // Products: (prod_id, prod_name, list_price, qty); note there is no product "09"
    val productList = List(
      ("01", "Scroll Mouse", 250, 20),
      ("02", "Optical Mouse", 350, 20),
      ("03", "Wireless Mouse", 450, 50),
      ("04", "Wireless Keyboard", 580, 50),
      ("05", "Standard Keyboard", 360, 10),
      ("06", "16 GB Flash Storage", 240, 100),
      ("07", "32 GB Flash Storage", 320, 50),
      ("08", "64 GB Flash Storage", 430, 25)
    )
    val productDF = spark.createDataFrame(productList).toDF("prod_id", "prod_name", "list_price", "qty")

    val joinExpr = orderDF.col("prod_id") === productDF.col("prod_id")
    val joinType = "left"

    // Rename the ambiguous qty column on the product side before joining
    val productRenamedDF = productDF.withColumnRenamed("qty", "reorder_qty")

    orderDF.join(productRenamedDF, joinExpr, joinType)
      .drop(productDF.col("prod_id"))
      .select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty")
      .withColumn("prod_name", expr("coalesce(prod_name, prod_id)"))
      .withColumn("list_price", expr("coalesce(list_price, unit_price)"))
      .sort("order_id")
      .show()
  }

}
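
Because joinType is "left", every order row survives the join; order "04" references product "09", which has no entry in productList, so its prod_name and list_price come back null, and the two coalesce expressions substitute prod_id and unit_price respectively. As a variant sketch (not part of the commit), joining on a column-name sequence makes Spark keep a single prod_id column, so the explicit drop becomes unnecessary:

// Hypothetical alternative, not in the commit: a USING-style join on the
// column name deduplicates prod_id, removing the ambiguous-column drop.
val joinedDF = orderDF.join(productRenamedDF, Seq("prod_id"), "left")
  .withColumn("prod_name", expr("coalesce(prod_name, prod_id)"))
  .withColumn("list_price", expr("coalesce(list_price, unit_price)"))
  .sort("order_id")
joinedDF.show()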
