Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
eli64s committed Mar 31, 2023
1 parent 1288a44 commit 5b3f6fe
Show file tree
Hide file tree
Showing 10 changed files with 1,459 additions and 5 deletions.
Binary file added .DS_Store
Binary file not shown.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Data Stream Processing with Apache Flink

---

1,001 changes: 1,001 additions & 0 deletions data/data.csv

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pandas
asyncio
aiohttp
aioresponses
Expand Down
20 changes: 20 additions & 0 deletions scripts/clean.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
# Remove editor backups, caches, and build artifacts from the repository tree.
# Safe to run repeatedly; every command is a no-op when nothing matches.

# sed/autopep8-style backup files
find . -type f -name "*.py-e" -delete

# macOS Finder metadata
find . -type f -name "*.DS_Store" -ls -delete

# Python bytecode and cache directories
find . -type f -name "*.py[co]" -delete
find . -type d -name "__pycache__" -exec rm -rf {} +

# Build artifacts
rm -rf build/ dist/ *.egg-info/

# Jupyter notebook checkpoints
find . -type d -name ".ipynb_checkpoints" -exec rm -rf {} +

# Pytest cache
rm -rf .pytest_cache/

# Log files
find . -type f -name "*.log" -delete
30 changes: 30 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
setup.py

Package installation script. Runtime dependencies are read from
requirements.txt; docs/style/test tooling is exposed as extras.
"""
from pathlib import Path

from setuptools import find_namespace_packages, setup

BASE_DIR = Path(__file__).parent

# Read runtime requirements, skipping blank lines and comment lines so
# stray whitespace in requirements.txt does not become a bogus requirement.
with open(Path(BASE_DIR, "requirements.txt"), "r") as file:
    required_packages = [
        ln.strip()
        for ln in file.readlines()
        if ln.strip() and not ln.strip().startswith("#")
    ]

docs_packages = ["mkdocs==1.3.0", "mkdocstrings==0.18.1"]
style_packages = ["black==22.3.0", "flake8==3.9.2", "isort==5.10.1"]
test_packages = ["pytest==7.1.2", "pytest-cov==2.10.1", "great-expectations==0.15.15"]

setup(
    name="STREAM-ON",
    # Version must be a string; the original passed the float 0.1.
    version="0.1",
    description="",
    author="",
    author_email="",
    url="i",  # NOTE(review): placeholder value — fill in the project URL.
    python_requires=">=3.7",
    packages=find_namespace_packages(),
    # Pass the list itself; the original wrapped it in another list
    # ([required_packages]), which setuptools does not accept.
    install_requires=required_packages,
    extras_require={
        "dev": docs_packages + style_packages + test_packages + ["pre-commit==2.19.0"],
        "test": test_packages,
    },
)
44 changes: 39 additions & 5 deletions setup/setup.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,43 @@
#!/bin/bash
# Environment bootstrap: installs Java 11 and Python 3.7 via apt, creates a
# conda env with the Flink Python bindings, and downloads a local Flink
# distribution.
#
# NOTE(review): `conda activate` and the `export`s below only affect this
# script's own shell. Run as `source setup/setup.sh` if the environment
# must persist in the caller's session.

# Check for Java 11 installation
if [[ -z $(java -version 2>&1 | grep "version \"11") ]]; then
    echo "Java 11 not found. Installing..."
    sudo apt-get update
    sudo apt-get install openjdk-11-jdk -y
fi

# Check for Python 3.7 installation
if [[ -z $(python3.7 -V 2>&1 | grep "Python 3.7") ]]; then
    echo "Python 3.7 not found. Installing..."
    sudo apt-get update
    sudo apt-get install python3.7 -y
fi

# Create and activate the conda environment, then install the Python bindings.
conda create -n pyflink python=3.8 -y
conda activate pyflink
python -m pip install apache-flink pyflink

# Download and extract PyFlink.
# The URL must be quoted: unquoted, the '&' backgrounds wget and the shell
# then tries to run 'filename=...' as a separate command.
echo "Downloading PyFlink..."
wget "https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=flink/flink-1.16.1/flink-1.16.1-bin-scala_2.12.tgz" -O flink.tgz
echo "Extracting PyFlink..."
tar -xzf flink.tgz
rm flink.tgz
mv flink-* pyflink

# Set environment variables
echo "Setting environment variables..."
export FLINK_CONDA_HOME=$(pwd)/pyflink
export PATH=$FLINK_CONDA_HOME/bin:$PATH
export PYTHONPATH=$FLINK_CONDA_HOME/python:$PYTHONPATH

# Set aliases for zsh
echo "Setting aliases for zsh..."
echo 'alias flink="flink run -p 1 -c"' >> ~/.zshrc
echo 'alias flink-submit="flink run -p 1 -m yarn-cluster -ynm PyFlink"' >> ~/.zshrc
echo 'alias flink-stop="flink cancel PyFlink"' >> ~/.zshrc

# Done!
echo "PyFlink setup complete."
102 changes: 102 additions & 0 deletions src/alerts_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""REST API alert handler for the Flink consumer."""

