org.apache.nifi.components.PropertyValue.evaluateAttributeExpressions()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(12.4k)|赞(0)|评价(0)|浏览(135)

本文整理了Java中org.apache.nifi.components.PropertyValue.evaluateAttributeExpressions()方法的一些代码示例,展示了PropertyValue.evaluateAttributeExpressions()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。PropertyValue.evaluateAttributeExpressions()方法的具体详情如下:
包路径:org.apache.nifi.components.PropertyValue
类名称:PropertyValue
方法名:evaluateAttributeExpressions

PropertyValue.evaluateAttributeExpressions介绍

[英]Replaces values in the Property Value using the NiFi Expression Language; a PropertyValue with the new value is then returned, supporting call chaining. Before executing the expression language statement any variables names found within any underlying VariableRegistry will be substituted with their values.
[中]使用NiFi表达式语言替换属性值中的值;然后返回带有新值的PropertyValue,支持调用链接。在执行expression language语句之前,在任何基础VariableRegistry中找到的任何变量名称都将替换为它们的值。

代码示例

代码示例来源:origin: apache/nifi

@Override
  protected String getBaseUrl(FlowFile inputFlowFile, ProcessContext context) {
    return context.getProperty(URL).evaluateAttributeExpressions(inputFlowFile).getValue();
  }
}

代码示例来源:origin: apache/nifi

@Override
  public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
      return;
    }

    session.adjustCounter(context.getProperty(COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue(),
        Long.parseLong(context.getProperty(DELTA).evaluateAttributeExpressions(flowFile).getValue()),
        false
    );
    session.transfer(flowFile, SUCCESS);
  }
}

代码示例来源:origin: apache/nifi

private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile, final Map<String, String> statefulAttributes) {
  try {
    // evaluate the expression for the given flow file
    return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).asBoolean();
  } catch (final Exception e) {
    getLogger().error(String.format("Could not evaluate the condition '%s' while processing Flowfile '%s'", condition.getExpression(), flowfile));
    throw new ProcessException(String.format("Unable to evaluate condition '%s': %s.", condition.getExpression(), e), e);
  }
}

代码示例来源:origin: apache/nifi

/**
 * Report the registered metrics.
 *
 * @param context used for getting the most recent {@link ProcessGroupStatus}.
 */
@Override
public void onTrigger(ReportingContext context) {
  String groupId = context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue();
  ProcessGroupStatus statusToReport = groupId == null
      ? context.getEventAccess().getControllerStatus()
      : context.getEventAccess().getGroupStatus(groupId);
  if (statusToReport != null) {
    currentStatusReference.set(statusToReport);
    reporter.report();
  } else {
    getLogger().error("Process group with provided group id could not be found.");
  }
}

代码示例来源:origin: apache/nifi

/**
 * Helper method to create InfluxDB instance
 * @return InfluxDB instance
 */
protected synchronized InfluxDB getInfluxDB(ProcessContext context) {
  if ( influxDB.get() == null ) {
    String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
    String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
    long connectionTimeout = context.getProperty(INFLUX_DB_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS);
    String influxDbUrl = context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue();
    try {
      influxDB.set(makeConnection(username, password, influxDbUrl, connectionTimeout));
    } catch(Exception e) {
      getLogger().error("Error while getting connection {}", new Object[] { e.getLocalizedMessage() },e);
      throw new RuntimeException("Error while getting connection " + e.getLocalizedMessage(),e);
    }
    getLogger().info("InfluxDB connection created for host {}",
        new Object[] {influxDbUrl});
  }
  return influxDB.get();
}

代码示例来源:origin: apache/nifi

@Override
protected String getPath(final ProcessContext context) {
  return context.getProperty(REMOTE_PATH).evaluateAttributeExpressions().getValue();
}

代码示例来源:origin: apache/nifi

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
  FlowFile flowFile = session.get();
  if (flowFile == null) {
    return;
  }
  for (int i = 1; i <= context.getProperty(NUM_COPIES).evaluateAttributeExpressions(flowFile).asInteger(); i++) {
    FlowFile copy = session.clone(flowFile);
    copy = session.putAttribute(copy, COPY_INDEX_ATTRIBUTE, Integer.toString(i));
    session.transfer(copy, REL_SUCCESS);
  }
  flowFile = session.putAttribute(flowFile, COPY_INDEX_ATTRIBUTE, "0");
  session.transfer(flowFile, REL_SUCCESS);
}

代码示例来源:origin: apache/nifi

/**
 *
 */
private Integer determinePartition(ProcessContext context, FlowFile flowFile) {
  String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
  if (pv != null){
    return Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue());
  }
  return null;
}

代码示例来源:origin: apache/nifi

/**
 * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile}
 * producing a result {@link FlowFile}.
 * <br>
 * The result {@link FlowFile} that is successful is then transferred to {@link #REL_SUCCESS}
 * <br>
 * The result {@link FlowFile} that is failed is then transferred to {@link #REL_FAILURE}
 *
 */
