Skip to content

Commit b0c3b34

Browse files
committed
Decode the "query" column on Python side
We no longer convert the "query" column on Postgres side. Instead, we let the database adapter load TEXT values as bytes, and finally decode those when loading each row by using the database encoding. Adapter configuration is done through the new 'text_as_bytes' flag, passed to fetch*() functions, triggering the registration of a bytes loader. Row decoding is done in BaseProcess.from_bytes() class method. Decoding is made "robust" (not crashing) by simply passing errors="replace" to bytes.decode(). The test_data.py::test_encoding() test remains valid for regression. On the other hand, test_InvalidTextRepresentation() is no longer needed as we don't use convert_from() anymore.
1 parent 050f8d4 commit b0c3b34

16 files changed

+74
-29
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515
### Fixed
1616

17+
* Rework decoding of the `query` column to (hopefully) make it more robust
18+
(see #149 for the original report, #302 for a new problem raised while fixing
19+
the previous one and #332 for the latest update).
1720
* Fix a few typos in the man page.
1821

1922
### Misc.

pgactivity/data.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,8 @@ def pg_get_activities(self, duration_mode: int = 1) -> List[RunningProcess]:
412412
"min_duration": self.min_duration,
413413
"dbname_filter": self.filters.dbname,
414414
},
415-
mkrow=RunningProcess,
415+
mkrow=RunningProcess.from_bytes,
416+
text_as_bytes=True,
416417
)
417418

418419
def pg_get_waiting(self, duration_mode: int = 1) -> List[WaitingProcess]:
@@ -438,7 +439,8 @@ def pg_get_waiting(self, duration_mode: int = 1) -> List[WaitingProcess]:
438439
"min_duration": self.min_duration,
439440
"dbname_filter": self.filters.dbname,
440441
},
441-
mkrow=WaitingProcess,
442+
mkrow=WaitingProcess.from_bytes,
443+
text_as_bytes=True,
442444
)
443445

444446
def pg_get_blocking(self, duration_mode: int = 1) -> List[BlockingProcess]:
@@ -466,7 +468,8 @@ def pg_get_blocking(self, duration_mode: int = 1) -> List[BlockingProcess]:
466468
"min_duration": self.min_duration,
467469
"dbname_filter": self.filters.dbname,
468470
},
469-
mkrow=BlockingProcess,
471+
mkrow=BlockingProcess.from_bytes,
472+
text_as_bytes=True,
470473
)
471474

472475
def pg_is_local(self) -> bool:

pgactivity/pg.py

