Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional attributes for redis.search methods create_index, search #2635

Merged
merged 26 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
191ef9d
Add wrappers for redis.search methods create_index, search, aggregate
mpozniak95 Jun 25, 2024
15e09d3
Skip creating span for redisearch methods in _traced_execute_command
mpozniak95 Jun 26, 2024
4cb2415
Reformatted file
mpozniak95 Jun 27, 2024
16d99e0
Add additional functional tests
mpozniak95 Jun 27, 2024
c53b08e
Merge remote-tracking branch 'origin/main' into redis_additional_attr…
mpozniak95 Aug 12, 2024
fd52a83
Merge branch 'main' into redis_additional_attributes
lzchen Aug 14, 2024
5507098
Change the span kind to client, add db.system attributes to added met…
mpozniak95 Aug 26, 2024
da7ae34
Reformat files
mpozniak95 Aug 26, 2024
5a1a1d5
Merge branch 'main' into redis_additional_attributes
lzchen Aug 26, 2024
748022a
Reformat files
mpozniak95 Aug 28, 2024
adc4263
Merge branch 'open-telemetry:main' into redis_additional_attributes
mpozniak95 Aug 28, 2024
59b7c91
Remove aggregate, and skip function, edit attributes in execute command
mpozniak95 Aug 28, 2024
7b5bd19
Merge branch 'main' into redis_additional_attributes
lzchen Aug 28, 2024
a9e5afc
Fix linter problems
mpozniak95 Aug 29, 2024
2ad9768
Merge remote-tracking branch 'origin/main' into redis_additional_attr…
mpozniak95 Aug 30, 2024
8c6c75d
Change name to _set_span_attribute_if_value, use constant for _FIELD_…
mpozniak95 Aug 30, 2024
23b2736
Remove useless return
mpozniak95 Aug 30, 2024
6bf2b22
Merge branch 'main' into redis_additional_attributes
lzchen Sep 4, 2024
ae5f1a3
Change docker image to redis-stack for redis docker tests
mpozniak95 Sep 16, 2024
87374e4
Merge branch 'main' into redis_additional_attributes
lzchen Sep 16, 2024
fb3ba66
Add span.is_recording check before adding additional parameters to spans
mpozniak95 Sep 17, 2024
c3fc628
Merge branch 'main' into redis_additional_attributes
lzchen Sep 18, 2024
cc52746
Add entry to changelog
mpozniak95 Sep 19, 2024
f889861
Merge branch 'main' into redis_additional_attributes
lzchen Sep 23, 2024
c1d0d7f
Merge remote-tracking branch 'origin/main' into redis_additional_attr…
mpozniak95 Oct 1, 2024
6a8bc22
Merge branch 'main' into redis_additional_attributes
lzchen Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,18 @@ def response_hook(span, instance, response):
from typing import Any, Collection

import redis
import redis.commands
from wrapt import wrap_function_wrapper

from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.redis.package import _instruments
from opentelemetry.instrumentation.redis.util import (
_args_or_none,
_check_skip,
_extract_conn_attributes,
_format_command_args,
_set_span_attribute,
)
from opentelemetry.instrumentation.redis.version import __version__
from opentelemetry.instrumentation.utils import unwrap
Expand Down Expand Up @@ -181,7 +185,8 @@ def _instrument(
def _traced_execute_command(func, instance, args, kwargs):
query = _format_command_args(args)
name = _build_span_name(instance, args)

if _check_skip(name):
return func(*args, **kwargs)
with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT
) as span:
Expand All @@ -202,9 +207,9 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
resource,
span_name,
) = _build_span_meta_data_for_pipeline(instance)

exception = None

if _check_skip(span_name):
return func(*args, **kwargs)
with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
) as span:
Expand All @@ -230,6 +235,59 @@ def _traced_execute_pipeline(func, instance, args, kwargs):

return response

def _traced_create_index(func, instance, args, kwargs):
span_name = "redis.create_index"
with tracer.start_as_current_span(span_name) as span:
_set_span_attribute(
span,
"redis.create_index.fields",
kwargs.get("fields").__str__(),
)
_set_span_attribute(
span,
"redis.create_index.definition",
kwargs.get("definition").__str__(),
)
response = func(*args, **kwargs)
return response

def _traced_search(func, instance, args, kwargs):
span_name = "redis.search"
with tracer.start_as_current_span(span_name) as span:
query = kwargs.get("query") or _args_or_none(args, 0)
_set_span_attribute(
span,
"redis.commands.search.query",
query.query_string(),
)
response = func(*args, **kwargs)
_set_span_attribute(
span, "redis.commands.search.total", response.total
)
_set_span_attribute(
span, "redis.commands.search.duration", response.duration
)
for index, doc in enumerate(response.docs):
_set_span_attribute(
span, f"redis.commands.search.xdoc_{index}", doc.__str__()
)
return response

