[RFC,1/2] fs/fuse kio: implement internal cs connection

Submitted by Pavel Butsykin on July 8, 2019, 2:57 p.m.

Details

Message ID 20190708145725.19937-1-pbutsykin@virtuozzo.com
State New
Series "Series without cover letter"
Headers show

Commit Message

Pavel Butsykin July 8, 2019, 2:57 p.m.
This patch transfers the creation of sock connection to CS from user-space
to kernel, in order to implement RDMA in Fast-Path. Together with the creation
of connection, this patch transfers the 'auth digest' handshake, which is
necessary to identify the client.

In contrast to usermode, the hendshake in this patch is implemented
synchronously, which made it possible to reduce and simplify the code.

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 fs/fuse/Makefile                   |   3 +-
 fs/fuse/kio/pcs/pcs_cluster.c      |   9 +-
 fs/fuse/kio/pcs/pcs_cluster.h      |   4 +-
 fs/fuse/kio/pcs/pcs_cluster_core.c |   7 +-
 fs/fuse/kio/pcs/pcs_cs.c           |  47 ++-
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |   2 +-
 fs/fuse/kio/pcs/pcs_ioctl.h        |   3 +-
 fs/fuse/kio/pcs/pcs_req.h          |   3 +
 fs/fuse/kio/pcs/pcs_rpc.c          |  12 +-
 fs/fuse/kio/pcs/pcs_rpc.h          |  18 +-
 fs/fuse/kio/pcs/pcs_rpc_prot.h     |  21 ++
 fs/fuse/kio/pcs/pcs_sock_conn.c    | 657 +++++++++++++++++++++++++++++++++++++
 fs/fuse/kio/pcs/pcs_sock_conn.h    |   7 +
 fs/fuse/kio/pcs/pcs_sock_io.c      |  47 ++-
 fs/fuse/kio/pcs/pcs_sock_io.h      |   9 +
 fs/fuse/kio/pcs/pcs_types.h        |   1 +
 16 files changed, 809 insertions(+), 41 deletions(-)
 create mode 100644 fs/fuse/kio/pcs/pcs_sock_conn.c
 create mode 100644 fs/fuse/kio/pcs/pcs_sock_conn.h

Patch hide | download patch | download mbox

diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
index 87f655c596e6..26b5ef95bb3d 100644
--- a/fs/fuse/Makefile
+++ b/fs/fuse/Makefile
@@ -25,6 +25,7 @@  fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
 	kio/pcs/pcs_cluster_core.o \
 	kio/pcs/pcs_cs.o \
 	kio/pcs/fuse_io.o \
-	kio/pcs/fuse_stat.o
+	kio/pcs/fuse_stat.o \
+	kio/pcs/pcs_sock_conn.o
 
 fuse-objs := dev.o dir.o file.o inode.o control.o
diff --git a/fs/fuse/kio/pcs/pcs_cluster.c b/fs/fuse/kio/pcs/pcs_cluster.c
index 082acea9affa..59aee8ad5394 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.c
+++ b/fs/fuse/kio/pcs/pcs_cluster.c
@@ -590,19 +590,18 @@  static int ireq_check_redo_(struct pcs_int_request *ireq)
 }
 
 int pcs_cluster_init(struct pcs_fuse_cluster *pfc, struct workqueue_struct *wq,
-		     struct fuse_conn *fc, PCS_CLUSTER_ID_T *cl_id,
-		     PCS_NODE_ID_T *id)
+		     struct fuse_conn *fc, struct pcs_ioc_init_kdirect *info)
 {
 	struct pcs_cluster_core_attr attr;
 
-	attr.cluster = *cl_id;
-	attr.node = *id;
+	attr.cluster = info->cluster_id;
+	attr.node = info->node_id;
 	attr.abort_timeout_ms = 0;
 
 	pfc->fc = fc;
 
 	/* core init */
-	if (pcs_cc_init(&pfc->cc, wq, NULL, &attr))
+	if (pcs_cc_init(&pfc->cc, wq, info->cluster_name, &attr))
 		return -1;
 	pfc->cc.fc = fc;
 	pfc->cc.op.ireq_process	   = ireq_process_;
diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index 8369f9b00cfa..ba41271c78f2 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -2,6 +2,7 @@ 
 #define _PCS_CLUSTER_H_ 1
 
 #include "pcs_req.h"
+#include "pcs_ioctl.h"
 #include "../../fuse_i.h"
 struct fuse_conn;
 
@@ -54,8 +55,7 @@  struct pcs_fuse_work {
 extern struct workqueue_struct *pcs_cleanup_wq;
 
 int pcs_cluster_init(struct pcs_fuse_cluster *c, struct workqueue_struct *,
-		     struct fuse_conn *fc, PCS_CLUSTER_ID_T *cl_id,
-		     PCS_NODE_ID_T *id);
+		     struct fuse_conn *fc, struct pcs_ioc_init_kdirect *info);
 void pcs_cluster_fini(struct pcs_fuse_cluster *c);
 
 extern void fiemap_work_func(struct work_struct *w);
diff --git a/fs/fuse/kio/pcs/pcs_cluster_core.c b/fs/fuse/kio/pcs/pcs_cluster_core.c
index 010d6588c99b..6077d58a50b0 100644
--- a/fs/fuse/kio/pcs/pcs_cluster_core.c
+++ b/fs/fuse/kio/pcs/pcs_cluster_core.c
@@ -120,9 +120,9 @@  int pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq,
 		const char *cluster_name, struct pcs_cluster_core_attr *attr)
 {
 	int err;
-	/* Ignore this for now, i have cluter_id and node_id*/
-	/* if (cluster_name == NULL) */
-	/*	   return -1; */
+
+	if (!cluster_name)
+		   return -EINVAL;
 
 	spin_lock_init(&cc->lock);
 	INIT_LIST_HEAD(&cc->work_queue);
@@ -131,6 +131,7 @@  int pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq,
 	INIT_WORK(&cc->completion_job, cc_completion_handler);
 	INIT_WORK(&cc->fiemap_work, fiemap_work_func);
 	cc->wq = wq;
+	snprintf(cc->cluster_name, sizeof(cc->cluster_name), "%s", cluster_name);
 
 	pcs_csset_init(&cc->css);
 
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index c6ff456c59c2..8150b3390691 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -13,6 +13,7 @@ 
 #include "pcs_cs.h"
 #include "pcs_cs_prot.h"
 #include "pcs_cluster.h"
+#include "pcs_sock_conn.h"
 #include "pcs_ioctl.h"
 #include "log.h"
 #include "fuse_ktrace.h"
@@ -34,7 +35,8 @@  static void cs_aborting(struct pcs_rpc *ep, int error);
 static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
 static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg);
 static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
