Snowflake, PySpark, Airflow, dbt, Kafka, data pipelines, warehousing & Python data libraries
Snowflake is a cloud-native data warehouse with separate compute and storage, auto-scaling, and near-zero maintenance.
| Concept | Description |
|---|---|
| Virtual Warehouse | Compute cluster (XS to 6XL). Can suspend/resume. Pay only when running. |
| Database / Schema | Logical containers. DB.SCHEMA.TABLE naming convention. |
| Micro-partition | Columnar storage unit holding 50-500 MB of uncompressed data (stored compressed). Snowflake handles partitioning automatically. |
| Clustering Key | Define sort order for micro-partitions — improves pruning on large tables. |
| Time Travel | Query data as it existed at an earlier point using AT / BEFORE clauses. Retention is 1 day by default and up to 90 days on Enterprise Edition and above. |
| Zero-Copy Clone | Instant clone of DB/schema/table — no extra storage until data changes. |
| Stages | Internal or external (S3/GCS) locations for loading data. |
| Streams | Change data capture (CDC) — track INSERTs, UPDATEs, DELETEs on a table. |
| Tasks | Scheduled SQL execution. Chain tasks into DAGs. |
| Pipes | Snowpipe — continuous auto-ingest from stages. |
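
The Micro-partition and Clustering Key rows above can be made concrete with a small sketch. This is a plain-Python model, not Snowflake's implementation: `MicroPartition`, `build_partitions`, and `scan` are hypothetical names, and the two-row partition size is chosen only to keep the example readable. It shows how per-partition min/max metadata lets a range filter skip partitions, and why sorted (clustered) data prunes better.

```python
from dataclasses import dataclass


@dataclass
class MicroPartition:
    """Hypothetical stand-in for a micro-partition: rows plus min/max metadata."""
    rows: list
    min_ts: int
    max_ts: int


def build_partitions(rows, size):
    """Split rows, in their current order, into fixed-size partitions."""
    parts = []
    for i in range(0, len(rows), size):
        chunk = rows[i:i + size]
        ts = [r["ts"] for r in chunk]
        parts.append(MicroPartition(chunk, min(ts), max(ts)))
    return parts


def scan(parts, lo, hi):
    """Return matching rows and the number of partitions that had to be read."""
    read = 0
    out = []
    for p in parts:
        if p.max_ts < lo or p.min_ts > hi:
            continue  # pruned using metadata only, rows are never touched
        read += 1
        out.extend(r for r in p.rows if lo <= r["ts"] <= hi)
    return out, read


# Same rows in arrival order (interleaved) and clustered (sorted) by ts.
rows = [{"ts": t} for t in [1, 9, 2, 10, 3, 11, 4, 12]]
unclustered = build_partitions(rows, size=2)
clustered = build_partitions(sorted(rows, key=lambda r: r["ts"]), size=2)

hits_unclustered, read_unclustered = scan(unclustered, lo=3, hi=4)
hits_clustered, read_clustered = scan(clustered, lo=3, hi=4)
print(read_unclustered, read_clustered)  # 4 1
```

Both scans return the same two rows, but the clustered layout reads one partition instead of four.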
-- Create warehouse
CREATE WAREHOUSE analytics_wh
WITH WAREHOUSE_SIZE = 'MEDIUM'
AUTO_SUSPEND = 300 -- suspend after 5 min idle
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 3; -- multi-cluster auto-scale
-- Create database & schema
CREATE DATABASE raw_data;
CREATE SCHEMA raw_data.events;
-- Create table
CREATE TABLE raw_data.events.clicks (
id INT AUTOINCREMENT,
user_id VARCHAR(50),
event VARCHAR(100),
payload VARIANT, -- semi-structured JSON
ts TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Create stage (S3)
CREATE STAGE my_s3_stage
URL = 's3://my-bucket/data/'
CREDENTIALS = (AWS_KEY_ID='...' AWS_SECRET_KEY='...');
-- Create file format
CREATE FILE FORMAT csv_format
TYPE = 'CSV' FIELD_DELIMITER=',' SKIP_HEADER=1;
-- COPY INTO (bulk load)
COPY INTO raw_data.events.clicks
FROM @my_s3_stage/clicks/
FILE_FORMAT = (FORMAT_NAME = 'csv_format')
PATTERN = '.*[.]csv';
-- Snowpipe (continuous ingest)
CREATE PIPE clicks_pipe AUTO_INGEST = TRUE AS
COPY INTO raw_data.events.clicks
FROM @my_s3_stage/clicks/
FILE_FORMAT = (FORMAT_NAME = 'csv_format');

-- Query data as of 1 hour ago
SELECT * FROM clicks AT(OFFSET => -3600);
-- Query data at specific timestamp
SELECT * FROM clicks AT(TIMESTAMP => '2025-01-15 10:00:00'::TIMESTAMP);
-- Undrop a table
UNDROP TABLE clicks;
-- Zero-copy clone
CREATE TABLE clicks_backup CLONE clicks;
CREATE DATABASE dev_db CLONE prod_db;

-- Stream: tracks changes on a table
CREATE STREAM clicks_stream ON TABLE clicks;
-- Task: scheduled SQL
CREATE TASK process_clicks
WAREHOUSE = analytics_wh
SCHEDULE = 'USING CRON 0 * * * * UTC' -- every hour
WHEN SYSTEM$STREAM_HAS_DATA('clicks_stream')
AS
INSERT INTO analytics.clicks_agg
SELECT event, COUNT(*), DATE_TRUNC('hour', ts)
FROM clicks_stream
GROUP BY 1, 3;
ALTER TASK process_clicks RESUME;

-- Query JSON inside VARIANT column
SELECT
payload:page::STRING AS page,
payload:duration::INT AS duration,
payload:tags[0]::STRING AS first_tag
FROM clicks
WHERE payload:page IS NOT NULL;
-- FLATTEN nested arrays
SELECT c.user_id, f.value::STRING AS tag
FROM clicks c,
LATERAL FLATTEN(input => c.payload:tags) f;

PySpark is the Python API for Apache Spark, used for distributed processing of large datasets.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Create session
spark = SparkSession.builder \
.appName("MyApp") \
.config("spark.sql.shuffle.partitions", 200) \
.getOrCreate()
# Read data
df = spark.read.csv("s3://bucket/data.csv", header=True, inferSchema=True)
df = spark.read.parquet("s3://bucket/data.parquet")
df = spark.read.json("s3://bucket/data.json")
# Schema
df.printSchema()
df.dtypes # list of (col_name, dtype)
df.columns # column names
df.count() # row count

# Select & filter
df.select("name", "age").filter(F.col("age") > 25)
df.where("age > 25 AND city = 'NYC'")
# Add / rename / drop columns
df.withColumn("age_next", F.col("age") + 1)
df.withColumnRenamed("name", "full_name")
df.drop("temp_col")
# Group by & aggregate
df.groupBy("city").agg(
F.count("*").alias("total"),
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
)
# Joins
df1.join(df2, df1.id == df2.id, "inner") # inner, left, right, full, cross, semi, anti
# Window functions
from pyspark.sql.window import Window
w = Window.partitionBy("dept").orderBy(F.desc("salary"))
df.withColumn("rank", F.row_number().over(w))
# Write
df.write.mode("overwrite").parquet("s3://output/")
df.write.partitionBy("year", "month").parquet("s3://output/")

# Spark SQL: register the DataFrame as a temporary view, then query it
df.createOrReplaceTempView("users")
result = spark.sql("""
SELECT city, COUNT(*) as cnt, AVG(salary) as avg_sal
FROM users
WHERE age > 25
GROUP BY city
ORDER BY cnt DESC
""")

Performance tips: use .cache() for reused DataFrames, .repartition(n) to balance partitions, .coalesce(1) to write a single file, and broadcast small tables with F.broadcast(small_df) in joins.
import pandas as pd
import numpy as np
# Read / write
df = pd.read_csv("data.csv")
df = pd.read_parquet("data.parquet")
df = pd.read_json("data.json")
df.to_csv("out.csv", index=False)
# Inspect
df.shape # (rows, cols)
df.info() # dtypes, memory, non-null counts
df.describe() # summary stats
df.head(10) # first 10 rows
df.dtypes # column types
df.isnull().sum() # null counts per column
# Select / filter
df["col"] # Series
df[["col1", "col2"]] # DataFrame
df[df["age"] > 25] # filter rows
df.query("age > 25 and city == 'NYC'")
# Transforms
df["new"] = df["a"] + df["b"]
df.rename(columns={"old": "new"})
df.drop(columns=["temp"])
df.sort_values("col", ascending=False)
# Group by
df.groupby("city").agg({"salary": ["mean", "max"], "age": "count"})
df.groupby("city")["salary"].transform("rank")
# Joins
pd.merge(df1, df2, on="id", how="left")
# Pivot / melt
df.pivot_table(values="sales", index="region", columns="quarter", aggfunc="sum")
pd.melt(df, id_vars=["name"], value_vars=["q1", "q2"])

Polars is a fast DataFrame library written in Rust with a Python API. It offers lazy evaluation and multi-threaded execution that is not limited by the Python GIL.
import polars as pl
# Read
df = pl.read_csv("data.csv")
df = pl.read_parquet("data.parquet")
# Lazy mode (optimized query plan)
lf = pl.scan_parquet("data.parquet")
result = lf.filter(pl.col("age") > 25) \
.group_by("city") \
.agg(pl.col("salary").mean().alias("avg_sal")) \
.sort("avg_sal", descending=True) \
.collect() # execute the lazy plan
# Transformations (eager)
df.select(pl.col("name"), pl.col("age") + 1)
df.with_columns((pl.col("a") * pl.col("b")).alias("product"))
df.filter(pl.col("city").is_in(["NYC", "LA"]))
# Window functions
df.with_columns(
pl.col("salary").rank().over("dept").alias("rank")
)

For best performance, call .lazy(), chain the transforms, and finish with .collect().

Apache Airflow is a workflow orchestration platform for scheduling and monitoring data pipelines defined as code (DAGs).
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta
default_args = {
"owner": "data-team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
# extract_data and transform_data are user-defined Python callables (not shown here).
with DAG(
    dag_id="daily_etl",
    schedule="0 6 * * *",  # daily at 6 AM
    start_date=datetime(2025, 1, 1),
    catchup=False,
    default_args=default_args,
) as dag:
    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_data,
    )
    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_data,
    )
    load = SnowflakeOperator(
        task_id="load_to_snowflake",
        sql="COPY INTO analytics.events FROM @stage/...",
        snowflake_conn_id="snowflake_default",
    )

    extract >> transform >> load  # task dependencies

Key concepts: DAG = Directed Acyclic Graph (pipeline definition); Task = unit of work; Operator = task type (Python, Bash, SQL, etc.); XCom = cross-task data passing; Sensor = waits for an external condition.
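
To illustrate what "directed acyclic graph" means for task ordering, here is a minimal sketch using the standard-library `graphlib` module (Python 3.9+). It is not how Airflow's scheduler is implemented; the `dependencies` mapping simply mirrors the `extract >> transform >> load` chain above, and the `cyclic` graph is a made-up counterexample.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of upstream tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load_to_snowflake": {"transform"},
}

# A topological order runs every task after all of its upstream tasks.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['extract', 'transform', 'load_to_snowflake']

# A cycle has no valid order, which is why pipelines must be acyclic.
cyclic = {"a": {"b"}, "b": {"a"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)  # True
```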
dbt is a SQL-first transformation tool: you write SELECT statements, and dbt handles DDL, dependencies, testing, and documentation.
-- models/staging/stg_orders.sql
WITH source AS (
SELECT * FROM {{ source('raw', 'orders') }}
)
SELECT
id AS order_id,
customer_id,
amount::DECIMAL(10,2) AS order_amount,
created_at::DATE AS order_date
FROM source
WHERE amount > 0

-- models/marts/fct_daily_revenue.sql
{{ config(materialized='incremental', unique_key='order_date') }}
SELECT
order_date,
COUNT(*) AS total_orders,
SUM(order_amount) AS revenue
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
-- Reprocess the latest loaded date so late-arriving orders update its row via unique_key
WHERE order_date >= (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
GROUP BY order_date

dbt init my_project # scaffold project
dbt run # run all models
dbt run --select stg_orders # run specific model
dbt test # run all tests
dbt build # run + test in DAG order
dbt docs generate && dbt docs serve # auto-generated docs

| Materialization | What It Does | Use When |
|---|---|---|
| view | CREATE VIEW | Light transforms, infrequent queries |
| table | CREATE TABLE AS | Frequently queried, stable data |
| incremental | Insert or merge only new or changed rows | Large tables, append-mostly data |
| ephemeral | CTE (no object created) | Intermediate helper logic |
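
The incremental materialization is the least obvious row in this table, so here is a rough plain-Python model of the idea. It is not dbt code: `run_incremental` is a hypothetical helper, the target table is modelled as a dict keyed by `order_date` (playing the role of `unique_key`), and the sketch chooses to reprocess the latest loaded date (`>=`) so that late-arriving rows are picked up.

```python
from collections import defaultdict


def run_incremental(target, source_rows):
    """Update daily aggregates; `target` maps order_date -> (total_orders, revenue)."""
    if target:
        # Plays the role of is_incremental(): only dates at or after the latest loaded one.
        cutoff = max(target)
        candidates = [r for r in source_rows if r["order_date"] >= cutoff]
    else:
        candidates = list(source_rows)  # first run: process everything

    daily = defaultdict(lambda: [0, 0.0])
    for row in candidates:
        daily[row["order_date"]][0] += 1
        daily[row["order_date"]][1] += row["order_amount"]

    for order_date, (total_orders, revenue) in daily.items():
        target[order_date] = (total_orders, revenue)  # unique_key: replace that date's row
    return target


source = [
    {"order_date": "2024-12-31", "order_amount": 10.0},
    {"order_date": "2025-01-01", "order_amount": 20.0},
]
target = run_incremental({}, source)

# A late order for 2025-01-01 and a new day arrive before the second run.
source += [
    {"order_date": "2025-01-01", "order_amount": 5.0},
    {"order_date": "2025-01-02", "order_amount": 7.0},
]
target = run_incremental(target, source)
print(target)
```

The second run leaves 2024-12-31 untouched, rewrites 2025-01-01 with the late order included, and adds 2025-01-02.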
Apache Kafka is a distributed event streaming platform. Producers publish streams of records and consumers subscribe to them in real time.
| Concept | Description |
|---|---|
| Topic | Named category of messages. Producers write to, consumers read from. |
| Partition | Ordered, immutable log within a topic. Enables parallelism. |
| Producer | Publishes messages to topics. |
| Consumer | Reads messages from topics. Belongs to a consumer group. |
| Consumer Group | Within a group, each partition is consumed by exactly one member, which enables parallel consumption. |
| Offset | Position of a message within a partition. Consumers track their offset. |
| Broker | A Kafka server. Cluster = multiple brokers. |
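
The Partition and Consumer Group rows can be sketched in a few lines of plain Python. This is only an illustration of the two ideas, not Kafka client code: `partition_for` and `assign_partitions` are hypothetical helpers, CRC32 stands in for the client's real key hash (the Java client's default partitioner uses murmur2), and round-robin is just one simple assignment strategy.

```python
import zlib


def partition_for(key, num_partitions):
    """Pick a partition from a hash of the key, so equal keys land in the same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(num_partitions, consumers):
    """Round-robin assignment: every partition gets exactly one owner in the group."""
    assignment = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


# Messages with the same key keep their relative order because they share a partition.
same_partition = partition_for("user_1", 6) == partition_for("user_1", 6)

# Six partitions shared by four consumers in one group.
assignment = assign_partitions(6, ["consumer-a", "consumer-b", "consumer-c", "consumer-d"])
print(assignment)

# More consumers than partitions: the extra member stays idle.
idle = assign_partitions(2, ["consumer-a", "consumer-b", "consumer-c"])
print(idle)
```

The second call shows why adding consumers beyond the partition count does not increase parallelism.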
from confluent_kafka import Producer, Consumer
# Producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
p.produce('events', key='user_1', value='{"action":"click"}')
p.flush()
# Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'auto.offset.reset': 'earliest'
})
c.subscribe(['events'])
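try:
    while True:
        msg = c.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(msg.value().decode())
finally:
    c.close()  # leave the consumer group cleanly

| Concept | Description | Example |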
|---|---|---|
| Fact Table | Measurable events (metrics, transactions) | fct_orders — amount, quantity |
| Dimension Table | Descriptive context for facts | dim_customer — name, city, segment |
| Star Schema | Fact in center, dimensions around it | Simple, fast queries |
| Snowflake Schema | Normalized dimensions (dim → sub-dim) | Less redundancy, more joins |
| SCD Type 1 | Overwrite old value (no history) | Customer address update |
| SCD Type 2 | New row for changes (full history) | valid_from, valid_to, is_current |
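
As a worked example of the SCD Type 2 row, the sketch below applies one address change to a tiny in-memory dimension. `apply_scd2` is a hypothetical helper and the rows are invented sample data; a warehouse would typically do this with a MERGE statement rather than Python.

```python
from datetime import date


def apply_scd2(rows, customer_id, new_city, change_date):
    """Close the customer's current row and append a new current row."""
    for row in rows:
        if row["customer_id"] == customer_id and row["is_current"]:
            if row["city"] == new_city:
                return rows  # nothing changed, history stays as it is
            row["valid_to"] = change_date
            row["is_current"] = False
    rows.append({
        "customer_id": customer_id,
        "city": new_city,
        "valid_from": change_date,
        "valid_to": None,
        "is_current": True,
    })
    return rows


dim_customer = [{
    "customer_id": 1,
    "city": "NYC",
    "valid_from": date(2024, 1, 1),
    "valid_to": None,
    "is_current": True,
}]

apply_scd2(dim_customer, 1, "LA", date(2025, 1, 15))
apply_scd2(dim_customer, 1, "LA", date(2025, 2, 1))  # no-op: city is unchanged

for row in dim_customer:
    print(row)
```

After the change there are two rows for the customer: the closed NYC row and the current LA row, so queries can join on either the current state or the state as of a given date.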
| Term | Description |
|---|---|
| OLTP | Online Transaction Processing — row-oriented, fast writes (MySQL, PostgreSQL) |
| OLAP | Online Analytical Processing — column-oriented, fast reads (Snowflake, BigQuery) |
| Data Lake | Raw storage for structured + unstructured data (S3, ADLS, GCS) |
| Data Lakehouse | Lake + Warehouse features (Delta Lake, Iceberg, Hudi) |
| Partitioning | Split data by date/region → skip irrelevant partitions |
| Columnar Storage | Store columns together → great compression + fast analytics |
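
One reason columnar storage compresses well is that values within a column share a type and often repeat. The sketch below pivots a few invented rows into columns and run-length encodes one of them. `to_columns` and `run_length_encode` are hypothetical helpers; real columnar formats combine several encodings, of which run-length encoding is only one.

```python
from itertools import groupby

rows = [
    {"city": "NYC", "status": "paid"},
    {"city": "NYC", "status": "paid"},
    {"city": "NYC", "status": "refunded"},
    {"city": "LA", "status": "paid"},
    {"city": "LA", "status": "paid"},
]


def to_columns(rows):
    """Pivot row-oriented records into one list per column."""
    return {name: [r[name] for r in rows] for name in rows[0]}


def run_length_encode(values):
    """Collapse consecutive repeats into (value, count) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]


columns = to_columns(rows)
encoded_city = run_length_encode(columns["city"])
encoded_status = run_length_encode(columns["status"])
print(encoded_city)    # [('NYC', 3), ('LA', 2)]
print(encoded_status)  # [('paid', 2), ('refunded', 1), ('paid', 2)]
```

A query that only needs `city` can read that one list and ignore the rest, which is the "fast analytics" half of the claim.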
| Format | Type | Compression | Best For |
|---|---|---|---|
| CSV | Row-based, text | None (gzip external) | Simple exchange, small data |
| JSON / NDJSON | Row-based, text | None (gzip external) | APIs, semi-structured data |
| Parquet | Columnar, binary | Snappy / Zstd / Gzip | Analytics, warehouses, Spark |
| ORC | Columnar, binary | Zlib / Snappy | Hive ecosystem |
| Avro | Row-based, binary | Snappy / Deflate | Kafka, schema evolution |
| Delta Lake | Parquet + tx log | Same as Parquet | Lakehouse, ACID on lake |
| Iceberg | Parquet + metadata | Same as Parquet | Open table format, Snowflake/Spark |
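
A practical difference between the two text formats in this table is what survives a round trip. The following sketch uses only the standard library and one invented record: CSV returns every field as a string, while NDJSON keeps numbers and nested arrays.

```python
import csv
import io
import json

records = [{"order_id": 1, "amount": 19.99, "tags": ["new", "promo"]}]

# CSV round trip: types are flattened to text.
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["order_id", "amount", "tags"])
writer.writeheader()
writer.writerows(records)
buf.seek(0)
csv_row = next(csv.DictReader(buf))

# NDJSON round trip: one JSON document per line, types preserved.
ndjson = "\n".join(json.dumps(r) for r in records)
json_row = json.loads(ndjson.splitlines()[0])

print(csv_row)   # every value is a str, including the list
print(json_row)  # int, float and list come back unchanged
```

This is why CSV loads usually need an explicit schema or casts, while JSON fits semi-structured columns such as VARIANT.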
| Tool | Type | Best For |
|---|---|---|
| Apache Airflow | DAG-based, Python | Complex pipelines, mature ecosystem |
| Prefect | Python-native, flow-based | Modern Python data workflows |
| Dagster | Asset-based, Python | Software-defined data assets |
| dbt Cloud | SQL transforms | dbt-native scheduling |
| Mage | Notebook-style | Quick ETL pipelines |
| Cron | OS scheduler | Simple scripts, one-off jobs |
# schema.yml
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000

import great_expectations as gx
context = gx.get_context()
validator = context.get_validator(datasource="my_ds", data_asset="orders")
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0)
validator.expect_column_values_to_be_unique("order_id")
results = validator.validate()
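
To make clear what these three expectations check, here is a plain-Python sketch of equivalent rules. It does not use the Great Expectations API: `check_not_null`, `check_unique`, and `check_between` are hypothetical helpers, and the `orders` rows are invented sample data containing one duplicate id and one negative amount.

```python
def check_not_null(rows, column):
    """True if no row has a missing value in the column."""
    return all(row.get(column) is not None for row in rows)


def check_unique(rows, column):
    """True if every value in the column appears exactly once."""
    values = [row[column] for row in rows]
    return len(values) == len(set(values))


def check_between(rows, column, min_value=None, max_value=None):
    """True if every non-null value lies within the inclusive bounds that were given."""
    for row in rows:
        value = row.get(column)
        if value is None:
            continue  # sketch choice: nulls are left to check_not_null
        if min_value is not None and value < min_value:
            return False
        if max_value is not None and value > max_value:
            return False
    return True


orders = [
    {"order_id": 1, "amount": 10.0},
    {"order_id": 2, "amount": 0.0},
    {"order_id": 2, "amount": -5.0},
]

results = {
    "order_id_not_null": check_not_null(orders, "order_id"),
    "amount_non_negative": check_between(orders, "amount", min_value=0),
    "order_id_unique": check_unique(orders, "order_id"),
}
print(results)
```

A pipeline would typically fail the run or quarantine the batch when any of these checks returns False.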