
RetailGrid Worker Development Guide

Audience: External developer implementing an analytics worker.
You receive: this worker_sdk/ directory plus a task brief describing which worker to build.
You deliver: a worker.py and app.py that pass the acceptance criteria from your task brief.

Table of Contents

  1. Architecture Overview
  2. What You Receive
  3. Quick Start
  4. Your Contract: Two Methods
  5. JobMessage — What You Receive per Job
  6. Reading Data (JobDataReader)
  7. Writing Results (GridWriter)
  8. Reporting Progress
  9. Error Handling
  10. Complete Reference Example
  11. Testing Locally
  12. Deliverables Checklist
  13. Rules & Constraints
  14. FAQ

1. Architecture Overview

                     ┌─────────────┐
 User action ──────> │  Backend    │ ──> Creates Job row in DB
                     │  (API)      │ ──> Publishes Pub/Sub message
                     └─────────────┘

                            │
                            │  Pub/Sub message
                            ▼  (contains job_id)

                     ┌─────────────┐
                     │ YOUR WORKER │  <── Cloud Run container
                     │             │
                     │  1. Parse   │  ← message arrives via HTTP POST
                     │  2. Read    │  ← self.data_reader.get_*()
                     │  3. Compute │  ← your logic (pandas, numpy, etc.)
                     │  4. Write   │  ← self.grid_writer.create_dynamic_column()
                     │  5. Done    │  ← return result dict
                     └─────────────┘
Key points:
  • Your worker runs as an isolated container on Google Cloud Run.
  • It receives jobs as HTTP POST requests (Eventarc push model).
  • You never write SQL, touch database tables, or manage connections.
  • You interact with data exclusively through protocol interfaces (data_reader, grid_writer).
  • The SDK handles: Pub/Sub parsing, DB connections, job lifecycle, observability, advisory locks, retries.

2. What You Receive

your_worker/
├── worker.py               # ← YOU WRITE THIS (your computation logic)
├── app.py                  # ← YOU WRITE THIS (3-line entrypoint)
├── Dockerfile              # Container build (provided, adjust if needed)
├── requirements.txt        # Python deps (provided, add yours)
└── worker_sdk/             # SDK (provided — DO NOT MODIFY)
    ├── __init__.py          # Public API re-exports
    ├── protocols.py         # Protocol interfaces you code against
    ├── base_handler.py      # PubSubHandler base class
    ├── models.py            # JobMessage dataclass
    ├── constants.py         # JobType, JobStatus enums
    ├── factory.py           # Dependency builder (from env vars)
    ├── runner.py            # FastAPI app factory
    ├── observability.py     # Tracing + structured logging
    └── _impl/               # Internal (do not import directly)
Do not modify anything inside worker_sdk/. Treat it as a library.

3. Quick Start

3.1. Setup

# Python 3.11+ required
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

3.2. Minimal worker.py

from __future__ import annotations
from typing import Any, Dict
from worker_sdk import PubSubHandler, JobMessage
from worker_sdk.protocols import GridWriter, JobDataReader, JobProgressReporter

class MyWorker(PubSubHandler):
    def __init__(self, project_id, progress, data_reader, grid_writer, subscription_name=None):
        super().__init__(project_id, progress, subscription_name)
        self.data_reader = data_reader
        self.grid_writer = grid_writer

    def should_process_job(self, job: JobMessage) -> bool:
        return job.job_type == "my_analysis_type"

    def process_job(self, job: JobMessage) -> Dict[str, Any]:
        workbook_id = job.parameters["workbook_id"]
        self.update_job_progress(job.job_id, 10, "Loading data")

        # Read data, compute, write results...
        products = self.data_reader.get_products(workbook_id)

        self.update_job_progress(job.job_id, 90, "Done")
        return {"status": "completed", "product_count": len(products)}

3.3. Minimal app.py

from worker_sdk import create_worker_app
from worker import MyWorker

app = create_worker_app(MyWorker, service_name="retailgrid-worker-my-worker")

3.4. Run Locally

export MODELED_DB_HOST=<provided>
export MODELED_DB_NAME=<provided>
export MODELED_DB_USER=<provided>
export MODELED_DB_PASSWORD=<provided>
export OTEL_TRACES_EXPORTER=none  # disable tracing locally

uvicorn app:app --host 0.0.0.0 --port 8080 --reload

4. Your Contract: Two Methods

You subclass PubSubHandler and implement exactly two methods:

should_process_job(self, job: JobMessage) -> bool

Return True if your worker should handle this job type. Typically a one-liner:
def should_process_job(self, job: JobMessage) -> bool:
    return job.job_type == "my_analysis_type"
The job_type string will be specified in your task brief.

process_job(self, job: JobMessage) -> Dict[str, Any]

Your main logic. This is called after the SDK has already:
  • Parsed the Pub/Sub message
  • Acquired an advisory lock (prevents duplicate execution)
  • Marked the job as running in the database
You must return a dict. If the dict contains "status": "failed", the SDK marks the job as failed. Otherwise it marks it as completed and stores your dict as result_summary. If you raise an exception, the SDK catches it, marks the job as failed with the exception message, and re-raises.

5. JobMessage — What You Receive per Job

@dataclass
class JobMessage:
    job_id: str              # Unique UUID for this job
    job_type: str            # e.g. "my_analysis_type"
    model_id: str | None     # Optional model reference
    template_id: str | None  # Optional template reference
    parameters: Dict[str, Any]      # Job-specific parameters (see below)
    metadata: Dict[str, Any]        # Full original message payload
    workbook_id: str | None = None  # Workbook this job belongs to
parameters is your primary input. It always contains at least workbook_id (UUID string). Additional keys depend on the job type — your task brief will specify them. Example parameters:
{
    "workbook_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "columns": [
        {"column_id": "revenue", "params": {"time_period": "last_30_days"}}
    ],
    "time_period": "last_30_days"
}
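A hedged sketch of extracting these parameters defensively before doing any work (`parse_parameters` is an illustrative helper, not an SDK function; in a worker you would pass it `job.parameters`):

```python
from typing import Any


def parse_parameters(parameters: dict[str, Any]) -> tuple[str, list, str]:
    # Fail fast on the one key that is always present and required.
    workbook_id = parameters.get("workbook_id")
    if not workbook_id:
        raise ValueError("parameters.workbook_id is required")
    # Job-type-specific keys: fall back to sensible defaults.
    columns = parameters.get("columns", [])
    time_period = parameters.get("time_period", "last_30_days")
    return workbook_id, columns, time_period
```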

6. Reading Data (JobDataReader)

Access via self.data_reader. All methods are read-only and scoped by workbook_id.

6.1. Products (denormalized from Grid)

df = self.data_reader.get_products(workbook_id)
Returns a DataFrame of all product+variant rows from the denormalized grid table. If a product has 5 variants, 5 rows are returned — each with product-level fields duplicated alongside variant-level fields. The standard product/variant fields:

| Column | Type | Description |
|---|---|---|
| product_id | str | Unique product identifier |
| product_name | str | Product display name |
| manufacturer | str | Manufacturer name |
| product_group_name | str | Product group / department |
| brand | str | Brand name |
| product_status | str | Product status |
| product_type | str | Product type classification |
| catalog_1 | str | Primary catalog / class name |
| category_1 .. category_5 | str | Category hierarchy levels |
| image_url | str | Product image URL |
| link_url | str | Product page URL |
| product_create_date | date | Product creation date |
| vat | float | VAT / tax rate percentage |
| sku_code | str | Variant SKU code |
| variant_name | str | Variant name / option label |
| ean | str | EAN / barcode |
| size | str | Variant size |
| uom | str | Unit of measure |
| stock | float | Current stock quantity |
| price | float | Current selling price |
| regular_price | float | Regular / compare-at price |
| rrp | float | Recommended retail price |
| cost | float | Unit cost |
| currency_code | str | ISO 4217 currency code |
| price_effective_at | date | Effective date for current price |
| cost_effective_at | date | Effective date for current cost |

6.2. Transactions (full load)

df = self.data_reader.get_workbook_data(workbook_id)
Returns joined transactions + products + catalog. Columns:
| Column | Type | Description |
|---|---|---|
| item | str | Product ID |
| class | str | Catalog name |
| store | str | Store identifier |
| region | str | Region (may be null) |
| date | date | Transaction date |
| quantity | float | Units sold |
| revenue | float | Revenue amount |
| price | float | Transaction price |
| reg_price | float | Regular/list price |
| oh | float | Stock on hand |
| cost | float | Unit cost |
| promo_type | str | Promotion type (may be null) |
| zone_price | float | Zone price (may be null) |

6.3. Transactions (chunked streaming)

For large datasets, stream in chunks to avoid memory issues:
for chunk_df in self.data_reader.iter_workbook_data(workbook_id, chunksize=50_000):
    # process chunk_df (same columns as get_workbook_data)
    pass
Or raw transactions without product/catalog joins:
for chunk_df in self.data_reader.iter_transactions(workbook_id, chunksize=50_000):
    # columns: product_id, store_id, date, units_sold, revenue, price,
    #          reg_price, stock_on_hand, gross_profit, promo_type, zone_price
    pass
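For example, a per-product revenue total can be accumulated chunk by chunk instead of concatenating everything (a sketch: `iter_chunks` stands in for the iterator returned by `iter_workbook_data`, and the `item`/`revenue` columns match the schema in 6.2):

```python
import pandas as pd


def revenue_per_product(iter_chunks) -> pd.Series:
    # Accumulate per-product sums chunk by chunk; memory stays bounded
    # by the chunk size, not by the full dataset.
    totals: pd.Series | None = None
    for chunk_df in iter_chunks:
        partial = chunk_df.groupby("item")["revenue"].sum()
        totals = partial if totals is None else totals.add(partial, fill_value=0)
    return totals if totals is not None else pd.Series(dtype=float)
```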

6.4. Grid Metrics

df = self.data_reader.get_grid_product_metrics(workbook_id)
Returns per-product metrics from the grid (with dynamic column overlays):
| Column | Type | Description |
|---|---|---|
| product_id | str | Product ID |
| current_price | float | Most recent price |
| cost | float | Cost |
| quantity | float | Total quantity |
| revenue | float | Total revenue |

6.5. Transaction Metrics by Period

from datetime import date

df = self.data_reader.get_transaction_metrics_by_period(
    workbook_id=workbook_id,
    start_date=date(2025, 1, 1),
    end_date=date(2025, 12, 31),
)
Returns aggregated metrics filtered by date range:
| Column | Type | Description |
|---|---|---|
| product_id | str | Product ID |
| current_price | float | Latest price in period |
| min_price | float | Minimum price in period |
| max_price | float | Maximum price in period |
| quantity | float | Total units sold |
| revenue | float | Total revenue |
| gross_profit | float | Total gross profit (may be null) |
| gross_profit_count | int | Number of rows with gross profit data |
| unit_cost | float | Product unit cost |
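As an example of consuming this frame, a gross-margin percentage can be derived while guarding the nullable `gross_profit` and zero revenue (`add_margin_pct` and the `margin_pct` column are illustrative, not part of the SDK):

```python
import pandas as pd


def add_margin_pct(df: pd.DataFrame) -> pd.DataFrame:
    # gross_profit may be null and revenue may be zero; both cases
    # should yield NaN rather than an error or infinity.
    df = df.copy()
    revenue = pd.to_numeric(df["revenue"], errors="coerce")
    profit = pd.to_numeric(df["gross_profit"], errors="coerce")
    df["margin_pct"] = (profit / revenue.where(revenue != 0) * 100).round(2)
    return df
```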

6.6. Other Reads

# Job metadata
job_info = self.data_reader.get_job_info(job_id)  # -> dict | None

# Workbook metadata
wb_info = self.data_reader.get_workbook_info(workbook_id)  # -> dict | None

# Grid row count
count = self.data_reader.get_grid_row_count(workbook_id)  # -> int

# Existing dynamic column keys
keys = self.data_reader.get_dynamic_column_keys(workbook_id)  # -> set[str]

7. Writing Results (GridWriter)

Access via self.grid_writer. The primary write pattern is dynamic columns.

7.1. Dynamic Columns (most common pattern)

Dynamic columns are custom computed columns that appear in the user’s grid. The pattern has two steps.

Step 1 — Register the column definition:
col_def_id = self.grid_writer.create_dynamic_column(
    workbook_id=workbook_id,
    column_key="my_metric",           # Unique key (snake_case)
    column_name="My Metric",          # Display name shown to user
    column_type="numeric",            # One of: numeric, integer, string, date, datetime, boolean
    source_job_id=job.job_id,         # Links column to this job
    computation_source="my_worker",   # Identifies which worker computed this
    formatting={"precision": 2},      # Optional display formatting
    template_id=job.template_id,      # Optional template reference
)
# Returns: int (column_def_id used in step 2)
Step 2 — Write values keyed by product_id:
product_values = {
    "PROD-001": 42.5,
    "PROD-002": 18.3,
    "PROD-003": 91.7,
    # ... one entry per product
}

rows_written = self.grid_writer.bulk_write_dynamic_values(
    workbook_id=workbook_id,
    column_def_id=col_def_id,
    column_type="numeric",
    product_values=product_values,
    precision=2,                     # Optional: decimal places for rounding
)
Supported column_type values:
| Type | product_values value type | Example |
|---|---|---|
| numeric | float | {"PROD-1": 42.5} |
| integer | int | {"PROD-1": 42} |
| string | str | {"PROD-1": "High"} |
| boolean | bool | {"PROD-1": True} |
| date | str (ISO format) | {"PROD-1": "2025-01-15"} |
| datetime | str (ISO format) | {"PROD-1": "2025-01-15T10:30:00Z"} |
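A small sketch of building a correctly typed `product_values` dict from a DataFrame. The cast map is an assumption based on the table above, and `to_product_values` is an illustrative helper, not an SDK function:

```python
import pandas as pd

# Assumed mapping from column_type to a Python cast, per the table above.
# date/datetime pass through unchanged: they are expected as ISO strings.
_CASTS = {
    "numeric": float,
    "integer": int,
    "string": str,
    "boolean": bool,
    "date": str,
    "datetime": str,
}


def to_product_values(df: pd.DataFrame, value_col: str, column_type: str) -> dict:
    cast = _CASTS[column_type]
    # Drop nulls: a missing value should simply not be written.
    return {
        str(pid): cast(v)
        for pid, v in zip(df["product_id"], df[value_col])
        if pd.notna(v)
    }
```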

7.2. Ensuring Grid Exists

Before writing dynamic columns, the grid must have rows. Check and fill if needed:
grid_rows = self.data_reader.get_grid_row_count(workbook_id)
if grid_rows <= 0:
    self.grid_writer.fill_grid_from_products_transactions(workbook_id)

7.3. Analysis Summary

Store a summary of your analysis for the UI:
self.grid_writer.save_analysis_summary(
    job_id=job.job_id,
    workbook_id=job.workbook_id,
    analysis_type="my_analysis",
    summary_metrics={
        "total_products_analyzed": 150,
        "avg_score": 72.3,
        "execution_time_seconds": 12.5,
    },
    visualizations=None,  # Optional: dict of chart data
)

8. Reporting Progress

Call self.update_job_progress() throughout process_job() so the UI shows a progress bar:
self.update_job_progress(job.job_id, 10, "Loading data")
# ... work ...
self.update_job_progress(job.job_id, 40, "Processing transactions")
# ... work ...
self.update_job_progress(job.job_id, 80, "Writing results")
# ... work ...
self.update_job_progress(job.job_id, 95, "Finalizing")
  • Progress is 0–100 (integer).
  • Updates are rate-limited to 1/sec per job automatically; use force=True to bypass.
  • mark_started(), mark_completed(), mark_failed() are called automatically by the SDK.
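When one loop dominates the runtime, a convenient pattern is mapping loop position onto a fixed band of the 0–100 scale (a sketch; `banded_progress` and the band values are illustrative, not part of the SDK):

```python
def banded_progress(i: int, total: int, lo: int = 70, hi: int = 95) -> int:
    # Map item i of total onto the integer range [lo, hi], so a loop
    # can report e.g. 70..95% while earlier phases use 0..70.
    if total <= 0:
        return lo
    return lo + (hi - lo) * i // total
```

Used as, for example, `self.update_job_progress(job.job_id, banded_progress(i, len(columns)), f"Column {i + 1}/{len(columns)}")`.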

9. Error Handling

Validation errors (bad input)

Raise ValueError for permanent errors that should not be retried:
if not workbook_id:
    raise ValueError("parameters.workbook_id is required")

Computation errors

Any unhandled exception is caught by the SDK, which:
  1. Marks the job as failed with the error message
  2. Logs the full traceback
  3. Returns HTTP 500 (so Cloud Run may retry)

Returning failure explicitly

return {"status": "failed", "error_message": "Not enough data to compute"}

10. Complete Reference Example

This is a real production worker that computes historical sales metrics. Study it as your primary reference.

worker.py:
from __future__ import annotations

from datetime import date, timedelta
from typing import Any, Dict

import pandas as pd

from worker_sdk import PubSubHandler, JobMessage
from worker_sdk.constants import JobType
from worker_sdk.protocols import GridWriter, JobDataReader, JobProgressReporter


class HistoricalSalesAnalyticsWorker(PubSubHandler):

    def __init__(self, project_id, progress, data_reader, grid_writer, subscription_name=None):
        super().__init__(project_id, progress, subscription_name)
        self.data_reader = data_reader
        self.grid_writer = grid_writer

    def should_process_job(self, job: JobMessage) -> bool:
        return job.job_type == JobType.HISTORICAL_SALES_ANALYTICS

    def process_job(self, job: JobMessage) -> Dict[str, Any]:
        self.update_job_progress(job.job_id, 5, "Loading parameters", force=True)

        workbook_id = job.parameters.get("workbook_id")
        if not workbook_id:
            raise ValueError("parameters.workbook_id is required")

        columns = job.parameters.get("columns", [])
        if not columns:
            raise ValueError("columns must be a non-empty list")

        time_period = job.parameters.get("time_period", "last_30_days")
        start_date, end_date = self._period_to_range(time_period)

        # ---- Read ----
        self.update_job_progress(job.job_id, 15, "Aggregating transactions")
        metrics_df = self.data_reader.get_transaction_metrics_by_period(
            workbook_id=workbook_id,
            start_date=start_date,
            end_date=end_date,
        )

        # Ensure grid rows exist
        if self.data_reader.get_grid_row_count(workbook_id) <= 0:
            self.grid_writer.fill_grid_from_products_transactions(workbook_id)

        # ---- Compute ----
        self.update_job_progress(job.job_id, 50, "Computing metrics")
        computed = self._compute(metrics_df)

        # ---- Write ----
        self.update_job_progress(job.job_id, 70, "Saving columns")
        counts = {}
        for col_spec in columns:
            col_key = col_spec if isinstance(col_spec, str) else col_spec.get("column_id")
            if col_key not in computed.columns:
                continue
            col_def_id = self.grid_writer.create_dynamic_column(
                workbook_id=workbook_id,
                column_key=col_key,
                column_name=col_key.replace("_", " ").title(),
                column_type="numeric",
                source_job_id=job.job_id,
                computation_source=f"historical_sales_analytics:{time_period}",
                formatting={"format": "currency", "precision": 2},
            )
            values = {
                str(pid): float(v)
                for pid, v in zip(computed["product_id"], computed[col_key])
                if pd.notna(v)
            }
            counts[col_key] = self.grid_writer.bulk_write_dynamic_values(
                workbook_id=workbook_id,
                column_def_id=col_def_id,
                column_type="numeric",
                product_values=values,
                precision=2,
            )

        self.update_job_progress(job.job_id, 95, "Finalizing")
        return {"status": "completed", "columns_computed": list(counts.keys()), "counts": counts}

    def _period_to_range(self, time_period: str) -> tuple[date, date]:
        today = date.today()
        mapping = {
            "last_7_days":  timedelta(days=6),
            "last_30_days": timedelta(days=29),
            "last_90_days": timedelta(days=89),
        }
        delta = mapping.get(time_period)
        if delta:
            return today - delta, today
        raise ValueError(f"Unsupported time_period: {time_period}")

    def _compute(self, df: pd.DataFrame) -> pd.DataFrame:
        if df is None or df.empty:
            return pd.DataFrame(columns=["product_id", "current_price", "revenue", "cost"])
        df = df.copy()
        df["product_id"] = df["product_id"].astype(str)
        for c in ("current_price", "revenue", "quantity", "unit_cost"):
            if c in df.columns:
                df[c] = pd.to_numeric(df[c], errors="coerce")
        df["cost"] = df["unit_cost"].fillna(0) * df["quantity"].fillna(0)
        return df
app.py:
from worker_sdk import create_worker_app
from worker import HistoricalSalesAnalyticsWorker

app = create_worker_app(
    HistoricalSalesAnalyticsWorker,
    service_name="retailgrid-worker-historical-sales-analytics",
)
That’s it. Two files. The SDK handles everything else.

11. Testing Locally

11.1. Send a Test Job

Once your server is running (uvicorn app:app --port 8080), send a simulated Eventarc push:
# Encode your job payload as base64
JOB_JSON='{"job_id":"test-001","job_type":"my_analysis_type","workbook_id":"YOUR_WORKBOOK_UUID","parameters":{}}'
DATA_B64=$(echo -n "$JOB_JSON" | base64)

# Send the push request
curl -s -X POST http://localhost:8080/ \
  -H "Content-Type: application/json" \
  -d "{\"message\":{\"data\":\"$DATA_B64\",\"attributes\":{}}}" | python3 -m json.tool

11.2. Health Check

curl http://localhost:8080/health
# {"status":"healthy","service":"retailgrid-worker-my-worker"}

11.3. Unit Testing Your Logic

You can unit test _compute() and helper methods independently without the SDK by passing mocks for the constructor dependencies (assuming PubSubHandler.__init__ only stores its arguments):
import pandas as pd
from unittest.mock import MagicMock

from worker import MyWorker

# Build a worker with mocked dependencies: no SDK wiring required
worker = MyWorker(
    project_id="test-project",
    progress=MagicMock(),
    data_reader=MagicMock(),
    grid_writer=MagicMock(),
)

# Stub what the worker will read
worker.data_reader.get_products.return_value = pd.DataFrame({
    "product_id": ["A", "B"],
    "revenue": [100.0, 200.0],
})

# Then call helper methods (e.g. _compute) directly with test DataFrames

12. Deliverables Checklist

When your task is complete, deliver:
  • worker.py — your worker class with should_process_job() and process_job()
  • app.py — entrypoint using create_worker_app()
  • requirements.txt — add any extra dependencies you used (e.g. numpy, scikit-learn)
  • Updated Dockerfile if you need system-level packages
  • Brief notes on what parameters your worker expects in job.parameters
Validation criteria:
  • worker.py imports ONLY from worker_sdk.*, standard library, and your own extra pip packages
  • No direct SQL, no raw database access, no sqlalchemy imports in your code
  • process_job() returns a dict with "status": "completed" on success
  • Progress is reported at meaningful intervals (at least 3 updates)
  • Errors are raised early with clear ValueError messages for bad input

13. Rules & Constraints

| Rule | Why |
|---|---|
| Do not modify worker_sdk/ | It is a shared SDK. Changes break deployment. |
| Do not import from backend.* or shared.* | These packages do not exist in your container. |
| Do not write SQL | Use self.data_reader and self.grid_writer methods. |
| Do not manage DB connections | The SDK handles connection pooling and retries. |
| Always validate parameters early | Raise ValueError before doing expensive work. |
| Always report progress | The UI shows a progress bar to the user. |
| Return "status": "completed" | Or the job is recorded as failed. |
| Use self.logger | Built-in structured logger with trace correlation. |

14. FAQ

Q: Where do I get test database credentials?
A: They will be provided separately alongside your task brief. Never commit them.

Q: Can I add pip dependencies?
A: Yes. Add them to requirements.txt. Prefer well-maintained packages (pandas, numpy, scikit-learn, etc.).

Q: What if I need a new data_reader method that doesn’t exist?
A: Contact us. We can add new protocol methods to the SDK. Do not write raw SQL.

Q: Can I use async?
A: process_job() is synchronous. The SDK wraps it in a FastAPI endpoint. Heavy async I/O is not needed since data reads are handled by the SDK.

Q: How big can datasets be?
A: Workbooks can have 100K+ products and millions of transactions. Use iter_workbook_data() or iter_transactions() with chunking for large datasets. Never load everything into memory at once for large workbooks.

Q: How do I debug?
A: Use self.logger.info(...), self.logger.warning(...), self.logger.error(...). Logs are structured JSON in production. Locally they appear as normal log lines.

Q: What Python version?
A: 3.11 or 3.12. Use modern syntax (str | None, dict[str, Any], etc.).