ocfs2/cluster: heartbeat.c updates — add o2hb debugfs "livenodes" file, rework bio allocation/submission for the void bi_end_io API, update blkdev_get/blkdev_put and bdev_logical_block_size calls, and fix hr_task start/stop races (note: original subject line "tunnels: fix netns vs proto registration ordering" did not match this diff's content)
[safe/jmp/linux-2.6] / fs / ocfs2 / cluster / heartbeat.c
index 277ca67..eda5b8b 100644 (file)
@@ -33,6 +33,7 @@
 #include <linux/random.h>
 #include <linux/crc32.h>
 #include <linux/time.h>
+#include <linux/debugfs.h>
 
 #include "heartbeat.h"
 #include "tcp.h"
@@ -60,6 +61,11 @@ static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 static LIST_HEAD(o2hb_node_events);
 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
 
+#define O2HB_DEBUG_DIR                 "o2hb"
+#define O2HB_DEBUG_LIVENODES           "livenodes"
+static struct dentry *o2hb_debug_dir;
+static struct dentry *o2hb_debug_livenodes;
+
 static LIST_HEAD(o2hb_all_regions);
 
 static struct o2hb_callback {
@@ -170,7 +176,8 @@ static void o2hb_write_timeout(struct work_struct *work)
 
 static void o2hb_arm_write_timeout(struct o2hb_region *reg)
 {
-       mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);
+       mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
+            O2HB_MAX_WRITE_TIMEOUT_MS);
 
        cancel_delayed_work(&reg->hr_write_timeout_work);
        reg->hr_last_timeout_start = jiffies;
@@ -184,10 +191,9 @@ static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
        flush_scheduled_work();
 }
 
-static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
-                                     unsigned int num_ios)
+static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
 {
-       atomic_set(&wc->wc_num_reqs, num_ios);
+       atomic_set(&wc->wc_num_reqs, 1);
        init_completion(&wc->wc_io_complete);
        wc->wc_error = 0;
 }
@@ -212,12 +218,12 @@ static void o2hb_wait_on_io(struct o2hb_region *reg,
        struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
 
        blk_run_address_space(mapping);
+       o2hb_bio_wait_dec(wc, 1);
 
        wait_for_completion(&wc->wc_io_complete);
 }
 
-static int o2hb_bio_end_io(struct bio *bio,
-                          unsigned int bytes_done,
+static void o2hb_bio_end_io(struct bio *bio,
                           int error)
 {
        struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
@@ -227,34 +233,30 @@ static int o2hb_bio_end_io(struct bio *bio,
                wc->wc_error = error;
        }
 
-       if (bio->bi_size)
-               return 1;
-
        o2hb_bio_wait_dec(wc, 1);
-       return 0;
+       bio_put(bio);
 }
 
 /* Setup a Bio to cover I/O against num_slots slots starting at
  * start_slot. */
 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
                                      struct o2hb_bio_wait_ctxt *wc,
-                                     unsigned int start_slot,
-                                     unsigned int num_slots)
+                                     unsigned int *current_slot,
+                                     unsigned int max_slots)
 {
-       int i, nr_vecs, len, first_page, last_page;
+       int len, current_page;
        unsigned int vec_len, vec_start;
        unsigned int bits = reg->hr_block_bits;
        unsigned int spp = reg->hr_slots_per_page;
+       unsigned int cs = *current_slot;
        struct bio *bio;
        struct page *page;
 
-       nr_vecs = (num_slots + spp - 1) / spp;
-
        /* Testing has shown this allocation to take long enough under
         * GFP_KERNEL that the local node can get fenced. It would be
         * nicest if we could pre-allocate these bios and avoid this
         * all together. */
-       bio = bio_alloc(GFP_ATOMIC, nr_vecs);
+       bio = bio_alloc(GFP_ATOMIC, 16);
        if (!bio) {
                mlog(ML_ERROR, "Could not alloc slots BIO!\n");
                bio = ERR_PTR(-ENOMEM);
@@ -262,137 +264,53 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
        }
 
        /* Must put everything in 512 byte sectors for the bio... */
-       bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);
+       bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
        bio->bi_bdev = reg->hr_bdev;
        bio->bi_private = wc;
        bio->bi_end_io = o2hb_bio_end_io;
 
