diff --git a/kafka2s3/.gitignore b/kafka2s3/.gitignore new file mode 100644 index 0000000..9924a99 --- /dev/null +++ b/kafka2s3/.gitignore @@ -0,0 +1,289 @@ +# Logs +logs +*.log +npm-debug.log* +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Diagnostic reports (https://nodejs.org/api/report.html) +report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json + +# Runtime data +pids +*.pid +*.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage +*.lcov + +# nyc test coverage +.nyc_output + +# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# Bower dependency directory (https://bower.io/) +bower_components + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (https://nodejs.org/api/addons.html) +build/Release + +# Dependency directories +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) +web_modules/ + +# TypeScript cache +*.tsbuildinfo + +# Optional npm cache directory +.npm + +# Optional eslint cache +.eslintcache + +# Optional stylelint cache +.stylelintcache + +# Microbundle cache +.rpt2_cache/ +.rts2_cache_cjs/ +.rts2_cache_es/ +.rts2_cache_umd/ + +# Optional REPL history +.node_repl_history + +# Output of 'npm pack' +*.tgz + +# Yarn Integrity file +.yarn-integrity + +# dotenv environment variable files +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# parcel-bundler cache (https://parceljs.org/) +.cache +.parcel-cache + +# Next.js build output +.next +out + +# Nuxt.js build / generate output +.nuxt +dist + +# Gatsby files +.cache/ +# Comment in the public line in if your project uses Gatsby and not Next.js +# https://nextjs.org/blog/next-9-1#public-directory-support +# public + +# vuepress build output +.vuepress/dist + +# vuepress v2.x temp and cache directory +.temp +.cache + +# Docusaurus cache and generated 
files +.docusaurus + +# Serverless directories +.serverless/ + +# FuseBox cache +.fusebox/ + +# DynamoDB Local files +.dynamodb/ + +# TernJS port file +.tern-port + +# Stores VSCode versions used for testing VSCode extensions +.vscode-test + +# yarn v2 +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. 
# Helper targets for the kafka2s3 demo: create Decodable streams/connections,
# run the crypto poller, and produce/consume Kafka messages with kcat.
#
# Variables such as PG_PASSWORD, PG_HOST, PG_DB, PG_USER, BOOTSTRAP, TOPIC and
# STREAM are expected to come from .env (included below) or the command line.

include .env

# None of these targets produce a file with the target's name; declaring them
# phony keeps a stray file called e.g. `run` or `db` from disabling the target.
.PHONY: crypto_desc crypto_raw crypto_raw_table run copy.stream postgres_cdc \
	db list.topics publish subscribe

# NOTE(review): the ARRAY element type below looks truncated ("ARRAY>") —
# restore the full type before re-enabling this target.
# crypto_raw_array:
# 	decodable stream create \
# 	--name crypto_raw_array \
# 	--field records="ARRAY>"

# Lookup stream fed by the postgres_cdc connection.
# NOTE(review): sql/pipeline.sql selects crypto_desc.description, which is not
# declared here — confirm which of the two schemas is correct.
crypto_desc:
	decodable stream create \
	--name crypto_desc \
	--field id="STRING primary key" \
	--field currency=STRING

# Append stream holding the raw ticker records produced by crypto.py.
# NOTE(review): the `1d` / `30d` ROW types carry no field list; this looks
# truncated — confirm the intended ROW<...> definition (see sample.json).
crypto_raw:
	decodable stream create \
	--name crypto_raw \
	--field id=STRING \
	--field currency=STRING \
	--field symbol=STRING \
	--field name=STRING \
	--field logo_url=STRING \
	--field status=STRING \
	--field price=STRING \
	--field price_date=STRING \
	--field price_timestamp=STRING \
	--field circulating_supply=STRING \
	--field max_supply=STRING \
	--field market_cap=STRING \
	--field market_cap_dominance=STRING \
	--field num_exchanges=STRING \
	--field num_pairs=STRING \
	--field num_pairs_unmapped=STRING \
	--field first_candle=STRING \
	--field first_trade=STRING \
	--field first_order_book=STRING \
	--field rank=STRING \
	--field rank_delta=STRING \
	--field high=STRING \
	--field high_timestamp=STRING \
	--field 1d="ROW" \
	--field 30d="ROW"

