Fix scrapers, dashboard pages, and API for production use

Scrapers:
- Rewrite Tesco scraper to handle Akamai WAF and obfuscated CSS
- Fix Dunnes category discovery to top-level only (29 vs 1603)
- Rewrite Lidl parser to extract from data-grid-data JSON attributes
- Improve Aldi and SuperValu scrapers with better error handling

API:
- Add /api/search-prices endpoint for cross-store product comparison
- Fix timezone mismatch in price history endpoint (naive vs aware datetime)
- Fix scrape status filter (success/partial instead of done)

Dashboard:
- Rewrite all 4 pages to match actual API response schemas
- Fix Price Battle button state management with st.rerun()
- Add popular search buttons for real product comparison
- Add product catalogue with pagination and image support
- Fix store colour matching to use partial name matching
- Remove last_scrape from overview, add battle pie chart
This commit is contained in:
authentik Default Admin 2026-02-11 09:52:14 +00:00
parent f9c4389f5a
commit 82430864f7
14 changed files with 1900 additions and 929 deletions

View file

@ -0,0 +1,100 @@
"""initial schema
Revision ID: 19718223ee0e
Revises:
Create Date: 2026-02-11 07:10:30.445380
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = '19718223ee0e'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create the initial schema: six tables and two indexes.

    Tables are created parents-first so that every foreign-key target
    exists before the table referencing it: ``categories`` and ``stores``,
    then ``products`` and ``scrape_runs``, then ``store_products``, and
    finally ``price_records``.
    """

    def id_column() -> sa.Column:
        # Built fresh for every table; a Column object belongs to one table.
        return sa.Column('id', sa.Integer(), nullable=False)

    def now_column(name: str) -> sa.Column:
        # Non-null timestamp whose value is supplied by the database (now()).
        return sa.Column(
            name,
            sa.DateTime(),
            server_default=sa.text('now()'),
            nullable=False,
        )

    # ### commands auto generated by Alembic - please adjust! ###

    # -- lookup tables ------------------------------------------------------
    op.create_table(
        'categories',
        id_column(),
        sa.Column('name', sa.String(length=100), nullable=False),
        sa.Column('slug', sa.String(length=50), nullable=False),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('slug'),
    )
    op.create_table(
        'stores',
        id_column(),
        sa.Column('name', sa.String(length=100), nullable=False),
        sa.Column('slug', sa.String(length=50), nullable=False),
        sa.Column('base_url', sa.String(length=255), nullable=False),
        sa.Column('logo_url', sa.String(length=255), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('slug'),
    )

    # -- products (optional link to a category) -----------------------------
    op.create_table(
        'products',
        id_column(),
        sa.Column('name', sa.String(length=255), nullable=False),
        sa.Column('brand', sa.String(length=100), nullable=True),
        sa.Column('ean', sa.String(length=13), nullable=True),
        sa.Column('category_id', sa.Integer(), nullable=True),
        sa.Column('unit', sa.String(length=20), nullable=True),
        sa.Column('unit_size', sa.Numeric(precision=10, scale=3), nullable=True),
        sa.Column('image_url', sa.String(length=255), nullable=True),
        now_column('created_at'),
        sa.ForeignKeyConstraint(['category_id'], ['categories.id']),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index(op.f('ix_products_ean'), 'products', ['ean'], unique=False)

    # -- scrape bookkeeping, one row per run against a store ----------------
    op.create_table(
        'scrape_runs',
        id_column(),
        sa.Column('store_id', sa.Integer(), nullable=False),
        now_column('started_at'),
        sa.Column('finished_at', sa.DateTime(), nullable=True),
        sa.Column('status', sa.String(length=20), nullable=False),
        sa.Column('products_scraped', sa.Integer(), nullable=False),
        sa.Column('errors', sa.Text(), nullable=True),
        sa.ForeignKeyConstraint(['store_id'], ['stores.id']),
        sa.PrimaryKeyConstraint('id'),
    )

    # -- a product as listed by one particular store ------------------------
    op.create_table(
        'store_products',
        id_column(),
        sa.Column('product_id', sa.Integer(), nullable=False),
        sa.Column('store_id', sa.Integer(), nullable=False),
        sa.Column('store_sku', sa.String(length=100), nullable=True),
        sa.Column('store_name', sa.String(length=255), nullable=False),
        sa.Column('store_url', sa.String(length=500), nullable=True),
        sa.Column('is_active', sa.Boolean(), nullable=False),
        sa.ForeignKeyConstraint(['product_id'], ['products.id']),
        sa.ForeignKeyConstraint(['store_id'], ['stores.id']),
        sa.PrimaryKeyConstraint('id'),
    )

    # -- price observations for a store listing -----------------------------
    op.create_table(
        'price_records',
        id_column(),
        sa.Column('store_product_id', sa.Integer(), nullable=False),
        sa.Column('price', sa.Numeric(precision=8, scale=2), nullable=False),
        sa.Column('promo_price', sa.Numeric(precision=8, scale=2), nullable=True),
        sa.Column('promo_label', sa.String(length=100), nullable=True),
        sa.Column('unit_price', sa.Numeric(precision=8, scale=4), nullable=True),
        sa.Column('in_stock', sa.Boolean(), nullable=False),
        now_column('scraped_at'),
        sa.ForeignKeyConstraint(['store_product_id'], ['store_products.id']),
        sa.PrimaryKeyConstraint('id'),
    )
    # Composite index over (store_product_id, scraped_at).
    op.create_index(
        'ix_price_records_store_product_scraped',
        'price_records',
        ['store_product_id', 'scraped_at'],
        unique=False,
    )
    # ### end Alembic commands ###
def downgrade() -> None:
    """Remove everything ``upgrade`` created, children before parents.

    Referencing tables are dropped before the tables they point at, and
    each explicitly created index is dropped just before its table.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index(
        'ix_price_records_store_product_scraped',
        table_name='price_records',
    )
    for table_name in ('price_records', 'store_products', 'scrape_runs'):
        op.drop_table(table_name)

    op.drop_index(op.f('ix_products_ean'), table_name='products')
    for table_name in ('products', 'stores', 'categories'):
        op.drop_table(table_name)
    # ### end Alembic commands ###

View file

@ -34,6 +34,9 @@ dev = [
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

View file

@ -2,7 +2,7 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from decimal import Decimal
from fastapi import APIRouter, Depends, HTTPException, Query
@ -35,7 +35,7 @@ async def price_history(
if product is None:
raise HTTPException(status_code=404, detail="Product not found")
since = datetime.now(timezone.utc) - timedelta(days=days)
since = datetime.utcnow() - timedelta(days=days)
# Fetch store products with their stores
sp_stmt = (
@ -70,6 +70,88 @@ async def price_history(
return histories
def _escape_like(term: str) -> str:
    """Backslash-escape LIKE wildcards so *term* is matched literally."""
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


@router.get("/search-prices")
async def search_prices(
    q: str = Query(..., min_length=2, description="Search term"),
    limit: int = Query(30, ge=1, le=100),
    session: AsyncSession = Depends(get_session),
):
    """Search products by name and return their latest prices grouped by store.

    This is useful for cross-store comparison: search 'milk' to see milk prices
    across Tesco, Aldi, Dunnes, etc.

    Args:
        q: Case-insensitive substring matched against the store's own product
            name. ``%`` and ``_`` are matched literally, not as wildcards.
        limit: Maximum number of rows returned (1-100).
        session: Database session injected by FastAPI.

    Returns:
        A list of dicts, ordered by product name then store name, each with
        the keys ``product_name``, ``store``, ``store_slug``, ``price``,
        ``promo_price``, ``promo_label``, ``effective_price``, ``unit_price``,
        ``image_url`` and ``product_url``. ``effective_price`` is the promo
        price when one is set (and non-zero), otherwise the regular price.
    """
    # Latest price per store_product (window function): rank each listing's
    # records newest-first, then keep only rank 1 below.
    latest_price_subq = (
        select(
            PriceRecord.store_product_id,
            PriceRecord.price,
            PriceRecord.promo_price,
            PriceRecord.promo_label,
            PriceRecord.unit_price,
            func.row_number()
            .over(
                partition_by=PriceRecord.store_product_id,
                order_by=PriceRecord.scraped_at.desc(),
            )
            .label("rn"),
        )
        .subquery()
    )
    latest = (
        select(
            latest_price_subq.c.store_product_id,
            latest_price_subq.c.price,
            latest_price_subq.c.promo_price,
            latest_price_subq.c.promo_label,
            latest_price_subq.c.unit_price,
        )
        .where(latest_price_subq.c.rn == 1)
        .subquery()
    )

    # The search term is user input: without escaping, "%" or "_" in it would
    # act as LIKE wildcards (e.g. q="%%" would match every product).
    pattern = f"%{_escape_like(q)}%"

    # Join store_products -> stores -> latest prices, filter by name
    stmt = (
        select(
            StoreProduct.store_name,
            Store.name.label("store"),
            Store.slug.label("store_slug"),
            latest.c.price,
            latest.c.promo_price,
            latest.c.promo_label,
            latest.c.unit_price,
            Product.image_url,
            StoreProduct.store_url,
        )
        .join(Store, Store.id == StoreProduct.store_id)
        .join(Product, Product.id == StoreProduct.product_id)
        .join(latest, latest.c.store_product_id == StoreProduct.id)
        .where(StoreProduct.store_name.ilike(pattern, escape="\\"))
        .order_by(StoreProduct.store_name, Store.name)
        .limit(limit)
    )
    rows = (await session.execute(stmt)).all()

    results = []
    for row in rows:
        # A missing (or zero) promo price means the regular price applies.
        effective = float(row.promo_price) if row.promo_price else float(row.price)
        results.append({
            "product_name": row.store_name,
            "store": row.store,
            "store_slug": row.store_slug,
            "price": float(row.price),
            "promo_price": float(row.promo_price) if row.promo_price else None,
            "promo_label": row.promo_label,
            "effective_price": effective,
            "unit_price": float(row.unit_price) if row.unit_price else None,
            "image_url": row.image_url,
            "product_url": row.store_url,
        })
    return results
@router.get("/stats", response_model=StatsOut)
async def stats(
session: AsyncSession = Depends(get_session),
@ -84,7 +166,7 @@ async def stats(
# Last scrape time
last_scrape_row = await session.execute(
select(ScrapeRun.finished_at)
.where(ScrapeRun.status == "done")
.where(ScrapeRun.status.in_(["success", "partial"]))
.order_by(ScrapeRun.finished_at.desc())
.limit(1)
)

View file

@ -22,13 +22,17 @@ _DEFAULT_COLOUR_SEQUENCE = list(STORE_COLOURS.values())
def _colour_map(stores: list[str]) -> dict[str, str]:
"""Return a colour mapping, falling back to the palette for unknown stores."""
"""Return a colour mapping, using partial matching and falling back to the palette."""
palette_iter = iter(_DEFAULT_COLOUR_SEQUENCE)
mapping: dict[str, str] = {}
for s in stores:
if s in STORE_COLOURS:
mapping[s] = STORE_COLOURS[s]
else:
matched = False
for key, val in STORE_COLOURS.items():
if key.lower() in s.lower():
mapping[s] = val
matched = True
break
if not matched:
mapping[s] = next(palette_iter, "#888888")
return mapping

View file

@ -40,8 +40,8 @@ def _compare_basket(items: list[dict[str, Any]]) -> dict[str, Any]:
"""POST the basket to the API and return comparison results."""
try:
resp = httpx.post(
f"{API}/api/baskets/compare",
json={"items": items},
f"{API}/api/baskets",
json={"name": "My Basket", "items": items},
timeout=15,
)
resp.raise_for_status()
@ -61,7 +61,7 @@ if "basket_items" not in st.session_state:
# ---------------------------------------------------------------------------
# Page content
# ---------------------------------------------------------------------------
st.title("\U0001f6d2 Basket Compare")
st.title("Basket Compare")
st.caption(
"Build a shopping list, then compare the total cost at each store."
)
@ -119,7 +119,7 @@ st.subheader("Your Basket")
if not st.session_state.basket_items:
st.info("Your basket is empty. Search and add products above.")
else:
# Show basket as an editable table
# Show basket as a table
basket_df = pd.DataFrame(
[
{
@ -175,78 +175,45 @@ else:
st.divider()
st.subheader("Comparison Results")
# ---- Totals per store --------------------------------------------
store_totals: list[dict[str, Any]] = result.get("store_totals", [])
# ---- Totals per store (from BasketCompareOut.stores) ------
store_totals: list[dict[str, Any]] = result.get("stores", [])
if store_totals:
# Sort cheapest first
store_totals_sorted = sorted(store_totals, key=lambda s: s.get("total", float("inf")))
# Filter out stores with 0 items found
active_stores = [s for s in store_totals if s.get("items_found", 0) > 0]
if not active_stores:
st.warning("None of the stores carry these products.")
else:
# Sort cheapest first
active_sorted = sorted(active_stores, key=lambda s: float(s.get("total", 99999)))
# Metrics row
metric_cols = st.columns(len(store_totals_sorted))
cheapest_total = store_totals_sorted[0]["total"] if store_totals_sorted else 0
for idx, st_total in enumerate(store_totals_sorted):
name = st_total.get("store_name", "Unknown")
total = st_total.get("total", 0)
delta = total - cheapest_total
metric_cols[idx].metric(
label=name,
value=f"\u20ac{total:.2f}",
delta=f"+\u20ac{delta:.2f}" if delta > 0 else "Cheapest",
delta_color="inverse" if delta > 0 else "off",
)
# Metrics row
metric_cols = st.columns(len(active_sorted))
cheapest_total = float(active_sorted[0]["total"]) if active_sorted else 0
for idx, st_total in enumerate(active_sorted):
store_info = st_total.get("store", {})
name = store_info.get("name", "Unknown")
total = float(st_total.get("total", 0))
found = st_total.get("items_found", 0)
missing = st_total.get("items_missing", 0)
delta = total - cheapest_total
# Bar chart
chart_data = [
{"store_name": s["store_name"], "total": s["total"]}
for s in store_totals_sorted
]
fig = basket_comparison_bar(chart_data)
st.plotly_chart(fig, use_container_width=True)
# ---- Item breakdown per store ------------------------------------
breakdown: list[dict[str, Any]] = result.get("breakdown", [])
if breakdown:
st.divider()
st.subheader("Item Breakdown")
rows: list[dict[str, Any]] = []
for entry in breakdown:
row: dict[str, Any] = {
"Product": entry.get("product_name", "Unknown"),
"Qty": entry.get("quantity", 1),
}
prices = entry.get("prices", {})
for store_name, price in prices.items():
row[store_name] = (
f"\u20ac{price:.2f}" if price is not None else "\u2014"
metric_cols[idx].metric(
label=name,
value=f"\u20ac{total:.2f}",
delta=f"+\u20ac{delta:.2f}" if delta > 0 else "Cheapest",
delta_color="inverse" if delta > 0 else "off",
)
rows.append(row)
metric_cols[idx].caption(f"{found} found, {missing} missing")
breakdown_df = pd.DataFrame(rows)
# Highlight cheapest per row
store_cols = [
c for c in breakdown_df.columns if c not in ("Product", "Qty")
]
def _highlight_row(row: pd.Series) -> list[str]:
styles = [""] * len(row)
min_val = float("inf")
min_idx = -1
for i, col in enumerate(row.index):
if col in store_cols:
val_str = row[col]
if val_str and val_str != "\u2014":
try:
val = float(val_str.replace("\u20ac", ""))
if val < min_val:
min_val = val
min_idx = i
except ValueError:
pass
if min_idx >= 0:
styles[min_idx] = "background-color: #d4edda; font-weight: bold;"
return styles
styled = breakdown_df.style.apply(_highlight_row, axis=1)
st.dataframe(styled, use_container_width=True, hide_index=True)
# Bar chart
chart_data = [
{
"store_name": s["store"]["name"],
"total": float(s["total"]),
}
for s in active_sorted
]
fig = basket_comparison_bar(chart_data)
st.plotly_chart(fig, use_container_width=True)
else:
st.info("No store comparison data available.")

View file

@ -5,10 +5,10 @@ from __future__ import annotations
from typing import Any
import httpx
import pandas as pd
import streamlit as st
from src.core.config import settings
from src.dashboard.components.charts import battle_pie_chart
API = settings.api_base_url
@ -27,12 +27,22 @@ def _fetch_stats() -> dict[str, Any]:
@st.cache_data(ttl=120, show_spinner=False)
def _fetch_battle(category_id: int | None = None) -> dict[str, Any]:
params: dict[str, Any] = {}
if category_id is not None:
params["category_id"] = category_id
def _fetch_products(page: int = 1, limit: int = 50, search: str = "") -> dict[str, Any]:
params: dict[str, Any] = {"page": page, "limit": limit}
if search:
params["search"] = search
try:
resp = httpx.get(f"{API}/api/battle", params=params, timeout=10)
resp = httpx.get(f"{API}/api/products", params=params, timeout=10)
resp.raise_for_status()
return resp.json()
except httpx.HTTPError:
return {"items": [], "total": 0}
@st.cache_data(ttl=120, show_spinner=False)
def _fetch_battle() -> dict[str, Any]:
try:
resp = httpx.get(f"{API}/api/battle", timeout=10)
resp.raise_for_status()
return resp.json()
except httpx.HTTPError:
@ -42,11 +52,10 @@ def _fetch_battle(category_id: int | None = None) -> dict[str, Any]:
# ---------------------------------------------------------------------------
# Page content
# ---------------------------------------------------------------------------
st.title("\U0001f4ca Overview")
st.caption("Key performance indicators and today's highlights.")
st.title("Overview")
st.caption("Key performance indicators and product catalogue.")
stats = _fetch_stats()
battle = _fetch_battle()
if not stats:
st.error(
@ -56,7 +65,7 @@ if not stats:
st.stop()
# ---- KPI cards -----------------------------------------------------------
kpi1, kpi2, kpi3, kpi4 = st.columns(4)
kpi1, kpi2, kpi3 = st.columns(3)
kpi1.metric(
label="Products Tracked",
@ -70,88 +79,120 @@ kpi3.metric(
label="Price Records",
value=f"{stats.get('total_price_records', 0):,}",
)
kpi4.metric(
label="Last Scrape",
value=stats.get("last_scrape_time", "N/A"),
)
st.divider()
# ---- Cheapest store of the day -------------------------------------------
cheapest_store = stats.get("cheapest_store")
if cheapest_store:
st.subheader("Cheapest Store Today")
cs_col1, cs_col2 = st.columns([1, 3])
with cs_col1:
st.markdown(
f"<div style='text-align:center;padding:1rem;background:#f0f2f6;"
f"border-radius:0.5rem;'>"
f"<h2 style='margin:0;'>{cheapest_store.get('name', 'N/A')}</h2>"
f"<p style='margin:0;color:grey;'>avg. \u20ac{cheapest_store.get('avg_price', 0):.2f}</p>"
f"</div>",
unsafe_allow_html=True,
)
with cs_col2:
st.markdown(
f"Based on the average price across all tracked products today, "
f"**{cheapest_store.get('name', 'N/A')}** offers the best overall value."
# ---- Average Price by Store ----------------------------------------------
avg_by_store = stats.get("avg_prices_by_store", [])
if avg_by_store:
st.subheader("Average Price by Store")
store_cols = st.columns(len(avg_by_store))
for idx, entry in enumerate(avg_by_store):
store_info = entry.get("store", {})
store_name = store_info.get("name", "Unknown")
avg_price = entry.get("avg_price", "0")
store_cols[idx].metric(
label=store_name,
value=f"\u20ac{float(avg_price):.2f}",
)
st.divider()
# ---- Price battle pie chart + Top 5 biggest differences ------------------
left_col, right_col = st.columns(2)
# ---- Battle summary (if multiple stores) ---------------------------------
battle = _fetch_battle()
battle_results = battle.get("results", [])
stores_with_wins = [r for r in battle_results if r.get("wins", 0) > 0]
if stores_with_wins:
from src.dashboard.components.charts import battle_pie_chart
with left_col:
st.subheader("Cheapest Store Breakdown")
if battle:
wins: dict[str, int] = battle.get("wins", {})
if wins:
fig = battle_pie_chart(wins)
st.plotly_chart(fig, use_container_width=True)
else:
st.info("No battle data available yet.")
wins_dict = {r["store"]["name"]: r["wins"] for r in stores_with_wins}
col_chart, col_stats = st.columns(2)
with col_chart:
fig = battle_pie_chart(wins_dict)
st.plotly_chart(fig, use_container_width=True)
with col_stats:
for r in battle_results:
store_name = r["store"]["name"]
wins = r.get("wins", 0)
avg = r.get("avg_price", 0)
pct = r.get("cheapest_pct", 0)
if wins > 0 or float(avg) > 0:
st.markdown(
f"**{store_name}**: {wins} wins ({pct}%) "
f"| avg \u20ac{float(avg):.2f}"
)
st.divider()
# ---- Product catalogue table ---------------------------------------------
st.subheader("Product Catalogue")
# Search bar
search_query = st.text_input(
"Search products",
placeholder="e.g. milk, bread, chicken ...",
key="overview_search",
)
# Pagination
if "overview_page" not in st.session_state:
st.session_state.overview_page = 1
PAGE_SIZE = 25
data = _fetch_products(
page=st.session_state.overview_page, limit=PAGE_SIZE, search=search_query
)
items = data.get("items", [])
total = data.get("total", 0)
total_pages = max(1, (total + PAGE_SIZE - 1) // PAGE_SIZE)
if items:
rows = []
for p in items:
cat = p.get("category")
rows.append({
"ID": p.get("id"),
"Name": p.get("name", ""),
"Brand": p.get("brand") or "\u2014",
"Category": cat.get("name", "") if cat else "\u2014",
"Unit": f"{p['unit_size']} {p['unit']}" if p.get("unit_size") and p.get("unit") else "\u2014",
"Image": p.get("image_url") or "",
})
df = pd.DataFrame(rows)
# Show image column if available
has_images = any(r["Image"] for r in rows)
if has_images:
st.dataframe(
df,
use_container_width=True,
hide_index=True,
column_config={
"Image": st.column_config.ImageColumn("Image", width="small"),
"ID": st.column_config.NumberColumn("ID", width="small"),
},
height=min(len(rows) * 40 + 50, 700),
)
else:
st.info("No battle data available yet.")
display_df = df.drop(columns=["Image"])
st.dataframe(display_df, use_container_width=True, hide_index=True)
with right_col:
st.subheader("Top 5 Biggest Price Differences")
top_diffs: list[dict[str, Any]] = stats.get("top_price_differences", [])
if top_diffs:
for i, item in enumerate(top_diffs[:5], start=1):
product_name = item.get("product_name", "Unknown")
cheapest = item.get("cheapest_price", 0)
most_expensive = item.get("most_expensive_price", 0)
diff = most_expensive - cheapest
st.markdown(
f"**{i}. {product_name}** \n"
f"\u20ac{cheapest:.2f} \u2013 \u20ac{most_expensive:.2f} "
f"(diff: **\u20ac{diff:.2f}**)"
)
else:
st.info("No price difference data available yet.")
# Pagination controls
st.caption(f"Showing {len(items)} of {total} products (page {st.session_state.overview_page}/{total_pages})")
st.divider()
# ---- Recent price changes ------------------------------------------------
st.subheader("Recent Price Changes")
recent_changes: list[dict[str, Any]] = stats.get("recent_price_changes", [])
if recent_changes:
import pandas as pd
df = pd.DataFrame(recent_changes)
display_cols = [
c
for c in ["product_name", "store_name", "old_price", "new_price", "change", "date"]
if c in df.columns
]
if display_cols:
df = df[display_cols]
# Format currency columns
for col in ("old_price", "new_price", "change"):
if col in df.columns:
df[col] = df[col].apply(lambda v: f"\u20ac{v:.2f}" if v is not None else "")
st.dataframe(df, use_container_width=True, hide_index=True)
nav_cols = st.columns([1, 1, 4])
with nav_cols[0]:
if st.button("Previous", disabled=st.session_state.overview_page <= 1):
st.session_state.overview_page -= 1
st.rerun()
with nav_cols[1]:
if st.button("Next", disabled=st.session_state.overview_page >= total_pages):
st.session_state.overview_page += 1
st.rerun()
else:
st.info("No recent price changes recorded yet.")
if search_query:
st.warning("No products found for your search.")
else:
st.info("No products in the database yet. Run a scraper first!")

View file

@ -6,14 +6,21 @@ from typing import Any
import httpx
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
from src.core.config import settings
from src.dashboard.components.charts import battle_pie_chart
from src.dashboard.components.charts import STORE_COLOURS, battle_pie_chart
from src.dashboard.components.filters import category_filter
API = settings.api_base_url
POPULAR_SEARCHES = [
"milk", "bread", "chicken", "rice", "butter", "cheese",
"eggs", "pasta", "sugar", "tea", "coffee", "water",
"beef", "salmon", "yoghurt", "cereal", "oil", "flour",
]
# ---------------------------------------------------------------------------
# Data fetching
@ -31,108 +38,171 @@ def _fetch_battle(category_id: int | None = None) -> dict[str, Any]:
return {}
@st.cache_data(ttl=60, show_spinner=False)
def _search_prices(query: str) -> list[dict[str, Any]]:
    """Fetch cross-store price matches for *query* from the API.

    Results are cached by Streamlit for 60 seconds per distinct query.

    Args:
        query: Product search term; an empty string skips the request.

    Returns:
        The list returned by ``/api/search-prices`` (at most 60 entries
        requested), or an empty list when the query is empty, the request
        fails, or the response is not a JSON list.
    """
    if not query:
        return []
    try:
        resp = httpx.get(
            f"{API}/api/search-prices",
            params={"q": query, "limit": 60},
            timeout=10,
        )
        resp.raise_for_status()
        payload = resp.json()
    except (httpx.HTTPError, ValueError):
        # ValueError covers a response body that is not valid JSON, which
        # would otherwise propagate and crash the page.
        return []
    # Callers iterate the result as a list of dicts; anything else is unusable.
    return payload if isinstance(payload, list) else []
# ---------------------------------------------------------------------------
# Page content
# ---------------------------------------------------------------------------
st.title("\u2694\ufe0f Price Battle")
st.caption("See which store offers the cheapest price for every product.")
st.title("Price Battle")
st.caption("Compare real product prices across Irish supermarkets.")
# ---- Filters -------------------------------------------------------------
with st.sidebar:
st.subheader("Filters")
selected_category = category_filter(key="battle_category")
# ---- Store Rankings (compact) --------------------------------------------
battle = _fetch_battle()
results = battle.get("results", [])
stores_with_data = [r for r in results if float(r.get("avg_price", 0)) > 0]
# ---- Fetch data ----------------------------------------------------------
battle = _fetch_battle(category_id=selected_category)
if stores_with_data:
st.subheader("Store Overview")
metric_cols = st.columns(len(stores_with_data))
for idx, r in enumerate(stores_with_data):
store_name = r["store"]["name"]
avg_price = float(r.get("avg_price", 0))
product_count = r.get("wins", 0)
metric_cols[idx].metric(
label=store_name,
value=f"\u20ac{avg_price:.2f} avg",
)
st.divider()
if not battle:
st.error(
"Unable to load battle data. Please make sure the API is running "
f"at **{API}**."
)
st.stop()
# ---- Product Price Comparison --------------------------------------------
st.subheader("Compare Products")
# ---- Summary statistics --------------------------------------------------
products: list[dict[str, Any]] = battle.get("products", [])
wins: dict[str, int] = battle.get("wins", {})
store_names: list[str] = battle.get("stores", [])
# Popular search buttons
st.caption("Popular searches:")
button_cols = st.columns(9)
for idx, term in enumerate(POPULAR_SEARCHES[:9]):
with button_cols[idx]:
if st.button(term.capitalize(), key=f"pop_{term}", use_container_width=True):
st.session_state.battle_search_input = term
st.rerun()
if not products:
st.info("No products found for the selected category.")
st.stop()
# Second row of popular searches
button_cols2 = st.columns(9)
for idx, term in enumerate(POPULAR_SEARCHES[9:18]):
with button_cols2[idx]:
if st.button(term.capitalize(), key=f"pop_{term}", use_container_width=True):
st.session_state.battle_search_input = term
st.rerun()
st.subheader("Summary")
summary_cols = st.columns(len(wins) if wins else 1)
for idx, (store, count) in enumerate(sorted(wins.items(), key=lambda x: -x[1])):
summary_cols[idx % len(summary_cols)].metric(
label=store,
value=f"{count} wins",
)
# Search input
actual_query = st.text_input(
"Search for a product to compare prices",
placeholder="e.g. milk, bread, chicken ...",
key="battle_search_input",
)
st.divider()
if actual_query:
results_data = _search_prices(actual_query)
# ---- Pie chart + table side by side --------------------------------------
chart_col, table_col = st.columns([1, 2])
if not results_data:
st.warning(f"No products found for '{actual_query}'.")
else:
# Build comparison table
rows = []
for item in results_data:
price = item["price"]
promo = item.get("promo_price")
effective = item["effective_price"]
with chart_col:
if wins:
fig = battle_pie_chart(wins)
row = {
"Store": item["store"],
"Product": item["product_name"],
"Price": price,
"Effective": effective,
"Promo": item.get("promo_label") or "",
}
rows.append(row)
df = pd.DataFrame(rows)
# Sort by effective price
df = df.sort_values("Effective")
# Show count per store
store_counts = df["Store"].value_counts()
st.caption(
f"Found {len(df)} products matching '{actual_query}': "
+ ", ".join(f"{store} ({count})" for store, count in store_counts.items())
)
# Format for display
display_df = df.copy()
display_df["Price"] = display_df["Price"].apply(lambda p: f"\u20ac{p:.2f}")
display_df["Effective"] = display_df["Effective"].apply(lambda p: f"\u20ac{p:.2f}")
# Color-code by store
def _style_store(row: pd.Series) -> list[str]:
store = row.get("Store", "")
color = STORE_COLOURS.get(store, "")
# Match partial store names
for key, val in STORE_COLOURS.items():
if key.lower() in store.lower():
color = val
break
if color:
return [f"border-left: 4px solid {color}"] + [""] * (len(row) - 1)
return [""] * len(row)
styled = display_df.style.apply(_style_store, axis=1)
st.dataframe(
styled,
use_container_width=True,
hide_index=True,
height=min(len(display_df) * 38 + 50, 600),
)
# Average price chart per store for this search
st.subheader(f"Average price for '{actual_query}' by store")
avg_by_store = df.groupby("Store")["Effective"].mean().sort_values()
colors = []
for store in avg_by_store.index:
color = "#888888"
for key, val in STORE_COLOURS.items():
if key.lower() in store.lower():
color = val
break
colors.append(color)
fig = go.Figure(
go.Bar(
x=avg_by_store.index,
y=avg_by_store.values,
marker_color=colors,
text=[f"\u20ac{v:.2f}" for v in avg_by_store.values],
textposition="outside",
)
)
fig.update_layout(
yaxis_title="Average Price (\u20ac)",
yaxis_tickprefix="\u20ac",
margin=dict(l=40, r=20, t=20, b=40),
template="plotly_white",
height=350,
)
st.plotly_chart(fig, use_container_width=True)
with table_col:
st.subheader("Product Comparison Table")
# Build a DataFrame: Product | Store1 | Store2 | ... | Cheapest
rows: list[dict[str, Any]] = []
for prod in products:
row: dict[str, Any] = {"Product": prod.get("product_name", "Unknown")}
prices: dict[str, float | None] = prod.get("prices", {})
valid_prices: dict[str, float] = {}
for store in store_names:
price = prices.get(store)
row[store] = f"\u20ac{price:.2f}" if price is not None else "\u2014"
if price is not None:
valid_prices[store] = price
if valid_prices:
cheapest_store = min(valid_prices, key=valid_prices.get) # type: ignore[arg-type]
row["Cheapest"] = cheapest_store
else:
row["Cheapest"] = "\u2014"
rows.append(row)
df = pd.DataFrame(rows)
# ---------------------------------------------------------------------------
# Highlight the cheapest price cell per row in green
# ---------------------------------------------------------------------------
def _highlight_cheapest(row: pd.Series) -> list[str]:
"""Return a list of CSS styles, highlighting the cheapest store cell."""
styles = [""] * len(row)
cheapest = row.get("Cheapest", "\u2014")
if cheapest == "\u2014":
return styles
for i, col in enumerate(row.index):
if col == cheapest:
styles[i] = "background-color: #d4edda; font-weight: bold;"
return styles
styled = df.style.apply(_highlight_cheapest, axis=1)
st.dataframe(styled, use_container_width=True, hide_index=True, height=500)
st.divider()
# ---- Detailed stats -------------------------------------------------------
st.subheader("Detailed Statistics")
if wins:
total_products = len(products)
stats_rows = []
for store, count in sorted(wins.items(), key=lambda x: -x[1]):
pct = (count / total_products * 100) if total_products else 0
stats_rows.append(
{"Store": store, "Wins": count, "Win %": f"{pct:.1f}%"}
)
st.dataframe(
pd.DataFrame(stats_rows),
use_container_width=True,
hide_index=True,
)
# Cheapest finds
st.subheader("Best Deals")
cheapest = df.nsmallest(5, "Effective")
for _, row in cheapest.iterrows():
promo_text = f" ({row['Promo']})" if row["Promo"] else ""
st.markdown(
f"**\u20ac{row['Effective']:.2f}** - {row['Product']} @ {row['Store']}{promo_text}"
)
else:
st.info("Search for a product above or click a popular category to compare prices across stores.")

View file

@ -7,10 +7,11 @@ from typing import Any
import httpx
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
from src.core.config import settings
from src.dashboard.components.charts import price_history_chart, store_comparison_bar
from src.dashboard.components.charts import STORE_COLOURS, price_history_chart, store_comparison_bar
from src.dashboard.components.filters import date_range_filter, search_filter
API = settings.api_base_url
@ -31,7 +32,6 @@ def _search_products(query: str) -> list[dict[str, Any]]:
)
resp.raise_for_status()
payload = resp.json()
# Support both a bare list and a paginated wrapper ({items: [...]})
if isinstance(payload, list):
return payload
return payload.get("items", payload.get("results", []))
@ -40,7 +40,7 @@ def _search_products(query: str) -> list[dict[str, Any]]:
@st.cache_data(ttl=60, show_spinner=False)
def _fetch_price_history(product_id: int, days: int = 30) -> list[dict[str, Any]]:
def _fetch_price_history(product_id: int, days: int = 90) -> list[dict[str, Any]]:
try:
resp = httpx.get(
f"{API}/api/products/{product_id}/prices",
@ -54,11 +54,27 @@ def _fetch_price_history(product_id: int, days: int = 30) -> list[dict[str, Any]
@st.cache_data(ttl=60, show_spinner=False)
def _fetch_comparison(product_id: int) -> list[dict[str, Any]]:
def _fetch_comparison(product_id: int) -> dict[str, Any]:
try:
resp = httpx.get(f"{API}/api/products/{product_id}/compare", timeout=10)
resp.raise_for_status()
return resp.json()
except httpx.HTTPError:
return {}
@st.cache_data(ttl=60, show_spinner=False)
def _search_prices(query: str) -> list[dict[str, Any]]:
    """Fetch cross-store price matches for *query* from the API.

    Results are cached by Streamlit for 60 seconds per distinct query.

    Args:
        query: Product search term; an empty string skips the request.

    Returns:
        The list returned by ``/api/search-prices`` (at most 100 entries
        requested), or an empty list when the query is empty, the request
        fails, or the response is not a JSON list.
    """
    if not query:
        return []
    try:
        resp = httpx.get(
            f"{API}/api/search-prices",
            params={"q": query, "limit": 100},
            timeout=10,
        )
        resp.raise_for_status()
        payload = resp.json()
    except (httpx.HTTPError, ValueError):
        # ValueError covers a response body that is not valid JSON, which
        # would otherwise propagate and crash the page.
        return []
    # Callers expect a list of dicts; anything else is treated as no results.
    return payload if isinstance(payload, list) else []
@ -66,13 +82,13 @@ def _fetch_comparison(product_id: int) -> list[dict[str, Any]]:
# ---------------------------------------------------------------------------
# Page content
# ---------------------------------------------------------------------------
st.title("\U0001f4c8 Product History")
st.title("Product History")
st.caption("Search for a product and explore its price history across stores.")
# ---- Sidebar filters ------------------------------------------------------
with st.sidebar:
st.subheader("Filters")
start_date, end_date = date_range_filter(key="history_date")
start_date, end_date = date_range_filter(key="history_date", default_days=90)
# ---- Search & select product ---------------------------------------------
query = search_filter(key="product_history_search")
@ -102,26 +118,33 @@ product_id: int = product_options[selected_name]
# ---- Calculate days from date range --------------------------------------
days = (end_date - start_date).days
if days < 1:
days = 30
days = 90
# ---- Price history chart --------------------------------------------------
# ---- Price history time series chart -------------------------------------
st.subheader("Price History")
history = _fetch_price_history(product_id, days=days)
if history:
# Filter data to requested date range
filtered: list[dict[str, Any]] = []
# The API returns list of {store: {...}, prices: [{price, promo_price, scraped_at, ...}]}
chart_data: list[dict[str, Any]] = []
for entry in history:
entry_date = entry.get("date", "")
try:
d = datetime.date.fromisoformat(entry_date[:10])
except (ValueError, TypeError):
filtered.append(entry)
continue
if start_date <= d <= end_date:
filtered.append(entry)
store_info = entry.get("store", {})
store_name = store_info.get("name", "Unknown")
prices = entry.get("prices", [])
for pr in prices:
scraped_at = pr.get("scraped_at", "")
price = float(pr.get("price", 0))
promo = pr.get("promo_price")
effective = float(promo) if promo else price
chart_data.append({
"date": scraped_at,
"price": effective,
"store_name": store_name,
"is_promo": pr.get("promo_label") is not None,
})
if filtered:
fig = price_history_chart(filtered)
if chart_data:
fig = price_history_chart(chart_data)
st.plotly_chart(fig, use_container_width=True)
else:
st.info("No price data in the selected date range.")
@ -130,42 +153,71 @@ else:
st.divider()
# ---- Current prices table -------------------------------------------------
st.subheader("Current Prices")
# ---- Current prices across stores ----------------------------------------
st.subheader("Current Prices Across Stores")
comparison = _fetch_comparison(product_id)
if comparison:
comp_df = pd.DataFrame(comparison)
display_cols = [
c
for c in ["store_name", "price", "is_promo", "last_updated"]
if c in comp_df.columns
]
if display_cols:
comp_df = comp_df[display_cols]
stores_list = comparison.get("stores", [])
if stores_list:
rows = []
bar_data = []
for sp in stores_list:
store_info = sp.get("store", {})
store_name = store_info.get("name", "Unknown")
price = sp.get("latest_price")
promo_price = sp.get("promo_price")
promo_label = sp.get("promo_label")
# Format
if "price" in comp_df.columns:
comp_df["price"] = comp_df["price"].apply(
lambda v: f"\u20ac{v:.2f}" if v is not None else "\u2014"
)
if "is_promo" in comp_df.columns:
comp_df["is_promo"] = comp_df["is_promo"].apply(
lambda v: "Yes" if v else "No"
)
effective_price = promo_price if promo_price is not None else price
comp_df.columns = [c.replace("_", " ").title() for c in comp_df.columns]
st.dataframe(comp_df, use_container_width=True, hide_index=True)
row = {
"Store": store_name,
"Price": f"\u20ac{float(price):.2f}" if price is not None else "\u2014",
"Promo": promo_label or "\u2014",
}
if promo_price is not None:
row["Promo Price"] = f"\u20ac{float(promo_price):.2f}"
rows.append(row)
# Also show a bar comparison chart
raw_comparison = _fetch_comparison(product_id)
bar_data = [
{"store_name": r["store_name"], "price": r["price"]}
for r in raw_comparison
if r.get("price") is not None
]
if bar_data:
fig2 = store_comparison_bar(bar_data)
st.plotly_chart(fig2, use_container_width=True)
if effective_price is not None:
bar_data.append({
"store_name": store_name,
"price": float(effective_price),
})
df = pd.DataFrame(rows)
st.dataframe(df, use_container_width=True, hide_index=True)
if bar_data:
fig2 = store_comparison_bar(bar_data)
st.plotly_chart(fig2, use_container_width=True)
else:
st.info("This product is not available in any store currently.")
else:
st.info("No comparison data available for this product.")
st.divider()
# ---- Similar products across stores (using search) -----------------------
st.subheader("Similar Products Across Stores")
st.caption(f"Other products matching '{query}' across all stores.")

similar = _search_prices(query) if query else []

if similar:
    # Order by the *numeric* effective price before formatting. Sorting the
    # formatted strings is lexicographic and would place "€10.00" ahead of
    # "€2.00".
    ordered = sorted(similar, key=lambda item: float(item["effective_price"]))
    sim_rows = [
        {
            "Store": item["store"],
            "Product": item["product_name"],
            # float() mirrors the other sections of this page, which also
            # coerce API prices before formatting.
            "Price": f"\u20ac{float(item['price']):.2f}",
            "Effective": f"\u20ac{float(item['effective_price']):.2f}",
            "Promo": item.get("promo_label") or "",
        }
        for item in ordered
    ]
    sim_df = pd.DataFrame(sim_rows)
    st.dataframe(
        sim_df,
        use_container_width=True,
        hide_index=True,
        height=min(len(sim_df) * 38 + 50, 400),
    )
elif query:
    st.info("No similar products found across stores.")

View file

@ -10,11 +10,12 @@ from __future__ import annotations
import asyncio
import logging
import re
import sys
from decimal import Decimal, InvalidOperation
import httpx
from bs4 import BeautifulSoup
from playwright.async_api import Page
from playwright.async_api import Page, Response
from src.scrapers.base import (
BaseScraper,
@ -46,7 +47,7 @@ CATEGORY_PATHS = [
]
# Special offers page (rendered with JS, needs Playwright)
SPECIAL_OFFERS_URL = f"{BASE_URL}/special-offers"
SPECIAL_OFFERS_URL = f"{BASE_URL}/specials"
class AldiScraper(BaseScraper):
@ -69,7 +70,7 @@ class AldiScraper(BaseScraper):
# ------------------------------------------------------------------
async def scrape_category(self, category_url: str) -> list[RawProduct]:
# Special offers page needs Playwright
if "special-offers" in category_url:
if "/specials" in category_url:
return await self._scrape_special_offers(category_url)
# Standard category pages — try httpx first
@ -254,6 +255,128 @@ class AldiScraper(BaseScraper):
return products
# ------------------------------------------------------------------
# SAP Commerce OCC API interception
# ------------------------------------------------------------------
async def _intercept_api(self, page: Page, url: str) -> list[dict]:
    """Navigate to *url* and collect products from intercepted JSON responses.

    A ``response`` listener is attached to *page* before navigation. Every
    response whose URL contains ``/occ/`` or ``/rest/`` and whose content
    type is JSON is decoded; when the decoded object carries a non-empty
    ``products`` list, those entries are appended to the result.

    Args:
        page: Playwright page to attach the listener to and navigate.
        url: Address to load (waits for ``networkidle``, 60 s timeout).

    Returns:
        The accumulated raw product dicts (possibly empty).
    """
    captured: list[dict] = []

    async def handle_response(response: Response) -> None:
        target = response.url
        if "/occ/" not in target and "/rest/" not in target:
            return
        try:
            if "application/json" not in response.headers.get("content-type", ""):
                return
            body = await response.json()
            if not isinstance(body, dict):
                return
            found = body.get("products", [])
            if isinstance(found, list) and found:
                captured.extend(found)
        except Exception:
            # Best-effort capture: a response that cannot be read or decoded
            # is simply ignored.
            pass

    page.on("response", handle_response)
    await page.goto(url, wait_until="networkidle", timeout=60_000)
    return captured
def _parse_occ_product(self, item: dict) -> RawProduct | None:
"""Parse a product from SAP Commerce OCC API response."""
code = item.get("code", "")
name = item.get("name", "")
if not code or not name:
return None
price_data = item.get("price", {})
price_val = price_data.get("value")
if price_val is None:
return None
try:
price = Decimal(str(price_val))
except (InvalidOperation, TypeError, ValueError):
return None
if price == 0:
return None
# Promo / was-price
promo_price = None
promo_label = None
was_price_data = item.get("wasPrice", {})
if was_price_data and was_price_data.get("value") is not None:
try:
promo_price = price # current price is the promo
price = Decimal(str(was_price_data["value"]))
promo_label = item.get("promotionText") or "Special Offer"
except (InvalidOperation, TypeError, ValueError):
promo_price = None
promo_label = None
# Unit price
unit_price = None
unit = None
unit_price_data = item.get("basePrice") or item.get("unitPrice")
if isinstance(unit_price_data, dict):
try:
unit_price = Decimal(str(unit_price_data.get("value", "")))
except (InvalidOperation, TypeError, ValueError):
pass
unit = unit_price_data.get("unit", unit_price_data.get("currencyIso"))
# Unit size from name
unit_size = None
size_match = re.search(
r"(\d+(?:\.\d+)?)\s*(ml|l|g|kg|cl|pk|pack)\b", name, re.IGNORECASE
)
if size_match:
try:
unit_size = Decimal(size_match.group(1))
unit = unit or size_match.group(2).lower()
except (InvalidOperation, ValueError):
pass
# Image
image_url = None
images = item.get("images", [])
if isinstance(images, list) and images:
for img in images:
if isinstance(img, dict) and img.get("url"):
image_url = img["url"]
if image_url.startswith("//"):
image_url = f"https:{image_url}"
elif image_url.startswith("/"):
image_url = f"{BASE_URL}{image_url}"
break
# Product URL
product_url = item.get("url", "")
if product_url and not product_url.startswith("http"):
product_url = f"{BASE_URL}{product_url}"
# Brand
brand = None
brand_data = item.get("brand")
if isinstance(brand_data, dict):
brand = brand_data.get("name")
elif isinstance(brand_data, str):
brand = brand_data
return RawProduct(
store_sku=str(code),
name=name.strip(),
price=price,
promo_price=promo_price,
promo_label=promo_label,
unit_price=unit_price,
unit=unit,
unit_size=unit_size,
brand=brand,
image_url=image_url or None,
product_url=product_url or None,
)
# ------------------------------------------------------------------
# Playwright-based scraping (fallback for standard pages)
# ------------------------------------------------------------------
@ -265,15 +388,31 @@ class AldiScraper(BaseScraper):
try:
page = await context.new_page()
logger.info("[aldi] Playwright loading %s", category_url)
await page.goto(category_url, wait_until="domcontentloaded", timeout=60_000)
# Try to intercept OCC API responses while loading the page
api_products = await self._intercept_api(page, category_url)
await asyncio.sleep(3)
await self._dismiss_overlays(page)
await self._scroll_page(page)
html = await page.content()
soup = BeautifulSoup(html, "html.parser")
products = self._parse_html(soup, category_url)
# Parse products from intercepted API data first
if api_products:
logger.info("[aldi] Intercepted %d OCC API products", len(api_products))
for item in api_products:
try:
product = self._parse_occ_product(item)
if product:
products.append(product)
except Exception:
logger.debug("[aldi] Failed to parse OCC product", exc_info=True)
# Fall back to DOM scraping if API interception yielded nothing
if not products:
logger.info("[aldi] Falling back to DOM scraping for %s", category_url)
html = await page.content()
soup = BeautifulSoup(html, "html.parser")
products = self._parse_html(soup, category_url)
finally:
await context.close()
@ -286,69 +425,87 @@ class AldiScraper(BaseScraper):
# Special offers scraping (always Playwright)
# ------------------------------------------------------------------
async def _scrape_special_offers(self, url: str) -> list[RawProduct]:
"""Scrape the Aldi special-offers page (JS-rendered)."""
"""Scrape the Aldi specials page (JS-rendered)."""
products: list[RawProduct] = []
pw, browser, context = await self._get_browser_context(headless=True)
try:
page = await context.new_page()
logger.info("[aldi] Loading special offers %s", url)
await page.goto(url, wait_until="domcontentloaded", timeout=60_000)
# Try to intercept OCC API responses while loading the page
api_products = await self._intercept_api(page, url)
await asyncio.sleep(3)
await self._dismiss_overlays(page)
await self._scroll_page(page, scrolls=8)
# Special offer tiles
tiles = page.locator(
"div[class*='SpecialBuy'], "
"div[class*='product-tile'], "
"div[data-qa='special-buy-tile'], "
"article[class*='product']"
)
count = await tiles.count()
logger.info("[aldi] Found %d special offer tiles", count)
# Parse products from intercepted API data first
if api_products:
logger.info("[aldi] Intercepted %d OCC API special offer products", len(api_products))
for item in api_products:
try:
product = self._parse_occ_product(item)
if product:
# Override promo label for special offers
product.promo_label = product.promo_label or "Special Offer"
products.append(product)
except Exception:
logger.debug("[aldi] Failed to parse OCC special offer product", exc_info=True)
for i in range(count):
try:
tile = tiles.nth(i)
# Fall back to DOM scraping if API interception yielded nothing
if not products:
logger.info("[aldi] Falling back to DOM scraping for specials")
# Special offer tiles
tiles = page.locator(
"div[class*='SpecialBuy'], "
"div[class*='product-tile'], "
"div[data-qa='special-buy-tile'], "
"article[class*='product']"
)
count = await tiles.count()
logger.info("[aldi] Found %d special offer tiles", count)
name_el = tile.locator("h4, h3, a[class*='Title'], p[class*='title']")
name = ""
if await name_el.count() > 0:
name = (await name_el.first.inner_text()).strip()
if not name:
continue
for i in range(count):
try:
tile = tiles.nth(i)
price_el = tile.locator("span[class*='price'], span[class*='Price']")
price_text = ""
if await price_el.count() > 0:
price_text = await price_el.first.inner_text()
price = self._parse_price(price_text)
if price is None or price == 0:
continue
name_el = tile.locator("h4, h3, a[class*='Title'], p[class*='title']")
name = ""
if await name_el.count() > 0:
name = (await name_el.first.inner_text()).strip()
if not name:
continue
sku = f"aldi-offer-{hash(name) % 1000000}"
price_el = tile.locator("span[class*='price'], span[class*='Price']")
price_text = ""
if await price_el.count() > 0:
price_text = await price_el.first.inner_text()
price = self._parse_price(price_text)
if price is None or price == 0:
continue
# Image
image_url = None
img_el = tile.locator("img")
if await img_el.count() > 0:
image_url = await img_el.first.get_attribute("src")
if image_url and not image_url.startswith("http"):
image_url = f"{BASE_URL}{image_url}"
sku = f"aldi-offer-{hash(name) % 1000000}"
products.append(
RawProduct(
store_sku=sku,
name=name,
price=price,
promo_label="Special Offer",
image_url=image_url,
# Image
image_url = None
img_el = tile.locator("img")
if await img_el.count() > 0:
image_url = await img_el.first.get_attribute("src")
if image_url and not image_url.startswith("http"):
image_url = f"{BASE_URL}{image_url}"
products.append(
RawProduct(
store_sku=sku,
name=name,
price=price,
promo_label="Special Offer",
image_url=image_url,
)
)
)
except Exception:
logger.debug("[aldi] Failed to parse special offer tile %d", i, exc_info=True)
except Exception:
logger.debug("[aldi] Failed to parse special offer tile %d", i, exc_info=True)
finally:
await context.close()
@ -404,14 +561,37 @@ async def main() -> None:
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(name)s - %(message)s",
)
scraper = AldiScraper()
result = await scraper.run()
print(f"\nDone: {result.status}")
print(f"Products scraped: {len(result.products)}")
if result.errors:
print(f"Errors ({len(result.errors)}):")
for err in result.errors:
print(f" - {err}")
dry_run = "--dry-run" in sys.argv
if dry_run:
# Dry-run mode: scrape categories and print products without hitting the DB
scraper = AldiScraper()
category_urls = await scraper.get_category_urls()
all_products: list[RawProduct] = []
for url in category_urls:
try:
products = await scraper.scrape_category(url)
all_products.extend(products)
print(f"[dry-run] {url} -> {len(products)} products")
except Exception as exc:
print(f"[dry-run] {url} -> ERROR: {exc}")
await random_delay(1.0, 3.0)
print(f"\n[dry-run] Total products scraped: {len(all_products)}")
for p in all_products[:20]:
print(f" {p.store_sku:>12s} {str(p.price):>8s} {p.name}")
if len(all_products) > 20:
print(f" ... and {len(all_products) - 20} more")
else:
scraper = AldiScraper()
result = await scraper.run()
print(f"\nDone: {result.status}")
print(f"Products scraped: {len(result.products)}")
if result.errors:
print(f"Errors ({len(result.errors)}):")
for err in result.errors:
print(f" - {err}")
if __name__ == "__main__":

View file

@ -12,6 +12,7 @@ from datetime import datetime
from decimal import Decimal
from playwright.async_api import async_playwright, BrowserContext
from playwright_stealth import Stealth
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
@ -326,10 +327,16 @@ class BaseScraper(ABC):
@staticmethod
async def _get_browser_context(
headless: bool = True,
block_resources: bool = True,
**extra_context_kwargs,
) -> tuple:
"""Create and return ``(playwright, browser, context)``.
Args:
headless: Run in headless mode.
block_resources: Block images/fonts to speed up scraping.
Disable for sites with strict WAF (e.g. Tesco/Akamai).
Caller is responsible for closing them via::
await context.close()
@ -337,7 +344,15 @@ class BaseScraper(ABC):
await pw.stop()
"""
pw = await async_playwright().start()
browser = await pw.chromium.launch(headless=headless)
# Apply stealth patches to bypass bot detection (Akamai, etc.)
stealth = Stealth(navigator_platform_override="MacIntel")
stealth.hook_playwright_context(pw)
browser = await pw.chromium.launch(
headless=headless,
args=["--disable-blink-features=AutomationControlled"],
)
context = await browser.new_context(
user_agent=random_user_agent(),
viewport={"width": 1366, "height": 768},
@ -345,9 +360,9 @@ class BaseScraper(ABC):
timezone_id="Europe/Dublin",
**extra_context_kwargs,
)
# Block unnecessary resources to speed up scraping
await context.route(
"**/*.{png,jpg,jpeg,gif,svg,woff,woff2,ttf,eot}",
lambda route: route.abort(),
)
if block_resources:
await context.route(
"**/*.{png,jpg,jpeg,gif,svg,woff,woff2,ttf,eot}",
lambda route: route.abort(),
)
return pw, browser, context

View file

@ -1,8 +1,11 @@
"""Scraper for Dunnes Stores (dunnesstores.com).
"""Scraper for Dunnes Stores Grocery (dunnesstoresgrocery.com).
Dunnes has a JavaScript-heavy storefront with anti-bot protections.
We use Playwright exclusively, with user-agent rotation, random delays,
and careful DOM extraction.
IMPORTANT: The grocery site is at www.dunnesstoresgrocery.com (NOT dunnesstores.com).
Category URLs use the format /categories/{slug}-id-{numeric_id}.
"""
from __future__ import annotations
@ -22,23 +25,15 @@ from src.scrapers.base import (
logger = logging.getLogger(__name__)
BASE_URL = "https://www.dunnesstores.com"
BASE_URL = "https://www.dunnesstoresgrocery.com"
# Top-level food / grocery categories on Dunnes Stores
# Confirmed category paths on dunnesstoresgrocery.com
# Format: /categories/{slug}-id-{id}
# We keep a small seed list of confirmed categories; the rest are
# discovered dynamically from the site navigation.
CATEGORY_PATHS = [
"/c/food/fruit-and-vegetables",
"/c/food/dairy",
"/c/food/meat-poultry-and-fish",
"/c/food/bakery",
"/c/food/frozen",
"/c/food/drinks",
"/c/food/snacks-and-confectionery",
"/c/food/cupboard-essentials",
"/c/food/baby-and-toddler",
"/c/food/household",
"/c/food/health-and-beauty",
"/c/food/deli-and-prepared-food",
"/c/food/world-foods",
"/categories/fresh-meat-poultry-id-47181",
"/categories/bakery-id-47171",
]
@ -49,8 +44,89 @@ class DunnesScraper(BaseScraper):
# Category URLs
# ------------------------------------------------------------------
async def get_category_urls(self) -> list[str]:
    """Resolve the category pages to scrape.

    Categories discovered from the live site navigation take priority; the
    static ``CATEGORY_PATHS`` seed list is used only when discovery returns
    nothing.
    """
    found = await self._discover_categories()
    if not found:
        logger.warning("[dunnes] Category discovery found nothing; using static seed list")
        return [BASE_URL + path for path in CATEGORY_PATHS]

    logger.info("[dunnes] Discovered %d category URLs from navigation", len(found))
    return found
async def _discover_categories(self) -> list[str]:
    """Discover top-level category URLs from the site navigation.

    Loads the home page in a headless browser and collects every
    ``/categories/{slug}-id-{id}`` link. When fewer than five are found, up
    to ten visible navigation triggers are clicked and links are collected
    again.

    Returns:
        De-duplicated category URLs in sorted order, or an empty list when
        discovery fails for any reason.
    """
    # Single copy of the in-page filter, shared by both collection passes.
    # Keeps only top-level categories (/categories/{slug}-id-{id}) and skips
    # deeper paths (/categories/{parent}/{child}-id-{id}).
    collect_links_js = '''() => {
        return [...document.querySelectorAll('a[href*="/categories/"]')]
            .map(a => a.href)
            .filter(href => {
                try {
                    const path = new URL(href).pathname;
                    const parts = path.split('/').filter(Boolean);
                    return parts.length === 2
                        && parts[0] === 'categories'
                        && parts[1].includes('-id-');
                } catch(e) { return false; }
            });
    }'''

    pw, browser, context = await self._get_browser_context(headless=True)
    try:
        page = await context.new_page()
        logger.info("[dunnes] Discovering categories from %s", BASE_URL)
        await page.goto(BASE_URL, wait_until="domcontentloaded", timeout=60_000)
        await asyncio.sleep(3)
        await self._dismiss_overlays(page)

        unique: set[str] = set(await page.evaluate(collect_links_js))

        # If homepage didn't yield enough, also try interacting with nav menus
        if len(unique) < 5:
            logger.debug("[dunnes] Few links found, attempting to expand nav menus")
            nav_triggers = page.locator(
                "button[class*='nav'], "
                "a[class*='nav'], "
                "button[aria-expanded='false'], "
                "li[class*='menu'] > a"
            )
            trigger_count = await nav_triggers.count()
            for idx in range(min(trigger_count, 10)):
                try:
                    trigger = nav_triggers.nth(idx)
                    if await trigger.is_visible():
                        await trigger.click()
                        await asyncio.sleep(0.5)
                except Exception:
                    # Best-effort: a trigger that cannot be clicked is skipped.
                    pass

            unique.update(await page.evaluate(collect_links_js))

        # Sorted so the crawl order is reproducible between runs.
        return sorted(unique)
    except Exception:
        logger.warning("[dunnes] Category discovery failed", exc_info=True)
        return []
    finally:
        await context.close()
        await browser.close()
        await pw.stop()
# ------------------------------------------------------------------
# Scrape one category page (with pagination)
# ------------------------------------------------------------------
@ -110,19 +186,47 @@ class DunnesScraper(BaseScraper):
# DOM extraction
# ------------------------------------------------------------------
async def _extract_products(self, page: Page, category_url: str) -> list[RawProduct]:
"""Extract product data from the currently loaded DOM."""
"""Extract product data from the currently loaded DOM.
Uses a two-pass approach:
1. Try extracting structured data from the page's JS state (dataLayer,
__NEXT_DATA__, or similar embedded JSON).
2. Fall back to broad CSS-selector scraping of product tiles.
"""
# --- Pass 1: try to pull data from JS state ---
js_products = await self._extract_from_js_state(page, category_url)
if js_products:
logger.info("[dunnes] Extracted %d products from JS state", len(js_products))
return js_products
# --- Pass 2: DOM selector scraping ---
products: list[RawProduct] = []
# Dunnes uses product cards / tiles in their listing pages
# dunnesstoresgrocery.com may use different class names;
# cast a wide net with multiple selector patterns
tiles = page.locator(
"div[data-ref='productListItem'], "
"div[class*='ProductCard'], "
"li[class*='ProductCard'], "
"article[class*='product-card'], "
"div[class*='product-list-item']"
"div[class*='product-list-item'], "
"div[class*='product-tile'], "
"div[class*='productTile'], "
"a[class*='product-card'], "
"div[data-product-id]"
)
count = await tiles.count()
if count == 0:
# Broader fallback: look for any repeated card-like structure
logger.debug("[dunnes] Primary selectors found 0 tiles; trying broader selectors")
tiles = page.locator(
"[class*='product'] a[href*='/'], "
"[class*='card'][class*='product'], "
"[class*='item'][data-product-id]"
)
count = await tiles.count()
for i in range(count):
try:
tile = tiles.nth(i)
@ -133,8 +237,10 @@ class DunnesScraper(BaseScraper):
"a[class*='product-card__title'], "
"a[data-ref='productCardTitle'], "
"p[class*='ProductCard__title'], "
"h3 a, "
"a[class*='Title']"
"h3 a, h2 a, h3, h2, "
"a[class*='Title'], "
"span[class*='title'], "
"p[class*='title']"
)
name = ""
href = ""
@ -156,9 +262,14 @@ class DunnesScraper(BaseScraper):
sku = ""
data_id = await tile.get_attribute("data-product-id") or ""
data_sku = await tile.get_attribute("data-sku") or ""
sku = data_id or data_sku
data_ref = await tile.get_attribute("data-ref") or ""
sku = data_id or data_sku or data_ref
if not sku and href:
sku_match = re.search(r"/p/(\d+)", href) or re.search(r"/(\d+)(?:\?|$)", href)
sku_match = (
re.search(r"/p/(\d+)", href)
or re.search(r"-id-(\d+)", href)
or re.search(r"/(\d+)(?:\?|$)", href)
)
sku = sku_match.group(1) if sku_match else ""
if not sku:
sku = f"dunnes-{hash(name) % 1000000}"
@ -169,7 +280,9 @@ class DunnesScraper(BaseScraper):
"span[class*='ProductCard__price'], "
"span[data-ref='productCardPrice'], "
"span[class*='price-value'], "
"span.price"
"span[class*='price'], "
"span.price, "
"div[class*='price']"
)
price_text = ""
if await price_el.count() > 0:
@ -187,7 +300,9 @@ class DunnesScraper(BaseScraper):
"span[class*='price-was'], "
"span[class*='offer'], "
"div[class*='PromoBadge'], "
"span[data-ref='productCardPromo']"
"span[data-ref='productCardPromo'], "
"del, s, "
"span[class*='was']"
)
if await promo_el.count() > 0:
promo_label = (await promo_el.first.inner_text()).strip() or None
@ -210,6 +325,8 @@ class DunnesScraper(BaseScraper):
)
if image_url and image_url.startswith("//"):
image_url = f"https:{image_url}"
elif image_url and image_url.startswith("/"):
image_url = f"{BASE_URL}{image_url}"
# --- Unit price ---
unit_price = None
@ -217,7 +334,8 @@ class DunnesScraper(BaseScraper):
unit_el = tile.locator(
"span[class*='UnitPrice'], "
"span[class*='unit-price'], "
"span[data-ref='productCardUnitPrice']"
"span[data-ref='productCardUnitPrice'], "
"span[class*='per-unit']"
)
if await unit_el.count() > 0:
unit_text = await unit_el.first.inner_text()
@ -259,6 +377,121 @@ class DunnesScraper(BaseScraper):
return products
async def _extract_from_js_state(
    self, page: Page, category_url: str
) -> list[RawProduct]:
    """Try to extract product data from embedded JS state on the page.

    Three sources are probed in the browser, in order: ``__NEXT_DATA__``
    page props, ``dataLayer`` e-commerce impressions, and JSON-LD
    ``ItemList`` blocks. The first one that yields items is used.

    Args:
        page: Playwright page to evaluate the probe script in.
        category_url: URL of the category being scraped (not used here).

    Returns:
        Parsed products; an empty list when no embedded data is found or
        extraction fails.
    """
    # Local import: used only for the stable fallback SKU below.
    import zlib

    try:
        js_data = await page.evaluate('''() => {
            // Attempt 1: __NEXT_DATA__ (Next.js)
            if (window.__NEXT_DATA__) {
                try {
                    const props = window.__NEXT_DATA__.props;
                    if (props && props.pageProps && props.pageProps.products) {
                        return { source: 'next', items: props.pageProps.products };
                    }
                    if (props && props.pageProps && props.pageProps.category
                        && props.pageProps.category.products) {
                        return { source: 'next', items: props.pageProps.category.products };
                    }
                    // Recurse one level into pageProps looking for product arrays
                    if (props && props.pageProps) {
                        for (const [key, val] of Object.entries(props.pageProps)) {
                            if (Array.isArray(val) && val.length > 0 && val[0].name) {
                                return { source: 'next', items: val };
                            }
                        }
                    }
                } catch (e) {}
            }
            // Attempt 2: dataLayer product impressions
            if (window.dataLayer) {
                for (const entry of window.dataLayer) {
                    if (entry.ecommerce && entry.ecommerce.impressions) {
                        return { source: 'dl', items: entry.ecommerce.impressions };
                    }
                }
            }
            // Attempt 3: look for JSON-LD structured data
            const scripts = document.querySelectorAll('script[type="application/ld+json"]');
            for (const s of scripts) {
                try {
                    const d = JSON.parse(s.textContent);
                    if (d['@type'] === 'ItemList' && d.itemListElement) {
                        return { source: 'ld', items: d.itemListElement };
                    }
                } catch (e) {}
            }
            return null;
        }''')

        if not js_data or not js_data.get("items"):
            return []

        products: list[RawProduct] = []
        source = js_data.get("source", "unknown")
        logger.debug("[dunnes] Found JS product data via %s", source)

        for item in js_data["items"]:
            try:
                name = str(item.get("name") or item.get("title") or "").strip()
                if not name:
                    continue

                price_raw = item.get("price") or item.get("current_price") or 0
                price = self._parse_price(str(price_raw))
                if price is None or price == 0:
                    continue

                # Fallback SKU must be identical on every run. The built-in
                # hash() of a str is salted per process, so derive it from a
                # CRC of the name instead.
                sku = str(
                    item.get("id")
                    or item.get("sku")
                    or item.get("product_id")
                    or f"dunnes-{zlib.crc32(name.encode('utf-8')) % 1000000}"
                )

                brand = item.get("brand") or None
                image_url = item.get("image") or item.get("image_url") or None
                product_url = item.get("url") or item.get("link") or None
                if product_url and not product_url.startswith("http"):
                    product_url = f"{BASE_URL}{product_url}"

                # Promo handling: a higher original price means the current
                # price is the promotional one.
                promo_price = None
                promo_label = None
                original_price = item.get("original_price") or item.get("was_price")
                if original_price:
                    was_price = self._parse_price(str(original_price))
                    if was_price and was_price > price:
                        promo_price = price
                        price = was_price
                        # Label the reduction so it is not stored as an
                        # unlabelled promo price.
                        promo_label = "Special Offer"

                products.append(
                    RawProduct(
                        store_sku=sku,
                        name=name,
                        price=price,
                        promo_price=promo_price,
                        promo_label=promo_label,
                        brand=brand,
                        image_url=image_url,
                        product_url=product_url,
                    )
                )
            except Exception:
                logger.debug("[dunnes] Failed to parse JS product item", exc_info=True)

        return products

    except Exception:
        logger.debug("[dunnes] JS state extraction failed", exc_info=True)
        return []
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------

