/*
 *   PF - select and timing.
 *   Copyright (c) by Tom Schouten
 *
 *   This program is free software; you can redistribute it and/or modify
 *   it under the terms of the GNU General Public License as published by
 *   the Free Software Foundation; either version 2 of the License, or
 *   (at your option) any later version.
 *
 *   This program is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with this program; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

/* This file contains all occurrences of 'select' in the PF code, so I
   can keep an eye on multiplexing the IO and timing.

   There is basically just one function:

   Select for activity on input = 'select-input'
   During this call, all pending output is written too.
This puts all the tools in place to ensure pf won't block: -> Input : use 'input-select' -> Output : print to string first using 'serialize', then 'write-buffer' */ #define D if(0) #include #include #include #include #include #include #include #include #include #include #include // Darwin #ifndef RUSAGE_SELF #define RUSAGE_SELF 0 #endif #ifndef RUSAGE_CHILDREN #define RUSAGE_CHILDREN -1 #endif #include #include #include #include #include #include #include #include #include #include // protocol header size #define HEADER_SIZE 256 /* TIME */ static void ms_to_tv(float ms, struct timeval *tv){ tv->tv_sec = (time_t) (ms / 1000.0f); tv->tv_usec = (ms - (((float)tv->tv_sec) * 1000.0f)) * 1000.0f; } static float tv_to_ms(struct timeval *tv){ return ((float)tv->tv_usec) / 1000.0f + ((float)tv->tv_sec) * 1000.0f; } // can be used inplace static void tv_sub(struct timeval *a_tv, struct timeval *b_tv, struct timeval *c_tv){ a_tv->tv_sec = b_tv->tv_sec - c_tv->tv_sec; a_tv->tv_usec = b_tv->tv_usec - c_tv->tv_usec; if (a_tv->tv_usec < 0){ a_tv->tv_usec += 1000000; a_tv->tv_sec--; } } static void tv_add(struct timeval *a_tv, struct timeval *b_tv, struct timeval *c_tv){ a_tv->tv_sec = b_tv->tv_sec + c_tv->tv_sec; a_tv->tv_usec = b_tv->tv_usec + c_tv->tv_usec; if (a_tv->tv_usec > 1000000){ a_tv->tv_usec -= 1000000; a_tv->tv_sec++; } } static pf_error_t vm_ms_to_tv(pf_vm_t *vm, float ms, struct timeval *tv){ if (ms < 0.0) { THROW(e_inval, "negative time delay"); } ms_to_tv(ms, tv); return e_ok; } static pf_error_t vm_arg_to_tv(pf_vm_t *vm, pf_atom_t *arg, struct timeval *tv){ float ms = INTORFLOAT(arg); return vm_ms_to_tv(vm, ms, tv); } #define TIMEVAL(tv, arg) struct timeval tv; \ {pf_error_t e = vm_arg_to_tv(vm, arg, &tv); if (e) return e;} // global timer static struct timeval mark_time; /* FIXME: these do user time only.. 
maybe need a wall clock timer too */ static PF_PRIMITIVE(pf_time_reset) { struct rusage u; getrusage(RUSAGE_SELF, &u); memcpy(&mark_time, &u.ru_utime, sizeof(struct timeval)); EXIT; } static PF_PRIMITIVE(pf_time_elapsed) { struct rusage u; struct timeval t; getrusage(RUSAGE_SELF, &u); t.tv_sec = u.ru_utime.tv_sec - mark_time.tv_sec; t.tv_usec = u.ru_utime.tv_usec - mark_time.tv_usec; if (t.tv_usec < 0){ t.tv_usec += 1000000; t.tv_sec--; } pf_post("user time elapsed: %d.%02d seconds.\n", t.tv_sec, t.tv_usec/10000); EXIT; } // the resolution here depends on what's used in pf_vm_schedule() // better to sync on external timer using OS events static PF_PRIMITIVE(sleep_blocking_start){ CHECKN(1); struct timeval endtime; float ms = INTORFLOAT(ARG0); pf_error_t e; struct timeval duration; if (e = vm_ms_to_tv(vm, ms, &duration)) return e; gettimeofday(&endtime, NULL); tv_add(&endtime, &endtime, &duration); DROP; PUSH_INT(endtime.tv_usec); PUSH_INT(endtime.tv_sec); EXIT; } static PF_PRIMITIVE(sleep_blocking_task){ struct timeval endtime; // continue CHECKN(2); struct timeval now; endtime.tv_sec = INT(ARG0); endtime.tv_usec = INT(ARG1); // check if we're done gettimeofday(&now, NULL); tv_sub(&now, &endtime, &now); // no -> block some more if (now.tv_sec >= 0){ return e_ok; } return e_eof; // hmmm } /* I/O POLLING */ static pf_error_t thing_stream(pf_vm_t *vm, pf_atom_t *thing, pf_stream_t **pstream){ // get head if it's a list if ((a_list == thing->t)) { thing = thing->w.w_list->first; } if (!thing) return e_inval; *pstream = STREAM(thing); EXIT; } static pf_error_t streamlist_to_fdset(pf_vm_t *vm, pf_list_t *streamlist, fd_set *set, int *fd_max){ pf_atom_t *a; pf_error_t e; pf_stream_t *stream; int fd; FD_ZERO(set); for (a = streamlist->first; a; a=a->next){ if (e = thing_stream(vm, a, &stream)) return e; fd = pf_stream_fd(stream); if (FD_ISSET(fd, set)){ THROW(e_inval, "list contains duplicate stream"); } if (fd > *fd_max) *fd_max = fd; if (fd) FD_SET(fd, set); } EXIT; } // 
if this function returns without error, the streamlist is empty static pf_error_t split_lists(pf_vm_t *vm, pf_list_t *streamlist, fd_set *set, pf_list_t *ready, pf_list_t *not_ready){ while(streamlist->first){ pf_stream_t *stream; pf_error_t e; int fd; if (e = thing_stream(vm, streamlist->first, &stream)) return e; fd = pf_stream_fd(stream); pf_list_push_atom((FD_ISSET(fd, set) ? ready : not_ready), pf_list_pop_atom(streamlist)); } EXIT; } static pf_error_t vm_select(pf_vm_t *vm, int fdmaxp1, fd_set *in, fd_set *out, fd_set *error, struct timeval *tv){ again: if (-1 == select(fdmaxp1, in, out, error, tv)){ switch(errno){ case EINTR: // at this point the signal handler has registered the // interrupt, so we can safely restart. goto again; default: THROW(e_file, "select() : %s", strerror(errno)); } } EXIT; } /* Select wrapper. The input lists can contain [see get_stream()] - stream atoms - list with first element a stream atom this structure is preserved in the ready/idle output lists so it enables pre-binding data or actions to a stream. 
*/

/* Primitive: takes a timeout in milliseconds (top of stack) and, below it,
   a list holding three stream lists.  Calls select() on all of them and
   pushes two lists, 'ready' and then 'idle', each holding three sub-lists
   that receive the streams that were / were not reported by select().
   Errors from argument conversion, fd_set construction and select() are
   returned before any result list is created. */
static PF_PRIMITIVE(select_ioe){
    CHECKN(2);
    TIMEVAL(tv, ARG0);      /* declares 'tv', returns on a bad timeout */
    /* NOTE(review): zero_tv is not referenced anywhere in this function. */
    struct timeval zero_tv = {0,0};
    int fd_max = 0;
    fd_set in_set;
    fd_set out_set;
    fd_set err_set;
    pf_list_t *in_list, *out_list, *err_list;
    pf_list_t *all = LIST(ARG1);
    {
        /* NOTE(review): 's' is never named again in this block.  It
           presumably shadows the stack variable that CHECKN / ARGn expand
           to, so that the three ARGn below address the elements of 'all'
           instead of the data stack -- confirm against the macro
           definitions before touching this. */
        pf_list_t *s = all;
        CHECKN(3);
        in_list  = LIST(ARG0);
        out_list = LIST(ARG1);
        err_list = LIST(ARG2);
    }
    pf_error_t e;

    /* build sets; fd_max accumulates the largest descriptor of all three */
    if (e = streamlist_to_fdset(vm, in_list, &in_set, &fd_max)) return e;
    if (e = streamlist_to_fdset(vm, out_list, &out_set, &fd_max)) return e;
    if (e = streamlist_to_fdset(vm, err_list, &err_set, &fd_max)) return e;

    /* select */
    if (e = vm_select(vm, fd_max+1, &in_set, &out_set, &err_set, &tv)) return e;

    /* create containers: each of 'ready' and 'idle' gets an err, out and
       in sub-list, pushed in that order */
    pf_list_t *ready = pf_list_new();
    pf_list_t *err_ready = pf_list_new(); pf_list_push_list(ready, err_ready);
    pf_list_t *out_ready = pf_list_new(); pf_list_push_list(ready, out_ready);
    pf_list_t *in_ready = pf_list_new(); pf_list_push_list(ready, in_ready);

    pf_list_t *idle = pf_list_new();
    pf_list_t *err_idle = pf_list_new(); pf_list_push_list(idle, err_idle);
    pf_list_t *out_idle = pf_list_new(); pf_list_push_list(idle, out_idle);
    pf_list_t *in_idle = pf_list_new(); pf_list_push_list(idle, in_idle);

    /* split in ready/idle; this empties the three input lists */
    if (e = split_lists(vm, in_list, &in_set, in_ready, in_idle)) goto error;
    if (e = split_lists(vm, out_list, &out_set, out_ready, out_idle)) goto error;
    if (e = split_lists(vm, err_list, &err_set, err_ready, err_idle)) goto error;

    /* rearrange stacks: the two arguments are dropped only on success.
       NOTE(review): the original comment states that on error the stack
       layout is the same as on entry, but 'ready' and 'idle' are pushed
       below in both cases -- confirm which behavior is intended. */
  error:
    if (!e) { DROP2; } // on error the two arguments stay on the stack
    PUSH_LIST(ready);
    PUSH_LIST(idle);
    return e;
}


/* SETUP CODE */

#define REGISTER_PRIMITIVE PF_REGISTER_FUNCTION

/* Register the primitives of this file under their Forth names, each with
   a stack-effect / description string. */
PF_PRIMITIVE(pf_forth_select_setup) {

    REGISTER_PRIMITIVE(select_ioe, "select-i/o/e", "( select-i/o/e msec -- active idle )\t"
                       "Split a list of stream lists (input output error) into a list of "
                       "active and idle streams with the same structure.");

    REGISTER_PRIMITIVE(sleep_blocking_start, "sleep-blocking-start", "( ms -- x x )\t"
                       "Start blocking sleep.");

    REGISTER_PRIMITIVE(sleep_blocking_task, "sleep-blocking-task", "( x x -- x x )\t"
                       "Continue blocking sleep.");

    REGISTER_PRIMITIVE(pf_time_reset, "time-reset", "( -- )\t"
                       "Reset global timer.");

    REGISTER_PRIMITIVE(pf_time_elapsed, "time-elapsed", "( -- )\t"
                       "Print elapsed user time since last reset.");

    EXIT;
}