Hikaricp源码解读(4)——Proxy*代理类介绍

x33g5p2x  于2021-12-19 转载在 其他  
字(7.0k)|赞(0)|评价(0)|浏览(431)

4、Proxy*代理类介绍

本文以v2.7.2源码为主进行分析

HikariCP不同于一般连接池如proxool、c3p0等使用动态代理实现连接的操作转移,而是通过Javassist结合部分自定义代码实现对应接口实现的方式,减少了代理连接建立的代价,以下是HikariCP中的主要代理相关类:

  • ProxyConnection.java
  • ProxyStatement.java
  • ProxyPreparedStatement.java
  • ProxyCallableStatement.java
  • ProxyResultSet.java
  • ProxyFactory.java(工厂类)
  • JavassistProxyFactory.java(代码重构)

以上代码通过Javassist进行代码重构建之后生成实际使用的对应接口代理类:

  • HikariProxyConnection.java
  • HikariProxyStatement.java
  • HikariProxyPreparedStatement.java
  • HikariProxyCallableStatement.java
  • HikariProxyResultSet.java

我们从JavassistProxyFactory.java入手,其核心代码如下:

try {
   // Cast is not needed for these
   String methodBody = "{ try { return delegate.method($$); } catch (SQLException e) { throw checkException(e); } }";
   generateProxyClass(Connection.class, ProxyConnection.class.getName(), methodBody);
   generateProxyClass(Statement.class, ProxyStatement.class.getName(), methodBody);
   generateProxyClass(ResultSet.class, ProxyResultSet.class.getName(), methodBody);

   // For these we have to cast the delegate
   methodBody = "{ try { return ((cast) delegate).method($$); } catch (SQLException e) { throw checkException(e); } }";
   generateProxyClass(PreparedStatement.class, ProxyPreparedStatement.class.getName(), methodBody);
   generateProxyClass(CallableStatement.class, ProxyCallableStatement.class.getName(), methodBody);

   modifyProxyFactory();
}
catch (Exception e) {
   throw new RuntimeException(e);
}

由上可以看出,核心构造方法为

  • generateProxyClass
  • modifyProxyFactory

其中generateProxyClass负责生成实际使用的代理类字节码,modifyProxyFactory对应修改工厂类中的代理类获取方法。

generateProxyClass

Proxy*.java中定义的方法,在继承类中不进行overwrite,其他方法使用delegate执行对应的方法(methodBody中的method替换成对应方法的方法名):

// 区分是否抛出SQLException异常
if (isThrowsSqlException(intfMethod)) {
   modifiedBody = modifiedBody.replace("method", method.getName());
}
else {
   modifiedBody = "{ return ((cast) delegate).method($$); }".replace("method", method.getName()).replace("cast", primaryInterface.getName());
}
modifyProxyFactory

直接替换Proxy*.java为HikariProxy*.java

switch (method.getName()) {
case "getProxyConnection":
   method.setBody("{return new " + packageName + ".HikariProxyConnection($$);}");
   break;
case "getProxyStatement":
   method.setBody("{return new " + packageName + ".HikariProxyStatement($$);}");
   break;
case "getProxyPreparedStatement":
   method.setBody("{return new " + packageName + ".HikariProxyPreparedStatement($$);}");
   break;
case "getProxyCallableStatement":
   method.setBody("{return new " + packageName + ".HikariProxyCallableStatement($$);}");
   break;
case "getProxyResultSet":
   method.setBody("{return new " + packageName + ".HikariProxyResultSet($$);}");
   break;
default:
   // unhandled method
   break;
}

由此可知,Proxy*.java中的实现是代理类的核心代码实现。以下通过ProxyConnection.java、ProxyStatement.java、ProxyResultSet.java三个类展开介绍:

ProxyConnection.java

本类中代码大致分为三部分:以close为关键的代码overwrite、独立实现的ClosedConnection(动态代理实现的唯一实例化对象)、Hikari 连接池的自定义函数逻辑。

  • 自定义函数

主要实现对打开的statement的缓存管理和连接标识:

// 用于标识连接被访问或存在可提交数据
final void markCommitStateDirty()
{
   if (isAutoCommit) {
      lastAccess = currentTime();
   }
   else {
      isCommitStateDirty = true;
   }
}

// 缓存statement
private synchronized <T extends Statement> T trackStatement(final T statement)
{
   openStatements.add(statement);

   return statement;
}

// 移出statement缓存
final synchronized void untrackStatement(final Statement statement)
{
   openStatements.remove(statement);
}

