Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate the numerical values of input features in GConstruct. #1165

Merged
merged 10 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Full argument list of the ``gconstruct.construct_graph`` command
* **-\-graph-name**: (**Required**) the name assigned for the graph. The graph name must adhere to the `Python identifier naming rules<https://docs.python.org/3/reference/lexical_analysis.html#identifiers>`_ with the exception that hyphens (``-``) are permitted and the name can start with numbers.
* **-\-remap-node-id**: boolean value to decide whether to rename node IDs or not. Adding this argument will set it to be true, otherwise false.
* **-\-add-reverse-edges**: boolean value to decide whether to add reverse edges for the given graph. Adding this argument sets it to true; otherwise, it defaults to false. It is **strongly** suggested to include this argument for graph construction, as some nodes in the original data may not have in-degrees, and thus cannot update their representations by aggregating messages from their neighbors. Adding this argument helps prevent this issue.
* **-\-no-feature-validate**: Turn off feature validation. By default, GraphStorm will check whether the input features include ``NaN`` or ``Inf`` values. By turning this validation process off, you can speed up feature transformation.
* **-\-output-format**: the format of constructed graph, options are ``DGL``, ``DistDGL``. Default is ``DistDGL``. It also accepts multiple graph formats at the same time separated by an space, for example ``--output-format "DGL DistDGL"``. The output format is explained in the :ref:`Output <gcon-output-format>` section above.
* **-\-num-parts**: an integer value that specifies the number of graph partitions to produce. This is only valid if the output format is ``DistDGL``.
* **-\-part-method**: the partition method to use during partitioning. We support 'metis' or 'random'.
Expand Down
13 changes: 12 additions & 1 deletion python/graphstorm/gconstruct/construct_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
from .id_map import NoopMap, IdMap, map_node_ids
from .utils import (multiprocessing_data_read,
update_two_phase_feat_ops, ExtMemArrayMerger,
partition_graph, ExtMemArrayWrapper)
partition_graph,
ExtMemArrayWrapper,
stop_validate_features)
from .utils import (get_hard_edge_negs_feats,
shuffle_hard_nids)

Expand Down Expand Up @@ -744,6 +746,13 @@ def process_graph(args):
"""
check_graph_name(args.graph_name)
logging.basicConfig(level=get_log_level(args.logging_level))
if args.no_feature_validate:
logging.warning("Turn off input feature validation."
"This will speedup data processing, "
"but won't check whether there are "
"invalid values from the input.")
stop_validate_features()

with open(args.conf_file, 'r', encoding="utf8") as json_file:
process_confs = json.load(json_file)

Expand Down Expand Up @@ -919,6 +928,8 @@ def process_graph(args):
help="Whether or not to remap node IDs.")
argparser.add_argument("--add-reverse-edges", action='store_true',
help="Add reverse edges.")
argparser.add_argument("--no-feature-validate", action='store_true',
help="Turn off features validation.")
argparser.add_argument("--output-format", type=str, nargs='+', default=["DistDGL"],
help="The output format of the constructed graph."
"It can be a single output format, for example "
Expand Down
130 changes: 71 additions & 59 deletions python/graphstorm/gconstruct/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
from transformers import AutoModel, AutoConfig

from .file_io import read_index
from .utils import ExtMemArrayWrapper, ExtFeatureWrapper, generate_hash
from .utils import (ExtMemArrayWrapper,
ExtFeatureWrapper,
generate_hash,
validate_features,
validate_numerical_feats)

LABEL_STATS_FIELD = "training_label_stats"
LABEL_STATS_FREQUENCY_COUNT = "frequency_cnt"
Expand Down Expand Up @@ -280,6 +284,34 @@ def as_out_dtype(self, feats):
else:
return feats.astype(self.out_dtype)

def feat2numerical(self, feats):
    """ Cast a feature into a numerical numpy array if it is not already one.

    If the input is already an integer or floating-point array, it is
    returned unchanged. Booleans are deliberately not accepted as
    numerical (``np.bool_`` is not a subtype of ``np.integer`` or
    ``np.floating``), so boolean arrays are cast to float32.

    Parameters
    ----------
    feats: np.array
        Input feature array.

    Returns
    -------
    np.array
        The input array unchanged if its dtype is integer or floating
        point, otherwise the array cast to float32.

    Raises
    ------
    ValueError
        If the feature values cannot be cast to float32.
    """
    # Only allow integers and floating points.
    # Do not allow booleans or other dtypes (strings, objects, ...);
    # they are cast to float32 below.
    if not (np.issubdtype(feats.dtype, np.integer)
            or np.issubdtype(feats.dtype, np.floating)):
        # Fix: a space is needed between the concatenated string pieces,
        # otherwise the message reads "integers,but get".
        logging.warning("The feature %s has to be floating points or integers, "
                        "but get %s. Try to cast it into float32",
                        self.feat_name, feats.dtype)
        try:
            # If the input dtype is not float or integer, cast the data
            # into float32.
            feats = feats.astype(np.float32)
        except (ValueError, TypeError) as exc:
            # Chain the original failure so the root cause stays visible.
            raise ValueError(
                f"The feature {self.feat_name} has to be integers or floats.") from exc
    return feats

class GlobalProcessFeatTransform(FeatTransform):
""" The base class for transformations that can only be done using a single process.

