From 3903d1286b9b6359ff092a32db462220a81dd178 Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Tue, 25 Feb 2025 10:49:45 -0800 Subject: [PATCH 1/6] add migration for smart autocomplete view on eap_items --- .../0029_smart_autocomplete_items.py | 159 ++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py diff --git a/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py b/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py new file mode 100644 index 0000000000..0c3912dc14 --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py @@ -0,0 +1,159 @@ +from typing import List, Sequence + +from snuba.clusters.storage_sets import StorageSetKey +from snuba.datasets.storages.tags_hash_map import get_array_vals_hash +from snuba.migrations import migration, operations, table_engines +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import OperationTarget, SqlOperation +from snuba.utils.schemas import Array, Column, Date, String, UInt + +num_attr_buckets = 40 + +columns: List[Column[Modifiers]] = [ + Column("organization_id", UInt(64)), + Column("project_id", UInt(64)), + Column("item_type", UInt(8)), + Column("date", Date(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))), + Column( + "retention_days", + UInt(16), + ), + Column( + "attributes_string_hash", + Array( + UInt(64), Modifiers(materialized=get_array_vals_hash("attributes_string")) + ), + ), + Column("attributes_string", Array(String())), + Column( + "attributes_float_hash", + Array( + UInt(64), Modifiers(materialized=get_array_vals_hash("attributes_float")) + ), + ), + Column("attributes_float", Array(String())), + # a hash of all the attribute keys of the item in sorted order + # this lets us deduplicate rows with merges + Column( + "key_hash", + UInt( + 64, + Modifiers( + materialized="cityHash64(arraySort(arrayConcat(attributes_string, attributes_float)))" + ), + ), + ), +] + + +_attr_num_names = ", ".join( + [f"mapKeys(attributes_float_{i})" for i in range(num_attr_buckets)] +) +_attr_str_names = ", ".join( + [f"mapKeys(attributes_string_{i})" for i in range(num_attr_buckets)] +) + + +MV_QUERY = f""" +SELECT + organization_id AS organization_id, + project_id AS project_id, + item_type as item_type, + toDate(timestamp) AS date, + retention_days as retention_days, + arrayConcat({_attr_str_names}) AS attributes_string, + arrayConcat({_attr_num_names}) AS attributes_float +FROM eap_items_1_local +""" + + +class Migration(migration.ClickhouseNodeMigration): + + blocking = False + storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM + granularity = "8192" + + local_table_name = "eap_item_attrs_1_local" + dist_table_name = "eap_item_attrs_1_dist" + mv_name = "eap_item_attrs_1_mv" + + def forwards_ops(self) -> Sequence[SqlOperation]: + create_table_ops = [ + operations.CreateTable( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + engine=table_engines.ReplacingMergeTree( + storage_set=self.storage_set_key, + primary_key="(organization_id, project_id, date, key_hash)", + order_by="(organization_id, project_id, date, key_hash)", + partition_by="(retention_days, toMonday(date))", + ttl="date + toIntervalDay(retention_days)", + ), + columns=columns, + target=OperationTarget.LOCAL, + ), + operations.CreateTable( + storage_set=self.storage_set_key, + table_name=self.dist_table_name, + engine=table_engines.Distributed( + local_table_name=self.local_table_name, + sharding_key=None, + ), + columns=columns, + target=OperationTarget.DISTRIBUTED, + ), + ] + + index_ops = [ + operations.AddIndex( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + index_name="bf_attributes_string_hash", + index_expression="attributes_string_hash", + index_type="bloom_filter", + granularity=1, + target=operations.OperationTarget.LOCAL, + ), + operations.AddIndex( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + index_name="bf_attributes_float_hash", + index_expression="attributes_float_hash", + index_type="bloom_filter", + granularity=1, + target=operations.OperationTarget.LOCAL, + ), + ] + materialized_view_ops: list[SqlOperation] = [] + + materialized_view_ops.append( + operations.CreateMaterializedView( + storage_set=self.storage_set_key, + view_name=self.mv_name, + columns=columns, + destination_table_name=self.local_table_name, + target=OperationTarget.LOCAL, + query=MV_QUERY, + ), + ) + + return create_table_ops + index_ops + materialized_view_ops + + def backwards_ops(self) -> Sequence[SqlOperation]: + return [ + operations.DropTable( + storage_set=self.storage_set_key, + table_name=self.mv_name, + target=OperationTarget.LOCAL, + ), + operations.DropTable( + storage_set=self.storage_set_key, + table_name=self.local_table_name, + target=OperationTarget.LOCAL, + ), + operations.DropTable( + storage_set=self.storage_set_key, + table_name=self.dist_table_name, + target=OperationTarget.DISTRIBUTED, + ), + ] From a531d38730d75a32c6824fc841bedda0060dcc8b Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Tue, 25 Feb 2025 14:19:23 -0800 Subject: [PATCH 2/6] store by weeks for more dedup, deduplicate by retention days --- .../0029_smart_autocomplete_items.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py b/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py index 0c3912dc14..9fde4a82ae 100644 --- a/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py +++ b/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py @@ -59,7 +59,7 @@ organization_id AS organization_id, project_id AS project_id, item_type as item_type, - toDate(timestamp) AS date, + toMonday(timestamp) AS date, retention_days as retention_days, arrayConcat({_attr_str_names}) AS attributes_string, arrayConcat({_attr_num_names}) AS attributes_float @@ -85,7 +85,7 @@ def forwards_ops(self) -> Sequence[SqlOperation]: engine=table_engines.ReplacingMergeTree( storage_set=self.storage_set_key, primary_key="(organization_id, project_id, date, key_hash)", - order_by="(organization_id, project_id, date, key_hash)", + order_by="(organization_id, project_id, date, key_hash, retention_days)", partition_by="(retention_days, toMonday(date))", ttl="date + toIntervalDay(retention_days)", ), From 289ef7b897868b2a6364e8b421cfbe6072e93b0f Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Tue, 25 Feb 2025 14:44:02 -0800 Subject: [PATCH 3/6] add item_type to primary + sort key --- .../0029_smart_autocomplete_items.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py b/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py index 9fde4a82ae..259b7344ff 100644 --- a/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py +++ b/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py @@ -84,8 +84,8 @@ def forwards_ops(self) -> Sequence[SqlOperation]: table_name=self.local_table_name, engine=table_engines.ReplacingMergeTree( storage_set=self.storage_set_key, - primary_key="(organization_id, project_id, date, key_hash)", - order_by="(organization_id, project_id, date, key_hash, retention_days)", + primary_key="(organization_id, project_id, date, item_type, key_hash)", + order_by="(organization_id, project_id, date, item_type, key_hash, retention_days)", partition_by="(retention_days, toMonday(date))", ttl="date + toIntervalDay(retention_days)", ), From 213c1ac03f062541af9158a9c6b89dc71b272fe2 Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Thu, 27 Feb 2025 14:02:53 -0800 Subject: [PATCH 4/6] change to one array for hashes --- .../0029_smart_autocomplete_items.py | 28 ++++++------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py b/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py index 259b7344ff..78207dc183 100644 --- a/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py +++ b/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py @@ -19,18 +19,17 @@ UInt(16), ), Column( - "attributes_string_hash", + "attribute_keys_hash", Array( - UInt(64), Modifiers(materialized=get_array_vals_hash("attributes_string")) + UInt(64), + Modifiers( + materialized=get_array_vals_hash( + "arrayConcat(attributes_string, attributes_float)" + ) + ), ), ), Column("attributes_string", Array(String())), - Column( - "attributes_float_hash", - Array( - UInt(64), Modifiers(materialized=get_array_vals_hash("attributes_float")) - ), - ), Column("attributes_float", Array(String())), # a hash of all the attribute keys of the item in sorted order # this lets us deduplicate rows with merges @@ -108,17 +107,8 @@ def forwards_ops(self) -> Sequence[SqlOperation]: operations.AddIndex( storage_set=self.storage_set_key, table_name=self.local_table_name, - index_name="bf_attributes_string_hash", - index_expression="attributes_string_hash", - index_type="bloom_filter", - granularity=1, - target=operations.OperationTarget.LOCAL, - ), - operations.AddIndex( - storage_set=self.storage_set_key, - table_name=self.local_table_name, - index_name="bf_attributes_float_hash", - index_expression="attributes_float_hash", + index_name="bf_attribute_keys_hash", + index_expression="attribute_keys_hash", index_type="bloom_filter", granularity=1, target=operations.OperationTarget.LOCAL, From c01ad8e62c06e15de49eba3f7f7614b2dba28f73 Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Thu, 27 Feb 2025 14:04:18 -0800 Subject: [PATCH 5/6] up the migration number --- ...art_autocomplete_items.py => 0030_smart_autocomplete_items.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename snuba/snuba_migrations/events_analytics_platform/{0029_smart_autocomplete_items.py => 0030_smart_autocomplete_items.py} (100%) diff --git a/snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py b/snuba/snuba_migrations/events_analytics_platform/0030_smart_autocomplete_items.py similarity index 100% rename from snuba/snuba_migrations/events_analytics_platform/0029_smart_autocomplete_items.py rename to snuba/snuba_migrations/events_analytics_platform/0030_smart_autocomplete_items.py From ec863e392f90fa5db99178deb514818363d1e6c3 Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Thu, 27 Feb 2025 14:19:12 -0800 Subject: [PATCH 6/6] more descriptive table name --- .../0030_smart_autocomplete_items.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/snuba/snuba_migrations/events_analytics_platform/0030_smart_autocomplete_items.py b/snuba/snuba_migrations/events_analytics_platform/0030_smart_autocomplete_items.py index 78207dc183..b429ecacbc 100644 --- a/snuba/snuba_migrations/events_analytics_platform/0030_smart_autocomplete_items.py +++ b/snuba/snuba_migrations/events_analytics_platform/0030_smart_autocomplete_items.py @@ -72,9 +72,9 @@ class Migration(migration.ClickhouseNodeMigration): storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM granularity = "8192" - local_table_name = "eap_item_attrs_1_local" - dist_table_name = "eap_item_attrs_1_dist" - mv_name = "eap_item_attrs_1_mv" + local_table_name = "eap_item_co_occurring_attrs_1_local" + dist_table_name = "eap_item_co_occurring_attrs_1_dist" + mv_name = "eap_item_co_occurring_attrs_1_mv" def forwards_ops(self) -> Sequence[SqlOperation]: create_table_ops = [