WIP graph recipe

This commit is contained in:
James Turk 2023-05-07 18:39:46 -05:00
parent 7381eb9f30
commit 8e4358c1e9
4 changed files with 133 additions and 65 deletions

View File

@ -1,41 +0,0 @@
import sqlite3
import csv
import json
import hashlib
from structlog import get_logger
log = get_logger()
class Beaker:
    """Thin wrapper around a single SQLite table of JSON-encoded items.

    A beaker cannot be used until ``connect_to_db`` has been called: that
    call creates the cursor every other method relies on and makes sure the
    backing table exists.
    """

    def __init__(self, table_name: str):
        # NOTE: table_name is interpolated directly into SQL statements
        # (identifiers cannot be bound as parameters), so it must be a
        # trusted, valid SQLite identifier.
        self.table_name = table_name

    def __repr__(self):
        return f"Beaker({self.table_name})"

    def __iter__(self):
        """Yield ``(id, item)`` pairs, with ``item`` decoded from JSON."""
        # The id column must be selected explicitly; selecting only `data`
        # makes the row["id"] lookup below fail.
        self.cursor.execute(f"SELECT id, data FROM {self.table_name}")
        # Fetch everything up front: the cursor is shared with the other
        # methods, so a lazily consumed result set could be clobbered.
        for row in self.cursor.fetchall():
            yield row["id"], json.loads(row["data"])

    def __len__(self):
        """Return the number of rows currently stored in the table."""
        self.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}")
        return self.cursor.fetchone()[0]

    def connect_to_db(self, db) -> None:
        """Attach to the sqlite3 connection ``db`` and create the table if needed."""
        self.db = db
        self.cursor = self.db.cursor()
        # Row objects allow access by column name (row["id"]) and by index.
        self.cursor.row_factory = sqlite3.Row
        # create table if it doesn't exist
        self.cursor.execute(
            f"CREATE TABLE IF NOT EXISTS {self.table_name} "
            "(id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)"
        )

    def add_item(self, item: dict, from_table=None, from_id=None) -> None:
        """Store ``item`` as JSON, optionally recording the row it was derived from.

        ``from_table`` / ``from_id`` are written to the matching columns and
        stay NULL when omitted.
        """
        self.cursor.execute(
            f"INSERT INTO {self.table_name} (data, from_table, from_id) VALUES (?, ?, ?)",
            (json.dumps(item), from_table, from_id),
        )
        self.db.commit()

19
src/beakers/filters.py Normal file
View File

@ -0,0 +1,19 @@
from .recipe import Beaker
class ConditionalFilter:
    """Callable that routes each item into one of two beakers.

    ``condition`` is called with the item; a truthy result sends the item to
    ``true_path``, anything else sends it to ``false_path``.
    """

    def __init__(self, condition: callable, true_path: Beaker, false_path: Beaker):
        self.condition, self.true_path, self.false_path = (
            condition,
            true_path,
            false_path,
        )

    def __repr__(self):
        cond, on_true, on_false = self.condition, self.true_path, self.false_path
        return f"ConditionalFilter({cond}, {on_true}, {on_false})"

    def __call__(self, item):
        """Add ``item`` to whichever beaker the condition selects."""
        destination = self.true_path if self.condition(item) else self.false_path
        destination.add_item(item)

View File

