


## oneshot-maildrop.pkg
# Compiled by:
#     src/lib/std/standard.lib

# The implementation of Id-style synchronizing memory cells.
### "We're fools whether we dance or not,
### so we might as well dance."
###
### -- Japanese proverb
stipulate
    # Short local aliases for the packages this file depends on.
    # Each alias sits on its own line: a '#' comment runs to end-of-line,
    # so anything following one on the same line would be commented out.
    #
    package fat =  fate;				# fate				is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;		# internal_threadkit_types	is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package tkq =  threadkit_queue;			# threadkit_queue		is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-queue.pkg
    package ts  =  thread_scheduler;			# thread_scheduler		is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
herein

    package oneshot_maildrop
    :       Oneshot_Maildrop				# Oneshot_Maildrop		is from   src/lib/src/lib/thread-kit/src/core-thread-kit/oneshot-maildrop.api
    {
# Unqualified local names for the fate type and the two
# fate operations from package 'fate' (aliased as 'fat').
# NOTE(review): a "fate" is presumably Mythryl's name for a
# first-class continuation -- confirm against fate.pkg.
#
Fate(X) = fat::Fate(X);
call_with_current_fate = fat::call_with_current_fate;
resume_fate = fat::resume_fate;
# We use the same underlying representation
# for both ivars and mvars:
#
#     priority   Integer counter.  bump_priority increments it each
#                time get' polls a cell that already holds a value;
#                it is reset to 1 when a value is actually delivered
#                (in set, and in the do_it thunk built by get').
#
#     read_q     Queue of (transaction-id ref, fate) pairs, one per
#                reader parked by get or get' while the cell was empty.
#
#     value      NULL while the cell is empty,
#                THE x once set has stored x.
#
Cell(X) = CELL { priority: Ref( Int ),
read_q: tkq::Threadkit_Queue( (Ref( itt::Transaction_Id ), Fate(X)) ),
value: Ref( Null_Or(X) )
};
# A oneshot maildrop is just a cell:
#
Oneshot_Maildrop(X) = Cell(X);
# Raised by set when the maildrop already holds a value:
#
exception MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP;
# Build a fresh, empty cell: no value stored yet,
# priority zero, and an empty queue of waiting readers.
#
fun new_cell ()
    =
    {   waiting_readers =  tkq::make_threadkit_queue ();
        #
        CELL { priority =>  REF 0,
               read_q   =>  waiting_readers,
               value    =>  REF NULL
             };
    };
# Identity test on cells: two cells are the same cell
# exactly when they share the same 'value' reference.
#
fun same_cell
      ( CELL { value => left_slot,  ... },
        CELL { value => right_slot, ... }
      )
    =
    left_slot == right_slot;
# Make a fresh (uncancelled) transaction id
# recording the currently running thread.
#
fun make_transaction_id ()
    =
    {   current_thread =  ts::get_current_thread ();
        #
        REF (itt::TRANSACTION_ID current_thread);
    };
# Mark a live transaction id as cancelled and
# hand back the thread id it was carrying.
#
# Raises FAIL if the transaction was already cancelled.
#
fun cancel_transaction_and_return_thread_id  transaction_id
    =
    case *transaction_id
        #
        itt::TRANSACTION_ID thread_id
            =>
            {   transaction_id :=  itt::CANCELLED_TRANSACTION_ID;
                #
                thread_id;
            };

        itt::CANCELLED_TRANSACTION_ID
            =>
            raise exception FAIL "Compiler bug: Attempt to cancel already-cancelled transaction-id";	# Never happens; here to keep the match exhaustive.
    esac;
# Post-increment for a priority counter:
# add one to the stored value and return
# the value it held before the increment.
#
fun bump_priority  priority
    =
    {   old =  *priority;
        #
        priority :=  old + 1;
        #
        old;
    };
# Result type of clean_and_remove:
#
#     NO_ITEM    The queue held no uncancelled entry.
#     ITEM e     e is the (transaction-id ref, fate) pair
#                that was removed from the queue.
#
Qy_Item X
= NO_ITEM
| ITEM ((Ref(itt::Transaction_Id), Fate(X)) )
;
# Functions to clean channel input and output queues
#
stipulate
# Strip cancelled transactions from the HEAD of a list.
# Stops at the first uncancelled entry (or at the end of
# the list) and returns whatever remains, unchanged.
#
fun clean ((REF itt::CANCELLED_TRANSACTION_ID, _) ! rest)
        =>
        clean rest;

    clean remaining
        =>
        remaining;
end;
# Reverse the first list onto the front of the second,
# discarding every cancelled transaction along the way.
#
fun clean_rev ([], kept)
        =>
        kept;

    clean_rev ((REF itt::CANCELLED_TRANSACTION_ID, _) ! todo, kept)
        =>
        clean_rev (todo, kept);

    clean_rev (entry ! todo, kept)
        =>
        clean_rev (todo, entry ! kept);
end;
herein
# Purge cancelled transactions from a queue and
# report whether any live entry is left.
#
# Returns 0 when no live entry was found; otherwise bumps
# 'priority' and returns the value it held before the bump.
#
fun clean_and_check (priority, itt::THREADKIT_QUEUE { front, rear } )
    =
    scan_front *front
    where
        # Front exhausted: fall back on the rear list, moving
        # its live entries (in reversed order) to the front.
        #
        fun scan_rear []
                =>
                0;

            scan_rear pending
                =>
                {   rear := [];
                    #
                    case (clean_rev (pending, []))
                        #
                        []   =>  0;
                        #
                        live =>  {   front := live;
                                     bump_priority priority;
                                 };
                    esac;
                };
        end
        also
        fun scan_front []
                =>
                scan_rear *rear;

            scan_front entries
                =>
                case (clean entries)
                    #
                    []   =>  scan_rear *rear;
                    #
                    live =>  {   front := live;
                                 bump_priority priority;
                             };
                esac;
        end;
    end;
# Purge cancelled transactions from a queue
# and dequeue the first live entry, if any.
#
# Returns ITEM entry on success,
# NO_ITEM if no live entry was found.
#
fun clean_and_remove (itt::THREADKIT_QUEUE { front, rear, ... } )
    =
    scan_front *front
    where
        # Front exhausted: fall back on the rear list, moving
        # its live entries (in reversed order) to the front.
        #
        fun scan_rear []
                =>
                NO_ITEM;

            scan_rear pending
                =>
                {   rear := [];
                    #
                    case (clean_rev (pending, []))
                        #
                        []  =>  NO_ITEM;
                        #
                        first ! others
                            =>
                            {   front := others;
                                ITEM first;
                            };
                    esac;
                };
        end
        also
        fun scan_front []
                =>
                scan_rear *rear;

            scan_front entries
                =>
                case (clean entries)
                    #
                    []  =>  scan_rear *rear;
                    #
                    (first ! others)
                        =>
                        {   front := others;
                            ITEM first;
                        };
                esac;
        end;
    end;
# Purge cancelled transactions from a queue
# and then append 'item' at its tail.
#
fun clean_and_enqueue (itt::THREADKIT_QUEUE { front, rear, ... }, item)
    =
    scan_front *front
    where
        # No live entry at the front: rebuild the front from
        # the live rear entries, with 'item' queued after them.
        #
        fun scan_rear []
                =>
                front := [item];

            scan_rear pending
                =>
                case (clean_rev (pending, []))
                    #
                    []   =>  {   rear  := [];
                                 front := [item];
                             };
                    #
                    live =>  {   front := live;
                                 rear  := [item];
                             };
                esac;
        end
        also
        fun scan_front []
                =>
                scan_rear *rear;

            scan_front entries
                =>
                case (clean entries)
                    #
                    []   =>  scan_rear *rear;
                    #
                    live =>  {   front := live;
                                 rear  := item ! *rear;
                             };
                esac;
        end;
    end;
end; # stipulate
# When a thread is resumed after being blocked
# in get or get' (iGet/mGet in Reppy's CML),
# there may be other threads also blocked on
# the same variable.
#
# This function is used to propagate the message
# to all of the threads that are blocked on the
# variable (or, in the mvar variant of this code,
# until one of them takes the value).
#
# It must be called from an atomic region,
# i.e. with thread switching disabled.
# When read_q is finally empty we leave
# the atomic region.
#
# We must use "clean_and_remove" to get items
# from the read_q in the unlikely event that
# a single thread executes a choice of
# multiple gets on the same variable.
#
# Pass 'msg' on to the next live reader parked on read_q, if any.
#
# With no live reader left we simply re-enable thread switching.
# Otherwise we cancel the reader's transaction, hand our own fate
# plus the reader's thread id to the scheduler, and resume the
# reader's fate with msg.
# NOTE(review): ts::enqueue_and_switch_current_thread presumably queues
# the calling thread for later resumption and makes the reader's
# thread current -- confirm against thread-scheduler.pkg.
#
fun relay_msg (read_q, msg)
    =
    case (clean_and_remove read_q)
        #
        ITEM (waiter_id, waiter_fate)
            =>
            call_with_current_fate
                (fn relayer_fate
                    =
                    {   waiter_thread =  cancel_transaction_and_return_thread_id  waiter_id;
                        #
                        ts::enqueue_and_switch_current_thread (relayer_fate, waiter_thread);
                        #
                        resume_fate  waiter_fate  msg;
                    }
                );
        #
        NO_ITEM =>  ts::reenable_thread_switching ();
    esac;
# I-variables
#
# The maildrop constructor and identity test are
# just the generic cell operations under other names:
#
make_oneshot_maildrop = new_cell;
same_oneshot_maildrop = same_cell;
# Store x in the maildrop.
#
# Raises MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP if the
# maildrop already holds a value; thread switching is
# re-enabled before the exception is raised.
#
# If a live reader is parked on read_q, its transaction is
# cancelled and its fate is resumed with x.  Only that first
# reader is woken here: get and get' call relay_msg once they
# resume, which passes the value on to the remaining readers.
#
fun set (CELL { priority, read_q, value }, x)
=
# Everything below runs with thread switching disabled
# until one of the branches re-enables it.
{ ts::disable_thread_switching ();
#
case *value
#
# Empty maildrop: store the value, then look for a reader to wake.
NULL => { value := THE x;
#
case (clean_and_remove read_q)
#
# Nobody waiting: just leave the atomic region.
NO_ITEM => ts::reenable_thread_switching ();
#
ITEM (transaction_id, fate)
=>
call_with_current_fate
(
fn my_fate
=
# NOTE(review): presumably this queues the calling thread (to be
# resumed later via my_fate) and makes the reader's thread the
# current one -- confirm against thread-scheduler.pkg.
{ ts::enqueue_and_switch_current_thread
(
my_fate,
cancel_transaction_and_return_thread_id transaction_id
);
# A value has been delivered, so reset the priority counter.
priority := 1;
# Continue as the reader, with x as the result of its blocked get.
resume_fate fate x;
}
);
esac;
};
# Already full: a oneshot maildrop may be set only once.
THE _ =>
{ ts::reenable_thread_switching ();
#
raise exception MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP;
};
esac;
};
# Blocking read of the maildrop.
#
# If a value is already present it is returned at once.
# Otherwise the calling thread parks its fate on read_q under
# a fresh transaction id and gives up the processor.  When it
# is later resumed with the value it first relays that value
# to any other parked readers, then returns it.
#
fun get (CELL { read_q, value, ... } )
    =
    {   ts::disable_thread_switching ();
        #
        case *value
            #
            THE v
                =>
                {   ts::reenable_thread_switching ();
                    #
                    v;
                };

            NULL
                =>
                {   v = call_with_current_fate
                            (fn my_fate
                                =
                                {   parked =  (make_transaction_id (), my_fate);
                                    #
                                    tkq::enqueue (read_q, parked);
                                    #
                                    ts::reenable_thread_switching_and_dispatch_next_thread ();
                                }
                            );

                    relay_msg (read_q, v);

                    v;
                };
        esac;
    };
# Mailop (event-valued) form of get.
#
# Returns an itt::BASE_MAILOPS value holding a single polling function, is_ready.
# NOTE(review): do_it ends by re-enabling thread switching, so is_ready and
# wait_for are presumably invoked by the mailop machinery with thread
# switching already disabled -- confirm against the caller.
#
fun get' (CELL { priority, read_q, value } )
=
itt::BASE_MAILOPS [is_ready]
where
# Called when the maildrop was empty at poll time: park the
# caller's fate on read_q under the supplied transaction id,
# then call next().  When the fate is later resumed with the
# value, run clean_up, relay the value to any other parked
# readers, and return it.
#
fun wait_for { transaction_id, clean_up, next } # Reppy calls this fn blockFn
=
{ msg = call_with_current_fate
(fn fate
=
{ tkq::enqueue
( read_q,
(transaction_id, fate)
);
next ();
raise exception FAIL "maildrop: impossible"; # Execution should never get to here.
}
);
clean_up ();
relay_msg (read_q, msg);
msg;
};
# Poll the maildrop: report it unready (supplying wait_for) while
# it is empty; once it holds a value, report it ready with the
# pre-bump priority and a thunk that delivers the value.
#
fun is_ready () # Reppy calls this fn pollFn
=
case *value
#
NULL => itt::MAILOP_UNREADY wait_for;
#
THE v => itt::MAILOP_READY
{
priority => bump_priority priority,
#
# Delivering the value resets the priority counter
# and leaves the atomic region.
do_it => .{ priority := 1; # Reppy calls this field doFn
ts::reenable_thread_switching ();
v;
}
};
esac;
end;
# Non-blocking read of the maildrop.
#
# Returns THE v if the maildrop currently holds v,
# NULL if it is still empty.  Never blocks.
#
# The contents are sampled with thread switching disabled, and
# thread switching is re-enabled on BOTH outcomes.  (Previously
# the NULL path returned without re-enabling it, so polling an
# empty maildrop left thread switching disabled.)
#
fun nonblocking_get (CELL { value, ... } )
    =
    {   ts::disable_thread_switching ();
        #
        result =  *value;			# Already THE v or NULL -- exactly what we return.
        #
        ts::reenable_thread_switching ();
        #
        result;
    };
}; # package oneshot_maildrop
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.


