member.c 9.0 KB


  1. /******************************************************************************
  2. *******************************************************************************
  3. **
  4. ** Copyright (C) 2005-2009 Red Hat, Inc. All rights reserved.
  5. **
  6. ** This copyrighted material is made available to anyone wishing to use,
  7. ** modify, copy, or redistribute it subject to the terms and conditions
  8. ** of the GNU General Public License v.2.
  9. **
  10. *******************************************************************************
  11. ******************************************************************************/
  12. #include "dlm_internal.h"
  13. #include "lockspace.h"
  14. #include "member.h"
  15. #include "recoverd.h"
  16. #include "recover.h"
  17. #include "rcom.h"
  18. #include "config.h"
  19. #include "lowcomms.h"
  20. static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
  21. {
  22. struct dlm_member *memb = NULL;
  23. struct list_head *tmp;
  24. struct list_head *newlist = &new->list;
  25. struct list_head *head = &ls->ls_nodes;
  26. list_for_each(tmp, head) {
  27. memb = list_entry(tmp, struct dlm_member, list);
  28. if (new->nodeid < memb->nodeid)
  29. break;
  30. }
  31. if (!memb)
  32. list_add_tail(newlist, head);
  33. else {
  34. /* FIXME: can use list macro here */
  35. newlist->prev = tmp->prev;
  36. newlist->next = tmp;
  37. tmp->prev->next = newlist;
  38. tmp->prev = newlist;
  39. }
  40. }
  41. static int dlm_add_member(struct dlm_ls *ls, int nodeid)
  42. {
  43. struct dlm_member *memb;
  44. int w, error;
  45. memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
  46. if (!memb)
  47. return -ENOMEM;
  48. w = dlm_node_weight(ls->ls_name, nodeid);
  49. if (w < 0) {
  50. kfree(memb);
  51. return w;
  52. }
  53. error = dlm_lowcomms_connect_node(nodeid);
  54. if (error < 0) {
  55. kfree(memb);
  56. return error;
  57. }
  58. memb->nodeid = nodeid;
  59. memb->weight = w;
  60. add_ordered_member(ls, memb);
  61. ls->ls_num_nodes++;
  62. return 0;
  63. }
  64. static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
  65. {
  66. list_move(&memb->list, &ls->ls_nodes_gone);
  67. ls->ls_num_nodes--;
  68. }
  69. int dlm_is_member(struct dlm_ls *ls, int nodeid)
  70. {
  71. struct dlm_member *memb;
  72. list_for_each_entry(memb, &ls->ls_nodes, list) {
  73. if (memb->nodeid == nodeid)
  74. return 1;
  75. }
  76. return 0;
  77. }
  78. int dlm_is_removed(struct dlm_ls *ls, int nodeid)
  79. {
  80. struct dlm_member *memb;
  81. list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
  82. if (memb->nodeid == nodeid)
  83. return 1;
  84. }
  85. return 0;
  86. }
  87. static void clear_memb_list(struct list_head *head)
  88. {
  89. struct dlm_member *memb;
  90. while (!list_empty(head)) {
  91. memb = list_entry(head->next, struct dlm_member, list);
  92. list_del(&memb->list);
  93. kfree(memb);
  94. }
  95. }
  96. void dlm_clear_members(struct dlm_ls *ls)
  97. {
  98. clear_memb_list(&ls->ls_nodes);
  99. ls->ls_num_nodes = 0;
  100. }
  101. void dlm_clear_members_gone(struct dlm_ls *ls)
  102. {
  103. clear_memb_list(&ls->ls_nodes_gone);
  104. }
  105. static void make_member_array(struct dlm_ls *ls)
  106. {
  107. struct dlm_member *memb;
  108. int i, w, x = 0, total = 0, all_zero = 0, *array;
  109. kfree(ls->ls_node_array);
  110. ls->ls_node_array = NULL;
  111. list_for_each_entry(memb, &ls->ls_nodes, list) {
  112. if (memb->weight)
  113. total += memb->weight;
  114. }
  115. /* all nodes revert to weight of 1 if all have weight 0 */
  116. if (!total) {
  117. total = ls->ls_num_nodes;
  118. all_zero = 1;
  119. }
  120. ls->ls_total_weight = total;
  121. array = kmalloc(sizeof(int) * total, GFP_NOFS);
  122. if (!array)
  123. return;
  124. list_for_each_entry(memb, &ls->ls_nodes, list) {
  125. if (!all_zero && !memb->weight)
  126. continue;
  127. if (all_zero)
  128. w = 1;
  129. else
  130. w = memb->weight;
  131. DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
  132. for (i = 0; i < w; i++)
  133. array[x++] = memb->nodeid;
  134. }
  135. ls->ls_node_array = array;
  136. }
  137. /* send a status request to all members just to establish comms connections */
  138. static int ping_members(struct dlm_ls *ls)
  139. {
  140. struct dlm_member *memb;
  141. int error = 0;
  142. list_for_each_entry(memb, &ls->ls_nodes, list) {
  143. error = dlm_recovery_stopped(ls);
  144. if (error)
  145. break;
  146. error = dlm_rcom_status(ls, memb->nodeid);
  147. if (error)
  148. break;
  149. }
  150. if (error)
  151. log_debug(ls, "ping_members aborted %d last nodeid %d",
  152. error, ls->ls_recover_nodeid);
  153. return error;
  154. }
  155. int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
  156. {
  157. struct dlm_member *memb, *safe;
  158. int i, error, found, pos = 0, neg = 0, low = -1;
  159. /* previously removed members that we've not finished removing need to
  160. count as a negative change so the "neg" recovery steps will happen */
  161. list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
  162. log_debug(ls, "prev removed member %d", memb->nodeid);
  163. neg++;
  164. }
  165. /* move departed members from ls_nodes to ls_nodes_gone */
  166. list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
  167. found = 0;
  168. for (i = 0; i < rv->node_count; i++) {
  169. if (memb->nodeid == rv->nodeids[i]) {
  170. found = 1;
  171. break;
  172. }
  173. }
  174. if (!found) {
  175. neg++;
  176. dlm_remove_member(ls, memb);
  177. log_debug(ls, "remove member %d", memb->nodeid);
  178. }
  179. }
  180. /* Add an entry to ls_nodes_gone for members that were removed and
  181. then added again, so that previous state for these nodes will be
  182. cleared during recovery. */
  183. for (i = 0; i < rv->new_count; i++) {
  184. if (!dlm_is_member(ls, rv->new[i]))
  185. continue;
  186. log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);
  187. memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
  188. if (!memb)
  189. return -ENOMEM;
  190. memb->nodeid = rv->new[i];
  191. list_add_tail(&memb->list, &ls->ls_nodes_gone);
  192. neg++;
  193. }
  194. /* add new members to ls_nodes */
  195. for (i = 0; i < rv->node_count; i++) {
  196. if (dlm_is_member(ls, rv->nodeids[i]))
  197. continue;
  198. dlm_add_member(ls, rv->nodeids[i]);
  199. pos++;
  200. log_debug(ls, "add member %d", rv->nodeids[i]);
  201. }
  202. list_for_each_entry(memb, &ls->ls_nodes, list) {
  203. if (low == -1 || memb->nodeid < low)
  204. low = memb->nodeid;
  205. }
  206. ls->ls_low_nodeid = low;
  207. make_member_array(ls);
  208. dlm_set_recover_status(ls, DLM_RS_NODES);
  209. *neg_out = neg;
  210. error = ping_members(ls);
  211. if (!error || error == -EPROTO) {
  212. /* new_lockspace() may be waiting to know if the config
  213. is good or bad */
  214. ls->ls_members_result = error;
  215. complete(&ls->ls_members_done);
  216. }
  217. if (error)
  218. goto out;
  219. error = dlm_recover_members_wait(ls);
  220. out:
  221. log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
  222. return error;
  223. }
  224. /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
  225. dlm_ls_start() is called on any of them to start the new recovery. */
  226. int dlm_ls_stop(struct dlm_ls *ls)
  227. {
  228. int new;
  229. /*
  230. * Prevent dlm_recv from being in the middle of something when we do
  231. * the stop. This includes ensuring dlm_recv isn't processing a
  232. * recovery message (rcom), while dlm_recoverd is aborting and
  233. * resetting things from an in-progress recovery. i.e. we want
  234. * dlm_recoverd to abort its recovery without worrying about dlm_recv
  235. * processing an rcom at the same time. Stopping dlm_recv also makes
  236. * it easy for dlm_receive_message() to check locking stopped and add a
  237. * message to the requestqueue without races.
  238. */
  239. down_write(&ls->ls_recv_active);
  240. /*
  241. * Abort any recovery that's in progress (see RECOVERY_STOP,
  242. * dlm_recovery_stopped()) and tell any other threads running in the
  243. * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
  244. */
  245. spin_lock(&ls->ls_recover_lock);
  246. set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
  247. new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
  248. ls->ls_recover_seq++;
  249. spin_unlock(&ls->ls_recover_lock);
  250. /*
  251. * Let dlm_recv run again, now any normal messages will be saved on the
  252. * requestqueue for later.
  253. */
  254. up_write(&ls->ls_recv_active);
  255. /*
  256. * This in_recovery lock does two things:
  257. * 1) Keeps this function from returning until all threads are out
  258. * of locking routines and locking is truly stopped.
  259. * 2) Keeps any new requests from being processed until it's unlocked
  260. * when recovery is complete.
  261. */
  262. if (new)
  263. down_write(&ls->ls_in_recovery);
  264. /*
  265. * The recoverd suspend/resume makes sure that dlm_recoverd (if
  266. * running) has noticed RECOVERY_STOP above and quit processing the
  267. * previous recovery.
  268. */
  269. dlm_recoverd_suspend(ls);
  270. ls->ls_recover_status = 0;
  271. dlm_recoverd_resume(ls);
  272. if (!ls->ls_recover_begin)
  273. ls->ls_recover_begin = jiffies;
  274. return 0;
  275. }
  276. int dlm_ls_start(struct dlm_ls *ls)
  277. {
  278. struct dlm_recover *rv = NULL, *rv_old;
  279. int *ids = NULL, *new = NULL;
  280. int error, ids_count = 0, new_count = 0;
  281. rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
  282. if (!rv)
  283. return -ENOMEM;
  284. error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count,
  285. &new, &new_count);
  286. if (error < 0)
  287. goto fail;
  288. spin_lock(&ls->ls_recover_lock);
  289. /* the lockspace needs to be stopped before it can be started */
  290. if (!dlm_locking_stopped(ls)) {
  291. spin_unlock(&ls->ls_recover_lock);
  292. log_error(ls, "start ignored: lockspace running");
  293. error = -EINVAL;
  294. goto fail;
  295. }
  296. rv->nodeids = ids;
  297. rv->node_count = ids_count;
  298. rv->new = new;
  299. rv->new_count = new_count;
  300. rv->seq = ++ls->ls_recover_seq;
  301. rv_old = ls->ls_recover_args;
  302. ls->ls_recover_args = rv;
  303. spin_unlock(&ls->ls_recover_lock);
  304. if (rv_old) {
  305. log_error(ls, "unused recovery %llx %d",
  306. (unsigned long long)rv_old->seq, rv_old->node_count);
  307. kfree(rv_old->nodeids);
  308. kfree(rv_old->new);
  309. kfree(rv_old);
  310. }
  311. dlm_recoverd_kick(ls);
  312. return 0;
  313. fail:
  314. kfree(rv);
  315. kfree(ids);
  316. kfree(new);
  317. return error;
  318. }