org.apache.kafka.connect.errors.ConnectException类的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(12.0k)|赞(0)|评价(0)|浏览(132)

本文整理了Java中org.apache.kafka.connect.errors.ConnectException类的一些代码示例,展示了ConnectException类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ConnectException类的具体详情如下:
包路径:org.apache.kafka.connect.errors.ConnectException
类名称:ConnectException

ConnectException介绍

[英]ConnectException is the top-level exception type generated by Kafka Connect and connector implementations.
[中]ConnectException是Kafka Connect和connector实现生成的顶级异常类型。

代码示例

代码示例来源:origin: debezium/debezium

/**
 * Throws a {@link ConnectException} when a failure has been recorded in {@code producerFailure};
 * does nothing when that field is {@code null}. The recorded failure is attached as the cause.
 *
 * @throws ConnectException if {@code producerFailure} is non-null
 */
private void throwProducerFailureIfPresent() {
    // Read the field once so the null check and the thrown cause always refer to the same value.
    final Throwable failure = producerFailure;
    if (failure != null) {
        throw new ConnectException("An exception occurred in the change event producer. This connector will be stopped.", failure);
    }
}
}

代码示例来源:origin: debezium/debezium

// Excerpt of a larger method: `e` and `closingException` are declared in code that is not shown here.
// Wrap `e` in a ConnectException.
ConnectException rethrown = new ConnectException(e);
if (closingException != null) {
  // Attach the second failure to the wrapper as a suppressed exception.
  rethrown.addSuppressed(closingException);
// NOTE(review): the `if` opened above is not closed inside this excerpt and `rethrown` is never thrown here;
// lines appear to have been elided between these statements -- confirm against the full source.
throw new ConnectException(closingException);

代码示例来源:origin: debezium/debezium

/**
 * Evaluates the given supplier and returns its result, turning a {@link DateTimeParseException}
 * into a {@link ConnectException} after logging the offending text and the expected format.
 *
 * @param pattern the expected format; used only in the error log entry
 * @param s the text being parsed; used only in the error log entry
 * @param value the supplier that produces the result
 * @return the value produced by {@code value}
 * @throws ConnectException wrapping the {@link DateTimeParseException} thrown by {@code value}
 */
private <T> T format(final String pattern, final String s, final Supplier<T> value) {
    final T parsed;
    try {
        parsed = value.get();
    }
    catch (final DateTimeParseException parseFailure) {
        LOGGER.error("Cannot parse time/date value '{}', expected format '{}'", s, pattern);
        throw new ConnectException(parseFailure);
    }
    return parsed;
}
}

代码示例来源:origin: debezium/debezium

/**
 * Reads the entry stored under {@code key} and converts it to a {@code long}.
 *
 * @param values the map to read from
 * @param key the key of the entry to convert
 * @return {@code 0} when the entry is missing or null, the {@link Number#longValue()} of a numeric entry,
 *         or otherwise the result of parsing the entry's string form
 * @throws ConnectException if the entry's string form is not a valid {@code long}; the parse failure is the cause
 */
private long longOffsetValue(Map<String, ?> values, String key) {
  Object obj = values.get(key);
  if (obj == null) {
    return 0L;
  }
  if (obj instanceof Number) {
    return ((Number) obj).longValue();
  }
  try {
    return Long.parseLong(obj.toString());
  } catch (NumberFormatException e) {
    // Keep the parse failure as the cause so the original stack trace is not lost.
    throw new ConnectException("Source offset '" + key + "' parameter value " + obj + " could not be converted to a long", e);
  }
}

代码示例来源:origin: debezium/debezium

/**
 * Reads the entry stored under {@code key} and converts it to a {@code long}.
 *
 * @param values the map to read from
 * @param key the key of the entry to convert
 * @return {@code 0} when the entry is missing or null, the {@link Number#longValue()} of a numeric entry,
 *         or otherwise the result of parsing the entry's string form
 * @throws ConnectException if the entry's string form is not a valid {@code long}; the parse failure is the cause
 */
