
PWWS Attack #101

Open
wants to merge 16 commits into main
400 changes: 400 additions & 0 deletions code_soup/ch8/pwws.py

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions code_soup/common/text/datasets/utils.py
@@ -0,0 +1,5 @@
def dataset_mapping(x):
return {
"x": x["sentence"],
"y": 1 if x["label"] > 0.5 else 0,
}
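`dataset_mapping` renames an SST-style example to the `x`/`y` keys the attack loop consumes and binarizes the continuous sentiment label. A minimal usage sketch, assuming the HuggingFace `datasets` library and its `sst` corpus (both assumptions, not part of this diff):

from datasets import load_dataset

from code_soup.common.text.datasets.utils import dataset_mapping

# Assumed corpus: the HuggingFace "sst" dataset, whose examples carry a
# "sentence" string and a float "label" in [0, 1].
data = load_dataset("sst", split="train[:20]").map(dataset_mapping)

print(data[0]["x"])  # the raw sentence text
print(data[0]["y"])  # 1 if label > 0.5 (positive), else 0 (negative)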
20 changes: 20 additions & 0 deletions code_soup/common/text/models/classifier.py
@@ -0,0 +1,20 @@
from abc import ABC, abstractmethod
from typing import List, Tuple

import numpy as np


class Classifier(ABC):  # pragma: no cover
    """Abstract base class for victim classifiers targeted by text attacks."""

def __init__(self):
pass

@abstractmethod
    def get_prob(self, input_: List[str]) -> np.ndarray:
pass

@abstractmethod
    def get_pred(self, input_: List[str]) -> np.ndarray:
pass

    def get_grad(self, input_: List[str], labels: List[int]) -> Tuple[np.ndarray, np.ndarray]:
pass
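For orientation, a hedged sketch of a minimal concrete subclass (a toy keyword counter, not part of this PR) showing the interface a victim classifier must expose:

from typing import List

import numpy as np

from code_soup.common.text.models.classifier import Classifier


class ToyKeywordClassifier(Classifier):
    """Toy victim for illustration: scores each sentence by counting the word 'good'."""

    def get_prob(self, input_: List[str]) -> np.ndarray:
        counts = np.array([s.lower().split().count("good") for s in input_], dtype=float)
        pos = 1.0 / (1.0 + np.exp(-counts))        # squash counts into (0, 1)
        return np.stack([1.0 - pos, pos], axis=1)  # columns: [P(negative), P(positive)]

    def get_pred(self, input_: List[str]) -> np.ndarray:
        return self.get_prob(input_).argmax(axis=1)


print(ToyKeywordClassifier().get_pred(["a good good movie", "a dull movie"]))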
180 changes: 180 additions & 0 deletions code_soup/common/text/models/transformers_classifier.py
@@ -0,0 +1,180 @@
"""
Class for transformers-based classifiers. Adapted from
https://github.com/thunlp/OpenAttack/blob/master/OpenAttack/victim/classifiers/transformers.py"""
import numpy as np
import torch
import transformers

from code_soup.common.text.models.classifier import Classifier
from code_soup.common.text.utils.tokenizer import TransformersTokenizer
from code_soup.common.text.utils.word_embedding import WordEmbedding


class HookCloser:
def __init__(self, model_wrapper):
self.model_wrapper = model_wrapper

def __call__(self, module, input_, output_):
self.model_wrapper.curr_embedding = output_
output_.retain_grad()


class TransformersClassifier(Classifier):
def __init__(
self,
model: transformers.PreTrainedModel,
tokenizer: transformers.PreTrainedTokenizer,
embedding_layer,
device: torch.device = None,
max_length: int = 128,
batch_size: int = 8,
):
"""
Args:
            model: HuggingFace model for classification.
            tokenizer: HuggingFace tokenizer matching the model.
            embedding_layer: The embedding module of the transformers model, e.g.
                ``BertModel.bert.embeddings.word_embeddings``.
            device: Device of the PyTorch model. **Default:** "cuda" if CUDA is available, else "cpu"
            max_length: Max length of input tokens; longer token lists are truncated. **Default:** 128
            batch_size: Max batch size of this classifier. **Default:** 8
"""

self.model = model

        if device is None:  # pragma: no cover
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

self.to(device)

self.curr_embedding = None
self.hook = embedding_layer.register_forward_hook(HookCloser(self))
self.embedding_layer = embedding_layer

self.word2id = dict()
for i in range(tokenizer.vocab_size):
self.word2id[tokenizer.convert_ids_to_tokens(i)] = i
self.__tokenizer = tokenizer

self.embedding = embedding_layer.weight.detach().cpu().numpy()

self.token_unk = tokenizer.unk_token
self.token_unk_id = tokenizer.unk_token_id

self.max_length = max_length
self.batch_size = batch_size

@property
def tokenizer(self):
        return TransformersTokenizer(self.__tokenizer)  # pragma: no cover

def to(self, device: torch.device):
"""
Args:
device: Device that moves model to.
"""
self.device = device
self.model = self.model.to(device)
return self

def get_pred(self, input_):
return self.get_prob(input_).argmax(axis=1)

def get_prob(self, input_):
return self.get_grad(
[self.__tokenizer.tokenize(sent) for sent in input_], [0] * len(input_)
)[0]

def get_grad(self, input_, labels):
v = self.predict(input_, labels)
return v[0], v[1]

def predict(self, sen_list, labels=None):
sen_list = [sen[: self.max_length - 2] for sen in sen_list]
sent_lens = [len(sen) for sen in sen_list]
batch_len = max(sent_lens) + 2

attentions = np.array(
[
[1] * (len(sen) + 2) + [0] * (batch_len - 2 - len(sen))
for sen in sen_list
],
dtype="int64",
)
sen_list = [self.__tokenizer.convert_tokens_to_ids(sen) for sen in sen_list]
        tokenized_sen = np.array(
[
[self.__tokenizer.cls_token_id]
+ sen
+ [self.__tokenizer.sep_token_id]
+ ([self.__tokenizer.pad_token_id] * (batch_len - 2 - len(sen)))
for sen in sen_list
],
dtype="int64",
)

result = None
result_grad = None
all_hidden_states = None

if labels is None:
labels = [0] * len(sen_list)
labels = torch.LongTensor(labels).to(self.device)

for i in range((len(sen_list) + self.batch_size - 1) // self.batch_size):
            curr_sen = tokenized_sen[i * self.batch_size : (i + 1) * self.batch_size]
curr_mask = attentions[i * self.batch_size : (i + 1) * self.batch_size]

xs = torch.from_numpy(curr_sen).long().to(self.device)
masks = torch.from_numpy(curr_mask).long().to(self.device)
outputs = self.model(
input_ids=xs,
attention_mask=masks,
output_hidden_states=True,
labels=labels[i * self.batch_size : (i + 1) * self.batch_size],
)
if i == 0:
all_hidden_states = outputs.hidden_states[-1].detach().cpu()
loss = outputs.loss
logits = outputs.logits
logits = torch.nn.functional.softmax(logits, dim=-1)
loss = -loss
loss.backward()

result_grad = self.curr_embedding.grad.clone().cpu()
self.curr_embedding.grad.zero_()
self.curr_embedding = None
result = logits.detach().cpu()
else:
all_hidden_states = torch.cat(
(all_hidden_states, outputs.hidden_states[-1].detach().cpu()), dim=0
)
loss = outputs.loss
logits = outputs.logits
logits = torch.nn.functional.softmax(logits, dim=-1)
loss = -loss
loss.backward()

result_grad = torch.cat(
(result_grad, self.curr_embedding.grad.clone().cpu()), dim=0
)
self.curr_embedding.grad.zero_()
self.curr_embedding = None

result = torch.cat((result, logits.detach().cpu()))

result = result.numpy()
all_hidden_states = all_hidden_states.numpy()
result_grad = result_grad.numpy()[:, 1:-1]
return result, result_grad, all_hidden_states

def get_hidden_states(self, input_, labels=None):
"""
        Args:
            input_: A list of sentences for which to get the model's last-layer hidden states.
            labels: Optional labels forwarded to :py:meth:`predict`.
        Returns:
            np.ndarray of last-layer hidden states, one row per input sentence.
"""
return self.predict(input_, labels)[2]

def get_embedding(self):
return WordEmbedding(self.word2id, self.embedding)
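A hedged usage sketch, wiring a HuggingFace BERT sentiment model into `TransformersClassifier`. The checkpoint name is an assumption, not part of this PR; any two-class BERT sequence-classification model with a `bert.embeddings.word_embeddings` module should work the same way.

import transformers

from code_soup.common.text.models.transformers_classifier import TransformersClassifier

name = "textattack/bert-base-uncased-SST-2"  # assumed checkpoint
tokenizer = transformers.AutoTokenizer.from_pretrained(name)
model = transformers.AutoModelForSequenceClassification.from_pretrained(name)

victim = TransformersClassifier(
    model=model,
    tokenizer=tokenizer,
    embedding_layer=model.bert.embeddings.word_embeddings,
)

sentences = ["a gripping, beautifully shot film"]
print(victim.get_pred(sentences))  # predicted class indices, e.g. array([1])
print(victim.get_prob(sentences))  # softmax probabilities over the two classes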
41 changes: 41 additions & 0 deletions code_soup/common/text/utils/attack_helpers.py
@@ -0,0 +1,41 @@
"""Utility functions for text-based attacks. Adapted from https://github.com/thunlp/OpenAttack."""


def __measure(data, adversarial_sample, metrics):  # pragma: no cover
ret = {}
for it in metrics:
value = it.after_attack(data, adversarial_sample)
if value is not None:
ret[it.name] = value
return ret


def __iter_dataset(dataset, metrics):  # pragma: no cover
for data in dataset:
v = data
for it in metrics:
ret = it.before_attack(v)
if ret is not None:
v = ret
yield v


def __iter_metrics(iterable_result, metrics):  # pragma: no cover
for data, result in iterable_result:
adversarial_sample = result
ret = {
"data": data,
"success": adversarial_sample is not None,
"result": adversarial_sample,
"metrics": {**__measure(data, adversarial_sample, metrics)},
}
yield ret


def attack_process(attacker, victim, dataset, metrics):  # pragma: no cover
def result_iter():
for data in __iter_dataset(dataset, metrics):
yield attacker(victim, data)

for ret in __iter_metrics(zip(dataset, result_iter()), metrics):
yield ret
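A hedged sketch of how these helpers compose. Since the 400-line `code_soup/ch8/pwws.py` diff is not rendered above, the attacker class name `PWWSAttacker`, its constructor, and its callable `(victim, data)` interface are assumptions; `victim` and `data` reuse the earlier sketches.

from code_soup.ch8.pwws import PWWSAttacker  # assumed class name
from code_soup.common.text.utils.attack_helpers import attack_process
from code_soup.common.text.utils.metrics import Levenshtein

attacker = PWWSAttacker()                  # assumed constructor
metrics = [Levenshtein(victim.tokenizer)]  # victim from the TransformersClassifier sketch

# attack_process yields one dict per example: the original data, whether the
# attack succeeded, the adversarial sample, and the per-example metric values.
for ret in attack_process(attacker, victim, data, metrics):
    print(ret["success"], ret["metrics"])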
13 changes: 13 additions & 0 deletions code_soup/common/text/utils/exceptions.py
@@ -0,0 +1,13 @@
"""Exceptions for text-based attacks."""


class AttackException(Exception):
pass


class WordNotInDictionaryException(AttackException):
pass


class UnknownPOSException(AttackException):
pass
60 changes: 60 additions & 0 deletions code_soup/common/text/utils/metrics.py
@@ -0,0 +1,60 @@
"""Various metrics for text. Adapted from https://github.com/thunlp/OpenAttack/tree/master/OpenAttack/metric/algorithms."""
from typing import List

import torch

from code_soup.common.text.utils.tokenizer import Tokenizer


class AttackMetric(object):  # pragma: no cover
"""
Base class of all metrics.
"""

def before_attack(self, input):
return

def after_attack(self, input, adversarial_sample):
return


class Levenshtein(AttackMetric):
def __init__(self, tokenizer: Tokenizer) -> None:
"""
Args:
tokenizer: A tokenizer that will be used in this metric. Must be an instance of :py:class:`.Tokenizer`
"""
self.tokenizer = tokenizer
self.name = "Levenshtein Edit Distance"

def calc_score(self, a: List[str], b: List[str]) -> int:
"""
Args:
            a: The first sequence.
            b: The second sequence.
        Returns:
            Levenshtein edit distance between the two sequences.

        Both parameters may be either a str or a list: a str gives character-level
        edit distance, while a list gives token-level edit distance.
        """
la = len(a)
lb = len(b)
f = torch.zeros(la + 1, lb + 1, dtype=torch.long)
for i in range(la + 1):
for j in range(lb + 1):
if i == 0:
f[i][j] = j
elif j == 0:
f[i][j] = i
elif a[i - 1] == b[j - 1]:
f[i][j] = f[i - 1][j - 1]
else:
f[i][j] = min(f[i - 1][j - 1], f[i - 1][j], f[i][j - 1]) + 1
return f[la][lb].item()

def after_attack(self, input, adversarial_sample):
if adversarial_sample is not None:
return self.calc_score(
self.tokenizer.tokenize(input["x"], pos_tagging=False),
self.tokenizer.tokenize(adversarial_sample, pos_tagging=False),
)
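A quick sanity check of `calc_score`. The tokenizer passed to the constructor is only used by `after_attack`, so for this check any `Tokenizer` instance (here the `victim.tokenizer` from the earlier sketch) will do:

metric = Levenshtein(victim.tokenizer)

# Token-level distance: one substitution ("cat" -> "dog").
print(metric.calc_score(["the", "cat", "sat"], ["the", "dog", "sat"]))  # 1

# Character-level distance: the classic kitten/sitting example.
print(metric.calc_score("kitten", "sitting"))  # 3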