


## maildrop.pkg
#
# The implementation of Id-style synchronizing memory cells.
# These are essentially concurrency-safe replacements for REF cells.
# Compiled by:
# src/lib/std/standard.lib### "We're fools whether we dance or not,
### so we might as well dance."
###
### -- Japanese proverb
stipulate
package tkq = threadkit_queue; # threadkit_queue is from src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-queue.pkg package itt = internal_threadkit_types; # internal_threadkit_types is from src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg package ts = thread_scheduler; # thread_scheduler is from src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkgherein
package maildrop
: Maildrop # Maildrop is from src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.api {
Fate(X)
=
fate::Fate(X);
call_with_current_fate = fate::call_with_current_fate;
resume_fate = fate::resume_fate;
Cell(X)
=
CELL {
priority: Ref( Int ),
read_q: tkq::Threadkit_Queue( (Ref( itt::Transaction_Id ), Fate(X)) ),
value: Ref( Null_Or(X) )
};
Maildrop(X) = Cell(X);
exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;
fun new_cell ()
=
CELL { priority => REF 0,
value => REF NULL,
read_q => tkq::make_threadkit_queue ()
};
fun same_cell (CELL { value=>v1, ... }, CELL { value=>v2, ... } )
=
v1 == v2;
fun make_transaction_id ()
=
REF (itt::TRANSACTION_ID (ts::get_current_thread()));
fun cancel_transaction_and_return_thread_id (transaction_id as REF (itt::TRANSACTION_ID thread_id))
=>
{ transaction_id := itt::CANCELLED_TRANSACTION_ID;
#
thread_id;
};
cancel_transaction_and_return_thread_id (REF (itt::CANCELLED_TRANSACTION_ID))
=>
raise exception FAIL "Compiler bug: Attempt to cancel already-cancelled transaction-id"; # Never happens; here to suppress 'nonexhaustive match' compile warning.
end;
fun bump_priority (p as REF n) # Bump a priority value by one, returning the old value.
=
{ p := n+1;
n;
};
Qy_Item(X)
#
= NO_ITEM
| ITEM ((Ref itt::Transaction_Id, Fate(X)))
;
# Functions to clean channel input and output queues
#
stipulate
fun clean []
=>
[];
clean ((REF itt::CANCELLED_TRANSACTION_ID, _) ! r)
=>
clean r;
clean l
=>
l;
end;
fun clean_rev ([], l)
=>
l;
clean_rev ((REF itt::CANCELLED_TRANSACTION_ID, _) ! r, l)
=>
clean_rev (r, l);
clean_rev (x ! r, l)
=>
clean_rev (r, x ! l);
end;
herein
fun clean_and_check (priority, itt::THREADKIT_QUEUE { front, rear } )
=
clean_front *front
where
fun clean_front []
=>
clean_rear *rear;
clean_front f
=>
case (clean f)
[] => clean_rear *rear;
f' => { front := f';
bump_priority priority;
};
esac;
end
also
fun clean_rear []
=>
0;
clean_rear r
=>
{ rear := [];
case (clean_rev (r, []))
[] => 0;
rr => { front := rr;
bump_priority priority;
};
esac;
};
end;
end;
fun clean_and_remove (itt::THREADKIT_QUEUE { front, rear, ... } )
=
clean_front *front
where
fun clean_front []
=>
clean_rear *rear;
clean_front f
=>
case (clean f)
[] => clean_rear *rear;
(item ! rest) => { front := rest;
ITEM item;
};
esac;
end
also
fun clean_rear []
=>
NO_ITEM;
clean_rear r
=>
{ rear := [];
case (clean_rev (r, []))
[] => NO_ITEM;
item ! rest
=>
{ front := rest;
ITEM item;
};
esac;
};
end;
end;
fun clean_and_enqueue (itt::THREADKIT_QUEUE { front, rear, ... }, item)
=
clean_front *front
where
fun clean_front []
=>
clean_rear *rear;
clean_front f
=>
case (clean f)
[] => clean_rear *rear;
f' => { front := f';
rear := item ! *rear;
};
esac;
end
also
fun clean_rear []
=>
front := [item];
clean_rear r
=>
case (clean_rev (r, []))
[] => { front := [item]; rear := []; };
rr => { rear := [item]; front := rr; };
esac;
end;
end;
end; # stipulate
# When a thread is resumed after being blocked
# on an iGet or mGet operation there may be
# other threads also blocked on the variable.
#
# This function is used to propagate the message
# to all of the threads that are blocked on the
# variable (or until one of them takes the value
# in the mvar case).
#
# It must be called from an atomic region.
# When the readQ is finally empty we leave
# the atomic region.
#
# We must use "cleanAndRemove" to get items
# from the readQ in the unlikely event that
# a single thread executes a choice of
# multiple gets on the same variable.
#
fun relay_msg (read_q, msg)
=
case (clean_and_remove read_q)
NO_ITEM
=>
ts::reenable_thread_switching ();
ITEM (transaction_id, fate)
=>
call_with_current_fate
(fn my_fate
=
{ ts::enqueue_and_switch_current_thread
(my_fate, cancel_transaction_and_return_thread_id transaction_id);
resume_fate fate msg;
}
);
esac;
fun impossible ()
=
raise exception FAIL "maildrop: impossible";
# M-variables:
#
make_empty_maildrop
=
new_cell;
fun make_full_maildrop x
=
CELL { priority => REF 0,
read_q => tkq::make_threadkit_queue (),
value => REF (THE x)
};
same_maildrop = same_cell;
fun fill (CELL { priority, read_q, value }, x)
=
{ ts::disable_thread_switching ();
case *value
NULL =>
{ value := THE x;
case (clean_and_remove read_q)
NO_ITEM
=>
ts::reenable_thread_switching ();
ITEM (transaction_id, fate)
=>
call_with_current_fate
(fn my_fate
=
{ ts::enqueue_and_switch_current_thread
(my_fate, cancel_transaction_and_return_thread_id transaction_id);
priority := 1;
resume_fate fate x;
}
);
esac;
};
THE _ =>
{ ts::reenable_thread_switching ();
#
raise exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;
};
esac;
};
fun empty' (CELL { priority, read_q, value } )
=
{ fun wait_for { transaction_id, clean_up, next } # Reppy calls this fn blockFn
=
{ v = call_with_current_fate
(fn fate
=
{ tkq::enqueue (read_q, (transaction_id, fate));
next ();
impossible ();
}
);
clean_up ();
value := NULL;
ts::reenable_thread_switching ();
v;
};
fun is_ready () # Reppy calls this fn pollFn
=
case *value
#
NULL => itt::MAILOP_UNREADY wait_for;
#
THE v => itt::MAILOP_READY
{
priority => bump_priority priority,
do_it => .{ value := NULL; # Reppy calls this field doFn
ts::reenable_thread_switching ();
v;
}
};
esac;
itt::BASE_MAILOPS [is_ready];
};
fun nonblocking_empty (CELL { priority, read_q, value } )
=
{ ts::disable_thread_switching();
case *value
THE v
=>
{ value := NULL;
ts::reenable_thread_switching ();
THE v;
};
NULL => NULL;
esac;
};
fun empty (CELL { priority, read_q, value } )
=
{ ts::disable_thread_switching();
case *value
NULL =>
{ v = call_with_current_fate
(fn fate
=
{ tkq::enqueue (read_q, (make_transaction_id(), fate));
#
ts::reenable_thread_switching_and_dispatch_next_thread ();
}
);
value := NULL;
ts::reenable_thread_switching ();
v;
};
THE v =>
{ value := NULL;
ts::reenable_thread_switching ();
v;
};
esac;
};
fun peek (CELL { priority, read_q, value } )
=
{ ts::disable_thread_switching ();
case *value
NULL
=>
{ v = call_with_current_fate
(fn fate
=
{ tkq::enqueue (read_q, (make_transaction_id(), fate));
#
ts::reenable_thread_switching_and_dispatch_next_thread ();
}
);
relay_msg (read_q, v);
v;
};
THE v
=>
{ ts::reenable_thread_switching ();
v;
};
esac;
};
fun peek' (CELL { priority, read_q, value } )
=
{
fun wait_for { transaction_id, clean_up, next } # Reppy calls this fn blockFn
=
{ v = call_with_current_fate
(fn fate
=
{ tkq::enqueue (read_q, (transaction_id, fate));
next ();
impossible ();
}
);
clean_up();
relay_msg (read_q, v);
v;
};
fun is_ready () # Reppy calls this fn pollFn
=
case *value
#
NULL => itt::MAILOP_UNREADY wait_for;
#
THE v => itt::MAILOP_READY
{
priority => bump_priority priority,
do_it => .{ ts::reenable_thread_switching (); # Reppy calls this field doFn
v;
}
};
esac;
itt::BASE_MAILOPS [is_ready];
};
fun nonblocking_peek (CELL { priority, read_q, value } )
=
{ ts::disable_thread_switching ();
case *value
THE v
=>
{ ts::reenable_thread_switching ();
THE v;
};
NULL => NULL;
esac;
};
# Swap the current contents of the cell with a new value.
#
# This function has the effect of an
# get_mail followed by a put_mail,
# except that it is guaranteed to be atomic.
#
# It is also somewhat more efficient.
#
fun swap (CELL { priority, read_q, value }, new_v)
=
{ ts::disable_thread_switching ();
case *value
NULL =>
{ v = call_with_current_fate
(fn fate
=
{ tkq::enqueue (read_q, (make_transaction_id(), fate));
#
ts::reenable_thread_switching_and_dispatch_next_thread ();
}
);
value := THE new_v;
# Relay the new value to
# any other blocked threads:
#
relay_msg (read_q, new_v);
v;
};
THE v =>
{ value := THE new_v;
ts::reenable_thread_switching ();
v;
};
esac;
};
fun swap' (CELL { priority, read_q, value }, new_v)
=
{
fun wait_for { transaction_id, clean_up, next } # Reppy calls this fn blockFn
=
{ v = call_with_current_fate
(fn fate
=
{ tkq::enqueue (read_q, (transaction_id, fate));
next ();
impossible();
}
);
clean_up ();
value := THE new_v;
relay_msg (read_q, new_v);
v;
};
fun is_ready ()
=
case *value
#
NULL => itt::MAILOP_UNREADY wait_for;
THE v => itt::MAILOP_READY
{
priority => bump_priority priority,
do_it => .{ value := THE new_v; # Reppy calls this field doFn
ts::reenable_thread_switching ();
v;
}
};
esac;
itt::BASE_MAILOPS [is_ready];
};
}; # package maildrop
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.


