AI Agent for Utilities: Automate Grid Management, Water Treatment & Customer Operations

March 28, 2026 · 14 min read · Utilities AI Agents

Power outages cost the US economy an estimated $150 billion per year. Water utilities lose 20–30% of treated water to leaks before it reaches customers. A large utility's call center handles 2–5 million calls per year, about 40% of them simple billing inquiries. AI agents that predict equipment failure, optimize water treatment chemistry, detect leaks acoustically, and handle customer requests autonomously are reshaping the $2.1 trillion global utilities sector.

This guide covers building autonomous agents for electric, gas, and water utilities. It includes production-ready Python code, SCADA/OT integration patterns, and hard ROI numbers from real deployments.

1. Smart Grid Optimization Agent

Modern grids must balance variable renewable generation (solar, wind), distributed energy resources (rooftop solar, batteries, EVs), and demand patterns that shift hourly. The agent processes real-time telemetry from thousands of sensors to optimize voltage regulation, reactive power, and load balancing across the distribution network.

import numpy as np
from datetime import datetime, timedelta

class SmartGridAgent:
    """Optimizes electric grid operations in real time."""

    VOLTAGE_LIMITS = {
        "distribution_120v": {"min": 114, "max": 126},   # ANSI C84.1 Range A
        "distribution_240v": {"min": 228, "max": 252},
        "primary_4kv": {"min": 3800, "max": 4200},
        "primary_13kv": {"min": 12350, "max": 13650},
    }

    def __init__(self, scada_feed, weather_api, der_registry, market_api):
        self.scada = scada_feed
        self.weather = weather_api
        self.der = der_registry      # Distributed Energy Resources
        self.market = market_api

    def optimize_voltage(self, feeder_id):
        """Volt-VAR optimization for a distribution feeder."""
        measurements = self.scada.get_feeder_measurements(feeder_id)
        capacitor_banks = self.scada.get_capacitors(feeder_id)
        voltage_regulators = self.scada.get_regulators(feeder_id)

        violations = []
        actions = []

        for node in measurements:
            voltage_class = node["voltage_class"]
            limits = self.VOLTAGE_LIMITS.get(voltage_class, {})
            v = node["voltage"]

            if v < limits.get("min", 0):
                violations.append({
                    "node": node["id"],
                    "type": "undervoltage",
                    "value": v,
                    "limit": limits["min"],
                    "deviation_pct": round((limits["min"] - v) / limits["min"] * 100, 2),
                })
            elif v > limits.get("max", 999999):
                violations.append({
                    "node": node["id"],
                    "type": "overvoltage",
                    "value": v,
                    "limit": limits["max"],
                    "deviation_pct": round((v - limits["max"]) / limits["max"] * 100, 2),
                })

        # Determine corrections
        if violations:
            low_v = [v for v in violations if v["type"] == "undervoltage"]
            high_v = [v for v in violations if v["type"] == "overvoltage"]

            if low_v:
                # Switch on capacitor banks to boost voltage
                for cap in capacitor_banks:
                    if cap["status"] == "open":
                        actions.append({
                            "device": cap["id"],
                            "action": "close",
                            "type": "capacitor_bank",
                            "expected_voltage_boost_pct": 1.5,
                        })
                        break

                # Raise voltage regulator taps
                for reg in voltage_regulators:
                    if reg["tap_position"] < reg["max_tap"]:
                        actions.append({
                            "device": reg["id"],
                            "action": "raise_tap",
                            "type": "voltage_regulator",
                            "current_tap": reg["tap_position"],
                            "new_tap": reg["tap_position"] + 1,
                        })

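            if high_v:
                # Opposite: open a closed capacitor bank, then lower regulator taps
                for cap in capacitor_banks:
                    if cap["status"] == "closed":
                        actions.append({
                            "device": cap["id"],
                            "action": "open",
                            "type": "capacitor_bank",
                        })
                        break

                # Lower voltage regulator taps (assumes a symmetric tap range
                # when the regulator record carries no "min_tap" field)
                for reg in voltage_regulators:
                    if reg["tap_position"] > reg.get("min_tap", -reg["max_tap"]):
                        actions.append({
                            "device": reg["id"],
                            "action": "lower_tap",
                            "type": "voltage_regulator",
                            "current_tap": reg["tap_position"],
                            "new_tap": reg["tap_position"] - 1,
                        })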

        return {
            "feeder_id": feeder_id,
            "timestamp": datetime.utcnow().isoformat(),
            "violations": violations,
            "actions": actions,
            "total_nodes": len(measurements),
            # Guard against a feeder that returned no measurements
            "compliant_pct": round(
                (len(measurements) - len(violations)) / len(measurements) * 100, 1
            ) if measurements else None,
        }

    def forecast_renewable_output(self, feeder_id, hours_ahead=24):
        """Predict solar/wind generation on a feeder."""
        der_assets = self.der.get_assets(feeder_id)
        solar = [d for d in der_assets if d["type"] == "solar"]
        wind = [d for d in der_assets if d["type"] == "wind"]

        forecast = []
        weather = self.weather.get_hourly_forecast(feeder_id, hours_ahead)

        for hour_offset in range(hours_ahead):
            wx = weather[hour_offset]
            solar_output = sum(
                s["capacity_kw"] * self._solar_capacity_factor(
                    wx["cloud_cover_pct"], wx["solar_elevation_deg"]
                ) for s in solar
            )
            wind_output = sum(
                w["capacity_kw"] * self._wind_capacity_factor(
                    wx["wind_speed_ms"], w["hub_height_m"]
                ) for w in wind
            )

            forecast.append({
                "hour_offset": hour_offset,
                "solar_kw": round(solar_output, 1),
                "wind_kw": round(wind_output, 1),
                "total_der_kw": round(solar_output + wind_output, 1),
                "cloud_cover": wx["cloud_cover_pct"],
                "wind_speed": wx["wind_speed_ms"],
            })

        return forecast

    def _solar_capacity_factor(self, cloud_cover_pct, elevation_deg):
        if elevation_deg <= 0:
            return 0
        clear_sky = min(1.0, elevation_deg / 60)  # Simplified
        cloud_factor = 1 - (cloud_cover_pct / 100 * 0.75)
        return clear_sky * cloud_factor

    def _wind_capacity_factor(self, wind_speed_ms, hub_height):
        # Simplified power curve (cut-in 3, rated 12, cut-out 25 m/s)
        if wind_speed_ms < 3 or wind_speed_ms > 25:
            return 0
        if wind_speed_ms >= 12:
            return 1.0
        return ((wind_speed_ms - 3) / 9) ** 3
Production tip: Volt-VAR optimization (VVO) alone saves utilities 2–4% on distribution losses. At scale, that's $20–50M per year for a large utility. The key is sub-second SCADA telemetry and accurate feeder models.
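The limits in VOLTAGE_LIMITS all work out to a nominal voltage plus or minus 5%, and the 120 V row matches the ANSI C84.1 Range A band cited in the code comment. The sketch below shows that arithmetic. The nominal values (120 V, 240 V, 4,000 V, 13,000 V) are inferred from the table rather than stated in the article, and the helper names are hypothetical.

```python
def band_limits(nominal_v, tolerance_pct=5):
    """Return (min, max) as the nominal voltage +/- tolerance_pct percent."""
    low = nominal_v * (100 - tolerance_pct) / 100
    high = nominal_v * (100 + tolerance_pct) / 100
    return low, high


def classify_voltage(measured_v, nominal_v):
    """Label a measurement the same way optimize_voltage() labels violations."""
    low, high = band_limits(nominal_v)
    if measured_v < low:
        return "undervoltage"
    if measured_v > high:
        return "overvoltage"
    return "ok"


# Nominal values are assumptions inferred from the limits table.
NOMINALS = {
    "distribution_120v": 120,
    "distribution_240v": 240,
    "primary_4kv": 4000,
    "primary_13kv": 13000,
}

for name, nominal in NOMINALS.items():
    print(name, band_limits(nominal))

print(classify_voltage(113.2, 120))
```

Deriving the band from the nominal voltage keeps the table consistent if a new voltage class is added, though a real deployment should take its limits from the utility's own planning criteria.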

2. Outage Prediction & Response Agent

Power outages cost the US economy $150B per year. Most are caused by vegetation contact, equipment failure, and weather. The agent combines weather forecasts, vegetation growth models, equipment age data, and historical outage patterns to predict and prevent outages before they happen.

class OutagePredictionAgent:
    """Predicts and manages power outages."""

    def __init__(self, oms_client, weather_api, asset_db, vegetation_model):
        self.oms = oms_client          # Outage Management System
        self.weather = weather_api
        self.assets = asset_db
        self.veg = vegetation_model

    def predict_outage_risk(self, region, hours_ahead=48):
        """Predict outage probability by circuit."""
        circuits = self.assets.get_circuits(region)
        weather = self.weather.get_severe_forecast(region, hours_ahead)
        risk_scores = []

        for circuit in circuits:
            # Weather risk
            weather_risk = 0
            if weather.get("wind_gust_mph", 0) > 40:
                weather_risk += 0.3
            if weather.get("ice_accumulation_in", 0) > 0.25:
                weather_risk += 0.5  # Ice storms are devastating
            if weather.get("lightning_probability", 0) > 0.6:
                weather_risk += 0.15

            # Equipment age risk
            avg_pole_age = circuit.get("avg_pole_age_years", 30)
            equipment_risk = min(0.3, avg_pole_age / 100)

            # Vegetation risk
            veg_score = self.veg.get_encroachment_score(circuit["id"])
            veg_risk = veg_score * 0.25

            # Historical pattern
            historical = self.oms.get_outage_frequency(
                circuit["id"], years=3
            )
            history_risk = min(0.2, historical / 10)

            composite = min(1.0, weather_risk + equipment_risk + veg_risk + history_risk)
            risk_scores.append({
                "circuit_id": circuit["id"],
                "circuit_name": circuit["name"],
                "customers_served": circuit["customer_count"],
                "risk_score": round(composite, 3),
                "risk_level": (
                    "critical" if composite > 0.7
                    else "high" if composite > 0.4
                    else "medium" if composite > 0.2
                    else "low"
                ),
                "primary_driver": max(
                    [("weather", weather_risk), ("equipment", equipment_risk),
                     ("vegetation", veg_risk), ("history", history_risk)],
                    key=lambda x: x[1]
                )[0],
                "components": {
                    "weather": round(weather_risk, 3),
                    "equipment": round(equipment_risk, 3),
                    "vegetation": round(veg_risk, 3),
                    "history": round(history_risk, 3),
                },
            })

        return sorted(risk_scores, key=lambda r: -r["risk_score"])

    def optimize_crew_dispatch(self, active_outages, available_crews):
        """Assign restoration crews to outages by priority."""
        # Priority: weighted sum of customers affected, critical facilities,
        # and a bonus for outages that can be restored quickly
        scored_outages = []
        for outage in active_outages:
            critical_count = outage.get("critical_facilities", 0)  # hospitals, etc
            customer_impact = outage["customers_affected"]
            est_restore_hrs = outage["estimated_restore_hours"]

            priority = (
                customer_impact * 1.0 +
                critical_count * 500 +
                (1 / max(est_restore_hrs, 0.5)) * 100  # Faster fixes first
            )
            scored_outages.append({**outage, "priority_score": priority})

        scored_outages.sort(key=lambda o: -o["priority_score"])

        assignments = []
        assigned_crews = set()
        for outage in scored_outages:
            best_crew = None
            best_travel = float("inf")

            for crew in available_crews:
                if crew["id"] in assigned_crews:
                    continue
                if crew["skill_level"] < outage.get("min_skill_required", 1):
                    continue
                travel = self._estimate_travel_time(
                    crew["location"], outage["location"]
                )
                if travel < best_travel:
                    best_travel = travel
                    best_crew = crew

            if best_crew:
                assignments.append({
                    "outage_id": outage["id"],
                    "crew_id": best_crew["id"],
                    "travel_time_min": round(best_travel),
                    "customers_affected": outage["customers_affected"],
                    "priority_score": round(outage["priority_score"]),
                })
                assigned_crews.add(best_crew["id"])

        return assignments
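The dispatch method calls _estimate_travel_time, which the listing does not define. One possible stand-in is sketched below, assuming crew and outage locations are (latitude, longitude) tuples in degrees. The average speed and the road detour factor are illustrative assumptions, not values from the article. A production system would more likely query a routing service.

```python
import math

EARTH_RADIUS_KM = 6371.0


def haversine_km(point_a, point_b):
    """Great-circle distance between two (lat, lon) points given in degrees."""
    lat1, lon1 = map(math.radians, point_a)
    lat2, lon2 = map(math.radians, point_b)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    h = (
        math.sin(dlat / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    )
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(h))


def estimate_travel_time(crew_location, outage_location,
                         avg_speed_kmh=40.0, road_factor=1.3):
    """Estimate drive time in minutes.

    road_factor inflates the straight-line distance to approximate real
    roads; both defaults are assumed placeholders to be calibrated locally.
    """
    distance_km = haversine_km(crew_location, outage_location) * road_factor
    return distance_km / avg_speed_kmh * 60


crew = (39.95, -75.16)     # hypothetical crew yard
outage = (40.05, -75.05)   # hypothetical fault location
print(round(estimate_travel_time(crew, outage)), "minutes")
```

During storm response, average speeds drop sharply, so avg_speed_kmh is worth making a function of current road conditions.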

3. Water Treatment Optimization Agent

Water treatment plants consume 2–4% of national electricity and use $3–8 billion in chemicals annually. Raw water quality varies daily based on weather, seasonal runoff, and upstream events. The agent adjusts chemical dosing, filtration rates, and disinfection in real time to maintain EPA compliance while minimizing chemical and energy costs.

class WaterTreatmentAgent:
    """Optimizes water treatment plant operations."""

    EPA_LIMITS = {
        "turbidity_ntu": 0.3,         # 95th percentile
        "chlorine_residual_mg_l": {"min": 0.2, "max": 4.0},
        "ph": {"min": 6.5, "max": 8.5},
        "lead_ppb": 15,
        "thm_ppb": 80,                # Trihalomethanes
        "haa5_ppb": 60,               # Haloacetic acids
    }

    def __init__(self, scada_feed, lab_results, chemical_inventory, weather_api):
        self.scada = scada_feed
        self.lab = lab_results
        self.chemicals = chemical_inventory
        self.weather = weather_api

    def optimize_coagulant_dose(self, plant_id):
        """Adjust coagulant dosing based on raw water quality."""
        raw = self.scada.get_current(plant_id, "raw_water")
        turbidity = raw["turbidity_ntu"]
        ph = raw["ph"]
        temperature = raw["temp_c"]
        toc = raw.get("toc_mg_l", 3.0)     # Total organic carbon
        alkalinity = raw.get("alkalinity_mg_l", 100)

        # Jar test correlation model
        # Base dose from turbidity (empirical)
        if turbidity < 5:
            base_dose = 15  # mg/L alum
        elif turbidity < 20:
            base_dose = 25
        elif turbidity < 100:
            base_dose = 40
        else:
            base_dose = 60

        # Temperature correction (cold water needs more coagulant)
        if temperature < 5:
            temp_factor = 1.3
        elif temperature < 10:
            temp_factor = 1.15
        else:
            temp_factor = 1.0

        # TOC correction (more organics = more coagulant)
        toc_factor = 1.0 + max(0, (toc - 2.0)) * 0.1

        # pH adjustment needed?
        optimal_ph = 6.8  # Optimal for alum coagulation
        ph_adjustment = optimal_ph - ph

        recommended_dose = round(base_dose * temp_factor * toc_factor, 1)

        return {
            "plant_id": plant_id,
            "raw_turbidity_ntu": turbidity,
            "raw_ph": ph,
            "raw_temp_c": temperature,
            "raw_toc_mg_l": toc,
            "recommended_coagulant_mg_l": recommended_dose,
            "current_dose_mg_l": self.scada.get_current_dose(plant_id, "coagulant"),
            "ph_adjustment_needed": round(ph_adjustment, 2),
            "alkalinity_sufficient": alkalinity > recommended_dose * 0.45,
        }

    def monitor_disinfection(self, plant_id):
        """Ensure adequate disinfection while minimizing DBP formation."""
        clearwell = self.scada.get_current(plant_id, "clearwell")
        ct = clearwell["chlorine_mg_l"] * clearwell["contact_time_min"]

        # CT requirement depends on temperature and pH
        required_ct = self._get_required_ct(
            clearwell["temp_c"], clearwell["ph"],
            target_log_removal=3.0  # 3-log Giardia
        )

        # DBP formation potential
        toc = clearwell.get("toc_mg_l", 2.0)
        chlorine = clearwell["chlorine_mg_l"]
        temp = clearwell["temp_c"]
        # Simplified THM formation model
        estimated_thm = toc * chlorine * (temp / 20) * 8  # ppb estimate

        return {
            "plant_id": plant_id,
            "actual_ct": round(ct, 1),
            "required_ct": round(required_ct, 1),
            "ct_ratio": round(ct / required_ct, 2),
            "compliant": ct >= required_ct,
            "chlorine_residual": clearwell["chlorine_mg_l"],
            "estimated_thm_ppb": round(estimated_thm, 1),
            "thm_limit_ppb": self.EPA_LIMITS["thm_ppb"],
            "thm_margin_pct": round(
                (1 - estimated_thm / self.EPA_LIMITS["thm_ppb"]) * 100, 1
            ),
            "recommendation": (
                "reduce_chlorine" if estimated_thm > 60 and ct > required_ct * 1.5
                else "increase_chlorine" if ct < required_ct * 1.1
                else "maintain"
            ),
        }
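To make the dosing arithmetic concrete, the sketch below repeats the same lookup and correction factors as optimize_coagulant_dose in a standalone function and runs it on a cold, organic-rich sample. The function name and the sample values are hypothetical. The thresholds and factors are copied from the listing above.

```python
def coagulant_dose_mg_l(turbidity_ntu, temp_c, toc_mg_l):
    """Alum dose in mg/L using the empirical rules from the listing above."""
    # Base dose from raw-water turbidity
    if turbidity_ntu < 5:
        base_dose = 15
    elif turbidity_ntu < 20:
        base_dose = 25
    elif turbidity_ntu < 100:
        base_dose = 40
    else:
        base_dose = 60

    # Cold water coagulates more slowly, so the dose is scaled up
    if temp_c < 5:
        temp_factor = 1.3
    elif temp_c < 10:
        temp_factor = 1.15
    else:
        temp_factor = 1.0

    # Each mg/L of TOC above 2.0 adds 10% to the dose
    toc_factor = 1.0 + max(0.0, toc_mg_l - 2.0) * 0.1
    return round(base_dose * temp_factor * toc_factor, 1)


# Hypothetical winter sample: 12 NTU, 4 degrees C, 4.0 mg/L TOC
dose = coagulant_dose_mg_l(turbidity_ntu=12, temp_c=4, toc_mg_l=4.0)
alkalinity_needed = dose * 0.45   # same 0.45 ratio as the alkalinity check
print(dose, round(alkalinity_needed, 2))
```

For this sample the factors multiply out as 25 × 1.3 × 1.2 = 39.0 mg/L, which requires roughly 17.6 mg/L of alkalinity under the listing's 0.45 ratio.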

4. Leak Detection Agent

Water utilities lose 20–30% of treated water to leaks, a major component of what the industry calls non-revenue water (NRW). In developing countries, NRW can exceed 50%. The agent uses acoustic sensors, flow balance analysis, and pressure transient monitoring to detect and locate leaks in real time.

class LeakDetectionAgent:
    """Detects and locates water network leaks."""

    def __init__(self, sensor_network, hydraulic_model, gis_data):
        self.sensors = sensor_network
        self.model = hydraulic_model
        self.gis = gis_data

    def district_metered_area_analysis(self, dma_id):
        """Flow balance analysis for a DMA to detect leaks."""
        inflow = self.sensors.get_flow(dma_id, "inlet")
        outflows = self.sensors.get_flow(dma_id, "outlets")
        consumption = self.sensors.get_metered_consumption(dma_id)

        total_inflow = sum(f["flow_m3h"] for f in inflow)
        total_outflow = sum(f["flow_m3h"] for f in outflows)
        total_consumption = consumption["total_m3h"]

        net_inflow = total_inflow - total_outflow
        unaccounted = net_inflow - total_consumption
        nrw_pct = (unaccounted / net_inflow * 100) if net_inflow > 0 else 0

        # Minimum Night Flow (MNF) analysis — best leak indicator
        mnf = self.sensors.get_minimum_night_flow(dma_id)
        legitimate_night_use = consumption.get("night_use_estimate_m3h", 0.5)
        leak_estimate = max(0, mnf - legitimate_night_use)

        # Trend analysis
        mnf_history = self.sensors.get_mnf_history(dma_id, days=30)
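        # A linear fit needs at least two points; report a flat trend otherwise
        mnf_trend = (
            np.polyfit(range(len(mnf_history)), mnf_history, 1)[0]
            if len(mnf_history) >= 2 else 0.0
        )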

        return {
            "dma_id": dma_id,
            "net_inflow_m3h": round(net_inflow, 2),
            "consumption_m3h": round(total_consumption, 2),
            "unaccounted_m3h": round(unaccounted, 2),
            "nrw_pct": round(nrw_pct, 1),
            "mnf_m3h": round(mnf, 2),
            "estimated_leakage_m3h": round(leak_estimate, 2),
            "mnf_trend_m3h_per_day": round(mnf_trend, 4),
            "alert": nrw_pct > 25 or mnf_trend > 0.05,
            "severity": (
                "critical" if nrw_pct > 40 or leak_estimate > 10
                else "high" if nrw_pct > 25
                else "medium" if nrw_pct > 15
                else "low"
            ),
        }

    def acoustic_leak_locate(self, pipe_segment_id):
        """Use acoustic sensor correlation to pinpoint leak location."""
        sensors = self.sensors.get_acoustic_pair(pipe_segment_id)
        if len(sensors) < 2:
            return {"error": "Need 2 acoustic sensors for correlation"}

        s1, s2 = sensors[0], sensors[1]
        pipe = self.gis.get_pipe(pipe_segment_id)

        # Cross-correlation of acoustic signals
        correlation = self._cross_correlate(
            s1["signal"], s2["signal"], s1["sample_rate"]
        )
        time_delay = correlation["peak_delay_ms"]

        # Calculate leak position
        pipe_length = pipe["length_m"]
        sound_speed = self._get_sound_speed(pipe["material"], pipe["diameter_mm"])

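        # Assumes time_delay is positive when the leak noise reaches sensor 1
        # after sensor 2. The result is clamped so a noisy correlation cannot
        # place the leak beyond either end of the pipe.
        distance_from_s1 = min(pipe_length, max(0.0, (
            pipe_length / 2 +
            (time_delay / 1000 * sound_speed) / 2
        )))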

        return {
            "pipe_segment": pipe_segment_id,
            "distance_from_sensor_1_m": round(distance_from_s1, 1),
            "confidence": correlation["confidence"],
            "pipe_material": pipe["material"],
            "pipe_diameter_mm": pipe["diameter_mm"],
            "gps_location": self.gis.interpolate_position(
                pipe_segment_id, distance_from_s1
            ),
        }
Real-world impact: Thames Water (UK) deployed AI leak detection and reduced leakage by 15% in the first year — saving 100 million liters per day. The acoustic correlation technique can pinpoint leaks to within 1 meter on metallic pipes.
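The locate method relies on a _cross_correlate helper that the listing leaves undefined. A minimal sketch of the idea using numpy.correlate follows. It assumes the two signals are equally sampled arrays, uses a sign convention in which a positive delay means the noise reached sensor 1 later than sensor 2, and tests itself on synthetic noise. The 1,200 m/s wave speed is an illustrative assumption, not a value from the article.

```python
import numpy as np


def cross_correlate_delay_ms(signal_1, signal_2, sample_rate_hz):
    """Delay of signal_1 relative to signal_2 in milliseconds.

    Positive means the leak noise arrived at sensor 1 after sensor 2.
    """
    s1 = np.asarray(signal_1, dtype=float)
    s2 = np.asarray(signal_2, dtype=float)
    s1 = s1 - s1.mean()   # remove DC offset before correlating
    s2 = s2 - s2.mean()
    corr = np.correlate(s1, s2, mode="full")
    # In "full" mode, output index 0 corresponds to lag -(len(s2) - 1)
    lags = np.arange(-(len(s2) - 1), len(s1))
    peak_lag = lags[np.argmax(corr)]
    return float(peak_lag) / sample_rate_hz * 1000.0


def leak_distance_from_s1(pipe_length_m, delay_ms, sound_speed_ms):
    """Same position formula as acoustic_leak_locate()."""
    return pipe_length_m / 2 + (delay_ms / 1000 * sound_speed_ms) / 2


# Synthetic test: sensor 1 hears the same noise 50 samples later
rng = np.random.default_rng(0)
noise = rng.standard_normal(2000)
s1 = noise[:1500]
s2 = noise[50:1550]

delay = cross_correlate_delay_ms(s1, s2, sample_rate_hz=10_000)
position = leak_distance_from_s1(100.0, delay, sound_speed_ms=1200.0)
print(delay, position)
```

With a 5 ms delay on a 100 m pipe, the leak lands 3 m past the midpoint on the sensor 2 side. Field signals usually need band-pass filtering before correlation, which this sketch omits.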

5. Demand Response Agent

Peak electricity demand drives 10–25% of total grid investment despite occurring less than 100 hours per year. Demand response programs that reduce peak load by 5–10% can defer billions in infrastructure spending. The agent coordinates load curtailment across commercial, industrial, and residential customers.

class DemandResponseAgent:
    """Manages demand response events and distributed resources."""

    def __init__(self, customer_db, der_registry, market_api, weather_api):
        self.customers = customer_db
        self.der = der_registry
        self.market = market_api
        self.weather = weather_api

    def trigger_demand_response(self, target_reduction_mw, duration_hours):
        """Orchestrate a demand response event."""
        # Get all enrolled resources sorted by cost
        resources = self.customers.get_dr_enrolled()
        resource_stack = []

        for r in resources:
            curtailable_kw = r["max_curtailment_kw"]
            cost_per_kwh = r.get("incentive_rate", 0.25)
            reliability = r.get("historical_performance", 0.85)

            resource_stack.append({
                "customer_id": r["id"],
                "name": r["name"],
                "type": r["customer_type"],  # commercial, industrial, residential
                "curtailable_kw": curtailable_kw,
                "effective_kw": curtailable_kw * reliability,
                "cost_per_kwh": cost_per_kwh,
                "total_event_cost": curtailable_kw * cost_per_kwh * duration_hours,
            })

        # Sort by cost-effectiveness
        resource_stack.sort(key=lambda r: r["cost_per_kwh"])

        # Stack resources until target met
        dispatched = []
        total_dispatched_kw = 0
        target_kw = target_reduction_mw * 1000

        for resource in resource_stack:
            if total_dispatched_kw >= target_kw * 1.1:  # 10% over-dispatch
                break
            dispatched.append(resource)
            total_dispatched_kw += resource["effective_kw"]

        total_cost = sum(r["total_event_cost"] for r in dispatched)
        # Assumes get_peak_price() returns $/MWh, so MW * $/MWh * hours gives dollars
        avoided_market_cost = target_reduction_mw * self.market.get_peak_price() * duration_hours

        return {
            "target_mw": target_reduction_mw,
            "dispatched_mw": round(total_dispatched_kw / 1000, 2),
            "resources_dispatched": len(dispatched),
            "duration_hours": duration_hours,
            "total_incentive_cost": round(total_cost),
            "avoided_market_cost": round(avoided_market_cost),
            "net_savings": round(avoided_market_cost - total_cost),
            "dispatched_resources": dispatched,
        }
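The stacking loop is easier to follow with numbers. The sketch below reproduces the same cost-ordered dispatch with the 10% over-dispatch margin on three made-up resources. The function name, the resource records, and the field name "reliability" are all hypothetical.

```python
def stack_resources(resources, target_mw, duration_hours, over_dispatch=1.1):
    """Dispatch the cheapest resources until the derated total covers the target."""
    target_kw = target_mw * 1000
    ordered = sorted(resources, key=lambda r: r["cost_per_kwh"])

    dispatched = []
    total_effective_kw = 0.0
    for resource in ordered:
        if total_effective_kw >= target_kw * over_dispatch:
            break
        dispatched.append(resource)
        # Derate by historical performance, as the agent does
        total_effective_kw += resource["curtailable_kw"] * resource["reliability"]

    # Incentives are paid on enrolled kW, not on the derated figure
    incentive_cost = sum(
        r["curtailable_kw"] * r["cost_per_kwh"] * duration_hours
        for r in dispatched
    )
    return [r["id"] for r in dispatched], total_effective_kw, incentive_cost


# Hypothetical enrolled resources, deliberately unsorted
RESOURCES = [
    {"id": "C", "curtailable_kw": 2000, "cost_per_kwh": 0.30, "reliability": 0.85},
    {"id": "A", "curtailable_kw": 500, "cost_per_kwh": 0.10, "reliability": 0.90},
    {"id": "B", "curtailable_kw": 1000, "cost_per_kwh": 0.20, "reliability": 0.80},
]

ids, effective_kw, cost = stack_resources(RESOURCES, target_mw=1, duration_hours=2)
print(ids, effective_kw, cost)
```

For a 1 MW target the loop takes A (450 kW effective) and B (800 kW effective), reaching 1,250 kW, which clears the 1,100 kW over-dispatch threshold before the most expensive resource is needed.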

6. Customer Operations Agent

Utility customer centers handle 2–5 million calls per year. 40% are billing inquiries, 25% are outage reports, 15% are service requests. AI agents can handle 70–85% of these interactions autonomously, reducing call center costs by $15–25M per year.

class CustomerOpsAgent:
    """Handles utility customer inquiries autonomously."""

    def __init__(self, billing_db, oms_client, crm_client, llm):
        self.billing = billing_db
        self.oms = oms_client
        self.crm = crm_client
        self.llm = llm

    def handle_inquiry(self, customer_id, inquiry_text):
        """Route and resolve customer inquiries."""
        # Classify intent
        intent = self._classify_intent(inquiry_text)
        customer = self.crm.get_customer(customer_id)

        if intent == "billing_inquiry":
            return self._handle_billing(customer, inquiry_text)
        elif intent == "outage_report":
            return self._handle_outage(customer)
        elif intent == "high_bill":
            return self._handle_high_bill(customer)
        elif intent == "payment_arrangement":
            return self._handle_payment_plan(customer)
        elif intent == "service_request":
            return self._handle_service_request(customer, inquiry_text)
        else:
            return {"action": "escalate_to_agent", "reason": "unclassified_intent"}

    def _handle_high_bill(self, customer):
        """Analyze and explain high bill complaints."""
        current = self.billing.get_current_bill(customer["id"])
        previous = self.billing.get_bill_history(customer["id"], months=12)
        avg_bill = np.mean([b["amount"] for b in previous])

        # Usage analysis
        usage_current = current["usage_kwh"]
        usage_avg = np.mean([b["usage_kwh"] for b in previous])
        # Avoid dividing by zero for new accounts with no usage history
        usage_increase_pct = (
            (usage_current - usage_avg) / usage_avg * 100 if usage_avg > 0 else 0.0
        )

        # Weather impact
        hdd = current.get("heating_degree_days", 0)
        cdd = current.get("cooling_degree_days", 0)
        avg_hdd = np.mean([b.get("heating_degree_days", 0) for b in previous])
        avg_cdd = np.mean([b.get("cooling_degree_days", 0) for b in previous])

        # Rate change check
        # Assumes bill history is ordered oldest to newest and may be empty
        rate_changed = bool(previous) and (
            current["rate_per_kwh"] != previous[-1]["rate_per_kwh"]
        )

        explanations = []
        if usage_increase_pct > 15:
            if avg_hdd > 0 and hdd > avg_hdd * 1.2:
                explanations.append(
                    f"Heating demand was {round((hdd/avg_hdd - 1)*100)}% above average due to colder weather"
                )
            elif avg_cdd > 0 and cdd > avg_cdd * 1.2:
                explanations.append(
                    f"Cooling demand was {round((cdd/avg_cdd - 1)*100)}% above average due to hotter weather"
                )
            else:
                explanations.append(
                    f"Usage increased {round(usage_increase_pct)}% vs your 12-month average"
                )
        if rate_changed:
            explanations.append("A rate adjustment took effect this billing period")

        return {
            "action": "auto_respond",
            "current_bill": current["amount"],
            "average_bill": round(avg_bill, 2),
            "usage_change_pct": round(usage_increase_pct, 1),
            "explanations": explanations,
            "suggestions": [
                "Enroll in budget billing to spread costs evenly",
                "Schedule a free energy audit",
                "Review thermostat settings",
            ],
        }
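handle_inquiry depends on a _classify_intent helper that is not shown. The article's agent is constructed with an LLM client, so the real classifier presumably uses it. As a deterministic fallback for testing, a keyword matcher along the following lines could stand in. The phrase lists are illustrative assumptions, and the intent labels are the ones handle_inquiry already routes on.

```python
# Ordered from most specific to most general: the first match wins,
# so "high_bill" must be checked before the generic "billing_inquiry".
INTENT_KEYWORDS = [
    ("outage_report", ("outage", "power is out", "no power", "lights are out")),
    ("payment_arrangement", ("payment plan", "payment arrangement",
                             "can't pay", "cannot pay")),
    ("high_bill", ("high bill", "bill is too high", "bill so high",
                   "bill went up", "bill doubled")),
    ("service_request", ("start service", "stop service",
                         "transfer service", "new service")),
    ("billing_inquiry", ("bill", "charge", "invoice", "balance")),
]


def classify_intent(inquiry_text):
    """Return the first intent whose phrase appears in the text, else 'unknown'."""
    normalized = " ".join(inquiry_text.lower().split())
    for intent, phrases in INTENT_KEYWORDS:
        if any(phrase in normalized for phrase in phrases):
            return intent
    return "unknown"


for text in (
    "My power is out on Elm Street",
    "Why is my bill so high this month?",
    "Can I set up a payment plan?",
    "Tell me a joke",
):
    print(text, "->", classify_intent(text))
```

Anything that comes back as "unknown" falls through to the escalation branch in handle_inquiry, which is the safe default for a rule-based classifier.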

7. ROI Analysis

Agent | Annual Savings | Implementation | Payback
Smart Grid (VVO) | $20–50M (loss reduction) | $5–10M | 3–6 months
Outage Prediction | $15–40M (avoided outages) | $3–6M | 3–5 months
Water Treatment | $5–12M (chemicals + energy) | $2–4M | 4–8 months
Leak Detection | $10–30M (water savings) | $4–8M | 4–8 months
Demand Response | $30–80M (deferred capex) | $5–10M | 2–4 months
Customer Operations | $15–25M (call center) | $2–4M | 2–3 months

Total portfolio: $95–237M in annual savings against $21–42M in implementation costs. The biggest impact comes from demand response (deferred infrastructure) and smart grid optimization (distribution loss reduction).
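The portfolio totals can be reproduced by summing the low and high ends of each row. The short check below does that with the figures from the table, in millions of dollars. The dictionary layout and function name are just one hypothetical way to hold them.

```python
# (low, high) ranges in $M, copied from the ROI table
ROI_TABLE = {
    "Smart Grid (VVO)": {"savings": (20, 50), "implementation": (5, 10)},
    "Outage Prediction": {"savings": (15, 40), "implementation": (3, 6)},
    "Water Treatment": {"savings": (5, 12), "implementation": (2, 4)},
    "Leak Detection": {"savings": (10, 30), "implementation": (4, 8)},
    "Demand Response": {"savings": (30, 80), "implementation": (5, 10)},
    "Customer Operations": {"savings": (15, 25), "implementation": (2, 4)},
}


def portfolio_range(table, key):
    """Sum the low ends and the high ends of one column across all agents."""
    low = sum(row[key][0] for row in table.values())
    high = sum(row[key][1] for row in table.values())
    return low, high


savings = portfolio_range(ROI_TABLE, "savings")
implementation = portfolio_range(ROI_TABLE, "implementation")
print(f"Savings: ${savings[0]}-{savings[1]}M")
print(f"Implementation: ${implementation[0]}-{implementation[1]}M")
```

Summing the extremes assumes every agent lands at the same end of its range at once, so the totals are best read as outer bounds rather than an expected outcome.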
