fuse kio: Add comment about why we need to wait pending read requests

Submitted by Kirill Tkhai on Oct. 23, 2018, 10:23 a.m.

Details

Message ID adcf5f07-7b08-291c-e816-16d7b057e683@virtuozzo.com
State New
Series "fuse kio: Add comment about why we need to wait pending read requests"
Headers show

Commit Message

Kirill Tkhai Oct. 23, 2018, 10:23 a.m.
On 23.10.2018 13:19, Kirill Tkhai wrote:
> On 23.10.2018 13:15, Pavel Butsykin wrote:
>>
>>
>> On 23.10.2018 13:01, Kirill Tkhai wrote:
>>> Signed-off-by: Kirill Tkhai <ktkhai@virtuozzo.com>
>>> ---
>>>   fs/fuse/kio/pcs/pcs_fuse_kdirect.c |    7 ++++++-
>>>   1 file changed, 6 insertions(+), 1 deletion(-)
>>>
>>> diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
>>> index 3940d6c255ba..9b45fcbf8941 100644
>>> --- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
>>> +++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
>>> @@ -1081,7 +1081,12 @@ static void pcs_kio_setattr_handle(struct fuse_inode *fi, struct fuse_req *req)
>>>   	r->end = req->end;
>>>   	if (di->size.op == PCS_SIZE_SHRINK) {
>>>   		BUG_ON(!mutex_is_locked(&req->io_inode->i_mutex));
>>> -		/* wait for aio reads in flight */
>>> +		/*
>>> +		 * Wait for already submitted aio reads. Further reads
>>> +		 * (including already queued to bg_queue) will be stopped
>>> +		 * by wait_shrink(), and they will be processed from
>>> +		 * _pcs_shrink_end().
>>> +		 */
>>
>> Why do we have to wait already submitted aio reads ?
> 
> Because there could be grow requests. This place is broken, I'm fixing that at the moment.
> One-dimensional size.op is not enough to drive race between grow and shrink, we need to
> convert PCS_SIZE_* into bit fields like it used to be before.

We need something like the below (WIP):

Patch hide | download patch | download mbox

diff --git a/fs/fuse/kio/pcs/pcs_client_types.h b/fs/fuse/kio/pcs/pcs_client_types.h
index 9ddce5cff3f5..3ab19348c0f9 100644
--- a/fs/fuse/kio/pcs/pcs_client_types.h
+++ b/fs/fuse/kio/pcs/pcs_client_types.h
@@ -47,11 +47,9 @@  struct pcs_mapping {
 };
 
 
-typedef enum {
-	PCS_SIZE_INACTION,
-	PCS_SIZE_GROW,
-	PCS_SIZE_SHRINK,
-} size_op_t;
+#define PCS_SIZE_INACTION 	0
+#define PCS_SIZE_GROW		(1 << 0)
+#define PCS_SIZE_SHRINK		(1 << 1)
 
 struct fuse_inode;
 struct pcs_dentry_info {
@@ -65,7 +63,7 @@  struct pcs_dentry_info {
 		struct work_struct	work;
 		struct list_head	queue;
 		unsigned long long	required;
-		size_op_t op;
+		u16 op;
 	} size;
 	struct fuse_inode	*inode;
 };
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 9b45fcbf8941..46ca1a892dcb 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -726,7 +726,7 @@  static void fuse_size_grow_work(struct work_struct *w)
 	LIST_HEAD(pending_reqs);
 
 	spin_lock(&di->lock);
-	BUG_ON(di->size.op != PCS_SIZE_INACTION);
+	BUG_ON(di->size.op & PCS_SIZE_GROW);
 
 	old_size = DENTRY_SIZE(di);
 	size = di->size.required;
@@ -739,13 +739,13 @@  static void fuse_size_grow_work(struct work_struct *w)
 	BUG_ON(old_size >= size);
 
 	list_splice_tail_init(&di->size.queue, &pending_reqs);
-	di->size.op = PCS_SIZE_GROW;
+	di->size.op |= PCS_SIZE_GROW;
 	spin_unlock(&di->lock);
 
 	err = submit_size_grow(inode, size);
 	if (err) {
 		spin_lock(&di->lock);
-		di->size.op = PCS_SIZE_INACTION;
+		di->size.op &= ~PCS_SIZE_GROW;
 		list_splice_tail_init(&di->size.queue, &pending_reqs);
 		di->size.required = 0;
 		spin_unlock(&di->lock);
@@ -756,7 +756,7 @@  static void fuse_size_grow_work(struct work_struct *w)
 
 	spin_lock(&di->lock);
 	BUG_ON(di->size.required < size);
-	di->size.op = PCS_SIZE_INACTION;
+	di->size.op &= PCS_SIZE_GROW;
 
 	list_for_each_entry_safe(ireq, next, &di->size.queue, list) {
 		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
@@ -792,7 +792,7 @@  static void wait_grow(struct pcs_fuse_req *r, struct pcs_dentry_info *di, unsign
 	assert_spin_locked(&di->lock);
 	BUG_ON(r->exec.size_required);
 	BUG_ON(r->req.in.h.opcode != FUSE_WRITE && r->req.in.h.opcode != FUSE_FALLOCATE);
-	BUG_ON(di->size.op != PCS_SIZE_INACTION && di->size.op != PCS_SIZE_GROW);
+	BUG_ON(di->size.op != PCS_SIZE_INACTION && !(di->size.op & PCS_SIZE_GROW));
 
 	TRACE("insert ino:%ld->required:%lld r(%p)->required:%lld\n", r->req.io_inode->i_ino,
 	      di->size.required, r, required);
@@ -1073,13 +1073,13 @@  static void pcs_kio_setattr_handle(struct fuse_inode *fi, struct fuse_req *req)
 	di = pcs_inode_from_fuse(fi);
 	spin_lock(&di->lock);
 	if (inarg->size < di->fileinfo.attr.size) {
-		BUG_ON(di->size.op != PCS_SIZE_INACTION);
-		di->size.op = PCS_SIZE_SHRINK;
+		BUG_ON(di->size.op & PCS_SIZE_SHRINK);
+		di->size.op |= PCS_SIZE_SHRINK;
 	}
 	spin_unlock(&di->lock);
 
 	r->end = req->end;
-	if (di->size.op == PCS_SIZE_SHRINK) {
+	if (di->size.op & PCS_SIZE_SHRINK) {
 		BUG_ON(!mutex_is_locked(&req->io_inode->i_mutex));
 		/*
 		 * Wait for already submitted aio reads. Further reads
@@ -1088,6 +1088,7 @@  static void pcs_kio_setattr_handle(struct fuse_inode *fi, struct fuse_req *req)
 		 * _pcs_shrink_end().
 		 */
 		inode_dio_wait(req->io_inode);
+		BUG_ON(di->size.op != PCS_SIZE_SHRINK);
 		/*
 		 * Writebackcache was flushed already so it is safe to
 		 * drop pcs_mapping

Comments

Kirill Tkhai Oct. 23, 2018, 10:24 a.m.
On 23.10.2018 13:23, Kirill Tkhai wrote:
> On 23.10.2018 13:19, Kirill Tkhai wrote:
>> On 23.10.2018 13:15, Pavel Butsykin wrote:
>>>
>>>
>>> On 23.10.2018 13:01, Kirill Tkhai wrote:
>>>> Signed-off-by: Kirill Tkhai <ktkhai@virtuozzo.com>
>>>> ---
>>>>   fs/fuse/kio/pcs/pcs_fuse_kdirect.c |    7 ++++++-
>>>>   1 file changed, 6 insertions(+), 1 deletion(-)
>>>>
>>>> diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
>>>> index 3940d6c255ba..9b45fcbf8941 100644
>>>> --- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
>>>> +++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
>>>> @@ -1081,7 +1081,12 @@ static void pcs_kio_setattr_handle(struct fuse_inode *fi, struct fuse_req *req)
>>>>   	r->end = req->end;
>>>>   	if (di->size.op == PCS_SIZE_SHRINK) {
>>>>   		BUG_ON(!mutex_is_locked(&req->io_inode->i_mutex));
>>>> -		/* wait for aio reads in flight */
>>>> +		/*
>>>> +		 * Wait for already submitted aio reads. Further reads
>>>> +		 * (including already queued to bg_queue) will be stopped
>>>> +		 * by wait_shrink(), and they will be processed from
>>>> +		 * _pcs_shrink_end().
>>>> +		 */
>>>
>>> Why do we have to wait already submitted aio reads ?
>>
>> Because there could be grow requests. This place is broken, I'm fixing that at the moment.
>> One-dimensional size.op is not enough to drive race between grow and shrink, we need to
>> convert PCS_SIZE_* into bit fields like it used to be before.
> 
> We need something like the below (WIP):
> 
> diff --git a/fs/fuse/kio/pcs/pcs_client_types.h b/fs/fuse/kio/pcs/pcs_client_types.h
> index 9ddce5cff3f5..3ab19348c0f9 100644
> --- a/fs/fuse/kio/pcs/pcs_client_types.h
> +++ b/fs/fuse/kio/pcs/pcs_client_types.h
> @@ -47,11 +47,9 @@ struct pcs_mapping {
>  };
>  
>  
> -typedef enum {
> -	PCS_SIZE_INACTION,
> -	PCS_SIZE_GROW,
> -	PCS_SIZE_SHRINK,
> -} size_op_t;
> +#define PCS_SIZE_INACTION 	0
> +#define PCS_SIZE_GROW		(1 << 0)
> +#define PCS_SIZE_SHRINK		(1 << 1)
>  
>  struct fuse_inode;
>  struct pcs_dentry_info {
> @@ -65,7 +63,7 @@ struct pcs_dentry_info {
>  		struct work_struct	work;
>  		struct list_head	queue;
>  		unsigned long long	required;
> -		size_op_t op;
> +		u16 op;
>  	} size;
>  	struct fuse_inode	*inode;
>  };
> diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
> index 9b45fcbf8941..46ca1a892dcb 100644
> --- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
> +++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
> @@ -726,7 +726,7 @@ static void fuse_size_grow_work(struct work_struct *w)
>  	LIST_HEAD(pending_reqs);
>  
>  	spin_lock(&di->lock);
> -	BUG_ON(di->size.op != PCS_SIZE_INACTION);
> +	BUG_ON(di->size.op & PCS_SIZE_GROW);
>  
>  	old_size = DENTRY_SIZE(di);
>  	size = di->size.required;
> @@ -739,13 +739,13 @@ static void fuse_size_grow_work(struct work_struct *w)
>  	BUG_ON(old_size >= size);
>  
>  	list_splice_tail_init(&di->size.queue, &pending_reqs);
> -	di->size.op = PCS_SIZE_GROW;
> +	di->size.op |= PCS_SIZE_GROW;
>  	spin_unlock(&di->lock);
>  
>  	err = submit_size_grow(inode, size);
>  	if (err) {
>  		spin_lock(&di->lock);
> -		di->size.op = PCS_SIZE_INACTION;
> +		di->size.op &= ~PCS_SIZE_GROW;
>  		list_splice_tail_init(&di->size.queue, &pending_reqs);
>  		di->size.required = 0;
>  		spin_unlock(&di->lock);
> @@ -756,7 +756,7 @@ static void fuse_size_grow_work(struct work_struct *w)
>  
>  	spin_lock(&di->lock);
>  	BUG_ON(di->size.required < size);
> -	di->size.op = PCS_SIZE_INACTION;
> +	di->size.op &= PCS_SIZE_GROW;

Here must be di->size.op &= ~PCS_SIZE_GROW;

>  
>  	list_for_each_entry_safe(ireq, next, &di->size.queue, list) {
>  		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
> @@ -792,7 +792,7 @@ static void wait_grow(struct pcs_fuse_req *r, struct pcs_dentry_info *di, unsign
>  	assert_spin_locked(&di->lock);
>  	BUG_ON(r->exec.size_required);
>  	BUG_ON(r->req.in.h.opcode != FUSE_WRITE && r->req.in.h.opcode != FUSE_FALLOCATE);
> -	BUG_ON(di->size.op != PCS_SIZE_INACTION && di->size.op != PCS_SIZE_GROW);
> +	BUG_ON(di->size.op != PCS_SIZE_INACTION && !(di->size.op & PCS_SIZE_GROW));
>  
>  	TRACE("insert ino:%ld->required:%lld r(%p)->required:%lld\n", r->req.io_inode->i_ino,
>  	      di->size.required, r, required);
> @@ -1073,13 +1073,13 @@ static void pcs_kio_setattr_handle(struct fuse_inode *fi, struct fuse_req *req)
>  	di = pcs_inode_from_fuse(fi);
>  	spin_lock(&di->lock);
>  	if (inarg->size < di->fileinfo.attr.size) {
> -		BUG_ON(di->size.op != PCS_SIZE_INACTION);
> -		di->size.op = PCS_SIZE_SHRINK;
> +		BUG_ON(di->size.op & PCS_SIZE_SHRINK);
> +		di->size.op |= PCS_SIZE_SHRINK;
>  	}
>  	spin_unlock(&di->lock);
>  
>  	r->end = req->end;
> -	if (di->size.op == PCS_SIZE_SHRINK) {
> +	if (di->size.op & PCS_SIZE_SHRINK) {
>  		BUG_ON(!mutex_is_locked(&req->io_inode->i_mutex));
>  		/*
>  		 * Wait for already submitted aio reads. Further reads
> @@ -1088,6 +1088,7 @@ static void pcs_kio_setattr_handle(struct fuse_inode *fi, struct fuse_req *req)
>  		 * _pcs_shrink_end().
>  		 */
>  		inode_dio_wait(req->io_inode);
> +		BUG_ON(di->size.op != PCS_SIZE_SHRINK);
>  		/*
>  		 * Writebackcache was flushed already so it is safe to
>  		 * drop pcs_mapping
>