provide fork

# default pf scheduler

# currently, it's a very simple round-robin scheduler

# FIXME: add support for select and proper error handling
# FIXME: add thread local dynamic variables + wind-protect
# FIXME: a task should be an abstract data type, only known here.

variable running
variable waiting
variable waiting-write
variable waiting-read

# currently saved dynamic variables
() variable! dynamic-parameters

: killall
    () running !
    () waiting !
    () waiting-write !
    () waiting-read ! ;

killall

# ( n -- task ) take n items with empty return stack to create task
: ds/rs>task 2 pack ; "( ds rs -- task )" doc
: new-task pack () ds/rs>task ;

# : swaptask postpone swap-rs/ds ; immediate

# task swapping is split in 3 parts. the middle part will do the
# return stack swapping, and is always inlined (as swapreturn) so the
# saved return stack acts as a pure continuation, without any
# reference to the swap routine itself: this simplifies initial
# generation of tasks, since the RS can be empty. the pre and post
# code can be put in a word: it's independent from the return stack.

: swap-dynamic # ( new-dynvars -- old-dynvars )
    dynamic-parameters @>
    swap-parameters swap
    swap-parameters dynamic-parameters ! ;

# TASK == (RS DS (VAR value) ...)

: __pre-swaptask # ( task -- [STUFF] next-rs )
    split    # just split off the RS so it can be swapped
;

: __post-swaptask # ( [STUFF] prev-rs -- task )
    >r                       # save old RS ((DS VARS))
    uncons >r swapdata r>    # swap ds

    # rest of TASK struct are dynamic variables: swap out the
    # previous ones, and swap in the current ones
    swap-dynamic
    cons r> unsplit ;

: swaptask
    postpone __pre-swaptask
    postpone swapreturn
    postpone __post-swaptask
; immediate

# task struct access
: taskvar>rs head ;
: taskvar>ds head follow ;

: task-ds-cons # atom task -- task
    >r rsp taskvar>ds push r> ;

: task-ds-uncons # task -- atom task
    >r rsp taskvar>ds pop r> ;

# introducing a task-local variable.
# this moves the current value to
# the dynamic-parameters stack, and leaves the variable undefined.

: local-variable dup @> 2 list dynamic-parameters push ; ;
"( variable -- )\tRegister a variable as task-local." doc

: local-xt xt>body local-variable ;; ;
"( xt -- )\tRegister an XT as task-local. Semantics (word/variable/...) are inherited, while contents is uninitialized." doc

: wake-up swaptask drop ;

# reschedule should do 'select'

# currently, there are 4 projected scheduling reasons: a task gives up
# control because it's waiting for an I/O event from the OS or another
# PF task. this is probably sufficient.

# : reschedule
#     block-microsleep
#     () waiting xchg running !
#     running pop wake-up ;

# no more running tasks, so need to reschedule.
# 1. check the i/o tasks + schedule results
# 2. schedule the polling tasks

: >running { running push } for-each ;

# remove first element from list (for stripping stream)
: strip-buffers { >r rsp pop drop r> } map ;

# get next task
: next-task
    try running pop
    recover 0 leave
    endtry -1 ;

# milliseconds maximum poll interval
# hint: use a real (external) sync source.
10 variable! microsleep

: reschedule
    () waiting xchg >running    # add poll tasks first

    () waiting-write @>
    waiting-read @>
    3 pack                      # list of 3 wait lists
    microsleep @                # maximum wait time
    select-i/o/e

    unpack
    waiting-read !
    waiting-write !
    drop                        # error list was empty in the first place

    unpack
    strip-buffers >running      # read tasks
    strip-buffers >running      # write tasks
    drop                        # error list ...

    running pop wake-up ;

# print stack of task

# whenever the current task blocks (e_idle) the scheduler is called,
# and it transfers control to the next task.
# transfer control to next task
: finish
    try
        running pop
    recover
        e_underflow or-throw
        reschedule    # no more tasks -> reschedule
    endtry
    wake-up ;

: schedule-poll
    0 new-task swaptask    # wrap task
    waiting queue          # save current task
    finish ;

: schedule-read
    1 new-task swaptask    # wrap task, preserving 1 argument
    cons waiting-read queue
    finish ;

: schedule-write
    1 new-task swaptask    # wrap task, preserving 1 argument
    cons waiting-write queue
    finish ;

# blocking pop
: pop-block
    try
        pop
    recover
        drop
        schedule-poll    # wait till next round
        pass pop-block
    endtry ;
"( listvar -- thing )\tLike 'pop', but block task if stack/queue is empty." doc

# detach will terminate a word at the current point, but returns a
# task that will continue at the current point.

: detach (pack r> 1 pack ds/rs>task) interpret-list ; immediate
"( n -- task )\tPinch off a task taking n arguments." doc

: fork detach waiting queue ;
"( n -- )\tFork off a background task taking n arguments." doc

# this would be a lot more interesting if it were possible to wait
# for another task to return a value.
: spawn 1 + fork execute finish ;
"( ... xt n -- )\tSpawn a task from xt and n arguments." doc

: fork-sync detach running queue ;
"( n -- )\tLike 'fork' but task will be executed immediately." doc

# take whole DS
# : sdetach (stack r> 1 pack 2 list) interpret-list ; immediate
# : sfork sdetach waiting queue ;

# buffered output implemented as tasks

# note: if you have cyclic communication pipes between multiple PF
# processes, it is necessary to use this to prevent deadlock.

: write-atom-buffered
    >r serialize r>    # serialize in current context
    2 fork             # spawn background task ( string stream -- )
    print-atom finish ;

# ERROR HANDLING

variable aborted-task

# restart aborted task
: aborted aborted-task @> running push ;

# FIXME: still a bit flaky: interrupt during scheduler run won't work

# NOTE: can't use print inside scheduler, since it can call the
# scheduler again (but only if output blocks).
# here i'll group all
# print statements in a separate task, and fork it off.

: fork-print-bt
    0 fork
    aborted-task .task finish ;

: schedule-abort
    0 new-task swaptask
    aborted-task !    # save task

    # schedule task to print backtrace
    fork-print-bt
    finish ;

# methods / objects are implemented on top of tasks, using the shallow
# binding mechanism for methods. not optimal, but easy to implement.

: link-method # \ symbol --
    dup defined? if drop else link-defer then ;

# implemented as deferred words registered as local xt's
: method read link-method ;
: method! dup local-xt defer! ; # behaviour shallowvar --
: read find method! ;
: [handles] read find postpone literal postpone method! ; immediate
' ' [handles] interpret/compile: handles

() variable! classes
: class read classes push ;

# classes need to be implemented in a single file
: load-class # \ symbol --
    dup defined? not if >string ".pf" concat load else drop then ;

# tasks can be identified by their return stacks: these are lists
# which are always swapped, never copied. this list pointer could
# serve as a process ID.

: start-scheduler
    ' schedule-abort >abort
    ' schedule-poll is block-poll
    ' schedule-read is block-read
    ' schedule-write is block-write

    0 fork begin schedule-poll again ; # idle task

# start-scheduler
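
# usage sketch (untested; an assumption about intended use, not part of
# the original code). `worker` is a hypothetical word taking no
# arguments. start-scheduler installs the abort and block-* hooks and
# forks the idle task; spawn then queues the xt as a background task,
# which runs once the current task blocks or calls finish:
#
#     start-scheduler
#     ' worker 0 spawn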