[RHEL7,COMMIT] fuse kio_pcs: ports from new user-space

Submitted by Konstantin Khorenko on May 23, 2018, 8:04 a.m.

Details

Message ID 201805230804.w4N84IFL003306@finist_ce7.work
State New
Series "fuse kio_pcs: ports from new user-space"
Headers show

Commit Message

Konstantin Khorenko May 23, 2018, 8:04 a.m.
The commit is pushed to "branch-rh7-3.10.0-693.21.1.vz7.50.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-693.21.1.vz7.50.1
------>
commit 785731c50a55a452f5ee0aecc0fa438c7edfba59
Author: Alexey Kuznetsov <kuznet@virtuozzo.com>
Date:   Wed May 23 11:04:18 2018 +0300

    fuse kio_pcs: ports from new user-space
    
    New token-based congestion avoidance is ported to the kernel.
    
    Also, some bugs in fuse_kio_pcs congestion avoidance are fixed.
    There was a race condition where cwnd could go to 0 from another thread
    and the congestion queue would get stuck. Now we resolve the race by
    moving the cwnd check under cs->lock. If the cs is not congested while
    cs->lock is held, the request is submitted immediately.
    
    Signed-off-by: Alexey Kuznetsov <kuznet@virtuozzo.com>
    Signed-off-by: Kirill Tkhai <ktkhai@virtuozzo.com>
---
 fs/fuse/kio/pcs/pcs_cluster.c      |  31 +++-
 fs/fuse/kio/pcs/pcs_cs.c           |  25 ++--
 fs/fuse/kio/pcs/pcs_cs.h           |  15 +-
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |  10 +-
 fs/fuse/kio/pcs/pcs_map.c          | 287 ++++++++++++++++++++++---------------
 fs/fuse/kio/pcs/pcs_map.h          |   7 +-
 fs/fuse/kio/pcs/pcs_req.h          |  15 ++
 7 files changed, 241 insertions(+), 149 deletions(-)

Patch hide | download patch | download mbox

diff --git a/fs/fuse/kio/pcs/pcs_cluster.c b/fs/fuse/kio/pcs/pcs_cluster.c
index 79071655f6e5..24ec8a5f39a3 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.c
+++ b/fs/fuse/kio/pcs/pcs_cluster.c
@@ -18,6 +18,7 @@ 
 #include "../../fuse_i.h"
 
 void pcs_cc_process_ireq_chunk(struct pcs_int_request *ireq);
+static void ireq_process_(struct pcs_int_request *ireq);
 
 static inline int is_file_inline(struct pcs_dentry_info *di)
 {
@@ -226,6 +227,9 @@  static int fiemap_worker(void * arg)
 
 		sreq->dentry = di;
 		sreq->type = PCS_IREQ_IOCHUNK;
+		INIT_LIST_HEAD(&sreq->tok_list);
+		sreq->tok_reserved = 0;
+		sreq->tok_serno = 0;
 		sreq->iochunk.map = NULL;
 		sreq->iochunk.flow = pcs_flow_record(&di->mapping.ftab, 0, pos, end-pos, &di->cluster->maps.ftab);
 		sreq->iochunk.cmd = PCS_REQ_T_FIEMAP;
@@ -280,7 +284,7 @@  void pcs_cc_process_ireq_chunk(struct pcs_int_request *ireq)
 		pcs_map_put(ireq->iochunk.map);
 	ireq->iochunk.map = map;
 
-	map_submit(map, ireq, 0);
+	map_submit(map, ireq);
 }
 
 /* TODO Remove noinline in production */
@@ -325,6 +329,9 @@  static noinline void __pcs_cc_process_ireq_rw(struct pcs_int_request *ireq)
 
 		sreq->dentry = di;
 		sreq->type = PCS_IREQ_IOCHUNK;
+		INIT_LIST_HEAD(&sreq->tok_list);
+		sreq->tok_reserved = 0;
+		sreq->tok_serno = 0;
 		sreq->iochunk.map = NULL;
 		sreq->iochunk.flow = pcs_flow_get(fl);
 		sreq->iochunk.cmd = ireq->apireq.req->type;
@@ -391,6 +398,25 @@  static void pcs_cc_process_ireq_ioreq(struct pcs_int_request *ireq)
 	return __pcs_cc_process_ireq_rw(ireq);
 }
 
