# Source code for prorca.pathway

# pathway.py
import numpy as np
import pandas as pd
import networkx as nx
from typing import Dict, List, Tuple

import dowhy.gcm as gcm
from dowhy.gcm.anomaly_scorers import MedianCDFQuantileScorer
from dowhy.gcm import anomaly, attribute_anomalies

from graphviz import Digraph
from IPython.display import Image, display


class ScmBuilder:
    """
    A builder class to construct a Structural Causal Model (SCM) from a given
    set of edges (and optionally nodes). It also provides a visualization of
    the causal graph if desired.

    Parameters
    ----------
    edges : list of tuple
        List of edges in the format (source, target) representing causal
        relationships.
    nodes : list of str, optional
        List of nodes to include in the graph. If not provided, nodes are
        automatically inferred from edges.
    visualize : bool, default False
        Whether to visualize the constructed causal graph using Graphviz.
    viz_filename : str, default "dag_relationships"
        The base filename (without extension) to use for saving the graph
        visualization.
    random_seed : int, default 0
        Random seed for reproducibility when building and fitting the SCM.
    """

    def __init__(
        self,
        edges,
        nodes=None,
        visualize: bool = False,
        viz_filename: str = "dag_relationships",
        random_seed: int = 0,
    ):
        self.edges = edges
        self.nodes = nodes
        self.visualize = visualize
        self.viz_filename = viz_filename
        self.random_seed = random_seed
        # Both are populated lazily by build_graph() / build_scm().
        self.causal_graph = None
        self.scm = None

    def build_graph(self):
        """
        Build a networkx directed graph (DiGraph) from the provided nodes and
        edges.

        Returns
        -------
        causal_graph : nx.DiGraph
            The constructed causal graph.
        """
        graph = nx.DiGraph()

        # If a list of nodes is provided, add them explicitly so that nodes
        # without any edge are still part of the graph.
        if self.nodes:
            graph.add_nodes_from(self.nodes)

        # Adding edges also adds any endpoint not already in the graph.
        graph.add_edges_from(self.edges)

        # Publish the graph only once it is complete, so a failure above never
        # leaves a half-built graph on the instance.
        self.causal_graph = graph
        return self.causal_graph

    def visualize_graph(self):
        """
        Visualize the causal graph using Graphviz.

        The drawing is produced from the built graph rather than from the raw
        ``edges`` argument, so nodes that were supplied explicitly but have no
        edge are drawn as well. Node identifiers are converted with ``str()``
        before being handed to Graphviz.

        Returns
        -------
        image : IPython.display.Image
            The rendered image of the graph (useful in notebook environments).

        Raises
        ------
        ValueError
            If the causal graph has not yet been built.
        """
        if self.causal_graph is None:
            raise ValueError("Causal graph not built. Please call build_graph() first.")

        dag = Digraph(format="png", engine="dot")

        # Declare every node first: isolated nodes have no edge that would
        # otherwise introduce them into the drawing.
        for node in self.causal_graph.nodes:
            dag.node(str(node))

        # Iterating the graph's edges always yields (source, target) pairs,
        # even when the original edge tuples carried attribute dicts.
        for source, target in self.causal_graph.edges:
            dag.edge(str(source), str(target))

        # Render and save the graph image; view=True will open it in a default viewer
        dag.render(self.viz_filename, view=True)

        # Return the image object for inline display in notebooks
        return Image(f"{self.viz_filename}.png")

    def build_scm(self, df=None):
        """
        Build the Structural Causal Model (SCM) from the causal graph.

        If a DataFrame is provided, the method will automatically assign
        causal mechanisms and fit the model. The causal graph is built first
        if ``build_graph()`` has not been called yet.

        Parameters
        ----------
        df : pd.DataFrame, optional
            The data to use for automatically assigning generative models to
            each node and fitting the SCM.

        Returns
        -------
        scm : gcm.StructuralCausalModel
            The constructed (and possibly fitted) SCM.
        """
        # Set the random seed for reproducibility
        gcm.util.general.set_random_seed(self.random_seed)

        if self.causal_graph is None:
            self.build_graph()

        # Create the SCM from the causal graph
        self.scm = gcm.StructuralCausalModel(self.causal_graph)

        # If a DataFrame is provided, perform auto-assignment and fit the model
        if df is not None:
            print("Automatically assigning causal mechanisms...")
            auto_assignment_summary = gcm.auto.assign_causal_mechanisms(
                self.scm, df, gcm.auto.AssignmentQuality.BETTER
            )
            print("Fitting the Structural Causal Model...")
            gcm.fit(self.scm, df)
            print(auto_assignment_summary)

        return self.scm

    def build(self, df=None):
        """
        Convenience method to build the causal graph, optionally visualize it,
        and then construct the SCM (with optional auto-assignment and fitting
        if data is provided).

        Parameters
        ----------
        df : pd.DataFrame, optional
            The data to use for automatically assigning causal mechanisms and
            fitting the SCM.

        Returns
        -------
        scm : gcm.StructuralCausalModel
            The final Structural Causal Model.
        """
        self.build_graph()
        if self.visualize:
            # The returned Image is intentionally not propagated; rendering
            # already writes the file and opens the viewer.
            self.visualize_graph()
        return self.build_scm(df)
