在数据量爆炸式增长的今天,如何高效处理海量数据查询成为每个后端工程师必须面对的挑战。最近我在准备架构师面试时,深入研究了10亿手机号按尾号毫秒级查询的解决方案,不仅完成了理论设计,还进行了完整的实战测试。本文将分享从架构设计到代码实现的完整过程。
假设我们有一个包含10亿手机号码的用户表,需要支持按手机尾号(后4位)进行快速查询。很多工程师的第一反应是使用LIKE查询或普通索引:
sql-- 错误做法:全表扫描
SELECT * FROM users WHERE phone LIKE '%1234';
-- 效果有限:无法利用最左前缀原则
CREATE INDEX idx_phone ON users(phone);
但实际测试发现,在百万级数据量下,LIKE查询已经需要秒级响应,10亿数据根本不可行。B+树索引的"最左前缀原则"让后缀查询无法有效利用索引。
异构索引的核心思路是:为不同的查询维度建立独立的索引表,通过数据冗余换取查询性能。具体到手机尾号查询场景:
(尾号, 用户ID)映射关系
主表结构(按user_id分片):
sqlCREATE TABLE users_main (
user_id BIGINT AUTO_INCREMENT PRIMARY KEY,
phone_number VARCHAR(20) NOT NULL,
username VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
异构索引表(按尾号取模分表):
sql-- 分表示例
CREATE TABLE phone_index_0 (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
phone_suffix CHAR(4) NOT NULL,
user_id BIGINT NOT NULL,
phone_number VARCHAR(20) NOT NULL,
INDEX idx_suffix (phone_suffix)
);
pythonimport mysql.connector
import random
import time
from concurrent.futures import ThreadPoolExecutor
def create_tables_and_data():
"""创建表并生成测试数据"""
conn = mysql.connector.connect(
host='localhost', user='root',
password='your_password', database='phone_system'
)
cursor = conn.cursor()
# 创建主表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users_main (
user_id BIGINT AUTO_INCREMENT PRIMARY KEY,
phone_number VARCHAR(20) NOT NULL,
username VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 创建10个索引分表
for i in range(10):
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS phone_index_{i} (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
phone_suffix CHAR(4) NOT NULL,
user_id BIGINT NOT NULL,
phone_number VARCHAR(20) NOT NULL,
INDEX idx_suffix (phone_suffix)
)
""")
print("表创建成功")
# 生成100万条测试数据
print("开始生成 1000000 条测试数据...")
start_time = time.time()
for batch in range(100): # 100批,每批10000条
main_values = []
index_values = {i: [] for i in range(10)}
for i in range(10000):
phone = generate_phone_number()
suffix = phone[-4:]
table_index = int(suffix) % 10
main_values.append((phone, f"user_{batch * 10000 + i}"))
index_values[table_index].append((suffix, batch * 10000 + i + 1, phone))
# 批量插入
cursor.executemany("INSERT INTO users_main (phone_number, username) VALUES (%s, %s)", main_values)
for table_idx, values in index_values.items():
if values:
sql = f"INSERT INTO phone_index_{table_idx} (phone_suffix, user_id, phone_number) VALUES (%s, %s, %s)"
cursor.executemany(sql, values)
conn.commit()
if (batch + 1) % 10 == 0:
elapsed = time.time() - start_time
print(f"进度: {(batch + 1) * 10000}/1000000 ({(batch + 1) * 10}%), 耗时: {elapsed:.2f}秒")
total_time = time.time() - start_time
print(f"数据生成完成! 总耗时: {total_time:.2f}秒")
# 显示统计信息
cursor.execute("SELECT COUNT(*) FROM users_main")
print(f"主表总记录数: {cursor.fetchone()[0]}")
for i in range(10):
cursor.execute(f"SELECT COUNT(*) FROM phone_index_{i}")
print(f"索引表 {i} 记录数: {cursor.fetchone()[0]}")
conn.close()
def generate_phone_number():
"""生成随机手机号"""
prefixes = ['130', '131', '132', '133', '134', '135', '136', '137', '138', '139',
'150', '151', '152', '153', '155', '156', '157', '158', '159',
'180', '181', '182', '183', '184', '185', '186', '187', '188', '189']
prefix = random.choice(prefixes)
return f"{prefix}{random.randint(1000,9999)}{random.randint(1000,9999)}"
运行数据生成脚本的实际结果:
开始生成 1000000 条测试数据... 进度: 100000/1000000 (10.0%), 耗时: 32.92秒 进度: 200000/1000000 (20.0%), 耗时: 47.69秒 ... 进度: 1000000/1000000 (100.0%), 耗时: 392.31秒 数据生成完成! 总耗时: 392.31秒 主表总记录数: 1000000 索引表 0 记录数: 99823 索引表 1 记录数: 100117 索引表 2 记录数: 99999 索引表 3 记录数: 100378 索引表 4 记录数: 100168 索引表 5 记录数: 99684 索引表 6 记录数: 99820 索引表 7 记录数: 100412 索引表 8 记录数: 99590 索引表 9 记录数: 100009
关键发现:数据分布非常均匀,验证了手机尾号的自然均匀分布特性。
pythonimport mysql.connector
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
def query_performance_test():
"""查询性能测试"""
conn = mysql.connector.connect(
host='localhost', user='root',
password='your_password', database='phone_system'
)
cursor = conn.cursor(dictionary=True)
# 单次查询测试
print("=== 单个查询测试 ===")
start_time = time.time()
cursor.execute("""
SELECT ui.phone_suffix, ui.phone_number, um.username
FROM phone_index_4 ui
JOIN users_main um ON ui.user_id = um.user_id
WHERE ui.phone_suffix = '1234'
ORDER BY ui.id DESC
LIMIT 20
""")
results = cursor.fetchall()
query_time = (time.time() - start_time) * 1000
print(f"查询成功: 找到{len(results)}条记录, 耗时{query_time:.2f}毫秒")
for i, row in enumerate(results[:3]):
print(f" {i+1}. {row['phone_number']} - {row['username']}")
# 并发查询测试
print("\n开始并发查询测试:")
concurrent_users = 10
queries_per_user = 5
def worker(user_id):
results = []
for _ in range(queries_per_user):
suffix = f"{random.randint(0,9999):04d}"
table_index = int(suffix) % 10
start_time = time.time()
cursor.execute(f"""
SELECT COUNT(*) as cnt
FROM phone_index_{table_index}
WHERE phone_suffix = %s
""", (suffix,))
count = cursor.fetchone()['cnt']
query_time = (time.time() - start_time) * 1000
results.append({
'suffix': suffix,
'table_index': table_index,
'query_time': query_time,
'count': count
})
return results
start_time = time.time()
with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
futures = [executor.submit(worker, i) for i in range(concurrent_users)]
all_results = []
for future in as_completed(futures):
all_results.extend(future.result())
total_time = time.time() - start_time
query_times = [r['query_time'] for r in all_results]
print(f"总查询数: {len(all_results)}")
print(f"总耗时: {total_time:.2f}秒")
print(f"平均QPS: {len(all_results)/total_time:.2f}")
print(f"平均查询时间: {sum(query_times)/len(query_times):.2f}毫秒")
print(f"最快查询: {min(query_times):.2f}毫秒")
print(f"最慢查询: {max(query_times):.2f}毫秒")
conn.close()
# 性能对比测试
def compare_methods():
"""对比异构索引和LIKE查询性能"""
print("开始性能对比测试...")
# 测试异构索引
print("\n异构索引方案:")
test_heterogeneous()
# 测试LIKE查询
print("\nLIKE查询方案:")
test_like_query()
def test_heterogeneous():
"""测试异构索引性能"""
conn = mysql.connector.connect(...)
cursor = conn.cursor(dictionary=True)
start_time = time.time()
query_count = 15
for _ in range(query_count):
suffix = f"{random.randint(0,9999):04d}"
table_index = int(suffix) % 10
cursor.execute(f"""
SELECT 1 FROM phone_index_{table_index}
WHERE phone_suffix = %s LIMIT 1
""", (suffix,))
total_time = time.time() - start_time
print(f"平均查询时间: {total_time*1000/query_count:.2f}毫秒")
print(f"QPS: {query_count/total_time:.2f}")
conn.close()
首次测试结果:
=== 单个查询测试 === 查询成功: 找到20条记录, 耗时61.40毫秒 前3条结果: 1. 13434421234 - user_982255 2. 13820691234 - user_951705 3. 18397591234 - user_951226 开始并发查询测试: 总查询数: 50 成功查询: 50 失败查询: 0 总耗时: 0.52秒 平均QPS: 96.32 平均查询时间: 86.47毫秒 最快查询: 1.00毫秒 最慢查询: 444.52毫秒
第二次测试(缓存预热后):
=== 单个查询测试 === 查询成功: 找到20条记录, 耗时1.03毫秒 开始并发查询测试: 总查询数: 50 成功查询: 50 总耗时: 0.83秒 平均QPS: 60.26 平均查询时间: 137.94毫秒 最快查询: 1.00毫秒
性能对比测试结果:
异构索引方案: 平均查询时间: 13.40毫秒 QPS: 189.12 LIKE查询方案: 平均查询时间: 224.54毫秒 QPS: 20.47

传统分库分表按user_id分片,按尾号查询需要扫描所有分片。异构索引通过独立的尾号分片,实现精准路由,查询只会命中单个分表。

当数据量继续增长时,只需增加异构索引表的分片数量即可。路由算法尾号 % 分片数保证数据均匀分布。
基于测试数据,异构索引方案相比LIKE查询:
pythondef insert_user_transactional(phone, username):
"""使用事务保证数据一致性"""
conn = mysql.connector.connect(...)
try:
conn.start_transaction()
# 插入主表
cursor.execute("INSERT INTO users_main (phone_number, username) VALUES (%s, %s)",
(phone, username))
user_id = cursor.lastrowid
# 插入索引表
suffix = phone[-4:]
table_index = int(suffix) % 10
cursor.execute(f"""
INSERT INTO phone_index_{table_index}
(phone_suffix, user_id, phone_number) VALUES (%s, %s, %s)
""", (suffix, user_id, phone))
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
问题1:首次查询性能波动
问题2:偶发性慢查询
问题3:数据分布验证
通过这次从理论到实战的完整探索,我深刻体会到异构索引架构在海量数据查询中的价值。关键收获包括:
生产环境建议:
这个方案不仅适用于手机尾号查询,还可以推广到各种需要按特定维度快速查询海量数据的场景。架构的本质是权衡艺术,异构索引用适度的复杂度换来了巨大的性能提升。
感谢:https://mp.weixin.qq.com/s/QraLp40GFEnSVwJ1lFEIRg
(完整代码和测试数据已在实际环境中验证,可直接用于生产环境参考)
本文作者:sea-whales
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!