diff --git a/examples/__init__.py b/examples/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/foiaghost_beaker.py b/examples/foiaghost.py
similarity index 78%
rename from src/foiaghost_beaker.py
rename to examples/foiaghost.py
index 856d1ca..405beaf 100644
--- a/src/foiaghost_beaker.py
+++ b/examples/foiaghost.py
@@ -15,9 +15,10 @@ async def add_response(obj_with_url):
 
 
 # current thinking, beakers exist within a recipe
-recipe = Recipe("fetch urls")
+recipe = Recipe("fetch urls", "url_example.db")
 recipe.add_beaker("agencies")
 recipe.add_beaker("responses")
+recipe.add_beaker("bad_requests")
 recipe.add_beaker("good_urls", temp=True)
 recipe.add_beaker("missing_urls", temp=True)
 recipe.add_conditional(
@@ -26,7 +27,9 @@ recipe.add_conditional(
     if_true="good_urls",
     if_false="missing_urls",
 )
-recipe.add_pour("good_urls", "responses", add_response)
+recipe.add_transform(
+    "good_urls", "responses", add_response, error_beaker="bad_requests"
+)
 
 recipe.csv_to_beaker("agencies.csv", "agencies")
-recipe.run_push()
+recipe.run_once()
diff --git a/examples/fruits.py b/examples/fruits.py
new file mode 100644
index 0000000..f632ca5
--- /dev/null
+++ b/examples/fruits.py
@@ -0,0 +1,28 @@
+from beakers.recipe import Recipe
+
+recipe = Recipe("example01")
+words_beaker = recipe.add_beaker("words", temp=True)
+recipe.add_beaker("fruits")
+recipe.add_beaker("other")
+recipe.add_beaker("sentences")
+recipe.add_conditional(
+    "words",
+    lambda x: x["word"]
+    in (
+        "apple",
+        "banana",
+        "fig",
+        "grape",
+        "lemon",
+        "mango",
+        "orange",
+        "pear",
+        "raspberry",
+    ),
+    "fruits",
+    "other",
+)
+recipe.add_transform("fruits", "sentences", lambda x: f"I like to eat {x['word']}")
+recipe.add_transform(
+    "other", "sentences", lambda x: f"I'm not so sure about {x['word']}"
+)
diff --git a/src/foiaghost.py b/foiaghost.py
similarity index 100%
rename from src/foiaghost.py
rename to foiaghost.py
diff --git a/pyproject.toml b/pyproject.toml
index a16f77a..109f7d1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,5 +1,5 @@
 [tool.poetry]
-name = "foiaghost"
+name = "beakers"
 version = "0.1.0"
 description = ""
 authors = ["James Turk "]
diff --git a/src/beakers/cli.py b/src/beakers/cli.py
new file mode 100644
index 0000000..94ab3f4
--- /dev/null
+++ b/src/beakers/cli.py
@@ -0,0 +1,35 @@
+import importlib
+import typer
+import sys
+from typing_extensions import Annotated
+
+app = typer.Typer()
+
+
+def _load_recipe(dotted_path: str):
+    sys.path.append(".")
+    path, name = dotted_path.rsplit(".", 1)
+    mod = importlib.import_module(path)
+    return getattr(mod, name)
+
+
+@app.command()
+def reset(recipe: Annotated[str, typer.Option(...)]):
+    mod = _load_recipe(recipe)
+    mod.reset()
+
+
+@app.command()
+def show(recipe: Annotated[str, typer.Option(...)]):
+    mod = _load_recipe(recipe)
+    mod.show()
+
+
+@app.command()
+def run(recipe: Annotated[str, typer.Option(...)]):
+    mod = _load_recipe(recipe)
+    mod.run_once()
+
+
+if __name__ == "__main__":
+    app()
diff --git a/src/beakers/recipe.py b/src/beakers/recipe.py
index b245a52..c927d10 100644
--- a/src/beakers/recipe.py
+++ b/src/beakers/recipe.py
@@ -1,5 +1,6 @@
 import csv
 import json
+import typer
 import inspect
 import sqlite3
 import hashlib
@@ -19,11 +20,11 @@ def get_sha512(filename: str) -> str:
 
 
 class Recipe:
-    def __init__(self, name):
+    def __init__(self, name, db_name="beakers.db"):
         self.name = name
         self.graph = networkx.DiGraph()
         self.beakers = {}
-        self.db = sqlite3.connect("beakers.db")
+        self.db = sqlite3.connect(db_name)
         cursor = self.db.cursor()
         cursor.execute(
             "CREATE TABLE IF NOT EXISTS _metadata (table_name TEXT PRIMARY KEY, data JSON)"
@@ -41,9 +42,15 @@ class Recipe:
         return self.beakers[name]
 
     def add_transform(
-        self, from_beaker: str, to_beaker: str, transform_func: callable
+        self,
+        from_beaker: str,
+        to_beaker: str,
+        transform_func: callable,
+        error_beaker: str | None = None,
     ) -> None:
-        self.graph.add_edge(from_beaker, to_beaker, transform=transform_func)
+        self.graph.add_edge(
+            from_beaker, to_beaker, transform=transform_func, error_beaker=error_beaker
+        )
 
     def add_conditional(
         self,
@@ -130,8 +137,23 @@ class Recipe:
                 lg.info("from_csv", case="match")
         return beaker
 
-    def run_push(self):
-        log.info("recipe", recipe=self)
+    def show(self):
+        for node in networkx.topological_sort(self.graph):
+            beaker = self.beakers[node]
+            temp = isinstance(beaker, TempBeaker)
+            if temp:
+                typer.secho(node, fg=typer.colors.CYAN)
+            else:
+                lb = len(beaker)
+                typer.secho(
+                    f"{node} ({lb})",
+                    fg=typer.colors.GREEN if lb else typer.colors.YELLOW,
+                )
+            for from_b, to_b, edge in self.graph.out_edges(node, data=True):
+                print(f" {from_b} -> {to_b} ({edge})")
+
+    def run_once(self):
+        log.info("run_once", recipe=self)
         loop = asyncio.get_event_loop()
 
         # go through each node in forward order, pushing data
@@ -143,19 +165,32 @@ class Recipe:
                 if "transform" not in edge:
                     raise ValueError(f"unknown edge: {edge}")
 
+                if error_b := edge.get("error_beaker"):
+                    error_beaker = self.beakers[error_b]
+                else:
+                    error_beaker = None
+
                 from_beaker = self.beakers[from_b]
                 to_beaker = self.beakers[to_b]
                 func = edge["transform"]
                 log.info("transform", from_b=from_b, to_b=to_b, items=len(from_beaker))
                 if inspect.iscoroutinefunction(func):
-                    for id, item in from_beaker.items():
-                        log.debug("transform item", id=id, item=item, async_=True)
-                        transformed = loop.run_until_complete(func(item))
-                        if transformed:
-                            to_beaker.add_item(transformed, from_beaker.name, id)
+                    t_func = lambda x: loop.run_until_complete(func(x))
                 else:
-                    for id, item in from_beaker.items():
-                        log.debug("transform item", id=id, item=item, async_=False)
-                        transformed = func(item)
+                    t_func = func
+
+                for id, item in from_beaker.items():
+                    try:
+                        transformed = t_func(item)
                         if transformed:
                             to_beaker.add_item(transformed, from_beaker.name, id)
+                    except Exception as e:
+                        # no error beaker configured for this edge: surface the
+                        # original failure instead of crashing on None.add_item
+                        if error_beaker is None:
+                            raise
+                        error_beaker.add_item(
+                            {"exception": str(e), "exc_type": str(type(e))},
+                            from_beaker.name,
+                            id,
+                        )
diff --git a/src/sort_example.py b/src/sort_example.py
deleted file mode 100644
index 7b8907d..0000000
--- a/src/sort_example.py
+++ /dev/null
@@ -1,53 +0,0 @@
-from beakers.recipe import Recipe
-
-words = [
-    "apple",
-    "banana",
-    "cactus",
-    "daikon",
-    "eel",
-    "fig",
-    "grape",
-    "hibiscus",
-    "ice",
-    "jelly",
-    "kingfish",
-    "lemon",
-    "mango",
-    "nougat",
-    "orange",
-    "pear",
-    "quail",
-    "raspberry",
-    "sturgeon",
-    "tamarind",
-]
-
-recipe = Recipe("example01")
-words_beaker = recipe.add_beaker("words", temp=True)
-recipe.add_beaker("fruits")
-recipe.add_beaker("other")
-recipe.add_beaker("sentences")
-recipe.add_conditional(
-    "words",
-    lambda x: x
-    in (
-        "apple",
-        "banana",
-        "fig",
-        "grape",
-        "lemon",
-        "mango",
-        "orange",
-        "pear",
-        "raspberry",
-    ),
-    "fruits",
-    "other",
-)
-recipe.add_pour("fruits", "sentences", lambda x: f"I like to eat {x}")
-recipe.add_pour("other", "sentences", lambda x: f"I'm not so sure about {x}")
-
-for word in words:
-    words_beaker.add_item(word)
-recipe.run_push()