-static void cs_connect(struct pcs_rpc *ep);
+static void cs_user_connect(struct pcs_rpc *ep);
+static void cs_kernel_connect(struct pcs_rpc *ep);
 static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose);
 static void pcs_cs_destroy(struct pcs_cs *cs);
 
@@ -43,7 +45,7 @@  struct pcs_rpc_ops cn_rpc_ops = {
 	.get_hdr		= cs_get_hdr,
 	.state_change		= cs_aborting,
 	.keep_waiting		= cs_keep_waiting,
-	.connect		= cs_connect,
+	.connect		= cs_kernel_connect,
 };
 
 static int pcs_cs_percpu_stat_alloc(struct pcs_cs *cs)
@@ -432,7 +434,46 @@  static void cs_get_read_response_iter(struct pcs_msg *msg, int offset, struct io
 	}
 }
 
-static void cs_connect(struct pcs_rpc *ep)
+static void cs_kernel_connect(struct pcs_rpc *ep)
+{
+	if (ep->flags & PCS_RPC_F_LOCAL) {
+		char path[128];
+
+		snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" CLUSTER_ID_FMT,
+			(unsigned long long)ep->peer_id.val, CLUSTER_ID_ARGS(ep->eng->cluster_id));
+
+		if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 0)->sun_path)) {
+			TRACE("Path to local socket is too long: %s", path);
+
+			ep->flags &= ~PCS_RPC_F_LOCAL;
+			goto fail;
+		}
+		memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un));
+		ep->sh.sun.sun_family = AF_UNIX;
+		ep->sh.sa_len = sizeof(struct sockaddr_un);
+		strcpy(ep->sh.sun.sun_path, path);
+	} else {
+		/* TODO: print sock addr using pcs_format_netaddr() */
+		if (ep->addr.type != PCS_ADDRTYPE_RDMA) {
+			if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
+				TRACE("netaddr to sockaddr failed");
+				goto fail;
+			}
+		} else {
+			WARN_ON_ONCE(1);
+			/* TODO: rdma connect init */
+			goto fail;
+		}
+	}
+	ep->state = PCS_RPC_CONNECT;
+	pcs_sockconnect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
+	return;
+fail:
+	pcs_rpc_reset(ep);
+	return;
+}
+
+__maybe_unused static void cs_user_connect(struct pcs_rpc *ep)
 {
 	struct pcs_cluster_core *cc = cc_from_rpc(ep->eng);
 	struct pcs_fuse_cluster *pfc = pcs_cluster_from_cc(cc);
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index f9470d31c8a9..07aa683f2245 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -101,7 +101,7 @@  static void process_pcs_init_reply(struct fuse_conn *fc, struct fuse_req *req)
 		goto out;
 	}
 
-	if (pcs_cluster_init(pfc, pcs_wq, fc, &info->cluster_id, &info->node_id)) {
+	if (pcs_cluster_init(pfc, pcs_wq, fc, info)) {
 		fc->conn_error = 1;
 		kvfree(pfc);
 		goto out;
diff --git a/fs/fuse/kio/pcs/pcs_ioctl.h b/fs/fuse/kio/pcs/pcs_ioctl.h
index 31d5e92cdd93..b86d670466cc 100644
--- a/fs/fuse/kio/pcs/pcs_ioctl.h
+++ b/fs/fuse/kio/pcs/pcs_ioctl.h
@@ -10,7 +10,7 @@ 
 #include "pcs_map.h"
 #include "pcs_rpc.h"
 
-#define PCS_FAST_PATH_VERSION ((PCS_FAST_PATH_VERSION_T){{1, 2}})
+#define PCS_FAST_PATH_VERSION ((PCS_FAST_PATH_VERSION_T){{1, 3}})
 
 #define PCS_FUSE_INO_SPECIAL_ ((unsigned long long)-0x1000)
 
@@ -38,6 +38,7 @@  struct pcs_ioc_init_kdirect
 	PCS_NODE_ID_T node_id;
 	PCS_CLUSTER_ID_T cluster_id;
 	PCS_FAST_PATH_VERSION_T version;
+	char cluster_name[NAME_MAX];
 };
 
 struct pcs_ioc_fileinfo
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 3f26aa3489ff..a614da116c3c 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -236,6 +236,9 @@  struct pcs_cluster_core
 	int (*abort_callback)(struct pcs_cluster_core *cc, struct pcs_int_request *ireq);
 	struct fuse_conn *fc;
 	spinlock_t		lock;
+
+	char cluster_name[NAME_MAX];
+
 };
 
 static inline struct pcs_cluster_core *cc_from_csset(struct pcs_cs_set * css)
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index 66d090988552..71bc7e0a806d 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -245,6 +245,7 @@  out:
 /* Client close. */
 void pcs_rpc_close(struct pcs_rpc * ep)
 {
+	TRACE("pcs_rpc_close");
 	mutex_lock(&ep->mutex);
 	BUG_ON(ep->flags & PCS_RPC_F_DEAD);
 	BUG_ON(ep->flags & PCS_RPC_F_PASSIVE);
@@ -356,7 +357,7 @@  void __pcs_rpc_put(struct pcs_rpc *ep)
 		queue_work(pcs_cleanup_wq, &rpc_cleanup_work);
 }
 
-static void rpc_eof_cb(struct pcs_sockio * sio)
+void rpc_eof_cb(struct pcs_sockio *sio)
 {
 	struct pcs_rpc * ep = sio->parent;
 
@@ -410,7 +411,7 @@  void pcs_rpc_error_respond(struct pcs_rpc * ep, struct pcs_msg * msg, int err)
 /* After client gets csconn_complete() callback, he makes some actions and completes switch
  * to WORK state calling this function.
  */
-static void pcs_rpc_enable(struct pcs_rpc * ep, int error)
+void pcs_rpc_enable(struct pcs_rpc * ep, int error)
 {
 	struct pcs_cluster_core *cc = cc_from_rpc(ep->eng);
 
@@ -433,6 +434,7 @@  static void pcs_rpc_enable(struct pcs_rpc * ep, int error)
 	}
 	TRACE("ep(%p)->state: WORK\n", ep);
 	ep->state = PCS_RPC_WORK;
+	ep->retries = 0;
 	queue_work(cc->wq, &ep->work);
 }
 
@@ -604,13 +606,11 @@  found:
 	return msg;
 }
 