@Override
protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
  boolean processed = false;
  FlowFile flowFile = session.get();
  if (flowFile != null) {
    flowFile = this.doRendezvousWithKafka(flowFile, context, session);
    if (!this.isFailedFlowFile(flowFile)) {
      session.getProvenanceReporter().send(flowFile,
          context.getProperty(SEED_BROKERS).evaluateAttributeExpressions(flowFile).getValue() + "/"
          + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
      session.transfer(flowFile, REL_SUCCESS);
    } else {
      session.transfer(session.penalize(flowFile), REL_FAILURE);
    }
    processed = true;
  }
  return processed;
}

代码示例来源:origin: apache/nifi

private List<String> getCommands(final List<PropertyDescriptor> descriptors, final ProcessContext context, final FlowFile flowFile) {
  final List<String> cmds = new ArrayList<>();
  for (final PropertyDescriptor descriptor : descriptors) {
    cmds.add(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
  }
  return cmds;
}

代码示例来源:origin: apache/nifi

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
  FlowFile flowFile = session.get();
  if (flowFile == null) {
    return;
  final String bucket = context.getProperty(BUCKET)
                .evaluateAttributeExpressions(flowFile)
                .getValue();
  final String key = context.getProperty(KEY)
                .evaluateAttributeExpressions(flowFile)
                .getValue();
  final Long generation = context.getProperty(GENERATION)
      .evaluateAttributeExpressions(flowFile)
      .asLong();
    storage.delete(BlobId.of(bucket, key, generation));
  } catch (Exception e) {
    getLogger().error(e.getMessage(), e);
    flowFile = session.penalize(flowFile);
    session.transfer(flowFile, REL_FAILURE);
    return;
  session.transfer(flowFile, REL_SUCCESS);
  final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
  getLogger().info("Successfully deleted GCS Object for {} in {} millis; routing to success", new Object[]{flowFile, millis});

代码示例来源:origin: apache/nifi

@Override
public MediaType contentType() {
  String contentType = context.getProperty(PROP_CONTENT_TYPE).evaluateAttributeExpressions(requestFlowFile).getValue();
  contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE : contentType;
  return MediaType.parse(contentType);
}

代码示例来源:origin: apache/nifi

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
  final FlowFile flowFile = session.get();
  if ( flowFile == null ) {
    return;
    eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
  } catch (final Exception ex) {
    getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", new Object[] {flowFile, ex, REL_INVALID_HTML}, ex);
    session.transfer(flowFile, REL_INVALID_HTML);
    return;
  final String prependValue = context.getProperty(PREPEND_ELEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
  final String appendValue = context.getProperty(APPEND_ELEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
  final String outputType = context.getProperty(OUTPUT_TYPE).getValue();
  final String attributeKey = context.getProperty(ATTRIBUTE_KEY).evaluateAttributeExpressions(flowFile).getValue();
    session.transfer(flowFile, REL_NOT_FOUND);
  } else {
      session.transfer(updatedFF, REL_SUCCESS);

代码示例来源:origin: apache/nifi

public RegexReplace(final byte[] buffer, final ProcessContext context) {
  this.buffer = buffer;
  final String regexValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions().getValue();
  numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
  additionalAttrs = new HashMap<>(numCapturingGroups);
}

代码示例来源:origin: apache/nifi

FlowFile flowFile = session.get();
if (flowFile == null) {
  return;
final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
if (routingKey == null) {
  throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
    + context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
final String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
final byte[] messageContent = extractMessage(flowFile, session);
  session.transfer(flowFile, REL_SUCCESS);
  session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey);
} catch (Exception e) {
  session.transfer(session.penalize(flowFile), REL_FAILURE);
  getLogger().error("Failed while sending message to AMQP via " + publisher, e);

代码示例来源:origin: apache/nifi

private ReplaceTextCallback(ProcessContext context, FlowFile flowFile, int maxBufferSize) {
  this.regex = context.getProperty(REGEX).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
  this.flowFile = flowFile;
  this.charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
  final String regexValue = context.getProperty(REGEX).evaluateAttributeExpressions().getValue();
  this.numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
  this.buffer = new byte[maxBufferSize];
  this.groupToMatch = context.getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY).evaluateAttributeExpressions().asInteger();
}

代码示例来源:origin: apache/nifi

storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
  storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
  sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
} else {
  storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
  storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
  sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
  getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
  throw new IllegalArgumentException(e);
} catch (InvalidKeyException e) {
  getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
  throw new IllegalArgumentException(e);

代码示例来源:origin: apache/nifi

@Override
protected String createTransitUri(ProcessContext context) {
  final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
  final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
  final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase();
  return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
}

代码示例来源:origin: apache/nifi

protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session,
    Map<String, String> extraAttributes, Relationship rel) throws UnsupportedEncodingException {
  String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue();
  FlowFile flowFile = parent != null ? session.create(parent) : session.create();
  flowFile = session.importFrom(new ByteArrayInputStream(payload.getBytes(charset)), flowFile);
  flowFile = session.putAllAttributes(flowFile, extraAttributes);
  session.getProvenanceReporter().receive(flowFile, getURI(context));
  session.transfer(flowFile, rel);
}

代码示例来源:origin: apache/nifi

protected String makeProvenanceUrl(final ProcessContext context, String database) {
  return new StringBuilder("influxdb://")
    .append(context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue()).append("/")
    .append(database).toString();
}

相关文章