diff --git a/backend/components/flink-connector/Dockerfile.result-sender b/backend/components/flink-connector/Dockerfile.result-sender
new file mode 100644
index 000000000..6580476fa
--- /dev/null
+++ b/backend/components/flink-connector/Dockerfile.result-sender
@@ -0,0 +1,16 @@
+# Build image for the result-sender binary.
+# NOTE(review): golang:1.17 is end-of-life — consider moving to a supported Go release.
+FROM golang:1.17
+
+WORKDIR /app
+
+# Copy only the sources this binary needs: shared types/helpers plus the entrypoint.
+COPY ./src/types.go ./src/tools.go ./src/result-sender.go ./
+
+# Create a throwaway module and fetch the Kafka / Schema Registry client dependencies.
+RUN go mod init main && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
+ go get golang.org/x/net
+
+# Compile all copied sources (package main) into a single binary named "app".
+RUN go build -o app
+
+CMD ["./app"]
diff --git a/backend/components/flink-connector/Dockerfile.statements-executor b/backend/components/flink-connector/Dockerfile.statements-executor
new file mode 100644
index 000000000..3280b144f
--- /dev/null
+++ b/backend/components/flink-connector/Dockerfile.statements-executor
@@ -0,0 +1,16 @@
+# Build image for the statements-executor binary.
+# NOTE(review): golang:1.17 is end-of-life — consider moving to a supported Go release.
+FROM golang:1.17
+
+WORKDIR /app
+
+# Copy only the sources this binary needs: shared types/helpers plus the entrypoint.
+COPY ./src/types.go ./src/tools.go ./src/statements-executor.go ./
+
+# Create a throwaway module and fetch the Kafka / Schema Registry client dependencies.
+RUN go mod init main && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
+ go get golang.org/x/net
+
+# Compile all copied sources (package main) into a single binary named "app".
+RUN go build -o app
+
+CMD ["./app"]
diff --git a/backend/components/flink-connector/Makefile b/backend/components/flink-connector/Makefile
new file mode 100644
index 000000000..1e7dcec8d
--- /dev/null
+++ b/backend/components/flink-connector/Makefile
@@ -0,0 +1,13 @@
+# Build and publish the two connector images. Each release-* target depends on
+# its build-* target, retags the local image and pushes it to GHCR with the
+# fixed "release" tag (the Helm deployments pull that tag with Always policy).
+build-statements-executor:
+ docker build -t flink-connector/statements-executor -f Dockerfile.statements-executor .
+
+release-statements-executor: build-statements-executor
+ docker tag flink-connector/statements-executor ghcr.io/airyhq/connectors/flink/statements-executor:release
+ docker push ghcr.io/airyhq/connectors/flink/statements-executor:release
+
+build-result-sender:
+ docker build -t flink-connector/result-sender -f Dockerfile.result-sender .
+
+release-result-sender: build-result-sender
+ docker tag flink-connector/result-sender ghcr.io/airyhq/connectors/flink/result-sender:release
+ docker push ghcr.io/airyhq/connectors/flink/result-sender:release
diff --git a/backend/components/flink-connector/helm/BUILD b/backend/components/flink-connector/helm/BUILD
new file mode 100644
index 000000000..45805d5f1
--- /dev/null
+++ b/backend/components/flink-connector/helm/BUILD
@@ -0,0 +1,3 @@
+# Bazel packaging of the Helm chart via the repo's standard helm ruleset.
+load("//tools/build:helm.bzl", "helm_ruleset_core_version")
+
+helm_ruleset_core_version()
diff --git a/backend/components/flink-connector/helm/Chart.yaml b/backend/components/flink-connector/helm/Chart.yaml
new file mode 100644
index 000000000..4eb993fb2
--- /dev/null
+++ b/backend/components/flink-connector/helm/Chart.yaml
@@ -0,0 +1,6 @@
+
+# Helm chart metadata for the flink-connector component.
+apiVersion: v2
+appVersion: "1.0"
+description: Flink connector
+name: flink-connector
+version: 1.0
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/configmap.yaml b/backend/components/flink-connector/helm/templates/configmap.yaml
new file mode 100644
index 000000000..d8301a65d
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/configmap.yaml
@@ -0,0 +1,10 @@
+# Component ConfigMap: carries the Airy component labels/flags. Both
+# deployments mount it via envFrom (configMapRef name = .Values.component).
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: {{ .Values.component }}
+ labels:
+ core.airy.co/managed: "true"
+ core.airy.co/mandatory: "{{ .Values.mandatory }}"
+ core.airy.co/component: "{{ .Values.component }}"
+ annotations:
+ core.airy.co/enabled: "{{ .Values.enabled }}"
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/result-sender/deployment.yaml b/backend/components/flink-connector/helm/templates/result-sender/deployment.yaml
new file mode 100644
index 000000000..0f50fc554
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/result-sender/deployment.yaml
@@ -0,0 +1,50 @@
+# Deployment for the result-sender: consumes the topic in
+# .Values.resultSender.topic and posts Flink results back to conversations.
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: {{ .Values.component }}-{{ .Values.resultSender.name }}
+ labels:
+ app: {{ .Values.component }}
+ core.airy.co/managed: "true"
+ core.airy.co/mandatory: "{{ .Values.mandatory }}"
+ core.airy.co/component: {{ .Values.component }}
+spec:
+ # Scales to zero when the component is disabled.
+ replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
+ selector:
+ matchLabels:
+ app: {{ .Values.component }}-{{ .Values.resultSender.name }}
+ strategy:
+ rollingUpdate:
+ maxSurge: 1
+ maxUnavailable: 1
+ type: RollingUpdate
+ template:
+ metadata:
+ labels:
+ app: {{ .Values.component }}-{{ .Values.resultSender.name }}
+ spec:
+ containers:
+ - name: app
+ image: "ghcr.io/airyhq/{{ .Values.resultSender.image }}:release"
+ imagePullPolicy: Always
+ envFrom:
+ - configMapRef:
+ name: security
+ - configMapRef:
+ name: kafka-config
+ - configMapRef:
+ name: {{ .Values.component }}
+ env:
+ - name: KAFKA_TOPIC_NAME
+ value: {{ .Values.resultSender.topic }}
+ - name: API_COMMUNICATION_URL
+ value: {{ .Values.apiCommunicationUrl }}
+ livenessProbe:
+ httpGet:
+ path: /actuator/health
+ port: {{ .Values.port }}
+ httpHeaders:
+ - name: Health-Check
+ value: health-check
+ # NOTE(review): the Go service binds :80 (src/result-sender.go) while
+ # .Values.port is 8080 — the probe targets a closed port; confirm and align.
+ # NOTE(review): 43200s = 12h initial delay effectively disables the probe
+ # for half a day — confirm this is intentional.
+ initialDelaySeconds: 43200
+ periodSeconds: 10
+ failureThreshold: 3
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/result-sender/service.yaml b/backend/components/flink-connector/helm/templates/result-sender/service.yaml
new file mode 100644
index 000000000..cdf73d72b
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/result-sender/service.yaml
@@ -0,0 +1,15 @@
+# Headless Service (clusterIP: None) exposing the result-sender's health port.
+apiVersion: v1
+kind: Service
+metadata:
+ name: {{ .Values.component }}-{{ .Values.resultSender.name }}
+ labels:
+ app: {{ .Values.component }}-{{ .Values.resultSender.name }}
+spec:
+ type: ClusterIP
+ clusterIP: None
+ ports:
+ - name: {{ .Values.component }}-{{ .Values.resultSender.name }}
+ port: 80
+ # NOTE(review): the container listens on :80 but .Values.port is 8080 —
+ # confirm and align targetPort with the app's listener.
+ targetPort: {{ .Values.port }}
+ selector:
+ app: {{ .Values.component }}-{{ .Values.resultSender.name }}
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/statements-executor/deployment.yaml b/backend/components/flink-connector/helm/templates/statements-executor/deployment.yaml
new file mode 100644
index 000000000..44c7b6e59
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/statements-executor/deployment.yaml
@@ -0,0 +1,50 @@
+# Deployment for the statements-executor: consumes .Values.executor.topic and
+# submits Flink SQL statements to the gateway at .Values.gatewayUrl.
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: {{ .Values.component }}-{{ .Values.executor.name }}
+ labels:
+ app: {{ .Values.component }}
+ core.airy.co/managed: "true"
+ core.airy.co/mandatory: "{{ .Values.mandatory }}"
+ core.airy.co/component: {{ .Values.component }}
+spec:
+ # Scales to zero when the component is disabled.
+ replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
+ selector:
+ matchLabels:
+ app: {{ .Values.component }}-{{ .Values.executor.name }}
+ strategy:
+ rollingUpdate:
+ maxSurge: 1
+ maxUnavailable: 1
+ type: RollingUpdate
+ template:
+ metadata:
+ labels:
+ app: {{ .Values.component }}-{{ .Values.executor.name }}
+ spec:
+ containers:
+ - name: app
+ image: "ghcr.io/airyhq/{{ .Values.executor.image }}:release"
+ imagePullPolicy: Always
+ envFrom:
+ - configMapRef:
+ name: security
+ - configMapRef:
+ name: kafka-config
+ - configMapRef:
+ name: {{ .Values.component }}
+ env:
+ - name: KAFKA_TOPIC_NAME
+ value: {{ .Values.executor.topic }}
+ - name: FLINK_GATEWAY_URL
+ value: {{ .Values.gatewayUrl }}
+ livenessProbe:
+ httpGet:
+ path: /actuator/health
+ port: {{ .Values.port }}
+ httpHeaders:
+ - name: Health-Check
+ value: health-check
+ # NOTE(review): the Go service binds :80 (src/statements-executor.go) while
+ # .Values.port is 8080 — the probe targets a closed port; confirm and align.
+ # NOTE(review): 43200s = 12h initial delay effectively disables the probe
+ # for half a day — confirm this is intentional.
+ initialDelaySeconds: 43200
+ periodSeconds: 10
+ failureThreshold: 3
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/statements-executor/service.yaml b/backend/components/flink-connector/helm/templates/statements-executor/service.yaml
new file mode 100644
index 000000000..3e5fbfc30
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/statements-executor/service.yaml
@@ -0,0 +1,16 @@
+
+# Headless Service (clusterIP: None) exposing the executor's health port.
+apiVersion: v1
+kind: Service
+metadata:
+ name: {{ .Values.component }}-{{ .Values.executor.name }}
+ labels:
+ app: {{ .Values.component }}-{{ .Values.executor.name }}
+spec:
+ type: ClusterIP
+ clusterIP: None
+ ports:
+ - name: {{ .Values.component }}-{{ .Values.executor.name }}
+ port: 80
+ # NOTE(review): the container listens on :80 but .Values.port is 8080 —
+ # confirm and align targetPort with the app's listener.
+ targetPort: {{ .Values.port }}
+ selector:
+ app: {{ .Values.component }}-{{ .Values.executor.name }}
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/values.yaml b/backend/components/flink-connector/helm/values.yaml
new file mode 100644
index 000000000..71f4475a3
--- /dev/null
+++ b/backend/components/flink-connector/helm/values.yaml
@@ -0,0 +1,16 @@
+
+component: flink-connector
+mandatory: false
+enabled: false
+# Port the probes and Services target. The Go services bind :80
+# (http.ListenAndServe(":80", nil) in src/*.go); the previous value of 8080
+# pointed the liveness probes and Service targetPort at a closed port.
+port: 80
+resources:
+gatewayUrl: "http://flink-jobmanager:8083"
+apiCommunicationUrl: "http://api-communication/messages.send"
+executor:
+  name: statements-executor
+  image: connectors/flink/statements-executor
+  topic: flink.statements
+resultSender:
+  name: result-sender
+  image: connectors/flink/result-sender
+  # Must match the producer's hard-coded topic "flink.outputs" in src/tools.go;
+  # the previous value "flink.output" meant the result-sender never received
+  # any messages from the executor.
+  topic: flink.outputs
\ No newline at end of file
diff --git a/backend/components/flink-connector/src/result-sender.go b/backend/components/flink-connector/src/result-sender.go
new file mode 100644
index 000000000..90bfff738
--- /dev/null
+++ b/backend/components/flink-connector/src/result-sender.go
@@ -0,0 +1,158 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+// main runs the result-sender: it consumes FlinkOutput messages from
+// KAFKA_TOPIC_NAME, fetches the corresponding statement result from the
+// configured Flink or Confluent gateway, converts it to Markdown, and posts
+// it to the conversation through the api-communication service.
+func main() {
+
+	// Create Kafka consumer to read the statements
+	kafkaURL := os.Getenv("KAFKA_BROKERS")
+	schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
+	topicName := os.Getenv("KAFKA_TOPIC_NAME")
+	systemToken := os.Getenv("systemToken")
+	authUsername := os.Getenv("AUTH_JAAS_USERNAME")
+	authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
+	flinkProvider := os.Getenv("provider")
+	groupID := "result-sender"
+	msgNormal := false
+	msgDebug := true
+
+	if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
+		fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
+		return
+	}
+
+	// Confluent Cloud connection settings (used when provider != "flink").
+	var confluentConnection ConfluentConnection
+	confluentConnection.Token = os.Getenv("confluentToken")
+	confluentConnection.ComputePoolID = os.Getenv("confluentComputePoolID")
+	confluentConnection.Principal = os.Getenv("confluentPrincipal")
+	confluentConnection.SQLCurrentCatalog = os.Getenv("confluentSQLCurrentCatalog")
+	confluentConnection.SQLCurrentDatabase = os.Getenv("confluentSQLCurrentDatabase")
+
+	// Healthcheck endpoint for the Kubernetes liveness probe.
+	http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]string{"status": "UP"}
+		jsonResponse, err := json.Marshal(response)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		w.Write(jsonResponse)
+	})
+
+	go func() {
+		if err := http.ListenAndServe(":80", nil); err != nil {
+			panic(err)
+		}
+	}()
+
+	fmt.Println("Health-check started")
+
+	// Create Kafka consumer configuration
+	fmt.Println("Creating Kafka consumer for topic: ", topicName)
+
+	c, err := kafka.NewConsumer(&kafka.ConfigMap{
+		"bootstrap.servers": kafkaURL,
+		"group.id":          groupID,
+		"auto.offset.reset": "earliest",
+		"security.protocol": "SASL_SSL",
+		"sasl.mechanisms":   "PLAIN",
+		"sasl.username":     authUsername,
+		"sasl.password":     authPassword,
+	})
+	if err != nil {
+		fmt.Printf("Error creating consumer: %v\n", err)
+		return
+	}
+	// Fix: the subscription error was previously ignored.
+	if err := c.SubscribeTopics([]string{topicName}, nil); err != nil {
+		fmt.Printf("Error subscribing to topic: %v\n", err)
+		return
+	}
+	// Channel for signals
+	signals := make(chan os.Signal, 1)
+	done := make(chan bool, 1)
+
+	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
+
+	go func() {
+		for {
+			select {
+			case sig := <-signals:
+				// If an interrupt signal is received, break the loop
+				fmt.Printf("Caught signal %v: terminating\n", sig)
+				done <- true
+				return
+			default:
+				// NOTE: ReadMessage(-1) blocks indefinitely, so the signal case
+				// above is only reached between messages.
+				msg, err := c.ReadMessage(-1)
+				if err != nil {
+					fmt.Printf("Consumer error: %v\n", err)
+					continue
+				}
+				var flinkOutput FlinkOutput
+				if err := json.Unmarshal(msg.Value, &flinkOutput); err != nil {
+					fmt.Printf("Error unmarshalling message: %v\n", err)
+					continue
+				}
+				fmt.Printf("Received message: %+v\n", flinkOutput)
+
+				flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")
+				confluentGatewayURL := os.Getenv("CONFLUENT_GATEWAY_URL")
+
+				var result FlinkResult
+				var headerConfluent []string
+				var resultConfluent string
+
+				if flinkProvider == "flink" {
+					fmt.Println("Flink gateway: ", flinkGatewayURL)
+					result, err = getFlinkResult(flinkGatewayURL, flinkOutput.SessionID)
+					headerConfluent = []string{}
+				} else {
+					fmt.Println("Flink gateway: ", confluentGatewayURL)
+					fmt.Println("Waiting 20 seconds...")
+					time.Sleep(20 * time.Second)
+					headerConfluent, resultConfluent, err = getFlinkResultConfluent(confluentGatewayURL, flinkOutput.SessionID, confluentConnection)
+				}
+				if err != nil {
+					fmt.Println("Unable to get Flink result:", err)
+					sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
+					// Fix: was `return`, which permanently killed the consumer
+					// goroutine on the first failed result while the pod kept running.
+					continue
+				}
+				if flinkProvider == "flink" {
+					sendMessage("Result retrieved from Flink: "+fmt.Sprintf("%#v", result), flinkOutput.ConversationID, systemToken, msgDebug)
+					sendMessage("Now converting the result to Markdown", flinkOutput.ConversationID, systemToken, msgDebug)
+					response, err := convertResultToMarkdown(result)
+					if err != nil {
+						fmt.Println("Unable to generate Markdown from result:", err)
+						sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
+						sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken, msgNormal)
+						// Fix: keep consuming instead of terminating the goroutine.
+						continue
+					}
+					sendMessage(response, flinkOutput.ConversationID, systemToken, msgNormal)
+				} else {
+					sendMessage("Result retrieved from Flink: "+fmt.Sprintf("%#v", resultConfluent), flinkOutput.ConversationID, systemToken, msgDebug)
+					sendMessage("Now converting the result to Markdown", flinkOutput.ConversationID, systemToken, msgDebug)
+					response, err := convertConfluentResultToMarkdown(headerConfluent, resultConfluent)
+					if err != nil {
+						fmt.Println("Unable to generate Markdown from result:", err)
+						sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
+						sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken, msgNormal)
+						// Fix: keep consuming instead of terminating the goroutine.
+						continue
+					}
+					sendMessage(response, flinkOutput.ConversationID, systemToken, msgNormal)
+				}
+			}
+		}
+	}()
+	<-done
+	c.Close()
+	fmt.Println("Consumer closed")
+}
diff --git a/backend/components/flink-connector/src/statements-executor.go b/backend/components/flink-connector/src/statements-executor.go
new file mode 100644
index 000000000..6ccf2e91b
--- /dev/null
+++ b/backend/components/flink-connector/src/statements-executor.go
@@ -0,0 +1,140 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "os"
+ "os/signal"
+ "syscall"
+
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+// main runs the statements-executor: it consumes FlinkStatementSet messages
+// from KAFKA_TOPIC_NAME, submits the statements to the configured Flink or
+// Confluent gateway, and forwards the resulting session ID to the
+// "flink.outputs" topic for the result-sender to pick up.
+func main() {
+
+	kafkaURL := os.Getenv("KAFKA_BROKERS")
+	schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
+	topicName := os.Getenv("KAFKA_TOPIC_NAME")
+	systemToken := os.Getenv("systemToken")
+	authUsername := os.Getenv("AUTH_JAAS_USERNAME")
+	authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
+	flinkProvider := os.Getenv("provider")
+	groupID := "statement-executor-"
+	msgNormal := false
+	msgDebug := true
+
+	if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
+		fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
+		return
+	}
+
+	// Confluent Cloud connection settings (used when provider != "flink").
+	var confluentConnection ConfluentConnection
+	confluentConnection.Token = os.Getenv("confluentToken")
+	confluentConnection.ComputePoolID = os.Getenv("confluentComputePoolID")
+	confluentConnection.Principal = os.Getenv("confluentPrincipal")
+	confluentConnection.SQLCurrentCatalog = os.Getenv("confluentSQLCurrentCatalog")
+	confluentConnection.SQLCurrentDatabase = os.Getenv("confluentSQLCurrentDatabase")
+
+	// Healthcheck endpoint for the Kubernetes liveness probe.
+	http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]string{"status": "UP"}
+		jsonResponse, err := json.Marshal(response)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		w.Write(jsonResponse)
+	})
+
+	go func() {
+		if err := http.ListenAndServe(":80", nil); err != nil {
+			panic(err)
+		}
+	}()
+
+	fmt.Println("Health-check started")
+
+	// Create Kafka consumer configuration
+	fmt.Println("Creating Kafka consumer for topic: ", topicName)
+
+	c, err := kafka.NewConsumer(&kafka.ConfigMap{
+		"bootstrap.servers": kafkaURL,
+		"group.id":          groupID,
+		"auto.offset.reset": "earliest",
+		"security.protocol": "SASL_SSL",
+		"sasl.mechanisms":   "PLAIN",
+		"sasl.username":     authUsername,
+		"sasl.password":     authPassword,
+	})
+	if err != nil {
+		fmt.Printf("Error creating consumer: %v\n", err)
+		return
+	}
+	// Fix: the subscription error was previously ignored.
+	if err := c.SubscribeTopics([]string{topicName}, nil); err != nil {
+		fmt.Printf("Error subscribing to topic: %v\n", err)
+		return
+	}
+	// Channel for signals
+	signals := make(chan os.Signal, 1)
+	done := make(chan bool, 1)
+
+	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
+
+	go func() {
+		for {
+			select {
+			case sig := <-signals:
+				fmt.Printf("Caught signal %v: terminating\n", sig)
+				done <- true
+				return
+			default:
+				msg, err := c.ReadMessage(-1)
+				if err != nil {
+					fmt.Printf("Consumer error: %v\n", err)
+					continue
+				}
+				var statementSet FlinkStatementSet
+				if err := json.Unmarshal(msg.Value, &statementSet); err != nil {
+					fmt.Printf("Error unmarshalling message: %v\n", err)
+					continue
+				}
+				fmt.Printf("Received message: %+v\n", statementSet)
+
+				flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")
+				confluentGatewayURL := os.Getenv("CONFLUENT_GATEWAY_URL")
+				var sessionID string
+				if flinkProvider == "flink" {
+					sessionID, err = sendFlinkSQL(flinkGatewayURL, statementSet)
+				} else {
+					sessionID, err = sendFlinkSQLConfluent(confluentGatewayURL, statementSet, confluentConnection)
+				}
+
+				if err != nil {
+					fmt.Println("Error running Flink statement:", err)
+					sendMessage("Error: "+err.Error(), statementSet.ConversationID, systemToken, msgDebug)
+					sendMessage("I am sorry, I am unable to answer that question.", statementSet.ConversationID, systemToken, msgNormal)
+					// Fix: was `return`, which permanently killed the consumer
+					// goroutine on the first failed statement while the pod kept running.
+					continue
+				}
+				fmt.Println("Successfully executed the Flink statement.")
+				sendMessage("FlinkSessionID: "+sessionID, statementSet.ConversationID, systemToken, msgDebug)
+				var flinkOutput FlinkOutput
+				flinkOutput.SessionID = sessionID
+				flinkOutput.Question = statementSet.Question
+				flinkOutput.MessageID = statementSet.MessageID
+				flinkOutput.ConversationID = statementSet.ConversationID
+				err = produceFlinkOutput(flinkOutput, kafkaURL, "flink-producer-"+groupID, authUsername, authPassword)
+				if err != nil {
+					fmt.Printf("error producing message to Kafka: %v\n", err)
+					sendMessage("Error: "+err.Error(), statementSet.ConversationID, systemToken, msgDebug)
+					sendMessage("I am sorry, I am unable to answer that question.", statementSet.ConversationID, systemToken, msgNormal)
+					// Fix: don't fall through and report success when producing failed.
+					continue
+				}
+				sendMessage("Message produced to topic: flink.outputs", statementSet.ConversationID, systemToken, msgDebug)
+			}
+		}
+	}()
+	<-done
+	c.Close()
+	fmt.Println("Consumer closed")
+}
diff --git a/backend/components/flink-connector/src/tools.go b/backend/components/flink-connector/src/tools.go
new file mode 100644
index 000000000..be68a6368
--- /dev/null
+++ b/backend/components/flink-connector/src/tools.go
@@ -0,0 +1,503 @@
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+// sendFlinkSQL opens a session on the Flink SQL Gateway at url, substitutes
+// the deployment-specific placeholders in every statement, and submits the
+// statements to the session one by one. It returns the session handle so the
+// result-sender can fetch the query result later.
+func sendFlinkSQL(url string, statementSet FlinkStatementSet) (string, error) {
+	timestamp := time.Now().Unix()
+	strTimestamp := fmt.Sprintf("%d", timestamp)
+	// Placeholders in the statements are replaced with this deployment's
+	// Kafka connection settings.
+	replacements := map[string]string{
+		"{PROPERTIES_GROUP_ID}":          "flink-" + strTimestamp,
+		"{PROPERTIES_BOOTSTRAP_SERVERS}": os.Getenv("KAFKA_BROKERS"),
+		"{PROPERTIES_SASL_JAAS_CONFIG}":  fmt.Sprintf("org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", os.Getenv("AUTH_JAAS_USERNAME"), os.Getenv("AUTH_JAAS_PASSWORD")),
+	}
+	for i, stmt := range statementSet.Statements {
+		for placeholder, value := range replacements {
+			stmt = strings.Replace(stmt, placeholder, value, -1)
+		}
+		statementSet.Statements[i] = stmt
+	}
+	// Fix: was fmt.Println with a format string, which printed "%+v\n" literally.
+	fmt.Printf("Updated StatementSet: %+v\n", statementSet.Statements)
+
+	req, err := http.NewRequest("POST", url+"/v1/sessions/", bytes.NewReader([]byte("")))
+	if err != nil {
+		return "", err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	client := &http.Client{}
+	resp, err := client.Do(req)
+	if err != nil {
+		return "", err
+	}
+	body, err := io.ReadAll(resp.Body)
+	resp.Body.Close()
+	if err != nil {
+		// Fix: was fmt.Println with a format verb; also abort, since an
+		// unreadable body cannot yield a session handle.
+		return "", fmt.Errorf("error reading response body from the API: %w", err)
+	}
+	fmt.Println("Response: ", string(body))
+	var sessionResponse FlinkSessionResponse
+	if err := json.Unmarshal(body, &sessionResponse); err != nil {
+		fmt.Printf("Error unmarshaling message: %v\n", err)
+		return "", err
+	}
+
+	fmt.Println("The Flink session is: ", sessionResponse.SessionHandle)
+	for _, statement := range statementSet.Statements {
+		payload := FlinkSQLRequest{
+			Statement: statement,
+		}
+		payloadBytes, err := json.Marshal(payload)
+		if err != nil {
+			return "", err
+		}
+
+		req, err := http.NewRequest("POST", url+"/v1/sessions/"+sessionResponse.SessionHandle+"/statements/", bytes.NewReader(payloadBytes))
+		if err != nil {
+			return "", err
+		}
+		req.Header.Set("Content-Type", "application/json")
+
+		resp, err := client.Do(req)
+		if err != nil {
+			return "", err
+		}
+		// Fix: close each response body inside the loop; the original used
+		// defer, which kept every body open until the function returned.
+		body, err := io.ReadAll(resp.Body)
+		resp.Body.Close()
+		if err != nil {
+			return "", fmt.Errorf("error reading response body from the API: %w", err)
+		}
+		fmt.Println("Statement submitted. Response: ", string(body))
+		var statementResponse FlinkStatementResponse
+		if err := json.Unmarshal(body, &statementResponse); err != nil {
+			fmt.Printf("Error unmarshaling message: %v\n", err)
+			return "", err
+		}
+		fmt.Printf("Check status on: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionResponse.SessionHandle, statementResponse.OperationHandle)
+	}
+
+	return sessionResponse.SessionHandle, nil
+}
+
+// produceFlinkOutput publishes flinkOutput as JSON to the hard-coded
+// "flink.outputs" topic, keyed by session ID, then blocks for up to 15s
+// flushing the producer.
+// NOTE(review): the groupID parameter is accepted but never used in the
+// producer config; confirm whether it was meant as client.id.
+// NOTE(review): the delivery-report channel is nil, so per-message delivery
+// errors are never observed — failure is only bounded by the Flush timeout.
+func produceFlinkOutput(flinkOutput FlinkOutput, kafkaURL, groupID, authUsername, authPassword string) error {
+
+ kafkaTopic := "flink.outputs"
+
+ flinkOutputJSON, err := json.Marshal(flinkOutput)
+ if err != nil {
+ return fmt.Errorf("error marshaling query to JSON: %w", err)
+ }
+
+ // SASL auth is only configured when both credentials are present, so the
+ // same binary works against an unauthenticated local broker.
+ configMap := kafka.ConfigMap{
+ "bootstrap.servers": kafkaURL,
+ }
+ if authUsername != "" && authPassword != "" {
+ configMap.SetKey("security.protocol", "SASL_SSL")
+ configMap.SetKey("sasl.mechanisms", "PLAIN")
+ configMap.SetKey("sasl.username", authUsername)
+ configMap.SetKey("sasl.password", authPassword)
+ }
+
+ producer, err := kafka.NewProducer(&configMap)
+ if err != nil {
+ return fmt.Errorf("failed to create producer: %w", err)
+ }
+ defer producer.Close()
+
+ // Produce the message
+ message := kafka.Message{
+ TopicPartition: kafka.TopicPartition{Topic: &kafkaTopic, Partition: kafka.PartitionAny},
+ Key: []byte(flinkOutput.SessionID),
+ Value: flinkOutputJSON,
+ }
+
+ err = producer.Produce(&message, nil)
+ if err != nil {
+ return fmt.Errorf("failed to produce message: %w", err)
+ }
+ fmt.Println("message scheduled for production")
+ // Wait up to 15s for the librdkafka queue to drain before closing.
+ producer.Flush(15 * 1000)
+ fmt.Println("message flushed")
+ return nil
+}
+
+// sendMessage posts a text message to the given conversation via the
+// api-communication /messages.send endpoint, authenticated with systemToken.
+// debug marks the message as a debug message in its payload. It returns the
+// HTTP status code and the ID of the created message.
+func sendMessage(message string, conversationId string, systemToken string, debug bool) (int, string, error) {
+ messageContent := messageContent{
+ Text: message,
+ Debug: debug,
+ }
+ messageToSend := ApplicationCommunicationSendMessage{
+ ConversationID: conversationId,
+ Message: messageContent,
+ }
+ messageJSON, err := json.Marshal(messageToSend)
+ if err != nil {
+ fmt.Printf("Error encoding response to JSON: %v\n", err)
+ return 0, "", errors.New("The message could not be encoded to JSON for sending.")
+ }
+
+ // NOTE(review): the endpoint is hard-coded here although the deployment
+ // also passes API_COMMUNICATION_URL — presumably these should agree; verify.
+ req, err := http.NewRequest("POST", "http://api-communication/messages.send", bytes.NewReader(messageJSON))
+ if err != nil {
+ fmt.Printf("Error creating request: %v\n", err)
+ return 0, "", errors.New("The message could not be sent.")
+ }
+ req.Header.Add("Authorization", "Bearer "+systemToken)
+ req.Header.Add("Content-Type", "application/json")
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ fmt.Printf("Error sending POST request: %v\n", err)
+ return 0, "", errors.New("Error sending POST request.")
+ }
+ defer resp.Body.Close()
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println("Error reading response body:", err)
+ return 0, "", errors.New("Error reading response body.")
+ }
+
+ var response SendMessageResponse
+ err = json.Unmarshal(body, &response)
+ if err != nil {
+ fmt.Println("Error unmarshaling response:", err)
+ return 0, "", errors.New("Response couldn't be unmarshaled.")
+ }
+
+ fmt.Printf("Message sent with status code: %d\n", resp.StatusCode)
+ return resp.StatusCode, response.ID, nil
+}
+
+// sendFlinkSQLConfluent submits the first statement of statementSet to the
+// Confluent Cloud Flink REST API under a timestamped statement name, and
+// returns that name so the result-sender can poll for the result.
+// NOTE(review): only Statements[0] is submitted on this provider path;
+// additional statements in the set are silently ignored — confirm intended.
+func sendFlinkSQLConfluent(url string, statementSet FlinkStatementSet, connection ConfluentConnection) (string, error) {
+	timestamp := time.Now().Unix()
+	strTimestamp := fmt.Sprintf("%d", timestamp)
+	statementName := "airy-" + strTimestamp
+	payload := ConfluentFlink{
+		Name: statementName,
+		Spec: ConfluentFlinkSpec{
+			Statement:     statementSet.Statements[0],
+			ComputePoolID: connection.ComputePoolID,
+			Principal:     connection.Principal,
+			Properties: FlinkSpecProperties{
+				SQLCurrentCatalog:  connection.SQLCurrentCatalog,
+				SQLCurrentDatabase: connection.SQLCurrentDatabase,
+			},
+			Stopped: false,
+		},
+	}
+	payloadBytes, err := json.Marshal(payload)
+	if err != nil {
+		return "", err
+	}
+
+	req, err := http.NewRequest("POST", url, bytes.NewReader(payloadBytes))
+	if err != nil {
+		return "", err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Add("Authorization", "Basic "+connection.Token)
+
+	client := &http.Client{}
+	resp, err := client.Do(req)
+	if err != nil {
+		return "", err
+	}
+	body, err := io.ReadAll(resp.Body)
+	resp.Body.Close()
+	if err != nil {
+		// Fix: was fmt.Println with a format verb, which printed "%v" literally.
+		fmt.Printf("Error reading response body from the API: %v\n", err)
+	}
+	fmt.Println("Statement submitted. Response: ", string(body))
+	// The response is decoded only to validate it; the caller polls by name.
+	var statementResponse ConfluentFlinkStatementResponse
+	if err := json.Unmarshal(body, &statementResponse); err != nil {
+		fmt.Printf("Error unmarshaling message: %v\n", err)
+		return "", err
+	}
+	fmt.Printf("Check status on: %s/%s\n", url, statementName)
+
+	return statementName, nil
+}
+
+// getFlinkResult submits "select * from output;" to the given SQL-Gateway
+// session, waits 20 seconds for the job to produce data, and fetches the
+// first result page of the operation. Gateway-reported errors are joined
+// into a single error value.
+func getFlinkResult(url, sessionID string) (FlinkResult, error) {
+	fmt.Println("The Flink session is: ", sessionID)
+	payload := FlinkSQLRequest{
+		Statement: "select * from output;",
+	}
+	payloadBytes, err := json.Marshal(payload)
+	if err != nil {
+		return FlinkResult{}, err
+	}
+
+	req, err := http.NewRequest("POST", url+"/v1/sessions/"+sessionID+"/statements/", bytes.NewReader(payloadBytes))
+	if err != nil {
+		return FlinkResult{}, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	client := &http.Client{}
+	resp, err := client.Do(req)
+	if err != nil {
+		return FlinkResult{}, err
+	}
+	body, err := io.ReadAll(resp.Body)
+	resp.Body.Close()
+	if err != nil {
+		// Fix: was fmt.Println with a format verb, which printed "%v" literally.
+		fmt.Printf("Error reading response body from the API: %v\n", err)
+	}
+	fmt.Println("Statement submitted. Response: ", string(body))
+	var statementResponse FlinkStatementResponse
+	if err := json.Unmarshal(body, &statementResponse); err != nil {
+		fmt.Printf("Error unmarshaling message: %v\n", err)
+		return FlinkResult{}, err
+	}
+
+	fmt.Printf("Fetching result from: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionID, statementResponse.OperationHandle)
+	// Fixed delay to give the Flink job time to produce its first result page.
+	time.Sleep(20 * time.Second)
+	req, err = http.NewRequest("GET", url+"/v1/sessions/"+sessionID+"/operations/"+statementResponse.OperationHandle+"/result/0", nil)
+	if err != nil {
+		return FlinkResult{}, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err = client.Do(req)
+	if err != nil {
+		return FlinkResult{}, err
+	}
+	body, err = io.ReadAll(resp.Body)
+	resp.Body.Close()
+	if err != nil {
+		fmt.Printf("Error reading response body from the API: %v\n", err)
+	}
+	// Fix: this log previously (and misleadingly) repeated "Statement submitted".
+	fmt.Println("Result fetched. Response: ", string(body))
+	var flinkResultResponse FlinkResultResponse
+	if err := json.Unmarshal(body, &flinkResultResponse); err != nil {
+		fmt.Printf("Error unmarshaling message: %v\n", err)
+		return FlinkResult{}, err
+	}
+
+	if flinkResultResponse.Errors != nil {
+		statementError := errors.New(strings.Join(flinkResultResponse.Errors, ","))
+		return FlinkResult{}, statementError
+	}
+	return flinkResultResponse.Results, nil
+}
+
+// markdown passes the message through unchanged; it exists as a hook for
+// future Markdown post-processing of outgoing messages.
+func markdown(s string) (string, error) {
+	return s, nil
+}
+
+// convertResultToMarkdown renders a FlinkResult as a Markdown table: one
+// header row with the column names, a separator row, then one row per data
+// record. It fails when the result carries no columns.
+func convertResultToMarkdown(result FlinkResult) (string, error) {
+	if len(result.Columns) == 0 {
+		return "", errors.New("No columns found for generating the Markdown table.")
+	}
+
+	var table strings.Builder
+
+	// Header row: "| col1 | col2 |"
+	for _, column := range result.Columns {
+		table.WriteString("| " + column.Name + " ")
+	}
+	table.WriteString("|\n")
+
+	// Separator row: "|---|---|"
+	for i := 0; i < len(result.Columns); i++ {
+		table.WriteString("|---")
+	}
+	table.WriteString("|\n")
+
+	// One table row per record, cells rendered with %v.
+	for _, record := range result.Data {
+		for _, value := range record.Fields {
+			table.WriteString(fmt.Sprintf("| %v ", value))
+		}
+		table.WriteString("|\n")
+	}
+
+	return table.String(), nil
+}
+
+// getFlinkResultConfluent fetches the status of the named Confluent Flink
+// statement (sessionID) and, when it is RUNNING or COMPLETED, retrieves its
+// result rows. It returns the column names, the rows serialized as JSON, and
+// an error for any failed phase. If the first results page is empty, one
+// follow-up request to the "next" URL is attempted.
+func getFlinkResultConfluent(url, sessionID string, connection ConfluentConnection) ([]string, string, error) {
+ req, err := http.NewRequest("GET", url+"/"+sessionID, bytes.NewReader([]byte("")))
+ if err != nil {
+ return []string{}, "", err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Add("Authorization", "Basic "+connection.Token)
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ return []string{}, "", err
+ }
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ // NOTE(review): Println with a format verb prints "%v" literally; Printf intended.
+ fmt.Println("Error reading response body from the API: %v", err)
+ }
+ fmt.Println("Statement submitted. Response: ", string(body))
+ var statementResponse ConfluentFlinkStatementResponse
+ if err := json.Unmarshal(body, &statementResponse); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return []string{}, "", err
+ }
+ fmt.Printf("Received result for statement: %s\n", sessionID)
+ fmt.Println("Phase: ", statementResponse.Status.Phase, " Detail: ", statementResponse.Status.Detail)
+ defer resp.Body.Close()
+
+ // Results are only fetchable while the statement is running or done.
+ if statementResponse.Status.Phase == "RUNNING" || statementResponse.Status.Phase == "COMPLETED" {
+ columns, err := getColumnNames(statementResponse.Status.ResultSchema)
+ if err != nil {
+ fmt.Println("Extracting of the column names failed.")
+ return []string{}, "", err
+ }
+ // First results page.
+ req, err := http.NewRequest("GET", url+"/"+sessionID+"/results", bytes.NewReader([]byte("")))
+ if err != nil {
+ return []string{}, "", err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Add("Authorization", "Basic "+connection.Token)
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ return []string{}, "", err
+ }
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println("Error reading response body from the API: %v", err)
+ }
+ fmt.Println("Statement submitted. Response: ", string(body))
+ var result ConfluentFlinkResultsResponse
+ if err := json.Unmarshal(body, &result); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return []string{}, "", err
+ }
+ nextResult := result.Metadata.Next
+ fmt.Println("Next result: ", nextResult)
+ fmt.Println("Result: ", result.Results.Data)
+ // NOTE(review): the err returned by dataToString is ignored here — a
+ // marshaling failure is indistinguishable from an empty page and silently
+ // falls into the retry branch below.
+ data, err := dataToString(result.Results.Data)
+ if data != "" {
+ return columns, data, nil
+ } else {
+ // Empty first page: follow the pagination link once. Only one retry is
+ // attempted; a still-empty second page is returned as-is.
+ req, err := http.NewRequest("GET", nextResult, bytes.NewReader([]byte("")))
+ if err != nil {
+ return []string{}, "", err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Add("Authorization", "Basic "+connection.Token)
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ return []string{}, "", err
+ }
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println("Error reading response body from the API: %v", err)
+ }
+ fmt.Println("Statement submitted. Response: ", string(body))
+ var result ConfluentFlinkResultsResponse
+ if err := json.Unmarshal(body, &result); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return []string{}, "", err
+ }
+ data, err := dataToString(result.Results.Data)
+ return columns, data, err
+ }
+ } else {
+ err := errors.New("Flink statement failed. Status: " + statementResponse.Status.Phase)
+ return []string{}, "", err
+ }
+}
+
+// dataToString serializes a non-empty slice to its JSON text. For nil input,
+// non-slice input, or an empty slice it returns "" with no error.
+func dataToString(data interface{}) (string, error) {
+	slice, ok := data.([]interface{})
+	if !ok || len(slice) == 0 {
+		return "", nil
+	}
+	encoded, err := json.Marshal(data)
+	if err != nil {
+		return "", err
+	}
+	return string(encoded), nil
+}
+
+// convertConfluentResultToMarkdown renders a Confluent result payload
+// (jsonStr: a JSON array of objects with a "row" cell list) as a Markdown
+// table, using headerNames as the column headers.
+func convertConfluentResultToMarkdown(headerNames []string, jsonStr string) (string, error) {
+	var rows []ConfluentDataRow
+	if err := json.Unmarshal([]byte(jsonStr), &rows); err != nil {
+		return "", err
+	}
+
+	var out strings.Builder
+
+	headerLine := generateMarkdownHeader(headerNames)
+	out.WriteString(headerLine)
+	out.WriteString("\n")
+
+	// One "| --- " group per column in the header line.
+	columnCount := strings.Count(headerLine, "|") - 1
+	out.WriteString(strings.Repeat("| --- ", columnCount) + "|")
+	out.WriteString("\n")
+
+	for _, row := range rows {
+		out.WriteString("|")
+		for _, cell := range row.Row {
+			out.WriteString(" " + cell + " |")
+		}
+		out.WriteString("\n")
+	}
+
+	return out.String(), nil
+}
+
+// extractColumnNames unmarshals a ConfluentResultSchema JSON document and
+// returns its column names in order.
+func extractColumnNames(jsonStr string) ([]string, error) {
+	var schema ConfluentResultSchema
+	if err := json.Unmarshal([]byte(jsonStr), &schema); err != nil {
+		return nil, err
+	}
+
+	var names []string
+	for i := range schema.Columns {
+		names = append(names, schema.Columns[i].Name)
+	}
+	return names, nil
+}
+
+// generateMarkdownHeader builds the Markdown table header row
+// ("| a | b |") from the given column names; empty input yields "|".
+func generateMarkdownHeader(columnNames []string) string {
+	var b strings.Builder
+	for _, name := range columnNames {
+		b.WriteString("| ")
+		b.WriteString(name)
+		b.WriteString(" ")
+	}
+	b.WriteString("|")
+	return b.String()
+}
+
+// ResultsToString flattens the schema's column names into a single
+// comma-separated string.
+func ResultsToString(rs ConfluentResultSchema) string {
+	var names []string
+	for i := range rs.Columns {
+		names = append(names, rs.Columns[i].Name)
+	}
+	return strings.Join(names, ", ")
+}
+
+// getColumnNames returns the schema's column names in order. The error
+// result is always nil; the signature matches its call sites.
+func getColumnNames(schema ConfluentResultSchema) ([]string, error) {
+	var names []string
+	for i := range schema.Columns {
+		names = append(names, schema.Columns[i].Name)
+	}
+	return names, nil
+}
diff --git a/backend/components/flink-connector/src/types.go b/backend/components/flink-connector/src/types.go
new file mode 100644
index 000000000..c67ee3944
--- /dev/null
+++ b/backend/components/flink-connector/src/types.go
@@ -0,0 +1,137 @@
+package main
+
+// ApplicationCommunicationSendMessage is the request payload for sending a
+// message into an Airy conversation.
+type ApplicationCommunicationSendMessage struct {
+ ConversationID string `json:"conversation_id"`
+ Message messageContent `json:"message"`
+ Metadata map[string]string `json:"metadata"`
+}
+
+// messageContent is the text body of an outbound message; Debug marks
+// diagnostic messages.
+type messageContent struct {
+ Text string `json:"text"`
+ Debug bool `json:"debug"`
+}
+
+// SendMessageResponse is the reply returned after a send-message request.
+type SendMessageResponse struct {
+ ID string `json:"id"`
+ State string `json:"state"`
+}
+
+// FlinkOutput carries the correlation fields (session, originating question,
+// message and conversation IDs) attached to a Flink result.
+type FlinkOutput struct {
+ SessionID string `json:"session_id"`
+ Question string `json:"question"`
+ MessageID string `json:"message_id"`
+ ConversationID string `json:"conversation_id"`
+}
+
+// FlinkSQLRequest is the body posted to the Flink SQL gateway to run one
+// statement.
+type FlinkSQLRequest struct {
+ Statement string `json:"statement"`
+}
+
+// FlinkSessionResponse is the gateway's reply to a session-creation request.
+type FlinkSessionResponse struct {
+ SessionHandle string `json:"sessionHandle"`
+}
+
+// FlinkStatementResponse is the gateway's reply to a statement submission.
+type FlinkStatementResponse struct {
+ OperationHandle string `json:"operationHandle"`
+}
+
+// Column describes one column of a Flink result set, including its logical
+// SQL type.
+type Column struct {
+ Name string `json:"name"`
+ LogicalType struct {
+ Type string `json:"type"`
+ Nullable bool `json:"nullable"`
+ Length int `json:"length,omitempty"`
+ } `json:"logicalType"`
+ Comment interface{} `json:"comment"`
+}
+
+// Data is one row of a Flink result: a row kind plus its field values.
+type Data struct {
+ Kind string `json:"kind"`
+ Fields []interface{} `json:"fields"`
+}
+
+// FlinkResult is the tabular payload of a result response: column metadata
+// plus data rows.
+type FlinkResult struct {
+ Columns []Column `json:"columns"`
+ RowFormat string `json:"rowFormat"`
+ Data []Data `json:"data"`
+}
+
+// FlinkResultResponse is the gateway's paged result envelope; NextResultUri
+// points at the following page when more results are available.
+type FlinkResultResponse struct {
+ ResultType string `json:"resultType"`
+ IsQueryResult bool `json:"isQueryResult"`
+ JobID string `json:"jobID"`
+ ResultKind string `json:"resultKind"`
+ Results FlinkResult `json:"results"`
+ NextResultUri string `json:"nextResultUri"`
+ Errors []string `json:"errors"`
+}
+
+// ConfluentFlink is the request body for creating a named Confluent Cloud
+// Flink statement.
+type ConfluentFlink struct {
+ Name string `json:"name"`
+ Spec ConfluentFlinkSpec `json:"spec"`
+}
+
+// ConfluentFlinkSpec holds the statement text and the execution settings
+// (compute pool, principal, catalog/database properties).
+type ConfluentFlinkSpec struct {
+ Statement string `json:"statement"`
+ ComputePoolID string `json:"compute_pool_id"`
+ Principal string `json:"principal"`
+ Properties FlinkSpecProperties `json:"properties"`
+ Stopped bool `json:"stopped"`
+}
+
+// FlinkSpecProperties selects the default catalog and database for a
+// statement.
+type FlinkSpecProperties struct {
+ SQLCurrentCatalog string `json:"sql.current-catalog"`
+ SQLCurrentDatabase string `json:"sql.current-database"`
+}
+
+// ConfluentFlinkStatementResponse is the API's view of a submitted statement.
+type ConfluentFlinkStatementResponse struct {
+ Name string `json:"name"`
+ Status ConfluentFlinkStatementStatus `json:"status"`
+}
+
+// ConfluentFlinkStatementStatus reports the statement's lifecycle phase and
+// the schema of its result.
+type ConfluentFlinkStatementStatus struct {
+ Detail string `json:"detail"`
+ Phase string `json:"phase"`
+ ResultSchema ConfluentResultSchema `json:"result_schema"`
+}
+
+// ConfluentResultSchema lists the result's column names.
+type ConfluentResultSchema struct {
+ Columns []struct {
+ Name string `json:"name"`
+ } `json:"columns"`
+}
+
+// ConfluentFlinkResultsResponse is the paged results envelope returned by the
+// Confluent statements API.
+type ConfluentFlinkResultsResponse struct {
+ Metadata ResultResponseMetadata `json:"metadata"`
+ Results ResultResponseResults `json:"results"`
+}
+
+// ResultResponseMetadata carries paging links for a results response; Next is
+// the URL of the following page.
+type ResultResponseMetadata struct {
+ CreatedAt string `json:"created_at"`
+ Next string `json:"next"`
+ Self string `json:"self"`
+}
+
+// ResultResponseResults wraps the raw data payload of a results page.
+type ResultResponseResults struct {
+ Data interface{} `json:"data"`
+}
+
+// ConfluentDataRow is one change-log row: an operation code plus its cell
+// values (see convertConfluentResultToMarkdown).
+type ConfluentDataRow struct {
+ Op int `json:"op"`
+ Row []string `json:"row"`
+}
+
+// FlinkStatementSet groups the SQL statements generated for one user question
+// together with its correlation IDs.
+type FlinkStatementSet struct {
+ Statements []string `json:"statements"`
+ Question string `json:"question"`
+ MessageID string `json:"message_id"`
+ ConversationID string `json:"conversation_id"`
+}
+
+// ConfluentConnection bundles the credentials and defaults used to talk to
+// Confluent Cloud.
+type ConfluentConnection struct {
+ Token string
+ ComputePoolID string
+ Principal string
+ SQLCurrentCatalog string
+ SQLCurrentDatabase string
+}
diff --git a/docs/docs/getting-started/components.md b/docs/docs/getting-started/components.md
index c358383c5..2e5b32d58 100644
--- a/docs/docs/getting-started/components.md
+++ b/docs/docs/getting-started/components.md
@@ -96,5 +96,6 @@ Here is a list of the open source components which can be added to `Airy Core`:
- sources-twilio
- sources-viber
- sources-whatsapp
+- flink-connector
More information about the components API can be found [here](/api/endpoints/components).
diff --git a/frontend/control-center/src/components/ChannelAvatar/index.tsx b/frontend/control-center/src/components/ChannelAvatar/index.tsx
index b1d6c33bf..6894e2c24 100644
--- a/frontend/control-center/src/components/ChannelAvatar/index.tsx
+++ b/frontend/control-center/src/components/ChannelAvatar/index.tsx
@@ -27,6 +27,7 @@ import {ReactComponent as MosaicAvatar} from 'assets/images/icons/mosaic.svg';
import {ReactComponent as WeaviateAvatar} from 'assets/images/icons/weaviate.svg';
import {ReactComponent as GmailAvatar} from 'assets/images/icons/gmail.svg';
import {ReactComponent as SlackAvatar} from 'assets/images/icons/slack.svg';
+import {ReactComponent as FlinkAvatar} from 'assets/images/icons/flink.svg';
import {Channel, Source} from 'model';
import styles from './index.module.scss';
@@ -135,6 +136,8 @@ export const getChannelAvatar = (source: string) => {
return ;
case 'Slack connector':
return ;
+    case 'Flink connector':
+      return <FlinkAvatar />;
default:
return ;
diff --git a/lib/typescript/assets/images/icons/flink.svg b/lib/typescript/assets/images/icons/flink.svg
new file mode 100644
index 000000000..fb63bd841
--- /dev/null
+++ b/lib/typescript/assets/images/icons/flink.svg
@@ -0,0 +1 @@
+
\ No newline at end of file