Document new experimental ingestion streaming APIs (#584)
* Document new experimental ingestion streaming APIs

Signed-off-by: Andriy Redko <[email protected]>

* Address review comments

Signed-off-by: Andriy Redko <[email protected]>

* Address review comments and add tests

Signed-off-by: Andriy Redko <[email protected]>

* Fix version constraints for tests

Signed-off-by: Andriy Redko <[email protected]>

* Address code review comments

Signed-off-by: Andriy Redko <[email protected]>

* Add transport-reactor-netty4 plugin to 2.17.0+ test containers

Signed-off-by: Andriy Redko <[email protected]>

* Enable streaming tests for 2.17.0 only

Signed-off-by: Andriy Redko <[email protected]>

* Exclude Dockerfile from the list of scenario files

Signed-off-by: Andriy Redko <[email protected]>

* Move tests from tests/default to tests/plugins for streaming

Signed-off-by: Andriy Redko <[email protected]>

---------

Signed-off-by: Andriy Redko <[email protected]>
reta authored Sep 30, 2024
1 parent ee98419 commit ee2d99c
Showing 13 changed files with 392 additions and 5 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-spec.yml
@@ -37,6 +37,7 @@ jobs:
- version: 2.16.0
tests: snapshot
- version: 2.17.0
tests: plugins/streaming
- version: 2.18.0
hub: opensearchstaging
ref: '@sha256:4445e195c53992038891519dc3be0d273cdaad1b047943d68921168ed243e7e9'
2 changes: 1 addition & 1 deletion .github/workflows/test-tools-integ.yml
@@ -24,7 +24,7 @@ jobs:
test:
runs-on: ubuntu-latest
env:
-OPENSEARCH_VERSION: 2.16.0
+OPENSEARCH_VERSION: 2.17.0
OPENSEARCH_PASSWORD: myStrongPassword123!
OPENSEARCH_URL: https://localhost:9200
steps:
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -95,6 +95,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added `/_plugins/_flow_framework/`, `_search`, `state/_search`, `_provision`, `_deprovision`, `_steps`, and `_status` ([#508](https://github.com/opensearch-project/opensearch-api-specification/issues/508)) ([#833](https://github.com/opensearch-project/flow-framework/issues/833))
- Added `/_plugins/_ism/policies`, `add`, `remove`, `change_policy`, `explain`, and `retry` ([#568](https://github.com/opensearch-project/opensearch-api-specification/pull/568)) ([#578](https://github.com/opensearch-project/opensearch-api-specification/pull/578))
- Added `/_plugins/refresh_search_analyzers` ([#578](https://github.com/opensearch-project/opensearch-api-specification/pull/578))
- Added `/_bulk/stream` ([#584](https://github.com/opensearch-project/opensearch-api-specification/pull/584))
- Added `/_plugins/_ml/agents/_register`, `/_plugins/_ml/connectors/_create`, `DELETE /_plugins/_ml/agents/{agent_id}`, `DELETE /_plugins/_ml/connectors/{connector_id}` ([#228](https://github.com/opensearch-project/opensearch-api-specification/issues/228))
- Added the `context` query param to the `put_script` APIs ([#586](https://github.com/opensearch-project/opensearch-api-specification/pull/586))

244 changes: 244 additions & 0 deletions spec/namespaces/_core.yaml
@@ -76,6 +76,57 @@ paths:
responses:
'200':
$ref: '#/components/responses/bulk@200'
/_bulk/stream:
post:
operationId: bulk_stream.0
x-operation-group: bulk_stream
x-version-added: '2.17'
description: Allows to perform multiple index/update/delete operations using request response streaming.
externalDocs:
url: https://opensearch.org/docs/latest/api-reference/document-apis/bulk-streaming/
parameters:
- $ref: '#/components/parameters/bulk_stream::query._source'
- $ref: '#/components/parameters/bulk_stream::query._source_excludes'
- $ref: '#/components/parameters/bulk_stream::query._source_includes'
- $ref: '#/components/parameters/bulk_stream::query.batch_interval'
- $ref: '#/components/parameters/bulk_stream::query.batch_size'
- $ref: '#/components/parameters/bulk_stream::query.pipeline'
- $ref: '#/components/parameters/bulk_stream::query.refresh'
- $ref: '#/components/parameters/bulk_stream::query.require_alias'
- $ref: '#/components/parameters/bulk_stream::query.routing'
- $ref: '#/components/parameters/bulk_stream::query.timeout'
- $ref: '#/components/parameters/bulk_stream::query.type'
- $ref: '#/components/parameters/bulk_stream::query.wait_for_active_shards'
requestBody:
$ref: '#/components/requestBodies/bulk_stream'
responses:
'200':
$ref: '#/components/responses/bulk_stream@200'
put:
operationId: bulk_stream.1
x-operation-group: bulk_stream
x-version-added: '2.17'
description: Allows to perform multiple index/update/delete operations using request response streaming.
externalDocs:
url: https://opensearch.org/docs/latest/api-reference/document-apis/bulk-streaming/
parameters:
- $ref: '#/components/parameters/bulk_stream::query._source'
- $ref: '#/components/parameters/bulk_stream::query._source_excludes'
- $ref: '#/components/parameters/bulk_stream::query._source_includes'
- $ref: '#/components/parameters/bulk_stream::query.batch_interval'
- $ref: '#/components/parameters/bulk_stream::query.batch_size'
- $ref: '#/components/parameters/bulk_stream::query.pipeline'
- $ref: '#/components/parameters/bulk_stream::query.refresh'
- $ref: '#/components/parameters/bulk_stream::query.require_alias'
- $ref: '#/components/parameters/bulk_stream::query.routing'
- $ref: '#/components/parameters/bulk_stream::query.timeout'
- $ref: '#/components/parameters/bulk_stream::query.type'
- $ref: '#/components/parameters/bulk_stream::query.wait_for_active_shards'
requestBody:
$ref: '#/components/requestBodies/bulk_stream'
responses:
'200':
$ref: '#/components/responses/bulk_stream@200'
/_count:
get:
operationId: count.0
@@ -1056,6 +1107,59 @@ paths:
responses:
'200':
$ref: '#/components/responses/bulk@200'
/{index}/_bulk/stream:
post:
operationId: bulk_stream.2
x-operation-group: bulk_stream
x-version-added: '2.17'
description: Allows to perform multiple index/update/delete operations using request response streaming.
externalDocs:
url: https://opensearch.org/docs/latest/api-reference/document-apis/bulk-streaming/
parameters:
- $ref: '#/components/parameters/bulk_stream::path.index'
- $ref: '#/components/parameters/bulk_stream::query._source'
- $ref: '#/components/parameters/bulk_stream::query._source_excludes'
- $ref: '#/components/parameters/bulk_stream::query._source_includes'
- $ref: '#/components/parameters/bulk_stream::query.batch_interval'
- $ref: '#/components/parameters/bulk_stream::query.batch_size'
- $ref: '#/components/parameters/bulk_stream::query.pipeline'
- $ref: '#/components/parameters/bulk_stream::query.refresh'
- $ref: '#/components/parameters/bulk_stream::query.require_alias'
- $ref: '#/components/parameters/bulk_stream::query.routing'
- $ref: '#/components/parameters/bulk_stream::query.timeout'
- $ref: '#/components/parameters/bulk_stream::query.type'
- $ref: '#/components/parameters/bulk_stream::query.wait_for_active_shards'
requestBody:
$ref: '#/components/requestBodies/bulk_stream'
responses:
'200':
$ref: '#/components/responses/bulk_stream@200'
put:
operationId: bulk_stream.3
x-operation-group: bulk_stream
x-version-added: '2.17'
description: Allows to perform multiple index/update/delete operations using request response streaming.
externalDocs:
url: https://opensearch.org/docs/latest/api-reference/document-apis/bulk-streaming/
parameters:
- $ref: '#/components/parameters/bulk_stream::path.index'
- $ref: '#/components/parameters/bulk_stream::query._source'
- $ref: '#/components/parameters/bulk_stream::query._source_excludes'
- $ref: '#/components/parameters/bulk_stream::query._source_includes'
- $ref: '#/components/parameters/bulk_stream::query.batch_interval'
- $ref: '#/components/parameters/bulk_stream::query.batch_size'
- $ref: '#/components/parameters/bulk_stream::query.pipeline'
- $ref: '#/components/parameters/bulk_stream::query.refresh'
- $ref: '#/components/parameters/bulk_stream::query.require_alias'
- $ref: '#/components/parameters/bulk_stream::query.routing'
- $ref: '#/components/parameters/bulk_stream::query.timeout'
- $ref: '#/components/parameters/bulk_stream::query.type'
- $ref: '#/components/parameters/bulk_stream::query.wait_for_active_shards'
requestBody:
$ref: '#/components/requestBodies/bulk_stream'
responses:
'200':
$ref: '#/components/responses/bulk_stream@200'
/{index}/_count:
get:
operationId: count.2
@@ -2156,6 +2260,18 @@ components:
- type: object
description: The operation definition and data (action-data pairs), separated by newlines
required: true
bulk_stream:
content:
application/x-ndjson:
schema:
type: array
items:
anyOf:
- $ref: '../schemas/_core.bulk.yaml#/components/schemas/OperationContainer'
- $ref: '../schemas/_core.bulk.yaml#/components/schemas/UpdateAction'
- type: object
description: The operation definition and data (action-data pairs), separated by newlines
required: true
clear_scroll:
content:
application/json:
@@ -2651,6 +2767,30 @@ components:
- errors
- items
- took
bulk_stream@200:
content:
application/json:
schema:
type: object
properties:
errors:
type: boolean
items:
type: array
items:
type: object
additionalProperties:
$ref: '../schemas/_core.bulk.yaml#/components/schemas/ResponseItem'
minProperties: 1
maxProperties: 1
took:
type: number
ingest_took:
type: number
required:
- errors
- items
- took
clear_scroll@200:
content:
application/json:
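
Note on the `bulk_stream@200` schema above: `errors`, `items`, and `took` are required, and each entry in `items` carries exactly one action key (`minProperties: 1`, `maxProperties: 1`). A minimal Python sketch of a client-side check follows. It assumes each response has already been decoded into a dict; the function name `summarize_items` and the `took` value in the sample are hypothetical and not part of this commit.

```python
REQUIRED_KEYS = ("errors", "items", "took")


def summarize_items(response):
    """Return one (action, _id, status) tuple per item in a bulk response."""
    missing = [key for key in REQUIRED_KEYS if key not in response]
    if missing:
        raise ValueError(f"response is missing required keys: {missing}")
    summary = []
    for item in response["items"]:
        # The schema allows exactly one action name per item.
        if len(item) != 1:
            raise ValueError(f"expected one action per item, got {len(item)}")
        action, result = next(iter(item.items()))
        summary.append((action, result.get("_id"), result.get("status")))
    return summary


# Shape modelled on the delete chapter of the streaming test story.
sample = {
    "took": 3,  # illustrative value
    "errors": False,
    "items": [
        {"delete": {"_index": "movies", "_id": "invalid",
                    "result": "not_found", "status": 404}}
    ],
}
print(summarize_items(sample))
```

The sample mirrors the test story in this commit, where a delete of a missing document reports `status: 404` on the item while the top-level `errors` flag stays `false`, so the two should be inspected separately.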
@@ -3221,6 +3361,110 @@ components:
$ref: '../schemas/_common.yaml#/components/schemas/WaitForActiveShards'
style: form
x-default: '1'
bulk_stream::path.index:
in: path
name: index
description: Name of the data stream, index, or index alias to perform bulk actions on.
required: true
schema:
$ref: '../schemas/_common.yaml#/components/schemas/IndexName'
style: simple
bulk_stream::query._source:
in: query
name: _source
description: '`true` or `false` to return the `_source` field or not, or a list of fields to return.'
schema:
$ref: '../schemas/_core.search.yaml#/components/schemas/SourceConfigParam'
style: form
bulk_stream::query._source_excludes:
in: query
name: _source_excludes
description: A comma-separated list of source fields to exclude from the response.
schema:
$ref: '../schemas/_common.yaml#/components/schemas/Fields'
style: form
bulk_stream::query._source_includes:
in: query
name: _source_includes
description: A comma-separated list of source fields to include in the response.
schema:
$ref: '../schemas/_common.yaml#/components/schemas/Fields'
style: form
bulk_stream::query.pipeline:
in: query
name: pipeline
description: |-
ID of the pipeline to use to preprocess incoming documents.
If the index has a default ingest pipeline specified, then setting the value to `_none` disables the default ingest pipeline for this request.
If a final pipeline is configured it will always run, regardless of the value of this parameter.
schema:
type: string
style: form
bulk_stream::query.refresh:
in: query
name: refresh
description: |-
If `true`, OpenSearch refreshes the affected shards to make this operation visible to search, if `wait_for` then wait for a refresh to make this operation visible to search, if `false` do nothing with refreshes.
Valid values: `true`, `false`, `wait_for`.
schema:
$ref: '../schemas/_common.yaml#/components/schemas/Refresh'
style: form
bulk_stream::query.require_alias:
in: query
name: require_alias
description: If `true`, the request's actions must target an index alias.
schema:
type: boolean
default: false
style: form
bulk_stream::query.routing:
in: query
name: routing
description: Custom value used to route operations to a specific shard.
schema:
$ref: '../schemas/_common.yaml#/components/schemas/Routing'
style: form
bulk_stream::query.timeout:
in: query
name: timeout
description: 'Period each action waits for the following operations: automatic index creation, dynamic mapping updates, waiting for active shards.'
schema:
$ref: '../schemas/_common.yaml#/components/schemas/Duration'
style: form
bulk_stream::query.type:
name: type
in: query
description: Default document type for items which don't provide one.
schema:
type: string
description: Default document type for items which don't provide one.
bulk_stream::query.wait_for_active_shards:
in: query
name: wait_for_active_shards
description: |-
The number of shard copies that must be active before proceeding with the operation.
Set to all or any positive integer up to the total number of shards in the index (`number_of_replicas+1`).
schema:
$ref: '../schemas/_common.yaml#/components/schemas/WaitForActiveShards'
style: form
x-default: '1'
bulk_stream::query.batch_size:
in: query
name: batch_size
description: |-
Specifies how many bulk operations should be accumulated into a batch before sending the batch to data nodes.
schema:
$ref: '../schemas/_common.yaml#/components/schemas/BatchSize'
style: form
x-default: 1
bulk_stream::query.batch_interval:
in: query
name: batch_interval
description: |-
Specifies for how long bulk operations should be accumulated into a batch before sending the batch to data nodes.
schema:
$ref: '../schemas/_common.yaml#/components/schemas/Duration'
style: form
clear_scroll::path.scroll_id:
in: path
name: scroll_id
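
Note on the query parameters above: `batch_size` and `batch_interval` are the two parameters specific to the streaming endpoints, and `/{index}/_bulk/stream` adds the `index` path parameter. The following Python sketch shows one way a client might assemble the request URL. The helper name `bulk_stream_url`, the base URL, and the values `100` and `1s` are illustrative assumptions, not values taken from this commit.

```python
from urllib.parse import quote, urlencode


def bulk_stream_url(base_url, index=None, **params):
    """Build a /_bulk/stream URL, optionally scoped to an index."""
    if index is None:
        path = "/_bulk/stream"
    else:
        path = f"/{quote(index, safe='')}/_bulk/stream"
    query = urlencode({
        # Render booleans as lowercase true/false rather than Python's True/False.
        name: str(value).lower() if isinstance(value, bool) else value
        for name, value in params.items()
        if value is not None
    })
    return base_url.rstrip("/") + path + (f"?{query}" if query else "")


print(bulk_stream_url("https://localhost:9200", index="movies",
                      batch_size=100, batch_interval="1s"))
```

Parameters left as `None` are omitted, so the server-side defaults declared in the spec (for example `x-default: 1` for `batch_size`) apply.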
3 changes: 3 additions & 0 deletions spec/schemas/_common.yaml
@@ -2238,3 +2238,6 @@ components:
required:
- reason
- status
BatchSize:
type: integer
format: int64
6 changes: 6 additions & 0 deletions tests/plugins/streaming/Dockerfile
@@ -0,0 +1,6 @@
ARG OPENSEARCH_DOCKER_HUB_PROJECT
ARG OPENSEARCH_VERSION
ARG OPENSEARCH_DOCKER_REF

FROM ${OPENSEARCH_DOCKER_HUB_PROJECT}/opensearch:${OPENSEARCH_VERSION}${OPENSEARCH_DOCKER_REF}
RUN ./bin/opensearch-plugin install -b transport-reactor-netty4
54 changes: 54 additions & 0 deletions tests/plugins/streaming/_core/bulk/stream.yaml
@@ -0,0 +1,54 @@
$schema: ../../../../../json_schemas/test_story.schema.yaml

description: Test bulk streaming endpoint.
epilogues:
- path: /books,movies
method: DELETE
status: [200, 404]
chapters:
- synopsis: Create an index.
version: '>= 2.17'
path: /_bulk/stream
method: POST
request:
content_type: application/x-ndjson
payload:
- {create: {_index: movies}}
- {director: Bennett Miller, title: Moneyball, year: 2011}
- synopsis: Delete document in an index.
version: '2.17'
path: /_bulk/stream
method: PUT
request:
content_type: application/x-ndjson
payload:
- {delete: {_index: movies, _id: invalid}}
response:
status: 200
payload:
errors: false
items:
- delete:
_index: movies
_id: invalid
result: not_found
status: 404
- synopsis: Bulk document CRUD.
version: '>= 2.17'
path: /_bulk/stream
method: POST
request:
content_type: application/x-ndjson
payload:
- {create: {_index: books, _id: book_1392214}}
- {author: Harper Lee, title: To Kill a Mockingbird, year: 1960}
- {update: {_index: books, _id: book_1392214}}
- {doc: {pages: 376}}
- {update: {_index: books, _id: book_1392214}}
- {doc: {pages: 376}, _source: true}
- {update: {_index: books, _id: book_1392214}}
- {script: {source: ctx._source.pages = 376;}}
- {update: {_index: books, _id: does_not_exist}}
- {script: {source: 'ctx.op = "none";'}, scripted_upsert: true, upsert: {pages: 375}}
- {delete: {_index: books, _id: book_1392214}}
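
Note on the `application/x-ndjson` payloads in the story above: each list entry becomes one JSON object on its own line, with action lines followed by their data lines. A minimal Python sketch of that serialization, using the first chapter's payload, is shown here. The helper name `to_ndjson` is hypothetical, and the test framework's own encoding may differ.

```python
import json


def to_ndjson(lines):
    """Serialize objects as newline-delimited JSON, one object per line."""
    # Every line, including the last, ends with a newline.
    return "".join(json.dumps(line, separators=(",", ":")) + "\n" for line in lines)


payload = [
    {"create": {"_index": "movies"}},
    {"director": "Bennett Miller", "title": "Moneyball", "year": 2011},
]
body = to_ndjson(payload)
print(body, end="")
```

Compact separators keep each object on a single physical line, which is what the newline-delimited format relies on.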

18 changes: 18 additions & 0 deletions tests/plugins/streaming/docker-compose.yml
@@ -0,0 +1,18 @@
version: '3'

services:
opensearch-cluster:
build:
context: .
args:
- OPENSEARCH_DOCKER_HUB_PROJECT=${OPENSEARCH_DOCKER_HUB_PROJECT:-opensearchproject}
- OPENSEARCH_DOCKER_REF=${OPENSEARCH_DOCKER_REF}
- OPENSEARCH_VERSION=${OPENSEARCH_VERSION:-latest}
ports:
- 9200:9200
- 9600:9600
environment:
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_PASSWORD:-myStrongPassword123!}
- OPENSEARCH_JAVA_OPTS=${OPENSEARCH_JAVA_OPTS}
- discovery.type=single-node
- http.type=reactor-netty4-secure