Logo Search packages:      
Sourcecode: db4.3 version File versions

rep_backup.c

/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2004
 *    Sleepycat Software.  All rights reserved.
 *
 * $Id: rep_backup.c,v 1.33 2004/10/29 18:08:09 bostic Exp $
 */

#include "db_config.h"

#ifndef NO_SYSTEM_INCLUDES
#if TIME_WITH_SYS_TIME
#include <sys/time.h>
#include <time.h>
#else
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#else
#include <time.h>
#endif
#endif

#include <string.h>
#endif

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_shash.h"
#include "dbinc/db_am.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/qam.h"
#include "dbinc/txn.h"

static int __rep_filedone __P((DB_ENV *, int, REP *, __rep_fileinfo_args *,
    u_int32_t));
static int __rep_files_data __P((DB_ENV *, u_int8_t *, size_t *,
    size_t *, int *));
static int __rep_files_inmem __P((DB_ENV *, u_int8_t *, size_t *,
    size_t *, int *));
static int __rep_get_fileinfo __P((DB_ENV *, const char *,
    __rep_fileinfo_args *, u_int8_t *, int *));
static int __rep_log_setup __P((DB_ENV *, REP *));
static int __rep_mpf_open __P((DB_ENV *, DB_MPOOLFILE **,
    __rep_fileinfo_args *));
static int __rep_page_gap __P((DB_ENV *, REP *, __rep_fileinfo_args *,
    u_int32_t));
static int __rep_page_sendpages __P((DB_ENV *, int,
    __rep_fileinfo_args *, DB_MPOOLFILE *, DB *));
static int __rep_queue_filedone __P((DB_ENV *, REP *, __rep_fileinfo_args *));
static int __rep_walk_dir __P((DB_ENV *, const char *, u_int8_t *,
    size_t *, size_t *, int *));
static int __rep_write_page __P((DB_ENV *, REP *, __rep_fileinfo_args *));

/*
 * __rep_update_req -
 *    Process an update_req and send the file information to the client.
 *
 * PUBLIC: int __rep_update_req __P((DB_ENV *, int));
 */
int
__rep_update_req(dbenv, eid)
      DB_ENV *dbenv;
      int eid;
{
      DBT updbt;
      DB_LOG *dblp;
      DB_LOGC *logc;
      DB_LSN lsn;
      DBT data_dbt;
      size_t filelen, filesz, updlen;
      u_int8_t *buf, *fp;
      int filecnt, ret, t_ret;

      /*
       * Allocate enough for all currently open files and then some.
       * Optimize for the common use of having most databases open.
       * Allocate dbentry_cnt * 2 plus an estimated 60 bytes per
       * file for the filename/path (or multiplied by 120).
       *
       * The data we send looks like this:
       *    __rep_update_args
       *    __rep_fileinfo_args
       *    __rep_fileinfo_args
       *    ...
       */
      dblp = dbenv->lg_handle;
      filecnt = 0;
      filelen = 0;
      updlen = 0;
      filesz = MEGABYTE;
      if ((ret = __os_calloc(dbenv, 1, filesz, &buf)) != 0)
            return (ret);

      /*
       * First get our file information.  Get in-memory files first
       * then get on-disk files.
       */
      fp = buf + sizeof(__rep_update_args);
      if ((ret = __rep_files_inmem(dbenv, fp, &filesz, &filelen,
          &filecnt)) != 0)
            goto err;
      if ((ret = __rep_files_data(dbenv, fp, &filesz, &filelen,
          &filecnt)) != 0)
            goto err;

      /*
       * Now get our first LSN.
       */
      if ((ret = __log_cursor(dbenv, &logc)) != 0)
            goto err;
      memset(&data_dbt, 0, sizeof(data_dbt));
      ret = __log_c_get(logc, &lsn, &data_dbt, DB_FIRST);
      if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
            ret = t_ret;
      if (ret != 0)
            goto err;

      /*
       * Package up the update information.
       */
      if ((ret = __rep_update_buf(buf, filesz, &updlen, &lsn, filecnt)) != 0)
            goto err;
      /*
       * We have all the file information now.  Send it to the client.
       */
      memset(&updbt, 0, sizeof(updbt));
      updbt.data = buf;
      updbt.size = (u_int32_t)(filelen + updlen);
      R_LOCK(dbenv, &dblp->reginfo);
      lsn = ((LOG *)dblp->reginfo.primary)->lsn;
      R_UNLOCK(dbenv, &dblp->reginfo);
      (void)__rep_send_message(dbenv, eid, REP_UPDATE, &lsn, &updbt, 0);

err:
      __os_free(dbenv, buf);
      return (ret);
}

/*
 * __rep_files_data -
 *    Walk through all the files in the env's data_dirs.  We need to
 *    open them, gather the necessary information and then close them.
 *    Then we need to figure out if they're already in the dbentry array.
 */
static int
__rep_files_data(dbenv, fp, fileszp, filelenp, filecntp)
      DB_ENV *dbenv;
      u_int8_t *fp;
      size_t *fileszp, *filelenp;
      int *filecntp;
{
      int ret;
      char **ddir;

      ret = 0;
      if (dbenv->db_data_dir == NULL) {
            /*
             * If we don't have a data dir, we have just the
             * env home dir.
             */
            ret = __rep_walk_dir(dbenv, dbenv->db_home, fp,
                fileszp, filelenp, filecntp);
      } else {
            for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir)
                  if ((ret = __rep_walk_dir(dbenv, *ddir, fp,
                      fileszp, filelenp, filecntp)) != 0)
                        break;
      }
      return (ret);
}

static int
__rep_walk_dir(dbenv, dir, fp, fileszp, filelenp, filecntp)
      DB_ENV *dbenv;
      const char *dir;
      u_int8_t *fp;
      size_t *fileszp, *filelenp;
      int *filecntp;
{
      DBT namedbt, uiddbt;
      __rep_fileinfo_args tmpfp;
      size_t len, offset;
      int cnt, i, ret;
      u_int8_t *rfp, uid[DB_FILE_ID_LEN];
      char **names;
#ifdef DIAGNOSTIC
      REP *rep;
      DB_MSGBUF mb;
      DB_REP *db_rep;

      db_rep = dbenv->rep_handle;
      rep = db_rep->region;
#endif
      memset(&namedbt, 0, sizeof(namedbt));
      memset(&uiddbt, 0, sizeof(uiddbt));
      RPRINT(dbenv, rep, (dbenv, &mb,
          "Walk_dir: Getting info for dir: %s", dir));
      if ((ret = __os_dirlist(dbenv, dir, &names, &cnt)) != 0)
            return (ret);
      rfp = fp;
      RPRINT(dbenv, rep, (dbenv, &mb,
          "Walk_dir: Dir %s has %d files", dir, cnt));
      for (i = 0; i < cnt; i++) {
            RPRINT(dbenv, rep, (dbenv, &mb,
                "Walk_dir: File %d name: %s", i, names[i]));
            /*
             * Skip DB-owned files: ., ..,  __db*, DB_CONFIG, log*
             */
            if (strcmp(names[i], ".") == 0)
                  continue;
            if (strcmp(names[i], "..") == 0)
                  continue;
            if (strncmp(names[i], "__db", 4) == 0)
                  continue;
            if (strncmp(names[i], "DB_CONFIG", 9) == 0)
                  continue;
            if (strncmp(names[i], "log", 3) == 0)
                  continue;
            /*
             * We found a file to process.  Check if we need
             * to allocate more space.
             */
            if ((ret = __rep_get_fileinfo(dbenv, names[i], &tmpfp, uid,
                filecntp)) != 0) {
                  /*
                   * If we find a file that isn't a database, skip it.
                   */
                  RPRINT(dbenv, rep, (dbenv, &mb,
                      "Walk_dir: File %d %s: returned error %s",
                      i, names[i], db_strerror(ret)));
                  ret = 0;
                  continue;
            }
            RPRINT(dbenv, rep, (dbenv, &mb,
                "Walk_dir: File %d (of %d) %s: pgsize %lu, max_pgno %lu",
                tmpfp.filenum, i, names[i],
                (u_long)tmpfp.pgsize, (u_long)tmpfp.max_pgno));
            namedbt.data = names[i];
            namedbt.size = (u_int32_t)strlen(names[i]) + 1;
            uiddbt.data = uid;
            uiddbt.size = DB_FILE_ID_LEN;
retry:
            ret = __rep_fileinfo_buf(rfp, *fileszp, &len,
                tmpfp.pgsize, tmpfp.pgno, tmpfp.max_pgno,
                tmpfp.filenum, tmpfp.id, tmpfp.type,
                tmpfp.flags, &uiddbt, &namedbt);
            if (ret == ENOMEM) {
                  offset = (size_t)(rfp - fp);
                  *fileszp *= 2;
                  /*
                   * Need to account for update info on both sides
                   * of the allocation.
                   */
                  fp -= sizeof(__rep_update_args);
                  if ((ret = __os_realloc(dbenv, *fileszp, fp)) != 0)
                        break;
                  fp += sizeof(__rep_update_args);
                  rfp = fp + offset;
                  /*
                   * Now that we've reallocated the space, try to
                   * store it again.
                   */
                  goto retry;
            }
            rfp += len;
            *filelenp += len;
      }
      __os_dirfree(dbenv, names, cnt);
      return (ret);
}

