redis源码分析之有序集SortedSet

不改变集SortedSet算是redis中三个很有特色的数据结构,通过那篇文章来总结一下那块知识点。

原稿地址:

一、有序集SortedSet命令简要介绍

redis中的有序集,允许客户使用钦赐值对放进去的要素进行排序,何况依照该已排序的汇集提供了一名目非常多丰裕的操作集结的API。
举个例子如下:

//添加元素,table1为有序集的名字,100为用于排序字段(redis把它叫做score),a为我们要存储的元素
127.0.0.1:6379> zadd table1 100 a
(integer) 1
127.0.0.1:6379> zadd table1 200 b
(integer) 1
127.0.0.1:6379> zadd table1 300 c
(integer) 1
//按照元素索引返回有序集中的元素,索引从0开始
127.0.0.1:6379> zrange table1 0 1
1) "a"
2) "b"
//按照元素排序范围返回有序集中的元素,这里用于排序的字段在redis中叫做score
127.0.0.1:6379> zrangebyscore table1 150 400
1) "b"
2) "c"
//删除元素
127.0.0.1:6379> zrem table1 b
(integer) 1

在有序集中,用于排序的值叫做score,实际存款和储蓄的值叫做member。

是因为有序聚集提供的API很多,这里只举了多少个科学普及的,具体可以参照redis文书档案。

有关有序集,大家有一个不行大范围的选用景况就是客户评价。在应用程式大概网址上发表一条新闻,下边会有不菲评价,平日展现是遵照揭橥时间倒序排列,这些供给就能够选择有序集,以发表争辩的光阴戳作为score,然后根据展示争论的数量倒序查找有序集。

二、有序集SortedSet命令源码剖析

规矩,大家照旧从server.c文件中的命令表中找到相关命令的管理函数,然后逐条解析。
照例从添港元素开始,zaddCommand函数:

void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_NONE);
}

此间能够观望流程转向了zaddGenericCommand,况且传入了一个格局标识。
有关SortedSet的操作格局这里差十分的少表达一(Wissu)下,先来看一条完整的zadd命令:

zadd key [NX|XX] [CH] [INCR] score member [score member ...]

中间的可选项我们各类看下:

  1. NX表示只要成分存在,则不实施替换操作直接再次来到。
  2. XX表示只操作已存在的因素。
  3. CH代表回去修改(富含丰硕,更新)成分的多少,只好被ZADD命令使用。
  4. INCEscort代表在本来的score基础上足够新的score,实际不是替换。

下面代码片段中的ZADD_NONE表示通常操作。

接下去看下zaddGenericCommand函数的源码,非常短,耐心一丝丝看:

void zaddGenericCommand(client *c, int flags) {
    //一条错误提示信息
    static char *nanerr = "resulting score is not a number (NaN)";
    //有序集名字
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements;
    int scoreidx = 0;
    //记录元素操作个数
    int added = 0;     
    int updated = 0;    
    int processed = 0;  

    //查找score的位置,默认score在位置2上,但由于有各种模式,所以需要判断
    scoreidx = 2;
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        //判断命令中是否设置了各种模式
        if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
        else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
        else break;
        scoreidx++;
    }

    //设置模式
    int incr = (flags & ZADD_INCR) != 0;
    int nx = (flags & ZADD_NX) != 0;
    int xx = (flags & ZADD_XX) != 0;
    int ch = (flags & ZADD_CH) != 0;

    //通过上面的解析,scoreidx为真实的初始score的索引位置
    //这里客户端参数数量减去scoreidx就是剩余所有元素的数量
    elements = c->argc - scoreidx;
    //由于有序集中score,member成对出现,所以加一层判断
    if (elements % 2 || !elements) {
        addReply(c,shared.syntaxerr);
        return;
    }
    //这里计算score,member有多少对
    elements /= 2; 

    //参数合法性校验
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }
    //参数合法性校验
    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    //这里开始解析score,先初始化scores数组
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        //填充数组,这里注意元素是成对出现,所以各个score之间要隔一个member
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    //这里首先在client对应的db中查找该key,即有序集
    zobj = lookupKeyWrite(c->db,key);
    if (zobj == NULL) {
        //没有指定有序集且模式为XX(只操作已存在的元素),直接返回
        if (xx) goto reply_to_client; 
        //根据元素数量选择不同的存储结构初始化有序集
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            //哈希表 + 跳表的组合模式
            zobj = createZsetObject();
        } else {
            //ziplist(压缩链表)模式
            zobj = createZsetZiplistObject();
        }
        //加入db中
        dbAdd(c->db,key,zobj);
    } else {
        //如果ZADD操作的集合类型不对,则返回
        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }
    //这里开始往有序集中添加元素
    for (j = 0; j < elements; j++) {
        double newscore;
        //取出client传过来的score
        score = scores[j];
        int retflags = flags;
        //取出与之对应的member
        ele = c->argv[scoreidx+1+j*2]->ptr;
        //向有序集中添加元素,参数依次是有序集,要添加的元素的score,要添加的元素,操作模式,新的score
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        //添加失败则返回
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        //记录操作
        if (retflags & ZADD_ADDED) added++;
        if (retflags & ZADD_UPDATED) updated++;
        if (!(retflags & ZADD_NOP)) processed++;
        //设置新score值
        score = newscore;
    }
    //操作记录
    server.dirty += (added+updated);

//返回逻辑
reply_to_client:
    if (incr) {
        if (processed)
            addReplyDouble(c,score);
        else
            addReply(c,shared.nullbulk);
    } else {
        addReplyLongLong(c,ch ? added+updated : added);
    }
