org.apache.nifi.components.PropertyValue.getValue()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(13.3k)|赞(0)|评价(0)|浏览(92)

本文整理了Java中org.apache.nifi.components.PropertyValue.getValue()方法的一些代码示例,展示了PropertyValue.getValue()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。PropertyValue.getValue()方法的具体详情如下:
包路径:org.apache.nifi.components.PropertyValue
类名称:PropertyValue
方法名:getValue

PropertyValue.getValue介绍

暂无

代码示例

代码示例来源:origin: apache/nifi

/**
 * Resolves the base URL from the {@code URL} property.
 *
 * @param inputFlowFile FlowFile whose attributes are used when evaluating the property's expressions
 * @param context process context the {@code URL} property is read from
 * @return the evaluated value of the {@code URL} property
 */
@Override
  protected String getBaseUrl(FlowFile inputFlowFile, ProcessContext context) {
    final String baseUrl = context.getProperty(URL)
        .evaluateAttributeExpressions(inputFlowFile)
        .getValue();
    return baseUrl;
  }
}

代码示例来源:origin: apache/nifi

/**
 * Adjusts a session counter for one incoming FlowFile and then routes that FlowFile to {@code SUCCESS}.
 * Does nothing when the session has no FlowFile to offer.
 *
 * @param context process context supplying the {@code COUNTER_NAME} and {@code DELTA} properties
 * @param session session the FlowFile is pulled from and transferred through
 * @throws ProcessException declared by the overridden method
 */
@Override
  public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    final FlowFile incoming = session.get();
    if (incoming == null) {
      return;
    }

    // Both properties are evaluated against the attributes of the incoming FlowFile.
    final String counterName = context.getProperty(COUNTER_NAME).evaluateAttributeExpressions(incoming).getValue();
    final String deltaText = context.getProperty(DELTA).evaluateAttributeExpressions(incoming).getValue();
    final long delta = Long.parseLong(deltaText);

    session.adjustCounter(counterName, delta, false);
    session.transfer(incoming, SUCCESS);
  }
}

代码示例来源:origin: apache/nifi

/**
 * Caches the connection settings from the processor properties and creates the RethinkDB connection.
 *
 * @param context process context supplying the {@code DB_HOST}, {@code DB_PORT}, {@code USERNAME},
 *                {@code PASSWORD}, {@code DB_NAME} and {@code TABLE_NAME} properties
 * @throws RuntimeException if {@code makeConnection()} fails; the original exception is kept as the cause
 */
@OnScheduled
public void onScheduled(final ProcessContext context) {
  hostname = context.getProperty(DB_HOST).getValue();
  port = context.getProperty(DB_PORT).asInteger();
  username = context.getProperty(USERNAME).getValue();
  password = context.getProperty(PASSWORD).getValue();
  databaseName = context.getProperty(DB_NAME).getValue();
  tableName = context.getProperty(TABLE_NAME).getValue();
  try {
    rethinkDbConnection = makeConnection();
  } catch (Exception e) {
    // Build the message once so the log entry and the rethrown exception carry the same,
    // correctly spaced text (the exception message previously ran "connection" into the detail).
    final String message = "Error while getting connection " + e.getLocalizedMessage();
    getLogger().error(message, e);
    throw new RuntimeException(message, e);
  }
  getLogger().info("RethinkDB connection created for host {} port {} and db {}",
      new Object[] {hostname, port, databaseName});
}

代码示例来源:origin: apache/nifi

/**
 * Publishes the registered metrics for the configured process group.
 * <p>
 * When the {@code PROCESS_GROUP_ID} property evaluates to {@code null} the controller status is used;
 * otherwise the status of the group with that id is looked up. If no status is obtained an error is
 * logged and nothing is reported.
 *
 * @param context reporting context used to read the property and to obtain the {@link ProcessGroupStatus}
 */
@Override
public void onTrigger(ReportingContext context) {
  final String groupId = context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue();

  final ProcessGroupStatus statusToReport;
  if (groupId != null) {
    statusToReport = context.getEventAccess().getGroupStatus(groupId);
  } else {
    statusToReport = context.getEventAccess().getControllerStatus();
  }

  if (statusToReport == null) {
    getLogger().error("Process group with provided group id could not be found.");
    return;
  }

  currentStatusReference.set(statusToReport);
  reporter.report();
}

代码示例来源:origin: apache/nifi

