Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional attributes for redis.search methods create_index, search #2635

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
191ef9d
Add wrappers for redis.search methods create_index, search, aggregate
mpozniak95 Jun 25, 2024
15e09d3
Skip creating span for redisearch methods in _traced_execute_command
mpozniak95 Jun 26, 2024
4cb2415
Reformatted file
mpozniak95 Jun 27, 2024
16d99e0
Add additional functional tests
mpozniak95 Jun 27, 2024
c53b08e
Merge remote-tracking branch 'origin/main' into redis_additional_attr…
mpozniak95 Aug 12, 2024
fd52a83
Merge branch 'main' into redis_additional_attributes
lzchen Aug 14, 2024
5507098
Change the span kind to client, add db.system attributes to added met…
mpozniak95 Aug 26, 2024
da7ae34
Reformat files
mpozniak95 Aug 26, 2024
5a1a1d5
Merge branch 'main' into redis_additional_attributes
lzchen Aug 26, 2024
748022a
Reformat files
mpozniak95 Aug 28, 2024
adc4263
Merge branch 'open-telemetry:main' into redis_additional_attributes
mpozniak95 Aug 28, 2024
59b7c91
Remove aggregate, and skip function, edit attributes in execute command
mpozniak95 Aug 28, 2024
7b5bd19
Merge branch 'main' into redis_additional_attributes
lzchen Aug 28, 2024
a9e5afc
Fix linter problems
mpozniak95 Aug 29, 2024
2ad9768
Merge remote-tracking branch 'origin/main' into redis_additional_attr…
mpozniak95 Aug 30, 2024
8c6c75d
Change name to _set_span_attribute_if_value, use constant for _FIELD_…
mpozniak95 Aug 30, 2024
23b2736
Remove useless return
mpozniak95 Aug 30, 2024
6bf2b22
Merge branch 'main' into redis_additional_attributes
lzchen Sep 4, 2024
ae5f1a3
Change docker image to redis-stack for redis docker tests
mpozniak95 Sep 16, 2024
87374e4
Merge branch 'main' into redis_additional_attributes
lzchen Sep 16, 2024
fb3ba66
Add span.is_recording check before adding additional parameters to spans
mpozniak95 Sep 17, 2024
c3fc628
Merge branch 'main' into redis_additional_attributes
lzchen Sep 18, 2024
cc52746
Add entry to changelog
mpozniak95 Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ def response_hook(span, instance, response):
from opentelemetry.instrumentation.redis.util import (
_extract_conn_attributes,
_format_command_args,
_set_span_attribute_if_value,
_value_or_none,
)
from opentelemetry.instrumentation.redis.version import __version__
from opentelemetry.instrumentation.utils import unwrap
Expand All @@ -126,6 +128,8 @@ def response_hook(span, instance, response):
_REDIS_CLUSTER_VERSION = (4, 1, 0)
_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 2)

# Field-type keywords looked for in the arguments of an index-creation
# command: `_add_create_attributes` treats any schema token equal to one of
# these as a field's type and the token just before it as that field's name.
_FIELD_TYPES = ["NUMERIC", "TEXT", "GEO", "TAG", "VECTOR"]


def _set_connection_attributes(span, conn):
if not span.is_recording() or not hasattr(conn, "connection_pool"):
Expand All @@ -138,7 +142,12 @@ def _set_connection_attributes(span, conn):

