Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sweep: Create LanceDB index after table is created in import #87

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/vdf_io/import_vdf/astradb_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from astrapy.db import AstraDB
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from qdrant_client.http.models import Distance

from vdf_io.constants import INT_MAX
from vdf_io.names import DBNames
Expand Down
25 changes: 24 additions & 1 deletion src/vdf_io/import_vdf/lancedb_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pyarrow.parquet as pq

import lancedb
from lancedb import create_index

from vdf_io.constants import DEFAULT_BATCH_SIZE, INT_MAX
from vdf_io.meta_types import NamespaceMeta
Expand All @@ -23,6 +24,7 @@

class ImportLanceDB(ImportVDB):
DB_NAME_SLUG = DBNames.LANCEDB
ID_COLUMN = "id"

@classmethod
def import_vdb(cls, args):
Expand Down Expand Up @@ -103,6 +105,25 @@ def upsert_data(self):
table = self.db.open_table(new_index_name)
tqdm.write(f"Opened table {new_index_name}")

# Get the ID column from the parquet file schema
parquet_schema = pq.read_schema(parquet_files[0])
id_column = None
for field in parquet_schema:
if field.name == self.ID_COLUMN:
id_column = field.name
break

if id_column:
# Create index on the table
create_index(table, id_column)
tqdm.write(
f"Created index on {id_column} for table {new_index_name}"
)
else:
tqdm.write(
f"Warning: No ID column {self.ID_COLUMN} found in schema. Skipping index creation for table {new_index_name}"
)

for file in tqdm(parquet_files, desc="Iterating parquet files"):
file_path = self.get_file_path(final_data_path, file)
df = self.read_parquet_progress(
Expand All @@ -117,7 +138,9 @@ def upsert_data(self):
for col in df.columns:
if col not in [field.name for field in table.schema]:
col_type = df[col].dtype
tqdm.write(f"Adding column {col} of type {col_type} to {new_index_name}")
tqdm.write(
f"Adding column {col} of type {col_type} to {new_index_name}"
)
table.add_columns(
{
col: get_default_value(col_type),
Expand Down