/**
 * Sets up the client and resolves the charset named by the {@code CHARSET} property.
 *
 * @param context configuration context passed to {@code setupClient} and used to read {@code CHARSET}
 * @throws InitializationException wrapping any exception raised during setup
 */
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
  try {
    setupClient(context);
    final String charsetName = context.getProperty(CHARSET).getValue();
    charset = Charset.forName(charsetName);
  } catch (Exception initFailure) {
    getLogger().error("Could not initialize ElasticSearch client.", initFailure);
    throw new InitializationException(initFailure);
  }
}

代码示例来源:origin: apache/nifi

/**
 * Returns the InfluxDB instance held in {@code influxDB}, creating and storing it first when none is set.
 *
 * @param context process context supplying the credentials, connection timeout and URL properties
 * @return InfluxDB instance
 * @throws RuntimeException if {@code makeConnection} fails; the original exception is kept as the cause
 */
protected synchronized InfluxDB getInfluxDB(ProcessContext context) {
  // Fast path: a connection has already been stored.
  if ( influxDB.get() != null ) {
    return influxDB.get();
  }

  final String user = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
  final String secret = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
  final long timeoutSeconds = context.getProperty(INFLUX_DB_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS);
  final String url = context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue();

  try {
    influxDB.set(makeConnection(user, secret, url, timeoutSeconds));
  } catch (Exception connectFailure) {
    getLogger().error("Error while getting connection {}", new Object[] { connectFailure.getLocalizedMessage() }, connectFailure);
    throw new RuntimeException("Error while getting connection " + connectFailure.getLocalizedMessage(), connectFailure);
  }

  getLogger().info("InfluxDB connection created for host {}", new Object[] {url});
  return influxDB.get();
}

代码示例来源:origin: apache/nifi

/**
 * Resolves the partition to use for the given FlowFile from the {@code PARTITION} property.
 *
 * @param context process context the {@code PARTITION} property is read from
 * @param flowFile FlowFile whose attributes are used when evaluating the property's expressions
 * @return the evaluated property value parsed as an integer, or {@code null} when the property
 *         evaluates to {@code null}
 * @throws NumberFormatException if the evaluated value is not a parsable integer
 */
private Integer determinePartition(ProcessContext context, FlowFile flowFile) {
  // Evaluate the expression exactly once and parse that same result, so the value that passed
  // the null check is the value that gets parsed.
  final String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
  if (pv == null) {
    return null;
  }
  return Integer.valueOf(pv);
}

代码示例来源:origin: apache/nifi

/**
 * Builds the JNDI environment from the DNS-related processor properties and initializes the context.
 * <p>
 * The timeout and retry settings are always placed in the environment. A provider URL is added only
 * when {@code DNS_SERVER} is non-empty; it is composed of one {@code dns://<server>/. } entry per
 * comma-separated server. On success {@code initialized} is set to {@code true}; a
 * {@link NamingException} is logged and not rethrown.
 *
 * @param context process context supplying {@code DNS_TIMEOUT}, {@code DNS_SERVER} and {@code DNS_RETRIES}
 */
