In [83]:
import yaml
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Column, Table
from sqlalchemy import Integer, String, DateTime, Boolean
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from contextlib import contextmanager
import datetime
import sys
sys.path.append('../')
import jobs2phones as j2p
import smtplib
In [84]:
# Load the notebook configuration (credentials, addresses, database settings) once;
# every later cell reads it through the module-level ``cf`` mapping.
with open('../config.yaml') as config_file:
    cf = yaml.safe_load(config_file)
In [6]:
# Old, for testing purposes only. Use setup_schema.py for the most up-to-date schema.
def setup_schema():
    """Drop and recreate the ``posts`` and ``users`` tables.

    WARNING: destructive. ``drop_all()`` runs before ``create_all()``, so any
    rows already stored in these two tables are lost.

    Connection settings (``postgres_username``, ``postgres_password``,
    ``postgres_db``) are read from the module-level ``cf`` mapping and the
    server is always ``localhost``.
    """
    engine = create_engine('postgresql://'+cf['postgres_username']+':'+
                           cf['postgres_password']+'@localhost/'+cf['postgres_db'])
    metadata = MetaData(bind=engine)

    # Table() registers itself on ``metadata``; no local reference is needed.
    Table('posts', metadata,
          Column('id', String, primary_key=True),
          Column('address', String(200)),
          Column('body', String(10000)),
          Column('date_posted', DateTime),
          Column('pay', String(200)),
          Column('phone_number', String(100)),
          Column('new', Boolean),
          Column('text', String(140)),
          Column('search_criteria', String(140)),
          )
    Table('users', metadata,
          Column('password', String),
          Column('email_address', String),
          Column('phone_number', String, primary_key=True, nullable=False),
          Column('search_criteria', String, nullable=False),
          # Kept in sync with the ``User`` ORM model, which maps this column and
          # writes to it on insert; without it inserts through the model fail.
          Column('posts_sent_count', Integer),
          )

    # Drop first, then create, so the tables always match the definitions above.
    metadata.drop_all()
    metadata.create_all()
In [107]:
# Search parameters for this run. ``search_term`` holds space-separated queries;
# scraped data for the run lives in a directory named after the full term.
search_term = 'warehouse&20philadelphia dishwasher&20philadelphia'
search_type = 'lab'
directory = ''.join(['../data/', search_term, '/'])
In [116]:
# Shared declarative base for the ORM models defined in this notebook.
Base = declarative_base()


class Post(Base):
    """ORM mapping for one row of the ``posts`` table."""

    __tablename__ = 'posts'

    # Primary key.
    id = Column('id', String, primary_key=True)

    # Posting details.
    address = Column('address', String(200))
    body = Column('body', String(10000))
    date_posted = Column('date_posted', DateTime)
    pay = Column('pay', String(200))
    phone_number = Column('phone_number', String(100))

    # ``new`` is filtered on by PostReader and set to False by PostMarker.
    new = Column('new', Boolean)
    text = Column('text', String(140))
    search_criteria = Column('search_criteria', String(140))
class User(Base):
    """ORM mapping for one row of the ``users`` table, keyed by phone number."""

    __tablename__ = 'users'

    password = Column('password', String)
    email_address = Column('email_address', String)

    # Primary key: each phone number identifies exactly one user.
    phone_number = Column('phone_number', String, primary_key=True)

    # Matched with a LIKE pattern by UserReader; may not be NULL.
    search_criteria = Column('search_criteria', String, nullable=False)

    # Bumped by PostsCountIncrementer; initialised to 0 by UserInserter.
    posts_sent_count = Column('posts_sent_count', Integer)
class UserInserter(object):
    """Adds a user unless one with the same phone number already exists."""

    def insert(self, session, email_address, phone_number, password, search_criteria):
        """Create the user in ``session`` if ``phone_number`` is not yet registered.

        The new user starts with ``posts_sent_count`` of 0. Nothing is changed
        when a user with that phone number is already present.
        NOTE(review): ``password`` is stored exactly as passed in; confirm
        whether callers are expected to hash it first.
        """
        existing = session.query(User).filter(User.phone_number == phone_number).all()
        if existing:
            return
        session.merge(User(email_address=email_address,
                           phone_number=phone_number,
                           password=password,
                           search_criteria=search_criteria,
                           posts_sent_count=0))
class UserDeleter(object):
    """Removes a user identified by phone number."""

    def delete(self, session, phone_number):
        """Delete the first user matching ``phone_number`` from ``session``.

        Indexing with ``[0]`` means a phone number with no matching user
        raises ``IndexError``.
        """
        matches = session.query(User).filter(User.phone_number == phone_number)
        session.delete(matches[0])
class UserReader(object):
    """Looks up the users interested in a given search criteria."""

    def read(self, session, search_criteria):
        """Return the phone numbers of users whose criteria contain ``search_criteria``.

        Matching is a SQL ``LIKE '%<search_criteria>%'`` on the user's stored
        ``search_criteria`` column. Returns a list of phone numbers.
        """
        pattern = '%' + search_criteria + '%'
        interested = session.query(User).filter(User.search_criteria.like(pattern))
        return [user.phone_number for user in interested]
