Skip to content

Deprecate global mutex #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
16 changes: 9 additions & 7 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: dart-lang/setup-dart@v1

- name: Install dependencies
run: dart pub get
- name: Check formatting
run: dart format --output=none --set-exit-if-changed .
- name: Lint
run: dart analyze
run: dart analyze lib test example
# This fails when clong into sqlite_async.dart. Disable for now.
# - name: Test Fixes
# run: dart fix --compare-to-golden test_fixes
- name: Publish dry-run
run: dart pub publish --dry-run
- name: Check publish score
Expand All @@ -32,19 +34,19 @@ jobs:
include:
- sqlite_version: "3440200"
sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3440200.tar.gz"
dart_sdk: 3.2.4
dart_sdk: 3.3.3
- sqlite_version: "3430200"
sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3430200.tar.gz"
dart_sdk: 3.2.4
dart_sdk: 3.3.3
- sqlite_version: "3420000"
sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3420000.tar.gz"
dart_sdk: 3.2.4
dart_sdk: 3.3.3
- sqlite_version: "3410100"
sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3410100.tar.gz"
dart_sdk: 3.2.4
dart_sdk: 3.3.3
- sqlite_version: "3380000"
sqlite_url: "https://www.sqlite.org/2022/sqlite-autoconf-3380000.tar.gz"
dart_sdk: 3.2.0
dart_sdk: 3.3.3
steps:
- uses: actions/checkout@v3
- uses: dart-lang/setup-dart@v1
Expand Down
2 changes: 1 addition & 1 deletion example/basic_example.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void main() async {
// Combine multiple statements into a single write transaction for:
// 1. Atomic persistence (all updates are either applied or rolled back).
// 2. Improved throughput.
await db.writeTransaction((tx) async {
await db.transaction((tx) async {
await tx.execute('INSERT INTO test_data(data) values(?)', ['Test3']);
await tx.execute('INSERT INTO test_data(data) values(?)', ['Test4']);
});
Expand Down
70 changes: 70 additions & 0 deletions lib/fix_data.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# This provides automatic fixes of deprecations using `dart fix`.
# See: https://github.com/flutter/flutter/wiki/Data-driven-Fixes
version: 1
transforms:
- title: 'Rename writeLock to lock'
date: 2024-04-02
element:
uris: ['src/sqlite_connection.dart', 'sqlite_async.dart']
method: 'writeLock'
inClass: 'SqliteConnection'
changes:
- kind: 'rename'
newName: 'lock'
- title: 'Rename writeTransaction to transaction'
date: 2024-04-02
element:
uris: ['src/sqlite_connection.dart', 'sqlite_async.dart']
method: 'writeTransaction'
inClass: 'SqliteConnection'
changes:
- kind: 'rename'
newName: 'transaction'
- title: 'Rename writeLock to lock'
date: 2024-04-02
element:
uris: ['src/sqlite_database.dart', 'sqlite_async.dart']
method: 'writeLock'
inClass: 'SqliteDatabase'
changes:
- kind: 'rename'
newName: 'lock'
- title: 'Rename writeTransaction to transaction'
date: 2024-04-02
element:
uris: ['src/sqlite_database.dart', 'sqlite_async.dart']
method: 'writeTransaction'
inClass: 'SqliteDatabase'
changes:
- kind: 'rename'
newName: 'transaction'
- title: 'Rename readTransaction to transaction'
date: 2024-04-02
element:
uris: ['src/sqlite_database.dart', 'sqlite_async.dart']
method: 'readTransaction'
inClass: 'SqliteDatabase'
changes:
- kind: 'addParameter'
index: 1
name: 'readOnly'
style: required_named
argumentValue:
expression: 'true'
- kind: 'rename'
newName: 'transaction'
- title: 'Rename readLock to lock'
date: 2024-04-02
element:
uris: ['src/sqlite_database.dart', 'sqlite_async.dart']
method: 'readLock'
inClass: 'SqliteDatabase'
changes:
- kind: 'addParameter'
index: 1
name: 'readOnly'
style: required_named
argumentValue:
expression: 'true'
- kind: 'rename'
newName: 'lock'
40 changes: 34 additions & 6 deletions lib/src/connection_pool.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import 'update_notification.dart';

/// A connection pool with a single write connection and multiple read connections.
class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
SqliteConnection? _writeConnection;
SqliteConnectionImpl? _writeConnection;

final List<SqliteConnectionImpl> _readConnections = [];

Expand Down Expand Up @@ -42,7 +42,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
SqliteConnectionPool(this._factory,
{this.updates,
this.maxReaders = 5,
SqliteConnection? writeConnection,
SqliteConnectionImpl? writeConnection,
this.debugName,
required this.mutex,
required SerializedPortClient upstreamPort})
Expand Down Expand Up @@ -72,7 +72,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
_readConnections.remove(connection);
}
try {
return await connection.readLock((ctx) async {
return await connection.lock((ctx) async {
if (haveLock) {
// Already have a different lock - release this one.
return false;
Expand All @@ -91,7 +91,10 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
}

return true;
}, lockTimeout: lockTimeout, debugContext: debugContext);
},
lockTimeout: lockTimeout,
readOnly: true,
debugContext: debugContext);
} on TimeoutException {
return false;
}
Expand All @@ -117,6 +120,12 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
@override
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext}) {
return _writeLock(callback,
lockTimeout: lockTimeout, debugContext: debugContext, global: true);
}

