Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

load csv/json v2 protocol #104

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions railib/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ def list_models(ctx: Context, database: str, engine: str) -> list:
# Generate a rel literal relation for the given dict.
def _gen_literal_dict(items: dict) -> str:
result = []
for k, v in items:
for k, v in items.items():
result.append(f"{_gen_literal(k)},{_gen_literal(v)}")
return "{" + ";".join(result) + "}"

Expand Down Expand Up @@ -761,6 +761,14 @@ def _gen_syntax_config(syntax: dict = {}) -> str:
return result


# Generate list of config schema options for `load_csv`
def _gen_schema_config(schema: dict = {}) -> str:
result = ""
for k, v in schema.items():
result += f"def config:schema{k} = \"{v}\"\n"
return result


# `syntax`:
# * header: a map from col number to name (base 1)
# * header_row: row number of header, 0 means no header (default: 1)
Expand All @@ -777,6 +785,7 @@ def load_csv(
relation: str,
data: str or io.TextIOBase,
syntax: dict = {},
schema: dict = {},
) -> dict:
if isinstance(data, str):
pass # ok
Expand All @@ -786,8 +795,9 @@ def load_csv(
raise TypeError(f"bad type for arg 'data': {data.__class__.__name__}")
inputs = {"data": data}
command = _gen_syntax_config(syntax)
command += _gen_schema_config(schema)
command += "def config:data = data\n" "def insert:%s = load_csv[config]" % relation
return exec_v1(ctx, database, engine, command, inputs=inputs, readonly=False)
return exec(ctx, database, engine, command, inputs=inputs, readonly=False)


def load_json(
Expand All @@ -805,7 +815,7 @@ def load_json(
raise TypeError(f"bad type for arg 'data': {data.__class__.__name__}")
inputs = {"data": data}
command = "def config:data = data\n" "def insert:%s = load_json[config]" % relation
return exec_v1(ctx, database, engine, command, inputs=inputs, readonly=False)
return exec(ctx, database, engine, command, inputs=inputs, readonly=False)


def exec_v1(
Expand Down
94 changes: 84 additions & 10 deletions tests/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ def create_engine_wait(ctx: api.Context, engine: str):
custom_headers = json.loads(os.getenv('CUSTOM_HEADERS', '{}'))

if client_id is None:
print("using config from path")
cfg = config.read()
else:
file = tempfile.NamedTemporaryFile(mode="w")
Expand All @@ -51,19 +50,19 @@ def create_engine_wait(ctx: api.Context, engine: str):

ctx = api.Context(**cfg)

suffix = uuid.uuid4()
engine = f"python-sdk-{suffix}"
dbname = f"python-sdk-{suffix}"


class TestTransactionAsync(unittest.TestCase):
def setUp(self):
create_engine_wait(ctx, engine)
api.create_database(ctx, dbname)
suffix = uuid.uuid4()
self.engine = f"python-sdk-{suffix}"
self.dbname = f"python-sdk-{suffix}"

create_engine_wait(ctx, self.engine)
api.create_database(ctx, self.dbname)

def test_v2_exec(self):
cmd = "x, x^2, x^3, x^4 from x in {1; 2; 3; 4; 5}"
rsp = api.exec(ctx, dbname, engine, cmd)
rsp = api.exec(ctx, self.dbname, self.engine, cmd)

# transaction
self.assertEqual("COMPLETED", rsp.transaction["state"])
Expand All @@ -89,8 +88,83 @@ def test_v2_exec(self):
1, 16, 81, 256, 625]}, rsp.results[0]["table"].to_pydict())

def tearDown(self):
api.delete_engine(ctx, engine)
api.delete_database(ctx, dbname)
api.delete_engine(ctx, self.engine)
api.delete_database(ctx, self.dbname)


class TestDataload(unittest.TestCase):
def setUp(self):
suffix = uuid.uuid4()
self.engine = f"python-sdk-{suffix}"
self.dbname = f"python-sdk-{suffix}"

create_engine_wait(ctx, self.engine)
api.create_database(ctx, self.dbname)

def test_load_json(self):
json = '{ "test" : 123 }'
resp = api.load_json(ctx, self.dbname, self.engine, 'test_relation', json)
self.assertEqual("COMPLETED", resp.transaction["state"])

resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation')
self.assertEqual("COMPLETED", resp.transaction["state"])
self.assertEqual({'v1': [123]}, resp.results[0]["table"].to_pydict())

def test_load_csv(self):
csv = 'foo,bar\n1,2'
resp = api.load_csv(ctx, self.dbname, self.engine, 'test_relation', csv)
self.assertEqual("COMPLETED", resp.transaction["state"])

resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation')
self.assertEqual("COMPLETED", resp.transaction["state"])
self.assertEqual({'v1': [2], 'v2': ['2']}, resp.results[0]["table"].to_pydict())
self.assertEqual({'v1': [2], 'v2': ['1']}, resp.results[1]["table"].to_pydict())

def test_load_csv_with_syntax(self):
csv = 'foo|bar\n1,2'
resp = api.load_csv(
ctx,
self.dbname,
self.engine,
'test_relation',
csv,
{
'header': {1: 'foo', 2: 'bar'},
'delim': '|',
'quotechar': "'",
'header_row': 0,
'escapechar': ']'
}
)
self.assertEqual("COMPLETED", resp.transaction["state"])

resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation')
self.assertEqual("COMPLETED", resp.transaction["state"])
self.assertEqual({'v1': [2], 'v2': [2], 'v3': ['1,2']}, resp.results[0]["table"].to_pydict())

def test_load_csv_with_schema(self):
csv = 'foo,bar\n1,test'
resp = api.load_csv(
ctx,
self.dbname,
self.engine,
'test_relation',
csv,
schema={
':foo': 'int',
':bar': 'string'
}
)
self.assertEqual("COMPLETED", resp.transaction["state"])

resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation')
self.assertEqual("COMPLETED", resp.transaction["state"])
self.assertEqual({'v1': [2], 'v2': ['test']}, resp.results[0]["table"].to_pydict())
self.assertEqual({'v1': [2], 'v2': [1]}, resp.results[1]["table"].to_pydict())

def tearDown(self):
api.delete_engine(ctx, self.engine)
api.delete_database(ctx, self.dbname)


if __name__ == '__main__':
Expand Down