Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/mlflow-transformer #31

Merged
merged 4 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pandas==1.1.5
scikit-learn~=0.24.1
transformers==4.10.0
transformers==4.24.0
pytorch-lightning==1.7.7
pyyaml==6.0
pysftp==0.2.9
Expand Down
11 changes: 8 additions & 3 deletions src/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@

from src.data_loader import REDataset, data_loader
from src.model import compute_metrics
from src.utils import get_train_valid_split, label_to_num, save_model_remote, set_mlflow_logger, set_seed
from src.utils import end_train, get_train_valid_split, label_to_num, save_model_remote, set_mlflow_logger, set_seed


def train(model_args, data_args, training_args):
# Using HfArgumentParser we can turn this class into argparse arguments to be able to specify them on the command line.
# parser = HfArgumentParser((ModelArguments, DataTrainingArguments, TrainingArguments))
# model_args, data_args, training_args = parser.parse_args_into_dataclasses()
set_mlflow_logger()

set_seed(data_args.seed)

Expand Down Expand Up @@ -56,6 +55,12 @@ def train(model_args, data_args, training_args):
)

# train model
special_word = data_args.task_name
tracking_uri = ""
experiment_name = ""
logging_step = 100

model_id = set_mlflow_logger(special_word, tracking_uri, experiment_name, logging_step)
trainer.train()
model.save_pretrained(data_args.best_model_dir_path)
save_model_remote(special_word=data_args.task_name)
save_model_remote(experiment_name, model_id)
2 changes: 1 addition & 1 deletion src/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .arguments import DataTrainingArguments, ModelArguments, get_training_args
from .control_mlflow import save_model_remote, set_mlflow_logger
from .control_mlflow import end_train, save_model_remote, set_mlflow_logger
from .get_train_valid_split import get_train_valid_split
from .preprocess import replace_symbol
from .representation import representation
Expand Down
180 changes: 98 additions & 82 deletions src/utils/control_mlflow.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import json
import os
import pickle as pickle
import uuid

import mlflow.pytorch
import mlflow
import pysftp
import yaml
from transformers.integrations import MLflowCallback


def set_mlflow_logger(tracking_uri="", experiment_name="", logging_step=0):
def end_train():
    """End the currently active MLflow run.

    Calls ``mlflow.end_run()`` to close the run, then prints a notice that
    the trained model will subsequently be saved to remote storage
    (presumably by a follow-up call to ``save_model_remote`` — confirm with caller).
    """
    mlflow.end_run()
    print("Train Finished... The model will be saved on remote")


def set_mlflow_logger(special_word="", tracking_uri="", experiment_name="", logging_step=0):
"""A function sets mlfow logger environments.

:param `tracking_uri`: A String Data that informs uri of the mlflow site.
Args:
tracking_uri (String): A String Data that informs uri of the mlflow site.
Usually uses port 5000.
:param `experiment_name`: A String Data that informs experiment name at mlflow.
experiment_name (String): A String Data that informs experiment name at mlflow.
If it doesn't exist at mlflow, it creates one using this name.
:param `logging_step`: An Integer Data sets how much steps
logging_step (String): An Integer Data sets per how much steps
"""

try:
Expand Down Expand Up @@ -47,16 +54,18 @@ def set_mlflow_logger(tracking_uri="", experiment_name="", logging_step=0):
print("Set Experiment Name:", experiment_name)
print("Set Logging Step:", logging_step)

mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)
mlflow.pytorch.autolog(
log_every_n_step=logging_step,
)
model_id = special_word + "_" + uuid.uuid4().hex

os.environ["MLFLOW_TRACKING_URI"] = tracking_uri
os.environ["MLFLOW_EXPERIMENT_NAME"] = experiment_name
os.environ["MLFLOW_TAGS"] = '{"mlflow.runName": "' + model_id + '"}'
mlflow.doctor()
finally:
print("MLflow setup job finished")
return model_id


def save_model_remote(experiment_name="", special_word=""):
def save_model_remote(experiment_name="", model_id=""):
"""A function saves best model on remote storage.

Args:
Expand All @@ -65,74 +74,81 @@ def save_model_remote(experiment_name="", special_word=""):
special_word (String): A String Data that user can customize the name of the model.
User can add anything like hyper_parameter setting, user name, etc.
"""
with open("src/config/mlflow_config.yml") as f:
config_data = yaml.load(f, Loader=yaml.FullLoader)
print(config_data)
if experiment_name == "":
print("No input for experiment_name... import Default")
experiment_name = config_data["experiment_name"]

progressDict = {}
progressEveryPercent = 10

for i in range(0, 101):
if i % progressEveryPercent == 0:
progressDict[str(i)] = ""

