example_multi.c (2262B)
1 #include <fcntl.h> 2 #include <stdio.h> 3 #include <stdlib.h> 4 #include <string.h> 5 #include <sys/time.h> 6 #include <unistd.h> 7 8 #include "src/scheduler.h" 9 10 #define BUF_SIZE 64 11 12 typedef struct { 13 int reader_fd; 14 int writer_fd; 15 int fds_reader[2]; 16 char buf[BUF_SIZE]; 17 int done; 18 int64_t last_write; 19 } SharedCtx; 20 21 static int reader_task(int64_t ts, pt_task_t *task) { 22 (void)ts; 23 SharedCtx *ctx = task->udata; 24 25 int ready = sched_has_data(ctx->fds_reader); 26 if (ready >= 0) { 27 ssize_t n = read(ready, ctx->buf, BUF_SIZE - 1); 28 if (n > 0) { 29 ctx->buf[n] = '\0'; 30 printf("[reader] received: \"%s\"\n", ctx->buf); 31 } 32 if (n == 0 || (n > 0 && strncmp(ctx->buf, "done\n", 5) == 0)) { 33 ctx->done = 1; 34 } 35 } 36 37 if (ctx->done) { 38 close(ctx->reader_fd); 39 ctx->reader_fd = -1; 40 return SCHED_DONE; 41 } 42 43 return SCHED_RUNNING; 44 } 45 46 static int writer_task(int64_t ts, pt_task_t *task) { 47 SharedCtx *ctx = task->udata; 48 49 if (ctx->done) { 50 return SCHED_DONE; 51 } 52 53 if (ts - ctx->last_write >= 200) { 54 static int count = 0; 55 char msg[32]; 56 int len = snprintf(msg, sizeof(msg), "message #%d\n", ++count); 57 write(ctx->writer_fd, msg, len); 58 ctx->last_write = ts; 59 60 if (count >= 3) { 61 snprintf(msg, sizeof(msg), "done\n"); 62 write(ctx->writer_fd, msg, 4); 63 close(ctx->writer_fd); 64 ctx->writer_fd = -1; 65 } 66 } 67 68 return SCHED_RUNNING; 69 } 70 71 static int shutdown_task(int64_t ts, pt_task_t *task) { 72 (void)ts; 73 SharedCtx *ctx = task->udata; 74 75 if (ctx->done && ctx->reader_fd < 0 && ctx->writer_fd < 0) { 76 printf("[shutdown] all done, cleaning up.\n"); 77 free(ctx); 78 return SCHED_DONE; 79 } 80 81 return SCHED_RUNNING; 82 } 83 84 int main(void) { 85 int pipefd[2]; 86 if (pipe(pipefd) < 0) { 87 perror("pipe"); 88 return 1; 89 } 90 91 SharedCtx *ctx = calloc(1, sizeof(SharedCtx)); 92 ctx->reader_fd = pipefd[0]; 93 ctx->writer_fd = pipefd[1]; 94 ctx->fds_reader[0] = 1; 95 ctx->fds_reader[1] = ctx->reader_fd; 96 97 struct timeval now; 98 gettimeofday(&now, NULL); 99 ctx->last_write = (int64_t)now.tv_sec * 1000 + now.tv_usec / 1000; 100 101 sched_create(writer_task, ctx); 102 sched_create(reader_task, ctx); 103 sched_create(shutdown_task, ctx); 104 sched_main(); 105 return 0; 106 }