+static void process_ireq_token(struct pcs_int_request * ireq)
+{
+	struct pcs_int_request * parent = ireq->token.parent;
+
+        if (parent) {
+		int do_execute = 0;
+
+		spin_lock(&parent->completion_data.child_lock);
+		if (ireq->token.parent) {
+			ireq_drop_tokens(parent);
+			do_execute = 1;
+		}
+		spin_unlock(&parent->completion_data.child_lock);
+		if (do_execute)
+			ireq_process_(parent);
+        }
+        ireq_destroy(ireq);
+}
+
 static void ireq_process_(struct pcs_int_request *ireq)
 {
 	struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
@@ -425,6 +451,9 @@  static void ireq_process_(struct pcs_int_request *ireq)
 	case PCS_IREQ_CUSTOM:
 		ireq->custom.action(ireq);
 		break;
+	case PCS_IREQ_TOKEN:
+		process_ireq_token(ireq);
+		break;
 	default:
 		BUG();
 		break;
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 151b06cb18cf..cae591e65a42 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -694,8 +694,8 @@  void pcs_cs_notify_error(struct pcs_cluster_core *cc, pcs_error_t *err)
 	if (cs == NULL)
 		return;
 
+	list_splice_tail_init(&cs->active_list, &queue);
 	list_splice_tail_init(&cs->cong_queue, &queue);
-	clear_bit(CS_SF_CONGESTED, &cs->state);
 	cs->cong_queue_len = 0;
 	cs_blacklist(cs, err->value, "notify error");
 	spin_unlock(&cs->lock);
@@ -710,9 +710,7 @@  static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose)
 
 	list_splice_tail_init(&cs->active_list, dispose);
 	list_splice_tail_init(&cs->cong_queue, dispose);
-	cs->active_list_len = 0;
 	cs->cong_queue_len = 0;
-	clear_bit(CS_SF_CONGESTED, &cs->state);
 
 	cs->is_dead = 1;
 	spin_lock(&cs->css->lock);
@@ -920,7 +918,7 @@  void cs_decrement_in_flight(struct pcs_cs *cs, unsigned int to_dec)
 
 	if (cs->in_flight < cs->eff_cwnd) {
 		cs->cwr_state = 0;
-		pcs_cs_flush_cong_queue(cs);
+		pcs_cs_activate_cong_queue(cs);
 	}
 	if (cs->in_flight == 0)
 		cs->idle_stamp = jiffies;
@@ -1066,6 +1064,7 @@  void pcs_csset_init(struct pcs_cs_set *css)
 	INIT_DELAYED_WORK(&css->bl_work, bl_timer_work);
 	css->ncs = 0;
 	spin_lock_init(&css->lock);
+	atomic64_set(&css->csl_serno_gen, 0);
 }
 
 void pcs_csset_fini(struct pcs_cs_set *css)
@@ -1146,14 +1145,18 @@  void pcs_cs_set_stat_up(struct pcs_cs_set *set)
 	pcs_cs_for_each_entry(set, do_update_stat, 0);
 }
 
