Skip to content

Commit

Permalink
Merge pull request #31 from boostcampaitech4lv23nlp2/feat/mlflow-transformer
Browse files Browse the repository at this point in the history

Feat/mlflow-transformer
  • Loading branch information
kyc3492 authored Nov 24, 2022
2 parents 2ac4275 + 0e74eb5 commit 253d64c
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 87 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pandas==1.1.5
scikit-learn~=0.24.1
transformers==4.10.0
transformers==4.24.0
pytorch-lightning==1.7.7
pyyaml==6.0
pysftp==0.2.9
Expand Down
11 changes: 8 additions & 3 deletions src/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@

from src.data_loader import REDataset, data_loader
from src.model import compute_metrics
from src.utils import get_train_valid_split, label_to_num, save_model_remote, set_mlflow_logger, set_seed
from src.utils import end_train, get_train_valid_split, label_to_num, save_model_remote, set_mlflow_logger, set_seed


def train(model_args, data_args, training_args):
# Using HfArgumentParser we can turn this class into argparse arguments to be able to specify them on the command line.
# parser = HfArgumentParser((ModelArguments, DataTrainingArguments, TrainingArguments))
# model_args, data_args, training_args = parser.parse_args_into_dataclasses()
set_mlflow_logger()

set_seed(data_args.seed)

Expand Down Expand Up @@ -56,6 +55,12 @@ def train(model_args, data_args, training_args):
)

# train model
special_word = data_args.task_name
tracking_uri = ""
experiment_name = ""
logging_step = 100

model_id = set_mlflow_logger(special_word, tracking_uri, experiment_name, logging_step)
trainer.train()
model.save_pretrained(data_args.best_model_dir_path)
save_model_remote(special_word=data_args.task_name)
save_model_remote(experiment_name, model_id)
2 changes: 1 addition & 1 deletion src/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .arguments import DataTrainingArguments, ModelArguments, get_training_args
from .control_mlflow import save_model_remote, set_mlflow_logger
from .control_mlflow import end_train, save_model_remote, set_mlflow_logger
from .get_train_valid_split import get_train_valid_split
from .preprocess import replace_symbol
from .representation import representation
Expand Down
180 changes: 98 additions & 82 deletions src/utils/control_mlflow.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import json
import os
import pickle as pickle
import uuid

import mlflow.pytorch
import mlflow
import pysftp
import yaml
from transformers.integrations import MLflowCallback


def set_mlflow_logger(tracking_uri="", experiment_name="", logging_step=0):
def end_train():
    """Close the active MLflow run and announce that the model will be uploaded.

    Ends the current MLflow run so a later training session starts a fresh run,
    then prints a status line for the operator.
    """
    # Terminate the run opened by the MLflow autologging setup.
    mlflow.end_run()
    message = "Train Finished... The model will be saved on remote"
    print(message)


def set_mlflow_logger(special_word="", tracking_uri="", experiment_name="", logging_step=0):
"""A function that sets up the MLflow logger environment.
:param `tracking_uri`: A String Data that informs uri of the mlflow site.
Args:
tracking_uri (String): A String Data that informs uri of the mlflow site.
Usually uses port 5000.
:param `experiment_name`: A String Data that informs experiment name at mlflow.
experiment_name (String): A String Data that informs experiment name at mlflow.
If it doesn't exist at mlflow, it creates one using this name.
:param `logging_step`: An Integer Data sets how much steps
logging_step (Integer): An Integer Data that sets how often (in steps) metrics are logged.
"""

try:
Expand Down Expand Up @@ -47,16 +54,18 @@ def set_mlflow_logger(tracking_uri="", experiment_name="", logging_step=0):
print("Set Experiment Name:", experiment_name)
print("Set Logging Step:", logging_step)

mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)
mlflow.pytorch.autolog(
log_every_n_step=logging_step,
)
model_id = special_word + "_" + uuid.uuid4().hex

os.environ["MLFLOW_TRACKING_URI"] = tracking_uri
os.environ["MLFLOW_EXPERIMENT_NAME"] = experiment_name
os.environ["MLFLOW_TAGS"] = '{"mlflow.runName": "' + model_id + '"}'
mlflow.doctor()
finally:
print("MLflow setup job finished")
return model_id


def save_model_remote(experiment_name="", special_word=""):
def save_model_remote(experiment_name="", model_id=""):
"""A function saves best model on remote storage.
Args:
Expand All @@ -65,74 +74,81 @@ def save_model_remote(experiment_name="", special_word=""):
special_word (String): A String Data that user can customize the name of the model.
User can add anything like hyper_parameter setting, user name, etc.
"""
with open("src/config/mlflow_config.yml") as f:
config_data = yaml.load(f, Loader=yaml.FullLoader)
print(config_data)
if experiment_name == "":
print("No input for experiment_name... import Default")
experiment_name = config_data["experiment_name"]

progressDict = {}
progressEveryPercent = 10

