In [ ]:
from datetime import datetime, date, timedelta

import MySQLdb  # was used in bulk_insert() below without being imported
from dateutil.relativedelta import relativedelta, SU
from gcloud import storage

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator  # was used below without being imported

In [ ]:
# Default arguments inherited by every task in the DAG defined below.
default_args = dict(
    owner='sy',
    depends_on_past=False,
    start_date=datetime(2018, 1, 8),
    email=['snugyun01@gmail.com'],
    email_on_failure=True,
    email_on_retry=True,
    retries=1,                            # retry each failed task once
    retry_delay=timedelta(minutes=5),     # wait 5 minutes between retries
)

In [ ]:
# Weekly DAG: cron '1 0 * * 1' = 00:01 every Monday.
dag = DAG('first_dag', description='dag', default_args=default_args,
          schedule_interval='1 0 * * 1', catchup=False)


# NOTE(review): `t` is computed once at DAG-parse time, so task_ids embed the
# parse date, not the logical execution date — an Airflow anti-pattern. Prefer
# stable task_ids and Jinja macros (e.g. "{{ ds }}") inside bash_command.
t = date.today() + relativedelta(weekday=SU(-2))  # the Sunday before last
suffix = t.strftime("%Y%m%d")

first_task = BashOperator(
                    task_id='first_task_ios_{}'.format(t),
                    bash_command='bash script1.sh ios {t}'.format(t=t),
                    dag=dag)

second_task = BashOperator(
                    task_id='second_task_ios_{}'.format(t),
                    bash_command='bash script2.sh ios {t}'.format(t=t),
                    dag=dag)

# List this week's files in the Google Storage bucket (listing happens at
# DAG-parse time, so the number of insert tasks tracks the bucket contents).
client = storage.Client(project='project-name')
bucket = client.bucket("bucket-name")
blobs_file_ios = bucket.list_blobs(prefix='prefix_file_name_{}'.format(suffix))

# BUG FIX: the template has both a positional '{}' and a named '{table}'
# placeholder, but was formatted with only the positional argument, which
# raises KeyError('table') when the DAG file is parsed. The table name is now
# supplied explicitly.
MYSQL_TABLE = 'table'  # TODO(review): set the real destination table name
query = "LOAD DATA LOCAL INFILE '~/airflow/download/{}' INTO TABLE {table} CHARACTER SET utf8 " \
        "FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' IGNORE 1 LINES "

# One MySQL bulk-insert task per blob found above.
bulk_insert_tasks = []
for i, blob in enumerate(blobs_file_ios):
    # Blob names look like 'a/b/<file>'; keep only the file component.
    # NOTE(review): assumes every blob name contains at least two '/' — confirm.
    file_name = blob.name.split("/")[2]
    task = MySqlOperator(task_id='insert_{}_{}'.format(t, i),
                         sql=query.format(file_name, table=MYSQL_TABLE),
                         mysql_conn_id='mysql_default', dag=dag)
    bulk_insert_tasks.append(task)


first_task >> second_task
second_task >> bulk_insert_tasks  # fan out: every insert task downstream of second_task

2. Using a for loop


In [ ]:
# GCS client authenticated with a service-account key file.
# NOTE(review): avoid hardcoding key paths / project ids — move to configuration.
client = storage.Client.from_service_account_json("json_key", project='project-name')
bucket = client.bucket("bucket-name")

def bulk_insert(*args, **kwargs):
    """Bulk-load one platform's weekly CSV exports from GCS into MySQL.

    Called by a PythonOperator with provide_context=True, so kwargs carries
    Airflow context plus op_kwargs:
      execution_date -- datetime; shifted back to the Sunday before last
      platform       -- 'ios' or 'android'; selects blob prefix and table

    Returns 'success' after every file is loaded and the transaction commits.
    """
    ed = kwargs.get('execution_date') + relativedelta(weekday=SU(-2))
    platform = kwargs.get('platform')
    # Renamed from `date` to stop shadowing datetime.date imported at the top.
    date_suffix = ed.strftime("%Y%m%d")

    blobs = bucket.list_blobs(prefix="tmp/log/data_{}_{}".format(platform, date_suffix))

    # SECURITY(review): credentials are hardcoded — use an Airflow connection
    # or environment variables instead of root/password literals.
    connection = MySQLdb.Connect(host='localhost', user='root', passwd='password',
                                 db='db', local_infile=1)
    # Loop-invariant template, hoisted out of the loop (was rebuilt per blob).
    query_format = "LOAD DATA LOCAL INFILE '~/download/{}' REPLACE INTO TABLE database.table_{} CHARACTER SET utf8 FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' IGNORE 1 LINES " \
                   "(user_id, instance_id, platform, app_version, country, first_week, event_week, diff_week); "

    cursor = connection.cursor()
    try:
        cursor.execute("SET innodb_lock_wait_timeout = 360;")
        cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")  # speed up bulk load
        cursor.execute("SET UNIQUE_CHECKS = 0;")

        for blob in blobs:
            # Blob names look like 'tmp/log/<file>'; keep the file component.
            file_name = blob.name.split("/")[2]
            cursor.execute(query_format.format(file_name, platform))

        cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
        cursor.execute("SET UNIQUE_CHECKS = 1;")
        connection.commit()
    finally:
        # FIX: cursor/connection were previously never closed (leak on error).
        cursor.close()
        connection.close()

    return 'success'

In [ ]:
# Jinja macro rendered by Airflow at runtime: logical date minus 9 days.
# Renamed from `date` — the old name shadowed datetime.date imported at the top.
ds_minus_9 = "{{ macros.ds_add(ds, -9) }}"

# NOTE(review): 'ON DUPLICATE KEY UPDATE retain = retain' keeps the OLD value
# on a key conflict; if the intent is to refresh the count, it should be
# 'retain = VALUES(retain)' — confirm before changing.
agg_query = "INSERT INTO database.table_{platform} (`country`, `platform`, `first_week`, `diff_week`, `retain`)\n " \
            "SELECT a.country AS country, a.platform AS platform, a.first_week AS first_week, a.diff_week AS diff_week, COUNT(DISTINCT instance_id) AS retain \n " \
            "FROM database.table_log_{platform} AS a GROUP BY country, platform, first_week, diff_week ON DUPLICATE KEY UPDATE retain = retain; "


# Build the 5-step retention pipeline once per platform:
# query -> extract -> download -> bulk insert -> aggregate.
for platform in ['ios', 'android']:
    query_task = BashOperator(task_id='query_task_{}_{}'.format(platform, t),
                              bash_command='bash cron.sh retention_query {} '.format(platform) + ds_minus_9,
                              dag=dag)

    extract_task = BashOperator(task_id='extract_task_{}_{}'.format(platform, t),
                                bash_command='bash cron.sh extract {} '.format(platform) + ds_minus_9,
                                dag=dag)

    download_task = BashOperator(task_id='download_task_{}_{}'.format(platform, t),
                                 bash_command='bash cron.sh download {} '.format(platform) + ds_minus_9,
                                 dag=dag)

    # FIX: PythonOperator was used here without being imported; the import is
    # added to the top-of-file import block.
    bulk_insert_task = PythonOperator(task_id='bulk_insert_{}'.format(platform), python_callable=bulk_insert,
                                      dag=dag, provide_context=True, op_kwargs={'platform': platform})

    retention_agg_task = MySqlOperator(task_id='retention_agg_task_{}_{}'.format(platform, t),
                                       sql=agg_query.format(platform=platform), mysql_conn_id='mysql_default', dag=dag)

    query_task >> extract_task >> download_task >> bulk_insert_task >> retention_agg_task