diff --git a/src/MigrateBatchException.php b/src/MigrateBatchException.php new file mode 100644 index 0000000..8a34a97 --- /dev/null +++ b/src/MigrateBatchException.php @@ -0,0 +1,37 @@ +finished = $finished; + } + + /** + * Get the stored "finished" value. + * + * @return float|null + * The finished value, if provided; otherwise, NULL... which means that it + * is up to the handler to calculate. + */ + public function getFinished() { + return $this->finished; + } + +} diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index c2db47e..de268a2 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -272,85 +272,113 @@ public function processBatch(&$context) { $sandbox['total'] = $this->queue->numberOfItems(); if ($sandbox['total'] === 0) { $context['message'] = $this->t('Queue empty.'); + $context['finished'] = 1; return; } } - while (TRUE) { - $item = $this->queue->claimItem(); - if (!$item) { - $context['message'] = $this->t('Queue exhausted.'); - break; - } - $row = $item->data['row']; - if ($item->data['attempts']++ > 0) { - $sleep_time = 2 ** ($item->data['attempts'] - 1); - $context['message'] = $this->t('Attempt @attempt processing row (IDs: @ids) in migration @migration; sleeping @time seconds.', [ - '@attempt' => $item->data['attempts'], - '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@migration' => $this->migration->id(), - '@time' => $sleep_time, - ]); - sleep($sleep_time); - } - - try { - $status = $this->processRowFromQueue($row); - ++$sandbox['current']; - $context['message'] = $this->t('Migration "@migration": @current/@total; processed row with IDs: (@ids)', [ - '@migration' => $this->migration->id(), - '@current' => $sandbox['current'], - '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@total' => $sandbox['total'], - ]); - if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { - $context['message'] = $this->t('Stopping "@migration" after @current of @total', [ - '@migration' => $this->migration->id(), - '@current' => $sandbox['current'], - '@total' => $sandbox['total'], - ]); - $context['finished'] = 1; - break; - } - elseif ($status === MigrationInterface::RESULT_INCOMPLETE) { - // Force iteration, due to memory or time. - continue; + try { + $context['finished'] = $sandbox['current'] / $sandbox['total']; + // call hasTime() once at the begining to start the timer and set the timer threshold. + // If we don't then it won't get called until after the first record is processed and the + // algorithm won't consider the time it spent on the first record. Large file ingests + // will hit a php max exec timeout then. + static::hasTime(); + while ($context['finished'] < 1) { + $item = $this->queue->claimItem(); + if (!$item) { + // XXX: Exceptions for flow control... maybe not the best, but works + // for now... as such, let's allow it to pass translated messages. + // phpcs:ignore DrupalPractice.General.ExceptionT.ExceptionT + throw new MigrateBatchException($this->t('Queue exhausted.'), 1); } - } - catch (\Exception $e) { - if ($item->data['attempts'] < 3) { - // XXX: Not really making any progress, requeueing things, so don't - // increment 'current'. - $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); re-enqueueing. Exception info:@n@ex', [ + $row = $item->data['row']; + if ($item->data['attempts']++ > 0) { + $sleep_time = 2 ** ($item->data['attempts'] - 1); + $context['message'] = $this->t('Attempt @attempt processing row (IDs: @ids) in migration @migration; sleeping @time seconds.', [ + '@attempt' => $item->data['attempts'], + '@ids' => var_export($row->getSourceIdValues(), TRUE), '@migration' => $this->migration->id(), - '@current' => $sandbox['current'], - '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@total' => $sandbox['total'], - '@ex' => $e, - '@n' => "\n", + '@time' => $sleep_time, ]); - $this->queue->createItem($item->data); + sleep($sleep_time); } - else { + + try { + $status = $this->processRowFromQueue($row); ++$sandbox['current']; - $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [ + $context['message'] = $this->t('Migration "@migration": @current/@total; processed row with IDs: (@ids)', [ '@migration' => $this->migration->id(), '@current' => $sandbox['current'], '@ids' => var_export($row->getSourceIdValues(), TRUE), '@total' => $sandbox['total'], - '@ex' => $e, - '@n' => "\n", ]); - $this->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED); + if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { + // XXX: Exceptions for flow control... maybe not the best, but works + // for now... as such, let's allow it to pass translated messages. + // phpcs:ignore DrupalPractice.General.ExceptionT.ExceptionT + throw new MigrateBatchException($this->t('Stopping "@migration" after @current of @total', [ + '@migration' => $this->migration->id(), + '@current' => $sandbox['current'], + '@total' => $sandbox['total'], + ]), 1); + } + elseif ($status === MigrationInterface::RESULT_INCOMPLETE) { + // Force iteration, due to memory or time. + // XXX: Don't want to pass a message here, as it would _always_ be + // shown if this was run via the web interface. + throw new MigrateBatchException(); + } + } + catch (MigrateBatchException $e) { + // Rethrow to avoid the general handling below. + throw $e; + } + catch (\Exception $e) { + if ($item->data['attempts'] < 3) { + // XXX: Not really making any progress, requeueing things, so don't + // increment 'current'. + $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); re-enqueueing. Exception info:@n@ex', [ + '@migration' => $this->migration->id(), + '@current' => $sandbox['current'], + '@ids' => var_export($row->getSourceIdValues(), TRUE), + '@total' => $sandbox['total'], + '@ex' => $e, + '@n' => "\n", + ]); + $this->queue->createItem($item->data); + } + else { + ++$sandbox['current']; + $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [ + '@migration' => $this->migration->id(), + '@current' => $sandbox['current'], + '@ids' => var_export($row->getSourceIdValues(), TRUE), + '@total' => $sandbox['total'], + '@ex' => $e, + '@n' => "\n", + ]); + $this->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED); + } + } + finally { + $this->queue->deleteItem($item); } + + $context['finished'] = $sandbox['current'] / $sandbox['total']; } - finally { - $context['finished'] = $context['finished'] ?? ($sandbox['current'] / $sandbox['total']); - $this->queue->deleteItem($item); + } + catch (MigrateBatchException $e) { + if ($msg = $e->getMessage()) { + $context['message'] = $msg; } + + $context['finished'] = $e->getFinished() ?? ($sandbox['current'] / $sandbox['total']); } + } + /** * {@inheritdoc} */