This tutorial walks through some of Flux's fundamental concepts and objects, and how they are used.
Hooks are a simple abstraction layer over the systems Flux interacts with. You should expect a lot more consistency across hooks than across the different systems' underlying libraries, as well as a higher level of abstraction.
Connection information is stored in the Flux metadata database, so you don't need to hard-code or remember it. In the example below, we connect to a MySQL database by specifying mysql_dbid; behind the scenes, Flux looks up the actual hostname, login, password, and schema name in its metadata.
Common methods include get_records, shown below, and run, which MySqlOperator delegates to in its execute method further down.
In [19]:
# A little bit of setup
import logging
reload(logging)  # reset any handlers the notebook may have configured (Python 2 builtin)
logging.basicConfig(
    format='%(asctime)s %(levelname)s:%(message)s',
    level=logging.DEBUG, datefmt='%I:%M:%S')
In [20]:
from flux.hooks import MySqlHook
mysql_hook = MySqlHook(mysql_dbid='local_mysql')
sql = """
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_schema = 'flux'
"""
mysql_hook.get_records(sql)
Out[20]:
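Hooks can also execute statements directly. Here's a minimal sketch using the same hook, assuming the tmp table used later in this tutorial already exists; run is the same method MySqlOperator delegates to in its execute method further down.

# Execute a statement directly through the hook; MySqlOperator's
# execute() (shown later) delegates to this same run() method
mysql_hook.run("INSERT INTO tmp SELECT 1;")

# Fetch the results back as records, as in the cell above
mysql_hook.get_records("SELECT COUNT(*) FROM tmp;")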
Operators allow you to perform a type of interaction with subsystems. There are 3 main families of operators.
An operator instance is a task, and it represents a node in the DAG (directed acyclic graph). A task defines a start_date, an end_date (None for open-ended), and a schedule_interval (say, daily or hourly). Actual task runs for a specific schedule time are what we refer to as task instances.
Below, we run a simple remote MySQL statement over a date range. The task.run() method will instantiate many task runs for the specified schedule and run them, storing their state in the Flux database. If you were to re-run this, it would report that it already succeeded.
In [25]:
from flux.operators import MySqlOperator
from datetime import datetime, timedelta
sql = """
INSERT INTO tmp
SELECT 1;
"""
mysql_op = MySqlOperator(task_id='test_task3', sql=sql, mysql_dbid='local_mysql', owner='max')
mysql_op.run(start_date=datetime(2014, 9, 15), end_date=datetime(2014, 9, 17))
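As noted above, task runs are stateful; a quick way to see this is to issue the same run again:

# Re-running the same date range: task instances already recorded as
# successful in the Flux database are reported as such, not re-executed
mysql_op.run(start_date=datetime(2014, 9, 15), end_date=datetime(2014, 9, 17))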
A DAG is simply a collection of tasks, with relationships between them, and their associated task instance run states.
In [26]:
from flux.operators import MySqlOperator
from flux import DAG
from datetime import datetime
# Setting some default operator parameters
default_args = {
'owner': 'max',
'mysql_dbid': 'local_mysql',
}
# Initializing a directed acyclic graph
dag = DAG(dag_id='test_dag')
# MySQL Operator
sql = "TRUNCATE TABLE tmp;"
mysql_first = MySqlOperator(task_id='mysql_first', sql=sql, **default_args)
dag.add_task(mysql_first)
sql = """
INSERT INTO tmp
SELECT 1;
"""
mysql_second = MySqlOperator(task_id='mysql_second', sql=sql, **default_args)
dag.add_task(mysql_second)
mysql_second.set_upstream(mysql_first)
dag.tree_view()
dag.run(start_date=datetime(2014, 9, 1), end_date=datetime(2014, 9, 1))
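Dependencies aren't limited to linear chains. Here's a sketch extending the DAG above with a fan-in task; calling set_upstream once per upstream task is an assumption extrapolated from the API shown above:

# A third task that depends on both tasks above (fan-in)
sql = "INSERT INTO tmp SELECT 2;"
mysql_third = MySqlOperator(task_id='mysql_third', sql=sql, **default_args)
dag.add_task(mysql_third)
mysql_third.set_upstream(mysql_first)   # assumed: one call per upstream dependency
mysql_third.set_upstream(mysql_second)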
Jinja is a powerful templating engine in Python. It allows you to neatly integrate code logic, variables, and method calls within your commands.
By default, all templated fields in operators get access to objects like macros, task_instance, and params, as the examples below show:
In [27]:
# Integrate arbitrary macros in your code, grow the macro module
sql = """
INSERT INTO tmp
SELECT {{ macros.random() * 100 }}
FROM t
WHERE ds='{{ macros.hive.latest_partition_for_table(some_other_table) }}';
"""
# References to constants, execution_date
sql = """
INSERT OVERWRITE TABLE {{ params["destination_table"] }}
PARTITION (ds='{{ task_instance.execution_date }}')
SELECT field
FROM {{ params["source_table"] }}
WHERE ds='{{ macros.latest_partition_for_table(some_other_table) }}';
"""
# Code logic
sql = """
INSERT OVERWRITE TABLE the_table
PARTITION (ds='{{ task_instance.execution_date }}')
{% if (macros.datetime.now() - task_instance.execution_date).days > 90 %}
SELECT * FROM anonymized_table;
{% else %}
SELECT * FROM non_anonymized_table;
{% endif %}
"""
Perform any surgery you need from the command line: fix false positives and false negatives, and rerun subsections of DAGs.
In [36]:
%%bash
# Printing --help for the main command and each subcommand
cd /home/mistercrunch/Flux
./flux --help
echo ============================================================================
./flux backfill -h
echo ============================================================================
./flux clear -h
echo ============================================================================
./flux run -h
echo ============================================================================
./flux webserver -h
echo ============================================================================
./flux master -h
In [43]:
%%bash
# Example run command
cd $FLUX_HOME
./flux run example_2 runme_1 2014-09-01 -sd /home/mistercrunch/Flux/dags/examples/example2.py
# Printing the log
cat /home/mistercrunch/Flux/logs/example_2/runme_1/2014-09-01T00:00:00
Deriving BaseOperator is easy. You should create all the operators your environment needs as building blocks for your pipelines.
Here's the source for MySqlOperator:
In [3]:
from core.models import BaseOperator
from core.hooks import MySqlHook
class MySqlOperator(BaseOperator):

    __mapper_args__ = {'polymorphic_identity': 'MySqlOperator'}  # SqlAlchemy artifact
    template_fields = ('sql',)  # the jinja template will be applied to these fields

    def __init__(self, sql, mysql_dbid, *args, **kwargs):
        super(MySqlOperator, self).__init__(*args, **kwargs)
        self.hook = MySqlHook(mysql_dbid=mysql_dbid)
        self.sql = sql

    def execute(self, execution_date):
        print('Executing:' + self.sql)
        self.hook.run(self.sql)
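Following the same pattern, here's a sketch of a hypothetical custom operator. The EchoOperator class is invented for illustration; its structure simply mirrors MySqlOperator above:

from core.models import BaseOperator

class EchoOperator(BaseOperator):
    # Hypothetical operator, for illustration only
    __mapper_args__ = {'polymorphic_identity': 'EchoOperator'}  # SqlAlchemy artifact
    template_fields = ('message',)  # jinja templating gets applied to this field

    def __init__(self, message, *args, **kwargs):
        super(EchoOperator, self).__init__(*args, **kwargs)
        self.message = message

    def execute(self, execution_date):
        # Called once per task instance at run time
        print('Echoing: ' + self.message)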
Executors are an abstraction on top of systems that can run Flux task instances. The default LocalExecutor is a simple implementation built on Python's multiprocessing module with a joinable queue.
Arbitrary executors can be derived from BaseExecutor. Expect Celery, Redis/Mesos, and other executors to be created soon.
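To illustrate the pattern (this is the general multiprocessing idiom, not Flux's actual LocalExecutor source), here's a minimal sketch of workers consuming commands from a joinable queue:

from multiprocessing import JoinableQueue, Process

def worker(queue):
    # Pull commands until a None "poison pill" comes through
    while True:
        cmd = queue.get()
        if cmd is None:
            queue.task_done()
            break
        print('Running: ' + cmd)
        queue.task_done()

queue = JoinableQueue()
workers = [Process(target=worker, args=(queue,)) for _ in range(2)]
for w in workers:
    w.start()
for cmd in ['task_a', 'task_b', 'task_c']:
    queue.put(cmd)
for _ in workers:
    queue.put(None)  # one poison pill per worker
queue.join()  # block until every queued item is marked done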
In [44]:
# Coming up