PreviousUpNext

15.4.918  src/lib/src/lib/thread-kit/src/core-thread-kit/timeout-mailop.pkg

## timeout-mailop.pkg

# Compiled by:
#     src/lib/std/standard.lib



# Mail_Ops for synchronizing on timeouts.


stipulate
    package fat =  fate;                                                # fate                          is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                            # internal_threadkit_types      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package mop =  mailop;                                              # mailop                        is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
    package sch =  thread_scheduler;                                    # thread_scheduler              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
    package tim =  time;                                                # time                          is from   src/lib/std/time.pkg
    #
    Mailop(X) =  mop::Mailop(X);
herein

    package timeout_mailop: (weak)  api {
        #
        include Timeout_Mailop;                                         # Timeout_Mailop                is from   src/lib/src/lib/thread-kit/src/core-thread-kit/timeout-mailop.api

        reset_sleep_queue_to_empty
            :
            Void -> Void;

        wake_sleeping_threads_whose_time_has_come
            :
            Void -> Void;

        time_until_next_sleeping_thread_wakes
            :
            Void -> Null_Or( tim::Time );
    }

    {


        # The list of threads waiting for timeouts.
        # It is sorted in increasing order
        # of time value.
        #
        # NOTE: We may want to use some sort of
        # balanced search package in the future.                XXX BUGGO FIXME
        #
        Item = ( tim::Time,
                 Void -> Void,
                 Ref (itt::Transaction_Id),
                 fat::Fate( Void )
               );
        #
        sleep_queue
            =
            REF ([]: List( Item ));


        fun time_wait (t, f, id, k)
            =
            sleep_queue := insert *sleep_queue
            where
                fun insert []
                        =>
                        [ (t, f, id, k) ];

                    insert ((_, _, REF itt::CANCELLED_TRANSACTION_ID, _) ! rest)
                        =>
                        # Drop cancelled transaction in passing:
                        #
                        insert rest;

                    insert (list as ((item as (t', _, _, _)) ! rest))
                       =>
                       tim::(<) (t', t)  ??  item ! insert rest
                                          ::  (t, f, id, k) ! list;
                end;
            end;


        # Drop all cancelled entries from itemlist.
        # Return cleaned list:
        #
        fun drop_cancelled_queue_entries  itemlist
            =
            drop_them  itemlist
            where
                fun drop_them ((_, _, REF itt::CANCELLED_TRANSACTION_ID, _) ! rest)
                        =>
                        drop_them  rest;

                    drop_them (item ! rest)
                        =>
                        item  !  drop_them  rest;

                    drop_them [] => [];
                end;
            end;


        # Find all sleeping threads whose
        # time has come and move them to
        # run queue.
        #
        # Return list of still-sleeping threads.
        #
        fun wake_and_remove_sleeping_threads_whose_time_has_come  q
            =
            wake_them q
            where
                now = sch::get_approximate_time ();

                fun wake_them ((_, _, REF itt::CANCELLED_TRANSACTION_ID, _) ! rest)
                        =>
                        wake_them rest;

                    wake_them (list as ((item as (t', f, transaction_id as REF (itt::TRANSACTION_ID tid), k)) ! rest))
                        =>
                        if (tim::(<=) (t', now))
                            #
                           sch::enqueue_thread (tid, k);
                           f ();                                # Cleanup function. 
                           wake_them rest;
                       else
                           drop_cancelled_queue_entries  list;
                       fi;

                   wake_them [] => [];
               end;
            end;


        fun time_until_next_sleeping_thread_wakes ()
            =
            case (drop_cancelled_queue_entries *sleep_queue)
                #
                []  =>  NULL;

                (q as ((t, _, _, _) ! _))
                    =>
                    {   now = sch::get_approximate_time ();

                        tim::(<=) (t, now)
                            ##
                            ??   THE (tim::zero_time)
                            ::   THE (tim::(-) (t, now));
                    };
            esac;


        fun wake_sleeping_threads_whose_time_has_come ()
            =
            case *sleep_queue
                #
                []     =>   ();
                queue  =>   sleep_queue
                                :=
                                wake_and_remove_sleeping_threads_whose_time_has_come
                                    queue;
            esac;


        fun reset_sleep_queue_to_empty ()
            =
            sleep_queue := [];


        # NOTE: Unlike for most base mail_ops, the
        # block functions of time-out mail_ops do not
        # have to exit the critical section or execute
        # the clean-up operation. This is done when
        # they are removed from the waiting queue.
        #
        fun timeout_in'  sleep_duration
            =
            itt::BASE_MAILOPS [ is_ready ]
            where
                fun wait_for { transaction_id, clean_up, next }                                 # Reppy calls this fn blockFn
                    =
                    { # now =    sch::get_approximate_time ();                                  # Replaced by below 2012-02-01 CrT because 100ms wait was coming back after 99ms, triggering 'make check' alarm.
                        now = tim::get_current_time_utc ();
                        #
                        fate::call_with_current_fate
                            (fn fate
                                =
                                {   time_wait
                                        ( tim::(+) (sleep_duration, now),
                                          clean_up,
                                          transaction_id,
                                          fate
                                        );

                                    next ();
                                }
                            );

                        sch::reenable_thread_switching ();
                    };

                fun is_ready ()                                                                 # Reppy calls this fn pollFn
                    =
                    if (sleep_duration == tim::zero_time)
                        #
                        itt::MAILOP_READY
                          { priority => -1,
                            do_it    => sch::reenable_thread_switching                          # Reppyc alls this field doFn
                          };
                    else
                        itt::MAILOP_UNREADY wait_for;
                    fi;

            end;


        fun sleep_for  sleep_duration
            =
            mop::do_mailop  (timeout_in'  sleep_duration);


        fun timeout_at'  wakeup_time
            =
            itt::BASE_MAILOPS [ is_ready ]
            where
                fun wait_for { transaction_id, clean_up, next }                                 # Reppy calls this fn blockFn
                    =
                    {   fate::call_with_current_fate
                            (
                             fn fate
                                =
                                {   time_wait (wakeup_time, clean_up, transaction_id, fate);
                                    #
                                    next ();
                                }
                            );

                        sch::reenable_thread_switching ();
                    };

                fun is_ready ()                                                                 # Reppy calls this fn pollFn
                    =
                    if (tim::(<=) (wakeup_time, sch::get_approximate_time ()))
                        #
                        itt::MAILOP_READY
                          { priority => -1,
                            do_it    => sch::reenable_thread_switching                          # Reppy calls this field doFn
                          };
                    else
                        itt::MAILOP_UNREADY  wait_for;
                    fi;
            end;


        fun sleep_until  wakeup_time
            =
            mop::do_mailop  (timeout_at'  wakeup_time);


    };
end;



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext