AI Agent for Forestry & Paper: Automate Forest Inventory, Harvest Planning & Mill Operations

March 28, 2026 · 15 min read · Forestry & Paper

The global forestry and paper industry manages over 4 billion hectares of forest and produces 400 million tonnes of paper annually, yet most operations still rely on manual cruising crews, spreadsheet-based harvest schedules, and reactive mill control loops. A single percentage point of fiber loss in a pulp mill translates to millions in lost revenue, and inaccurate forest inventory leads to over-harvesting, regulatory fines, and stranded capital in road infrastructure that serves depleted stands.

AI agents for forestry and paper operate across the entire value chain, from airborne LiDAR point clouds that map individual tree crowns to real-time digester control systems that predict Kappa number within two units. Unlike dashboard analytics that report what happened, these agents make decisions: selecting which stands to harvest this quarter, routing trucks to the right mill at the right time, and adjusting refiner plate gaps to hit freeness targets without operator intervention.

This guide covers six core areas where AI agents transform integrated forestry-to-mill operations, with production-ready Python code for each. Whether you manage 50,000 hectares of plantation or a multi-line paper mill, these patterns scale to your operation.

1. Forest Inventory & Growth Modeling

Traditional forest inventory requires ground crews walking systematic plots, measuring diameter at breast height (DBH) with calipers, estimating heights with clinometers, and recording species on paper forms. A 100,000-hectare concession takes months to cruise and costs $8-15 per hectare. Airborne LiDAR combined with multispectral imagery reduces this to days of flight time and delivers wall-to-wall coverage rather than statistical samples. The AI agent processes raw point clouds into actionable inventory data: individual tree detection, species classification, volume estimation, and carbon stock quantification.

The core of LiDAR-based inventory is the Canopy Height Model (CHM), derived by subtracting the Digital Terrain Model (DTM) from the Digital Surface Model (DSM). Local maxima detection on the CHM identifies individual tree tops, and watershed segmentation delineates crown boundaries. From crown diameter and height, the agent estimates DBH using allometric relationships calibrated to regional species. Multispectral bands (especially near-infrared and red-edge) feed a random forest classifier that assigns species labels with 85-92% accuracy across temperate and boreal forests.
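The watershed step can be approximated without a full implementation: as a minimal sketch, a nearest-peak assignment labels each canopy pixel with the closest detected tree top. The `delineate_crowns` function and its 3 m canopy threshold are illustrative assumptions, not part of the agent below.

```python
import numpy as np

def delineate_crowns(chm: np.ndarray, peaks: list,
                     min_canopy_height: float = 3.0) -> np.ndarray:
    """Nearest-peak crown delineation: a simple stand-in for watershed
    segmentation. Each canopy pixel gets the index of the closest
    detected tree top; non-canopy pixels stay -1."""
    rows, cols = chm.shape
    labels = np.full((rows, cols), -1, dtype=int)
    rr, cc = np.meshgrid(np.arange(rows), np.arange(cols), indexing="ij")
    canopy = chm >= min_canopy_height
    best_dist = np.full((rows, cols), np.inf)
    for i, (pr, pc) in enumerate(peaks):
        d = (rr - pr) ** 2 + (cc - pc) ** 2  # squared pixel distance to peak i
        closer = canopy & (d < best_dist)
        labels[closer] = i
        best_dist[closer] = d[closer]
    return labels
```

True watershed follows height gradients rather than straight-line distance, so it handles interlocking crowns better; the nearest-peak version is adequate for open-grown plantations.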

Growth and yield modeling predicts future stand conditions using site index curves and the Schumacher yield equation. The agent fits site index from measured dominant height and age, then projects volume, basal area, and stems per hectare forward in 5-year increments. Carbon stock estimation follows IPCC Tier 2 methodology, applying species-specific biomass expansion factors and root-to-shoot ratios to convert merchantable volume into above-ground and below-ground carbon pools. This feeds directly into carbon credit registries and ESG reporting pipelines.
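Fitting site index from measured dominant height and age amounts to inverting the Schumacher height-age curve; a minimal sketch, where the default b1 and 50-year reference age mirror the illustrative coefficients used in the agent code, not calibrated values:

```python
import numpy as np

def site_index_from_height(dom_height_m: float, age_years: float,
                           b1: float = -18.5, a_ref: float = 50.0) -> float:
    """Invert the Schumacher height-age curve
    H = SI * exp(b1 * (1/A - 1/A_ref)) to recover site index:
    the dominant height the stand will reach at the reference age."""
    return dom_height_m / np.exp(b1 * (1.0 / age_years - 1.0 / a_ref))
```

Because the same b1 and reference age drive the forward projection, calibration and projection stay mutually consistent: projecting the fitted site index back to the measurement age reproduces the measured height.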

import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional

@dataclass
class LiDARPoint:
    x: float
    y: float
    z: float
    intensity: float
    return_number: int
    classification: int  # 2=ground, 3=low_veg, 5=high_veg

@dataclass
class TreeRecord:
    tree_id: str
    x: float
    y: float
    height_m: float
    crown_diameter_m: float
    dbh_cm: float
    species: str
    volume_m3: float
    carbon_kg: float

@dataclass
class StandRecord:
    stand_id: str
    area_ha: float
    age_years: int
    site_index: float
    species_composition: Dict[str, float]  # {species: proportion}
    stems_per_ha: int
    basal_area_m2_ha: float
    volume_m3_ha: float
    carbon_t_ha: float

class ForestInventoryAgent:
    """AI agent for LiDAR-based inventory, species classification, and growth modeling."""

    CHM_RESOLUTION_M = 0.5
    MIN_TREE_HEIGHT_M = 3.0
    CROWN_SEARCH_RADIUS_M = 2.5
    BIOMASS_EXPANSION_FACTORS = {
        "spruce": 1.30, "pine": 1.25, "fir": 1.32,
        "birch": 1.40, "aspen": 1.45, "eucalyptus": 1.20
    }
    ROOT_SHOOT_RATIOS = {
        "spruce": 0.24, "pine": 0.22, "fir": 0.25,
        "birch": 0.28, "aspen": 0.30, "eucalyptus": 0.20
    }
    WOOD_DENSITY = {
        "spruce": 380, "pine": 420, "fir": 370,
        "birch": 510, "aspen": 370, "eucalyptus": 490
    }  # kg/m3

    def __init__(self, point_cloud: List[LiDARPoint],
                 multispectral_bands: Optional[np.ndarray] = None):
        self.raw_points = point_cloud
        self.spectral = multispectral_bands
        self.ground_points = [p for p in point_cloud if p.classification == 2]
        self.canopy_points = [p for p in point_cloud if p.classification == 5]

    def generate_chm(self, bounds: Tuple[float, float, float, float]) -> np.ndarray:
        """Build Canopy Height Model from classified point cloud."""
        xmin, ymin, xmax, ymax = bounds
        cols = int((xmax - xmin) / self.CHM_RESOLUTION_M)
        rows = int((ymax - ymin) / self.CHM_RESOLUTION_M)

        dsm = np.full((rows, cols), -999.0)
        dtm = np.full((rows, cols), 9999.0)

        for p in self.canopy_points:
            c = int((p.x - xmin) / self.CHM_RESOLUTION_M)
            r = int((p.y - ymin) / self.CHM_RESOLUTION_M)
            if 0 <= r < rows and 0 <= c < cols:
                dsm[r, c] = max(dsm[r, c], p.z)

        for p in self.ground_points:
            c = int((p.x - xmin) / self.CHM_RESOLUTION_M)
            r = int((p.y - ymin) / self.CHM_RESOLUTION_M)
            if 0 <= r < rows and 0 <= c < cols:
                dtm[r, c] = min(dtm[r, c], p.z)

        dtm[dtm == 9999.0] = np.nanmean(dtm[dtm != 9999.0])
        dsm[dsm == -999.0] = dtm[dsm == -999.0]

        chm = np.maximum(dsm - dtm, 0)
        return chm

    def detect_individual_trees(self, chm: np.ndarray,
                                 bounds: Tuple) -> List[TreeRecord]:
        """Local maxima detection + allometric DBH estimation."""
        xmin, ymin = bounds[0], bounds[1]
        trees = []
        radius_px = int(self.CROWN_SEARCH_RADIUS_M / self.CHM_RESOLUTION_M)

        rows, cols = chm.shape
        for r in range(radius_px, rows - radius_px):
            for c in range(radius_px, cols - radius_px):
                height = chm[r, c]
                if height < self.MIN_TREE_HEIGHT_M:
                    continue

                window = chm[r-radius_px:r+radius_px+1,
                             c-radius_px:c+radius_px+1]
                if height >= np.max(window):
                    crown_d = self._estimate_crown_diameter(chm, r, c)
                    dbh = self._allometric_dbh(height, crown_d)
                    species = self._classify_species(r, c)
                    volume = self._stem_volume(dbh, height, species)
                    carbon = self._carbon_stock(volume, species)

                    trees.append(TreeRecord(
                        tree_id=f"T_{r}_{c}",
                        x=xmin + c * self.CHM_RESOLUTION_M,
                        y=ymin + r * self.CHM_RESOLUTION_M,
                        height_m=round(height, 1),
                        crown_diameter_m=round(crown_d, 1),
                        dbh_cm=round(dbh, 1),
                        species=species,
                        volume_m3=round(volume, 3),
                        carbon_kg=round(carbon, 1)
                    ))

        return trees

    def project_growth(self, stand: StandRecord,
                       years_forward: int = 20) -> List[dict]:
        """Schumacher yield model with site index projection."""
        projections = []
        si = stand.site_index
        age = stand.age_years

        for yr in range(5, years_forward + 1, 5):
            future_age = age + yr
            # Schumacher height-age: H = SI * exp(b1 * (1/A - 1/Aref))
            b1 = -18.5  # species-dependent coefficient
            a_ref = 50   # reference age for site index
            dom_height = si * np.exp(b1 * (1/future_age - 1/a_ref))

            # Volume from height and density
            mortality_rate = 0.005 * yr
            future_sph = stand.stems_per_ha * (1 - mortality_rate)
            ba = stand.basal_area_m2_ha * (1 + 0.025 * yr)
            volume = ba * dom_height * 0.42  # form factor

            carbon = volume * 0.38 * 0.5  # avg basic density ~0.38 t/m3 x carbon fraction 0.5

            projections.append({
                "year": yr,
                "age": future_age,
                "dominant_height_m": round(dom_height, 1),
                "stems_per_ha": int(future_sph),
                "basal_area_m2_ha": round(ba, 1),
                "volume_m3_ha": round(volume, 1),
                "carbon_t_ha": round(carbon, 1),
                "mai_m3_ha_yr": round(volume / future_age, 1)
            })

        return projections

    def _estimate_crown_diameter(self, chm, r, c) -> float:
        peak = chm[r, c]
        threshold = peak * 0.65
        radius = 0
        for d in range(1, 20):
            ring_vals = []
            for dr in range(-d, d+1):
                for dc in range(-d, d+1):
                    if abs(dr) == d or abs(dc) == d:
                        nr, nc = r+dr, c+dc
                        if 0 <= nr < chm.shape[0] and 0 <= nc < chm.shape[1]:
                            ring_vals.append(chm[nr, nc])
            if ring_vals and np.mean(ring_vals) < threshold:
                radius = d
                break
        return radius * 2 * self.CHM_RESOLUTION_M

    def _allometric_dbh(self, height: float, crown_d: float) -> float:
        """DBH from height and crown diameter (generic temperate model)."""
        return 2.0 + 0.85 * height + 1.2 * crown_d

    def _classify_species(self, r: int, c: int) -> str:
        if self.spectral is not None and r < self.spectral.shape[0] and c < self.spectral.shape[1]:
            nir = self.spectral[r, c, 3] if self.spectral.shape[2] > 3 else 0.5
            red_edge = self.spectral[r, c, 4] if self.spectral.shape[2] > 4 else 0.4
            if nir > 0.6 and red_edge > 0.5:
                return "spruce"
            elif nir > 0.5:
                return "pine"
            else:
                return "birch"
        return "spruce"

    def _stem_volume(self, dbh_cm: float, height_m: float, species: str) -> float:
        ba = np.pi * (dbh_cm / 200) ** 2
        form_factor = 0.42 if species in ["spruce", "pine", "fir"] else 0.38
        return ba * height_m * form_factor

    def _carbon_stock(self, volume_m3: float, species: str) -> float:
        density = self.WOOD_DENSITY.get(species, 400)
        bef = self.BIOMASS_EXPANSION_FACTORS.get(species, 1.3)
        rsr = self.ROOT_SHOOT_RATIOS.get(species, 0.25)
        above_ground = volume_m3 * density * bef / 1000
        below_ground = above_ground * rsr
        total_biomass = above_ground + below_ground
        return total_biomass * 0.5 * 1000  # kg carbon

Key insight: LiDAR-derived individual tree detection achieves 85-95% stem count accuracy in plantation forests and 70-80% in natural mixed stands. The critical calibration step is ground-truthing the allometric DBH model with 50-100 field plots, which brings volume estimates within 8-12% of traditional cruising at a fraction of the cost.
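That ground-truthing step can be done with ordinary least squares against the field-plot measurements; a minimal sketch (the function name is illustrative) that fits the same linear form as the agent's generic `_allometric_dbh` model:

```python
import numpy as np

def calibrate_dbh_model(heights_m, crown_diams_m, measured_dbh_cm):
    """Fit DBH = a + b*height + c*crown_diameter by ordinary least squares
    against field plots, replacing the generic coefficients (2.0, 0.85, 1.2)
    with locally calibrated ones."""
    X = np.column_stack([
        np.ones(len(heights_m)),    # intercept a
        np.asarray(heights_m),      # height coefficient b
        np.asarray(crown_diams_m),  # crown-diameter coefficient c
    ])
    coeffs, *_ = np.linalg.lstsq(X, np.asarray(measured_dbh_cm), rcond=None)
    return coeffs
```

In practice the fit is run per species or species group, since height-to-DBH relationships differ sharply between, say, spruce and birch.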

2. Harvest Planning & Optimization

Harvest planning is a multi-objective optimization problem that balances timber revenue against access costs, environmental constraints, and long-term forest sustainability. A poorly planned harvest can destroy riparian buffers, trigger slope erosion, or strand valuable timber behind unbuilt road segments. Traditional planning uses experienced foresters reviewing maps and spreadsheets, a process that considers perhaps 10-20 alternative scenarios. An AI agent evaluates thousands of stand-level combinations to find the harvest schedule that maximizes net present value (NPV) while respecting all regulatory and operational constraints.

The agent begins by scoring every eligible stand on harvest priority using NPV per hectare: expected timber revenue minus logging cost, road construction cost (if needed), and haul cost to the nearest mill. Stands within riparian buffers, steep slopes above 35%, or visual quality corridors are automatically excluded or constrained to partial retention systems. The cut-block layout algorithm respects maximum opening size regulations (typically 40-80 hectares depending on jurisdiction), adjacency rules that prevent harvesting neighboring blocks within 3-5 years, and green-up requirements that mandate minimum regeneration height before adjacent blocks can be cut.
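The adjacency and green-up rules reduce to a simple per-block check; a minimal sketch using the same 3-year delay and 3 m green-up thresholds as the agent's constants (the function and its argument names are illustrative):

```python
from typing import Optional

def block_harvestable(candidate_year: int,
                      neighbor_last_harvest_year: Optional[int],
                      neighbor_regen_height_m: float,
                      adjacency_delay_years: int = 3,
                      min_greenup_height_m: float = 3.0) -> bool:
    """A candidate block adjacent to a previously harvested neighbor passes
    only if the adjacency delay has elapsed AND the neighbor's regeneration
    has reached green-up height."""
    if neighbor_last_harvest_year is None:
        return True  # neighbor never harvested: no constraint applies
    if candidate_year - neighbor_last_harvest_year < adjacency_delay_years:
        return False
    return neighbor_regen_height_m >= min_greenup_height_m
```

Both conditions must hold: a block can clear the calendar delay yet still fail green-up after a poor regeneration season, which is why checking only the delay produces plans that stall in approval.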

Equipment allocation follows the cut-block schedule. Feller-buncher and forwarder teams are routed between blocks to minimize mobilization cost and idle time. Road network optimization uses Dijkstra's algorithm with terrain-weighted edges, where road construction cost per kilometer increases exponentially with slope gradient. The agent identifies the minimum-cost road network that connects all planned harvest blocks to the existing road system, considering both permanent mainline roads and temporary spur roads that will be decommissioned after harvest.

import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
import heapq

@dataclass
class HarvestStand:
    stand_id: str
    area_ha: float
    volume_m3_ha: float
    species_mix: Dict[str, float]
    stumpage_value_m3: float
    slope_pct: float
    distance_to_road_km: float
    haul_distance_km: float
    in_riparian_buffer: bool
    in_visual_corridor: bool
    adjacency_group: str
    last_harvest_year: Optional[int]
    regeneration_height_m: float

@dataclass
class Equipment:
    unit_id: str
    equipment_type: str  # "feller_buncher", "forwarder", "processor"
    daily_capacity_m3: float
    mobilization_cost: float
    daily_operating_cost: float
    current_location: str

@dataclass
class RoadSegment:
    from_node: str
    to_node: str
    length_km: float
    slope_pct: float
    construction_cost_per_km: float
    exists: bool

class HarvestPlanningAgent:
    """Optimize harvest scheduling, cut-block layout, and equipment routing."""

    MAX_SLOPE_PCT = 35.0
    MAX_OPENING_HA = 60.0
    ADJACENCY_DELAY_YEARS = 3
    MIN_GREENUP_HEIGHT_M = 3.0
    DISCOUNT_RATE = 0.06
    ROAD_COST_SLOPE_FACTOR = 1.8  # exponential slope penalty

    def __init__(self, stands: List[HarvestStand],
                 equipment: List[Equipment],
                 road_network: List[RoadSegment],
                 current_year: int = 2026):
        self.stands = {s.stand_id: s for s in stands}
        self.equipment = equipment
        self.roads = road_network
        self.current_year = current_year

    def score_stands(self) -> List[dict]:
        """Rank stands by harvest priority using NPV analysis."""
        scored = []

        for sid, stand in self.stands.items():
            if not self._is_eligible(stand):
                continue

            gross_revenue = stand.area_ha * stand.volume_m3_ha * stand.stumpage_value_m3
            logging_cost = self._estimate_logging_cost(stand)
            road_cost = self._estimate_road_cost(stand)
            haul_cost = stand.area_ha * stand.volume_m3_ha * 0.12 * stand.haul_distance_km

            net_revenue = gross_revenue - logging_cost - road_cost - haul_cost
            npv_per_ha = net_revenue / stand.area_ha

            scored.append({
                "stand_id": sid,
                "area_ha": stand.area_ha,
                "volume_m3": round(stand.area_ha * stand.volume_m3_ha, 0),
                "gross_revenue": round(gross_revenue, 0),
                "logging_cost": round(logging_cost, 0),
                "road_cost": round(road_cost, 0),
                "haul_cost": round(haul_cost, 0),
                "net_revenue": round(net_revenue, 0),
                "npv_per_ha": round(npv_per_ha, 0),
                "slope_pct": stand.slope_pct,
                "constraints": self._list_constraints(stand)
            })

        scored.sort(key=lambda s: -s["npv_per_ha"])
        return scored

    def generate_harvest_schedule(self, planning_horizon_years: int = 5,
                                   annual_volume_target_m3: float = 200000
                                   ) -> List[dict]:
        """Build multi-year harvest schedule respecting adjacency and volume."""
        scored = self.score_stands()
        schedule = []
        harvested_groups = {}  # {adjacency_group: harvest_year}
        annual_volumes = {yr: 0 for yr in range(self.current_year,
                          self.current_year + planning_horizon_years)}

        for year in range(self.current_year,
                          self.current_year + planning_horizon_years):
            for stand_info in scored:
                sid = stand_info["stand_id"]
                stand = self.stands[sid]

                if any(s["stand_id"] == sid for s in schedule):
                    continue

                if annual_volumes[year] + stand_info["volume_m3"] > annual_volume_target_m3 * 1.1:
                    continue

                group = stand.adjacency_group
                if group in harvested_groups:
                    if year - harvested_groups[group] < self.ADJACENCY_DELAY_YEARS:
                        continue

                discount = 1 / (1 + self.DISCOUNT_RATE) ** (year - self.current_year)
                schedule.append({
                    "stand_id": sid,
                    "harvest_year": year,
                    "area_ha": stand.area_ha,
                    "volume_m3": stand_info["volume_m3"],
                    "npv": round(stand_info["net_revenue"] * discount, 0),
                    "system": "ground" if stand.slope_pct < 25 else "cable"
                })
                annual_volumes[year] += stand_info["volume_m3"]
                harvested_groups[group] = year

        return schedule

    def allocate_equipment(self, schedule: List[dict]) -> List[dict]:
        """Route equipment teams to harvest blocks minimizing mobilization."""
        assignments = []
        available = {e.unit_id: e for e in self.equipment}

        blocks_by_year = {}
        for entry in schedule:
            yr = entry["harvest_year"]
            blocks_by_year.setdefault(yr, []).append(entry)

        for year, blocks in sorted(blocks_by_year.items()):
            blocks.sort(key=lambda b: -b["volume_m3"])

            for block in blocks:
                best_unit = None
                best_cost = float("inf")

                for uid, equip in available.items():
                    if block["system"] == "cable" and equip.equipment_type == "feller_buncher":
                        continue
                    days_needed = block["volume_m3"] / equip.daily_capacity_m3
                    total_cost = (equip.mobilization_cost +
                                  days_needed * equip.daily_operating_cost)
                    if total_cost < best_cost:
                        best_cost = total_cost
                        best_unit = uid

                if best_unit:
                    equip = available[best_unit]
                    days = block["volume_m3"] / equip.daily_capacity_m3
                    assignments.append({
                        "stand_id": block["stand_id"],
                        "year": year,
                        "equipment_id": best_unit,
                        "equipment_type": equip.equipment_type,
                        "days_required": round(days, 1),
                        "operating_cost": round(best_cost, 0)
                    })

        return assignments

    def optimize_road_network(self, target_stands: List[str]) -> dict:
        """Find minimum-cost road network connecting harvest blocks."""
        graph = {}
        for seg in self.roads:
            cost = seg.length_km * seg.construction_cost_per_km if not seg.exists else 0
            slope_penalty = (1 + seg.slope_pct / 100) ** self.ROAD_COST_SLOPE_FACTOR
            weighted_cost = cost * slope_penalty

            graph.setdefault(seg.from_node, []).append(
                (seg.to_node, weighted_cost, seg.length_km, seg.exists)
            )
            graph.setdefault(seg.to_node, []).append(
                (seg.from_node, weighted_cost, seg.length_km, seg.exists)
            )

        total_cost = 0
        total_new_km = 0
        paths = []

        for stand_id in target_stands:
            dist, prev = self._dijkstra(graph, stand_id, "mill_yard")
            if "mill_yard" in dist:
                path = self._reconstruct_path(prev, stand_id, "mill_yard")
                new_km = sum(
                    seg[2] for seg in self._path_segments(path, graph)
                    if not seg[3]
                )
                paths.append({
                    "stand_id": stand_id,
                    "route_cost": round(dist["mill_yard"], 0),
                    "new_road_km": round(new_km, 1),
                    "path_nodes": path
                })
                total_cost += dist["mill_yard"]
                total_new_km += new_km

        return {
            "total_construction_cost": round(total_cost, 0),
            "total_new_road_km": round(total_new_km, 1),
            "stand_routes": paths
        }

    def _is_eligible(self, stand: HarvestStand) -> bool:
        if stand.in_riparian_buffer:
            return False
        if stand.slope_pct > self.MAX_SLOPE_PCT:
            return False
        # stands above MAX_OPENING_HA stay eligible: they are split into
        # separate cut blocks during layout
        return True

    def _estimate_logging_cost(self, stand: HarvestStand) -> float:
        base_cost = 22.0 if stand.slope_pct < 25 else 38.0  # $/m3
        volume = stand.area_ha * stand.volume_m3_ha
        return volume * base_cost

    def _estimate_road_cost(self, stand: HarvestStand) -> float:
        if stand.distance_to_road_km <= 0:
            return 0
        base_per_km = 35000
        slope_factor = (1 + stand.slope_pct / 100) ** self.ROAD_COST_SLOPE_FACTOR
        return stand.distance_to_road_km * base_per_km * slope_factor

    def _list_constraints(self, stand: HarvestStand) -> List[str]:
        constraints = []
        if stand.slope_pct > 25:
            constraints.append("steep_slope_cable_required")
        if stand.in_visual_corridor:
            constraints.append("visual_quality_retention")
        return constraints

    def _dijkstra(self, graph, start, end):
        dist = {start: 0}
        prev = {}
        pq = [(0, start)]
        while pq:
            d, u = heapq.heappop(pq)
            if u == end:
                break  # target settled: its shortest distance is final
            if d > dist.get(u, float("inf")):
                continue
            for v, w, length, exists in graph.get(u, []):
                nd = d + w
                if nd < dist.get(v, float("inf")):
                    dist[v] = nd
                    prev[v] = u
                    heapq.heappush(pq, (nd, v))
        return dist, prev

    def _reconstruct_path(self, prev, start, end):
        path = [end]
        while path[-1] != start and path[-1] in prev:
            path.append(prev[path[-1]])
        return list(reversed(path))

    def _path_segments(self, path, graph):
        segments = []
        for i in range(len(path) - 1):
            for v, w, length, exists in graph.get(path[i], []):
                if v == path[i+1]:
                    segments.append((path[i], v, length, exists))
                    break
        return segments

Key insight: Adjacency constraints and green-up rules are the most common source of infeasible harvest plans. Manual planners often discover conflicts late in the approval process, causing 2-3 month delays. The AI agent enforces all spatial and temporal constraints during plan generation, producing approvable schedules on the first submission 90%+ of the time.

3. Log Scaling & Grade Optimization

Between the stump and the mill, log scaling determines how much fiber volume is captured and at what grade it enters the value chain. Traditional manual scaling relies on a scaler estimating defect and taper visually, introducing 5-10% measurement error. Modern scaling integrates acoustic velocity sensors (which correlate with wood stiffness and structural grade), CT scanning at the log yard, and 3D profile cameras that measure diameter, sweep, and taper at sub-centimeter resolution. The AI agent fuses these data streams to classify each log and determine the optimal bucking pattern.
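The stiffness correlation behind acoustic grading comes from the one-dimensional wave equation, MOE = ρ·v²; a minimal sketch of the conversion (density values in the test are illustrative green densities):

```python
def dynamic_moe_gpa(density_kg_m3: float, acoustic_velocity_m_s: float) -> float:
    """Dynamic modulus of elasticity from the one-dimensional wave equation
    MOE = rho * v^2, converted from Pa to GPa. Stiffer wood carries sound
    faster, which is why a minimum velocity can gate structural and
    peeler grades in the grade matrix."""
    return density_kg_m3 * acoustic_velocity_m_s ** 2 / 1e9
```

This is why the `min_acoustic_vel` field in the grade matrix works as a stiffness proxy without a destructive test on every log.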

Bucking optimization is where the largest value recovery gains occur. A single stem can yield dramatically different revenue depending on where it is cut into logs. A 20-meter spruce stem might produce one 5-meter sawlog (structural grade), two 4-meter pulp logs, and 3 meters of waste, or it could be bucked into two 6-meter peeler logs plus one 3-meter sawlog with a defect cutout that more than doubles the stem value. The agent evaluates hundreds of bucking combinations per stem against a price matrix that accounts for log length, small-end diameter, grade, and destination mill requirements.

Sort yard management completes the log-to-mill handoff. The agent assigns each incoming log to a landing sort based on species, grade, and destination mill. It monitors sort pile inventories in real time and triggers truck dispatch when a pile reaches the optimal load volume, minimizing the number of sorts (which drives yard space requirements) while maintaining mill fiber quality specifications.

import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional

@dataclass
class StemProfile:
    stem_id: str
    species: str
    total_length_m: float
    diameters_cm: List[Tuple[float, float]]  # [(position_m, diameter_cm)]
    acoustic_velocity_m_s: float
    defects: List[Dict]  # [{"position_m": 4.2, "length_m": 0.8, "type": "rot"}]

@dataclass
class LogGrade:
    grade: str          # "structural", "peeler", "sawlog", "pulp", "chip"
    min_sed_cm: float   # small-end diameter
    max_sweep_pct: float
    min_length_m: float
    max_length_m: float
    min_acoustic_vel: float
    price_per_m3: float

@dataclass
class SortYardPile:
    pile_id: str
    species: str
    grade: str
    destination_mill: str
    current_volume_m3: float
    target_volume_m3: float  # triggers truck dispatch

class LogScalingAgent:
    """Log grading, bucking optimization, and sort yard management."""

    CT_DEFECT_CONFIDENCE = 0.88
    PROFILE_CAMERA_ACCURACY_CM = 0.3
    KERF_LOSS_M = 0.005

    def __init__(self, grade_matrix: List[LogGrade],
                 sort_piles: List[SortYardPile]):
        self.grades = sorted(grade_matrix, key=lambda g: -g.price_per_m3)
        self.piles = {p.pile_id: p for p in sort_piles}

    def grade_log(self, stem: StemProfile, position_start: float,
                  length: float) -> dict:
        """Grade a log segment using acoustic velocity and defect data."""
        sed = self._diameter_at(stem, position_start + length)
        led = self._diameter_at(stem, position_start)
        mid_d = self._diameter_at(stem, position_start + length / 2)
        sweep = self._calculate_sweep(stem, position_start, length, mid_d)
        volume = self._smalian_volume(sed, led, length)

        has_defect = any(
            d["position_m"] >= position_start and
            d["position_m"] <= position_start + length
            for d in stem.defects
        )

        assigned_grade = "chip"
        price = 0

        for grade in self.grades:
            if (sed >= grade.min_sed_cm and
                length >= grade.min_length_m and
                length <= grade.max_length_m and
                sweep <= grade.max_sweep_pct and
                stem.acoustic_velocity_m_s >= grade.min_acoustic_vel and
                not (has_defect and grade.grade in ["structural", "peeler"])):
                assigned_grade = grade.grade
                price = grade.price_per_m3
                break

        return {
            "grade": assigned_grade,
            "sed_cm": round(sed, 1),
            "led_cm": round(led, 1),
            "length_m": length,
            "volume_m3": round(volume, 3),
            "sweep_pct": round(sweep, 1),
            "acoustic_vel": stem.acoustic_velocity_m_s,
            "has_defect": has_defect,
            "value_usd": round(volume * price, 2)
        }

    def optimize_bucking(self, stem: StemProfile) -> dict:
        """Dynamic programming to maximize total stem value."""
        usable_length = stem.total_length_m
        positions = np.arange(0, usable_length + 0.1, 0.1)
        n = len(positions)

        # DP table: best value achievable from position i to end
        dp_value = np.zeros(n)
        dp_cuts = [[] for _ in range(n)]

        for i in range(n - 1, -1, -1):
            best_val = 0
            best_cuts = []

            for grade in self.grades:
                for length in np.arange(grade.min_length_m,
                                         min(grade.max_length_m, usable_length - positions[i]) + 0.1,
                                         0.5):
                    end_pos = positions[i] + length
                    end_idx = int(round(end_pos / 0.1))
                    if end_idx >= n:
                        continue

                    log_info = self.grade_log(stem, positions[i], length)
                    remaining = dp_value[end_idx]  # end_idx > i, so already computed this sweep
                    total = log_info["value_usd"] + remaining

                    if total > best_val:
                        best_val = total
                        best_cuts = [log_info] + dp_cuts[end_idx]

            dp_value[i] = best_val
            dp_cuts[i] = best_cuts

        naive_value = self._naive_bucking_value(stem)

        return {
            "stem_id": stem.stem_id,
            "species": stem.species,
            "total_length_m": stem.total_length_m,
            "optimized_cuts": dp_cuts[0],
            "optimized_value_usd": round(dp_value[0], 2),
            "naive_value_usd": round(naive_value, 2),
            "value_uplift_pct": round(
                (dp_value[0] - naive_value) / max(naive_value, 1) * 100, 1
            ),
            "log_count": len(dp_cuts[0])
        }

    def manage_sort_yard(self, incoming_logs: List[dict]) -> dict:
        """Assign logs to sort piles and trigger truck dispatch."""
        assignments = []
        dispatch_ready = []

        for log in incoming_logs:
            best_pile = None
            for pid, pile in self.piles.items():
                if (pile.species == log["species"] and
                    pile.grade == log["grade"] and
                    pile.current_volume_m3 < pile.target_volume_m3 * 1.2):
                    best_pile = pid
                    break

            if best_pile:
                self.piles[best_pile].current_volume_m3 += log["volume_m3"]
                assignments.append({
                    "log_id": log.get("stem_id", "unknown"),
                    "pile": best_pile,
                    "destination": self.piles[best_pile].destination_mill
                })

                pile = self.piles[best_pile]
                if pile.current_volume_m3 >= pile.target_volume_m3:
                    dispatch_ready.append({
                        "pile_id": best_pile,
                        "volume_m3": round(pile.current_volume_m3, 1),
                        "destination": pile.destination_mill,
                        "action": "dispatch_truck"
                    })
                    # reset after dispatch so the pile does not trigger a
                    # duplicate truck order on every subsequent log
                    pile.current_volume_m3 = 0.0

        return {
            "logs_sorted": len(assignments),
            "assignments": assignments,
            "dispatch_orders": dispatch_ready,
            "pile_status": {
                pid: {
                    "volume_m3": round(p.current_volume_m3, 1),
                    "fill_pct": round(p.current_volume_m3 / p.target_volume_m3 * 100, 1)
                }
                for pid, p in self.piles.items()
            }
        }

    def _diameter_at(self, stem: StemProfile, position: float) -> float:
        diam = stem.diameters_cm
        for i in range(len(diam) - 1):
            if diam[i][0] <= position <= diam[i+1][0]:
                ratio = (position - diam[i][0]) / (diam[i+1][0] - diam[i][0])
                return diam[i][1] + ratio * (diam[i+1][1] - diam[i][1])
        return diam[-1][1] if diam else 20.0

    def _calculate_sweep(self, stem, start, length, mid_d) -> float:
        expected_mid = (self._diameter_at(stem, start) +
                        self._diameter_at(stem, start + length)) / 2
        return abs(mid_d - expected_mid) / max(expected_mid, 1) * 100

    def _smalian_volume(self, sed_cm, led_cm, length_m) -> float:
        a1 = np.pi * (sed_cm / 200) ** 2
        a2 = np.pi * (led_cm / 200) ** 2
        return (a1 + a2) / 2 * length_m

    def _naive_bucking_value(self, stem: StemProfile) -> float:
        standard_length = 5.0
        value = 0
        pos = 0
        while pos + standard_length <= stem.total_length_m:
            log = self.grade_log(stem, pos, standard_length)
            value += log["value_usd"]
            pos += standard_length + self.KERF_LOSS_M
        return value
Key insight: Optimized bucking consistently delivers 8-15% more value per stem than fixed-length bucking patterns. On a harvest of 200,000 m3/year, that translates to $1.5-3M in additional revenue with zero additional fiber input. The key is real-time price matrix updates from mill demand signals, so the agent adjusts cut patterns as sawlog premiums shift relative to pulp log prices.
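The price sensitivity described above can be shown with a toy comparison (all prices and cut patterns here are illustrative assumptions, not outputs of the bucking optimizer): as the sawlog premium over pulpwood widens, the value-maximizing cut pattern flips.

```python
def pattern_value(pattern, prices):
    """Value of a cut pattern given as a list of (grade, volume_m3) pieces."""
    return sum(prices[grade] * vol for grade, vol in pattern)

# Two hypothetical ways to buck the same stem
sawlog_heavy = [("sawlog", 0.8), ("pulp", 0.3)]   # prioritize long sawlogs
pulp_heavy = [("sawlog", 0.4), ("pulp", 0.8)]     # prioritize pulpwood recovery

narrow = {"sawlog": 70.0, "pulp": 65.0}    # $/m3, narrow premium (assumed)
wide = {"sawlog": 130.0, "pulp": 55.0}     # $/m3, wide premium (assumed)

best_narrow = max([sawlog_heavy, pulp_heavy], key=lambda p: pattern_value(p, narrow))
best_wide = max([sawlog_heavy, pulp_heavy], key=lambda p: pattern_value(p, wide))
```

Under the narrow spread the pulp-heavy pattern wins; under the wide spread the sawlog-heavy pattern wins, which is why the price matrix must be refreshed from mill demand signals rather than fixed at plan time.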

4. Pulp & Paper Mill Process Control

Pulp and paper mills are among the most energy-intensive manufacturing operations, and process variability directly erodes margin. In kraft pulping, the digester must cook wood chips to a target Kappa number (a measure of residual lignin, typically 25-35 for softwood) while avoiding over-cooking that degrades fiber strength and under-cooking that increases bleach chemical consumption. The Kappa number depends on chip species mix, moisture content, liquor-to-wood ratio, cooking temperature, and H-factor (a time-temperature integral). Traditional control relies on operator experience and periodic lab samples with a 2-4 hour lag. An AI agent predicts Kappa in real time from process sensors and adjusts cooking parameters to hold the target within 2 Kappa units.
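The H-factor mentioned above can be accumulated directly from the digester temperature profile. A minimal sketch using Vroom's relative-rate form (the constants 43.2 and 16115 are the standard published values; the cooking profile is an assumption for illustration):

```python
import numpy as np

def h_factor(temps_c, dt_min=1.0):
    """Vroom H-factor: integral of the relative delignification rate
    exp(43.2 - 16115/T_K) over cooking time in hours. Rate = 1 at 100 C."""
    t_k = np.asarray(temps_c, dtype=float) + 273.15
    rates = np.exp(43.2 - 16115.0 / t_k)
    return float(rates.sum() * dt_min / 60.0)

# Assumed cook: 90 min ramp from 80 to 170 C, then 90 min at temperature
profile = np.concatenate([np.linspace(80.0, 170.0, 90), np.full(90, 170.0)])
print(f"H-factor: {h_factor(profile):.0f}")
```

Because the rate roughly doubles every 8-10 degrees near cooking temperature, almost all of the H-factor accrues in the final ramp and the at-temperature hold, which is why the agent's severity adjustments act on temperature and cook time.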

Downstream, the refining stage converts pulp into a mat of fibrillated fibers suitable for papermaking. The agent controls refiner plate gap and power to achieve a target Canadian Standard Freeness (CSF), which correlates with drainage rate on the paper machine wire. Too much refining wastes energy and produces a slow-draining sheet; too little yields a weak, porous sheet. The agent uses a transfer function model that relates specific refining energy (SRE in kWh/t) to CSF drop, accounting for incoming fiber characteristics that change with every chip supply shift.
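Specific refining energy itself is just net refiner power divided by dry-fiber throughput. A sketch of the calculation (the no-load power and a stock density of roughly 1 kg/L are assumptions):

```python
def specific_refining_energy(motor_kw, no_load_kw, flow_lpm, consistency_pct):
    """Net SRE in kWh/t: (applied - no-load) power over dry throughput.
    Assumes stock density of about 1 kg/L, so L/min ~ kg/min."""
    tonnes_per_hour = flow_lpm * 60 * (consistency_pct / 100.0) / 1000.0
    return (motor_kw - no_load_kw) / tonnes_per_hour

# Example: 1200 kW gross, 250 kW no-load, 3000 L/min at 4% consistency
sre = specific_refining_energy(1200, 250, 3000, 4.0)
```

Subtracting no-load power matters: the same gross motor load at a lower throughput delivers far more energy into the fiber, which is the quantity the CSF transfer function actually responds to.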

On the paper machine itself, cross-direction (CD) and machine-direction (MD) control maintain basis weight, moisture, and caliper within customer specifications. The CD profile is controlled by dilution headbox slice lip actuators spaced every 50-75mm across the sheet width. The agent learns the response matrix mapping each actuator to the resulting basis weight profile, then computes optimal actuator setpoints every scan cycle (2-4 seconds). Broke recovery optimization rounds out mill control: the agent decides when to recycle off-specification sheet material versus sending it to the repulper, minimizing fiber loss while maintaining product quality.

import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Optional
from collections import deque

@dataclass
class DigestorState:
    timestamp: float
    temperature_c: float
    pressure_kpa: float
    liquor_to_wood: float
    effective_alkali_pct: float
    chip_moisture_pct: float
    chip_species_mix: Dict[str, float]
    h_factor: float
    cooking_time_min: float

@dataclass
class RefinerState:
    motor_power_kw: float
    plate_gap_mm: float
    consistency_pct: float
    flow_rate_lpm: float
    inlet_csf_ml: float
    specific_energy_kwh_t: float

@dataclass
class PaperMachineState:
    speed_mpm: float
    basis_weight_gsm: List[float]   # CD profile
    moisture_pct: List[float]       # CD profile
    caliper_um: List[float]         # CD profile
    headbox_actuators: List[float]  # dilution valve positions
    steam_pressure_kpa: float
    wire_retention_pct: float

class MillProcessAgent:
    """Pulp mill digester control, refiner optimization, and paper machine CD/MD."""

    KAPPA_TARGET = 30.0
    KAPPA_TOLERANCE = 2.0
    CSF_TARGET = 450       # ml
    CSF_TOLERANCE = 25
    BASIS_WEIGHT_SPEC = 80.0  # g/m2
    MOISTURE_SPEC = 5.5       # %
    CALIPER_SPEC = 105        # micrometers

    def __init__(self):
        self.kappa_history = deque(maxlen=500)
        self.refiner_history = deque(maxlen=200)
        self.cd_response_matrix = None

    def predict_kappa(self, state: DigestorState) -> dict:
        """Predict Kappa number from digester process variables."""
        # Empirical Kappa model: Kappa = f(H-factor, EA, L:W, species)
        species_factor = sum(
            self._species_kappa_coefficient(sp) * frac
            for sp, frac in state.chip_species_mix.items()
        )
        moisture_correction = 1 + (state.chip_moisture_pct - 45) * 0.008

        predicted_kappa = (
            species_factor
            * 180 * np.exp(-0.0012 * state.h_factor)
            * (state.effective_alkali_pct / 18) ** -0.35
            * (state.liquor_to_wood / 4.0) ** -0.15
            * moisture_correction
        )

        deviation = predicted_kappa - self.KAPPA_TARGET
        action = "hold"
        adjustments = {}

        if abs(deviation) > self.KAPPA_TOLERANCE:
            if deviation > 0:  # under-cooked
                temp_increase = min(deviation * 0.8, 5.0)
                adjustments = {
                    "temperature_delta_c": round(temp_increase, 1),
                    "extend_cook_min": round(deviation * 3, 0),
                    "reason": "Kappa above target, increase cooking severity"
                }
                action = "increase_severity"
            else:  # over-cooked
                temp_decrease = min(abs(deviation) * 0.8, 5.0)
                adjustments = {
                    "temperature_delta_c": round(-temp_decrease, 1),
                    "reduce_cook_min": round(abs(deviation) * 3, 0),
                    "reason": "Kappa below target, reduce cooking severity"
                }
                action = "decrease_severity"

        result = {
            "predicted_kappa": round(predicted_kappa, 1),
            "target": self.KAPPA_TARGET,
            "deviation": round(deviation, 1),
            "h_factor": round(state.h_factor, 0),
            "action": action,
            "adjustments": adjustments,
            "confidence": "high" if state.cooking_time_min > 60 else "medium"
        }
        self.kappa_history.append(result)
        return result

    def optimize_refining(self, state: RefinerState,
                           target_csf: Optional[float] = None) -> dict:
        """Control refiner to hit CSF target with minimum energy."""
        target = target_csf or self.CSF_TARGET
        production_t_h = (state.flow_rate_lpm * state.consistency_pct / 100
                          * 60 / 1000)

        # Transfer function: delta_CSF = k * delta_SRE
        k_csf_sre = -3.2   # ml CSF per kWh/t (species-dependent)
        current_csf_estimate = state.inlet_csf_ml + k_csf_sre * state.specific_energy_kwh_t

        csf_error = current_csf_estimate - target
        action = "hold"
        adjustments = {}

        if abs(csf_error) > self.CSF_TOLERANCE:
            sre_delta = -csf_error / k_csf_sre
            new_sre = state.specific_energy_kwh_t + sre_delta
            new_power = new_sre * production_t_h

            # closing the gap intensifies refining, so gap moves opposite to SRE
            gap_delta = -sre_delta * 0.02  # mm per kWh/t
            new_gap = max(0.1, state.plate_gap_mm + gap_delta)

            adjustments = {
                "plate_gap_mm": round(new_gap, 2),
                "target_power_kw": round(new_power, 0),
                "sre_target_kwh_t": round(new_sre, 1),
                "energy_cost_delta_pct": round(sre_delta / max(state.specific_energy_kwh_t, 1) * 100, 1)
            }
            action = "increase_refining" if csf_error > 0 else "decrease_refining"

        result = {
            "estimated_csf_ml": round(current_csf_estimate, 0),
            "target_csf_ml": target,
            "csf_error": round(csf_error, 0),
            "current_sre_kwh_t": round(state.specific_energy_kwh_t, 1),
            "production_t_h": round(production_t_h, 1),
            "action": action,
            "adjustments": adjustments
        }
        self.refiner_history.append(result)
        return result

    def control_paper_machine_cd(self, state: PaperMachineState) -> dict:
        """Cross-direction profile control for basis weight, moisture, caliper."""
        n_zones = len(state.basis_weight_gsm)

        if self.cd_response_matrix is None:
            self.cd_response_matrix = self._build_response_matrix(n_zones)

        bw_errors = [bw - self.BASIS_WEIGHT_SPEC for bw in state.basis_weight_gsm]
        moisture_errors = [m - self.MOISTURE_SPEC for m in state.moisture_pct]

        bw_std = np.std(state.basis_weight_gsm)
        moisture_std = np.std(state.moisture_pct)
        caliper_std = np.std(state.caliper_um)

        # Compute actuator corrections using pseudo-inverse
        R = np.array(self.cd_response_matrix)
        bw_err_vec = np.array(bw_errors)
        corrections = -np.linalg.lstsq(R, bw_err_vec, rcond=None)[0]
        corrections = np.clip(corrections, -0.5, 0.5)

        new_actuators = [
            round(state.headbox_actuators[i] + corrections[i], 3)
            for i in range(min(len(corrections), len(state.headbox_actuators)))
        ]

        return {
            "basis_weight": {
                "mean_gsm": round(np.mean(state.basis_weight_gsm), 2),
                "std_gsm": round(bw_std, 3),
                "two_sigma_pct": round(2 * bw_std / self.BASIS_WEIGHT_SPEC * 100, 2),
                "target": self.BASIS_WEIGHT_SPEC
            },
            "moisture": {
                "mean_pct": round(np.mean(state.moisture_pct), 2),
                "std_pct": round(moisture_std, 3),
                "target": self.MOISTURE_SPEC
            },
            "caliper": {
                "mean_um": round(np.mean(state.caliper_um), 1),
                "std_um": round(caliper_std, 2),
                "target": self.CALIPER_SPEC
            },
            "actuator_corrections": [round(c, 4) for c in corrections[:10]],
            "max_correction": round(float(np.max(np.abs(corrections))), 4),
            "broke_risk_pct": round(
                min(bw_std / self.BASIS_WEIGHT_SPEC * 500, 100), 1
            )
        }

    def evaluate_broke_recovery(self, off_spec_gsm: float,
                                  off_spec_moisture: float,
                                  broke_system_capacity_pct: float) -> dict:
        """Decide whether to recycle off-spec sheet or reject it."""
        bw_deviation = abs(off_spec_gsm - self.BASIS_WEIGHT_SPEC)
        moisture_deviation = abs(off_spec_moisture - self.MOISTURE_SPEC)

        bw_score = bw_deviation / self.BASIS_WEIGHT_SPEC * 100
        moisture_score = moisture_deviation / self.MOISTURE_SPEC * 100

        if broke_system_capacity_pct > 90:
            action = "reject_to_repulper"
            reason = "Broke system near capacity"
        elif bw_score < 3 and moisture_score < 8:
            action = "blend_inline"
            blend_ratio = min(0.15, 0.05 + bw_score * 0.02)
            reason = f"Minor deviation, blend at {blend_ratio:.0%} ratio"
        elif bw_score < 8:
            action = "recycle_to_broke_chest"
            reason = "Moderate deviation, recycle through broke system"
        else:
            action = "reject_to_repulper"
            reason = "Excessive deviation, full repulping required"

        fiber_value_per_tonne = 280
        recovery_cost = 15 if action == "blend_inline" else 45 if action == "recycle_to_broke_chest" else 85

        return {
            "off_spec_bw_gsm": off_spec_gsm,
            "off_spec_moisture_pct": off_spec_moisture,
            "bw_deviation_pct": round(bw_score, 1),
            "moisture_deviation_pct": round(moisture_score, 1),
            "action": action,
            "reason": reason,
            "recovery_cost_per_tonne": recovery_cost,
            "fiber_value_saved_per_tonne": fiber_value_per_tonne - recovery_cost,
            "broke_system_load_pct": broke_system_capacity_pct
        }

    def _species_kappa_coefficient(self, species: str) -> float:
        coefficients = {
            "spruce": 1.0, "pine": 1.05, "fir": 0.98,
            "birch": 0.82, "aspen": 0.78, "eucalyptus": 0.75
        }
        return coefficients.get(species, 1.0)

    def _build_response_matrix(self, n_zones: int) -> List[List[float]]:
        """Build CD actuator-to-profile response matrix (Gaussian kernel)."""
        matrix = []
        for i in range(n_zones):
            row = []
            for j in range(n_zones):
                distance = abs(i - j)
                response = np.exp(-distance**2 / (2 * 3.0**2))
                row.append(round(response, 4))
            matrix.append(row)
        return matrix
Key insight: Real-time Kappa prediction eliminates the 2-4 hour lab sample lag that causes 30-40% of kraft pulp quality excursions. By predicting Kappa from H-factor, effective alkali, and chip moisture continuously, the agent holds cooking variability within 2 Kappa units versus 5-8 units under manual control, reducing bleach chemical cost by 10-15%.

5. Supply Chain & Fiber Logistics

The forestry supply chain is uniquely challenging because the raw material is dispersed across thousands of square kilometers of forest, varies in species, moisture, and quality by stand, and must arrive at the mill in the right species mix at the right moisture content at the right time. A typical integrated company sources from its own timberlands, Crown tenure (in Canada) or state forests, private woodlot purchases, and chip supply agreements with independent sawmills. The AI agent optimizes across this entire wood basket, balancing stumpage cost, haul cost, species mix requirements, and seasonal access constraints (spring breakup, fire closures, winter-only access roads).

Truck dispatch is the real-time execution layer of the supply chain. The agent tracks every truck via GPS, knows current inventory levels at each sort, and understands mill demand by the hour. When a truck becomes available at a landing, the agent assigns it to the sort-mill combination that minimizes total delivered cost while maintaining mill inventory targets. This replaces the radio-dispatch model where drivers call in and a dispatcher makes ad-hoc decisions, reducing empty backhaul miles by 15-25% and ensuring the mill never runs short on its critical species.

Chip pile and log yard inventory management ensures the mill has a buffer against supply disruptions without tying up excessive working capital. The agent monitors pile volumes using drone photogrammetry or LiDAR, tracks chip degradation rates (which increase with pile age and moisture), and recommends first-in-first-out rotation to minimize fiber quality loss. Fiber allocation across products decides which fiber goes to which paper grade, maximizing the spread between raw material cost and finished product value. The agent solves a linear program that allocates each fiber source to the product grade where its quality characteristics (fiber length, coarseness, brightness) command the highest premium.
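The degradation logic behind FIFO rotation is simple compounding loss. A sketch of age-adjusted pile value and consumption ordering (the 0.15 %/day loss rate and $70/m3 fiber value are illustrative assumptions):

```python
from datetime import date

DEGRADATION_PCT_DAY = 0.15  # assumed dry-matter loss rate, %/day

def pile_value(volume_m3, built_on, today, value_per_m3=70.0):
    """Remaining fiber value after compounding age-dependent loss."""
    age_days = (today - built_on).days
    remaining = volume_m3 * (1 - DEGRADATION_PCT_DAY / 100) ** age_days
    return remaining * value_per_m3

# FIFO: consume the oldest pile first so no single pile accumulates heavy loss
piles = {"north_A": date(2026, 1, 10), "north_B": date(2026, 3, 1)}
consume_order = sorted(piles, key=piles.get)
```

At 0.15 %/day a pile loses roughly 4.4% of its dry matter in a month, so letting a pile sit two months while fresher chips are consumed first can quietly erase most of the margin on that fiber.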

import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime, timedelta

@dataclass
class WoodSource:
    source_id: str
    source_type: str        # "own_timberland", "crown_tenure", "purchase", "chip_supply"
    species: str
    available_volume_m3: float
    stumpage_cost_m3: float
    haul_distance_km: float
    haul_cost_per_m3_km: float
    moisture_pct: float
    seasonal_access: List[str]   # ["jan","feb","mar","nov","dec"] for winter-only
    fiber_length_mm: float
    coarseness_mg_m: float

@dataclass
class Truck:
    truck_id: str
    capacity_m3: float
    current_lat: float
    current_lon: float
    status: str            # "loaded", "empty", "loading", "unloading"
    current_sort: Optional[str]
    eta_minutes: Optional[int]

@dataclass
class MillDemand:
    mill_id: str
    species_targets: Dict[str, float]  # {species: m3_per_day}
    chip_inventory_hours: float
    log_yard_inventory_days: float
    max_inventory_days: float
    products: List[Dict]   # [{"grade": "kraft_liner", "fiber_requirements": {...}}]

class FiberLogisticsAgent:
    """Wood basket optimization, truck dispatch, and fiber allocation."""

    HAUL_COST_PER_KM = 0.12    # $/m3/km average
    CHIP_DEGRADATION_PCT_DAY = 0.15
    MIN_INVENTORY_HOURS = 8
    TARGET_INVENTORY_HOURS = 48
    EMPTY_BACKHAUL_COST = 2.50  # $/km

    def __init__(self, sources: List[WoodSource], trucks: List[Truck],
                 mill: MillDemand, current_month: str = "mar"):
        self.sources = {s.source_id: s for s in sources}
        self.trucks = {t.truck_id: t for t in trucks}
        self.mill = mill
        self.current_month = current_month

    def optimize_wood_basket(self, monthly_demand_m3: float) -> dict:
        """Select wood sources to minimize delivered cost while meeting species mix."""
        eligible = []
        for sid, src in self.sources.items():
            if src.seasonal_access and self.current_month not in src.seasonal_access:
                continue  # empty access list means year-round access
            delivered_cost = (src.stumpage_cost_m3 +
                              src.haul_distance_km * src.haul_cost_per_m3_km)
            eligible.append({
                "source_id": sid,
                "species": src.species,
                "available_m3": src.available_volume_m3,
                "delivered_cost_m3": round(delivered_cost, 2),
                "haul_km": src.haul_distance_km,
                "fiber_length_mm": src.fiber_length_mm
            })

        eligible.sort(key=lambda s: s["delivered_cost_m3"])

        allocation = []
        remaining = monthly_demand_m3
        species_allocated = {}
        total_cost = 0

        for source in eligible:
            if remaining <= 0:
                break
            take = min(source["available_m3"], remaining)
            allocation.append({
                "source_id": source["source_id"],
                "species": source["species"],
                "volume_m3": round(take, 0),
                "delivered_cost_m3": source["delivered_cost_m3"],
                "total_cost": round(take * source["delivered_cost_m3"], 0)
            })
            species_allocated[source["species"]] = (
                species_allocated.get(source["species"], 0) + take
            )
            total_cost += take * source["delivered_cost_m3"]
            remaining -= take

        species_targets = self.mill.species_targets
        species_balance = {}
        for sp, target in species_targets.items():
            actual = species_allocated.get(sp, 0)
            monthly_target = target * 30
            species_balance[sp] = {
                "target_m3": round(monthly_target, 0),
                "allocated_m3": round(actual, 0),
                "balance_pct": round(actual / max(monthly_target, 1) * 100, 1)
            }

        return {
            "monthly_demand_m3": monthly_demand_m3,
            "total_allocated_m3": round(monthly_demand_m3 - remaining, 0),
            "shortfall_m3": round(max(remaining, 0), 0),
            "weighted_avg_cost_m3": round(
                total_cost / max(monthly_demand_m3 - remaining, 1), 2
            ),
            "total_cost": round(total_cost, 0),
            "allocation": allocation,
            "species_balance": species_balance,
            "sources_used": len(allocation)
        }

    def dispatch_truck(self, truck_id: str) -> dict:
        """Assign empty truck to optimal pickup-delivery route."""
        truck = self.trucks[truck_id]

        best_assignment = None
        best_cost = float("inf")

        for sid, source in self.sources.items():
            if source.seasonal_access and self.current_month not in source.seasonal_access:
                continue  # empty access list means year-round access
            if source.available_volume_m3 < truck.capacity_m3:
                continue

            pickup_distance = self._estimate_distance(
                truck.current_lat, truck.current_lon, sid
            )
            delivery_distance = source.haul_distance_km
            total_distance = pickup_distance + delivery_distance

            load_value = truck.capacity_m3 * source.stumpage_cost_m3
            haul_cost = total_distance * self.HAUL_COST_PER_KM * truck.capacity_m3
            empty_cost = pickup_distance * self.EMPTY_BACKHAUL_COST

            species = source.species
            mill_need = self.mill.species_targets.get(species, 0)
            urgency = 2.0 if self.mill.chip_inventory_hours < self.MIN_INVENTORY_HOURS else 1.0
            species_bonus = mill_need * urgency * -50

            cost = haul_cost + empty_cost + species_bonus

            if cost < best_cost:
                best_cost = cost
                best_assignment = {
                    "truck_id": truck_id,
                    "pickup_source": sid,
                    "species": species,
                    "volume_m3": truck.capacity_m3,
                    "pickup_distance_km": round(pickup_distance, 1),
                    "delivery_distance_km": round(delivery_distance, 1),
                    "estimated_cost": round(haul_cost + empty_cost, 0),
                    "estimated_arrival_hours": round(
                        (pickup_distance + delivery_distance) / 50, 1
                    ),
                    "mill_inventory_hours": self.mill.chip_inventory_hours,
                    "urgency": "critical" if urgency > 1 else "normal"
                }

        return best_assignment or {"truck_id": truck_id, "action": "wait", "reason": "No suitable source"}

    def allocate_fiber_to_products(self) -> dict:
        """Assign fiber sources to paper grades for maximum margin."""
        allocations = []
        total_margin = 0

        products_sorted = sorted(
            self.mill.products,
            key=lambda p: p.get("selling_price_t", 0),
            reverse=True
        )

        used_sources = set()
        for product in products_sorted:
            grade = product.get("grade", "unknown")
            min_fiber_length = product.get("min_fiber_length_mm", 0)
            demand_m3 = product.get("monthly_demand_m3", 0)

            allocated = 0
            for sid, src in self.sources.items():
                if sid in used_sources:
                    continue
                if src.fiber_length_mm < min_fiber_length:
                    continue

                take = min(src.available_volume_m3, demand_m3 - allocated)
                if take <= 0:
                    continue

                delivered_cost = src.stumpage_cost_m3 + src.haul_distance_km * src.haul_cost_per_m3_km
                # convert $/t selling price to $/m3 via an assumed yield factor
                product_value = product.get("selling_price_t", 800) * 0.45
                margin = product_value - delivered_cost

                allocations.append({
                    "product_grade": grade,
                    "source_id": sid,
                    "species": src.species,
                    "volume_m3": round(take, 0),
                    "delivered_cost_m3": round(delivered_cost, 2),
                    "product_value_m3": round(product_value, 2),
                    "margin_m3": round(margin, 2)
                })
                total_margin += margin * take
                allocated += take
                # consume the source so it can't be double-counted for lower grades
                src.available_volume_m3 -= take
                if src.available_volume_m3 <= 0:
                    used_sources.add(sid)

        return {
            "allocations": allocations,
            "total_monthly_margin": round(total_margin, 0),
            "products_allocated": len(set(a["product_grade"] for a in allocations)),
            "sources_utilized": len(set(a["source_id"] for a in allocations))
        }

    def _estimate_distance(self, lat, lon, source_id) -> float:
        # Placeholder for road-network routing: a stable pseudo-distance per source.
        # Built-in hash() is salted per process, so seed from the characters instead.
        rng = np.random.default_rng(sum(ord(c) for c in source_id))
        return 15 + float(rng.uniform(0, 80))
Key insight: Wood basket optimization with seasonal access constraints prevents the most costly supply chain failure in northern forestry: running out of accessible wood during spring breakup when 40-60% of roads are load-restricted. The agent pre-builds inventory from winter-access sources in Q1 to carry the mill through the 6-8 week breakup period without production curtailment.
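The pre-build volume in that scenario is straightforward arithmetic. A sketch with assumed parameters (the breakup length, the fraction of sources that stay accessible, and the safety factor are all illustrative):

```python
def breakup_prebuild_m3(daily_demand_m3, breakup_weeks=7,
                        accessible_fraction=0.45, safety_factor=1.1):
    """Inventory to stockpile before load restrictions begin: demand during
    breakup not covered by still-accessible sources, plus a buffer."""
    shortfall = daily_demand_m3 * breakup_weeks * 7 * (1 - accessible_fraction)
    return shortfall * safety_factor

# A mill consuming 1,500 m3/day would pre-build roughly 44,000 m3
prebuild = breakup_prebuild_m3(1500)
```

The agent's version of this calculation updates the accessible fraction continuously from road-restriction forecasts rather than using a fixed assumption.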

6. ROI Analysis: Integrated Forestry Company

The business case for AI agents in forestry and paper must account for improvements across the entire value chain, from standing timber through finished product. Below is a detailed breakdown for a mid-size integrated company managing 200,000 hectares of timberland, harvesting 500,000 m3/year, and operating a kraft pulp mill producing 300,000 tonnes of market pulp and a paper machine producing 150,000 tonnes of containerboard annually.

Assumptions

Category | Improvement | Annual Savings (Company)
Forest Inventory Efficiency | 80% reduction in cruising cost, better volume estimates | $1,200,000 - $1,800,000
Harvest Planning Optimization | 5-8% NPV improvement on harvest schedule | $2,250,000 - $3,600,000
Bucking & Log Grade Optimization | 8-15% value recovery uplift per stem | $3,600,000 - $6,750,000
Pulp Mill Process Control | Kappa variability reduction, 10-15% bleach savings | $2,800,000 - $4,200,000
Paper Machine CD/MD Control | 50% broke reduction, tighter caliper specs | $3,200,000 - $5,400,000
Refiner Energy Optimization | 8-12% specific energy reduction | $1,400,000 - $2,100,000
Transportation & Logistics | 15-25% empty mile reduction, optimal dispatch | $1,800,000 - $3,000,000
Fiber Allocation & Carbon Credits | Species-to-product optimization, verified carbon offsets | $1,750,000 - $3,150,000
Total Annual Savings | | $18,000,000 - $30,000,000

Implementation Cost vs. Return

from dataclasses import dataclass
from typing import Dict

@dataclass
class ForestryROIModel:
    """Calculate ROI for AI agent deployment across integrated forestry company."""

    timberland_ha: int = 200000
    annual_harvest_m3: int = 500000
    pulp_production_t: int = 300000
    paper_production_t: int = 150000
    avg_stumpage_m3: float = 45.0
    avg_delivered_cost_m3: float = 72.0
    pulp_price_t: float = 680.0
    paper_price_t: float = 850.0
    truck_fleet_size: int = 45
    annual_energy_cost: float = 28_000_000
    cruising_cost_ha: float = 12.0

    def calculate_inventory_savings(self, lidar_cost_ha: float = 2.5,
                                      volume_accuracy_gain_pct: float = 0.08) -> dict:
        cruising_current = self.timberland_ha * self.cruising_cost_ha / 5  # 5yr cycle
        lidar_cost = self.timberland_ha * lidar_cost_ha / 5
        inventory_savings = cruising_current - lidar_cost
        volume_accuracy_value = (self.annual_harvest_m3 *
                                  self.avg_stumpage_m3 * volume_accuracy_gain_pct)
        return {
            "cruising_cost_saved": round(inventory_savings, 0),
            "volume_accuracy_value": round(volume_accuracy_value, 0),
            "total": round(inventory_savings + volume_accuracy_value, 0)
        }

    def calculate_harvest_optimization(self, npv_gain_pct: float = 0.065) -> dict:
        harvest_revenue = self.annual_harvest_m3 * self.avg_stumpage_m3
        npv_gain = harvest_revenue * npv_gain_pct
        road_savings = self.annual_harvest_m3 * 0.8  # $/m3 road optimization
        return {
            "npv_improvement": round(npv_gain, 0),
            "road_savings": round(road_savings, 0),
            "total": round(npv_gain + road_savings, 0)
        }

    def calculate_bucking_value(self, uplift_pct: float = 0.11) -> dict:
        current_value = self.annual_harvest_m3 * self.avg_stumpage_m3
        uplift = current_value * uplift_pct
        return {
            "current_stem_value": round(current_value, 0),
            "optimized_value": round(current_value + uplift, 0),
            "value_uplift": round(uplift, 0),
            "uplift_pct": round(uplift_pct * 100, 1)
        }

    def calculate_mill_savings(self, kappa_bleach_savings_pct: float = 0.12,
                                 broke_reduction_pct: float = 0.50,
                                 energy_reduction_pct: float = 0.10) -> dict:
        bleach_cost = self.pulp_production_t * 35  # $/t bleach chemicals
        bleach_savings = bleach_cost * kappa_bleach_savings_pct

        broke_rate = 0.04  # 4% current broke rate
        broke_value = (self.paper_production_t * broke_rate *
                        self.paper_price_t * 0.3)  # 30% value loss
        broke_savings = broke_value * broke_reduction_pct

        energy_savings = self.annual_energy_cost * energy_reduction_pct

        return {
            "bleach_savings": round(bleach_savings, 0),
            "broke_savings": round(broke_savings, 0),
            "energy_savings": round(energy_savings, 0),
            "total": round(bleach_savings + broke_savings + energy_savings, 0)
        }

    def calculate_logistics_savings(self, empty_mile_reduction: float = 0.20,
                                      dispatch_efficiency: float = 0.12) -> dict:
        annual_haul_cost = self.annual_harvest_m3 * (
            self.avg_delivered_cost_m3 - self.avg_stumpage_m3
        )
        # assume ~15% of total haul cost is empty running before optimization
        empty_mile_savings = annual_haul_cost * 0.15 * empty_mile_reduction
        dispatch_savings = annual_haul_cost * dispatch_efficiency
        return {
            "empty_mile_savings": round(empty_mile_savings, 0),
            "dispatch_optimization": round(dispatch_savings, 0),
            "total": round(empty_mile_savings + dispatch_savings, 0)
        }

    def full_roi_analysis(self) -> dict:
        inventory = self.calculate_inventory_savings()
        harvest = self.calculate_harvest_optimization()
        bucking = self.calculate_bucking_value()
        mill = self.calculate_mill_savings()
        logistics = self.calculate_logistics_savings()

        setup_cost = 850000
        annual_license = 480000
        annual_support = 220000
        lidar_annual = self.timberland_ha * 2.5 / 5
        total_annual_cost = annual_license + annual_support + lidar_annual
        total_year1_cost = setup_cost + total_annual_cost

        total_annual_benefit = (
            inventory["total"] + harvest["total"] +
            bucking["value_uplift"] + mill["total"] +
            logistics["total"]
        )

        roi_year1 = ((total_annual_benefit - total_year1_cost)
                      / total_year1_cost) * 100
        roi_year2 = ((total_annual_benefit - total_annual_cost)
                      / total_annual_cost) * 100
        payback_months = (total_year1_cost / total_annual_benefit) * 12

        return {
            "company_profile": {
                "timberland_ha": self.timberland_ha,
                "annual_harvest_m3": self.annual_harvest_m3,
                "pulp_tonnes": self.pulp_production_t,
                "paper_tonnes": self.paper_production_t
            },
            "annual_benefits": {
                "forest_inventory": inventory["total"],
                "harvest_optimization": harvest["total"],
                "bucking_value_recovery": bucking["value_uplift"],
                "mill_process_control": mill["total"],
                "logistics_optimization": logistics["total"],
                "total": round(total_annual_benefit, 0)
            },
            "costs": {
                "year_1_total": round(total_year1_cost, 0),
                "annual_recurring": round(total_annual_cost, 0),
                "lidar_annual": round(lidar_annual, 0)
            },
            "returns": {
                "roi_year_1_pct": round(roi_year1, 0),
                "roi_year_2_pct": round(roi_year2, 0),
                "payback_months": round(payback_months, 1),
                "net_benefit_year_1": round(
                    total_annual_benefit - total_year1_cost, 0
                ),
                "five_year_net": round(
                    total_annual_benefit * 5 - total_year1_cost -
                    total_annual_cost * 4, 0
                )
            }
        }

# Run the analysis
model = ForestryROIModel()
results = model.full_roi_analysis()

print(f"Company: {results['company_profile']['timberland_ha']:,} ha timberland")
print(f"Total Annual Benefits: ${results['annual_benefits']['total']:,.0f}")
print(f"Year 1 Cost: ${results['costs']['year_1_total']:,.0f}")
print(f"Year 1 ROI: {results['returns']['roi_year_1_pct']:.0f}%")
print(f"Year 2 ROI: {results['returns']['roi_year_2_pct']:.0f}%")
print(f"Payback Period: {results['returns']['payback_months']} months")
print(f"5-Year Net Benefit: ${results['returns']['five_year_net']:,.0f}")

Bottom line: An integrated forestry company investing roughly $1.95M in year one (setup plus annual costs, including LiDAR) can expect $18-30M in annual benefits across the value chain, yielding a payback period under two months and a year-2 ROI exceeding 1,500%. The largest contributors are bucking optimization and mill process control, which together account for over 50% of total savings. Even under the most conservative estimates, the investment pays for itself within the first quarter.

Getting Started: Implementation Roadmap

Deploying AI agents across an integrated forestry and paper operation works best as a phased approach, starting with the modules that deliver the fastest ROI and require the least integration complexity:

  1. Months 1-2: LiDAR inventory and growth modeling. Fly your highest-priority management units. Calibrate allometric models with existing ground plots. Deliver wall-to-wall volume and species maps that immediately improve harvest planning inputs.
  2. Months 3-4: Bucking optimization and log scaling. Deploy profile cameras and acoustic sensors at the primary landing or sort yard. Connect to the real-time price matrix from your marketing team. Measure value recovery uplift against historical stem-level data.
  3. Months 5-7: Mill process control. Integrate the Kappa prediction model with existing DCS sensors. Deploy refiner optimization on one line. Roll out CD/MD control enhancements on the paper machine. Track broke rate reduction and chemical savings.
  4. Months 8-10: Supply chain and truck dispatch. Install GPS tracking across the truck fleet. Connect mill demand signals to dispatch logic. Implement wood basket optimization for the next quarter's procurement plan.
  5. Months 11-12: Full integration and harvest planning. Connect all modules into a unified fiber-to-product planning system. Generate the first AI-optimized 5-year harvest schedule. Deploy carbon stock reporting for ESG and offset programs.
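The phased rollout above can be sketched as a simple schedule structure for tracking which modules are live in any given month. The module names and month ranges mirror the roadmap; the `depends_on` fields are illustrative assumptions about sequencing, not requirements from any module above:

```python
from dataclasses import dataclass, field

@dataclass
class RolloutPhase:
    """One phase of the AI agent deployment roadmap."""
    name: str
    start_month: int
    end_month: int
    depends_on: list = field(default_factory=list)  # prerequisite phase names

ROADMAP = [
    RolloutPhase("lidar_inventory", 1, 2),
    RolloutPhase("bucking_optimization", 3, 4),
    RolloutPhase("mill_process_control", 5, 7),
    RolloutPhase("supply_chain_dispatch", 8, 10),
    RolloutPhase("full_integration", 11, 12,
                 depends_on=["lidar_inventory", "bucking_optimization",
                             "mill_process_control", "supply_chain_dispatch"]),
]

def active_phases(month: int) -> list:
    """Return the names of phases in progress during a given month."""
    return [p.name for p in ROADMAP if p.start_month <= month <= p.end_month]

print(active_phases(6))   # mill process control runs during months 5-7
```

A structure like this makes it easy to drive status reporting or to verify that a phase's prerequisites finished before it starts.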

The key to adoption in forestry is respecting the expertise of foresters, operators, and mill engineers. The AI agent surfaces recommendations and quantifies trade-offs, but the registered professional forester signs the harvest plan, the mill superintendent approves process changes, and the truck dispatcher retains override authority. This advisory model builds trust and ensures regulatory compliance while capturing the full economic benefit of AI-driven optimization.
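One way to enforce that advisory model in software is a hard approval gate: the agent emits recommendations, but nothing executes until the designated human role signs off. A minimal sketch of the pattern, with hypothetical class and role names rather than any API from the modules above:

```python
from dataclasses import dataclass
from typing import Optional

# Map each recommendation type to the human role that must approve it
APPROVER_ROLES = {
    "harvest_plan": "registered_professional_forester",
    "process_change": "mill_superintendent",
    "truck_dispatch": "dispatcher",
}

@dataclass
class Recommendation:
    kind: str                       # e.g. "harvest_plan"
    summary: str
    approved_by: Optional[str] = None

    def approve(self, role: str) -> None:
        # Only the mapped role may sign off on this recommendation type
        if role != APPROVER_ROLES[self.kind]:
            raise PermissionError(f"{role} cannot approve a {self.kind}")
        self.approved_by = role

    def execute(self) -> str:
        # Hard gate: unapproved recommendations never execute
        if self.approved_by is None:
            raise RuntimeError("recommendation requires human approval")
        return f"executing {self.kind}: {self.summary}"

rec = Recommendation("harvest_plan", "harvest stands 12 and 14 in Q3")
rec.approve("registered_professional_forester")
print(rec.execute())
```

Keeping the gate in the execution path, rather than in the UI, ensures the override authority survives integrations and automation added later.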

Build Your Own AI Agents with the Playbook

Step-by-step templates, architecture patterns, and deployment checklists for building production AI agents in any industry.

Get the Playbook — $19