[RHEL8,COMMIT] ploop: Organize BAT entries into rbtree of pages

Submitted by Konstantin Khorenko on April 20, 2020, 7:24 a.m.

Details

Message ID 202004200724.03K7Oi8j015007@finist_co8.work.ct
State New
Series "ploop: Organize BAT entries into rbtree of pages"
Headers show

Commit Message

Konstantin Khorenko April 20, 2020, 7:24 a.m.
The commit is pushed to "branch-rh8-4.18.0-80.1.2.vz8.3.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh8-4.18.0-80.1.2.vz8.3.5
------>
commit a027dcb44c3a039c646f4eb973d27964fd0cc874
Author: Kirill Tkhai <ktkhai@virtuozzo.com>
Date:   Mon Apr 20 10:24:43 2020 +0300

    ploop: Organize BAT entries into rbtree of pages
    
    Before the patch, cached BAT was represented as array of entries.
    Array size grows lineary with device size growth, and the future
    plan will be to make cached BAT entries to be shrinkable. This patch
    is a big preparation for that.
    
    Now cached BAT page becomes represented as struct md_page, and
    all of them are linked into rbtree. Currently, all the tree is
    populated on .ctr, and all the pages remain there during all
    target life. Making the pages shrinkable will require additional
    actions like repopulation the tree with shrinked pages on some
    maintaince operations.
    
    Signed-off-by: Kirill Tkhai <ktkhai@virtuozzo.com>
---
 drivers/md/dm-ploop-bat.c    | 259 +++++++++++++++++++--------
 drivers/md/dm-ploop-cmd.c    | 405 +++++++++++++++++++++++++++----------------
 drivers/md/dm-ploop-map.c    |  82 +++++----
 drivers/md/dm-ploop-target.c |  19 +-
 drivers/md/dm-ploop.h        | 124 ++++++++++++-
 5 files changed, 620 insertions(+), 269 deletions(-)

Patch hide | download patch | download mbox

diff --git a/drivers/md/dm-ploop-bat.c b/drivers/md/dm-ploop-bat.c
index 2a8057cfde4c..d6b687806118 100644
--- a/drivers/md/dm-ploop-bat.c
+++ b/drivers/md/dm-ploop-bat.c
@@ -3,64 +3,197 @@ 
 #include <linux/mm.h>
 #include "dm-ploop.h"
 
