q2_rdd.py
import sys
import time

from pyspark.sql import SparkSession

def TimeSegments(hour):
    # 'hour' is the time of occurrence in HHMM format (e.g. 1345 means 13:45)
    if 500 <= hour <= 1159:
        return "Morning"
    elif 1200 <= hour <= 1659:
        return "Afternoon"
    elif 1700 <= hour <= 2059:
        return "Evening"
    else:
        return "Night"
# Input data lives on HDFS; all printed output is redirected to a local text file
path = "hdfs://master:9000/user/user/"
sys.stdout = open("outputs/Query2RDD.txt", "w")

spark = SparkSession.builder.appName("Query2RDD").config("spark.executor.instances", "4").getOrCreate()
CrimeData = spark.read.csv(path + "CrimeData.csv", header=True, inferSchema=True)
startTime = time.time()

# Filter the data for crimes that occurred on the "STREET", categorize and group the crimes
# by time of day, and order the result in descending order of count.
# In the RDD version, we use map and reduceByKey to achieve the same result as the
# DataFrame version (a sketch of that version is given after this script).
CrimeDataRDD = CrimeData.rdd.filter(lambda x: x['Premis Desc'] == "STREET")
Result = CrimeDataRDD.map(lambda row: (TimeSegments(int(row['TIME OCC'])), 1)) \
                     .reduceByKey(lambda a, b: a + b) \
                     .sortBy(lambda x: x[1], ascending=False)
print("Query 2 RDD Execution Time: " + str(time.time() - startTime) + "\n")
print("===== Query 2 RDD Result =====")
Result.toDF(["Time of Day","CrimeCount"]).show(Result.count(), truncate=False)
sys.stdout.close()
sys.stdout = sys.__stdout__
spark.stop()
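
The comment in the script refers to a DataFrame version of the same query. That file is not included here; the following is a minimal sketch of what it might look like, assuming the same CrimeData DataFrame and the column names used above ("Premis Desc", "TIME OCC"). It is an illustrative counterpart, not the actual q2_dataframe.py.

from pyspark.sql import functions as F

# Hypothetical DataFrame-API counterpart (column and file names assumed from the RDD script)
time_segment = (
    F.when((F.col("TIME OCC") >= 500) & (F.col("TIME OCC") <= 1159), "Morning")
     .when((F.col("TIME OCC") >= 1200) & (F.col("TIME OCC") <= 1659), "Afternoon")
     .when((F.col("TIME OCC") >= 1700) & (F.col("TIME OCC") <= 2059), "Evening")
     .otherwise("Night")
)

ResultDF = (
    CrimeData.filter(F.col("Premis Desc") == "STREET")
             .withColumn("Time of Day", time_segment)
             .groupBy("Time of Day")
             .count()
             .withColumnRenamed("count", "CrimeCount")
             .orderBy(F.col("CrimeCount").desc())
)
ResultDF.show(truncate=False)

The DataFrame version expresses the same filter/categorize/group/sort pipeline declaratively, letting Catalyst optimize the plan, whereas the RDD version spells out the aggregation explicitly with map and reduceByKey.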