Skip to content

Commit

Permalink
analysis: implement variant stats analysis and spark/parquet analysis…
Browse files Browse the repository at this point in the history
… executor, #20
  • Loading branch information
jtarraga committed Sep 25, 2019
1 parent b831876 commit 6c671ea
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,32 @@
import org.opencb.oskar.analysis.exceptions.AnalysisException;

import java.nio.file.Path;
import java.util.List;

public class AbstractSampleStatsExecutor extends AbstractAnalysisExecutor {

protected List<String> sampleNames;
protected String cohortName;

public AbstractSampleStatsExecutor() {
}

public AbstractSampleStatsExecutor(ObjectMap executorParams, Path outDir) {
this.setup(executorParams, outDir);
public AbstractSampleStatsExecutor(List<String> sampleNames, ObjectMap executorParams, Path outDir) {
this.setup(sampleNames, executorParams, outDir);
}

public AbstractSampleStatsExecutor(String cohortName, ObjectMap executorParams, Path outDir) {
this.setup(sampleNames, executorParams, outDir);
}

protected void setup(List<String> sampleNames, ObjectMap executorParams, Path outDir) {
super.setup(executorParams, outDir);
this.sampleNames = sampleNames;
}

protected void setup(ObjectMap executorParams, Path outDir) {
protected void setup(String cohortName, ObjectMap executorParams, Path outDir) {
super.setup(executorParams, outDir);
this.cohortName = cohortName;
}

@Override
Expand All @@ -28,6 +42,8 @@ public AnalysisResult exec() throws AnalysisException {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("AbstractSampleStatsExecutor{");
sb.append("sampleNames=").append(sampleNames);
sb.append(", cohortName='").append(cohortName).append('\'');
sb.append(", executorParams=").append(executorParams);
sb.append(", outDir=").append(outDir);
sb.append('}');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@
import org.opencb.oskar.core.annotations.Analysis;

import java.nio.file.Path;
import java.util.List;

@Analysis(id = SampleStats.ID, data = Analysis.AnalysisData.VARIANT)
public class SampleStats extends AbstractAnalysis {

public static final String ID = "VARIANT_STATS";
public static final String ID = "SAMPLE_STATS";

private List<String> sampleNames;

public SampleStats(ObjectMap executorParams, Path outDir) {
public SampleStats(List<String> sampleNames, ObjectMap executorParams, Path outDir) {
super(executorParams, outDir);
this.sampleNames = sampleNames;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,31 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.opencb.biodata.models.variant.metadata.VariantSetStats;
import org.opencb.biodata.models.variant.stats.VariantSampleStats;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.oskar.analysis.AnalysisResult;
import org.opencb.oskar.analysis.exceptions.AnalysisException;
import org.opencb.oskar.analysis.variant.stats.AbstractSampleStatsExecutor;
import org.opencb.oskar.analysis.variant.stats.VariantStats;
import org.opencb.oskar.spark.commons.OskarException;
import org.opencb.oskar.spark.commons.converters.RowToAvroConverter;
import org.opencb.oskar.spark.variant.Oskar;
import org.opencb.oskar.spark.variant.analysis.transformers.VariantSetStatsTransformer;
import org.opencb.oskar.spark.variant.analysis.transformers.VariantSampleStatsTransformer;

import java.io.File;
import java.io.PrintWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SampleStatsSparkParquetAnalysisExecutor extends AbstractSampleStatsExecutor {

public SampleStatsSparkParquetAnalysisExecutor() {
}

public SampleStatsSparkParquetAnalysisExecutor(ObjectMap executorParams, Path outDir) {
super(executorParams, outDir);
public SampleStatsSparkParquetAnalysisExecutor(List<String> sampleNames, ObjectMap executorParams, Path outDir) {
super(sampleNames, executorParams, outDir);
}

@Override
Expand All @@ -46,7 +45,7 @@ public AnalysisResult exec() throws AnalysisException {
// Prepare input dataset from the input parquet file
SparkSession sparkSession = SparkSession.builder()
.master(master)
.appName("variant stats")
.appName("sample stats")
.config("spark.ui.enabled", "false")
.getOrCreate();

Expand All @@ -59,9 +58,11 @@ public AnalysisResult exec() throws AnalysisException {
}

// Call to the dataset transformer
VariantSetStatsTransformer transformer = new VariantSetStatsTransformer();
GenericRowWithSchema result = (GenericRowWithSchema) transformer.transform(inputDastaset).collectAsList().get(0);
VariantSetStats stats = RowToAvroConverter.convert(result, new VariantSetStats());
VariantSampleStatsTransformer transformer = new VariantSampleStatsTransformer();


Dataset<Row> outputDs = transformer.setSamples(sampleNames).transform(inputDastaset);
Map<String, VariantSampleStats> stats = VariantSampleStatsTransformer.toSampleStats(outputDs);

ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(MapperFeature.REQUIRE_SETTERS_FOR_GETTERS, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opencb.oskar.spark.OskarSparkTestUtils.*;

public class SampleStatsSparkParquetAnalysisExecutorTest {
private List<String> sampleNames;
private String cohort;
private ObjectMap executorParams;

Expand All @@ -27,11 +32,17 @@ public void init() throws IOException {
executorParams.put("STUDY_ID", OskarSparkTestUtils.PLATINUM_STUDY);
executorParams.put("MASTER", "local[*]");
executorParams.put("FILE", file.getAbsolutePath());

sampleNames = new ArrayList<>();
sampleNames.add(NA12877);
sampleNames.add(NA12879);
sampleNames.add(NA12885);
sampleNames.add(NA12890);
}

@Test
public void variantStats() throws IOException, AnalysisException {
SampleStatsSparkParquetAnalysisExecutor executor = new SampleStatsSparkParquetAnalysisExecutor(executorParams,
public void sampleStats() throws IOException, AnalysisException {
SampleStatsSparkParquetAnalysisExecutor executor = new SampleStatsSparkParquetAnalysisExecutor(sampleNames, executorParams,
oskarSparkTestUtils.getRootDir().toAbsolutePath());
AnalysisResult analysisResult = executor.exec();

Expand Down

0 comments on commit 6c671ea

Please sign in to comment.