LSTM (Long Short-Term Memory) from scratch¶
LSTM stands for Long Short-Term Memory, which is a type of artificial recurrent neural network (RNN) architecture. LSTMs are well-suited to classify, process, and predict time series data since there can be lags of unknown duration between important events in a time series.
Key Characteristics:¶
- LSTMs are designed to avoid the long-term dependency problem where the model would forget the information from earlier in the sequence.
- They are capable of learning long-term dependencies.
How It Works:¶
- Cell State: Responsible for carrying information across time steps in the network.
- Gates: LSTM has three gates to control the information flow:
- Forget Gate: Decides what information to throw away from the cell state, using a sigmoid function.
- Input Gate: Decides which new information to be used to update the cell state using a sigmoid function and a tanh function to produce new candidate values.
- Output Gate: Finally, the output gate decides which information from the cell state will be output based on a sigmoid function, filtering the current cell state which is passed through a tanh function to regulate the output.
Resources:¶
- This post provides an excellent explanation.
All the formulas involved:
$$ \begin{aligned} f_t &= \sigma(W_f \cdot [h_{t-1}, x_t] + b_f) \\ i_t &= \sigma(W_i \cdot [h_{t-1}, x_t] + b_i) \\ \hat{C}_t &= \tanh(W_c \cdot [h_{t-1}, x_t] + b_c) \\ C_t &= f_t \cdot C_{t-1} + i_t \cdot \hat{C}_t \\ o_t &= \sigma(W_o \cdot [h_{t-1}, x_t] + b_o) \\ h_t &= o_t \cdot \tanh(C_t) \end{aligned} $$
Finally, $y_t = W_y \cdot h_t + b_y$
Prepare data set¶
Let's create a toy problem to be solved by LSTM
# Import necessary libraries
import re
import numpy as np
# Create a simple dataset for an NLP problem (e.g., predict next word)
texts = " ".join(
[
"I love this game, it's fantastic!",
"What a terrible game. It's too difficult. I hated it.",
"Absolutely loved the game, will beat it again.",
"Not a fan of this type of games. It is too hard",
"The game was okay, neither hard nor easy.",
"Fantastic artwork and music, really enjoyed it!",
"Worst game ever, such a torture. would not recommend.",
"Brilliant art and great gameplay.",
"Awful game, it was a waste of money. I can't even finish it.",
"Amazing story and stunning artwork.",
"this game is one of the best games i have played all i can complain about it is at the start to dificult and its to cheap",
"Perfection, Team Cherry does it again. Absolutely love the world and all the characters you can meet and all the environments (except Bilewater)",
"Love the new charm system (tools) and being able to swap out movesets (crests).",
"Tons of content and incredibly fun, absolutely recommend.",
"Really fun but really difficult and a step up from hollow knight. My favorite area: bell hart. My least favorite area: Putrified ducks. and bilewa",
"To prevent myself from imploding due to my love for this game along with its prequel",
"i hate playing this game on a keyboard",
"peak gaming storyline is excellent",
]
).lower()
texts = re.sub(r"[^a-z\s]", "", texts)
chars = set(texts)
data_size, char_size = len(texts), len(chars)
print(f"Data size: {data_size}, Char Size: {char_size}")
char_to_idx = {c: i for i, c in enumerate(chars)}
idx_to_char = {i: c for i, c in enumerate(chars)}
Data size: 1129, Char Size: 27
# Define the one hot encoding function
def one_hot_encode(word, char_to_idx, char_size):
one_hot_matrix = np.zeros((len(word), char_size), dtype=int)
# Fill the matrix with one hot encoded values
for i, char in enumerate(word):
idx = char_to_idx[char]
one_hot_matrix[i, idx] = 1
return one_hot_matrix
# Define the function to reverse one hot encoding
def decode_one_hot(one_hot_matrix, idx_to_char):
decoded_word = ""
for one_hot_vector in one_hot_matrix:
idx = np.argmax(one_hot_vector)
decoded_word += idx_to_char[idx]
return decoded_word
# Example usage
word = "hello"
encoded_word = one_hot_encode(word, char_to_idx, char_size)
decoded_word = decode_one_hot(encoded_word, idx_to_char)
print(f"One Hot Encoding of '{word}':\n{encoded_word}")
print(f"Decoded Word: {decoded_word}")
One Hot Encoding of 'hello': [[0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]] Decoded Word: hello
encoded_matrix = one_hot_encode(texts, char_to_idx, char_size)
# Split data into training, validation, and test sets
from sklearn.model_selection import train_test_split
# Assume each character is a sample, split them
train, temp = train_test_split(encoded_matrix, test_size=0.4, random_state=42)
val, test = train_test_split(temp, test_size=0.5, random_state=42)
print(
f"Training set shape: {train.shape}, Validation set shape: {val.shape}, Test set shape: {test.shape}"
)
Training set shape: (677, 27), Validation set shape: (226, 27), Test set shape: (226, 27)
X_train, Y_train = train[:-1], train[1:]
X_val, Y_val = val[:-1], val[1:]
X_test, Y_test = test[:-1], test[1:]
Base Model¶
The base model is naive and can only predict via forward propagation. Since its weights are randomly initialized and never trained, it is effectively a random model.
class BaseLSTM:
char_to_idx = {c: i for i, c in enumerate(chars)}
idx_to_char = {i: c for i, c in enumerate(chars)}
@staticmethod
def xavier_init(input_size, output_size):
return np.random.uniform(-1, 1, (input_size, output_size)) * np.sqrt(
6 / (input_size + output_size)
)
def __init__(self, input_size, hidden_size, output_size=27):
# Initialize random weights and biases
self.input_size = input_size
self.hidden_size = hidden_size
self.output_size = output_size
combined_size = input_size + hidden_size
self.combined_size = combined_size
# forget gate
# W_f is of size [input_size, hidden_size]
# so that multiplication of
self.W_f = self.xavier_init(hidden_size, combined_size)
self.b_f = np.zeros((hidden_size, 1))
# input gate
self.W_i = self.xavier_init(hidden_size, combined_size)
self.b_i = np.zeros((hidden_size, 1))
# candidate gate
self.W_C = self.xavier_init(hidden_size, combined_size)
self.b_C = np.zeros((hidden_size, 1))
# output gate
self.W_o = self.xavier_init(hidden_size, combined_size)
self.b_o = np.zeros((hidden_size, 1))
# output layer
self.W_y = self.xavier_init(output_size, hidden_size)
self.b_y = np.zeros((output_size, 1))
self.states = {
"combined": {},
"input_gates": {},
"forget_gates": {},
"candidate_gates": {},
"output_gates": {},
"outputs": {},
"hidden": {},
"cell": {},
}
def reset_states(self):
for k in self.states.keys():
self.states[k] = {}
# set an initial prev state to avoid cold start out of index exception
self.states["hidden"][-1] = np.zeros(self.hidden_size).reshape(-1, 1)
self.states["cell"][-1] = np.zeros(self.hidden_size).reshape(-1, 1)
@staticmethod
def sigmoid(x, derivative=False):
if derivative:
return x * (1 - x)
return 1 / (1 + np.exp(-x))
@staticmethod
def tanh(x, derivative=False):
if derivative:
return 1 - x**2
return np.tanh(x)
@staticmethod
def softmax(y):
return np.exp(y) / np.sum(np.exp(y))
def predict_scores(self, X):
self.reset_states()
pred = []
for t in range(len(X)):
# dimension check
# X.shape = [1, input_size (or vocab size)]
# combined.shape = [1, input_size + hidden_size]
combined = np.concat((self.states["hidden"][t - 1], X[t].reshape(-1, 1)))
self.states["combined"][t] = combined
# Forget gate:
# W_f: [hidden_size, input_size + hidden_size]
# combined: [1, input_size + hidden_size]
# b_f: [hidden_size, 1]
# Expect f_t: [50,1]
self.states["forget_gates"][t] = self.sigmoid(
self.W_f @ combined + self.b_f
)
# Input gate:
# W_i: [hidden_size, input_size + hidden_size]
# b_i: [hidden_size, 1]
# Expect i_t: [50,1]
self.states["input_gates"][t] = self.sigmoid(self.W_i @ combined + self.b_i)
# Candidate gate:
# W_C: [hidden_size, input_size + hidden_size]
# b_C: [hidden_size, 1]
# Expect C_hat_t: [50,1]
self.states["candidate_gates"][t] = self.tanh(
self.W_C @ combined + self.b_C
)
# Update cell state:
# C_t = f_t * C_prev + i_t * C_hat_t
# f_t: [50,1]
# C_prev: [50,1]
# i_t: [50,1]
# C_hat_t: [50,1]
# Expect C_t: [50,1]
self.states["cell"][t] = (
self.states["forget_gates"][t] * self.states["cell"][t - 1]
+ self.states["input_gates"][t] * self.states["candidate_gates"][t]
)
# Output gate:
# W_o: [hidden_size, input_size + hidden_size]
# b_o: [hidden_size, 1]
# Expect o_t: [output_size, 1]
self.states["output_gates"][t] = self.sigmoid(
self.W_o @ combined + self.b_o
)
# Output gate:
# W_o: [hidden_size, input_size + hidden_size]
# b_o: [hidden_size, 1]
# Expect o_t: [output_size, 1]
self.states["output_gates"][t] = self.sigmoid(
self.W_o @ combined + self.b_o
)
# h_t = o_t * tanh(C_t)
self.states["hidden"][t] = self.states["output_gates"][t] * self.tanh(
self.states["cell"][t]
)
# W_y: [output_size, hidden_size]
# h_t: [hidden_size, 1]
# b_y: [output_size, 1]
output = self.softmax(self.W_y @ self.states["hidden"][t] + self.b_y)
pred.append(output)
return pred
def predict(self, X):
scores = self.predict_scores(X)
decoded_text = ""
for score in scores:
idx = np.argmax(score)
decoded_text += idx_to_char[idx]
return decoded_text
How good is a random model?¶
Try a hidden layer size of 50. The prediction is indeed terrible as expected!
# Initialize the model
hidden_size = 50 # Example hidden size
input_size = char_size # We are using one-hot encoded characters as inputs
model = BaseLSTM(input_size, hidden_size)
pred = model.predict(X_train)
pred
'm'
Backpropagation¶
The fun part. Skip the math here since it's hard, but if you are up for it, here's an article. The key things to know are:
- Loss function. This is the standard loss function for probability prediction. At time t, the loss is $L_t = \sum_{i=1}^{m} -y_{i,t} \log(\hat{y}_{i,t})$, where m is the dimension of the output. In our case it's the vocabulary size of one-hot encoding, 27.
- Total loss. The total loss accumulated from the last step back to the current step is $L_{total} = \sum_{t=1}^{n} L_t$. This creates a dependency between neighboring steps, so partial derivatives flow from the next step back to the previous one.
class LSTM(BaseLSTM):
def __init__(self, input_size, hidden_size, learning_rate, output_size=27):
super().__init__(input_size, hidden_size, output_size)
self.learning_rate = learning_rate
def backpropagation(self, errors, X):
dW_f = np.zeros_like(self.W_f)
dW_i = np.zeros_like(self.W_i)
dW_C = np.zeros_like(self.W_C)
dW_o = np.zeros_like(self.W_o)
dW_y = np.zeros_like(self.W_y)
db_f = np.zeros_like(self.b_f)
db_i = np.zeros_like(self.b_i)
db_C = np.zeros_like(self.b_C)
db_o = np.zeros_like(self.b_o)
db_y = np.zeros_like(self.b_y)
dh_next = np.zeros((self.hidden_size, 1))
dC_next = np.zeros((self.hidden_size, 1))
for t in reversed(range(len(errors))):
# Error at step t
e = errors[t]
# Going back in the reverse order
# Output layer
dW_y += e @ self.states['hidden'][t].T
db_y += e
# TODO Hidden state
dh = self.W_y @ e + dh_next
# TODO Output gate
do = dh * np.tanh(self.C[t])
# TODO Cell state
dC = dh * self.o[t] * (1 - np.tanh(self.C[t]) ** 2) + dC_next
# TODO Gradient of loss w.r.t C_bar_t (candidate gate)
dC_bar = dC * self.i[t]
# TODO Gradient of loss w.r.t f_t (forget gate)
df = dC * self.C[t - 1]
# TODO Gradient of loss w.r.t i_t (input gate)
di = dC * self.C_bar[t]
# TODO Update weight gradient for output gate and corresponding bias
dW_o += np.dot(do * self.o[t] * (1 - self.o[t]), np.hstack((self.h[t - 1], X[t])).T)
db_o += do * self.o[t] * (1 - self.o[t])
# TODO Update weight gradient for input gate and corresponding bias
dW_i += np.dot(di * self.i[t] * (1 - self.i[t]), np.hstack((self.h[t - 1], X[t])).T)
db_i += di * self.i[t] * (1 - self.i[t])
# TODO Update weight gradient for candidate memory cell and corresponding bias
dW_C += np.dot(dC_bar * (1 - self.C_bar[t] ** 2), np.hstack((self.h[t - 1], X[t])).T)
db_C += dC_bar * (1 - self.C_bar[t] ** 2)
# TODO Update weight gradient for forget gate and corresponding bias
dW_f += np.dot(df * self.f[t] * (1 - self.f[t]), np.hstack((self.h[t - 1], X[t])).T)
db_f += df * self.f[t] * (1 - self.f[t])
# TODO Calculate dh_next for backwards pass
dh_next = (
np.dot(self.W_f[:, : self.hidden_size].T, df)
+ np.dot(self.W_i[:, : self.hidden_size].T, di)
+ np.dot(self.W_C[:, : self.hidden_size].T, dC_bar)
+ np.dot(self.W_o[:, : self.hidden_size].T, do)
)
# TODO Calculate dC_next for backwards pass
dC_next = dC * self.f[t]
# TODO Update weights and biases
self.W_f -= self.learning_rate * dW_f
self.b_f -= self.learning_rate * db_f
self.W_i -= self.learning_rate * dW_i
self.b_i -= self.learning_rate * db_i
self.W_C -= self.learning_rate * dW_C
self.b_C -= self.learning_rate * db_C
self.W_o -= self.learning_rate * dW_o
self.b_o -= self.learning_rate * db_o
self.W_y -= self.learning_rate * dW_y
self.b_y -= self.learning_rate * db_y