From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: (qmail 14871 invoked by alias); 25 Sep 2008 19:06:35 -0000 Received: (qmail 14865 invoked by alias); 25 Sep 2008 19:06:35 -0000 X-Spam-Status: No, hits=0.3 required=5.0 tests=AWL,BAYES_00,J_CHICKENPOX_46,J_CHICKENPOX_55,J_CHICKENPOX_62,J_CHICKENPOX_63,J_CHICKENPOX_64,J_CHICKENPOX_66,KAM_MX,SPF_HELO_PASS X-Spam-Check-By: sourceware.org X-Spam-Checker-Version: SpamAssassin 3.2.4 (2008-01-01) on bastion.fedora.phx.redhat.com X-Spam-Level: Subject: master - dlm_controld: add protocol negotiation To: cluster-cvs-relay@redhat.com X-Project: Cluster Project X-Git-Module: cluster.git X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 1a79300ba0deffdb374e4a9114273b32be4b35b7 X-Git-Newrev: 1ffd1effe1a2a4441a3ab151cbed4ad563256b3b From: David Teigland Message-Id: <20080925183654.E866E120438@lists.fedorahosted.org> Date: Thu, 25 Sep 2008 20:08:00 -0000 X-Scanned-By: MIMEDefang 2.58 on 172.16.52.254 Mailing-List: contact cluster-cvs-help@sourceware.org; run by ezmlm Precedence: bulk List-Id: List-Subscribe: List-Post: List-Help: , Sender: cluster-cvs-owner@sourceware.org X-SW-Source: 2008-q3/txt/msg00532.txt.bz2 Gitweb: http://git.fedorahosted.org/git/cluster.git?p=cluster.git;a=commitdiff;h=1ffd1effe1a2a4441a3ab151cbed4ad563256b3b Commit: 1ffd1effe1a2a4441a3ab151cbed4ad563256b3b Parent: 1a79300ba0deffdb374e4a9114273b32be4b35b7 Author: David Teigland AuthorDate: Thu Sep 25 13:21:48 2008 -0500 Committer: David Teigland CommitterDate: Thu Sep 25 13:24:25 2008 -0500 dlm_controld: add protocol negotiation Exact copy from gfs_controld. Also fix a check in gfs_controld. The kernel proto checking is not yet linked to the actual kernel version. Signed-off-by: David Teigland --- group/dlm_controld/cpg.c | 552 +++++++++++++++++++++++++++++++++++++-- group/dlm_controld/dlm_daemon.h | 5 +- group/dlm_controld/main.c | 6 +- group/gfs_controld/cpg-new.c | 4 +- 4 files changed, 534 insertions(+), 33 deletions(-) diff --git a/group/dlm_controld/cpg.c b/group/dlm_controld/cpg.c index 37c302a..e585878 100644 --- a/group/dlm_controld/cpg.c +++ b/group/dlm_controld/cpg.c @@ -3,9 +3,31 @@ uint32_t cpgname_to_crc(const char *data, int len); -int message_flow_control_on; -static cpg_handle_t daemon_handle; -static unsigned int protocol_active[3] = {1, 0, 0}; +struct protocol_version { + uint16_t major; + uint16_t minor; + uint16_t patch; + uint16_t flags; +}; + +struct protocol { + union { + struct protocol_version dm_ver; + uint16_t daemon_max[4]; + }; + union { + struct protocol_version km_ver; + uint16_t kernel_max[4]; + }; + union { + struct protocol_version dr_ver; + uint16_t daemon_run[4]; + }; + union { + struct protocol_version kr_ver; + uint16_t kernel_run[4]; + }; +}; struct member { struct list_head list; @@ -28,6 +50,8 @@ struct node { uint32_t added_seq; /* for queries */ uint32_t removed_seq; /* for queries */ int failed_reason; /* for queries */ + + struct protocol proto; }; /* One of these change structs is created for every confchg a cpg gets. */ @@ -66,6 +90,14 @@ struct id_info { int nodeid; }; +int message_flow_control_on; +static int daemon_cpg_fd; +static cpg_handle_t daemon_cpg_handle; +static struct protocol our_protocol; +static struct list_head daemon_nodes; +static struct cpg_address daemon_member[MAX_NODES]; +static int daemon_member_count; + static void ls_info_in(struct ls_info *li) { li->ls_info_size = le32_to_cpu(li->ls_info_size); @@ -164,9 +196,9 @@ void dlm_send_message(struct lockspace *ls, char *buf, int len) struct dlm_header *hd = (struct dlm_header *) buf; int type = hd->type; - hd->version[0] = cpu_to_le16(protocol_active[0]); - hd->version[1] = cpu_to_le16(protocol_active[1]); - hd->version[2] = cpu_to_le16(protocol_active[2]); + hd->version[0] = cpu_to_le16(our_protocol.daemon_run[0]); + hd->version[1] = cpu_to_le16(our_protocol.daemon_run[1]); + hd->version[2] = cpu_to_le16(our_protocol.daemon_run[2]); hd->type = cpu_to_le16(hd->type); hd->nodeid = cpu_to_le32(our_nodeid); hd->to_nodeid = cpu_to_le32(hd->to_nodeid); @@ -1269,6 +1301,19 @@ static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name, #endif } +static void dlm_header_in(struct dlm_header *hd) +{ + hd->version[0] = le16_to_cpu(hd->version[0]); + hd->version[1] = le16_to_cpu(hd->version[1]); + hd->version[2] = le16_to_cpu(hd->version[2]); + hd->type = le16_to_cpu(hd->type); + hd->nodeid = le32_to_cpu(hd->nodeid); + hd->to_nodeid = le32_to_cpu(hd->to_nodeid); + hd->global_id = le32_to_cpu(hd->global_id); + hd->flags = le32_to_cpu(hd->flags); + hd->msgdata = le32_to_cpu(hd->msgdata); +} + static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *data, int len) { @@ -1281,23 +1326,21 @@ static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name, return; } - hd = (struct dlm_header *)data; + if (len < sizeof(*hd)) { + log_error("deliver_cb short message %d", len); + return; + } - hd->version[0] = le16_to_cpu(hd->version[0]); - hd->version[1] = le16_to_cpu(hd->version[1]); - hd->version[2] = le16_to_cpu(hd->version[2]); - hd->type = le16_to_cpu(hd->type); - hd->nodeid = le32_to_cpu(hd->nodeid); - hd->to_nodeid = le32_to_cpu(hd->to_nodeid); - hd->global_id = le32_to_cpu(hd->global_id); - hd->flags = le32_to_cpu(hd->flags); - hd->msgdata = le32_to_cpu(hd->msgdata); + hd = (struct dlm_header *)data; + dlm_header_in(hd); - if (hd->version[0] != protocol_active[0]) { + if (hd->version[0] != our_protocol.daemon_run[0] || + hd->version[1] != our_protocol.daemon_run[1]) { log_error("reject message from %d version %u.%u.%u vs %u.%u.%u", nodeid, hd->version[0], hd->version[1], - hd->version[2], protocol_active[0], - protocol_active[1], protocol_active[2]); + hd->version[2], our_protocol.daemon_run[0], + our_protocol.daemon_run[1], + our_protocol.daemon_run[2]); return; } @@ -1362,8 +1405,9 @@ void update_flow_control_status(void) { cpg_flow_control_state_t flow_control_state; cpg_error_t error; - - error = cpg_flow_control_state_get(daemon_handle, &flow_control_state); + + error = cpg_flow_control_state_get(daemon_cpg_handle, + &flow_control_state); if (error != CPG_OK) { log_error("cpg_flow_control_state_get %d", error); return; @@ -1433,7 +1477,7 @@ int dlm_join_lockspace(struct lockspace *ls) ls->joining = 1; memset(&name, 0, sizeof(name)); - sprintf(name.value, "dlm:%s", ls->name); + sprintf(name.value, "dlm:ls:%s", ls->name); name.length = strlen(name.value) + 1; /* TODO: allow global_id to be set in cluster.conf? */ @@ -1475,7 +1519,7 @@ int dlm_leave_lockspace(struct lockspace *ls) ls->leaving = 1; memset(&name, 0, sizeof(name)); - sprintf(name.value, "dlm:%s", ls->name); + sprintf(name.value, "dlm:ls:%s", ls->name); name.length = strlen(name.value) + 1; retry: @@ -1492,21 +1536,471 @@ int dlm_leave_lockspace(struct lockspace *ls) return 0; } +static struct node *get_node_daemon(int nodeid) +{ + struct node *node; + + list_for_each_entry(node, &daemon_nodes, list) { + if (node->nodeid == nodeid) + return node; + } + return NULL; +} + +static void add_node_daemon(int nodeid) +{ + struct node *node; + + if (get_node_daemon(nodeid)) + return; + + node = malloc(sizeof(struct node)); + if (!node) { + log_error("add_node_daemon no mem"); + return; + } + memset(node, 0, sizeof(struct node)); + node->nodeid = nodeid; + list_add_tail(&node->list, &daemon_nodes); +} + +static void pv_in(struct protocol_version *pv) +{ + pv->major = le16_to_cpu(pv->major); + pv->minor = le16_to_cpu(pv->minor); + pv->patch = le16_to_cpu(pv->patch); + pv->flags = le16_to_cpu(pv->flags); +} + +static void pv_out(struct protocol_version *pv) +{ + pv->major = cpu_to_le16(pv->major); + pv->minor = cpu_to_le16(pv->minor); + pv->patch = cpu_to_le16(pv->patch); + pv->flags = cpu_to_le16(pv->flags); +} + +static void protocol_in(struct protocol *proto) +{ + pv_in(&proto->dm_ver); + pv_in(&proto->km_ver); + pv_in(&proto->dr_ver); + pv_in(&proto->kr_ver); +} + +static void protocol_out(struct protocol *proto) +{ + pv_out(&proto->dm_ver); + pv_out(&proto->km_ver); + pv_out(&proto->dr_ver); + pv_out(&proto->kr_ver); +} + +/* go through member list saved in last confchg, see if we have received a + proto message from each */ + +static int all_protocol_messages(void) +{ + struct node *node; + int i; + + if (!daemon_member_count) + return 0; + + for (i = 0; i < daemon_member_count; i++) { + node = get_node_daemon(daemon_member[i].nodeid); + if (!node) { + log_error("all_protocol_messages no node %d", + daemon_member[i].nodeid); + return 0; + } + + if (!node->proto.daemon_max[0]) + return 0; + } + return 1; +} + +static int pick_min_protocol(struct protocol *proto) +{ + uint16_t mind[4]; + uint16_t mink[4]; + struct node *node; + int i; + + memset(&mind, 0, sizeof(mind)); + memset(&mink, 0, sizeof(mink)); + + /* first choose the minimum major */ + + for (i = 0; i < daemon_member_count; i++) { + node = get_node_daemon(daemon_member[i].nodeid); + if (!node) { + log_error("pick_min_protocol no node %d", + daemon_member[i].nodeid); + return -1; + } + + if (!mind[0] || node->proto.daemon_max[0] < mind[0]) + mind[0] = node->proto.daemon_max[0]; + + if (!mink[0] || node->proto.kernel_max[0] < mink[0]) + mink[0] = node->proto.kernel_max[0]; + } + + if (!mind[0] || !mink[0]) { + log_error("pick_min_protocol zero major number"); + return -1; + } + + /* second pick the minimum minor with the chosen major */ + + for (i = 0; i < daemon_member_count; i++) { + node = get_node_daemon(daemon_member[i].nodeid); + if (!node) + continue; + + if (mind[0] == node->proto.daemon_max[0]) { + if (!mind[1] || node->proto.daemon_max[1] < mind[1]) + mind[1] = node->proto.daemon_max[1]; + } + + if (mink[0] == node->proto.kernel_max[0]) { + if (!mink[1] || node->proto.kernel_max[1] < mink[1]) + mink[1] = node->proto.kernel_max[1]; + } + } + + if (!mind[1] || !mink[1]) { + log_error("pick_min_protocol zero minor number"); + return -1; + } + + /* third pick the minimum patch with the chosen major.minor */ + + for (i = 0; i < daemon_member_count; i++) { + node = get_node_daemon(daemon_member[i].nodeid); + if (!node) + continue; + + if (mind[0] == node->proto.daemon_max[0] && + mind[1] == node->proto.daemon_max[1]) { + if (!mind[2] || node->proto.daemon_max[2] < mind[2]) + mind[2] = node->proto.daemon_max[2]; + } + + if (mink[0] == node->proto.kernel_max[0] && + mink[1] == node->proto.kernel_max[1]) { + if (!mink[2] || node->proto.kernel_max[2] < mink[2]) + mink[2] = node->proto.kernel_max[2]; + } + } + + if (!mind[2] || !mink[2]) { + log_error("pick_min_protocol zero patch number"); + return -1; + } + + memcpy(&proto->daemon_run, &mind, sizeof(mind)); + memcpy(&proto->kernel_run, &mink, sizeof(mink)); + return 0; +} + +static void receive_protocol(struct dlm_header *hd, int len) +{ + struct protocol *p; + struct node *node; + + p = (struct protocol *)((char *)hd + sizeof(struct dlm_header)); + protocol_in(p); + + if (len < sizeof(struct dlm_header) + sizeof(struct protocol)) { + log_error("receive_protocol invalid len %d from %d", + len, hd->nodeid); + return; + } + + /* zero is an invalid version value */ + + if (!p->daemon_max[0] || !p->daemon_max[1] || !p->daemon_max[2] || + !p->kernel_max[0] || !p->kernel_max[1] || !p->kernel_max[2]) { + log_error("receive_protocol invalid max value from %d " + "daemon %u.%u.%u kernel %u.%u.%u", hd->nodeid, + p->daemon_max[0], p->daemon_max[1], p->daemon_max[2], + p->kernel_max[0], p->kernel_max[1], p->kernel_max[2]); + return; + } + + /* the run values will be zero until a version is set, after + which none of the run values can be zero */ + + if (p->daemon_run[0] && (!p->daemon_run[1] || !p->daemon_run[2] || + !p->kernel_run[0] || !p->kernel_run[1] || !p->kernel_run[2])) { + log_error("receive_protocol invalid run value from %d " + "daemon %u.%u.%u kernel %u.%u.%u", hd->nodeid, + p->daemon_run[0], p->daemon_run[1], p->daemon_run[2], + p->kernel_run[0], p->kernel_run[1], p->kernel_run[2]); + return; + } + + /* if we have zero run values, and this msg has non-zero run values, + then adopt them as ours; otherwise save this proto message */ + + if (our_protocol.daemon_run[0]) + return; + + if (p->daemon_run[0]) { + memcpy(&our_protocol.daemon_run, &p->daemon_run, + sizeof(struct protocol_version)); + memcpy(&our_protocol.kernel_run, &p->kernel_run, + sizeof(struct protocol_version)); + log_debug("run protocol from nodeid %d", hd->nodeid); + return; + } + + /* save this node's proto so we can tell when we've got all, and + use it to select a minimum protocol from all */ + + node = get_node_daemon(hd->nodeid); + if (!node) { + log_error("receive_protocol no node %d", hd->nodeid); + return; + } + memcpy(&node->proto, p, sizeof(struct protocol)); +} + +static void send_protocol(struct protocol *proto) +{ + struct dlm_header *hd; + struct protocol *pr; + char *buf; + int len; + + len = sizeof(struct dlm_header) + sizeof(struct protocol); + buf = malloc(len); + if (!buf) { + log_error("send_protocol no mem %d", len); + return; + } + memset(buf, 0, len); + + hd = (struct dlm_header *)buf; + pr = (struct protocol *)(buf + sizeof(*hd)); + + hd->type = cpu_to_le16(DLM_MSG_PROTOCOL); + hd->nodeid = cpu_to_le32(our_nodeid); + + memcpy(pr, proto, sizeof(struct protocol)); + protocol_out(pr); + + _send_message(daemon_cpg_handle, buf, len, DLM_MSG_PROTOCOL); +} + +int set_protocol(void) +{ + struct protocol proto; + struct pollfd pollfd; + int sent_proposal = 0; + int rv; + + memset(&pollfd, 0, sizeof(pollfd)); + pollfd.fd = daemon_cpg_fd; + pollfd.events = POLLIN; + + while (1) { + if (our_protocol.daemon_run[0]) + break; + + if (!sent_proposal && all_protocol_messages()) { + /* propose a protocol; look through info from all + nodes and pick the min for both daemon and kernel, + and propose that */ + + sent_proposal = 1; + + /* copy our max values */ + memcpy(&proto, &our_protocol, sizeof(struct protocol)); + + rv = pick_min_protocol(&proto); + if (rv < 0) + return rv; + + log_debug("set_protocol member_count %d propose " + "daemon %u.%u.%u kernel %u.%u.%u", + daemon_member_count, + proto.daemon_run[0], proto.daemon_run[1], + proto.daemon_run[2], proto.kernel_run[0], + proto.kernel_run[1], proto.kernel_run[2]); + + send_protocol(&proto); + } + + /* only process messages/events from daemon cpg until protocol + is established */ + + rv = poll(&pollfd, 1, -1); + if (rv == -1 && errno == EINTR) { + if (daemon_quit) + return -1; + continue; + } + if (rv < 0) { + log_error("set_protocol poll errno %d", errno); + return -1; + } + + if (pollfd.revents & POLLIN) + process_cpg(0); + if (pollfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { + log_error("set_protocol poll revents %u", + pollfd.revents); + return -1; + } + } + + if (our_protocol.daemon_run[0] != our_protocol.daemon_max[0] || + our_protocol.daemon_run[1] > our_protocol.daemon_max[1]) { + log_error("incompatible daemon protocol run %u.%u.%u max %u.%u.%u", + our_protocol.daemon_run[0], + our_protocol.daemon_run[1], + our_protocol.daemon_run[2], + our_protocol.daemon_max[0], + our_protocol.daemon_max[1], + our_protocol.daemon_max[2]); + return -1; + } + + if (our_protocol.kernel_run[0] != our_protocol.kernel_max[0] || + our_protocol.kernel_run[1] > our_protocol.kernel_max[1]) { + log_error("incompatible kernel protocol run %u.%u.%u max %u.%u.%u", + our_protocol.kernel_run[0], + our_protocol.kernel_run[1], + our_protocol.kernel_run[2], + our_protocol.kernel_max[0], + our_protocol.kernel_max[1], + our_protocol.kernel_max[2]); + return -1; + } + + log_debug("daemon run %u.%u.%u max %u.%u.%u " + "kernel run %u.%u.%u max %u.%u.%u", + our_protocol.daemon_run[0], + our_protocol.daemon_run[1], + our_protocol.daemon_run[2], + our_protocol.daemon_max[0], + our_protocol.daemon_max[1], + our_protocol.daemon_max[2], + our_protocol.kernel_run[0], + our_protocol.kernel_run[1], + our_protocol.kernel_run[2], + our_protocol.kernel_max[0], + our_protocol.kernel_max[1], + our_protocol.kernel_max[2]); + return 0; +} + +static void deliver_cb_daemon(cpg_handle_t handle, struct cpg_name *group_name, + uint32_t nodeid, uint32_t pid, void *data, int len) +{ + struct dlm_header *hd; + + if (len < sizeof(*hd)) { + log_error("deliver_cb short message %d", len); + return; + } + + hd = (struct dlm_header *)data; + dlm_header_in(hd); + + switch (hd->type) { + case DLM_MSG_PROTOCOL: + receive_protocol(hd, len); + break; + default: + log_error("deliver_cb_daemon unknown msg type %d", hd->type); + } +} + +static void confchg_cb_daemon(cpg_handle_t handle, struct cpg_name *group_name, + struct cpg_address *member_list, int member_list_entries, + struct cpg_address *left_list, int left_list_entries, + struct cpg_address *joined_list, int joined_list_entries) +{ + int i; + + if (joined_list_entries) + send_protocol(&our_protocol); + + memset(&daemon_member, 0, sizeof(daemon_member)); + daemon_member_count = member_list_entries; + + for (i = 0; i < member_list_entries; i++) { + daemon_member[i] = member_list[i]; + add_node_daemon(member_list[i].nodeid); + } +} + +static cpg_callbacks_t cpg_callbacks_daemon = { + .cpg_deliver_fn = deliver_cb_daemon, + .cpg_confchg_fn = confchg_cb_daemon, +}; + +void process_cpg(int ci) +{ + cpg_error_t error; + + error = cpg_dispatch(daemon_cpg_handle, CPG_DISPATCH_ALL); + if (error != CPG_OK) + log_error("daemon cpg_dispatch error %d", error); +} + int setup_cpg(void) { cpg_error_t error; + struct cpg_name name; + int i = 0; - error = cpg_initialize(&daemon_handle, &cpg_callbacks); + INIT_LIST_HEAD(&daemon_nodes); + + memset(&our_protocol, 0, sizeof(our_protocol)); + our_protocol.daemon_max[0] = 1; + our_protocol.daemon_max[1] = 1; + our_protocol.daemon_max[2] = 1; + our_protocol.kernel_max[0] = 1; + our_protocol.kernel_max[1] = 1; + our_protocol.kernel_max[2] = 1; + + error = cpg_initialize(&daemon_cpg_handle, &cpg_callbacks_daemon); if (error != CPG_OK) { - log_error("setup_cpg cpg_initialize error %d", error); + log_error("daemon cpg_initialize error %d", error); return -1; } - /* join "dlm_controld" cpg to interact with other daemons in - the cluster before we start processing uevents? Could this - also help in handling transient partitions? */ + cpg_fd_get(daemon_cpg_handle, &daemon_cpg_fd); - return 0; + memset(&name, 0, sizeof(name)); + sprintf(name.value, "dlm:controld"); + name.length = strlen(name.value) + 1; + + retry: + error = cpg_join(daemon_cpg_handle, &name); + if (error == CPG_ERR_TRY_AGAIN) { + sleep(1); + if (!(++i % 10)) + log_error("daemon cpg_join error retrying"); + goto retry; + } + if (error != CPG_OK) { + log_error("daemon cpg_join error %d", error); + goto fail; + } + + log_debug("setup_cpg %d", daemon_cpg_fd); + return daemon_cpg_fd; + + fail: + cpg_finalize(daemon_cpg_handle); + return -1; } /* fs_controld has seen nodedown for nodeid; it's now ok for dlm to do diff --git a/group/dlm_controld/dlm_daemon.h b/group/dlm_controld/dlm_daemon.h index f1f4c29..c704b99 100644 --- a/group/dlm_controld/dlm_daemon.h +++ b/group/dlm_controld/dlm_daemon.h @@ -127,7 +127,8 @@ do { \ /* dlm_header types */ enum { - DLM_MSG_START = 1, + DLM_MSG_PROTOCOL = 1, + DLM_MSG_START, DLM_MSG_PLOCK, DLM_MSG_PLOCK_OWN, DLM_MSG_PLOCK_DROP, @@ -234,6 +235,8 @@ int get_weight(int nodeid, char *lockspace); /* cpg.c */ int setup_cpg(void); +void process_cpg(int ci); +int set_protocol(void); void process_lockspace_changes(void); void dlm_send_message(struct lockspace *ls, char *buf, int len); int dlm_join_lockspace(struct lockspace *ls); diff --git a/group/dlm_controld/main.c b/group/dlm_controld/main.c index 57feb0e..b0eba12 100644 --- a/group/dlm_controld/main.c +++ b/group/dlm_controld/main.c @@ -926,7 +926,11 @@ static void loop(void) rv = setup_cpg(); if (rv < 0) goto out; - /* client_add(rv, process_cpg, cluster_dead); */ + client_add(rv, process_cpg, cluster_dead); + + rv = set_protocol(); + if (rv < 0) + goto out; if (cfgd_enable_deadlk) { rv = setup_netlink(); diff --git a/group/gfs_controld/cpg-new.c b/group/gfs_controld/cpg-new.c index 1f0a6ec..d7bc4ed 100644 --- a/group/gfs_controld/cpg-new.c +++ b/group/gfs_controld/cpg-new.c @@ -3082,7 +3082,7 @@ int set_protocol(void) } if (our_protocol.daemon_run[0] != our_protocol.daemon_max[0] || - our_protocol.daemon_run[1] != our_protocol.daemon_max[1]) { + our_protocol.daemon_run[1] > our_protocol.daemon_max[1]) { log_error("incompatible daemon protocol run %u.%u.%u max %u.%u.%u", our_protocol.daemon_run[0], our_protocol.daemon_run[1], @@ -3094,7 +3094,7 @@ int set_protocol(void) } if (our_protocol.kernel_run[0] != our_protocol.kernel_max[0] || - our_protocol.kernel_run[1] != our_protocol.kernel_max[1]) { + our_protocol.kernel_run[1] > our_protocol.kernel_max[1]) { log_error("incompatible kernel protocol run %u.%u.%u max %u.%u.%u", our_protocol.kernel_run[0], our_protocol.kernel_run[1],