From c1c627880ec2e30a990a4c68f89a5d4a35128c56 Mon Sep 17 00:00:00 2001 From: Damien ALEXANDRE Date: Tue, 20 Oct 2020 20:55:50 +0200 Subject: [PATCH] Ref #31 Add bulk query string parameters support --- CHANGELOG.md | 1 + README.md | 6 ++++++ src/Indexer.php | 27 ++++++++++++++++++++++++++- tests/IndexerTest.php | 34 ++++++++++++++++++++++++++++++++++ 4 files changed, 67 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7048c1..a42483b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- New `setBulkRequestParams` on the Indexer allowing all the Bulk query params. - This changelog file. ## [1.0.2] - 2020-07-31 diff --git a/README.md b/README.md index 6909977..80cc3a6 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,12 @@ $dto->foo = 'Hops from Alsace, France'; $indexer->scheduleIndex('beers', new Document('123', $dto)); $indexer->flush(); +// Set parameters on the Bulk +$indexer->setBulkRequestParams([ + 'pipeline' => 'covfefe', + 'refresh' => 'wait_for' +]); + // Force index refresh if needed $indexer->refresh('beers'); diff --git a/src/Indexer.php b/src/Indexer.php index ddb1166..78f49f1 100644 --- a/src/Indexer.php +++ b/src/Indexer.php @@ -15,14 +15,16 @@ class Indexer private $serializer; /** @var Bulk|null */ private $currentBulk = null; + private $bulkRequestParams = []; - public function __construct(Client $client, SerializerInterface $serializer, int $bulkMaxSize = 100) + public function __construct(Client $client, SerializerInterface $serializer, int $bulkMaxSize = 100, array $bulkRequestParams = []) { // TODO: on the destruct, maybe throw an exception for non empty indexer queues? $this->client = $client; $this->bulkMaxSize = $bulkMaxSize ?? 100; $this->serializer = $serializer; + $this->bulkRequestParams = $bulkRequestParams; } public function scheduleIndex($index, Document $document) @@ -124,6 +126,7 @@ protected function getCurrentBulk(): Bulk { if (!($this->currentBulk instanceof Bulk)) { $this->currentBulk = new Bulk($this->client); + $this->refreshBulkRequestParams(); } return $this->currentBulk; @@ -137,4 +140,26 @@ public function setBulkMaxSize(int $bulkMaxSize): void $this->flush(); } } + + public function getBulkRequestParams(): array + { + return $this->bulkRequestParams; + } + + public function setBulkRequestParams(array $bulkRequestParams): void + { + $this->bulkRequestParams = $bulkRequestParams; + $this->refreshBulkRequestParams(); + } + + private function refreshBulkRequestParams() + { + if (!$this->currentBulk) { + return; + } + + foreach ($this->bulkRequestParams as $key => $value) { + $this->currentBulk->setRequestParam($key, $value); + } + } } diff --git a/tests/IndexerTest.php b/tests/IndexerTest.php index 677cc98..fc5aa45 100644 --- a/tests/IndexerTest.php +++ b/tests/IndexerTest.php @@ -140,6 +140,40 @@ public function testIndexingWithError(): void $this->assertEquals(0, $indexer->getQueueSize()); } + public function testRequestParameters(): void + { + $indexName = mb_strtolower(__FUNCTION__); + $dto = new TestDTO(); + $dto->bar = 'I like unicorns.'; + $dto->foo = 'Why is the sky blue?'; + + $client = $this->getClient(); + $client->setConfigValue(Client::CONFIG_BULK_SIZE, 3); + $indexer = $client->getIndexer(); + $indexer->setBulkRequestParams([ + 'refresh' => 'wait_for', + ]); + + $indexer->scheduleIndex($indexName, new Document('1', $dto)); + $response = $indexer->flush(); + + $this->assertInstanceOf(ResponseSet::class, $response); + $transferInfo = $response->getTransferInfo(); + $this->assertStringContainsString('_bulk?refresh=wait_for', $transferInfo['url']); + + // Test the same with an invalid pipeline + $indexer->setBulkRequestParams([ + 'pipeline' => 'covfefe', + ]); + + $indexer->scheduleIndex($indexName, new Document('1', $dto)); + + $this->expectException(ResponseException::class); + $this->expectExceptionMessageMatches('/pipeline with id \[covfefe\] does not exist/'); + + $indexer->flush(); + } + public function testIndexJsonString(): void { $indexName = mb_strtolower(__FUNCTION__);