RetailGrid Worker Development Guide
Audience: External developer implementing an analytics worker. You receive: this `worker_sdk/` directory plus a task brief describing which worker to build. You deliver: a `worker.py` and `app.py` that pass the acceptance criteria from your task brief.
Table of Contents
- Architecture Overview
- What You Receive
- Quick Start
- Your Contract: Two Methods
- JobMessage — What You Receive per Job
- Reading Data (JobDataReader)
- Writing Results (GridWriter)
- Reporting Progress
- Error Handling
- Complete Reference Example
- Testing Locally
- Deliverables Checklist
- Rules & Constraints
- FAQ
1. Architecture Overview
- Your worker runs as an isolated container on Google Cloud Run.
- It receives jobs as HTTP POST requests (Eventarc push model).
- You never write SQL, touch database tables, or manage connections.
- You interact with data exclusively through protocol interfaces (`data_reader`, `grid_writer`).
- The SDK handles: Pub/Sub parsing, DB connections, job lifecycle, observability, advisory locks, retries.
2. What You Receive
The `worker_sdk/` directory. Treat it as a library: import from it, but do not modify it.
3. Quick Start
3.1. Setup
3.2. Minimal worker.py
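A minimal worker sketch. The import path and base-class layout shown here are assumptions based on this guide; check `worker_sdk` for the exact module names. The `except ImportError` branch defines stand-in stubs only so the snippet runs on its own.

```python
# worker.py — minimal worker sketch. Import path is an assumption;
# check worker_sdk for the exact layout.
from typing import Any

try:
    from worker_sdk import PubSubHandler, JobMessage  # assumed import path
except ImportError:  # stand-in stubs so this sketch runs on its own
    class JobMessage:  # type: ignore
        job_type: str
        parameters: dict

    class PubSubHandler:  # type: ignore
        pass


class MyWorker(PubSubHandler):
    def should_process_job(self, job: JobMessage) -> bool:
        # Use the job_type string from your task brief ("my_analysis" is a placeholder).
        return job.job_type == "my_analysis"

    def process_job(self, job: JobMessage) -> dict[str, Any]:
        workbook_id = job.parameters["workbook_id"]
        # ... read via self.data_reader, compute, write via self.grid_writer ...
        return {"status": "completed", "workbook_id": workbook_id}
```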
3.3. Minimal app.py
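A minimal entrypoint sketch. `create_worker_app()` is named by this guide; its exact import path is an assumption, and the `except ImportError` branch is a stand-in stub so the snippet stands alone.

```python
# app.py — entrypoint sketch. create_worker_app() is named in this guide;
# the import path shown is an assumption.
try:
    from worker import MyWorker
    from worker_sdk import create_worker_app
except ImportError:  # stand-in stubs so this sketch runs on its own
    class MyWorker:
        pass

    def create_worker_app(handler):
        return {"handler": handler}  # stand-in for the FastAPI app

app = create_worker_app(MyWorker())
```

Run it locally with `uvicorn app:app --port 8080` (the same command used in section 11.1).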
3.4. Run Locally
4. Your Contract: Two Methods
You subclass `PubSubHandler` and implement exactly two methods:
`should_process_job(self, job: JobMessage) -> bool`
Return `True` if your worker should handle this job type. Typically a one-liner; the `job_type` string will be specified in your task brief.
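The one-liner might look like this ("demand_forecast" is a placeholder; your task brief gives the real string):

```python
def should_process_job(self, job: "JobMessage") -> bool:
    # Compare against the job_type string from your task brief.
    return job.job_type == "demand_forecast"
```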
`process_job(self, job: JobMessage) -> Dict[str, Any]`
Your main logic. This is called after the SDK has already:
- Parsed the Pub/Sub message
- Acquired an advisory lock (prevents duplicate execution)
- Marked the job as `running` in the database

If the dict you return contains `"status": "failed"`, the SDK marks the job as failed. Otherwise it marks it as completed and stores your dict as `result_summary`.
If you raise an exception, the SDK catches it, marks the job as failed with the exception message, and re-raises.
5. JobMessage — What You Receive per Job
`parameters` is your primary input. It always contains at least `workbook_id` (a UUID string). Additional keys depend on the job type; your task brief will specify them.
Example parameters:
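Only `workbook_id` is guaranteed; the other keys below are hypothetical extras that a task brief might specify.

```python
# Example job.parameters payload (extra keys are illustrative only).
parameters = {
    "workbook_id": "1b2c3d4e-0000-0000-0000-000000000000",  # always present (UUID string)
    "period_days": 90,              # hypothetical job-specific key
    "store_ids": ["S001", "S002"],  # hypothetical job-specific key
}
```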
6. Reading Data (JobDataReader)
Access via `self.data_reader`. All methods are read-only and scoped by `workbook_id`.
6.1. Products (denormalized from Grid)
| Column | Type | Description |
|---|---|---|
| product_id | str | Unique product identifier |
| product_name | str | Product display name |
| manufacturer | str | Manufacturer name |
| product_group_name | str | Product group / department |
| brand | str | Brand name |
| product_status | str | Product status |
| product_type | str | Product type classification |
| catalog_1 | str | Primary catalog / class name |
| category_1 .. category_5 | str | Category hierarchy levels |
| image_url | str | Product image URL |
| link_url | str | Product page URL |
| product_create_date | date | Product creation date |
| vat | float | VAT / tax rate percentage |
| sku_code | str | Variant SKU code |
| variant_name | str | Variant name / option label |
| ean | str | EAN / barcode |
| size | str | Variant size |
| uom | str | Unit of measure |
| stock | float | Current stock quantity |
| price | float | Current selling price |
| regular_price | float | Regular / compare-at price |
| rrp | float | Recommended retail price |
| cost | float | Unit cost |
| currency_code | str | ISO 4217 currency code |
| price_effective_at | date | Effective date for current price |
| cost_effective_at | date | Effective date for current cost |
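A hedged read sketch: `get_products()` is an assumed `JobDataReader` method name (check `worker_sdk` for the real one), and rows are assumed to expose the columns above as dict keys.

```python
# Count products per manufacturer from the denormalized product rows.
def count_by_manufacturer(self, workbook_id: str) -> dict[str, int]:
    counts: dict[str, int] = {}
    for row in self.data_reader.get_products(workbook_id):  # assumed API
        counts[row["manufacturer"]] = counts.get(row["manufacturer"], 0) + 1
    return counts
```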
6.2. Transactions (full load)
| Column | Type | Description |
|---|---|---|
| item | str | Product ID |
| class | str | Catalog name |
| store | str | Store identifier |
| region | str | Region (may be null) |
| date | date | Transaction date |
| quantity | float | Units sold |
| revenue | float | Revenue amount |
| price | float | Transaction price |
| reg_price | float | Regular / list price |
| oh | float | Stock on hand |
| cost | float | Unit cost |
| promo_type | str | Promotion type (may be null) |
| zone_price | float | Zone price (may be null) |
6.3. Transactions (chunked streaming)
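A hedged chunked-read sketch: `iter_transactions()` is named in the FAQ, but the `chunk_size` keyword and the shape of each chunk are assumptions.

```python
# Accumulate revenue chunk by chunk instead of loading all rows at once.
def total_revenue(self, workbook_id: str) -> float:
    total = 0.0
    for chunk in self.data_reader.iter_transactions(workbook_id, chunk_size=50_000):
        total += sum(row["revenue"] for row in chunk)  # each chunk is a batch of rows
    return total
```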
For large datasets, stream in chunks to avoid memory issues.

6.4. Grid Metrics
| Column | Type | Description |
|---|---|---|
| product_id | str | Product ID |
| current_price | float | Most recent price |
| cost | float | Cost |
| quantity | float | Total quantity |
| revenue | float | Total revenue |
6.5. Transaction Metrics by Period
| Column | Type | Description |
|---|---|---|
| product_id | str | Product ID |
| current_price | float | Latest price in period |
| min_price | float | Minimum price in period |
| max_price | float | Maximum price in period |
| quantity | float | Total units sold |
| revenue | float | Total revenue |
| gross_profit | float | Total gross profit (may be null) |
| gross_profit_count | int | Number of rows with gross profit data |
| unit_cost | float | Product unit cost |
6.6. Other Reads
7. Writing Results (GridWriter)
Access via `self.grid_writer`. The primary write pattern is dynamic columns.
7.1. Dynamic Columns (most common pattern)
Dynamic columns are custom computed columns that appear in the user's grid. The pattern has two steps: first register the column definition, then write per-product values. Supported `column_type` values:
| Type | product_values value type | Example |
|---|---|---|
| numeric | float | {"PROD-1": 42.5} |
| integer | int | {"PROD-1": 42} |
| string | str | {"PROD-1": "High"} |
| boolean | bool | {"PROD-1": True} |
| date | str (ISO format) | {"PROD-1": "2025-01-15"} |
| datetime | str (ISO format) | {"PROD-1": "2025-01-15T10:30:00Z"} |
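A hedged two-step sketch: `register_dynamic_column()` and `set_dynamic_column_values()` are assumed `GridWriter` method names, and the `demand_score` column key is illustrative; check `worker_sdk` for the real API.

```python
def write_demand_scores(self, workbook_id: str, scores: dict[str, float]) -> None:
    # Step 1: register the column definition.
    self.grid_writer.register_dynamic_column(
        workbook_id=workbook_id,
        column_key="demand_score",  # hypothetical column identifier
        column_type="numeric",      # one of the types in the table above
    )
    # Step 2: write per-product values keyed by product_id.
    self.grid_writer.set_dynamic_column_values(
        workbook_id=workbook_id,
        column_key="demand_score",
        product_values=scores,      # e.g. {"PROD-1": 42.5}
    )
```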
7.2. Ensuring Grid Exists
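Before writing dynamic columns the grid needs rows. A hedged check-and-fill sketch (`grid_row_count()` and `fill_grid_from_products()` are assumed method names, not confirmed SDK API):

```python
def ensure_grid_rows(self, workbook_id: str) -> None:
    # Assumed GridWriter methods; check worker_sdk for the real API.
    if self.grid_writer.grid_row_count(workbook_id) == 0:
        self.grid_writer.fill_grid_from_products(workbook_id)
```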
The grid must have rows before you write dynamic columns: check and fill if needed.

7.3. Analysis Summary
Store a summary of your analysis for the UI.

8. Reporting Progress
Call `self.update_job_progress()` throughout `process_job()` so the UI shows a progress bar:
- Progress is 0–100 (integer).
- Updates are rate-limited to 1/sec per job automatically; use `force=True` to bypass.
- `mark_started()`, `mark_completed()`, `mark_failed()` are called automatically by the SDK.
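A progress-reporting sketch. `update_job_progress()` and its `force` keyword are documented above; passing the percentage positionally is an assumption.

```python
def process_job(self, job):
    self.update_job_progress(5)                # after validating parameters
    # ... load data ...
    self.update_job_progress(40)
    # ... compute ...
    self.update_job_progress(80)
    # ... write results ...
    self.update_job_progress(100, force=True)  # bypass the 1/sec rate limit
    return {"status": "completed"}
```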
9. Error Handling
Validation errors (bad input)
Raise `ValueError` for permanent errors that should not be retried:
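For example, failing fast before any expensive reads (a standalone helper; the shape of this validation is illustrative):

```python
def validate_parameters(params: dict) -> str:
    # Permanent input errors: raise ValueError so the job is not retried.
    workbook_id = params.get("workbook_id")
    if not workbook_id:
        raise ValueError("missing required parameter: workbook_id")
    return workbook_id
```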
Computation errors
Any unhandled exception is caught by the SDK, which:
- Marks the job as `failed` with the error message
- Logs the full traceback
- Returns HTTP 500 (so Cloud Run may retry)
Returning failure explicitly
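A hedged sketch of the explicit-failure path: returning `"status": "failed"` makes the SDK record the job as failed (per section 4). The precondition and the extra `"reason"` key are illustrative.

```python
def process_job(self, job):
    if not job.parameters.get("store_ids"):  # hypothetical precondition
        return {"status": "failed", "reason": "no stores configured"}
    # ... normal processing ...
    return {"status": "completed"}
```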
10. Complete Reference Example
This is a real production worker that computes historical sales metrics. Study it as your primary reference.

worker.py:
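The listing below is a hedged sketch of such a worker's overall shape, not the verbatim production code: reader/writer method names beyond those documented in this guide are assumptions, and the `except ImportError` stubs exist only so the sketch runs on its own.

```python
# worker.py sketch — validate, stream reads, compute, write a dynamic
# column, and report progress along the way.
from typing import Any

try:
    from worker_sdk import PubSubHandler, JobMessage  # assumed import path
except ImportError:  # stand-in stubs so this sketch runs on its own
    class JobMessage:  # type: ignore
        job_type: str
        parameters: dict

    class PubSubHandler:  # type: ignore
        def update_job_progress(self, progress: int, force: bool = False) -> None:
            pass


class HistoricalSalesWorker(PubSubHandler):
    def should_process_job(self, job: JobMessage) -> bool:
        return job.job_type == "historical_sales_metrics"  # placeholder job_type

    def process_job(self, job: JobMessage) -> dict[str, Any]:
        workbook_id = job.parameters.get("workbook_id")
        if not workbook_id:
            raise ValueError("missing required parameter: workbook_id")
        self.update_job_progress(5)

        metrics = self._compute(workbook_id)
        self.update_job_progress(70)

        # grid_writer method names here are assumptions, not confirmed API.
        self.grid_writer.register_dynamic_column(
            workbook_id=workbook_id, column_key="hist_revenue", column_type="numeric"
        )
        self.grid_writer.set_dynamic_column_values(
            workbook_id=workbook_id, column_key="hist_revenue", product_values=metrics
        )
        self.update_job_progress(100, force=True)
        return {"status": "completed", "products_scored": len(metrics)}

    def _compute(self, workbook_id: str) -> dict[str, float]:
        # Sum revenue per product from chunked transaction reads
        # (iter_transactions is named in the FAQ; chunk shape is assumed).
        totals: dict[str, float] = {}
        for chunk in self.data_reader.iter_transactions(workbook_id, chunk_size=50_000):
            for row in chunk:
                totals[row["item"]] = totals.get(row["item"], 0.0) + row["revenue"]
        return totals
```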
app.py:
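The matching entrypoint, same shape as the Quick Start one. `create_worker_app()` is named by this guide; the import path is an assumption, and the `except ImportError` branch is a stand-in stub.

```python
# app.py — entrypoint sketch for the reference worker.
try:
    from worker import HistoricalSalesWorker
    from worker_sdk import create_worker_app
except ImportError:  # stand-in stubs so this sketch runs on its own
    class HistoricalSalesWorker:
        pass

    def create_worker_app(handler):
        return {"handler": handler}  # stand-in for the FastAPI app

app = create_worker_app(HistoricalSalesWorker())
```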
11. Testing Locally
11.1. Send a Test Job
Once your server is running (`uvicorn app:app --port 8080`), send a simulated Eventarc push:
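A hedged sketch of building the push payload: the standard Eventarc/Pub/Sub push envelope wraps a base64-encoded JSON body under `message.data`. The job fields shown and the `/` endpoint are assumptions; check `worker_sdk` for the exact schema it expects.

```python
import base64
import json

job = {
    "job_type": "historical_sales_metrics",  # placeholder job_type
    "parameters": {"workbook_id": "00000000-0000-0000-0000-000000000000"},
}
envelope = {
    "message": {
        "data": base64.b64encode(json.dumps(job).encode("utf-8")).decode("ascii"),
        "messageId": "1",
    },
    "subscription": "projects/test/subscriptions/test",
}
print(json.dumps(envelope, indent=2))
# Save as payload.json and POST it:
#   curl -X POST http://localhost:8080/ -H "Content-Type: application/json" -d @payload.json
```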
11.2. Health Check
11.3. Unit Testing Your Logic
You can unit test `_compute()` and helper methods independently, without the SDK:
12. Deliverables Checklist
When your task is complete, deliver:
- `worker.py`: your worker class with `should_process_job()` and `process_job()`
- `app.py`: entrypoint using `create_worker_app()`
- `requirements.txt`: add any extra dependencies you used (e.g. `numpy`, `scikit-learn`)
- An updated `Dockerfile` if you need system-level packages
- Brief notes on what parameters your worker expects in `job.parameters`

Before delivering, verify:
- `worker.py` imports ONLY from `worker_sdk.*`, the standard library, and your own extra pip packages
- No direct SQL, no raw database access, no `sqlalchemy` imports in your code
- `process_job()` returns a dict with `"status": "completed"` on success
- Progress is reported at meaningful intervals (at least 3 updates)
- Errors are raised early with clear `ValueError` messages for bad input
13. Rules & Constraints
| Rule | Why |
|---|---|
| Do not modify `worker_sdk/` | It is a shared SDK. Changes break deployment. |
| Do not import from `backend.*` or `shared.*` | These packages do not exist in your container. |
| Do not write SQL | Use `self.data_reader` and `self.grid_writer` methods. |
| Do not manage DB connections | The SDK handles connection pooling and retries. |
| Always validate parameters early | Raise `ValueError` before doing expensive work. |
| Always report progress | The UI shows a progress bar to the user. |
| Return `"status": "completed"` | Otherwise the job is recorded as failed. |
| Use `self.logger` | Built-in structured logger with trace correlation. |
14. FAQ
Q: Where do I get test database credentials?
A: They will be provided separately alongside your task brief. Never commit them.
Q: Can I add pip dependencies?
A: Yes. Add them to `requirements.txt`. Prefer well-maintained packages (pandas, numpy, scikit-learn, etc.).
Q: What if I need a new data_reader method that doesn’t exist?
A: Contact us. We can add new protocol methods to the SDK. Do not write raw SQL.
Q: Can I use async?
A: process_job() is synchronous. The SDK wraps it in a FastAPI endpoint. Heavy async I/O is not needed since data reads are handled by the SDK.
Q: How big can datasets be?
A: Workbooks can have 100K+ products and millions of transactions. Use iter_workbook_data() or iter_transactions() with chunking for large datasets. Never load everything into memory at once for large workbooks.
Q: How do I debug?
A: Use self.logger.info(...), self.logger.warning(...), self.logger.error(...). Logs are structured JSON in production. Locally they appear as normal log lines.
Q: What Python version?
A: 3.11 or 3.12. Use modern syntax (str | None, dict[str, Any], etc.).