From 8ef59d3d3c653e74b20ac76f6d59fa8350514eea Mon Sep 17 00:00:00 2001 From: James Turk Date: Sun, 7 May 2023 23:48:19 -0500 Subject: [PATCH] fixes for error handling and foiaghost --- examples/foiaghost.py | 11 +++++----- src/beakers/recipe.py | 47 ++++++++++++++++--------------------------- 2 files changed, 23 insertions(+), 35 deletions(-) diff --git a/examples/foiaghost.py b/examples/foiaghost.py index 405beaf..6d6546c 100644 --- a/examples/foiaghost.py +++ b/examples/foiaghost.py @@ -6,7 +6,8 @@ from beakers.recipe import Recipe async def add_response(obj_with_url): print(obj_with_url["url"]) url = obj_with_url["url"] - response = await httpx.get(url) + async with httpx.AsyncClient() as client: + response = await client.get(url) return { "url": url, "status_code": response.status_code, @@ -28,8 +29,8 @@ recipe.add_conditional( if_false="missing_urls", ) recipe.add_transform( - "good_urls", "responses", add_response, error_beaker="bad_requests" + "good_urls", + "responses", + add_response, + error_map={(httpx.ConnectError, httpx.ConnectTimeout): "bad_requests"}, ) - -recipe.csv_to_beaker("agencies.csv", "agencies") -recipe.run_once() diff --git a/src/beakers/recipe.py b/src/beakers/recipe.py index 56db89b..5af0098 100644 --- a/src/beakers/recipe.py +++ b/src/beakers/recipe.py @@ -46,10 +46,10 @@ class Recipe: from_beaker: str, to_beaker: str, transform_func: callable, - error_beaker: str | None = None, + error_map: dict[tuple, str] | None = None, ) -> None: self.graph.add_edge( - from_beaker, to_beaker, transform=transform_func, error_beaker=error_beaker + from_beaker, to_beaker, transform=transform_func, error_map=error_map ) def add_conditional( @@ -152,14 +152,6 @@ class Recipe: for from_b, to_b, edge in self.graph.out_edges(node, data=True): print(f" {from_b} -> {to_b} ({edge})") - def reset(self): - count = 0 - for beaker in self.beakers.values(): - if isinstance(beaker, SqliteBeaker): - beaker.reset() - count += 1 - typer.secho(f"reset {count} beakers") - def run_once(self): log.info("run_once", recipe=self) loop = asyncio.get_event_loop() @@ -173,39 +165,34 @@ class Recipe: if "transform" not in edge: raise ValueError(f"unknown edge: {edge}") - if error_b := edge.get("error_beaker"): - error_beaker = self.beakers[error_b] - else: - error_beaker = None + error_map = edge.get("error_map", {}) from_beaker = self.beakers[from_b] to_beaker = self.beakers[to_b] func = edge["transform"] log.info("transform", from_b=from_b, to_b=to_b, items=len(from_beaker)) + + # convert coroutine to function if inspect.iscoroutinefunction(func): t_func = lambda x: loop.run_until_complete(func(x)) else: t_func = func - # for id, item in from_beaker.items(): - # try: - # transformed = loop.run_until_complete(func(item)) - # if transformed: - # to_beaker.add_item(transformed, from_beaker.name, id) - # except Exception as e: - # error_beaker.add_item( - # {"exception": str(e), "exc_type": type(e)} - # from_beaker.name, id - # ) - # else: for id, item in from_beaker.items(): try: transformed = t_func(item) if transformed: to_beaker.add_item(transformed, from_beaker.name, id) except Exception as e: - error_beaker.add_item( - {"exception": str(e), "exc_type": str(type(e))}, - from_beaker.name, - id, - ) + for error_types, error_beaker_name in error_map.items(): + if isinstance(e, error_types): + error_beaker = self.beakers[error_beaker_name] + error_beaker.add_item( + {"exception": str(e), "exc_type": str(type(e))}, + from_beaker.name, + id, + ) + break + else: + # no error handler, re-raise + raise