Python async scraper with Postgres and SQLAlchemy

Python -- Posted on June 6, 2025

The script below crawls a site asynchronously with asyncio and aiohttp, stores each page's metadata and its parent/child link graph in PostgreSQL via SQLAlchemy, and finally exports everything to JSON. It depends on aiohttp, lxml, SQLAlchemy and the psycopg2 driver.

import asyncio
import aiohttp
import json
from lxml import html
from urllib.parse import urljoin, urlparse

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

# ───── Database Setup ─────
DATABASE_URL = "postgresql+psycopg2://user:pass@localhost:5432/database"
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base()

# Self-referential many-to-many relationship
page_children = Table(
    'page_children', Base.metadata,
    Column('parent_id', Integer, ForeignKey('pages.id', ondelete="CASCADE"), primary_key=True),
    Column('child_id', Integer, ForeignKey('pages.id', ondelete="CASCADE"), primary_key=True)
)

class Page(Base):
    __tablename__ = 'pages'
    id = Column(Integer, primary_key=True)
    url = Column(String, unique=True, nullable=False)
    title = Column(String)
    description = Column(String)
    keywords = Column(String)

    children = relationship(
        'Page',
        secondary=page_children,
        primaryjoin=id == page_children.c.parent_id,
        secondaryjoin=id == page_children.c.child_id,
        backref='parents'
    )
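
# backref='parents' also gives every Page a .parents collection: the pages
# that link to it (the reverse side of the self-referential many-to-many).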

Base.metadata.create_all(engine)

# ───── Async Crawler ─────

visited = set()
semaphore = asyncio.Semaphore(10)  # Max 10 concurrent requests

async def fetch_and_parse(session_http, url, base_domain, depth):
    if url in visited or depth < 0 or urlparse(url).netloc != base_domain:
        return
    visited.add(url)
    print(f"[+] Crawling: {url} (depth={depth})")

    try:
        async with semaphore, session_http.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            if resp.status != 200:
                return
            # Skip non-HTML responses (PDFs, images, ...) so lxml only parses real pages
            if "text/html" not in resp.headers.get("Content-Type", ""):
                return
            content = await resp.read()
    except Exception as e:
        print(f"[-] Error fetching {url}: {e}")
        return

    tree = html.fromstring(content)

    # Extract metadata
    page_title = tree.findtext(".//title") or ""
    meta_desc = tree.xpath("//meta[@name='description']/@content")
    meta_keywords = tree.xpath("//meta[@name='keywords']/@content")

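    # NOTE: the SQLAlchemy session is synchronous, so the queries/commits below
    # briefly block the event loop. That is fine for a small crawl; a fully async
    # setup would use SQLAlchemy's asyncio extension with a driver like asyncpg.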
    parent_page = session.query(Page).filter_by(url=url).first()
    if not parent_page:
        parent_page = Page(
            url=url,
            title=page_title.strip(),
            description=meta_desc[0].strip() if meta_desc else "",
            keywords=meta_keywords[0].strip() if meta_keywords else ""
        )
        session.add(parent_page)
    else:
        parent_page.title = page_title.strip()
        parent_page.description = meta_desc[0].strip() if meta_desc else ""
        parent_page.keywords = meta_keywords[0].strip() if meta_keywords else ""

    session.commit()

    tasks = []
    for el, attr, link, _ in tree.iterlinks():
        if el.tag == 'a' and attr == 'href':
            child_url = urljoin(url, link).split('#')[0].rstrip('/')
            if child_url.startswith("mailto:") or child_url.startswith("javascript:") or child_url in visited:
                continue
            if urlparse(child_url).netloc != base_domain:
                continue

            child_page = session.query(Page).filter_by(url=child_url).first()
            if not child_page:
                child_page = Page(url=child_url)
                session.add(child_page)
                session.commit()

            if child_page not in parent_page.children:
                parent_page.children.append(child_page)

            tasks.append(fetch_and_parse(session_http, child_url, base_domain, depth - 1))

    session.commit()
    await asyncio.gather(*tasks)

async def crawl(start_url, depth=2):
    base_domain = urlparse(start_url).netloc
    async with aiohttp.ClientSession(headers={'User-Agent': 'Mozilla/5.0'}) as session_http:
        await fetch_and_parse(session_http, start_url, base_domain, depth)

# ───── Export to JSON ─────

def export_to_json(filename="pages_export.json"):
    data = []
    for page in session.query(Page).all():
        data.append({
            "url": page.url,
            "title": page.title,
            "description": page.description,
            "keywords": page.keywords,
            "children": [child.url for child in page.children]
        })
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f"[✓] Exported to {filename}")
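
# Each exported entry looks roughly like this (values below are purely illustrative):
# {
#   "url": "https://example.com/about",
#   "title": "About",
#   "description": "Who we are",
#   "keywords": "about, team",
#   "children": ["https://example.com/contact"]
# }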

# ───── Main Entrypoint ─────

if __name__ == "__main__":
    start_url = "url"  # placeholder: set this to the page the crawl should start from
    asyncio.run(crawl(start_url, depth=2))
    export_to_json()
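
For reference, here is a minimal sketch of how the stored link graph could be inspected after a crawl. It assumes the script above is saved as crawler.py (the file name is my assumption) and that the same DATABASE_URL is reachable; it simply reuses the Session factory and the Page model defined above.

from crawler import Session, Page

def print_link_graph():
    """Print every stored page together with its outgoing in-domain links."""
    db = Session()
    try:
        for page in db.query(Page).order_by(Page.url).all():
            print(f"{page.url}  ({len(page.children)} children)")
            for child in page.children:
                print(f"    -> {child.url}")
    finally:
        db.close()

if __name__ == "__main__":
    print_link_graph()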