~martijnbraam/postprocessd

c8cc7f751efde68a96ef871af94f0e29eb3ef17d — Martijn Braam 2 years ago ff881cb
Sequentialize the processing

Create a postprocessd daemon when none is running and delegate all the
work to that daemon. Reuse the daemon if it exists.
1 files changed, 246 insertions(+), 149 deletions(-)

M main.c
M main.c => main.c +246 -149
@@ 4,189 4,286 @@
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>

#include <sys/prctl.h>
#include "util.h"
#include "postprocess.h"

static char socket_path[100];

struct Job {
        pid_t pid;
        char burstdir[255];
        char target[255];
        int keep;
		pid_t pid;
		char burstdir[255];
		char target[255];
		int keep;
};


pid_t
void
start_processing(struct Job job)
{
    pid_t child_pid = fork();
    if (child_pid < 0) {
        err("fork failed");
    } else if (child_pid > 0) {
        // parent process
        return child_pid;
    } else {
        // child process
        postprocess_internal(job.burstdir, job.target, 1);
        exit(0);
    }
    return -1;
	postprocess_internal(job.burstdir, job.target, 1);
}

int
is_daemon_running()
{
	int sock;
	struct sockaddr_un addr;
	struct timeval tv;

	// Daemon isn't running if the socket doesn't exist
	if (!access(socket_path, F_OK)) {
		return 0;
	}

	// Check if the daemon responds on the socket
	sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
	if (sock < 0) {
		err("could not make socket fd");
		return 0;
	}

	// Set a short timeout on the socket
	tv.tv_sec = 0;
	tv.tv_usec = 500;
	setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv, sizeof tv);

	memset(&addr, 0, sizeof(struct sockaddr_un));
	addr.sun_family = AF_UNIX;
	strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1);
	if (connect(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr_un)) < 0) {
		return 0;
	}

	close(sock);
	fprintf(stderr, "[fg] daemon is already running\n");
	return 1;
}

int
listen_on_socket()
{
    int sock;
    struct sockaddr_un addr;

    // Clean existing socket
    if (remove(socket_path) == -1 && errno != ENOENT) {
        err("could not clean up old socket");
    }

    // Make new unix domain socket to listen on
    sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
    if (sock < 0) {
        err("could not make socket fd");
        return 0;
    }

    memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1);
    if (bind(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr_un)) < 0) {
        err("failed to bind socket");
        return 0;
    }

    if (listen(sock, 20) < 0) {
        err("failed to listen");
        return 0;
    }

    return sock;
	int sock;
	struct sockaddr_un addr;

	// Clean existing socket
	if (remove(socket_path) == -1 && errno != ENOENT) {
		err("could not clean up old socket");
	}

	// Make new unix domain socket to listen on
	sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
	if (sock < 0) {
		err("could not make socket fd");
		return 0;
	}

	memset(&addr, 0, sizeof(struct sockaddr_un));
	addr.sun_family = AF_UNIX;
	strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1);
	unlink(socket_path);
	if (bind(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr_un)) < 0) {
		err("failed to bind socket");
		return 0;
	}

	if (listen(sock, 20) < 0) {
		err("failed to listen");
		return 0;
	}

	return sock;
}

int
queue_job(struct Job job)
{
    int sock;
    struct sockaddr_un addr;

    sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
    if (sock < 0) {
        err("could not make socket fd");
        return 0;
    }

    memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1);
    if (connect(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr_un)) < 0) {
        err("failed to open socket");
        return 0;
    }
    if (write(sock, &job, sizeof(job)) < 0) {
        err("failed to write");
        return 0;
    }
    close(sock);
    return 1;
	int sock;
	struct sockaddr_un addr;
	char buffer[1024];

	sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
	if (sock < 0) {
		err("[fg] could not make socket fd");
		return 1;
	}

	memset(&addr, 0, sizeof(struct sockaddr_un));
	addr.sun_family = AF_UNIX;
	strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1);
	if (connect(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr_un)) < 0) {
		err("[fg] failed to open socket");
		return 2;
	}
	if (write(sock, &job, sizeof(job)) < 0) {
		err("[fg] failed to write");
		return 3;
	}
	fprintf(stderr, "[fg] wait until processing is done\n");
	// Wait until the background process return the resulting filename
	if (read(sock, buffer, 1024) < 1) {
		err("[fg] failed to read");
		return 4;
	}
	fprintf(stderr, "[fg] processing is done\n");

	// Pass the stacked filename to megapixels
	printf("%s\n", buffer);
	fprintf(stderr, "[fg] done\n");
	exit(0);
}

