Source code for mesa_llm.memory.episodic_memory

import json
from collections import deque
from typing import TYPE_CHECKING

from pydantic import BaseModel, Field

from mesa_llm.memory.memory import Memory, MemoryEntry, _format_message_entry

if TYPE_CHECKING:
    from mesa_llm.llm_agent import LLMAgent


[docs] class EventGrade(BaseModel): grade: int = Field( description="Integer score representing the importance of the event" )
[docs] def normalize_dict_values(scores: dict, min_target: float, max_target: float) -> dict: """ Normalize dictionary values to a target range with min-max scaling. This mirrors the min-max helper used in the Generative Agents reference retrieval implementation: https://github.com/joonspk-research/generative_agents/blob/main/reverie/backend_server/persona/cognitive_modules/retrieve.py """ if not scores: return {} vals = list(scores.values()) min_val = min(vals) max_val = max(vals) range_val = max_val - min_val if range_val == 0: midpoint = (max_target - min_target) / 2 + min_target for key in scores: scores[key] = midpoint else: for key, val in scores.items(): scores[key] = (val - min_val) * ( max_target - min_target ) / range_val + min_target return scores
[docs] class EpisodicMemory(Memory): """ Event-level memory with LLM-based importance scoring and recency-aware retrieval. Credit / references: - Paper: Generative Agents: Interactive Simulacra of Human Behavior https://arxiv.org/abs/2304.03442 - Reference retrieval code: https://github.com/joonspk-research/generative_agents/blob/main/reverie/backend_server/persona/cognitive_modules/retrieve.py This implementation is inspired by the paper's retrieval scoring design (component-wise min-max normalization, then weighted combination). It is not a strict copy of the original code: relevance scoring via embeddings is not implemented yet, and recency is computed from step age. """ def __init__( self, agent: "LLMAgent", llm_model: str | None = None, display: bool = True, max_capacity: int = 200, considered_entries: int = 30, recency_decay: float = 0.995, api_base: str | None = None, ): """ Initialize the EpisodicMemory. Args: agent : the agent that owns this memory llm_model : the model used to grade event importance display : whether to display memory entries in the console max_capacity : maximum number of finalized episodic entries to keep considered_entries : number of entries to consider during retrieval recency_decay : exponential decay factor for recency scoring api_base : the API base URL to use for the LLM provider """ if not llm_model: raise ValueError( "llm_model must be provided for the usage of episodic memory" ) super().__init__( agent, llm_model=llm_model, api_base=api_base, display=display, ) self.max_capacity = max_capacity self.memory_entries = deque(maxlen=self.max_capacity) self.considered_entries = considered_entries self.recency_decay = recency_decay self.system_prompt = """ You are an assistant that evaluates memory entries on a scale from 1 to 5, based on their importance to a specific problem or task. Your goal is to assign a score that reflects how much each entry contributes to understanding, solving, or advancing the task. Use the following grading scale: 5 - Critical: Introduces essential, novel information that significantly impacts problem-solving or decision-making. 4 - High: Provides important context or clarification that meaningfully improves understanding or direction. 3 - Moderate: Adds somewhat useful information that may assist but is not essential. 2 - Low: Offers minimal relevance or slight redundancy; impact is marginal. 1 - Irrelevant: Contains no useful or applicable information for the current problem. Only assess based on the entry's content and its value to the task at hand. Ignore style, grammar, or tone. """ def _extract_importance(self, entry) -> int: """ Safely extracts importance score regardless of data structure. Handles: - Nested: {"msg": {"importance": 5}} - Flat: {"importance": 5} """ if "importance" in entry.content: val = entry.content["importance"] return val if isinstance(val, (int, float)) else 1 for value in entry.content.values(): if isinstance(value, dict) and "importance" in value: val = value["importance"] return val if isinstance(val, (int, float)) else 1 return 1 def _build_grade_prompt(self, type: str, content: dict) -> str: """ This helper assembles a prompt that includes the event type, event content, and up to the five most recent memory entries for contextual grounding. It is shared by both synchronous and asynchronous grading methods to avoid duplicated prompt-construction logic. """ if len(self.memory_entries) > 0: entries = list(self.memory_entries)[-5:] previous_entries = "previous memory entries:\n\n".join( [str(entry) for entry in entries] ) else: previous_entries = "No previous memory entries" return f""" grade the importance of the following event on a scale from 1 to 5: {type}: {content} ------------------------------ {previous_entries} """
[docs] def grade_event_importance(self, type: str, content: dict) -> float: """ Grade this event based on the content respect to the previous memory entries """ prompt = self._build_grade_prompt(type, content) self.llm.system_prompt = self.system_prompt rsp = self.llm.generate( prompt=prompt, response_format=EventGrade, ) formatted_response = json.loads(rsp.choices[0].message.content) return formatted_response["grade"]
[docs] async def agrade_event_importance(self, type: str, content: dict) -> float: """ Asynchronous version of grade_event_importance """ prompt = self._build_grade_prompt(type, content) self.llm.system_prompt = self.system_prompt rsp = await self.llm.agenerate( prompt=prompt, response_format=EventGrade, ) formatted_response = json.loads(rsp.choices[0].message.content) return formatted_response["grade"]
[docs] def retrieve_top_k_entries(self, k: int) -> list[MemoryEntry]: """ Retrieve the top-k entries using normalized importance and recency. Notes: - Inspired by Generative Agents retrieval scoring: recency/importance/relevance are normalized separately and combined. - This implementation currently combines importance + recency only. Relevance (embedding cosine similarity with a focal query) is pending. """ if not self.memory_entries: return [] importance_dict = {} recency_dict = {} entries = list(self.memory_entries) current_step = self.agent.model.steps for i, entry in enumerate(entries): importance_dict[i] = self._extract_importance(entry) age = current_step - entry.step recency_dict[i] = self.recency_decay**age importance_scaled = normalize_dict_values(importance_dict, 0, 1) recency_scaled = normalize_dict_values(recency_dict, 0, 1) final_scores = [] for i in range(len(entries)): total_score = importance_scaled[i] + recency_scaled[i] final_scores.append((total_score, entries[i])) final_scores.sort(key=lambda x: x[0], reverse=True) return [entry for _, entry in final_scores[:k]]
def _finalize_entry(self, type: str, graded_content: dict): """Create and persist a finalized episodic entry.""" new_entry = MemoryEntry( agent=self.agent, content={type: graded_content}, step=self.agent.model.steps, ) self.memory_entries.append(new_entry)
[docs] def add_to_memory(self, type: str, content: dict): """ grading logic + adding to memory function call """ graded_content = { **content, "importance": self.grade_event_importance(type, content), } self._finalize_entry(type, graded_content)
[docs] async def aadd_to_memory(self, type: str, content: dict): """ Async version of add_to_memory + grading logic """ graded_content = { **content, "importance": await self.agrade_event_importance(type, content), } self._finalize_entry(type, graded_content)
[docs] def get_prompt_ready(self) -> str: return f"Top {self.considered_entries} memory entries:\n\n" + "\n".join( [ str(entry) for entry in self.retrieve_top_k_entries(self.considered_entries) ] )
[docs] def get_communication_history(self) -> str: """ Get the communication history """ lines = [] for entry in self.memory_entries: if "message" not in entry.content: continue msgs = entry.content["message"] if isinstance(msgs, list): for msg in msgs: lines.append(f"Step {entry.step}: {_format_message_entry(msg)}\n\n") else: lines.append(f"Step {entry.step}: {_format_message_entry(msgs)}\n\n") return "\n".join(lines)
[docs] async def aprocess_step(self, pre_step: bool = False): """ Asynchronous version of process_step. EpisodicMemory persists entries at add-time and does not use two-phase pre/post-step buffering. """ return
[docs] def process_step(self, pre_step: bool = False): """ Process step hook (no-op for episodic memory). EpisodicMemory persists entries at add-time and does not use two-phase pre/post-step buffering. """ return