# Same schema as crypto_raw, but keyed on `id` (primary key).
# NOTE(review): same truncated ROW types as crypto_raw.
crypto_raw_table:
	decodable stream create \
	--name crypto_raw_table \
	--field id="STRING primary key" \
	--field currency=STRING \
	--field symbol=STRING \
	--field name=STRING \
	--field logo_url=STRING \
	--field status=STRING \
	--field price=STRING \
	--field price_date=STRING \
	--field price_timestamp=STRING \
	--field circulating_supply=STRING \
	--field max_supply=STRING \
	--field market_cap=STRING \
	--field market_cap_dominance=STRING \
	--field num_exchanges=STRING \
	--field num_pairs=STRING \
	--field num_pairs_unmapped=STRING \
	--field first_candle=STRING \
	--field first_trade=STRING \
	--field first_order_book=STRING \
	--field rank=STRING \
	--field rank_delta=STRING \
	--field high=STRING \
	--field high_timestamp=STRING \
	--field 1d="ROW" \
	--field 30d="ROW"


# Start the poller that publishes ticker data to Kafka.
run:
	python crypto.py

# Print the schema of the stream named by STREAM (make copy.stream STREAM=<name>).
# The original recipe ended in a dangling `| \`, which made the shell fail on
# an unterminated pipeline; the pipeline now ends at `jq '.schema'`.
copy.stream:
	decodable stream get $(shell decodable stream list -o json | jq -sr '.[] |select(.name=="${STREAM}")|.id ' ) -o json | \
		jq '.schema'


# Create the Postgres CDC source connection that feeds the crypto_desc stream.
# NOTE(review): hostname, database and user are hard-coded here while the `db`
# target reads PG_HOST / PG_DB / PG_USER — consider using the same variables.
postgres_cdc:
	decodable conn create \
	--name crypto_desc_postgres_cdc \
	--connector postgres-cdc \
	--type source \
	--stream-id $(shell decodable stream list -o json | jq -sr '.[] |select(.name=="crypto_desc")|.id ' ) \
	--prop database-name=postgres \
	--prop decoding.plugin.name=pgoutput \
	--prop hostname=hubert-demo.culgt83fyssj.us-west-2.rds.amazonaws.com \
	--prop port=5432 \
	--prop schema-name=public \
	--prop table-name=crypto_desc \
	--prop username=postgres \
	--prop password=$(PG_PASSWORD)

# Open an interactive psql session (prompts for the password).
db:
	psql -h $(PG_HOST) -d $(PG_DB) -U $(PG_USER) -W

# List brokers/topics known to the cluster.
list.topics:
	kcat -L -b $(BOOTSTRAP)

# Send data to Kafka. Usage: make publish JSON=<path-to-json-file>
# NOTE(review): the `kconfig` prerequisite is not defined in this Makefile —
# presumably it should generate config.properties; confirm or add the target.
# NOTE(review): $$RANDOM is a bash feature; under a strict POSIX /bin/sh it
# expands to nothing and `-k` would swallow the next argument.
publish: kconfig
	kcat -b $(BOOTSTRAP) -F config.properties -t $(TOPIC) -k $$RANDOM -P $(JSON)

# Consume from TOPIC, printing `key:value` pairs.
subscribe: kconfig
	kcat -b $(BOOTSTRAP) -F config.properties -t $(TOPIC) -K :
