From 8e4358c1e9aefafa036a982f5e577003159d6dc7 Mon Sep 17 00:00:00 2001 From: James Turk Date: Sun, 7 May 2023 18:39:46 -0500 Subject: [PATCH] WIP graph recipe --- src/beakers/beaker.py | 41 --------------- src/beakers/filters.py | 19 +++++++ src/beakers/recipe.py | 112 +++++++++++++++++++++++++++++++++++------ src/example.py | 26 ++++++---- 4 files changed, 133 insertions(+), 65 deletions(-) delete mode 100644 src/beakers/beaker.py create mode 100644 src/beakers/filters.py diff --git a/src/beakers/beaker.py b/src/beakers/beaker.py deleted file mode 100644 index 108263e..0000000 --- a/src/beakers/beaker.py +++ /dev/null @@ -1,41 +0,0 @@ -import sqlite3 -import csv -import json -import hashlib - -from structlog import get_logger - -log = get_logger() - - -class Beaker: - def __init__(self, table_name: str): - self.table_name = table_name - - def __repr__(self): - return f"Beaker({self.table_name})" - - def __iter__(self): - self.cursor.execute(f"SELECT data FROM {self.table_name}") - data = self.cursor.fetchall() - for item in data: - yield item["id"], item["data"] - - def __len__(self): - self.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}") - return self.cursor.fetchone()[0] - - def connect_to_db(self, db) -> None: - self.db = db - self.cursor = self.db.cursor() - self.cursor.row_factory = sqlite3.Row - # create table if it doesn't exist - self.cursor.execute( - f"CREATE TABLE IF NOT EXISTS {self.table_name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)" - ) - - def add_item(self, item: dict, from_table=None, from_id=None) -> None: - self.cursor.execute( - f"INSERT INTO {self.table_name} (data) VALUES (?)", (json.dumps(item),) - ) - self.db.commit() diff --git a/src/beakers/filters.py b/src/beakers/filters.py new file mode 100644 index 0000000..641b0f1 --- /dev/null +++ b/src/beakers/filters.py @@ -0,0 +1,19 @@ +from .recipe import Beaker + + +class ConditionalFilter: + def __init__(self, condition: callable, true_path: Beaker, false_path: Beaker): + self.condition = condition + self.true_path = true_path + self.false_path = false_path + + def __repr__(self): + return ( + f"ConditionalFilter({self.condition}, {self.true_path}, {self.false_path})" + ) + + def __call__(self, item): + if self.condition(item): + self.true_path.add_item(item) + else: + self.false_path.add_item(item) diff --git a/src/beakers/recipe.py b/src/beakers/recipe.py index c9faa44..2eca7dd 100644 --- a/src/beakers/recipe.py +++ b/src/beakers/recipe.py @@ -2,17 +2,72 @@ import csv import json import sqlite3 import hashlib -from .beaker import Beaker +import asyncio +from dataclasses import dataclass from structlog import get_logger log = get_logger() +""" +Implementation thoughts -class Pour: - def __init__(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable): - self.from_beaker = from_beaker - self.to_beaker = to_beaker - self.transform = transform +Originally beaker was the central object, but pretty quickly +most functionality was moved to recipe. Now beaker is just a +thin wrapper around a sqlite table. + +Recipe is the central object. It contains a list of pours, +which are the instructions for how to transform data from one +beaker to another. + +Recipe also contains a list of beakers, which are just sqlite +tables. Beakers are created by the recipe, and are passed to +pours as arguments. + +Beakers right now know they are stored in SQLite, but I think +that will be abstracted away. Beakers should be able to be +stored in any database, or on disk. + +Core functionality of a beaker: +- initialize (create table if it doesn't exist) +- add item with optional link to another item + - should this be links? (probably) +- iterate over items +- get item by id +- count items + +Ultimately, this is a graph problem. Beakers are nodes, and +pours are edges. The recipe is the graph. +""" + + +class Beaker: + def __init__(self, table_name: str, recipe): + self.table_name = table_name + self.recipe = recipe + + # create table if it doesn't exist + self.recipe.cursor.execute( + f"CREATE TABLE IF NOT EXISTS {self.table_name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)" + ) + + def __repr__(self): + return f"Beaker({self.table_name})" + + def items(self): + self.recipe.cursor.execute(f"SELECT id, data FROM {self.table_name}") + data = self.recipe.cursor.fetchall() + for item in data: + yield item["id"], json.loads(item["data"]) + + def __len__(self): + self.recipe.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}") + return self.recipe.cursor.fetchone()[0] + + def add_item(self, item: dict, from_table=None, from_id=None) -> None: + self.recipe.cursor.execute( + f"INSERT INTO {self.table_name} (data) VALUES (?)", (json.dumps(item),) + ) + self.recipe.cursor.commit() def get_sha512(filename: str) -> str: @@ -20,10 +75,27 @@ def get_sha512(filename: str) -> str: return hashlib.sha512(file.read()).hexdigest() +@dataclass +class Pour: + from_beaker: str + to_beaker: str + transform: callable + + +@dataclass +class Split: + from_beaker: str + condition: callable + if_true: str + if_false: str + + class Recipe: def __init__(self, name: str): self.name = name + self.beakers = {} self.pours = [] + self.splits = [] self.db = sqlite3.connect("beakers.db") self.cursor = self.db.cursor() self.cursor.row_factory = sqlite3.Row @@ -62,7 +134,6 @@ class Recipe: self.pours.append(pour) def csv_to_beaker(self, filename: str, beaker: Beaker) -> None: - beaker.connect_to_db(self.db) lg = log.bind(beaker=beaker, filename=filename) # three cases: empty, match, mismatch # case 1: empty @@ -78,7 +149,7 @@ class Recipe: meta["sha512"] = get_sha512(filename) self.save_metadata(beaker.table_name, meta) else: - old_sha = self.get_metadata(beaker.table_name).get("sha512") + old_sha = self.get_metadata(beaker).get("sha512") new_sha = get_sha512(filename) if old_sha != new_sha: # case 3: mismatch @@ -89,23 +160,36 @@ class Recipe: lg.info("from_csv", case="match") return beaker - def add_pour(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable): + def declare_beaker(self, name: str, temp=True): + beaker = Beaker(name, self) + self.beakers[name] = beaker + if temp: + self.cursor.execute(f"DROP TABLE IF EXISTS {name}") + return beaker + + def add_pour(self, from_beaker: str, to_beaker: str, transform: callable): pour = Pour(from_beaker, to_beaker, transform) self.pours.append(pour) + def add_split( + self, from_beaker: str, condition: callable, if_true: str, if_false: str + ): + split = Split(from_beaker, condition, if_true, if_false) + self.splits.append(split) + return split + def run_linearly(self): log.info("recipe", recipe=self) - for pour in self.pours: - pour.from_beaker.connect_to_db(self.db) - pour.to_beaker.connect_to_db(self.db) + loop = asyncio.get_event_loop() + for pour in self.pours: log.info( "pour", from_beaker=pour.from_beaker, to_beaker=pour.to_beaker, to_pour=len(pour.from_beaker), ) - for id, item in pour.from_beaker: + for id, item in pour.from_beaker.items(): log.info("pour_item", id=id, item=item) - transformed = pour.transform(item) + transformed = loop.run_until_complete(pour.transform(item)) pour.to_beaker.add_item(transformed, pour.from_beaker.table_name, id) diff --git a/src/example.py b/src/example.py index d8aabf8..aed07df 100644 --- a/src/example.py +++ b/src/example.py @@ -1,12 +1,9 @@ -import csv -from beakers.recipe import Recipe -from beakers.beaker import Beaker - -urls = Beaker("urls") -responses = Beaker("responses") +import httpx +from beakers.recipe import Recipe, Beaker async def add_response(obj_with_url): + print(obj_with_url["url"]) url = obj_with_url["url"] response = await httpx.get(url) return { @@ -16,10 +13,19 @@ async def add_response(obj_with_url): } -agencies = Beaker("agencies") -responses = Beaker("responses") +# current thinking, beakers exist within a recipe recipe = Recipe("fetch urls") -recipe.csv_to_beaker("agencies.csv", agencies) -recipe.add_pour(agencies, responses, add_response) +recipe.declare_beaker("agencies") +recipe.declare_beaker("responses") +recipe.declare_beaker("good_urls", temp=True) +recipe.declare_beaker("missing_urls", temp=True) +recipe.csv_to_beaker("agencies.csv", "agencies") +recipe.add_split( + "agencies", + lambda x: x["url"].startswith("http"), + if_true="good_urls", + if_false="missing_urls", +) +recipe.add_pour("good_urls", "responses", add_response) recipe.run_linearly()