-       first_page = start_slot / spp;
-       last_page = first_page + nr_vecs;
-       vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;
-       for(i = first_page; i < last_page; i++) {
-               page = reg->hr_slot_data[i];
+       vec_start = (cs << bits) % PAGE_CACHE_SIZE;
+       while(cs < max_slots) {
+               current_page = cs / spp;
+               page = reg->hr_slot_data[current_page];
 
-               vec_len = PAGE_CACHE_SIZE;
-               /* last page might be short */
-               if (((i + 1) * spp) > (start_slot + num_slots))
-                       vec_len = ((num_slots + start_slot) % spp) << bits;
-               vec_len -=  vec_start;
+               vec_len = min(PAGE_CACHE_SIZE - vec_start,
+                             (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
 
                mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
-                    i, vec_len, vec_start);
+                    current_page, vec_len, vec_start);
 
                len = bio_add_page(bio, page, vec_len, vec_start);
-               if (len != vec_len) {
-                       bio_put(bio);
-                       bio = ERR_PTR(-EIO);
-
-                       mlog(ML_ERROR, "Error adding page to bio i = %d, "
-                            "vec_len = %u, len = %d\n, start = %u\n",
-                            i, vec_len, len, vec_start);
-                       goto bail;
-               }
+               if (len != vec_len) break;
 
+               cs += vec_len / (PAGE_CACHE_SIZE/spp);
                vec_start = 0;
        }
 
 bail:
+       *current_slot = cs;
        return bio;
 }
 
-/*
- * Compute the maximum number of sectors the bdev can handle in one bio,
- * as a power of two.
- *
- * Stolen from oracleasm, thanks Joel!
- */
-static int compute_max_sectors(struct block_device *bdev)
-{
-       int max_pages, max_sectors, pow_two_sectors;
-
-       struct request_queue *q;
-
-       q = bdev_get_queue(bdev);
-       max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
-       if (max_pages > BIO_MAX_PAGES)
-               max_pages = BIO_MAX_PAGES;
-       if (max_pages > q->max_phys_segments)
-               max_pages = q->max_phys_segments;
-       if (max_pages > q->max_hw_segments)
-               max_pages = q->max_hw_segments;
-       max_pages--; /* Handle I/Os that straddle a page */
-
-       if (max_pages) {
-               max_sectors = max_pages << (PAGE_SHIFT - 9);
-       } else {
-               /* If BIO contains 1 or less than 1 page. */
-               max_sectors = q->max_sectors;
-       }
-       /* Why is fls() 1-based???? */
-       pow_two_sectors = 1 << (fls(max_sectors) - 1);
-
-       return pow_two_sectors;
-}
-
-static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
-                                              unsigned int num_slots,
-                                              unsigned int *num_bios,
-                                              unsigned int *slots_per_bio)
-{
-       unsigned int max_sectors, io_sectors;
-
-       max_sectors = compute_max_sectors(reg->hr_bdev);
-
-       io_sectors = num_slots << (reg->hr_block_bits - 9);
-
-       *num_bios = (io_sectors + max_sectors - 1) / max_sectors;
-       *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);
-
-       mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
-            "device can handle %u sectors of I/O\n", io_sectors, num_slots,
-            max_sectors);
-       mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
-            *num_bios, *slots_per_bio);
-}
-
 static int o2hb_read_slots(struct o2hb_region *reg,
                           unsigned int max_slots)
 {
-       unsigned int num_bios, slots_per_bio, start_slot, num_slots;
-       int i, status;
+       unsigned int current_slot=0;
+       int status;
        struct o2hb_bio_wait_ctxt wc;
-       struct bio **bios;
        struct bio *bio;
 
-       o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio);
+       o2hb_bio_wait_init(&wc);
 
