Building a Near-Real-Time Rain Risk ML Pipeline on AWS with data.gov.sg

A practical engineering postmortem on architecture, tradeoffs, cost controls, and what I would improve next


This project is an end-to-end machine learning engineering build for station-level rain risk nowcasting. The goal is simple: generate actionable Low / Medium / High rain risk signals for outdoor event operations at short horizons (15, 30, and 60 minutes). The implementation is not simple: live API ingestion, time alignment, feature generation, model serving, orchestration, and dashboard delivery all need to work together under cost and reliability constraints.

I used data.gov.sg historical data for training and real-time weather APIs for inference.

Why this writeup exists

The objective here is to show implementation decisions and tradeoffs that matter in production-like ML systems, not just report a single accuracy number.

Problem framing and scope

Operational problem

Event teams need earlier warning when rain risk increases, so they can decide on manpower, shelter activation, and schedule adjustments. Monitoring raw weather metrics manually is reactive and hard to scale. A categorized risk output is easier for operations than raw mm rainfall numbers.

Success criteria

Project success criteria:

  1. Automate ingestion from real-time weather APIs.
  2. Build a reproducible training dataset with stable schema.
  3. Train and deploy a model that serves risk predictions continuously.
  4. Surface outputs in analytics dashboards for decision support.
  5. Keep spend controlled with explicit cost optimizations.

This is not a perfect production platform. It is a robust engineering baseline with clear next steps.

End-to-end architecture

Lake layers

I implemented a layered pipeline:

  • raw: ingested API snapshots and historical extracts.
  • curated: cleaned, aligned, and standardized station-time records.
  • gold: model-ready rows (training_rows and inference rows).
  • serving: prediction outputs for downstream dashboards.
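As a concrete illustration of the layered layout (the bucket name and prefixes here are hypothetical, not the project's actual paths), each layer can map to a `dt`-partitioned S3 prefix:

```python
from datetime import datetime, timezone

# Hypothetical layer -> S3 prefix mapping; dt partitioning enables pruning later.
LAYER_PREFIXES = {
    "raw": "raw/api_snapshots",
    "curated": "curated/station_timeseries",
    "gold": "gold/training_rows",
    "serving": "serving/risk_predictions",
}

def layer_path(layer: str, ts: datetime) -> str:
    """Return a partitioned S3 key prefix for a given layer and timestamp."""
    return f"s3://example-weather-lake/{LAYER_PREFIXES[layer]}/dt={ts:%Y-%m-%d}/"

print(layer_path("curated", datetime(2026, 2, 15, tzinfo=timezone.utc)))
# s3://example-weather-lake/curated/station_timeseries/dt=2026-02-15/
```

Partitioning every layer by date is what makes the later Athena partition-pruning and bounded-lookback optimizations cheap to apply.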

AWS services used

AWS services used:

  • S3 for lake storage across layers.
  • Glue for ETL and feature jobs.
  • Athena for SQL validation and ad-hoc analysis.
  • SageMaker Canvas for model training and endpoint deployment.
  • Step Functions + EventBridge for orchestration/scheduling.
  • QuickSight for business-facing dashboard consumption.

Design principle: train-serving consistency

The key engineering principle was train-serving consistency: inference rows should follow the same feature logic as training rows wherever possible.

Why data.gov.sg?

I intentionally used both historical and real-time data.gov.sg weather APIs because it mirrors real integration work:

  • historical backfill for training and feature sanity checks,
  • real-time snapshots for operational inference.

This highlights practical experience with public APIs: designing around inconsistent availability, null fields, and cadence mismatch while still building deterministic downstream datasets.

Development workflow: Jupyter + Jupytext + Terraform

I developed Glue jobs in Jupyter first for faster iteration and debugging, then converted notebooks to Python scripts using Jupytext, and deployed those scripts as Glue jobs using Terraform.

Why this workflow

Why:

  • Interactivity during development: quicker iteration on transformations and schema checks.
  • Faster test loops: I validate logic on a small sample dataset before full runs to reduce turnaround time and cost.
  • Script parity for deployment: no notebook-only logic trapped outside source control.
  • Infrastructure as code: repeatable deployment and easier maintenance for job updates.

Implementation flow

Workflow in practice:

Notebook -> Jupytext -> Git -> Terraform

  1. Prototype ETL or feature logic in a notebook on sampled data.
  2. Sync the notebook to a .py script via a Jupytext pre-commit hook.
  3. Commit the paired script to Git.
  4. Deploy or update Glue jobs by running terraform apply against the Terraform modules.

This keeps the iteration speed of notebooks, while the Git hooks and Terraform keep deployment consistent and low-friction.

Hard data engineering problem #1: cadence mismatch

Issue

A major issue was timestamp cadence mismatch:

  • rainfall data at 5-minute cadence,
  • some additional metrics at 1-minute cadence.

Naive joins caused dropped rows or inconsistent alignment. That breaks feature reliability.

Decision

What I did:

  • treated rainfall cadence as the anchor for alignment,
  • aligned additional metrics to rainfall timestamps during curation,
  • preserved missingness explicitly instead of forcing aggressive drops.

Tradeoff

This was a deliberate tradeoff: keep broader station coverage and model robustness, even when some metrics are missing.

```python
# %% [markdown]
# # 6) Build curated station_timeseries (rainfall as base)

# %%
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Some datasets contain duplicate (station_id, ts) entries, so dedup is needed.
def dedup_on_key(df, key_cols, order_cols=None):
    if order_cols is None:
        order_cols = [F.lit(1)]
    w = Window.partitionBy(*key_cols).orderBy(*order_cols)
    return (
        df.withColumn("_rn", F.row_number().over(w))
        .where(F.col("_rn") == 1)
        .drop("_rn")
    )

# %%
# The main design choice:
# - rainfall is the driving dataset
# - other metrics are LEFT JOINed so we keep all rainfall rows
# - missing values become NULL and are handled in Gold feature engineering later

rain_select = rain.select(
    "station_id", "ts", "dt", "station_name", "latitude", "longitude", "rainfall_mm"
)
temp_select = dedup_on_key(
    temp.select("station_id", "ts", "air_temp_c"), ["station_id", "ts"]
)
humid_select = dedup_on_key(
    humid.select("station_id", "ts", "rel_humidity_pct"), ["station_id", "ts"]
)
wind_dir_select = dedup_on_key(
    wind_dir.select("station_id", "ts", "wind_dir_deg"), ["station_id", "ts"]
)
wind_speed_select = dedup_on_key(
    wind_speed.select("station_id", "ts", "wind_speed_knots"), ["station_id", "ts"]
)

station_timeseries = (
    rain_select.join(temp_select, on=["station_id", "ts"], how="left")
    .join(humid_select, on=["station_id", "ts"], how="left")
    .join(wind_dir_select, on=["station_id", "ts"], how="left")
    .join(wind_speed_select, on=["station_id", "ts"], how="left")
)
```
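The join above assumes timestamps already share the rainfall cadence. The core alignment idea, flooring finer-grained readings onto 5-minute anchors, can be sketched framework-free (the function name is mine, not from the pipeline):

```python
from datetime import datetime, timedelta

def floor_to_cadence(ts: datetime, minutes: int = 5) -> datetime:
    """Floor a timestamp onto the rainfall anchor cadence."""
    return ts - timedelta(
        minutes=ts.minute % minutes, seconds=ts.second, microseconds=ts.microsecond
    )

# A 1-minute metric reading collapses onto the enclosing 5-minute anchor;
# downstream, one reading per (station, anchor) would be kept via dedup.
reading_ts = datetime(2026, 2, 15, 8, 43, 12)
print(floor_to_cadence(reading_ts))  # 2026-02-15 08:40:00
```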

Hard data engineering problem #2: null coverage in real-time API

Issue

In real-time snapshots, several stations had null values for non-rain fields (e.g., air temperature, humidity, wind direction, wind speed), even when rainfall was available.

This was not random processing noise. It reflected source availability differences by station/metric.

Decision

Instead of dropping these rows, I added data quality signals:

  • missing_metric_count
  • is_complete_row

Tradeoff

These fields became useful both for observability and as input signals, allowing the model to infer with partial context rather than fail hard on sparsity.

Data quality as a first-class feature

In near-real-time systems, missingness is often part of the signal. Tracking it explicitly can improve reliability and transparency.
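A minimal sketch of the quality flags (pure Python for clarity; the pipeline computes these in Spark, and the metric list here is inferred from the fields named above):

```python
# Metrics that may be null in real-time snapshots even when rainfall is present.
OPTIONAL_METRICS = ["air_temp_c", "rel_humidity_pct", "wind_dir_deg", "wind_speed_knots"]

def add_quality_flags(row: dict) -> dict:
    """Attach missingness signals instead of dropping sparse rows."""
    missing = sum(1 for m in OPTIONAL_METRICS if row.get(m) is None)
    row["missing_metric_count"] = missing
    row["is_complete_row"] = missing == 0
    return row

row = {"station_id": "S111", "rainfall_mm": 0.4, "air_temp_c": None,
       "rel_humidity_pct": 88.0, "wind_dir_deg": None, "wind_speed_knots": 6.1}
print(add_quality_flags(row)["missing_metric_count"])  # 2
```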

Feature engineering and label design

For training_rows, I engineered time-series features such as:

  • lag features (recent rainfall history),
  • rolling aggregates (short-window summaries),
  • calendar/time indicators,
  • wind-related transformations.
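The lag and rolling-aggregate ideas can be sketched over an ordered per-station series (pure Python for illustration; the actual features are built in Spark and the window sizes here are arbitrary):

```python
def lag_and_rolling(values, lag=1, window=3):
    """For each position, emit (lagged value, rolling sum over the trailing window)."""
    out = []
    for i in range(len(values)):
        lag_v = values[i - lag] if i >= lag else None          # recent history feature
        roll = sum(values[max(0, i - window + 1): i + 1])      # short-window summary
        out.append((lag_v, round(roll, 3)))
    return out

rain_5min = [0.0, 0.2, 0.0, 1.4, 0.6]  # ordered rainfall_mm for one station
print(lag_and_rolling(rain_5min))
# [(None, 0.0), (0.0, 0.2), (0.2, 0.2), (0.0, 1.6), (1.4, 2.0)]
```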

Target choice:

  • selected target: risk_class,
  • derived from future_rain_sum_mm using threshold logic:
    • < 0.2 -> Low
    • < 1.0 -> Medium
    • otherwise -> High
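The threshold logic maps directly to a small function (a sketch of the derivation; in the pipeline this runs inside the Gold feature job):

```python
def risk_class(future_rain_sum_mm: float) -> str:
    """Map the future rainfall sum (mm) to an operational risk category."""
    if future_rain_sum_mm < 0.2:
        return "Low"
    if future_rain_sum_mm < 1.0:
        return "Medium"
    return "High"

print([risk_class(x) for x in (0.0, 0.5, 3.2)])  # ['Low', 'Medium', 'High']
```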

Important leakage control:

  • future_rain_sum_mm is label-derived context and is excluded from model inputs when training for risk_class.

This choice makes outputs more operationally interpretable than a raw regression target.

Model training and evaluation tradeoffs

I trained the model in SageMaker Canvas for fast experimentation and deployment integration.

Class imbalance strategy

A known challenge was class imbalance: Low class dominates, while Medium and High are minority classes but operationally more important.

Because of this, I set F1 Macro as the optimization metric during training.

Evaluation metrics that matter

Evaluation mindset used:

  • accuracy is reported but not used as the main decision metric,
  • macro metrics to reduce majority-class distortion,
  • class-wise recall to judge whether risky cases are being captured sufficiently.

This is a practical risk-oriented evaluation approach rather than leaderboard optimization.
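Why macro averaging resists majority-class distortion is easiest to see in code. The per-class counts below are illustrative, not the project's numbers:

```python
# Illustrative (tp, fp, fn) per class -- NOT the project's actual counts.
counts = {"Low": (950, 10, 5), "Medium": (40, 12, 20), "High": (30, 4, 11)}

def macro_f1(per_class):
    """Macro-average F1: every class is weighted equally, so the dominant
    Low class cannot mask weak Medium/High performance."""
    f1s = []
    for tp, fp, fn in per_class.values():
        precision = tp / (tp + fp)
        recall = tp / (tp + fn)
        f1s.append(2 * precision * recall / (precision + recall))
    return sum(f1s) / len(f1s)

print(round(macro_f1(counts), 3))
```

Here Low alone scores F1 near 0.99, yet the macro F1 sits well below that because Medium drags it down, which is exactly the behavior a risk-oriented metric should have.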

Model performance

Overall metrics

Canvas output for the final classification model:

  • Accuracy: 97.486%
  • F1 Macro: 0.837 (~83.7%)
  • Precision Macro: 0.887 (~88.7%)
  • Recall Macro: 0.795 (~79.5%)
  • Balanced Accuracy: 0.795
  • Log Loss: 0.074

Class-wise metrics

Class-wise performance:

| Class  | Precision | Recall  | F1    |
|--------|-----------|---------|-------|
| Low    | 98.329%   | 99.541% | 0.989 |
| Medium | 78.893%   | 66.154% | 0.720 |
| High   | 88.728%   | 72.954% | 0.801 |

Interpretation

Interpretation:

  • Overall accuracy is high, but this is expected with a Low-dominant distribution.
  • The operational constraint is minority-class recall, especially for Medium and High.
  • This is why model selection focused on macro metrics and class behavior, not accuracy alone.

Training-serving runtime design

Inference pipeline cadence:

  • ingest API snapshots every 5 minutes,
  • process and infer every 15 minutes.

Why not full 5-minute end-to-end inference? Cost and runtime efficiency. Running every downstream step each 5 minutes was not the best cost-performance point, because the shortest horizon the product needs is 15 minutes, and the downstream processing itself takes 7-8 minutes per run.
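The cadence arithmetic behind this choice, as a quick sketch:

```python
# Runs per day at each cadence, and why a 5-minute inference loop cannot fit.
ingest_runs_per_day = 24 * 60 // 5    # 288 lightweight snapshot pulls
infer_runs_per_day = 24 * 60 // 15    # 96 full ETL -> feature -> inference passes
downstream_minutes = 8                # observed 7-8 min processing time per run

print(ingest_runs_per_day, infer_runs_per_day)  # 288 96
print(downstream_minutes < 15)  # True: fits the 15-minute window with headroom
print(downstream_minutes < 5)   # False: would overrun a 5-minute window
```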

I used endpoint inference (instead of batch transform) for lower-latency operational scoring loops.

Orchestration design:

  • EventBridge schedules triggers,
  • Step Functions chains ETL/feature/inference stages,
  • serving outputs are written to S3 and queryable in Athena views.

This gives a predictable, auditable pipeline lifecycle.

```sql
SELECT *
FROM "weather_ml"."serving_risk_predictions_enriched_latest"
LIMIT 10;
```

Billing and charges: what cost, why it cost, and how I optimized

I wanted this implementation to be financially defensible apart from technically working.

Billing period: 2026-02-01 to 2026-02-27 (month-to-date extract). Total cost: $308.33.

Service-level breakdown

Service-level breakdown:

| Service | Why used | Main cost driver | Optimization applied | Cost (USD) |
|---|---|---|---|---|
| SageMaker | Model training and endpoint inference | Training/hosting compute + Canvas sessions | Reused one endpoint, tuned cadence, avoided unnecessary retrains | 171.14 |
| Glue | ETL + feature pipelines + crawlers | DPU runtime hours | Right-sized workers, scoped jobs, sampled local testing before full runs | 81.63 |
| CloudWatch | Logs and metrics | Metric monitor usage + log ingestion | Kept monitoring practical, reviewed noisy metrics/log volume | 27.18 |
| S3 | Multi-layer lake storage | Storage + request volume | Partitioned lake layout and bounded read/write patterns | 2.08 |
| Secrets Manager | API key and secret handling | Secret storage + API calls | Reused secrets and avoided unnecessary rotations/API calls | 0.34 |
| Step Functions | Pipeline orchestration | State transitions | Lean state-machine design with clear stage boundaries | 0.27 |
| KMS | Encryption operations | KMS API request volume | Minimized unnecessary re-encryption paths | 0.19 |
| Athena | SQL validation and QA queries | Data scanned per query | Partition pruning + bounded lookback windows | 0.06 |
| EventBridge (CloudWatch Events) | Scheduling | Rule invocations | Consolidated schedules | 0.00 |
| QuickSight | Dashboard delivery | Reader/author/session usage | Lightweight dashboard scope during this cycle | 0.00 |

Primary cost drivers

Primary cost drivers from detailed usage types:

  • USE1-Train:ml.m5.12xlarge: $126.76 (largest single line item)
  • USE1-ETL-DPU-Hour: $61.81
  • USE1-Canvas:Session-Hrs: $23.84
  • USE1-Crawler-DPU-Hour: $16.48
  • USE1-GlueInteractiveSession-DPU-Hour: $3.35

Optimization actions

Concrete optimizations I implemented:

  1. Bounded lookback windows to avoid scanning unnecessary real-time data per run.
  2. Cadence separation (5-minute ingest, 15-minute infer) to reduce repeated heavy work.
  3. Partition-aware reads and writes in lake tables.
  4. Operational endpoint management discipline to avoid idle spend.
  5. Right-sizing Glue jobs for expected record volume in scheduled windows.
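The bounded-lookback idea (optimization 1) reduces to enumerating only the recent `dt=` partitions a run should read. A minimal sketch, with a function name of my own choosing:

```python
from datetime import datetime, timedelta, timezone

def lookback_partitions(now: datetime, hours: int) -> list:
    """Return the dt= partition values a bounded-lookback read should touch,
    so each run scans only recent data rather than the full lake."""
    start = now - timedelta(hours=hours)
    days = (now.date() - start.date()).days
    return [(start.date() + timedelta(days=i)).isoformat() for i in range(days + 1)]

now = datetime(2026, 2, 15, 1, 30, tzinfo=timezone.utc)
print(lookback_partitions(now, hours=6))  # ['2026-02-14', '2026-02-15']
```

Passing these values into partition predicates (in Glue pushdowns or Athena WHERE clauses) is what keeps both DPU hours and data-scanned charges bounded per run.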

Dashboard outcome and QuickSight limitation

I delivered a QuickSight dashboard with KPI/risk overview and station-level map views. It was sufficient for analytics and project delivery, but there are UX constraints.

QuickSight limitation (for this use case):

  • great for managed analytics and quick visualization,
  • less flexible for highly interactive operational workflows compared to a custom application.

So the dashboard is a strong analytics endpoint, but a full production UX would likely use an application layer for richer interactions, better control of workflows, and tighter decision support loops.

What I would improve next (v2 roadmap)

If I continue this project, the roadmap follows directly from the limitations above: a custom application layer for richer operational UX beyond QuickSight, improved minority-class recall (especially for Medium), and tighter automation of endpoint lifecycle and cost controls.

Engineering lessons learned

Three practical lessons stand out:

  1. Data quality problems are not edge cases in real-time ML systems; they are core design inputs.
  2. Train-serving consistency beats isolated model tuning for operational reliability.
  3. Cost and orchestration decisions are model decisions in production-like ML, because they directly shape latency, availability, and business usability.

This project gave me hands-on experience across data engineering, ML pipeline design, cloud operations, and delivery tradeoffs. The model is one component; the system is the product.

If you are building similar near-real-time ML pipelines on public data, I am happy to compare architecture and tradeoff decisions.

  • machine-learning
  • ml-engineering
  • data-engineering
  • aws
  • data-gov-sg
  • sagemaker
  • glue
  • terraform
  • timeseries