Data Engineering Cheatsheet

Snowflake, PySpark, Airflow, dbt, Kafka, data pipelines, warehousing & Python data libraries


Snowflake

Cloud-native data warehouse with separate compute & storage, auto-scaling, and near-zero maintenance.

Architecture

┌─────────────────────────────────────────────┐
│ Cloud Services Layer                        │
│ (Query parsing, optimization, metadata,     │
│  access control, infrastructure mgmt)       │
├─────────────────────────────────────────────┤
│ Virtual Warehouses (Compute)                │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐      │
│  │  XS WH  │  │  M WH   │  │  XL WH  │      │
│  └─────────┘  └─────────┘  └─────────┘      │
├─────────────────────────────────────────────┤
│ Centralized Storage (S3/Blob/GCS)           │
│ Micro-partitions │ Columnar │ Compressed    │
└─────────────────────────────────────────────┘

Key Concepts

Concept | Description
------- | -----------
Virtual Warehouse | Compute cluster (XS to 6XL). Can suspend/resume. Pay only when running.
Database / Schema | Logical containers. DB.SCHEMA.TABLE naming convention.
Micro-partition | Columnar file holding 50–500 MB of uncompressed data (stored compressed). Snowflake handles partitioning automatically.
Clustering Key | Define sort order for micro-partitions — improves pruning on large tables.
Time Travel | Query data as it was up to 90 days ago. AT / BEFORE clauses.
Zero-Copy Clone | Instant clone of DB/schema/table — no extra storage until data changes.
Stages | Internal or external (S3/GCS) locations for loading data.
Streams | Change data capture (CDC) — track INSERTs, UPDATEs, DELETEs on a table.
Tasks | Scheduled SQL execution. Chain tasks into DAGs.
Pipes | Snowpipe — continuous auto-ingest from stages.
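Every concept above except the clustering key has SQL examples in the sections below; here is a minimal sketch for clustering, assuming a hypothetical large table named big_events:

-- Clustering key: keeps micro-partitions roughly sorted so queries can prune them
ALTER TABLE big_events CLUSTER BY (event_date, user_id);

-- Inspect clustering quality on those columns
SELECT SYSTEM$CLUSTERING_INFORMATION('big_events', '(event_date, user_id)');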

Snowflake SQL

Warehouse & Database Setup

-- Create warehouse
CREATE WAREHOUSE analytics_wh
  WITH WAREHOUSE_SIZE = 'MEDIUM'
  AUTO_SUSPEND = 300      -- suspend after 5 min idle
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3;  -- multi-cluster auto-scale

-- Create database & schema
CREATE DATABASE raw_data;
CREATE SCHEMA raw_data.events;

-- Create table
CREATE TABLE raw_data.events.clicks (
  id        INT AUTOINCREMENT,
  user_id   VARCHAR(50),
  event     VARCHAR(100),
  payload   VARIANT,        -- semi-structured JSON
  ts        TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

Data Loading

-- Create stage (S3)
CREATE STAGE my_s3_stage
  URL = 's3://my-bucket/data/'
  CREDENTIALS = (AWS_KEY_ID='...' AWS_SECRET_KEY='...');

-- Create file format
CREATE FILE FORMAT csv_format
  TYPE = 'CSV' FIELD_DELIMITER=',' SKIP_HEADER=1;

-- COPY INTO (bulk load)
COPY INTO raw_data.events.clicks
FROM @my_s3_stage/clicks/
FILE_FORMAT = csv_format
PATTERN = '.*\.csv';

-- Snowpipe (continuous ingest)
CREATE PIPE clicks_pipe AUTO_INGEST = TRUE AS
  COPY INTO raw_data.events.clicks
  FROM @my_s3_stage/clicks/
  FILE_FORMAT = csv_format;

Time Travel & Cloning

-- Query data 1 hour ago
SELECT * FROM clicks AT(OFFSET => -3600);

-- Query data at specific timestamp
SELECT * FROM clicks AT(TIMESTAMP => '2025-01-15 10:00:00'::TIMESTAMP);

-- Undrop a table
UNDROP TABLE clicks;

-- Zero-copy clone
CREATE TABLE clicks_backup CLONE clicks;
CREATE DATABASE dev_db CLONE prod_db;

Streams & Tasks (CDC + Scheduling)

-- Stream: tracks changes on a table
CREATE STREAM clicks_stream ON TABLE clicks;

-- Task: scheduled SQL
CREATE TASK process_clicks
  WAREHOUSE = analytics_wh
  SCHEDULE = 'USING CRON 0 * * * * UTC'  -- every hour
  WHEN SYSTEM$STREAM_HAS_DATA('clicks_stream')
AS
  INSERT INTO analytics.clicks_agg
  SELECT event, COUNT(*), DATE_TRUNC('hour', ts)
  FROM clicks_stream
  GROUP BY 1, 3;

ALTER TASK process_clicks RESUME;

Semi-structured Data (VARIANT)

-- Query JSON inside VARIANT column
SELECT
  payload:page::STRING AS page,
  payload:duration::INT AS duration,
  payload:tags[0]::STRING AS first_tag
FROM clicks
WHERE payload:page IS NOT NULL;

-- FLATTEN nested arrays
SELECT c.user_id, f.value::STRING AS tag
FROM clicks c,
  LATERAL FLATTEN(input => c.payload:tags) f;

PySpark

Python API for Apache Spark — distributed data processing for big data at scale.

Session & DataFrame Basics

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create session
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.sql.shuffle.partitions", 200) \
    .getOrCreate()

# Read data
df = spark.read.csv("s3://bucket/data.csv", header=True, inferSchema=True)
df = spark.read.parquet("s3://bucket/data.parquet")
df = spark.read.json("s3://bucket/data.json")

# Schema
df.printSchema()
df.dtypes              # list of (col_name, dtype)
df.columns             # column names
df.count()             # row count

Transformations

# Select & filter
df.select("name", "age").filter(F.col("age") > 25)
df.where("age > 25 AND city = 'NYC'")

# Add / rename / drop columns
df.withColumn("age_next", F.col("age") + 1)
df.withColumnRenamed("name", "full_name")
df.drop("temp_col")

# Group by & aggregate
df.groupBy("city").agg(
    F.count("*").alias("total"),
    F.avg("salary").alias("avg_salary"),
    F.max("age").alias("max_age")
)

# Joins
df1.join(df2, df1.id == df2.id, "inner")     # inner, left, right, full, cross, semi, anti

# Window functions
from pyspark.sql.window import Window
w = Window.partitionBy("dept").orderBy(F.desc("salary"))
df.withColumn("rank", F.row_number().over(w))

# Write
df.write.mode("overwrite").parquet("s3://output/")
df.write.partitionBy("year", "month").parquet("s3://output/")

Spark SQL

df.createOrReplaceTempView("users")

result = spark.sql("""
    SELECT city, COUNT(*) as cnt, AVG(salary) as avg_sal
    FROM users
    WHERE age > 25
    GROUP BY city
    ORDER BY cnt DESC
""")
Performance tips: Use .cache() for DataFrames reused across multiple actions, .repartition(n) to rebalance partitions, .coalesce(1) to write a single output file, and broadcast small tables with F.broadcast(small_df) in joins to avoid shuffles.
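A minimal sketch of those tips together; small_df is a hypothetical lookup DataFrame and the S3 paths are illustrative:

users = df.cache()                      # keep a reused DataFrame in memory
users.count()                           # first action materializes the cache

# Broadcast join: ship the small table to every executor, skipping the shuffle
joined = users.join(F.broadcast(small_df), "id", "left")

# Rebalance before heavy work; coalesce(1) only for small final outputs
joined.repartition(64).write.mode("overwrite").parquet("s3://output/joined/")
joined.coalesce(1).write.mode("overwrite").csv("s3://output/report/", header=True)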

Pandas Essentials

import pandas as pd
import numpy as np

# Read / write
df = pd.read_csv("data.csv")
df = pd.read_parquet("data.parquet")
df = pd.read_json("data.json")
df.to_csv("out.csv", index=False)

# Inspect
df.shape          # (rows, cols)
df.info()         # dtypes, memory, non-null counts
df.describe()     # summary stats
df.head(10)       # first 10 rows
df.dtypes         # column types
df.isnull().sum() # null counts per column

# Select / filter
df["col"]                        # Series
df[["col1", "col2"]]            # DataFrame
df[df["age"] > 25]               # filter rows
df.query("age > 25 and city == 'NYC'")

# Transforms
df["new"] = df["a"] + df["b"]
df.rename(columns={"old": "new"})
df.drop(columns=["temp"])
df.sort_values("col", ascending=False)

# Group by
df.groupby("city").agg({"salary": ["mean", "max"], "age": "count"})
df.groupby("city")["salary"].transform("rank")

# Joins
pd.merge(df1, df2, on="id", how="left")

# Pivot / melt
df.pivot_table(values="sales", index="region", columns="quarter", aggfunc="sum")
pd.melt(df, id_vars=["name"], value_vars=["q1", "q2"])

Polars

Blazing-fast DataFrame library in Rust with a Python API. Lazy evaluation, multi-threaded, no GIL limitations.

import polars as pl

# Read
df = pl.read_csv("data.csv")
df = pl.read_parquet("data.parquet")

# Lazy mode (optimized query plan)
lf = pl.scan_parquet("data.parquet")
result = lf.filter(pl.col("age") > 25) \
    .group_by("city") \
    .agg(pl.col("salary").mean().alias("avg_sal")) \
    .sort("avg_sal", descending=True) \
    .collect()   # execute the lazy plan

# Transformations (eager)
df.select(pl.col("name"), pl.col("age") + 1)
df.with_columns((pl.col("a") * pl.col("b")).alias("product"))
df.filter(pl.col("city").is_in(["NYC", "LA"]))

# Window functions
df.with_columns(
    pl.col("salary").rank().over("dept").alias("rank")
)
Polars vs Pandas: Polars is often 10–100x faster on large datasets thanks to its Rust core, lazy query optimization, and multi-threading. Use .lazy() → chain transforms → .collect() for best performance.

Apache Airflow

Workflow orchestration platform for scheduling and monitoring data pipelines as code (DAGs).

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="daily_etl",
    schedule="0 6 * * *",          # daily at 6 AM
    start_date=datetime(2025, 1, 1),
    catchup=False,
    default_args=default_args,
) as dag:

    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_data,
    )

    load = SnowflakeOperator(
        task_id="load_to_snowflake",
        sql="COPY INTO analytics.events FROM @stage/...",
        snowflake_conn_id="snowflake_default",
    )

    extract >> transform >> load   # task dependencies
Key concepts: DAG = Directed Acyclic Graph (pipeline definition), Task = unit of work, Operator = task type (Python, Bash, SQL, etc.), XCom = cross-task data passing, Sensor = wait for external condition.
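A minimal sketch of XCom and a sensor, assuming the extract/transform callables referenced in the DAG above; the file path and values are illustrative:

from airflow.sensors.filesystem import FileSensor

def extract_data(**context):
    return {"rows": 1240}                                  # return value is pushed to XCom

def transform_data(**context):
    stats = context["ti"].xcom_pull(task_ids="extract")   # pull by upstream task_id
    print(stats["rows"])

# Sensor: block downstream tasks until an external condition is met
wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/data/incoming/orders.csv",
    poke_interval=60,                                      # re-check every 60 seconds
)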

dbt (Data Build Tool)

SQL-first transformation tool. Write SELECT statements, dbt handles DDL, dependencies, testing, and documentation.

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
)
SELECT
    id AS order_id,
    customer_id,
    amount::DECIMAL(10,2) AS order_amount,
    created_at::DATE AS order_date
FROM source
WHERE amount > 0

-- models/marts/fct_daily_revenue.sql
{{ config(materialized='incremental', unique_key='order_date') }}

SELECT
    order_date,
    COUNT(*) AS total_orders,
    SUM(order_amount) AS revenue
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
    WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
GROUP BY order_date

CLI Commands

dbt init my_project           # scaffold project
dbt run                        # run all models
dbt run --select stg_orders    # run specific model
dbt test                       # run all tests
dbt build                      # run + test in DAG order
dbt docs generate && dbt docs serve  # auto-generated docs
Materialization | What It Does | Use When
--------------- | ------------ | --------
view | CREATE VIEW | Light transforms, infrequent queries
table | CREATE TABLE AS | Frequently queried, stable data
incremental | INSERT new rows only | Large tables, append-mostly data
ephemeral | CTE (no object created) | Intermediate helper logic
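Besides the per-model config() block shown earlier, materializations can also be set per folder in dbt_project.yml; a minimal sketch, where the project and folder names are illustrative:

# dbt_project.yml
models:
  my_project:
    staging:
      +materialized: view
    marts:
      +materialized: table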

Apache Kafka

Distributed event streaming platform. Publish & subscribe to streams of records in real-time.

Concept | Description
------- | -----------
Topic | Named category of messages. Producers write to, consumers read from.
Partition | Ordered, immutable log within a topic. Enables parallelism.
Producer | Publishes messages to topics.
Consumer | Reads messages from topics. Belongs to a consumer group.
Consumer Group | Each partition consumed by exactly one member — enables parallel consumption.
Offset | Position of a message within a partition. Consumers track their offset.
Broker | A Kafka server. Cluster = multiple brokers.

Python (confluent-kafka)

from confluent_kafka import Producer, Consumer

# Producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
p.produce('events', key='user_1', value='{"action":"click"}')
p.flush()

# Consumer
c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})
c.subscribe(['events'])
while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue               # handle / log broker errors in real code
    print(msg.value().decode())
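The partition count chosen at topic creation caps consumer-group parallelism; a minimal sketch using confluent-kafka's AdminClient, where the topic name, partition count, and replication factor are illustrative:

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# 6 partitions -> at most 6 consumers in one group can read in parallel
futures = admin.create_topics([NewTopic('events', num_partitions=6, replication_factor=3)])
for topic, future in futures.items():
    future.result()            # raises if topic creation failed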

ETL / ELT Patterns

ETL (Extract → Transform → Load)

  • Transform before loading into warehouse
  • Traditional approach (on-prem era)
  • Tools: Informatica, Talend, custom scripts
  • Less storage needed in warehouse

ELT (Extract → Load → Transform)

  • Load raw data, transform inside warehouse
  • Modern approach (cloud-native)
  • Tools: dbt, Snowflake, BigQuery
  • Leverage warehouse compute power

Modern Data Stack

 Ingestion     Storage             Transform      BI / Analytics
┌──────────┐   ┌──────────────┐   ┌──────────┐   ┌──────────────┐
│ Fivetran │──▶│ Snowflake    │──▶│ dbt      │──▶│ Looker /     │
│ Airbyte  │   │ BigQuery     │   │ Spark    │   │ Metabase /   │
│ Stitch   │   │ Redshift     │   │ Airflow  │   │ Tableau      │
└──────────┘   └──────────────┘   └──────────┘   └──────────────┘

Data Modeling

Kimball Dimensional Modeling

Concept | Description | Example
------- | ----------- | -------
Fact Table | Measurable events (metrics, transactions) | fct_orders — amount, quantity
Dimension Table | Descriptive context for facts | dim_customer — name, city, segment
Star Schema | Fact in center, dimensions around it | Simple, fast queries
Snowflake Schema | Normalized dimensions (dim → sub-dim) | Less redundancy, more joins
SCD Type 1 | Overwrite old value (no history) | Customer address update
SCD Type 2 | New row for changes (full history) | valid_from, valid_to, is_current
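A minimal sketch of a star-schema query that respects SCD Type 2 history, reusing the fct_orders / dim_customer names from the table above:

-- Revenue by customer segment, using only the current version of each customer
SELECT
    d.segment,
    SUM(f.amount) AS revenue
FROM fct_orders f
JOIN dim_customer d
  ON d.customer_id = f.customer_id
 AND d.is_current = TRUE            -- SCD Type 2: one "current" row per customer
GROUP BY d.segment;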

Medallion Architecture (Lakehouse)

 Bronze (Raw)        Silver (Cleaned)     Gold (Business)
┌──────────────┐     ┌──────────────┐     ┌─────────────────┐
│ Raw ingested │────▶│ Deduplicated │────▶│ Aggregated      │
│ data as-is   │     │ Typed        │     │ Business logic  │
│ Append-only  │     │ Validated    │     │ Ready for BI    │
└──────────────┘     └──────────────┘     └─────────────────┘
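A minimal PySpark sketch of the three layers, reusing the spark session and F alias from the PySpark section; paths and column names are illustrative:

# Bronze: land raw data as-is, append-only
raw = spark.read.json("s3://lake/landing/orders/")
raw.write.mode("append").parquet("s3://lake/bronze/orders/")

# Silver: deduplicate, enforce types, apply basic validation
silver = (
    spark.read.parquet("s3://lake/bronze/orders/")
    .dropDuplicates(["order_id"])
    .withColumn("amount", F.col("amount").cast("decimal(10,2)"))
    .filter(F.col("amount") > 0)
)
silver.write.mode("overwrite").parquet("s3://lake/silver/orders/")

# Gold: aggregate into business-ready marts
(
    silver.groupBy("order_date")
    .agg(F.sum("amount").alias("revenue"))
    .write.mode("overwrite")
    .parquet("s3://lake/gold/daily_revenue/")
)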

Data Warehouse Concepts

Term | Description
---- | -----------
OLTP | Online Transaction Processing — row-oriented, fast writes (MySQL, PostgreSQL)
OLAP | Online Analytical Processing — column-oriented, fast reads (Snowflake, BigQuery)
Data Lake | Raw storage for structured + unstructured data (S3, ADLS, GCS)
Data Lakehouse | Lake + Warehouse features (Delta Lake, Iceberg, Hudi)
Partitioning | Split data by date/region → skip irrelevant partitions
Columnar Storage | Store columns together → great compression + fast analytics

File Formats

Format | Type | Compression | Best For
------ | ---- | ----------- | --------
CSV | Row-based, text | None (gzip external) | Simple exchange, small data
JSON / NDJSON | Row-based, text | None (gzip external) | APIs, semi-structured data
Parquet | Columnar, binary | Snappy / Zstd / Gzip | Analytics, warehouses, Spark
ORC | Columnar, binary | Zlib / Snappy | Hive ecosystem
Avro | Row-based, binary | Snappy / Deflate | Kafka, schema evolution
Delta Lake | Parquet + tx log | Same as Parquet | Lakehouse, ACID on lake
Iceberg | Parquet + metadata | Same as Parquet | Open table format, Snowflake/Spark
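A minimal pandas sketch of converting between a few of these formats; the file names are illustrative, and Parquet output assumes pyarrow or fastparquet is installed:

import pandas as pd

df = pd.read_csv("data.csv")

# Columnar + compressed: typical warehouse / Spark-friendly output
df.to_parquet("data.parquet", compression="snappy")

# NDJSON: one JSON record per line, handy for APIs and streaming ingest
df.to_json("data.ndjson", orient="records", lines=True)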

Orchestration & Scheduling

Tool | Type | Best For
---- | ---- | --------
Apache Airflow | DAG-based, Python | Complex pipelines, mature ecosystem
Prefect | Python-native, flow-based | Modern Python data workflows
Dagster | Asset-based, Python | Software-defined data assets
dbt Cloud | SQL transforms | dbt-native scheduling
Mage | Notebook-style | Quick ETL pipelines
Cron | OS scheduler | Simple scripts, one-off jobs

Data Quality

dbt Tests

# schema.yml
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000

Great Expectations (Python)

import great_expectations as gx

context = gx.get_context()
validator = context.get_validator(datasource_name="my_ds", data_asset_name="orders")

validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0)
validator.expect_column_values_to_be_unique("order_id")
results = validator.validate()