import aiohttp
import asyncio
import io
import threading
import unittest.mock as mock
from typing import Dict, List

import fastavro

from logger import Logger

ALERTS_BUFFER = []  # Pending alerts awaiting a batched send.
ALERTS_BUFFER_LOCK = threading.Lock()  # Guards all access to ALERTS_BUFFER.
API_URL = "http://example.com/api/alert"  # Endpoint that receives alert batches.
LOGGER = Logger("alert")  # Project logger (see logger module).
# Avro record schema used by serialize_alerts() to encode each alert dict.
SCHEMA = {
    "type": "record",
    "name": "AlertData",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "year", "type": "string"},
        {"name": "make", "type": "string"},
        {"name": "model", "type": "string"},
        {"name": "price", "type": "double"},
    ],
}


async def send_alerts(alerts: List[Dict]) -> None:
    """
    Sends alerts to the API using aiohttp in batches.

    The original implementation patched ``aiohttp.ClientSession.post`` with
    ``unittest.mock`` inside the production path, so no HTTP request was ever
    actually sent; the mock has been removed so the POST really executes.

    Parameters
    ----------
    alerts : list
        List of dictionaries containing the alerts to be sent. Each dict is
        expected to carry an "id" key (used only for logging here).
    """
    try:
        ids = ", ".join(str(alert["id"]) for alert in alerts)
        LOGGER.info(f"Sending {len(alerts)} total alerts...")

        # Serialize once, outside the request context.
        payload = serialize_alerts(alerts)

        async with aiohttp.ClientSession() as session:
            async with session.post(API_URL, data=payload) as response:
                response.raise_for_status()
                LOGGER.info(
                    f"API POST request completed for ids: {ids}\n"
                )
    except Exception as ex:
        # Best-effort delivery: log and continue rather than crash the consumer.
        LOGGER.error(f"Failed to send alerts: {ex}", exc_info=True)


def send_alerts_batch() -> None:
    """
    Sends all the alerts in the buffer to the API in batches.

    The buffer is swapped out under the lock, but the actual network send
    happens after the lock is released so producers calling buffer_alerts()
    are not blocked for the duration of the HTTP request.
    """
    global ALERTS_BUFFER
    with ALERTS_BUFFER_LOCK:
        # Atomically take ownership of the pending alerts and reset the buffer.
        pending, ALERTS_BUFFER = ALERTS_BUFFER, []
    # Network I/O happens outside the critical section.
    asyncio.run(send_alerts(pending))


async def buffer_alerts(alerts: List[Dict]) -> None:
    """
    Adds alerts to a buffer and sends them to the API in batches.

    Parameters
    ----------
    alerts : list
        List of dictionaries containing the alerts to be sent.
    """
    global ALERTS_BUFFER
    with ALERTS_BUFFER_LOCK:
        ALERTS_BUFFER += alerts
        # Once the buffer reaches the batch threshold, hand the flush off
        # to a worker thread so this coroutine returns immediately.
        if len(ALERTS_BUFFER) >= 10:
            threading.Thread(target=send_alerts_batch).start()


def serialize_alerts(alerts: List[Dict]) -> bytes:
    """
    Serializes alerts using Apache Avro.

    Parameters
    ----------
    alerts : list
        List of dictionaries containing the alerts to be serialized.

    Returns
    -------
    bytes
        Serialized alerts in bytes format.
    """
    stream = io.BytesIO()
    # fastavro writes an Avro container (header + records) into the stream.
    fastavro.writer(stream, SCHEMA, alerts)
    return bytes(stream.getbuffer())
Loading

0 comments on commit 5b3f6fe

Please sign in to comment.