In the last blog post, I gave an overview of our new Connectors APIs, discussing how they work under the hood and going over some of the examples we provide. In this post, we are going to take a deeper dive into how to build a connector to pull data from PostgreSQL. We’ll also talk a bit about some of the different approaches for building both external source and sink connectors.
What is Wallaroo
Wallaroo is a framework for building and operating scalable Python applications. Write your domain logic once and Wallaroo handles the rest.
Our goal is to make sure that, regardless of where your data is coming from, you can scale your application logic horizontally, all while removing the challenges that otherwise come with distributed applications.
What are we building
To demonstrate how easy it is to swap connectors, we are going to use the alerts_stateless application. This application processes a stream of bank transactions. If the amount of a deposit or a withdrawal is greater than 1000, we trigger an alert. For the Source Connector, instead of generating our data inside of our Wallaroo application, we’ll read data from a PostgreSQL table. The Sink Connector will take the alert strings sent by Wallaroo and save them in a table.
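As a rough illustration of the alerting rule described above, the check boils down to comparing an amount against the threshold. This is a minimal sketch and not the actual alerts_stateless code: the check_transaction name, the convention that withdrawals are negative amounts, and the alert string format are all assumptions made for illustration.

```python
ALERT_THRESHOLD = 1000  # threshold taken from the description above


def check_transaction(amount, threshold=ALERT_THRESHOLD):
    """Return an alert string for a large deposit or withdrawal, else None.

    Assumption: deposits are positive amounts and withdrawals are negative.
    """
    if amount > threshold:
        return "Deposit Alert: {}".format(amount)
    if amount < -threshold:
        return "Withdrawal Alert: {}".format(amount)
    return None
```

Under these assumptions, a transaction of exactly 1000 does not raise an alert, because the rule is "greater than 1000".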
One way to get new data from a PostgreSQL table is to use the LISTEN/NOTIFY mechanism that PostgreSQL provides. Each time a change is made to a table that you are monitoring, NOTIFY sends the data to all the connected clients that have issued a LISTEN on the same channel.
Before we start building our connectors, the alerts_stateless example application needs a few modifications. First we need to remove this line:
gen_source = wallaroo.GenSourceConfig(TransactionsGenerator())
replacing it with this:
# Add an additional import
import wallaroo.experimental

# inside of the application_setup function
transactions_source = wallaroo.experimental.SourceConnectorConfig(
    "transaction_feed",
    encoder=encode_feed,
    decoder=decode_feed,
    port=7100)
alert_feed = wallaroo.experimental.SinkConnectorConfig(
    "alert_feed",
    encoder=encode_conversion,
    decoder=decode_conversion,
    port=7200)

# change the to_sink function
.to_sink(alert_feed)
Building a Source
To build our source connector, we need a table created with the proper schema. If this schema changes, our Wallaroo application will need to change as well. Since we are using the new Python API for Connectors, we’re also able to use any Python library.
To work with PostgreSQL in Python, we’re going to use the Psycopg library. Psycopg works well with the LISTEN/NOTIFY API, but other libraries would work here as well.
After importing our modules, we need to specify if there are any required or optional parameters.
connector = wallaroo.experimental.SourceConnector(
    required_params=['connection'],
    optional_params=[])
connector.connect()
connection_string = connector.params.connection
We specify that we’re creating a Source Connector with connection as a required parameter and no optional parameters. After calling connect, we can extract those parameters as variables and use them later in our script.
conn = psycopg2.connect(connection_string)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
connection_string is passed when we start our connector script and needs to be in the format "dbname=test user=postgres password=secret". More information on psycopg2.connect can be found in the Psycopg documentation. Then we set the isolation level to autocommit so that each statement is committed immediately as its own transaction, without an explicit commit. Finally, we create a new cursor object.
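If the connection settings live in separate variables, a small helper can assemble the "key=value" string for us. The following is a minimal sketch under stated assumptions: build_dsn is a hypothetical helper (not part of Psycopg or Wallaroo), and the quoting follows the usual libpq convention of wrapping values that contain spaces in single quotes and escaping quotes and backslashes with a backslash.

```python
def build_dsn(**params):
    """Build a libpq-style "key=value" connection string.

    Values that are empty or contain spaces, single quotes, or backslashes
    are wrapped in single quotes, with quotes and backslashes escaped.
    Keys are sorted only to make the output deterministic.
    """
    parts = []
    for key in sorted(params):
        value = str(params[key])
        if value == "" or any(ch in value for ch in " '\\"):
            escaped = value.replace("\\", "\\\\").replace("'", "\\'")
            value = "'" + escaped + "'"
        parts.append("{}={}".format(key, value))
    return " ".join(parts)
```

The resulting string can then be passed as the connection parameter when starting the connector script. Note that psycopg2.connect also accepts these settings directly as keyword arguments, so a helper like this is only useful when a single string is required, as it is here.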
We now have a connection to our Postgres DB and just need to create a trigger so that Postgres knows which table it should provide events for. For the table we want to monitor, this will look something like this:
# We define a trigger function that publishes each new row
# as JSON on the `wallaroo_example` channel
curs.execute("""
    CREATE OR REPLACE FUNCTION NOTIFY() RETURNS trigger AS
    $BODY$
    BEGIN
        PERFORM pg_notify('wallaroo_example', row_to_json(NEW)::text);
        RETURN new;
    END;
    $BODY$
    LANGUAGE 'plpgsql' VOLATILE COST 100;
""")

# We create a trigger that calls the function after each insert
curs.execute("""
    CREATE TRIGGER USERS_AFTER
    AFTER INSERT
    ON Alerts
    FOR EACH ROW
    EXECUTE PROCEDURE NOTIFY();
""")

# Then listen on the channel `wallaroo_example`
curs.execute("LISTEN wallaroo_example;")
Now all we need to do is poll for events from PostgreSQL. When a new notify event is received, we pop it from the connection's list of pending notifications.
while True:
    # Wait up to 5 seconds for the connection to become readable
    if select.select([conn], [], [], 5) == ([], [], []):
        print("Timeout")
    else:
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            payload = json.loads(notify.payload)
            connector.write(payload["content"])
connector.write() is what sends data to your Wallaroo application. In our case, we take the payload sent from PostgreSQL, decode it from JSON, and send its content field to our Wallaroo application.
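In a longer-running connector, we may not want a single malformed payload to crash the polling loop. The sketch below pulls the inner loop out into a function that skips payloads it cannot decode. It is an illustration, not part of the Wallaroo or Psycopg APIs: drain_notifications is a hypothetical name, conn is assumed to be any object with a notifies list (as a psycopg2 connection has after poll()), and write stands in for connector.write.

```python
import json


def drain_notifications(conn, write):
    """Pop every pending notification from conn and pass its content to write.

    Returns the number of messages written. Payloads that are not valid
    JSON, or that lack a "content" field, are skipped rather than raised.
    """
    written = 0
    while conn.notifies:
        notify = conn.notifies.pop(0)
        try:
            content = json.loads(notify.payload)["content"]
        except (ValueError, KeyError, TypeError):
            # Not JSON, not a JSON object, or no "content" key: skip it.
            continue
        write(content)
        written += 1
    return written
```

Inside the polling loop, this would be called as drain_notifications(conn, connector.write) right after conn.poll(). Whether skipping bad payloads is the right policy depends on the application; logging them or stopping the connector may be more appropriate. Also keep in mind that PostgreSQL limits NOTIFY payloads to just under 8000 bytes by default, so very wide rows may not fit in a single notification.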
Building a Sink
For PostgreSQL, building a Source Connector meant that we used the NOTIFY/LISTEN API to retrieve change events. A Sink Connector is a little different. If we are writing to PostgreSQL, we need to know the schema of the table. If changes need to be made to this table, then both our External Sink and the External Application need to be aware of them; otherwise inserts can fail or data can end up in the wrong shape.
When a transaction is over the specified limit in our application, we’ll take that alert and have our connector save it to a table storing all of those alerts. For our Sink Connector, all that’s saved is the string sent from Wallaroo, along with a created_at timestamp and an id as the primary key.
First we need to create our table’s schema. In production, it’d be a good idea to validate that our connector has the most up-to-date schema.
curs.execute("""
    CREATE TABLE Alert_log (
        id SERIAL PRIMARY KEY,
        alert_log VARCHAR,
        created_at timestamp with time zone NOT NULL default now()
    )
""")
Now that our table is created, we need to write the data coming from Wallaroo to our Postgres database. We’re still using the psycopg2 Python library.
while True:
    alert_string = connector.read()
    # Query parameters must be a sequence, hence the one-element tuple
    curs.execute("""
        INSERT INTO alert_log (alert_log)
        VALUES (%s);
    """, (alert_string,))
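The parameter-passing pattern used for the insert can be tried without a running PostgreSQL server. The sketch below swaps in Python's built-in sqlite3 module purely as a stand-in, so it is not the connector's actual code: the placeholder is ? rather than psycopg2's %s, the column types are SQLite equivalents of the schema above, and save_alert and demo are illustrative names. What carries over is that DB-API drivers expect query parameters as a sequence, so a single value is passed as a one-element tuple.

```python
import sqlite3


def save_alert(conn, alert_string):
    """Insert one alert, passing the value as a one-element tuple."""
    # (alert_string,) is a tuple; (alert_string) would just be the string.
    conn.execute(
        "INSERT INTO alert_log (alert_log) VALUES (?)", (alert_string,)
    )
    conn.commit()


def demo():
    """Create an in-memory table, save one alert, and read it back."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE alert_log ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " alert_log TEXT,"
        " created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP)"
    )
    save_alert(conn, "Deposit Alert: 1500")
    rows = conn.execute("SELECT id, alert_log FROM alert_log").fetchall()
    conn.close()
    return rows
```

Passing the value as a parameter, rather than formatting it into the SQL string, also lets the driver handle quoting, which matters when alert strings contain quotes or other special characters.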
Our new connector APIs make getting data in and out of Wallaroo much easier than before. In this post, we took a look at how you could build both a Source and Sink connector with a relational database.
Starting with the 0.5.3 release, we’ve included examples of other connectors, such as ones for Kafka, Kinesis, and S3. Please check them out and let us know if you’re writing a connector for your preferred data source.