abeekhof@suse.de
2006-Dec-04 05:12 UTC
[Ocfs2-devel] [patch 1/3] ocfs2-expose-o2nm_cluster.patch
From: Andrew Beekhof <abeekhof@suse.de> Subject: [patch 1/3] OCFS2 - Expose struct o2nm_cluster Subsequent patches (namely userspace heartbeat and configurable timeouts) require access to the o2nm_cluster struct. This patch does the necessary shuffling. Signed-off-by: Andrew Beekhof <abeekhof@suse.de> --- fs/ocfs2/cluster/nodemanager.c | 13 +------------ fs/ocfs2/cluster/nodemanager.h | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 12 deletions(-) Index: fs/ocfs2/cluster/nodemanager.c ==================================================================--- fs/ocfs2/cluster/nodemanager.c.orig 2006-11-15 11:58:34.000000000 +0100 +++ fs/ocfs2/cluster/nodemanager.c 2006-11-15 12:02:57.000000000 +0100 @@ -35,7 +35,7 @@ /* for now we operate under the assertion that there can be only one * cluster active at a time. Changing this will require trickling * cluster references throughout where nodes are looked up */ -static struct o2nm_cluster *o2nm_single_cluster = NULL; +struct o2nm_cluster *o2nm_single_cluster = NULL; #define OCFS2_MAX_HB_CTL_PATH 256 static char ocfs2_hb_ctl_path[OCFS2_MAX_HB_CTL_PATH] = "/sbin/ocfs2_hb_ctl"; @@ -97,17 +97,6 @@ const char *o2nm_get_hb_ctl_path(void) } EXPORT_SYMBOL_GPL(o2nm_get_hb_ctl_path); -struct o2nm_cluster { - struct config_group cl_group; - unsigned cl_has_local:1; - u8 cl_local_node; - rwlock_t cl_nodes_lock; - struct o2nm_node *cl_nodes[O2NM_MAX_NODES]; - struct rb_root cl_node_ip_tree; - /* this bitmap is part of a hack for disk bitmap.. will go eventually. 
- zab */ - unsigned long cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; -}; - struct o2nm_node *o2nm_get_node_by_num(u8 node_num) { struct o2nm_node *node = NULL; Index: fs/ocfs2/cluster/nodemanager.h ==================================================================--- fs/ocfs2/cluster/nodemanager.h.orig 2006-11-15 12:01:55.000000000 +0100 +++ fs/ocfs2/cluster/nodemanager.h 2006-11-15 12:03:25.000000000 +0100 @@ -53,6 +53,20 @@ struct o2nm_node { unsigned long nd_set_attributes; }; +struct o2nm_cluster { + struct config_group cl_group; + unsigned cl_has_local:1; + u8 cl_local_node; + rwlock_t cl_nodes_lock; + struct o2nm_node *cl_nodes[O2NM_MAX_NODES]; + struct rb_root cl_node_ip_tree; + + /* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */ + unsigned long cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; +}; + +extern struct o2nm_cluster *o2nm_single_cluster; + u8 o2nm_this_node(void); int o2nm_configured_node_map(unsigned long *map, unsigned bytes); --
abeekhof@suse.de
2006-Dec-04 05:12 UTC
[Ocfs2-devel] [patch 0/3] OCFS Configurable timeouts - Revision 5
Minor improvements to the handshake logic - Ensure failed handshakes are only processed once - Skip the o2net_msg code after a successful handshake Do we need logic to handle inbound messages that arrive before a valid handshake? Or does this not happen? -- Andrew
abeekhof@suse.de
2006-Dec-04 05:12 UTC
[Ocfs2-devel] [patch 2/3] ocfs2-configurable-timeout.patch
From: Jeff Mahoney <jeffm@suse.de> Subject: [patch 2/3] OCFS2 Configurable timeouts Allow configuration of OCFS2 timeouts from userspace via configfs Signed-off-by: Andrew Beekhof <abeekhof@suse.de> --- fs/ocfs2/cluster/nodemanager.c | 161 ++++++++++++++++++++++++++++++++++++++++ fs/ocfs2/cluster/nodemanager.h | 3 fs/ocfs2/cluster/tcp.c | 60 +++++++++++--- fs/ocfs2/cluster/tcp.h | 7 + fs/ocfs2/cluster/tcp_internal.h | 6 - 5 files changed, 219 insertions(+), 18 deletions(-) Index: fs/ocfs2/cluster/nodemanager.c ==================================================================--- fs/ocfs2/cluster/nodemanager.c.orig 2006-11-30 18:49:35.000000000 +0100 +++ fs/ocfs2/cluster/nodemanager.c 2006-11-30 18:56:02.000000000 +0100 @@ -532,6 +532,161 @@ static struct o2nm_node_group *to_o2nm_n } #endif +struct o2nm_cluster_attribute { + struct configfs_attribute attr; + ssize_t (*show)(struct o2nm_cluster *, char *); + ssize_t (*store)(struct o2nm_cluster *, const char *, size_t); +}; + +static ssize_t o2nm_cluster_attr_write(const char *page, ssize_t count, + unsigned int *val) +{ + unsigned long tmp; + char *p = (char *)page; + + tmp = simple_strtoul(p, &p, 0); + if (!p || (*p && (*p != '\n'))) + return -EINVAL; + + if (tmp == 0) + return -EINVAL; + if (tmp >= (u32)-1) + return -ERANGE; + + *val = tmp; + + return count; +} + +static ssize_t o2nm_cluster_attr_idle_timeout_ms_read( + struct o2nm_cluster *cluster, char *page) +{ + return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms); +} + +static ssize_t o2nm_cluster_attr_idle_timeout_ms_write( + struct o2nm_cluster *cluster, const char *page, size_t count) +{ + ssize_t ret; + unsigned int val; + + ret = o2nm_cluster_attr_write(page, count, &val); + + if (ret > 0) { + if (val <= cluster->cl_keepalive_delay_ms) { + mlog(ML_NOTICE, "o2net: idle timeout must be larger " + "than keepalive delay\n"); + return -EINVAL; + } + cluster->cl_idle_timeout_ms = val; + } + + return ret; +} + +static ssize_t 
o2nm_cluster_attr_keepalive_delay_ms_read( + struct o2nm_cluster *cluster, char *page) +{ + return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms); +} + +static ssize_t o2nm_cluster_attr_keepalive_delay_ms_write( + struct o2nm_cluster *cluster, const char *page, size_t count) +{ + ssize_t ret; + unsigned int val; + + ret = o2nm_cluster_attr_write(page, count, &val); + + if (ret > 0) { + if (val >= cluster->cl_idle_timeout_ms) { + mlog(ML_NOTICE, "o2net: keepalive delay must be " + "smaller than idle timeout\n"); + return -EINVAL; + } + cluster->cl_keepalive_delay_ms = val; + } + + return ret; +} + +static ssize_t o2nm_cluster_attr_reconnect_delay_ms_read( + struct o2nm_cluster *cluster, char *page) +{ + return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms); +} + +static ssize_t o2nm_cluster_attr_reconnect_delay_ms_write( + struct o2nm_cluster *cluster, const char *page, size_t count) +{ + return o2nm_cluster_attr_write(page, count, + &cluster->cl_reconnect_delay_ms); +} +static struct o2nm_cluster_attribute o2nm_cluster_attr_idle_timeout_ms = { + .attr = { .ca_owner = THIS_MODULE, + .ca_name = "idle_timeout_ms", + .ca_mode = S_IRUGO | S_IWUSR }, + .show = o2nm_cluster_attr_idle_timeout_ms_read, + .store = o2nm_cluster_attr_idle_timeout_ms_write, +}; + +static struct o2nm_cluster_attribute o2nm_cluster_attr_keepalive_delay_ms = { + .attr = { .ca_owner = THIS_MODULE, + .ca_name = "keepalive_delay_ms", + .ca_mode = S_IRUGO | S_IWUSR }, + .show = o2nm_cluster_attr_keepalive_delay_ms_read, + .store = o2nm_cluster_attr_keepalive_delay_ms_write, +}; + +static struct o2nm_cluster_attribute o2nm_cluster_attr_reconnect_delay_ms = { + .attr = { .ca_owner = THIS_MODULE, + .ca_name = "reconnect_delay_ms", + .ca_mode = S_IRUGO | S_IWUSR }, + .show = o2nm_cluster_attr_reconnect_delay_ms_read, + .store = o2nm_cluster_attr_reconnect_delay_ms_write, +}; + +static struct configfs_attribute *o2nm_cluster_attrs[] = { + &o2nm_cluster_attr_idle_timeout_ms.attr, + 
&o2nm_cluster_attr_keepalive_delay_ms.attr, + &o2nm_cluster_attr_reconnect_delay_ms.attr, + NULL, +}; +static ssize_t o2nm_cluster_show(struct config_item *item, + struct configfs_attribute *attr, + char *page) +{ + struct o2nm_cluster *cluster = to_o2nm_cluster(item); + struct o2nm_cluster_attribute *o2nm_cluster_attr = + container_of(attr, struct o2nm_cluster_attribute, attr); + ssize_t ret = 0; + + if (o2nm_cluster_attr->show) + ret = o2nm_cluster_attr->show(cluster, page); + return ret; +} + +static ssize_t o2nm_cluster_store(struct config_item *item, + struct configfs_attribute *attr, + const char *page, size_t count) +{ + struct o2nm_cluster *cluster = to_o2nm_cluster(item); + struct o2nm_cluster_attribute *o2nm_cluster_attr = + container_of(attr, struct o2nm_cluster_attribute, attr); + ssize_t ret; + + if (o2nm_cluster_attr->store == NULL) { + ret = -EINVAL; + goto out; + } + + ret = o2nm_cluster_attr->store(cluster, page, count); + if (ret < count) + goto out; +out: + return ret; +} + static struct config_item *o2nm_node_group_make_item(struct config_group *group, const char *name) { @@ -613,10 +768,13 @@ static void o2nm_cluster_release(struct static struct configfs_item_operations o2nm_cluster_item_ops = { .release = o2nm_cluster_release, + .show_attribute = o2nm_cluster_show, + .store_attribute = o2nm_cluster_store, }; static struct config_item_type o2nm_cluster_type = { .ct_item_ops = &o2nm_cluster_item_ops, + .ct_attrs = o2nm_cluster_attrs, .ct_owner = THIS_MODULE, }; @@ -667,6 +825,9 @@ static struct config_group *o2nm_cluster cluster->cl_group.default_groups[2] = NULL; rwlock_init(&cluster->cl_nodes_lock); cluster->cl_node_ip_tree = RB_ROOT; + cluster->cl_reconnect_delay_ms = O2NET_RECONNECT_DELAY_MS_DEFAULT; + cluster->cl_idle_timeout_ms = O2NET_IDLE_TIMEOUT_MS_DEFAULT; + cluster->cl_keepalive_delay_ms = O2NET_KEEPALIVE_DELAY_MS_DEFAULT; ret = &cluster->cl_group; o2nm_single_cluster = cluster; Index: fs/ocfs2/cluster/nodemanager.h
==================================================================--- fs/ocfs2/cluster/nodemanager.h.orig 2006-11-30 18:49:35.000000000 +0100 +++ fs/ocfs2/cluster/nodemanager.h 2006-11-30 18:49:37.000000000 +0100 @@ -60,6 +60,9 @@ struct o2nm_cluster { rwlock_t cl_nodes_lock; struct o2nm_node *cl_nodes[O2NM_MAX_NODES]; struct rb_root cl_node_ip_tree; + unsigned int cl_idle_timeout_ms; + unsigned int cl_keepalive_delay_ms; + unsigned int cl_reconnect_delay_ms; /* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */ unsigned long cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; Index: fs/ocfs2/cluster/tcp.c ==================================================================--- fs/ocfs2/cluster/tcp.c.orig 2006-11-30 14:54:10.000000000 +0100 +++ fs/ocfs2/cluster/tcp.c 2006-11-30 18:53:24.000000000 +0100 @@ -147,6 +147,28 @@ static void o2net_listen_data_ready(stru static void o2net_sc_send_keep_req(void *arg); static void o2net_idle_timer(unsigned long data); static void o2net_sc_postpone_idle(struct o2net_sock_container *sc); +static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc); + +/* + * FIXME: These should use to_o2nm_cluster_from_node(), but we end up + * losing our parent link to the cluster during shutdown. This can be + * solved by adding a pre-removal callback to configfs, or passing + * around the cluster with the node. 
-jeffm + */ +static inline int o2net_reconnect_delay(struct o2nm_node *node) +{ + return o2nm_single_cluster->cl_reconnect_delay_ms; +} + +static inline int o2net_keepalive_delay(struct o2nm_node *node) +{ + return o2nm_single_cluster->cl_keepalive_delay_ms; +} + +static inline int o2net_idle_timeout(struct o2nm_node *node) +{ + return o2nm_single_cluster->cl_idle_timeout_ms; +} static inline int o2net_sys_err_to_errno(enum o2net_system_error err) { @@ -271,6 +293,8 @@ static void sc_kref_release(struct kref { struct o2net_sock_container *sc = container_of(kref, struct o2net_sock_container, sc_kref); + BUG_ON(timer_pending(&sc->sc_idle_timeout)); + sclog(sc, "releasing\n"); if (sc->sc_sock) { @@ -424,9 +448,9 @@ static void o2net_set_nn_state(struct o2 /* delay if we're withing a RECONNECT_DELAY of the * last attempt */ delay = (nn->nn_last_connect_attempt + - msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS)) + msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node))) - jiffies; - if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS)) + if (delay > msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node))) delay = 0; mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay); queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay); @@ -1103,7 +1127,7 @@ static int o2net_check_handshake(struct /* set valid and queue the idle timers only if it hasn't been * shut down already */ if (nn->nn_sc == sc) { - o2net_sc_postpone_idle(sc); + o2net_sc_reset_idle_timer(sc); o2net_set_nn_state(nn, sc, 1, 0); } spin_unlock(&nn->nn_lock); @@ -1280,8 +1304,10 @@ static void o2net_idle_timer(unsigned lo do_gettimeofday(&now); - printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for 10 " - "seconds, shutting it down.\n", SC_NODEF_ARGS(sc)); + printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for %u.%u " + "seconds, shutting it down.\n", SC_NODEF_ARGS(sc), + o2net_idle_timeout(sc->sc_node) / 1000, + o2net_idle_timeout(sc->sc_node) % 1000); mlog(ML_NOTICE, 
"here are some times that might help debug the " "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv " "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n", @@ -1299,14 +1325,21 @@ static void o2net_idle_timer(unsigned lo o2net_sc_queue_work(sc, &sc->sc_shutdown_work); } -static void o2net_sc_postpone_idle(struct o2net_sock_container *sc) +static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc) { o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work); o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work, - O2NET_KEEPALIVE_DELAY_SECS * HZ); + msecs_to_jiffies(o2net_keepalive_delay(sc->sc_node))); do_gettimeofday(&sc->sc_tv_timer); mod_timer(&sc->sc_idle_timeout, - jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ)); + jiffies + msecs_to_jiffies(o2net_idle_timeout(sc->sc_node))); +} + +static void o2net_sc_postpone_idle(struct o2net_sock_container *sc) +{ + /* Only push out an existing timer */ + if (timer_pending(&sc->sc_idle_timeout)) + o2net_sc_reset_idle_timer(sc); } /* this work func is kicked whenever a path sets the nn state which doesn't @@ -1427,9 +1460,12 @@ static void o2net_connect_expired(void * spin_lock(&nn->nn_lock); if (!nn->nn_sc_valid) { + struct o2nm_node *node = nn->nn_sc->sc_node; mlog(ML_ERROR, "no connection established with node %u after " - "%u seconds, giving up and returning errors.\n", - o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS); + "%u.%u seconds, giving up and returning errors.\n", + o2net_num_from_nn(nn), + o2net_idle_timeout(node) / 1000, + o2net_idle_timeout(node) % 1000); o2net_set_nn_state(nn, NULL, 0, -ENOTCONN); } @@ -1480,14 +1516,14 @@ static void o2net_hb_node_up_cb(struct o /* ensure an immediate connect attempt */ nn->nn_last_connect_attempt = jiffies - - (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1); + (msecs_to_jiffies(o2net_reconnect_delay(node)) + 1); if (node_num != o2nm_this_node()) { /* heartbeat doesn't work unless a local node number is * configured and doing so brings up the o2net_wq, so we can * 
use it.. */ queue_delayed_work(o2net_wq, &nn->nn_connect_expired, - O2NET_IDLE_TIMEOUT_SECS * HZ); + msecs_to_jiffies(o2net_idle_timeout(node))); /* believe it or not, accept and node hearbeating testing * can succeed for this node before we got here.. so Index: fs/ocfs2/cluster/tcp.h ==================================================================--- fs/ocfs2/cluster/tcp.h.orig 2006-11-30 14:54:10.000000000 +0100 +++ fs/ocfs2/cluster/tcp.h 2006-11-30 18:56:45.000000000 +0100 @@ -54,6 +54,13 @@ typedef int (o2net_msg_handler_func)(str #define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg)) +/* same as hb delay, we're waiting for another node to recognize our hb */ +#define O2NET_RECONNECT_DELAY_MS_DEFAULT 2000 + +#define O2NET_KEEPALIVE_DELAY_MS_DEFAULT 5000 +#define O2NET_IDLE_TIMEOUT_MS_DEFAULT 10000 + + /* TODO: figure this out.... */ static inline int o2net_link_down(int err, struct socket *sock) { Index: fs/ocfs2/cluster/tcp_internal.h ==================================================================--- fs/ocfs2/cluster/tcp_internal.h.orig 2006-11-30 14:54:10.000000000 +0100 +++ fs/ocfs2/cluster/tcp_internal.h 2006-11-30 18:53:24.000000000 +0100 @@ -27,17 +27,11 @@ #define O2NET_MSG_KEEP_REQ_MAGIC ((u16)0xfa57) #define O2NET_MSG_KEEP_RESP_MAGIC ((u16)0xfa58) -/* same as hb delay, we're waiting for another node to recognize our hb */ -#define O2NET_RECONNECT_DELAY_MS O2HB_REGION_TIMEOUT_MS - /* we're delaying our quorum decision so that heartbeat will have timed * out truly dead nodes by the time we come around to making decisions * on their number */ #define O2NET_QUORUM_DELAY_MS ((o2hb_dead_threshold + 2) * O2HB_REGION_TIMEOUT_MS) -#define O2NET_KEEPALIVE_DELAY_SECS 5 -#define O2NET_IDLE_TIMEOUT_SECS 10 - /* * This version number represents quite a lot, unfortunately. It not * only represents the raw network message protocol on the wire but also --
abeekhof@suse.de
2006-Dec-04 05:12 UTC
[Ocfs2-devel] [patch 3/3] ocfs2-timeout-protocol.patch
From: Andrew Beekhof <abeekhof@suse.de> Subject: [patch 3/3] OCFS2 Configurable timeouts - Protocol changes Modify the OCFS2 handshake to ensure essential timeouts are configured identically on all nodes. Only allow changes when there are no connected peers. Improves the logic in o2net_advance_rx() which broke now that sizeof(struct o2net_handshake) is greater than sizeof(struct o2net_msg). Included is the field for userspace-heartbeat timeout to avoid the need for further protocol changes. Uses a global spinlock to ensure the decisions to update configfs entries are made on the correct value. The region covered by the spinlock when incrementing the counter is much larger as this is the more critical case. Signed-off-by: Andrew Beekhof <abeekhof@suse.de> --- fs/ocfs2/cluster/nodemanager.c | 19 +++++++ fs/ocfs2/cluster/tcp.c | 96 +++++++++++++++++++++++++++++++++++----- fs/ocfs2/cluster/tcp.h | 1 fs/ocfs2/cluster/nodemanager.c | 30 ++++++++++-- fs/ocfs2/cluster/tcp.c | 94 ++++++++++++++++++++++++++++++++++++---- fs/ocfs2/cluster/tcp.h | 1 fs/ocfs2/cluster/tcp_internal.h | 6 ++ 4 files changed, 115 insertions(+), 16 deletions(-) Index: fs/ocfs2/cluster/nodemanager.c ==================================================================--- fs/ocfs2/cluster/nodemanager.c.orig 2006-12-04 12:07:32.000000000 +0100 +++ fs/ocfs2/cluster/nodemanager.c 2006-12-04 12:07:32.000000000 +0100 @@ -573,12 +573,21 @@ static ssize_t o2nm_cluster_attr_idle_ti ret = o2nm_cluster_attr_write(page, count, &val); if (ret > 0) { - if (val <= cluster->cl_keepalive_delay_ms) { + if (cluster->cl_idle_timeout_ms != val + && o2net_num_connected_peers()) { + mlog(ML_NOTICE, + "o2net: cannot change idle timeout after " + "the first peer has agreed to it."
+ " %d connected peers\n", + o2net_num_connected_peers()); + ret = -EINVAL; + } else if (val <= cluster->cl_keepalive_delay_ms) { mlog(ML_NOTICE, "o2net: idle timeout must be larger " "than keepalive delay\n"); - return -EINVAL; + ret = -EINVAL; + } else { + cluster->cl_idle_timeout_ms = val; } - cluster->cl_idle_timeout_ms = val; } return ret; @@ -599,12 +608,21 @@ static ssize_t o2nm_cluster_attr_keepali ret = o2nm_cluster_attr_write(page, count, &val); if (ret > 0) { - if (val >= cluster->cl_idle_timeout_ms) { + if (cluster->cl_keepalive_delay_ms != val + && o2net_num_connected_peers()) { + mlog(ML_NOTICE, + "o2net: cannot change keepalive delay after" + " the first peer has agreed to it." + " %d connected peers\n", + o2net_num_connected_peers()); + ret = -EINVAL; + } else if (val >= cluster->cl_idle_timeout_ms) { mlog(ML_NOTICE, "o2net: keepalive delay must be " "smaller than idle timeout\n"); - return -EINVAL; + ret = -EINVAL; + } else { + cluster->cl_keepalive_delay_ms = val; } - cluster->cl_keepalive_delay_ms = val; } return ret; Index: fs/ocfs2/cluster/tcp.c ==================================================================--- fs/ocfs2/cluster/tcp.c.orig 2006-12-04 12:07:32.000000000 +0100 +++ fs/ocfs2/cluster/tcp.c 2006-12-04 13:59:41.000000000 +0100 @@ -380,6 +380,13 @@ static void o2net_sc_cancel_delayed_work sc_put(sc); } +atomic_t o2net_connected_peers = ATOMIC_INIT(0); + +int o2net_num_connected_peers(void) +{ + return atomic_read(&o2net_connected_peers); +} + static void o2net_set_nn_state(struct o2net_node *nn, struct o2net_sock_container *sc, unsigned valid, int err) @@ -390,6 +397,13 @@ static void o2net_set_nn_state(struct o2 assert_spin_locked(&nn->nn_lock); + if (old_sc != sc) { + if (old_sc) + atomic_dec(&o2net_connected_peers); + else + atomic_inc(&o2net_connected_peers); + } + /* the node num comparison and single connect/accept path should stop * an non-null sc from being overwritten with another */ BUG_ON(sc && nn->nn_sc && nn->nn_sc != 
sc); @@ -1121,6 +1135,44 @@ static int o2net_check_handshake(struct return -1; } + /* + * Ensure timeouts are consistent with other nodes, otherwise + * we can end up with one node thinking that the other must be down, + * but isn't. This can ultimately cause corruption. + */ + if (be32_to_cpu(hand->o2net_idle_timeout_ms) != + o2net_idle_timeout(sc->sc_node)) { + mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of " + "%u ms, but we use %u ms locally. disconnecting\n", + SC_NODEF_ARGS(sc), + be32_to_cpu(hand->o2net_idle_timeout_ms), + o2net_idle_timeout(sc->sc_node)); + o2net_ensure_shutdown(nn, sc, -ENOTCONN); + return -1; + } + + if (be32_to_cpu(hand->o2net_keepalive_delay_ms) != + o2net_keepalive_delay(sc->sc_node)) { + mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of " + "%u ms, but we use %u ms locally. disconnecting\n", + SC_NODEF_ARGS(sc), + be32_to_cpu(hand->o2net_keepalive_delay_ms), + o2net_keepalive_delay(sc->sc_node)); + o2net_ensure_shutdown(nn, sc, -ENOTCONN); + return -1; + } + + if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) != + O2HB_MAX_WRITE_TIMEOUT_MS) { + mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of " + "%u ms, but we use %u ms locally.
disconnecting\n", + SC_NODEF_ARGS(sc), + be32_to_cpu(hand->o2hb_heartbeat_timeout_ms), + O2HB_MAX_WRITE_TIMEOUT_MS); + o2net_ensure_shutdown(nn, sc, -ENOTCONN); + return -1; + } + sc->sc_handshake_ok = 1; spin_lock(&nn->nn_lock); @@ -1153,6 +1205,23 @@ static int o2net_advance_rx(struct o2net sclog(sc, "receiving\n"); do_gettimeofday(&sc->sc_tv_advance_start); + if (unlikely(sc->sc_handshake_ok == 0)) { + if(sc->sc_page_off < sizeof(struct o2net_handshake)) { + data = page_address(sc->sc_page) + sc->sc_page_off; + datalen = sizeof(struct o2net_handshake) - sc->sc_page_off; + ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); + if (ret > 0) + sc->sc_page_off += ret; + } + + if (sc->sc_page_off == sizeof(struct o2net_handshake)) { + o2net_check_handshake(sc); + if (unlikely(sc->sc_handshake_ok == 0)) + ret = -EPROTO; + } + goto out; + } + /* do we need more header? */ if (sc->sc_page_off < sizeof(struct o2net_msg)) { data = page_address(sc->sc_page) + sc->sc_page_off; @@ -1160,15 +1229,6 @@ static int o2net_advance_rx(struct o2net ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); if (ret > 0) { sc->sc_page_off += ret; - - /* this working relies on the handshake being - * smaller than the normal message header */ - if (sc->sc_page_off >= sizeof(struct o2net_handshake)&& - !sc->sc_handshake_ok && o2net_check_handshake(sc)) { - ret = -EPROTO; - goto out; - } - /* only swab incoming here.. 
we can * only get here once as we cross from * being under to over */ @@ -1269,6 +1329,18 @@ static int o2net_set_nodelay(struct sock return ret; } +static void o2net_initialize_handshake(void) +{ + o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32( + O2HB_MAX_WRITE_TIMEOUT_MS); + o2net_hand->o2net_idle_timeout_ms = cpu_to_be32( + o2net_idle_timeout(NULL)); + o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32( + o2net_keepalive_delay(NULL)); + o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32( + o2net_reconnect_delay(NULL)); +} + /* ------------------------------------------------------------ */ /* called when a connect completes and after a sock is accepted. the @@ -1281,6 +1353,7 @@ static void o2net_sc_connect_completed(v (unsigned long long)O2NET_PROTOCOL_VERSION, (unsigned long long)be64_to_cpu(o2net_hand->connector_id)); + o2net_initialize_handshake(); o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); sc_put(sc); } @@ -1505,6 +1578,8 @@ static void o2net_hb_node_down_cb(struct if (node_num != o2nm_this_node()) o2net_disconnect_node(node); + + BUG_ON(atomic_read(&o2net_connected_peers) < 0); } static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, @@ -1668,6 +1743,7 @@ static int o2net_accept_one(struct socke o2net_register_callbacks(sc->sc_sock->sk, sc); o2net_sc_queue_work(sc, &sc->sc_rx_work); + o2net_initialize_handshake(); o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); out: Index: fs/ocfs2/cluster/tcp.h ==================================================================--- fs/ocfs2/cluster/tcp.h.orig 2006-12-04 12:07:32.000000000 +0100 +++ fs/ocfs2/cluster/tcp.h 2006-12-04 12:07:32.000000000 +0100 @@ -108,6 +108,7 @@ void o2net_unregister_hb_callbacks(void) int o2net_start_listening(struct o2nm_node *node); void o2net_stop_listening(struct o2nm_node *node); void o2net_disconnect_node(struct o2nm_node *node); +int o2net_num_connected_peers(void); int o2net_init(void); void o2net_exit(void); Index: fs/ocfs2/cluster/tcp_internal.h 
==================================================================--- fs/ocfs2/cluster/tcp_internal.h.orig 2006-12-04 12:07:32.000000000 +0100 +++ fs/ocfs2/cluster/tcp_internal.h 2006-12-04 12:07:32.000000000 +0100 @@ -48,10 +48,14 @@ * - full 64 bit i_size in the metadata lock lvbs * - introduction of "rw" lock and pushing meta/data locking down */ -#define O2NET_PROTOCOL_VERSION 4ULL +#define O2NET_PROTOCOL_VERSION 5ULL struct o2net_handshake { __be64 protocol_version; __be64 connector_id; + __be32 o2hb_heartbeat_timeout_ms; + __be32 o2net_idle_timeout_ms; + __be32 o2net_keepalive_delay_ms; + __be32 o2net_reconnect_delay_ms; }; struct o2net_node { --