-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathq2_sql.py
48 lines (37 loc) · 1.48 KB
/
q2_sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import os
import sys
import time
from contextlib import redirect_stdout

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, desc , sum
# HDFS directory that holds the input CSV files.
path = "hdfs://master:9000/user/user/data/"
# Where the timing line and the result table are written.
output_path = "outputs/Query2SQL.txt"

# Create a Spark session
spark = (
    SparkSession.builder.appName("Query2SQL")
    .config("spark.executor.instances", "4")
    .getOrCreate()
)

# Filter the data for crimes that occurred on the "STREET" and label each one
# with a time-of-day bucket. The bucket bounds treat `TIME OCC` as an HHMM
# integer (e.g. 1159 = 11:59); values outside every bucket get a NULL label.
TimeOfDaySQL = """
SELECT *,
    CASE
        WHEN `TIME OCC` BETWEEN 500 AND 1159 THEN 'Morning'
        WHEN `TIME OCC` BETWEEN 1200 AND 1659 THEN 'Afternoon'
        WHEN `TIME OCC` BETWEEN 1700 AND 2059 THEN 'Evening'
        WHEN `TIME OCC` < 500 OR `TIME OCC` >= 2100 THEN 'Night'
        ELSE NULL
    END AS `Time of Day`
FROM CrimeDataTable
WHERE `Premis Desc` = 'STREET'
"""

# Count crimes per time-of-day bucket, most frequent bucket first.
ResultSQL = """
SELECT `Time of Day`, COUNT(*) AS CrimeCount
FROM ({}) AS StreetCrimes
GROUP BY `Time of Day`
ORDER BY CrimeCount DESC
""".format(TimeOfDaySQL)

# The output directory may not exist yet; open() would otherwise fail.
os.makedirs(os.path.dirname(output_path), exist_ok=True)

try:
    CrimeData = spark.read.csv(path + "CrimeData.csv", header=True, inferSchema=True)

    # redirect_stdout restores sys.stdout (and `with` closes the file) even if
    # the query raises, unlike a bare reassignment of sys.stdout.
    with open(output_path, "w", encoding="utf-8") as report, redirect_stdout(report):
        startTime = time.perf_counter()
        CrimeData.createOrReplaceTempView("CrimeDataTable")
        # spark.sql() only builds a lazy plan. cache() + count() force the
        # query to actually run, so the measured time covers execution.
        # show() below then reuses the cached result instead of recomputing it.
        Result = spark.sql(ResultSQL).cache()
        rowCount = Result.count()
        elapsed = time.perf_counter() - startTime

        print("Query 2 SQL Execution Time: " + str(elapsed) + "\n")
        print("===== Query 2 SQL Result =====")
        Result.show(rowCount, truncate=False)
finally:
    # Always release the cluster resources, even when the job fails.
    spark.stop()