private static long longOffsetValue(Map<String, ?> values, String key) {
  Object obj = values.get(key);
  if (obj == null) {
    return 0;
  }
  if (obj instanceof Number) {
    return ((Number) obj).longValue();
  }
  try {
    return Long.parseLong(obj.toString());
  } catch (NumberFormatException e) {
    // Keep the parse failure as the cause so the original stack trace is not lost.
    throw new ConnectException("Source offset '" + key + "' parameter value " + obj + " could not be converted to a long", e);
  }
}

代码示例来源:origin: debezium/debezium

/**
 * Reads the entry stored under {@code key} and converts it to an {@code int}.
 *
 * @param values the map to read from
 * @param key the key of the entry to convert
 * @return {@code 0} when the entry is missing or null, the {@link Number#intValue()} of a numeric entry,
 *         or otherwise the result of parsing the entry's string form
 * @throws ConnectException if the entry's string form is not a valid {@code int}; the parse failure is the cause
 */
private static int intOffsetValue(Map<String, ?> values, String key) {
  Object obj = values.get(key);
  if (obj == null) {
    return 0;
  }
  if (obj instanceof Number) {
    return ((Number) obj).intValue();
  }
  try {
    return Integer.parseInt(obj.toString());
  } catch (NumberFormatException e) {
    // Keep the parse failure as the cause so the original stack trace is not lost.
    throw new ConnectException("Source offset '" + key + "' parameter value " + obj + " could not be converted to an integer", e);
  }
}

代码示例来源:origin: debezium/debezium

/**
 * Looks up the statement for the given query text in {@code statementCache} via {@code computeIfAbsent},
 * preparing it on {@code connection()} when the mapping function is invoked.
 *
 * @param preparedQueryString the SQL text used both as cache key and as the statement to prepare
 * @return the statement obtained from {@code statementCache}
 * @throws ConnectException wrapping any {@link SQLException} raised while preparing the statement
 */
private PreparedStatement createPreparedStatement(String preparedQueryString) {
  return statementCache.computeIfAbsent(preparedQueryString, sql -> {
    LOGGER.trace("Inserting prepared statement '{}' removed from the cache", sql);
    try {
      return connection().prepareStatement(sql);
    }
    catch (SQLException prepareFailure) {
      // The mapping function cannot throw a checked exception, so rethrow unchecked.
      throw new ConnectException(prepareFailure);
    }
  });
}

代码示例来源:origin: debezium/debezium

/**
 * Creates a new instance of the given mapper class and, when a configuration is supplied,
 * initializes the instance with it.
 *
 * @param clazz the mapper class to instantiate
 * @param config the configuration passed to {@code initialize}; skipped when null
 * @return the newly created mapper
 * @throws ConnectException if the class cannot be instantiated or accessed, or if anything else
 *             fails during creation or initialization
 */
protected static ColumnMapper instantiateMapper(Class<ColumnMapper> clazz, Configuration config) {
    try {
        final ColumnMapper instance = clazz.newInstance();
        if (config == null) {
            return instance;
        }
        instance.initialize(config);
        return instance;
    }
    catch (InstantiationException ex) {
        throw new ConnectException("Unable to instantiate column mapper class " + clazz.getName() + ": " + ex.getMessage(), ex);
    }
    catch (IllegalAccessException ex) {
        throw new ConnectException("Unable to access column mapper class " + clazz.getName() + ": " + ex.getMessage(), ex);
    }
    catch (Throwable ex) {
        // Everything else, including failures raised by initialize(config), ends up here.
        throw new ConnectException("Unable to initialize the column mapper class " + clazz.getName() + ": " + ex.getMessage(), ex);
    }
}
}

代码示例来源:origin: debezium/debezium

/**
 * Derives the type name from a column type string by matching it against {@code TYPE_PATTERN}.
 * The "base" group is trimmed, the "suffix" group is appended when present, the result is passed
 * through {@code TypeRegistry.normalizeTypeName}, and an underscore is prepended when the
 * "array" group matched.
 *
 * @param columnName the column name; used only in the log entry and error message
 * @param typeWithModifiers the type string to parse
 * @return the resulting type name
 * @throws ConnectException if {@code typeWithModifiers} does not match the pattern
 */
