Source code for mesa_llm.tools.inbuilt_tools

import json
import logging
from typing import TYPE_CHECKING, Any

from mesa.discrete_space import (
    OrthogonalMooreGrid,
    OrthogonalVonNeumannGrid,
)
from mesa.space import (
    ContinuousSpace,
    MultiGrid,
    SingleGrid,
)

from mesa_llm.tools.tool_decorator import tool

if TYPE_CHECKING:
    from mesa_llm.llm_agent import LLMAgent

logger = logging.getLogger(__name__)

# Mapping directions to (dx, dy) for Cartesian-style spaces.
direction_map_xy = {
    "North": (0, 1),
    "South": (0, -1),
    "East": (1, 0),
    "West": (-1, 0),
    "NorthEast": (1, 1),
    "NorthWest": (-1, 1),
    "SouthEast": (1, -1),
    "SouthWest": (-1, -1),
}


# Mapping directions to (drow, dcol) for mesa.discrete_space orthogonal grids.
direction_map_row_col = {
    "North": (-1, 0),
    "South": (1, 0),
    "East": (0, 1),
    "West": (0, -1),
    "NorthEast": (-1, 1),
    "NorthWest": (-1, -1),
    "SouthEast": (1, 1),
    "SouthWest": (1, -1),
}


def _get_agent_position(agent: "LLMAgent") -> Any:
    """Return the agent position across Mesa space APIs."""
    cell = getattr(agent, "cell", None)
    if cell is not None and getattr(cell, "coordinate", None) is not None:
        return cell.coordinate

    pos = getattr(agent, "pos", None)
    if pos is not None:
        return pos

    position = getattr(agent, "position", None)
    if position is not None:
        return position

    raise ValueError(
        "Could not infer agent position from `cell`, `pos`, or `position`."
    )


[docs] @tool def move_one_step(agent: "LLMAgent", direction: str) -> str: """ Moves agents one step in specified cardinal/diagonal directions (North, South, East, West, NorthEast, NorthWest, SouthEast, SouthWest). Automatically handles different Mesa grid types including SingleGrid, MultiGrid, OrthogonalGrids, and ContinuousSpace. Args: direction: The direction to move in. Must be one of: 'North', 'South', 'East', 'West', 'NorthEast', 'NorthWest', 'SouthEast', or 'SouthWest'. agent: Provided automatically. Returns: A string confirming the result of the movement attempt. """ if direction not in direction_map_xy: raise ValueError( f"Invalid direction '{direction}'." f"Must be one of {list(direction_map_xy.keys())}" ) grid = getattr(agent.model, "grid", None) if isinstance(grid, OrthogonalMooreGrid | OrthogonalVonNeumannGrid): row, col = _get_agent_position(agent) drow, dcol = direction_map_row_col[direction] new_pos = (row + drow, col + dcol) if grid.torus: dimensions = grid.dimensions if len(dimensions) == len(new_pos): new_pos = tuple(coord % dim for coord, dim in zip(new_pos, dimensions)) elif new_pos not in grid._cells: return ( f"Agent {agent.unique_id} is at the boundary and cannot move " f"{direction}. Try a different direction." ) target_cell = grid._cells.get(new_pos) if target_cell is None: return ( f"Agent {agent.unique_id} is at the boundary and cannot move " f"{direction}. Try a different direction." ) if target_cell.is_full: return ( f"Agent {agent.unique_id} cannot move {direction} because " "the target cell is full." ) target_coordinates = tuple(target_cell.coordinate) return teleport_to_location(agent, target_coordinates) space = getattr(agent.model, "space", None) grid_or_space = None if isinstance(grid, SingleGrid | MultiGrid): grid_or_space = grid elif isinstance(space, ContinuousSpace): grid_or_space = space if grid_or_space is not None: dx, dy = direction_map_xy[direction] x, y = _get_agent_position(agent) new_pos = (x + dx, y + dy) if grid_or_space.torus: new_pos = grid_or_space.torus_adj(new_pos) elif grid_or_space.out_of_bounds(new_pos): return ( f"Agent {agent.unique_id} is at the boundary and cannot move " f"{direction}. Try a different direction." ) if isinstance(grid_or_space, SingleGrid) and not grid_or_space.is_cell_empty( new_pos ): return ( f"Agent {agent.unique_id} cannot move {direction} because " "the target cell is occupied." ) target_coordinates = tuple(new_pos) return teleport_to_location(agent, target_coordinates) raise ValueError( "Unsupported environment for move_one_step. Expected SingleGrid, " "MultiGrid, OrthogonalMooreGrid, OrthogonalVonNeumannGrid, or " "ContinuousSpace." )
[docs] @tool def teleport_to_location( agent: "LLMAgent", target_coordinates: list[int | float], ) -> str: """ Instantly moves agents to specific [x, y] coordinates within grid boundaries. Useful for rapid repositioning or spawning mechanics. Validates coordinates are within environment bounds. Args: target_coordinates: Exactly two numeric coordinates in the form [x, y] that fall inside the current environment bounds. Examples: [3, 7] or [3.5, 7.25] agent: Provided automatically Returns: a string confirming the agent's new position. """ target_coordinates = tuple(target_coordinates) if isinstance(agent.model.grid, SingleGrid | MultiGrid): agent.model.grid.move_agent(agent, target_coordinates) elif isinstance(agent.model.grid, OrthogonalMooreGrid | OrthogonalVonNeumannGrid): cell = agent.model.grid._cells[target_coordinates] agent.cell = cell elif isinstance(agent.model.space, ContinuousSpace): agent.model.space.move_agent(agent, target_coordinates) else: raise ValueError( "Unsupported environment for teleport_to_location. Expected " "SingleGrid, MultiGrid, OrthogonalMooreGrid, " "OrthogonalVonNeumannGrid, or ContinuousSpace." ) return f"agent {agent.unique_id} moved to {target_coordinates}."
[docs] @tool def speak_to( agent: "LLMAgent", listener_agents_unique_ids: list[int], message: str ) -> str: """ Enables agent-to-agent communication by sending messages to specified recipients. Messages are automatically added to recipients' memory systems for future reasoning context. Supports both single agent and multiple agent communication. Args: agent: The agent sending the message(conversation contents) (as a LLM, ignore this argument in function calling). listener_agents_unique_ids: The unique ids of the agents receiving the message message: The message to send """ if isinstance(listener_agents_unique_ids, str): try: listener_agents_unique_ids = json.loads(listener_agents_unique_ids) except (json.JSONDecodeError, ValueError): listener_agents_unique_ids = [ int(x.strip()) for x in listener_agents_unique_ids.strip("[]").split(",") if x.strip() ] listener_agents_unique_ids = [ int(uid) for uid in (listener_agents_unique_ids or []) ] listener_agents = [ listener_agent for listener_agent in agent.model.agents if listener_agent.unique_id in listener_agents_unique_ids and listener_agent.unique_id != agent.unique_id ] delivered_ids = [] skipped_ids = [] for recipient in listener_agents: if not hasattr(recipient, "memory"): skipped_ids.append(recipient.unique_id) logger.warning( "Agent %s has no memory attribute; skipping speak_to.", recipient.unique_id, ) continue delivered_ids.append(recipient.unique_id) recipient.memory.add_to_memory( type="message", content={ "message": message, "sender": agent.unique_id, }, ) status_parts = [] if delivered_ids: status_parts.append(f"sent message {message!r} to {delivered_ids}") if skipped_ids: status_parts.append( f"skipped {skipped_ids} because they have no `memory` attribute" ) if not status_parts: return f"Could not send message {message!r}: no matching recipients found." return "; ".join(status_parts)