The sqlite3 module implements a Python DB-API 2.0 compliant interface to SQLite, an in-process relational database. SQLite is designed to be embedded in applications, instead of using a separate database server program such as MySQL, PostgreSQL, or Oracle. It is fast, rigorously tested, and flexible, making it suitable for prototyping and production deployment for some applications.
In [1]:
import os
import sqlite3
db_filename = 'todo.db'

# Record whether the file exists *before* connecting: sqlite3.connect()
# creates the database file as a side effect when it is missing.
db_is_new = not os.path.exists(db_filename)

conn = sqlite3.connect(db_filename)
try:
    if db_is_new:
        print('Need to create schema')
    else:
        print('Database exists, assume schema does, too.')
finally:
    # Always release the database handle, even if reporting fails.
    conn.close()
In [5]:
%ls *.db
In [8]:
%rm -rf todo.db
import os
import sqlite3
from contextlib import closing

db_filename = 'todo.db'
scheme_filename = 'todo_schema.sql'

# Check before connecting: sqlite3.connect() creates the file if it is
# missing, so the test would always report "exists" afterwards.
db_is_new = not os.path.exists(db_filename)

if db_is_new:
    # Read the schema up front so that a missing or unreadable schema file
    # fails before connect() leaves behind an empty database that the next
    # run would mistake for an initialized one.
    with open(scheme_filename, 'rt') as f:
        scheme = f.read()

# The connection's own context manager only commits or rolls back;
# closing() makes sure the handle is released when the block ends.
with closing(sqlite3.connect(db_filename)) as conn, conn:
    if db_is_new:
        print('Creating schema')
        conn.executescript(scheme)

        print('Inserting initial data')
        conn.executescript("""
        insert into project (name, description, deadline)
        values ('pymotw', 'Python Module of the Week',
                '2016-11-01');

        insert into task (details, status, deadline, project)
        values ('write about select', 'done', '2016-04-25',
                'pymotw');

        insert into task (details, status, deadline, project)
        values ('write about random', 'waiting', '2016-08-22',
                'pymotw');

        insert into task (details, status, deadline, project)
        values ('write about sqlite3', 'active', '2017-07-31',
                'pymotw');
        """)
    else:
        print('Database exists, assume schema does, too.')
In [11]:
import sqlite3
from contextlib import closing

db_filename = 'todo.db'

# "with connect(...) as conn" alone would leave the connection open; it only
# manages the transaction. closing() releases the handle at block exit.
with closing(sqlite3.connect(db_filename)) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select id, priority, details, status, deadline from task
    where project = 'pymotw'
    """)

    # fetchall() returns every remaining result row as a list of tuples.
    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))
In [12]:
import sqlite3
from contextlib import closing

db_filename = 'todo.db'

# The connection's context manager does not close it; closing() does.
with closing(sqlite3.connect(db_filename)) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select name, description, deadline from project
    where name = 'pymotw'
    """)
    # fetchone() yields the first matching row (assumes the project exists).
    name, description, deadline = cursor.fetchone()

    print('Project details for {} ({})\n due {}'.format(
        description, name, deadline))

    cursor.execute("""
    select id, priority, details, status, deadline from task
    where project = 'pymotw' order by deadline
    """)

    print('\nNext 5 tasks:')
    # fetchmany(5) returns at most five rows from the ordered result.
    for row in cursor.fetchmany(5):
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))
The DB-API 2.0 specification says that after execute() has been called, the Cursor should set its description attribute to hold information about the data that will be returned by the fetch methods. The API specification says that the description value is a sequence of tuples containing the column name, type, display size, internal size, precision, scale, and a flag that says whether null values are accepted.
In [13]:
import sqlite3
from contextlib import closing

db_filename = 'todo.db'

