本文整理了Java中org.apache.nifi.components.PropertyValue.getValue()
方法的一些代码示例,展示了PropertyValue.getValue()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。PropertyValue.getValue()
方法的具体详情如下:
包路径:org.apache.nifi.components.PropertyValue
类名称:PropertyValue
方法名:getValue
暂无
代码示例来源:origin: apache/nifi
@Override
protected String getBaseUrl(FlowFile inputFlowFile, ProcessContext context) {
    // Resolve the URL property, expanding any expression-language
    // references against the incoming FlowFile's attributes.
    return context.getProperty(URL)
            .evaluateAttributeExpressions(inputFlowFile)
            .getValue();
}
}
代码示例来源:origin: apache/nifi
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    // Nothing queued — nothing to do.
    final FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    // Counter name and delta both support expression language evaluated
    // against the FlowFile's attributes; the delta must parse as a long.
    final String counterName = context.getProperty(COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
    final String deltaText = context.getProperty(DELTA).evaluateAttributeExpressions(flowFile).getValue();
    final long delta = Long.parseLong(deltaText);

    session.adjustCounter(counterName, delta, false);
    session.transfer(flowFile, SUCCESS);
}
}
代码示例来源:origin: apache/nifi
@OnScheduled
public void onScheduled(final ProcessContext context) {
    // Snapshot connection settings once per schedule so onTrigger does not
    // re-read properties for every FlowFile.
    hostname = context.getProperty(DB_HOST).getValue();
    port = context.getProperty(DB_PORT).asInteger();
    username = context.getProperty(USERNAME).getValue();
    password = context.getProperty(PASSWORD).getValue();
    databaseName = context.getProperty(DB_NAME).getValue();
    tableName = context.getProperty(TABLE_NAME).getValue();
    try {
        rethinkDbConnection = makeConnection();
    } catch (Exception e) {
        getLogger().error("Error while getting connection " + e.getLocalizedMessage(), e);
        // Fix: the rethrown message previously ran "connection" and the cause
        // text together with no separating space.
        throw new RuntimeException("Error while getting connection " + e.getLocalizedMessage(), e);
    }
    getLogger().info("RethinkDB connection created for host {} port {} and db {}",
            new Object[] {hostname, port, databaseName});
}
代码示例来源:origin: apache/nifi
/**
 * Report the registered metrics.
 *
 * @param context used for getting the most recent {@link ProcessGroupStatus}.
 */
@Override
public void onTrigger(ReportingContext context) {
    final String groupId = context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue();

    // No group id configured -> report on the whole controller; otherwise
    // look up the specific process group.
    final ProcessGroupStatus status;
    if (groupId == null) {
        status = context.getEventAccess().getControllerStatus();
    } else {
        status = context.getEventAccess().getGroupStatus(groupId);
    }

    if (status == null) {
        getLogger().error("Process group with provided group id could not be found.");
        return;
    }

    currentStatusReference.set(status);
    reporter.report();
}
代码示例来源:origin: apache/nifi
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
    try {
        setupClient(context);
        // Resolve the configured charset name up front; an unknown name
        // throws and is handled below like any other setup failure.
        final String charsetName = context.getProperty(CHARSET).getValue();
        charset = Charset.forName(charsetName);
    } catch (Exception ex) {
        // Surface any setup failure as an InitializationException so the
        // framework keeps the service disabled.
        getLogger().error("Could not initialize ElasticSearch client.", ex);
        throw new InitializationException(ex);
    }
}
代码示例来源:origin: apache/nifi
/**
 * Lazily creates (and caches) the shared InfluxDB connection.
 * The method is synchronized, so only one caller performs initialization.
 *
 * @return InfluxDB instance
 */
protected synchronized InfluxDB getInfluxDB(ProcessContext context) {
    if (influxDB.get() == null) {
        final String user = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
        final String pass = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
        final long timeoutSeconds = context.getProperty(INFLUX_DB_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS);
        final String url = context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue();
        try {
            influxDB.set(makeConnection(user, pass, url, timeoutSeconds));
        } catch (Exception e) {
            getLogger().error("Error while getting connection {}", new Object[] { e.getLocalizedMessage() }, e);
            throw new RuntimeException("Error while getting connection " + e.getLocalizedMessage(), e);
        }
        getLogger().info("InfluxDB connection created for host {}",
                new Object[] {url});
    }
    return influxDB.get();
}
代码示例来源:origin: apache/nifi
/**
 * Resolves the target partition from the PARTITION property.
 *
 * @param context   processor context providing the PARTITION property
 * @param flowFile  FlowFile whose attributes drive expression evaluation
 * @return the parsed partition number, or {@code null} when the property
 *         evaluates to no value
 * @throws NumberFormatException if the evaluated value is not an integer
 */
private Integer determinePartition(ProcessContext context, FlowFile flowFile) {
    final String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
    if (pv != null) {
        // Fix: reuse the already-evaluated value instead of evaluating the
        // property expression a second time.
        return Integer.parseInt(pv);
    }
    return null;
}
代码示例来源:origin: apache/nifi
/**
 * Builds the JNDI environment for the DNS resolver from processor
 * properties and initializes the naming context.
 */