class UserEditer(object):
    """Updates the search criteria stored for an existing user."""

    def edit(self, session, phone_number, search_criteria):
        """Replace the ``search_criteria`` of the user with ``phone_number``.

        Raises ``IndexError`` (from the ``[0]`` lookup) when no such user exists.
        """
        matches = session.query(User).filter(User.phone_number == phone_number)
        target = matches[0]
        target.search_criteria = search_criteria
        session.merge(target)
class UserExistChecker(object):
    """Answers whether a phone number is already registered."""

    def check(self, session, phone_number):
        """Return True when at least one user has ``phone_number``, else False."""
        matching = session.query(User).filter(User.phone_number == phone_number)
        return matching.count() != 0
class PostInserter(object):
    """Stores scraped posts, skipping any whose id is already in the table."""

    def insert(self, session, row, search_criteria=None):
        """Add ``row`` to ``session`` as a ``Post`` unless its id already exists.

        Args:
            session: SQLAlchemy session used for the lookup and the insert.
            row: mapping-like record with the keys ``post_id``, ``address``,
                ``body``, ``date_posted``, ``pay``, ``phone_number``, ``new``
                and ``text``. ``date_posted`` must be a string in the form
                ``"%Y-%m-%d %I:%M%p"``.
            search_criteria: criteria to tag the post with. When omitted, the
                module-level ``search_term`` is used, which is what this
                method always did before the parameter existed.

        Raises:
            ValueError: if ``row['date_posted']`` does not match the format.
        """
        if search_criteria is None:
            # Backward-compatible default. Passing the value explicitly avoids
            # depending on whatever the notebook global holds at call time.
            search_criteria = search_term

        # count() avoids loading rows just to test existence, and matches the
        # existence check used by UserExistChecker.
        if session.query(Post).filter(Post.id == row['post_id']).count() != 0:
            return

        posted_at = datetime.datetime.strptime(row['date_posted'], "%Y-%m-%d %I:%M%p")
        new_post = Post(id=row['post_id'], address=row['address'], body=row['body'],
                        date_posted=posted_at,
                        pay=row['pay'], phone_number=row['phone_number'],
                        new=row['new'], text=row['text'],
                        search_criteria=search_criteria)
        session.add(new_post)
class PostReader(object):
    """Fetches the posts still flagged as new."""

    def read(self, session):
        """Return ``[text, search_criteria]`` pairs for every post with ``new`` set."""
        # ``== True`` is deliberate: SQLAlchemy turns it into a SQL comparison.
        unsent = session.query(Post).filter(Post.new == True)
        return [[post.text, post.search_criteria] for post in unsent]
class PostMarker(object):
    """Clears the ``new`` flag on a post."""

    def mark(self, session, post):
        """Set ``new`` to False on the first post whose ``text`` equals ``post``.

        Only the first match is updated. Raises ``IndexError`` (from the
        ``[0]`` lookup) when no post has that text.
        """
        matches = session.query(Post).filter(Post.text == post)
        target = matches[0]
        target.new = False
        session.merge(target)
class PostsCountIncrementer(object):
    """Bumps the per-user counter of posts that have been sent."""

    def increment(self, session, phone_number):
        """Add one to ``posts_sent_count`` for the user with ``phone_number``.

        A NULL counter is treated as 0. The column is nullable and has no
        default, so rows not created through ``UserInserter`` may hold NULL;
        adding 1 to ``None`` would otherwise raise ``TypeError``.

        Raises:
            IndexError: if no user has ``phone_number`` (from the ``[0]`` lookup).
        """
        user = session.query(User).filter(User.phone_number == phone_number)[0]
        user.posts_sent_count = (user.posts_sent_count or 0) + 1
        session.merge(user)
@contextmanager
def session_scope():
    """Yield a fresh session, committing on success and rolling back on error.

    The session is created from the module-level ``Session`` factory, which
    must have been bound (see ``bind_to_database``) before this is used.
    The session is always closed on exit.
    """
    db_session = Session()
    try:
        try:
            yield db_session
            db_session.commit()
        except:
            # Bare except on purpose: roll back for any failure, then re-raise
            # it unchanged so the caller still sees the original error.
            db_session.rollback()
            raise
    finally:
        db_session.close()
def mark_post(post):
    """Clear the ``new`` flag of the post whose text is ``post``, in its own transaction."""
    marker = PostMarker()
    with session_scope() as db_session:
        return marker.mark(db_session, post)
def load_data(df_valid):
    """Insert every row of ``df_valid`` into the posts table in one transaction.

    ``df_valid`` must provide ``iterrows()`` yielding ``(index, row)`` pairs
    (presumably a pandas DataFrame -- confirm against the caller). Rows whose
    post id is already stored are skipped by ``PostInserter``.
    """
    inserter = PostInserter()
    with session_scope() as db_session:
        for _, row in df_valid.iterrows():
            inserter.insert(db_session, row)
