From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: (qmail 28862 invoked by alias); 28 Feb 2008 16:33:11 -0000 Received: (qmail 28831 invoked by uid 9478); 28 Feb 2008 16:33:10 -0000 Date: Thu, 28 Feb 2008 16:33:00 -0000 Message-ID: <20080228163310.28815.qmail@sourceware.org> From: jbrassow@sourceware.org To: cluster-cvs@sources.redhat.com Subject: Cluster Project branch, RHEL5, updated. cmirror-kernel_0_1_7-7-g15d12f2 X-Git-Refname: refs/heads/RHEL5 X-Git-Reftype: branch X-Git-Oldrev: 44e386b49148a7ab59a0bcc89fa9dab4c9585b37 X-Git-Newrev: 15d12f28f64661dbb6f4f80eae822f2b56e7a5d4 Mailing-List: contact cluster-cvs-help@sourceware.org; run by ezmlm Precedence: bulk List-Id: List-Subscribe: List-Post: List-Help: , Sender: cluster-cvs-owner@sourceware.org X-SW-Source: 2008-q1/txt/msg00255.txt.bz2 This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "Cluster Project". http://sources.redhat.com/git/gitweb.cgi?p=cluster.git;a=commitdiff;h=15d12f28f64661dbb6f4f80eae822f2b56e7a5d4 The branch, RHEL5 has been updated via 15d12f28f64661dbb6f4f80eae822f2b56e7a5d4 (commit) via 89961b360cee9d425bbb11f17396539c04db25d3 (commit) from 44e386b49148a7ab59a0bcc89fa9dab4c9585b37 (commit) Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below. - Log ----------------------------------------------------------------- commit 15d12f28f64661dbb6f4f80eae822f2b56e7a5d4 Author: Jonathan Brassow Date: Wed Feb 27 16:29:27 2008 -0600 clogd: bug fixes related to joining/leaving CPGs - Was purging all entries in the cluster queue when one log was leaving... this could mean removing requests associated with other logs - Writing of checkpoint data could be done at the wrong time if the node was still initializing and receiving messages - Could skip a response if the server changed while queueing requests during initialization - spell receive correctly commit 89961b360cee9d425bbb11f17396539c04db25d3 Author: Jonathan Brassow Date: Wed Feb 27 16:23:49 2008 -0600 dm-log-clustered: minor touch-ups - Fix spelling of 'reciev*' - struct vs dereff'ed ptr as sizeof arg ----------------------------------------------------------------------- Summary of changes: cmirror-kernel/src/dm-clog-tfr.c | 47 ++++++------ cmirror-kernel/src/dm-clog.c | 2 +- cmirror/src/cluster.c | 154 +++++++++++++++++++++++--------------- cmirror/src/local.c | 13 +++- cmirror/src/queues.c | 2 + 5 files changed, 133 insertions(+), 85 deletions(-) diff --git a/cmirror-kernel/src/dm-clog-tfr.c b/cmirror-kernel/src/dm-clog-tfr.c index e8fda30..6056c19 100644 --- a/cmirror-kernel/src/dm-clog-tfr.c +++ b/cmirror-kernel/src/dm-clog-tfr.c @@ -26,7 +26,7 @@ static struct clog_tfr *prealloced_clog_tfr = NULL; static struct cb_id cn_clog_id = { 0x4, 0x1 }; static DEFINE_MUTEX(_lock); -struct recieving_pkg { +struct receiving_pkg { struct list_head list; struct completion complete; @@ -38,8 +38,8 @@ struct recieving_pkg { char *data; }; -static spinlock_t recieving_list_lock = SPIN_LOCK_UNLOCKED; -static struct list_head recieving_list; +static spinlock_t receiving_list_lock = SPIN_LOCK_UNLOCKED; +static struct list_head receiving_list; static int dm_clog_sendto_server(struct clog_tfr *tfr) { @@ -48,14 +48,14 @@ static int dm_clog_sendto_server(struct clog_tfr *tfr) struct cn_msg *msg = prealloced_cn_msg; if (tfr != prealloced_clog_tfr) { - size = sizeof(*msg) + sizeof(struct clog_tfr) + tfr->data_size; + size = sizeof(struct cn_msg) + sizeof(struct clog_tfr) + tfr->data_size; msg = kmalloc(size, GFP_NOIO); if (!msg) return -ENOMEM; memcpy((msg + 1), tfr, sizeof(struct clog_tfr) + tfr->data_size); } - memset(msg, 0, sizeof(*msg)); + memset(msg, 0, sizeof(struct cn_msg)); msg->id.idx = cn_clog_id.idx; msg->id.val = cn_clog_id.val; @@ -64,6 +64,7 @@ static int dm_clog_sendto_server(struct clog_tfr *tfr) msg->len = sizeof(struct clog_tfr) + tfr->data_size; r = cn_netlink_send(msg, 0, gfp_any()); + if (msg != prealloced_cn_msg) kfree(msg); @@ -75,24 +76,24 @@ static void cn_clog_callback(void *data) int found = 0; struct cn_msg *msg = (struct cn_msg *)data; struct clog_tfr *tfr = (struct clog_tfr *)(msg + 1); - struct recieving_pkg *pkg; + struct receiving_pkg *pkg; - spin_lock(&recieving_list_lock); + spin_lock(&receiving_list_lock); if (msg->len < sizeof(*tfr)) { - DMERR("Incomplete message recieved from cluster log server"); - spin_unlock(&recieving_list_lock); + DMERR("Incomplete message received from cluster log server"); + spin_unlock(&receiving_list_lock); return; } - list_for_each_entry(pkg, &recieving_list, list) { + list_for_each_entry(pkg, &receiving_list, list) { /* - DMINFO("Msg from userspace recieved [%s].", RQ_TYPE(tfr->request_type)); - DMINFO(" Seq # recieved: %llu Seq # wanted: %llu", + DMINFO("Msg from userspace received [%s].", RQ_TYPE(tfr->request_type)); + DMINFO(" Seq # received: %llu Seq # wanted: %llu", pkg->seq, tfr->seq); */ if (tfr->seq == pkg->seq) { if (tfr->data_size > *(pkg->data_size)) { - DMERR("Insufficient space to recieve package [%s]", + DMERR("Insufficient space to receive package [%s]", RQ_TYPE(tfr->request_type)); *(pkg->data_size) = 0; pkg->error = -ENOSPC; @@ -107,7 +108,7 @@ static void cn_clog_callback(void *data) } } - spin_unlock(&recieving_list_lock); + spin_unlock(&receiving_list_lock); if (!found) DMERR("Stray request returned: %s, %llu", RQ_TYPE(tfr->request_type), tfr->seq); @@ -135,7 +136,7 @@ int dm_clog_consult_server(const char *uuid, int request_type, int dummy = 0; int overhead_size = sizeof(struct clog_tfr *) + sizeof(struct cn_msg); struct clog_tfr *tfr = prealloced_clog_tfr; - struct recieving_pkg pkg; + struct receiving_pkg pkg; if (data_size > (DM_CLOG_PREALLOCED_SIZE - overhead_size)) { DMINFO("Size of tfr exceeds preallocated size"); @@ -168,9 +169,9 @@ resend: pkg.seq = tfr->seq; pkg.data_size = rdata_size; pkg.data = rdata; - spin_lock(&recieving_list_lock); - list_add(&(pkg.list), &recieving_list); - spin_unlock(&recieving_list_lock); + spin_lock(&receiving_list_lock); + list_add(&(pkg.list), &receiving_list); + spin_unlock(&receiving_list_lock); pkg.start_time = jiffies; r = dm_clog_sendto_server(tfr); @@ -187,9 +188,9 @@ resend: if (!r) { DMWARN("Request timed out on %s:%llu - retrying", RQ_TYPE(request_type), pkg.seq); - spin_lock(&recieving_list_lock); + spin_lock(&receiving_list_lock); list_del_init(&(pkg.list)); - spin_unlock(&recieving_list_lock); + spin_unlock(&receiving_list_lock); goto resend; } else { @@ -206,9 +207,9 @@ resend: RQ_TYPE(request_type), r); out: - spin_lock(&recieving_list_lock); + spin_lock(&receiving_list_lock); list_del_init(&(pkg.list)); - spin_unlock(&recieving_list_lock); + spin_unlock(&receiving_list_lock); if (tfr != (struct clog_tfr *)prealloced_clog_tfr) kfree(tfr); @@ -221,7 +222,7 @@ int dm_clog_tfr_init(void) int r; void *prealloced; - INIT_LIST_HEAD(&recieving_list); + INIT_LIST_HEAD(&receiving_list); prealloced = kmalloc(DM_CLOG_PREALLOCED_SIZE, GFP_KERNEL); if (!prealloced) diff --git a/cmirror-kernel/src/dm-clog.c b/cmirror-kernel/src/dm-clog.c index 92e8f33..b61de62 100644 --- a/cmirror-kernel/src/dm-clog.c +++ b/cmirror-kernel/src/dm-clog.c @@ -431,7 +431,7 @@ static int cluster_flush(struct dirty_log *log) fail: /* * We can safely remove these entries, even if failure. - * Calling code will recieve an error and will know that + * Calling code will receive an error and will know that * the log facility has failed. */ list_for_each_entry_safe(fe, tmp_fe, &flush_list, list) { diff --git a/cmirror/src/cluster.c b/cmirror/src/cluster.c index 48d2f81..27077ae 100644 --- a/cmirror/src/cluster.c +++ b/cmirror/src/cluster.c @@ -20,14 +20,15 @@ #include "link_mon.h" #define DM_CLOG_RESPONSE 0x1000 /* in last byte of 32-bit value */ -#define DM_CLOG_CHECKPOINT_READY ((uint32_t)-1) +#define DM_CLOG_CHECKPOINT_READY 21 +#define DM_CLOG_CONFIG_CHANGE 22 static uint32_t my_cluster_id = 0xDEAD; static SaCkptHandleT ckpt_handle = 0; static SaCkptCallbacksT callbacks = { 0, 0 }; static SaVersionT version = { 'B', 1, 1 }; -#define DEBUGGING_HISTORY 20 +#define DEBUGGING_HISTORY 100 static char debugging[DEBUGGING_HISTORY][128]; static int idx = 0; static int memberz = 0; @@ -151,10 +152,10 @@ static int handle_cluster_request(struct clog_tfr *tfr, int server, int printz) * Errors from previous functions are in the tfr struct. */ if (printz) - LOG_PRINT("[%s] Sending response to %u on cluster: [%s/%llu]", - SHORT_UUID(tfr->uuid), tfr->originator, - RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE), - (unsigned long long)tfr->seq); + LOG_DBG("[%s] Sending response to %u on cluster: [%s/%llu]", + SHORT_UUID(tfr->uuid), tfr->originator, + RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE), + (unsigned long long)tfr->seq); r = cluster_send(tfr); if (r) LOG_ERROR("cluster_send failed: %s", strerror(-r)); @@ -171,10 +172,8 @@ static int handle_cluster_response(struct clog_tfr *tfr) /* * If I didn't send it, then I don't care about the response */ - if (tfr->originator != my_cluster_id) { - LOG_DBG("Response not for me. Disgarding."); - goto out; - } + if (tfr->originator != my_cluster_id) + return 0; tfr->request_type &= ~DM_CLOG_RESPONSE; orig_tfr = queue_remove_match(cluster_queue, clog_tfr_cmp, tfr); @@ -203,8 +202,7 @@ static int handle_cluster_response(struct clog_tfr *tfr) queue_add(t, cluster_queue); } - r = -EINVAL; - goto out; + return -EINVAL; } /* FIXME: Ensure memcpy cannot explode */ @@ -213,7 +211,6 @@ static int handle_cluster_response(struct clog_tfr *tfr) if (r) LOG_ERROR("Failed to send response to kernel"); -out: return r; } @@ -531,7 +528,7 @@ open_retry: saCkptCheckpointUnlink(ckpt_handle, &name); if (no_read) { - LOG_DBG("Checkpoint for this log already recieved"); + LOG_DBG("Checkpoint for this log already received"); goto no_read; } @@ -555,6 +552,8 @@ init_retry: rv = saCkptSectionIterationNext(itr, &desc); if (rv == SA_AIS_OK) len++; + else if ((rv == SA_AIS_ERR_NO_SECTIONS) && len) + break; else if (rv != SA_AIS_ERR_TRY_AGAIN) { LOG_ERROR("saCkptSectionIterationNext failure: %d", rv); break; @@ -700,28 +699,29 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname, int i; int r = 0; int i_am_server; + int i_was_server; int response = 0; struct clog_tfr *tfr = msg; struct clog_tfr *startup_tfr = NULL; struct clog_cpg *match; if (msg_len != (sizeof(*tfr) + tfr->data_size)) - LOG_ERROR("Badly sized message recieved from cluster."); + LOG_ERROR("Badly sized message received from cluster."); if (tfr->request_type & DM_CLOG_RESPONSE) - LOG_DBG("Response to %u from %u recieved [%s/%llu]", + LOG_DBG("Response to %u from %u received [%s/%llu]", tfr->originator, nodeid, RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE), (unsigned long long)tfr->seq); else - LOG_DBG("Request from %u recieved [%s/%llu]", + LOG_DBG("Request from %u received [%s/%llu]", nodeid, RQ_TYPE(tfr->request_type), (unsigned long long)tfr->seq); if (my_cluster_id == 0xDEAD) { - LOG_DBG("[%s] Message from %u before init [%s/%llu]", - SHORT_UUID(tfr->uuid), nodeid, - RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq); + LOG_ERROR("[%s] Message from %u before init [%s/%llu]", + SHORT_UUID(tfr->uuid), nodeid, + RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq); return; } @@ -732,9 +732,9 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname, } if (match->lowest_id == 0xDEAD) { - LOG_DBG("[%s] Message from %u before init* [%s/%llu]", - SHORT_UUID(tfr->uuid), nodeid, - RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq); + LOG_ERROR("[%s] Message from %u before init* [%s/%llu]", + SHORT_UUID(tfr->uuid), nodeid, + RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq); return; } @@ -748,22 +748,37 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname, /* Could we retry? */ goto out; } else if (!match->valid) { - LOG_DBG("[%s] Checkpoint data recieved. Log is now valid", + LOG_DBG("[%s] Checkpoint data received. Log is now valid", SHORT_UUID(match->name.value)); match->valid = 1; + while ((startup_tfr = queue_remove(match->startup_queue))) { - LOG_DBG("Processing delayed request %d: %s", - match->startup_queue->count, - RQ_TYPE(startup_tfr->request_type)); - r = handle_cluster_request(startup_tfr, i_am_server, 1); - - if (r) { - LOG_ERROR("Error while processing delayed CPG message"); - /* - * FIXME: If we error out here, we will never get - * another opportunity to retry these requests - */ - goto out; + if (startup_tfr->request_type == DM_CLOG_CONFIG_CHANGE) { + struct checkpoint_data *new; + + new = prepare_checkpoint(match, startup_tfr->originator); + if (!new) { + /* FIXME: Need better error handling */ + LOG_ERROR("Failed to prepare checkpoint for %u!!!", + startup_tfr->originator); + goto out; + } + } else { + LOG_DBG("Processing delayed request %d: %s", + match->startup_queue->count, + RQ_TYPE(startup_tfr->request_type)); + i_was_server = (startup_tfr->error == my_cluster_id) ? 1 : 0; + startup_tfr->error = 0; + r = handle_cluster_request(startup_tfr, i_was_server, 1); + + if (r) { + LOG_ERROR("Error while processing delayed CPG message"); + /* + * FIXME: If we error out here, we will never get + * another opportunity to retry these requests + */ + goto out; + } } queue_add(startup_tfr, free_queue); } @@ -818,6 +833,7 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname, } memcpy(startup_tfr, tfr, sizeof(*tfr) + tfr->data_size); + startup_tfr->error = match->lowest_id; queue_add_tail(startup_tfr, match->startup_queue); goto out; } @@ -846,7 +862,8 @@ out: continue; LOG_ERROR("%d:%d) %s", i, idx, debugging[idx]); } - } else if (!(tfr->request_type & DM_CLOG_RESPONSE)) { + } else if (!(tfr->request_type & DM_CLOG_RESPONSE) || + (tfr->originator == my_cluster_id)) { int len; idx++; idx = idx % DEBUGGING_HISTORY; @@ -915,8 +932,11 @@ static void cpg_config_callback(cpg_handle_t handle, struct cpg_name *gname, /* Am I leaving? */ for (i = 0; i < left_list_entries; i++) if (my_cluster_id == left_list[i].nodeid) { + struct list_head l, *p, *n; struct clog_tfr *tfr; + INIT_LIST_HEAD(&l); + LOG_DBG("Finalizing leave..."); list_del_init(&match->list); @@ -925,20 +945,18 @@ static void cpg_config_callback(cpg_handle_t handle, struct cpg_name *gname, cluster_postsuspend(match->name.value); - while (!queue_empty(cluster_queue)) { - tfr = queue_remove(cluster_queue); + queue_remove_all(&l, cluster_queue); + + list_for_each_safe(p, n, &l) { + list_del_init(p); + tfr = (struct clog_tfr *)p; - /* - * A postsuspend is place directly into - * the cluster_queue, without going out - * to the cluster. This means that only - * our postsuspend will ever exist in the - * cluster_queue. - */ if (tfr->request_type == DM_CLOG_POSTSUSPEND) kernel_send(tfr); - else + else if (!strcmp(match->name.value, tfr->uuid)) queue_add(tfr, free_queue); + else + queue_add(tfr, cluster_queue); } cpg_finalize(match->handle); @@ -1002,24 +1020,42 @@ static void cpg_config_callback(cpg_handle_t handle, struct cpg_name *gname, goto out; for (i = 0, j = match->checkpoints_needed; i < joined_list_entries; i++) { + struct clog_tfr *tfr; LOG_DBG("Joining node, %u needs checkpoint", joined_list[i].nodeid); - match->checkpoint_requesters[i + j] = joined_list[i].nodeid; + + if (queue_empty(match->startup_queue)) { + match->checkpoint_requesters[match->checkpoints_needed++] = joined_list[i].nodeid; + continue; + } + + tfr = queue_remove(free_queue); + if (!tfr) { + LOG_PRINT("cpg_config_callback: Preallocated transfer structs exhausted"); + tfr = malloc(DM_CLOG_TFR_SIZE); + if (!tfr) { + LOG_ERROR("cpg_config_callback: Unable to allocate transfer structs"); + LOG_ERROR("cpg_config_callback: Unable to perform checkpoint"); + return; + } + } + tfr->request_type = DM_CLOG_CONFIG_CHANGE; + tfr->originator = joined_list[i].nodeid; + queue_add_tail(tfr, match->startup_queue); } - match->checkpoints_needed += i; out: if (lowest != match->lowest_id) - LOG_PRINT("[%s] Server change %u -> %u (%u %s)", - SHORT_UUID(match->name.value), - lowest, match->lowest_id, - (joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid, - (joined_list_entries && (member_list_entries == 1)) ? - "is first to join" : (joined_list_entries) ? "joined" : "left"); + LOG_DBG("[%s] Server change %u -> %u (%u %s)", + SHORT_UUID(match->name.value), + lowest, match->lowest_id, + (joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid, + (joined_list_entries && (member_list_entries == 1)) ? + "is first to join" : (joined_list_entries) ? "joined" : "left"); else - LOG_PRINT("[%s] Server unchanged at %u (%u %s)", - SHORT_UUID(match->name.value), lowest, - (joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid, - (joined_list_entries) ? "joined" : "left"); + LOG_DBG("[%s] Server unchanged at %u (%u %s)", + SHORT_UUID(match->name.value), lowest, + (joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid, + (joined_list_entries) ? "joined" : "left"); if (joined_list_entries && (joined_list[0].nodeid == my_cluster_id)) doit = 25; diff --git a/cmirror/src/local.c b/cmirror/src/local.c index 3b5bac7..93a99a8 100644 --- a/cmirror/src/local.c +++ b/cmirror/src/local.c @@ -43,10 +43,13 @@ static int kernel_recv_helper(void *data, int in_size) case NLMSG_DONE: msg = (struct cn_msg *) NLMSG_DATA((struct nlmsghdr *)buf); if (msg->len > in_size) { - LOG_ERROR("Not enough space to recieve kernel request (%d/%d)", + LOG_ERROR("Not enough space to receive kernel request (%d/%d)", msg->len, in_size); return -EBADE; } + if (!msg->len) + LOG_ERROR("Zero length message received"); + memcpy(data, msg->data, msg->len); r = 0; break; @@ -155,10 +158,16 @@ static int do_local_work(void *data) if (r) return r; - LOG_DBG("Request from kernel recieved [%s/%s/%llu]", + if (!tfr) + return -EBADE; + + LOG_DBG("Request from kernel received [%s/%s/%llu]", RQ_TYPE(tfr->request_type), SHORT_UUID(tfr->uuid), (unsigned long long)tfr->seq); switch (tfr->request_type) { + case 0: + LOG_ERROR("Invalid request_type"); + break; case DM_CLOG_CTR: case DM_CLOG_DTR: case DM_CLOG_IN_SYNC: diff --git a/cmirror/src/queues.c b/cmirror/src/queues.c index bdf1eab..0bf1aee 100644 --- a/cmirror/src/queues.c +++ b/cmirror/src/queues.c @@ -1,5 +1,6 @@ #include #include + #include "queues.h" #include "common.h" #include "logging.h" @@ -217,6 +218,7 @@ struct clog_tfr *queue_remove_match(struct queue *q, if (f(tfr, tfr_cmp)) { list_del_init(p); q->count--; + return tfr; } } hooks/post-receive -- Cluster Project