static int
__rep_get_fileinfo(dbenv, file, rfp, uid, filecntp)
      DB_ENV *dbenv;
      const char *file;
      __rep_fileinfo_args *rfp;
      u_int8_t *uid;
      int *filecntp;
{

      DB *dbp, *entdbp;
      DB_LOCK lk;
      DB_LOG *dblp;
      DB_MPOOLFILE *mpf;
      DBC *dbc;
      DBMETA *dbmeta;
      PAGE *pagep;
      int i, ret, t_ret;

      dbp = NULL;
      dbc = NULL;
      pagep = NULL;
      mpf = NULL;
      LOCK_INIT(lk);

      dblp = dbenv->lg_handle;
      if ((ret = db_create(&dbp, dbenv, 0)) != 0)
            goto err;
      if ((ret = __db_open(dbp, NULL, file, NULL, DB_UNKNOWN,
          DB_RDONLY | (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0),
          0, PGNO_BASE_MD)) != 0)
            goto err;

      if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
            goto err;
      if ((ret = __db_lget(
          dbc, 0, dbp->meta_pgno, DB_LOCK_READ, 0, &lk)) != 0)
            goto err;
      if ((ret = __memp_fget(dbp->mpf, &dbp->meta_pgno, 0, &pagep)) != 0)
            goto err;
      /*
       * We have the meta page.  Set up our information.
       */
      dbmeta = (DBMETA *)pagep;
      rfp->pgno = 0;
      /*
       * Queue is a special-case.  We need to set max_pgno to 0 so that
       * the client can compute the pages from the meta-data.
       */
      if (dbp->type == DB_QUEUE)
            rfp->max_pgno = 0;
      else
            rfp->max_pgno = dbmeta->last_pgno;
      rfp->pgsize = dbp->pgsize;
      memcpy(uid, dbp->fileid, DB_FILE_ID_LEN);
      rfp->filenum = (*filecntp)++;
      rfp->type = dbp->type;
      rfp->flags = dbp->flags;
      rfp->id = DB_LOGFILEID_INVALID;
      ret = __memp_fput(dbp->mpf, pagep, 0);
      pagep = NULL;
      if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
            ret = t_ret;
      if (ret != 0)
            goto err;
err:
      if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
            ret = t_ret;
      if (dbc != NULL && (t_ret = __db_c_close(dbc)) != 0 && ret == 0)
            ret = t_ret;
      if (pagep != NULL &&
          (t_ret = __memp_fput(mpf, pagep, 0)) != 0 && ret == 0)
            ret = t_ret;
      if (dbp != NULL && (t_ret = __db_close(dbp, NULL, 0)) != 0 && ret == 0)
            ret = t_ret;
      /*
       * We walk the entry table now, after closing the dbp because
       * otherwise we find the open from this function and the id
       * is useless in that case.
       */
      if (ret == 0) {
            MUTEX_THREAD_LOCK(dbenv, dblp->mutexp);
            /*
             * Walk entry table looking for this uid.
             * If we find it, save the id.
             */
            for (i = 0; i < dblp->dbentry_cnt; i++) {
                  entdbp = dblp->dbentry[i].dbp;
                  if (entdbp == NULL)
                        break;
                  DB_ASSERT(entdbp->log_filename != NULL);
                  if (memcmp(uid,
                      entdbp->log_filename->ufid,
                      DB_FILE_ID_LEN) == 0)
                        rfp->id = i;
            }
            MUTEX_THREAD_UNLOCK(dbenv, dblp->mutexp);
      }
      return (ret);
}

/*
 * __rep_files_inmem -
 *    Gather all the information about in-memory files.
 */
static int
__rep_files_inmem(dbenv, fp, fileszp, filelenp, filecntp)
      DB_ENV *dbenv;
      u_int8_t *fp;
      size_t *fileszp, *filelenp;
      int *filecntp;
{

      int ret;

      COMPQUIET(dbenv, NULL);
      COMPQUIET(fp, NULL);
      COMPQUIET(fileszp, NULL);
      COMPQUIET(filelenp, NULL);
      COMPQUIET(filecntp, NULL);
      ret = 0;
      return (ret);
}

/*
 * __rep_page_req
 *    Process a page_req and send the page information to the client.
 *
 * PUBLIC: int __rep_page_req __P((DB_ENV *, int, DBT *));
 */
int
__rep_page_req(dbenv, eid, rec)
      DB_ENV *dbenv;
      int eid;
      DBT *rec;
{
      DB *dbp;
      DBT msgdbt;
      DB_LOG *dblp;
      DB_MPOOLFILE *mpf;
      __rep_fileinfo_args *msgfp;
      int ret, t_ret;
      void *next;
#ifdef DIAGNOSTIC
      DB_MSGBUF mb;
      DB_REP *db_rep;
      REP *rep;

      db_rep = dbenv->rep_handle;
      rep = db_rep->region;
#endif
      dblp = dbenv->lg_handle;
      if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0)
            return (ret);
      /*
       * See if we can find it already.  If so we can quickly
       * access its mpool and process.  Otherwise we have to
       * open the file ourselves.
       */
      RPRINT(dbenv, rep, (dbenv, &mb, "page_req: file %d page %lu to %lu",
          msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno));
      MUTEX_THREAD_LOCK(dbenv, dblp->mutexp);
      if (msgfp->id >= 0 && dblp->dbentry_cnt > msgfp->id) {
            dbp = dblp->dbentry[msgfp->id].dbp;
            if (dbp != NULL) {
                  DB_ASSERT(dbp->log_filename != NULL);
                  if (memcmp(msgfp->uid.data, dbp->log_filename->ufid,
                      DB_FILE_ID_LEN) == 0) {
                        MUTEX_THREAD_UNLOCK(dbenv, dblp->mutexp);
                        RPRINT(dbenv, rep, (dbenv, &mb,
                            "page_req: found %d in dbreg",
                            msgfp->filenum));
                        ret = __rep_page_sendpages(dbenv, eid,
                            msgfp, dbp->mpf, dbp);
                        goto err;
                  }
            }
      }
      MUTEX_THREAD_UNLOCK(dbenv, dblp->mutexp);

      /*
       * If we get here, we do not have the file open via dbreg.
       * We need to open the file and then send its pages.
       * If we cannot open the file, we send REP_FILE_FAIL.
       */
      RPRINT(dbenv, rep, (dbenv, &mb, "page_req: Open %d via mpf_open",
          msgfp->filenum));
      if ((ret = __rep_mpf_open(dbenv, &mpf, msgfp)) != 0) {
            memset(&msgdbt, 0, sizeof(msgdbt));
            msgdbt.data = msgfp;
            msgdbt.size = sizeof(*msgfp);
            RPRINT(dbenv, rep, (dbenv, &mb, "page_req: Open %d failed",
                msgfp->filenum));
            (void)__rep_send_message(dbenv, eid, REP_FILE_FAIL,
                NULL, &msgdbt, 0);
            goto err;
      }

      ret = __rep_page_sendpages(dbenv, eid, msgfp, mpf, NULL);
      t_ret = __memp_fclose(mpf, 0);
      if (ret == 0 && t_ret != 0)
            ret = t_ret;
