Skip to content

AI Agents: Building Trustworthy Agents


Link: https://techcommunity.microsoft.com/t5/educator-developer-blog/ai-agents-building-trustworthy-agents/ba-p/4187653
Verified Views: 893+
Technology Area: Trust, Safety, Responsible AI
Publication Date: April 7, 2025


Article Overview

Trust & Safety

This article explores essential patterns and techniques for building trustworthy AI agents that operate safely and responsibly. As Part 6 of the AI Agents series, it demonstrates how to implement safety mechanisms, content filtering, ethical boundaries, and responsible operation capabilities.

Why Trust and Safety Matter

As AI agents become more capable and autonomous, ensuring they operate safely and responsibly becomes increasingly critical. Trustworthy agents must:

  1. Enforce safety boundaries: Prevent harmful, unethical, or illegal behaviors
  2. Protect user privacy: Handle sensitive information appropriately
  3. Operate transparently: Make their capabilities and limitations clear
  4. Maintain reliability: Function consistently and predictably
  5. Prevent misuse: Resist attempts to circumvent safety measures

Core Trust and Safety Patterns

Content Filtering Pattern

The content filtering pattern evaluates and filters potentially harmful content:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class ContentFilter:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def filter_content(self, content: str, context: dict = None) -> dict:
        """Filter potentially harmful content."""
        response = await self.llm.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system", 
                    "content": "You are a content filtering assistant. Your task is to analyze the given content for potentially harmful elements."
                },
                {
                    "role": "user",
                    "content": f"Please analyze this content and identify any harmful elements:\n\n{content}"
                }
            ],
            response_format={"type": "json_object"}
        )

        result = json.loads(response.choices[0].message.content)

        # Add filtering metadata
        result["filtered_at"] = datetime.now().isoformat()
        result["content_hash"] = hashlib.sha256(content.encode()).hexdigest()

        return result

    async def is_safe(self, content: str, context: dict = None) -> bool:
        """Quickly determine if content is safe."""
        filter_result = await self.filter_content(content, context)
        return not filter_result.get("has_harmful_content", False)

    async def get_filtered_version(self, content: str, context: dict = None) -> str:
        """Get a safe version of the content with harmful elements removed."""
        if await self.is_safe(content, context):
            return content

        response = await self.llm.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system", 
                    "content": "You are a content filtering assistant. Your task is to create a safe version of the given content."
                },
                {
                    "role": "user",
                    "content": f"Please create a safe version of this content, removing or replacing any harmful elements:\n\n{content}"
                }
            ]
        )

        return response.choices[0].message.content

Safety Boundaries Pattern

The safety boundaries pattern establishes clear limits on agent behavior:

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class SafetyBoundary:
    def __init__(self, boundaries: List[dict], llm_client):
        self.boundaries = boundaries
        self.llm = llm_client
        self.boundary_violations = []

    async def check_action_safety(self, action: dict) -> dict:
        """Check if an action violates safety boundaries."""
        violations = []

        for boundary in self.boundaries:
            if boundary["type"] == "content":
                # Check content-based boundaries
                is_violation = await self.check_content_boundary(
                    action.get("content", ""),
                    boundary
                )

                if is_violation:
                    violations.append({
                        "boundary": boundary["name"],
                        "description": boundary["description"],
                        "severity": boundary["severity"]
                    })

            elif boundary["type"] == "action":
                # Check action-based boundaries
                is_violation = await self.check_action_boundary(
                    action,
                    boundary
                )

                if is_violation:
                    violations.append({
                        "boundary": boundary["name"],
                        "description": boundary["description"],
                        "severity": boundary["severity"]
                    })

        result = {
            "is_safe": len(violations) == 0,
            "violations": violations,
            "action": action,
            "timestamp": datetime.now().isoformat()
        }

        # Log violations for auditing
        if violations:
            self.boundary_violations.append({
                "action": action,
                "violations": violations,
                "timestamp": datetime.now().isoformat()
            })

        return result

    async def check_content_boundary(self, content: str, boundary: dict) -> bool:
        """Check if content violates a specific boundary."""
        response = await self.llm.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system", 
                    "content": f"You are a safety boundary checker. Your task is to determine if content violates this boundary: {boundary['description']}"
                },
                {
                    "role": "user",
                    "content": f"Does this content violate the boundary? Respond with just 'true' or 'false'.\n\n{content}"
                }
            ]
        )

        decision = response.choices[0].message.content.strip().lower()
        return decision == "true"

    async def check_action_boundary(self, action: dict, boundary: dict) -> bool:
        """Check if an action violates a specific boundary."""
        # Format action details for evaluation
        action_details = json.dumps(action, indent=2)

        response = await self.llm.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system", 
                    "content": f"You are a safety boundary checker. Your task is to determine if an action violates this boundary: {boundary['description']}"
                },
                {
                    "role": "user",
                    "content": f"Does this action violate the boundary? Respond with just 'true' or 'false'.\n\nAction details: {action_details}"
                }
            ]
        )

        decision = response.choices[0].message.content.strip().lower()
        return decision == "true"

    def get_violation_log(self):
        """Get the log of boundary violations."""
        return self.boundary_violations