class CausalRootCauseAnalyzer:
    """
    Advanced root cause analyzer combining structural and noise-based approaches.

    Parameters
    ----------
    scm : object
        The causal model to analyse. The analyzer reads ``scm.graph`` (for
        ``nodes()`` and ``predecessors()``), calls ``scm.causal_mechanism(node)``
        and passes the model to DoWhy's ``attribute_anomalies`` -- presumably a
        fitted ``gcm.StructuralCausalModel``.
    min_score_threshold : float, default 0.8
        Minimum combined score a node needs to terminate a root cause path.
        Intermediate nodes are accepted at 70% of this value.
    date_column : str, default "ORDERDATE"
        Name of the column in the analysed DataFrame that is matched against
        the anomaly dates.
    """

    def __init__(
        self,
        scm,
        min_score_threshold: float = 0.8,
        date_column: str = "ORDERDATE",
    ):
        self.scm = scm
        self.min_score_threshold = min_score_threshold
        self.date_column = date_column
        # Both are filled in by analyze() / analyze_by_date().
        self.noise_contributions = None
        self.node_scores = None

    def _anomaly_mask(self, df_agg: pd.DataFrame, anomaly_dates) -> pd.Series:
        """Return a boolean Series marking the rows whose date is an anomaly date."""
        return df_agg[self.date_column].isin(anomaly_dates)

    def _calculate_noise_contributions(
        self, df_agg: pd.DataFrame, anomaly_dates
    ) -> Dict[str, np.ndarray]:
        """
        Calculate noise-based contributions for each node using DoWhy's attribution.

        Every node of the graph is used as the attribution target in turn. If
        the attribution fails for a node, a warning is printed and the node
        receives the placeholder ``np.array([0.0])`` so that the rest of the
        analysis can still run.
        """
        noise_contributions = {}
        anomaly_samples = df_agg[self._anomaly_mask(df_agg, anomaly_dates)]

        for node in self.scm.graph.nodes():
            try:
                # Use DoWhy's attribute_anomalies for each node
                contributions = attribute_anomalies(
                    causal_model=self.scm,
                    target_node=node,
                    anomaly_samples=anomaly_samples,
                    anomaly_scorer=MedianCDFQuantileScorer(),
                    attribute_mean_deviation=True,
                    num_distribution_samples=5000,
                )
                # A dict result is stacked into one array with one row per
                # dict entry (in dict order).
                if isinstance(contributions, dict):
                    contributions = np.array(list(contributions.values()))
                noise_contributions[node] = contributions
            except Exception as e:
                # Deliberately best-effort: one failing node must not abort
                # the whole analysis.
                print(
                    f"Warning: Could not calculate noise contribution for {node}: {e}"
                )
                noise_contributions[node] = np.array([0.0])  # Default value

        return noise_contributions

    def _calculate_structural_scores(
        self, df_agg: pd.DataFrame, anomaly_dates
    ) -> Dict[str, np.ndarray]:
        """
        Calculate structural anomaly scores using conditional mechanisms.

        Only nodes that have both a causal mechanism and at least one parent
        are scored; nodes without parents get no entry in the result.
        """
        node_scores = {}
        # The row selection does not depend on the node, so compute it once.
        anomaly_rows = self._anomaly_mask(df_agg, anomaly_dates).to_numpy()

        for node in self.scm.graph.nodes():
            mechanism = self.scm.causal_mechanism(node)
            parents = list(self.scm.graph.predecessors(node))
            if mechanism and parents:
                scores = anomaly.conditional_anomaly_scores(
                    parent_samples=df_agg[parents].values,
                    target_samples=df_agg[node].values,
                    causal_mechanism=mechanism,
                    num_samples_conditional=10000,
                )
                node_scores[node] = scores[anomaly_rows]

        return node_scores

    def _calculate_combined_score(self, node: str) -> float:
        """
        Combine a node's structural and noise-based evidence into one score.

        The score is ``0.7 * mean(structural scores) + 0.3 * max(|noise
        contributions|)``. Nodes that lack either kind of evidence, have empty
        arrays, or produce a NaN result score 0.0.
        """
        structural_by_node = self.node_scores or {}
        noise_by_node = self.noise_contributions or {}
        if node not in structural_by_node or node not in noise_by_node:
            return 0.0

        try:
            structural = np.asarray(structural_by_node[node], dtype=float)
            noise = np.asarray(noise_by_node[node], dtype=float)
        except (TypeError, ValueError) as e:
            print(f"Warning: Error calculating combined score for {node}: {e}")
            return 0.0

        # Empty arrays (e.g. no row matched the anomaly dates) carry no evidence.
        if structural.size == 0 or noise.size == 0:
            return 0.0

        structural_score = float(structural.mean())
        noise_score = float(np.max(np.abs(noise)))

        # Weighted combination favoring structural scores
        combined_score = 0.7 * structural_score + 0.3 * noise_score
        return 0.0 if np.isnan(combined_score) else combined_score

    def _find_root_cause_paths(self, start_node: str) -> List[List[Tuple[str, float]]]:
        """
        Find paths from ``start_node`` up to likely root causes.

        The graph is walked from ``start_node`` towards its ancestors with a
        depth-first search. A parent is followed when its combined score is at
        least 70% of ``min_score_threshold``. A path is reported when its last
        node has no parent that qualifies and that last node's own score
        reaches ``min_score_threshold``.

        A node is only skipped when it already lies on the path currently
        being extended (cycle guard). Nodes shared by several branches are
        therefore explored on every branch, so a node is never reported as a
        root cause merely because its significant parent was reached through
        a sibling branch first. Every qualifying path is enumerated, which can
        grow quickly on densely connected graphs.

        Returns
        -------
        list of list of (node, combined_score)
            Each path starts at ``start_node`` and ends at the root cause.
        """
        relaxed_threshold = self.min_score_threshold * 0.7
        score_cache = {}

        def score_of(node):
            # Scores do not change during the search; compute each one once.
            if node not in score_cache:
                score_cache[node] = self._calculate_combined_score(node)
            return score_cache[node]

        all_paths = []
        stack = [(start_node, [(start_node, score_of(start_node))])]

        while stack:
            current_node, current_path = stack.pop()
            nodes_on_path = {name for name, _ in current_path}

            found_significant_parent = False
            for parent in self.scm.graph.predecessors(current_node):
                if parent in nodes_on_path:
                    continue
                parent_score = score_of(parent)
                # More lenient threshold for intermediate nodes
                if parent_score >= relaxed_threshold:
                    found_significant_parent = True
                    stack.append((parent, current_path + [(parent, parent_score)]))

            # No parent worth following (or no parents at all): this node is
            # the end of the path and must itself be significant.
            if (
                not found_significant_parent
                and current_path[-1][1] >= self.min_score_threshold
            ):
                all_paths.append(current_path)

        return all_paths

    def _analyze_dates(
        self,
        df_agg: pd.DataFrame,
        anomaly_dates,
        start_node: str,
        show_progress: bool,
    ) -> Dict:
        """Run the scoring, path search and ranking for one set of anomaly dates."""
        if show_progress:
            print("Calculating noise-based contributions...")
        self.noise_contributions = self._calculate_noise_contributions(
            df_agg, anomaly_dates
        )

        if show_progress:
            print("Calculating structural anomaly scores...")
        self.node_scores = self._calculate_structural_scores(df_agg, anomaly_dates)

        if show_progress:
            print("\nIdentifying root cause paths...")
        paths = self._find_root_cause_paths(start_node)

        # Calculate path significance using combined metrics
        path_scores = [
            (path, self._calculate_path_significance(path, path[-1][0]))
            for path in paths
        ]

        # Sort by significance
        sorted_paths = sorted(path_scores, key=lambda x: x[1], reverse=True)
        self._print_analysis_results(sorted_paths)

        return {
            "paths": sorted_paths,
            "node_scores": self.node_scores,
            "noise_contributions": self.noise_contributions,
        }

    def analyze(
        self,
        df_agg: pd.DataFrame,
        anomaly_dates,
        start_node: str = "PROFIT_MARGIN",
    ) -> Dict:
        """
        Main analysis method combining all approaches.

        Parameters
        ----------
        df_agg : pd.DataFrame
            The aggregated data; must contain the configured date column and
            one column per scored node.
        anomaly_dates : list-like
            Values of the date column that mark anomalous rows.
        start_node : str, default 'PROFIT_MARGIN'
            The starting node for the root cause analysis.

        Returns
        -------
        dict
            ``'paths'`` (list of ``(path, significance)`` sorted by descending
            significance), ``'node_scores'`` and ``'noise_contributions'``.
        """
        return self._analyze_dates(
            df_agg, anomaly_dates, start_node, show_progress=True
        )

    def analyze_by_date(
        self,
        df_agg: pd.DataFrame,
        anomaly_dates,
        start_node: str = "PROFIT_MARGIN",
    ) -> Dict:
        """
        Run the analysis separately for each anomaly date so that date-specific
        root causes are captured.

        Parameters
        ----------
        df_agg : pd.DataFrame
            The aggregated data containing the configured date column
            ('ORDERDATE' by default).
        anomaly_dates : iterable
            An iterable of anomaly dates (e.g., a list or DatetimeIndex).
        start_node : str, default 'PROFIT_MARGIN'
            The starting node for the root cause analysis.

        Returns
        -------
        results : dict
            A dictionary where each key is an anomaly date and the value is the
            analysis result for that date.
        """
        results = {}
        for ad in anomaly_dates:
            print(f"\n--- Analyzing anomaly date: {ad} ---")
            # Wrap the single date in a list so that .isin works.
            results[ad] = self._analyze_dates(
                df_agg, [ad], start_node, show_progress=False
            )
        return results

    def _calculate_path_significance(
        self, path: List[Tuple[str, float]], root_node: str
    ) -> float:
        """
        Calculate path significance using causal consistency metrics.

        The result is ``0.7 * mean(noise contributions of root_node) + 0.3 *
        consistency``, where consistency comes from
        ``_evaluate_causal_consistency``. NaN or missing components count as 0.
        """
        # Get noise contribution of root node
        root_values = np.asarray(self.noise_contributions.get(root_node, [0]))
        root_noise = float(np.mean(root_values)) if root_values.size else 0.0
        if np.isnan(root_noise):
            root_noise = 0.0

        # Calculate path consistency
        path_nodes = [node for node, _ in path]
        consistency_score = self._evaluate_causal_consistency(path_nodes)

        # Fallback to 0 if consistency_score is NaN
        if np.isnan(consistency_score):
            consistency_score = 0.0

        # Combine scores with theoretical weights
        return 0.7 * root_noise + 0.3 * consistency_score

    @staticmethod
    def _per_sample_series(values) -> np.ndarray:
        """
        Reduce a node's noise contributions to one value per anomaly sample.

        1-D input is returned unchanged. Higher-dimensional input is summed
        over all leading axes, keeping the last axis.
        NOTE(review): this assumes the last axis indexes the anomaly samples,
        which holds for the row-per-dict-entry arrays built in
        ``_calculate_noise_contributions`` -- confirm for other producers.
        """
        arr = np.asarray(values, dtype=float)
        if arr.ndim == 0:
            return arr.reshape(1)
        if arr.ndim == 1:
            return arr
        return arr.reshape(-1, arr.shape[-1]).sum(axis=0)

    def _evaluate_causal_consistency(self, path_nodes: List[str]) -> float:
        """
        Evaluate causal consistency of the path using noise patterns.

        For every pair of consecutive nodes on the path, the absolute Pearson
        correlation between their per-sample noise series is computed; the
        mean over all usable pairs is returned. Pairs with fewer than two
        samples or with differing sample counts cannot be correlated and are
        skipped. Undefined (NaN) correlations count as 0. Returns 0.0 when no
        pair is usable.
        """
        consistency_scores = []

        for current, next_node in zip(path_nodes, path_nodes[1:]):
            curr_series = self._per_sample_series(
                self.noise_contributions.get(current, [0])
            )
            next_series = self._per_sample_series(
                self.noise_contributions.get(next_node, [0])
            )

            # If there are not enough comparable data points, skip the pair.
            if (
                curr_series.size < 2
                or next_series.size < 2
                or curr_series.size != next_series.size
            ):
                continue

            # Constant series give an undefined correlation; silence the
            # resulting floating-point warning and map the NaN to 0 below.
            with np.errstate(divide="ignore", invalid="ignore"):
                correlation = np.corrcoef(curr_series, next_series)[0, 1]

            # Replace NaN correlations with 0
            if np.isnan(correlation):
                correlation = 0.0
            consistency_scores.append(abs(correlation))

        return float(np.mean(consistency_scores)) if consistency_scores else 0.0

    def _print_analysis_results(
        self, sorted_paths: List[Tuple[List[Tuple[str, float]], float]]
    ):
        """
        Print detailed analysis results.

        Each path is printed as an indented tree, one node per line, together
        with the node's combined score and mean noise contribution.
        """
        print(f"\nFound {len(sorted_paths)} potential root cause paths.")
        print("\nDetailed path analysis (ordered by causal significance):")
        print("-" * 60)

        for i, (path, significance) in enumerate(sorted_paths, 1):
            print(f"\nPath {i} (Causal Significance: {significance:.4f}):")
            for j, (node, score) in enumerate(path):
                prefix = "└─" if j == len(path) - 1 else "├─"
                noise_contrib = np.mean(self.noise_contributions.get(node, [0]))
                print(
                    f"{' ' * j}{prefix} {node:<20} "
                    f"(Combined Score: {score:.4f}, Noise Contribution: {noise_contrib:.4f})"
                )
