From d7dc600cccc9b782e4f5899099cf1a975e970d8a Mon Sep 17 00:00:00 2001 From: Jordan Dukart Date: Thu, 22 Jul 2021 18:42:09 -0300 Subject: [PATCH 1/2] Update the logic to handle re-queueing in a proper fashion. --- src/MigrateBatchException.php | 37 +++++++++ src/MigrateBatchExecutable.php | 141 +++++++++++++++++++-------------- 2 files changed, 119 insertions(+), 59 deletions(-) create mode 100644 src/MigrateBatchException.php diff --git a/src/MigrateBatchException.php b/src/MigrateBatchException.php new file mode 100644 index 0000000..8a34a97 --- /dev/null +++ b/src/MigrateBatchException.php @@ -0,0 +1,37 @@ +finished = $finished; + } + + /** + * Get the stored "finished" value. + * + * @return float|null + * The finished value, if provided; otherwise, NULL... which means that it + * is up to the handler to calculate. + */ + public function getFinished() { + return $this->finished; + } + +} diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index a93edb9..2f82639 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -272,85 +272,108 @@ public function processBatch(&$context) { $sandbox['total'] = $this->queue->numberOfItems(); if ($sandbox['total'] === 0) { $context['message'] = $this->t('Queue empty.'); + $context['finished'] = 1; return; } } - while (TRUE) { - $item = $this->queue->claimItem(); - if (!$item) { - $context['message'] = $this->t('Queue exhausted.'); - break; - } - $row = $item->data['row']; - if ($item->data['attempts']++ > 0) { - $sleep_time = 2 ** ($item->data['attempts'] - 1); - $context['message'] = $this->t('Attempt @attempt processing row (IDs: @ids) in migration @migration; sleeping @time seconds.', [ - '@attempt' => $item->data['attempts'], - '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@migration' => $this->migration->id(), - '@time' => $sleep_time, - ]); - sleep($sleep_time); - } - - try { - $status = $this->processRowFromQueue($row); - ++$sandbox['current']; - $context['message'] = $this->t('Migration "@migration": @current/@total; processed row with IDs: (@ids)', [ - '@migration' => $this->migration->id(), - '@current' => $sandbox['current'], - '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@total' => $sandbox['total'], - ]); - if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { - $context['message'] = $this->t('Stopping "@migration" after @current of @total', [ - '@migration' => $this->migration->id(), - '@current' => $sandbox['current'], - '@total' => $sandbox['total'], - ]); - $context['finished'] = 1; - break; - } - elseif ($status === MigrationInterface::RESULT_INCOMPLETE) { - // Force iteration, due to memory or time. - break; + try { + $context['finished'] = $sandbox['current'] / $sandbox['total']; + while ($context['finished'] < 1) { + $item = $this->queue->claimItem(); + if (!$item) { + // XXX: Exceptions for flow control... maybe not the best, but works + // for now... as such, let's allow it to pass translated messages. + // phpcs:ignore DrupalPractice.General.ExceptionT.ExceptionT + throw new MigrateBatchException($this->t('Queue exhausted.'), 1); } - } - catch (\Exception $e) { - if ($item->data['attempts'] < 3) { - // XXX: Not really making any progress, requeueing things, so don't - // increment 'current'. - $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); re-enqueueing. Exception info:@n@ex', [ + $row = $item->data['row']; + if ($item->data['attempts']++ > 0) { + $sleep_time = 2 ** ($item->data['attempts'] - 1); + $context['message'] = $this->t('Attempt @attempt processing row (IDs: @ids) in migration @migration; sleeping @time seconds.', [ + '@attempt' => $item->data['attempts'], + '@ids' => var_export($row->getSourceIdValues(), TRUE), '@migration' => $this->migration->id(), - '@current' => $sandbox['current'], - '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@total' => $sandbox['total'], - '@ex' => $e, - '@n' => "\n", + '@time' => $sleep_time, ]); - $this->queue->createItem($item->data); + sleep($sleep_time); } - else { + + try { + $status = $this->processRowFromQueue($row); ++$sandbox['current']; - $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [ + $context['message'] = $this->t('Migration "@migration": @current/@total; processed row with IDs: (@ids)', [ '@migration' => $this->migration->id(), '@current' => $sandbox['current'], '@ids' => var_export($row->getSourceIdValues(), TRUE), '@total' => $sandbox['total'], - '@ex' => $e, - '@n' => "\n", ]); - $this->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED); + if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { + // XXX: Exceptions for flow control... maybe not the best, but works + // for now... as such, let's allow it to pass translated messages. + // phpcs:ignore DrupalPractice.General.ExceptionT.ExceptionT + throw new MigrateBatchException($this->t('Stopping "@migration" after @current of @total', [ + '@migration' => $this->migration->id(), + '@current' => $sandbox['current'], + '@total' => $sandbox['total'], + ]), 1); + } + elseif ($status === MigrationInterface::RESULT_INCOMPLETE) { + // Force iteration, due to memory or time. + // XXX: Don't want to pass a message here, as it would _always_ be + // shown if this was run via the web interface. + throw new MigrateBatchException(); + } + } + catch (MigrateBatchException $e) { + // Rethrow to avoid the general handling below. + throw $e; + } + catch (\Exception $e) { + if ($item->data['attempts'] < 3) { + // XXX: Not really making any progress, requeueing things, so don't + // increment 'current'. + $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); re-enqueueing. Exception info:@n@ex', [ + '@migration' => $this->migration->id(), + '@current' => $sandbox['current'], + '@ids' => var_export($row->getSourceIdValues(), TRUE), + '@total' => $sandbox['total'], + '@ex' => $e, + '@n' => "\n", + ]); + $this->queue->createItem($item->data); + } + else { + ++$sandbox['current']; + $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [ + '@migration' => $this->migration->id(), + '@current' => $sandbox['current'], + '@ids' => var_export($row->getSourceIdValues(), TRUE), + '@total' => $sandbox['total'], + '@ex' => $e, + '@n' => "\n", + ]); + $this->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED); + } + } + finally { + $this->queue->deleteItem($item); } + + $context['finished'] = $sandbox['current'] / $sandbox['total']; } - finally { - $context['finished'] = $context['finished'] ?? ($sandbox['current'] / $sandbox['total']); - $this->queue->deleteItem($item); + } + catch (MigrateBatchException $e) { + if ($msg = $e->getMessage()) { + $context['message'] = $msg; } + + $context['finished'] = $e->getFinished() ?? ($sandbox['current'] / $sandbox['total']); } + } + /** * {@inheritdoc} */ From a86d8b441a866e8d385e58930945f2dc474b5ddd Mon Sep 17 00:00:00 2001 From: Bethany Seeger Date: Thu, 14 Oct 2021 15:52:36 -0400 Subject: [PATCH 2/2] Fixes a bug where the timer was not started soon enough. --- src/MigrateBatchExecutable.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index 2f82639..de268a2 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -279,6 +279,11 @@ public function processBatch(&$context) { try { $context['finished'] = $sandbox['current'] / $sandbox['total']; + // call hasTime() once at the begining to start the timer and set the timer threshold. + // If we don't then it won't get called until after the first record is processed and the + // algorithm won't consider the time it spent on the first record. Large file ingests + // will hit a php max exec timeout then. + static::hasTime(); while ($context['finished'] < 1) { $item = $this->queue->claimItem(); if (!$item) {