+advertised.listeners=PLAINTEXT://CHANGE-ME-TO-THE-PUBLIC-IP-OR-HOST:9092 +``` + +# Start Kafka + +```bash +./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties +./bin/kafka-server-start.sh -daemon config/server.properties +``` + +# Setup AWS RDS Postgres with Debezium / Enable CDC + +Follow these guides for RDS Postgres: +- https://github.com/debezium/debezium/blob/main/debezium-connector-postgres/RDS.md +- https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-overview + + +When the parameter group is configured, restart Postgres and run the command below. The output should say `logical`. +```sql +SHOW wal_level; +``` + +A replication slot is a feature in PostgreSQL that ensures that the master server will retain the WAL logs that are needed by the replicas even when they are disconnected from the master. +```sql +SELECT * FROM pg_create_logical_replication_slot('pgoutput_rds', 'pgoutput') +``` + +Grant the `rds_replication` role to the user used in the Postgres source connection. +```sql +grant rds_replication to $(PG_USER); +``` + +Then execute this command in Postgres to confirm that the replication slot exists. +```sql +select * from pg_replication_slots; +``` + +Execute the command below to enable CDC for each table. 
# Ticker endpoint queried on every poll; `{}` is replaced by the API key.
NOMICS_TICKER_URL = (
    "https://api.nomics.com/v1/currencies/ticker"
    "?key={}&ids=BTC,ETH,XRP,DOGE,SHIB,BCH,BSV,LINK,LTC"
    "&interval=1d,30d&per-page=1000&page=1"
)
# Upper bound for one HTTP round trip so a stalled connection cannot hang the loop.
HTTP_TIMEOUT_SECONDS = 30
# Pause between two consecutive polls.
POLL_INTERVAL_SECONDS = 20


def poll(producer, topic, timeout=HTTP_TIMEOUT_SECONDS):
    """Fetch the current ticker records and publish each one to Kafka.

    Every record returned by the ticker endpoint is printed as JSON and sent
    to *topic*, keyed by the UTF-8 encoded value of its ``id`` field. The
    producer is flushed once after all records have been handed over.

    Args:
        producer: Object exposing ``send(topic, key=..., value=...)`` and
            ``flush()`` (``main`` passes a ``KafkaProducer``).
        topic: Name of the topic the records are sent to.
        timeout: Seconds to wait for the HTTP request before giving up.

    Raises:
        RuntimeError: If the ``NOMIC_API_KEY`` environment variable is unset
            or empty.
        OSError: If the HTTP request fails or times out (``URLError`` and
            ``HTTPError`` are subclasses).
        ValueError: If the response body is not valid JSON.
    """
    api_key = os.getenv("NOMIC_API_KEY")
    if not api_key:
        # Fail loudly instead of silently requesting "...?key=None".
        raise RuntimeError(
            "NOMIC_API_KEY is not set; export it or add it to the .env file"
        )

    url = NOMICS_TICKER_URL.format(api_key)
    # The context manager closes the connection even if reading/parsing fails;
    # this function runs in an endless loop, so leaked sockets would pile up.
    with urllib.request.urlopen(url, timeout=timeout) as response:
        msgs = json.loads(response.read())

    for msg in msgs:
        print(json.dumps(msg))
        producer.send(topic, key=msg["id"].encode("utf-8"), value=msg)
    producer.flush()