Expand Down Expand Up @@ -388,9 +420,11 @@ def call(self, feats):
f"within numerical value."
if isinstance(feats, ExtMemArrayWrapper):
feats = feats.to_numpy()
assert np.issubdtype(feats.dtype, np.integer) \
or np.issubdtype(feats.dtype, np.floating), \
f"The feature {self.feat_name} has to be integers or floats."

feats = self.feat2numerical(feats)
if validate_features():
assert validate_numerical_feats(feats), \
f"There are NaN, Inf or missing value in the {self.feat_name} feature."

encoding = np.zeros((len(feats), self.bucket_cnt), dtype=np.int8)
max_val = max(self.bucket_range)
Expand Down Expand Up @@ -562,6 +596,15 @@ def pre_process(self, feats):
f"Feature {self.feat_name} of NumericalMinMaxTransform " \
"must be numpy array or ExtMemArray"

if validate_features():
if isinstance(feats, ExtMemArrayWrapper):
# TODO(xiangsx): This is not memory efficient.
# It will load all data into main memory.
feats = feats.to_numpy()
feats = self.feat2numerical(feats)
assert validate_numerical_feats(feats), \
f"There are NaN, Inf or missing value in the {self.feat_name} feature."

# The max and min of $val = (val-min) / (max-min)$ is pre-defined
# in the transform_conf, return max_val and min_val directly
if self._max_val is not None and self._min_val is not None:
Expand All @@ -572,17 +615,7 @@ def pre_process(self, feats):
# It will load all data into main memory.
feats = feats.to_numpy()

if feats.dtype not in [np.float64, np.float32, np.float16, np.int64, \
np.int32, np.int16, np.int8]:
logging.warning("The feature %s has to be floating points or integers,"
"but get %s. Try to cast it into float32",
self.feat_name, feats.dtype)
try:
# if input dtype is not float or integer, we need to cast the data
# into float32
feats = feats.astype(np.float32)
except: # pylint: disable=bare-except
raise ValueError(f"The feature {self.feat_name} has to be integers or floats.")
feats = self.feat2numerical(feats)
assert len(feats.shape) <= 2, \
"Only support 1D fp feature or 2D fp feature, " \
f"but get {len(feats.shape)}D feature for {self.feat_name}"
Expand Down Expand Up @@ -651,15 +684,7 @@ def call(self, feats):
# It will load all data into main memory.
feats = feats.to_numpy()

if feats.dtype not in [np.float64, np.float32, np.float16, np.int64, \
np.int32, np.int16, np.int8]:
try:
# if input dtype is not float or integer, we need to cast the data
# into float32
feats = feats.astype(np.float32)
except: # pylint: disable=bare-except
raise ValueError(f"The feature {self.feat_name} has to be integers or floats.")

