Building a Near-Real-Time Rain Risk ML Pipeline on AWS with data.gov.sg
A practical engineering postmortem on architecture, tradeoffs, cost controls, and what I would improve next
This project is an end-to-end machine learning engineering build for station-level rain risk nowcasting. The goal is simple: generate actionable Low / Medium / High rain risk signals for outdoor event operations at short horizons (15, 30, and 60 minutes). The implementation is not simple: live API ingestion, time alignment, feature generation, model serving, orchestration, and dashboard delivery all need to work together under cost and reliability constraints.
I used data.gov.sg historical data for training and real-time weather APIs for inference.
Sources:
- Historical datasets curated for training (2024 backfill for feature and label generation):
- Real-time API used for near-real-time inference cycles:
Why this writeup exists: The objective here is to show the implementation decisions and tradeoffs that matter in production-like ML systems, not just to report a single accuracy number.
Problem framing and scope
Operational problem
Event teams need earlier warning when rain risk increases, so they can decide on manpower, shelter activation, and schedule adjustments. Monitoring raw weather metrics manually is reactive and hard to scale. A categorized risk output is easier for operations than raw mm rainfall numbers.
Success criteria
Project success criteria:
- Automate ingestion from real-time weather APIs.
- Build a reproducible training dataset with stable schema.
- Train and deploy a model that serves risk predictions continuously.
- Surface outputs in analytics dashboards for decision support.
- Keep spend controlled with explicit cost optimizations.
This is not a perfect production platform. It is a robust engineering baseline with clear next steps.
End-to-end architecture
Lake layers
I implemented a layered pipeline:
- raw: ingested API snapshots and historical extracts.
- curated: cleaned, aligned, and standardized station-time records.
- gold: model-ready rows (training_rows and inference rows).
- serving: prediction outputs for downstream dashboards.
AWS services used
AWS services used:
- S3 for lake storage across layers.
- Glue for ETL and feature jobs.
- Athena for SQL validation and ad-hoc analysis.
- SageMaker Canvas for model training and endpoint deployment.
- Step Functions + EventBridge for orchestration/scheduling.
- QuickSight for business-facing dashboard consumption.
Design principle: train-serving consistency
The key engineering principle was train-serving consistency: inference rows should follow the same feature logic as training rows wherever possible.
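As a minimal sketch of what this looks like in practice, a single shared feature function can be imported by both the training-row builder and the inference-row builder, so the logic cannot drift between the two paths. Function and field names here are illustrative, not the project's actual module:

```python
# Hypothetical sketch: one shared feature function used by BOTH the
# training-row builder and the inference-row builder, so feature logic
# is defined exactly once.
def build_features(record: dict) -> dict:
    """Derive model features from one curated station-time record."""
    rainfall = record.get("rainfall_mm")
    wind_knots = record.get("wind_speed_knots")
    return {
        "rainfall_mm": rainfall,
        "is_raining": int(rainfall is not None and rainfall > 0.0),
        # Unit conversion lives in one place, shared by both paths.
        "wind_speed_ms": None if wind_knots is None else wind_knots * 0.514444,
    }

# Training path and inference path both call the same function:
train_row = build_features({"rainfall_mm": 1.2, "wind_speed_knots": 10.0})
infer_row = build_features({"rainfall_mm": 0.0, "wind_speed_knots": None})
```

The payoff is that any fix to feature logic automatically applies to both datasets, instead of being patched in one place and forgotten in the other.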
Why data.gov.sg?
I intentionally used both historical and real-time data.gov.sg weather APIs because it mirrors real integration work:
- historical backfill for training and feature sanity checks,
- real-time snapshots for operational inference.
This highlights practical experience with public APIs: designing around inconsistent availability, null fields, and cadence mismatch while still building deterministic downstream datasets.
Development workflow: Jupyter + Jupytext + Terraform
I developed Glue jobs in Jupyter first for faster iteration and debugging, then converted notebooks to Python scripts using Jupytext, and deployed those scripts as Glue jobs using Terraform.
Why this workflow
Why:
- Interactivity during development: quicker iteration on transformations and schema checks.
- Faster test loops: I validate logic on a small sample dataset before full runs to reduce turnaround time and cost.
- Script parity for deployment: no notebook-only logic trapped outside source control.
- Infrastructure as code: repeatable deployment and easier maintenance for job updates.
Implementation flow
Workflow in practice:
Notebook -> Jupytext -> Git -> Terraform
- Prototype ETL or feature logic in notebook on sampled data.
- Sync the notebook to .py via Jupytext as a pre-commit hook.
- Commit changes.
- Deploy/update Glue jobs with Terraform modules by running terraform apply.
This keeps notebook-speed iteration during development, while the git hooks and Terraform keep deployment simple and reproducible.
Hard data engineering problem #1: cadence mismatch
Issue
A major issue was timestamp cadence mismatch:
- rainfall data at 5-minute cadence,
- some additional metrics at 1-minute cadence.
Naive joins caused dropped rows or inconsistent alignment. That breaks feature reliability.
Decision
What I did:
- treated rainfall cadence as the anchor for alignment,
- aligned additional metrics to rainfall timestamps during curation,
- preserved missingness explicitly instead of forcing aggressive drops.
Tradeoff
This was a deliberate tradeoff: keep broader station coverage and model robustness, even when some metrics are missing.
```python
# %% [markdown]
# # 6) Build curated station_timeseries (rainfall as base)

# %%
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Some of the datasets contain duplicate entries on the key, so dedupe first.
def dedup_on_key(df, key_cols, order_cols=None):
    if order_cols is None:
        order_cols = [F.lit(1)]
    w = Window.partitionBy(*key_cols).orderBy(*order_cols)
    return (
        df.withColumn("_rn", F.row_number().over(w))
        .where(F.col("_rn") == 1)
        .drop("_rn")
    )

# %%
# This is the main design choice:
# - rainfall is the driving dataset
# - we LEFT JOIN other metrics so we keep all rainfall rows
# - missing values become NULL and can be handled in Gold feature engineering later
rain_select = rain.select(
    "station_id", "ts", "dt", "station_name", "latitude", "longitude", "rainfall_mm"
)
temp_select = dedup_on_key(
    temp.select("station_id", "ts", "air_temp_c"), ["station_id", "ts"]
)
humid_select = dedup_on_key(
    humid.select("station_id", "ts", "rel_humidity_pct"), ["station_id", "ts"]
)
wind_dir_select = dedup_on_key(
    wind_dir.select("station_id", "ts", "wind_dir_deg"), ["station_id", "ts"]
)
wind_speed_select = dedup_on_key(
    wind_speed.select("station_id", "ts", "wind_speed_knots"), ["station_id", "ts"]
)
station_timeseries = (
    rain_select.join(temp_select, on=["station_id", "ts"], how="left")
    .join(humid_select, on=["station_id", "ts"], how="left")
    .join(wind_dir_select, on=["station_id", "ts"], how="left")
    .join(wind_speed_select, on=["station_id", "ts"], how="left")
)
```
Hard data engineering problem #2: null coverage in real-time API
Issue
In real-time snapshots, several stations had null values for non-rain fields (e.g., air temperature, humidity, wind direction, wind speed), even when rainfall was available.
This was not random processing noise. It reflected source availability differences by station/metric.
Decision
Instead of dropping these rows, I added data quality signals:
- missing_metric_count
- is_complete_row
Tradeoff
These fields became useful both for observability and as input signals, allowing the model to infer with partial context rather than fail hard on sparsity.
Data quality as a first-class feature: In near-real-time systems, missingness is often part of the signal. Tracking it explicitly can improve reliability and transparency.
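A minimal pure-Python sketch of how these two signals can be derived per row; the optional-metric list follows the non-rain fields mentioned above, and the exact schema is an assumption:

```python
# Optional metrics whose absence we want to count, per the post's curated schema.
OPTIONAL_METRICS = ["air_temp_c", "rel_humidity_pct", "wind_dir_deg", "wind_speed_knots"]

def add_quality_flags(row: dict) -> dict:
    """Attach missing_metric_count and is_complete_row to one record."""
    missing = sum(1 for m in OPTIONAL_METRICS if row.get(m) is None)
    out = dict(row)
    out["missing_metric_count"] = missing
    out["is_complete_row"] = int(missing == 0)
    return out

# A station with rainfall present but humidity and wind direction null:
flagged = add_quality_flags(
    {"station_id": "S100", "rainfall_mm": 0.4, "air_temp_c": 27.1,
     "rel_humidity_pct": None, "wind_dir_deg": None, "wind_speed_knots": 4.0}
)
# flagged["missing_metric_count"] == 2, flagged["is_complete_row"] == 0
```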
Feature engineering and label design
For training_rows, I engineered time-series features such as:
- lag features (recent rainfall history),
- rolling aggregates (short-window summaries),
- calendar/time indicators,
- wind-related transformations.
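A hedged pandas sketch of the first three feature families; window sizes and column names here are illustrative, not the exact training_rows schema:

```python
import pandas as pd

# Toy per-station series at the 5-minute rainfall cadence.
df = pd.DataFrame({
    "station_id": ["S1"] * 5,
    "ts": pd.date_range("2024-06-01 00:00", periods=5, freq="5min"),
    "rainfall_mm": [0.0, 0.2, 1.0, 0.0, 0.4],
})

g = df.sort_values(["station_id", "ts"]).groupby("station_id")["rainfall_mm"]
df["rain_lag_1"] = g.shift(1)  # lag feature: previous 5-minute reading
df["rain_roll_15m"] = (        # rolling aggregate: 3-step (15-minute) sum
    g.rolling(3, min_periods=1).sum().reset_index(level=0, drop=True)
)
df["hour"] = df["ts"].dt.hour  # calendar/time indicator
```

Grouping by station before shifting/rolling matters: it prevents one station's history from leaking into another station's features.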
Target choice:
- selected target: risk_class,
- derived from future_rain_sum_mm using threshold logic: < 0.2 -> Low, < 1.0 -> Medium, otherwise -> High.
Important leakage control: future_rain_sum_mm is label-derived context and is excluded from model inputs when training for risk_class.
This choice makes outputs more operationally interpretable than a raw regression target.
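The threshold logic above can be written directly as a small function (the function name is illustrative):

```python
# Threshold mapping from the post: < 0.2 mm -> Low, < 1.0 mm -> Medium,
# otherwise High.
def risk_class(future_rain_sum_mm: float) -> str:
    if future_rain_sum_mm < 0.2:
        return "Low"
    if future_rain_sum_mm < 1.0:
        return "Medium"
    return "High"

# risk_class(0.0) -> "Low"; risk_class(0.5) -> "Medium"; risk_class(2.3) -> "High"
```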
Model training and evaluation tradeoffs
I trained the model in SageMaker Canvas for fast experimentation and deployment integration.
Class imbalance strategy
A known challenge was class imbalance: Low class dominates, while Medium and High are minority classes but operationally more important.
Because of this, I set F1 Macro as the optimization metric during training.
Evaluation metrics that matter
Evaluation mindset used:
- accuracy is reported but not used as the main decision metric,
- macro metrics to reduce majority-class distortion,
- class-wise recall to judge whether risky cases are being captured sufficiently.
This is a practical risk-oriented evaluation approach rather than leaderboard optimization.
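A tiny worked example of why macro metrics matter here, with illustrative numbers rather than the project's data: a model that predicts Low for everything still scores high accuracy on a Low-dominant distribution, while macro recall exposes the missed risky cases.

```python
# Macro recall: per-class recall averaged with EQUAL weight per class,
# rather than weighted by class frequency.
def macro_recall(y_true, y_pred):
    classes = sorted(set(y_true))
    recalls = []
    for c in classes:
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == c and p == c)
        support = sum(1 for t in y_true if t == c)
        recalls.append(tp / support)
    return sum(recalls) / len(classes)

# 8 Low cases predicted correctly, but both High cases missed:
y_true = ["Low"] * 8 + ["High"] * 2
y_pred = ["Low"] * 10
# accuracy = 0.8, yet macro recall = (1.0 + 0.0) / 2 = 0.5
```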
Model performance
Overall metrics
Canvas output for the final classification model:
- Accuracy: 97.486%
- F1 Macro: 0.837 (~83.7%)
- Precision Macro: 0.887 (~88.7%)
- Recall Macro: 0.795 (~79.5%)
- Balanced Accuracy: 0.795
- Log Loss: 0.074
Class-wise metrics
Class-wise performance:
| Class | Precision | Recall | F1 |
|---|---|---|---|
| Low | 98.329% | 99.541% | 0.989 |
| Medium | 78.893% | 66.154% | 0.720 |
| High | 88.728% | 72.954% | 0.801 |
Interpretation
Interpretation:
- Overall accuracy is high, but this is expected with a Low-dominant distribution.
- The operational constraint is minority-class recall, especially for Medium and High.
- This is why model selection focused on macro metrics and class behavior, not accuracy alone.
Training-serving runtime design
Inference pipeline cadence:
- ingest API snapshots every 5 minutes,
- process and infer every 15 minutes.
Why not full 5-minute end-to-end inference? Cost and runtime efficiency. Running all downstream steps every 5 minutes was not the best cost-performance point: the minimum horizon needed is 15 minutes, and the downstream data processing itself took 7-8 minutes per run.
I used endpoint inference (instead of batch transform) for lower-latency operational scoring loops.
Orchestration design:
- EventBridge schedules triggers,
- Step Functions chains ETL/feature/inference stages,
- serving outputs are written to S3 and queryable in Athena views.
This gives a predictable, auditable pipeline lifecycle.
```sql
SELECT *
FROM "weather_ml"."serving_risk_predictions_enriched_latest"
LIMIT 10;
```
Billing and charges: what it cost, why it cost, and how I optimized
I wanted this implementation to be financially defensible, not just technically working.
Billing period: 2026-02-01 to 2026-02-27 (month-to-date extract). Total cost: $308.33.
Service-level breakdown
Service-level breakdown:
| Service | Why used | Main cost driver | Optimization applied | Cost (USD) |
|---|---|---|---|---|
| SageMaker | Model training and endpoint inference | Training/hosting compute + Canvas sessions | Reused one endpoint, tuned cadence, avoided unnecessary retrains | 171.14 |
| Glue | ETL + feature pipelines + crawlers | DPU runtime hours | Right-sized workers, scoped jobs, sampled local testing before full runs | 81.63 |
| CloudWatch | Logs and metrics | Metric monitor usage + log ingestion | Kept monitoring practical, reviewed noisy metrics/log volume | 27.18 |
| S3 | Multi-layer lake storage | Storage + request volume | Partitioned lake layout and bounded read/write patterns | 2.08 |
| Secrets Manager | API key and secret handling | Secret storage + API calls | Reused secrets and avoided unnecessary rotations/API calls | 0.34 |
| Step Functions | Pipeline orchestration | State transitions | Lean state-machine design with clear stage boundaries | 0.27 |
| KMS | Encryption operations | KMS API request volume | Minimized unnecessary re-encryption paths | 0.19 |
| Athena | SQL validation and QA queries | Data scanned per query | Partition pruning + bounded lookback windows | 0.06 |
| EventBridge (CloudWatch Events) | Scheduling | Rule invocations | Consolidated schedules | 0.00 |
| QuickSight | Dashboard delivery | Reader/author/session usage | Lightweight dashboard scope during this cycle | 0.00 |
Primary cost drivers
Primary cost drivers from detailed usage types:
- USE1-Train:ml.m5.12xlarge: $126.76 (largest single line item)
- USE1-ETL-DPU-Hour: $61.81
- USE1-Canvas:Session-Hrs: $23.84
- USE1-Crawler-DPU-Hour: $16.48
- USE1-GlueInteractiveSession-DPU-Hour: $3.35
Optimization actions
Concrete optimizations I implemented:
- Bounded lookback windows to avoid scanning unnecessary real-time data per run.
- Cadence separation (5-minute ingest, 15-minute infer) to reduce repeated heavy work.
- Partition-aware reads and writes in lake tables.
- Operational endpoint management discipline to avoid idle spend.
- Right-sizing Glue jobs for expected record volume in scheduled windows.
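As an illustration of the bounded-lookback idea, each scheduled run can enumerate only the partitions inside its window instead of scanning the full real-time lake. The 60-minute bound and the dt=/hour= partition layout are assumptions for the sketch, not the project's exact settings:

```python
from datetime import datetime, timedelta, timezone

# Enumerate hourly partition prefixes covering the lookback window,
# so a run reads only those paths (and Athena can prune the rest).
def lookback_partitions(now: datetime, minutes: int = 60) -> list:
    start = now - timedelta(minutes=minutes)
    hours = []
    cursor = start.replace(minute=0, second=0, microsecond=0)
    while cursor <= now:
        hours.append(cursor.strftime("dt=%Y-%m-%d/hour=%H"))
        cursor += timedelta(hours=1)
    return hours

parts = lookback_partitions(datetime(2026, 2, 15, 10, 30, tzinfo=timezone.utc))
# -> ["dt=2026-02-15/hour=09", "dt=2026-02-15/hour=10"]
```

The same window bound applies on the write side, which keeps per-run read/write volume roughly constant regardless of how long the pipeline has been running.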
Dashboard outcome and QuickSight limitation
I delivered a QuickSight dashboard with KPI/risk overview and station-level map views. It was sufficient for analytics and project delivery, but there are UX constraints.
QuickSight limitation (for this use case):
- great for managed analytics and quick visualization,
- less flexible for highly interactive operational workflows compared to a custom application.
So the dashboard is a strong analytics endpoint, but a full production UX would likely use an application layer for richer interactions, better control of workflows, and tighter decision support loops.
What I would improve next (v2 roadmap)
If I continue this project, the roadmap is:
Engineering lessons learned
Three practical lessons stand out:
- Data quality problems are not edge cases in real-time ML systems; they are core design inputs.
- Train-serving consistency beats isolated model tuning for operational reliability.
- Cost and orchestration decisions are model decisions in production-like ML, because they directly shape latency, availability, and business usability.
This project gave me hands-on experience across data engineering, ML pipeline design, cloud operations, and delivery tradeoffs. The model is one component; the system is the product.
If you are building similar near-real-time ML pipelines on public data, I am happy to compare architecture and tradeoff decisions.