int
listen_and_convert(int do_fork)
start_background_process()
{
    int sock, fd;
    struct sockaddr_un cli_addr;
    unsigned int clilen;
    struct Job job;

    if (do_fork == 1) {
        pid_t child_pid = fork();
        if (child_pid < 0) {
            err("fork failed");
        } else if (child_pid > 0) {
            usleep(1000000);
            // parent process
            return 1;
        }
    }

    clilen = sizeof(cli_addr);

    postprocess_setup();
    sock = listen_on_socket();


    while(1) {
        fd = accept(sock, (struct sockaddr *)&cli_addr, &clilen);
        if (fd < 0) {
            err("failed to accept");
            return 0;
        }
        if(read(fd, &job, sizeof(job)) < 0) {
            err("failed to read");
            return 0;
        }
        close(fd);

        start_processing(job);
        wait(NULL);
    }
	int sock, fd;
	struct sockaddr_un cli_addr;
	unsigned int cli_len;
	struct Job job;
	char buffer[272];

	const char *name_fg = "postprocessd fg";
	const char *name_bg = "postprocessd bg";

	// First fork
	pid_t child_pid = fork();
	if (child_pid < 0) {
		err("fork failed");
	} else if (child_pid > 0) {
		prctl(PR_SET_NAME, (unsigned long) name_fg);
		// In the parent process
		waitpid(child_pid, NULL, WNOHANG);

		// Give the fork a bit of time to create the socket
		while (access(socket_path, F_OK)) {
			usleep(100);
		}
		usleep(1000);
		return 1;
	}

	// Create new process group
	setsid();

	// Second fork
	pid_t child2_pid = fork();
	if (child2_pid != 0) {
		// The middle child, exit quickly
		exit(0);
	}

	// We're now in the grandchild
	prctl(PR_SET_NAME, (unsigned long) name_bg);

	// Clean up FDs
	for (fd = sysconf(_SC_OPEN_MAX); fd > 0; --fd) {
		close(fd);
	}

	// Recreate standard pipes
#ifdef __GLIBC__
	stdin = fopen("/dev/null", "r");
	stdout = fopen("/dev/null", "w+");
	stderr = fopen("/dev/null", "w+");
#else
	freopen("/dev/null", "r", stdin);
	freopen("/dev/null", "w", stdout);
	freopen("/dev/null", "w", stderr);
#endif

	cli_len = sizeof(cli_addr);

	fprintf(stderr, "[bg] init postprocessd\n");
	postprocess_setup();
	sock = listen_on_socket();
	fprintf(stderr, "[bg] socket created\n");

	while (1) {
		fd = accept(sock, (struct sockaddr *) &cli_addr, &cli_len);
		if (fd < 0) {
			err("[bg] failed to accept");
			return 0;
		}
		fprintf(stderr, "[bg] accepted connection\n");
		if (read(fd, &job, sizeof(job)) < 0) {
			err("[bg] failed to read");
			return 0;
		}
		fprintf(stderr, "[bg] start processing job\n");
		start_processing(job);
		wait(NULL);
		fprintf(stderr, "[bg] job done\n");

		snprintf(buffer, sizeof(buffer), "%s.stacked.jpg", job.target);
		fprintf(stderr, "[bg] result: '%s'\n", buffer);
		if (write(fd, buffer, sizeof(buffer)) < 0) {
			err("[bg] failed to write response");
		}
		close(fd);
	}
}

void
make_socket_path()
{
    char fname[80];
    char *xdg_runtime_dir = getenv("XDG_RUNTIME_DIR");
    char *user = getenv("USER");
	char fname[80];
	char *xdg_runtime_dir = getenv("XDG_RUNTIME_DIR");
	char *user = getenv("USER");

    snprintf(fname, sizeof(fname), "postprocessd-%s.sock", user);
	snprintf(fname, sizeof(fname), "postprocessd-%s.sock", user);

    if (xdg_runtime_dir) {
        snprintf(socket_path, sizeof(socket_path), "%s/%s", xdg_runtime_dir, fname);
    } else {
        snprintf(socket_path, sizeof(socket_path), "/tmp/%s", fname);
    }
	if (xdg_runtime_dir) {
		snprintf(socket_path, sizeof(socket_path), "%s/%s", xdg_runtime_dir, fname);
	} else {
		snprintf(socket_path, sizeof(socket_path), "/tmp/%s", fname);
	}
	fprintf(stderr, "[fg] using socket '%s'\n", socket_path);
}

int
handle_job(struct Job job)
{
	/*
	 * There's two parts to postprocessd, the postprocessd binary is called
	 * by Megapixels and will check if there's already a daemon running.
	 * If there isn't then it will fork() a daemon process that opens a socket.
	 *
	 * The main process will connect to that process over the unix socket and
	 * send the task. Then it'll wait until the background process is completed
	 * so the filename can be returned to Megapixels. This also means that if
	 * multiple pictures are taken while processing there will be a few
	 * postprocessd processes running that only wait on the daemon without using
	 * CPU and the the daemon will process all the tasks SEQUENTIALLY until
	 * done and notify the right waiting processes the job is done.
	 */
	if (!is_daemon_running()) {
		fprintf(stderr, "[fg] starting new daemon\n");
		unlink(socket_path);
		start_background_process();
	}
	fprintf(stderr, "[fg] queueing job\n");
	return queue_job(job);
}

int
main(int argc, char *argv[])
{
    struct Job job;
    make_socket_path();

    if (argc == 4) {
        // Parse command line arguments into the job struct
        job.pid = 0;
        strncpy(job.burstdir, argv[1], sizeof(job.burstdir));
        strncpy(job.target, argv[2], sizeof(job.target));
        if (strcmp(argv[3], "0") == 0) {
            job.keep = 0;
        } else {
            job.keep = 1;
        }

        if(queue_job(job)) return 0;

        if(listen_and_convert(1))
            queue_job(job);
    } else if (argc == 2) {
        if (strcmp(argv[1], "--daemon") == 0) {
            listen_and_convert(0);
        }
    } else {
        printf("usage: %s burst-dir target-name keep\n", argv[0]);
        exit(1);
    }


    if(listen_and_convert(1))
        queue_job(job);

    return 0;
	struct Job job;
	make_socket_path();

	if (argc == 4) {
		// Parse command line arguments into the job struct
		job.pid = 0;
		strncpy(job.burstdir, argv[1], sizeof(job.burstdir));
		strncpy(job.target, argv[2], sizeof(job.target));
		if (strcmp(argv[3], "0") == 0) {
			job.keep = 0;
		} else {
			job.keep = 1;
		}

		return handle_job(job);
	} else {
		printf("usage: %s burst-dir target-name keep\n", argv[0]);
		exit(1);
	}
}