In [1]:
import peewee
In [1]:
import asyncio
from aioorm import AioModel, AioMySQLDatabase,AioPostgreSQLDatabase
from aioorm.utils import aiodump_csv,aioload_csv
from peewee import (ForeignKeyField, IntegerField, CharField,
DateTimeField, TextField, PrimaryKeyField,Proxy,
SQL)
from io import StringIO
import aiofiles
from itertools import count
from pathlib import Path
from aitertools import alist
import os
In [2]:
db = Proxy()
In [3]:
class User_csv(AioModel):
name = CharField(max_length=25)
age = IntegerField()
sex = CharField(max_length=1)
class Meta:
database = db
In [4]:
class User_csv_f(AioModel):
id = PrimaryKeyField()
name = CharField(max_length=25)
age = IntegerField()
sex = CharField(max_length=1)
def __repr__(self):
return '{self.id},{self.name},{self.age},{self.sex}'.format(self=self)
class Meta:
database = db
In [5]:
database = AioPostgreSQLDatabase('test_ext',
host='localhost',
port=5432,
user='huangsizhe')
db.initialize(database)
In [24]:
# database = AioMySQLDatabase('test',
# host='127.0.0.1',
# port=3306,
# user='root',password = "hsz881224")
# db.initialize(database)
In [6]:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
In [26]:
async def test_csv_dump(loop):
await db.connect(loop)
assert await User_csv.table_exists() is False
await User_csv.create_table()
assert await User_csv.table_exists() is True
iq = User_csv.insert_many([
{'name': 'u1',"age":18,'sex':'f'},
{'name': 'u2',"age":17,'sex':'f'},
{'name': 'u3',"age":16,'sex':'m'},
{'name': 'u4',"age":15,'sex':'f'}])
await iq.execute()
#p = Path('.').absolute().parent
#filepath = p.joinpath('user_out.csv')
query = User_csv.select().order_by(User_csv.id)
#await aiodump_csv(query,str(filepath))
with StringIO() as f:
await aiodump_csv(query,f)
source = f.getvalue()
print(source)
await aioload_csv(User_csv_f, f,pk_in_csv=True)
all_cols = SQL('*')
query0 = User_csv_f.select(all_cols).order_by(User_csv_f.id)
user0 = await alist(query0)
(os.linesep).join([str(i) for i in user0])
await User_csv_f.drop_table()
async with aiofiles.open('0_user_out.csv') as ff:
er = await aioload_csv(User_csv_f, ff,pk_in_csv=True)
await User_csv_f.drop_table()
await aioload_csv(User_csv_f, '0_user_out.csv',pk_in_csv=True)
await User_csv_f.drop_table()
await User_csv.drop_table()
assert await User_csv.table_exists() is False
await db.close()
In [27]:
loop.run_until_complete(test_csv_dump(loop))
In [36]:
async def test_get(loop):
await db.connect(loop)
assert await User_csv.table_exists() is False
await User_csv.create_table()
assert await User_csv.table_exists() is True
iq = User_csv.insert_many([
{'name': 'u1',"age":18,'sex':'f'},
{'name': 'u2',"age":17,'sex':'f'},
{'name': 'u3',"age":16,'sex':'m'},
{'name': 'u4',"age":15,'sex':'f'}])
await iq.execute()
r = await User_csv.get(User_csv.name == "u1")
print(r.name)
print(r.age)
print(r.sex)
r = await User_csv.get(User_csv.name == "u5")
print(r)
r, created = await User_csv.get_or_create(name = "u1")
print(r.name)
print(r.age)
print(r.sex)
r, created = await User_csv.get_or_create(name = "u5",age=25,sex='f')
print(r.name)
print(r.age)
print(r.sex)
await User_csv.drop_table()
await db.close()
In [37]:
loop.run_until_complete(test_get(loop))
In [28]:
import csv
In [29]:
with open('0_user_out.csv') as f:
reader = csv.reader(f)
for row in reader:
print(row)
In [30]:
with open('0_user_out.csv') as f:
s = f.readline()
print(s)
s = f.readline()
In [11]:
from playhouse.postgres_ext import JSONField,BinaryJSONField
In [12]:
class APIResponse(AioModel):
url = CharField()
response = BinaryJSONField()
class Meta:
database = db
In [15]:
async def test_json(loop):
await db.connect(loop)
assert await APIResponse.table_exists() is False
await APIResponse.create_table()
assert await APIResponse.table_exists() is True
iq = APIResponse.insert_many([
dict(url='http://foo.com/baz/', response={'key': 'value'})
])
await iq.execute()
query = await APIResponse.select()
print([i.response for i in query])
for i in query:
i.response = {'key': 'value1'}
await i.save()
query = await APIResponse.select()
print([i.response for i in query])
await APIResponse.drop_table()
await db.close()
In [16]:
loop.run_until_complete(test_json(loop))
In [35]:
from playhouse.postgres_ext import ArrayField
In [17]:
class BlogPost(AioModel):
content = TextField()
tags = ArrayField(CharField)
In [ ]:
In [ ]: