[4/5] Minor improvements on img-remote.c

Submitted by Rodrigo Bruno on May 14, 2018, 12:29 a.m.

Details

Message ID 20180514002951.5086-4-rbruno@gsd.inesc-id.pt
State Accepted
Series "Series without cover letter"
Commit 23a83725b4f60432b5f531cb64bf6c446345d8a1
Headers show

Commit Message

Rodrigo Bruno May 14, 2018, 12:29 a.m.
From: Rodrigo Bruno <rbruno@gsd.inesc-id.pt>

---
 criu/img-remote.c | 155 +++++++++++++++++++++++-----------------------
 1 file changed, 77 insertions(+), 78 deletions(-)

Patch hide | download patch | download mbox

diff --git a/criu/img-remote.c b/criu/img-remote.c
index ec69bc02..37466231 100644
--- a/criu/img-remote.c
+++ b/criu/img-remote.c
@@ -44,14 +44,26 @@  LIST_HEAD(rop_forwarding);
 // List of snapshots (useful when doing incremental restores/dumps
 LIST_HEAD(snapshot_head);
 
+// Snapshot id (setup at launch time by dump or restore).
 static char *snapshot_id;
-bool restoring = true; // TODO - check where this is used!
-// TODO - split this into two vars, recv_from_proxy, send_to_cache
-bool forwarding = false; // TODO - true if proxy_to_cache_fd is being used.
+
+// True if restoring (cache := true; proxy := false).
+bool restoring = true;
+
+// True if the proxy to cache socket is being used (receiving or sending).
+bool forwarding = false;
+
+// True if the local dump or restore is finished.
 bool finished_local = false;
+
+// True if the communication between the proxy and cache can be closed.
 bool finished_remote = false;
+
+// Proxy to cache socket fd; Local dump or restore servicing fd.
 int proxy_to_cache_fd;
 int local_req_fd;
+
+// Epoll fd and event array.
 int epoll_fd;
 struct epoll_event *events;
 
@@ -455,7 +467,7 @@  static struct rimage *clear_remote_image(struct rimage *rimg)
 	return rimg;
 }
 