def _build_span_name(instance, cmd_args):
if len(cmd_args) > 0 and cmd_args[0]:
name = cmd_args[0]
if cmd_args[0] == "FT.SEARCH":
name = "redis.search"
elif cmd_args[0] == "FT.CREATE":
name = "redis.create_index"
else:
name = cmd_args[0]
else:
name = instance.connection_pool.connection_kwargs.get("db", 0)
return name
Expand Down Expand Up @@ -181,17 +190,21 @@ def _instrument(
def _traced_execute_command(func, instance, args, kwargs):
query = _format_command_args(args)
name = _build_span_name(instance, args)

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span.set_attribute(SpanAttributes.DB_STATEMENT, query)
_set_connection_attributes(span, instance)
span.set_attribute("db.redis.args_length", len(args))
if span.name == "redis.create_index":
_add_create_attributes(span, args)
if callable(request_hook):
request_hook(span, instance, args, kwargs)
response = func(*args, **kwargs)
if span.is_recording():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can't we include this as part of the above "if"?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so: one of the additional attributes is derived from the response of the search method, so this check has to come after:
response = func(*args, **kwargs)
which, as I understand it, must run even when the span is not recording.

if span.name == "redis.search":
_add_search_attributes(span, response, args)
if callable(response_hook):
response_hook(span, instance, response)
return response
Expand All @@ -202,9 +215,7 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
resource,
span_name,
) = _build_span_meta_data_for_pipeline(instance)

exception = None

with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
) as span:
Expand All @@ -230,6 +241,60 @@ def _traced_execute_pipeline(func, instance, args, kwargs):

return response

def _add_create_attributes(span, args):
    """Record index-creation details on ``span``.

    Sets ``redis.create_index.index`` from the second command argument and,
    when the arguments contain a ``SCHEMA`` section, sets
    ``redis.create_index.fields`` to a ``;``-terminated list of
    ``Field(name: ..., type: ...)`` entries.

    Args:
        span: Span to annotate.
        args: Positional arguments of the command, command name first.
    """
    _set_span_attribute_if_value(
        span, "redis.create_index.index", _value_or_none(args, 1)
    )
    # According to
    # https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155
    # the schema is the last part of the arguments passed to execute_command.
    try:
        schema_index = args.index("SCHEMA")
    except ValueError:
        return
    # Skip the "SCHEMA" keyword itself so it can never be reported as a
    # field name when the first field happens to be named like a type.
    schema = args[schema_index + 1:]
    # Schema layout:
    # [first_field_name, first_field_type, first_field_option, ...,
    #  second_field_name, second_field_type, ...]
    # A token that is a known type keyword marks the token before it as the
    # field's name (or alias).
    field_attribute = "".join(
        f"Field(name: {field_name}, type: {field_type});"
        for field_name, field_type in zip(schema, schema[1:])
        if field_type in _FIELD_TYPES
    )
    _set_span_attribute_if_value(
        span,
        "redis.create_index.fields",
        field_attribute,
    )

def _add_search_attributes(span, response, args):
    """Record search request and reply details on ``span``.

    Sets ``redis.search.index`` and ``redis.search.query`` from the command
    arguments, ``redis.search.total`` from the first reply element, and one
    ``redis.search.xdoc_<document id>.<field name>`` attribute per field of
    every document contained in the reply.

    Args:
        span: Span to annotate.
        response: Reply of the search command, laid out as
            ``[total, doc_id_1, doc_1_fields, doc_id_2, doc_2_fields, ...]``
            where each ``doc_N_fields`` is
            ``[field_name, field_value, field_name, field_value, ...]``
            (see https://redis.io/docs/latest/commands/ft.search/).
        args: Positional arguments of the command, command name first.
    """
    _set_span_attribute_if_value(
        span, "redis.search.index", _value_or_none(args, 1)
    )
    _set_span_attribute_if_value(
        span, "redis.search.query", _value_or_none(args, 2)
    )
    number_of_returned_documents = _value_or_none(response, 0)
    _set_span_attribute_if_value(
        span, "redis.search.total", number_of_returned_documents
    )
    if "NOCONTENT" in args or not number_of_returned_documents:
        return
    # The leading element counts every match, while the reply carries only
    # the requested page of documents. Bound the loop by the (id, fields)
    # pairs actually present so a large total neither wastes iterations nor
    # indexes past the end of the reply.
    documents_in_reply = min(
        number_of_returned_documents, (len(response) - 1) // 2
    )
    for document_number in range(documents_in_reply):
        document_index = response[1 + 2 * document_number]
        if not document_index:
            continue
        document = response[2 + 2 * document_number]
        # Walk (name, value) pairs; stopping one short of the end ignores a
        # trailing name that has no value instead of raising IndexError.
        for attribute_name_index in range(0, len(document) - 1, 2):
            _set_span_attribute_if_value(
                span,
                f"redis.search.xdoc_{document_index}.{document[attribute_name_index]}",
                document[attribute_name_index + 1],
            )

