본문 바로가기
BigData/Airflow

Airflow DAG에서 SQL 스키마 관리 쉽게 하기 ( 중앙집중화 버전 )

by 윤팍 2025. 4. 21.

📌 개요

Airflow에서 데이터베이스를 다루다 보면 테이블명이나 컬럼명이 변경될 때마다 여러 파일을 수정해야 해서 번거로울 수 있습니다. 이 글에서는 SQL 스키마를 Python 코드 내 한곳에 집중 관리하여 유지보수를 간편하게 만드는 방법을 소개합니다.


🔍 이 방법을 쓰는 이유

  • 유지보수 간편화: 변경이 필요하면 한 파일만 수정하면 끝!
  • 코드 가독성 향상: SQL 쿼리와 로직이 명확하게 분리되어 관리
  • 기존 Airflow Hook 활용: MySqlHook 등을 그대로 사용할 수 있어 변경 부담이 적음

 

💻 구현 예제

📄 db_config.py (중앙 집중식 SQL 관리)

class DBConfig:
    TABLE_ALERT_HISTORY = "alert_history"

    COL_EXECUTION_ID = "execution_id"
    COL_PROCESS_NAME = "process_name"
    COL_END_TIME = "end_time"

    @classmethod
    def create_alert_history_sql(cls):
        return f"""
        CREATE TABLE IF NOT EXISTS {cls.TABLE_ALERT_HISTORY} (
            {cls.COL_EXECUTION_ID} INT PRIMARY KEY,
            {cls.COL_PROCESS_NAME} VARCHAR(255),
            {cls.COL_END_TIME} DATETIME
        );
        """

    @classmethod
    def insert_alert_history_sql(cls):
        return f"""
        INSERT INTO {cls.TABLE_ALERT_HISTORY} (
            {cls.COL_EXECUTION_ID},
            {cls.COL_PROCESS_NAME},
            {cls.COL_END_TIME}
        ) VALUES (%s, %s, %s);
        """

 

📄 Airflow DAG 코드에서 사용

from airflow import DAG
from airflow.decorators import task
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime
from db_config import DBConfig

with DAG('example_dag', start_date=datetime(2024, 1, 1), schedule_interval='@daily') as dag:

    @task
    def create_table():
        hook = MySqlHook('my_conn')
        hook.run(DBConfig.create_alert_history_sql())

    @task
    def insert_data():
        hook = MySqlHook('my_conn')
        hook.run(DBConfig.insert_alert_history_sql(), parameters=(123, 'process_x', '2024-04-21 12:00:00'))

    create_table() >> insert_data()

⚖️ 다른 방식과 비교

방식 장점 👍 단점 👎
Jinja2 + .sql 파일 SQL 별도 관리 가능 파라미터 관리가 번거로울 수 있음
Python 코드 내 문자열 코드에서 한눈에 관리 복잡한 SQL이 많을 때 가독성 저하
SQLAlchemy (ORM) 자동화 및 타입 안전성 코드 리팩토링 비용 높음
✅ 현재 방식 (상수+메서드) 코드 관리 효율성 높음 SQL 하이라이트 제한

👉 추천: 간단한 유지보수와 Airflow Hook을 그대로 활용하려면 현재 방식이 가장 효율적입니다.

 

 

🎯 결론 및 추천
스키마 정보를 한 곳에서 관리하는 방식으로 유지보수 시간과 노력을 절약 할 수 있습니다. 팀 규모나 프로젝트 특성에 따라 추가적으로 Jinja2나 SQLAlchemy와 같은 다른 방법도 고려할 수 있지만, 대부분의 경우 현재 소개한 방식이 가장 간편하고 효과적일 것입니다.

 

Tip: 추가로 pytest나 sqlfluff 등을 활용하여 SQL의 품질과 정확성을 높이면 더욱 좋습니다.

'BigData > Airflow' 카테고리의 다른 글

[Airflow] PostgresOperator 설치 및 DB연동  (0) 2023.05.10
[airflow] 설치 및 설정 (Postgresql 연동)  (0) 2023.02.23