In [1]:
#!pip3 install flask_wtf #--proxy=http://127.0.0.1:5865
In [2]:
from flask import Flask,render_template,jsonify
from flask import Markup
import collections
#from flask_sqlalchemy import SQLAlchemy
# Flask application object that every route below is registered on.
app = Flask(__name__)
#app.config.from_object('config')
# NOTE(review): no SECRET_KEY is configured in the visible code although the
# routes use `session` and Flask-WTF forms -- confirm it is set elsewhere.

# Module-level settings.
DEBUG = True            # enables the diagnostic print() calls
PORT = 5000             # fallback port handed to app.run()
SITE_OWNER = 'Alex Xiao'

# Registry of services, keyed by service name.
services = {}

# Service descriptor; the last field (output_type) defaults to None.
Service = collections.namedtuple('Service', 'core input_type output_type')
Service.__new__.__defaults__ = (None,)
In [3]:
import sys
# Extend the import path before importing axtools below; presumably the
# package lives under this directory -- TODO confirm.
sys.path.append('/home/tuobi/')
import axtools
from axtools import DB
# Database handle; the user-management code reads and writes its 'users' table.
mydb=DB.DB('web.db')
#mydb.DEBUG=DEBUG
# axtools cache object; log() stores its entries through it.
cache=axtools.cache
In [4]:
import re
def capitalize(str_in):
    """
    Collapse every run of spaces in str_in into a single space and capitalize
    each word (first letter upper case, remaining letters lower case).
    """
    def _capitalize_word(match):
        # str.capitalize() also lower-cases everything after the first letter.
        return match.group(0).capitalize()

    single_spaced = re.sub(' +', ' ', str_in)
    return re.sub(r"\w+", _capitalize_word, single_spaced)
In [6]:
# Start-up diagnostic: print axtools.HOME, labelled as the running directory.
print('Running from dir:',axtools.HOME)
In [7]:
import hashlib
def md5(in_str):
    """
    Return the hexadecimal MD5 digest of in_str (encoded as UTF-8).

    NOTE(review): this digest is what the user code stores and compares as
    the password; unsalted MD5 is weak for that purpose.
    """
    hasher = hashlib.md5()
    hasher.update(in_str.encode('utf-8'))
    return hasher.hexdigest()
In [8]:
import datetime
import time
In [9]:
def warp_time(in_time):
    """
    Return the text of a strftime('%Y-%m-%d %H:%M:%S', ...) call whose second
    argument is in_time, inserted verbatim.
    """
    call_prefix = "strftime('%Y-%m-%d %H:%M:%S',"
    return call_prefix + in_time + ")"
In [10]:
def log(info):
    """
    Store a timestamped entry in the 'logs' table of the cache store.

    When the insert fails, the 'logs' table is created and the insert is
    retried once (the failure is assumed to mean the table does not exist
    yet).

    info: text of the log entry.
    """
    curTS = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    try:
        cache.insert('logs',['time','info'],[(curTS,info)])
    except Exception:
        # Narrowed from a bare `except:` so that KeyboardInterrupt and
        # SystemExit are no longer swallowed here.
        if DEBUG:
            print("Init, creating logs table")
        coldef=[DB.Column(name='time',type='datetime',nullable=False),
                DB.Column(name='info',type='text',nullable=False)]
        cache.create_table('logs',coldef)
        # NOTE(review): the retry stores warp_time(curTS) while the first
        # attempt stores curTS itself -- confirm which form is intended.
        cache.insert('logs',['time','info'],[(warp_time(curTS),info)])
In [ ]:
def load_users():
    """
    Load all rows of the 'users' table.

    On the very first run the table does not exist yet; it is then created
    and seeded with SITE_OWNER as an administrator before being read again.

    Returns:
        dict mapping user name -> User(type, pwd) named tuple.
    Raises:
        RuntimeError: mydb.select reported an error other than a missing
            table (previously this crashed later, while iterating the error
            object).
    """
    ulist,cnt,datatyp=mydb.select("users")
    if datatyp=="error":
        if 'no such table' not in ulist.args[0]:
            raise RuntimeError('Unable to load users table: {!r}'.format(ulist))
        #init the DB if not exist
        print("For 1st time run, initializing users tables")
        coldef=[DB.Column(name='name',type='text',nullable=False),
                DB.Column(name='pwd',type='text',nullable=True),
                DB.Column(name='type',type='CHARACTER(1)')]
        mydb.create_table('users',coldef)
        # NOTE(review): the owner account is seeded with a hard-coded default
        # password -- it should be changed right after the first start.
        mydb.insert('users',['name','type','pwd'],[(SITE_OWNER,'A',md5('abc001'))])
        #mydb.insert('users',['name','type'],[('Michael Ngo','N')])
        print("For 1st time run, initializing user "+SITE_OWNER+" inserted tables")
        ulist,cnt,datatyp=mydb.select("users")
        print("Created table users")
    User = collections.namedtuple('User', ['type','pwd'])
    return {u.name: User(type=u.type,pwd=u.pwd) for u in ulist}
