From ebfef0a2d7d6dea83a7d5ba12be544880a7c3e4e Mon Sep 17 00:00:00 2001 From: James Turk Date: Sun, 7 May 2023 20:22:40 -0500 Subject: [PATCH] cleanup --- src/beakers/filters.py | 19 --------- src/beakers/recipe.py | 94 ++++++++---------------------------------- 2 files changed, 18 insertions(+), 95 deletions(-) delete mode 100644 src/beakers/filters.py diff --git a/src/beakers/filters.py b/src/beakers/filters.py deleted file mode 100644 index 641b0f1..0000000 --- a/src/beakers/filters.py +++ /dev/null @@ -1,19 +0,0 @@ -from .recipe import Beaker - - -class ConditionalFilter: - def __init__(self, condition: callable, true_path: Beaker, false_path: Beaker): - self.condition = condition - self.true_path = true_path - self.false_path = false_path - - def __repr__(self): - return ( - f"ConditionalFilter({self.condition}, {self.true_path}, {self.false_path})" - ) - - def __call__(self, item): - if self.condition(item): - self.true_path.add_item(item) - else: - self.false_path.add_item(item) diff --git a/src/beakers/recipe.py b/src/beakers/recipe.py index fe51601..b245a52 100644 --- a/src/beakers/recipe.py +++ b/src/beakers/recipe.py @@ -12,68 +12,12 @@ from .beakers import Beaker, SqliteBeaker, TempBeaker log = get_logger() -""" -Implementation thoughts - -Originally beaker was the central object, but pretty quickly -most functionality was moved to recipe. Now beaker is just a -thin wrapper around a sqlite table. - -Recipe is the central object. It contains a list of pours, -which are the instructions for how to transform data from one -beaker to another. - -Recipe also contains a list of beakers, which are just sqlite -tables. Beakers are created by the recipe, and are passed to -pours as arguments. - -Beakers right now know they are stored in SQLite, but I think -that will be abstracted away. Beakers should be able to be -stored in any database, or on disk. - -Core functionality of a beaker: -- initialize (create table if it doesn't exist) -- add item with optional link to another item - - should this be links? (probably) -- iterate over items -- get item by id -- count items - -Ultimately, this is a graph problem. Beakers are nodes, and -pours are edges. The recipe is the graph. -""" - def get_sha512(filename: str) -> str: with open(filename, "rb") as file: return hashlib.sha512(file.read()).hexdigest() -@dataclass(eq=True, frozen=True) -class Pour: - """ - A pour is an edge in the graph. - - It contains a function that transforms data - from one beaker to another. - """ - - transform_func: callable - - -@dataclass(eq=True, frozen=True) -class Conditional: - """ - A conditional is a decision point in the graph. - - condition_func should return values, and the - values will be used to determine which path - to take. - """ - - condition_func: callable - - class Recipe: def __init__(self, name): self.name = name @@ -96,7 +40,7 @@ class Recipe: self.beakers[name] = SqliteBeaker(name, self) return self.beakers[name] - def add_pour( + def add_transform( self, from_beaker: str, to_beaker: str, transform_func: callable ) -> None: self.graph.add_edge(from_beaker, to_beaker, transform=transform_func) @@ -111,7 +55,7 @@ class Recipe: # first add a transform to evaluate the conditional cond_name = f"cond-{from_beaker}-{condition_func.__name__}" self.add_beaker(cond_name, temp=True) - self.add_pour( + self.add_transform( from_beaker, cond_name, lambda data: (data, condition_func(data)), @@ -196,24 +140,22 @@ class Recipe: edges = self.graph.out_edges(node, data=True) for from_b, to_b, edge in edges: + if "transform" not in edge: + raise ValueError(f"unknown edge: {edge}") + from_beaker = self.beakers[from_b] to_beaker = self.beakers[to_b] - if "transform" in edge: - func = edge["transform"] - log.info( - "transform", from_b=from_b, to_b=to_b, items=len(from_beaker) - ) - if inspect.iscoroutinefunction(func): - for id, item in from_beaker.items(): - log.debug("transform item", id=id, item=item, async_=True) - transformed = loop.run_until_complete(func(item)) - if transformed: - to_beaker.add_item(transformed, from_beaker.name, id) - else: - for id, item in from_beaker.items(): - log.debug("transform item", id=id, item=item, async_=False) - transformed = func(item) - if transformed: - to_beaker.add_item(transformed, from_beaker.name, id) + func = edge["transform"] + log.info("transform", from_b=from_b, to_b=to_b, items=len(from_beaker)) + if inspect.iscoroutinefunction(func): + for id, item in from_beaker.items(): + log.debug("transform item", id=id, item=item, async_=True) + transformed = loop.run_until_complete(func(item)) + if transformed: + to_beaker.add_item(transformed, from_beaker.name, id) else: - raise Exception(f"unknown edge: {edge}") + for id, item in from_beaker.items(): + log.debug("transform item", id=id, item=item, async_=False) + transformed = func(item) + if transformed: + to_beaker.add_item(transformed, from_beaker.name, id)