Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

AWS Lambda(Python)でAmazon Bedrockの出力をレスポンスストリーミング対応してみた

こんにちはイワツカです。
今年の夏は、特に猛暑日が続いていたので、例年にも増して素麺を食べてました。

さて今回は、AWS Lambda(Python)でLambda Web Adapterを用いてレスポンスストリーミングする方法を試してみたので紹介します。

1. 概要

1.1 レスポンスストリーミングとは?

レスポンスストリーミングとは、HTTPリクエストに対してサーバーがレスポンスを一度にまとめて送るのではなく、データを分割して順次クライアントに送信する手法のことです。
AWS Lambdaでは、Lambda関数URLを設定してLambdaからのレスポンスをレスポンスストリーミングにすることができます。
レスポンスストリーミングにすることで、従来に比べてユーザーにレスポンスが届くまでの時間を軽減できるので、Webアプリケーションにおけるユーザー体験を向上させることができます。
用途としては、リアルタイムなチャットのやり取りや、処理の進行状況などを表示するのに便利です。

ただ、現在レスポンスストリーミングに直接対応しているのは、ランタイムがNode.jsの場合のみです。 docs.aws.amazon.com

Lambdaで使用するランタイムがPythonの場合は、Lambda Web Adapterを使用することでレスポンスストリーミングを実現可能です。

1.2 Lambda Web Adapterとは?

Lambda Web Adapterとは、ウェブアプリケーションAWS Lambda上で実行するためのツールです。
Lambda Web Adapterを使用するとFastAPIなどのフレームワークを利用できるようになります。
github.com

そしてFastAPIには、StreamingResponseという、レスポンスストリーミングを実現するための機能があります。
fastapi.tiangolo.com

そのため、Lambda Web AdapterとFastAPIを組み合わせることによって、Pythonで実装をするLambdaでも、レスポンスストリーミングを実現できるようになります。

2. アプリ作成

この記事では、Lambda Web Adapterを使ってFastAPIをLambda上で動かし、Amazon Bedrockを使ったLLM(大規模言語モデル)とのチャットアプリをレスポンスストリーミングで作成します。
構成や実装はAWSの公式の例を参考に実施しました。 github.com

全体像はこちらです。StreamlitというPythonフレームワークで画面を作成し、LLMとのチャットをできるようにしています。

作成するアプリの全体像

2.1 実行環境

本アプリを動かすには、FastAPIを動かすためのコンテナをビルドするためのDocker環境とStreamlitを動かすためのPython環境が必要です。
Python環境は、Python 3.12.4、Streamlit 1.38.0を使用しました。

2.2 ディレクトリ構成

ディレクトリ構成は、AWS公式の例に載っているファイルに加えて、Streamlitのスクリプトのみを追加しています。
Dockerfile、requirements.txt、template.yamlは本アプリでも同じ内容でそのまま利用しました。

./
  |--app/
  |  |--Dockerfile
  |  |--main.py
  |  |--requirements.txt
  |
  |--streamlit_app.py
  |--template.yaml

2.3 FastAPIの実装

FastAPIの実装(main.py)では、APIのレスポンスをStreamingResponseにすることがポイントです。
また、StreamingResponseに渡すbedrock_chat_stream()の処理では、画面から受け取ったユーザーのメッセージに対して、Bedrockからの応答をreturnでなくyieldで都度、返しているところも重要です。
基本的な実装は公式の例に載っているので、比較的簡単に実装することができました。

import boto3
import json
import os
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional


# FastAPIのインスタンスを作成
app = FastAPI()


# チャットメッセージのモデル
class ChatMessage(BaseModel):
    message: Optional[str] = None


# APIエンドポイントを定義
@app.post("/api/chat")
def api_chat(chat_message: ChatMessage):
    if chat_message.message is None or chat_message.message.strip() == "":
        return {"error": "Message cannot be empty"}

    return StreamingResponse(bedrock_chat_stream(chat_message.message), media_type="text/event-stream")


# Bedrockクライアントを初期化
bedrock = boto3.client('bedrock-runtime')


async def bedrock_chat_stream(user_message: str):
    """ユーザーのメッセージを受け取り、LLMからの返答をストリーミング形式で返す"""
    instruction = f"ユーザーメッセージに対する会話を続けてください: {user_message}"

    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1024,
        "messages": [
            {
                "role": "user",
                "content": instruction,
            }
        ],
    })

    # Bedrockからレスポンスストリーミングを取得
    response = bedrock.invoke_model_with_response_stream(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',
        body=body
    )

    # ストリームが存在する場合、逐次データを処理
    stream = response.get('body')
    if stream:
        for event in stream:
            chunk = event.get('chunk')
            if chunk:
                message = json.loads(chunk.get("bytes").decode())
                if message['type'] == "content_block_delta":
                    yield message['delta']['text'] or ""
                elif message['type'] == "message_stop":
                    yield "\n"


if __name__ == "__main__":
    # Uvicornを使用してサーバーを起動
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "8080")))

2.4 Streamlitの実装

Streamlitの公式ページに載っているLLMとのチャット画面を参考に実装しました。 docs.streamlit.io

Streamlitでストリーミング出力させる場合はst.write_stream()を利用します。
本アプリでは、LLMからの応答をrequestライブラリを通して取得していて、st.write_stream()が対応していない型になっているため、対応させるためにラッパー関数(stream_wrapper())を追加しています。

import requests
import streamlit as st


# FastAPIのエンドポイントURL
API_URL = "<Lambda関数URL>"

# 画面にタイトルを追加
st.title("AI Chat with Bedrock")

# チャット履歴のsession_stateを初期化
if "messages" not in st.session_state:
    st.session_state.messages = []

# チャット履歴を表示
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])


def stream_wrapper(response):
    """レスポンスをStreamlitで互換性のある形式に変換する"""
    for chunk in response.iter_content(chunk_size=128):
        if chunk:
            yield chunk.decode('utf-8')


# チャット入力
if prompt := st.chat_input("What would you like to discuss?"):
    with st.chat_message("user"):
        st.markdown(prompt)
    # チャット履歴にユーザー入力を追加
    st.session_state.messages.append({"role": "user", "content": prompt})

    # APIにリクエストを送信し、レスポンスストリーミングを受け取る
    with st.chat_message("assistant"):
        response = requests.post(API_URL, json={"message": prompt}, stream=True)
        response_text = st.write_stream(stream_wrapper(response))

    # チャット履歴にAPIのレスポンスを追加
    st.session_state.messages.append({"role": "assistant", "content": response_text})

3. アプリを動かして見る

本アプリは以下の2ステップで動かせます。
1. チャットアプリをデプロイ
2. Streamlitを起動する

3.1 チャットアプリをデプロイ

AWS公式の例に載っている手順でFastAPIを動かすLambdaのビルドとデプロイを行います。
そして、デプロイ後にLambda関数URLが分かります。

sam build --use-container
sam deploy --guided

以下は利用したtemplate.yamlです。AWS公式の例に載っているtemplate.yamlと同じ内容です。

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  Streaming Chat with FastAPI on AWS Lambda

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 300

Resources:
  FastAPIFunction:
    Type: AWS::Serverless::Function
    Properties:
      PackageType: Image
      MemorySize: 512
      Environment:
        Variables:
          AWS_LWA_INVOKE_MODE: RESPONSE_STREAM
      FunctionUrlConfig:
        AuthType: NONE
        InvokeMode: RESPONSE_STREAM
      Policies:
      - Statement:
        - Sid: BedrockInvokePolicy
          Effect: Allow
          Action:
          - bedrock:InvokeModelWithResponseStream
          Resource: '*'
      Tracing: Active
    Metadata:
      Dockerfile: Dockerfile
      DockerContext: ./app
      DockerTag: v1

Outputs:
  FastAPIFunctionUrl:
    Description: "Function URL for FastAPI function"
    Value: !GetAtt FastAPIFunctionUrl.FunctionUrl
  FastAPIFunction:
    Description: "FastAPI Lambda Function ARN"
    Value: !GetAtt FastAPIFunction.Arn

3.2 Streamlitを起動する

上記のデプロイで得られたLambda関数URLをstreamlit_app.pyのAPI_URLに代入します。
その後、streamlit_app.pyがあるディレクトリで以下のコマンドを実行します。
streamlit run streamlit_app.py

3.3 チャットを試してみる

それでは実際にチャットを試してみます。
LLMからの応答が逐次表示されているのが分かります。
※なお、本アプリはレスポンスストリーミングを試すために作成したので、会話履歴はLLMに入力しておらず単発の会話のみ可能です。

レスポンスストリーミングのチャット画面

4. まとめ

今回はLambda Web Adapterを使ってLambda関数URLの出力をレスポンスストリーミングにする方法を試してみました。
レスポンスストリーミングがAWS Lambdaでも可能になるので、チャットアプリの実現等、Webアプリケーションの幅が広がりますね。
AWSの公式のサンプルやドキュメントが充実していて、比較的簡単に実装することができるので、ぜひ試してみてください。

Acroquest Technologyでは、キャリア採用を行っています。
  • Azure OpenAI/Amazon Bedrock等を使った生成AIソリューションの開発
  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • マイクロサービス、DevOps、最新のOSSクラウドサービスを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
  少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。 www.wantedly.com