示例和教程¶
本页提供使用 libCacheSim Python 绑定的实际示例和深入教程。
基础示例¶
简单缓存模拟¶
最基本的缓存模拟示例:
import libcachesim as lcs

# Build a 1 MB LRU cache.
cache = lcs.LRU(cache_size=1024 * 1024)

# Replay a tiny request sequence of (object id, size in bytes) pairs.
trace = [
    (1, 100),  # object 1, 100 bytes
    (2, 200),  # object 2, 200 bytes
    (1, 100),  # object 1 again (expected hit)
    (3, 150),  # object 3, 150 bytes
]

for obj_id, obj_size in trace:
    is_hit = cache.get(obj_id, obj_size)
    print(f"对象 {obj_id}: {'命中' if is_hit else '缺失'}")

# Report the overall hit ratio.
print(f"命中率: {cache.get_hit_ratio():.2%}")
跟踪文件处理¶
从CSV文件读取和处理跟踪:
import libcachesim as lcs

# Describe the CSV layout for the trace reader (1-based field indices).
params = lcs.ReaderInitParam()
params.has_header = True
params.delimiter = ","
params.time_field = 1
params.obj_id_field = 2
params.obj_size_field = 3

reader = lcs.TraceReader("workload.csv", lcs.TraceType.CSV_TRACE, params)
cache = lcs.LRU(cache_size=1024 * 1024)

# Replay the trace, reporting progress every 10k requests.
n_seen = 0
for req in reader:
    cache.get(req.obj_id, req.obj_size)
    n_seen += 1
    if n_seen % 10000 == 0:
        print(f"处理了 {n_seen} 个请求,命中率: {cache.get_hit_ratio():.2%}")

print(f"最终命中率: {cache.get_hit_ratio():.2%}")
合成工作负载生成¶
Zipf分布请求¶
生成具有Zipf分布的合成工作负载:
import libcachesim as lcs

# Synthetic workload whose object popularity follows a Zipf distribution.
reader = lcs.SyntheticReader(
    num_objects=10000,
    num_requests=100000,
    distribution="zipf",
    alpha=1.0,      # Zipf skew parameter
    obj_size=4096,
    seed=42,        # fixed seed for reproducibility
)

cache = lcs.LRU(cache_size=10 * 1024 * 1024)  # 10 MB
for req in reader:
    cache.get(req.obj_id, req.obj_size)
print(f"Zipf工作负载 (α=1.0) 命中率: {cache.get_hit_ratio():.2%}")

# Sweep the skew parameter to see its effect on the hit ratio.
for alpha in [0.5, 1.0, 1.5, 2.0]:
    reader = lcs.SyntheticReader(
        num_objects=10000,
        num_requests=50000,
        distribution="zipf",
        alpha=alpha,
        obj_size=4096,
        seed=42,
    )
    cache = lcs.LRU(cache_size=5 * 1024 * 1024)
    for req in reader:
        cache.get(req.obj_id, req.obj_size)
    print(f"α={alpha}: 命中率 {cache.get_hit_ratio():.2%}")
均匀分布请求¶
import libcachesim as lcs

# Synthetic workload with uniformly distributed object accesses.
reader = lcs.SyntheticReader(
    num_objects=5000,
    num_requests=50000,
    distribution="uniform",
    obj_size=4096,
    seed=42,
)

cache = lcs.LRU(cache_size=5 * 1024 * 1024)
for req in reader:
    cache.get(req.obj_id, req.obj_size)

print(f"均匀工作负载命中率: {cache.get_hit_ratio():.2%}")
缓存算法比较¶
多算法评估¶
比较不同缓存算法的性能:
import libcachesim as lcs

# One shared synthetic workload so every algorithm sees identical requests.
reader = lcs.SyntheticReader(
    num_objects=10000,
    num_requests=100000,
    distribution="zipf",
    alpha=1.2,
    obj_size=4096,
    seed=42,
)
workload = list(reader)  # materialize so the trace can be replayed per algorithm

# Candidate eviction algorithms, keyed by display name.
candidates = {
    'LRU': lcs.LRU,
    'LFU': lcs.LFU,
    'FIFO': lcs.FIFO,
    'ARC': lcs.ARC,
    'S3FIFO': lcs.S3FIFO,
    'Sieve': lcs.Sieve,
}

CACHE_SIZE = 10 * 1024 * 1024  # 10 MB
results = {}
for name, make_cache in candidates.items():
    cache = make_cache(CACHE_SIZE)
    for req in workload:
        cache.get(req.obj_id, req.obj_size)
    results[name] = cache.get_hit_ratio()
    print(f"{name:8}: {cache.get_hit_ratio():.2%}")

# Pick the algorithm with the highest hit ratio.
best = max(results, key=results.get)
print(f"\n最佳算法: {best} ({results[best]:.2%})")
跟踪采样¶
空间采样¶
使用采样减少大型跟踪的大小:
import libcachesim as lcs