-void pcs_cs_cong_enqueue(struct pcs_int_request *ireq, struct pcs_cs *cs)
+int pcs_cs_cong_enqueue_cond(struct pcs_int_request *ireq, struct pcs_cs *cs)
 {
+	int queued = 0;
+
 	spin_lock(&cs->lock);
-	if (test_bit(CS_SF_CONGESTED, &cs->state))
-		test_bit(CS_SF_CONGESTED, &cs->state);
-	list_add_tail(&ireq->list, &cs->cong_queue);
-	cs->cong_queue_len++;
-	if (!ireq->qdepth)
-		ireq->qdepth = cs->cong_queue_len + cs->active_list_len;
+	if (cs->in_flight >= cs->eff_cwnd) {
+		list_add_tail(&ireq->list, &cs->cong_queue);
+		cs->cong_queue_len++;
+		if (!ireq->qdepth)
+			ireq->qdepth = cs->cong_queue_len;
+		queued = 1;
+	}
 	spin_unlock(&cs->lock);
+	return queued;
 }
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index f46b31f2633d..eb81ac51f3ae 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -36,7 +36,6 @@  enum {
 	CS_SF_FAILED,
 	CS_SF_BLACKLISTED,
 	CS_SF_ACTIVE,
-	CS_SF_CONGESTED,
 };
 
 struct pcs_cs {
@@ -66,14 +65,13 @@  struct pcs_cs {
 	struct list_head	cong_queue;
 	int			cong_queue_len;
 	struct list_head	active_list;
-	int			active_list_len;
 
 	pcs_cs_io_prio_t	io_prio;
 	pcs_cs_net_prio_t	net_prio;
 	u8			mds_flags;
 	abs_time_t		io_prio_stamp;
 
-	struct list_head		flow_lru;
+	struct list_head	flow_lru;
 	int			nflows;
 
 	unsigned long		state;
@@ -104,24 +102,20 @@  static inline void pcs_cs_init_cong_queue(struct pcs_cs *cs)
 {
 	INIT_LIST_HEAD(&cs->cong_queue);
 	cs->cong_queue_len = 0;
-	clear_bit(CS_SF_CONGESTED, &cs->state);
 }
 
 static inline void pcs_cs_init_active_list(struct pcs_cs *cs)
 {
 	INIT_LIST_HEAD(&cs->active_list);
-	cs->active_list_len = 0;
 }
 
-static inline void pcs_cs_flush_cong_queue(struct pcs_cs *cs)
+static inline void pcs_cs_activate_cong_queue(struct pcs_cs *cs)
 {
 	assert_spin_locked(&cs->lock);
-	list_splice_tail(&cs->cong_queue, &cs->active_list);
-	cs->active_list_len += cs->cong_queue_len;
-	pcs_cs_init_cong_queue(cs);
+	list_splice_tail_init(&cs->cong_queue, &cs->active_list);
 }
 
-void pcs_cs_cong_enqueue(struct pcs_int_request *ireq, struct pcs_cs *cs);
+int pcs_cs_cong_enqueue_cond(struct pcs_int_request *ireq, struct pcs_cs *cs);
 
 #define PCS_CS_HASH_SIZE 1024
 
@@ -132,6 +126,7 @@  struct pcs_cs_set {
 	struct delayed_work	bl_work;
 	unsigned int		ncs;
 	spinlock_t		lock;
+	atomic64_t		csl_serno_gen;
 };
 
 void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq);
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index f4c0fb15403e..61378f0d9a58 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -843,11 +843,6 @@  static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
 		if (inarg->offset >= di->fileinfo.attr.size)
 			inarg->mode &= ~FALLOC_FL_ZERO_RANGE;
 
-		if (inarg->mode & FALLOC_FL_KEEP_SIZE) {
-			if (inarg->offset + inarg->length > di->fileinfo.attr.size)
-				inarg->length = di->fileinfo.attr.size - inarg->offset;
-		}
-
 		if (inarg->mode & (FALLOC_FL_ZERO_RANGE|FALLOC_FL_PUNCH_HOLE)) {
 			if ((inarg->offset & (PAGE_SIZE - 1)) || (inarg->length & (PAGE_SIZE - 1))) {
 				r->req.out.h.error = -EINVAL;
@@ -855,6 +850,11 @@  static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
 			}
 		}
 
+		if (inarg->mode & FALLOC_FL_KEEP_SIZE) {
+			if (inarg->offset + inarg->length > di->fileinfo.attr.size)
+				inarg->length = di->fileinfo.attr.size - inarg->offset;
+		}
+
 		ret = pcs_fuse_prep_rw(r);
 		if (!ret)
 			goto submit;
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 085f49fd0219..90e311f7e995 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -911,8 +911,8 @@  struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *re
 	atomic_set(&cs_list->refcnt, 1);
 	atomic_set(&cs_list->seq_read_in_flight, 0);
 	cs_list->read_index = -1;
-	cs_list->cong_index = -1 ;
 	cs_list->flags = 0;
+	cs_list->serno = atomic64_inc_return(&css->csl_serno_gen);
 	cs_list->blacklist = 0;
 	cs_list->read_timeout = (read_tout * HZ) / 1000;
 	cs_list->write_timeout = (write_tout * HZ) / 1000;
@@ -1380,7 +1380,7 @@  static void pcs_cs_deaccount(struct pcs_int_request *ireq, struct pcs_cs * cs, i
 	spin_unlock(&cs->lock);
 }
 
-static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
+static void pcs_cs_wakeup(struct pcs_cs * cs)
 {
 	struct pcs_int_request * sreq;
 	struct pcs_map_entry * map;
@@ -1393,11 +1393,32 @@  static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
 			break;
 		}
 		sreq = list_first_entry(&cs->active_list, struct pcs_int_request, list);
-		BUG_ON(!cs->active_list_len);
 		list_del_init(&sreq->list);
-		cs->active_list_len--;
+		cs->cong_queue_len--;
 		spin_unlock(&cs->lock);
 
+		if (sreq->type == PCS_IREQ_TOKEN) {
+			struct pcs_int_request * parent = sreq->token.parent;
+			int do_execute = 0;
+
+			if (parent == NULL) {
+				ireq_destroy(sreq);
+				continue;
+			}
+
+			spin_lock(&parent->completion_data.child_lock);
+			if (sreq->token.parent) {
+				parent->tok_reserved |= (1ULL << sreq->token.cs_index);
+				list_del(&sreq->token.tok_link);
+				do_execute = list_empty(&parent->tok_list);
+			}
+			spin_unlock(&parent->completion_data.child_lock);
+			ireq_destroy(sreq);
+			if (!do_execute)
+				continue;
+			sreq = parent;
+		}
+
 		if (sreq->type != PCS_IREQ_FLUSH) {
 			map = pcs_find_get_map(sreq->dentry, sreq->iochunk.chunk +
 						   ((sreq->flags & IREQ_F_MAPPED) ? 0 : sreq->iochunk.offset));
@@ -1412,7 +1433,7 @@  static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
 							 preq->apireq.req->pos, preq->apireq.req->size,
 							 &sreq->cc->maps.ftab);
 				}
