[Langchain] FastAPI에서 LLM Agent를 활용한 Response Streaming

2024. 10. 31. 23:33Python

서론

저번에 한번 같은 글을 올렸었는데요 그것보다 훨씬 더 간편하고 성능적으로 개선된 방법을 설명해 볼까 합니다.

기존글은 이거예요!

https://todaycodeplus.tistory.com/70

 

[Langchain] Langserve에서 LLM Agent를 활용한 Response Streaming

기존에 LCEL기반의 LLM pipeline에서 Agent를 활용하는 방식으로 전환을 하고 있었습니다. 그 이유는 아래와 같습니다.1. Retriever를 구조화하여 사용하기 힘들다.2. 복잡한 형태, 크로스 도매인을 가진

todaycodeplus.tistory.com

 

무엇이 바뀌었을까?

우선 기존에는 방식이 다음과 같았습니다. 

  1. Agent가 결과물을 생성하는 과정에서 LLM모델의 Output에서 생성되는 Token을 Callback으로 인식합니다.
  2. Callback으로 들어온 Token을 Async Queue에 담습니다.
  3. Async Queue에 담긴 Token들을 비동기로 Generator를 사용해서 Output을 냅니다.

이 방식에는 좀 문제가 있었는데, 우선 복잡도가.. Agent의 답변을 내리는 것도 아니고 LLM에서 생성하는 모든 Token을 내립니다. 그런데 이제 Async queue를 곁들여서 그래서 Callback을 한번 거쳐 나가다 보니, 좀 복잡합니다. (위에 글을 보시면 복잡하다는 게 확 느껴질 겁니다.)

 

그래서 이것을 단순화시키고자 Callback을 제거하고, Langchain의 Runnable객체로 생성된 Chain이라면 가지고 있는 astream_events 메서드를 사용하기로 했습니다. 그리고 방식은 기존과 같이 SSE를 async generator를 사용해서 구현했습니다.

 

코드 설명

이 방법은 다른 구현체가 필요 없습니다. 기존에는 Custom Callback 같은 녀석을 만들어줄 필요가 없는 거죠 다만 이미 짜인 데이터를 내려받다 보니, 내려오는 데이터의 구조가 좀 복잡 시럽 다는 것만 빼면요 

@app.post("/chat")
async def stream_agent(input_data: QuestionInput):
    try:
        logger.info(f"Received question: {input_data.question}")
        logger.info(f"Chat history length: {len(input_data.chat_history)}")

        question_data = {
            "chat_history": input_data.chat_history,
            "question": input_data.question
        }

        async def generate():
            try:
                async for event in agent_pipeline.astream_events(question_data, config={"callbacks": [langfuse_handler]}, version="v2"):
                    output_data = event["data"]
                    if "chunk" in output_data.keys():
                        if isinstance(output_data["chunk"], AIMessageChunk):
                            yield f'data: {json.dumps({"token": output_data["chunk"].__dict__["content"]})}\n\n'

            except Exception as e:
                yield f"data: {json.dumps({'error': str(e)})}\n\n"

            finally:
                yield "data: [DONE]\n\n"

        logger.info("run_agent completed")
        return StreamingResponse(generate(), media_type="text/event-stream")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

 

오우 많이 간단해졌죠? 

 

다만 주의할 점이 있습니다.

  1. 내려오는 데이터 구조가 복잡합니다. 단순하지 않고 객체 형태로 내려오는 데이터도 있기 때문에 원하는 데이터를 저보다 더 거르고 싶다면 if문의 분기를 피할 수 없을 것 같습니다.
  2. 에러 처리가 좀 복잡합니다. 어떻게 에러를 처리할 것인가에 대해서 고민을 더 해봐야 합니다.

 

그러나 이런 단점을 상쇄할 만큼 확실히 코드 가독성 측면에서도, queue를 한 번 더 거치지 않아도 돼서 성능면에서도 더 뛰어나기 때문에 훨씬 나아졌습니다!

 

마치며

제가 얼마나 기본기가 없었는지 깨달은 내용입니다. 전반적인 비동기 처리에 대한 지식이나, python generator의 구동방식 Langchain을 활용하는 방법까지 단순히 땜빵식이 아니라 더 깊게, 더 치열하게 생각하고 고민해야지만 한다는 생각을 더 하게 됩니다.