def main():
    """Load configuration, connect to Kafka and poll the ticker forever.

    Reads ``BOOTSTRAP`` (the Kafka bootstrap server) from the environment
    after loading ``.env``. A failed poll is logged and retried on the next
    cycle rather than terminating the process.
    """
    load_dotenv()
    logging.basicConfig(level=logging.INFO)

    producer = KafkaProducer(
        bootstrap_servers=[os.getenv("BOOTSTRAP")],
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    topic = "crypto"

    while True:
        try:
            poll(producer, topic)
        except (OSError, ValueError, KafkaError):
            # Transient network/API/broker problems must not kill the poller.
            # A missing API key (RuntimeError) is a configuration error and is
            # deliberately left to propagate.
            logging.exception(
                "Polling failed; retrying in %s seconds", POLL_INTERVAL_SECONDS
            )
        time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()
"market_cap_dominance": "0.3859", + "num_exchanges": "468", + "num_pairs": "92183", + "num_pairs_unmapped": "10598", + "first_candle": "2011-08-18T00:00:00Z", + "first_trade": "2011-08-18T00:00:00Z", + "first_order_book": "2017-01-06T00:00:00Z", + "rank": "1", + "rank_delta": "0", + "high": "67599.15572183", + "high_timestamp": "2021-11-08T00:00:00Z", + "1d": { + "volume": "62436814861.69", + "price_change": "-846.69030697", + "price_change_pct": "-0.0343", + "volume_change": "-20403947295.91", + "volume_change_pct": "-0.2463", + "market_cap_change": "-15549614997.58", + "market_cap_change_pct": "-0.0330" + }, + "30d": { + "volume": "1678857299284.61", + "price_change": "3617.73004646", + "price_change_pct": "0.1788", + "volume_change": "71903784110.84", + "volume_change_pct": "0.0447", + "market_cap_change": "69711703640.83", + "market_cap_change_pct": "0.1805" + } + }, + { + "id": "ETH", + "currency": "ETH", + "symbol": "ETH", + "name": "Ethereum", + "logo_url": "https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/eth.svg", + "status": "active", + "platform_currency": "ETH", + "price": "1884.79571959", + "price_date": "2022-08-12T00:00:00Z", + "price_timestamp": "2022-08-12T13:14:00Z", + "circulating_supply": "121933463", + "market_cap": "229819668664", + "market_cap_dominance": "0.1945", + "num_exchanges": "544", + "num_pairs": "94081", + "num_pairs_unmapped": "79603", + "first_candle": "2015-08-07T00:00:00Z", + "first_trade": "2015-08-07T00:00:00Z", + "first_order_book": "2018-08-29T00:00:00Z", + "rank": "2", + "rank_delta": "0", + "high": "4811.45017967", + "high_timestamp": "2021-11-08T00:00:00Z", + "1d": { + "volume": "35129866841.32", + "price_change": "-24.65765321", + "price_change_pct": "-0.0129", + "volume_change": "-16576896298.43", + "volume_change_pct": "-0.3206", + "market_cap_change": "-2981514397.83", + "market_cap_change_pct": "-0.0128" + }, + "30d": { + "volume": "974090550844.49", + "price_change": "770.14673302", + 
"price_change_pct": "0.6909", + "volume_change": "245398670205.75", + "volume_change_pct": "0.3368", + "market_cap_change": "94356655920.16", + "market_cap_change_pct": "0.6965" + } + }, + { + "id": "XRP", + "currency": "XRP", + "symbol": "XRP", + "name": "XRP", + "logo_url": "https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/XRP.svg", + "status": "active", + "price": "0.37293117", + "price_date": "2022-08-12T00:00:00Z", + "price_timestamp": "2022-08-12T13:14:00Z", + "circulating_supply": "49136155729", + "max_supply": "99989371889", + "market_cap": "18324404165", + "market_cap_dominance": "0.0155", + "num_exchanges": "313", + "num_pairs": "3069", + "num_pairs_unmapped": "121", + "first_candle": "2013-05-09T00:00:00Z", + "first_trade": "2013-05-09T00:00:00Z", + "first_order_book": "2018-08-29T00:00:00Z", + "rank": "6", + "rank_delta": "0", + "high": "2.75914929", + "high_timestamp": "2018-01-07T00:00:00Z", + "1d": { + "volume": "4869638297.39", + "price_change": "-0.0086842662", + "price_change_pct": "-0.0228", + "volume_change": "-593737536.45", + "volume_change_pct": "-0.1087", + "market_cap_change": "-113985373.25", + "market_cap_change_pct": "-0.0062" + }, + "30d": { + "volume": "65793167747.13", + "price_change": "0.049346769", + "price_change_pct": "0.1525", + "volume_change": "10555894665.66", + "volume_change_pct": "0.1911", + "market_cap_change": "2681330611.65", + "market_cap_change_pct": "0.1714" + } + }, + { + "id": "DOGE", + "currency": "DOGE", + "symbol": "DOGE", + "name": "Dogecoin", + "logo_url": "https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/doge.svg", + "status": "active", + "price": "0.070833898", + "price_date": "2022-08-12T00:00:00Z", + "price_timestamp": "2022-08-12T13:14:00Z", + "circulating_supply": "132670764300", + "market_cap": "9397587450", + "market_cap_dominance": "0.0080", + "num_exchanges": "271", + "num_pairs": "3278", + "num_pairs_unmapped": "1060", + "first_candle": 
"2014-01-21T00:00:00Z", + "first_trade": "2014-01-21T00:00:00Z", + "first_order_book": "2018-09-12T00:00:00Z", + "rank": "11", + "rank_delta": "0", + "high": "0.68872854", + "high_timestamp": "2021-05-07T00:00:00Z", + "1d": { + "volume": "572007705.56", + "price_change": "-0.0023294748", + "price_change_pct": "-0.0318", + "volume_change": "-156157546.93", + "volume_change_pct": "-0.2145", + "market_cap_change": "-302373858.93", + "market_cap_change_pct": "-0.0312" + }, + "30d": { + "volume": "18223830611.17", + "price_change": "0.0088620626", + "price_change_pct": "0.1430", + "volume_change": "-3815379936.34", + "volume_change_pct": "-0.1731", + "market_cap_change": "1175736621.15", + "market_cap_change_pct": "0.1430" + } + }, + { + "id": "SHIB", + "currency": "SHIB", + "symbol": "SHIB", + "name": "Shiba Inu", + "logo_url": "https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/SHIB.png", + "status": "active", + "platform_currency": "ETH", + "price": "0.000012393043", + "price_date": "2022-08-12T00:00:00Z", + "price_timestamp": "2022-08-12T13:14:00Z", + "circulating_supply": "558229456845274", + "max_supply": "999991243368670", + "market_cap": "6918161559", + "market_cap_dominance": "0.0059", + "num_exchanges": "158", + "num_pairs": "400", + "num_pairs_unmapped": "47", + "first_candle": "2020-07-31T00:00:00Z", + "first_trade": "2020-07-31T00:00:00Z", + "first_order_book": "2021-02-01T00:00:00Z", + "rank": "17", + "rank_delta": "-1", + "high": "0.000079590182", + "high_timestamp": "2021-10-27T00:00:00Z", + "1d": { + "volume": "4590772507.21", + "price_change": "-0.00000025107394", + "price_change_pct": "-0.0199", + "volume_change": "-485868997.50", + "volume_change_pct": "-0.0957", + "market_cap_change": "-140118826.83", + "market_cap_change_pct": "-0.0199" + }, + "30d": { + "volume": "27757831551.63", + "price_change": "0.0000018473708", + "price_change_pct": "0.1752", + "volume_change": "9441145506.74", + "volume_change_pct": "0.5154", + 
"market_cap_change": "1033766907.22", + "market_cap_change_pct": "0.1757" + } + }, + { + "id": "LINK", + "currency": "LINK", + "symbol": "LINK", + "name": "ChainLink", + "logo_url": "https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/LINK.jpg", + "status": "active", + "platform_currency": "ETH", + "price": "9.40034899", + "price_date": "2022-08-12T00:00:00Z", + "price_timestamp": "2022-08-12T13:14:00Z", + "circulating_supply": "470099970", + "max_supply": "1000000000", + "market_cap": "4419103781", + "market_cap_dominance": "0.0037", + "num_exchanges": "290", + "num_pairs": "3547", + "num_pairs_unmapped": "67", + "first_candle": "2017-07-09T00:00:00Z", + "first_trade": "2017-09-28T00:00:00Z", + "first_order_book": "2018-08-29T00:00:00Z", + "rank": "25", + "rank_delta": "2", + "high": "52.35818401", + "high_timestamp": "2021-05-09T00:00:00Z", + "1d": { + "volume": "4613678552.42", + "price_change": "0.30660784", + "price_change_pct": "0.0337", + "volume_change": "-373390353.50", + "volume_change_pct": "-0.0749", + "market_cap_change": "144136335.40", + "market_cap_change_pct": "0.0337" + }, + "30d": { + "volume": "32442086718.68", + "price_change": "3.22443021", + "price_change_pct": "0.5221", + "volume_change": "11464775205.43", + "volume_change_pct": "0.5465", + "market_cap_change": "1528156384.38", + "market_cap_change_pct": "0.5286" + } + }, + { + "id": "LTC", + "currency": "LTC", + "symbol": "LTC", + "name": "Litecoin", + "logo_url": "https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/ltc.svg", + "status": "active", + "price": "61.32611813", + "price_date": "2022-08-12T00:00:00Z", + "price_timestamp": "2022-08-12T13:14:00Z", + "circulating_supply": "70927096", + "max_supply": "84000000", + "market_cap": "4349683452", + "market_cap_dominance": "0.0037", + "num_exchanges": "391", + "num_pairs": "3944", + "num_pairs_unmapped": "663", + "first_candle": "2013-03-25T00:00:00Z", + "first_trade": "2013-03-25T00:00:00Z", + 
"first_order_book": "2018-08-29T00:00:00Z", + "rank": "26", + "rank_delta": "-2", + "high": "388.79109964", + "high_timestamp": "2021-05-09T00:00:00Z", + "1d": { + "volume": "3871151057.04", + "price_change": "-1.59097527", + "price_change_pct": "-0.0253", + "volume_change": "-376978014.22", + "volume_change_pct": "-0.0887", + "market_cap_change": "-110931410.08", + "market_cap_change_pct": "-0.0249" + }, + "30d": { + "volume": "31666562544.36", + "price_change": "11.72125983", + "price_change_pct": "0.2363", + "volume_change": "5930169737.76", + "volume_change_pct": "0.2304", + "market_cap_change": "842169397.30", + "market_cap_change_pct": "0.2401" + } + }, + { + "id": "BCH", + "currency": "BCH", + "symbol": "BCH", + "name": "Bitcoin Cash", + "logo_url": "https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/BCH.png", + "status": "active", + "price": "140.15903693", + "price_date": "2022-08-12T00:00:00Z", + "price_timestamp": "2022-08-12T13:14:00Z", + "circulating_supply": "19143394", + "max_supply": "21000000", + "market_cap": "2683119617", + "market_cap_dominance": "0.0023", + "num_exchanges": "311", + "num_pairs": "2546", + "num_pairs_unmapped": "136", + "first_candle": "2017-07-09T00:00:00Z", + "first_trade": "2017-07-27T00:00:00Z", + "first_order_book": "2018-08-29T00:00:00Z", + "rank": "33", + "rank_delta": "0", + "high": "3717.49802158", + "high_timestamp": "2017-12-20T00:00:00Z", + "1d": { + "volume": "726509403.24", + "price_change": "-8.51521306", + "price_change_pct": "-0.0573", + "volume_change": "-65811916.29", + "volume_change_pct": "-0.0831", + "market_cap_change": "-161870985.95", + "market_cap_change_pct": "-0.0569" + }, + "30d": { + "volume": "22771112813.90", + "price_change": "37.60718670", + "price_change_pct": "0.3667", + "volume_change": "828630415.24", + "volume_change_pct": "0.0378", + "market_cap_change": "722720512.17", + "market_cap_change_pct": "0.3687" + } + }, + { + "id": "BSV", + "currency": "BSV", + "symbol": 
"BSV", + "name": "Bitcoin SV", + "logo_url": "https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/BSV.png", + "status": "active", + "price": "62.18659156", + "price_date": "2022-08-12T00:00:00Z", + "price_timestamp": "2022-08-12T13:14:00Z", + "circulating_supply": "19139858", + "max_supply": "21000000", + "market_cap": "1190242525", + "market_cap_dominance": "0.0010", + "num_exchanges": "147", + "num_pairs": "734", + "num_pairs_unmapped": "135", + "first_candle": "2017-07-09T00:00:00Z", + "first_trade": "2018-11-08T00:00:00Z", + "first_order_book": "2018-11-13T00:00:00Z", + "rank": "54", + "rank_delta": "-1", + "high": "441.03122976", + "high_timestamp": "2021-04-16T00:00:00Z", + "1d": { + "volume": "56948934.06", + "price_change": "-1.91442074", + "price_change_pct": "-0.0299", + "volume_change": "17897970.78", + "volume_change_pct": "0.4583", + "market_cap_change": "-35372611.84", + "market_cap_change_pct": "-0.0289" + }, + "30d": { + "volume": "1492336265.82", + "price_change": "10.41894890", + "price_change_pct": "0.2013", + "volume_change": "-1520090848.69", + "volume_change_pct": "-0.5046", + "market_cap_change": "200814280.60", + "market_cap_change_pct": "0.2030" + } + } +] \ No newline at end of file diff --git a/kafka2s3/sql/materialize_crypto_raw.sql b/kafka2s3/sql/materialize_crypto_raw.sql new file mode 100644 index 0000000..aea6ce3 --- /dev/null +++ b/kafka2s3/sql/materialize_crypto_raw.sql @@ -0,0 +1,3 @@ +insert into crypto_raw_table +select * +from crypto_raw diff --git a/kafka2s3/sql/pipeline.sql b/kafka2s3/sql/pipeline.sql new file mode 100644 index 0000000..49cfaae --- /dev/null +++ b/kafka2s3/sql/pipeline.sql @@ -0,0 +1,19 @@ +insert into crypto_changes +select + crypto_raw_table.id, + crypto_raw_table.currency, + crypto_raw_table.symbol, + crypto_raw_table.name, + crypto_raw_table.logo_url, + crypto_raw_table.status, + crypto_raw_table.price, + crypto_raw_table.price_date, + crypto_raw_table.price_timestamp, + 
def handle_dict(key: str = None, value=None, parent=None):
    """Print a field definition for every entry of the mapping *value*.

    Each entry is dispatched on the type of its value through ``switcher``;
    types without a dedicated handler fall back to ``handle_field``.

    Args:
        key: Name under which *value* itself was found; not used here.
        value: Mapping of field name to sample value. ``None`` or an empty
            mapping produces no output.
        parent: Accepted for signature parity with the other handlers.
    """
    if not value:
        # Nothing to describe (also covers the default value=None).
        return
    for name, item in value.items():
        # NOTE(review): entries are emitted without a parent, so the fields of
        # a nested dict are printed as top-level `--field` flags — confirm
        # whether nested fields should use the backtick form instead.
        switcher.get(type(item), handle_field)(name, item)