Input Validation Pattern

The input validation pattern ensures user inputs are safe and properly formatted:

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class InputValidator:
    def __init__(self, schema_registry, llm_client):
        self.schemas = schema_registry
        self.llm = llm_client

    async def validate_input(self, input_data: Any, schema_name: str) -> dict:
        """Validate input against a schema."""
        if schema_name not in self.schemas:
            raise ValueError(f"Schema '{schema_name}' not found in registry")

        schema = self.schemas[schema_name]

        # For simple types, use direct validation
        if schema["type"] in ["string", "number", "boolean"]:
            return self.validate_simple_type(input_data, schema)

        # For objects and arrays, use more complex validation
        elif schema["type"] == "object":
            return await self.validate_object(input_data, schema)

        elif schema["type"] == "array":
            return await self.validate_array(input_data, schema)

        else:
            raise ValueError(f"Unsupported schema type: {schema['type']}")

    def validate_simple_type(self, value, schema):
        """Validate a simple type (string, number, boolean)."""
        # Check type
        if schema["type"] == "string" and not isinstance(value, str):
            return {"valid": False, "errors": ["Value must be a string"]}

        elif schema["type"] == "number" and not isinstance(value, (int, float)):
            return {"valid": False, "errors": ["Value must be a number"]}

        elif schema["type"] == "boolean" and not isinstance(value, bool):
            return {"valid": False, "errors": ["Value must be a boolean"]}

        # Check string-specific constraints
        if schema["type"] == "string":
            # Check pattern if specified
            if "pattern" in schema and not re.match(schema["pattern"], value):
                return {"valid": False, "errors": [f"Value does not match pattern: {schema['pattern']}"]}

            # Check min/max length
            if "minLength" in schema and len(value) < schema["minLength"]:
                return {"valid": False, "errors": [f"Value length is less than minimum: {schema['minLength']}"]}

            if "maxLength" in schema and len(value) > schema["maxLength"]:
                return {"valid": False, "errors": [f"Value length exceeds maximum: {schema['maxLength']}"]}

        # Check number-specific constraints
        if schema["type"] == "number":
            if "minimum" in schema and value < schema["minimum"]:
                return {"valid": False, "errors": [f"Value is less than minimum: {schema['minimum']}"]}

            if "maximum" in schema and value > schema["maximum"]:
                return {"valid": False, "errors": [f"Value exceeds maximum: {schema['maximum']}"]}

        return {"valid": True}

    async def validate_object(self, obj, schema):
        """Validate an object against a schema."""
        if not isinstance(obj, dict):
            return {"valid": False, "errors": ["Value must be an object"]}

        errors = []

        # Check required properties
        for prop in schema.get("required", []):
            if prop not in obj:
                errors.append(f"Missing required property: {prop}")

        # Validate properties
        if "properties" in schema:
            for prop_name, prop_schema in schema["properties"].items():
                if prop_name in obj:
                    prop_result = await self.validate_input(obj[prop_name], prop_schema["type"])
                    if not prop_result["valid"]:
                        for error in prop_result["errors"]:
                            errors.append(f"Property '{prop_name}': {error}")

        # Check for safety concerns using LLM
        safety_check = await self.check_object_safety(obj, schema)
        if not safety_check["safe"]:
            for concern in safety_check["concerns"]:
                errors.append(f"Safety concern: {concern}")

        return {
            "valid": len(errors) == 0,
            "errors": errors
        }

    async def validate_array(self, arr, schema):
        """Validate an array against a schema."""
        if not isinstance(arr, list):
            return {"valid": False, "errors": ["Value must be an array"]}

        errors = []

        # Check items if specified
        if "items" in schema:
            for i, item in enumerate(arr):
                item_result = await self.validate_input(item, schema["items"]["type"])
                if not item_result["valid"]:
                    for error in item_result["errors"]:
                        errors.append(f"Item {i}: {error}")

        # Check array-specific constraints
        if "minItems" in schema and len(arr) < schema["minItems"]:
            errors.append(f"Array has fewer items than minimum: {schema['minItems']}")

        if "maxItems" in schema and len(arr) > schema["maxItems"]:
            errors.append(f"Array has more items than maximum: {schema['maxItems']}")

        if "uniqueItems" in schema and schema["uniqueItems"]:
            # Check if all items are unique
            if len(arr) != len(set(str(item) for item in arr)):
                errors.append("Array items must be unique")

        return {
            "valid": len(errors) == 0,
            "errors": errors
        }

    async def check_object_safety(self, obj, schema):
        """Check an object for safety concerns."""
        # Convert object to string for LLM analysis
        obj_str = json.dumps(obj, indent=2)

        response = await self.llm.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system", 
                    "content": "You are an input safety validator. Your task is to identify potential safety concerns in the given input data."
                },
                {
                    "role": "user",
                    "content": f"Analyze this data for safety concerns like injection attacks, malicious content, or privacy issues:\n\n{obj_str}"
                }
            ],
            response_format={"type": "json_object"}
        )

        result = json.loads(response.choices[0].message.content)
        return {
            "safe": not result.get("has_concerns", False),
            "concerns": result.get("concerns", [])
        }

