Skip to content

Commit

Permalink
Validate the numerical values of input features in GConstruct. (#1165)
Browse files Browse the repository at this point in the history
*Issue #, if available:*
#1166 

*Description of changes:*
Validate the numerical values during data transformation when debug mode
is True.
(GConstruct already handles the None value in CategoricalTransform
and TokenTransform.)


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: Xiang Song <[email protected]>
  • Loading branch information
classicsong and Xiang Song authored Feb 14, 2025
1 parent 08a4bc3 commit aa0944d
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Full argument list of the ``gconstruct.construct_graph`` command
* **-\-graph-name**: (**Required**) the name assigned for the graph. The graph name must adhere to the `Python identifier naming rules<https://docs.python.org/3/reference/lexical_analysis.html#identifiers>`_ with the exception that hyphens (``-``) are permitted and the name can start with numbers.
* **-\-remap-node-id**: boolean value to decide whether to rename node IDs or not. Adding this argument will set it to be true, otherwise false.
* **-\-add-reverse-edges**: boolean value to decide whether to add reverse edges for the given graph. Adding this argument sets it to true; otherwise, it defaults to false. It is **strongly** suggested to include this argument for graph construction, as some nodes in the original data may not have in-degrees, and thus cannot update their representations by aggregating messages from their neighbors. Adding this argument helps prevent this issue.
* **-\-no-feature-validate**: Turn off the feature validation. By default, GraphStorm will check whether the input features include ``NaN`` or ``Inf``. By turning this validation process off, you can speed up feature transformation.
* **-\-output-format**: the format of constructed graph, options are ``DGL``, ``DistDGL``. Default is ``DistDGL``. It also accepts multiple graph formats at the same time separated by an space, for example ``--output-format "DGL DistDGL"``. The output format is explained in the :ref:`Output <gcon-output-format>` section above.
* **-\-num-parts**: an integer value that specifies the number of graph partitions to produce. This is only valid if the output format is ``DistDGL``.
* **-\-part-method**: the partition method to use during partitioning. We support 'metis' or 'random'.
Expand Down
13 changes: 12 additions & 1 deletion python/graphstorm/gconstruct/construct_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
from .id_map import NoopMap, IdMap, map_node_ids
from .utils import (multiprocessing_data_read,
update_two_phase_feat_ops, ExtMemArrayMerger,
partition_graph, ExtMemArrayWrapper)
partition_graph,
ExtMemArrayWrapper,
stop_validate_features)
from .utils import (get_hard_edge_negs_feats,
shuffle_hard_nids)

