Operators
An ERDOS operator receives data on ReadStreams and sends processed data on WriteStreams. We provide a standard library of operators for common dataflow patterns under erdos.operators.
While the standard operators are general and versatile, some applications may implement custom operators to optimize performance and take fine-grained control over execution.
All operators must inherit from the Operator base class and implement the __init__() and connect() methods.
The __init__() method takes all ReadStreams from which the operator receives data, all WriteStreams on which the operator sends data, and any other arguments passed when calling connect(). Within __init__(), the operator should initialize its state and may register callbacks on its ReadStreams.

The connect() method takes the operator's ReadStreams and returns its WriteStreams; ERDOS later passes all of them to __init__(). The ReadStreams and WriteStreams must appear in the same order as in __init__().
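The ordering contract between connect() and __init__() can be sketched without ERDOS installed. In this minimal sketch, StandInReadStream, StandInWriteStream, MergeOp, and the instantiate() helper are all hypothetical: they are not ERDOS classes and only mimic the behavior described above (ReadStreams first, then the WriteStreams returned by connect(), then any extra arguments).

```python
# Hypothetical stand-ins: these are NOT ERDOS classes. They only model the
# ordering contract between connect() and __init__() described above.
class StandInReadStream:
    def __init__(self, name):
        self.name = name


class StandInWriteStream:
    def __init__(self, name):
        self.name = name


class MergeOp:
    def __init__(self, left_stream, right_stream, out_stream, label):
        # ReadStreams first, then WriteStreams, then any extra arguments.
        self.left_stream = left_stream
        self.right_stream = right_stream
        self.out_stream = out_stream
        self.label = label

    @staticmethod
    def connect(left_stream, right_stream):
        # Takes the ReadStreams in __init__() order; declares one WriteStream.
        return [StandInWriteStream("merged")]


def instantiate(op_cls, read_streams, *args):
    """Hypothetical helper mimicking the framework's role: call connect()
    with the ReadStreams, then pass the ReadStreams, the returned
    WriteStreams, and any extra arguments to __init__() in that order."""
    write_streams = op_cls.connect(*read_streams)
    op = op_cls(*read_streams, *write_streams, *args)
    return op, write_streams


left, right = StandInReadStream("left"), StandInReadStream("right")
op, outputs = instantiate(MergeOp, [left, right], "demo")
print(op.out_stream.name, op.label)  # merged demo
```

If connect() returned its WriteStreams in a different order than __init__() declares them, instantiate() would silently bind the wrong stream to each parameter, which is why the order has to match.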
While ERDOS manages the execution of callbacks, some operators require more fine-grained control. Operators can take manual control over the thread of execution by implementing Operator.run() and pulling data from ReadStreams. Callbacks are not invoked while run() executes.
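A rough, framework-free sketch of the pull model, assuming a blocking read() similar to the one PullOp uses below. StandInReadStream, PullSketch, and the None end-of-stream sentinel are hypothetical and exist only for this illustration; the real ReadStream is not modeled here. The point is that run() owns its thread and decides when to fetch the next message.

```python
import queue
import threading


class StandInReadStream:
    """Hypothetical stand-in for a ReadStream whose read() blocks."""

    def __init__(self):
        self._queue = queue.Queue()

    def push(self, msg):
        # Producer side: make a message available to the reader.
        self._queue.put(msg)

    def read(self):
        # Wait until a message is available, then return it.
        return self._queue.get()


class PullSketch:
    def __init__(self, read_stream):
        self.read_stream = read_stream
        self.received = []

    def run(self):
        # The operator owns this thread and decides when to pull the next message.
        while True:
            msg = self.read_stream.read()
            if msg is None:  # hypothetical end-of-stream sentinel (sketch only)
                break
            self.received.append(msg)


stream = StandInReadStream()
op = PullSketch(stream)
worker = threading.Thread(target=op.run)
worker.start()
for i in range(3):
    stream.push(i)
stream.push(None)
worker.join()
print(op.received)  # [0, 1, 2]
```

The sentinel only lets the sketch terminate; the PullOp example below loops forever instead.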
Operator API
Examples
Full example at python/examples/simple_pipeline.py.
Periodically Publishing Data
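```python
import time

import erdos


class SendOp(erdos.Operator):
    def __init__(self, write_stream):
        # ERDOS passes in the WriteStream returned by connect().
        self.write_stream = write_stream

    @staticmethod
    def connect():
        # No ReadStreams; declare a single output WriteStream.
        return [erdos.WriteStream()]

    def run(self):
        # Take control of the thread and publish one message per second.
        count = 0
        while True:
            msg = erdos.Message(erdos.Timestamp(coordinates=[count]), count)
            print("SendOp: sending {msg}".format(msg=msg))
            self.write_stream.send(msg)
            count += 1
            time.sleep(1)
```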
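SendOp loops forever, which makes its publishing logic hard to exercise in isolation. A minimal sketch, assuming you want that loop testable: the hypothetical publish_periodically() function takes a bounded message count and an injectable sleep function, and ListWriteStream is a hypothetical stand-in that records what was sent. Timestamps are modeled as plain tuples rather than erdos.Timestamp.

```python
import time


class ListWriteStream:
    """Hypothetical stand-in for a WriteStream that records sent messages."""

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


def publish_periodically(write_stream, num_messages, period_s, sleep=time.sleep):
    """Send num_messages (timestamp, data) pairs, pausing period_s after each send.

    Timestamps are modeled as (count,) tuples; SendOp uses
    erdos.Timestamp(coordinates=[count]) instead.
    """
    for count in range(num_messages):
        write_stream.send(((count,), count))
        sleep(period_s)


stream = ListWriteStream()
naps = []
# Inject a fake sleep that records the requested delays instead of waiting.
publish_periodically(stream, 3, 1.0, sleep=naps.append)
print(stream.sent)  # [((0,), 0), ((1,), 1), ((2,), 2)]
print(naps)  # [1.0, 1.0, 1.0]
```

Passing sleep as a parameter keeps the default behavior identical to time.sleep while letting a test skip the real delay.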
Processing Data via Callbacks
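```python
import erdos


class CallbackOp(erdos.Operator):
    def __init__(self, read_stream):
        print("initializing op")
        # Ask ERDOS to invoke callback() for every message on read_stream.
        read_stream.add_callback(CallbackOp.callback)

    @staticmethod
    def callback(msg):
        print("CallbackOp: received {msg}".format(msg=msg))

    @staticmethod
    def connect(read_streams):
        # This operator sends no data, so it returns no WriteStreams.
        return []
```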
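Because CallbackOp.callback is a static method, it has no access to operator state. The sketch below models the push model with a hypothetical CallbackStream stand-in (not the ERDOS ReadStream) and registers a bound method so the callback can update per-operator state. Whether your ERDOS version accepts bound methods in add_callback() the same way is an assumption worth checking against the Operator API.

```python
class CallbackStream:
    """Hypothetical stand-in for a ReadStream that pushes messages to callbacks."""

    def __init__(self):
        self._callbacks = []

    def add_callback(self, callback):
        # Register a callable to run on every incoming message.
        self._callbacks.append(callback)

    def deliver(self, msg):
        # Plays the framework's role: invoke callbacks in registration order.
        for callback in self._callbacks:
            callback(msg)


class StatefulCallbackOp:
    def __init__(self, read_stream):
        self.received = []
        # A bound method can reach self, unlike the static CallbackOp.callback.
        read_stream.add_callback(self.on_msg)

    def on_msg(self, msg):
        self.received.append(msg)


stream = CallbackStream()
op = StatefulCallbackOp(stream)
for msg in ["a", "b", "c"]:
    stream.deliver(msg)
print(op.received)  # ['a', 'b', 'c']
```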
Processing Data by Pulling Messages
```python
import erdos


class PullOp(erdos.Operator):
    def __init__(self, read_stream):
        self.read_stream = read_stream

    @staticmethod
    def connect(read_streams):
        return []

    def run(self):
        # Pull messages one at a time; callbacks are not invoked while run() executes.
        while True:
            data = self.read_stream.read()
            print("PullOp: received {data}".format(data=data))
```