Choosing the Right Partition Key
The partition key is the most critical decision in sharding architecture. It determines how data is distributed, affects query performance, influences operational complexity, and constrains future growth. A poorly chosen partition key can cause severe problems: uneven data distribution, operational bottlenecks, and expensive resharding operations. Choosing well creates a system that scales smoothly for years.
What Makes a Good Partition Key
A partition key must balance several requirements that sometimes conflict with one another:
1. Distribution
The key must distribute data evenly across all shards. If 80% of records map to one shard, that shard becomes the bottleneck regardless of how powerful it is.
Bad distribution:
Shard 1: 8M records (region = 'US')
Shard 2: 1M records (region = 'EU')
Shard 3: 1M records (region = 'ASIA')
Result: Shard 1 is 80% of the load
Good distribution:
Shard 1: 3M-3.5M records (user_id mod 3)
Shard 2: 3M-3.5M records (user_id mod 3)
Shard 3: 3M-3.5M records (user_id mod 3)
Result: Balanced load across all shards
Uniform distribution means:
- All shards handle roughly equal load
- Adding new shards balances evenly
- No shard becomes a bottleneck
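A quick way to validate distribution before committing to a key is to hash a sample of real key values and count how many land on each shard. A minimal sketch (the shard_for helper and the synthetic user IDs are illustrative):
import hashlib
from collections import Counter

def shard_for(key: str, num_shards: int) -> int:
    # Use a stable hash; Python's built-in hash() is randomized per process
    digest = hashlib.md5(key.encode()).hexdigest()
    return int(digest, 16) % num_shards

# Count how a sample of keys spreads across 3 shards
counts = Counter(shard_for(f"user_{i}", 3) for i in range(1_000_000))
print(counts)  # a high-cardinality key lands roughly 333K keys on each shard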
2. Queryability
The key should enable efficient queries. Queries that don’t filter by the partition key must scan all shards, multiplying latency and cost.
Queryable partition key (user_id):
SELECT * FROM orders WHERE user_id = 123
→ Routes to single shard, fast query
Partially aligned filtering:
SELECT * FROM orders WHERE user_id = 123 AND date > '2024-01-01'
→ user_id routes the query to a single shard; date is not part of the partition key
→ Still fast: one shard is hit, then rows are filtered by date locally
Non-queryable partition key:
SELECT * FROM orders WHERE date > '2024-01-01'
→ Must scan all shards for date filtering
→ Massive latency and resource cost
The partition key doesn’t need to be the only filter, but it should be part of most queries.
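The difference shows up directly in routing code. A sketch of both cases, assuming a db_shards list of per-shard connections with a query method (the same pseudocode style used in the examples later in this section):
def get_user_orders(user_id):
    # Filter includes the partition key: route to exactly one shard
    shard = hash(user_id) % num_shards
    return db_shards[shard].query(
        "SELECT * FROM orders WHERE user_id = %s", (user_id,))

def get_recent_orders(cutoff_date):
    # Filter omits the partition key: scatter-gather across every shard
    results = []
    for shard in db_shards:
        results.extend(shard.query(
            "SELECT * FROM orders WHERE date > %s", (cutoff_date,)))
    return results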
3. Stability
The partition key should rarely change for existing records. If the key changes, records must be moved to different shards (resharding).
Stable partition key (user_id):
→ Once assigned, doesn't change
→ Record stays on same shard forever
→ No resharding required
Unstable partition key (country):
→ Users move countries, travel, relocate
→ Record must move to different shard
→ Expensive resharding operation
Immutable keys (unique IDs) work better than mutable attributes (geographic location, subscription tier).
4. Cardinality
Cardinality measures how many distinct values the partition key can have. High cardinality keys distribute evenly; low cardinality keys group data unevenly.
High cardinality (user_id):
→ Billions of distinct values
→ Can distribute to thousands of shards
→ Each shard gets narrow range
Low cardinality (country):
→ ~200 distinct values
→ Can only use ~200 shards effectively
→ More shards than countries wastes resources
Cardinality constraint: Effectively, you can have no more shards than distinct partition key values. If you have 200 countries and 1000 shards, 800 shards will be empty.
5. Scalability
A good partition key supports growth. As data volume increases, you should be able to add shards without major restructuring.
Modulo-based partitioning (user_id % num_shards):
→ Requires resharding when num_shards changes
→ Adding new shard: record redistribution
Range-based partitioning (user_id ranges):
→ Adding new shard: split one range
→ Cleaner growth model
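The cost difference is easy to quantify. A small sketch, assuming integer keys and a plain modulo scheme, that counts how many keys change shards when a fifth shard is added:
def modulo_moves(keys, old_n, new_n):
    # Count keys whose shard assignment changes when the shard count changes
    return sum(1 for k in keys if k % old_n != k % new_n)

keys = range(1_000_000)
print(modulo_moves(keys, 4, 5))  # 800,000 of 1,000,000 keys (80%) must move

# Range-based partitioning avoids this: adding a shard splits one existing
# range, so only the records in that single range (~1/old_n of the data) move.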
Cardinality Analysis
Cardinality is the most underestimated constraint. Choose a key with too little cardinality and you will hit a hard scaling ceiling.
Calculating Effective Shards
shards_needed = total_records / target_records_per_shard
max_effective_shards = cardinality (a single key value cannot span two shards)
Example: user_id partitioning
├── 1 billion users (cardinality ≈ 1 billion)
├── Target 10M records per shard for performance
├── shards_needed = 1B / 10M = 100 shards
└── The cardinality ceiling of ~1 billion is nowhere near a constraint
Example: country partitioning
├── ~200 countries (cardinality ≈ 200)
├── Target 10M records per shard
├── max_effective_shards = 200, no matter how much data grows
└── Can only scale to 200 shards effectively
Low-cardinality keys lock you into a scaling ceiling:
Partition by country:
├── 200 distinct values
├── Max 200 shards
├── Each shard might handle millions of records
├── Each shard becomes overloaded as data grows
└── Can't add more shards to solve overload
Partition by user_id:
├── Billions of distinct values
├── Can scale to thousands of shards
├── Each shard stays reasonably sized
└── Add shards as needed
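A back-of-the-envelope planner makes the ceiling explicit. This is a sketch with illustrative numbers, not a sizing tool:
def plan_shards(total_records, target_per_shard, key_cardinality):
    needed = -(-total_records // target_per_shard)  # ceiling division
    if needed > key_cardinality:
        raise ValueError(
            f"need {needed} shards but the key has only "
            f"{key_cardinality} distinct values")
    return needed

print(plan_shards(1_000_000_000, 10_000_000, 1_000_000_000))  # 100 shards
print(plan_shards(5_000_000_000, 10_000_000, 200))            # raises: cardinality ceiling hit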
Access Pattern Analysis
The best partition key aligns with your most common queries.
Identify Query Patterns
E-commerce platform queries:
├── Search products (by category, brand, price)
│ → No single partition key serves this query well
│ → Requires scanning many shards
├── Browse user's order history (by user_id)
│ → Would benefit from user_id partitioning
│ → Routes to single shard
└── Track inventory (by product_id)
→ Would benefit from product_id partitioning
→ Routes to single shard
Decision: What's queried most?
If "view my orders" is 70% of queries → partition by user_id
If "check product inventory" is 70% of queries → partition by product_id
Hot vs. Cold Access
Some partition key values are accessed much more than others. This creates hotspots:
Partition by user_id in social network:
├── Celebrity user (10M followers)
│ └── Shard receives 10x normal traffic
│ └── Becomes bottleneck
├── Regular users (100 followers each)
│ └── Shard receives normal traffic
└── Inactive users (no access)
└── Shard barely used
Result: Uneven load despite even distribution
Solutions:
- Separate hot data: Give celebrity profiles dedicated shard(s)
- Replicate hot keys: Read replicas for hot users
- Accept hotspots: Design system expecting them
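Whichever mitigation you choose, you first have to know which keys are hot. A minimal in-process sketch of hot-key detection over a sliding window (the window size and threshold are assumptions; in production this usually comes from shard-level metrics):
import time
from collections import Counter, deque

WINDOW_SECONDS = 60
HOT_THRESHOLD = 10_000       # requests per window; tune per workload

recent = deque()             # (timestamp, partition_key)
counts = Counter()

def record_access(key):
    now = time.time()
    recent.append((now, key))
    counts[key] += 1
    # Evict accesses that have fallen out of the window
    while recent and recent[0][0] < now - WINDOW_SECONDS:
        _, old_key = recent.popleft()
        counts[old_key] -= 1

def hot_keys():
    return {k for k, c in counts.items() if c >= HOT_THRESHOLD}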
Composite Partition Keys
When no single column is perfect, combine multiple columns:
Two-Level Partitioning
Partition by (tenant_id, user_id):
├── First level: tenant_id
│ └── Isolates each customer's data
├── Second level: user_id
│ └── Distributes within tenant
└── Benefits:
├── Isolates multi-tenant data
├── Distributes within each tenant
└── Supports both queries efficiently
Example: SaaS platform with multiple customers
def get_shard(tenant_id, user_id, num_tenant_shards, users_per_tenant_shard):
    # Two-level hashing: pick the tenant's shard group, then a shard within it
    tenant_hash = hash(tenant_id) % num_tenant_shards
    user_hash = hash(user_id) % users_per_tenant_shard
    return (tenant_hash, user_hash)
# Query: All users in tenant 123
SELECT * FROM users WHERE tenant_id = 123
→ Query tenant_hash shard, scan users within
→ Efficient: confined to one tenant shard
# Query: User 456 in tenant 123
SELECT * FROM users WHERE tenant_id = 123 AND user_id = 456
→ Query specific (tenant, user) shard
→ Very efficient: single shard
Composite Range Partitioning
Partition by (date, user_id):
├── Primary: date (yyyy-MM-dd)
│ └── Events from January in one set of shards
│ └── Events from February in another set
├── Secondary: user_id
│ └── Within January, user 1-1M in shard A
│ └── Within January, user 1M-2M in shard B
└── Benefits:
├── Query "events in January" doesn't scan all time
├── Query "user's events" doesn't scan all users
└── Query "user's events in January" is very efficient
Examples from Real Systems
1. User_ID Partitioning (User-Centric Apps)
Systems: Facebook, Twitter, Discord, Instagram
Why: Social networks are primarily user-centric
# Social network: timeline query (tweets are sharded by the author's user_id)
def get_timeline(user_id):
    my_shard = hash(user_id) % num_shards
    following = db_shards[my_shard].query(
        "SELECT following_id FROM follows WHERE user_id = %s", (user_id,))
    # Followed users' tweets live on their own shards, so fan out to each one
    tweets = []
    for shard in {hash(f) % num_shards for f in following}:
        tweets.extend(db_shards[shard].query(
            "SELECT * FROM tweets WHERE user_id = ANY(%s)", (following,)))
    return sorted(tweets, key=lambda t: t["created_at"], reverse=True)
Advantages:
- User’s own content is on their shard
- Friends’ content requires small number of shard queries
- High cardinality (billions of users)
- Stable (user_id doesn’t change)
Disadvantages:
- “Global trending topics” requires scanning all shards
- Hotspots for celebrity users
2. Timestamp Partitioning (Time-Series Data)
Systems: Prometheus, InfluxDB, DataDog, CloudWatch
Why: Queries are time-bound; older data is archived
# Time-series database: metrics query
def get_metrics(metric_name, start_time, end_time):
    # Only query shards covering the time range
    relevant_shards = get_shards_for_range(start_time, end_time)
    results = []
    for shard in relevant_shards:
        results.extend(
            shard.query(
                "SELECT * FROM metrics WHERE name = %s AND timestamp BETWEEN %s AND %s",
                (metric_name, start_time, end_time)
            )
        )
    return results
Advantages:
- Queries naturally specify time range
- Old data can be archived/deleted per shard
- New data always goes to “current” shard
- Good cardinality (thousands of time periods)
Disadvantages:
- Queries without time filter require scanning all shards
- Uneven load (recent shard gets all writes)
3. Geographic Partitioning (Location-Aware Apps)
Systems: Uber, DoorDash, Google Maps, Airbnb
Why: Geographic queries are common; enables latency optimization
# Food delivery: nearby restaurants
def get_nearby_restaurants(lat, lng, radius_km):
    region = get_region(lat, lng)
    shard = region_to_shard[region]
    return shard.query(
        "SELECT * FROM restaurants WHERE distance(location, %s) <= %s",
        ((lat, lng), radius_km)
    )
Advantages:
- Geographic queries are routed to relevant region
- Can replicate to regional datacenters
- Enables data residency requirements
- Good cardinality (thousands of regions)
Disadvantages:
- Uneven load (cities more active than rural areas)
- User movement causes resharding
- Cross-region queries are expensive
4. Product_ID Partitioning (Catalog-Centric Apps)
Systems: Amazon, eBay, Alibaba
Why: Inventory and product data is central
# E-commerce: product inventory
def get_inventory(product_id):
    shard = hash(product_id) % num_shards
    inventory = db_shards[shard].query(
        "SELECT * FROM inventory WHERE product_id = %s",
        (product_id,)
    )
    return inventory
Advantages:
- Product data stays together across all metadata
- Inventory queries are single-shard
- Millions of products (good cardinality)
Disadvantages:
- “User’s orders” requires scanning all shards
- Usually combined with a secondary index or second table partitioned by user_id
5. Tenant_ID Partitioning (Multi-Tenant SaaS)
Systems: Salesforce, Stripe, Auth0, Intercom
Why: Data isolation and per-tenant scaling
# SaaS: query customer's data
ALLOWED_TABLES = {"contacts", "invoices", "tickets"}  # illustrative resource tables

def get_customer_data(tenant_id, resource_type):
    # Table names cannot be bound as query parameters; validate against an allowlist
    if resource_type not in ALLOWED_TABLES:
        raise ValueError(f"unknown resource type: {resource_type}")
    shard = hash(tenant_id) % num_shards
    return db_shards[shard].query(
        f"SELECT * FROM {resource_type} WHERE tenant_id = %s", (tenant_id,)
    )
Advantages:
- Complete isolation between tenants
- Per-tenant data export/deletion
- Uneven tenant sizes are acceptable (some tenants big, some small)
- Easier compliance and GDPR requirements
Disadvantages:
- Thousands of tenants can mean a large number of shard assignments to manage
- Harder to place new tenants when existing shards are near capacity
Common Mistakes in Partition Key Selection
Mistake 1: Choosing Low-Cardinality Keys
# ❌ Wrong: Partition by is_active (only 2 values)
shard = 0 if is_active else 1
→ Only two shards possible, and the inactive-user shard grows without bound
# ✅ Right: Partition by user_id (billions of values)
shard = hash(user_id) % num_shards
→ Distributes evenly across any number of shards
Mistake 2: Partitioning on Mutable Attributes
# ❌ Wrong: Partition by subscription_tier (changes when user upgrades)
old_shard = hash((user_id, "free")) % num_shards
new_shard = hash((user_id, "premium")) % num_shards
→ Must move record between shards (expensive)
# ✅ Right: Partition by user_id (immutable)
shard = hash(user_id) % num_shards
→ Record stays on same shard forever
Mistake 3: Ignoring Query Patterns
# ❌ Wrong: Partition by random_token (supports no common query)
SELECT * FROM logs WHERE request_id = X
→ Must scan all shards
# ✅ Right: Partition by user_id (supports "get my logs" query)
SELECT * FROM logs WHERE user_id = 123
→ Routes to single shard
Mistake 4: Not Planning for Growth
# ❌ Wrong: Partition by timestamp_minute (granularity far too fine)
shard = timestamp_minute % num_shards
→ Each minute creates new partition key value
→ Hard to combine/aggregate time ranges
→ Scaling requires complex reorganization
# ✅ Right: Partition by timestamp_day (reasonable granularity)
shard = timestamp_day % num_shards
→ Each day creates new partition key value
→ Can aggregate/archive by day
→ Scaling more manageable
Mistake 5: Creating Hotspots with Popular Keys
# ❌ Problem: Celebrity user dominates one shard
# ❌ Partitioning doesn't help here
# ✅ Solution 1: Detect hotspots, split at application level
if user_id == "celebrity_123":
read_from multiple_replicas()
else:
read_from single_shard()
# ✅ Solution 2: Use directory-based sharding
user_to_shard = load_lookup_table()
shard = user_to_shard.get(user_id)
→ Can map celebrity_123 to multiple shards
Advanced: Directory-Based Sharding
When partition key constraints become too limiting, use a directory (lookup table):
# Instead of: shard = hash(user_id) % num_shards
# Use: directory lookup
# Directory table (replicated, cached)
shard_directory = {
    "user_123": 0,
    "user_456": 2,
    "user_789": 1,
    "celebrity_user": [0, 1, 2],  # hot key split across multiple shards
}

def get_shard(user_id):
    shards = shard_directory.get(user_id)
    if shards is None:
        return [hash(user_id) % num_shards]  # fallback for keys not yet in the directory
    if isinstance(shards, list):
        return shards   # Hot user, multiple shards
    return [shards]     # Normal user, one shard
Advantages:
- Can rebalance without resharding
- Can split hotspots dynamically
- Explicit control over distribution
Disadvantages:
- Directory becomes critical bottleneck
- Must be cached and replicated
- Adds operational complexity
Key Takeaways
- Partition key is the most critical sharding decision — it determines distribution, queryability, and scalability
- High cardinality is essential — aim for billions of distinct values to support thousands of shards
- Stability matters — immutable keys avoid expensive resharding
- Align with query patterns — partition by the column used in your most common queries
- Plan for access inequality — some partition key values will be hotter than others
- Composite keys add flexibility — combine multiple columns for better query support
- Directory-based sharding solves constraints — when hash-based partitioning doesn’t fit your needs
- Common mistakes are predictable — low cardinality, mutable attributes, and query misalignment cause most problems
Connection to Next Section
Choosing the right partition key is half the battle. The other half is managing the operational challenges that partitioning creates: how do you handle queries that span multiple shards? How do you rebalance data as it grows? How do you maintain consistency across partitions? These challenges form the foundation of distributed query execution, which we explore next.