
ETL Engine best practices

When writing ETL Engine SQL, follow the best practices below:

Goal: Avoid accidental data shuffles.

In distributed systems like Spark, removing duplicates is expensive because it requires moving data across the network (Shuffle) to compare rows.

DISTINCT

  • The issue: SELECT DISTINCT forces a huge shuffle of your entire dataset.

  • Best practice: Only use DISTINCT if you have verified that duplicates actually exist. Never use it "just in case."
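One way to verify duplicates first is a quick aggregation check (a sketch; `events` and `id` are hypothetical names, and the diagnostic itself shuffles, so run it once during development, not inside the pipeline):

```sql
-- Diagnostic: do duplicates actually exist?
SELECT id, COUNT(*) AS cnt
FROM events
GROUP BY id
HAVING COUNT(*) > 1
LIMIT 10;

-- Only if the check above returns rows is this justified:
-- SELECT DISTINCT id FROM events;
```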

UNION vs. UNION ALL

UNION: Appends data AND removes duplicates. (Slow: Requires a Shuffle.)

UNION ALL: Appends data as-is, keeping duplicates. (Fast: No Shuffle.)

  • Best practice: Use UNION ALL for a fast append. Handle duplicates later only if needed.

    SELECT * FROM table_A
    UNION ALL
    SELECT * FROM table_B;
  • Avoid this: UNION triggers a heavy shuffle to check for duplicates.

    SELECT * FROM table_A
    UNION
    SELECT * FROM table_B;

Goal: Reduce the amount of data Spark reads from the disk.

Spark uses columnar storage formats (Parquet/Delta), which store each column separately on disk.

  • Best practice: SELECT id, status (Reads only the columns you need).

  • Avoid this: SELECT * (Reads every column, even unused ones).
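A minimal sketch (`huge_event_log` is the same illustrative table used in the pruning examples later in this document):

```sql
-- ✅ Reads only the 'id' and 'status' column chunks from storage
SELECT id, status
FROM huge_event_log;

-- ⚠️ AVOID: reads every column of every file
-- SELECT * FROM huge_event_log;
```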

Goal: Filter out rows that exist in another table (e.g., "Get customers who have never placed an order").

  • Best practice: Both of the following methods are safe and efficient. Spark's optimizer (Catalyst) compiles NOT EXISTS into the same efficient execution plan (an Anti Join) as the explicit join syntax. Spark reads the left table (customers) and looks for a match in the right table (orders): if a match is found, the row is discarded; if not, the row is kept. This process is highly optimized and handles NULL values safely.

    • Option A: LEFT ANTI JOIN (Explicit) - Think of this as efficient subtraction.

      SELECT * FROM customers c
      LEFT ANTI JOIN orders o
      ON c.customer_id = o.customer_id;
    • Option B: NOT EXISTS (Standard SQL) - This is safer than NOT IN because it handles NULLs correctly and allows Spark to optimize.

      SELECT * FROM customers c
      WHERE NOT EXISTS (
          SELECT 1 FROM orders o
          WHERE c.customer_id = o.customer_id
      );
  • Avoid this: Using NOT IN with a subquery is dangerous. Because a single NULL in the subquery changes the semantics of the whole predicate, Spark often falls back to a slow Nested Loop (Cartesian-style) plan to handle the logic.

    SELECT * FROM customers
    WHERE customer_id NOT IN (SELECT customer_id FROM orders);

Goal: Filter rows to keep only those that exist in another table (e.g., "Get the details of customers who have placed at least one order").

Best Practice: LEFT SEMI JOIN, EXISTS, or IN - All three of these methods are safe and highly efficient. Spark's optimizer converts EXISTS and IN into the exact same execution plan as LEFT SEMI JOIN.

How it works: Spark reads the Left table (Customers) and searches the Right table (Orders). The massive advantage here is that Spark stops searching the moment it finds the very first match. Because it doesn't merge the data, it completely prevents row duplication and saves a tremendous amount of processing time.

  • Option A - LEFT SEMI JOIN (Explicit): Think of this as a highly efficient filter rather than a data merge. It explicitly tells Spark you only care about returning data from the left side.

    -- ✅ BEST PRACTICE: Fast, explicit, and prevents duplication
    SELECT c.* FROM customers c
    LEFT SEMI JOIN orders o 
      ON c.customer_id = o.customer_id;
  • Option B - EXISTS (Standard SQL): This reads naturally and is highly optimized by Spark, making it an excellent choice for complex subqueries.

    -- ✅ BEST PRACTICE: Excellent readability for complex subqueries
    SELECT * FROM customers c
    WHERE EXISTS (
        SELECT 1 FROM orders o
        WHERE c.customer_id = o.customer_id
    );
  • Option C - IN (Standard SQL): Works the exact same way as EXISTS for positive checks. While good for simple subqueries, it is also the best choice when filtering against a hardcoded list of values.

    -- ✅ BEST PRACTICE: Perfect for simple subqueries or hardcoded lists
    
    -- Using IN with a subquery
    SELECT * FROM customers 
    WHERE customer_id IN (SELECT customer_id FROM orders);
    
    -- Using IN with a hardcoded list
    SELECT * FROM customers 
    WHERE status IN ('active', 'pending_renewal');

Goal: Get the "latest," "largest," or "most recent" record for every group.

  • Best practice: Use ROW_NUMBER() to rank items in a single pass.

    WITH RankedUpdates AS (
        SELECT
            user_id, status, updated_at,
            ROW_NUMBER() OVER (
                PARTITION BY user_id
                ORDER BY updated_at DESC
            ) as rn
        FROM user_updates
    )
    SELECT user_id, status, updated_at
    FROM RankedUpdates
    WHERE rn = 1;

    Window frames (Ascending vs. Descending)

    If you are using running totals or sliding windows (e.g., SUM(...) OVER ...), the direction of your sort matters significantly for the Databricks Photon engine.

    • The trap: Using ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING (looking forward) often forces Spark to abandon the fast Vectorized/Photon engine and fall back to slow Java row-by-row processing.

    • The fix: Always try to look backwards. If you need to look forward, simply reverse your sort order and look backwards.

      Example:

      • Fast: ORDER BY date DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

      • Slow: ORDER BY date ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING

  • Avoid this: Finding the "Max Date" and then joining the table back to itself forces Spark to read the data twice and perform a heavy shuffle.
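Sketched against the `user_updates` table from the example above, the self-join anti-pattern looks like this:

```sql
-- ⚠️ AVOID THIS: scans user_updates twice and shuffles for the join
SELECT u.user_id, u.status, u.updated_at
FROM user_updates u
JOIN (
    SELECT user_id, MAX(updated_at) AS max_updated_at
    FROM user_updates
    GROUP BY user_id
) latest
  ON u.user_id = latest.user_id
 AND u.updated_at = latest.max_updated_at;
```

Note that on timestamp ties this also returns multiple rows per user, which the ROW_NUMBER() approach avoids.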

Goal: Speed up queries by eliminating data before processing it.

  • Best practice: Filter on columns as they exist in the source data. This allows Spark to skip reading entire files or partitions ("Partition Pruning").

    Spark reads only relevant data first:

    SELECT *, TO_DATE(timestamp_col) as evt_date
    FROM huge_event_log
    WHERE timestamp_col LIKE '2024-01-01%';
  • Avoid this: Applying filters after heavy transformations forces Spark to process the entire dataset, only to throw most of it away later.

    Parses all timestamps before filtering:

    SELECT * FROM (
      SELECT *, TO_DATE(timestamp_col) as evt_date
      FROM huge_event_log
    )
    WHERE evt_date = '2024-01-01';

Goal: Remove rows efficiently.

Rule: Use DELETE for simple logic (e.g., date ranges). Use MERGE when removing a list or joining to another table.

Why: MERGE is optimized for "Join-based" updates. DELETE with subqueries (IN or EXISTS) can be slower and harder for the optimizer to execute efficiently.
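The simple case needs no join at all (a sketch; `huge_table` and the `event_date` cutoff are illustrative):

```sql
-- ✅ Simple range-based delete: literal predicate, no subquery
DELETE FROM huge_table
WHERE event_date < '2023-01-01';
```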

  • Best practice: Use MERGE to join and delete in one optimized step.

    Efficient join based delete:

    MERGE INTO huge_table target
    USING banned_users source
    ON target.user_id = source.user_id
    WHEN MATCHED THEN DELETE;
  • Avoid this: Using subqueries inside DELETE is often inefficient.

    DELETE FROM huge_table
    WHERE EXISTS (
       SELECT 1 FROM banned_users b
       WHERE huge_table.user_id = b.user_id
    );

Goal: Avoid expensive "Fuzzy Matching" (LIKE, %, Regex) which forces slow table scans and loops.

  • Case A: Joining Tables (Avoid Non-Equi Joins) - Spark is optimized for Equi-Joins (where Key A = Key B). If you use LIKE in a Join, Spark falls back to a Nested Loop Join, which is extremely slow.

    • Best practice: Refactor the join key using LPAD or SUBSTRING to force an exact match.

      Enables fast Hash Joins by forcing '=':

      SELECT * FROM huge_invoices a
      JOIN huge_customers b
        -- Pad the shorter code to match the longer ID
        ON a.invoice_id = lpad(b.customer_code, 10, '0');
    • Avoid this: Forces a Nested Loop Join (Timeout Risk)

      SELECT * FROM huge_invoices a
      JOIN huge_customers b
      ON a.invoice_id LIKE concat('%', b.customer_code);
  • Case B: Complex Filtering (Preprocess Logic). If you have complex logic (regex/case statements) in a WHERE clause, Spark has to evaluate it for every single row at runtime.

    • Best practice: Extract the logic into a clean column once using a Temporary View or Table, then filter on that simple column.

      • 1. Preprocess: Extract logic into a clean column:

        CREATE OR REPLACE TEMP VIEW CleanLogs AS
        SELECT *,
          CASE
            WHEN message RLIKE 'Error-[0-9]+' OR message LIKE '%CRITICAL%' THEN 'High'
            ELSE 'Low'
          END as urgency
        FROM huge_logs;
      • 2. Analyze: Fast filtering on the new simple column:

        SELECT * FROM CleanLogs WHERE urgency = 'High'; 

Goal: Update a table's structure without breaking concurrent queries or losing history.

  • Best practice: CREATE OR REPLACE TABLE ... (Atomic swap; zero downtime).

    CREATE OR REPLACE TABLE my_table
    AS SELECT * FROM ...;
  • Avoid this: DROP TABLE ... followed by CREATE TABLE ... (Causes downtime and "Table Not Found" errors).
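The anti-pattern, sketched (`my_table` as in the snippet above; `source_data` is a hypothetical source):

```sql
-- ⚠️ AVOID THIS: between DROP and CREATE the table does not exist,
-- so concurrent queries fail with "Table Not Found"
DROP TABLE my_table;
CREATE TABLE my_table AS
SELECT * FROM source_data;
```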

Goal: Prevent "Dead End" columns that cause UNION failures and schema evolution crashes.

When you define a table (especially using CREATE TABLE AS SELECT or CTAS) and include a column defined simply as NULL without a cast, Spark infers the data type as NullType (or VoidType). This is a "silent killer" in pipelines.

The problems it causes:

  • Union mismatches: If you try to UNION this table with another table where the column is populated (e.g., it is a String), Spark will crash. The engine cannot automatically reconcile the difference between a "typed" column (String) and a "void" column (NullType) in strict mode.

  • Schema evolution crashes: If you try to append real data to this table later, the merge will fail because NullType cannot safely evolve into StringType automatically.

  • Best practice: Always cast explicit NULLs to their intended future type.

    CREATE TABLE good_table AS
    SELECT
        id,
        CAST(NULL as STRING) as status_code,
        CAST(NULL as INT) as retry_count
    FROM source_data;
  • Avoid this: Spark creates a "dead" column that breaks future Unions.

    -- Table A: status_code is inferred as NullType
    SELECT id, NULL as status_code FROM table_A
    UNION ALL
    -- Table B: status_code is StringType -> CRASH
    SELECT id, 'Active' as status_code FROM table_B;

Goal: Guarantee reproducible results in rankings and running totals by eliminating random row shuffling.

Window functions (like ROW_NUMBER, RANK, or a running SUM) rely entirely on the row order defined in the OVER clause. If your ORDER BY contains duplicate values and lacks a unique tie-breaker, the SQL engine is free to order the tied rows arbitrarily.

The problems it causes:

  • Non-deterministic results: Running the same query twice on the exact same data can produce different results. For example, a record might be ranked #1 in the first run and #2 in the second run if they share the same sorting value.

  • Data consistency errors: In pagination or deduplication logic (e.g., "Keep only the latest record"), a lack of a tie-breaker means the "winner" is chosen randomly, leading to data loss or "flickering" datasets.

Best practice: Always add a column with unique values (like a Primary Key or UUID) to the ORDER BY clause to act as a tie-breaker.

SELECT
    event_id,
    created_at,
    ROW_NUMBER() OVER (
        PARTITION BY user_id
        ORDER BY
            created_at,      -- Primary sort
            event_id         -- ✅ Tie-breaker forces stable, deterministic order
    ) as event_rank
FROM user_events;

Avoid this: Relying on a non-unique column alone leaves the sort order ambiguous.

-- ⚠️ 'created_at' may have duplicates (multiple events per second).
-- The engine arbitrarily decides which row comes first for the same timestamp.
SELECT
    event_id,
    created_at,
    ROW_NUMBER() OVER (
        PARTITION BY user_id
        ORDER BY created_at
    ) as event_rank -- ⚠️ Non-deterministic!
FROM user_events;

Goal: Query hierarchical data, such as traversing an employee org chart, or resolving a bill of materials.

How it works: A Recursive Common Table Expression (CTE) is a query that references itself. Think of it as a loop inside SQL. It consists of two parts connected by a UNION ALL:

  1. The anchor member: The starting point (e.g., finding the CEO who has no manager). Spark runs this first.

  2. The recursive member: The looping step. It takes the results from the previous step and uses them to find the next level of data. The loop stops when a step returns zero new rows.

The problems it causes:

  • Infinite Loops & The "Rule of 100": If your data has circular references (e.g., A manages B, and B manages A), the query will loop endlessly. By default, Spark has a hard limit of 100 recursive iterations. If a query requires more than 100 loops, it will fail.

  • Implicit Row Limits: Recursive CTEs have a default generation limit of 1 million rows. If your results exceed this limit, the query will also fail.

Best Practice 1 - Safe Recursion (Depth Control & Row Limits): When using recursive CTEs, always create a depth counter in your anchor step, increment it in the recursive step, and add a hard stop in your WHERE clause. To bypass the implicit 1-million-row limit, append a large explicit LIMIT to your final SELECT.

-- ✅ BEST PRACTICE: Safe recursion with depth control and bypassed limits
WITH RECURSIVE org_chart AS (
    -- 1. THE ANCHOR: Start with the top of the hierarchy (Depth = 1)
    SELECT employee_id, manager_id, name, 1 AS depth
    FROM employees
    WHERE manager_id IS NULL
    
    UNION ALL
    
    -- 2. THE RECURSIVE: Join the previous results back to the table
    SELECT e.employee_id, e.manager_id, e.name, oc.depth + 1
    FROM employees e
    JOIN org_chart oc 
      ON e.manager_id = oc.employee_id
    -- BEST PRACTICE: Stop the loop safely before the 100-iteration hard limit
    WHERE oc.depth < 10 
)
-- BEST PRACTICE: Overcome the implicit 1M row limit for large results
SELECT * FROM org_chart
LIMIT 999999999;

Best Practice 2 - The Stepped CTE Approach (For Combinations): If you must use recursive CTEs to generate thousands of combinations (like dates), break the recursion into smaller hierarchical steps and multiply them using a CROSS JOIN. Since the maximum number of days in a month is 31, no single CTE will ever exceed the 100-loop limit.

-- ✅ BEST PRACTICE: Stepped CTEs keep individual loops safely under the 100 limit
WITH RECURSIVE years AS (
    SELECT 2024 AS yr UNION ALL SELECT yr + 1 FROM years WHERE yr < 2030
),
months AS (
    SELECT 1 AS mth UNION ALL SELECT mth + 1 FROM months WHERE mth < 12
),
days AS (
    SELECT 1 AS dy UNION ALL SELECT dy + 1 FROM days WHERE dy < 31
)
-- Combine them to generate dates using concat() and try_to_timestamp()
-- try_to_timestamp safely returns NULL for invalid dates (like Feb 30), 
-- preventing strict-mode crashes.
SELECT try_to_timestamp(concat(yr, '-', mth, '-', dy), 'yyyy-M-d') AS calendar_date
FROM years 
CROSS JOIN months 
CROSS JOIN days
WHERE try_to_timestamp(concat(yr, '-', mth, '-', dy), 'yyyy-M-d') IS NOT NULL
ORDER BY calendar_date;

Best Practice 3 - Native Functions (Highly Recommended for Sequences): While the stepped CTE approach works, Databricks has a native, highly optimized way to generate sequences without recursion at all. Use sequence() to generate an array and explode() to turn it into rows. This is the fastest, cleanest way to generate dates in Spark: it produces ten years of days instantly, with zero recursion.

SELECT explode(
    sequence(try_to_timestamp('2020-01-01'), try_to_timestamp('2030-12-31'), INTERVAL 1 DAY)
) AS calendar_date;

Avoid this: Linear Recursion & 100 Iterations. Do not use open-ended recursion, and don't use a Recursive CTE for operations that require more than 100 linear steps (like generating a calendar day-by-day).

-- ⚠️ AVOID THIS: Fails because generating a year of days takes 365 loops (> 100 limit)
WITH RECURSIVE calendar AS (
    SELECT try_to_timestamp('2024-01-01') AS cal_date 
    UNION ALL 
    SELECT date_add(cal_date, 1) 
    FROM calendar 
    WHERE cal_date < try_to_timestamp('2024-12-31')
)
SELECT * FROM calendar;

Goal: Choose the most efficient way to store intermediate data based on query cost and how often the data is reused in your transformation.

Because Spark evaluates temp views lazily, the underlying logic is recalculated every single time the view is queried. Deciding when to use a view versus writing a physical table (checkpointing) is critical for transformation performance.

The problems it causes:

  • Massive recomputation: Relying on a TEMP VIEW for complex queries (massive joins, heavy aggregations) that are referenced multiple times downstream forces Spark to re-execute the expensive operations from scratch every single time.

  • Wasted I/O: Conversely, writing simple, single-use transformations to a physical table wastes disk I/O, storage space, and slows down the transformation with unnecessary write operations.

Best Practice - Match the Tool to the Task: Default to a TEMP VIEW for lightweight logic. If the query is heavy AND reused, save it as a physical table to act as a "checkpoint," trading a one-time disk write for massive CPU savings.

-- ✅ BEST PRACTICE: Option A - Temporary View (The Lightweight Default)
-- Fast, zero I/O, and self-cleaning. Perfect for simple filters or single-use logic.
CREATE OR REPLACE TEMP VIEW clean_users AS
SELECT * FROM raw_users WHERE email IS NOT NULL;


-- ✅ BEST PRACTICE: Option B - Physical Table (The "Checkpoint")
-- Computes the expensive math ONCE and saves it to disk. 

CREATE TABLE heavy_math_checkpoint AS
SELECT customer_id, complex_heavy_aggregation(history) AS score
FROM massive_transaction_table
GROUP BY customer_id;

-- These subsequent queries are now fast because the operations are already done
SELECT * FROM heavy_math_checkpoint WHERE score > 90;
SELECT AVG(score) FROM heavy_math_checkpoint;

-- Explicitly clean up at the end of the transformation
DROP TABLE heavy_math_checkpoint;