Simplified database transaction management with the Vert.x RxJava API

TL;DR As of 3.5, man­ag­ing data­base trans­ac­tions with Vert.x re­quires a lot of boil­er­plate code. Vert.x 3.6 will pro­vide helpers and Observable trans­form­ers to eas­ily make a re­ac­tive flow trans­ac­tional.

Simple queries with the “Rxified” API

The Vert.x API for Rx­Java is one of the most pop­u­lar mod­ules in the Vert.x stack.

So we make sure the “Rx­i­fied” API is easy to use for com­mon pro­gram­ming tasks, such as read­ing rows from a re­la­tional data­base and send­ing the re­sult to the client:

dbClient.rxQuery("SELECT name, duration FROM tracks WHERE album = 'The Israelites'")
  .map(ResultSet::getResults)
  .map(rows -> {
    // Transform DB rows into a client-friendly JSON object
  })
  .subscribe(json -> {
    // Send JSON to the client
  }, t -> {
    // Send error to the client
  });

Managing transactions with Vert.x 3.5

But very often, de­vel­op­ers have to im­ple­ment com­plex in­ter­ac­tions with the data­base, run­ning in­side a sin­gle trans­ac­tion. To do so, the im­ple­men­ta­tion must fol­low this process:

  • get a con­nec­tion from the pool,
  • start a trans­ac­tion,
  • ex­e­cute queries,
  • if all queries suc­ceed, com­mit the trans­ac­tion,
  • oth­er­wise, roll­back the changes.

How does that trans­late to code?

// Get a connection from the pool
dbClient.rxGetConnection().flatMap(sqlConnection -> {
  // Setting auto-commit mode to false implicitely starts a transaction
  return sqlConnection.rxSetAutoCommit(false)
    .andThen(
      // Database queries
      sqlConnection.rxExecute("INSERT INTO albums (name) VALUES ('The Israelites')")
        .andThen(sqlConnection.rxExecute("INSERT INTO tracks (album, name) VALUES ('The Israelites', 'Israelites')"))
        .andThen(sqlConnection.rxExecute("INSERT INTO tracks (album, name) VALUES ('The Israelites', 'Too Much Too Soon')"))
        .andThen(sqlConnection.rxQuery("SELECT name FROM tracks WHERE album = 'The Israelites'").map(ResultSet::getResults))
    )
    // Commit if all queries succeed
    .flatMap(rows -> sqlConnection.rxCommit().andThen(Single.just(rows)))
    .onErrorResumeNext(throwable -> {
      // On error, rollback the changes
      return sqlConnection.rxRollback().onErrorComplete()
        .andThen(sqlConnection.rxSetAutoCommit(true).onErrorComplete())
        .andThen(Single.error(throwable));
    }).flatMap(rows -> sqlConnection.rxSetAutoCommit(true).andThen(Single.just(rows)))
    .doFinally(sqlConnection::close);
}).map(rows -> {
  // Transform DB rows into a client-friendly JSON object
}).subscribe(json -> {
  // Send JSON to the client
}, t -> {
  // Send error to the client
});

That is a lot of boil­er­plate around the spe­cific data­base queries… It would be bet­ter to re­lieve the de­vel­oper from main­tain­ing it.

Vert.x 3.6 tools for transaction management

That is why Vert.x 3.6 will pro­vide Observable trans­form­ers that can be ap­plied to re­ac­tive flows with compose to make them trans­ac­tional:

  • SQLClientHelper#txFlowableTransformer
  • SQLClientHelper#txObservableTransformer
  • SQLClientHelper#txSingleTransformer
  • SQLClientHelper#txMaybeTransformer
  • SQLClientHelper#txCompletableTransformer

These trans­form­ers wrap the cor­re­spond­ing source of events with SQL trans­ac­tion man­age­ment.

dbClient.rxGetConnection().flatMap(sqlConnection -> {
  return sqlConnection.rxExecute("INSERT INTO albums (name) VALUES ('The Israelites')")
    .andThen(sqlConnection.rxExecute("INSERT INTO tracks (album, name) VALUES ('The Israelites', 'Israelites')"))
    .andThen(sqlConnection.rxExecute("INSERT INTO tracks (album, name) VALUES ('The Israelites', 'Too Much Too Soon')"))
    .andThen(sqlConnection.rxQuery("SELECT name FROM tracks WHERE album = 'The Israelites'").map(ResultSet::getResults))
    .compose(SQLClientHelper.txSingleTransformer(sqlConnection))
    .doFinally(sqlConnection::close);
}).map(rows -> {
  // Transform DB rows into a client-friendly JSON object
}).subscribe(json -> {
  // Send JSON to the client
}, t -> {
  // Send error to the client
});

Source trans­form­ers pro­vide max­i­mum flex­i­bil­ity: you are still able to ex­e­cute op­er­a­tions with the con­nec­tion after the trans­ac­tion com­pletes.

How­ever, you usu­ally do not need the con­nec­tion after the changes are com­mited or roll­backed. In this case, you may sim­ply cre­ate you source ob­serv­able with one of the trans­ac­tional helper meth­ods in io.vertx.reactivex.ext.sql.SQLClientHelper.

Let’s rewrite the pre­vi­ous ex­am­ple:

SQLClientHelper.inTransactionSingle(client, sqlConnection -> {
  return sqlConnection.rxExecute("INSERT INTO albums (name) VALUES ('The Israelites')")
    .andThen(sqlConnection.rxExecute("INSERT INTO tracks (album, name) VALUES ('The Israelites', 'Israelites')"))
    .andThen(sqlConnection.rxExecute("INSERT INTO tracks (album, name) VALUES ('The Israelites', 'Too Much Too Soon')"))
    .andThen(sqlConnection.rxQuery("SELECT name FROM tracks WHERE album = 'The Israelites'").map(ResultSet::getResults))
}).map(rows -> {
  // Transform DB rows into a client-friendly JSON object
}).subscribe(json -> {
  // Send JSON to the client
}, t -> {
  // Send error to the client
});

Give it a try

Vert.x 3.6 is ex­pected around fall, but the code is al­ready in mas­ter and snap­shots are reg­u­larly pub­lished to Sonatype’s OSS repos.

So give it a try and feel free to pro­vide your fee­back on our user or dev chan­nels.

Next post

Eclipse Vert.x 3.5.3

We have just released Vert.x 3.5.3, a bug fix release of Vert.x 3.5.x.

Read more
Previous post

Eclipse Vert.x 3.5.2

We have just released Vert.x 3.5.2, a bug fix release of Vert.x 3.5.x.

Read more
Related posts

Building services and APIs with AMQP 1.0

Microservices and APIs are everywhere. Everyone talks about them, presentation slides are full of them ... some people are actually even building them.

Read more

Eclipse Vert.x meets GraphQL

In this blog post, we will look at an example application written in Vert.x that uses the new GraphQL API of Gentics Mesh.

Read more

Contract Driven REST Services with Vert.x3

We see a new trend in development where we are shifting from developing applications to APIs. More and more we see services being offered as REST APIs that we are allowed to consume.

Read more