forked from ilastik/ilastik
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
H5N5 writing: Refactor test - readability, disentangle test cases (il…
- Loading branch information
Showing
1 changed file
with
137 additions
and
210 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,7 @@ | ||
from builtins import object | ||
|
||
############################################################################### | ||
# lazyflow: data flow based lazy parallel computation framework | ||
# | ||
# Copyright (C) 2011-2014, the ilastik developers | ||
# Copyright (C) 2011-2024, the ilastik developers | ||
# <[email protected]> | ||
# | ||
# This program is free software; you can redistribute it and/or | ||
|
@@ -21,215 +19,144 @@ | |
# This information is also available on the ilastik web site at: | ||
# http://ilastik.org/license/ | ||
############################################################################### | ||
from lazyflow.operators.opArrayPiper import OpArrayPiper | ||
from lazyflow.operators.ioOperators import OpH5N5WriterBigDataset | ||
from shutil import rmtree | ||
from typing import Union | ||
|
||
import h5py | ||
import numpy | ||
import pytest | ||
import vigra | ||
import h5py | ||
import z5py | ||
import os | ||
|
||
import lazyflow.graph | ||
from lazyflow.operators.ioOperators import OpH5N5WriterBigDataset | ||
from lazyflow.operators.opArrayPiper import OpArrayPiper | ||
|
||
|
||
import logging | ||
|
||
logger = logging.getLogger("tests.testOpH5N5WriterBigDataset") | ||
cacheLogger = logging.getLogger("lazyflow.operators.ioOperators.ioOperators.OpH5N5WriterBigDataset") | ||
requesterLogger = logging.getLogger("lazyflow.utility.bigRequestStreamer") | ||
|
||
|
||
class TestOpH5N5WriterBigDataset(object): | ||
def setup_method(self, method): | ||
self.graph = lazyflow.graph.Graph() | ||
self.testDataH5FileName = "bigH5TestData.h5" | ||
self.testDataN5FileName = "bigN5TestData.n5" | ||
self.datasetInternalPath = "volume/data" | ||
|
||
# Generate some test data | ||
self.dataShape = (1, 10, 128, 128, 1) | ||
self.testData = vigra.VigraArray(self.dataShape, axistags=vigra.defaultAxistags("txyzc"), order="C") | ||
self.testData[...] = numpy.indices(self.dataShape).sum(0) | ||
|
||
def teardown_method(self, method): | ||
# Clean up: Delete the test file. | ||
try: | ||
os.remove(self.testDataH5FileName) | ||
rmtree(self.testDataN5FileName) | ||
except: | ||
pass | ||
|
||
def test_Writer(self): | ||
# Create the h5 file | ||
hdf5File = h5py.File(self.testDataH5FileName, "w") | ||
n5File = z5py.N5File(self.testDataN5FileName, "w") | ||
|
||
opPiper = OpArrayPiper(graph=self.graph) | ||
opPiper.Input.setValue(self.testData) | ||
|
||
h5_opWriter = OpH5N5WriterBigDataset(graph=self.graph) | ||
n5_opWriter = OpH5N5WriterBigDataset(graph=self.graph) | ||
h5_opWriter.h5N5File.setValue(hdf5File) | ||
n5_opWriter.h5N5File.setValue(n5File) | ||
h5_opWriter.h5N5Path.setValue(self.datasetInternalPath) | ||
n5_opWriter.h5N5Path.setValue(self.datasetInternalPath) | ||
h5_opWriter.Image.connect(opPiper.Output) | ||
n5_opWriter.Image.connect(opPiper.Output) | ||
|
||
# Force the operator to execute by asking for the output (a bool) | ||
h5_success = h5_opWriter.WriteImage.value | ||
n5_success = n5_opWriter.WriteImage.value | ||
|
||
assert h5_success | ||
assert n5_success | ||
|
||
hdf5File.close() | ||
n5File.close() | ||
|
||
# Check the file. | ||
hdf5File = h5py.File(self.testDataH5FileName, "r") | ||
n5File = z5py.N5File(self.testDataN5FileName, "r") | ||
h5_dataset = hdf5File[self.datasetInternalPath] | ||
n5_dataset = n5File[self.datasetInternalPath] | ||
assert h5_dataset.shape == self.dataShape | ||
assert n5_dataset.shape == self.dataShape | ||
assert (numpy.all(h5_dataset[...] == self.testData.view(numpy.ndarray)[...])).all() | ||
assert (numpy.all(n5_dataset[...] == self.testData.view(numpy.ndarray)[...])).all() | ||
hdf5File.close() | ||
n5File.close() | ||
|
||
|
||
class TestOpH5N5WriterBigDataset_2(object): | ||
def setup_method(self, method): | ||
self.graph = lazyflow.graph.Graph() | ||
self.testDataH5FileName = "bigH5TestData.h5" | ||
self.testDataN5FileName = "bigH5TestData.n5" | ||
self.datasetInternalPath = "volume/data" | ||
|
||
# Generate some test data | ||
self.dataShape = (1, 10, 128, 128, 1) | ||
self.testData = vigra.VigraArray( | ||
self.dataShape, axistags=vigra.defaultAxistags("txyzc") | ||
) # default vigra order this time... | ||
self.testData[...] = numpy.indices(self.dataShape).sum(0) | ||
|
||
def teardown_method(self, method): | ||
# Clean up: Delete the test file. | ||
try: | ||
os.remove(self.testDataH5FileName) | ||
rmtree(self.testDataN5FileName) | ||
except: | ||
pass | ||
|
||
def test_Writer(self): | ||
|
||
# Create the h5 file | ||
hdf5File = h5py.File(self.testDataH5FileName, "w") | ||
n5File = z5py.N5File(self.testDataN5FileName, "w") | ||
|
||
opPiper = OpArrayPiper(graph=self.graph) | ||
opPiper.Input.setValue(self.testData) | ||
|
||
h5_opWriter = OpH5N5WriterBigDataset(graph=self.graph) | ||
n5_opWriter = OpH5N5WriterBigDataset(graph=self.graph) | ||
|
||
# This checks that you can give a preexisting group as the file | ||
h5_g = hdf5File.create_group("volume") | ||
n5_g = n5File.create_group("volume") | ||
h5_opWriter.h5N5File.setValue(h5_g) | ||
n5_opWriter.h5N5File.setValue(n5_g) | ||
h5_opWriter.h5N5Path.setValue("data") | ||
n5_opWriter.h5N5Path.setValue("data") | ||
h5_opWriter.Image.connect(opPiper.Output) | ||
n5_opWriter.Image.connect(opPiper.Output) | ||
|
||
# Force the operator to execute by asking for the output (a bool) | ||
h5_success = h5_opWriter.WriteImage.value | ||
n5_success = n5_opWriter.WriteImage.value | ||
assert h5_success | ||
assert n5_success | ||
|
||
hdf5File.close() | ||
n5File.close() | ||
|
||
# Check the file. | ||
hdf5File = h5py.File(self.testDataH5FileName, "r") | ||
n5File = h5py.File(self.testDataH5FileName, "r") | ||
h5_dataset = hdf5File[self.datasetInternalPath] | ||
n5_dataset = n5File[self.datasetInternalPath] | ||
assert h5_dataset.shape == self.dataShape | ||
assert n5_dataset.shape == self.dataShape | ||
assert (numpy.all(h5_dataset[...] == self.testData.view(numpy.ndarray)[...])).all() | ||
assert (numpy.all(n5_dataset[...] == self.testData.view(numpy.ndarray)[...])).all() | ||
hdf5File.close() | ||
n5File.close() | ||
|
||
|
||
class TestOpH5N5WriterBigDataset_3(object): | ||
def setup_method(self, method): | ||
self.graph = lazyflow.graph.Graph() | ||
self.testDataH5FileName = "bigH5TestData.h5" | ||
self.testDataN5FileName = "bigH5TestData.n5" | ||
|
||
self.datasetInternalPath = "volume/data" | ||
|
||
# Generate some test data | ||
self.dataShape = (1, 10, 128, 128, 1) | ||
self.testData = vigra.VigraArray( | ||
self.dataShape, axistags=vigra.defaultAxistags("txyzc") | ||
) # default vigra order this time... | ||
self.testData[...] = numpy.indices(self.dataShape).sum(0) | ||
|
||
def teardown_method(self, method): | ||
# Clean up: Delete the test file. | ||
try: | ||
os.remove(self.testDataH5FileName) | ||
rmtree(self.testDataN5FileName) | ||
except: | ||
pass | ||
|
||
def test_Writer(self): | ||
# Create the h5 file | ||
hdf5File = h5py.File(self.testDataH5FileName, "w") | ||
n5File = z5py.N5File(self.testDataN5FileName, "w") | ||
|
||
opPiper = OpArrayPiper(graph=self.graph) | ||
opPiper.Input.setValue(self.testData) | ||
|
||
# Force extra metadata onto the output | ||
opPiper.Output.meta.ideal_blockshape = (1, 1, 0, 0, 1) | ||
# Pretend the RAM usage will be really high to force lots of tiny blocks | ||
opPiper.Output.meta.ram_usage_per_requested_pixel = 1000000.0 | ||
|
||
h5_opWriter = OpH5N5WriterBigDataset(graph=self.graph) | ||
n5_opWriter = OpH5N5WriterBigDataset(graph=self.graph) | ||
|
||
# This checks that you can give a preexisting group as the file | ||
h5_g = hdf5File.create_group("volume") | ||
n5_g = n5File.create_group("volume") | ||
h5_opWriter.h5N5File.setValue(h5_g) | ||
n5_opWriter.h5N5File.setValue(n5_g) | ||
h5_opWriter.h5N5Path.setValue("data") | ||
n5_opWriter.h5N5Path.setValue("data") | ||
h5_opWriter.Image.connect(opPiper.Output) | ||
n5_opWriter.Image.connect(opPiper.Output) | ||
|
||
# Force the operator to execute by asking for the output (a bool) | ||
h5_success = h5_opWriter.WriteImage.value | ||
n5_success = n5_opWriter.WriteImage.value | ||
assert h5_success | ||
assert n5_success | ||
|
||
hdf5File.close() | ||
n5File.close() | ||
|
||
# Check the file. | ||
hdf5File = h5py.File(self.testDataH5FileName, "r") | ||
n5File = h5py.File(self.testDataH5FileName, "r") | ||
h5_dataset = hdf5File[self.datasetInternalPath] | ||
n5_dataset = n5File[self.datasetInternalPath] | ||
assert h5_dataset.shape == self.dataShape | ||
assert n5_dataset.shape == self.dataShape | ||
assert (numpy.all(h5_dataset[...] == self.testData.view(numpy.ndarray)[...])).all() | ||
assert (numpy.all(n5_dataset[...] == self.testData.view(numpy.ndarray)[...])).all() | ||
hdf5File.close() | ||
n5File.close() | ||
def setup_writer( | ||
graph: lazyflow.graph.Graph, | ||
file: Union[h5py.File, z5py.N5File, h5py.Group, z5py.Group], | ||
internal_path: str, | ||
source_slot: lazyflow.slot.OutputSlot, | ||
) -> OpH5N5WriterBigDataset: | ||
opWriter = OpH5N5WriterBigDataset(graph=graph) | ||
opWriter.h5N5File.setValue(file) | ||
opWriter.h5N5Path.setValue(internal_path) | ||
opWriter.Image.connect(source_slot) | ||
return opWriter | ||
|
||
|
||
@pytest.fixture | ||
def test_data_order_c(): | ||
dataShape = (1, 10, 128, 128, 1) | ||
testData = vigra.VigraArray(dataShape, axistags=vigra.defaultAxistags("txyzc"), order="C") | ||
testData[...] = numpy.indices(dataShape).sum(0) | ||
return testData | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"file_ext, file_class", | ||
[ | ||
("h5", h5py.File), | ||
("n5", z5py.N5File), | ||
], | ||
) | ||
def test_write_nested_internal_path_with_order_c(tmp_path, graph, test_data_order_c, file_ext, file_class): | ||
file_path = tmp_path / f"test.{file_ext}" | ||
|
||
opPiper = OpArrayPiper(graph=graph) | ||
opPiper.Input.setValue(test_data_order_c) | ||
file = file_class(file_path, "w") | ||
internal_path = "volume/data" | ||
opWriter = setup_writer(graph, file, internal_path, opPiper.Output) | ||
|
||
try: | ||
assert opWriter.WriteImage.value # Trigger write | ||
finally: | ||
file.close() | ||
del file | ||
|
||
file = file_class(file_path, "r") | ||
dataset = file[internal_path] | ||
|
||
try: | ||
assert dataset.shape == test_data_order_c.shape | ||
numpy.testing.assert_array_equal(dataset[...], test_data_order_c.view(numpy.ndarray)[...]) | ||
finally: | ||
file.close() | ||
|
||
|
||
@pytest.fixture | ||
def test_data_default_order(): | ||
dataShape = (1, 10, 128, 128, 1) | ||
testData = vigra.VigraArray(dataShape, axistags=vigra.defaultAxistags("txyzc")) # default vigra order this time... | ||
testData[...] = numpy.indices(dataShape).sum(0) | ||
return testData | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"file_ext, file_class", | ||
[ | ||
("h5", h5py.File), | ||
("n5", z5py.N5File), | ||
], | ||
) | ||
def test_write_to_group_default_order(tmp_path, graph, test_data_default_order, file_ext, file_class): | ||
opPiper = OpArrayPiper(graph=graph) | ||
opPiper.Input.setValue(test_data_default_order) | ||
|
||
file_path = tmp_path / f"test.{file_ext}" | ||
file = file_class(file_path, "w") | ||
g = file.create_group("volume") | ||
opWriter = setup_writer(graph, g, "data", opPiper.Output) | ||
|
||
try: | ||
assert opWriter.WriteImage.value # Trigger write | ||
finally: | ||
file.close() | ||
del file | ||
|
||
file = file_class(file_path, "r") | ||
dataset = file["volume/data"] | ||
|
||
try: | ||
assert dataset.shape == test_data_default_order.shape | ||
numpy.testing.assert_array_equal(dataset[...], test_data_default_order.view(numpy.ndarray)[...]) | ||
finally: | ||
file.close() | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"file_ext, file_class", | ||
[ | ||
("h5", h5py.File), | ||
("n5", z5py.N5File), | ||
], | ||
) | ||
def test_write_to_group_with_extra_meta(tmp_path, graph, test_data_default_order, file_ext, file_class): | ||
opPiper = OpArrayPiper(graph=graph) | ||
opPiper.Input.setValue(test_data_default_order) | ||
|
||
# Force extra metadata onto the output | ||
opPiper.Output.meta.ideal_blockshape = (1, 1, 0, 0, 1) | ||
# Pretend the RAM usage will be really high to force lots of tiny blocks | ||
opPiper.Output.meta.ram_usage_per_requested_pixel = 1000000.0 | ||
|
||
file_path = tmp_path / f"test.{file_ext}" | ||
file = file_class(file_path, "w") | ||
g = file.create_group("volume") | ||
opWriter = setup_writer(graph, g, "data", opPiper.Output) | ||
|
||
try: | ||
assert opWriter.WriteImage.value # Trigger write | ||
finally: | ||
file.close() | ||
del file | ||
|
||
file = file_class(file_path, "r") | ||
dataset = file["volume/data"] | ||
|
||
try: | ||
assert dataset.shape == test_data_default_order.shape | ||
numpy.testing.assert_array_equal(dataset[...], test_data_default_order.view(numpy.ndarray)[...]) | ||
finally: | ||
file.close() |