Operators

ERDOS operators process received data and use streams to broadcast erdos.Message and erdos.WatermarkMessage objects to downstream operators. We provide a standard library of operators for common dataflow patterns. While the standard operators are general and versatile, some applications may implement custom operators to better optimize performance and take fine-grained control over execution.

Operators are implemented as classes, each of which implements a particular communication pattern. A user-defined operator subclasses the built-in operator class that matches its desired communication pattern. For example, the SendOp from python/examples/simple_pipeline.py subclasses erdos.operator.Source because it does not receive any data and sends messages on a single output stream.

Operators can support both push- and pull-based models of execution by implementing the methods defined for each operator. By implementing callbacks such as erdos.operator.OneInOneOut.on_data(), operators can process messages as they arrive. Operators can also implement callbacks over watermarks (e.g. erdos.operator.OneInOneOut.on_watermark()) to process data in timestamp order. ERDOS ensures lock-free, safe, and concurrent processing via a system-managed ordering of callbacks, implemented as a run queue for the system's multithreaded runtime.
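
Watermark callbacks are what make ordered processing possible: an operator can buffer data in on_data() and act only once on_watermark() signals that no more data up to a timestamp will arrive. The sketch below illustrates that pattern in plain Python. The integer timestamps, the ReorderBuffer class, and its method signatures are hypothetical stand-ins for illustration, not the ERDOS API; in ERDOS the runtime would invoke the callbacks for you.

```python
from collections import defaultdict


class ReorderBuffer:
    """Hypothetical stand-in mimicking an operator's on_data/on_watermark pair."""

    def __init__(self):
        self.pending = defaultdict(list)  # timestamp -> data received so far
        self.released = []  # data emitted in timestamp order

    def on_data(self, timestamp, data):
        # Data may arrive out of timestamp order, so only buffer it here.
        self.pending[timestamp].append(data)

    def on_watermark(self, timestamp):
        # A watermark at `timestamp` promises no more data with t <= timestamp,
        # so everything up to it can be released in timestamp order.
        ready = sorted(ts for ts in self.pending if ts <= timestamp)
        for ts in ready:
            self.released.extend(self.pending.pop(ts))


buf = ReorderBuffer()
buf.on_data(2, "b")
buf.on_data(1, "a")
buf.on_watermark(1)
print(buf.released)  # ['a']
buf.on_data(3, "c")
buf.on_watermark(3)
print(buf.released)  # ['a', 'b', 'c']
```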

While ERDOS manages the execution of callbacks, some operators require more fine-grained control. Operators can take manual control over the thread of execution by implementing the run() method (e.g. erdos.operator.OneInOneOut.run()). Callbacks are not invoked while run() executes.
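
In the pull model the operator owns the loop: run() repeatedly reads from its stream and decides when to stop. The sketch below mimics that control flow with a queue.Queue standing in for a ReadStream and a TOP sentinel standing in for the top watermark. Both stand-ins and the PullCounter class are hypothetical and only illustrate the shape of a run() loop.

```python
import queue

TOP = object()  # hypothetical stand-in for the top watermark that ends a stream


class PullCounter:
    """Hypothetical operator that pulls data itself instead of using callbacks."""

    def __init__(self):
        self.total = 0

    def run(self, read_stream):
        while True:
            item = read_stream.get()  # blocks until an item is available
            if item is TOP:
                break  # no more data will arrive, so leave the loop
            self.total += item


stream = queue.Queue()
for value in [1, 2, 3]:
    stream.put(value)
stream.put(TOP)

op = PullCounter()
op.run(stream)
print(op.total)  # 6
```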

Operator API

class erdos.operator.BaseOperator

A BaseOperator is an internal class that provides the methods common to the individual operators.

property id

Returns the operator’s ID.

property config

Returns the operator’s config.

add_trace_event(event)

Records a profile trace event.

get_runtime(event_name, percentile)

Gets the runtime percentile for a given type of event.

Parameters
  • event_name (str) – The name of the event to get runtime for.

  • percentile (int) – The percentile runtime to get.

Returns

Runtime in microseconds, or None if the operator doesn’t have any runtime stats for the given event name.

Return type

(float)
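
To make "runtime percentile" concrete, the sketch below computes a percentile over a list of runtimes in microseconds. How ERDOS itself computes the percentile is not specified here; the nearest-rank definition used below is an assumption, and runtime_percentile is a hypothetical helper, not part of the ERDOS API.

```python
import math


def runtime_percentile(runtimes_us, percentile):
    """Nearest-rank percentile of runtimes (in microseconds), or None if empty.

    Hypothetical helper; ERDOS's own percentile method may differ.
    """
    if not runtimes_us:
        return None  # mirrors get_runtime() returning None when no stats exist
    ordered = sorted(runtimes_us)
    # Nearest rank: the smallest sample with at least `percentile`% of samples <= it.
    rank = max(1, math.ceil(percentile * len(ordered) / 100))
    return float(ordered[rank - 1])


samples = [120, 80, 95, 300, 110]
print(runtime_percentile(samples, 50))  # 110.0
print(runtime_percentile(samples, 99))  # 300.0
```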

class erdos.operator.Source(*args, **kwargs)

Bases: erdos.operator.BaseOperator

A Source is an abstract base class that needs to be inherited by user-defined source operators that generate data on a single WriteStream in an ERDOS dataflow graph.

A user-defined operator needs to implement Source.run() and Source.destroy() in order to take control of the execution and the teardown of the operator, respectively.

static __new__(cls, *args, **kwargs)

Sets up variables on the Python side before __init__ is called.

More setup is done in the Rust backend at src/python/mod.rs.

run(write_stream)

Runs the operator.

Invoked automatically by ERDOS, and provided with a WriteStream to send data on.

Parameters

write_stream (WriteStream) – A WriteStream instance to send data on.

destroy()

Destroys the operator.

Invoked automatically by ERDOS once run() finishes its execution; the operator can use it to tear down its state gracefully.
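
The Source lifecycle is run() followed by destroy(). The sketch below models that contract in plain Python: ListWriteStream records what is sent in place of a real WriteStream, and drive_source() plays the role of the ERDOS runtime. Both stand-ins and the bounded CountdownSource class are hypothetical; in ERDOS the runtime calls run() and destroy() for you.

```python
class ListWriteStream:
    """Hypothetical stand-in for a WriteStream that records what is sent."""

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


class CountdownSource:
    """Hypothetical bounded source: sends a few values, then returns from run()."""

    def __init__(self, n):
        self.n = n
        self.destroyed = False

    def run(self, write_stream):
        for i in range(self.n):
            write_stream.send(i)

    def destroy(self):
        # Tear down state once run() has returned.
        self.destroyed = True


def drive_source(op):
    """Stand-in for the runtime: run() to completion, then destroy()."""
    stream = ListWriteStream()
    op.run(stream)
    op.destroy()
    return stream.sent


src = CountdownSource(3)
print(drive_source(src), src.destroyed)  # [0, 1, 2] True
```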

class erdos.operator.Sink(*args, **kwargs)

Bases: erdos.operator.BaseOperator

A Sink is an abstract class that needs to be inherited by user-defined sink operators that consume data from a single ReadStream in an ERDOS dataflow graph.

The user-defined operator can either implement the run() method and retrieve data from the provided read_stream or implement the on_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.

static __new__(cls, *args, **kwargs)

Sets up variables on the Python side before __init__ is called.

More setup is done in the Rust backend at src/python/mod.rs.

run(read_stream)

Runs the operator.

Invoked automatically by ERDOS, and provided with a ReadStream to retrieve data from.

Parameters

read_stream (ReadStream) – A ReadStream instance to read data from.

on_data(context, data)

Callback invoked upon receipt of a Message on the operator’s ReadStream.

Parameters
  • context (SinkContext) – A SinkContext instance to retrieve metadata about the current invocation of the callback.

  • data (Any) – The data contained in the message received on the read stream.

on_watermark(context)

Callback invoked upon receipt of a WatermarkMessage on the operator’s ReadStream.

Parameters

context (SinkContext) – A SinkContext instance to retrieve metadata about the current invocation of the callback.

destroy()

Destroys the operator.

Invoked automatically by ERDOS once run() finishes its execution, or when a watermark for the top timestamp is received on the read stream; the operator can use it to tear down its state gracefully.
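
A common callback-based Sink accumulates data per timestamp in on_data() and finalizes it in on_watermark(). The sketch below uses a hypothetical SimpleContext dataclass in place of SinkContext (only its timestamp field is modeled), integer timestamps, and a hypothetical SumPerTimestampSink class; the callbacks are invoked by hand where ERDOS would invoke them automatically.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class SimpleContext:
    """Hypothetical stand-in for SinkContext; only `timestamp` is modeled."""

    timestamp: int


class SumPerTimestampSink:
    """Hypothetical sink that sums values per timestamp."""

    def __init__(self):
        self.partial = defaultdict(int)  # timestamp -> running sum
        self.results = {}  # timestamp -> final sum

    def on_data(self, context, data):
        self.partial[context.timestamp] += data

    def on_watermark(self, context):
        # No more data with t <= context.timestamp will arrive, so those sums are final.
        ready = [ts for ts in self.partial if ts <= context.timestamp]
        for ts in sorted(ready):
            self.results[ts] = self.partial.pop(ts)


sink = SumPerTimestampSink()
sink.on_data(SimpleContext(1), 4)
sink.on_data(SimpleContext(1), 6)
sink.on_watermark(SimpleContext(1))
print(sink.results)  # {1: 10}
```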

class erdos.operator.OneInOneOut(*args, **kwargs)

Bases: erdos.operator.BaseOperator

A OneInOneOut is an abstract base class that needs to be inherited by user-defined operators that consume data from a single ReadStream and produce data on a single WriteStream in an ERDOS dataflow graph.

The user-defined operator can either implement the run() method and retrieve data from the provided read_stream and send data on the write_stream or implement the on_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.

static __new__(cls, *args, **kwargs)

Sets up variables on the Python side before __init__ is called.

More setup is done in the Rust backend at src/python/mod.rs.

run(read_stream, write_stream)

Runs the operator.

Invoked automatically by ERDOS, and provided with a ReadStream to retrieve data from, and a WriteStream to send data on.

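Parameters
  • read_stream (ReadStream) – A ReadStream instance to retrieve data from.

  • write_stream (WriteStream) – A WriteStream instance to send data on.

on_data(context, data)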

Callback invoked upon receipt of a Message on the operator’s ReadStream.

Parameters
  • context (OneInOneOutContext) – A OneInOneOutContext instance to retrieve metadata about the current invocation of the callback.

  • data (Any) – The data contained in the message received on the read stream.

on_watermark(context)

Callback invoked upon receipt of a WatermarkMessage on the operator’s ReadStream.

Parameters

context (OneInOneOutContext) – A OneInOneOutContext instance to retrieve metadata about the current invocation of the callback.

destroy()

Destroys the operator.

Invoked automatically by ERDOS once run() finishes its execution, or when a watermark for the top timestamp is received on the read stream; the operator can use it to tear down its state gracefully.
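
In a callback-based OneInOneOut, results are sent downstream through context.write_stream. The sketch below mimics that with hypothetical stand-ins: a Context dataclass carrying timestamp and write_stream, and a list-backed ListWriteStream whose send() records (timestamp, data) pairs. Real ERDOS code would instead wrap the result in a message, e.g. erdos.Message(context.timestamp, result), as SendOp in the examples does.

```python
from dataclasses import dataclass, field


@dataclass
class ListWriteStream:
    """Hypothetical stand-in for WriteStream; records (timestamp, data) pairs."""

    sent: list = field(default_factory=list)

    def send(self, item):
        self.sent.append(item)


@dataclass
class Context:
    """Hypothetical stand-in for OneInOneOutContext."""

    timestamp: int
    write_stream: ListWriteStream


class SquareOp:
    """Hypothetical map operator: squares each value and forwards it."""

    def on_data(self, context, data):
        # Reuse the input timestamp so downstream watermarks still apply.
        context.write_stream.send((context.timestamp, data * data))


out = ListWriteStream()
op = SquareOp()
for t, value in enumerate([2, 3, 4]):
    op.on_data(Context(t, out), value)
print(out.sent)  # [(0, 4), (1, 9), (2, 16)]
```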

class erdos.operator.TwoInOneOut(*args, **kwargs)

Bases: erdos.operator.BaseOperator

A TwoInOneOut is an abstract base class that needs to be inherited by user-defined operators that consume data from two ReadStream instances and produce data on a single WriteStream in an ERDOS dataflow graph.

The user-defined operator can either implement the run() method, retrieving data from the provided left_read_stream and right_read_stream and sending data on the write_stream, or implement the on_left_data(), on_right_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.

static __new__(cls, *args, **kwargs)

Sets up variables on the Python side before __init__ is called.

More setup is done in the Rust backend at src/python/mod.rs.

run(left_read_stream, right_read_stream, write_stream)

Runs the operator.

Invoked automatically by ERDOS, and provided with two instances of ReadStream to retrieve data from, and a WriteStream to send data on.

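Parameters
  • left_read_stream (ReadStream) – The first ReadStream instance to retrieve data from.

  • right_read_stream (ReadStream) – The second ReadStream instance to retrieve data from.

  • write_stream (WriteStream) – A WriteStream instance to send data on.

on_left_data(context, data)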

Callback invoked upon receipt of a Message on the left_read_stream.

Parameters
  • context (TwoInOneOutContext) – A TwoInOneOutContext instance to retrieve metadata about the current invocation of the callback.

  • data (Any) – The data contained in the message received on the read stream.

on_right_data(context, data)

Callback invoked upon receipt of a Message on the right_read_stream.

Parameters
  • context (TwoInOneOutContext) – A TwoInOneOutContext instance to retrieve metadata about the current invocation of the callback.

  • data (Any) – The data contained in the message received on the read stream.

on_watermark(context)

Callback invoked once a WatermarkMessage has been received on both of the operator's ReadStream instances.

Parameters

context (TwoInOneOutContext) – A TwoInOneOutContext instance to retrieve metadata about the current invocation of the callback.

destroy()

Destroys the operator.

Invoked automatically by ERDOS once run() finishes its execution, or when a watermark for the top timestamp is received on both read streams; the operator can use it to tear down its state gracefully.
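
A typical TwoInOneOut is a timestamp join: each side is buffered in on_left_data() and on_right_data(), and matching timestamps are combined in on_watermark(), once both inputs are complete up to that timestamp. The sketch below uses integer timestamps and an output list in place of a WriteStream; the JoinOp class and its simplified method signatures (a timestamp instead of a TwoInOneOutContext) are hypothetical stand-ins, not the ERDOS API.

```python
from collections import defaultdict


class JoinOp:
    """Hypothetical timestamp join over a left and a right input."""

    def __init__(self):
        self.left = defaultdict(list)
        self.right = defaultdict(list)
        self.output = []  # stand-in for sending on the write stream

    def on_left_data(self, timestamp, data):
        self.left[timestamp].append(data)

    def on_right_data(self, timestamp, data):
        self.right[timestamp].append(data)

    def on_watermark(self, timestamp):
        # Both inputs are complete up to `timestamp`: emit every left/right
        # pair that shares a timestamp, then drop the buffered state.
        for ts in sorted(set(self.left) | set(self.right)):
            if ts > timestamp:
                continue
            lefts = self.left.pop(ts, [])
            rights = self.right.pop(ts, [])
            self.output.extend((ts, l, r) for l in lefts for r in rights)


join = JoinOp()
join.on_left_data(1, "a")
join.on_right_data(1, "x")
join.on_right_data(2, "y")
join.on_watermark(1)
print(join.output)  # [(1, 'a', 'x')]
```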

class erdos.operator.OneInTwoOut(*args, **kwargs)

Bases: erdos.operator.BaseOperator

A OneInTwoOut is an abstract base class that needs to be inherited by user-defined operators that consume data from a single ReadStream instance and produce data on two instances of WriteStream in an ERDOS dataflow graph.

The user-defined operator can either implement the run() method and retrieve data from the provided read_stream and produce data on the left_write_stream and right_write_stream, or implement the on_data() and on_watermark() methods to request a callback upon receipt of messages and watermarks.

static __new__(cls, *args, **kwargs)

Sets up variables on the Python side before __init__ is called.

More setup is done in the Rust backend at src/python/mod.rs.

run(read_stream, left_write_stream, right_write_stream)

Runs the operator.

Invoked automatically by ERDOS, and provided with a ReadStream instance to retrieve data from, and two WriteStream instances to send data on.

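Parameters
  • read_stream (ReadStream) – A ReadStream instance to retrieve data from.

  • left_write_stream (WriteStream) – The first WriteStream instance to send data on.

  • right_write_stream (WriteStream) – The second WriteStream instance to send data on.

on_data(context, data)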

Callback invoked upon receipt of a Message on the read_stream.

Parameters
  • context (OneInTwoOutContext) – A OneInTwoOutContext instance to retrieve metadata about the current invocation of the callback.

  • data (Any) – The data contained in the message received on the read stream.

on_watermark(context)

Callback invoked upon receipt of a WatermarkMessage on the operator’s ReadStream.

Parameters

context (OneInTwoOutContext) – A OneInTwoOutContext instance to retrieve metadata about the current invocation of the callback.

destroy()

Destroys the operator.

Invoked automatically by ERDOS once run() finishes its execution, or when a watermark for the top timestamp is received on the read stream; the operator can use it to tear down its state gracefully.
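
OneInTwoOut operators are a natural fit for splitting or routing a stream. The sketch below routes even values to the left output and odd values to the right output. The two lists stand in for context.left_write_stream and context.right_write_stream, and the RouterOp class with its simplified on_data(timestamp, data) signature is hypothetical.

```python
class RouterOp:
    """Hypothetical splitter: even values go left, odd values go right."""

    def __init__(self):
        self.left_out = []   # stand-in for context.left_write_stream
        self.right_out = []  # stand-in for context.right_write_stream

    def on_data(self, timestamp, data):
        # Choose the output by parity and keep the input timestamp.
        target = self.left_out if data % 2 == 0 else self.right_out
        target.append((timestamp, data))


router = RouterOp()
for t, value in enumerate([4, 7, 10, 3]):
    router.on_data(t, value)
print(router.left_out)   # [(0, 4), (2, 10)]
print(router.right_out)  # [(1, 7), (3, 3)]
```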

Operator Config

class erdos.operator.OperatorConfig(name=None, flow_watermarks=True, log_file_name=None, csv_log_file_name=None, profile_file_name=None)

An OperatorConfig allows developers to configure an operator.

An operator can query the configuration passed to it by the driver by accessing the properties in self.config. The example below shows how a LoggerOperator can access the log file name passed to the operator by the driver:

import erdos


class LoggerOperator(erdos.operator.Sink):
    def __init__(self):
        # self.config is already populated when __init__ runs.
        _log = self.config.log_file_name
        # Set up a logger that writes to the configured log file.
        self.logger = erdos.utils.setup_logging(self.config.name, _log)

property name

Name of the operator.

property flow_watermarks

Whether to automatically pass on the low watermark.

property log_file_name

File name used for logging.

property csv_log_file_name

File name used for logging to CSV.

property profile_file_name

File name used for profiling an operator's performance.

Context API

class erdos.context.SinkContext(timestamp, config)

A SinkContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a Sink operator.

timestamp

The timestamp of the current invocation of the callback.

Type

Timestamp

config

The operator config generated by the driver upon connection of the operator to the graph.

Type

OperatorConfig

class erdos.context.OneInOneOutContext(timestamp, config, write_stream)

A OneInOneOutContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a OneInOneOut operator.

timestamp

The timestamp of the current invocation of the callback.

Type

Timestamp

config

The operator config generated by the driver upon connection of the operator to the graph.

Type

OperatorConfig

write_stream

The write stream to send results to downstream operators.

Type

WriteStream

class erdos.context.TwoInOneOutContext(timestamp, config, write_stream)

A TwoInOneOutContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a TwoInOneOut operator.

timestamp

The timestamp of the current invocation of the callback.

Type

Timestamp

config

The operator config generated by the driver upon connection of the operator to the graph.

Type

OperatorConfig

write_stream

The write stream to send results to downstream operators.

Type

WriteStream

class erdos.context.OneInTwoOutContext(timestamp, config, left_write_stream, right_write_stream)

A OneInTwoOutContext instance enables developers to retrieve metadata about the current invocation of either a message or a watermark callback in a OneInTwoOut operator.

timestamp

The timestamp of the current invocation of the callback.

Type

Timestamp

config

The operator config generated by the driver upon connection of the operator to the graph.

Type

OperatorConfig

left_write_stream

The first write stream to send results to downstream operators.

Type

WriteStream

right_write_stream

The second write stream to send results to downstream operators.

Type

WriteStream

Examples

Full example at python/examples/simple_pipeline.py.

Periodically Publishing Data

import time

import erdos
from erdos.operator import Source
from erdos.streams import WriteStream


class SendOp(Source):
    def __init__(self):
        print("initializing source op")

    def run(self, write_stream: WriteStream):
        count = 0
        while True:
            msg = erdos.Message(erdos.Timestamp(coordinates=[count]), count)
            print("SendOp: sending {msg}".format(msg=msg))
            write_stream.send(msg)

            count += 1
            time.sleep(1)

Processing Data via Callbacks

from typing import Any

from erdos.context import SinkContext
from erdos.operator import Sink


class CallbackOp(Sink):
    def __init__(self):
        print("initializing callback op")

    def on_data(self, context: SinkContext, data: Any):
        print("CallbackOp: received {}".format(data))

Processing Data by Pulling Messages

from erdos.operator import Sink
from erdos.streams import ReadStream


class PullOp(Sink):
    def __init__(self):
        print("initializing pull op using read")

    def run(self, read_stream: ReadStream):
        while True:
            data = read_stream.read()
            print("PullOp: received {data}".format(data=data))