# User table loaded once at start-up; admin() refreshes it on every visit.
userlist=load_users()
In [12]:
from flask import flash, redirect,request,session,g
from flask_wtf import FlaskForm as Form
In [13]:
def my_render(html,**args):
    """
    Render template `html` with the arguments every page shares.

    A submitted login form is handled first: when check_login() reports one,
    the response is a redirect to /index instead of the page. Otherwise the
    template receives `username`, `usertype`, a fresh LoginForm as `form`
    and whatever the caller passed in **args.
    """
    login_was_submitted = check_login(LoginForm())
    if login_was_submitted:
        return redirect('/index')

    current_user = get_user()
    current_type = get_user_type()
    blank_form = LoginForm()
    return render_template(html, username=current_user, usertype=current_type,
                           form=blank_form, **args)
In [14]:
from wtforms import StringField, BooleanField,PasswordField,SelectField,BooleanField,HiddenField
from wtforms.validators import DataRequired
class LoginForm(Form):
    """Sign-in form: user name, password and a remember-me checkbox."""

    # Both text inputs are mandatory.
    username = StringField(
        'username',
        validators=[DataRequired()],
    )
    passwd = PasswordField(
        'password',
        validators=[DataRequired()],
        default='PASS',
    )
    remember_me_flag = BooleanField(
        'remember_me_flag',
        default=False,
    )
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Serve the sign-in page; form handling itself happens in my_render()."""
    page_title = 'Sign In'
    return my_render('login.html', title=page_title)
In [15]:
@app.route('/logout')
def logout():
    """Drop the user name from the session and go back to the index page."""
    # The None default makes this a no-op when nobody is logged in.
    session.pop('username', None)
    response = redirect('/index')
    return response
In [ ]:
class UserForm(Form):
    """Admin form used to add, update or delete a user."""

    updusername = StringField(
        'username',
        validators=[DataRequired()],
    )
    # 'A' = Administrator, 'N' = Guest.
    updusertype = SelectField(
        'usertype',
        default='N',
        choices=[('A', 'Administrator'), ('N', 'Guest')],
    )
    updpasswd = PasswordField(
        'password',
        validators=[DataRequired()],
        default='PASS',
    )
    # Ticked when the existing user should be deleted instead of updated.
    upddelflg = BooleanField(
        'delflg',
        default=False,
    )
    # "New" selects the add-user branch of admin_user(); any other value is
    # treated there as an operation on an existing user.
    updopttype = HiddenField(
        'opttype',
        default="New",
    )
@app.route('/admin', methods = ['GET', 'POST'])
def admin():
    """
    Show the admin page with the current user table.

    The global user list is reloaded from the database on every visit; each
    user is presented with name, type and whether a password is stored.
    """
    global userlist
    userlist = load_users()

    def _password_status(pwd):
        # An empty string and None both count as "no password".
        if pwd == '' or pwd is None:
            return 'Not set'
        return 'Set'

    rows = [
        {'username': name, 'type': record.type,
         'pwdstatus': _password_status(record.pwd)}
        for name, record in userlist.items()
    ]
    return my_render('admin.html', userform=UserForm(),
                     title='Admin Page', userlist=rows)
def _sql_quote(value):
    """Return value as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