protected void initializeResolver(final ProcessContext context ) {
  final String dnsTimeout = context.getProperty(DNS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString();
  final String dnsServer = context.getProperty(DNS_SERVER).getValue();
  final String dnsRetries = context.getProperty(DNS_RETRIES).getValue();

  final Hashtable<String,String> env = new Hashtable<>();
  env.put("java.naming.factory.initial", contextFactory);
  env.put("com.sun.jndi.dns.timeout.initial", dnsTimeout);
  env.put("com.sun.jndi.dns.timeout.retries", dnsRetries);

  if (StringUtils.isNotEmpty(dnsServer)) {
    // Accumulate the provider URL list in a builder instead of re-allocating a String per server.
    final StringBuilder providerUrl = new StringBuilder();
    for (final String server : dnsServer.split(",")) {
      providerUrl.append("dns://").append(server).append("/. ");
    }
    env.put(Context.PROVIDER_URL, providerUrl.toString());
  }

  try {
    initializeContext(env);
    initialized.set(true);
  } catch (NamingException e) {
    getLogger().error("Could not initialize JNDI context", e);
  }
}

代码示例来源:origin: apache/nifi

/**
 * Abridged excerpt of an {@code onTrigger} implementation that selects HTML elements with a CSS
 * selector and routes the FlowFile according to the outcome.
 * <p>
 * NOTE(review): lines of the original method are missing from this excerpt (for example the
 * {@code try} that pairs with the {@code catch} below, closing braces, and the declarations of
 * {@code doc}, {@code eles}, {@code ff} and {@code extractedElementValue}), so it does not compile as shown.
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
  final FlowFile flowFile = session.get();
  if ( flowFile == null ) {
    return;
    // (excerpt gap above) Evaluate CSS_SELECTOR against the FlowFile's attributes and apply it to the document.
    eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
  } catch (final Exception ex) {
    // On any exception: log it, route the FlowFile to REL_INVALID_HTML and stop.
    getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", new Object[] {flowFile, ex, REL_INVALID_HTML}, ex);
    session.transfer(flowFile, REL_INVALID_HTML);
    return;
  // (excerpt gap above) Read the output-shaping properties; all but OUTPUT_TYPE are evaluated against the FlowFile.
  final String prependValue = context.getProperty(PREPEND_ELEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
  final String appendValue = context.getProperty(APPEND_ELEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
  final String outputType = context.getProperty(OUTPUT_TYPE).getValue();
  final String attributeKey = context.getProperty(ATTRIBUTE_KEY).evaluateAttributeExpressions(flowFile).getValue();
    // (excerpt gap above) The condition guarding this REL_NOT_FOUND transfer is not shown.
    session.transfer(flowFile, REL_NOT_FOUND);
  } else {
      FlowFile updatedFF = ff;
      // The DESTINATION property decides where the extracted value is written.
      switch (context.getProperty(DESTINATION).getValue()) {
        case DESTINATION_ATTRIBUTE:
          updatedFF = session.putAttribute(ff, HTML_ELEMENT_ATTRIBUTE_NAME, extractedElementValue);
      // (excerpt gap above) Remaining switch cases and closing braces are not shown.
      session.transfer(updatedFF, REL_SUCCESS);

代码示例来源:origin: apache/nifi

/**
 * Copies the scripting-related property values into this component's fields.
 * <p>
 * The module path is split on commas into {@code modules}; an empty or {@code null} module path
 * yields an empty array.
 *
 * @param context process context supplying the script engine, script file, script body and module properties
 */
public void setupVariables(ProcessContext context) {
  scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
  scriptPath = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
  scriptBody = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();

  final String configuredModules = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue();
  modules = StringUtils.isEmpty(configuredModules)
      ? new String[0]
      : configuredModules.split(",");
}

代码示例来源:origin: apache/nifi

/**
 * Creates, configures and starts the Flume sink together with the channel it reads from.
 * <p>
 * Any {@link Throwable} raised along the way is logged and then propagated.
 *
 * @param context process context supplying the sink name/type, Flume configuration and agent name
 */
@OnScheduled
public void onScheduled(final ProcessContext context) {
  try {
    channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
    channel.start();

    // The SOURCE_NAME property is used both to create the sink and to look up its Flume context.
    final String sinkName = context.getProperty(SOURCE_NAME).getValue();
    final String sinkType = context.getProperty(SINK_TYPE).getValue();
    sink = SINK_FACTORY.create(sinkName, sinkType);
    sink.setChannel(channel);

    final String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    final String agentName = context.getProperty(AGENT_NAME).getValue();
    Configurables.configure(sink, getFlumeSinkContext(flumeConfig, agentName, sinkName));
    sink.start();
  } catch (Throwable failure) {
    getLogger().error("Error creating sink", failure);
    throw Throwables.propagate(failure);
  }
}

代码示例来源:origin: apache/nifi

/**
 * Performs the Kafka rendezvous for the next {@link FlowFile} in the {@link ProcessSession}, if any.
 * <br>
 * A resulting {@link FlowFile} that is not marked as failed gets a provenance SEND event (transit URI
 * built from the evaluated seed brokers and topic) and is transferred to {@link #REL_SUCCESS}.
 * <br>
 * A failed result {@link FlowFile} is penalized and transferred to {@link #REL_FAILURE}.
 *
 * @return {@code true} when a FlowFile was available and handled, {@code false} when the session had none
 */
@Override
protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
  final FlowFile incoming = session.get();
  if (incoming == null) {
    return false;
  }

  final FlowFile result = this.doRendezvousWithKafka(incoming, context, session);
  if (this.isFailedFlowFile(result)) {
    session.transfer(session.penalize(result), REL_FAILURE);
    return true;
  }

  final String brokers = context.getProperty(SEED_BROKERS).evaluateAttributeExpressions(result).getValue();
  final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(result).getValue();
  session.getProvenanceReporter().send(result, brokers + "/" + topic);
  session.transfer(result, REL_SUCCESS);
  return true;
}

