123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577 |
- /******************************************************************************
- *******************************************************************************
- **
- ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
- ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
- **
- ** This copyrighted material is made available to anyone wishing to use,
- ** modify, copy, or redistribute it subject to the terms and conditions
- ** of the GNU General Public License v.2.
- **
- *******************************************************************************
- ******************************************************************************/
- #include "dlm_internal.h"
- #include "lockspace.h"
- #include "member.h"
- #include "lowcomms.h"
- #include "midcomms.h"
- #include "rcom.h"
- #include "recover.h"
- #include "dir.h"
- #include "config.h"
- #include "memory.h"
- #include "lock.h"
- #include "util.h"
- #include "member.h"
- static int rcom_response(struct dlm_ls *ls)
- {
- return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
- }
- static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
- struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
- {
- struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
- char *mb;
- int mb_len = sizeof(struct dlm_rcom) + len;
- mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
- if (!mh) {
- log_print("create_rcom to %d type %d len %d ENOBUFS",
- to_nodeid, type, len);
- return -ENOBUFS;
- }
- memset(mb, 0, mb_len);
- rc = (struct dlm_rcom *) mb;
- rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
- rc->rc_header.h_lockspace = ls->ls_global_id;
- rc->rc_header.h_nodeid = dlm_our_nodeid();
- rc->rc_header.h_length = mb_len;
- rc->rc_header.h_cmd = DLM_RCOM;
- rc->rc_type = type;
- spin_lock(&ls->ls_recover_lock);
- rc->rc_seq = ls->ls_recover_seq;
- spin_unlock(&ls->ls_recover_lock);
- *mh_ret = mh;
- *rc_ret = rc;
- return 0;
- }
- static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
- struct dlm_rcom *rc)
- {
- dlm_rcom_out(rc);
- dlm_lowcomms_commit_buffer(mh);
- }
- static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
- uint32_t flags)
- {
- rs->rs_flags = cpu_to_le32(flags);
- }
- /* When replying to a status request, a node also sends back its
- configuration values. The requesting node then checks that the remote
- node is configured the same way as itself. */
- static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
- uint32_t num_slots)
- {
- rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
- rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
- rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
- rf->rf_num_slots = cpu_to_le16(num_slots);
- rf->rf_generation = cpu_to_le32(ls->ls_generation);
- }
- static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
- {
- struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
- if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
- log_error(ls, "version mismatch: %x nodeid %d: %x",
- DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
- rc->rc_header.h_version);
- return -EPROTO;
- }
- if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
- le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
- log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
- ls->ls_lvblen, ls->ls_exflags, nodeid,
- le32_to_cpu(rf->rf_lvblen),
- le32_to_cpu(rf->rf_lsflags));
- return -EPROTO;
- }
- return 0;
- }
- static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
- {
- spin_lock(&ls->ls_rcom_spin);
- *new_seq = ++ls->ls_rcom_seq;
- set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
- spin_unlock(&ls->ls_rcom_spin);
- }
- static void disallow_sync_reply(struct dlm_ls *ls)
- {
- spin_lock(&ls->ls_rcom_spin);
- clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
- clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
- spin_unlock(&ls->ls_rcom_spin);
- }
- /*
- * low nodeid gathers one slot value at a time from each node.
- * it sets need_slots=0, and saves rf_our_slot returned from each
- * rcom_config.
- *
- * other nodes gather all slot values at once from the low nodeid.
- * they set need_slots=1, and ignore the rf_our_slot returned from each
- * rcom_config. they use the rf_num_slots returned from the low
- * node's rcom_config.
- */
- int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
- {
- struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
- int error = 0;
- ls->ls_recover_nodeid = nodeid;
- if (nodeid == dlm_our_nodeid()) {
- rc = ls->ls_recover_buf;
- rc->rc_result = dlm_recover_status(ls);
- goto out;
- }
- error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
- sizeof(struct rcom_status), &rc, &mh);
- if (error)
- goto out;
- set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
- allow_sync_reply(ls, &rc->rc_id);
- memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
- send_rcom(ls, mh, rc);
- error = dlm_wait_function(ls, &rcom_response);
- disallow_sync_reply(ls);
- if (error)
- goto out;
- rc = ls->ls_recover_buf;
- if (rc->rc_result == -ESRCH) {
- /* we pretend the remote lockspace exists with 0 status */
- log_debug(ls, "remote node %d not ready", nodeid);
- rc->rc_result = 0;
- error = 0;
- } else {
- error = check_rcom_config(ls, rc, nodeid);
- }
- /* the caller looks at rc_result for the remote recovery status */
- out:
- return error;
- }
- static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
- {
- struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
- struct rcom_status *rs;
- uint32_t status;
- int nodeid = rc_in->rc_header.h_nodeid;
- int len = sizeof(struct rcom_config);
- int num_slots = 0;
- int error;
- if (!dlm_slots_version(&rc_in->rc_header)) {
- status = dlm_recover_status(ls);
- goto do_create;
- }
- rs = (struct rcom_status *)rc_in->rc_buf;
- if (!(rs->rs_flags & DLM_RSF_NEED_SLOTS)) {
- status = dlm_recover_status(ls);
- goto do_create;
- }
- spin_lock(&ls->ls_recover_lock);
- status = ls->ls_recover_status;
- num_slots = ls->ls_num_slots;
- spin_unlock(&ls->ls_recover_lock);
- len += num_slots * sizeof(struct rcom_slot);
- do_create:
- error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
- len, &rc, &mh);
- if (error)
- return;
- rc->rc_id = rc_in->rc_id;
- rc->rc_seq_reply = rc_in->rc_seq;
- rc->rc_result = status;
- set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
- if (!num_slots)
- goto do_send;
- spin_lock(&ls->ls_recover_lock);
- if (ls->ls_num_slots != num_slots) {
- spin_unlock(&ls->ls_recover_lock);
- log_debug(ls, "receive_rcom_status num_slots %d to %d",
- num_slots, ls->ls_num_slots);
- rc->rc_result = 0;
- set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
- goto do_send;
- }
- dlm_slots_copy_out(ls, rc);
- spin_unlock(&ls->ls_recover_lock);
- do_send:
- send_rcom(ls, mh, rc);
- }
- static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
- {
- spin_lock(&ls->ls_rcom_spin);
- if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
- rc_in->rc_id != ls->ls_rcom_seq) {
- log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
- rc_in->rc_type, rc_in->rc_header.h_nodeid,
- (unsigned long long)rc_in->rc_id,
- (unsigned long long)ls->ls_rcom_seq);
- goto out;
- }
- memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
- set_bit(LSFL_RCOM_READY, &ls->ls_flags);
- clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
- wake_up(&ls->ls_wait_general);
- out:
- spin_unlock(&ls->ls_rcom_spin);
- }
- int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
- {
- struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
- int error = 0;
- int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
- ls->ls_recover_nodeid = nodeid;
- if (nodeid == dlm_our_nodeid()) {
- ls->ls_recover_buf->rc_header.h_length =
- dlm_config.ci_buffer_size;
- dlm_copy_master_names(ls, last_name, last_len,
- ls->ls_recover_buf->rc_buf,
- max_size, nodeid);
- goto out;
- }
- error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
- if (error)
- goto out;
- memcpy(rc->rc_buf, last_name, last_len);
- allow_sync_reply(ls, &rc->rc_id);
- memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
- send_rcom(ls, mh, rc);
- error = dlm_wait_function(ls, &rcom_response);
- disallow_sync_reply(ls);
- out:
- return error;
- }
- static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
- {
- struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
- int error, inlen, outlen, nodeid;
- nodeid = rc_in->rc_header.h_nodeid;
- inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
- outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
- error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
- if (error)
- return;
- rc->rc_id = rc_in->rc_id;
- rc->rc_seq_reply = rc_in->rc_seq;
- dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
- nodeid);
- send_rcom(ls, mh, rc);
- }
- int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
- {
- struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
- struct dlm_ls *ls = r->res_ls;
- int error;
- error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
- &rc, &mh);
- if (error)
- goto out;
- memcpy(rc->rc_buf, r->res_name, r->res_length);
- rc->rc_id = (unsigned long) r;
- send_rcom(ls, mh, rc);
- out:
- return error;
- }
- static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
- {
- struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
- int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
- int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
- error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
- if (error)
- return;
- error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid);
- if (error)
- ret_nodeid = error;
- rc->rc_result = ret_nodeid;
- rc->rc_id = rc_in->rc_id;
- rc->rc_seq_reply = rc_in->rc_seq;
- send_rcom(ls, mh, rc);
- }
- static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
- {
- dlm_recover_master_reply(ls, rc_in);
- }
- static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
- struct rcom_lock *rl)
- {
- memset(rl, 0, sizeof(*rl));
- rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
- rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
- rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
- rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
- rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
- rl->rl_rqmode = lkb->lkb_rqmode;
- rl->rl_grmode = lkb->lkb_grmode;
- rl->rl_status = lkb->lkb_status;
- rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
- if (lkb->lkb_bastfn)
- rl->rl_asts |= DLM_CB_BAST;
- if (lkb->lkb_astfn)
- rl->rl_asts |= DLM_CB_CAST;
- rl->rl_namelen = cpu_to_le16(r->res_length);
- memcpy(rl->rl_name, r->res_name, r->res_length);
- /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
- If so, receive_rcom_lock_args() won't take this copy. */
- if (lkb->lkb_lvbptr)
- memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
- }
- int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
- {
- struct dlm_ls *ls = r->res_ls;
- struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
- struct rcom_lock *rl;
- int error, len = sizeof(struct rcom_lock);
- if (lkb->lkb_lvbptr)
- len += ls->ls_lvblen;
- error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
- if (error)
- goto out;
- rl = (struct rcom_lock *) rc->rc_buf;
- pack_rcom_lock(r, lkb, rl);
- rc->rc_id = (unsigned long) r;
- send_rcom(ls, mh, rc);
- out:
- return error;
- }
- /* needs at least dlm_rcom + rcom_lock */
- static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
- {
- struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
- int error, nodeid = rc_in->rc_header.h_nodeid;
- dlm_recover_master_copy(ls, rc_in);
- error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
- sizeof(struct rcom_lock), &rc, &mh);
- if (error)
- return;
- /* We send back the same rcom_lock struct we received, but
- dlm_recover_master_copy() has filled in rl_remid and rl_result */
- memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
- rc->rc_id = rc_in->rc_id;
- rc->rc_seq_reply = rc_in->rc_seq;
- send_rcom(ls, mh, rc);
- }
- /* If the lockspace doesn't exist then still send a status message
- back; it's possible that it just doesn't have its global_id yet. */
- int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
- {
- struct dlm_rcom *rc;
- struct rcom_config *rf;
- struct dlm_mhandle *mh;
- char *mb;
- int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
- mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
- if (!mh)
- return -ENOBUFS;
- memset(mb, 0, mb_len);
- rc = (struct dlm_rcom *) mb;
- rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
- rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
- rc->rc_header.h_nodeid = dlm_our_nodeid();
- rc->rc_header.h_length = mb_len;
- rc->rc_header.h_cmd = DLM_RCOM;
- rc->rc_type = DLM_RCOM_STATUS_REPLY;
- rc->rc_id = rc_in->rc_id;
- rc->rc_seq_reply = rc_in->rc_seq;
- rc->rc_result = -ESRCH;
- rf = (struct rcom_config *) rc->rc_buf;
- rf->rf_lvblen = cpu_to_le32(~0U);
- dlm_rcom_out(rc);
- dlm_lowcomms_commit_buffer(mh);
- return 0;
- }
- static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
- {
- uint64_t seq;
- int rv = 0;
- switch (rc->rc_type) {
- case DLM_RCOM_STATUS_REPLY:
- case DLM_RCOM_NAMES_REPLY:
- case DLM_RCOM_LOOKUP_REPLY:
- case DLM_RCOM_LOCK_REPLY:
- spin_lock(&ls->ls_recover_lock);
- seq = ls->ls_recover_seq;
- spin_unlock(&ls->ls_recover_lock);
- if (rc->rc_seq_reply != seq) {
- log_debug(ls, "ignoring old reply %x from %d "
- "seq_reply %llx expect %llx",
- rc->rc_type, rc->rc_header.h_nodeid,
- (unsigned long long)rc->rc_seq_reply,
- (unsigned long long)seq);
- rv = 1;
- }
- }
- return rv;
- }
- /* Called by dlm_recv; corresponds to dlm_receive_message() but special
- recovery-only comms are sent through here. */
- void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
- {
- int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
- if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
- log_debug(ls, "ignoring recovery message %x from %d",
- rc->rc_type, nodeid);
- goto out;
- }
- if (is_old_reply(ls, rc))
- goto out;
- switch (rc->rc_type) {
- case DLM_RCOM_STATUS:
- receive_rcom_status(ls, rc);
- break;
- case DLM_RCOM_NAMES:
- receive_rcom_names(ls, rc);
- break;
- case DLM_RCOM_LOOKUP:
- receive_rcom_lookup(ls, rc);
- break;
- case DLM_RCOM_LOCK:
- if (rc->rc_header.h_length < lock_size)
- goto Eshort;
- receive_rcom_lock(ls, rc);
- break;
- case DLM_RCOM_STATUS_REPLY:
- receive_sync_reply(ls, rc);
- break;
- case DLM_RCOM_NAMES_REPLY:
- receive_sync_reply(ls, rc);
- break;
- case DLM_RCOM_LOOKUP_REPLY:
- receive_rcom_lookup_reply(ls, rc);
- break;
- case DLM_RCOM_LOCK_REPLY:
- if (rc->rc_header.h_length < lock_size)
- goto Eshort;
- dlm_recover_process_copy(ls, rc);
- break;
- default:
- log_error(ls, "receive_rcom bad type %d", rc->rc_type);
- }
- out:
- return;
- Eshort:
- log_error(ls, "recovery message %x from %d is too short",
- rc->rc_type, nodeid);
- }
|