Expand Down Expand Up @@ -744,6 +746,13 @@ def process_graph(args):
"""
check_graph_name(args.graph_name)
logging.basicConfig(level=get_log_level(args.logging_level))
if args.no_feature_validate:
logging.warning("Turn off input feature validation."
"This will speedup data processing, "
"but won't check whether there are "
"invalid values from the input.")
stop_validate_features()

with open(args.conf_file, 'r', encoding="utf8") as json_file:
process_confs = json.load(json_file)

Expand Down Expand Up @@ -919,6 +928,8 @@ def process_graph(args):
help="Whether or not to remap node IDs.")
argparser.add_argument("--add-reverse-edges", action='store_true',
help="Add reverse edges.")
argparser.add_argument("--no-feature-validate", action='store_true',
help="Turn off features validation.")
argparser.add_argument("--output-format", type=str, nargs='+', default=["DistDGL"],
help="The output format of the constructed graph."
"It can be a single output format, for example "
Expand Down
130 changes: 71 additions & 59 deletions python/graphstorm/gconstruct/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
from transformers import AutoModel, AutoConfig

from .file_io import read_index
from .utils import ExtMemArrayWrapper, ExtFeatureWrapper, generate_hash
from .utils import (ExtMemArrayWrapper,
ExtFeatureWrapper,
generate_hash,
validate_features,
validate_numerical_feats)

LABEL_STATS_FIELD = "training_label_stats"
LABEL_STATS_FREQUENCY_COUNT = "frequency_cnt"
Expand Down Expand Up @@ -280,6 +284,34 @@ def as_out_dtype(self, feats):
else:
return feats.astype(self.out_dtype)

def feat2numerical(self, feats):
    """ Cast a feature into a numerical numpy array if it is not.

        If it is already a numerical (integer or floating point)
        numpy array, return it unchanged.

        Parameters
        ----------
        feats: np.array
            Input feature.

        Return
        ------
        np.array: the cast feature.

        Raises
        ------
        ValueError: if the feature cannot be cast into float32.
    """
    # Only allow integers and floating points.
    # Do not allow booleans (np.bool_ is not a subdtype of np.integer).
    if not (np.issubdtype(feats.dtype, np.integer) \
            or np.issubdtype(feats.dtype, np.floating)):
        # Note the trailing space before the closing quote: without it the
        # concatenated message reads "integers,but get".
        logging.warning("The feature %s has to be floating points or integers, "
                        "but get %s. Try to cast it into float32",
                        self.feat_name, feats.dtype)
        try:
            # if input dtype is not float or integer, we need to cast the data
            # into float32
            feats = feats.astype(np.float32)
        except (ValueError, TypeError) as err:
            # Chain the original exception so the root cause of the failed
            # cast is not lost.
            raise ValueError(
                f"The feature {self.feat_name} has to be integers or floats.") from err
    return feats

class GlobalProcessFeatTransform(FeatTransform):
""" The base class for transformations that can only be done using a single process.
Expand Down Expand Up @@ -388,9 +420,11 @@ def call(self, feats):
f"within numerical value."
if isinstance(feats, ExtMemArrayWrapper):
feats = feats.to_numpy()
assert np.issubdtype(feats.dtype, np.integer) \
or np.issubdtype(feats.dtype, np.floating), \
f"The feature {self.feat_name} has to be integers or floats."

feats = self.feat2numerical(feats)
if validate_features():
assert validate_numerical_feats(feats), \
f"There are NaN, Inf or missing value in the {self.feat_name} feature."

encoding = np.zeros((len(feats), self.bucket_cnt), dtype=np.int8)
max_val = max(self.bucket_range)
Expand Down Expand Up @@ -562,6 +596,15 @@ def pre_process(self, feats):
f"Feature {self.feat_name} of NumericalMinMaxTransform " \
"must be numpy array or ExtMemArray"

if validate_features():
if isinstance(feats, ExtMemArrayWrapper):
# TODO(xiangsx): This is not memory efficient.
# It will load all data into main memory.
feats = feats.to_numpy()
feats = self.feat2numerical(feats)
assert validate_numerical_feats(feats), \
f"There are NaN, Inf or missing value in the {self.feat_name} feature."

# The max and min of $val = (val-min) / (max-min)$ is pre-defined
# in the transform_conf, return max_val and min_val directly
if self._max_val is not None and self._min_val is not None:
Expand All @@ -572,17 +615,7 @@ def pre_process(self, feats):
# It will load all data into main memory.
feats = feats.to_numpy()

if feats.dtype not in [np.float64, np.float32, np.float16, np.int64, \
np.int32, np.int16, np.int8]:
logging.warning("The feature %s has to be floating points or integers,"
"but get %s. Try to cast it into float32",
self.feat_name, feats.dtype)
try:
# if input dtype is not float or integer, we need to cast the data
# into float32
feats = feats.astype(np.float32)
except: # pylint: disable=bare-except
raise ValueError(f"The feature {self.feat_name} has to be integers or floats.")
feats = self.feat2numerical(feats)
assert len(feats.shape) <= 2, \
"Only support 1D fp feature or 2D fp feature, " \
f"but get {len(feats.shape)}D feature for {self.feat_name}"
Expand Down Expand Up @@ -651,15 +684,7 @@ def call(self, feats):
# It will load all data into main memory.
feats = feats.to_numpy()

if feats.dtype not in [np.float64, np.float32, np.float16, np.int64, \
np.int32, np.int16, np.int8]:
try:
# if input dtype is not float or integer, we need to cast the data
# into float32
feats = feats.astype(np.float32)
except: # pylint: disable=bare-except
raise ValueError(f"The feature {self.feat_name} has to be integers or floats.")