private String parseType(String columnName, String typeWithModifiers) {
  final Matcher matcher = AbstractReplicationMessageColumn.TypeMetadataImpl.TYPE_PATTERN.matcher(typeWithModifiers);
  if (!matcher.matches()) {
    LOGGER.error("Failed to parse columnType for {} '{}'", columnName, typeWithModifiers);
    throw new ConnectException(String.format("Failed to parse columnType '%s' for column %s", typeWithModifiers, columnName));
  }

  final String base = matcher.group("base").trim();
  final String suffix = matcher.group("suffix");
  final String normalized = TypeRegistry.normalizeTypeName(suffix == null ? base : base + suffix);

  final boolean isArray = matcher.group("array") != null;
  return isArray ? "_" + normalized : normalized;
}

代码示例来源:origin: debezium/debezium

/**
 * Creates an adjuster by delegating to the two-argument {@code create} with a callback that always
 * throws a {@link ConnectException} naming the original schema name, its replacement, and the
 * schema it conflicts with.
 *
 * @param logger the logger handed to the delegate
 * @return the adjuster returned by the delegate
 */
public static SchemaNameAdjuster create(Logger logger) {
  return create(logger, (original, replacement, conflict) -> {
    throw new ConnectException(
        "The Kafka Connect schema name '" + original
            + "' is not a valid Avro schema name and its replacement '" + replacement
            + "' conflicts with another different schema '" + conflict + "'");
  });
}

代码示例来源:origin: debezium/debezium

/**
 * Runs {@code SHOW MASTER STATUS} and returns the fifth column of the first row when the result
 * has more than four columns.
 *
 * @return the value of that column, or an empty string when there is no row, the result has too few
 *         columns, or the column is null; never null
 * @throws ConnectException if the query fails with a {@link SQLException}
 */
public String knownGtidSet() {
  final AtomicReference<String> gtidHolder = new AtomicReference<>();
  try {
    jdbc.query("SHOW MASTER STATUS", rs -> {
      if (!rs.next()) {
        return;
      }
      if (rs.getMetaData().getColumnCount() > 4) {
        // Column 5 carries the GTID set; it may be null, blank, or contain a GTID set.
        gtidHolder.set(rs.getString(5));
      }
    });
  }
  catch (SQLException queryFailure) {
    throw new ConnectException("Unexpected error while connecting to MySQL and looking at GTID mode: ", queryFailure);
  }
  final String gtids = gtidHolder.get();
  if (gtids == null) {
    return "";
  }
  return gtids;
}

代码示例来源:origin: debezium/debezium

/**
 * Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular
 * expression patterns.
 *
 * @param fullyQualifiedColumnNames the comma-separated list of fully-qualified column names; may not be null
 * @param mapperClassName the name of the Java class that implements {@link ColumnMapper} and that
 *            will be used to map actual values into values used in the output record; null if
 *            an existing mapping function should be removed
 * @param config the configuration to pass to the {@link ColumnMapper} instance; may be null
 * @return this object so that methods can be chained together; never null
 * @throws ConnectException if the named class cannot be found or is not a {@link ColumnMapper}
 */
@SuppressWarnings("unchecked")
public Builder map(String fullyQualifiedColumnNames, String mapperClassName, Configuration config) {
  Class<ColumnMapper> mapperClass = null;
  if (mapperClassName != null) {
    try {
      Class<?> loaded = getClass().getClassLoader().loadClass(mapperClassName);
      // A plain cast to Class<ColumnMapper> is erased at runtime and can never fail, which would leave the
      // ClassCastException handler below unreachable. asSubclass performs a real type check.
      mapperClass = (Class<ColumnMapper>) loaded.asSubclass(ColumnMapper.class);
    } catch (ClassNotFoundException e) {
      throw new ConnectException("Unable to find column mapper class " + mapperClassName + ": " + e.getMessage(), e);
    } catch (ClassCastException e) {
      throw new ConnectException(
          "Column mapper class must implement " + ColumnMapper.class + " but does not: " + e.getMessage(),
          e);
    }
  }
  return map(fullyQualifiedColumnNames, mapperClass, config);
}

