-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprototype.py
139 lines (112 loc) · 4.22 KB
/
prototype.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import urllib.request
import torch
import torch.nn as nn
from torch.nn import functional as F
# ---------------------------------------------------------------------------
# Hyperparameters and data preparation for a character-level bigram model.
# ---------------------------------------------------------------------------
batch_size = 32      # number of independent sequences processed in parallel
chunk_size = 8       # maximum context length (block size) per sequence
max_iters = 3000     # total optimization steps
eval_interval = 300  # evaluate train/val loss every this many steps
learning_rate = 1e-2
# ability to run on gpu if the machine has it (much faster)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200     # batches averaged per loss estimate
n_embed = 32         # embedding dimension

# get data from Shakespeare text (https: the http URL only works via redirect)
url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'
urllib.request.urlretrieve(url, filename="input.txt")
with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# vocabulary: all unique characters, sorted for a stable ordering
chars = sorted(set(text))
vocab_size = len(chars)
# simple character <-> integer codec (a lookup table, not encryption)
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}
encode = lambda s: [stoi[c] for c in s]          # string -> list of ints
decode = lambda l: ''.join(itos[i] for i in l)   # list of ints -> string

# train/test split: first 80% train, last 20% validation
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.8 * len(data))
train_data = data[:n]
val_data = data[n:]
def get_batch(split):
    """Sample a random batch of (input, target) chunks from the chosen split.

    Args:
        split: 'train' selects train_data; anything else selects val_data.

    Returns:
        x: (batch_size, chunk_size) long tensor of input characters.
        y: same shape, shifted one position right (next-character targets).
    """
    # designates which data to look at
    data = train_data if split == 'train' else val_data
    # random starting offsets, leaving room for a full chunk (+1 for targets)
    ix = torch.randint(len(data) - chunk_size, (batch_size,))
    # stack a bunch of torch rows on top to get a tensor matrix
    x = torch.stack([data[i:i + chunk_size] for i in ix])
    y = torch.stack([data[i + 1:i + chunk_size + 1] for i in ix])
    # bug fix: batches must live on the same device as the model, otherwise
    # the forward pass fails with a device mismatch when running on CUDA
    x, y = x.to(device), y.to(device)
    return x, y
# Measures validation/training loss without leaking gradients into training:
# torch.no_grad() disables gradient tracking for the whole measurement.
@torch.no_grad()
def estimate_loss():
    """Average the model's loss over eval_iters random batches per split.

    The model is switched to eval mode for the measurement and restored
    to train mode before returning.
    """
    model.eval()
    results = {}
    for split in ('train', 'val'):
        batch_losses = torch.zeros(eval_iters)
        for i in range(eval_iters):
            inputs, targets = get_batch(split)
            _, batch_loss = model(inputs, targets)
            batch_losses[i] = batch_loss.item()
        results[split] = batch_losses.mean()
    model.train()
    return results
# simple bigram model
class BigramLanguageModel(nn.Module):
    """Bigram language model: predicts the next character from the current one.

    Each token is mapped to an n_embed-dimensional embedding and projected to
    vocab_size logits. No positional information or wider context is used, so
    every prediction depends only on the single current token.
    """

    def __init__(self):
        super().__init__()
        # per-token embedding followed by a linear head over the vocabulary
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        self.lm_head = nn.Linear(n_embed, vocab_size)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, when targets are given, the loss.

        Args:
            idx: (B, T) long tensor of token indices.
            targets: optional (B, T) long tensor of next-token indices.

        Returns:
            (logits, loss): logits is always (B, T, vocab_size); loss is a
            scalar cross-entropy tensor, or None when targets is None.
        """
        token_embed = self.token_embedding_table(idx)  # (B, T, n_embed)
        logits = self.lm_head(token_embed)             # (B, T, vocab_size)
        # loss is the penalty for making a bad guess
        if targets is None:
            loss = None
        else:
            # cross_entropy expects (N, C) logits and (N,) targets, so
            # flatten the batch and time dimensions together for the loss
            # (the returned logits keep their (B, T, C) shape)
            B, T, C = logits.shape
            loss = F.cross_entropy(logits.view(B * T, C), targets.view(B * T))
        return logits, loss

    def generate(self, idx, max_new_tokens):
        """Autoregressively extend idx with max_new_tokens sampled tokens.

        Args:
            idx: (B, T) long tensor of seed token indices.
            max_new_tokens: number of tokens to append to each sequence.

        Returns:
            (B, T + max_new_tokens) long tensor of token indices.
        """
        for _ in range(max_new_tokens):
            logits, _ = self(idx)
            # only the last position matters for the next-token prediction
            logits = logits[:, -1, :]
            # softmax converts the logit vector into a probability vector
            probs = F.softmax(logits, dim=-1)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1)
        return idx
# instantiate the model and move its parameters to the selected device
model = BigramLanguageModel()
m = model.to(device)
# AdamW optimizer (Adam with decoupled weight decay)
optimizer = torch.optim.AdamW(m.parameters(), lr=learning_rate)

for step in range(max_iters):
    # periodically report smoothed train/val losses
    if step % eval_interval == 0:
        losses = estimate_loss()
        print(f"Step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
    # one optimization step on a fresh random batch
    xb, yb = get_batch('train')
    logits, loss = m(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# sample 500 characters from the trained model, seeded with a single 0 token
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))