本文整理了Java中org.apache.flume.Event
类的一些代码示例,展示了Event
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Event
类的具体详情如下:
包路径:org.apache.flume.Event
类名称:Event
[英]Basic representation of a data object in Flume. Provides access to data as it flows through the system.
[中]Flume中数据对象的基本表示形式。在数据流经系统时提供对数据的访问。
代码示例来源:origin: apache/flume
@Override
public void initialize(Event event, byte[] columnFamily) {
this.headers = event.getHeaders();
this.payload = event.getBody();
this.cf = columnFamily;
}
代码示例来源:origin: apache/flume
@Override
public Event intercept(Event event) {
String origBody = new String(event.getBody(), charset);
Matcher matcher = searchPattern.matcher(origBody);
String newBody = matcher.replaceAll(replaceString);
event.setBody(newBody.getBytes(charset));
return event;
}
代码示例来源:origin: apache/flume
/**
* Instantiate an Event instance based on the provided body and headers.
* If <code>headers</code> is <code>null</code>, then it is ignored.
* @param body
* @param headers
* @return
*/
public static Event withBody(byte[] body, Map<String, String> headers) {
Event event = new SimpleEvent();
if (body == null) {
body = new byte[0];
}
event.setBody(body);
if (headers != null) {
event.setHeaders(new HashMap<String, String>(headers));
}
return event;
}
代码示例来源:origin: apache/flume
private long estimateEventSize(Event event) {
byte[] body = event.getBody();
if (body != null && body.length != 0) {
return body.length;
}
//Each event occupies at least 1 slot, so return 1.
return 1;
}
代码示例来源:origin: apache/flume
/**
* Modifies events in-place.
*/
@Override
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
if (preserveExisting && headers.containsKey(key)) {
return event;
}
headers.put(key, value);
return event;
}
代码示例来源:origin: apache/flume
@Override
public List<Event> convert(Message message) throws JMSException {
Event event = new SimpleEvent();
Map<String, String> headers = event.getHeaders();
@SuppressWarnings("rawtypes")
Enumeration propertyNames = message.getPropertyNames();
"Read " + count + " of total " + length);
event.setBody(body);
String text = textMessage.getText();
if (text != null) {
event.setBody(text.getBytes(charset));
out = new ObjectOutputStream(bos);
out.writeObject(object);
event.setBody(bos.toByteArray());
} catch (IOException e) {
throw new FlumeException("Error serializing object", e);
代码示例来源:origin: apache/flume
while (!(events = reader.readEvents(batchSize)).isEmpty()) {
for (Event event : events) {
event.setHeaders(headers);
sentBytes += event.getBody().length;
sent++;
代码示例来源:origin: apache/flume
LOG.debug("Building an Event with stream of size -- {}", outputStream.size());
Event event = EventBuilder.withBody(outputStream.toByteArray(), headers);
event.setHeaders(headers);
List<Event> eventList = new ArrayList<Event>();
eventList.add(event);
代码示例来源:origin: apache/flume
private long estimateEventSize(Event event) {
byte[] body = event.getBody();
if (body != null && body.length != 0) {
return body.length;
}
//Each event occupies at least 1 slot, so return 1.
return 1;
}
代码示例来源:origin: apache/flume
@Override
public List<Channel> getOptionalChannels(Event event) {
String hdr = event.getHeaders().get(headerName);
List<Channel> channels = optionalChannels.get(hdr);
if (channels == null) {
channels = EMPTY_LIST;
}
return channels;
}
代码示例来源:origin: apache/flume
@Override
public void initialize(Event event, byte[] columnFamily) {
this.headers = event.getHeaders();
this.payload = event.getBody();
this.cf = columnFamily;
}
代码示例来源:origin: apache/flume
@Override
public void initialize(Event event, byte[] cf) {
this.payload = event.getBody();
this.cf = cf;
}
代码示例来源:origin: apache/flume
/**
* Modifies events in-place.
*/
@Override
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
if (preserveExisting && headers.containsKey(header)) {
return event;
}
if (host != null) {
headers.put(header, host);
}
return event;
}
代码示例来源:origin: org.apache.flume/flume-ng-core
@Override
public Event intercept(Event event) {
String origBody = new String(event.getBody(), charset);
Matcher matcher = searchPattern.matcher(origBody);
String newBody = matcher.replaceAll(replaceString);
event.setBody(newBody.getBytes(charset));
return event;
}
代码示例来源:origin: keedio/flume-ng-sql-source
@Override
public void write(char[] cbuf, int off, int len) throws IOException {
Event event = new SimpleEvent();
String s = new String(cbuf);
event.setBody(s.substring(off, len-1).getBytes(Charset.forName(sqlSourceHelper.getDefaultCharsetResultSet())));
Map<String, String> headers;
headers = new HashMap<String, String>();
headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
event.setHeaders(headers);
events.add(event);
if (events.size() >= sqlSourceHelper.getBatchSize())
flush();
}
代码示例来源:origin: apache/flume
@Override
public void write(Event e) throws IOException {
out.write((e.getHeaders() + " ").getBytes());
out.write(e.getBody());
if (appendNewline) {
out.write('\n');
}
}
代码示例来源:origin: apache/flume
@Override
public void initialize(Event event, byte[] cf) {
this.payload = event.getBody();
this.cf = cf;
}
代码示例来源:origin: apache/flume
private Object getKey(Event e) {
// Write the data to HDFS
String timestamp = e.getHeaders().get("timestamp");
long eventStamp;
if (timestamp == null) {
eventStamp = System.currentTimeMillis();
} else {
eventStamp = Long.valueOf(timestamp);
}
return new LongWritable(eventStamp);
}
代码示例来源:origin: apache/flume
private List<Event> getSimpleEvents(List<Event> events) {
List<Event> newEvents = new ArrayList<Event>(events.size());
for (Event e:events) {
newEvents.add(EventBuilder.withBody(e.getBody(), e.getHeaders()));
}
return newEvents;
}
}
代码示例来源:origin: apache/flume
@Override
public void setEvent(Event event) {
this.payload = event.getBody();
}
内容来源于网络,如有侵权,请联系作者删除!