From c30968c46cb50954bf8cb57bb0652f5b3b7f8fa3 Mon Sep 17 00:00:00 2001 From: Daniel Henneberger Date: Mon, 11 Nov 2024 13:38:37 -0800 Subject: [PATCH] Add subscriptions to test infra (draft) Signed-off-by: Daniel Henneberger --- .../generate/GraphqlSchemaFactory.java | 17 +- .../graphql/generate/GraphqlSchemaUtil.java | 54 ++- .../graphql/server/CustomScalars.java | 2 +- sqrl-server/sqrl-server-vertx/pom.xml | 4 + .../datasqrl/graphql/SqrlObjectMapper.java | 22 +- .../SubscriptionConfigurationImpl.java | 5 +- .../kafka/KafkaDataFetcherFactory.java | 2 +- .../sqrl-integration-tests/scheam.graphqls | 355 ++++++++++++++++++ .../java/com/datasqrl/FullUsecasesIT.java | 2 +- .../avro-schema/avro-schema/schema.avsc | 22 +- .../avro-schema/avro-schema/schema.jsonl | 4 +- .../avro-schema/avro-schema/schema.table.json | 9 +- .../MySchemaQuery.snapshot | 2 +- .../MySubscriptionQuery.snapshot | 1 + .../MySubscriptionQuery.graphql | 21 ++ .../datasqrl/compile/CompilationProcess.java | 8 + .../java/com/datasqrl/compile/TestPlan.java | 1 + .../com/datasqrl/compile/TestPlanner.java | 9 +- .../main/java/com/datasqrl/DatasqrlRun.java | 6 +- sqrl-tools/sqrl-test/pom.xml | 5 + .../main/java/com/datasqrl/DatasqrlTest.java | 277 +++++++++++--- 21 files changed, 723 insertions(+), 105 deletions(-) create mode 100644 sqrl-testing/sqrl-integration-tests/scheam.graphqls create mode 100644 sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/snapshots-avro-schema/MySubscriptionQuery.snapshot create mode 100644 sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/tests-avro-schema/MySubscriptionQuery.graphql diff --git a/sqrl-planner/src/main/java/com/datasqrl/graphql/generate/GraphqlSchemaFactory.java b/sqrl-planner/src/main/java/com/datasqrl/graphql/generate/GraphqlSchemaFactory.java index 196325767..2e5bd9e38 100644 --- a/sqrl-planner/src/main/java/com/datasqrl/graphql/generate/GraphqlSchemaFactory.java +++ b/sqrl-planner/src/main/java/com/datasqrl/graphql/generate/GraphqlSchemaFactory.java @@ -130,7 +130,7 @@ public Optional generate(ExecutionGoal goal) { GraphQLSchema.Builder builder = GraphQLSchema.newSchema() .query(queryType); if (goal != ExecutionGoal.TEST) { - if (logManager.hasLogEngine() && System.getenv().get("ENABLE_SUBSCRIPTIONS") != null) { + if (true) { Optional subscriptions = createSubscriptionTypes(schema); subscriptions.map(builder::subscription); } @@ -226,6 +226,7 @@ public Optional createSubscriptionTypes(SqrlSchema schema) { GraphQLFieldDefinition subscriptionField = GraphQLFieldDefinition.newFieldDefinition() .name(tableName) + .arguments(createArgumentsWithOptionalScalars(schema.getTableFunctions().get(0))) .type(createOutputTypeForRelDataType(table.getRowType(), NamePath.of(tableName), seen).get()) .build(); @@ -242,6 +243,20 @@ public Optional createSubscriptionTypes(SqrlSchema schema) { } + private List createArgumentsWithOptionalScalars(SqrlTableMacro field) { + if (!allowedArguments(field)) { + return List.of(); + } + + return field.getRowType().getFieldList().stream() + .filter(p -> getInputType(p.getType(), NamePath.of(p.getName()), seen).isPresent()) + .map(parameter -> GraphQLArgument.newArgument() + .name(parameter.getName()) + .type(getInputType(parameter.getType(), NamePath.of(parameter.getName()), seen).get()) // No nonNull here + .build()) + .collect(Collectors.toList()); + } + private GraphQLObjectType createQueryType(ExecutionGoal goal, List relationships) { List fields = new ArrayList<>(); diff --git a/sqrl-planner/src/main/java/com/datasqrl/graphql/generate/GraphqlSchemaUtil.java b/sqrl-planner/src/main/java/com/datasqrl/graphql/generate/GraphqlSchemaUtil.java index a59baaab6..f4f376c0b 100644 --- a/sqrl-planner/src/main/java/com/datasqrl/graphql/generate/GraphqlSchemaUtil.java +++ b/sqrl-planner/src/main/java/com/datasqrl/graphql/generate/GraphqlSchemaUtil.java @@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.flink.table.planner.plan.schema.RawRelDataType; @Slf4j @@ -59,8 +60,15 @@ public static boolean isValidGraphQLName(String name) { } public static Optional getInputType(RelDataType type, NamePath namePath, Set seen) { - return getInOutType(type, namePath, seen) - .map(f->(GraphQLInputType)f); + if (type.getSqlTypeName() == SqlTypeName.ARRAY || type.getSqlTypeName() == SqlTypeName.MULTISET || + type.getSqlTypeName() == SqlTypeName.ROW + ) { + return Optional.empty(); // Exclude arrays + } + + Optional graphQLInputType = getInOutType(type, namePath, seen) + .map(f -> (GraphQLInputType) f); + return graphQLInputType; } public static Optional getOutputType(RelDataType type, NamePath namePath, Set seen) { @@ -185,18 +193,20 @@ private static String toName(NamePath namePath, String postfix) { public static Optional createOutputTypeForRelDataType(RelDataType type, NamePath namePath, Set seen) { + Optional outputType = getOutputType(type, namePath, seen); if (!type.isNullable()) { - return getOutputType(type, namePath, seen).map(GraphQLNonNull::nonNull); + return outputType.map(GraphQLNonNull::nonNull); } - return getOutputType(type, namePath, seen); + return outputType; } public static Optional createInputTypeForRelDataType(RelDataType type, NamePath namePath, Set seen) { if (namePath.getLast().isHidden()) return Optional.empty(); + Optional graphQLInputType = getGraphQLInputType(type, namePath, seen); if (!type.isNullable()) { - return getGraphQLInputType(type, namePath, seen).map(GraphQLNonNull::nonNull); + return graphQLInputType.map(GraphQLNonNull::nonNull); } - return getGraphQLInputType(type, namePath, seen); + return graphQLInputType; } private static Optional getGraphQLInputType(RelDataType type, NamePath namePath, Set seen) { @@ -220,7 +230,10 @@ private static Optional getGraphQLInputType(RelDataType type, return Optional.of(CustomScalars.DATE); case TIME: return Optional.of(CustomScalars.TIME); + case TIME_WITH_LOCAL_TIME_ZONE: + break; case TIMESTAMP: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return Optional.of(CustomScalars.DATETIME); case BINARY: case VARBINARY: @@ -233,9 +246,36 @@ private static Optional getGraphQLInputType(RelDataType type, return createGraphQLInputObjectType(type, namePath, seen); case MAP: return Optional.of(CustomScalars.JSON); + case INTERVAL_YEAR: + case INTERVAL_YEAR_MONTH: + case INTERVAL_MONTH: + case INTERVAL_DAY: + case INTERVAL_DAY_HOUR: + case INTERVAL_DAY_MINUTE: + case INTERVAL_DAY_SECOND: + case INTERVAL_HOUR: + case INTERVAL_HOUR_MINUTE: + case INTERVAL_HOUR_SECOND: + case INTERVAL_MINUTE: + case INTERVAL_MINUTE_SECOND: + case INTERVAL_SECOND: + case NULL: + case UNKNOWN: + case ANY: + case SYMBOL: + case MULTISET: + case DISTINCT: + case STRUCTURED: + case OTHER: + case CURSOR: + case COLUMN_LIST: + case DYNAMIC_STAR: + case GEOMETRY: + case SARG: default: - return Optional.empty(); // Unsupported types are omitted + break; } + return Optional.empty(); // Unsupported types are omitted } /** diff --git a/sqrl-server/sqrl-server-core/src/main/java/com/datasqrl/graphql/server/CustomScalars.java b/sqrl-server/sqrl-server-core/src/main/java/com/datasqrl/graphql/server/CustomScalars.java index ebca23cb0..d41687012 100644 --- a/sqrl-server/sqrl-server-core/src/main/java/com/datasqrl/graphql/server/CustomScalars.java +++ b/sqrl-server/sqrl-server-core/src/main/java/com/datasqrl/graphql/server/CustomScalars.java @@ -41,7 +41,7 @@ public Object parseLiteral(Object input) { .build(); - public static final GraphQLScalarType DATETIME = DateTimeScalar.INSTANCE; + public static final GraphQLScalarType DATETIME = ExtendedScalars.DateTime; public static final GraphQLScalarType DATE = ExtendedScalars.Date; public static final GraphQLScalarType TIME = ExtendedScalars.LocalTime; public static final GraphQLScalarType JSON = ExtendedScalars.Json; diff --git a/sqrl-server/sqrl-server-vertx/pom.xml b/sqrl-server/sqrl-server-vertx/pom.xml index f41905ff9..0ff60e56a 100644 --- a/sqrl-server/sqrl-server-vertx/pom.xml +++ b/sqrl-server/sqrl-server-vertx/pom.xml @@ -245,6 +245,10 @@ org.mockito mockito-core + + com.fasterxml.jackson.datatype + jackson-datatype-jdk8 + diff --git a/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/SqrlObjectMapper.java b/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/SqrlObjectMapper.java index b98fbf2ba..7fc036cf7 100644 --- a/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/SqrlObjectMapper.java +++ b/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/SqrlObjectMapper.java @@ -2,13 +2,27 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; public class SqrlObjectMapper { - public static final ObjectMapper mapper = new ObjectMapper(); - static { - mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - mapper.registerModule(new JavaTimeModule()); + public static ObjectMapper mapper = createObjectMapper(); + public static ObjectMapper createObjectMapper() { + ObjectMapper objectMapper = new ObjectMapper(); + registerModules(objectMapper); + return objectMapper; } + + private static void registerModules(ObjectMapper mapper) { + SimpleModule module = new SimpleModule(); + + mapper.registerModule(new JavaTimeModule()) + .registerModule((new Jdk8Module()).configureAbsentsAsNulls(true)) + .registerModule(module) + .disable(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + } diff --git a/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/SubscriptionConfigurationImpl.java b/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/SubscriptionConfigurationImpl.java index 42fd9baa2..0acc41d5c 100644 --- a/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/SubscriptionConfigurationImpl.java +++ b/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/SubscriptionConfigurationImpl.java @@ -8,6 +8,7 @@ import com.datasqrl.graphql.config.ServerConfig; import com.datasqrl.graphql.io.SinkConsumer; +import com.datasqrl.graphql.kafka.JsonDeserializer; import com.datasqrl.graphql.kafka.KafkaDataFetcherFactory; import com.datasqrl.graphql.kafka.KafkaSinkConsumer; import com.datasqrl.graphql.postgres_log.PostgresDataFetcherFactory; @@ -83,8 +84,8 @@ public Map getSourceConfig() { Map conf = new HashMap<>(); conf.put(BOOTSTRAP_SERVERS_CONFIG, config.getEnvironmentVariable("PROPERTIES_BOOTSTRAP_SERVERS")); conf.put(GROUP_ID_CONFIG, UUID.randomUUID().toString()); - conf.put(KEY_DESERIALIZER_CLASS_CONFIG, "com.datasqrl.graphql.kafka.JsonDeserializer"); - conf.put(VALUE_DESERIALIZER_CLASS_CONFIG, "com.datasqrl.graphql.kafka.JsonDeserializer"); + conf.put(KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); + conf.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); conf.put(AUTO_OFFSET_RESET_CONFIG, "latest"); return conf; } diff --git a/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/kafka/KafkaDataFetcherFactory.java b/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/kafka/KafkaDataFetcherFactory.java index 23787caac..3f54a43ad 100644 --- a/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/kafka/KafkaDataFetcherFactory.java +++ b/sqrl-server/sqrl-server-vertx/src/main/java/com/datasqrl/graphql/kafka/KafkaDataFetcherFactory.java @@ -26,7 +26,7 @@ public Publisher get(DataFetchingEnvironment env) throws Exception { } private boolean filterSubscription(Object data, Map args) { - if (args == null) { + if (args == null || args.isEmpty()) { return false; } for (Map.Entry filter : coords.getFilters().entrySet()) { diff --git a/sqrl-testing/sqrl-integration-tests/scheam.graphqls b/sqrl-testing/sqrl-integration-tests/scheam.graphqls new file mode 100644 index 000000000..6b23b42f5 --- /dev/null +++ b/sqrl-testing/sqrl-integration-tests/scheam.graphqls @@ -0,0 +1,355 @@ +"An RFC-3339 compliant Full Date Scalar" +scalar Date + +"An RFC-3339 compliant DateTime Scalar" +scalar DateTime + +"A JSON scalar" +scalar JSON + +"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`." +scalar LocalTime + +type MySchema { + event_time: DateTime! + nullableTimestampMillisField: DateTime + dateField: Date! + nullableDateField: Date + timeMillisField: LocalTime! + nullableTimeMillisField: LocalTime + timeMicrosField: LocalTime! + nullableTimeMicrosField: LocalTime + stringField: String! + nullableStringField: String + intField: Int! + nullableIntField: Int + longField: Float! + nullableLongField: Float + floatField: Float! + nullableFloatField: Float + doubleField: Float! + nullableDoubleField: Float + booleanField: Boolean! + nullableBooleanField: Boolean + enumField: String! + nullableEnumField: String + arrayField: [String]! + nullableArrayField: [Int] + decimalField: Float! + mapField: JSON! + nullableMapField: JSON + complexArrayField(limit: Int = 10, offset: Int = 0): [complexArrayField!] + multiNestedRecord: multiNestedRecord! + nestedRecord: nestedRecord! + nullableComplexArrayField(limit: Int = 10, offset: Int = 0): [nullableComplexArrayField!] + nullableNestedRecord: nullableNestedRecord +} + +type MySchemaResult { + event_time: DateTime! + nullableTimestampMillisField: DateTime + dateField: Date! + nullableDateField: Date + timeMillisField: LocalTime! + nullableTimeMillisField: LocalTime + timeMicrosField: LocalTime! + nullableTimeMicrosField: LocalTime + complexArrayField: [MySchema_complexArrayFieldResult]! + nullableComplexArrayField: [MySchema_nullableComplexArrayFieldResult] + multiNestedRecord: MySchema_multiNestedRecordResult! + stringField: String! + nullableStringField: String + intField: Int! + nullableIntField: Int + longField: Float! + nullableLongField: Float + floatField: Float! + nullableFloatField: Float + doubleField: Float! + nullableDoubleField: Float + booleanField: Boolean! + nullableBooleanField: Boolean + enumField: String! + nullableEnumField: String + arrayField: [String]! + nullableArrayField: [Int] + nestedRecord: MySchema_nestedRecordResult! + nullableNestedRecord: MySchema_nullableNestedRecordResult + decimalField: Float! + mapField: JSON! + nullableMapField: JSON +} + +type MySchema_ { + event_time: DateTime! + nullableTimestampMillisField: DateTime + dateField: Date! + nullableDateField: Date + timeMillisField: LocalTime! + nullableTimeMillisField: LocalTime + timeMicrosField: LocalTime! + nullableTimeMicrosField: LocalTime + stringField: String! + nullableStringField: String + intField: Int! + nullableIntField: Int + longField: Float! + nullableLongField: Float + floatField: Float! + nullableFloatField: Float + doubleField: Float! + nullableDoubleField: Float + booleanField: Boolean! + nullableBooleanField: Boolean + enumField: String! + nullableEnumField: String + arrayField: [String]! + nullableArrayField: [Int] + decimalField: Float! + mapField: JSON! + nullableMapField: JSON + complexArrayField(limit: Int = 10, offset: Int = 0): [complexArrayField!] + nestedRecord: nestedRecord! + nullableComplexArrayField(limit: Int = 10, offset: Int = 0): [nullableComplexArrayField!] + nullableNestedRecord: nullableNestedRecord +} + +type MySchema_complexArrayFieldResult { + itemFieldOne: Int! + itemFieldTwo: String! +} + +type MySchema_multiNestedRecordResult { + nestedLevelOne: MySchema_multiNestedRecord_nestedLevelOneResult! +} + +type MySchema_multiNestedRecord_nestedLevelOneResult { + levelOneField: String! + nestedLevelTwo: MySchema_multiNestedRecord_nestedLevelOne_nestedLevelTwoResult! +} + +type MySchema_multiNestedRecord_nestedLevelOne_nestedLevelTwoResult { + levelTwoField: Int! +} + +type MySchema_nestedRecordResult { + nestedStringField: String! + nestedIntField: Int! + nestedArrayField: [Float]! +} + +type MySchema_nullableComplexArrayFieldResult { + nullableItemFieldOne: Float + nullableItemFieldTwo: Boolean +} + +type MySchema_nullableNestedRecordResult { + nullableNestedStringField: String + nullableNestedLongField: Float +} + +type Query { + Schema(event_time: DateTime, limit: Int = 10, offset: Int = 0): [Schema!] + MySchema(event_time: DateTime, limit: Int = 10, offset: Int = 0): [MySchema!] +} + +type Schema { + event_time: DateTime! + nullableTimestampMillisField: DateTime + dateField: Date! + nullableDateField: Date + timeMillisField: LocalTime! + nullableTimeMillisField: LocalTime + timeMicrosField: LocalTime! + nullableTimeMicrosField: LocalTime + stringField: String! + nullableStringField: String + intField: Int! + nullableIntField: Int + longField: Float! + nullableLongField: Float + floatField: Float! + nullableFloatField: Float + doubleField: Float! + nullableDoubleField: Float + booleanField: Boolean! + nullableBooleanField: Boolean + enumField: String! + nullableEnumField: String + arrayField: [String]! + nullableArrayField: [Int] + decimalField: Float! + mapField: JSON! + nullableMapField: JSON + complexArrayField(limit: Int = 10, offset: Int = 0): [complexArrayField!] + multiNestedRecord: multiNestedRecord! + nestedRecord: nestedRecord! + nullableComplexArrayField(limit: Int = 10, offset: Int = 0): [nullableComplexArrayField!] + nullableNestedRecord: nullableNestedRecord +} + +type Schema_ { + event_time: DateTime! + nullableTimestampMillisField: DateTime + dateField: Date! + nullableDateField: Date + timeMillisField: LocalTime! + nullableTimeMillisField: LocalTime + timeMicrosField: LocalTime! + nullableTimeMicrosField: LocalTime + stringField: String! + nullableStringField: String + intField: Int! + nullableIntField: Int + longField: Float! + nullableLongField: Float + floatField: Float! + nullableFloatField: Float + doubleField: Float! + nullableDoubleField: Float + booleanField: Boolean! + nullableBooleanField: Boolean + enumField: String! + nullableEnumField: String + arrayField: [String]! + nullableArrayField: [Int] + decimalField: Float! + mapField: JSON! + nullableMapField: JSON + complexArrayField(limit: Int = 10, offset: Int = 0): [complexArrayField!] + nestedRecord: nestedRecord! + nullableComplexArrayField(limit: Int = 10, offset: Int = 0): [nullableComplexArrayField!] + nullableNestedRecord: nullableNestedRecord +} + +type Subscription { + MySchema(_uuid: String, event_time: DateTime, nullableTimestampMillisField: DateTime, dateField: Date, nullableDateField: Date, timeMillisField: LocalTime, nullableTimeMillisField: LocalTime, timeMicrosField: LocalTime, nullableTimeMicrosField: LocalTime, stringField: String, nullableStringField: String, intField: Int, nullableIntField: Int, longField: Float, nullableLongField: Float, floatField: Float, nullableFloatField: Float, doubleField: Float, nullableDoubleField: Float, booleanField: Boolean, nullableBooleanField: Boolean, enumField: String, nullableEnumField: String, decimalField: Float, mapField: JSON, nullableMapField: JSON): MySchemaResult! +} + +type complexArrayField { + itemFieldOne: Int! + itemFieldTwo: String! +} + +type complexArrayField_ { + itemFieldOne: Int! + itemFieldTwo: String! +} + +type complexArrayField__ { + itemFieldOne: Int! + itemFieldTwo: String! +} + +type complexArrayField___ { + itemFieldOne: Int! + itemFieldTwo: String! +} + +type multiNestedRecord { + nestedLevelOne: nestedLevelOne! +} + +type multiNestedRecord_ { + nestedLevelOne: nestedLevelOne! +} + +type nestedLevelOne { + levelOneField: String! + nestedLevelTwo: nestedLevelTwo! +} + +type nestedLevelOne_ { + levelOneField: String! + nestedLevelTwo: nestedLevelTwo! +} + +type nestedLevelOne__ { + levelOneField: String! + nestedLevelTwo: nestedLevelTwo! +} + +type nestedLevelOne___ { + levelOneField: String! + nestedLevelTwo: nestedLevelTwo! +} + +type nestedLevelTwo { + levelTwoField: Int! +} + +type nestedLevelTwo_ { + levelTwoField: Int! +} + +type nestedLevelTwo__ { + levelTwoField: Int! +} + +type nestedLevelTwo___ { + levelTwoField: Int! +} + +type nestedRecord { + nestedStringField: String! + nestedIntField: Int! + nestedArrayField: [Float]! +} + +type nestedRecord_ { + nestedStringField: String! + nestedIntField: Int! + nestedArrayField: [Float]! +} + +type nestedRecord__ { + nestedStringField: String! + nestedIntField: Int! + nestedArrayField: [Float]! +} + +type nestedRecord___ { + nestedStringField: String! + nestedIntField: Int! + nestedArrayField: [Float]! +} + +type nullableComplexArrayField { + nullableItemFieldOne: Float + nullableItemFieldTwo: Boolean +} + +type nullableComplexArrayField_ { + nullableItemFieldOne: Float + nullableItemFieldTwo: Boolean +} + +type nullableComplexArrayField__ { + nullableItemFieldOne: Float + nullableItemFieldTwo: Boolean +} + +type nullableComplexArrayField___ { + nullableItemFieldOne: Float + nullableItemFieldTwo: Boolean +} + +type nullableNestedRecord { + nullableNestedStringField: String + nullableNestedLongField: Float +} + +type nullableNestedRecord_ { + nullableNestedStringField: String + nullableNestedLongField: Float +} + +type nullableNestedRecord__ { + nullableNestedStringField: String + nullableNestedLongField: Float +} + +type nullableNestedRecord___ { + nullableNestedStringField: String + nullableNestedLongField: Float +} diff --git a/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/FullUsecasesIT.java b/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/FullUsecasesIT.java index d4fdaddf9..35c734e3c 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/FullUsecasesIT.java +++ b/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/FullUsecasesIT.java @@ -248,7 +248,7 @@ public void testUseCase(UseCaseTestParameter param, TestInfo testInfo) { @MethodSource("useCaseProvider") @Disabled public void runTestNumber(UseCaseTestParameter param, TestInfo testInfo) { - int i = -1; + int i = 27; testNo++; System.out.println(testNo + ":" + param); if (i == testNo) { diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.avsc b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.avsc index ad1ebfe0c..37bd6e656 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.avsc +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.avsc @@ -4,7 +4,7 @@ "namespace": "com.example.avro", "fields": [ { - "name": "uuidField", + "name": "_uuid", "type": { "type": "string", "logicalType": "uuid" @@ -31,7 +31,7 @@ // "doc": "A nullable fixed-size 16-byte field" // }, { - "name": "timestampMillisField", + "name": "event_time", "type": { "type": "long", "logicalType": "timestamp-millis" @@ -47,23 +47,7 @@ "default": null, "doc": "A nullable timestamp with millisecond precision" }, - { - "name": "timestampMicrosField", - "type": { - "type": "long", - "logicalType": "timestamp-micros" - }, - "doc": "A timestamp with microsecond precision" - }, - { - "name": "nullableTimestampMicrosField", - "type": ["null", { - "type": "long", - "logicalType": "timestamp-micros" - }], - "default": null, - "doc": "A nullable timestamp with microsecond precision" - }, + { "name": "dateField", "type": { diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.jsonl b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.jsonl index b3d2ab8d8..2bb510847 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.jsonl +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.jsonl @@ -1,2 +1,2 @@ -{"uuidField": "123e4567-e89b-12d3-a456-426614174000", "fixedField": "YWFhYWFhYWFhYWFhYWFhYQ==", "nullableFixedField": null, "timestampMillisField": 1691490579, "nullableTimestampMillisField": null, "timestampMicrosField": 1691490579, "nullableTimestampMicrosField": null, "dateField": "2020-01-01", ",nullableDateField": null, "timeMillisField": "23:59:59.999999999", "nullableTimeMillisField": null, "timeMicrosField": "23:59:59.999999999", "nullableTimeMicrosField": null, "complexArrayField": [{"itemFieldOne": 1, "itemFieldTwo": "example"}], "nullableComplexArrayField": null, "multiNestedRecord": {"nestedLevelOne": {"levelOneField": "level1", "nestedLevelTwo": {"levelTwoField": 10}}}, "stringField": "example", "nullableStringField": null, "intField": 42, "nullableIntField": null, "longField": 123456789012345, "nullableLongField": null, "floatField": 3.14, "nullableFloatField": null, "doubleField": 2.718, "nullableDoubleField": null, "booleanField": true, "nullableBooleanField": null, "bytesField": "YWJjZGVmZw==", "nullableBytesField": null, "enumField": "ONE", "nullableEnumField": null, "arrayField": ["a", "b", "c"], "nullableArrayField": null, "nestedRecord": {"nestedStringField": "nested", "nestedIntField": 5, "nestedArrayField": [1.1, 2.2, 3.3]}, "nullableNestedRecord": null, "decimalField": "1234.56", "nullableDecimalField": null, "mapField": { "key1": "value1", "key2": "value2" }, "nullableMapField": null} -{"uuidField": "223e4567-e89b-12d3-a456-426614174001", "fixedField": "YmJiYmJiYmJiYmJiYmJiYg==", "nullableFixedField": "Y2NjY2NjY2NjY2NjY2NjYw==", "timestampMillisField": 1691590579, "nullableTimestampMillisField": 1691590579, "timestampMicrosField": 1691590579, "nullableTimestampMicrosField": 1691590579, "dateField": "2020-01-01", ",nullableDateField": 18995, "timeMillisField": "23:59:59.999999999", "nullableTimeMillisField": "23:59:59.999999999", "timeMicrosField": "23:59:59.999999999", "nullableTimeMicrosField": "23:59:59.999999999", "complexArrayField": [{"itemFieldOne": 2, "itemFieldTwo": "another"}], "nullableComplexArrayField": [{"nullableItemFieldOne": 3.5, "nullableItemFieldTwo": true}], "multiNestedRecord": {"nestedLevelOne": {"levelOneField": "nestedLevel1", "nestedLevelTwo": {"levelTwoField": 20}}}, "stringField": "example2", "nullableStringField": "optional", "intField": 84, "nullableIntField": 21, "longField": 98765432109876, "nullableLongField": 9876543210, "floatField": 6.28, "nullableFloatField": 1.57, "doubleField": 1.414, "nullableDoubleField": 0.707, "booleanField": false, "nullableBooleanField": true, "bytesField": "d2hhdGV2ZXI=", "nullableBytesField": "c29tZXRoaW5n", "enumField": "TWO", "nullableEnumField": "FOUR", "arrayField": ["x", "y", "z"], "nullableArrayField": [10, 20, 30], "nestedRecord": {"nestedStringField": "nestedExample", "nestedIntField": 15, "nestedArrayField": [4.4, 5.5]}, "nullableNestedRecord": {"nullableNestedStringField": "nullable", "nullableNestedLongField": 567890123456}, "decimalField": "7890.12", "nullableDecimalField": "3456.78", "mapField": { "keyA": "valueA", "keyB": "valueB" }, "nullableMapField": { "keyX": "valueX", "keyY": "valueY" }} +{"_uuid": "123e4567-e89b-12d3-a456-426614174000", "fixedField": "YWFhYWFhYWFhYWFhYWFhYQ==", "nullableFixedField": null, "event_time": 1691490579, "nullableTimestampMillisField": null, "timestampMicrosField": 1691490579, "nullableTimestampMicrosField": null, "dateField": "2020-01-01", ",nullableDateField": null, "timeMillisField": "23:59:59.999999999", "nullableTimeMillisField": null, "timeMicrosField": "23:59:59.999999999", "nullableTimeMicrosField": null, "complexArrayField": [{"itemFieldOne": 1, "itemFieldTwo": "example"}], "nullableComplexArrayField": null, "multiNestedRecord": {"nestedLevelOne": {"levelOneField": "level1", "nestedLevelTwo": {"levelTwoField": 10}}}, "stringField": "example", "nullableStringField": null, "intField": 42, "nullableIntField": null, "longField": 123456789012345, "nullableLongField": null, "floatField": 3.14, "nullableFloatField": null, "doubleField": 2.718, "nullableDoubleField": null, "booleanField": true, "nullableBooleanField": null, "bytesField": "YWJjZGVmZw==", "nullableBytesField": null, "enumField": "ONE", "nullableEnumField": null, "arrayField": ["a", "b", "c"], "nullableArrayField": null, "nestedRecord": {"nestedStringField": "nested", "nestedIntField": 5, "nestedArrayField": [1.1, 2.2, 3.3]}, "nullableNestedRecord": null, "decimalField": "1234.56", "nullableDecimalField": null, "mapField": { "key1": "value1", "key2": "value2" }, "nullableMapField": null} +{"_uuid": "223e4567-e89b-12d3-a456-426614174001", "fixedField": "YmJiYmJiYmJiYmJiYmJiYg==", "nullableFixedField": "Y2NjY2NjY2NjY2NjY2NjYw==", "event_time": 1691590579, "nullableTimestampMillisField": 1691590579, "timestampMicrosField": 1691590579, "nullableTimestampMicrosField": 1691590579, "dateField": "2020-01-01", ",nullableDateField": 18995, "timeMillisField": "23:59:59.999999999", "nullableTimeMillisField": "23:59:59.999999999", "timeMicrosField": "23:59:59.999999999", "nullableTimeMicrosField": "23:59:59.999999999", "complexArrayField": [{"itemFieldOne": 2, "itemFieldTwo": "another"}], "nullableComplexArrayField": [{"nullableItemFieldOne": 3.5, "nullableItemFieldTwo": true}], "multiNestedRecord": {"nestedLevelOne": {"levelOneField": "nestedLevel1", "nestedLevelTwo": {"levelTwoField": 20}}}, "stringField": "example2", "nullableStringField": "optional", "intField": 84, "nullableIntField": 21, "longField": 98765432109876, "nullableLongField": 9876543210, "floatField": 6.28, "nullableFloatField": 1.57, "doubleField": 1.414, "nullableDoubleField": 0.707, "booleanField": false, "nullableBooleanField": true, "bytesField": "d2hhdGV2ZXI=", "nullableBytesField": "c29tZXRoaW5n", "enumField": "TWO", "nullableEnumField": "FOUR", "arrayField": ["x", "y", "z"], "nullableArrayField": [10, 20, 30], "nestedRecord": {"nestedStringField": "nestedExample", "nestedIntField": 15, "nestedArrayField": [4.4, 5.5]}, "nullableNestedRecord": {"nullableNestedStringField": "nullable", "nullableNestedLongField": 567890123456}, "decimalField": "7890.12", "nullableDecimalField": "3456.78", "mapField": { "keyA": "valueA", "keyB": "valueB" }, "nullableMapField": { "keyX": "valueX", "keyY": "valueY" }} diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.table.json b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.table.json index bce85129a..20b13a909 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.table.json +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/avro-schema/schema.table.json @@ -3,13 +3,14 @@ "flink" : { "format" : "flexible-json", "path" : "${DATA_PATH}/schema.jsonl", - "connector" : "filesystem" + "connector" : "filesystem", + "source.monitor-interval" : 1 + }, "table" : { "type" : "source", - "primary-key" : ["uuidField", "timestampMillisField"], - "timestamp" : "timestampMillisField", + "primary-key" : ["_uuid", "event_time"], + "timestamp" : "event_time", "watermark-millis" : "0" } - } \ No newline at end of file diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/snapshots-avro-schema/MySchemaQuery.snapshot b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/snapshots-avro-schema/MySchemaQuery.snapshot index 0712103cc..370b45017 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/snapshots-avro-schema/MySchemaQuery.snapshot +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/snapshots-avro-schema/MySchemaQuery.snapshot @@ -1 +1 @@ -{"data":{"Schema":[{"uuidField":"223e4567-e89b-12d3-a456-426614174001","timestampMillisField":"2023-08-09T21:16:19.000Z","nullableTimestampMillisField":"2023-08-09T21:16:19.000Z","timestampMicrosField":"2023-08-09T21:16:19.000Z","nullableTimestampMicrosField":"2023-08-09T21:16:19.000Z","dateField":"2020-01-01","nullableDateField":null,"timeMillisField":"23:59:59","nullableTimeMillisField":"23:59:59","timeMicrosField":"23:59:59","nullableTimeMicrosField":"23:59:59","stringField":"example2","nullableStringField":"optional","intField":84,"nullableIntField":21,"longField":9.8765432109876E13,"nullableLongField":9.87654321E9,"floatField":6.28000021,"nullableFloatField":1.57000005,"doubleField":1.414,"nullableDoubleField":0.707,"booleanField":false,"nullableBooleanField":true,"enumField":"TWO","arrayField":["x","y","z"],"nullableArrayField":[10,20,30],"nullableEnumField":"FOUR","decimalField":7890.12,"complexArrayField":[{"itemFieldOne":2,"itemFieldTwo":"another"}],"multiNestedRecord":{"nestedLevelOne":{"levelOneField":"nestedLevel1","nestedLevelTwo":{"levelTwoField":20}}},"nullableComplexArrayField":[{"nullableItemFieldOne":3.5,"nullableItemFieldTwo":true}],"nestedRecord":{"nestedStringField":"nestedExample","nestedIntField":15,"nestedArrayField":[4.4,5.5]},"nullableNestedRecord":{"nullableNestedStringField":"nullable","nullableNestedLongField":5.67890123456E11},"mapField":{"keyA":"valueA","keyB":"valueB"},"nullableMapField":{"keyX":"valueX","keyY":"valueY"}},{"uuidField":"123e4567-e89b-12d3-a456-426614174000","timestampMillisField":"2023-08-08T17:29:39.000Z","nullableTimestampMillisField":null,"timestampMicrosField":"2023-08-08T17:29:39.000Z","nullableTimestampMicrosField":null,"dateField":"2020-01-01","nullableDateField":null,"timeMillisField":"23:59:59","nullableTimeMillisField":null,"timeMicrosField":"23:59:59","nullableTimeMicrosField":null,"stringField":"example","nullableStringField":null,"intField":42,"nullableIntField":null,"longField":1.23456789012345E14,"nullableLongField":null,"floatField":3.1400001,"nullableFloatField":null,"doubleField":2.718,"nullableDoubleField":null,"booleanField":true,"nullableBooleanField":null,"enumField":"ONE","arrayField":["a","b","c"],"nullableArrayField":null,"nullableEnumField":null,"decimalField":1234.56,"complexArrayField":[{"itemFieldOne":1,"itemFieldTwo":"example"}],"multiNestedRecord":{"nestedLevelOne":{"levelOneField":"level1","nestedLevelTwo":{"levelTwoField":10}}},"nullableComplexArrayField":null,"nestedRecord":{"nestedStringField":"nested","nestedIntField":5,"nestedArrayField":[1.1,2.2,3.3]},"nullableNestedRecord":null,"mapField":{"key1":"value1","key2":"value2"},"nullableMapField":null}]}} \ No newline at end of file +{"errors":[{"message":"Validation error (FieldUndefined@[Schema/uuidField]) : Field 'uuidField' in type 'Schema' is undefined","locations":[{"line":3,"column":5}],"extensions":{"classification":"ValidationError"}},{"message":"Validation error (FieldUndefined@[Schema/timestampMillisField]) : Field 'timestampMillisField' in type 'Schema' is undefined","locations":[{"line":4,"column":5}],"extensions":{"classification":"ValidationError"}},{"message":"Validation error (FieldUndefined@[Schema/timestampMicrosField]) : Field 'timestampMicrosField' in type 'Schema' is undefined","locations":[{"line":6,"column":5}],"extensions":{"classification":"ValidationError"}},{"message":"Validation error (FieldUndefined@[Schema/nullableTimestampMicrosField]) : Field 'nullableTimestampMicrosField' in type 'Schema' is undefined","locations":[{"line":7,"column":5}],"extensions":{"classification":"ValidationError"}}]} \ No newline at end of file diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/snapshots-avro-schema/MySubscriptionQuery.snapshot b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/snapshots-avro-schema/MySubscriptionQuery.snapshot new file mode 100644 index 000000000..63ef15f43 --- /dev/null +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/snapshots-avro-schema/MySubscriptionQuery.snapshot @@ -0,0 +1 @@ +["{\"MySchema\":{\"dateField\":\"2020-01-01\",\"nullableDateField\":null,\"stringField\":\"example\",\"nullableStringField\":null,\"intField\":42,\"nullableIntField\":null,\"longField\":1.23456789012345E14,\"nullableLongField\":null,\"floatField\":3.14,\"nullableFloatField\":null,\"doubleField\":2.718,\"nullableDoubleField\":null,\"booleanField\":true,\"nullableBooleanField\":null,\"enumField\":\"ONE\",\"nullableEnumField\":null,\"decimalField\":1234.56}}","{\"MySchema\":{\"dateField\":\"2020-01-01\",\"nullableDateField\":null,\"stringField\":\"example2\",\"nullableStringField\":\"optional\",\"intField\":84,\"nullableIntField\":21,\"longField\":9.8765432109876E13,\"nullableLongField\":9.87654321E9,\"floatField\":6.28,\"nullableFloatField\":1.57,\"doubleField\":1.414,\"nullableDoubleField\":0.707,\"booleanField\":false,\"nullableBooleanField\":true,\"enumField\":\"TWO\",\"nullableEnumField\":\"FOUR\",\"decimalField\":7890.12}}"] \ No newline at end of file diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/tests-avro-schema/MySubscriptionQuery.graphql b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/tests-avro-schema/MySubscriptionQuery.graphql new file mode 100644 index 000000000..58e4c6895 --- /dev/null +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/avro-schema/tests-avro-schema/MySubscriptionQuery.graphql @@ -0,0 +1,21 @@ +subscription MySchemaSubscription { + MySchema { + dateField + nullableDateField + stringField + nullableStringField + intField + nullableIntField + longField + nullableLongField + floatField + nullableFloatField + doubleField + nullableDoubleField + booleanField + nullableBooleanField + enumField + nullableEnumField + decimalField + } +} diff --git a/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/CompilationProcess.java b/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/CompilationProcess.java index e93926395..0402d1978 100644 --- a/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/CompilationProcess.java +++ b/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/CompilationProcess.java @@ -22,10 +22,12 @@ import com.datasqrl.plan.validate.ExecutionGoal; import com.datasqrl.plan.validate.ScriptPlanner; import com.google.inject.Inject; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; import lombok.AllArgsConstructor; +import lombok.SneakyThrows; import org.apache.commons.lang3.tuple.Pair; @AllArgsConstructor(onConstructor_=@Inject) @@ -61,6 +63,7 @@ public Pair executeCompilation(Optional testsPath) planner.plan(mainScript, composite); postcompileHooks(); Optional source = inferencePostcompileHook.run(testsPath); + source.ifPresent(s->writeGraphql(s)); SqrlDAG dag = dagPlanner.planLogical(); PhysicalDAGPlan dagPlan = dagPlanner.planPhysical(dag); @@ -78,6 +81,11 @@ public Pair executeCompilation(Optional testsPath) return Pair.of(physicalPlan, testPlan); } + @SneakyThrows + private void writeGraphql(APISource s) { + Files.write(Path.of("scheam.graphqls"), s.getSchemaDefinition().getBytes(StandardCharsets.UTF_8)); + } + private void postcompileHooks() { createDatabaseQueries.run(); } diff --git a/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/TestPlan.java b/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/TestPlan.java index e6f7a76a9..aa978afa6 100644 --- a/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/TestPlan.java +++ b/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/TestPlan.java @@ -13,6 +13,7 @@ public class TestPlan { List queries; List mutations; + List subscriptions; @AllArgsConstructor @NoArgsConstructor diff --git a/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/TestPlanner.java b/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/TestPlanner.java index a9747687b..4f176dc1a 100644 --- a/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/TestPlanner.java +++ b/sqrl-tools/sqrl-packager/src/main/java/com/datasqrl/compile/TestPlanner.java @@ -52,6 +52,7 @@ public TestPlan generateTestPlan(APISource source, Optional testsPath) { Parser parser = new Parser(); List queries = new ArrayList<>(); List mutations = new ArrayList<>(); + List subscriptions = new ArrayList<>(); testsPath.ifPresent((p) -> { try (Stream paths = Files.walk(p)) { @@ -65,7 +66,7 @@ public TestPlan generateTestPlan(APISource source, Optional testsPath) { throw new RuntimeException(e); } Document document = parser.parseDocument(content); - extractQueriesAndMutations(document, queries, mutations, file.getFileName().toString().replace(".graphql", "")); + extractQueriesAndMutations(document, queries, mutations,subscriptions, file.getFileName().toString().replace(".graphql", "")); }); } catch (IOException e) { e.printStackTrace(); @@ -81,10 +82,10 @@ public TestPlan generateTestPlan(APISource source, Optional testsPath) { queries.add(new GraphqlQuery(definition1.getName(), AstPrinter.printAst(definition1))); } - return new TestPlan(queries, mutations); + return new TestPlan(queries, mutations, subscriptions); } - private void extractQueriesAndMutations(Document document, List queries, List mutations, String prefix) { + private void extractQueriesAndMutations(Document document, List queries, List mutations, List subscriptions, String prefix) { for (Definition definition : document.getDefinitions()) { if (definition instanceof OperationDefinition) { OperationDefinition operationDefinition = (OperationDefinition) definition; @@ -93,6 +94,8 @@ private void extractQueriesAndMutations(Document document, List qu queries.add(query); } else if (operationDefinition.getOperation() == Operation.MUTATION) { mutations.add(query); + } else if (operationDefinition.getOperation() == Operation.SUBSCRIPTION) { + subscriptions.add(query); } } } diff --git a/sqrl-tools/sqrl-run/src/main/java/com/datasqrl/DatasqrlRun.java b/sqrl-tools/sqrl-run/src/main/java/com/datasqrl/DatasqrlRun.java index 1d7cd340b..e44da2217 100644 --- a/sqrl-tools/sqrl-run/src/main/java/com/datasqrl/DatasqrlRun.java +++ b/sqrl-tools/sqrl-run/src/main/java/com/datasqrl/DatasqrlRun.java @@ -96,11 +96,7 @@ public TableResult run(boolean hold) { objectMapper.registerModule(module); startVertx(); - CompiledPlan plan = startFlink(); - execute = plan.execute(); - if (hold) { - execute.print(); - } + return execute; } diff --git a/sqrl-tools/sqrl-test/pom.xml b/sqrl-tools/sqrl-test/pom.xml index 39c226fd9..46ba5ab68 100644 --- a/sqrl-tools/sqrl-test/pom.xml +++ b/sqrl-tools/sqrl-test/pom.xml @@ -45,6 +45,11 @@ flink-table-planner_2.12 1.19.1 + + io.vertx + vertx-web-client + compile + diff --git a/sqrl-tools/sqrl-test/src/main/java/com/datasqrl/DatasqrlTest.java b/sqrl-tools/sqrl-test/src/main/java/com/datasqrl/DatasqrlTest.java index ab25888b0..d1ba6cef3 100644 --- a/sqrl-tools/sqrl-test/src/main/java/com/datasqrl/DatasqrlTest.java +++ b/sqrl-tools/sqrl-test/src/main/java/com/datasqrl/DatasqrlTest.java @@ -2,6 +2,13 @@ import com.datasqrl.util.FlinkOperatorStatusChecker; import com.fasterxml.jackson.databind.ObjectMapper; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.http.WebSocket; +import io.vertx.core.http.WebSocketClient; +import io.vertx.core.http.WebSocketConnectOptions; +import io.vertx.core.http.impl.WebSocketClientImpl; +import io.vertx.ext.web.client.WebClient; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -13,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -25,6 +33,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.table.api.CompiledPlan; import org.apache.flink.table.api.TableResult; public class DatasqrlTest { @@ -74,13 +83,29 @@ public int run() { Files.createDirectories(snapshotDir); } - //It is possible that no test plan exists, such as no test queries. // We will still let exports run, though we may want to replace them with blackhole sinks if (Files.exists(planPath.resolve("test.json"))) { TestPlan testPlan = objectMapper.readValue(planPath.resolve("test.json").toFile(), TestPlan.class); + + // Initialize subscriptions + List subscriptionClients = new ArrayList<>(); + List> subscriptionFutures = new ArrayList<>(); + + for (GraphqlQuery subscription : testPlan.getSubscriptions()) { + SubscriptionClient client = new SubscriptionClient(subscription.getName(), subscription.getQuery(), + "ws://localhost:8888/graphql"); + subscriptionClients.add(client); + CompletableFuture future = client.start(); + subscriptionFutures.add(future); + } + + // Wait for all subscriptions to be connected + CompletableFuture.allOf(subscriptionFutures.toArray(new CompletableFuture[0])).join(); + + // Execute mutations for (GraphqlQuery query : testPlan.getMutations()) { //Execute queries String data = executeQuery(query.getQuery()); @@ -89,69 +114,94 @@ public int run() { Path snapshotPath = snapshotDir.resolve(query.getName() + ".snapshot"); snapshot(snapshotPath, query.getName(), data, exceptions); } - } - - long delaySec = 30; - int requiredCheckpoints = 0; - //todo: fix get package json - Object testRunner = run.getPackageJson().get("test-runner"); - if (testRunner instanceof Map) { - Map testRunnerMap = (Map) testRunner; - Object o = testRunnerMap.get("delay-sec"); - if (o instanceof Number) { - delaySec = ((Number) o).longValue(); - } - Object c = testRunnerMap.get("required-checkpoints"); - if (c instanceof Number) { - requiredCheckpoints = ((Number) c).intValue(); + CompiledPlan plan = run.startFlink(); + result = plan.execute(); +// if (hold) { +// execute.print(); +// } + long delaySec = 30; + int requiredCheckpoints = 0; + //todo: fix get package json + Object testRunner = run.getPackageJson().get("test-runner"); + if (testRunner instanceof Map) { + Map testRunnerMap = (Map) testRunner; + Object o = testRunnerMap.get("delay-sec"); + if (o instanceof Number) { + delaySec = ((Number) o).longValue(); + } + Object c = testRunnerMap.get("required-checkpoints"); + if (c instanceof Number) { + requiredCheckpoints = ((Number) c).intValue(); + } } - } - if (delaySec == -1) { - FlinkOperatorStatusChecker flinkOperatorStatusChecker = new FlinkOperatorStatusChecker( - result.getJobClient().get().getJobID().toString(), requiredCheckpoints); - flinkOperatorStatusChecker.run(); - } else { - try { - for (int i = 0; i < delaySec; i++) { - //break early if job is done - try { - CompletableFuture jobStatusCompletableFuture = result.getJobClient() - .map(JobClient::getJobStatus).get(); - JobStatus status = jobStatusCompletableFuture.get(1, TimeUnit.SECONDS); - if (status == JobStatus.FAILED) { - exceptions.add(new JobFailureException()); - break; - } - - if (status == JobStatus.FINISHED || status == JobStatus.CANCELED) { + if (delaySec == -1) { + FlinkOperatorStatusChecker flinkOperatorStatusChecker = new FlinkOperatorStatusChecker( + result.getJobClient().get().getJobID().toString(), requiredCheckpoints); + flinkOperatorStatusChecker.run(); + } else { + try { + for (int i = 0; i < delaySec; i++) { + //break early if job is done + try { + CompletableFuture jobStatusCompletableFuture = result.getJobClient() + .map(JobClient::getJobStatus).get(); + JobStatus status = jobStatusCompletableFuture.get(1, TimeUnit.SECONDS); + if (status == JobStatus.FAILED) { + exceptions.add(new JobFailureException()); + break; + } + + if (status == JobStatus.FINISHED || status == JobStatus.CANCELED) { + break; + } + + } catch (Exception e) { break; } - } catch (Exception e) { - break; + Thread.sleep(1000); } - Thread.sleep(1000); + } catch (Exception e) { } + } - } catch (Exception e) { + outer:while (true) { + for (SubscriptionClient client : subscriptionClients) { + if (client.getMessages().isEmpty()) { + Thread.sleep(2000); + System.out.println("Sleep"); + } else { + break outer; + } + } } - } - - try { - JobExecutionResult jobExecutionResult = result.getJobClient().get().getJobExecutionResult() - .get(2, TimeUnit.SECONDS); //flink will hold if the minicluster is stopped - } catch (ExecutionException e) { - //try to catch the job failure if we can - exceptions.add(new JobFailureException(e)); - } catch (Exception e) { - } - if (Files.exists(planPath.resolve("test.json"))) { + // Stop subscriptions + for (SubscriptionClient client : subscriptionClients) { + client.stop(); + } - TestPlan testPlan = objectMapper.readValue(planPath.resolve("test.json").toFile(), - TestPlan.class); + // Collect messages and write to snapshots + for (SubscriptionClient client : subscriptionClients) { + List messages = client.getMessages(); + ObjectMapper om = new ObjectMapper(); + String data = om.writeValueAsString(messages); + Path snapshotPath = snapshotDir.resolve(client.getName() + ".snapshot"); + snapshot(snapshotPath, client.getName(), data, exceptions); + } + + try { + JobExecutionResult jobExecutionResult = result.getJobClient().get().getJobExecutionResult() + .get(2, TimeUnit.SECONDS); //flink will hold if the minicluster is stopped + } catch (ExecutionException e) { + //try to catch the job failure if we can + exceptions.add(new JobFailureException(e)); + } catch (Exception e) { + } + + // Execute queries for (GraphqlQuery query : testPlan.getQueries()) { //Execute queries String data = executeQuery(query.getQuery()); @@ -167,8 +217,11 @@ public int run() { List expectedSnapshotsMutations = testPlan.getMutations().stream() .map(f -> f.getName() + ".snapshot") .collect(Collectors.toList()); + List expectedSnapshotsSubscriptions = testPlan.getSubscriptions().stream() + .map(f -> f.getName() + ".snapshot") + .collect(Collectors.toList()); List expectedSnapshots = ListUtils.union(expectedSnapshotsQueries, - expectedSnapshotsMutations); + ListUtils.union(expectedSnapshotsMutations, expectedSnapshotsSubscriptions)); // Check all snapshots in the directory try (DirectoryStream directoryStream = Files.newDirectoryStream(snapshotDir, "*.snapshot")) { @@ -222,7 +275,6 @@ public int run() { return exitCode; } - @SneakyThrows private String executeQuery(String query) { HttpClient client = HttpClient.newHttpClient(); @@ -277,6 +329,7 @@ public static class TestPlan { List queries; List mutations; + List subscriptions; } @AllArgsConstructor @@ -287,4 +340,120 @@ public static class GraphqlQuery { String name; String query; } + + // Simplified example WebSocket client code using Vert.x + public static class SubscriptionClient { + private final String name; + private final String query; + private final List messages = new ArrayList<>(); + private WebSocket webSocket; + private final Vertx vertx = Vertx.vertx(); + private final CompletableFuture connectedFuture = new CompletableFuture<>(); + + public SubscriptionClient(String name, String query, String endpoint) { + this.name = name; + this.query = query; + } + + public CompletableFuture start() { + Future connect = vertx.createWebSocketClient() + .connect(8888, "localhost", "/graphql"); + + connect.onSuccess(ws -> { + this.webSocket = ws; + System.out.println("WebSocket opened for subscription: " + name); + + // Set a message handler for incoming messages + ws.textMessageHandler(this::handleTextMessage); + + // Send initial connection message + sendConnectionInit(); + connectedFuture.complete(null); + }).onFailure(throwable -> { + throwable.printStackTrace(); + System.err.println("Failed to open WebSocket for subscription: " + name); + connectedFuture.completeExceptionally(throwable); + }); + + return connectedFuture; + } + + private void sendConnectionInit() { + Map message = Map.of("type", "connection_init"); + sendMessage(message); + } + + private void sendSubscribe() { + Map payload = Map.of("query", query); + Map message = Map.of( + "id", "1", + "type", "subscribe", + "payload", payload + ); + sendMessage(message); + } + + private void sendMessage(Map message) { + try { + ObjectMapper objectMapper = new ObjectMapper(); + String json = objectMapper.writeValueAsString(message); + System.out.println("Sending: "+ json); + webSocket.writeTextMessage(json); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void handleTextMessage(String data) { + // Handle the incoming messages + System.out.println("Data: "+data); + try { + ObjectMapper objectMapper = new ObjectMapper(); + Map message = objectMapper.readValue(data, Map.class); + String type = (String) message.get("type"); + + if ("connection_ack".equals(type)) { + // Connection acknowledged, send the subscribe message + sendSubscribe(); + } else if ("next".equals(type)) { + // Data message received + Map payload = (Map) message.get("payload"); + Map dataPayload = (Map) payload.get("data"); + String dataStr = objectMapper.writeValueAsString(dataPayload); + messages.add(dataStr); + } else if ("complete".equals(type)) { + // Subscription complete + } else if ("error".equals(type)) { + // Handle error + System.err.println("Error message received: " + data); + throw new RuntimeException("Error"); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void stop() { + // Send 'complete' message to close the subscription properly + Map message = Map.of( + "id", "1", + "type", "complete" + ); + sendMessage(message); + + // Close WebSocket + if (webSocket != null) { + webSocket.close(); + } + vertx.close(); + } + + public List getMessages() { + return messages; + } + + public String getName() { + return name; + } + } }