[1/5] Preparing image receive and send for asynchronous sockets.

Submitted by Rodrigo Bruno on May 14, 2018, 12:29 a.m.

Details

Message ID 20180514002951.5086-1-rbruno@gsd.inesc-id.pt
State New
Series "Series without cover letter"
Headers show

Commit Message

Rodrigo Bruno May 14, 2018, 12:29 a.m.
From: Rodrigo Bruno <rbruno@gsd.inesc-id.pt>

---
 criu/img-remote.c         | 93 +++++++++++++++++++++++++++++----------
 criu/include/img-remote.h | 28 ++++++++++--
 2 files changed, 93 insertions(+), 28 deletions(-)

Patch hide | download patch | download mbox

diff --git a/criu/img-remote.c b/criu/img-remote.c
index f812c52d..70db71e2 100644
--- a/criu/img-remote.c
+++ b/criu/img-remote.c
@@ -470,8 +470,6 @@  static struct rimage *new_remote_image(char *path, char *snapshot_id)
 	buf->nbytes = 0;
 	INIT_LIST_HEAD(&(rimg->buf_head));
 	list_add_tail(&(buf->l), &(rimg->buf_head));
-	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
-	rimg->curr_sent_bytes = 0;
 
 	if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
 		pr_err("Remote image in_use mutex init failed\n");
@@ -498,8 +496,6 @@  static struct rimage *clear_remote_image(struct rimage *rimg)
 
 	list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
 	rimg->size = 0;
-	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
-	rimg->curr_sent_bytes = 0;
 
 	pthread_mutex_unlock(&(rimg->in_use));
 
@@ -669,18 +665,43 @@  err:
 	return NULL;
 }
 
+
+int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
+{
+	int ret;
+	struct roperation *op = malloc(sizeof(struct roperation));
+	bzero(op, sizeof(struct roperation));
+	op->fd = fd;
+	op->rimg = rimg;
+	op->size = size;
+	op->flags = flags;
+	op->close_fd = close_fd;
+	op->curr_recv_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	while ((ret = recv_image_async(op)) < 0)
+		if (ret != EAGAIN && ret != EWOULDBLOCK)
+			return -1;
+	return ret;
+}
+
 /* Note: size is a limit on how much we want to read from the socket.  Zero means
  * read until the socket is closed.
  */
