[Devel,vz7,42/46] fuse: separate pqueue for clones

Submitted by Maxim Patlasov on March 25, 2017, 2:29 a.m.

Details

Message ID 149040896620.25341.9936439884755890987.stgit@maxim-thinkpad
State New
Series "fuse: add multi-threading support"
Headers show

Commit Message

Maxim Patlasov March 25, 2017, 2:29 a.m.
Backport from ml:

commit c3696046beb3a4479715b48f67f6a8a3aef4b3bb
Author: Miklos Szeredi <mszeredi@suse.cz>
Date:   Wed Jul 1 16:26:09 2015 +0200

    fuse: separate pqueue for clones

    Make each fuse device clone refer to a separate processing queue.  The only
    constraint on userspace code is that the request answer must be written to
    the same device clone as it was read off.

    Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>

Signed-off-by: Maxim Patlasov <mpatlasov@virtuozzo.com>
---
 fs/fuse/control.c |   12 ++++++++--
 fs/fuse/dev.c     |   63 +++++++++++++++++++++++++++++++----------------------
 fs/fuse/fuse_i.h  |    9 +++++---
 fs/fuse/inode.c   |   13 ++++++++---
 4 files changed, 62 insertions(+), 35 deletions(-)

Patch hide | download patch | download mbox

diff --git a/fs/fuse/control.c b/fs/fuse/control.c
index 10201ac..c35a69b 100644
--- a/fs/fuse/control.c
+++ b/fs/fuse/control.c
@@ -280,15 +280,17 @@  static int fuse_conn_seq_open(struct file *filp, int list_id)
 
 	fcp->conn = conn;
 	switch (list_id) {
-	case FUSE_PROCESSING_REQ:
-		fcp->req_list = &conn->pq.processing;
-		break;
 	case FUSE_PENDING_REQ:
 		fcp->req_list = &conn->iq.pending;
 		break;
+#if 0
+	case FUSE_PROCESSING_REQ:
+		fcp->req_list = &conn->pq.processing;
+		break;
 	case FUSE_IO_REQ:
 		fcp->req_list = &conn->pq.io;
 		break;
+#endif
 	default:
 		BUG();
 	}
@@ -318,6 +320,7 @@  static const struct file_operations fuse_conn_pending_req = {
 	.release = fuse_conn_release,
 };
 
