Refactoring Python Code


Evgeny Demchenko

Scope

  • What is refactoring?
  • Why should we refactor?
  • When to refactor?
  • Refactoring examples (how to refactor?)

What is Refactoring?

"Refactoring is a disciplined technique for restructuring an existing body of code, altering its internal structure without changing its external behavior." -- © Martin Fowler

Or in other words:

Refactoring is a process of improving your code without writing any new functionality.

Why should we refactor?

  • Make the code easier to read and maintain
  • Reduce complexity
  • Improve performance
  • Make the code more reusable and flexible
  • Pay off technical debt
  • Keep the "cost of change" low

When to refactor?

  • Ideally: as you code...
  • Otherwise:
    • When you find yourself duplicating functionality
    • When working with legacy code
    • When adding features
    • When fixing bugs

The boy-scout rule:

"always leave the code behind in a better state than you found it." -- © Uncle Bob

How to refactor?

Types of Refactoring:

  • Improve form (names, functions size, nesting)
  • Improve style (PEP8/PyLint, pythonic code)
  • Reduce duplication (DRY) and complexity (KISS)
  • Apply design patterns (such as "Ports and Adapters")
  • Apply SOLID principles (such as "Single Responsibility Principle")
  • Improve code structure (modularity and composability)

Refactoring examples

We're going to refactor a sample console movie reviews aggregation utility to demonstrate some examples of all kinds of refactoring.

We'll start with a working application that aggregates reviews from Twitter and IMDB using the movie name as input and we'll apply one-by-one different refartoring patters to make it more readable and maintainable python code.

Code "Before Refactoring"

movie_reviews.py


In [73]:
"""Movie Reviews.

Usage:  movie_reviews.py <title>
        movie_reviews.py (-h | --help)
        movie_reviews.py --version

Arguments:
  <title>         Movie title

Options:
  -h --help     Show this screen.
  --version     Show version.
"""

from docopt import docopt
from TwitterSearch import *
from dateutil import parser
from imdbpie import Imdb
import logging
import os

def main(title):
    reviews = []

    # Search tweets
    ts = TwitterSearch(
        consumer_key = os.environ.get('TWITTER_CONSUMER_KEY'),
        consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET'),
        access_token = os.environ.get('TWITTER_ACCESS_TOKEN'),
        access_token_secret = os.environ.get('TWITTER_TOKEN_SECRET')
    )
    try:
        ts.connect()

        tso = TwitterSearchOrder() # create a TwitterSearchOrder object
        tso.setKeywords(['#' + title + 'Movie']) # let's define all words we would like to have a look for
        tso.setLanguage('en') # we want to see German tweets only
        tso.setIncludeEntities(False) # and don't give us all those entity information

        # add tweets to reviews list
        results = ts.getSearchResults(tso)

    except TwitterSearchException as e: # take care of all those ugly errors if there are some
        logging.exception(str(e))
        ts.cleanUp()
    else:
        for offset in range(results.getSize()):
            if offset > 9:
                break
            tweet = results.getTweetByIndex(offset)
            reviews.append({
                'author': tweet.getUserName(),
                'summary': tweet.getText(),
                'text': tweet.getText(),
                'date': parser.parse(tweet.getCreatedDate(), ignoretz=True),
                'source': 'Twitter'
            })
    finally:
        ts.disconnect()

    # Search Imdb
    imdb = Imdb()
    try:
        response = imdb.search_for_title(title)[0]
        title_id = response['imdb_id']
        response = imdb.get_title_reviews(title_id, max_results=10)
    except IndexError as e:
        logging.exception(str(e))
    else:
        for review in response:
            reviews.append({
                'author': review.username,
                'summary': review.summary,
                'text': review.text,
                'date': parser.parse(review.date, ignoretz=True),
                'source': 'IMDB'
            })

    # Sort reviews by date
    reviews.sort(cmp=_cmprev)

    # Print reviews
    for review in reviews:
        print('(%s) @%s: %s [Source: %s]' % ( review['date'].strftime('%Y-%m-%d'), review['author'], review['summary'], review['source'] ) )

def _cmprev(r1, r2):
    if r1['date']>r2['date']:
        return -1
    elif r1['date']<r2['date']:
        return 1
    else:
        return 0

Improve Style

PEP8/PyLint:

  • Imports rules/order
  • Whitespace rules

Imports: BEFORE


In [74]:
from docopt import docopt
from TwitterSearch import *
from dateutil import parser
from imdbpie import Imdb
import logging
import os

Problems:

  • 3rd-party packages before the standard library
  • Wildcard imports

Imports: AFTER

commit 1fa2de3


In [75]:
import logging
import os

