sqlite pushing

This commit is contained in:
James Turk 2023-05-07 20:06:28 -05:00
parent f98b0140b6
commit 3b93ece18a
4 changed files with 157 additions and 27 deletions

69
src/beakers/beakers.py Normal file
View File

@ -0,0 +1,69 @@
import abc
import json
import sqlite3
class Beaker(abc.ABC):
def __init__(self, name: str, recipe):
self.name = name
self.recipe = recipe
def __repr__(self):
return f"Beaker({self.name})"
@abc.abstractmethod
def items(self):
pass
@abc.abstractmethod
def __len__(self):
pass
@abc.abstractmethod
def add_item(self, item: dict, from_table=None, from_id=None) -> None:
pass
class TempBeaker(Beaker):
def __init__(self, name: str, recipe):
super().__init__(name, recipe)
self._items = []
def __len__(self):
return len(self._items)
def add_item(self, item: dict, from_table=None, from_id=None) -> None:
self._items.append((from_id, item))
def items(self):
yield from self._items
class SqliteBeaker(Beaker):
def __init__(self, name: str, recipe):
super().__init__(name, recipe)
# create table if it doesn't exist
cursor = self.recipe.db.cursor()
cursor.execute(
f"CREATE TABLE IF NOT EXISTS {self.name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)"
)
def items(self):
cursor = self.recipe.db.cursor()
cursor.row_factory = sqlite3.Row
cursor.execute(f"SELECT id, data FROM {self.name}")
data = cursor.fetchall()
for item in data:
yield item["id"], json.loads(item["data"])
def __len__(self):
cursor = self.recipe.db.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {self.name}")
return cursor.fetchone()[0]
def add_item(self, item: dict, from_table=None, from_id=None) -> None:
cursor = self.db.cursor()
cursor.execute(
f"INSERT INTO {self.name} (data) VALUES (?)", (json.dumps(item),)
)
self.db.commit()

View File

@ -1,5 +1,6 @@
import csv
import json
import inspect
import sqlite3
import hashlib
import asyncio
@ -79,9 +80,8 @@ class Recipe:
self.graph = networkx.DiGraph()
self.beakers = {}
self.db = sqlite3.connect("beakers.db")
self.cursor = self.db.cursor()
self.cursor.row_factory = sqlite3.Row
self.cursor.execute(
cursor = self.db.cursor()
cursor.execute(
"CREATE TABLE IF NOT EXISTS _metadata (table_name TEXT PRIMARY KEY, data JSON)"
)
@ -130,12 +130,13 @@ class Recipe:
)
def get_metadata(self, table_name) -> dict:
self.cursor.execute(
cursor = self.db.cursor()
cursor.execute(
"SELECT data FROM _metadata WHERE table_name = ?",
(table_name,),
)
try:
data = self.cursor.fetchone()["data"]
data = cursor.fetchone()["data"]
log.debug("get_metadata", table_name=table_name, data=data)
return json.loads(data)
except TypeError:
@ -146,7 +147,8 @@ class Recipe:
data_json = json.dumps(data)
log.info("save_metadata", table_name=table_name, data=data_json)
# sqlite upsert
self.cursor.execute(
cursor = self.db.cursor()
cursor.execute(
"INSERT INTO _metadata (table_name, data) VALUES (?, ?) ON CONFLICT(table_name) DO UPDATE SET data = ?",
(table_name, data_json, data_json),
)
@ -180,25 +182,30 @@ class Recipe:
lg.info("from_csv", case="match")
return beaker
def solve_dag(self):
"""
Solve the DAG by topological sort.
"""
for node in networkx.topological_sort(self.graph):
print(node)
def run_linearly(self):
def run_push(self):
log.info("recipe", recipe=self)
loop = asyncio.get_event_loop()
for pour in self.pours:
# go through each node in forward order, pushing data
for node in networkx.topological_sort(self.graph):
# get outbound edges
edges = self.graph.out_edges(node, data=True)
for from_b, to_b, edge in edges:
if "transform" in edge:
func = edge["transform"]
from_beaker = self.beakers[from_b]
to_beaker = self.beakers[to_b]
log.info(
"pour",
from_beaker=pour.from_beaker,
to_beaker=pour.to_beaker,
to_pour=len(pour.from_beaker),
"transform", from_b=from_b, to_b=to_b, items=len(from_beaker)
)
for id, item in pour.from_beaker.items():
log.info("pour_item", id=id, item=item)
transformed = loop.run_until_complete(pour.transform(item))
pour.to_beaker.add_item(transformed, pour.from_beaker.name, id)
if inspect.iscoroutinefunction(func):
for id, item in from_beaker.items():
log.debug("transform item", id=id, item=item, async_=True)
transformed = loop.run_until_complete(func(item))
to_beaker.add_item(transformed, from_beaker.name, id)
else:
for id, item in from_beaker.items():
log.debug("transform item", id=id, item=item, async_=False)
transformed = func(item)
to_beaker.add_item(transformed, from_beaker.name, id)

View File

@ -1,5 +1,6 @@
import httpx
from beakers.recipe import Recipe, Beaker
from beakers.beakers import Beaker
from beakers.recipe import Recipe
async def add_response(obj_with_url):
@ -28,4 +29,4 @@ recipe.add_conditional(
recipe.add_pour("good_urls", "responses", add_response)
recipe.csv_to_beaker("agencies.csv", "agencies")
recipe.solve_dag()
recipe.run_push()

53
src/sort_example.py Normal file
View File

@ -0,0 +1,53 @@
from beakers.recipe import Recipe
words = [
"apple",
"banana",
"cactus",
"daikon",
"eel",
"fig",
"grape",
"hibiscus",
"ice",
"jelly",
"kingfish",
"lemon",
"mango",
"nougat",
"orange",
"pear",
"quail",
"raspberry",
"sturgeon",
"tamarind",
]
recipe = Recipe("example01")
words_beaker = recipe.add_beaker("words", temp=True)
recipe.add_beaker("fruits")
recipe.add_beaker("other")
recipe.add_beaker("sentences")
recipe.add_conditional(
"words",
lambda x: x
in (
"apple",
"banana",
"fig",
"grape",
"lemon",
"mango",
"orange",
"pear",
"raspberry",
),
"fruits",
"other",
)
recipe.add_pour("fruits", "sentences", lambda x: f"I like to eat {x}")
recipe.add_pour("other", "sentences", lambda x: f"I'm not so sure about {x}")
for word in words:
words_beaker.add_item(word)
recipe.run_push()