-
 /* Start connect. It is triggered by a message sent to this peer or can be called
  * explicitly, if caller needs to steal csconn from userspace
  */
-void pcs_rpc_connect(struct pcs_rpc * ep)
+void pcs_rpc_connect(struct pcs_rpc *ep)
 {
-
 	/* Nothing to do, connect is already initiated or in holddown state */
 	if (ep->state != PCS_RPC_UNCONN)
 		return;
@@ -1406,7 +1406,6 @@  void rpc_connect_done(struct pcs_rpc *ep, struct socket *sock)
 
 	TRACE(PEER_FMT " ->state:%d sock:%p\n", PEER_ARGS(ep), ep->state, sock);
 	cancel_delayed_work(&ep->timer_work);
-	ep->retries++;
 
 	if (ep->state != PCS_RPC_CONNECT) {
 		FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Invalid state: %u", ep->state);
@@ -1423,7 +1422,6 @@  void rpc_connect_done(struct pcs_rpc *ep, struct socket *sock)
 	sio->get_msg = rpc_get_hdr;
 	sio->eof = rpc_eof_cb;
 	//pcs_ioconn_register(ep->conn);
-	ep->retries = 0;
 	if (ep->gc)
 		list_lru_add(&ep->gc->lru, &ep->lru_link);
 
diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
index 119de48d4376..2699657fac14 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.h
+++ b/fs/fuse/kio/pcs/pcs_rpc.h
@@ -7,6 +7,10 @@ 
 
 struct pcs_msg;
 
+#define PCS_PRODUCT_NAME "vstorage"
+#define PCS_DEV_SHM	 "/dev/shm"
+#define PCS_SHM_DIR	 PCS_DEV_SHM"/"PCS_PRODUCT_NAME
+
 #define PCS_RPC_HASH_SIZE	1024
 
 enum
@@ -102,7 +106,16 @@  struct pcs_rpc
 #if 0
 	struct sockaddr_un *	sun;
 #endif
-	struct pcs_ioconn *	conn;		/* Active connection for the peer */
+	struct pcs_ioconn *conn;		/* Active connection for the peer */
+	struct {
+		int sa_len;
+		union {
+			struct sockaddr     sa;
+			struct sockaddr_in  saddr4;
+			struct sockaddr_in6 saddr6;
+			struct sockaddr_un  sun;
+		};
+	} sh;
 
 	struct pcs_rpc_ops *	ops;
 
@@ -272,6 +285,7 @@  void pcs_rpc_init_response(struct pcs_msg * msg, struct pcs_rpc_hdr * req_hdr, i
 
 /* Allocate message and initialize header */
 struct pcs_msg * pcs_rpc_alloc_msg_w_hdr(int type, int size);
+struct pcs_msg *rpc_get_hdr(struct pcs_sockio * sio, u32 *msg_size);
 
 void pcs_rpc_set_memlimits(struct pcs_rpc_engine * eng, u64 thresh, u64 limit);
 void pcs_rpc_account_adjust(struct pcs_msg * msg, int adjustment);
@@ -286,6 +300,8 @@  void rpc_trace_health(struct pcs_rpc * ep);
 void pcs_rpc_enumerate_rpc(struct pcs_rpc_engine *eng, void (*cb)(struct pcs_rpc *ep, void *arg), void *arg);
 void pcs_rpc_set_sock(struct pcs_rpc *ep, struct pcs_sockio * sio);
 void rpc_connect_done(struct pcs_rpc *ep, struct socket *sock);
+void pcs_rpc_enable(struct pcs_rpc * ep, int error);
+void rpc_eof_cb(struct pcs_sockio *sio);
 
 static inline struct pcs_rpc *pcs_rpc_from_work(struct work_struct *wr)
 {
diff --git a/fs/fuse/kio/pcs/pcs_rpc_prot.h b/fs/fuse/kio/pcs/pcs_rpc_prot.h
index e32e7912943e..7dd33012dbe4 100644
--- a/fs/fuse/kio/pcs/pcs_rpc_prot.h
+++ b/fs/fuse/kio/pcs/pcs_rpc_prot.h
@@ -62,6 +62,27 @@  struct pcs_rpc_error_resp
 /* Application specific payload types */
 #define PCS_RPC_APP_PAYLOAD_BASE	512
 
+/* Auth req/resp flags */
+enum
+{
+	PCS_RPC_AUTH_F_VOID_SENDER	= 1,		/* sender_id is undefined */
+	PCS_RPC_AUTH_F_VOID_RECIPIENT	= 2,		/* recipient_id is undefined */
+	PCS_RPC_AUTH_F_ACQ_SENDER	= 4,		/* sender_id requests for id, recipient must be MDS
+							 * NOTE: it is not clear, should be initializatiton
+							 * part of auth phase. F.e. CS register could be
+							 * with auth phase.
+							 */
+	PCS_RPC_AUTH_F_BOOTSTRAP	= 8,		/* Not a real MDS (f.e. no quorum), can only return
+							 * advices about another MDSs location
+							 */
+	PCS_RPC_AUTH_F_SLAVE		= 16,		/* MDS is authentic, but will not accept requests changing
+							 * state.
+							 */
+	PCS_RPC_AUTH_F_VOID_CLUSTERID	= 32,		/* cluster_id is undefined */
+
+	PCS_RPC_AUTH_F_AUTHORISED	= 64,		/* client was successfully authorised on MDS */
+};
+
 /* Node role */
 enum
 {
diff --git a/fs/fuse/kio/pcs/pcs_sock_conn.c b/fs/fuse/kio/pcs/pcs_sock_conn.c
new file mode 100644
index 000000000000..c924ed439a27
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_sock_conn.c
@@ -0,0 +1,657 @@ 
+#include <net/sock.h>
+#include <linux/module.h>
+#include <linux/types.h>
+#include <linux/tcp.h>
+
+#include <crypto/hash.h>
+#include <crypto/md5.h>
+
+#include "pcs_types.h"
+#include "pcs_sock_io.h"
+#include "pcs_rpc.h"
+#include "pcs_cluster.h"
+#include "log.h"
+#include "fuse_ktrace.h"
+
+#define PCS_CFG_DIR "/etc/vstorage"
+#define AUTH_DIGEST_NAME "digest"
+#define AUTH_DIGEST_NAME_LEN (sizeof(AUTH_DIGEST_NAME) - 1)
+#define PCS_KEYPATH_FMT (PCS_CFG_DIR"/clusters/%s/auth_digest.key")
+#define LOCK_FILE_PATH_FMT (PCS_CFG_DIR"/clusters/%s/.digest_auth.lock")
+
+enum
+{	/* basic states of auth handshake required for establish SSL connection,
+	 * other authentication protocols can require more states
+	 */
+	PCS_AUTH_INITIAL = 0,
+	PCS_AUTH_SEND_HELLO ,	/* Client sends hello at connect */
+	PCS_AUTH_SEND_SRV_CERT,		/* Server sends its cert. to client */
+	PCS_AUTH_SEND_CN_CERT,		/* Client sends its cert. to server */
+	PCS_AUTH_SRV_ACCEPT,		/* Server accept client's cert. */
+};
+
+#define DIGEST_AUTH_ID_LEN      32
+#define DIGEST_SALT_LEN         16
+#define DIGEST_KEY_LEN          32
+
+__pre_packed struct digest_hello_msg {
+	unsigned char md5_cn[MD5_DIGEST_SIZE];
+} __packed;
+
+__pre_packed struct digest_srv_salt_msg {
+	unsigned char salt[DIGEST_SALT_LEN];
+	unsigned char id[DIGEST_AUTH_ID_LEN];
+} __packed;
+
+#define HMAC_SHA512_HSIZE 64U
+
+__pre_packed struct digest_hmac_msg {
+	unsigned int size;
+	unsigned char data[HMAC_SHA512_HSIZE];
+} __packed;
+
+__pre_packed struct digest_msg {
+	unsigned char id[DIGEST_AUTH_ID_LEN];
+	struct digest_hmac_msg hmac;
+} __packed;
+
+static int pcs_generate_hmac(u8 *key, size_t key_sz, u8 *in, size_t in_sz,
+			     u8 *out, u32 *out_sz)
+{
+	struct crypto_shash *hmacalg;
+	struct shash_desc *shash;
+	int ret;
+
+	hmacalg = crypto_alloc_shash("hmac(sha1)", 0, 0);
+	if (IS_ERR(hmacalg)) {
+		TRACE("hmacalg: could not allocate crypto %ld", PTR_ERR(hmacalg));
+		return PTR_ERR(hmacalg);
+	}
+
+	ret = crypto_shash_setkey(hmacalg, key, key_sz);
+	if (ret) {
+		TRACE("crypto_shash_setkey failed: err %d", ret);
+		goto fail1;
+	}
+
+	shash = kzalloc(sizeof(*shash) + crypto_shash_descsize(hmacalg),
+			GFP_KERNEL);
+	if (!shash) {
+		ret = -ENOMEM;
+		goto fail1;
+	}
+
+	shash->tfm = hmacalg;
+	shash->flags = CRYPTO_TFM_REQ_MAY_SLEEP;
+
+	ret = crypto_shash_digest(shash, in, in_sz, out);
+	if (ret)
+		TRACE("crypto_shash_digest failed: %d", ret);
+
+	*out_sz = crypto_shash_alg(shash->tfm)->digestsize;
+	kfree(shash);
+fail1:
+	crypto_free_shash(hmacalg);
+	return ret;
+}
+
+static int pcs_validate_hmac(struct digest_msg *digest, u8 *key, size_t key_sz,
+			     u8 *data, u32 data_sz)
+{
+	u8 hmac[HMAC_SHA512_HSIZE];
+	int err;
+
+	err = pcs_generate_hmac(key, key_sz, digest->id, sizeof(digest->id),
+				hmac, &data_sz);
+	if (err)
+		return err;
+
+	return !memcmp(hmac, data, min(data_sz, HMAC_SHA512_HSIZE));
+}
+
+static int pcs_md5_hash(char *result, char *data, size_t len)
+{
+	struct shash_desc *desc;
+	int err;
+
+	desc = kmalloc(sizeof(*desc), GFP_KERNEL);
+	if (!desc)
+		return -ENOMEM;
+
+	desc->tfm = crypto_alloc_shash("md5", 0, CRYPTO_ALG_ASYNC);
+	if(IS_ERR(desc->tfm)) {
+		err = PTR_ERR(desc->tfm);
+		goto fail1;
+	}
+
+	err = crypto_shash_init(desc);
+	if (err)
+		goto fail2;
+	err = crypto_shash_update(desc, data, len);
+	if (err)
+		goto fail2;
+	err = crypto_shash_final(desc, result);
+fail2:
+	crypto_free_shash(desc->tfm);
+fail1:
+	kfree(desc);
+
+	return err;
+}
+
+static struct file *lock_key_file(char *cluster_name)
+{
+	char lockfile[sizeof(LOCK_FILE_PATH_FMT) + NAME_MAX];
+	struct file_lock *lock;
+	struct file *f;
+	int err;
+
+	snprintf(lockfile, sizeof(lockfile) - 1, LOCK_FILE_PATH_FMT,
+		 cluster_name);
+	f = filp_open(lockfile, O_CREAT | O_RDONLY | O_CLOEXEC,
+		      S_IRUSR | S_IRGRP | S_IROTH);
+	if (IS_ERR(f))
+		return f;
+
+	lock = locks_alloc_lock(1);
+	if (!lock) {
+		filp_close(f, NULL);
+		return ERR_PTR(-ENOMEM);
+	}
+	lock->fl_file = f;
+	lock->fl_pid = current->tgid;
+	lock->fl_flags = FL_FLOCK;
+	lock->fl_type = F_WRLCK;
+	lock->fl_end = OFFSET_MAX;
+
+	err = locks_lock_file_wait(f, lock);
+	if (err < 0) {
+		filp_close(f, NULL);
+		return ERR_PTR(err);
+	}
+	return f;
+}
+
+static int pcs_load_keyfile_auth(char *cluster_name, u8 *key_out, u32 len)
+{
+	char keyfile[sizeof(PCS_KEYPATH_FMT) + NAME_MAX];
+	struct file *f, *flock;
+	u64 offs = 0;
+	int err;
+
+	flock = lock_key_file(cluster_name);
+	if (IS_ERR(flock)) {
+		TRACE("Lock keyfile failed: %ld", PTR_ERR(flock));
+		return PTR_ERR(flock);
+	}
+
+	snprintf(keyfile, sizeof(keyfile) - 1, PCS_KEYPATH_FMT, cluster_name);
+
+	f = filp_open(keyfile, O_RDONLY, 0);
+	if (IS_ERR(f)) {
+		err = PTR_ERR(f);
+		TRACE("Can't open keyfile auth: %s, err: %d", keyfile, err);
+		goto out;
+	}
+
+	err = vfs_read(f, key_out, len, &offs);
+	if (err < 0) {
+		TRACE("Can't read keyfile: %s, err: %d", keyfile, err);
+	} else if (err != len)
+		TRACE("Can't read full key(req: %d, read: %d)", len, err);
+	filp_close(f, NULL);
+out:
+	filp_close(flock, NULL);
+
+	return err < 0 ? err : 0;
+}
+
+static inline void pcs_sock_keepalive(struct socket *sock)
+{
+	int val;
+
+	val = 1;
+	kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+			  (char *)&val, sizeof(val));
+	val = 60;
+	kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE,
+			  (char *)&val, sizeof(val));
+	val = 5;
+	kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
+			  (char *)&val, sizeof(val));
+	val = 5;
+	kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL,
+			  (char *)&val, sizeof(val));
+}
+
+static inline int pcs_sock_cork(struct socket *sock)
+{
+	int val = 1;
+	if (kernel_setsockopt(sock, SOL_TCP, TCP_CORK, (char *)&val,
+			      sizeof(val)) == 0)
+		return 0;
+	return -1;
+}
+
+static inline void pcs_sock_nodelay(struct socket *sock)
+{
+	int val = 1;
+	kernel_setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&val,
+			  sizeof(val));
+}
+
+static int send_buf(struct socket *sock, u8 *buf, size_t size)
+{
+	struct msghdr msg = {
+		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL,
+	};
+	struct kvec iov = {
+		.iov_base = buf,
+		.iov_len = size,
+	};
+	int ret = kernel_sendmsg(sock, &msg, &iov, 1, size);
+	return ret < 0 ? ret : 0;
+}
+
+static int recv_buf(struct socket *sock, u8 *buf, size_t size)
+{
+	struct msghdr msg = {
+		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL,
+	};
+	struct kvec iov = {
+		.iov_base = buf,
+		.iov_len = size,
+	};
+	int ret = kernel_recvmsg(sock, &msg, &iov, 1, size,
+				 MSG_WAITALL | MSG_NOSIGNAL);
+	if (ret < 0)
+		return ret;
+	return ret != size ? -EPROTO : 0;
+}
+
+#define __str_len(s) (ARRAY_SIZE(s) - sizeof(*(s)))
+
+/* Multiple payloads are supported. They are expected to have fixed alignment in between. */
+#define PCS_RPC_AUTH_PAYLOAD_ALIGN 8
+
+#define PCS_BUILD_VERSION "unknown"
+#define MAX_BUILD_VERSION_LENGTH 30
+
+static struct {
+	struct pcs_rpc_payload p;
+	char build_version[MAX_BUILD_VERSION_LENGTH+1];
+} s_version_data = {
+	{
+		.len = __str_len(PCS_BUILD_VERSION),
+		.type = PCS_RPC_BUILD_VERSION_PAYLOAD,
+	},
+	.build_version = PCS_BUILD_VERSION
+};
+
+static inline unsigned rpc_auth_payload_size(struct pcs_rpc_payload const* p) {
+	return sizeof(*p) + p->len;
+}
+
+static inline unsigned rpc_auth_payload_size_aligned(struct pcs_rpc_payload const* p) {
+	return round_up(rpc_auth_payload_size(p), PCS_RPC_AUTH_PAYLOAD_ALIGN);
+}
+
+static inline struct pcs_rpc_payload* rpc_auth_payload_next(struct pcs_rpc_payload* p) {
+	return (void*)p + rpc_auth_payload_size_aligned(p);
+}
+
+#define PCS_RPC_DIGEST_PAYLOAD 13
+
+struct pcs_rpc_auth
+{
+	struct pcs_rpc_hdr	hdr;
+
+	PCS_CLUSTER_ID_T	cluster_id;		/* Cluster identity */
+	PCS_NODE_ID_T		sender_id;		/* Identity of sender */
+	PCS_NODE_ID_T		recipient_id;		/* Expected identity of recipient */
+	u8			sender_role;		/* Role of sender (TEST/CN/CS/MDS) */
+	u8			recipient_role;		/* Expected role of recipient */
+	u8			flags;			/* Flags */
+	u8			state;			/* state of auth handshake */
+	u32			version;		/* Protocol version */
+	struct pcs_host_info	host;
+	u32			reserved[3];
+	u32			npayloads;
+	struct pcs_rpc_payload	payload;
+} __attribute__((aligned(8)));
+
+
+#define PCS_RPC_AUTH_REQ	8
+#define PCS_RPC_AUTH_RESP	(PCS_RPC_AUTH_REQ | PCS_RPC_DIRECTION)
+
+static int send_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
+{
+	struct pcs_rpc_engine *eng = ep->eng;
+	struct pcs_rpc_auth *au;
+	size_t msg_sz = sizeof(struct pcs_rpc_auth) +
+			round_up(size, PCS_RPC_AUTH_PAYLOAD_ALIGN) +
+			rpc_auth_payload_size_aligned(&s_version_data.p);
+	struct pcs_msg *msg;
+	int err;
+
+	msg = pcs_rpc_alloc_output_msg(msg_sz);
+	if (!msg) {
+		TRACE("Can't alloc auth msg");
+		return -ENOMEM;
+	}
+
+	au = (struct pcs_rpc_auth *)msg->_inline_buffer;
+	*au = (struct pcs_rpc_auth) {
+		.hdr.type = PCS_RPC_AUTH_REQ,
+		.hdr.len = msg_sz,
+		.cluster_id = eng->cluster_id,
+		.sender_id = eng->local_id,
+		.recipient_id = ep->peer_id,
+		.recipient_role = ep->peer_role,
+		.version = PCS_VERSION_CURRENT,
+		.state = state,
+		.host = eng->my_host,
+		.npayloads = 2,
+	};
+	pcs_rpc_get_new_xid(eng, &au->hdr.xid);
+
+	if (size) {
+		au->payload.type = PCS_RPC_DIGEST_PAYLOAD;
+		au->payload.len = size;
+		memcpy(au + 1, data, size);
+	}
+	memcpy(rpc_auth_payload_next(&au->payload), &s_version_data,
+	       rpc_auth_payload_size(&s_version_data.p));
+
+	if (!(ep->flags & PCS_RPC_F_PEER_ID))
+		au->flags |= PCS_RPC_AUTH_F_VOID_RECIPIENT;
+	if (!(eng->flags & PCS_KNOWN_MYID)) {
+		au->flags |= PCS_RPC_AUTH_F_VOID_SENDER;
+		if (ep->flags & PCS_RPC_F_ACQ_ID)
+			au->flags |= PCS_RPC_AUTH_F_ACQ_SENDER;
+	}
+
+	if (!(eng->flags & PCS_KNOWN_CLUSTERID))
+		au->flags |= PCS_RPC_AUTH_F_VOID_CLUSTERID;
+
+	TRACE("state=%d, type=%d, len=%d, msg_sz: %lu",
+		au->state, au->payload.type, au->payload.len, msg_sz);
+
+	err = send_buf(ep->conn->socket, (u8*)au, msg_sz);
+	if (err)
+		TRACE("Can't send au msg, err: %d", err);
+	pcs_free_msg(msg);
+
+	return err;
+}
+
+static int recv_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
+{
+	struct pcs_rpc_auth *au;
+	size_t msg_sz = sizeof(struct pcs_rpc_auth) +
+			round_up(size, PCS_RPC_AUTH_PAYLOAD_ALIGN) +
+			rpc_auth_payload_size_aligned(&s_version_data.p);
+	struct pcs_msg *msg;
+	int err;
+
+	msg = pcs_rpc_alloc_output_msg(msg_sz);
+	if (!msg) {
+		TRACE("Can't alloc auth msg");
+		return -ENOMEM;
+	}
+
+	err = recv_buf(ep->conn->socket, msg->_inline_buffer, msg_sz);
+	if (err) {
+	    TRACE("Can't recv auth msg(%d), err: %lu", err, msg_sz);
+	    goto fail;
+	}
+	au = (struct pcs_rpc_auth *)msg->_inline_buffer;
+
+	/* Fatal stream format error */
+	if (au->hdr.len < sizeof(au->hdr) || au->hdr.len > ep->params.max_msg_size) {
+		TRACE("Bad message header %u %u\n", au->hdr.len, au->hdr.type);
+		err = -EPROTO;
+		goto fail;
+	}
+	WARN_ON_ONCE(au->hdr.type != PCS_RPC_AUTH_RESP &&
+		     au->hdr.type != PCS_RPC_ERROR_RESP);
+
+	TRACE("state=%d, payloads:=%u, type=%d, len=%d", au->state,
+	      au->npayloads, au->payload.type, au->payload.len);
+	if (au->state != state) {
+		TRACE("Unexpected state %d, should be %d", au->state, state);
+		err = -EPROTO;
+		goto fail;
+	}
+	if (au->flags & PCS_RPC_AUTH_F_VOID_CLUSTERID)
+		TRACE("Wrong: auth void cluster");
+
+	WARN_ON_ONCE(au->npayloads != 2);
+	if (au->payload.len != size) {
+		TRACE("Wrong auth payload %u %u, data_sz: %lu\n",
+			au->payload.len, au->payload.type, size);
+		err = -EPROTO;
+		goto fail;
+	}
+	WARN_ON_ONCE(au->payload.type != PCS_RPC_DIGEST_PAYLOAD);
+	memcpy(data, &au->payload + 1, size);
+
+fail:
+	pcs_free_msg(msg);
+	return err;
+}
+
+static int pcs_do_auth_digest(struct pcs_rpc *ep)
+{
+	struct {
+		u8 key[DIGEST_KEY_LEN];
+		u8 salt[DIGEST_SALT_LEN];
+	} auth_cfg;
+	struct digest_hello_msg hi;
+	struct digest_srv_salt_msg slt;
+	struct digest_msg digest;
+	struct digest_hmac_msg hmac;
+	char *cluster_name = cc_from_rpc(ep->eng)->cluster_name;
+	int err;
+
+	err = pcs_load_keyfile_auth(cluster_name, (u8*)&auth_cfg, sizeof(auth_cfg));
+	if (err)
+		return err;
+
+	err = pcs_md5_hash(hi.md5_cn, cluster_name, strlen(cluster_name));
+	if (err) {
+		TRACE("Can't calculate md5 from cluster name, err: %d", err);
+		return err;
+	}
+
+	err = send_auth_msg(ep, &hi, sizeof(hi), PCS_AUTH_SEND_HELLO);
+	if (err) {
+		TRACE("Can't send hello auth msg, err: %d", err);
+		return err;
+	}
+
+	err = recv_auth_msg(ep, &slt, sizeof(slt), PCS_AUTH_SEND_SRV_CERT);
+	if (err) {
+		TRACE("Can't receive salt auth msg, err: %d", err);
+		return err;
+	}
+
+	if (memcmp(slt.salt, auth_cfg.salt, sizeof(auth_cfg.salt))) {
+		TRACE("Server use different salt");
+		return -EPROTO;
+	}
+
+	get_random_bytes(digest.id, sizeof(digest.id));
+	digest.hmac.size = sizeof(digest.hmac.data);
+	err = pcs_generate_hmac(auth_cfg.key, sizeof(auth_cfg.key), slt.id,
+				sizeof(slt.id), digest.hmac.data,
+				&digest.hmac.size);
+	if (err) {
+		TRACE("HMAC generate fail %d", err);
+		return err;
+	}
+
+	err = send_auth_msg(ep, &digest, sizeof(digest), PCS_AUTH_SEND_CN_CERT);
+	if (err) {
+		TRACE("Can't send digest msg, err: %d", err);
+		return err;
+	}
+
+	err = recv_auth_msg(ep, &hmac, sizeof(hmac), PCS_AUTH_SRV_ACCEPT);
+	if (err) {
+		TRACE("Can't receive hmac auth msg, err: %d", err);
+		return err;
+	}
+
+	if (!pcs_validate_hmac(&digest, auth_cfg.key, sizeof(auth_cfg.key),
+			       hmac.data, hmac.size)) {
+		TRACE("Received bad digest");
+		return -EPROTO;
+	}
+
+	err = send_auth_msg(ep, NULL, 0, PCS_AUTH_SRV_ACCEPT + 1);
+	if (err)
+		TRACE("Can't send auth srv accept msg, err: %d", err);
+
+	return err;
+}
+
+enum {
+	PCS_AUTH_DIGEST = 0,
+};
+
+static int rpc_client_start_auth(struct pcs_rpc *ep, int auth_type)
+{
+	switch (auth_type) {
+		case PCS_AUTH_DIGEST:
+			return pcs_do_auth_digest(ep);
+		default:
+			BUG();
+	}
+	return -EOPNOTSUPP;
+}
+
+int pcs_netaddr2sockaddr(PCS_NET_ADDR_T const* addr, struct sockaddr *sa, int *salen)
+{
+	BUG_ON(!sa);
+	if (addr->type == PCS_ADDRTYPE_IP || addr->type == PCS_ADDRTYPE_RDMA) {
+		struct sockaddr_in *saddr4 = (struct sockaddr_in *)sa;
+		*saddr4 = (struct sockaddr_in) {
+			.sin_family = AF_INET,
+			.sin_port = (u16)addr->port,
+		};
+		memcpy(&saddr4->sin_addr, addr->address, sizeof(saddr4->sin_addr));
+		*salen = sizeof(*saddr4);
+	} else if (addr->type == PCS_ADDRTYPE_IP6) {
+		struct sockaddr_in6 *saddr6 = (struct sockaddr_in6 *)sa;
+		*saddr6 = (struct sockaddr_in6) {
+			.sin6_family = AF_INET6,
+			.sin6_port = (u16)addr->port,
+		};
+		memcpy(&saddr6->sin6_addr, addr->address, sizeof(saddr6->sin6_addr));
+		*salen = sizeof(*saddr6);
+	} else
+		return -EINVAL;
+
+	return 0;
+}
+
+void pcs_sockconnect_start(struct pcs_rpc *ep)
+{
+	struct pcs_sockio *sio;
+	struct sockaddr *sa = &ep->sh.sa;
+	struct socket *sock;
+	int err, alloc_max = ep->params.alloc_hdr_size;
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	sio = kzalloc(sizeof(struct pcs_sockio) + alloc_max, GFP_NOIO);
+	if (!sio) {
+		TRACE("Can't allocate sio\n");
+		goto fail;
+	}
+
+	INIT_LIST_HEAD(&sio->write_queue);
+	iov_iter_init_bad(&sio->read_iter);
+	iov_iter_init_bad(&sio->write_iter);
+	sio->hdr_max = sizeof(struct pcs_rpc_hdr);
+	sio->flags = sa->sa_family != AF_UNIX ? PCS_SOCK_F_CORK : 0;
+	INIT_LIST_HEAD(&sio->ioconn.list);
+
+	err = sock_create(sa->sa_family, SOCK_STREAM, 0, &sock);
+	if (err < 0) {
+		TRACE("Can't create socket: %d\n", err);
+		goto fail2;
+	}
+	pcs_clear_error(&sio->error);
+
+	err = sock->ops->connect(sock, sa, ep->sh.sa_len, O_NONBLOCK);
+	if (err != 0 && err != -EINPROGRESS) {
+		TRACE("Failed connection: %d\n", err);
+		sock_release(sock);
+		goto fail2;
+	}
+	pcs_sock_keepalive(sock);
+	if (!pcs_sock_cork(sock))
+		sio->flags |= PCS_SOCK_F_CORK;
+	else
+		pcs_sock_nodelay(sock);
+
+	TRACE(PEER_FMT " ->state:%d sock:%p\n", PEER_ARGS(ep), ep->state, sock);
+	cancel_delayed_work(&ep->timer_work);
+	ep->retries++;
+
+	ep->conn = &sio->ioconn;
+	sio->parent = pcs_rpc_get(ep);
+	sio->get_msg = rpc_get_hdr;
+	sio->eof = rpc_eof_cb;
+	sio->send_timeout = PCS_SIO_TIMEOUT;
+	sio->ioconn.socket = sock;
+	sio->ioconn.destruct = pcs_sock_internal_ioconn_destruct;
+	if (ep->gc)
+		list_lru_add(&ep->gc->lru, &ep->lru_link);
+
+	if (ep->flags & PCS_RPC_F_CLNT_PEER_ID)
+		ep->flags |= PCS_RPC_F_PEER_ID;
+
+	ep->state = PCS_RPC_AUTH;
+	err = rpc_client_start_auth(ep, PCS_AUTH_DIGEST);
+	if (err < 0) {
+		FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR,
+			  "Authorization failed: %d", err);
+		goto fail; /* since ep->conn is initialized,
+			    * sio will be freed in pcs_rpc_reset()
+			    */
+	}
+	write_lock_bh(&sock->sk->sk_callback_lock);
+	/*
+	 * Backup original callbaks.
+	 * TCP and unix sockets do not have sk_user_data set.
+	 * So we avoid taking sk_callback_lock in callbacks,
+	 * since this seems to be able to result in performance.
+	 */
+	WARN_ON_ONCE(sock->sk->sk_user_data);
+	sio->ioconn.orig.user_data = sock->sk->sk_user_data;
+	sio->ioconn.orig.data_ready = sock->sk->sk_data_ready;
+	sio->ioconn.orig.write_space = sock->sk->sk_write_space;
+	sio->ioconn.orig.error_report = sock->sk->sk_error_report;
+
+	sock->sk->sk_sndtimeo = PCS_SIO_TIMEOUT;
+	sock->sk->sk_allocation = GFP_NOFS;
+
+	rcu_assign_sk_user_data(sock->sk, sio);
+	smp_wmb(); /* Pairs with smp_rmb() in callbacks */
+	sock->sk->sk_data_ready = pcs_sk_data_ready;
+	sock->sk->sk_write_space = pcs_sk_write_space;
+	sock->sk->sk_error_report = pcs_sk_error_report;
+	write_unlock_bh(&sock->sk->sk_callback_lock);
+
+	ep->state = PCS_RPC_APPWAIT;
+	pcs_rpc_enable(ep, 0);
+	return;
+fail2:
+	kfree(sio);
+fail:
+	pcs_rpc_reset(ep);
+	return;
+}
diff --git a/fs/fuse/kio/pcs/pcs_sock_conn.h b/fs/fuse/kio/pcs/pcs_sock_conn.h
new file mode 100644
index 000000000000..8b597dc7a74a
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_sock_conn.h
@@ -0,0 +1,7 @@ 
+#ifndef _PCS_SOCK_CONN_H_
+#define _PCS_SOCK_CONN_H_ 1
+
+void pcs_sockconnect_start(struct pcs_rpc *ep);
+int pcs_netaddr2sockaddr(PCS_NET_ADDR_T const* addr, struct sockaddr *sa, int *salen);
+
+#endif /* _PCS_SOCK_CONN_H_ */
\ No newline at end of file
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c
index bae7610af826..007df3b77bdd 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.c
+++ b/fs/fuse/kio/pcs/pcs_sock_io.c
@@ -41,7 +41,6 @@  void sio_push(struct pcs_sockio * sio)
 //// caseA: userspace close socket and wait for kernelspace
 //// caseB: kernelspace want to close socket and have to somehow
 ////	    notify about this to userspace (NEW API REQUIRED)
