-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathgoogle_sql_connector.py
60 lines (51 loc) · 2.02 KB
/
google_sql_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import pyodbc
import csv
import logging
from io import StringIO
class GoogleCloudSQL:
def __init__(self, driver, server, database, user, password, encrypt="yes"):
self.driver = driver
self.server = server
self.database = database
self.user = user
self.password = password
self.encrypt = encrypt
def connect(self):
try:
self.conn = pyodbc.connect(f'DRIVER={{{self.driver}}};SERVER={self.server};DATABASE={self.database};UID={self.user};PWD={self.password};ENCRYPT={self.encrypt}')
return True
except Exception as e:
return str(e)
def close(self):
self.conn.close()
def execute_query(self, query):
print(f'\033[94mExecuting Query:{query}\033[0m')
try:
cursor = self.conn.cursor()
cursor.execute(query)
result = cursor.fetchall()
if len(result) == 0:
result = "0 rows returned"
logging.debug(result)
print(f'\033[96m{result}\033[0m')
return result
headers = [column[0] for column in cursor.description]
output = StringIO()
csv_writer = csv.writer(output)
csv_writer.writerow(headers)
csv_writer.writerows(result)
result = output.getvalue()
logging.debug(result)
print(f'\033[96m{result}\033[0m')
return result
except Exception as e:
return str(e)
def process_table_string(self, input_str):
items = input_str.split(',')
items = [item.split('.')[-1] for item in items]
formatted_str = "', '".join(items)
result = f"'{formatted_str}'"
return result
def execute_schema(self, table_list):
queryPart = self.process_table_string(table_list)
return f"SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME, ', ', COLUMN_NAME, ', ', DATA_TYPE) AS 'Table, Column, DataType' FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME IN ({queryPart})"