import csv
import hashlib
import json
import sqlite3

from structlog import get_logger

log = get_logger()


class Beaker:
    """A named SQLite table of JSON-encoded items plus a metadata record.

    Each item row has the columns ``id``, ``data`` (JSON text),
    ``from_table`` and ``from_id``. A shared ``_metadata`` table holds one
    JSON dictionary per beaker, looked up by table name.

    NOTE: ``table_name`` is interpolated directly into SQL statements, so it
    must be a trusted, valid SQLite identifier.
    """

    def __init__(self, table_name: str, db=None):
        """Open (creating if needed) the beaker called *table_name*.

        Args:
            table_name: Name of the SQLite table that stores the items.
            db: An existing ``sqlite3`` connection to use. When omitted, a
                connection to the file ``beaker.db`` is opened.
        """
        self.table_name = table_name
        self.db = db if db is not None else sqlite3.connect("beaker.db")
        self.cursor = self.db.cursor()
        # Rows are read by column name throughout the class.
        self.cursor.row_factory = sqlite3.Row
        # create table if it doesn't exist
        self._init_metadata()
        self.cursor.execute(
            f"CREATE TABLE IF NOT EXISTS {table_name} "
            "(id INTEGER PRIMARY KEY, data JSON, "
            "from_table TEXT NULL, from_id INTEGER NULL)"
        )

    def __str__(self):
        return f"Beaker({self.table_name})"

    __repr__ = __str__

    def __iter__(self):
        """Yield ``(id, data)`` for every stored item.

        ``data`` is the JSON text exactly as stored; it is not decoded.
        """
        # The id column must be selected explicitly: the previous query only
        # fetched ``data`` and failed as soon as ``id`` was read.
        self.cursor.execute(f"SELECT id, data FROM {self.table_name}")
        # Materialise the rows first so the shared cursor can be reused by
        # other methods while the caller is still iterating.
        for row in self.cursor.fetchall():
            yield row["id"], row["data"]

    def __len__(self):
        """Return the number of items stored in this beaker."""
        self.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}")
        return self.cursor.fetchone()[0]

    def _init_metadata(self):
        """Ensure the ``_metadata`` table and this beaker's row exist."""
        self.cursor.execute(
            "CREATE TABLE IF NOT EXISTS _metadata "
            "(id INTEGER PRIMARY KEY, table_name TEXT, data JSON)"
        )
        # Insert the empty metadata row only once; an unconditional INSERT
        # would add a duplicate row every time the beaker is opened.
        self.cursor.execute(
            "INSERT INTO _metadata (table_name, data) "
            "SELECT ?, ? WHERE NOT EXISTS "
            "(SELECT 1 FROM _metadata WHERE table_name = ?)",
            (self.table_name, json.dumps({}), self.table_name),
        )
        self.db.commit()

    def get_metadata(self) -> dict:
        """Return this beaker's metadata dictionary decoded from JSON."""
        self.cursor.execute(
            "SELECT data FROM _metadata WHERE table_name = ?",
            (self.table_name,),
        )
        data = self.cursor.fetchone()["data"]
        return json.loads(data)

    def save_metadata(self, data: dict) -> None:
        """Replace this beaker's metadata with *data* and commit."""
        self.cursor.execute(
            "UPDATE _metadata SET data = ? WHERE table_name = ?",
            (json.dumps(data), self.table_name),
        )
        self.db.commit()

    def add_item(self, item: dict, from_table=None, from_id=None) -> None:
        """Store *item* as JSON and commit.

        Args:
            item: JSON-serialisable dictionary to store.
            from_table: Optional value for the row's ``from_table`` column.
            from_id: Optional value for the row's ``from_id`` column.
        """
        # from_table / from_id were previously accepted but never written.
        self.cursor.execute(
            f"INSERT INTO {self.table_name} (data, from_table, from_id) "
            "VALUES (?, ?, ?)",
            (json.dumps(item), from_table, from_id),
        )
        self.db.commit()

    @classmethod
    def from_csv(cls, table_name, filename: str) -> "Beaker":
        """Build a beaker from a CSV file, loading it only when empty.

        The beaker is opened with the default connection. If it holds no
        items, every CSV row is added and the file's SHA-512 digest is
        saved in the metadata. Otherwise the saved digest is compared with
        the file's current digest.

        Args:
            table_name: Name of the beaker table.
            filename: Path of the CSV file; its first row is the header.

        Returns:
            The beaker for *table_name*.

        Raises:
            Exception: If the beaker already has items and the saved digest
                differs from the digest of *filename*.
        """
        beaker = cls(table_name)
        lg = log.bind(table_name=table_name, filename=filename)
        # three cases: empty, match, mismatch
        if len(beaker) == 0:
            # case 1: empty
            # newline="" lets the csv module handle embedded newlines itself.
            with open(filename, "r", newline="") as file:
                reader = csv.DictReader(file)
                added = 0
                for row in reader:
                    beaker.add_item(row)
                    added += 1
            lg.info("from_csv", case="empty", added=added)
            meta = beaker.get_metadata()
            meta["sha512"] = get_sha512(filename)
            beaker.save_metadata(meta)
        else:
            old_sha = beaker.get_metadata().get("sha512")
            new_sha = get_sha512(filename)
            if old_sha != new_sha:
                # case 3: mismatch
                lg.info("from_csv", case="mismatch", old_sha=old_sha, new_sha=new_sha)
                raise Exception("sha512 mismatch")
            # case 2: match -- use the bound logger so the table name and
            # filename are included, as in the other two cases.
            lg.info("from_csv", case="match")
        return beaker


def get_sha512(filename: str) -> str:
    """Return the hexadecimal SHA-512 digest of the file at *filename*."""
    digest = hashlib.sha512()
    with open(filename, "rb") as file:
        # Hash in fixed-size chunks so large files are not loaded whole.
        for chunk in iter(lambda: file.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()