// 关闭全部已打开的statement(只在close方法中调用)
@SuppressWarnings("EmptyTryBlock")
private synchronized void closeStatements()
{
   final int size = openStatements.size();
   if (size > 0) {
      for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
         try (Statement ignored = openStatements.get(i)) {
            // automatic resource cleanup
         }
         catch (SQLException e) {
            LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
                        poolEntry.getPoolName(), delegate);
            leakTask.cancel();
            poolEntry.evict("(exception closing Statements during Connection.close())");
            delegate = ClosedConnection.CLOSED_CONNECTION;
         }
      }

      openStatements.clear();
   }
}
  • ClosedConnection

java动态代理生成的全局唯一变量,作为已关闭连接的代理引用,为连接关闭后外界代理连接的引用调用提供处理,同时唯一类减少了内存消耗和比对代价。代码如下:

private static final class ClosedConnection {
   static final Connection CLOSED_CONNECTION = getClosedConnection();

   private static Connection getClosedConnection()
   {
      InvocationHandler handler = (proxy, method, args) -> {
         // 只保留3个方法的快速返回,其他均抛出异常。
         final String methodName = method.getName();
         if ("abort".equals(methodName)) {
            return Void.TYPE;
         }
         else if ("isValid".equals(methodName)) {
            return Boolean.FALSE;
         }
         else if ("toString".equals(methodName)) {
            return ClosedConnection.class.getCanonicalName();
         }

         throw new SQLException("Connection is closed");
      };

      return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler);
   }
}
  • overwrite

主要修改statement获取、close、状态修改相关的方法和数据变更:

// ************statement获取************
public PreparedStatement prepareStatement(String sql) throws SQLException
{
   // 先缓存statement,然后通过工厂方法生成代理类。
   return ProxyFactory.getProxyPreparedStatement(this, trackStatement(delegate.prepareStatement(sql)));
}

// 其他statement获取类似

// ***********状态修改相关***************
public void setTransactionIsolation(int level) throws SQLException
{
   // 设置状态
   delegate.setTransactionIsolation(level);
   // 记录变更值
   transactionIsolation = level;
   // 记录变更位
   dirtyBits |= DIRTY_BIT_ISOLATION;
}

//setCatalog、setReadOnly、setAutoCommit、
//setNetworkTimeout、setSchema五个方法类似

// **************close*****************
public final void close() throws SQLException
{
   // 关闭全部打开的连接(有可能触发连接关闭,所以放到判断外)
   closeStatements();

   if (delegate != ClosedConnection.CLOSED_CONNECTION) {
      ……
      try {
         // 如果存在提交数据且不是自动提交,则回滚重置数据
         if (isCommitStateDirty && !isAutoCommit) {
            delegate.rollback();
            lastAccess = currentTime();
            LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
         }

         // 如果存在设置变更位,则进行重置
         if (dirtyBits != 0) {
            poolEntry.resetConnectionState(this, dirtyBits);
            lastAccess = currentTime();
         }

         // 重置
         delegate.clearWarnings();
      }
      catch (SQLException e) {
         ……
      }
      finally {
         // 代理置为关闭连接
         delegate = ClosedConnection.CLOSED_CONNECTION;
         // 回收PoolEntry(若PoolEntry已经被关闭和remove,则由下个接收线程丢弃或者threadlocal中无引用被回收)
         poolEntry.recycle(lastAccess);
      }
   }
}

// ***********commit\rollback***********
public void commit\rollback() throws SQLException
{
   // 调用方法
   delegate.commit\rollback();
   isCommitStateDirty = false;
   // 更新时间
   lastAccess = currentTime();
}
ProxyStatement.java

由overwrite构成,主要包含执行方法、close两类。

// **************执行方法****************
public ResultSet executeQuery(String sql) throws SQLException
{
   // 记录执行
   connection.markCommitStateDirty();
   // 调用方法
   ResultSet resultSet = delegate.executeQuery(sql);
   // 生成代理ResultSet
   return ProxyFactory.getProxyResultSet(connection, this, resultSet);
}

// **************close****************
public final void close() throws SQLException
{
   // 放置重复关闭
   synchronized (this) {
      if (isClosed) {
         return;
      }

      isClosed = true;
   }

   // 移出缓存
   connection.untrackStatement(delegate);

   try {
      // 关闭代理
      delegate.close();
   }
   catch (SQLException e) {
      throw connection.checkException(e);
   }
}
ProxyResultSet.java

此类代理最简单,就只是分别为updateRow、insertRow、deleteRow增加了执行记录connection.markCommitStateDirty(),示例如下:

public void updateRow() throws SQLException
{
   connection.markCommitStateDirty();
   delegate.updateRow();
}

附录:Javassist更多介绍

相关文章