From 9a59216b394186b969b3e10590884d2fdc296ae1 Mon Sep 17 00:00:00 2001 From: Jordan Dukart Date: Fri, 16 Jul 2021 14:33:28 -0300 Subject: [PATCH 1/2] WIP. --- idc_migration.info.yml | 3 +- idc_migration.module | 2 +- idc_migration.services.yml | 6 + src/Form/MigrateSourceUiForm.php | 50 ++++ src/MigrateBatchExecutable.php | 465 ++++++++++++++++++++++++++++++ src/Plugin/migrate/source/CSV.php | 35 +++ src/Routing/RouteSubscriber.php | 23 ++ 7 files changed, 582 insertions(+), 2 deletions(-) create mode 100644 idc_migration.services.yml create mode 100644 src/Form/MigrateSourceUiForm.php create mode 100644 src/MigrateBatchExecutable.php create mode 100644 src/Plugin/migrate/source/CSV.php create mode 100644 src/Routing/RouteSubscriber.php diff --git a/idc_migration.info.yml b/idc_migration.info.yml index 54112c9..e0be9b4 100644 --- a/idc_migration.info.yml +++ b/idc_migration.info.yml @@ -3,4 +3,5 @@ type: module description: 'Custom migration plugins supporting IDC ingest' core: 8.x dependencies: - - migrate \ No newline at end of file + - drupal:migrate + - migrate_source_csv:migrate_source_csv diff --git a/idc_migration.module b/idc_migration.module index 2c4ce15..012d4af 100644 --- a/idc_migration.module +++ b/idc_migration.module @@ -2,4 +2,4 @@ /** * @file * A description of what your module does. 
-*/ \ No newline at end of file +*/ diff --git a/idc_migration.services.yml b/idc_migration.services.yml new file mode 100644 index 0000000..9e0f099 --- /dev/null +++ b/idc_migration.services.yml @@ -0,0 +1,6 @@ +--- +services: + idc_defaults.route_subscriber: + class: Drupal\idc_migration\Routing\RouteSubscriber + tags: + - name: event_subscriber diff --git a/src/Form/MigrateSourceUiForm.php b/src/Form/MigrateSourceUiForm.php new file mode 100644 index 0000000..0b8c8bb --- /dev/null +++ b/src/Form/MigrateSourceUiForm.php @@ -0,0 +1,50 @@ +getValue('migrations'); + /** @var \Drupal\migrate\Plugin\Migration $migration */ + $migration = $this->pluginManagerMigration->createInstance($migration_id); + + // Reset status. + $status = $migration->getStatus(); + if ($status !== MigrationInterface::STATUS_IDLE) { + $migration->setStatus(MigrationInterface::STATUS_IDLE); + $this->messenger()->addWarning($this->t('Migration @id reset to Idle', ['@id' => $migration_id])); + } + + $options = []; + + // Prepare the migration with the path injected. + $definition = $this->pluginManagerMigration->getDefinition($migration_id); + // Override the file path. + $definition['source']['path'] = $form_state->getValue('file_path'); + /** @var \Drupal\migrate\Plugin\MigrationInterface $migration */ + $migration = $this->pluginManagerMigration->createStubMigration($definition); + + // Force updates or not. + if ($form_state->getValue('update_existing_records')) { + $migration->getIdMap()->prepareUpdate(); + } + + $executable = new MigrateBatchExecutable($migration, new StubMigrationMessage(), $options); + batch_set($executable->prepareBatch()); + } + +} diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php new file mode 100644 index 0000000..a1194da --- /dev/null +++ b/src/MigrateBatchExecutable.php @@ -0,0 +1,465 @@ +id()}"; + $this->queue = \Drupal::queue($queue_name, TRUE); + + if (static::isCli()) { + // XXX: CLI Execution, most likely via drush. 
Let's adjust our memory + // threshold to be in line with Drush's constraint, with something of a + // fudge factor: 60% (drush's base) + 5% (our fudge factor), down from + // migrate's default of 85%. + // @see https://github.com/drush-ops/drush/blob/dbdb6733655231687d8ab68cdea6bf9fedbd0562/includes/batch.inc#L291-L298 + // @see https://git.drupalcode.org/project/drupal/-/blob/8.9.x/core/modules/migrate/src/MigrateExecutable.php#L47 + $this->memoryThreshold = 0.65; + } + } + + /** + * {@inheritdoc} + */ + public function __wakeup() { + $this->traitWakeup(); + // XXX: Re-add the listeners because the services get re-initialized. + foreach ($this->listeners as $event => $listener) { + $this->getEventDispatcher()->addListener($event, $listener); + } + } + + /** + * Prepare a batch array for execution for the given migration. + * + * @return array + * A batch array with operations and the like. + * + * @throws \Exception + * If the migration could not be enqueued successfully. + */ + public function prepareBatch() { + $result = $this->enqueue(); + if ($result === MigrationInterface::RESULT_COMPLETED) { + return [ + 'title' => $this->t('Running migration: @migration', [ + '@migration' => $this->migration->id(), + ]), + 'operations' => [ + [[$this, 'processBatch'], []], + ], + 'finished' => [$this, 'finishBatch'], + ]; + } + else { + throw new \Exception('Migration failed.'); + } + } + + /** + * Batch finished callback. + */ + public function finishBatch($success, $results, $ops, $interval) { + $this->queue->deleteQueue(); + $this->getEventDispatcher()->dispatch(MigrateEvents::POST_IMPORT, new MigrateImportEvent($this->migration, $this->message)); + $this->migration->setStatus(MigrationInterface::STATUS_IDLE); + } + + /** + * Populate the target queue with the rows of the given migration. + * + * @return int + * One of the MigrationInterface::RESULT_* constants representing the state + * of queueing. 
+ */ + protected function enqueue() { + // Only begin the import operation if the migration is currently idle. + if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) { + $this->message->display($this->t('Migration @id is busy with another operation: @status', + [ + '@id' => $this->migration->id(), + // XXX: Copypasta. + // @See https://git.drupalcode.org/project/drupal/-/blob/154038f1401583a30e0ea7d9c19db02f37b10943/core/modules/migrate/src/MigrateExecutable.php#L156 + //phpcs:ignore Drupal.Semantics.FunctionT.NotLiteralString + '@status' => $this->t($this->migration->getStatusLabel()), + ]), 'error'); + return MigrationInterface::RESULT_FAILED; + } + $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_IMPORT, new MigrateImportEvent($this->migration, $this->message)); + + // Knock off migration if the requirements haven't been met. + try { + $this->migration->checkRequirements(); + } + catch (RequirementsException $e) { + $this->message->display( + $this->t( + 'Migration @id did not meet the requirements. @message @requirements', + [ + '@id' => $this->migration->id(), + '@message' => $e->getMessage(), + '@requirements' => $e->getRequirementsString(), + ] + ), + 'error' + ); + + return MigrationInterface::RESULT_FAILED; + } + + $this->migration->setStatus(MigrationInterface::STATUS_IMPORTING); + $source = $this->getSource(); + + try { + $source->rewind(); + } + catch (\Exception $e) { + $this->message->display( + $this->t('Migration failed with source plugin exception: @e', ['@e' => $e]), 'error'); + $this->migration->setStatus(MigrationInterface::STATUS_IDLE); + return MigrationInterface::RESULT_FAILED; + } + + // XXX: Nuke it, just in case. + $this->queue->deleteQueue(); + foreach ($source as $row) { + $this->queue->createItem([ + 'row' => $row, + 'attempts' => 0, + ]); + } + return MigrationInterface::RESULT_COMPLETED; + } + + /** + * The meat of processing a row. 
+ * + * Perform the processing of a row and save it to the destination, if + * applicable. + * + * @param \Drupal\migrate\Row $row + * The row to be processed. + * + * @return int + * One of the MigrationInterface::RESULT_* constants. + */ + protected function processRowFromQueue(Row $row) { + $id_map = $this->getIdMap(); + $this->sourceIdValues = $row->getSourceIdValues(); + + try { + $this->processRow($row); + $save = TRUE; + } + catch (MigrateException $e) { + $this->getIdMap()->saveIdMapping($row, [], $e->getStatus()); + $this->saveMessage($e->getMessage(), $e->getLevel()); + $save = FALSE; + } + catch (MigrateSkipRowException $e) { + if ($e->getSaveToMap()) { + $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED); + } + if ($message = trim($e->getMessage())) { + $this->saveMessage($message, MigrationInterface::MESSAGE_INFORMATIONAL); + } + $save = FALSE; + } + + if ($save) { + try { + $destination = $this->migration->getDestinationPlugin(); + $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROW_SAVE, new MigratePreRowSaveEvent($this->migration, $this->message, $row)); + $destination_ids = $id_map->lookupDestinationIds($this->sourceIdValues); + $destination_id_values = $destination_ids ? reset($destination_ids) : []; + $destination_id_values = $destination->import($row, $destination_id_values); + $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROW_SAVE, new MigratePostRowSaveEvent($this->migration, $this->message, $row, $destination_id_values)); + if ($destination_id_values) { + // We do not save an idMap entry for config. 
+ if ($destination_id_values !== TRUE) { + $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination->rollbackAction()); + } + } + else { + $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED); + if (!$id_map->messageCount()) { + $message = $this->t('New object was not saved, no error provided'); + $this->saveMessage($message); + $this->message->display($message); + } + } + } + catch (MigrateException $e) { + $this->getIdMap()->saveIdMapping($row, [], $e->getStatus()); + $this->saveMessage($e->getMessage(), $e->getLevel()); + } + catch (\Exception $e) { + $this->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED); + $this->handleException($e); + } + } + + $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED; + + // Check for memory exhaustion. + if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) { + return $return; + } + + // If anyone has requested we stop, return the requested result. + if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { + $return = $this->migration->getInterruptionResult(); + $this->migration->clearInterruptionResult(); + return $return; + } + + } + + /** + * Batch operation callback. + * + * @param array|\DrushBatchContext $context + * Batch context. 
+ */ + public function processBatch(&$context) { + $sandbox =& $context['sandbox']; + + if (!isset($sandbox['total'])) { + $sandbox['current'] = 0; + $sandbox['total'] = $this->queue->numberOfItems(); + if ($sandbox['total'] === 0) { + $context['message'] = $this->t('Queue empty.'); + return; + } + } + + while (TRUE) { + $item = $this->queue->claimItem(); + if (!$item) { + $context['message'] = $this->t('Queue exhausted.'); + break; + } + $row = $item->data['row']; + if ($item->data['attempts']++ > 0) { + $sleep_time = 2 ** ($item->data['attempts'] - 1); + $context['message'] = $this->t('Attempt @attempt processing row (IDs: @ids) in migration @migration; sleeping @time seconds.', [ + '@attempt' => $item->data['attempts'], + '@ids' => var_export($row->getSourceIdValues(), TRUE), + '@migration' => $this->migration->id(), + '@time' => $sleep_time, + ]); + sleep($sleep_time); + } + + try { + $status = $this->processRowFromQueue($row); + ++$sandbox['current']; + $context['message'] = $this->t('Migration "@migration": @current/@total; processed row with IDs: (@ids)', [ + '@migration' => $this->migration->id(), + '@current' => $sandbox['current'], + '@ids' => var_export($row->getSourceIdValues(), TRUE), + '@total' => $sandbox['total'], + ]); + if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { + $context['message'] = $this->t('Stopping "@migration" after @current of @total', [ + '@migration' => $this->migration->id(), + '@current' => $sandbox['current'], + '@total' => $sandbox['total'], + ]); + $context['finished'] = 1; + break; + } + elseif ($status === MigrationInterface::RESULT_INCOMPLETE) { + // Force iteration, due to memory or time. + break; + } + } + catch (\Exception $e) { + if ($item->data['attempts'] < 3) { + // XXX: Not really making any progress, requeueing things, so don't + // increment 'current'. 
+ $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); re-enqueueing. Exception info:@n@ex', [ + '@migration' => $this->migration->id(), + '@current' => $sandbox['current'], + '@ids' => var_export($row->getSourceIdValues(), TRUE), + '@total' => $sandbox['total'], + '@ex' => $e, + '@n' => "\n", + ]); + $this->queue->createItem($item->data); + } + else { + ++$sandbox['current']; + $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [ + '@migration' => $this->migration->id(), + '@current' => $sandbox['current'], + '@ids' => var_export($row->getSourceIdValues(), TRUE), + '@total' => $sandbox['total'], + '@ex' => $e, + '@n' => "\n", + ]); + $this->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED); + } + } + finally { + $context['finished'] = $context['finished'] ?? ($sandbox['current'] / $sandbox['total']); + $this->queue->deleteItem($item); + } + } + } + + /** + * {@inheritdoc} + */ + protected function checkStatus() { + $status = parent::checkStatus(); + + if ($status === MigrationInterface::RESULT_COMPLETED) { + if (!static::isCli() && !static::hasTime()) { + return MigrationInterface::RESULT_INCOMPLETE; + } + } + return $status; + } + + /** + * Track if we have started our timer. + * + * @var bool + */ + protected static $timerStarted = FALSE; + + /** + * The threshold after which to iterate, in millis. + * + * @var float + */ + protected static $timerThreshold = 0.0; + + /** + * Determine if we should have time for another item. + * + * @return bool + * TRUE if we should have time; otherwise, FALSE. + */ + protected static function hasTime() { + if (!static::$timerStarted) { + Timer::start(static::TIMER); + static::$timerStarted = TRUE; + // Convert seconds to millis, and allow let's cut the iteration after a + // a third of the time. 
+ static::$timerThreshold = static::getIterationTimeThreshold() * 1000 / 3; + + // Need to allow at least one, to avoid starving. + return TRUE; + } + + return Timer::read(static::TIMER) < static::$timerThreshold; + } + + /** + * Determine an appropriate "max threshold" of time to let an iteration run. + * + * @return float + * An amount of time, in seconds. + */ + protected static function getIterationTimeThreshold() { + $max_exec = intval(ini_get('max_execution_time')); + if ($max_exec > 0) { + // max_execution_time could be 0 if run from CLI (drush?) + return min(static::MAX_TIME, $max_exec); + } + else { + return static::MAX_TIME; + } + } + + /** + * Helper; determine if we are running in a CLI context. + * + * @return bool + * TRUE if we are; otherwise, FALSE. + */ + protected static function isCli() { + return PHP_SAPI === 'cli'; + } + + /** + * Emit information on what we've done. + * + * Either since the last feedback or the beginning of this migration. + * + * @param bool $done + * TRUE if these are the last items to process. Otherwise FALSE. + */ + protected function progressMessage($done = TRUE) { + $processed = $this->getProcessedCount(); + if ($done) { + $singular_message = 'Processed 1 item (@created created, @updated updated, @failures failed, @ignored ignored) - done with "@name". The messages for this migration can be reviewed here.'; + $plural_message = 'Processed @numitems items (@created created, @updated updated, @failures failed, @ignored ignored) - done with "@name". 
The messages for this migration can be reviewed here.'; + } + else { + $singular_message = "Processed 1 item (@created created, @updated updated, @failures failed, @ignored ignored) - continuing with '@name'"; + $plural_message = "Processed @numitems items (@created created, @updated updated, @failures failed, @ignored ignored) - continuing with '@name'"; + } + $this->message->display(\Drupal::translation()->formatPlural($processed, + $singular_message, $plural_message, + [ + '@numitems' => $processed, + '@created' => $this->getCreatedCount(), + '@updated' => $this->getUpdatedCount(), + '@failures' => $this->getFailedCount(), + '@ignored' => $this->getIgnoredCount(), + '@name' => $this->migration->id(), + '@url' => Url::fromRoute('migrate_tools.messages', [ + 'migration' => $this->migration->id(), + 'migration_group' => $this->migration->getPluginDefinition()['migration_group'], + ])->toString(), + ] + )); + } + +} diff --git a/src/Plugin/migrate/source/CSV.php b/src/Plugin/migrate/source/CSV.php new file mode 100644 index 0000000..38d666d --- /dev/null +++ b/src/Plugin/migrate/source/CSV.php @@ -0,0 +1,35 @@ +getReader()->getHeader(); + if ($this->configuration['fields']) { + // If there is no header record, we need to flip description and name so + // the name becomes the header record. + $header = array_flip($this->fields()); + } + // XXX: Need to wrap this in an ArrayIterator as Generators are not + // serializable. 
+ return new \ArrayIterator(iterator_to_array($this->getGenerator($this->getReader()->getRecords($header)))); + } + +} diff --git a/src/Routing/RouteSubscriber.php b/src/Routing/RouteSubscriber.php new file mode 100644 index 0000000..76f5ca3 --- /dev/null +++ b/src/Routing/RouteSubscriber.php @@ -0,0 +1,23 @@ +get('migrate_source_ui.form')) { + $route->setDefault('_form', MigrateSourceUiForm::class); + } + } + +} From 488f1841a67d8947e13260e0d5d5a678600e2625 Mon Sep 17 00:00:00 2001 From: Jordan Dukart Date: Mon, 19 Jul 2021 16:11:11 -0300 Subject: [PATCH 2/2] Remove dgi. --- src/MigrateBatchExecutable.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index a1194da..a93edb9 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -29,7 +29,7 @@ class MigrateBatchExecutable extends MigrateExecutable { } // The name of our timer. - const TIMER = 'dgi_migrate_iteration_timer'; + const TIMER = 'idc_migrate_iteration_timer'; // The max amount of time we might want to allow per iteration. const MAX_TIME = 3600.0; @@ -47,7 +47,7 @@ class MigrateBatchExecutable extends MigrateExecutable { public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = []) { parent::__construct($migration, $message, $options); - $queue_name = "dgi_migrate__batch_queue__{$migration->id()}"; + $queue_name = "idc_migrate__batch_queue__{$migration->id()}"; $this->queue = \Drupal::queue($queue_name, TRUE); if (static::isCli()) {