Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] ocfs2 cleanups, bug fixes for Linux kernel 3.1 - final drop
Joel, Hopefully this is the final drop before the 3.1 merge window. I had posted most of the patches before. In fact the first 12 are exactly the same. The 13th one is modified and 14,15,16 are new. The previous drop can be accessed here: http://oss.oracle.com/pipermail/ocfs2-devel/2011-June/008243.html [PATCH 01/16] ocfs2/cluster: Abort heartbeat start on hard-ro devices [PATCH 02/16] ocfs2/cluster: Clean up messages in o2net [PATCH 03/16] ocfs2/dlm: Clean up messages in o2dlm [PATCH 04/16] ocfs2: Clean up messages in stack_o2cb.c [PATCH 05/16] ocfs2/dlm: Cleanup up dlm_finish_local_lockres_recovery() [PATCH 06/16] ocfs2/dlm: Clean up refmap helpers [PATCH 07/16] ocfs2/dlm: Trace insert/remove of resource to/from hash [PATCH 08/16] ocfs2/dlm: Cleanup dlm_wait_for_node_death() and dlm_wait_for_node_recovery() [PATCH 09/16] ocfs2/dlm: Take inflight reference count for remotely mastered resources too [PATCH 10/16] ocfs2/dlm: dlmlock_remote() needs to account for remastery [PATCH 11/16] ocfs2/cluster: Fix output in file elapsed_time_in_ms [PATCH 12/16] ocfs2/cluster: Add new function o2net_fill_node_map() Same as the last drop. [PATCH 13/16] ocfs2/cluster: Cluster up now includes network connections too One change. It now waits for a bit for the two bitmaps to align. [PATCH 14/16] ocfs2: Clean up messages in the fs [PATCH 15/16] ocfs2: Add comment about orphan scanning I added the comment so that we remember the reason we added orphan scanning in the first place. [PATCH 16/16] ocfs2: Fix ocfs2_page_mkwrite() This is Wengang's patch + some cleanups suggested by Mark. Thanks Sunil
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 01/16] ocfs2/cluster: Abort heartbeat start on hard-ro devices
Currently if the heartbeat device is hard-ro, the o2hb thread keeps chugging along and dumping errors along the way. The user needs to manually stop the heartbeat. The patch addresses this shortcoming by adding a limit to the number of times the hb thread will iterate in an unsteady state. If the hb thread does not ready steady state in that many interation, the start is aborted. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/cluster/heartbeat.c | 185 ++++++++++++++++++++++++++---------------- 1 files changed, 116 insertions(+), 69 deletions(-) diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c index 9a3e6bb..4728a74 100644 --- a/fs/ocfs2/cluster/heartbeat.c +++ b/fs/ocfs2/cluster/heartbeat.c @@ -216,6 +216,7 @@ struct o2hb_region { struct list_head hr_all_item; unsigned hr_unclean_stop:1, + hr_aborted_start:1, hr_item_pinned:1, hr_item_dropped:1; @@ -254,6 +255,10 @@ struct o2hb_region { * a more complete api that doesn't lead to this sort of fragility. */ atomic_t hr_steady_iterations; + /* terminate o2hb thread if it does not reach steady state + * (hr_steady_iterations == 0) within hr_unsteady_iterations */ + atomic_t hr_unsteady_iterations; + char hr_dev_name[BDEVNAME_SIZE]; unsigned int hr_timeout_ms; @@ -324,6 +329,10 @@ static void o2hb_write_timeout(struct work_struct *work) static void o2hb_arm_write_timeout(struct o2hb_region *reg) { + /* Arm writeout only after thread reaches steady state */ + if (atomic_read(®->hr_steady_iterations) != 0) + return; + mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS); @@ -537,9 +546,14 @@ static int o2hb_verify_crc(struct o2hb_region *reg, return read == computed; } -/* We want to make sure that nobody is heartbeating on top of us -- - * this will help detect an invalid configuration. */ -static void o2hb_check_last_timestamp(struct o2hb_region *reg) +/* + * Compare the slot data with what we wrote in the last iteration. + * If the match fails, print an appropriate error message. This is to + * detect errors like... another node hearting on the same slot, + * flaky device that is losing writes, etc. + * Returns 1 if check succeeds, 0 otherwise. + */ +static int o2hb_check_own_slot(struct o2hb_region *reg) { struct o2hb_disk_slot *slot; struct o2hb_disk_heartbeat_block *hb_block; @@ -548,13 +562,13 @@ static void o2hb_check_last_timestamp(struct o2hb_region *reg) slot = ®->hr_slots[o2nm_this_node()]; /* Don't check on our 1st timestamp */ if (!slot->ds_last_time) - return; + return 0; hb_block = slot->ds_raw_block; if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time && le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation && hb_block->hb_node == slot->ds_node_num) - return; + return 1; #define ERRSTR1 "Another node is heartbeating on device" #define ERRSTR2 "Heartbeat generation mismatch on device" @@ -574,6 +588,8 @@ static void o2hb_check_last_timestamp(struct o2hb_region *reg) (unsigned long long)slot->ds_last_time, hb_block->hb_node, (unsigned long long)le64_to_cpu(hb_block->hb_generation), (unsigned long long)le64_to_cpu(hb_block->hb_seq)); + + return 0; } static inline void o2hb_prepare_block(struct o2hb_region *reg, @@ -719,17 +735,24 @@ static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot) o2nm_node_put(node); } -static void o2hb_set_quorum_device(struct o2hb_region *reg, - struct o2hb_disk_slot *slot) +static void o2hb_set_quorum_device(struct o2hb_region *reg) { - assert_spin_locked(&o2hb_live_lock); - if (!o2hb_global_heartbeat_active()) return; - if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) + /* Prevent race with o2hb_heartbeat_group_drop_item() */ + if (kthread_should_stop()) + return; + + /* Tag region as quorum only after thread reaches steady state */ + if (atomic_read(®->hr_steady_iterations) != 0) return; + spin_lock(&o2hb_live_lock); + + if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) + goto unlock; + /* * A region can be added to the quorum only when it sees all * live nodes heartbeat on it. In other words, the region has been @@ -737,13 +760,10 @@ static void o2hb_set_quorum_device(struct o2hb_region *reg, */ if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap, sizeof(o2hb_live_node_bitmap))) - return; - - if (slot->ds_changed_samples < O2HB_LIVE_THRESHOLD) - return; + goto unlock; - printk(KERN_NOTICE "o2hb: Region %s is now a quorum device\n", - config_item_name(®->hr_item)); + printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n", + config_item_name(®->hr_item), reg->hr_dev_name); set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap); @@ -754,6 +774,8 @@ static void o2hb_set_quorum_device(struct o2hb_region *reg, if (o2hb_pop_count(&o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF) o2hb_region_unpin(NULL); +unlock: + spin_unlock(&o2hb_live_lock); } static int o2hb_check_slot(struct o2hb_region *reg, @@ -925,8 +947,6 @@ fire_callbacks: slot->ds_equal_samples = 0; } out: - o2hb_set_quorum_device(reg, slot); - spin_unlock(&o2hb_live_lock); o2hb_run_event_list(&event); @@ -957,7 +977,8 @@ static int o2hb_highest_node(unsigned long *nodes, static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) { - int i, ret, highest_node, change = 0; + int i, ret, highest_node; + int membership_change = 0, own_slot_ok = 0; unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; struct o2hb_bio_wait_ctxt write_wc; @@ -966,7 +987,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) sizeof(configured_nodes)); if (ret) { mlog_errno(ret); - return ret; + goto bail; } /* @@ -982,8 +1003,9 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES); if (highest_node >= O2NM_MAX_NODES) { - mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n"); - return -EINVAL; + mlog(ML_NOTICE, "o2hb: No configured nodes found!\n"); + ret = -EINVAL; + goto bail; } /* No sense in reading the slots of nodes that don't exist @@ -993,29 +1015,27 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) ret = o2hb_read_slots(reg, highest_node + 1); if (ret < 0) { mlog_errno(ret); - return ret; + goto bail; } /* With an up to date view of the slots, we can check that no * other node has been improperly configured to heartbeat in * our slot. */ - o2hb_check_last_timestamp(reg); + own_slot_ok = o2hb_check_own_slot(reg); /* fill in the proper info for our next heartbeat */ o2hb_prepare_block(reg, reg->hr_generation); - /* And fire off the write. Note that we don't wait on this I/O - * until later. */ ret = o2hb_issue_node_write(reg, &write_wc); if (ret < 0) { mlog_errno(ret); - return ret; + goto bail; } i = -1; while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { - change |= o2hb_check_slot(reg, ®->hr_slots[i]); + membership_change |= o2hb_check_slot(reg, ®->hr_slots[i]); } /* @@ -1030,18 +1050,39 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) * disk */ mlog(ML_ERROR, "Write error %d on device \"%s\"\n", write_wc.wc_error, reg->hr_dev_name); - return write_wc.wc_error; + ret = write_wc.wc_error; + goto bail; } - o2hb_arm_write_timeout(reg); + /* Skip disarming the timeout if own slot has stale/bad data */ + if (own_slot_ok) { + o2hb_set_quorum_device(reg); + o2hb_arm_write_timeout(reg); + } +bail: /* let the person who launched us know when things are steady */ - if (!change && (atomic_read(®->hr_steady_iterations) != 0)) { - if (atomic_dec_and_test(®->hr_steady_iterations)) + if (atomic_read(®->hr_steady_iterations) != 0) { + if (!ret && own_slot_ok && !membership_change) { + if (atomic_dec_and_test(®->hr_steady_iterations)) + wake_up(&o2hb_steady_queue); + } + } + + if (atomic_read(®->hr_steady_iterations) != 0) { + if (atomic_dec_and_test(®->hr_unsteady_iterations)) { + printk(KERN_NOTICE "o2hb: Unable to stabilize " + "heartbeart on region %s (%s)\n", + config_item_name(®->hr_item), + reg->hr_dev_name); + atomic_set(®->hr_steady_iterations, 0); + reg->hr_aborted_start = 1; wake_up(&o2hb_steady_queue); + ret = -EIO; + } } - return 0; + return ret; } /* Subtract b from a, storing the result in a. a *must* have a larger @@ -1095,7 +1136,8 @@ static int o2hb_thread(void *data) /* Pin node */ o2nm_depend_this_node(); - while (!kthread_should_stop() && !reg->hr_unclean_stop) { + while (!kthread_should_stop() && + !reg->hr_unclean_stop && !reg->hr_aborted_start) { /* We track the time spent inside * o2hb_do_disk_heartbeat so that we avoid more than * hr_timeout_ms between disk writes. On busy systems @@ -1103,10 +1145,7 @@ static int o2hb_thread(void *data) * likely to time itself out. */ do_gettimeofday(&before_hb); - i = 0; - do { - ret = o2hb_do_disk_heartbeat(reg); - } while (ret && ++i < 2); + ret = o2hb_do_disk_heartbeat(reg); do_gettimeofday(&after_hb); elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb); @@ -1117,7 +1156,8 @@ static int o2hb_thread(void *data) after_hb.tv_sec, (unsigned long) after_hb.tv_usec, elapsed_msec); - if (elapsed_msec < reg->hr_timeout_ms) { + if (!kthread_should_stop() && + elapsed_msec < reg->hr_timeout_ms) { /* the kthread api has blocked signals for us so no * need to record the return value. */ msleep_interruptible(reg->hr_timeout_ms - elapsed_msec); @@ -1134,20 +1174,20 @@ static int o2hb_thread(void *data) * to timeout on this region when we could just as easily * write a clear generation - thus indicating to them that * this node has left this region. - * - * XXX: Should we skip this on unclean_stop? */ - o2hb_prepare_block(reg, 0); - ret = o2hb_issue_node_write(reg, &write_wc); - if (ret == 0) { - o2hb_wait_on_io(reg, &write_wc); - } else { - mlog_errno(ret); + */ + if (!reg->hr_unclean_stop && !reg->hr_aborted_start) { + o2hb_prepare_block(reg, 0); + ret = o2hb_issue_node_write(reg, &write_wc); + if (ret == 0) + o2hb_wait_on_io(reg, &write_wc); + else + mlog_errno(ret); } /* Unpin node */ o2nm_undepend_this_node(); - mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n"); + mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n"); return 0; } @@ -1426,6 +1466,8 @@ static void o2hb_region_release(struct config_item *item) struct page *page; struct o2hb_region *reg = to_o2hb_region(item); + mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name); + if (reg->hr_tmp_block) kfree(reg->hr_tmp_block); @@ -1792,7 +1834,10 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, live_threshold <<= 1; spin_unlock(&o2hb_live_lock); } - atomic_set(®->hr_steady_iterations, live_threshold + 1); + ++live_threshold; + atomic_set(®->hr_steady_iterations, live_threshold); + /* unsteady_iterations is double the steady_iterations */ + atomic_set(®->hr_unsteady_iterations, (live_threshold << 1)); hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s", reg->hr_item.ci_name); @@ -1809,14 +1854,12 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, ret = wait_event_interruptible(o2hb_steady_queue, atomic_read(®->hr_steady_iterations) == 0); if (ret) { - /* We got interrupted (hello ptrace!). Clean up */ - spin_lock(&o2hb_live_lock); - hb_task = reg->hr_task; - reg->hr_task = NULL; - spin_unlock(&o2hb_live_lock); + atomic_set(®->hr_steady_iterations, 0); + reg->hr_aborted_start = 1; + } - if (hb_task) - kthread_stop(hb_task); + if (reg->hr_aborted_start) { + ret = -EIO; goto out; } @@ -1833,8 +1876,8 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, ret = -EIO; if (hb_task && o2hb_global_heartbeat_active()) - printk(KERN_NOTICE "o2hb: Heartbeat started on region %s\n", - config_item_name(®->hr_item)); + printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n", + config_item_name(®->hr_item), reg->hr_dev_name); out: if (filp) @@ -2092,13 +2135,6 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group, /* stop the thread when the user removes the region dir */ spin_lock(&o2hb_live_lock); - if (o2hb_global_heartbeat_active()) { - clear_bit(reg->hr_region_num, o2hb_region_bitmap); - clear_bit(reg->hr_region_num, o2hb_live_region_bitmap); - if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) - quorum_region = 1; - clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap); - } hb_task = reg->hr_task; reg->hr_task = NULL; reg->hr_item_dropped = 1; @@ -2107,19 +2143,30 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group, if (hb_task) kthread_stop(hb_task); + if (o2hb_global_heartbeat_active()) { + spin_lock(&o2hb_live_lock); + clear_bit(reg->hr_region_num, o2hb_region_bitmap); + clear_bit(reg->hr_region_num, o2hb_live_region_bitmap); + if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) + quorum_region = 1; + clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap); + spin_unlock(&o2hb_live_lock); + printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n", + ((atomic_read(®->hr_steady_iterations) == 0) ? + "stopped" : "start aborted"), config_item_name(item), + reg->hr_dev_name); + } + /* * If we're racing a dev_write(), we need to wake them. They will * check reg->hr_task */ if (atomic_read(®->hr_steady_iterations) != 0) { + reg->hr_aborted_start = 1; atomic_set(®->hr_steady_iterations, 0); wake_up(&o2hb_steady_queue); } - if (o2hb_global_heartbeat_active()) - printk(KERN_NOTICE "o2hb: Heartbeat stopped on region %s\n", - config_item_name(®->hr_item)); - config_item_put(item); if (!o2hb_global_heartbeat_active() || !quorum_region) -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 02/16] ocfs2/cluster: Clean up messages in o2net
o2net messages needed a facelift. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/cluster/tcp.c | 119 +++++++++++++++++++++-------------------------- 1 files changed, 53 insertions(+), 66 deletions(-) diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c index db5ee4b..6e97895 100644 --- a/fs/ocfs2/cluster/tcp.c +++ b/fs/ocfs2/cluster/tcp.c @@ -545,7 +545,7 @@ static void o2net_set_nn_state(struct o2net_node *nn, } if (was_valid && !valid) { - printk(KERN_NOTICE "o2net: no longer connected to " + printk(KERN_NOTICE "o2net: No longer connected to " SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc)); o2net_complete_nodes_nsw(nn); } @@ -555,7 +555,7 @@ static void o2net_set_nn_state(struct o2net_node *nn, cancel_delayed_work(&nn->nn_connect_expired); printk(KERN_NOTICE "o2net: %s " SC_NODEF_FMT "\n", o2nm_this_node() > sc->sc_node->nd_num ? - "connected to" : "accepted connection from", + "Connected to" : "Accepted connection from", SC_NODEF_ARGS(sc)); } @@ -643,7 +643,7 @@ static void o2net_state_change(struct sock *sk) o2net_sc_queue_work(sc, &sc->sc_connect_work); break; default: - printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT + printk(KERN_INFO "o2net: Connection to " SC_NODEF_FMT " shutdown, state %d\n", SC_NODEF_ARGS(sc), sk->sk_state); o2net_sc_queue_work(sc, &sc->sc_shutdown_work); @@ -1284,11 +1284,11 @@ static int o2net_check_handshake(struct o2net_sock_container *sc) struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); if (hand->protocol_version != cpu_to_be64(O2NET_PROTOCOL_VERSION)) { - mlog(ML_NOTICE, SC_NODEF_FMT " advertised net protocol " - "version %llu but %llu is required, disconnecting\n", - SC_NODEF_ARGS(sc), - (unsigned long long)be64_to_cpu(hand->protocol_version), - O2NET_PROTOCOL_VERSION); + printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " Advertised net " + "protocol version %llu but %llu is required. " + "Disconnecting.\n", SC_NODEF_ARGS(sc), + (unsigned long long)be64_to_cpu(hand->protocol_version), + O2NET_PROTOCOL_VERSION); /* don't bother reconnecting if its the wrong version. */ o2net_ensure_shutdown(nn, sc, -ENOTCONN); @@ -1302,33 +1302,33 @@ static int o2net_check_handshake(struct o2net_sock_container *sc) */ if (be32_to_cpu(hand->o2net_idle_timeout_ms) ! o2net_idle_timeout()) { - mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of " - "%u ms, but we use %u ms locally. disconnecting\n", - SC_NODEF_ARGS(sc), - be32_to_cpu(hand->o2net_idle_timeout_ms), - o2net_idle_timeout()); + printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a network " + "idle timeout of %u ms, but we use %u ms locally. " + "Disconnecting.\n", SC_NODEF_ARGS(sc), + be32_to_cpu(hand->o2net_idle_timeout_ms), + o2net_idle_timeout()); o2net_ensure_shutdown(nn, sc, -ENOTCONN); return -1; } if (be32_to_cpu(hand->o2net_keepalive_delay_ms) ! o2net_keepalive_delay()) { - mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of " - "%u ms, but we use %u ms locally. disconnecting\n", - SC_NODEF_ARGS(sc), - be32_to_cpu(hand->o2net_keepalive_delay_ms), - o2net_keepalive_delay()); + printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a keepalive " + "delay of %u ms, but we use %u ms locally. " + "Disconnecting.\n", SC_NODEF_ARGS(sc), + be32_to_cpu(hand->o2net_keepalive_delay_ms), + o2net_keepalive_delay()); o2net_ensure_shutdown(nn, sc, -ENOTCONN); return -1; } if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) ! O2HB_MAX_WRITE_TIMEOUT_MS) { - mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of " - "%u ms, but we use %u ms locally. disconnecting\n", - SC_NODEF_ARGS(sc), - be32_to_cpu(hand->o2hb_heartbeat_timeout_ms), - O2HB_MAX_WRITE_TIMEOUT_MS); + printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a heartbeat " + "timeout of %u ms, but we use %u ms locally. " + "Disconnecting.\n", SC_NODEF_ARGS(sc), + be32_to_cpu(hand->o2hb_heartbeat_timeout_ms), + O2HB_MAX_WRITE_TIMEOUT_MS); o2net_ensure_shutdown(nn, sc, -ENOTCONN); return -1; } @@ -1539,28 +1539,16 @@ static void o2net_idle_timer(unsigned long data) { struct o2net_sock_container *sc = (struct o2net_sock_container *)data; struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); - #ifdef CONFIG_DEBUG_FS - ktime_t now = ktime_get(); + unsigned long msecs = ktime_to_ms(ktime_get()) - + ktime_to_ms(sc->sc_tv_timer); +#else + unsigned long msecs = o2net_idle_timeout(); #endif - printk(KERN_NOTICE "o2net: connection to " SC_NODEF_FMT " has been idle for %u.%u " - "seconds, shutting it down.\n", SC_NODEF_ARGS(sc), - o2net_idle_timeout() / 1000, - o2net_idle_timeout() % 1000); - -#ifdef CONFIG_DEBUG_FS - mlog(ML_NOTICE, "Here are some times that might help debug the " - "situation: (Timer: %lld, Now %lld, DataReady %lld, Advance %lld-%lld, " - "Key 0x%08x, Func %u, FuncTime %lld-%lld)\n", - (long long)ktime_to_us(sc->sc_tv_timer), (long long)ktime_to_us(now), - (long long)ktime_to_us(sc->sc_tv_data_ready), - (long long)ktime_to_us(sc->sc_tv_advance_start), - (long long)ktime_to_us(sc->sc_tv_advance_stop), - sc->sc_msg_key, sc->sc_msg_type, - (long long)ktime_to_us(sc->sc_tv_func_start), - (long long)ktime_to_us(sc->sc_tv_func_stop)); -#endif + printk(KERN_NOTICE "o2net: Connection to " SC_NODEF_FMT " has been " + "idle for %lu.%lu secs, shutting it down.\n", SC_NODEF_ARGS(sc), + msecs / 1000, msecs % 1000); /* * Initialize the nn_timeout so that the next connection attempt @@ -1693,8 +1681,8 @@ static void o2net_start_connect(struct work_struct *work) out: if (ret) { - mlog(ML_NOTICE, "connect attempt to " SC_NODEF_FMT " failed " - "with errno %d\n", SC_NODEF_ARGS(sc), ret); + printk(KERN_NOTICE "o2net: Connect attempt to " SC_NODEF_FMT + " failed with errno %d\n", SC_NODEF_ARGS(sc), ret); /* 0 err so that another will be queued and attempted * from set_nn_state */ if (sc) @@ -1717,8 +1705,8 @@ static void o2net_connect_expired(struct work_struct *work) spin_lock(&nn->nn_lock); if (!nn->nn_sc_valid) { - mlog(ML_ERROR, "no connection established with node %u after " - "%u.%u seconds, giving up and returning errors.\n", + printk(KERN_NOTICE "o2net: No connection established with " + "node %u after %u.%u seconds, giving up.\n", o2net_num_from_nn(nn), o2net_idle_timeout() / 1000, o2net_idle_timeout() % 1000); @@ -1861,21 +1849,21 @@ static int o2net_accept_one(struct socket *sock) node = o2nm_get_node_by_ip(sin.sin_addr.s_addr); if (node == NULL) { - mlog(ML_NOTICE, "attempt to connect from unknown node at %pI4:%d\n", - &sin.sin_addr.s_addr, ntohs(sin.sin_port)); + printk(KERN_NOTICE "o2net: Attempt to connect from unknown " + "node at %pI4:%d\n", &sin.sin_addr.s_addr, + ntohs(sin.sin_port)); ret = -EINVAL; goto out; } if (o2nm_this_node() >= node->nd_num) { local_node = o2nm_get_node_by_num(o2nm_this_node()); - mlog(ML_NOTICE, "unexpected connect attempt seen at node '%s' (" - "%u, %pI4:%d) from node '%s' (%u, %pI4:%d)\n", - local_node->nd_name, local_node->nd_num, - &(local_node->nd_ipv4_address), - ntohs(local_node->nd_ipv4_port), - node->nd_name, node->nd_num, &sin.sin_addr.s_addr, - ntohs(sin.sin_port)); + printk(KERN_NOTICE "o2net: Unexpected connect attempt seen " + "at node '%s' (%u, %pI4:%d) from node '%s' (%u, " + "%pI4:%d)\n", local_node->nd_name, local_node->nd_num, + &(local_node->nd_ipv4_address), + ntohs(local_node->nd_ipv4_port), node->nd_name, + node->nd_num, &sin.sin_addr.s_addr, ntohs(sin.sin_port)); ret = -EINVAL; goto out; } @@ -1900,10 +1888,10 @@ static int o2net_accept_one(struct socket *sock) ret = 0; spin_unlock(&nn->nn_lock); if (ret) { - mlog(ML_NOTICE, "attempt to connect from node '%s' at " - "%pI4:%d but it already has an open connection\n", - node->nd_name, &sin.sin_addr.s_addr, - ntohs(sin.sin_port)); + printk(KERN_NOTICE "o2net: Attempt to connect from node '%s' " + "at %pI4:%d but it already has an open connection\n", + node->nd_name, &sin.sin_addr.s_addr, + ntohs(sin.sin_port)); goto out; } @@ -1983,7 +1971,7 @@ static int o2net_open_listening_sock(__be32 addr, __be16 port) ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret < 0) { - mlog(ML_ERROR, "unable to create socket, ret=%d\n", ret); + printk(KERN_ERR "o2net: Error %d while creating socket\n", ret); goto out; } @@ -2000,16 +1988,15 @@ static int o2net_open_listening_sock(__be32 addr, __be16 port) sock->sk->sk_reuse = 1; ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); if (ret < 0) { - mlog(ML_ERROR, "unable to bind socket at %pI4:%u, " - "ret=%d\n", &addr, ntohs(port), ret); + printk(KERN_ERR "o2net: Error %d while binding socket at " + "%pI4:%u\n", ret, &addr, ntohs(port)); goto out; } ret = sock->ops->listen(sock, 64); - if (ret < 0) { - mlog(ML_ERROR, "unable to listen on %pI4:%u, ret=%d\n", - &addr, ntohs(port), ret); - } + if (ret < 0) + printk(KERN_ERR "o2net: Error %d while listening on %pI4:%u\n", + ret, &addr, ntohs(port)); out: if (ret) { -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 03/16] ocfs2/dlm: Clean up messages in o2dlm
o2dlm messages needed a facelift. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/dlm/dlmdomain.c | 18 ++++++++------ fs/ocfs2/dlm/dlmlock.c | 22 +++++++---------- fs/ocfs2/dlm/dlmmaster.c | 46 +++++++++++++++++-------------------- fs/ocfs2/dlm/dlmrecovery.c | 53 +++++++++++++++++++++++++------------------ fs/ocfs2/dlm/dlmthread.c | 2 - 5 files changed, 71 insertions(+), 70 deletions(-) diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 6ed6b95..ce225fb 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -539,17 +539,17 @@ again: static void __dlm_print_nodes(struct dlm_ctxt *dlm) { - int node = -1; + int node = -1, num = 0; assert_spin_locked(&dlm->spinlock); - printk(KERN_NOTICE "o2dlm: Nodes in domain %s: ", dlm->name); - + printk("( "); while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, node + 1)) < O2NM_MAX_NODES) { printk("%d ", node); + ++num; } - printk("\n"); + printk(") %u nodes\n", num); } static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, @@ -566,11 +566,10 @@ static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, node = exit_msg->node_idx; - printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s\n", node, dlm->name); - spin_lock(&dlm->spinlock); clear_bit(node, dlm->domain_map); clear_bit(node, dlm->exit_domain_map); + printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s ", node, dlm->name); __dlm_print_nodes(dlm); /* notify anything attached to the heartbeat events */ @@ -755,6 +754,7 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm) dlm_mark_domain_leaving(dlm); dlm_leave_domain(dlm); + printk(KERN_NOTICE "o2dlm: Leaving domain %s\n", dlm->name); dlm_force_free_mles(dlm); dlm_complete_dlm_shutdown(dlm); } @@ -970,7 +970,7 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, clear_bit(assert->node_idx, dlm->exit_domain_map); __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); - printk(KERN_NOTICE "o2dlm: Node %u joins domain %s\n", + printk(KERN_NOTICE "o2dlm: Node %u joins domain %s ", assert->node_idx, dlm->name); __dlm_print_nodes(dlm); @@ -1701,8 +1701,10 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm) bail: spin_lock(&dlm->spinlock); __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); - if (!status) + if (!status) { + printk(KERN_NOTICE "o2dlm: Joining domain %s ", dlm->name); __dlm_print_nodes(dlm); + } spin_unlock(&dlm->spinlock); if (ctxt) { diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index 8d39e0f..c7f3e22 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -319,27 +319,23 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, sizeof(create), res->owner, &status); if (tmpret >= 0) { - // successfully sent and received - ret = status; // this is already a dlm_status + ret = status; if (ret == DLM_REJECTED) { - mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres " - "no longer owned by %u. that node is coming back " - "up currently.\n", dlm->name, create.namelen, + mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer " + "owned by node %u. That node is coming back up " + "currently.\n", dlm->name, create.namelen, create.name, res->owner); dlm_print_one_lock_resource(res); BUG(); } } else { - mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " - "node %u\n", tmpret, DLM_CREATE_LOCK_MSG, dlm->key, - res->owner); - if (dlm_is_host_down(tmpret)) { + mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to " + "node %u\n", dlm->name, create.namelen, create.name, + tmpret, res->owner); + if (dlm_is_host_down(tmpret)) ret = DLM_RECOVERING; - mlog(0, "node %u died so returning DLM_RECOVERING " - "from lock message!\n", res->owner); - } else { + else ret = dlm_err_to_dlm_status(tmpret); - } } return ret; diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 11eefb8..9f3b093 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -829,8 +829,8 @@ lookup: * but they might own this lockres. wait on them. */ bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); if (bit < O2NM_MAX_NODES) { - mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to " - "recover before lock mastery can begin\n", + mlog(0, "%s: res %.*s, At least one node (%d) " + "to recover before lock mastery can begin\n", dlm->name, namelen, (char *)lockid, bit); wait_on_recovery = 1; } @@ -864,8 +864,8 @@ redo_request: * dlm spinlock would be detectable be a change on the mle, * so we only need to clear out the recovery map once. */ if (dlm_is_recovery_lock(lockid, namelen)) { - mlog(ML_NOTICE, "%s: recovery map is not empty, but " - "must master $RECOVERY lock now\n", dlm->name); + mlog(0, "%s: Recovery map is not empty, but must " + "master $RECOVERY lock now\n", dlm->name); if (!dlm_pre_master_reco_lockres(dlm, res)) wait_on_recovery = 0; else { @@ -883,8 +883,8 @@ redo_request: spin_lock(&dlm->spinlock); bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); if (bit < O2NM_MAX_NODES) { - mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to " - "recover before lock mastery can begin\n", + mlog(0, "%s: res %.*s, At least one node (%d) " + "to recover before lock mastery can begin\n", dlm->name, namelen, (char *)lockid, bit); wait_on_recovery = 1; } else @@ -913,8 +913,8 @@ redo_request: * yet, keep going until it does. this is how the * master will know that asserts are needed back to * the lower nodes. */ - mlog(0, "%s:%.*s: requests only up to %u but master " - "is %u, keep going\n", dlm->name, namelen, + mlog(0, "%s: res %.*s, Requests only up to %u but " + "master is %u, keep going\n", dlm->name, namelen, lockid, nodenum, mle->master); } } @@ -924,13 +924,12 @@ wait: ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); if (ret < 0) { wait_on_recovery = 1; - mlog(0, "%s:%.*s: node map changed, redo the " - "master request now, blocked=%d\n", - dlm->name, res->lockname.len, + mlog(0, "%s: res %.*s, Node map changed, redo the master " + "request now, blocked=%d\n", dlm->name, res->lockname.len, res->lockname.name, blocked); if (++tries > 20) { - mlog(ML_ERROR, "%s:%.*s: spinning on " - "dlm_wait_for_lock_mastery, blocked=%d\n", + mlog(ML_ERROR, "%s: res %.*s, Spinning on " + "dlm_wait_for_lock_mastery, blocked = %d\n", dlm->name, res->lockname.len, res->lockname.name, blocked); dlm_print_one_lock_resource(res); @@ -940,7 +939,8 @@ wait: goto redo_request; } - mlog(0, "lockres mastered by %u\n", res->owner); + mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len, + res->lockname.name, res->owner); /* make sure we never continue without this */ BUG_ON(res->owner == O2NM_MAX_NODES); @@ -2187,8 +2187,6 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) namelen = res->lockname.len; BUG_ON(namelen > O2NM_MAX_NAME_LEN); - mlog(0, "%s:%.*s: sending deref to %d\n", - dlm->name, namelen, lockname, res->owner); memset(&deref, 0, sizeof(deref)); deref.node_idx = dlm->node_num; deref.namelen = namelen; @@ -2197,14 +2195,12 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key, &deref, sizeof(deref), res->owner, &r); if (ret < 0) - mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " - "node %u\n", ret, DLM_DEREF_LOCKRES_MSG, dlm->key, - res->owner); + mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n", + dlm->name, namelen, lockname, ret, res->owner); else if (r < 0) { /* BAD. other node says I did not have a ref. */ - mlog(ML_ERROR,"while dropping ref on %s:%.*s " - "(master=%u) got %d.\n", dlm->name, namelen, - lockname, res->owner, r); + mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n", + dlm->name, namelen, lockname, res->owner, r); dlm_print_one_lock_resource(res); BUG(); } @@ -2916,9 +2912,9 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm, &migrate, sizeof(migrate), nodenum, &status); if (ret < 0) { - mlog(ML_ERROR, "Error %d when sending message %u (key " - "0x%x) to node %u\n", ret, DLM_MIGRATE_REQUEST_MSG, - dlm->key, nodenum); + mlog(ML_ERROR, "%s: res %.*s, Error %d send " + "MIGRATE_REQUEST to node %u\n", dlm->name, + migrate.namelen, migrate.name, ret, nodenum); if (!dlm_is_host_down(ret)) { mlog(ML_ERROR, "unhandled error=%d!\n", ret); BUG(); diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 7efab6d..a3c312c 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -430,6 +430,8 @@ static void dlm_begin_recovery(struct dlm_ctxt *dlm) { spin_lock(&dlm->spinlock); BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); + printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n", + dlm->name, dlm->reco.dead_node); dlm->reco.state |= DLM_RECO_STATE_ACTIVE; spin_unlock(&dlm->spinlock); } @@ -440,9 +442,18 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm) BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; spin_unlock(&dlm->spinlock); + printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name); wake_up(&dlm->reco.event); } +static void dlm_print_recovery_master(struct dlm_ctxt *dlm) +{ + printk(KERN_NOTICE "o2dlm: Node %u (%s) is the Recovery Master for the " + "dead node %u in domain %s\n", dlm->reco.new_master, + (dlm->node_num == dlm->reco.new_master ? "me" : "he"), + dlm->reco.dead_node, dlm->name); +} + static int dlm_do_recovery(struct dlm_ctxt *dlm) { int status = 0; @@ -505,9 +516,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) } mlog(0, "another node will master this recovery session.\n"); } - mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n", - dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), dlm->reco.new_master, - dlm->node_num, dlm->reco.dead_node); + + dlm_print_recovery_master(dlm); /* it is safe to start everything back up here * because all of the dead node's lock resources @@ -518,15 +528,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) return 0; master_here: - mlog(ML_NOTICE, "(%d) Node %u is the Recovery Master for the Dead Node " - "%u for Domain %s\n", task_pid_nr(dlm->dlm_reco_thread_task), - dlm->node_num, dlm->reco.dead_node, dlm->name); + dlm_print_recovery_master(dlm); status = dlm_remaster_locks(dlm, dlm->reco.dead_node); if (status < 0) { /* we should never hit this anymore */ - mlog(ML_ERROR, "error %d remastering locks for node %u, " - "retrying.\n", status, dlm->reco.dead_node); + mlog(ML_ERROR, "%s: Error %d remastering locks for node %u, " + "retrying.\n", dlm->name, status, dlm->reco.dead_node); /* yield a bit to allow any final network messages * to get handled on remaining nodes */ msleep(100); @@ -567,7 +575,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT); ndata->state = DLM_RECO_NODE_DATA_REQUESTING; - mlog(0, "requesting lock info from node %u\n", + mlog(0, "%s: Requesting lock info from node %u\n", dlm->name, ndata->node_num); if (ndata->node_num == dlm->node_num) { @@ -640,7 +648,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) spin_unlock(&dlm_reco_state_lock); } - mlog(0, "done requesting all lock info\n"); + mlog(0, "%s: Done requesting all lock info\n", dlm->name); /* nodes should be sending reco data now * just need to wait */ @@ -802,10 +810,9 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, /* negative status is handled by caller */ if (ret < 0) - mlog(ML_ERROR, "Error %d when sending message %u (key " - "0x%x) to node %u\n", ret, DLM_LOCK_REQUEST_MSG, - dlm->key, request_from); - + mlog(ML_ERROR, "%s: Error %d send LOCK_REQUEST to node %u " + "to recover dead node %u\n", dlm->name, ret, + request_from, dead_node); // return from here, then // sleep until all received or error return ret; @@ -956,9 +963,9 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to) ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, sizeof(done_msg), send_to, &tmpret); if (ret < 0) { - mlog(ML_ERROR, "Error %d when sending message %u (key " - "0x%x) to node %u\n", ret, DLM_RECO_DATA_DONE_MSG, - dlm->key, send_to); + mlog(ML_ERROR, "%s: Error %d send RECO_DATA_DONE to node %u " + "to recover dead node %u\n", dlm->name, ret, send_to, + dead_node); if (!dlm_is_host_down(ret)) { BUG(); } @@ -1127,9 +1134,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, if (ret < 0) { /* XXX: negative status is not handled. * this will end up killing this node. */ - mlog(ML_ERROR, "Error %d when sending message %u (key " - "0x%x) to node %u\n", ret, DLM_MIG_LOCKRES_MSG, - dlm->key, send_to); + mlog(ML_ERROR, "%s: res %.*s, Error %d send MIG_LOCKRES to " + "node %u (%s)\n", dlm->name, mres->lockname_len, + mres->lockname, ret, send_to, + (orig_flags & DLM_MRES_MIGRATION ? + "migration" : "recovery")); } else { /* might get an -ENOMEM back here */ ret = status; @@ -2324,9 +2333,9 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) dlm_revalidate_lvb(dlm, res, dead_node); if (res->owner == dead_node) { if (res->state & DLM_LOCK_RES_DROPPING_REF) { - mlog(ML_NOTICE, "Ignore %.*s for " + mlog(ML_NOTICE, "%s: res %.*s, Skip " "recovery as it is being freed\n", - res->lockname.len, + dlm->name, res->lockname.len, res->lockname.name); } else dlm_move_lockres_to_recovery_list(dlm, diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 1d6d1d2..46faac2 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -185,8 +185,6 @@ static void dlm_purge_lockres(struct dlm_ctxt *dlm, /* clear our bit from the master's refmap, ignore errors */ ret = dlm_drop_lockres_ref(dlm, res); if (ret < 0) { - mlog(ML_ERROR, "%s: deref %.*s failed %d\n", dlm->name, - res->lockname.len, res->lockname.name, ret); if (!dlm_is_host_down(ret)) BUG(); } -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 04/16] ocfs2: Clean up messages in stack_o2cb.c
o2cb messages needed a facelift. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/stack_o2cb.c | 4 ++-- 1 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c index 19965b0..be935d8 100644 --- a/fs/ocfs2/stack_o2cb.c +++ b/fs/ocfs2/stack_o2cb.c @@ -263,8 +263,8 @@ static void o2dlm_eviction_cb(int node_num, void *data) { struct ocfs2_cluster_connection *conn = data; - mlog(ML_NOTICE, "o2dlm has evicted node %d from group %.*s\n", - node_num, conn->cc_namelen, conn->cc_name); + printk(KERN_NOTICE "o2cb: o2dlm has evicted node %d from domain %.*s\n", + node_num, conn->cc_namelen, conn->cc_name); conn->cc_recovery_handler(node_num, conn->cc_recovery_data); } -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 05/16] ocfs2/dlm: Cleanup up dlm_finish_local_lockres_recovery()
dlm_finish_local_lockres_recovery() needed a facelift. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/dlm/dlmrecovery.c | 59 +++++++++++++++++++------------------------ 1 files changed, 26 insertions(+), 33 deletions(-) diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index a3c312c..41e74f0 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -2093,6 +2093,9 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { if (res->owner == dead_node) { + mlog(0, "%s: res %.*s, Changing owner from %u to %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->owner, new_master); list_del_init(&res->recovering); spin_lock(&res->spinlock); /* new_master has our reference from @@ -2114,40 +2117,30 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, for (i = 0; i < DLM_HASH_BUCKETS; i++) { bucket = dlm_lockres_hash(dlm, i); hlist_for_each_entry(res, hash_iter, bucket, hash_node) { - if (res->state & DLM_LOCK_RES_RECOVERING) { - if (res->owner == dead_node) { - mlog(0, "(this=%u) res %.*s owner=%u " - "was not on recovering list, but " - "clearing state anyway\n", - dlm->node_num, res->lockname.len, - res->lockname.name, new_master); - } else if (res->owner == dlm->node_num) { - mlog(0, "(this=%u) res %.*s owner=%u " - "was not on recovering list, " - "owner is THIS node, clearing\n", - dlm->node_num, res->lockname.len, - res->lockname.name, new_master); - } else - continue; + if (!(res->state & DLM_LOCK_RES_RECOVERING)) + continue; - if (!list_empty(&res->recovering)) { - mlog(0, "%s:%.*s: lockres was " - "marked RECOVERING, owner=%u\n", - dlm->name, res->lockname.len, - res->lockname.name, res->owner); - list_del_init(&res->recovering); - dlm_lockres_put(res); - } - spin_lock(&res->spinlock); - /* new_master has our reference from - * the lock state sent during recovery */ - dlm_change_lockres_owner(dlm, res, new_master); - res->state &= ~DLM_LOCK_RES_RECOVERING; - if (__dlm_lockres_has_locks(res)) - __dlm_dirty_lockres(dlm, res); - spin_unlock(&res->spinlock); - wake_up(&res->wq); + if (res->owner != dead_node && + res->owner != dlm->node_num) + continue; + + if (!list_empty(&res->recovering)) { + list_del_init(&res->recovering); + dlm_lockres_put(res); } + + /* new_master has our reference from + * the lock state sent during recovery */ + mlog(0, "%s: res %.*s, Changing owner from %u to %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->owner, new_master); + spin_lock(&res->spinlock); + dlm_change_lockres_owner(dlm, res, new_master); + res->state &= ~DLM_LOCK_RES_RECOVERING; + if (__dlm_lockres_has_locks(res)) + __dlm_dirty_lockres(dlm, res); + spin_unlock(&res->spinlock); + wake_up(&res->wq); } } } -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 06/16] ocfs2/dlm: Clean up refmap helpers
Patch cleans up helpers that set/clear refmap bits and grab/drop inflight lock ref counts. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/dlm/dlmcommon.h | 49 ++++-------------------- fs/ocfs2/dlm/dlmmaster.c | 88 ++++++++++++++++++++++++++----------------- fs/ocfs2/dlm/dlmrecovery.c | 8 ++-- 3 files changed, 66 insertions(+), 79 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index d602abb..4a5d580 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -902,46 +902,15 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, const char *name, unsigned int namelen); -#define dlm_lockres_set_refmap_bit(bit,res) \ - __dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__) -#define dlm_lockres_clear_refmap_bit(bit,res) \ - __dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__) - -static inline void __dlm_lockres_set_refmap_bit(int bit, - struct dlm_lock_resource *res, - const char *file, - int line) -{ - //printk("%s:%d:%.*s: setting bit %d\n", file, line, - // res->lockname.len, res->lockname.name, bit); - set_bit(bit, res->refmap); -} - -static inline void __dlm_lockres_clear_refmap_bit(int bit, - struct dlm_lock_resource *res, - const char *file, - int line) -{ - //printk("%s:%d:%.*s: clearing bit %d\n", file, line, - // res->lockname.len, res->lockname.name, bit); - clear_bit(bit, res->refmap); -} - -void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - const char *file, - int line); -void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - int new_lockres, - const char *file, - int line); -#define dlm_lockres_drop_inflight_ref(d,r) \ - __dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__) -#define dlm_lockres_grab_inflight_ref(d,r) \ - __dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__) -#define dlm_lockres_grab_inflight_ref_new(d,r) \ - __dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__) +void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, int bit); +void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, int bit); + +void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res); +void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res); void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 9f3b093..11e446f 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -631,39 +631,58 @@ error: return NULL; } -void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - int new_lockres, - const char *file, - int line) +void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, int bit) { - if (!new_lockres) - assert_spin_locked(&res->spinlock); + assert_spin_locked(&res->spinlock); + + mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len, + res->lockname.name, bit, __builtin_return_address(0)); + + set_bit(bit, res->refmap); +} + +void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, int bit) +{ + assert_spin_locked(&res->spinlock); + + mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len, + res->lockname.name, bit, __builtin_return_address(0)); + + clear_bit(bit, res->refmap); +} + + +void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + assert_spin_locked(&res->spinlock); if (!test_bit(dlm->node_num, res->refmap)) { BUG_ON(res->inflight_locks != 0); - dlm_lockres_set_refmap_bit(dlm->node_num, res); + dlm_lockres_set_refmap_bit(dlm, res, dlm->node_num); } res->inflight_locks++; - mlog(0, "%s:%.*s: inflight++: now %u\n", - dlm->name, res->lockname.len, res->lockname.name, - res->inflight_locks); + mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name, + res->lockname.len, res->lockname.name, res->inflight_locks, + __builtin_return_address(0)); } -void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - const char *file, - int line) +void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) { assert_spin_locked(&res->spinlock); BUG_ON(res->inflight_locks == 0); + res->inflight_locks--; - mlog(0, "%s:%.*s: inflight--: now %u\n", - dlm->name, res->lockname.len, res->lockname.name, - res->inflight_locks); + mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name, + res->lockname.len, res->lockname.name, res->inflight_locks, + __builtin_return_address(0)); + if (res->inflight_locks == 0) - dlm_lockres_clear_refmap_bit(dlm->node_num, res); + dlm_lockres_clear_refmap_bit(dlm, res, dlm->node_num); wake_up(&res->wq); } @@ -843,8 +862,10 @@ lookup: /* finally add the lockres to its hash bucket */ __dlm_insert_lockres(dlm, res); - /* since this lockres is new it doesn't not require the spinlock */ - dlm_lockres_grab_inflight_ref_new(dlm, res); + + spin_lock(&res->spinlock); + dlm_lockres_grab_inflight_ref(dlm, res); + spin_unlock(&res->spinlock); /* if this node does not become the master make sure to drop * this inflight reference below */ @@ -1426,9 +1447,7 @@ way_up_top: } if (res->owner == dlm->node_num) { - mlog(0, "%s:%.*s: setting bit %u in refmap\n", - dlm->name, namelen, name, request->node_idx); - dlm_lockres_set_refmap_bit(request->node_idx, res); + dlm_lockres_set_refmap_bit(dlm, res, request->node_idx); spin_unlock(&res->spinlock); response = DLM_MASTER_RESP_YES; if (mle) @@ -1493,10 +1512,8 @@ way_up_top: * go back and clean the mles on any * other nodes */ dispatch_assert = 1; - dlm_lockres_set_refmap_bit(request->node_idx, res); - mlog(0, "%s:%.*s: setting bit %u in refmap\n", - dlm->name, namelen, name, - request->node_idx); + dlm_lockres_set_refmap_bit(dlm, res, + request->node_idx); } else response = DLM_MASTER_RESP_NO; } else { @@ -1702,7 +1719,7 @@ again: "lockres, set the bit in the refmap\n", namelen, lockname, to); spin_lock(&res->spinlock); - dlm_lockres_set_refmap_bit(to, res); + dlm_lockres_set_refmap_bit(dlm, res, to); spin_unlock(&res->spinlock); } } @@ -2256,7 +2273,7 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, else { BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); if (test_bit(node, res->refmap)) { - dlm_lockres_clear_refmap_bit(node, res); + dlm_lockres_clear_refmap_bit(dlm, res, node); cleared = 1; } } @@ -2316,7 +2333,7 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data) BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); if (test_bit(node, res->refmap)) { __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); - dlm_lockres_clear_refmap_bit(node, res); + dlm_lockres_clear_refmap_bit(dlm, res, node); cleared = 1; } spin_unlock(&res->spinlock); @@ -2798,7 +2815,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, BUG_ON(!list_empty(&lock->bast_list)); BUG_ON(lock->ast_pending); BUG_ON(lock->bast_pending); - dlm_lockres_clear_refmap_bit(lock->ml.node, res); + dlm_lockres_clear_refmap_bit(dlm, res, + lock->ml.node); list_del_init(&lock->list); dlm_lock_put(lock); /* In a normal unlock, we would have added a @@ -2819,7 +2837,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, mlog(0, "%s:%.*s: node %u had a ref to this " "migrating lockres, clearing\n", dlm->name, res->lockname.len, res->lockname.name, bit); - dlm_lockres_clear_refmap_bit(bit, res); + dlm_lockres_clear_refmap_bit(dlm, res, bit); } bit++; } @@ -2933,7 +2951,7 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm, dlm->name, res->lockname.len, res->lockname.name, nodenum); spin_lock(&res->spinlock); - dlm_lockres_set_refmap_bit(nodenum, res); + dlm_lockres_set_refmap_bit(dlm, res, nodenum); spin_unlock(&res->spinlock); } } @@ -3267,7 +3285,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, * mastery reference here since old_master will briefly have * a reference after the migration completes */ spin_lock(&res->spinlock); - dlm_lockres_set_refmap_bit(old_master, res); + dlm_lockres_set_refmap_bit(dlm, res, old_master); spin_unlock(&res->spinlock); mlog(0, "now time to do a migrate request to other nodes\n"); diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 41e74f0..f1a3aa8 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1776,7 +1776,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, dlm->name, mres->lockname_len, mres->lockname, from); spin_lock(&res->spinlock); - dlm_lockres_set_refmap_bit(from, res); + dlm_lockres_set_refmap_bit(dlm, res, from); spin_unlock(&res->spinlock); added++; break; @@ -1974,7 +1974,7 @@ skip_lvb: mlog(0, "%s:%.*s: added lock for node %u, " "setting refmap bit\n", dlm->name, res->lockname.len, res->lockname.name, ml->node); - dlm_lockres_set_refmap_bit(ml->node, res); + dlm_lockres_set_refmap_bit(dlm, res, ml->node); added++; } spin_unlock(&res->spinlock); @@ -2254,12 +2254,12 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, res->lockname.len, res->lockname.name, freed, dead_node); __dlm_print_one_lock_resource(res); } - dlm_lockres_clear_refmap_bit(dead_node, res); + dlm_lockres_clear_refmap_bit(dlm, res, dead_node); } else if (test_bit(dead_node, res->refmap)) { mlog(0, "%s:%.*s: dead node %u had a ref, but had " "no locks and had not purged before dying\n", dlm->name, res->lockname.len, res->lockname.name, dead_node); - dlm_lockres_clear_refmap_bit(dead_node, res); + dlm_lockres_clear_refmap_bit(dlm, res, dead_node); } /* do not kick thread yet */ -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 07/16] ocfs2/dlm: Trace insert/remove of resource to/from hash
Add mlog to trace adding and removing the resource from/to the hash table. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/dlm/dlmcommon.h | 5 ++--- fs/ocfs2/dlm/dlmdomain.c | 19 ++++++++++++------- fs/ocfs2/dlm/dlmthread.c | 2 +- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 4a5d580..b1748ac 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -877,9 +877,8 @@ static inline void dlm_lockres_get(struct dlm_lock_resource *res) kref_get(&res->refs); } void dlm_lockres_put(struct dlm_lock_resource *res); -void __dlm_unhash_lockres(struct dlm_lock_resource *res); -void __dlm_insert_lockres(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res); +void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); +void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, const char *name, unsigned int len, diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index ce225fb..200d988 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -157,16 +157,18 @@ static int dlm_protocol_compare(struct dlm_protocol_version *existing, static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); -void __dlm_unhash_lockres(struct dlm_lock_resource *lockres) +void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { - if (!hlist_unhashed(&lockres->hash_node)) { - hlist_del_init(&lockres->hash_node); - dlm_lockres_put(lockres); - } + if (hlist_unhashed(&res->hash_node)) + return; + + mlog(0, "%s: Unhash res %.*s\n", dlm->name, res->lockname.len, + res->lockname.name); + hlist_del_init(&res->hash_node); + dlm_lockres_put(res); } -void __dlm_insert_lockres(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res) +void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { struct hlist_head *bucket; struct qstr *q; @@ -180,6 +182,9 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, dlm_lockres_get(res); hlist_add_head(&res->hash_node, bucket); + + mlog(0, "%s: Hash res %.*s\n", dlm->name, res->lockname.len, + res->lockname.name); } struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 46faac2..4eff65e 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -207,7 +207,7 @@ static void dlm_purge_lockres(struct dlm_ctxt *dlm, BUG(); } - __dlm_unhash_lockres(res); + __dlm_unhash_lockres(dlm, res); /* lockres is not in the hash now. drop the flag and wake up * any processes waiting in dlm_get_lock_resource. */ -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 08/16] ocfs2/dlm: Cleanup dlm_wait_for_node_death() and dlm_wait_for_node_recovery()
dlm_wait_for_node_death() and dlm_wait_for_node_recovery() needed a facelift. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/dlm/dlmcommon.h | 4 +- fs/ocfs2/dlm/dlmrecovery.c | 46 +++++++++++++++++++++---------------------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index b1748ac..a5952ce 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -859,8 +859,8 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm); void dlm_wait_for_recovery(struct dlm_ctxt *dlm); void dlm_kick_recovery_thread(struct dlm_ctxt *dlm); int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node); -int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout); -int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout); +void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout); +void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout); void dlm_put(struct dlm_ctxt *dlm); struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm); diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index f1a3aa8..01ebfd0 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -362,40 +362,38 @@ static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) } -int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) +void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) { - if (timeout) { - mlog(ML_NOTICE, "%s: waiting %dms for notification of " - "death of node %u\n", dlm->name, timeout, node); + if (dlm_is_node_dead(dlm, node)) + return; + + printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in " + "domain %s\n", node, dlm->name); + + if (timeout) wait_event_timeout(dlm->dlm_reco_thread_wq, - dlm_is_node_dead(dlm, node), - msecs_to_jiffies(timeout)); - } else { - mlog(ML_NOTICE, "%s: waiting indefinitely for notification " - "of death of node %u\n", dlm->name, node); + dlm_is_node_dead(dlm, node), + msecs_to_jiffies(timeout)); + else wait_event(dlm->dlm_reco_thread_wq, dlm_is_node_dead(dlm, node)); - } - /* for now, return 0 */ - return 0; } -int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) +void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) { - if (timeout) { - mlog(0, "%s: waiting %dms for notification of " - "recovery of node %u\n", dlm->name, timeout, node); + if (dlm_is_node_recovered(dlm, node)) + return; + + printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in " + "domain %s\n", node, dlm->name); + + if (timeout) wait_event_timeout(dlm->dlm_reco_thread_wq, - dlm_is_node_recovered(dlm, node), - msecs_to_jiffies(timeout)); - } else { - mlog(0, "%s: waiting indefinitely for notification " - "of recovery of node %u\n", dlm->name, node); + dlm_is_node_recovered(dlm, node), + msecs_to_jiffies(timeout)); + else wait_event(dlm->dlm_reco_thread_wq, dlm_is_node_recovered(dlm, node)); - } - /* for now, return 0 */ - return 0; } /* callers of the top-level api calls (dlmlock/dlmunlock) should -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 09/16] ocfs2/dlm: Take inflight reference count for remotely mastered resources too
The inflight reference count, in the lock resource, is taken to pin the resource in memory. We take it when a new resource is created and release it after a lock is attached to it. We do this to prevent the resource from getting purged prematurely. Earlier this reference count was being taken for locally mastered resources only. This patch extends the same functionality for remotely mastered ones. We are doing this because the same premature purging could occur for remotely mastered resources if the remote node were to die before completion of the create lock. Fix for Oracle bug#12405575. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/dlm/dlmlock.c | 12 +++++++--- fs/ocfs2/dlm/dlmmaster.c | 47 ++++++++++++++++----------------------------- fs/ocfs2/dlm/dlmthread.c | 12 ++++++---- 3 files changed, 32 insertions(+), 39 deletions(-) diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index c7f3e22..3ef2c1a 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -183,10 +183,6 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm, kick_thread = 1; } } - /* reduce the inflight count, this may result in the lockres - * being purged below during calc_usage */ - if (lock->ml.node == dlm->node_num) - dlm_lockres_drop_inflight_ref(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); @@ -737,6 +733,14 @@ retry_lock: } } + /* Inflight taken in dlm_get_lock_resource() is dropped here */ + spin_lock(&res->spinlock); + dlm_lockres_drop_inflight_ref(dlm, res); + spin_unlock(&res->spinlock); + + dlm_lockres_calc_usage(dlm, res); + dlm_kick_thread(dlm, res); + if (status != DLM_NORMAL) { lock->lksb->flags &= ~DLM_LKSB_GET_LVB; if (status != DLM_NOTQUEUED) diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 11e446f..005261c 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -659,11 +659,8 @@ void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, { assert_spin_locked(&res->spinlock); - if (!test_bit(dlm->node_num, res->refmap)) { - BUG_ON(res->inflight_locks != 0); - dlm_lockres_set_refmap_bit(dlm, res, dlm->node_num); - } res->inflight_locks++; + mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name, res->lockname.len, res->lockname.name, res->inflight_locks, __builtin_return_address(0)); @@ -677,12 +674,11 @@ void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, BUG_ON(res->inflight_locks == 0); res->inflight_locks--; + mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name, res->lockname.len, res->lockname.name, res->inflight_locks, __builtin_return_address(0)); - if (res->inflight_locks == 0) - dlm_lockres_clear_refmap_bit(dlm, res, dlm->node_num); wake_up(&res->wq); } @@ -716,7 +712,6 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, unsigned int hash; int tries = 0; int bit, wait_on_recovery = 0; - int drop_inflight_if_nonlocal = 0; BUG_ON(!lockid); @@ -728,36 +723,33 @@ lookup: spin_lock(&dlm->spinlock); tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash); if (tmpres) { - int dropping_ref = 0; - spin_unlock(&dlm->spinlock); - spin_lock(&tmpres->spinlock); - /* We wait for the other thread that is mastering the resource */ + /* Wait on the thread that is mastering the resource */ if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { __dlm_wait_on_lockres(tmpres); BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN); + spin_unlock(&tmpres->spinlock); + dlm_lockres_put(tmpres); + tmpres = NULL; + goto lookup; } - if (tmpres->owner == dlm->node_num) { - BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF); - dlm_lockres_grab_inflight_ref(dlm, tmpres); - } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) - dropping_ref = 1; - spin_unlock(&tmpres->spinlock); - - /* wait until done messaging the master, drop our ref to allow - * the lockres to be purged, start over. */ - if (dropping_ref) { - spin_lock(&tmpres->spinlock); - __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF); + /* Wait on the resource purge to complete before continuing */ + if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) { + BUG_ON(tmpres->owner == dlm->node_num); + __dlm_wait_on_lockres_flags(tmpres, + DLM_LOCK_RES_DROPPING_REF); spin_unlock(&tmpres->spinlock); dlm_lockres_put(tmpres); tmpres = NULL; goto lookup; } - mlog(0, "found in hash!\n"); + /* Grab inflight ref to pin the resource */ + dlm_lockres_grab_inflight_ref(dlm, tmpres); + + spin_unlock(&tmpres->spinlock); if (res) dlm_lockres_put(res); res = tmpres; @@ -863,14 +855,11 @@ lookup: /* finally add the lockres to its hash bucket */ __dlm_insert_lockres(dlm, res); + /* Grab inflight ref to pin the resource */ spin_lock(&res->spinlock); dlm_lockres_grab_inflight_ref(dlm, res); spin_unlock(&res->spinlock); - /* if this node does not become the master make sure to drop - * this inflight reference below */ - drop_inflight_if_nonlocal = 1; - /* get an extra ref on the mle in case this is a BLOCK * if so, the creator of the BLOCK may try to put the last * ref at this time in the assert master handler, so we @@ -973,8 +962,6 @@ wait: wake_waiters: spin_lock(&res->spinlock); - if (res->owner != dlm->node_num && drop_inflight_if_nonlocal) - dlm_lockres_drop_inflight_ref(dlm, res); res->state &= ~DLM_LOCK_RES_IN_PROGRESS; spin_unlock(&res->spinlock); wake_up(&res->wq); diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 4eff65e..e73c833 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -94,24 +94,26 @@ int __dlm_lockres_unused(struct dlm_lock_resource *res) { int bit; + assert_spin_locked(&res->spinlock); + if (__dlm_lockres_has_locks(res)) return 0; + /* Locks are in the process of being created */ + if (res->inflight_locks) + return 0; + if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY) return 0; if (res->state & DLM_LOCK_RES_RECOVERING) return 0; + /* Another node has this resource with this node as the master */ bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); if (bit < O2NM_MAX_NODES) return 0; - /* - * since the bit for dlm->node_num is not set, inflight_locks better - * be zero - */ - BUG_ON(res->inflight_locks != 0); return 1; } -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 10/16] ocfs2/dlm: dlmlock_remote() needs to account for remastery
In dlmlock_remote(), we wait for the resource to stop being active before setting the inprogress flag. Active includes recovery, migration, etc. The problem here is that if the resource was being recovered or migrated, the new owner could very well be that node itself (and thus not a remote node). This problem was observed in Oracle bug#12583620. The error messages observed were as follows: dlm_send_remote_lock_request:337 ERROR: Error -40 (ELOOP) when sending message 503 (key 0xd6d8c7) to node 2 dlmlock_remote:271 ERROR: dlm status = DLM_BADARGS dlmlock:751 ERROR: dlm status = DLM_BADARGS Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/dlm/dlmlock.c | 18 ++++++++---------- 1 files changed, 8 insertions(+), 10 deletions(-) diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index 3ef2c1a..f32fcba 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -227,10 +227,16 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, lock->ml.type, res->lockname.len, res->lockname.name, flags); + /* + * Wait if resource is getting recovered, remastered, etc. + * If the resource was remastered and new owner is self, then exit. + */ spin_lock(&res->spinlock); - - /* will exit this call with spinlock held */ __dlm_wait_on_lockres(res); + if (res->owner == dlm->node_num) { + spin_unlock(&res->spinlock); + return DLM_RECOVERING; + } res->state |= DLM_LOCK_RES_IN_PROGRESS; /* add lock to local (secondary) queue */ @@ -710,18 +716,10 @@ retry_lock: if (status == DLM_RECOVERING || status == DLM_MIGRATING || status == DLM_FORWARD) { - mlog(0, "retrying lock with migration/" - "recovery/in progress\n"); msleep(100); - /* no waiting for dlm_reco_thread */ if (recovery) { if (status != DLM_RECOVERING) goto retry_lock; - - mlog(0, "%s: got RECOVERING " - "for $RECOVERY lock, master " - "was %u\n", dlm->name, - res->owner); /* wait to see the node go down, then * drop down and allow the lockres to * get cleaned up. need to remaster. */ -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 11/16] ocfs2/cluster: Fix output in file elapsed_time_in_ms
The o2hb debugfs file, elapsed_time_in_ms, should return values only after the timer is armed atleast once. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/cluster/heartbeat.c | 9 ++++++--- 1 files changed, 6 insertions(+), 3 deletions(-) diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c index 4728a74..a4e855e 100644 --- a/fs/ocfs2/cluster/heartbeat.c +++ b/fs/ocfs2/cluster/heartbeat.c @@ -1198,6 +1198,7 @@ static int o2hb_debug_open(struct inode *inode, struct file *file) struct o2hb_debug_buf *db = inode->i_private; struct o2hb_region *reg; unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)]; + unsigned long lts; char *buf = NULL; int i = -1; int out = 0; @@ -1234,9 +1235,11 @@ static int o2hb_debug_open(struct inode *inode, struct file *file) case O2HB_DB_TYPE_REGION_ELAPSED_TIME: reg = (struct o2hb_region *)db->db_data; - out += snprintf(buf + out, PAGE_SIZE - out, "%u\n", - jiffies_to_msecs(jiffies - - reg->hr_last_timeout_start)); + lts = reg->hr_last_timeout_start; + /* If 0, it has never been set before */ + if (lts) + lts = jiffies_to_msecs(jiffies - lts); + out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts); goto done; case O2HB_DB_TYPE_REGION_PINNED: -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 12/16] ocfs2/cluster: Add new function o2net_fill_node_map()
Patch adds function o2net_fill_node_map() to return the bitmap of nodes that it is connected to. This bitmap is also accessible by the user via the debugfs file, /sys/kernel/debug/o2net/connected_nodes. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/cluster/netdebug.c | 102 +++++++++++++++++++++++++++++-------------- fs/ocfs2/cluster/tcp.c | 19 ++++++++ fs/ocfs2/cluster/tcp.h | 2 + 3 files changed, 90 insertions(+), 33 deletions(-) diff --git a/fs/ocfs2/cluster/netdebug.c b/fs/ocfs2/cluster/netdebug.c index 3a58359..dc45deb 100644 --- a/fs/ocfs2/cluster/netdebug.c +++ b/fs/ocfs2/cluster/netdebug.c @@ -47,6 +47,7 @@ #define SC_DEBUG_NAME "sock_containers" #define NST_DEBUG_NAME "send_tracking" #define STATS_DEBUG_NAME "stats" +#define NODES_DEBUG_NAME "connected_nodes" #define SHOW_SOCK_CONTAINERS 0 #define SHOW_SOCK_STATS 1 @@ -55,6 +56,7 @@ static struct dentry *o2net_dentry; static struct dentry *sc_dentry; static struct dentry *nst_dentry; static struct dentry *stats_dentry; +static struct dentry *nodes_dentry; static DEFINE_SPINLOCK(o2net_debug_lock); @@ -491,53 +493,87 @@ static const struct file_operations sc_seq_fops = { .release = sc_fop_release, }; -int o2net_debugfs_init(void) +static int o2net_fill_bitmap(char *buf, int len) { - o2net_dentry = debugfs_create_dir(O2NET_DEBUG_DIR, NULL); - if (!o2net_dentry) { - mlog_errno(-ENOMEM); - goto bail; - } + unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)]; + int i = -1, out = 0; - nst_dentry = debugfs_create_file(NST_DEBUG_NAME, S_IFREG|S_IRUSR, - o2net_dentry, NULL, - &nst_seq_fops); - if (!nst_dentry) { - mlog_errno(-ENOMEM); - goto bail; - } + o2net_fill_node_map(map, sizeof(map)); - sc_dentry = debugfs_create_file(SC_DEBUG_NAME, S_IFREG|S_IRUSR, - o2net_dentry, NULL, - &sc_seq_fops); - if (!sc_dentry) { - mlog_errno(-ENOMEM); - goto bail; - } + while ((i = find_next_bit(map, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) + out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i); + out += snprintf(buf + out, PAGE_SIZE - out, "\n"); - stats_dentry = debugfs_create_file(STATS_DEBUG_NAME, S_IFREG|S_IRUSR, - o2net_dentry, NULL, - &stats_seq_fops); - if (!stats_dentry) { - mlog_errno(-ENOMEM); - goto bail; - } + return out; +} + +static int nodes_fop_open(struct inode *inode, struct file *file) +{ + char *buf; + + buf = kmalloc(PAGE_SIZE, GFP_KERNEL); + if (!buf) + return -ENOMEM; + + i_size_write(inode, o2net_fill_bitmap(buf, PAGE_SIZE)); + + file->private_data = buf; return 0; -bail: - debugfs_remove(stats_dentry); - debugfs_remove(sc_dentry); - debugfs_remove(nst_dentry); - debugfs_remove(o2net_dentry); - return -ENOMEM; } +static int o2net_debug_release(struct inode *inode, struct file *file) +{ + kfree(file->private_data); + return 0; +} + +static ssize_t o2net_debug_read(struct file *file, char __user *buf, + size_t nbytes, loff_t *ppos) +{ + return simple_read_from_buffer(buf, nbytes, ppos, file->private_data, + i_size_read(file->f_mapping->host)); +} + +static const struct file_operations nodes_fops = { + .open = nodes_fop_open, + .release = o2net_debug_release, + .read = o2net_debug_read, + .llseek = generic_file_llseek, +}; + void o2net_debugfs_exit(void) { + debugfs_remove(nodes_dentry); debugfs_remove(stats_dentry); debugfs_remove(sc_dentry); debugfs_remove(nst_dentry); debugfs_remove(o2net_dentry); } +int o2net_debugfs_init(void) +{ + mode_t mode = S_IFREG|S_IRUSR; + + o2net_dentry = debugfs_create_dir(O2NET_DEBUG_DIR, NULL); + if (o2net_dentry) + nst_dentry = debugfs_create_file(NST_DEBUG_NAME, mode, + o2net_dentry, NULL, &nst_seq_fops); + if (nst_dentry) + sc_dentry = debugfs_create_file(SC_DEBUG_NAME, mode, + o2net_dentry, NULL, &sc_seq_fops); + if (sc_dentry) + stats_dentry = debugfs_create_file(STATS_DEBUG_NAME, mode, + o2net_dentry, NULL, &stats_seq_fops); + if (stats_dentry) + nodes_dentry = debugfs_create_file(NODES_DEBUG_NAME, mode, + o2net_dentry, NULL, &nodes_fops); + if (nodes_dentry) + return 0; + + o2net_debugfs_exit(); + mlog_errno(-ENOMEM); + return -ENOMEM; +} + #endif /* CONFIG_DEBUG_FS */ diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c index 6e97895..ae13d5c 100644 --- a/fs/ocfs2/cluster/tcp.c +++ b/fs/ocfs2/cluster/tcp.c @@ -1034,6 +1034,25 @@ static int o2net_tx_can_proceed(struct o2net_node *nn, return ret; } +/* Get a map of all nodes to which this node is currently connected to */ +void o2net_fill_node_map(unsigned long *map, unsigned bytes) +{ + struct o2net_sock_container *sc; + int node, ret; + + BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long))); + + memset(map, 0, bytes); + for (node = 0; node < O2NM_MAX_NODES; ++node) { + o2net_tx_can_proceed(o2net_nn_from_num(node), &sc, &ret); + if (!ret) { + set_bit(node, map); + sc_put(sc); + } + } +} +EXPORT_SYMBOL_GPL(o2net_fill_node_map); + int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec, size_t caller_veclen, u8 target_node, int *status) { diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h index fd6179e..5bada2a 100644 --- a/fs/ocfs2/cluster/tcp.h +++ b/fs/ocfs2/cluster/tcp.h @@ -106,6 +106,8 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, struct list_head *unreg_list); void o2net_unregister_handler_list(struct list_head *list); +void o2net_fill_node_map(unsigned long *map, unsigned bytes); + struct o2nm_node; int o2net_register_hb_callbacks(void); void o2net_unregister_hb_callbacks(void); -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 13/16] ocfs2/cluster: Cluster up now includes network connections too
The cluster up check only checks to see if the node is heartbeating or not. If yes it continues assuming that the node is connected to all the nodes. But if that is not the case, the cluster join aborts with a stack of errors that are not easy to comprehend. This patch adds the network connect check upfront and prints the nodes that the node is not yet connected to, before aborting. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/dlm/dlmdomain.c | 7 ----- fs/ocfs2/stack_o2cb.c | 67 +++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 61 insertions(+), 13 deletions(-) diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 200d988..92f2ead 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -2138,13 +2138,6 @@ struct dlm_ctxt * dlm_register_domain(const char *domain, goto leave; } - if (!o2hb_check_local_node_heartbeating()) { - mlog(ML_ERROR, "the local node has not been configured, or is " - "not heartbeating\n"); - ret = -EPROTO; - goto leave; - } - mlog(0, "register called for domain \"%s\"\n", domain); retry: diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c index be935d8..9436801 100644 --- a/fs/ocfs2/stack_o2cb.c +++ b/fs/ocfs2/stack_o2cb.c @@ -28,6 +28,7 @@ #include "cluster/masklog.h" #include "cluster/nodemanager.h" #include "cluster/heartbeat.h" +#include "cluster/tcp.h" #include "stackglue.h" @@ -256,6 +257,61 @@ static void o2cb_dump_lksb(struct ocfs2_dlm_lksb *lksb) } /* + * Check if this node is heartbeating and is connected to all other + * heartbeating nodes. + */ +static int o2cb_cluster_check(void) +{ + u8 node_num; + int i; + unsigned long hbmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; + unsigned long netmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; + + node_num = o2nm_this_node(); + if (node_num == O2NM_MAX_NODES) { + printk(KERN_ERR "o2cb: This node has not been configured.\n"); + return -EINVAL; + } + + /* + * o2dlm expects o2net sockets to be created. If not, then + * dlm_join_domain() fails with a stack of errors which are both cryptic + * and incomplete. The idea here is to detect upfront whether we have + * managed to connect to all nodes or not. If not, then list the nodes + * to allow the user to check the configuration (incorrect IP, firewall, + * etc.) Yes, this is racy. But its not the end of the world. + */ +#define O2CB_MAP_STABILIZE_COUNT 60 + for (i = 0; i < O2CB_MAP_STABILIZE_COUNT; ++i) { + o2hb_fill_node_map(hbmap, sizeof(hbmap)); + if (!test_bit(node_num, hbmap)) { + printk(KERN_ERR "o2cb: %s heartbeat has not been " + "started.\n", (o2hb_global_heartbeat_active() ? + "Global" : "Local")); + return -EINVAL; + } + o2net_fill_node_map(netmap, sizeof(netmap)); + /* Force set the current node to allow easy compare */ + set_bit(node_num, netmap); + if (!memcmp(hbmap, netmap, sizeof(hbmap))) + return 0; + if (i < O2CB_MAP_STABILIZE_COUNT) + msleep(1000); + } + + printk(KERN_ERR "o2cb: This node could not connect to nodes:"); + i = -1; + while ((i = find_next_bit(hbmap, O2NM_MAX_NODES, + i + 1)) < O2NM_MAX_NODES) { + if (!test_bit(i, netmap)) + printk(" %u", i); + } + printk(".\n"); + + return -ENOTCONN; +} + +/* * Called from the dlm when it's about to evict a node. This is how the * classic stack signals node death. */ @@ -280,12 +336,11 @@ static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn) BUG_ON(conn == NULL); BUG_ON(conn->cc_proto == NULL); - /* for now we only have one cluster/node, make sure we see it - * in the heartbeat universe */ - if (!o2hb_check_local_node_heartbeating()) { - if (o2hb_global_heartbeat_active()) - mlog(ML_ERROR, "Global heartbeat not started\n"); - rc = -EINVAL; + /* Ensure cluster stack is up and all nodes are connected */ + rc = o2cb_cluster_check(); + if (rc) { + printk(KERN_ERR "o2cb: Cluster check failed. Fix errors " + "before retrying.\n"); goto out; } -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 14/16] ocfs2: Clean up messages in the fs
Convert useful messages from ML_NOTICE to KERN_NOTICE to improve readability. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/journal.c | 9 ++++++--- fs/ocfs2/quota_local.c | 13 +++++++++---- fs/ocfs2/slot_map.c | 4 ++-- fs/ocfs2/super.c | 10 +++++----- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c index 295d564..8eaaa78 100644 --- a/fs/ocfs2/journal.c +++ b/fs/ocfs2/journal.c @@ -1544,9 +1544,9 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb, /* we need to run complete recovery for offline orphan slots */ ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); - mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n", - node_num, slot_num, - MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); + printk(KERN_NOTICE "ocfs2: Begin replay journal (node %d, slot %d) on "\ + "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev), + MINOR(osb->sb->s_dev)); OCFS2_I(inode)->ip_clusters = le32_to_cpu(fe->i_clusters); @@ -1601,6 +1601,9 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb, jbd2_journal_destroy(journal); + printk(KERN_NOTICE "ocfs2: End replay journal (node %d, slot %d) on "\ + "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev), + MINOR(osb->sb->s_dev)); done: /* drop the lock on this nodes journal */ if (got_lock) diff --git a/fs/ocfs2/quota_local.c b/fs/ocfs2/quota_local.c index dc8007f..942fd65 100644 --- a/fs/ocfs2/quota_local.c +++ b/fs/ocfs2/quota_local.c @@ -404,7 +404,9 @@ struct ocfs2_quota_recovery *ocfs2_begin_quota_recovery( int status = 0; struct ocfs2_quota_recovery *rec; - mlog(ML_NOTICE, "Beginning quota recovery in slot %u\n", slot_num); + printk(KERN_NOTICE "ocfs2: Beginning quota recovery on device (%s) for " + "slot %u\n", osb->dev_str, slot_num); + rec = ocfs2_alloc_quota_recovery(); if (!rec) return ERR_PTR(-ENOMEM); @@ -596,7 +598,9 @@ int ocfs2_finish_quota_recovery(struct ocfs2_super *osb, struct inode *lqinode; unsigned int flags; - mlog(ML_NOTICE, "Finishing quota recovery in slot %u\n", slot_num); + printk(KERN_NOTICE "ocfs2: Finishing quota recovery on device (%s) for " + "slot %u\n", osb->dev_str, slot_num); + mutex_lock(&sb_dqopt(sb)->dqonoff_mutex); for (type = 0; type < MAXQUOTAS; type++) { if (list_empty(&(rec->r_list[type]))) @@ -612,8 +616,9 @@ int ocfs2_finish_quota_recovery(struct ocfs2_super *osb, /* Someone else is holding the lock? Then he must be * doing the recovery. Just skip the file... */ if (status == -EAGAIN) { - mlog(ML_NOTICE, "skipping quota recovery for slot %d " - "because quota file is locked.\n", slot_num); + printk(KERN_NOTICE "ocfs2: Skipping quota recovery on " + "device (%s) for slot %d because quota file is " + "locked.\n", osb->dev_str, slot_num); status = 0; goto out_put; } else if (status < 0) { diff --git a/fs/ocfs2/slot_map.c b/fs/ocfs2/slot_map.c index 26fc001..1424c15 100644 --- a/fs/ocfs2/slot_map.c +++ b/fs/ocfs2/slot_map.c @@ -493,8 +493,8 @@ int ocfs2_find_slot(struct ocfs2_super *osb) goto bail; } } else - mlog(ML_NOTICE, "slot %d is already allocated to this node!\n", - slot); + printk(KERN_INFO "ocfs2: Slot %d on device (%s) was already " + "allocated to this node!\n", slot, osb->dev_str); ocfs2_set_slot(si, slot, osb->node_num); osb->slot_num = slot; diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c index cdbaf5e..41b4b23 100644 --- a/fs/ocfs2/super.c +++ b/fs/ocfs2/super.c @@ -1107,9 +1107,9 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent) ocfs2_set_ro_flag(osb, 1); - printk(KERN_NOTICE "Readonly device detected. No cluster " - "services will be utilized for this mount. Recovery " - "will be skipped.\n"); + printk(KERN_NOTICE "ocfs2: Readonly device (%s) detected. " + "Cluster services will not be used for this mount. " + "Recovery will be skipped.\n", osb->dev_str); } if (!ocfs2_is_hard_readonly(osb)) { @@ -2462,8 +2462,8 @@ static int ocfs2_check_volume(struct ocfs2_super *osb) goto finally; } } else { - mlog(ML_NOTICE, "File system was not unmounted cleanly, " - "recovering volume.\n"); + printk(KERN_NOTICE "ocfs2: File system on device (%s) was not " + "unmounted cleanly, recovering it.\n", osb->dev_str); } local = ocfs2_mount_local(osb); -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 15/16] ocfs2: Add comment about orphan scanning
Add a comment that explains the reason as to why orphan scan scans all the slots. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/journal.c | 14 ++++++++++++++ 1 files changed, 14 insertions(+), 0 deletions(-) diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c index 8eaaa78..0a42ae9 100644 --- a/fs/ocfs2/journal.c +++ b/fs/ocfs2/journal.c @@ -1811,6 +1811,20 @@ static inline unsigned long ocfs2_orphan_scan_timeout(void) * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This * is done to catch any orphans that are left over in orphan directories. * + * It scans all slots, even ones that are in use. It does so to handle the + * case described below: + * + * Node 1 has an inode it was using. The dentry went away due to memory + * pressure. Node 1 closes the inode, but it's on the free list. The node + * has the open lock. + * Node 2 unlinks the inode. It grabs the dentry lock to notify others, + * but node 1 has no dentry and doesn't get the message. It trylocks the + * open lock, sees that another node has a PR, and does nothing. + * Later node 2 runs its orphan dir. It igets the inode, trylocks the + * open lock, sees the PR still, and does nothing. + * Basically, we have to trigger an orphan iput on node 1. The only way + * for this to happen is if node 1 runs node 2's orphan dir. + * * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT * seconds. It gets an EX lock on os_lockres and checks sequence number * stored in LVB. If the sequence number has changed, it means some other -- 1.7.4.1
Sunil Mushran
2011-Jul-14 22:50 UTC
[Ocfs2-devel] [PATCH 16/16] ocfs2: Fix ocfs2_page_mkwrite()
From: Wengang Wang <wen.gang.wang at oracle.com> This patch address two shortcomings in ocfs2_page_mkwrite(): 1. Makes the function return better VM_FAULT_* errors. 2. It handles a error that is triggered when a page is dropped from the mapping due to memory pressure. This patch locks the page to prevent that. [Patch was cleaned up by Sunil Mushran.] Signed-off-by: Wengang Wang <wen.gang.wang at oracle.com> Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> --- fs/ocfs2/aops.c | 51 +++++++++++++++++++++++++++++++++++++++++++-------- fs/ocfs2/mmap.c | 53 ++++++++++++++++++++++++----------------------------- 2 files changed, 67 insertions(+), 37 deletions(-) diff --git a/fs/ocfs2/aops.c b/fs/ocfs2/aops.c index ac97bca..92e363b 100644 --- a/fs/ocfs2/aops.c +++ b/fs/ocfs2/aops.c @@ -865,6 +865,12 @@ struct ocfs2_write_ctxt { struct page *w_target_page; /* + * w_target_locked is used for page_mkwrite path indicating no unlocking + * against w_target_page in ocfs2_write_end_nolock. + */ + unsigned int w_target_locked:1; + + /* * ocfs2_write_end() uses this to know what the real range to * write in the target should be. */ @@ -897,6 +903,24 @@ void ocfs2_unlock_and_free_pages(struct page **pages, int num_pages) static void ocfs2_free_write_ctxt(struct ocfs2_write_ctxt *wc) { + int i; + + /* + * w_target_locked is only set to true in the page_mkwrite() case. + * The intent is to allow us to lock the target page from write_begin() + * to write_end(). The caller must hold a ref on w_target_page. + */ + if (wc->w_target_locked) { + BUG_ON(!wc->w_target_page); + for (i = 0; i < wc->w_num_pages; i++) { + if (wc->w_target_page == wc->w_pages[i]) { + wc->w_pages[i] = NULL; + break; + } + } + mark_page_accessed(wc->w_target_page); + page_cache_release(wc->w_target_page); + } ocfs2_unlock_and_free_pages(wc->w_pages, wc->w_num_pages); brelse(wc->w_di_bh); @@ -1134,20 +1158,17 @@ static int ocfs2_grab_pages_for_write(struct address_space *mapping, */ lock_page(mmap_page); + /* Exit and let the caller retry */ if (mmap_page->mapping != mapping) { + WARN_ON(mmap_page->mapping); unlock_page(mmap_page); - /* - * Sanity check - the locking in - * ocfs2_pagemkwrite() should ensure - * that this code doesn't trigger. - */ - ret = -EINVAL; - mlog_errno(ret); + ret = -EAGAIN; goto out; } page_cache_get(mmap_page); wc->w_pages[i] = mmap_page; + wc->w_target_locked = true; } else { wc->w_pages[i] = find_or_create_page(mapping, index, GFP_NOFS); @@ -1162,6 +1183,8 @@ static int ocfs2_grab_pages_for_write(struct address_space *mapping, wc->w_target_page = wc->w_pages[i]; } out: + if (ret) + wc->w_target_locked = false; return ret; } @@ -1819,11 +1842,23 @@ try_again: */ ret = ocfs2_grab_pages_for_write(mapping, wc, wc->w_cpos, pos, len, cluster_of_pages, mmap_page); - if (ret) { + if (ret && ret != -EAGAIN) { mlog_errno(ret); goto out_quota; } + /* + * ocfs2_grab_pages_for_write() returns -EAGAIN if it could not lock + * the target page. In this case, we exit with no error and no target + * page. This will trigger the caller, page_mkwrite(), to re-try + * the operation. + */ + if (ret == -EAGAIN) { + BUG_ON(wc->w_target_page); + ret = 0; + goto out_quota; + } + ret = ocfs2_write_cluster_by_desc(mapping, data_ac, meta_ac, wc, pos, len); if (ret) { diff --git a/fs/ocfs2/mmap.c b/fs/ocfs2/mmap.c index 3e9393c..9cd4108 100644 --- a/fs/ocfs2/mmap.c +++ b/fs/ocfs2/mmap.c @@ -61,7 +61,7 @@ static int ocfs2_fault(struct vm_area_struct *area, struct vm_fault *vmf) static int __ocfs2_page_mkwrite(struct file *file, struct buffer_head *di_bh, struct page *page) { - int ret; + int ret = VM_FAULT_NOPAGE; struct inode *inode = file->f_path.dentry->d_inode; struct address_space *mapping = inode->i_mapping; loff_t pos = page_offset(page); @@ -71,32 +71,25 @@ static int __ocfs2_page_mkwrite(struct file *file, struct buffer_head *di_bh, void *fsdata; loff_t size = i_size_read(inode); - /* - * Another node might have truncated while we were waiting on - * cluster locks. - * We don't check size == 0 before the shift. This is borrowed - * from do_generic_file_read. - */ last_index = (size - 1) >> PAGE_CACHE_SHIFT; - if (unlikely(!size || page->index > last_index)) { - ret = -EINVAL; - goto out; - } /* - * The i_size check above doesn't catch the case where nodes - * truncated and then re-extended the file. We'll re-check the - * page mapping after taking the page lock inside of - * ocfs2_write_begin_nolock(). + * There are cases that lead to the page no longer bebongs to the + * mapping. + * 1) pagecache truncates locally due to memory pressure. + * 2) pagecache truncates when another is taking EX lock against + * inode lock. see ocfs2_data_convert_worker. + * + * The i_size check doesn't catch the case where nodes truncated and + * then re-extended the file. We'll re-check the page mapping after + * taking the page lock inside of ocfs2_write_begin_nolock(). + * + * Let VM retry with these cases. */ - if (!PageUptodate(page) || page->mapping != inode->i_mapping) { - /* - * the page has been umapped in ocfs2_data_downconvert_worker. - * So return 0 here and let VFS retry. - */ - ret = 0; + if ((page->mapping != inode->i_mapping) || + (!PageUptodate(page)) || + (page_offset(page) >= size)) goto out; - } /* * Call ocfs2_write_begin() and ocfs2_write_end() to take @@ -116,17 +109,21 @@ static int __ocfs2_page_mkwrite(struct file *file, struct buffer_head *di_bh, if (ret) { if (ret != -ENOSPC) mlog_errno(ret); + if (ret == -ENOMEM) + ret = VM_FAULT_OOM; + else + ret = VM_FAULT_SIGBUS; goto out; } - ret = ocfs2_write_end_nolock(mapping, pos, len, len, locked_page, - fsdata); - if (ret < 0) { - mlog_errno(ret); + if (!locked_page) { + ret = VM_FAULT_NOPAGE; goto out; } + ret = ocfs2_write_end_nolock(mapping, pos, len, len, locked_page, + fsdata); BUG_ON(ret != len); - ret = 0; + ret = VM_FAULT_LOCKED; out: return ret; } @@ -168,8 +165,6 @@ static int ocfs2_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) out: ocfs2_unblock_signals(&oldset); - if (ret) - ret = VM_FAULT_SIGBUS; return ret; } -- 1.7.4.1
Joel Becker
2011-Aug-22 04:23 UTC
[Ocfs2-devel] ocfs2 cleanups, bug fixes for Linux kernel 3.1 - final drop
On Thu, Jul 14, 2011 at 03:50:11PM -0700, Sunil Mushran wrote:> Joel, > > Hopefully this is the final drop before the 3.1 merge window. I had posted most > of the patches before. In fact the first 12 are exactly the same. The 13th one > is modified and 14,15,16 are new.Hey Sunil, These changes are now in the 'fixes' branch of ocfs2.git. Sorry this took so long. I plan to send the SEEK_HOLE patch along, and if Linus wants it out I will take it out. My only comment on the series, which is otherwise quite nicely put together, is that new mlog()s should be replaced by trace events. I'm going to let them in for this time, but please consider updating them in another patch. Joel -- "If you are ever in doubt as to whether or not to kiss a pretty girl, give her the benefit of the doubt" -Thomas Carlyle http://www.jlbec.org/ jlbec at evilplan.org