+#if 0
 static int fuse_conn_processing_open(struct inode *inode, struct file *filp)
 {
 	return fuse_conn_seq_open(filp, FUSE_PROCESSING_REQ);
@@ -341,6 +344,7 @@  static const struct file_operations fuse_conn_io_req = {
 	.llseek = seq_lseek,
 	.release = fuse_conn_release,
 };
+#endif
 
 static int fuse_files_show(struct seq_file *f, void *v)
 {
@@ -504,12 +508,14 @@  int fuse_ctl_add_conn(struct fuse_conn *fc)
 	    !fuse_ctl_add_dentry(parent, fc, "pending_req",
 		    		S_IFREG | 0600, 1, NULL,
 				&fuse_conn_pending_req) ||
+#if 0
 	    !fuse_ctl_add_dentry(parent, fc, "processing_req",
 		    		S_IFREG | 0600, 1, NULL,
 				&fuse_conn_processing_req) ||
 	    !fuse_ctl_add_dentry(parent, fc, "io_req",
 		    		S_IFREG | 0600, 1, NULL,
 				&fuse_conn_io_req) ||
+#endif
 	    !fuse_ctl_add_dentry(parent, fc, "open_files",
 		    		S_IFREG | 0600, 1, NULL,
 				&fuse_conn_files_ops) ||
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 71d2841..d8360e6 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -1198,12 +1198,13 @@  __releases(fiq->waitq.lock)
  * request_end().  Otherwise add it to the processing list, and set
  * the 'sent' flag.
  */
-static ssize_t fuse_dev_do_read(struct fuse_conn *fc, struct file *file,
+static ssize_t fuse_dev_do_read(struct fuse_dev *fud, struct file *file,
 				struct fuse_copy_state *cs, size_t nbytes)
 {
 	ssize_t err;
+	struct fuse_conn *fc = fud->fc;
 	struct fuse_iqueue *fiq = &fc->iq;
-	struct fuse_pqueue *fpq = &fc->pq;
+	struct fuse_pqueue *fpq = &fud->pq;
 	struct fuse_req *req;
 	struct fuse_in *in;
 	unsigned reqsize;
@@ -1311,7 +1312,7 @@  static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
 
 	fuse_copy_init(&cs, 1, iov, nr_segs);
 
-	return fuse_dev_do_read(fud->fc, file, &cs, iov_length(iov, nr_segs));
+	return fuse_dev_do_read(fud, file, &cs, iov_length(iov, nr_segs));
 }
 
 static int fuse_dev_pipe_buf_steal(struct pipe_inode_info *pipe,
@@ -1351,7 +1352,7 @@  static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
 	fuse_copy_init(&cs, 1, NULL, 0);
 	cs.pipebufs = bufs;
 	cs.pipe = pipe;
-	ret = fuse_dev_do_read(fud->fc, in, &cs, len);
+	ret = fuse_dev_do_read(fud, in, &cs, len);
 	if (ret < 0)
 		goto out;
 
@@ -1860,11 +1861,12 @@  static int copy_out_args(struct fuse_copy_state *cs, struct fuse_out *out,
  * it from the list and copy the rest of the buffer to the request.
  * The request is finished by calling request_end()
  */
-static ssize_t fuse_dev_do_write(struct fuse_conn *fc,
+static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
 				 struct fuse_copy_state *cs, size_t nbytes)
 {
 	int err;
-	struct fuse_pqueue *fpq = &fc->pq;
+	struct fuse_conn *fc = fud->fc;
+	struct fuse_pqueue *fpq = &fud->pq;
 	struct fuse_req *req;
 	struct fuse_out_header oh;
 
@@ -1962,7 +1964,7 @@  static ssize_t fuse_dev_write(struct kiocb *iocb, const struct iovec *iov,
 
 	fuse_copy_init(&cs, 0, iov, nr_segs);
 
-	return fuse_dev_do_write(fud->fc, &cs, iov_length(iov, nr_segs));
+	return fuse_dev_do_write(fud, &cs, iov_length(iov, nr_segs));
 }
 
 static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
@@ -2032,7 +2034,7 @@  static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
 	if (flags & SPLICE_F_MOVE)
 		cs.move_pages = 1;
 
-	ret = fuse_dev_do_write(fud->fc, &cs, len);
+	ret = fuse_dev_do_write(fud, &cs, len);
 
 	for (idx = 0; idx < nbuf; idx++) {
 		struct pipe_buffer *buf = &bufs[idx];
@@ -2119,10 +2121,10 @@  static void end_polls(struct fuse_conn *fc)
 void fuse_abort_conn(struct fuse_conn *fc)
 {
 	struct fuse_iqueue *fiq = &fc->iq;
-	struct fuse_pqueue *fpq = &fc->pq;
 
 	spin_lock(&fc->lock);
 	if (fc->connected) {
+		struct fuse_dev *fud;
 		struct fuse_req *req, *next;
 		LIST_HEAD(to_end1);
 		LIST_HEAD(to_end2);
@@ -2130,20 +2132,24 @@  void fuse_abort_conn(struct fuse_conn *fc)
 		fc->connected = 0;
 		fc->blocked = 0;
 		fuse_set_initialized(fc);
-		spin_lock(&fpq->lock);
-		fpq->connected = 0;
-		list_for_each_entry_safe(req, next, &fpq->io, list) {
-			req->out.h.error = -ECONNABORTED;
-			spin_lock(&req->waitq.lock);
-			set_bit(FR_ABORTED, &req->flags);
-			if (!test_bit(FR_LOCKED, &req->flags)) {
-				set_bit(FR_PRIVATE, &req->flags);
-				list_move(&req->list, &to_end1);
+		list_for_each_entry(fud, &fc->devices, entry) {
+			struct fuse_pqueue *fpq = &fud->pq;
+
+			spin_lock(&fpq->lock);
+			fpq->connected = 0;
+			list_for_each_entry_safe(req, next, &fpq->io, list) {
+				req->out.h.error = -ECONNABORTED;
+				spin_lock(&req->waitq.lock);
+				set_bit(FR_ABORTED, &req->flags);
+				if (!test_bit(FR_LOCKED, &req->flags)) {
+					set_bit(FR_PRIVATE, &req->flags);
+					list_move(&req->list, &to_end1);
+				}
+				spin_unlock(&req->waitq.lock);
 			}
-			spin_unlock(&req->waitq.lock);
+			list_splice_init(&fpq->processing, &to_end2);
+			spin_unlock(&fpq->lock);
 		}
-		list_splice_init(&fpq->processing, &to_end2);
-		spin_unlock(&fpq->lock);
 		fc->max_background = UINT_MAX;
 		flush_bg_queue(fc);
 
@@ -2178,13 +2184,17 @@  int fuse_dev_release(struct inode *inode, struct file *file)
 
 	if (fud) {
 		struct fuse_conn *fc = fud->fc;
-
-		WARN_ON(!list_empty(&fc->pq.io));
-		WARN_ON(fc->iq.fasync != NULL);
-		fuse_abort_conn(fc);
+		struct fuse_pqueue *fpq = &fud->pq;
+
+		WARN_ON(!list_empty(&fpq->io));
+		end_requests(fc, &fpq->processing);
+		/* Are we the last open device? */
+		if (atomic_dec_and_test(&fc->dev_count)) {
+			WARN_ON(fc->iq.fasync != NULL);
+			fuse_abort_conn(fc);
+		}
 		fuse_dev_free(fud);
 	}
-
 	return 0;
 }
 EXPORT_SYMBOL_GPL(fuse_dev_release);
@@ -2212,6 +2222,7 @@  static int fuse_device_clone(struct fuse_conn *fc, struct file *new)
 		return -ENOMEM;
 
 	new->private_data = fud;
+	atomic_inc(&fc->dev_count);
 
 	return 0;
 }
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index 128d6f2..77230c3 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -457,6 +457,9 @@  struct fuse_dev {
 	/** Fuse connection for this device */
 	struct fuse_conn *fc;
 
+	/** Processing queue */
+	struct fuse_pqueue pq;
+
 	/** list entry on fc->devices */
 	struct list_head entry;
 };
@@ -478,6 +481,9 @@  struct fuse_conn {
 	/** Refcount */
 	atomic_t count;
 
+	/** Number of fuse_dev's */
+	atomic_t dev_count;
+
 	/** The user id for this mount */
 	kuid_t user_id;
 
@@ -496,9 +502,6 @@  struct fuse_conn {
 	/** Input queue */
 	struct fuse_iqueue iq;
 
-	/** Processing queue */
-	struct fuse_pqueue pq;
-
 	/** The next unique kernel file handle */
 	u64 khctr;
 
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index a40461d..3ac204e 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -420,11 +420,17 @@  int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid)
 
 	err = filemap_write_and_wait(inode->i_mapping);
 	if (!err || err == -EIO) { /* AS_EIO might trigger -EIO */
+		struct fuse_dev *fud;
 		spin_lock(&fc->lock);
-		fuse_kill_requests(fc, inode, &fc->pq.processing);
+		list_for_each_entry(fud, &fc->devices, entry) {
+			struct fuse_pqueue *fpq = &fud->pq;
+			spin_lock(&fpq->lock);
+			fuse_kill_requests(fc, inode, &fpq->processing);
+			fuse_kill_requests(fc, inode, &fpq->io);
+			spin_unlock(&fpq->lock);
+		}
 		fuse_kill_requests(fc, inode, &fc->iq.pending);
 		fuse_kill_requests(fc, inode, &fc->bg_queue);
-		fuse_kill_requests(fc, inode, &fc->pq.io);
 		wake_up(&fi->page_waitq); /* readpage[s] can wait on fuse wb */
 		spin_unlock(&fc->lock);
 
@@ -713,10 +719,10 @@  void fuse_conn_init(struct fuse_conn *fc)
 	mutex_init(&fc->inst_mutex);
 	init_rwsem(&fc->killsb);
 	atomic_set(&fc->count, 1);
+	atomic_set(&fc->dev_count, 1);
 	init_waitqueue_head(&fc->blocked_waitq);
 	init_waitqueue_head(&fc->reserved_req_waitq);
 	fuse_iqueue_init(&fc->iq);
-	fuse_pqueue_init(&fc->pq);
 	INIT_LIST_HEAD(&fc->bg_queue);
 	INIT_LIST_HEAD(&fc->entry);
 	INIT_LIST_HEAD(&fc->conn_files);
@@ -1122,6 +1128,7 @@  struct fuse_dev *fuse_dev_alloc(struct fuse_conn *fc)
 	fud = kzalloc(sizeof(struct fuse_dev), GFP_KERNEL);
 	if (fud) {
 		fud->fc = fuse_conn_get(fc);
+		fuse_pqueue_init(&fud->pq);
 
 		spin_lock(&fc->lock);
 		list_add_tail(&fud->entry, &fc->devices);