-       bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL);
-       if (!bios) {
-               status = -ENOMEM;
-               mlog_errno(status);
-               return status;
-       }
-
-       o2hb_bio_wait_init(&wc, num_bios);
-
-       num_slots = slots_per_bio;
-       for(i = 0; i < num_bios; i++) {
-               start_slot = i * slots_per_bio;
-
-               /* adjust num_slots at last bio */
-               if (max_slots < (start_slot + num_slots))
-                       num_slots = max_slots - start_slot;
-
-               bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
+       while(current_slot < max_slots) {
+               bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
                if (IS_ERR(bio)) {
-                       o2hb_bio_wait_dec(&wc, num_bios - i);
-
                        status = PTR_ERR(bio);
                        mlog_errno(status);
                        goto bail_and_wait;
                }
-               bios[i] = bio;
 
+               atomic_inc(&wc.wc_num_reqs);
                submit_bio(READ, bio);
        }
 
@@ -403,38 +321,30 @@ bail_and_wait:
        if (wc.wc_error && !status)
                status = wc.wc_error;
 
-       if (bios) {
-               for(i = 0; i < num_bios; i++)
-                       if (bios[i])
-                               bio_put(bios[i]);
-               kfree(bios);
-       }
-
        return status;
 }
 
 static int o2hb_issue_node_write(struct o2hb_region *reg,
-                                struct bio **write_bio,
                                 struct o2hb_bio_wait_ctxt *write_wc)
 {
        int status;
        unsigned int slot;
        struct bio *bio;
 
-       o2hb_bio_wait_init(write_wc, 1);
+       o2hb_bio_wait_init(write_wc);
 
        slot = o2nm_this_node();
 
-       bio = o2hb_setup_one_bio(reg, write_wc, slot, 1);
+       bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
        if (IS_ERR(bio)) {
                status = PTR_ERR(bio);
                mlog_errno(status);
                goto bail;
        }
 
+       atomic_inc(&write_wc->wc_num_reqs);
        submit_bio(WRITE, bio);
 
-       *write_bio = bio;
        status = 0;
 bail:
        return status;
@@ -530,7 +440,7 @@ static inline void o2hb_prepare_block(struct o2hb_region *reg,
                                                                   hb_block));
 
        mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
-            (long long)cpu_to_le64(generation),
+            (long long)generation,
             le32_to_cpu(hb_block->hb_cksum));
 }
 
