memcached-java客户端xmemcached使用总结
1. 最近调研了下memcached,之前2篇博客简单得介绍了用法,现在做个总结就当调研结束,本博客将从以下几个方面进行总结,
a. xmemcached简介
b. xmemcached的分布式
c. xmemcached支持的存储对象
d. xmemcached的容错性
e. xmemcached的性能测试
由于memcached可以与spring集成,所以本博客以和spring集成来讲以上几点
先来看看如何与spring集成,首先在pom中引入相应的包(spring相关的不列出来)
<dependency> <groupId>com.googlecode.xmemcached</groupId> <artifactId>xmemcached</artifactId> <version>2.0.0</version> </dependency>
以下是properties文件:
# the pool size(the number of client) memcached.connectionPoolSize=50 # in this mode, when a node out, it will throws MemcachedException when call this node memcached.failureMode=true #server1 memcached.server1.host=192.168.88.140 memcached.server1.port=11211 memcached.server1.weight=1 #server2 memcached.server2.host=192.168.88.141 memcached.server2.port=11211 memcached.server2.weight=1 #server3 memcached.server3.host=192.168.88.142 memcached.server3.port=11211 memcached.server3.weight=1
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:c="http://www.springframework.org/schema/c" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <bean id="memcachedClientBuilder" class="net.rubyeye.xmemcached.XMemcachedClientBuilder" p:connectionPoolSize="${memcached.connectionPoolSize}" p:failureMode="${memcached.failureMode}"> <!-- XMemcachedClientBuilder have two arguments.First is server list,and second is weights array. --> <constructor-arg> <list> <bean class="java.net.InetSocketAddress"> <constructor-arg> <value>${memcached.server1.host}</value> </constructor-arg> <constructor-arg> <value>${memcached.server1.port}</value> </constructor-arg> </bean> <bean class="java.net.InetSocketAddress"> <constructor-arg> <value>${memcached.server2.host}</value> </constructor-arg> <constructor-arg> <value>${memcached.server2.port}</value> </constructor-arg> </bean> <bean class="java.net.InetSocketAddress"> <constructor-arg> <value>${memcached.server3.host}</value> </constructor-arg> <constructor-arg> <value>${memcached.server3.port}</value> </constructor-arg> </bean> </list> </constructor-arg> <constructor-arg> <list> <value>${memcached.server1.weight}</value> <value>${memcached.server2.weight}</value> <value>${memcached.server3.weight}</value> </list> </constructor-arg> <!-- BinaryCommandFactory --> <property name="commandFactory"> <bean class="net.rubyeye.xmemcached.command.BinaryCommandFactory" /> </property> <property name="transcoder"> <bean class="net.rubyeye.xmemcached.transcoders.SerializingTranscoder" /> </property> <property name="bufferAllocator"> <bean class="net.rubyeye.xmemcached.buffer.SimpleBufferAllocator"></bean> </property> </bean> <!-- Use factory bean to build memcached client --> <bean id="memcachedClient" factory-bean="memcachedClientBuilder" factory-method="build" destroy-method="shutdown" /> </beans>
这样就可以用memcachedClient了
举个简单的例子说明如何使用:
@Autowired private MemcachedClient memcachedClient; public Map<String, User> queryFromCache(List<String> keys) { Map<String, User> users = new HashMap<String, User>(); for (String key : keys) { try { User user = memcachedClient.get(key); users.put(key, user); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } } return users; }
xmemcached是一个新java memcached client。简单来说,Memcached 是一个高性能的分布式内存对象的key-value缓存系统,用于动态Web应用以减轻数据库负载,现在也有很多人将它作为内存式数据库在使用,memcached通过它的自定义协议与客户端交互,而XMemcached就是它的一个java客户端实现。
Memcached的java客户端已经存在两个了:官方提供的基于传统阻塞io的客户端 、Dustin Sallings实现的基于java nio的spymemcached 。另外还有一些在此基础上的改进版本。
XMemcached的主要特性
高性能
XMemcached同样是基于java nio的客户端,java nio相比于传统阻塞io模型来说,有效率高(特别在高并发下)和资源耗费相对较少的优点。传统阻塞IO为了提高效率,需要创建一定数量的连接形成连接池,而nio仅需要一个连接即可(当然,nio也是可以做池化处理),相对来说减少了线程创建和切换的开销,这一点在高并发下特别明显。因此XMemcached与Spymemcached在性能都非常优秀,在某些方面(存储的数据比较小的情况下)Xmemcached比Spymemcached的表现更为优秀,具体可以看这个 Java Memcached Clients Benchmark 。
支持完整的协议
Xmemcached支持所有的memcached协议,包括1.4.0正式开始使用的 二进制协议 。
支持客户端分布
Memcached的分布只能通过客户端来实现,XMemcached实现了此功能,并且提供了一致性哈希(consistent hash)算法的实现。
允许设置节点权重
XMemcached允许通过设置节点的权重来调节memcached的负载,设置的权重越高,该memcached节点存储的数据将越多,所承受的负载越大。
动态增删节点
XMemcached允许通过JMX或者代码编程实现节点的动态添加或者移除,方便用户扩展和替换节点等。
支持JMX
XMemcached通过JMX暴露的一些接口,支持client本身的监控和调整,允许动态设置调优参数、查看统计数据、动态增删节点等。
由于memcached不支持服务器端分布式,所以memcached的分布只能通过客户端来实现,xmemcached实现了此功能,并且提供了一致性哈希(consistent hash)算法的实现,关于一致性hashing见http://kb.cnblogs.com/page/42734/ 需要说明的是这还只是客户端分布式,不支持主从,各个节点其实还是单独存储,并且未实现存储冗余,节点挂了数据就会相应得丢失 具有如下优势:允许设置节点权重
XMemcached允许通过设置节点的权重来调节memcached的负载,设置的权重越高,该memcached节点存储的数据将越多,所承受的负载越大。
动态增删节点
XMemcached允许通过JMX或者代码编程实现节点的动态添加或者移除,方便用户扩展和替换节点等。
支持JMX
XMemcached通过JMX暴露的一些接口,支持client本身的监控和调整,允许动态设置调优参数、查看统计数据、动态增删节点等
xmemcached支持的存储比较简单,只要实现了java序列化接口(Serializable接口)的对象就可以存储,当然可以包含Map、Set、List这些复杂对象 如果当前连接了3个服务器端,其中有一个突然挂掉,那么执行client操作时会抛出异常,但是如果memcached.failureMode=true设置成false那么就不抛出异常,只不过你查询的那个数据显示为null 由于是客户端分布式,所以并未实现存储冗余,如果要冗余需要自己实现代码 这里有一个博客介绍了spymemcached和xmemcached的性能对比,http://www.blogjava.net/killme2008/archive/2009/03/04/257852.html 下面我自己来做一个测试 测试环境 memcached版本1.4.22 启动memcached的内存:1G 节点数:3 测试结果单位:tps
先测试简单的String,测试一共开100个线程,每个线程执行10000次操作,结果如下:
写 | 76062 |
读 | 91116 |
删 | 89863 |
再来测试复杂型的Map, 测试一共开50个线程,每个线程执行100次操作,结果如下:
写 | 2399 |
读 | 1901 |
删 | 13227 |
具体代码如下:
public class PerformanceTest { static class TestWriteRunnable implements Runnable { private MemcachedClient mc; private CountDownLatch cd; int repeat; int start; public TestWriteRunnable(MemcachedClient mc, int start, CountDownLatch cdl, int repeat) { super(); this.mc = mc; this.start = start; this.cd = cdl; this.repeat = repeat; } public void run() { try { for (int i = 0; i < repeat; i++) { String key = String.valueOf(start + i); if (!mc.set(key, 0, key)) { System.err.println("set error"); } } } catch (Exception e) { e.printStackTrace(); } finally { cd.countDown(); } } } static class TestReadRunnable implements Runnable { private MemcachedClient mc; private CountDownLatch cd; int repeat; int start; public TestReadRunnable(MemcachedClient mc, int start, CountDownLatch cdl, int repeat) { super(); this.mc = mc; this.start = start; this.cd = cdl; this.repeat = repeat; } public void run() { try { for (int i = 0; i < repeat; i++) { String key = String.valueOf(start + i); String result = (String) mc.get(key); if (!key.equals(result)) { System.out.println(key + " " + result); System.err.println("get error"); } } } catch (Exception e) { e.printStackTrace(); } finally { cd.countDown(); } } } static class TestDeleteRunnable implements Runnable { private MemcachedClient mc; private CountDownLatch cd; int repeat; int start; public TestDeleteRunnable(MemcachedClient mc, int start, CountDownLatch cdl, int repeat) { super(); this.mc = mc; this.start = start; this.cd = cdl; this.repeat = repeat; } public void run() { try { for (int i = 0; i < repeat; i++) { String key = String.valueOf(start + i); if (!mc.delete(key)) System.err.println("delete error"); } } catch (Exception e) { e.printStackTrace(); } finally { cd.countDown(); } } } // thread num=10, repeat=10000,size=2, all=200000 ,velocity=1057 , using // time:189187 static public void main(String[] args) { try { String address = "192.168.88.140:11211 192.168.88.141:11211 192.168.88.142:11211"; int size = Runtime.getRuntime().availableProcessors(); int thread = 100; int repeat = 10000; MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(address), new int[] { 1, 1, 1 }); MemcachedClient mc = null; builder.setConnectionPoolSize(5); try { mc = builder.build(); } catch (IOException e) { e.printStackTrace(); } CountDownLatch cdl = new CountDownLatch(thread); long t = System.currentTimeMillis(); for (int i = 0; i < thread; i++) { new Thread(new PerformanceTest.TestWriteRunnable(mc, i * 10000, cdl, repeat)).start(); } cdl.await(); long all = thread * repeat; long usingtime = (System.currentTimeMillis() - t); System.out.println(String.format( "test write,thread num=%d, repeat=%d,size=%d, all=%d ,velocity=%d , using time:%d", thread, repeat, size, all, 1000 * all / usingtime, usingtime)); cdl = new CountDownLatch(thread); t = System.currentTimeMillis(); for (int i = 0; i < thread; i++) { new Thread(new PerformanceTest.TestReadRunnable(mc, i * 10000, cdl, repeat)).start(); } cdl.await(); all = thread * repeat; usingtime = (System.currentTimeMillis() - t); System.out.println(String.format( "test read,thread num=%d, repeat=%d,size=%d, all=%d ,velocity=%d , using time:%d", thread, repeat, size, all, 1000 * all / usingtime, usingtime)); cdl = new CountDownLatch(thread); t = System.currentTimeMillis(); for (int i = 0; i < thread; i++) { new Thread(new PerformanceTest.TestDeleteRunnable(mc, i * 10000, cdl, repeat)).start(); } cdl.await(); all = thread * repeat; usingtime = (System.currentTimeMillis() - t); System.out.println(String.format( "test delete,thread num=%d, repeat=%d,size=%d, all=%d ,velocity=%d , using time:%d", thread, repeat, size, all, 1000 * all / usingtime, usingtime)); mc.shutdown(); } catch (Exception e) { e.printStackTrace(); } } }
public class PerformanceTest2 { static Map<String, Book> map2 = new HashMap<String, Book>(); static final int ELEMENT_NUM = 100; static { for (int i = 0; i < ELEMENT_NUM; i++) map2.put(String.valueOf(i), new Book(i, String.valueOf(i), String.valueOf(i))); } static class TestWriteRunnable implements Runnable { private MemcachedClient mc; private CountDownLatch cd; int repeat; int start; public TestWriteRunnable(MemcachedClient mc, int start, CountDownLatch cdl, int repeat) { super(); this.mc = mc; this.start = start; this.cd = cdl; this.repeat = repeat; } public void run() { try { for (int i = 0; i < repeat; i++) { String key = String.valueOf(start + i); if (!mc.set(key, 0, map2)) { System.err.println("set error"); } } } catch (Exception e) { e.printStackTrace(); } finally { cd.countDown(); } } } static class TestReadRunnable implements Runnable { private MemcachedClient mc; private CountDownLatch cd; int repeat; int start; public TestReadRunnable(MemcachedClient mc, int start, CountDownLatch cdl, int repeat) { super(); this.mc = mc; this.start = start; this.cd = cdl; this.repeat = repeat; } @SuppressWarnings("unchecked") public void run() { try { for (int i = 0; i < repeat; i++) { String key = String.valueOf(start + i); Map<String, Book> result = (Map<String, Book>) mc.get(key); if (result.size() != ELEMENT_NUM) { System.err.println("get error"); } } } catch (Exception e) { e.printStackTrace(); } finally { cd.countDown(); } } } static class TestDeleteRunnable implements Runnable { private MemcachedClient mc; private CountDownLatch cd; int repeat; int start; public TestDeleteRunnable(MemcachedClient mc, int start, CountDownLatch cdl, int repeat) { super(); this.mc = mc; this.start = start; this.cd = cdl; this.repeat = repeat; } public void run() { try { for (int i = 0; i < repeat; i++) { String key = String.valueOf(start + i); if (!mc.delete(key)) System.err.println("delete error"); } } catch (Exception e) { e.printStackTrace(); } finally { cd.countDown(); } } } // thread num=10, repeat=10000,size=2, all=200000 ,velocity=1057 , using // time:189187 static public void main(String[] args) { try { String address = "192.168.88.140:11211 192.168.88.141:11211 192.168.88.142:11211"; int size = Runtime.getRuntime().availableProcessors(); int thread = 50; int repeat = 100; MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(address), new int[] { 1, 1, 1 }); MemcachedClient mc = null; builder.setConnectionPoolSize(5); try { mc = builder.build(); } catch (IOException e) { e.printStackTrace(); } CountDownLatch cdl = new CountDownLatch(thread); long t = System.currentTimeMillis(); for (int i = 0; i < thread; i++) { new Thread(new PerformanceTest2.TestWriteRunnable(mc, i * 10000, cdl, repeat)).start(); } try { cdl.await(); } catch (InterruptedException e) { } long all = thread * repeat; long usingtime = (System.currentTimeMillis() - t); System.out.println(String.format( "test write,thread num=%d, repeat=%d,size=%d, all=%d ,velocity=%d , using time:%d", thread, repeat, size, all, 1000 * all / usingtime, usingtime)); cdl = new CountDownLatch(thread); t = System.currentTimeMillis(); for (int i = 0; i < thread; i++) { new Thread(new PerformanceTest2.TestReadRunnable(mc, i * 10000, cdl, repeat)).start(); } try { cdl.await(); } catch (InterruptedException e) { } all = thread * repeat; usingtime = (System.currentTimeMillis() - t); System.out.println(String.format( "test read,thread num=%d, repeat=%d,size=%d, all=%d ,velocity=%d , using time:%d", thread, repeat, size, all, 1000 * all / usingtime, usingtime)); cdl = new CountDownLatch(thread); t = System.currentTimeMillis(); for (int i = 0; i < thread; i++) { new Thread(new PerformanceTest2.TestDeleteRunnable(mc, i * 10000, cdl, repeat)).start(); } try { cdl.await(); } catch (InterruptedException e) { } all = thread * repeat; usingtime = (System.currentTimeMillis() - t); System.out.println(String.format( "test delete,thread num=%d, repeat=%d,size=%d, all=%d ,velocity=%d , using time:%d", thread, repeat, size, all, 1000 * all / usingtime, usingtime)); mc.shutdown(); } catch (Exception e) { e.printStackTrace(); } } }