Advanced Trust Mechanisms

Explainability Pattern

The explainability pattern enables agents to explain their reasoning and decisions:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class Explainable:
    def __init__(self, llm_client):
        self.llm = llm_client
        self.reasoning_log = []

    async def make_decision_with_explanation(self, context: dict, options: List[str]) -> dict:
        """Make a decision and provide an explanation."""
        # Format the decision context
        context_str = json.dumps(context, indent=2)
        options_str = "\n".join(f"- {option}" for option in options)

        response = await self.llm.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system", 
                    "content": "You are a decision-making assistant. Your task is to make a decision and explain your reasoning."
                },
                {
                    "role": "user",
                    "content": f"Given the following context, choose the best option and explain your reasoning.\n\n"
                               f"Context:\n{context_str}\n\n"
                               f"Options:\n{options_str}"
                }
            ],
            response_format={"type": "json_object"}
        )

        result = json.loads(response.choices[0].message.content)

        # Log the decision and reasoning
        log_entry = {
            "context": context,
            "options": options,
            "decision": result.get("decision"),
            "reasoning": result.get("reasoning"),
            "timestamp": datetime.now().isoformat()
        }
        self.reasoning_log.append(log_entry)

        return result

    async def explain_past_decision(self, decision_id: str) -> dict:
        """Explain a past decision based on the reasoning log."""
        # Find the decision in the log
        for entry in self.reasoning_log:
            if str(id(entry)) == decision_id:
                return {
                    "decision": entry.get("decision"),
                    "reasoning": entry.get("reasoning"),
                    "context": entry.get("context"),
                    "options": entry.get("options")
                }

        return {"error": "Decision not found"}

    async def generate_explanation_for_action(self, action: dict, user_friendly: bool = False) -> str:
        """Generate an explanation for an action."""
        # Format action details
        action_str = json.dumps(action, indent=2)

        style = "user_friendly" if user_friendly else "technical"

        response = await self.llm.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system", 
                    "content": f"You are an explainability assistant. Your task is to generate a {style} explanation for an action."
                },
                {
                    "role": "user",
                    "content": f"Explain why this action was taken:\n\n{action_str}"
                }
            ]
        )

        return response.choices[0].message.content