def _traced_aggregate(func, instance, args, kwargs):
span_name = "redis.aggregate"
with tracer.start_as_current_span(span_name) as span:
query = kwargs.get("query") or _args_or_none(args, 0)
_set_span_attribute(
span,
"redis.commands.aggregate.query",
query._query,
)
response = func(*args, **kwargs)
_set_span_attribute(
span, "redis.commands.aggregate.results", str(response.rows)
)
return response

pipeline_class = (
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
)
Expand All @@ -248,6 +306,21 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)
wrap_function_wrapper(
"redis.commands.search",
"Search.create_index",
_traced_create_index,
)
wrap_function_wrapper(
"redis.commands.search",
"Search.search",
_traced_search,
)
wrap_function_wrapper(
"redis.commands.search",
"Search.aggregate",
_traced_aggregate,
)
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
wrap_function_wrapper(
"redis.cluster",
Expand Down Expand Up @@ -371,6 +444,9 @@ def _instrument(self, **kwargs):
)

def _uninstrument(self, **kwargs):
unwrap(redis.commands.search.Search, "create_index")
unwrap(redis.commands.search.Search, "search")
unwrap(redis.commands.search.Search, "aggregate")
if redis.VERSION < (3, 0, 0):
unwrap(redis.StrictRedis, "execute_command")
unwrap(redis.StrictRedis, "pipeline")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,25 @@ def _format_command_args(args):
out_str = ""

return out_str


def _check_skip(name):
skip_list = ["FT.SEARCH", "FT.CREATE", "FT.AGGREGATE"]
lzchen marked this conversation as resolved.
Show resolved Hide resolved
for method in skip_list:
if method in name:
return True
return False


def _set_span_attribute(span, name, value):
xrmx marked this conversation as resolved.
Show resolved Hide resolved
if value is not None:
lzchen marked this conversation as resolved.
Show resolved Hide resolved
if value != "":
span.set_attribute(name, value)
return


def _args_or_none(args, n):
try:
return args[n]
except IndexError:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
import redis
import redis.asyncio

from redis.exceptions import ResponseError
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.aggregation import AggregateRequest
from redis.commands.search.query import Query
from redis.commands.search.field import (
TextField,
VectorField,
)

from opentelemetry import trace
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
Expand Down Expand Up @@ -614,3 +623,72 @@ def test_get(self):
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET ?"
)


class TestRedisearchInstrument(TestBase):
def setUp(self):
super().setUp()
self.redis_client = redis.Redis(port=6379)
self.redis_client.flushall()
self.embedding_dim = 256
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)
self.prepare_data()
self.create_index()

def tearDown(self):
RedisInstrumentor().uninstrument()
super().tearDown()

def prepare_data(self):
try:
self.redis_client.ft("idx:test_vss").dropindex(True)
except ResponseError:
print("No such index")
item = {"name": "test",
"value": "test_value",
"embeddings": [0.1] * 256}
pipeline = self.redis_client.pipeline()
pipeline.json().set(f"test:001", "$", item)
res = pipeline.execute()
assert False not in res

def create_index(self):
schema = (
TextField("$.name", no_stem=True, as_name="name"),
TextField("$.value", no_stem=True, as_name="value"),
VectorField("$.embeddings",
"FLAT",
{
"TYPE": "FLOAT32",
"DIM": self.embedding_dim,
"DISTANCE_METRIC": "COSINE",
},
as_name="vector",),
)
definition = IndexDefinition(prefix=["test:"], index_type=IndexType.JSON)
res = self.redis_client.ft("idx:test_vss").create_index(fields=schema, definition=definition)
assert "OK" in str(res)

def test_redis_create_index(self):
spans = self.memory_exporter.get_finished_spans()
span = next(span for span in spans if span.name == "redis.create_index")
assert "redis.create_index.definition" in span.attributes
assert "redis.create_index.fields" in span.attributes

def test_redis_aggregate(self):
query = "*"
self.redis_client.ft("idx:test_vss").aggregate(AggregateRequest(query).load())
spans = self.memory_exporter.get_finished_spans()
span = next(span for span in spans if span.name == "redis.aggregate")
assert span.attributes.get("redis.commands.aggregate.query") == query
assert "redis.commands.aggregate.results" in span.attributes

def test_redis_query(self):
query = "@name:test"
res = self.redis_client.ft("idx:test_vss").search(Query(query))

spans = self.memory_exporter.get_finished_spans()
span = next(span for span in spans if span.name == "redis.search")

assert span.attributes.get("redis.commands.search.query") == query
assert span.attributes.get("redis.commands.search.total") == 1
Loading