Skip to content

Commit 9653f5c

Browse files
authored
Merge pull request #376 from powersync-ja/feat/queue-read-connections-op-sqlite
Feat: Queue read connections OPSQLite
2 parents 8cc1524 + 39296bb commit 9653f5c

File tree

2 files changed

+41
-24
lines changed

2 files changed

+41
-24
lines changed

.changeset/kind-cats-check.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/op-sqlite': patch
3+
---
4+
5+
Improved queueing for read connections

packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts

+36-24
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
3535

3636
protected initialized: Promise<void>;
3737

38-
protected readConnections: OPSQLiteConnection[] | null;
38+
protected readConnections: Array<{ busy: boolean; connection: OPSQLiteConnection }> | null;
3939

4040
protected writeConnection: OPSQLiteConnection | null;
4141

42+
private readQueue: Array<() => void> = [];
43+
4244
constructor(protected options: OPSQLiteAdapterOptions) {
4345
super();
4446
this.name = this.options.name;
@@ -88,7 +90,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
8890
let dbName = './'.repeat(i + 1) + dbFilename;
8991
const conn = await this.openConnection(dbName);
9092
await conn.execute('PRAGMA query_only = true');
91-
this.readConnections.push(conn);
93+
this.readConnections.push({ busy: false, connection: conn });
9294
}
9395
}
9496

@@ -145,36 +147,46 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
145147
close() {
146148
this.initialized.then(() => {
147149
this.writeConnection!.close();
148-
this.readConnections!.forEach((c) => c.close());
150+
this.readConnections!.forEach((c) => c.connection.close());
149151
});
150152
}
151153

152154
async readLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
153155
await this.initialized;
154-
// TODO: Use async queues to handle multiple read connections
155-
const sortedConnections = this.readConnections!.map((connection, index) => ({
156-
lockKey: `${LockType.READ}-${index}`,
157-
connection
158-
})).sort((a, b) => {
159-
const aBusy = this.locks.isBusy(a.lockKey);
160-
const bBusy = this.locks.isBusy(b.lockKey);
161-
// Sort by ones which are not busy
162-
return aBusy > bBusy ? 1 : 0;
156+
return new Promise(async (resolve, reject) => {
157+
const execute = async () => {
158+
// Find an available connection that is not busy
159+
const availableConnection = this.readConnections!.find((conn) => !conn.busy);
160+
161+
// If we have an available connection, use it
162+
if (availableConnection) {
163+
availableConnection.busy = true;
164+
try {
165+
resolve(await fn(availableConnection.connection));
166+
} catch (error) {
167+
reject(error);
168+
} finally {
169+
availableConnection.busy = false;
170+
// After query execution, process any queued tasks
171+
this.processQueue();
172+
}
173+
} else {
174+
// If no available connections, add to the queue
175+
this.readQueue.push(execute);
176+
}
177+
};
178+
179+
execute();
163180
});
181+
}
164182

165-
return new Promise(async (resolve, reject) => {
166-
try {
167-
await this.locks.acquire(
168-
sortedConnections[0].lockKey,
169-
async () => {
170-
resolve(await fn(sortedConnections[0].connection));
171-
},
172-
{ timeout: options?.timeoutMs }
173-
);
174-
} catch (ex) {
175-
reject(ex);
183+
private async processQueue(): Promise<void> {
184+
if (this.readQueue.length > 0) {
185+
const next = this.readQueue.shift();
186+
if (next) {
187+
next();
176188
}
177-
});
189+
}
178190
}
179191

180192
async writeLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {

0 commit comments

Comments
 (0)