代码示例来源:origin: debezium/debezium

/**
 * Runs {@code SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'} and reports whether the returned value is
 * anything other than {@code OFF} (compared case-insensitively). When the query returns no row the
 * mode defaults to "off".
 *
 * @return {@code false} if the mode is {@code OFF} or no row was returned, {@code true} otherwise
 * @throws ConnectException if the query fails with a {@link SQLException}
 */
public boolean isGtidModeEnabled() {
  final AtomicReference<String> gtidMode = new AtomicReference<>("off");
  try {
    jdbc().query("SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'", rs -> {
      if (rs.next()) {
        // The second column holds the variable's value.
        gtidMode.set(rs.getString(2));
      }
    });
  }
  catch (SQLException queryFailure) {
    throw new ConnectException("Unexpected error while connecting to MySQL and looking at GTID mode: ", queryFailure);
  }
  final boolean modeIsOff = "OFF".equalsIgnoreCase(gtidMode.get());
  return !modeIsOff;
}

代码示例来源:origin: debezium/debezium

/**
 * Determine if the current user has the named privilege by scanning the rows returned by
 * {@code SHOW GRANTS FOR CURRENT_USER}. A row matches when its upper-cased text contains either
 * {@code ALL} or the upper-cased {@code grantName}. Scanning stops at the first null row.
 *
 * @param grantName the name of the MySQL privilege; may not be null
 * @return {@code true} if at least one scanned row matched, or {@code false} otherwise
 * @throws ConnectException if the query fails with a {@link SQLException}
 */
public boolean userHasPrivileges(String grantName) {
  AtomicBoolean result = new AtomicBoolean(false);
  try {
    jdbc.query("SHOW GRANTS FOR CURRENT_USER", rs -> {
      while (rs.next()) {
        String grants = rs.getString(1);
        logger.debug(grants);
        if (grants == null) return;
        // Use a fixed locale: with the default locale (e.g. Turkish) a lower-case 'i' upper-cases to a
        // dotted capital I, so a lower-case grant name would never match the server's output.
        grants = grants.toUpperCase(java.util.Locale.ROOT);
        if (grants.contains("ALL") || grants.contains(grantName.toUpperCase(java.util.Locale.ROOT))) {
          result.set(true);
        }
      }
    });
  } catch (SQLException e) {
    throw new ConnectException("Unexpected error while connecting to MySQL and looking at privileges for current user: ", e);
  }
  return result.get();
}

代码示例来源:origin: debezium/debezium

/**
 * Runs {@code SHOW GLOBAL VARIABLES LIKE 'binlog_format'}, logs the value found at debug level,
 * and reports whether it equals {@code ROW} (compared case-insensitively). When the query returns
 * no row the value is the empty string.
 *
 * @return {@code true} if the value is {@code ROW}, or {@code false} otherwise
 * @throws ConnectException if the query fails with a {@link SQLException}
 */
  protected boolean isRowBinlogEnabled() {
    final AtomicReference<String> binlogFormat = new AtomicReference<>("");
    try {
      connectionContext.jdbc().query("SHOW GLOBAL VARIABLES LIKE 'binlog_format'", rs -> {
        if (!rs.next()) {
          return;
        }
        // The second column holds the variable's value.
        binlogFormat.set(rs.getString(2));
      });
    }
    catch (SQLException queryFailure) {
      throw new ConnectException("Unexpected error while connecting to MySQL and looking at BINLOG mode: ", queryFailure);
    }

    final String format = binlogFormat.get();
    logger.debug("binlog_format={}", format);
    return "ROW".equalsIgnoreCase(format);
  }
}

代码示例来源:origin: debezium/debezium

/**
 * Builds a {@link ConnectException} around the given error. For a {@code ServerException} or a
 * {@link SQLException} the message is extended with the error code and SQLSTATE; for any other
 * error the original message is used unchanged. The given error is always kept as the cause.
 *
 * @param error the exception to wrap; may not be null
 * @return the new Kafka Connect exception
 */
