scheduler.c

Basic task scheduling library
git clone git://git.finwo.net/lib/scheduler.c
Log | Files | Refs | README | LICENSE

example_multi.c (2262B)


#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
      7 
      8 #include "src/scheduler.h"
      9 
     10 #define BUF_SIZE 64
     11 
     12 typedef struct {
     13   int     reader_fd;
     14   int     writer_fd;
     15   int     fds_reader[2];
     16   char    buf[BUF_SIZE];
     17   int     done;
     18   int64_t last_write;
     19 } SharedCtx;
     20 
     21 static int reader_task(int64_t ts, pt_task_t *task) {
     22   (void)ts;
     23   SharedCtx *ctx = task->udata;
     24 
     25   int ready = sched_has_data(ctx->fds_reader);
     26   if (ready >= 0) {
     27     ssize_t n = read(ready, ctx->buf, BUF_SIZE - 1);
     28     if (n > 0) {
     29       ctx->buf[n] = '\0';
     30       printf("[reader] received: \"%s\"\n", ctx->buf);
     31     }
     32     if (n == 0 || (n > 0 && strncmp(ctx->buf, "done\n", 5) == 0)) {
     33       ctx->done = 1;
     34     }
     35   }
     36 
     37   if (ctx->done) {
     38     close(ctx->reader_fd);
     39     ctx->reader_fd = -1;
     40     return SCHED_DONE;
     41   }
     42 
     43   return SCHED_RUNNING;
     44 }
     45 
     46 static int writer_task(int64_t ts, pt_task_t *task) {
     47   SharedCtx *ctx = task->udata;
     48 
     49   if (ctx->done) {
     50     return SCHED_DONE;
     51   }
     52 
     53   if (ts - ctx->last_write >= 200) {
     54     static int count = 0;
     55     char       msg[32];
     56     int        len = snprintf(msg, sizeof(msg), "message #%d\n", ++count);
     57     write(ctx->writer_fd, msg, len);
     58     ctx->last_write = ts;
     59 
     60     if (count >= 3) {
     61       snprintf(msg, sizeof(msg), "done\n");
     62       write(ctx->writer_fd, msg, 4);
     63       close(ctx->writer_fd);
     64       ctx->writer_fd = -1;
     65     }
     66   }
     67 
     68   return SCHED_RUNNING;
     69 }
     70 
     71 static int shutdown_task(int64_t ts, pt_task_t *task) {
     72   (void)ts;
     73   SharedCtx *ctx = task->udata;
     74 
     75   if (ctx->done && ctx->reader_fd < 0 && ctx->writer_fd < 0) {
     76     printf("[shutdown] all done, cleaning up.\n");
     77     free(ctx);
     78     return SCHED_DONE;
     79   }
     80 
     81   return SCHED_RUNNING;
     82 }
     83 
     84 int main(void) {
     85   int pipefd[2];
     86   if (pipe(pipefd) < 0) {
     87     perror("pipe");
     88     return 1;
     89   }
     90 
     91   SharedCtx *ctx     = calloc(1, sizeof(SharedCtx));
     92   ctx->reader_fd     = pipefd[0];
     93   ctx->writer_fd     = pipefd[1];
     94   ctx->fds_reader[0] = 1;
     95   ctx->fds_reader[1] = ctx->reader_fd;
     96 
     97   struct timeval now;
     98   gettimeofday(&now, NULL);
     99   ctx->last_write = (int64_t)now.tv_sec * 1000 + now.tv_usec / 1000;
    100 
    101   sched_create(writer_task, ctx);
    102   sched_create(reader_task, ctx);
    103   sched_create(shutdown_task, ctx);
    104   sched_main();
    105   return 0;
    106 }