From efe572e1f6e2242b2de56fcbc0e2c6ad0c17532f Mon Sep 17 00:00:00 2001 From: James Turk Date: Sun, 7 May 2023 20:16:38 -0500 Subject: [PATCH] example runs --- src/beakers/beakers.py | 21 +++++++++------------ src/beakers/recipe.py | 20 ++++++++++++++------ 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/beakers/beakers.py b/src/beakers/beakers.py index 3fb0ee6..ab67360 100644 --- a/src/beakers/beakers.py +++ b/src/beakers/beakers.py @@ -43,27 +43,24 @@ class SqliteBeaker(Beaker): def __init__(self, name: str, recipe): super().__init__(name, recipe) # create table if it doesn't exist - cursor = self.recipe.db.cursor() - cursor.execute( + self.cursor = self.recipe.db.cursor() + self.cursor.row_factory = sqlite3.Row + self.cursor.execute( f"CREATE TABLE IF NOT EXISTS {self.name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)" ) def items(self): - cursor = self.recipe.db.cursor() - cursor.row_factory = sqlite3.Row - cursor.execute(f"SELECT id, data FROM {self.name}") - data = cursor.fetchall() + self.cursor.execute(f"SELECT id, data FROM {self.name}") + data = self.cursor.fetchall() for item in data: yield item["id"], json.loads(item["data"]) def __len__(self): - cursor = self.recipe.db.cursor() - cursor.execute(f"SELECT COUNT(*) FROM {self.name}") - return cursor.fetchone()[0] + self.cursor.execute(f"SELECT COUNT(*) FROM {self.name}") + return self.cursor.fetchone()[0] def add_item(self, item: dict, from_table=None, from_id=None) -> None: - cursor = self.db.cursor() - cursor.execute( + self.cursor.execute( f"INSERT INTO {self.name} (data) VALUES (?)", (json.dumps(item),) ) - self.db.commit() + self.recipe.db.commit() diff --git a/src/beakers/recipe.py b/src/beakers/recipe.py index 10c1933..fe51601 100644 --- a/src/beakers/recipe.py +++ b/src/beakers/recipe.py @@ -121,12 +121,16 @@ class Recipe: self.graph.add_edge( cond_name, if_true, - filter_func=lambda data, condition: data if condition else None, + transform=lambda data_cond_tup: data_cond_tup[0] + if data_cond_tup[1] + else None, ) self.graph.add_edge( cond_name, if_false, - filter_func=lambda data, condition: data if not condition else None, + transform=lambda data_cond_tup: data_cond_tup[0] + if not data_cond_tup[1] + else None, ) def get_metadata(self, table_name) -> dict: @@ -192,10 +196,10 @@ class Recipe: edges = self.graph.out_edges(node, data=True) for from_b, to_b, edge in edges: + from_beaker = self.beakers[from_b] + to_beaker = self.beakers[to_b] if "transform" in edge: func = edge["transform"] - from_beaker = self.beakers[from_b] - to_beaker = self.beakers[to_b] log.info( "transform", from_b=from_b, to_b=to_b, items=len(from_beaker) ) @@ -203,9 +207,13 @@ class Recipe: for id, item in from_beaker.items(): log.debug("transform item", id=id, item=item, async_=True) transformed = loop.run_until_complete(func(item)) - to_beaker.add_item(transformed, from_beaker.name, id) + if transformed: + to_beaker.add_item(transformed, from_beaker.name, id) else: for id, item in from_beaker.items(): log.debug("transform item", id=id, item=item, async_=False) transformed = func(item) - to_beaker.add_item(transformed, from_beaker.name, id) + if transformed: + to_beaker.add_item(transformed, from_beaker.name, id) + else: + raise Exception(f"unknown edge: {edge}")