switch to uuids

This commit is contained in:
James Turk 2023-05-08 01:29:48 -05:00
parent c9cb7f9bd2
commit 3366cb2611
3 changed files with 38 additions and 14 deletions

View File

@ -1,6 +1,7 @@
import abc import abc
import json import json
import sqlite3 import sqlite3
import uuid
class Beaker(abc.ABC): class Beaker(abc.ABC):
@ -20,7 +21,7 @@ class Beaker(abc.ABC):
pass pass
@abc.abstractmethod @abc.abstractmethod
def add_item(self, item: dict, from_table=None, from_id=None) -> None: def add_item(self, item: dict, id: str | None = None) -> None:
pass pass
@abc.abstractmethod @abc.abstractmethod
@ -36,8 +37,8 @@ class TempBeaker(Beaker):
def __len__(self): def __len__(self):
return len(self._items) return len(self._items)
def add_item(self, item: dict, from_table=None, from_id=None) -> None: def add_item(self, item: dict, id=None) -> None:
self._items.append((from_id, item)) self._items.append((id, item))
def items(self): def items(self):
yield from self._items yield from self._items
@ -53,23 +54,25 @@ class SqliteBeaker(Beaker):
self.cursor = self.recipe.db.cursor() self.cursor = self.recipe.db.cursor()
self.cursor.row_factory = sqlite3.Row self.cursor.row_factory = sqlite3.Row
self.cursor.execute( self.cursor.execute(
f"CREATE TABLE IF NOT EXISTS {self.name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)" f"CREATE TABLE IF NOT EXISTS {self.name} (uuid TEXT PRIMARY KEY, data JSON)"
) )
def items(self): def items(self):
self.cursor.execute(f"SELECT id, data FROM {self.name}") self.cursor.execute(f"SELECT uuid, data FROM {self.name}")
data = self.cursor.fetchall() data = self.cursor.fetchall()
for item in data: for item in data:
yield item["id"], json.loads(item["data"]) yield item["uuid"], json.loads(item["data"])
def __len__(self): def __len__(self):
self.cursor.execute(f"SELECT COUNT(*) FROM {self.name}") self.cursor.execute(f"SELECT COUNT(*) FROM {self.name}")
return self.cursor.fetchone()[0] return self.cursor.fetchone()[0]
def add_item(self, item: dict, from_table=None, from_id=None) -> None: def add_item(self, item: dict, id: str | None = None) -> None:
if id is None:
id = str(uuid.uuid1())
self.cursor.execute( self.cursor.execute(
f"INSERT INTO {self.name} (data, from_table, from_id) VALUES (?, ?, ?)", f"INSERT INTO {self.name} (uuid, data) VALUES (?, ?)",
(json.dumps(item), from_table, from_id), (id, json.dumps(item)),
) )
self.recipe.db.commit() self.recipe.db.commit()

View File

@ -51,6 +51,8 @@ def show(ctx: typer.Context):
def run( def run(
ctx: typer.Context, ctx: typer.Context,
input: Annotated[Optional[List[str]], typer.Option(...)] = None, input: Annotated[Optional[List[str]], typer.Option(...)] = None,
start: Optional[str] = typer.Option(None),
end: Optional[str] = typer.Option(None),
): ):
has_data = any(ctx.obj.beakers.values()) has_data = any(ctx.obj.beakers.values())
if not has_data and not input: if not has_data and not input:
@ -59,7 +61,7 @@ def run(
for input_str in input: for input_str in input:
beaker, filename = input_str.split("=") beaker, filename = input_str.split("=")
ctx.obj.csv_to_beaker(filename, beaker) ctx.obj.csv_to_beaker(filename, beaker)
ctx.obj.run_once() ctx.obj.run_once(start, end)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -152,12 +152,28 @@ class Recipe:
for from_b, to_b, edge in self.graph.out_edges(node, data=True): for from_b, to_b, edge in self.graph.out_edges(node, data=True):
print(f" {from_b} -> {to_b} ({edge})") print(f" {from_b} -> {to_b} ({edge})")
def run_once(self): def run_once(
self, start_beaker: str | None = None, end_beaker: str | None = None
) -> None:
log.info("run_once", recipe=self) log.info("run_once", recipe=self)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
started = False if start_beaker else True
# go through each node in forward order, pushing data # go through each node in forward order, pushing data
for node in networkx.topological_sort(self.graph): for node in networkx.topological_sort(self.graph):
# only process nodes between start and end
if not started:
if node == start_beaker:
started = True
log.info("partial run start", node=node)
else:
log.info("partial run skip", node=node, waiting_for=start_beaker)
continue
if end_beaker and node == end_beaker:
log.info("partial run end", node=node)
break
# get outbound edges # get outbound edges
edges = self.graph.out_edges(node, data=True) edges = self.graph.out_edges(node, data=True)
@ -182,14 +198,17 @@ class Recipe:
try: try:
transformed = t_func(item) transformed = t_func(item)
if transformed: if transformed:
to_beaker.add_item(transformed, from_beaker.name, id) to_beaker.add_item(transformed, id)
except Exception as e: except Exception as e:
for error_types, error_beaker_name in error_map.items(): for error_types, error_beaker_name in error_map.items():
if isinstance(e, error_types): if isinstance(e, error_types):
error_beaker = self.beakers[error_beaker_name] error_beaker = self.beakers[error_beaker_name]
error_beaker.add_item( error_beaker.add_item(
{"exception": str(e), "exc_type": str(type(e))}, {
from_beaker.name, "item": item,
"exception": str(e),
"exc_type": str(type(e)),
},
id, id,
) )
break break