# Spatially sample 10% of the trace to shrink a large workload.
sampler = lcs.Sampler(
    sample_ratio=0.1,
    type=lcs.SamplerType.SPATIAL_SAMPLER,
)

params = lcs.ReaderInitParam()
params.has_header = True
params.sampler = sampler

reader = lcs.TraceReader("large_trace.csv", lcs.TraceType.CSV_TRACE, params)
cache = lcs.LRU(cache_size=1024 * 1024)

n_sampled = 0
for req in reader:
    cache.get(req.obj_id, req.obj_size)
    n_sampled += 1

print(f"处理了 {n_sampled} 个采样请求")
print(f"采样命中率: {cache.get_hit_ratio():.2%}")
时间采样¶
import libcachesim as lcs

# Temporal sampling: keep 5% of the requests by time.
sampler = lcs.Sampler(
    sample_ratio=0.05,
    type=lcs.SamplerType.TEMPORAL_SAMPLER,
)

params = lcs.ReaderInitParam()
params.sampler = sampler

reader = lcs.TraceReader("timestamped_trace.csv", lcs.TraceType.CSV_TRACE, params)
# Run the simulation as usual...
跟踪分析¶
基本跟踪统计¶
分析跟踪特征:
import libcachesim as lcs

# Compute basic statistics over a CSV trace.
analyzer = lcs.TraceAnalyzer("workload.csv", lcs.TraceType.CSV_TRACE)

print("跟踪分析:")
print(f"总请求数: {analyzer.get_num_requests():,}")
print(f"唯一对象数: {analyzer.get_num_objects():,}")
print(f"平均对象大小: {analyzer.get_average_obj_size():.2f} 字节")
print(f"总数据大小: {analyzer.get_total_size():,} 字节")

# Mean reuse distance across all requests.
distances = analyzer.get_reuse_distance()
print(f"平均重用距离: {sum(distances)/len(distances):.2f}")
流行度分析¶
import libcachesim as lcs
import matplotlib.pyplot as plt

analyzer = lcs.TraceAnalyzer("workload.csv", lcs.TraceType.CSV_TRACE)
freq = analyzer.get_popularity()

# Rank-frequency plot on log-log axes (Zipf-like traces appear roughly linear).
plt.figure(figsize=(10, 6))
plt.loglog(range(1, len(freq) + 1), sorted(freq, reverse=True))
plt.xlabel('对象排名')
plt.ylabel('访问频率')
plt.title('对象流行度分布')
plt.grid(True)
plt.show()
高级场景¶
缓存层次结构¶
模拟多级缓存层次结构:
import libcachesim as lcs
class CacheHierarchy:
    """Two-level (L1/L2) cache hierarchy built from two LRU caches.

    Lookups try L1 first, then L2; an L2 hit promotes the object into L1,
    and a full miss installs the object at both levels.
    """

    def __init__(self, l1_size, l2_size):
        self.l1_cache = lcs.LRU(l1_size)   # small first-level cache
        self.l2_cache = lcs.LRU(l2_size)   # larger second-level cache
        self.l1_hits = 0
        self.l2_hits = 0
        self.misses = 0

    def get(self, obj_id, obj_size):
        """Look up *obj_id*; return True on a hit at either level."""
        # L1 first.
        if self.l1_cache.get(obj_id, obj_size):
            self.l1_hits += 1
            return True
        # Then L2; on a hit, promote the object into L1.
        if self.l2_cache.get(obj_id, obj_size):
            self.l2_hits += 1
            self.l1_cache.get(obj_id, obj_size)
            return True
        # Full miss: install at both levels.
        self.misses += 1
        self.l1_cache.get(obj_id, obj_size)
        self.l2_cache.get(obj_id, obj_size)
        return False

    def get_stats(self):
        """Return per-level and overall hit ratios as a dict."""
        total = self.l1_hits + self.l2_hits + self.misses
        return {
            'l1_hit_ratio': self.l1_hits / total,
            'l2_hit_ratio': self.l2_hits / total,
            'overall_hit_ratio': (self.l1_hits + self.l2_hits) / total,
        }
# Exercise the hierarchy: a 1 MB L1 in front of a 10 MB L2, Zipf workload.
hierarchy = CacheHierarchy(l1_size=1024 * 1024, l2_size=10 * 1024 * 1024)
reader = lcs.SyntheticReader(
    num_objects=50000,
    num_requests=100000,
    distribution="zipf",
    alpha=1.0,
    obj_size=4096,
    seed=42,
)

for req in reader:
    hierarchy.get(req.obj_id, req.obj_size)

stats = hierarchy.get_stats()
print(f"L1命中率: {stats['l1_hit_ratio']:.2%}")
print(f"L2命中率: {stats['l2_hit_ratio']:.2%}")
print(f"总命中率: {stats['overall_hit_ratio']:.2%}")
缓存预热¶
在评估前预热缓存:
import libcachesim as lcs

