networkx version

This commit is contained in:
James Turk 2023-05-07 19:17:28 -05:00
parent 8e4358c1e9
commit b688df1c02
4 changed files with 233 additions and 66 deletions

112
poetry.lock generated
View File

@@ -124,6 +124,27 @@ files = [
[package.dependencies]
frozenlist = ">=1.1.0"
[[package]]
name = "anyio"
version = "3.6.2"
description = "High level compatibility layer for multiple asynchronous event loop implementations"
category = "main"
optional = false
python-versions = ">=3.6.2"
files = [
{file = "anyio-3.6.2-py3-none-any.whl", hash = "sha256:fbbe32bd270d2a2ef3ed1c5d45041250284e31fc0a4df4a5a6071842051a51e3"},
{file = "anyio-3.6.2.tar.gz", hash = "sha256:25ea0d673ae30af41a0c442f81cf3b38c7e79fdc7b60335a4c14e05eb0947421"},
]
[package.dependencies]
idna = ">=2.8"
sniffio = ">=1.1"
[package.extras]
doc = ["packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"]
test = ["contextlib2", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (<0.15)", "uvloop (>=0.15)"]
trio = ["trio (>=0.16,<0.22)"]
[[package]]
name = "async-timeout"
version = "4.0.2"
@@ -375,6 +396,64 @@ files = [
{file = "frozenlist-1.3.3.tar.gz", hash = "sha256:58bcc55721e8a90b88332d6cd441261ebb22342e238296bb330968952fbb3a6a"},
]
[[package]]
name = "h11"
version = "0.14.0"
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"},
{file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"},
]
[[package]]
name = "httpcore"
version = "0.17.0"
description = "A minimal low-level HTTP client."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "httpcore-0.17.0-py3-none-any.whl", hash = "sha256:0fdfea45e94f0c9fd96eab9286077f9ff788dd186635ae61b312693e4d943599"},
{file = "httpcore-0.17.0.tar.gz", hash = "sha256:cc045a3241afbf60ce056202301b4d8b6af08845e3294055eb26b09913ef903c"},
]
[package.dependencies]
anyio = ">=3.0,<5.0"
certifi = "*"
h11 = ">=0.13,<0.15"
sniffio = ">=1.0.0,<2.0.0"
[package.extras]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (>=1.0.0,<2.0.0)"]
[[package]]
name = "httpx"
version = "0.24.0"
description = "The next generation HTTP client."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "httpx-0.24.0-py3-none-any.whl", hash = "sha256:447556b50c1921c351ea54b4fe79d91b724ed2b027462ab9a329465d147d5a4e"},
{file = "httpx-0.24.0.tar.gz", hash = "sha256:507d676fc3e26110d41df7d35ebd8b3b8585052450f4097401c9be59d928c63e"},
]
[package.dependencies]
certifi = "*"
httpcore = ">=0.15.0,<0.18.0"
idna = "*"
sniffio = "*"
[package.extras]
brotli = ["brotli", "brotlicffi"]
cli = ["click (>=8.0.0,<9.0.0)", "pygments (>=2.0.0,<3.0.0)", "rich (>=10,<14)"]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (>=1.0.0,<2.0.0)"]
[[package]]
name = "idna"
version = "3.4"
@@ -601,6 +680,25 @@ files = [
{file = "multidict-6.0.4.tar.gz", hash = "sha256:3666906492efb76453c0e7b97f2cf459b0682e7402c0489a95484965dbc1da49"},
]
[[package]]
name = "networkx"
version = "3.1"
description = "Python package for creating and manipulating graphs and networks"
category = "main"
optional = false
python-versions = ">=3.8"
files = [
{file = "networkx-3.1-py3-none-any.whl", hash = "sha256:4f33f68cb2afcf86f28a45f43efc27a9386b535d567d2127f8f61d51dec58d36"},
{file = "networkx-3.1.tar.gz", hash = "sha256:de346335408f84de0eada6ff9fafafff9bcda11f0a0dfaa931133debb146ab61"},
]
[package.extras]
default = ["matplotlib (>=3.4)", "numpy (>=1.20)", "pandas (>=1.3)", "scipy (>=1.8)"]
developer = ["mypy (>=1.1)", "pre-commit (>=3.2)"]
doc = ["nb2plots (>=0.6)", "numpydoc (>=1.5)", "pillow (>=9.4)", "pydata-sphinx-theme (>=0.13)", "sphinx (>=6.1)", "sphinx-gallery (>=0.12)", "texext (>=0.6.7)"]
extra = ["lxml (>=4.6)", "pydot (>=1.4.2)", "pygraphviz (>=1.10)", "sympy (>=1.10)"]
test = ["codecov (>=2.1)", "pytest (>=7.2)", "pytest-cov (>=4.0)"]
[[package]]
name = "openai"
version = "0.27.4"
@@ -842,6 +940,18 @@ files = [
[package.dependencies]
requests = {version = ">=2.28.1,<3.0.0", extras = ["security"]}
[[package]]
name = "sniffio"
version = "1.3.0"
description = "Sniff out which async library your code is running under"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "sniffio-1.3.0-py3-none-any.whl", hash = "sha256:eecefdce1e5bbfb7ad2eeaabf7c1eeb404d7757c379bd1f7e5cce9d8bf425384"},
{file = "sniffio-1.3.0.tar.gz", hash = "sha256:e60305c5e5d314f5389259b7f22aaa33d8f7dee49763119234af3755c55b9101"},
]
[[package]]
name = "structlog"
version = "22.3.0"
@@ -1068,4 +1178,4 @@ multidict = ">=4.0"
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "b0d01dfcd886acda1190e6f9ce4c18dc9fddf2ca01338036c1f0f3fc734537b7"
content-hash = "7e97eb5b1a403e6d90beff20f5d470e4bdc71648da3f0971f92fc96e1150f7fa"

