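AOF Persistence Strategy

Introduction

The main difference between AOF persistence and RDB persistence is that the former records changes to the data, while the latter saves the data itself. This chapter covers AOF persistence: how an AOF file is organized and how the mechanism operates. Redis implements most AOF operations in aof.c.

Like RDB, AOF persistence reads and writes files and uses the rio data structure. rio was covered in the previous chapter and is not discussed again here.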

AOF data layout

Suppose Redis holds the key-value pair "name:Jhon" in memory. After AOF persistence, the AOF file contains the following:

*2     # 2 arguments
$6     # first argument is 6 bytes long
SELECT # first argument
$1     # second argument is 1 byte long
8      # second argument
*3     # 3 arguments
$3     # first argument is 3 bytes long
SET    # first argument
$4     # second argument is 4 bytes long
name   # second argument
$4     # third argument is 4 bytes long
Jhon   # third argument

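Replaying this content yields two familiar Redis commands: SELECT 8 and SET name Jhon. (The # annotations are explanatory only; in the actual file each line ends with \r\n, as the string literals in the code further down show.) As you might expect, Redis walks every key-value pair in the in-memory dataset and writes each to disk in turn; at startup, Redis reads the AOF file and replays it to restore the data.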
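To make the layout concrete, here is a minimal sketch that encodes a command into this format. encode_command() is a hypothetical helper written for this illustration, not a function from aof.c, and it assumes NUL-terminated arguments (the real code is binary safe).

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argc/argv into `out` using the "*<argc>", "$<len>", "<arg>" layout.
 * Returns the number of bytes written (excluding the NUL terminator),
 * or -1 if `out` is too small. Hypothetical helper, not part of Redis. */
int encode_command(char *out, size_t outlen, int argc, const char **argv) {
    size_t pos = 0;
    int n;

    assert(out != NULL && argv != NULL);
    n = snprintf(out, outlen, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= outlen) return -1;
    pos += (size_t)n;

    for (int i = 0; i < argc; i++) {
        /* Each argument is its byte length, then the bytes themselves. */
        n = snprintf(out + pos, outlen - pos, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= outlen - pos) return -1;
        pos += (size_t)n;
    }
    return (int)pos;
}
```

Calling it with the three arguments SET, name and Jhon produces the second command of the example above, with \r\n after every line.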

How AOF persistence works

Unlike RDB persistence, Redis AOF operates in two ways: a background rewrite, and backing up while serving.

[Figure: http://wiki.jikexueyuan.com/project/redis/images/redis18.png]

1) The background mode resembles RDB: Redis forks a child process, the main process keeps serving clients, and the child performs the AOF persistence, dumping the data to disk. Unlike RDB, while the child is running the main process records every data change made in the meantime (it is still serving) and stores them in server.aof_rewrite_buf_blocks; when the child finishes, Redis appends this update buffer to the AOF file. RDB persistence has no equivalent step.

A few words about the update buffer. When a data change occurs on the Redis server, for example set name Jhon, Redis not only modifies the in-memory dataset but also records the update operation, using the data layout described above.

The update buffer can live in server.aof_buf. Think of it as a small temporary staging area: every accumulated update goes here first, and at specific moments it is written to the file or inserted into the server.aof_rewrite_buf_blocks list (detailed below). Data is added to server.aof_buf via propagate(), which is called wherever data is updated so that the changes accumulate. The update buffer can also live in server.aof_rewrite_buf_blocks, a linked list whose elements are of type struct aofrwblock. Think of it as a warehouse: while an AOF child process is running, the accumulated updates are inserted into this list as well, and when the child finishes the whole list is written to the file. The two buffers are related.

The intent is to avoid triggering a write every time data changes: writes are first buffered in memory and written to disk at a suitable moment, which avoids frequent write operations. It is of course entirely possible to push every change to disk immediately; choosing between the two is a trade-off.
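The buffering idea can be sketched in a few lines. This is only an illustration under stated assumptions: the wbuf type and both functions are hypothetical names, and Redis itself uses an sds string for server.aof_buf rather than this hand-rolled buffer.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* A growable in-memory buffer (hypothetical stand-in for server.aof_buf). */
typedef struct {
    char *data;
    size_t len, cap;
} wbuf;

/* Accumulate a change record in memory. Returns 0, or -1 if out of memory. */
int wbuf_append(wbuf *b, const void *p, size_t n) {
    assert(b != NULL);
    if (b->len + n > b->cap) {
        size_t cap = b->cap ? b->cap : 64;
        while (cap < b->len + n) cap *= 2;
        char *d = realloc(b->data, cap);
        if (!d) return -1;
        b->data = d;
        b->cap = cap;
    }
    memcpy(b->data + b->len, p, n);
    b->len += n;
    return 0;
}

/* Write everything accumulated so far with a single write(2), then reset.
 * Returns the number of bytes written, or -1 on error or short write. */
ssize_t wbuf_flush(wbuf *b, int fd) {
    assert(b != NULL);
    if (b->len == 0) return 0;
    ssize_t nwritten = write(fd, b->data, b->len);
    if (nwritten != (ssize_t)b->len) return -1;
    b->len = 0;
    return nwritten;
}
```

Many wbuf_append() calls cost only memory copies; the system call happens once per wbuf_flush(), which is the trade-off the paragraph above describes.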

The main code for the background mode follows:

// Start a background child process that performs AOF persistence.
// Called from bgrewriteaofCommand(), startAppendOnly() and serverCron().
/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finished '2a' exits.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    Then the new file is reopened as the new append only file. Profit!
 */

int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    // A child process is already performing a rewrite
    if (server.aof_child_pid != -1) return REDIS_ERR;
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        // Stop listening on the server sockets
        closeListeningSockets(0);
        // Set the process title
        redisSetProcTitle("redis-aof-rewrite");
        // Temporary file name
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        // Start the AOF persistence
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            // "Private dirty" memory is in effect the memory the child
            // consumed through copy-on-write; fetch its size
            size_t private_dirty = zmalloc_get_private_dirty();

            // Log it
            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
        // The rewrite has started, so clear the scheduled-rewrite flag
        server.aof_rewrite_scheduled = 0;
        // Start time of the most recent rewrite
        server.aof_rewrite_time_start = time(NULL);
        // Child process ID
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        // The update buffer will later be written to the file, so force a
        // SELECT command to be emitted to prevent errors when merging.
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}
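Stripped of Redis specifics, the fork pattern above might look like the following sketch. Both function names are hypothetical, and the blocking waitpid() is a simplification: the text below explains that Redis notices the child's exit from serverCron() instead of blocking.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork a child that writes `data` to `tmpfile`, exiting with 0 on success
 * and 1 on failure. The parent returns immediately with the child's pid
 * (or -1 if fork() failed) and is free to keep serving. */
pid_t snapshot_in_child(const char *tmpfile, const char *data) {
    assert(tmpfile != NULL && data != NULL);
    pid_t pid = fork();
    if (pid == 0) {
        /* Child: do the slow disk work, then exit without returning. */
        FILE *fp = fopen(tmpfile, "w");
        if (!fp) _exit(1);
        int ok = fputs(data, fp) != EOF;
        if (fclose(fp) == EOF) ok = 0;
        _exit(ok ? 0 : 1);
    }
    return pid; /* Parent */
}

/* Wait for the child and report whether it exited with status 0. */
int wait_for_child(pid_t pid) {
    int status;
    if (waitpid(pid, &status, 0) == -1) return -1;
    return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

The child uses _exit() so that it never runs the parent's cleanup handlers or flushes stdio buffers it inherited.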

As shown above, the child performs the AOF persistence while the parent records some bookkeeping about the rewrite. Now let's see how the persistence itself is done.

// Main AOF persistence function. Only called from
// rewriteAppendOnlyFileBackground().
/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    // Open the file
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in "
            "rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }
    // Initialize the rio structure
    rioInitWithFile(&aof,fp);
    // Enable incremental fsync if it is configured
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    // Persist every database
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        // Get an iterator over the database
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        // Write the SELECT command
        /* SELECT the new DB */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        // Write the database number
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        // Write every entry in the database
        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            // Wrap keystr in a robj
            initStaticStringObject(key,keystr);
            // Get the expire time
            expiretime = getExpire(db,&key);

            // Skip keys that have already expired
            /* If this key is already expired skip it */
            if (expiretime != -1 && expiretime < now) continue;

            // Write the command that recreates this key-value pair
            /* Save the key and associated value */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }
            // Write the expire time
            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
        }
        // Release the iterator
        dictReleaseIterator(di);
    }

    // Flush to disk
    /* Make sure data will not remain on the OS's output buffers */
    fflush(fp);
    aof_fsync(fileno(fp));
    fclose(fp);

    // Rename the file
    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the "
            "final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;

werr:
    // Clean up
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: "
        "%s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}
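The header comment mentions that variadic commands such as RPUSH carry at most REDIS_AOF_REWRITE_ITEMS_PER_CMD items each. The listing does not show rewriteListObject(), so the following is only a sketch of that batching idea under assumptions: write_rpush_batches() and ITEMS_PER_CMD are hypothetical, and the batch size of 3 is chosen purely to keep the example small.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define ITEMS_PER_CMD 3 /* hypothetical; stands in for REDIS_AOF_REWRITE_ITEMS_PER_CMD */

/* Emit the RPUSH commands needed to rebuild a list of `count` items,
 * putting at most ITEMS_PER_CMD items in each command.
 * Returns the number of commands written. */
size_t write_rpush_batches(FILE *out, const char *key,
                           const char **items, size_t count) {
    size_t cmds = 0;

    assert(out != NULL && key != NULL);
    for (size_t i = 0; i < count; i += ITEMS_PER_CMD) {
        size_t n = (count - i < ITEMS_PER_CMD) ? count - i : ITEMS_PER_CMD;

        /* Argument count is RPUSH + key + n items. */
        fprintf(out, "*%zu\r\n$5\r\nRPUSH\r\n$%zu\r\n%s\r\n",
                n + 2, strlen(key), key);
        for (size_t k = 0; k < n; k++)
            fprintf(out, "$%zu\r\n%s\r\n",
                    strlen(items[i + k]), items[i + k]);
        cmds++;
    }
    return cmds;
}
```

With seven items and a batch size of three, the list is rebuilt by three commands carrying three, three and one item respectively.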

As mentioned, once the AOF rewrite finishes, the data changes produced during it are also appended to the AOF file. If you look at the timer handler serverCron(), you will see that after the child exits the parent appends those changes to the AOF file. That is the job of backgroundRewriteDoneHandler(): append server.aof_rewrite_buf_blocks to the AOF file.

// Runs after the background child finishes. Its main job is to append the
// update buffer server.aof_rewrite_buf_blocks, i.e. the AOF rewrite buffer,
// to the AOF file.
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ......
    // Write the contents of server.aof_rewrite_buf_blocks to disk
    if (aofRewriteBufferWrite(newfd) == -1) {
        redisLog(REDIS_WARNING,
            "Error trying to flush the parent diff to the rewritten AOF: %s",
            strerror(errno));
        close(newfd);
        goto cleanup;
    }
    ......
}

// Write the accumulated update buffer server.aof_rewrite_buf_blocks to disk
/* Write the buffer (possibly composed of multiple blocks) into the specified
 * fd. If a short write or any other error happens -1 is returned,
 * otherwise the number of bytes written is returned. */
ssize_t aofRewriteBufferWrite(int fd) {
    listNode *ln;
    listIter li;
    ssize_t count = 0;

    listRewind(server.aof_rewrite_buf_blocks,&li);
    while((ln = listNext(&li))) {
        aofrwblock *block = listNodeValue(ln);
        ssize_t nwritten;

        if (block->used) {
            nwritten = write(fd,block->buf,block->used);
            if (nwritten != block->used) {
                if (nwritten == 0) errno = EIO;
                return -1;
            }
            count += nwritten;
        }
    }
    return count;
}

2) In the back-up-while-serving mode, the Redis server stores all data changes in server.aof_buf and writes this update buffer to the configured file (server.aof_filename) at specific moments. There are three such moments:

  1. Before entering the event loop
  2. In the server's timer routine serverCron()
  3. In stopAppendOnly(), which turns AOF off

The point is simply to avoid losing too much data if the server crashes unexpectedly. By default Redis performs this backup at a fixed interval, that is, the accumulated changes are written to the file at fixed intervals.

The main code for AOF persistence while serving follows:

// Flush to disk: write all updates accumulated in server.aof_buf to the file
/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is when entering
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    // Nothing buffered, nothing to flush
    if (sdslen(server.aof_buf) == 0) return;

    // Check whether a background fsync job is still pending
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    // Unless the write is forced, it may be postponed
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                // Remember when the flush was first postponed
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                /* server.unixtime: Unix time sampled every cron cycle. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                // Less than 2 seconds have passed: return without writing
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            // Otherwise the write has to go ahead
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk"
                " is busy?). Writing the AOF buffer without waiting for fsync to "
                "complete, this may slow down Redis.");
        }
    }
    // Clear the postponed-flush marker
    /* If you are following this code path, then we are going to write so
     * set reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    // The AOF file is already open. Write everything in server.aof_buf to it
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) {
        /* Ooops, we are in troubles. The best thing to do for now is
         * aborting instead of giving the illusion that everything is
         * working as expected. */
        if (nwritten == -1) {
            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only"
                " file: %s",strerror(errno));
        } else {
            redisLog(REDIS_WARNING,"Exiting on short write while writing to "
                "the append-only file: %s (nwritten=%ld, "
                "expected=%ld)",
                strerror(errno),
                (long)nwritten,
                (long)sdslen(server.aof_buf));
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                redisLog(REDIS_WARNING, "Could not remove short write "
                    "from the append-only file. Redis may refuse "
                    "to load the AOF the next time it starts. "
                    "ftruncate: %s", strerror(errno));
            }
        }
        exit(1);
    }
    // Update the recorded size of the AOF file
    server.aof_current_size += nwritten;

    // When server.aof_buf is small enough, reuse it to avoid frequent
    // allocations. When it occupies a lot of space, free it instead:
    // Redis is careful about memory.
    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
        return;

    // fsync: force the data to disk
    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
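Two decisions are buried in flushAppendOnlyFile(): whether to postpone the write while a background fsync is running, and whether to fsync after the write. Isolated from the server state, they might be sketched as below. The enum and both function names are hypothetical; only the 2-second limit and the comparisons come from the listing above.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical stand-ins for the AOF_FSYNC_* policies. */
enum fsync_policy { FSYNC_NO, FSYNC_EVERYSEC, FSYNC_ALWAYS };

/* Returns 1 if the write should be postponed because a background fsync
 * is still running, 0 if it should go ahead now. *postponed_start plays
 * the role of server.aof_flush_postponed_start (0 means "not postponed"). */
int should_postpone_write(int sync_in_progress, time_t now,
                          time_t *postponed_start) {
    assert(postponed_start != NULL);
    if (sync_in_progress) {
        if (*postponed_start == 0) {
            *postponed_start = now;   /* first postponement: remember when */
            return 1;
        }
        if (now - *postponed_start < 2) return 1; /* still within 2 seconds */
    }
    *postponed_start = 0;             /* we are going to write: reset */
    return 0;
}

/* Returns 1 if an fsync should follow the write under the given policy. */
int should_fsync(enum fsync_policy policy, time_t now, time_t last_fsync) {
    switch (policy) {
    case FSYNC_ALWAYS:   return 1;
    case FSYNC_EVERYSEC: return now > last_fsync; /* at most once per second */
    default:             return 0;                /* leave it to the OS */
    }
}
```

In the real function the postponement check only applies under the everysec policy and when force is 0; the sketch leaves that outer condition to the caller.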

More on the update buffer

The "update buffer" mentioned twice above is simply the set of data changes Redis has accumulated.

The update buffer can be stored in server.aof_buf and in the server.aof_rewrite_buf_blocks linked list. Their relationship is as follows: every change record is written to server.aof_buf, and if a background child is running a rewrite, the record is also written to server.aof_rewrite_buf_blocks. server.aof_buf is written to the AOF file at specific moments, while server.aof_rewrite_buf_blocks is appended to the file after the background rewrite finishes.

In the Redis source this is implemented by the call chain propagate() -> feedAppendOnlyFile() -> aofRewriteBufferAppend().

Note that feedAppendOnlyFile() appends the update to server.aof_buf; then comes a check: if an AOF child process exists, it calls aofRewriteBufferAppend() to append the same record (buf) to the server.aof_rewrite_buf_blocks list. This explains why, after the AOF child finishes, the parent appends server.aof_rewrite_buf_blocks to the AOF file.

// Propagate a data update to the AOF and to the slaves
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // AOF must be enabled and the AOF flag set: feed the update to the
    // local file
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    // The replication flag is set: feed the update to the slaves
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

// Record a data update in the AOF buffers
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv,
                        int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    // Append the generated record to server.aof_buf. Its contents will be
    // written to disk before the next entry into the event loop.
    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    // If an AOF child is running, accumulate the differences between what
    // the child is persisting and the current in-memory dataset.
    // aofRewriteBufferAppend() appends buf to the
    // server.aof_rewrite_buf_blocks list.
    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

// Write an update record into server.aof_rewrite_buf_blocks. Only called
// from feedAppendOnlyFile().
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    // Start from the tail of the list
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) { /* The current block is not already full. */
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }

        if (len) { /* First block to allocate, or need another block. */
            int numblocks;

            // Create a new node and add it at the tail
            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING :
                                                         REDIS_NOTICE;
                redisLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }
}
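The tail-append logic of aofRewriteBufferAppend() is easier to follow without the Redis list and logging code around it. Below is a self-contained sketch of the same idea. The types and names are hypothetical, and BLOCK_SIZE is deliberately tiny so the block boundaries are easy to see; it is not the value of AOF_RW_BUF_BLOCK_SIZE.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define BLOCK_SIZE 10 /* hypothetical, kept tiny for illustration */

typedef struct block {
    struct block *next;
    size_t used, free;
    char buf[BLOCK_SIZE];
} block;

typedef struct {
    block *head, *tail;
    size_t nblocks;
} blocklist;

/* Append len bytes: fill the free space in the tail block first, then
 * allocate new tail blocks as needed. Returns 0, or -1 if out of memory. */
int blocklist_append(blocklist *l, const char *s, size_t len) {
    assert(l != NULL);
    while (len) {
        block *b = l->tail;
        if (b && b->free) {
            size_t n = (b->free < len) ? b->free : len;
            memcpy(b->buf + b->used, s, n);
            b->used += n;
            b->free -= n;
            s += n;
            len -= n;
        } else {
            /* No block yet, or the tail is full: add a new tail block. */
            b = malloc(sizeof(*b));
            if (!b) return -1;
            b->next = NULL;
            b->used = 0;
            b->free = BLOCK_SIZE;
            if (l->tail) l->tail->next = b; else l->head = b;
            l->tail = b;
            l->nblocks++;
        }
    }
    return 0;
}

/* Total number of bytes stored across all blocks. */
size_t blocklist_size(const blocklist *l) {
    size_t total = 0;
    for (const block *b = l->head; b; b = b->next) total += b->used;
    return total;
}

/* Free every block and reset the list. */
void blocklist_free(blocklist *l) {
    block *b = l->head;
    while (b) {
        block *next = b->next;
        free(b);
        b = next;
    }
    l->head = l->tail = NULL;
    l->nblocks = 0;
}
```

Using fixed-size blocks means an append never has to move data that is already buffered, which matters when the buffer grows large during a long rewrite.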

A picture to give your eyes a rest, showing how AOF persistence works:

[Figure: http://wiki.jikexueyuan.com/project/redis/images/redis19.png]

These two ways of getting data to disk are the two main threads of Redis AOF: the background rewrite, and backing up while serving. Grasp these two and you understand Redis AOF.

This raises a question. Both threads write a file: the background rewrite writes an AOF file, and so does the backup-while-serving path. Which one is authoritative?

The background rewrite first writes its data to "temp-rewriteaof-bg-%d.aof", where "%d" is the process ID of the AOF child. After the child exits, that file is opened in append mode and the update buffer in server.aof_rewrite_buf_blocks is written to it. Finally the file is renamed to server.aof_filename, so the previous file of that name, that is, the one written by the backup-while-serving path, is replaced. The backup-while-serving path always writes to the server.aof_filename file.

So two files are indeed produced, but both end up as server.aof_filename. Another question may arise: given the background rewrite, why also back up while serving? Over time, the backup-while-serving file accumulates redundant data and even records for stale data, and the background rewrite eliminates these. This is Redis's double insurance.
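The "write a temporary file, then rename it over the real one" step can be sketched as follows. replace_file_atomically() is a hypothetical helper for illustration; the real code also appends the rewrite buffer before renaming and reopens the new file afterwards.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Write `data` to `tmp`, force it to disk, then rename(2) it over `final`.
 * A reader opening `final` sees either the old file or the complete new
 * one, never a half-written file. Returns 0 on success, -1 on failure. */
int replace_file_atomically(const char *tmp, const char *final,
                            const char *data) {
    assert(tmp != NULL && final != NULL && data != NULL);
    FILE *fp = fopen(tmp, "w");
    if (!fp) return -1;

    size_t len = strlen(data);
    if (fwrite(data, 1, len, fp) != len ||
        fflush(fp) == EOF ||
        fsync(fileno(fp)) == -1) {
        fclose(fp);
        unlink(tmp);
        return -1;
    }
    if (fclose(fp) == EOF || rename(tmp, final) == -1) {
        unlink(tmp);
        return -1;
    }
    return 0;
}
```

Because the old file stays in place until rename() succeeds, a failed rewrite leaves the existing AOF untouched.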

AOF recovery process

AOF recovery is cleverly designed: it simulates Redis serving a client. Redis first creates a fake client and reads the AOF file to recover the commands and their arguments; it then runs each command's function exactly as it would for a real client, thereby restoring the data. The purpose is simply to maximize code reuse. This is implemented mainly in loadAppendOnlyFile().

// Load the AOF file and restore the data
/* Replay the append log file. On success REDIS_OK is returned. On non fatal
 * error (the append only file is zero-length) REDIS_ERR is returned. On
 * fatal error an error message is logged and the program exits. */
int loadAppendOnlyFile(char *filename) {
    struct redisClient *fakeClient;
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
    int old_aof_state = server.aof_state;
    long loops = 0;

    // The file must not be empty
    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.aof_current_size = 0;
        fclose(fp);
        return REDIS_ERR;
    }

    if (fp == NULL) {
        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file "
            "for reading: %s",strerror(errno));
        exit(1);
    }

    // While loading, temporarily disable all AOF activity to avoid confusion
    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
    server.aof_state = REDIS_AOF_OFF;

    // Create a fake client, i.e. a redisClient
    fakeClient = createFakeClient();
    startLoading(fp);

    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        // Every 1000 iterations, serve clients while the restore continues.
        // aeProcessEvents() processes pending events.
        /* Serve the clients from time to time */
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp));
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
        }

        // Possibly the end of the AOF file
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp))
                break;
            else
                goto readerr;
        }
        // A command must start with "*"; otherwise the format is wrong
        if (buf[0] != '*') goto fmterr;
        // Number of arguments
        argc = atoi(buf+1);
        // Invalid argument count
        if (argc < 1) goto fmterr;

        // Allocate space for the arguments
        argv = zmalloc(sizeof(robj*)*argc);
        // Read the arguments one by one
        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10);
            argsds = sdsnewlen(NULL,len);
            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
            argv[j] = createObject(REDIS_STRING,argsds);
            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
        }

        // Look up the command
        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command '%s' reading the "
                "append only file", (char*)argv[0]->ptr);
            exit(1);
        }

        // Execute the command, simulating a client request, so that the
        // data is written into memory
        /* Run the command in the context of a fake client */
        fakeClient->argc = argc;
        fakeClient->argv = argv;
        cmd->proc(fakeClient);

        /* The fake client should not have a reply */
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply)
            == 0);
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);

        // Free the argument vector used by the fake client
        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        for (j = 0; j < fakeClient->argc; j++)
            decrRefCount(fakeClient->argv[j]);
        zfree(fakeClient->argv);
    }

    /* This point can only be reached when EOF is reached without errors.
     * If the client is in the middle of a MULTI/EXEC, log error and quit. */
    if (fakeClient->flags & REDIS_MULTI) goto readerr;

    // Clean up
    fclose(fp);
    freeFakeClient(fakeClient);
    // Restore the previous AOF state
    server.aof_state = old_aof_state;
    stopLoading();
    // Record the current size of the AOF file
    aofUpdateCurrentSize();
    server.aof_rewrite_base_size = server.aof_current_size;
    return REDIS_OK;

readerr:
    // Read error: log it and exit
    if (feof(fp)) {
        redisLog(REDIS_WARNING,"Unexpected end of file reading the append "
            "only file");
    } else {
        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only "
            "file: %s", strerror(errno));
    }
    exit(1);

fmterr:
    redisLog(REDIS_WARNING,"Bad file format reading the append only file: "
        "make a backup of your AOF file, then use ./redis-check-aof --fix "
        "<filename>");
    exit(1);
}
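The parsing half of the loop above, reading "*<argc>" and then argc "$<len>" arguments, can be tried out on an in-memory string. The sketch below assumes a NUL-terminated input and arguments without embedded NUL bytes, which the real loader does not require; parse_command(), read_len() and MAX_ARGS are hypothetical names.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define MAX_ARGS 8 /* hypothetical limit for this sketch */

/* Read "<prefix><number>\r\n" at p. Returns a pointer just past the CRLF,
 * or NULL on a format error. */
static const char *read_len(const char *p, char prefix, long *out) {
    char *end;
    if (*p != prefix) return NULL;
    *out = strtol(p + 1, &end, 10);
    if (end == p + 1 || *out < 0 || end[0] != '\r' || end[1] != '\n')
        return NULL;
    return end + 2;
}

/* Parse one command starting at p. On success returns argc, stores
 * malloc'ed NUL-terminated copies of the arguments in argv (the caller
 * frees them) and sets *next to the byte after the command.
 * Returns -1 on a format error or allocation failure. */
int parse_command(const char *p, char *argv[MAX_ARGS], const char **next) {
    long argc, len;
    int j = 0;

    assert(p != NULL && argv != NULL && next != NULL);
    if ((p = read_len(p, '*', &argc)) == NULL) return -1;
    if (argc < 1 || argc > MAX_ARGS) return -1;

    for (j = 0; j < argc; j++) {
        if ((p = read_len(p, '$', &len)) == NULL) goto err;
        if (strlen(p) < (size_t)len + 2) goto err;       /* truncated input */
        if (p[len] != '\r' || p[len + 1] != '\n') goto err;
        argv[j] = malloc((size_t)len + 1);
        if (!argv[j]) goto err;
        memcpy(argv[j], p, (size_t)len);
        argv[j][len] = '\0';
        p += len + 2;                                    /* skip data + CRLF */
    }
    *next = p;
    return (int)argc;

err:
    while (j-- > 0) free(argv[j]);
    return -1;
}
```

Feeding it the example file from the data layout section yields SELECT 8 on the first call and SET name Jhon on the second, after which *next points at the end of the input.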

When to use AOF

If you care a great deal about your data and every second counts, use AOF persistence. AOF files are also easy to analyze.