from docopt import docopt
from dateutil import parser
from imdbpie import Imdb
from TwitterSearch import TwitterSearch, TwitterSearchOrder, TwitterSearchException

Improve Names

  • Explicit better than implicit
  • Names should reflect the purpose

Sorting function: BEFORE


In [76]:
def _cmprev(r1, r2):
    if r1['date']>r2['date']:
        return -1
    elif r1['date']<r2['date']:
        return 1
    else:
        return 0

Sorting function: AFTER

commit 80fc3d6


In [77]:
def _sort_by_date_desc(first, second):
    first_date = first['date']
    second_date = second['date']

    if first_date > second_date:
        return -1
    elif first_date < second_date:
        return 1

    return 0

Create Adapters for 3rd-party code

Benefits:

  • Decouple the dependencies on the application boundary
  • Only expose the functionality you need
  • Expose 3rd-party functionality via a pythonic API
  • Easy to mock/test
  • Easy to swap

Twitter Search: BEFORE


In [78]:
def main(title):
    reviews = []

    # Search tweets
    ts = TwitterSearch(
        consumer_key = os.environ.get('TWITTER_CONSUMER_KEY'),
        consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET'),
        access_token = os.environ.get('TWITTER_ACCESS_TOKEN'),
        access_token_secret = os.environ.get('TWITTER_TOKEN_SECRET')
    )
    try:
        ts.connect()

        tso = TwitterSearchOrder()  # create a TwitterSearchOrder object
        tso.setKeywords(['#' + title + 'Movie'])  # let's define all words we would like to have a look for
        tso.setLanguage('en')  # we want to see German tweets only
        tso.setIncludeEntities(False)  # and don't give us all those entity information

        # add tweets to reviews list
        results = ts.getSearchResults(tso)

    except TwitterSearchException as e:  # take care of all those ugly errors if there are some
        logging.exception(str(e))
        ts.cleanUp()
    else:
        for offset in range(results.getSize()):
            if offset > 9:
                break
            tweet = results.getTweetByIndex(offset)
            reviews.append({
                'author': tweet.getUserName(),
                'summary': tweet.getText(),
                'text': tweet.getText(),
                'date': parser.parse(tweet.getCreatedDate(), ignoretz=True),
                'source': 'Twitter'
            })
    finally:
        ts.disconnect()

TwitterReviews adapter

commit 3156496


In [79]:
import logging
import os
from dateutil import parser

from TwitterSearch import TwitterSearch, TwitterSearchOrder, TwitterSearchException


class TwitterReviews(object):
    def __init__(self, movie, limit=10, language='en'):
        self.movie = movie
        self.limit = 10
        self.language = language
        self.client = TwitterSearch(
            consumer_key=os.environ.get('TWITTER_CONSUMER_KEY'),
            consumer_secret=os.environ.get('TWITTER_CONSUMER_SECRET'),
            access_token=os.environ.get('TWITTER_ACCESS_TOKEN'),
            access_token_secret=os.environ.get('TWITTER_TOKEN_SECRET')
        )

    def __enter__(self):
        self.client.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type == TwitterSearchException:
            logging.exception(str(exc_val))
            self.client.cleanUp()
        self.client.disconnect()

    @property
    def reviews(self):
        return Reviews(self._get_results(), limit=self.limit)

    def _prepare_request(self):
        tso = TwitterSearchOrder()
        tso.setKeywords(self._get_keywords())
        tso.setLanguage(self.language)
        tso.setIncludeEntities(False)
        return tso

    def _get_keywords(self):
        return ['#' + self.movie + 'Movie']

    def _get_results(self):
        request = self._prepare_request()
        return self.client.getSearchResults(request)


class Reviews(object):
    def __init__(self, tweets, limit=10):
        self.limit = limit
        self.tweets = tweets

    def __len__(self):
        size = self.tweets.getSize()
        return size if size < self.limit else self.limit

    def __getitem__(self, item):
        if item >= len(self):
            raise IndexError
        tweet = self.tweets.getTweetByIndex(item)
        return Review(tweet)


class Review(object):
    def __init__(self, review):
        self.review = review

    @property
    def author(self):
        return self.review.getUserName()

    @property
    def summary(self):
        return self.review.getText()

    @property
    def text(self):
        return self.review.getText()

    @property
    def date(self):
        return parser.parse(self.review.getCreatedDate(), ignoretz=True)

    @property
    def source(self):
        return 'Twitter'

Twitter Search: AFTER

commit d0a40a4


In [80]:
def main(title, year=None):
    reviews = []
    
    with TwitterReviews(title) as reviews_backend:
        for review in reviews_backend.reviews:
            reviews.append(review)

Remove Duplication (DRY)

  • Duplicated code should be extracted into separate functions, methods and classes
  • But the purpose is reusability, not just "not allowing any duplication"

IMDB Reviews: BEFORE


In [81]:
def main(title, year=None):
    reviews = []
    
    with TwitterReviews(title) as reviews_backend:
        for review in reviews_backend.reviews:
            reviews.append(review)
            
    # Search Imdb
    imdb = Imdb()
    try:
        response = imdb.search_for_title(title)[0]
        title_id = response['imdb_id']
        response = imdb.get_title_reviews(title_id, max_results=10)
    except IndexError as e:
        logging.exception(str(e))
    else:
        for review in response:
            reviews.append({
                'author': review.username,
                'summary': review.summary,
                'text': review.text,
                'date': parser.parse(review.date, ignoretz=True),
                'source': 'IMDB'
            })

IMDB Reviews adapter

commit aefc54e


In [82]:
import logging

from dateutil import parser
from imdbpie import Imdb


class IMDBReviews(object):
    def __init__(self, movie, limit=10, language='en'):
        self.movie = movie
        self.limit = limit
        self.language = language
        self.client = Imdb()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val:
            logging.exception(str(exc_val))

    @property
    def reviews(self):
        return [Review(result) for result in self._get_results()]

    def _get_results(self):
        try:
            response = self.client.search_for_title(self.movie)[0]
            title_id = response['imdb_id']
            response = self.client.get_title_reviews(title_id, max_results=10)
            return response
        except IndexError as e:
            logging.exception(str(e))
            return []


class Review(object):
    def __init__(self, review):
        self.review = review

    @property
    def author(self):
        return self.review.username

    @property
    def summary(self):
        return self.review.summary

    @property
    def text(self):
        return self.review.text

    @property
    def date(self):
        return parser.parse(self.review.date, ignoretz=True)

    @property
    def source(self):
        return 'IMDB'

IMDB Reviews: AFTER

commit 23888d9


In [83]:
def main(title):
    reviews = []
    
    for backend_class in (TwitterReviews, IMDBReviews):
        with backend_class(title) as reviews_backend:
            for review in reviews_backend.reviews:
                reviews.append(review)

BaseReview class

commit 4a314cd


In [84]:
class BaseReview(object):
    def __init__(self, review):
        self.review = review

    @property
    def author(self):
        raise NotImplementedError

    @property
    def summary(self):
        raise NotImplementedError

    @property
    def text(self):
        raise NotImplementedError

    @property
    def date(self):
        raise NotImplementedError

    @property
    def source(self):
        raise NotImplementedError

    def display(self):
        print '(%s) @%s: %s [Source: %s]' % (
            self.date.strftime('%Y-%m-%d'),
            self.author,
            self.summary,
            self.source)

Adding more backends

  • Now that we've refactored using new abstractions we can easily add more review sources
  • Let's add NY Times movie reviews to the mix

commit 801f26c


In [85]:
import logging
import os

import requests
from dateutil import parser

BaseReview = object  # ignore this line, it's just for Jupyter notebook


class NYTimesReviews(object):
    def __init__(self, movie, limit=10, language='en'):
        self.movie = movie
        self.limit = 10
        self.language = language
        self.url = 'https://api.nytimes.com/svc/movies/v2/reviews/search.json'
        self.api_key = os.environ.get('NY_TIMES_API_KEY')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val:
            logging.exception(str(exc_val))

    @property
    def reviews(self):
        return [Review(result) for result in self._get_results()]

    def _get_results(self):
        response = requests.get(self.url, self._prepare_request_data())
        return response.json()['results']

    def _prepare_request_data(self):
        data = {
            'query': self.movie,
            'api-key': self.api_key
        }
        return data


class Review(BaseReview):
    @property
    def author(self):
        return self.review['byline']

    @property
    def summary(self):
        return self.review['headline']

    @property
    def text(self):
        return self.review['summary_short']

    @property
    def date(self):
        return parser.parse(self.review['date_updated'], ignoretz=True)

    @property
    def source(self):
        return 'NYTimes'

NY Times Reviews: BEFORE


In [86]:
def main(title):
    reviews = []

    for backend_class in (TwitterReviews, IMDBReviews):
        with backend_class(title) as reviews_backend:
            for review in reviews_backend.reviews:
                reviews.append(review)

    # Search NYTimes
    url = "https://api.nytimes.com/svc/movies/v2/reviews/search.json"
    data = {
        'query': title,
        'api-key': os.environ.get('NY_TIMES_API_KEY')
    }
    response = requests.get(url, data)
    count = 0
    for review in response.json()['results']:
        if count > 9:
            break
        reviews.append({
            'author': review['byline'],
            'summary': review['headline'],
            'text': review['summary_short'],
            'date': parser.parse(review['date_updated'], ignoretz=True),
            'source': 'NYTimes'
        })
        count += 1

NY Times Reviews: AFTER

commit 49079b6


In [87]:
def main(title):
    reviews = []

    for backend_class in (TwitterReviews, IMDBReviews, NYTimesReviews):
        with backend_class(title) as reviews_backend:
            for review in reviews_backend.reviews:
                reviews.append(review)

Fix sorting

  • cmp functions are deprecated in Python 3 and are hard to read
  • Let's use key function instead
  • And separate getting sorted reviews into a service function

Sorting: BEFORE


In [88]:
def _cmprev(r1, r2):
    if r1['date']>r2['date']:
        return -1
    elif r1['date']<r2['date']:
        return 1
    else:
        return 0

Sorting and Services: AFTER

commit fa75a8f


In [89]:
def get_reviews(title):
    reviews = []
    for backend_class in (TwitterReviews, IMDBReviews, NYTimesReviews):
        with backend_class(title) as reviews_backend:
            for review in reviews_backend.reviews:
                reviews.append(review)

    return reversed(sorted(reviews, key=_get_review_date))


def _get_review_date(review):
    return review.date

Final: BEFORE

movie_reviews.py


In [90]:
"""Movie Reviews.

Usage:  movie_reviews.py <title>
        movie_reviews.py (-h | --help)
        movie_reviews.py --version

Arguments:
  <title>         Movie title

Options:
  -h --help     Show this screen.
  --version     Show version.
"""

from docopt import docopt
from TwitterSearch import *
from dateutil import parser
from imdbpie import Imdb
import logging
import os

def main(title):
    reviews = []

    # Search tweets
    ts = TwitterSearch(
        consumer_key = os.environ.get('TWITTER_CONSUMER_KEY'),
        consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET'),
        access_token = os.environ.get('TWITTER_ACCESS_TOKEN'),
        access_token_secret = os.environ.get('TWITTER_TOKEN_SECRET')
    )
    try:
        ts.connect()

        tso = TwitterSearchOrder() # create a TwitterSearchOrder object
        tso.setKeywords(['#' + title + 'Movie']) # let's define all words we would like to have a look for
        tso.setLanguage('en') # we want to see German tweets only
        tso.setIncludeEntities(False) # and don't give us all those entity information

        # add tweets to reviews list
        results = ts.getSearchResults(tso)

    except TwitterSearchException as e: # take care of all those ugly errors if there are some
        logging.exception(str(e))
        ts.cleanUp()
    else:
        for offset in range(results.getSize()):
            if offset > 9:
                break
            tweet = results.getTweetByIndex(offset)
            reviews.append({
                'author': tweet.getUserName(),
                'summary': tweet.getText(),
                'text': tweet.getText(),
                'date': parser.parse(tweet.getCreatedDate(), ignoretz=True),
                'source': 'Twitter'
            })
    finally:
        ts.disconnect()

    # Search Imdb
    imdb = Imdb()
    try:
        response = imdb.search_for_title(title)[0]
        title_id = response['imdb_id']
        response = imdb.get_title_reviews(title_id, max_results=10)
    except IndexError as e:
        logging.exception(str(e))
    else:
        for review in response:
            reviews.append({
                'author': review.username,
                'summary': review.summary,
                'text': review.text,
                'date': parser.parse(review.date, ignoretz=True),
                'source': 'IMDB'
            })

    # Search NYTimes
    url = "https://api.nytimes.com/svc/movies/v2/reviews/search.json"
    data = {
        'query': title,
        'api-key': os.environ.get('NY_TIMES_API_KEY')
    }
    response = requests.get(url, data)
    count = 0
    for review in response.json()['results']:
        if count > 9:
            break
        reviews.append({
            'author': review['byline'],
            'summary': review['headline'],
            'text': review['summary_short'],
            'date': parser.parse(review['date_updated'], ignoretz=True),
            'source': 'NYTimes'
        })
        count += 1

    # Sort reviews by date
    reviews.sort(cmp=_cmprev)

    # Print reviews
    for review in reviews:
        print('(%s) @%s: %s [Source: %s]' % ( review['date'].strftime('%Y-%m-%d'), review['author'], review['summary'], review['source'] ) )

def _cmprev(r1, r2):
    if r1['date']>r2['date']:
        return -1
    elif r1['date']<r2['date']:
        return 1
    else:
        return 0

Final main() program: AFTER

movie_reviews.py


In [91]:
def main(title):
    for review in get_reviews(title):
        review.display()

Final code structure


References:

Q & A


Evgeny Demchenko