diff --git a/poetry.lock b/poetry.lock index 75fa75a..66a28ca 100644 --- a/poetry.lock +++ b/poetry.lock @@ -124,6 +124,27 @@ files = [ [package.dependencies] frozenlist = ">=1.1.0" +[[package]] +name = "anyio" +version = "3.6.2" +description = "High level compatibility layer for multiple asynchronous event loop implementations" +category = "main" +optional = false +python-versions = ">=3.6.2" +files = [ + {file = "anyio-3.6.2-py3-none-any.whl", hash = "sha256:fbbe32bd270d2a2ef3ed1c5d45041250284e31fc0a4df4a5a6071842051a51e3"}, + {file = "anyio-3.6.2.tar.gz", hash = "sha256:25ea0d673ae30af41a0c442f81cf3b38c7e79fdc7b60335a4c14e05eb0947421"}, +] + +[package.dependencies] +idna = ">=2.8" +sniffio = ">=1.1" + +[package.extras] +doc = ["packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"] +test = ["contextlib2", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (<0.15)", "uvloop (>=0.15)"] +trio = ["trio (>=0.16,<0.22)"] + [[package]] name = "async-timeout" version = "4.0.2" @@ -375,6 +396,64 @@ files = [ {file = "frozenlist-1.3.3.tar.gz", hash = "sha256:58bcc55721e8a90b88332d6cd441261ebb22342e238296bb330968952fbb3a6a"}, ] +[[package]] +name = "h11" +version = "0.14.0" +description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"}, + {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"}, +] + +[[package]] +name = "httpcore" +version = "0.17.0" +description = "A minimal low-level HTTP client." +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "httpcore-0.17.0-py3-none-any.whl", hash = "sha256:0fdfea45e94f0c9fd96eab9286077f9ff788dd186635ae61b312693e4d943599"}, + {file = "httpcore-0.17.0.tar.gz", hash = "sha256:cc045a3241afbf60ce056202301b4d8b6af08845e3294055eb26b09913ef903c"}, +] + +[package.dependencies] +anyio = ">=3.0,<5.0" +certifi = "*" +h11 = ">=0.13,<0.15" +sniffio = ">=1.0.0,<2.0.0" + +[package.extras] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (>=1.0.0,<2.0.0)"] + +[[package]] +name = "httpx" +version = "0.24.0" +description = "The next generation HTTP client." +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "httpx-0.24.0-py3-none-any.whl", hash = "sha256:447556b50c1921c351ea54b4fe79d91b724ed2b027462ab9a329465d147d5a4e"}, + {file = "httpx-0.24.0.tar.gz", hash = "sha256:507d676fc3e26110d41df7d35ebd8b3b8585052450f4097401c9be59d928c63e"}, +] + +[package.dependencies] +certifi = "*" +httpcore = ">=0.15.0,<0.18.0" +idna = "*" +sniffio = "*" + +[package.extras] +brotli = ["brotli", "brotlicffi"] +cli = ["click (>=8.0.0,<9.0.0)", "pygments (>=2.0.0,<3.0.0)", "rich (>=10,<14)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (>=1.0.0,<2.0.0)"] + [[package]] name = "idna" version = "3.4" @@ -601,6 +680,25 @@ files = [ {file = "multidict-6.0.4.tar.gz", hash = "sha256:3666906492efb76453c0e7b97f2cf459b0682e7402c0489a95484965dbc1da49"}, ] +[[package]] +name = "networkx" +version = "3.1" +description = "Python package for creating and manipulating graphs and networks" +category = "main" +optional = false +python-versions = ">=3.8" +files = [ + {file = "networkx-3.1-py3-none-any.whl", hash = "sha256:4f33f68cb2afcf86f28a45f43efc27a9386b535d567d2127f8f61d51dec58d36"}, + {file = "networkx-3.1.tar.gz", hash = "sha256:de346335408f84de0eada6ff9fafafff9bcda11f0a0dfaa931133debb146ab61"}, +] + +[package.extras] +default = ["matplotlib (>=3.4)", "numpy (>=1.20)", "pandas (>=1.3)", "scipy (>=1.8)"] +developer = ["mypy (>=1.1)", "pre-commit (>=3.2)"] +doc = ["nb2plots (>=0.6)", "numpydoc (>=1.5)", "pillow (>=9.4)", "pydata-sphinx-theme (>=0.13)", "sphinx (>=6.1)", "sphinx-gallery (>=0.12)", "texext (>=0.6.7)"] +extra = ["lxml (>=4.6)", "pydot (>=1.4.2)", "pygraphviz (>=1.10)", "sympy (>=1.10)"] +test = ["codecov (>=2.1)", "pytest (>=7.2)", "pytest-cov (>=4.0)"] + [[package]] name = "openai" version = "0.27.4" @@ -842,6 +940,18 @@ files = [ [package.dependencies] requests = {version = ">=2.28.1,<3.0.0", extras = ["security"]} +[[package]] +name = "sniffio" +version = "1.3.0" +description = "Sniff out which async library your code is running under" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "sniffio-1.3.0-py3-none-any.whl", hash = "sha256:eecefdce1e5bbfb7ad2eeaabf7c1eeb404d7757c379bd1f7e5cce9d8bf425384"}, + {file = "sniffio-1.3.0.tar.gz", hash = "sha256:e60305c5e5d314f5389259b7f22aaa33d8f7dee49763119234af3755c55b9101"}, +] + [[package]] name = "structlog" version = "22.3.0" @@ -1068,4 +1178,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "b0d01dfcd886acda1190e6f9ce4c18dc9fddf2ca01338036c1f0f3fc734537b7" +content-hash = "7e97eb5b1a403e6d90beff20f5d470e4bdc71648da3f0971f92fc96e1150f7fa" diff --git a/pyproject.toml b/pyproject.toml index d8dde7e..a16f77a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,8 @@ readme = "README.md" python = "^3.11" scrapeghost = {path = "../scrapeghost", develop = true} scrapelib = "^2.1.0" +httpx = "^0.24.0" +networkx = "^3.1" [build-system] diff --git a/src/beakers/recipe.py b/src/beakers/recipe.py index 2eca7dd..c8e8819 100644 --- a/src/beakers/recipe.py +++ b/src/beakers/recipe.py @@ -4,6 +4,7 @@ import sqlite3 import hashlib import asyncio from dataclasses import dataclass +import networkx from structlog import get_logger log = get_logger() @@ -40,34 +41,34 @@ pours are edges. The recipe is the graph. """ -class Beaker: - def __init__(self, table_name: str, recipe): - self.table_name = table_name - self.recipe = recipe +# class Beaker: +# def __init__(self, table_name: str, recipe): +# self.table_name = table_name +# self.recipe = recipe - # create table if it doesn't exist - self.recipe.cursor.execute( - f"CREATE TABLE IF NOT EXISTS {self.table_name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)" - ) +# # create table if it doesn't exist +# self.recipe.cursor.execute( +# f"CREATE TABLE IF NOT EXISTS {self.table_name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)" +# ) - def __repr__(self): - return f"Beaker({self.table_name})" +# def __repr__(self): +# return f"Beaker({self.table_name})" - def items(self): - self.recipe.cursor.execute(f"SELECT id, data FROM {self.table_name}") - data = self.recipe.cursor.fetchall() - for item in data: - yield item["id"], json.loads(item["data"]) +# def items(self): +# self.recipe.cursor.execute(f"SELECT id, data FROM {self.table_name}") +# data = self.recipe.cursor.fetchall() +# for item in data: +# yield item["id"], json.loads(item["data"]) - def __len__(self): - self.recipe.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}") - return self.recipe.cursor.fetchone()[0] +# def __len__(self): +# self.recipe.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}") +# return self.recipe.cursor.fetchone()[0] - def add_item(self, item: dict, from_table=None, from_id=None) -> None: - self.recipe.cursor.execute( - f"INSERT INTO {self.table_name} (data) VALUES (?)", (json.dumps(item),) - ) - self.recipe.cursor.commit() +# def add_item(self, item: dict, from_table=None, from_id=None) -> None: +# self.recipe.cursor.execute( +# f"INSERT INTO {self.table_name} (data) VALUES (?)", (json.dumps(item),) +# ) +# self.recipe.cursor.commit() def get_sha512(filename: str) -> str: @@ -75,27 +76,49 @@ def get_sha512(filename: str) -> str: return hashlib.sha512(file.read()).hexdigest() -@dataclass +@dataclass(eq=True, frozen=True) +class Beaker: + """ + A beaker is a node in the graph. + + They can correspond to tables in the database, + or they can be temporary. + """ + + name: str + temporary: bool = False + + +@dataclass(eq=True, frozen=True) class Pour: - from_beaker: str - to_beaker: str - transform: callable + """ + A pour is an edge in the graph. + + It contains a function that transforms data + from one beaker to another. + """ + + transform_func: callable -@dataclass -class Split: - from_beaker: str - condition: callable - if_true: str - if_false: str +@dataclass(eq=True, frozen=True) +class Conditional: + """ + A conditional is a decision point in the graph. + + condition_func should return values, and the + values will be used to determine which path + to take. + """ + + condition_func: callable class Recipe: - def __init__(self, name: str): + def __init__(self, name): self.name = name + self.graph = networkx.DiGraph() self.beakers = {} - self.pours = [] - self.splits = [] self.db = sqlite3.connect("beakers.db") self.cursor = self.db.cursor() self.cursor.row_factory = sqlite3.Row @@ -106,6 +129,47 @@ class Recipe: def __repr__(self) -> str: return f"Recipe({self.name})" + def add_beaker(self, name: str, temp: bool = False) -> Beaker: + beaker = Beaker(name, temporary=temp) + self.graph.add_node(beaker) + self.beakers[name] = beaker + return beaker + + def add_pour( + self, from_beaker: str, to_beaker: str, transform_func: callable + ) -> None: + self.graph.add_edge( + self.beakers[from_beaker], self.beakers[to_beaker], transform=transform_func + ) + + def add_conditional( + self, + from_beaker: str, + condition_func: callable, + if_true: str, + if_false: str, + ) -> None: + # first add a transform to evaluate the conditional + cond_name = f"cond-{from_beaker}-{condition_func.__name__}" + cond = self.add_beaker(cond_name, temp=True) + self.add_pour( + from_beaker, + cond_name, + lambda data: (data, condition_func(data)), + ) + + # then add two filtered paths that remove the condition result + self.graph.add_edge( + cond, + self.beakers[if_true], + filter_func=lambda data, condition: data if condition else None, + ) + self.graph.add_edge( + cond, + self.beakers[if_false], + filter_func=lambda data, condition: data if not condition else None, + ) + def get_metadata(self, table_name) -> dict: self.cursor.execute( "SELECT data FROM _metadata WHERE table_name = ?", @@ -129,11 +193,8 @@ class Recipe: ) self.db.commit() - def add_pour(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable): - pour = Pour(from_beaker, to_beaker, transform) - self.pours.append(pour) - - def csv_to_beaker(self, filename: str, beaker: Beaker) -> None: + def csv_to_beaker(self, filename: str, beaker_name: str) -> None: + beaker = self.beakers[beaker_name] lg = log.bind(beaker=beaker, filename=filename) # three cases: empty, match, mismatch # case 1: empty @@ -160,23 +221,17 @@ class Recipe: lg.info("from_csv", case="match") return beaker - def declare_beaker(self, name: str, temp=True): - beaker = Beaker(name, self) - self.beakers[name] = beaker - if temp: - self.cursor.execute(f"DROP TABLE IF EXISTS {name}") - return beaker - - def add_pour(self, from_beaker: str, to_beaker: str, transform: callable): - pour = Pour(from_beaker, to_beaker, transform) - self.pours.append(pour) - - def add_split( - self, from_beaker: str, condition: callable, if_true: str, if_false: str - ): - split = Split(from_beaker, condition, if_true, if_false) - self.splits.append(split) - return split + def solve_dag(self): + """ + Solve the DAG by topological sort. + """ + for node in networkx.topological_sort(self.graph): + if isinstance(node, Beaker): + print(node) + elif isinstance(node, Conditional): + print(node) + else: + raise Exception("unknown node type") def run_linearly(self): log.info("recipe", recipe=self) diff --git a/src/example.py b/src/example.py index aed07df..41f5c4d 100644 --- a/src/example.py +++ b/src/example.py @@ -15,12 +15,12 @@ async def add_response(obj_with_url): # current thinking, beakers exist within a recipe recipe = Recipe("fetch urls") -recipe.declare_beaker("agencies") -recipe.declare_beaker("responses") -recipe.declare_beaker("good_urls", temp=True) -recipe.declare_beaker("missing_urls", temp=True) -recipe.csv_to_beaker("agencies.csv", "agencies") -recipe.add_split( +recipe.add_beaker("agencies") +recipe.add_beaker("responses") +recipe.add_beaker("good_urls", temp=True) +recipe.add_beaker("missing_urls", temp=True) +# recipe.csv_to_beaker("agencies.csv", "agencies") +recipe.add_conditional( "agencies", lambda x: x["url"].startswith("http"), if_true="good_urls", @@ -28,4 +28,4 @@ recipe.add_split( ) recipe.add_pour("good_urls", "responses", add_response) -recipe.run_linearly() +recipe.solve_dag()