class CausalResultsVisualizer:
    """
    Visualizes the results from CausalRootCauseAnalyzer.

    The analysis results handed to this class are expected to provide:

    - 'paths': a list of tuples (path, significance), where each path is a
      list of tuples (node, combined_score)
    - 'node_scores': a dict mapping node -> array of structural scores
    - 'noise_contributions': a dict mapping node -> array of noise contributions

    It offers several plotting methods:

    - plot_root_cause_paths: a network diagram of the root cause pathways.
    - plot_node_scores: a bar chart of average structural scores per node.
    - plot_noise_contributions_distribution: a boxplot for the distribution
      of noise contributions.
    - plot_consistency_heatmap: a heatmap of the correlation between nodes'
      noise contributions.
    - plot_timeline: a timeline plot if you run separate analyses per anomaly
      date.
    """

    def __init__(self, analysis_results):
        """
        Store the analysis results for later plotting.

        Parameters
        ----------
        analysis_results : dict
            Results from the analyzer.analyze() call. Expected keys are
            'paths', 'node_scores', and 'noise_contributions'.
        """
        self.results = analysis_results

    @staticmethod
    def _hex_channels(color):
        """Split a '#rrggbb' string into its three integer channels."""
        return [int(color[start:start + 2], 16) for start in (1, 3, 5)]

    @staticmethod
    def _blend_hex(color1, color2, factor):
        """Linearly interpolate between two '#rrggbb' colors (factor 0 -> color1)."""
        low = CausalResultsVisualizer._hex_channels(color1)
        high = CausalResultsVisualizer._hex_channels(color2)
        # Channels are truncated towards zero with int(), not rounded.
        r, g, b = (int(a + factor * (z - a)) for a, z in zip(low, high))
        return f"#{r:02x}{g:02x}{b:02x}"

    def plot_root_cause_paths(self):
        """
        Draw the discovered root cause pathways with Graphviz.

        Every path becomes its own cluster whose background color moves along
        a gradient from light green (first path) to yellow (last path). Nodes
        are listed in reverse path order, so the root cause comes first and
        the final outcome (e.g., 'PROFIT_MARGIN') comes last. An edge that
        already occurred in an earlier path is not drawn a second time.

        The diagram is rendered to PNG and shown inline (Jupyter Notebook).
        If there are no paths, a message is printed and nothing is drawn.
        """
        paths = self.results.get("paths", [])
        if not paths:
            print("No root cause paths found.")
            return

        path_count = len(paths)

        # Gradient endpoints.
        gradient_from = "#50C878"  # Light green.
        gradient_to = "#ffff99"  # Yellow.

        dot = Digraph(format="png")

        # Edges drawn so far, shared by all clusters to suppress duplicates.
        drawn_edges = set()

        for idx, (path, significance) in enumerate(paths):
            # 0 for the first path, 1 for the final path.
            if path_count > 1:
                factor = idx / (path_count - 1)
            else:
                factor = 0
            background = self._blend_hex(gradient_from, gradient_to, factor)

            # Root cause first, final outcome last.
            ordered = path[::-1]
            last_position = len(ordered) - 1

            with dot.subgraph(name=f"cluster_{idx}") as cluster:
                cluster.attr(style="filled", fillcolor=background)
                cluster.attr(label=f"Path {idx+1}\nSignificance: {significance:.2f}")

                for position, (node, combined_score) in enumerate(ordered):
                    # The node name doubles as its Graphviz identifier.
                    cluster.node(
                        node,
                        label=f"{node}\n({combined_score:.2f})",
                        shape="box",
                        style="rounded,filled",
                        fillcolor="skyblue",
                    )

                    if position == last_position:
                        continue

                    successor = ordered[position + 1][0]
                    link = (node, successor)
                    if link in drawn_edges:
                        continue
                    cluster.edge(node, successor, arrowhead="normal")
                    drawn_edges.add(link)

        # Render to PNG bytes and display inline.
        png_data = dot.pipe(format="png")
        display(Image(png_data))