diff --git a/src/beakers/beakers.py b/src/beakers/beakers.py index 7f8033b..e9c8582 100644 --- a/src/beakers/beakers.py +++ b/src/beakers/beakers.py @@ -1,6 +1,7 @@ import abc import json import sqlite3 +import uuid class Beaker(abc.ABC): @@ -20,7 +21,7 @@ class Beaker(abc.ABC): pass @abc.abstractmethod - def add_item(self, item: dict, from_table=None, from_id=None) -> None: + def add_item(self, item: dict, id: str | None = None) -> None: pass @abc.abstractmethod @@ -36,8 +37,8 @@ class TempBeaker(Beaker): def __len__(self): return len(self._items) - def add_item(self, item: dict, from_table=None, from_id=None) -> None: - self._items.append((from_id, item)) + def add_item(self, item: dict, id=None) -> None: + self._items.append((id, item)) def items(self): yield from self._items @@ -53,23 +54,25 @@ class SqliteBeaker(Beaker): self.cursor = self.recipe.db.cursor() self.cursor.row_factory = sqlite3.Row self.cursor.execute( - f"CREATE TABLE IF NOT EXISTS {self.name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)" + f"CREATE TABLE IF NOT EXISTS {self.name} (uuid TEXT PRIMARY KEY, data JSON)" ) def items(self): - self.cursor.execute(f"SELECT id, data FROM {self.name}") + self.cursor.execute(f"SELECT uuid, data FROM {self.name}") data = self.cursor.fetchall() for item in data: - yield item["id"], json.loads(item["data"]) + yield item["uuid"], json.loads(item["data"]) def __len__(self): self.cursor.execute(f"SELECT COUNT(*) FROM {self.name}") return self.cursor.fetchone()[0] - def add_item(self, item: dict, from_table=None, from_id=None) -> None: + def add_item(self, item: dict, id: str | None = None) -> None: + if id is None: + id = str(uuid.uuid1()) self.cursor.execute( - f"INSERT INTO {self.name} (data, from_table, from_id) VALUES (?, ?, ?)", - (json.dumps(item), from_table, from_id), + f"INSERT INTO {self.name} (uuid, data) VALUES (?, ?)", + (id, json.dumps(item)), ) self.recipe.db.commit() diff --git a/src/beakers/cli.py b/src/beakers/cli.py index a091c03..13c550b 100644 --- a/src/beakers/cli.py +++ b/src/beakers/cli.py @@ -51,6 +51,8 @@ def show(ctx: typer.Context): def run( ctx: typer.Context, input: Annotated[Optional[List[str]], typer.Option(...)] = None, + start: Optional[str] = typer.Option(None), + end: Optional[str] = typer.Option(None), ): has_data = any(ctx.obj.beakers.values()) if not has_data and not input: @@ -59,7 +61,7 @@ def run( for input_str in input: beaker, filename = input_str.split("=") ctx.obj.csv_to_beaker(filename, beaker) - ctx.obj.run_once() + ctx.obj.run_once(start, end) if __name__ == "__main__": diff --git a/src/beakers/recipe.py b/src/beakers/recipe.py index 5af0098..59df5c8 100644 --- a/src/beakers/recipe.py +++ b/src/beakers/recipe.py @@ -152,12 +152,28 @@ class Recipe: for from_b, to_b, edge in self.graph.out_edges(node, data=True): print(f" {from_b} -> {to_b} ({edge})") - def run_once(self): + def run_once( + self, start_beaker: str | None = None, end_beaker: str | None = None + ) -> None: log.info("run_once", recipe=self) loop = asyncio.get_event_loop() + started = False if start_beaker else True + # go through each node in forward order, pushing data for node in networkx.topological_sort(self.graph): + # only process nodes between start and end + if not started: + if node == start_beaker: + started = True + log.info("partial run start", node=node) + else: + log.info("partial run skip", node=node, waiting_for=start_beaker) + continue + if end_beaker and node == end_beaker: + log.info("partial run end", node=node) + break + # get outbound edges edges = self.graph.out_edges(node, data=True) @@ -182,14 +198,17 @@ class Recipe: try: transformed = t_func(item) if transformed: - to_beaker.add_item(transformed, from_beaker.name, id) + to_beaker.add_item(transformed, id) except Exception as e: for error_types, error_beaker_name in error_map.items(): if isinstance(e, error_types): error_beaker = self.beakers[error_beaker_name] error_beaker.add_item( - {"exception": str(e), "exc_type": str(type(e))}, - from_beaker.name, + { + "item": item, + "exception": str(e), + "exc_type": str(type(e)), + }, id, ) break