An ETL framework for building pipelines, and a Flask-based web API/UI for monitoring them.
fastlane
|- fastlane: (ETL framework)
|- fastlane_web: (web API/UI for monitoring pipelines)
|- migrations: (database migrations)
|- web_api: (Flask backend API)
|- web_ui: (TBD)
- Clone the repository
pip install -e .
fastlane --source=mysql --target=s3 --config=examples/mysql_to_athena_example.json
--source
: The pipeline's source type (mysql, bigquery, and mongodb are the only implemented sources so far)
--target
: The pipeline's target type (s3, influxdb, mysql, and firehose are the only implemented targets so far)
--transform
: The pipeline's transform type (default is the only implemented transform so far)
--config
: The path to the JSON configuration file for the pipeline (a rough sketch of its shape follows below)
--logs_to_slack
: Send error logs to Slack
--logs_to_cloudwatch
: Send logs to CloudWatch
--logs_to_file
: Send logs to a file
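The layout of the configuration file is dictated by each component's marshmallow schema (see the Source, Transform, and Target sections below), so the sketch here is only a loose assumption of what a MySQL-to-S3 config might contain; every key name is illustrative, not the actual format.

# Hypothetical configuration shape -- field names are assumptions; the real fields come
# from each source/transform/target's configuration schema.
example_config = {
    "source": {"host": "mysql.example.com", "database": "shop", "table": "orders"},
    "transform": {"type": "default"},
    "target": {"bucket": "example-data-lake", "prefix": "orders/"},
}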
The ETL framework has 4 core concepts: Source, Transform, Target, and Pipeline.
The base class fastlane.source.Source provides basic functionality, and defines a standard interface for extracting data from a particular source type. An instance of Source is responsible only for extracting data from the source and returning it as a Python list of dictionaries.
Implementations of the Source base class must fulfill the following functions at minimum:
from typing import List

from fastlane.source import Source, SourceConfigSchema
import fastlane.utils as utils

class SourceImpl(Source):
    ...

    def extract(self) -> List[dict]:
        """This function should retrieve data from the source and return it as a list of dictionaries.
        The Source class is an iterator, and this function is called on each iteration.
        The iterator stops (and the source worker exits) when this function returns an empty list,
        so when there are no more records to fetch, this function should return [].
        """

    @utils.classproperty
    def source_type(self) -> str:
        """Return a string describing the type of source this is, for example mysql or bigquery."""

    @classmethod
    def configuration_schema(cls) -> SourceConfigSchema:
        """Return a marshmallow schema inheriting from the SourceConfigSchema base schema.
        This schema is used to validate the source's configuration, so all possible fields should be
        covered in the schema returned here."""
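Purely for illustration, a minimal in-memory source might look roughly like the following; the constructor signature and the use of the bare base schema are assumptions, not part of the documented interface.

from typing import List

from fastlane.source import Source, SourceConfigSchema
import fastlane.utils as utils

class StaticListSource(Source):
    """Hypothetical source that serves records from an in-memory list, one batch per iteration."""

    def __init__(self, configuration: dict):
        super().__init__(configuration)  # assumed constructor signature
        self._records = configuration.get("records", [])
        self._batch_size = configuration.get("batch_size", 100)

    def extract(self) -> List[dict]:
        # Return the next batch; an empty list tells the source worker to stop iterating.
        batch = self._records[:self._batch_size]
        self._records = self._records[self._batch_size:]
        return batch

    @utils.classproperty
    def source_type(self) -> str:
        return "static_list"

    @classmethod
    def configuration_schema(cls) -> SourceConfigSchema:
        # A real source would subclass SourceConfigSchema and declare its own fields.
        return SourceConfigSchema()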
An example implementation of the Source interface is in fastlane.sources.impl.source_mysql.
Implemented sources:
- MySQL
- BigQuery
- MongoDB
The base class fastlane.transform.Transform provides basic functionality, and defines a standard interface for transforming data to be ready for the target. An instance of Transform is responsible only for transforming data from the source into a format compatible with the target.
Implementations of the Transform base class must fulfill the following functions at minimum:
import pandas as pd

from fastlane.transform import Transform, TransformConfigSchema
import fastlane.utils as utils

class TransformImpl(Transform):
    ...

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """This function should run any transformation on the dataframe and return the transformed dataframe.
        Ideally the same dataframe should be transformed in place, but if a new dataframe needs to be created,
        make sure to remove the old dataframe from memory.
        This function is called by the transform worker every time a new batch of source data has been received.
        """

    @utils.classproperty
    def transform_type(self) -> str:
        """Return a string describing the type of transform this is."""

    @classmethod
    def configuration_schema(cls) -> TransformConfigSchema:
        """Return a marshmallow schema inheriting from the TransformConfigSchema base schema.
        This schema is used to validate the transform's configuration, so all possible fields should be
        covered in the schema returned here."""
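As a hedged example, a small transform that normalizes column names and drops empty rows could be sketched like this; the class name and transform_type are made up for illustration.

import pandas as pd

from fastlane.transform import Transform, TransformConfigSchema
import fastlane.utils as utils

class LowercaseColumnsTransform(Transform):
    """Hypothetical transform: lowercase column names and drop rows that are entirely empty."""

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        # Mutate the incoming dataframe in place to avoid keeping two copies in memory.
        df.columns = [str(column).lower() for column in df.columns]
        df.dropna(how="all", inplace=True)
        return df

    @utils.classproperty
    def transform_type(self) -> str:
        return "lowercase_columns"

    @classmethod
    def configuration_schema(cls) -> TransformConfigSchema:
        # A real transform would subclass TransformConfigSchema and declare its own fields.
        return TransformConfigSchema()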
An example implementation of the Transform interface is in fastlane.transform.impl.transform_default.
The base class fastlane.target.Target provides basic functionality, and defines a standard interface for loading data into a destination. An instance of Target is responsible only for storing transformed data in a destination.
Implementations of the Target base class must fulfill the following functions at minimum:
import pandas as pd

from fastlane.target import Target, TargetConfigSchema
import fastlane.utils as utils

class TargetImpl(Target):
    ...

    def load(self, df: pd.DataFrame):
        """This function is called by the target worker every time a new batch of transformed data has been received.
        It should store the dataframe in whatever destination it implements.
        """

    def get_offset(self):
        """Get the largest key which has been stored in the target. Used for incrementally loaded tables."""

    @utils.classproperty
    def target_type(self) -> str:
        """Return a string describing the type of target this is."""

    @classmethod
    def target_id(cls, configuration: dict) -> str:
        """Return a unique identifier built from this specific target's configuration.
        The id should be unique across the whole target destination.
        For example, the target_id for the mysql target is built from the table and database."""
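For illustration only, a toy target that appends each batch to a local CSV file might look like this; the constructor signature and configuration keys are assumptions.

import os

import pandas as pd

from fastlane.target import Target, TargetConfigSchema
import fastlane.utils as utils

class LocalCsvTarget(Target):
    """Hypothetical target that appends each transformed batch to a local CSV file."""

    def __init__(self, configuration: dict):
        super().__init__(configuration)  # assumed constructor signature
        self._path = configuration["path"]
        self._offset_column = configuration.get("offset_column", "id")

    def load(self, df: pd.DataFrame):
        # Append the batch, writing the header only when creating the file.
        df.to_csv(self._path, mode="a", index=False, header=not os.path.exists(self._path))

    def get_offset(self):
        # Largest key already stored, so an incremental load can resume from it.
        try:
            return pd.read_csv(self._path)[self._offset_column].max()
        except FileNotFoundError:
            return None

    @utils.classproperty
    def target_type(self) -> str:
        return "local_csv"

    @classmethod
    def target_id(cls, configuration: dict) -> str:
        # The destination file path uniquely identifies this target.
        return configuration["path"]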
An example implementation of the Target interface is in fastlane.target.impl.target_athena.
Implemented targets:
- S3
- InfluxDB
- MySQL
- Firehose
The fastlane.pipeline.Pipeline class is what drives the ETL process. It manages the source, transform, and target processes, and runs monitoring processes which give insight into the performance and state of the running pipeline.
The Pipeline class works by spawning a number of worker threads for each stage of the ETL process (source, transform, target). Each stage passes work to the next via Queues:
         ________________        Queue         ____________________        Queue         _________________   load
extract | source_worker  | --> [|.|.|.|] --> | transform_worker_1 | --> [|.|.|.|] --> | target_worker_1 | ------>
------> |________________|                    --------------------                    -----------------   load
                                          --> | transform_worker_2 |              --> | target_worker_2 | ------>
                                               --------------------                    -----------------   load
                                          --> | transform_worker_3 |              --> | target_worker_3 | ------>
                                               --------------------                    -----------------   load
                                                                                   --> | target_worker_4 | ------>
                                                                                        -----------------
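The real orchestration lives in fastlane.pipeline.Pipeline; the sketch below only illustrates the general hand-off pattern between the worker stages and is not the framework's actual code (the sentinel, the function names, and the point where the batch becomes a DataFrame are all assumptions).

import queue

import pandas as pd

DONE = object()  # sentinel signalling that the upstream stage has finished

def source_worker(source, out_q: queue.Queue):
    # The Source is an iterator; each iteration yields a batch (list of dicts) until it returns [].
    for batch in source:
        out_q.put(batch)
    out_q.put(DONE)

def transform_worker(transform, in_q: queue.Queue, out_q: queue.Queue):
    while True:
        batch = in_q.get()
        if batch is DONE:
            in_q.put(DONE)  # let sibling workers see the sentinel too
            out_q.put(DONE)
            break
        # Assumed here: the list of dicts is turned into a DataFrame before Transform.transform.
        out_q.put(transform.transform(pd.DataFrame(batch)))

def target_worker(target, in_q: queue.Queue):
    while True:
        df = in_q.get()
        if df is DONE:
            in_q.put(DONE)
            break
        target.load(df)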
Throughout the ETL process, a few small monitoring processes collect metrics at periodic intervals, such as memory usage, records loaded per second, total records loaded, and queue sizes.
See fastlane.monitoring.pipeline_monitor for more details on how that's done.
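The general shape of such a collector can be sketched as a small daemon thread like the one below; this is not the pipeline_monitor code, and the psutil dependency and report callback are assumptions made for the example.

import threading
import time

import psutil  # assumed dependency, used here only to sample memory usage

def start_memory_monitor(interval_seconds: float = 5.0, report=print):
    """Hypothetical periodic collector in the spirit of pipeline_monitor."""
    process = psutil.Process()

    def _run():
        while True:
            # Sample one metric per tick; the real monitor also tracks records/sec and queue sizes.
            report({"memory_rss_bytes": process.memory_info().rss})
            time.sleep(interval_seconds)

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    return thread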
This project includes a Pipeline web API, built with Flask, which is used as a backend for collecting and storing metrics from running pipelines, as well as to serve the pipeline monitoring web UI.
The API provides (an illustrative route sketch follows this list):
- CRUD on pipelines
- listing pipelines
- the invocations of a particular pipeline
- the latest invocation of a particular pipeline
- records-per-second metrics for a particular pipeline run
- memory usage metrics for a particular pipeline run
- logs (from CloudWatch) for a particular pipeline run
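The real routes and payloads are defined in web_api; the Flask snippet below is only a hypothetical sketch of what a pipeline-listing and metrics-collection endpoint could look like, with made-up paths and an in-memory store.

from flask import Flask, jsonify, request

app = Flask(__name__)

# In-memory stand-in for the real database; purely illustrative.
PIPELINES = {}

@app.route("/pipelines", methods=["GET"])
def list_pipelines():
    return jsonify(list(PIPELINES.values()))

@app.route("/pipelines/<pipeline_id>/metrics", methods=["POST"])
def record_metrics(pipeline_id):
    pipeline = PIPELINES.setdefault(pipeline_id, {"id": pipeline_id, "metrics": []})
    pipeline["metrics"].append(request.get_json())
    return "", 204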
Will provide a user interface to monitor currently running pipelines, as well as to debug and analyze previously invoked pipelines.