diff --git a/src/main/java/com/monitory/data/FlinkApplication.java b/src/main/java/com/monitory/data/FlinkApplication.java index 52be518..9dec0f5 100644 --- a/src/main/java/com/monitory/data/FlinkApplication.java +++ b/src/main/java/com/monitory/data/FlinkApplication.java @@ -1,8 +1,8 @@ package com.monitory.data; import com.monitory.data.sources.MqttSource; +import com.monitory.data.transformations.TimeStampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -16,16 +16,9 @@ public static void main (String [] args) throws Exception { // 2. 데이터 소스 DataStream sourceStream = env.fromSource(new MqttSource(), WatermarkStrategy.noWatermarks(), "MQTT-Source"); - // 3. 데이터 처리: 단순하게 문자열을 대문자로 변환하는 예시 + // 3. 데이터 처리: Time Stamp 출력과 Anomaly 감지 DataStream transformedStream = sourceStream - .map(new MapFunction() { - @Override - public String map(String value) throws Exception { -// Thread.sleep(2000000); - System.out.println("💡 received: " + value); - return value.toUpperCase(); - } - }); + .map(new TimeStampAssigner()); // 4. 데이터 싱크: 콘솔에 출력 transformedStream.print(); diff --git a/src/main/java/com/monitory/data/transformations/TimeStampAssigner.java b/src/main/java/com/monitory/data/transformations/TimeStampAssigner.java new file mode 100644 index 0000000..0dbc561 --- /dev/null +++ b/src/main/java/com/monitory/data/transformations/TimeStampAssigner.java @@ -0,0 +1,20 @@ +package com.monitory.data.transformations; + +import org.apache.flink.api.common.functions.MapFunction; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.time.Instant; +import java.time.LocalDateTime; + +public class TimeStampAssigner implements MapFunction { + + private static final ObjectMapper mapper = new ObjectMapper(); + + @Override + public String map(String value) throws Exception { + ObjectNode jsonNode = (ObjectNode) mapper.readTree(value); + jsonNode.put("time", LocalDateTime.now().toString()); + return mapper.writeValueAsString(jsonNode); + } +}