Operators

An ERDOS operator receives data on ReadStreams and sends processed data on WriteStreams. We provide a standard library of operators for common dataflow patterns under erdos.operators. While the standard operators are general and versatile, applications can implement custom operators to optimize performance or to take fine-grained control over execution.

All operators must inherit from the Operator base class and implement __init__() and connect() methods.

  • __init__() takes all ReadStreams from which the operator receives data, all WriteStreams on which the operator sends data, and any other arguments passed when calling connect(). Within __init__(), initialize the operator's state and register any callbacks on the ReadStreams.

  • The connect() method takes the operator's ReadStreams and returns a list of its WriteStreams; ERDOS later passes all of these streams to __init__(). The ReadStreams and WriteStreams must appear in the same order as in __init__().
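To make the ordering rule concrete, here is a minimal, framework-free sketch. ToyStream, SplitOp, and the toy_connect() helper are hypothetical stand-ins, not part of the ERDOS API. The helper assumes that the framework calls connect() with the ReadStreams and then passes the ReadStreams, the returned WriteStreams, and any extra arguments to __init__() in that order.

```python
from typing import Any, List, Tuple


class ToyStream:
    """Hypothetical stand-in for an ERDOS ReadStream/WriteStream (not the real class)."""

    def __init__(self, name: str) -> None:
        self.name = name


class SplitOp:
    """Hypothetical operator: one input stream, two output streams, one extra argument."""

    def __init__(self, read_stream: ToyStream, even_stream: ToyStream,
                 odd_stream: ToyStream, label: str) -> None:
        # Parameters follow the order described above: ReadStreams, WriteStreams, extras.
        self.read_stream = read_stream
        self.even_stream = even_stream
        self.odd_stream = odd_stream
        self.label = label

    @staticmethod
    def connect(read_stream: ToyStream) -> List[ToyStream]:
        # Return the WriteStreams in the same order that __init__() declares them.
        return [ToyStream("even"), ToyStream("odd")]


def toy_connect(op_type: Any, read_streams: List[ToyStream],
                *args: Any) -> Tuple[Any, List[ToyStream]]:
    """Hypothetical wiring helper mimicking the connect-then-construct sequence."""
    write_streams = op_type.connect(*read_streams)
    op = op_type(*read_streams, *write_streams, *args)
    return op, write_streams


op, outputs = toy_connect(SplitOp, [ToyStream("numbers")], "splitter")
print([s.name for s in outputs])  # ['even', 'odd']
print(op.odd_stream.name)         # odd
```

If connect() returned the streams in a different order than __init__() declares them, the even and odd streams would silently swap. This is why the ordering rule matters.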

While ERDOS manages the execution of callbacks, some operators require more fine-grained control. Such operators can take manual control over their thread of execution by implementing Operator.run() and pulling data from their ReadStreams. Callbacks are not invoked while run() executes.
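The rule that callbacks are not invoked while run() executes can be illustrated with a toy stream. This is a hypothetical simplification for explanation only and does not describe how ERDOS implements streams. ToyReadStream, its deliver() method, and its pull_mode flag are invented names. While pull_mode is set (standing in for "run() is executing"), incoming messages are buffered for read() instead of being handed to callbacks.

```python
import queue
from typing import Any, Callable, List


class ToyReadStream:
    """Hypothetical ReadStream stand-in that supports both callbacks and read()."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[Any], None]] = []
        self._buffer: "queue.Queue[Any]" = queue.Queue()
        # True while a (hypothetical) operator's run() owns the thread of execution.
        self.pull_mode = False

    def add_callback(self, callback: Callable[[Any], None]) -> None:
        self._callbacks.append(callback)

    def deliver(self, msg: Any) -> None:
        """Called by the toy runtime when a message arrives."""
        if self.pull_mode:
            # run() is executing: queue the message for read() and skip callbacks.
            self._buffer.put(msg)
        else:
            for callback in self._callbacks:
                callback(msg)

    def read(self) -> Any:
        """Block until a buffered message is available and return it."""
        return self._buffer.get()


received: List[Any] = []
stream = ToyReadStream()
stream.add_callback(received.append)

stream.deliver("a")      # push mode: the callback runs
stream.pull_mode = True
stream.deliver("b")      # pull mode: buffered, callback skipped
print(received)          # ['a']
print(stream.read())     # b
```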

Operator API

Examples

A full example is available at python/examples/simple_pipeline.py.

Periodically Publishing Data

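import time

import erdos


class SendOp(erdos.Operator):
    """Source operator: publishes an increasing counter roughly once per second."""

    def __init__(self, write_stream):
        self.write_stream = write_stream

    @staticmethod
    def connect():
        # No ReadStreams; create the single WriteStream passed to __init__().
        return [erdos.WriteStream()]

    def run(self):
        count = 0
        while True:
            # Timestamp each message with its sequence number.
            msg = erdos.Message(erdos.Timestamp(coordinates=[count]), count)
            print("SendOp: sending {msg}".format(msg=msg))
            self.write_stream.send(msg)

            count += 1
            time.sleep(1)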
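SendOp sleeps for one second after each send. Each period is therefore one second plus the time spent building and sending the message, and that error accumulates over time. If a steadier rate matters, one option is to schedule against fixed deadlines. The sketch below uses only the standard library. ToyMessage and publish_periodically() are hypothetical names, and the send callable stands in for self.write_stream.send.

```python
import time
from typing import Any, Callable, List, NamedTuple


class ToyMessage(NamedTuple):
    """Hypothetical stand-in for erdos.Message: timestamp coordinates plus data."""
    coordinates: List[int]
    data: Any


def publish_periodically(send: Callable[[ToyMessage], None],
                         period_s: float, count: int) -> None:
    """Send `count` messages, one every `period_s` seconds, without cumulative drift."""
    next_deadline = time.monotonic()
    for i in range(count):
        # Timestamp each message with its sequence number, as SendOp does.
        send(ToyMessage(coordinates=[i], data=i))
        # Sleep until the next fixed deadline rather than for a full period,
        # so time spent in send() is absorbed instead of accumulating.
        next_deadline += period_s
        time.sleep(max(0.0, next_deadline - time.monotonic()))


sent: List[ToyMessage] = []
publish_periodically(sent.append, period_s=0.01, count=3)
print([m.coordinates for m in sent])  # [[0], [1], [2]]
```

If send() itself takes longer than one period, the deadline falls behind and the loop sends back to back until it catches up. A production version might reset the deadline in that case.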

Processing Data via Callbacks

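import erdos


class CallbackOp(erdos.Operator):
    """Sink operator: handles each incoming message in a callback."""

    def __init__(self, read_stream):
        print("CallbackOp: initializing")
        # Register a callback that ERDOS invokes for messages on read_stream.
        read_stream.add_callback(CallbackOp.callback)

    @staticmethod
    def callback(msg):
        print("CallbackOp: received {msg}".format(msg=msg))

    @staticmethod
    def connect(read_streams):
        # A sink operator sends no data, so it returns no WriteStreams.
        return []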
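CallbackOp registers a static method, so its callback sees only the message. When a callback needs per-operator state, one option is to register a bound method instead. This assumes that add_callback() accepts any callable that takes the message; check the ERDOS API reference before relying on it. The toy stream and CountingOp below are hypothetical stand-ins used only to exercise the pattern.

```python
from typing import Any, Callable, List


class ToyReadStream:
    """Hypothetical stand-in for a ReadStream that invokes callbacks on push()."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[Any], None]] = []

    def add_callback(self, callback: Callable[[Any], None]) -> None:
        self._callbacks.append(callback)

    def push(self, msg: Any) -> None:
        for callback in self._callbacks:
            callback(msg)


class CountingOp:
    """Hypothetical operator whose callback updates per-instance state."""

    def __init__(self, read_stream: ToyReadStream) -> None:
        self.count = 0
        self.total = 0
        # A bound method carries `self`, unlike the static method in CallbackOp.
        read_stream.add_callback(self.on_msg)

    def on_msg(self, msg: int) -> None:
        self.count += 1
        self.total += msg


stream = ToyReadStream()
op = CountingOp(stream)
for value in (3, 4, 5):
    stream.push(value)
print(op.count, op.total)  # 3 12
```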

Processing Data by Pulling Messages

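import erdos


class PullOp(erdos.Operator):
    """Sink operator: pulls messages from its ReadStream inside run()."""

    def __init__(self, read_stream):
        self.read_stream = read_stream

    @staticmethod
    def connect(read_streams):
        # A sink operator sends no data, so it returns no WriteStreams.
        return []

    def run(self):
        # Callbacks are not invoked while run() executes; pull each message instead.
        while True:
            data = self.read_stream.read()
            print("PullOp: received {data}".format(data=data))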
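PullOp's while True loop never returns. Outside ERDOS, the same pull pattern can be tested with a standard-library queue.Queue standing in for the ReadStream, plus a sentinel value that tells run() to stop. QueuePullOp and STOP are hypothetical names, and this sketch does not model how ERDOS itself ends a stream.

```python
import queue
import threading
from typing import Any, List

STOP = object()  # Hypothetical sentinel marking the end of the stream.


class QueuePullOp:
    """Hypothetical pull-based operator; queue.Queue stands in for a ReadStream."""

    def __init__(self, read_queue: "queue.Queue[Any]") -> None:
        self.read_queue = read_queue
        self.received: List[Any] = []

    def run(self) -> None:
        while True:
            data = self.read_queue.get()  # Blocks until a message arrives.
            if data is STOP:
                break
            self.received.append(data)


q: "queue.Queue[Any]" = queue.Queue()
op = QueuePullOp(q)
# Run the pull loop on its own thread while this thread produces messages.
worker = threading.Thread(target=op.run)
worker.start()
for value in range(3):
    q.put(value)
q.put(STOP)
worker.join(timeout=5)
print(op.received)  # [0, 1, 2]
```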