This commit is contained in:
James Turk 2023-05-07 20:22:40 -05:00
parent efe572e1f6
commit ebfef0a2d7
2 changed files with 18 additions and 95 deletions

View File

@ -1,19 +0,0 @@
from .recipe import Beaker
class ConditionalFilter:
def __init__(self, condition: callable, true_path: Beaker, false_path: Beaker):
self.condition = condition
self.true_path = true_path
self.false_path = false_path
def __repr__(self):
return (
f"ConditionalFilter({self.condition}, {self.true_path}, {self.false_path})"
)
def __call__(self, item):
if self.condition(item):
self.true_path.add_item(item)
else:
self.false_path.add_item(item)

View File

@ -12,68 +12,12 @@ from .beakers import Beaker, SqliteBeaker, TempBeaker
log = get_logger()
"""
Implementation thoughts
Originally beaker was the central object, but pretty quickly
most functionality was moved to recipe. Now beaker is just a
thin wrapper around a sqlite table.
Recipe is the central object. It contains a list of pours,
which are the instructions for how to transform data from one
beaker to another.
Recipe also contains a list of beakers, which are just sqlite
tables. Beakers are created by the recipe, and are passed to
pours as arguments.
Beakers right now know they are stored in SQLite, but I think
that will be abstracted away. Beakers should be able to be
stored in any database, or on disk.
Core functionality of a beaker:
- initialize (create table if it doesn't exist)
- add item with optional link to another item
- should this be links? (probably)
- iterate over items
- get item by id
- count items
Ultimately, this is a graph problem. Beakers are nodes, and
pours are edges. The recipe is the graph.
"""
def get_sha512(filename: str) -> str:
with open(filename, "rb") as file:
return hashlib.sha512(file.read()).hexdigest()
@dataclass(eq=True, frozen=True)
class Pour:
"""
A pour is an edge in the graph.
It contains a function that transforms data
from one beaker to another.
"""
transform_func: callable
@dataclass(eq=True, frozen=True)
class Conditional:
"""
A conditional is a decision point in the graph.
condition_func should return values, and the
values will be used to determine which path
to take.
"""
condition_func: callable
class Recipe:
def __init__(self, name):
self.name = name
@ -96,7 +40,7 @@ class Recipe:
self.beakers[name] = SqliteBeaker(name, self)
return self.beakers[name]
def add_pour(
def add_transform(
self, from_beaker: str, to_beaker: str, transform_func: callable
) -> None:
self.graph.add_edge(from_beaker, to_beaker, transform=transform_func)
@ -111,7 +55,7 @@ class Recipe:
# first add a transform to evaluate the conditional
cond_name = f"cond-{from_beaker}-{condition_func.__name__}"
self.add_beaker(cond_name, temp=True)
self.add_pour(
self.add_transform(
from_beaker,
cond_name,
lambda data: (data, condition_func(data)),
@ -196,24 +140,22 @@ class Recipe:
edges = self.graph.out_edges(node, data=True)
for from_b, to_b, edge in edges:
if "transform" not in edge:
raise ValueError(f"unknown edge: {edge}")
from_beaker = self.beakers[from_b]
to_beaker = self.beakers[to_b]
if "transform" in edge:
func = edge["transform"]
log.info(
"transform", from_b=from_b, to_b=to_b, items=len(from_beaker)
)
if inspect.iscoroutinefunction(func):
for id, item in from_beaker.items():
log.debug("transform item", id=id, item=item, async_=True)
transformed = loop.run_until_complete(func(item))
if transformed:
to_beaker.add_item(transformed, from_beaker.name, id)
else:
for id, item in from_beaker.items():
log.debug("transform item", id=id, item=item, async_=False)
transformed = func(item)
if transformed:
to_beaker.add_item(transformed, from_beaker.name, id)
func = edge["transform"]
log.info("transform", from_b=from_b, to_b=to_b, items=len(from_beaker))
if inspect.iscoroutinefunction(func):
for id, item in from_beaker.items():
log.debug("transform item", id=id, item=item, async_=True)
transformed = loop.run_until_complete(func(item))
if transformed:
to_beaker.add_item(transformed, from_beaker.name, id)
else:
raise Exception(f"unknown edge: {edge}")
for id, item in from_beaker.items():
log.debug("transform item", id=id, item=item, async_=False)
transformed = func(item)
if transformed:
to_beaker.add_item(transformed, from_beaker.name, id)