@@ -826,7 +736,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
 {
        int i, ret, highest_node, change = 0;
        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
-       struct bio *write_bio;
        struct o2hb_bio_wait_ctxt write_wc;
 
        ret = o2nm_configured_node_map(configured_nodes,
@@ -864,7 +773,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
 
        /* And fire off the write. Note that we don't wait on this I/O
         * until later. */
-       ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
+       ret = o2hb_issue_node_write(reg, &write_wc);
        if (ret < 0) {
                mlog_errno(ret);
                return ret;
@@ -882,7 +791,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
         * people we find in our steady state have seen us.
         */
        o2hb_wait_on_io(reg, &write_wc);
-       bio_put(write_bio);
        if (write_wc.wc_error) {
                /* Do not re-arm the write timeout on I/O error - we
                 * can't be sure that the new block ever made it to
@@ -943,7 +851,6 @@ static int o2hb_thread(void *data)
 {
        int i, ret;
        struct o2hb_region *reg = data;
-       struct bio *write_bio;
        struct o2hb_bio_wait_ctxt write_wc;
        struct timeval before_hb, after_hb;
        unsigned int elapsed_msec;
@@ -954,7 +861,7 @@ static int o2hb_thread(void *data)
 
        while (!kthread_should_stop() && !reg->hr_unclean_stop) {
                /* We track the time spent inside
-                * o2hb_do_disk_heartbeat so that we avoid more then
+                * o2hb_do_disk_heartbeat so that we avoid more than
                 * hr_timeout_ms between disk writes. On busy systems
                 * this should result in a heartbeat which is less
                 * likely to time itself out. */
@@ -968,7 +875,8 @@ static int o2hb_thread(void *data)
                do_gettimeofday(&after_hb);
                elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
 
-               mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
+               mlog(ML_HEARTBEAT,
+                    "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
                     before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
                     after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
                     elapsed_msec);
@@ -993,10 +901,9 @@ static int o2hb_thread(void *data)
         *
         * XXX: Should we skip this on unclean_stop? */
        o2hb_prepare_block(reg, 0);
-       ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
+       ret = o2hb_issue_node_write(reg, &write_wc);
        if (ret == 0) {
                o2hb_wait_on_io(reg, &write_wc);
-               bio_put(write_bio);
        } else {
                mlog_errno(ret);
        }
@@ -1006,7 +913,77 @@ static int o2hb_thread(void *data)
        return 0;
 }
 
-void o2hb_init(void)
+#ifdef CONFIG_DEBUG_FS
+static int o2hb_debug_open(struct inode *inode, struct file *file)
+{
+       unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+       char *buf = NULL;
+       int i = -1;
+       int out = 0;
+
+       buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
+       if (!buf)
+               goto bail;
+
+       o2hb_fill_node_map(map, sizeof(map));
+
+       while ((i = find_next_bit(map, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES)
+               out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
+       out += snprintf(buf + out, PAGE_SIZE - out, "\n");
+
+       i_size_write(inode, out);
+
+       file->private_data = buf;
+
+       return 0;
+bail:
+       return -ENOMEM;
+}
+
+static int o2hb_debug_release(struct inode *inode, struct file *file)
+{
+       kfree(file->private_data);
+       return 0;
+}
+
+static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
+                                size_t nbytes, loff_t *ppos)
+{
+       return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
+                                      i_size_read(file->f_mapping->host));
+}
+#else
+static int o2hb_debug_open(struct inode *inode, struct file *file)
+{
+       return 0;
+}
+static int o2hb_debug_release(struct inode *inode, struct file *file)
+{
+       return 0;
+}
+static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
+                              size_t nbytes, loff_t *ppos)
+{
+       return 0;
+}
+#endif  /* CONFIG_DEBUG_FS */
+
+static const struct file_operations o2hb_debug_fops = {
+       .open =         o2hb_debug_open,
+       .release =      o2hb_debug_release,
+       .read =         o2hb_debug_read,
+       .llseek =       generic_file_llseek,
+};
+
+void o2hb_exit(void)
+{
+       if (o2hb_debug_livenodes)
+               debugfs_remove(o2hb_debug_livenodes);
+       if (o2hb_debug_dir)
+               debugfs_remove(o2hb_debug_dir);
+}
+
+int o2hb_init(void)
 {
        int i;
 
@@ -1019,6 +996,24 @@ void o2hb_init(void)
        INIT_LIST_HEAD(&o2hb_node_events);
 
        memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
+
+       o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
+       if (!o2hb_debug_dir) {
+               mlog_errno(-ENOMEM);
+               return -ENOMEM;
+       }
+
+       o2hb_debug_livenodes = debugfs_create_file(O2HB_DEBUG_LIVENODES,
+                                                  S_IFREG|S_IRUSR,
+                                                  o2hb_debug_dir, NULL,
+                                                  &o2hb_debug_fops);
+       if (!o2hb_debug_livenodes) {
+               mlog_errno(-ENOMEM);
+               debugfs_remove(o2hb_debug_dir);
+               return -ENOMEM;
+       }
+
+       return 0;
 }
 
 /* if we're already in a callback then we're already serialized by the sem */
@@ -1077,7 +1072,7 @@ static void o2hb_region_release(struct config_item *item)
        }
 
        if (reg->hr_bdev)
-               blkdev_put(reg->hr_bdev);
+               blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
 
        if (reg->hr_slots)
                kfree(reg->hr_slots);
@@ -1330,6 +1325,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
                                     const char *page,
                                     size_t count)
 {
+       struct task_struct *hb_task;
        long fd;
        int sectsize;
        char *p = (char *)page;
@@ -1368,7 +1364,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
                goto out;
 
        reg->hr_bdev = I_BDEV(filp->f_mapping->host);
-       ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
+       ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ);
        if (ret) {
                reg->hr_bdev = NULL;
                goto out;
@@ -1377,7 +1373,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
 
        bdevname(reg->hr_bdev, reg->hr_dev_name);
 
-       sectsize = bdev_hardsect_size(reg->hr_bdev);
+       sectsize = bdev_logical_block_size(reg->hr_bdev);
        if (sectsize != reg->hr_block_bytes) {
                mlog(ML_ERROR,
                     "blocksize %u incorrect for device, expected %d",
@@ -1415,24 +1411,42 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
         */
        atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);
 
-       reg->hr_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
-                                  reg->hr_item.ci_name);
-       if (IS_ERR(reg->hr_task)) {
-               ret = PTR_ERR(reg->hr_task);
+       hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
+                             reg->hr_item.ci_name);
+       if (IS_ERR(hb_task)) {
+               ret = PTR_ERR(hb_task);
                mlog_errno(ret);
-               reg->hr_task = NULL;
                goto out;
        }
 
+       spin_lock(&o2hb_live_lock);
+       reg->hr_task = hb_task;
+       spin_unlock(&o2hb_live_lock);
+
        ret = wait_event_interruptible(o2hb_steady_queue,
                                atomic_read(&reg->hr_steady_iterations) == 0);
        if (ret) {
-               kthread_stop(reg->hr_task);
+               /* We got interrupted (hello ptrace!).  Clean up */
+               spin_lock(&o2hb_live_lock);
+               hb_task = reg->hr_task;
                reg->hr_task = NULL;
+               spin_unlock(&o2hb_live_lock);
+
+               if (hb_task)
+                       kthread_stop(hb_task);
                goto out;
        }
 
-       ret = count;
+       /* Ok, we were woken.  Make sure it wasn't by drop_item() */
+       spin_lock(&o2hb_live_lock);
+       hb_task = reg->hr_task;
+       spin_unlock(&o2hb_live_lock);
+
+       if (hb_task)
+               ret = count;
+       else
+               ret = -EIO;
+
 out:
        if (filp)
                fput(filp);
@@ -1440,7 +1454,7 @@ out:
                iput(inode);
        if (ret < 0) {
                if (reg->hr_bdev) {
-                       blkdev_put(reg->hr_bdev);
+                       blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
                        reg->hr_bdev = NULL;
                }
        }
@@ -1450,10 +1464,17 @@ out:
 static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
                                       char *page)
 {
-       if (!reg->hr_task)
+       pid_t pid = 0;
+
+       spin_lock(&o2hb_live_lock);
+       if (reg->hr_task)
+               pid = task_pid_nr(reg->hr_task);
+       spin_unlock(&o2hb_live_lock);
+
+       if (!pid)
                return 0;
 
-       return sprintf(page, "%u\n", reg->hr_task->pid);
+       return sprintf(page, "%u\n", pid);
 }
 
 struct o2hb_region_attribute {
@@ -1568,35 +1589,42 @@ static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *g
                                                          const char *name)
 {
        struct o2hb_region *reg = NULL;
-       struct config_item *ret = NULL;
 
        reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
        if (reg == NULL)
-               goto out; /* ENOMEM */
+               return ERR_PTR(-ENOMEM);
 
        config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
 
-       ret = &reg->hr_item;
-
        spin_lock(&o2hb_live_lock);
        list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
        spin_unlock(&o2hb_live_lock);
-out:
-       if (ret == NULL)
-               kfree(reg);
 
-       return ret;
+       return &reg->hr_item;
 }
 
 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
                                           struct config_item *item)
 {
+       struct task_struct *hb_task;
        struct o2hb_region *reg = to_o2hb_region(item);
 
        /* stop the thread when the user removes the region dir */
-       if (reg->hr_task) {
-               kthread_stop(reg->hr_task);
-               reg->hr_task = NULL;
+       spin_lock(&o2hb_live_lock);
+       hb_task = reg->hr_task;
+       reg->hr_task = NULL;
+       spin_unlock(&o2hb_live_lock);
+
+       if (hb_task)
+               kthread_stop(hb_task);
+
+       /*
+        * If we're racing a dev_write(), we need to wake them.  They will
+        * check reg->hr_task
+        */
+       if (atomic_read(&reg->hr_steady_iterations) != 0) {
+               atomic_set(&reg->hr_steady_iterations, 0);
+               wake_up(&o2hb_steady_queue);
        }
 
        config_item_put(item);
@@ -1741,7 +1769,67 @@ void o2hb_setup_callback(struct o2hb_callback_func *hc,
 }
 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
 
-int o2hb_register_callback(struct o2hb_callback_func *hc)
+static struct o2hb_region *o2hb_find_region(const char *region_uuid)
+{
+       struct o2hb_region *p, *reg = NULL;
+
+       assert_spin_locked(&o2hb_live_lock);
+
+       list_for_each_entry(p, &o2hb_all_regions, hr_all_item) {
+               if (!strcmp(region_uuid, config_item_name(&p->hr_item))) {
+                       reg = p;
+                       break;
+               }
+       }
+
+       return reg;
+}
+
+static int o2hb_region_get(const char *region_uuid)
+{
+       int ret = 0;
+       struct o2hb_region *reg;
+
+       spin_lock(&o2hb_live_lock);
+
+       reg = o2hb_find_region(region_uuid);
+       if (!reg)
+               ret = -ENOENT;
+       spin_unlock(&o2hb_live_lock);
+
+       if (ret)
+               goto out;
+
+       ret = o2nm_depend_this_node();
+       if (ret)
+               goto out;
+
+       ret = o2nm_depend_item(&reg->hr_item);
+       if (ret)
+               o2nm_undepend_this_node();
+
+out:
+       return ret;
+}
+
+static void o2hb_region_put(const char *region_uuid)
+{
+       struct o2hb_region *reg;
+
+       spin_lock(&o2hb_live_lock);
+
+       reg = o2hb_find_region(region_uuid);
+
+       spin_unlock(&o2hb_live_lock);
+
+       if (reg) {
+               o2nm_undepend_item(&reg->hr_item);
+               o2nm_undepend_this_node();
+       }
+}
+
+int o2hb_register_callback(const char *region_uuid,
+                          struct o2hb_callback_func *hc)
 {
        struct o2hb_callback_func *tmp;
        struct list_head *iter;
@@ -1757,6 +1845,12 @@ int o2hb_register_callback(struct o2hb_callback_func *hc)
                goto out;
        }
 
+       if (region_uuid) {
+               ret = o2hb_region_get(region_uuid);
+               if (ret)
+                       goto out;
+       }
+
        down_write(&o2hb_callback_sem);
 
        list_for_each(iter, &hbcall->list) {
@@ -1778,23 +1872,26 @@ out:
 }
 EXPORT_SYMBOL_GPL(o2hb_register_callback);
 
-int o2hb_unregister_callback(struct o2hb_callback_func *hc)
+void o2hb_unregister_callback(const char *region_uuid,
+                             struct o2hb_callback_func *hc)
 {
        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
 
        mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
             __builtin_return_address(0), hc);
 
+       /* XXX Can this happen _with_ a region reference? */
        if (list_empty(&hc->hc_item))
-               return 0;
+               return;
+
+       if (region_uuid)
+               o2hb_region_put(region_uuid);
 
        down_write(&o2hb_callback_sem);
 
        list_del_init(&hc->hc_item);
 
        up_write(&o2hb_callback_sem);
-
-       return 0;
 }
 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);