Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] OCFS2 1.4: Patches backported from mainline
Please review the list of patches being applied to the ocfs2 1.4 tree. All patches list the mainline commit hash. Thanks Sunil
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 01/26] ocfs2: Wakeup the downconvert thread after a successful cancel convert
Mainline commit a4b91965d39d5d53b470d6aa62cba155a6f3ffe1 When two nodes holding PR locks on a resource concurrently attempt to upconvert the locks to EX, the master sends a BAST to one of the nodes. This message tells that node to first cancel convert the upconvert request, followed by downconvert to a NL. Only when this lock is downconverted to NL, can the master upconvert the first node's lock to EX. While the fs was doing the cancel convert, it was forgetting to wake up the dc thread after a successful cancel, leading to a deadlock. Reported-and-Tested-by: David Teigland <teigland at redhat.com> Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlmglue.c | 4 ++++ 1 files changed, 4 insertions(+), 0 deletions(-) diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c index 7eceb0a..2580e4e 100644 --- a/fs/ocfs2/dlmglue.c +++ b/fs/ocfs2/dlmglue.c @@ -2745,6 +2745,10 @@ static void ocfs2_unlock_ast(void *opaque, enum dlm_status status) case OCFS2_UNLOCK_CANCEL_CONVERT: mlog(0, "Cancel convert success for %s\n", lockres->l_name); lockres->l_action = OCFS2_AST_INVALID; + /* Downconvert thread may have requeued this lock, we + * need to wake it. */ + if (lockres->l_flags & OCFS2_LOCK_BLOCKED) + ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); break; case OCFS2_UNLOCK_DROP_LOCK: lockres->l_level = LKM_IVMODE; -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 02/26] ocfs2/dlm: Retract fix for race between purge and migrate
Mainline commit 7dc102b737e9f49dac426161294cb2d326a97d8e Mainline commit d4f7e650e55af6b235871126f747da88600e8040 attempts to delay the dlm_thread from sending the drop ref message if the lockres is being migrated. The problem is that we make the dlm_thread wait for the migration to complete. This causes a deadlock as dlm_thread also participates in the lockres migration process. A better fix for the original oss bugzilla#1012 is in testing. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Acked-by: Joel Becker <joel.becker at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmthread.c | 3 +-- 1 files changed, 1 insertions(+), 2 deletions(-) diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index d129520..4060bb3 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -181,8 +181,7 @@ static int dlm_purge_lockres(struct dlm_ctxt *dlm, spin_lock(&res->spinlock); /* This ensures that clear refmap is sent after the set */ - __dlm_wait_on_lockres_flags(res, (DLM_LOCK_RES_SETREF_INPROG | - DLM_LOCK_RES_MIGRATING)); + __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); spin_unlock(&res->spinlock); /* clear our bit from the master's refmap, ignore errors */ -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 03/26] ocfs2: Cleanup the lockname print in dlmglue.c
Mainline commit c74ff8bb2235d848beb67fcfddae71ecbe3f92b1 The dentry lock has a different format than other locks. This patch fixes ocfs2_log_dlm_error() macro to make it print the dentry lock correctly. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Acked-by: Joel Becker <joel.becker at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlmglue.c | 16 ++++++++++++---- 1 files changed, 12 insertions(+), 4 deletions(-) diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c index 2580e4e..bacb092 100644 --- a/fs/ocfs2/dlmglue.c +++ b/fs/ocfs2/dlmglue.c @@ -334,10 +334,18 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres); static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, int convert); -#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \ - mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ - "resource %s: %s\n", dlm_errname(_stat), _func, \ - _lockres->l_name, dlm_errmsg(_stat)); \ +#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \ + if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ + mlog(ML_ERROR, "DLM error %s while calling %s on " \ + "resource %s: %s\n", dlm_errname(_stat), _func, \ + _lockres->l_name, dlm_errmsg(_stat)); \ + else \ + mlog(ML_ERROR, "DLM error %s while calling %s on " \ + "resource %.*s%08x: %s\n", dlm_errname(_stat), \ + _func, OCFS2_DENTRY_LOCK_INO_START - 1, \ + _lockres->l_name, \ + (unsigned int)ocfs2_get_dentry_lock_ino(_lockres), \ + dlm_errmsg(_stat)); \ } while (0) static int ocfs2_downconvert_thread(void *arg); static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 04/26] ocfs2/dlm: Use ast_lock to protect ast_list
Mainline commit dabc47de7a23f57522dc762d9d2ad875700d3497 The code was using dlm->spinlock instead of dlm->ast_lock to protect the ast_list. This patch fixes the issue. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Acked-by: Joel Becker <joel.becker at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmunlock.c | 4 ++-- 1 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c index 86ca085..fcf879e 100644 --- a/fs/ocfs2/dlm/dlmunlock.c +++ b/fs/ocfs2/dlm/dlmunlock.c @@ -117,11 +117,11 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm, else BUG_ON(res->owner == dlm->node_num); - spin_lock(&dlm->spinlock); + spin_lock(&dlm->ast_lock); /* We want to be sure that we're not freeing a lock * that still has AST's pending... */ in_use = !list_empty(&lock->ast_list); - spin_unlock(&dlm->spinlock); + spin_unlock(&dlm->ast_lock); if (in_use) { mlog(ML_ERROR, "lockres %.*s: Someone is calling dlmunlock " "while waiting for an ast!", res->lockname.len, -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 05/26] ocfs2/dlm: Make dlm_assert_master_handler() kill itself instead of the asserter
Mainline commit 53ecd25e148615e0ed2a72635cc76f4773f97f90 In dlm_assert_master_handler(), if we get an incorrect assert master from a node, we reply with EINVAL asking the asserter to die. The problem is that an assert is sent only after so many hoops that it is invariably the node that thinks the asserter is wrong that is actually wrong. So instead of killing the asserter, this patch kills the assertee. This patch papers over a race that is still being addressed. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Acked-by: Joel Becker <joel.becker at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmmaster.c | 12 ++++++------ 1 files changed, 6 insertions(+), 6 deletions(-) diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index b0ea529..7d4e0ac 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -1849,12 +1849,12 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data, if (!mle) { if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN && res->owner != assert->node_idx) { - mlog(ML_ERROR, "assert_master from " - "%u, but current owner is " - "%u! (%.*s)\n", - assert->node_idx, res->owner, - namelen, name); - goto kill; + mlog(ML_ERROR, "DIE! Mastery assert from %u, " + "but current owner is %u! (%.*s)\n", + assert->node_idx, res->owner, namelen, + name); + __dlm_print_one_lock_resource(res); + BUG(); } } else if (mle->type != DLM_MLE_MIGRATION) { if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 06/26] ocfs2: add IO error check in ocfs2_get_sector()
From: wengang wang <wen.gang.wang at oracle.com> Mainline commit 28d57d437786eb3e44f1ca3f0f41e7cfe29c6dd4 Check for IO error in ocfs2_get_sector(). Signed-off-by: Wengang Wang <wen.gang.wang at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/super.c | 7 +++++++ 1 files changed, 7 insertions(+), 0 deletions(-) diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c index 535ccb5..5f5cb0f 100644 --- a/fs/ocfs2/super.c +++ b/fs/ocfs2/super.c @@ -1203,6 +1203,13 @@ static int ocfs2_get_sector(struct super_block *sb, unlock_buffer(*bh); ll_rw_block(READ, 1, bh); wait_on_buffer(*bh); + if (!buffer_uptodate(*bh)) { + mlog_errno(-EIO); + brelse(*bh); + *bh = NULL; + return -EIO; + } + return 0; } -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 07/26] ocfs2/hb: Expose the list of heartbeating nodes via debugfs
Mainline commit 87d3d3f3931f3e0fca44fbb5c06ad45fc4dca9bc This patch creates a debugfs file, o2hb/livenodes, which exposes the aggregate list of heartbeating nodes across all heartbeat regions. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/cluster/heartbeat.c | 96 +++++++++++++++++++++++++++++++++++++++- fs/ocfs2/cluster/heartbeat.h | 3 +- fs/ocfs2/cluster/nodemanager.c | 9 +++- 3 files changed, 104 insertions(+), 4 deletions(-) diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c index 3e52fa3..af5861d 100644 --- a/fs/ocfs2/cluster/heartbeat.c +++ b/fs/ocfs2/cluster/heartbeat.c @@ -33,6 +33,7 @@ #include <linux/random.h> #include <linux/crc32.h> #include <linux/time.h> +#include <linux/debugfs.h> #include "heartbeat.h" #include "tcp.h" @@ -60,6 +61,11 @@ static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; static LIST_HEAD(o2hb_node_events); static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue); +#define O2HB_DEBUG_DIR "o2hb" +#define O2HB_DEBUG_LIVENODES "livenodes" +static struct dentry *o2hb_debug_dir; +static struct dentry *o2hb_debug_livenodes; + static LIST_HEAD(o2hb_all_regions); static struct o2hb_callback { @@ -913,7 +919,77 @@ static int o2hb_thread(void *data) return 0; } -void o2hb_init(void) +#ifdef CONFIG_DEBUG_FS +static int o2hb_debug_open(struct inode *inode, struct file *file) +{ + unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)]; + char *buf = NULL; + int i = -1; + int out = 0; + + buf = kmalloc(PAGE_SIZE, GFP_KERNEL); + if (!buf) + goto bail; + + o2hb_fill_node_map(map, sizeof(map)); + + while ((i = find_next_bit(map, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) + out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i); + out += snprintf(buf + out, PAGE_SIZE - out, "\n"); + + i_size_write(inode, out); + + file->private_data = buf; + + return 0; +bail: + return -ENOMEM; +} + +static int o2hb_debug_release(struct inode *inode, struct file 
*file) +{ + kfree(file->private_data); + return 0; +} + +static ssize_t o2hb_debug_read(struct file *file, char __user *buf, + size_t nbytes, loff_t *ppos) +{ + return simple_read_from_buffer(buf, nbytes, ppos, file->private_data, + i_size_read(file->f_mapping->host)); +} +#else +static int o2hb_debug_open(struct inode *inode, struct file *file) +{ + return 0; +} +static int o2hb_debug_release(struct inode *inode, struct file *file) +{ + return 0; +} +static ssize_t o2hb_debug_read(struct file *file, char __user *buf, + size_t nbytes, loff_t *ppos) +{ + return 0; +} +#endif /* CONFIG_DEBUG_FS */ + +static struct file_operations o2hb_debug_fops = { + .open = o2hb_debug_open, + .release = o2hb_debug_release, + .read = o2hb_debug_read, + .llseek = generic_file_llseek, +}; + +void o2hb_exit(void) +{ + if (o2hb_debug_livenodes) + debugfs_remove(o2hb_debug_livenodes); + if (o2hb_debug_dir) + debugfs_remove(o2hb_debug_dir); +} + +int o2hb_init(void) { int i; @@ -926,6 +1002,24 @@ void o2hb_init(void) INIT_LIST_HEAD(&o2hb_node_events); memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap)); + + o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL); + if (!o2hb_debug_dir) { + mlog_errno(-ENOMEM); + return -ENOMEM; + } + + o2hb_debug_livenodes = debugfs_create_file(O2HB_DEBUG_LIVENODES, + S_IFREG|S_IRUSR, + o2hb_debug_dir, NULL, + &o2hb_debug_fops); + if (!o2hb_debug_livenodes) { + mlog_errno(-ENOMEM); + debugfs_remove(o2hb_debug_dir); + return -ENOMEM; + } + + return 0; } /* if we're already in a callback then we're already serialized by the sem */ diff --git a/fs/ocfs2/cluster/heartbeat.h b/fs/ocfs2/cluster/heartbeat.h index e511339..2f16492 100644 --- a/fs/ocfs2/cluster/heartbeat.h +++ b/fs/ocfs2/cluster/heartbeat.h @@ -75,7 +75,8 @@ void o2hb_unregister_callback(const char *region_uuid, struct o2hb_callback_func *hc); void o2hb_fill_node_map(unsigned long *map, unsigned bytes); -void o2hb_init(void); +void o2hb_exit(void); +int o2hb_init(void); int 
o2hb_check_node_heartbeating(u8 node_num); int o2hb_check_node_heartbeating_from_callback(u8 node_num); int o2hb_check_local_node_heartbeating(void); diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c index 8eff5bf..4378986 100644 --- a/fs/ocfs2/cluster/nodemanager.c +++ b/fs/ocfs2/cluster/nodemanager.c @@ -950,6 +950,7 @@ static void __exit exit_o2nm(void) o2cb_sys_shutdown(); o2net_exit(); + o2hb_exit(); } static int __init init_o2nm(void) @@ -958,11 +959,13 @@ static int __init init_o2nm(void) cluster_print_version(); - o2hb_init(); + ret = o2hb_init(); + if (ret) + goto out; ret = o2net_init(); if (ret) - goto out; + goto out_o2hb; ocfs2_table_header = kapi_register_sysctl_table(ocfs2_root_table); if (!ocfs2_table_header) { @@ -994,6 +997,8 @@ out_sysctl: unregister_sysctl_table(ocfs2_table_header); out_o2net: o2net_exit(); +out_o2hb: + o2hb_exit(); out: return ret; } -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 08/26] ocfs2: Expose the file system state via debugfs
Mainline commit 50397507e856455b3f5cb3d5c7c482209f9e46a0 This patch creates a per mount debugfs file, fs_state, which exposes information like, cluster stack in use, states of the downconvert, recovery and commit threads, number of journal txns, some allocation stats, list of all slots, etc. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/ocfs2.h | 1 + fs/ocfs2/super.c | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 166 insertions(+), 0 deletions(-) diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h index 83a30a0..e523a9c 100644 --- a/fs/ocfs2/ocfs2.h +++ b/fs/ocfs2/ocfs2.h @@ -293,6 +293,7 @@ struct ocfs2_super struct dlm_protocol_version osb_locking_proto; struct dentry *osb_debug_root; + struct dentry *osb_ctxt; wait_queue_head_t recovery_event; diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c index 5f5cb0f..fc806d6 100644 --- a/fs/ocfs2/super.c +++ b/fs/ocfs2/super.c @@ -193,6 +193,159 @@ static match_table_t tokens = { {Opt_err, NULL} }; +#ifdef CONFIG_DEBUG_FS +static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len) +{ + int out = 0; + int i; + + out += snprintf(buf + out, len - out, + "%10s => Id: %-s Uuid: %-s Gen: 0x%X Label: %-s\n", + "Device", osb->dev_str, osb->uuid_str, + osb->fs_generation, osb->vol_label); + + out += snprintf(buf + out, len - out, + "%10s => State: %d Flags: 0x%lX\n", "Volume", + atomic_read(&osb->vol_state), osb->osb_flags); + + out += snprintf(buf + out, len - out, + "%10s => Block: %lu Cluster: %d\n", "Sizes", + osb->sb->s_blocksize, osb->s_clustersize); + + out += snprintf(buf + out, len - out, + "%10s => Compat: 0x%X Incompat: 0x%X " + "ROcompat: 0x%X\n", + "Features", osb->s_feature_compat, + osb->s_feature_incompat, osb->s_feature_ro_compat); + + out += snprintf(buf + out, len - out, + "%10s => Opts: 0x%lX AtimeQuanta: %u\n", "Mount", + osb->s_mount_opt, osb->s_atime_quantum); + + spin_lock(&osb->dc_task_lock); + 
out += snprintf(buf + out, len - out, + "%10s => Pid: %d Count: %lu WakeSeq: %lu " + "WorkSeq: %lu\n", "DownCnvt", + task_pid_nr(osb->dc_task), osb->blocked_lock_count, + osb->dc_wake_sequence, osb->dc_work_sequence); + spin_unlock(&osb->dc_task_lock); + + out += snprintf(buf + out, len - out, "%10s => Pid: %d Nodes:", + "Recovery", + (osb->recovery_thread_task ? + task_pid_nr(osb->recovery_thread_task) : -1)); + if (ocfs2_node_map_is_empty(osb, &osb->recovery_map)) + out += snprintf(buf + out, len - out, " None\n"); + else { + for (i = 0; i < O2NM_MAX_NODES; i++) { + if (ocfs2_node_map_test_bit(osb, &osb->recovery_map, i)) + out += snprintf(buf + out, len - out, " %d", i); + } + out += snprintf(buf + out, len - out, "\n"); + } + + out += snprintf(buf + out, len - out, + "%10s => Pid: %d Interval: %lu Needs: %d\n", "Commit", + task_pid_nr(osb->commit_task), osb->osb_commit_interval, + atomic_read(&osb->needs_checkpoint)); + + out += snprintf(buf + out, len - out, + "%10s => State: %d NumTxns: %d TxnId: %lu\n", + "Journal", osb->journal->j_state, + atomic_read(&osb->journal->j_num_trans), + osb->journal->j_trans_id); + + out += snprintf(buf + out, len - out, + "%10s => GlobalAllocs: %d LocalAllocs: %d " + "SubAllocs: %d LAWinMoves: %d SAExtends: %d\n", + "Stats", + atomic_read(&osb->alloc_stats.bitmap_data), + atomic_read(&osb->alloc_stats.local_data), + atomic_read(&osb->alloc_stats.bg_allocs), + atomic_read(&osb->alloc_stats.moves), + atomic_read(&osb->alloc_stats.bg_extends)); + + out += snprintf(buf + out, len - out, + "%10s => State: %u Descriptor: %llu Size: %u bits " + "Default: %u bits\n", + "LocalAlloc", osb->local_alloc_state, + (unsigned long long)osb->la_last_gd, + osb->local_alloc_bits, osb->local_alloc_default_bits); + + spin_lock(&osb->osb_lock); + out += snprintf(buf + out, len - out, + "%10s => Slot: %d NumStolen: %d\n", "Steal", + osb->s_inode_steal_slot, + atomic_read(&osb->s_num_inodes_stolen)); + spin_unlock(&osb->osb_lock); + + out += 
snprintf(buf + out, len - out, "%10s => %3s %10s\n", + "Slots", "Num", "RecoGen"); + + for (i = 0; i < osb->max_slots; ++i) { + out += snprintf(buf + out, len - out, + "%10s %c %3d %10d\n", + " ", + (i == osb->slot_num ? '*' : ' '), + i, osb->slot_recovery_generations[i]); + } + + return out; +} + +static int ocfs2_osb_debug_open(struct inode *inode, struct file *file) +{ + struct ocfs2_super *osb = inode->i_private; + char *buf = NULL; + + buf = kmalloc(PAGE_SIZE, GFP_KERNEL); + if (!buf) + goto bail; + + i_size_write(inode, ocfs2_osb_dump(osb, buf, PAGE_SIZE)); + + file->private_data = buf; + + return 0; +bail: + return -ENOMEM; +} + +static int ocfs2_debug_release(struct inode *inode, struct file *file) +{ + kfree(file->private_data); + return 0; +} + +static ssize_t ocfs2_debug_read(struct file *file, char __user *buf, + size_t nbytes, loff_t *ppos) +{ + return simple_read_from_buffer(buf, nbytes, ppos, file->private_data, + i_size_read(file->f_mapping->host)); +} +#else +static int ocfs2_osb_debug_open(struct inode *inode, struct file *file) +{ + return 0; +} +static int ocfs2_debug_release(struct inode *inode, struct file *file) +{ + return 0; +} +static ssize_t ocfs2_debug_read(struct file *file, char __user *buf, + size_t nbytes, loff_t *ppos) +{ + return 0; +} +#endif /* CONFIG_DEBUG_FS */ + +static struct file_operations ocfs2_osb_debug_fops = { + .open = ocfs2_osb_debug_open, + .release = ocfs2_debug_release, + .read = ocfs2_debug_read, + .llseek = generic_file_llseek, +}; + /* * write_super and sync_fs ripped right out of ext3. 
*/ @@ -691,6 +844,16 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent) goto read_super_error; } + osb->osb_ctxt = debugfs_create_file("fs_state", S_IFREG|S_IRUSR, + osb->osb_debug_root, + osb, + &ocfs2_osb_debug_fops); + if (!osb->osb_ctxt) { + status = -EINVAL; + mlog_errno(status); + goto read_super_error; + } + status = ocfs2_mount_volume(sb); if (osb->root_inode) inode = igrab(osb->root_inode); @@ -1326,6 +1489,8 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err) osb = OCFS2_SB(sb); BUG_ON(!osb); + debugfs_remove(osb->osb_ctxt); + ocfs2_shutdown_local_alloc(osb); ocfs2_truncate_log_shutdown(osb); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 09/26] ocfs2: Remove debugfs file local_alloc_stats
Mainline commit 59b526a30722f29e5dba6210a6e0fc34e3149b94 This patch removes the debugfs file local_alloc_stats as that information is now included in the fs_state debugfs file. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/localalloc.c | 86 ------------------------------------------------- fs/ocfs2/ocfs2.h | 5 --- 2 files changed, 0 insertions(+), 91 deletions(-) diff --git a/fs/ocfs2/localalloc.c b/fs/ocfs2/localalloc.c index 2be8152..d4ff66a 100644 --- a/fs/ocfs2/localalloc.c +++ b/fs/ocfs2/localalloc.c @@ -28,7 +28,6 @@ #include <linux/slab.h> #include <linux/highmem.h> #include <linux/bitops.h> -#include <linux/debugfs.h> #define MLOG_MASK_PREFIX ML_DISK_ALLOC #include <cluster/masklog.h> @@ -74,84 +73,6 @@ static int ocfs2_local_alloc_new_window(struct ocfs2_super *osb, static int ocfs2_local_alloc_slide_window(struct ocfs2_super *osb, struct inode *local_alloc_inode); -#ifdef CONFIG_OCFS2_FS_STATS - -static int ocfs2_la_debug_open(struct inode *inode, struct file *file) -{ - file->private_data = inode->i_private; - return 0; -} - -#define LA_DEBUG_BUF_SZ PAGE_CACHE_SIZE -#define LA_DEBUG_VER 1 -static ssize_t ocfs2_la_debug_read(struct file *file, char __user *userbuf, - size_t count, loff_t *ppos) -{ - static DEFINE_MUTEX(la_debug_mutex); - struct ocfs2_super *osb = file->private_data; - int written, ret; - char *buf = osb->local_alloc_debug_buf; - - mutex_lock(&la_debug_mutex); - memset(buf, 0, LA_DEBUG_BUF_SZ); - - written = snprintf(buf, LA_DEBUG_BUF_SZ, - "0x%x\t0x%llx\t%u\t%u\t0x%x\n", - LA_DEBUG_VER, - (unsigned long long)osb->la_last_gd, - osb->local_alloc_default_bits, - osb->local_alloc_bits, osb->local_alloc_state); - - ret = simple_read_from_buffer(userbuf, count, ppos, buf, written); - - mutex_unlock(&la_debug_mutex); - return ret; -} - -static const struct file_operations ocfs2_la_debug_fops = { - .open = ocfs2_la_debug_open, - .read = ocfs2_la_debug_read, -}; - -static 
void ocfs2_init_la_debug(struct ocfs2_super *osb) -{ - osb->local_alloc_debug_buf = kmalloc(LA_DEBUG_BUF_SZ, GFP_NOFS); - if (!osb->local_alloc_debug_buf) - return; - - osb->local_alloc_debug = debugfs_create_file("local_alloc_stats", - S_IFREG|S_IRUSR, - osb->osb_debug_root, - osb, - &ocfs2_la_debug_fops); - if (!osb->local_alloc_debug) { - kfree(osb->local_alloc_debug_buf); - osb->local_alloc_debug_buf = NULL; - } -} - -static void ocfs2_shutdown_la_debug(struct ocfs2_super *osb) -{ - if (osb->local_alloc_debug) - debugfs_remove(osb->local_alloc_debug); - - if (osb->local_alloc_debug_buf) - kfree(osb->local_alloc_debug_buf); - - osb->local_alloc_debug_buf = NULL; - osb->local_alloc_debug = NULL; -} -#else /* CONFIG_OCFS2_FS_STATS */ -static void ocfs2_init_la_debug(struct ocfs2_super *osb) -{ - return; -} -static void ocfs2_shutdown_la_debug(struct ocfs2_super *osb) -{ - return; -} -#endif - static inline int ocfs2_la_state_enabled(struct ocfs2_super *osb) { return (osb->local_alloc_state == OCFS2_LA_THROTTLED || @@ -225,8 +146,6 @@ int ocfs2_load_local_alloc(struct ocfs2_super *osb) mlog_entry_void(); - ocfs2_init_la_debug(osb); - if (osb->local_alloc_bits == 0) goto bail; @@ -298,9 +217,6 @@ bail: if (inode) iput(inode); - if (status < 0) - ocfs2_shutdown_la_debug(osb); - mlog(0, "Local alloc window bits = %d\n", osb->local_alloc_bits); mlog_exit(status); @@ -330,8 +246,6 @@ void ocfs2_shutdown_local_alloc(struct ocfs2_super *osb) cancel_delayed_work(&osb->la_enable_wq); flush_workqueue(ocfs2_wq); - ocfs2_shutdown_la_debug(osb); - if (osb->local_alloc_state == OCFS2_LA_UNUSED) goto out; diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h index e523a9c..aa4fa41 100644 --- a/fs/ocfs2/ocfs2.h +++ b/fs/ocfs2/ocfs2.h @@ -272,11 +272,6 @@ struct ocfs2_super u64 la_last_gd; -#ifdef CONFIG_OCFS2_FS_STATS - struct dentry *local_alloc_debug; - char *local_alloc_debug_buf; -#endif - /* Next two fields are for local node slot recovery during * mount. */ int dirty; -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 10/26] ocfs2: Optimize inode allocation by remembering last group
From: Tao Ma <tao.ma at oracle.com> Mainline commit 138211515c102807a16c02fdc15feef1f6ef8124 In ocfs2, the inode block search looks for the "emptiest" inode group to allocate from. So if an inode alloc file has many equally (or almost equally) empty groups, new inodes will tend to get spread out amongst them, which in turn can put them all over the disk. This is undesirable because directory operations on conceptually "nearby" inodes force a large number of seeks. So we add ip_last_used_group in core directory inodes which records the last used allocation group. Another field named ip_last_used_slot is also added in case inode stealing happens. When claiming new inode, we passed in directory's inode so that the allocation can use this information. For more details, please see http://oss.oracle.com/osswiki/OCFS2/DesignDocs/InodeAllocationStrategy. Signed-off-by: Tao Ma <tao.ma at oracle.com> --- fs/ocfs2/inode.c | 2 ++ fs/ocfs2/inode.h | 4 ++++ fs/ocfs2/namei.c | 4 ++-- fs/ocfs2/suballoc.c | 36 ++++++++++++++++++++++++++++++++++++ fs/ocfs2/suballoc.h | 2 ++ 5 files changed, 46 insertions(+), 2 deletions(-) diff --git a/fs/ocfs2/inode.c b/fs/ocfs2/inode.c index ee6bf13..784761c 100644 --- a/fs/ocfs2/inode.c +++ b/fs/ocfs2/inode.c @@ -346,6 +346,8 @@ int ocfs2_populate_inode(struct inode *inode, struct ocfs2_dinode *fe, ocfs2_set_inode_flags(inode); + OCFS2_I(inode)->ip_last_used_slot = 0; + OCFS2_I(inode)->ip_last_used_group = 0; status = 0; bail: mlog_exit(status); diff --git a/fs/ocfs2/inode.h b/fs/ocfs2/inode.h index 01437ca..077eb59 100644 --- a/fs/ocfs2/inode.h +++ b/fs/ocfs2/inode.h @@ -68,6 +68,10 @@ struct ocfs2_inode_info struct ocfs2_extent_map ip_extent_map; struct inode vfs_inode; + + /* Only valid if the inode is the dir. 
*/ + u32 ip_last_used_slot; + u64 ip_last_used_group; }; /* diff --git a/fs/ocfs2/namei.c b/fs/ocfs2/namei.c index c8206f9..eba1148 100644 --- a/fs/ocfs2/namei.c +++ b/fs/ocfs2/namei.c @@ -369,8 +369,8 @@ static int ocfs2_mknod_locked(struct ocfs2_super *osb, *new_fe_bh = NULL; *ret_inode = NULL; - status = ocfs2_claim_new_inode(osb, handle, inode_ac, &suballoc_bit, - &fe_blkno); + status = ocfs2_claim_new_inode(osb, handle, dir, parent_fe_bh, + inode_ac, &suballoc_bit, &fe_blkno); if (status < 0) { mlog_errno(status); goto leave; diff --git a/fs/ocfs2/suballoc.c b/fs/ocfs2/suballoc.c index 228bf1c..0f1f2e9 100644 --- a/fs/ocfs2/suballoc.c +++ b/fs/ocfs2/suballoc.c @@ -1462,8 +1462,41 @@ bail: return status; } +static void ocfs2_init_inode_ac_group(struct inode *dir, + struct buffer_head *parent_fe_bh, + struct ocfs2_alloc_context *ac) +{ + struct ocfs2_dinode *fe = (struct ocfs2_dinode *)parent_fe_bh->b_data; + /* + * Try to allocate inodes from some specific group. + * + * If the parent dir has recorded the last group used in allocation, + * cool, use it. Otherwise if we try to allocate new inode from the + * same slot the parent dir belongs to, use the same chunk. + * + * We are very careful here to avoid the mistake of setting + * ac_last_group to a group descriptor from a different (unlocked) slot. 
+ */ + if (OCFS2_I(dir)->ip_last_used_group && + OCFS2_I(dir)->ip_last_used_slot == ac->ac_alloc_slot) + ac->ac_last_group = OCFS2_I(dir)->ip_last_used_group; + else if (le16_to_cpu(fe->i_suballoc_slot) == ac->ac_alloc_slot) + ac->ac_last_group = ocfs2_which_suballoc_group( + le64_to_cpu(fe->i_blkno), + le16_to_cpu(fe->i_suballoc_bit)); +} + +static inline void ocfs2_save_inode_ac_group(struct inode *dir, + struct ocfs2_alloc_context *ac) +{ + OCFS2_I(dir)->ip_last_used_group = ac->ac_last_group; + OCFS2_I(dir)->ip_last_used_slot = ac->ac_alloc_slot; +} + int ocfs2_claim_new_inode(struct ocfs2_super *osb, handle_t *handle, + struct inode *dir, + struct buffer_head *parent_fe_bh, struct ocfs2_alloc_context *ac, u16 *suballoc_bit, u64 *fe_blkno) @@ -1479,6 +1512,8 @@ int ocfs2_claim_new_inode(struct ocfs2_super *osb, BUG_ON(ac->ac_bits_wanted != 1); BUG_ON(ac->ac_which != OCFS2_AC_USE_INODE); + ocfs2_init_inode_ac_group(dir, parent_fe_bh, ac); + status = ocfs2_claim_suballoc_bits(osb, ac, handle, @@ -1497,6 +1532,7 @@ int ocfs2_claim_new_inode(struct ocfs2_super *osb, *fe_blkno = bg_blkno + (u64) (*suballoc_bit); ac->ac_bits_given++; + ocfs2_save_inode_ac_group(dir, ac); status = 0; bail: mlog_exit(status); diff --git a/fs/ocfs2/suballoc.h b/fs/ocfs2/suballoc.h index 40d51da..16f9c9c 100644 --- a/fs/ocfs2/suballoc.h +++ b/fs/ocfs2/suballoc.h @@ -77,6 +77,8 @@ int ocfs2_claim_metadata(struct ocfs2_super *osb, u64 *blkno_start); int ocfs2_claim_new_inode(struct ocfs2_super *osb, handle_t *handle, + struct inode *dir, + struct buffer_head *parent_fe_bh, struct ocfs2_alloc_context *ac, u16 *suballoc_bit, u64 *fe_blkno); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 11/26] ocfs2: Allocate inode groups from global_bitmap.
From: Tao Ma <tao.ma at oracle.com> Mainline commit 60ca81e82dae4aa2e8ae84cf96b4d08535931669 Inode groups used to be allocated from local alloc file, but since we want all inodes to be contiguous enough, we will try to allocate them directly from global_bitmap. Signed-off-by: Tao Ma <tao.ma at oracle.com> --- fs/ocfs2/suballoc.c | 40 +++++++++++++++++++++++++++++----------- 1 files changed, 29 insertions(+), 11 deletions(-) diff --git a/fs/ocfs2/suballoc.c b/fs/ocfs2/suballoc.c index 0f1f2e9..13e7b88 100644 --- a/fs/ocfs2/suballoc.c +++ b/fs/ocfs2/suballoc.c @@ -47,7 +47,8 @@ #include "buffer_head_io.h" #define NOT_ALLOC_NEW_GROUP 0 -#define ALLOC_NEW_GROUP 1 +#define ALLOC_NEW_GROUP 0x1 +#define ALLOC_GROUPS_FROM_GLOBAL 0x2 #define OCFS2_MAX_INODES_TO_STEAL 1024 @@ -62,7 +63,8 @@ static int ocfs2_block_group_fill(handle_t *handle, struct ocfs2_chain_list *cl); static int ocfs2_block_group_alloc(struct ocfs2_super *osb, struct inode *alloc_inode, - struct buffer_head *bh); + struct buffer_head *bh, + int flags); static int ocfs2_cluster_group_search(struct inode *inode, struct buffer_head *group_bh, @@ -110,6 +112,10 @@ static inline void ocfs2_block_to_cluster_group(struct inode *inode, u64 data_blkno, u64 *bg_blkno, u16 *bg_bit_off); +static int __ocfs2_reserve_clusters(struct ocfs2_super *osb, + u32 bits_wanted, + struct ocfs2_alloc_context **ac, + int flags); void ocfs2_free_ac_resource(struct ocfs2_alloc_context *ac) { @@ -274,7 +280,8 @@ static inline u16 ocfs2_find_smallest_chain(struct ocfs2_chain_list *cl) */ static int ocfs2_block_group_alloc(struct ocfs2_super *osb, struct inode *alloc_inode, - struct buffer_head *bh) + struct buffer_head *bh, + int flags) { int status, credits; struct ocfs2_dinode *fe = (struct ocfs2_dinode *) bh->b_data; @@ -292,9 +299,9 @@ static int ocfs2_block_group_alloc(struct ocfs2_super *osb, mlog_entry_void(); cl = &fe->id2.i_chain; - status = ocfs2_reserve_clusters(osb, - le16_to_cpu(cl->cl_cpg), - &ac); + status = 
__ocfs2_reserve_clusters(osb, + le16_to_cpu(cl->cl_cpg), + &ac, flags); if (status < 0) { if (status != -ENOSPC) mlog_errno(status); @@ -402,7 +409,7 @@ static int ocfs2_reserve_suballoc_bits(struct ocfs2_super *osb, struct ocfs2_alloc_context *ac, int type, u32 slot, - int alloc_new_group) + int flags) { int status; u32 bits_wanted = ac->ac_bits_wanted; @@ -458,7 +465,7 @@ static int ocfs2_reserve_suballoc_bits(struct ocfs2_super *osb, goto bail; } - if (alloc_new_group != ALLOC_NEW_GROUP) { + if (!(flags & ALLOC_NEW_GROUP)) { mlog(0, "Alloc File %u Full: wanted=%u, free_bits=%u, " "and we don't alloc a new group for it.\n", slot, bits_wanted, free_bits); @@ -466,7 +473,7 @@ static int ocfs2_reserve_suballoc_bits(struct ocfs2_super *osb, goto bail; } - status = ocfs2_block_group_alloc(osb, alloc_inode, bh); + status = ocfs2_block_group_alloc(osb, alloc_inode, bh, flags); if (status < 0) { if (status != -ENOSPC) mlog_errno(status); @@ -593,7 +600,9 @@ int ocfs2_reserve_new_inode(struct ocfs2_super *osb, atomic_set(&osb->s_num_inodes_stolen, 0); status = ocfs2_reserve_suballoc_bits(osb, *ac, INODE_ALLOC_SYSTEM_INODE, - osb->slot_num, ALLOC_NEW_GROUP); + osb->slot_num, + ALLOC_NEW_GROUP | + ALLOC_GROUPS_FROM_GLOBAL); if (status >= 0) { status = 0; @@ -661,6 +670,14 @@ int ocfs2_reserve_clusters(struct ocfs2_super *osb, u32 bits_wanted, struct ocfs2_alloc_context **ac) { + return __ocfs2_reserve_clusters(osb, bits_wanted, ac, 0); +} + +static int __ocfs2_reserve_clusters(struct ocfs2_super *osb, + u32 bits_wanted, + struct ocfs2_alloc_context **ac, + int flags) +{ int status; mlog_entry_void(); @@ -675,7 +692,8 @@ int ocfs2_reserve_clusters(struct ocfs2_super *osb, (*ac)->ac_bits_wanted = bits_wanted; status = -ENOSPC; - if (ocfs2_alloc_should_use_local(osb, bits_wanted)) { + if (!(flags & ALLOC_GROUPS_FROM_GLOBAL) && + ocfs2_alloc_should_use_local(osb, bits_wanted)) { status = ocfs2_reserve_local_alloc_bits(osb, bits_wanted, *ac); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 12/26] ocfs2: Optimize inode group allocation by recording last used group.
From: Tao Ma <tao.ma at oracle.com> Mainline commit feb473a6e8bd19297d0f3bb377b25055c0228c0a In ocfs2, the block group search looks for the "emptiest" group to allocate from. So if the allocator has many equally (or almost equally) empty groups, new block groups will tend to get spread out amongst them. So we add osb_inode_alloc_group in ocfs2_super to record the last used inode allocation group. For more details, please see http://oss.oracle.com/osswiki/OCFS2/DesignDocs/InodeAllocationStrategy. Signed-off-by: Tao Ma <tao.ma at oracle.com> --- fs/ocfs2/ocfs2.h | 3 +++ fs/ocfs2/suballoc.c | 32 ++++++++++++++++++++++++++++---- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h index aa4fa41..7c79c84 100644 --- a/fs/ocfs2/ocfs2.h +++ b/fs/ocfs2/ocfs2.h @@ -317,6 +317,9 @@ struct ocfs2_super struct ocfs2_node_map osb_recovering_orphan_dirs; unsigned int *osb_orphan_wipes; wait_queue_head_t osb_wipe_event; + + /* the group we used to allocate inodes. 
*/ + u64 osb_inode_alloc_group; }; #define OCFS2_SB(sb) ((struct ocfs2_super *)(sb)->s_fs_info) diff --git a/fs/ocfs2/suballoc.c b/fs/ocfs2/suballoc.c index 13e7b88..ca91445 100644 --- a/fs/ocfs2/suballoc.c +++ b/fs/ocfs2/suballoc.c @@ -64,6 +64,7 @@ static int ocfs2_block_group_fill(handle_t *handle, static int ocfs2_block_group_alloc(struct ocfs2_super *osb, struct inode *alloc_inode, struct buffer_head *bh, + u64 *last_alloc_group, int flags); static int ocfs2_cluster_group_search(struct inode *inode, @@ -281,6 +282,7 @@ static inline u16 ocfs2_find_smallest_chain(struct ocfs2_chain_list *cl) static int ocfs2_block_group_alloc(struct ocfs2_super *osb, struct inode *alloc_inode, struct buffer_head *bh, + u64 *last_alloc_group, int flags) { int status, credits; @@ -318,6 +320,11 @@ static int ocfs2_block_group_alloc(struct ocfs2_super *osb, goto bail; } + if (last_alloc_group && *last_alloc_group != 0) { + mlog(0, "use old allocation group %llu for block group alloc\n", + (unsigned long long)*last_alloc_group); + ac->ac_last_group = *last_alloc_group; + } status = ocfs2_claim_clusters(osb, handle, ac, @@ -392,6 +399,11 @@ static int ocfs2_block_group_alloc(struct ocfs2_super *osb, alloc_inode->i_blocks = ocfs2_inode_sector_count(alloc_inode); status = 0; + + /* save the new last alloc group so that the caller can cache it. 
*/ + if (last_alloc_group) + *last_alloc_group = ac->ac_last_group; + bail: if (handle) ocfs2_commit_trans(osb, handle); @@ -409,6 +421,7 @@ static int ocfs2_reserve_suballoc_bits(struct ocfs2_super *osb, struct ocfs2_alloc_context *ac, int type, u32 slot, + u64 *last_alloc_group, int flags) { int status; @@ -473,7 +486,8 @@ static int ocfs2_reserve_suballoc_bits(struct ocfs2_super *osb, goto bail; } - status = ocfs2_block_group_alloc(osb, alloc_inode, bh, flags); + status = ocfs2_block_group_alloc(osb, alloc_inode, bh, + last_alloc_group, flags); if (status < 0) { if (status != -ENOSPC) mlog_errno(status); @@ -517,7 +531,7 @@ int ocfs2_reserve_new_metadata(struct ocfs2_super *osb, status = ocfs2_reserve_suballoc_bits(osb, (*ac), EXTENT_ALLOC_SYSTEM_INODE, - slot, ALLOC_NEW_GROUP); + slot, NULL, ALLOC_NEW_GROUP); if (status < 0) { if (status != -ENOSPC) mlog_errno(status); @@ -554,7 +568,8 @@ static int ocfs2_steal_inode_from_other_nodes(struct ocfs2_super *osb, status = ocfs2_reserve_suballoc_bits(osb, ac, INODE_ALLOC_SYSTEM_INODE, - slot, NOT_ALLOC_NEW_GROUP); + slot, NULL, + NOT_ALLOC_NEW_GROUP); if (status >= 0) { ocfs2_set_inode_steal_slot(osb, slot); break; @@ -571,6 +586,7 @@ int ocfs2_reserve_new_inode(struct ocfs2_super *osb, { int status; s16 slot = ocfs2_get_inode_steal_slot(osb); + u64 alloc_group; *ac = kzalloc(sizeof(struct ocfs2_alloc_context), GFP_KERNEL); if (!(*ac)) { @@ -598,14 +614,22 @@ int ocfs2_reserve_new_inode(struct ocfs2_super *osb, goto inode_steal; atomic_set(&osb->s_num_inodes_stolen, 0); + alloc_group = osb->osb_inode_alloc_group; status = ocfs2_reserve_suballoc_bits(osb, *ac, INODE_ALLOC_SYSTEM_INODE, osb->slot_num, + &alloc_group, ALLOC_NEW_GROUP | ALLOC_GROUPS_FROM_GLOBAL); if (status >= 0) { status = 0; + spin_lock(&osb->osb_lock); + osb->osb_inode_alloc_group = alloc_group; + spin_unlock(&osb->osb_lock); + mlog(0, "after reservation, new allocation group is " + "%llu\n", (unsigned long long)alloc_group); + /* * Some inodes must 
be freed by us, so try to allocate * from our own next time. @@ -652,7 +676,7 @@ int ocfs2_reserve_cluster_bitmap_bits(struct ocfs2_super *osb, status = ocfs2_reserve_suballoc_bits(osb, ac, GLOBAL_BITMAP_SYSTEM_INODE, - OCFS2_INVALID_SLOT, + OCFS2_INVALID_SLOT, NULL, ALLOC_NEW_GROUP); if (status < 0 && status != -ENOSPC) { mlog_errno(status); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 13/26] ocfs2/dlm: Encapsulate adding and removing of mle from dlm->master_list
Mainline commit 1c0845773ad9f4875603b752235aea8aa04565f3 This patch encapsulates adding and removing of the mle from the dlm->master_list. This patch is part of the series of patches that converts the mle list to a mle hash. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmcommon.h | 3 +++ fs/ocfs2/dlm/dlmmaster.c | 34 +++++++++++++++++++++++----------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index a7ecccc..036b671 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -1008,6 +1008,9 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res) DLM_LOCK_RES_MIGRATING)); } +void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle); +void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle); + /* create/destroy slab caches */ int dlm_init_master_caches(void); void dlm_destroy_master_caches(void); diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 7d4e0ac..0510f70 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -318,6 +318,21 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, __dlm_mle_attach_hb_events(dlm, mle); } +void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle) +{ + assert_spin_locked(&dlm->spinlock); + assert_spin_locked(&dlm->master_lock); + + if (!list_empty(&mle->list)) + list_del_init(&mle->list); +} + +void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle) +{ + assert_spin_locked(&dlm->master_lock); + + list_add(&mle->list, &dlm->master_list); +} /* returns 1 if found, 0 if not */ static int dlm_find_mle(struct dlm_ctxt *dlm, @@ -420,8 +435,7 @@ static void dlm_mle_release(struct kref *kref) assert_spin_locked(&dlm->master_lock); /* remove from list if not already */ - if (!list_empty(&mle->list)) - list_del_init(&mle->list); + 
__dlm_unlink_mle(dlm, mle); /* detach the mle from the domain node up/down events */ __dlm_mle_detach_hb_events(dlm, mle); @@ -843,7 +857,7 @@ lookup: alloc_mle = NULL; dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0); set_bit(dlm->node_num, mle->maybe_map); - list_add(&mle->list, &dlm->master_list); + __dlm_insert_mle(dlm, mle); /* still holding the dlm spinlock, check the recovery map * to see if there are any nodes that still need to be @@ -1575,7 +1589,7 @@ way_up_top: // "add the block.\n"); dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen); set_bit(request->node_idx, mle->maybe_map); - list_add(&mle->list, &dlm->master_list); + __dlm_insert_mle(dlm, mle); response = DLM_MASTER_RESP_NO; } else { // mlog(0, "mle was found\n"); @@ -1967,7 +1981,7 @@ ok: assert->node_idx, rr, extra_ref, mle->inuse); dlm_print_one_mle(mle); } - list_del_init(&mle->list); + __dlm_unlink_mle(dlm, mle); __dlm_mle_detach_hb_events(dlm, mle); __dlm_put_mle(mle); if (extra_ref) { @@ -3159,10 +3173,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, tmp->master = master; atomic_set(&tmp->woken, 1); wake_up(&tmp->wq); - /* remove it from the list so that only one - * mle will be found */ - list_del_init(&tmp->list); - /* this was obviously WRONG. mle is uninited here. should be tmp. 
*/ + /* remove it so that only one mle will be found */ + __dlm_unlink_mle(dlm, tmp); __dlm_mle_detach_hb_events(dlm, tmp); ret = DLM_MIGRATE_RESPONSE_MASTERY_REF; mlog(0, "%s:%.*s: master=%u, newmaster=%u, " @@ -3181,7 +3193,7 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, mle->master = master; /* do this for consistency with other mle types */ set_bit(new_master, mle->maybe_map); - list_add(&mle->list, &dlm->master_list); + __dlm_insert_mle(dlm, mle); return ret; } @@ -3264,7 +3276,7 @@ top: * list_head while in list_for_each_safe */ __dlm_mle_detach_hb_events(dlm, mle); spin_lock(&mle->spinlock); - list_del_init(&mle->list); + __dlm_unlink_mle(dlm, mle); atomic_set(&mle->woken, 1); spin_unlock(&mle->spinlock); wake_up(&mle->wq); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 14/26] ocfs2/dlm: Clean up struct dlm_lock_name
Mainline commit f77a9a78c3a1d995b3bf948dbcad5c4a1b2302d5 For master mle, the name is stored in the attached lockres in struct qstr. For block and migration mle, the name is stored inline in struct dlm_lock_name. This patch attempts to make struct dlm_lock_name look like a struct qstr. While we could use struct qstr, we don't because we want to avoid having to malloc and free the lockname string as the mle's lifetime is fairly short. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmcommon.h | 8 ++-- fs/ocfs2/dlm/dlmdebug.c | 10 +++--- fs/ocfs2/dlm/dlmmaster.c | 79 +++++++++++++++++++++++++-------------------- 3 files changed, 53 insertions(+), 44 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 036b671..00c1dd1 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -56,8 +56,8 @@ enum dlm_mle_type { }; struct dlm_lock_name { - u8 len; - u8 name[DLM_LOCKID_NAME_MAX]; + unsigned int len; + unsigned char name[DLM_LOCKID_NAME_MAX]; }; struct dlm_master_list_entry { @@ -79,8 +79,8 @@ struct dlm_master_list_entry { struct o2hb_callback_func mle_hb_up; struct o2hb_callback_func mle_hb_down; union { - struct dlm_lock_resource *res; - struct dlm_lock_name name; + struct dlm_lock_resource *mleres; + struct dlm_lock_name mlename; } u; }; diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index b32f60a..c82feb7 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -288,15 +288,15 @@ static int dump_mle(struct dlm_master_list_entry *mle, char *buf, int len) { int out = 0; unsigned int namelen; - const char *name; + unsigned char *name; char *mle_type; if (mle->type != DLM_MLE_MASTER) { - namelen = mle->u.name.len; - name = mle->u.name.name; + name = mle->u.mlename.name; + namelen = mle->u.mlename.len; } else { - namelen = mle->u.res->lockname.len; - name = mle->u.res->lockname.name; + name = (unsigned char 
*)mle->u.mleres->lockname.name; + namelen = mle->u.mleres->lockname.len; } if (mle->type == DLM_MLE_BLOCK) diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 0510f70..c388d21 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -68,27 +68,38 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, void *nodemap, u32 flags); static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data); +static inline void __dlm_mle_name(struct dlm_master_list_entry *mle, + unsigned char **name, unsigned int *namelen) +{ + BUG_ON(mle->type != DLM_MLE_BLOCK && + mle->type != DLM_MLE_MASTER && + mle->type != DLM_MLE_MIGRATION); + + if (mle->type != DLM_MLE_MASTER) { + *name = mle->u.mlename.name; + *namelen = mle->u.mlename.len; + } else { + *name = (unsigned char *)mle->u.mleres->lockname.name; + *namelen = mle->u.mleres->lockname.len; + } +} + static inline int dlm_mle_equal(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle, const char *name, unsigned int namelen) { - struct dlm_lock_resource *res; + unsigned char *mlename; + unsigned int mlelen; if (dlm != mle->dlm) return 0; - if (mle->type == DLM_MLE_BLOCK || - mle->type == DLM_MLE_MIGRATION) { - if (namelen != mle->u.name.len || - memcmp(name, mle->u.name.name, namelen)!=0) - return 0; - } else { - res = mle->u.res; - if (namelen != res->lockname.len || - memcmp(res->lockname.name, name, namelen) != 0) - return 0; - } + __dlm_mle_name(mle, &mlename, &mlelen); + + if (namelen != mlelen || memcmp(name, mlename, namelen) != 0) + return 0; + return 1; } @@ -295,17 +306,17 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, mle->new_master = O2NM_MAX_NODES; mle->inuse = 0; + BUG_ON(mle->type != DLM_MLE_BLOCK && + mle->type != DLM_MLE_MASTER && + mle->type != DLM_MLE_MIGRATION); + if (mle->type == DLM_MLE_MASTER) { BUG_ON(!res); - mle->u.res = res; - } else if (mle->type == DLM_MLE_BLOCK) { - BUG_ON(!name); - memcpy(mle->u.name.name, name, namelen); - 
mle->u.name.len = namelen; - } else /* DLM_MLE_MIGRATION */ { + mle->u.mleres = res; + } else { BUG_ON(!name); - memcpy(mle->u.name.name, name, namelen); - mle->u.name.len = namelen; + memcpy(mle->u.mlename.name, name, namelen); + mle->u.mlename.len = namelen; } /* copy off the node_map and register hb callbacks on our copy */ @@ -425,11 +436,11 @@ static void dlm_mle_release(struct kref *kref) if (mle->type != DLM_MLE_MASTER) { mlog(0, "calling mle_release for %.*s, type %d\n", - mle->u.name.len, mle->u.name.name, mle->type); + mle->u.mlename.len, mle->u.mlename.name, mle->type); } else { mlog(0, "calling mle_release for %.*s, type %d\n", - mle->u.res->lockname.len, - mle->u.res->lockname.name, mle->type); + mle->u.mleres->lockname.len, + mle->u.mleres->lockname.name, mle->type); } assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); @@ -1284,7 +1295,7 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, res->lockname.len, res->lockname.name); mle->type = DLM_MLE_MASTER; - mle->u.res = res; + mle->u.mleres = res; } } } @@ -1323,20 +1334,18 @@ static int dlm_do_master_request(struct dlm_lock_resource *res, struct dlm_ctxt *dlm = mle->dlm; struct dlm_master_request request; int ret, response=0, resend; + unsigned char *mlename; + unsigned int mlenamelen; memset(&request, 0, sizeof(request)); request.node_idx = dlm->node_num; BUG_ON(mle->type == DLM_MLE_MIGRATION); - if (mle->type != DLM_MLE_MASTER) { - request.namelen = mle->u.name.len; - memcpy(request.name, mle->u.name.name, request.namelen); - } else { - request.namelen = mle->u.res->lockname.len; - memcpy(request.name, mle->u.res->lockname.name, - request.namelen); - } + __dlm_mle_name(mle, &mlename, &mlenamelen); + + request.namelen = (u8)mlenamelen; + memcpy(request.name, mlename, request.namelen); again: ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request, @@ -3286,9 +3295,9 @@ top: mle->master, mle->new_master); /* if there is a lockres associated with this * 
mle, find it and set its owner to UNKNOWN */ - hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len); - res = __dlm_lookup_lockres(dlm, mle->u.name.name, - mle->u.name.len, hash); + hash = dlm_lockid_hash(mle->u.mlename.name, mle->u.mlename.len); + res = __dlm_lookup_lockres(dlm, mle->u.mlename.name, + mle->u.mlename.len, hash); if (res) { /* unfortunately if we hit this rare case, our * lock ordering is messed. we need to drop -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 15/26] ocfs2/dlm: Refactor dlm_clean_master_list()
Mainline commit c2cd4a44333034203cb198915e2b75c3227d41bf This patch refactors dlm_clean_master_list() so as to make it easier to convert the mle list to a hash. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmmaster.c | 148 ++++++++++++++++++++++++++------------------- 1 files changed, 85 insertions(+), 63 deletions(-) diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index c388d21..cde4153 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -3207,12 +3207,87 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, return ret; } +/* + * Sets the owner of the lockres, associated to the mle, to UNKNOWN + */ +static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm, + struct dlm_master_list_entry *mle) +{ + struct dlm_lock_resource *res; + unsigned int hash; + + /* Find the lockres associated to the mle and set its owner to UNK */ + hash = dlm_lockid_hash(mle->u.mlename.name, mle->u.mlename.len); + res = __dlm_lookup_lockres(dlm, mle->u.mlename.name, mle->u.mlename.len, + hash); + if (res) { + spin_unlock(&dlm->master_lock); + + /* move lockres onto recovery list */ + spin_lock(&res->spinlock); + dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN); + dlm_move_lockres_to_recovery_list(dlm, res); + spin_unlock(&res->spinlock); + dlm_lockres_put(res); + + /* about to get rid of mle, detach from heartbeat */ + __dlm_mle_detach_hb_events(dlm, mle); + + /* dump the mle */ + spin_lock(&dlm->master_lock); + __dlm_put_mle(mle); + spin_unlock(&dlm->master_lock); + } + + return res; +} + +static void dlm_clean_migration_mle(struct dlm_ctxt *dlm, + struct dlm_master_list_entry *mle) +{ + __dlm_mle_detach_hb_events(dlm, mle); + + spin_lock(&mle->spinlock); + __dlm_unlink_mle(dlm, mle); + atomic_set(&mle->woken, 1); + spin_unlock(&mle->spinlock); + + wake_up(&mle->wq); +} + +static void dlm_clean_block_mle(struct dlm_ctxt *dlm, + struct 
dlm_master_list_entry *mle, u8 dead_node) +{ + int bit; + + BUG_ON(mle->type != DLM_MLE_BLOCK); + + spin_lock(&mle->spinlock); + bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); + if (bit != dead_node) { + mlog(0, "mle found, but dead node %u would not have been " + "master\n", dead_node); + spin_unlock(&mle->spinlock); + } else { + /* Must drop the refcount by one since the assert_master will + * never arrive. This may result in the mle being unlinked and + * freed, but there may still be a process waiting in the + * dlmlock path which is fine. */ + mlog(0, "node %u was expected master\n", dead_node); + atomic_set(&mle->woken, 1); + spin_unlock(&mle->spinlock); + wake_up(&mle->wq); + + /* Do not need events any longer, so detach from heartbeat */ + __dlm_mle_detach_hb_events(dlm, mle); + __dlm_put_mle(mle); + } +} void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) { struct dlm_master_list_entry *mle, *next; struct dlm_lock_resource *res; - unsigned int hash; mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); top: @@ -3236,30 +3311,7 @@ top: * need to clean up if the dead node would have * been the master. */ if (mle->type == DLM_MLE_BLOCK) { - int bit; - - spin_lock(&mle->spinlock); - bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); - if (bit != dead_node) { - mlog(0, "mle found, but dead node %u would " - "not have been master\n", dead_node); - spin_unlock(&mle->spinlock); - } else { - /* must drop the refcount by one since the - * assert_master will never arrive. this - * may result in the mle being unlinked and - * freed, but there may still be a process - * waiting in the dlmlock path which is fine. 
*/ - mlog(0, "node %u was expected master\n", - dead_node); - atomic_set(&mle->woken, 1); - spin_unlock(&mle->spinlock); - wake_up(&mle->wq); - /* do not need events any longer, so detach - * from heartbeat */ - __dlm_mle_detach_hb_events(dlm, mle); - __dlm_put_mle(mle); - } + dlm_clean_block_mle(dlm, mle, dead_node); continue; } @@ -3280,51 +3332,21 @@ top: /* if we have reached this point, this mle needs to * be removed from the list and freed. */ - - /* remove from the list early. NOTE: unlinking - * list_head while in list_for_each_safe */ - __dlm_mle_detach_hb_events(dlm, mle); - spin_lock(&mle->spinlock); - __dlm_unlink_mle(dlm, mle); - atomic_set(&mle->woken, 1); - spin_unlock(&mle->spinlock); - wake_up(&mle->wq); + dlm_clean_migration_mle(dlm, mle); mlog(0, "%s: node %u died during migration from " "%u to %u!\n", dlm->name, dead_node, mle->master, mle->new_master); - /* if there is a lockres associated with this - * mle, find it and set its owner to UNKNOWN */ - hash = dlm_lockid_hash(mle->u.mlename.name, mle->u.mlename.len); - res = __dlm_lookup_lockres(dlm, mle->u.mlename.name, - mle->u.mlename.len, hash); - if (res) { - /* unfortunately if we hit this rare case, our - * lock ordering is messed. we need to drop - * the master lock so that we can take the - * lockres lock, meaning that we will have to - * restart from the head of list. */ - spin_unlock(&dlm->master_lock); - - /* move lockres onto recovery list */ - spin_lock(&res->spinlock); - dlm_set_lockres_owner(dlm, res, - DLM_LOCK_RES_OWNER_UNKNOWN); - dlm_move_lockres_to_recovery_list(dlm, res); - spin_unlock(&res->spinlock); - dlm_lockres_put(res); - - /* about to get rid of mle, detach from heartbeat */ - __dlm_mle_detach_hb_events(dlm, mle); - - /* dump the mle */ - spin_lock(&dlm->master_lock); - __dlm_put_mle(mle); - spin_unlock(&dlm->master_lock); + /* If we find a lockres associated with the mle, we've + * hit this rare case that messes up our lock ordering. 
+ * If so, we need to drop the master lock so that we can + * take the lockres lock, meaning that we will have to + * restart from the head of list. */ + res = dlm_reset_mleres_owner(dlm, mle); + if (res) /* restart */ goto top; - } /* this may be the last reference */ __dlm_put_mle(mle); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 16/26] ocfs2/dlm: Create and destroy the dlm->master_hash
Mainline commit e2b66ddcce922529e058cf74d839c4c49c8379a1 This patch adds code to create and destroy the dlm->master_hash. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmcommon.h | 8 ++++++++ fs/ocfs2/dlm/dlmdomain.c | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+), 0 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 00c1dd1..d9dba0a 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -151,6 +151,7 @@ struct dlm_ctxt unsigned long recovery_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; struct dlm_recovery_ctxt reco; spinlock_t master_lock; + struct hlist_head **master_hash; struct list_head master_list; struct list_head mle_hb_events; @@ -195,6 +196,13 @@ static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned return dlm->lockres_hash[(i / DLM_BUCKETS_PER_PAGE) % DLM_HASH_PAGES] + (i % DLM_BUCKETS_PER_PAGE); } +static inline struct hlist_head *dlm_master_hash(struct dlm_ctxt *dlm, + unsigned i) +{ + return dlm->master_hash[(i / DLM_BUCKETS_PER_PAGE) % DLM_HASH_PAGES] + + (i % DLM_BUCKETS_PER_PAGE); +} + /* these keventd work queue items are for less-frequently * called functions that cannot be directly called from the * net message handlers for some reason, usually because diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index c2e192e..7c95956 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -304,6 +304,9 @@ static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) if (dlm->lockres_hash) dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); + if (dlm->master_hash) + dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES); + if (dlm->name) kfree(dlm->name); @@ -1534,12 +1537,27 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, for (i = 0; i < DLM_HASH_BUCKETS; i++) INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i)); + dlm->master_hash = (struct hlist_head **) + 
dlm_alloc_pagevec(DLM_HASH_PAGES); + if (!dlm->master_hash) { + mlog_errno(-ENOMEM); + dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); + kfree(dlm->name); + kfree(dlm); + dlm = NULL; + goto leave; + } + + for (i = 0; i < DLM_HASH_BUCKETS; i++) + INIT_HLIST_HEAD(dlm_master_hash(dlm, i)); + strcpy(dlm->name, domain); dlm->key = key; dlm->node_num = o2nm_this_node(); ret = dlm_create_debugfs_subroot(dlm); if (ret < 0) { + dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES); dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); kfree(dlm->name); kfree(dlm); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 17/26] ocfs2/dlm: Activate dlm->master_hash for master list entries
Mainline commit 2ed6c750d645d09b5948e46fada3ca1fda3157b5 With this patch, the mles are stored in a hash and not a simple list. This should improve the mle lookup time when the number of outstanding masteries is large. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmcommon.h | 4 +- fs/ocfs2/dlm/dlmdebug.c | 24 +++++++++++------- fs/ocfs2/dlm/dlmdomain.c | 1 - fs/ocfs2/dlm/dlmmaster.c | 61 ++++++++++++++++++++++++++++++++------------- 4 files changed, 60 insertions(+), 30 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index d9dba0a..3024f60 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -56,12 +56,13 @@ enum dlm_mle_type { }; struct dlm_lock_name { + unsigned int hash; unsigned int len; unsigned char name[DLM_LOCKID_NAME_MAX]; }; struct dlm_master_list_entry { - struct list_head list; + struct hlist_node master_hash_node; struct list_head hb_events; struct dlm_ctxt *dlm; spinlock_t spinlock; @@ -152,7 +153,6 @@ struct dlm_ctxt struct dlm_recovery_ctxt reco; spinlock_t master_lock; struct hlist_head **master_hash; - struct list_head master_list; struct list_head mle_hb_events; /* these give a really vague idea of the system load */ diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index c82feb7..336a98e 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -501,18 +501,25 @@ static struct file_operations debug_purgelist_fops = { static int debug_mle_print(struct dlm_ctxt *dlm, struct debug_buffer *db) { struct dlm_master_list_entry *mle; - int out = 0; + struct hlist_head *bucket; + struct hlist_node *list; + int i, out = 0; unsigned long total = 0; out += snprintf(db->buf + out, db->len - out, "Dumping MLEs for Domain: %s\n", dlm->name); spin_lock(&dlm->master_lock); - list_for_each_entry(mle, &dlm->master_list, list) { - ++total; - if (db->len - out < 200) - continue; - out += dump_mle(mle, db->buf + 
out, db->len - out); + for (i = 0; i < DLM_HASH_BUCKETS; i++) { + bucket = dlm_master_hash(dlm, i); + hlist_for_each(list, bucket) { + mle = hlist_entry(list, struct dlm_master_list_entry, + master_hash_node); + ++total; + if (db->len - out < 200) + continue; + out += dump_mle(mle, db->buf + out, db->len - out); + } } spin_unlock(&dlm->master_lock); @@ -813,12 +820,11 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) /* Lists: Dirty=Empty Purge=InUse PendingASTs=Empty ... */ out += snprintf(db->buf + out, db->len - out, "Lists: Dirty=%s Purge=%s PendingASTs=%s " - "PendingBASTs=%s Master=%s\n", + "PendingBASTs=%s\n", (list_empty(&dlm->dirty_list) ? "Empty" : "InUse"), (list_empty(&dlm->purge_list) ? "Empty" : "InUse"), (list_empty(&dlm->pending_asts) ? "Empty" : "InUse"), - (list_empty(&dlm->pending_basts) ? "Empty" : "InUse"), - (list_empty(&dlm->master_list) ? "Empty" : "InUse")); + (list_empty(&dlm->pending_basts) ? "Empty" : "InUse")); /* Purge Count: xxx Refs: xxx */ out += snprintf(db->buf + out, db->len - out, diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 7c95956..780e0d7 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -1597,7 +1597,6 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, init_waitqueue_head(&dlm->reco.event); init_waitqueue_head(&dlm->ast_wq); init_waitqueue_head(&dlm->migration_wq); - INIT_LIST_HEAD(&dlm->master_list); INIT_LIST_HEAD(&dlm->mle_hb_events); dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN; diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index cde4153..c50eabe 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -69,7 +69,8 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data); static inline void __dlm_mle_name(struct dlm_master_list_entry *mle, - unsigned char **name, unsigned int *namelen) + unsigned char **name, unsigned int 
*namelen, + unsigned int *namehash) { BUG_ON(mle->type != DLM_MLE_BLOCK && mle->type != DLM_MLE_MASTER && @@ -78,9 +79,13 @@ static inline void __dlm_mle_name(struct dlm_master_list_entry *mle, if (mle->type != DLM_MLE_MASTER) { *name = mle->u.mlename.name; *namelen = mle->u.mlename.len; + if (namehash) + *namehash = mle->u.mlename.hash; } else { *name = (unsigned char *)mle->u.mleres->lockname.name; *namelen = mle->u.mleres->lockname.len; + if (namehash) + *namehash = mle->u.mleres->lockname.hash; } } @@ -95,7 +100,7 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm, if (dlm != mle->dlm) return 0; - __dlm_mle_name(mle, &mlename, &mlelen); + __dlm_mle_name(mle, &mlename, &mlelen, NULL); if (namelen != mlelen || memcmp(name, mlename, namelen) != 0) return 0; @@ -294,7 +299,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, mle->dlm = dlm; mle->type = type; - INIT_LIST_HEAD(&mle->list); + INIT_HLIST_NODE(&mle->master_hash_node); INIT_LIST_HEAD(&mle->hb_events); memset(mle->maybe_map, 0, sizeof(mle->maybe_map)); spin_lock_init(&mle->spinlock); @@ -317,6 +322,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, BUG_ON(!name); memcpy(mle->u.mlename.name, name, namelen); mle->u.mlename.len = namelen; + mle->u.mlename.hash = dlm_lockid_hash(name, namelen); } /* copy off the node_map and register hb callbacks on our copy */ @@ -334,15 +340,21 @@ void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle) assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); - if (!list_empty(&mle->list)) - list_del_init(&mle->list); + if (!hlist_unhashed(&mle->master_hash_node)) + hlist_del_init(&mle->master_hash_node); } void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle) { + struct hlist_head *bucket; + unsigned char *mname; + unsigned int mlen, hash; + assert_spin_locked(&dlm->master_lock); - list_add(&mle->list, &dlm->master_list); + __dlm_mle_name(mle, &mname, &mlen, &hash); + bucket = 
dlm_master_hash(dlm, hash); + hlist_add_head(&mle->master_hash_node, bucket); } /* returns 1 if found, 0 if not */ @@ -351,10 +363,17 @@ static int dlm_find_mle(struct dlm_ctxt *dlm, char *name, unsigned int namelen) { struct dlm_master_list_entry *tmpmle; + struct hlist_head *bucket; + struct hlist_node *list; + unsigned int hash; assert_spin_locked(&dlm->master_lock); - list_for_each_entry(tmpmle, &dlm->master_list, list) { + hash = dlm_lockid_hash(name, namelen); + bucket = dlm_master_hash(dlm, hash); + hlist_for_each(list, bucket) { + tmpmle = hlist_entry(list, struct dlm_master_list_entry, + master_hash_node); if (!dlm_mle_equal(dlm, tmpmle, name, namelen)) continue; dlm_get_mle(tmpmle); @@ -428,23 +447,20 @@ static void dlm_mle_release(struct kref *kref) { struct dlm_master_list_entry *mle; struct dlm_ctxt *dlm; + unsigned char *mname; + unsigned int mlen; mlog_entry_void(); mle = container_of(kref, struct dlm_master_list_entry, mle_refs); dlm = mle->dlm; - if (mle->type != DLM_MLE_MASTER) { - mlog(0, "calling mle_release for %.*s, type %d\n", - mle->u.mlename.len, mle->u.mlename.name, mle->type); - } else { - mlog(0, "calling mle_release for %.*s, type %d\n", - mle->u.mleres->lockname.len, - mle->u.mleres->lockname.name, mle->type); - } assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); + __dlm_mle_name(mle, &mname, &mlen, NULL); + mlog(0, "Releasing mle for %.*s, type %d\n", mlen, mname, mle->type); + /* remove from list if not already */ __dlm_unlink_mle(dlm, mle); @@ -1342,7 +1358,7 @@ static int dlm_do_master_request(struct dlm_lock_resource *res, BUG_ON(mle->type == DLM_MLE_MIGRATION); - __dlm_mle_name(mle, &mlename, &mlenamelen); + __dlm_mle_name(mle, &mlename, &mlenamelen, NULL); request.namelen = (u8)mlenamelen; memcpy(request.name, mlename, request.namelen); @@ -3286,8 +3302,11 @@ static void dlm_clean_block_mle(struct dlm_ctxt *dlm, void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) { - struct 
dlm_master_list_entry *mle, *next; + struct dlm_master_list_entry *mle; struct dlm_lock_resource *res; + struct hlist_head *bucket; + struct hlist_node *list; + unsigned int i; mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); top: @@ -3295,7 +3314,12 @@ top: /* clean the master list */ spin_lock(&dlm->master_lock); - list_for_each_entry_safe(mle, next, &dlm->master_list, list) { + for (i = 0; i < DLM_HASH_BUCKETS; i++) { + bucket = dlm_master_hash(dlm, i); + hlist_for_each(list, bucket) { + mle = hlist_entry(list, struct dlm_master_list_entry, + master_hash_node); + BUG_ON(mle->type != DLM_MLE_BLOCK && mle->type != DLM_MLE_MASTER && mle->type != DLM_MLE_MIGRATION); @@ -3351,6 +3375,7 @@ top: /* this may be the last reference */ __dlm_put_mle(mle); } + } spin_unlock(&dlm->master_lock); } -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 18/26] ocfs2/dlm: Indent dlm_cleanup_master_list()
Mainline commit 67ae1f0604da3bcf3ed6dec59ac71d07e54a404c The previous patch explicitly did not indent dlm_cleanup_master_list() so as to make the patch readable. This patch properly indents the function. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmmaster.c | 106 ++++++++++++++++++++++----------------------- 1 files changed, 52 insertions(+), 54 deletions(-) diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index c50eabe..699c6d0 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -3320,66 +3320,64 @@ top: mle = hlist_entry(list, struct dlm_master_list_entry, master_hash_node); - BUG_ON(mle->type != DLM_MLE_BLOCK && - mle->type != DLM_MLE_MASTER && - mle->type != DLM_MLE_MIGRATION); - - /* MASTER mles are initiated locally. the waiting - * process will notice the node map change - * shortly. let that happen as normal. */ - if (mle->type == DLM_MLE_MASTER) - continue; - + BUG_ON(mle->type != DLM_MLE_BLOCK && + mle->type != DLM_MLE_MASTER && + mle->type != DLM_MLE_MIGRATION); + + /* MASTER mles are initiated locally. The waiting + * process will notice the node map change shortly. + * Let that happen as normal. */ + if (mle->type == DLM_MLE_MASTER) + continue; + + /* BLOCK mles are initiated by other nodes. Need to + * clean up if the dead node would have been the + * master. */ + if (mle->type == DLM_MLE_BLOCK) { + dlm_clean_block_mle(dlm, mle, dead_node); + continue; + } - /* BLOCK mles are initiated by other nodes. - * need to clean up if the dead node would have - * been the master. */ - if (mle->type == DLM_MLE_BLOCK) { - dlm_clean_block_mle(dlm, mle, dead_node); - continue; + /* Everything else is a MIGRATION mle */ + + /* The rule for MIGRATION mles is that the master + * becomes UNKNOWN if *either* the original or the new + * master dies. All UNKNOWN lockres' are sent to + * whichever node becomes the recovery master. 
The new + * master is responsible for determining if there is + * still a master for this lockres, or if he needs to + * take over mastery. Either way, this node should + * expect another message to resolve this. */ + + if (mle->master != dead_node && + mle->new_master != dead_node) + continue; + + /* If we have reached this point, this mle needs to be + * removed from the list and freed. */ + dlm_clean_migration_mle(dlm, mle); + + mlog(0, "%s: node %u died during migration from " + "%u to %u!\n", dlm->name, dead_node, mle->master, + mle->new_master); + + /* If we find a lockres associated with the mle, we've + * hit this rare case that messes up our lock ordering. + * If so, we need to drop the master lock so that we can + * take the lockres lock, meaning that we will have to + * restart from the head of list. */ + res = dlm_reset_mleres_owner(dlm, mle); + if (res) + /* restart */ + goto top; + + /* This may be the last reference */ + __dlm_put_mle(mle); } - - /* everything else is a MIGRATION mle */ - - /* the rule for MIGRATION mles is that the master - * becomes UNKNOWN if *either* the original or - * the new master dies. all UNKNOWN lockreses - * are sent to whichever node becomes the recovery - * master. the new master is responsible for - * determining if there is still a master for - * this lockres, or if he needs to take over - * mastery. either way, this node should expect - * another message to resolve this. */ - if (mle->master != dead_node && - mle->new_master != dead_node) - continue; - - /* if we have reached this point, this mle needs to - * be removed from the list and freed. */ - dlm_clean_migration_mle(dlm, mle); - - mlog(0, "%s: node %u died during migration from " - "%u to %u!\n", dlm->name, dead_node, - mle->master, mle->new_master); - - /* If we find a lockres associated with the mle, we've - * hit this rare case that messes up our lock ordering. 
- * If so, we need to drop the master lock so that we can - * take the lockres lock, meaning that we will have to - * restart from the head of list. */ - res = dlm_reset_mleres_owner(dlm, mle); - if (res) - /* restart */ - goto top; - - /* this may be the last reference */ - __dlm_put_mle(mle); - } } spin_unlock(&dlm->master_lock); } - int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, u8 old_master) { -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 19/26] ocfs2/dlm: Track number of mles
Mainline commit 2041d8fdcec7603401829f60810c1dbd5e96c043 The lifetime of a mle is limited to the duration of the lockres mastery process. While typically this lifetime is fairly short, we have noticed the number of mles explode under certain circumstances. This patch tracks the number of mles of each type and should help us determine how best to speed up the mastery process. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmcommon.h | 5 ++++- fs/ocfs2/dlm/dlmdomain.c | 5 +++++ fs/ocfs2/dlm/dlmmaster.c | 5 +++++ 3 files changed, 14 insertions(+), 1 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 3024f60..c339172 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -52,7 +52,8 @@ enum dlm_mle_type { DLM_MLE_BLOCK, DLM_MLE_MASTER, - DLM_MLE_MIGRATION + DLM_MLE_MIGRATION, + DLM_MLE_NUM_TYPES }; struct dlm_lock_name { @@ -156,6 +157,8 @@ struct dlm_ctxt struct list_head mle_hb_events; /* these give a really vague idea of the system load */ + atomic_t mle_tot_count[DLM_MLE_NUM_TYPES]; + atomic_t mle_cur_count[DLM_MLE_NUM_TYPES]; atomic_t local_resources; atomic_t remote_resources; atomic_t unknown_resources; diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 780e0d7..49a3ae0 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -1608,6 +1608,11 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, atomic_set(&dlm->remote_resources, 0); atomic_set(&dlm->unknown_resources, 0); + for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) { + atomic_set(&dlm->mle_tot_count[i], 0); + atomic_set(&dlm->mle_cur_count[i], 0); + } + spin_lock_init(&dlm->work_lock); INIT_LIST_HEAD(&dlm->work_list); KAPI_INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work, dlm); diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 699c6d0..c53041b 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -325,6 +325,9 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, mle->u.mlename.hash = dlm_lockid_hash(name, namelen); } + atomic_inc(&dlm->mle_tot_count[mle->type]); + atomic_inc(&dlm->mle_cur_count[mle->type]); + /* copy off the node_map and register hb callbacks on our copy */ memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map)); memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map)); @@ -467,6 +470,8 @@ static void dlm_mle_release(struct kref *kref) /* detach the mle from the domain node up/down events */ __dlm_mle_detach_hb_events(dlm, mle); + atomic_dec(&dlm->mle_cur_count[mle->type]); + /* NOTE: kfree under spinlock here. * if this is bad, we can move this to a freelist. */ kmem_cache_free(dlm_mle_cache, mle); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 20/26] ocfs2/dlm: Improve lockres counts
Mainline commit 6800791ab773453bdec337efb3f0cec6557f3bb3 This patch replaces the lockres counts that tracked the number of locally and remotely mastered lockres' with a current and total count. The total count is the number of lockres' that have been created since the dlm domain was created. The number of locally and remotely mastered counts can be computed using the locking_state output. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmcommon.h | 5 ++--- fs/ocfs2/dlm/dlmdebug.c | 12 ------------ fs/ocfs2/dlm/dlmdomain.c | 5 ++--- fs/ocfs2/dlm/dlmmaster.c | 27 +++++++-------------------- 4 files changed, 11 insertions(+), 38 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index c339172..ff47c7f 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -159,9 +159,8 @@ struct dlm_ctxt /* these give a really vague idea of the system load */ atomic_t mle_tot_count[DLM_MLE_NUM_TYPES]; atomic_t mle_cur_count[DLM_MLE_NUM_TYPES]; - atomic_t local_resources; - atomic_t remote_resources; - atomic_t unknown_resources; + atomic_t res_tot_count; + atomic_t res_cur_count; struct dlm_debug_ctxt *dlm_debug_ctxt; struct dentry *dlm_debugfs_subroot; diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index 336a98e..d7decaa 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -763,12 +763,6 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) int out = 0; struct dlm_reco_node_data *node; char *state; - int lres, rres, ures, tres; - - lres = atomic_read(&dlm->local_resources); - rres = atomic_read(&dlm->remote_resources); - ures = atomic_read(&dlm->unknown_resources); - tres = lres + rres + ures; spin_lock(&dlm->spinlock); @@ -811,12 +805,6 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) db->buf + out, db->len - out); out += snprintf(db->buf + out, db->len - out, "\n"); -
/* Mastered Resources Total: xxx Locally: xxx Remotely: ... */ - out += snprintf(db->buf + out, db->len - out, - "Mastered Resources Total: %d Locally: %d " - "Remotely: %d Unknown: %d\n", - tres, lres, rres, ures); - /* Lists: Dirty=Empty Purge=InUse PendingASTs=Empty ... */ out += snprintf(db->buf + out, db->len - out, "Lists: Dirty=%s Purge=%s PendingASTs=%s " diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 49a3ae0..d96a122 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -1604,10 +1604,9 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, dlm->reco.new_master = O2NM_INVALID_NODE_NUM; dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; - atomic_set(&dlm->local_resources, 0); - atomic_set(&dlm->remote_resources, 0); - atomic_set(&dlm->unknown_resources, 0); + atomic_set(&dlm->res_tot_count, 0); + atomic_set(&dlm->res_cur_count, 0); for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) { atomic_set(&dlm->mle_tot_count[i], 0); atomic_set(&dlm->mle_cur_count[i], 0); diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index c53041b..afd2948 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -517,15 +517,6 @@ static void dlm_set_lockres_owner(struct dlm_ctxt *dlm, { assert_spin_locked(&res->spinlock); - mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner); - - if (owner == dlm->node_num) - atomic_inc(&dlm->local_resources); - else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN) - atomic_inc(&dlm->unknown_resources); - else - atomic_inc(&dlm->remote_resources); - res->owner = owner; } @@ -534,17 +525,8 @@ void dlm_change_lockres_owner(struct dlm_ctxt *dlm, { assert_spin_locked(&res->spinlock); - if (owner == res->owner) - return; - - if (res->owner == dlm->node_num) - atomic_dec(&dlm->local_resources); - else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) - atomic_dec(&dlm->unknown_resources); - else - atomic_dec(&dlm->remote_resources); - - dlm_set_lockres_owner(dlm, res, owner); + if 
(owner != res->owner) + dlm_set_lockres_owner(dlm, res, owner); } @@ -573,6 +555,8 @@ static void dlm_lockres_release(struct kref *kref) } spin_unlock(&dlm->track_lock); + atomic_dec(&dlm->res_cur_count); + dlm_put(dlm); if (!hlist_unhashed(&res->hash_node) || @@ -653,6 +637,9 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, kref_init(&res->refs); + atomic_inc(&dlm->res_tot_count); + atomic_inc(&dlm->res_cur_count); + /* just for consistency */ spin_lock(&res->spinlock); dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 21/26] ocfs2/dlm: dlm_set_lockres_owner() and dlm_change_lockres_owner() inlined
Mainline commit 7d62a978a8c85cd82301615840d744f0d83b87e7 This patch inlines dlm_set_lockres_owner() and dlm_change_lockres_owner(). Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmcommon.h | 21 ++++++++++++++++++--- fs/ocfs2/dlm/dlmmaster.c | 19 ------------------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index ff47c7f..06aa4ea 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -858,9 +858,7 @@ struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, unsigned int len); int dlm_is_host_down(int errno); -void dlm_change_lockres_owner(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - u8 owner); + struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, const char *lockid, int namelen, @@ -1123,6 +1121,23 @@ static inline int dlm_node_iter_next(struct dlm_node_iter *iter) return bit; } +static inline void dlm_set_lockres_owner(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + u8 owner) +{ + assert_spin_locked(&res->spinlock); + + res->owner = owner; +} +static inline void dlm_change_lockres_owner(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + u8 owner) +{ + assert_spin_locked(&res->spinlock); + + if (owner != res->owner) + dlm_set_lockres_owner(dlm, res, owner); +} #endif /* DLMCOMMON_H */ diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index afd2948..6e8a372 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -511,25 +511,6 @@ void dlm_destroy_master_caches(void) kmem_cache_destroy(dlm_lockres_cache); } -static void dlm_set_lockres_owner(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - u8 owner) -{ - assert_spin_locked(&res->spinlock); - - res->owner = owner; -} - -void dlm_change_lockres_owner(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, u8 owner) -{ - assert_spin_locked(&res->spinlock); - - 
if (owner != res->owner) - dlm_set_lockres_owner(dlm, res, owner); -} - - static void dlm_lockres_release(struct kref *kref) { struct dlm_lock_resource *res; -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 22/26] ocfs2/dlm: Show the number of lockres/mles in dlm_state
Mainline commit e64ff14607ac90b2f3349550a41cc8dc0c0b1324 This patch shows the number of lockres' and mles in the debugfs file, dlm_state. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmdebug.c | 36 ++++++++++++++++++++++++++++++++++++ 1 files changed, 36 insertions(+), 0 deletions(-) diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index d7decaa..bf9fa27 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -763,6 +763,8 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) int out = 0; struct dlm_reco_node_data *node; char *state; + int cur_mles = 0, tot_mles = 0; + int i; spin_lock(&dlm->spinlock); @@ -805,6 +807,40 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) db->buf + out, db->len - out); out += snprintf(db->buf + out, db->len - out, "\n"); + /* Lock Resources: xxx (xxx) */ + out += snprintf(db->buf + out, db->len - out, + "Lock Resources: %d (%d)\n", + atomic_read(&dlm->res_cur_count), + atomic_read(&dlm->res_tot_count)); + + for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) + tot_mles += atomic_read(&dlm->mle_tot_count[i]); + + for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) + cur_mles += atomic_read(&dlm->mle_cur_count[i]); + + /* MLEs: xxx (xxx) */ + out += snprintf(db->buf + out, db->len - out, + "MLEs: %d (%d)\n", cur_mles, tot_mles); + + /* Blocking: xxx (xxx) */ + out += snprintf(db->buf + out, db->len - out, + " Blocking: %d (%d)\n", + atomic_read(&dlm->mle_cur_count[DLM_MLE_BLOCK]), + atomic_read(&dlm->mle_tot_count[DLM_MLE_BLOCK])); + + /* Mastery: xxx (xxx) */ + out += snprintf(db->buf + out, db->len - out, + " Mastery: %d (%d)\n", + atomic_read(&dlm->mle_cur_count[DLM_MLE_MASTER]), + atomic_read(&dlm->mle_tot_count[DLM_MLE_MASTER])); + + /* Migration: xxx (xxx) */ + out += snprintf(db->buf + out, db->len - out, + " Migration: %d (%d)\n", + atomic_read(&dlm->mle_cur_count[DLM_MLE_MIGRATION]), + 
atomic_read(&dlm->mle_tot_count[DLM_MLE_MIGRATION])); + /* Lists: Dirty=Empty Purge=InUse PendingASTs=Empty ... */ out += snprintf(db->buf + out, db->len - out, "Lists: Dirty=%s Purge=%s PendingASTs=%s " -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 23/26] ocfs2/dlm: Remove struct dlm_lock_name in struct dlm_master_list_entry
Mainline commit 7141514b8307734c117f100c4a3637887c5def45 This patch removes struct dlm_lock_name and adds the entries directly to struct dlm_master_list_entry. Under the new scheme, both mles that are backed by a lockres or not, will have the name populated in mle->mname. This allows us to get rid of code that was figuring out the location of the mle name. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmcommon.h | 14 +++------- fs/ocfs2/dlm/dlmdebug.c | 12 +------- fs/ocfs2/dlm/dlmmaster.c | 68 ++++++++++++--------------------------------- 3 files changed, 23 insertions(+), 71 deletions(-) diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 06aa4ea..0bb42c6 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -56,12 +56,6 @@ enum dlm_mle_type { DLM_MLE_NUM_TYPES }; -struct dlm_lock_name { - unsigned int hash; - unsigned int len; - unsigned char name[DLM_LOCKID_NAME_MAX]; -}; - struct dlm_master_list_entry { struct hlist_node master_hash_node; struct list_head hb_events; @@ -80,10 +74,10 @@ struct dlm_master_list_entry { enum dlm_mle_type type; struct o2hb_callback_func mle_hb_up; struct o2hb_callback_func mle_hb_down; - union { - struct dlm_lock_resource *mleres; - struct dlm_lock_name mlename; - } u; + struct dlm_lock_resource *mleres; + unsigned char mname[DLM_LOCKID_NAME_MAX]; + unsigned int mnamelen; + unsigned int mnamehash; }; enum dlm_ast_type { diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index bf9fa27..bdf1c78 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -287,18 +287,8 @@ static int stringify_nodemap(unsigned long *nodemap, int maxnodes, static int dump_mle(struct dlm_master_list_entry *mle, char *buf, int len) { int out = 0; - unsigned int namelen; - unsigned char *name; char *mle_type; - if (mle->type != DLM_MLE_MASTER) { - name = mle->u.mlename.name; - namelen = mle->u.mlename.len; - } else { - 
name = (unsigned char *)mle->u.mleres->lockname.name; - namelen = mle->u.mleres->lockname.len; - } - if (mle->type == DLM_MLE_BLOCK) mle_type = "BLK"; else if (mle->type == DLM_MLE_MASTER) @@ -306,7 +296,7 @@ static int dump_mle(struct dlm_master_list_entry *mle, char *buf, int len) else mle_type = "MIG"; - out += stringify_lockname(name, namelen, buf + out, len - out); + out += stringify_lockname(mle->mname, mle->mnamelen, buf + out, len - out); out += snprintf(buf + out, len - out, "\t%3s\tmas=%3u\tnew=%3u\tevt=%1d\tuse=%1d\tref=%3d\n", mle_type, mle->master, mle->new_master, diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 6e8a372..66d235b 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -68,41 +68,16 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, void *nodemap, u32 flags); static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data); -static inline void __dlm_mle_name(struct dlm_master_list_entry *mle, - unsigned char **name, unsigned int *namelen, - unsigned int *namehash) -{ - BUG_ON(mle->type != DLM_MLE_BLOCK && - mle->type != DLM_MLE_MASTER && - mle->type != DLM_MLE_MIGRATION); - - if (mle->type != DLM_MLE_MASTER) { - *name = mle->u.mlename.name; - *namelen = mle->u.mlename.len; - if (namehash) - *namehash = mle->u.mlename.hash; - } else { - *name = (unsigned char *)mle->u.mleres->lockname.name; - *namelen = mle->u.mleres->lockname.len; - if (namehash) - *namehash = mle->u.mleres->lockname.hash; - } -} - static inline int dlm_mle_equal(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle, const char *name, unsigned int namelen) { - unsigned char *mlename; - unsigned int mlelen; - if (dlm != mle->dlm) return 0; - __dlm_mle_name(mle, &mlename, &mlelen, NULL); - - if (namelen != mlelen || memcmp(name, mlename, namelen) != 0) + if (namelen != mle->mnamelen || + memcmp(name, mle->mname, namelen) != 0) return 0; return 1; @@ -317,12 +292,16 @@ static void dlm_init_mle(struct 
dlm_master_list_entry *mle, if (mle->type == DLM_MLE_MASTER) { BUG_ON(!res); - mle->u.mleres = res; + mle->mleres = res; + memcpy(mle->mname, res->lockname.name, res->lockname.len); + mle->mnamelen = res->lockname.len; + mle->mnamehash = res->lockname.hash; } else { BUG_ON(!name); - memcpy(mle->u.mlename.name, name, namelen); - mle->u.mlename.len = namelen; - mle->u.mlename.hash = dlm_lockid_hash(name, namelen); + mle->mleres = NULL; + memcpy(mle->mname, name, namelen); + mle->mnamelen = namelen; + mle->mnamehash = dlm_lockid_hash(name, namelen); } atomic_inc(&dlm->mle_tot_count[mle->type]); @@ -350,13 +329,10 @@ void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle) void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle) { struct hlist_head *bucket; - unsigned char *mname; - unsigned int mlen, hash; assert_spin_locked(&dlm->master_lock); - __dlm_mle_name(mle, &mname, &mlen, &hash); - bucket = dlm_master_hash(dlm, hash); + bucket = dlm_master_hash(dlm, mle->mnamehash); hlist_add_head(&mle->master_hash_node, bucket); } @@ -450,8 +426,6 @@ static void dlm_mle_release(struct kref *kref) { struct dlm_master_list_entry *mle; struct dlm_ctxt *dlm; - unsigned char *mname; - unsigned int mlen; mlog_entry_void(); @@ -461,8 +435,8 @@ static void dlm_mle_release(struct kref *kref) assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); - __dlm_mle_name(mle, &mname, &mlen, NULL); - mlog(0, "Releasing mle for %.*s, type %d\n", mlen, mname, mle->type); + mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname, + mle->type); /* remove from list if not already */ __dlm_unlink_mle(dlm, mle); @@ -1284,7 +1258,7 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, res->lockname.len, res->lockname.name); mle->type = DLM_MLE_MASTER; - mle->u.mleres = res; + mle->mleres = res; } } } @@ -1323,18 +1297,14 @@ static int dlm_do_master_request(struct dlm_lock_resource *res, struct dlm_ctxt *dlm = 
mle->dlm; struct dlm_master_request request; int ret, response=0, resend; - unsigned char *mlename; - unsigned int mlenamelen; memset(&request, 0, sizeof(request)); request.node_idx = dlm->node_num; BUG_ON(mle->type == DLM_MLE_MIGRATION); - __dlm_mle_name(mle, &mlename, &mlenamelen, NULL); - - request.namelen = (u8)mlenamelen; - memcpy(request.name, mlename, request.namelen); + request.namelen = (u8)mle->mnamelen; + memcpy(request.name, mle->mname, request.namelen); again: ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request, @@ -3203,12 +3173,10 @@ static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle) { struct dlm_lock_resource *res; - unsigned int hash; /* Find the lockres associated to the mle and set its owner to UNK */ - hash = dlm_lockid_hash(mle->u.mlename.name, mle->u.mlename.len); - res = __dlm_lookup_lockres(dlm, mle->u.mlename.name, mle->u.mlename.len, - hash); + res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen, + mle->mnamehash); if (res) { spin_unlock(&dlm->master_lock); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 24/26] ocfs2/dlm: Do not purge lockres that is being migrated dlm_purge_lockres()
Mainline commit 516b7e52abc7efd61c084b217c61985a403828ed This patch attempts to fix a fine race between purging and migration. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmthread.c | 20 ++++++++++++++++++-- 1 files changed, 18 insertions(+), 2 deletions(-) diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 4060bb3..d490b66 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -162,12 +162,28 @@ static int dlm_purge_lockres(struct dlm_ctxt *dlm, spin_lock(&res->spinlock); if (!__dlm_lockres_unused(res)) { - spin_unlock(&res->spinlock); mlog(0, "%s:%.*s: tried to purge but not unused\n", dlm->name, res->lockname.len, res->lockname.name); - return -ENOTEMPTY; + __dlm_print_one_lock_resource(res); + spin_unlock(&res->spinlock); + BUG(); } + + if (res->state & DLM_LOCK_RES_MIGRATING) { + mlog(0, "%s:%.*s: Delay dropref as this lockres is " + "being remastered\n", dlm->name, res->lockname.len, + res->lockname.name); + /* Re-add the lockres to the end of the purge list */ + if (!list_empty(&res->purge)) { + list_del_init(&res->purge); + list_add_tail(&res->purge, &dlm->purge_list); + } + spin_unlock(&res->spinlock); + return 0; + } + master = (res->owner == dlm->node_num); + if (!master) res->state |= DLM_LOCK_RES_DROPPING_REF; spin_unlock(&res->spinlock); -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 25/26] ocfs2/dlm: Tweak mle_state output
Mainline commit 9405dccfd3201d2b76e120949bec81ba8cfbd2d0 The debugfs file, mle_state, now prints the largest number of mles in one hash link. Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com> Signed-off-by: Mark Fasheh <mfasheh at suse.com> --- fs/ocfs2/dlm/dlmdebug.c | 7 +++++-- 1 files changed, 5 insertions(+), 2 deletions(-) diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index bdf1c78..df52f70 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -494,7 +494,7 @@ static int debug_mle_print(struct dlm_ctxt *dlm, struct debug_buffer *db) struct hlist_head *bucket; struct hlist_node *list; int i, out = 0; - unsigned long total = 0; + unsigned long total = 0, longest = 0, bktcnt; out += snprintf(db->buf + out, db->len - out, "Dumping MLEs for Domain: %s\n", dlm->name); @@ -506,15 +506,18 @@ static int debug_mle_print(struct dlm_ctxt *dlm, struct debug_buffer *db) mle = hlist_entry(list, struct dlm_master_list_entry, master_hash_node); ++total; + ++bktcnt; if (db->len - out < 200) continue; out += dump_mle(mle, db->buf + out, db->len - out); } + longest = max(longest, bktcnt); + bktcnt = 0; } spin_unlock(&dlm->master_lock); out += snprintf(db->buf + out, db->len - out, - "Total on list: %ld\n", total); + "Total: %ld, Longest: %ld\n", total, longest); return out; } -- 1.5.6.3
Sunil Mushran
2009-Apr-17 20:37 UTC
[Ocfs2-devel] [PATCH 26/26] ocfs2: recover orphans in offline slots during recovery and mount
From: Srinivas Eeda <srinivas.eeda at oracle.com> Mainline commit 9140db04ef185f934acf2b1b15b3dd5e6a6bfc22 During recovery, a node recovers orphans in its slot and the dead node(s). But if the dead nodes were holding orphans in offline slots, they will be left unrecovered. If the dead node is the last one to die and is holding orphans in other slots and is the first one to mount, then it only recovers its own slot, which leaves orphans in offline slots. This patch queues complete_recovery to clean orphans for all offline slots during mount and node recovery. Signed-off-by: Srinivas Eeda <srinivas.eeda at oracle.com> --- fs/ocfs2/journal.c | 141 +++++++++++++++++++++++++++++++++++++++++++++------- fs/ocfs2/journal.h | 2 + fs/ocfs2/ocfs2.h | 2 + fs/ocfs2/super.c | 6 ++ 4 files changed, 133 insertions(+), 18 deletions(-) diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c index 03fb378..484ccb5 100644 --- a/fs/ocfs2/journal.c +++ b/fs/ocfs2/journal.c @@ -63,6 +63,102 @@ static int ocfs2_trylock_journal(struct ocfs2_super *osb, static int ocfs2_recover_orphans(struct ocfs2_super *osb, int slot); static int ocfs2_commit_thread(void *arg); +static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, + int slot_num, + struct ocfs2_dinode *la_dinode, + struct ocfs2_dinode *tl_dinode); + +/* + * This replay_map is to track online/offline slots, so we could recover + * offline slots during recovery and mount + */ + +enum ocfs2_replay_state { + REPLAY_UNNEEDED = 0, /* Replay is not needed, so ignore this map */ + REPLAY_NEEDED, /* Replay slots marked in rm_replay_slots */ + REPLAY_DONE /* Replay was already queued */ +}; + +struct ocfs2_replay_map { + unsigned int rm_slots; + enum ocfs2_replay_state rm_state; + unsigned char rm_replay_slots[0]; +}; + +void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state) +{ + if (!osb->replay_map) + return; + + /* If we've already queued the replay, we don't have any more to do */ + if
(osb->replay_map->rm_state == REPLAY_DONE) + return; + + osb->replay_map->rm_state = state; +} + +int ocfs2_compute_replay_slots(struct ocfs2_super *osb) +{ + struct ocfs2_replay_map *replay_map; + struct ocfs2_slot_info *si = osb->slot_info; + int i; + + /* If replay map is already set, we don't do it again */ + if (osb->replay_map) + return 0; + + replay_map = kzalloc(sizeof(struct ocfs2_replay_map) + + (osb->max_slots * sizeof(char)), GFP_KERNEL); + + if (!replay_map) { + mlog_errno(-ENOMEM); + return -ENOMEM; + } + + spin_lock(&osb->osb_lock); + + replay_map->rm_slots = osb->max_slots; + replay_map->rm_state = REPLAY_UNNEEDED; + + /* set rm_replay_slots for offline slot(s) */ + for (i = 0; i < replay_map->rm_slots; i++) { + if (si->si_global_node_nums[i] == OCFS2_INVALID_SLOT) + replay_map->rm_replay_slots[i] = 1; + } + + osb->replay_map = replay_map; + spin_unlock(&osb->osb_lock); + return 0; +} + +void ocfs2_queue_replay_slots(struct ocfs2_super *osb) +{ + struct ocfs2_replay_map *replay_map = osb->replay_map; + int i; + + if (!replay_map) + return; + + if (replay_map->rm_state != REPLAY_NEEDED) + return; + + for (i = 0; i < replay_map->rm_slots; i++) + if (replay_map->rm_replay_slots[i]) + ocfs2_queue_recovery_completion(osb->journal, i, NULL, + NULL); + replay_map->rm_state = REPLAY_DONE; +} + +void ocfs2_free_replay_slots(struct ocfs2_super *osb) +{ + struct ocfs2_replay_map *replay_map = osb->replay_map; + + if (!osb->replay_map) + return; + + kfree(replay_map); + osb->replay_map = NULL; +} static int ocfs2_commit_cache(struct ocfs2_super *osb) { @@ -840,23 +936,24 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, } /* Called by the mount code to queue recovery the last part of - * recovery for it's own slot. */ + * recovery for it's own and offline slot(s). 
*/ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb) { struct ocfs2_journal *journal = osb->journal; - if (osb->dirty) { - /* No need to queue up our truncate_log as regular - * cleanup will catch that. */ - ocfs2_queue_recovery_completion(journal, - osb->slot_num, - osb->local_alloc_copy, - NULL); - ocfs2_schedule_truncate_log_flush(osb, 0); - - osb->local_alloc_copy = NULL; - osb->dirty = 0; - } + /* No need to queue up our truncate_log as regular cleanup will catch + * that. */ + ocfs2_queue_recovery_completion(journal, osb->slot_num, + osb->local_alloc_copy, NULL); + ocfs2_schedule_truncate_log_flush(osb, 0); + + osb->local_alloc_copy = NULL; + osb->dirty = 0; + + /* queue to recover orphan slots for all offline slots */ + ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); + ocfs2_queue_replay_slots(osb); + ocfs2_free_replay_slots(osb); } static int __ocfs2_recovery_thread(void *arg) @@ -878,6 +975,13 @@ restart: goto bail; } + status = ocfs2_compute_replay_slots(osb); + if (status < 0) + mlog_errno(status); + + /* queue recovery for our own slot */ + ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL, NULL); + while(!ocfs2_node_map_is_empty(osb, &osb->recovery_map)) { node_num = ocfs2_node_map_first_set_bit(osb, &osb->recovery_map); @@ -907,11 +1011,8 @@ restart: ocfs2_super_unlock(osb, 1); - /* We always run recovery on our own orphan dir - the dead - * node(s) may have disallowd a previos inode delete. Re-processing - * is therefore required. 
*/ - ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL, - NULL); + /* queue recovery for offline slots */ + ocfs2_queue_replay_slots(osb); bail: mutex_lock(&osb->recovery_lock); @@ -921,6 +1022,7 @@ bail: goto restart; } + ocfs2_free_replay_slots(osb); osb->recovery_thread_task = NULL; mb(); /* sync with ocfs2_recovery_thread_running */ wake_up(&osb->recovery_event); @@ -1069,6 +1171,9 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb, goto done; } + /* we need to run complete recovery for offline orphan slots */ + ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); + mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h index dc27100..df8e9de 100644 --- a/fs/ocfs2/journal.h +++ b/fs/ocfs2/journal.h @@ -135,6 +135,8 @@ static inline void ocfs2_inode_set_new(struct ocfs2_super *osb, /* Exported only for the journal struct init code in super.c. Do not call. */ void ocfs2_complete_recovery(kapi_work_struct_t *work); +int ocfs2_compute_replay_slots(struct ocfs2_super *osb); + /* * Journal Control: * Initialize, Load, Shutdown, Wipe a journal. 
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h index 7c79c84..e84185d 100644 --- a/fs/ocfs2/ocfs2.h +++ b/fs/ocfs2/ocfs2.h @@ -198,6 +198,7 @@ enum ocfs2_mount_options #define OCFS2_DEFAULT_ATIME_QUANTUM 60 struct ocfs2_journal; +struct ocfs2_replay_map; struct ocfs2_super { struct task_struct *commit_task; @@ -249,6 +250,7 @@ struct ocfs2_super atomic_t vol_state; struct mutex recovery_lock; + struct ocfs2_replay_map *replay_map; struct task_struct *recovery_thread_task; int disable_recovery; wait_queue_head_t checkpoint_event; diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c index fc806d6..a421e7d 100644 --- a/fs/ocfs2/super.c +++ b/fs/ocfs2/super.c @@ -1962,6 +1962,12 @@ static int ocfs2_check_volume(struct ocfs2_super *osb) * lock, and it's marked as dirty, set the bit in the recover * map and launch a recovery thread for it. */ status = ocfs2_mark_dead_nodes(osb); + if (status < 0) { + mlog_errno(status); + goto finally; + } + + status = ocfs2_compute_replay_slots(osb); if (status < 0) mlog_errno(status); -- 1.5.6.3