Future<T> _writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext, required bool global}) {
if (closed) {
throw AssertionError('Closed');
}
Expand All @@ -132,11 +141,30 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
readOnly: false,
openFactory: _factory);
return _runZoned(() {
return _writeConnection!.writeLock(callback,
lockTimeout: lockTimeout, debugContext: debugContext);
if (global) {
// ignore: deprecated_member_use_from_same_package
return _writeConnection!.writeLock(callback,
lockTimeout: lockTimeout, debugContext: debugContext);
} else {
return _writeConnection!.lock(callback,
lockTimeout: lockTimeout, debugContext: debugContext);
}
}, debugContext: debugContext ?? 'execute()');
}

@override
Future<T> lock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{bool? readOnly, Duration? lockTimeout, String? debugContext}) {
if (readOnly == true) {
// ignore: deprecated_member_use_from_same_package
return readLock((ctx) => callback(ctx as SqliteWriteContext),
lockTimeout: lockTimeout, debugContext: debugContext);
} else {
return _writeLock(callback,
lockTimeout: lockTimeout, debugContext: debugContext, global: false);
}
}

/// The [Mutex] on individual connections do already error in recursive locks.
///
/// We duplicate the same check here, to:
Expand Down
39 changes: 35 additions & 4 deletions lib/src/sqlite_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,28 @@ abstract class SqliteWriteContext extends SqliteReadContext {

/// Abstract class representing a connection to the SQLite database.
abstract class SqliteConnection extends SqliteWriteContext {
/// Open a transaction.
///
/// Opens a read-write transaction by default. Set [readOnly] to open a
/// read-only transaction.
///
/// Only one read-write transaction can execute against the database at a time.
///
/// Statements within the transaction must be done on the provided
/// [SqliteWriteContext] - attempting statements on the [SqliteConnection]
/// instance will error.
///
/// [lockTimeout] only controls the timeout for locking the connection.
/// Timeout for database-level locks can be configured on [SqliteOpenOptions].
Future<T> transaction<T>(Future<T> Function(SqliteWriteContext tx) callback,
{bool? readOnly, Duration? lockTimeout});

/// Open a read-only transaction.
///
/// Statements within the transaction must be done on the provided
/// [SqliteReadContext] - attempting statements on the [SqliteConnection]
/// instance will error.
@Deprecated('Use [transaction(callback)] instead.')
Future<T> readTransaction<T>(
Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout});
Expand All @@ -89,6 +106,7 @@ abstract class SqliteConnection extends SqliteWriteContext {
/// Statements within the transaction must be done on the provided
/// [SqliteWriteContext] - attempting statements on the [SqliteConnection]
/// instance will error.
@Deprecated('Use [transaction(callback)] instead.')
Future<T> writeTransaction<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout});
Expand All @@ -102,18 +120,31 @@ abstract class SqliteConnection extends SqliteWriteContext {
{List<Object?> parameters = const [],
Duration throttle = const Duration(milliseconds: 30)});

/// Takes a read lock, without starting a transaction.
/// Takes a read lock on this connection, without starting a transaction.
///
/// In most cases, [readTransaction] should be used instead.
/// This is a low-level API. In most cases, [transaction] should be used instead.
@Deprecated('Use [lock] instead.')
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout, String? debugContext});

/// Takes a global lock, without starting a transaction.
/// Takes a global write lock, without starting a transaction.
///
/// In most cases, [writeTransaction] should be used instead.
/// This is a low-level API. In most cases, [transaction] should be used instead.
@Deprecated('Use [lock] instead.')
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext});

/// Lock a connection for exclusive usage.
///
/// When using a connection pool, use [readOnly] to select a read connection.
/// In other contexts, [readOnly] has no effect.
///
/// This is a low-level API. In most cases, [transaction] should be used instead.
///
/// Any direct query methods such as [getAll] or [execute] uses [lock] internally.
Future<T> lock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{bool? readOnly, Duration? lockTimeout, String? debugContext});

Future<void> close();

/// Returns true if the connection is closed
Expand Down
20 changes: 9 additions & 11 deletions lib/src/sqlite_connection_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
}

@override
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
Future<T> lock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{bool? readOnly, Duration? lockTimeout, String? debugContext}) async {
// Private lock to synchronize this with other statements on the same connection,
// to ensure that transactions aren't interleaved.
return _connectionMutex.lock(() async {
Expand All @@ -120,6 +120,12 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
}, timeout: lockTimeout);
}

@override
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
return lock(callback, lockTimeout: lockTimeout, debugContext: debugContext);
}

@override
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
Expand Down Expand Up @@ -271,7 +277,6 @@ Future<void> _sqliteConnectionIsolateInner(_SqliteConnectionParams params,
final server = params.portServer;
final commandPort = ReceivePort();

Timer? updateDebouncer;
Set<String> updatedTables = {};
int? txId;
Object? txError;
Expand All @@ -280,25 +285,18 @@ Future<void> _sqliteConnectionIsolateInner(_SqliteConnectionParams params,
if (updatedTables.isNotEmpty) {
client.fire(UpdateNotification(updatedTables));
updatedTables.clear();
updateDebouncer?.cancel();
updateDebouncer = null;
}
}

db.updates.listen((event) {
updatedTables.add(event.tableName);

// This handles two cases:
// 1. Update arrived after _SqliteIsolateClose (not sure if this could happen).
// 2. Long-running _SqliteIsolateClosure that should fire updates while running.
updateDebouncer ??=
Timer(const Duration(milliseconds: 10), maybeFireUpdates);
});

server.open((data) async {
if (data is _SqliteIsolateClose) {
if (txId != null) {
if (!db.autocommit) {
updatedTables.clear();
db.execute('ROLLBACK');
}
txId = null;
Expand Down
Loading
Loading