


## mailslot.pkg
# Compiled by:
#     src/lib/std/standard.lib

# The representation of synchronous slots.
#
# To ensure that we always leave the atomic region exactly once, we
# require that the blocking operation be responsible for leaving the
# atomic region (in the mailop case, it must also execute the clean-up
# action). The do_it fn always transfers control to the blocked thread
# without leaving the atomic region. Note that the give (and give')
# wait_fors run using the receiver's thread ID.
stipulate
    # Short package aliases used throughout this file.
    # Each alias sits on its own line: a '#' comment runs to end-of-line,
    # so anything following it on the same line is not compiled.
    #
    package fat =  fate;                        # fate                        is from   src/lib/std/src/nj/fate.pkg
    package mop =  mailop;                      # mailop                      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
    package tkq =  threadkit_queue;             # threadkit_queue             is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-queue.pkg
    package itt =  internal_threadkit_types;    # internal_threadkit_types    is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package ts  =  thread_scheduler;            # thread_scheduler            is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
    package t   =  thread;                      # thread                      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread.pkg
herein
package mailslot: (weak)
api {
    Mailop(X);
    include Mailslot;                           # Mailslot    is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailslot.api

    # Exported in addition to the Mailslot api.
    # Must be on its own line, outside the comment above,
    # or the declaration is not part of the api.
    #
    reset_mailslot:  Mailslot(X) -> Void;
}
{
# Local names for the mailop and fate types, and for the two
# fate primitives (capture the current fate / resume a fate with a value)
# which the give/take functions below are built from.
#
Mailop(X) = mop::Mailop(X);
Fate(X) = fat::Fate(X);
call_with_current_fate = fat::call_with_current_fate;
resume_fate = fat::resume_fate;
# Local copy of the queue-append operation, kept here
# so it can be inlined for speed:  pushes 'item' onto
# the head of the queue's 'rear' list.
#
fun enqueue (itt::THREADKIT_QUEUE { rear => back, ... }, item)
    =
    back := item ! *back;
# A synchronous mailslot carrying values of type X.
#
#   priority  Integer handed to the mailop machinery by give'/take' when the
#             slot is ready;  set to 1 whenever a rendezvous completes
#             and incremented by bump_priority on each ready poll.
#   in_q      Entries queued by take/take':  a transaction id plus
#             the fate which is to receive the message.
#   out_q     Entries queued by give/give':  a transaction id plus the
#             fate which is to receive the taker's thread and fate.
#
Mailslot(X)
=
MAILSLOT
{ priority: Ref( Int ),
in_q: itt::Threadkit_Queue( (Ref( itt::Transaction_Id ), Fate(X)) ),
out_q: itt::Threadkit_Queue( (Ref( itt::Transaction_Id ), Fate( (itt::Thread, Fate(X)) )) )
};
# Put a mailslot back into its just-created state:
# priority is set to 1 and tkq::reset is applied
# to both of its queues.
#
fun reset_mailslot (MAILSLOT { priority => prio, in_q => takers, out_q => givers } )
    =
    {   prio := 1;
        #
        tkq::reset takers;
        tkq::reset givers;
    };
# Create a fresh mailslot:  priority 1 and
# two newly made threadkit queues.
#
fun make_mailslot ()
    =
    {   takers = tkq::make_threadkit_queue ();
        givers = tkq::make_threadkit_queue ();
        #
        MAILSLOT { priority => REF 1, in_q => takers, out_q => givers };
    };
# (Mailslot(X), Mailslot(X)) -> Bool
#
# Mailslot identity test:  compares the input queues
# of the two slots via tkq::same_queue.
#
fun same_mailslot (MAILSLOT { in_q => q1, ... }, MAILSLOT { in_q => q2, ... } )
    =
    tkq::same_queue (q1, q2);
# Make a fresh, live transaction id
# owned by the currently running thread.
#
fun make_transaction_id ()
    =
    {   owner = ts::get_current_thread ();
        #
        REF (itt::TRANSACTION_ID owner);
    };
# Mark a live transaction id as cancelled and
# return the thread which was recorded in it.
#
# Raises FAIL if the id was already cancelled;
# callers in this file only pass ids taken from
# queue entries which survived cleaning.
#
fun cancel_transaction_and_return_thread_id transaction_id
    =
    case (*transaction_id)
        #
        itt::TRANSACTION_ID thread_id
            =>
            {   transaction_id := itt::CANCELLED_TRANSACTION_ID;
                #
                thread_id;
            };

        itt::CANCELLED_TRANSACTION_ID
            =>
            raise exception FAIL "Compiler bug: Attempt to cancel already-cancelled transaction-id";    # Never happens; here to suppress 'nonexhaustive match' compile warning.
    esac;
# Cancel the given transaction id and make the
# thread it named the scheduler's current thread.
#
fun set_current_thread transaction_id
    =
    {   thread_id = cancel_transaction_and_return_thread_id transaction_id;
        #
        ts::set_current_thread thread_id;
    };
# Result of clean_and_remove:
#   NO_ITEM    no uncancelled entry was found in the queue;
#   ITEM       the first uncancelled entry (transaction id, fate),
#              which has been removed from the queue.
#
Qy_Item(X)
= NO_ITEM
| ITEM (Ref(itt::Transaction_Id), Fate(X))
;
# Post-increment on a priority cell:
# add one to the stored value and
# hand back what it held beforehand.
#
fun bump_priority p
    =
    {   old = *p;
        #
        p := old + 1;
        old;
    };
# Functions to clean slot input and output queues
#
stipulate
# Drop cancelled entries from the head of a list, stopping at
# the first live entry (or at the end of the list).
# Entries after the first live one are not examined.
#
fun clean entries
    =
    case (entries)
        #
        (REF itt::CANCELLED_TRANSACTION_ID, _) ! rest =>  clean rest;
        remaining                                     =>  remaining;
    esac;
# Reverse the first list onto the front of the second,
# leaving out every cancelled entry on the way.
#
fun clean_rev ([], done)
        =>
        done;

    clean_rev ((REF itt::CANCELLED_TRANSACTION_ID, _) ! rest, done)
        =>
        clean_rev (rest, done);

    clean_rev (live ! rest, done)
        =>
        clean_rev (rest, live ! done);
end;
# Remove every cancelled entry from a list while keeping the
# surviving entries in their original order:  clean_rev filters
# and reverses, then a plain reversal restores the order.
#
fun clean_all l
    =
    {   fun flip ([], done)
                =>
                done;

            flip (x ! rest, done)
                =>
                flip (rest, x ! done);
        end;
        #
        flip (clean_rev (l, []), []);
    };
herein
# Poll a queue for a live entry without removing it.
#
# Cancelled entries ahead of the first live one are discarded, and
# the queue is left with that live entry at the head of 'front'.
# Returns 0 when no live entry exists;  otherwise returns the
# slot priority as it was before being incremented by one.
#
fun clean_and_check (priority, itt::THREADKIT_QUEUE { front, rear } )
    =
    scan_front ()
    where
        # Front holds nothing live:  fall back to the rear list,
        # which is emptied and (if anything live remains) moved,
        # oldest-first, into 'front'.
        #
        fun scan_rear pending
            =
            case (pending)
                #
                [] => 0;
                #
                _  => {   rear := [];
                          #
                          case (clean_rev (pending, []))
                              #
                              []   => 0;
                              live => {   front := live;
                                          bump_priority priority;
                                      };
                          esac;
                      };
            esac;

        fun scan_front ()
            =
            case (clean *front)
                #
                []   => scan_rear *rear;
                #
                live => {   front := live;
                            bump_priority priority;
                        };
            esac;
    end;
# Remove and return the first live entry of a queue.
#
# Cancelled entries ahead of it are discarded.  Yields
# ITEM entry on success, NO_ITEM when neither the front
# nor the rear list holds a live entry.
#
fun clean_and_remove (itt::THREADKIT_QUEUE { front, rear, ... } )
    =
    pop_front ()
    where
        # Front holds nothing live:  empty the rear list and, if it
        # had live entries, return the oldest and keep the others
        # in 'front'.
        #
        fun pop_rear pending
            =
            case (pending)
                #
                [] => NO_ITEM;
                #
                _  => {   rear := [];
                          #
                          case (clean_rev (pending, []))
                              #
                              [] => NO_ITEM;
                              #
                              first ! others
                                  =>
                                  {   front := others;
                                      ITEM first;
                                  };
                          esac;
                      };
            esac;

        fun pop_front ()
            =
            case (clean *front)
                #
                [] => pop_rear *rear;
                #
                first ! others
                    =>
                    {   front := others;
                        ITEM first;
                    };
            esac;
    end;
# Append 'item' to a queue, purging every
# cancelled entry from the queue as we go.
#
fun clean_and_enqueue (itt::THREADKIT_QUEUE { front, rear, ... }, item)
    =
    {   live_front = clean_all *front;
        #
        case (live_front)
            #
            [] =>   # Nothing live in front:  fold the rear list
                    # (plus the new item, last) into 'front'.
                    #
                    {   front := clean_rev (*rear, [item]);
                        rear  := [];
                    };

            _  =>   {   front := live_front;
                        rear  := item ! clean_all *rear;
                    };
        esac;
    };
end; # stipulate
# Raise FAIL.  Called by the wait_for functions directly after next(),
# at a point which should never be reached if next() does not return.
#
fun impossible ()
=
raise exception FAIL "Slot: impossible";
# Blocking send of 'msg' on a mailslot.
#
# Thread switching is disabled on entry.  If a taker is already
# waiting on in_q we queue ourself as runnable, switch to the
# taker's thread and resume its fate with the message.  Otherwise
# we park our own fate on out_q and dispatch another thread;  we
# are resumed later with the taker's thread and fate, and then
# switch to it, passing the message.
#
fun give (MAILSLOT { priority, in_q, out_q }, msg)
    =
    {   ts::disable_thread_switching ();
        #
        case (clean_and_remove in_q)
            #
            NO_ITEM
                =>
                {   my (taker_thread, taker_fate)
                        =
                        call_with_current_fate
                            (fn giver_fate
                                =
                                {   enqueue (out_q, (make_transaction_id (), giver_fate));
                                    #
                                    ts::reenable_thread_switching_and_dispatch_next_thread ();
                                }
                            );

                    ts::reenable_thread_switching_and_switch_to_thread
                        (taker_thread, taker_fate, msg);
                };

            ITEM (taker_id, taker_fate)
                =>
                call_with_current_fate
                    (fn giver_fate
                        =
                        {   ts::enqueue_and_switch_current_thread
                                (giver_fate, cancel_transaction_and_return_thread_id taker_id);
                            #
                            priority := 1;
                            resume_fate taker_fate msg;
                        }
                    );
        esac;
    };
# Mailop version of 'give':  a base mailop with a single readiness poll.
#
#   is_ready   Ready iff clean_and_check finds a live taker on in_q.
#   do_it      Run when ready:  dequeue that taker, queue ourself as
#              runnable, switch to the taker and resume it with msg.
#   wait_for   Run when not ready:  park our fate on out_q under the
#              supplied transaction id and call next();  when a taker
#              later resumes us with its thread and fate, run clean_up
#              and switch to the taker, passing msg.
#
fun give' (MAILSLOT { priority, in_q, out_q }, msg)
    =
    itt::BASE_MAILOPS [is_ready]
    where
        fun do_it ()                                            # Reppy calls this fn doFn
            =
            {   my (taker_id, taker_fate)
                    =
                    tkq::dequeue in_q;

                call_with_current_fate
                    (fn giver_fate
                        =
                        {   ts::enqueue_and_switch_current_thread
                                (giver_fate, cancel_transaction_and_return_thread_id taker_id);
                            #
                            priority := 1;
                            resume_fate taker_fate msg;
                        }
                    );
            };

        fun wait_for { transaction_id, clean_up, next }         # Reppy calls this fn blockFn
            =
            {   (call_with_current_fate
                    (fn giver_fate
                        =
                        {   clean_and_enqueue (out_q, (transaction_id, giver_fate));
                            next ();
                            impossible ();
                        }
                    )
                )
                    ->
                    (taker_thread, taker_fate);

                clean_up ();
                ts::reenable_thread_switching_and_switch_to_thread (taker_thread, taker_fate, msg);
            };

        fun is_ready ()
            =
            case (clean_and_check (priority, in_q))
                #
                0              => itt::MAILOP_UNREADY wait_for;
                ready_priority => itt::MAILOP_READY { priority => ready_priority, do_it };
            esac;
    end;
# Non-blocking send of 'msg' on a mailslot.
#
# Returns TRUE if a taker was waiting and has been handed the message,
# FALSE (with thread switching re-enabled) if no taker was waiting;
# in the latter case the message is not queued.
#
# The body used to be wrapped in a second, outer call_with_current_fate
# whose fate was never used and was shadowed by the inner 'put_fate'.
# That wrapper has been dropped, which also brings this function into
# line with nonblocking_take.
#
fun nonblocking_give (MAILSLOT { priority, in_q, ... }, msg)
    =
    {   ts::disable_thread_switching ();
        #
        case (clean_and_remove in_q)
            #
            ITEM (rid, rkont)
                =>
                {   # Queue ourself as runnable, become the taker's
                    # thread and hand it the message.  We continue
                    # from here when our own fate is next resumed.
                    #
                    call_with_current_fate
                        (fn put_fate
                            =
                            {   ts::enqueue_and_switch_current_thread
                                    (put_fate, cancel_transaction_and_return_thread_id rid);
                                #
                                priority := 1;
                                resume_fate rkont msg;
                            }
                        );

                    TRUE;
                };

            NO_ITEM
                =>
                {   ts::reenable_thread_switching ();
                    #
                    FALSE;
                };
        esac;
    };
# Blocking receive on a mailslot.
#
# Thread switching is disabled on entry.  If a giver is waiting on
# out_q we become its thread and resume its fate, passing it our
# own thread and fate so it can deliver the message to us.
# Otherwise we park our fate on in_q and dispatch another thread.
#
fun take (MAILSLOT { priority, in_q, out_q } )
    =
    call_with_current_fate
        (fn taker_fate
            =
            {   ts::disable_thread_switching ();
                #
                case (clean_and_remove out_q)
                    #
                    NO_ITEM
                        =>
                        {   enqueue (in_q, (make_transaction_id (), taker_fate));
                            #
                            ts::reenable_thread_switching_and_dispatch_next_thread ();
                        };

                    ITEM (giver_id, giver_fate)
                        =>
                        {   me = ts::get_current_thread ();
                            #
                            set_current_thread giver_id;
                            priority := 1;
                            resume_fate giver_fate (me, taker_fate);
                        };
                esac;
            }
        );
# Mailop version of 'take':  a base mailop with a single readiness poll.
#
#   is_ready   Ready iff clean_and_check finds a live giver on out_q.
#   do_it      Run when ready:  dequeue that giver, become its thread
#              and resume it with our own thread and fate.
#   wait_for   Run when not ready:  park our fate on in_q under the
#              supplied transaction id and call next();  once resumed
#              with a message, run clean_up, re-enable thread
#              switching and return the message.
#
fun take' (MAILSLOT { priority, in_q, out_q } )
    =
    itt::BASE_MAILOPS [is_ready]
    where
        fun do_it ()                                            # Reppy calls this fn doFn
            =
            {   my (giver_id, giver_fate)
                    =
                    tkq::dequeue out_q;

                me = ts::get_current_thread ();
                #
                set_current_thread giver_id;
                priority := 1;
                #
                call_with_current_fate
                    (fn taker_fate
                        =
                        resume_fate giver_fate (me, taker_fate)
                    );
            };

        fun wait_for { transaction_id, clean_up, next }         # Reppy calls this fn blockFn
            =
            {   received
                    =
                    call_with_current_fate
                        (fn taker_fate
                            =
                            {   clean_and_enqueue (in_q, (transaction_id, taker_fate));
                                next ();
                                impossible ();
                            }
                        );

                clean_up ();
                ts::reenable_thread_switching ();
                received;
            };

        fun is_ready ()
            =
            case (clean_and_check (priority, out_q))
                #
                0              => itt::MAILOP_UNREADY wait_for;
                ready_priority => itt::MAILOP_READY { priority => ready_priority, do_it };      # Separate name, so the slot's 'priority' ref is not shadowed.
            esac;
    end;
# Non-blocking receive on a mailslot.
#
# Returns THE msg if a giver was waiting, else NULL
# (with thread switching re-enabled);  never parks
# the caller on the slot's in_q.
#
fun nonblocking_take (MAILSLOT { priority, out_q, ... } )
    =
    {   ts::disable_thread_switching ();
        #
        case (clean_and_remove out_q)
            #
            NO_ITEM
                =>
                {   ts::reenable_thread_switching ();
                    #
                    NULL;
                };

            ITEM (giver_id, giver_fate)
                =>
                {   received
                        =
                        call_with_current_fate
                            (fn taker_fate
                                =
                                {   me = ts::get_current_thread ();
                                    #
                                    set_current_thread giver_id;
                                    priority := 1;
                                    resume_fate giver_fate (me, taker_fate);
                                }
                            );

                    THE received;
                };
        esac;
    };
};
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.


