Build a simple ETL (Extract, Transform, Load) pipeline. Implement three stages: extract data from a source (e.g., list of raw records), transform it (clean, normalize, or compute features), and load the results into a structured output.
def extract(raw_data: list[dict]) -> list[dict]:
    """Return the dict entries of *raw_data*, in their original order.

    Every entry that is not a ``dict`` instance (``None`` included) is
    dropped. The surviving dicts are the same objects that were passed in,
    not copies.
    """
    # isinstance() is already False for None, so one check covers both cases.
    return [entry for entry in raw_data if isinstance(entry, dict)]
def transform(records: list[dict], key_map: dict = None,
              numeric_fields: list[str] = None) -> list[dict]:
    """Rename keys, coerce numeric fields to float, and normalize strings.

    Args:
        records: Dicts to transform. They are not modified; a new dict is
            built for each one.
        key_map: Optional mapping of old key -> new key. Keys that are not
            in the mapping keep their name.
        numeric_fields: Optional field names (as they appear *after*
            renaming) whose values are converted with ``float()``. A value
            that cannot be converted becomes ``0.0``. A single bare string
            is treated as one field name.

    Returns:
        A new list of new dicts. String values outside the numeric fields
        are stripped and lowercased; every other value passes through
        unchanged.
    """
    renames = key_map or {}
    # A bare string would otherwise be matched per character / substring.
    if isinstance(numeric_fields, str):
        numeric_fields = [numeric_fields]
    # Built once so the per-key membership test is a hash lookup.
    numeric = frozenset(numeric_fields or ())

    transformed = []
    for record in records:
        new_record = {}
        for key, value in record.items():
            new_key = renames.get(key, key)
            if new_key in numeric:
                try:
                    value = float(value)
                except (ValueError, TypeError, OverflowError):
                    # OverflowError: ints too large for a float, e.g. 10**400.
                    # Best-effort policy: unconvertible values fall back to 0.0.
                    value = 0.0
            elif isinstance(value, str):
                value = value.strip().lower()
            new_record[new_key] = value
        transformed.append(new_record)
    return transformed
def load(records: list[dict]) -> dict:
    """Package *records* into a summary dict.

    Returns:
        A dict with three entries:
        ``"data"``    - the *records* list itself (not a copy),
        ``"count"``   - the number of records,
        ``"columns"`` - every key found in any record, in first-seen order
        (an empty list when there are no records).
    """
    # dict.fromkeys de-duplicates while keeping insertion order, so keys that
    # first appear in a later record are still reported as columns.
    columns = list(dict.fromkeys(key for record in records for key in record))
    return {
        "data": records,
        "count": len(records),
        "columns": columns,
    }
def etl_pipeline(raw_data: list[dict], key_map: dict = None,
                 numeric_fields: list[str] = None) -> dict:
    """Run the three ETL stages in order: extract, transform, load.

    Args:
        raw_data: Raw input entries, passed to ``extract``.
        key_map: Optional key-renaming mapping, forwarded to ``transform``.
        numeric_fields: Optional field names to convert to float, forwarded
            to ``transform``.

    Returns:
        The dict produced by ``load`` for the transformed records.
    """
    extracted = extract(raw_data)
    transformed = transform(extracted, key_map, numeric_fields)
    return load(transformed)


# Stage summary (recovered from text that was fused onto the return line):
# - extract drops None values and non-dict entries from the raw input.
# - transform renames keys using key_map, converts the specified fields to
#   floats, and normalizes strings (strip whitespace, lowercase).
# - The etl_pipeline function chains the three stages together.