-int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
+int64_t recv_image_async(struct roperation *op)
 {
-	struct rbuf *curr_buf = NULL;
+	int fd = op->fd;
+	struct rimage *rimg = op->rimg;
+	uint64_t size = op->size;
+	int flags = op->flags;
+	bool close_fd = op->close_fd;
+	struct rbuf *curr_buf = op->curr_recv_buf;
 	int n;
 
-	if (flags == O_APPEND)
-		curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
-	else
-		curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	if (curr_buf == NULL) {
+		if (flags == O_APPEND)
+			curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
+		else
+			curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	}
 
 	while (1) {
 		n = read(fd,
@@ -712,6 +733,8 @@  int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool c
 					close(fd);
 				return rimg->size;
 			}
+		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+			return errno;
 		} else {
 			pr_perror("Read on %s:%s socket failed",
 				rimg->path, rimg->snapshot_id);
@@ -724,37 +747,59 @@  int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool c
 
 int64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd)
 {
+	int ret;
+	struct roperation *op = malloc(sizeof(struct roperation));
+	bzero(op, sizeof(struct roperation));
+	op->fd = fd;
+	op->rimg = rimg;
+	op->flags = flags;
+	op->close_fd = close_fd;
+	op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	while ((ret = send_image_async(op)) < 0)
+		if (ret != EAGAIN && ret != EWOULDBLOCK)
+			return -1;
+	return ret;
+}
 
-	int n, nblocks = 0;
+int64_t send_image_async(struct roperation *op)
+{
+	int fd = op->fd;
+	struct rimage *rimg = op->rimg;
+	int flags = op->flags;
+	bool close_fd = op->close_fd;
+	int n;
 
 	if (flags != O_APPEND) {
-		rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
-		rimg->curr_sent_bytes = 0;
+		op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+		op->curr_sent_bytes = 0;
 	}
 
 	while (1) {
 		n = send(
 		    fd,
-		    rimg->curr_sent_buf->buffer + rimg->curr_sent_bytes,
-		    min(BUF_SIZE, rimg->curr_sent_buf->nbytes) - rimg->curr_sent_bytes,
+		    op->curr_sent_buf->buffer + op->curr_sent_bytes,
+		    min(BUF_SIZE, op->curr_sent_buf->nbytes) - op->curr_sent_bytes,
 		    MSG_NOSIGNAL);
 		if (n > -1) {
-			rimg->curr_sent_bytes += n;
-			if (rimg->curr_sent_bytes == BUF_SIZE) {
-				rimg->curr_sent_buf =
-				    list_entry(rimg->curr_sent_buf->l.next, struct rbuf, l);
-				nblocks++;
-				rimg->curr_sent_bytes = 0;
-			} else if (rimg->curr_sent_bytes == rimg->curr_sent_buf->nbytes) {
+			op->curr_sent_bytes += n;
+			if (op->curr_sent_bytes == BUF_SIZE) {
+				op->curr_sent_buf =
+				    list_entry(op->curr_sent_buf->l.next, struct rbuf, l);
+				op->nblocks++;
+				op->curr_sent_bytes = 0;
+			} else if (op->curr_sent_bytes == op->curr_sent_buf->nbytes) {
 				if (close_fd)
 					close(fd);
-				return nblocks*BUF_SIZE + rimg->curr_sent_buf->nbytes;
+				return op->nblocks*BUF_SIZE + op->curr_sent_buf->nbytes;
 			}
 		} else if (errno == EPIPE || errno == ECONNRESET) {
 			pr_warn("Connection for %s:%s was closed early than expected\n",
 				rimg->path, rimg->snapshot_id);
 			return 0;
-		} else {
+		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+			return errno;
+		}
+		else {
 			pr_perror("Write on %s:%s socket failed",
 				rimg->path, rimg->snapshot_id);
 			return -1;
diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
index 1771d310..0947e7f0 100644
--- a/criu/include/img-remote.h
+++ b/criu/include/img-remote.h
@@ -36,10 +36,6 @@  struct rimage {
 	char snapshot_id[PATHLEN];
 	struct list_head l;
 	struct list_head buf_head;
-	/* Used to track already sent buffers when the image is appended. */
-	struct rbuf *curr_sent_buf;
-	/* Similar to the previous field. Number of bytes sent in 'curr_sent_buf'. */
-	int curr_sent_bytes;
 	uint64_t size; /* number of bytes */
 	pthread_mutex_t in_use; /* Only one operation at a time, per image. */
 };
@@ -57,6 +53,28 @@  struct wthread {
 	sem_t wakeup_sem;
 };
 
+/* Structure that describes the state of a remote operation on remote images. */
+struct roperation {
+	/* File descriptor being used. */
+	int fd;
+	/* Remote image being used. */
+	struct rimage *rimg;
+	/* Flags for the operation. */
+	int flags;
+	/* If fd should be closed when the operation is done. */
+	bool close_fd;
+	/* Note: recv operation only. How much bytes should be received. */
+	uint64_t size;
+	/* Note: recv operation only. Buffer being writen. */
+	struct rbuf *curr_recv_buf;
+	/* Note: send operation only. Number of blocks already sent. */
+	int nblocks;
+	/* Note: send operation only. Pointer to buffer being sent. */
+	struct rbuf *curr_sent_buf;
+	/* Note: send operation only. Number of bytes sent in 'curr_send_buf. */
+	uint64_t curr_sent_bytes;
+};
+
 /* This variable is used to indicate when the dump is finished. */
 extern bool finished;
 /* This is the proxy to cache TCP socket FD. */
@@ -80,7 +98,9 @@  void *accept_remote_image_connections(void *ptr);
 
 int64_t forward_image(struct rimage *rimg);
 int64_t send_image(int fd, struct rimage *rimg, int flags, bool image_check);
+int64_t send_image_async(struct roperation *op);
 int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool image_check);
+int64_t recv_image_async(struct roperation *op);
 
 int64_t read_remote_header(int fd, char *snapshot_id, char *path, int *open_mode, uint64_t *size);
 int64_t write_remote_header(int fd, char *snapshot_id, char *path, int open_mode, uint64_t size);

Comments

Andrei Vagin May 15, 2018, 6:49 a.m.
I rebased these patches and added a few minor fixes:
https://github.com/avagin/criu/tree/remote

On Mon, May 14, 2018 at 01:29:47AM +0100, rodrigo-bruno wrote:
> From: Rodrigo Bruno <rbruno@gsd.inesc-id.pt>
> 
> ---
>  criu/img-remote.c         | 93 +++++++++++++++++++++++++++++----------
>  criu/include/img-remote.h | 28 ++++++++++--
>  2 files changed, 93 insertions(+), 28 deletions(-)
> 
> diff --git a/criu/img-remote.c b/criu/img-remote.c
> index f812c52d..70db71e2 100644
> --- a/criu/img-remote.c
> +++ b/criu/img-remote.c
> @@ -470,8 +470,6 @@ static struct rimage *new_remote_image(char *path, char *snapshot_id)
>  	buf->nbytes = 0;
>  	INIT_LIST_HEAD(&(rimg->buf_head));
>  	list_add_tail(&(buf->l), &(rimg->buf_head));
> -	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> -	rimg->curr_sent_bytes = 0;
>  
>  	if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
>  		pr_err("Remote image in_use mutex init failed\n");
> @@ -498,8 +496,6 @@ static struct rimage *clear_remote_image(struct rimage *rimg)
>  
>  	list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
>  	rimg->size = 0;
> -	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> -	rimg->curr_sent_bytes = 0;
>  
>  	pthread_mutex_unlock(&(rimg->in_use));
>  
> @@ -669,18 +665,43 @@ err:
>  	return NULL;
>  }
>  
> +
> +int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
> +{
> +	int ret;
> +	struct roperation *op = malloc(sizeof(struct roperation));
> +	bzero(op, sizeof(struct roperation));
> +	op->fd = fd;
> +	op->rimg = rimg;
> +	op->size = size;
> +	op->flags = flags;
> +	op->close_fd = close_fd;
> +	op->curr_recv_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> +	while ((ret = recv_image_async(op)) < 0)
> +		if (ret != EAGAIN && ret != EWOULDBLOCK)
> +			return -1;
> +	return ret;
> +}
> +
>  /* Note: size is a limit on how much we want to read from the socket.  Zero means
>   * read until the socket is closed.
>   */
> -int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
> +int64_t recv_image_async(struct roperation *op)
>  {
> -	struct rbuf *curr_buf = NULL;
> +	int fd = op->fd;
> +	struct rimage *rimg = op->rimg;
> +	uint64_t size = op->size;
> +	int flags = op->flags;
> +	bool close_fd = op->close_fd;
> +	struct rbuf *curr_buf = op->curr_recv_buf;
>  	int n;
>  
> -	if (flags == O_APPEND)
> -		curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
> -	else
> -		curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> +	if (curr_buf == NULL) {
> +		if (flags == O_APPEND)
> +			curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
> +		else
> +			curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> +	}
>  
>  	while (1) {
>  		n = read(fd,
> @@ -712,6 +733,8 @@ int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool c
>  					close(fd);
>  				return rimg->size;
>  			}
> +		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> +			return errno;
>  		} else {
>  			pr_perror("Read on %s:%s socket failed",
>  				rimg->path, rimg->snapshot_id);
> @@ -724,37 +747,59 @@ int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool c
>  
>  int64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd)
>  {
> +	int ret;
> +	struct roperation *op = malloc(sizeof(struct roperation));
> +	bzero(op, sizeof(struct roperation));
> +	op->fd = fd;
> +	op->rimg = rimg;
> +	op->flags = flags;
> +	op->close_fd = close_fd;
> +	op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> +	while ((ret = send_image_async(op)) < 0)
> +		if (ret != EAGAIN && ret != EWOULDBLOCK)
> +			return -1;
> +	return ret;
> +}
>  
> -	int n, nblocks = 0;
> +int64_t send_image_async(struct roperation *op)
> +{
> +	int fd = op->fd;
> +	struct rimage *rimg = op->rimg;
> +	int flags = op->flags;
> +	bool close_fd = op->close_fd;
> +	int n;
>  
>  	if (flags != O_APPEND) {
> -		rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> -		rimg->curr_sent_bytes = 0;
> +		op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> +		op->curr_sent_bytes = 0;
>  	}
>  
>  	while (1) {
>  		n = send(
>  		    fd,
> -		    rimg->curr_sent_buf->buffer + rimg->curr_sent_bytes,
> -		    min(BUF_SIZE, rimg->curr_sent_buf->nbytes) - rimg->curr_sent_bytes,
> +		    op->curr_sent_buf->buffer + op->curr_sent_bytes,
> +		    min(BUF_SIZE, op->curr_sent_buf->nbytes) - op->curr_sent_bytes,
>  		    MSG_NOSIGNAL);
>  		if (n > -1) {
> -			rimg->curr_sent_bytes += n;
> -			if (rimg->curr_sent_bytes == BUF_SIZE) {
> -				rimg->curr_sent_buf =
> -				    list_entry(rimg->curr_sent_buf->l.next, struct rbuf, l);
> -				nblocks++;
> -				rimg->curr_sent_bytes = 0;
> -			} else if (rimg->curr_sent_bytes == rimg->curr_sent_buf->nbytes) {
> +			op->curr_sent_bytes += n;
> +			if (op->curr_sent_bytes == BUF_SIZE) {
> +				op->curr_sent_buf =
> +				    list_entry(op->curr_sent_buf->l.next, struct rbuf, l);
> +				op->nblocks++;
> +				op->curr_sent_bytes = 0;
> +			} else if (op->curr_sent_bytes == op->curr_sent_buf->nbytes) {
>  				if (close_fd)
>  					close(fd);
> -				return nblocks*BUF_SIZE + rimg->curr_sent_buf->nbytes;
> +				return op->nblocks*BUF_SIZE + op->curr_sent_buf->nbytes;
>  			}
>  		} else if (errno == EPIPE || errno == ECONNRESET) {
>  			pr_warn("Connection for %s:%s was closed early than expected\n",
>  				rimg->path, rimg->snapshot_id);
>  			return 0;
> -		} else {
> +		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> +			return errno;
> +		}
> +		else {
>  			pr_perror("Write on %s:%s socket failed",
>  				rimg->path, rimg->snapshot_id);
>  			return -1;
> diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
> index 1771d310..0947e7f0 100644
> --- a/criu/include/img-remote.h
> +++ b/criu/include/img-remote.h
> @@ -36,10 +36,6 @@ struct rimage {
>  	char snapshot_id[PATHLEN];
>  	struct list_head l;
>  	struct list_head buf_head;
> -	/* Used to track already sent buffers when the image is appended. */
> -	struct rbuf *curr_sent_buf;
> -	/* Similar to the previous field. Number of bytes sent in 'curr_sent_buf'. */
> -	int curr_sent_bytes;
>  	uint64_t size; /* number of bytes */
>  	pthread_mutex_t in_use; /* Only one operation at a time, per image. */
>  };
> @@ -57,6 +53,28 @@ struct wthread {
>  	sem_t wakeup_sem;
>  };
>  
> +/* Structure that describes the state of a remote operation on remote images. */
> +struct roperation {
> +	/* File descriptor being used. */
> +	int fd;
> +	/* Remote image being used. */
> +	struct rimage *rimg;
> +	/* Flags for the operation. */
> +	int flags;
> +	/* If fd should be closed when the operation is done. */
> +	bool close_fd;
> +	/* Note: recv operation only. How much bytes should be received. */
> +	uint64_t size;
> +	/* Note: recv operation only. Buffer being writen. */
> +	struct rbuf *curr_recv_buf;
> +	/* Note: send operation only. Number of blocks already sent. */
> +	int nblocks;
> +	/* Note: send operation only. Pointer to buffer being sent. */
> +	struct rbuf *curr_sent_buf;
> +	/* Note: send operation only. Number of bytes sent in 'curr_send_buf. */
> +	uint64_t curr_sent_bytes;
> +};
> +
>  /* This variable is used to indicate when the dump is finished. */
>  extern bool finished;
>  /* This is the proxy to cache TCP socket FD. */
> @@ -80,7 +98,9 @@ void *accept_remote_image_connections(void *ptr);
>  
>  int64_t forward_image(struct rimage *rimg);
>  int64_t send_image(int fd, struct rimage *rimg, int flags, bool image_check);
> +int64_t send_image_async(struct roperation *op);
>  int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool image_check);
> +int64_t recv_image_async(struct roperation *op);
>  
>  int64_t read_remote_header(int fd, char *snapshot_id, char *path, int *open_mode, uint64_t *size);
>  int64_t write_remote_header(int fd, char *snapshot_id, char *path, int open_mode, uint64_t size);
> -- 
> 2.17.0
> 
> _______________________________________________
> CRIU mailing list
> CRIU@openvz.org
> https://lists.openvz.org/mailman/listinfo/criu
Rodrigo Bruno May 15, 2018, 7:52 a.m.
Hi Andrei,

thank you.

So we can start working on fixing bugs in order for the zdtm tests to pass
sending patches against
https://github.com/avagin/criu/tree/remote ?

cheers,
rodrigo

2018-05-15 7:49 GMT+01:00 Andrei Vagin <avagin@virtuozzo.com>:

>
> I rebased these patches and added a few minor fixes:
> https://github.com/avagin/criu/tree/remote
>
> On Mon, May 14, 2018 at 01:29:47AM +0100, rodrigo-bruno wrote:
> > From: Rodrigo Bruno <rbruno@gsd.inesc-id.pt>
> >
> > ---
> >  criu/img-remote.c         | 93 +++++++++++++++++++++++++++++----------
> >  criu/include/img-remote.h | 28 ++++++++++--
> >  2 files changed, 93 insertions(+), 28 deletions(-)
> >
> > diff --git a/criu/img-remote.c b/criu/img-remote.c
> > index f812c52d..70db71e2 100644
> > --- a/criu/img-remote.c
> > +++ b/criu/img-remote.c
> > @@ -470,8 +470,6 @@ static struct rimage *new_remote_image(char *path,
> char *snapshot_id)
> >       buf->nbytes = 0;
> >       INIT_LIST_HEAD(&(rimg->buf_head));
> >       list_add_tail(&(buf->l), &(rimg->buf_head));
> > -     rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> rbuf, l);
> > -     rimg->curr_sent_bytes = 0;
> >
> >       if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
> >               pr_err("Remote image in_use mutex init failed\n");
> > @@ -498,8 +496,6 @@ static struct rimage *clear_remote_image(struct
> rimage *rimg)
> >
> >       list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
> >       rimg->size = 0;
> > -     rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> rbuf, l);
> > -     rimg->curr_sent_bytes = 0;
> >
> >       pthread_mutex_unlock(&(rimg->in_use));
> >
> > @@ -669,18 +665,43 @@ err:
> >       return NULL;
> >  }
> >
> > +
> > +int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> flags, bool close_fd)
> > +{
> > +     int ret;
> > +     struct roperation *op = malloc(sizeof(struct roperation));
> > +     bzero(op, sizeof(struct roperation));
> > +     op->fd = fd;
> > +     op->rimg = rimg;
> > +     op->size = size;
> > +     op->flags = flags;
> > +     op->close_fd = close_fd;
> > +     op->curr_recv_buf = list_entry(rimg->buf_head.next, struct rbuf,
> l);
> > +     while ((ret = recv_image_async(op)) < 0)
> > +             if (ret != EAGAIN && ret != EWOULDBLOCK)
> > +                     return -1;
> > +     return ret;
> > +}
> > +
> >  /* Note: size is a limit on how much we want to read from the socket.
> Zero means
> >   * read until the socket is closed.
> >   */
> > -int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> flags, bool close_fd)
> > +int64_t recv_image_async(struct roperation *op)
> >  {
> > -     struct rbuf *curr_buf = NULL;
> > +     int fd = op->fd;
> > +     struct rimage *rimg = op->rimg;
> > +     uint64_t size = op->size;
> > +     int flags = op->flags;
> > +     bool close_fd = op->close_fd;
> > +     struct rbuf *curr_buf = op->curr_recv_buf;
> >       int n;
> >
> > -     if (flags == O_APPEND)
> > -             curr_buf = list_entry(rimg->buf_head.prev, struct rbuf,
> l);
> > -     else
> > -             curr_buf = list_entry(rimg->buf_head.next, struct rbuf,
> l);
> > +     if (curr_buf == NULL) {
> > +             if (flags == O_APPEND)
> > +                     curr_buf = list_entry(rimg->buf_head.prev, struct
> rbuf, l);
> > +             else
> > +                     curr_buf = list_entry(rimg->buf_head.next, struct
> rbuf, l);
> > +     }
> >
> >       while (1) {
> >               n = read(fd,
> > @@ -712,6 +733,8 @@ int64_t recv_image(int fd, struct rimage *rimg,
> uint64_t size, int flags, bool c
> >                                       close(fd);
> >                               return rimg->size;
> >                       }
> > +             } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> > +                     return errno;
> >               } else {
> >                       pr_perror("Read on %s:%s socket failed",
> >                               rimg->path, rimg->snapshot_id);
> > @@ -724,37 +747,59 @@ int64_t recv_image(int fd, struct rimage *rimg,
> uint64_t size, int flags, bool c
> >
> >  int64_t send_image(int fd, struct rimage *rimg, int flags, bool
> close_fd)
> >  {
> > +     int ret;
> > +     struct roperation *op = malloc(sizeof(struct roperation));
> > +     bzero(op, sizeof(struct roperation));
> > +     op->fd = fd;
> > +     op->rimg = rimg;
> > +     op->flags = flags;
> > +     op->close_fd = close_fd;
> > +     op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf,
> l);
> > +     while ((ret = send_image_async(op)) < 0)
> > +             if (ret != EAGAIN && ret != EWOULDBLOCK)
> > +                     return -1;
> > +     return ret;
> > +}
> >
> > -     int n, nblocks = 0;
> > +int64_t send_image_async(struct roperation *op)
> > +{
> > +     int fd = op->fd;
> > +     struct rimage *rimg = op->rimg;
> > +     int flags = op->flags;
> > +     bool close_fd = op->close_fd;
> > +     int n;
> >
> >       if (flags != O_APPEND) {
> > -             rimg->curr_sent_buf = list_entry(rimg->buf_head.next,
> struct rbuf, l);
> > -             rimg->curr_sent_bytes = 0;
> > +             op->curr_sent_buf = list_entry(rimg->buf_head.next,
> struct rbuf, l);
> > +             op->curr_sent_bytes = 0;
> >       }
> >
> >       while (1) {
> >               n = send(
> >                   fd,
> > -                 rimg->curr_sent_buf->buffer + rimg->curr_sent_bytes,
> > -                 min(BUF_SIZE, rimg->curr_sent_buf->nbytes) -
> rimg->curr_sent_bytes,
> > +                 op->curr_sent_buf->buffer + op->curr_sent_bytes,
> > +                 min(BUF_SIZE, op->curr_sent_buf->nbytes) -
> op->curr_sent_bytes,
> >                   MSG_NOSIGNAL);
> >               if (n > -1) {
> > -                     rimg->curr_sent_bytes += n;
> > -                     if (rimg->curr_sent_bytes == BUF_SIZE) {
> > -                             rimg->curr_sent_buf =
> > -                                 list_entry(rimg->curr_sent_buf->l.next,
> struct rbuf, l);
> > -                             nblocks++;
> > -                             rimg->curr_sent_bytes = 0;
> > -                     } else if (rimg->curr_sent_bytes ==
> rimg->curr_sent_buf->nbytes) {
> > +                     op->curr_sent_bytes += n;
> > +                     if (op->curr_sent_bytes == BUF_SIZE) {
> > +                             op->curr_sent_buf =
> > +                                 list_entry(op->curr_sent_buf->l.next,
> struct rbuf, l);
> > +                             op->nblocks++;
> > +                             op->curr_sent_bytes = 0;
> > +                     } else if (op->curr_sent_bytes ==
> op->curr_sent_buf->nbytes) {
> >                               if (close_fd)
> >                                       close(fd);
> > -                             return nblocks*BUF_SIZE +
> rimg->curr_sent_buf->nbytes;
> > +                             return op->nblocks*BUF_SIZE +
> op->curr_sent_buf->nbytes;
> >                       }
> >               } else if (errno == EPIPE || errno == ECONNRESET) {
> >                       pr_warn("Connection for %s:%s was closed early
> than expected\n",
> >                               rimg->path, rimg->snapshot_id);
> >                       return 0;
> > -             } else {
> > +             } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> > +                     return errno;
> > +             }
> > +             else {
> >                       pr_perror("Write on %s:%s socket failed",
> >                               rimg->path, rimg->snapshot_id);
> >                       return -1;
> > diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
> > index 1771d310..0947e7f0 100644
> > --- a/criu/include/img-remote.h
> > +++ b/criu/include/img-remote.h
> > @@ -36,10 +36,6 @@ struct rimage {
> >       char snapshot_id[PATHLEN];
> >       struct list_head l;
> >       struct list_head buf_head;
> > -     /* Used to track already sent buffers when the image is appended.
> */
> > -     struct rbuf *curr_sent_buf;
> > -     /* Similar to the previous field. Number of bytes sent in
> 'curr_sent_buf'. */
> > -     int curr_sent_bytes;
> >       uint64_t size; /* number of bytes */
> >       pthread_mutex_t in_use; /* Only one operation at a time, per
> image. */
> >  };
> > @@ -57,6 +53,28 @@ struct wthread {
> >       sem_t wakeup_sem;
> >  };
> >
> > +/* Structure that describes the state of a remote operation on remote
> images. */
> > +struct roperation {
> > +     /* File descriptor being used. */
> > +     int fd;
> > +     /* Remote image being used. */
> > +     struct rimage *rimg;
> > +     /* Flags for the operation. */
> > +     int flags;
> > +     /* If fd should be closed when the operation is done. */
> > +     bool close_fd;
> > +     /* Note: recv operation only. How much bytes should be received. */
> > +     uint64_t size;
> > +     /* Note: recv operation only. Buffer being writen. */
> > +     struct rbuf *curr_recv_buf;
> > +     /* Note: send operation only. Number of blocks already sent. */
> > +     int nblocks;
> > +     /* Note: send operation only. Pointer to buffer being sent. */
> > +     struct rbuf *curr_sent_buf;
> > +     /* Note: send operation only. Number of bytes sent in
> 'curr_send_buf. */
> > +     uint64_t curr_sent_bytes;
> > +};
> > +
> >  /* This variable is used to indicate when the dump is finished. */
> >  extern bool finished;
> >  /* This is the proxy to cache TCP socket FD. */
> > @@ -80,7 +98,9 @@ void *accept_remote_image_connections(void *ptr);
> >
> >  int64_t forward_image(struct rimage *rimg);
> >  int64_t send_image(int fd, struct rimage *rimg, int flags, bool
> image_check);
> > +int64_t send_image_async(struct roperation *op);
> >  int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> flags, bool image_check);
> > +int64_t recv_image_async(struct roperation *op);
> >
> >  int64_t read_remote_header(int fd, char *snapshot_id, char *path, int
> *open_mode, uint64_t *size);
> >  int64_t write_remote_header(int fd, char *snapshot_id, char *path, int
> open_mode, uint64_t size);
> > --
> > 2.17.0
> >
> > _______________________________________________
> > CRIU mailing list
> > CRIU@openvz.org
> > https://lists.openvz.org/mailman/listinfo/criu
>
Andrei Vagin May 16, 2018, 5:35 a.m.
On Tue, May 15, 2018 at 08:52:03AM +0100, Rodrigo Bruno wrote:
> Hi Andrei,
> 
> thank you.
> 
> So we can start working on fixing bugs in order for the zdtm tests to pass
> sending patches against
> https://github.com/avagin/criu/tree/remote ?

I'm going to merge these patches into criu-dev soon.

> 
> cheers,
> rodrigo
> 
> 2018-05-15 7:49 GMT+01:00 Andrei Vagin <avagin@virtuozzo.com>:
> 
> >
> > I rebased these patches and added a few minor fixes:
> > https://github.com/avagin/criu/tree/remote
> >
> > On Mon, May 14, 2018 at 01:29:47AM +0100, rodrigo-bruno wrote:
> > > From: Rodrigo Bruno <rbruno@gsd.inesc-id.pt>
> > >
> > > ---
> > >  criu/img-remote.c         | 93 +++++++++++++++++++++++++++++----------
> > >  criu/include/img-remote.h | 28 ++++++++++--
> > >  2 files changed, 93 insertions(+), 28 deletions(-)
> > >
> > > diff --git a/criu/img-remote.c b/criu/img-remote.c
> > > index f812c52d..70db71e2 100644
> > > --- a/criu/img-remote.c
> > > +++ b/criu/img-remote.c
> > > @@ -470,8 +470,6 @@ static struct rimage *new_remote_image(char *path,
> > char *snapshot_id)
> > >       buf->nbytes = 0;
> > >       INIT_LIST_HEAD(&(rimg->buf_head));
> > >       list_add_tail(&(buf->l), &(rimg->buf_head));
> > > -     rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> > rbuf, l);
> > > -     rimg->curr_sent_bytes = 0;
> > >
> > >       if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
> > >               pr_err("Remote image in_use mutex init failed\n");
> > > @@ -498,8 +496,6 @@ static struct rimage *clear_remote_image(struct
> > rimage *rimg)
> > >
> > >       list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
> > >       rimg->size = 0;
> > > -     rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> > rbuf, l);
> > > -     rimg->curr_sent_bytes = 0;
> > >
> > >       pthread_mutex_unlock(&(rimg->in_use));
> > >
> > > @@ -669,18 +665,43 @@ err:
> > >       return NULL;
> > >  }
> > >
> > > +
> > > +int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > flags, bool close_fd)
> > > +{
> > > +     int ret;
> > > +     struct roperation *op = malloc(sizeof(struct roperation));
> > > +     bzero(op, sizeof(struct roperation));
> > > +     op->fd = fd;
> > > +     op->rimg = rimg;
> > > +     op->size = size;
> > > +     op->flags = flags;
> > > +     op->close_fd = close_fd;
> > > +     op->curr_recv_buf = list_entry(rimg->buf_head.next, struct rbuf,
> > l);
> > > +     while ((ret = recv_image_async(op)) < 0)
> > > +             if (ret != EAGAIN && ret != EWOULDBLOCK)
> > > +                     return -1;
> > > +     return ret;
> > > +}
> > > +
> > >  /* Note: size is a limit on how much we want to read from the socket.
> > Zero means
> > >   * read until the socket is closed.
> > >   */
> > > -int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > flags, bool close_fd)
> > > +int64_t recv_image_async(struct roperation *op)
> > >  {
> > > -     struct rbuf *curr_buf = NULL;
> > > +     int fd = op->fd;
> > > +     struct rimage *rimg = op->rimg;
> > > +     uint64_t size = op->size;
> > > +     int flags = op->flags;
> > > +     bool close_fd = op->close_fd;
> > > +     struct rbuf *curr_buf = op->curr_recv_buf;
> > >       int n;
> > >
> > > -     if (flags == O_APPEND)
> > > -             curr_buf = list_entry(rimg->buf_head.prev, struct rbuf,
> > l);
> > > -     else
> > > -             curr_buf = list_entry(rimg->buf_head.next, struct rbuf,
> > l);
> > > +     if (curr_buf == NULL) {
> > > +             if (flags == O_APPEND)
> > > +                     curr_buf = list_entry(rimg->buf_head.prev, struct
> > rbuf, l);
> > > +             else
> > > +                     curr_buf = list_entry(rimg->buf_head.next, struct
> > rbuf, l);
> > > +     }
> > >
> > >       while (1) {
> > >               n = read(fd,
> > > @@ -712,6 +733,8 @@ int64_t recv_image(int fd, struct rimage *rimg,
> > uint64_t size, int flags, bool c
> > >                                       close(fd);
> > >                               return rimg->size;
> > >                       }
> > > +             } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> > > +                     return errno;
> > >               } else {
> > >                       pr_perror("Read on %s:%s socket failed",
> > >                               rimg->path, rimg->snapshot_id);
> > > @@ -724,37 +747,59 @@ int64_t recv_image(int fd, struct rimage *rimg,
> > uint64_t size, int flags, bool c
> > >
> > >  int64_t send_image(int fd, struct rimage *rimg, int flags, bool
> > close_fd)
> > >  {
> > > +     int ret;
> > > +     struct roperation *op = malloc(sizeof(struct roperation));
> > > +     bzero(op, sizeof(struct roperation));
> > > +     op->fd = fd;
> > > +     op->rimg = rimg;
> > > +     op->flags = flags;
> > > +     op->close_fd = close_fd;
> > > +     op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf,
> > l);
> > > +     while ((ret = send_image_async(op)) < 0)
> > > +             if (ret != EAGAIN && ret != EWOULDBLOCK)
> > > +                     return -1;
> > > +     return ret;
> > > +}
> > >
> > > -     int n, nblocks = 0;
> > > +int64_t send_image_async(struct roperation *op)
> > > +{
> > > +     int fd = op->fd;
> > > +     struct rimage *rimg = op->rimg;
> > > +     int flags = op->flags;
> > > +     bool close_fd = op->close_fd;
> > > +     int n;
> > >
> > >       if (flags != O_APPEND) {
> > > -             rimg->curr_sent_buf = list_entry(rimg->buf_head.next,
> > struct rbuf, l);
> > > -             rimg->curr_sent_bytes = 0;
> > > +             op->curr_sent_buf = list_entry(rimg->buf_head.next,
> > struct rbuf, l);
> > > +             op->curr_sent_bytes = 0;
> > >       }
> > >
> > >       while (1) {
> > >               n = send(
> > >                   fd,
> > > -                 rimg->curr_sent_buf->buffer + rimg->curr_sent_bytes,
> > > -                 min(BUF_SIZE, rimg->curr_sent_buf->nbytes) -
> > rimg->curr_sent_bytes,
> > > +                 op->curr_sent_buf->buffer + op->curr_sent_bytes,
> > > +                 min(BUF_SIZE, op->curr_sent_buf->nbytes) -
> > op->curr_sent_bytes,
> > >                   MSG_NOSIGNAL);
> > >               if (n > -1) {
> > > -                     rimg->curr_sent_bytes += n;
> > > -                     if (rimg->curr_sent_bytes == BUF_SIZE) {
> > > -                             rimg->curr_sent_buf =
> > > -                                 list_entry(rimg->curr_sent_buf->l.next,
> > struct rbuf, l);
> > > -                             nblocks++;
> > > -                             rimg->curr_sent_bytes = 0;
> > > -                     } else if (rimg->curr_sent_bytes ==
> > rimg->curr_sent_buf->nbytes) {
> > > +                     op->curr_sent_bytes += n;
> > > +                     if (op->curr_sent_bytes == BUF_SIZE) {
> > > +                             op->curr_sent_buf =
> > > +                                 list_entry(op->curr_sent_buf->l.next,
> > struct rbuf, l);
> > > +                             op->nblocks++;
> > > +                             op->curr_sent_bytes = 0;
> > > +                     } else if (op->curr_sent_bytes ==
> > op->curr_sent_buf->nbytes) {
> > >                               if (close_fd)
> > >                                       close(fd);
> > > -                             return nblocks*BUF_SIZE +
> > rimg->curr_sent_buf->nbytes;
> > > +                             return op->nblocks*BUF_SIZE +
> > op->curr_sent_buf->nbytes;
> > >                       }
> > >               } else if (errno == EPIPE || errno == ECONNRESET) {
> > >                       pr_warn("Connection for %s:%s was closed early
> > than expected\n",
> > >                               rimg->path, rimg->snapshot_id);
> > >                       return 0;
> > > -             } else {
> > > +             } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> > > +                     return errno;
> > > +             }
> > > +             else {
> > >                       pr_perror("Write on %s:%s socket failed",
> > >                               rimg->path, rimg->snapshot_id);
> > >                       return -1;
> > > diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
> > > index 1771d310..0947e7f0 100644
> > > --- a/criu/include/img-remote.h
> > > +++ b/criu/include/img-remote.h
> > > @@ -36,10 +36,6 @@ struct rimage {
> > >       char snapshot_id[PATHLEN];
> > >       struct list_head l;
> > >       struct list_head buf_head;
> > > -     /* Used to track already sent buffers when the image is appended.
> > */
> > > -     struct rbuf *curr_sent_buf;
> > > -     /* Similar to the previous field. Number of bytes sent in
> > 'curr_sent_buf'. */
> > > -     int curr_sent_bytes;
> > >       uint64_t size; /* number of bytes */
> > >       pthread_mutex_t in_use; /* Only one operation at a time, per
> > image. */
> > >  };
> > > @@ -57,6 +53,28 @@ struct wthread {
> > >       sem_t wakeup_sem;
> > >  };
> > >
> > > +/* Structure that describes the state of a remote operation on remote
> > images. */
> > > +struct roperation {
> > > +     /* File descriptor being used. */
> > > +     int fd;
> > > +     /* Remote image being used. */
> > > +     struct rimage *rimg;
> > > +     /* Flags for the operation. */
> > > +     int flags;
> > > +     /* If fd should be closed when the operation is done. */
> > > +     bool close_fd;
> > > +     /* Note: recv operation only. How much bytes should be received. */
> > > +     uint64_t size;
> > > +     /* Note: recv operation only. Buffer being writen. */
> > > +     struct rbuf *curr_recv_buf;
> > > +     /* Note: send operation only. Number of blocks already sent. */
> > > +     int nblocks;
> > > +     /* Note: send operation only. Pointer to buffer being sent. */
> > > +     struct rbuf *curr_sent_buf;
> > > +     /* Note: send operation only. Number of bytes sent in
> > 'curr_send_buf. */
> > > +     uint64_t curr_sent_bytes;
> > > +};
> > > +
> > >  /* This variable is used to indicate when the dump is finished. */
> > >  extern bool finished;
> > >  /* This is the proxy to cache TCP socket FD. */
> > > @@ -80,7 +98,9 @@ void *accept_remote_image_connections(void *ptr);
> > >
> > >  int64_t forward_image(struct rimage *rimg);
> > >  int64_t send_image(int fd, struct rimage *rimg, int flags, bool
> > image_check);
> > > +int64_t send_image_async(struct roperation *op);
> > >  int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > flags, bool image_check);
> > > +int64_t recv_image_async(struct roperation *op);
> > >
> > >  int64_t read_remote_header(int fd, char *snapshot_id, char *path, int
> > *open_mode, uint64_t *size);
> > >  int64_t write_remote_header(int fd, char *snapshot_id, char *path, int
> > open_mode, uint64_t size);
> > > --
> > > 2.17.0
> > >
> > > _______________________________________________
> > > CRIU mailing list
> > > CRIU@openvz.org
> > > https://lists.openvz.org/mailman/listinfo/criu
> >
Rodrigo Bruno May 22, 2018, 9:14 p.m.
Thanks, that would be great and then we could work on top of that!

Let me know when the merge is done.

best,
rodrigo

2018-05-16 6:35 GMT+01:00 Andrei Vagin <avagin@virtuozzo.com>:

> On Tue, May 15, 2018 at 08:52:03AM +0100, Rodrigo Bruno wrote:
> > Hi Andrei,
> >
> > thank you.
> >
> > So we can start working on fixing bugs in order for the zdtm tests to
> pass
> > sending patches against
> > https://github.com/avagin/criu/tree/remote ?
>
> I'm going to merge these patches into criu-dev soon.
>
> >
> > cheers,
> > rodrigo
> >
> > 2018-05-15 7:49 GMT+01:00 Andrei Vagin <avagin@virtuozzo.com>:
> >
> > >
> > > I rebased these patches and added a few minor fixes:
> > > https://github.com/avagin/criu/tree/remote
> > >
> > > On Mon, May 14, 2018 at 01:29:47AM +0100, rodrigo-bruno wrote:
> > > > From: Rodrigo Bruno <rbruno@gsd.inesc-id.pt>
> > > >
> > > > ---
> > > >  criu/img-remote.c         | 93 +++++++++++++++++++++++++++++-
> ---------
> > > >  criu/include/img-remote.h | 28 ++++++++++--
> > > >  2 files changed, 93 insertions(+), 28 deletions(-)
> > > >
> > > > diff --git a/criu/img-remote.c b/criu/img-remote.c
> > > > index f812c52d..70db71e2 100644
> > > > --- a/criu/img-remote.c
> > > > +++ b/criu/img-remote.c
> > > > @@ -470,8 +470,6 @@ static struct rimage *new_remote_image(char
> *path,
> > > char *snapshot_id)
> > > >       buf->nbytes = 0;
> > > >       INIT_LIST_HEAD(&(rimg->buf_head));
> > > >       list_add_tail(&(buf->l), &(rimg->buf_head));
> > > > -     rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> > > rbuf, l);
> > > > -     rimg->curr_sent_bytes = 0;
> > > >
> > > >       if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
> > > >               pr_err("Remote image in_use mutex init failed\n");
> > > > @@ -498,8 +496,6 @@ static struct rimage *clear_remote_image(struct
> > > rimage *rimg)
> > > >
> > > >       list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
> > > >       rimg->size = 0;
> > > > -     rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> > > rbuf, l);
> > > > -     rimg->curr_sent_bytes = 0;
> > > >
> > > >       pthread_mutex_unlock(&(rimg->in_use));
> > > >
> > > > @@ -669,18 +665,43 @@ err:
> > > >       return NULL;
> > > >  }
> > > >
> > > > +
> > > > +int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > > flags, bool close_fd)
> > > > +{
> > > > +     int ret;
> > > > +     struct roperation *op = malloc(sizeof(struct roperation));
> > > > +     bzero(op, sizeof(struct roperation));
> > > > +     op->fd = fd;
> > > > +     op->rimg = rimg;
> > > > +     op->size = size;
> > > > +     op->flags = flags;
> > > > +     op->close_fd = close_fd;
> > > > +     op->curr_recv_buf = list_entry(rimg->buf_head.next, struct
> rbuf,
> > > l);
> > > > +     while ((ret = recv_image_async(op)) < 0)
> > > > +             if (ret != EAGAIN && ret != EWOULDBLOCK)
> > > > +                     return -1;
> > > > +     return ret;
> > > > +}
> > > > +
> > > >  /* Note: size is a limit on how much we want to read from the
> socket.
> > > Zero means
> > > >   * read until the socket is closed.
> > > >   */
> > > > -int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > > flags, bool close_fd)
> > > > +int64_t recv_image_async(struct roperation *op)
> > > >  {
> > > > -     struct rbuf *curr_buf = NULL;
> > > > +     int fd = op->fd;
> > > > +     struct rimage *rimg = op->rimg;
> > > > +     uint64_t size = op->size;
> > > > +     int flags = op->flags;
> > > > +     bool close_fd = op->close_fd;
> > > > +     struct rbuf *curr_buf = op->curr_recv_buf;
> > > >       int n;
> > > >
> > > > -     if (flags == O_APPEND)
> > > > -             curr_buf = list_entry(rimg->buf_head.prev, struct
> rbuf,
> > > l);
> > > > -     else
> > > > -             curr_buf = list_entry(rimg->buf_head.next, struct
> rbuf,
> > > l);
> > > > +     if (curr_buf == NULL) {
> > > > +             if (flags == O_APPEND)
> > > > +                     curr_buf = list_entry(rimg->buf_head.prev,
> struct
> > > rbuf, l);
> > > > +             else
> > > > +                     curr_buf = list_entry(rimg->buf_head.next,
> struct
> > > rbuf, l);
> > > > +     }
> > > >
> > > >       while (1) {
> > > >               n = read(fd,
> > > > @@ -712,6 +733,8 @@ int64_t recv_image(int fd, struct rimage *rimg,
> > > uint64_t size, int flags, bool c
> > > >                                       close(fd);
> > > >                               return rimg->size;
> > > >                       }
> > > > +             } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> > > > +                     return errno;
> > > >               } else {
> > > >                       pr_perror("Read on %s:%s socket failed",
> > > >                               rimg->path, rimg->snapshot_id);
> > > > @@ -724,37 +747,59 @@ int64_t recv_image(int fd, struct rimage *rimg,
> > > uint64_t size, int flags, bool c
> > > >
> > > >  int64_t send_image(int fd, struct rimage *rimg, int flags, bool
> > > close_fd)
> > > >  {
> > > > +     int ret;
> > > > +     struct roperation *op = malloc(sizeof(struct roperation));
> > > > +     bzero(op, sizeof(struct roperation));
> > > > +     op->fd = fd;
> > > > +     op->rimg = rimg;
> > > > +     op->flags = flags;
> > > > +     op->close_fd = close_fd;
> > > > +     op->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> rbuf,
> > > l);
> > > > +     while ((ret = send_image_async(op)) < 0)
> > > > +             if (ret != EAGAIN && ret != EWOULDBLOCK)
> > > > +                     return -1;
> > > > +     return ret;
> > > > +}
> > > >
> > > > -     int n, nblocks = 0;
> > > > +int64_t send_image_async(struct roperation *op)
> > > > +{
> > > > +     int fd = op->fd;
> > > > +     struct rimage *rimg = op->rimg;
> > > > +     int flags = op->flags;
> > > > +     bool close_fd = op->close_fd;
> > > > +     int n;
> > > >
> > > >       if (flags != O_APPEND) {
> > > > -             rimg->curr_sent_buf = list_entry(rimg->buf_head.next,
> > > struct rbuf, l);
> > > > -             rimg->curr_sent_bytes = 0;
> > > > +             op->curr_sent_buf = list_entry(rimg->buf_head.next,
> > > struct rbuf, l);
> > > > +             op->curr_sent_bytes = 0;
> > > >       }
> > > >
> > > >       while (1) {
> > > >               n = send(
> > > >                   fd,
> > > > -                 rimg->curr_sent_buf->buffer +
> rimg->curr_sent_bytes,
> > > > -                 min(BUF_SIZE, rimg->curr_sent_buf->nbytes) -
> > > rimg->curr_sent_bytes,
> > > > +                 op->curr_sent_buf->buffer + op->curr_sent_bytes,
> > > > +                 min(BUF_SIZE, op->curr_sent_buf->nbytes) -
> > > op->curr_sent_bytes,
> > > >                   MSG_NOSIGNAL);
> > > >               if (n > -1) {
> > > > -                     rimg->curr_sent_bytes += n;
> > > > -                     if (rimg->curr_sent_bytes == BUF_SIZE) {
> > > > -                             rimg->curr_sent_buf =
> > > > -                                 list_entry(rimg->curr_sent_
> buf->l.next,
> > > struct rbuf, l);
> > > > -                             nblocks++;
> > > > -                             rimg->curr_sent_bytes = 0;
> > > > -                     } else if (rimg->curr_sent_bytes ==
> > > rimg->curr_sent_buf->nbytes) {
> > > > +                     op->curr_sent_bytes += n;
> > > > +                     if (op->curr_sent_bytes == BUF_SIZE) {
> > > > +                             op->curr_sent_buf =
> > > > +                                 list_entry(op->curr_sent_buf->
> l.next,
> > > struct rbuf, l);
> > > > +                             op->nblocks++;
> > > > +                             op->curr_sent_bytes = 0;
> > > > +                     } else if (op->curr_sent_bytes ==
> > > op->curr_sent_buf->nbytes) {
> > > >                               if (close_fd)
> > > >                                       close(fd);
> > > > -                             return nblocks*BUF_SIZE +
> > > rimg->curr_sent_buf->nbytes;
> > > > +                             return op->nblocks*BUF_SIZE +
> > > op->curr_sent_buf->nbytes;
> > > >                       }
> > > >               } else if (errno == EPIPE || errno == ECONNRESET) {
> > > >                       pr_warn("Connection for %s:%s was closed early
> > > than expected\n",
> > > >                               rimg->path, rimg->snapshot_id);
> > > >                       return 0;
> > > > -             } else {
> > > > +             } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> > > > +                     return errno;
> > > > +             }
> > > > +             else {
> > > >                       pr_perror("Write on %s:%s socket failed",
> > > >                               rimg->path, rimg->snapshot_id);
> > > >                       return -1;
> > > > diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
> > > > index 1771d310..0947e7f0 100644
> > > > --- a/criu/include/img-remote.h
> > > > +++ b/criu/include/img-remote.h
> > > > @@ -36,10 +36,6 @@ struct rimage {
> > > >       char snapshot_id[PATHLEN];
> > > >       struct list_head l;
> > > >       struct list_head buf_head;
> > > > -     /* Used to track already sent buffers when the image is
> appended.
> > > */
> > > > -     struct rbuf *curr_sent_buf;
> > > > -     /* Similar to the previous field. Number of bytes sent in
> > > 'curr_sent_buf'. */
> > > > -     int curr_sent_bytes;
> > > >       uint64_t size; /* number of bytes */
> > > >       pthread_mutex_t in_use; /* Only one operation at a time, per
> > > image. */
> > > >  };
> > > > @@ -57,6 +53,28 @@ struct wthread {
> > > >       sem_t wakeup_sem;
> > > >  };
> > > >
> > > > +/* Structure that describes the state of a remote operation on
> remote
> > > images. */
> > > > +struct roperation {
> > > > +     /* File descriptor being used. */
> > > > +     int fd;
> > > > +     /* Remote image being used. */
> > > > +     struct rimage *rimg;
> > > > +     /* Flags for the operation. */
> > > > +     int flags;
> > > > +     /* If fd should be closed when the operation is done. */
> > > > +     bool close_fd;
> > > > +     /* Note: recv operation only. How much bytes should be
> received. */
> > > > +     uint64_t size;
> > > > +     /* Note: recv operation only. Buffer being writen. */
> > > > +     struct rbuf *curr_recv_buf;
> > > > +     /* Note: send operation only. Number of blocks already sent. */
> > > > +     int nblocks;
> > > > +     /* Note: send operation only. Pointer to buffer being sent. */
> > > > +     struct rbuf *curr_sent_buf;
> > > > +     /* Note: send operation only. Number of bytes sent in
> > > 'curr_send_buf. */
> > > > +     uint64_t curr_sent_bytes;
> > > > +};
> > > > +
> > > >  /* This variable is used to indicate when the dump is finished. */
> > > >  extern bool finished;
> > > >  /* This is the proxy to cache TCP socket FD. */
> > > > @@ -80,7 +98,9 @@ void *accept_remote_image_connections(void *ptr);
> > > >
> > > >  int64_t forward_image(struct rimage *rimg);
> > > >  int64_t send_image(int fd, struct rimage *rimg, int flags, bool
> > > image_check);
> > > > +int64_t send_image_async(struct roperation *op);
> > > >  int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > > flags, bool image_check);
> > > > +int64_t recv_image_async(struct roperation *op);
> > > >
> > > >  int64_t read_remote_header(int fd, char *snapshot_id, char *path,
> int
> > > *open_mode, uint64_t *size);
> > > >  int64_t write_remote_header(int fd, char *snapshot_id, char *path,
> int
> > > open_mode, uint64_t size);
> > > > --
> > > > 2.17.0
> > > >
> > > > _______________________________________________
> > > > CRIU mailing list
> > > > CRIU@openvz.org
> > > > https://lists.openvz.org/mailman/listinfo/criu
> > >
>
Andrei Vagin May 23, 2018, 12:19 a.m.
Rodrigo and Radostin,

I've pushed patches into the criu-dev branch.

Radostin, if you don't know where to start, you can start from this
error:

$ python test/zdtm.py run -t zdtm/static/env00 --remote -f ns
=== Run 1/1 ================ zdtm/static/env00

========================= Run zdtm/static/env00 in ns ==========================
Start test
./env00 --pidfile=env00.pid --outfile=env00.out --envname=ENV_00_TEST
Adding image cache
Adding image proxy
Run criu dump
Run criu restore
=[log]=> dump/zdtm/static/env00/43/1/restore.log
------------------------ grep Error ------------------------
RTNETLINK answers: File exists
(00.091854)      1: do_open_remote_image RDONLY path=route-8.img snapshot_id=dump/zdtm/static/env00/43/1
(00.092096)      1: 	Running ip route restore
Failed to restore: ftell: Illegal seek
(00.105267)      1: Error (criu/util.c:842): exited, status=1
(00.105362)      1: Error (criu/net.c:1720): IP tool failed on route restore
(00.105402)      1: Error (criu/net.c:2434): Can't create net_ns
(00.105512)      1: Error (criu/util.c:1567): Can't wait or bad status: errno=0, status=65280
(00.105642) Error (criu/cr-restore.c:2358): Failed to switch restore stage to CR_STATE_PREPARE_NAMESPACES
(00.117497) Error (criu/mount.c:3175): mnt: Can't remove the directory /tmp/.criu.mntns.vnaMiK: No such file or directory
(00.117531) uns: calling exit_usernsd[0x468640] (-1, 1)
(00.117607) uns: daemon calls 0x468640 (63, -1, 1)
(00.117625) uns: `- daemon exits w/ 0
(00.119281) uns: daemon stopped
(00.119334) Error (criu/cr-restore.c:2568): Restoring FAILED.
------------------------ ERROR OVER ------------------------
################# Test zdtm/static/env00 FAIL at CRIU restore ##################
##################################### FAIL #####################################

Thanks,
Andrei


On Tue, May 22, 2018 at 10:14:24PM +0100, Rodrigo Bruno wrote:
> Thanks, that would be great and then we could work on top of that!
> 
> Let me know when the merge is done.
> 
> best,
> rodrigo
> 
> 2018-05-16 6:35 GMT+01:00 Andrei Vagin <avagin@virtuozzo.com>:
> 
> > On Tue, May 15, 2018 at 08:52:03AM +0100, Rodrigo Bruno wrote:
> > > Hi Andrei,
> > >
> > > thank you.
> > >
> > > So we can start working on fixing bugs in order for the zdtm tests to
> > pass
> > > sending patches against
> > > https://github.com/avagin/criu/tree/remote ?
> >
> > I'm going to merge these patches into criu-dev soon.
> >
> > >
> > > cheers,
> > > rodrigo
> > >
> > > 2018-05-15 7:49 GMT+01:00 Andrei Vagin <avagin@virtuozzo.com>:
> > >
> > > >
> > > > I rebased these patches and added a few minor fixes:
> > > > https://github.com/avagin/criu/tree/remote
> > > >
> > > > On Mon, May 14, 2018 at 01:29:47AM +0100, rodrigo-bruno wrote:
> > > > > From: Rodrigo Bruno <rbruno@gsd.inesc-id.pt>
> > > > >
> > > > > ---
> > > > >  criu/img-remote.c         | 93 +++++++++++++++++++++++++++++-
> > ---------
> > > > >  criu/include/img-remote.h | 28 ++++++++++--
> > > > >  2 files changed, 93 insertions(+), 28 deletions(-)
> > > > >
> > > > > diff --git a/criu/img-remote.c b/criu/img-remote.c
> > > > > index f812c52d..70db71e2 100644
> > > > > --- a/criu/img-remote.c
> > > > > +++ b/criu/img-remote.c
> > > > > @@ -470,8 +470,6 @@ static struct rimage *new_remote_image(char
> > *path,
> > > > char *snapshot_id)
> > > > >       buf->nbytes = 0;
> > > > >       INIT_LIST_HEAD(&(rimg->buf_head));
> > > > >       list_add_tail(&(buf->l), &(rimg->buf_head));
> > > > > -     rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> > > > rbuf, l);
> > > > > -     rimg->curr_sent_bytes = 0;
> > > > >
> > > > >       if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
> > > > >               pr_err("Remote image in_use mutex init failed\n");
> > > > > @@ -498,8 +496,6 @@ static struct rimage *clear_remote_image(struct
> > > > rimage *rimg)
> > > > >
> > > > >       list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
> > > > >       rimg->size = 0;
> > > > > -     rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> > > > rbuf, l);
> > > > > -     rimg->curr_sent_bytes = 0;
> > > > >
> > > > >       pthread_mutex_unlock(&(rimg->in_use));
> > > > >
> > > > > @@ -669,18 +665,43 @@ err:
> > > > >       return NULL;
> > > > >  }
> > > > >
> > > > > +
> > > > > +int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > > > flags, bool close_fd)
> > > > > +{
> > > > > +     int ret;
> > > > > +     struct roperation *op = malloc(sizeof(struct roperation));
> > > > > +     bzero(op, sizeof(struct roperation));
> > > > > +     op->fd = fd;
> > > > > +     op->rimg = rimg;
> > > > > +     op->size = size;
> > > > > +     op->flags = flags;
> > > > > +     op->close_fd = close_fd;
> > > > > +     op->curr_recv_buf = list_entry(rimg->buf_head.next, struct
> > rbuf,
> > > > l);
> > > > > +     while ((ret = recv_image_async(op)) < 0)
> > > > > +             if (ret != EAGAIN && ret != EWOULDBLOCK)
> > > > > +                     return -1;
> > > > > +     return ret;
> > > > > +}
> > > > > +
> > > > >  /* Note: size is a limit on how much we want to read from the
> > socket.
> > > > Zero means
> > > > >   * read until the socket is closed.
> > > > >   */
> > > > > -int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > > > flags, bool close_fd)
> > > > > +int64_t recv_image_async(struct roperation *op)
> > > > >  {
> > > > > -     struct rbuf *curr_buf = NULL;
> > > > > +     int fd = op->fd;
> > > > > +     struct rimage *rimg = op->rimg;
> > > > > +     uint64_t size = op->size;
> > > > > +     int flags = op->flags;
> > > > > +     bool close_fd = op->close_fd;
> > > > > +     struct rbuf *curr_buf = op->curr_recv_buf;
> > > > >       int n;
> > > > >
> > > > > -     if (flags == O_APPEND)
> > > > > -             curr_buf = list_entry(rimg->buf_head.prev, struct
> > rbuf,
> > > > l);
> > > > > -     else
> > > > > -             curr_buf = list_entry(rimg->buf_head.next, struct
> > rbuf,
> > > > l);
> > > > > +     if (curr_buf == NULL) {
> > > > > +             if (flags == O_APPEND)
> > > > > +                     curr_buf = list_entry(rimg->buf_head.prev,
> > struct
> > > > rbuf, l);
> > > > > +             else
> > > > > +                     curr_buf = list_entry(rimg->buf_head.next,
> > struct
> > > > rbuf, l);
> > > > > +     }
> > > > >
> > > > >       while (1) {
> > > > >               n = read(fd,
> > > > > @@ -712,6 +733,8 @@ int64_t recv_image(int fd, struct rimage *rimg,
> > > > uint64_t size, int flags, bool c
> > > > >                                       close(fd);
> > > > >                               return rimg->size;
> > > > >                       }
> > > > > +             } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> > > > > +                     return errno;
> > > > >               } else {
> > > > >                       pr_perror("Read on %s:%s socket failed",
> > > > >                               rimg->path, rimg->snapshot_id);
> > > > > @@ -724,37 +747,59 @@ int64_t recv_image(int fd, struct rimage *rimg,
> > > > uint64_t size, int flags, bool c
> > > > >
> > > > >  int64_t send_image(int fd, struct rimage *rimg, int flags, bool
> > > > close_fd)
> > > > >  {
> > > > > +     int ret;
> > > > > +     struct roperation *op = malloc(sizeof(struct roperation));
> > > > > +     bzero(op, sizeof(struct roperation));
> > > > > +     op->fd = fd;
> > > > > +     op->rimg = rimg;
> > > > > +     op->flags = flags;
> > > > > +     op->close_fd = close_fd;
> > > > > +     op->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> > rbuf,
> > > > l);
> > > > > +     while ((ret = send_image_async(op)) < 0)
> > > > > +             if (ret != EAGAIN && ret != EWOULDBLOCK)
> > > > > +                     return -1;
> > > > > +     return ret;
> > > > > +}
> > > > >
> > > > > -     int n, nblocks = 0;
> > > > > +int64_t send_image_async(struct roperation *op)
> > > > > +{
> > > > > +     int fd = op->fd;
> > > > > +     struct rimage *rimg = op->rimg;
> > > > > +     int flags = op->flags;
> > > > > +     bool close_fd = op->close_fd;
> > > > > +     int n;
> > > > >
> > > > >       if (flags != O_APPEND) {
> > > > > -             rimg->curr_sent_buf = list_entry(rimg->buf_head.next,
> > > > struct rbuf, l);
> > > > > -             rimg->curr_sent_bytes = 0;
> > > > > +             op->curr_sent_buf = list_entry(rimg->buf_head.next,
> > > > struct rbuf, l);
> > > > > +             op->curr_sent_bytes = 0;
> > > > >       }
> > > > >
> > > > >       while (1) {
> > > > >               n = send(
> > > > >                   fd,
> > > > > -                 rimg->curr_sent_buf->buffer +
> > rimg->curr_sent_bytes,
> > > > > -                 min(BUF_SIZE, rimg->curr_sent_buf->nbytes) -
> > > > rimg->curr_sent_bytes,
> > > > > +                 op->curr_sent_buf->buffer + op->curr_sent_bytes,
> > > > > +                 min(BUF_SIZE, op->curr_sent_buf->nbytes) -
> > > > op->curr_sent_bytes,
> > > > >                   MSG_NOSIGNAL);
> > > > >               if (n > -1) {
> > > > > -                     rimg->curr_sent_bytes += n;
> > > > > -                     if (rimg->curr_sent_bytes == BUF_SIZE) {
> > > > > -                             rimg->curr_sent_buf =
> > > > > -                                 list_entry(rimg->curr_sent_
> > buf->l.next,
> > > > struct rbuf, l);
> > > > > -                             nblocks++;
> > > > > -                             rimg->curr_sent_bytes = 0;
> > > > > -                     } else if (rimg->curr_sent_bytes ==
> > > > rimg->curr_sent_buf->nbytes) {
> > > > > +                     op->curr_sent_bytes += n;
> > > > > +                     if (op->curr_sent_bytes == BUF_SIZE) {
> > > > > +                             op->curr_sent_buf =
> > > > > +                                 list_entry(op->curr_sent_buf->
> > l.next,
> > > > struct rbuf, l);
> > > > > +                             op->nblocks++;
> > > > > +                             op->curr_sent_bytes = 0;
> > > > > +                     } else if (op->curr_sent_bytes ==
> > > > op->curr_sent_buf->nbytes) {
> > > > >                               if (close_fd)
> > > > >                                       close(fd);
> > > > > -                             return nblocks*BUF_SIZE +
> > > > rimg->curr_sent_buf->nbytes;
> > > > > +                             return op->nblocks*BUF_SIZE +
> > > > op->curr_sent_buf->nbytes;
> > > > >                       }
> > > > >               } else if (errno == EPIPE || errno == ECONNRESET) {
> > > > >                       pr_warn("Connection for %s:%s was closed early
> > > > than expected\n",
> > > > >                               rimg->path, rimg->snapshot_id);
> > > > >                       return 0;
> > > > > -             } else {
> > > > > +             } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> > > > > +                     return errno;
> > > > > +             }
> > > > > +             else {
> > > > >                       pr_perror("Write on %s:%s socket failed",
> > > > >                               rimg->path, rimg->snapshot_id);
> > > > >                       return -1;
> > > > > diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
> > > > > index 1771d310..0947e7f0 100644
> > > > > --- a/criu/include/img-remote.h
> > > > > +++ b/criu/include/img-remote.h
> > > > > @@ -36,10 +36,6 @@ struct rimage {
> > > > >       char snapshot_id[PATHLEN];
> > > > >       struct list_head l;
> > > > >       struct list_head buf_head;
> > > > > -     /* Used to track already sent buffers when the image is
> > appended.
> > > > */
> > > > > -     struct rbuf *curr_sent_buf;
> > > > > -     /* Similar to the previous field. Number of bytes sent in
> > > > 'curr_sent_buf'. */
> > > > > -     int curr_sent_bytes;
> > > > >       uint64_t size; /* number of bytes */
> > > > >       pthread_mutex_t in_use; /* Only one operation at a time, per
> > > > image. */
> > > > >  };
> > > > > @@ -57,6 +53,28 @@ struct wthread {
> > > > >       sem_t wakeup_sem;
> > > > >  };
> > > > >
> > > > > +/* Structure that describes the state of a remote operation on
> > remote
> > > > images. */
> > > > > +struct roperation {
> > > > > +     /* File descriptor being used. */
> > > > > +     int fd;
> > > > > +     /* Remote image being used. */
> > > > > +     struct rimage *rimg;
> > > > > +     /* Flags for the operation. */
> > > > > +     int flags;
> > > > > +     /* If fd should be closed when the operation is done. */
> > > > > +     bool close_fd;
> > > > > +     /* Note: recv operation only. How much bytes should be
> > received. */
> > > > > +     uint64_t size;
> > > > > +     /* Note: recv operation only. Buffer being writen. */
> > > > > +     struct rbuf *curr_recv_buf;
> > > > > +     /* Note: send operation only. Number of blocks already sent. */
> > > > > +     int nblocks;
> > > > > +     /* Note: send operation only. Pointer to buffer being sent. */
> > > > > +     struct rbuf *curr_sent_buf;
> > > > > +     /* Note: send operation only. Number of bytes sent in
> > > > 'curr_send_buf. */
> > > > > +     uint64_t curr_sent_bytes;
> > > > > +};
> > > > > +
> > > > >  /* This variable is used to indicate when the dump is finished. */
> > > > >  extern bool finished;
> > > > >  /* This is the proxy to cache TCP socket FD. */
> > > > > @@ -80,7 +98,9 @@ void *accept_remote_image_connections(void *ptr);
> > > > >
> > > > >  int64_t forward_image(struct rimage *rimg);
> > > > >  int64_t send_image(int fd, struct rimage *rimg, int flags, bool
> > > > image_check);
> > > > > +int64_t send_image_async(struct roperation *op);
> > > > >  int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > > > flags, bool image_check);
> > > > > +int64_t recv_image_async(struct roperation *op);
> > > > >
> > > > >  int64_t read_remote_header(int fd, char *snapshot_id, char *path,
> > int
> > > > *open_mode, uint64_t *size);
> > > > >  int64_t write_remote_header(int fd, char *snapshot_id, char *path,
> > int
> > > > open_mode, uint64_t size);
> > > > > --
> > > > > 2.17.0
> > > > >
> > > > > _______________________________________________
> > > > > CRIU mailing list
> > > > > CRIU@openvz.org
> > > > > https://lists.openvz.org/mailman/listinfo/criu
> > > >
> >
Radostin Stoyanov May 23, 2018, 9:55 a.m.
On 23/05/18 01:19, Andrei Vagin wrote:
> Rodrigo and Radostin,
>
> I've pushed patches into the criu-dev branch.
>
> Radostin, if you don't know where to start, you can start from this
> error:
>
> $ python test/zdtm.py run -t zdtm/static/env00 --remote -f ns
Ok, thank you for the pointer!

Radostin