-static void pcs_restore_sockets(struct pcs_ioconn *ioconn);
 static void pcs_ioconn_unregister(struct pcs_ioconn *ioconn)
 {
 	if (!test_bit(PCS_IOCONN_BF_DEAD, &ioconn->flags))
@@ -472,19 +471,18 @@  int pcs_sock_queuelen(struct pcs_sockio * sio)
 	return sio->write_queue_len;
 }
 
-static void pcs_restore_sockets(struct pcs_ioconn *ioconn)
+void pcs_restore_sockets(struct pcs_ioconn *ioconn)
 {
-
-	struct sock *sk;
-
-	sk = ioconn->socket->sk;
+	struct sock *sk = ioconn->socket->sk;
 
 	write_lock_bh(&sk->sk_callback_lock);
-	rcu_assign_sk_user_data(sk, ioconn->orig.user_data);
-	sk->sk_data_ready =   ioconn->orig.data_ready;
-	sk->sk_write_space =  ioconn->orig.write_space;
-	sk->sk_error_report = ioconn->orig.error_report;
-	//sock->sk->sk_state_change = pcs_state_chage;
+	if (sk->sk_user_data) {
+		rcu_assign_sk_user_data(sk, ioconn->orig.user_data);
+		sk->sk_data_ready =   ioconn->orig.data_ready;
+		sk->sk_write_space =  ioconn->orig.write_space;
+		sk->sk_error_report = ioconn->orig.error_report;
+		//sock->sk->sk_state_change = pcs_state_chage;
+	}
 	write_unlock_bh(&sk->sk_callback_lock);
 
 	sk->sk_sndtimeo = MAX_SCHEDULE_TIMEOUT;
@@ -501,17 +499,22 @@  static void sio_destroy_rcu(struct rcu_head *head)
 	kfree(sio);
 }
 