feats = self.feat2numerical(feats)
feats = (feats - self._min_val) / (self._max_val - self._min_val)
feats[feats > 1] = 1 # any value > self._max_val is set to self._max_val
feats[feats < 0] = 0 # any value < self._min_val is set to self._min_val
Expand Down Expand Up @@ -713,6 +738,16 @@ def pre_process(self, feats) -> Dict[str, Optional[np.ndarray]]:
f"Feature {self.feat_name} of NumericalMinMaxTransform " \
f"must be numpy array or ExtMemArray, got {type(feats)}"

if validate_features():
if isinstance(feats, ExtMemArrayWrapper):
# TODO(xiangsx): This is not memory efficient.
# It will load all data into main memory.
feats = feats.to_numpy()

feats = self.feat2numerical(feats)
assert validate_numerical_feats(feats), \
f"There are NaN, Inf or missing value in the {self.feat_name} feature."

# If sum has already been set.
# Skip the pre-process step
if self._summation is not None:
Expand All @@ -723,18 +758,7 @@ def pre_process(self, feats) -> Dict[str, Optional[np.ndarray]]:
# It will load all data into main memory.
feats = feats.to_numpy()

if feats.dtype not in [np.float64, np.float32, np.float16, np.int64, \
np.int32, np.int16, np.int8]:
logging.warning("The feature %s has to be floating points or integers,"
"but get %s. Try to cast it into float32",
self.feat_name, feats.dtype)
try:
# if input dtype is not float or integer, we need to cast the data
# into float32
feats = feats.astype(np.float32)
except: # pylint: disable=bare-except
raise ValueError(f"The feature {self.feat_name} has to be integers or floats.")

feats = self.feat2numerical(feats)
# make summation a 1D array
summation = np.sum(feats, axis=0, keepdims=True).reshape((-1,))
return {self.feat_name: summation}
Expand Down Expand Up @@ -782,15 +806,7 @@ def call(self, feats) -> Dict[str, np.ndarray]:
# It will load all data into main memory.
feats = feats.to_numpy()

if feats.dtype not in [np.float64, np.float32, np.float16, np.int64, \
np.int32, np.int16, np.int8]:
try:
# if input dtype is not float or integer, we need to cast the data
# into float32
feats = feats.astype(np.float32)
except TypeError:
raise ValueError(f"The feature {self.feat_name} has to be integers or floats.")

feats = self.feat2numerical(feats)
feats = feats / self._summation

return {self.feat_name: feats}
Expand Down Expand Up @@ -836,21 +852,12 @@ def call(self, feats):
assert isinstance(feats, (np.ndarray, ExtMemArrayWrapper)), \
f"The feature {self.feat_name} has to be NumPy array."

if np.issubdtype(feats.dtype, np.integer) \
or np.issubdtype(feats.dtype, np.floating): \
return {self.feat_name: feats}
else:
logging.warning("The feature %s has to be floating points or integers,"
"but get %s. Try to cast it into float32",
self.feat_name, feats.dtype)
try:
# if input dtype is not float or integer, we need to cast the data
# into float32
feats = feats.astype(np.float32)
except: # pylint: disable=bare-except
raise ValueError(f"The feature {self.feat_name} has to be integers or floats.")
feats = self.feat2numerical(feats)
if validate_features():
assert validate_numerical_feats(feats), \
f"There are NaN, Inf or missing value in the {self.feat_name} feature."

return {self.feat_name: feats}
return {self.feat_name: feats}

def after_merge_transform(self, feats):
# The feats can be a numpy array or a numpy memmaped object
Expand Down Expand Up @@ -1076,6 +1083,11 @@ def call(self, feats):
assert np.issubdtype(feats.dtype, np.integer) \
or np.issubdtype(feats.dtype, np.floating), \
f"The feature {self.feat_name} has to be integers or floats."

