Skip to content

Commit

Permalink
feat: Add spanId attribute for span-log join query (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rishabh authored Apr 29, 2021
1 parent 287f57e commit e6a3738
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@

import static io.reactivex.rxjava3.core.Single.zip;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import lombok.experimental.Accessors;
import org.hypertrace.core.graphql.attributes.AttributeStore;
import org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString;
import org.hypertrace.core.graphql.common.request.AttributeAssociation;
import org.hypertrace.core.graphql.common.request.AttributeRequest;
import org.hypertrace.core.graphql.common.request.AttributeRequestBuilder;
import org.hypertrace.core.graphql.common.request.FilterRequestBuilder;
import org.hypertrace.core.graphql.common.schema.attributes.AttributeScope;
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument;
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterOperatorType;
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterType;
import org.hypertrace.core.graphql.common.utils.Converter;
import org.hypertrace.core.graphql.context.GraphQlRequestContext;
import org.hypertrace.core.graphql.span.request.SpanRequest;
import org.hypertrace.gateway.service.v1.common.Expression;
import org.hypertrace.gateway.service.v1.common.Filter;
Expand All @@ -29,21 +33,28 @@ class SpanLogEventRequestBuilder {
private final Converter<Collection<AttributeRequest>, Set<Expression>> attributeConverter;
private final Converter<Collection<AttributeAssociation<FilterArgument>>, Filter> filterConverter;
private final FilterRequestBuilder filterRequestBuilder;
private final AttributeStore attributeStore;
private final AttributeRequestBuilder attributeRequestBuilder;

@Inject
SpanLogEventRequestBuilder(
Converter<Collection<AttributeRequest>, Set<Expression>> attributeConverter,
Converter<Collection<AttributeAssociation<FilterArgument>>, Filter> filterConverter,
FilterRequestBuilder filterRequestBuilder) {
FilterRequestBuilder filterRequestBuilder,
AttributeStore attributeStore,
AttributeRequestBuilder attributeRequestBuilder) {
this.attributeConverter = attributeConverter;
this.filterConverter = filterConverter;
this.filterRequestBuilder = filterRequestBuilder;
this.attributeStore = attributeStore;
this.attributeRequestBuilder = attributeRequestBuilder;
}

Single<LogEventsRequest> buildLogEventsRequest(
SpanRequest gqlRequest, SpansResponse spansResponse) {
return zip(
this.attributeConverter.convert(gqlRequest.logEventAttributes()),
getRequestAttributes(
gqlRequest.spanEventsRequest().context(), gqlRequest.logEventAttributes()),
buildLogEventsQueryFilter(gqlRequest, spansResponse).flatMap(filterConverter::convert),
(selections, filter) ->
LogEventsRequest.newBuilder()
Expand All @@ -56,6 +67,20 @@ Single<LogEventsRequest> buildLogEventsRequest(
.build());
}

private Single<Set<Expression>> getRequestAttributes(
GraphQlRequestContext requestContext, Collection<AttributeRequest> logEventAttributes) {
return this.attributeStore
.getForeignIdAttribute(
requestContext,
HypertraceCoreAttributeScopeString.LOG_EVENT,
HypertraceCoreAttributeScopeString.SPAN)
.map(attributeRequestBuilder::buildForAttribute)
.toObservable()
.mergeWith(Observable.fromIterable(logEventAttributes))
.collect(Collectors.toSet())
.flatMap(attributeConverter::convert);
}

private Single<List<AttributeAssociation<FilterArgument>>> buildLogEventsQueryFilter(
SpanRequest gqlRequest, SpansResponse spansResponse) {
List<String> spanIds =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.stream.Collectors;
import javax.inject.Inject;
import lombok.experimental.Accessors;
import org.hypertrace.core.graphql.attributes.AttributeModel;
import org.hypertrace.core.graphql.attributes.AttributeStore;
import org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString;
import org.hypertrace.core.graphql.common.request.AttributeRequest;
Expand Down Expand Up @@ -43,29 +44,44 @@ Single<SpanLogEventsResponse> buildResponse(
HypertraceCoreAttributeScopeString.LOG_EVENT,
HypertraceCoreAttributeScopeString.SPAN)
.flatMap(
spanId ->
buildResponse(spanId.key(), attributeRequests, spansResponse, logEventsResponse));
spanId -> buildResponse(spanId, attributeRequests, spansResponse, logEventsResponse));
}

private Single<SpanLogEventsResponse> buildResponse(
String foreignIdAttribute,
AttributeModel foreignIdAttribute,
Collection<AttributeRequest> attributeRequests,
SpansResponse spansResponse,
LogEventsResponse logEventsResponse) {
return Observable.fromIterable(logEventsResponse.getLogEventsList())
.concatMapSingle(
logEventsResponseVar -> this.convert(attributeRequests, logEventsResponseVar))
.collect(Collectors.groupingBy(logEvent -> (String) logEvent.attribute(foreignIdAttribute)))
logEventsResponseVar ->
this.convert(foreignIdAttribute, attributeRequests, logEventsResponseVar))
.collect(
Collectors.groupingBy(
SpanLogEventPair::spanId,
Collectors.mapping(SpanLogEventPair::logEvent, Collectors.toList())))
.map(
spanIdVsLogEventsMap -> new SpanLogEventsResponse(spansResponse, spanIdVsLogEventsMap));
}

private Single<LogEvent> convert(
private Single<SpanLogEventPair> convert(
AttributeModel foreignIdAttribute,
Collection<AttributeRequest> request,
org.hypertrace.gateway.service.v1.log.events.LogEvent logEvent) {
return this.attributeMapConverter
.convert(request, logEvent.getAttributesMap())
.map(ConvertedLogEvent::new);
.map(
attributeMap ->
new SpanLogEventPair(
logEvent.getAttributesMap().get(foreignIdAttribute.id()).getString(),
new ConvertedLogEvent(attributeMap)));
}

@lombok.Value
@Accessors(fluent = true)
private static class SpanLogEventPair {
String spanId;
LogEvent logEvent;
}

@lombok.Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
Expand All @@ -25,12 +27,16 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.hypertrace.core.graphql.attributes.AttributeModel;
import org.hypertrace.core.graphql.attributes.AttributeStore;
import org.hypertrace.core.graphql.common.request.AttributeAssociation;
import org.hypertrace.core.graphql.common.request.AttributeRequest;
import org.hypertrace.core.graphql.common.request.AttributeRequestBuilder;
import org.hypertrace.core.graphql.common.request.FilterRequestBuilder;
import org.hypertrace.core.graphql.common.request.ResultSetRequest;
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument;
import org.hypertrace.core.graphql.common.utils.Converter;
import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultAttributeRequest;
import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultResultSetRequest;
import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultSpanRequest;
import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultTimeRange;
Expand All @@ -54,6 +60,10 @@ class SpanLogEventRequestBuilderTest {

@Mock private FilterRequestBuilder filterRequestBuilder;

@Mock private AttributeStore attributeStore;

@Mock private AttributeRequestBuilder attributeRequestBuilder;

private SpanLogEventRequestBuilder spanLogEventRequestBuilder;

@BeforeEach
Expand Down Expand Up @@ -82,11 +92,13 @@ protected void configure() {
new TypeLiteral<Converter<Collection<AttributeRequest>, Set<Expression>>>() {}));

spanLogEventRequestBuilder =
new SpanLogEventRequestBuilder(attributeConverter, filterConverter, filterRequestBuilder);
}
new SpanLogEventRequestBuilder(
attributeConverter,
filterConverter,
filterRequestBuilder,
attributeStore,
attributeRequestBuilder);

@Test
void testBuildRequest() {
doAnswer(
invocation -> {
Set<FilterArgument> filterArguments = invocation.getArgument(2, Set.class);
Expand All @@ -103,6 +115,21 @@ void testBuildRequest() {
.when(filterRequestBuilder)
.build(any(), any(), anyCollection());

when(attributeStore.getForeignIdAttribute(any(), anyString(), anyString()))
.thenReturn(Single.just(spanIdAttribute.attribute()));

doAnswer(
invocation -> {
AttributeModel attributeModel = invocation.getArgument(0, AttributeModel.class);
return new DefaultAttributeRequest(attributeModel);
})
.when(attributeRequestBuilder)
.buildForAttribute(any());
}

@Test
void testBuildRequest() {

long startTime = System.currentTimeMillis();
long endTime = System.currentTimeMillis() + Duration.ofHours(1).toMillis();

Expand Down Expand Up @@ -149,7 +176,58 @@ void testBuildRequest() {
assertEquals(
Set.of("attributes", "traceId", "spanId"),
logEventsRequest.getSelectionList().stream()
.map(v -> v.getColumnIdentifier().getColumnName())
.map(expression -> expression.getColumnIdentifier().getColumnName())
.collect(Collectors.toSet()));
}

@Test
void testBuildRequest_addSpanId() {
long startTime = System.currentTimeMillis();
long endTime = System.currentTimeMillis() + Duration.ofHours(1).toMillis();

Collection<AttributeRequest> logAttributeRequests = List.of(traceIdAttribute);
ResultSetRequest resultSetRequest =
new DefaultResultSetRequest(
null,
List.of(DaoTestUtil.eventIdAttribute),
new DefaultTimeRange(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime)),
DaoTestUtil.eventIdAttribute,
0,
0,
List.of(),
Collections.emptyList(),
Optional.empty());
SpanRequest spanRequest = new DefaultSpanRequest(resultSetRequest, logAttributeRequests);

LogEventsRequest logEventsRequest =
spanLogEventRequestBuilder.buildLogEventsRequest(spanRequest, spansResponse).blockingGet();

assertEquals(Operator.IN, logEventsRequest.getFilter().getChildFilter(0).getOperator());
assertEquals(
spanIdAttribute.attribute().id(),
logEventsRequest
.getFilter()
.getChildFilter(0)
.getLhs()
.getColumnIdentifier()
.getColumnName());
assertEquals(
List.of("span1", "span2", "span3"),
logEventsRequest
.getFilter()
.getChildFilter(0)
.getRhs()
.getLiteral()
.getValue()
.getStringArrayList()
.stream()
.collect(Collectors.toList()));
assertEquals(startTime, logEventsRequest.getStartTimeMillis());
assertEquals(endTime, logEventsRequest.getEndTimeMillis());
assertEquals(
Set.of("traceId", "spanId"),
logEventsRequest.getSelectionList().stream()
.map(expression -> expression.getColumnIdentifier().getColumnName())
.collect(Collectors.toSet()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,37 @@ void testBuildResponse() {
Map<String, Value> map = invocation.getArgument(1, Map.class);
return Single.just(
map.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, v -> v.getValue().getString())));
.collect(
Collectors.toMap(
Entry::getKey, valueEntry -> valueEntry.getValue().getString())));
})
.when(attributeMapConverter)
.convert(anyCollection(), anyMap());

SpanLogEventsResponse response =
spanLogEventResponseConverter
.buildResponse(requestContext, attributeRequests, spansResponse, logEventsResponse)
.blockingGet();

assertEquals(spansResponse, response.spansResponse());
assertEquals(Set.of("span1", "span2"), response.spanIdToLogEvents().keySet());
}

@Test
void testBuildResponse_spanIdNotRequested() {
Collection<AttributeRequest> attributeRequests = List.of(traceIdAttribute, attributesAttribute);

when(attributeStore.getForeignIdAttribute(any(), anyString(), anyString()))
.thenReturn(Single.just(spanIdAttribute.attribute()));

doAnswer(
invocation -> {
Map<String, Value> map = invocation.getArgument(1, Map.class);
return Single.just(
map.entrySet().stream()
.collect(
Collectors.toMap(
Entry::getKey, valueEntry -> valueEntry.getValue().getString())));
})
.when(attributeMapConverter)
.convert(anyCollection(), anyMap());
Expand Down

0 comments on commit e6a3738

Please sign in to comment.