Operators

Operators process data in ERDOS applications.

Operators receive messages from streams passed to the connect static method. Operators also create streams on which they send messages. These streams must created and returned by the connect method. ERDOS expects developers to specify which streams operators read from and write to. For more details, see the data streams documentation.

All operators must implement erdos.Operator abstract class.

Operators set up state in the __init__ method. Operators should also add callbacks to streams in __init__.

Implement execution logic by overriding the run method. This method may contain a control loop or call methods that run regularly. Callbacks are not invoked while run executes.

API

Examples

Periodically Publishing Data

class SendOp(erdos.Operator):
    def __init__(self, write_stream):
        self.write_stream = write_stream

    @staticmethod
    def connect():
        return [erdos.WriteStream()]

    def run(self):
        count = 0
        while True:
            msg = erdos.Message(erdos.Timestamp(coordinates=[count]), count)
            print("SendOp: sending {msg}".format(msg=msg))
            self.write_stream.send(msg)

            count += 1
            time.sleep(1)

Processing Data via Callbacks

class CallbackOp(erdos.Operator):
    def __init__(self, read_stream):
        print("initializing  op")
        read_stream.add_callback(CallbackOp.callback)

    @staticmethod
    def callback(msg):
        print("CallbackOp: received {msg}".format(msg=msg))

    @staticmethod
    def connect(read_streams):
        return []

Processing Data by Pulling Messages

class PullOp:
    def __init__(self, read_stream):
        self.read_stream = read_stream

    @staticmethod
    def connect(read_streams):
        return []

    def run(self):
        while True:
            data = self.read_stream.read()
            print("PullOp: received {data}".format(data=data))