Friday, 5 October 2018

Managing HANA system replication takeover with Spring and Hibernate

SAP HANA system replication makes it possible to copy and continuously synchronize a SAP HANA database to a secondary location in the same or another data center. System replication is usually used to support high availability and disaster recovery. SAP HANA 2.0 introduced the Active/Active (read enabled) system replication operation mode, which allows read-only access to the secondary site.

SAP HANA 2 SPS 03 introduced the invisible takeover feature which automatically reroutes client connections to the secondary system in the event of a takeover. However, in the current state, a takeover from the primary to the secondary system can lead to hanging JDBC connections or invalid transaction states in a Java application connected to the database.

This blog will illustrate how to handle takeover on the application side to prevent errors from reaching the end user until the takeover has completed.

Prerequisites

◈ You have set up a SAP HANA Active/Active (read enabled) system, for example, as shown in the SAP HANA Academy video tutorial
◈ You have created a Spring Boot Java application using Hibernate as the JPA implementation
◈ You have set up a virtual IP address or a virtual host name for the database host which you use as the host name in your JDBC connection string.

Adapt the application


Configure the connection pool


The application’s connection pool must be configured to validate the connections when borrowing them from the pool.

If you use the Tomcat connection pool, add the following properties to your application.properties file:

spring.datasource.tomcat.test-on-borrow=true
spring.datasource.tomcat.validation-query=SELECT 1 FROM DUMMY

If you use a different connection pool, you need to check in its documentation how to configure the connection validation.
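
To make the idea of validating on borrow concrete, here is a minimal sketch using only JDK classes. The `ValidatingPool` class and its method names are hypothetical and exist only for illustration; this is not how the Tomcat pool is implemented. The validator plays the role of the validation query.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical minimal pool, for illustration only; not the Tomcat pool's implementation.
final class ValidatingPool<C> {
    private final Deque<C> idle = new ArrayDeque<>();
    private final Supplier<C> factory;     // creates a fresh connection
    private final Predicate<C> validator;  // plays the role of the validation query

    ValidatingPool(Supplier<C> factory, Predicate<C> validator) {
        this.factory = factory;
        this.validator = validator;
    }

    // Hand out an idle connection only if it passes validation; otherwise discard it.
    synchronized C borrow() {
        C candidate;
        while ((candidate = idle.poll()) != null) {
            if (validator.test(candidate)) {
                return candidate;
            }
            // stale connection: drop it and try the next idle one
        }
        // no usable idle connection left: create a new one
        return factory.get();
    }

    // Return a connection to the pool for later reuse.
    synchronized void giveBack(C connection) {
        idle.push(connection);
    }
}
```

The point of the check is that a connection which died while it sat idle in the pool, for example during a takeover, is never handed to the application.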

Set up automatic retries


The application should be structured so that operations can be retried until they succeed. This usually means that these operations must be executed within a database transaction that can be committed or rolled back depending on the outcome of the operation.

Spring provides the @Transactional annotation to tell the framework that an annotated method should be executed within a transaction. You can also add the annotation to a class which means that all methods of the class should be executed within a transaction.

@Repository
@Transactional
public interface ItemRepository extends JpaRepository<Item, Integer> {

  @Query(value = "SELECT ID, CONTENT FROM ITEM WHERE ID=?1 WITH HINT( RESULT_LAG ('hana_sr', 60) )", nativeQuery = true)
  Item findOneOnSecondary(Integer id);

}

Now that the individual operations are atomic by means of the transactional behavior, automatic retries can be set up to retry the operations until the takeover has completed.

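An easy-to-use retry mechanism is provided by Spring Retry. To use Spring Retry, add it as a Maven dependency. The declarative retries used below rely on Spring AOP, so an AOP dependency (for example, spring-boot-starter-aop) must also be on the classpath.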

<dependency>
  <groupId>org.springframework.retry</groupId>
  <artifactId>spring-retry</artifactId>
</dependency>

Once the dependency has been added, you can add the @EnableRetry annotation to one of your @Configuration classes and the @Retryable annotation to any method that should be retried. The annotation defines attributes that let you configure the retry behavior, e.g. the delay between the retries or the maximum number of retries.

@RestController
@RequestMapping(path = "/item")
public class ItemController {

  @Autowired
  private ItemRepository itemRepository;

  @Retryable(backoff = @Backoff(delay = 10000), maxAttempts = 60)
  @RequestMapping(path = "{id}")
  public Item getItem(@PathVariable("id") Integer id) {
    return this.itemRepository.findOneOnSecondary( id );
  }

  @Retryable(backoff = @Backoff(delay = 10000), maxAttempts = 60)
  @RequestMapping(path = "", method = RequestMethod.POST)
  public int addItem(@RequestParam(name = "content") String content) {
    Item item = new Item();
    item.setContent( content );
    this.itemRepository.save( item );
    return item.getId();
  }

  @Retryable(backoff = @Backoff(delay = 10000), maxAttempts = 60)
  @RequestMapping(path = "{id}", method = RequestMethod.DELETE)
  public void deleteItem(@PathVariable("id") Integer id) {
    this.itemRepository.delete( id );
  }
}

In the code above, each operation is attempted at most 60 times, with a wait of 10 seconds between attempts. Errors that occur during an attempt are caught and trigger the next attempt. Only after the maximum number of attempts has been exhausted is the error propagated to the caller.
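
The retry behavior described above can be approximated in plain Java. The following is only a sketch of the fixed-delay idea; `FixedDelayRetry` is a hypothetical helper, and Spring Retry's actual implementation is more elaborate.

```java
import java.util.concurrent.Callable;

// Hypothetical helper illustrating fixed-delay retries; Spring Retry's internals differ.
final class FixedDelayRetry {
    private FixedDelayRetry() {
    }

    // Runs the operation up to maxAttempts times, sleeping delayMillis between attempts.
    static <T> T call(Callable<T> operation, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e; // remember the failure; it is rethrown only if every attempt fails
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // no pause after the final attempt
                }
            }
        }
        throw last;
    }
}
```

Under this model, 60 attempts with a 10000 ms delay mean at most 59 pauses, so a caller may wait roughly 590 seconds plus the execution time of the attempts before an error surfaces. The settings should therefore be chosen to cover the expected duration of a takeover.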

Set up connection handling


Now that the operations are retried during a takeover, you need to take care of connections that are in a state in which they don’t notice that the primary database server has failed. Such connections would continue to try to execute queries against the failed primary server until all retries are exhausted, even if the secondary server has taken over. These connections must be manually disconnected so they can reconnect to the new primary after the takeover.
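
At the plain JDBC level, the idea of disconnecting a stale connection can be sketched as follows. `StaleConnections` is a hypothetical helper for illustration only; the implementations later in this post work on Hibernate's logical connection instead.

```java
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper, for illustration only; not part of Spring, Hibernate, or the SAP HANA driver.
final class StaleConnections {
    private StaleConnections() {
    }

    // Returns true if the connection was found unusable and has been closed.
    static boolean closeIfStale(Connection connection, int timeoutSeconds) {
        boolean usable;
        try {
            // Connection.isValid takes its timeout in seconds, not milliseconds
            usable = connection.isValid(timeoutSeconds);
        } catch (SQLException e) {
            usable = false; // treat a failing check like a dead connection
        }
        if (usable) {
            return false;
        }
        try {
            connection.close();
        } catch (SQLException e) {
            // nothing more can be done with a dead connection
        }
        return true;
    }
}
```

Once the stale connection is closed, the next attempt has to obtain a new connection, which can then reach the new primary through the virtual host name.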

There are two ways to implement the disconnecting of hanging connections:

1. Via a retry listener
2. Via a custom transaction manager

Connection handling via a retry listener


Spring Retry provides additional customization options via so-called retry listeners. A retry listener can be used to react to retry events like “retry started”, “retry error”, and “retry completed”.

For the connection handling the “retry error” callback is the most appropriate. This callback is called whenever a retry failed and it provides information about the current state of the retry operation and the exception that was caught.

There are two situations that need to be handled:

1. The current operation was interrupted by the takeover
2. The current operation couldn’t be started

For the first situation, the retry listener must check if the root cause of the exception is either a SQLNonTransientConnectionException or a JDBCDriverException. Both of these exceptions can indicate that the connection to the server was lost. If such an exception is encountered, the retry listener must manually disconnect the connection. The connection will then be reestablished, if possible, on the next retry.

The second situation can occur when the takeover left a transaction object in an inconsistent state. In this case the transaction object must be reset so it can be re-used on the next retry.
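
The root-cause check for the first situation can be tried in isolation with JDK classes only. This is a minimal sketch: `ConnectionLossCheck` is a hypothetical helper, and it leaves out the driver-specific JDBCDriverException because that class is not part of the JDK.

```java
import java.sql.SQLNonTransientConnectionException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper, for illustration only.
final class ConnectionLossCheck {
    private ConnectionLossCheck() {
    }

    // Walks the cause chain and reports whether any link signals a lost connection.
    static boolean isConnectionLoss(Throwable throwable) {
        // the identity-based set guards against cyclic cause chains
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = throwable; t != null && seen.add(t); t = t.getCause()) {
            if (t instanceof SQLNonTransientConnectionException) {
                return true;
            }
        }
        return false;
    }
}
```

Walking the whole chain matters because Spring and Hibernate usually wrap the driver's exception in their own exception types before it reaches the retry listener.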

import java.sql.SQLNonTransientConnectionException;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.PersistenceUnit;

import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.resource.jdbc.spi.LogicalConnectionImplementor;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.orm.jpa.EntityManagerHolder;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import com.sap.db.jdbc.exceptions.JDBCDriverException;

public class TakeoverHandlingRetryListener implements RetryListener {

  @PersistenceUnit
  private EntityManagerFactory emf;

  @Override
  public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
    return true;
  }

  @Override
  public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
  }

  @Override
  public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
    // check if the root cause is a connection failure
    if ( isSQLNonTransientConnectionExceptionSapDB( throwable ) ) {
      EntityManagerHolder emHolder = (EntityManagerHolder) TransactionSynchronizationManager.getResource( this.emf );
      if ( emHolder != null ) {
        EntityManager entityManager = emHolder.getEntityManager();
        if ( entityManager != null ) {
          SessionImplementor s = entityManager.unwrap( SessionImplementor.class );
          try {
            // extract the logical database connection
            LogicalConnectionImplementor logicalConnection = s.getJdbcCoordinator().getLogicalConnection();
            if ( logicalConnection.isOpen() ) {
              // close the physical connection
              JdbcUtils.closeConnection( logicalConnection.getPhysicalConnection() );
              // manually disconnect
              s.getJdbcCoordinator().getLogicalConnection().manualDisconnect();
            }
          }
          catch (Exception e) {
            // ignore
          }
        }
      }
    }
    // check if a transaction couldn't be started because of an invalid state
    else if ( throwable instanceof CannotCreateTransactionException && throwable.getCause() instanceof IllegalStateException ) {
      EntityManagerHolder emHolder = (EntityManagerHolder) TransactionSynchronizationManager.getResource( this.emf );
      if ( emHolder != null ) {
        EntityManager entityManager = emHolder.getEntityManager();
        if ( entityManager != null ) {
          try {
            // mark the transaction as rollback-only so that no changes are written to the database
            entityManager.getTransaction().setRollbackOnly();
          }
          catch (Exception e) {
            // ignore
          }

          try {
            // execute a commit which will reset the transaction state so it can be restarted
            SessionImplementor s = entityManager.unwrap( SessionImplementor.class );
            s.getTransactionCoordinator().getTransactionDriverControl().commit();
          }
          catch (Exception e) {
            // ignore
          }
        }
      }
    }
  }

  private boolean isSQLNonTransientConnectionExceptionSapDB(Throwable throwable) {
    // check if the exception indicates a connection loss
    if ( throwable instanceof SQLNonTransientConnectionException || throwable instanceof JDBCDriverException ) {
      return true;
    }

    // check the cause of the exception if there is one
    if ( throwable.getCause() != null ) {
      return isSQLNonTransientConnectionExceptionSapDB( throwable.getCause() );
    }

    return false;
  }
}

To use the retry listener simply add it as a bean to your application configuration.

@Bean
public RetryListener retryListener() {
  return new TakeoverHandlingRetryListener();
}

Connection handling via a custom transaction manager


Instead of having a retry listener take care of broken connections, you can also use a custom transaction manager. The transaction manager can intercept the “begin transaction”, “rollback”, and “commit” calls and do the necessary cleanup within these calls.

Note that the general retry mechanism of Spring Retry is still required. Only the retry listener can be replaced by the transaction manager.

The easiest way to implement a custom transaction manager is to extend one of the provided Spring transaction managers, for example, the JpaTransactionManager.

import java.lang.reflect.Method;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;

import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.resource.jdbc.spi.LogicalConnectionImplementor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.orm.jpa.EntityManagerHolder;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.UnexpectedRollbackException;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.ReflectionUtils;

import com.sap.db.jdbc.exceptions.JDBCDriverException;

public class TakeoverHandlingJpaTransactionManager extends JpaTransactionManager {

  private static final long serialVersionUID = 1L;

  /**
   * The maximum number of times a transaction start should be retried
   */
  @Value("${transaction.retries:60}")
  private int retries;

  /**
   * The number of milliseconds to wait between retries
   */
  @Value("${transaction.timeout:10000}")
  private long timeout;

  public TakeoverHandlingJpaTransactionManager(EntityManagerFactory emf) {
    super( emf );
  }

  /**
   * {@inheritDoc}
   */
  @Override
  protected void doCommit(DefaultTransactionStatus status) {
    EntityManager entityManager = getEntityManager( status.getTransaction() );

    if ( entityManager == null ) {
      // no entity manager found => execute normally
      super.doCommit( status );
      return;
    }

    SessionImplementor s = entityManager.unwrap( SessionImplementor.class );
    LogicalConnectionImplementor logicalConnection = s.getJdbcCoordinator().getLogicalConnection();
    if ( !isConnectionValid( logicalConnection ) ) {
      try {
        // roll back the transaction to reset the status
        rollback( status );
      }
      catch (TransactionException e) {
        // ignore
      }
      // throw an UnexpectedRollbackException to trigger the correct behavior in the parent commit() call
      throw new UnexpectedRollbackException( "The connection is invalid" );
    }

    try {
      // commit the transaction
      super.doCommit( status );
    }
    catch (Exception e) {
      // Check if the connection needs to be reset and then re-throw the exception
      resetTransactionAndThrow( e, logicalConnection );
    }
  }

  /**
   * Get the entity manager either from the current transaction object or from the
   * {@link TransactionSynchronizationManager}.
   *
   * @param transaction The current transaction object
   * @return The entity manager if found, or {@code null} otherwise
   */
  private EntityManager getEntityManager(Object transaction) {
    // extract the entity manager from the transaction object
    Method method = ReflectionUtils.findMethod( transaction.getClass(), "getEntityManagerHolder" );
    method.setAccessible( true );
    EntityManagerHolder emHolder = (EntityManagerHolder) ReflectionUtils.invokeMethod( method, transaction );
    if ( emHolder == null ) {
      // no entity manager holder found on the transaction object => try the TransactionSynchronizationManager
      emHolder = (EntityManagerHolder) TransactionSynchronizationManager.getResource( getEntityManagerFactory() );
    }

    if ( emHolder == null ) {
      return null;
    }

    return emHolder.getEntityManager();
  }

  /**
   * Check if the {@link Throwable} is caused by a connection loss
   *
   * @param throwable The throwable to check
   * @return {@code true} if the {@link Throwable} is caused by a connection loss, {@code false} otherwise
   */
  private boolean isSQLNonTransientConnectionExceptionSapDB(Throwable throwable) {
    if ( throwable instanceof SQLNonTransientConnectionException || throwable instanceof JDBCDriverException ) {
      return true;
    }

    if ( throwable.getCause() != null ) {
      return isSQLNonTransientConnectionExceptionSapDB( throwable.getCause() );
    }

    return false;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  protected void doRollback(DefaultTransactionStatus status) {
    try {
      super.doRollback( status );
    }
    catch (Exception e) {
      EntityManager entityManager = getEntityManager( status.getTransaction() );
      if ( entityManager == null ) {
        throw e;
      }

      SessionImplementor s = entityManager.unwrap( SessionImplementor.class );
      LogicalConnectionImplementor logicalConnection = s.getJdbcCoordinator().getLogicalConnection();
      // Check if the connection needs to be reset and then re-throw the exception
      resetTransactionAndThrow( e, logicalConnection );
    }
  }

  /**
   * Check if the given logical connection is valid
   *
   * @param logicalConnection The connection to check
   * @return {@code true} if the connection is valid, {@code false} otherwise
   */
  private boolean isConnectionValid(LogicalConnectionImplementor logicalConnection) {
    if ( logicalConnection.isOpen() ) {
      try {
        // check if the connection is valid (the isValid timeout is given in seconds)
        if ( logicalConnection.getPhysicalConnection().isValid( 1 ) ) {
          return true;
        }

        // the connection is invalid => disconnect manually
        logicalConnection.manualDisconnect();
      }
      catch (SQLException e) {
        try {
          // Check if the connection needs to be reset and then re-throw the exception
          resetTransactionAndThrow( e, logicalConnection );
        }
        catch (Exception ex) {
          // ignore
        }
      }
    }

    return false;
  }

  /**
   * Check if the given exception is potentially caused by a connection loss. If so, disconnect the connection
   * explicitly before re-throwing the exception.
   *
   * @param e The exception to check
   * @param logicalConnection The logical connection that will be disconnected if necessary
   */
  private void resetTransactionAndThrow(Exception e, LogicalConnectionImplementor logicalConnection) {
    if ( isSQLNonTransientConnectionExceptionSapDB( e ) ) {
      try {
        if ( logicalConnection.isOpen() ) {
          logicalConnection.manualDisconnect();
        }
      }
      catch (Exception ex) {
        // ignore
      }
      sneakyThrow( e );
    }
    else {
      sneakyThrow( e );
    }
  }

  /**
   * Sneakily throw an exception
   *
   * @param e The exception to throw
   * @throws E The type of the exception to throw
   * @see <a href="https://www.baeldung.com/java-sneaky-throws">https://www.baeldung.com/java-sneaky-throws</a>
   */
  @SuppressWarnings("unchecked")
  private static <E extends Exception> void sneakyThrow(Exception e) throws E {
    throw (E) e;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  protected void doBegin(Object transaction, TransactionDefinition definition) {
    EntityManager entityManager = getEntityManager( transaction );
    if ( entityManager == null ) {
      // no entity manager found => execute normally
      super.doBegin( transaction, definition );
      return;
    }

    SessionImplementor s = entityManager.unwrap( SessionImplementor.class );

    int count = 0;
    do {
      LogicalConnectionImplementor logicalConnection = s.getJdbcCoordinator().getLogicalConnection();

      // check if the connection is valid
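      if ( !isConnectionValid( logicalConnection ) ) {
        try {
          // wait for the timeout period to start the next try
          Thread.sleep( this.timeout );
        }
        catch (InterruptedException ex) {
          sneakyThrow( ex );
        }

        // count this attempt so the loop ends once the retries are exhausted
        count++;
        continue;
      }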

      try {
        super.doBegin( transaction, definition );
      }
      catch (CannotCreateTransactionException e) {
        if ( e.getCause() instanceof IllegalStateException ) {
          try {
            // mark the transaction as rollback-only so that no changes are written to the database
            entityManager.getTransaction().setRollbackOnly();
          }
          catch (Exception ex) {
            // ignore
          }

          try {
            // execute a commit which will reset the transaction state so it can be restarted
            s.getTransactionCoordinator().getTransactionDriverControl().commit();
          }
          catch (Exception ex) {
            // ignore
          }

          try {
            // wait for the timeout period to start the next try
            Thread.sleep( this.timeout );
          }
          catch (InterruptedException ex) {
            sneakyThrow( ex );
          }

          count++;
          continue;
        }

        throw e;
      }

      return;
    } while ( count < this.retries );

    if ( count >= this.retries ) {
      throw new CannotCreateTransactionException( "Timeout reached while trying to create a transaction" );
    }
  }
}

To use the transaction manager simply add it as a bean to your application configuration.

@PersistenceUnit
private EntityManagerFactory emf;

@Bean
public PlatformTransactionManager transactionManager() {
  return new TakeoverHandlingJpaTransactionManager( this.emf );
}
