In [ ]:
import sys, os

# make the bundled BigARTM 0.8.2 egg (Python 2.7 build) importable
sys.path.append('bigartm-0.8.2-py2.7.egg')

In [ ]:
# service configuration, overridable via environment variables
loglevel = int(os.environ.get('LOGLEVEL', '0'))  # numeric logging level, 0 = NOTSET
logfile = os.environ.get('LOGFILE', 'artm.log')
model_file = os.environ.get('MODELNAME', '../models/big_model.artm.mtx')
insception_classes_file = os.environ.get('INSCLASSES', '../resourses/insception-classes.tsv')
zmq_port = int(os.environ.get('ZMQPORT', '1349'))
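
In [ ]:
# A hypothetical launch (assuming the notebook is first converted to a plain
# script); the script name and values below are illustrative only:
#
#   LOGLEVEL=10 LOGFILE=artm.log ZMQPORT=1349 python zmq_artm_service.py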

In [ ]:
import logging

logging.basicConfig(level=loglevel)

formatter = logging.Formatter('%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%H:%M:%S')

logger = logging.getLogger('ZMQ_ARTM')
fh = logging.FileHandler(logfile)
fh.setLevel(loglevel)
fh.setFormatter(formatter)

logger.addHandler(fh)

logger.info('Logging started.')
logger.info('Initialisation started...')

In [ ]:
logger.info('Launch parameters: ')
for k,v in [('model_file', model_file), 
            ('insception_classes_file', insception_classes_file),
            ('zmq_port', zmq_port)]:
    logger.info('  ...\t"%s" is equal to "%s"', k, v)

In [ ]:
import artm
import pandas as pd
import numpy as np
import json

import zmq

In [ ]:
from utils import sample_from, get_text_processor, prepare_for_artm, apply_box_cox
from collections import Counter

logger.info('Building text processor with %s', get_text_processor)
text_processor = get_text_processor()

In [ ]:
modalities_set = {'classes', 'tag', 'text'}
logger.info('Possible modalities for inputs are: %s', modalities_set)

In [ ]:
logger.info('Reading Inception class names from %s', insception_classes_file)
with open(insception_classes_file) as income:
    # each line holds a "<class_id>\t<class_name>" pair
    class_mapping = dict(map(lambda l: map(str.strip, l.split('\t')), income))
# Inception predicts 1000 classes, indexed 0..999
class_names = [class_mapping[str(_)] for _ in range(0, 1000)]
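
In [ ]:
# Expected layout of the classes TSV, as implied by the parsing above
# (the class names here are placeholders):
#
#   0<TAB>class_name_0
#   1<TAB>class_name_1
#   ...
#
# one "<id><TAB><name>" pair per line, with ids covering 0..999.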

In [ ]:
sparsed_topics = 45
smoothed_topics = 5

logger.info('Building ARTM-object...')

# the first 45 topics are kept sparse ('good'); the last 5 absorb background ('mess')
topics = ['good_%i' % _ for _ in range(sparsed_topics)] + ['mess_%i' % _ for _ in range(smoothed_topics)]

tm = artm.ARTM(num_topics=50, num_processors=2)

logger.info('Loading ARTM-model from %s', model_file)
tm.load(model_file)
# decorrelate the sparse topics strongly and the whole set mildly
tm.regularizers.add(artm.DecorrelatorPhiRegularizer(name='good_decor', tau=50000, topic_names=topics[:sparsed_topics]))
tm.regularizers.add(artm.DecorrelatorPhiRegularizer(name='all_decor', tau=1000, topic_names=topics))
# smooth only the trailing 'mess' topics; sparse the leading 'good' ones
tm.regularizers.add(artm.SmoothSparseThetaRegularizer(name='topic_smoother', tau=0.9, topic_names=topics[sparsed_topics:]))
tm.regularizers.add(artm.TopicSelectionThetaRegularizer(name='selector', tau=2.3, topic_names=topics))
tm.regularizers.add(artm.SmoothSparsePhiRegularizer(name='text_sparser', tau=-1.5, class_ids=['text']))
tm.regularizers.add(artm.SmoothSparsePhiRegularizer(name='tags_sparse', tau=-1, class_ids=['tag']))
tm.regularizers.add(artm.SmoothSparsePhiRegularizer(name='classes_sparser', tau=-0.1, class_ids=['classes']))
tm.regularizers.add(artm.SmoothSparseThetaRegularizer(name='topic_sparser', tau=-1.5, topic_names=topics[:sparsed_topics]))

logger.info('ARTM object is ready.')

In [ ]:
logger.info('Gathering available words for further query checking...')
available_words = set(tm.get_phi(class_ids=['text']).index)
logger.info('There are %d words available', len(available_words))

In [ ]:
def get_desc(tm, score_name, n_objects, renamer={}):
    """Return {topic: 'token:weight, ...'} built from the top-n_objects tokens of a modality."""
    topic_desc = {}

    collection = tm.get_phi(class_ids=[score_name]).T.copy()

    if score_name in renamer:
        collection.columns = [renamer[score_name][c] for c in collection.columns]

    # drop topics that carry almost no probability mass in this modality
    non_active_topics = collection.index[collection.sum(axis=1) < 0.5]
    logger.warning('Topics not active for "%s" modality: %s', score_name, list(non_active_topics))
    collection = collection[~collection.index.isin(non_active_topics)]

    # renormalise each remaining topic's distribution over tokens
    collection.iloc[:, :] = collection.values / collection.sum(axis=1).values[:, np.newaxis]

    for topic in collection.index:
        topic_sample = collection.loc[topic].sort_values(ascending=False)[:n_objects]

        topic_desc[topic] = ', '.join(['%s:%3.3f' % (word, weight) for word, weight in topic_sample.iteritems()])
    return topic_desc

In [ ]:
logger.info('Building description for %s modalities', modalities_set)

desc_pack = {}

numbers = {'text': 20, 'classes': 20, 'tag': 4}

for modality in modalities_set:
    desc_pack[modality] = get_desc(tm, modality, numbers[modality], {'classes': class_mapping})

logger.info('Building description for regularizers')

reg_info = []
for rname, obj in tm.regularizers.data.items():
    reg_info.append({'reg_type': type(obj).__name__, 'reg_name':rname, 'tau_coef':obj.tau})

model_description = {'regularizers': reg_info, 'modalities_desc': desc_pack, 'topics': tm.topic_names}

In [ ]:
logger.info('Gathering topic description with text modality...')
text_about_topics = get_desc(tm, 'text', 8, {})
logger.info('Gathering topic description with classes modality...')
classes_about_topics = get_desc(tm, 'classes', 5, {'classes': class_mapping})

def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks, padding the last one with fillvalue."""
    from itertools import izip_longest
    args = [iter(iterable)] * n
    return izip_longest(fillvalue=fillvalue, *args)
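
# e.g. (hypothetical call): list(grouper('abcde', 2, ' ')) -> [('a', 'b'), ('c', 'd'), ('e', ' ')]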

chars_in_line = 50

logger.info('Pregenerating descriptions of topics')

def wrap_desc(desc):
    # hard-wrap a description into chars_in_line-wide chunks joined by <br> breaks
    return '\n<br>'.join(''.join(chunk) for chunk in grouper(desc, chars_in_line, ' '))

topics_desc = ['%s: %s\n<br>--------\n<br>%s' % (topic_name,
                                                 wrap_desc(text_about_topics.get(topic_name, '')),
                                                 wrap_desc(classes_about_topics.get(topic_name, '')))
               for topic_name in topics]

In [ ]:
for topic, row in pd.DataFrame(model_description['modalities_desc']).iterrows():
    logger.debug('%s:', topic)
    for c in row.index:
        logger.debug('\t%s: %s', c, row[c])

In [ ]:
logger.debug(model_description['regularizers'])

In [ ]:
def get_top(row, threshold=0.95, number=5):
    """Take the largest entries of a distribution until `threshold` probability
    mass is covered, capping at `number` entries plus an aggregated 'other'."""
    sorted_row = row.sort_values(ascending=False)

    res = []
    prob_mass = 0
    for k, val in sorted_row.iteritems():
        prob_mass += val
        res.append((k, val))
        if prob_mass > threshold:
            break
        if len(res) == number:
            res.append(('other', (1. - prob_mass)))
            break

    return [(l[0], float(l[-1])) for l in res]
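
In [ ]:
# A toy illustration of get_top (hypothetical values): the two largest entries
# already exceed the 0.7 mass threshold, so the tail is cut off.
assert get_top(pd.Series({'a': 0.5, 'b': 0.3, 'c': 0.15, 'd': 0.05}), 0.7, 3) == [('a', 0.5), ('b', 0.3)]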

In [ ]:
def prepare_available_data(df_in):
    # raw text -> token Counter; raw class scores -> counts via sample_from (see utils)
    if 'text' in df_in:
        df_in.text = df_in.text.apply(text_processor).apply(Counter)

    if 'classes' in df_in:
        df_in.classes = df_in.classes.apply(np.array).apply(sample_from)

    return df_in

In [ ]:
class NoInfoAvailable(Exception):
    pass

In [ ]:
def filter_with_enough_data(df_in, available_words=available_words):
    logger.debug('Checking if there is enough info available for TM:')
    logger.debug('\t ... columns of DF: %s', df_in.columns)

    if len(set(df_in.columns) & modalities_set) == 0:
        return df_in.drop(df_in.index, axis=0)

    logger.debug('\t ... at least one modality is present')

    if len(set(df_in.columns) & modalities_set) == 1 and 'text' in df_in.columns:
        logger.debug('\t ... the only modality is "text", checking if there are some known words...')
        logger.debug('\t ... available records are: %s', list(df_in.index))
        good_idxs = df_in.text.apply(lambda wrds: len(set(wrds.keys()) & available_words) > 0)
        logger.debug('\t ... records with enough text data are: %s', list(df_in.loc[good_idxs].index))
        return df_in.loc[good_idxs]

    logger.debug('\t ... a non-text modality is present')

    return df_in
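
In [ ]:
# A toy check of the filter (hypothetical data): a record whose only modality
# is text survives only if at least one of its tokens is known to the model.
toy = pd.DataFrame({'text': {'a': Counter({'cat': 1}), 'b': Counter({'qwerty': 1})}})
assert list(filter_with_enough_data(toy, available_words={'cat'}).index) == ['a']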

In [ ]:
def generate_insception_classes(tm, batch):
    # predict the 'classes' modality, rescale scores with Box-Cox, then keep
    # the head of the distribution mapped to human-readable class names
    return list(tm.transform(batch, predict_class_id='classes').T.sort_index().T\
                                .apply(lambda r: apply_box_cox(r.values), axis=0)\
                                .apply(lambda p: get_top(pd.Series(p.values, index=p.index.map(class_mapping.get)), 0.5, 7)))

In [ ]:
def generate_text_description(tm, batch):
    return list(tm.transform(batch, predict_class_id='text').T.sort_index().T\
                            .apply(lambda r: get_top(r, 0.2, 20), axis=0))

In [ ]:
def generate_tags(tm, batch):
    return list(tm.transform(batch, predict_class_id='tag').T.sort_index().T\
                            .apply(lambda r: get_top(r, 0.2, 5), axis=0))

In [ ]:
predicted_marker = 'predicted_'

views_keys = dict(text='tokens_from_raw_text', classes='classes_multinomial_params')

def transform_operation(data_in, temp_name='temp', modalities_to_generate=modalities_set):
    df_in = pd.DataFrame(data_in)

    df_in = prepare_available_data(df_in)
    
    df_in = filter_with_enough_data(df_in)
    
    if df_in.shape[0] == 0:
        raise NoInfoAvailable()
    else:
        logger.debug('There is enough info for computing')
    
    batch = prepare_for_artm(df_in, temp_name)

    if 'classes' in modalities_to_generate: 
        df_in[predicted_marker + 'classes'] = generate_insception_classes(tm, batch)
 
    if 'text' in modalities_to_generate: 
        df_in[predicted_marker + 'text'] = generate_text_description(tm, batch)

    if 'tag' in modalities_to_generate: 
        df_in[predicted_marker + 'tag'] = generate_tags(tm, batch)
   

    df_in['topics'] = [map(float, row) for _, row in tm.transform(batch).T.sort_index().iterrows()]
    
    outcome = []
    for u, data in df_in.iterrows():
        result = data.to_dict()
        ans = dict()
        
        ans['id'] = u
        ans['topics'] = result['topics']
        ans['topics_desc'] = topics_desc
        ans['modalities'] = dict()
        ans['view'] = dict()
        
        for predicted_field in [_ for _ in result.keys() if _.startswith(predicted_marker)]:
            ans['modalities'][predicted_field[len(predicted_marker):]] = result[predicted_field]
        
        for passed_field in set(views_keys.keys()) & set(result.keys()):
            ans['view'][views_keys[passed_field]] = result[passed_field]
            if passed_field == 'classes':
                ans['view'][views_keys[passed_field]] = {class_mapping[str(k)]:v for k,v in ans['view'][views_keys[passed_field]].items()}
            
        outcome.append(u'%s\n'% json.dumps(ans))
    
    return '[%s]'% ', \n'.join(outcome)
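
In [ ]:
# A hypothetical call to transform_operation: the input is a dict of columns
# keyed by record id, exactly as pd.DataFrame consumes it (the id and text
# here are made up):
#
#   transform_operation({'text': {'item-1': u'a photo of a cat on a mat'}})
#
# The result is a JSON array string with one object per surviving record,
# carrying 'id', 'topics', 'topics_desc', 'modalities' and 'view' fields.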

In [ ]:
logger.info('Starting ZMQ context')
context = zmq.Context()
socket = context.socket(zmq.REP)

bind_addr ="tcp://*:%i"%zmq_port
logger.info('Binding %s zmq socket on adress %s', 'zmq.REP', bind_addr)
socket.bind(bind_addr)
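
In [ ]:
# A minimal client sketch (run it from another process; assumes the default
# port 1349). 'command' is either 'info' or 'transform', per the loop below:
#
#   import zmq
#   ctx = zmq.Context()
#   req = ctx.socket(zmq.REQ)
#   req.connect('tcp://localhost:1349')
#   req.send_json({'command': 'info'})
#   print(req.recv())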

In [ ]:
logger.info('Ready to work')
logger.info('Starting infinite handler loop...')
try:
    while True:
        logger.debug('ready to receive')
        income = socket.recv_json()
        logger.debug('income is %s', income)
        command = income.pop('command')
        logger.debug('The command is %s', command)
        if command == 'transform':
            income = income.pop('data')
            
            resp_status = None

            try:
                res = transform_operation(income)
                resp_status = 'ok'
            except NoInfoAvailable:
                resp_status = 'little_of_data'
                res = '""'
            except Exception as e:
                resp_status = 'unknown_error'
                # json.dumps escapes quotes/newlines so the envelope below stays valid JSON
                res = json.dumps(str(e))
                logger.exception(e)

            res = '{"status": "%s", "response": %s}' % (resp_status, res)
        elif command == 'info':
            res = json.dumps(model_description)
        else:
            logger.warning('Skipping unknown command: "%s"', command)
            continue
        logger.debug('result is %s', res)
        socket.send_string(res if isinstance(res, unicode) else res.decode('utf8'))
        logger.debug('answer was sent')
except Exception as e:
    logger.exception(e)
    raise
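
In [ ]:
# Optional cleanup, reached only if the loop above dies; a sketch, since the
# service normally runs forever:
#
#   socket.close()
#   context.term()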