[RHEL7,COMMIT] fs/fuse kio_pcs: fix grow work execution with zero

Submitted by Konstantin Khorenko on July 4, 2018, 3:06 p.m.

Details

Message ID 201807041506.w64F6oEG008936@finist_ce7.work
State New
Series "fs/fuse kio_pcs: fix grow work execution with zero"
Headers show

Commit Message

Konstantin Khorenko July 4, 2018, 3:06 p.m.
The commit is pushed to "branch-rh7-3.10.0-862.3.2.vz7.61.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-862.3.2.vz7.61.12
------>
commit dd1f51ac2d6ff7ca469cea3ab73df2addd55ee97
Author: Pavel Butsykin <pbutsykin@virtuozzo.com>
Date:   Wed Jul 4 18:06:50 2018 +0300

    fs/fuse kio_pcs: fix grow work execution with zero
    
    The bug is that grow work can be added twice to queue with the same list
    of requests. First execution of fuse_size_grow_work() should clear
    size.grow_queue, and the second execution will be with the empty list.
    As a result, submit_size_grow() will be executed with szie == 0.
    
    How it's possible:
    cpu0                                            cpu1
    fuse_size_grow_work()
      spin_lock(&di->lock)
      ...
      di->size.required = size; <-- 1
      spin_unlock(&di->lock)
      submit_size_grow(inode, size == 1)
      ...
                                                    spin_lock(&di->lock)
                                                    ...
                                                    wait_grow()
                                                      list_add_tail(req with size = 2, &di->size.grow.queue);
                                                      if (!di->size.required) /* required == 1 */
                                                        queue_work(pcs_wq, &di->size.work);
                                                    spin_unlock(&di->lock)
                                                    ...
      ...
      spin_lock(&di->lock)
      ...
      di->size.required = 0;
      if (!list_empty(&di->size.grow_queue))
            queue_work(pcs_wq, &di->size.work); <-- queue for req with size = 2
      spin_unlock(&di->lock);
      ...
                                                    spin_lock(&di->lock)
                                                    ...
    fuse_size_grow_work()
      /* wait for di->lock */                       wait_grow()
                                                    list_add_tail(req with size = 3, &di->size.grow.queue);
                                                    if (!di->size.required) /* required == 0 */
                                                      queue_work(pcs_wq, &di->size.work); <-- queue for req with size = 2, 3
                                                    spin_unlock(&di->lock)
      spin_lock(&di->lock)
      ...
      di->size.required = size; <-- 3
      spin_unlock(&di->lock)
      submit_size_grow(inode, size == 3)
      ...
      spin_lock(&di->lock)
      ...
      di->size.required = 0;
      if (!list_empty(&di->size.grow_queue) /* grow_queue is empty now */
        queue_work(pcs_wq, &di->size.work);
      spin_unlock(&di->lock);
      ...
    
    fuse_size_grow_work()
      spin_lock(&di->lock)
      /* grow_queue is still empty */
      ...
      di->size.required = size; <-- 0
      spin_unlock(&di->lock)
      submit_size_grow(inode, size == 0) --> shrink
    
    This patch simplifies interaction with pcs_dentry_info.size struct and fixes
    the issue with execution of the grow work when the queue is empty.
    
    It's not necessary to have two queues for grow and shrink operations, when these
    operations can't be executed in parallel. Therefore, such a queue will be one,
    and instead of size.shrink will be size.op to distinguish the operations among
    themselves.
    
    https://jira.sw.ru/browse/PSBM-85945
    
    Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
    Acked-by: Alexey Kuznetsov <kuznet@virtuozzo.com>
---
 fs/fuse/kio/pcs/pcs_client_types.h |  12 +++--
 fs/fuse/kio/pcs/pcs_cluster.h      |   7 +--
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c | 106 +++++++++++++++++--------------------
 3 files changed, 59 insertions(+), 66 deletions(-)

Patch hide | download patch | download mbox

diff --git a/fs/fuse/kio/pcs/pcs_client_types.h b/fs/fuse/kio/pcs/pcs_client_types.h
index a060ed58f87d..089817b4af9c 100644
--- a/fs/fuse/kio/pcs/pcs_client_types.h
+++ b/fs/fuse/kio/pcs/pcs_client_types.h
@@ -46,6 +46,13 @@  struct pcs_mapping {
 	struct pcs_flow_table	ftab;
 };
 
+
+typedef enum {
+	PCS_SIZE_INACTION,
+	PCS_SIZE_GROW,
+	PCS_SIZE_SHRINK,
+} size_op_t;
+
 struct fuse_inode;
 struct pcs_dentry_info {
 	struct pcs_dentry_id	id;
@@ -56,10 +63,9 @@  struct pcs_dentry_info {
 	spinlock_t		lock;
 	struct {
 		struct work_struct	work;
-		unsigned long long	shrink;
+		struct list_head	queue;
 		unsigned long long	required;
-		struct list_head	grow_queue;
-		struct list_head	shrink_queue;
+		size_op_t op;
 	} size;
 	struct fuse_inode	*inode;
 };
diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index 0f13345f919f..191753ba316d 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -8,12 +8,7 @@  struct fuse_conn;
 /* Try to follows pcs/client/fused structure style */
 struct pcs_fuse_exec_ctx {
 	struct pcs_int_request	ireq;
-	/* The file size control block */
-	struct {
-		unsigned long long	required;
-		unsigned char		granted;
-		unsigned char		waiting;
-	} size;
+	unsigned long long	size_required;
 	struct {
 		pcs_api_iorequest_t	req;
 		struct bio_vec		*bvec;
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 60401252a3b0..c9a291391405 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -283,8 +283,9 @@  static int kpcs_do_file_open(struct fuse_conn *fc, struct file *file, struct ino
 	/* di.id.name.len   = id->name.len; */
 
 	spin_lock_init(&di->lock);
-	INIT_LIST_HEAD(&di->size.grow_queue);
-	INIT_LIST_HEAD(&di->size.shrink_queue);
+	INIT_LIST_HEAD(&di->size.queue);
+	di->size.required = 0;
+	di->size.op = PCS_SIZE_INACTION;
 	INIT_WORK(&di->size.work, fuse_size_grow_work);
 
 	pcs_mapping_init(&pfc->cc, &di->mapping);
@@ -341,8 +342,7 @@  void kpcs_inode_release(struct fuse_inode *fi)
 	if(!di)
 		return;
 
-	BUG_ON(!list_empty(&di->size.grow_queue));
-	BUG_ON(!list_empty(&di->size.shrink_queue));
+	BUG_ON(!list_empty(&di->size.queue));
 	pcs_mapping_invalidate(&di->mapping);
 	pcs_mapping_deinit(&di->mapping);
 	/* TODO: properly destroy dentry info here!! */
@@ -658,92 +658,88 @@  static void fuse_size_grow_work(struct work_struct *w)
 	struct pcs_dentry_info* di = container_of(w, struct pcs_dentry_info, size.work);
 	struct inode *inode = &di->inode->inode;
 	struct pcs_int_request* ireq, *next;
-	unsigned long long size = 0;
+	unsigned long long size;
 	int err;
-	LIST_HEAD(to_submit);
+	LIST_HEAD(pending_reqs);
 
 	spin_lock(&di->lock);
-	BUG_ON(di->size.shrink);
-	if (di->size.required) {
+	BUG_ON(di->size.op != PCS_SIZE_INACTION);
+
+	size = di->size.required;
+	if (!size) {
+		BUG_ON(!list_empty(&di->size.queue));
 		spin_unlock(&di->lock);
+		TRACE("No more pending writes\n");
 		return;
 	}
-	list_for_each_entry(ireq, &di->size.grow_queue, list) {
-		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
-
-		TRACE("ino:%ld r(%p)->size:%lld	 required:%lld\n",inode->i_ino, r, r->exec.size.required, size);
+	BUG_ON(di->fileinfo.attr.size >= size);
 
-		BUG_ON(!r->exec.size.required);
-		if (size < r->exec.size.required)
-			size = r->exec.size.required;
-	}
-	di->size.required = size;
+	list_splice_tail_init(&di->size.queue, &pending_reqs);
+	di->size.op = PCS_SIZE_GROW;
 	spin_unlock(&di->lock);
 
 	err = submit_size_grow(inode, size);
 	if (err) {
-		LIST_HEAD(to_fail);
-
 		spin_lock(&di->lock);
+		di->size.op = PCS_SIZE_INACTION;
+		list_splice_tail_init(&di->size.queue, &pending_reqs);
 		di->size.required = 0;
-		list_splice_tail_init(&di->size.grow_queue, &to_fail);
 		spin_unlock(&di->lock);
 
-		pcs_ireq_queue_fail(&to_fail, err);
+		pcs_ireq_queue_fail(&pending_reqs, err);
 		return;
 	}
 
 	spin_lock(&di->lock);
-	BUG_ON(di->size.shrink);
-	BUG_ON(di->size.required != size);
-	list_for_each_entry_safe(ireq, next, &di->size.grow_queue, list) {
-		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
+	BUG_ON(di->size.required < size);
+	di->size.op = PCS_SIZE_INACTION;
 
-		BUG_ON(!r->exec.size.required);
-		BUG_ON(!r->exec.size.waiting);
-		BUG_ON(r->exec.size.granted);
-		if (size >= r->exec.size.required) {
-			TRACE("resubmit ino:%ld r(%p)->size:%lld  required:%lld\n",inode->i_ino, r, r->exec.size.required, size);
+	list_for_each_entry_safe(ireq, next, &di->size.queue, list) {
+		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
 
-			r->exec.size.waiting = 0;
-			r->exec.size.granted = 1;
-			list_move(&ireq->list, &to_submit);
+		BUG_ON(!r->exec.size_required);
+		if (size >= r->exec.size_required) {
+			TRACE("resubmit ino:%ld r(%p)->size:%lld required:%lld\n",
+				inode->i_ino, r, r->exec.size_required, size);
+			list_move(&ireq->list, &pending_reqs);
 		}
 	}
-	di->size.required = 0;
-	if (!list_empty(&di->size.grow_queue))
-		queue_work(pcs_wq, &di->size.work);
+
+	if (list_empty(&di->size.queue))
+		di->size.required = 0;
 	spin_unlock(&di->lock);
 
-	pcs_cc_requeue(di->cluster, &to_submit);
+	pcs_cc_requeue(di->cluster, &pending_reqs);
 }
 
 static void wait_grow(struct pcs_fuse_req *r, struct pcs_dentry_info *di, unsigned long long required)
 {
 	assert_spin_locked(&di->lock);
-	BUG_ON(r->exec.size.waiting);
+	BUG_ON(r->exec.size_required);
 	BUG_ON(r->req.in.h.opcode != FUSE_WRITE && r->req.in.h.opcode != FUSE_FALLOCATE);
+	BUG_ON(di->size.op != PCS_SIZE_INACTION && di->size.op != PCS_SIZE_GROW);
 
 	TRACE("insert ino:%ld->required:%lld r(%p)->required:%lld\n", r->req.io_inode->i_ino,
 	      di->size.required, r, required);
-	r->exec.size.required = required;
-	r->exec.size.waiting = 1;
-	list_add_tail(&r->exec.ireq.list, &di->size.grow_queue);
+	r->exec.size_required = required;
 
-	if (!di->size.required)
+	if (list_empty(&di->size.queue))
 		queue_work(pcs_wq, &di->size.work);
+
+	list_add_tail(&r->exec.ireq.list, &di->size.queue);
+
+	di->size.required = max(di->size.required, required);
 }
 
 static void wait_shrink(struct pcs_fuse_req *r, struct pcs_dentry_info *di)
 {
 	assert_spin_locked(&di->lock);
-	BUG_ON(r->exec.size.waiting);
+	BUG_ON(r->exec.size_required);
 	/* Writes already blocked via fuse_set_nowrite */
 	BUG_ON(r->req.in.h.opcode != FUSE_READ);
 
 	TRACE("insert ino:%ld r:%p\n", r->req.io_inode->i_ino, r);
-	r->exec.size.waiting = 1;
-	list_add_tail(&r->exec.ireq.list, &di->size.shrink_queue);
+	list_add_tail(&r->exec.ireq.list, &di->size.queue);
 }
 
 /*
@@ -761,7 +757,7 @@  static int pcs_fuse_prep_rw(struct pcs_fuse_req *r)
 
 	spin_lock(&di->lock);
 	/* Deffer all requests if shrink requested to prevent livelock */
-	if (di->size.shrink) {
+	if (di->size.op == PCS_SIZE_SHRINK) {
 		wait_shrink(r, di);
 		spin_unlock(&di->lock);
 		return 1;
@@ -968,25 +964,22 @@  static void _pcs_shrink_end(struct fuse_conn *fc, struct fuse_req *req)
 	kpcs_setattr_end(fc, req);
 
 	spin_lock(&di->lock);
-	BUG_ON(!di->size.shrink);
+	BUG_ON(di->size.op != PCS_SIZE_SHRINK);
 	BUG_ON(di->size.required);
-	BUG_ON(!list_empty(&di->size.grow_queue));
 
-	list_splice_init(&di->size.shrink_queue, &dispose);
-	di->size.shrink = 0;
+	list_splice_init(&di->size.queue, &dispose);
+	di->size.op = PCS_SIZE_INACTION;
 	spin_unlock(&di->lock);
 
 	while (!list_empty(&dispose)) {
 		struct pcs_int_request* ireq = list_first_entry(&dispose, struct pcs_int_request, list);
 		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
 
-		BUG_ON(!r->exec.size.waiting);
-		BUG_ON(r->exec.size.granted);
+		BUG_ON(r->exec.size_required);
 		BUG_ON(r->req.in.h.opcode != FUSE_READ);
 
 		TRACE("resubmit %p\n", &r->req);
 		list_del_init(&ireq->list);
-		r->exec.size.waiting = 0;
 		pcs_fuse_submit(pfc, &r->req, 1);
 	}
 }
@@ -1030,7 +1023,6 @@  static int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bo
 		struct pcs_fuse_req *r = pcs_req_from_fuse(req);
 		struct fuse_setattr_in *inarg = (void*) req->in.args[0].value;
 		struct pcs_dentry_info *di;
-		int shrink = 0;
 
 		if (!(inarg->valid & FATTR_SIZE))
 			return 1;
@@ -1038,12 +1030,12 @@  static int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bo
 		di = pcs_inode_from_fuse(fi);
 		spin_lock(&di->lock);
 		if (inarg->size < di->fileinfo.attr.size) {
-			BUG_ON(di->size.shrink);
-			di->size.shrink = shrink = 1;
+			BUG_ON(di->size.op != PCS_SIZE_INACTION);
+			di->size.op = PCS_SIZE_SHRINK;
 		}
 		spin_unlock(&di->lock);
 		r->end = req->end;
-		if (shrink) {
+		if (di->size.op == PCS_SIZE_SHRINK) {
 			BUG_ON(!mutex_is_locked(&req->io_inode->i_mutex));
 			/* wait for aio reads in flight */
 			inode_dio_wait(req->io_inode);