err:
      __os_free(dbenv, msgfp);
      return (ret);
}

static int
__rep_page_sendpages(dbenv, eid, msgfp, mpf, dbp)
      DB_ENV *dbenv;
      int eid;
      __rep_fileinfo_args *msgfp;
      DB_MPOOLFILE *mpf;
      DB *dbp;
{
      DB *qdbp;
      DBT msgdbt, pgdbt;
      DB_LOG *dblp;
      DB_LSN lsn;
      DB_MSGBUF mb;
      DB_REP *db_rep;
      PAGE *pagep;
      REP *rep;
      db_pgno_t p;
      size_t len, msgsz;
      u_int32_t bytes, gbytes, type;
      int check_limit, opened, ret, t_ret;
      u_int8_t *buf;

#ifndef DIAGNOSTIC
      DB_MSGBUF_INIT(&mb);
#endif
      db_rep = dbenv->rep_handle;
      rep = db_rep->region;
      opened = 0;
      gbytes = bytes = 0;
      MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
      gbytes = rep->gbytes;
      bytes = rep->bytes;
      MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
      check_limit = gbytes != 0 || bytes != 0;
      qdbp = NULL;
      buf = NULL;
      if (msgfp->type == DB_QUEUE) {
            if (dbp == NULL) {
                  if ((ret = db_create(&qdbp, dbenv, 0)) != 0)
                        goto err;
                  if ((ret = __db_open(qdbp, NULL, msgfp->info.data,
                      NULL, DB_UNKNOWN,
                      DB_RDONLY | (F_ISSET(dbenv, DB_ENV_THREAD) ?
                      DB_THREAD : 0), 0, PGNO_BASE_MD)) != 0)
                        goto err;
                  opened = 1;
            } else
                  qdbp = dbp;
      }
      msgsz = sizeof(__rep_fileinfo_args) + DB_FILE_ID_LEN + msgfp->pgsize;
      if ((ret = __os_calloc(dbenv, 1, msgsz, &buf)) != 0)
            return (ret);
      memset(&msgdbt, 0, sizeof(msgdbt));
      memset(&pgdbt, 0, sizeof(pgdbt));
      RPRINT(dbenv, rep, (dbenv, &mb, "sendpages: file %d page %lu to %lu",
          msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno));
      for (p = msgfp->pgno; p <= msgfp->max_pgno; p++) {
            if (msgfp->type == DB_QUEUE && p != 0)
#ifdef HAVE_QUEUE
                  ret = __qam_fget(qdbp, &p, DB_MPOOL_CREATE, &pagep);
#else
                  ret = DB_PAGE_NOTFOUND;
#endif
            else
                  ret = __memp_fget(mpf, &p, DB_MPOOL_CREATE, &pagep);
            type = REP_PAGE;
            if (ret == DB_PAGE_NOTFOUND) {
                  memset(&pgdbt, 0, sizeof(pgdbt));
                  ret = 0;
                  ZERO_LSN(lsn);
                  RPRINT(dbenv, rep, (dbenv, &mb,
                      "sendpages: PAGE_FAIL on page %lu", (u_long)p));
                  type = REP_PAGE_FAIL;
                  msgfp->pgno = p;
                  goto send;
            } else if (ret != 0)
                  goto err;
            else {
                  pgdbt.data = pagep;
                  pgdbt.size = (u_int32_t)msgfp->pgsize;
            }
            len = 0;
            ret = __rep_fileinfo_buf(buf, msgsz, &len,
                msgfp->pgsize, p, msgfp->max_pgno,
                msgfp->filenum, msgfp->id, msgfp->type,
                msgfp->flags, &msgfp->uid, &pgdbt);
            if (ret != 0)
                  goto err;

            DB_ASSERT(len <= msgsz);
            msgdbt.data = buf;
            msgdbt.size = (u_int32_t)len;

            dblp = dbenv->lg_handle;
            R_LOCK(dbenv, &dblp->reginfo);
            lsn = ((LOG *)dblp->reginfo.primary)->lsn;
            R_UNLOCK(dbenv, &dblp->reginfo);
            if (check_limit) {
                  /*
                   * msgdbt.size is only the size of the page and
                   * other information we're sending.  It doesn't
                   * count the size of the control structure.  Factor
                   * that in as well so we're not off by a lot if
                   * pages are small.
                   */
                  while (bytes < msgdbt.size + sizeof(REP_CONTROL)) {
                        if (gbytes > 0) {
                              bytes += GIGABYTE;
                              --gbytes;
                              continue;
                        }
                        /*
                         * We don't hold the rep mutex, and may
                         * miscount.
                         */
                        rep->stat.st_nthrottles++;
                        type = REP_PAGE_MORE;
                        goto send;
                  }
                  bytes -= (msgdbt.size + sizeof(REP_CONTROL));
            }
send:
            RPRINT(dbenv, rep, (dbenv, &mb,
                "sendpages: %s %lu, lsn [%lu][%lu]",
                (type == REP_PAGE ? "PAGE" :
                (type == REP_PAGE_MORE ? "PAGE_MORE" : "PAGE_FAIL")),
                (u_long)p, (u_long)lsn.file, (u_long)lsn.offset));
            (void)__rep_send_message(dbenv, eid, type, &lsn, &msgdbt, 0);
            /*
             * If we have REP_PAGE_FAIL we need to break before trying
             * to give the page back to mpool.  If we have REP_PAGE_MORE
             * we need to break this loop after giving the page back
             * to mpool.  Otherwise, with REP_PAGE, we keep going.
             */
            if (type == REP_PAGE_FAIL)
                  break;
            if (msgfp->type != DB_QUEUE || p == 0)
                  ret = __memp_fput(mpf, pagep, 0);
#ifdef HAVE_QUEUE
            else
                  /*
                   * We don't need an #else for HAVE_QUEUE here because if
                   * we're not compiled with queue, then we're guaranteed
                   * to have set REP_PAGE_FAIL above.
                   */
                  ret = __qam_fput(qdbp, p, pagep, 0);
#endif
            if (type == REP_PAGE_MORE)
                  break;
      }
err:
      if (opened && (t_ret = __db_close(qdbp, NULL, DB_NOSYNC)) != 0 &&
          ret == 0)
            ret = t_ret;
      if (buf != NULL)
            __os_free(dbenv, buf);
      return (ret);
}

/*
 * __rep_update_setup
 *    Process and setup with this file information.
 *
 * PUBLIC: int __rep_update_setup __P((DB_ENV *, int, REP_CONTROL *, DBT *));
 */