feats = self.feat2numerical(feats)
feats = (feats - self._min_val) / (self._max_val - self._min_val)
feats[feats > 1] = 1 # any value > self._max_val is set to self._max_val
feats[feats < 0] = 0 # any value < self._min_val is set to self._min_val
Expand Down Expand Up @@ -713,6 +738,16 @@ def pre_process(self, feats) -> Dict[str, Optional[np.ndarray]]:
f"Feature {self.feat_name} of NumericalMinMaxTransform " \
f"must be numpy array or ExtMemArray, got {type(feats)}"

if validate_features():
if isinstance(feats, ExtMemArrayWrapper):
# TODO(xiangsx): This is not memory efficient.
# It will load all data into main memory.
feats = feats.to_numpy()

feats = self.feat2numerical(feats)
assert validate_numerical_feats(feats), \
f"There are NaN, Inf or missing value in the {self.feat_name} feature."

# If sum has already been set.
# Skip the pre-process step
if self._summation is not None:
Expand All @@ -723,18 +758,7 @@ def pre_process(self, feats) -> Dict[str, Optional[np.ndarray]]:
# It will load all data into main memory.
feats = feats.to_numpy()

if feats.dtype not in [np.float64, np.float32, np.float16, np.int64, \
np.int32, np.int16, np.int8]:
logging.warning("The feature %s has to be floating points or integers,"
"but get %s. Try to cast it into float32",
self.feat_name, feats.dtype)
try:
# if input dtype is not float or integer, we need to cast the data
# into float32
feats = feats.astype(np.float32)
except: # pylint: disable=bare-except
raise ValueError(f"The feature {self.feat_name} has to be integers or floats.")

feats = self.feat2numerical(feats)
# make summation a 1D array
summation = np.sum(feats, axis=0, keepdims=True).reshape((-1,))
return {self.feat_name: summation}
Expand Down Expand Up @@ -782,15 +806,7 @@ def call(self, feats) -> Dict[str, np.ndarray]:
# It will load all data into main memory.
feats = feats.to_numpy()

if feats.dtype not in [np.float64, np.float32, np.float16, np.int64, \
np.int32, np.int16, np.int8]:
try:
# if input dtype is not float or integer, we need to cast the data
# into float32
feats = feats.astype(np.float32)
except TypeError:
raise ValueError(f"The feature {self.feat_name} has to be integers or floats.")

feats = self.feat2numerical(feats)
feats = feats / self._summation

return {self.feat_name: feats}
Expand Down Expand Up @@ -836,21 +852,12 @@ def call(self, feats):
assert isinstance(feats, (np.ndarray, ExtMemArrayWrapper)), \
f"The feature {self.feat_name} has to be NumPy array."

if np.issubdtype(feats.dtype, np.integer) \
or np.issubdtype(feats.dtype, np.floating): \
return {self.feat_name: feats}
else:
logging.warning("The feature %s has to be floating points or integers,"
"but get %s. Try to cast it into float32",
self.feat_name, feats.dtype)
try:
# if input dtype is not float or integer, we need to cast the data
# into float32
feats = feats.astype(np.float32)
except: # pylint: disable=bare-except
raise ValueError(f"The feature {self.feat_name} has to be integers or floats.")
feats = self.feat2numerical(feats)
if validate_features():
assert validate_numerical_feats(feats), \
f"There are NaN, Inf or missing value in the {self.feat_name} feature."

return {self.feat_name: feats}
return {self.feat_name: feats}

def after_merge_transform(self, feats):
# The feats can be a numpy array or a numpy memmaped object
Expand Down Expand Up @@ -1076,6 +1083,11 @@ def call(self, feats):
assert np.issubdtype(feats.dtype, np.integer) \
or np.issubdtype(feats.dtype, np.floating), \
f"The feature {self.feat_name} has to be integers or floats."

if validate_features():
assert validate_numerical_feats(feats), \
f"There are NaN, Inf or missing value in the {self.feat_name} feature."

