In [ ]:
import sys, os
sys.path.append('bigartm-0.8.2-py2.7.egg')
In [ ]:
loglevel = int(os.environ.get('LOGLEVEL', '00'))
logfile = os.environ.get('LOGFILE', 'artm.log')
model_file = os.environ.get('MODELNAME', '../models/big_model.artm.mtx')
insception_classes_file = os.environ.get('INSCLASSES', '../resourses/insception-classes.tsv')
zmq_port = int(os.environ.get('ZMQPORT', '1349'))
In [ ]:
import logging
logging.basicConfig(level=loglevel)
formatter = logging.Formatter('%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%H:%M:%S')
logger = logging.getLogger('ZMQ_ARTM')
fh = logging.FileHandler(logfile)
fh.setLevel(loglevel)
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.info('Logging started.')
logger.info('Initialisation started...')
In [ ]:
logger.info('Launch parameters:')
for k, v in [('model_file', model_file),
             ('insception_classes_file', insception_classes_file),
             ('zmq_port', zmq_port)]:
    logger.info(' ...\t"%s" is equal to "%s"', k, v)
In [ ]:
import artm
import pandas as pd
import numpy as np
import json
import zmq
In [ ]:
from utils import sample_from, get_text_processor, prepare_for_artm, apply_box_cox
from collections import Counter
logger.info('Building text processor with %s', get_text_processor.__name__)
text_processor = get_text_processor()
In [ ]:
modalities_set = set(['classes', 'tag', 'text'])
logger.info('Possible modalities for input are: %s', modalities_set)
In [ ]:
logger.info('Reading Inception class names from %s', insception_classes_file)
with open(insception_classes_file) as income:
    # each line is "<class index>\t<class name>"; keys are stringified indices
    class_mapping = dict(map(lambda l: map(str.strip, l.split('\t')), income))
class_names = [class_mapping[str(_)] for _ in range(0, 1000)]
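In [ ]:
# Quick sanity check (illustration only, not part of the original service):
# the mapping is keyed by the stringified class index and is expected to
# cover all 1000 Inception classes, indexed 0..999.
assert len(class_names) == 1000
print class_names[:3]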
In [ ]:
sparsed_topics = 45
smoothed_topics = 5
logger.info('Building ARTM-object...')
# 45 sparse "good" topics followed by 5 smoothed background ("mess") topics
topics = ['good_%i' % _ for _ in range(sparsed_topics)] + ['mess_%i' % _ for _ in range(smoothed_topics)]
tm = artm.ARTM(num_topics=50, num_processors=2)
logger.info('Loading ARTM-model from %s', model_file)
tm.load(model_file)
tm.regularizers.add(artm.DecorrelatorPhiRegularizer(name='good_decor', tau=50000, topic_names=topics[:sparsed_topics]))
tm.regularizers.add(artm.DecorrelatorPhiRegularizer(name='all_decor', tau=1000, topic_names=topics))
tm.regularizers.add(artm.SmoothSparseThetaRegularizer(name='topic_smoother', tau=0.9, topic_names=topics[sparsed_topics:]))
tm.regularizers.add(artm.TopicSelectionThetaRegularizer(name='selector', tau=2.3, topic_names=topics))
tm.regularizers.add(artm.SmoothSparsePhiRegularizer(name='text_sparser', tau=-1.5, class_ids=['text']))
tm.regularizers.add(artm.SmoothSparsePhiRegularizer(name='tags_sparser', tau=-1, class_ids=['tag']))
tm.regularizers.add(artm.SmoothSparsePhiRegularizer(name='classes_sparser', tau=-0.1, class_ids=['classes']))
tm.regularizers.add(artm.SmoothSparseThetaRegularizer(name='topic_sparser', tau=-1.5, topic_names=topics[:sparsed_topics]))
logger.info('ARTM object is ready.')
In [ ]:
logger.info('Gathering available words for further query checking...')
available_words = set(tm.get_phi(class_ids=['text']).index)
logger.info('There are %d words available', len(available_words))
In [ ]:
def get_desc(tm, score_name, n_objcts, renamer={}):
    # Build a per-topic textual description from the n_objcts heaviest
    # entries of the given modality's Phi matrix.
    topic_desc = {}
    collection = tm.get_phi(class_ids=[score_name]).T.copy()
    if score_name in renamer:
        collection.columns = [renamer[score_name][c] for c in collection.columns]
    non_active_topics = collection.index[collection.sum(axis=1) < 0.5]
    logger.warn('The following topics are not active for the "%s" modality: %s', score_name, list(non_active_topics))
    collection = collection[~collection.index.isin(non_active_topics)]
    # renormalise each topic's row so the weights sum to one
    collection.iloc[:, :] = collection.values / collection.sum(axis=1).values[:, np.newaxis]
    for topic in collection.index:
        topic_sample = collection.loc[topic].sort_values(ascending=False)[:n_objcts]
        topic_desc[topic] = ', '.join(['%s:%3.3f' % (word, weight) for word, weight in topic_sample.iteritems()])
    return topic_desc
In [ ]:
logger.info('Building descriptions for %s modalities', modalities_set)
desc_pack = {}
numbers = {'text': 20, 'classes': 20, 'tag': 4}
for modality in modalities_set:
    desc_pack[modality] = get_desc(tm, modality, numbers[modality], {'classes': class_mapping})
logger.info('Building descriptions for regularizers')
reg_info = []
for rname, obj in tm.regularizers.data.items():
    reg_info.append({'reg_type': type(obj).__name__, 'reg_name': rname, 'tau_coef': obj.tau})
model_description = {'regularizers': reg_info, 'modalities_desc': desc_pack, 'topics': tm.topic_names}
In [ ]:
logger.info('Gathering topic description with text modality...')
text_about_topics = get_desc(tm, 'text', 8, {})
logger.info('Gathering topic description with classes modality...')
classes_about_topics = get_desc(tm, 'classes', 5, {'classes': class_mapping})
def grouper(iterable, n, fillvalue=None):
    # chunk an iterable into fixed-size groups, padding the tail with fillvalue
    from itertools import izip_longest
    args = [iter(iterable)] * n
    return izip_longest(fillvalue=fillvalue, *args)
chars_in_line = 50
logger.info('Pregenerating descriptions of topics')
# wrap each description string into chars_in_line-character chunks joined with <br>
topics_desc = ['%s: %s\n<br>--------\n<br>%s' % (topic_name,
                   '\n<br>'.join(map(lambda ls: ''.join(ls), grouper(text_about_topics.get(topic_name, ''), chars_in_line, ' '))),
                   '\n<br>'.join(map(lambda ls: ''.join(ls), grouper(classes_about_topics.get(topic_name, ''), chars_in_line, ' '))))
               for topic_name in topics]
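In [ ]:
# Tiny demo of the grouper-based line wrapping used above (illustration only;
# the input string is invented):
print '\n'.join(map(lambda ls: ''.join(ls), grouper('abcdefgh', 3, ' ')))
# abc
# def
# gh  <- last chunk padded with the fill value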
In [ ]:
for topic, row in pd.DataFrame(model_description['modalities_desc']).iterrows():
    logger.debug('%s:', topic)
    for c in row.index:
        logger.debug('\t%s: %s', c, row[c])
In [ ]:
logger.debug(model_description['regularizers'])
In [ ]:
def get_top(row, threshold=0.95, number=5):
    # Take entries from the row in descending order until either the
    # accumulated probability mass exceeds `threshold` or `number` entries
    # are collected; in the latter case the remaining mass goes to 'other'.
    sorted_row = row.sort_values(ascending=False)
    res = []
    prob_mass = 0
    for k, val in sorted_row.iteritems():
        prob_mass += val
        res.append((k, val))
        if prob_mass > threshold:
            break
        if len(res) == number:
            res.append(('other', 1. - prob_mass))
            break
    return [(l[0], float(l[-1])) for l in res]
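In [ ]:
# A quick sanity check of get_top (illustration only; the distribution below
# is made up):
example = pd.Series({'cat': 0.5, 'dog': 0.3, 'fox': 0.1, 'owl': 0.06, 'bee': 0.04})
print get_top(example, threshold=0.6, number=2)
# -> [('cat', 0.5), ('dog', 0.3)] -- stops once the mass exceeds 0.6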
In [ ]:
def prepare_available_data(df_in):
    # normalise incoming modalities in place: raw text -> token Counter,
    # class distributions -> sampled class observations
    if 'text' in df_in:
        df_in.text = df_in.text.apply(text_processor).apply(Counter)
    if 'classes' in df_in:
        df_in.classes = df_in.classes.apply(np.array).apply(sample_from)
    return df_in
In [ ]:
class NoInfoAvailable(Exception):
    pass
In [ ]:
def filter_with_enough_data(df_in, available_words=available_words):
    logger.debug('Checking if there is enough info for the TM:')
    logger.debug('\t ... columns of DF: %s', df_in.columns)
    if len(set(df_in.columns) & modalities_set) == 0:
        # no known modality at all -- return an empty frame
        return df_in.drop(df_in.index, axis=0)
    logger.debug('\t ... at least one modality is present')
    if len(set(df_in.columns) & modalities_set) == 1 and 'text' in df_in.columns:
        logger.debug('\t ... the only modality is "text", checking if there are some known words...')
        logger.debug('\t ... available records are: %s', list(df_in.index))
        good_idxs = df_in.text.apply(lambda wrds: len(set(wrds.keys()) & available_words) > 0)
        logger.debug('\t ... records with enough text data are: %s', list(df_in.loc[good_idxs].index))
        return df_in.loc[good_idxs]
    logger.debug('\t ... a non-text modality is present')
    return df_in
In [ ]:
def generate_insception_classes(tm, batch):
    return list(tm.transform(batch, predict_class_id='classes').T.sort_index().T
                .apply(lambda r: apply_box_cox(r.values), axis=0)
                .apply(lambda p: get_top(pd.Series(p.values, index=p.index.map(class_mapping.get)), 0.5, 7)))
In [ ]:
def generate_text_description(tm, batch):
    return list(tm.transform(batch, predict_class_id='text').T.sort_index().T
                .apply(lambda r: get_top(r, 0.2, 20), axis=0))
In [ ]:
def generate_tags(tm, batch):
    return list(tm.transform(batch, predict_class_id='tag').T.sort_index().T
                .apply(lambda r: get_top(r, 0.2, 5), axis=0))
In [ ]:
predicted_marker = 'predicted_'
views_keys = dict(text='tokens_from_raw_text', classes='classes_multinomial_params')
def transform_operation(data_in, temp_name='temp', modalities_to_generate=modalities_set):
    df_in = pd.DataFrame(data_in)
    prepare_available_data(df_in)
    df_in = filter_with_enough_data(df_in)
    if df_in.shape[0] == 0:
        raise NoInfoAvailable()
    else:
        logger.debug('There is enough info for computing')
    batch = prepare_for_artm(df_in, temp_name)
    if 'classes' in modalities_to_generate:
        df_in[predicted_marker + 'classes'] = generate_insception_classes(tm, batch)
    if 'text' in modalities_to_generate:
        df_in[predicted_marker + 'text'] = generate_text_description(tm, batch)
    if 'tag' in modalities_to_generate:
        df_in[predicted_marker + 'tag'] = generate_tags(tm, batch)
    df_in['topics'] = map(lambda (k, row): map(float, row), tm.transform(batch).T.sort_index().iterrows())
    outcome = []
    for u, data in df_in.iterrows():
        result = data.to_dict()
        ans = dict()
        ans['id'] = u
        ans['topics'] = result['topics']
        ans['topics_desc'] = topics_desc
        ans['modalities'] = dict()
        ans['view'] = dict()
        for predicted_field in [_ for _ in result.keys() if _.startswith(predicted_marker)]:
            ans['modalities'][predicted_field[len(predicted_marker):]] = result[predicted_field]
        for passed_field in set(views_keys.keys()) & set(result.keys()):
            ans['view'][views_keys[passed_field]] = result[passed_field]
            if passed_field == 'classes':
                ans['view'][views_keys[passed_field]] = {class_mapping[str(k)]: v for k, v in ans['view'][views_keys[passed_field]].items()}
        outcome.append(u'%s\n' % json.dumps(ans))
    return '[%s]' % ', \n'.join(outcome)
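In [ ]:
# Illustration only (not part of the original service): the expected shape of
# a `transform` payload. pd.DataFrame(data_in) treats the outer keys as
# modality columns and the inner keys as record ids; the field values here
# are invented.
example_data = {
    'text': {'doc1': 'some raw text about the document'},
}
# With the model loaded, this would return a JSON list of per-record answers:
# print transform_operation(example_data)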
In [ ]:
logger.info('Starting ZMQ context')
context = zmq.Context()
socket = context.socket(zmq.REP)
bind_addr = "tcp://*:%i" % zmq_port
logger.info('Binding %s zmq socket on address %s', 'zmq.REP', bind_addr)
socket.bind(bind_addr)
In [ ]:
logger.info('Ready to work')
logger.info('Starting infinite handler loop...')
try:
    while True:
        logger.debug('ready to receive')
        income = socket.recv_json()
        logger.debug('income is %s', income)
        command = income.pop('command')
        logger.debug('The command is %s', command)
        if command == 'transform':
            income = income.pop('data')
            resp_status = None
            try:
                res = transform_operation(income)
                resp_status = 'ok'
            except NoInfoAvailable:
                resp_status = 'little_of_data'
                res = '""'
            except Exception as e:
                resp_status = 'unknown_error'
                res = '"%s"' % e.message
                logger.exception(e)
            res = '{"status": "%s", "response": %s}' % (resp_status, res)
        elif command == 'info':
            res = json.dumps(model_description)
        else:
            logger.warn('Skipping unknown command: "%s"', command)
            continue
        logger.debug('result is %s', res)
        socket.send_string(res.decode('utf8'))
        logger.debug('answer was sent')
except Exception as e:
    logger.exception(e)
    raise
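In [ ]:
# A minimal client sketch (not in the original notebook) for talking to this
# service from a separate process; the commands and payload shape mirror the
# REP loop above, and the port matches the ZMQPORT default. The document text
# is invented.
import zmq
import json

client_ctx = zmq.Context()
req = client_ctx.socket(zmq.REQ)
req.connect('tcp://localhost:1349')
# ask for the model description
req.send_json({'command': 'info'})
print json.loads(req.recv_string()).keys()
# transform a single record that carries only the 'text' modality
req.send_json({'command': 'transform',
               'data': {'text': {'doc1': 'some raw text to analyse'}}})
print req.recv_string()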