protected void initializeResolver(final ProcessContext context) {
    final String dnsTimeout = context.getProperty(DNS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString();
    final String dnsServer = context.getProperty(DNS_SERVER).getValue();
    final String dnsRetries = context.getProperty(DNS_RETRIES).getValue();

    // JNDI requires a Hashtable for its environment.
    Hashtable<String, String> env = new Hashtable<String, String>();
    env.put("java.naming.factory.initial", contextFactory);
    env.put("com.sun.jndi.dns.timeout.initial", dnsTimeout);
    env.put("com.sun.jndi.dns.timeout.retries", dnsRetries);

    if (StringUtils.isNotEmpty(dnsServer)) {
        // Build a space-separated list of "dns://host/. " provider URLs.
        // Use StringBuilder instead of String concatenation in the loop.
        final StringBuilder providerUrls = new StringBuilder();
        for (String server : dnsServer.split(",")) {
            providerUrls.append("dns://").append(server).append("/. ");
        }
        env.put(Context.PROVIDER_URL, providerUrls.toString());
    }

    try {
        initializeContext(env);
        initialized.set(true);
    } catch (NamingException e) {
        getLogger().error("Could not initialize JNDI context", e);
    }
}
代码示例来源:origin: apache/nifi
// NOTE(review): this excerpt was truncated by the aggregation tool — the
// try-block opening, several closing braces, and the code between the guard
// clause and the element handling are missing, so the braces below do not
// balance. Treat it only as an illustration of PropertyValue.getValue() usage.
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
} catch (final Exception ex) {
getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", new Object[] {flowFile, ex, REL_INVALID_HTML}, ex);
session.transfer(flowFile, REL_INVALID_HTML);
return;
final String prependValue = context.getProperty(PREPEND_ELEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final String appendValue = context.getProperty(APPEND_ELEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final String outputType = context.getProperty(OUTPUT_TYPE).getValue();
final String attributeKey = context.getProperty(ATTRIBUTE_KEY).evaluateAttributeExpressions(flowFile).getValue();
session.transfer(flowFile, REL_NOT_FOUND);
} else {
FlowFile updatedFF = ff;
switch (context.getProperty(DESTINATION).getValue()) {
case DESTINATION_ATTRIBUTE:
updatedFF = session.putAttribute(ff, HTML_ELEMENT_ATTRIBUTE_NAME, extractedElementValue);
session.transfer(updatedFF, REL_SUCCESS);
代码示例来源:origin: apache/nifi
public void setupVariables(ProcessContext context) {
    // Cache the scripting configuration for later engine setup.
    scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
    scriptPath = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
    scriptBody = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();

    final String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue();
    // An empty module path means "no extra modules", not one empty entry.
    modules = StringUtils.isEmpty(modulePath) ? new String[0] : modulePath.split(",");
}
代码示例来源:origin: apache/nifi
// Bridges NiFi's SUCCESS/FAILURE relationships onto a Flume session channel
// and creates, configures, and starts the configured Flume sink once per
// schedule. Any failure is logged and rethrown so the schedule fails fast.
@OnScheduled
public void onScheduled(final ProcessContext context) {
try {
channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
channel.start();
// NOTE(review): the sink's name is taken from SOURCE_NAME here and again
// for sinkName below — confirm this is intentional and not a copy/paste
// from the corresponding source processor.
sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
context.getProperty(SINK_TYPE).getValue());
sink.setChannel(channel);
String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
String agentName = context.getProperty(AGENT_NAME).getValue();
String sinkName = context.getProperty(SOURCE_NAME).getValue();
Configurables.configure(sink,
getFlumeSinkContext(flumeConfig, agentName, sinkName));
sink.start();
} catch (Throwable th) {
getLogger().error("Error creating sink", th);
throw Throwables.propagate(th);
}
}
代码示例来源:origin: apache/nifi
/**
 * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile}
 * producing a result {@link FlowFile}.
 * <br>
 * The result {@link FlowFile} that is successful is then transferred to {@link #REL_SUCCESS}
 * <br>
 * The result {@link FlowFile} that is failed is then transferred to {@link #REL_FAILURE}
 *
 */
@Override
protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        // Nothing queued — report "not processed".
        return false;
    }

    flowFile = this.doRendezvousWithKafka(flowFile, context, session);
    if (this.isFailedFlowFile(flowFile)) {
        session.transfer(session.penalize(flowFile), REL_FAILURE);
    } else {
        // Record provenance with a "brokers/topic" transit URI before success.
        final String brokers = context.getProperty(SEED_BROKERS).evaluateAttributeExpressions(flowFile).getValue();
        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
        session.getProvenanceReporter().send(flowFile, brokers + "/" + topic);
        session.transfer(flowFile, REL_SUCCESS);
    }
    return true;
}
代码示例来源:origin: apache/nifi
ProcessorConfiguration(final ProcessContext context) {
    // Snapshot processor properties once so per-FlowFile work avoids
    // repeated property lookups.
    conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
    operation = context.getProperty(OPERATION).getValue();

    final String outputDir = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions().getValue();
    outputRootDirPath = new Path(outputDir);

    // A null filter regex disables file filtering entirely.
    final String filterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
    fileFilterPattern = filterRegex == null ? null : Pattern.compile(filterRegex);

    ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
}
代码示例来源:origin: apache/nifi
@OnScheduled
public void onScheduled(final ProcessContext context) {
    try {
        // Create the Flume source from its configured name and type.
        final String sourceName = context.getProperty(SOURCE_NAME).getValue();
        source = SOURCE_FACTORY.create(sourceName,
                context.getProperty(SOURCE_TYPE).getValue());

        final String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
        final String agentName = context.getProperty(AGENT_NAME).getValue();
        Configurables.configure(source,
                getFlumeSourceContext(flumeConfig, agentName, sourceName));

        // Only pollable sources get a channel processor and are started here.
        if (source instanceof PollableSource) {
            source.setChannelProcessor(new ChannelProcessor(
                    new NifiChannelSelector(pollableSourceChannel)));
            source.start();
        }
    } catch (Throwable th) {
        getLogger().error("Error creating source", th);
        throw Throwables.propagate(th);
    }
}
代码示例来源:origin: apache/nifi
// NOTE(review): this excerpt was truncated by the aggregation tool — the
// try-block opening after the guard clause, the element-matching branch, and
// several closing braces are missing, so the braces below do not balance.
// Treat it only as an illustration of PropertyValue.getValue() usage.
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
try {
doc = parseHTMLDocumentFromFlowfile(flowFile, context, session);
eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
} catch (Exception ex) {
getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", new Object[] {flowFile, ex.toString(), REL_INVALID_HTML.getName()}, ex);
session.transfer(flowFile, REL_INVALID_HTML);
return;
final String modifiedValue = context.getProperty(MODIFIED_VALUE).evaluateAttributeExpressions(flowFile).getValue();
session.transfer(flowFile, REL_NOT_FOUND);
} else {
for (Element ele : eles) {
switch (context.getProperty(OUTPUT_TYPE).getValue()) {
case ELEMENT_HTML:
ele.html(modifiedValue);
break;
case ELEMENT_ATTRIBUTE:
ele.attr(context.getProperty(ATTRIBUTE_KEY).evaluateAttributeExpressions(flowFile).getValue(), modifiedValue);
break;
case ELEMENT_TEXT:
session.transfer(ff, REL_SUCCESS);
代码示例来源:origin: apache/nifi
private List<String> getCommands(final List<PropertyDescriptor> descriptors, final ProcessContext context, final FlowFile flowFile) {
    // One command-line token per descriptor, with expression language
    // evaluated against the FlowFile's attributes.
    final List<String> commandLine = new ArrayList<>(descriptors.size());
    for (final PropertyDescriptor descriptor : descriptors) {
        final String token = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
        commandLine.add(token);
    }
    return commandLine;
}
代码示例来源:origin: apache/nifi
@OnScheduled
public void setup(final ProcessContext context) {
    // Bounded cache of compiled transforms, keyed by spec string.
    final int cacheSize = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
    transformCache = Caffeine.newBuilder()
            .maximumSize(cacheSize)
            .build(specString -> createTransform(context, specString.orElse(null)));

    try {
        // Custom modules, when configured, are loaded through a dedicated
        // class loader; otherwise fall back to this processor's own loader.
        if (context.getProperty(MODULES).isSet()) {
            customClassLoader = ClassLoaderUtils.getCustomClassLoader(
                    context.getProperty(MODULES).getValue(),
                    this.getClass().getClassLoader(),
                    getJarFilenameFilter());
        } else {
            customClassLoader = this.getClass().getClassLoader();
        }
    } catch (final Exception ex) {
        getLogger().error("Unable to setup processor", ex);
    }
}
代码示例来源:origin: apache/nifi
// NOTE(review): this excerpt was truncated by the aggregation tool — the
// method signature, the try-block opening, and several closing braces are
// missing, so the code below does not parse on its own. Treat it only as an
// illustration of PropertyValue.getValue() usage for AMQP publishing.
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
if (routingKey == null) {
throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
+ context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
final String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
final byte[] messageContent = extractMessage(flowFile, session);
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey);
} catch (Exception e) {
session.transfer(session.penalize(flowFile), REL_FAILURE);
getLogger().error("Failed while sending message to AMQP via " + publisher, e);
代码示例来源:origin: apache/nifi
@Override
public MediaType contentType() {
    // Evaluate the configured content type against the request FlowFile's
    // attributes; blank or missing values fall back to the default.
    final String configured = context.getProperty(PROP_CONTENT_TYPE).evaluateAttributeExpressions(requestFlowFile).getValue();
    final String effective = StringUtils.isBlank(configured) ? DEFAULT_CONTENT_TYPE : configured;
    return MediaType.parse(effective);
}
代码示例来源:origin: apache/nifi
/**
 * Initializes the Ignite cache instance: bootstraps the Ignite client only
 * if one does not already exist, then captures the configured cache name.
 *
 * @param context process context supplying the CACHE_NAME property
 * @throws ProcessException if Ignite initialization or property resolution fails
 */
public void initializeIgniteCache(ProcessContext context) throws ProcessException {
getLogger().info("Initializing Ignite cache");
try {
// Reuse an existing Ignite instance when available.
if ( getIgnite() == null ) {
getLogger().info("Initializing ignite as client");
super.initializeIgnite(context);
}
cacheName = context.getProperty(CACHE_NAME).getValue();
} catch (Exception e) {
// Wrap any failure so the framework treats scheduling as failed.
getLogger().error("Failed to initialize ignite cache due to {}", new Object[] { e }, e);
throw new ProcessException(e);
}
}
内容来源于网络,如有侵权,请联系作者删除!