from datetime import datetime, timedelta

from airflow.models import DAG, Variable
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator


DAG_CONF = Variable.get('tnk_dconf_api_mobile_device', deserialize_json=True)
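
# The whole DAG is configured from a single Airflow Variable. An
# illustrative value, with field names taken from how the config is used
# below (the concrete values here are assumptions, not the real
# deployment's settings):
#
# {
#     "owner": "data-eng",
#     "emails": ["alerts@example.com"],
#     "bigquery_conn_id": "bigquery_default",
#     "project_id": "my-project",
#     "src_dataset_id": "raw",
#     "src_table_id": "api_mobile_device",
#     "dst_dataset_id": "staging",
#     "dst_table_id": "mobile_device",
#     "keys": ["os", "os_version", "device_model"]
# }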

DAG_OBJ = DAG(
    dag_id='el_api_mobile_device',
    description='Mobile data extraction',
    default_args={
        'owner': DAG_CONF['owner'],
        'start_date': datetime(2019, 5, 23),
        'email': DAG_CONF['emails'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 3,
        'retry_delay': timedelta(minutes=10),
    },
    schedule_interval='@daily')
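
# Note: with start_date=datetime(2019, 5, 23), '@daily', and no explicit
# catchup argument, Airflow's default catchup behaviour (catchup_by_default)
# will backfill one run per day since the start date; pass catchup=False to
# the DAG if only forward-looking runs are wanted.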


def check_table_func(**kwargs):
    """Branch on whether the source table exists in BigQuery."""
    hook = BigQueryHook(bigquery_conn_id=DAG_CONF['bigquery_conn_id'])
    table_exists = hook.table_exists(
        project_id=DAG_CONF['project_id'],
        dataset_id=DAG_CONF['src_dataset_id'],
        table_id=DAG_CONF['src_table_id'])
    # Return the task_id of the branch to follow.
    if table_exists:
        return 'extract'
    return 'table_doesnt_exist'

check_table = BranchPythonOperator(
    task_id='check_table',
    python_callable=check_table_func,
    provide_context=True,
    dag=DAG_OBJ)

# Sink for the "source table missing" branch; nothing to do for this run.
# The default trigger rule is fine here: this task's only upstream is
# check_table, and the branch operator itself skips it when not chosen.
table_doesnt_exist = DummyOperator(
    task_id='table_doesnt_exist',
    dag=DAG_OBJ)

sql = """
SELECT
    loanid
    {%- for key in params.columns %}
    , JSON_EXTRACT_SCALAR(additional_parameters, '$.{{ key }}') AS {{ key }}
    {%- endfor %}
FROM `{{ params.src_dataset }}.{{ params.src_table }}`
WHERE DATE(_PARTITIONTIME) = '{{ ds }}'
"""
extract = BigQueryOperator(
    task_id='extract',
    bigquery_conn_id=DAG_CONF['bigquery_conn_id'],
    sql=sql,
    use_legacy_sql=False,
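    # '${{ ds_nodash }}' is the BigQuery partition decorator: combined with
    # WRITE_TRUNCATE it replaces only the execution day's partition of the
    # destination table, not the whole table.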
    destination_dataset_table=('{{ params.project }}'
                               ':{{ params.dst_dataset }}'
                               '.{{ params.dst_table }}'
                               '${{ ds_nodash }}'),
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    schema_update_options=('ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'),
    params={
        'project': DAG_CONF['project_id'],
        'src_dataset': DAG_CONF['src_dataset_id'],
        'dst_dataset': DAG_CONF['dst_dataset_id'],
        'src_table': DAG_CONF['src_table_id'],
        'dst_table': DAG_CONF['dst_table_id'],
        'columns': DAG_CONF['keys'],
    },
    dag=DAG_OBJ)

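# Join task: exactly one of the two branch tasks succeeds while the other
# is skipped, so 'one_success' (rather than the default 'all_success',
# which would leave this task skipped too) lets the DAG reach completion.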
completed = DummyOperator(
    task_id='completed',
    trigger_rule='one_success',
    dag=DAG_OBJ)

check_table >> [
    table_doesnt_exist,
    extract
] >> completed
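
# Resulting graph:
#   check_table -> extract            -> completed
#   check_table -> table_doesnt_exist -> completed
# Exactly one of the two middle tasks runs per day; the other is skipped.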