protected ConnectException wrap(Throwable error) {
  assert error != null;
  final String baseMessage = error.getMessage();
  final String detail;
  if (error instanceof ServerException) {
    final ServerException serverError = (ServerException) error;
    detail = baseMessage + " Error code: " + serverError.getErrorCode() + "; SQLSTATE: " + serverError.getSqlState() + ".";
  }
  else if (error instanceof SQLException) {
    final SQLException sqlError = (SQLException) error;
    detail = sqlError.getMessage() + " Error code: " + sqlError.getErrorCode() + "; SQLSTATE: " + sqlError.getSQLState() + ".";
  }
  else {
    detail = baseMessage;
  }
  return new ConnectException(detail, error);
}

代码示例来源:origin: debezium/debezium

/**
 * Runs {@code SHOW BINARY LOGS}, collects the first column of every row, and returns the first
 * collected name.
 *
 * @return the first binlog filename returned by the server, or null if the query returned no rows
 * @throws ConnectException if the query fails with a {@link SQLException}
 */
protected String earliestBinlogFilename() {
  final List<String> binlogs = new ArrayList<>();
  try {
    logger.info("Checking all known binlogs from MySQL");
    connectionContext.jdbc().query("SHOW BINARY LOGS", rs -> {
      // Gather every filename in the order the server reports them.
      while (rs.next()) {
        binlogs.add(rs.getString(1));
      }
    });
  }
  catch (SQLException queryFailure) {
    throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", queryFailure);
  }
  return binlogs.isEmpty() ? null : binlogs.get(0);
}

代码示例来源:origin: apache/ignite

/**
 * Stores the given properties in {@code configProps} and then checks that the topics, cache name,
 * and cache configuration path entries are neither null nor empty.
 *
 * @param props Sink properties.
 * @throws ConnectException if any of the checked entries is rejected with an {@link IllegalArgumentException}
 */
@Override public void start(Map<String, String> props) {
  // The properties are stored before validation, so they stay assigned even when validation fails.
  configProps = props;

  try {
    A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
    A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
    A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
  }
  catch (IllegalArgumentException invalidConfig) {
    throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", invalidConfig);
  }
}

代码示例来源:origin: apache/ignite

/**
 * Checks that the cache name, cache configuration path, cache events, and topic names entries are
 * neither null nor empty, and only then stores the properties in {@code configProps}.
 *
 * @param props Source properties.
 * @throws ConnectException if any of the checked entries is rejected with an {@link IllegalArgumentException}
 */
@Override public void start(Map<String, String> props) {
  try {
    A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_NAME), "cache name");
    A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_CFG_PATH), "path to cache config file");
    A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_EVENTS), "Registered cache events");
    A.notNullOrEmpty(props.get(IgniteSourceConstants.TOPIC_NAMES), "Kafka topics");
  }
  catch (IllegalArgumentException invalidConfig) {
    throw new ConnectException("Cannot start IgniteSourceConnector due to configuration error", invalidConfig);
  }

  // Reached only when every check above passed.
  configProps = props;
}

代码示例来源:origin: debezium/debezium

/**
 * Validates the configuration against {@code ALL_FIELDS}, reporting problems through
 * {@code logger::error}, then delegates to the superclass and resolves the history file path
 * from the {@code FILE_PATH} setting.
 *
 * @param config the configuration to validate and apply
 * @param comparator the comparator handed to the superclass
 * @throws ConnectException if the configuration fails validation
 * @throws IllegalStateException if {@code running} is already set
 */
@Override
public void configure(Configuration config, HistoryRecordComparator comparator) {
  // Validate once; a second validation pass whose result was discarded has been removed.
  if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
    throw new ConnectException(
        "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
  }
  if (running.get()) {
    throw new IllegalStateException("Database history file already initialized to " + path);
  }
  super.configure(config, comparator);
  path = Paths.get(config.getString(FILE_PATH));
}

相关文章

微信公众号

最新文章

更多