if self.truncate_dim is not None:
if isinstance(feats, np.ndarray):
feats = feats[:, :self.truncate_dim]
Expand Down
22 changes: 22 additions & 0 deletions python/graphstorm/gconstruct/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,28 @@
SHARED_MEMORY_CROSS_PROCESS_STORAGE = "shared_memory"
PICKLE_CROSS_PROCESS_STORAGE = "pickle"
EXT_MEMORY_STORAGE = "ext_memory"
# Global flag controlling whether gconstruct validates input feature values
# (checked via validate_features(), disabled via stop_validate_features()).
# NOTE(review): the name keeps the historical misspelling ("FEATRE") because
# external code (e.g. unit tests) references it directly.
VALIDATE_FEATRE = True

def validate_features():
    """ Check whether gconstruct needs to validate the input features.

        Returns
        -------
        bool: True if feature validation is enabled (the default).
    """
    return VALIDATE_FEATRE

def stop_validate_features():
    """ Disable input feature validation.

        Sets the module-level VALIDATE_FEATRE flag to False so that
        feature transforms skip the NaN/Inf checks. This speeds up
        data processing but invalid values will go undetected.
    """
    global VALIDATE_FEATRE
    VALIDATE_FEATRE = False

def validate_numerical_feats(feats):
    """ Validate the numerical features.

        Checks that every value in the input array is finite, i.e.,
        contains no NaN and no +/-Inf.

        Parameters
        ----------
        feats: np.array
            The input numerical feature array.

        Returns
        -------
        bool: Whether the values of the input feature are all valid
    """
    # np.isfinite is False for both NaN and +/-Inf, so a single pass
    # replaces separate isnan/isinf scans over the whole array.
    # Wrap in bool() so callers comparing with `is True`/`is False`
    # keep working (np.bool_ is not the Python True/False singleton).
    return bool(np.isfinite(feats).all())

def _is_numeric(arr):
""" Check if the input array has the numeric data type.
Expand Down
37 changes: 36 additions & 1 deletion tests/unit-tests/gconstruct/test_gconstruct_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import pyarrow.parquet as pq
import pyarrow as pa
import torch as th
import graphstorm

from numpy.testing import assert_almost_equal

Expand All @@ -34,7 +35,10 @@
from graphstorm.gconstruct.utils import (save_maps,
load_maps,
get_hard_edge_negs_feats,
shuffle_hard_nids)
shuffle_hard_nids,
validate_features,
stop_validate_features,
validate_numerical_feats)
from graphstorm.gconstruct.file_io import (write_data_hdf5,
read_data_hdf5,
get_in_files,
Expand Down Expand Up @@ -518,7 +522,38 @@ def test_single_directory_expansion():
expected_files = sorted([os.path.join(temp_dir, f) for f in test_files])
assert sorted(result) == expected_files

def test_validate_features():
    """ Test enabling and disabling of input feature validation. """
    # Validation is on by default.
    assert validate_features()
    try:
        stop_validate_features()
        assert validate_features() is False
    finally:
        # Restore the global flag so later tests see the default behavior
        # even if an assertion above fails.
        graphstorm.gconstruct.utils.VALIDATE_FEATRE = True

def test_validate_numerical_feats():
    """ Test the detection of NaN/Inf values in numerical features. """
    # An all-finite array is valid.
    assert validate_numerical_feats(np.array([1, 2, 3])) is True

    invalid_arrays = [
        np.array([1, 2, np.inf]),
        np.array([1, 2, np.nan]),
        np.array([[1, 2, np.nan],
                  [1, 2, np.inf]]),
        np.array([[1, 2, 3],
                  [1, 2, np.inf]]),
        np.array([[1, 2, 3],
                  [1, 2, np.nan]]),
    ]
    for arr in invalid_arrays:
        # A single NaN or Inf anywhere in the array invalidates it.
        assert validate_numerical_feats(arr) is False

if __name__ == '__main__':
test_validate_numerical_feats()
test_validate_features()
test_shuffle_hard_nids()
test_save_load_maps()
test_get_hard_edge_negs_feats()
Expand Down
Loading

0 comments on commit aa0944d

Please sign in to comment.