+37-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import psycopg
2121
from psycopg import sql as sql
22+
from psycopg.adapt import Buffer, Loader
2223
from psycopg.rows import dict_row
2324
from psycopg.errors import (
2425
FeatureNotSupported as FeatureNotSupported,
@@ -34,6 +35,12 @@
3435

3536
Connection = psycopg.Connection[Dict[str, Any]]
3637

38+
class BytesLoader(Loader):
39+
def load(self, data: Buffer) -> bytes:
40+
if isinstance(data, memoryview):
41+
return bytes(data)
42+
return data
43+
3744
def connect(*args: Any, **kwargs: Any) -> Connection:
3845
return psycopg.connect(*args, autocommit=True, row_factory=dict_row, **kwargs)
3946

@@ -51,19 +58,27 @@ def execute(
5158
conn.execute(query, args, prepare=True)
5259

5360
@overload
54-
def cursor(conn: Connection, mkrow: Callable[..., Row]) -> psycopg.Cursor[Row]:
61+
def cursor(
62+
conn: Connection, mkrow: Callable[..., Row], text_as_bytes: bool
63+
) -> psycopg.Cursor[Row]:
5564
...
5665

5766
@overload
58-
def cursor(conn: Connection, mkrow: None) -> psycopg.Cursor[psycopg.rows.DictRow]:
67+
def cursor(
68+
conn: Connection, mkrow: None, text_as_bytes: bool
69+
) -> psycopg.Cursor[psycopg.rows.DictRow]:
5970
...
6071

6172
def cursor(
62-
conn: Connection, mkrow: Optional[Callable[..., Row]]
73+
conn: Connection, mkrow: Optional[Callable[..., Row]], text_as_bytes: bool
6374
) -> Union[psycopg.Cursor[psycopg.rows.DictRow], psycopg.Cursor[Row]]:
6475
if mkrow is not None:
65-
return conn.cursor(row_factory=psycopg.rows.kwargs_row(mkrow))
66-
return conn.cursor()
76+
cur = conn.cursor(row_factory=psycopg.rows.kwargs_row(mkrow))
77+
else:
78+
cur = conn.cursor() # type: ignore[assignment]
79+
if text_as_bytes:
80+
cur.adapters.register_loader("text", BytesLoader)
81+
return cur
6782

6883
@overload
6984
def fetchone(
@@ -72,6 +87,7 @@ def fetchone(
7287
args: Union[None, Sequence[Any], Dict[str, Any]] = None,
7388
*,
7489
mkrow: Callable[..., Row],
90+
text_as_bytes: bool = False,
7591
) -> Row:
7692
...
7793

@@ -80,6 +96,8 @@ def fetchone(
8096
conn: Connection,
8197
query: Union[str, sql.Composed],
8298
args: Union[None, Sequence[Any], Dict[str, Any]] = None,
99+
*,
100+
text_as_bytes: bool = False,
83101
) -> Dict[str, Any]:
84102
...
85103

@@ -89,8 +107,9 @@ def fetchone(
89107
args: Union[None, Sequence[Any], Dict[str, Any]] = None,
90108
*,
91109
mkrow: Optional[Callable[..., Row]] = None,
110+
text_as_bytes: bool = False,
92111
) -> Union[Dict[str, Any], Row]:
93-
with cursor(conn, mkrow) as cur:
112+
with cursor(conn, mkrow, text_as_bytes) as cur:
94113
row = cur.execute(query, args, prepare=True).fetchone()
95114
assert row is not None
96115
return row
@@ -102,6 +121,7 @@ def fetchall(
102121
args: Union[None, Sequence[Any], Dict[str, Any]] = None,
103122
*,
104123
mkrow: Callable[..., Row],
124+
text_as_bytes: bool = False,
105125
) -> List[Row]:
106126
...
107127

@@ -110,6 +130,8 @@ def fetchall(
110130
conn: Connection,
111131
query: Union[str, sql.Composed],
112132
args: Union[None, Sequence[Any], Dict[str, Any]] = None,
133+
*,
134+
text_as_bytes: bool = False,
113135
) -> List[Dict[str, Any]]:
114136
...
115137

@@ -118,13 +140,15 @@ def fetchall(
118140
query: Union[str, sql.Composed],
119141
args: Union[None, Sequence[Any], Dict[str, Any]] = None,
120142
*,
143+
text_as_bytes: bool = False,
121144
mkrow: Optional[Callable[..., Row]] = None,
122145
) -> Union[List[Dict[str, Any]], List[Row]]:
123-
with cursor(conn, mkrow) as cur:
146+
with cursor(conn, mkrow, text_as_bytes) as cur:
124147
return cur.execute(query, args, prepare=True).fetchall()
125148

126149
except ImportError:
127150
import psycopg2
151+
import psycopg2.extensions
128152
from psycopg2.extras import DictCursor
129153
from psycopg2 import sql as sql # type: ignore[no-redef]
130154
from psycopg2.errors import ( # type: ignore[no-redef]
@@ -170,8 +194,11 @@ def fetchone( # type: ignore[no-redef]
170194
args: Union[None, Sequence[Any], Dict[str, Any]] = None,
171195
*,
172196
mkrow: Optional[Callable[..., Row]] = None,
197+
text_as_bytes: bool = False,
173198
) -> Union[Dict[str, Any], Row]:
174199
with conn.cursor() as cur:
200+
if text_as_bytes:
201+
psycopg2.extensions.register_type(psycopg2.extensions.BYTES, cur)
175202
cur.execute(query, args)
176203
row = cur.fetchone()
177204
assert row is not None
@@ -185,8 +212,11 @@ def fetchall( # type: ignore[no-redef]
185212
args: Union[None, Sequence[Any], Dict[str, Any]] = None,
186213
*,
187214
mkrow: Optional[Callable[..., Row]] = None,
215+
text_as_bytes: bool = False,
188216
) -> Union[List[Dict[str, Any]], List[Row]]:
189217
with conn.cursor() as cur:
218+
if text_as_bytes:
219+
psycopg2.extensions.register_type(psycopg2.extensions.BYTES, cur)
190220
cur.execute(query, args)
191221
rows = cur.fetchall()
192222
if mkrow is not None:

pgactivity/queries/get_blocking_oldest.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ SELECT
1717
END AS state,
1818
CASE WHEN sq.query LIKE '<IDLE>%%'
1919
THEN NULL
20-
ELSE convert_from(replace(sq.query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8'))
20+
ELSE sq.query
2121
END AS query,
2222
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding,
2323
waiting AS wait

pgactivity/queries/get_blocking_post_090200.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ SELECT
1313
locktype AS type,
1414
duration,
1515
state,
16-
convert_from(replace(sq.query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8')) AS query,
16+
sq.query AS query,
1717
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding,
1818
waiting as wait
1919
FROM

pgactivity/queries/get_blocking_post_090600.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ SELECT
1111
locktype AS type,
1212
duration,
1313
state,
14-
convert_from(replace(sq.query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8')) AS query,
14+
sq.query AS query,
1515
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding,
1616
wait_event as wait
1717
FROM

pgactivity/queries/get_pg_activity_oldest.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ SELECT
1818
END AS state,
1919
CASE
2020
WHEN a.current_query LIKE '<IDLE>%%' THEN NULL
21-
ELSE convert_from(replace(a.current_query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8'))
21+
ELSE a.current_query
2222
END AS query,
2323
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding,
2424
NULL AS query_leader_pid,

pgactivity/queries/get_pg_activity_post_090200.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ SELECT
1313
a.waiting AS wait,
1414
a.usename AS user,
1515
a.state AS state,
16-
convert_from(replace(a.query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8')) AS query,
16+
a.query AS query,
1717
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding,
1818
NULL AS query_leader_pid,
1919
false AS is_parallel_worker

pgactivity/queries/get_pg_activity_post_090600.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ SELECT
1313
a.wait_event AS wait,
1414
a.usename AS user,
1515
a.state AS state,
16-
convert_from(replace(a.query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8')) AS query,
16+
a.query AS query,
1717
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding,
1818
NULL AS query_leader_pid,
1919
false AS is_parallel_worker

pgactivity/queries/get_pg_activity_post_100000.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ SELECT
1313
a.wait_event as wait,
1414
a.usename AS user,
1515
a.state AS state,
16-
convert_from(replace(a.query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8')) AS query,
16+
a.query AS query,
1717
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding,
1818
NULL AS query_leader_pid,
1919
( a.backend_type = 'background worker'

pgactivity/queries/get_pg_activity_post_110000.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ SELECT
1212
a.wait_event AS wait,
1313
a.usename AS user,
1414
a.state AS state,
15-
convert_from(replace(a.query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8')) AS query,
15+
a.query AS query,
1616
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding,
1717
NULL AS query_leader_pid,
1818
a.backend_type = 'parallel worker' AS is_parallel_worker

pgactivity/queries/get_pg_activity_post_130000.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ SELECT
1212
a.wait_event AS wait,
1313
a.usename AS user,
1414
a.state AS state,
15-
convert_from(replace(a.query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8')) AS query,
15+
a.query AS query,
1616
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding,
1717
coalesce(a.leader_pid, a.pid) AS query_leader_pid,
1818
a.backend_type = 'parallel worker' AS is_parallel_worker

pgactivity/queries/get_waiting_oldest.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ SELECT
2020
END AS state,
2121
CASE WHEN a.current_query LIKE '<IDLE>%%'
2222
THEN NULL
23-
ELSE convert_from(replace(a.current_query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8'))
23+
ELSE a.current_query
2424
END AS query,
2525
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding
2626
FROM

pgactivity/queries/get_waiting_post_090200.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ SELECT
1616
pg_locks.relation::regclass AS relation,
1717
EXTRACT(epoch FROM (NOW() - a.{duration_column})) AS duration,
1818
a.state as state,
19-
convert_from(replace(a.query, '\', '\\')::bytea, coalesce(pg_catalog.pg_encoding_to_char(b.encoding), 'UTF8')) AS query,
19+
a.query AS query,
2020
pg_catalog.pg_encoding_to_char(b.encoding) AS encoding
2121
FROM
2222
pg_catalog.pg_locks

pgactivity/types.py

+16
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
overload,
1515
Sequence,
1616
Tuple,
17+
Type,
1718
TypeVar,
1819
Union,
1920
)
@@ -911,6 +912,21 @@ class BaseProcess:
911912
query_leader_pid: Optional[int]
912913
is_parallel_worker: bool
913914

915+
_P = TypeVar("_P", bound="BaseProcess")
916+
917+
@classmethod
918+
def from_bytes(
919+
cls: Type[_P], *, encoding: Optional[Union[str, bytes]], **kwargs: Any
920+
) -> _P:
921+
if encoding is None:
922+
encoding = "utf-8"
923+
elif isinstance(encoding, bytes): # psycopg2
924+
encoding = encoding.decode()
925+
for name, value in kwargs.items():
926+
if isinstance(value, bytes):
927+
kwargs[name] = value.decode(encoding, errors="replace")
928+
return cls(encoding=encoding, **kwargs)
929+
914930

915931
@attr.s(auto_attribs=True, frozen=True, slots=True)
916932
class RunningProcess(BaseProcess):

tests/test_data.py

+1-8
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def test_terminate_backend(postgresql, data):
107107

108108

109109
def test_encoding(postgresql, data, execute):
110-
"""Test for issue #149"""
110+
"""Test for issue #149, #332."""
111111
conninfo = postgresql.info.dsn
112112
conn = psycopg.connect(conninfo)
113113
conn.autocommit = True
@@ -137,13 +137,6 @@ def test_encoding(postgresql, data, execute):
137137
assert "blocking éléphant" in blocking.query
138138

139139

140-
def test_InvalidTextRepresentation(postgresql, data, execute):
141-
"""Test for issue #275"""
142-
postgresql.execute("select '123' ~ '\\d+', 'Hello world!\n', pg_sleep(3)")
143-
running = data.pg_get_activities()
144-
assert "123" in running[0].query
145-
146-
147140
def test_filters_dbname(data, execute):
148141
data_filtered = attr.evolve(data, filters=types.Filters(dbname="temp"))
149142
execute("SELECT pg_sleep(2)", dbname="template1", autocommit=True)

0 commit comments

Comments
 (0)