-void handle_accept_write(
+struct roperation* handle_accept_write(
     int cli_fd, char* snapshot_id, char* path, int flags, bool close_fd, uint64_t size)
 {
     struct roperation *rop = NULL;
@@ -482,25 +494,24 @@  void handle_accept_write(
 
     rop_set_rimg(rop, rimg);
 	rop->size = size;
-    list_add_tail(&(rop->l), &rop_inprogress);
-	event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLIN, rop);
-	return;
+	return rop;
 err:
     free(rimg);
     free(rop);
+	return NULL;
 }
 
-void handle_accept_proxy_write(
+struct roperation* handle_accept_proxy_write(
 	int cli_fd, char* snapshot_id, char* path, int flags)
 {
-	handle_accept_write(cli_fd, snapshot_id, path, flags, true, 0);
+	return handle_accept_write(cli_fd, snapshot_id, path, flags, true, 0);
 }
 
-void handle_accept_proxy_read(
+struct roperation* handle_accept_proxy_read(
     int cli_fd, char* snapshot_id, char* path, int flags)
 {
     struct roperation *rop = NULL;
-	struct rimage *rimg = NULL;
+	struct rimage *rimg    = NULL;
 
     rimg = get_rimg_by_name(snapshot_id, path);
 
@@ -511,40 +522,40 @@  void handle_accept_proxy_read(
 			pr_perror("Error writing reply header for unexisting image");
 		    goto err;
         }
+    	close(cli_fd);
+		return NULL;
 	}
-	else {
-		if (write_reply_header(cli_fd, 0) < 0) {
-			pr_perror("Error writing reply header for %s:%s",
-				path, snapshot_id);
-			goto err;
-		}
 
-        rop = new_remote_operation(path, snapshot_id, cli_fd, flags, true);
-        if (rop == NULL) {
-			pr_perror("Error preparing remote operation");
-            goto err;
-        }
-        rop_set_rimg(rop, rimg);
-        list_add_tail(&(rop->l), &rop_inprogress);
-        event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
+	if (write_reply_header(cli_fd, 0) < 0) {
+		pr_perror("Error writing reply header for %s:%s",
+			path, snapshot_id);
+		goto err;
 	}
-	return;
+
+    rop = new_remote_operation(path, snapshot_id, cli_fd, flags, true);
+    if (rop == NULL) {
+		pr_perror("Error preparing remote operation");
+        goto err;
+    }
+
+	rop_set_rimg(rop, rimg);
+	return rop;
 err:
     close(cli_fd);
+	return NULL;
 }
 
 void finish_local()
 {
 	int ret;
 	finished_local = true;
-	//shutdown(local_req_fd, SHUT_RD); //TODO - should this be removed?
 	ret = event_set(epoll_fd, EPOLL_CTL_DEL, local_req_fd, 0, 0);
 	if (ret) {
 		pr_perror("Failed to del local fd from epoll");
 	}
 }
 
-void handle_accept_cache_read(
+struct roperation* handle_accept_cache_read(
     int cli_fd, char* snapshot_id, char* path, int flags)
 {
     struct rimage     *rimg = NULL;
@@ -554,14 +565,14 @@  void handle_accept_cache_read(
 	if (!strncmp(path, RESTORE_FINISH, sizeof(RESTORE_FINISH))) {
 		close(cli_fd);
 		finish_local();
-		return;
+		return NULL;
 	}
 
     rop = new_remote_operation(path, snapshot_id, cli_fd, flags, true);
     if (rop == NULL) {
 	    pr_perror("Error preparing remote operation");
         close(cli_fd);
-		return;
+		return NULL;
     }
 
 	// Check if we already have the image.
@@ -574,28 +585,25 @@  void handle_accept_cache_read(
 			close(rop->fd);
 		}
         rop_set_rimg(rop, rimg);
-		list_add_tail(&(rop->l), &rop_inprogress);
-        event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
-	}
-	// The file may exist in future.
-	else if (!finished_remote){
-		list_add_tail(&(rop->l), &rop_pending);
+		return rop;
 	}
 	// The file does not exist.
-	else {
+	else if (finished_remote) {
 		pr_info("No image %s:%s.\n", path, snapshot_id);
 		if (write_reply_header(cli_fd, ENOENT) < 0)
 			pr_perror("Error writing reply header for unexisting image");
         free(rop);
 		close(cli_fd);
 	}
+	return NULL;
 }
 
 void forward_remote_image(struct roperation* rop)
 {
 	uint64_t ret = 0;
+
     // Set blocking during the setup.
-//    socket_set_blocking(rop->fd); // TODO - test
+    socket_set_blocking(rop->fd);
 
 	ret = write_remote_header(
 		rop->fd, rop->snapshot_id, rop->path, rop->flags, rop->size);
@@ -614,7 +622,7 @@  void forward_remote_image(struct roperation* rop)
 
 
     // Go back to non-blocking
-//    socket_set_non_blocking(rop->fd); // TODO - test
+    socket_set_non_blocking(rop->fd);
 
 	forwarding = true;
     event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
@@ -627,9 +635,10 @@  void handle_remote_accept(int fd)
 	int flags;
     uint64_t size = 0;
     uint64_t ret;
+	struct roperation* rop = NULL;
 
     // Set blocking during the setup.
-//    socket_set_blocking(fd); // TODO - test!
+    socket_set_blocking(fd);
 
     ret = read_remote_header(fd, snapshot_id, path, &flags, &size);
 	if (ret < 0) {
@@ -644,7 +653,7 @@  void handle_remote_accept(int fd)
 	}
 
     // Go back to non-blocking
-//    socket_set_non_blocking(fd); // TODO - test!
+    socket_set_non_blocking(fd);
 
 	pr_info("[fd=%d] Received %s request for %s:%s with %lu bytes\n",
 		fd,
@@ -654,8 +663,13 @@  void handle_remote_accept(int fd)
 
 
 	forwarding = true;
-    handle_accept_write(fd, snapshot_id, path, flags, false, size);
-    return;
+    rop = handle_accept_write(fd, snapshot_id, path, flags, false, size);
+
+	if (rop != NULL) {
+		list_add_tail(&(rop->l), &rop_inprogress);
+		event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLIN, rop);
+	}
+	return;
 err:
     close(fd);
 }
@@ -668,6 +682,7 @@  void handle_local_accept(int fd)
     int flags = 0;
 	struct sockaddr_in cli_addr;
 	socklen_t clilen = sizeof(cli_addr);
+	struct roperation *rop = NULL;
 
 	cli_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
 	if (cli_fd < 0) {
@@ -688,19 +703,32 @@  void handle_local_accept(int fd)
 
 	// Write/Append case (only possible in img-proxy).
 	if (flags != O_RDONLY) {
-        handle_accept_proxy_write(cli_fd, snapshot_id, path, flags);
+        rop = handle_accept_proxy_write(cli_fd, snapshot_id, path, flags);
 	}
 	// Read case while restoring (img-cache).
 	else if (restoring) {
-        handle_accept_cache_read(cli_fd, snapshot_id, path, flags);
+        rop = handle_accept_cache_read(cli_fd, snapshot_id, path, flags);
 	}
 	// Read case while dumping (img-proxy).
 	else {
-        handle_accept_proxy_read(cli_fd, snapshot_id, path, flags);
+        rop = handle_accept_proxy_read(cli_fd, snapshot_id, path, flags);
 	}
 
-    // Set socket non-blocking.
-    socket_set_non_blocking(cli_fd);
+    // If we have an operation. Check if we are ready to start or not.
+	if (rop != NULL) {
+		if (rop->rimg != NULL) {
+		    list_add_tail(&(rop->l), &rop_inprogress);
+		    event_set(
+				epoll_fd,
+				EPOLL_CTL_ADD,
+				rop->fd,
+				rop->flags == O_RDONLY ? EPOLLOUT : EPOLLIN,
+				rop);
+		} else {
+		    list_add_tail(&(rop->l), &rop_pending);
+		}
+		socket_set_non_blocking(rop->fd);
+	}
 
     return;
 err:
@@ -758,7 +786,6 @@  void finish_cache_write(struct roperation* rop)
     // Add image to list of images.
 	list_add_tail(&(rop->rimg->l), &rimg_head);
 
-    // TODO - what if we have multiple requests for the same name?
     if (prop != NULL) {
 		pr_info("\t[fd=%d] Resuming pending %s for %s:%s\n",
 			prop->fd,
@@ -947,31 +974,17 @@  void accept_image_connections() {
 
         // If both local and remote sockets are closed, leave.
         if (finished_local && finished_remote) {
-			pr_info("\tFinished both local and remote, exiting\n");
+			pr_info("Finished both local and remote, exiting\n");
             goto end;
 		}
     }
 end:
-	// TODO - release pending when no receiving and finished.
 	close(epoll_fd);
 	close(local_req_fd);
 	free(events);
 }
 
 
-int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
-{
-	int ret;
-    struct roperation *op = new_remote_operation(
-        rimg->path, rimg->snapshot_id, fd, flags, close_fd);
-    rop_set_rimg(op, rimg);
-	while ((ret = recv_image_async(op)) < 0)
-		if (ret != EAGAIN && ret != EWOULDBLOCK)
-			return -1;
-	free(op);
-	return ret;
-}
-
 /* Note: size is a limit on how much we want to read from the socket.  Zero means
  * read until the socket is closed.
  */
@@ -1026,18 +1039,6 @@  int64_t recv_image_async(struct roperation *op)
 	return n;
 }
 
-int64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd)
-{
-	int ret;
-	struct roperation *op = new_remote_operation(
-        rimg->path, rimg->snapshot_id, fd, flags, close_fd);
-    rop_set_rimg(op, rimg);
-	while ((ret = send_image_async(op)) < 0)
-		if (ret != EAGAIN && ret != EWOULDBLOCK)
-			return -1;
-	return ret;
-}
-
 int64_t send_image_async(struct roperation *op)
 {
 	int fd = op->fd;
@@ -1058,7 +1059,6 @@  int64_t send_image_async(struct roperation *op)
 			op->curr_sent_bytes = 0;
 			return n;
 		}
-		// TODO - cloudn't we just compare to the img size?
 		else if (op->curr_sent_bytes == op->curr_sent_buf->nbytes) {
 			if (close_fd)
 				close(fd);
@@ -1066,7 +1066,6 @@  int64_t send_image_async(struct roperation *op)
 		}
 		return n;
 	}
-	// TODO - clouldn't these checks be made upstream?
 	else if (errno == EPIPE || errno == ECONNRESET) {
 		pr_warn("Connection for %s:%s was closed early than expected\n",
 			rimg->path, rimg->snapshot_id);