杰瑞科技汇

memcached java 使用

目录

  1. Memcached 简介
  2. 环境准备

    安装并启动 Memcached 服务器

    memcached java 使用-图1
    (图片来源网络,侵删)
  3. Java 客户端选择
    • XMemcached (推荐)
    • SpyMemcached
  4. XMemcached 快速入门
    • Maven/Gradle 依赖
    • 连接 Memcached 服务器
    • 基本操作 (增删改查)
    • 处理 "CAS" (Check-And-Set) 机制
  5. 高级特性
    • 序列化
    • 连接池配置
    • 节点动态增删
  6. 最佳实践与注意事项
    • Key 的设计
    • Value 的序列化
    • 缓存穿透、击穿、雪崩
    • 缓存与数据库的一致性
    • 监控与维护
  7. 完整代码示例

Memcached 简介

Memcached 是一个高性能、分布式内存对象缓存系统,它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。

核心特点:

  • 内存存储:所有数据都存储在内存中,读写速度极快。
  • 简单协议:使用简单的文本协议,易于实现。
  • 多客户端支持:支持多种编程语言的客户端。
  • 分布式:虽然本身不提供分布式集群管理,但可以通过客户端或中间件(如 Magent)实现分布式缓存,将数据分散到多个 Memcached 实例上。

环境准备:安装并启动 Memcached 服务器

在 Java 代码运行之前,你需要一个正在运行的 Memcached 服务器。

以 Linux (Ubuntu/Debian) 为例:

memcached java 使用-图2
(图片来源网络,侵删)
# 1. 安装 Memcached
sudo apt-get update
sudo apt-get install memcached
# 2. 启动 Memcached 服务
# 默认监听 11211 端口,最大内存 64MB,最大连接数 1024
sudo systemctl start memcached
# 3. 检查服务状态
sudo systemctl status memcached
# 4. (可选) 测试连接
# 使用 telnet 连接到 11211 端口
telnet 127.0.0.1 11211
# 然后输入 stats 命令,如果返回服务器统计信息,则表示成功
# quit 退出

Windows 用户: 可以从 https://memcached.org/downloads 下载 Windows 版本,或使用 WSL (Windows Subsystem for Linux) 来运行。

Java 客户端选择

Java 中最流行的两个 Memcached 客户端是 XMemcachedSpyMemcached

特性 XMemcached SpyMemcached
性能 更高,基于 NIO,异步模型,性能更优。 基于 Java NIO,性能也不错,但略逊于 XMemcached。
功能 功能丰富,支持连接池、动态增删节点、序列化框架集成等。 功能相对基础,专注于核心的缓存操作。
活跃度 非常活跃,社区维护良好,更新频繁。 活跃度一般,更新较慢。
推荐度 强烈推荐,尤其对性能和功能有较高要求的现代应用。 可以用于小型项目或不那么关键的场景。

除非有特殊原因,否则 XMemcached 是你的首选。

XMemcached 快速入门

我们将以 XMemcached 为例进行讲解。

memcached java 使用-图3
(图片来源网络,侵删)

Maven/Gradle 依赖

pom.xml 中添加 XMemcached 依赖:

<dependency>
    <groupId>com.googlecode.xmemcached</groupId>
    <artifactId>xmemcached</artifactId>
    <version>2.4.7</version> <!-- 请使用最新版本 -->
</dependency>

或者在 build.gradle 中:

implementation 'com.googlecode.xmemcached:xmemcached:2.4.7' // 请使用最新版本

连接 Memcached 服务器

