
Commit f57663f

agaszmurlo and mwiewior (Marek Wiewiorka) authored
pileup implementation (#161)
* new test file added, first step towards impl
* new test file added
* fixing file names
* removing pileup from perf
* adding md file for perf test
* alts map e2e
* pileup first impl
* removed extra md file, colliding with normal bam test dataset
* throwing away pileup perf test temporarily
* 🐛 Fixing problem with reading both bam files in perf tests
* Feature/pileup impl optim (#162)
* Adding support for reference file path in TVF
* 🐎 Some performance improvements with InternalRow conversions, logging and
* Adding rdd instrumentation
* Adding persisting for speeding up accumulator
* Caching cleaned up Contigs and optimize prepareOutput method
* aligned fixes and optimizations
* getting subsequence instead of base from ref
* changed alts representation table->map
* explicit conversion to unsafe row
* Optimizing UTF8String.fromString with caches
* Small optimizations
* output compression and refactoring
* fixing compressed output
* changed alts map representation string encoded in bytes
* calculate map & array sizes
* storing unsafe map
* minor refactoring
* cleanup in map projection
* altBase retrieved from SEQ
* [WIP] Further perf improvements (#167)
* MD tag parsing cleanup
* Fixing missing reference in baseByteMap
* Perf improvement for events sum
* Refactoring of sum operations
* Improving acc performance
* Removing groupRefBuilder
* Added Instrumentation param to turn it on/off
* placeholder for samtools testing
* fix to writing short arrays
* moving to double in roundUp method
* removing unnecessary table
* samtools testing piece
* samtools converters added
* samtools testing framework ctd
* conversion fix, when switching contigs
* conversion fix, when switching contigs
* insertion handling in conversions, zero-coverage region fixes
* N in reference handling
* 1 base zero cov region fix
* handling quality encoding in pileup string
* Fixing coverage perf degradation due to DataQualityFuncs.cleanContig (#168)
* total bugfixing
* total bugfixing
* proper changes merged
* changing storageLevel
* Fixing Jenkins tests OOM
* half of changes applied
* Openjdk and graalvm support (#170)
* Fixing Oracle JDK dep
* Adding fallback if ObjectSizeCalculator.getObjectSize not available
Co-authored-by: Marek Wiewiorka <[email protected]>
* minor fixes
* using object methods instead of functions and storage level fix
* attempt nr 436786534765987456
* function -> method

Co-authored-by: Marek Wiewiórka <[email protected]>
Co-authored-by: Marek Wiewiorka <[email protected]>
1 parent 51d05ea commit f57663f
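The headline change is a first implementation of the pileup table-valued function, registered through the SequilaSQL session in the same style as the existing bdg_coverage calls. A minimal sketch of how it can be invoked, based on the (currently commented-out) query in the performance script further below; the table name and single-argument signature are taken from that script and are not a definitive API:

```scala
// Assumes a SparkSession `ss` with SeQuiLa extensions already registered,
// e.g. via SequilaRegister.register(ss), and a BAM-backed table "reads_bam"
// created with BAMDataSource (see performance/bdg_perf/bdg_perf_sequila.scala).
val pileupCount = ss.sql("SELECT COUNT(*) FROM pileup('reads_bam')")
pileupCount.show()
```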

Some content is hidden: large commits have some content hidden by default, so not all changed files are shown below.

45 files changed, +27186 -417 lines

Jenkinsfile

+9-9
@@ -12,6 +12,8 @@ author = ""
 message = ""
 channel = "#project-sequila"

+sbtOpts = "'-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=2G -Xmx4G'"
+

 def getGitAuthor = {
 def commit = sh(returnStdout: true, script: 'git rev-parse HEAD')
@@ -97,19 +99,17 @@ node {
 stage('Test Scala code') {

 echo 'Testing Scala code....'
-sh "${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt clean test"
-
-
+sh "SBT_OPTS=${sbtOpts} ${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt clean test"


 }

 stage('Package scala code') {

-echo 'Building Scala code....'
-sh "${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt package"
+//echo 'Building Scala code....'
+//sh "${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt package"
 echo "Generating documentation"
-sh "${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt doc"
+sh "SBT_OPTS=${sbtOpts} ${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt doc"
 //publishHTML([allowMissing: false, alwaysLinkToLastBuild: true, keepAll: false, reportDir: 'target/scala-2.11/api/', reportFiles: 'package.html', reportName: 'Scala Doc', reportTitles: ''])

 }
@@ -119,15 +119,15 @@ node {

 echo "branch: ${env.BRANCH_NAME}"
 echo 'Publishing to ZSI-BIO snapshots repository....'
-sh "SBT_OPTS='-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=2G -Xmx2G' ${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt 'set test in publish := {}' publish"
+sh "SBT_OPTS=${sbtOpts} ${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt 'set test in publish := {}' publish"


 }

 stage('Code stats') {

 echo 'Gathering code stats....'
-sh "${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt stats"
+sh "SBT_OPTS=${sbtOpts} ${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt stats"

 }
 stage('Readthedocs') {
@@ -146,7 +146,7 @@ node {
 // }

 stage('Performance testing') {
-sh "${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt -J-Xms2048m -J-Xmx2048m 'set test in assembly := {}' assembly"
+sh "SBT_OPTS=${sbtOpts} ${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt 'set test in assembly := {}' assembly"
 sh "ssh bdg-perf@cdh00 rm -rf /tmp/bdg-sequila-assembly-*.jar"
 sh "scp target/scala-2.11/bdg-sequila-assembly-*.jar bdg-perf@cdh00:/tmp"
 sh "scp performance/bdg_perf/bdg_perf_sequila.scala bdg-perf@cdh00:/tmp"

build.sbt

+10-6
@@ -27,8 +27,9 @@ libraryDependencies += "org.bdgenomics.adam" %% "adam-apis-spark2" % "0.25.0"
 libraryDependencies += "org.bdgenomics.adam" %% "adam-cli-spark2" % "0.25.0"
 libraryDependencies += "org.scala-lang" % "scala-library" % "2.11.8"
 libraryDependencies += "org.rogach" %% "scallop" % "3.1.2"
-libraryDependencies += "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.11.1"
-libraryDependencies += "org.hammerlab.bdg-utils" %% "cli" % "0.3.0"
+//libraryDependencies += "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.11.1"
+//libraryDependencies += "org.hammerlab.bdg-utils" %% "cli" %
+libraryDependencies += "org.bdgenomics.utils" % "utils-metrics-spark2_2.11" % "0.2.16"
 libraryDependencies += "com.github.samtools" % "htsjdk" % "2.19.0"
 libraryDependencies += "ch.cern.sparkmeasure" %% "spark-measure" % "0.13" excludeAll (ExclusionRule("org.apache.hadoop"))
 libraryDependencies += "org.broadinstitute" % "gatk-native-bindings" % "1.0.0" excludeAll (ExclusionRule("org.apache.hadoop"))
@@ -55,13 +56,15 @@ avroSpecificScalaSource in Compile := new java.io.File("src/main/org/biodatageek
 avroSpecificScalaSource in Test := new java.io.File("src/test/org/biodatageeks/formats")


-fork := true
-fork in Test := true
+fork := false
+fork in Test := false
+parallelExecution in Test := false
+
 javaOptions in Test ++= Seq(
 "-Dlog4j.debug=false",
 "-Dlog4j.configuration=log4j.properties")

-javaOptions ++= Seq("-Xms512M", "-Xmx8192M", "-XX:+CMSClassUnloadingEnabled")
+javaOptions ++= Seq("-Xms512M", "-Xmx8192M", "-XX:+CMSClassUnloadingEnabled" , "-Dlog4j.configuration=log4j.properties")

 //fix for using with hdp warehouse connector
 javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")
@@ -70,8 +73,8 @@ outputStrategy := Some(StdoutOutput)


 resolvers ++= Seq(
-"Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven",
 "zsibio-snapshots" at "http://zsibio.ii.pw.edu.pl/nexus/repository/maven-snapshots/",
+"Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven",
 "spring" at "http://repo.spring.io/libs-milestone/",
 "Cloudera" at "https://repository.cloudera.com/content/repositories/releases/",
 "Hortonworks" at "http://repo.hortonworks.com/content/repositories/releases/",
@@ -92,6 +95,7 @@ assemblyMergeStrategy in assembly := {
 case PathList("com", xs@_*) => MergeStrategy.first
 case PathList("shadeio", xs@_*) => MergeStrategy.first
 case PathList("au", xs@_*) => MergeStrategy.first
+case PathList("htsjdk", xs@_*) => MergeStrategy.first
 case ("META-INF/org/apache/logging/log4j/core/config/plugins/Log4j2Plugins.dat") => MergeStrategy.first
 case ("images/ant_logo_large.gif") => MergeStrategy.first
 case "overview.html" => MergeStrategy.rename

performance/bdg_perf/bdg_perf_sequila.scala

+18-4
@@ -16,13 +16,20 @@ SequilaRegister.register(ss)



-val BAM_DIR = s"${sys.env("BGD_PERF_SEQ_BAM_DIR")}/*.bam"
+val BAM_DIR = s"${sys.env("BGD_PERF_SEQ_BAM_DIR")}/*wes.bam"
+val BAM_MD_DIR = s"${sys.env("BGD_PERF_SEQ_BAM_DIR")}/md/*wes.md.bam"
+
 val CRAM_DIR = s"${sys.env("BGD_PERF_SEQ_BAM_DIR")}/*.cram"
 val FASTA_PATH = s"${sys.env("BGD_PERF_SEQ_BAM_DIR")}/Homo_sapiens_assembly18.fasta"
 val bamTable = "reads_bam"
+val bamMdTable="reads_md_bam"
 val cramTable = "reads_cram"

 //preparation
+ss.sql(s"DROP TABLE IF EXISTS ${bamTable}")
+ss.sql(s"DROP TABLE IF EXISTS ${cramTable}")
+ss.sql(s"DROP TABLE IF EXISTS ${bamMdTable}")
+
 ss.sql(s"""
 CREATE TABLE IF NOT EXISTS ${bamTable}
 USING org.biodatageeks.sequila.datasources.BAM.BAMDataSource
@@ -33,6 +40,13 @@ CREATE TABLE IF NOT EXISTS ${cramTable}
 USING org.biodatageeks.sequila.datasources.BAM.BAMDataSource
 OPTIONS(path '${CRAM_DIR}')""")

+ss.sql(s"""
+CREATE TABLE IF NOT EXISTS ${bamMdTable}
+USING org.biodatageeks.sequila.datasources.BAM.BAMDataSource
+OPTIONS(path '${BAM_MD_DIR}')""")
+
+
+
 //targets
 val bedPath="/data/granges/tgp_exome_hg18.bed"
 ss.sql(s"""
@@ -60,8 +74,8 @@ val queries = Array(
 """.stripMargin),
 BDGQuery("bdg_cov_window_fix_length_500_count_NA12878_BAM",s"SELECT COUNT(*) FROM bdg_coverage ('${bamTable}','NA12878', 'bases','500')"),
 BDGQuery("bdg_cov_window_fix_length_500_count_NA12878_CRAM",s"SELECT COUNT(*) FROM bdg_coverage ('${cramTable}','NA12878', 'bases','500')"),
-BDGQuery("bdg_cov_bases_NA12878_BAM",s"SELECT COUNT(*) FROM bdg_coverage ('${bamTable}','NA12878', 'bases')"),
-BDGQuery("bdg_pileup_cov_only_NA12878",s"SELECT COUNT(*) FROM pileup ('${bamTable}')")
+BDGQuery("bdg_cov_bases_NA12878_BAM",s"SELECT COUNT(*) FROM bdg_coverage ('${bamTable}','NA12878', 'bases')")
+// BDGQuery("bdg_pileup_cov_only_NA12878",s"SELECT COUNT(*) FROM pileup ('${bamMdTable}')")

 )

@@ -70,7 +84,7 @@ BDGPerfRunner.run(ss,queries)

 //cleanup
 ss.sql(s"""DROP TABLE IF EXISTS ${bamTable}""")
-ss.sql(s"""DROP TABLE IF EXISTS ${cramTable}""")
+ss.sql(s"""DROP TABLE IF EXISTS ${cramTable}""")

 ss.sql(s"""DROP TABLE IF EXISTS targets""")
 System.exit(0)

src/main/resources/log4j.properties

+7-2
@@ -26,12 +26,16 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}:
 # log level for this class is used to overwrite the root logger's log level, so that
 # the user can have different defaults for the shell and regular Spark apps.
 log4j.logger.org.apache.spark.repl.Main=WARN
+log4j.logger.org.apache.spark=WARN
+
+# Silence akka remoting
+log4j.logger.Remoting=WARN

 # Settings to quiet third party logs that are too verbose
 log4j.logger.org.spark_project.jetty=WARN
 log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
-log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
-log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN
 log4j.logger.org.apache.parquet=ERROR
 log4j.logger.parquet=ERROR

@@ -40,3 +44,4 @@ log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
 log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

 log4j.logger.org.biodatageeks.sequila.pileup=ERROR
+log4j.logger.org.biodatageeks.sequila.coverage=ERROR
SAMRecordHelper.java (new file)

+7-0
@@ -0,0 +1,7 @@
+package htsjdk.samtools;
+
+public class SAMRecordHelper {
+    public static void eagerDecode(SAMRecord record) {
+        record.eagerDecode();
+    }
+}
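The helper above is placed in the htsjdk.samtools package so that Sequila code can reach SAMRecord.eagerDecode(), which is package-private in htsjdk. A hedged Scala sketch of the intended call pattern; the record construction here is purely illustrative, not how the project builds records:

```scala
import htsjdk.samtools.{SAMFileHeader, SAMRecord, SAMRecordHelper}

// Force full decoding of a lazily parsed record up front (the presumed
// reason for this helper), so later field accesses do not trigger
// lazy decoding on the hot path.
val record: SAMRecord = new SAMRecord(new SAMFileHeader())
SAMRecordHelper.eagerDecode(record)
```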

src/main/scala/org/biodatageeks/sequila/apps/FeatureCounts.scala

+5-2
@@ -4,12 +4,13 @@ import htsjdk.samtools.ValidationStringency
 import org.apache.hadoop.io.LongWritable
 import org.apache.spark.sql.SparkSession
 import org.biodatageeks.sequila.rangejoins.IntervalTree.IntervalTreeJoinStrategyOptim
+import org.biodatageeks.sequila.utils.Columns
 import org.rogach.scallop.ScallopConf
 import org.seqdoop.hadoop_bam.{BAMInputFormat, SAMRecordWritable}
 import org.seqdoop.hadoop_bam.util.SAMHeaderReader

 object FeatureCounts {
-case class Region(contigName:String,start:Int,end:Int)
+case class Region(contig:String, pos_start:Int, pos_end:Int)
 class RunConf(args:Array[String]) extends ScallopConf(args){

 val output = opt[String](required = true)
@@ -69,7 +70,9 @@ object FeatureCounts {
 .option("header", "true")
 .option("delimiter", "\t")
 .csv(runConf.annotations())
-targets.createOrReplaceTempView("targets")
+targets
+.withColumnRenamed("contigName", Columns.CONTIG)
+.createOrReplaceTempView("targets")

 spark.sql(query)
 .orderBy("GeneId")

src/main/scala/org/biodatageeks/sequila/coverage/CoverageMethods.scala

+18-11
@@ -75,10 +75,17 @@ object CoverageMethodsMos {
 new mutable.HashMap[String, (Array[Short], Int, Int, Int)]()
 val contigStartStopPartMap = new mutable.HashMap[String, Int]()
 val cigarMap = new mutable.HashMap[String, Int]()
+var contigIter = ""
+var contigCleanIter = ""
 while (p.hasNext) {
 val r = p.next()
 val read = r
-val contig = DataQualityFuncs.cleanContig(read.getContig)
+val contigIn = read.getContig
+if(contigIn != contigIter) {
+contigCleanIter = DataQualityFuncs.cleanContig(contigIn)
+contigIter = contigIn
+}
+val contig = contigCleanIter

 // default value of filterFlag 1796:
 // * read unmapped (0x4)
@@ -196,7 +203,7 @@ object CoverageMethodsMos {
 result: Array[AbstractCovRecord]): Int = {
 var indexShift = ind
 if (allPos && maxPosition == contigMax) {
-logger.debug(
+if (logger.isDebugEnabled()) logger.debug(
 s"Adding last block for index: $indexShift, start: $maxPosition end: $contigLength, cov: 0")
 if (blocksResult) {
 result(indexShift) = CovRecord(contig, maxPosition, contigLength, 0)
@@ -265,8 +272,8 @@ object CoverageMethodsMos {
 val result = new Array[AbstractCovRecord](
 firstBlockMaxLength + covArrayLength + lastBlockMaxLength)

-logger.debug(s"$contig shift $posShift")
-logger.debug(
+if (logger.isDebugEnabled()) logger.debug(s"$contig shift $posShift")
+if (logger.isDebugEnabled()) logger.debug(
 s"size: ${firstBlockMaxLength + covArrayLength + lastBlockMaxLength}")

 var i = 0
@@ -287,7 +294,7 @@ object CoverageMethodsMos {
 cov += r._2._1(i)

 if (!blocksResult) { // per-base output
-if (i != covArrayLength - 1) { //HACK. otherwise we get doubled CovRecords for partition boundary index
+if (i != covArrayLength - 1) {
 if (allPos || cov != 0) { // show all positions or only non-zero
 result(ind) =
 CovRecord(contig, i + posShift, i + posShift, cov.toShort)
@@ -368,16 +375,16 @@ object CoverageMethodsMos {
 b: Broadcast[UpdateStruct],
 covEvents: RDD[(String, (Array[Short], Int, Int, Int, Int))])
 : RDD[(String, (Array[Short], Int, Int, Int))] = {
-logger.debug(s"### covEvents count ${covEvents.count()}")
+if (logger.isDebugEnabled()) logger.debug(s"### covEvents count ${covEvents.count()}")

 val newCovEvents = covEvents.map { c =>
 {
-logger.debug(s"updating partition ${c._1}, ${c._2._2}")
+if (logger.isDebugEnabled()) logger.debug(s"updating partition ${c._1}, ${c._2._2}")
 val upd = b.value.upd
 val shrink = b.value.shrink
 val (contig, (eventsArray, minPos, maxPos, contigLength, _)) = c // to REFACTOR
 var eventsArrMutable = eventsArray
-logger.debug(
+if (logger.isDebugEnabled()) logger.debug(
 s"#### Update Partition: $contig, min=$minPos max=$maxPos len:${eventsArray.length} span: ${maxPos - minPos} ")

 val updArray =
@@ -392,7 +399,7 @@ object CoverageMethodsMos {
 }

 var i = 0
-logger.debug(s"$contig, min=$minPos max=$maxPos updating: ${eventsArrMutable
+if (logger.isDebugEnabled()) logger.debug(s"$contig, min=$minPos max=$maxPos updating: ${eventsArrMutable
 .take(10)
 .mkString(",")} with ${overlapArray.take(10).mkString(",")} and $covSum ")
 eventsArrMutable(i) = (eventsArrMutable(i) + covSum).toShort // add cumSum to zeroth element
@@ -409,7 +416,7 @@ object CoverageMethodsMos {
 throw e
 }
 }
-logger.debug(
+if (logger.isDebugEnabled()) logger.debug(
 s"$contig, min=$minPos max=$maxPos Updated array ${eventsArrMutable.take(10).mkString(",")}")
 eventsArrMutable
 case None =>
@@ -424,7 +431,7 @@ object CoverageMethodsMos {
 updArray.take(len)
 case None => updArray
 }
-logger.debug(
+if (logger.isDebugEnabled()) logger.debug(
 s"#### End of Update Partition: $contig, min=$minPos max=$maxPos len:${eventsArray.length} span: ${maxPos - minPos}")
 (contig, (shrinkArray, minPos, maxPos, contigLength))
 }
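Two patterns recur throughout this file: DataQualityFuncs.cleanContig is now invoked only when the contig name actually changes between consecutive reads, and every logger.debug call is wrapped in logger.isDebugEnabled() so the interpolated message strings are not built when debug logging is disabled. A standalone sketch of both ideas, with cleanName as an illustrative stand-in for the real cleanContig:

```scala
import org.slf4j.LoggerFactory

object ContigCleaningSketch {
  private val logger = LoggerFactory.getLogger(getClass)

  // Illustrative stand-in for DataQualityFuncs.cleanContig
  private def cleanName(contig: String): String = contig.stripPrefix("chr")

  def cleanAll(contigs: Iterator[String]): Iterator[String] = {
    var lastRaw   = ""
    var lastClean = ""
    contigs.map { raw =>
      if (raw != lastRaw) {   // reads arrive grouped by contig, so this branch is rare
        lastClean = cleanName(raw)
        lastRaw = raw
      }
      // guard avoids building the interpolated string when debug is off
      if (logger.isDebugEnabled()) logger.debug(s"contig $raw -> $lastClean")
      lastClean
    }
  }
}
```

For a coordinate-sorted BAM this reduces cleanContig calls from one per read to roughly one per contig, which is what the "coverage perf degradation due to DataQualityFuncs.cleanContig" fix (#168) in the commit message targets.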
