Refactor(RawData): use spark to generate raw data (#825)

* refactor(raw_data): refactor raw data with spark

* fix(raw_data): fix k8s spark logic

* polish(raw_data): polish some variable name

* fix(raw_data): use yaml configuration for spark

* fix(raw_data): move common dependencies out.

* feat(raw_data): add zip script

* feat(rawdata_v2): support k8s config

* fix(raw_data): fix spark k8s configuration

* fix(raw_data): use hdfs k8s config

* fix(raw_data): fix master script for rawdata

* fix(raw_data): add flatten_dict package

* fix(raw_data): remove blank lines

* feat(rawdata_v2): support long-running mode

* feat(rawdata_v2): abstract spark application class

* fix(rawdata_v2): fix long-running bug

* feat(rawdata_v2): output gzip compression type by default

* feat(rawdata_v2): use local jars for spark job

* feat(rawdata_v2): parameterize spark image

* remove(rawdata): remove unused old rawdata code

* fix(rawdata_v2): remove schema inferring

* fix(rawdata_v2): add dirty input checking

* fix(rawdata): switch master and worker functions

* fix: make output partition num not required

* fix(rawdata_v2): fix status of spark

* feat: no compression for data block

* fix k8s client bug

* fix(rawdata): support kvstore type

* refactor(rawdata): use spark api of webconsole

* feat: add progress logging

* fix: fix response processing

* fix: fix webconsole spark api calling

* fix: support nas filesystem

* feat: support csv format for input/output

* fix: add output_data_format for deploy script

* fix: raw_data partition_id is wrong

* fix: format input data

* feat: filter files starting with '.'

* fix: add validation for input data

* feat: rawdata support filter by datetime

* fix: typo

* fix: print spark log when job failed

* fix: wildcard bug of bash script

* fix: remove unused code

* feat: support oss

* feat: rawdata support aliyun oss

* fix: use etcd kvstore by default

* fix: get datetime of data block

* fix: replace CSV with CSV_DICT

* feat(raw_data): add spark speculation config

* feat: remove unused keys in rawdata schema

* fix: polish code

* feat(rawdata): support multiple input paths

* fix(raw_data): summary for input data manager

* fix: data-block output support multiple input dirs

* fix: polish spark api code

* fix: relaunch spark when in unknown state

* fix(rawdata): add master script for compatibility
nolanliou authored Apr 18, 2022
1 parent bd291ca commit c27d771
Showing 32 changed files with 2,583 additions and 2,072 deletions.
21 changes: 4 additions & 17 deletions deploy/scripts/data_portal/run_data_portal_master.sh
@@ -21,20 +21,7 @@ source /app/deploy/scripts/hdfs_common.sh || true
source /app/deploy/scripts/pre_start_hook.sh || true
source /app/deploy/scripts/env_to_args.sh

input_file_wildcard=$(normalize_env_to_args "--input_file_wildcard" "$FILE_WILDCARD")
kvstore_type=$(normalize_env_to_args '--kvstore_type' $KVSTORE_TYPE)
files_per_job_limit=$(normalize_env_to_args '--files_per_job_limit' $FILES_PER_JOB_LIMIT)
start_date=$(normalize_env_to_args '--start_date' $START_DATE)
end_date=$(normalize_env_to_args '--end_date' $END_DATE)

python -m fedlearner.data_join.cmd.data_portal_master_service \
--listen_port=50051 \
--data_portal_name=$DATA_PORTAL_NAME \
--data_portal_type=$DATA_PORTAL_TYPE \
--output_partition_num=$OUTPUT_PARTITION_NUM \
--input_base_dir=$INPUT_BASE_DIR \
--output_base_dir=$OUTPUT_BASE_DIR \
--raw_data_publish_dir=$RAW_DATA_PUBLISH_DIR \
$input_file_wildcard $LONG_RUNNING $CHECK_SUCCESS_TAG \
$kvstore_type $SINGLE_SUBFOLDER $files_per_job_limit \
$start_date $end_date
while true; do
    echo "dummy master, sleeping infinitely..."
    sleep 86400
done
80 changes: 57 additions & 23 deletions deploy/scripts/data_portal/run_data_portal_worker.sh
@@ -19,30 +19,64 @@ set -ex
export CUDA_VISIBLE_DEVICES=
source /app/deploy/scripts/hdfs_common.sh || true
source /app/deploy/scripts/pre_start_hook.sh || true
source /app/deploy/scripts/env_to_args.sh

MASTER_POD_NAMES=`python -c 'import json, os; print(json.loads(os.environ["CLUSTER_SPEC"])["clusterSpec"]["Master"][0])'`
UPLOAD_DIR=$OUTPUT_BASE_DIR/upload
spark_entry_script="fedlearner/data_join/raw_data/raw_data.py"
push_file $spark_entry_script $UPLOAD_DIR
# create deps folder structure
DEP_FILE=deps.zip
CUR_DIR=`pwd`
TMP_DIR=`mktemp -d`
TMP_FEDLEARNER_DIR=${TMP_DIR}/fedlearner/data_join/raw_data
mkdir -p $TMP_FEDLEARNER_DIR
cp fedlearner/data_join/raw_data/common.py $TMP_FEDLEARNER_DIR
cd $TMP_DIR
touch fedlearner/__init__.py
touch fedlearner/data_join/__init__.py
touch fedlearner/data_join/raw_data/__init__.py
python /app/deploy/scripts/zip.py ${DEP_FILE} fedlearner
push_file ${DEP_FILE} ${UPLOAD_DIR}
cd $CUR_DIR
rm -rf $TMP_DIR

merger_read_ahead_size=$(normalize_env_to_args "--merger_read_ahead_size" $MERGE_READ_AHEAD_SIZE)
merger_read_batch_size=$(normalize_env_to_args "--merger_read_batch_size" $MERGE_READ_BATCH_SIZE)
input_data_file_iter=$(normalize_env_to_args "--input_data_file_iter" $INPUT_DATA_FORMAT)
compressed_type=$(normalize_env_to_args "--compressed_type" $COMPRESSED_TYPE)
read_ahead_size=$(normalize_env_to_args "--read_ahead_size" $READ_AHEAD_SIZE)
read_batch_size=$(normalize_env_to_args "--read_batch_size" $READ_BATCH_SIZE)
output_builder=$(normalize_env_to_args "--output_builder" $OUTPUT_DATA_FORMAT)
builder_compressed_type=$(normalize_env_to_args "--builder_compressed_type" $BUILDER_COMPRESSED_TYPE)
batch_size=$(normalize_env_to_args "--batch_size" $BATCH_SIZE)
input_file_wildcard=$(normalize_env_to_args "--input_file_wildcard" "$FILE_WILDCARD")
kvstore_type=$(normalize_env_to_args '--kvstore_type' $KVSTORE_TYPE)
memory_limit_ratio=$(normalize_env_to_args '--memory_limit_ratio' $MEMORY_LIMIT_RATIO)
optional_fields=$(normalize_env_to_args '--optional_fields' $OPTIONAL_FIELDS)
input_data_validation_ratio=$(normalize_env_to_args '--input_data_validation_ratio' $INPUT_DATA_VALIDATION_RATIO)


python -m fedlearner.data_join.cmd.data_portal_worker_cli \
--rank_id=$INDEX \
--master_addr=$MASTER_POD_NAMES \
$input_data_file_iter $compressed_type $read_ahead_size $read_batch_size \
$output_builder $builder_compressed_type \
$batch_size $kvstore_type $memory_limit_ratio \
$optional_fields $input_data_validation_ratio
input_format=$(normalize_env_to_args '--input_format' $INPUT_DATA_FORMAT)
files_per_job_limit=$(normalize_env_to_args '--files_per_job_limit' $FILES_PER_JOB_LIMIT)
output_type=$(normalize_env_to_args '--output_type' $OUTPUT_TYPE)
output_format=$(normalize_env_to_args '--output_format' $OUTPUT_DATA_FORMAT)
data_source_name=$(normalize_env_to_args '--data_source_name' $DATA_SOURCE_NAME)
data_block_dump_threshold=$(normalize_env_to_args '--data_block_dump_threshold' $DATA_BLOCK_DUMP_THRESHOLD)
spark_image=$(normalize_env_to_args '--spark_image' $SPARK_IMAGE)
spark_driver_cores=$(normalize_env_to_args '--spark_driver_cores' $SPARK_DRIVER_CORES)
spark_driver_memory=$(normalize_env_to_args '--spark_driver_memory' $SPARK_DRIVER_MEMORY)
spark_executor_cores=$(normalize_env_to_args '--spark_executor_cores' $SPARK_EXECUTOR_CORES)
spark_executor_memory=$(normalize_env_to_args '--spark_executor_memory' $SPARK_EXECUTOR_MEMORY)
spark_executor_instances=$(normalize_env_to_args '--spark_executor_instances' $SPARK_EXECUTOR_INSTANCES)
validation=$(normalize_env_to_args '--validation' $VALIDATION)
start_date=$(normalize_env_to_args '--start_date' $START_DATE)
end_date=$(normalize_env_to_args '--end_date' $END_DATE)
oss_access_key_id=$(normalize_env_to_args '--oss_access_key_id' $OSS_ACCESS_KEY_ID)
oss_access_key_secret=$(normalize_env_to_args '--oss_access_key_secret' $OSS_ACCESS_KEY_SECRET)
oss_endpoint=$(normalize_env_to_args '--oss_endpoint' $OSS_ENDPOINT)

python -m fedlearner.data_join.cmd.raw_data_cli \
--data_portal_name=$DATA_PORTAL_NAME \
--data_portal_type=$DATA_PORTAL_TYPE \
--output_partition_num=$OUTPUT_PARTITION_NUM \
--input_base_dir=$INPUT_BASE_DIR \
--output_base_dir=$OUTPUT_BASE_DIR \
--raw_data_publish_dir=$RAW_DATA_PUBLISH_DIR \
--upload_dir=$UPLOAD_DIR \
--web_console_url=$WEB_CONSOLE_V2_ENDPOINT \
--web_console_username=$ROBOT_USERNAME \
--web_console_password=$ROBOT_PWD \
--spark_dependent_package=$UPLOAD_DIR/${DEP_FILE} \
$input_file_wildcard $input_format $LONG_RUNNING $CHECK_SUCCESS_TAG $kvstore_type \
$SINGLE_SUBFOLDER $files_per_job_limit $output_type $output_format \
$data_source_name $data_block_dump_threshold \
$spark_image $spark_driver_cores $spark_driver_memory \
$spark_executor_cores $spark_executor_memory $spark_executor_instances \
$validation $start_date $end_date \
$oss_access_key_id $oss_access_key_secret $oss_endpoint
14 changes: 14 additions & 0 deletions deploy/scripts/env_to_args.sh
@@ -33,3 +33,17 @@ pull_code() {
        cd $cwd
    fi
}

push_file() {
    filename=`basename $1`
    target_path=$2/${filename}
    if [[ $2 == "hdfs://"* ]]; then
        ${HADOOP_HOME}/bin/hadoop fs -mkdir -p $2
        ${HADOOP_HOME}/bin/hadoop fs -put -f $1 $target_path
    elif [[ $2 == "oss://"* ]]; then
        python -c "import tensorflow as tf; import tensorflow_io; tf.io.gfile.makedirs('$2'); tf.io.gfile.GFile('${target_path}', 'wb').write(open('$1', 'rb').read())"
    else
        mkdir -p $2
        cp $1 $target_path
    fi
}
33 changes: 33 additions & 0 deletions deploy/scripts/zip.py
@@ -0,0 +1,33 @@
import os
import sys
import zipfile


def add_to_zip(zf, path):
    if os.path.isdir(path):
        for nm in os.listdir(path):
            add_to_zip(zf, os.path.join(path, nm))
    else:  # file
        zf.write(path)


def main(args=None):
    import textwrap
    usage = textwrap.dedent("""\
        Usage:
            zip.py zipfile.zip src ... # Create zipfile from sources
        """)
    if args is None:
        args = sys.argv[1:]

    if len(args) < 2:
        print(usage)
        sys.exit(1)

    with zipfile.ZipFile(args[0], 'w') as zf:
        for path in args[1:]:
            add_to_zip(zf, path)


if __name__ == "__main__":
    main()
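
For illustration (not part of the commit), the archive produced by zip.py can be sanity-checked with the standard zipfile module; the file and package names follow the worker script above:

import zipfile

# After `python deploy/scripts/zip.py deps.zip fedlearner` (as run in
# run_data_portal_worker.sh), deps.zip should contain the raw_data modules
# the Spark job imports, e.g. fedlearner/data_join/raw_data/common.py.
with zipfile.ZipFile("deps.zip") as zf:
    for name in zf.namelist():
        print(name)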
29 changes: 29 additions & 0 deletions docker/dataflow/Dockerfile
@@ -0,0 +1,29 @@
FROM registry.cn-beijing.aliyuncs.com/fedlearner/spark-py:v3.0.0
LABEL maintainer="fedlearner <[email protected]>"

USER root
ARG DEBIAN_FRONTEND=noninteractive

RUN mkdir -p /usr/share/man/man1/ && apt-get --allow-releaseinfo-change update && apt install -y software-properties-common
RUN apt-add-repository 'deb http://security.debian.org/debian-security stretch/updates main' && \
apt-get --allow-releaseinfo-change update
RUN apt install -y maven openjdk-8-jdk git \
&& apt-get clean && rm -rf /var/lib/apt/lists/*

RUN git clone https://github.com/tensorflow/ecosystem.git /opt/ecosystem

ENV ROOT_DIR /opt/ecosystem
ENV SPARK_HOME /opt/spark
ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
ENV PATH ${JAVA_HOME}/bin:${PATH}
ENV PYSPARK_PYTHON=/usr/bin/python3
ENV PYSPARK_DRIVER_PYTHON=/usr/bin/python3

# NOTE: scala version is 2.12
RUN cd ${ROOT_DIR}/hadoop && mvn versions:set -DnewVersion=1.15.0 && mvn clean install -DskipTests && cp target/tensorflow-hadoop-1.15.0.jar ${SPARK_HOME}/jars/
RUN cd ${ROOT_DIR}/spark/spark-tensorflow-connector && mvn versions:set -DnewVersion=1.15.0 && mvn clean install -DskipTests && cp target/spark-tensorflow-connector_2.12-1.15.0.jar ${SPARK_HOME}/jars/ \
&& rm -rf /opt/ecosystem

COPY requirements.txt /opt/env/requirements.txt
RUN pip3 install -U pip -i https://pypi.tuna.tsinghua.edu.cn/simple \
&& pip3 install -r /opt/env/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
3 changes: 3 additions & 0 deletions docker/dataflow/requirements.txt
@@ -0,0 +1,3 @@
tensorflow==1.15.3
cityhash
psutil==5.8.0
16 changes: 15 additions & 1 deletion fedlearner/common/common.py
@@ -1,6 +1,7 @@
import datetime
import os
import logging
import time

import pytz

@@ -253,7 +254,7 @@ def convert_time_string_to_datetime(value):


def set_logger():
    verbosity = os.environ.get('VERBOSITY', 1)
    verbosity = int(os.environ.get('VERBOSITY', 1))
    if verbosity == 0:
        logging.getLogger().setLevel(logging.WARNING)
    elif verbosity == 1:
@@ -270,3 +271,16 @@ def time_diff(minuend, sub):
    ts_minuend = convert_to_datetime(minuend, enable_tz=False).timestamp()
    ts_sub = convert_to_datetime(sub, enable_tz=False).timestamp()
    return ts_minuend - ts_sub


class Timer:
    def __init__(self, content):
        self._content = content
        self._start_time = 0

    def __enter__(self):
        self._start_time = time.time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.info("%s takes %.3f seconds", self._content,
                     time.time() - self._start_time)
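
For illustration (not part of the commit), a minimal usage sketch of the Timer context manager added above; the timed block is a hypothetical placeholder:

import logging
import time

from fedlearner.common.common import Timer

logging.getLogger().setLevel(logging.INFO)

# On exit the context manager logs e.g. "scan input directories takes 0.200 seconds".
with Timer("scan input directories"):
    time.sleep(0.2)  # stand-in for real work such as listing raw data files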
121 changes: 0 additions & 121 deletions fedlearner/data_join/cmd/data_portal_master_service.py

This file was deleted.

