@@ -28,13 +28,13 @@ def wait_for_message(
28
28
while now < timeout :
29
29
if node :
30
30
message = pubsub .get_sharded_message (
31
- ignore_subscribe_messages = ignore_subscribe_messages , target_node = node
31
+ ignore_subscribe_messages = ignore_subscribe_messages , target_node = node , timeout = 0.01
32
32
)
33
33
elif func :
34
- message = func (ignore_subscribe_messages = ignore_subscribe_messages )
34
+ message = func (ignore_subscribe_messages = ignore_subscribe_messages , timeout = 0.01 )
35
35
else :
36
36
message = pubsub .get_message (
37
- ignore_subscribe_messages = ignore_subscribe_messages
37
+ ignore_subscribe_messages = ignore_subscribe_messages , timeout = 0.01
38
38
)
39
39
if message is not None :
40
40
return message
@@ -475,6 +475,16 @@ def test_published_message_to_channel(self, r):
475
475
assert isinstance (message , dict )
476
476
assert message == make_message ("message" , "foo" , "test message" )
477
477
478
+ async def test_published_message_to_channel_with_blocking_get_message (self , r ):
479
+ pubsub = r .pubsub ()
480
+ pubsub .subscribe ("foo" )
481
+ assert wait_for_message (pubsub ) == make_message ("subscribe" , "foo" , 1 )
482
+
483
+ assert r .publish ("foo" , "test message" ) == 1
484
+ message = pubsub .get_message (ignore_subscribe_messages = True )
485
+ assert isinstance (message , dict )
486
+ assert message == make_message ("message" , "foo" , "test message" )
487
+
478
488
@pytest .mark .onlynoncluster
479
489
@skip_if_server_version_lt ("7.0.0" )
480
490
def test_published_message_to_shard_channel (self , r ):
@@ -920,7 +930,7 @@ def test_get_message_with_timeout_returns_none(self, r):
920
930
def test_get_message_not_subscribed_return_none (self , r ):
921
931
p = r .pubsub ()
922
932
assert p .subscribed is False
923
- assert p .get_message () is None
933
+ assert p .get_message (timeout = 0 ) is None
924
934
assert p .get_message (timeout = 0.1 ) is None
925
935
with patch .object (threading .Event , "wait" ) as mock :
926
936
mock .return_value = False
@@ -931,7 +941,7 @@ def test_get_message_subscribe_during_waiting(self, r):
931
941
p = r .pubsub ()
932
942
933
943
def poll (ps , expected_res ):
934
- assert ps .get_message () is None
944
+ assert ps .get_message (timeout = 0 ) is None
935
945
message = ps .get_message (timeout = 1 )
936
946
assert message == expected_res
937
947
0 commit comments