@ -2,17 +2,72 @@ import csv
import json
import sqlite3
import hashlib
from .beaker import Beaker
import asyncio
from dataclasses import dataclass
from structlog import get_logger
log = get_logger()
"""
Implementation thoughts
class Pour:
    """Record of one pour: a source beaker, a destination beaker and a transform.

    The class only stores the three values it is given.
    """

    def __init__(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable):
        self.from_beaker, self.to_beaker = from_beaker, to_beaker
        self.transform = transform
Originally beaker was the central object, but pretty quickly
most functionality was moved to recipe. Now beaker is just a
thin wrapper around a sqlite table.
Recipe is the central object. It contains a list of pours,
which are the instructions for how to transform data from one
beaker to another.
Recipe also contains a list of beakers, which are just sqlite
tables. Beakers are created by the recipe, and are passed to
pours as arguments.
Beakers right now know they are stored in SQLite, but I think
that will be abstracted away. Beakers should be able to be
stored in any database, or on disk.
Core functionality of a beaker:
- initialize (create table if it doesn't exist)
- add item with optional link to another item
- should this be links? (probably)
- iterate over items
- get item by id
- count items
Ultimately, this is a graph problem. Beakers are nodes, and
pours are edges. The recipe is the graph.
"""
class Beaker:
    """Thin wrapper around one SQLite table of JSON-encoded items.

    All database access goes through the owning recipe: statements run on
    ``recipe.cursor`` and writes are committed on ``recipe.db``.
    """

    def __init__(self, table_name: str, recipe):
        # NOTE: table_name is interpolated directly into SQL statements
        # (identifiers cannot be bound as parameters), so it must be a
        # trusted, valid SQLite identifier.
        self.table_name = table_name
        self.recipe = recipe
        # create table if it doesn't exist
        self.recipe.cursor.execute(
            f"CREATE TABLE IF NOT EXISTS {self.table_name} "
            "(id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)"
        )

    def __repr__(self):
        return f"Beaker({self.table_name})"

    def items(self):
        """Yield ``(id, item)`` pairs, with ``item`` decoded from JSON."""
        self.recipe.cursor.execute(f"SELECT id, data FROM {self.table_name}")
        # Fetch everything up front: the cursor is shared with every other
        # beaker of the recipe, so a lazily consumed result set would be
        # clobbered by e.g. add_item() calls made while iterating.
        for row in self.recipe.cursor.fetchall():
            yield row["id"], json.loads(row["data"])

    def __len__(self):
        """Return the number of rows currently stored in the table."""
        self.recipe.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}")
        return self.recipe.cursor.fetchone()[0]

    def add_item(self, item: dict, from_table=None, from_id=None) -> None:
        """Store ``item`` as JSON, optionally recording the row it was derived from.

        ``from_table`` / ``from_id`` are written to the matching columns and
        stay NULL when omitted.
        """
        self.recipe.cursor.execute(
            f"INSERT INTO {self.table_name} (data, from_table, from_id) VALUES (?, ?, ?)",
            (json.dumps(item), from_table, from_id),
        )
        # commit() lives on the connection; sqlite3 cursors do not have one.
        self.recipe.db.commit()
def get_sha512(filename: str) -> str:
@ -20,10 +75,27 @@ def get_sha512(filename: str) -> str:
return hashlib.sha512(file.read()).hexdigest()
@dataclass
class Pour:
    """One pour: where items come from, where they go, and how they change.

    Beakers are referenced by the string given to ``Recipe.add_pour`` rather
    than by object.
    """

    # beaker that items are read from
    from_beaker: str
    # beaker that transformed items are written to
    to_beaker: str
    # callable applied to each item on its way across
    transform: callable
@dataclass
class Split:
    """One split: a source beaker, a condition, and a destination per outcome.

    Beakers are referenced by the string given to ``Recipe.add_split`` rather
    than by object.
    """

    # beaker that items are read from
    from_beaker: str
    # callable that decides which destination an item belongs to
    condition: callable
    # destination beaker when the condition holds
    if_true: str
    # destination beaker when the condition does not hold
    if_false: str
class Recipe:
def __init__(self, name: str):
self.name = name
self.beakers = {}
self.pours = []
self.splits = []
self.db = sqlite3.connect("beakers.db")
self.cursor = self.db.cursor()
self.cursor.row_factory = sqlite3.Row
@ -62,7 +134,6 @@ class Recipe:
self.pours.append(pour)
def csv_to_beaker(self, filename: str, beaker: Beaker) -> None:
beaker.connect_to_db(self.db)
lg = log.bind(beaker=beaker, filename=filename)
# three cases: empty, match, mismatch
# case 1: empty
@ -78,7 +149,7 @@ class Recipe:
meta["sha512"] = get_sha512(filename)
self.save_metadata(beaker.table_name, meta)
else:
old_sha = self.get_metadata(beaker.table_name).get("sha512")
old_sha = self.get_metadata(beaker).get("sha512")
new_sha = get_sha512(filename)
if old_sha != new_sha:
# case 3: mismatch
@ -89,23 +160,36 @@ class Recipe:
lg.info("from_csv", case="match")
return beaker
def add_pour(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable):
def declare_beaker(self, name: str, temp=True):
    """Create the beaker called ``name``, register it on the recipe and return it.

    When ``temp`` is true, any existing table of that name is dropped first,
    so the beaker starts out empty. With ``temp=False`` an existing table and
    its rows are kept.
    """
    if temp:
        # Drop *before* constructing the Beaker: Beaker.__init__ issues
        # CREATE TABLE IF NOT EXISTS, so dropping afterwards would leave the
        # freshly declared beaker without a backing table.
        self.cursor.execute(f"DROP TABLE IF EXISTS {name}")
    beaker = Beaker(name, self)
    self.beakers[name] = beaker
    return beaker
