playbook/antigravity-awesome-skills/skills/matematico-tao/scripts/dependency_graph.py

539 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Prof. Euler — Dependency Graph Analyzer
Gera grafo de dependências de projetos Kotlin/Android com análise matemática
de grafos: componentes, ciclos, centralidade, instabilidade.
Uso:
python dependency_graph.py [path] [--format dot|json|text] [--output FILE]
python dependency_graph.py C:/Users/renat/earbudllm
python dependency_graph.py C:/Users/renat/earbudllm --format dot --output deps.dot
"""
import os
import re
import sys
import json
import argparse
from pathlib import Path
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Tuple
# Fix Windows terminal encoding (cp1252 doesn't support emojis/unicode)
if sys.platform == 'win32':
try:
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
except AttributeError:
pass # Python < 3.7 fallback
from collections import defaultdict, deque
@dataclass
class Node:
id: str
module: str
package: str
kind: str # 'class', 'interface', 'object', 'enum', 'data class'
is_abstract: bool = False
is_open: bool = False
@dataclass
class Edge:
src: str # node id
dst: str # node id
kind: str # 'implements', 'extends', 'uses', 'imports', 'delegates_to'
weight: float = 1.0
class DependencyGraph:
"""Grafo de dependências com análise matemática completa."""
def __init__(self):
self.nodes: Dict[str, Node] = {}
self.edges: List[Edge] = []
self._adj: Dict[str, Set[str]] = defaultdict(set) # adjacência direta
self._radj: Dict[str, Set[str]] = defaultdict(set) # adjacência reversa
def add_node(self, node: Node) -> None:
self.nodes[node.id] = node
def add_edge(self, edge: Edge) -> None:
if edge.src != edge.dst: # sem self-loops
self.edges.append(edge)
self._adj[edge.src].add(edge.dst)
self._radj[edge.dst].add(edge.src)
def successors(self, node_id: str) -> Set[str]:
return self._adj.get(node_id, set())
def predecessors(self, node_id: str) -> Set[str]:
return self._radj.get(node_id, set())
# ─── Algoritmos de Grafos ────────────────────────────────────────────────
def find_cycles(self) -> List[List[str]]:
"""Detecta ciclos usando DFS com coloração (branco/cinza/preto)."""
WHITE, GRAY, BLACK = 0, 1, 2
color = {n: WHITE for n in self.nodes}
cycles = []
path = []
def dfs(node: str) -> None:
color[node] = GRAY
path.append(node)
for neighbor in self._adj.get(node, set()):
if neighbor not in self.nodes:
continue
if color[neighbor] == GRAY:
# Ciclo encontrado! Extrair o ciclo do path
cycle_start = path.index(neighbor)
cycle = path[cycle_start:] + [neighbor]
cycles.append(cycle)
elif color[neighbor] == WHITE:
dfs(neighbor)
path.pop()
color[node] = BLACK
for node in list(self.nodes.keys()):
if color[node] == WHITE:
dfs(node)
# Deduplicar ciclos
unique = []
seen = set()
for cycle in cycles:
key = frozenset(cycle)
if key not in seen:
seen.add(key)
unique.append(cycle)
return unique
def strongly_connected_components(self) -> List[List[str]]:
"""Algoritmo de Kosaraju para SCCs."""
visited = set()
finish_order = []
def dfs1(node: str) -> None:
visited.add(node)
for neighbor in self._adj.get(node, set()):
if neighbor in self.nodes and neighbor not in visited:
dfs1(neighbor)
finish_order.append(node)
def dfs2(node: str, component: List[str]) -> None:
visited.add(node)
component.append(node)
for neighbor in self._radj.get(node, set()):
if neighbor in self.nodes and neighbor not in visited:
dfs2(neighbor, component)
# Fase 1: DFS no grafo original
for node in self.nodes:
if node not in visited:
dfs1(node)
# Fase 2: DFS no grafo transposto na ordem inversa de finalização
visited.clear()
sccs = []
for node in reversed(finish_order):
if node not in visited:
component = []
dfs2(node, component)
sccs.append(component)
return sccs
def topological_sort(self) -> Optional[List[str]]:
"""Ordenação topológica (só válida se DAG)."""
in_degree = defaultdict(int)
for edge in self.edges:
if edge.src in self.nodes and edge.dst in self.nodes:
in_degree[edge.dst] += 1
queue = deque([n for n in self.nodes if in_degree[n] == 0])
result = []
while queue:
node = queue.popleft()
result.append(node)
for neighbor in self._adj.get(node, set()):
if neighbor in self.nodes:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if len(result) == len(self.nodes):
return result
return None # Há ciclos → não é DAG
def betweenness_centrality(self) -> Dict[str, float]:
"""
Centralidade de betweenness: nós que fazem "ponte" entre outros.
Alto betweenness = single point of failure.
Algoritmo de Brandes: O(V·E)
"""
betweenness = defaultdict(float)
nodes = list(self.nodes.keys())
for s in nodes:
# BFS para encontrar caminhos mais curtos de s para todos
stack = []
pred = defaultdict(list)
sigma = defaultdict(int)
sigma[s] = 1
dist = defaultdict(lambda: -1)
dist[s] = 0
queue = deque([s])
while queue:
v = queue.popleft()
stack.append(v)
for w in self._adj.get(v, set()):
if w not in self.nodes:
continue
if dist[w] < 0:
queue.append(w)
dist[w] = dist[v] + 1
if dist[w] == dist[v] + 1:
sigma[w] += sigma[v]
pred[w].append(v)
# Acumulação
delta = defaultdict(float)
while stack:
w = stack.pop()
for v in pred[w]:
if sigma[w] > 0:
delta[v] += (sigma[v] / sigma[w]) * (1 + delta[w])
if w != s:
betweenness[w] += delta[w]
# Normalizar
n = len(nodes)
if n > 2:
factor = 2 / ((n - 1) * (n - 2))
for node in betweenness:
betweenness[node] *= factor
return dict(betweenness)
def page_rank(self, damping: float = 0.85, iterations: int = 100) -> Dict[str, float]:
"""
PageRank para identificar classes/módulos mais "importantes".
Nodes com alto PageRank são frequentemente usados por outros.
"""
n = len(self.nodes)
if n == 0:
return {}
rank = {node: 1.0 / n for node in self.nodes}
for _ in range(iterations):
new_rank = {}
for node in self.nodes:
incoming_sum = sum(
rank.get(pred, 0) / max(len(self._adj.get(pred, set())), 1)
for pred in self._radj.get(node, set())
if pred in self.nodes
)
new_rank[node] = (1 - damping) / n + damping * incoming_sum
rank = new_rank
return rank
def coupling_metrics(self) -> Dict[str, Dict]:
"""Calcula Ca, Ce, instabilidade e abstração por módulo."""
module_nodes: Dict[str, Set[str]] = defaultdict(set)
for node_id, node in self.nodes.items():
module_nodes[node.module].add(node_id)
metrics = {}
for module, nodes in module_nodes.items():
# Ce: dependências efetivas (de fora do módulo)
efferent = set()
for n in nodes:
for dep in self._adj.get(n, set()):
if dep in self.nodes and self.nodes[dep].module != module:
efferent.add(self.nodes[dep].module)
# Ca: acoplamentos aferentes
afferent = set()
for n in nodes:
for dep in self._radj.get(n, set()):
if dep in self.nodes and self.nodes[dep].module != module:
afferent.add(self.nodes[dep].module)
ca = len(afferent)
ce = len(efferent)
instability = ce / (ca + ce) if (ca + ce) > 0 else 0.0
# Abstração: razão de interfaces/abstratas para total
abstracts = sum(1 for n in nodes if
n in self.nodes and
(self.nodes[n].is_abstract or self.nodes[n].kind == 'interface'))
abstraction = abstracts / max(len(nodes), 1)
# Distância da sequência principal: D = |A + I - 1|
# Sequência principal: A + I = 1 (ideal)
distance = abs(abstraction + instability - 1)
metrics[module] = {
'Ca': ca,
'Ce': ce,
'instability': round(instability, 3),
'abstraction': round(abstraction, 3),
'distance_from_main_sequence': round(distance, 3),
'total_classes': len(nodes),
'abstract_classes': abstracts,
'depends_on_modules': list(efferent),
'used_by_modules': list(afferent),
}
return metrics
class ProjectAnalyzer:
"""Analisa um projeto Kotlin/Android completo."""
def __init__(self, project_root: str):
self.project_root = Path(project_root)
self.graph = DependencyGraph()
self.module_files: Dict[str, List[Path]] = defaultdict(list)
def analyze(self) -> None:
"""Analisa todos os arquivos Kotlin."""
kt_files = list(self.project_root.glob("**/*.kt"))
for kt_file in kt_files:
module = self._detect_module(kt_file)
self.module_files[module].append(kt_file)
self._analyze_file(kt_file, module)
def _detect_module(self, file_path: Path) -> str:
known = ['app', 'bluetooth', 'audio', 'voice', 'llm', 'integrations', 'core-logging']
for part in file_path.parts:
if part in known:
return part
return 'unknown'
def _analyze_file(self, file_path: Path, module: str) -> None:
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
except Exception:
return
# Extrair package
pkg_match = re.search(r'^package\s+(.+)$', content, re.MULTILINE)
package = pkg_match.group(1).strip() if pkg_match else 'unknown'
# Extrair declarações de classe/interface/object
class_patterns = [
(r'(?:abstract\s+)?(?:open\s+)?(?:data\s+)?class\s+(\w+)', 'class'),
(r'interface\s+(\w+)', 'interface'),
(r'object\s+(\w+)', 'object'),
(r'enum\s+class\s+(\w+)', 'enum'),
]
for pattern, kind in class_patterns:
for match in re.finditer(pattern, content):
class_name = match.group(1)
node_id = f"{package}.{class_name}"
is_abstract = 'abstract' in content[max(0, match.start()-20):match.start()]
is_open = 'open' in content[max(0, match.start()-10):match.start()]
node = Node(
id=node_id,
module=module,
package=package,
kind=kind,
is_abstract=is_abstract,
is_open=is_open
)
self.graph.add_node(node)
# Extrair dependências via imports
imports = re.findall(r'^import\s+(.+)$', content, re.MULTILINE)
for imp in imports:
imp = imp.strip()
if imp.startswith('com.earllm'):
dep_id = imp
if dep_id != node_id:
edge = Edge(src=node_id, dst=dep_id, kind='imports')
self.graph.add_edge(edge)
def generate_full_report(self) -> Dict:
"""Gera relatório matemático completo do grafo."""
cycles = self.graph.find_cycles()
sccs = self.graph.strongly_connected_components()
topo = self.graph.topological_sort()
betweenness = self.graph.betweenness_centrality()
pagerank = self.graph.page_rank()
coupling = self.graph.coupling_metrics()
# Top nodes por betweenness (single points of failure)
top_betweenness = sorted(
betweenness.items(), key=lambda x: x[1], reverse=True
)[:10]
# Top nodes por pagerank (mais influentes)
top_pagerank = sorted(
pagerank.items(), key=lambda x: x[1], reverse=True
)[:10]
# SCCs com mais de 1 nó (ciclos reais)
real_sccs = [scc for scc in sccs if len(scc) > 1]
return {
'graph_summary': {
'nodes': len(self.graph.nodes),
'edges': len(self.graph.edges),
'is_dag': topo is not None,
'cycles_found': len(cycles),
'strongly_connected_components': len(real_sccs),
},
'cycles': cycles[:10], # primeiros 10 ciclos
'real_sccs': real_sccs[:5],
'topological_order': topo[:20] if topo else None,
'top_betweenness_centrality': [
{'node': n, 'betweenness': round(b, 4)} for n, b in top_betweenness
],
'top_pagerank': [
{'node': n, 'pagerank': round(pr, 4)} for n, pr in top_pagerank
],
'module_coupling': coupling,
'modules': {mod: len(files) for mod, files in self.module_files.items()},
}
def to_dot(self) -> str:
"""Exporta grafo em formato DOT para visualização (Graphviz)."""
lines = ['digraph AuriDependencies {']
lines.append(' rankdir=LR;')
lines.append(' node [shape=box, fontname="Arial"];')
# Cores por módulo
module_colors = {
'app': '#4CAF50',
'bluetooth': '#2196F3',
'audio': '#FF9800',
'voice': '#9C27B0',
'llm': '#F44336',
'integrations': '#00BCD4',
'core-logging': '#607D8B',
}
# Adicionar nós por módulo
modules_seen = defaultdict(list)
for node_id, node in self.graph.nodes.items():
modules_seen[node.module].append((node_id, node))
for module, nodes in modules_seen.items():
color = module_colors.get(module, '#9E9E9E')
lines.append(f' subgraph cluster_{module.replace("-", "_")} {{')
lines.append(f' label="{module}";')
lines.append(f' style=filled;')
lines.append(f' fillcolor="{color}20";') # 20 = ~12% opacity
for node_id, node in nodes[:20]: # limitar para visualização
short_name = node_id.split('.')[-1]
shape = 'diamond' if node.kind == 'interface' else 'box'
lines.append(f' "{node_id}" [label="{short_name}", shape={shape}];')
lines.append(' }')
# Adicionar arestas (amostra)
for edge in self.edges[:100]: # limitar para visualização
lines.append(f' "{edge.src}" -> "{edge.dst}" [label="{edge.kind}"];')
lines.append('}')
return '\n'.join(lines)
@property
def edges(self):
return self.graph.edges
def print_report(self, report: Dict) -> None:
print("\n" + "="*70)
print(" PROF. EULER — ANÁLISE DE GRAFOS DE DEPENDÊNCIAS")
print("="*70)
gs = report['graph_summary']
print(f"\n📊 RESUMO DO GRAFO G = (V={gs['nodes']}, E={gs['edges']}):")
print(f" É DAG (sem ciclos): {'✅ SIM' if gs['is_dag'] else '❌ NÃO'}")
print(f" Ciclos detectados: {gs['cycles_found']}")
print(f" SCCs com mais de 1 nó: {gs['strongly_connected_components']}")
if report['cycles']:
print(f"\n❌ CICLOS DE DEPENDÊNCIA (devem ser eliminados):")
for i, cycle in enumerate(report['cycles'][:5], 1):
print(f" {i}. {''.join(c.split('.')[-1] for c in cycle)}")
if report['top_betweenness_centrality']:
print(f"\n⚠️ TOP NÓDULOS CRÍTICOS (single points of failure — alto betweenness):")
for item in report['top_betweenness_centrality'][:5]:
short = item['node'].split('.')[-1]
print(f" {short:<35} betweenness={item['betweenness']:.4f}")
print(f"\n📦 ACOPLAMENTO DE MÓDULOS (Princípio de Martin):")
print(f" {'Módulo':<20} {'Ca':>5} {'Ce':>5} {'I':>8} {'A':>8} {'D':>8} Status")
print(f" {'-'*20} {'-'*5} {'-'*5} {'-'*8} {'-'*8} {'-'*8} {'-'*20}")
for mod, data in sorted(report['module_coupling'].items()):
i = data['instability']
a = data['abstraction']
d = data['distance_from_main_sequence']
status = "✅ OK" if d < 0.3 else ("⚠️ ZONA DE PROBLEMA" if d < 0.5 else "❌ FORA DA SEQ.")
print(f" {mod:<20} {data['Ca']:>5} {data['Ce']:>5} {i:>8.3f} {a:>8.3f} {d:>8.3f} {status}")
print("\n I=instabilidade, A=abstração, D=distância da sequência principal")
print(" D ideal < 0.3 (zona de exclusão = zona de dor/inutilidade)")
print("\n" + "="*70 + "\n")
def main():
parser = argparse.ArgumentParser(
description='Prof. Euler — Análise de Grafos de Dependências'
)
parser.add_argument('path', nargs='?',
default=r'C:\Users\renat\earbudllm',
help='Caminho raiz do projeto')
parser.add_argument('--format', '-f', choices=['text', 'json', 'dot'],
default='text', help='Formato de saída')
parser.add_argument('--output', '-o', help='Arquivo de saída')
args = parser.parse_args()
print(f"🔬 Prof. Euler analisando grafos: {args.path}")
analyzer = ProjectAnalyzer(args.path)
analyzer.analyze()
report = analyzer.generate_full_report()
if args.format == 'json':
output = json.dumps(report, indent=2, ensure_ascii=False)
print(output)
elif args.format == 'dot':
output = analyzer.to_dot()
print(output)
if args.output:
Path(args.output).write_text(output)
print(f"\n✅ Arquivo DOT salvo: {args.output}")
print(" Para visualizar: dot -Tpng deps.dot -o deps.png")
else:
analyzer.print_report(report)
if args.output and args.format != 'dot':
Path(args.output).write_text(
json.dumps(report, indent=2, ensure_ascii=False),
encoding='utf-8'
)
print(f"✅ Relatório salvo: {args.output}")
return report
if __name__ == '__main__':
main()