-				map_submit(map, sreq, requeue);
+				map_submit(map, sreq);
 			} else {
 				map_queue_on_limit(sreq);
 			}
@@ -1422,58 +1443,33 @@  static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
 				pcs_clear_error(&sreq->error);
 				ireq_complete(sreq);
 			} else
-				map_submit(map, sreq, requeue);
+				map_submit(map, sreq);
 		}
 	}
 }
 
 static int __pcs_cs_still_congested(struct pcs_cs * cs)
 {
+	if (!list_empty(&cs->active_list))
+		list_splice_tail_init(&cs->active_list, &cs->cong_queue);
 
-	assert_spin_locked(&cs->lock);
-
-	if (!list_empty(&cs->active_list)) {
-		BUG_ON(!cs->active_list_len);
-		list_splice_tail(&cs->active_list, &cs->cong_queue);
-		cs->cong_queue_len += cs->active_list_len;
-		set_bit(CS_SF_CONGESTED, &cs->state);
-		pcs_cs_init_active_list(cs);
-	} else if (list_empty(&cs->cong_queue)) {
+	if (list_empty(&cs->cong_queue)) {
 		BUG_ON(cs->cong_queue_len);
-		BUG_ON(test_bit(CS_SF_CONGESTED, &cs->state));
 		return 0;
-	} else {
-		BUG_ON(cs->active_list_len);
 	}
 
-	if (cs->in_flight >= cs->eff_cwnd)
-		return 0;
-
-	/* Exceptional situation: CS is not congested, but still has congestion queue.
-	 * This can happen f.e. when CS was congested with reads and has some writes in queue,
-	 * then all reads are complete, but writes cannot be sent because of congestion
-	 * on another CSes in chain. This is absolutely normal, we just should queue
-	 * not on this CS, but on actualle congested CSes. With current algorithm of preventing
-	 * reordering, we did a mistake and queued on node which used to be congested.
-	 * Solution for now is to retry sending with flag "requeue" set, it will requeue
-	 * requests on another nodes. It is difficult to say how frequently this happens,
-	 * so we spit out message. If we will have lots of them in logs, we have to select
-	 * different solution.
-	 */
-
-	TRACE("CS#" NODE_FMT " is free, but still has queue", NODE_ARGS(cs->id));
-	pcs_cs_flush_cong_queue(cs);
-
-	return 1;
+	return cs->in_flight < cs->eff_cwnd;
 }
+
 static int pcs_cs_still_congested(struct pcs_cs * cs)
 {
-	int ret;
+	int res;
 
 	spin_lock(&cs->lock);
-	ret = __pcs_cs_still_congested(cs);
+	res = __pcs_cs_still_congested(cs);
 	spin_unlock(&cs->lock);
-	return ret;
+
+	return res;
 }
 
 void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
@@ -1518,7 +1514,7 @@  void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
 
 	if (ireq->type == PCS_IREQ_FLUSH || (pcs_req_direction(ireq->iochunk.cmd) && !(ireq->flags & IREQ_F_MAPPED))) {
 		int i;
-		int requeue = 0;
+		int requeue;
 
 		for (i = csl->nsrv - 1; i >= 0; i--) {
 			if (!match_id || csl->cs[i].cslink.cs->id.val == match_id)
@@ -1538,14 +1534,14 @@  void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
 
 		do {
 			for (i = csl->nsrv - 1; i >= 0; i--)
-				pcs_cs_wakeup(csl->cs[i].cslink.cs, requeue);
+				pcs_cs_wakeup(csl->cs[i].cslink.cs);
 
 			requeue = 0;
 			for (i = csl->nsrv - 1; i >= 0; i--)
-				requeue += pcs_cs_still_congested(csl->cs[i].cslink.cs);
+				requeue |= pcs_cs_still_congested(csl->cs[i].cslink.cs);
 		} while (requeue);
 	} else {
-		int requeue = 0;
+		int requeue;
 		struct pcs_cs * rcs = csl->cs[ireq->iochunk.cs_index].cslink.cs;
 
 		if (ireq->flags & IREQ_F_SEQ_READ) {
@@ -1557,7 +1553,7 @@  void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
 		pcs_cs_deaccount(ireq, rcs, error);
 
 		do {
-			pcs_cs_wakeup(rcs, requeue);
+			pcs_cs_wakeup(rcs);
 
 			requeue = pcs_cs_still_congested(rcs);
 		} while (requeue);
@@ -1771,6 +1767,10 @@  pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign)
 	sreq->iochunk.map = ireq->iochunk.map;
 	if (sreq->iochunk.map)
 		__pcs_map_get(sreq->iochunk.map);
+	INIT_LIST_HEAD(&sreq->tok_list);
+	BUG_ON(!list_empty(&ireq->tok_list));
+	sreq->tok_reserved = ireq->tok_reserved;
+	sreq->tok_serno = ireq->tok_serno;
 	sreq->iochunk.flow = pcs_flow_get(ireq->iochunk.flow);
 	sreq->iochunk.cmd = ireq->iochunk.cmd;
 	sreq->iochunk.role = ireq->iochunk.role;
@@ -1803,7 +1803,7 @@  pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign)
 	return sreq;
 }
 
-static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
 {
 	struct pcs_cluster_core *cc = ireq->cc;
 	struct pcs_cs * cs;
@@ -1872,9 +1872,8 @@  static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_li
 	spin_unlock(&cs->lock);
 
 	if (allot < 0) {
-		pcs_cs_cong_enqueue(ireq, cs);
-
-		return 0;
+		if (pcs_cs_cong_enqueue_cond(ireq, cs))
+			return 0;
 	}
 
 	if (allot < ireq->dentry->cluster->cfg.curr.lmss)
@@ -1924,77 +1923,131 @@  static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_li
 			return 0;
 
 		if (allot < 0) {
-			pcs_cs_cong_enqueue(ireq, cs);
-			return 0;
+			if (pcs_cs_cong_enqueue_cond(ireq, cs))
+				return 0;
 		}
 	}
 }
 