for i in range(0, 101):
if i % progressEveryPercent == 0:
progressDict[str(i)] = ""

def printProgressDecimal(x, y):
"""A callback function for show sftp progress log.
Source: https://stackoverflow.com/questions/24278146/how-do-i-monitor-the-progress-of-a-file-transfer-through-pysftp
Args:
x (String): Represent for to-do-data size(e.g. remained file size of pulling of getting)
y (String): Represent for total file size
"""
if (
int(100 * (int(x) / int(y))) % progressEveryPercent == 0
and progressDict[str(int(100 * (int(x) / int(y))))] == ""
):
print("{}% ({} Transfered(B)/ {} Total File Size(B))".format(str("%.2f" % (100 * (int(x) / int(y)))), x, y))
progressDict[str(int(100 * (int(x) / int(y))))] = "1"

model_id = uuid.uuid4().hex

with open("src/config/sftp_config.yml") as f:
config_data = yaml.load(f, Loader=yaml.FullLoader)
host = config_data["host"]
port = config_data["port"]
username = config_data["username"]
password = config_data["password"]

cnopts = pysftp.CnOpts()
cnopts.hostkeys = None

mlflow.log_artifact("src/best_model/config.json")
with pysftp.Connection(host, port=port, username=username, password=password, cnopts=cnopts) as sftp:
print("connected!!")
sftp.chdir("./mlflow_models")
try:
sftp.chdir(experiment_name)
except IOError:
sftp.mkdir(experiment_name)
sftp.chdir(experiment_name)
sftp.mkdir(special_word + "_" + model_id)
sftp.chdir(special_word + "_" + model_id)

model_url = "/mlflow_models/" + experiment_name + "/" + special_word + "_" + model_id
model_url_json = {"model_url": model_url}

with open("src/best_model/model_url.json", "w") as json_file:
json.dump(model_url_json, json_file)
mlflow.log_artifact("src/best_model/model_url.json")
sftp.put(
localpath="best_model/pytorch_model.bin",
remotepath="pytorch_model.bin",
callback=lambda x, y: printProgressDecimal(x, y),
)
sftp.put(
localpath="best_model/config.json",
remotepath="config.json",
callback=lambda x, y: printProgressDecimal(x, y),
try:
if "/" in model_id:
raise TypeError("##### special_word cannot include a character '/'")
except Exception as e:
print(e)
raise TypeError(
"Plz Check your parameter from train.py. \nThe model you trained may saved locally. \nSo, you can put on remote manually"
)
print("Model Saved on", model_url)
sftp.close()
else:
with open("src/config/mlflow_config.yml") as f:
config_data = yaml.load(f, Loader=yaml.FullLoader)
print(config_data)
if experiment_name == "":
print("No input for experiment_name... import Default")
experiment_name = config_data["experiment_name"]

progressDict = {}
progressEveryPercent = 10

for i in range(0, 101):
if i % progressEveryPercent == 0:
progressDict[str(i)] = ""

def printProgressDecimal(x, y):
"""A callback function for show sftp progress log.
Source: https://stackoverflow.com/questions/24278146/how-do-i-monitor-the-progress-of-a-file-transfer-through-pysftp
Args:
x (String): Represents the amount of data transferred so far (bytes sent).
y (String): Represent for total file size
"""
if (
int(100 * (int(x) / int(y))) % progressEveryPercent == 0
and progressDict[str(int(100 * (int(x) / int(y))))] == ""
):
print(
"{}% ({} Transfered(B)/ {} Total File Size(B))".format(
str("%.2f" % (100 * (int(x) / int(y)))), x, y
)
)
progressDict[str(int(100 * (int(x) / int(y))))] = "1"

with open("src/config/sftp_config.yml") as f:
config_data = yaml.load(f, Loader=yaml.FullLoader)
host = config_data["host"]
port = config_data["port"]
username = config_data["username"]
password = config_data["password"]

cnopts = pysftp.CnOpts()
cnopts.hostkeys = None

with pysftp.Connection(host, port=port, username=username, password=password, cnopts=cnopts) as sftp:
print("connected!!")
sftp.chdir("./mlflow_models")
try:
sftp.chdir(experiment_name)
except IOError:
sftp.mkdir(experiment_name)
sftp.chdir(experiment_name)
sftp.mkdir(model_id)
sftp.chdir(model_id)

model_url = "/mlflow_models/" + experiment_name + "/" + model_id
sftp.put(
localpath="src/best_model/pytorch_model.bin",
remotepath="pytorch_model.bin",
callback=lambda x, y: printProgressDecimal(x, y),
)
sftp.put(
localpath="src/best_model/config.json",
remotepath="config.json",
callback=lambda x, y: printProgressDecimal(x, y),
)
print("Success!!! Model Saved on", model_url)
sftp.close()
finally:
print("Model Saving job Finished")

0 comments on commit 253d64c

Please sign in to comment.