From 3eb14f9556a1d18bd37fb58237b0d84f47fbc2e8 Mon Sep 17 00:00:00 2001 From: James Turk Date: Mon, 8 May 2023 03:14:45 -0500 Subject: [PATCH] nicer output --- src/beakers/recipe.py | 83 ++++++++++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 21 deletions(-) diff --git a/src/beakers/recipe.py b/src/beakers/recipe.py index 59df5c8..5666859 100644 --- a/src/beakers/recipe.py +++ b/src/beakers/recipe.py @@ -19,6 +19,21 @@ def get_sha512(filename: str) -> str: return hashlib.sha512(file.read()).hexdigest() +@dataclass(frozen=True, eq=True) +class Transform: + name: str + transform_func: callable + error_map: dict[tuple, str] + + +def if_cond_true(data_cond_tup: tuple[dict, bool]) -> dict | None: + return data_cond_tup[0] if data_cond_tup[1] else None + + +def if_cond_false(data_cond_tup: tuple[dict, bool]) -> dict | None: + return data_cond_tup[0] if not data_cond_tup[1] else None + + class Recipe: def __init__(self, name, db_name="beakers.db"): self.name = name @@ -46,10 +61,23 @@ class Recipe: from_beaker: str, to_beaker: str, transform_func: callable, + *, + name=None, error_map: dict[tuple, str] | None = None, ) -> None: + if name is None: + name = transform_func.__name__ + if name == "": + name = "λ" + transform = Transform( + name=name, + transform_func=transform_func, + error_map=error_map or {}, + ) self.graph.add_edge( - from_beaker, to_beaker, transform=transform_func, error_map=error_map + from_beaker, + to_beaker, + transform=transform, ) def add_conditional( @@ -60,28 +88,28 @@ class Recipe: if_false: str, ) -> None: # first add a transform to evaluate the conditional - cond_name = f"cond-{from_beaker}-{condition_func.__name__}" + if condition_func.__name__ == "": + cond_name = f"cond-{from_beaker}" + else: + cond_name = f"cond-{from_beaker}-{condition_func.__name__}" self.add_beaker(cond_name, temp=True) self.add_transform( from_beaker, cond_name, lambda data: (data, condition_func(data)), + name=cond_name, ) # then add two filtered paths that remove the condition result - self.graph.add_edge( + self.add_transform( cond_name, if_true, - transform=lambda data_cond_tup: data_cond_tup[0] - if data_cond_tup[1] - else None, + if_cond_true, ) - self.graph.add_edge( + self.add_transform( cond_name, if_false, - transform=lambda data_cond_tup: data_cond_tup[0] - if not data_cond_tup[1] - else None, + if_cond_false, ) def get_metadata(self, table_name) -> dict: @@ -150,7 +178,13 @@ class Recipe: fg=typer.colors.GREEN if lb else typer.colors.YELLOW, ) for from_b, to_b, edge in self.graph.out_edges(node, data=True): - print(f" {from_b} -> {to_b} ({edge})") + name = edge["transform"].name + print(f" {from_b} -({name})-> {to_b}") + for k, v in edge["transform"].error_map.items(): + typer.secho( + f" {' '.join(c.__name__ for c in k)} -> {v}", + fg=typer.colors.RED, + ) def run_once( self, start_beaker: str | None = None, end_beaker: str | None = None @@ -178,21 +212,25 @@ class Recipe: edges = self.graph.out_edges(node, data=True) for from_b, to_b, edge in edges: - if "transform" not in edge: - raise ValueError(f"unknown edge: {edge}") - - error_map = edge.get("error_map", {}) + transform = edge["transform"] from_beaker = self.beakers[from_b] to_beaker = self.beakers[to_b] - func = edge["transform"] - log.info("transform", from_b=from_b, to_b=to_b, items=len(from_beaker)) + log.info( + "transform", + from_b=from_b, + to_b=to_b, + items=len(from_beaker), + transform=edge["transform"].name, + ) # convert coroutine to function - if inspect.iscoroutinefunction(func): - t_func = lambda x: loop.run_until_complete(func(x)) + if inspect.iscoroutinefunction(transform.transform_func): + t_func = lambda x: loop.run_until_complete( + transform.transform_func(x) + ) else: - t_func = func + t_func = transform.transform_func for id, item in from_beaker.items(): try: @@ -200,7 +238,10 @@ class Recipe: if transformed: to_beaker.add_item(transformed, id) except Exception as e: - for error_types, error_beaker_name in error_map.items(): + for ( + error_types, + error_beaker_name, + ) in transform.error_map.items(): if isinstance(e, error_types): error_beaker = self.beakers[error_beaker_name] error_beaker.add_item(