-static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int ireq_queue_tokens(struct pcs_int_request * ireq, struct pcs_cs_list * csl)
+{
+       int i;
+       int queued = 0;
+       struct list_head drop;
+       struct pcs_int_request * toks[csl->nsrv];
+
+       INIT_LIST_HEAD(&drop);
+
+       for (i = 0; i < csl->nsrv; i++) {
+               struct pcs_int_request * ntok;
+
+	       /* ireq is private; no need to lock tok_* fields */
+
+               if (ireq->tok_reserved & (1ULL << i)) {
+		       toks[i] = NULL;
+                       continue;
+	       }
+
+               ntok = ireq_alloc(ireq->dentry);
+               BUG_ON(!ntok);
+               ntok->type = PCS_IREQ_TOKEN;
+               ntok->token.parent = ireq;
+               ntok->token.cs_index = i;
+	       toks[i] = ntok;
+       }
+
+       /* Publish tokens in CS queues */
+       spin_lock(&ireq->completion_data.child_lock);
+       for (i = 0; i < csl->nsrv; i++) {
+	       if (toks[i]) {
+		       struct pcs_cs * cs = csl->cs[i].cslink.cs;
+		       if (pcs_cs_cong_enqueue_cond(toks[i], cs)) {
+			       list_add(&toks[i]->token.tok_link, &ireq->tok_list);
+			       toks[i] = NULL;
+			       queued = 1;
+		       } else {
+			       list_add(&toks[i]->token.tok_link, &drop);
+		       }
+	       }
+       }
+       spin_unlock(&ireq->completion_data.child_lock);
+
+       while (!list_empty(&drop)) {
+	       struct pcs_int_request * tok = list_first_entry(&drop, struct pcs_int_request, token.tok_link);
+	       list_del(&tok->token.tok_link);
+	       ireq_destroy(tok);
+       }
+       return queued;
+}
+
+void ireq_drop_tokens(struct pcs_int_request * ireq)
+{
+	assert_spin_locked(&ireq->completion_data.child_lock);
+
+	while (!list_empty(&ireq->tok_list)) {
+		struct pcs_int_request * tok = list_first_entry(&ireq->tok_list, struct pcs_int_request, token.tok_link);
+		tok->token.parent = NULL;
+		list_del(&tok->token.tok_link);
+        }
+}
+
+static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
 {
 	struct pcs_cs * cs;
 	unsigned int iochunk;
 	int i;
-	int congested_idx;
-	int max_excess;
 	int allot;
+	struct pcs_cs * congested_cs = NULL;
+	u64 congested = 0;
 
 	ireq->iochunk.cs_index = 0;
 	iochunk = ireq->dentry->cluster->cfg.curr.lmss;
 
 restart:
-	congested_idx = -1;
-	max_excess = 0;
 	allot = ireq->iochunk.size;
+	if (csl->serno != ireq->tok_serno)
+		ireq->tok_reserved = 0;
+	BUG_ON(!list_empty(&ireq->tok_list));
 
 	for (i = 0; i < csl->nsrv; i++) {
-		int cs_allot;
-
 		cs = csl->cs[i].cslink.cs;
 		if (cs_is_blacklisted(cs)) {
 			map_remote_error(ireq->iochunk.map, cs->blacklist_reason, cs->id.val);
 			TRACE("Write to " MAP_FMT " blocked by blacklist error %d, CS" NODE_FMT,
 			      MAP_ARGS(ireq->iochunk.map), cs->blacklist_reason, NODE_ARGS(cs->id));
+			spin_lock(&ireq->completion_data.child_lock);
+			ireq_drop_tokens(ireq);
+			spin_unlock(&ireq->completion_data.child_lock);
 			return -1;
 		}
 		spin_lock(&cs->lock);
 		cs_cwnd_use_or_lose(cs);
-		cs_allot = cs->eff_cwnd - cs->in_flight;
 		spin_unlock(&cs->lock);
 
-		if (cs_allot < 0) {
-			cs_allot = -cs_allot;
-			if (cs_allot > max_excess) {
-				congested_idx = i;
-				max_excess = cs_allot;
-			}
-		} else {
-			if (cs_allot < allot)
-				allot = cs_allot;
-		}
+		if (cs->in_flight > cs->eff_cwnd && !(ireq->tok_reserved & (1ULL << i))) {
+			congested_cs = cs;
+			congested |= (1ULL << i);
+		} else
+			ireq->tok_reserved |= (1ULL << i);
 
 		if (!(test_bit(CS_SF_LOCAL, &cs->state)))
 			iochunk = ireq->dentry->cluster->cfg.curr.wmss;
 	}
 
-	if (congested_idx >= 0) {
-		int cur_cong_idx = READ_ONCE(csl->cong_index);
+	if (allot < ireq->dentry->cluster->cfg.curr.lmss)
+		allot = ireq->dentry->cluster->cfg.curr.lmss;
 
+	if (congested) {
+		int queued;
 
-		if (cur_cong_idx >= 0 && !requeue &&
-		    (READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->cong_queue_len) ||
-		     READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->active_list_len)))
-			congested_idx = cur_cong_idx;
+		ireq->tok_serno = csl->serno;
+		if (congested & (congested - 1))
+			queued = ireq_queue_tokens(ireq, csl);
 		else
-			WRITE_ONCE(csl->cong_index, congested_idx);
-
-		pcs_cs_cong_enqueue(ireq, csl->cs[congested_idx].cslink.cs);
-		return 0;
+			queued = pcs_cs_cong_enqueue_cond(ireq, congested_cs);
+		if (queued)
+			return 0;
 	}
