PyCon 2017 ETL Workshop

Table of Contents

  • Test Optimizations
  • References

In [1]:
import os
import subprocess
import shutil
import sys

ROOT_FOLDER = os.getcwd()

Extraction

NOTE: To see how SQLAlchemy behaves with Postgres, use the --debug-sql option when running the processor.

Joins

The SQLAlchemy ORM uses lazy loading by default. For most OLTP queries, this is appropriate as it will not join on associated tables until you access the related model (via the relationship property).

However, for OLAP queries you need to be careful. The Submission model in this workshop sample is related to both User and Form. When transforming the response, the transformer accesses the user relationship property, which then triggers a separate query. If there were very few users who each created lots of submissions, this would be fine because SQLAlchemy caches loaded model instances by default. However, we would generally expect that a single user only makes a single submission during a processing interval. Therefore, both users and submissions may be large.

You can configure SQLAlchemy to always avoid the additional queries by forcing the query to join on the related tables via eager loading. This is available in our workshop via the joined_load boolean kwarg.
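As a rough sketch of what eager loading looks like at the query level (the Submission.user / Submission.form relationship names and the app.models import path are assumptions for illustration, not the exact workshop code):

import sqlalchemy.orm as sa_orm

from app.models import Submission  # import path assumed for illustration


def extract_submissions(session: sa_orm.Session, joined_load: bool = False):
    """Sketch of an extractor that optionally eager-loads related models."""
    query = session.query(Submission)
    if joined_load:
        # Emit a single JOINed SELECT up front instead of one lazy SELECT
        # per Submission.user / Submission.form access during transformation.
        query = query.options(
            sa_orm.joinedload(Submission.user),
            sa_orm.joinedload(Submission.form),
        )
    return query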

Memory profiling

Running the naive processor seems to continually consume more and more memory:

python main.py generate medium
mprof run python main.py process medium naive-single

First we use --debug-mem to only do memory profiling on the processor. We get something like this:

Line #    Mem usage    Increment   Line Contents
================================================
     1     51.9 MiB      0.0 MiB   def process(extractor, transformer, loader):
     2                                 """
     3                                 Extract-Transform-Load process
     4
     5                                 :param extractor: partial extractor function
     6                                 :param transformer: partial transformer function
     7                                 :param loader:  partial loader function
     8                                 """
     9     52.1 MiB      0.2 MiB       submissions_generator = extractor()
    10
    11     52.1 MiB      0.0 MiB       events_generator = transformer(submissions_generator)
    12
    13     84.2 MiB     32.1 MiB       loader(events_generator)

The loader seems to have the highest incremental memory usage, so we dig further by decorating naive_loader:

import sqlalchemy.orm as sa_orm
from memory_profiler import profile

@profile
def naive_loader(session: sa_orm.Session, events):
    ...

Results are now:

Line #    Mem usage    Increment   Line Contents
================================================
    21     52.4 MiB      0.0 MiB   @log_metrics
    22                             @profile
    23                             def naive_loader(session: sa_orm.Session, events):
    24     52.4 MiB      0.0 MiB       num_events = 0
    25     82.8 MiB     30.3 MiB       for event in events:
    26     82.8 MiB      0.0 MiB           session.add(event)
    27     82.8 MiB      0.0 MiB           num_events += 1
    28     86.3 MiB      3.5 MiB       session.flush()
    29     86.3 MiB      0.0 MiB       return num_events

Next we look at the events generator (which is the transformer). Unfortunately, the @profile decorator in the memory_profiler library does not work with generators that use the yield from syntax due to issue #42. Instead, you can display the memory usage before and after:

import datetime

from memory_profiler import memory_usage

def transform_submissions(session, submissions, processed_on: datetime.datetime = None):
    ...
    mem_before = memory_usage()[0]

    for submission in submissions:
        yield from _transform_submission(get_node_path_map, submission, processed_on)
        num_submissions += 1

    mem_after = memory_usage()[0]
    LOGGER.info('mem increment {} MB'.format(mem_after - mem_before))
    ...

Output:

[INFO:app.etl.transformers]: mem increment 28.34375 MB

It seems that despite using a generator, the events are staying in memory. Adding every event to the SQLAlchemy session keeps a reference to it in memory by default. This can be addressed by flushing each object immediately after adding it, using session.flush() (see the load-single-flush-single processor configuration).

from memory_profiler import profile

@log_metrics
@profile
def individual_flush_loader(session: sa_orm.Session, events):
    ...

The loader's incremental memory usage becomes:

Line #    Mem usage    Increment   Line Contents
================================================
    31     52.5 MiB      0.0 MiB   @log_metrics
    32                             @profile
    33                             def individual_flush_loader(session: sa_orm.Session, events):
    34     52.5 MiB      0.0 MiB       num_events = 0
    35     57.3 MiB      4.8 MiB       for event in events:
    36     57.3 MiB      0.0 MiB           session.add(event)
    37     57.3 MiB      0.0 MiB           session.flush([event])
    38     57.3 MiB      0.0 MiB           num_events += 1
    39     57.3 MiB      0.0 MiB       return num_events

The overall memory usage is correspondingly reduced.

However, it is important to note that the insertion is now very slow, because an INSERT statement is emitted for each individual event. This results in very poor I/O performance:

INSERT INTO clover_dwh.response_events (form_id, form_name, user_id, user_full_name, submission_id, submission_created, processed_on, schema_path, value, answer_type)
VALUES (...) RETURNING clover_dwh.response_events.id

INSERT INTO clover_dwh.response_events (form_id, form_name, user_id, user_full_name, submission_id, submission_created, processed_on, schema_path, value, answer_type)
VALUES (...) RETURNING clover_dwh.response_events.id
. . .

Timing profiling

If you run the load-single-flush-single processor through the Python cProfile module, you can visualize the output with gprof2dot to see the timing bottleneck.

Specifically, the bottleneck will be in the loader, because SQLAlchemy emits a SQL INSERT statement for each transformed event.
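For example, something along these lines (the output file names are arbitrary placeholders, and gprof2dot plus Graphviz dot must be installed):

python -m cProfile -o etl.pstats main.py process medium load-single-flush-single
gprof2dot -f pstats etl.pstats | dot -Tpng -o etl_profile.png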

Chunking

We are encountering a classic space-time tradeoff with this processor. Doing individual INSERTs to conserve memory leads to the process taking too long (an I/O timing bottleneck). Loading everything into memory before insertion leads to too much memory being consumed (a memory bottleneck).

By using batching (a.k.a. chunking), you can pick a balance along this spectrum.
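As a rough sketch of the chunking idea (the chunk size of 1000 is an arbitrary placeholder, and the workshop's chunked-objects-with-join loader may be organized differently):

import itertools

import sqlalchemy.orm as sa_orm


def chunked_loader(session: sa_orm.Session, events, chunk_size=1000):
    """Sketch: add events to the session in batches and flush once per batch."""
    num_events = 0
    events = iter(events)
    while True:
        # Hold at most chunk_size events in memory at a time.
        chunk = list(itertools.islice(events, chunk_size))
        if not chunk:
            break
        session.add_all(chunk)
        # Emit this batch's INSERTs; once flushed and clean, the objects are
        # only weakly referenced by the session and can be garbage collected.
        session.flush()
        num_events += len(chunk)
    return num_events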

If you run with the chunked-objects-with-join processor, the total time improves:

[INFO:__main__]: Copying scenario 'medium_number' from template
[INFO:app.etl.transformers]: Transformed 300 JSON submissions
[INFO:app.etl.loaders]: Inserted 12150 response events into database
[INFO:__main__]: Elapsed time (seconds): 3.325

The total memory usage will increase, but only by a limited amount which you control. Therefore, the memory usage is still bounded.

The timing is improved because we reduce the number of SQL INSERT statements: the rows to be inserted are batched. You can verify this by looking at the SQL log with the --debug-sql option.

Reducing ORM load

You can further optimize load insertions by reducing ORM usage. SQLAlchemy provides the bulk_insert_mappings method, which takes dictionaries as input rather than model instances. According to the SQLAlchemy documentation, this reduces latency because no “history” or session state management features are in use.
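A minimal sketch of that pattern, assuming a ResponseEvent model mapped to clover_dwh.response_events and a transformer that yields plain dictionaries (both assumptions for illustration):

import sqlalchemy.orm as sa_orm

from app.models import ResponseEvent  # model name and import path assumed


def bulk_mapping_loader(session: sa_orm.Session, event_mappings, chunk_size=1000):
    """Sketch: insert chunks of plain dicts, bypassing ORM unit-of-work bookkeeping."""
    num_events = 0
    chunk = []
    for mapping in event_mappings:
        chunk.append(mapping)
        if len(chunk) >= chunk_size:
            session.bulk_insert_mappings(ResponseEvent, chunk)
            num_events += len(chunk)
            chunk = []
    if chunk:  # trailing partial chunk
        session.bulk_insert_mappings(ResponseEvent, chunk)
        num_events += len(chunk)
    return num_events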

The chunked-mappings-with-join processor configuration will allow you to load using bulk_insert_mappings. If you run it with mprof run, the memory usage is the same as large-chunks-with-join because it uses the same chunking parameters. The timing profile looks similar as well, since the inserts are still chunked.

However, the total elapsed time as recorded in the default log output for chunked-mappings-with-join is noticeably shorter than for large-chunks-with-join. This is even more apparent if you use a larger number of submissions (e.g. the large_number_fewer_users or large_number_many_users scenarios).

Unit Test Optimizations

Development turnaround can be shortened by optimizing repeated regression suite runs. In particular, you can avoid recreating a 'clean database' for every individual test case.

Always run your SQLAlchemy tests inside a transaction:

  • Provide a session fixture (a SQLAlchemy Session instance) which is always rolled back (see the sketch below).
  • Use .flush() rather than .commit() to "persist" write operations.

This speeds up individual tests. However, .flush() should not be relied upon if your test actually needs to verify transaction rollback behavior.
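A minimal sketch of such a fixture, assuming a db_connection fixture that provides a SQLAlchemy Connection (the workshop's conftest.py may organize this differently):

import pytest
import sqlalchemy.orm as sa_orm


@pytest.fixture
def session(db_connection):
    """Yield a Session bound to an outer transaction that is always rolled back."""
    transaction = db_connection.begin()
    test_session = sa_orm.Session(bind=db_connection)
    try:
        yield test_session
    finally:
        test_session.close()
        transaction.rollback()  # undo everything the test flushed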

One of the slowest steps is the test database setup. By introducing a --keepdb option to our pytest suite, we can force a template test database to be reused by:

  • Setting the base_dir kwarg to a fixed path in the testing.postgresql.Postgresql() constructor.
  • Skipping re-initialization of the Postgres extensions, schemas, and tables when the test database already exists.

This greatly reduces fixture setup time and allows us to immediately start running tests. However, it is the responsibility of the developer to remove the test database if the model schema changes or a test accidentally persists a change to the database.
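A simplified sketch of how --keepdb could be wired into conftest.py (the KEEPDB_PATH value and the create_extensions_schemas_and_tables helper are hypothetical; the workshop's actual conftest differs in detail):

import os

import pytest
import testing.postgresql

KEEPDB_PATH = '.test_db'  # value assumed; the real constant lives in tests/conftest.py


def create_extensions_schemas_and_tables(db_url):
    """Hypothetical helper: create Postgres extensions, schemas, and tables (workshop-specific)."""
    ...


def pytest_addoption(parser):
    parser.addoption('--keepdb', action='store_true', default=False,
                     help='Reuse the on-disk test database between runs')


@pytest.fixture(scope='session')
def postgresql_db(request):
    keepdb = request.config.getoption('--keepdb')
    existed = keepdb and os.path.exists(os.path.join(KEEPDB_PATH, 'data'))
    if keepdb:
        # A fixed base_dir keeps the database files in place between runs.
        postgresql = testing.postgresql.Postgresql(base_dir=KEEPDB_PATH)
    else:
        postgresql = testing.postgresql.Postgresql()
    if not existed:
        create_extensions_schemas_and_tables(postgresql.url())
    yield postgresql
    postgresql.stop()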


In [4]:
# no optimizations
%timeit -n1 -r3 subprocess.call('pytest tests/unit/test_models.py', shell=True)


1 loop, best of 3: 2.66 s per loop

In [7]:
# with optimizations
import os
from tests.conftest import KEEPDB_PATH

# ensure the first run creates the retained folder
testdb_path = os.path.join(ROOT_FOLDER, KEEPDB_PATH)
if os.path.exists(testdb_path):
    shutil.rmtree(testdb_path)

%timeit -n1 -r3 subprocess.call('pytest --keepdb tests/unit/test_models.py', shell=True)


1 loop, best of 3: 1.15 s per loop