LangGraph - Plan-and-Execute Agent

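LangGraph makes it easy to build agents, RAG pipelines, and similar applications that follow different design patterns.

This post uses Plan-and-Execute as an example to explore its features in more depth.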


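What is Plan-and-Execute

The core idea of a Plan-and-Execute agent is:

  • First, break the objective down into a multi-step plan
  • Work through the plan one item at a time
  • After completing a task, revisit the plan and revise it as needed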
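
The three bullets above can be sketched as a plain loop, with no LLM involved. This is only a minimal illustration under stated assumptions: `plan_and_execute`, `demo_plan`, `demo_run`, and `demo_replan` are hypothetical names standing in for the model calls, and are not part of LangGraph.

```python
from typing import Callable, List, Optional, Tuple

Step = str
History = List[Tuple[Step, str]]


def plan_and_execute(
    objective: str,
    make_plan: Callable[[str], List[Step]],
    run_step: Callable[[Step], str],
    replan: Callable[[str, List[Step], History], Optional[List[Step]]],
    max_rounds: int = 10,
) -> History:
    """Plan once, then repeat: execute the first remaining step, revisit the plan."""
    plan = make_plan(objective)
    history: History = []
    for _ in range(max_rounds):
        if not plan:
            break
        step = plan[0]  # always work on the first remaining item
        history.append((step, run_step(step)))
        revised = replan(objective, plan, history)
        # None means "no change": simply drop the step that was just completed
        plan = revised if revised is not None else plan[1:]
    return history


# Hypothetical stand-ins for the planner, executor, and replanner
def demo_plan(objective: str) -> List[Step]:
    return ["look up A", "look up B", "combine A and B"]


def demo_run(step: Step) -> str:
    return f"done: {step}"


def demo_replan(objective: str, plan: List[Step], history: History) -> Optional[List[Step]]:
    return None


result = plan_and_execute("demo", demo_plan, demo_run, demo_replan)
```

The `max_rounds` cap is an assumption added so that a replanner which keeps adding steps cannot loop forever.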

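Compared with a ReAct agent, the advantages are:

  • Explicit long-term planning (something even very strong LLMs can struggle with)
  • The execution step can use a smaller / weaker model, with a larger / better model needed only for the planning step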


Coding

Define the tools

I won't go over the environment configuration again. As before, Wikipedia serves as the tool.

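from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper

wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
tools = [wikipedia]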


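Define the execution agent

Create the execution agent that will carry out the tasks.

The same agent is used for every role here; different execution agents could be used for each role instead.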

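from langgraph.prebuilt import create_react_agent

# `Azure` is the author's own configuration helper and `prompt` must be defined beforehand; neither is shown in this post
llm = Azure.chat_model_4o
agent_executor = create_react_agent(llm, tools, messages_modifier=prompt)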

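Note that create_react_agent comes from langgraph.prebuilt, so this is a prebuilt agent.

As I understand it, this is one agent nested inside another: the prebuilt agent serves as the logic inside a single node. As the later steps show, that node is named agent.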


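Define the state

Start by defining the state this agent tracks (its execution trace):

  • First, track the current plan, represented as a list of strings
  • Next, track the steps executed so far, represented as a list of tuples (each tuple holds the step and then its result)
  • Finally, keep some state for the final response and the original input

import operator
from typing import Annotated, List, Tuple, TypedDict

class PlanExecute(TypedDict):
    input: str  # original user input
    plan: List[str]  # current plan
    past_steps: Annotated[List[Tuple], operator.add]  # completed steps; updates are appended
    response: str  # final response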
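
The `Annotated[..., operator.add]` annotation on `past_steps` is what makes each node's update get appended rather than overwrite the list. The sketch below simulates that merge rule in plain Python to show the effect. It is a simplified stand-in, not LangGraph's actual implementation, and `apply_update` is a hypothetical helper.

```python
import operator
from typing import Annotated, List, Tuple, TypedDict, get_type_hints


class PlanExecute(TypedDict):
    input: str
    plan: List[str]
    past_steps: Annotated[List[Tuple], operator.add]
    response: str


def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state (simplified simulation)."""
    hints = get_type_hints(PlanExecute, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated types carry their extra arguments in __metadata__
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in merged:
            reducer = metadata[0]  # here: operator.add, i.e. list concatenation
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value  # no reducer: the new value replaces the old one
    return merged


state = {"input": "q", "plan": ["a", "b"], "past_steps": []}
state = apply_update(state, {"past_steps": [("a", "result a")]})
state = apply_update(state, {"plan": ["b"], "past_steps": [("b", "result b")]})
```

After both updates, `past_steps` holds both tuples in order, while `plan`, which has no reducer, holds only the latest value.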


Planning step

Create a planning step that calls the LLM and gets back structured output.

The Plan structure

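from typing import List

from pydantic import BaseModel, Field  # older LangChain releases may need langchain_core.pydantic_v1 instead

class Plan(BaseModel):
    """Plan to follow in future"""

    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )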

Assembling the planner

from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.""",
        ),
        ("placeholder", "{messages}"),
    ]
)
# The LLM is constrained to return an instance of Plan
planner = planner_prompt | llm.with_structured_output(Plan)

Invoke it to check the result

print(planner.invoke(
    {
        "messages": [
            # The query asks: "Will Apple's stock keep rising over the next five years?"
            ("user", "苹果的股票在未来五年内还会继续上涨吗?")
        ]
    }
))

# Plan(steps=['收集苹果公司过去五年的财务数据和股价走势。', '分析苹果公司的财务健康状况,包括收入、利润、现金流等关键指标。', '研究苹果公司在未来五年的发展计划和战略,包括新产品发布、市场扩展等。', '评估全球经济环境和科技行业的整体趋势对苹果公司的影响。', '综合以上信息,预测苹果公司未来五年的股价走势。'])

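The output shows how the objective was broken into steps:

  1. Collect Apple's financial data and stock price history for the past five years
  2. Analyze Apple's financial health, including key indicators such as revenue, profit, and cash flow
  3. Research Apple's development plans and strategy for the next five years, including new product launches and market expansion
  4. Assess how the global economic environment and overall technology industry trends affect Apple
  5. Combine the information above to forecast Apple's stock price over the next five years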


Replanning

Next, create a step that revises the plan based on the results of the steps executed so far.

from typing import Union

from langchain_openai import ChatOpenAI

class Response(BaseModel):
    """Response to user."""

    response: str

class Act(BaseModel):
    """Action to perform."""

    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."
    )

replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the following steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)

# This uses ChatOpenAI directly; the `llm` defined earlier could be reused here instead
replanner = replanner_prompt | ChatOpenAI(
    model="gpt-4o", temperature=0
).with_structured_output(Act)
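
To make the `Union[Response, Plan]` choice concrete, here is a sketch of how a caller might branch on the replanner's decision. It is an illustration under stated assumptions: plain dataclasses stand in for the Pydantic models so that it runs without an LLM, and `to_state_update` and `next_node` are hypothetical helper names.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class Response:
    response: str


@dataclass
class Plan:
    steps: List[str]


@dataclass
class Act:
    action: Union[Response, Plan]


def to_state_update(output: Act) -> dict:
    """Translate the replanner's decision into a partial state update."""
    if isinstance(output.action, Response):
        return {"response": output.action.response}
    return {"plan": output.action.steps}


def next_node(state: dict) -> str:
    """Finish once a non-empty response exists, otherwise go back to the agent."""
    return "__end__" if state.get("response") else "agent"


keep_going = to_state_update(Act(action=Plan(steps=["step 2", "step 3"])))
finished = to_state_update(Act(action=Response(response="final answer")))
```

A `Plan` result replaces the remaining steps and keeps the loop going, while a `Response` result sets the final answer, which is the signal to stop.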


Create the graph

Finally, wire the nodes together and define the conditional edge.

from typing import Literal

async def execute_step(state: PlanExecute):
    plan = state["plan"]
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
    # The replanner returns only the remaining steps, so the next task is always plan[0]
    task = plan[0]
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing step {1}, {task}."""
    agent_response = await agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}
    )
    return {
        "past_steps": [(task, agent_response["messages"][-1].content)],
    }

async def plan_step(state: PlanExecute):
    plan = await planner.ainvoke({"messages": [("user", state["input"])]})
    return {"plan": plan.steps}

async def replan_step(state: PlanExecute):
    output = await replanner.ainvoke(state)
    if isinstance(output.action, Response):
        return {"response": output.action.response}
    else:
        return {"plan": output.action.steps}

def should_end(state: PlanExecute) -> Literal["agent", "__end__"]:
    if "response" in state and state["response"]:
        return "__end__"
    else:
        return "agent"

from langgraph.graph import StateGraph

workflow = StateGraph(PlanExecute)

# Add the plan node
workflow.add_node("planner", plan_step)

# Add the execution step
workflow.add_node("agent", execute_step)

# Add a replan node
workflow.add_node("replan", replan_step)

workflow.set_entry_point("planner")

# From plan we go to agent
workflow.add_edge("planner", "agent")

# From agent, we replan
workflow.add_edge("agent", "replan")

workflow.add_conditional_edges(
    "replan",
    # Next, we pass in the function that will determine which node is called next.
    should_end,
)

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()

The final graph runs from planner to agent to replan; replan then either loops back to agent or ends.


Usage

import asyncio

async def run():
    config = {"recursion_limit": 20}
    # The query asks: "Will Apple's stock keep rising over the next five years?"
    inputs = {"input": "苹果的股票在未来五年内还会继续上涨吗?"}
    async for event in app.astream(inputs, config=config):
        for k, v in event.items():
            if k != "__end__":
                print(v)

asyncio.run(run())


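Improvements

At this point a working Plan-and-Execute agent has been built.

The implementation has one known limitation: tasks still run one after another. Even when some tasks could run in parallel (they do not depend on each other's results and could start at the same time), the design executes them serially, so each one adds to the total execution time.

One option worth considering is to represent the tasks as a DAG (directed acyclic graph) rather than a plain list, so that the plan records which tasks must run in sequence and which can run in parallel.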
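
As a rough illustration of the DAG idea, the sketch below uses the standard library's `graphlib.TopologicalSorter` to run tasks in waves, where every task whose dependencies are complete runs concurrently. The task names and `run_task` are hypothetical stand-ins for agent calls; this is one possible approach under those assumptions, not something the implementation above already does.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical plan: each task maps to the set of tasks it depends on
dag: Dict[str, Set[str]] = {
    "financials": set(),
    "strategy": set(),
    "macro": set(),
    "forecast": {"financials", "strategy", "macro"},
}


async def run_task(name: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for an agent call
    return f"done: {name}"


async def run_dag(graph: Dict[str, Set[str]]) -> List[List[str]]:
    """Run tasks in waves; all tasks within a wave run concurrently."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are all done
        await asyncio.gather(*(run_task(name) for name in ready))
        sorter.done(*ready)  # unblock the tasks that depend on this wave
        waves.append(ready)
    return waves


waves = asyncio.run(run_dag(dag))
```

Here the three independent research tasks share the first wave and the forecast waits for all of them, so the total time is roughly two task durations instead of four. Wave-by-wave scheduling is the simplest variant; a finer-grained scheduler could start each task as soon as its own dependencies finish.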