Skip to content

Commit

Permalink
Refactor: Use holding objects instead of passing EtlState by reference (
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek authored Nov 6, 2023
1 parent edb0366 commit 2e5f893
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 12 deletions.
42 changes: 30 additions & 12 deletions src/EtlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Bentools\ETL\Internal\ClonableTrait;
use Bentools\ETL\Internal\EtlBuilderTrait;
use Bentools\ETL\Internal\EtlExceptionsTrait;
use Bentools\ETL\Internal\Ref;
use Bentools\ETL\Loader\InMemoryLoader;
use Bentools\ETL\Loader\LoaderInterface;
use Bentools\ETL\Transformer\NullTransformer;
Expand Down Expand Up @@ -61,45 +62,55 @@ public function __construct(
public function process(mixed $source = null, mixed $destination = null): EtlState
{
$state = new EtlState(options: $this->options, source: $source, destination: $destination);
$stateHolder = ref($state);

try {
$this->dispatch(new InitEvent($state));

foreach ($this->extract($state) as $extractedItem) {
foreach ($this->extract($stateHolder) as $extractedItem) {
try {
$transformedItems = $this->transform($extractedItem, $state);
$this->load($transformedItems, $state);
$this->load($transformedItems, $stateHolder);
} catch (SkipRequest) {
}
}
} catch (StopRequest) {
}

$output = $this->flush($state, false);
$output = $this->flush($stateHolder, false);

$state = unref($stateHolder);
if (!$state->nbTotalItems) {
$state = $state->withNbTotalItems($state->nbLoadedItems);
$stateHolder->update($state);
}

$state = $state->withOutput($output);
$stateHolder->update($state);
$this->dispatch(new EndEvent($state));

gc_collect_cycles();

return $state;
}

private function extract(EtlState &$state): Generator
/**
* @param Ref<EtlState> $stateHolder
*/
private function extract(Ref $stateHolder): Generator
{
$state = unref($stateHolder);
try {
$items = $this->extractor->extract($state);
if (is_countable($items)) {
$state = $state->withNbTotalItems(count($items));
$stateHolder->update($state);
}
$this->dispatch(new StartEvent($state));
foreach ($items as $key => $value) {
try {
$state = $state->withUpdatedItemKey($key);
$state = unref($stateHolder)->withUpdatedItemKey($key);
$stateHolder->update($state);
$event = $this->dispatch(new ExtractEvent($state, $value));
yield $event->item;
} catch (SkipRequest) {
Expand All @@ -108,7 +119,7 @@ private function extract(EtlState &$state): Generator
} catch (StopRequest) {
return;
} catch (Throwable $exception) {
$this->throwExtractException($exception, $state);
$this->throwExtractException($exception, unref($stateHolder));
}
}

Expand All @@ -132,27 +143,34 @@ private function transform(mixed $item, EtlState $state): array
}

/**
* @param list<mixed> $items
* @param list<mixed> $items
* @param Ref<EtlState> $stateHolder
*/
private function load(array $items, EtlState &$state): void
private function load(array $items, Ref $stateHolder): void
{
$state = unref($stateHolder);
try {
foreach ($items as $item) {
$this->loader->load($item, $state);
$state = $state->withIncrementedNbLoadedItems();
$stateHolder->update($state);
$this->dispatch(new LoadEvent($state, $item));
}
} catch (SkipRequest|StopRequest $e) {
throw $e;
} catch (Throwable $e) {
$this->throwLoadException($e, $state);
$this->throwLoadException($e, unref($stateHolder));
}

$this->flush($state, true);
$this->flush($stateHolder, true);
}

private function flush(EtlState &$state, bool $isPartial): mixed
/**
* @param Ref<EtlState> $stateHolder
*/
private function flush(Ref $stateHolder, bool $isPartial): mixed
{
$state = unref($stateHolder);
if ($isPartial && !$state->shouldFlush()) {
return null;
}
Expand All @@ -169,7 +187,7 @@ private function flush(EtlState &$state, bool $isPartial): mixed
$this->throwFlushException($e, $state);
}
$this->dispatch(new FlushEvent($state, $isPartial, $output));
$state = $state->withClearedFlush();
$stateHolder->update($state->withClearedFlush());

return $output;
}
Expand Down
49 changes: 49 additions & 0 deletions src/Internal/Ref.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);

namespace Bentools\ETL\Internal;

/**
* @template T
*
* @internal
*/
final class Ref
{
/**
* @param T $value
*/
private function __construct(
public mixed $value,
) {
}

/**
* @param T $value
*
* @return self<T>
*/
public function update(mixed $value): self
{
$this->value = $value;

return $this;
}

/**
* @param T $value
*
* @return self<T>
*/
public static function create(mixed $value): self
{
static $prototype;
$prototype ??= new self(null);

$ref = clone $prototype;
$ref->value = $value;

return $ref;
}
}
32 changes: 32 additions & 0 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@

namespace Bentools\ETL;

use Bentools\ETL\Internal\Ref;

use function array_fill_keys;
use function array_intersect_key;
use function array_replace;

/**
* @internal
*
* @param list<string> $keys
* @param array<string, mixed> $values
* @param array<string, mixed> ...$extraValues
Expand All @@ -22,3 +26,31 @@ function array_fill_from(array $keys, array $values, array ...$extraValues): arr

return array_intersect_key($values, $defaults);
}

/**
* @internal
*
* @template T
*
* @param T $value
*
* @return Ref<T>
*/
function ref(mixed $value): Ref
{
return Ref::create($value);
}

/**
* @internal
*
* @template T
*
* @param Ref<T> $ref
*
* @return T
*/
function unref(Ref $ref): mixed
{
return $ref->value;
}

0 comments on commit 2e5f893

Please sign in to comment.