From ee2d99ca9b40f862fdca35390eda0001c30c1c53 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Mon, 30 Sep 2024 11:47:10 -0400 Subject: [PATCH] Document new experimental ingestion streaming APIs (#584) * Document new experimental ingestion streaming APIs Signed-off-by: Andriy Redko * Address review comments Signed-off-by: Andriy Redko * Address review comments and add tests Signed-off-by: Andriy Redko * Fix version constraints for tests Signed-off-by: Andriy Redko * Address code review comments Signed-off-by: Andriy Redko * Add transport-reactor-netty4 plugin to 2.17.0+ test containers Signed-off-by: Andriy Redko * Enable streaming tests for 2.17.0 only Signed-off-by: Andriy Redko * Exclude Dockefile from the list of scenario files Signed-off-by: Andriy Redko * Move tests from tests/default to tests/plugins for streaming Signed-off-by: Andriy Redko --------- Signed-off-by: Andriy Redko --- .github/workflows/test-spec.yml | 1 + .github/workflows/test-tools-integ.yml | 2 +- CHANGELOG.md | 1 + spec/namespaces/_core.yaml | 244 ++++++++++++++++++ spec/schemas/_common.yaml | 3 + tests/plugins/streaming/Dockerfile | 6 + .../plugins/streaming/_core/bulk/stream.yaml | 54 ++++ tests/plugins/streaming/docker-compose.yml | 18 ++ .../streaming/indices/bulk/stream.yaml | 60 +++++ tools/src/tester/TestRunner.ts | 2 +- .../tester/fixtures/evals/passed/passed.yaml | 2 +- .../tester/fixtures/evals/skipped/semver.yaml | 2 +- tools/tests/tester/helpers.ts | 2 +- 13 files changed, 392 insertions(+), 5 deletions(-) create mode 100644 tests/plugins/streaming/Dockerfile create mode 100644 tests/plugins/streaming/_core/bulk/stream.yaml create mode 100644 tests/plugins/streaming/docker-compose.yml create mode 100644 tests/plugins/streaming/indices/bulk/stream.yaml diff --git a/.github/workflows/test-spec.yml b/.github/workflows/test-spec.yml index a460dc0c1..06a098265 100644 --- a/.github/workflows/test-spec.yml +++ b/.github/workflows/test-spec.yml @@ -37,6 +37,7 @@ jobs: - version: 2.16.0 tests: snapshot - version: 2.17.0 + tests: plugins/streaming - version: 2.18.0 hub: opensearchstaging ref: '@sha256:4445e195c53992038891519dc3be0d273cdaad1b047943d68921168ed243e7e9' diff --git a/.github/workflows/test-tools-integ.yml b/.github/workflows/test-tools-integ.yml index 5bb9ac5d8..d4384f4af 100644 --- a/.github/workflows/test-tools-integ.yml +++ b/.github/workflows/test-tools-integ.yml @@ -24,7 +24,7 @@ jobs: test: runs-on: ubuntu-latest env: - OPENSEARCH_VERSION: 2.16.0 + OPENSEARCH_VERSION: 2.17.0 OPENSEARCH_PASSWORD: myStrongPassword123! OPENSEARCH_URL: https://localhost:9200 steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index 1081c7af2..34f8fa1cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -95,6 +95,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Added `/_plugins/_flow_framework/`, `_search`, `state/_search`, `_provision`, `_deprovision`, `_steps`, and `_status` ([#508](https://github.com/opensearch-project/opensearch-api-specification/issues/508)) ([#833](https://github.com/opensearch-project/flow-framework/issues/833)) - Added `/_plugins/_ism/policies`, `add`, `remove`, `change_policy`, `explain`, and `retry` ([#568](https://github.com/opensearch-project/opensearch-api-specification/pull/568)) ([#578](https://github.com/opensearch-project/opensearch-api-specification/pull/578)) - Added `/_plugins/refresh_search_analyzers` ([#578](https://github.com/opensearch-project/opensearch-api-specification/pull/578)) +- Added `/_bulk/stream` ([#584](https://github.com/opensearch-project/opensearch-api-specification/pull/584)) - Added `/_plugins/_ml/agents/_register`, `/_plugins/_ml/connectors/_create`, `DELETE /_plugins/_ml/agents/{agent_id}`, `DELETE /_plugins/_ml/connectors/{connector_id}` ([#228](https://github.com/opensearch-project/opensearch-api-specification/issues/228)) - Added the `context` query param to the `put_script` APIs ([#586](https://github.com/opensearch-project/opensearch-api-specification/pull/586)) diff --git a/spec/namespaces/_core.yaml b/spec/namespaces/_core.yaml index 0ef3a16a6..a9e7b8b0d 100644 --- a/spec/namespaces/_core.yaml +++ b/spec/namespaces/_core.yaml @@ -76,6 +76,57 @@ paths: responses: '200': $ref: '#/components/responses/bulk@200' + /_bulk/stream: + post: + operationId: bulk_stream.0 + x-operation-group: bulk_stream + x-version-added: '2.17' + description: Allows to perform multiple index/update/delete operations using request response streaming. + externalDocs: + url: https://opensearch.org/docs/latest/api-reference/document-apis/bulk-streaming/ + parameters: + - $ref: '#/components/parameters/bulk_stream::query._source' + - $ref: '#/components/parameters/bulk_stream::query._source_excludes' + - $ref: '#/components/parameters/bulk_stream::query._source_includes' + - $ref: '#/components/parameters/bulk_stream::query.batch_interval' + - $ref: '#/components/parameters/bulk_stream::query.batch_size' + - $ref: '#/components/parameters/bulk_stream::query.pipeline' + - $ref: '#/components/parameters/bulk_stream::query.refresh' + - $ref: '#/components/parameters/bulk_stream::query.require_alias' + - $ref: '#/components/parameters/bulk_stream::query.routing' + - $ref: '#/components/parameters/bulk_stream::query.timeout' + - $ref: '#/components/parameters/bulk_stream::query.type' + - $ref: '#/components/parameters/bulk_stream::query.wait_for_active_shards' + requestBody: + $ref: '#/components/requestBodies/bulk_stream' + responses: + '200': + $ref: '#/components/responses/bulk_stream@200' + put: + operationId: bulk_stream.1 + x-operation-group: bulk_stream + x-version-added: '2.17' + description: Allows to perform multiple index/update/delete operations using request response streaming. + externalDocs: + url: https://opensearch.org/docs/latest/api-reference/document-apis/bulk-streaming/ + parameters: + - $ref: '#/components/parameters/bulk_stream::query._source' + - $ref: '#/components/parameters/bulk_stream::query._source_excludes' + - $ref: '#/components/parameters/bulk_stream::query._source_includes' + - $ref: '#/components/parameters/bulk_stream::query.batch_interval' + - $ref: '#/components/parameters/bulk_stream::query.batch_size' + - $ref: '#/components/parameters/bulk_stream::query.pipeline' + - $ref: '#/components/parameters/bulk_stream::query.refresh' + - $ref: '#/components/parameters/bulk_stream::query.require_alias' + - $ref: '#/components/parameters/bulk_stream::query.routing' + - $ref: '#/components/parameters/bulk_stream::query.timeout' + - $ref: '#/components/parameters/bulk_stream::query.type' + - $ref: '#/components/parameters/bulk_stream::query.wait_for_active_shards' + requestBody: + $ref: '#/components/requestBodies/bulk_stream' + responses: + '200': + $ref: '#/components/responses/bulk_stream@200' /_count: get: operationId: count.0 @@ -1056,6 +1107,59 @@ paths: responses: '200': $ref: '#/components/responses/bulk@200' + /{index}/_bulk/stream: + post: + operationId: bulk_stream.2 + x-operation-group: bulk_stream + x-version-added: '2.17' + description: Allows to perform multiple index/update/delete operations using request response streaming. + externalDocs: + url: https://opensearch.org/docs/latest/api-reference/document-apis/bulk-streaming/ + parameters: + - $ref: '#/components/parameters/bulk_stream::path.index' + - $ref: '#/components/parameters/bulk_stream::query._source' + - $ref: '#/components/parameters/bulk_stream::query._source_excludes' + - $ref: '#/components/parameters/bulk_stream::query._source_includes' + - $ref: '#/components/parameters/bulk_stream::query.batch_interval' + - $ref: '#/components/parameters/bulk_stream::query.batch_size' + - $ref: '#/components/parameters/bulk_stream::query.pipeline' + - $ref: '#/components/parameters/bulk_stream::query.refresh' + - $ref: '#/components/parameters/bulk_stream::query.require_alias' + - $ref: '#/components/parameters/bulk_stream::query.routing' + - $ref: '#/components/parameters/bulk_stream::query.timeout' + - $ref: '#/components/parameters/bulk_stream::query.type' + - $ref: '#/components/parameters/bulk_stream::query.wait_for_active_shards' + requestBody: + $ref: '#/components/requestBodies/bulk_stream' + responses: + '200': + $ref: '#/components/responses/bulk_stream@200' + put: + operationId: bulk_stream.3 + x-operation-group: bulk_stream + x-version-added: '2.17' + description: Allows to perform multiple index/update/delete operations using request response streaming. + externalDocs: + url: https://opensearch.org/docs/latest/api-reference/document-apis/bulk-streaming/ + parameters: + - $ref: '#/components/parameters/bulk_stream::path.index' + - $ref: '#/components/parameters/bulk_stream::query._source' + - $ref: '#/components/parameters/bulk_stream::query._source_excludes' + - $ref: '#/components/parameters/bulk_stream::query._source_includes' + - $ref: '#/components/parameters/bulk_stream::query.batch_interval' + - $ref: '#/components/parameters/bulk_stream::query.batch_size' + - $ref: '#/components/parameters/bulk_stream::query.pipeline' + - $ref: '#/components/parameters/bulk_stream::query.refresh' + - $ref: '#/components/parameters/bulk_stream::query.require_alias' + - $ref: '#/components/parameters/bulk_stream::query.routing' + - $ref: '#/components/parameters/bulk_stream::query.timeout' + - $ref: '#/components/parameters/bulk_stream::query.type' + - $ref: '#/components/parameters/bulk_stream::query.wait_for_active_shards' + requestBody: + $ref: '#/components/requestBodies/bulk_stream' + responses: + '200': + $ref: '#/components/responses/bulk_stream@200' /{index}/_count: get: operationId: count.2 @@ -2156,6 +2260,18 @@ components: - type: object description: The operation definition and data (action-data pairs), separated by newlines required: true + bulk_stream: + content: + application/x-ndjson: + schema: + type: array + items: + anyOf: + - $ref: '../schemas/_core.bulk.yaml#/components/schemas/OperationContainer' + - $ref: '../schemas/_core.bulk.yaml#/components/schemas/UpdateAction' + - type: object + description: The operation definition and data (action-data pairs), separated by newlines + required: true clear_scroll: content: application/json: @@ -2651,6 +2767,30 @@ components: - errors - items - took + bulk_stream@200: + content: + application/json: + schema: + type: object + properties: + errors: + type: boolean + items: + type: array + items: + type: object + additionalProperties: + $ref: '../schemas/_core.bulk.yaml#/components/schemas/ResponseItem' + minProperties: 1 + maxProperties: 1 + took: + type: number + ingest_took: + type: number + required: + - errors + - items + - took clear_scroll@200: content: application/json: @@ -3221,6 +3361,110 @@ components: $ref: '../schemas/_common.yaml#/components/schemas/WaitForActiveShards' style: form x-default: '1' + bulk_stream::path.index: + in: path + name: index + description: Name of the data stream, index, or index alias to perform bulk actions on. + required: true + schema: + $ref: '../schemas/_common.yaml#/components/schemas/IndexName' + style: simple + bulk_stream::query._source: + in: query + name: _source + description: '`true` or `false` to return the `_source` field or not, or a list of fields to return.' + schema: + $ref: '../schemas/_core.search.yaml#/components/schemas/SourceConfigParam' + style: form + bulk_stream::query._source_excludes: + in: query + name: _source_excludes + description: A comma-separated list of source fields to exclude from the response. + schema: + $ref: '../schemas/_common.yaml#/components/schemas/Fields' + style: form + bulk_stream::query._source_includes: + in: query + name: _source_includes + description: A comma-separated list of source fields to include in the response. + schema: + $ref: '../schemas/_common.yaml#/components/schemas/Fields' + style: form + bulk_stream::query.pipeline: + in: query + name: pipeline + description: |- + ID of the pipeline to use to preprocess incoming documents. + If the index has a default ingest pipeline specified, then setting the value to `_none` disables the default ingest pipeline for this request. + If a final pipeline is configured it will always run, regardless of the value of this parameter. + schema: + type: string + style: form + bulk_stream::query.refresh: + in: query + name: refresh + description: |- + If `true`, OpenSearch refreshes the affected shards to make this operation visible to search, if `wait_for` then wait for a refresh to make this operation visible to search, if `false` do nothing with refreshes. + Valid values: `true`, `false`, `wait_for`. + schema: + $ref: '../schemas/_common.yaml#/components/schemas/Refresh' + style: form + bulk_stream::query.require_alias: + in: query + name: require_alias + description: If `true`, the request's actions must target an index alias. + schema: + type: boolean + default: false + style: form + bulk_stream::query.routing: + in: query + name: routing + description: Custom value used to route operations to a specific shard. + schema: + $ref: '../schemas/_common.yaml#/components/schemas/Routing' + style: form + bulk_stream::query.timeout: + in: query + name: timeout + description: 'Period each action waits for the following operations: automatic index creation, dynamic mapping updates, waiting for active shards.' + schema: + $ref: '../schemas/_common.yaml#/components/schemas/Duration' + style: form + bulk_stream::query.type: + name: type + in: query + description: Default document type for items which don't provide one. + schema: + type: string + description: Default document type for items which don't provide one. + bulk_stream::query.wait_for_active_shards: + in: query + name: wait_for_active_shards + description: |- + The number of shard copies that must be active before proceeding with the operation. + Set to all or any positive integer up to the total number of shards in the index (`number_of_replicas+1`). + schema: + $ref: '../schemas/_common.yaml#/components/schemas/WaitForActiveShards' + style: form + x-default: '1' + bulk_stream::query.batch_size: + in: query + name: batch_size + description: |- + Specifies how many bulk operations should be accumulated into a batch before sending the batch to data nodes. + schema: + $ref: '../schemas/_common.yaml#/components/schemas/BatchSize' + style: form + x-default: 1 + bulk_stream::query.batch_interval: + in: query + name: batch_interval + description: |- + Specifies for how long bulk operations should be accumulated into a batch before sending the batch to data nodes. + schema: + $ref: '../schemas/_common.yaml#/components/schemas/Duration' + style: form clear_scroll::path.scroll_id: in: path name: scroll_id diff --git a/spec/schemas/_common.yaml b/spec/schemas/_common.yaml index 889c8b014..c3e9468d3 100644 --- a/spec/schemas/_common.yaml +++ b/spec/schemas/_common.yaml @@ -2238,3 +2238,6 @@ components: required: - reason - status + BatchSize: + type: integer + format: int64 diff --git a/tests/plugins/streaming/Dockerfile b/tests/plugins/streaming/Dockerfile new file mode 100644 index 000000000..ba94fcedd --- /dev/null +++ b/tests/plugins/streaming/Dockerfile @@ -0,0 +1,6 @@ +ARG OPENSEARCH_DOCKER_HUB_PROJECT +ARG OPENSEARCH_VERSION +ARG OPENSEARCH_DOCKER_REF + +FROM ${OPENSEARCH_DOCKER_HUB_PROJECT}/opensearch:${OPENSEARCH_VERSION}${OPENSEARCH_DOCKER_REF} +RUN ./bin/opensearch-plugin install -b transport-reactor-netty4 diff --git a/tests/plugins/streaming/_core/bulk/stream.yaml b/tests/plugins/streaming/_core/bulk/stream.yaml new file mode 100644 index 000000000..7560ea715 --- /dev/null +++ b/tests/plugins/streaming/_core/bulk/stream.yaml @@ -0,0 +1,54 @@ +$schema: ../../../../../json_schemas/test_story.schema.yaml + +description: Test bulk streaming endpoint. +epilogues: + - path: /books,movies + method: DELETE + status: [200, 404] +chapters: + - synopsis: Create an index. + version: '>= 2.17' + path: /_bulk/stream + method: POST + request: + content_type: application/x-ndjson + payload: + - {create: {_index: movies}} + - {director: Bennett Miller, title: Moneyball, year: 2011} + - synopsis: Delete document in an index. + version: '2.17' + path: /_bulk/stream + method: PUT + request: + content_type: application/x-ndjson + payload: + - {delete: {_index: movies, _id: invalid}} + response: + status: 200 + payload: + errors: false + items: + - delete: + _index: movies + _id: invalid + result: not_found + status: 404 + - synopsis: Bulk document CRUD. + version: '>= 2.17' + path: /_bulk/stream + method: POST + request: + content_type: application/x-ndjson + payload: + - {create: {_index: books, _id: book_1392214}} + - {author: Harper Lee, title: To Kill a Mockingbird, year: 1960} + - {update: {_index: books, _id: book_1392214}} + - {doc: {pages: 376}} + - {update: {_index: books, _id: book_1392214}} + - {doc: {pages: 376}, _source: true} + - {update: {_index: books, _id: book_1392214}} + - {script: {source: ctx._source.pages = 376;}} + - {update: {_index: books, _id: does_not_exist}} + - {script: {source: 'ctx.op = "none";'}, scripted_upsert: true, upsert: {pages: 375}} + - {delete: {_index: books, _id: book_1392214}} + diff --git a/tests/plugins/streaming/docker-compose.yml b/tests/plugins/streaming/docker-compose.yml new file mode 100644 index 000000000..aaf1d9f5b --- /dev/null +++ b/tests/plugins/streaming/docker-compose.yml @@ -0,0 +1,18 @@ +version: '3' + +services: + opensearch-cluster: + build: + context: . + args: + - OPENSEARCH_DOCKER_HUB_PROJECT=${OPENSEARCH_DOCKER_HUB_PROJECT:-opensearchproject} + - OPENSEARCH_DOCKER_REF=${OPENSEARCH_DOCKER_REF} + - OPENSEARCH_VERSION=${OPENSEARCH_VERSION:-latest} + ports: + - 9200:9200 + - 9600:9600 + environment: + - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_PASSWORD:-myStrongPassword123!} + - OPENSEARCH_JAVA_OPTS=${OPENSEARCH_JAVA_OPTS} + - discovery.type=single-node + - http.type=reactor-netty4-secure diff --git a/tests/plugins/streaming/indices/bulk/stream.yaml b/tests/plugins/streaming/indices/bulk/stream.yaml new file mode 100644 index 000000000..546d77b63 --- /dev/null +++ b/tests/plugins/streaming/indices/bulk/stream.yaml @@ -0,0 +1,60 @@ +$schema: ../../../../../json_schemas/test_story.schema.yaml + +description: Test bulk streaming index endpoint. +epilogues: + - path: /books,movies + method: DELETE + status: [200, 404] +chapters: + - synopsis: Create an index. + version: '2.17' + path: /{index}/_bulk/stream + method: POST + parameters: + index: movies + request: + content_type: application/x-ndjson + payload: + - {create: {}} + - {director: Bennett Miller, title: Moneyball, year: 2011} + - synopsis: Delete document in an index. + version: '>= 2.17' + path: /{index}/_bulk/stream + method: PUT + parameters: + index: movies + request: + content_type: application/x-ndjson + payload: + - {delete: {_id: invalid}} + response: + status: 200 + payload: + errors: false + items: + - delete: + _index: movies + _id: invalid + result: not_found + status: 404 + - synopsis: Bulk index document CRUD. + version: '>= 2.17' + method: POST + path: /{index}/_bulk/stream + parameters: + index: books + request: + content_type: application/x-ndjson + payload: + - {create: {_id: book_1392214}} + - {author: Harper Lee, title: To Kill a Mockingbird, year: 1960} + - {update: {_id: book_1392214}} + - {doc: {pages: 376}} + - {update: {_id: book_1392214}} + - {doc: {pages: 376}, _source: true} + - {update: {_id: book_1392214}} + - {script: {source: ctx._source.pages = 376;}} + - {update: {_id: does_not_exist}} + - {script: {source: 'ctx.op = "none";'}, scripted_upsert: true, upsert: {pages: 375}} + - {delete: {_id: book_1392214}} + diff --git a/tools/src/tester/TestRunner.ts b/tools/src/tester/TestRunner.ts index dc880f946..7b36af3b9 100644 --- a/tools/src/tester/TestRunner.ts +++ b/tools/src/tester/TestRunner.ts @@ -70,7 +70,7 @@ export default class TestRunner { #collect_story_files (folder: string, file: string, prefix: string): StoryFile[] { const path = file === '' ? folder : `${folder}/${file}` const next_prefix = prefix === '' ? file : `${prefix}/${file}` - if (file.startsWith('.') || file == 'docker-compose.yml') { + if (file.startsWith('.') || file == 'docker-compose.yml' || file == 'Dockerfile') { return [] } else if (fs.statSync(path).isFile()) { const story: Story = read_yaml(path) diff --git a/tools/tests/tester/fixtures/evals/passed/passed.yaml b/tools/tests/tester/fixtures/evals/passed/passed.yaml index da08a29fa..a87c944c9 100644 --- a/tools/tests/tester/fixtures/evals/passed/passed.yaml +++ b/tools/tests/tester/fixtures/evals/passed/passed.yaml @@ -202,7 +202,7 @@ chapters: - title: This GET /_cat/health should be skipped (> 2.999.0). overall: result: SKIPPED - message: Skipped because version 2.16.0 does not satisfy >= 2.999.0. + message: Skipped because version 2.17.0 does not satisfy >= 2.999.0. - title: This GET /_cat/health should run (>= 1.3, < 99.0). overall: result: PASSED diff --git a/tools/tests/tester/fixtures/evals/skipped/semver.yaml b/tools/tests/tester/fixtures/evals/skipped/semver.yaml index 30709742d..799717afe 100644 --- a/tools/tests/tester/fixtures/evals/skipped/semver.yaml +++ b/tools/tests/tester/fixtures/evals/skipped/semver.yaml @@ -3,5 +3,5 @@ full_path: tools/tests/tester/fixtures/stories/skipped/semver.yaml result: SKIPPED description: This story should be skipped because of version. -message: Skipped because version 2.16.0 does not satisfy >= 2.999.0. +message: Skipped because version 2.17.0 does not satisfy >= 2.999.0. diff --git a/tools/tests/tester/helpers.ts b/tools/tests/tester/helpers.ts index 1a38c4c6b..354fcabe4 100644 --- a/tools/tests/tester/helpers.ts +++ b/tools/tests/tester/helpers.ts @@ -142,5 +142,5 @@ export async function load_actual_evaluation (evaluator: StoryEvaluator, name: s full_path, display_path: `${name}.yaml`, story: read_yaml(full_path) - }, process.env.OPENSEARCH_VERSION ?? '2.16.0', process.env.OPENSEARCH_DISTRIBUTION ?? 'opensearch.org')) + }, process.env.OPENSEARCH_VERSION ?? '2.17.0', process.env.OPENSEARCH_DISTRIBUTION ?? 'opensearch.org')) }