foiaghost/examples/foiaghost.py
2023-05-08 01:11:07 -05:00

44 lines
1.1 KiB
Python

from ssl import SSLCertVerificationError, SSLError
import httpx
from beakers.beakers import Beaker
from beakers.recipe import Recipe
async def add_response(obj_with_url):
print(obj_with_url["url"])
url = obj_with_url["url"]
async with httpx.AsyncClient() as client:
response = await client.get(url)
return {
"url": url,
"status_code": response.status_code,
"response_body": response.text,
}
# current thinking, beakers exist within a recipe
recipe = Recipe("fetch urls", "url_example.db")
recipe.add_beaker("agencies")
recipe.add_beaker("responses")
recipe.add_beaker("bad_requests")
recipe.add_beaker("good_urls", temp=True)
recipe.add_beaker("missing_urls", temp=True)
recipe.add_conditional(
"agencies",
lambda x: x["url"].startswith("http"),
if_true="good_urls",
if_false="missing_urls",
)
recipe.add_transform(
"good_urls",
"responses",
add_response,
error_map={
(
httpx.HTTPError,
SSLCertVerificationError,
SSLError,
): "bad_requests"
},
)