int
__rep_update_setup(dbenv, eid, rp, rec)
      DB_ENV *dbenv;
      int eid;
      REP_CONTROL *rp;
      DBT *rec;
{
      DB_LOG *dblp;
      DB_LSN lsn;
      DB_REP *db_rep;
      DBT pagereq_dbt;
      LOG *lp;
      REGENV *renv;
      REGINFO *infop;
      REP *rep;
      __rep_update_args *rup;
      int ret;
      u_int32_t count, infolen;
      void *next;
#ifdef DIAGNOSTIC
      __rep_fileinfo_args *msgfp;
      DB_MSGBUF mb;
#endif

      db_rep = dbenv->rep_handle;
      rep = db_rep->region;
      dblp = dbenv->lg_handle;
      lp = dblp->reginfo.primary;
      ret = 0;

      MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
      if (!F_ISSET(rep, REP_F_RECOVER_UPDATE)) {
            MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
            return (0);
      }
      F_CLR(rep, REP_F_RECOVER_UPDATE);
      /*
       * We know we're the first to come in here due to the
       * REP_F_RECOVER_UPDATE flag.  REP_F_READY should not be set.
       */
      DB_ASSERT(!F_ISSET(rep, REP_F_READY));
      F_SET(rep, REP_F_RECOVER_PAGE);
      /*
       * We do not clear REP_F_READY or rep->in_recovery in this code.
       * We'll eventually call the normal __rep_verify_match recovery
       * code and that will clear all the flags and allow others to
       * proceed.
       */
      __rep_lockout(dbenv, db_rep, rep, 1);
      /*
       * We need to update the timestamp and kill any open handles
       * on this client.  The files are changing completely.
       */
      infop = dbenv->reginfo;
      renv = infop->primary;
      (void)time(&renv->rep_timestamp);

      MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
      MUTEX_LOCK(dbenv, db_rep->db_mutexp);
      lp->wait_recs = rep->request_gap;
      lp->rcvd_recs = 0;
      ZERO_LSN(lp->ready_lsn);
      ZERO_LSN(lp->waiting_lsn);
      ZERO_LSN(lp->max_wait_lsn);
      ZERO_LSN(lp->max_perm_lsn);
      MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
      if ((ret = __rep_update_read(dbenv, rec->data, &next, &rup)) != 0)
            goto err;
      R_LOCK(dbenv, &dblp->reginfo);
      lsn = lp->lsn;
      R_UNLOCK(dbenv, &dblp->reginfo);

      /*
       * We need to empty out any old log records that might be in the
       * temp database.
       */
      if ((ret = __db_truncate(db_rep->rep_db, NULL, &count)) != 0)
            goto err;

      /*
       * If our log is before the master's beginning of log,
       * we need to request from the master's beginning.
       * If we have some log, we need the earlier of the
       * master's last checkpoint LSN or our current LSN.
       */
      MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
      if (log_compare(&lsn, &rup->first_lsn) < 0)
            rep->first_lsn = rup->first_lsn;
      else
            rep->first_lsn = lsn;
      rep->last_lsn = rp->lsn;
      rep->nfiles = rup->num_files;
      rep->curfile = 0;
      rep->ready_pg = 0;
      rep->npages = 0;
      rep->waiting_pg = PGNO_INVALID;
      rep->max_wait_pg = PGNO_INVALID;

      __os_free(dbenv, rup);

      RPRINT(dbenv, rep, (dbenv, &mb,
          "Update setup for %d files.", rep->nfiles));
      RPRINT(dbenv, rep, (dbenv, &mb, "Update setup:  First LSN [%lu][%lu].",
          (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset));
      RPRINT(dbenv, rep, (dbenv, &mb, "Update setup:  Last LSN [%lu][%lu]",
          (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset));

      infolen = rec->size - sizeof(__rep_update_args);
      if ((ret = __os_calloc(dbenv, 1, infolen, &rep->originfo)) != 0)
            goto err;
      memcpy(rep->originfo, next, infolen);
      rep->finfo = rep->originfo;
      if ((ret = __rep_fileinfo_read(dbenv,
          rep->finfo, &next, &rep->curinfo)) != 0) {
            RPRINT(dbenv, rep, (dbenv, &mb,
                "Update setup: Fileinfo read: %s", db_strerror(ret)));
            goto errmem1;
      }
      rep->nextinfo = next;

#ifdef DIAGNOSTIC
      msgfp = rep->curinfo;
      DB_ASSERT(msgfp->pgno == 0);
#endif

      /*
       * We want to create/open our dbp to the database
       * where we'll keep our page information.
       */
      if ((ret = __rep_client_dbinit(dbenv, 1, REP_PG)) != 0) {
            RPRINT(dbenv, rep, (dbenv, &mb,
                "Update setup: Client_dbinit %s", db_strerror(ret)));
            goto errmem;
      }

      /*
       * We should get file info 'ready to go' to avoid data copies.
       */
      memset(&pagereq_dbt, 0, sizeof(pagereq_dbt));
      pagereq_dbt.data = rep->finfo;
      pagereq_dbt.size = (u_int32_t)((u_int8_t *)rep->nextinfo -
          (u_int8_t *)rep->finfo);

      RPRINT(dbenv, rep, (dbenv, &mb,
          "Update PAGE_REQ file 0: pgsize %lu, maxpg %lu",
          (u_long)rep->curinfo->pgsize,
          (u_long)rep->curinfo->max_pgno));
      /*
       * We set up pagereq_dbt as we went along.  Send it now.
       */
      (void)__rep_send_message(dbenv, eid, REP_PAGE_REQ,
          NULL, &pagereq_dbt, 0);
      if (0) {
errmem:           __os_free(dbenv, rep->curinfo);
errmem1:    __os_free(dbenv, rep->originfo);
            rep->finfo = NULL;
            rep->curinfo = NULL;
            rep->originfo = NULL;
      }
err:
      /*
       * If we get an error, we cannot leave ourselves in the
       * RECOVER_PAGE state because we have no file information.
       * That also means undo'ing the rep_lockout.
       * We need to move back to the RECOVER_UPDATE stage.
       */
      if (ret != 0) {
            RPRINT(dbenv, rep, (dbenv, &mb,
                "Update_setup: Error: Clear PAGE, set UPDATE again. %s",
                db_strerror(ret)));
            F_CLR(rep, REP_F_RECOVER_PAGE | REP_F_READY);
            rep->in_recovery = 0;
            F_SET(rep, REP_F_RECOVER_UPDATE);
      }
      MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
      return (ret);
}

/*
 * __rep_page
 *    Process a page message.
 *
 * PUBLIC: int __rep_page __P((DB_ENV *, int, REP_CONTROL *, DBT *));
 */
int
__rep_page(dbenv, eid, rp, rec)
      DB_ENV *dbenv;
      int eid;
      REP_CONTROL *rp;
      DBT *rec;
{

      DB_REP *db_rep;
      DBT key, data;
      REP *rep;
      __rep_fileinfo_args *msgfp;
      db_recno_t recno;
      int ret;
      void *next;
#ifdef DIAGNOSTIC
      DB_MSGBUF mb;
#endif

      ret = 0;
      db_rep = dbenv->rep_handle;
      rep = db_rep->region;

      MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
      if (!F_ISSET(rep, REP_F_RECOVER_PAGE)) {
            MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
            return (0);
      }
      if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0) {
            MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
            return (ret);
      }
      RPRINT(dbenv, rep, (dbenv, &mb,
          "PAGE: Received page %lu from file %d",
          (u_long)msgfp->pgno, msgfp->filenum));
      /*
       * Check if this page is from the file we're expecting.
       * This may be an old or delayed page message.
       */
      /*
       * !!!
       * If we allow dbrename/dbremove on the master while a client
       * is updating, then we'd have to verify the file's uid here too.
       */
      if (msgfp->filenum != rep->curfile) {
            RPRINT(dbenv, rep,
                (dbenv, &mb, "Msg file %d != curfile %d",
                msgfp->filenum, rep->curfile));
            goto err;
      }
      /*
       * We want to create/open our dbp to the database
       * where we'll keep our page information.
       */
      if ((ret = __rep_client_dbinit(dbenv, 1, REP_PG)) != 0)
            goto err;

      MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
      memset(&key, 0, sizeof(key));
      memset(&data, 0, sizeof(data));
      recno = (db_recno_t)(msgfp->pgno + 1);
      key.data = &recno;
      key.ulen = key.size = sizeof(db_recno_t);
      key.flags = DB_DBT_USERMEM;

      /*
       * If we already have this page, then we don't want to bother
       * rewriting it into the file.  Otherwise, any other error
       * we want to return.
       */
      ret = __db_put(rep->file_dbp, NULL, &key, &data, DB_NOOVERWRITE);
      if (ret == DB_KEYEXIST) {
            RPRINT(dbenv, rep, (dbenv, &mb,
                "PAGE: Received duplicate page %lu from file %d",
                (u_long)msgfp->pgno, msgfp->filenum));
            rep->stat.st_pg_duplicated++;
            ret = 0;
            goto err_nolock;
      }
      if (ret != 0)
            goto err_nolock;

      RPRINT(dbenv, rep, (dbenv, &mb,
          "PAGE: Write page %lu into mpool", (u_long)msgfp->pgno));
      MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
      /*
       * We put the page in the database file itself.
       */
      ret = __rep_write_page(dbenv, rep, msgfp);
      if (ret != 0) {
            /*
             * We got an error storing the page, therefore, we need
             * remove this page marker from the page database too.
             * !!!
             * I'm ignoring errors from the delete because we want to
             * return the original error.  If we cannot write the page
             * and we cannot delete the item we just put, what should
             * we do?  Panic the env and return DB_RUNRECOVERY?
             */
            (void)__db_del(rep->file_dbp, NULL, &key, 0);
            goto err;
      }
      rep->stat.st_pg_records++;
      rep->npages++;

      /*
       * Now check the LSN on the page and save it if it is later
       * than the one we have.
       */
      if (log_compare(&rp->lsn, &rep->last_lsn) > 0)
            rep->last_lsn = rp->lsn;

      /*
       * We've successfully written the page.  Now we need to see if
       * we're done with this file.  __rep_filedone will check if we
       * have all the pages expected and if so, set up for the next
       * file and send out a page request for the next file's pages.
       */
      ret = __rep_filedone(dbenv, eid, rep, msgfp, rp->rectype);

err:
      MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
err_nolock:
      __os_free(dbenv, msgfp);
      return (ret);
}

/*
 * __rep_page_fail
 *    Process a page fail message.
 *
 * PUBLIC: int __rep_page_fail __P((DB_ENV *, int, DBT *));
 */
int
__rep_page_fail(dbenv, eid, rec)
      DB_ENV *dbenv;
      int eid;
      DBT *rec;
{

      DB_REP *db_rep;
      REP *rep;
      __rep_fileinfo_args *msgfp, *rfp;
      int ret;
      void *next;
#ifdef DIAGNOSTIC
      DB_MSGBUF mb;
#endif

      ret = 0;
      db_rep = dbenv->rep_handle;
      rep = db_rep->region;

      MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
      if (!F_ISSET(rep, REP_F_RECOVER_PAGE)) {
            MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
            return (0);
      }
      if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0) {
            MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
            return (ret);
      }
      /*
       * Check if this page is from the file we're expecting.
       * This may be an old or delayed page message.
       */
      /*
       * !!!
       * If we allow dbrename/dbremove on the master while a client
       * is updating, then we'd have to verify the file's uid here too.
       */
      if (msgfp->filenum != rep->curfile) {
            RPRINT(dbenv, rep, (dbenv, &mb, "Msg file %d != curfile %d",
                msgfp->filenum, rep->curfile));
            MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
            return (0);
      }
      rfp = rep->curinfo;
      if (rfp->type != DB_QUEUE)
            --rfp->max_pgno;
      else {
            /*
             * Queue is special.  Pages at the beginning of the queue
             * may disappear, as well as at the end.  Use msgfp->pgno
             * to adjust accordingly.
             */
            RPRINT(dbenv, rep, (dbenv, &mb,
          "page_fail: BEFORE page %lu failed. ready %lu, max %lu, npages %d",
                (u_long)msgfp->pgno, (u_long)rep->ready_pg,
                (u_long)rfp->max_pgno, rep->npages));
            if (msgfp->pgno == rfp->max_pgno)
                  --rfp->max_pgno;
            if (msgfp->pgno >= rep->ready_pg) {
                  rep->ready_pg = msgfp->pgno + 1;
                  rep->npages = rep->ready_pg;
            }
            RPRINT(dbenv, rep, (dbenv, &mb,
          "page_fail: AFTER page %lu failed. ready %lu, max %lu, npages %d",
                (u_long)msgfp->pgno, (u_long)rep->ready_pg,
                (u_long)rfp->max_pgno, rep->npages));
      }
      /*
       * We've lowered the number of pages expected.  It is possible that
       * this was the last page we were expecting.  Now we need to see if
       * we're done with this file.  __rep_filedone will check if we have
       * all the pages expected and if so, set up for the next file and
       * send out a page request for the next file's pages.
       */
      ret = __rep_filedone(dbenv, eid, rep, msgfp, REP_PAGE_FAIL);
      MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
      return (ret);
}