def read_new_data():
    """Return ``[text, search_criteria]`` pairs for all posts still flagged as new."""
    reader = PostReader()
    with session_scope() as db_session:
        return reader.read(db_session)
def read_interested_users(search_criteria):
    """Return the phone numbers of users whose stored criteria contain ``search_criteria``."""
    reader = UserReader()
    with session_scope() as db_session:
        return reader.read(db_session, search_criteria)
def delete_user(phone_number):
    """Delete the user registered under ``phone_number`` in its own transaction."""
    deleter = UserDeleter()
    with session_scope() as db_session:
        return deleter.delete(db_session, phone_number)
def check_user(phone_number):
    """Return True if a user with ``phone_number`` exists, otherwise False."""
    checker = UserExistChecker()
    with session_scope() as db_session:
        return checker.check(db_session, phone_number)
def edit_user(phone_number, search_criteria):
    """Replace the stored search criteria of the user with ``phone_number``."""
    editor = UserEditer()
    with session_scope() as db_session:
        return editor.edit(db_session, phone_number, search_criteria)
def insert_user(email_address, phone_number, password, search_criteria):
    """Register a user in its own transaction; a no-op if the phone number already exists."""
    inserter = UserInserter()
    with session_scope() as db_session:
        inserter.insert(db_session, email_address, phone_number, password, search_criteria)
def increment_posts_sent_count(phone_number):
    """Add one to the sent-posts counter of the user with ``phone_number``."""
    incrementer = PostsCountIncrementer()
    with session_scope() as db_session:
        return incrementer.increment(db_session, phone_number)
In [87]:
def send_text(to_addrs, msg):
    """Send ``msg`` to ``to_addrs`` through Gmail's SMTP server.

    Args:
        to_addrs: recipient address (or list of addresses) passed straight to
            ``smtplib.SMTP.sendmail``.
        msg: message to send.

    The sender address and login credentials are read from the module-level
    ``cf`` mapping (keys ``fromaddr``, ``username``, ``password``).
    """
    fromaddr = cf['fromaddr']

    # Credentials (if needed)
    username = cf['username']
    password = cf['password']

    # The actual mail send
    server = smtplib.SMTP('smtp.gmail.com:587')
    try:
        server.starttls()
        server.login(username, password)
        # Fixed: the body previously referenced the undefined name ``toaddrs``
        # instead of the ``to_addrs`` parameter, raising NameError on every call.
        server.sendmail(fromaddr, to_addrs, msg)
    finally:
        # Close the connection even when TLS, login or sending fails.
        server.quit()
In [88]:
def bind_to_database():
    """Return a ``sessionmaker`` bound to the configured PostgreSQL database.

    The username, password and database name come from the module-level
    ``cf`` mapping; the server is always ``localhost``.
    """
    url_parts = ['postgresql://', cf['postgres_username'], ':',
                 cf['postgres_password'], '@localhost/', cf['postgres_db']]
    db_engine = create_engine(''.join(url_parts))
    return sessionmaker(bind=db_engine)
In [109]:
# (Re)bind the module-level session factory that session_scope() relies on,
# then register the demo recipient with the current search term.
Session = bind_to_database()
insert_user(email_address='',
            phone_number=cf['toaddrs_demo'],
            password='pword',
            search_criteria=search_term)
In [24]:
# (Re)bind the session factory, then point the demo recipient at the current search term.
Session = bind_to_database()
edit_user(phone_number=cf['toaddrs_demo'], search_criteria=search_term)
In [117]:
# Look up which users are interested in a single search criteria.
search_criteria = 'warehouse&20philadelphia'
# Parenthesised so the line is valid on Python 3 as well; with a single
# argument it prints the same text under Python 2.
print(search_criteria.split(' '))
Session = bind_to_database()
# Last expression of the cell: its value is shown as the cell output.
read_interested_users(search_criteria)
Out[117]:
In [101]:
In [108]:
# (Re)bind the session factory, then remove the demo recipient.
Session = bind_to_database()
delete_user(phone_number=cf['toaddrs_demo'])
In [75]:
# (Re)bind the session factory, then report whether the demo recipient is registered.
Session = bind_to_database()
check_user(phone_number=cf['toaddrs_demo'])
Out[75]:
In [29]:
def parse_and_load_directory_into_database(directory):
    """Parse the scraped files under ``directory`` and store the valid posts.

    Builds a frame with ``j2p.create_df``, filters it with
    ``j2p.get_valid_texts`` and hands the result to ``load_data``.
    """
    load_data(j2p.get_valid_texts(j2p.create_df(directory)))
In [33]:
def send_from_database():
    """Text every new post to each interested user and record that it was sent.

    For each new post, every user whose criteria match the post's search
    criteria is sent the post text; the post is then marked as no longer new
    and that user's sent counter is incremented.
    """
    # NOTE(review): the export lost indentation. All three calls are placed in
    # the inner loop because the last one uses the inner loop variable --
    # confirm against the original notebook.
    for text, criteria in read_new_data():
        for phone_number in read_interested_users(criteria):
            send_text(phone_number, text)
            mark_post(text)
            increment_posts_sent_count(phone_number)