big recipe refactor

This commit is contained in:
James Turk 2023-04-27 14:51:25 -05:00
parent 4391389d64
commit 7381eb9f30
3 changed files with 83 additions and 73 deletions

View File

@ -9,94 +9,33 @@ log = get_logger()
class Beaker: class Beaker:
def __init__(self, table_name: str, db=None): def __init__(self, table_name: str):
self.table_name = table_name self.table_name = table_name
self.db = db if db else sqlite3.connect(f"beaker.db")
self.cursor = self.db.cursor()
self.cursor.row_factory = sqlite3.Row
# create table if it doesn't exist
self._init_metadata()
self.cursor.execute(
f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)"
)
def __str__(self): def __repr__(self):
return f"Beaker({self.table_name})" return f"Beaker({self.table_name})"
__repr__ = __str__
def __iter__(self): def __iter__(self):
self.cursor.execute(f"SELECT data FROM {self.table_name}") self.cursor.execute(f"SELECT data FROM {self.table_name}")
data = self.cursor.fetchall() data = self.cursor.fetchall()
for item in data: for item in data:
print(item)
yield item["id"], item["data"] yield item["id"], item["data"]
def __len__(self): def __len__(self):
self.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}") self.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}")
return self.cursor.fetchone()[0] return self.cursor.fetchone()[0]
def _init_metadata(self): def connect_to_db(self, db) -> None:
self.db = db
self.cursor = self.db.cursor()
self.cursor.row_factory = sqlite3.Row
# create table if it doesn't exist
self.cursor.execute( self.cursor.execute(
"CREATE TABLE IF NOT EXISTS _metadata (id INTEGER PRIMARY KEY, table_name TEXT, data JSON)" f"CREATE TABLE IF NOT EXISTS {self.table_name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)"
) )
self.cursor.execute(
"INSERT INTO _metadata (table_name, data) VALUES (?, ?)",
(self.table_name, json.dumps({})),
)
self.db.commit()
def get_metadata(self) -> dict:
self.cursor.execute(
"SELECT data FROM _metadata WHERE table_name = ?",
(self.table_name,),
)
data = self.cursor.fetchone()["data"]
return json.loads(data)
def save_metadata(self, data: dict) -> None:
self.cursor.execute(
"UPDATE _metadata SET data = ? WHERE table_name = ?",
(json.dumps(data), self.table_name),
)
self.db.commit()
def add_item(self, item: dict, from_table=None, from_id=None) -> None: def add_item(self, item: dict, from_table=None, from_id=None) -> None:
self.cursor.execute( self.cursor.execute(
f"INSERT INTO {self.table_name} (data) VALUES (?)", (json.dumps(item),) f"INSERT INTO {self.table_name} (data) VALUES (?)", (json.dumps(item),)
) )
self.db.commit() self.db.commit()
@classmethod
def from_csv(cls, table_name, filename: str) -> None:
beaker = cls(table_name)
lg = log.bind(table_name=table_name, filename=filename)
# three cases: empty, match, mismatch
# case 1: empty
if len(beaker) == 0:
with open(filename, "r") as file:
reader = csv.DictReader(file)
added = 0
for row in reader:
beaker.add_item(row)
added += 1
lg.info("from_csv", case="empty", added=added)
meta = beaker.get_metadata()
meta["sha512"] = get_sha512(filename)
beaker.save_metadata(meta)
else:
old_sha = beaker.get_metadata().get("sha512")
new_sha = get_sha512(filename)
if old_sha != new_sha:
# case 3: mismatch
lg.info("from_csv", case="mismatch", old_sha=old_sha, new_sha=new_sha)
raise Exception("sha512 mismatch")
else:
# case 2: match
lg.info("from_csv", case="match")
return beaker
def get_sha512(filename: str) -> str:
with open(filename, "rb") as file:
return hashlib.sha512(file.read()).hexdigest()

View File

