Skip to content

Commit 7548aff

Browse files
committed
Refactor merging module and improve temporary directory handling
1 parent 2c47e9b commit 7548aff

File tree

2 files changed

+18
-9
lines changed

2 files changed

+18
-9
lines changed

zcollection/merging/__init__.py

+16-7
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,20 @@
99
from __future__ import annotations
1010

1111
from typing import Protocol
12-
import random
12+
import hashlib
1313
import shutil
1414

1515
import fsspec
1616
import fsspec.implementations.local
1717
import zarr.storage
1818

19+
from zcollection import fs_utils
20+
1921
from .. import dataset, storage, sync
2022
from .time_series import merge_time_series
2123

2224
__all__ = ('MergeCallable', 'perform', 'merge_time_series')
2325

24-
#: Character set used to create a temporary directory.
25-
CHARACTERS = 'abcdefghijklmnopqrstuvwxyz0123456789_'
26-
2726

2827
#: pylint: disable=too-few-public-methods,duplicate-code
2928
class MergeCallable(Protocol):
@@ -88,6 +87,12 @@ def _rename(
8887
fs.mv(source, dest, recursive=True)
8988

9089

90+
def _extract_root_dirname(dirname: str, sep: str) -> str:
91+
"""Extracts the root directory name from a partition name."""
92+
parts = filter(lambda x: '=' not in x, dirname.split(sep))
93+
return sep.join(parts)
94+
95+
9196
def _update_fs(
9297
dirname: str,
9398
zds: dataset.Dataset,
@@ -103,9 +108,13 @@ def _update_fs(
103108
fs: The file system that the partition is stored on.
104109
synchronizer: The instance handling access to critical resources.
105110
"""
106-
# Name of the temporary directory.
107-
temp: str = dirname + '.' + ''.join(
108-
random.choice(CHARACTERS) for _ in range(10))
111+
# Building a temporary directory to store the new data. The name of the
112+
# temporary directory is the hash of the partition name.
113+
temp: str = fs_utils.join_path(
114+
_extract_root_dirname(dirname, fs.sep),
115+
hashlib.sha256(dirname.encode()).hexdigest())
116+
if fs.exists(temp):
117+
fs.rm(temp, recursive=True)
109118

110119
# Initializing Zarr group
111120
zarr.storage.init_group(store=fs.get_mapper(temp))

zcollection/merging/tests/test_merging.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def test_update_fs(
4646
generator = data.create_test_dataset(delayed=False)
4747
zds = next(generator)
4848

49-
partition_folder = local_fs.root.joinpath('partition_folder')
49+
partition_folder = local_fs.root.joinpath('variable=1')
5050

5151
zattrs = str(partition_folder.joinpath('.zattrs'))
5252
future = dask_client.submit(_update_fs, str(partition_folder),
@@ -82,7 +82,7 @@ def test_perform(
8282
generator = data.create_test_dataset(delayed=delayed)
8383
zds = next(generator)
8484

85-
path = str(local_fs.root.joinpath('folder'))
85+
path = str(local_fs.root.joinpath('variable=1'))
8686

8787
future = dask_client.submit(_update_fs, path, dask_client.scatter(zds),
8888
local_fs.fs)

0 commit comments

Comments
 (0)