Skip to content

Commit

Permalink
Merge pull request #37 from gowrizrh/feature/index-large-count
Browse files Browse the repository at this point in the history
Index large amount of event records
  • Loading branch information
gowrizrh authored Jan 2, 2025
2 parents 318f75b + d69b5f0 commit cf97a7c
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 11 deletions.
25 changes: 15 additions & 10 deletions Model/Indexer/AsyncEventSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@

namespace MageOS\AsyncEvents\Model\Indexer;

use Magento\Elasticsearch\Model\Adapter\ElasticsearchFactory;
use MageOS\AsyncEvents\Helper\Config;
use MageOS\AsyncEvents\Model\Adapter\BatchDataMapper\AsyncEventLogMapper;
use MageOS\AsyncEvents\Model\Indexer\DataProvider\AsyncEventSubscriberLogs;
use MageOS\AsyncEvents\Model\Resolver\AsyncEvent;
use ArrayIterator;
use Magento\CatalogSearch\Model\Indexer\IndexerHandlerFactory;
use Magento\Elasticsearch\Model\Adapter\ElasticsearchFactory;
use Magento\Framework\App\DeploymentConfig;
use Magento\Framework\App\ObjectManager;
use Magento\Framework\Indexer\ActionInterface as IndexerActionInterface;
use Magento\Framework\Indexer\DimensionalIndexerInterface;
use Magento\Framework\Indexer\DimensionProviderInterface;
use Magento\Framework\Indexer\IndexStructureInterface;
use Magento\Framework\Indexer\SaveHandler\IndexerInterface;
use Magento\Framework\Mview\ActionInterface as MviewActionInterface;
use MageOS\AsyncEvents\Helper\Config;
use MageOS\AsyncEvents\Model\Adapter\BatchDataMapper\AsyncEventLogMapper;
use MageOS\AsyncEvents\Model\Indexer\DataProvider\AsyncEventSubscriberLogs;
use MageOS\AsyncEvents\Model\Resolver\AsyncEvent;
use Traversable;

class AsyncEventSubscriber implements
Expand All @@ -29,7 +28,7 @@ class AsyncEventSubscriber implements
/**
* Indexer ID in configuration
*/
private const INDEXER_ID = 'asynchronous_event_subscriber_log';
public const INDEXER_ID = 'asynchronous_event_subscriber_log';

/**
* Default batch size
Expand Down Expand Up @@ -71,10 +70,10 @@ public function __construct(
private readonly IndexerHandlerFactory $indexerHandlerFactory,
private readonly AsyncEventSubscriberLogs $asyncEventSubscriberLogsDataProvider,
private readonly AsyncEvent $asyncEventScopeResolver,
private readonly IndexStructureInterface $indexStructure,
private readonly Config $config,
private readonly ElasticsearchFactory $adapterFactory,
private readonly AsyncEventLogMapper $loggerMapper,
private readonly IndexStructureFactory $indexStructureFactory,
private readonly array $data,
int $batchSize = null,
DeploymentConfig $deploymentConfig = null
Expand Down Expand Up @@ -106,21 +105,27 @@ public function executeByDimensions(array $dimensions, ?Traversable $entityIds)
'batchDocumentDataMapper' => $this->loggerMapper
]);

$indexStructure = $this->indexStructureFactory->create([
'adapter' => $adapter
]);

$saveHandler = $this->indexerHandlerFactory->create(
[
'data' => $this->data,
'adapter' => $adapter,
'scopeResolver' => $this->asyncEventScopeResolver,
'indexStructure' => $this->indexStructure
'indexStructure' => $indexStructure
]
);

if ($entityIds === null) {
$asyncEventDimension = $dimensions[AsyncEventDimensionProvider::DIMENSION_NAME]->getValue();


$saveHandler->cleanIndex($dimensions);
$saveHandler->saveIndex(
$dimensions,
$this->asyncEventSubscriberLogsDataProvider->getAsyncEventLogs($asyncEventDimension, null)
$this->asyncEventSubscriberLogsDataProvider->rebuildIndex($asyncEventDimension)
);
} else {
$asyncEventIds = iterator_to_array($entityIds);
Expand Down
48 changes: 47 additions & 1 deletion Model/Indexer/DataProvider/AsyncEventSubscriberLogs.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,32 @@

namespace MageOS\AsyncEvents\Model\Indexer\DataProvider;

use Generator;
use Magento\Framework\App\DeploymentConfig;
use Magento\Framework\App\ResourceConnection;
use Magento\Framework\DB\Adapter\AdapterInterface;
use MageOS\AsyncEvents\Model\Indexer\AsyncEventSubscriber;
use MageOS\AsyncEvents\Model\ResourceModel\AsyncEventLog\Collection;
use MageOS\AsyncEvents\Model\ResourceModel\AsyncEventLog\CollectionFactory as AsyncEventLogCollectionFactory;
use Zend_Db_Expr;

class AsyncEventSubscriberLogs
{
private AdapterInterface $connection;

private const DEPLOYMENT_CONFIG_INDEXER_BATCHES_PREFIX = 'indexer/batch_size/';

/**
* @param AsyncEventLogCollectionFactory $collectionFactory
* @param ResourceConnection $resource
* @param DeploymentConfig $deploymentConfig
*/
public function __construct(
private readonly AsyncEventLogCollectionFactory $collectionFactory
private readonly AsyncEventLogCollectionFactory $collectionFactory,
private readonly ResourceConnection $resource,
private readonly DeploymentConfig $deploymentConfig
) {
$this->connection = $resource->getConnection();
}

/**
Expand Down Expand Up @@ -42,4 +57,35 @@ public function getAsyncEventLogs(string $asyncEvent, ?array $logIds): Collectio

return $logCollection;
}

public function rebuildIndex(string $asyncEvent): Generator
{
$tableName = $this->resource->getTableName('async_event_subscriber_log');

$batchSize = $this->deploymentConfig->get(
self::DEPLOYMENT_CONFIG_INDEXER_BATCHES_PREFIX . AsyncEventSubscriber::INDEXER_ID . '/mysql_get'
) ?? 10_000;

$lastId = 0;

while (true) {
$select = $this->connection->select()
->from($tableName, ['*', new Zend_Db_Expr("'$asyncEvent' AS event_name")])
->where('log_id > ?', $lastId)
->order('log_id ASC')
->limit($batchSize);

$result = $this->connection->fetchAll($select);

if (empty($result)) {
break;
}

foreach ($result as $row) {
yield $row['log_id'] => $row;
}

$lastId = end($result)['log_id'];
}
}
}
Binary file removed docs/Webhooks.png
Binary file not shown.
Binary file removed docs/event_fan_out.png
Binary file not shown.
Binary file removed docs/failover_architecture.png
Binary file not shown.
Binary file removed docs/simple_model.png
Binary file not shown.
Binary file removed docs/webhook_fan_in.png
Binary file not shown.

0 comments on commit cf97a7c

Please sign in to comment.