Build a Simple ETL Pipeline (MLOps)

#187 · MLOps · Medium

⊣ Solve on deep-ml.com

Problem

Build a simple ETL (Extract, Transform, Load) pipeline. Implement three stages: extract data from a source (e.g., list of raw records), transform it (clean, normalize, or compute features), and load the results into a structured output.

Solution

def extract(raw_data: list[dict]) -> list[dict]:
    """Keep only dict records; None and other non-dict entries are dropped."""
    # isinstance() is already False for None, so no separate None check is needed.
    return [record for record in raw_data if isinstance(record, dict)]

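def transform(records: list[dict], key_map: dict = None,
              numeric_fields: list[str] = None) -> list[dict]:
    """Rename keys, coerce numeric fields to float, and normalize strings."""
    # A set makes each membership test O(1) instead of scanning the list.
    numeric = set(numeric_fields) if numeric_fields else set()
    transformed = []
    for record in records:
        new_record = {}
        for k, v in record.items():
            # Rename the key if key_map has an entry for it; otherwise keep it.
            new_key = key_map.get(k, k) if key_map else k
            # numeric_fields is matched against the renamed key, not the original.
            if new_key in numeric:
                try:
                    v = float(v)
                except (ValueError, TypeError):
                    # Unparseable values (e.g. "n/a" or None) fall back to 0.0.
                    v = 0.0
            # Values that are still strings are trimmed and lowercased.
            if isinstance(v, str):
                v = v.strip().lower()
            new_record[new_key] = v
        transformed.append(new_record)
    return transformed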

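def load(records: list[dict]) -> dict:
    """Wrap the records in a dict with a row count and column names."""
    return {
        "data": records,
        "count": len(records),
        # Column names are taken from the first record only.
        "columns": list(records[0].keys()) if records else []
    }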

def etl_pipeline(raw_data: list[dict], key_map: dict = None,
                 numeric_fields: list[str] = None) -> dict:
    extracted = extract(raw_data)
    transformed = transform(extracted, key_map, numeric_fields)
    result = load(transformed)
    return result

Explanation

  1. Extract: Filter out None values and non-dict entries from the raw input.
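  2. Transform: Rename keys using an optional key_map, convert the fields listed in numeric_fields to floats (matched against the renamed key, with a fallback of 0.0 when conversion fails), and normalize any remaining string values (strip whitespace, lowercase).
  3. Load: Package the cleaned records into a structured dictionary with metadata: the record count and the column names, which are taken from the first record.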
  4. The etl_pipeline function chains the three stages together.
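
To make the four steps concrete, here is a minimal worked example. It restates the three stages in condensed form so the snippet runs on its own; the sample records and the field names Name and Age are hypothetical and are not part of the original problem.

```python
def extract(raw_data):
    # Keep only dict records; None and other types are dropped.
    return [r for r in raw_data if isinstance(r, dict)]

def transform(records, key_map=None, numeric_fields=None):
    key_map = key_map or {}
    numeric = set(numeric_fields or [])
    out = []
    for record in records:
        new_record = {}
        for k, v in record.items():
            new_key = key_map.get(k, k)          # rename if mapped
            if new_key in numeric:
                try:
                    v = float(v)
                except (ValueError, TypeError):
                    v = 0.0                      # fallback for bad values
            if isinstance(v, str):
                v = v.strip().lower()            # normalize strings
            new_record[new_key] = v
        out.append(new_record)
    return out

def load(records):
    return {
        "data": records,
        "count": len(records),
        "columns": list(records[0].keys()) if records else [],
    }

# Hypothetical sample input: two valid records and two entries to discard.
raw = [
    {"Name": "  Alice ", "Age": "30"},
    None,
    "not a record",
    {"Name": "BOB", "Age": "n/a"},
]

extracted = extract(raw)                          # 2 records survive
transformed = transform(extracted,
                        key_map={"Name": "name", "Age": "age"},
                        numeric_fields=["age"])
result = load(transformed)
print(result)
# {'data': [{'name': 'alice', 'age': 30.0}, {'name': 'bob', 'age': 0.0}],
#  'count': 2, 'columns': ['name', 'age']}
```

Note that numeric_fields lists the renamed key ("age"), not the original one ("Age"), and that the unparseable value "n/a" becomes 0.0 rather than raising an error.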

Complexity

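  • Time: O(n * m) where n is the number of records and m is the number of fields per record, provided each numeric_fields membership test is O(1) (for example, a set); a list lookup adds a factor of the list's length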
  • Space: O(n * m) for storing the transformed records
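
The O(n * m) space cost comes from building a full list at each stage. If the consumer can handle records one at a time, the extract and transform stages could be written as generators so that only one record is held in flight. The sketch below is one possible variant, not part of the original solution; the names extract_iter and transform_iter and the sample data are hypothetical.

```python
from typing import Iterable, Iterator, Optional

def extract_iter(raw_data: Iterable) -> Iterator[dict]:
    """Yield dict records one at a time, skipping everything else."""
    for record in raw_data:
        if isinstance(record, dict):
            yield record

def transform_iter(records: Iterable[dict],
                   key_map: Optional[dict] = None,
                   numeric_fields: Optional[Iterable[str]] = None) -> Iterator[dict]:
    """Lazily apply the same renaming, float coercion, and string cleanup."""
    key_map = key_map or {}
    numeric = set(numeric_fields or [])
    for record in records:
        new_record = {}
        for k, v in record.items():
            new_key = key_map.get(k, k)
            if new_key in numeric:
                try:
                    v = float(v)
                except (ValueError, TypeError):
                    v = 0.0
            if isinstance(v, str):
                v = v.strip().lower()
            new_record[new_key] = v
        yield new_record

# Hypothetical input; nothing is processed until the loop pulls records.
raw = [{"sku": " A1 ", "price": "9.5"}, None, {"sku": "B2", "price": "oops"}]
for row in transform_iter(extract_iter(raw), numeric_fields=["price"]):
    print(row)
# {'sku': 'a1', 'price': 9.5}
# {'sku': 'b2', 'price': 0.0}
```

Time remains O(n * m), since every field of every record is still visited once. A load stage that reports count and columns in a single dictionary would still need to materialize the records, so the saving applies only when results are streamed to a sink such as a file or database.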