From cb0d063859eda3a32b8843ca712f0fd1598f3bf4 Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Wed, 26 Feb 2025 12:39:09 +0100 Subject: [PATCH] Add query bus --- baseline.xml | 88 ++----------------- docs/pages/configuration.md | 23 +++++ phpstan-baseline.neon | 5 ++ psalm.xml | 1 + ...ass.php => CommandHandlerCompilerPass.php} | 2 +- src/DependencyInjection/Configuration.php | 9 ++ .../PatchlevelEventSourcingExtension.php | 23 +++++ .../QueryHandlerCompilerPass.php | 45 ++++++++++ src/PatchlevelEventSourcingBundle.php | 6 +- src/QueryBus/SymfonyQueryBus.php | 29 ++++++ tests/Fixtures/ProfileProjector.php | 6 ++ tests/Fixtures/QueryFoo.php | 12 +++ .../PatchlevelEventSourcingBundleTest.php | 43 +++++++++ 13 files changed, 206 insertions(+), 86 deletions(-) rename src/DependencyInjection/{HandlerCompilerPass.php => CommandHandlerCompilerPass.php} (96%) create mode 100644 src/DependencyInjection/QueryHandlerCompilerPass.php create mode 100644 src/QueryBus/SymfonyQueryBus.php create mode 100644 tests/Fixtures/QueryFoo.php diff --git a/baseline.xml b/baseline.xml index 923aacc6..fa932b30 100644 --- a/baseline.xml +++ b/baseline.xml @@ -1,51 +1,19 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - + + + + @@ -55,50 +23,4 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/docs/pages/configuration.md b/docs/pages/configuration.md index 3e8152b5..6ad0bc59 100644 --- a/docs/pages/configuration.md +++ b/docs/pages/configuration.md @@ -481,6 +481,29 @@ patchlevel_event_sourcing: You can find out more about the command bus and the aggregate handlers [here](https://event-sourcing.patchlevel.io/latest/command_bus/). +## Query Bus + +You can enable the query bus integration to use queries to retrieve data from your system. For this bundle we provide +only a symfony messenger integration, so you have to define the bus in the messenger configuration. + +```yaml +framework: + messenger: + buses: + query.bus: ~ +``` +After this, you need to define the query bus in the event sourcing configuration. +This will automatically register the handlers for the symfony messenger, so you can handle queries in your services. + +```yaml +patchlevel_event_sourcing: + query_bus: + service: query.bus +``` +!!! note + + You can find out more about the query bus [here](https://event-sourcing.patchlevel.io/latest/query_bus/). + ## Event Bus You can enable the event bus to listen for events and messages synchronously. diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 364905f7..683babf7 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -1,2 +1,7 @@ parameters: ignoreErrors: + - + message: '#^Parameter \#1 \$classString of static method Patchlevel\\EventSourcing\\QueryBus\\HandlerFinder\:\:findInClass\(\) expects class\-string, string given\.$#' + identifier: argument.type + count: 1 + path: src/DependencyInjection/QueryHandlerCompilerPass.php diff --git a/psalm.xml b/psalm.xml index 8485e99b..ad741409 100644 --- a/psalm.xml +++ b/psalm.xml @@ -6,6 +6,7 @@ xmlns="https://getpsalm.org/schema/config" xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd" errorBaseline="baseline.xml" + findUnusedCode="false" ensureOverrideAttribute="false" > diff --git a/src/DependencyInjection/HandlerCompilerPass.php b/src/DependencyInjection/CommandHandlerCompilerPass.php similarity index 96% rename from src/DependencyInjection/HandlerCompilerPass.php rename to src/DependencyInjection/CommandHandlerCompilerPass.php index 8741c91b..dad31be7 100644 --- a/src/DependencyInjection/HandlerCompilerPass.php +++ b/src/DependencyInjection/CommandHandlerCompilerPass.php @@ -17,7 +17,7 @@ use function strtolower; /** @internal */ -final class HandlerCompilerPass implements CompilerPassInterface +final class CommandHandlerCompilerPass implements CompilerPassInterface { public function process(ContainerBuilder $container): void { diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index 04b45e7c..48c61a42 100644 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -11,6 +11,7 @@ * @psalm-type Config = array{ * event_bus: array{enabled: bool, type: string, service: string}, * command_bus: array{enabled: bool, service: string}, + * query_bus: array{enabled: bool, service: string}, * subscription: array{ * store: array{type: string, service: string|null}, * retry_strategy?: array{base_delay: int, delay_factor: int, max_attempts: int}, @@ -297,6 +298,14 @@ public function getConfigTreeBuilder(): TreeBuilder ->end() ->end() + ->arrayNode('query_bus') + ->canBeEnabled() + ->addDefaultsIfNotSet() + ->children() + ->scalarNode('service')->isRequired()->end() + ->end() + ->end() + ->arrayNode('aggregate_handlers') ->canBeEnabled() ->addDefaultsIfNotSet() diff --git a/src/DependencyInjection/PatchlevelEventSourcingExtension.php b/src/DependencyInjection/PatchlevelEventSourcingExtension.php index d8c06426..a12887c6 100644 --- a/src/DependencyInjection/PatchlevelEventSourcingExtension.php +++ b/src/DependencyInjection/PatchlevelEventSourcingExtension.php @@ -66,6 +66,7 @@ use Patchlevel\EventSourcing\Metadata\Message\MessageHeaderRegistryFactory; use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadataFactory; +use Patchlevel\EventSourcing\QueryBus\QueryBus; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Repository\MessageDecorator\ChainMessageDecorator; use Patchlevel\EventSourcing\Repository\MessageDecorator\MessageDecorator; @@ -121,6 +122,7 @@ use Patchlevel\EventSourcingBundle\DataCollector\MessageCollectorEventBus; use Patchlevel\EventSourcingBundle\Doctrine\DbalConnectionFactory; use Patchlevel\EventSourcingBundle\EventBus\SymfonyEventBus; +use Patchlevel\EventSourcingBundle\QueryBus\SymfonyQueryBus; use Patchlevel\EventSourcingBundle\RequestListener\AutoSetupListener; use Patchlevel\EventSourcingBundle\RequestListener\SubscriptionRebuildAfterFileChangeListener; use Patchlevel\EventSourcingBundle\Subscription\ResetServicesListener; @@ -172,6 +174,7 @@ public function load(array $configs, ContainerBuilder $container): void $this->configureMessageDecorator($container); $this->configureCommandBus($config, $container); $this->configureEventBus($config, $container); + $this->configureQueryBus($config, $container); $this->configureConnection($config, $container); $this->configureStore($config, $container); $this->configureSnapshots($config, $container); @@ -353,6 +356,26 @@ private function configureMessageLoader(array $config, ContainerBuilder $contain $container->setAlias(MessageLoader::class, GapResolverStoreMessageLoader::class); } + /** @param Config $config */ + private function configureQueryBus(array $config, ContainerBuilder $container): void + { + if (!$config['query_bus']['enabled']) { + return; + } + + $container->register(SymfonyQueryBus::class) + ->setArguments([ + new Reference($config['query_bus']['service']), + ]); + + $container->setAlias(QueryBus::class, SymfonyQueryBus::class); + + $container->setParameter( + 'patchlevel_event_sourcing.query_handlers.bus', + $config['query_bus']['service'], + ); + } + /** @param Config $config */ private function configureSubscription(array $config, ContainerBuilder $container): void { diff --git a/src/DependencyInjection/QueryHandlerCompilerPass.php b/src/DependencyInjection/QueryHandlerCompilerPass.php new file mode 100644 index 00000000..c2d069ab --- /dev/null +++ b/src/DependencyInjection/QueryHandlerCompilerPass.php @@ -0,0 +1,45 @@ +hasParameter('patchlevel_event_sourcing.query_handlers.bus')) { + return; + } + + $bus = $container->getParameter('patchlevel_event_sourcing.query_handlers.bus'); + $subscribers = $container->findTaggedServiceIds('event_sourcing.subscriber'); + + foreach (array_keys($subscribers) as $subscriberServiceName) { + $subscriberDefinition = $container->getDefinition($subscriberServiceName); + $subscriberClass = $subscriberDefinition->getClass(); + + if ($subscriberClass === null) { + continue; + } + + foreach (HandlerFinder::findInClass($subscriberClass) as $queryHandler) { + $subscriberDefinition->addTag( + 'messenger.message_handler', + [ + 'method' => $queryHandler->method, + 'handles' => $queryHandler->queryClass, + 'bus' => $bus, + ], + ); + } + } + } +} diff --git a/src/PatchlevelEventSourcingBundle.php b/src/PatchlevelEventSourcingBundle.php index 335eb79a..81c73768 100644 --- a/src/PatchlevelEventSourcingBundle.php +++ b/src/PatchlevelEventSourcingBundle.php @@ -4,8 +4,9 @@ namespace Patchlevel\EventSourcingBundle; -use Patchlevel\EventSourcingBundle\DependencyInjection\HandlerCompilerPass; +use Patchlevel\EventSourcingBundle\DependencyInjection\CommandHandlerCompilerPass; use Patchlevel\EventSourcingBundle\DependencyInjection\HandlerServiceLocatorCompilerPass; +use Patchlevel\EventSourcingBundle\DependencyInjection\QueryHandlerCompilerPass; use Patchlevel\EventSourcingBundle\DependencyInjection\RepositoryCompilerPass; use Patchlevel\EventSourcingBundle\DependencyInjection\SubscriberGuardCompilePass; use Patchlevel\EventSourcingBundle\DependencyInjection\TranslatorCompilerPass; @@ -18,7 +19,8 @@ public function build(ContainerBuilder $container): void { $container->addCompilerPass(new RepositoryCompilerPass()); $container->addCompilerPass(new SubscriberGuardCompilePass()); - $container->addCompilerPass(new HandlerCompilerPass(), priority: 100); + $container->addCompilerPass(new CommandHandlerCompilerPass(), priority: 100); + $container->addCompilerPass(new QueryHandlerCompilerPass(), priority: 100); $container->addCompilerPass(new HandlerServiceLocatorCompilerPass(), priority: -100); $container->addCompilerPass(new TranslatorCompilerPass()); } diff --git a/src/QueryBus/SymfonyQueryBus.php b/src/QueryBus/SymfonyQueryBus.php new file mode 100644 index 00000000..63b5db0a --- /dev/null +++ b/src/QueryBus/SymfonyQueryBus.php @@ -0,0 +1,29 @@ +messageBus->dispatch($query)->last(HandledStamp::class); + + if ($handledStamp === null) { + throw new RuntimeException('No message handled yet'); + } + + return $handledStamp->getResult(); + } +} diff --git a/tests/Fixtures/ProfileProjector.php b/tests/Fixtures/ProfileProjector.php index 7d4debd2..02a66872 100644 --- a/tests/Fixtures/ProfileProjector.php +++ b/tests/Fixtures/ProfileProjector.php @@ -2,9 +2,15 @@ namespace Patchlevel\EventSourcingBundle\Tests\Fixtures; +use Patchlevel\EventSourcing\Attribute\Answer; use Patchlevel\EventSourcing\Attribute\Projector; #[Projector('profile')] class ProfileProjector { + #[Answer] + public function query(QueryFoo $query): string + { + return $query->result; + } } diff --git a/tests/Fixtures/QueryFoo.php b/tests/Fixtures/QueryFoo.php new file mode 100644 index 00000000..bdb5f97e --- /dev/null +++ b/tests/Fixtures/QueryFoo.php @@ -0,0 +1,12 @@ +get(CommandBus::class)); } + public function testQueryBus(): void + { + $container = new ContainerBuilder(); + + $container->setDefinition(ProfileProjector::class, new Definition(ProfileProjector::class)) + ->setAutoconfigured(true); + + $this->compileContainer( + $container, + [ + 'patchlevel_event_sourcing' => [ + 'connection' => [ + 'service' => 'doctrine.dbal.eventstore_connection', + ], + 'aggregates' => [__DIR__ . '/../Fixtures'], + 'query_bus' => [ + 'service' => 'query.bus', + ], + ], + ] + ); + + $definition = $container->getDefinition(ProfileProjector::class); + $tags = $definition->getTag('messenger.message_handler'); + + self::assertCount(1, $tags); + + $tag = $tags[0]; + + self::assertEquals('query', $tag['method']); + self::assertEquals(QueryFoo::class, $tag['handles']); + self::assertEquals('query.bus', $tag['bus']); + self::assertInstanceOf(SymfonyQueryBus::class, $container->get(QueryBus::class)); + + $handler = $container->get(ProfileProjector::class); + + self::assertEquals('foo', $handler->{$tag['method']}(new QueryFoo('foo'))); + } + public function testMessageLoader(): void { $container = new ContainerBuilder(); @@ -1478,6 +1520,7 @@ private function compileContainer(ContainerBuilder $container, array $config): v $container->set('doctrine.dbal.eventstore_connection', $this->prophesize(Connection::class)->reveal()); $container->set('event.bus', $this->prophesize(MessageBusInterface::class)->reveal()); $container->set('command.bus', $this->prophesize(MessageBusInterface::class)->reveal()); + $container->set('query.bus', $this->prophesize(MessageBusInterface::class)->reveal()); $container->set('cache.default', $this->prophesize(CacheItemPoolInterface::class)->reveal()); $container->set('event_dispatcher', $this->prophesize(EventDispatcherInterface::class)->reveal()); $container->set('services_resetter', $this->prophesize(ServicesResetter::class)->reveal());