Audit Trail Pattern

The audit trail pattern maintains a complete record of agent actions:

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class AuditTrail:
    def __init__(self, storage_provider=None):
        self.audit_log = []
        self.storage = storage_provider

    def log_action(self, action: dict, actor: str, context: dict = None) -> str:
        """Log an action in the audit trail."""
        entry_id = str(uuid.uuid4())

        entry = {
            "id": entry_id,
            "timestamp": datetime.now().isoformat(),
            "action": action,
            "actor": actor,
            "context": context or {}
        }

        self.audit_log.append(entry)

        # If storage provider exists, persist the entry
        if self.storage:
            self.storage.save_audit_entry(entry)

        return entry_id

    def get_log_entry(self, entry_id: str) -> dict:
        """Retrieve a log entry by ID."""
        # First check in-memory log
        for entry in self.audit_log:
            if entry["id"] == entry_id:
                return entry

        # If not found and storage provider exists, check storage
        if self.storage:
            return self.storage.get_audit_entry(entry_id)

        return None

    def search_log(self, criteria: dict) -> List[dict]:
        """Search the audit log using criteria."""
        results = []

        # Define matching function
        def matches_criteria(entry, criteria):
            for key, value in criteria.items():
                # Handle nested keys using dot notation (e.g., "context.user_id")
                if "." in key:
                    parts = key.split(".")
                    entry_value = entry
                    for part in parts:
                        if part not in entry_value:
                            return False
                        entry_value = entry_value[part]

                    if entry_value != value:
                        return False
                elif key not in entry or entry[key] != value:
                    return False
            return True

        # Search in-memory log
        for entry in self.audit_log:
            if matches_criteria(entry, criteria):
                results.append(entry)

        # If storage provider exists and results are insufficient, search storage
        if self.storage and criteria.get("extended_search", False):
            storage_results = self.storage.search_audit_entries(criteria)

            # Merge results, avoiding duplicates
            existing_ids = {entry["id"] for entry in results}
            for entry in storage_results:
                if entry["id"] not in existing_ids:
                    results.append(entry)

        return results

    def generate_audit_report(self, start_time: str = None, end_time: str = None, 
                            actor: str = None, action_type: str = None) -> dict:
        """Generate an audit report for a specific time period and criteria."""
        # Build search criteria
        criteria = {}
        if actor:
            criteria["actor"] = actor
        if action_type:
            criteria["action.type"] = action_type

        # Filter by time range
        entries = self.search_log(criteria)

        if start_time:
            start_dt = datetime.fromisoformat(start_time)
            entries = [e for e in entries if datetime.fromisoformat(e["timestamp"]) >= start_dt]

        if end_time:
            end_dt = datetime.fromisoformat(end_time)
            entries = [e for e in entries if datetime.fromisoformat(e["timestamp"]) <= end_dt]

        # Generate summary statistics
        action_counts = {}
        actor_counts = {}

        for entry in entries:
            action_type = entry["action"].get("type", "unknown")
            actor = entry["actor"]

            action_counts[action_type] = action_counts.get(action_type, 0) + 1
            actor_counts[actor] = actor_counts.get(actor, 0) + 1

        return {
            "time_period": {
                "start": start_time,
                "end": end_time
            },
            "total_entries": len(entries),
            "action_summary": action_counts,
            "actor_summary": actor_counts,
            "entries": entries
        }

