// 这个函数被封装为私有,它通过p指针指的位置,将长度为slen的字符串s插入到zl所指的压缩列表中,然后返回插入完成后的新的ziplist unsignedchar *__ziplistInsert(unsignedchar *zl, unsignedchar *p, unsignedchar *s, unsignedint slen) { // curlen为当前压缩链表的长度,reqlen将来用于保存节点的长度 size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, newlen; unsignedint prevlensize, prevlen = 0; size_t offset; int nextdiff = 0; unsignedchar encoding = 0; // 初始化一个值,用来避免出现警告 longlong value = 123456789; /* initialized to avoid warning. Using a value that is easy to see if for some reason we use it uninitialized. */ zlentry tail;
/* Find out prevlen for the entry that is inserted. */ // 获取前面的节点长度,以此为新节点设立prevlen if (p[0] != ZIP_END) { // 如果p不指向结尾,那么ziplist不为空,通过ZIP_DECODE_PREVLEN宏获取prevlen ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); } else { // p指向结尾,获取到结尾指针ptail unsignedchar *ptail = ZIPLIST_ENTRY_TAIL(zl); if (ptail[0] != ZIP_END) { // 如果ptail不为空,说明ziplist不为空,ptail指向的是最后一个节点 // 此时要在结尾插入,ptail指向的节点为前置节点,通过它的长度设置prevlen prevlen = zipRawEntryLengthSafe(zl, curlen, ptail); } }
/* See if the entry can be encoded */ // 查看是否能将entry编码,尝试将string转换为整数 if (zipTryEncoding(s,slen,&value,&encoding)) { // 可以转换为整数,encoding已经被设置为最适合的编码类型,并且value被设置为转换后的值 /* 'encoding' is set to the appropriate integer encoding */ // zipIntSize函数通过encoding返回合适的长度 reqlen = zipIntSize(encoding); } else { // 不可以转换,此时encoding仍为0 /* 'encoding' is untouched, however zipStoreEntryEncoding will use the * string length to figure out how to encode it. */ // reqlen暂时设置为slen reqlen = slen; } /* We need space for both the length of the previous entry and * the length of the payload. */ // 首先根据前面节点获取prevlen到底为1还是5,将这个值加到reqlen上 reqlen += zipStorePrevEntryLength(NULL,prevlen); // 再加上当前节点本身的值所需的大小,在这个函数中同时处理了字符串的encoding reqlen += zipStoreEntryEncoding(NULL,encoding,slen);
/* When the insert position is not equal to the tail, we need to * make sure that the next entry can hold this entry's length in * its prevlen field. */ // 当要插入的位置不是尾部时,就需要检查后面的节点的prevlen是否需要扩容 int forcelarge = 0; // 要插入的位置不是尾部时,通过zipPrevLenByteDiff函数,传入p指针和reqlen,进行如下计算: // 通过p指向的节点计算现在用来编码p的字节数:A,通过reqlen计算所需要的字节数:B // 用B-A得到结果,如果是负数,说明所需节点可以用更少的空间存储,如果为0则不需要变化,如果为正数说明所需节点需要更多空间 // 将这个结果赋值给nextdiff nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0; if (nextdiff == -4 && reqlen < 4) { // 如果新节点长度小于4,把nextdiff设为0,标记forcelarge设置为1 nextdiff = 0; forcelarge = 1; }
/* Store offset because a realloc may change the address of zl. */ //存储偏移量,因为realloc可能会改变zl的地址 offset = p-zl; // newlen=ziplist原来的长度+新节点的长度+后面一个节点需要扩容的长度 newlen = curlen+reqlen+nextdiff; // 调整ziplist的大小 zl = ziplistResize(zl,newlen); // 根据偏移量,还原p的位置 p = zl+offset;
/* Apply memory move when necessary and update tail offset. */ // 如果插入的节点不在尾部,要对内存进行调整 if (p[0] != ZIP_END) { /* Subtract one because of the ZIP_END bytes */ // 从p-nextdiff位置开始,整体后移,为新节点腾出位置 memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
/* Encode this entry's raw length in the next entry. */ // 为后面的节点设置encoding if (forcelarge) // forcelarge=1 zipStorePrevEntryLengthLarge(p+reqlen,reqlen); else // forcelarge=0 zipStorePrevEntryLength(p+reqlen,reqlen);
/* When the tail contains more than one entry, we need to take * "nextdiff" in account as well. Otherwise, a change in the * size of prevlen doesn't have an effect on the *tail* offset. */ // 安全更新整个结构体的信息 assert(zipEntrySafe(zl, newlen, p+reqlen, &tail, 1)); if (p[reqlen+tail.headersize+tail.len] != ZIP_END) { // 如果要插入的节点后面不止一个节点,更新ziplist尾部偏移量的时候要加上nextdiff // 不然的话对于prevlen的更新对于表尾的节点没有效果 ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff); } } else { /* This element will be the new tail. */ // 如果要插入的节点就是尾节点,更新尾节点offset ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl); }
/* When nextdiff != 0, the raw length of the next entry has changed, so * we need to cascade the update throughout the ziplist */ // 当nextdiff不为0时,说明我们更新了后续节点的长度,因此还需要级联更新再后续的节点 if (nextdiff != 0) { offset = p-zl; zl = __ziplistCascadeUpdate(zl,p+reqlen); p = zl+offset; }
/* Write the entry */ // 把prevlen写入新节点 p += zipStorePrevEntryLength(p,prevlen); // 将更新encoding p += zipStoreEntryEncoding(p,encoding,slen); if (ZIP_IS_STR(encoding)) { // 如果是字符串 memcpy(p,s,slen); } else { // 如果是数字 zipSaveInteger(p,value,encoding); } // 更新ziplist的节点数量 ZIPLIST_INCR_LENGTH(zl,1); return zl; }
/* We use this structure to receive information about a ziplist entry.
 * Note that this is not how the data is actually encoded, is just what we
 * get filled by a function in order to operate more easily.
 * The full size of an entry is headersize + len. */
typedef struct zlentry {
    unsigned int prevrawlensize; /* Bytes used to encode the previous entry's length (size of the prevlen field). */
    unsigned int prevrawlen;     /* Length of the previous entry. */
    unsigned int lensize;        /* Bytes used to encode this entry's type/length (size of the encoding field). */
    unsigned int len;            /* Bytes used by this entry's payload. */
    unsigned int headersize;     /* prevrawlensize + lensize. */
    unsigned char encoding;      /* Encoding type of this entry. */
    unsigned char *p;            /* Pointer to the start of this entry. */
} zlentry;
unsignedchar *__ziplistCascadeUpdate(unsignedchar *zl, unsignedchar *p) { zlentry cur; size_t prevlen, prevlensize, prevoffset; /* Informat of the last changed entry. */ size_t firstentrylen; /* Used to handle insert at head. */ size_t rawlen, curlen = intrev32ifbe(ZIPLIST_BYTES(zl)); size_t extra = 0, cnt = 0, offset; size_t delta = 4; /* Extra bytes needed to update a entry's prevlen (5-1). */ unsignedchar *tail = zl + intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl));
/* Empty ziplist */ if (p[0] == ZIP_END) return zl;
zipEntry(p, &cur); /* no need for "safe" variant since the input pointer was validated by the function that returned it. */ firstentrylen = prevlen = cur.headersize + cur.len; prevlensize = zipStorePrevEntryLength(NULL, prevlen); prevoffset = p - zl; p += prevlen;
/* Iterate ziplist to find out how many extra bytes do we need to update it. */ // 迭代ziplist确定多少字节需要更新 while (p[0] != ZIP_END) { assert(zipEntrySafe(zl, curlen, p, &cur, 0));
/* Abort when "prevlen" has not changed. */ // 当后面的不需要更新时,结束循环 if (cur.prevrawlen == prevlen) break;
/* Abort when entry's "prevlensize" is big enough. */ if (cur.prevrawlensize >= prevlensize) { if (cur.prevrawlensize == prevlensize) { //cur节点长度等于prevlensize zipStorePrevEntryLength(p, prevlen); } else { // cur节点长度小于prevlensize,这种情况意味着缩容,但是这里并不缩容 // 因为可能会发生抖动现象 /* This would result in shrinking, which we want to avoid. * So, set "prevlen" in the available bytes. */ zipStorePrevEntryLengthLarge(p, prevlen); } break; }
/* cur.prevrawlen means cur is the former head entry. */ assert(cur.prevrawlen == 0 || cur.prevrawlen + delta == prevlen);
/* Update prev entry's info and advance the cursor. */ // 更新前一个节点的信息并移动指针 rawlen = cur.headersize + cur.len; prevlen = rawlen + delta; prevlensize = zipStorePrevEntryLength(NULL, prevlen); prevoffset = p - zl; p += rawlen; extra += delta; cnt++; }
/* Extra bytes is zero all update has been done(or no need to update). */ // 如果extra为0说明已经更新完成(或者不需要更新) if (extra == 0) return zl;
/* Update tail offset after loop. */ // 更新尾节点偏移 if (tail == zl + prevoffset) { /* When the the last entry we need to update is also the tail, update tail offset * unless this is the only entry that was updated (so the tail offset didn't change). */ // 如果我们需要更新的最后一个节点也是尾节点,就需要更新偏移 if (extra - delta != 0) { ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra-delta); } } else { /* Update the tail offset in cases where the last entry we updated is not the tail. */ // 不是尾节点时,更新偏移 ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra); }
/* Now "p" points at the first unchanged byte in original ziplist, * move data after that to new ziplist. */ offset = p - zl; zl = ziplistResize(zl, curlen + extra); p = zl + offset; memmove(p + extra, p, curlen - offset - 1); p += extra;
/* Iterate all entries that need to be updated tail to head. */ // 从尾到头遍历所有需要被更新的节点 while (cnt) { zipEntry(zl + prevoffset, &cur); /* no need for "safe" variant since we already iterated on all these entries above. */ rawlen = cur.headersize + cur.len; /* Move entry to tail and reset prevlen. */ memmove(p - (rawlen - cur.prevrawlensize), zl + prevoffset + cur.prevrawlensize, rawlen - cur.prevrawlensize); p -= (rawlen + delta); if (cur.prevrawlen == 0) { /* "cur" is the previous head entry, update its prevlen with firstentrylen. */ zipStorePrevEntryLength(p, firstentrylen); } else { /* An entry's prevlen can only increment 4 bytes. */ zipStorePrevEntryLength(p, cur.prevrawlen+delta); } /* Foward to previous entry. */ prevoffset -= cur.prevrawlen; cnt--; } return zl; }
// 在64位机器上,它是一个40字节的结构体 typedefstructquicklist { quicklistNode *head; quicklistNode *tail; unsignedlong count; /* total count of all entries in all ziplists */ unsignedlong len; /* number of quicklistNodes */ int fill : QL_FILL_BITS; /* fill factor for individual nodes */ unsignedint compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */ unsignedint bookmark_count: QL_BM_BITS; quicklistBookmark bookmarks[]; } quicklist;
/* new_sz overestimates if 'sz' encodes to an integer type */ // encoding在数字类型上有一些提前量(原本可能只需要1字节,这里直接给5字节) // new_sz=原本的大小+头部大小+数据大小 unsignedint new_sz = node->sz + sz + ziplist_overhead; if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill))) // 按照填充因子检测大小是否满足要求 return1; /* when we return 1 above we know that the limit is a size limit (which is * safe, see comments next to optimization_level and SIZE_SAFETY_LIMIT) */ // 当上面返回1,说明ziplist的限制是根据占用内存大小进行限制的 // 返回0,则说明超限了,或者ziplist是根据元素个数限制的 // 此时也会检查内存是否超出SIZE_SAFETY_LIMIT(默认为8192) elseif (!sizeMeetsSafetyLimit(new_sz)) // 内存超出默认限制 return0; elseif ((int)node->count < fill) // 内存没有超出默认的限制,那么按照个数限制进行检查 // 此时节点中的数据个数小于填充因子的限制,返回1 return1; else // 否则返回0 return0; }
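这里要介绍填充因子 fill 了。这个字段用于限制每个 quicklist 节点中 ziplist 的规模:如果 fill 为正数,它限制的是每个 ziplist 中的元素个数(同时仍需满足 SIZE_SAFETY_LIMIT 的内存上限);如果 fill 为负数,它限制的是每个 ziplist 的内存大小: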
设置为-1,代表每个节点的ziplist大小不能超过4k字节
设置为-2,代表每个节点的ziplist大小不能超过8k字节
设置为-3,代表每个节点的ziplist大小不能超过16k字节
设置为-4,代表每个节点的ziplist大小不能超过32k字节
设置为-5,代表每个节点的ziplist大小不能超过64k字节
在源码中是这样定义的:
1 2 3 4
/* Optimization levels for size-based filling, in bytes:
 * fill = -1 selects 4096, -2 selects 8192, -3 selects 16384,
 * -4 selects 32768 and -5 selects 65536.
 * NOTE(review): the largest possible limit is 64k, not 16k as the upstream
 * comment stated. The 16 bit count field should still not overflow, since
 * every entry presumably takes at least 2 bytes (prevlen + encoding) -
 * confirm. */
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
// 在现有的entry前/后插入一个新的entry // 如果after为1,则插入到后面,如果为0插入到前面 REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry, void *value, constsize_t sz, int after) { //full用来标记插入后是否会超出填充因子fill的限制 //at_tail用于标记要插入的位置是否在对应的ziplist的末尾 //at_head用于标记要插入的位置是否在对应的ziplist的开头 //full_next用于标记下一个node中的ziplist能否插入 //full_next用于标记上一个node中的ziplist能否插入 int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0; // 获得填充因子 int fill = quicklist->fill; // 获得节点 quicklistNode *node = entry->node; quicklistNode *new_node = NULL; assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
if (!node) { // 如果node为空,新建node,把元素插入到该node中并更新计数 /* we have no reference node, so let's create only node in the list */ D("No node given!"); new_node = quicklistCreateNode(); new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); __quicklistInsertNode(quicklist, NULL, new_node, after); new_node->count++; quicklist->count++; return; }
/* Populate accounting flags for easier boolean checks later */ if (!_quicklistNodeAllowInsert(node, fill, sz)) { D("Current node is full with count %d with requested fill %lu", node->count, fill); // 检查是否会超出限制,如果超出就将full标志置1,方便后续判断 full = 1; }
if (after && (entry->offset == node->count)) { // after为1(在后面插入),并且要插入到当前节点的尾部 D("At Tail of current ziplist"); // 将at_tail置1 at_tail = 1; // 检查下一个节点能否插入 if (!_quicklistNodeAllowInsert(node->next, fill, sz)) { // 如果不能插入,说明下一个节点也满了 D("Next node is full too."); // 将full_next置1 full_next = 1; } }
if (!after && (entry->offset == 0)) { // 如果after为0(在前面插入),并且插入到当前节点的头部 D("At Head"); at_head = 1; // 检查上一个节点 if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) { D("Prev node is full too."); // 如果上一节点不能插入,将full_prev置1 full_prev = 1; } }
/* Now determine where and how to insert the new element */ // 现在标志位已经更新,开始判断到底应该如何插入新元素 if (!full && after) { // 如果当前节点可以插入,并且after为1,则直接插入到后面 D("Not full, inserting after current position."); // 可能需要解压node quicklistDecompressNodeForUse(node); // 获得ziplist下一个元素(如果没有下一个元素就返回NULL) unsignedchar *next = ziplistNext(node->zl, entry->zi); if (next == NULL) { // 如果没有下一个元素,就直接将新元素插入到ziplist尾部,底层调用的是__ziplistInsert(2.2节介绍过) node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL); } else { // 否则将next整体后移,把新元素插入到next所指位置,底层调用的是__ziplistInsert(2.2节介绍过) node->zl = ziplistInsert(node->zl, next, value, sz); } // 更新相关信息 node->count++; quicklistNodeUpdateSz(node); // 如果之前经过解压,这里重新压缩 quicklistRecompressOnly(quicklist, node); } elseif (!full && !after) { // 如果当前节点可以插入,并且after为0,则直接插入到前面,逻辑和上面相同 D("Not full, inserting before current position."); quicklistDecompressNodeForUse(node); node->zl = ziplistInsert(node->zl, entry->zi, value, sz); node->count++; quicklistNodeUpdateSz(node); quicklistRecompressOnly(quicklist, node); } elseif (full && at_tail && node->next && !full_next && after) { // 如果当前节点不可以插入、并且要插入到当前节点的尾部、下一个节点存在、下一个节点可以插入 // 那么就把元素插入到下一个节点的最前面 /* If we are: at tail, next has free space, and inserting after: * - insert entry at head of next node. */ D("Full and tail, but next isn't full; inserting next node head"); new_node = node->next; quicklistDecompressNodeForUse(new_node); new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD); new_node->count++; quicklistNodeUpdateSz(new_node); quicklistRecompressOnly(quicklist, new_node); } elseif (full && at_head && node->prev && !full_prev && !after) { // 如果当前节点不可以插入、并且要插入到当前节点的头部、上一个节点存在、上一个节点可以插入 // 那么就把元素插入到上一个节点的最后面 /* If we are: at head, previous has free space, and inserting before: * - insert entry at tail of previous node. 
*/ D("Full and head, but prev isn't full, inserting prev node tail"); new_node = node->prev; quicklistDecompressNodeForUse(new_node); new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL); new_node->count++; quicklistNodeUpdateSz(new_node); quicklistRecompressOnly(quicklist, new_node); } elseif (full && ((at_tail && node->next && full_next && after) || (at_head && node->prev && full_prev && !after))) { // 如果当前节点不可以插入、要插入到当前节点的头部/尾部,上一个节点或下一个节点存在,但是都不可以插入 // 则创建新节点,把元素插入到新节点中 /* If we are: full, and our prev/next is full, then: * - create new node and attach to quicklist */ D("\tprovisioning new node..."); new_node = quicklistCreateNode(); new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); new_node->count++; quicklistNodeUpdateSz(new_node); // 根据after,把新节点插入到现节点的前/后面 __quicklistInsertNode(quicklist, node, new_node, after); } elseif (full) { // 否则,要插入的位置不是头部也不是尾部,而是ziplist的中间位置 // 此时需要拆分节点 /* else, node is full we need to split it. */ /* covers both after and !after cases */ D("\tsplitting node..."); quicklistDecompressNodeForUse(node); // 根据after把节点拆分 new_node = _quicklistSplitNode(node, entry->offset, after); // 把元素插入到新节点,根据after插入到头/尾部 new_node->zl = ziplistPush(new_node->zl, value, sz, after ? ZIPLIST_HEAD : ZIPLIST_TAIL); // 更新相关信息 new_node->count++; quicklistNodeUpdateSz(new_node); __quicklistInsertNode(quicklist, node, new_node, after); // 最后,尝试合并节点 _quicklistMergeNodes(quicklist, node); }
/* Copy original ziplist so we can split it */
/* (Excerpt from _quicklistSplitNode.) new_node receives a byte copy of the
 * whole original ziplist (zl_sz bytes); each node is later trimmed so that
 * it keeps only its side of the split point. */
memcpy(new_node->zl, node->zl, zl_sz);
/* Ranges to be trimmed: -1 here means "continue deleting until the list ends" */ // 划分删除范围 int orig_start = after ? offset + 1 : 0; int orig_extent = after ? -1 : offset; int new_start = after ? 0 : offset; int new_extent = after ? offset + 1 : -1;
if (center->next) { // center后面后节点 next = center->next; if (center->next->next) // center后面的后面有节点 next_next = center->next->next; }
/* Try to merge prev_prev and prev */ // 尝试将center前节点与前前节点合并,检查合并后是否超过填充因子的限制 // _quicklistNodeAllowMerge函数的整个逻辑与前面介绍的插入检查_quicklistNodeAllowInsert函数基本相同 if (_quicklistNodeAllowMerge(prev, prev_prev, fill)) { // 合并 _quicklistZiplistMerge(quicklist, prev_prev, prev); prev_prev = prev = NULL; /* they could have moved, invalidate them. */ }
/* Try to merge next and next_next */ // 尝试将center后节点与后后节点合并,检查合并后是否超过填充因子的限制 if (_quicklistNodeAllowMerge(next, next_next, fill)) { _quicklistZiplistMerge(quicklist, next, next_next); next = next_next = NULL; /* they could have moved, invalidate them. */ }
/* Try to merge center node and previous node */ // 尝试将center节点与前节点合并,检查合并后是否超过填充因子的限制 if (_quicklistNodeAllowMerge(center, center->prev, fill)) { target = _quicklistZiplistMerge(quicklist, center->prev, center); center = NULL; /* center could have been deleted, invalidate it. */ } else { /* else, we didn't merge here, but target needs to be valid below. */ target = center; }
/* Use result of center merge (or original) to merge with next node. */ // 尝试将center节点与后节点合并,检查合并后是否超过填充因子的限制 if (_quicklistNodeAllowMerge(target, target->next, fill)) { _quicklistZiplistMerge(quicklist, target, target->next); } }
/* Get the index of the new element, or -1 if * the element already exists. */ // 首先使用dictHashKey获取hash,然后调用_dictKeyIndex获取这个key对应的索引 if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1) // 如果这个key已经存在,index=-1,返回NULL returnNULL;
/* Allocate the memory and store the new entry. * Insert the element in top, with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. */ // 如果未重复,为entry新分配内存,以及更新计数等 // 如果处于rehash状态,则新的键值会被插入到第二个哈希表中,否则插入到第一个哈希表中 ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; entry = zmalloc(sizeof(*entry)); entry->next = ht->table[index]; ht->table[index] = entry; ht->used++;
/* Set the hash entry fields. */ // 设置key,但不设置value dictSetKey(d, entry, key); return entry; }
/* Try to add the element. If the key * does not exists dictAdd will succeed. */ // 调用dictAddRaw尝试获得entry entry = dictAddRaw(d,key,&existing); if (entry) { // entry不为NULL说明是新元素,设置值并返回1 dictSetVal(d, entry, val); return1; }
/* Set the new value and free the old one. Note that it is important * to do that in this order, as the value may just be exactly the same * as the previous one. In this context, think to reference counting, * you want to increment (set), and then decrement (free), and not the * reverse. */ // 否则说明key已经存在,根据key更新该值,并释放掉旧值 // 顺序很重要,由于新的key-value可能与旧的相同,在这种情况下考虑引用计数 // 必须先增加,然后再释放掉旧的 auxentry = *existing; dictSetVal(d, existing, val); dictFreeVal(d, &auxentry); return0; }
/* Remove an element, returning DICT_OK on success or DICT_ERR if the
 * element was not found. Thin wrapper over dictGenericDelete(). */
int dictDelete(dict *ht, const void *key) {
    return dictGenericDelete(ht,key,0) ? DICT_OK : DICT_ERR;
}
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    /* Load factor = used / size. Expand when it is >= 1 and either resizing
     * is enabled (dict_can_resize) or the load factor exceeds
     * dict_force_resize_ratio, provided the dict type allows it. */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) &&
        dictTypeExpandAllowed(d))
    {
        return dictExpand(d, d->ht[0].used + 1);
    }
    return DICT_OK;
}
/* This function is called once a background process of some kind terminates,
 * as we want to avoid resizing the hash tables when there is a child in order
 * to play well with copy-on-write (otherwise when a resize happens lots of
 * memory pages are copied). The goal of this function is to update the ability
 * for dict.c to resize the hash tables accordingly to the fact we have an
 * active fork child running. */
void updateDictResizePolicy(void) {
    if (!hasActiveChildProcess())
        dictEnableResize();
    else
        dictDisableResize();
}
// 扩大或者创建哈希表,当第三个参数malloc_failed不为NULL时,会避免返回错误 // 如果成功返回0,否则返回1 int _dictExpand(dict *d, unsignedlong size, int* malloc_failed) { if (malloc_failed) *malloc_failed = 0;
/* the size is invalid if it is smaller than the number of * elements already inside the hash table */ // 如果正在进行rehash,或者传来的size小于元素数量,返回错误 if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR;
/* Rehashing to the same table size is not useful. */ // 新大小不能与原来的大小相同 if (realsize == d->ht[0].size) return DICT_ERR;
/* Allocate the new hash table and initialize all pointers to NULL */ // 分配内存,并把所有指针设置为NULL n.size = realsize; n.sizemask = realsize-1; if (malloc_failed) { n.table = ztrycalloc(realsize*sizeof(dictEntry*)); *malloc_failed = n.table == NULL; if (*malloc_failed) return DICT_ERR; } else n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not really a rehashing * we just set the first hash table so that it can accept keys. */ // 如果是第一次初始化,这并不是真正的rehash if (d->ht[0].table == NULL) { // 把新表赋值给ht[0]即可 d->ht[0] = n; return DICT_OK; }
/* Prepare a second hash table for incremental rehashing */ // 把新表赋值给ht[1] d->ht[1] = n; // rehashidx设置为0,表示从现在开始进行rehash d->rehashidx = 0; return DICT_OK; }
具体要扩容多大,由其中的_dictNextPower决定:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/* Our hash table capability is a power of two */ staticunsignedlong _dictNextPower(unsignedlong size) { unsignedlong i = DICT_HT_INITIAL_SIZE; // 如果大小已经超过了最大值,返回LONG_MAX+1 if (size >= LONG_MAX) return LONG_MAX + 1LU; // 否则以2的指数增长,直到其首次比size大时返回 while(1) { if (i >= size) return i; i *= 2; } // 这也解释了为什么在执行完该函数为什么会判断溢出: // 因为有可能一开始size没有超过LONG_MAX,但经过指数增长后,超过LONG_MAX }
// 它接收两个参数,对字典d进行n次rehash操作 intdictRehash(dict *d, int n){ // 10*n个空桶访问次数 int empty_visits = n*10; /* Max number of empty buckets to visit. */ // 如果rehashidx仍小于0,说明没有进行rehash if (!dictIsRehashing(d)) return0; // 循环n次,且每次循环时ht[0]中必须还有键值对 while(n-- && d->ht[0].used != 0) { dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more * elements because ht[0].used != 0 */ // rehashidx不能超过size,否则会溢出 assert(d->ht[0].size > (unsignedlong)d->rehashidx); while(d->ht[0].table[d->rehashidx] == NULL) { // 尝试访问rehashidx位置的桶,如果为空则访问下一个 d->rehashidx++; // 并将空桶访问次数减一,如果累计访问了10*n个空桶,则直接返回1 // 这是为了防止时间过长导致阻塞 if (--empty_visits == 0) return1; } // 找到不为NULL的桶,de即dictEntry de = d->ht[0].table[d->rehashidx]; /* Move all the keys in this bucket from the old to the new hash HT */ // 将这个桶中所有的键值对(由于拉链法所以可能不止一个键值对)迁移到ht[1] while(de) { uint64_t h;
nextde = de->next; /* Get the index in the new hash table */ // 重新计算在ht[1]中的hash h = dictHashKey(d, de->key) & d->ht[1].sizemask; // 此时ht[1]中可能有键值,所以要进行头插 de->next = d->ht[1].table[h]; d->ht[1].table[h] = de; // 更新计数 d->ht[0].used--; d->ht[1].used++; de = nextde; } // 迁移后把ht[0]对应位置设置为NULL d->ht[0].table[d->rehashidx] = NULL; // 更新索引 d->rehashidx++; }
/* Check if we already rehashed the whole table... */ // 检查是否已经把ht[0]全部迁移到ht[1] if (d->ht[0].used == 0) { // 把ht[0]释放 zfree(d->ht[0].table); // 把ht[1]作为ht[0] d->ht[0] = d->ht[1]; // 重置ht[1] _dictReset(&d->ht[1]); // 将rehashidx设置为-1,表示rehash结束 d->rehashidx = -1; return0; }
/* More to rehash... */ // 否则,说明还未迁移完毕,返回1 return1; }
voiddatabasesCron(void){ .... /* Rehash */ if (server.activerehashing) { for (j = 0; j < dbs_per_call; j++) { int work_done = incrementallyRehash(rehash_db); .... } } } }
在incrementallyRehash中:
1 2 3 4 5 6 7 8 9 10 11 12 13
/* Spend about 1 millisecond of incremental rehashing on database 'dbid'.
 * The keys dict is served first; the expires dict only when the keys dict
 * is not rehashing. Returns 1 if one of them was rehashing (and got the
 * millisecond), 0 otherwise. */
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }
    return 0;
}
最终调用dictRehashMilliseconds:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/* Rehash in ms+"delta" milliseconds. The value of "delta" is larger
 * than 0, and is smaller than 1 in most cases. The exact upper bound
 * depends on the running time of dictRehash(d,100).*/
int dictRehashMilliseconds(dict *d, int ms) {
    /* Rehashing is paused for this dict: do nothing. */
    if (d->pauserehash > 0) return 0;

    /* ... (rest of the function elided in this excerpt.
     * NOTE(review): it presumably calls dictRehash(d,100) repeatedly until
     * 'ms' milliseconds have elapsed and returns the amount of work done -
     * confirm against the full source.) ... */
}
/* Insert an integer in the intset */
/* If 'success' is non-NULL it is set to 1 when the value was added and to 0
 * when it was already present. Returns the (possibly reallocated) set. */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    /* Smallest encoding able to hold 'value'. */
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    /* Upgrade encoding if necessary. If we need to upgrade, we know that
     * this value should be either appended (if > 0) or prepended (if < 0),
     * because it lies outside the range of existing values. */
    if (valenc > intrev32ifbe(is->encoding)) {
        /* This always succeeds, so we don't need to carry *success. */
        return intsetUpgradeAndAdd(is,value);
    } else {
        /* Abort if the value is already present in the set.
         * This call will populate "pos" with the right position to insert
         * the value when it cannot be found. */
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }

        /* Grow by one slot; unless appending, shift the elements from pos
         * onward one position to the right to make room. */
        is = intsetResize(is,intrev32ifbe(is->length)+1);
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }

    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
// 根据value在is中寻找,如果is中有指定值,则返回1.并把位置赋值给pos // 如果没有找到,返回0,并把pos设置为value可以被插入的位置 staticuint8_tintsetSearch(intset *is, int64_t value, uint32_t *pos){ int min = 0, max = intrev32ifbe(is->length)-1, mid = -1; int64_t cur = -1;
/* The value can never be found when the set is empty */ if (intrev32ifbe(is->length) == 0) { // 如果is为空,返回0 if (pos) *pos = 0; return0; } else { /* Check for the case where we know we cannot find the value, * but do know the insert position. */ // is不为空,检查其他失败的情况 if (value > _intsetGet(is,max)) { // _intsetGet用于获得is中指定位置的值 // 由于is从小到大有序,所以如果value比is中最大的元素还大,则肯定找不到 // 此时直接把pos设置为is的末尾即可 if (pos) *pos = intrev32ifbe(is->length); return0; } elseif (value < _intsetGet(is,0)) { // 同理,如果value比is中的最小值还要小,则直接把pos设置为开头 if (pos) *pos = 0; return0; } } // 否则,进行二分查找 O(logn) while(max >= min) { // 使用移位操作来代替除以2,佩服作者的优化功力 mid = ((unsignedint)min + (unsignedint)max) >> 1; cur = _intsetGet(is,mid); if (value > cur) { min = mid+1; } elseif (value < cur) { max = mid-1; } else { break; } }
if (value == cur) { // 如果集合中已有指定值,则返回1,并把pos位置设置为当前mid if (pos) *pos = mid; return1; } else { // 没有指定值,则返回0,并把pos设置为当前min if (pos) *pos = min; return0; } }
/* First set new encoding and resize */ // 设置新的encoding is->encoding = intrev32ifbe(newenc); // 调整is的大小 is = intsetResize(is,intrev32ifbe(is->length)+1);
/* Upgrade back-to-front so we don't overwrite values. * Note that the "prepend" variable is used to make sure we have an empty * space at either the beginning or the end of the intset. */ // 从后向前进行元素的重新赋值,所以并不会覆盖掉原先的值 // prepend用于保证is的前/后有空余的空间存储新value while(length--) _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
/* Set the value at the beginning or the end. */ // 将新value插入到最前面或最后面 if (prepend) _intsetSet(is,0,value); else _intsetSet(is,intrev32ifbe(is->length),value); // 更新计数 is->length = intrev32ifbe(intrev32ifbe(is->length)+1); return is; }
为了直观地理解这个过程,举个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
假设原来有 curenc 编码的三个元素,它们在数组中排列如下(窄格 | x | 表示旧编码 curenc 下的一个元素,宽格 |   x   | 表示新编码 newenc 下的一个元素):

| x | y | z |

分配内存后,整个内存空间如下(? 表示未使用的内存):

| x | y | z | ? |   ?   |   ?   |

这时程序从数组后端开始,按新编码重新写入元素:

| x | y | z | ? |   z   |   ?   |
| x | y |   y   |   z   |   ?   |
|   x   |   y   |   z   |   ?   |

最后,程序可以将新元素添加到最后 ? 号标示的位置中:

|   x   |   y   |   z   |  new  |

上面演示的是新元素比原来的所有元素都大的情况,也即是 prepend == 0。

当新元素比原来的所有元素都小时(prepend == 1),调整的过程如下:

| x | y | z | ? |   ?   |   ?   |
| x | y | z | ? |   ?   |   z   |
| x | y | z | ? |   y   |   z   |
| x | y |   x   |   y   |   z   |

当添加新值时,原本的 | x | y | 的数据将被新值代替:

|  new  |   x   |   y   |   z   |
/* Insert a new node with the given score and element into the skiplist and
 * return it. The caller must ensure the element is not already present. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    /* update[i]: node after which the new node is linked at level i.
     * rank[i]:   rank (position) of update[i] in the list. */
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        /* Advance while the next node orders before (score, ele): a lower
         * score, or an equal score with a smaller element (sdscmp). */
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        /* New levels start from the header, whose span at those levels
         * covers the whole list. */
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        /* Link x right after update[i] at level i. */
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        /* rank[0] - rank[i] is the distance from update[i] to update[0],
         * the node immediately before x; split update[i]'s span around x. */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    /* Backward pointer: NULL when x is the first node. */
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        /* x has a successor: point the successor's backward link at x. */
        x->level[0].forward->backward = x;
    else
        /* x is the new last node. */
        zsl->tail = x;
    zsl->length++;
    return x;
}
/* Remove the node whose score equals `score` and whose element equals `ele`
 * (compared with sdscmp) from the skiplist.
 *
 * Returns 1 if such a node was found and unlinked, 0 if not found.
 * When `node` is NULL the unlinked node is released with zslFreeNode().
 * Otherwise the node is NOT freed; it is stored in *node and the caller
 * takes responsibility for it. */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    /* update[i]: last node at level i ordering strictly before
     * (score, ele); zslDeleteNode() needs these predecessors to unlink. */
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    /* Descend from the highest level, moving forward while the next node
     * has a lower score, or an equal score and a lower element. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }

    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node) {
            zslFreeNode(x);
        } else {
            *node = x;
        }
        return 1;
    }
    return 0; /* not found */
}
/* We need to seek to element to update to start: this is useful anyway, * we'll have to update or remove it. */ x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < curscore || (x->level[i].forward->score == curscore && sdscmp(x->level[i].forward->ele,ele) < 0))) { x = x->level[i].forward; } update[i] = x; }
/* Jump to our element: note that this function assumes that the * element with the matching score exists. */ x = x->level[0].forward; serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);
/* If the node, after the score update, would be still exactly * at the same position, we can just update the score without * actually removing and re-inserting the element in the skiplist. */ if ((x->backward == NULL || x->backward->score < newscore) && (x->level[0].forward == NULL || x->level[0].forward->score > newscore)) { x->score = newscore; return x; }
/* No way to reuse the old node: we need to remove and insert a new * one at a different place. */ zslDeleteNode(zsl, x, update); zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele); /* We reused the old node x->ele SDS string, free the node now * since zslInsert created a new one. */ x->ele = NULL; zslFreeNode(x); return newnode; }
typedefstructredisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; void *ptr; } robj;
/* Set the LRU to the current lruclock (minutes resolution), or * alternatively the LFU counter. */ // lru相关 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; } else { o->lru = LRU_CLOCK(); } return o; }
robj *tryObjectEncoding(robj *o){ // 传入一个redis字符串对象,为其选择合适的编码以节省空间 long value; sds s = o->ptr; size_t len;
/* Make sure this is a string object, the only type we encode * in this function. Other types use encoded memory efficient * representations but are handled by the commands implementing * the type. */ // 确保这是一个字符串对象 serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
/* We try some specialized encoding only for objects that are * RAW or EMBSTR encoded, in other words objects that are still * in represented by an actually array of chars. */ // 如果对象o的编码不是OBJ_ENCODING_RAW或者OBJ_ENCODING_EMBSTR,直接返回 if (!sdsEncodedObject(o)) return o;
/* It's not safe to encode shared objects: shared objects can be shared * everywhere in the "object space" of Redis and may end in places where * they are not handled. We handle them only as values in the keyspace. */ // 如果引用计数大于1,返回;对于引用计数大于1的对象进行编码是不安全的 if (o->refcount > 1) return o;
/* Check if we can represent this string as a long integer. * Note that we are sure that a string larger than 20 chars is not * representable as a 32 nor 64 bit integer. */ // 检查是否能将这个字符串表示为一个长整型 // 获取字符串长度 len = sdslen(s); // 如果长度小于20,并且可以from string to long(由字符串转长整型) if (len <= 20 && string2l(s,len,&value)) { /* This object is encodable as a long. Try to use a shared object. * Note that we avoid using shared integers when maxmemory is used * because every object needs to have a private LRU field for the LRU * algorithm to work well. */ // 尝试用共享类型,共享类型指的是启动时就创建好的整数 // 但是,当开启了maxmemory时不使用这种方式,因为对象的lru字段是私有的 if ((server.maxmemory == 0 || !(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) && value >= 0 && value < OBJ_SHARED_INTEGERS) { // OBJ_SHARED_INTEGERS,默认为10000,即小于10000的数 // 引用计数减一 decrRefCount(o); // 将共享整数的引用加一并返回 incrRefCount(shared.integers[value]); return shared.integers[value]; } else { // 否则判断encoding是否为OBJ_ENCODING_RAW if (o->encoding == OBJ_ENCODING_RAW) { sdsfree(o->ptr); // 将encoding改为OBJ_ENCODING_INT o->encoding = OBJ_ENCODING_INT; // 直接使用指针存储 o->ptr = (void*) value; return o; } elseif (o->encoding == OBJ_ENCODING_EMBSTR) { // 判断encoding为OBJ_ENCODING_EMBSTR decrRefCount(o); // 调用createStringObjectFromLongLongForValue return createStringObjectFromLongLongForValue(value); } } }
/* If the string is small and is still RAW encoded, * try the EMBSTR encoding which is more efficient. * In this representation the object and the SDS string are allocated * in the same chunk of memory to save space and cache misses. */ // 如果进行到这里仍然是RAW编码,则尝试将其编码为EMBSTR编码 // 在这种编码格式下,对象和SDS字符串被分配在同一块内存中(连续),以节省空间、增加缓存命中 if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) { robj *emb;
/* We can't encode the object... * * Do the last try, and at least optimize the SDS string inside * the string object to require little space, in case there * is more than 10% of free space at the end of the SDS string. * * We do that only for relatively large strings as this branch * is only entered if the length of the string is greater than * OBJ_ENCODING_EMBSTR_SIZE_LIMIT. */ // 进行到这里说明该对象无法编码,进行最后的尝试 // 对sds对象进行空间优化,保证其末尾的空闲空间不超过10% trimStringObjectIfNeeded(o);