/*
 * __rep_write_page -
 *    Write this page into a database.
 */
static int
__rep_write_page(dbenv, rep, msgfp)
      DB_ENV *dbenv;
      REP *rep;
      __rep_fileinfo_args *msgfp;
{
      __rep_fileinfo_args *rfp;
      DB_FH *rfh;
      int ret;
      void *dst;
      char *real_name;

      real_name = NULL;

      /*
       * If this is the first page we're putting in this database, we need
       * to create the mpool file.  Otherwise call memp_fget to create the
       * page in mpool.  Then copy the data to the page, and memp_fput the
       * page to give it back to mpool.
       *
       * We need to create the file, removing any existing file and associate
       * the correct file ID with the new one.
       */
      if (rep->file_mpf == NULL) {
            rfp = rep->curinfo;

            if (!F_ISSET(rfp, DB_AM_INMEM)) {
                  if ((ret = __db_appname(dbenv, DB_APP_DATA,
                      rfp->info.data, 0, NULL, &real_name)) != 0)
                        goto err;
                  /*
                   * Calling memp_nameop will both purge any matching
                   * fileid from mpool and unlink it on disk.
                   */
                  if ((ret = __memp_nameop(dbenv,
                      rfp->uid.data, NULL, real_name, NULL)) != 0)
                        goto err;
                  /*
                   * Create the file on disk.  We'll be putting the data
                   * into the file via mpool.
                   */
                  if ((ret = __os_open(dbenv, real_name,
                      DB_OSO_CREATE, dbenv->db_mode, &rfh)) == 0)
                        ret = __os_closehandle(dbenv, rfh);
                  if (ret != 0)
                        goto err;
            }

            if ((ret =
                __rep_mpf_open(dbenv, &rep->file_mpf, rep->curinfo)) != 0)
                  goto err;
      }
      /*
       * Handle queue specially.  If we're a QUEUE database, we need to
       * use the __qam_fget/put calls.  We need to use rep->queue_dbp for
       * that.  That dbp is opened after getting the metapage for the
       * queue database.  Since the meta-page is always in the queue file,
       * we'll use the normal path for that first page.  After that we
       * can assume the dbp is opened.
       */
      if (msgfp->type == DB_QUEUE && msgfp->pgno != 0) {
#ifdef HAVE_QUEUE
            if ((ret = __qam_fget(
                rep->queue_dbp, &msgfp->pgno, DB_MPOOL_CREATE, &dst)) != 0)
#else
            if ((ret = __db_no_queue_am(dbenv)) != 0)
#endif
                  goto err;
      } else if ((ret = __memp_fget(
                rep->file_mpf, &msgfp->pgno, DB_MPOOL_CREATE, &dst)) != 0)
                  goto err;

      memcpy(dst, msgfp->info.data, msgfp->pgsize);
      if (msgfp->type != DB_QUEUE || msgfp->pgno == 0)
            ret = __memp_fput(rep->file_mpf, dst, DB_MPOOL_DIRTY);
#ifdef HAVE_QUEUE
      else
            ret = __qam_fput(rep->queue_dbp, msgfp->pgno, dst,
                DB_MPOOL_DIRTY);
#endif

err:  if (real_name != NULL)
             __os_free(dbenv, real_name);
      return (ret);
}