+struct md_page * md_page_find(struct ploop *ploop, unsigned int id)
+{
+	struct rb_node *node;
+	struct md_page *md;
+
+	node = ploop->bat_entries.rb_node;
+
+	while (node) {
+		md = rb_entry(node, struct md_page, node);
+		if (id < md->id)
+			node = node->rb_left;
+		else if (id > md->id)
+			node = node->rb_right;
+		else
+			return md;
+	}
+
+	return NULL;
+}
+
+void md_page_insert(struct ploop *ploop, struct md_page *new_md)
+{
+	struct rb_root *root = &ploop->bat_entries;
+	unsigned int new_id = new_md->id;
+	struct rb_node *parent, **node;
+	struct md_page *md;
+
+	node = &root->rb_node;
+	parent = NULL;
+
+	while (*node) {
+		parent = *node;
+		md = rb_entry(*node, struct md_page, node);
+		if (new_id < md->id)
+			node = &parent->rb_left;
+		else if (new_id > md->id)
+			node = &parent->rb_right;
+		else
+			BUG();
+	}
+
+	rb_link_node(&new_md->node, parent, node);
+	rb_insert_color(&new_md->node, root);
+}
+
+struct md_page * alloc_md_page(unsigned int id)
+{
+	struct md_page *md;
+	struct page *page;
+	unsigned int size;
+	u8 *levels;
+
+	md = kmalloc(sizeof(*md), GFP_KERNEL); /* FIXME: memcache */
+	if (!md)
+		return NULL;
+	size = sizeof(u8) * PAGE_SIZE / sizeof(map_index_t);
+	levels = kzalloc(size, GFP_KERNEL);
+	if (!levels)
+		goto err_levels;
+
+	page = alloc_page(GFP_KERNEL);
+	if (!page)
+		goto err_page;
+
+	md->bat_levels = levels;
+	md->page = page;
+	md->id = id;
+	return md;
+err_page:
+	kfree(levels);
+err_levels:
+	kfree(md);
+	return NULL;
+}
+
+void free_md_page(struct md_page *md)
+{
+	put_page(md->page);
+	kfree(md->bat_levels);
+	kfree(md);
+}
+
+bool try_update_bat_entry(struct ploop *ploop, unsigned int cluster,
+			  u8 level, unsigned int dst_cluster)
+{
+	unsigned int *bat_entries, id = bat_clu_to_page_nr(cluster);
+	struct md_page *md = md_page_find(ploop, id);
+
+	lockdep_assert_held(&ploop->bat_rwlock);
+
+	if (!md)
+		return false;
+
+	cluster = bat_clu_idx_in_page(cluster); /* relative offset */
+
+	if (md->bat_levels[cluster] == level) {
+		bat_entries = kmap_atomic(md->page);
+		bat_entries[cluster] = dst_cluster;
+		kunmap_atomic(bat_entries);
+		return true;
+	}
+	return false;
+}
+
 /*
- * Read from disk and fill bat_entries[]. Note, that on enter here, cluster #0
+ * Clear all clusters, which are referred to in BAT, from holes_bitmap.
+ * Set bat_levels[] to top delta's level. Mark unmapped clusters as
+ * BAT_ENTRY_NONE.
+ */
+static int parse_bat_entries(struct ploop *ploop, map_index_t *bat_entries,
+		     u8 *bat_levels, unsigned int nr, unsigned int page_id)
+{
+	int i = 0;
+
+	if (page_id == 0)
+		i = PLOOP_MAP_OFFSET;
+
+	for (; i < nr; i++) {
+		if (bat_entries[i] == BAT_ENTRY_NONE)
+			return -EINVAL;
+		if (bat_entries[i]) {
+			bat_levels[i] = BAT_LEVEL_TOP;
+			/* Cluster may refer out holes_bitmap after shrinking */
+			if (bat_entries[i] < ploop->hb_nr)
+				ploop_hole_clear_bit(bat_entries[i], ploop);
+		} else {
+			bat_entries[i] = BAT_ENTRY_NONE;
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * Read from disk and fill bat_entries. Note, that on enter here, cluster #0
  * is already read from disk (with header) -- just parse bio pages content.
  */
 static int ploop_read_bat(struct ploop *ploop, struct bio *bio)
 {
-	unsigned int entries_per_page, nr_copy, page, i = 0;
-	map_index_t *addr, off, cluster = 0;
+	unsigned int id, entries_per_page, nr_copy, nr_all, page, i = 0;
+	map_index_t *from, *to, cluster = 0;
+	struct md_page *md;
 	int ret = 0;
 
 	entries_per_page = PAGE_SIZE / sizeof(map_index_t);
+	nr_all = ploop->nr_bat_entries + PLOOP_MAP_OFFSET;
 
 	do {
 		for (page = 0; page < nr_pages_in_cluster(ploop); page++) {
-			if (i == 0)
-				off = PLOOP_MAP_OFFSET;
-			else
-				off = 0;
-
-			nr_copy = entries_per_page - off;
-			if (i + nr_copy > ploop->nr_bat_entries)
-				nr_copy = ploop->nr_bat_entries - i;
-
-			addr = kmap(bio->bi_io_vec[page].bv_page);
-			memcpy(&ploop->bat_entries[i], addr + off,
-				nr_copy * sizeof(map_index_t));
+			id = i * sizeof(map_index_t) / PAGE_SIZE;
+			md = alloc_md_page(id);
+			if (!md) {
+				ret = -ENOMEM;
+				goto out;
+			}
+			md_page_insert(ploop, md);
+
+			nr_copy = entries_per_page;
+			if (i + nr_copy > nr_all)
+				nr_copy = nr_all - i;
+
+			to = kmap(md->page);
+			from = kmap(bio->bi_io_vec[page].bv_page);
+			memcpy(to, from, nr_copy * sizeof(map_index_t));
 			kunmap(bio->bi_io_vec[page].bv_page);
-			i += nr_copy;
+			ret = parse_bat_entries(ploop, to, md->bat_levels,
+						nr_copy, id);
+			kunmap(md->page);
+			if (ret)
+				goto out;
 
-			if (i >= ploop->nr_bat_entries)
+			i += nr_copy;
+			if (i >= nr_all)
 				goto out;
 		}
 
 		ret = ploop_read_cluster_sync(ploop, bio, ++cluster);
 		if (ret)
-			goto err;
+			goto out;
 
 	} while (1);
 
 out:
-	for (i = 0; i < ploop->nr_bat_entries; i++) {
-		if (ploop->bat_entries[i] == BAT_ENTRY_NONE) {
-			ret = -EINVAL;
-			goto err;
-		}
-		if (!ploop->bat_entries[i])
-			ploop->bat_entries[i] = BAT_ENTRY_NONE;
-	}
-
-err:
 	return ret;
 }
 
 /* Alloc holes_bitmap and set bits of free clusters */
-static int ploop_assign_hb_and_levels(struct ploop *ploop,
-				      unsigned int bat_clusters)
+static int ploop_setup_holes_bitmap(struct ploop *ploop,
+				    unsigned int bat_clusters)
 {
-	unsigned int i, size, dst_cluster;
+	unsigned int i, size;
 
 	/*
 	 * + number of data clusters.
@@ -78,29 +211,10 @@  static int ploop_assign_hb_and_levels(struct ploop *ploop,
 		return -ENOMEM;
 	memset(ploop->holes_bitmap, 0xff, size);
 
-	size = ploop->nr_bat_entries * sizeof(ploop->bat_levels[0]);
-	ploop->bat_levels = kvzalloc(size, GFP_KERNEL);
-	if (!ploop->bat_levels)
-		return -ENOMEM;
-
 	/* Mark all BAT clusters as occupied. */
 	for (i = 0; i < bat_clusters; i++)
 		ploop_hole_clear_bit(i, ploop);
 
-	/*
-	 * Clear all clusters, which are referred to in BAT, from holes_bitmap.
-	 * Set bat_levels[] to top delta's level.
-	 */
-	for (i = 0; i < ploop->nr_bat_entries; i++) {
-		dst_cluster = ploop->bat_entries[i];
-		if (dst_cluster != BAT_ENTRY_NONE) {
-			ploop->bat_levels[i] = BAT_LEVEL_TOP;
-			/* Cluster may refer out holes_bitmap after shrinking */
-			if (dst_cluster < ploop->hb_nr)
-				ploop_hole_clear_bit(dst_cluster, ploop);
-		}
-	}
-
 	return 0;
 }
 
@@ -116,7 +230,6 @@  int ploop_read_metadata(struct dm_target *ti, struct ploop *ploop)
 	struct page *page;
 	struct bio *bio;
 	int ret;
-	void *data;
 
 	cluster_log = ploop->cluster_log;
 
@@ -156,29 +269,14 @@  int ploop_read_metadata(struct dm_target *ti, struct ploop *ploop)
 		pr_err("ploop: custom FirstBlockOffset\n");
 		goto out;
 	}
-
-	ret = -ENOMEM;
-	/*
-	 * Memory for hdr and array of BAT mapping. We keep them
-	 * neighbours like they are stored on disk to simplify
-	 * BAT update code.
-	 */
-	data = vmalloc(size);
-	if (!data)
-		goto out;
-	BUG_ON((unsigned long)data & ~PAGE_MASK);
-
-	memcpy(data, m_hdr, sizeof(*m_hdr));
-	ploop->hdr = data;
-	ploop->bat_entries = data + sizeof(*m_hdr);
 	kunmap(page);
 	m_hdr = NULL;
 
-	ret = ploop_read_bat(ploop, bio);
+	ret = ploop_setup_holes_bitmap(ploop, bat_clusters);
 	if (ret)
 		goto out;
 
-	ret = ploop_assign_hb_and_levels(ploop, bat_clusters);
+	ret = ploop_read_bat(ploop, bio);
 out:
 	if (m_hdr)
 		kunmap(page);
@@ -190,22 +288,28 @@  static int ploop_delta_check_header(struct ploop *ploop, struct page *page,
 		       unsigned int *nr_pages, unsigned int *last_page_len)
 {
 	unsigned int bytes, delta_nr_be, offset_clusters, bat_clusters, cluster_log;
-	struct ploop_pvd_header *hdr;
+	struct ploop_pvd_header *d_hdr, *hdr;
 	u64 size, delta_size;
+	struct md_page *md;
 	int ret = -EPROTO;
 
-	hdr = kmap(page);
+	md = md_page_find(ploop, 0);
+	if (!md)
+		return -ENXIO;
+
+	hdr = kmap(md->page);
+	d_hdr = kmap(page);
 
-	if (memcmp(hdr->m_Sig, ploop->hdr->m_Sig, sizeof(hdr->m_Sig)) ||
-	    hdr->m_Sectors != ploop->hdr->m_Sectors ||
-	    hdr->m_Type != ploop->hdr->m_Type)
+	if (memcmp(d_hdr->m_Sig, hdr->m_Sig, sizeof(d_hdr->m_Sig)) ||
+	    d_hdr->m_Sectors != hdr->m_Sectors ||
+	    d_hdr->m_Type != hdr->m_Type)
 		goto out;
 
-	delta_size = le64_to_cpu(hdr->m_SizeInSectors_v2);
-	delta_nr_be = le32_to_cpu(hdr->m_Size);
-	size = ploop->hdr->m_SizeInSectors_v2;
+	delta_size = le64_to_cpu(d_hdr->m_SizeInSectors_v2);
+	delta_nr_be = le32_to_cpu(d_hdr->m_Size);
+	size = hdr->m_SizeInSectors_v2;
 	cluster_log = ploop->cluster_log;
-	offset_clusters = le32_to_cpu(hdr->m_FirstBlockOffset) >> cluster_log;
+	offset_clusters = le32_to_cpu(d_hdr->m_FirstBlockOffset) >> cluster_log;
 	bytes = (PLOOP_MAP_OFFSET + delta_nr_be) * sizeof(map_index_t);
 	bat_clusters = DIV_ROUND_UP(bytes, 1 << (cluster_log + 9));
 
@@ -218,6 +322,7 @@  static int ploop_delta_check_header(struct ploop *ploop, struct page *page,
 	*last_page_len = bytes ? : PAGE_SIZE;
 	ret = 0;
 out:
+	kunmap(md->page);
 	kunmap(page);
 	return ret;
 }
diff --git a/drivers/md/dm-ploop-cmd.c b/drivers/md/dm-ploop-cmd.c
index a6597f573151..5f2c93579e53 100644
--- a/drivers/md/dm-ploop-cmd.c
+++ b/drivers/md/dm-ploop-cmd.c
@@ -22,10 +22,12 @@  static void ploop_queue_deferred_cmd(struct ploop *ploop, struct ploop_cmd *cmd)
  * Assign newly allocated memory for BAT array and holes_bitmap
  * before grow.
  */
-static void ploop_advance_bat_and_holes(struct ploop *ploop,
-					struct ploop_cmd *cmd)
+static void ploop_advance_holes_bitmap(struct ploop *ploop,
+				       struct ploop_cmd *cmd)
 {
-	unsigned int i, size, dst_cluster;
+	unsigned int i, end, size, dst_cluster, *bat_entries;
+	struct rb_node *node;
+	struct md_page *md;
 
 	/* This is called only once */
 	if (cmd->resize.stage != PLOOP_GROW_STAGE_INITIAL)
@@ -40,27 +42,21 @@  static void ploop_advance_bat_and_holes(struct ploop *ploop,
 	for (i = ploop->hb_nr; i < size * 8; i++)
 		set_bit(i, ploop->holes_bitmap);
 	swap(cmd->resize.hb_nr, ploop->hb_nr);
-	for (i = 0; i < ploop->nr_bat_entries; i++) {
-		if (!cluster_is_in_top_delta(ploop, i))
-			continue;
-		dst_cluster = ploop->bat_entries[i];
-		if (dst_cluster < ploop->hb_nr &&
-		    test_bit(dst_cluster, ploop->holes_bitmap)) {
+	ploop_for_each_md_page(ploop, md, node) {
+		init_bat_entries_iter(ploop, md->id, &i, &end);
+		bat_entries = kmap_atomic(md->page);
+		for (; i <= end; i++) {
+			if (!md_page_cluster_is_in_top_delta(md, i))
+				continue;
+			dst_cluster = bat_entries[i];
 			/* This may happen after grow->shrink->(now) grow */
-			ploop_hole_clear_bit(dst_cluster, ploop);
+			if (dst_cluster < ploop->hb_nr &&
+			    test_bit(dst_cluster, ploop->holes_bitmap)) {
+				ploop_hole_clear_bit(dst_cluster, ploop);
+			}
 		}
+		kunmap_atomic(bat_entries);
 	}
-
-	/* Copy and swap bat_entries */
-	size = (PLOOP_MAP_OFFSET + ploop->nr_bat_entries) * sizeof(map_index_t);
-	memcpy(cmd->resize.hdr, ploop->hdr, size);
-	swap(cmd->resize.hdr, ploop->hdr);
-	ploop->bat_entries = (void *)ploop->hdr + sizeof(*ploop->hdr);
-
-	/* Copy and swap bat_levels */
-	size = ploop->nr_bat_entries * sizeof(ploop->bat_levels[0]);
-	memcpy(cmd->resize.bat_levels, ploop->bat_levels, size);
-	swap(cmd->resize.bat_levels, ploop->bat_levels);
 	write_unlock_irq(&ploop->bat_rwlock);
 }
 
@@ -131,16 +127,25 @@  static unsigned int ploop_find_bat_entry(struct ploop *ploop,
 					 unsigned int dst_cluster,
 					 bool *is_locked)
 {
-	unsigned int i, cluster = UINT_MAX;
+	unsigned int i, end, *bat_entries, cluster = UINT_MAX;
+	struct rb_node *node;
+	struct md_page *md;
 
 	read_lock_irq(&ploop->bat_rwlock);
-	for (i = 0; i < ploop->nr_bat_entries; i++) {
-		if (ploop->bat_entries[i] != dst_cluster)
-			continue;
-		if (cluster_is_in_top_delta(ploop, i)) {
-			cluster = i;
-			break;
+	ploop_for_each_md_page(ploop, md, node) {
+		init_bat_entries_iter(ploop, md->id, &i, &end);
+		bat_entries = kmap_atomic(md->page);
+		for (; i <= end; i++) {
+			if (bat_entries[i] != dst_cluster)
+				continue;
+			if (md_page_cluster_is_in_top_delta(md, i)) {
+				cluster = page_clu_idx_to_bat_clu(md->id, i);
+				break;
+			}
 		}
+		kunmap_atomic(bat_entries);
+		if (cluster != UINT_MAX)
+			break;
 	}
 	read_unlock_irq(&ploop->bat_rwlock);
 
@@ -283,8 +288,7 @@  static int ploop_grow_relocate_cluster(struct ploop *ploop,
 
 	/* Update local BAT copy */
 	write_lock_irq(&ploop->bat_rwlock);
-	ploop->bat_entries[cluster] = new_dst;
-	WARN_ON(!cluster_is_in_top_delta(ploop, cluster));
+	WARN_ON(!try_update_bat_entry(ploop, cluster, BAT_LEVEL_TOP, new_dst));
 	write_unlock_irq(&ploop->bat_rwlock);
 not_occupied:
 	/*
@@ -330,20 +334,22 @@  static int ploop_grow_update_header(struct ploop *ploop,
 
 	ploop_submit_index_wb_sync(ploop, piwb);
 	ret = blk_status_to_errno(piwb->bi_status);
-	if (ret)
-		goto out;
 
-	/* Update header local copy */
-	hdr = kmap_atomic(piwb->bat_page);
-	write_lock_irq(&ploop->bat_rwlock);
-	memcpy(ploop->hdr, hdr, sizeof(*hdr));
-	write_unlock_irq(&ploop->bat_rwlock);
-	kunmap_atomic(hdr);
-out:
 	ploop_reset_bat_update(piwb);
 	return ret;
 }
 
+static void ploop_add_md_pages(struct ploop *ploop, struct rb_root *from)
+{
+	struct rb_node *node;
+        struct md_page *md;
+
+        while ((node = from->rb_node) != NULL) {
+		md = rb_entry(node, struct md_page, node);
+		rb_erase(node, from);
+		md_page_insert(ploop, md);
+	}
+}
 /*
  * Here we relocate data clusters, which may intersect with BAT area
  * of disk after resize. For user they look as already written to disk,
@@ -363,7 +369,7 @@  static void process_resize_cmd(struct ploop *ploop, struct ploop_index_wb *piwb,
 	 *  Update memory arrays and hb_nr, but do not update nr_bat_entries.
 	 *  This is noop except first enter to this function.
 	 */
-	ploop_advance_bat_and_holes(ploop, cmd);
+	ploop_advance_holes_bitmap(ploop, cmd);
 
 	if (cmd->resize.dst_cluster <= cmd->resize.end_dst_cluster) {
 		ret = ploop_grow_relocate_cluster(ploop, piwb, cmd);
@@ -389,8 +395,10 @@  static void process_resize_cmd(struct ploop *ploop, struct ploop_index_wb *piwb,
 			dst_cluster--;
 		}
 		swap(ploop->hb_nr, cmd->resize.hb_nr);
-	} else
+	} else {
+		ploop_add_md_pages(ploop, &cmd->resize.md_pages_root);
 		swap(ploop->nr_bat_entries, cmd->resize.nr_bat_entries);
+	}
 	write_unlock_irq(&ploop->bat_rwlock);
 
 	cmd->retval = ret;
@@ -448,13 +456,40 @@  void free_bio_with_pages(struct ploop *ploop, struct bio *bio)
 	bio_put(bio);
 }
 
+static int prealloc_md_pages(struct rb_root *root, unsigned int nr_bat_entries,
+			     unsigned int new_nr_bat_entries)
+{
+	unsigned int i, nr_pages, new_nr_pages;
+	struct md_page *md;
+	void *addr;
+
+	new_nr_pages = bat_clu_to_page_nr(new_nr_bat_entries - 1) + 1;
+	nr_pages = bat_clu_to_page_nr(nr_bat_entries - 1) + 1;
+
+	for (i = nr_pages; i < new_nr_pages; i++) {
+		md = alloc_md_page(i); /* Any id is OK */
+		if (!md)
+			return -ENOMEM;
+		addr = kmap_atomic(md->page);
+		memset(addr, 0, PAGE_SIZE);
+		kunmap_atomic(addr);
+
+		/* No order */
+		rb_link_node(&md->node, NULL, &root->rb_node);
+		rb_insert_color(&md->node, root);
+	}
+
+	return 0;
+}
+
 /* @new_size is in sectors */
 static int ploop_resize(struct ploop *ploop, u64 new_size)
 {
 	unsigned int nr_bat_entries, nr_old_bat_clusters, nr_bat_clusters;
 	unsigned int hb_nr, size, cluster_log = ploop->cluster_log;
-	struct ploop_pvd_header *hdr = ploop->hdr;
-	struct ploop_cmd cmd = { {0} };
+	struct ploop_cmd cmd = { .resize.md_pages_root = RB_ROOT };
+	struct ploop_pvd_header *hdr;
+	struct md_page *md;
 	int ret = -ENOMEM;
 	u64 old_size;
 
@@ -462,7 +497,14 @@  static int ploop_resize(struct ploop *ploop, u64 new_size)
 		return -EBUSY;
 	if (ploop_is_ro(ploop))
 		return -EROFS;
+
+	md = md_page_find(ploop, 0);
+	if (WARN_ON(!md))
+		return -EIO;
+	hdr = kmap(md->page);
 	old_size = le64_to_cpu(hdr->m_SizeInSectors_v2);
+	kunmap(md->page);
+
 	if (old_size == new_size)
 		return 0;
 	if (old_size > new_size) {
@@ -478,18 +520,12 @@  static int ploop_resize(struct ploop *ploop, u64 new_size)
 
 	nr_bat_entries = (new_size >> cluster_log);
 
-	size = nr_bat_entries * sizeof(ploop->bat_levels[0]);
-	cmd.resize.bat_levels = kvzalloc(size, GFP_KERNEL);
-	if (!cmd.resize.bat_levels)
+	/* Memory for new md pages */
+	if (prealloc_md_pages(&cmd.resize.md_pages_root,
+			      ploop->nr_bat_entries, nr_bat_entries) < 0)
 		goto err;
 
 	size = (PLOOP_MAP_OFFSET + nr_bat_entries) * sizeof(map_index_t);
-
-	/* Memory for hdr + bat_entries */
-	cmd.resize.hdr = vzalloc(size);
-	if (!cmd.resize.hdr)
-		goto err;
-
 	nr_bat_clusters = DIV_ROUND_UP(size, 1 << (cluster_log + 9));
 	hb_nr = nr_bat_clusters + nr_bat_entries;
 	size = round_up(DIV_ROUND_UP(hb_nr, 8), sizeof(unsigned long));
@@ -530,9 +566,8 @@  static int ploop_resize(struct ploop *ploop, u64 new_size)
 err:
 	if (cmd.resize.bio)
 		free_bio_with_pages(ploop, cmd.resize.bio);
-	kvfree(cmd.resize.bat_levels);
 	kvfree(cmd.resize.holes_bitmap);
-	vfree(cmd.resize.hdr);
+	free_md_pages_tree(&cmd.resize.md_pages_root);
 	return ret;
 }
 
@@ -540,8 +575,9 @@  static int ploop_resize(struct ploop *ploop, u64 new_size)
 static void process_add_delta_cmd(struct ploop *ploop, struct ploop_cmd *cmd)
 {
 	map_index_t *bat_entries, *delta_bat_entries;
-	unsigned int i, level, dst_cluster;
-	u8 *bat_levels;
+	unsigned int i, end, level, dst_cluster;
+	struct rb_node *node;
+	struct md_page *md;
 	bool is_raw;
 
 	if (unlikely(ploop->force_link_inflight_bios)) {
@@ -551,34 +587,43 @@  static void process_add_delta_cmd(struct ploop *ploop, struct ploop_cmd *cmd)
 	}
 
 	level = ploop->nr_deltas;
-	bat_entries = ploop->bat_entries;
-	bat_levels = ploop->bat_levels;
-	delta_bat_entries = (map_index_t *)cmd->add_delta.hdr + PLOOP_MAP_OFFSET;
+	/* Points to hdr since md_page[0] also contains hdr. */
+	delta_bat_entries = (map_index_t *)cmd->add_delta.hdr;
 	is_raw = cmd->add_delta.deltas[level].is_raw;
 
 	write_lock_irq(&ploop->bat_rwlock);
 
 	/* FIXME: Stop on old delta's nr_bat_entries */
-	for (i = 0; i < ploop->nr_bat_entries; i++) {
-		if (cluster_is_in_top_delta(ploop, i))
-			continue;
-		if (!is_raw)
-			dst_cluster = delta_bat_entries[i];
-		else
-			dst_cluster = i < cmd->add_delta.raw_clusters ? i : BAT_ENTRY_NONE;
-		if (dst_cluster == BAT_ENTRY_NONE)
-			continue;
-		/*
-		 * Prefer last added delta, since the order is:
-		 * 1)add top device
-		 * 2)add oldest delta
-		 * ...
-		 * n)add newest delta
-		 * Keep in mind, top device is current image, and
-		 * it is added first in contrary the "age" order.
-		 */
-		bat_levels[i] = level;
-		bat_entries[i] = dst_cluster;
+	ploop_for_each_md_page(ploop, md, node) {
+		init_bat_entries_iter(ploop, md->id, &i, &end);
+		bat_entries = kmap_atomic(md->page);
+		for (; i <= end; i++) {
+			if (md_page_cluster_is_in_top_delta(md, i))
+				continue;
+			if (!is_raw)
+				dst_cluster = delta_bat_entries[i];
+			else {
+				dst_cluster = page_clu_idx_to_bat_clu(md->id, i);
+				if (dst_cluster >= cmd->add_delta.raw_clusters)
+					dst_cluster = BAT_ENTRY_NONE;
+			}
+			if (dst_cluster == BAT_ENTRY_NONE)
+				continue;
+			/*
+			 * Prefer last added delta, since the order is:
+			 * 1)add top device
+			 * 2)add oldest delta
+			 * ...
+			 * n)add newest delta
+			 * Keep in mind, top device is current image, and
+			 * it is added first in contrary the "age" order.
+			 */
+			md->bat_levels[i] = level;
+			bat_entries[i] = dst_cluster;
+
+		}
+		kunmap_atomic(bat_entries);
+		delta_bat_entries += PAGE_SIZE / sizeof(map_index_t);
 	}
 
 	swap(ploop->deltas, cmd->add_delta.deltas);
@@ -686,8 +731,8 @@  static void ploop_queue_deferred_cmd_wrapper(struct ploop *ploop,
 /* Find mergeable cluster and return it in cmd->merge.cluster */
 static bool iter_delta_clusters(struct ploop *ploop, struct ploop_cmd *cmd)
 {
-	unsigned int *cluster = &cmd->merge.cluster;
-	unsigned int level;
+	unsigned int dst_cluster, *cluster = &cmd->merge.cluster;
+	u8 level;
 	bool skip;
 
 	BUG_ON(cmd->type != PLOOP_CMD_MERGE_SNAPSHOT);
@@ -698,8 +743,9 @@  static bool iter_delta_clusters(struct ploop *ploop, struct ploop_cmd *cmd)
 		 * We are in kwork, so bat_rwlock is not needed
 		 * (see comment in process_one_deferred_bio()).
 		 */
-		level = ploop->bat_levels[*cluster];
-		if (ploop->bat_entries[*cluster] == BAT_ENTRY_NONE ||
+		/* FIXME: Optimize this. ploop_bat_entries() is overkill */
+		dst_cluster = ploop_bat_entries(ploop, *cluster, &level);
+		if (dst_cluster == BAT_ENTRY_NONE ||
 		    level != ploop->nr_deltas - 1)
 			continue;
 
@@ -724,8 +770,8 @@  static bool iter_delta_clusters(struct ploop *ploop, struct ploop_cmd *cmd)
 static void process_merge_latest_snapshot_cmd(struct ploop *ploop,
 					      struct ploop_cmd *cmd)
 {
-	unsigned int *cluster = &cmd->merge.cluster;
-	unsigned int level, dst_cluster;
+	unsigned int dst_cluster, *cluster = &cmd->merge.cluster;
+	u8 level;
 	struct file *file;
 	int ret;
 
@@ -738,8 +784,8 @@  static void process_merge_latest_snapshot_cmd(struct ploop *ploop,
 		 * (we can't race with changing BAT, since cmds
 		 *  are processed before bios and piwb is sync).
 		 */
-		dst_cluster = ploop->bat_entries[*cluster];
-		level = ploop->bat_levels[*cluster];
+		/* FIXME: Optimize this: ploop_bat_entries() is overkill */
+		dst_cluster = ploop_bat_entries(ploop, *cluster, &level);
 
 		/* Check we can submit one more cow in parallel */
 		if (!atomic_add_unless(&cmd->merge.nr_available, -1, 0))
@@ -818,11 +864,12 @@  static int ploop_merge_latest_snapshot(struct ploop *ploop)
 static void process_notify_delta_merged(struct ploop *ploop,
 					struct ploop_cmd *cmd)
 {
-	unsigned int i, *bat_entries, *delta_bat_entries;
+	unsigned int i, end, *bat_entries, *delta_bat_entries;
 	void *hdr = cmd->notify_delta_merged.hdr;
 	u8 level = cmd->notify_delta_merged.level;
+	struct rb_node *node;
+	struct md_page *md;
 	struct file *file;
-	u8 *bat_levels;
 	int ret;
 
 	force_defer_bio_count_inc(ploop);
@@ -832,32 +879,37 @@  static void process_notify_delta_merged(struct ploop *ploop,
 		goto out;
 	}
 
-	bat_entries = ploop->bat_entries;
-	bat_levels = ploop->bat_levels;
-	delta_bat_entries = (map_index_t *)hdr + PLOOP_MAP_OFFSET;
+	/* Points to hdr since md_page[0] also contains hdr. */
+	delta_bat_entries = (map_index_t *)hdr;
 
 	write_lock_irq(&ploop->bat_rwlock);
-	for (i = 0; i < ploop->nr_bat_entries; i++) {
-		if (cluster_is_in_top_delta(ploop, i) ||
-		    delta_bat_entries[i] == BAT_ENTRY_NONE ||
-		    bat_levels[i] < level) {
-			continue;
-		}
+	ploop_for_each_md_page(ploop, md, node) {
+		init_bat_entries_iter(ploop, md->id, &i, &end);
+		bat_entries = kmap_atomic(md->page);
+		for (; i <= end; i++) {
+			if (md_page_cluster_is_in_top_delta(md, i) ||
+			    delta_bat_entries[i] == BAT_ENTRY_NONE ||
+			    md->bat_levels[i] < level)
+				continue;
+
+			/* deltas above @level become renumbered */
+			if (md->bat_levels[i] > level) {
+				md->bat_levels[i]--;
+				continue;
+			}
 
-		/* deltas above @level become renumbered */
-		if (bat_levels[i] > level) {
-			bat_levels[i]--;
-			continue;
+			/*
+			 * clusters from deltas of @level become pointing to
+			 * 1)next delta (which became renumbered) or
+			 * 2)prev delta (if !@forward).
+			 */
+			bat_entries[i] = delta_bat_entries[i];
+			WARN_ON(bat_entries[i] == BAT_ENTRY_NONE);
+			if (!cmd->notify_delta_merged.forward)
+				md->bat_levels[i]--;
 		}
-
-		/*
-		 * clusters from deltas of @level become pointing to next delta
-		 * (which became renumbered) or prev delta (if !@forward).
-		 */
-		bat_entries[i] = delta_bat_entries[i];
-		WARN_ON(bat_entries[i] == BAT_ENTRY_NONE);
-		if (!cmd->notify_delta_merged.forward)
-			bat_levels[i]--;
+		kunmap_atomic(bat_entries);
+		delta_bat_entries += PAGE_SIZE / sizeof(map_index_t);
 	}
 
 	file = ploop->deltas[level].file;
@@ -866,7 +918,6 @@  static void process_notify_delta_merged(struct ploop *ploop,
 		ploop->deltas[i - 1] = ploop->deltas[i];
 	ploop->deltas[--ploop->nr_deltas].file = NULL;
 	write_unlock_irq(&ploop->bat_rwlock);
-
 	fput(file);
 	cmd->retval = 0;
 out:
@@ -890,10 +941,10 @@  static void process_update_delta_index(struct ploop *ploop,
 	write_lock_irq(&ploop->bat_rwlock);
 	/* Check all */
 	while (sscanf(map, "%u:%u;%n", &cluster, &dst_cluster, &n) == 2) {
-		if (ploop->bat_entries[cluster] == BAT_ENTRY_NONE)
-			break;
 		if (cluster >= ploop->nr_bat_entries)
 			break;
+		if (ploop_bat_entries(ploop, cluster, NULL) == BAT_ENTRY_NONE)
+			break;
 		map += n;
 	}
 	if (map[0] != '\0') {
@@ -903,8 +954,7 @@  static void process_update_delta_index(struct ploop *ploop,
 	/* Commit all */
 	map = cmd->update_delta_index.map;
 	while (sscanf(map, "%u:%u;%n", &cluster, &dst_cluster, &n) == 2) {
-		if (ploop->bat_levels[cluster] == level)
-			ploop->bat_entries[cluster] = dst_cluster;
+		try_update_bat_entry(ploop, cluster, level, dst_cluster);
 		map += n;
 	}
 	ret = 0;
@@ -1031,7 +1081,9 @@  static int ploop_update_delta_index(struct ploop *ploop, unsigned int level,
 
 static void process_switch_top_delta(struct ploop *ploop, struct ploop_cmd *cmd)
 {
-	unsigned int i, size, bat_clusters, level = ploop->nr_deltas;
+	unsigned int i, end, size, bat_clusters, *bat_entries, level = ploop->nr_deltas;
+	struct rb_node *node;
+	struct md_page *md;
 	int ret;
 
 	force_defer_bio_count_inc(ploop);
@@ -1046,9 +1098,14 @@  static void process_switch_top_delta(struct ploop *ploop, struct ploop_cmd *cmd)
 	write_lock_irq(&ploop->bat_rwlock);
 	swap(ploop->origin_dev, cmd->switch_top_delta.origin_dev);
 	swap(ploop->deltas, cmd->switch_top_delta.deltas);
-	for (i = 0; i < ploop->nr_bat_entries; i++)
-		if (ploop->bat_levels[i] == BAT_LEVEL_TOP)
-			ploop->bat_levels[i] = level;
+	ploop_for_each_md_page(ploop, md, node) {
+		init_bat_entries_iter(ploop, md->id, &i, &end);
+		bat_entries = kmap_atomic(md->page);
+		for (; i <= end; i++) {
+			if (md->bat_levels[i] == BAT_LEVEL_TOP)
+				md->bat_levels[i] = level;
+		}
+	}
 
 	/* Header and BAT-occupied clusters at start of file */
 	size = (PLOOP_MAP_OFFSET + ploop->nr_bat_entries) * sizeof(map_index_t);
@@ -1117,12 +1174,15 @@  static int ploop_switch_top_delta(struct ploop *ploop, int new_ro_fd,
 
 static void process_flip_upper_deltas(struct ploop *ploop, struct ploop_cmd *cmd)
 {
-	unsigned int i, size, bat_clusters, hb_nr = ploop->hb_nr;
+	unsigned int i, size, end, bat_clusters, hb_nr, *bat_entries;
 	void *holes_bitmap = ploop->holes_bitmap;
 	u8 level = ploop->nr_deltas - 1;
+	struct rb_node *node;
+	struct md_page *md;
 
 	size = (PLOOP_MAP_OFFSET + ploop->nr_bat_entries) * sizeof(map_index_t);
         bat_clusters = DIV_ROUND_UP(size, 1 << (ploop->cluster_log + 9));
+	hb_nr = ploop->hb_nr;
 
 	write_lock_irq(&ploop->bat_rwlock);
 	/* Prepare holes_bitmap */
@@ -1133,16 +1193,22 @@  static void process_flip_upper_deltas(struct ploop *ploop, struct ploop_cmd *cmd
 		clear_bit(i, holes_bitmap);
 
 	/* Flip bat entries */
-	for (i = 0; i < ploop->nr_bat_entries; i++) {
-		if (ploop->bat_entries[i] == BAT_ENTRY_NONE)
-			continue;
-		if (ploop->bat_levels[i] == level) {
-			ploop->bat_levels[i] = BAT_LEVEL_TOP;
-			clear_bit(ploop->bat_entries[i], holes_bitmap);
-		} else if (ploop->bat_levels[i] == BAT_LEVEL_TOP) {
-			ploop->bat_levels[i] = level;
+	ploop_for_each_md_page(ploop, md, node) {
+		init_bat_entries_iter(ploop, md->id, &i, &end);
+		bat_entries = kmap_atomic(md->page);
+		for (; i <= end; i++) {
+			if (bat_entries[i] == BAT_ENTRY_NONE)
+				continue;
+			if (md->bat_levels[i] == level) {
+				md->bat_levels[i] = BAT_LEVEL_TOP;
+				clear_bit(bat_entries[i], holes_bitmap);
+			} else if (md->bat_levels[i] == BAT_LEVEL_TOP) {
+				md->bat_levels[i] = level;
+			}
 		}
+		kunmap_atomic(bat_entries);
 	}
+
 	swap(ploop->origin_dev, cmd->flip_upper_deltas.origin_dev);
 	/* FIXME */
 	swap(ploop->deltas[level].file, cmd->flip_upper_deltas.file);
@@ -1156,10 +1222,14 @@  static void process_flip_upper_deltas(struct ploop *ploop, struct ploop_cmd *cmd
 
 static void process_tracking_start(struct ploop *ploop, struct ploop_cmd *cmd)
 {
-	unsigned int i, dst_cluster, tb_nr = cmd->tracking_start.tb_nr;
+	unsigned int i, nr_pages, end, *bat_entries, dst_cluster, tb_nr, nr;
 	void *tracking_bitmap = cmd->tracking_start.tracking_bitmap;
+	struct rb_node *node;
+	struct md_page *md;
 	int ret = 0;
 
+	tb_nr = cmd->tracking_start.tb_nr;
+
 	write_lock_irq(&ploop->bat_rwlock);
 	ploop->tracking_bitmap = tracking_bitmap;
 	ploop->tb_nr = tb_nr;
@@ -1176,18 +1246,30 @@  static void process_tracking_start(struct ploop *ploop, struct ploop_cmd *cmd)
 	write_lock_irq(&ploop->bat_rwlock);
 	for_each_clear_bit(i, ploop->holes_bitmap, ploop->hb_nr)
 		set_bit(i, tracking_bitmap);
-	for (i = 0; i < ploop->nr_bat_entries; i++) {
-		if (!cluster_is_in_top_delta(ploop, i))
-			continue;
-		dst_cluster = ploop->bat_entries[i];
-		if (WARN_ON(dst_cluster >= tb_nr)) {
-			ret = -EIO;
-			goto unlock;
+	nr_pages = bat_clu_to_page_nr(ploop->nr_bat_entries - 1) + 1;
+	nr = 0;
+
+	ploop_for_each_md_page(ploop, md, node) {
+		init_bat_entries_iter(ploop, md->id, &i, &end);
+		bat_entries = kmap_atomic(md->page);
+		for (; i <= end; i++) {
+			dst_cluster = bat_entries[i];
+			if (dst_cluster == BAT_ENTRY_NONE ||
+			    md->bat_levels[i] != BAT_LEVEL_TOP)
+				continue;
+			if (WARN_ON(dst_cluster >= tb_nr)) {
+				ret = -EIO;
+				break;
+			}
+			set_bit(dst_cluster, tracking_bitmap);
 		}
-		set_bit(dst_cluster, tracking_bitmap);
+		kunmap_atomic(bat_entries);
+		if (ret)
+			break;
+		nr++;
 	}
-unlock:
 	write_unlock_irq(&ploop->bat_rwlock);
+	BUG_ON(ret == 0 && nr != nr_pages);
 out:
 	cmd->retval = ret;
 	complete(&cmd->comp); /* Last touch of cmd memory */
@@ -1221,12 +1303,38 @@  static int tracking_get_next(struct ploop *ploop, char *result,
 	return ret;
 }
 
+static unsigned int max_dst_cluster_in_top_delta(struct ploop *ploop)
+{
+	unsigned int i, nr_pages, nr = 0, end, *bat_entries, dst_cluster = 0;
+	struct rb_node *node;
+	struct md_page *md;
+
+	nr_pages = bat_clu_to_page_nr(ploop->nr_bat_entries - 1) + 1;
+
+	read_lock_irq(&ploop->bat_rwlock);
+	ploop_for_each_md_page(ploop, md, node) {
+		init_bat_entries_iter(ploop, md->id, &i, &end);
+		bat_entries = kmap_atomic(md->page);
+		for (; i <= end; i++) {
+			if (dst_cluster < bat_entries[i] &&
+			    md->bat_levels[i] == BAT_LEVEL_TOP)
+				dst_cluster = bat_entries[i];
+		}
+		kunmap_atomic(bat_entries);
+		nr++;
+	}
+	read_unlock_irq(&ploop->bat_rwlock);
+
+	BUG_ON(nr != nr_pages);
+	return dst_cluster;
+}
+
 static int ploop_tracking_cmd(struct ploop *ploop, const char *suffix,
 			      char *result, unsigned int maxlen)
 {
 	struct ploop_cmd cmd = { {0} };
 	void *tracking_bitmap = NULL;
-	unsigned int i, tb_nr, size;
+	unsigned int tb_nr, size;
 	int ret = 0;
 
 	if (ploop_is_ro(ploop))
@@ -1243,17 +1351,14 @@  static int ploop_tracking_cmd(struct ploop *ploop, const char *suffix,
 			return -EEXIST;
 		if (ploop->maintaince)
 			return -EBUSY;
-		tb_nr = ploop->hb_nr;
-		read_lock_irq(&ploop->bat_rwlock);
-		for (i = 0; i < ploop->nr_bat_entries; i++)
-			if (cluster_is_in_top_delta(ploop, i) &&
-			    ploop->bat_entries[i] >= tb_nr)
-				tb_nr = ploop->bat_entries[i] + 1;
-		read_unlock_irq(&ploop->bat_rwlock);
+		/* max_dst_cluster_in_top_delta() may be above hb_nr */
+		tb_nr = max_dst_cluster_in_top_delta(ploop) + 1;
+		if (tb_nr < ploop->hb_nr)
+			tb_nr = ploop->hb_nr;
 		/*
-		 * After unlock new entries above tb_nr can't
-		 * occur, since we always alloc clusters from
-		 * holes_bitmap (and they nr < hb_nr).
+		 * After max_dst_cluster_in_top_delta() unlocks the lock,
+		 * new entries above tb_nr can't occur, since we always
+		 * alloc clusters from holes_bitmap (and they nr < hb_nr).
 		 */
 		size = DIV_ROUND_UP(tb_nr, 8 * sizeof(unsigned long));
 		size *= sizeof(unsigned long);
diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index 07659fdf88f5..cd88127ca2a9 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -90,14 +90,6 @@  static void ploop_init_end_io(struct ploop *ploop, struct bio *bio)
 	__ploop_init_end_io(ploop, h);
 }
 
-static unsigned int bat_clu_to_page_nr(unsigned int cluster)
-{
-	unsigned int byte;
-
-	byte = (cluster + PLOOP_MAP_OFFSET) * sizeof(map_index_t);
-	return byte >> PAGE_SHIFT;
-}
-
 /* Get cluster related to bio sectors */
 static int ploop_bio_cluster(struct ploop *ploop, struct bio *bio,
 			     unsigned int *ret_cluster)
@@ -526,22 +518,39 @@  static void complete_cow(struct ploop_cow *cow, blk_status_t bi_status)
 	kmem_cache_free(cow_cache, cow);
 }
 
+static void ploop_release_cluster(struct ploop *ploop,
+				  unsigned int cluster)
+{
+	unsigned int id, *bat_entries, dst_cluster;
+	struct md_page *md;
+
+	lockdep_assert_held(&ploop->bat_rwlock);
+
+	id = bat_clu_to_page_nr(cluster);
+        md = md_page_find(ploop, id);
+        BUG_ON(!md);
+
+	cluster = bat_clu_idx_in_page(cluster); /* relative to page */
+
+	bat_entries = kmap_atomic(md->page);
+	dst_cluster = bat_entries[cluster];
+	bat_entries[cluster] = BAT_ENTRY_NONE;
+	md->bat_levels[cluster] = 0;
+	kunmap_atomic(bat_entries);
+
+	ploop_hole_set_bit(dst_cluster, ploop);
+}
+
 static void piwb_discard_completed(struct ploop *ploop, bool success,
 		  unsigned int cluster, unsigned int new_dst_cluster)
 {
-	unsigned int dst_cluster;
-
 	if (new_dst_cluster)
 		return;
 
 	if (cluster_is_in_top_delta(ploop, cluster)) {
 		WARN_ON_ONCE(ploop->nr_deltas);
-		if (success) {
-			dst_cluster = ploop->bat_entries[cluster];
-			ploop->bat_entries[cluster] = BAT_ENTRY_NONE;
-			ploop->bat_levels[cluster] = 0;
-			ploop_hole_set_bit(dst_cluster, ploop);
-		}
+		if (success)
+			ploop_release_cluster(ploop, cluster);
 	}
 }
 
@@ -555,10 +564,14 @@  static void ploop_advance_local_after_bat_wb(struct ploop *ploop,
 					     struct ploop_index_wb *piwb,
 					     bool success)
 {
+	struct md_page *md = md_page_find(ploop, piwb->page_nr);
+	unsigned int i, last, *bat_entries;
 	map_index_t *dst_cluster, off;
-	unsigned int i, last;
 	unsigned long flags;
 
+	BUG_ON(!md);
+	bat_entries = kmap_atomic(md->page);
+
 	/* Absolute number of first index in page (negative for page#0) */
 	off = piwb->page_nr * PAGE_SIZE / sizeof(map_index_t);
 	off -= PLOOP_MAP_OFFSET;
@@ -584,13 +597,13 @@  static void ploop_advance_local_after_bat_wb(struct ploop *ploop,
 			continue;
 
 		if (cluster_is_in_top_delta(ploop, i + off) && piwb->type == PIWB_TYPE_ALLOC) {
-			WARN_ON(ploop->bat_entries[i + off] != dst_cluster[i]);
+			WARN_ON(bat_entries[i] != dst_cluster[i]);
 			continue;
 		}
 
 		if (success) {
-			ploop->bat_entries[i + off] = dst_cluster[i];
-			ploop->bat_levels[i + off] = BAT_LEVEL_TOP;
+			bat_entries[i] = dst_cluster[i];
+			md->bat_levels[i] = BAT_LEVEL_TOP;
 		} else {
 			/*
 			 * Despite set_bit() is atomic, we take read_lock()
@@ -604,6 +617,7 @@  static void ploop_advance_local_after_bat_wb(struct ploop *ploop,
 
 	ploop_bat_unlock(ploop, success, flags);
 	kunmap_atomic(dst_cluster);
+	kunmap_atomic(bat_entries);
 }
 
 static void put_piwb(struct ploop_index_wb *piwb)
@@ -675,7 +689,8 @@  static void ploop_bat_write_complete(struct bio *bio)
 static int ploop_prepare_bat_update(struct ploop *ploop, unsigned int page_nr,
 				    struct ploop_index_wb *piwb)
 {
-	unsigned int i, off, last;
+	unsigned int i, off, last, *bat_entries;
+	struct md_page *md;
 	struct page *page;
 	struct bio *bio;
 	map_index_t *to;
@@ -691,9 +706,13 @@  static int ploop_prepare_bat_update(struct ploop *ploop, unsigned int page_nr,
 		return -ENOMEM;
 	}
 
+	md = md_page_find(ploop, page_nr);
+	BUG_ON(!md);
+	bat_entries = kmap_atomic(md->page);
+
 	piwb->page_nr = page_nr;
 	to = kmap_atomic(page);
-	memset((void *)to, 0, PAGE_SIZE);
+	memcpy((void *)to, bat_entries, PAGE_SIZE);
 
 	/* Absolute number of first index in page (negative for page#0) */
 	off = page_nr * PAGE_SIZE / sizeof(map_index_t);
@@ -704,19 +723,18 @@  static int ploop_prepare_bat_update(struct ploop *ploop, unsigned int page_nr,
 	if (last > PAGE_SIZE / sizeof(map_index_t))
 		last = PAGE_SIZE / sizeof(map_index_t);
 	i = 0;
-	if (!page_nr) {
+	if (!page_nr)
 		i = PLOOP_MAP_OFFSET;
-		memcpy(to, ploop->hdr, sizeof(*ploop->hdr));
-	}
 
 	/* Copy BAT (BAT goes right after hdr, see .ctr) */
 	for (; i < last; i++) {
-		if (!cluster_is_in_top_delta(ploop, i + off))
+		if (cluster_is_in_top_delta(ploop, i + off))
 			continue;
-		to[i] = ploop->bat_entries[i + off];
+		to[i] = 0;
 	}
 
 	kunmap_atomic(to);
+	kunmap_atomic(bat_entries);
 
 	sector = (page_nr * PAGE_SIZE) >> SECTOR_SHIFT;
 	bio->bi_iter.bi_sector = sector;
@@ -746,7 +764,7 @@  static void ploop_bat_page_zero_cluster(struct ploop *ploop,
 	map_index_t *to;
 
 	/* Cluster index related to the page[page_nr] start */
-	cluster -= piwb->page_nr * PAGE_SIZE / sizeof(map_index_t) - PLOOP_MAP_OFFSET;
+	cluster = bat_clu_idx_in_page(cluster);
 
 	to = kmap_atomic(piwb->bat_page);
 	to[cluster] = 0;
@@ -1270,8 +1288,9 @@  static bool locate_new_cluster_and_attach_bio(struct ploop *ploop,
 static int process_one_deferred_bio(struct ploop *ploop, struct bio *bio,
 				    struct ploop_index_wb *piwb)
 {
-	unsigned int cluster, dst_cluster, level;
 	sector_t sector = bio->bi_iter.bi_sector;
+	unsigned int cluster, dst_cluster;
+	u8 level;
 	bool ret;
 
 	/*
@@ -1281,8 +1300,7 @@  static int process_one_deferred_bio(struct ploop *ploop, struct bio *bio,
 	 * and wait synchronously from *this* kwork.
 	 */
 	cluster = sector >> ploop->cluster_log;
-	dst_cluster = ploop->bat_entries[cluster];
-	level = ploop->bat_levels[cluster];
+	dst_cluster = ploop_bat_entries(ploop, cluster, &level);
 
 	if (postpone_if_cluster_locked(ploop, bio, cluster))
 		goto out;
@@ -1618,7 +1636,7 @@  int ploop_map(struct dm_target *ti, struct bio *bio)
 
 		/* map it */
 		read_lock_irqsave(&ploop->bat_rwlock, flags);
-		dst_cluster = ploop->bat_entries[cluster];
+		dst_cluster = ploop_bat_entries(ploop, cluster, NULL);
 		in_top_delta = cluster_is_in_top_delta(ploop, cluster);
 		if (unlikely(should_defer_bio(ploop, bio, cluster))) {
 			/* defer all bios */
diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
index d36906793b9f..986617dc1eb8 100644
--- a/drivers/md/dm-ploop-target.c
+++ b/drivers/md/dm-ploop-target.c
@@ -1,6 +1,7 @@ 
 // SPDX-License-Identifier: GPL-2.0-only
 #include "dm.h"
 #include <linux/buffer_head.h>
+#include <linux/rbtree.h>
 #include <linux/dm-io.h>
 #include <linux/dm-kcopyd.h>
 #include <linux/init.h>
@@ -29,6 +30,18 @@  static void inflight_bios_ref_exit1(struct percpu_ref *ref)
 	complete(&ploop->inflight_bios_ref_comp);
 }
 
+void free_md_pages_tree(struct rb_root *root)
+{
+	struct rb_node *node;
+	struct md_page *md;
+
+	while ((node = root->rb_node) != NULL) {
+		md = rb_entry(node, struct md_page, node);
+		rb_erase(node, root);
+		free_md_page(md);
+	}
+}
+
 /* This is called on final device destroy */
 static void ploop_flush_workqueue(struct ploop *ploop)
 {
@@ -71,10 +84,9 @@  static void ploop_destroy(struct ploop *ploop)
 	WARN_ON(!RB_EMPTY_ROOT(&ploop->exclusive_bios_rbtree));
 	WARN_ON(!RB_EMPTY_ROOT(&ploop->inflight_bios_rbtree));
 	kfree(ploop->deltas);
-	kvfree(ploop->bat_levels);
 	kvfree(ploop->holes_bitmap);
 	kvfree(ploop->tracking_bitmap);
-	vfree(ploop->hdr);
+	free_md_pages_tree(&ploop->bat_entries);
 	kfree(ploop);
 }
 
@@ -125,6 +137,7 @@  static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 	INIT_LIST_HEAD(&ploop->cluster_lk_list);
 	bio_list_init(&ploop->delta_cow_action_list);
 	atomic_set(&ploop->nr_discard_bios, 0);
+	ploop->bat_entries = RB_ROOT;
 
 	INIT_WORK(&ploop->worker, do_ploop_work);
 	init_completion(&ploop->inflight_bios_ref_comp);
@@ -183,7 +196,7 @@  static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 	ploop->inflight_bios_rbtree = RB_ROOT;
 	ret = -EINVAL;
 	for (i = 2; i < argc; i++) {
-                ret = ploop_add_delta(ploop, argv[i]);
+		ret = ploop_add_delta(ploop, argv[i]);
 		if (ret < 0)
 			goto err;
 	}
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index 52b66440f066..5e0c88249dcd 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -55,8 +55,7 @@  struct ploop_cmd {
 		struct {
 			u64 new_size;
 			/* Preallocated data */
-			void *hdr; /* hdr and bat_entries consequentially */
-			void *bat_levels;
+			struct rb_root md_pages_root;
 			void *holes_bitmap;
 #define PLOOP_GROW_STAGE_INITIAL	0
 			unsigned int stage;
@@ -157,13 +156,19 @@  struct push_backup {
 	struct list_head pending;
 };
 
+/* Metadata page */
+struct md_page {
+	struct rb_node node;
+	unsigned int id; /* Number of this page starting from hdr */
+	struct page *page;
+	u8 *bat_levels;
+};
+
 struct ploop {
 	struct dm_target *ti;
 
 	struct dm_dev *origin_dev;
-	struct ploop_pvd_header *hdr;
-	unsigned int *bat_entries;
-	u8 *bat_levels;
+	struct rb_root bat_entries;
 	struct ploop_delta *deltas;
 	u8 nr_deltas;
 	unsigned int nr_bat_entries;
@@ -278,6 +283,13 @@  struct ploop_cow {
 extern struct kmem_cache *piocb_cache;
 extern struct kmem_cache *cow_cache;
 
+#define ploop_for_each_md_page(ploop, md, node)		\
+	for (node = rb_first(&ploop->bat_entries),	\
+	     md = rb_entry(node, struct md_page, node); \
+	     node != NULL;				\
+	     node = rb_next(node),			\
+	     md = rb_entry(node, struct md_page, node))
+
 static inline bool ploop_is_ro(struct ploop *ploop)
 {
 	return (dm_table_get_mode(ploop->ti->table) & FMODE_WRITE) == 0;
@@ -335,17 +347,108 @@  static inline unsigned int ploop_nr_bat_clusters(struct ploop *ploop,
 	return bat_clusters;
 }
 
+static inline unsigned int bat_clu_to_page_nr(unsigned int cluster)
+{
+	unsigned int byte;
+
+	byte = (cluster + PLOOP_MAP_OFFSET) * sizeof(map_index_t);
+	return byte >> PAGE_SHIFT;
+}
+
+static inline unsigned int bat_clu_idx_in_page(unsigned int cluster)
+{
+	return (cluster + PLOOP_MAP_OFFSET) % (PAGE_SIZE / sizeof(map_index_t));
+}
+
+static inline unsigned int page_clu_idx_to_bat_clu(unsigned int page_id,
+						   unsigned int cluster_rel)
+{
+	unsigned int off;
+	off = page_id * PAGE_SIZE / sizeof(map_index_t) - PLOOP_MAP_OFFSET;
+	return off + cluster_rel;
+}
+
+extern struct md_page * md_page_find(struct ploop *ploop, unsigned int id);
+
+/*
+ * This should be called in very rare cases. Avoid this function
+ * in cycles by cluster, use ploop_for_each_md_page()-based
+ * iterations instead.
+ */
+static inline unsigned int ploop_bat_entries(struct ploop *ploop,
+					     unsigned int cluster,
+					     u8 *bat_level)
+{
+	unsigned int *bat_entries, dst_cluster, id;
+	struct md_page *md;
+
+	id = bat_clu_to_page_nr(cluster);
+	md = md_page_find(ploop, id);
+	BUG_ON(!md);
+
+	/* Cluster index related to the page[page_nr] start */
+	cluster = bat_clu_idx_in_page(cluster);
+
+	if (bat_level)
+		*bat_level = md->bat_levels[cluster];
+
+	bat_entries = kmap_atomic(md->page);
+	dst_cluster = bat_entries[cluster];
+	kunmap_atomic(bat_entries);
+	return dst_cluster;
+}
+
 static inline bool cluster_is_in_top_delta(struct ploop *ploop,
 					   unsigned int cluster)
 {
+	unsigned int dst_cluster;
+	u8 level;
+
 	if (WARN_ON(cluster >= ploop->nr_bat_entries))
 		return false;
-	if (ploop->bat_entries[cluster] == BAT_ENTRY_NONE ||
-	    ploop->bat_levels[cluster] < BAT_LEVEL_TOP)
+	dst_cluster = ploop_bat_entries(ploop, cluster, &level);
+
+	if (dst_cluster == BAT_ENTRY_NONE || level < BAT_LEVEL_TOP)
 		return false;
 	return true;
 }
 
+static inline bool md_page_cluster_is_in_top_delta(struct md_page *md,
+						   unsigned int cluster)
+{
+	unsigned int count, *bat_entries;
+	bool ret = true;
+
+	count = PAGE_SIZE / sizeof(map_index_t);
+	if ((cluster + 1) * sizeof(u8) > ksize(md->bat_levels) ||
+	    cluster >= count) {
+		WARN_ONCE(1, "cluster=%u count=%u\n", cluster, count);
+		return false;
+	}
+
+	bat_entries = kmap_atomic(md->page);
+	if (bat_entries[cluster] == BAT_ENTRY_NONE ||
+	    md->bat_levels[cluster] < BAT_LEVEL_TOP)
+		ret = false;
+	kunmap_atomic(bat_entries);
+	return ret;
+}
+
+static inline void init_bat_entries_iter(struct ploop *ploop, unsigned int page_id,
+					 unsigned int *start, unsigned int *end)
+{
+	unsigned int last_page = bat_clu_to_page_nr(ploop->nr_bat_entries - 1);
+	unsigned int count = PAGE_SIZE / sizeof(map_index_t);
+
+	*start = 0;
+	if (page_id == 0)
+		*start = PLOOP_MAP_OFFSET;
+
+	*end = count - 1;
+	if (page_id == last_page)
+		*end = ((ploop->nr_bat_entries + PLOOP_MAP_OFFSET) % count) - 1;
+}
+
 static inline void force_defer_bio_count_inc(struct ploop *ploop)
 {
 	unsigned long flags;
@@ -382,6 +485,13 @@  static inline struct dm_ploop_endio_hook *find_endio_hook(struct ploop *ploop,
 	return find_endio_hook_range(ploop, root, cluster, cluster);
 }
 
+extern struct md_page * alloc_md_page(unsigned int id);
+extern void md_page_insert(struct ploop *ploop, struct md_page *md);
+extern void free_md_page(struct md_page *md);
+extern void free_md_pages_tree(struct rb_root *root);
+extern bool try_update_bat_entry(struct ploop *ploop, unsigned int cluster,
+				 u8 level, unsigned int dst_cluster);
+
 extern int ploop_add_delta(struct ploop *ploop, const char *arg);
 extern void defer_bio(struct ploop *ploop, struct bio *bio);
 extern void defer_bio_list(struct ploop *ploop, struct bio_list *bio_list);