vendor/shopware/platform/src/Core/Framework/DataAbstractionLayer/Indexing/EntityIndexerRegistry.php line 69

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\DataAbstractionLayer\Indexing;
  3. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  4. use Shopware\Core\Framework\DataAbstractionLayer\Indexing\MessageQueue\IterateEntityIndexerMessage;
  5. use Shopware\Core\Framework\MessageQueue\Handler\AbstractMessageHandler;
  6. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  7. use Symfony\Component\Messenger\MessageBusInterface;
  8. class EntityIndexerRegistry extends AbstractMessageHandler implements EventSubscriberInterface
  9. {
  10.     public const USE_INDEXING_QUEUE 'use-queue-indexing';
  11.     public const DISABLE_INDEXING 'disable-indexing';
  12.     /**
  13.      * @var EntityIndexer[]
  14.      */
  15.     private $indexer;
  16.     /**
  17.      * @var MessageBusInterface
  18.      */
  19.     private $messageBus;
  20.     /**
  21.      * @var bool
  22.      */
  23.     private $working false;
  24.     public function __construct(iterable $indexerMessageBusInterface $messageBus)
  25.     {
  26.         $this->indexer $indexer;
  27.         $this->messageBus $messageBus;
  28.     }
  29.     public static function getSubscribedEvents(): array
  30.     {
  31.         return [
  32.             EntityWrittenContainerEvent::class => [
  33.                 ['refresh'1000],
  34.             ],
  35.         ];
  36.     }
  37.     public static function getHandledMessages(): iterable
  38.     {
  39.         return [
  40.             EntityIndexingMessage::class,
  41.             IterateEntityIndexerMessage::class,
  42.         ];
  43.     }
  44.     public function index(bool $useQueue): void
  45.     {
  46.         foreach ($this->indexer as $indexer) {
  47.             $offset null;
  48.             while ($message $indexer->iterate($offset)) {
  49.                 $message->setIndexer($indexer->getName());
  50.                 $this->sendOrHandle($message$useQueue);
  51.                 $offset $message->getOffset();
  52.             }
  53.         }
  54.     }
  55.     public function refresh(EntityWrittenContainerEvent $event): void
  56.     {
  57.         if ($this->working) {
  58.             return;
  59.         }
  60.         $this->working true;
  61.         if ($event->getContext()->hasExtension(self::DISABLE_INDEXING)) {
  62.             $this->working false;
  63.             return;
  64.         }
  65.         $useQueue $event->getContext()->hasExtension(self::USE_INDEXING_QUEUE);
  66.         foreach ($this->indexer as $indexer) {
  67.             $message $indexer->update($event);
  68.             if (!$message) {
  69.                 continue;
  70.             }
  71.             $message->setIndexer($indexer->getName());
  72.             $this->sendOrHandle($message$useQueue);
  73.         }
  74.         $this->working false;
  75.     }
  76.     public function handle($message): void
  77.     {
  78.         if ($message instanceof EntityIndexingMessage) {
  79.             $indexer $this->getIndexer($message->getIndexer());
  80.             if ($indexer) {
  81.                 $indexer->handle($message);
  82.             }
  83.             return;
  84.         }
  85.         if ($message instanceof IterateEntityIndexerMessage) {
  86.             $next $this->iterateIndexer($message->getIndexer(), $message->getOffset(), true);
  87.             if (!$next) {
  88.                 return;
  89.             }
  90.             $this->messageBus->dispatch(new IterateEntityIndexerMessage($message->getIndexer(), $next->getOffset()));
  91.             return;
  92.         }
  93.     }
  94.     public function sendIndexingMessage(array $indexer = []): void
  95.     {
  96.         if (empty($indexer)) {
  97.             $indexer = [];
  98.             foreach ($this->indexer as $loop) {
  99.                 $indexer[] = $loop->getName();
  100.             }
  101.         }
  102.         if (empty($indexer)) {
  103.             return;
  104.         }
  105.         foreach ($indexer as $name) {
  106.             $this->messageBus->dispatch(new IterateEntityIndexerMessage($namenull));
  107.         }
  108.     }
  109.     public function has(string $name): bool
  110.     {
  111.         return $this->getIndexer($name) !== null;
  112.     }
  113.     public function getIndexer(string $name): ?EntityIndexer
  114.     {
  115.         foreach ($this->indexer as $indexer) {
  116.             if ($indexer->getName() === $name) {
  117.                 return $indexer;
  118.             }
  119.         }
  120.         return null;
  121.     }
  122.     private function sendOrHandle(EntityIndexingMessage $messagebool $useQueue): void
  123.     {
  124.         if ($useQueue || $message->forceQueue()) {
  125.             $this->messageBus->dispatch($message);
  126.             return;
  127.         }
  128.         $this->handle($message);
  129.     }
  130.     private function iterateIndexer(string $name$offsetbool $useQueue): ?EntityIndexingMessage
  131.     {
  132.         $indexer $this->getIndexer($name);
  133.         if (!$indexer instanceof EntityIndexer) {
  134.             throw new \RuntimeException(sprintf('Entity indexer with name %s not found'$name));
  135.         }
  136.         $message $indexer->iterate($offset);
  137.         if (!$message) {
  138.             return null;
  139.         }
  140.         $message->setIndexer($indexer->getName());
  141.         $this->sendOrHandle($message$useQueue);
  142.         return $message;
  143.     }
  144. }