/*
 * __rep_page_gap -
 *    After we've put the page into the database, we need to check if
 *    we have a page gap and whether we need to request pages.
 */
static int
__rep_page_gap(dbenv, rep, msgfp, type)
      DB_ENV *dbenv;
      REP *rep;
      __rep_fileinfo_args *msgfp;
      u_int32_t type;
{
      DB_LOG *dblp;
      DB_REP *db_rep;
      DBT data, key;
      LOG *lp;
      __rep_fileinfo_args *rfp;
      db_recno_t recno;
      int ret;
#ifdef DIAGNOSTIC
      DB_MSGBUF mb;
#endif

      db_rep = dbenv->rep_handle;
      ret = 0;
      dblp = dbenv->lg_handle;
      lp = dblp->reginfo.primary;

      /*
       * We've successfully put this page into our file.
       * Now we need to account for it and re-request new pages
       * if necessary.
       */
      /*
       * We already hold the rep mutex, but we also need the db mutex.
       * So we need to drop it, acquire both in the right order and
       * then recheck the state of the world.
       */
      MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
      MUTEX_LOCK(dbenv, db_rep->db_mutexp);
      MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
      rfp = rep->curinfo;

      /*
       * Make sure we're still talking about the same file.
       * If not, we're done here.
       */
      if (rfp->filenum != msgfp->filenum) {
            ret = DB_REP_PAGEDONE;
            goto err;
      }

      /*
       * We have 3 possible states:
       * 1.  We receive a page we already have.
       *    msg pgno < ready pgno
       * 2.  We receive a page that is beyond a gap.
       *    msg pgno > ready pgno
       * 3.  We receive the page we're expecting.
       *    msg pgno == ready pgno
       */
      /*
       * State 1.  This should not happen because this function
       * should only be called once per page received because we
       * check for DB_KEY_EXIST when we save the page information.
       */
      DB_ASSERT(msgfp->pgno >= rep->ready_pg);

      /*
       * State 2.  This page is beyond the page we're expecting.
       * We need to update waiting_pg if this page is less than
       * (earlier) the current waiting_pg.  There is nothing
       * to do but see if we need to request.
       */
      RPRINT(dbenv, rep, (dbenv, &mb,
    "PAGE_GAP: pgno %lu, max_pg %lu ready %lu, waiting %lu max_wait %lu",
          (u_long)msgfp->pgno, (u_long)rfp->max_pgno, (u_long)rep->ready_pg,
          (u_long)rep->waiting_pg, (u_long)rep->max_wait_pg));
      if (msgfp->pgno > rep->ready_pg) {
            if (rep->waiting_pg == PGNO_INVALID ||
                msgfp->pgno < rep->waiting_pg)
                  rep->waiting_pg = msgfp->pgno;
      } else {
            /*
             * We received the page we're expecting.
             */
            rep->ready_pg++;
            lp->rcvd_recs = 0;
            while (ret == 0 && rep->ready_pg == rep->waiting_pg) {
                  /*
                   * If we get here we know we just filled a gap.
                   */
                  lp->wait_recs = 0;
                  lp->rcvd_recs = 0;
                  rep->max_wait_pg = PGNO_INVALID;
                  /*
                   * We need to walk the recno database looking for the
                   * next page we need or expect.
                   */
                  memset(&key, 0, sizeof(key));
                  memset(&data, 0, sizeof(data));
                  recno = (db_recno_t)rep->ready_pg;
                  key.data = &recno;
                  key.ulen = key.size = sizeof(db_recno_t);
                  key.flags = DB_DBT_USERMEM;
                  ret = __db_get(rep->file_dbp, NULL, &key, &data, 0);
                  if (ret == DB_NOTFOUND || ret == DB_KEYEMPTY)
                        break;
                  else if (ret != 0)
                        goto err;
                  rep->ready_pg++;
            }
      }

      /*
       * If we filled a gap and now have the entire file, there's
       * nothing to do.  We're done when ready_pg is > max_pgno
       * because ready_pg is larger than the last page we received.
       */
      if (rep->ready_pg > rfp->max_pgno)
            goto err;

      /*
       * Check if we need to ask for more pages.
       */
      if ((rep->waiting_pg != PGNO_INVALID &&
          rep->ready_pg != rep->waiting_pg) || type == REP_PAGE_MORE) {
            /*
             * We got a page but we may still be waiting for more.
             */
            if (lp->wait_recs == 0) {
                  /*
                   * This is a new gap. Initialize the number of
                   * records that we should wait before requesting
                   * that it be resent.  We grab the limits out of
                   * the rep without the mutex.
                   */
                  lp->wait_recs = rep->request_gap;
                  lp->rcvd_recs = 0;
                  rep->max_wait_pg = PGNO_INVALID;
            }
            /*
             * If we got REP_PAGE_MORE we always want to ask for more.
             */
            if ((__rep_check_doreq(dbenv, rep) || type == REP_PAGE_MORE) &&
                ((ret = __rep_pggap_req(dbenv, rep, rfp,
                type == REP_PAGE_MORE)) != 0))
                  goto err;
      } else {
            lp->wait_recs = 0;
            rep->max_wait_pg = PGNO_INVALID;
      }

err:
      MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
      return (ret);
}

/*
 * __rep_filedone -
 *    We need to check if we're done with the current file after
 *    processing the current page.  Stat the database to see if
 *    we have all the pages.  If so, we need to clean up/close
 *    this one, set up for the next one, and ask for its pages,
 *    or if this is the last file, request the log records and
 *    move to the REP_RECOVER_LOG state.
 */