View file

@ -1,13 +1,24 @@
"""Scraper for Lidl Ireland (lidl.ie).
Similar to Aldi, Lidl has a relatively static product catalogue that we can
scrape with httpx + BeautifulSoup. Weekly special offers are rendered with
JavaScript, so we fall back to Playwright for those pages.
Lidl Ireland uses a Nuxt/Vue-based front-end. Product data is embedded in
server-rendered HTML as JSON inside ``data-grid-data`` attributes on
``div.AProductGridbox__GridTilePlaceholder`` elements.
There are two flavours of category page:
* **Campaign / offer pages** (``/c/{slug}/a{id}``) -- these include product
tiles in the initial SSR HTML and work with plain httpx.
* **Static range pages** (``/c/{slug}/s{id}``) -- these are fully
client-rendered by JavaScript (Nuxt hydration) and return *no* product
tiles with httpx. They require Playwright to render the JS first.
The grocery landing page at ``/grocery-range`` contains links to both types.
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
from decimal import Decimal, InvalidOperation
@ -28,28 +39,19 @@ logger = logging.getLogger(__name__)
BASE_URL = "https://www.lidl.ie"
# Lidl Ireland product category paths
# Category URL format: /c/{slug}/{type}{id}
# - 'a' prefix = campaign / offers page (SSR, works with httpx)
# - 's' prefix = static range page (JS-rendered, needs Playwright)
# We keep a small seed list; remaining categories are discovered dynamically.
CATEGORY_PATHS = [
"/products/fruit-and-vegetables/",
"/products/bakery/",
"/products/meat-and-fish/",
"/products/dairy-and-eggs/",
"/products/chilled/",
"/products/frozen/",
"/products/drinks/",
"/products/food-cupboard/",
"/products/snacks-and-sweets/",
"/products/baby-and-toddler/",
"/products/health-and-beauty/",
"/products/household/",
"/products/pet/",
"/grocery-range", # Main grocery landing page (for discovery only)
]
# Weekly specials — JS-rendered, needs Playwright
# Weekly offer / campaign URLs (confirmed format -- httpx works)
WEEKLY_OFFERS_URLS = [
f"{BASE_URL}/our-offers",
f"{BASE_URL}/our-offers/this-week",
f"{BASE_URL}/our-offers/next-week",
f"{BASE_URL}/c/middle-aisle-highlights/a10027271",
f"{BASE_URL}/c/super-savers/a10028883",
f"{BASE_URL}/c/lidl-plus-offers/a10073407",
]
@ -60,31 +62,106 @@ class LidlScraper(BaseScraper):
# Category URLs
# ------------------------------------------------------------------
async def get_category_urls(self) -> list[str]:
    """Resolve the category pages to scrape.

    Dynamically discovered URLs are merged (de-duplicated) with the weekly
    offer URLs. When discovery returns nothing, the static seed paths
    followed by the weekly offer URLs are returned instead.
    """
    found = await self._discover_categories()
    if not found:
        logger.warning("[lidl] Category discovery found nothing; using static seed list")
        seed_urls = [f"{BASE_URL}{path}" for path in CATEGORY_PATHS]
        return seed_urls + WEEKLY_OFFERS_URLS

    logger.info("[lidl] Discovered %d category URLs from /grocery-range", len(found))
    # Weekly offer URLs may not appear in discovery, so always merge them in.
    return list(set(found + WEEKLY_OFFERS_URLS))
async def _discover_categories(self) -> list[str]:
    """Find category URLs on the /grocery-range landing page.

    The httpx-based lookup runs first because it is cheaper; any exception
    it raises triggers the Playwright-based lookup instead.
    """
    try:
        found = await self._discover_categories_httpx()
    except Exception:
        logger.info("[lidl] httpx category discovery failed, trying Playwright")
        found = await self._discover_categories_playwright()
    return found
async def _discover_categories_httpx(self) -> list[str]:
    """Discover category links from /grocery-range using httpx.

    Every ``<a>`` whose href contains ``/c/`` is resolved against the
    landing-page URL, stripped of its query string and fragment, and kept
    only if it points at the Lidl Ireland site.

    Returns:
        The de-duplicated absolute category URLs (order not significant).

    Raises:
        RuntimeError: If the page contains no ``/c/`` links.
        httpx.HTTPError: If the request fails or returns an error status.
    """
    # Local import so this block does not depend on the file's import header.
    from urllib.parse import urljoin

    landing_url = f"{BASE_URL}/grocery-range"
    headers = {**DEFAULT_HEADERS, "User-Agent": random_user_agent()}
    async with httpx.AsyncClient(
        headers=headers, follow_redirects=True, timeout=30.0,
    ) as client:
        resp = await client.get(landing_url)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

    links: set[str] = set()
    for a_tag in soup.select("a[href*='/c/']"):
        href = (a_tag.get("href", "") or "").strip()
        if not href:
            continue
        # urljoin handles absolute, root-relative, page-relative and
        # protocol-relative ("//host/path") hrefs; plain concatenation with
        # BASE_URL would mangle the last two.
        absolute = urljoin(landing_url, href)
        # Strip tracking query params and fragments for dedup
        absolute = absolute.split("?", 1)[0].split("#", 1)[0]
        # Only keep Lidl Ireland links (trailing slash avoids look-alike hosts)
        if absolute.startswith(f"{BASE_URL}/"):
            links.add(absolute)

    if not links:
        raise RuntimeError("No /c/ links found on /grocery-range")
    return list(links)
async def _discover_categories_playwright(self) -> list[str]:
    """Discover category URLs from /grocery-range using Playwright.

    Loads the landing page in a headless browser, dismisses overlays and
    collects the ``href`` (without query string) of every anchor whose
    href contains ``/c/``. As in the httpx variant, only links on the
    Lidl Ireland site are kept.

    Returns:
        The de-duplicated category URLs, or an empty list if anything
        fails (the failure is logged, not raised).
    """
    pw, browser, context = await self._get_browser_context(headless=True)
    try:
        page = await context.new_page()
        logger.info("[lidl] Discovering categories from %s/grocery-range", BASE_URL)
        await page.goto(
            f"{BASE_URL}/grocery-range",
            wait_until="domcontentloaded",
            timeout=60_000,
        )
        # Fixed pause to let the page render its links after DOM load.
        await asyncio.sleep(3)
        await self._dismiss_overlays(page)

        links = await page.evaluate('''() => {
            return [...document.querySelectorAll('a[href*="/c/"]')]
                .map(a => a.href.split("?")[0])
                .filter((v, i, a) => a.indexOf(v) === i);
        }''')
        # Keep only Lidl Ireland links, matching the httpx discovery path.
        return list({
            link for link in (links or [])
            if isinstance(link, str) and link.startswith(BASE_URL)
        })
    except Exception:
        logger.warning("[lidl] Playwright category discovery failed", exc_info=True)
        return []
    finally:
        await context.close()
        await browser.close()
        await pw.stop()
# ------------------------------------------------------------------
# Scrape one category
# ------------------------------------------------------------------
async def scrape_category(self, category_url: str) -> list[RawProduct]:
    """Scrape one Lidl category URL, choosing httpx or Playwright by URL shape.

    Campaign / offer pages (``/c/{slug}/a{id}``) are tried with httpx
    first; if that raises, the page is retried with Playwright. Every
    other URL goes straight to Playwright.

    Args:
        category_url: Absolute URL of the category page.

    Returns:
        The products parsed from the page.
    """
    # Campaign / offer pages (/a{id}) have SSR product tiles -- try httpx
    if re.search(r"/c/.+/a\d+", category_url):
        try:
            return await self._scrape_with_httpx(category_url)
        except Exception as exc:
            logger.warning(
                "[lidl] httpx failed for %s (%s), falling back to Playwright",
                category_url,
                exc,
            )

    # Static range pages (/s{id}) and other pages are JS-rendered
    # and require Playwright.
    return await self._scrape_with_playwright(category_url)
# ------------------------------------------------------------------
# httpx-based scraping
# httpx-based scraping (works for /a{id} campaign pages)
# ------------------------------------------------------------------
async def _scrape_with_httpx(self, category_url: str) -> list[RawProduct]:
products: list[RawProduct] = []
@ -114,7 +191,16 @@ class LidlScraper(BaseScraper):
len(products),
)
# Pagination
if not batch:
# No products found -- page may need JS rendering
logger.warning(
"[lidl] httpx returned 0 products for %s; "
"page may require Playwright",
current_url,
)
# Pagination -- Lidl campaign pages do not typically paginate,
# but we keep this in case they start.
next_link = soup.select_one(
"a[rel='next'], "
"a.pagination__next, "
@ -133,178 +219,283 @@ class LidlScraper(BaseScraper):
return products
# ------------------------------------------------------------------
# HTML parsing -- extract from data-grid-data JSON attributes
# ------------------------------------------------------------------
def _parse_html(self, soup: BeautifulSoup) -> list[RawProduct]:
    """Parse product tiles from a Lidl page.

    Lidl embeds product data as a JSON blob in the ``data-grid-data``
    attribute of ``div.AProductGridbox__GridTilePlaceholder`` elements.
    The inner HTML of these tiles is only skeleton/loading placeholders;
    all meaningful data lives in the attribute.

    Args:
        soup: Parsed HTML of the page.

    Returns:
        One product per tile that could be parsed; tiles that raise or
        yield no product are skipped.
    """
    products: list[RawProduct] = []

    # Primary selector: the confirmed SSR tile class.
    # Also match any element with a data-grid-data attribute as fallback.
    tiles = soup.select(
        "div.AProductGridbox__GridTilePlaceholder, "
        "[data-grid-data]"
    )

    for tile in tiles:
        try:
            product = self._parse_tile(tile)
        except Exception:
            # One malformed tile must not abort the whole page.
            logger.debug("[lidl] Failed to parse product tile", exc_info=True)
            continue
        if product is not None:
            products.append(product)

    return products
def _parse_tile(self, tile) -> RawProduct | None:
    """Extract a RawProduct from a single tile element.

    Data is primarily read from the JSON held in the tile's
    ``data-grid-data`` attribute. When that attribute is missing, invalid
    or not a JSON object, the HTML attributes ``fulltitle``,
    ``productid`` / ``itemid``, ``canonicalurl`` / ``canonicalpath``,
    ``image`` and ``category`` are used instead.

    Args:
        tile: Object exposing ``get(key, default)`` for its attributes
            (a BeautifulSoup tag in practice).

    Returns:
        The parsed product, or ``None`` when the tile has no usable name
        or no non-zero price.
    """
    # Local import so this block does not depend on the file's import header.
    import zlib

    def as_dict(value) -> dict:
        """Return *value* if it is a dict, else an empty dict (JSON null etc.)."""
        return value if isinstance(value, dict) else {}

    def to_decimal(value) -> Decimal | None:
        """Convert *value* to Decimal, or None if absent or unparseable."""
        if value is None:
            return None
        try:
            return Decimal(str(value))
        except (InvalidOperation, ValueError):
            return None

    grid_data: dict = {}
    grid_data_raw = tile.get("data-grid-data", "")
    if grid_data_raw:
        try:
            grid_data = as_dict(json.loads(grid_data_raw))
        except (json.JSONDecodeError, TypeError):
            logger.debug("[lidl] Invalid JSON in data-grid-data")

    # --- Name ---
    name = (
        grid_data.get("fullTitle")
        or grid_data.get("title")
        or tile.get("fulltitle", "")
    )
    if not name or not isinstance(name, str):
        return None

    # --- Product ID / SKU ---
    product_id = str(
        grid_data.get("productId")
        or grid_data.get("itemId")
        or grid_data.get("erpNumber")
        or tile.get("productid", "")
        or tile.get("itemid", "")
        or ""
    )
    if not product_id:
        # The builtin hash() of a str is salted per process, so it cannot
        # be used for an identifier that must stay the same between runs.
        product_id = f"lidl-{zlib.crc32(name.encode('utf-8')) % 1000000}"

    # --- Product URL ---
    canonical = (
        grid_data.get("canonicalUrl")
        or grid_data.get("canonicalPath")
        or tile.get("canonicalurl", "")
        or tile.get("canonicalpath", "")
    )
    product_url = None
    if canonical and isinstance(canonical, str):
        product_url = canonical if canonical.startswith("http") else f"{BASE_URL}{canonical}"

    # --- Price ---
    # Price can come from two places:
    #   1. price.price  (top-level, for regular / non-Lidl-Plus items)
    #   2. lidlPlus[0].price.price  (for Lidl Plus offer items)
    price_obj = as_dict(grid_data.get("price"))
    lidl_plus_list = grid_data.get("lidlPlus")
    lp_entry: dict = {}
    if isinstance(lidl_plus_list, list) and lidl_plus_list:
        lp_entry = as_dict(lidl_plus_list[0])
    lp_price_obj = as_dict(lp_entry.get("price"))

    price: Decimal | None = to_decimal(price_obj.get("price"))
    promo_price: Decimal | None = None
    promo_label: str | None = None

    # Lidl Plus price data (often present for offer / campaign pages)
    lp_price = to_decimal(lp_price_obj.get("price"))
    if lp_price is not None:
        lp_discount = as_dict(lp_price_obj.get("discount"))

        # Determine original / struck-through price: first parseable of
        # discount.deletedPrice, then oldPrice.
        original = None
        for candidate in (lp_discount.get("deletedPrice"), lp_price_obj.get("oldPrice")):
            original = to_decimal(candidate)
            if original is not None:
                break

        label_parts = [
            str(part)
            for part in (lp_entry.get("highlightText", ""), lp_entry.get("lidlPlusText", ""))
            if part
        ]

        if original and original > lp_price:
            # There IS a discount: original is the shelf price,
            # lp_price is the promo price.
            price = original
            promo_price = lp_price
            promo_label = " - ".join(label_parts) if label_parts else "Lidl Plus Offer"
        elif price is None:
            # No top-level price, use Lidl Plus price as the base
            price = lp_price
            if label_parts:
                promo_label = " - ".join(label_parts)

    if price is None or price == 0:
        return None

    # --- Image ---
    image_url = grid_data.get("image") or tile.get("image")
    if not image_url:
        image_list = grid_data.get("imageList") or grid_data.get("imageList_V1")
        if image_list and isinstance(image_list, list):
            first = image_list[0]
            if isinstance(first, dict):
                image_url = first.get("image")
            elif isinstance(first, str):
                image_url = first
    if not isinstance(image_url, str) or not image_url:
        image_url = None
    elif image_url.startswith("//"):
        image_url = f"https:{image_url}"
    elif image_url.startswith("/"):
        image_url = f"{BASE_URL}{image_url}"

    # --- Brand ---
    brand = None
    brand_obj = grid_data.get("brand", {})
    if isinstance(brand_obj, dict) and brand_obj.get("showBrand"):
        brand = brand_obj.get("name")

    # --- Category ---
    category = grid_data.get("category") or tile.get("category")

    # --- EAN ---
    ean = None
    ians = grid_data.get("ians")
    if ians and isinstance(ians, list) and ians[0]:
        ean = str(ians[0])

    # --- Unit / packaging from lidlPlus or price data ---
    unit_size: Decimal | None = None
    unit: str | None = None
    packaging_text = (
        as_dict(lp_price_obj.get("packaging")).get("text")
        or as_dict(price_obj.get("packaging")).get("text")
    )
    # Packaging text is tried first (it may also carry "cm"); the product
    # name is the fallback.
    size_sources = (
        (packaging_text, r"(\d+(?:[.,]\d+)?)\s*(ml|l|g|kg|cl|pk|pack|cm)\b"),
        (name, r"(\d+(?:[.,]\d+)?)\s*(ml|l|g|kg|cl|pk|pack)\b"),
    )
    for text, pattern in size_sources:
        if unit_size is not None:
            break
        if not isinstance(text, str) or not text:
            continue
        size_match = re.search(pattern, text, re.IGNORECASE)
        if size_match:
            parsed_size = to_decimal(size_match.group(1).replace(",", "."))
            if parsed_size is not None:
                unit_size = parsed_size
                unit = size_match.group(2).lower()

    # --- Unit price (base price) ---
    unit_price: Decimal | None = None
    base_price_obj = lp_price_obj.get("basePrice") or price_obj.get("basePrice")
    if isinstance(base_price_obj, dict):
        unit_price = to_decimal(base_price_obj.get("price"))

    # --- In stock ---
    in_stock = True
    stock_info = grid_data.get("stockAvailability", {})
    if isinstance(stock_info, dict):
        indicator = stock_info.get("availabilityIndicator")
        # 0 = available, higher values indicate limited/out of stock
        if isinstance(indicator, (int, float)) and indicator > 2:
            in_stock = False

    # --- Promo label from ribbons if not already set ---
    if not promo_label:
        ribbons = grid_data.get("ribbons", [])
        if ribbons and isinstance(ribbons, list):
            ribbon_texts = [r.get("text", "") for r in ribbons if isinstance(r, dict)]
            ribbon_str = " | ".join(str(t) for t in ribbon_texts if t)
            if ribbon_str:
                promo_label = ribbon_str

    return RawProduct(
        store_sku=product_id,
        name=name,
        price=price,
        promo_price=promo_price,
        promo_label=promo_label,
        unit_price=unit_price,
        unit=unit,
        unit_size=unit_size,
        brand=brand,
        ean=ean,
        category=category,
        image_url=image_url,
        product_url=product_url,
        in_stock=in_stock,
    )
# ------------------------------------------------------------------
# Playwright-based scraping (fallback / offers)
# Playwright-based scraping (for /s{id} range pages and fallback)
# ------------------------------------------------------------------
async def _scrape_with_playwright(self, url: str) -> list[RawProduct]:
"""Scrape a page using Playwright.
Required for ``/c/{slug}/s{id}`` range pages and the ``/grocery-range``
landing page, which are fully JS-rendered (Nuxt hydration).
After Playwright renders the page, we extract the same
``data-grid-data`` JSON that the httpx path uses.
"""
pw, browser, context = await self._get_browser_context(headless=True)
try:
page = await context.new_page()
@ -313,121 +504,69 @@ class LidlScraper(BaseScraper):
await asyncio.sleep(3)
await self._dismiss_overlays(page)
await self._scroll_page(page)
await self._scroll_page(page, scrolls=8)
html = await page.content()
soup = BeautifulSoup(html, "html.parser")
return self._parse_html(soup)
products = self._parse_html(soup)
# If _parse_html found nothing, try extracting from Playwright
# locators directly (the data-grid-data may also be available
# on the live DOM even if not in the serialised HTML).
if not products:
products = await self._extract_from_playwright(page)
return products
finally:
await context.close()
await browser.close()
await pw.stop()
async def _extract_from_playwright(self, page: Page) -> list[RawProduct]:
    """Extract products directly from the Playwright page DOM.

    Evaluates JS in the page to read and JSON-parse the
    ``data-grid-data`` attribute of every tile element, then converts
    each resulting dict into a product.

    Args:
        page: An already-loaded Playwright page.

    Returns:
        The products that could be parsed; tiles whose attribute is
        missing or invalid JSON, or whose parsing raises, are skipped.
    """
    products: list[RawProduct] = []

    raw_items = await page.evaluate('''() => {
        const tiles = document.querySelectorAll(
            'div.AProductGridbox__GridTilePlaceholder, [data-grid-data]'
        );
        return [...tiles].map(el => {
            try {
                const raw = el.getAttribute('data-grid-data');
                return raw ? JSON.parse(raw) : null;
            } catch { return null; }
        }).filter(Boolean);
    }''')

    # Guard against a null result so the loop below never iterates None.
    for gd in raw_items or []:
        try:
            product = self._parse_grid_data_dict(gd)
        except Exception:
            logger.debug("[lidl] Failed to parse Playwright-extracted tile", exc_info=True)
            continue
        if product is not None:
            products.append(product)

    logger.info("[lidl] Playwright JS extraction found %d products", len(products))
    return products
def _parse_grid_data_dict(self, grid_data: dict) -> RawProduct | None:
    """Turn an already-decoded data-grid-data dict into a RawProduct.

    The dict is re-serialised onto a throwaway ``div`` tag so that the
    work can be delegated to ``_parse_tile``, which expects an element
    carrying the JSON in its ``data-grid-data`` attribute.
    """
    from bs4 import Tag

    # Minimal stand-in element: only the attribute _parse_tile reads is set.
    placeholder = Tag(
        name="div",
        attrs={"data-grid-data": json.dumps(grid_data)},
    )
    return self._parse_tile(placeholder)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------

View file

@ -3,6 +3,10 @@
SuperValu requires authentication to browse the full catalogue. We use
Playwright to log in with the credentials from settings and then browse
each category.
IMPORTANT: Login URL is at supervalu.ie/login/ (NOT shop.supervalu.ie/login).
Category URLs use the format /categories/{slug}-id-{code}.
After login, a store must be selected before browsing products.
"""
from __future__ import annotations
@ -24,24 +28,14 @@ from src.scrapers.base import (
logger = logging.getLogger(__name__)
BASE_URL = "https://shop.supervalu.ie"
# NOTE(review): LOGIN_URL is assigned twice; the second assignment (the
# supervalu.ie host) overrides the first at import time.
LOGIN_URL = f"{BASE_URL}/login"
LOGIN_URL = "https://supervalu.ie/login/"
# SuperValu grocery category paths; each entry is appended to BASE_URL when
# the static seed list is used as a fallback.
# Confirmed SuperValu category paths (format: /categories/{slug}-id-{code})
# NOTE(review): the /shopping/... entries below do not follow the confirmed
# /categories/{slug}-id-{code} format -- verify they are still valid.
CATEGORY_PATHS = [
"/shopping/fresh/",
"/shopping/bakery/",
"/shopping/dairy-eggs-chilled/",
"/shopping/meat-poultry-fish/",
"/shopping/fruit-vegetables/",
"/shopping/frozen/",
"/shopping/drinks/",
"/shopping/food-cupboard/",
"/shopping/snacks-confectionery/",
"/shopping/household/",
"/shopping/health-beauty/",
"/shopping/baby-toddler/",
"/shopping/pet-care/",
"/shopping/alcohol/",
"/categories/fruit-vegetables-id-O100001",
"/categories/meat-%26-poultry-id-O100015",
"/categories/chilled-food-id-O100030",
"/categories/frozen-foods-id-O100045",
]
@ -61,8 +55,55 @@ class SuperValuScraper(BaseScraper):
# Category URLs
# ------------------------------------------------------------------
async def get_category_urls(self) -> list[str]:
    """Build the list of SuperValu category URLs to scrape.

    Dynamically discovered URLs are returned when there are any;
    otherwise the static seed paths are expanded against BASE_URL.
    """
    found = await self._discover_categories()

    if not found:
        logger.warning("[supervalu] Category discovery found nothing; using static seed list")
        return [f"{BASE_URL}{path}" for path in CATEGORY_PATHS]

    logger.info(
        "[supervalu] Discovered %d category URLs from allaisles", len(found)
    )
    return found
async def _discover_categories(self) -> list[str]:
    """Collect category URLs from the /shopping/allaisles page.

    Logs in and selects a store first, then gathers every anchor whose
    href contains ``/categories/`` and ``-id-``. Any failure is logged
    and results in an empty list.
    """
    pw, browser, context = await self._get_browser_context(headless=True)
    try:
        page = await context.new_page()

        # The catalogue is only reachable after login and store selection.
        await self._login(page)
        await self._select_store(page)
        await random_delay(1.0, 2.0)

        logger.info("[supervalu] Discovering categories from allaisles page")
        aisles_url = f"{BASE_URL}/shopping/allaisles"
        await page.goto(aisles_url, wait_until="domcontentloaded", timeout=60_000)
        await asyncio.sleep(3)

        hrefs = await page.evaluate('''() => {
            return [...document.querySelectorAll('a[href*="/categories/"]')]
                .map(a => a.href)
                .filter(href => href.includes('-id-'));
        }''')

        # De-duplicate; order is not significant.
        return list(set(hrefs))
    except Exception:
        logger.warning("[supervalu] Category discovery failed", exc_info=True)
        return []
    finally:
        await context.close()
        await browser.close()
        await pw.stop()
# ------------------------------------------------------------------
# Scrape one category
# ------------------------------------------------------------------
@ -77,6 +118,10 @@ class SuperValuScraper(BaseScraper):
await self._login(page)
await random_delay(1.0, 2.0)
# Select a store (required before browsing products)
await self._select_store(page)
await random_delay(0.5, 1.0)
# Navigate to category
logger.info("[supervalu] Loading category %s", category_url)
await page.goto(category_url, wait_until="domcontentloaded", timeout=60_000)
@ -182,6 +227,64 @@ class SuperValuScraper(BaseScraper):
else:
logger.info("[supervalu] Login appears successful (now at %s)", page.url)
async def _select_store(self, page: Page) -> None:
    """After login, select a store by navigating to allaisles or entering Eircode.

    SuperValu requires a store/delivery area to be selected before
    product prices and availability are shown.

    The method is best-effort: it returns early when product cards are
    already visible, otherwise fills the first Eircode/postcode input it
    finds with a fixed Dublin Eircode, submits, and clicks the first
    store option offered. Any exception is logged at debug level and
    swallowed, so callers are never interrupted by a failure here.

    Args:
        page: The Playwright page to operate on (expected to be logged in).
    """
    try:
        # First check if we're already on a page that has store selected
        # (i.e., products are visible)
        product_check = page.locator("[class*='ProductCard'], [class*='product-card']")
        if await product_check.count() > 0:
            logger.debug("[supervalu] Store appears already selected")
            return

        # Look for Eircode / postcode input (store selection modal or page)
        # The trailing " i" in each selector makes the attribute match
        # case-insensitive.
        eircode_input = page.locator(
            "input[placeholder*='Eircode' i], "
            "input[name*='eircode' i], "
            "input[placeholder*='postcode' i], "
            "input[placeholder*='Enter your area' i], "
            "input[id*='eircode' i], "
            "input[id*='postcode' i]"
        )
        if await eircode_input.count() > 0:
            logger.info("[supervalu] Found Eircode input, entering D01 F5P2")
            await eircode_input.first.fill("D01 F5P2")  # Dublin city center
            # Short pause before looking for the submit control.
            await asyncio.sleep(1)

            # Click search/submit button
            submit = page.locator(
                "button[type='submit'], "
                "button:has-text('Find'), "
                "button:has-text('Search'), "
                "button:has-text('Go'), "
                "button[aria-label*='search' i]"
            )
            if await submit.count() > 0:
                await submit.first.click()
                await asyncio.sleep(2)

            # If a store list appears, pick the first one
            store_option = page.locator(
                "button:has-text('Select'), "
                "a:has-text('Select Store'), "
                "button:has-text('Choose'), "
                "li[class*='store'] button, "
                "div[class*='store-item'] button"
            )
            if await store_option.count() > 0:
                await store_option.first.click()
                await asyncio.sleep(2)

            logger.info("[supervalu] Store selected via Eircode search")
        else:
            logger.debug("[supervalu] No Eircode input found; store may already be set")
    except Exception:
        # Deliberately non-fatal: store selection is attempted, not required.
        logger.debug("[supervalu] Store selection handling failed", exc_info=True)
# ------------------------------------------------------------------
# DOM extraction
# ------------------------------------------------------------------

View file

@ -11,6 +11,7 @@ from __future__ import annotations
import asyncio
import logging
import re
import sys
from decimal import Decimal, InvalidOperation
from playwright.async_api import Page, Response
@ -57,260 +58,215 @@ class TescoScraper(BaseScraper):
# Scrape a single category
# ------------------------------------------------------------------
async def scrape_category(self, category_url: str) -> list[RawProduct]:
    """Load a Tesco category page and extract products via JS evaluation.

    Tesco uses Akamai WAF + obfuscated CSS module class names.
    The most reliable approach is to use JavaScript evaluation to extract
    product data from the rendered DOM rather than relying on brittle
    CSS selectors.

    Args:
        category_url: Absolute URL of the category page.

    Returns:
        The products extracted from the rendered page.
    """
    # Tesco uses Akamai WAF — resource blocking triggers bot detection
    pw, browser, context = await self._get_browser_context(
        headless=True, block_resources=False
    )
    try:
        page = await context.new_page()

        logger.info("[tesco] Loading %s", category_url)
        await page.goto(category_url, wait_until="domcontentloaded", timeout=60_000)
        # Fixed pause to let the page finish rendering after DOM load.
        await asyncio.sleep(5)

        # Handle cookie consent banner if present: click the first
        # matching, visible accept button and stop looking.
        for sel in ["#onetrust-accept-btn-handler", "button:has-text('Accept All')"]:
            try:
                btn = page.locator(sel)
                if await btn.count() > 0 and await btn.first.is_visible():
                    await btn.first.click()
                    await asyncio.sleep(1)
                    break
            except Exception:
                # Best-effort: a missing or unclickable banner must not
                # abort the scrape, but leave a trace for debugging.
                logger.debug("[tesco] Cookie banner handling failed for %s", sel, exc_info=True)

        await asyncio.sleep(2)

        # Scroll to load lazy content
        await self._scroll_page(page, scrolls=6)

        # Extract products using JavaScript evaluation (bypasses CSS obfuscation)
        products = await self._extract_products_js(page)
        logger.info("[tesco] Extracted %d products from %s", len(products), category_url)
        return products
    finally:
        await context.close()
        await browser.close()
        await pw.stop()
# ------------------------------------------------------------------
# API response parser
# JS-based product extraction (reliable against obfuscated CSS)
# ------------------------------------------------------------------
def _parse_api_product(self, item: dict) -> RawProduct | None:
"""Parse a product dict from Tesco's API response."""
# Tesco wraps product data in different shapes depending on the endpoint
product_data = item.get("product", item)
async def _extract_products_js(self, page: Page) -> list[RawProduct]:
    """Extract product data from the rendered page via JavaScript evaluation.

    Tesco uses obfuscated CSS module class names that change every build.
    Instead of brittle CSS selectors, we find product tiles by structural
    patterns: the product list ``ul#list-content``, product links matching
    ``/products/\\d+``, and nearby price elements.

    The in-page script returns one plain object per tile with the string
    fields ``sku``, ``name``, ``price``, ``unitPrice``, ``promoLabel``,
    ``imageUrl`` and ``href``; those are parsed into ``RawProduct`` here.

    Args:
        page: Page whose category listing has already been loaded.

    Returns:
        One ``RawProduct`` per tile that yielded a name, a SKU and a
        non-zero price. Tiles that fail to parse are logged at debug level
        and skipped, so one bad tile never aborts the whole category.
    """
    raw_items = await page.evaluate("""() => {
        const results = [];
        // The product list container uses id="list-content"
        const list = document.getElementById('list-content');
        const tiles = list ? list.querySelectorAll(':scope > li') : [];

        for (const tile of tiles) {
            try {
                // Find the product title link (href contains /products/{id})
                const links = tile.querySelectorAll('a[href*="/products/"]');
                let name = '';
                let href = '';
                for (const link of links) {
                    const text = link.textContent.trim();
                    if (text && text.length > 2) {
                        name = text;
                        href = link.href || link.getAttribute('href') || '';
                        break;
                    }
                }
                if (!name) continue;

                // Extract SKU from href
                const skuMatch = href.match(/\\/products\\/(\\d+)/);
                const sku = skuMatch ? skuMatch[1] : '';
                if (!sku) continue;

                // Find prices among the <p> elements. Only euro-prefixed
                // text counts; a "/" marks a unit price such as
                // "€0.28/each" or "€1.55/kg". The first of each kind wins,
                // and a unit price is still captured when it follows the
                // main price.
                let priceText = '';
                let unitPriceText = '';
                const euro = '\\u20ac';  // euro sign, written as an escape
                for (const p of tile.querySelectorAll('p')) {
                    const text = p.textContent.trim();
                    if (!text.startsWith(euro)) continue;
                    if (text.includes('/')) {
                        if (!unitPriceText) unitPriceText = text;
                    } else if (!priceText) {
                        priceText = text;
                    }
                }

                // Also check span elements for price
                if (!priceText) {
                    const spans = tile.querySelectorAll('span');
                    for (const s of spans) {
                        const text = s.textContent.trim();
                        if (text.match(/^\\d/) && !text.includes('/')) {
                            priceText = text;
                            break;
                        }
                    }
                }

                if (!priceText) continue;

                // Find promo/offer text
                let promoLabel = '';
                const offerEl = tile.querySelector('[data-auto="offer-text"]');
                if (offerEl) {
                    promoLabel = offerEl.textContent.trim();
                }
                // Also check for Aldi Price Match or Clubcard badges
                if (!promoLabel) {
                    const badges = tile.querySelectorAll('span[class*="logo"], span[class*="promo"], span[class*="offer"]');
                    for (const b of badges) {
                        const t = b.textContent.trim();
                        if (t && t.length > 2 && t.length < 80) {
                            promoLabel = t;
                            break;
                        }
                    }
                }

                // Find image
                let imageUrl = '';
                const img = tile.querySelector('img');
                if (img) {
                    imageUrl = img.src || img.getAttribute('data-src') || '';
                }

                results.push({
                    sku: sku,
                    name: name,
                    price: priceText,
                    unitPrice: unitPriceText,
                    promoLabel: promoLabel,
                    imageUrl: imageUrl,
                    href: href,
                });
            } catch (e) {
                // skip tile
            }
        }
        return results;
    }""")

    products: list[RawProduct] = []
    # Guard against a null result so a failed evaluation yields an empty list.
    for item in raw_items or []:
        try:
            name = item.get("name", "").strip()
            sku = item.get("sku", "")
            if not name or not sku:
                continue

            # Parse price: keep digits and dots only, dropping the currency
            # symbol and any surrounding text.
            price_text = re.sub(r"[^\d.]", "", item.get("price", ""))
            try:
                price = Decimal(price_text) if price_text else None
            except InvalidOperation:
                price = None
            # A missing, unparseable or zero price makes the row useless.
            if not price:
                continue

            # Parse unit price, e.g. "€1.55/kg" -> (Decimal("1.55"), "kg")
            unit_price = None
            unit = None
            up_text = item.get("unitPrice", "")
            if up_text:
                up_match = re.search(r"€([\d.]+)/([\w]+)", up_text)
                if up_match:
                    try:
                        unit_price = Decimal(up_match.group(1))
                        unit = up_match.group(2).lower()
                    except (InvalidOperation, ValueError):
                        pass

            # Unit size from name, e.g. "Avonmore Milk 2L"
            unit_size = None
            size_match = re.search(
                r"(\d+(?:\.\d+)?)\s*(ml|l|g|kg|cl|pk|pack)\b", name, re.IGNORECASE
            )
            if size_match:
                try:
                    unit_size = Decimal(size_match.group(1))
                    # The unit parsed from the unit price takes precedence.
                    unit = unit or size_match.group(2).lower()
                except (InvalidOperation, ValueError):
                    pass

            promo_label = item.get("promoLabel") or None

            image_url = item.get("imageUrl") or None
            if image_url and image_url.startswith("//"):
                # Protocol-relative URL: pin it to https.
                image_url = f"https:{image_url}"

            # Product URL
            href = item.get("href", "")
            product_url = href if href.startswith("http") else f"{BASE_URL}{href}" if href else None

            products.append(
                RawProduct(
                    store_sku=sku,
                    name=name,
                    price=price,
                    promo_label=promo_label,
                    unit_price=unit_price,
                    unit=unit,
                    unit_size=unit_size,
                    image_url=image_url,
                    product_url=product_url,
                )
            )
        except Exception:
            # Best-effort per tile: record the failure and keep going.
            logger.debug("[tesco] Failed to parse JS-extracted product", exc_info=True)

    return products
@ -346,14 +305,37 @@ async def main() -> None:
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(name)s - %(message)s",
)
scraper = TescoScraper()
result = await scraper.run()
print(f"\nDone: {result.status}")
print(f"Products scraped: {len(result.products)}")
if result.errors:
print(f"Errors ({len(result.errors)}):")
for err in result.errors:
print(f" - {err}")
dry_run = "--dry-run" in sys.argv
if dry_run:
# Dry-run mode: scrape categories and print products without hitting the DB
scraper = TescoScraper()
category_urls = await scraper.get_category_urls()
all_products: list[RawProduct] = []
for url in category_urls:
try:
products = await scraper.scrape_category(url)
all_products.extend(products)
print(f"[dry-run] {url} -> {len(products)} products")
except Exception as exc:
print(f"[dry-run] {url} -> ERROR: {exc}")
await random_delay(1.0, 3.0)
print(f"\n[dry-run] Total products scraped: {len(all_products)}")
for p in all_products[:20]:
print(f" {p.store_sku:>12s} {str(p.price):>8s} {p.name}")
if len(all_products) > 20:
print(f" ... and {len(all_products) - 20} more")
else:
scraper = TescoScraper()
result = await scraper.run()
print(f"\nDone: {result.status}")
print(f"Products scraped: {len(result.products)}")
if result.errors:
print(f"Errors ({len(result.errors)}):")
for err in result.errors:
print(f" - {err}")
if __name__ == "__main__":