@ -1,3 +1,7 @@
import csv
import json
import sqlite3
import hashlib
from .beaker import Beaker from .beaker import Beaker
from structlog import get_logger from structlog import get_logger
@ -11,15 +15,79 @@ class Pour:
self.transform = transform self.transform = transform
def get_sha512(filename: str) -> str:
with open(filename, "rb") as file:
return hashlib.sha512(file.read()).hexdigest()
class Recipe: class Recipe:
def __init__(self, name: str): def __init__(self, name: str):
self.name = name self.name = name
self.pours = [] self.pours = []
self.db = sqlite3.connect("beakers.db")
self.cursor = self.db.cursor()
self.cursor.row_factory = sqlite3.Row
self.cursor.execute(
"CREATE TABLE IF NOT EXISTS _metadata (table_name TEXT PRIMARY KEY, data JSON)"
)
def __str__(self) -> str: def __repr__(self) -> str:
return f"Recipe({self.name})" return f"Recipe({self.name})"
__repr__ = __str__ def get_metadata(self, table_name) -> dict:
self.cursor.execute(
"SELECT data FROM _metadata WHERE table_name = ?",
(table_name,),
)
try:
data = self.cursor.fetchone()["data"]
log.debug("get_metadata", table_name=table_name, data=data)
return json.loads(data)
except TypeError:
log.debug("get_metadata", table_name=table_name, data={})
return {}
def save_metadata(self, table_name: str, data: dict) -> None:
data_json = json.dumps(data)
log.info("save_metadata", table_name=table_name, data=data_json)
# sqlite upsert
self.cursor.execute(
"INSERT INTO _metadata (table_name, data) VALUES (?, ?) ON CONFLICT(table_name) DO UPDATE SET data = ?",
(table_name, data_json, data_json),
)
self.db.commit()
def add_pour(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable):
pour = Pour(from_beaker, to_beaker, transform)
self.pours.append(pour)
def csv_to_beaker(self, filename: str, beaker: Beaker) -> None:
beaker.connect_to_db(self.db)
lg = log.bind(beaker=beaker, filename=filename)
# three cases: empty, match, mismatch
# case 1: empty
if len(beaker) == 0:
with open(filename, "r") as file:
reader = csv.DictReader(file)
added = 0
for row in reader:
beaker.add_item(row)
added += 1
lg.info("from_csv", case="empty", added=added)
meta = self.get_metadata(beaker.table_name)
meta["sha512"] = get_sha512(filename)
self.save_metadata(beaker.table_name, meta)
else:
old_sha = self.get_metadata(beaker.table_name).get("sha512")
new_sha = get_sha512(filename)
if old_sha != new_sha:
# case 3: mismatch
lg.info("from_csv", case="mismatch", old_sha=old_sha, new_sha=new_sha)
raise Exception("sha512 mismatch")
else:
# case 2: match
lg.info("from_csv", case="match")
return beaker
def add_pour(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable): def add_pour(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable):
pour = Pour(from_beaker, to_beaker, transform) pour = Pour(from_beaker, to_beaker, transform)
@ -28,7 +96,9 @@ class Recipe:
def run_linearly(self): def run_linearly(self):
log.info("recipe", recipe=self) log.info("recipe", recipe=self)
for pour in self.pours: for pour in self.pours:
print(pour.from_beaker, pour.to_beaker, pour.transform) pour.from_beaker.connect_to_db(self.db)
pour.to_beaker.connect_to_db(self.db)
log.info( log.info(
"pour", "pour",
from_beaker=pour.from_beaker, from_beaker=pour.from_beaker,

View File

@ -16,9 +16,10 @@ async def add_response(obj_with_url):
} }
agencies = Beaker.from_csv("agencies", "agencies.csv") agencies = Beaker("agencies")
responses = Beaker("responses") responses = Beaker("responses")
recipe = Recipe("fetch urls") recipe = Recipe("fetch urls")
recipe.csv_to_beaker("agencies.csv", agencies)
recipe.add_pour(agencies, responses, add_response) recipe.add_pour(agencies, responses, add_response)
recipe.run_linearly() recipe.run_linearly()