Streams
Streams are used to send data in ERDOS applications.
ERDOS streams are similar to ROS topics, but have a few additional desirable properties. Streams facilitate one-to-many communication: only one operator sends messages on a given stream, and ERDOS broadcasts every message sent on that stream to all connected operators. In addition, streams are typed when using the Rust API.
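The one-to-many delivery described above can be illustrated with a small pure-Python model. This is not the ERDOS implementation: the `BroadcastStream` class and its `connect()` and `send()` methods are hypothetical names chosen for this sketch, and each connected reader is modeled as its own independent queue.

```python
from collections import deque
from typing import Deque, Generic, List, TypeVar

T = TypeVar("T")


class BroadcastStream(Generic[T]):
    """Hypothetical model of a stream with one writer and many readers."""

    def __init__(self) -> None:
        # One independent inbox per connected reader.
        self._readers: List[Deque[T]] = []

    def connect(self) -> Deque[T]:
        """Registers a new reader and returns its private inbox."""
        inbox: Deque[T] = deque()
        self._readers.append(inbox)
        return inbox

    def send(self, msg: T) -> None:
        """Appends the message to every connected reader's inbox."""
        for inbox in self._readers:
            inbox.append(msg)


stream: BroadcastStream[int] = BroadcastStream()
reader_a = stream.connect()
reader_b = stream.connect()
for value in (1, 2, 3):
    stream.send(value)  # the single writer sends each value once
print(list(reader_a), list(reader_b))  # [1, 2, 3] [1, 2, 3]
```

Because each reader owns its inbox in this model, one operator consuming a message does not remove it from the other readers.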
Streams expose 3 classes of interfaces:
- Read-interfaces expose methods to receive and process data. They allow pulling data by calling read() and try_read(). Structures that implement read interfaces include:
  - ReadStream: used by operators to read data and register callbacks.
  - ExtractStream: used by the driver to read data.
- Write-interfaces expose the send method to send data on a stream. Structures that implement write interfaces include:
  - WriteStream: used by operators to send data.
  - IngestStream: used by the driver to send data.
- Abstract interfaces are used to connect operators and construct a dataflow graph. Structures that implement the abstract Stream interface include:
  - OperatorStream: represents a stream on which an operator sends messages.
  - IngestStream: used to send messages to operators from the driver.
  - LoopStream: used to create loops in the dataflow graph.
Some applications may want to introduce loops in their dataflow graphs, which is possible using the LoopStream.
Sending Messages
Operators use write streams to send data.
- class erdos.WriteStream(_py_write_stream)
A WriteStream allows an operator to send messages and watermarks to other operators that connect to the corresponding ReadStream.
Note
This class is created automatically when ERDOS initializes an operator, and should never be initialized manually.
Receiving Messages
Operators receive data by reading messages from Read Streams. Operators also receive data by implementing callbacks that are automatically invoked upon the receipt of a message.
- class erdos.ReadStream(_py_read_stream)
A ReadStream allows an operator to read and do work on data sent by other operators on a corresponding WriteStream.
An operator that takes control of its execution using the run method can retrieve the messages on a ReadStream using the ReadStream.read() or ReadStream.try_read() methods.
Note
This class is created automatically during run, and should never be initialized manually. No callbacks are invoked if an operator takes control of the execution in run.
- read()
Blocks until a message is read from the stream.
- Return type
Message[T] | WatermarkMessage
- try_read()
Tries to read a message from the stream.
Returns None if no messages are available at the moment.
- Return type
Optional[Message[T] | WatermarkMessage]
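The difference between the blocking read() and the non-blocking try_read() is analogous to Python's `queue.Queue.get()` and `get_nowait()`. The `ModelReadStream` class below is a hypothetical stand-in that only illustrates these semantics; it does not wrap the ERDOS runtime, and its `push()` method simulates an upstream operator sending a message.

```python
import queue
import threading
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class ModelReadStream(Generic[T]):
    """Hypothetical stand-in for a read stream backed by a thread-safe queue."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[T]" = queue.Queue()

    def push(self, msg: T) -> None:
        """Simulates the upstream operator sending a message."""
        self._queue.put(msg)

    def read(self) -> T:
        """Blocks until a message is available, then returns it."""
        return self._queue.get()

    def try_read(self) -> Optional[T]:
        """Returns a message if one is available, otherwise None."""
        try:
            return self._queue.get_nowait()
        except queue.Empty:
            return None


stream: ModelReadStream[str] = ModelReadStream()
print(stream.try_read())  # None: nothing has been sent yet

# A timer thread delivers a message after a short delay; read() waits for it.
threading.Timer(0.1, stream.push, args=("hello",)).start()
print(stream.read())  # hello
```

try_read() suits operators that poll several streams in a loop inside run, while read() suits an operator that has nothing else to do until the next message arrives.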
Abstract Streams
These streams represent edges in the dataflow graph, which ERDOS materializes using its communication protocols and the ReadStream and WriteStream interfaces.
- class erdos.Stream(internal_stream)
Base class representing a stream to which operators can be connected. It is subclassed by the streams that are used to connect operators in the driver.
Note
This class should never be initialized manually.
- property id: uuid.UUID
The id of the stream.
- Return type
UUID
- property name: str
The name of the stream, or the stream ID if no name was given.
- Return type
str
- map(function)
Applies the given function to each value sent on the stream, and outputs the results on the returned stream.
- Parameters
function (Callable[[T], U]) – The function applied to each value sent on this stream.
- Return type
OperatorStream[U]
- Returns
A stream that carries the results of the applied function.
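Conceptually, map transforms each data value independently. The sketch below models a stream as a plain list of (timestamp, value) tuples and assumes that each output keeps the timestamp of its input; the `model_map` helper is hypothetical and illustrates only the per-message semantics, not how ERDOS schedules the work.

```python
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def model_map(
    messages: List[Tuple[int, T]], function: Callable[[T], U]
) -> List[Tuple[int, U]]:
    """Applies `function` to each value, keeping the message timestamps."""
    return [(timestamp, function(data)) for timestamp, data in messages]


squares = model_map([(0, 1), (1, 2), (2, 3)], lambda x: x * x)
print(squares)  # [(0, 1), (1, 4), (2, 9)]
```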
- flat_map(function)
Applies the given function to each value sent on the stream, and sends each element of the returned sequence as an individual message on the returned stream.
- Parameters
function (Callable[[T], Sequence[U]]) – The function applied to each value sent on this stream.
- Return type
OperatorStream[U]
- Returns
A stream that carries the results of the applied function.
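The difference from map is that one input value can yield zero, one, or many output messages. Using the same hypothetical list-of-(timestamp, value) model, and again assuming outputs inherit the input timestamp, `model_flat_map` (an illustrative helper, not part of ERDOS) emits one message per element of each returned sequence.

```python
from typing import Callable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def model_flat_map(
    messages: List[Tuple[int, T]], function: Callable[[T], Sequence[U]]
) -> List[Tuple[int, U]]:
    """Emits one output message per element of each returned sequence."""
    outputs: List[Tuple[int, U]] = []
    for timestamp, data in messages:
        for item in function(data):
            outputs.append((timestamp, item))
    return outputs


words = model_flat_map([(0, "hello world"), (1, "erdos")], str.split)
print(words)  # [(0, 'hello'), (0, 'world'), (1, 'erdos')]
```

An empty returned sequence produces no output message for that input.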
- filter(function)
Applies the given function to each value sent on the stream, and sends the value on the returned stream if the function evaluates to True.
- Parameters
function (Callable[[T], bool]) – The function applied to each value sent on this stream. The value is retained if the function returns True.
- Return type
OperatorStream[T]
- Returns
A stream that carries the values for which the function returned True.
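In the same hypothetical list-of-(timestamp, value) model, filtering keeps a message unchanged when the predicate holds and drops it otherwise. `model_filter` is an illustrative helper only.

```python
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")


def model_filter(
    messages: List[Tuple[int, T]], function: Callable[[T], bool]
) -> List[Tuple[int, T]]:
    """Keeps only the messages whose value satisfies the predicate."""
    return [(timestamp, data) for timestamp, data in messages if function(data)]


evens = model_filter([(0, 1), (1, 2), (2, 3), (3, 4)], lambda x: x % 2 == 0)
print(evens)  # [(1, 2), (3, 4)]
```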
- split(function)
Applies the given function to each value sent on the stream, and outputs the value on the left stream if the function returns True, or on the right stream if it returns False.
- Parameters
function (Callable[[T], bool]) – The function applied to each value sent on this stream.
- Return type
Tuple[OperatorStream[T], OperatorStream[T]]
- Returns
The left and the right stream, respectively, containing the values routed by the split function.
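The routing rule can be sketched with the same hypothetical list-of-(timestamp, value) model: every message goes to exactly one of the two outputs. `model_split` is an illustrative helper, not the ERDOS implementation.

```python
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")


def model_split(
    messages: List[Tuple[int, T]], function: Callable[[T], bool]
) -> Tuple[List[Tuple[int, T]], List[Tuple[int, T]]]:
    """Routes each message left if the predicate is True, right otherwise."""
    left: List[Tuple[int, T]] = []
    right: List[Tuple[int, T]] = []
    for timestamp, data in messages:
        (left if function(data) else right).append((timestamp, data))
    return left, right


small, large = model_split([(0, 3), (1, 12), (2, 7), (3, 20)], lambda x: x < 10)
print(small)  # [(0, 3), (2, 7)]
print(large)  # [(1, 12), (3, 20)]
```

Unlike two separate filter calls, a single split evaluates the function once per value.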
- split_by_type(*data_type)
Returns one stream for each provided type; each stream carries the messages whose data is an instance of that type.
Messages whose data does not match any provided type are filtered out. This is useful for operators that send messages with more than two data types.
- Parameters
data_type (Type[Any]) – The types of data to forward, one output stream per type.
- Return type
Tuple[OperatorStream[Any], ...]
- Returns
A stream for each provided type, where each message’s data is an instance of that type.
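The following sketch models split_by_type on a list of (timestamp, value) tuples using `isinstance` checks. `model_split_by_type` is hypothetical, and one behavior is an assumption of this sketch rather than something stated above: each type acts as an independent filter, so a value matching several requested types (for example, `True` is both a `bool` and an `int`) would appear on each matching output.

```python
from typing import Any, List, Tuple, Type


def model_split_by_type(
    messages: List[Tuple[int, Any]], *data_type: Type[Any]
) -> Tuple[List[Tuple[int, Any]], ...]:
    """Returns one list per requested type; unmatched messages are dropped."""
    outputs: Tuple[List[Tuple[int, Any]], ...] = tuple([] for _ in data_type)
    for timestamp, data in messages:
        # Assumption of this sketch: each type is an independent isinstance filter.
        for index, cls in enumerate(data_type):
            if isinstance(data, cls):
                outputs[index].append((timestamp, data))
    return outputs


ints, strs = model_split_by_type([(0, 1), (1, "a"), (2, 2.5), (3, "b")], int, str)
print(ints)  # [(0, 1)]
print(strs)  # [(1, 'a'), (3, 'b')]
```

The float `2.5` matches neither requested type, so it is filtered out.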
- timestamp_join(other)
Joins the data with matching timestamps from the two different streams.
- Parameters
other (Stream[U]) – The stream to join with.
- Return type
OperatorStream[Tuple[T, U]]
- Returns
A stream that carries the joined results from the two streams.
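Matching on timestamps can be sketched as an inner join over two lists of (timestamp, value) tuples. This is a hypothetical model: `model_timestamp_join` pairs every left value with every right value that shares its timestamp, and it does not model when ERDOS releases joined results (for example, whether it waits for watermarks).

```python
from collections import defaultdict
from typing import Dict, List, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def model_timestamp_join(
    left: List[Tuple[int, T]], right: List[Tuple[int, U]]
) -> List[Tuple[int, Tuple[T, U]]]:
    """Inner-joins two message lists on their timestamps."""
    # Index the right-hand values by timestamp.
    right_by_time: Dict[int, List[U]] = defaultdict(list)
    for timestamp, data in right:
        right_by_time[timestamp].append(data)
    joined: List[Tuple[int, Tuple[T, U]]] = []
    for timestamp, data in left:
        for other in right_by_time.get(timestamp, []):
            joined.append((timestamp, (data, other)))
    return joined


poses = [(0, "pose0"), (1, "pose1"), (2, "pose2")]
frames = [(1, "frame1"), (2, "frame2"), (3, "frame3")]
print(model_timestamp_join(poses, frames))
# [(1, ('pose1', 'frame1')), (2, ('pose2', 'frame2'))]
```

Timestamps present on only one side (0 and 3 here) produce no output.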
- concat(*other)
Merges the data messages from the given streams into a single stream, and forwards a watermark whenever the minimum of the watermarks received on the merged streams advances.
- Parameters
other (Stream[T]) – The other stream(s) to merge with.
- Return type
OperatorStream[T]
- Returns
A stream that carries messages from all merged streams.
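The minimum-watermark rule can be traced with a small model. Assumptions of this sketch: timestamps are plain non-negative integers (ERDOS timestamps are richer), inputs are identified by index, and events arrive as hypothetical `(input_index, kind, timestamp, data)` tuples. Data is forwarded immediately, while a watermark is forwarded only once every input has reached it.

```python
from typing import List, Optional, Tuple

# Hypothetical event format: (input_index, kind, timestamp, data),
# where kind is "data" or "watermark".
Event = Tuple[int, str, int, Optional[str]]


def model_concat(
    events: List[Event], num_inputs: int
) -> List[Tuple[str, int, Optional[str]]]:
    """Forwards data immediately; forwards a watermark only when the
    minimum watermark across all inputs advances."""
    latest = [-1] * num_inputs  # -1 means "no watermark received yet"
    emitted = -1
    output: List[Tuple[str, int, Optional[str]]] = []
    for index, kind, timestamp, data in events:
        if kind == "data":
            output.append(("data", timestamp, data))
            continue
        latest[index] = max(latest[index], timestamp)
        low = min(latest)
        if low > emitted:
            emitted = low
            output.append(("watermark", low, None))
    return output


events: List[Event] = [
    (0, "data", 0, "a0"),
    (0, "watermark", 0, None),  # input 1 has no watermark yet: nothing forwarded
    (1, "data", 0, "b0"),
    (1, "watermark", 0, None),  # both inputs at 0: forward watermark 0
    (0, "watermark", 2, None),  # input 1 still at 0: nothing forwarded
    (1, "watermark", 1, None),  # minimum advances to 1: forward watermark 1
]
result = model_concat(events, num_inputs=2)
for out in result:
    print(out)
```

Waiting for the slowest input ensures the merged stream never announces a timestamp as complete while one of its inputs may still send data for it.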
- class erdos.OperatorStream(operator_stream)
Bases: erdos.streams.Stream[erdos.streams.T]
Returned when connecting an operator to the dataflow graph.
Note
This class is created automatically by the connect functions, and should never be initialized manually.
Ingesting and Extracting Data
Some applications have trouble placing all of the data processing logic inside operators. For these applications, ERDOS provides special stream interfaces to ingest and extract data.
A comprehensive example is available here.
- class erdos.IngestStream(name=None)
Bases: erdos.streams.Stream[erdos.streams.T]
An IngestStream enables drivers to inject data into a running ERDOS application.
The driver can initialize a new IngestStream and connect it to an operator through the connect family of functions. Similar to a WriteStream, an IngestStream provides an IngestStream.send() method that enables the driver to send data to the operator to which it was connected.
- send(msg)
Sends a message on the stream.
- Parameters
msg (Message[T]) – The message to send. This may be a WatermarkMessage or a Message.
- Return type
None
- class erdos.ExtractStream(stream)
An ExtractStream enables drivers to read data from a running ERDOS application.
The driver can initialize a new ExtractStream by passing the instance of OperatorStream returned by the connect family of functions. Similar to a ReadStream, an ExtractStream provides read() and try_read() for reading data published on the corresponding OperatorStream.
- Parameters
stream (OperatorStream[T]) – The stream from which to read messages.
- read()
Blocks until a message is read from the stream.
- Return type
Message[T] | WatermarkMessage
- try_read()
Tries to read a message from the stream.
Returns None if no messages are available at the moment.
- Return type
Optional[Message[T] | WatermarkMessage]
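The driver-side round trip (send into the application, then read results back out) can be mimicked in plain Python with two thread-safe queues and a worker thread. This is only an analogy, not ERDOS code: the `ingest` and `extract` queues stand in for an IngestStream and an ExtractStream, `square_operator` is a hypothetical operator, and watermarks are modeled as messages whose data is `None`.

```python
import queue
import threading
from typing import List, Optional, Tuple

# Hypothetical message format: (timestamp, data); data=None marks a watermark.
Msg = Tuple[int, Optional[int]]

ingest: "queue.Queue[Msg]" = queue.Queue()   # stands in for an IngestStream
extract: "queue.Queue[Msg]" = queue.Queue()  # stands in for an ExtractStream


def square_operator() -> None:
    """Simulated operator: squares data, forwards watermarks, stops after timestamp 2."""
    while True:
        timestamp, data = ingest.get()
        if data is None:
            extract.put((timestamp, None))  # forward the watermark
            if timestamp == 2:
                return
        else:
            extract.put((timestamp, data * data))


worker = threading.Thread(target=square_operator)
worker.start()

results: List[Msg] = []
for t in range(3):
    ingest.put((t, t + 1))          # analogous to sending a data message
    ingest.put((t, None))           # analogous to sending a watermark
    results.append(extract.get())   # analogous to a blocking read()
    extract.get()                   # consume the forwarded watermark
worker.join()
print(results)  # [(0, 1), (1, 4), (2, 9)]
```

The driver never touches the operator directly; it only writes to one stream and reads from another, which is the pattern the ingest and extract interfaces are meant to support.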
Loops
Certain applications require feedback in the dataflow. To support this use case, ERDOS provides the LoopStream interface for constructing loops in the dataflow graph.
A comprehensive example is available here.
- class erdos.LoopStream
Bases: erdos.streams.Stream[erdos.streams.T]
Stream placeholder used to construct loops in the dataflow graph.
Note
You must call connect_loop with a valid OperatorStream to complete the loop.
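The placeholder pattern can be sketched in plain Python: an operator attaches to the loop before the stream that feeds it exists, and the loop is closed afterwards. `LoopPlaceholder`, `Source`, and `increment` are hypothetical names for illustration only; a real LoopStream wires ERDOS streams rather than Python callbacks, and real feedback is driven by the runtime rather than by the nested calls used here.

```python
from typing import Callable, List


class Source:
    """Hypothetical stand-in for a stream produced downstream of the loop."""

    def __init__(self) -> None:
        self.subscribers: List[Callable[[int], None]] = []

    def send(self, value: int) -> None:
        for subscriber in self.subscribers:
            subscriber(value)


class LoopPlaceholder:
    """Hypothetical loop stream: readers attach before the source exists."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[int], None]] = []
        self.connected = False

    def add_callback(self, callback: Callable[[int], None]) -> None:
        self._callbacks.append(callback)

    def connect_loop(self, source: Source) -> None:
        """Closes the loop: everything `source` emits is fed back to the readers."""
        source.subscribers.append(self._deliver)
        self.connected = True

    def _deliver(self, value: int) -> None:
        for callback in self._callbacks:
            callback(value)


loop = LoopPlaceholder()
output = Source()
seen: List[int] = []


def increment(value: int) -> None:
    """Operator inside the loop: records the value and feeds back value + 1."""
    seen.append(value)
    if value < 3:
        output.send(value + 1)


loop.add_callback(increment)  # the operator reads from the loop before it is closed
loop.connect_loop(output)     # analogous to completing the loop with connect_loop
output.send(0)                # seed the feedback loop
print(seen)  # [0, 1, 2, 3]
```

Without the connect_loop step, nothing sent on `output` would ever reach `increment`, which mirrors the requirement in the note above.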