Skip to content

Commit 2d44655

Browse files
authored
Store and use TypeCodecContext in PgBytesPgByteDataReader (isoos#357)
1 parent f93856e commit 2d44655

8 files changed

+72
-26
lines changed

Diff for: analysis_options.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ analyzer:
1111
unused_import: error
1212
unused_local_variable: error
1313
dead_code: error
14+
deprecated_member_use_from_same_package: ignore
1415

1516
linter:
1617
rules:

Diff for: lib/src/buffer.dart

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import 'dart:convert';
22

33
import 'package:buffer/buffer.dart';
4+
import 'package:postgres/src/types/type_codec.dart';
45

56
/// This class doesn't add much over using `List<int>` instead, however,
67
/// it creates a nice explicit type difference from both `String` and `List<int>`,
@@ -42,17 +43,19 @@ class PgByteDataWriter extends ByteDataWriter {
4243
const _emptyString = '';
4344

4445
class PgByteDataReader extends ByteDataReader {
45-
final Encoding encoding;
46+
final TypeCodecContext typeCodecContext;
4647

4748
PgByteDataReader({
48-
required this.encoding,
49+
required this.typeCodecContext,
4950
});
5051

52+
Encoding get encoding => typeCodecContext.encoding;
53+
5154
String readNullTerminatedString() {
5255
final bytes = readUntilTerminatingByte(0);
5356
if (bytes.isEmpty) {
5457
return _emptyString;
5558
}
56-
return encoding.decode(bytes);
59+
return typeCodecContext.encoding.decode(bytes);
5760
}
5861
}

Diff for: lib/src/message_window.dart

+9-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import 'dart:collection';
2-
import 'dart:convert';
32
import 'dart:typed_data';
43

54
import 'package:buffer/buffer.dart';
65
import 'package:charcode/ascii.dart';
6+
import 'package:postgres/src/types/type_codec.dart';
77

88
import 'buffer.dart';
99
import 'messages/server_messages.dart';
@@ -35,11 +35,11 @@ Map<int, _ServerMessageFn> _messageTypeMap = {
3535
};
3636

3737
class MessageFramer {
38-
final Encoding _encoding;
39-
late final _reader = PgByteDataReader(encoding: _encoding);
38+
final TypeCodecContext _typeCodecContext;
39+
late final _reader = PgByteDataReader(typeCodecContext: _typeCodecContext);
4040
final messageQueue = Queue<ServerMessage>();
4141

42-
MessageFramer(this._encoding);
42+
MessageFramer(this._typeCodecContext);
4343

4444
int? _type;
4545
int _expectedLength = 0;
@@ -116,7 +116,11 @@ ServerMessage _parseCopyDataMessage(PgByteDataReader reader, int length) {
116116
if (code == ReplicationMessageId.primaryKeepAlive) {
117117
return PrimaryKeepAliveMessage.parse(reader);
118118
} else if (code == ReplicationMessageId.xLogData) {
119-
return XLogDataMessage.parse(reader.read(length - 1), reader.encoding);
119+
return XLogDataMessage.parse(
120+
reader.read(length - 1),
121+
reader.encoding,
122+
typeCodecContext: reader.typeCodecContext,
123+
);
120124
} else {
121125
final bb = BytesBuffer();
122126
bb.addByte(code);

Diff for: lib/src/messages/server_messages.dart

+14-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'dart:convert';
22
import 'dart:typed_data';
33

44
import 'package:meta/meta.dart';
5+
import 'package:postgres/src/types/type_codec.dart';
56

67
import '../buffer.dart';
78
import '../time_converters.dart';
@@ -366,8 +367,19 @@ class XLogDataMessage implements ReplicationMessage, ServerMessage {
366367
/// If [XLogDataMessage.data] is a [LogicalReplicationMessage], then the method
367368
/// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll
368369
/// return [XLogDataMessage] with raw data.
369-
static XLogDataMessage parse(Uint8List bytes, Encoding encoding) {
370-
final reader = PgByteDataReader(encoding: encoding)..add(bytes);
370+
///
371+
@Deprecated(
372+
'It is likely that this method signature will change or will be removed in '
373+
'an upcoming release. Please file a new issue on GitHub if you are using it.')
374+
static XLogDataMessage parse(
375+
Uint8List bytes,
376+
Encoding encoding, {
377+
TypeCodecContext? typeCodecContext,
378+
}) {
379+
final reader = PgByteDataReader(
380+
typeCodecContext: typeCodecContext ??
381+
TypeCodecContext.withDefaults(encoding: encoding))
382+
..add(bytes);
371383
final walStart = LSN(reader.readUint64());
372384
final walEnd = LSN(reader.readUint64());
373385
final time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64());

Diff for: lib/src/types/type_codec.dart

+15-1
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,22 @@ class TypeCodecContext {
104104
required this.typeRegistry,
105105
});
106106

107+
factory TypeCodecContext.withDefaults({
108+
Encoding? encoding,
109+
RelationTracker? relationTracker,
110+
RuntimeParameters? runtimeParameters,
111+
TypeRegistry? typeRegistry,
112+
}) {
113+
return TypeCodecContext(
114+
encoding: encoding ?? utf8,
115+
relationTracker: relationTracker ?? RelationTracker(),
116+
runtimeParameters: runtimeParameters ?? RuntimeParameters(),
117+
typeRegistry: typeRegistry ?? TypeRegistry(),
118+
);
119+
}
120+
107121
PgByteDataReader newPgByteDataReader([Uint8List? bytes]) {
108-
final reader = PgByteDataReader(encoding: encoding);
122+
final reader = PgByteDataReader(typeCodecContext: this);
109123
if (bytes != null) {
110124
reader.add(bytes);
111125
}

Diff for: lib/src/v3/connection.dart

+18-7
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,18 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
200200
final settings = connectionSettings is ResolvedConnectionSettings
201201
? connectionSettings
202202
: ResolvedConnectionSettings(connectionSettings, null);
203-
var (channel, secure) = await _connect(endpoint, settings);
203+
final typeCodecContext = TypeCodecContext(
204+
encoding: settings.encoding,
205+
// TODO: share this between pooled connections
206+
relationTracker: RelationTracker(),
207+
runtimeParameters: RuntimeParameters(),
208+
typeRegistry: settings.typeRegistry,
209+
);
210+
var (channel, secure) = await _connect(
211+
endpoint,
212+
settings,
213+
typeCodecContext: typeCodecContext,
214+
);
204215

205216
if (_debugLog) {
206217
channel = channel.transform(StreamChannelTransformer(
@@ -226,9 +237,8 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
226237
settings,
227238
channel,
228239
secure,
229-
// TODO: share this between pooled connections
230-
relationTracker: RelationTracker(),
231-
runtimeParameters: RuntimeParameters(),
240+
relationTracker: typeCodecContext.relationTracker,
241+
runtimeParameters: typeCodecContext.runtimeParameters,
232242
);
233243
await connection._startup();
234244
if (connection._settings.onOpen != null) {
@@ -239,8 +249,9 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
239249

240250
static Future<(StreamChannel<Message>, bool)> _connect(
241251
Endpoint endpoint,
242-
ResolvedConnectionSettings settings,
243-
) async {
252+
ResolvedConnectionSettings settings, {
253+
required TypeCodecContext typeCodecContext,
254+
}) async {
244255
final host = endpoint.host;
245256
final port = endpoint.port;
246257

@@ -331,7 +342,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
331342

332343
return (
333344
StreamChannel<List<int>>(adaptedStream, outgoingSocket)
334-
.transform(messageTransformer(settings.encoding)),
345+
.transform(messageTransformer(typeCodecContext)),
335346
secure,
336347
);
337348
}

Diff for: lib/src/v3/protocol.dart

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import 'dart:async';
2-
import 'dart:convert';
32
import 'dart:typed_data';
43

54
import 'package:async/async.dart';
5+
import 'package:postgres/src/types/type_codec.dart';
66
import 'package:stream_channel/stream_channel.dart';
77

88
import '../buffer.dart';
@@ -34,9 +34,9 @@ class AggregatedClientMessage extends ClientMessage {
3434
}
3535

3636
StreamChannelTransformer<Message, List<int>> messageTransformer(
37-
Encoding encoding) {
37+
TypeCodecContext typeCodecContext) {
3838
return StreamChannelTransformer(
39-
_readMessages(encoding),
39+
_readMessages(typeCodecContext),
4040
StreamSinkTransformer.fromHandlers(
4141
handleData: (message, out) {
4242
if (message is! ClientMessage) {
@@ -47,16 +47,17 @@ StreamChannelTransformer<Message, List<int>> messageTransformer(
4747
return;
4848
}
4949

50-
out.add(message.asBytes(encoding: encoding));
50+
out.add(message.asBytes(encoding: typeCodecContext.encoding));
5151
},
5252
),
5353
);
5454
}
5555

56-
StreamTransformer<Uint8List, ServerMessage> _readMessages(Encoding encoding) {
56+
StreamTransformer<Uint8List, ServerMessage> _readMessages(
57+
TypeCodecContext typeCodecContext) {
5758
return StreamTransformer.fromBind((rawStream) {
5859
return Stream.multi((listener) {
59-
final framer = MessageFramer(encoding);
60+
final framer = MessageFramer(typeCodecContext);
6061

6162
var paused = false;
6263

Diff for: test/framer_test.dart

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
import 'dart:convert';
21
import 'dart:typed_data';
32

43
import 'package:buffer/buffer.dart';
54
import 'package:postgres/src/message_window.dart';
65
import 'package:postgres/src/messages/logical_replication_messages.dart';
76
import 'package:postgres/src/messages/server_messages.dart';
87
import 'package:postgres/src/messages/shared_messages.dart';
8+
import 'package:postgres/src/types/type_codec.dart';
99
import 'package:test/test.dart';
1010

1111
void main() {
1212
late MessageFramer framer;
1313
setUp(() {
14-
framer = MessageFramer(utf8);
14+
framer = MessageFramer(TypeCodecContext.withDefaults());
1515
});
1616

1717
tearDown(() {

0 commit comments

Comments
 (0)