diff --git a/src/beakers/beakers.py b/src/beakers/beakers.py new file mode 100644 index 0000000..3fb0ee6 --- /dev/null +++ b/src/beakers/beakers.py @@ -0,0 +1,69 @@ +import abc +import json +import sqlite3 + + +class Beaker(abc.ABC): + def __init__(self, name: str, recipe): + self.name = name + self.recipe = recipe + + def __repr__(self): + return f"Beaker({self.name})" + + @abc.abstractmethod + def items(self): + pass + + @abc.abstractmethod + def __len__(self): + pass + + @abc.abstractmethod + def add_item(self, item: dict, from_table=None, from_id=None) -> None: + pass + + +class TempBeaker(Beaker): + def __init__(self, name: str, recipe): + super().__init__(name, recipe) + self._items = [] + + def __len__(self): + return len(self._items) + + def add_item(self, item: dict, from_table=None, from_id=None) -> None: + self._items.append((from_id, item)) + + def items(self): + yield from self._items + + +class SqliteBeaker(Beaker): + def __init__(self, name: str, recipe): + super().__init__(name, recipe) + # create table if it doesn't exist + cursor = self.recipe.db.cursor() + cursor.execute( + f"CREATE TABLE IF NOT EXISTS {self.name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)" + ) + + def items(self): + cursor = self.recipe.db.cursor() + cursor.row_factory = sqlite3.Row + cursor.execute(f"SELECT id, data FROM {self.name}") + data = cursor.fetchall() + for item in data: + yield item["id"], json.loads(item["data"]) + + def __len__(self): + cursor = self.recipe.db.cursor() + cursor.execute(f"SELECT COUNT(*) FROM {self.name}") + return cursor.fetchone()[0] + + def add_item(self, item: dict, from_table=None, from_id=None) -> None: + cursor = self.db.cursor() + cursor.execute( + f"INSERT INTO {self.name} (data) VALUES (?)", (json.dumps(item),) + ) + self.db.commit() diff --git a/src/beakers/recipe.py b/src/beakers/recipe.py index eb45b7c..10c1933 100644 --- a/src/beakers/recipe.py +++ b/src/beakers/recipe.py @@ -1,5 +1,6 @@ import csv import json +import inspect import sqlite3 import hashlib import asyncio @@ -79,9 +80,8 @@ class Recipe: self.graph = networkx.DiGraph() self.beakers = {} self.db = sqlite3.connect("beakers.db") - self.cursor = self.db.cursor() - self.cursor.row_factory = sqlite3.Row - self.cursor.execute( + cursor = self.db.cursor() + cursor.execute( "CREATE TABLE IF NOT EXISTS _metadata (table_name TEXT PRIMARY KEY, data JSON)" ) @@ -130,12 +130,13 @@ class Recipe: ) def get_metadata(self, table_name) -> dict: - self.cursor.execute( + cursor = self.db.cursor() + cursor.execute( "SELECT data FROM _metadata WHERE table_name = ?", (table_name,), ) try: - data = self.cursor.fetchone()["data"] + data = cursor.fetchone()["data"] log.debug("get_metadata", table_name=table_name, data=data) return json.loads(data) except TypeError: @@ -146,7 +147,8 @@ class Recipe: data_json = json.dumps(data) log.info("save_metadata", table_name=table_name, data=data_json) # sqlite upsert - self.cursor.execute( + cursor = self.db.cursor() + cursor.execute( "INSERT INTO _metadata (table_name, data) VALUES (?, ?) ON CONFLICT(table_name) DO UPDATE SET data = ?", (table_name, data_json, data_json), ) @@ -180,25 +182,30 @@ class Recipe: lg.info("from_csv", case="match") return beaker - def solve_dag(self): - """ - Solve the DAG by topological sort. - """ - for node in networkx.topological_sort(self.graph): - print(node) - - def run_linearly(self): + def run_push(self): log.info("recipe", recipe=self) loop = asyncio.get_event_loop() - for pour in self.pours: - log.info( - "pour", - from_beaker=pour.from_beaker, - to_beaker=pour.to_beaker, - to_pour=len(pour.from_beaker), - ) - for id, item in pour.from_beaker.items(): - log.info("pour_item", id=id, item=item) - transformed = loop.run_until_complete(pour.transform(item)) - pour.to_beaker.add_item(transformed, pour.from_beaker.name, id) + # go through each node in forward order, pushing data + for node in networkx.topological_sort(self.graph): + # get outbound edges + edges = self.graph.out_edges(node, data=True) + + for from_b, to_b, edge in edges: + if "transform" in edge: + func = edge["transform"] + from_beaker = self.beakers[from_b] + to_beaker = self.beakers[to_b] + log.info( + "transform", from_b=from_b, to_b=to_b, items=len(from_beaker) + ) + if inspect.iscoroutinefunction(func): + for id, item in from_beaker.items(): + log.debug("transform item", id=id, item=item, async_=True) + transformed = loop.run_until_complete(func(item)) + to_beaker.add_item(transformed, from_beaker.name, id) + else: + for id, item in from_beaker.items(): + log.debug("transform item", id=id, item=item, async_=False) + transformed = func(item) + to_beaker.add_item(transformed, from_beaker.name, id) diff --git a/src/example.py b/src/foiaghost_beaker.py similarity index 89% rename from src/example.py rename to src/foiaghost_beaker.py index ec4ce74..856d1ca 100644 --- a/src/example.py +++ b/src/foiaghost_beaker.py @@ -1,5 +1,6 @@ import httpx -from beakers.recipe import Recipe, Beaker +from beakers.beakers import Beaker +from beakers.recipe import Recipe async def add_response(obj_with_url): @@ -28,4 +29,4 @@ recipe.add_conditional( recipe.add_pour("good_urls", "responses", add_response) recipe.csv_to_beaker("agencies.csv", "agencies") -recipe.solve_dag() +recipe.run_push() diff --git a/src/sort_example.py b/src/sort_example.py new file mode 100644 index 0000000..7b8907d --- /dev/null +++ b/src/sort_example.py @@ -0,0 +1,53 @@ +from beakers.recipe import Recipe + +words = [ + "apple", + "banana", + "cactus", + "daikon", + "eel", + "fig", + "grape", + "hibiscus", + "ice", + "jelly", + "kingfish", + "lemon", + "mango", + "nougat", + "orange", + "pear", + "quail", + "raspberry", + "sturgeon", + "tamarind", +] + +recipe = Recipe("example01") +words_beaker = recipe.add_beaker("words", temp=True) +recipe.add_beaker("fruits") +recipe.add_beaker("other") +recipe.add_beaker("sentences") +recipe.add_conditional( + "words", + lambda x: x + in ( + "apple", + "banana", + "fig", + "grape", + "lemon", + "mango", + "orange", + "pear", + "raspberry", + ), + "fruits", + "other", +) +recipe.add_pour("fruits", "sentences", lambda x: f"I like to eat {x}") +recipe.add_pour("other", "sentences", lambda x: f"I'm not so sure about {x}") + +for word in words: + words_beaker.add_item(word) +recipe.run_push()