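Bloom filter
A Bloom filter consists of a very long binary vector (a bit array) and a series of random mapping (hash) functions. It can be used to test whether an element is in a set. Its advantages are that its space efficiency and query time are far better than those of general-purpose algorithms; its drawbacks are a certain false positive rate and the difficulty of deleting elements. The false positive rate can, however, be controlled.
Detailed material on Bloom filters is easy to find elsewhere. The main advantage is that a Bloom filter uses far less memory than a hash table, which makes it well suited to filtering scenarios.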
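To make the idea concrete, here is a minimal in-memory sketch of a Bloom filter built on java.util.BitSet. It is only an illustration under simple assumptions: the class name SimpleBloomFilter and the way the second hash is derived are hypothetical and are not how Guava does it.

```java
import java.util.BitSet;

/** Minimal in-memory Bloom filter sketch (illustrative only, not Guava's implementation). */
class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashFunctions;

    public SimpleBloomFilter(int numBits, int numHashFunctions) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashFunctions = numHashFunctions;
    }

    // Derives the i-th bit position from two base hashes.
    // The second hash is a hypothetical stand-in; it is forced odd so it is never zero.
    private int index(String value, int i) {
        int h1 = value.hashCode();
        int h2 = (h1 >>> 16) | 1;
        return Math.floorMod(h1 + i * h2, numBits);
    }

    /** Sets every bit position derived from the value. */
    public void put(String value) {
        for (int i = 0; i < numHashFunctions; i++) {
            bits.set(index(value, i));
        }
    }

    /** Returns false if the value was definitely never added, true if it may have been. */
    public boolean mightContain(String value) {
        for (int i = 0; i < numHashFunctions; i++) {
            if (!bits.get(index(value, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1 << 16, 5);
        filter.put("alice");
        filter.put("bob");
        System.out.println(filter.mightContain("alice")); // an added element is always reported
        System.out.println(filter.mightContain("carol")); // usually false, but a false positive is possible
    }
}
```

An element that was added is always found, because its bits are never cleared. That is also why deletion is hard: clearing a bit could remove other elements that share it.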
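Bloom filter in Guava
Guava is a core Java library developed by Google. It ships a Bloom filter implementation in the class BloomFilter: a filter is created with BloomFilter.create(), elements are added with put(), and membership is tested with mightContain().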
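Implementing a Bloom filter with Redis
When the Bloom filter itself needs a large amount of memory and has to be shared across several machines, Guava's BloomFilter can no longer meet the requirement. In terms of data storage, a BloomFilter is essentially a very large bitmap, and Redis supports bitmaps as a data structure, which makes Redis a natural fit for implementing a Bloom filter.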
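To get a feeling for how much memory such a bitmap needs, the sizing formulas used later in RedisBloomFilter can be evaluated on their own. The sketch below is plain arithmetic; the class name BloomSizing and the sample numbers (100 million insertions at a 3% false positive rate) are assumptions chosen for illustration.

```java
/** Sizing arithmetic for a Bloom filter; the sample values in main are hypothetical. */
class BloomSizing {

    // Number of bits: m = -n * ln(p) / (ln 2)^2
    public static long optimalNumOfBits(long n, double p) {
        return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    // Number of hash functions: k = max(1, round(m / n * ln 2))
    public static int optimalNumOfHashFunctions(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        long expectedInsertions = 100_000_000L; // assumed workload
        double fpp = 0.03;                      // assumed false positive probability
        long numBits = optimalNumOfBits(expectedInsertions, fpp);
        int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
        System.out.printf("bits=%d (about %.1f MiB), hashFunctions=%d%n",
                numBits, numBits / 8.0 / 1024 / 1024, numHashFunctions);
    }
}
```

Keep in mind that a single Redis string, and therefore a single bitmap, is limited to 512 MB (2^32 bits), so very large filters cannot live under one key.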
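How, then, do we implement such a BloomFilter? A good starting point is the way Guava implements its own BloomFilter.
BloomFilter.put() simply delegates to strategy.put(), so the next thing to look at is the Strategy.
Strategy is an interface nested inside the BloomFilter class. It is responsible for turning an object stored in the Bloom filter into bits, and the implementation Guava provides is an enum, BloomFilterStrategies.
Next, look at how that enum implements put.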
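Apart from hashing, everything in it is an operation on LockFreeBitArray. So if we could implement a new LockFreeBitArray on top of Redis, we would have a Redis-based Bloom filter. Unfortunately, LockFreeBitArray is a final class with package-private access, so we cannot extend it.
That leaves two options:
- Implement BloomFilter from scratch
- Borrow what already exists: the code is open source, so copy the BloomFilter-related code and replace LockFreeBitArray
I went with the second option: I copied Guava's BloomFilter, added a JedisPool parameter for accessing Redis, and then implemented a LockFreeBitArray on top of Redis. The Redis-based LockFreeBitArray looks like this: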
static final class LockFreeBitArray {
    private final JedisPool jedisPool;
    private final String redisKey;
    private final long numBits;

    LockFreeBitArray(final long numBits, final String redisKey, final JedisPool jedisPool) {
        checkNotNull(jedisPool, "jedisPool is null!");
        checkArgument(!Strings.isNullOrEmpty(redisKey), "redisKey is empty!");
        this.jedisPool = jedisPool;
        this.redisKey = redisKey;
        this.numBits = numBits;
    }

    /**
     * Returns true if at least one bit changed value.
     */
    boolean set(long... bitIndexes) {
        try (final Jedis jedis = jedisPool.getResource()) {
            final Pipeline pipeline = jedis.pipelined();
            final List<Response<Boolean>> replies = new ArrayList<>(bitIndexes.length);
            for (long bitIndex : bitIndexes) {
                // SETBIT addresses single bits, so the index is used as the offset directly
                // (no ">>> 6" as in Guava, where it selects a 64-bit word).
                replies.add(pipeline.setbit(redisKey, bitIndex, true));
            }
            // Send all queued commands in one round trip; no MULTI/EXEC is needed.
            pipeline.sync();
            boolean changed = false;
            for (Response<Boolean> reply : replies) {
                // SETBIT replies with the previous value, so "false" means the bit was flipped.
                changed |= !reply.get();
            }
            return changed;
        }
    }

    /**
     * Returns true only if every given bit is set.
     */
    boolean get(long... bitIndexes) {
        try (final Jedis jedis = jedisPool.getResource()) {
            final Pipeline pipeline = jedis.pipelined();
            final List<Response<Boolean>> replies = new ArrayList<>(bitIndexes.length);
            for (long bitIndex : bitIndexes) {
                replies.add(pipeline.getbit(redisKey, bitIndex));
            }
            pipeline.sync();
            for (Response<Boolean> reply : replies) {
                if (!reply.get()) {
                    return false;
                }
            }
            return true;
        }
    }

    long bitSize() {
        return numBits;
    }

    long bitCount() {
        try (final Jedis jedis = jedisPool.getResource()) {
            return jedis.bitcount(redisKey);
        }
    }

    @Override
    public boolean equals(@Nullable Object o) {
        if (o instanceof LockFreeBitArray) {
            LockFreeBitArray lockFreeBitArray = (LockFreeBitArray) o;
            return Objects.equals(redisKey, lockFreeBitArray.redisKey);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(redisKey);
    }
}
As you can see, it essentially creates a bitmap under a single key. The code merely replaces the in-memory data that backs Guava's original LockFreeBitArray with a Redis bitmap.
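Two details of Redis bitmaps matter for a class like this: SETBIT takes a bit offset directly (not a word index), and it replies with the bit's previous value. The following in-memory sketch emulates those semantics so they can be tried without a Redis server. The class BitmapSketch and its method names are hypothetical and only mirror the commands SETBIT, GETBIT and BITCOUNT.

```java
/** In-memory emulation of Redis bitmap semantics (a sketch, not a Redis client). */
class BitmapSketch {
    private final byte[] data;

    public BitmapSketch(long numBits) {
        this.data = new byte[(int) ((numBits + 7) / 8)];
    }

    /** Like SETBIT: writes the bit and returns its previous value. */
    public boolean setbit(long offset, boolean value) {
        int byteIndex = (int) (offset >>> 3);
        // Redis numbers bits starting at the most significant bit of each byte.
        int mask = 0x80 >>> (int) (offset & 7);
        boolean previous = (data[byteIndex] & mask) != 0;
        if (value) {
            data[byteIndex] |= mask;
        } else {
            data[byteIndex] &= ~mask;
        }
        return previous;
    }

    /** Like GETBIT: reads a single bit. */
    public boolean getbit(long offset) {
        int mask = 0x80 >>> (int) (offset & 7);
        return (data[(int) (offset >>> 3)] & mask) != 0;
    }

    /** Like BITCOUNT: number of bits set to 1. */
    public long bitcount() {
        long count = 0;
        for (byte b : data) {
            count += Integer.bitCount(b & 0xFF);
        }
        return count;
    }

    /** Bloom-filter style set: true if at least one bit flipped from 0 to 1. */
    public boolean set(long... bitIndexes) {
        boolean changed = false;
        for (long bitIndex : bitIndexes) {
            changed |= !setbit(bitIndex, true);
        }
        return changed;
    }
}
```

Because the reply is the old value, a Bloom filter's put() should report a change when any reply is 0, not when it is 1.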
The complete reimplementation of BloomFilterStrategies is as follows:
import com.google.common.base.Strings;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Longs;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

enum BloomFilterStrategies implements RedisBloomFilter.Strategy {
    MURMUR128_MITZ_32() {
        @Override
        public <T> boolean put(
                T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
            long bitSize = bits.bitSize();
            long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
            int hash1 = (int) hash64;
            int hash2 = (int) (hash64 >>> 32);

            long[] indexes = new long[numHashFunctions];
            for (int i = 1; i <= numHashFunctions; i++) {
                int combinedHash = hash1 + (i * hash2);
                // Flip all the bits if it's negative (guaranteed positive number)
                if (combinedHash < 0) {
                    combinedHash = ~combinedHash;
                }
                // i starts at 1, so the array slot is i - 1; the index is taken modulo the bit size
                indexes[i - 1] = combinedHash % bitSize;
            }
            return bits.set(indexes);
        }

        @Override
        public <T> boolean mightContain(
                T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
            long bitSize = bits.bitSize();
            long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
            int hash1 = (int) hash64;
            int hash2 = (int) (hash64 >>> 32);

            for (int i = 1; i <= numHashFunctions; i++) {
                int combinedHash = hash1 + (i * hash2);
                // Flip all the bits if it's negative (guaranteed positive number)
                if (combinedHash < 0) {
                    combinedHash = ~combinedHash;
                }
                if (!bits.get(combinedHash % bitSize)) {
                    return false;
                }
            }
            return true;
        }
    },
    /**
     * This strategy uses all 128 bits of {@link Hashing#murmur3_128} when hashing. It looks different
     * than the implementation in MURMUR128_MITZ_32 because we're avoiding the multiplication in the
     * loop and doing a (much simpler) += hash2. We're also changing the index to a positive number by
     * AND'ing with Long.MAX_VALUE instead of flipping the bits.
     */
    MURMUR128_MITZ_64() {
        @Override
        public <T> boolean put(
                T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
            long bitSize = bits.bitSize();
            byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).asBytes();
            long hash1 = lowerEight(bytes);
            long hash2 = upperEight(bytes);

            long combinedHash = hash1;
            long[] indexes = new long[numHashFunctions];
            for (int i = 0; i < numHashFunctions; i++) {
                // Make the combined hash positive and indexable
                indexes[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
                combinedHash += hash2;
            }
            return bits.set(indexes);
        }

        @Override
        public <T> boolean mightContain(
                T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
            long bitSize = bits.bitSize();
            byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).asBytes();
            long hash1 = lowerEight(bytes);
            long hash2 = upperEight(bytes);

            long combinedHash = hash1;
            for (int i = 0; i < numHashFunctions; i++) {
                // Make the combined hash positive and indexable
                if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) {
                    return false;
                }
                combinedHash += hash2;
            }
            return true;
        }

        private /* static */ long lowerEight(byte[] bytes) {
            return Longs.fromBytes(
                    bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
        }

        private /* static */ long upperEight(byte[] bytes) {
            return Longs.fromBytes(
                    bytes[15], bytes[14], bytes[13], bytes[12], bytes[11], bytes[10], bytes[9], bytes[8]);
        }
    };

    // The Redis-backed LockFreeBitArray listed earlier is nested here as
    // "static final class LockFreeBitArray { ... }"; it is not repeated.
}
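The index derivation used by MURMUR128_MITZ_64 can be tried in isolation: split a 128-bit hash into two longs, then step through hash1, hash1 + hash2, hash1 + 2 * hash2 and so on. The sketch below follows that scheme but, to stay dependency-free, substitutes MD5 from the JDK for murmur3_128. That substitution, the class name IndexDerivationSketch and the helper eightBytes are assumptions made for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Derives k bit indexes from one 128-bit hash (MD5 stands in for murmur3_128 here). */
class IndexDerivationSketch {

    // Reads 8 bytes starting at offset as a little-endian long,
    // which is the byte order lowerEight/upperEight produce.
    static long eightBytes(byte[] bytes, int offset) {
        long value = 0;
        for (int i = 7; i >= 0; i--) {
            value = (value << 8) | (bytes[offset + i] & 0xFF);
        }
        return value;
    }

    public static long[] indexes(String value, int numHashFunctions, long bitSize) {
        byte[] bytes;
        try {
            bytes = MessageDigest.getInstance("MD5").digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
        long hash1 = eightBytes(bytes, 0);
        long hash2 = eightBytes(bytes, 8);

        long combinedHash = hash1;
        long[] result = new long[numHashFunctions];
        for (int i = 0; i < numHashFunctions; i++) {
            // Clear the sign bit so the value is non-negative, then map it into the bit range.
            result[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
            combinedHash += hash2;
        }
        return result;
    }

    public static void main(String[] args) {
        for (long index : indexes("alice", 5, 7_298_440L)) {
            System.out.println(index);
        }
    }
}
```

Only one hash computation is needed per element, no matter how many hash functions the filter uses, and every resulting index can be passed to SETBIT or GETBIT as it is.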
Finally, here is RedisBloomFilter, which is almost identical to BloomFilter:
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.hash.Funnel;
import com.google.common.math.DoubleMath;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.JedisPool;

import java.io.Serializable;
import java.math.RoundingMode;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
 * @see com.google.common.hash.BloomFilter
 */
public final class RedisBloomFilter<T> implements Predicate<T>, Serializable {

    interface Strategy extends java.io.Serializable {
        <T> boolean put(
                T object, Funnel<? super T> funnel, int numHashFunctions,
                BloomFilterStrategies.LockFreeBitArray bits);

        <T> boolean mightContain(
                T object, Funnel<? super T> funnel, int numHashFunctions,
                BloomFilterStrategies.LockFreeBitArray bits);

        int ordinal();
    }

    private final BloomFilterStrategies.LockFreeBitArray bits;
    private final int numHashFunctions;
    private final Funnel<? super T> funnel;
    private final Strategy strategy;

    private RedisBloomFilter(
            BloomFilterStrategies.LockFreeBitArray bits,
            int numHashFunctions,
            Funnel<? super T> funnel,
            Strategy strategy) {
        checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
        checkArgument(
                numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
        this.bits = checkNotNull(bits);
        this.numHashFunctions = numHashFunctions;
        this.funnel = checkNotNull(funnel);
        this.strategy = checkNotNull(strategy);
    }

    public boolean mightContain(T object) {
        return strategy.mightContain(object, funnel, numHashFunctions, bits);
    }

    @Deprecated
    @Override
    public boolean apply(T input) {
        return mightContain(input);
    }

    @CanIgnoreReturnValue
    public boolean put(T object) {
        return strategy.put(object, funnel, numHashFunctions, bits);
    }

    public double expectedFpp() {
        // You down with FPP? (Yeah you know me!) Who's down with FPP? (Every last homie!)
        return Math.pow((double) bits.bitCount() / bitSize(), numHashFunctions);
    }

    public long approximateElementCount() {
        long bitSize = bits.bitSize();
        long bitCount = bits.bitCount();
        double fractionOfBitsSet = (double) bitCount / bitSize;
        return DoubleMath.roundToLong(
                -Math.log1p(-fractionOfBitsSet) * bitSize / numHashFunctions, RoundingMode.HALF_UP);
    }

    @VisibleForTesting
    long bitSize() {
        return bits.bitSize();
    }

    public boolean isCompatible(RedisBloomFilter<T> that) {
        checkNotNull(that);
        return this != that
                && this.numHashFunctions == that.numHashFunctions
                && this.bitSize() == that.bitSize()
                && this.strategy.equals(that.strategy)
                && this.funnel.equals(that.funnel);
    }

    @Override
    public boolean equals(@Nullable Object object) {
        if (object == this) {
            return true;
        }
        if (object instanceof RedisBloomFilter) {
            RedisBloomFilter<?> that = (RedisBloomFilter<?>) object;
            return this.numHashFunctions == that.numHashFunctions
                    && this.funnel.equals(that.funnel)
                    && this.bits.equals(that.bits)
                    && this.strategy.equals(that.strategy);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(numHashFunctions, funnel, strategy, bits);
    }

    public static <T> RedisBloomFilter<T> create(
            Funnel<? super T> funnel, long expectedInsertions, double fpp,
            JedisPool jedisPool, String redisKey) {
        return create(funnel, expectedInsertions, fpp,
                BloomFilterStrategies.MURMUR128_MITZ_64, jedisPool, redisKey);
    }

    @VisibleForTesting
    static <T> RedisBloomFilter<T> create(
            Funnel<? super T> funnel, long expectedInsertions, double fpp,
            Strategy strategy, JedisPool jedisPool, String key) {
        checkNotNull(funnel);
        checkArgument(
                expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
        checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
        checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
        checkNotNull(strategy);
        checkNotNull(jedisPool);

        if (expectedInsertions == 0) {
            expectedInsertions = 1;
        }

        long numBits = optimalNumOfBits(expectedInsertions, fpp);
        int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
        try {
            return new RedisBloomFilter<T>(
                    new BloomFilterStrategies.LockFreeBitArray(numBits, key, jedisPool),
                    numHashFunctions, funnel, strategy);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Could not create RedisBloomFilter of " + numBits + " bits", e);
        }
    }

    public static <T> RedisBloomFilter<T> create(
            Funnel<? super T> funnel, long expectedInsertions, JedisPool jedisPool, String redisKey) {
        return create(funnel, expectedInsertions, 0.03, jedisPool, redisKey); // FYI, for 3%, we always get 5 hash functions
    }

    @VisibleForTesting
    static int optimalNumOfHashFunctions(long n, long m) {
        // (m / n) * log(2), but avoid truncation due to division!
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    @VisibleForTesting
    static long optimalNumOfBits(long n, double p) {
        if (p == 0) {
            p = Double.MIN_VALUE;
        }
        return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }
}
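Possible improvements
The Redis used in my current environment is a single instance, so this approach works fine. With a Redis cluster, however, it is less suitable: the whole BloomFilter is tied to a single key, so it cannot be spread across the machines of the cluster. One feasible optimization for clusters is to split one BloomFilter into several BloomFilters with different keys, so that the data is distributed over different Redis nodes and the cluster's performance and cache capacity are put to full use.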
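One way to sketch that split: choose a sub-filter per element with a cheap hash and give each sub-filter its own key, so that the cluster can place the keys on different nodes. Everything named here is hypothetical, including the class ShardedKeySketch, the "baseKey:shard" naming scheme and the use of CRC32 for routing. It only shows the routing step, not a finished sharded filter.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Routes an element to one of several sub-filter keys (illustrative naming and hashing). */
class ShardedKeySketch {

    /** Returns the shard number in [0, numShards) for the given element. */
    public static int shardOf(String element, int numShards) {
        CRC32 crc = new CRC32();
        crc.update(element.getBytes(StandardCharsets.UTF_8));
        // getValue() is a non-negative long, so the remainder is a valid shard number.
        return (int) (crc.getValue() % numShards);
    }

    /** Builds the Redis key of the sub-filter that should hold the element. */
    public static String shardKey(String baseKey, String element, int numShards) {
        return baseKey + ":" + shardOf(element, numShards);
    }

    public static void main(String[] args) {
        int numShards = 8; // assumed number of sub-filters
        System.out.println(shardKey("bloom:users", "alice", numShards));
        System.out.println(shardKey("bloom:users", "bob", numShards));
    }
}
```

Each sub-filter would then be created as its own RedisBloomFilter under its shard key and sized for roughly expectedInsertions / numShards elements. put and mightContain must route an element with the same function so that both reach the same sub-filter.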