class TextModel:
    def __init__(self, ...):
        ...
        # tokenizer
        docs = [self.tokenize(d) for d in docs]
        # dictionary
        self.dictionary = corpora.Dictionary(docs)
        corpus = [self.dictionary.doc2bow(d) for d in docs]
        # drop tokens that are too rare or too frequent
        if self.token_min_filter != 1 or self.token_max_filter != 1.0:
            ...
            self.dictionary.filter_extremes(no_below=self.token_min_filter, ...)
        # TF-IDF weighting
        if self.tfidf:
            self.model = TfidfModel(corpus)
        else:
            self.model = None
| name | values | description |
|---|---|---|
| del-punc | yes, no | Whether punctuation marks are removed |
| del-d1 | yes, no | Whether consecutive repeated letters are deleted |
| del-diac | yes, no | Whether diacritics (non-spacing marks) are removed |
| lc | yes, no | Whether the text is normalized to lowercase |
| emo | remove, group, none | How emoticons are handled |
| num | remove, group, none | How numbers are handled |
| url | remove, group, none | How URLs are handled |
| usr | remove, group, none | How user mentions are handled |
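To make these switches concrete, here is a minimal, self-contained sketch of the character-level transformations (del-punc, del-diac, del-d1, and lc). The function `norm_chars_sketch` is illustrative only and is not the library's implementation.

```python
import unicodedata

def norm_chars_sketch(text, del_punc=True, del_diac=True,
                      del_dup=True, lc=True):
    """Illustrative re-implementation of the character-level switches."""
    if lc:
        text = text.lower()
    if del_diac:
        # decompose accented characters and drop the combining marks
        text = ''.join(c for c in unicodedata.normalize('NFD', text)
                       if unicodedata.category(c) != 'Mn')
    out = []
    for c in text:
        if del_punc and unicodedata.category(c).startswith('P'):
            continue  # skip punctuation
        if del_dup and out and out[-1] == c:
            continue  # collapse runs of repeated characters (del-d1)
        out.append(c)
    return ''.join(out)

print(norm_chars_sketch("¡Holaaa, qué ondaaa!"))  # -> 'hola que onda'
```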
class TextModel:
def __init__(...):
...
if emo_option == OPTION_NONE:
self.emo_map = None
else:
self.emo_map = EmoticonClassifier()
...
def _tokenize(self, text):
...
# emo
if self.emo_map:
text = self.emo_map.replace(text, option=self.emo_option)
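`EmoticonClassifier` is not shown in the source. As a rough idea of what its `replace` method might do under the remove/group options, here is a hypothetical mapping-based stand-in; the emoticon table and the token names are invented for illustration.

```python
import re

# hypothetical emoticon table: maps each emoticon to a polarity token
EMO_MAP = {':)': '_pos', ':D': '_pos', ':(': '_neg', ":'(": '_neg'}
EMO_RE = re.compile('|'.join(re.escape(e) for e in EMO_MAP))

def replace_emoticons(text, option='group'):
    if option == 'remove':
        return EMO_RE.sub('', text)
    if option == 'group':
        return EMO_RE.sub(lambda m: EMO_MAP[m.group(0)], text)
    return text  # option == 'none'

print(replace_emoticons('good morning :)'))  # -> 'good morning _pos'
```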
class TextModel:
def _tokenize(self, text):
...
# lc
if self.lc:
text = text.lower()
class TextModel:
    def _tokenize(self, text):
        ...
        # num (note: \d+\.?\d* also matches single-digit numbers)
        if self.num_option == OPTION_DELETE:
            text = re.sub(r"\d+\.?\d*", "", text)
        elif self.num_option == OPTION_GROUP:
            text = re.sub(r"\d+\.?\d*", "_num", text)
        # url
        if self.url_option == OPTION_DELETE:
            text = re.sub(r"https?://\S+", "", text)
        elif self.url_option == OPTION_GROUP:
            text = re.sub(r"https?://\S+", "_url", text)
        # usr (user mentions)
        if self.usr_option == OPTION_DELETE:
            text = re.sub(r"@\S+", "", text)
        elif self.usr_option == OPTION_GROUP:
            text = re.sub(r"@\S+", "_usr", text)
class TextModel:
    def _tokenize(self, text):
        ...
        # del-punc, del-diac, and del-d1
        text = norm_chars(text, ...)
The tokenizers are in fact a list of tokenizers: each tokenizer is defined as an element of $\wp(\text{n-words} \cup \text{q-grams} \cup \text{skip-grams}) \setminus \{\emptyset\}$, that is, a non-empty subset of the available n-word, q-gram, and skip-gram extractors.
| name | values | description |
|---|---|---|
| n-words | $\{1,2,3\}$ | Lengths of the word n-grams (n-words) |
| q-grams | $\{1,2,3,4,5,6,7\}$ | Lengths of the character q-grams |
| skip-grams | $\{(2,1), (3,1), (2,2), (3,2)\}$ | List of skip-gram configurations |
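As a concrete illustration of the first two families (the token formatting here is illustrative; the library's actual token strings may differ):

```python
def word_ngrams(words, n):
    # contiguous word n-grams, joined with '~'
    return ['~'.join(words[i:i + n]) for i in range(len(words) - n + 1)]

def char_qgrams(text, q):
    # contiguous character q-grams
    return [text[i:i + q] for i in range(len(text) - q + 1)]

print(word_ngrams('i like it'.split(), 2))  # ['i~like', 'like~it']
print(char_qgrams('like', 3))               # ['lik', 'ike']
```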
class TextModel:
    def _tokenize(self, text):
        ...
        for q in self.token_list:
            if isinstance(q, int):
                if q < 0:
                    if textlist is None:
                        # n-words operate on the list of words
                        textlist = get_word_list(text)
class TextModel:
    def _tokenize(self, text):
        ...
        for q in self.token_list:
            if isinstance(q, int):
                if q < 0:
                    ...
                    # n-words: word n-grams of length |q|
                    expand_qgrams_word_list(textlist, abs(q), L)
                else:
                    # q-grams: character q-grams of length q
                    expand_qgrams(_text, q, L)
class TextModel:
def _tokenize(self, text):
...
for q in self.token_list:
if isinstance(q, int):
...
else:
if textlist is None:
textlist = get_word_list(text)
# skip-grams
expand_skipgrams_word_list(textlist, q, L)
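`expand_skipgrams_word_list` is elided; the following sketch shows the usual reading of an $(n, s)$ skip-gram, namely $n$ words taken every $s+1$ positions (again, the token formatting is illustrative):

```python
def skipgrams(words, n, skip):
    step = skip + 1
    span = (n - 1) * step + 1  # window covered by one skip-gram
    return ['~'.join(words[i:i + span:step])
            for i in range(len(words) - span + 1)]

words = 'the cat sat on the mat'.split()
print(skipgrams(words, 2, 1))  # ['the~sat', 'cat~on', 'sat~the', 'on~mat']
```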
from sklearn.svm import LinearSVC
from gensim.matutils import corpus2csc


class ClassifierWrapper(object):
    def __init__(self, classifier=LinearSVC):
        self.svc = classifier()
        self.num_terms = -1

    def fit(self, X, y):
        # gensim bag-of-words corpus -> scipy sparse matrix (samples x terms)
        X = corpus2csc(X).T
        self.num_terms = X.shape[1]
        self.svc.fit(X, y)
        return self

    def decision_function(self, Xnew):
        # num_terms keeps the test matrix aligned with the training vocabulary
        Xnew = corpus2csc(Xnew, num_terms=self.num_terms).T
        return self.svc.decision_function(Xnew)

    def predict(self, Xnew):
        Xnew = corpus2csc(Xnew, num_terms=self.num_terms).T
        return self.svc.predict(Xnew)
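A minimal usage sketch over toy data; in practice `docs` would be the bag-of-words vectors produced by TextModel:

```python
# toy gensim-style bag-of-words corpus: lists of (term_id, count) pairs
docs = [[(0, 2), (1, 1)], [(1, 1), (2, 3)], [(0, 1), (2, 1)]]
labels = [0, 1, 0]

clf = ClassifierWrapper().fit(docs, labels)
print(clf.predict(docs))            # e.g. [0 1 0]
print(clf.decision_function(docs))  # signed distances to the hyperplane
```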
DefaultParams = dict(
num_option=Option(),
usr_option=Option(),
url_option=Option(),
emo_option=Option(),
lc=Boolean(),
del_dup=Boolean(),
del_punc=Boolean(),
del_diac=Boolean(),
token_list=PowersetVariable([(3, 1), (2, 2), (2, 1),
-3, -2, -1, 1, 2, 3, 5, 7, 9], max_size=5),
token_min_filter=SetVariable([-1]),
token_max_filter=Fixed(1.0),
tfidf=Fixed(True))
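Under DefaultParams the configuration space is finite and easy to count: 12 candidate tokenizers combined into non-empty subsets of size at most 5, four three-valued options, and four booleans (the Fixed and single-valued parameters contribute no extra factor). A quick sketch:

```python
from math import comb

token_lists = sum(comb(12, i) for i in range(1, 6))  # 1585 subsets
configs = token_lists * 3**4 * 2**4                  # options * booleans
print(token_lists, configs)  # 1585 2054160
```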
from itertools import combinations

import numpy as np


class PowersetVariable:
    def __init__(self, initial_set, max_size=None):
        self.valid_values = []
        if max_size is None:
            max_size = len(initial_set) // 2 + 1
        # every non-empty subset of initial_set with at most max_size elements
        for i in range(1, min(max_size, len(initial_set)) + 1):
            self.valid_values.extend(combinations(initial_set, i))

    def neighborhood(self, value):
        L = []
        for v in value:
            if isinstance(v, list):
                v = tuple(v)
            L.append(v)
        # self.mismatches (elided) generates the neighboring subsets
        return list(self.mismatches(set(L)))

    def get_random(self):
        i = np.random.randint(len(self.valid_values))
        return self.valid_values[i]
BASIC_OPTIONS = [OPTION_DELETE, OPTION_GROUP, OPTION_NONE]
def Option():
return SetVariable(BASIC_OPTIONS)
def Boolean():
return SetVariable([False, True])
# random search
tabu = set() # memory for tabu search
if best_list is None:
L = []
for conf in self.sample_param_space(bsize):
code = get_filename(conf)
if code in tabu:
continue
tabu.add(code)
L.append((conf, code))
best_list = self.get_best(fun_score, L, pool=pool)
else:
for conf in best_list:
tabu.add(get_filename(conf))
def sample_param_space(self, n):
    # draw n random configurations, one value per parameter
    for _ in range(n):
        kwargs = {k: v.get_random() for k, v in self.params.items()}
        yield kwargs
def search(self, fun_score, bsize=32,
hill_climbing=True, pool=None, best_list=None):
...
if hill_climbing:
_hill_climbing(['token_list'], "optimizing token_list")
...
ks = list(self.params.keys())
ks.remove('token_list')
ks.remove('token_min_filter')
ks.remove('token_max_filter')
_hill_climbing(ks, "optimizing the rest of params")
return best_list
def _hill_climbing(keywords, desc):
# second approximation, a hill climbing process
i = 0
while True:
i += 1
bscore = best_list[0]['_score']
L = []
for conf in self.expand_neighbors(best_list[0],
keywords=keywords):
code = get_filename(conf)
if code in tabu:
continue
tabu.add(code)
L.append((conf, code))
best_list.extend(self.get_best(fun_score, L,
desc=desc + " {0}".format(i), pool=pool))
best_list.sort(key=lambda x: x['_score'], reverse=True)
if bscore == best_list[0]['_score']:
break
def expand_neighbors(self, s, keywords=None):
...
vtype = self.params[k]
if isinstance(vtype, Fixed):
continue
for neighbor in vtype.neighborhood(v):
x = s.copy()
x[k] = neighbor
yield x
class ScoreKFoldWrapper(ScoreSampleWrapper):
def __init__(self, X, y, Xstatic=[], ystatic=[],
nfolds=5, score='macrof1',
classifier=ClassifierWrapper, random_state=None):
self.nfolds = nfolds
self.score = score
self.X = np.array(X)
self.Xstatic = Xstatic
self.le = preprocessing.LabelEncoder().fit(y)
self.y = self.le.transform(y)
if len(ystatic) > 0:
self.ystatic = self.le.transform(ystatic)
else:
self.ystatic = []
self.test_y = self.y
self.create_classifier = classifier
        # sklearn moved StratifiedKFold to model_selection and the splits
        # are now produced by .split(); precompute them once here
        skf = model_selection.StratifiedKFold(n_splits=nfolds, shuffle=True,
                                              random_state=random_state)
        self.kfolds = list(skf.split(np.zeros(len(self.y)), self.y))
def __call__(self, conf_code):
conf, code = conf_code
st = time()
predY = np.zeros(len(self.y))
for train, test in self.kfolds:
A = self.X[train]
if len(self.Xstatic) > 0:
A = np.hstack((A, self.Xstatic))
textmodel = TextModel(A, **conf)
# textmodel = TextModel([self.X[i] for i in train], **conf)
trainX = [textmodel[x] for x in A]
trainY = self.y[train]
if len(self.ystatic) > 0:
trainY = np.hstack((trainY, self.ystatic))
c = self.create_classifier()
c.fit(trainX, trainY)
testX = [textmodel[self.X[i]] for i in test]
predY[test] = c.predict(testX)
self.compute_score(conf, predY)
conf['_time'] = (time() - st) / self.nfolds
return conf
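The body of `compute_score` is not shown. A minimal sketch of what it might do for the default `'macrof1'` score, assuming scikit-learn (the function name and signature are illustrative); `'_score'` is the key the hill-climbing loop later sorts on:

```python
from sklearn.metrics import f1_score

def compute_score_sketch(conf, pred_y, true_y):
    # store the macro-averaged F1 of the cross-validated predictions
    conf['_score'] = f1_score(true_y, pred_y, average='macro')
```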
class CommandLine(object):
def main(self, args=None, params=None):
...
sel = ParameterSelection(params=params)
X, y = [], []
Xstatic, ystatic = [], []
for train in self.data.training_set:
if train.startswith("static:"):
X_, y_ = read_data_labels(train[7:])
Xstatic.extend(X_)
ystatic.extend(y_)
else:
X_, y_ = read_data_labels(train)
X.extend(X_)
y.extend(y_)
...
if ratio > 1:
fun_score = ScoreKFoldWrapper(X, y, Xstatic=Xstatic,
ystatic=ystatic,
nfolds=int(ratio), score=self.data.score,
random_state=self.data.seed)
...
best_list = sel.search(
fun_score,
bsize=self.data.samplesize,
hill_climbing=self.data.hill_climbing,
pool=pool,
best_list=best_list
)
with open(self.get_output(), 'w') as fpt:
fpt.write(json.dumps(best_list, indent=2, sort_keys=True))
return best_list
class CommandLineTrain(CommandLine):
def main(self, args=None):
...
corpus, labels = [], []
for train in self.data.training_set:
X_, y_ = read_data_labels(train)
corpus.extend(X_)
labels.extend(y_)
best = param_list[0]
t = TextModel(corpus, **best)
le = LabelEncoder()
if self.data.labels:
le.fit(self.data.labels.split(','))
else:
le.fit(labels)
y = le.transform(labels)
c = ClassifierWrapper()
X = [t[x] for x in corpus]
c.fit(X, y)
with open(self.get_output(), 'wb') as fpt:
pickle.dump([t, c, le], fpt)
return [t, c, le]
class CommandLinePredict(CommandLine):
def main(self, args=None, model_svc_le=None):
...
if model_svc_le is None:
with open(self.data.model, 'rb') as fpt:
model, svc, le = pickle.load(fpt)
else:
model, svc, le = model_svc_le
veclist, afflist = [], []
for x in read_data(self.data.test_set):
v, a = model.vectorize(x)
veclist.append(v)
afflist.append(a)
L = []
hy = svc.decision_function(veclist)
hyy = le.inverse_transform(svc.predict(veclist))
        for tweet, scores, klass, aff in zip(tweet_iterator(self.data.test_set),
                                             hy, hyy, afflist):
            # attach the predictions to each tweet (key names are illustrative)
            tweet['decision_function'] = scores.tolist() if hasattr(scores, 'tolist') else scores
            tweet['klass'] = str(klass)
            tweet['affinity'] = aff
            L.append(tweet)
with open(self.get_output(), 'w') as fpt:
for tweet in L:
fpt.write(json.dumps(tweet)+"\n")
return L
class NuevoClasificador(object):
def __init__(self, classifier=LinearSVC):
self.num_terms = -1
def fit(self, X, y):
X = corpus2csc(X).T
self.num_terms = X.shape[1]
...
return self
def decision_function(self, Xnew):
Xnew = corpus2csc(Xnew, num_terms=self.num_terms).T
...
def predict(self, Xnew):
Xnew = corpus2csc(Xnew, num_terms=self.num_terms).T
...
class CommandLine(object):
def main(self, args=None, params=None):
...
sel = NuevoAlgoritmoBusqueda(params=params)
...
best_list = sel.search(
fun_score,
bsize=self.data.samplesize,
hill_climbing=self.data.hill_climbing,
pool=pool,
best_list=best_list
)
with open(self.get_output(), 'w') as fpt:
fpt.write(json.dumps(best_list, indent=2, sort_keys=True))
return best_list
def _hill_climbing(keywords, desc):
# second approximation, a hill climbing process
i = 0
while True:
i += 1
bscore = best_list[0]['_score']
L = []
## Here
for conf in self.expand_neighbors(best_list[0],
keywords=keywords):
code = get_filename(conf)
if code in tabu:
continue
tabu.add(code)
L.append((conf, code))
## Here
best_list.extend(self.get_best(fun_score, L,
desc=desc + " {0}".format(i), pool=pool))
best_list.sort(key=lambda x: x['_score'], reverse=True)
if bscore == best_list[0]['_score']:
break
# random search
tabu = set() # memory for tabu search
if best_list is None:
L = []
for conf in self.sample_param_space(bsize):
code = get_filename(conf)
if code in tabu:
continue
tabu.add(code)
L.append((conf, code))
## Here
best_list = self.get_best(fun_score, L, pool=pool)
else:
for conf in best_list:
tabu.add(get_filename(conf))