# Zipf workload: the first part warms the cache, the remainder is measured.
reader = lcs.SyntheticReader(
    num_objects=10000,
    num_requests=200000,
    distribution="zipf",
    alpha=1.0,
    obj_size=4096,
    seed=42,
)
cache = lcs.LRU(cache_size=5 * 1024 * 1024)

# Split into warmup and evaluation phases.
warmup_requests = 50000
eval_requests = 0
# BUG FIX: count evaluation-phase hits separately. The original example
# printed cache.get_hit_ratio() as the "post-warmup" hit ratio, but that
# statistic also includes the warmup requests, contradicting the intent
# of excluding the warmup phase from the measurement.
eval_hits = 0

for i, request in enumerate(reader):
    hit = cache.get(request.obj_id, request.obj_size)
    if i < warmup_requests:
        # Warmup phase - populate the cache, record no statistics.
        continue
    # Evaluation phase.
    eval_requests += 1
    if hit:
        eval_hits += 1

# Guard against a zero-length evaluation phase.
post_warmup_ratio = eval_hits / eval_requests if eval_requests else 0.0
print(f"预热后命中率: {post_warmup_ratio:.2%}")
print(f"评估请求数: {eval_requests}")
动态缓存大小¶
随时间变化缓存大小:
import libcachesim as lcs

reader = lcs.SyntheticReader(
    num_objects=10000,
    num_requests=100000,
    distribution="zipf",
    alpha=1.0,
    obj_size=4096,
    seed=42,
)

# Grow the cache from 1 MB toward 10 MB, doubling every 10k requests.
initial_size = 1024 * 1024       # 1 MB
max_size = 10 * 1024 * 1024      # 10 MB
growth_interval = 10000

cache = lcs.LRU(initial_size)
current_size = initial_size

for i, req in enumerate(reader):
    # Periodically enlarge the cache.
    if i > 0 and i % growth_interval == 0 and current_size < max_size:
        current_size = min(current_size * 2, max_size)
        # NOTE: an existing cache's size cannot be changed in place, so a
        # fresh cache is created (contents and statistics are reset).
        cache = lcs.LRU(current_size)
        print(f"在请求 {i} 处将缓存大小增加到 {current_size/1024/1024:.1f}MB")
    cache.get(req.obj_id, req.obj_size)

print(f"最终命中率: {cache.get_hit_ratio():.2%}")
性能优化技巧¶
批量处理¶
import libcachesim as lcs
# 处理大型跟踪时批量处理请求
def process_trace_in_batches(filename, cache, batch_size=10000):
    """Replay a CSV trace through *cache*, buffering requests in batches.

    Returns the total number of requests processed.
    """
    reader = lcs.TraceReader(filename, lcs.TraceType.CSV_TRACE)
    pending = []
    total_processed = 0

    for request in reader:
        pending.append(request)
        if len(pending) >= batch_size:
            # Flush one full batch through the cache.
            for req in pending:
                cache.get(req.obj_id, req.obj_size)
            total_processed += len(pending)
            print(f"处理了 {total_processed} 个请求")
            pending = []

    # Flush whatever is left over (a final partial batch).
    for req in pending:
        cache.get(req.obj_id, req.obj_size)
    return total_processed + len(pending)
# Usage: replay a large trace through a 10 MB LRU cache.
cache = lcs.LRU(cache_size=10 * 1024 * 1024)
total = process_trace_in_batches("large_trace.csv", cache)
print(f"总共处理了 {total} 个请求")
内存高效的请求处理¶
import libcachesim as lcs
def memory_efficient_simulation(filename, cache_size):
    """Run a memory-efficient cache simulation and return the hit ratio.

    The reader is capped so that a bounded number of requests is kept in
    memory while streaming through the trace.
    """
    params = lcs.ReaderInitParam()
    params.cap_at_n_req = 1000000  # limit how many requests are held in memory
    reader = lcs.TraceReader(filename, lcs.TraceType.CSV_TRACE, params)

    cache = lcs.LRU(cache_size)
    n_done = 0
    for request in reader:
        cache.get(request.obj_id, request.obj_size)
        n_done += 1
        # Periodic progress report every 100k requests.
        if n_done % 100000 == 0:
            print(f"进度: {n_done:,} 请求,命中率: {cache.get_hit_ratio():.2%}")

    return cache.get_hit_ratio()
# Usage: simulate with a 10 MB cache and report the final hit ratio.
hit_ratio = memory_efficient_simulation("workload.csv", 10 * 1024 * 1024)
print(f"最终命中率: {hit_ratio:.2%}")
这些示例展示了libCacheSim Python绑定的各种使用场景,从基础缓存模拟到高级性能分析和优化技术。根据您的具体需求调整这些示例。