-void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn)
+static void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn, bool internal)
 {
 	struct pcs_sockio * sio = sio_from_ioconn(ioconn);
 
+	TRACE("Sock destruct_cb, sio: %p, internal: %d", sio, internal);
+
 	BUG_ON(sio->current_msg);
 	BUG_ON(!list_empty(&sio->write_queue));
 	BUG_ON(sio->write_queue_len);
 
 	if (ioconn->socket) {
 		pcs_restore_sockets(ioconn);
-		fput(ioconn->socket->file);
+		if (internal)
+			sock_release(ioconn->socket);
+		else
+			fput(ioconn->socket->file);
 		ioconn->socket = NULL;
 	}
 
@@ -519,6 +522,16 @@  void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn)
 	call_rcu(&sio->rcu, sio_destroy_rcu);
 }
 
+void pcs_sock_internal_ioconn_destruct(struct pcs_ioconn *ioconn)
+{
+	pcs_sock_ioconn_destruct(ioconn, true);
+}
+
+void pcs_sock_external_ioconn_destruct(struct pcs_ioconn *ioconn)
+{
+	pcs_sock_ioconn_destruct(ioconn, false);
+}
+
 static void pcs_sk_kick_queue(struct sock *sk)
 {
 	struct pcs_sockio *sio;
@@ -535,17 +548,17 @@  static void pcs_sk_kick_queue(struct sock *sk)
 	rcu_read_unlock();
 }
 