def handle_list(key: str = None, value=None, parent=None):
    """Placeholder handler for list values; only prints an empty line."""
    print()


def handle_field(key: str = None, value=None, parent=None):
    """Print the definition of a single scalar field.

    Without a parent the field is printed as a ``--field name=type`` flag,
    otherwise as a backtick-quoted ``name type`` pair. The type shown is the
    Python type name of *value* (e.g. ``str``, ``int``).

    Args:
        key: Field name.
        value: Sample value whose type name is printed.
        parent: Name of the enclosing field, or ``None`` for a top-level one.
    """
    if parent is None:
        print("--field {}={}".format(key, type(value).__name__))
    else:
        print("`{}` {}".format(key, type(value).__name__))


# Dispatch table: Python type of a sample value -> handler for it.
switcher = {
    dict: handle_dict,
    list: handle_list,
}


def main():
    """Run the schema helper on a sample message.

    The previous version called ``load_dotenv()`` and ``logging.basicConfig``
    without importing either, so it failed with ``NameError`` on start.
    ``logging`` is now imported locally; the ``load_dotenv()`` call was
    dropped because nothing in this script reads an environment variable.
    """
    import logging

    logging.basicConfig(level=logging.INFO)

    msg = {}
    func = switcher.get(type(msg), handle_field)
    print(func)
    func(value=msg)


if __name__ == "__main__":
    main()