def add_pour(self, from_beaker: str, to_beaker: str, transform: callable):
    """Register a pour from ``from_beaker`` to ``to_beaker`` through ``transform``.

    Returns the new ``Pour``, mirroring ``add_split``, so callers can keep a
    handle on what they registered.
    """
    pour = Pour(from_beaker, to_beaker, transform)
    self.pours.append(pour)
    return pour
def add_split(
    self, from_beaker: str, condition: callable, if_true: str, if_false: str
):
    """Register a split of ``from_beaker`` on ``condition`` and return it."""
    self.splits.append(Split(from_beaker, condition, if_true, if_false))
    # Hand back the object that was just registered.
    return self.splits[-1]
def run_linearly(self):
    """Run every registered pour, in registration order, one item at a time.

    For each pour, every ``(id, item)`` of the source beaker is passed to the
    pour's transform and the result is added to the destination beaker,
    linked back to the source table and row id. Transforms may be plain
    functions or coroutine functions; awaitable results are driven to
    completion before being stored.

    NOTE(review): splits registered via ``add_split`` are not executed here.
    """
    import inspect

    def resolve(ref):
        # Pours carry beaker names; still accept Beaker objects so callers
        # written against the earlier object-based API keep working.
        return self.beakers[ref] if isinstance(ref, str) else ref

    log.info("recipe", recipe=self)
    # A private loop avoids asyncio.get_event_loop(), which is deprecated
    # when no loop is running.
    loop = asyncio.new_event_loop()
    try:
        for pour in self.pours:
            source = resolve(pour.from_beaker)
            target = resolve(pour.to_beaker)
            log.info(
                "pour",
                from_beaker=pour.from_beaker,
                to_beaker=pour.to_beaker,
                to_pour=len(source),
            )
            for item_id, item in source.items():
                log.info("pour_item", id=item_id, item=item)
                transformed = pour.transform(item)
                if inspect.isawaitable(transformed):
                    transformed = loop.run_until_complete(transformed)
                target.add_item(transformed, source.table_name, item_id)
    finally:
        loop.close()

View File

@ -1,12 +1,9 @@
import csv
from beakers.recipe import Recipe
from beakers.beaker import Beaker
urls = Beaker("urls")
responses = Beaker("responses")
import httpx
from beakers.recipe import Recipe, Beaker
async def add_response(obj_with_url):
print(obj_with_url["url"])
url = obj_with_url["url"]
response = await httpx.get(url)
return {
@ -16,10 +13,19 @@ async def add_response(obj_with_url):
}
# current thinking, beakers exist within a recipe
recipe = Recipe("fetch urls")

# Beakers are created through the recipe; Beaker() now needs the recipe it
# belongs to, so they are no longer constructed standalone.
agencies = recipe.declare_beaker("agencies")
recipe.declare_beaker("responses")
recipe.declare_beaker("good_urls", temp=True)
recipe.declare_beaker("missing_urls", temp=True)

# csv_to_beaker reads beaker.table_name, so it is given the Beaker object
# rather than its name.
recipe.csv_to_beaker("agencies.csv", agencies)

# Rows whose url looks like an http(s) address go on to be fetched; the rest
# are set aside.
recipe.add_split(
    "agencies",
    lambda x: x["url"].startswith("http"),
    if_true="good_urls",
    if_false="missing_urls",
)
recipe.add_pour("good_urls", "responses", add_response)
recipe.run_linearly()