1 분 소요

Airflow DAGs

Airflow 도커 세팅을 완료하고 GCS에 data를 자동으로 올리는 과정의 DAG를 작성해 보았다.

  1. wget 으로 csv 데이터를 다운로드 받는 과정
  2. csv 파일을 parquet 파일로 변환
  3. parquet 파일을 GCS에 업로드

필요한 모듈 임포트

import os
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from google.cloud import storage

import pyarrow.csv as pv
import pyarrow.parquet as pq

다운로드 할 파일 url, 저장 할 이름 설정

AIRFLOW_HOME = os.environ.get("AIRFLOW_HOME", '/opt/airflow/')
PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
BUCKET = os.environ.get("GCP_GCS_BUCKET")

# execution_date는 deprecated 라고 한다. data_interval_start 대신 사용?
URL_PREFIX = "https://s3.amazonaws.com/nyc-tlc/csv_backup/"
URL_TEMPLATE = URL_PREFIX + "fhv_tripdata_.csv"

CSV_SAVE_POSTFIX = '/fhv_tripdata_.csv'
PARQUET_SAVE_POSTFIX = '/fhv_tripdata_.parquet'

CSV_OUTPUT_FILE_TEMPLATE = AIRFLOW_HOME + CSV_SAVE_POSTFIX
PARQUET_OUTPUT_FILE_TEMPLATE = AIRFLOW_HOME + PARQUET_SAVE_POSTFIX

PARQUET 변환 함수, GCS 업로드 함수 작성

def format_to_parquet(src_file, dst_file):
    if not src_file.endswith('.csv'):
        logging.error("Can only accept source files in CSV format, for the moment")
        return
    table = pv.read_csv(src_file)
    pq.write_table(table, dst_file)

def upload_to_gcs(bucket, object_name, local_file):
    client = storage.Client()
    bucket = client.bucket(bucket)

    blob = bucket.blob(object_name)
    blob.upload_from_filename(local_file)

DAG 작성

dag = DAG(
    "FHV_to_GCS",
    start_date = datetime(2019, 1, 1),
    end_date= datetime(2019,12,31),
    schedule_interval = "0 6 2 * *",   # 매월
    catchup=True,
    max_active_runs=3,
    tags=['dtc-de']
)

with dag:
    wget_task = BashOperator(
        task_id ='wget',
        bash_command = f'wget {URL_TEMPLATE} -O {CSV_OUTPUT_FILE_TEMPLATE}'
    )

    csv_to_parquet_task = PythonOperator(
        task_id = "csv_to_parquet",
        python_callable=format_to_parquet,
        op_kwargs={
            "src_file": CSV_OUTPUT_FILE_TEMPLATE,
            "dst_file": PARQUET_OUTPUT_FILE_TEMPLATE
        }
    )

    local_to_gcs_task = PythonOperator(
        task_id="local_to_gcs_task",
        python_callable=upload_to_gcs,
        op_kwargs={
            "bucket": BUCKET,
            "object_name": f"FHV_tripData{PARQUET_SAVE_POSTFIX}",
            "local_file": f"{PARQUET_OUTPUT_FILE_TEMPLATE}",
        },
    )

    wget_task >> csv_to_parquet_task >> local_to_gcs_task

댓글남기기