serialize/deserialize
This commit is contained in:
parent
2b5e4f1d91
commit
04e43f0ab1
@ -53,6 +53,7 @@ recipe.add_transform(
|
||||
"article",
|
||||
extract_npr_article,
|
||||
)
|
||||
recipe.add_transform("archived_article")
|
||||
|
||||
|
||||
npr_examples = [
|
||||
|
@ -1,28 +1,57 @@
|
||||
from beakers.recipe import Recipe
|
||||
import pydantic
|
||||
|
||||
recipe = Recipe("example01")
|
||||
words_beaker = recipe.add_beaker("words", temp=True)
|
||||
recipe.add_beaker("fruits")
|
||||
recipe.add_beaker("other")
|
||||
recipe.add_beaker("sentences")
|
||||
|
||||
class Word(pydantic.BaseModel):
|
||||
word: str
|
||||
|
||||
|
||||
class ClassifiedWord(pydantic.BaseModel):
|
||||
normalized_word: str
|
||||
is_fruit: bool
|
||||
|
||||
|
||||
class Sentence(pydantic.BaseModel):
|
||||
sentence: list[str]
|
||||
|
||||
|
||||
# Lowercase words that word_classifier treats as fruit.
_FRUIT_WORDS = frozenset(
    {
        "apple",
        "banana",
        "fig",
        "grape",
        "lemon",
        "mango",
        "orange",
        "pear",
        "raspberry",
    }
)


def word_classifier(item: "Word") -> ClassifiedWord:
    """Classify a word as fruit or not.

    Args:
        item: Object exposing a ``word`` string attribute.

    Returns:
        A ClassifiedWord holding the lowercased word and whether that
        lowercased form is one of the known fruit names.
    """
    # Normalize once so normalized_word and is_fruit are always derived
    # from the same value.
    normalized = item.word.lower()
    return ClassifiedWord(
        normalized_word=normalized,
        is_fruit=normalized in _FRUIT_WORDS,
    )
|
||||
|
||||
|
||||
recipe = Recipe("fruits-example")
|
||||
recipe.add_beaker("word", Word)
|
||||
recipe.add_beaker("classified_word", ClassifiedWord)
|
||||
recipe.add_beaker("sentence", Sentence)
|
||||
recipe.add_transform("word", "classified_word", word_classifier)
|
||||
recipe.add_conditional(
|
||||
"words",
|
||||
lambda x: x["word"]
|
||||
in (
|
||||
"apple",
|
||||
"banana",
|
||||
"fig",
|
||||
"grape",
|
||||
"lemon",
|
||||
"mango",
|
||||
"orange",
|
||||
"pear",
|
||||
"raspberry",
|
||||
),
|
||||
"classified_word",
|
||||
lambda cw: cw.is_fruit,
|
||||
"fruits",
|
||||
"other",
|
||||
)
|
||||
recipe.add_transform("fruits", "sentences", lambda x: f"I like to eat {x['word']}")
|
||||
recipe.add_transform(
|
||||
"other", "sentences", lambda x: f"I'm not so sure about {x['word']}"
|
||||
recipe.add_transform("fruits", "sentence", lambda x: f"I love a fresh {x['word']}")
|
||||
|
||||
recipe.add_seed(
|
||||
"word",
|
||||
[
|
||||
Word(word="apple"),
|
||||
Word(word="banana"),
|
||||
Word(word="hammer"),
|
||||
Word(word="orange"),
|
||||
Word(word="egg"),
|
||||
],
|
||||
)
|
||||
|
@ -4,13 +4,31 @@ import sqlite3
|
||||
import uuid
|
||||
|
||||
|
||||
class DataObject:
|
||||
def __init__(self, id: str | None = None):
|
||||
self._id = id if id else str(uuid.uuid4())
|
||||
self._data = {}
|
||||
|
||||
def __getattr__(self, name):
|
||||
return self._data[name]
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
if name.startswith("_"):
|
||||
super().__setattr__(name, value)
|
||||
elif name not in self._data:
|
||||
self._data[name] = value
|
||||
else:
|
||||
raise AttributeError(f"DataObject attribute {name} already exists")
|
||||
|
||||
|
||||
class Beaker(abc.ABC):
|
||||
def __init__(self, name: str, recipe):
|
||||
def __init__(self, name: str, model: type, recipe: "Recipe"):
|
||||
self.name = name
|
||||
self.model = model
|
||||
self.recipe = recipe
|
||||
|
||||
def __repr__(self):
|
||||
return f"Beaker({self.name})"
|
||||
return f"Beaker({self.name}, {self.model.__name__})"
|
||||
|
||||
@abc.abstractmethod
|
||||
def items(self):
|
||||
@ -21,23 +39,29 @@ class Beaker(abc.ABC):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def add_item(self, item: dict, id: str | None = None) -> None:
|
||||
def add_item(self, item: "T", id: str | None = None) -> None:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
def add_items(self, items: list["T"]) -> None:
|
||||
for item in items:
|
||||
self.add_item(item)
|
||||
|
||||
|
||||
class TempBeaker(Beaker):
|
||||
def __init__(self, name: str, recipe):
|
||||
super().__init__(name, recipe)
|
||||
def __init__(self, name: str, model: type, recipe: "Recipe"):
|
||||
super().__init__(name, model, recipe)
|
||||
self._items = []
|
||||
|
||||
def __len__(self):
|
||||
return len(self._items)
|
||||
|
||||
def add_item(self, item: dict, id=None) -> None:
|
||||
def add_item(self, item: "T", id=None) -> None:
|
||||
if id is None:
|
||||
id = str(uuid.uuid1())
|
||||
self._items.append((id, item))
|
||||
|
||||
def items(self):
|
||||
@ -48,8 +72,8 @@ class TempBeaker(Beaker):
|
||||
|
||||
|
||||
class SqliteBeaker(Beaker):
|
||||
def __init__(self, name: str, recipe):
|
||||
super().__init__(name, recipe)
|
||||
def __init__(self, name: str, model: type, recipe: "Recipe"):
|
||||
super().__init__(name, model, recipe)
|
||||
# create table if it doesn't exist
|
||||
self.cursor = self.recipe.db.cursor()
|
||||
self.cursor.row_factory = sqlite3.Row
|
||||
@ -61,18 +85,19 @@ class SqliteBeaker(Beaker):
|
||||
self.cursor.execute(f"SELECT uuid, data FROM {self.name}")
|
||||
data = self.cursor.fetchall()
|
||||
for item in data:
|
||||
yield item["uuid"], json.loads(item["data"])
|
||||
yield item["uuid"], self.model(**json.loads(item["data"]))
|
||||
|
||||
def __len__(self):
|
||||
self.cursor.execute(f"SELECT COUNT(*) FROM {self.name}")
|
||||
return self.cursor.fetchone()[0]
|
||||
|
||||
def add_item(self, item: dict, id: str | None = None) -> None:
|
||||
def add_item(self, item: "T", id: str | None = None) -> None:
|
||||
if id is None:
|
||||
id = str(uuid.uuid1())
|
||||
print("UUID", id, item)
|
||||
self.cursor.execute(
|
||||
f"INSERT INTO {self.name} (uuid, data) VALUES (?, ?)",
|
||||
(id, json.dumps(item)),
|
||||
(id, item.model_dump_json()),
|
||||
)
|
||||
self.recipe.db.commit()
|
||||
|
||||
|
@ -54,9 +54,9 @@ class Recipe:
|
||||
def add_beaker(self, name: str, datatype: type | None) -> Beaker:
|
||||
self.graph.add_node(name, datatype=datatype)
|
||||
if datatype is None:
|
||||
self.beakers[name] = TempBeaker(name, self)
|
||||
self.beakers[name] = TempBeaker(name, datatype, self)
|
||||
else:
|
||||
self.beakers[name] = SqliteBeaker(name, self)
|
||||
self.beakers[name] = SqliteBeaker(name, datatype, self)
|
||||
return self.beakers[name]
|
||||
|
||||
def add_transform(
|
||||
@ -124,7 +124,7 @@ class Recipe:
|
||||
log.info("process_seeds", recipe=self.name)
|
||||
for beaker_name, seeds in self.seeds.items():
|
||||
for seed in seeds:
|
||||
self.beakers[beaker_name].add_item(seed)
|
||||
self.beakers[beaker_name].add_items(seed)
|
||||
|
||||
def get_metadata(self, table_name) -> dict:
|
||||
cursor = self.db.cursor()
|
||||
|
Loading…
Reference in New Issue
Block a user