From 06c241637c385b6539841d870b8e3e939c6f8dc7 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Thu, 27 Mar 2025 14:22:05 +0530 Subject: [PATCH] Added examples and fixed the async execute not working without pyarrow --- examples/query_async_execute.py | 32 ++++++++++++++++++++++++++++ src/databricks/sql/client.py | 21 ++++++++++++------ src/databricks/sql/thrift_backend.py | 15 +++++++------ tests/e2e/test_driver.py | 12 +++-------- 4 files changed, 58 insertions(+), 22 deletions(-) create mode 100644 examples/query_async_execute.py diff --git a/examples/query_async_execute.py b/examples/query_async_execute.py new file mode 100644 index 00000000..de4712fe --- /dev/null +++ b/examples/query_async_execute.py @@ -0,0 +1,32 @@ +from databricks import sql +import os +import time + +with sql.connect( + server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"), + http_path=os.getenv("DATABRICKS_HTTP_PATH"), + access_token=os.getenv("DATABRICKS_TOKEN"), +) as connection: + + with connection.cursor() as cursor: + long_running_query = """ + SELECT COUNT(*) FROM RANGE(10000 * 16) x + JOIN RANGE(10000) y + ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%' + """ + + # Non-blocking call + cursor.execute_async(long_running_query) + + # Polling every 5 seconds until the query is no longer pending + while cursor.is_query_pending(): + print("POLLING") + time.sleep(5) + + # Blocking call: fetch results when execution completes + cursor.get_async_execution_result() + + result = cursor.fetchall() + + for res in result: + print(res) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 93e3975a..ea901c3a 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -896,6 +896,19 @@ def get_query_state(self) -> "TOperationState": self._check_not_closed() return self.thrift_backend.get_query_state(self.active_op_handle) + def is_query_pending(self): + """ + Checks whether the async executing query is in pending state or not + + :return: + """ + operation_state = self.get_query_state() + + return not operation_state or operation_state in [ + ttypes.TOperationState.RUNNING_STATE, + ttypes.TOperationState.PENDING_STATE, + ] + def get_async_execution_result(self): """ @@ -905,13 +918,7 @@ def get_async_execution_result(self): """ self._check_not_closed() - def is_executing(operation_state) -> "bool": - return not operation_state or operation_state in [ - ttypes.TOperationState.RUNNING_STATE, - ttypes.TOperationState.PENDING_STATE, - ] - - while is_executing(self.get_query_state()): + while self.is_query_pending(): # Poll after some default time time.sleep(self.ASYNC_DEFAULT_POLLING_INTERVAL) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index e1e27273..2e3478d7 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -797,12 +797,15 @@ def get_execution_result(self, op_handle, cursor): t_result_set_metadata_resp.schema ) - schema_bytes = ( - t_result_set_metadata_resp.arrowSchema - or self._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema) - .serialize() - .to_pybytes() - ) + if pyarrow: + schema_bytes = ( + t_result_set_metadata_resp.arrowSchema + or self._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema) + .serialize() + .to_pybytes() + ) + else: + schema_bytes = None queue = ResultSetQueueFactory.build_queue( row_set_type=resp.resultSetMetadata.resultFormat, diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py index 45fea480..8c0a4a5a 100644 --- a/tests/e2e/test_driver.py +++ b/tests/e2e/test_driver.py @@ -179,12 +179,6 @@ def test_cloud_fetch(self): class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase): - def isExecuting(self, operation_state): - return not operation_state or operation_state in [ - ttypes.TOperationState.RUNNING_STATE, - ttypes.TOperationState.PENDING_STATE, - ] - def test_execute_async__long_running(self): long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'" @@ -192,7 +186,7 @@ def test_execute_async__long_running(self): cursor.execute_async(long_running_query) ## Polling after every POLLING_INTERVAL seconds - while self.isExecuting(cursor.get_query_state()): + while cursor.is_query_pending(): time.sleep(self.POLLING_INTERVAL) log.info("Polling the status in test_execute_async") @@ -211,7 +205,7 @@ def test_execute_async__small_result(self): time.sleep(5) ## Polling after every POLLING_INTERVAL seconds - while self.isExecuting(cursor.get_query_state()): + while cursor.is_query_pending(): time.sleep(self.POLLING_INTERVAL) log.info("Polling the status in test_execute_async") @@ -241,7 +235,7 @@ def test_execute_async__large_result(self): time.sleep(5) ## Polling after every POLLING_INTERVAL seconds - while self.isExecuting(cursor.get_query_state()): + while cursor.is_query_pending(): time.sleep(self.POLLING_INTERVAL) log.info("Polling the status in test_execute_async")