Skip to content

Commit

Permalink
spark: add comments to binding #1
Browse files Browse the repository at this point in the history
  • Loading branch information
roldanx committed Nov 29, 2018
1 parent bfdc97a commit ab5fa85
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 59 deletions.
19 changes: 4 additions & 15 deletions oskar-spark/src/main/python/pyoskar/analysis.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@

import sys
from pyspark import keyword_only
from pyspark.ml.param.shared import *
from pyspark.ml.util import JavaMLReadable, JavaMLWritable
from pyspark.ml.wrapper import JavaTransformer

if sys.version > '3':
basestring = str

from pyspark import since, keyword_only, SparkContext
from pyspark.rdd import ignore_unicode_prefix
from pyspark.ml.linalg import _convert_to_vector
from pyspark.ml.wrapper import JavaWrapper
from pyspark.ml.param.shared import *
from pyspark.ml.util import JavaMLReadable, JavaMLWritable
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaTransformer, _jvm
from pyspark.ml.common import inherit_doc
from pyspark.sql.functions import lit
from pyspark.sql import DataFrame

DEFAULT_COHORT = "ALL"


Expand Down Expand Up @@ -453,10 +446,6 @@ def setMissingAsReference(self, value):
return self._set(missingAsReference=value)






class TdtTransformer(AbstractTransformer):
studyId = Param(Params._dummy(), "studyId", "", typeConverter=TypeConverters.toString)
phenotype = Param(Params._dummy(), "phenotype", "", typeConverter=TypeConverters.toString)
Expand Down
132 changes: 116 additions & 16 deletions oskar-spark/src/main/python/pyoskar/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import json

from pyspark.ml.wrapper import JavaWrapper
from pyspark.sql.dataframe import DataFrame

from pyoskar.analysis import *

__all__ = ['Oskar']
Expand All @@ -16,51 +19,77 @@ def __init__(self, spark):

def load(self, file_path):
    """
    Load a variant dataset through the Java-side "load" implementation.

    :type file_path: str
    :param file_path: Path of the dataset to load
    :return: DataFrame produced by the Java-side load call
    """
    return self._call_java("load", file_path)

def chiSquare(self, df, studyId, phenotype):
    """
    Apply the chi-square transformer to the given dataframe.

    :type df: DataFrame
    :param df: Input dataframe
    :param studyId: Study identifier
    :param phenotype: Phenotype to test against
    :return: Transformed dataframe
    """
    transformer = ChiSquareTransformer(studyId=studyId, phenotype=phenotype)
    return transformer.transform(df)

def compoundHeterozygote(self, df, father, mother, child, studyId=None, missingGenotypeAsReference=None):
    """
    Apply the compound heterozygote transformer to the given dataframe.

    :type df: DataFrame
    :param df: Input dataframe
    :param father: Father sample identifier
    :param mother: Mother sample identifier
    :param child: Child sample identifier
    :param studyId: Study identifier
    :param missingGenotypeAsReference: Treat missing genotypes as reference
    :return: Transformed dataframe
    """
    # NOTE: "Heterozigote" is the spelling of the underlying transformer class.
    transformer = CompoundHeterozigoteTransformer(
        father=father,
        mother=mother,
        child=child,
        studyId=studyId,
        missingGenotypeAsReference=missingGenotypeAsReference,
    )
    return transformer.transform(df)

def facet(self, df, facet):
    """
    Apply the facet transformer to the given dataframe.

    :type df: DataFrame
    :param df: Input dataframe
    :param facet: Facet expression to compute
    :return: Transformed dataframe
    """
    transformer = FacetTransformer(facet=facet)
    return transformer.transform(df)

def fisher(self, df, studyId, phenotype):
    """
    Apply the Fisher exact-test transformer to the given dataframe.

    :type df: DataFrame
    :param df: Input dataframe
    :param studyId: Study identifier
    :param phenotype: Phenotype to test against
    :return: Transformed dataframe
    """
    transformer = FisherTransformer(studyId=studyId, phenotype=phenotype)
    return transformer.transform(df)