代码示例来源:origin: apache/nifi

/**
 * Captures the processor's configuration from the given context.
 * <p>
 * The file filter pattern is compiled only when {@code FILE_FILTER_REGEX} has a value; otherwise it
 * is left {@code null}.
 *
 * @param context process context the configuration properties are read from
 */
ProcessorConfiguration(final ProcessContext context) {
  conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
  operation = context.getProperty(OPERATION).getValue();
  outputRootDirPath = new Path(context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions().getValue());

  final String filterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
  if (filterRegex != null) {
    fileFilterPattern = Pattern.compile(filterRegex);
  } else {
    fileFilterPattern = null;
  }

  ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
}

代码示例来源:origin: apache/nifi

/**
 * Creates and configures the Flume source; a {@code PollableSource} is additionally wired to a
 * channel processor and started.
 * <p>
 * Any {@link Throwable} raised along the way is logged and then propagated.
 *
 * @param context process context supplying the source name/type, Flume configuration and agent name
 */
@OnScheduled
public void onScheduled(final ProcessContext context) {
  try {
    final String sourceName = context.getProperty(SOURCE_NAME).getValue();
    final String sourceType = context.getProperty(SOURCE_TYPE).getValue();
    source = SOURCE_FACTORY.create(sourceName, sourceType);

    final String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
    final String agentName = context.getProperty(AGENT_NAME).getValue();
    Configurables.configure(source, getFlumeSourceContext(flumeConfig, agentName, sourceName));

    // Only pollable sources are given a channel processor and started here.
    if (source instanceof PollableSource) {
      final ChannelProcessor processor = new ChannelProcessor(new NifiChannelSelector(pollableSourceChannel));
      source.setChannelProcessor(processor);
      source.start();
    }
  } catch (Throwable failure) {
    getLogger().error("Error creating source", failure);
    throw Throwables.propagate(failure);
  }
}

代码示例来源:origin: apache/nifi

/**
 * Abridged excerpt of an {@code onTrigger} implementation that selects HTML elements with a CSS
 * selector and overwrites part of each selected element with the {@code MODIFIED_VALUE} property.
 * <p>
 * NOTE(review): lines of the original method are missing from this excerpt (closing braces, the
 * declarations of {@code doc}, {@code eles} and {@code ff}, and the condition guarding the
 * {@code REL_NOT_FOUND} branch), so it does not compile as shown.
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
  final FlowFile flowFile = session.get();
  if (flowFile == null) {
    return;
  // (excerpt gap above) Parse the FlowFile content as HTML and select elements with the evaluated CSS_SELECTOR.
  try {
    doc = parseHTMLDocumentFromFlowfile(flowFile, context, session);
    eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
  } catch (Exception ex) {
    // On any exception: log it, route the FlowFile to REL_INVALID_HTML and stop.
    getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", new Object[] {flowFile, ex.toString(), REL_INVALID_HTML.getName()}, ex);
    session.transfer(flowFile, REL_INVALID_HTML);
    return;
  // (excerpt gap above) Replacement value, evaluated against the FlowFile's attributes.
  final String modifiedValue = context.getProperty(MODIFIED_VALUE).evaluateAttributeExpressions(flowFile).getValue();
    // (excerpt gap above) The condition guarding this REL_NOT_FOUND transfer is not shown.
    session.transfer(flowFile, REL_NOT_FOUND);
  } else {
    for (Element ele : eles) {
      // OUTPUT_TYPE selects which part of the element receives the modified value.
      switch (context.getProperty(OUTPUT_TYPE).getValue()) {
        case ELEMENT_HTML:
          ele.html(modifiedValue);
          break;
        case ELEMENT_ATTRIBUTE:
          ele.attr(context.getProperty(ATTRIBUTE_KEY).evaluateAttributeExpressions(flowFile).getValue(), modifiedValue);
          break;
        case ELEMENT_TEXT:
    // (excerpt gap above) The ELEMENT_TEXT case body and closing braces are not shown.
    session.transfer(ff, REL_SUCCESS);

代码示例来源:origin: apache/nifi

/**
 * Evaluates each given property against the FlowFile and collects the resulting values in order.
 *
 * @param descriptors properties to evaluate, in the order their values should appear
 * @param context process context the properties are read from
 * @param flowFile FlowFile whose attributes are used when evaluating the properties' expressions
 * @return a new list holding one evaluated value per descriptor
 */
