Libpitt/606 grabbing activity props #627

Merged
merged 6 commits into from Feb 13, 2025
Changes from all commits
107 changes: 67 additions & 40 deletions src/schema/schema_manager.py
@@ -49,6 +49,7 @@
_ubkg = None
_memcached_client = None
_memcached_prefix = None
+_schema_properties = {}

# For handling cached requests to uuid-api and external static resources (github raw yaml files)
request_cache = {}
@@ -92,9 +93,11 @@ def initialize(valid_yaml_file,
    global _ubkg
    global _memcached_client
    global _memcached_prefix
+    global _schema_properties

    logger.info(f"Initialize schema_manager using valid_yaml_file={valid_yaml_file}.")
    _schema = load_provenance_schema(valid_yaml_file)
+    _schema_properties = group_schema_properties_by_name()
    if _schema is None:
        logger.error(f"Failed to load _schema using {valid_yaml_file}.")
    _uuid_api_url = uuid_api_url
@@ -147,6 +150,45 @@ def load_provenance_schema(valid_yaml_file):
    return schema_dict


+def group_schema_properties_by_name():
+    """
+    Formats the entity schema properties into a dict keyed by property name.
+    Each property name maps to several buckets, each holding the set of entity names the property belongs to.
+
+    This allows for constant time access when filtering responses by property names.
+
+    Returns
+    -------
+    dict
+    """
+    global _schema
+
+    schema_properties_by_name = {}
+    for entity in _schema['ENTITIES']:
+        entity_properties = _schema['ENTITIES'][entity].get('properties', {})
+        for p in entity_properties:
+            if p not in schema_properties_by_name:
+                schema_properties_by_name[p] = {}
+                schema_properties_by_name[p]['dependencies'] = set()
+                schema_properties_by_name[p]['trigger'] = set()
+                schema_properties_by_name[p]['neo4j'] = set()
+                schema_properties_by_name[p]['json_string'] = set()
+                schema_properties_by_name[p]['list'] = set()
+            schema_properties_by_name[p]['dependencies'].update(entity_properties[p].get('dependency_properties', []))
+
+            if 'on_read_trigger' in entity_properties[p]:
+                schema_properties_by_name[p]['trigger'].add(entity)
+            else:
+                schema_properties_by_name[p]['neo4j'].add(entity)
+
+            if 'type' in entity_properties[p]:
+                if entity_properties[p]['type'] == 'json_string':
+                    schema_properties_by_name[p]['json_string'].add(entity)
+                if entity_properties[p]['type'] == 'list':
+                    schema_properties_by_name[p]['list'].add(entity)
+
+    return schema_properties_by_name

####################################################################################################
## Helper functions
####################################################################################################
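For orientation, here is a minimal sketch of the lookup index the new function builds, using hypothetical entity and property names rather than the repo's real schema:

# Hypothetical, trimmed-down example of what load_provenance_schema() might return:
_schema = {
    'ENTITIES': {
        'Dataset': {'properties': {
            'contacts': {'type': 'list'},
            'status_history': {'type': 'json_string', 'on_read_trigger': 'get_status_history'},
        }},
        'Sample': {'properties': {
            'contacts': {'type': 'list', 'dependency_properties': ['entity_type']},
        }},
    }
}

# group_schema_properties_by_name() would then return:
# {
#     'contacts': {
#         'dependencies': {'entity_type'},   # union of dependency_properties across entities
#         'trigger': set(),                  # no entity computes it via on_read_trigger
#         'neo4j': {'Dataset', 'Sample'},    # stored directly on the node for these entities
#         'json_string': set(),
#         'list': {'Dataset', 'Sample'},
#     },
#     'status_history': {
#         'dependencies': set(),
#         'trigger': {'Dataset'},            # computed by a trigger for Dataset
#         'neo4j': set(),
#         'json_string': {'Dataset'},
#         'list': set(),
#     },
# }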
@@ -392,8 +434,10 @@ def rearrange_datasets(results, entity_type = 'Dataset'):


def group_verify_properties_list(normalized_class='All', properties=[]):
-    """ Separates neo4j properties from transient ones. Will also gather specific property dependencies via a
-    `dependency_properties` list setting in the schema yaml. Also filters out any unknown properties.
+    """ Separates neo4j properties from transient ones. Moreover, buckets the neo4j properties that are
+    either json_string or list so that they can be handled via apoc.convert.* functions.
+    Will also gather specific property dependencies via a `dependency_properties` list setting in the schema yaml.
+    Also filters out any unknown properties.

Parameters
----------
@@ -409,60 +453,43 @@ def group_verify_properties_list(normalized_class='All', properties=[]):
"""
# Determine the schema section based on class
global _schema
global _schema_properties

defaults = get_schema_defaults([])

if len(properties) == 1 and properties[0] in defaults:
return PropertyGroups(properties, [], [], [], [], [])

-    neo4j_fields = []
-    trigger_fields = []
+    neo4j_fields = set()
+    trigger_fields = set()
    activity_fields = []
-    json_fields = []
-    list_fields = []
-    schema_section = {}
-    activities_schema_section = {}
+    json_fields = set()
+    list_fields = set()
    dependencies = set()

-    def check_dependencies(_entity_properties):
-        for _p in properties:
-            if _p in _entity_properties:
-                dependencies.update(_entity_properties[_p].get('dependency_properties', []))
-
    activity_properties = _schema['ACTIVITIES']['Activity'].get('properties', {})
-    check_dependencies(activity_properties)
-    activities_schema_section.update(activity_properties)
-
-    if normalized_class == 'All':
-        for entity in _schema['ENTITIES']:
-            entity_properties = _schema['ENTITIES'][entity].get('properties', {})
-            check_dependencies(entity_properties)
-            schema_section.update(entity_properties)
-    else:
-        entity_properties = _schema['ENTITIES'][normalized_class].get('properties', {})
-        check_dependencies(entity_properties)
-        schema_section = entity_properties

    for p in properties:
-        if p in schema_section:
-            if 'on_read_trigger' in schema_section[p]:
-                trigger_fields.append(p)
-            else:
-                neo4j_fields.append(p)
+        if p in _schema_properties:
+            if 'trigger' in _schema_properties[p] and (len(_schema_properties[p]['trigger']) or normalized_class in _schema_properties[p]['trigger']):
+                trigger_fields.add(p)

-            if 'type' in schema_section[p]:
-                if schema_section[p]['type'] == 'json_string':
-                    json_fields.append(p)
-                if schema_section[p]['type'] == 'list':
-                    list_fields.append(p)
+            if 'neo4j' in _schema_properties[p] and (len(_schema_properties[p]['neo4j']) or normalized_class in _schema_properties[p]['neo4j']):
+                neo4j_fields.add(p)

-        if p in activities_schema_section:
-            activity_fields.append(p)
+            if len(_schema_properties[p]['json_string']) or normalized_class in _schema_properties[p]['json_string']:
+                json_fields.add(p)
+            if len(_schema_properties[p]['list']) or normalized_class in _schema_properties[p]['list']:
+                list_fields.add(p)

-    if 'entity_type' not in neo4j_fields and len(trigger_fields) > 0:
-        neo4j_fields.append('entity_type')
+            if 'dependencies' in _schema_properties[p] and len(_schema_properties[p]['dependencies']):
+                dependencies.update(list(_schema_properties[p]['dependencies']))

+        if p in activity_properties:
+            activity_fields.append(p)
+            dependencies.update(activity_properties[p].get('dependency_properties', []))

-    return PropertyGroups(neo4j_fields, trigger_fields, activity_fields, list(dependencies), json_fields, list_fields)
+    return PropertyGroups(list(neo4j_fields), list(trigger_fields), activity_fields, list(dependencies), list(json_fields), list(list_fields))

def exclude_properties_from_response(excluded_fields, output_dict):
"""Removes specified fields from an existing dictionary.
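To close out the schema_manager.py side, a hedged usage sketch of the regrouped helper; the property names here are invented, and the PropertyGroups attribute names are illustrative, since this diff only shows its positional constructor:

# Assumes schema_manager.initialize(...) has run, so _schema_properties is populated.
groups = group_verify_properties_list(normalized_class='Dataset',
                                      properties=['contacts', 'status_history', 'bogus_prop'])

# 'bogus_prop' is dropped as unknown to the schema. Trigger fields get computed via
# on_read_trigger handlers, neo4j fields are read straight off the node, and the
# json_string / list buckets tell the query builder which fields need apoc.convert.*
# treatment (e.g. apoc.convert.fromJsonMap / apoc.convert.fromJsonList) in the Cypher.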
37 changes: 11 additions & 26 deletions src/schema/schema_neo4j_queries.py
@@ -206,39 +206,24 @@ def get_dataset_organ_and_source_info(neo4j_driver, uuid):
    source_type = None

    with neo4j_driver.session() as session:
-        sample_query = (f"MATCH (e:Dataset)-[:USED|WAS_GENERATED_BY*]->(s:Sample) "
-                        f"WHERE e.uuid='{uuid}' AND s.sample_category is not null and s.sample_category='Organ' "
-                        f"RETURN apoc.coll.toSet(COLLECT(s)) AS {record_field_name}")
+        sample_query = ("MATCH (e:Dataset)-[:USED|WAS_GENERATED_BY*]->(s:Sample) WHERE "
+                        f"e.uuid='{uuid}' AND s.sample_category is not null and s.sample_category='Organ' "
+                        "MATCH (s2:Sample)-[:USED|WAS_GENERATED_BY*]->(d:Source) WHERE s2.uuid=s.uuid AND s2.sample_category is not null "
+                        "RETURN DISTINCT d.metadata AS source_metadata, d.source_type AS source_type, "
+                        "CASE WHEN s.organ is not null THEN s.organ "
+                        "ELSE s.sample_category "
+                        "END AS organ_type")

        logger.info("======get_dataset_organ_and_source_info() sample_query======")
        logger.info(sample_query)

-    with neo4j_driver.session() as session:
        record = session.read_transaction(_execute_readonly_tx, sample_query)

-        if record and record[record_field_name]:
-            # Convert the list of nodes to a list of dicts
-            sample_records = _nodes_to_dicts(record[record_field_name])
-            for sample_record in sample_records:
-                if sample_record['organ'] is None:
-                    organ_names.add(sample_record['sample_category'])
-                else:
-                    organ_names.add(sample_record['organ'])
-
-                sample_uuid = sample_record['uuid']
-
-                source_query = (f"MATCH (s:Sample)-[:USED|WAS_GENERATED_BY*]->(d:Source) "
-                                f"WHERE s.uuid='{sample_uuid}' AND s.sample_category is not null "
-                                f"RETURN DISTINCT d.metadata AS source_metadata, d.source_type AS source_type")
-
-                logger.info("======get_dataset_organ_and_source_info() source_query======")
-                logger.info(source_query)
-
-                source_record = session.read_transaction(_execute_readonly_tx, source_query)
-
-                if source_record:
-                    source_metadata.add(source_record[0])
-                    source_type = source_record[1]
+        if record:
+            source_metadata.add(record[0])
+            source_type = record[1]
+            organ_names.add(record[2])

    return organ_names, source_metadata, source_type

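Since the two queries are now consolidated, a minimal sketch of exercising the helper (driver setup and uuid are hypothetical); the point is that organ type, source metadata, and source type come back in a single round trip instead of one follow-up source query per organ sample:

from neo4j import GraphDatabase

import schema_neo4j_queries

# Hypothetical local connection, for illustration only
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

organ_names, source_metadata, source_type = \
    schema_neo4j_queries.get_dataset_organ_and_source_info(driver, "some-dataset-uuid")

print(organ_names)      # e.g. {'Kidney'}, the organ_type from the CASE expression
print(source_metadata)  # metadata from the Source node(s)
print(source_type)      # e.g. 'Human'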