Skip to content

Commit

Permalink
Merge pull request #9 from jhu-idc/batch-migration-replacement
Browse files Browse the repository at this point in the history
Update the logic to handle re-queueing in a proper fashion.
  • Loading branch information
bseeger committed Oct 14, 2021
2 parents e52a14b + a86d8b4 commit 44c3252
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 59 deletions.
37 changes: 37 additions & 0 deletions src/MigrateBatchException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

namespace Drupal\idc_migration;

/**
* Batch exception typification.
*/
class MigrateBatchException extends \Exception {

/**
* Store the "finished" value, if we want to provide it.
*
* @var float|null
*/
protected $finished;

/**
* Constructor.
*/
public function __construct($message = '', $finished = NULL, $previous = NULL) {
parent::__construct($message, NULL, $previous);

$this->finished = $finished;
}

/**
* Get the stored "finished" value.
*
* @return float|null
* The finished value, if provided; otherwise, NULL... which means that it
* is up to the handler to calculate.
*/
public function getFinished() {
return $this->finished;
}

}
146 changes: 87 additions & 59 deletions src/MigrateBatchExecutable.php
Original file line number Diff line number Diff line change
Expand Up @@ -272,85 +272,113 @@ public function processBatch(&$context) {
$sandbox['total'] = $this->queue->numberOfItems();
if ($sandbox['total'] === 0) {
$context['message'] = $this->t('Queue empty.');
$context['finished'] = 1;
return;
}
}

while (TRUE) {
$item = $this->queue->claimItem();
if (!$item) {
$context['message'] = $this->t('Queue exhausted.');
break;
}
$row = $item->data['row'];
if ($item->data['attempts']++ > 0) {
$sleep_time = 2 ** ($item->data['attempts'] - 1);
$context['message'] = $this->t('Attempt @attempt processing row (IDs: @ids) in migration @migration; sleeping @time seconds.', [
'@attempt' => $item->data['attempts'],
'@ids' => var_export($row->getSourceIdValues(), TRUE),
'@migration' => $this->migration->id(),
'@time' => $sleep_time,
]);
sleep($sleep_time);
}

try {
$status = $this->processRowFromQueue($row);
++$sandbox['current'];
$context['message'] = $this->t('Migration "@migration": @current/@total; processed row with IDs: (@ids)', [
'@migration' => $this->migration->id(),
'@current' => $sandbox['current'],
'@ids' => var_export($row->getSourceIdValues(), TRUE),
'@total' => $sandbox['total'],
]);
if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
$context['message'] = $this->t('Stopping "@migration" after @current of @total', [
'@migration' => $this->migration->id(),
'@current' => $sandbox['current'],
'@total' => $sandbox['total'],
]);
$context['finished'] = 1;
break;
}
elseif ($status === MigrationInterface::RESULT_INCOMPLETE) {
// Force iteration, due to memory or time.
continue;
try {
$context['finished'] = $sandbox['current'] / $sandbox['total'];
// call hasTime() once at the begining to start the timer and set the timer threshold.
// If we don't then it won't get called until after the first record is processed and the
// algorithm won't consider the time it spent on the first record. Large file ingests
// will hit a php max exec timeout then.
static::hasTime();
while ($context['finished'] < 1) {
$item = $this->queue->claimItem();
if (!$item) {
// XXX: Exceptions for flow control... maybe not the best, but works
// for now... as such, let's allow it to pass translated messages.
// phpcs:ignore DrupalPractice.General.ExceptionT.ExceptionT
throw new MigrateBatchException($this->t('Queue exhausted.'), 1);
}
}
catch (\Exception $e) {
if ($item->data['attempts'] < 3) {
// XXX: Not really making any progress, requeueing things, so don't
// increment 'current'.
$context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); re-enqueueing. Exception info:@n@ex', [
$row = $item->data['row'];
if ($item->data['attempts']++ > 0) {
$sleep_time = 2 ** ($item->data['attempts'] - 1);
$context['message'] = $this->t('Attempt @attempt processing row (IDs: @ids) in migration @migration; sleeping @time seconds.', [
'@attempt' => $item->data['attempts'],
'@ids' => var_export($row->getSourceIdValues(), TRUE),
'@migration' => $this->migration->id(),
'@current' => $sandbox['current'],
'@ids' => var_export($row->getSourceIdValues(), TRUE),
'@total' => $sandbox['total'],
'@ex' => $e,
'@n' => "\n",
'@time' => $sleep_time,
]);
$this->queue->createItem($item->data);
sleep($sleep_time);
}
else {

try {
$status = $this->processRowFromQueue($row);
++$sandbox['current'];
$context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [
$context['message'] = $this->t('Migration "@migration": @current/@total; processed row with IDs: (@ids)', [
'@migration' => $this->migration->id(),
'@current' => $sandbox['current'],
'@ids' => var_export($row->getSourceIdValues(), TRUE),
'@total' => $sandbox['total'],
'@ex' => $e,
'@n' => "\n",
]);
$this->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
// XXX: Exceptions for flow control... maybe not the best, but works
// for now... as such, let's allow it to pass translated messages.
// phpcs:ignore DrupalPractice.General.ExceptionT.ExceptionT
throw new MigrateBatchException($this->t('Stopping "@migration" after @current of @total', [
'@migration' => $this->migration->id(),
'@current' => $sandbox['current'],
'@total' => $sandbox['total'],
]), 1);
}
elseif ($status === MigrationInterface::RESULT_INCOMPLETE) {
// Force iteration, due to memory or time.
// XXX: Don't want to pass a message here, as it would _always_ be
// shown if this was run via the web interface.
throw new MigrateBatchException();
}
}
catch (MigrateBatchException $e) {
// Rethrow to avoid the general handling below.
throw $e;
}
catch (\Exception $e) {
if ($item->data['attempts'] < 3) {
// XXX: Not really making any progress, requeueing things, so don't
// increment 'current'.
$context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); re-enqueueing. Exception info:@n@ex', [
'@migration' => $this->migration->id(),
'@current' => $sandbox['current'],
'@ids' => var_export($row->getSourceIdValues(), TRUE),
'@total' => $sandbox['total'],
'@ex' => $e,
'@n' => "\n",
]);
$this->queue->createItem($item->data);
}
else {
++$sandbox['current'];
$context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [
'@migration' => $this->migration->id(),
'@current' => $sandbox['current'],
'@ids' => var_export($row->getSourceIdValues(), TRUE),
'@total' => $sandbox['total'],
'@ex' => $e,
'@n' => "\n",
]);
$this->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
}
}
finally {
$this->queue->deleteItem($item);
}

$context['finished'] = $sandbox['current'] / $sandbox['total'];
}
finally {
$context['finished'] = $context['finished'] ?? ($sandbox['current'] / $sandbox['total']);
$this->queue->deleteItem($item);
}
catch (MigrateBatchException $e) {
if ($msg = $e->getMessage()) {
$context['message'] = $msg;
}

$context['finished'] = $e->getFinished() ?? ($sandbox['current'] / $sandbox['total']);
}

}


/**
* {@inheritdoc}
*/
Expand Down

0 comments on commit 44c3252

Please sign in to comment.