private List<String> getCommands(final List<PropertyDescriptor> descriptors, final ProcessContext context, final FlowFile flowFile) {
  final List<String> evaluated = new ArrayList<>();
  descriptors.forEach(descriptor ->
      evaluated.add(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue()));
  return evaluated;
}

代码示例来源:origin: apache/nifi

/**
 * Prepares the transform cache and the class loader used by this processor.
 * <p>
 * The cache is bounded by {@code TRANSFORM_CACHE_SIZE} and builds entries via {@code createTransform}.
 * When {@code MODULES} is set a custom class loader is created from its value; otherwise this class's
 * own class loader is used. A failure while resolving the class loader is logged and not rethrown.
 *
 * @param context process context supplying {@code TRANSFORM_CACHE_SIZE} and {@code MODULES}
 */
@OnScheduled
public void setup(final ProcessContext context) {
  final int cacheCapacity = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
  transformCache = Caffeine.newBuilder()
      .maximumSize(cacheCapacity)
      .build(specString -> createTransform(context, specString.orElse(null)));

  try {
    final ClassLoader ownLoader = this.getClass().getClassLoader();
    customClassLoader = context.getProperty(MODULES).isSet()
        ? ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(), ownLoader, getJarFilenameFilter())
        : ownLoader;
  } catch (final Exception setupFailure) {
    getLogger().error("Unable to setup processor", setupFailure);
  }
}

代码示例来源:origin: apache/nifi

// Abridged excerpt of a routine that publishes a FlowFile's content to AMQP.
// NOTE(review): the enclosing method header, closing braces, the `try` that pairs with the `catch`
// below and the actual publish call are missing from this excerpt, so it does not compile as shown.
FlowFile flowFile = session.get();
if (flowFile == null) {
  return;
// (excerpt gap above) The routing key is evaluated against the FlowFile; a null result is rejected.
final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
if (routingKey == null) {
  throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
    + context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
// (excerpt gap above)
final String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
final byte[] messageContent = extractMessage(flowFile, session);
  // (excerpt gap above) Success path: route to REL_SUCCESS and record a provenance SEND event
  // whose transit URI combines the connection, exchange and routing key.
  session.transfer(flowFile, REL_SUCCESS);
  session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey);
} catch (Exception e) {
  // Failure path: penalize the FlowFile, route it to REL_FAILURE and log the error.
  session.transfer(session.penalize(flowFile), REL_FAILURE);
  getLogger().error("Failed while sending message to AMQP via " + publisher, e);

代码示例来源:origin: apache/nifi

/**
 * Determines the media type from the {@code PROP_CONTENT_TYPE} property.
 *
 * @return the parsed media type of the evaluated property value, or of {@code DEFAULT_CONTENT_TYPE}
 *         when that value is blank
 */
@Override
public MediaType contentType() {
  final String configured = context.getProperty(PROP_CONTENT_TYPE).evaluateAttributeExpressions(requestFlowFile).getValue();
  if (StringUtils.isBlank(configured)) {
    return MediaType.parse(DEFAULT_CONTENT_TYPE);
  }
  return MediaType.parse(configured);
}

代码示例来源:origin: apache/nifi

/**
 * Prepares the Ignite cache settings for this processor.
 * <p>
 * Ignite itself is initialized through the superclass only when {@code getIgnite()} returns
 * {@code null}; the cache name is then read from the {@code CACHE_NAME} property.
 *
 * @param context process context
 * @throws ProcessException wrapping any exception raised during initialization
 */
public void initializeIgniteCache(ProcessContext context) throws ProcessException {
  getLogger().info("Initializing Ignite cache");

  try {
    final boolean igniteMissing = getIgnite() == null;
    if (igniteMissing) {
      getLogger().info("Initializing ignite as client");
      super.initializeIgnite(context);
    }

    cacheName = context.getProperty(CACHE_NAME).getValue();
  } catch (Exception initFailure) {
    getLogger().error("Failed to initialize ignite cache due to {}", new Object[] { initFailure }, initFailure);
    throw new ProcessException(initFailure);
  }
}

相关文章