refactor for dataflow
tkorays committed Nov 26, 2022
1 parent 4a92c91 commit 46fcb1e
Showing 15 changed files with 179 additions and 118 deletions.
5 changes: 4 additions & 1 deletion Coffee/Core/LogWatchDog.py
@@ -3,8 +3,10 @@
from Coffee.Core.LogTail import LogTail
from Coffee.Core.Utils import merge_datetime
from Coffee.Data import (
DataSink, DataPoint, DataLoader, PatternGroupBuilder, PatternGroup, RegexPattern
PatternGroupBuilder, PatternGroup, RegexPattern
)
from Coffee.Data.DataFlow import DataPoint, DataSink, DataLoader
from datetime import datetime
import re
import os
@@ -119,6 +121,7 @@ def on_line(self, filename: str, line: str):
    def start(self):
        self.observer.schedule(self.event_handler, self.path)
        self.observer.start()
        print('watching...')
        try:
            while self.observer.is_alive():
                pass
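One note on the loop above: "while ... pass" busy-waits at 100% CPU. A gentler variant is sketched below, assuming self.observer is a watchdog Observer, which subclasses threading.Thread and therefore supports join() with a timeout:

    try:
        while self.observer.is_alive():
            self.observer.join(1)  # block up to 1s per iteration instead of spinning
    except KeyboardInterrupt:
        self.observer.stop()
        self.observer.join()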
96 changes: 96 additions & 0 deletions Coffee/Data/DataFlow.py
@@ -0,0 +1,96 @@
# Copyright 2022 tkorays. All Rights Reserved.
# Licensed to MIT under a Contributor Agreement.
import abc
from abc import ABC
from dataclasses import dataclass
from datetime import datetime


@dataclass
class DataPoint:
    """
    Represent a single data record.
    """
    # type of this datapoint
    name: str
    # timestamp of this datapoint
    timestamp: datetime
    # data values in key-value format
    value: dict
    # which keys should be a tag
    tags: list
    # metadata of this datapoint
    meta: dict

    def timestamp_ms(self) -> int:
        return int(self.timestamp.timestamp() * 1000)

    def timestamp_s(self) -> int:
        return int(self.timestamp.timestamp())

    @staticmethod
    def make_meta_datapoint(meta):
        return DataPoint("", datetime.now(), {}, [], meta)


class DataSink(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def on_data(self, datapoint: DataPoint) -> DataPoint:
        """
        Input source data to sink.
        :param datapoint: data
        """
        pass

    @abc.abstractmethod
    def finish(self, datapoint: DataPoint) -> DataPoint:
        """
        No data anymore. This is the last call for processing data.
        :param datapoint: data
        """
        pass


class DataSource(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def add_sink(self, sink: DataSink):
        """
        Add a data consumer to the data source.
        :param sink: data consumer to consume the output data.
        """
        pass

    @abc.abstractmethod
    def start(self):
        """
        Bootstrap the data generator.
        """
        pass


class DataLoader(DataSource, DataSink, ABC):
    """
    Load data from some source, and feed datapoints to all sinks.
    """
    def __init__(self):
        self.sinks = []

    def add_sink(self, sink: DataSink):
        self.sinks.append(sink)
        return self

    def start(self):
        pass

    def on_data(self, datapoint: DataPoint) -> DataPoint:
        for s in self.sinks:
            datapoint = s.on_data(datapoint)
        return datapoint

    def finish(self, datapoint: DataPoint) -> DataPoint:
        for s in self.sinks:
            datapoint = s.finish(datapoint)
        return datapoint
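The new DataFlow.py gathers the pipeline primitives (DataPoint, DataSink, DataSource, DataLoader) that this commit pulls out of DataPoint.py and DataProcessor.py. A minimal sketch of how they compose; PrintSink is illustrative and not part of the commit:

    from datetime import datetime
    from Coffee.Data.DataFlow import DataPoint, DataSink, DataLoader


    class PrintSink(DataSink):
        # Illustrative sink: echo each datapoint, then pass it downstream unchanged.
        def on_data(self, datapoint: DataPoint) -> DataPoint:
            print(datapoint.name, datapoint.value)
            return datapoint

        def finish(self, datapoint: DataPoint) -> DataPoint:
            return datapoint


    loader = DataLoader()
    loader.add_sink(PrintSink())
    point = DataPoint('cpu', datetime.now(), {'load': 0.42}, ['host'], {})
    loader.on_data(point)   # fans the datapoint out through every registered sink
    loader.finish(point)    # last call: lets sinks flush any remaining state

Because each sink returns a (possibly transformed) DataPoint, sinks chain: the output of one becomes the input of the next in registration order.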
30 changes: 1 addition & 29 deletions Coffee/Data/DataLoader.py
@@ -5,7 +5,6 @@
The Tao begets One; One begets Two; Two begets Three; Three begets all things.
All things carry yin and embrace yang, and by blending these breaths they reach harmony.
"""
from abc import ABC
from datetime import datetime
from rich.console import Console
from rich.progress import Progress
@@ -14,34 +13,7 @@
from Coffee.Data.DataPattern import RegexPattern, PatternGroupBuilder
from Coffee.Core.Utils import merge_datetime
from Coffee.Data.DataPattern import PatternGroup
from Coffee.Data.DataProcessor import DataSink
from Coffee.Data.DataProcessor import DataSource
from Coffee.Data.DataPoint import DataPoint


class DataLoader(DataSource, DataSink, ABC):
    """
    Load data from some source, and feed datapoints to all sinks.
    """
    def __init__(self):
        self.sinks = []

    def add_sink(self, sink: DataSink):
        self.sinks.append(sink)
        return self

    def start(self):
        pass

    def on_data(self, datapoint: DataPoint) -> DataPoint:
        for s in self.sinks:
            datapoint = s.on_data(datapoint)
        return datapoint

    def finish(self, datapoint: DataPoint) -> DataPoint:
        for s in self.sinks:
            datapoint = s.finish(datapoint)
        return datapoint
from Coffee.Data.DataFlow import DataPoint, DataLoader


class LogFileDataLoader(DataLoader, PatternGroupBuilder):
4 changes: 4 additions & 0 deletions Coffee/Data/DataPattern.py
@@ -144,6 +144,10 @@ def run_tests(self):
return success_cnt == len(self.tests)


class GrokPattern(PatternInterface):
    pass


class PatternGroupBuilder:
"""
build a pattern group
29 changes: 0 additions & 29 deletions Coffee/Data/DataPoint.py

This file was deleted.

12 changes: 8 additions & 4 deletions Coffee/Data/DataStore.py
@@ -5,6 +5,7 @@
import os
import pickle
import h5py
import numpy as np
from Coffee.Core.Settings import DEF_CFG


@@ -135,6 +136,9 @@ def fetch(self, type_id: str, cache_id: str):


class HDF5DataStore(DataStore):
    # how to store VLEN bytes in HDF5:
    # https://docs.h5py.org/en/latest/special.html?highlight=VLEN#h5py.vlen_dtype

    def __init__(self, path):
        self.path = path
        self.hdf5 = h5py.File(self.path, "a")
@@ -157,14 +161,14 @@ def add(self, type_id: str, cache_id: str, data):
else:
if cache_id not in dataset.attrs.keys():
return True
dataset.attrs[cache_id] = pickle.dumps(data).hex()
dataset.attrs[cache_id] = np.void(pickle.dumps(data))
self.hdf5.update()
return True

def update(self, type_id: str, cache_id: str, data):
dataset = self.hdf5.get(f"{type_id}")
if dataset:
dataset.attrs[cache_id] = pickle.dumps(data).hex()
dataset.attrs[cache_id] = np.void(pickle.dumps(data))
self.hdf5.update()
return True

@@ -174,7 +178,7 @@ def update_or_add(self, type_id: str, cache_id: str, data):
self.hdf5.create_dataset(f'{type_id}', data='')
dataset = self.hdf5.get(f"{type_id}")

dataset.attrs[cache_id] = pickle.dumps(data).hex()
dataset.attrs[cache_id] = np.void(pickle.dumps(data))
self.hdf5.update()
return True

@@ -184,7 +188,7 @@ def fetch(self, type_id: str, cache_id: str):
return None
if cache_id not in dataset.attrs.keys():
return None
return pickle.loads(bytes.fromhex(dataset.attrs[cache_id]))
return pickle.loads(dataset.attrs[cache_id].tobytes())


DEF_DATA_STORE = FileSystemDataStore(DEF_CFG.data_store_path)
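Switching the attribute encoding from pickle-hex strings to np.void stores the pickled payload as opaque binary, the mechanism the h5py special-types docs describe for raw byte attributes, and it roughly halves the on-disk size versus hex text. A self-contained sketch of the round-trip; the file and key names are illustrative:

    import pickle
    import h5py
    import numpy as np

    with h5py.File('store.h5', 'a') as f:
        if 'my_type' not in f:
            f.create_dataset('my_type', data='')
        ds = f['my_type']
        ds.attrs['cache_1'] = np.void(pickle.dumps({'a': 1}))   # stored as opaque bytes
        restored = pickle.loads(ds.attrs['cache_1'].tobytes())  # back to {'a': 1}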
5 changes: 5 additions & 0 deletions Coffee/Data/DataViz.py
@@ -20,6 +20,11 @@ def influxdb_ts_sql(sql, interval: str = '1s', fill: str = 'none'):
return f'{sql} AND $timeFilter GROUP BY time({interval}) fill({fill})'


def add_influxdb_source():
    # TODO: register an InfluxDB data source via "http://{server_addr}/api/datasources"
    pass
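For reference, a sketch of what this stub might grow into, using Grafana's POST /api/datasources HTTP endpoint; the signature, token handling, and InfluxDB address below are assumptions, not part of this commit:

    import requests


    def add_influxdb_source(server_addr, api_key, database):
        # Sketch: register an InfluxDB v1 data source with Grafana.
        # All parameters are hypothetical; adjust to the real deployment.
        resp = requests.post(
            f'http://{server_addr}/api/datasources',
            headers={'Authorization': f'Bearer {api_key}'},
            json={
                'name': database,
                'type': 'influxdb',
                'url': 'http://localhost:8086',  # assumed InfluxDB address
                'access': 'proxy',
                'database': database,
            },
        )
        resp.raise_for_status()
        return resp.json()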


class GrafanaDashboardBuilder:
"""
Utils class for creating dashboard for grafana.
3 changes: 1 addition & 2 deletions Coffee/Data/PatternMatchReport.py
@@ -1,8 +1,7 @@
# Copyright 2022 tkorays. All Rights Reserved.
# Licensed to MIT under a Contributor Agreement.

from Coffee.Data.DataProcessor import DataSink
from Coffee.Data.DataPoint import DataPoint
from Coffee.Data.DataFlow import DataPoint, DataSink
import click


52 changes: 11 additions & 41 deletions Coffee/Data/DataProcessor.py → Coffee/Data/Processors.py
@@ -1,32 +1,20 @@
# Copyright 2022 tkorays. All Rights Reserved.
# Licensed to MIT under a Contributor Agreement.

import abc

from Coffee.Data.DataPoint import DataPoint
"""
this file provides some data processors:
* bypass processor
* influxdb processor
* time tracker
* data aggregator
"""

from Coffee.Data.DataFlow import DataPoint
from Coffee.Data.Database import InfluxDBV1
from Coffee.Core.Utils import randstr
from datetime import datetime, timedelta
from Coffee.Data.DataFlow import DataSink


class DataSink(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def on_data(self, datapoint: DataPoint) -> DataPoint:
        """
        Input source data to sink.
        :param datapoint: data
        """
        pass

    @abc.abstractmethod
    def finish(self, datapoint: DataPoint) -> DataPoint:
        """
        No data anymore. This is the last call for processing data.
        :param datapoint: data
        """
        pass


class BypassDataSink(DataSink):
@@ -37,24 +25,6 @@ def finish(self, datapoint: DataPoint) -> DataPoint:
return datapoint


class DataSource(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def add_sink(self, sink: DataSink):
        """
        Add data consumer to the data source.
        :param sink: data consumer to consume the output data.
        """
        pass

    @abc.abstractmethod
    def start(self):
        """
        Bootstrap the data generator.
        """
        pass

class InfluxDBDataSink(DataSink):
"""
track the min and max time for range selection.
8 changes: 4 additions & 4 deletions Coffee/Data/__init__.py
@@ -4,16 +4,16 @@
DEF_TSDB
)
from .DataLoader import (
DataLoader, LogFileDataLoader
LogFileDataLoader
)
from .DataModel import DataModel
from .DataPattern import (
PatternInterface, RegexPattern,
PatternGroup, PatternGroupBuilder
)
from .DataPoint import DataPoint
from .DataProcessor import (
DataSource, DataSink, DataAggregator, DatapointTimeTracker,
from .DataFlow import DataPoint, DataSink, DataSource, DataLoader
from .Processors import (
DataAggregator, DatapointTimeTracker,
InfluxDBDataSink
)
from .DataStore import (
2 changes: 2 additions & 0 deletions examples/log_extract.py
@@ -2,6 +2,8 @@
import pandas as pd
import matplotlib.pyplot as plt

from Coffee.Data import DataPoint


class DataAggregator1(DataAggregator):
    def on_data(self, datapoint: DataPoint) -> DataPoint:
6 changes: 2 additions & 4 deletions requirements.txt
@@ -11,7 +11,5 @@ flask
pid
python-daemon
django
# celery
# django-rest-framework
# redis
h5py
ddt
5 changes: 1 addition & 4 deletions setup.py
@@ -34,11 +34,8 @@
'pid',
'python-daemon',
'django',
# 'celery',
# 'redis',
'h5py',
# 'pandas',
# 'matplotlib'
'ddt',
],
dependency_links=[
],
Empty file.