# NOTE(review): no import statements are visible in this part of the file.
# The code below expects ``asyncio``, ``csv``, ``httpx``, ``Scraper``,
# ``SQLiteCache`` and ``SchemaScraper`` to already be in scope -- confirm.

# Fields to extract from each agency page.  Each value is either a type name
# or an example of the expected format.  The keys double as the trailing
# columns of results.csv.
schema = {
    "public_records_email": "email",
    "public_records_address": "str",
    "public_records_phone": "555-555-5555",
    "public_records_fax": "555-555-5555",
    "public_records_web": "url",
    "general_contact_phone": "555-555-5555",
    "general_contact_address": "str",
    "foia_guide": "url",
    "public_reading_room": "url",
    "agency_logo": "url",
}

# NOTE(review): this text is defined but never handed to SchemaScraper below;
# presumably it was meant to be passed as extra instructions -- confirm the
# expected argument type against the scrapeghost API before wiring it in.
extra_instructions = """
The fields that begin with public_records should refer to contact information
specific to FOIA/Public Information/Freedom of Information requests.
The fields that begin with general_contact should refer to contact information
for the agency in general.
If a field is not found in the HTML, leave it as null in the JSON.
"""

# create a scraper w/ a sqlite cache
# NOTE(review): the async code path below fetches pages with httpx, so this
# cached scraper is currently unused.
scraper = Scraper(requests_per_minute=600)
scraper.cache_storage = SQLiteCache("cache.sqlite")

# create a scrapeghost
ghost = SchemaScraper(
    schema=schema,
    extra_preprocessors=[],
)

agencies = []


async def fetch_urls(urls, *, return_exceptions=False):
    """Fetch every URL in *urls* concurrently through one shared HTTP client.

    Args:
        urls: Iterable of URL strings to GET.
        return_exceptions: Forwarded to ``asyncio.gather``.  When false (the
            default, matching the original behavior) the first failing
            request raises and the other results are lost.  When true, each
            failure is returned in place of its response so callers can
            handle errors per URL.

    Returns:
        A list with one entry per URL, in the same order as *urls*.
    """
    async with httpx.AsyncClient() as client:
        requests = [client.get(url) for url in urls]
        return await asyncio.gather(
            *requests, return_exceptions=return_exceptions
        )


async def worker(queue, batch_size):
    """Drain *queue* and yield fetched responses one batch at a time.

    This is an async generator.  Each iteration removes up to *batch_size*
    URLs from *queue* without blocking, fetches them concurrently and yields
    the list of results.  A result is either the response object or the
    exception raised while fetching that URL.  The generator finishes once
    the queue is empty.

    Args:
        queue: ``asyncio.Queue`` holding URL strings.
        batch_size: Maximum number of URLs fetched concurrently per batch.
    """
    while True:
        urls = []
        for _ in range(batch_size):
            try:
                # get_nowait() is what raises QueueEmpty; ``await get()``
                # would block forever on an empty queue instead.
                urls.append(queue.get_nowait())
            except asyncio.QueueEmpty:
                break
        if not urls:
            return
        yield await fetch_urls(urls, return_exceptions=True)


async def main():
    """Scrape every agency listed in agencies.csv into results.csv.

    Reads the ``id`` and ``url`` columns of agencies.csv, fetches the pages
    in batches, runs each fetched page through ``ghost`` and writes one row
    per agency.  The row has status ``OK`` plus the scraped fields, or
    status ``ERROR`` when the page could not be fetched.
    """
    batch_size = 5

    # newline="" is what the csv module asks for on files it reads/writes.
    with open("agencies.csv", "r", newline="") as inf:
        agencies = list(csv.DictReader(inf))

    queue = asyncio.Queue()
    for agency in agencies:
        queue.put_nowait(agency["url"])

    # The single worker drains the queue in FIFO order and gather() keeps
    # request order, so responses arrive in the same order as ``agencies``.
    pending = iter(agencies)

    with open("results.csv", "w", newline="") as outf:
        out = csv.DictWriter(
            outf, fieldnames=["id", "url", "status"] + list(schema.keys())
        )
        async for responses in worker(queue, batch_size):
            # ``responses`` must be the first zip argument: zip stops as soon
            # as it is exhausted, without consuming an extra agency.
            for page, agency in zip(responses, pending):
                if isinstance(page, BaseException):
                    print(page)
                    out.writerow(
                        {
                            "id": agency["id"],
                            "url": agency["url"],
                            "status": "ERROR",
                        }
                    )
                    continue
                result = ghost.scrape(page.text)
                # NOTE(review): assumes scrape() returns either a mapping or
                # an object exposing the mapping as ``.data`` -- confirm
                # against the installed scrapeghost version.
                data = getattr(result, "data", result)
                out.writerow(
                    {
                        **data,
                        "id": agency["id"],
                        "url": agency["url"],
                        "status": "OK",
                    }
                )


if __name__ == "__main__":
    # main() is a coroutine function; it has to be driven by an event loop.
    asyncio.run(main())