change recipe to have a method to obtain graph data and use it in show

This commit is contained in:
James Turk 2023-05-23 16:24:54 -05:00
parent 20f9809689
commit 046c9e453b
3 changed files with 41 additions and 12 deletions

View File

@ -10,7 +10,6 @@ from scrapeghost.preprocessors import CleanHTML
async def add_response(obj_with_url):
print(obj_with_url["url"])
url = obj_with_url["url"]
async with httpx.AsyncClient() as client:
response = await client.get(url)

View File

@ -2,6 +2,7 @@ import importlib
from types import SimpleNamespace
import typer
import sys
from pprint import pprint
from typing import List, Optional
from typing_extensions import Annotated
@ -47,6 +48,11 @@ def show(ctx: typer.Context):
ctx.obj.show()
@app.command()
def graph(ctx: typer.Context):
pprint(ctx.obj.graph_data())
@app.command()
def run(
ctx: typer.Context,

View File

@ -166,20 +166,17 @@ class Recipe:
return beaker
def show(self):
for node in networkx.topological_sort(self.graph):
beaker = self.beakers[node]
temp = isinstance(beaker, TempBeaker)
if temp:
typer.secho(node, fg=typer.colors.CYAN)
graph_data = self.graph_data()
for node in graph_data:
if node["temp"]:
typer.secho(node["name"], fg=typer.colors.CYAN)
else:
lb = len(beaker)
typer.secho(
f"{node} ({lb})",
fg=typer.colors.GREEN if lb else typer.colors.YELLOW,
f"{node['name']} ({node['len']})",
fg=typer.colors.GREEN if node["len"] else typer.colors.YELLOW,
)
for from_b, to_b, edge in self.graph.out_edges(node, data=True):
name = edge["transform"].name
print(f" {from_b} -({name})-> {to_b}")
for edge in node["edges"]:
print(f" -({edge['transform'].name})-> {edge['to_beaker']}")
for k, v in edge["transform"].error_map.items():
if isinstance(k, tuple):
typer.secho(
@ -189,6 +186,33 @@ class Recipe:
else:
typer.secho(f" {k.__name__} -> {v}", fg=typer.colors.RED)
def graph_data(self):
nodes = {}
for node in networkx.topological_sort(self.graph):
beaker = self.beakers[node]
temp = isinstance(beaker, TempBeaker)
nodes[node] = {
"name": node,
"temp": temp,
"len": len(beaker),
"edges": [],
}
rank = 0
for from_b, to_b, edge in self.graph.in_edges(node, data=True):
if nodes[from_b]["rank"] > rank:
rank = nodes[from_b]["rank"]
nodes[node]["rank"] = rank + 1
for from_b, to_b, edge in self.graph.out_edges(node, data=True):
edge["to_beaker"] = to_b
nodes[node]["edges"].append(edge)
# all data collected for display
return sorted(nodes.values(), key=lambda x: (x["rank"], x["name"]))
def run_once(
self, start_beaker: str | None = None, end_beaker: str | None = None
) -> None: