From ab5fa859b647b36fea0faa1048697de0ee0684a1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sergio=20Rold=C3=A1n?=
Date: Thu, 29 Nov 2018 18:55:49 +0000
Subject: [PATCH] spark: add comments to binding #1

---
 .../src/main/python/pyoskar/analysis.py       |  19 +--
 oskar-spark/src/main/python/pyoskar/core.py   | 132 +++++++++++++++---
 oskar-spark/src/main/python/pyoskar/sql.py    |  27 ++--
 .../test/python/pyoskar_tests/test_utils.py   |  43 +++---
 4 files changed, 162 insertions(+), 59 deletions(-)

diff --git a/oskar-spark/src/main/python/pyoskar/analysis.py b/oskar-spark/src/main/python/pyoskar/analysis.py
index 560f0fa..8619591 100644
--- a/oskar-spark/src/main/python/pyoskar/analysis.py
+++ b/oskar-spark/src/main/python/pyoskar/analysis.py
@@ -1,20 +1,13 @@
 import sys
+from pyspark import keyword_only
+from pyspark.ml.param.shared import *
+from pyspark.ml.util import JavaMLReadable, JavaMLWritable
+from pyspark.ml.wrapper import JavaTransformer
 
 if sys.version > '3':
     basestring = str
 
-from pyspark import since, keyword_only, SparkContext
-from pyspark.rdd import ignore_unicode_prefix
-from pyspark.ml.linalg import _convert_to_vector
-from pyspark.ml.wrapper import JavaWrapper
-from pyspark.ml.param.shared import *
-from pyspark.ml.util import JavaMLReadable, JavaMLWritable
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaTransformer, _jvm
-from pyspark.ml.common import inherit_doc
-from pyspark.sql.functions import lit
-from pyspark.sql import DataFrame
-
 DEFAULT_COHORT = "ALL"
 
@@ -453,10 +446,6 @@ def setMissingAsReference(self, value):
         return self._set(missingAsReference=value)
 
-
-
-
-
 class TdtTransformer(AbstractTransformer):
     studyId = Param(Params._dummy(), "studyId", "", typeConverter=TypeConverters.toString)
     phenotype = Param(Params._dummy(), "phenotype", "", typeConverter=TypeConverters.toString)
diff --git a/oskar-spark/src/main/python/pyoskar/core.py b/oskar-spark/src/main/python/pyoskar/core.py
index 42afcfb..e6b072a 100644
--- a/oskar-spark/src/main/python/pyoskar/core.py
+++ b/oskar-spark/src/main/python/pyoskar/core.py
@@ -1,5 +1,8 @@
 import json
+
+from pyspark.ml.wrapper import JavaWrapper
 from pyspark.sql.dataframe import DataFrame
+
 from pyoskar.analysis import *
 
 __all__ = ['Oskar']
@@ -16,7 +19,9 @@ def __init__(self, spark):
 
     def load(self, file_path):
         """
         :type file_path: str
+        :param file_path: Path of the file to load
+
+        :rtype: DataFrame
+        :return: Loaded dataframe
         """
         df = self._call_java("load", file_path)
         return df
@@ -24,14 +29,23 @@ def chiSquare(self, df, studyId, phenotype):
         """
         :type df: DataFrame
+        :param df: Original dataframe
+
+        :type studyId: str
+        :param studyId:
+
+        :type phenotype: str
+        :param phenotype:
+
+        :rtype: DataFrame
+        :return: Transformed dataframe
         """
         return ChiSquareTransformer(studyId=studyId, phenotype=phenotype).transform(df)
 
     def compoundHeterozygote(self, df, father, mother, child, studyId=None, missingGenotypeAsReference=None):
         """
         :type df: DataFrame
+        :param df: Original dataframe
+
+        :type father: str
+        :param father:
+
+        :type mother: str
+        :param mother:
+
+        :type child: str
+        :param child:
+
+        :type studyId: str
+        :param studyId:
+
+        :type missingGenotypeAsReference: bool
+        :param missingGenotypeAsReference:
+
+        :rtype: DataFrame
+        :return: Transformed dataframe
         """
         return CompoundHeterozigoteTransformer(father=father, mother=mother, child=child, studyId=studyId,
                                                missingGenotypeAsReference=missingGenotypeAsReference).transform(df)
 
@@ -39,14 +53,19 @@ def facet(self, df, facet):
         """
         :type df: DataFrame
+        :param df: Original dataframe
+
+        :type facet: str
+        :param facet:
+
+        :rtype: DataFrame
+        :return: Transformed dataframe
         """
         return FacetTransformer(facet=facet).transform(df)
 
     def fisher(self, df, studyId, phenotype):
         """
         :type df: DataFrame
+        :param df: Original dataframe
+
+        :type studyId: str
+        :param studyId:
+
+        :type phenotype: str
+        :param phenotype:
+
+        :rtype: DataFrame
+        :return: Transformed dataframe
         """
         return FisherTransformer(studyId=studyId,
                                  phenotype=phenotype).transform(df)
 
@@ -54,13 +73,23 @@ def hardyWeinberg(self, df, studyId=None):
         """
         :type df: DataFrame
+        :param df: Original dataframe
+
+        :type studyId: str
+        :param studyId:
+
+        :rtype: DataFrame
+        :return: Transformed dataframe
         """
         return HardyWeinbergTransformer(studyId=studyId).transform(df)
 
     def histogram(self, df, inputCol, step=None):
         """
         :type df: DataFrame
+        :param df: Original dataframe
+
+        :type inputCol: str
+        :param inputCol:
+
+        :param step:
+
+        :rtype: DataFrame
+        :return: Transformed dataframe
         """
         return HistogramTransformer(inputCol=inputCol, step=step).transform(df)
 
@@ -69,15 +98,21 @@ def ibs(self, df, samples=None, skipMultiAllelic=None, skipReference=None, numPa
         Calculates the Identity By State.
 
         :type df: DataFrame
-        :param: df: Original dataframe
+        :param df: Original dataframe
+
         :type samples: list
-        :param: samples: List of samples to use for calculating the IBS
-        :type skipMultiAllelic: boolean
-        :param: skipMultiAllelic: Skip variants where any of the samples has a secondary alternate
-        :type skipReference: boolean
-        :param: skipReference: Skip variants where both samples of the pair are HOM_REF
+        :param samples: List of samples to use for calculating the IBS
+
+        :type skipMultiAllelic: bool
+        :param skipMultiAllelic: Skip variants where any of the samples has a secondary alternate
+
+        :type skipReference: bool
+        :param skipReference: Skip variants where both samples of the pair are HOM_REF
+
         :type numPairs: int
-        :param: numPairs:
+        :param numPairs:
+
+        :rtype: DataFrame
         :return: Transformed dataframe
         """
         return IBSTransformer(samples=samples, skipReference=skipReference, skipMultiAllelic=skipMultiAllelic,
@@ -87,13 +122,28 @@ def imputeSex(self, df, lowerThreshold=None, upperThreshold=None, chromosomeX=No
         """
         Estimate sex of the individuals calculating the inbreeding coefficients F on the chromosome X.
 
-        :param: df: Original dataframe
+        :type df: DataFrame
+        :param df: Original dataframe
+
+        :type lowerThreshold: float
         :param lowerThreshold:
+
+        :type upperThreshold: float
         :param upperThreshold:
+
+        :type chromosomeX: str
         :param chromosomeX:
+
+        :type includePseudoautosomalRegions: bool
         :param includePseudoautosomalRegions:
+
+        :type par1chrX: str
         :param par1chrX:
+
+        :type par2chrX: str
         :param par2chrX:
+
+        :rtype: DataFrame
         :return: Transformed dataframe
         """
         return ImputeSexTransformer(lowerThreshold=lowerThreshold, upperThreshold=upperThreshold, chromosomeX=chromosomeX,
@@ -111,10 +161,19 @@ def inbreedingCoefficient(self, df, missingGenotypesAsHomRef=None, includeMultiA
 
             ([observed hom. count] - [expected count]) / ([total genotypes count] - [expected count])
 
         Unless otherwise specified, the genotype counts will exclude the missing and multi-allelic genotypes.
 
+        :type df: DataFrame
         :param df: Original dataframe
+
+        :type missingGenotypesAsHomRef: bool
         :param missingGenotypesAsHomRef: Treat missing genotypes as HomRef genotypes
+
+        :type includeMultiAllelicGenotypes: bool
         :param includeMultiAllelicGenotypes: Include multi-allelic variants in the calculation
+
-        :param mafThreshold: Include multi-allelic variants in the calculation
+        :type mafThreshold: float
+        :param mafThreshold: Minor allele frequency threshold
+
+        :rtype: DataFrame
         :return: Transformed dataframe
         """
         return InbreedingCoefficientTransformer(missingGenotypesAsHomRef=missingGenotypesAsHomRef,
@@ -125,11 +184,23 @@ def mendel(self, df, father, mother, child, studyId=None):
         """
         Using Plink Mendel error codes
         https://www.cog-genomics.org/plink2/basic_stats#mendel
 
+        :type df: DataFrame
         :param df: Original dataframe
+
+        :type father: str
         :param father:
+
+        :type mother: str
         :param mother:
+
+        :type child: str
         :param child:
+
+        :type studyId: str
         :param studyId:
+
+        :rtype: DataFrame
         :return: Transformed dataframe
         """
         return MendelianErrorTransformer(father=father, mother=mother, child=child, studyId=studyId).transform(df)
@@ -151,14 +222,29 @@ def modeOfInheritance(self, df, family, modeOfInheritance, phenotype, studyId=No
 
         - xLinked
         - yLinked
 
+        :type df: DataFrame
         :param df: Original dataframe
+
+        :type family: str
         :param family: Select family to apply the filter
+
+        :type modeOfInheritance: str
         :param modeOfInheritance: Filter by mode of inheritance from a given family. Accepted values: monoallelic (dominant),
             biallelic (recessive), xLinkedMonoallelic, xLinkedBiallelic, yLinked
+
+        :type phenotype: str
         :param phenotype:
+
+        :type studyId: str
         :param studyId:
+
+        :type incompletePenetrance: bool
         :param incompletePenetrance: Allow variants with an incomplete penetrance mode of inheritance
+
+        :type missingAsReference: bool
         :param missingAsReference:
+
+        :rtype: DataFrame
         :return: Transformed dataframe
         """
         return ModeOfInheritanceTransformer(family=family, modeOfInheritance=modeOfInheritance, phenotype=phenotype, studyId=studyId,
@@ -174,12 +260,23 @@ def tdt(self, df, studyId, phenotype):
 
     def stats(self, df, studyId=None, cohort=None, samples=None, missingAsReference=None):
         """
+        :type df: DataFrame
         :param df: Original dataframe
+
+        :type studyId: str
         :param studyId:
+
+        :type cohort: str
         :param cohort: Name of the cohort to calculate stats from. By default, 'ALL'
+
+        :type samples: list
         :param samples: Samples belonging to the cohort. If empty, will try to read from metadata. If missing, will use all samples from the dataset
+
+        :type missingAsReference: bool
         :param missingAsReference: Count missing alleles as reference alleles
+
+        :rtype: DataFrame
         :return: Transformed dataframe
         """
         return VariantStatsTransformer(studyId=studyId, cohort=cohort, samples=samples, missingAsReference=missingAsReference).transform(df)
@@ -204,10 +301,11 @@ def getMetadataPath(self, path):
 
     def readMetadata(self, meta_path):
         """
-        Writes the VariantMetadata into the schema metadata from the given dataset.
+        Reads the VariantMetadata from the given metadata file.
 
         :type meta_path: str
         :param meta_path: Path to the metadata file
+
         :rtype: dict
         :return: An instance of VariantMetadata
         """
@@ -216,12 +314,14 @@ def setVariantMetadata(self, df, variant_metadata):
         """
-        Writes the VariantMetadata into the schema metadata from the given dataset.
+        Writes the VariantMetadata into the schema metadata from the given dataframe.
 
         :type df: DataFrame
         :param df: DataFrame to modify
+
         :type variant_metadata: VariantMetadata
         :param variant_metadata: VariantMetadata to set
+
         :rtype: DataFrame
         :return: Modified DataFrame
         """
diff --git a/oskar-spark/src/main/python/pyoskar/sql.py b/oskar-spark/src/main/python/pyoskar/sql.py
index 1c3c13c..ab8016c 100644
--- a/oskar-spark/src/main/python/pyoskar/sql.py
+++ b/oskar-spark/src/main/python/pyoskar/sql.py
@@ -107,9 +107,14 @@ def protein_substitution(annotation, score):
     """
     Returns an array with the MIN and the MAX value of the given ProteinSubstitutionScore. Empty array if not found.
 
-    :param annotation:
+    :type annotation: str
+    :param annotation: Annotation field
+
+    :type score: str
     :param score:
-    :return: max an min array for protein substitution score
+
+    :rtype: Column
+    :return: Array with the min and max values for the protein substitution score
     """
     jc = VariantUdfManager._java_class().protein_substitution(_to_java_column(annotation), score)
     return Column(jc)
@@ -135,10 +140,13 @@ def functional(annotation, source):
     Read the value for the Functional Score. Null if none. Main functional scores are: cadd_scaled and cadd_raw.
 
     :type annotation: str
-    :param annotation: annotation field
+    :param annotation: Annotation field
+
     :type source: str
-    :param source: study source
-    :return: functional score
+    :param source: Study source
+
+    :rtype: Column
+    :return: Functional score
     """
 
     jc = VariantUdfManager._java_class().functional(_to_java_column(annotation), source)
@@ -155,10 +163,13 @@ def conservation(annotation, source):
     Read the value for the Conservation Score. Null if none. Main conservation scores are: gerp, phastCons and phylop
 
     :type annotation: str
-    :param annotation: annotation field
+    :param annotation: Annotation field
+
     :type source: str
-    :param source: study source
-    :return: conservation score
+    :param source: Study source
+
+    :rtype: Column
+    :return: Conservation score
     """
     jc = VariantUdfManager._java_class().conservation(_to_java_column(annotation), source)
     return Column(jc)
diff --git a/oskar-spark/src/test/python/pyoskar_tests/test_utils.py b/oskar-spark/src/test/python/pyoskar_tests/test_utils.py
index 7d670ba..16b6f77 100644
--- a/oskar-spark/src/test/python/pyoskar_tests/test_utils.py
+++ b/oskar-spark/src/test/python/pyoskar_tests/test_utils.py
@@ -12,31 +12,34 @@
 
 def create_testing_pyspark_session():
+    jar = TARGET_PATH + "oskar-spark-0.1.0-jar-with-dependencies.jar"
+    if not path.exists(jar):
+        raise Exception("JAR file \"" + jar + "\" not found! Unable to start SparkSession")
+
     return (SparkSession.builder
             .master("local[*]")
             .appName("testing")
             .config("spark.ui.enabled", "false")
-            .config("spark.jars", TARGET_PATH + "oskar-spark-0.1.0-jar-with-dependencies.jar")
+            .config("spark.jars", jar)
             .getOrCreate())
 
-
 class TestOskarBase(TestCase):
-    spark = None  # type: SparkSession
-    oskar = None  # type: Oskar
-    df = None  # type: DataFrame
-
-    @classmethod
-    def setUpClass(cls):
-        cls.spark = create_testing_pyspark_session()
-        cls.oskar = Oskar(cls.spark)
-        cls.df = cls.oskar.load(PLATINUM_SMALL)
-
-    def setUp(self):
-        self.spark = self.__class__.spark
-        self.oskar = self.__class__.oskar
-        self.df = self.__class__.df
-
-    @classmethod
-    def tearDownClass(cls):
-        cls.spark.stop()
+    spark = None  # type: SparkSession
+    oskar = None  # type: Oskar
+    df = None  # type: DataFrame
+
+    @classmethod
+    def setUpClass(cls):
+        cls.spark = create_testing_pyspark_session()
+        cls.oskar = Oskar(cls.spark)
+        cls.df = cls.oskar.load(PLATINUM_SMALL)
+
+    def setUp(self):
+        self.spark = self.__class__.spark
+        self.oskar = self.__class__.oskar
+        self.df = self.__class__.df
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.spark.stop()
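
A rough sketch of how the core API documented above fits together once this patch lands. Everything below is illustrative: the JAR path, dataset path and app name are placeholders, not values from this patch.

    from pyspark.sql import SparkSession
    from pyoskar.core import Oskar

    # Placeholder paths: point spark.jars at the oskar fat JAR and load() at a
    # variant dataset previously written by oskar.
    spark = (SparkSession.builder
             .master("local[*]")
             .appName("oskar-example")
             .config("spark.jars", "target/oskar-spark-0.1.0-jar-with-dependencies.jar")
             .getOrCreate())

    oskar = Oskar(spark)
    df = oskar.load("data/platinum_small.parquet")

    # Cohort-wide variant stats; the cohort defaults to 'ALL' per the docstring.
    stats_df = oskar.stats(df)
    stats_df.show(5)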
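The trio-oriented helpers (mendel, compoundHeterozygote, modeOfInheritance) all take sample or family names and return a transformed dataframe. A hedged sketch; the sample IDs, family name and phenotype are assumptions, not values from this patch:

    # Assumed Platinum-style trio sample IDs; replace with samples present in the study.
    mendel_df = oskar.mendel(df, father="NA12877", mother="NA12878", child="NA12879")
    ch_df = oskar.compoundHeterozygote(df, father="NA12877", mother="NA12878", child="NA12879")
    moi_df = oskar.modeOfInheritance(df, family="FAM001", modeOfInheritance="biallelic",
                                     phenotype="myPhenotype")  # placeholder family/phenotype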
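Assuming the sql.py helpers shown in the hunks (protein_substitution, functional, conservation) are importable from pyoskar.sql and accept a column name string, selecting annotation-derived scores might look like this; the 'id' column and the 'sift' score name are assumptions:

    from pyoskar.sql import conservation, functional, protein_substitution

    scores = df.select(
        "id",  # assumed variant id column
        functional("annotation", "cadd_scaled").alias("cadd"),  # cadd_scaled / cadd_raw per the docstring
        conservation("annotation", "gerp").alias("gerp"),       # gerp / phastCons / phylop per the docstring
        protein_substitution("annotation", "sift").alias("sift"))  # assumed score name; returns [min, max]
    scores.show(5)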
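New test cases can build on TestOskarBase so the SparkSession, the Oskar instance and the loaded dataframe are created once per class. A minimal sketch; the assertion is an assumption about the stats transformer, not a documented guarantee:

    from pyoskar_tests.test_utils import TestOskarBase

    class TestVariantStats(TestOskarBase):
        def test_stats_preserves_rows(self):
            transformed = self.oskar.stats(self.df)
            # Assumption: stats annotates variants without adding or removing rows.
            self.assertEqual(self.df.count(), transformed.count())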