View File

@@ -9,6 +9,8 @@ readme = "README.md"
python = "^3.11"
scrapeghost = {path = "../scrapeghost", develop = true}
scrapelib = "^2.1.0"
httpx = "^0.24.0"
networkx = "^3.1"
[build-system]

View File

@@ -4,6 +4,7 @@ import sqlite3
import hashlib
import asyncio
from dataclasses import dataclass
import networkx
from structlog import get_logger
log = get_logger()
@@ -40,34 +41,34 @@ pours are edges. The recipe is the graph.
"""
class Beaker:
def __init__(self, table_name: str, recipe):
self.table_name = table_name
self.recipe = recipe
# class Beaker:
# def __init__(self, table_name: str, recipe):
# self.table_name = table_name
# self.recipe = recipe
# create table if it doesn't exist
self.recipe.cursor.execute(
f"CREATE TABLE IF NOT EXISTS {self.table_name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)"
)
# # create table if it doesn't exist
# self.recipe.cursor.execute(
# f"CREATE TABLE IF NOT EXISTS {self.table_name} (id INTEGER PRIMARY KEY, data JSON, from_table TEXT NULL, from_id INTEGER NULL)"
# )
def __repr__(self):
return f"Beaker({self.table_name})"
# def __repr__(self):
# return f"Beaker({self.table_name})"
def items(self):
self.recipe.cursor.execute(f"SELECT id, data FROM {self.table_name}")
data = self.recipe.cursor.fetchall()
for item in data:
yield item["id"], json.loads(item["data"])
# def items(self):
# self.recipe.cursor.execute(f"SELECT id, data FROM {self.table_name}")
# data = self.recipe.cursor.fetchall()
# for item in data:
# yield item["id"], json.loads(item["data"])
def __len__(self):
self.recipe.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}")
return self.recipe.cursor.fetchone()[0]
# def __len__(self):
# self.recipe.cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}")
# return self.recipe.cursor.fetchone()[0]
def add_item(self, item: dict, from_table=None, from_id=None) -> None:
self.recipe.cursor.execute(
f"INSERT INTO {self.table_name} (data) VALUES (?)", (json.dumps(item),)
)
self.recipe.cursor.commit()
# def add_item(self, item: dict, from_table=None, from_id=None) -> None:
# self.recipe.cursor.execute(
# f"INSERT INTO {self.table_name} (data) VALUES (?)", (json.dumps(item),)
# )
# self.recipe.cursor.commit()
def get_sha512(filename: str) -> str:
@@ -75,27 +76,49 @@ def get_sha512(filename: str) -> str:
return hashlib.sha512(file.read()).hexdigest()
@dataclass
@dataclass(eq=True, frozen=True)
class Beaker:
    """
    A beaker is a node in the graph.
    They can correspond to tables in the database,
    or they can be temporary.
    """

    # eq=True + frozen=True makes instances hashable, which is what
    # allows Beaker objects to be used directly as networkx graph nodes.
    name: str  # also the lookup key in Recipe.beakers
    temporary: bool = False  # NOTE(review): presumably temp beakers are dropped/not persisted — confirm
@dataclass(eq=True, frozen=True)
class Pour:
from_beaker: str
to_beaker: str
transform: callable
"""
A pour is an edge in the graph.
It contains a function that transforms data
from one beaker to another.
"""
transform_func: callable
@dataclass
class Split:
from_beaker: str
condition: callable
if_true: str
if_false: str
@dataclass(eq=True, frozen=True)
class Conditional:
    """
    A conditional is a decision point in the graph.
    condition_func should return values, and the
    values will be used to determine which path
    to take.
    """

    # eq=True + frozen=True makes instances hashable (graph-node safe).
    # NOTE(review): builtin `callable` used as an annotation; typing.Callable
    # would be the conventional spelling.
    condition_func: callable
class Recipe:
def __init__(self, name: str):
def __init__(self, name):
self.name = name
self.graph = networkx.DiGraph()
self.beakers = {}
self.pours = []
self.splits = []
self.db = sqlite3.connect("beakers.db")
self.cursor = self.db.cursor()
self.cursor.row_factory = sqlite3.Row
@@ -106,6 +129,47 @@ class Recipe:
def __repr__(self) -> str:
return f"Recipe({self.name})"
def add_beaker(self, name: str, temp: bool = False) -> Beaker:
    """Register a new Beaker node on the recipe graph and return it.

    The beaker is recorded both in the name -> Beaker lookup table
    and as a node of the networkx graph.
    """
    node = Beaker(name, temporary=temp)
    self.beakers[name] = node
    self.graph.add_node(node)
    return node
def add_pour(
    self, from_beaker: str, to_beaker: str, transform_func: callable
) -> None:
    """Connect two beakers with a transform edge (a "pour").

    Both beakers must already be registered via add_beaker; looking up
    an unknown name raises KeyError.
    """
    src = self.beakers[from_beaker]
    dst = self.beakers[to_beaker]
    self.graph.add_edge(src, dst, transform=transform_func)
def add_conditional(
    self,
    from_beaker: str,
    condition_func: callable,
    if_true: str,
    if_false: str,
) -> None:
    """Route items from `from_beaker` to one of two beakers.

    Inserts a temporary beaker that pairs each item with the result of
    condition_func(item), then adds two filtered edges so items flow to
    `if_true` when the condition holds and to `if_false` otherwise.

    `if_true` and `if_false` must name beakers already registered via
    add_beaker; unknown names raise KeyError.
    """
    # first add a transform to evaluate the conditional
    cond_name = f"cond-{from_beaker}-{condition_func.__name__}"
    cond = self.add_beaker(cond_name, temp=True)
    self.add_pour(
        from_beaker,
        cond_name,
        # pair each item with its condition result so the filtered
        # edges below can split on it
        lambda data: (data, condition_func(data)),
    )
    # then add two filtered paths that remove the condition result
    self.graph.add_edge(
        cond,
        self.beakers[if_true],
        filter_func=lambda data, condition: data if condition else None,
    )
    self.graph.add_edge(
        cond,
        self.beakers[if_false],
        filter_func=lambda data, condition: data if not condition else None,
    )
def get_metadata(self, table_name) -> dict:
self.cursor.execute(
"SELECT data FROM _metadata WHERE table_name = ?",
@@ -129,11 +193,8 @@ class Recipe:
)
self.db.commit()
def add_pour(self, from_beaker: Beaker, to_beaker: Beaker, transform: callable):
pour = Pour(from_beaker, to_beaker, transform)
self.pours.append(pour)
def csv_to_beaker(self, filename: str, beaker: Beaker) -> None:
def csv_to_beaker(self, filename: str, beaker_name: str) -> None:
beaker = self.beakers[beaker_name]
lg = log.bind(beaker=beaker, filename=filename)
# three cases: empty, match, mismatch
# case 1: empty
@@ -160,23 +221,17 @@ class Recipe:
lg.info("from_csv", case="match")
return beaker
def declare_beaker(self, name: str, temp=True):
beaker = Beaker(name, self)
self.beakers[name] = beaker
if temp:
self.cursor.execute(f"DROP TABLE IF EXISTS {name}")
return beaker
def add_pour(self, from_beaker: str, to_beaker: str, transform: callable):
pour = Pour(from_beaker, to_beaker, transform)
self.pours.append(pour)
def add_split(
self, from_beaker: str, condition: callable, if_true: str, if_false: str
):
split = Split(from_beaker, condition, if_true, if_false)
self.splits.append(split)
return split
def solve_dag(self):
    """
    Solve the DAG by topological sort.

    Visits every graph node in dependency order, printing each known
    node type and rejecting anything unexpected.
    """
    for node in networkx.topological_sort(self.graph):
        if not isinstance(node, (Beaker, Conditional)):
            raise Exception("unknown node type")
        print(node)
def run_linearly(self):
log.info("recipe", recipe=self)

View File

@@ -15,12 +15,12 @@ async def add_response(obj_with_url):
# current thinking, beakers exist within a recipe
recipe = Recipe("fetch urls")
recipe.declare_beaker("agencies")
recipe.declare_beaker("responses")
recipe.declare_beaker("good_urls", temp=True)
recipe.declare_beaker("missing_urls", temp=True)
recipe.csv_to_beaker("agencies.csv", "agencies")
recipe.add_split(
recipe.add_beaker("agencies")
recipe.add_beaker("responses")
recipe.add_beaker("good_urls", temp=True)
recipe.add_beaker("missing_urls", temp=True)
# recipe.csv_to_beaker("agencies.csv", "agencies")
recipe.add_conditional(
"agencies",
lambda x: x["url"].startswith("http"),
if_true="good_urls",
@@ -28,4 +28,4 @@ recipe.add_split(
)
recipe.add_pour("good_urls", "responses", add_response)
recipe.run_linearly()
recipe.solve_dag()