
AI Agents: Planning and Orchestration


Link: https://techcommunity.microsoft.com/t5/educator-developer-blog/ai-agents-planning-and-orchestration/ba-p/4222345
Verified Views: 965+
Technology Area: Planning, Coordination, Agent Orchestration
Publication Date: April 14, 2025


Article Overview

Complex Coordination

This article explores advanced planning and orchestration patterns for AI agents handling complex, multi-step tasks and coordinated workflows. As Part 7 of the AI Agents series, it demonstrates how to implement sophisticated planning capabilities that enable agents to tackle complex problems systematically.

The Planning Design Pattern

Planning is a critical capability that allows agents to break down complex problems into manageable steps, organize their approach, and execute solutions methodically. Effective planning enables agents to:

  1. Decompose Problems: Break complex tasks into simpler sub-tasks
  2. Sequence Operations: Determine the optimal order of steps
  3. Allocate Resources: Assign appropriate tools and capabilities to each step
  4. Handle Dependencies: Manage relationships between tasks
  5. Adapt Dynamically: Adjust plans as new information emerges
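
The five capabilities above all operate over some explicit representation of a plan. As a concrete sketch of what that representation can look like, here is a hypothetical `Subtask` schema (not from the article; the field names mirror the JSON structure used later in this section):

```python
from dataclasses import dataclass, field
from typing import List, Set

@dataclass
class Subtask:
    """One step of a decomposed plan (illustrative schema)."""
    name: str
    dependencies: List[str] = field(default_factory=list)
    tools: List[str] = field(default_factory=list)
    success_criteria: str = ""

    def is_ready(self, completed: Set[str]) -> bool:
        """A subtask may run once every one of its dependencies has completed."""
        return all(dep in completed for dep in self.dependencies)

plan = [
    Subtask("Extract Key Information", success_criteria="entities identified"),
    Subtask("Generate Analysis Report", dependencies=["Extract Key Information"]),
]
print(plan[1].is_ready({"Extract Key Information"}))  # True
```

Keeping dependencies as explicit names, rather than implicit ordering, is what lets the execution patterns below check readiness and detect deadlocks.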

Core Planning Approaches

Task Decomposition Pattern

The task decomposition pattern involves breaking down a complex problem into smaller, more manageable sub-tasks:

Python
import json
from typing import List

async def decompose_task(llm_client, task_description: str) -> List[dict]:
    """Break down a complex task into subtasks."""
    response = await llm_client.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": "You are a task planning assistant. Your job is to break down complex tasks into smaller, manageable subtasks."
            },
            {
                "role": "user",
                "content": f"Please break down the following task into subtasks: {task_description}\n\n"
                           f"For each subtask, provide: 1) a descriptive name, 2) dependencies (if any), "
                           f"3) required tools or resources, and 4) success criteria."
            }
        ],
        response_format={"type": "json_object"}
    )

    result = json.loads(response.choices[0].message.content)
    return result["subtasks"]

Example Output:

JSON
{
  "subtasks": [
    {
      "name": "Extract Key Information",
      "dependencies": [],
      "tools": ["document_parser", "entity_extraction"],
      "success_criteria": "All relevant entities (names, dates, amounts) are identified"
    },
    {
      "name": "Retrieve Background Information",
      "dependencies": ["Extract Key Information"],
      "tools": ["database_query", "web_search"],
      "success_criteria": "Contextual data about entities is collected"
    },
    {
      "name": "Generate Analysis Report",
      "dependencies": ["Extract Key Information", "Retrieve Background Information"],
      "tools": ["report_generator"],
      "success_criteria": "Comprehensive report with insights is created"
    }
  ]
}

Sequential Execution Pattern

The sequential execution pattern manages the ordered execution of subtasks, respecting dependencies:

Python
async def execute_plan(subtasks: List[dict], tools: ToolRegistry, llm_client):
    """Execute a plan of subtasks in the correct order."""
    # Track completion status
    completion_status = {task["name"]: False for task in subtasks}
    task_results = {}

    # Continue until all tasks are complete
    while not all(completion_status.values()):
        progress_made = False

        for task in subtasks:
            # Skip if already completed
            if completion_status[task["name"]]:
                continue

            # Check if dependencies are satisfied
            dependencies_met = all(
                completion_status.get(dep, False)
                for dep in task.get("dependencies", [])
            )

            if dependencies_met:
                print(f"Executing task: {task['name']}")

                # Gather inputs from dependencies
                inputs = {
                    dep: task_results[dep]
                    for dep in task.get("dependencies", [])
                }

                # Execute the task
                result = await execute_subtask(
                    task,
                    inputs,
                    tools,
                    llm_client
                )

                # Store the result and mark as completed
                task_results[task["name"]] = result
                completion_status[task["name"]] = True
                progress_made = True

        # If no task could start this iteration, the remaining dependencies can
        # never be satisfied (e.g. a missing or circular dependency)
        if not progress_made:
            raise Exception("Plan execution deadlocked - check dependencies")

    return task_results
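
This pattern (and the ones below) calls `execute_subtask` and accepts a `ToolRegistry`, neither of which the article defines. A minimal, LLM-free stand-in is enough to exercise the orchestration logic; here each registered tool is assumed to be a plain callable applied to the gathered dependency inputs (both names and semantics are hypothetical):

```python
import asyncio
from typing import Callable, Dict

class ToolRegistry:
    """Hypothetical registry mapping tool names to plain callables."""
    def __init__(self, tools: Dict[str, Callable]):
        self._tools = tools

    def get(self, name: str) -> Callable:
        return self._tools[name]

async def execute_subtask(task: dict, inputs: dict, tools: ToolRegistry, llm_client=None):
    """Apply each tool named by the task to the dependency inputs.

    A real implementation would also prompt the LLM with the task
    description; this stub only mirrors the call signature used above.
    """
    return {name: tools.get(name)(inputs) for name in task.get("tools", [])}

registry = ToolRegistry({"count_inputs": lambda inputs: len(inputs)})
task = {"name": "demo", "tools": ["count_inputs"]}
result = asyncio.run(execute_subtask(task, {"dep_a": 1, "dep_b": 2}, registry))
print(result)  # {'count_inputs': 2}
```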

Dynamic Replanning

The dynamic replanning pattern enables adaptation when conditions change or subtasks fail:

Python
async def execute_with_replanning(original_task: str, initial_plan: List[dict], 
                                 tools: ToolRegistry, llm_client):
    """Execute a plan with dynamic replanning when needed."""
    current_plan = initial_plan.copy()
    task_results = {}
    failed_tasks = []

    for task in current_plan:
        try:
            # Gather inputs from dependencies
            inputs = {
                dep: task_results[dep] 
                for dep in task.get("dependencies", []) 
                if dep in task_results
            }

            # Execute the task
            result = await execute_subtask(task, inputs, tools, llm_client)
            task_results[task["name"]] = result

        except Exception as e:
            # Task failed; record it for the replanning prompt
            failed_tasks.append({
                "task": task,
                "error": str(e),
                "completed_tasks": list(task_results.keys())
            })

            # Generate a revised plan and execute it recursively
            new_plan = await replan(
                original_task,
                current_plan,
                task_results,
                failed_tasks,
                llm_client
            )

            remaining_results = await execute_with_replanning(
                original_task,
                new_plan,
                tools,
                llm_client
            )

            # Combine results from completed tasks and the replanned execution
            return {**task_results, **remaining_results}

    return task_results

async def replan(original_task: str, current_plan: List[dict], 
                completed_results: dict, failures: List[dict], llm_client):
    """Generate a revised plan based on execution results and failures."""
    # Format the planning context
    context = {
        "original_task": original_task,
        "current_plan": current_plan,
        "completed_tasks": list(completed_results.keys()),
        "failures": failures
    }

    response = await llm_client.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": "You are a task planning assistant. Your job is to revise plans when subtasks fail."
            },
            {
                "role": "user",
                "content": f"The original task was: {original_task}\n\n"
                           f"Here is the current plan: {json.dumps(current_plan, indent=2)}\n\n"
                           f"These tasks have been completed: {json.dumps(list(completed_results.keys()), indent=2)}\n\n"
                           f"These tasks failed: {json.dumps(failures, indent=2)}\n\n"
                           f"Please provide a revised plan to complete the original task."
            }
        ],
        response_format={"type": "json_object"}
    )

    result = json.loads(response.choices[0].message.content)
    return result["revised_plan"]
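
`execute_plan` only discovers a bad dependency graph at runtime, via the deadlock exception. Since an LLM-revised plan is untrusted output, it can be validated before execution instead. A small illustrative check (not part of the article's code) that rejects plans with circular dependencies:

```python
def has_dependency_cycle(plan):
    """Detect circular dependencies via depth-first search with a visiting set."""
    deps = {task["name"]: task.get("dependencies", []) for task in plan}
    visiting, done = set(), set()

    def visit(name):
        if name in done:
            return False
        if name in visiting:  # back edge: we looped around to an ancestor
            return True
        visiting.add(name)
        cyclic = any(visit(dep) for dep in deps.get(name, []))
        visiting.discard(name)
        done.add(name)
        return cyclic

    return any(visit(name) for name in deps)

safe_plan = [{"name": "a"}, {"name": "b", "dependencies": ["a"]}]
bad_plan = [{"name": "a", "dependencies": ["b"]},
            {"name": "b", "dependencies": ["a"]}]
print(has_dependency_cycle(safe_plan), has_dependency_cycle(bad_plan))  # False True
```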

Advanced Orchestration Patterns

Planning Hierarchy Pattern

The planning hierarchy pattern establishes multiple levels of planning, from high-level goals to detailed actions:

Text Only
graph TD
    A[Strategic Goal] --> B1[Tactical Objective 1]
    A --> B2[Tactical Objective 2]
    B1 --> C1[Operational Task 1.1]
    B1 --> C2[Operational Task 1.2]
    B2 --> C3[Operational Task 2.1]
    C1 --> D1[Action 1.1.1]
    C1 --> D2[Action 1.1.2]

Implementation Approach:

Python
class HierarchicalPlanner:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def create_strategic_plan(self, goal: str) -> dict:
        """Create a high-level strategic plan."""
        response = await self.llm.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a strategic planning assistant."},
                {"role": "user", "content": f"Create a strategic plan for: {goal}"}
            ],
            response_format={"type": "json_object"}
        )
        return json.loads(response.choices[0].message.content)

    async def create_tactical_plan(self, strategic_objective: str) -> dict:
        """Create a tactical plan for a strategic objective."""
        # Implementation details
        pass

    async def create_operational_plan(self, tactical_objective: str) -> List[dict]:
        """Create detailed operational tasks for a tactical objective."""
        # Implementation details
        pass

    async def plan_and_execute(self, goal: str):
        """Create and execute a full hierarchical plan."""
        # 1. Create strategic plan
        strategic_plan = await self.create_strategic_plan(goal)

        # 2. Create tactical plans for each strategic objective
        tactical_plans = {}
        for objective in strategic_plan["objectives"]:
            tactical_plans[objective["name"]] = await self.create_tactical_plan(objective)

        # 3. Create operational plans for each tactical objective
        operational_plans = {}
        for obj_name, tactical_plan in tactical_plans.items():
            op_plans = {}
            for tactic in tactical_plan["tactics"]:
                op_plans[tactic["name"]] = await self.create_operational_plan(tactic)
            operational_plans[obj_name] = op_plans

        # 4. Execute the full hierarchical plan
        return await self.execute_hierarchical_plan(
            strategic_plan, 
            tactical_plans, 
            operational_plans
        )

Resource Allocation Pattern

The resource allocation pattern optimizes the use of limited resources across tasks:

Python
async def allocate_resources(tasks: List[dict], available_resources: dict, llm_client) -> dict:
    """Allocate limited resources optimally across tasks."""
    # Define constraints for resource allocation
    constraints = {
        "total_resources": available_resources,
        "task_requirements": {task["name"]: task.get("resources_needed", {}) for task in tasks},
        "priorities": {task["name"]: task.get("priority", 1) for task in tasks}
    }

    # Use the LLM to generate an allocation plan
    response = await llm_client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a resource allocation assistant."},
            {"role": "user", "content": f"Allocate these resources optimally: {json.dumps(constraints, indent=2)}"}
        ],
        response_format={"type": "json_object"}
    )

    allocation = json.loads(response.choices[0].message.content)
    return allocation["resource_allocation"]
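
Delegating allocation to the LLM is flexible but nondeterministic. When requirements are numeric, a plain greedy pass by priority is a cheap deterministic alternative; the sketch below is illustrative rather than from the article, reusing the same hypothetical `resources_needed` and `priority` task fields as the constraints above:

```python
from typing import List

def greedy_allocate(tasks: List[dict], available: dict) -> dict:
    """Grant higher-priority tasks their requested resources first;
    skip any task whose request no longer fits the remaining pool."""
    remaining = dict(available)
    allocation = {}
    for task in sorted(tasks, key=lambda t: -t.get("priority", 1)):
        need = task.get("resources_needed", {})
        if all(remaining.get(r, 0) >= amount for r, amount in need.items()):
            for r, amount in need.items():
                remaining[r] -= amount
            allocation[task["name"]] = need
    return allocation

tasks = [
    {"name": "train", "priority": 2, "resources_needed": {"gpu": 2}},
    {"name": "eval", "priority": 1, "resources_needed": {"gpu": 2}},
]
print(greedy_allocate(tasks, {"gpu": 3}))  # {'train': {'gpu': 2}}
```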

Parallel Execution Pattern

The parallel execution pattern executes independent tasks concurrently for improved efficiency:

Python
async def execute_parallel_tasks(tasks: List[dict], tools: ToolRegistry, llm_client):
    """Execute independent tasks in parallel."""
    # Map each task name to its dependency level (0 = no dependencies);
    # assumes tasks are listed with dependencies before dependents
    task_levels = {}
    dependency_levels = {}

    for task in tasks:
        deps = task.get("dependencies", [])
        level = 0 if not deps else max(task_levels.get(dep, 0) for dep in deps) + 1

        task_levels[task["name"]] = level
        dependency_levels.setdefault(level, []).append(task)

    # Execute tasks level by level
    results = {}
    for level in sorted(dependency_levels.keys()):
        level_tasks = dependency_levels[level]

        # Create coroutines for all tasks at this level
        coroutines = []
        for task in level_tasks:
            # Gather inputs from dependencies
            inputs = {
                dep: results[dep] 
                for dep in task.get("dependencies", [])
            }

            # Create coroutine for task execution
            coroutines.append(
                execute_subtask(task, inputs, tools, llm_client)
            )

        # Execute all tasks at this level concurrently
        level_results = await asyncio.gather(*coroutines)

        # Store results
        for task, result in zip(level_tasks, level_results):
            results[task["name"]] = result

    return results
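
One caveat: `asyncio.gather` raises as soon as any task in a level fails, discarding that level's other results. Passing `return_exceptions=True` lets the rest of the level finish, so failures can be routed into the replanning flow instead. A sketch of that variant (illustrative, not from the article):

```python
import asyncio

async def gather_with_failures(coroutines):
    """Run a level's tasks concurrently, separating results from exceptions
    instead of aborting on the first failure."""
    outcomes = await asyncio.gather(*coroutines, return_exceptions=True)
    successes = [o for o in outcomes if not isinstance(o, BaseException)]
    failures = [o for o in outcomes if isinstance(o, BaseException)]
    return successes, failures

async def ok():
    return "done"

async def broken():
    raise ValueError("tool unavailable")

successes, failures = asyncio.run(gather_with_failures([ok(), broken()]))
print(successes, len(failures))  # ['done'] 1
```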

Planning Evaluation and Optimization

Plan Quality Assessment

The plan quality assessment pattern evaluates plans before execution:

Python
async def evaluate_plan(plan: List[dict], llm_client) -> dict:
    """Evaluate a plan for quality and completeness."""
    evaluation_criteria = [
        "Completeness - Does the plan address all aspects of the task?",
        "Efficiency - Is the task breakdown optimal?",
        "Dependencies - Are dependencies correctly identified?",
        "Resource Usage - Is resource allocation appropriate?",
        "Risk Management - Are potential failures addressed?"
    ]

    response = await llm_client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a plan evaluation assistant."},
            {"role": "user", "content": f"Evaluate this plan:\n{json.dumps(plan, indent=2)}\n\n"
                                      f"Use these criteria:\n{', '.join(evaluation_criteria)}"}
        ],
        response_format={"type": "json_object"}
    )

    return json.loads(response.choices[0].message.content)

Performance Tracking

The performance tracking pattern monitors and analyzes plan execution:

Python
import time

class PlanExecutionTracker:
    def __init__(self):
        self.execution_log = []
        self.start_time = None
        self.end_time = None

    def start_execution(self):
        """Mark the start of plan execution."""
        self.start_time = time.time()

    def log_task_execution(self, task_name, start_time, end_time, status, result=None, error=None):
        """Log the execution of a task."""
        self.execution_log.append({
            "task": task_name,
            "start_time": start_time,
            "end_time": end_time,
            "duration": end_time - start_time,
            "status": status,
            "result_summary": summarize_result(result) if result else None,
            "error": str(error) if error else None
        })

    def end_execution(self):
        """Mark the end of plan execution."""
        self.end_time = time.time()

    def generate_execution_report(self):
        """Generate a report on the plan execution."""
        if not self.start_time or not self.end_time:
            return {"error": "Execution timing incomplete"}

        total_duration = self.end_time - self.start_time
        task_durations = {log["task"]: log["duration"] for log in self.execution_log}
        task_statuses = {log["task"]: log["status"] for log in self.execution_log}

        return {
            "total_duration": total_duration,
            "tasks_completed": sum(1 for s in task_statuses.values() if s == "completed"),
            "tasks_failed": sum(1 for s in task_statuses.values() if s == "failed"),
            "task_durations": task_durations,
            "bottlenecks": identify_bottlenecks(self.execution_log),
            "success_rate": sum(1 for s in task_statuses.values() if s == "completed") / len(task_statuses)
        }
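
`generate_execution_report` relies on `identify_bottlenecks`, and `log_task_execution` on `summarize_result`, neither of which the article shows. Plausible minimal versions matching the call sites above (both helpers are hypothetical):

```python
def identify_bottlenecks(execution_log, top_n=3):
    """Return the longest-running tasks, the likeliest bottlenecks."""
    ranked = sorted(execution_log, key=lambda entry: entry["duration"], reverse=True)
    return [{"task": e["task"], "duration": e["duration"]} for e in ranked[:top_n]]

def summarize_result(result, max_chars=200):
    """Truncate a task result to a compact string for logging."""
    text = str(result)
    return text if len(text) <= max_chars else text[:max_chars] + "..."

log = [
    {"task": "extract", "duration": 1.2},
    {"task": "report", "duration": 8.5},
    {"task": "search", "duration": 3.1},
]
print(identify_bottlenecks(log, top_n=2))
# [{'task': 'report', 'duration': 8.5}, {'task': 'search', 'duration': 3.1}]
```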

Real-World Implementation: Project Management Agent

This example demonstrates a project management agent that uses planning and orchestration for software development projects:

Python
class ProjectManagementAgent:
    def __init__(self, llm_client, tools_registry):
        self.llm = llm_client
        self.tools = tools_registry
        self.planner = HierarchicalPlanner(llm_client)
        self.execution_tracker = PlanExecutionTracker()

    async def manage_project(self, project_description: str, deadline: str, team: List[dict]):
        """Manage a complete software development project."""
        # 1. Create project plan
        project_plan = await self.planner.create_strategic_plan(
            f"Software project: {project_description} by {deadline}"
        )

        # 2. Evaluate plan quality
        plan_evaluation = await evaluate_plan(project_plan, self.llm)
        if plan_evaluation["overall_score"] < 0.7:
            project_plan = await self.improve_plan(project_plan, plan_evaluation)

        # 3. Allocate team resources
        team_allocation = await allocate_resources(
            project_plan["tasks"],
            {"team_members": team},
            self.llm
        )

        # 4. Execute the project plan with tracking
        self.execution_tracker.start_execution()

        try:
            # Execute phases in sequence
            for phase in project_plan["phases"]:
                # Execute tasks in parallel where possible
                phase_tasks = self.get_phase_tasks(project_plan, phase["name"])
                await self.execute_parallel_tasks(phase_tasks, team_allocation)

                # Checkpoint review after each phase
                await self.phase_review(phase, team)

        except Exception as e:
            # Replan if issues arise
            await self.handle_project_exception(e, project_plan)

        finally:
            self.execution_tracker.end_execution()

        # 5. Generate project summary and lessons learned
        return await self.generate_project_report()

Conclusion

Effective planning and orchestration are essential for creating truly capable AI agents. By implementing these patterns, developers can create agents that systematically approach complex problems, adapt to changing conditions, and deliver reliable results even in challenging scenarios.

The next article in this series will explore the Multi-Agent Design Pattern, showing how to create systems of specialized agents that collaborate to solve complex problems.