最简单的方式是创建一个 MemcachedClient 实例,并提供服务器地址列表。

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;
public class MemcachedExample {
    public static void main(String[] args) {
        // 创建 Memcached 客户端
        // AddrUtil.getAddresses("host1:port1 host2:port2 ...") 用于配置多个节点
        MemcachedClient memcachedClient = null;
        try {
            MemcachedClientBuilder builder = new XMemcachedClientBuilder(
                AddrUtil.getAddresses("127.0.0.1:11211")
            );
            memcachedClient = builder.build();
            // ... 在这里进行缓存操作 ...
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭客户端,释放资源
            if (memcachedClient != null) {
                try {
                    memcachedClient.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

基本操作 (增删改查)

XMemcached 提供了非常简洁的 API。

// 假设 memcachedClient 已成功创建
// 1. 添加 (Add) - key 不存在则添加,存在则失败
memcachedClient.add("user:1001:profile", 3600, "Alice"); // 过期时间为 3600 秒 (1 小时)
System.out.println("Add user:1001:profile: " + memcachedClient.get("user:1001:profile")); // 输出: Alice
// 尝试添加一个已存在的 key
boolean addResult = memcachedClient.add("user:1001:profile", 3600, "Bob");
System.out.println("Add existing key, result: " + addResult); // 输出: false
// 2. 替换 (Replace) - key 存在则替换,不存在则失败
boolean replaceResult = memcachedClient.replace("user:1001:profile", 3600, "Alice Updated");
System.out.println("Replace existing key, result: " + replaceResult); // 输出: true
System.out.println("Get after replace: " + memcachedClient.get("user:1001:profile")); // 输出: Alice Updated
// 3. 设置 (Set) - key 存在则覆盖,不存在则添加 (最常用的操作)
memcachedClient.set("user:1001:profile", 3600, "Alice Smith");
System.out.println("Get after set: " + memcachedClient.get("user:1001:profile")); // 输出: Alice Smith
// 4. 获取 (Get)
String value = memcachedClient.get("user:1001:profile");
System.out.println("Get value: " + value);
// 5. 删除 (Delete)
memcachedClient.delete("user:1001:profile");
String valueAfterDelete = memcachedClient.get("user:1001:profile");
System.out.println("Get after delete: " + valueAfterDelete); // 输出: null

处理 "CAS" (Check-And-Set) 机制

CAS 是 Memcached 提供的一种乐观锁机制,用于防止并发写入时的数据覆盖问题,它通过一个唯一的 cas 标识来实现。

// 1. 获取带 CAS 值的版本
String key = "product:stock:123";
memcachedClient.set(key, 0, 100); // 初始库存 100
// 获取值和 CAS ID
GetsResponse<Integer> response = memcachedClient.gets(key);
int currentValue = response.get();
long casId = response.getCas();
System.out.println("Current stock: " + currentValue + ", CAS ID: " + casId);
// 模拟多个客户端同时修改
// 客户端 A: 检查并减少库存
if (currentValue > 0) {
    // 使用 casId 来确保我们修改的是最新的版本
    boolean success = memcachedClient.cas(key, 0, currentValue - 1, casId);
    if (success) {
        System.out.println("Client A: Stock decreased successfully.");
    } else {
        System.out.println("Client A: CAS failed, stock was modified by another client.");
    }
}
// 客户端 B: 尝试在 A 修改前也修改
// 在 A 修改后,casId 已经改变,B 的操作会失败
// (在实际应用中,客户端 B 会先重新获取 gets,然后再尝试 cas)

高级特性

序列化

默认情况下,XMemcached 使用 Java 序列化,对于性能要求高的场景,建议使用更高效的序列化方式,如 KryoProtostuffFastJSON

使用 Kryo 序列化:

// 1. 添加 Kryo 依赖
// implementation "com.esotericsoftware:kryo:5.4.0"
// 2. 创建 Kryo 序列化转换器
import net.rubyeye.xmemcached.transcoders.Transcoder;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class KryoTranscoder<T> implements Transcoder<T> {
    private final Kryo kryo = new Kryo();
    private final Class<T> type;
    public KryoTranscoder(Class<T> type) {
        this.type = type;
        // kryo.setReferences(true); // 可选,处理对象引用
        // kryo.register(type); // 可选,注册类以提高性能
    }
    @Override
    public byte[] encode(T obj) {
        if (obj == null) {
            return new byte[0];
        }
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             Output output = new Output(bos)) {
            kryo.writeClassAndObject(output, obj);
            output.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Kryo encode failed", e);
        }
    }
    @Override
    public T decode(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        try (Input input = new Input(bytes)) {
            return (T) kryo.readClassAndObject(input);
        } catch (Exception e) {
            throw new RuntimeException("Kryo decode failed", e);
        }
    }
    @Override
    public int getMaxSize() {
        return 1024 * 1024; // 1MB
    }
    // ... 其他方法实现 ...
}
// 3. 在 MemcachedClientBuilder 中配置
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("127.0.0.1:11211"));
// 为特定类型的对象设置自定义序列化
builder.setTranscoder(new KryoTranscoder<>(YourCustomObject.class));
// 或者全局设置
// builder.setTranscoder(new SerializingTranscoder()); // 默认

连接池配置

XMemcached 内置了连接池,可以通过 ConnectionPoolConfiguration 进行精细配置。

MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("127.0.0.1:11211"));
// 配置连接池
builder.getConfiguration().setSessionIdleTimeout(60000); // 空闲连接超时时间 (ms)
builder.getConfiguration().setConnectionPoolSize(10); // 每个服务器的连接池大小
builder.getConfiguration().setFailureMode(true); // 失败模式:true-快速失败, false-尝试下一个节点
MemcachedClient memcachedClient = builder.build();

节点动态增删

对于需要动态扩容缩容的场景,可以动态添加或删除 Memcached 节点。

// 假设 client 已创建
MemcachedClient client = ...;
// 添加节点
String newAddress = "192.168.1.102:11211";
boolean added = client.addServer(newAddress);
System.out.println("Add server " + newAddress + " result: " + added);
// 删除节点
boolean removed = client.removeServer("127.0.0.1:11211");
System.out.println("Remove server 127.0.0.1:11211 result: " + removed);

最佳实践与注意事项

Key 的设计

  • 清晰命名:使用有意义的、可读性强的 Key,user:1001:profileproduct:123:inventory
  • 使用前缀:为不同业务模块使用不同的前缀,便于管理和避免 Key 冲突,session:, cache:, data:
  • 保持简洁:Key 越短,占用的内存和网络带宽越少。
  • 避免特殊字符:虽然 Memcached Key 可以包含空格和换行,但最好只使用字母、数字、下划线和冒号。

Value 的序列化

  • 避免 Java 原生序列化:它性能较差,且可能存在安全问题。
  • 选择高效的序列化框架:如 KryoProtostuffFSTHessian,它们能显著减少数据体积并提高序列化/反序列化速度。
  • 考虑使用 JSON:如果数据结构简单,或者需要与前端等非 Java 系统交互,可以使用 JSON (如 Fastjson, Jackson, Gson),但要注意 JSON 的性能通常不如二进制序列化框架。

缓存穿透、击穿、雪崩

  • 缓存穿透:查询一个根本不存在的数据。
    • 解决方案
      1. 缓存空对象:如果查询结果为空,也将其缓存起来,并设置一个较短的过期时间。
      2. 布隆过滤器:在访问缓存前,使用布隆过滤器快速判断 Key 是否可能存在,如果不存在则直接返回。
  • 缓存击穿:某个热点 Key某一刻失效,大量并发请求直接打到数据库上。
    • 解决方案
      1. 互斥锁:当缓存失效时,只允许一个线程去查询数据库并重建缓存,其他线程等待,可以使用 synchronized 或分布式锁 (如 Redisson)。
      2. 热点数据永不过期:逻辑上设置一个过期时间,但不使用 Memcached 的过期机制,由后台任务定时刷新。
  • 缓存雪崩大量 Key 在同一时间集中过期,导致大量请求瞬间涌向数据库。
    • 解决方案
      1. 过期时间加随机数:为 Key 的过期时间增加一个随机范围,避免同时失效,基础过期时间 1 小时,加上 0-300 秒的随机数。
      2. 高可用集群:部署多个 Memcached 实例,避免单点故障。
      3. 持久化与预热:Memcached 支持,可以配置数据持久化,并在服务重启时进行预热。

缓存与数据库的一致性

  • Cache-Aside (旁路缓存) 模式:这是最常用的模式。
    • 读流程:先读缓存,缓存未命中则读数据库,然后将数据写入缓存。
    • 写流程:先更新数据库,然后删除缓存中的对应数据。(为什么是删除而不是更新?因为更新缓存需要两次网络开销,而删除只需要一次,即使读请求在删除前到达,也只是短暂不一致,很快会被修复)。
  • Write-Through (穿透写) 模式:应用只和缓存交互,由缓存来负责与数据库同步,实现较复杂,较少使用。

监控与维护

  • 监控:定期使用 stats 命令查看 Memcached 的运行状态,包括内存使用、连接数、命中率、网络 I/O 等。
  • 内存管理:监控 evictions (淘汰) 指标,如果该值很高,说明内存不足,需要增加 Memcached 服务器的内存或优化 Key 的设计。
  • 重启:Memcached 重启会清空所有数据,对于有高可用要求的服务,需要使用 Keepalived 等工具实现平滑重启。

完整代码示例

这是一个整合了连接、基本操作、CAS 和异常处理的完整示例。

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.exception.MemcachedException;
import net.rubyeye.xmemcached.utils.AddrUtil;
import java.util.concurrent.TimeoutException;
public class MemcachedFullExample {
    public static void main(String[] args) {
        MemcachedClient memcachedClient = null;
        try {
            // 1. 创建客户端
            MemcachedClientBuilder builder = new XMemcachedClientBuilder(
                AddrUtil.getAddresses("127.0.0.1:11211")
            );
            // 可选:配置连接池
            builder.getConfiguration().setConnectionPoolSize(5);
            memcachedClient = builder.build();
            System.out.println("------ 基本操作 ------");
            // Set
            memcachedClient.set("user:session:abc123", 3600, "user_id:1001");
            System.out.println("Set 'user:session:abc123'");
            // Get
            String sessionValue = memcachedClient.get("user:session:abc123");
            System.out.println("Get 'user:session:abc123': " + sessionValue);
            // Delete
            memcachedClient.delete("user:session:abc123");
            String deletedValue = memcachedClient.get("user:session:abc123");
            System.out.println("Get after delete: " + deletedValue); // 应为 null
            System.out.println("\n------ CAS 操作 ------");
            String productKey = "product:stock:555";
            memcachedClient.set(productKey, 0, 50); // 初始库存 50
            System.out.println("Initial stock for product 555: " + memcachedClient.get(productKey));
            // 模拟并发减库存
            for (int i = 0; i < 3; i++) {
                new Thread(() -> {
                    try {
                        // 每次操作前都重新获取最新值和 CAS ID
                        GetsResponse<Integer> response = memcachedClient.gets(productKey);
                        int stock = response.get();
                        long cas = response.getCas();
                        if (stock > 0) {
                            boolean success = memcachedClient.cas(productKey, 0, stock - 1, cas);
                            if (success) {
                                System.out.println(Thread.currentThread().getName() + ": Stock decreased to " + (stock - 1));
                            } else {
                                System.out.println(Thread.currentThread().getName() + ": CAS failed, will retry.");
                            }
                        } else {
                            System.out.println(Thread.currentThread().getName() + ": Stock is empty.");
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
            // 等待所有线程完成
            Thread.sleep(1000);
            System.out.println("Final stock for product 555: " + memcachedClient.get(productKey));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 2. 关闭客户端
            if (memcachedClient != null) {
                try {
                    memcachedClient.shutdown();
                    System.out.println("\nMemcached client shutdown.");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

希望这份详细的指南能帮助你顺利地在 Java 项目中使用 Memcached!

分享:
扫描分享到社交APP
上一篇
下一篇