//清理逻辑
cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

代码有一些长,来张图看一下仓库储存结构:
图片 1
注:每个entry都是由score+member组成

有了地点的布局图之后,能够想到删除操作应该便是基于差异的囤积结构实行,若是是ziplist就实行链表删除,纵然是哈希表+跳表结构,那就要把多个汇聚都开展删减。真实逻辑是什么啊?
作者们来看下删除函数zremCommand的源码,相对短一点:

void zremCommand(client *c) {
    //获取有序集名
    robj *key = c->argv[1];
    robj *zobj;
    int deleted = 0, keyremoved = 0, j;
    //做校验
    if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) return;

    for (j = 2; j < c->argc; j++) {
        //一次删除指定元素
        if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
        //如果有序集中全部元素都被删除,则回收有序表
        if (zsetLength(zobj) == 0) {
            dbDelete(c->db,key);
            keyremoved = 1;
            break;
        }
    }
    //同步操作
    if (deleted) {
        notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
        if (keyremoved)
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        signalModifiedKey(c->db,key);
        server.dirty += deleted;
    }
    //返回
    addReplyLongLong(c,deleted);
}

看下具体的删减操作源码:

//参数zobj为有序集,ele为要删除的元素
int zsetDel(robj *zobj, sds ele) {
    //与添加元素相同,根据不同的存储结构执行不同的删除逻辑
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr;
        //ziplist是一个简单的链表删除节点操作
        if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
            zobj->ptr = zzlDelete(zobj->ptr,eptr);
            return 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        dictEntry *de;
        double score;

        de = dictUnlink(zs->dict,ele);
        if (de != NULL) {
            //查询该元素的score
            score = *(double*)dictGetVal(de);
            //从哈希表中删除元素
            dictFreeUnlinkedEntry(zs->dict,de);

            //从跳表中删除元素
            int retval = zslDelete(zs->zsl,score,ele,NULL);
            serverAssert(retval);
            //如果有需要则对哈希表进行resize操作
            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    //没有找到指定元素返回0
    return 0;
}

末尾看叁个查询函数zrangeCommand源码,也是十分短,汗~~~,可是放心,有了上面的基本功,大约也能猜到查询逻辑应该是怎么体统的:

void zrangeCommand(client *c) {
    //第二个参数,0表示顺序,1表示倒序
    zrangeGenericCommand(c,0);
}

void zrangeGenericCommand(client *c, int reverse) {
    //有序集名
    robj *key = c->argv[1];
    robj *zobj;
    int withscores = 0;
    long start;
    long end;
    int llen;
    int rangelen;
    //参数校验
    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

    //根据参数附加信息判断是否需要返回score
    if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
        withscores = 1;
    } else if (c->argc >= 5) {
        addReply(c,shared.syntaxerr);
        return;
    }
    //有序集校验
    if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
         || checkType(c,zobj,OBJ_ZSET)) return;

    //索引值重置
    llen = zsetLength(zobj);
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;
     //返回空集
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    //返回给客户端结果长度
    addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
    //同样是根据有序集的不同结构执行不同的查询逻辑
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        //根据正序还是倒序计算起始索引
        if (reverse)
            eptr = ziplistIndex(zl,-2-(2*start));
        else
            eptr = ziplistIndex(zl,2*start);

        serverAssertWithInfo(c,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);

        while (rangelen--) {
            serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
            //注意嵌套的ziplistGet方法就是把eptr索引的值读出来保存在后面三个参数中
            serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
            //返回value
            if (vstr == NULL)
                addReplyBulkLongLong(c,vlong);
            else
                addReplyBulkCBuffer(c,vstr,vlen);
            //如果需要则返回score
            if (withscores)
                addReplyDouble(c,zzlGetScore(sptr));
            //倒序从后往前,正序从前往后
            if (reverse)
                zzlPrev(zl,&eptr,&sptr);
            else
                zzlNext(zl,&eptr,&sptr);
        }

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;
        sds ele;

        //找到起始节点
        if (reverse) {
            ln = zsl->tail;
            if (start > 0)
                ln = zslGetElementByRank(zsl,llen-start);
        } else {
            ln = zsl->header->level[0].forward;
            if (start > 0)
                ln = zslGetElementByRank(zsl,start+1);
        }
         //遍历并返回给客户端
        while(rangelen--) {
            serverAssertWithInfo(c,zobj,ln != NULL);
            ele = ln->ele;
            addReplyBulkCBuffer(c,ele,sdslen(ele));
            if (withscores)
                addReplyDouble(c,ln->score);
            ln = reverse ? ln->backward : ln->level[0].forward;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}

下边正是有关有序集SortedSet的丰盛,删除,查找的源码。能够见见SortedSet会基于存放成分的数据选拔ziplist也许哈希表+跳表三种数据结构进行落到实处,之所以源码看上去非常短,主因也正是要基于差别的数据结构举行不相同的代码达成。只要精通了那几个宗旨情路,再看源码就不会太难。

三、有序集SortedSet命令计算

有序集的逻辑简单,就是代码有一点长,涉及到ziplist,skiplist,dict三套数据结构,在这之中除了常规的dict之外,别的七个数据结构内容都游人如织,筹算特别写小说进行总结,就不在这里赘述了。本文主要指标是计算一下江河行地集SortedSet的贯彻原理。

Post Author: admin

发表评论

电子邮件地址不会被公开。 必填项已用*标注