Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query bus #253

Draft
wants to merge 1 commit into
base: 3.11.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 5 additions & 83 deletions baseline.xml
Original file line number Diff line number Diff line change
@@ -1,51 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="6.7.1@a2f190972555ea01b0cfcc1913924d6c5fc1a64e">
<file src="src/Attribute/AsListener.php">
<PossiblyUnusedMethod>
<code><![CDATA[__construct]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/Command/StoreMigrateCommand.php">
<PossiblyUnusedMethod>
<code><![CDATA[__construct]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/CommandBus/SymfonyCommandBus.php">
<PossiblyUnusedMethod>
<code><![CDATA[__construct]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/CommandBus/SymfonyParameterResolver.php">
<PossiblyUnusedMethod>
<code><![CDATA[__construct]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/DataCollector/EventSourcingCollector.php">
<PossiblyUnusedMethod>
<code><![CDATA[__construct]]></code>
<code><![CDATA[getAggregates]]></code>
<code><![CDATA[getEvents]]></code>
<code><![CDATA[getMessages]]></code>
</PossiblyUnusedMethod>
</file>
<files psalm-version="6.8.8@1361cd33008feb3ae2b4a93f1860e14e538ec8c2">
<file src="src/DataCollector/MessageCollectorEventBus.php">
<ClassMustBeFinal>
<code><![CDATA[MessageCollectorEventBus]]></code>
</ClassMustBeFinal>
<PossiblyUnusedMethod>
<code><![CDATA[__construct]]></code>
<code><![CDATA[clear]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/DependencyInjection/Configuration.php">
<UndefinedInterfaceMethod>
<code><![CDATA[scalarNode]]></code>
</UndefinedInterfaceMethod>
</file>
<file src="src/DependencyInjection/PatchlevelEventSourcingExtension.php">
<UnusedClass>
<code><![CDATA[PatchlevelEventSourcingExtension]]></code>
</UnusedClass>
<file src="src/DependencyInjection/QueryHandlerCompilerPass.php">
<ArgumentTypeCoercion>
<code><![CDATA[$subscriberClass]]></code>
</ArgumentTypeCoercion>
</file>
<file src="src/DependencyInjection/SubscriberGuardCompilePass.php">
<InvalidOperand>
Expand All @@ -55,50 +23,4 @@
<code><![CDATA[$argument]]></code>
</MixedAssignment>
</file>
<file src="src/Doctrine/DbalConnectionFactory.php">
<PossiblyUnusedMethod>
<code><![CDATA[createConnection]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/EventBus/SymfonyEventBus.php">
<PossiblyUnusedMethod>
<code><![CDATA[__construct]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/Normalizer/SymfonyUuidNormalizer.php">
<UnusedClass>
<code><![CDATA[SymfonyUuidNormalizer]]></code>
</UnusedClass>
</file>
<file src="src/PatchlevelEventSourcingBundle.php">
<UnusedClass>
<code><![CDATA[PatchlevelEventSourcingBundle]]></code>
</UnusedClass>
</file>
<file src="src/RequestListener/AutoSetupListener.php">
<PossiblyUnusedMethod>
<code><![CDATA[__construct]]></code>
<code><![CDATA[onKernelRequest]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/RequestListener/SubscriptionRebuildAfterFileChangeListener.php">
<PossiblyUnusedMethod>
<code><![CDATA[__construct]]></code>
<code><![CDATA[onKernelRequest]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/Subscription/ResetServicesListener.php">
<PossiblyUnusedMethod>
<code><![CDATA[__construct]]></code>
<code><![CDATA[onWorkerRunningEvent]]></code>
</PossiblyUnusedMethod>
<UnusedParam>
<code><![CDATA[$event]]></code>
</UnusedParam>
</file>
<file src="src/Subscription/StaticInMemorySubscriptionStoreFactory.php">
<PossiblyUnusedMethod>
<code><![CDATA[create]]></code>
</PossiblyUnusedMethod>
</file>
</files>
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
],
"require": {
"php": "~8.2.0 || ~8.3.0 || ~8.4.0",
"patchlevel/event-sourcing": "^3.10.0",
"patchlevel/event-sourcing": "dev-query-bus",
"symfony/cache": "^6.4.0|^7.0.0",
"symfony/config": "^6.4.0|^7.0.0",
"symfony/console": "^6.4.1|^7.0.1",
Expand Down
15 changes: 8 additions & 7 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
parameters:
ignoreErrors:
-
message: '#^Parameter \#1 \$classString of static method Patchlevel\\EventSourcing\\QueryBus\\HandlerFinder\:\:findInClass\(\) expects class\-string, string given\.$#'
identifier: argument.type
count: 1
path: src/DependencyInjection/QueryHandlerCompilerPass.php
1 change: 1 addition & 0 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
xmlns="https://getpsalm.org/schema/config"
xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd"
errorBaseline="baseline.xml"
findUnusedCode="false"
ensureOverrideAttribute="false"
>
<projectFiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use function strtolower;

/** @internal */
final class HandlerCompilerPass implements CompilerPassInterface
final class CommandHandlerCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container): void
{
Expand Down
9 changes: 9 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* @psalm-type Config = array{
* event_bus: array{enabled: bool, type: string, service: string},
* command_bus: array{enabled: bool, service: string},
* query_bus: array{enabled: bool, service: string},
* subscription: array{
* store: array{type: string, service: string|null},
* retry_strategy?: array{base_delay: int, delay_factor: int, max_attempts: int},
Expand Down Expand Up @@ -280,6 +281,14 @@ public function getConfigTreeBuilder(): TreeBuilder
->end()
->end()

->arrayNode('query_bus')
->canBeEnabled()
->addDefaultsIfNotSet()
->children()
->scalarNode('service')->isRequired()->end()
->end()
->end()

->arrayNode('aggregate_handlers')
->canBeEnabled()
->addDefaultsIfNotSet()
Expand Down
23 changes: 23 additions & 0 deletions src/DependencyInjection/PatchlevelEventSourcingExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
use Patchlevel\EventSourcing\Metadata\Message\MessageHeaderRegistryFactory;
use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory;
use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadataFactory;
use Patchlevel\EventSourcing\QueryBus\QueryBus;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Repository\MessageDecorator\ChainMessageDecorator;
use Patchlevel\EventSourcing\Repository\MessageDecorator\MessageDecorator;
Expand Down Expand Up @@ -117,6 +118,7 @@
use Patchlevel\EventSourcingBundle\DataCollector\MessageCollectorEventBus;
use Patchlevel\EventSourcingBundle\Doctrine\DbalConnectionFactory;
use Patchlevel\EventSourcingBundle\EventBus\SymfonyEventBus;
use Patchlevel\EventSourcingBundle\QueryBus\SymfonyQueryBus;
use Patchlevel\EventSourcingBundle\RequestListener\AutoSetupListener;
use Patchlevel\EventSourcingBundle\RequestListener\SubscriptionRebuildAfterFileChangeListener;
use Patchlevel\EventSourcingBundle\Subscription\ResetServicesListener;
Expand Down Expand Up @@ -168,6 +170,7 @@ public function load(array $configs, ContainerBuilder $container): void
$this->configureMessageDecorator($container);
$this->configureCommandBus($config, $container);
$this->configureEventBus($config, $container);
$this->configureQueryBus($config, $container);
$this->configureConnection($config, $container);
$this->configureStore($config, $container);
$this->configureSnapshots($config, $container);
Expand Down Expand Up @@ -324,6 +327,26 @@ static function (ChildDefinition $definition, AsListener $attribute): void {
$container->setAlias(EventBus::class, $config['event_bus']['service']);
}

/** @param Config $config */
private function configureQueryBus(array $config, ContainerBuilder $container): void
{
if (!$config['query_bus']['enabled']) {
return;
}

$container->register(SymfonyQueryBus::class)
->setArguments([
new Reference($config['query_bus']['service']),
]);

$container->setAlias(QueryBus::class, SymfonyQueryBus::class);

$container->setParameter(
'patchlevel_event_sourcing.query_handlers.bus',
$config['query_bus']['service'],
);
}

/** @param Config $config */
private function configureSubscription(array $config, ContainerBuilder $container): void
{
Expand Down
45 changes: 45 additions & 0 deletions src/DependencyInjection/QueryHandlerCompilerPass.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcingBundle\DependencyInjection;

use Patchlevel\EventSourcing\QueryBus\HandlerFinder;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;

use function array_keys;

/** @internal */
final class QueryHandlerCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container): void
{
if (!$container->hasParameter('patchlevel_event_sourcing.query_handlers.bus')) {
return;
}

$bus = $container->getParameter('patchlevel_event_sourcing.query_handlers.bus');
$subscribers = $container->findTaggedServiceIds('event_sourcing.subscriber');

foreach (array_keys($subscribers) as $subscriberServiceName) {
$subscriberDefinition = $container->getDefinition($subscriberServiceName);
$subscriberClass = $subscriberDefinition->getClass();

if ($subscriberClass === null) {
continue;
}

foreach (HandlerFinder::findInClass($subscriberClass) as $queryHandler) {
$subscriberDefinition->addTag(
'messenger.message_handler',
[
'method' => $queryHandler->method,
'handles' => $queryHandler->queryClass,
'bus' => $bus,
],
);
}
}
}
}
6 changes: 4 additions & 2 deletions src/PatchlevelEventSourcingBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

namespace Patchlevel\EventSourcingBundle;

use Patchlevel\EventSourcingBundle\DependencyInjection\HandlerCompilerPass;
use Patchlevel\EventSourcingBundle\DependencyInjection\CommandHandlerCompilerPass;
use Patchlevel\EventSourcingBundle\DependencyInjection\HandlerServiceLocatorCompilerPass;
use Patchlevel\EventSourcingBundle\DependencyInjection\QueryHandlerCompilerPass;
use Patchlevel\EventSourcingBundle\DependencyInjection\RepositoryCompilerPass;
use Patchlevel\EventSourcingBundle\DependencyInjection\SubscriberGuardCompilePass;
use Patchlevel\EventSourcingBundle\DependencyInjection\TranslatorCompilerPass;
Expand All @@ -18,7 +19,8 @@ public function build(ContainerBuilder $container): void
{
$container->addCompilerPass(new RepositoryCompilerPass());
$container->addCompilerPass(new SubscriberGuardCompilePass());
$container->addCompilerPass(new HandlerCompilerPass(), priority: 100);
$container->addCompilerPass(new CommandHandlerCompilerPass(), priority: 100);
$container->addCompilerPass(new QueryHandlerCompilerPass(), priority: 100);
$container->addCompilerPass(new HandlerServiceLocatorCompilerPass(), priority: -100);
$container->addCompilerPass(new TranslatorCompilerPass());
}
Expand Down
29 changes: 29 additions & 0 deletions src/QueryBus/SymfonyQueryBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcingBundle\QueryBus;

use Patchlevel\EventSourcing\QueryBus\QueryBus;
use RuntimeException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;

final class SymfonyQueryBus implements QueryBus
{
public function __construct(
private readonly MessageBusInterface $messageBus,
) {
}

public function dispatch(object $query): mixed
{
$handledStamp = $this->messageBus->dispatch($query)->last(HandledStamp::class);

if ($handledStamp === null) {
throw new RuntimeException('No message handled yet');
}

return $handledStamp->getResult();
}
}
6 changes: 6 additions & 0 deletions tests/Fixtures/ProfileProjector.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

namespace Patchlevel\EventSourcingBundle\Tests\Fixtures;

use Patchlevel\EventSourcing\Attribute\Answer;
use Patchlevel\EventSourcing\Attribute\Projector;

#[Projector('profile')]
class ProfileProjector
{
#[Answer]
public function query(QueryFoo $query): string
{
return $query->result;
}
}
12 changes: 12 additions & 0 deletions tests/Fixtures/QueryFoo.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcingBundle\Tests\Fixtures;

final readonly class QueryFoo
{
public function __construct(public string $result)
{
}
}
Loading
Loading