Update llama-2 with vllm
Maxusmusti committed Feb 27, 2024
1 parent ef6b3db commit 230d495
Showing 13 changed files with 322 additions and 43 deletions.
63 changes: 35 additions & 28 deletions language/gpt-j/SUT.py
@@ -455,8 +455,8 @@ def stream_api(self, input, response_ids, idx):
'model_id': 'GPT-J',
'inputs': input,
'parameters': {
'max_new_tokens': 1024,
'min_new_tokens': 1,
'max_new_tokens': 128,
'min_new_tokens': 30,
'decoding_method': "GREEDY"
},
}
@@ -512,31 +512,38 @@ def stream_api_vllm(self, input, response_ids, idx):
'stream': True,
'logprobs': 1
}

token_cache = []
s = requests.Session()
first = True
with s.post(
f'{self.api_servers[idx]}/v1/completions',
headers=headers,
json=json_data,
verify=False,
stream=True
) as resp:
for line in resp.iter_lines():
if line:
decoded = line.decode()
if decoded.startswith("data") and "[DONE]" not in decoded:
inter = json.loads(decoded[6:])["choices"][0]["logprobs"]
if "top_logprobs" in inter:
token_s = list(inter["top_logprobs"][0].keys())[0]
token = self.gpt_vocab[token_s]
if first:
self.first_token_queue.put((token, response_ids[0]))
first = False
else:
token_cache.append(token)
return token_cache

while True:
try:
token_cache = []
s = requests.Session()
first = True
with s.post(
f'{self.api_servers[idx]}/v1/completions',
headers=headers,
json=json_data,
verify=False,
stream=True
) as resp:
for line in resp.iter_lines():
if line:
decoded = line.decode()
if decoded.startswith("data") and "[DONE]" not in decoded:
inter = json.loads(decoded[6:])["choices"][0]["logprobs"]
if "top_logprobs" in inter:
token_s = list(inter["top_logprobs"][0].keys())[0]
token = self.gpt_vocab[token_s]
if first:
self.first_token_queue.put((token, response_ids[0]))
first = False
else:
token_cache.append(token)
s.close()
return token_cache
except:
s.close()
print("Connection failure")


def async_process_query(self, input_ids_tensor, qitem_id, idx):
decoded = input_ids_tensor
@@ -554,7 +561,7 @@ def async_process_query(self, input_ids_tensor, qitem_id, idx):
response = [lg.QuerySampleResponse(
qitem_id, bi[0], bi[1], n_tokens)]
lg.QuerySamplesComplete(response)
sys.exit()
return

def process_queries(self):
"""Processor of the queued queries. User may choose to add batching logic """
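The retry-wrapped streaming loop added above parses vLLM's OpenAI-compatible server-sent events. Below is a minimal, standalone sketch of that response format (not the SUT itself): the server URL and model name are placeholders, and the real code additionally maps each token string to an id through its GPT-J vocabulary, pushes the first token to first_token_queue, and retries the whole request on connection failure.

# Minimal sketch: stream tokens from a vLLM OpenAI-compatible
# /v1/completions endpoint and split off the first token, mirroring the
# parsing done in stream_api_vllm above. URL and model name are assumptions.
import json
import requests

API_SERVER = "http://localhost:8000"   # assumption: local vLLM server
MODEL = "EleutherAI/gpt-j-6b"          # assumption: served model name

def stream_tokens(prompt):
    json_data = {
        "model": MODEL,
        "prompt": prompt,
        "max_tokens": 128,
        "temperature": 0,
        "stream": True,
        "logprobs": 1,
    }
    first_token = None
    token_cache = []
    with requests.post(f"{API_SERVER}/v1/completions",
                       json=json_data, stream=True, verify=False) as resp:
        for line in resp.iter_lines():
            if not line:
                continue
            decoded = line.decode()
            # Server-sent events: each payload line looks like "data: {...}"
            if decoded.startswith("data") and "[DONE]" not in decoded:
                logprobs = json.loads(decoded[6:])["choices"][0]["logprobs"]
                if "top_logprobs" in logprobs:
                    token_s = list(logprobs["top_logprobs"][0].keys())[0]
                    if first_token is None:
                        first_token = token_s
                    else:
                        token_cache.append(token_s)
    return first_token, token_cache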
3 changes: 2 additions & 1 deletion language/gpt-j/api-endpoint-artifacts/benchmark.yaml
@@ -1,14 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
name: mlperf-inference-gpt
name: mlperf-inference-gpt-2
spec:
restartPolicy: Never
containers:
- name: mlperf-env
image: quay.io/meyceoz/mlperf-inference-gpt:v2
resources:
requests:
cpu: 140
memory: 20000Mi
volumeMounts:
- mountPath: /dev/shm
20 changes: 20 additions & 0 deletions language/gpt-j/api-endpoint-artifacts/service.yaml
@@ -0,0 +1,20 @@
kind: Service
apiVersion: v1
metadata:
name: vllm
labels:
app: vllm
spec:
clusterIP: None
ipFamilies:
- IPv4
ports:
- name: http
protocol: TCP
port: 8000
targetPort: http
type: ClusterIP
ipFamilyPolicy: SingleStack
sessionAffinity: None
selector:
app: vllm
4 changes: 2 additions & 2 deletions language/gpt-j/api-endpoint-artifacts/serving-vllm.yaml
@@ -27,11 +27,11 @@ spec:
protocol: TCP
resources: # configure as required
requests:
cpu: 24
cpu: 12
memory: 128Gi
nvidia.com/gpu: 1
limits:
cpu: 24
cpu: 12
memory: 128Gi
nvidia.com/gpu: 1
volumeMounts:
111 changes: 111 additions & 0 deletions language/gpt-j/api-endpoint-artifacts/standalone.yaml
@@ -0,0 +1,111 @@
kind: Deployment
apiVersion: apps/v1
metadata:
annotations:
deployment.kubernetes.io/revision: '8'
resourceVersion: '249593'
name: vllm
generation: 24
namespace: gpt-service
labels:
app: vllm
spec:
replicas: 1
selector:
matchLabels:
app: vllm
template:
metadata:
creationTimestamp: null
labels:
app: vllm
spec:
restartPolicy: Always
schedulerName: default-scheduler
affinity: {}
terminationGracePeriodSeconds: 120
securityContext: {}
containers:
- resources:
limits:
cpu: '12'
memory: 128Gi
nvidia.com/gpu: '1'
requests:
cpu: '12'
memory: 128Gi
nvidia.com/gpu: '1'
# readinessProbe:
# httpGet:
# path: /health
# port: http
# scheme: HTTP
# timeoutSeconds: 5
# periodSeconds: 30
# successThreshold: 1
# failureThreshold: 3
terminationMessagePath: /dev/termination-log
name: server
# livenessProbe:
# httpGet:
# path: /health
# port: http
# scheme: HTTP
# timeoutSeconds: 8
# periodSeconds: 100
# successThreshold: 1
# failureThreshold: 3
securityContext:
capabilities:
drop:
- ALL
runAsNonRoot: true
allowPrivilegeEscalation: false
seccompProfile:
type: RuntimeDefault
ports:
- name: http
containerPort: 8000
protocol: TCP
imagePullPolicy: IfNotPresent
# startupProbe:
# httpGet:
# path: /health
# port: http
# scheme: HTTP
# timeoutSeconds: 1
# periodSeconds: 30
# successThreshold: 1
# failureThreshold: 24
volumeMounts:
- name: models-cache
mountPath: /models-cache
- name: shm
mountPath: /dev/shm
terminationMessagePolicy: File
image: 'quay.io/rh-aiservices-bu/vllm-openai-ubi9:0.3.1-fix-2939'
args:
- '--model'
- /models-cache/gpt-model-info/
# - EleutherAI/gpt-j-6b
- '--download-dir'
- /models-cache
- '--dtype'
- float16
volumes:
- name: models-cache
persistentVolumeClaim:
claimName: vllm-model-cache
- name: shm
emptyDir:
medium: Memory
sizeLimit: 1Gi
dnsPolicy: ClusterFirst
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
strategy:
type: Recreate
revisionHistoryLimit: 10
progressDeadlineSeconds: 600
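For reference, a hedged sketch of how a client inside the cluster could reach this standalone Deployment through the headless vllm Service defined earlier. The DNS name assumes the Service is created in the same gpt-service namespace as the Deployment, the model name assumes vLLM registers the served model under its --model path (no --served-model-name is set in the args above), and /health is the endpoint referenced by the commented-out probes.

# Illustrative only: query the standalone vLLM server via the headless Service.
import requests

BASE = "http://vllm.gpt-service.svc.cluster.local:8000"   # assumed DNS name
MODEL = "/models-cache/gpt-model-info/"                    # assumed served name

# Liveness check against the endpoint the disabled probes would use.
assert requests.get(f"{BASE}/health", timeout=5).status_code == 200

resp = requests.post(
    f"{BASE}/v1/completions",
    json={"model": MODEL, "prompt": "Summarize: ...",
          "max_tokens": 128, "temperature": 0},
    timeout=120,
)
print(resp.json()["choices"][0]["text"])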
2 changes: 1 addition & 1 deletion language/gpt-j/user.conf
@@ -2,5 +2,5 @@
# The key has the format 'model.scenario.key'. Value is mostly int64_t.
# Model maybe '*' as wildcard. In that case the value applies to all models.
# All times are in milli seconds
*.Server.target_qps = 120
*.Server.target_qps = 132
*.Offline.min_query_count = 93576
77 changes: 75 additions & 2 deletions language/llama2-70b/SUT.py
@@ -98,6 +98,7 @@ def __init__(self,
additional_servers=[],
grpc=False,
batch_grpc=False,
vllm=False,
dtype="bfloat16",
device="cpu",
batch_size=None,
@@ -117,6 +118,9 @@ def __init__(self,
self.api_model_name = api_model_name
self.grpc = grpc
self.batch_grpc = batch_grpc
self.vllm = vllm
if self.vllm and (self.grpc or self.batch_grpc):
sys.exit("vllm does not support grpc")
self.device = device

if not batch_size:
@@ -203,6 +207,26 @@ def query_api(self, input, idx):
print("connection failure")
return json.loads(response.text)["generated_text"]

def query_api_vllm(self, inputs, idx):
headers = {
'Content-Type': 'application/json',
}

json_data = {
'model': '/mnt/models/',
'prompt': inputs,
'max_tokens': 128,
'temperature': 0,
}

response_code = 0
while response_code != 200:
try:
response = requests.post(f'{self.api_servers[idx]}/v1/completions', headers=headers, json=json_data, verify=False)
response_code = response.status_code
except:
print("connection failure")
return [resp["text"] for resp in json.loads(response.text)["choices"]]

def query_api_grpc(self, input, idx):
resp = self.grpc_clients[idx].make_request([input], model_id=self.api_model_name)
@@ -219,6 +243,8 @@ def api_action_handler(self, chunk, server_idx):
else:
with ThreadPoolExecutor(max_workers=len(chunk)) as executor:
output = list(executor.map(self.query_api_grpc,chunk, repeat(server_idx)))
elif self.vllm:
output = self.query_api_vllm(chunk, server_idx)
else:
with ThreadPoolExecutor(max_workers=len(chunk)) as executor:
output = list(executor.map(self.query_api,chunk, repeat(server_idx)))
@@ -398,9 +424,9 @@ def __del__(self):


class SUTServer(SUT):
def __init__(self, model_path=None, api_server=None, additional_servers=[], api_model_name=None, grpc=False, batch_grpc=False, dtype="bfloat16", device="cpu", total_sample_count=24576, dataset_path=None, workers=1):
def __init__(self, model_path=None, api_server=None, additional_servers=[], api_model_name=None, grpc=False, batch_grpc=False, vllm=False, dtype="bfloat16", device="cpu", total_sample_count=24576, dataset_path=None, workers=1):

super().__init__(model_path=model_path, api_server=api_server, additional_servers=additional_servers, api_model_name=api_model_name, grpc=grpc, dtype=dtype, device=device, total_sample_count=total_sample_count, dataset_path=dataset_path, workers=workers)
super().__init__(model_path=model_path, api_server=api_server, additional_servers=additional_servers, api_model_name=api_model_name, grpc=grpc, vllm=vllm, dtype=dtype, device=device, total_sample_count=total_sample_count, dataset_path=dataset_path, workers=workers)

with open(f"{self.model_path}/tokenizer.json", 'r') as token_file:
llama_tokenizer = json.load(token_file)
@@ -490,12 +516,59 @@ def stream_api_grpc(self, input, response_ids, idx):
else:
token_cache.append(token)
return token_cache

def stream_api_vllm(self, input, response_ids, idx):
headers = {
'Content-Type': 'application/json',
}

json_data = {
'model': '/mnt/models/',
'prompt': input,
'max_tokens': 128,
'temperature': 0,
'stream': True,
'logprobs': 1
}

while True:
try:
token_cache = []
s = requests.Session()
first = True
with s.post(
f'{self.api_servers[idx]}/v1/completions',
headers=headers,
json=json_data,
verify=False,
stream=True
) as resp:
for line in resp.iter_lines():
if line:
decoded = line.decode()
if decoded.startswith("data") and "[DONE]" not in decoded:
inter = json.loads(decoded[6:])["choices"][0]["logprobs"]
if "top_logprobs" in inter:
token_s = list(inter["top_logprobs"][0].keys())[0]
token = self.gpt_vocab[token_s]
if first:
self.first_token_queue.put((token, response_ids[0]))
first = False
else:
token_cache.append(token)
s.close()
return token_cache
except:
s.close()
print("Connection failure")

def async_process_query(self, input_ids_tensor, qitem_id, idx):
decoded = self.tokenizer.decode(input_ids_tensor[0])
response_ids = [qitem_id]
if self.grpc:
output_tokens = self.stream_api_grpc(decoded, response_ids, idx)
elif self.vllm:
output_tokens = self.stream_api_vllm(decoded, response_ids, idx)
else:
output_tokens = self.stream_api(decoded, response_ids, idx)

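The non-streaming query_api_vllm added above sends an entire chunk of prompts in a single request. A small sketch of that batched contract, under the assumption that the OpenAI-compatible /v1/completions route returns one choice per prompt in input order (the server URL is a placeholder):

# Sketch of the batched request pattern query_api_vllm relies on:
# one POST carries a list of prompts, and the response holds one choice
# per prompt, so no per-sample thread pool is needed for vLLM.
import requests

API_SERVER = "http://localhost:8000"          # assumption: vLLM endpoint

def query_batch(prompts):
    json_data = {
        "model": "/mnt/models/",              # model path used by the SUT
        "prompt": prompts,                    # list of prompts -> batched decode
        "max_tokens": 128,
        "temperature": 0,
    }
    response = requests.post(f"{API_SERVER}/v1/completions",
                             json=json_data, verify=False)
    response.raise_for_status()
    # One choice per prompt, returned in the same order as the input list.
    return [choice["text"] for choice in response.json()["choices"]]

# Example: three samples answered in one request.
# outputs = query_batch(["Q: 1+1?\nA:", "Q: capital of France?\nA:", "Q: 2*3?\nA:"])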
5 changes: 3 additions & 2 deletions language/llama2-70b/api-endpoint-artifacts/README.md
@@ -4,8 +4,9 @@ Prerequisites:
- Install the OpenShift AI model serving stack
- Add your AWS credentials to `secret.yaml` to access the model files
- Apply `secret.yaml`, `sa.yaml`
- FOR CAIKIT: Apply `serving-runtime.yaml`, then finally `model.yaml`
- FOR TGIS STANDALONE: Apply `serving-tgis.yaml`, then finally `model-tgis.yaml`
- FOR CAIKIT+TGIS: Apply `serving-runtime.yaml`, then finally `model.yaml`
- FOR TGIS: Apply `serving-tgis.yaml`, then finally `model-tgis.yaml`
- FOR VLLM: Apply `serving-vllm.yaml`, then finally `model-vllm.yaml` (NOTE: we'll see if further instructions are necessary based on performance)
- Create a benchmark pod using `benchmark.yaml`

In the pod, before any benchmark, first run `cd inference/language/llama2-70b`