Streams

Streams are used to send data in ERDOS applications.

ERDOS streams are similar to ROS topics, but have a few additional desirable properties. Streams facilitate one-to-many communication, so only 1 operator sends messages on a stream. ERDOS broadcasts messages sent on a stream to all connected operators. In addition, streams are typed when using the Rust API.

Streams expose 3 classes of interfaces:

  1. Read-interfaces expose methods to receive and process data. They allow pulling data by calling read() and try_read(). Structures that implement read interfaces include:

  • ReadStream: used by operators to read data and register callbacks.

  • ExtractStream: used by the driver to read data.

  1. Write-interfaces expose the send method to send data on a stream. Structures that implement write interfaces include:

  1. Abstract interfaces used to connect operators and construct a dataflow graph. Structures that implement the abstract :py:class:.Stream interface include:

    • OperatorStream: representing a stream on which an operator sends messages.

    • IngestStream: used to send messages to operators from the driver.

    • LoopStream: used to create loops in the dataflow graph.

Some applications may want to introduce loops in their dataflow graphs which is possible using the LoopStream.

Sending Messages

Operators use Write Streams to send data.

class erdos.WriteStream(_py_write_stream)

A WriteStream allows an operator to send messages and watermarks to other operators that connect to the corresponding ReadStream.

Note

This class is created automatically when ERDOS initializes an operator, and should never be initialized manually.

send(msg)

Sends a message on the stream.

Parameters

msg (Message) – the message to send. This may be a Watermark or a Message.

Receiving Messages

Operators receive data by reading messages from Read Streams. Operators also receive data by implementing callbacks that are automatically invoked upon the receipt of a message.

class erdos.ReadStream(_py_read_stream)

A ReadStream allows an operator to read and do work on data sent by other operators on a corresponding WriteStream.

An operator that takes control of its execution using the run method can retrieve the messages on a ReadStream using the ReadStream.read() or ReadStream.try_read() methods.

Note

This class is created automatically during run, and should never be initialized manually. No callbacks are invoked if an operator takes control of the execution in run.

read()

Blocks until a message is read from the stream.

Return type

Message

try_read()

Tries to read a mesage from the stream.

Returns None if no messages are available at the moment.

Return type

Optional[Message]

Abstract Streams

These streams represent edges in the dataflow graph, which ERDOS materializes using its communication protocols, and the :py:class:.ReadStream and :py:class:.WriteStream interfaces.

class erdos.Stream(internal_stream)

Base class representing a stream to operators can be connected. from which is subclassed by streams that are used to connect operators in the driver.

Note

This class should never be initialized manually.

property id: str

The id of the stream.

Return type

str

property name: str

The name of the stream. The stream ID if none was given.

Return type

str

class erdos.OperatorStream(operator_stream)

Bases: erdos.streams.Stream

Returned when connecting an operator to the dataflow graph.

Note

This class is created automatically by the connect functions, and should never be initialized manually.

Ingesting and Extracting Data

Some applications have trouble placing all of the data processing logic inside operators. For these applications, ERDOS provides special stream interfaces to ingest and extract data.

A comprehensive example is available here.

class erdos.IngestStream(name=None)

Bases: erdos.streams.Stream

An IngestStream enables drivers to inject data into a running ERDOS application.

The driver can initialize a new IngestStream and connect it to an operator through the connect family of functions. Similar to a WriteStream, an IngestStream provides a IngestStream.send() to enable the driver to send data to the operator to which it was connected.

send(msg)

Sends a message on the stream.

Parameters

msg (Message) – the message to send. This may be a WatermarkMessage or a Message.

class erdos.ExtractStream(stream)

An ExtractStream enables drivers to read data from a running ERDOS applications.

The driver can initialize a new ExtractStream by passing the instance of OperatorStream returned by the connect family of functions. Similar to a ReadStream, an ExtractStream provides read() and try_read() for reading data published on the corresponding OperatorStream.

Parameters

stream (OperatorStream) – The stream from which to read messages.

read()

Blocks until a message is read from the stream.

Return type

Message

try_read()

Tries to read a mesage from the stream.

Returns None if no messages are available at the moment.

Return type

Optional[Message]

Loops

Certain applications require feedback in the dataflow. To support this use case, ERDOS provides the LoopStream interface to support loops in the dataflow.

A comprehensive example is available here.

class erdos.LoopStream

Bases: erdos.streams.Stream

Stream placeholder used to construct loops in the dataflow graph.

Note

Must call connect_loop with a valid OperatorStream to complete the loop.