在生成式 AI 应用中,性能瓶颈往往是由于 LLM 执行的长延迟造成的,因此解决这个问题至关重要。在本文中,我将与大家分享我们如何利用 Haystack 和 Hamilton 使 AI 管道完全异步化,并能支持 1500 多名并发用户!
Wren AI 是一个 Text-to-SQL 解决方案,帮助数据团队通过提出业务问题而无需编写 SQL 来更快地获得结果和洞察。它是开源的!Wren AI LLM 服务负责与 LLM 相关的任务,例如将自然语言问题转换为 SQL 查询并提供分步的 SQL 解释。
注:上述任务是 Wren AI LLM 服务目前的主要任务,我们还有一些其他潜在的新功能正在排队或进行中,例如 Text-to-chart、推荐后续问题、用户入驻后推荐新问题、反馈循环、澄清、llm 系统评估框架等。欢迎在我们的Discord 服务器上与我们讨论并分享您的想法!
Wren AI LLM 服务的第一版本没有使用任何大型语言模型(LLM)框架。相反,它依赖于
因此,第一个版本是用于 Text-to-SQL 任务的朴素 RAG。
挑战
在第一个版本完成后,我们意识到如果想支持更复杂的 AI 管道,会面临一些挑战
经过一周的调研,我们决定选择 Haystack 作为构建生成式 AI 应用的基础设施。我们在其 2.0 测试版期间开始使用它。我们选择 Haystack 的原因有以下几点
因此,我们可以轻松开发更复杂的 AI 管道,并立即集成其他 LLM 和 Document Store。
挑战
您可能认为 Haystack 是构建生成式 AI 应用的完美选择;不幸的是,世界上没有完美的软件。对于我们的用例来说,有一个挑战:我们需要考虑应用在每秒用户请求数(即吞吐量)和每个用户请求所花费的时间(即延迟)方面的性能。
由于 Haystack 没有内置的异步支持,管道计算默认是同步的(生成式 AI 应用的瓶颈通常源于 LLM 执行的长延迟)。因此,我们需要使我们的 AI 管道异步化来解决这个问题。(这个问题是一个常见的话题,Haystack 团队曾分享过一个幻灯片,介绍了几个解决此问题的提案)
我们认为候选解决方案应至少具备以下标准
我们找到一个名为Hamilton的库。Hamilton 是一个轻量级的 Python 库,可以帮助我们将管道计算逻辑与计算运行时环境分离。
Hamilton 是一个用于数据转换有向无环图 (DAG) 的轻量级 Python 库。您的 DAG 是可移植的;它可以在任何 Python 运行的环境中运行,无论是脚本、笔记本、Airflow 管道还是 FastAPI 服务器等。您的 DAG 富有表现力;Hamilton 拥有丰富的功能来定义和修改 DAG 的执行(例如,数据验证、实验跟踪、远程执行)。
要创建 DAG,请编写常规 Python 函数,这些函数使用其参数指定它们的依赖关系。如下所示,这会产生易于阅读的代码,并且始终可以进行可视化。Hamilton 加载该定义并自动为您构建 DAG!
通过下面提供的代码片段,您可以看到单独使用 Haystack 与将 Haystack 与 Hamilton 集成的区别
使用 Hamilton 重写 AI 管道之前的代码片段
Generation
类的 __init__
方法中,使用管道内置的 add_component
和 connect
方法来构建管道。run
方法来调用管道执行。class Generation(BasicPipeline):
def __init__(
self,
llm_provider: LLMProvider,
):
self._pipeline = Pipeline()
self._pipeline.add_component(
"text_to_sql_prompt_builder",
PromptBuilder(template=text_to_sql_user_prompt_template),
)
self._pipeline.add_component(
"text_to_sql_generator",
llm_provider.get_generator(system_prompt=text_to_sql_system_prompt),
)
self._pipeline.add_component("post_processor", init_generation_post_processor())
self._pipeline.connect(
"text_to_sql_prompt_builder.prompt", "text_to_sql_generator.prompt"
)
self._pipeline.connect(
"text_to_sql_generator.replies", "post_processor.replies"
)
super().__init__(self._pipeline)
@timer
def run(
self,
query: str,
contexts: List[Document],
exclude: List[Dict],
include_outputs_from: List[str] | None = None,
):
logger.info("Ask Generation pipeline is running...")
return self._pipeline.run(
{
"text_to_sql_prompt_builder": {
"query": query,
"documents": contexts,
"alert": TEXT_TO_SQL_RULES,
"exclude": exclude,
}
},
include_outputs_from=(
set(include_outputs_from) if include_outputs_from else None
),
)
使用 Hamilton 重写 AI 管道之后的代码片段
Generation
类的 __init__
方法中定义 Haystack 组件;但是,我们通过使用函数描述组件之间的连接来声明组件应如何组合在一起。run
方法来调用管道执行,但在底层,它使用的是 Hamilton 的 AsyncDriver
。## Start of Pipeline
@timer
def prompt(
query: str,
documents: List[Document],
exclude: List[Dict],
alert: str,
prompt_builder: PromptBuilder,
) -> dict:
logger.debug(f"query: {query}")
logger.debug(f"documents: {documents}")
logger.debug(f"exclude: {exclude}")
return prompt_builder.run(
query=query, documents=documents, exclude=exclude, alert=alert
)
@async_timer
async def generate(prompt: dict, generator: Any) -> dict:
logger.debug(f"prompt: {prompt}")
return await generator.run(prompt=prompt.get("prompt"))
@async_timer
async def post_process(generate: dict, post_processor: GenerationPostProcessor) -> dict:
logger.debug(f"generate: {generate}")
return await post_processor.run(generate.get("replies"))
## End of Pipeline
class Generation(BasicPipeline):
def __init__(
self,
llm_provider: LLMProvider,
engine: Engine,
):
self.generator = llm_provider.get_generator(
system_prompt=text_to_sql_system_prompt
)
self.prompt_builder = PromptBuilder(template=text_to_sql_user_prompt_template)
self.post_processor = GenerationPostProcessor(engine=engine)
super().__init__(
AsyncDriver({}, sys.modules[__name__], result_builder=base.DictResult())
)
@async_timer
async def run(
self,
query: str,
contexts: List[Document],
exclude: List[Dict],
):
logger.info("Ask Generation pipeline is running...")
return await self._pipe.execute(
["post_process"],
inputs={
"query": query,
"documents": contexts,
"exclude": exclude,
"alert": TEXT_TO_SQL_RULES,
"generator": self.generator,
"prompt_builder": self.prompt_builder,
"post_processor": self.post_processor,
},
)
现在,我将向大家展示一系列性能测试结果,以证明我们的架构重写是成功的,以及为什么我们说我们的 LLM 服务可以处理 1500 多名并发用户?(Wren AI LLM 服务是一个使用一个 uvicorn worker 运行的 FastAPI 服务器)
首先,性能提升非常显著
使用 Hamilton 重写 AI 管道之前的性能统计数据
使用 Hamilton 重写 AI 管道之后的性能统计数据
此外,为了确保我们的 AI 管道确实能够进行异步计算,我构建了一个模仿 AI 管道行为的“模拟 API”,以便与真实的 AI 管道 API 进行比较。
您可以在此处轻松查看相似的性能统计数据
完成的请求数
平均每秒请求数(即吞吐量)
平均响应时间(即延迟)
模拟 API 版本
真实 AI 管道 API 版本
您可以看到它们的性能统计数据相似,因此我们相信实现第三版本架构是成功的!
最后,我将与大家分享我们如何推断我们的 LLM 服务可以处理 1500 多名并发用户。既然我已经向大家展示了模拟 API 和真实 API 的性能统计数据相似,因此我假设如果我们能够证明模拟 API 版本可以处理 1500 多名并发用户,那么我就可以推断真实 API 版本也能达到同样的结果。(我在此假设 LLM 可以处理 1500 多名并发用户)
您可以看到性能统计数据非常健康!
注:所有测试均在我使用 Apple M2 Pro、16GB 内存的 2023 款 14 英寸 MacBook Pro 上完成
希望本文能帮助您了解在构建生成式 AI 应用时可能遇到的一些问题。我也希望本文中提到的工具能帮助您更轻松地解决问题!这些都是我们可以利用和学习的优秀开源软件工具!
如果您对此话题有任何想法想讨论,请随时在这篇文章中留言。谢谢!
感谢Howard Chi、William Chang审阅本文并提供反馈!
👉 GitHub:https://github.com/Canner/WrenAI
👉 X:https://twitter.com/getwrenai
立即用 AI 为您的数据赋能?!