Operators
ERDOS operators process the data they receive and use streams to broadcast erdos.Message and erdos.WatermarkMessage objects to downstream operators.
We provide a standard library of operators for common dataflow patterns.
While the standard operators are general and versatile, some applications may
implement custom operators to optimize performance and take
fine-grained control over execution.
Operators are implemented as classes, each of which follows a particular communication pattern: an operator subclasses the built-in base class that matches the pattern it needs. For example,
the SendOp from python/examples/simple_pipeline.py subclasses erdos.operator.Source
because it does not receive any data and sends messages on a single output stream.
- The erdos.operator.Source operator is used to write data on a single erdos.WriteStream.
- The erdos.operator.Sink operator is used to read data from a single erdos.ReadStream.
- The erdos.operator.OneInOneOut operator is used to read data from a single erdos.ReadStream and write data on a single erdos.WriteStream.
- The erdos.operator.TwoInOneOut operator is used to read data from two erdos.ReadStream instances and write data on a single erdos.WriteStream.
- The erdos.operator.OneInTwoOut operator is used to read data from a single erdos.ReadStream and write data on two erdos.WriteStream instances.
Operators can support both push-based and pull-based models of execution by
implementing the methods defined for each operator. By implementing callbacks
such as erdos.operator.OneInOneOut.on_data(), operators can process
messages as they arrive. Moreover, operators can implement callbacks over watermarks
(e.g. erdos.operator.OneInOneOut.on_watermark()) to ensure ordered
processing over timestamps. ERDOS ensures lock-free, safe, and concurrent processing
via a system-managed ordering of callbacks, which is implemented as a
run queue for the system’s multithreaded runtime.
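The callback model described above can be sketched without the ERDOS runtime. The following is a minimal illustration only: StubWriteStream, StubContext, and the drive() helper are hypothetical stand-ins that are not part of ERDOS, and a real operator would subclass erdos.operator.OneInOneOut and let ERDOS invoke the callbacks. The operator buffers values in on_data() and emits one sum per timestamp once the watermark for that timestamp arrives.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class StubWriteStream:
    """Hypothetical stand-in for a write stream; it only records what is sent."""
    sent: List[Tuple[int, Any]] = field(default_factory=list)

    def send(self, item: Tuple[int, Any]) -> None:
        self.sent.append(item)


@dataclass
class StubContext:
    """Hypothetical stand-in for a callback context."""
    timestamp: int
    write_stream: StubWriteStream


class SumPerTimestampOp:
    """Buffers data in on_data and emits one sum per timestamp in on_watermark."""

    def __init__(self) -> None:
        self._buffer: Dict[int, List[int]] = defaultdict(list)

    def on_data(self, context: StubContext, data: int) -> None:
        # Messages are only stored here; nothing is emitted yet.
        self._buffer[context.timestamp].append(data)

    def on_watermark(self, context: StubContext) -> None:
        # The watermark signals that the timestamp is complete, so it is safe to emit.
        values = self._buffer.pop(context.timestamp, [])
        context.write_stream.send((context.timestamp, sum(values)))


def drive(op: SumPerTimestampOp, events, write_stream: StubWriteStream) -> None:
    """Hypothetical sequential driver standing in for the ERDOS runtime."""
    for kind, timestamp, payload in events:
        context = StubContext(timestamp, write_stream)
        if kind == "data":
            op.on_data(context, payload)
        else:
            op.on_watermark(context)


out = StubWriteStream()
events = [
    ("data", 0, 1),
    ("data", 0, 2),
    ("watermark", 0, None),
    ("data", 1, 10),
    ("watermark", 1, None),
]
drive(SumPerTimestampOp(), events, out)
print(out.sent)
```

Deferring output to on_watermark() keeps the result independent of the order in which messages with the same timestamp arrive.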
While ERDOS manages the execution of callbacks, some operators require
more fine-grained control. Operators can take manual control over the
thread of execution by implementing the run() method
(e.g. erdos.operator.OneInOneOut.run()).
Callbacks are not invoked while run() executes.
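The pull-based style can be sketched in the same spirit. In the example below, StubReadStream, StubWriteStream, and the StreamClosed exception are hypothetical stand-ins introduced for illustration; they do not describe how ERDOS streams signal termination. The point is only that run() owns the loop: it decides when to read and when to send.

```python
from collections import deque
from typing import Any, Deque, Iterable, List


class StreamClosed(Exception):
    """Hypothetical signal that the stand-in read stream has no more data."""


class StubReadStream:
    """Hypothetical stand-in for a read stream backed by an in-memory queue."""

    def __init__(self, items: Iterable[Any]) -> None:
        self._items: Deque[Any] = deque(items)

    def read(self) -> Any:
        if not self._items:
            raise StreamClosed()
        return self._items.popleft()


class StubWriteStream:
    """Hypothetical stand-in for a write stream; it only records what is sent."""

    def __init__(self) -> None:
        self.sent: List[Any] = []

    def send(self, item: Any) -> None:
        self.sent.append(item)


class DoubleOp:
    """Reads each value, doubles it, and sends it on, all inside run()."""

    def run(self, read_stream: StubReadStream, write_stream: StubWriteStream) -> None:
        while True:
            try:
                value = read_stream.read()
            except StreamClosed:
                break  # Stop once the stand-in stream is exhausted.
            write_stream.send(value * 2)


doubled = StubWriteStream()
DoubleOp().run(StubReadStream([1, 2, 3]), doubled)
print(doubled.sent)
```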
Operator API
- class erdos.operator.BaseOperator
A BaseOperator is an internal class that provides the methods common to the individual operators.
- property id: uuid.UUID
Returns the operator’s ID.
- Return type
UUID
- property config: erdos.config.OperatorConfig
Returns the operator’s config.
- Return type
OperatorConfig
- add_trace_event(event)
Records a profile trace event.
- Return type
None
- get_runtime(event_name, percentile)
Gets the runtime percentile for a given type of event.
- Parameters
event_name (str) – The name of the event to get the runtime for.
percentile (int) – The runtime percentile to get.
- Returns
Runtime in microseconds, or None if the operator doesn’t have any runtime stats for the given event name.
- Return type
(float)
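To illustrate what a runtime percentile means, the sketch below computes a nearest-rank percentile over a list of recorded runtimes. The function name, the sample values, and the nearest-rank method are assumptions made for this example; the documentation above does not state how ERDOS computes the value.

```python
from typing import Optional, Sequence


def nearest_rank_percentile(samples_us: Sequence[float], percentile: int) -> Optional[float]:
    """Return the nearest-rank percentile of samples_us, or None if there are no samples.

    Hypothetical helper for illustration; not part of the ERDOS API.
    """
    if not samples_us:
        return None
    if not 0 < percentile <= 100:
        raise ValueError("percentile must be in the range 1..100")
    ordered = sorted(samples_us)
    # Ceiling of percentile / 100 * n, computed with integer arithmetic.
    rank = (percentile * len(ordered) + 99) // 100
    return ordered[rank - 1]


# Made-up runtimes in microseconds for a single event name.
runtimes_us = [120.0, 80.0, 200.0, 95.0, 150.0]
p50 = nearest_rank_percentile(runtimes_us, 50)
p90 = nearest_rank_percentile(runtimes_us, 90)
no_stats = nearest_rank_percentile([], 50)
print(p50, p90, no_stats)
```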
- class erdos.operator.Source(*args: List[Any], **kwargs: Dict[str, Any])
Bases: erdos.operator.BaseOperator, Generic[erdos.operator.T]
A Source is an abstract base class that needs to be inherited by user-defined source operators that generate data on a single WriteStream in an ERDOS dataflow graph.
A user-defined operator needs to implement Source.run() and Source.destroy() in order to take control of the execution and the teardown of the operator respectively.
- static __new__(cls, *args, **kwargs)
Set up variables before the call to __init__ on the Python end.
More setup is done in the Rust backend at src/python/mod.rs.
- Return type
Source[TypeVar(T)]
- run(write_stream)
Runs the operator.
Invoked automatically by ERDOS, and provided with a WriteStream to send data on.
- Parameters
write_stream (WriteStream[TypeVar(T)]) – A WriteStream instance to send data on.
- Return type
None
- class erdos.operator.Sink(*args: List[Any], **kwargs: Dict[str, Any])
Bases: erdos.operator.BaseOperator, Generic[erdos.operator.T]
A Sink is an abstract class that needs to be inherited by user-defined sink operators that consume data from a single ReadStream in an ERDOS dataflow graph.
The user-defined operator can either implement the run() method and retrieve data from the provided read_stream, or implement the on_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.
- static __new__(cls, *args, **kwargs)
Set up variables before the call to __init__ on the Python end.
More setup is done in the Rust backend at src/python/mod.rs.
- Return type
Sink[TypeVar(T)]
- run(read_stream)
Runs the operator.
Invoked automatically by ERDOS, and provided with a ReadStream to retrieve data from.
- Parameters
read_stream (ReadStream[TypeVar(T)]) – A ReadStream instance to read data from.
- Return type
None
- on_data(context, data)
Callback invoked upon receipt of a Message on the operator’s ReadStream.
- Parameters
context (SinkContext) – A SinkContext instance to retrieve metadata about the current invocation of the callback.
data (TypeVar(T)) – The data contained in the message received on the read stream.
- Return type
None
- on_watermark(context)
Callback invoked upon receipt of a WatermarkMessage on the operator’s ReadStream.
- Parameters
context (SinkContext) – A SinkContext instance to retrieve metadata about the current invocation of the callback.
- Return type
None
- class erdos.operator.OneInOneOut(*args: List[Any], **kwargs: Dict[str, Any])
Bases: erdos.operator.BaseOperator, Generic[erdos.operator.T, erdos.operator.U]
A OneInOneOut is an abstract base class that needs to be inherited by user-defined operators that consume data from a single ReadStream and produce data on a single WriteStream in an ERDOS dataflow graph.
The user-defined operator can either implement the run() method, retrieving data from the provided read_stream and sending data on the write_stream, or implement the on_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.
- static __new__(cls, *args, **kwargs)
Set up variables before the call to __init__ on the Python end.
More setup is done in the Rust backend at src/python/mod.rs.
- Return type
OneInOneOut[TypeVar(T), TypeVar(U)]
- run(read_stream, write_stream)
Runs the operator.
Invoked automatically by ERDOS, and provided with a ReadStream to retrieve data from, and a WriteStream to send data on.
- Parameters
read_stream (ReadStream[TypeVar(T)]) – A ReadStream instance to read data from.
write_stream (WriteStream[TypeVar(U)]) – A WriteStream instance to send data on.
- Return type
None
- on_data(context, data)
Callback invoked upon receipt of a Message on the operator’s ReadStream.
- Parameters
context (OneInOneOutContext[TypeVar(U)]) – A OneInOneOutContext instance to retrieve metadata about the current invocation of the callback.
data (TypeVar(T)) – The data contained in the message received on the read stream.
- Return type
None
- on_watermark(context)
Callback invoked upon receipt of a WatermarkMessage on the operator’s ReadStream.
- Parameters
context (OneInOneOutContext[TypeVar(U)]) – A OneInOneOutContext instance to retrieve metadata about the current invocation of the callback.
- Return type
None
- class erdos.operator.TwoInOneOut(*args: List[Any], **kwargs: Dict[str, Any])
Bases: erdos.operator.BaseOperator, Generic[erdos.operator.T, erdos.operator.U, erdos.operator.V]
A TwoInOneOut is an abstract base class that needs to be inherited by user-defined operators that consume data from two ReadStream instances and produce data on a single WriteStream in an ERDOS dataflow graph.
The user-defined operator can either implement the run() method and retrieve data from the provided left_read_stream and right_read_stream, or implement the on_left_data(), on_right_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.
- static __new__(cls, *args, **kwargs)
Set up variables before the call to __init__ on the Python end.
More setup is done in the Rust backend at src/python/mod.rs.
- Return type
TwoInOneOut[TypeVar(T), TypeVar(U), TypeVar(V)]
- run(left_read_stream, right_read_stream, write_stream)
Runs the operator.
Invoked automatically by ERDOS, and provided with two instances of ReadStream to retrieve data from, and a WriteStream to send data on.
- Parameters
left_read_stream (ReadStream[TypeVar(T)]) – The first ReadStream instance to read data from.
right_read_stream (ReadStream[TypeVar(U)]) – The second ReadStream instance to read data from.
write_stream (WriteStream[TypeVar(V)]) – A WriteStream instance to send data on.
- Return type
None
- on_left_data(context, data)
Callback invoked upon receipt of a Message on the left_read_stream.
- Parameters
context (TwoInOneOutContext[TypeVar(V)]) – A TwoInOneOutContext instance to retrieve metadata about the current invocation of the callback.
data (TypeVar(T)) – The data contained in the message received on the read stream.
- Return type
None
- on_right_data(context, data)
Callback invoked upon receipt of a Message on the right_read_stream.
- Parameters
context (TwoInOneOutContext[TypeVar(V)]) – A TwoInOneOutContext instance to retrieve metadata about the current invocation of the callback.
data (TypeVar(U)) – The data contained in the message received on the read stream.
- Return type
None
- on_watermark(context)
Callback invoked upon receipt of a WatermarkMessage across the two instances of the operator’s ReadStream.
- Parameters
context (TwoInOneOutContext[TypeVar(V)]) – A TwoInOneOutContext instance to retrieve metadata about the current invocation of the callback.
- Return type
None
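The three TwoInOneOut callbacks can be illustrated with a small pairing operator. This is a sketch under stated assumptions: StubWriteStream and StubTwoInOneOutContext are hypothetical stand-ins rather than ERDOS classes, the callbacks are called by hand instead of by the runtime, and a real operator would subclass erdos.operator.TwoInOneOut. Each side is stored by timestamp, and a pair is emitted on the watermark only when both sides delivered data.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class StubWriteStream:
    """Hypothetical stand-in for a write stream; it only records what is sent."""
    sent: List[Tuple[int, Any, Any]] = field(default_factory=list)

    def send(self, item: Tuple[int, Any, Any]) -> None:
        self.sent.append(item)


@dataclass
class StubTwoInOneOutContext:
    """Hypothetical stand-in for the context passed to the callbacks."""
    timestamp: int
    write_stream: StubWriteStream


class PairOp:
    """Pairs the left and right values that share a timestamp."""

    def __init__(self) -> None:
        self._left: Dict[int, Any] = {}
        self._right: Dict[int, Any] = {}

    def on_left_data(self, context: StubTwoInOneOutContext, data: Any) -> None:
        self._left[context.timestamp] = data

    def on_right_data(self, context: StubTwoInOneOutContext, data: Any) -> None:
        self._right[context.timestamp] = data

    def on_watermark(self, context: StubTwoInOneOutContext) -> None:
        ts = context.timestamp
        # Emit only if both inputs produced a value for this timestamp.
        if ts in self._left and ts in self._right:
            context.write_stream.send((ts, self._left[ts], self._right[ts]))
        # Drop any state for the completed timestamp.
        self._left.pop(ts, None)
        self._right.pop(ts, None)


paired = StubWriteStream()
pair_op = PairOp()
pair_op.on_right_data(StubTwoInOneOutContext(0, paired), "b0")
pair_op.on_left_data(StubTwoInOneOutContext(0, paired), "a0")
pair_op.on_watermark(StubTwoInOneOutContext(0, paired))
pair_op.on_left_data(StubTwoInOneOutContext(1, paired), "a1")  # no right value for 1
pair_op.on_watermark(StubTwoInOneOutContext(1, paired))
print(paired.sent)
```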
- class erdos.operator.OneInTwoOut(*args: List[Any], **kwargs: Dict[str, Any])
Bases: erdos.operator.BaseOperator, Generic[erdos.operator.T, erdos.operator.U, erdos.operator.V]
A OneInTwoOut is an abstract base class that needs to be inherited by user-defined operators that consume data from a single ReadStream instance and produce data on two instances of WriteStream in an ERDOS dataflow graph.
The user-defined operator can either implement the run() method, retrieving data from the provided read_stream and producing data on the left_write_stream and right_write_stream, or implement the on_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.
- static __new__(cls, *args, **kwargs)
Set up variables before the call to __init__ on the Python end.
More setup is done in the Rust backend at src/python/mod.rs.
- Return type
OneInTwoOut[TypeVar(T), TypeVar(U), TypeVar(V)]
- run(read_stream, left_write_stream, right_write_stream)
Runs the operator.
Invoked automatically by ERDOS, and provided with a ReadStream instance to retrieve data from, and two WriteStream instances to send data on.
- Parameters
read_stream (ReadStream[TypeVar(T)]) – The ReadStream instance to retrieve data from.
left_write_stream (WriteStream[TypeVar(U)]) – The first WriteStream instance to send data on.
right_write_stream (WriteStream[TypeVar(V)]) – The second WriteStream instance to send data on.
- Return type
None
- on_data(context, data)
Callback invoked upon receipt of a Message on the read_stream.
- Parameters
context (OneInTwoOutContext[TypeVar(U), TypeVar(V)]) – A OneInTwoOutContext instance to retrieve metadata about the current invocation of the callback.
data (TypeVar(T)) – The data contained in the message received on the read stream.
- Return type
None
- on_watermark(context)
Callback invoked upon receipt of a WatermarkMessage on the operator’s ReadStream.
- Parameters
context (OneInTwoOutContext[TypeVar(U), TypeVar(V)]) – A OneInTwoOutContext instance to retrieve metadata about the current invocation of the callback.
- Return type
None
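A OneInTwoOut operator typically routes each input to one of its two outputs. The sketch below shows that idea with an even/odd splitter. StubWriteStream and StubOneInTwoOutContext are hypothetical stand-ins (only the attribute names left_write_stream and right_write_stream follow the documentation), and the callback is invoked by hand rather than by the ERDOS runtime.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class StubWriteStream:
    """Hypothetical stand-in for a write stream; it only records what is sent."""
    sent: List[Tuple[int, int]] = field(default_factory=list)

    def send(self, item: Tuple[int, int]) -> None:
        self.sent.append(item)


@dataclass
class StubOneInTwoOutContext:
    """Hypothetical stand-in for the context passed to the callbacks."""
    timestamp: int
    left_write_stream: StubWriteStream
    right_write_stream: StubWriteStream


class EvenOddSplitOp:
    """Sends even values on the left stream and odd values on the right stream."""

    def on_data(self, context: StubOneInTwoOutContext, data: int) -> None:
        if data % 2 == 0:
            target = context.left_write_stream
        else:
            target = context.right_write_stream
        target.send((context.timestamp, data))


evens, odds = StubWriteStream(), StubWriteStream()
split_op = EvenOddSplitOp()
for ts, value in enumerate([4, 7, 10, 3]):
    split_op.on_data(StubOneInTwoOutContext(ts, evens, odds), value)
print(evens.sent, odds.sent)
```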
Operator Config
- class erdos.config.OperatorConfig(name=None, flow_watermarks=True, log_file_name=None, csv_log_file_name=None, profile_file_name=None)
An OperatorConfig allows developers to configure an operator.
An operator can query the configuration passed to it by the driver by accessing the properties in self.config. The example below shows how a LoggerOperator can access the log file name passed to the operator by the driver:
class LoggerOperator(erdos.Operator):
    def __init__(self, input_stream):
        # Set up a logger.
        _log = self.config.log_file_name
        self.logger = erdos.utils.setup_logging(self.config.name, _log)
- property name: Optional[str]
Name of the operator.
- Return type
Optional[str]
- property flow_watermarks: bool
Whether to automatically pass on the low watermark.
- Return type
bool
- property log_file_name: Optional[str]
File name used for logging.
- Return type
Optional[str]
- property csv_log_file_name: Optional[str]
File name used for logging to CSV.
- Return type
Optional[str]
- property profile_file_name: Optional[str]
File name used for profiling an operator’s performance.
- Return type
Optional[str]
Context API
- class erdos.context.SinkContext(timestamp, config)
A SinkContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a Sink operator.
- config
The operator config generated by the driver upon connection of the operator to the graph.
- Type
erdos.config.OperatorConfig
- class erdos.context.OneInOneOutContext(timestamp, config, write_stream)
A OneInOneOutContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a OneInOneOut operator.
- config
The operator config generated by the driver upon connection of the operator to the graph.
- Type
erdos.config.OperatorConfig
- write_stream
The write stream to send results to downstream operators.
- Type
erdos.WriteStream
- class erdos.context.TwoInOneOutContext(timestamp, config, write_stream)
A TwoInOneOutContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a TwoInOneOut operator.
- config
The operator config generated by the driver upon connection of the operator to the graph.
- Type
erdos.config.OperatorConfig
- write_stream
The write stream to send results to downstream operators.
- Type
erdos.WriteStream
- class erdos.context.OneInTwoOutContext(timestamp, config, left_write_stream, right_write_stream)
A OneInTwoOutContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a OneInTwoOut operator.
- config
The operator config generated by the driver upon connection of the operator to the graph.
- Type
erdos.config.OperatorConfig
- left_write_stream
The first write stream to send results to downstream operators.
- Type
erdos.WriteStream
- right_write_stream
The second write stream to send results to downstream operators.
- Type
erdos.WriteStream
Examples
Full example at python/examples/simple_pipeline.py.
Periodically Publishing Data
import time

import erdos
from erdos import WriteStream
from erdos.operator import Source


class SendOp(Source):
    def __init__(self):
        print("initializing source op")

    def run(self, write_stream: WriteStream):
        count = 0
        while True:
            # Send one message per second, timestamped with the running count.
            msg = erdos.Message(erdos.Timestamp(coordinates=[count]), count)
            print("SendOp: sending {msg}".format(msg=msg))
            write_stream.send(msg)
            count += 1
            time.sleep(1)
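Processing Data via Callbacks
from typing import Any

from erdos.context import SinkContext
from erdos.operator import Sink


class CallbackOp(Sink):
    def __init__(self):
        print("initializing callback op")

    def on_data(self, context: SinkContext, data: Any):
        # Invoked by ERDOS for each message received on the read stream.
        print("CallbackOp: received {}".format(data))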
Processing Data by Pulling Messages
from erdos import ReadStream
from erdos.operator import Sink


class PullOp(Sink):
    def __init__(self):
        print("initializing pull op using read")

    def run(self, read_stream: ReadStream):
        while True:
            # Pull the next message from the read stream.
            data = read_stream.read()
            print("PullOp: received {data}".format(data=data))