Python async scraper with Postgres and SQLAlchemy
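An asyncio/aiohttp crawler that stays on a single domain, stores each page's title, meta description, and keywords in Postgres via SQLAlchemy, records the parent/child link graph in a self-referential many-to-many table, and can export everything to JSON.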
import asyncio
import aiohttp
import json
from lxml import html
from urllib.parse import urljoin, urlparse
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# ───── Database Setup ─────
DATABASE_URL = "postgresql+psycopg2://user:pass@localhost:5432/database"
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base()
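# Note: this engine/session is synchronous (psycopg2), so each ORM query/commit
# below briefly blocks the event loop. That keeps the example simple; a fully
# non-blocking variant would use SQLAlchemy's asyncio API with an async driver
# such as asyncpg.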
# Self-referential many-to-many relationship
page_children = Table(
    'page_children', Base.metadata,
    Column('parent_id', Integer, ForeignKey('pages.id', ondelete="CASCADE"), primary_key=True),
    Column('child_id', Integer, ForeignKey('pages.id', ondelete="CASCADE"), primary_key=True)
)
class Page(Base):
    __tablename__ = 'pages'

    id = Column(Integer, primary_key=True)
    url = Column(String, unique=True, nullable=False)
    title = Column(String)
    description = Column(String)
    keywords = Column(String)

    # Pages this page links to; the backref exposes the reverse direction as .parents
    children = relationship(
        'Page',
        secondary=page_children,
        primaryjoin=id == page_children.c.parent_id,
        secondaryjoin=id == page_children.c.child_id,
        backref='parents'
    )
Base.metadata.create_all(engine)
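# Illustrative (not executed here): after a crawl, the self-referential
# relationship can be walked in both directions. The URL below is a placeholder.
#   page = session.query(Page).filter_by(url="https://example.com").first()
#   outgoing = [c.url for c in page.children]   # pages this page links to
#   incoming = [p.url for p in page.parents]    # pages that link to this page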
# ───── Async Crawler ─────
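# URLs already crawled (or scheduled) in this run; prevents revisits and cycles.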
visited = set()
semaphore = asyncio.Semaphore(10) # Max 10 concurrent requests
async def fetch_and_parse(session_http, url, base_domain, depth):
    # Skip URLs we've already seen, exhausted depth, and off-domain links.
    if url in visited or depth < 0 or urlparse(url).netloc != base_domain:
        return
    visited.add(url)
    print(f"[+] Crawling: {url} (depth={depth})")
    try:
        async with semaphore, session_http.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            if resp.status != 200:
                return
            content = await resp.read()
    except Exception as e:
        print(f"[-] Error fetching {url}: {e}")
        return
    tree = html.fromstring(content)
    # Extract metadata
    page_title = tree.findtext(".//title") or ""
    meta_desc = tree.xpath("//meta[@name='description']/@content")
    meta_keywords = tree.xpath("//meta[@name='keywords']/@content")
    # Insert or update the record for this page.
    parent_page = session.query(Page).filter_by(url=url).first()
    if not parent_page:
        parent_page = Page(
            url=url,
            title=page_title.strip(),
            description=meta_desc[0].strip() if meta_desc else "",
            keywords=meta_keywords[0].strip() if meta_keywords else ""
        )
        session.add(parent_page)
    else:
        parent_page.title = page_title.strip()
        parent_page.description = meta_desc[0].strip() if meta_desc else ""
        parent_page.keywords = meta_keywords[0].strip() if meta_keywords else ""
    session.commit()
    tasks = []
    for el, attr, link, _ in tree.iterlinks():
        if el.tag == 'a' and attr == 'href':
            # Normalize: resolve relative links, drop fragments and trailing slashes.
            child_url = urljoin(url, link).split('#')[0].rstrip('/')
            if child_url.startswith("mailto:") or child_url.startswith("javascript:") or child_url in visited:
                continue
            if urlparse(child_url).netloc != base_domain:
                continue
            child_page = session.query(Page).filter_by(url=child_url).first()
            if not child_page:
                child_page = Page(url=child_url)
                session.add(child_page)
                session.commit()
            if child_page not in parent_page.children:
                parent_page.children.append(child_page)
            tasks.append(fetch_and_parse(session_http, child_url, base_domain, depth - 1))
    session.commit()
    # Crawl the discovered children concurrently (bounded by the semaphore).
    await asyncio.gather(*tasks)
async def crawl(start_url, depth=2):
    base_domain = urlparse(start_url).netloc
    async with aiohttp.ClientSession(headers={'User-Agent': 'Mozilla/5.0'}) as session_http:
        await fetch_and_parse(session_http, start_url, base_domain, depth)
# ───── Export to JSON ─────
def export_to_json(filename="pages_export.json"):
    data = []
    for page in session.query(Page).all():
        data.append({
            "url": page.url,
            "title": page.title,
            "description": page.description,
            "keywords": page.keywords,
            "children": [child.url for child in page.children]
        })
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f"[✓] Exported to {filename}")
# ───── Main Entrypoint ─────
if __name__ == "__main__":
    start_url = "url"  # placeholder: set this to the site you want to crawl
    asyncio.run(crawl(start_url, depth=2))
    export_to_json()
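# Illustrative (not executed here): the stored link graph can also be inspected
# directly in Postgres; table and column names come from the models above.
#   SELECT p.url AS parent, c.url AS child
#   FROM page_children pc
#   JOIN pages p ON p.id = pc.parent_id
#   JOIN pages c ON c.id = pc.child_id;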