七的博客

Redisson2.0源码分析12-分布式Map

源码分析

Redisson2.0源码分析12-分布式Map

与之前的 V1 版本差不多,只是把一部分操作替换成了 Lua 脚本。数据存储使用 Redis Hash,数据操作也都通过 Hash 相关的命令完成。

1. 源码分析

package org.redisson;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.core.Predicate;
import org.redisson.core.RMap;

import io.netty.util.concurrent.Future;


/**
 * Map implementation whose entries are stored in a Redis hash.
 *
 * The hash is addressed by {@code getName()}. Simple operations are issued as
 * plain hash commands (HLEN, HEXISTS, HMGET, HMSET, HKEYS, HVALS, HGETALL,
 * HGET, HSET, HDEL, HSCAN, HINCRBYFLOAT); operations that must read and write
 * in one step are sent as Lua scripts through EVAL.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {

    // EVAL command descriptor shared by the put and putIfAbsent scripts.
    // It holds no per-map data, so a single static instance is enough.
    private static final RedisCommand<Object> EVAL_PUT = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE);

    protected RedissonMap(CommandExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    // Returns the number of entries in the map (blocking variant of sizeAsync).
    @Override
    public int size() {
        return get(sizeAsync());
    }

    // Asynchronous size, implemented with HLEN.
    @Override
    public Future<Integer> sizeAsync() {
        return commandExecutor.readAsync(getName(), RedisCommands.HLEN, getName());
    }

    // Returns true when the map has no entries, i.e. size() is 0.
    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    // Checks whether the map contains the given key, implemented with HEXISTS.
    @Override
    public boolean containsKey(Object key) {
        return commandExecutor.read(getName(), RedisCommands.HEXISTS, getName(), key);
    }

    // Asynchronous variant of containsKey, implemented with HEXISTS.
    @Override
    public Future<Boolean> containsKeyAsync(Object key) {
        return commandExecutor.readAsync(getName(), RedisCommands.HEXISTS, getName(), key);
    }

    // Checks whether the map contains the given value (blocking variant of containsValueAsync).
    @Override
    public boolean containsValue(Object value) {
        return get(containsValueAsync(value));
    }

    // Asynchronous containsValue: a Lua script loads all hash values with HVALS
    // and compares each one against ARGV[1].
    // NOTE(review): the script loop starts at index 0 although Lua tables are 1-based;
    // s[0] is nil, so that extra comparison never matches. The script text is left untouched.
    @Override
    public Future<Boolean> containsValueAsync(Object value) {
        return commandExecutor.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
                "local s = redis.call('hvals', KEYS[1]);" +
                        "for i = 0, table.getn(s), 1 do "
                            + "if ARGV[1] == s[i] then "
                                + "return true "
                            + "end "
                       + "end;" +
                     "return false",
                Collections.<Object>singletonList(getName()), value);
    }

    // Returns the entries for the given keys, implemented with HMGET.
    // Keys whose value comes back as null are left out of the result.
    @Override
    public Map<K, V> getAll(Set<K> keys) {
        if (keys.isEmpty()) {
            return Collections.emptyMap();
        }

        // HMGET arguments: the hash name followed by every requested key.
        List<Object> args = new ArrayList<Object>(keys.size() + 1);
        args.add(getName());
        args.addAll(keys);
        List<V> list = commandExecutor.read(getName(), RedisCommands.HMGET, args.toArray());

        // Pair each returned value with its key; args is offset by one because
        // slot 0 holds the hash name.
        Map<K, V> result = new HashMap<K, V>(list.size());
        for (int index = 0; index < args.size()-1; index++) {
            V value = list.get(index);
            if (value == null) {
                continue;
            }
            result.put((K) args.get(index+1), value);
        }
        return result;
    }

    // Returns the value stored under the given key (blocking variant of getAsync).
    @Override
    public V get(Object key) {
        return get(getAsync((K)key));
    }

    // Stores the value under the key and returns the previous value (blocking variant of putAsync).
    @Override
    public V put(K key, V value) {
        return get(putAsync(key, value));
    }

    // Removes the key and returns the value it had (blocking variant of removeAsync).
    @Override
    public V remove(Object key) {
        return get(removeAsync((K)key));
    }

    // Stores every entry of the given map with a single HMSET; an empty map is a no-op.
    @Override
    public void putAll(final Map<? extends K, ? extends V> map) {
        if (map.isEmpty()) {
            return;
        }
        commandExecutor.write(getName(), RedisCommands.HMSET, getName(), map);
    }

    // Clears the map by calling delete().
    @Override
    public void clear() {
        delete();
    }

    // Returns all keys (blocking variant of keySetAsync).
    @Override
    public Set<K> keySet() {
        return get(keySetAsync());
    }

    // Asynchronous keySet, implemented with HKEYS.
    @Override
    public Future<Set<K>> keySetAsync() {
        return commandExecutor.readAsync(getName(), RedisCommands.HKEYS, getName());
    }

    // Returns all values (blocking variant of valuesAsync).
    @Override
    public Collection<V> values() {
        return get(valuesAsync());
    }

    // Asynchronous values, implemented with HVALS.
    @Override
    public Future<Collection<V>> valuesAsync() {
        return commandExecutor.readAsync(getName(), RedisCommands.HVALS, getName());
    }

    // Returns all key/value pairs, loaded with a single HGETALL.
    @Override
    public Set<java.util.Map.Entry<K, V>> entrySet() {
        Map<K, V> map = commandExecutor.read(getName(), RedisCommands.HGETALL, getName());
        return map.entrySet();
    }

    // Stores the value only when the key is absent (blocking variant of putIfAbsentAsync).
    @Override
    public V putIfAbsent(K key, V value) {
        return get(putIfAbsentAsync(key, value));
    }

    // Asynchronous putIfAbsent: the script sets the field and returns nil when
    // HEXISTS reports 0, otherwise it returns the current value without writing.
    @Override
    public Future<V> putIfAbsentAsync(K key, V value) {
        return commandExecutor.evalWriteAsync(getName(), EVAL_PUT,
                "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return nil else return redis.call('hget', KEYS[1], ARGV[1]) end",
                Collections.<Object>singletonList(getName()), key, value);
    }

    // Removes the key only when it currently maps to the given value;
    // true when the script reports exactly one deleted field.
    @Override
    public boolean remove(Object key, Object value) {
        return get(removeAsync(key, value)) == 1;
    }

    // Asynchronous conditional remove: the script returns the HDEL result when the
    // stored value equals ARGV[2], and 0 otherwise.
    @Override
    public Future<Long> removeAsync(Object key, Object value) {
        return commandExecutor.evalWriteAsync(getName(),
                new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 4, ValueType.MAP),
                "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then return redis.call('hdel', KEYS[1], ARGV[1]) else return 0 end",
            Collections.<Object>singletonList(getName()), key, value);
    }

    // Replaces the value of the key with newValue only when the current value
    // equals oldValue (blocking variant of the three-argument replaceAsync).
    @Override
    public boolean replace(K key, V oldValue, V newValue) {
        return get(replaceAsync(key, oldValue, newValue));
    }

    // Asynchronous compare-and-replace: the script writes ARGV[3] and returns true
    // when HGET equals ARGV[2], otherwise it returns false.
    @Override
    public Future<Boolean> replaceAsync(K key, V oldValue, V newValue) {
        return commandExecutor.evalWriteAsync(getName(),
                new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4,
                    Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)),
                "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); return true; else return false; end",
                Collections.<Object>singletonList(getName()), key, oldValue, newValue);
    }

    // Replaces the value of an existing key and returns the previous value
    // (blocking variant of the two-argument replaceAsync).
    @Override
    public V replace(K key, V value) {
        return get(replaceAsync(key, value));
    }

    // Asynchronous replace: when HEXISTS reports 1 the script reads the old value,
    // writes the new one and returns the old value; otherwise it returns nil.
    @Override
    public Future<V> replaceAsync(K key, V value) {
        return commandExecutor.evalWriteAsync(getName(),
                new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE),
                "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v; else return nil; end",
            Collections.<Object>singletonList(getName()), key, value);
    }

    // Asynchronous get, implemented with HGET.
    @Override
    public Future<V> getAsync(K key) {
        return commandExecutor.readAsync(getName(), RedisCommands.HGET, getName(), key);
    }

    // Asynchronous put: the script reads the old value with HGET, writes the new
    // one with HSET and returns the old value.
    @Override
    public Future<V> putAsync(K key, V value) {
        return commandExecutor.evalWriteAsync(getName(), EVAL_PUT,
                "local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v",
                Collections.<Object>singletonList(getName()), key, value);
    }


    // Asynchronous remove: the script reads the value with HGET, deletes the field
    // with HDEL and returns the value it read.
    @Override
    public Future<V> removeAsync(K key) {
        return commandExecutor.evalWriteAsync(getName(),
                new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE),
                "local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hdel', KEYS[1], ARGV[1]); return v",
                Collections.<Object>singletonList(getName()), key);
    }

    // Asynchronous put that does not fetch the previous value; a plain HSET.
    @Override
    public Future<Boolean> fastPutAsync(K key, V value) {
        return commandExecutor.writeAsync(getName(), RedisCommands.HSET, getName(), key, value);
    }

    // Blocking variant of fastPutAsync.
    @Override
    public boolean fastPut(K key, V value) {
        return get(fastPutAsync(key, value));
    }

    // Removes a batch of keys with a single HDEL and yields the command's numeric reply.
    // A null or empty key array short-circuits to an already-succeeded future holding 0.
    @Override
    public Future<Long> fastRemoveAsync(K ... keys) {
        if (keys == null || keys.length == 0) {
            return commandExecutor.getConnectionManager().getGroup().next().newSucceededFuture(0L);
        }

        // HDEL arguments: the hash name followed by every key to delete.
        List<Object> args = new ArrayList<Object>(keys.length + 1);
        args.add(getName());
        args.addAll(Arrays.asList(keys));
        return commandExecutor.writeAsync(getName(), RedisCommands.HDEL, args.toArray());
    }

    // Blocking variant of fastRemoveAsync.
    @Override
    public long fastRemove(K ... keys) {
        return get(fastRemoveAsync(keys));
    }

    // Fetches one HSCAN page starting at the given cursor position.
    private MapScanResult<Object, V> scanIterator(long startPos) {
        return commandExecutor.read(getName(), RedisCommands.HSCAN, getName(), startPos);
    }

    // Builds an iterator that walks the hash page by page using HSCAN.
    private Iterator<Map.Entry<K, V>> iterator() {
        return new Iterator<Map.Entry<K, V>>() {

            // Iterator over the entries of the page fetched most recently; null before the first scan.
            private Iterator<Map.Entry<K, V>> iter;
            // Cursor returned by the last scan; 0 both before the first scan and after the last page.
            private long iterPos = 0;

            // True once remove() has been called for the entry last returned by next().
            private boolean removeExecuted;
            // Entry last returned by next(); null until next() is called.
            private Map.Entry<K,V> value;

            @Override
            public boolean hasNext() {
                // A scan may hand back an empty page together with a non-zero cursor,
                // so keep fetching until a page has entries or the cursor returns to 0.
                while (iter == null
                        || (!iter.hasNext() && iterPos != 0)) {
                    MapScanResult<Object, V> res = scanIterator(iterPos);
                    iter = ((Map<K, V>)res.getMap()).entrySet().iterator();
                    iterPos = res.getPos();
                }
                return iter.hasNext();
            }

            @Override
            public Map.Entry<K, V> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException("No such element at index");
                }

                value = iter.next();
                removeExecuted = false;
                return value;
            }

            @Override
            public void remove() {
                if (removeExecuted) {
                    throw new IllegalStateException("Element been already deleted");
                }
                if (value == null) {
                    throw new IllegalStateException("next() has not been called yet");
                }

                // Only the Redis-side delete matters. The local page is a throwaway copy,
                // and touching its iterator here would fail whenever hasNext() has already
                // moved on to the next page.
                RedissonMap.this.fastRemove(value.getKey());
                removeExecuted = true;
            }

        };
    }

    // Returns the entries whose key satisfies the predicate.
    @Override
    public Map<K, V> filterKeys(Predicate<K> predicate) {
        Map<K, V> result = new HashMap<K, V>();
        for (Iterator<Map.Entry<K, V>> iterator = iterator(); iterator.hasNext();) {
            Map.Entry<K, V> entry = iterator.next();
            if (predicate.apply(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }


    // Returns the entries whose value satisfies the predicate.
    @Override
    public Map<K, V> filterValues(Predicate<V> predicate) {
        Map<K, V> result = new HashMap<K, V>();
        for (Iterator<Map.Entry<K, V>> iterator = iterator(); iterator.hasNext();) {
            Map.Entry<K, V> entry = iterator.next();
            if (predicate.apply(entry.getValue())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    // Returns the entries that satisfy the predicate when tested as a whole entry.
    public Map<K, V> filterEntries(Predicate<Map.Entry<K, V>> predicate) {
        Map<K, V> result = new HashMap<K, V>();
        for (Iterator<Map.Entry<K, V>> iterator = iterator(); iterator.hasNext();) {
            Map.Entry<K, V> entry = iterator.next();
            if (predicate.apply(entry)) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    // Adds the given number to the value stored under the key and returns the result
    // (blocking variant of addAndGetAsync).
    @Override
    public V addAndGet(K key, Number value) {
        return get(addAndGetAsync(key, value));
    }

    // Asynchronous addAndGet, implemented with HINCRBYFLOAT. The increment is sent
    // as a plain (non-scientific) decimal string.
    // NOTE(review): the reply is presumably converted back to value's own Number
    // subclass by NumberConvertor - confirm against that class.
    @Override
    public Future<V> addAndGetAsync(K key, Number value) {
        return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE,
                new RedisCommand<Object>("HINCRBYFLOAT", new NumberConvertor(value.getClass())),
                   getName(), key, new BigDecimal(value.toString()).toPlainString());
    }

    // Map equality: same size and every entry of this map has an equal mapping in the other map.
    @Override
    public boolean equals(Object o) {
        if (o == this)
            return true;

        if (!(o instanceof Map))
            return false;
        Map<?,?> m = (Map<?,?>) o;
        if (m.size() != size())
            return false;

        try {
            Iterator<Map.Entry<K,V>> i = entrySet().iterator();
            while (i.hasNext()) {
                Map.Entry<K,V> e = i.next();
                K key = e.getKey();
                V value = e.getValue();
                if (value == null) {
                    // A null value only matches a key that is present and mapped to null.
                    if (!(m.get(key)==null && m.containsKey(key)))
                        return false;
                } else {
                    if (!value.equals(m.get(key)))
                        return false;
                }
            }
        } catch (ClassCastException unused) {
            // The other map rejected one of our keys by type: not equal.
            return false;
        } catch (NullPointerException unused) {
            // The other map rejected a null key: not equal.
            return false;
        }

        return true;
    }

    // Hash code is the sum of the hash codes of all entries.
    @Override
    public int hashCode() {
        int h = 0;
        Iterator<Map.Entry<K,V>> i = entrySet().iterator();
        while (i.hasNext())
            h += i.next().hashCode();
        return h;
    }

}

2. 核心 lua 源码分析

2.1 判断是否存在某个值 containsValue

  • KEYS[1]: 哈希表的名称
  • ARGV[1]: 要检查的值
-- Fetch every value stored in the hash with HVALS.
local s = redis.call('hvals', KEYS[1]);

-- Walk the returned values looking for one equal to ARGV[1].
-- NOTE(review): the loop starts at index 0 although Lua tables are 1-based;
-- s[0] is nil, so that first comparison can never match.
for i = 0, table.getn(s), 1 do

    -- A match ends the script immediately with true.
    if ARGV[1] == s[i] then
        return true
    end
end;
-- No value matched.
return false

2.2 指定的 key 不存在时才设置键值对 putIfAbsent

  • KEYS[1]:哈希表的名称
  • ARGV[1]:键
  • ARGV[2]:值
-- Check with HEXISTS whether the field is already present.
if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then
    -- Absent: store the pair with HSET and return nil.
    redis.call('hset', KEYS[1], ARGV[1], ARGV[2]);
    return nil
else
    -- Present: leave it untouched and return the current value.
    return redis.call('hget', KEYS[1], ARGV[1])
end

2.3 移除符合预期值的元素 remove

  • KEYS[1]:哈希表的名称
  • ARGV[1]:键
  • ARGV[2]:值
-- Read the field's current value with HGET and compare it with the expected value.
if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then
    -- Equal: delete the field with HDEL and return the number of fields removed.
    return redis.call('hdel', KEYS[1], ARGV[1])
else
    -- Not equal: nothing is deleted, return 0.
    return 0
end

2.4 替换符合预期值的元素 replace

  • KEYS[1]:哈希表的名称
  • ARGV[1]:键
  • ARGV[2]:旧值
  • ARGV[3]:新值
-- Read the field's current value with HGET and compare it with the expected old value.
if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then
    -- Current value equals ARGV[2]: write the new value with HSET and return true.
    redis.call('hset', KEYS[1], ARGV[1], ARGV[3]);
    return true;
else
    -- Not equal: nothing is written, return false.
    return false;
end

2.5 键存在时替换其值并返回旧值 replace

  • KEYS[1]:哈希表的名称
  • ARGV[1]:键
  • ARGV[2]:新值
-- Check with HEXISTS whether the field is present.
if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then
    -- Present: read the old value, write the new one, and return the old value.
    local v = redis.call('hget', KEYS[1], ARGV[1]);
    redis.call('hset', KEYS[1], ARGV[1], ARGV[2]);
    return v;
else
    -- Absent: nothing is written, return nil.
    return nil;
end

2.6 设置指定键的值并返回旧值 put

  • KEYS[1]:哈希表的名称
  • ARGV[1]:键
  • ARGV[2]:新值
-- Read the field's current value with HGET.
local v = redis.call('hget', KEYS[1], ARGV[1]);

-- Store the new value with HSET.
redis.call('hset', KEYS[1], ARGV[1], ARGV[2]);

return v  -- return the previous value
  

2.7 移除指定键并返回值 remove

  • KEYS[1]:哈希表的名称
  • ARGV[1]:键
-- Read the field's current value with HGET.
local v = redis.call('hget', KEYS[1], ARGV[1]);

redis.call('hdel', KEYS[1], ARGV[1]);   -- delete the field

return v  -- return the value the field had before deletion