classPlan(BaseModel): """Plan to follow in future"""
steps: List[str] = Field( description="different steps to follow, should be in sorted order" )
组装
1 2 3 4 5 6 7 8 9 10 11 12
planner_prompt = ChatPromptTemplate.from_messages( [ ( "system", """For the given objective, come up with a simple step by step plan. \ This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \ The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.""", ), ("placeholder", "{messages}"), ] ) planner = planner_prompt | llm.with_structured_output(Plan)
action: Union[Response, Plan] = Field( description="Action to perform. If you want to respond to user, use Response. " "If you need to further use tools to get the answer, use Plan." )
replanner_prompt = ChatPromptTemplate.from_template( """For the given objective, come up with a simple step by step plan. \ This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \ The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps. Your objective was this: {input} Your original plan was this: {plan} You have currently done the follow steps: {past_steps} Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan.""" )
# Add the plan node workflow.add_node("planner", plan_step)
# Add the execution step workflow.add_node("agent", execute_step)
# Add a replan node workflow.add_node("replan", replan_step)
workflow.set_entry_point("planner")
# From plan we go to agent workflow.add_edge("planner", "agent")
# From agent, we replan workflow.add_edge("agent", "replan")
workflow.add_conditional_edges( "replan", # Next, we pass in the function that will determine which node is called next. should_end, )
# Finally, we compile it! # This compiles it into a LangChain Runnable, # meaning you can use it as you would any other runnable app = workflow.compile()
最终的流程图
使用
1 2 3 4 5 6 7 8 9 10
asyncdefrun(): config = {"recursion_limit": 20} inputs = { "input": "苹果的股票在未来五年内还会继续上涨吗?"} asyncfor event in app.astream(inputs, config=config): for k, v in event.items(): if k != "__end__": print(v)