static int
__rep_filedone(dbenv, eid, rep, msgfp, type)
      DB_ENV *dbenv;
      int eid;
      REP *rep;
      __rep_fileinfo_args *msgfp;
      u_int32_t type;
{
      DBT dbt;
      DB_REP *db_rep;
      __rep_fileinfo_args *rfp;
      int ret;
#ifdef DIAGNOSTIC
      DB_MSGBUF mb;
#endif

      db_rep = dbenv->rep_handle;
      /*
       * We've put our page, now we need to do any gap processing
       * that might be needed to re-request pages.
       */
      ret = __rep_page_gap(dbenv, rep, msgfp, type);
      /*
       * The world changed while we were doing gap processing.
       * We're done here.
       */
      if (ret == DB_REP_PAGEDONE)
            return (0);

      rfp = rep->curinfo;
      /*
       * max_pgno is 0-based and npages is 1-based, so we don't have
       * all the pages until npages is > max_pgno.
       */
      RPRINT(dbenv, rep, (dbenv, &mb, "FILEDONE: have %lu pages. Need %lu.",
          (u_long)rep->npages, (u_long)rfp->max_pgno + 1));
      if (rep->npages <= rfp->max_pgno)
            return (0);

      /*
       * If we're queue and we think we have all the pages for this file,
       * we need to do special queue processing.  Queue is handled in
       * several stages.
       */
      if (rfp->type == DB_QUEUE &&
          ((ret = __rep_queue_filedone(dbenv, rep, rfp)) !=
          DB_REP_PAGEDONE))
            return (ret);
      /*
       * We have all the pages for this file.  We need to:
       * 1.  Close up the file data pointer we used.
       * 2.  Close/reset the page database.
       * 3.  Check if we have all file data.  If so, request logs.
       * 4.  If not, set curfile to next file and request its pages.
       */
      /*
       * 1.  Close up the file data pointer we used.
       */
      if (rep->file_mpf != NULL) {
            ret = __memp_fclose(rep->file_mpf, 0);
            rep->file_mpf = NULL;
            if (ret != 0)
                  goto err;
      }

      /*
       * 2.  Close/reset the page database.
       */
      ret = __db_close(rep->file_dbp, NULL, DB_NOSYNC);
      rep->file_dbp = NULL;
      if (ret != 0)
            goto err;

      /*
       * 3.  Check if we have all file data.  If so, request logs.
       */
      __os_free(dbenv, rep->curinfo);
      if (++rep->curfile == rep->nfiles) {
            RPRINT(dbenv, rep, (dbenv, &mb,
                "FILEDONE: have %d files.  RECOVER_LOG now", rep->nfiles));
            /*
             * Move to REP_RECOVER_LOG state.
             * Request logs.
             */
            __os_free(dbenv, rep->originfo);
            /*
             * We need to do a sync here so that any later opens
             * can find the file and file id.  We need to do it
             * before we clear REP_F_RECOVER_PAGE so that we do not
             * try to flush the log.
             */
            if ((ret = __memp_sync(dbenv, NULL)) != 0)
                  goto err;
            F_CLR(rep, REP_F_RECOVER_PAGE);
            F_SET(rep, REP_F_RECOVER_LOG);
            memset(&dbt, 0, sizeof(dbt));
            dbt.data = &rep->last_lsn;
            dbt.size = sizeof(rep->last_lsn);
            RPRINT(dbenv, rep, (dbenv, &mb,
                "FILEDONE: LOG_REQ from LSN [%lu][%lu] to [%lu][%lu]",
                (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset,
                (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset));
            MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
            if ((ret = __rep_log_setup(dbenv, rep)) != 0)
                  goto err;
            (void)__rep_send_message(dbenv, eid,
                REP_LOG_REQ, &rep->first_lsn, &dbt, 0);
            MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
            return (0);
      }

      /*
       * 4.  If not, set curinfo to next file and request its pages.
       */
      rep->finfo = rep->nextinfo;
      if ((ret = __rep_fileinfo_read(dbenv, rep->finfo, &rep->nextinfo,
          &rep->curinfo)) != 0)
            goto err;
      DB_ASSERT(rep->curinfo->pgno == 0);
      rep->ready_pg = 0;
      rep->npages = 0;
      rep->waiting_pg = PGNO_INVALID;
      rep->max_wait_pg = PGNO_INVALID;
      memset(&dbt, 0, sizeof(dbt));
      RPRINT(dbenv, rep, (dbenv, &mb,
          "FILEDONE: Next file %d.  Request pages 0 to %lu",
          rep->curinfo->filenum, (u_long)rep->curinfo->max_pgno));
      dbt.data = rep->finfo;
      dbt.size = (u_int32_t)((u_int8_t *)rep->nextinfo -
          (u_int8_t *)rep->finfo);
      (void)__rep_send_message(dbenv, eid, REP_PAGE_REQ,
          NULL, &dbt, 0);
err:
      return (ret);
}

/*
 * __rep_mpf_open -
 *    Create and open the mpool file for a database.
 *    Used by both master and client to bring files into mpool.
 */
static int
__rep_mpf_open(dbenv, mpfp, rfp)
      DB_ENV *dbenv;
      DB_MPOOLFILE **mpfp;
      __rep_fileinfo_args *rfp;
{
      DB db;
      int ret;

      if ((ret = __memp_fcreate(dbenv, mpfp)) != 0)
            return (ret);

      /*
       * We need a dbp to pass into to __db_dbenv_mpool.  Set up
       * only the parts that it needs.
       */
      db.type = rfp->type;
      db.pgsize = (u_int32_t)rfp->pgsize;
      memcpy(db.fileid, rfp->uid.data, DB_FILE_ID_LEN);
      db.flags = rfp->flags;
      db.mpf = *mpfp;
      db.dbenv = dbenv;
      if ((ret = __db_dbenv_mpool(&db, rfp->info.data, 0)) != 0) {
            (void)__memp_fclose(*mpfp, 0);
            *mpfp = NULL;
      }
      return (ret);
}

/*
 * __rep_pggap_req -
 *    Request a page gap.  Assumes the caller holds the rep_mutexp.
 *
 * PUBLIC: int __rep_pggap_req __P((DB_ENV *, REP *, __rep_fileinfo_args *,
 * PUBLIC:    int));
 */
int
__rep_pggap_req(dbenv, rep, reqfp, moregap)
      DB_ENV *dbenv;
      REP *rep;
      __rep_fileinfo_args *reqfp;
      int moregap;
{
      DBT max_pg_dbt;
      __rep_fileinfo_args *tmpfp;
      size_t len;
      int alloc, ret;

      ret = 0;
      alloc = 0;
      /*
       * There is a window where we have to set REP_RECOVER_PAGE when
       * we receive the update information to transition from getting
       * file information to getting page information.  However, that
       * thread does release and then reacquire mutexes.  So, we might
       * try re-requesting before the original thread can get curinfo
       * setup.  If curinfo isn't set up there is nothing to do.
       */
      if (rep->curinfo == NULL)
            return (0);
      if (reqfp == NULL) {
            if ((ret = __rep_finfo_alloc(dbenv, rep->curinfo, &tmpfp)) != 0)
                  return (ret);
            alloc = 1;
      } else
            tmpfp = reqfp;

      /*
       * If we've never requested this page, then
       * request everything between it and the first
       * page we have.  If we have requested this page
       * then only request this record, not the entire gap.
       */
      memset(&max_pg_dbt, 0, sizeof(max_pg_dbt));
      tmpfp->pgno = rep->ready_pg;
      max_pg_dbt.data = rep->finfo;
      max_pg_dbt.size = (u_int32_t)((u_int8_t *)rep->nextinfo -
          (u_int8_t *)rep->finfo);
      if (rep->max_wait_pg == PGNO_INVALID || moregap) {
            /*
             * Request the gap - set max to waiting_pg - 1 or if
             * there is no waiting_pg, just ask for one.
             */
            if (rep->waiting_pg == PGNO_INVALID) {
                  if (moregap)
                        rep->max_wait_pg = rep->curinfo->max_pgno;
                  else
                        rep->max_wait_pg = rep->ready_pg;
            } else
                  rep->max_wait_pg = rep->waiting_pg - 1;
            tmpfp->max_pgno = rep->max_wait_pg;
      } else {
            /*
             * Request 1 page - set max to ready_pg.
             */
            rep->max_wait_pg = rep->ready_pg;
            tmpfp->max_pgno = rep->ready_pg;
      }
      if (rep->master_id != DB_EID_INVALID) {
            rep->stat.st_pg_requested++;
            /*
             * We need to request the pages, but we need to get the
             * new info into rep->finfo.  Assert that the sizes never
             * change.  The only thing this should do is change
             * the pgno field.  Everything else remains the same.
             */
            ret = __rep_fileinfo_buf(rep->finfo, max_pg_dbt.size, &len,
                tmpfp->pgsize, tmpfp->pgno, tmpfp->max_pgno,
                tmpfp->filenum, tmpfp->id, tmpfp->type,
                tmpfp->flags, &tmpfp->uid, &tmpfp->info);
            DB_ASSERT(len == max_pg_dbt.size);
            (void)__rep_send_message(dbenv, rep->master_id,
                REP_PAGE_REQ, NULL, &max_pg_dbt, 0);
      } else
            (void)__rep_send_message(dbenv, DB_EID_BROADCAST,
                REP_MASTER_REQ, NULL, NULL, 0);

      if (alloc)
            __os_free(dbenv, tmpfp);
      return (ret);
}

/*
 * __rep_loggap_req -
 *    Request a log gap.  Assumes the caller holds the db_mutexp.
 *
 * PUBLIC: void __rep_loggap_req __P((DB_ENV *, REP *, DB_LSN *, int));
 */
void
__rep_loggap_req(dbenv, rep, lsnp, moregap)
      DB_ENV *dbenv;
      REP *rep;
      DB_LSN *lsnp;
      int moregap;
{
      DB_LOG *dblp;
      DBT max_lsn_dbt, *max_lsn_dbtp;
      DB_LSN next_lsn;
      LOG *lp;

      dblp = dbenv->lg_handle;
      lp = dblp->reginfo.primary;
      R_LOCK(dbenv, &dblp->reginfo);
      next_lsn = lp->lsn;
      R_UNLOCK(dbenv, &dblp->reginfo);

      if (moregap ||
          (lsnp != NULL &&
          (log_compare(lsnp, &lp->max_wait_lsn) == 0 ||
          IS_ZERO_LSN(lp->max_wait_lsn)))) {
            /*
             * We need to ask for the gap.  Either we never asked
             * for records before, or we asked for a single record
             * and received it.
             */
            lp->max_wait_lsn = lp->waiting_lsn;
            memset(&max_lsn_dbt, 0, sizeof(max_lsn_dbt));
            max_lsn_dbt.data = &lp->waiting_lsn;
            max_lsn_dbt.size = sizeof(lp->waiting_lsn);
            max_lsn_dbtp = &max_lsn_dbt;
      } else {
            max_lsn_dbtp = NULL;
            lp->max_wait_lsn = next_lsn;
      }
      if (rep->master_id != DB_EID_INVALID) {
            rep->stat.st_log_requested++;
            (void)__rep_send_message(dbenv, rep->master_id,
                REP_LOG_REQ, &next_lsn, max_lsn_dbtp, 0);
      } else
            (void)__rep_send_message(dbenv, DB_EID_BROADCAST,
                REP_MASTER_REQ, NULL, NULL, 0);

      return;
}

/*
 * __rep_finfo_alloc -
 *    Allocate and initialize a fileinfo structure.
 *
 * PUBLIC: int __rep_finfo_alloc __P((DB_ENV *, __rep_fileinfo_args *,
 * PUBLIC:    __rep_fileinfo_args **));
 */
int
__rep_finfo_alloc(dbenv, rfpsrc, rfpp)
      DB_ENV *dbenv;
      __rep_fileinfo_args *rfpsrc, **rfpp;
{
      size_t size;
      int ret;

      size = sizeof(__rep_fileinfo_args) + rfpsrc->uid.size +
          rfpsrc->info.size;
      if ((ret = __os_malloc(dbenv, size, rfpp)) != 0)
            return (ret);

      memcpy(*rfpp, rfpsrc, size);
      return (ret);
}

/*
 * __rep_log_setup -
 *    We know our first LSN and need to reset the log subsystem
 *    to get our logs set up for the proper file.
 */
static int
__rep_log_setup(dbenv, rep)
      DB_ENV *dbenv;
      REP *rep;
{
      DB_LOG *dblp;
      DB_LSN lsn;
      u_int32_t fnum;
      int ret;
      char *name;

      dblp = dbenv->lg_handle;
      /*
       * Set up the log starting at the file number of the first LSN we
       * need to get from the master.
       */
      if ((ret = __log_newfile(dblp, &lsn, rep->first_lsn.file)) == 0) {
            /*
             * We do know we want to start this client's log at
             * log file 'first_lsn.file'.  So we want to forcibly
             * remove any log files earlier than that number.
             * We don't know what might be in any earlier log files
             * so we cannot just use __log_autoremove.
             */
            for (fnum = 1; fnum < rep->first_lsn.file; fnum++) {
                  if ((ret = __log_name(dblp, fnum, &name, NULL, 0)) != 0)
                        goto err;
                  (void)__os_unlink(dbenv, name);
                  __os_free(dbenv, name);
            }
      }
err:
      return (ret);
}

/*
 * __rep_queue_filedone -
 *    Determine if we're really done getting the pages for a queue file.
 *    Queue is handled in several steps.
 *    1.  First we get the meta page only.
 *    2.  We use the meta-page information to figure out first and last
 *        page numbers (and if queue wraps, first can be > last.
 *    3.  If first < last, we do a REP_PAGE_REQ for all pages.
 *    4.  If first > last, we REP_PAGE_REQ from first -> max page number.
 *        Then we'll ask for page 1 -> last.
 *
 * This function can return several things:
 *    DB_REP_PAGEDONE - if we're done with this file.
 *    0 - if we're not doen with this file.
 *    error - if we get an error doing some operations.
 *
 * This function will open a dbp handle to the queue file.  This is needed
 * by most of the QAM macros.  We'll open it on the first pass through
 * here and we'll close it whenever we decide we're done.
 */
static int
__rep_queue_filedone(dbenv, rep, rfp)
      DB_ENV *dbenv;
      REP *rep;
      __rep_fileinfo_args *rfp;
{
#ifndef HAVE_QUEUE
      COMPQUIET(rep, NULL);
      COMPQUIET(rfp, NULL);
      return (__db_no_queue_am(dbenv));
#else
      db_pgno_t first, last;
      u_int32_t flags;
      int empty, ret, t_ret;
#ifdef DIAGNOSTIC
      DB_MSGBUF mb;
#endif

      ret = 0;
      if (rep->queue_dbp == NULL) {
            /*
             * We need to do a sync here so that the open
             * can find the file and file id.
             */
            if ((ret = __memp_sync(dbenv, NULL)) != 0)
                  goto out;
            if ((ret = db_create(&rep->queue_dbp, dbenv,
                DB_REP_CREATE)) != 0)
                  goto out;
            flags = DB_NO_AUTO_COMMIT |
                (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0);
            if ((ret = __db_open(rep->queue_dbp, NULL, rfp->info.data,
                NULL, DB_QUEUE, flags, 0, PGNO_BASE_MD)) != 0)
                  goto out;
      }
      if ((ret = __queue_pageinfo(rep->queue_dbp,
          &first, &last, &empty, 0, 0)) != 0)
            goto out;
      RPRINT(dbenv, rep, (dbenv, &mb,
          "Queue fileinfo: first %lu, last %lu, empty %d",
          (u_long)first, (u_long)last, empty));
      /*
       * We can be at the end of 3 possible states.
       * 1.  We have received the meta-page and now need to get the
       *     rest of the pages in the database.
       * 2.  We have received from first -> max_pgno.  We might be done,
       *     or we might need to ask for wrapped pages.
       * 3.  We have received all pages in the file.  We're done.
       */
      if (rfp->max_pgno == 0) {
            /*
             * We have just received the meta page.  Set up the next
             * pages to ask for and check if the file is empty.
             */
            if (empty)
                  goto out;
            if (first > last) {
                  rfp->max_pgno =
                      QAM_RECNO_PAGE(rep->queue_dbp, UINT32_MAX);
            } else
                  rfp->max_pgno = last;
            RPRINT(dbenv, rep, (dbenv, &mb,
                "Queue fileinfo: First req: first %lu, last %lu",
                (u_long)first, (u_long)rfp->max_pgno));
            goto req;
      } else if (rfp->max_pgno != last) {
            /*
             * If max_pgno != last that means we're dealing with a
             * wrapped situation.  Request next batch of pages.
             * Set npages to 1 because we already have page 0, the
             * meta-page, now we need pages 1-max_pgno.
             */
            first = 1;
            rfp->max_pgno = last;
            RPRINT(dbenv, rep, (dbenv, &mb,
                "Queue fileinfo: Wrap req: first %lu, last %lu",
                (u_long)first, (u_long)last));
req:
            /*
             * Since we're simulating a "gap" to resend new PAGE_REQ
             * for this file, we need to set waiting page to last + 1
             * so that we'll ask for all from ready_pg -> last.
             */
            rep->npages = first;
            rep->ready_pg = first;
            rep->waiting_pg = rfp->max_pgno + 1;
            rep->max_wait_pg = PGNO_INVALID;
            ret = __rep_pggap_req(dbenv, rep, rfp, 0);
            return (ret);
      }
      /*
       * max_pgno == last
       * If we get here, we have all the pages we need.
       * Close the dbp and return.
       */
out:
      if (rep->queue_dbp != NULL &&
          (t_ret = __db_close(rep->queue_dbp, NULL, DB_NOSYNC)) != 0 &&
          ret == 0)
            ret = t_ret;
      rep->queue_dbp = NULL;
      if (ret == 0)
            ret = DB_REP_PAGEDONE;
      return (ret);
#endif
}

Generated by  Doxygen 1.6.0   Back to index