if validate_features():
assert validate_numerical_feats(feats), \
f"There are NaN, Inf or missing value in the {self.feat_name} feature."

if self.truncate_dim is not None:
if isinstance(feats, np.ndarray):
feats = feats[:, :self.truncate_dim]
Expand Down
22 changes: 22 additions & 0 deletions python/graphstorm/gconstruct/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,28 @@
SHARED_MEMORY_CROSS_PROCESS_STORAGE = "shared_memory"
PICKLE_CROSS_PROCESS_STORAGE = "pickle"
EXT_MEMORY_STORAGE = "ext_memory"
# Module-level switch controlling whether gconstruct validates input
# feature values (NaN/Inf checks). Turned off by stop_validate_features().
# NOTE(review): the misspelling "FEATRE" is part of the public name
# (unit tests patch it directly), so it must be kept as-is.
VALIDATE_FEATRE = True

def validate_features():
    """ Report whether input feature validation is currently enabled.

    Returns
    -------
    bool: True when gconstruct should validate the input features.
    """
    return VALIDATE_FEATRE

def stop_validate_features():
    """ Disable input feature validation globally.

    Invoked when the user passes ``--no-feature-validate`` to
    ``gconstruct.construct_graph``; afterwards validate_features()
    returns False and the NaN/Inf checks are skipped.
    """
    global VALIDATE_FEATRE
    VALIDATE_FEATRE = False

def validate_numerical_feats(feats):
    """ Check that a numerical feature array contains no NaN or Inf values.

    Parameters
    ----------
    feats: np.array
        Numerical feature array to check.

    Returns
    -------
    bool: Whether the values of the input feature are all valid
    """
    has_invalid_value = np.isnan(feats).any() or np.isinf(feats).any()
    return not has_invalid_value

def _is_numeric(arr):
""" Check if the input array has the numeric data type.
Expand Down
37 changes: 36 additions & 1 deletion tests/unit-tests/gconstruct/test_gconstruct_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import pyarrow.parquet as pq
import pyarrow as pa
import torch as th
import graphstorm

from numpy.testing import assert_almost_equal

Expand All @@ -34,7 +35,10 @@
from graphstorm.gconstruct.utils import (save_maps,
load_maps,
get_hard_edge_negs_feats,
shuffle_hard_nids)
shuffle_hard_nids,
validate_features,
stop_validate_features,
validate_numerical_feats)
from graphstorm.gconstruct.file_io import (write_data_hdf5,
read_data_hdf5,
get_in_files,
Expand Down Expand Up @@ -518,7 +522,38 @@ def test_single_directory_expansion():
expected_files = sorted([os.path.join(temp_dir, f) for f in test_files])
assert sorted(result) == expected_files

def test_validate_features():
    """Validation is on by default and stop_validate_features() turns it off."""
    # Default state: validation enabled.
    assert validate_features()
    stop_validate_features()
    assert validate_features() is False
    # Restore the module-level flag so later tests see the default state.
    graphstorm.gconstruct.utils.VALIDATE_FEATRE = True

def test_validate_numerical_feats():
    """validate_numerical_feats() accepts finite arrays, rejects NaN/Inf."""
    clean = np.array([1, 2, 3])
    assert validate_numerical_feats(clean) is True

    # Any occurrence of NaN or Inf, in 1D or 2D input, must be rejected.
    corrupted_inputs = [
        np.array([1, 2, np.inf]),
        np.array([1, 2, np.nan]),
        np.array([[1, 2, np.nan],
                  [1, 2, np.inf]]),
        np.array([[1, 2, 3],
                  [1, 2, np.inf]]),
        np.array([[1, 2, 3],
                  [1, 2, np.nan]]),
    ]
    for corrupted in corrupted_inputs:
        assert validate_numerical_feats(corrupted) is False

if __name__ == '__main__':
test_validate_numerical_feats()
test_validate_features()
test_shuffle_hard_nids()
test_save_load_maps()
test_get_hard_edge_negs_feats()
Expand Down
Loading
Loading