📌 개요
Airflow에서 데이터베이스를 다루다 보면 테이블명이나 컬럼명이 변경될 때마다 여러 파일을 수정해야 해서 번거로울 수 있습니다. 이 글에서는 SQL 스키마를 Python 코드 내 한곳에 집중 관리하여 유지보수를 간편하게 만드는 방법을 소개합니다.
🔍 이 방법을 쓰는 이유
- ✅ 유지보수 간편화: 변경이 필요하면 한 파일만 수정하면 끝!
- ✅ 코드 가독성 향상: SQL 쿼리와 로직이 명확하게 분리되어 관리
- ✅ 기존 Airflow Hook 활용: MySqlHook 등을 그대로 사용할 수 있어 변경 부담이 적음
💻 구현 예제
📄 db_config.py (중앙 집중식 SQL 관리)
class DBConfig:
TABLE_ALERT_HISTORY = "alert_history"
COL_EXECUTION_ID = "execution_id"
COL_PROCESS_NAME = "process_name"
COL_END_TIME = "end_time"
@classmethod
def create_alert_history_sql(cls):
return f"""
CREATE TABLE IF NOT EXISTS {cls.TABLE_ALERT_HISTORY} (
{cls.COL_EXECUTION_ID} INT PRIMARY KEY,
{cls.COL_PROCESS_NAME} VARCHAR(255),
{cls.COL_END_TIME} DATETIME
);
"""
@classmethod
def insert_alert_history_sql(cls):
return f"""
INSERT INTO {cls.TABLE_ALERT_HISTORY} (
{cls.COL_EXECUTION_ID},
{cls.COL_PROCESS_NAME},
{cls.COL_END_TIME}
) VALUES (%s, %s, %s);
"""
📄 Airflow DAG 코드에서 사용
from airflow import DAG
from airflow.decorators import task
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime
from db_config import DBConfig
with DAG('example_dag', start_date=datetime(2024, 1, 1), schedule_interval='@daily') as dag:
@task
def create_table():
hook = MySqlHook('my_conn')
hook.run(DBConfig.create_alert_history_sql())
@task
def insert_data():
hook = MySqlHook('my_conn')
hook.run(DBConfig.insert_alert_history_sql(), parameters=(123, 'process_x', '2024-04-21 12:00:00'))
create_table() >> insert_data()
⚖️ 다른 방식과 비교
방식 | 장점 👍 | 단점 👎 |
Jinja2 + .sql 파일 | SQL 별도 관리 가능 | 파라미터 관리가 번거로울 수 있음 |
Python 코드 내 문자열 | 코드에서 한눈에 관리 | 복잡한 SQL이 많을 때 가독성 저하 |
SQLAlchemy (ORM) | 자동화 및 타입 안전성 | 코드 리팩토링 비용 높음 |
✅ 현재 방식 (상수+메서드) | 코드 관리 효율성 높음 | SQL 하이라이트 제한 |
👉 추천: 간단한 유지보수와 Airflow Hook을 그대로 활용하려면 현재 방식이 가장 효율적입니다.
🎯 결론 및 추천
스키마 정보를 한 곳에서 관리하는 방식으로 유지보수 시간과 노력을 절약 할 수 있습니다. 팀 규모나 프로젝트 특성에 따라 추가적으로 Jinja2나 SQLAlchemy와 같은 다른 방법도 고려할 수 있지만, 대부분의 경우 현재 소개한 방식이 가장 간편하고 효과적일 것입니다.
✨ Tip: 추가로 pytest나 sqlfluff 등을 활용하여 SQL의 품질과 정확성을 높이면 더욱 좋습니다.
'BigData > Airflow' 카테고리의 다른 글
[Airflow] PostgresOperator 설치 및 DB연동 (0) | 2023.05.10 |
---|---|
[airflow] 설치 및 설정 (Postgresql 연동) (0) | 2023.02.23 |