# closing() releases the connection; its own context manager would not.
with closing(sqlite3.connect(db_filename)) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select * from task where project = 'pymotw'
    """)

    print('Task table has these columns:')
    # Each entry is a 7-item tuple; sqlite3 fills in only the column name
    # and leaves the remaining six items as None.
    for colinfo in cursor.description:
        print(colinfo)
In [14]:
import sqlite3
from contextlib import closing

db_filename = 'todo.db'

# closing() releases the connection; its own context manager would not.
with closing(sqlite3.connect(db_filename)) as conn:
    # Change the row factory to use Row, so columns can be read by name
    # as well as by position.
    conn.row_factory = sqlite3.Row

    cursor = conn.cursor()

    cursor.execute("""
    select name, description, deadline from project
    where name = 'pymotw'
    """)
    # Row objects still unpack positionally like tuples.
    name, description, deadline = cursor.fetchone()

    print('Project details for {} ({})\n due {}'.format(
        description, name, deadline))

    cursor.execute("""
    select id, priority, status, deadline, details from task
    where project = 'pymotw' order by deadline
    """)

    print('\nNext 5 tasks:')
    for row in cursor.fetchmany(5):
        # Name-based access makes the output independent of the column
        # order used in the select statement.
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            row['id'], row['priority'], row['details'],
            row['status'], row['deadline'],
        ))
In [15]:
import sqlite3
import sys
from contextlib import closing

db_filename = 'todo.db'
project_name = "pymotw"

# closing() releases the connection; its own context manager would not.
with closing(sqlite3.connect(db_filename)) as conn:
    cursor = conn.cursor()

    # "?" is a positional placeholder: the value is bound by sqlite3
    # rather than being formatted into the SQL text.
    query = """
    select id, priority, details, status, deadline from task
    where project = ?
    """

    # Positional parameters are passed as a sequence, hence the 1-tuple.
    cursor.execute(query, (project_name,))

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))
In [16]:
import sqlite3
import sys
from contextlib import closing

db_filename = 'todo.db'
project_name = "pymotw"

# closing() releases the connection; its own context manager would not.
with closing(sqlite3.connect(db_filename)) as conn:
    cursor = conn.cursor()

    # ":project_name" is a named placeholder, looked up by key in the
    # mapping passed to execute().
    query = """
    select id, priority, details, status, deadline from task
    where project = :project_name
    order by deadline, priority
    """

    cursor.execute(query, {'project_name': project_name})

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))
In [17]:
import csv
import sqlite3
import sys
from contextlib import closing

db_filename = 'todo.db'
data_filename = 'task.csv'

# Named placeholders are filled from each CSV row's columns, so the file
# is expected to have details, priority, deadline and project headers.
SQL = """
insert into task (details, priority, status, deadline, project)
values (:details, :priority, 'active', :deadline, :project)
"""

# newline='' lets the csv module do its own newline handling, which keeps
# line breaks embedded in quoted fields intact.
with open(data_filename, 'rt', newline='') as csv_file:
    # DictReader yields one mapping per data row, keyed by the header row.
    csv_reader = csv.DictReader(csv_file)

    # The inner "conn" context commits the inserts (or rolls back on
    # error); closing() then releases the connection itself.
    with closing(sqlite3.connect(db_filename)) as conn, conn:
        cursor = conn.cursor()
        # The reader is consumed lazily, so this must run while the file
        # is still open.
        cursor.executemany(SQL, csv_reader)
In [18]:
import sqlite3
db_filename = 'todo.db'
def show_projects(conn):
    """Print the name of each row in the project table, one per line.

    conn is an open sqlite3 connection; rows are read through a new cursor
    in whatever order the query returns them. Returns None.
    """
    cursor = conn.cursor()
    cursor.execute('select name, description from project')
    # Iterate the cursor directly instead of building a list with
    # fetchall(); the description column is selected but not displayed.
    for name, _description in cursor:
        print(' ', name)
# Imported here because this block needs it; a connection's own context
# manager only commits or rolls back and never closes the connection.
from contextlib import closing

# The trailing "conn1" context keeps the original commit-on-success /
# rollback-on-error behavior; closing() then releases the handle.
with closing(sqlite3.connect(db_filename)) as conn1, conn1:
    print('Before changes:')
    show_projects(conn1)

    # Insert in one cursor
    cursor1 = conn1.cursor()
    cursor1.execute("""
    insert into project (name, description, deadline)
    values ('virtualenvwrapper', 'Virtualenv Extensions',
            '2011-01-01')
    """)

    print('\nAfter changes in conn1:')
    show_projects(conn1)

    # Select from another connection, without committing first
    print('\nBefore commit:')
    with closing(sqlite3.connect(db_filename)) as conn2:
        show_projects(conn2)

    # Commit then select from another connection
    conn1.commit()
    print('\nAfter commit:')
    with closing(sqlite3.connect(db_filename)) as conn3:
        show_projects(conn3)
In [19]:
import sqlite3
db_filename = 'todo.db'
def show_projects(conn):
    """Print the name of each row in the project table, one per line.

    conn is an open sqlite3 connection; rows are read through a new cursor
    in whatever order the query returns them. Returns None.
    """
    cursor = conn.cursor()
    cursor.execute('select name, description from project')
    # Iterate the cursor directly instead of building a list with
    # fetchall(); the description column is selected but not displayed.
    for name, _description in cursor:
        print(' ', name)
# Imported here because this block needs it; a connection's own context
# manager only commits or rolls back and never closes the connection.
from contextlib import closing

# The trailing "conn" context keeps the original transaction handling on
# exit; closing() then releases the handle.
with closing(sqlite3.connect(db_filename)) as conn, conn:
    print('Before changes:')
    show_projects(conn)

    try:
        # Delete
        cursor = conn.cursor()
        cursor.execute("""delete from project
        where name = 'virtualenvwrapper'
        """)

        # Show the table as seen from inside the open transaction
        print('\nAfter delete:')
        show_projects(conn)

        # Pretend the processing caused an error
        raise RuntimeError('simulated error')

    except Exception as err:
        # Discard the changes
        print('ERROR:', err)
        conn.rollback()

    else:
        # Save the changes (not reached here because of the simulated
        # error above; kept to show the normal success path)
        conn.commit()

    # Show the results
    print('\nAfter rollback:')
    show_projects(conn)
An aggregation function collects many pieces of individual data and summarizes them in some way. Examples of built-in aggregation functions are avg() (average), min(), max(), and count().
The API for aggregators used by sqlite3 is defined in terms of a class with two methods. The step() method is called once for each data value as the query is processed. The finalize() method is called one time at the end of the query and should return the aggregate value. This example implements an aggregator for the statistical mode. It returns the value that appears most frequently in the input.
In [20]:
import sqlite3
import collections
db_filename = 'todo.db'
class Mode:
    """Aggregate that returns the most frequently seen input value.

    Intended for sqlite3's create_aggregate(): step() is called once per
    data value and finalize() once at the end to produce the result.
    Both methods print a trace line so the call sequence is visible.
    """

    def __init__(self):
        # Tally of how many times each value has been passed to step().
        self.counter = collections.Counter()

    def step(self, value):
        """Record one more occurrence of value."""
        print('step({!r})'.format(value))
        self.counter[value] += 1

    def finalize(self):
        """Return the most common value, or None if step() was never called.

        When several values share the highest count, the one chosen is
        whichever Counter.most_common() lists first.
        """
        ranked = self.counter.most_common(1)
        # Guard the empty case: indexing ranked[0] on an empty tally would
        # raise IndexError instead of producing a result.
        result, count = ranked[0] if ranked else (None, 0)
        print('finalize() -> {!r} ({} times)'.format(
            result, count))
        return result
# Imported here because this block needs it; a connection's own context
# manager only commits or rolls back and never closes the connection.
from contextlib import closing

with closing(sqlite3.connect(db_filename)) as conn:
    # Register Mode as a one-argument SQL aggregate function named "mode".
    conn.create_aggregate('mode', 1, Mode)

    cursor = conn.cursor()
    cursor.execute("""
    select mode(deadline) from task where project = 'pymotw'
    """)
    # An aggregate query without GROUP BY produces a single result row.
    row = cursor.fetchone()
    print('mode(deadline) is:', row[0])