The consent management pattern tracks and respects user consent:

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class ConsentManager:
    def __init__(self, storage_provider=None):
        self.consent_records = {}
        self.storage = storage_provider

    async def record_consent(self, user_id: str, purpose: str, 
                           data_categories: List[str], expiration: str = None) -> str:
        """Record user consent for a specific purpose."""
        consent_id = str(uuid.uuid4())

        consent_record = {
            "id": consent_id,
            "user_id": user_id,
            "purpose": purpose,
            "data_categories": data_categories,
            "granted_at": datetime.now().isoformat(),
            "expiration": expiration,
            "status": "active"
        }

        # Store in memory
        if user_id not in self.consent_records:
            self.consent_records[user_id] = {}

        self.consent_records[user_id][consent_id] = consent_record

        # Persist if storage provider exists
        if self.storage:
            self.storage.save_consent_record(consent_record)

        return consent_id

    async def check_consent(self, user_id: str, purpose: str, data_category: str) -> bool:
        """Check if user has given consent for a specific purpose and data category."""
        # Get user's consent records
        user_records = self.consent_records.get(user_id, {})

        # Check in-memory records first
        for record_id, record in user_records.items():
            if (record["status"] == "active" and
                record["purpose"] == purpose and
                data_category in record["data_categories"]):

                # Check if consent has expired
                if record["expiration"]:
                    expiration_dt = datetime.fromisoformat(record["expiration"])
                    if expiration_dt < datetime.now():
                        continue

                return True

        # Check storage if available
        if self.storage:
            storage_records = self.storage.get_consent_records_for_user(user_id)

            for record in storage_records:
                if (record["status"] == "active" and
                    record["purpose"] == purpose and
                    data_category in record["data_categories"]):

                    # Check if consent has expired
                    if record["expiration"]:
                        expiration_dt = datetime.fromisoformat(record["expiration"])
                        if expiration_dt < datetime.now():
                            continue

                    # Cache the record
                    if user_id not in self.consent_records:
                        self.consent_records[user_id] = {}

                    self.consent_records[user_id][record["id"]] = record

                    return True

        return False

    async def revoke_consent(self, user_id: str, consent_id: str) -> bool:
        """Revoke a specific consent."""
        # Check in-memory records
        if (user_id in self.consent_records and
            consent_id in self.consent_records[user_id]):

            self.consent_records[user_id][consent_id]["status"] = "revoked"
            self.consent_records[user_id][consent_id]["revoked_at"] = datetime.now().isoformat()

            # Update persistent storage
            if self.storage:
                self.storage.update_consent_record(self.consent_records[user_id][consent_id])

            return True

        # Check storage if available
        if self.storage:
            record = self.storage.get_consent_record(consent_id)

            if record and record["user_id"] == user_id:
                record["status"] = "revoked"
                record["revoked_at"] = datetime.now().isoformat()

                # Update storage
                self.storage.update_consent_record(record)

                # Update cache
                if user_id not in self.consent_records:
                    self.consent_records[user_id] = {}

                self.consent_records[user_id][consent_id] = record

                return True

        return False

    async def get_consent_history(self, user_id: str) -> List[dict]:
        """Get the consent history for a user."""
        # Start with in-memory records
        if user_id in self.consent_records:
            records = list(self.consent_records[user_id].values())
        else:
            records = []

        # Add records from storage if available
        if self.storage:
            storage_records = self.storage.get_consent_records_for_user(user_id)

            # Merge, avoiding duplicates
            existing_ids = {record["id"] for record in records}
            for record in storage_records:
                if record["id"] not in existing_ids:
                    records.append(record)

        # Sort by granted_at timestamp
        return sorted(records, key=lambda r: r["granted_at"])

Real-World Implementation: Trusted Financial Advisor