pipeline_class = (
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,15 @@ def _format_command_args(args):
out_str = ""

return out_str


def _set_span_attribute_if_value(span, name, value):
if value is not None and value != "":
span.set_attribute(name, value)


def _value_or_none(values, n):
try:
return values[n]
except IndexError:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
POSTGRES_PASSWORD: testpassword
POSTGRES_DB: opentelemetry-tests
otredis:
image: redis:4.0-alpine
image: redis/redis-stack:7.2.0-v12
ports:
- "127.0.0.1:6379:6379"
otrediscluster:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
import redis
import redis.asyncio

from redis.exceptions import ResponseError
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.aggregation import AggregateRequest
from redis.commands.search.query import Query
from redis.commands.search.field import (
TextField,
VectorField,
)

from opentelemetry import trace
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
Expand Down Expand Up @@ -614,3 +623,63 @@ def test_get(self):
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET ?"
)


class TestRedisearchInstrument(TestBase):
    """Functional tests for the span attributes added to search commands.

    These tests talk to a live Redis server on port 6379 and use its
    search (``ft()``) and JSON (``json()``) command groups.
    """

    def setUp(self):
        super().setUp()
        self.redis_client = redis.Redis(port=6379)
        self.redis_client.flushall()
        self.embedding_dim = 256
        RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)
        self.prepare_data()
        self.create_index()

    def tearDown(self):
        RedisInstrumentor().uninstrument()
        super().tearDown()

    def prepare_data(self):
        """Drop any leftover index and store one JSON document."""
        try:
            self.redis_client.ft("idx:test_vss").dropindex(True)
        except ResponseError:
            # Best effort: the index not existing is the normal case here.
            pass
        item = {
            "name": "test",
            "value": "test_value",
            # Keep the vector length in sync with the index definition.
            "embeddings": [0.1] * self.embedding_dim,
        }
        pipeline = self.redis_client.pipeline()
        pipeline.json().set("test:001", "$", item)
        res = pipeline.execute()
        self.assertNotIn(False, res)

    def create_index(self):
        """Create the JSON index the tests search against."""
        schema = (
            TextField("$.name", no_stem=True, as_name="name"),
            TextField("$.value", no_stem=True, as_name="value"),
            VectorField(
                "$.embeddings",
                "FLAT",
                {
                    "TYPE": "FLOAT32",
                    "DIM": self.embedding_dim,
                    "DISTANCE_METRIC": "COSINE",
                },
                as_name="vector",
            ),
        )
        definition = IndexDefinition(
            prefix=["test:"], index_type=IndexType.JSON
        )
        res = self.redis_client.ft("idx:test_vss").create_index(
            fields=schema, definition=definition
        )
        self.assertIn("OK", str(res))

    def test_redis_create_index(self):
        # The index is created in setUp, so its span is already exported.
        spans = self.memory_exporter.get_finished_spans()
        span = next(
            candidate
            for candidate in spans
            if candidate.name == "redis.create_index"
        )
        self.assertIn("redis.create_index.fields", span.attributes)

    def test_redis_query(self):
        query = "@name:test"
        self.redis_client.ft("idx:test_vss").search(Query(query))

        spans = self.memory_exporter.get_finished_spans()
        span = next(
            candidate
            for candidate in spans
            if candidate.name == "redis.search"
        )

        self.assertEqual(span.attributes.get("redis.search.query"), query)
        self.assertEqual(span.attributes.get("redis.search.total"), 1)
Loading