fixes for error handling and foiaghost

This commit is contained in:
James Turk 2023-05-07 23:48:19 -05:00
parent 2fc2732a65
commit 8ef59d3d3c
2 changed files with 23 additions and 35 deletions

View File

@ -6,7 +6,8 @@ from beakers.recipe import Recipe
async def add_response(obj_with_url): async def add_response(obj_with_url):
print(obj_with_url["url"]) print(obj_with_url["url"])
url = obj_with_url["url"] url = obj_with_url["url"]
response = await httpx.get(url) async with httpx.AsyncClient() as client:
response = await client.get(url)
return { return {
"url": url, "url": url,
"status_code": response.status_code, "status_code": response.status_code,
@ -28,8 +29,8 @@ recipe.add_conditional(
if_false="missing_urls", if_false="missing_urls",
) )
recipe.add_transform( recipe.add_transform(
"good_urls", "responses", add_response, error_beaker="bad_requests" "good_urls",
"responses",
add_response,
error_map={(httpx.ConnectError, httpx.ConnectTimeout): "bad_requests"},
) )
recipe.csv_to_beaker("agencies.csv", "agencies")
recipe.run_once()

View File

@ -46,10 +46,10 @@ class Recipe:
from_beaker: str, from_beaker: str,
to_beaker: str, to_beaker: str,
transform_func: callable, transform_func: callable,
error_beaker: str | None = None, error_map: dict[tuple, str] | None = None,
) -> None: ) -> None:
self.graph.add_edge( self.graph.add_edge(
from_beaker, to_beaker, transform=transform_func, error_beaker=error_beaker from_beaker, to_beaker, transform=transform_func, error_map=error_map
) )
def add_conditional( def add_conditional(
@ -152,14 +152,6 @@ class Recipe:
for from_b, to_b, edge in self.graph.out_edges(node, data=True): for from_b, to_b, edge in self.graph.out_edges(node, data=True):
print(f" {from_b} -> {to_b} ({edge})") print(f" {from_b} -> {to_b} ({edge})")
def reset(self):
count = 0
for beaker in self.beakers.values():
if isinstance(beaker, SqliteBeaker):
beaker.reset()
count += 1
typer.secho(f"reset {count} beakers")
def run_once(self): def run_once(self):
log.info("run_once", recipe=self) log.info("run_once", recipe=self)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
@ -173,39 +165,34 @@ class Recipe:
if "transform" not in edge: if "transform" not in edge:
raise ValueError(f"unknown edge: {edge}") raise ValueError(f"unknown edge: {edge}")
if error_b := edge.get("error_beaker"): error_map = edge.get("error_map", {})
error_beaker = self.beakers[error_b]
else:
error_beaker = None
from_beaker = self.beakers[from_b] from_beaker = self.beakers[from_b]
to_beaker = self.beakers[to_b] to_beaker = self.beakers[to_b]
func = edge["transform"] func = edge["transform"]
log.info("transform", from_b=from_b, to_b=to_b, items=len(from_beaker)) log.info("transform", from_b=from_b, to_b=to_b, items=len(from_beaker))
# convert coroutine to function
if inspect.iscoroutinefunction(func): if inspect.iscoroutinefunction(func):
t_func = lambda x: loop.run_until_complete(func(x)) t_func = lambda x: loop.run_until_complete(func(x))
else: else:
t_func = func t_func = func
# for id, item in from_beaker.items():
# try:
# transformed = loop.run_until_complete(func(item))
# if transformed:
# to_beaker.add_item(transformed, from_beaker.name, id)
# except Exception as e:
# error_beaker.add_item(
# {"exception": str(e), "exc_type": type(e)}
# from_beaker.name, id
# )
# else:
for id, item in from_beaker.items(): for id, item in from_beaker.items():
try: try:
transformed = t_func(item) transformed = t_func(item)
if transformed: if transformed:
to_beaker.add_item(transformed, from_beaker.name, id) to_beaker.add_item(transformed, from_beaker.name, id)
except Exception as e: except Exception as e:
error_beaker.add_item( for error_types, error_beaker_name in error_map.items():
{"exception": str(e), "exc_type": str(type(e))}, if isinstance(e, error_types):
from_beaker.name, error_beaker = self.beakers[error_beaker_name]
id, error_beaker.add_item(
) {"exception": str(e), "exc_type": str(type(e))},
from_beaker.name,
id,
)
break
else:
# no error handler, re-raise
raise