-	WRITE_ONCE(csl->cong_index, -1);
-
-	if (allot < ireq->dentry->cluster->cfg.curr.lmss)
-		allot = ireq->dentry->cluster->cfg.curr.lmss;
 
 	for (;;) {
 		struct pcs_int_request * sreq = ireq;
@@ -2048,61 +2101,55 @@  static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_l
 	}
 }
 
-static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
 {
 	struct pcs_cs * cs;
 	int i;
-	int congested_idx;
-	int max_excess;
 	int allot = PCS_CS_FLUSH_WEIGHT;
 	struct pcs_msg * msg;
 	struct pcs_cs_iohdr * ioh;
+	u64 congested = 0;
+	struct pcs_cs * congested_cs = NULL;
 
-	congested_idx = -1;
-	max_excess = 0;
+	if (csl->serno != ireq->tok_serno)
+		ireq->tok_reserved = 0;
+	BUG_ON(!list_empty(&ireq->tok_list));
 
 	for (i = 0; i < csl->nsrv; i++) {
-		int cs_allot;
-
 		cs = csl->cs[i].cslink.cs;
 
 		if (cs_is_blacklisted(cs)) {
 			map_remote_error(ireq->flushreq.map, cs->blacklist_reason, cs->id.val);
 			TRACE("Flush to " MAP_FMT " blocked by blacklist error %d, CS" NODE_FMT,
 			      MAP_ARGS(ireq->flushreq.map), cs->blacklist_reason, NODE_ARGS(cs->id));
+			spin_lock(&ireq->completion_data.child_lock);
+			ireq_drop_tokens(ireq);
+			spin_unlock(&ireq->completion_data.child_lock);
 			return -1;
 		}
 
 		spin_lock(&cs->lock);
 		cs_cwnd_use_or_lose(cs);
-		cs_allot = cs->eff_cwnd - cs->in_flight;
 		spin_unlock(&cs->lock);
-
-		if (cs_allot < 0) {
-			cs_allot = -cs_allot;
-			if (cs_allot > max_excess) {
-				congested_idx = i;
-				max_excess = cs_allot;
-			}
-		}
+		if (cs->in_flight > cs->eff_cwnd && !(ireq->tok_reserved & (1ULL << i))) {
+			congested_cs = cs;
+			congested |= (1ULL << i);
+		} else
+			ireq->tok_reserved |= (1ULL << i);
 	}
 
-	if (congested_idx >= 0) {
-		int cur_cong_idx = READ_ONCE(csl->cong_index);
+	if (congested) {
+		int queued;
 
-		if (cur_cong_idx >= 0 && !requeue &&
-		    (READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->cong_queue_len) ||
-		     READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->active_list_len)))
-			congested_idx = cur_cong_idx;
+		ireq->tok_serno = csl->serno;
+		if (congested & (congested - 1))
+			queued = ireq_queue_tokens(ireq, csl);
 		else
-			WRITE_ONCE(csl->cong_index, congested_idx);
-
-		pcs_cs_cong_enqueue(ireq, csl->cs[congested_idx].cslink.cs);
-		return 0;
+			queued = pcs_cs_cong_enqueue_cond(ireq, congested_cs);
+		if (queued)
+			return 0;
 	}
 
