


## threadkit-binary-io-g.pkg
# Compiled by:
#     src/lib/std/standard.lib

# This is the threadkit version of the binary_io generic package
#     src/lib/std/src/io/binary-io-g.pkg

# We are compiletime invoked by:
#     src/lib/std/src/posix/posix-threadkit-binary-io.pkg

# The 'stipulate' and 'herein' keywords must stand on their own lines:
# when fused onto the end of a '#' comment they become comment text, and
# the closing 'end;' at the bottom of this file is left without an opener.
#
stipulate
    package iox = io_exceptions;    # io_exceptions is from src/lib/std/src/io/io-exceptions.pkg
herein
generic package threadkit_binary_io_g (
    #
    # Threadkit_Winix_Stream_Readers_And_Writers is from src/lib/std/src/io/threadkit-winix-stream-readers-and-writers.api
    # threadkit_posix_binary_stream_readers_and_writers is from src/lib/std/src/posix/threadkit-posix-binary-stream-readers-and-writers.pkg
    #
    # The generic takes one package argument supplying the platform
    # readers/writers; its types are pinned to the binary stream types.
    #
    package threadkit_winix_stream_readers_and_writers
        :
        Threadkit_Winix_Stream_Readers_And_Writers
        where stream_readers_and_writers::Rw_Vector == threadkit_binary_stream_readers_and_writers::Rw_Vector
        where stream_readers_and_writers::Vector == threadkit_binary_stream_readers_and_writers::Vector
        where stream_readers_and_writers::Rw_Vector_Slice == threadkit_binary_stream_readers_and_writers::Rw_Vector_Slice
        where stream_readers_and_writers::Vector_Slice == threadkit_binary_stream_readers_and_writers::Vector_Slice
        where stream_readers_and_writers::Element == threadkit_binary_stream_readers_and_writers::Element
        where stream_readers_and_writers::File_Position == threadkit_binary_stream_readers_and_writers::File_Position
        where stream_readers_and_writers::Stream_Reader == threadkit_binary_stream_readers_and_writers::Stream_Reader
        where stream_readers_and_writers::Stream_Writer == threadkit_binary_stream_readers_and_writers::Stream_Writer;
)
: (weak) Threadkit_Binary_Io    # Threadkit_Binary_Io is from src/lib/std/src/io/threadkit-binary-io.api
{
    include threadkit;    # threadkit is from src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg

    # Short package aliases used throughout the body below.
    # Each must be a real declaration (not comment text), since the
    # code refers to pio::, a::, rs::, v:: and vs:: by these names.
    #
    package pio = threadkit_winix_stream_readers_and_writers::stream_readers_and_writers;    # threadkit_winix_stream_readers_and_writers is our argument.
    package a = rw_vector_of_one_byte_unts;    # rw_vector_of_one_byte_unts is from src/lib/std/src/rw-vector-of-one-byte-unts.pkg
    package rs = rw_vector_slice_of_one_byte_unts;    # rw_vector_slice_of_one_byte_unts is from src/lib/std/src/rw-vector-slice-of-one-byte-unts.pkg
    package v = vector_of_one_byte_unts;    # vector_of_one_byte_unts is from src/lib/std/src/vector-of-one-byte-unts.pkg
    package vs = vector_slice_of_one_byte_unts;    # vector_slice_of_one_byte_unts is from src/lib/std/src/vector-slice-of-one-byte-unts.pkg
    package pos = file_position;    # file_position is from src/lib/std/file-position.pkg

    # Assign to a maildrop:
#
fun update_maildrop (drop, new_value)
    =
    {   # Take out whatever the maildrop currently holds (the result is
        # discarded), then store the replacement value in its place.
        #
        empty drop;
        fill (drop, new_value);
    };
# An element for initializing buffers:
#
some_element = (0u0: one_byte_unt::Unt);
# Local shorthands for the vector primitives used throughout this file.
# vec_extract composes vs::make_slice with vs::to_vector; callers below
# pass it (vector, start, NULL) or (vector, start, THE length) triples.
vec_extract = vs::to_vector o vs::make_slice;
vec_get = v::get;
rw_vec_set = a::set;
# The empty vector; returned by reads that hit end-of-stream.
empty_vec = v::from_list [];
# No-op cleaner.  Installed as a placeholder by make_instream and
# make_outstream until the real cleaner is bound with rebind_cleaner.
fun dummy_cleaner () = ();
package pure_io {
# Types re-exported under local names: vector types come from v,
# reader/writer/position types come from pio (the generic's argument).
Vector = v::Vector;
Element = v::Element;
Stream_Reader = pio::Stream_Reader;
Stream_Writer = pio::Stream_Writer;
File_Position = pio::File_Position;
# ** Functional input streams **
#
# An input stream value pairs a buffer with an Int offset into that
# buffer's data vector (the read functions below compare the offset
# against v::length data).  Buffers are chained through their 'more'
# maildrops; all buffers of one stream share a single Info record.
#
Input_Stream
=
INPUT_STREAM (Input_Buffer, Int)
also
Input_Buffer
=
INPUT_BUFFER
{
base_position: Null_Or( File_Position ),
more: Maildrop( More ), # When this cell is empty, it means that
# there is an outstanding request to the
# server to extend the stream.
data: Vector,
info: Info
}
also
More
= MORE Input_Buffer # Forward link to additional data.
| NOMORE # Placeholder for forward link.
| TERMINATED # Termination of the stream.
also
# Per-stream shared state.  read_vector, read_vec_mailop and
# get_position are filled in by make_instream from the reader's fields.
Info = INFO { reader: Stream_Reader,
read_vector: Int -> Vector,
read_vec_mailop: Int -> threadkit::Mailop( Vector ),
is_closed: Ref( Bool ),
get_position: Void -> Null_Or( File_Position ),
tail: Maildrop( Maildrop( More ) ), # Points to the more cell of the last buffer.
clean_tag: threadkit_io_cleanup_at_shutdown::Tag
};
# Project the shared Info record out of an input buffer.
#
fun info_of_ibuf (INPUT_BUFFER fields)
    =
    fields.info;
# Return the chunk_size field of the reader that backs this buffer.
#
fun chunk_size_of_ibuf buf
    =
    case (info_of_ibuf buf)
        #
        INFO { reader => pio::STREAM_READER { chunk_size, ... }, ... }
            =>
            chunk_size;
    esac;
# Return the read_vector function stored in this buffer's Info record.
#
fun read_vector (INPUT_BUFFER { info, ... } )
    =
    {   info -> INFO { read_vector => fetch, ... };
        #
        fetch;
    };
# Raise iox::IO for an input-side failure.
# 'ml_op' is reported as the failing function, the reader's name as the
# stream name, and 'cause' as the underlying exception.
#
fun input_exn (INFO { reader, ... }, ml_op, cause)
    =
    {   reader -> pio::STREAM_READER { name, ... };
        #
        raise exception iox::IO { function => ml_op, name, cause };
    };
# Result of trying to obtain the buffer after the current one:
# either there is none (EOF) or here it is (DATA).
More_Data = EOF | DATA Input_Buffer;
# Extend the stream by a chunk.
# Invariant: the more m-variable is empty on entry and full on exit.
#
# read_g  -- called with the buffer's chunk size; returns the next chunk.
# ml_op   -- operation name reported if an exception must be raised.
# buf     -- the buffer whose 'more' cell the caller has emptied.
#
# Returns EOF (after filling 'more' with NOMORE) when the chunk read is
# empty; otherwise links a new buffer after 'buf' and returns DATA of it.
# Any exception refills 'more' with NOMORE and is re-raised as iox::IO
# via input_exn.
#
fun extend_stream (read_g, ml_op, buf as INPUT_BUFFER { more, info, ... } )
=
{ info -> INFO { get_position, tail, ... };
# Sample the position before reading, so it describes the start of the new chunk.
base_position = get_position ();
chunk = read_g (chunk_size_of_ibuf buf);
if (v::length chunk == 0)
fill (more, NOMORE);
EOF;
else
new_more = make_empty_maildrop ();
buf' = INPUT_BUFFER {
base_position,
data => chunk,
more => new_more,
info
};
# Note that we do not fill the newMore cell until
# after the tail has been updated. This ensures
# that someone attempting to access the tail will
# not acquire the lock until after we are done.
#
update_maildrop (tail, new_more);
fill (more, MORE buf'); # releases lock!!
fill (new_more, NOMORE);
DATA buf';
fi;
}
except
ex = { fill (more, NOMORE);
input_exn (info, ml_op, ex);
};
# Get the next buffer in the stream,
# extending it if necessary.
# If the stream must be extended,
# we lock it by taking the value from the
# more cell; the extend_stream function
# is responsible for filling in the cell.
#
# Curried: (read_g, ml_op) are forwarded to extend_stream; the second
# argument is the current buffer.  Returns EOF or DATA next_buffer.
#
fun get_buffer (read_g, ml_op) (buf as INPUT_BUFFER { more, info, ... } )
=
get (peek more)
where
fun get TERMINATED => EOF;
get (MORE buf') => DATA buf';
get NOMORE
=>
# The peek saw NOMORE, but the cell may have changed since:
# take the value and re-examine it.  If it is still NOMORE we
# hold the cell and extend; otherwise put it back and retry.
case (empty more)
NOMORE => extend_stream (read_g, ml_op, buf);
next => { fill (more, next);
get next;
};
esac;
end;
end;
# Build a reader that fetches at least the requested number of elements.
# The request is rounded up to the next multiple of the reader's
# chunk_size; when chunk_size is 1 no rounding is needed.
#
fun read_chunk buf
    =
    {   (info_of_ibuf buf)
            ->
            INFO { read_vector => fetch, reader => pio::STREAM_READER { chunk_size => unit, ... }, ... };

        slack = unit - 1;

        if (slack == 0)
            #
            (fn wanted = fetch wanted);
        else
            # Round up to next multiple of chunk_size
            #
            (fn wanted = fetch (int::quot (wanted + slack, unit) * unit));
        fi;
    };
# Build a "read whatever is available" function from a buffer-advancing
# function.  The result maps a stream to (data, stream'):
#   - if the current buffer has unread data, return the rest of it;
#   - otherwise ask get_buf for the next buffer and retry there;
#   - at EOF return the empty vector and a stream parked at buffer end.
#
fun generalized_input get_buf
    =
    {   fun pull (INPUT_STREAM (buf, pos))
            =
            {   buf -> INPUT_BUFFER { data, ... };
                #
                size = v::length data;

                if (pos >= size)
                    #
                    case (get_buf buf)
                        #
                        DATA rest => pull (INPUT_STREAM (rest, 0));
                        EOF       => (empty_vec, INPUT_STREAM (buf, size));
                    esac;
                else
                    (vec_extract (data, pos, NULL), INPUT_STREAM (buf, size));
                fi;
            };

        pull;
    };
# Terminate an input stream
#
# Marks the stream's last 'more' cell as TERMINATED so that readers
# stop instead of extending the stream.
#
fun terminate (info as INFO { tail, clean_tag, ... } )
=
{ m = peek tail; # 'more' cell of the last buffer, per the tail field.
#
case (empty m)
#
# The cell already links to another buffer, so 'tail' was stale:
# restore the cell and retry from the (presumably updated) tail.
(m' as MORE _)
=>
{ fill (m, m');
terminate info;
};
# Already terminated: just put the marker back.
TERMINATED
=>
fill (m, TERMINATED);
# Live end of stream: unregister the shutdown cleaner, then mark terminated.
_ => { threadkit_io_cleanup_at_shutdown::remove_cleaner clean_tag;
fill (m, TERMINATED);
};
esac;
};
# Walk the chain of already-linked buffers to the last one and return
# a stream positioned at the end of that buffer's data.
#
fun find_eos last
    =
    {   last -> INPUT_BUFFER { more, data, ... };

        case (peek more)
            #
            MORE next => find_eos next;
            _         => INPUT_STREAM (last, v::length data);
        esac;
    };
# Read whatever is available from the stream, extending it by one
# reader chunk if the current buffer is exhausted.
# Returns (data, stream'); data is empty at end-of-stream.
#
fun read (stream as INPUT_STREAM (buf, _))
    =
    {   next_buffer = get_buffer (read_vector buf, "read");
        #
        generalized_input next_buffer stream;
    };
# Read a single element.
# Returns THE (element, stream') or NULL at end-of-stream / termination.
#
fun read_one (INPUT_STREAM (buf, pos))
=
{ buf -> INPUT_BUFFER { data, more, ... };
if (pos < v::length data)
# Fast path: the element is already in the current buffer.
THE (vec_get (data, pos), INPUT_STREAM (buf, pos+1));
else
# Current buffer exhausted: move to (or create) the next one.
get (peek more)
where
fun get (MORE buf) => read_one (INPUT_STREAM (buf, 0));
get TERMINATED => NULL;
get NOMORE
=>
# Same take-and-recheck protocol as get_buffer: only extend
# the stream if the cell still holds NOMORE once we own it.
case (empty more)
NOMORE
=>
case (extend_stream (read_vector buf, "read_one", buf))
DATA rest => read_one (INPUT_STREAM (rest, 0));
EOF => NULL;
esac;
next =>
{ fill (more, next);
get next;
};
esac;
end;
end;
fi;
};
# Read up to n elements, extending the stream as needed.
# Returns (data, stream'); data is shorter than n only if the stream
# terminates or hits EOF first.
#
fun read_n (INPUT_STREAM (buf, pos), n)
=
{ # Prepend one piece to the accumulated (pieces, stream) result.
fun join (item, (list, stream))
=
(item ! list, stream);
# Collect n elements starting at offset i of buf, as a list of pieces.
fun input_list (buf as INPUT_BUFFER { data, ... }, i, n)
=
{ len = v::length data;
remain = len-i;
if (remain >= n)
([vec_extract (data, i, THE n)], INPUT_STREAM (buf, i+n));
else
join ( vec_extract (data, i, NULL),
next_buf (buf, n-remain)
);
fi;
}
also
# Continue collecting n elements from the buffer after buf.
fun next_buf (buf as INPUT_BUFFER { more, data, ... }, n)
=
get (peek more)
where
fun get (MORE buf)
=>
input_list (buf, 0, n);
get TERMINATED
=>
([], INPUT_STREAM (buf, v::length data));
get NOMORE
=>
# Take-and-recheck before extending, as in get_buffer.
case (empty more)
NOMORE => (case (extend_stream (read_vector buf, "read_n", buf))
EOF => ([], INPUT_STREAM (buf, v::length data));
(DATA rest) => input_list (rest, 0, n); esac
); # end case
next => { fill (more, next);
get next;
};
esac;
end;
end;
my (data, stream)
=
input_list (buf, pos, n);
(v::cat data, stream);
};
# Read everything up to end-of-stream.
# Returns (data, stream') where stream' is positioned at the end of the
# last buffer reachable from the starting buffer (see find_eos).
#
fun read_all (stream as INPUT_STREAM (buf, _))
=
{ (info_of_ibuf buf)
->
INFO { reader => pio::STREAM_READER { avail, ... }, ... };
# Read a chunk that is as large as the available input.
# Note that for systems that use CR-LF for '\n', the
# size will be too large, but this should be okay.
#
# The argument (the chunk size passed by extend_stream) is ignored;
# the request size comes from avail(), falling back to the chunk size.
fun big_chunk _
=
read_chunk buf delta
where
delta = case (avail())
#
NULL => chunk_size_of_ibuf buf;
THE n => n;
esac;
end;
big_input
=
generalized_input
(get_buffer (big_chunk, "read_all"));
# Keep reading until a read returns the empty vector.
fun loop (v, stream)
=
if (v::length v == 0) [];
else v ! loop (big_input stream);
fi;
data = v::cat (loop (big_input stream));
(data, find_eos buf);
};
# Placeholder definitions: each raises FAIL when called.
# The same four names are rebound to do_input-based implementations
# further down in this package (see the "IO mailop constructors" section).
fun input1evt _ = raise exception FAIL "input1Evt unimplemented";
fun input_mailop _ = raise exception FAIL "inputEvt unimplemented";
fun input_nevt _ = raise exception FAIL "inputNEvt unimplemented";
fun input_all_mailop _ = raise exception FAIL "inputAllEvt unimplemented";
# Return THE k, if k <= amount
# characters can be read without blocking.
#
# Raises SIZE when amount is negative.
# NOTE(review): the code that would extend the stream without blocking
# is commented out below, so only data already held in linked buffers
# is counted; NOMORE is treated the same as TERMINATED.
#
fun can_read (stream as INPUT_STREAM (buf, pos), amount)
=
if (amount < 0)
raise exception SIZE;
else
try_input (buf, pos, amount)
where
/******
readVecNB = (case buf
of (INPUT_BUFFER { info as INFO { readVecNB=NULL, ... }, ... } ) =>
inputExn (info, "can_read", iox::NONBLOCKING_IO_NOT_SUPPORTED)
| (INPUT_BUFFER { info=INFO { readVecNB=THE f, ... }, ... } ) => f
) # end case
******/
# n is the number of elements still wanted; i is the offset in buf.
fun try_input (buf as INPUT_BUFFER { data, ... }, i, n)
=
{ len = v::length data;
remain = len - i;
remain >= n ?? THE n
:: next_buf (buf, n - remain);
}
also
fun next_buf (INPUT_BUFFER { more, ... }, n)
=
get (peek more)
where
fun get (MORE buf) => try_input (buf, 0, n);
# No further buffered data: report how much of 'amount' was found.
get TERMINATED => THE (amount - n);
get NOMORE => THE (amount - n);
end;
/******
| get NOMORE = (case md::mTake more
of NOMORE => ((
case extendStream (readVecNB, "can_read", buf)
of EOF => THE (amount - n)
| (DATA b) => tryInput (b, 0, n)
) # end case
except iox::IO { cause=WOULD_BLOCK, ... } => THE (amount - n))
| next => (md::mPut (more, next); get next)
) # end case
******/
end;
end;
fi;
# Close an input stream given its info package.
# We need this function for the cleanup hook
# to avoid a space leak.
#
# Does nothing if the stream is already marked closed.  Otherwise it
# terminates the stream, sets is_closed, and calls the reader's close;
# an exception from close is re-raised as iox::IO via input_exn.
#
fun close_in_info (INFO { is_closed=>REF TRUE, ... } )
=>
();
close_in_info (info as INFO { is_closed, reader => pio::STREAM_READER { close, ... }, ... } )
=>
{
# ** We need some kind of lock on the input stream to do this safely!!! ** XXX BUGGO FIXME
terminate info;
is_closed := TRUE;
close()
except
ex = input_exn (info, "close_input", ex);
};
end;
# Close the stream that this buffer belongs to.
#
fun close_input (INPUT_STREAM (buf, _))
    =
    {   info = info_of_ibuf buf;
        #
        close_in_info info;
    };
# Test whether the stream is at its end.
# May extend the stream by one chunk in order to find out.
#
fun end_of_stream (INPUT_STREAM (buf as INPUT_BUFFER { more, ... }, pos))
=
# Take the 'more' cell; every path below must refill it, either
# directly or through extend_stream.
case (empty more)
# A following buffer exists, so this is not the end.
next as MORE _
=>
{ fill (more, next);
FALSE;
};
next
=>
{ buf -> INPUT_BUFFER { data, info=>INFO { is_closed, ... }, ... };
if (pos == v::length data)
#
case (next, *is_closed)
#
# Open stream with no next buffer yet: try to read more.
# extend_stream refills 'more' for us.
(NOMORE, FALSE)
=>
case (extend_stream (read_vector buf, "end_of_stream", buf))
#
EOF => TRUE;
_ => FALSE;
esac;
# Terminated or closed: at end.
_ =>
{ fill (more, next);
TRUE;
};
esac;
else
# Unread data remains in the current buffer.
fill (more, next);
FALSE;
fi;
};
esac;
# Build a functional input stream on top of 'reader', with 'data' as
# the contents of its first buffer.  Also registers a shutdown cleaner
# that closes the stream.
#
fun make_instream (reader, data)
=
{ reader -> pio::STREAM_READER { read_vector, read_vec_mailop, get_position, set_position, ... };
#
# Positions are only reported when the reader supplies both
# get_position and set_position; otherwise always NULL.
get_position
=
case (get_position, set_position)
#
(THE f, THE _)
=>
(fn () = THE (f ()));
_ =>
(fn () = NULL);
esac;
more = make_full_maildrop NOMORE;
# Register a placeholder now; the real cleaner needs 'info', which
# in turn needs the tag, so it is bound after 'info' is built.
tag = threadkit_io_cleanup_at_shutdown::add_cleaner dummy_cleaner;
info = INFO { reader,
read_vector,
read_vec_mailop,
get_position,
#
is_closed => REF FALSE,
tail => make_full_maildrop more,
clean_tag => tag
};
# * What should we do about the position in this case ?? *
# Suggestion: When building a stream with supplied initial data,
# nothing can be said about the positions inside that initial
# data (who knows where that data even came from!).
base_position
=
if (v::length data == 0) get_position ();
else NULL;
fi;
buf = INPUT_BUFFER {
base_position, data,
info, more
};
stream = INPUT_STREAM (buf, 0);
threadkit_io_cleanup_at_shutdown::rebind_cleaner
( tag,
fn () = close_in_info info
);
stream;
};
# Terminate the stream and hand back its underlying reader together
# with all data that was buffered but not yet consumed.
# Returns (reader, unread_data).
#
fun get_reader (INPUT_STREAM (buf, pos))
=
{ buf -> INPUT_BUFFER { data, info as INFO { reader, ... }, more, ... };
#
# Collect the data vectors of every buffer linked after this cell.
fun get_data more
=
case (peek more)
#
MORE (INPUT_BUFFER { data, more=>more', ... } )
=>
data ! get_data more';
_ => [];
esac;
terminate info;
if (pos < v::length data)
# Include the unread remainder of the current buffer.
( reader,
v::cat (vec_extract (data, pos, NULL) ! get_data more)
);
else
( reader,
v::cat (get_data more)
);
fi;
};
/*
# * Position operations on instreams *
enum in_pos = INP of {
base: pos,
offset: Int,
info: info
}
*/
/*
fun getPosIn (INPUT_STREAM (buf, pos)) = (case buf
of INPUT_BUFFER { basePos=NULL, info, ... } =>
inputExn (info, "getPosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED)
| INPUT_BUFFER { basePos=THE p, info, ... } => INP {
base = p, offset = pos, info = info
}
) # end case
*/
/*
fun filePosIn (INP { base, offset, ... } ) =
position.+(base, file_position::from_int offset)
*/
# Return the file position corresponding to the stream's current
# offset: the buffer's base position plus the offset within the buffer.
# Raises iox::IO with cause RANDOM_ACCESS_IO_NOT_SUPPORTED when the
# buffer has no recorded base position.
#
fun file_position_in (INPUT_STREAM (INPUT_BUFFER { base_position, info, ... }, pos))
    =
    case base_position
        #
        THE b
            =>
            file_position::(+) (b, file_position::from_int pos);

        NULL
            =>
            input_exn (info, "filePosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
    esac;
/*
fun setPosIn (pos as INP { info as INFO { reader, ... }, ... } ) = let
fpos = filePosIn pos
my (PIO::READER rd) = reader
in
terminate info;
the rd.setPos fpos;
make_instream (PIO::READER rd, empty_vec)
end
*/
# IO mailop constructors:
# We exploit the "functional" nature of stream IO
# to implement the mailop constructors. These
# constructors spawn a thread to do the operation
# and write the result into a one-shot maildrop that
# serves as the synchronization value.
#
# NOTE: This implementation has the weakness that
# it prevents shutdown when everything else is
# deadlocked, since the thread that is spawned
# to actually do the IO could proceed.
stipulate
# Outcome of the worker thread: a value, or the exception it raised.
Result X = RES X | EXCEPTION Exception;
# Turn a plain input function into a mailop-returning one.
fun do_input input_op
=
{ # Run the operation, capturing any exception as a value so it can
# be carried across to the thread that syncs on the mailop.
fun read arg
=
RES (input_op arg)
except
ex = EXCEPTION ex;
fn arg
=
guard
.{ reply_1shot = make_oneshot_maildrop ();
make_thread "binary I/O" .{
set (reply_1shot, read arg);
};
# Unwrap the result; a captured exception is re-raised here.
get' reply_1shot
==>
fn (RES x) => x;
(EXCEPTION ex) => raise exception ex;
end;
};
};
herein
# These rebind the placeholder definitions given earlier in this package.
input1evt = do_input read_one;
input_mailop = do_input read;
input_nevt = do_input read_n;
input_all_mailop = do_input read_all;
end; # stipulate
# ** Output streams **
# An output stream is implemented
# as a monitor using an mvar to
# hold its data.
#
# Every output operation below takes the Ostrm_Info out of the
# maildrop for the duration of the operation and puts it back when done.
Ostrm_Info
=
OSTRM_INFO
{
buffer: a::Rw_Vector,
first_free_byte_in_buffer: Ref( Int ), # Count of buffered elements; also the index where the next one goes.
is_closed: Ref( Bool ),
buffering_mode: Ref( iox::Buffering_Mode ),
writer: Stream_Writer,
write_rw_vector: rs::Slice -> Void, # Writes the whole slice (built by make_outstream).
write_vector: vs::Slice -> Void, # Writes the whole slice (built by make_outstream).
clean_tag: threadkit_io_cleanup_at_shutdown::Tag
};
Output_Stream
=
Maildrop( Ostrm_Info );
# Raise iox::IO for an output-side failure.
# 'ml_op' is reported as the failing function, the writer's name as the
# stream name, and 'cause' as the underlying exception.
#
fun raise_io_exception (OSTRM_INFO { writer, ... }, ml_op, cause)
    =
    {   writer -> pio::STREAM_WRITER { name, ... };
        #
        raise exception iox::IO { function => ml_op, name, cause };
    };
# Lock access to the stream and
# make sure that it is not closed.
#
# Locking means taking the Ostrm_Info out of the maildrop; the caller
# owns it until it fills the maildrop again.  If the stream is closed
# the maildrop is refilled here and iox::IO (CLOSED_IO_STREAM) is raised.
#
fun lock_and_check_closed_out (strm_mv, ml_op)
    =
    {   stream = empty strm_mv;
        #
        stream -> OSTRM_INFO { is_closed, ... };

        if (*is_closed)
            #
            fill (strm_mv, stream);
            raise_io_exception (stream, ml_op, iox::CLOSED_IO_STREAM);
        else
            stream;
        fi;
    };
# Write out any buffered elements and reset the buffer to empty.
# Must be called with the stream locked (maildrop emptied).  On a write
# failure the maildrop is refilled -- i.e. the lock is released -- before
# the error is re-raised as iox::IO tagged with ml_op.
#
fun flush_buffer (strm_mv, stream as OSTRM_INFO { buffer, first_free_byte_in_buffer, write_rw_vector, ... }, ml_op)
=
case *first_free_byte_in_buffer
#
0 => ();
n => { write_rw_vector (rs::make_slice (buffer, 0, THE n));
#
first_free_byte_in_buffer := 0;
}
except
ex = { fill (strm_mv, stream);
#
raise_io_exception (stream, ml_op, ex);
};
esac;
# Write the vector v to the output stream.
# Raises iox::IO if the stream is closed or the underlying write fails;
# on failure the stream lock is released before raising.
#
fun write (strm_mv, v)
=
{ my (stream as OSTRM_INFO os)
=
lock_and_check_closed_out (strm_mv, "write");
# Unlock the stream by putting its info back in the maildrop.
fun release ()
=
fill (strm_mv, stream);
os -> { buffer, first_free_byte_in_buffer, buffering_mode, ... };
# Write out the buffered elements (if any) and reset the buffer.
fun flush ()
=
flush_buffer (strm_mv, stream, "write");
# Write out the entire buffer; used when it has just been filled to capacity.
fun flush_all ()
=
os.write_rw_vector (rs::make_full_slice buffer)
except
ex = { release();
#
raise_io_exception (stream, "write", ex);
};
# Bypass the buffer: first write out anything pending, then write v itself.
fun write_direct ()
=
{ case *first_free_byte_in_buffer
#
0 => ();
#
n => { os.write_rw_vector (rs::make_slice (buffer, 0, THE n));
#
first_free_byte_in_buffer := 0;
};
esac;
os.write_vector (vs::make_full_slice v);
}
except
ex = { release();
#
raise_io_exception (stream, "write", ex);
};
# Buffered write.  copy_vec (from, from_i, from_len, to, to_i) copies
# a run of v into the buffer.
fun insert copy_vec
=
{ buf_len = a::length buffer;
data_len = v::length v;
if (data_len >= buf_len)
#
# v is at least as large as the buffer: buffering gains nothing.
write_direct ();
else
i = *first_free_byte_in_buffer;
avail = buf_len - i;
if (avail < data_len)
# v does not fit in the free space: top up the buffer, write
# it all out, then start a fresh buffer with the remainder.
copy_vec (v, 0, avail, buffer, i);
flush_all ();
copy_vec (v, avail, data_len-avail, buffer, 0);
first_free_byte_in_buffer := data_len-avail;
else
copy_vec (v, 0, data_len, buffer, i);
first_free_byte_in_buffer := i + data_len;
# Buffer became exactly full: write it out now.
if (avail == data_len) flush (); fi;
fi;
fi;
};
case *buffering_mode
#
iox::NO_BUFFERING
=>
write_direct ();
_ =>
insert copy_vec
where
fun copy_vec (from, from_i, from_len, to, to_i)
=
rs::copy_vec
{ from => vs::make_slice (from, from_i, THE from_len),
to,
di => to_i
};
end;
esac;
release ();
};
# Write a single element to the output stream.
# Raises iox::IO if the stream is closed or the underlying write fails.
#
fun write_one (strm_mv, element)
=
# The declarations in the 'where' part do the work; the final
# release () then unlocks the stream.
release ()
where
my (stream as OSTRM_INFO { buffer, first_free_byte_in_buffer, buffering_mode, write_rw_vector, ... } )
=
lock_and_check_closed_out (strm_mv, "write_one");
fun release ()
=
fill (strm_mv, stream);
case *buffering_mode
#
iox::NO_BUFFERING
=>
# Unbuffered: use slot 0 of the buffer as a one-element staging
# area and write it out immediately.
{ rw_vec_set (buffer, 0, element);
write_rw_vector (rs::make_slice (buffer, 0, THE 1))
except
ex = { release();
raise_io_exception (stream, "write_one", ex);
};
};
_ =>
# Buffered: append, and write the buffer out once it is full.
{ i = *first_free_byte_in_buffer;
#
i' = i+1;
rw_vec_set (buffer, i, element);
first_free_byte_in_buffer := i';
if (i' == a::length buffer)
#
flush_buffer (strm_mv, stream, "write_one");
fi;
};
esac;
end;
# Write out any buffered output.
# Raises iox::IO if the stream is closed or the write fails.
#
fun flush strm_mv
    =
    {   locked = lock_and_check_closed_out (strm_mv, "flush");

        flush_buffer (strm_mv, locked, "flush");

        # Unlock the stream.
        #
        fill (strm_mv, locked);
    };
# Close an output stream.  Closing an already-closed stream is a no-op.
# Otherwise: write out buffered output, mark the stream closed,
# unregister its shutdown cleaner and call the writer's close.
#
fun close_output strm_mv
=
{ # Lock the stream directly rather than via lock_and_check_closed_out,
# since a closed stream is not an error here.
(empty strm_mv)
->
(stream as OSTRM_INFO { writer => pio::STREAM_WRITER { close, ... }, is_closed, clean_tag, ... } );
if (not *is_closed)
#
flush_buffer (strm_mv, stream, "close");
is_closed := TRUE;
threadkit_io_cleanup_at_shutdown::remove_cleaner clean_tag;
close();
fi;
fill (strm_mv, stream);
};
# Build an output stream on top of writer 'wr' with the given initial
# buffering mode.  The buffer is chunk_size elements long.  Also
# registers a shutdown cleaner that closes the stream.
#
fun make_outstream (wr as pio::STREAM_WRITER { chunk_size, write_rw_vector, write_vector, ... }, mode)
=
stream
where
# Turn a writer function f, which returns a count n used to advance
# the slice, into one that loops until the whole slice is consumed.
fun iterate (f, size, subslice)
=
lp
where
fun lp sl
=
if (size sl != 0)
#
n = f sl;
lp (subslice (sl, n, NULL));
fi;
end;
write_rw_vector' = iterate (write_rw_vector, rs::length, rs::make_subslice);
write_vector' = iterate (write_vector, vs::length, vs::make_subslice);
# Install a dummy cleaner:
#
tag = threadkit_io_cleanup_at_shutdown::add_cleaner dummy_cleaner;
stream
=
make_full_maildrop (
OSTRM_INFO
{
buffer => a::make_rw_vector (chunk_size, some_element),
first_free_byte_in_buffer => REF 0,
is_closed => REF FALSE,
buffering_mode => REF mode,
writer => wr,
write_rw_vector => write_rw_vector',
write_vector => write_vector',
clean_tag => tag
}
);
# Now that the stream exists, replace the dummy cleaner with the real one.
threadkit_io_cleanup_at_shutdown::rebind_cleaner
(tag, fn () = close_output stream);
end;
# Return the stream's underlying writer and its current buffering mode.
# Raises iox::IO if the stream is closed.
#
fun get_writer strm_mv
    =
    {   stream = lock_and_check_closed_out (strm_mv, "getWriter");
        #
        stream -> OSTRM_INFO { writer, buffering_mode, ... };

        result = (writer, *buffering_mode);

        # Unlock the stream before returning.
        #
        fill (strm_mv, stream);

        result;
    };
# Position operations on outstreams:
#
# An output position remembers both the file position and the
# stream it was obtained from.
Out_Position
=
OUT_POSITION
{
pos: pio::File_Position,
stream: Output_Stream
};
# Return the current position of the output stream.
#
# Buffered output is written out first so that the writer's position
# reflects everything written so far.
#
# Raises iox::IO if the stream is closed, if flushing fails, if the
# writer's get_position fails, or (with RANDOM_ACCESS_IO_NOT_SUPPORTED)
# if the writer has no get_position.
#
# The stream lock is released on every path.  Previously the trailing
# 'before release ()' belonged to the fallback arm only -- which raises
# before reaching it -- so a successful call left the stream's maildrop
# empty and every later operation on the stream blocked.
#
fun get_output_position strm_mv
    =
    {   my (stream as OSTRM_INFO { writer, ... } )
            =
            lock_and_check_closed_out (strm_mv, "get_output_position");

        fun release ()
            =
            fill (strm_mv, stream);

        # flush_buffer releases the lock itself if it fails.
        #
        flush_buffer (strm_mv, stream, "get_output_position");

        case writer
            #
            pio::STREAM_WRITER { get_position => THE f, ... }
                =>
                {   position
                        =
                        f ()
                        except
                            ex = {   release ();
                                     raise_io_exception (stream, "get_output_position", ex);
                                 };

                    release ();

                    OUT_POSITION { pos => position, stream => strm_mv };
                };

            _   =>
                {   release ();
                    raise_io_exception (stream, "get_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                };
        esac;
    };
# Extract the file position from an output position.
# The stream is locked and immediately unlocked, which has the effect
# of raising iox::IO if the stream has been closed.
#
fun file_pos_out (OUT_POSITION { pos, stream => strm_mv } )
    =
    {   stream = lock_and_check_closed_out (strm_mv, "file_pos_out");
        #
        fill (strm_mv, stream);

        pos;
    };
# Move the output stream to a previously obtained position.
#
# Raises iox::IO if the stream is closed, if the writer's set_position
# fails, or (with RANDOM_ACCESS_IO_NOT_SUPPORTED) if the writer has no
# set_position.  The stream lock is released before any exception is
# raised.
#
# All errors are reported under the name "set_output_position"; the
# unsupported case used to be reported as "get_output_position".
#
fun set_output_position (OUT_POSITION { pos, stream=>strm_mv } )
    =
    {   my (stream as OSTRM_INFO { writer, ... } )
            =
            lock_and_check_closed_out (strm_mv, "set_output_position");

        fun release ()
            =
            fill (strm_mv, stream);

        case writer
            #
            pio::STREAM_WRITER { set_position=>THE f, ... }
                =>
                f pos
                except
                    ex = {   release ();
                             raise_io_exception (stream, "set_output_position", ex);
                         };

            _   =>
                {   release ();
                    raise_io_exception (stream, "set_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                };
        esac;

        release ();
    };
# Change the stream's buffering mode.
# Raises iox::IO if the stream is closed or a required flush fails.
#
fun set_buffering_mode (strm_mv, mode)
    =
    {   my (stream as OSTRM_INFO { buffering_mode, ... } )
            =
            lock_and_check_closed_out (strm_mv, "setBufferMode");

        # Going unbuffered: anything still sitting in the buffer
        # has to be written out first.
        #
        if (mode == iox::NO_BUFFERING)
            #
            flush_buffer (strm_mv, stream, "setBufferMode");
        fi;

        buffering_mode := mode;

        fill (strm_mv, stream);
    };
# Return the stream's current buffering mode.
# Raises iox::IO if the stream is closed.
#
fun get_buffering_mode strm_mv
    =
    {   # Should we be checking for closed streams here??? XXX BUGGO FIXME
        #
        my (stream as OSTRM_INFO { buffering_mode, ... } )
            =
            lock_and_check_closed_out (strm_mv, "getBufferMode");

        mode = *buffering_mode;

        fill (strm_mv, stream);

        mode;
    };
}; # package pure_io
# Types of the imperative layer.  An imperative stream is a maildrop
# holding the current pure_io stream value; the input operations below
# take the value out, advance it, and put the successor back.
Vector = v::Vector;
Element = v::Element;
Input_Stream = Maildrop( pure_io::Input_Stream );
Output_Stream = Maildrop( pure_io::Output_Stream );
# Input operations:

# Read whatever is available and advance the stream past it.
#
fun read stream
    =
    {   (pure_io::read (empty stream)) -> (chunk, rest);
        #
        fill (stream, rest);

        chunk;
    };
# Read one element and advance the stream past it.
# Returns THE element, or NULL at end-of-stream.
#
# The stream value is taken out of the maildrop for the duration of the
# read and must be put back on every path.  Previously the NULL case
# returned without refilling, leaving the maildrop empty so that any
# later operation on the stream blocked.  At end-of-stream there is no
# successor stream value, so the original one is restored.
#
fun read_one stream
    =
    {   current = empty stream;

        case (pure_io::read_one current)
            #
            THE (element, stream')
                =>
                {   fill (stream, stream');
                    THE element;
                };

            NULL
                =>
                {   fill (stream, current);
                    NULL;
                };
        esac;
    };
# Read up to n elements and advance the stream past them.
#
fun read_n (stream, n)
    =
    {   (pure_io::read_n (empty stream, n)) -> (chunk, rest);
        #
        fill (stream, rest);

        chunk;
    };
# Read everything up to end-of-stream and leave the stream at its end.
#
fun read_all (stream: Input_Stream)
    =
    {   (pure_io::read_all (empty stream)) -> (everything, rest);
        #
        fill (stream, rest);

        everything;
    };
# Mailop-valued input operations are not implemented at this level:
# each raises FAIL naming the function that was called.
# (The last message used to read "input_ell_mailop".)
#
fun input1evt        _ = raise exception FAIL "input1evt unimplemented";
fun input_mailop     _ = raise exception FAIL "input_mailop unimplemented";
fun input_nevt       _ = raise exception FAIL "input_nevt unimplemented";
fun input_all_mailop _ = raise exception FAIL "input_all_mailop unimplemented";
# Report how many of n elements can be read from the stream's current
# value; the stream itself is only inspected, not advanced.
#
fun can_read (stream, n)
    =
    {   current = peek stream;
        #
        pure_io::can_read (current, n);
    };
# Return the next element without advancing the stream,
# or NULL at end-of-stream.
#
fun lookahead (stream: Input_Stream)
    =
    {   snapshot = peek stream;

        case (pure_io::read_one snapshot)
            #
            NULL             => NULL;
            THE (element, _) => THE element;
        esac;
    };
# Close the input stream and leave the maildrop holding a stream
# value positioned at the end of the buffered data.
#
fun close_input stream
    =
    {   current = empty stream;
        #
        current -> pure_io::INPUT_STREAM (buf, _);

        pure_io::close_input current;

        fill (stream, pure_io::find_eos buf);
    };
# Test whether the stream's current value is at end-of-stream.
#
fun end_of_stream stream
    =
    {   current = peek stream;
        #
        pure_io::end_of_stream current;
    };
/*
fun getPosIn stream = pure_io::getPosIn (mGet stream)
fun setPosIn (stream, p) = mUpdate (stream, pure_io::setPosIn p)
*/
# Output operations:
#
# Each of these reads the current pure_io output stream out of the
# outer maildrop with peek (leaving it in place) and delegates to the
# pure_io operation of the same name, which does its own locking on
# the inner stream.
fun write (stream, v) = pure_io::write (peek stream, v);
fun write_one (stream, c) = pure_io::write_one (peek stream, c);
fun flush stream = pure_io::flush (peek stream);
fun close_output stream = pure_io::close_output (peek stream);
fun get_output_position stream = pure_io::get_output_position (peek stream);
# Redirect the imperative stream to the pure_io stream recorded in
# position p, then move that stream to the recorded position.
#
fun set_output_position (stream, p)
    =
    {   p -> pure_io::OUT_POSITION { stream => target, ... };
        #
        update_maildrop (stream, target);

        pure_io::set_output_position p;
    };
# Conversions between imperative streams and the pure_io stream values
# they hold: make_* wraps a value in a fresh full maildrop, get_* reads
# the current value without removing it, set_* replaces the value.
fun make_instream (stream: pure_io::Input_Stream) = make_full_maildrop stream;
fun get_instream (stream: Input_Stream) = peek stream;
fun set_instream (stream: Input_Stream, stream') = update_maildrop (stream, stream');
fun make_outstream (stream: pure_io::Output_Stream) = make_full_maildrop stream;
fun get_outstream (stream: Output_Stream) = peek stream;
fun set_outstream (stream: Output_Stream, stream') = update_maildrop (stream, stream');
# Open files

# Open the named file for reading.
# Any failure is re-raised as iox::IO naming this function and the file.
#
fun open_for_read fname
    =
    {   reader = threadkit_winix_stream_readers_and_writers::open_for_read fname;
        #
        make_instream (pure_io::make_instream (reader, empty_vec));
    }
    except
        ex = raise exception iox::IO { function=>"open_for_read", name=>fname, cause=>ex };
# Open the named file for writing, with block buffering.
# Any failure is re-raised as iox::IO naming this function and the file.
# The reported function name is "open_for_write", matching how
# open_for_read and open_for_append report themselves (it used to be
# the bare "open").
#
fun open_for_write fname
    =
    make_outstream (pure_io::make_outstream (threadkit_winix_stream_readers_and_writers::open_for_write fname, iox::BLOCK_BUFFERING))
    except
        ex = raise exception iox::IO { function=>"open_for_write", name=>fname, cause=>ex };
# Open the named file for appending, with no buffering.
# Any failure is re-raised as iox::IO naming this function and the file.
#
fun open_for_append fname
    =
    {   writer = threadkit_winix_stream_readers_and_writers::open_for_append fname;
        #
        make_outstream (pure_io::make_outstream (writer, iox::NO_BUFFERING));
    }
    except
        ex = raise exception iox::IO { function=>"open_for_append", name=>fname, cause=>ex };
}; # binary_io_g
end;
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2011,
## released under Gnu Public Licence version 3.


