Skip to content

Commit

Permalink
Ref #31 Add bulk query string parameters support
Browse files Browse the repository at this point in the history
  • Loading branch information
damienalexandre committed Oct 20, 2020
1 parent b794b99 commit c1c6278
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- New `setBulkRequestParams` on the Indexer allowing all the Bulk query params.
- This changelog file.

## [1.0.2] - 2020-07-31
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ $dto->foo = 'Hops from Alsace, France';
$indexer->scheduleIndex('beers', new Document('123', $dto));
$indexer->flush();

// Set parameters on the Bulk
$indexer->setBulkRequestParams([
'pipeline' => 'covfefe',
'refresh' => 'wait_for'
]);

// Force index refresh if needed
$indexer->refresh('beers');

Expand Down
27 changes: 26 additions & 1 deletion src/Indexer.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ class Indexer
private $serializer;
/** @var Bulk|null */
private $currentBulk = null;
private $bulkRequestParams = [];

public function __construct(Client $client, SerializerInterface $serializer, int $bulkMaxSize = 100)
public function __construct(Client $client, SerializerInterface $serializer, int $bulkMaxSize = 100, array $bulkRequestParams = [])
{
// TODO: on the destruct, maybe throw an exception for non empty indexer queues?
$this->client = $client;
$this->bulkMaxSize = $bulkMaxSize ?? 100;
$this->serializer = $serializer;
$this->bulkRequestParams = $bulkRequestParams;
}

public function scheduleIndex($index, Document $document)
Expand Down Expand Up @@ -124,6 +126,7 @@ protected function getCurrentBulk(): Bulk
{
if (!($this->currentBulk instanceof Bulk)) {
$this->currentBulk = new Bulk($this->client);
$this->refreshBulkRequestParams();
}

return $this->currentBulk;
Expand All @@ -137,4 +140,26 @@ public function setBulkMaxSize(int $bulkMaxSize): void
$this->flush();
}
}

public function getBulkRequestParams(): array
{
return $this->bulkRequestParams;
}

public function setBulkRequestParams(array $bulkRequestParams): void
{
$this->bulkRequestParams = $bulkRequestParams;
$this->refreshBulkRequestParams();
}

private function refreshBulkRequestParams()
{
if (!$this->currentBulk) {
return;
}

foreach ($this->bulkRequestParams as $key => $value) {
$this->currentBulk->setRequestParam($key, $value);
}
}
}
34 changes: 34 additions & 0 deletions tests/IndexerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,40 @@ public function testIndexingWithError(): void
$this->assertEquals(0, $indexer->getQueueSize());
}

public function testRequestParameters(): void
{
$indexName = mb_strtolower(__FUNCTION__);
$dto = new TestDTO();
$dto->bar = 'I like unicorns.';
$dto->foo = 'Why is the sky blue?';

$client = $this->getClient();
$client->setConfigValue(Client::CONFIG_BULK_SIZE, 3);
$indexer = $client->getIndexer();
$indexer->setBulkRequestParams([
'refresh' => 'wait_for',
]);

$indexer->scheduleIndex($indexName, new Document('1', $dto));
$response = $indexer->flush();

$this->assertInstanceOf(ResponseSet::class, $response);
$transferInfo = $response->getTransferInfo();
$this->assertStringContainsString('_bulk?refresh=wait_for', $transferInfo['url']);

// Test the same with an invalid pipeline
$indexer->setBulkRequestParams([
'pipeline' => 'covfefe',
]);

$indexer->scheduleIndex($indexName, new Document('1', $dto));

$this->expectException(ResponseException::class);
$this->expectExceptionMessageMatches('/pipeline with id \[covfefe\] does not exist/');

$indexer->flush();
}

public function testIndexJsonString(): void
{
$indexName = mb_strtolower(__FUNCTION__);
Expand Down

0 comments on commit c1c6278

Please sign in to comment.