Skip to content

Commit

Permalink
feat: add multi-query transaction support to snowflake piece
Browse files Browse the repository at this point in the history
  • Loading branch information
valentin-mourtialon committed Jan 15, 2025
1 parent 4d1dd96 commit a39a41b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 27 deletions.
2 changes: 1 addition & 1 deletion packages/pieces/community/snowflake/package.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"name": "@activepieces/piece-snowflake",
"version": "0.0.9"
"version": "0.1.0"
}
6 changes: 3 additions & 3 deletions packages/pieces/community/snowflake/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
PieceAuth,
Property,
} from '@activepieces/pieces-framework';
import { runQuery } from './lib/actions/run-query';
import { runQueries } from './lib/actions/run-query';
import { PieceCategory } from '@activepieces/shared';
import { insertRowAction } from './lib/actions/insert-row';

Expand Down Expand Up @@ -53,7 +53,7 @@ export const snowflake = createPiece({
minimumSupportedRelease: '0.27.1',
logoUrl: 'https://cdn.activepieces.com/pieces/snowflake.png',
categories: [PieceCategory.DEVELOPER_TOOLS],
authors: ['AdamSelene', 'abuaboud'],
actions: [runQuery, insertRowAction],
authors: ['AdamSelene', 'abuaboud', 'valentin-mourtialon'],
actions: [runQueries, insertRowAction],
triggers: [],
});
102 changes: 79 additions & 23 deletions packages/pieces/community/snowflake/src/lib/actions/run-query.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import { createAction, Property } from '@activepieces/pieces-framework';
import snowflake from 'snowflake-sdk';
import snowflake, { Statement, SnowflakeError } from 'snowflake-sdk';
import { snowflakeAuth } from '../../index';

type QueryResult = unknown[] | undefined;

const DEFAULT_APPLICATION_NAME = 'ActivePieces';
const DEFAULT_QUERY_TIMEOUT = 30000;

export const runQuery = createAction({
name: 'runQuery',
displayName: 'Run Query',
description: 'Run Query',
export const runQueries = createAction({
name: 'runQueries',
displayName: 'Run Queries',
description: 'Run Queries',
auth: snowflakeAuth,
props: {
sqlText: Property.ShortText({
displayName: 'SQL query',
description: 'Use :1, :2… or ? placeholders to use binding parameters.',
sqlTexts: Property.Array({
displayName: 'SQL queries',
description:
'Array of SQL queries to execute in order, in the same transaction. Use :1, :2… or ? placeholders to use binding parameters.',
required: true,
}),
binds: Property.Array({
Expand Down Expand Up @@ -52,29 +55,82 @@ export const runQuery = createAction({
account,
});

return new Promise((resolve, reject) => {
connection.connect(function (err, conn) {
return new Promise<QueryResult>((resolve, reject) => {
connection.connect(function (err: SnowflakeError | undefined) {
if (err) {
reject(err);
return;
}
});

const { sqlText, binds } = context.propsValue;

connection.execute({
sqlText,
binds: binds as snowflake.Binds,
complete: (err, stmt, rows) => {
if (err) {
reject(err);
}
connection.destroy((err, conn) => {
connection.execute({
sqlText: 'BEGIN',
complete: (err: SnowflakeError | undefined) => {
if (err) {
reject(err);
return;
}
executeQueriesSequentially();
},
});

const { sqlTexts, binds } = context.propsValue;
let lastQueryResult: QueryResult = [];

function handleError(err: SnowflakeError) {
connection.execute({
sqlText: 'ROLLBACK',
complete: () => {
connection.destroy(() => {
reject(err);
});
},
});
resolve(rows);
},
}

async function executeQueriesSequentially() {
try {
for (const sqlText of sqlTexts) {
lastQueryResult = await new Promise<QueryResult>(
(resolveQuery, rejectQuery) => {
connection.execute({
sqlText: sqlText as string,
binds: binds as snowflake.Binds,
complete: (
err: SnowflakeError | undefined,
stmt: Statement,
rows: QueryResult
) => {
if (err) {
rejectQuery(err);
return;
}
resolveQuery(rows);
},
});
}
);
}

connection.execute({
sqlText: 'COMMIT',
complete: (err: SnowflakeError | undefined) => {
if (err) {
handleError(err);
return;
}
connection.destroy((err: SnowflakeError | undefined) => {
if (err) {
reject(err);
return;
}
resolve(lastQueryResult);
});
},
});
} catch (err) {
handleError(err as SnowflakeError); // Reject with the original error!
}
}
});
});
},
Expand Down

0 comments on commit a39a41b

Please sign in to comment.