This example demonstrates a financial advisor agent with comprehensive trust and safety mechanisms:

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class TrustedFinancialAdvisor:
    def __init__(self, llm_client):
        self.llm = llm_client

        # Initialize trust and safety components
        self.content_filter = ContentFilter(llm_client)
        self.safety_boundaries = SafetyBoundary(FINANCIAL_BOUNDARIES, llm_client)
        self.input_validator = InputValidator(FINANCIAL_SCHEMAS, llm_client)
        self.explainer = Explainable(llm_client)
        self.audit_trail = AuditTrail()
        self.consent_manager = ConsentManager()

    async def provide_advice(self, user_id: str, financial_situation: dict, query: str):
        """Provide financial advice with trust and safety measures."""
        try:
            # 1. Validate inputs
            situation_validation = await self.input_validator.validate_input(
                financial_situation, 
                "financial_situation"
            )

            if not situation_validation["valid"]:
                return {
                    "success": False,
                    "error": "Invalid financial information",
                    "details": situation_validation["errors"]
                }

            # 2. Check content safety
            query_safety = await self.content_filter.filter_content(query)
            if query_safety.get("has_harmful_content", False):
                return {
                    "success": False,
                    "error": "Potentially harmful query detected",
                    "details": query_safety.get("harmful_categories", [])
                }

            # 3. Check user consent
            has_consent = await self.consent_manager.check_consent(
                user_id, 
                "financial_advice", 
                "financial_data"
            )

            if not has_consent:
                return {
                    "success": False,
                    "error": "No consent for financial advice",
                    "details": "User must provide consent for financial data processing"
                }

            # 4. Generate financial advice
            advice_action = {
                "type": "generate_financial_advice",
                "user_id": user_id,
                "financial_situation": financial_situation,
                "query": query
            }

            # 5. Check if action violates safety boundaries
            safety_check = await self.safety_boundaries.check_action_safety(advice_action)
            if not safety_check["is_safe"]:
                return {
                    "success": False,
                    "error": "Action violates safety boundaries",
                    "details": safety_check["violations"]
                }

            # 6. Generate and log the advice
            advice = await self.generate_advice(financial_situation, query)

            # 7. Filter the generated advice for safety
            advice_safety = await self.content_filter.filter_content(advice)
            if advice_safety.get("has_harmful_content", False):
                filtered_advice = await self.content_filter.get_filtered_version(advice)
                was_filtered = True
            else:
                filtered_advice = advice
                was_filtered = False

            # 8. Generate explanation for the advice
            explanation = await self.explainer.generate_explanation_for_action(
                {
                    "advice": filtered_advice,
                    "financial_situation": financial_situation,
                    "query": query
                },
                user_friendly=True
            )

            # 9. Log the action in the audit trail
            audit_id = self.audit_trail.log_action(
                {
                    "type": "financial_advice_provided",
                    "user_id": user_id,
                    "query": query,
                    "was_filtered": was_filtered
                },
                actor="financial_advisor",
                context={
                    "consent_verified": True,
                    "validation_performed": True
                }
            )

            return {
                "success": True,
                "advice": filtered_advice,
                "explanation": explanation,
                "audit_id": audit_id,
                "was_filtered": was_filtered
            }

        except Exception as e:
            # Log the error
            error_id = self.audit_trail.log_action(
                {
                    "type": "error",
                    "user_id": user_id,
                    "error": str(e)
                },
                actor="financial_advisor",
                context={
                    "query": query
                }
            )

            return {
                "success": False,
                "error": "An error occurred while generating advice",
                "error_id": error_id
            }

    async def generate_advice(self, financial_situation: dict, query: str) -> str:
        """Generate financial advice based on the user's situation and query."""
        # Format the input for the LLM
        situation_str = json.dumps(financial_situation, indent=2)

        response = await self.llm.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system", 
                    "content": "You are a trusted financial advisor assistant. Provide helpful financial advice based on the user's situation."
                },
                {
                    "role": "user",
                    "content": f"Financial situation:\n{situation_str}\n\nUser query: {query}"
                }
            ]
        )

        return response.choices[0].message.content

Conclusion

Building trustworthy AI agents requires a comprehensive approach to safety, ethics, and responsible operation. By implementing the patterns described in this article, developers can create agents that not only deliver valuable functionality but also operate within appropriate boundaries, protect user privacy, and maintain transparency.

As AI agents continue to evolve and gain more autonomy, these trust and safety mechanisms will become increasingly critical components of any production-grade AI system.

The next article in this series will explore the Planning and Orchestration Pattern, showing how agents can systematically approach complex problems.


Series Navigation