includesource "tcp.ac" includesource "local_channel.ac" (* ******************************************************************* *) (* ** ** *) (* ** Distributed_channel ** *) (* ** ** *) (* ******************************************************************* *) (* Distributed_channel provides simple typed asynchronous distributed channels, above Tcp_string_messaging and Local_channel. Function init : Tcp.ip option * Tcp.port option -> unit initialises a Tcp_string_messaging daemon with the specified port and IP address. Function send : forall t. string -> (Tcp.addr * t name) -> t -> unit sends a message (marshalled wrt the mark specified) to the specified channel at the specified TCP address, returning immediately. It does a case split depending on whether the target is local or not, for efficiency. Function recv : forall t. t name -> (t -> unit) -> unit registers a receiver on the specified name, returning immediately. Function local_addr : unit -> Tcp.ip option * Tcp.port option returns the registered local address. As soon as there is both a message and a receiver for a name the receiver is applied to the message. The receiver is then removed. These are _non-mobile_ distributed channels: the receivers cannot be moved from one Tcp.addr to another. See npi.ac for a mobile extension. Similarly to Local_channel, this is a hash! module. The module state consists of an ho field, recording the Tcp_string_messaging handle in use. To allow client code to determine the local TCP address this is set by the init function (it is stored as an option reference and can be set at most once). Use of the hash! mode gives the module an exact-hash version. Use of module state (rather than explicitly-passed handles) ensures the right semantics when marshalling client code. Internally, the wire format consists of marshalled values of type exists t'.t' name * t' marshalled with respect to whatever mark is supplied to the send function. This mark should usually be at or below the mark "DChan" just below the module, so that the Distributed_channel code itself is not marshalled. *) (* NB: fields marked by (*A*) will be removed from the interface *) (* when width subsignaturing is added *) module hash! Distributed_channel : sig type tf (*A*) type tho (*A*) val f : tf (*A*) val ho : tho (*A*) val init : Tcp.ip option * Tcp.port option -> unit val send : forall t. string -> (Tcp.addr * t name) -> t -> unit val recv : forall t. t name -> (t -> unit) -> unit val local_addr : unit -> Tcp.ip option * Tcp.port end = struct type tf = (Tcp.ip option * Tcp.port) -> Tcp.addr -> string -> unit type tho = Tcp_string_messaging.handle option ref let f ipop_local addr_remote data = let {t,x} = unmarshal data as exists t'. t' name * t' in let (c,v) = x in Pervasives.prerr_endline("Got v: " ^ (marshal "StdLib" (v) : t)); Local_channel.send %[t] c v let ho = ref None let init (ipo,po) = match !ho with Some _ -> raise (Failure "Distributed_channel already initialised") | None -> ho := Some (Tcp_string_messaging.daemon (ipo,po) f) let send = Function t -> fun mk -> fun (addr,(c: t name)) (v: t) -> let h = Utils.the %[] !ho in let (ip, port) = addr in if (Some ip, port) = Tcp_string_messaging.local_addr h then Local_channel.send %[t] c v else (Pervasives.prerr_endline("marshalling"); let data = marshal mk ({t, (c,v)} as exists t'.t' name * t') : exists t'.t' name * t' in ( Pervasives.prerr_endline("sending " ^ data); Tcp_string_messaging.send h addr data)) let recv = Function t -> fun (c: t name) (f: t -> unit) -> Local_channel.recv %[t] c f let local_addr () = Tcp_string_messaging.local_addr (Utils.the %[] !ho) end mark "DChan" (* TODO: Extend with replicated input and with blocking receive *) (* Note that with this code the local-send optimisation will only be effective if the local daemon IP was set explicitly, not wildcarded. To deal properly with hosts with multiple interfaces one should check against getifaddrs. *)