Operators¶
ERDOS operators process received data, and use streams to broadcast erdos.Message
and erdos.WatermarkMessage objects to downstream operators.
We provide a standard library of operators for common dataflow patterns.
While the standard operators are general and versatile, some applications may
implement custom operators to better optimize performance and take
fine-grained control over execution.
Operators are implemented as classes, each of which follows a particular communication pattern.
User-defined operators subclass the built-in operator that matches the desired communication pattern. For example,
the SendOp from
python/examples/simple_pipeline.py
subclasses erdos.operator.Source because it does not receive any data
and sends messages on a single output stream.
- The erdos.operator.Source operator is used to write data on a single erdos.WriteStream.
- The erdos.operator.Sink operator is used to read data from a single erdos.ReadStream.
- The erdos.operator.OneInOneOut operator is used to read data from a single erdos.ReadStream and write data on a single erdos.WriteStream.
- The erdos.operator.TwoInOneOut operator is used to read data from two erdos.ReadStreams and write data on a single erdos.WriteStream.
- The erdos.operator.OneInTwoOut operator is used to read data from a single erdos.ReadStream and write data on two erdos.WriteStreams.
Operators can support both push and pull-based models of execution by
implementing methods defined for each operator. By implementing callbacks
such as erdos.operator.OneInOneOut.on_data(), operators can process
messages as they arrive. Moreover, operators can implement callbacks over watermarks
(e.g. erdos.operator.OneInOneOut.on_watermark()) to ensure ordered
processing over timestamps. ERDOS ensures lock-free, safe, and concurrent processing
via a system-managed ordering of callbacks, which is implemented as a
run queue for the system’s multithreaded runtime.
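As a rough illustration of how data and watermark callbacks divide the work, the sketch below buffers values in on_data() and emits one sum per timestamp in on_watermark(). So that it runs without an ERDOS installation, it relies on hypothetical stand-ins (Message, FakeContext, FakeWriteStream) that only mimic the documented names, and it assumes the context exposes a timestamp attribute mirroring its constructor argument. In an actual application the class would subclass erdos.operator.OneInOneOut and send erdos.Message objects.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-ins so the sketch runs without ERDOS installed.
Message = namedtuple("Message", ["timestamp", "data"])
FakeContext = namedtuple("FakeContext", ["timestamp", "write_stream"])


class FakeWriteStream:
    """Records sent messages instead of broadcasting them."""

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


class SumPerTimestampOp:
    """Would subclass erdos.operator.OneInOneOut in a real dataflow."""

    def __init__(self):
        self._buffer = defaultdict(list)

    def on_data(self, context, data):
        # More data may still arrive for this timestamp, so only buffer it.
        self._buffer[context.timestamp].append(data)

    def on_watermark(self, context):
        # The watermark marks the timestamp as complete: emit and clean up.
        values = self._buffer.pop(context.timestamp, [])
        context.write_stream.send(Message(context.timestamp, sum(values)))


def demo():
    stream = FakeWriteStream()
    op = SumPerTimestampOp()
    for t, value in [(1, 10), (1, 5), (2, 7)]:
        op.on_data(FakeContext(t, stream), value)
    op.on_watermark(FakeContext(1, stream))
    op.on_watermark(FakeContext(2, stream))
    return stream.sent
```

Deferring output to the watermark callback is what makes the result independent of the order in which the individual messages for a timestamp arrive.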
While ERDOS manages the execution of callbacks, some operators require
more fine-grained control. Operators can take manual control over the
thread of execution by implementing the run() method
(e.g. erdos.operator.OneInOneOut.run()).
Callbacks are not invoked while run() executes.
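A minimal sketch of the pull-based style follows, assuming a run() method with the documented (read_stream, write_stream) signature. FakeReadStream and FakeWriteStream are hypothetical stand-ins, and stopping when read() returns None is a convenience of this sketch only, not documented ERDOS behavior. A real operator would subclass erdos.operator.OneInOneOut and typically loop for the lifetime of the dataflow.

```python
class FakeReadStream:
    """Hypothetical stand-in: read() returns queued items, then None."""

    def __init__(self, items):
        self._items = list(items)

    def read(self):
        return self._items.pop(0) if self._items else None


class FakeWriteStream:
    """Hypothetical stand-in that records what was sent."""

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


class DoubleOp:
    """Would subclass erdos.operator.OneInOneOut in a real dataflow."""

    def run(self, read_stream, write_stream):
        # The operator owns the loop: it decides when to read and when to send.
        while True:
            data = read_stream.read()
            if data is None:  # Sketch-only stop condition.
                break
            write_stream.send(data * 2)


def run_demo(values):
    out = FakeWriteStream()
    DoubleOp().run(FakeReadStream(values), out)
    return out.sent
```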
Operator API¶
- class erdos.operator.BaseOperator¶
A BaseOperator is an internal class that provides the methods common to the individual operators.
- property id¶
Returns the operator’s ID.
- property config¶
Returns the operator’s config.
- add_trace_event(event)¶
Records a profile trace event.
- get_runtime(event_name, percentile)¶
Gets the runtime percentile for a given type of event.
- Parameters
event_name (str) – The name of the event to get runtime for.
percentile (int) – The percentile runtime to get.
- Returns
Runtime in microseconds, or None if the operator doesn’t have any runtime stats for the given event name.
- Return type
(float)
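To make the idea of a percentile runtime concrete, here is a small sketch that applies the nearest-rank method to a list of recorded durations. The helper runtime_percentile and the sample values are hypothetical; ERDOS may compute its statistics differently. The sketch only mirrors the documented contract of returning a float in microseconds, or None when no statistics exist.

```python
import math


def runtime_percentile(runtimes_us, percentile):
    """Nearest-rank percentile of runtimes in microseconds, or None if empty."""
    if not runtimes_us:
        return None
    ordered = sorted(runtimes_us)
    # Nearest rank: the smallest sample with at least `percentile` percent
    # of all samples less than or equal to it.
    rank = max(1, math.ceil(percentile / 100 * len(ordered)))
    return float(ordered[rank - 1])


samples = [120, 80, 100, 400, 90]  # Made-up runtimes for illustration.
median = runtime_percentile(samples, 50)
tail = runtime_percentile(samples, 99)
```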
- class erdos.operator.Source(*args, **kwargs)¶
Bases: erdos.operator.BaseOperator
A Source is an abstract base class that needs to be inherited by user-defined source operators that generate data on a single WriteStream in an ERDOS dataflow graph.
A user-defined operator needs to implement Source.run() and Source.destroy() in order to take control of the execution and the teardown of the operator, respectively.
- static __new__(cls, *args, **kwargs)¶
Sets up variables before the call to __init__ on the Python end.
More setup is done in the Rust backend at src/python/mod.rs.
- run(write_stream)¶
Runs the operator.
Invoked automatically by ERDOS, and provided with a WriteStream to send data on.
- Parameters
write_stream (WriteStream) – A WriteStream instance to send data on.
- class erdos.operator.Sink(*args, **kwargs)¶
Bases: erdos.operator.BaseOperator
A Sink is an abstract base class that needs to be inherited by user-defined sink operators that consume data from a single ReadStream in an ERDOS dataflow graph.
The user-defined operator can either implement the run() method and retrieve data from the provided read_stream, or implement the on_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.
- static __new__(cls, *args, **kwargs)¶
Sets up variables before the call to __init__ on the Python end.
More setup is done in the Rust backend at src/python/mod.rs.
- run(read_stream)¶
Runs the operator.
Invoked automatically by ERDOS, and provided with a ReadStream to retrieve data from.
- Parameters
read_stream (ReadStream) – A ReadStream instance to read data from.
- on_data(context, data)¶
Callback invoked upon receipt of a Message on the operator’s ReadStream.
- Parameters
context (SinkContext) – A SinkContext instance to retrieve metadata about the current invocation of the callback.
data (Any) – The data contained in the message received on the read stream.
- on_watermark(context)¶
Callback invoked upon receipt of a WatermarkMessage on the operator’s ReadStream.
- Parameters
context (SinkContext) – A SinkContext instance to retrieve metadata about the current invocation of the callback.
- class erdos.operator.OneInOneOut(*args, **kwargs)¶
Bases: erdos.operator.BaseOperator
A OneInOneOut is an abstract base class that needs to be inherited by user-defined operators that consume data from a single ReadStream and produce data on a single WriteStream in an ERDOS dataflow graph.
The user-defined operator can either implement the run() method, retrieving data from the provided read_stream and sending data on the write_stream, or implement the on_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.
- static __new__(cls, *args, **kwargs)¶
Sets up variables before the call to __init__ on the Python end.
More setup is done in the Rust backend at src/python/mod.rs.
- run(read_stream, write_stream)¶
Runs the operator.
Invoked automatically by ERDOS, and provided with a ReadStream to retrieve data from, and a WriteStream to send data on.
- Parameters
read_stream (ReadStream) – A ReadStream instance to read data from.
write_stream (WriteStream) – A WriteStream instance to send data on.
- on_data(context, data)¶
Callback invoked upon receipt of a Message on the operator’s ReadStream.
- Parameters
context (OneInOneOutContext) – A OneInOneOutContext instance to retrieve metadata about the current invocation of the callback.
data (Any) – The data contained in the message received on the read stream.
- on_watermark(context)¶
Callback invoked upon receipt of a WatermarkMessage on the operator’s ReadStream.
- Parameters
context (OneInOneOutContext) – A OneInOneOutContext instance to retrieve metadata about the current invocation of the callback.
- class erdos.operator.TwoInOneOut(*args, **kwargs)¶
Bases: erdos.operator.BaseOperator
A TwoInOneOut is an abstract base class that needs to be inherited by user-defined operators that consume data from two ReadStream instances and produce data on a single WriteStream in an ERDOS dataflow graph.
The user-defined operator can either implement the run() method and retrieve data from the provided left_read_stream and right_read_stream, or implement the on_left_data(), on_right_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.
- static __new__(cls, *args, **kwargs)¶
Sets up variables before the call to __init__ on the Python end.
More setup is done in the Rust backend at src/python/mod.rs.
- run(left_read_stream, right_read_stream, write_stream)¶
Runs the operator.
Invoked automatically by ERDOS, and provided with two instances of ReadStream to retrieve data from, and a WriteStream to send data on.
- Parameters
left_read_stream (ReadStream) – The first ReadStream instance to read data from.
right_read_stream (ReadStream) – The second ReadStream instance to read data from.
write_stream (WriteStream) – A WriteStream instance to send data on.
- on_left_data(context, data)¶
Callback invoked upon receipt of a Message on the left_read_stream.
- Parameters
context (TwoInOneOutContext) – A TwoInOneOutContext instance to retrieve metadata about the current invocation of the callback.
data (Any) – The data contained in the message received on the read stream.
- on_right_data(context, data)¶
Callback invoked upon receipt of a Message on the right_read_stream.
- Parameters
context (TwoInOneOutContext) – A TwoInOneOutContext instance to retrieve metadata about the current invocation of the callback.
data (Any) – The data contained in the message received on the read stream.
- on_watermark(context)¶
Callback invoked upon receipt of a WatermarkMessage across the two instances of the operator’s ReadStream.
- Parameters
context (TwoInOneOutContext) – A TwoInOneOutContext instance to retrieve metadata about the current invocation of the callback.
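One way the three TwoInOneOut callbacks might cooperate is a per-timestamp join: each data callback stores its side, and the watermark callback emits a pair once both sides are present. The sketch below uses hypothetical stand-ins (Message, FakeContext, FakeWriteStream) so that it runs without ERDOS, and it assumes the context exposes timestamp and write_stream attributes. A real operator would subclass erdos.operator.TwoInOneOut.

```python
from collections import namedtuple

# Hypothetical stand-ins so the sketch runs without ERDOS installed.
Message = namedtuple("Message", ["timestamp", "data"])
FakeContext = namedtuple("FakeContext", ["timestamp", "write_stream"])


class FakeWriteStream:
    """Records sent messages instead of broadcasting them."""

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


class PairOp:
    """Would subclass erdos.operator.TwoInOneOut in a real dataflow."""

    def __init__(self):
        self._left = {}
        self._right = {}

    def on_left_data(self, context, data):
        self._left[context.timestamp] = data

    def on_right_data(self, context, data):
        self._right[context.timestamp] = data

    def on_watermark(self, context):
        # Both inputs are complete for this timestamp; pair them if possible.
        t = context.timestamp
        left = self._left.pop(t, None)
        right = self._right.pop(t, None)
        if left is not None and right is not None:
            context.write_stream.send(Message(t, (left, right)))


def demo():
    out = FakeWriteStream()
    op = PairOp()
    op.on_left_data(FakeContext(1, out), "a")
    op.on_right_data(FakeContext(1, out), "b")
    op.on_left_data(FakeContext(2, out), "c")  # No right-hand match.
    op.on_watermark(FakeContext(1, out))
    op.on_watermark(FakeContext(2, out))
    return out.sent
```

Popping both buffers in the watermark callback also discards unmatched entries, which keeps the operator's state from growing with the number of timestamps.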
- class erdos.operator.OneInTwoOut(*args, **kwargs)¶
Bases: erdos.operator.BaseOperator
A OneInTwoOut is an abstract base class that needs to be inherited by user-defined operators that consume data from a single ReadStream instance and produce data on two instances of WriteStream in an ERDOS dataflow graph.
The user-defined operator can either implement the run() method, retrieving data from the provided read_stream and producing data on the left_write_stream and right_write_stream, or implement the on_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.
- static __new__(cls, *args, **kwargs)¶
Sets up variables before the call to __init__ on the Python end.
More setup is done in the Rust backend at src/python/mod.rs.
- run(read_stream, left_write_stream, right_write_stream)¶
Runs the operator.
Invoked automatically by ERDOS, and provided with a ReadStream instance to retrieve data from, and two WriteStream instances to send data on.
- Parameters
read_stream (ReadStream) – The ReadStream instance to retrieve data from.
left_write_stream (WriteStream) – The first WriteStream instance to send data on.
right_write_stream (WriteStream) – The second WriteStream instance to send data on.
- on_data(context, data)¶
Callback invoked upon receipt of a Message on the read_stream.
- Parameters
context (OneInTwoOutContext) – A OneInTwoOutContext instance to retrieve metadata about the current invocation of the callback.
data (Any) – The data contained in the message received on the read stream.
- on_watermark(context)¶
Callback invoked upon receipt of a WatermarkMessage on the operator’s ReadStream.
- Parameters
context (OneInTwoOutContext) – A OneInTwoOutContext instance to retrieve metadata about the current invocation of the callback.
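As an illustration of routing with a OneInTwoOut operator, the following sketch sends even values to the left write stream and odd values to the right one. Message, FakeContext and FakeWriteStream are hypothetical stand-ins that let the sketch run without ERDOS. The context is assumed to expose timestamp alongside the left_write_stream and right_write_stream attributes, and a real operator would subclass erdos.operator.OneInTwoOut.

```python
from collections import namedtuple

# Hypothetical stand-ins so the sketch runs without ERDOS installed.
Message = namedtuple("Message", ["timestamp", "data"])
FakeContext = namedtuple(
    "FakeContext", ["timestamp", "left_write_stream", "right_write_stream"]
)


class FakeWriteStream:
    """Records sent messages instead of broadcasting them."""

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


class ParitySplitOp:
    """Would subclass erdos.operator.OneInTwoOut in a real dataflow."""

    def on_data(self, context, data):
        # Route even values left and odd values right, keeping the timestamp.
        msg = Message(context.timestamp, data)
        if data % 2 == 0:
            context.left_write_stream.send(msg)
        else:
            context.right_write_stream.send(msg)


def demo():
    left, right = FakeWriteStream(), FakeWriteStream()
    op = ParitySplitOp()
    for t, value in enumerate([1, 2, 3, 4]):
        op.on_data(FakeContext(t, left, right), value)
    return [m.data for m in left.sent], [m.data for m in right.sent]
```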
Operator Config¶
- class erdos.operator.OperatorConfig(name=None, flow_watermarks=True, log_file_name=None, csv_log_file_name=None, profile_file_name=None)¶
An OperatorConfig allows developers to configure an operator.
An operator can query the configuration passed to it by the driver by accessing the properties in self.config. The example below shows how a LoggerOperator can access the log file name passed to the operator by the driver:
class LoggerOperator(erdos.Operator):
    def __init__(self, input_stream):
        # Set up a logger.
        _log = self.config.log_file_name
        self.logger = erdos.utils.setup_logging(self.config.name, _log)
- property name¶
Name of the operator.
- property flow_watermarks¶
Whether to automatically pass on the low watermark.
- property log_file_name¶
File name used for logging.
- property csv_log_file_name¶
File name used for logging to CSV.
- property profile_file_name¶
File name used for profiling an operator’s performance.
Context API¶
- class erdos.context.SinkContext(timestamp, config)¶
A SinkContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a Sink operator.
- config¶
The operator config generated by the driver upon connection of the operator to the graph.
- Type
OperatorConfig
- class erdos.context.OneInOneOutContext(timestamp, config, write_stream)¶
A OneInOneOutContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a OneInOneOut operator.
- config¶
The operator config generated by the driver upon connection of the operator to the graph.
- Type
OperatorConfig
- write_stream¶
The write stream to send results to downstream operators.
- Type
WriteStream
- class erdos.context.TwoInOneOutContext(timestamp, config, write_stream)¶
A TwoInOneOutContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a TwoInOneOut operator.
- config¶
The operator config generated by the driver upon connection of the operator to the graph.
- Type
OperatorConfig
- write_stream¶
The write stream to send results to downstream operators.
- Type
WriteStream
- class erdos.context.OneInTwoOutContext(timestamp, config, left_write_stream, right_write_stream)¶
A OneInTwoOutContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a OneInTwoOut operator.
- config¶
The operator config generated by the driver upon connection of the operator to the graph.
- Type
OperatorConfig
- left_write_stream¶
The first write stream to send results to downstream operators.
- Type
WriteStream
- right_write_stream¶
The second write stream to send results to downstream operators.
- Type
WriteStream
Examples¶
Full example at python/examples/simple_pipeline.py.
Periodically Publishing Data¶
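import time

import erdos
from erdos import WriteStream
from erdos.operator import Source

class SendOp(Source):
    def __init__(self):
        print("initializing source op")

    def run(self, write_stream: WriteStream):
        count = 0
        while True:
            # Send one message per second, timestamped with the running count.
            msg = erdos.Message(erdos.Timestamp(coordinates=[count]), count)
            print("SendOp: sending {msg}".format(msg=msg))
            write_stream.send(msg)
            count += 1
            time.sleep(1)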
Processing Data via Callbacks¶
from typing import Any
from erdos.context import SinkContext
from erdos.operator import Sink

class CallbackOp(Sink):
    def __init__(self):
        print("initializing callback op")

    def on_data(self, context: SinkContext, data: Any):
        print("CallbackOp: received {}".format(data))
Processing Data by Pulling Messages¶
from erdos import ReadStream
from erdos.operator import Sink

class PullOp(Sink):
    def __init__(self):
        print("initializing pull op using read")

    def run(self, read_stream: ReadStream):
        while True:
            data = read_stream.read()
            print("PullOp: received {data}".format(data=data))