@app.route('/admin_user', methods = ['POST'])
def admin_user():
    """
    Add, update or delete a user from the submitted UserForm.

    updopttype == "New" inserts a new user; otherwise the existing user is
    deleted when upddelflg is ticked (SITE_OWNER is never deleted) or has
    type and password updated. The outcome is flashed and the browser is
    always redirected back to /admin.
    """
    # NOTE(review): nothing here verifies that the caller is a logged-in
    # administrator -- confirm access is restricted elsewhere.
    form=UserForm()
    if DEBUG:
        # The submitted password is deliberately left out of the debug output.
        print(form.updusername.data,form.updusertype.data,
              form.updopttype.data,form.upddelflg.data)
    # A missing user name used to crash capitalize(); reject it instead.
    usr=capitalize(form.updusername.data or '')
    if not usr.strip():
        myflash('User name is required',"red")
        return redirect('/admin')
    if form.updopttype.data=="New":
        #Adding new user
        mydb.insert('users',['name','type','pwd'],[(usr,form.updusertype.data,md5(form.updpasswd.data))])
        myflash('user '+usr+' has been added',"green")
    elif not form.upddelflg.data:
        #update
        # Form values are untrusted: quote them so they cannot break out of
        # the SQL string literals.
        sql=("update users set type="+_sql_quote(form.updusertype.data)
             +", pwd="+_sql_quote(md5(form.updpasswd.data))
             +" where name="+_sql_quote(usr))
        if DEBUG:
            print(sql)
        mydb.run(sql)
        myflash('user '+usr+' has been updated',"blue")
    elif usr==SITE_OWNER:
        #the site owner account can never be deleted
        myflash('Not able to delete '+usr,"red")
    else:
        #delete
        sql="delete from users where name="+_sql_quote(usr)
        if DEBUG:
            print(sql)
        mydb.run(sql)
        myflash('user '+usr+' has been DELETED',"red")
    return redirect('/admin')
@app.route('/_check_upd_user')
def check_user_exist():
    """
    Ajax endpoint: tell whether the `username` query argument is already a
    registered user.

    Returns JSON with result 'E' (exists, color 'red') or 'N' (new, color
    'green').
    """
    # The default must be a string: the former default 0 was passed straight
    # to capitalize() and raised TypeError when the argument was missing.
    usr = capitalize(request.args.get('username', '', type=str))
    if usr in userlist:
        rtn,clr='E','red'
    else:
        rtn,clr='N','green'
    if DEBUG:
        print('User name input, checking user',usr,'result is',rtn)
    return jsonify(result=rtn, color=clr)
In [ ]:
@app.route('/', methods = ['GET', 'POST'])
@app.route('/index', methods = ['GET', 'POST'])
def index():
    """Render the home page with a hard-coded list of sample posts."""
    #logs('Visited by '+request.)
    posts = [  # fake array of posts
        {
            'author': {'nickname': 'John'},
            'body': 'Beautiful day in Portland!'
        },
        {
            'author': {'nickname': 'Susan'},
            'body': 'The Avengers movie was so cool!'
        }
    ]
    # The unused local `html_args` was dropped; the template only ever
    # received `posts`.
    return my_render("index.html",posts=posts)
# Responds to an Ajax call.
@app.route('/_check_user')
def check_usertype():
    """
    Ajax endpoint: look up the type of the user named by the `username`
    query argument.

    Returns JSON with the user's type (color 'green'), or the text
    'User Not found!' (color 'red') for an unknown name.
    """
    # The default must be a string: the former default 0 was passed straight
    # to capitalize() and raised TypeError when the argument was missing.
    usr = capitalize(request.args.get('username', '', type=str))
    if usr in userlist:
        rtn,clr=userlist[usr].type,'green'
    else:
        rtn,clr='User Not found!','red'
    if DEBUG:
        print('User name input, checking user',usr,'result is',rtn)
    return jsonify(result=rtn, color=clr)
In [16]:
def warp_font_color(string,color):
    """Wrap string in a <font color="..."> element and mark the result as safe HTML."""
    opening_tag = '<font color="' + color + '">'
    html = opening_tag + string + '</font>'
    return Markup(html)
def warp_span_color(string,color):
    """Wrap string in a <span style="color:..."> element and mark the result as safe HTML."""
    opening_tag = '<span style="color:' + color + '">'
    html = opening_tag + string + '</span>'
    return Markup(html)
def myflash(msg,color='black'):
    """
    Flash msg with color as the flash category.

    msg is wrapped in Markup, i.e. treated as trusted HTML.
    """
    trusted_html = Markup(msg)
    flash(trusted_html, color)
def get_user():
    """Return the user name stored in the session, or '' when there is none."""
    return session.get('username', '')
def get_user_type():
    """Return the type of the session's user, or '' when that user is not in userlist."""
    current = get_user()
    if current not in userlist:
        return ''
    return userlist[current].type
