Skip to content

Commit 7b2a8d6

Browse files
Nyholmfabpot
authored andcommitted
[Messenger] Move Transports to separate packages
0 parents  commit 7b2a8d6

20 files changed

+1324
-0
lines changed

.gitattributes

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/Tests export-ignore
2+
/phpunit.xml.dist export-ignore
3+
/.gitignore export-ignore

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
vendor/
2+
composer.lock
3+
phpunit.xml

CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CHANGELOG
2+
=========
3+
4+
5.1.0
5+
-----
6+
7+
* Introduced the Redis bridge.

LICENSE

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Copyright (c) 2018-2020 Fabien Potencier
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy
4+
of this software and associated documentation files (the "Software"), to deal
5+
in the Software without restriction, including without limitation the rights
6+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
copies of the Software, and to permit persons to whom the Software is furnished
8+
to do so, subject to the following conditions:
9+
10+
The above copyright notice and this permission notice shall be included in all
11+
copies or substantial portions of the Software.
12+
13+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
THE SOFTWARE.

README.md

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
Redis Messenger
2+
===============
3+
4+
Provides Redis integration for Symfony Messenger.
5+
6+
Resources
7+
---------
8+
9+
* [Contributing](https://symfony.com/doc/current/contributing/index.html)
10+
* [Report issues](https://github.com/symfony/symfony/issues) and
11+
[send Pull Requests](https://github.com/symfony/symfony/pulls)
12+
in the [main Symfony repository](https://github.com/symfony/symfony)

Tests/Fixtures/DummyMessage.php

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures;
4+
5+
class DummyMessage
6+
{
7+
private $message;
8+
9+
public function __construct(string $message)
10+
{
11+
$this->message = $message;
12+
}
13+
14+
public function getMessage(): string
15+
{
16+
return $this->message;
17+
}
18+
}

Tests/Transport/ConnectionTest.php

+250
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Exception\TransportException;
16+
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
17+
18+
/**
19+
* @requires extension redis >= 4.3.0
20+
*/
21+
class ConnectionTest extends TestCase
22+
{
23+
public static function setUpBeforeClass(): void
24+
{
25+
$redis = Connection::fromDsn('redis://localhost/queue');
26+
27+
try {
28+
$redis->get();
29+
} catch (TransportException $e) {
30+
if (0 === strpos($e->getMessage(), 'ERR unknown command \'X')) {
31+
self::markTestSkipped('Redis server >= 5 is required');
32+
}
33+
34+
throw $e;
35+
}
36+
}
37+
38+
public function testFromInvalidDsn()
39+
{
40+
$this->expectException(\InvalidArgumentException::class);
41+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
42+
43+
Connection::fromDsn('redis://');
44+
}
45+
46+
public function testFromDsn()
47+
{
48+
$this->assertEquals(
49+
new Connection(['stream' => 'queue'], [
50+
'host' => 'localhost',
51+
'port' => 6379,
52+
]),
53+
Connection::fromDsn('redis://localhost/queue')
54+
);
55+
}
56+
57+
public function testFromDsnOnUnixSocket()
58+
{
59+
$this->assertEquals(
60+
new Connection(['stream' => 'queue'], [
61+
'host' => '/var/run/redis/redis.sock',
62+
'port' => 0,
63+
], [], $redis = $this->createMock(\Redis::class)),
64+
Connection::fromDsn('redis:///var/run/redis/redis.sock', ['stream' => 'queue'], $redis)
65+
);
66+
}
67+
68+
public function testFromDsnWithOptions()
69+
{
70+
$this->assertEquals(
71+
Connection::fromDsn('redis://localhost', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2]),
72+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0')
73+
);
74+
}
75+
76+
public function testFromDsnWithQueryOptions()
77+
{
78+
$this->assertEquals(
79+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
80+
'host' => 'localhost',
81+
'port' => 6379,
82+
], [
83+
'serializer' => 2,
84+
]),
85+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2')
86+
);
87+
}
88+
89+
public function testKeepGettingPendingMessages()
90+
{
91+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
92+
93+
$redis->expects($this->exactly(3))->method('xreadgroup')
94+
->with('symfony', 'consumer', ['queue' => 0], 1, null)
95+
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
96+
97+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
98+
$this->assertNotNull($connection->get());
99+
$this->assertNotNull($connection->get());
100+
$this->assertNotNull($connection->get());
101+
}
102+
103+
public function testAuth()
104+
{
105+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
106+
107+
$redis->expects($this->exactly(1))->method('auth')
108+
->with('password')
109+
->willReturn(true);
110+
111+
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
112+
}
113+
114+
public function testFailedAuth()
115+
{
116+
$this->expectException(\InvalidArgumentException::class);
117+
$this->expectExceptionMessage('Redis connection failed');
118+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
119+
120+
$redis->expects($this->exactly(1))->method('auth')
121+
->with('password')
122+
->willReturn(false);
123+
124+
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
125+
}
126+
127+
public function testDbIndex()
128+
{
129+
$redis = new \Redis();
130+
131+
Connection::fromDsn('redis://localhost/queue?dbindex=2', [], $redis);
132+
133+
$this->assertSame(2, $redis->getDbNum());
134+
}
135+
136+
public function testFirstGetPendingMessagesThenNewMessages()
137+
{
138+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
139+
140+
$count = 0;
141+
142+
$redis->expects($this->exactly(2))->method('xreadgroup')
143+
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
144+
++$count;
145+
146+
if (1 === $count) {
147+
return '0' === $arr_streams['queue'];
148+
}
149+
150+
return '>' === $arr_streams['queue'];
151+
}), 1, null)
152+
->willReturn(['queue' => []]);
153+
154+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
155+
$connection->get();
156+
}
157+
158+
public function testUnexpectedRedisError()
159+
{
160+
$this->expectException(TransportException::class);
161+
$this->expectExceptionMessage('Redis error happens');
162+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
163+
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
164+
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
165+
166+
$connection = Connection::fromDsn('redis://localhost/queue', ['auto_setup' => false], $redis);
167+
$connection->get();
168+
}
169+
170+
public function testGetAfterReject()
171+
{
172+
$redis = new \Redis();
173+
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
174+
175+
$connection->add('1', []);
176+
$connection->add('2', []);
177+
178+
$failing = $connection->get();
179+
$connection->reject($failing['id']);
180+
181+
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
182+
$this->assertNotNull($connection->get());
183+
184+
$redis->del('messenger-rejectthenget');
185+
}
186+
187+
public function testGetNonBlocking()
188+
{
189+
$redis = new \Redis();
190+
191+
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
192+
193+
$this->assertNull($connection->get()); // no message, should return null immediately
194+
$connection->add('1', []);
195+
$this->assertNotEmpty($message = $connection->get());
196+
$connection->reject($message['id']);
197+
$redis->del('messenger-getnonblocking');
198+
}
199+
200+
public function testJsonError()
201+
{
202+
$redis = new \Redis();
203+
$connection = Connection::fromDsn('redis://localhost/json-error', [], $redis);
204+
try {
205+
$connection->add("\xB1\x31", []);
206+
} catch (TransportException $e) {
207+
}
208+
209+
$this->assertSame('Malformed UTF-8 characters, possibly incorrectly encoded', $e->getMessage());
210+
}
211+
212+
public function testMaxEntries()
213+
{
214+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
215+
216+
$redis->expects($this->exactly(1))->method('xadd')
217+
->with('queue', '*', ['message' => '{"body":"1","headers":[]}'], 20000, true)
218+
->willReturn(1);
219+
220+
$connection = Connection::fromDsn('redis://localhost/queue?stream_max_entries=20000', [], $redis); // 1 = always
221+
$connection->add('1', []);
222+
}
223+
224+
public function testLastErrorGetsCleared()
225+
{
226+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
227+
228+
$redis->expects($this->once())->method('xadd')->willReturn(0);
229+
$redis->expects($this->once())->method('xack')->willReturn(0);
230+
231+
$redis->method('getLastError')->willReturnOnConsecutiveCalls('xadd error', 'xack error');
232+
$redis->expects($this->exactly(2))->method('clearLastError');
233+
234+
$connection = Connection::fromDsn('redis://localhost/messenger-clearlasterror', ['auto_setup' => false], $redis);
235+
236+
try {
237+
$connection->add('message', []);
238+
} catch (TransportException $e) {
239+
}
240+
241+
$this->assertSame('xadd error', $e->getMessage());
242+
243+
try {
244+
$connection->ack('1');
245+
} catch (TransportException $e) {
246+
}
247+
248+
$this->assertSame('xack error', $e->getMessage());
249+
}
250+
}

0 commit comments

Comments
 (0)