Merge pull request #6 from rf972/master
Parquet support
rfoley-fw authored Aug 9, 2021
2 parents 28788eb + 0973033 commit 47840f7
Showing 17 changed files with 338 additions and 53 deletions.
2 changes: 1 addition & 1 deletion benchmark/tpch/build_tpch.sh
@@ -5,7 +5,7 @@ fi
if [ ! -d tpch-spark/build ]; then
mkdir tpch-spark/build
fi
SPARK_JAR_DIR=../../spark/build/spark-3.2.0/jars/
SPARK_JAR_DIR=../../spark/build/spark-3.3.0/jars/
if [ ! -d $SPARK_JAR_DIR ]; then
echo "Please build spark ($SPARK_JAR_DIR) before building tpch"
exit 1
8 changes: 8 additions & 0 deletions benchmark/tpch/diff_tpch.py
@@ -6,6 +6,7 @@
import sys
import glob
import os
import shutil

class DiffTpch:
def __init__(self):
@@ -22,6 +23,8 @@ def parseArgs(self):
help="enable debug output")
parser.add_argument("--meld", action="store_true",
help="launch meld inline when files differ")
parser.add_argument("--prompt_resolve", action="store_true",
help="prompt to resolve difference by copying to baseline.")
parser.add_argument("--terse", action="store_true",
help="only report differences")
parser.add_argument("--results", "-r",
@@ -128,6 +131,11 @@ def runDiff(self, baseRootPath, compareRoot, testList):
subprocess.call("meld {} {}".format(baseFile, compareFile), shell=True)
print("meld {} {}".format(baseFile, compareFile))
#print("Result of diff {} {} was {}".format(d1File, compareFile, rc))
if rc != 0 and self._args.prompt_resolve:
resolve = input("resolve this difference? (y/n)")
if resolve == 'y' or resolve == 'Y':
print("Copy {} -> {}".format(compareFile, basePath))
shutil.copy(compareFile, basePath)
elif compareFile != None:
# They matched, keep track of it.
self._successCount += 1
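The new --prompt_resolve option above interactively offers to overwrite a baseline with the differing result. A minimal standalone sketch of that prompt-and-copy flow, using hypothetical compare_file and baseline_dir paths rather than the script's own variables:

    import shutil

    def prompt_resolve(compare_file, baseline_dir):
        # Offer to make the differing compare file the new baseline.
        answer = input("resolve this difference? (y/n) ")
        if answer.lower() == "y":
            print("Copy {} -> {}".format(compare_file, baseline_dir))
            shutil.copy(compare_file, baseline_dir)

    # Example call (hypothetical paths):
    # prompt_resolve("results/q6.out", "baseline/")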
176 changes: 176 additions & 0 deletions benchmark/tpch/parquet.py
@@ -0,0 +1,176 @@
from __future__ import print_function

import argparse
import sys
import base64

import pyarrow as pa
import pandas as pd
import pyarrow.parquet as pq


def get_metadata(parquet_file):
r = pq.ParquetFile(parquet_file)
return r.metadata

def get_row_group_metadata(parquet_file, group):
r = pq.ParquetFile(parquet_file)
return r.metadata.row_group(group)

def get_row_group_col_metadata(parquet_file, group, col):
r = pq.ParquetFile(parquet_file)
return r.metadata.row_group(group).column(col)

def print_key_value_metadata(parquet_file):
meta = pq.read_metadata(parquet_file)
#print("\n{}\n".format(meta.metadata.keys()))
for key in meta.metadata.keys():
print("\n{}:\n {}".format(key, meta.metadata[key]))

#decoded_schema = base64.b64decode(meta.metadata[b"ARROW:schema"])
#schema = pa.ipc.read_schema(pa.BufferReader(decoded_schema))
#print(schema)

def get_schema(parquet_file):
r = pq.ParquetFile(parquet_file)
return r.schema


def get_data(pq_table, n, head=True):
data = pq_table.to_pandas()
if head:
rows = data.head(n)
else:
rows = data.tail(n)
return rows

def write_multiplied_table(pq_table, output_prefix):
df = pq_table.to_pandas()
add_multiplier = 1
prev_df = df
for i in range(0, add_multiplier):
print("append {} ".format(i))
new_df = prev_df.append(df)
prev_df = new_df
print("new_df rows {}".format(len(new_df.index)))
new_table = pa.Table.from_pandas(new_df)
pq.write_table(new_table, output_prefix + "ex_large.snappy.parquet", data_page_size=100*1024*1024*1024)

def write_table(pq_table, output_prefix):
df = pq_table.to_pandas()
rows = len(df.index)
print("rows {}".format(rows))
print("output file " + output_prefix)
new_df = df[:int(7)]
new_table = pa.Table.from_pandas(new_df)
pq.write_table(new_table, output_prefix + ".parquet.snappy", data_page_size=1024*1024*1024)
#pq.write_table(pq_table, output_prefix + "full.snappy.parquet", data_page_size=1024*1024*1024)

def query_table(pq_table, query):
df = pq_table.to_pandas()
rows = len(df.index)
print("rows {}".format(rows))
print("output: " + query)
new_df = df[['l_quantity','l_extendedprice','l_discount','l_shipdate']]
new_table = pa.Table.from_pandas(new_df)
pq.write_table(new_table, "query" + ".parquet.snappy", data_page_size=1024*1024*1024)

def main(cmd_args=sys.argv, skip=False):
"""
Main entry point with CLI arguments
:param cmd_args: args passed from CLI
:param skip: bool, whether to skip init_args call or not
:return: string stdout
"""
if not skip:
cmd_args = init_args()

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

pq_table = pq.read_table(cmd_args.file)
if cmd_args.write:
#write_multiplied_table(pq_table, cmd_args.write)
write_table(pq_table, cmd_args.write)
elif cmd_args.query:
query_table(pq_table, cmd_args.query)
elif cmd_args.head:
print(get_data(pq_table, cmd_args.head))
elif cmd_args.tail:
print(get_data(pq_table, cmd_args.tail, head=False))
elif cmd_args.count:
print(len(pq_table.to_pandas().index))
elif cmd_args.schema:
print("\n # Schema \n", get_schema(cmd_args.file))
else:
file_md = get_metadata(cmd_args.file)
print("\n # Metadata \n", file_md)
for i in range(0, file_md.num_row_groups):
rg_md = get_row_group_metadata(cmd_args.file, i)
print("\n # Row Group {}\n".format(i), rg_md)
if cmd_args.verbose:
for c in range(0, rg_md.num_columns):
col_md = get_row_group_col_metadata(cmd_args.file, i, c)
print("\n Column {}\n".format(c), col_md)

print_key_value_metadata(cmd_args.file)

def init_args():
parser = argparse.ArgumentParser(
description="Command line (CLI) tool to inspect Apache Parquet files on the go",
usage="usage: parq file [-s [SCHEMA] | --head [HEAD] | --tail [TAIL] | -c [COUNT]]"
)

group = parser.add_mutually_exclusive_group()

group.add_argument("-s",
"--schema",
nargs="?",
type=bool,
const=True,
help="get schema information",
)

group.add_argument("--head",
nargs="?",
type=int,
const=10,
help="get first N rows from file"
)

group.add_argument("--tail",
nargs="?",
type=int,
const=10,
help="get last N rows from file"
)

group.add_argument("-c",
"--count",
nargs="?",
type=bool,
const=True,
help="get total rows count",
)

group.add_argument("-w", "--write", default="",
help="output file")

group.add_argument("-q", "--query", default="",
help="run a query on a dataframe")

group.add_argument("-v", "--verbose", default=False, action="store_true",
help="more detailed output")

parser.add_argument("file",
type=argparse.FileType('rb'),
help='Parquet file')

cmd_args = parser.parse_args()

return cmd_args


if __name__ == '__main__':
args = init_args()
main(args, skip=True)
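parquet.py is a small CLI for inspecting Parquet files: schema (-s), first/last rows (--head/--tail), row count (-c), per-row-group metadata (the default), plus simple write (-w) and query (-q) helpers. For reference, the default metadata path can be reproduced directly with pyarrow; a minimal sketch, assuming a local file named lineitem.parquet (the file name is illustrative):

    # Roughly what the script's default (no-flag) path prints, e.g.:
    #   python3 parquet.py lineitem.parquet
    #   python3 parquet.py lineitem.parquet --schema
    #   python3 parquet.py lineitem.parquet --head 5
    import pyarrow.parquet as pq

    pf = pq.ParquetFile("lineitem.parquet")  # assumed input file
    print(pf.metadata)                       # file-level metadata
    print(pf.schema)                         # Parquet schema, as --schema prints
    for i in range(pf.metadata.num_row_groups):
        rg = pf.metadata.row_group(i)
        print("row group", i, "rows:", rg.num_rows, "bytes:", rg.total_byte_size)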
14 changes: 13 additions & 1 deletion benchmark/tpch/run_tpch.py
@@ -15,6 +15,7 @@ def __init__(self):
self._debug = False
self._continueOnError = False
self._startTime = time.time()
self._test_failures = 0

def parseTestList(self):
testItems = self._args.tests.split(",")
@@ -110,11 +111,19 @@ def restartAll(self):
def runCmd(self, cmd):
(rc, output) = self.runCommand(cmd, show_cmd=True, enable_stdout=False)
if rc != 0:
print("status {} from: {}".format(rc, cmd))
self._test_failures += 1
failure = "test failed with status {} cmd {}".format(rc, cmd)
self._testResults.append(failure)
print(failure)
lineNum = 0
for line in output:
if lineNum > 0:
lineNum += 1
if rc == 0 and (("TPCH Failed" in line) or ("FAILED" in line)):
self._test_failures += 1
failure = "test failed cmd: {}".format(cmd)
print(failure)
self._testResults.append(failure)
if "Test Results" in line:
lineNum += 1
if lineNum == 4:
@@ -144,8 +153,11 @@ def runTests(self):
for t in self._testList:
cmd = "./run_tpch.sh -w {} -t {} {}".format(w, t, self._args.args)
self.runCmd(cmd)
print("")
self.showResults()
self.displayElapsed()
if (self._test_failures > 0):
print("test failures: {}".format(self._test_failures))

def run(self):
self.parseArgs()
14 changes: 9 additions & 5 deletions benchmark/tpch/run_tpch.sh
@@ -37,12 +37,14 @@ if [ ${DEBUG} == "YES" ]; then
--class main.scala.TpchQuery \
--conf "spark.jars.ivy=/build/ivy" \
--conf "spark.driver.maxResultSize=20g" \
--conf "spark.driver.memory=20g" \
--conf "spark.driver.extraJavaOptions=-classpath /conf/:/build/spark-3.2.0/jars/*:/examples/scala/target/scala-2.12/ -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=172.18.0.4:5005" \
--conf "spark.driver.memory=2g" \
--conf "spark.executor.memory=2g" \
--conf "spark.driver.extraJavaOptions=-classpath /conf/:/build/spark-3.3.0/jars/*:/examples/scala/target/scala-2.12/ -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=172.18.0.3:5005" \
--packages com.github.scopt:scopt_2.12:4.0.0-RC2,ch.cern.sparkmeasure:spark-measure_2.12:0.17 \
--jars /build/downloads/spark-sql-macros_2.12.10_0.1.0-SNAPSHOT.jar,/dikeHDFS/client/ndp-hdfs/target/ndp-hdfs-1.0.jar,/build/extra_jars/*,/pushdown-datasource/target/scala-2.12/pushdown-datasource_2.12-0.1.0.jar,/build/downloads/h2-1.4.200.jar \
/tpch/tpch-spark/target/scala-2.12/spark-tpc-h-queries_2.12-1.0.jar $@ --workers ${WORKERS}
#--packages com.github.scopt:scopt_2.12:4.0.0-RC2,com.amazonaws:aws-java-sdk:1.11.853,org.apache.hadoop:hadoop-aws:3.2.0,org.apache.commons:commons-csv:1.8 \
# --conf "spark.sql.parquet.enableVectorizedReader=false" \
else
docker exec -it sparkmaster spark-submit --master local[$WORKERS] \
--conf "ivy.shared.default.root=/build/ivy_jars" \
@@ -51,12 +53,13 @@ else
--conf "spark.jars.ivy=/build/ivy" \
--conf "spark.driver.maxResultSize=20g" \
--conf "spark.sql.broadcastTimeout=10000000" \
--conf "spark.driver.memory=20g" \
--conf "spark.driver.memory=32g" \
--conf "spark.executor.memory=32g" \
--conf "spark.eventLog.enabled=true" \
--conf "spark.eventLog.dir=/build/spark-events" \
--conf "spark.driver.extraJavaOptions=-classpath /conf/:/build/spark-3.2.0/jars/*:/examples/scala/target/scala-2.12/" \
--conf "spark.driver.extraJavaOptions=-classpath /conf/:/build/spark-3.3.0/jars/*:/examples/scala/target/scala-2.12/" \
--packages com.github.scopt:scopt_2.12:4.0.0-RC2,ch.cern.sparkmeasure:spark-measure_2.12:0.17 \
--jars /build/downloads/spark-sql-macros_2.12.10_0.1.0-SNAPSHOT.jar,/dikeHDFS/client/ndp-hdfs/target/ndp-hdfs-1.0.jar,/build/extra_jars/*,/pushdown-datasource/target/scala-2.12/pushdown-datasource_2.12-0.1.0.jar,/build/downloads/h2-1.4.200.jar\
--jars /build/downloads/spark-sql-macros_2.12.10_0.1.0-SNAPSHOT.jar,/dikeHDFS/client/ndp-hdfs/target/ndp-hdfs-1.0.jar,/build/extra_jars/*,/pushdown-datasource/target/scala-2.12/pushdown-datasource_2.12-0.1.0.jar,/build/downloads/h2-1.4.200.jar \
/tpch/tpch-spark/target/scala-2.12/spark-tpc-h-queries_2.12-1.0.jar $@ --workers ${WORKERS}
fi

@@ -75,4 +78,5 @@ if [ $STATUS -eq 0 ];then
echo "TPCH Successful"
else
echo "TPCH Failed"
exit $STATUS
fi
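With the added exit $STATUS, a failed TPC-H run now returns a non-zero exit code to its caller, which run_tpch.py's new failure counting can pick up directly. A minimal sketch of checking that status from Python, with placeholder -w/-t values (run_tpch.py builds the real command line):

    import subprocess

    # Placeholder arguments; see run_tpch.py's runTests for the actual "-w ... -t ..." values.
    rc = subprocess.call("./run_tpch.sh -w 4 -t 6", shell=True)
    if rc != 0:
        print("TPCH run failed with status {}".format(rc))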
(Diffs for the remaining changed files are not shown.)