def hardyWeinberg(self, df, studyId=None):
    """
    Apply the Hardy-Weinberg transformer to the given dataframe.

    :type df: DataFrame
    :param df: Original dataframe
    :type studyId: str
    :param studyId: Study identifier
    :rtype: DataFrame
    :return: Transformed dataframe
    """
    transformer = HardyWeinbergTransformer(studyId=studyId)
    return transformer.transform(df)

def histogram(self, df, inputCol, step=None):
    """
    Apply the histogram transformer to the given dataframe.

    :type df: DataFrame
    :param df: Input dataframe
    :param inputCol: Column to build the histogram from
    :param step: Bucket step size
    :return: Transformed dataframe
    """
    transformer = HistogramTransformer(inputCol=inputCol, step=step)
    return transformer.transform(df)

Expand All @@ -69,15 +98,21 @@ def ibs(self, df, samples=None, skipMultiAllelic=None, skipReference=None, numPa
Calculates the Identity By State.
:type df: DataFrame
:param: df: Original dataframe
:param df: Original dataframe
:type samples: list<str>
:param: samples: List of samples to use for calculating the IBS
:type skipMultiAllelic: boolean
:param: skipMultiAllelic: Skip variants where any of the samples has a secondary alternate
:type skipReference: boolean
:param: skipReference: Skip variants where both samples of the pair are HOM_REF
:param samples: List of samples to use for calculating the IBS
:type skipMultiAllelic: bool
:param skipMultiAllelic: Skip variants where any of the samples has a secondary alternate
:type skipReference: bool
:param skipReference: Skip variants where both samples of the pair are HOM_REF
:type numPairs: int
:param: numPairs:
:param numPairs:
:rtype: DataFrame
:return: Transformed dataframe
"""
return IBSTransformer(samples=samples, skipReference=skipReference, skipMultiAllelic=skipMultiAllelic,
Expand All @@ -87,13 +122,28 @@ def imputeSex(self, df, lowerThreshold=None, upperThreshold=None, chromosomeX=No
"""
Estimate sex of the individuals calculating the inbreeding coefficients F on the chromosome X.
:param: df: Original dataframe
:type df: DataFrame
:param df: Original dataframe
:type lowerThreshold: float
:param lowerThreshold:
:type upperThreshold: float
:param upperThreshold:
:type chromosomeX: str
:param chromosomeX:
:type includePseudoautosomalRegions: bool
:param includePseudoautosomalRegions:
:type par1chrX: str
:param par1chrX:
:type par2chrX: str
:param par2chrX:
:rtype: DataFrame
:return: Transformed dataframe
"""
return ImputeSexTransformer(lowerThreshold=lowerThreshold, upperThreshold=upperThreshold, chromosomeX=chromosomeX,
Expand All @@ -111,10 +161,19 @@ def inbreedingCoefficient(self, df, missingGenotypesAsHomRef=None, includeMultiA
([observed hom. count] - [expected count]) / ([total genotypes count] - [expected count])
Unless otherwise specified, the genotype counts will exclude the missing and multi-allelic genotypes.
:type df: DataFrame
:param df: Original dataframe
:type missingGenotypesAsHomRef: bool
:param missingGenotypesAsHomRef: Treat missing genotypes as HomRef genotypes
:type includeMultiAllelicGenotypes: bool
:param includeMultiAllelicGenotypes: Include multi-allelic variants in the calculation
:type mafThreshold: float
:param mafThreshold: Include multi-allelic variants in the calculation
:rtype: DataFrame
:return: Transformed dataframe
"""
return InbreedingCoefficientTransformer(missingGenotypesAsHomRef=missingGenotypesAsHomRef,
Expand All @@ -125,11 +184,23 @@ def mendel(self, df, father, mother, child, studyId=None):
Using Plink Mendel error codes
https://www.cog-genomics.org/plink2/basic_stats#mendel
:type df: DataFrame
:param df: Original dataframe
:type father: str
:param father:
:type mother: str
:param mother:
:type child: str
:param child:
:type studyId: str
:param studyId:
:rtype: DataFrame
:return: Transformed dataframe
"""
return MendelianErrorTransformer(father=father, mother=mother, child=child, studyId=studyId).transform(df)
Expand All @@ -151,14 +222,29 @@ def modeOfInheritance(self, df, family, modeOfInheritance, phenotype, studyId=No
- xLinked
- yLinked
:type df: DataFrame
:param df: Original dataframe
:type family: str
:param family: Select family to apply the filter
:type modeOfInheritance: str
:param modeOfInheritance: Filter by mode of inheritance from a given family. Accepted values: monoallelic (dominant),
biallelic (recessive), xLinkedMonoallelic, xLinkedBiallelic, yLinked"
:type phenotype: str
:param phenotype:
:type studyId: str
:param studyId:
:type incompletePenetrance: bool
:param incompletePenetrance: Allow variants with an incomplete penetrance mode of inheritance
:type missingAsReference: bool
:param missingAsReference:
:rtype: DataFrame
:return: Transformed dataframe
"""
return ModeOfInheritanceTransformer(family=family, modeOfInheritance=modeOfInheritance, phenotype=phenotype, studyId=studyId,
Expand All @@ -174,12 +260,23 @@ def tdt(self, df, studyId, phenotype):
def stats(self, df, studyId=None, cohort=None, samples=None, missingAsReference=None):
    """
    Apply the variant stats transformer to the given dataframe.

    :type df: DataFrame
    :param df: Original dataframe
    :type studyId: str
    :param studyId: Study identifier
    :type cohort: str
    :param cohort: Name of the cohort to calculate stats from. By default, 'ALL'
    :type samples: list<str>
    :param samples: Samples belonging to the cohort. If empty, will try to read from metadata. If missing, will use all samples
    from the dataset
    :type missingAsReference: bool
    :param missingAsReference: Count missing alleles as reference alleles
    :rtype: DataFrame
    :return: Transformed dataframe
    """
    transformer = VariantStatsTransformer(
        studyId=studyId,
        cohort=cohort,
        samples=samples,
        missingAsReference=missingAsReference,
    )
    return transformer.transform(df)
Expand All @@ -204,10 +301,11 @@ def getMetadataPath(self, path):

def readMetadata(self, meta_path):
"""
Writes the VariantMetadata into the schema metadata from the given dataset.
Writes the VariantMetadata into the schema metadata from the given dataframe.
:type meta_path: str
:param meta_path: Path to the metadata file
:rtype: dict
:return: An instance of VariantMetadata
"""
Expand All @@ -216,12 +314,14 @@ def readMetadata(self, meta_path):

def setVariantMetadata(self, df, variant_metadata):
"""
Writes the VariantMetadata into the schema metadata from the given dataset.
Writes the VariantMetadata into the schema metadata from the given dataframe.
:type df: DataFrame
:param df: DataFrame to modify
:type variant_metadata: VariantMetadata
:param variant_metadata: VariantMetadata to set
:rtype: DataFrame
:return: Modified DataFrame
"""
Expand Down
27 changes: 19 additions & 8 deletions oskar-spark/src/main/python/pyoskar/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,14 @@ def protein_substitution(annotation, score):
"""
Returns an array with the MIN and the MAX value of the given ProteinSubstitutionScore. Empty array if not found.
:param annotation:
:type annotation: str
:param annotation: Annotation field
:type score: str
:param score:
:return: max an min array for protein substitution score
:rtype:
:return:
"""
jc = VariantUdfManager._java_class().protein_substitution(_to_java_column(annotation), score)
return Column(jc)
Expand All @@ -135,10 +140,13 @@ def functional(annotation, source):
Read the value for the Functional Score. Null if none. Main functional scores are: cadd_scaled and cadd_raw.
:type annotation: str
:param annotation: annotation field
:param annotation: Annotation field
:type source: str
:param source: study source
:return: functional score
:param source: Study source
:rtype:
:return: Functional score
"""

jc = VariantUdfManager._java_class().functional(_to_java_column(annotation), source)
Expand All @@ -155,10 +163,13 @@ def conservation(annotation, source):
Read the value for the Conservation Score. Null if none. Main conservation scores are: gerp, phastCons and phylop
:type annotation: str
:param annotation: annotation field
:param annotation: Annotation field
:type source: str
:param source: study source
:return: conservation score
:param source: Study source
:rtype:
:return: Conservation score
"""
jc = VariantUdfManager._java_class().conservation(_to_java_column(annotation), source)
return Column(jc)
Loading

0 comments on commit ab5fa85

Please sign in to comment.