def printProgressDecimal(x, y):
"""A callback function for show sftp progress log.
Source: https://stackoverflow.com/questions/24278146/how-do-i-monitor-the-progress-of-a-file-transfer-through-pysftp

Args:
x (String): Represent for to-do-data size(e.g. remained file size of pulling of getting)
y (String): Represent for total file size
"""
if (
int(100 * (int(x) / int(y))) % progressEveryPercent == 0
and progressDict[str(int(100 * (int(x) / int(y))))] == ""
):
print("{}% ({} Transfered(B)/ {} Total File Size(B))".format(str("%.2f" % (100 * (int(x) / int(y)))), x, y))
progressDict[str(int(100 * (int(x) / int(y))))] = "1"

model_id = uuid.uuid4().hex

with open("src/config/sftp_config.yml") as f:
config_data = yaml.load(f, Loader=yaml.FullLoader)
host = config_data["host"]
port = config_data["port"]
username = config_data["username"]
password = config_data["password"]

cnopts = pysftp.CnOpts()
cnopts.hostkeys = None

mlflow.log_artifact("src/best_model/config.json")
with pysftp.Connection(host, port=port, username=username, password=password, cnopts=cnopts) as sftp:
print("connected!!")
sftp.chdir("./mlflow_models")
try:
sftp.chdir(experiment_name)
except IOError:
sftp.mkdir(experiment_name)
sftp.chdir(experiment_name)
sftp.mkdir(special_word + "_" + model_id)
sftp.chdir(special_word + "_" + model_id)

model_url = "/mlflow_models/" + experiment_name + "/" + special_word + "_" + model_id
model_url_json = {"model_url": model_url}

with open("src/best_model/model_url.json", "w") as json_file:
json.dump(model_url_json, json_file)
mlflow.log_artifact("src/best_model/model_url.json")
sftp.put(
localpath="best_model/pytorch_model.bin",
remotepath="pytorch_model.bin",
callback=lambda x, y: printProgressDecimal(x, y),
)
sftp.put(
localpath="best_model/config.json",
remotepath="config.json",
callback=lambda x, y: printProgressDecimal(x, y),
try:
if "/" in model_id:
raise TypeError("##### special_word cannot include a character '/'")
except Exception as e:
print(e)
raise TypeError(
"Plz Check your parameter from train.py. \nThe model you trained may saved locally. \nSo, you can put on remote manually"
)
print("Model Saved on", model_url)
sftp.close()
else:
with open("src/config/mlflow_config.yml") as f:
config_data = yaml.load(f, Loader=yaml.FullLoader)
print(config_data)
if experiment_name == "":
print("No input for experiment_name... import Default")
experiment_name = config_data["experiment_name"]

progressDict = {}
progressEveryPercent = 10

for i in range(0, 101):
if i % progressEveryPercent == 0:
progressDict[str(i)] = ""

def printProgressDecimal(x, y):
"""A callback function for show sftp progress log.
Source: https://stackoverflow.com/questions/24278146/how-do-i-monitor-the-progress-of-a-file-transfer-through-pysftp

Args:
x (String): Represent for to-do-data size(e.g. remained file size of pulling of getting)
y (String): Represent for total file size
"""
if (
int(100 * (int(x) / int(y))) % progressEveryPercent == 0
and progressDict[str(int(100 * (int(x) / int(y))))] == ""
):
print(
"{}% ({} Transfered(B)/ {} Total File Size(B))".format(
str("%.2f" % (100 * (int(x) / int(y)))), x, y
)
)
progressDict[str(int(100 * (int(x) / int(y))))] = "1"

with open("src/config/sftp_config.yml") as f:
config_data = yaml.load(f, Loader=yaml.FullLoader)
host = config_data["host"]
port = config_data["port"]
username = config_data["username"]
password = config_data["password"]

cnopts = pysftp.CnOpts()
cnopts.hostkeys = None

with pysftp.Connection(host, port=port, username=username, password=password, cnopts=cnopts) as sftp:
print("connected!!")
sftp.chdir("./mlflow_models")
try:
sftp.chdir(experiment_name)
except IOError:
sftp.mkdir(experiment_name)
sftp.chdir(experiment_name)
sftp.mkdir(model_id)
sftp.chdir(model_id)

model_url = "/mlflow_models/" + experiment_name + "/" + model_id
sftp.put(
localpath="src/best_model/pytorch_model.bin",
remotepath="pytorch_model.bin",
callback=lambda x, y: printProgressDecimal(x, y),
)
sftp.put(
localpath="src/best_model/config.json",
remotepath="config.json",
callback=lambda x, y: printProgressDecimal(x, y),
)
print("Success!!! Model Saved on", model_url)
sftp.close()
finally:
print("Model Saving job Finished")