PreviousUpNext

15.4.904  src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg

## mailop.pkg

# Compiled by:
#     src/lib/std/standard.lib



# Implementation of mailop values and the mailop combinators.
#
# Some important requirements on the implementation
# of base mailop values:
#
#  1)  An is_ready, do_it or wait_for function
#      is always called from inside an atomic region.
#
#  2)  An is_ready returns an integer priority.
#      This is   0 when not enabled,
#               -1 for fixed priority and
#               >0 for dynamic priority.
#      The standard scheme is to associate a counter
#      with the underlying synchronization value and
#      to increase it by one for each synchronization attempt.
#
#  3)  A wait_for is responsible for exiting the atomic region.
#      A do_it should NOT leave the atomic region.
#
#  4)  Each wait_for is responsible for executing the
#      "clean_up" action prior to leaving the atomic region.


stipulate
    package fat =  fate;                                                        # fate                          is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                                    # internal_threadkit_types      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package ts  =  thread_scheduler;                                            # thread_scheduler              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
herein

    package mailop: (weak)
    api {

        include Mailop;                                                         # Mailop                        is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.api

        atomic_cvar_set:   internal_threadkit_types::Condition_Variable -> Void;
        cvar_get_mailop:   internal_threadkit_types::Condition_Variable -> Mailop( Void );

    }{

        # Short local names for the primitives of package 'fate' (bound to 'fat' above).
        # capture_fate, throw_fate and call_with_current_fate are used by the
        # synchronization functions later in this package.
        # NOTE(review): resume_fate is not referenced anywhere else in this package
        # as shown here -- confirm before removing it.
        #
        capture_fate            =  fat::capture_fate;
        throw_fate             =  fat::throw_fate;
        call_with_current_fate  =  fat::call_with_current_fate;
        resume_fate             =  fat::resume_fate;

        # Some inline functions
        # to improve performance:

        fun map f
            =
            mapf
            where
                fun mapf [] => [];
                    mapf [a] => [f a];
                    mapf [a, b] => [f a, f b];
                    mapf [a, b, c] => [f a, f b, f c];
                    mapf (a ! b ! c ! d ! r) => (f a) ! (f b) ! (f c) ! (f d) ! (mapf r);
                end;
            end;

        fun apply f
            =
            appf
            where
                fun appf [] => ();
                    appf (x ! r) => { f x; appf r;};
                end;
            end;

        fun fold_left f init l
            =
            foldf (l, init)
            where
                fun foldf ([], accum) => accum;
                    foldf (x ! r, accum) => foldf (r, f (x, accum));
                end;
            end;

        # Abort the current computation by raising FAIL carrying msg.
        # Used in this package for "cannot happen" conditions.
        #
        fun error msg
            =
            raise exception FAIL msg;

        # Make the mailop types from internal_threadkit_types available under
        # local names.  The constructors of Mailop_Status and Mailop
        # (MAILOP_READY, MAILOP_UNREADY, BASE_MAILOPS, CHOOSE, GUARD, WITH_NACK)
        # are used unqualified throughout the rest of this package.
        # NOTE(review): presumably the '==' form is what brings those
        # constructors into scope, while '=' is a plain type abbreviation -- confirm.
        #
        Mailop_Status ==  itt::Mailop_Status;
        Mailop        ==  itt::Mailop;
        Base_Mailop(X) =  itt::Base_Mailop(X);


        # Condition variables.
        #
        # Because these variables are set inside
        # atomic regions we have to use different
        # conventions for clean-up, etc.  Instead
        # of requiring the wait_for fate
        # to call the clean_up action and to leave
        # the atomic region, we call the clean_up
        # function when setting the condition variable
        # (in atomic_cvar_set), and have the invariant
        # that the wait_for fate is dispatched
        # outside the atomic region.


        # Set a condition variable.
        # We assume that this function is always
        # executed in an atomic region.
        #
        # If the variable is unset, its state becomes CVAR_SET 1 and every
        # waiter whose transaction is still live is put on the rear list of
        # ts::foreground_run_queue as a (thread, fate) pair.
        #
        # Raises FAIL "cvar already set" (via 'error') for any other state.
        #
        fun atomic_cvar_set (itt::CONDITION_VARIABLE state)
            =
            case *state
                #
                itt::CVAR_UNSET waiting
                    =>
                    {   ts::foreground_run_queue ->  itt::THREADKIT_QUEUE { rear, ... };
                        #
                        # 'add' computes the new value for 'rear':  one (tid, fate)
                        # pair per live waiter, in 'waiting' order, followed by the
                        # contents of 'rear' as read once every waiter has been processed.
                        #
                        fun add []
                                =>
                                *rear;

                            # Waiter whose transaction was already cancelled:  skip it.
                            #
                            add ( { transaction_id=>REF itt::CANCELLED_TRANSACTION_ID, ... } ! r)
                                =>
                                add r;

                            # Live waiter:  cancel its transaction so nothing else can
                            # complete it, run its clean_up action (see the
                            # condition-variable conventions described above), then queue it.
                            #
                            add ( { transaction_id as REF (itt::TRANSACTION_ID tid), clean_up, fate } ! r)
                                =>
                                {   transaction_id
                                        :=
                                        itt::CANCELLED_TRANSACTION_ID;

                                    clean_up ();

                                    (tid, fate) ! (add r);
                                };
                        end;

                        # The state is marked set BEFORE the waiters are processed,
                        # so it is already CVAR_SET while the clean_up actions run.
                        #
                        state := itt::CVAR_SET 1;
                        rear := add waiting;
                    };

                 _ => error "cvar already set";
            esac;


        # The mailop constructor for
        # waiting on a condition variable:
        #
        # The result is a BASE_MAILOPS holding one poll function (is_ready).
        # While the variable is unset the mailop is unready and its wait_for
        # registers the caller on the variable's waiting list; once the
        # variable is set the mailop is ready and yields ().
        #
        fun cvar_get_mailop (itt::CONDITION_VARIABLE  state)
            =
            BASE_MAILOPS [is_ready]
            where 
                # Register the current fate as a waiter on the variable, then
                # hand control to 'next'.  The waiter record is pushed on the
                # front of the existing waiting list.
                #
                fun wait_for { transaction_id, clean_up, next }                         # Reppy calls this fn blockFn
                    =
                    call_with_current_fate
                        (fn fate
                            =
                            {   waiting =   case *state    itt::CVAR_UNSET waiting =>  waiting;
                                                 /* */     _                     =>  raise exception FAIL "Library bug: Unsupported case in cvar_get_mailop";   # Impossible because is_ready only queues us up if *state is not CVAR_SET.
                                            esac;
                                #
                                item = { transaction_id, clean_up, fate };

                                state := itt::CVAR_UNSET (item ! waiting);

                                next ();
                            }
                        );

                # Poll the variable.  When it is set with counter n, the counter
                # is bumped to n+1 and the mailop reports ready with priority n,
                # following the dynamic-priority scheme described at the top of
                # this file.  Otherwise it reports unready with wait_for.
                #
                fun is_ready ()                                                                 # Reppy calls this fn pollFn
                    =
                    case *state
                        #
                        itt::CVAR_SET n
                            =>
                            {   state := itt::CVAR_SET (n+1);
                                #
                                MAILOP_READY { priority => n, do_it }
                                where
                                    # Commit:  reset the counter to 1 and call
                                    # ts::reenable_thread_switching.
                                    #
                                    fun do_it ()                                                # Reppy calls this fn doFn
                                        =
                                        {   state :=  itt::CVAR_SET  1;
                                            #
                                            ts::reenable_thread_switching ();
                                        };
                                end;
                            };

                        _ => MAILOP_UNREADY wait_for;
                    esac;
            end;


        # A mailop which is always ready
        # and produces given result:
        #
        fun always_mailop  result
            =
            BASE_MAILOPS [

                fn () = itt::MAILOP_READY
                          { priority  =>  -1,
                            do_it =>  fn () = {   ts::reenable_thread_switching ();             # Reppy calls this field doFn
                                                  result;
                                              }
                          }
            ];

        # A mailop which is never ready:
        # it holds no base mailops at all, so there is nothing that could poll as ready.
        #
        never = BASE_MAILOPS [];

        # 'guard' and 'with_nack' are simply the GUARD and WITH_NACK
        # constructors bound as ordinary values.
        #
        guard = GUARD;

        with_nack = WITH_NACK;
                                                                                            # "bevt" == "base event"
        fun choose (el:  List(  Mailop(X) ))
            =
            gather_bevts (reverse el, [])
            where
                fun gather_bevts ([], l) => BASE_MAILOPS l;
                    gather_bevts (BASE_MAILOPS [] ! r, l) => gather_bevts (r, l);
                    gather_bevts (BASE_MAILOPS [bev] ! r, base_mailops') => gather_bevts (r, bev ! base_mailops');
                    gather_bevts (BASE_MAILOPS base_mailops ! r, base_mailops') => gather_bevts (r, base_mailops @ base_mailops');
                    gather_bevts (mailops, []) => gather (mailops, []);
                    gather_bevts (mailops, l) => gather (mailops, [BASE_MAILOPS l]);
                end 

                also
                fun gather ([], [mailop]) => mailop;
                    gather ([], mailops) => CHOOSE mailops;
                    gather (CHOOSE mailops ! r, mailops') => gather (r, mailops @ mailops');
                    gather (BASE_MAILOPS base_mailops ! r, BASE_MAILOPS base_mailops' ! r') => gather (r, BASE_MAILOPS (base_mailops @ base_mailops') ! r');
                    gather (mailop ! r, mailops') => gather (r, mailop ! mailops');
                end;
            end;

        fun wrap (mailop, wfn)
            =
            wrap' mailop
            where
                fun wrap_base_mailop is_ready ()
                    =
                    case (is_ready ())
                        #
                        MAILOP_READY { priority, do_it } =>  MAILOP_READY { priority, do_it => wfn o do_it };
                        MAILOP_UNREADY wait_for          =>  MAILOP_UNREADY (wfn o wait_for);
                    esac;

                fun wrap' (BASE_MAILOPS base_mailops) => BASE_MAILOPS (map wrap_base_mailop base_mailops);
                    #
                    wrap' (CHOOSE mailops)  =>  CHOOSE (map wrap' mailops);
                    wrap' (GUARD g)          =>  GUARD (fn () = wrap (g(), wfn));
                    wrap' (WITH_NACK f)      =>  WITH_NACK (fn mailop = wrap (f mailop, wfn));
                end;
            end;

        (==>) = wrap;           # Infix synonym for readability.

        fun wrap_handler (mailop, hfn)
            =
            wrap' mailop
            where
                fun wrap f x
                    =
                    f x
                    except
                        exn = hfn exn;

                fun wrap_base_mailop  is_ready ()
                    =
                    case (is_ready ())
                        #
                        MAILOP_READY { priority, do_it } =>  MAILOP_READY { priority, do_it => wrap do_it };
                        MAILOP_UNREADY wait_for          =>  MAILOP_UNREADY (wrap wait_for);
                    esac;

                fun wrap' (BASE_MAILOPS base_mailops) =>  BASE_MAILOPS (map wrap_base_mailop base_mailops);
                    wrap' (CHOOSE mailops)    =>  CHOOSE (map wrap' mailops);
                    wrap' (GUARD g)          =>  GUARD  (fn ()  = wrap_handler (g(), hfn));
                    wrap' (WITH_NACK f)      =>  WITH_NACK (fn mailop = wrap_handler (f mailop, hfn));
                end;
            end;

        # The shape of a mailop after 'force' has run all of its guards:
        #
        #   BASE_GROUP   a flat list of base mailops (poll functions);
        #   GROUP        a list of sub-groups;
        #   NACK_GROUP   a sub-group together with the condition variable
        #                that 'force' created for the WITH_NACK node it came
        #                from.  sync_on_group sets that variable when none
        #                of the group's flags is TRUE.
        #
        Mailop_Group X
          = BASE_GROUP  List( Base_Mailop(X) )
          | GROUP       List( Mailop_Group(X) )
          | NACK_GROUP  (itt::Condition_Variable, Mailop_Group(X))
          ;

    /* +DEBUG
    fun sayGroup (msg, eg) = let
          fun f (BASE_GROUP l, sl) = "BASE_GROUP(" ! int::to_string (list::length l) ::()) ! sl
            | f (GROUP l, sl) = "GROUP(" ! g (l, ")" ! sl)
            | f (NACK_GROUP l, sl) = "NACK_GROUP(" ! f(#2 l, ")" ! sl)
          also g ([], sl) = sl
            | g ([x], sl) = f (x, sl)
            | g (x ! r, sl) = f (x, ", " ! g (r, sl))
          in
            Debug::sayDebugId (string::cat (msg ! ": " ! f (eg, ["\n"])))
          end
    -DEBUG*/

        # Force the evaluation of
        # any guards in a mailop group:
        #
        # Converts a Mailop into a Mailop_Group by running every GUARD
        # function and every WITH_NACK function (the latter with a
        # mailop on a freshly created, unset condition variable).
        #
        # The first clause is a fast path for the common case of a
        # mailop that is already a plain list of base mailops.
        #
        fun force (BASE_MAILOPS l)
                =>
                BASE_GROUP l;

            force mailop
                =>
                force' mailop
                where
                    # Run the guard and force whatever it returns.
                    #
                    fun force' (GUARD g)
                            =>
                            force' (g ());

                        # Create the nack condition variable, pass a mailop that
                        # waits on it to f, and remember the variable alongside
                        # the forced result.
                        #
                        force' (WITH_NACK f)
                            =>
                            {   cvar = itt::CONDITION_VARIABLE (REF (itt::CVAR_UNSET []));

                                NACK_GROUP (cvar, force' (f (cvar_get_mailop cvar)));
                            };

                        force' (BASE_MAILOPS group)
                            =>
                            BASE_GROUP group;

                        force' (CHOOSE mailops)
                            =>
                            force_bl (mailops, [])
                            where
                                # force_bl is used while every alternative forced so far
                                # has produced a BASE_GROUP; the base mailops are
                                # accumulated in a single flat list.  On the first
                                # alternative that forces to something else we switch
                                # to force_l, carrying the accumulated base mailops
                                # along as one BASE_GROUP.
                                #
                                fun force_bl ([], base_mailops)
                                        =>
                                        BASE_GROUP base_mailops;

                                    force_bl (mailop ! r, base_mailops')
                                        =>
                                        case (force' mailop)
                                            BASE_GROUP base_mailops =>  force_bl (r, base_mailops @ base_mailops');
                                            GROUP      group        =>  force_l  (r, group @ [BASE_GROUP base_mailops']);
                                            group                   =>  force_l  (r, [group, BASE_GROUP base_mailops']);
                                        esac;
                                end

                                # force_l accumulates a list of groups.  A BASE_GROUP is
                                # merged into a BASE_GROUP at the head of the list,
                                # a nested GROUP is spliced in, anything else is
                                # prepended.  A final list of exactly one group is
                                # returned unwrapped rather than as a GROUP.
                                #
                                also
                                fun force_l ([], [group]) =>  group;
                                    force_l ([], l)       =>  GROUP l;

                                    force_l (mailop ! r, l)
                                        =>
                                        case (force' mailop, l)
                                            #                                 
                                            (BASE_GROUP base_mailops, BASE_GROUP base_mailops' ! r')
                                                =>
                                                force_l (r, BASE_GROUP (base_mailops @ base_mailops') ! r');

                                            (GROUP group, l)
                                                =>
                                                force_l (r, group @ l);

                                            (group, l)
                                                =>
                                                force_l (r, group ! l);
                                       esac;
                                end;
                            end;
                      end;
                end;
        end;


        stipulate

            count = REF 0;

            fun random i
                =
                {   j = *count;

                    if (j == 1000000)   count := 0;
                    else                count := j+1;
                    fi;

                    int::rem (j, i);
                };
        herein

            fun select_do_fn ([(_, do_it)], _)
                    =>
                    do_it;

                select_do_fn (l, n)
                    =>
                    max (l, 0, 0, [])
                    where

                        fun priority -1 =>  n;
                            priority  p =>  p;
                        end;

                        fun max ((p, do_it) ! r, max_p, k, do_its)
                                =>
                                {   p = priority p;

                                    if    (p >  max_p)   max (r, p, 1, [do_it]);
                                    elif  (p == max_p)   max (r, max_p, k+1, do_it ! do_its);
                                    else                 max (r, max_p, k, do_its);
                                    fi;
                                };

                            max ([], _, k, [do_it])
                                =>
                                do_it;

                            max ([], _, k, do_its)
                                =>
                                list::nth (do_its, random k);
                        end;
                    end;
             end;
        end;

        fun make_flag ()
            =
            {   flag = REF (itt::TRANSACTION_ID (ts::get_current_thread()));

                (flag, fn () =  flag := itt::CANCELLED_TRANSACTION_ID);
            };


        fun sync_on_one_mailop  (is_ready:  Base_Mailop(X))
            =
            {   ts::disable_thread_switching ();
                #
                case (is_ready ())
                    #         
                    MAILOP_READY { do_it, ... }                                 # Reppy calls this field doFn
                        =>
                        do_it ();

                    MAILOP_UNREADY wait_for
                        =>
                        {   my (flag, set_flag)
                                =
                                make_flag ();

                            wait_for { transaction_id=>flag, clean_up=>set_flag, next=>ts::reenable_thread_switching_and_dispatch_next_thread };
                        };
                esac;
            };

        # This function handles the case of
        # synchronizing on a list of base mailops
        # (w/o any negative acknowledgements).
        #
        # It also handles the case of synchronizing
        # on NEVER.
        #
        # An empty list (NEVER) just calls ts::dispatch_next_thread.
        # A one-element list is delegated to sync_on_one_mailop.
        # Otherwise thread switching is disabled and every base mailop
        # is polled in order:
        #
        #   o If none is ready, each collected wait_for function is called
        #     in turn with one shared transaction id and clean_up action.
        #
        #   o If at least one is ready, select_do_fn picks one of the
        #     ready ones and its do_it is run.
        #
        fun sync_on_base_mailops []    =>  ts::dispatch_next_thread ();
            sync_on_base_mailops [bev] =>  sync_on_one_mailop  bev;

            sync_on_base_mailops  base_mailops
                =>
                {   # 'ext' is used for as long as no polled mailop has been ready;
                    # wait_fors accumulates the wait_for functions seen so far
                    # (most recently polled first).
                    #
                    fun ext ([], wait_fors)
                            =>
                            capture_fate
                                (fn fate
                                    =
                                    {   # Locally rebind throw_fate to "throw to the captured fate".
                                        #
                                        throw_fate = throw_fate fate;

                                        my (transaction_id, set_flag)
                                            =
                                            make_flag ();

                                        # Call the wait_for functions one after another:  each
                                        # one is given a 'next' that continues with the rest of
                                        # the list, and the end of the list calls
                                        # ts::reenable_thread_switching_and_dispatch_next_thread.
                                        # Whatever a wait_for call returns is thrown to the
                                        # captured fate.
                                        #
                                        fun log []
                                                =>
                                                ts::reenable_thread_switching_and_dispatch_next_thread ();

                                            log (wait_for ! r)
                                                =>
                                                throw_fate
                                                    (wait_for
                                                      { transaction_id,
                                                        clean_up =>  set_flag,
                                                        next     =>  fn () = log r
                                                      }
                                                    );
                                        end;

                                        log wait_fors;

                                        # NOTE(review): presumably unreachable -- 'log' is
                                        # expected never to return normally; confirm.
                                        #
                                        error "[log]";
                                    }
                                );

                        ext (is_ready ! r, wait_fors)
                            =>
                            case (is_ready ())
                                #
                                MAILOP_READY { priority, do_it } =>  ext_rdy (r, [(priority, do_it)], 1);
                                MAILOP_UNREADY wait_for          =>  ext     (r, wait_for ! wait_fors);
                            esac;
                    end

                    # NOTE: Maybe we should just keep
                    #       track of the max priority?
                    #       What about fairness to fixed
                    #       priority mailops (e::g., always, timeout?)

                    # 'ext_rdy' takes over once some mailop has been ready:  the
                    # remaining mailops are still polled, but only ready ones
                    # are recorded.  n counts the ready mailops found.
                    #
                    also
                    fun ext_rdy ([], do_its, n)
                            =>
                            select_do_fn (do_its, n) ();

                        ext_rdy (is_ready ! r, do_its, n)
                            =>
                            case (is_ready ())
                                #
                                MAILOP_READY { priority, do_it } =>  ext_rdy (r, (priority, do_it) ! do_its, n+1);
                                _                                =>  ext_rdy (r, do_its, n);
                            esac;
                    end;

                    ts::disable_thread_switching ();

                    ext (base_mailops, []);
                };
        end;

        # Walk the mailop group tree,
        # collecting the base mailops
        # (with associated ack flags),
        # also a list of flag sets.
        #
        # A flag set is a
        #     (cCar, List(Flag(Ack)))
        # pair, where the flags are
        # those associated with the
        # mailops covered by the nack
        # cvar.
        #
        fun collect group
            =
            {   un_wrapped_flag
                    =
                    REF FALSE;

                fun gather_wrapped (group, bl, flg_sets)
                        =
                        {       my (bl, _, flg_sets)
                                =
                                gather (group, bl, [], flg_sets);

                            (bl, flg_sets);
                        }
                        where
                            fun gather (BASE_GROUP base_mailops, bl, all_flgs, flg_sets)
                                    =>
                                    {   fun append ([], bl, all_flgs)
                                                =>
                                                (bl, all_flgs);

                                            append (bev ! r, bl, all_flgs)
                                                =>
                                                {   flag = REF FALSE;

                                                    append (r, (bev, flag) ! bl, flag ! all_flgs);
                                                };
                                        end;


                                        my (bl', all_flgs')
                                            =
                                            append (base_mailops, bl, all_flgs);


                                        (bl', all_flgs', flg_sets);
                                    };

                                gather (GROUP group, bl, all_flgs, flg_sets)
                                    =>
                                    {   fun f (group', (bl', all_flgs', flg_sets'))
                                            =
                                            gather (group', bl', all_flgs', flg_sets');

                                        fold_left f (bl, all_flgs, flg_sets) group;
                                    };

                                gather (NACK_GROUP (cvar, group), bl, all_flgs, flg_sets)
                                    =>
                                    {   my (bl', all_flgs', flg_sets')
                                            =
                                            gather (group, bl, [], flg_sets);

                                        (bl', all_flgs' @ all_flgs, (cvar, all_flgs') ! flg_sets');
                                    };
                            end;
                        end;

                case group
                    #
                    GROUP _ =>
                        gather (group, [], [])
                        where 
                            un_wrapped_flag =  REF FALSE;

                            fun append ([],      bl) =>  bl;
                                append (bev ! r, bl) =>  append (r, (bev, un_wrapped_flag) ! bl);
                            end;

                            fun gather (BASE_GROUP base_mailops, bl, flg_sets)
                                    =>
                                    (append (base_mailops, bl), flg_sets);

                                gather (GROUP group, bl, flg_sets)
                                    =>
                                    fold_left  f  (bl, flg_sets)  group
                                    where
                                        fun f (group', (bl', flg_sets'))
                                            =
                                            gather (group', bl', flg_sets');
                                    end;


                                gather (group as NACK_GROUP _, bl, flg_sets)
                                    =>
                                    gather_wrapped (group, bl, flg_sets);
                            end;
                        end;

                    group =>
                        gather_wrapped (group, [], []);
                esac;
            };

        # This function handles the more
        # complicated case of synchronization
        # on groups of mailops where negative
        # acknowledgements are involved.
        #
        # 'collect' flattens the group into bl, a list of (poll function, flag)
        # pairs, plus flg_sets, one (cvar, flags) pair per nack group.
        # The flag of the base mailop that ends up being chosen is set to
        # TRUE, after which check_cvars sets the cvar of every flag set
        # that contains no TRUE flag.
        #
        fun sync_on_group group
            =
            {   my (bl, flg_sets)
                    =
                    collect group;

                fun check_cvars ()
                    =
                    apply check_cvar flg_sets
                    where
                        # check_cvar checks the flags of a flag set.
                        # If they are all FALSE then the
                        # corresponding cvar is set to signal
                        # the negative ack.
                        #
                        fun check_cvar (cvar, flgs)
                            =
                            check_flgs flgs
                            where
                                fun check_flgs []
                                        =>
                                        atomic_cvar_set cvar;

                                    check_flgs ((REF TRUE) ! _)
                                        =>
                                        ();

                                    check_flgs (_ ! r)
                                        =>
                                        check_flgs r;
                                end;
                            end;
                    end;

                # 'ext' is used for as long as no polled mailop has been ready;
                # wait_fors accumulates (wait_for, flag) pairs, most recently
                # polled first.
                #
                fun ext ([], wait_fors)
                        =>
                        capture_fate
                            (fn fate
                                =
                                {   # Locally rebind throw_fate to "throw to the captured fate".
                                    #
                                    throw_fate =  throw_fate  fate;

                                    transaction_id
                                        =
                                        REF (itt::TRANSACTION_ID (ts::get_current_thread ()));

                                    # Clean-up action for the base mailop owning 'flag':
                                    # cancel the shared transaction, mark that mailop as
                                    # the chosen one, then signal the appropriate nacks.
                                    #
                                    fun set_flag flag ()
                                        =
                                        {   transaction_id := itt::CANCELLED_TRANSACTION_ID;
                                            flag := TRUE;
                                            check_cvars ();
                                        };

                                    # Call the wait_for functions one after another:  each
                                    # one is given a 'next' that continues with the rest of
                                    # the list, and the end of the list calls
                                    # ts::reenable_thread_switching_and_dispatch_next_thread.
                                    # Whatever a wait_for call returns is thrown to the
                                    # captured fate.
                                    #
                                    fun log []
                                            =>
                                            ts::reenable_thread_switching_and_dispatch_next_thread ();

                                        log ((wait_for, flag) ! r)
                                            =>
                                            throw_fate
                                                (wait_for { transaction_id,
                                                            clean_up =>  set_flag flag,
                                                            next     =>  fn () = log r
                                                          }
                                                );
                                    end;

                                    log  wait_fors;

                                    # NOTE(review): presumably unreachable -- 'log' is
                                    # expected never to return normally; confirm.
                                    #
                                    error "[log]";
                                 }
                            );

                    ext ((is_ready, flag) ! r, wait_fors)
                        =>
                        case (is_ready ())
                            #
                            MAILOP_READY { priority, do_it } =>  ext_rdy (r, [(priority, (do_it, flag))], 1);
                            MAILOP_UNREADY wait_for          =>  ext (r, (wait_for, flag) ! wait_fors);
                        esac;
                end

                # NOTE: maybe we should just
                # keep track of the max priority?
                # What about fairness to fixed
                # priority mailops (e::g., always, timeout?)
                #
                # 'ext_rdy' takes over once some mailop has been ready:  the
                # remaining mailops are still polled, but only ready ones are
                # recorded.  n counts the ready mailops found.  At the end
                # select_do_fn picks one (do_it, flag) pair; its flag is set
                # and the nacks are signalled BEFORE do_it is run.
                #
                also
                fun ext_rdy ([], do_its, n)
                        =>
                        {   my (do_it, flag)
                                =
                                select_do_fn (do_its, n);

                            flag := TRUE;
                            check_cvars ();
                            do_it();
                        };

                    ext_rdy ((is_ready, flag) ! r, do_its, n)
                        =>
                        case (is_ready ())
                            #
                            MAILOP_READY { priority, do_it }
                                =>
                                ext_rdy (r, (priority, (do_it, flag)) ! do_its, n+1);

                            _   =>
                                ext_rdy (r, do_its, n);
                        esac;
                end;

                ts::disable_thread_switching ();

                ext (bl, []);
            };                                                  # fun sync_on_group


        fun do_mailop  mailop
            =
            case (force mailop)
                #
                BASE_GROUP base_mailops =>  sync_on_base_mailops  base_mailops;
                group                   =>  sync_on_group         group;
            esac;


        fun select mailops
            =
            {   fun force_bl ([], base_mailops)
                        =>
                        BASE_GROUP base_mailops;

                    force_bl (mailop ! r, base_mailops')
                        =>
                        case (force' mailop)
                            #
                            BASE_GROUP base_mailops =>  force_bl (r, base_mailops @ base_mailops');
                            #
                            GROUP group  =>  force_l  (r, group @ [BASE_GROUP base_mailops']);
                            group        =>  force_l  (r, [group, BASE_GROUP base_mailops']);
                        esac;
                end

                also
                fun force_l ([], [group]) =>  group;
                    force_l ([], l)       =>  GROUP l;

                    force_l (mailop ! r, l)
                        =>
                        case (force' mailop, l)
                            #
                            (BASE_GROUP base_mailops, BASE_GROUP base_mailops' ! r')
                                =>
                                force_l (r, BASE_GROUP (base_mailops @ base_mailops') ! r');

                            (GROUP group, l)
                                =>
                                force_l (r, group @ l);

                            (group, l)
                                =>
                                force_l (r, group ! l);
                        esac;
                end

                also
                fun force' (GUARD g)
                        =>
                        force' (g ());

                    force' (WITH_NACK f)
                        =>
                        {   cvar = itt::CONDITION_VARIABLE (REF (itt::CVAR_UNSET []));

                            NACK_GROUP (cvar, force' (f (cvar_get_mailop cvar)));
                        };

                    force' (BASE_MAILOPS group)
                        =>
                        BASE_GROUP group;

                    force' (CHOOSE mailops)
                        =>
                        force_bl (mailops, []);
                end;

                case (force_bl (mailops, []))
                    #
                    BASE_GROUP base_mailops
                        =>
                        sync_on_base_mailops  base_mailops;

                    group =>
                        sync_on_group  group;
                esac;
            };                          # fun select
    };                                  # package mailop
end;

## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext