Module Rpc.Connection

include module type of struct include Async_rpc_kernel.Rpc.Connection end
type t
val sexp_of_t : t -> Ppx_sexp_conv_lib.Sexp.t
module Heartbeat_config : sig ... end
module Client_implementations : sig ... end
val create : ?implementations:'s Async_rpc_kernel__Implementations.t -> connection_state:(t -> 's) -> ?handshake_timeout:Core_kernel.Time_ns.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core_kernel.Info.t -> ?time_source:Async_kernel.Synchronous_time_source.t -> Async_rpc_kernel__Transport.t -> (t, Core_kernel.Exn.t) Core_kernel.Result.t Async_kernel.Deferred.t

Initiate an Rpc connection on the given transport. implementations should be the bag of implementations that the calling side implements; it defaults to Implementations.null (i.e., "I implement no RPCs").

connection_state will be called once, before create's result is determined, on the same connection that create returns. Its output will be provided to the implementations when queries arrive.

WARNING: If specifying a custom heartbeat_config, make sure that both ends of the Rpc connection use compatible settings for timeout and send frequency. Otherwise, your Rpc connections might close unexpectedly.

description can be used to give some extra information about the connection, which will then show up in error messages and the connection's sexp. If you have lots of connections in your program, this can be useful for distinguishing them.

time_source specifies the time source on which heartbeat events are scheduled. It defaults to wall-clock time.

val contains_magic_prefix : bool Bin_prot.Type_class.reader

As of Feb 2017, the RPC protocol started to contain a magic number so that one can identify RPC communication. The bool returned by contains_magic_prefix says whether this magic number was observed.

val description : t -> Core_kernel.Info.t
val add_heartbeat_callback : t -> (unit -> unit) -> unit

After add_heartbeat_callback t f, f () will be called on every subsequent heartbeat to t.
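As a rough illustration of the callback, the sketch below records when the last heartbeat arrived so that a caller can ask how long the peer has been silent. The helper name track_last_heartbeat is hypothetical and not part of this module.

```ocaml
open Core
open Async

(* Hypothetical helper: returns a function reporting the time elapsed
   since the most recent heartbeat observed on [connection]. *)
let track_last_heartbeat connection =
  let last = ref (Time_ns.now ()) in
  (* The callback runs on every subsequent heartbeat. *)
  Rpc.Connection.add_heartbeat_callback connection (fun () ->
    last := Time_ns.now ());
  fun () -> Time_ns.diff (Time_ns.now ()) !last
```

The callback itself should stay cheap, since it runs on every heartbeat.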

val close : ?⁠streaming_responses_flush_timeout:Core_kernel.Time_ns.Span.t -> ?⁠reason:Core_kernel.Info.t -> t -> unit Async_kernel.Deferred.t

close starts closing the connection's transport, and returns a deferred that becomes determined when its close completes. It is ok to call close multiple times on the same t; calls subsequent to the initial call will have no effect, but will return the same deferred as the original call.

Before closing the underlying transport's writer, close waits for all streaming responses to be Pipe.upstream_flushed with a timeout of streaming_responses_flush_timeout.

The reason for closing the connection will be passed to callers of close_reason.

val close_finished : t -> unit Async_kernel.Deferred.t

close_finished becomes determined after the close of the connection's transport completes; it is the same deferred that close returns. close_finished differs from close in that it does not have the side effect of initiating a close.

val close_reason : t -> on_close:[ `started | `finished ] -> Core_kernel.Info.t Async_kernel.Deferred.t

close_reason ~on_close t becomes determined when close starts or finishes based on on_close, but additionally returns the reason that the connection was closed.

val is_closed : t -> bool

is_closed t returns true iff close t has been called. close may be called internally upon errors or timeouts.
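The closing functions above might be combined as in the following sketch. The function names and the reason string are made up for illustration; only close, close_reason and is_closed come from this module.

```ocaml
open Core
open Async

(* Report why a connection went away, whichever side initiated the close.
   This only observes the close; it does not start one. *)
let log_close_reason connection =
  don't_wait_for
    (Rpc.Connection.close_reason connection ~on_close:`finished
     >>| fun reason ->
     eprintf "connection closed: %s\n" (Info.to_string_hum reason))

(* Close with an explicit reason. Calling this again has no further
   effect and returns the same deferred as the first call. *)
let shut_down connection =
  if Rpc.Connection.is_closed connection
  then Rpc.Connection.close_finished connection
  else
    Rpc.Connection.close
      connection
      ~reason:(Info.of_string "client shutting down")
```

The is_closed check is not strictly needed, since repeated calls to close are harmless; it is shown only to illustrate the difference between close and close_finished.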

val bytes_to_write : t -> int

bytes_to_write and flushed just call the similarly named functions on the Transport.Writer.t within a connection.

val flushed : t -> unit Async_kernel.Deferred.t
val with_close : ?⁠implementations:'s Async_rpc_kernel__Implementations.t -> ?⁠handshake_timeout:Core_kernel.Time_ns.Span.t -> ?⁠heartbeat_config:Heartbeat_config.t -> connection_state:(t -> 's) -> Async_rpc_kernel__Transport.t -> dispatch_queries:(t -> 'a Async_kernel.Deferred.t) -> on_handshake_error:[ `Raise | `Call of Core_kernel.Exn.t -> 'a Async_kernel.Deferred.t ] -> 'a Async_kernel.Deferred.t

with_close tries to create a t using the given transport. If a handshake error is the result, it calls on_handshake_error, for which the default behavior is to raise an exception. If no error results, dispatch_queries is called on t.

After dispatch_queries returns, if server is None, the t will be closed and the deferred returned by dispatch_queries will be determined immediately. Otherwise, we'll wait until the other side closes the connection and then close t and determine the deferred returned by dispatch_queries.

When the deferred returned by with_close becomes determined, Transport.close has finished.

NOTE: Because this connection is closed when the Deferred.t returned by dispatch_queries is determined, you should be careful when using this with Pipe_rpc. For example, simply returning the pipe when you get it will close the pipe immediately. You should instead either use the pipe inside dispatch_queries and not determine its result until you are done with the pipe, or use a different function like create.

val server_with_close : ?⁠handshake_timeout:Core_kernel.Time_ns.Span.t -> ?⁠heartbeat_config:Heartbeat_config.t -> Async_rpc_kernel__Transport.t -> implementations:'s Async_rpc_kernel__Implementations.t -> connection_state:(t -> 's) -> on_handshake_error:[ `Raise | `Ignore | `Call of Core_kernel.Exn.t -> unit Async_kernel.Deferred.t ] -> unit Async_kernel.Deferred.t

Runs with_close but dispatches no queries. The implementations are required because this function doesn't let you dispatch any queries (i.e., act as a client), so it would be pointless to call it if you didn't want to act as a server.

val create : ?implementations:'s Implementations.t -> connection_state:(t -> 's) -> ?max_message_size:int -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core.Info.t -> Async_unix.Reader.t -> Async_unix.Writer.t -> (t, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t

These functions are mostly the same as the ones with the same names in Async_rpc_kernel.Rpc.Connection; see Connection_intf in that library for documentation. The differences are that:

  • they take an Async_unix.Reader.t, Async_unix.Writer.t and max_message_size instead of a Transport.t
  • they use Time instead of Time_ns
val contains_magic_prefix : Async_unix.Reader.t -> bool Async_kernel.Deferred.t

As of Feb 2017, the RPC protocol started to contain a magic number so that one can identify RPC communication. The bool returned by contains_magic_prefix says whether this magic number was observed.

This operation is a "peek" that does not advance any pointers associated with the reader. In particular, it makes sense to call create on a reader after calling this function.
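Because the check is a peek, it can be used to decide whether to hand a reader/writer pair to create at all. The following is a minimal sketch under that assumption; connect_if_rpc is a hypothetical name, and the connection state is simply unit.

```ocaml
open Core
open Async

(* Hypothetical helper: returns [Some connection] only if the peer
   speaks the RPC protocol and the handshake succeeds. *)
let connect_if_rpc reader writer =
  Rpc.Connection.contains_magic_prefix reader
  >>= function
  | false -> return None
  | true ->
    (* The peek above consumed nothing, so [create] still sees the
       magic number at the start of the stream. *)
    Rpc.Connection.create ~connection_state:(fun _connection -> ()) reader writer
    >>| Result.ok
```

Note that this sketch discards the handshake exception; a real caller would probably want to log or return it.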

val with_close : ?⁠implementations:'s Implementations.t -> ?⁠max_message_size:int -> ?⁠handshake_timeout:Core.Time.Span.t -> ?⁠heartbeat_config:Heartbeat_config.t -> connection_state:(t -> 's) -> Async_unix.Reader.t -> Async_unix.Writer.t -> dispatch_queries:(t -> 'a Async_kernel.Deferred.t) -> on_handshake_error:[ `Raise | `Call of Core.Exn.t -> 'a Async_kernel.Deferred.t ] -> 'a Async_kernel.Deferred.t
val server_with_close : ?⁠max_message_size:int -> ?⁠handshake_timeout:Core.Time.Span.t -> ?⁠heartbeat_config:Heartbeat_config.t -> Async_unix.Reader.t -> Async_unix.Writer.t -> implementations:'s Implementations.t -> connection_state:(t -> 's) -> on_handshake_error:[ `Raise | `Ignore | `Call of Core.Exn.t -> unit Async_kernel.Deferred.t ] -> unit Async_kernel.Deferred.t
type transport_maker = Async_unix.Fd.t -> max_message_size:int -> Transport.t

A function creating a transport from a file descriptor. It is responsible for setting the low-level parameters of the underlying transport.

For instance, to set up a transport using Async.{Reader,Writer} and set a buffer age limit on the writer, you can pass this to the functions of this module:

~make_transport:(fun fd ~max_message_size ->
  Rpc.Transport.of_fd fd ~max_message_size ~buffer_age_limit:`Unlimited)
type on_handshake_error = [
| `Raise
| `Ignore
| `Call of Core.Exn.t -> unit
]
val serve : implementations:'s Implementations.t -> initial_connection_state:('address -> t -> 's) -> where_to_listen:('address, 'listening_on) Async_unix.Tcp.Where_to_listen.t -> ?max_connections:int -> ?backlog:int -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?auth:('address -> bool) -> ?on_handshake_error:on_handshake_error -> ?on_handler_error:[ `Raise | `Ignore | `Call of 'address -> exn -> unit ] -> unit -> ('address, 'listening_on) Async_unix.Tcp.Server.t Async_kernel.Deferred.t

serve ~implementations ~initial_connection_state ~where_to_listen () starts a server with the given implementations, listening at where_to_listen. The optional auth function is called on every incoming connection with the address info of the client; if it returns false, the client is disconnected immediately. This auth mechanism is generic and does nothing other than disconnect the client; any logging or record of the reasons is the responsibility of the auth function itself.
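A minimal server might look like the sketch below. The echo RPC, the start_server function and the is_allowed predicate are hypothetical names introduced for illustration, and the exact arguments of Rpc.Rpc.create may differ between library versions.

```ocaml
open Core
open Async

(* Hypothetical RPC used only for illustration: echoes a string back. *)
let echo_rpc =
  Rpc.Rpc.create
    ~name:"echo"
    ~version:1
    ~bin_query:String.bin_t
    ~bin_response:String.bin_t

(* The connection state is [unit] here, so handlers take [()] first. *)
let implementations =
  Rpc.Implementations.create_exn
    ~implementations:
      [ Rpc.Rpc.implement echo_rpc (fun () query -> return query) ]
    ~on_unknown_rpc:`Raise

(* Start a TCP server on [port]. [is_allowed] is a caller-supplied
   predicate on the client address; clients for which it returns
   [false] are disconnected immediately. *)
let start_server ~port ~is_allowed =
  Rpc.Connection.serve
    ~implementations
    ~initial_connection_state:(fun _address _connection -> ())
    ~where_to_listen:(Tcp.Where_to_listen.of_port port)
    ~auth:is_allowed
    ()
```

The result is a deferred Tcp.Server.t, which the caller can keep in order to shut the server down later.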

val serve_with_transport : handshake_timeout:Core.Time.Span.t option -> heartbeat_config:Heartbeat_config.t option -> implementations:'s Implementations.t -> description:Core.Info.t -> connection_state:(t -> 's) -> on_handshake_error:on_handshake_error -> Transport.t -> unit Async_kernel.Deferred.t
val client : ?implementations:_ Client_implementations.t -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core.Info.t -> _ Async_unix.Tcp.Where_to_connect.t -> (t, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t

client where_to_connect connects to the server at where_to_connect and returns the connection or an Error if a connection could not be made. It is the responsibility of the caller to eventually call close.

In client and with_client, the handshake_timeout encompasses both the TCP connection timeout and the timeout for this module's own handshake.
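Since the caller owns the connection returned by client, a one-shot query could be written roughly as follows. The function query_once is a hypothetical helper, and rpc stands for whatever Rpc.Rpc.t value the server implements.

```ocaml
open Core
open Async

(* Hypothetical helper: connect, send one query, and close the
   connection afterwards whether or not the dispatch succeeded. *)
let query_once rpc ~host ~port query =
  Rpc.Connection.client
    (Tcp.Where_to_connect.of_host_and_port (Host_and_port.create ~host ~port))
  >>= function
  | Error exn -> return (Or_error.of_exn exn)
  | Ok connection ->
    Rpc.Rpc.dispatch rpc connection query
    >>= fun response ->
    (* [client] does not close for us; do it explicitly. *)
    Rpc.Connection.close connection
    >>| fun () -> response
```

The result is an Or_error.t, so connection failures and RPC errors are reported through the same channel.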

val client' : ?implementations:_ Client_implementations.t -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core.Info.t -> 'transport Async_unix.Tcp.Where_to_connect.t -> ('transport * t, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t

Similar to client, but additionally exposes the Socket.Address.t of the RPC server that we connected to.

val with_client : ?implementations:_ Client_implementations.t -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> _ Async_unix.Tcp.Where_to_connect.t -> (t -> 'a Async_kernel.Deferred.t) -> ('a, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t

with_client where_to_connect f connects to the server at where_to_connect and runs f until an exception is thrown or until the returned Deferred is fulfilled.

NOTE: As with with_close, you should be careful when using this with Pipe_rpc. See with_close for more information.
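One way to respect this caveat is to consume the pipe inside the callback, so that the callback's deferred is determined only after the pipe has been read to the end. The sketch below assumes a Pipe_rpc whose responses are strings; print_all and pipe_rpc are hypothetical names.

```ocaml
open Core
open Async

(* Hypothetical helper: print every response of a string-valued
   Pipe_rpc. [pipe_rpc] stands for any such RPC the server implements. *)
let print_all pipe_rpc where_to_connect query =
  Rpc.Connection.with_client where_to_connect (fun connection ->
    Rpc.Pipe_rpc.dispatch_exn pipe_rpc connection query
    >>= fun (reader, _metadata) ->
    (* This deferred is determined only once the pipe is exhausted, so
       the connection stays open for as long as we are reading. *)
    Pipe.iter_without_pushback reader ~f:(fun line -> print_endline line))
```

Returning reader directly from the callback instead would let with_client close the connection, and with it the pipe, before anything had been read.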

val with_client' : ?implementations:_ Client_implementations.t -> ?max_message_size:int -> ?make_transport:transport_maker -> ?handshake_timeout:Core.Time.Span.t -> ?heartbeat_config:Heartbeat_config.t -> 'transport Async_unix.Tcp.Where_to_connect.t -> (remote_server:'transport -> t -> 'a Async_kernel.Deferred.t) -> ('a, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t

Similar to with_client, but additionally exposes the Socket.Address.t of the RPC server that we connected to.