-static void pcs_sk_data_ready(struct sock *sk, int count)
+void pcs_sk_data_ready(struct sock *sk, int count)
 {
 	pcs_sk_kick_queue(sk);
 }
-static void pcs_sk_write_space(struct sock *sk)
+void pcs_sk_write_space(struct sock *sk)
 {
 	pcs_sk_kick_queue(sk);
 }
 
 /* TODO this call back does not look correct, sane locking/error handling is required */
-static void pcs_sk_error_report(struct sock *sk)
+void pcs_sk_error_report(struct sock *sk)
 {
 	struct pcs_sockio *sio;
 
@@ -607,7 +620,7 @@  struct pcs_sockio * pcs_sockio_init(struct socket *sock,
 	sk->sk_allocation = GFP_NOFS;
 	sio->send_timeout = PCS_SIO_TIMEOUT;
 	sio->ioconn.socket = sock;
-	sio->ioconn.destruct = pcs_sock_ioconn_destruct;
+	sio->ioconn.destruct = pcs_sock_external_ioconn_destruct;
 
 	rcu_assign_sk_user_data(sk, sio);
 	smp_wmb(); /* Pairs with smp_rmb() in callbacks */
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
index 7795212045c2..b50c784ee3b9 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.h
+++ b/fs/fuse/kio/pcs/pcs_sock_io.h
@@ -2,6 +2,9 @@ 
 #define _PCS_SOCK_IO_H_ 1
 
 #include <linux/net.h>
+#include <linux/in.h>
+#include <linux/in6.h>
+#include <linux/un.h>
 
 #include "pcs_types.h"
 ////#include "pcs_process.h"
@@ -154,6 +157,10 @@  int pcs_sock_queuelen(struct pcs_sockio * sio);
 void pcs_sock_abort(struct pcs_sockio * sio);
 void pcs_sock_error(struct pcs_sockio * sio, int error);
 
+void pcs_sk_data_ready(struct sock *sk, int count);
+void pcs_sk_write_space(struct sock *sk);
+void pcs_sk_error_report(struct sock *sk);
+
 void pcs_sock_throttle(struct pcs_sockio * sio);
 void pcs_sock_unthrottle(struct pcs_sockio * sio);
 
@@ -164,6 +171,8 @@  struct pcs_msg * pcs_cow_msg(struct pcs_msg * msg, int data_len);
 void pcs_clone_done(struct pcs_msg * msg);
 void pcs_free_msg(struct pcs_msg * msg);
 void pcs_get_iter_inline(struct pcs_msg * msg, int offset, struct iov_iter*it);
+void pcs_sock_internal_ioconn_destruct(struct pcs_ioconn *ioconn);
+void pcs_sock_external_ioconn_destruct(struct pcs_ioconn *ioconn);
 
 static inline void * msg_inline_head(struct pcs_msg * msg)
 {
diff --git a/fs/fuse/kio/pcs/pcs_types.h b/fs/fuse/kio/pcs/pcs_types.h
index 1170475c2226..1915fd047ab5 100644
--- a/fs/fuse/kio/pcs/pcs_types.h
+++ b/fs/fuse/kio/pcs/pcs_types.h
@@ -27,6 +27,7 @@  enum
 	PCS_ADDRTYPE_IP6 = 2,
 	PCS_ADDRTYPE_UNIX = 3,
 	PCS_ADDRTYPE_RDMA = 4,
+	PCS_ADDRTYPE_NETLINK = 5,
 };
 
 /* alignment makes it usable in binary protocols */