In [1]:
import os
import subprocess
import shutil
import sys
ROOT_FOLDER = os.getcwd()
NOTE: To see how SQLAlchemy behaves with Postgres, use the --debug-sql option when running the processor.
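The --debug-sql flag is part of this workshop's CLI. If you want the same visibility in your own code, SQLAlchemy's engine-level SQL logging can be switched on directly; a minimal sketch (the connection URL is a placeholder):

import logging
import sqlalchemy as sa

# Either echo SQL from the engine itself...
engine = sa.create_engine('postgresql://localhost/clover_dwh', echo=True)  # placeholder URL

# ...or raise the sqlalchemy.engine logger to INFO to see every emitted statement
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)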
The SQLAlchemy ORM uses lazy loading by default. For most OLTP queries, this is appropriate as it will not join on associated tables until you access the related model (via the relationship property).
However, for OLAP queries you need to be careful. The Submission model in this workshop sample is related to both User and Form. When transforming the response, the code accesses the user relationship property, which then triggers a separate query. If there were very few users who each created lots of submissions, this would be fine because SQLAlchemy caches loaded model instances by default. However, we would generally expect that a single user only makes a single submission during a processing interval. Therefore, both users and submissions may be large.
You can configure SQLAlchemy to always avoid the additional queries by forcing the query to join on the related tables via eager loading. This is available in our workshop via the joined_load boolean kwarg.
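As a rough sketch of what the joined_load kwarg implies (the Submission.user and Submission.form relationship names are taken from the description above; the extractor name is illustrative and the Submission import is omitted):

import sqlalchemy.orm as sa_orm

def extract_submissions(session: sa_orm.Session, joined_load: bool = False):
    query = session.query(Submission)  # Submission is the workshop's model
    if joined_load:
        # Eagerly JOIN the related users and forms so that accessing
        # submission.user / submission.form later does not issue extra SELECTs
        query = query.options(
            sa_orm.joinedload(Submission.user),
            sa_orm.joinedload(Submission.form),
        )
    return query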
Running the naive processor seems to continually consume more and more memory:
python main.py generate medium
mprof run python main.py process medium naive-single
First we use --debug-mem to run memory profiling on the processor only. We get something like this:
Line # Mem usage Increment Line Contents
================================================
1 51.9 MiB 0.0 MiB def process(extractor, transformer, loader):
2 """
3 Extract-Transform-Load process
4
5 :param extractor: partial extractor function
6 :param transformer: partial transformer function
7 :param loader: partial loader function
8 """
9 52.1 MiB 0.2 MiB submissions_generator = extractor()
10
11 52.1 MiB 0.0 MiB events_generator = transformer(submissions_generator)
12
13 84.2 MiB 32.1 MiB loader(events_generator)
loader seems to have the highest incremental memory usage, so we dig further by decorating the naive-loader:
from memory_profiler import profile
@profile
def naive_loader(session: sa_orm.Session, events):
    ...
Results are now:
Line # Mem usage Increment Line Contents
================================================
21 52.4 MiB 0.0 MiB @log_metrics
22 @profile
23 def naive_loader(session: sa_orm.Session, events):
24 52.4 MiB 0.0 MiB num_events = 0
25 82.8 MiB 30.3 MiB for event in events:
26 82.8 MiB 0.0 MiB session.add(event)
27 82.8 MiB 0.0 MiB num_events += 1
28 86.3 MiB 3.5 MiB session.flush()
29 86.3 MiB 0.0 MiB return num_events
Next we look at the events generator (which is the transformer). Unfortunately, the @profile decorator in the memory_profiler library does not work with generators that use the yield from syntax due to issue #42. Instead, you can display the memory usage before and after:
from memory_profiler import memory_usage

def transform_submissions(session, submissions, processed_on: datetime.datetime = None):
    ...
    mem_before = memory_usage()[0]
    for submission in submissions:
        yield from _transform_submission(get_node_path_map, submission, processed_on)
        num_submissions += 1
    mem_after = memory_usage()[0]
    LOGGER.info('mem increment {} MB'.format(mem_after - mem_before))
    ...
Output:
[INFO:app.etl.transformers]: mem increment 28.34375 MB
It seems that despite using a generator, the events are staying in memory. Because every event is added to the SQLAlchemy session, the default behavior is to keep a reference to it in memory. This can be addressed by flushing each object immediately after adding it using session.flush() (see the load-single-flush-single processor configuration).
from memory_profiler import profile
@log_metrics
@profile
def individual_flush_loader(session: sa_orm.Session, events):
    ...
The loader's incremental memory usage becomes:
Line # Mem usage Increment Line Contents
================================================
31 52.5 MiB 0.0 MiB @log_metrics
32 @profile
33 def individual_flush_loader(session: sa_orm.Session, events):
34 52.5 MiB 0.0 MiB num_events = 0
35 57.3 MiB 4.8 MiB for event in events:
36 57.3 MiB 0.0 MiB session.add(event)
37 57.3 MiB 0.0 MiB session.flush([event])
38 57.3 MiB 0.0 MiB num_events += 1
39 57.3 MiB 0.0 MiB return num_events
The overall memory usage becomes:
However, it is important to note that the insertion is now very slow, because there is an INSERT statement for each individual event. This results in very poor I/O performance:
INSERT INTO clover_dwh.response_events (form_id, form_name, user_id, user_full_name, submission_id, submission_created, processed_on, schema_path, value, answer_type)
VALUES (...) RETURNING clover_dwh.response_events.id
INSERT INTO clover_dwh.response_events (form_id, form_name, user_id, user_full_name, submission_id, submission_created, processed_on, schema_path, value, answer_type)
VALUES (...) RETURNING clover_dwh.response_events.id
. . .
We are encountering a classic space-time tradeoff with this processor. Doing individual INSERTs to conserve memory leads to the process taking too long (I/O timing bottleneck). Loading everything into memory before insertion leads to too much memory being consumed (memory bottleneck).
By using batching (a.k.a. chunking), you can pick a balance along this spectrum.
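A minimal sketch of what such a chunked loader can look like (the function name and chunk_size default are illustrative, not the workshop's exact implementation):

import itertools
import sqlalchemy.orm as sa_orm

def chunked_loader(session: sa_orm.Session, events, chunk_size=1000):
    num_events = 0
    while True:
        # Pull the next batch of events off the generator
        chunk = list(itertools.islice(events, chunk_size))
        if not chunk:
            break
        session.add_all(chunk)
        session.flush(chunk)        # one flush (and INSERT round) per batch
        for event in chunk:
            session.expunge(event)  # release the reference so memory stays bounded
        num_events += len(chunk)
    return num_events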
If you run with the chunked-objects-with-join processor, the total time improves:
[INFO:__main__]: Copying scenario 'medium_number' from template
[INFO:app.etl.transformers]: Transformed 300 JSON submissions
[INFO:app.etl.loaders]: Inserted 12150 response events into database
[INFO:__main__]: Elapsed time (seconds): 3.325
The total memory usage will increase, but only by a limited amount which you control. Therefore, the memory is still bounded:
The timing is improved because we reduce the number of SQL INSERT statements: the rows to be inserted are batched. You can even verify this by looking at the SQL log with the --debug-sql option.
You can further optimize load insertions by reducing ORM usage. SQLAlchemy provides the bulk_insert_mappings method, which takes dictionaries as input rather than model instances. According to the SQLAlchemy documentation, this reduces latency because no "history" or session state management features are in use.
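A sketch of a loader based on bulk_insert_mappings (ResponseEvent is assumed to be the model mapped to clover_dwh.response_events; the rows are plain dicts keyed by column name):

import itertools
import sqlalchemy.orm as sa_orm

def chunked_mappings_loader(session: sa_orm.Session, event_dicts, chunk_size=1000):
    num_events = 0
    while True:
        chunk = list(itertools.islice(event_dicts, chunk_size))
        if not chunk:
            break
        # No ORM instances and no session state tracking: just raw dicts
        session.bulk_insert_mappings(ResponseEvent, chunk)
        num_events += len(chunk)
    return num_events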
The chunked-mappings-with-join processor configuration will allow you to load using bulk_insert_mappings. If you run it with mprof run, the memory usage is the same as large-chunks-with-join, since it uses the same chunking parameters. The timing profile looks similar as well, since the inserts are still chunked.
However, the total elapsed time as recorded in the default log output is noticeably shorter for chunked-mappings-with-join than for large-chunks-with-join. This is even more apparent if you use a larger number of submissions (e.g. the large_number_fewer_users or large_number_many_users scenarios).
Development turnaround can be shortened by optimizing repeated regression suite runs. In particular, you can avoid recreating a 'clean database' with every individual test case.
For most tests:
- Run all your SQLAlchemy tests inside a transaction (a single Session instance) which is always rolled back.
- Use .flush() rather than .commit() to "persist" write operations.
This speeds up individual tests. However, .flush() should not be employed if your test actually needs to verify transaction rollback behavior.
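A sketch of a transactional test fixture following that pattern (db_engine is an assumed engine fixture; the workshop's actual conftest.py may differ):

import pytest
import sqlalchemy.orm as sa_orm

@pytest.fixture
def session(db_engine):
    connection = db_engine.connect()
    transaction = connection.begin()
    session = sa_orm.Session(bind=connection)
    try:
        yield session               # tests call session.flush(), never session.commit()
    finally:
        session.close()
        transaction.rollback()      # undo everything the test wrote
        connection.close()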
One of the slowest steps is the test database setup. By introducing a --keepdb option to our pytest suite, we can force a template test database to be reused by setting the base_dir kwarg to a fixed path in the testing.postgresql.Postgresql() constructor. This greatly reduces fixture setup time and allows us to immediately start running tests. However, it is the responsibility of the developer to remove the test database if the model schema changes or a test accidentally persists a change to the database.
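A sketch of how --keepdb could be wired up in conftest.py (the option handling and fixture name are illustrative; the real KEEPDB_PATH value lives in tests/conftest.py):

import pytest
import testing.postgresql

KEEPDB_PATH = '.test-db'   # placeholder; the real value comes from tests/conftest.py

def pytest_addoption(parser):
    parser.addoption('--keepdb', action='store_true', default=False,
                     help='reuse the template test database between runs')

@pytest.fixture(scope='session')
def pg_server(request):
    kwargs = {}
    if request.config.getoption('--keepdb'):
        # A fixed base_dir lets testing.postgresql reuse the initialized data directory
        kwargs['base_dir'] = KEEPDB_PATH
    postgresql = testing.postgresql.Postgresql(**kwargs)
    yield postgresql
    postgresql.stop()   # with base_dir set, the data directory is left in place for reuse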
In [4]:
# no optimizations
%timeit -n1 -r3 subprocess.call('pytest tests/unit/test_models.py', shell=True)
In [7]:
# with optimizations
import os
from tests.conftest import KEEPDB_PATH
# ensure the first run creates the retained folder
testdb_path = os.path.join(ROOT_FOLDER, KEEPDB_PATH)
if os.path.exists(testdb_path):
    shutil.rmtree(testdb_path)
%timeit -n1 -r3 subprocess.call('pytest --keepdb tests/unit/test_models.py', shell=True)