example runs

This commit is contained in:
James Turk 2023-05-07 20:16:38 -05:00
parent 3b93ece18a
commit efe572e1f6
2 changed files with 23 additions and 18 deletions

View File

@ -43,27 +43,24 @@ class SqliteBeaker(Beaker):
def __init__(self, name: str, recipe):
super().__init__(name, recipe)
# create table if it doesn't exist
cursor = self.recipe.db.cursor()
cursor.execute(
self.cursor = self.recipe.db.cursor()
self.cursor.row_factory = sqlite3.Row
self.cursor.execute(
f"CREATE TABLE IF NOT EXISTS {self.name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)"
)
def items(self):
cursor = self.recipe.db.cursor()
cursor.row_factory = sqlite3.Row
cursor.execute(f"SELECT id, data FROM {self.name}")
data = cursor.fetchall()
self.cursor.execute(f"SELECT id, data FROM {self.name}")
data = self.cursor.fetchall()
for item in data:
yield item["id"], json.loads(item["data"])
def __len__(self):
cursor = self.recipe.db.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {self.name}")
return cursor.fetchone()[0]
self.cursor.execute(f"SELECT COUNT(*) FROM {self.name}")
return self.cursor.fetchone()[0]
def add_item(self, item: dict, from_table=None, from_id=None) -> None:
cursor = self.db.cursor()
cursor.execute(
self.cursor.execute(
f"INSERT INTO {self.name} (data) VALUES (?)", (json.dumps(item),)
)
self.db.commit()
self.recipe.db.commit()

View File

@ -121,12 +121,16 @@ class Recipe:
self.graph.add_edge(
cond_name,
if_true,
filter_func=lambda data, condition: data if condition else None,
transform=lambda data_cond_tup: data_cond_tup[0]
if data_cond_tup[1]
else None,
)
self.graph.add_edge(
cond_name,
if_false,
filter_func=lambda data, condition: data if not condition else None,
transform=lambda data_cond_tup: data_cond_tup[0]
if not data_cond_tup[1]
else None,
)
def get_metadata(self, table_name) -> dict:
@ -192,10 +196,10 @@ class Recipe:
edges = self.graph.out_edges(node, data=True)
for from_b, to_b, edge in edges:
from_beaker = self.beakers[from_b]
to_beaker = self.beakers[to_b]
if "transform" in edge:
func = edge["transform"]
from_beaker = self.beakers[from_b]
to_beaker = self.beakers[to_b]
log.info(
"transform", from_b=from_b, to_b=to_b, items=len(from_beaker)
)
@ -203,9 +207,13 @@ class Recipe:
for id, item in from_beaker.items():
log.debug("transform item", id=id, item=item, async_=True)
transformed = loop.run_until_complete(func(item))
to_beaker.add_item(transformed, from_beaker.name, id)
if transformed:
to_beaker.add_item(transformed, from_beaker.name, id)
else:
for id, item in from_beaker.items():
log.debug("transform item", id=id, item=item, async_=False)
transformed = func(item)
to_beaker.add_item(transformed, from_beaker.name, id)
if transformed:
to_beaker.add_item(transformed, from_beaker.name, id)
else:
raise Exception(f"unknown edge: {edge}")