Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ services:
- ia-mqtt-broker
security_opt:
- no-new-privileges
privileged: true
read_only: true
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:9092/kapacitor/v1/ping || exit 1"]
Expand Down Expand Up @@ -121,6 +122,23 @@ services:
- ./time_series_analytics_microservice/tick_scripts/:/tmp/windturbine_anomaly_detector/tick_scripts/
- ./time_series_analytics_microservice/config.json:/app/config.json
- ./time_series_analytics_microservice/models/:/tmp/windturbine_anomaly_detector/models/
- /dev/dri:/dev/dri
- "/run/udev:/run/udev:ro"
device_cgroup_rules:
# Default run - device-cgroup-rule='c 189:* rmw'
# Selective rules can be applied for deployment
- 'c 189:* rmw'
- 'c 209:* rmw'
- 'a 189:* rwm'
devices:
# Following devices under /dev filesystem will be needed based on usecase
# dri - GPU
- "/dev/dri:/dev/dri"
group_add:
# render group ID for ubuntu 20.04 host OS
- "109"
# render group ID for ubuntu 22.04 host OS
- "110"

ia-grafana:
image: grafana/grafana-oss:12.0.0-ubuntu
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
#

""" Custom user defined function for anomaly detection on the windturbine speed and generated power data. """

import os
import sys
import json
Expand All @@ -18,16 +15,17 @@
import tempfile
import pickle
import math
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearnex import patch_sklearn
patch_sklearn()
# import paho.mqtt.client as mqtt
import modin.pandas as pd
import datetime
import time
import requests
import numpy as np
from sklearnex import patch_sklearn, config_context
patch_sklearn()
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor


# from gcp_mqtt_client import get_client

Expand All @@ -43,7 +41,6 @@
)

logger = logging.getLogger()


# Anomaly detection on the windturbine speed and generated power data
class AnomalyDetectorHandler(Handler):
Expand Down Expand Up @@ -166,7 +163,8 @@ def process_the_point(x,y):
check_for_anomalies = process_the_point(x,y)
point.fieldsDouble.add(key = "analytic", value = True)
if (check_for_anomalies):
y_pred = self.rf.predict(np.reshape(x,(-1,1)))
with config_context(target_offload="gpu"):
y_pred = self.rf.predict(np.reshape(x,(-1,1)))
error = (y_pred[0]-y)/(y)
if (error>self.error_threshold):
self.last_states.append(1)
Expand All @@ -180,8 +178,9 @@ def process_the_point(x,y):
x_feat = np.reshape(x_feat, (-1,1))
y_feat = list(zip(*self.last_anomalies))[1]

lm = LinearRegression()
lm.fit(x_feat, y_feat)
with config_context(target_offload="gpu"):
lm = LinearRegression()
lm.fit(x_feat, y_feat)

if (abs(lm.coef_)<200):
is_anomaly = 1
Expand All @@ -197,10 +196,28 @@ def process_the_point(x,y):
point.fieldsDouble.add(key = "anomaly_status", value = 1)
else:
self.last_states.append(0)

else:
logger.error(f"No input received for {self.x_name} {x}, {self.y_name} {y}. Skipping anomaly detection.")
point.fieldsDouble.add(key = "analytic", value = False)
#temp = None
# for kv in point.fieldsDouble:
# if kv.key == "temperature":
# temp = kv.value
# break
# if temp is None or isinstance(temp, (int, float)) is False:
# logger.error(f"Invalid temperature data received - {temp}")
# else:
# logger.debug(f"Received temperature point data {temp}")
# if temp < 20 or temp > 25:
#response = udf_pb2.Response()
#response.point.CopyFrom(point)
#logger.info(f"Temperature {temp} is outside the range 20-25.")
# #self._agent.write_response(response, True)
# #while (True):
# X_lr = np.random.random((5000, 100)).astype(np.float32)
# y_lr = np.random.random(5000).astype(np.float32)
# with config_context(target_offload="gpu"):
# lr = LinearRegression().fit(X_lr, y_lr)

# write data back to db if it is an anomaly point or there is an alarm for the point
response = udf_pb2.Response()
Expand Down