본문 바로가기
Python

[FastAPI] Server Send Event(SSE) 구현

by 윤팍 2025. 2. 10.

개요

 

기존 프로젝트의 소스코드를 수정을 해야하는 날이 있었다.

SSH 로 연결하여 tail -f 명령어로 실시간으로 로그데이터를 가져오는 기능이였는데 보안 정책으로 인해 원격서버가 주기적으로 비밀번호가 변경되어, SSH 연결정보를 수동으로 바꿔줘야하는 상황이 생겨버렸다.

 

수동적인 변경이 어려워 이를 해결할 방법으로 API서버를 구현하여 이를 수행하게 만드는 것이였다.

이를 구현하는 방법 중 하나로 SSE(Server Sent Event)를 선택하게 되었고, FastAPI를 사용하여 구현하였다.

 

SSE 로 구현한 이유

 

Server Sent Event( SSE )란, 서버가 실시간으로 발생시키는 이벤트나 데이터를 클라이언트 에 단방향으로 전달하기 위해 설계된 웹 표준 방식 중 하나이다.

 

단방향 실시간 통신에 적합한데, 서버에서만 데이터를 전송하고 클라이언트는 이를 수신하기만 하는 구조이다.

로그 모니터링, 알림 서비스처럼 클라이언트가 서버로 별도의 데이터를 보낼 필요가 없는 경우에 적합하다.

별도의 프로트콜(WebSocket) 없이 HTTP 연결을 통해 스트리밍을 유지한다.

 

SSE 말고 다른 방식으로는 WebSocket 방식이 있다.

 

WebSocket은 서버와 클라이언트가 양방향으로 데이터를 주고 받을 수 있는 소켓이다.

즉, 클라이언트와 서버 사이에 별도의 TCP 전이중 통신 연결 세션을 구성하여 별도의 요청 없이 실시간으로 통신을 가능하게 한다.

 

하지만 앞서 프로젝트에 구현해야할 기능을 참고하면, 클라이언트가 실시간으로 로그를 받아서 수행하면 충분했기 때문에, SSE 방식이 가장 적합한 방법이라고 판단하여 SSE로 구현하게 되었다.

 

SSE 예시

 

FastAPI 로 SSE 구현

 

FastAPI 공식 문서의 StreamingResponse 예제가 존재한다.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time
 
app = FastAPI()
 
async def fake_video_streamer():
    for i in range(10):
        yield b"some fake video bytes"
        time.sleep(1)
        
 
@app.get("/")
async def main():
    return StreamingResponse(fake_video_streamer())

 

예제의 소스는 1초에 한번씩 총 10번의 데이터가 반환되는 엔드포인트이다.

 

아래는 위 예제를 참고삼아  tail -f +n 100 으로 돌았던 기존 프로세스에서 SSE 를 사용하여 API를 구축한 코드이다 

@router.get("/tail/stream")
async def stream_logs(
    filepath:str,
    request : Request,
    offset:int = 1):

    if not os.path.exists(filepath) :
        logger.error("File not found")
        raise HTTPException(status_code=500, detail=f"{filepath} 해당 파일을 찾을 수 없습니다.")

    async def tail() :
        try :
            with open(filepath, "r", encoding="utf-8", errors="replace") as f :
                for _ in range(offset) :
                    f.readline()
                
                while True :
                    if await request.is_disconnected() :
                        logger.info("tail disconnect")
                        break

                    line = f.readline()

                    yield line
                    await asyncio.sleep(0.01)
        except Exception as e :
            logger.error(e)
            raise HTTPException(status_code=404, detail=str(e))

    return StreamingResponse(
        tail(),
        media_type="text/plain"  # 또는 "text/event-stream" 등
    )

 

특정 파일을 실시간으로 스트리밍 하여 반환 하는 프로세스이다

 

 

아래는 클라이언트 코드이다.

def stream_log_line() :
    r = requests.get(url, params=params, stream=True, timeout=(3, 10))
    r.raise_for_status()

    if not r.ok :
        print "Request Fail"
        return
        
    while True :
        try :
            line = next(r.iter_lines(), None)
    
            if line is None :
                print "None"
            print line

            r.close()

        except requests.exceptions.ConnectionError :
            print("Read Time out")
            print("다음 로직") 
            continue
        except Exception, e :
            print e
            break
            
if __name__ == "__main__" :
    stream_log_lines()

 

지속적인 연결을 위해 먼저 스트림 연결 후 connect timeout, read timeout 을 줘서 데이터가 더이상 들어오지 않을 시 후 처리를 하게 되어있는 로직이다.

 

처음엔 iter_lines() 를 직접적으로 연동하여 읽어왔으나 지속적인 읽기가 안되어 바꿔주었다.

계속 서버와 연결하고 있다가 데이터가 들어오면 받아서 출력하고, 20초동안 더이상 읽을 데이터가 없으면 ConnectionError 가 발생하여 다음 로직으로 진행하였다.

 

이처럼 코드를 짜니 실시간으로 데이터를 받아올 수 있었다. 물론 20초 후에 재연결 로직이 필요하지만.

 

마치며

 

이번에 SSE 방식의 실시간 연동을 해보았는데, 간단하게 구현할 수 있어서 좋았던거 같다.

기존 방식이랑 비슷하게 구현할려고 예외처리 고민을 많이 하였는데, 실시간 연동 자체는 쉬웠던거 같아 편리했다.

 

지금은 단방향 통신만 있으면 되었으나, 양방향 통신이 필요할 경우 WebSocket 도 고려해볼 거 같다.