Skip to content

Redis State Store

Shared state store for multi-agent coordination and caching, backed by Redis with an automatic in-memory fallback.

Setup

pip install redis

# Start Redis (Docker)
docker run -d --name redis -p 6379:6379 redis:7-alpine

# Or set connection URL
export REDIS_URL=redis://localhost:6379/0

No Redis? No problem — the store automatically uses a thread-safe in-memory dict with the same API.

Why a State Store?

In a multi-agent setup, agents need to share information:

Problem Without StateStore With StateStore
Agent A changes steam flow → Agent B needs to know Agents can't communicate Agent A writes, Agent B reads from shared store
MCP tool returns energy prices → all 4 agents need it Each agent calls MCP separately (4× latency) One call → cache → all agents read from cache
Count constraint violations across episode Each agent tracks locally Atomic increment() on shared counter
Track global reactor state Agents only see their local view publish_state() makes full state available

Usage

from methanol_apc_env.integrations import StateStore

# Auto-detects Redis from REDIS_URL env var
store = StateStore()
print(f"Backend: {store.backend}")  # "redis" or "memory"

Basic Operations

# Store any JSON-serializable value
store.set("plant_mode", "optimization")
store.set("global_temperature", 252.3)
store.set("pricing", {"gas": 3.42, "electricity": 0.11})

# Retrieve
temp = store.get("global_temperature")           # 252.3
pricing = store.get("pricing")                    # {"gas": 3.42, ...}
missing = store.get("nonexistent", default=0.0)   # 0.0

# Check existence
store.exists("plant_mode")   # True

# Delete
store.delete("plant_mode")

TTL (Time-to-Live) Cache

# Cache energy pricing for 5 minutes (300 seconds)
store.set("energy_pricing", {"gas": 3.42, "elec": 0.11}, ttl_seconds=300)

# After 5 minutes, key auto-expires (Redis only; memory store has no TTL)

Batch Operations

# Write multiple keys atomically (single Redis pipeline)
store.set_many({
    "reactor_temp": 252.3,
    "reactor_pressure": 81.2,
    "catalyst_health": 0.93,
})

# Read multiple keys in one call
data = store.get_many(["reactor_temp", "reactor_pressure", "catalyst_health"])
# {"reactor_temp": 252.3, "reactor_pressure": 81.2, "catalyst_health": 0.93}

Multi-Agent Reactor State Sharing

# After each env.step(), publish the full state
store.publish_state(reactor_state)

# Any agent reads the latest state
state = store.get_reactor_state()
# {"temperature": 252.3, "pressure": 81.2, "feed_rate_h2": 5.0, ...}

Energy Pricing Cache

# Cache MCP tool result (avoids redundant API calls)
store.cache_energy_pricing(gas_price=3.42, elec_price=0.11)

# All agents read from cache (5-minute TTL)
pricing = store.get_energy_pricing()
# {"gas_price": 3.42, "electricity_price": 0.11}

Atomic Counters

# Count constraint violations across episode
store.increment("constraint_violations")      # → 1
store.increment("constraint_violations")      # → 2
store.increment("constraint_violations", 3)   # → 5

# Reset at episode start
store.clear_all()

Multi-Agent Example

from methanol_apc_env.integrations import StateStore
from methanol_apc_env.agents import ReformerAgent, SynthesisAgent, SupervisoryAgent

store = StateStore()
env = MethanolAPCEnvironment()
obs = env.reset(task_name="optimization")

for step in range(100):
    # Publish state so all agents see it
    store.publish_state(env._sim_state)

    # Cache energy pricing (one MCP call, not 4)
    if step % 10 == 0:
        store.cache_energy_pricing(gas_price=3.42, elec_price=0.11)

    # Each agent reads shared state
    pricing = store.get_energy_pricing()
    global_state = store.get_reactor_state()

    # Agents make decisions with full context
    r = ReformerAgent().rule_based_action(obs)
    s = SynthesisAgent().rule_based_action(obs)

    # Track violations
    if obs.temperature > 280:
        store.increment("constraint_violations")

    action = SupervisoryAgent.merge_actions(r, s, p)
    obs = env.step(action)

violations = store.get("constraint_violations")
print(f"Total violations: {violations}")

Fallback Behavior

Feature Redis Mode Memory Mode
set / get Redis server dict with threading.Lock
TTL expiry Automatic Not supported (keys persist)
Batch operations Redis pipeline (atomic) Sequential dict updates
increment INCRBY (atomic) Lock-protected +=
Persistence Survives process restart Lost on restart
Multi-process Shared across processes Per-process only