-	WRITE_ONCE(csl->cong_index, -1);
-
 	for (i = 0; i < csl->nsrv; i++) {
 		cs = csl->cs[i].cslink.cs;
 		cs_increment_in_flight(cs, allot);
@@ -2137,25 +2184,25 @@  static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_l
 
 
 
-int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl, int requeue)
+int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl)
 {
 	BUG_ON(!atomic_read(&csl->refcnt));
 
 	if (ireq->type == PCS_IREQ_FLUSH) {
-		return pcs_cslist_submit_flush(ireq, csl, requeue);
+		return pcs_cslist_submit_flush(ireq, csl);
 	} else if (!pcs_req_direction(ireq->iochunk.cmd)) {
-		return pcs_cslist_submit_read(ireq, csl, requeue);
+		return pcs_cslist_submit_read(ireq, csl);
 	} else if (ireq->flags & IREQ_F_MAPPED) {
 		BUG();
 		return -EIO;
 	} else {
-		return pcs_cslist_submit_write(ireq, csl, requeue);
+		return pcs_cslist_submit_write(ireq, csl);
 	}
 	BUG();
 	return -EIO;
 }
 
-void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requeue)
+void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq)
 {
 	int direction;
 	int done;
@@ -2235,7 +2282,7 @@  void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requ
 		if (direction && ireq->type != PCS_IREQ_FLUSH)
 			ireq->dentry->local_mtime = get_real_time_ms();
 
-		done = !pcs_cslist_submit(ireq, csl, requeue);
+		done = !pcs_cslist_submit(ireq, csl);
 		cslist_put(csl);
 	} while (!done);
 }
@@ -2690,7 +2737,7 @@  void process_flush_req(struct pcs_int_request *ireq)
 		goto done;
 	}
 	spin_unlock(&m->lock);
-	map_submit(m, ireq, 0);
+	map_submit(m, ireq);
 	return;
 
 done:
@@ -2888,6 +2935,8 @@  static int prepare_map_flush_ireq(struct pcs_map_entry *m, struct pcs_int_reques
 	}
 	prepare_map_flush_msg(m, sreq, msg);
 	sreq->type = PCS_IREQ_FLUSH;
+	INIT_LIST_HEAD(&sreq->tok_list);
+	sreq->tok_reserved = 0;
 	sreq->ts = ktime_get();
 	sreq->completion_data.parent = NULL;
 	sreq->flushreq.map = m;
@@ -2922,7 +2971,7 @@  static void sync_timer_work(struct work_struct *w)
 		map_sync_work_add(m, HZ);
 	} else {
 		if (sreq)
-			map_submit(m, sreq, 0);
+			map_submit(m, sreq);
 	}
 	/* Counter part from map_sync_work_add */
 	pcs_map_put(m);
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index 11176b2b80d5..c60e72b365d3 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -85,13 +85,13 @@  struct pcs_cs_list
 	atomic_t		refcnt;
 	atomic_t		seq_read_in_flight;
 	int			read_index;		/* volatile read hint */
-	int			cong_index;		/* volatile cong hint */
 	unsigned long		blacklist;		/* Atomic bit field */
 	abs_time_t		blacklist_expires;	/* volatile blacklist stamp */
 	abs_time_t		select_stamp;		/* volatile read hint stamp */
 	/* members below are immutable accross cslist life time */
 #define CSL_FL_HAS_LOCAL	1
 	unsigned int		flags;
+	u64                     serno;
 	int			read_timeout;
 	int			write_timeout;
 	int			nsrv;
@@ -165,7 +165,7 @@  void pcs_mapping_truncate(struct pcs_int_request *ireq, u64 old_size);
 void process_ireq_truncate(struct pcs_int_request *ireq);
 
 struct pcs_map_entry * pcs_find_get_map(struct pcs_dentry_info * de, u64 chunk);
-void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requeue);
+void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq);
 void map_notify_iochunk_error(struct pcs_int_request *ireq);
 void map_notify_soft_error(struct pcs_int_request *ireq);
 void __pcs_map_put(struct pcs_map_entry *m);
@@ -182,7 +182,7 @@  void pcs_map_verify_sync_state(struct pcs_dentry_info * de, struct pcs_int_reque
 void map_inject_flush_req(struct pcs_int_request *ireq);
 void process_flush_req(struct pcs_int_request *ireq);
 int map_check_limit(struct pcs_map_entry * map, struct pcs_int_request *ireq);
-int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl, int requeue);
+int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl);
 struct pcs_int_request * pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign);
 int  fuse_map_resolve(struct pcs_map_entry * m, int direction);
 struct pcs_ioc_getmap;
@@ -190,6 +190,7 @@  void pcs_map_complete(struct pcs_map_entry *m, struct pcs_ioc_getmap *omap);
 int pcs_map_encode_req(struct pcs_map_entry*m, struct pcs_ioc_getmap *map, int direction);
 void map_truncate_tail(struct pcs_mapping *mapping, u64 offset);
 unsigned long pcs_map_shrink_scan(struct shrinker *,  struct shrink_control *sc);
+void ireq_drop_tokens(struct pcs_int_request * ireq);
 
 #define MAP_FMT	"(%p) 0x%lld s:%x" DENTRY_FMT
 #define MAP_ARGS(m) (m), (long long)(m)->index,	 (m)->state, DENTRY_ARGS(pcs_dentry_from_map((m)))
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index fba74e9c4a56..6f49018e3988 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -28,6 +28,8 @@  enum
 	PCS_IREQ_CUSTOM = 16,	/* generic request */
 	PCS_IREQ_WRAID	= 17,	/* compound raid6 write request */
 	PCS_IREQ_RRAID	= 18,	/* compound raid6 read request */
+	PCS_IREQ_GETMAP = 19,   /* get mapping for kdirect mode */
+	PCS_IREQ_TOKEN  = 20,   /* dummy token to allocate congestion window */
 	PCS_IREQ_KAPI	= 65	/* IO request from kernel API */
 };
 
@@ -86,6 +88,13 @@  struct pcs_int_request
 	*/
 	struct work_struct worker;
 
+	/* The following tok_* fields are sequenced by completion_data.child_lock
+	 * NOTE: cs->lock can be taken under this lock.
+	 */
+	struct list_head	tok_list;
+	u64                     tok_reserved;
+	u64                     tok_serno;
+
 	union {
 		struct {
 			struct pcs_map_entry	*map;
@@ -112,6 +121,12 @@  struct pcs_int_request
 			struct pcs_msg		*msg;
 		} flushreq;
 
+		struct {
+			struct pcs_int_request  *parent;
+			struct list_head	tok_link;
+			int			cs_index;
+		} token;
+
 		struct {
 			u64			offset;
 			int			phase;