foiaghost/foiaghost.py

91 lines
2.7 KiB
Python
Raw Normal View History

2023-04-27 06:25:07 +00:00
import asyncio
import httpx
import csv
from asyncio import Queue
from itertools import zip_longest
from scrapelib import Scraper, SQLiteCache
from scrapeghost import SchemaScraper, CSS
# Field name -> type/example hint handed to scrapeghost to describe the
# JSON object that should be extracted for each agency page.
schema = {
    # contact details specific to public-records requests
    "public_records_email": "email",
    "public_records_address": "str",
    "public_records_phone": "555-555-5555",
    "public_records_fax": "555-555-5555",
    "public_records_web": "url",
    # agency-wide contact details
    "general_contact_phone": "555-555-5555",
    "general_contact_address": "str",
    # supporting resources
    "foia_guide": "url",
    "public_reading_room": "url",
    "agency_logo": "url",
}
# Free-text guidance for the model explaining how the schema's field-name
# prefixes map onto the two kinds of contact information, and what to emit
# for missing fields. (Typo "Freedome" corrected: this text is read by the
# model, so the wording matters.)
extra_instructions = """
The fields that begin with public_records should refer to contact information specific to FOIA/Public Information/Freedom of Information requests.
The fields that begin with general_contact should refer to contact information for the agency in general.
If a field is not found in the HTML, leave it as null in the JSON.
"""
# create a scraper w/ a sqlite cache
# NOTE(review): `scraper` is not referenced by the code visible in this file
# (pages are fetched with httpx instead) -- confirm whether it is still needed.
scraper = Scraper(requests_per_minute=600)
scraper.cache_storage = SQLiteCache("cache.sqlite")

# create a scrapeghost
ghost = SchemaScraper(
    schema=schema,
    extra_preprocessors=[],
    # The FOIA-specific guidance was defined above but never handed to the
    # scraper, so the model never saw it. SchemaScraper takes a list of
    # instruction strings.
    extra_instructions=[extra_instructions.strip()],
)

# Module-level placeholder; main() binds its own local `agencies` reader.
agencies = []
async def fetch_urls(urls):
    """Issue a GET for every entry of ``urls`` concurrently.

    One ``httpx.AsyncClient`` is opened for the whole batch and closed
    before returning. The requests are awaited together with
    ``asyncio.gather``, so the returned list has one response per URL in
    the same order as ``urls``. No exception handling is done here: the
    first request that raises makes the whole call raise.
    """
    async with httpx.AsyncClient() as http:
        pending = (http.get(target) for target in urls)
        fetched = await asyncio.gather(*pending)
    return fetched
async def worker(queue, batch_size):
    """Drain ``queue`` in batches, yielding the fetched responses per batch.

    This is an async generator: iterate it with ``async for``. Each
    iteration pulls up to ``batch_size`` URLs that are already in the
    queue, fetches them together with ``fetch_urls`` and yields the
    resulting list of responses. The generator finishes once the queue
    is empty.

    Args:
        queue: an ``asyncio.Queue`` of URL strings.
        batch_size: maximum number of URLs fetched per yielded batch.

    Yields:
        The list returned by ``fetch_urls`` for each batch.
    """
    while True:
        urls = []
        for _ in range(batch_size):
            try:
                # get_nowait() is the call that raises QueueEmpty; awaiting
                # queue.get() would instead block forever on an empty queue.
                urls.append(queue.get_nowait())
            except asyncio.QueueEmpty:
                break
        if not urls:
            # Nothing left to fetch: stop instead of spinning in a loop
            # that never yields control back to the event loop.
            return
        responses = await fetch_urls(urls)
        yield responses
def _grouper(iterable, n):
    """Yield lists of up to ``n`` consecutive items from ``iterable``."""
    # grouper -> https://docs.python.org/3/library/itertools.html#itertools-recipes
    padding = object()
    shared = [iter(iterable)] * n
    for chunk in zip_longest(*shared, fillvalue=padding):
        # The final chunk is padded by zip_longest; drop the padding.
        yield [item for item in chunk if item is not padding]


async def _fetch_pages(urls):
    """Return one response per URL in ``urls``, or None where the fetch failed."""
    try:
        return await fetch_urls(urls)
    except Exception as e:
        # fetch_urls fails the whole batch on the first error, so fall back
        # to one-by-one requests to attribute the failure to the right URL.
        print(e)
    pages = []
    for url in urls:
        try:
            pages.extend(await fetch_urls([url]))
        except Exception as e:
            print(e)
            pages.append(None)
    return pages


async def main():
    """Scrape contact details for every agency in ``agencies.csv``.

    Reads rows (expected to carry ``id`` and ``url`` columns) from
    ``agencies.csv``, fetches the pages ``batch_size`` at a time, runs each
    page's HTML through ``ghost`` and writes one row per agency to
    ``results.csv``. A row has ``status`` "OK" plus the scraped schema
    fields, or ``status`` "ERROR" when the page could not be fetched.
    """
    batch_size = 5
    fieldnames = ["id", "url", "status"] + list(schema.keys())
    # newline="" is what the csv module expects for files it reads/writes.
    with open("agencies.csv", "r", newline="") as inf:
        with open("results.csv", "w", newline="") as outf:
            agencies = csv.DictReader(inf)
            out = csv.DictWriter(
                outf,
                fieldnames=fieldnames,
                # the model may return keys outside the schema; skip them
                # rather than abort the whole run with a ValueError
                extrasaction="ignore",
            )
            out.writeheader()
            for batch in _grouper(agencies, batch_size):
                pages = await _fetch_pages([agency["url"] for agency in batch])
                for agency, page in zip(batch, pages):
                    if page is None:
                        out.writerow(
                            {
                                "id": agency["id"],
                                "url": agency["url"],
                                "status": "ERROR",
                            }
                        )
                        continue
                    result = ghost.scrape(page.text)
                    # NOTE(review): newer scrapeghost releases wrap the parsed
                    # JSON in a response object with a `.data` attribute;
                    # fall back to the raw result otherwise -- confirm
                    # against the installed version.
                    data = getattr(result, "data", result)
                    # dicts cannot be combined with `+`; merge instead, with
                    # the bookkeeping columns taking precedence.
                    out.writerow(
                        {
                            **data,
                            "id": agency["id"],
                            "url": agency["url"],
                            "status": "OK",
                        }
                    )
if __name__ == "__main__":
    # main() is a coroutine function: calling it bare only creates a
    # coroutine object that never runs. asyncio.run() actually executes it.
    asyncio.run(main())