Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mysql support #381

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions blockchainetl/jobs/exporters/mysql_item_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import collections

from sqlalchemy import create_engine

from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter


class MySQLItemExporter:

def __init__(self, connection_url, item_type_to_insert_stmt_mapping, converters=(), print_sql=True):
self.connection_url = connection_url
self.item_type_to_insert_stmt_mapping = item_type_to_insert_stmt_mapping
self.converter = CompositeItemConverter(converters)
self.print_sql = print_sql

self.engine = self.create_engine()

def open(self):
pass

def export_items(self, items):
items_grouped_by_type = group_by_item_type(items)

for item_type, insert_stmt in self.item_type_to_insert_stmt_mapping.items():
item_group = items_grouped_by_type.get(item_type)
if item_group:
connection = self.engine.connect()
converted_items = list(self.convert_items(item_group))
connection.execute(insert_stmt, converted_items)

def convert_items(self, items):
for item in items:
yield self.converter.convert_item(item)

def create_engine(self):
engine = create_engine(self.connection_url, echo=self.print_sql, pool_recycle=3600)
return engine

def close(self):
pass


def group_by_item_type(items):
result = collections.defaultdict(list)
for item in items:
result[item.get('type')].append(item)

return result
7 changes: 7 additions & 0 deletions blockchainetl/streaming/mysql_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from sqlalchemy.dialects.mysql import insert


def create_insert_statement_for_table(table):
insert_stmt = insert(table)

return insert_stmt
24 changes: 24 additions & 0 deletions ethereumetl/streaming/item_exporter_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ def create_item_exporter(output):
},
converters=[UnixTimestampItemConverter(), IntToDecimalItemConverter(),
ListFieldItemConverter('topics', 'topic', fill=4)])
elif item_exporter_type == ItemExporterType.MYSQL:
from blockchainetl.jobs.exporters.mysql_item_exporter import MySQLItemExporter
from blockchainetl.streaming.mysql_utils import create_insert_statement_for_table
from blockchainetl.jobs.exporters.converters.unix_timestamp_item_converter import UnixTimestampItemConverter
from blockchainetl.jobs.exporters.converters.int_to_decimal_item_converter import IntToDecimalItemConverter
from blockchainetl.jobs.exporters.converters.list_field_item_converter import ListFieldItemConverter
from ethereumetl.streaming.mysql_tables import BLOCKS, TRANSACTIONS, LOGS, TOKEN_TRANSFERS, TRACES, TOKENS, CONTRACTS

item_exporter = MySQLItemExporter(
output, item_type_to_insert_stmt_mapping={
'block': create_insert_statement_for_table(BLOCKS),
'transaction': create_insert_statement_for_table(TRANSACTIONS),
'log': create_insert_statement_for_table(LOGS),
'token_transfer': create_insert_statement_for_table(TOKEN_TRANSFERS),
'trace': create_insert_statement_for_table(TRACES),
'token': create_insert_statement_for_table(TOKENS),
'contract': create_insert_statement_for_table(CONTRACTS),
},
converters=[UnixTimestampItemConverter(), IntToDecimalItemConverter(),
ListFieldItemConverter('topics', 'topic', fill=4)])

elif item_exporter_type == ItemExporterType.GCS:
from blockchainetl.jobs.exporters.gcs_item_exporter import GcsItemExporter
bucket, path = get_bucket_and_path_from_gcs_output(output)
Expand Down Expand Up @@ -113,6 +134,8 @@ def determine_item_exporter_type(output):
return ItemExporterType.KAFKA
elif output is not None and output.startswith('postgresql'):
return ItemExporterType.POSTGRES
elif output is not None and output.startswith('mysql'):
return ItemExporterType.MYSQL
elif output is not None and output.startswith('gs://'):
return ItemExporterType.GCS
elif output is None or output == 'console':
Expand All @@ -124,6 +147,7 @@ def determine_item_exporter_type(output):
class ItemExporterType:
PUBSUB = 'pubsub'
POSTGRES = 'postgres'
MYSQL = 'mysql'
GCS = 'gcs'
CONSOLE = 'console'
KAFKA = 'kafka'
Expand Down
129 changes: 129 additions & 0 deletions ethereumetl/streaming/mysql_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@

from sqlalchemy import Table, Column, Integer, BigInteger, Boolean, String, Numeric, \
MetaData, PrimaryKeyConstraint, VARCHAR, DATETIME

metadata = MetaData()

# SQL schema is here https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema

BLOCKS = Table(
'blocks', metadata,
Column('timestamp', DATETIME),
Column('number', BigInteger),
Column('hash', VARCHAR, primary_key=True),
Column('parent_hash', VARCHAR),
Column('nonce', VARCHAR),
Column('sha3_uncles', VARCHAR),
Column('logs_bloom', VARCHAR),
Column('transactions_root', VARCHAR),
Column('state_root', VARCHAR),
Column('receipts_root', VARCHAR),
Column('miner', VARCHAR),
Column('difficulty', Numeric(38)),
Column('total_difficulty', Numeric(38)),
Column('size', BigInteger),
Column('extra_data', VARCHAR),
Column('gas_limit', BigInteger),
Column('gas_used', BigInteger),
Column('transaction_count', BigInteger),
Column('base_fee_per_gas', BigInteger),
)

TRANSACTIONS = Table(
'transactions', metadata,
Column('hash', VARCHAR, primary_key=True),
Column('nonce', BigInteger),
Column('transaction_index', BigInteger),
Column('from_address', VARCHAR),
Column('to_address', VARCHAR),
Column('value', Numeric(38)),
Column('gas', BigInteger),
Column('gas_price', BigInteger),
Column('input', VARCHAR),
Column('receipt_cumulative_gas_used', BigInteger),
Column('receipt_gas_used', BigInteger),
Column('receipt_contract_address', VARCHAR),
Column('receipt_root', VARCHAR),
Column('receipt_status', BigInteger),
Column('block_timestamp', DATETIME),
Column('block_number', BigInteger),
Column('block_hash', VARCHAR),
Column('max_fee_per_gas', BigInteger),
Column('max_priority_fee_per_gas', BigInteger),
Column('transaction_type', BigInteger),
Column('receipt_effective_gas_price', BigInteger),
)

LOGS = Table(
'logs', metadata,
Column('log_index', BigInteger, primary_key=True),
Column('transaction_hash', VARCHAR, primary_key=True),
Column('transaction_index', BigInteger),
Column('address', VARCHAR),
Column('data', VARCHAR),
Column('topic0', VARCHAR),
Column('topic1', VARCHAR),
Column('topic2', VARCHAR),
Column('topic3', VARCHAR),
Column('block_timestamp', DATETIME),
Column('block_number', BigInteger),
Column('block_hash', VARCHAR),
)

TOKEN_TRANSFERS = Table(
'token_transfers', metadata,
Column('token_address', VARCHAR),
Column('from_address', VARCHAR),
Column('to_address', VARCHAR),
Column('value', Numeric(18)),
Column('transaction_hash', VARCHAR, primary_key=True),
Column('log_index', BigInteger, primary_key=True),
Column('block_timestamp', DATETIME),
Column('block_number', BigInteger),
Column('block_hash', VARCHAR),
)

TRACES = Table(
'traces', metadata,
Column('transaction_hash', VARCHAR),
Column('transaction_index', BigInteger),
Column('from_address', VARCHAR),
Column('to_address', VARCHAR),
Column('value', Numeric(38)),
Column('input', VARCHAR),
Column('output', VARCHAR),
Column('trace_type', VARCHAR),
Column('call_type', VARCHAR),
Column('reward_type', VARCHAR),
Column('gas', BigInteger),
Column('gas_used', BigInteger),
Column('subtraces', BigInteger),
Column('trace_address', VARCHAR),
Column('error', VARCHAR),
Column('status', Integer),
Column('block_timestamp', DATETIME),
Column('block_number', BigInteger),
Column('block_hash', VARCHAR),
Column('trace_id', VARCHAR, primary_key=True),
)

TOKENS = Table(
'tokens', metadata,
Column('address', VARCHAR(42), primary_key=True),
Column('name', String),
Column('symbol', String),
Column('decimals', Integer),
Column('function_sighashes', String),
Column('total_supply', Numeric(18)),
Column('block_number', BigInteger, primary_key=True),
)

CONTRACTS = Table(
'contracts', metadata,
Column('address', VARCHAR(42), primary_key=True),
Column('bytecode', VARCHAR),
Column('function_sighashes', String),
Column('is_erc20', Boolean),
Column('is_erc721', Boolean),
Column('block_number', BigInteger, primary_key=True),
)