getting more real

This commit is contained in:
James Turk 2023-05-07 22:55:02 -05:00
parent ebfef0a2d7
commit 53c32315e7
8 changed files with 126 additions and 71 deletions

0
examples/__init__.py Normal file
View File

View File

@ -15,9 +15,10 @@ async def add_response(obj_with_url):
# current thinking, beakers exist within a recipe # current thinking, beakers exist within a recipe
recipe = Recipe("fetch urls") recipe = Recipe("fetch urls", "url_example.db")
recipe.add_beaker("agencies") recipe.add_beaker("agencies")
recipe.add_beaker("responses") recipe.add_beaker("responses")
recipe.add_beaker("bad_requests")
recipe.add_beaker("good_urls", temp=True) recipe.add_beaker("good_urls", temp=True)
recipe.add_beaker("missing_urls", temp=True) recipe.add_beaker("missing_urls", temp=True)
recipe.add_conditional( recipe.add_conditional(
@ -26,7 +27,9 @@ recipe.add_conditional(
if_true="good_urls", if_true="good_urls",
if_false="missing_urls", if_false="missing_urls",
) )
recipe.add_pour("good_urls", "responses", add_response) recipe.add_transform(
"good_urls", "responses", add_response, error_beaker="bad_requests"
)
recipe.csv_to_beaker("agencies.csv", "agencies") recipe.csv_to_beaker("agencies.csv", "agencies")
recipe.run_push() recipe.run_once()

28
examples/fruits.py Normal file
View File

@ -0,0 +1,28 @@
from beakers.recipe import Recipe
recipe = Recipe("example01")
words_beaker = recipe.add_beaker("words", temp=True)
recipe.add_beaker("fruits")
recipe.add_beaker("other")
recipe.add_beaker("sentences")
recipe.add_conditional(
"words",
lambda x: x["word"]
in (
"apple",
"banana",
"fig",
"grape",
"lemon",
"mango",
"orange",
"pear",
"raspberry",
),
"fruits",
"other",
)
recipe.add_transform("fruits", "sentences", lambda x: f"I like to eat {x['word']}")
recipe.add_transform(
"other", "sentences", lambda x: f"I'm not so sure about {x['word']}"
)

View File

@ -1,5 +1,5 @@
[tool.poetry] [tool.poetry]
name = "foiaghost" name = "beakers"
version = "0.1.0" version = "0.1.0"
description = "" description = ""
authors = ["James Turk <dev@jamesturk.net>"] authors = ["James Turk <dev@jamesturk.net>"]

35
src/beakers/cli.py Normal file
View File

@ -0,0 +1,35 @@
import importlib
import typer
import sys
from typing_extensions import Annotated
app = typer.Typer()
def _load_recipe(dotted_path: str):
sys.path.append(".")
path, name = dotted_path.rsplit(".", 1)
mod = importlib.import_module(path)
return getattr(mod, name)
@app.command()
def reset(recipe: Annotated[str, typer.Option(...)]):
mod = _load_recipe(recipe)
mod.reset()
@app.command()
def show(recipe: Annotated[str, typer.Option(...)]):
mod = _load_recipe(recipe)
mod.show()
@app.command()
def run(recipe: Annotated[str, typer.Option(...)]):
mod = _load_recipe(recipe)
mod.run_once()
if __name__ == "__main__":
app()

View File

@ -1,5 +1,6 @@
import csv import csv
import json import json
import typer
import inspect import inspect
import sqlite3 import sqlite3
import hashlib import hashlib
@ -19,11 +20,11 @@ def get_sha512(filename: str) -> str:
class Recipe: class Recipe:
def __init__(self, name): def __init__(self, name, db_name="beakers.db"):
self.name = name self.name = name
self.graph = networkx.DiGraph() self.graph = networkx.DiGraph()
self.beakers = {} self.beakers = {}
self.db = sqlite3.connect("beakers.db") self.db = sqlite3.connect(db_name)
cursor = self.db.cursor() cursor = self.db.cursor()
cursor.execute( cursor.execute(
"CREATE TABLE IF NOT EXISTS _metadata (table_name TEXT PRIMARY KEY, data JSON)" "CREATE TABLE IF NOT EXISTS _metadata (table_name TEXT PRIMARY KEY, data JSON)"
@ -41,9 +42,15 @@ class Recipe:
return self.beakers[name] return self.beakers[name]
def add_transform( def add_transform(
self, from_beaker: str, to_beaker: str, transform_func: callable self,
from_beaker: str,
to_beaker: str,
transform_func: callable,
error_beaker: str | None = None,
) -> None: ) -> None:
self.graph.add_edge(from_beaker, to_beaker, transform=transform_func) self.graph.add_edge(
from_beaker, to_beaker, transform=transform_func, error_beaker=error_beaker
)
def add_conditional( def add_conditional(
self, self,
@ -130,8 +137,23 @@ class Recipe:
lg.info("from_csv", case="match") lg.info("from_csv", case="match")
return beaker return beaker
def run_push(self): def show(self):
log.info("recipe", recipe=self) for node in networkx.topological_sort(self.graph):
beaker = self.beakers[node]
temp = isinstance(beaker, TempBeaker)
if temp:
typer.secho(node, fg=typer.colors.CYAN)
else:
lb = len(beaker)
typer.secho(
f"{node} ({lb})",
fg=typer.colors.GREEN if lb else typer.colors.YELLOW,
)
for from_b, to_b, edge in self.graph.out_edges(node, data=True):
print(f" {from_b} -> {to_b} ({edge})")
def run_once(self):
log.info("run_once", recipe=self)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
# go through each node in forward order, pushing data # go through each node in forward order, pushing data
@ -143,19 +165,39 @@ class Recipe:
if "transform" not in edge: if "transform" not in edge:
raise ValueError(f"unknown edge: {edge}") raise ValueError(f"unknown edge: {edge}")
if error_b := edge.get("error_beaker"):
error_beaker = self.beakers[error_b]
else:
error_beaker = None
from_beaker = self.beakers[from_b] from_beaker = self.beakers[from_b]
to_beaker = self.beakers[to_b] to_beaker = self.beakers[to_b]
func = edge["transform"] func = edge["transform"]
log.info("transform", from_b=from_b, to_b=to_b, items=len(from_beaker)) log.info("transform", from_b=from_b, to_b=to_b, items=len(from_beaker))
if inspect.iscoroutinefunction(func): if inspect.iscoroutinefunction(func):
for id, item in from_beaker.items(): t_func = lambda x: loop.run_until_complete(func(x))
log.debug("transform item", id=id, item=item, async_=True)
transformed = loop.run_until_complete(func(item))
if transformed:
to_beaker.add_item(transformed, from_beaker.name, id)
else: else:
t_func = func
# for id, item in from_beaker.items():
# try:
# transformed = loop.run_until_complete(func(item))
# if transformed:
# to_beaker.add_item(transformed, from_beaker.name, id)
# except Exception as e:
# error_beaker.add_item(
# {"exception": str(e), "exc_type": type(e)}
# from_beaker.name, id
# )
# else:
for id, item in from_beaker.items(): for id, item in from_beaker.items():
log.debug("transform item", id=id, item=item, async_=False) try:
transformed = func(item) transformed = t_func(item)
if transformed: if transformed:
to_beaker.add_item(transformed, from_beaker.name, id) to_beaker.add_item(transformed, from_beaker.name, id)
except Exception as e:
error_beaker.add_item(
{"exception": str(e), "exc_type": str(type(e))},
from_beaker.name,
id,
)

View File

@ -1,53 +0,0 @@
from beakers.recipe import Recipe
words = [
"apple",
"banana",
"cactus",
"daikon",
"eel",
"fig",
"grape",
"hibiscus",
"ice",
"jelly",
"kingfish",
"lemon",
"mango",
"nougat",
"orange",
"pear",
"quail",
"raspberry",
"sturgeon",
"tamarind",
]
recipe = Recipe("example01")
words_beaker = recipe.add_beaker("words", temp=True)
recipe.add_beaker("fruits")
recipe.add_beaker("other")
recipe.add_beaker("sentences")
recipe.add_conditional(
"words",
lambda x: x
in (
"apple",
"banana",
"fig",
"grape",
"lemon",
"mango",
"orange",
"pear",
"raspberry",
),
"fruits",
"other",
)
recipe.add_pour("fruits", "sentences", lambda x: f"I like to eat {x}")
recipe.add_pour("other", "sentences", lambda x: f"I'm not so sure about {x}")
for word in words:
words_beaker.add_item(word)
recipe.run_push()