From 7381eb9f3061c589d09ec364e16abf66440bcb6b Mon Sep 17 00:00:00 2001 From: James Turk Date: Thu, 27 Apr 2023 14:51:25 -0500 Subject: [PATCH] big recipe refactor --- src/beakers/beaker.py | 77 +++++-------------------------------------- src/beakers/recipe.py | 76 ++++++++++++++++++++++++++++++++++++++++-- src/example.py | 3 +- 3 files changed, 83 insertions(+), 73 deletions(-) diff --git a/src/beakers/beaker.py b/src/beakers/beaker.py index 24ba27e..108263e 100644 --- a/src/beakers/beaker.py +++ b/src/beakers/beaker.py @@ -9,94 +9,33 @@ log = get_logger() class Beaker: - def __init__(self, table_name: str, db=None): + def __init__(self, table_name: str): self.table_name = table_name - self.db = db if db else sqlite3.connect(f"beaker.db") - self.cursor = self.db.cursor() - self.cursor.row_factory = sqlite3.Row - # create table if it doesn't exist - self._init_metadata() - self.cursor.execute( - f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)" - ) - def __str__(self): + def __repr__(self): return f"Beaker({self.table_name})" - __repr__ = __str__ - def __iter__(self): self.cursor.execute(f"SELECT data FROM {self.table_name}") data = self.cursor.fetchall() for item in data: - print(item) yield item["id"], item["data"] def __len__(self): self.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}") return self.cursor.fetchone()[0] - def _init_metadata(self): + def connect_to_db(self, db) -> None: + self.db = db + self.cursor = self.db.cursor() + self.cursor.row_factory = sqlite3.Row + # create table if it doesn't exist self.cursor.execute( - "CREATE TABLE IF NOT EXISTS _metadata (id INTEGER PRIMARY KEY, table_name TEXT, data JSON)" + f"CREATE TABLE IF NOT EXISTS {self.table_name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)" ) - self.cursor.execute( - "INSERT INTO _metadata (table_name, data) VALUES (?, ?)", - (self.table_name, json.dumps({})), - ) - self.db.commit() - - def get_metadata(self) -> dict: - self.cursor.execute( - "SELECT data FROM _metadata WHERE table_name = ?", - (self.table_name,), - ) - data = self.cursor.fetchone()["data"] - return json.loads(data) - - def save_metadata(self, data: dict) -> None: - self.cursor.execute( - "UPDATE _metadata SET data = ? WHERE table_name = ?", - (json.dumps(data), self.table_name), - ) - self.db.commit() def add_item(self, item: dict, from_table=None, from_id=None) -> None: self.cursor.execute( f"INSERT INTO {self.table_name} (data) VALUES (?)", (json.dumps(item),) ) self.db.commit() - - @classmethod - def from_csv(cls, table_name, filename: str) -> None: - beaker = cls(table_name) - lg = log.bind(table_name=table_name, filename=filename) - # three cases: empty, match, mismatch - # case 1: empty - if len(beaker) == 0: - with open(filename, "r") as file: - reader = csv.DictReader(file) - added = 0 - for row in reader: - beaker.add_item(row) - added += 1 - lg.info("from_csv", case="empty", added=added) - meta = beaker.get_metadata() - meta["sha512"] = get_sha512(filename) - beaker.save_metadata(meta) - else: - old_sha = beaker.get_metadata().get("sha512") - new_sha = get_sha512(filename) - if old_sha != new_sha: - # case 3: mismatch - lg.info("from_csv", case="mismatch", old_sha=old_sha, new_sha=new_sha) - raise Exception("sha512 mismatch") - else: - # case 2: match - lg.info("from_csv", case="match") - return beaker - - -def get_sha512(filename: str) -> str: - with open(filename, "rb") as file: - return hashlib.sha512(file.read()).hexdigest() diff --git a/src/beakers/recipe.py b/src/beakers/recipe.py index 67367a2..c9faa44 100644 --- a/src/beakers/recipe.py +++ b/src/beakers/recipe.py @@ -1,3 +1,7 @@ +import csv +import json +import sqlite3 +import hashlib from .beaker import Beaker from structlog import get_logger @@ -11,15 +15,79 @@ class Pour: self.transform = transform +def get_sha512(filename: str) -> str: + with open(filename, "rb") as file: + return hashlib.sha512(file.read()).hexdigest() + + class Recipe: def __init__(self, name: str): self.name = name self.pours = [] + self.db = sqlite3.connect("beakers.db") + self.cursor = self.db.cursor() + self.cursor.row_factory = sqlite3.Row + self.cursor.execute( + "CREATE TABLE IF NOT EXISTS _metadata (table_name TEXT PRIMARY KEY, data JSON)" + ) - def __str__(self) -> str: + def __repr__(self) -> str: return f"Recipe({self.name})" - __repr__ = __str__ + def get_metadata(self, table_name) -> dict: + self.cursor.execute( + "SELECT data FROM _metadata WHERE table_name = ?", + (table_name,), + ) + try: + data = self.cursor.fetchone()["data"] + log.debug("get_metadata", table_name=table_name, data=data) + return json.loads(data) + except TypeError: + log.debug("get_metadata", table_name=table_name, data={}) + return {} + + def save_metadata(self, table_name: str, data: dict) -> None: + data_json = json.dumps(data) + log.info("save_metadata", table_name=table_name, data=data_json) + # sqlite upsert + self.cursor.execute( + "INSERT INTO _metadata (table_name, data) VALUES (?, ?) ON CONFLICT(table_name) DO UPDATE SET data = ?", + (table_name, data_json, data_json), + ) + self.db.commit() + + def add_pour(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable): + pour = Pour(from_beaker, to_beaker, transform) + self.pours.append(pour) + + def csv_to_beaker(self, filename: str, beaker: Beaker) -> None: + beaker.connect_to_db(self.db) + lg = log.bind(beaker=beaker, filename=filename) + # three cases: empty, match, mismatch + # case 1: empty + if len(beaker) == 0: + with open(filename, "r") as file: + reader = csv.DictReader(file) + added = 0 + for row in reader: + beaker.add_item(row) + added += 1 + lg.info("from_csv", case="empty", added=added) + meta = self.get_metadata(beaker.table_name) + meta["sha512"] = get_sha512(filename) + self.save_metadata(beaker.table_name, meta) + else: + old_sha = self.get_metadata(beaker.table_name).get("sha512") + new_sha = get_sha512(filename) + if old_sha != new_sha: + # case 3: mismatch + lg.info("from_csv", case="mismatch", old_sha=old_sha, new_sha=new_sha) + raise Exception("sha512 mismatch") + else: + # case 2: match + lg.info("from_csv", case="match") + return beaker def add_pour(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable): pour = Pour(from_beaker, to_beaker, transform) @@ -28,7 +96,9 @@ class Recipe: def run_linearly(self): log.info("recipe", recipe=self) for pour in self.pours: - print(pour.from_beaker, pour.to_beaker, pour.transform) + pour.from_beaker.connect_to_db(self.db) + pour.to_beaker.connect_to_db(self.db) + log.info( "pour", from_beaker=pour.from_beaker, diff --git a/src/example.py b/src/example.py index 0f323c8..d8aabf8 100644 --- a/src/example.py +++ b/src/example.py @@ -16,9 +16,10 @@ async def add_response(obj_with_url): } -agencies = Beaker.from_csv("agencies", "agencies.csv") +agencies = Beaker("agencies") responses = Beaker("responses") recipe = Recipe("fetch urls") +recipe.csv_to_beaker("agencies.csv", agencies) recipe.add_pour(agencies, responses, add_response) recipe.run_linearly()