Add feature_counts function to Spark SQL #168

Open · wants to merge 2 commits into master
Changes from 1 commit
GroupBy in FeatureCounts function, add dataset API
kkobylin committed Jun 27, 2022

commit 20d0ea715c29c2577a5945a509246bb2aea31e25
@@ -33,11 +33,11 @@ object FeatureCountsFunc {

     val ss = SequilaSession(spark)

-    val query = s"Select fc.*, count(*) AS Counts " +
-      s" FROM feature_counts('${runConf.readsFile()}', '${runConf.annotations()}') fc " +
-      s" GROUP BY fc.sample_id, fc.contig, fc.pos_start, fc.pos_end, fc.strand, fc.length"
+    val query = s"Select fc.*" +
+      s" FROM feature_counts('${runConf.readsFile()}', '${runConf.annotations()}') fc "

     ss.sql(query)
       .orderBy("sample_id")
       .coalesce(1)
       .show()
   }
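With the GROUP BY folded into the FeatureCounts plan itself, the SQL side reduces to a bare call of the table-valued function. A minimal end-to-end sketch, assuming placeholder file paths:

```scala
import org.apache.spark.sql.{SequilaSession, SparkSession}

// Placeholder inputs: any BAM file with reads and BED file with features.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val ss = SequilaSession(spark)

// feature_counts now returns one already-counted row per feature,
// so no GROUP BY is needed in the query.
ss.sql("SELECT fc.* FROM feature_counts('reads.bam', 'genes.bed') fc")
  .orderBy("sample_id")
  .show()
```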
@@ -3,7 +3,7 @@ package org.biodatageeks.sequila.rangejoins.methods.IntervalTree
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SequilaSession.logger
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
@@ -83,6 +83,13 @@ case class FeatureCountsPlan(spark: SparkSession,
         })
       })
       .flatMap(r => r)
+      .groupBy(x => x)
+      .mapValues(_.size)
+      .map(entry => {
+        entry._1.setInt(6, entry._2)
+        toUnsafeRow(entry._1)
+      })
+
       v3
     } else {
       val genesRddWithIndex = genesRdd.zipWithIndex()
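The counting step added above is the plain Scala groupBy/mapValues idiom: bucket equal rows, then replace each bucket with its size. A self-contained sketch of the same idiom on ordinary collections, with made-up gene IDs:

```scala
// Made-up per-read feature hits; equal elements end up in one bucket.
val hits = Seq("geneA", "geneB", "geneA", "geneA")

// groupBy(identity) buckets equal keys; mapValues(_.size) replaces each
// bucket with its count, yielding Map("geneA" -> 3, "geneB" -> 1).
val counts = hits.groupBy(identity).mapValues(_.size)
```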
@@ -118,18 +125,30 @@ case class FeatureCountsPlan(spark: SparkSession,
       val result = v3.
         join(intGenesRdd)
         .map(l => l._2._2)
+        .groupBy(x => x)
+        .mapValues(_.size)
+        .map(entry => {
+          entry._1.setInt(6, entry._2)
+          toUnsafeRow(entry._1)
+        })
       result
     }
   }

+  private def toUnsafeRow(r: InternalRow): InternalRow = {
+    val proj = UnsafeProjection.create(schema)
+    proj.apply(r)
+  }
+
   private def toInternalRow(r: Row): InternalRow = {
     InternalRow.fromSeq(Seq(
       UTF8String.fromString(r.getString(0)), //Sample
       UTF8String.fromString(r.getString(1)), //Contig
       r.getString(2).toInt, //Start
       r.getString(3).toInt, //End
       UTF8String.fromString(r.getString(4)), //Strand
-      r.getString(3).toInt - r.getString(2).toInt //Length
+      r.getString(3).toInt - r.getString(2).toInt, //Length
+      0 //count
     ))
   }

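toUnsafeRow is a thin wrapper over Spark's UnsafeProjection, which turns a generic InternalRow into a compact UnsafeRow matching the plan's schema. A reduced sketch with a hypothetical three-column schema (the plan itself projects the full seven-column FeatureCounts schema):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical reduced stand-in for the FeatureCounts schema.
val schema = StructType(Seq(
  StructField("contig", StringType, nullable = false),
  StructField("pos_start", IntegerType, nullable = false),
  StructField("count", IntegerType, nullable = false)))

// Build a generic row, overwrite its count slot (the diff's setInt(6, _)
// does the same for the seventh column), then project to an UnsafeRow.
val row = InternalRow(UTF8String.fromString("chr1"), 100, 0)
row.setInt(2, 7)
val unsafe = UnsafeProjection.create(schema).apply(row)
```

One design note: as written, toUnsafeRow calls UnsafeProjection.create for every row it converts; since create code-generates the projection, building it once per partition and reusing it is the usual pattern.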
@@ -24,9 +24,13 @@ object FeatureCountsTemplate {
StructField(s"${Columns.CONTIG}", StringType, nullable = false),
StructField(s"${Columns.START}", IntegerType, nullable = false),
StructField(s"${Columns.END}", IntegerType, nullable = false),
StructField(s"${Columns.STRAND}", StringType, nullable = true),
StructField(s"${Columns.LENGTH}", IntegerType, nullable = false))
StructField(s"${Columns.STRAND}", StringType, nullable = false),
StructField(s"${Columns.LENGTH}", IntegerType, nullable = false),
StructField(s"${Columns.COUNT_REF}", IntegerType, nullable = false))
).toAttributes
new FeatureCountsTemplate(reads, genes, output)
}
}
}

case class FeatureCounts(sample:String, contig:String, start:Int, end:Int,
strand:String, length:Int, countRef: Int)
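For illustration, a record under the extended schema (values invented):

```scala
// Invented values; countRef carries the new per-feature read count.
val fc = FeatureCounts(sample = "sample1", contig = "chr1",
  start = 100, end = 600, strand = "+", length = 500, countRef = 42)
```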
@@ -7,9 +7,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, SeQuiLaAnalyzer}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.SequilaDataSourceStrategy
-import org.apache.spark.sql.functions.{lit, typedLit}
+import org.apache.spark.sql.functions.typedLit
 import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.types.{ArrayType, ByteType, MapType, ShortType}
 import org.biodatageeks.sequila.pileup.PileupStrategy
 import org.biodatageeks.sequila.rangejoins.IntervalTree.IntervalTreeJoinStrategyOptim
 import org.biodatageeks.sequila.utils.{InternalParams, UDFRegister}
@@ -91,6 +90,18 @@ case class SequilaSession(sparkSession: SparkSession) extends SparkSession(spark
     new Dataset(sparkSession, PileupTemplate(path, refPath, false, false), Encoders.kryo[Row]).as[Coverage]
   }

+  /**
+    * Count reads overlapping each gene (feature counts)
+    *
+    * @param reads path to a BAM file with reads
+    * @param genes path to a BED file with gene annotations
+    * @return per-gene counts as a Dataset[FeatureCounts]
+    */
+  def featureCounts(reads: String, genes: String): Dataset[FeatureCounts] = {
+    new Dataset(sparkSession, FeatureCountsTemplate(reads, genes), Encoders.kryo[Row]).as[FeatureCounts]
+  }
+
   /**
    * Calculate pileup in block/base format
    * +------+---------+-------+------------------+--------+--------+-----------+---------+--------------------+
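The new method gives a typed alternative to the SQL route. A usage sketch, again with placeholder paths:

```scala
import org.apache.spark.sql.{Dataset, SequilaSession, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val ss = SequilaSession(spark)

// Typed counterpart of the feature_counts table-valued function.
val counts: Dataset[FeatureCounts] = ss.featureCounts("reads.bam", "genes.bed")
counts.orderBy("sample_id").show()
```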