def check_login(form):
    """
    Process a submitted login form.

    Administrators (type 'A') must supply the matching password; every other
    registered user is logged in by name alone. On success the user name is
    stored in session['username']; the outcome is always flashed.

    Returns:
        True when the form was submitted and validated (whether or not the
        login itself succeeded), False otherwise.
    """
    if not form.validate_on_submit():
        return False
    curuser =capitalize(form.username.data)
    # The name comes from the user and myflash() treats messages as trusted
    # HTML, so escape it before embedding it in any message.
    shown=str(Markup.escape(curuser))
    if curuser not in userlist:
        myflash('User '+shown+' has NOT been registered! Please contact <a href="mailto:alex@alexxiao.me?subject=New user ['+shown+'] to AlexXiao.me">Alex!</a> ','red')
        return True
    if userlist[curuser].type=='A':
        #For admin
        if userlist[curuser].pwd==md5(form.passwd.data):
            #password is correct
            myflash('Welcome Administrator '+shown+' !','green')
            session['username'] =curuser
        else:
            myflash('Wrong password for '+shown+' !','red')
    else:
        #For normal user
        # NOTE(review): non-admin users are logged in without any password
        # check -- confirm this is intended.
        myflash('Welcome '+shown+' !')
        session['username'] =curuser
    return True
In [17]:
@app.route('/service/<service_name>', methods=['GET', 'POST'])
def distr_services(service_name=None, methods=None):
    """
    Dispatch a request to the registered service named service_name.

    The service's input_type selects what is handed to process(): the query
    arguments ('get'), the form data ('post') or the raw request body
    (anything else). The template name and result returned by process() are
    then rendered through my_render().

    service_name: name taken from the URL; an unknown name answers 404.
    methods: ignored; kept only so the signature stays compatible. The
        accepted HTTP methods are declared on the route, where Flask reads
        them (they used to sit here, so POST was never accepted).
    """
    from flask import abort  # local import: the file does not import it

    service = services.get(service_name)
    if service is None:
        abort(404)
    if service.input_type=='get':
        payload=request.args
    elif service.input_type=='post':
        payload=request.form
    else:
        payload=request.data
    htmlname,rtn=process(service_name,payload)
    # my_render() only accepts template variables as keywords; the former
    # positional call raised TypeError.
    # NOTE(review): `result` is an assumed variable name -- confirm it
    # against the templates process() returns.
    return my_render(htmlname,result=rtn)
In [ ]:
class dummy_module:
    """Placeholder stored in `services` when a real service module cannot be set up."""

    def __init__(self,name):
        # A placeholder always reports itself as invalid.
        self.status = 'invalid'
        self.name = name
#load module
# Register the service named `servicename`: validate it, store the loaded
# module in `services`, and fall back to a dummy_module placeholder when
# anything goes wrong.
servicename=''
standard_function_list=['start','stop','pause']
try:
    #to validate the module has expected functions
    # `servicename` is now used consistently; the former `service_name` was
    # never defined, so even the except handler raised NameError.
    # NOTE(review): has_functions and loadmodule are not defined in the
    # visible code -- confirm they exist elsewhere, and whether loadmodule
    # is meant to be called rather than assigned.
    has_functions(services[servicename],standard_function_list)
    mol=loadmodule
    services[servicename]=mol
    setattr(mol,'status','init')
    # Store the actual service name (was the literal text 'service_name').
    setattr(mol,'name',servicename)
except Exception:
    # Narrowed from a bare `except:`; any failure registers a placeholder.
    services[servicename]=(dummy_module(servicename))
In [ ]:
def process(service_name,input_args):
    """
    Run the service registered as service_name on input_args.

    Currently a stub: the input is only reported in debug mode and the
    return value is fixed.

    Returns:
        (template name, result) -- always ('general_list.html', 'abc').
    """
    if DEBUG:
        print('processing ',service_name,'input type:',type(input_args))
    service=services[service_name]
    # NOTE(review): has_function is not defined in the visible code --
    # confirm it is provided elsewhere.
    if has_function(service,'status'):
        #make sure if the service has this function
        if service.status:
            # TODO: the original statement was left unfinished (no colon, no
            # body), which was a SyntaxError; fill in the status handling.
            pass
    return 'general_list.html','abc'
In [ ]:
# Start the development server only when this file is executed directly, so
# importing it (e.g. from a WSGI server) does not block on app.run().
if __name__ == '__main__':
    # Binds to all interfaces unless HOST is configured; PORT falls back to
    # the module-level constant. The reloader stays off.
    app.run(host=app.config.get("HOST", "0.0.0.0"),
            port=app.config.get("PORT", PORT),
            use_reloader=False)
In [ ]: