AI Large Models in Practice: AI Agent Design Patterns - LLM Compiler


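By building a directed acyclic graph (DAG) that represents the dependencies between tasks, LLM Compiler can execute independent tasks in parallel and substantially reduce total execution time. This article explains how LLM Compiler works, how to implement it, and what advantages it offers in practice.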

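In the previous article, "AI Large Models in Practice: AI Agent Design Patterns - Plan & Execute", Feng Shu (風叔) walked through the Plan-and-Execute AI Agent design pattern, covering both its principles and concrete source code. The limitation of Plan-and-Execute, however, is that its tasks run strictly one after another, which can increase the total execution time.

An effective improvement is to organize the tasks as the nodes of a directed acyclic graph (DAG), so that tasks that do not depend on each other can run in parallel, greatly reducing total execution time.

That is the AI Agent design pattern Feng Shu introduces in this article: LLM Compiler.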

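01 The Concept of LLM Compiler

LLM Compiler is a project proposed by SqueezeAILab at UC Berkeley in December 2023. Building on the variable assignment introduced by ReWOO, it goes a step further and has the LLM generate a plan in the form of a directed acyclic graph (Directed Acyclic Graph, DAG, as shown in the figure below). The DAG makes the dependencies between task steps explicit, so independent tasks can run in parallel, much like "out-of-order execution" in a processor, which can greatly speed up how quickly an AI Agent completes a task.

For example, in the figure below the Agent is asked "How much would Microsoft's market cap need to increase to exceed Apple's?". The Planner searches for Microsoft's market cap and Apple's market cap in parallel, then combines the two results in a calculation.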
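
As a rough illustration of why the DAG helps, the following Python sketch encodes a plan for the Microsoft/Apple question as a dependency map, groups the tasks into "waves" whose members could run concurrently, and compares sequential time with critical-path time. The task layout, tool names and durations are made-up assumptions for illustration, not output of the real Planner.

```python
from typing import Dict, List, Set

# Hypothetical plan: each task lists the indices of the tasks it depends on.
PLAN: Dict[int, dict] = {
    1: {"tool": "search", "args": "Microsoft market cap", "deps": []},
    2: {"tool": "search", "args": "Apple market cap", "deps": []},
    3: {"tool": "math", "args": "$2 - $1", "deps": [1, 2]},
}

# Assumed latency of each task in seconds (illustrative only).
DURATION: Dict[int, float] = {1: 2.0, 2: 2.0, 3: 1.0}


def waves(plan: Dict[int, dict]) -> List[List[int]]:
    """Group tasks into levels; tasks in the same level never depend on each other."""
    done: Set[int] = set()
    remaining = dict(plan)
    levels: List[List[int]] = []
    while remaining:
        ready = sorted(i for i, t in remaining.items() if all(d in done for d in t["deps"]))
        if not ready:
            raise ValueError("cycle detected: the plan is not a DAG")
        levels.append(ready)
        done.update(ready)
        for i in ready:
            del remaining[i]
    return levels


def critical_path(plan: Dict[int, dict], duration: Dict[int, float]) -> float:
    """Finish time of the whole plan, assuming unlimited parallelism."""
    finish: Dict[int, float] = {}
    for level in waves(plan):
        for i in level:
            start = max((finish[d] for d in plan[i]["deps"]), default=0.0)
            finish[i] = start + duration[i]
    return max(finish.values())


print(waves(PLAN))                    # [[1, 2], [3]]
print(sum(DURATION.values()))         # 5.0 if run one after another
print(critical_path(PLAN, DURATION))  # 3.0 if tasks 1 and 2 overlap
```

The saving comes entirely from the first wave: the two searches have no edge between them, so the total time is set by the longest dependency chain rather than by the sum of all tasks.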

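1. The LLM Compiler design pattern consists of the following main components:

  • Planner: streams out a DAG of tasks, where each task contains a tool, its arguments, and a list of dependencies. The dependency list is the biggest difference from ReWOO's Planner.
  • Task Fetching Unit: schedules and executes the tasks, dispatching each task as soon as its dependencies are satisfied. Because many tools involve calls to a search engine or to another LLM, this extra parallelism can significantly improve speed.
  • Joiner: an LLM that, based on the full history (including the task execution results), decides whether to respond with the final answer or to pass the progress back to the Planner for another round.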

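2. The figure below shows how LLM Compiler works:

  • The Planner receives the user's input and streams out a DAG of tasks
  • The Task Fetching Unit reads tasks from the streamed DAG and executes them in parallel by invoking the tools
  • The Task Fetching Unit passes the state and results to the Joiner (or Replanner), which decides whether to return the result to the user or to add more tasks for the Task Fetching Unit to process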
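
The streaming hand-off between the Planner and the Task Fetching Unit can be sketched with a generator and a thread pool. In this minimal sketch, streaming_planner and search_tool are hypothetical stand-ins for the LLM planner and a real search API; each task is submitted the moment it is yielded, so the first search can already be running while the planner is still producing the second task.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Iterator, List

LOG: List[str] = []  # records the order of events for inspection


def streaming_planner() -> Iterator[dict]:
    """Hypothetical planner: yields tasks one at a time, like a streamed LLM response."""
    for idx, query in [(1, "Microsoft market cap"), (2, "Apple market cap")]:
        time.sleep(0.05)  # simulate the LLM still generating the next task
        LOG.append(f"planned {idx}")
        yield {"idx": idx, "query": query}


def search_tool(task: dict) -> str:
    """Hypothetical stand-in for a slow search call."""
    LOG.append(f"started {task['idx']}")
    time.sleep(0.2)
    return f"<result for {task['query']}>"


def fetch_and_execute(tasks: Iterator[dict]) -> Dict[int, str]:
    """Submit each task as soon as the planner yields it, then collect the results."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {task["idx"]: pool.submit(search_tool, task) for task in tasks}
        return {idx: fut.result() for idx, fut in futures.items()}


observations = fetch_and_execute(streaming_planner())
print(observations)
print(LOG)  # typically: planned 1, started 1, planned 2, started 2
```

In the real pattern the Joiner would then receive these observations; here the sketch stops at the Task Fetching Unit to keep the streaming behavior visible.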

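02 How LLM Compiler Is Implemented

Below, Feng Shu walks through the implementation of the LLM Compiler pattern using actual source code. Readers can follow Feng Shu and reply with the keyword 【LLMC源碼】 ("LLMC source code") to get the complete source code for the LLM Compiler design pattern.

Step 1: Build the Tools

First, we define the tools the Agent will use. In this example, we use two tools: a search engine and a calculator.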

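```python
import getpass
import os

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI
from math_tools import get_math_tool  # local helper module from the full source (not shown)


def _get_pass(var: str) -> None:
    # Prompt for the key only if it is not already set in the environment
    if var not in os.environ:
        os.environ[var] = getpass.getpass(f"{var}: ")


_get_pass("OPENAI_API_KEY")
_get_pass("TAVILY_API_KEY")

calculate = get_math_tool(ChatOpenAI(model="gpt-4-turbo-preview"))
search = TavilySearchResults(
    max_results=1,
    description='tavily_search_results_json(query="the search query") - a search engine.',
)

tools = [search, calculate]
```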
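
The code above needs API keys and the get_math_tool helper from the math_tools module, which the article does not show. For trying out the scheduling logic offline, a minimal sketch of hypothetical stand-in tools could look like this. Note that this calculator evaluates plain arithmetic with Python's ast module instead of asking an LLM, so it is a swapped-in technique, not a reimplementation of get_math_tool.

```python
import ast
import operator
from typing import Callable, Dict

# Operators the toy calculator accepts; anything else is rejected.
_BIN_OPS: Dict[type, Callable[[float, float], float]] = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
}
_UNARY_OPS: Dict[type, Callable[[float], float]] = {ast.USub: operator.neg, ast.UAdd: operator.pos}


def _eval(node: ast.AST) -> float:
    if isinstance(node, ast.Expression):
        return _eval(node.body)
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
        return _BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_eval(node.operand))
    # Names, calls, attributes, etc. never get evaluated
    raise ValueError(f"unsupported expression: {ast.dump(node)}")


def calculate(expression: str) -> float:
    """Hypothetical offline calculator tool: arithmetic only."""
    return _eval(ast.parse(expression, mode="eval"))


def search(query: str) -> str:
    """Hypothetical offline search tool that returns a placeholder string."""
    return f"[stub result for: {query}]"


TOOLS = {"search": search, "calculate": calculate}
print(TOOLS["calculate"]("2 * (3 + 4)"))  # 14
```

Walking the syntax tree instead of calling eval keeps the stand-in safe: an expression such as `__import__('os')` is rejected with a ValueError rather than executed.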

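Step 2: Build the Planner

The Planner takes the user's input and generates a DAG of tasks to execute.

The code below builds the Planner's prompt template and combines it with the LLM and an output parser, which turns the LLM's output into a list of tasks. In the Planner we also define the replanner prompt, which imposes three core constraints:

  • When starting the current plan, begin with a "Thought" that outlines the strategy for the next plan
  • The current plan must never repeat actions that were already executed in the previous plan
  • Task indices must continue from where the previous plan ended; task indices must not be repeated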
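
The index-continuation constraints are easier to see with concrete numbers. This sketch uses hypothetical message dicts instead of LangChain message classes; next_task_index applies the same rule as wrap_and_get_last_index in the Planner code (continue one past the index of the most recent tool result), and check_continuation is a hypothetical validator for the third constraint.

```python
from typing import Iterable, List


def next_task_index(history: List[dict]) -> int:
    """Continue one past the most recent tool result's index (0 if there is none)."""
    for message in reversed(history):
        if message["type"] == "tool_result":
            return message["idx"] + 1
    return 0


def check_continuation(previous: Iterable[int], current: Iterable[int]) -> None:
    """Hypothetical check: the new plan's indices must all come after the old ones."""
    prev, cur = list(previous), list(current)
    if prev and cur and min(cur) <= max(prev):
        raise ValueError(f"indices {cur} overlap the previous plan {prev}")


history = [
    {"type": "human", "content": "How much must Microsoft's market cap grow to pass Apple's?"},
    {"type": "tool_result", "idx": 1, "content": "<observation 1>"},
    {"type": "tool_result", "idx": 2, "content": "<observation 2>"},
    {"type": "system", "content": "Context from last attempt: <feedback>"},
]
start = next_task_index(history)
print(start)  # 3
print(f" - Begin counting at : {start}")  # suffix appended to the replan context
check_continuation(previous=[1, 2], current=[3, 4])  # passes silently
```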

```python
from typing import Sequence

from langchain import hub
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import FunctionMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableBranch
from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI

# Local module from the full source (not shown); parses the LLM output into tasks
from output_parser import LLMCompilerPlanParser


def create_planner(
    llm: BaseChatModel, tools: Sequence[BaseTool], base_prompt: ChatPromptTemplate
):
    # Number the tools from 1 (i + 1 offsets enumerate's 0-based index)
    tool_descriptions = "\n".join(
        f"{i + 1}. {tool.description}\n" for i, tool in enumerate(tools)
    )
    planner_prompt = base_prompt.partial(
        replan="",
        num_tools=len(tools) + 1,  # +1 because the join() tool is appended at the end
        tool_descriptions=tool_descriptions,
    )
    replanner_prompt = base_prompt.partial(
        replan=' - You are given "Previous Plan" which is the plan that the previous agent created along with the execution results '
        "(given as Observation) of each plan and a general thought (given as Thought) about the executed results. "
        'You MUST use these information to create the next plan under "Current Plan".\n'
        ' - When starting the Current Plan, you should start with "Thought" that outlines the strategy for the next plan.\n'
        " - In the Current Plan, you should NEVER repeat the actions that are already executed in the Previous Plan.\n"
        " - You must continue the task index from the end of the previous one. Do not repeat task indices.",
        num_tools=len(tools) + 1,
        tool_descriptions=tool_descriptions,
    )

    def should_replan(state: list):
        # Context from the Joiner is passed as a system message
        return isinstance(state[-1], SystemMessage)

    def wrap_messages(state: list):
        return {"messages": state}

    def wrap_and_get_last_index(state: list):
        # Find the index of the last executed task so numbering can continue
        next_task = 0
        for message in state[::-1]:
            if isinstance(message, FunctionMessage):
                next_task = message.additional_kwargs["idx"] + 1
                break
        state[-1].content = state[-1].content + f" - Begin counting at : {next_task}"
        return {"messages": state}

    return (
        RunnableBranch(
            (should_replan, wrap_and_get_last_index | replanner_prompt),
            wrap_messages | planner_prompt,
        )
        | llm
        | LLMCompilerPlanParser(tools=tools)
    )


# Base planner prompt; this Hub name is assumed, check it against the full source
prompt = hub.pull("wfh/llm-compiler")
llm = ChatOpenAI(model="gpt-4-turbo-preview")
planner = create_planner(llm, tools, prompt)
```
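
The article does not show LLMCompilerPlanParser. As a rough, hypothetical illustration of the kind of work such a parser has to do, the sketch below turns numbered plan lines of the form `1. tool(args)` into task dicts and infers each task's dependencies from `$N` or `${N}` references in its arguments. The line format and regular expressions are assumptions, not the parser's actual grammar.

```python
import re
from typing import List

# e.g. '3. math("$2 - $1")'  ->  idx=3, tool="math", args='"$2 - $1"'
LINE_RE = re.compile(r"^\s*(\d+)\.\s*(\w+)\((.*)\)\s*$")
# References to earlier results, written as $1 or ${1} (assumed syntax)
REF_RE = re.compile(r"\$\{?(\d+)\}?")


def parse_plan(text: str) -> List[dict]:
    tasks = []
    for line in text.splitlines():
        match = LINE_RE.match(line)
        if not match:
            continue  # skip "Thought:" lines and anything else that is not a task
        idx, tool, args = int(match.group(1)), match.group(2), match.group(3)
        deps = sorted({int(d) for d in REF_RE.findall(args)})
        tasks.append({"idx": idx, "tool": tool, "args": args, "dependencies": deps})
    return tasks


plan = """Thought: I need both market caps first.
1. search("Microsoft market cap")
2. search("Apple market cap")
3. math("$2 - $1")
4. join()"""
for task in parse_plan(plan):
    print(task)
```

Deriving dependencies from the argument references means the LLM never has to emit an explicit edge list: any task whose arguments mention `$1` automatically waits for task 1.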

Step 3: Build the Task Fetching Unit

This part schedules the tasks. It receives a stream of tasks, each of which carries (among other fields):

  • tool: the BaseTool to invoke
  • dependencies: the list of task indices (integers) that this task depends on

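The core idea is to start executing a tool as soon as its dependencies are satisfied, which can be done with multiple threads. The key function in the code below is schedule_tasks, which processes all the tasks as a directed acyclic graph: a task that still has unfinished dependencies is handed to schedule_pending_task, which waits until they complete; a task whose dependencies are all complete is executed immediately.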
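
The same idea can be sketched in plain Python, independent of LangChain. This version is a simplification under stated assumptions: each task's tool is an ordinary callable, and instead of polling with time.sleep as schedule_pending_task does, the loop uses concurrent.futures.wait with FIRST_COMPLETED to dispatch tasks the moment their dependencies finish. run_dag and stub_search are hypothetical names, and the stub tools return made-up numbers.

```python
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List


def run_dag(tasks: List[dict], max_workers: int = 4) -> Dict[int, Any]:
    """Run each task as soon as all of its dependencies have an observation.

    Each task is a dict with "idx", "fn" (a callable that receives a snapshot
    of the observations so far) and "dependencies" (a list of idx values).
    """
    observations: Dict[int, Any] = {}
    pending = {t["idx"]: t for t in tasks}
    running: Dict[Future, int] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending or running:
            # Dispatch every pending task whose dependencies are all satisfied.
            ready = [i for i, t in pending.items()
                     if all(d in observations for d in t["dependencies"])]
            for i in ready:
                task = pending.pop(i)
                running[pool.submit(task["fn"], dict(observations))] = i
            if not running:
                # Nothing can run and nothing is in flight: missing dependency or cycle.
                raise ValueError(f"unsatisfiable dependencies for tasks {sorted(pending)}")
            # Block until at least one in-flight task finishes.
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for fut in done:
                idx = running.pop(fut)
                try:
                    observations[idx] = fut.result()
                except Exception as exc:
                    # Like schedule_task, keep the error as the observation.
                    observations[idx] = f"ERROR: {exc!r}"
    return observations


def stub_search(value: int, delay: float = 0.2) -> Callable[[dict], int]:
    """Hypothetical slow tool that returns a made-up number."""
    def fn(_observations: dict) -> int:
        time.sleep(delay)
        return value
    return fn


tasks = [
    {"idx": 1, "fn": stub_search(10), "dependencies": []},
    {"idx": 2, "fn": stub_search(25), "dependencies": []},
    {"idx": 3, "fn": lambda obs: obs[2] - obs[1], "dependencies": [1, 2]},
]
start = time.perf_counter()
result = run_dag(tasks)
elapsed = time.perf_counter() - start
print(sorted(result.items()))  # [(1, 10), (2, 25), (3, 15)]
print(f"{elapsed:.2f}s")  # roughly 0.2s rather than 0.4s, since tasks 1 and 2 overlap
```

Compared with polling, this keeps a single scheduler loop and never blocks a worker thread on another task, at the cost of needing the whole task list up front rather than consuming a stream.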

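```python
import time
import traceback
from typing import Any, Dict

from langchain_core.runnables import chain as as_runnable

from output_parser import Task  # task type from the full source (not shown)

# _execute_task (not shown in the article) resolves the task's arguments
# against earlier observations and invokes its tool.


@as_runnable
def schedule_task(task_inputs, config):
    task: Task = task_inputs["task"]
    observations: Dict[int, Any] = task_inputs["observations"]
    try:
        observation = _execute_task(task, observations, config)
    except Exception:
        # Record the traceback as the observation so the Joiner can see what failed
        observation = traceback.format_exc()
    observations[task["idx"]] = observation


def schedule_pending_task(
    task: Task, observations: Dict[int, Any], retry_after: float = 0.2
):
    # Poll until every dependency has an observation, then run the task
    while True:
        deps = task["dependencies"]
        if deps and any(dep not in observations for dep in deps):
            # Dependencies not yet satisfied
            time.sleep(retry_after)
            continue
        schedule_task.invoke({"task": task, "observations": observations})
        break
```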

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Iterable, List

from langchain_core.messages import BaseMessage, FunctionMessage
from typing_extensions import TypedDict


class SchedulerInput(TypedDict):
    messages: List[BaseMessage]
    tasks: Iterable[Task]


# _get_observations (not shown in the article) collects {idx: result}
# from the FunctionMessages already in the history.


@as_runnable
def schedule_tasks(scheduler_input: SchedulerInput) -> List[FunctionMessage]:
    """Group the tasks into a DAG schedule."""
    tasks = scheduler_input["tasks"]
    args_for_tasks = {}
    messages = scheduler_input["messages"]
    # Results of tasks from earlier rounds, keyed by task index
    observations = _get_observations(messages)
    task_names = {}
    originals = set(observations)

    futures = []
    retry_after = 0.25  # Retry every quarter second
    with ThreadPoolExecutor() as executor:
        for task in tasks:
            deps = task["dependencies"]
            task_names[task["idx"]] = (
                task["tool"] if isinstance(task["tool"], str) else task["tool"].name
            )
            args_for_tasks[task["idx"]] = task["args"]
            if deps and any(dep not in observations for dep in deps):
                # Depends on unfinished tasks: wait for them on a worker thread
                futures.append(
                    executor.submit(
                        schedule_pending_task, task, observations, retry_after
                    )
                )
            else:
                # No deps or all deps satisfied: run it now on this thread
                schedule_task.invoke(dict(task=task, observations=observations))
                # Alternative: also run it on the pool
                # futures.append(executor.submit(schedule_task.invoke, dict(task=task, observations=observations)))

        # All tasks have been submitted or enqueued; wait for them to complete
        wait(futures)

    # Convert the new observations into tool messages to add to the state
    new_observations = {
        k: (task_names[k], args_for_tasks[k], observations[k])
        for k in sorted(observations.keys() - originals)
    }
    tool_messages = [
        FunctionMessage(
            name=name, content=str(obs), additional_kwargs={"idx": k, "args": task_args}
        )
        for k, (name, task_args, obs) in new_observations.items()
    ]
    return tool_messages
```

```python
import itertools


@as_runnable
def plan_and_schedule(state):
    messages = state["messages"]
    tasks = planner.stream(messages)
    # Begin executing the planner immediately by pulling the first task
    try:
        tasks = itertools.chain([next(tasks)], tasks)
    except StopIteration:
        # Handle the case where tasks is empty.
        tasks = iter([])
    scheduled_tasks = schedule_tasks.invoke(
        {
            "messages": messages,
            "tasks": tasks,
        }
    )
    # scheduled_tasks is already a list of FunctionMessages; don't wrap it again
    return {"messages": scheduled_tasks}
```
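
Two details of this step are worth isolating. _execute_task, which the article does not show, presumably has to replace references to earlier results in a task's arguments with the actual values before calling the tool; and `itertools.chain([next(tasks)], tasks)` pulls the first task immediately so the planner starts streaming before scheduling begins. A minimal sketch of both, where resolve_args and prime are hypothetical helper names and the `$N` reference syntax is an assumption:

```python
import itertools
import re
from typing import Any, Dict, Iterator

# Matches $1 or ${1}; the exact reference syntax is an assumption here.
REF_RE = re.compile(r"\$\{?(\d+)\}?")


def resolve_args(args: Any, observations: Dict[int, Any]) -> Any:
    """Hypothetical helper: substitute $N references with task N's observation."""
    if isinstance(args, str):
        return REF_RE.sub(lambda m: str(observations[int(m.group(1))]), args)
    if isinstance(args, list):
        return [resolve_args(a, observations) for a in args]
    if isinstance(args, dict):
        return {k: resolve_args(v, observations) for k, v in args.items()}
    return args


def prime(stream: Iterator[Any]) -> Iterator[Any]:
    """Pull the first item now (so the producer starts working), then put it back."""
    try:
        first = next(stream)
    except StopIteration:
        return iter([])
    return itertools.chain([first], stream)


print(resolve_args({"problem": "${2} - $1"}, {1: 10, 2: 25}))  # {'problem': '25 - 10'}

started = []


def planner_stream():
    started.append(True)  # runs only when the generator is first advanced
    yield "task 1"


stream = prime(planner_stream())
print(started)       # [True]: prime() already advanced the generator
print(list(stream))  # ['task 1']
```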

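Step 4: Build the Joiner

With the Planner and the Task Fetching Unit in place, the next step is to build the Joiner, which processes the tools' outputs and decides whether to produce a final answer or to start a new loop with a new plan.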

```python
from typing import Union

from langchain import hub
from langchain.chains.openai_functions import create_structured_output_runnable
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field


class FinalResponse(BaseModel):
    """The final response/answer."""

    response: str


class Replan(BaseModel):
    feedback: str = Field(
        description="Analysis of the previous attempts and recommendations on what needs to be fixed."
    )


class JoinOutputs(BaseModel):
    """Decide whether to replan or whether you can return the final response."""

    thought: str = Field(
        description="The chain of thought reasoning for the selected action"
    )
    action: Union[FinalResponse, Replan]


joiner_prompt = hub.pull("wfh/llm-compiler-joiner").partial(
    examples=""
)  # You can optionally add examples

llm = ChatOpenAI(model="gpt-4-turbo-preview")
# Parses the LLM's output into a JoinOutputs object
runnable = create_structured_output_runnable(JoinOutputs, llm, joiner_prompt)
```

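If the Agent needs to continue looping, we select the most recent messages from the graph state and format the Joiner's output the way the Planner expects.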

```python
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage


def _parse_joiner_output(decision: JoinOutputs) -> dict:
    response = [AIMessage(content=f"Thought: {decision.thought}")]
    if isinstance(decision.action, Replan):
        # A trailing SystemMessage tells the Planner to replan (see should_replan)
        return {
            "messages": response
            + [SystemMessage(content=f"Context from last attempt: {decision.action.feedback}")]
        }
    else:
        return {"messages": response + [AIMessage(content=decision.action.response)]}


def select_recent_messages(state) -> dict:
    # Keep only the messages from the latest HumanMessage onward
    messages = state["messages"]
    selected = []
    for msg in messages[::-1]:
        selected.append(msg)
        if isinstance(msg, HumanMessage):
            break
    return {"messages": selected[::-1]}


joiner = select_recent_messages | runnable | _parse_joiner_output

# example_question and tool_messages come from an earlier test run (not shown)
input_messages = [HumanMessage(content=example_question)] + tool_messages
joiner.invoke({"messages": input_messages})
```
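
The Joiner's control flow can be checked without calling an LLM. This sketch mirrors FinalResponse, Replan and JoinOutputs with plain dataclasses and uses dicts with a hypothetical "role" key in place of LangChain messages. It reproduces three rules from the code: keep messages from the latest human turn, emit a system message to request a replan, and (as should_continue does in the graph) stop when the last message is an AI answer.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class FinalResponse:
    response: str


@dataclass
class Replan:
    feedback: str


@dataclass
class JoinOutputs:
    thought: str
    action: Union[FinalResponse, Replan]


def select_recent(messages: List[dict]) -> List[dict]:
    """Keep everything from the most recent human message onward."""
    for i in range(len(messages) - 1, -1, -1):
        if messages[i]["role"] == "human":
            return messages[i:]
    return list(messages)


def parse_decision(decision: JoinOutputs) -> List[dict]:
    """Turn the Joiner's decision into the messages appended to the state."""
    out = [{"role": "ai", "content": f"Thought: {decision.thought}"}]
    if isinstance(decision.action, Replan):
        # A trailing system message is what switches the Planner to replanning.
        out.append({"role": "system",
                    "content": f"Context from last attempt: {decision.action.feedback}"})
    else:
        out.append({"role": "ai", "content": decision.action.response})
    return out


def route(messages: List[dict]) -> str:
    """An AI message at the end means the run is finished."""
    return "END" if messages[-1]["role"] == "ai" else "plan_and_schedule"


history = [
    {"role": "human", "content": "old question"},
    {"role": "ai", "content": "old answer"},
    {"role": "human", "content": "new question"},
    {"role": "tool", "content": "observation 1"},
]
print(len(select_recent(history)))  # 2
print(route(parse_decision(JoinOutputs("need more data", Replan("search again")))))  # plan_and_schedule
print(route(parse_decision(JoinOutputs("enough info", FinalResponse("<answer>")))))  # END
```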

Step 5: Build the Graph

Next, we build the graph, adding the Planner/Task Fetching Unit and Joiner nodes so that they run in a loop until a result is produced.

```python
from typing import Annotated

from langchain_core.messages import AIMessage
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict


class State(TypedDict):
    # add_messages appends new messages to the list instead of overwriting it
    messages: Annotated[list, add_messages]


graph_builder = StateGraph(State)

graph_builder.add_node("plan_and_schedule", plan_and_schedule)
graph_builder.add_node("join", joiner)
graph_builder.add_edge("plan_and_schedule", "join")


def should_continue(state):
    # An AIMessage at the end means the Joiner produced the final answer
    messages = state["messages"]
    if isinstance(messages[-1], AIMessage):
        return END
    return "plan_and_schedule"


# After "join", should_continue decides which node is called next
graph_builder.add_conditional_edges("join", should_continue)

graph_builder.add_edge(START, "plan_and_schedule")
chain = graph_builder.compile()
```
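
Stripped of LangGraph, the compiled graph amounts to a simple loop. The sketch below is a hypothetical plain-Python equivalent with stub nodes (run_agent, stub_plan and stub_join are illustrative names); the max_rounds cap is an addition for illustration, since LangGraph applies its own recursion limit to compiled graphs.

```python
from typing import Callable, List

Node = Callable[[List[dict]], List[dict]]


def run_agent(question: str, plan_and_schedule: Node, join: Node,
              max_rounds: int = 3) -> List[dict]:
    """Plain-Python equivalent of START -> plan_and_schedule -> join -> (loop | END)."""
    messages: List[dict] = [{"role": "human", "content": question}]
    for _ in range(max_rounds):
        messages += plan_and_schedule(messages)  # new tool observations
        messages += join(messages)               # thought plus answer or replan context
        if messages[-1]["role"] == "ai":         # should_continue would return END
            return messages
    raise RuntimeError(f"no final answer after {max_rounds} rounds")


# Hypothetical stub nodes: the first join asks for a replan, the second answers.
calls = {"plan": 0}


def stub_plan(messages: List[dict]) -> List[dict]:
    calls["plan"] += 1
    return [{"role": "tool", "content": f"observation from round {calls['plan']}"}]


def stub_join(messages: List[dict]) -> List[dict]:
    if calls["plan"] < 2:
        return [{"role": "system", "content": "Context from last attempt: need more data"}]
    return [{"role": "ai", "content": "final answer"}]


final = run_agent("example question", stub_plan, stub_join)
print(calls["plan"], final[-1]["content"])  # 2 final answer
```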

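Summary

Over these three articles, each building on the last, Feng Shu has introduced three AI Agent design patterns that focus on planning: ReWOO, Plan-and-Execute, and LLM Compiler. Starting from the ReAct pattern, adding planning yields ReWOO; adding replanning on top of that yields Plan-and-Execute; and adding a DAG with parallel execution yields LLM Compiler.

In the following articles, Feng Shu will turn to several AI Agent patterns that focus on reflection. The next article introduces Basic Reflection, the Agent's art of "sparring with itself" (左右互搏).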

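This article was written by 人人都是產品經理 author 【風叔】 (Feng Shu), WeChat official account 【風叔云】, and published on 人人都是產品經理 as original/authorized content. Reproduction without permission is prohibited.

Header image from Unsplash, under the CC0 license.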
