PreviousUpNext

15.4.906  src/lib/src/lib/thread-kit/src/core-thread-kit/mailslot.pkg

## mailslot.pkg

# Compiled by:
#     src/lib/std/standard.lib


# The representation of synchronous slots.
#
# To ensure that we always leave the atomic region exactly once, we
# require that the blocking operation be responsible for leaving the
# atomic region (in the mailop case, it must also execute the clean-up
# action).  The do_it fn always transfers control to the blocked thread
# without leaving the atomic region.  Note that the give (and give')
# wait_fors run using the receiver's thread ID.



stipulate
    package fat =  fate;                                # fate                          is from   src/lib/std/src/nj/fate.pkg
    package mop =  mailop;                              # mailop                        is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
    package tkq =  threadkit_queue;                     # threadkit_queue               is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-queue.pkg
    package itt =  internal_threadkit_types;            # internal_threadkit_types      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package ts  =  thread_scheduler;                    # thread_scheduler              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
    package t = thread;                                 # thread                        is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread.pkg
herein

    package mailslot: (weak)
    api {

        Mailop(X);

        include Mailslot;                               # Mailslot                      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailslot.api

        reset_mailslot:  Mailslot(X) -> Void;

    }

    {
        Mailop(X) =  mop::Mailop(X);

        Fate(X) =   fat::Fate(X);

        call_with_current_fate =  fat::call_with_current_fate;
        resume_fate            =  fat::resume_fate;

        # Some inline functions to improve performance 
        #
        fun enqueue (itt::THREADKIT_QUEUE { rear, ... }, x)
            =
            rear := x ! *rear;

        Mailslot(X)
            =
            MAILSLOT
              { priority:  Ref( Int ),
                in_q:      itt::Threadkit_Queue( (Ref( itt::Transaction_Id ), Fate(X)) ),
                out_q:     itt::Threadkit_Queue( (Ref( itt::Transaction_Id ), Fate( (itt::Thread, Fate(X)) )) )
              };

        fun reset_mailslot (MAILSLOT { priority, in_q, out_q } )
            =
            {   priority := 1;

                tkq::reset in_q;
                tkq::reset out_q;
            };

        fun make_mailslot ()
            =
            MAILSLOT
              { priority => REF 1,
                in_q     => tkq::make_threadkit_queue (),
                out_q    => tkq::make_threadkit_queue ()
              };

        # (Mailslot(X), Mailslot(X)) -> Bool 
        #       
        fun same_mailslot
            ( MAILSLOT { in_q=>in1, ... },
              MAILSLOT { in_q=>in2, ... }
            )
            =
            tkq::same_queue (in1, in2);


        fun make_transaction_id ()
            =
            REF (itt::TRANSACTION_ID (ts::get_current_thread()) );


        fun cancel_transaction_and_return_thread_id (transaction_id as REF (itt::TRANSACTION_ID thread_id))
                =>
                {   transaction_id :=   itt::CANCELLED_TRANSACTION_ID;
                    #
                    thread_id;
                };

            cancel_transaction_and_return_thread_id  (REF (itt::CANCELLED_TRANSACTION_ID))
                =>
                raise exception FAIL "Compiler bug:  Attempt to cancel already-cancelled transaction-id";                       # Never happens; here to suppress 'nonexhaustive match' compile warning.
        end;

        # Given a transaction ID,
        # set the current thread
        # to its thread ID and
        # mark it cancelled.
        #
        fun set_current_thread  transaction_id
            =
            ts::set_current_thread (cancel_transaction_and_return_thread_id transaction_id);

        Qy_Item(X)
          = NO_ITEM
          |    ITEM  (Ref(itt::Transaction_Id), Fate(X))
          ;

        # Bump a priority value by one,
        # returning the old value:
        #
        fun bump_priority (p as REF n)
            =
            {   p := n+1;
                n;
            };

        # Functions to clean slot input and output queues 
        #
        stipulate

            fun clean ((REF itt::CANCELLED_TRANSACTION_ID, _) ! r)
                    =>
                    clean r;

                clean [] => [];
                clean l  => l;
            end;

            fun clean_rev ((REF itt::CANCELLED_TRANSACTION_ID, _) ! r, l)
                    =>
                    clean_rev (r, l);

                clean_rev (x ! r, l)
                    =>  clean_rev (r, x ! l);

                clean_rev ([], l)
                    =>
                    l;
            end;

            fun clean_all l
                =
                reverse (clean_rev (l, []), [])
                where
                    fun reverse (x ! r, l)
                            =>
                            reverse (r, x ! l);

                        reverse ([], l)
                            =>
                            l;
                    end;
                end;

        herein

            fun clean_and_check (priority, itt::THREADKIT_QUEUE { front, rear } )
                = 
                clean_front (*front)
                where
                    fun clean_front []
                            =>
                            clean_rear *rear;

                        clean_front f
                            =>
                            case (clean f)
                                #
                                [] => clean_rear (*rear);
                                #
                                f' => {   front := f';
                                          bump_priority priority;
                                      };
                            esac;
                    end

                    also
                    fun clean_rear []
                            =>
                            0;

                        clean_rear r
                            =>
                            {   rear := [];

                                case (clean_rev (r, []))
                                    [] => 0;
                                    rr => {   front := rr;
                                              bump_priority priority;
                                          };
                                esac;
                            };
                    end;
                end;

            fun clean_and_remove (itt::THREADKIT_QUEUE { front, rear, ... } )
                =
                clean_front *front
                where
                    fun clean_front []
                            =>
                            clean_rear *rear;

                        clean_front f
                            =>
                            case (clean f)
                                #
                                []            =>  clean_rear *rear;
                                #
                                (item ! rest) =>  {   front := rest;
                                                      ITEM item;
                                                  };
                           esac;
                    end

                    also
                    fun clean_rear []
                            =>
                            NO_ITEM;

                        clean_rear r
                            =>
                            {   rear := [];

                                case (clean_rev (r, []))

                                    []  => NO_ITEM;

                                    item ! rest
                                        =>
                                        {   front := rest;
                                            ITEM item;
                                        };
                                 esac;
                            };
                    end;
                end;

            fun clean_and_enqueue (itt::THREADKIT_QUEUE { front, rear, ... }, item)
                =
                case (clean_all *front)
                    #
                    [] =>  {  front := clean_rev(*rear, [item]);  rear := [];                      };
                    f  =>  {  front := f;                         rear := item ! clean_all *rear;  };
                esac;

        end;                            # stipulate


        fun impossible ()
            =
            raise exception FAIL "Slot: impossible";


        fun give (MAILSLOT { priority, in_q, out_q }, msg)
            =
            {   ts::disable_thread_switching ();
                #
                case (clean_and_remove  in_q)
                    #
                    ITEM (rid, rkont)
                        =>
                        call_with_current_fate
                            (fn put_fate = {   ts::enqueue_and_switch_current_thread
                                               ( put_fate,
                                                 cancel_transaction_and_return_thread_id
                                                     rid
                                               );

                                             priority := 1;
                                             resume_fate rkont msg;
                                         }
                            );

                    NO_ITEM
                        =>
                        {   my (get_id, get_fate)
                                = 
                                call_with_current_fate
                                    (fn put_fate
                                        =
                                        {   enqueue (out_q, (make_transaction_id(), put_fate));
                                            #
                                            ts::reenable_thread_switching_and_dispatch_next_thread ();
                                        }
                                    );

                            ts::reenable_thread_switching_and_switch_to_thread
                              (get_id, get_fate, msg);
                        };
                esac;
            };


        fun give' (MAILSLOT { priority, in_q, out_q }, msg)
            =
            itt::BASE_MAILOPS [is_ready]
            where
                fun do_it ()                                                                    # Reppy calls this fn doFn
                    =
                    {   (tkq::dequeue  in_q)
                            ->
                            (transaction_id,  rfate);
                            

                        call_with_current_fate
                            (fn put_fate
                                =
                                {   ts::enqueue_and_switch_current_thread
                                        ( put_fate,
                                          cancel_transaction_and_return_thread_id  transaction_id
                                        );

                                    priority := 1;

                                    resume_fate  rfate  msg;
                                }
                            );
                    };

                fun wait_for { transaction_id, clean_up, next }                                 # Reppy calls this fn blockFn
                    =
                    {   my (get_id, get_fate)
                            =
                            call_with_current_fate
                                (fn put_fate
                                    =
                                    {   clean_and_enqueue (out_q, (transaction_id, put_fate));
                                        next();
                                        impossible ();
                                    }
                                );

                        clean_up();

                        ts::reenable_thread_switching_and_switch_to_thread (get_id, get_fate, msg);
                    };

                fun is_ready ()
                    =
                    case (clean_and_check (priority, in_q))
                        #
                        0 => itt::MAILOP_UNREADY wait_for;
                        p => itt::MAILOP_READY { priority=>p, do_it };
                    esac;
            end;

        fun nonblocking_give (MAILSLOT { priority, in_q, out_q }, msg)
            =
            call_with_current_fate
                (
                 fn put_fate
                    =
                    {   ts::disable_thread_switching ();
                        #
                        case (clean_and_remove  in_q)
                            #
                            ITEM (rid, rkont)
                                =>
                                {   call_with_current_fate
                                        (
                                         fn put_fate
                                            =
                                            {   ts::enqueue_and_switch_current_thread (put_fate, cancel_transaction_and_return_thread_id rid);
                                                priority := 1;
                                                resume_fate rkont msg;
                                            }
                                        );

                                    TRUE;
                                };

                            NO_ITEM
                                =>
                                {   ts::reenable_thread_switching ();
                                    #
                                    FALSE;
                                };
                        esac;
                    }
                );

        fun take (MAILSLOT { priority, in_q, out_q } )
            =
            call_with_current_fate
                (
                 fn get_fate
                    =
                    {   ts::disable_thread_switching ();
                        #
                        case (clean_and_remove out_q)
                            #
                            ITEM (transaction_id, put_fate)
                                =>
                                {   my_id =  ts::get_current_thread ();

                                    set_current_thread  transaction_id;
                                    priority := 1;
                                    resume_fate put_fate (my_id, get_fate);
                                };

                            NO_ITEM
                                =>
                                {   enqueue (in_q, (make_transaction_id(), get_fate));
                                    #
                                    ts::reenable_thread_switching_and_dispatch_next_thread ();
                                };
                        esac;
                    }
                );

        fun take' (MAILSLOT { priority, in_q, out_q } )
            =
            itt::BASE_MAILOPS [is_ready]
            where
                fun do_it ()                                                                    # Reppy calls this fn doFn
                    =
                    {   (tkq::dequeue  out_q)
                            ->
                            (transaction_id, put_fate);

                        my_id =  ts::get_current_thread ();

                        set_current_thread transaction_id;

                        priority := 1;

                        call_with_current_fate
                            (fn get_fate
                                =
                                resume_fate  put_fate  (my_id,  get_fate)
                            );
                    };

                fun wait_for { transaction_id, clean_up, next }                                 # Reppy calls this fn blockFn
                    =
                    {   msg =   call_with_current_fate
                                    (
                                     fn get_fate
                                        =
                                        {   clean_and_enqueue
                                                (in_q, (transaction_id, get_fate));

                                            next ();

                                            impossible();
                                        }
                                    );

                        clean_up ();

                        ts::reenable_thread_switching ();

                        msg;
                    };

                fun is_ready ()
                    =
                    case (clean_and_check (priority, out_q))
                        #
                        0        =>  itt::MAILOP_UNREADY wait_for;
                        priority =>  itt::MAILOP_READY { priority, do_it };
                    esac;
            end;

        fun nonblocking_take (MAILSLOT { priority, in_q, out_q } )
            =
            {   ts::disable_thread_switching ();
                #
                case (clean_and_remove out_q)
                    #
                    ITEM (transaction_id, put_fate)
                        =>
                        THE (call_with_current_fate
                                (fn get_fate
                                    =
                                    {   my_id =  ts::get_current_thread ();
                                        #
                                        set_current_thread  transaction_id;
                                        #
                                        priority := 1;
                                        #
                                        resume_fate
                                            put_fate
                                            (my_id,  get_fate);
                                    }
                            )   );

                    NO_ITEM
                        =>
                        {   ts::reenable_thread_switching ();
                            #
                            NULL;
                        };
                esac;
            };
    };
end;

## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext