A new Pipeline Approach: CORRAL

The TOROS Pipeline's Engine

Co-workers: J. B. Cabral, M. Beroiz, TOROS Collaboration

What is a pipeline?

A pipeline can be understood as a collection of filters and connectors that consume data by performing transformation of it on a linear and repetitive fashion.

This data can be named as a stream and each filter is a transformation or operation.

Connectors between the filters are the schema of the pipeline, and it inspires a always forward structure.

Some Pipelines

  • SDSS
  • Pan STARSS

Why do we do this?

We need this for TOROS for several reasons

  • Observational astronomers are facing a data tsunami, and synoptic surveys are one of the more data intensive tasks today.

  • Telescopes of any size can produce massive amounts of data, and on top of that TOROS needs real-time processing

  • Processing is time consuming for humans, and most of the tasks involved are likely to fail.

  • Handling meta-data is key to a Sinoptic Survey

The TORITOS Project

  • The first observation campaign needs.

    • Where to host data
    • A data base to record process
    • Computational power (real-time analysis)
    • Automatization of every process
    • A report system (lack of connectivity)
    • Error handling mechanisms


Corral is a framework designed to create fully integrated pipelines without tears and headaches.

Provides a structure where your pipeline metadata can be stored, mined, and analysed by controlling the processing stages of the actual data.

It makes use of some computer science formalisms and tools:

  • MVC pattern
  • ETL operation routines
  • OOP (Python native)
  • SQL database fully integrated
  • Pipeline branching
  • Asynchronus and embarrasing parallel processing

Testing and Code quality measurements integrated!

Toritos pipeline design

A main feature of this new Pipeline design approach is Model-View-Controller pattern.

The TORITOS pipeline needs three central entities:

  • The Models
  • The Loader and Steps (as controllers)
  • The Alerts (as user views)

The models

These are the designed data structures that will be interacting with the processing steps, and the storage of meta data that the pipeline handles.

It is correct to say that are the SQL data tables definitions which the pipeline works with.

In [1]:
from sqlalchemy.orm import *
from sqlalchemy import *
from sqlalchemy.ext import declarative

Model = declarative.declarative_base(name="Model")

In [2]:
class Observatory(Model):
    """Model for observatories. SQLAlchemy Model object.

    __tablename__ = 'Observatory'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False, unique=True)
    latitude = Column(Float, nullable=False)
    longitude = Column(Float, nullable=False)
    description = Column(Text, nullable=True)

    def __repr__(self):
        return self.name

In [3]:
class Campaign(Model):

    __tablename__ = 'Campaign'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False, unique=True)
    description = Column(Text, nullable=True)

    observatory_id = Column(Integer, ForeignKey('Observatory.id'))
    observatory = relationship(
        "Observatory", backref=backref('campaigns', order_by=id))
    ccd_id = Column(Integer, ForeignKey('CCD.id'))
    ccd = relationship(
        "CCD", backref=backref('campaigns', order_by=id))

    def __repr__(self):
        return self.name

The Steps

The processing operations performed during each stage of the pipeline are encapsulated in atomic Steps, i.e. each one is to be performed completely or failed fatally.

They cannot have a mixed outcome, and every time a step process is executed a database entry is recorded, to help mantain reproducibility of results and error tracking.


The Loader is the very first step to be executed and is the only one that is able to input data on the stream of the pipeline.

It needs to exist and it can be hosted inside a cronjob or similar task managers.

General Steps and branching

The general step entity is formalised in filters and connectors.

  • Filter: works by Quering the SQL database, extracting data, and transforming it
  • Connector: works by loading the result of a filter operation into new database records

This can be seen inside a ETL closed process

Extract Transform and Load

This is a processing recipe in which data is queried from a DB, some processing takes place, and after that a new record is registered into a DB (can be the same).


A line of data stream can (should!) be processed in parallel, if there is no concurrence between its partner processes.

The Alerts

Whenever a previously defined condition is satisfied, or a preferred state is reached, an alert is triggered.

CORRAL can communicate these alerts by SMS, e-mail, or web services (like a Django web app).

Inside CORRAL any alert can be defined, and any channel compatible with Python will communicate the information.


This Pipeline framework has been developed for TOROS, and it will be fully deployed in next campaign.

  • CORRAL in general is a framework ideal for research, and scientific DB hosting and data mining.
  • It is able to work under many environment conditions, being really flexible
  • It is completely written in Python, and TORITOS does not depend on any other language or software.
  • We can guarantee code quality and reproducibility of results
  • Provides a parallelization of the chain processes out of the box.
  • Provides a customizable user interface without risking DB and process complexity
  • It can grow organically, by adding new branchs to existing pipelines
  • It uses state of the art SQL Alchemy ORM supporting many DB
  • It is One ring to rule them all