org.apache.flume.Event类的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(5.5k)|赞(0)|评价(0)|浏览(114)

本文整理了Java中org.apache.flume.Event类的一些代码示例,展示了Event类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Event类的具体详情如下:
包路径:org.apache.flume.Event
类名称:Event

Event介绍

[英]Basic representation of a data object in Flume. Provides access to data as it flows through the system.
[中]Flume中数据对象的基本表示形式。在数据流经系统时提供对数据的访问。

代码示例

代码示例来源:origin: apache/flume

@Override
public void initialize(Event event, byte[] columnFamily) {
 this.headers = event.getHeaders();
 this.payload = event.getBody();
 this.cf = columnFamily;
}

代码示例来源:origin: apache/flume

@Override
public Event intercept(Event event) {
 String origBody = new String(event.getBody(), charset);
 Matcher matcher = searchPattern.matcher(origBody);
 String newBody = matcher.replaceAll(replaceString);
 event.setBody(newBody.getBytes(charset));
 return event;
}

代码示例来源:origin: apache/flume

/**
 * Instantiate an Event instance based on the provided body and headers.
 * If <code>headers</code> is <code>null</code>, then it is ignored.
 * @param body
 * @param headers
 * @return
 */
public static Event withBody(byte[] body, Map<String, String> headers) {
 Event event = new SimpleEvent();
 if (body == null) {
  body = new byte[0];
 }
 event.setBody(body);
 if (headers != null) {
  event.setHeaders(new HashMap<String, String>(headers));
 }
 return event;
}

代码示例来源:origin: apache/flume

private long estimateEventSize(Event event) {
 byte[] body = event.getBody();
 if (body != null && body.length != 0) {
  return body.length;
 }
 //Each event occupies at least 1 slot, so return 1.
 return 1;
}

代码示例来源:origin: apache/flume

/**
 * Modifies events in-place.
 */
@Override
public Event intercept(Event event) {
 Map<String, String> headers = event.getHeaders();
 if (preserveExisting && headers.containsKey(key)) {
  return event;
 }
 headers.put(key, value);
 return event;
}

代码示例来源:origin: apache/flume

@Override
public List<Event> convert(Message message) throws JMSException {
 Event event = new SimpleEvent();
 Map<String, String> headers = event.getHeaders();
 @SuppressWarnings("rawtypes")
 Enumeration propertyNames = message.getPropertyNames();
      "Read " + count + " of total " + length);
   event.setBody(body);
  String text = textMessage.getText();
  if (text != null) {
   event.setBody(text.getBytes(charset));
    out = new ObjectOutputStream(bos);
    out.writeObject(object);
    event.setBody(bos.toByteArray());
   } catch (IOException e) {
    throw new FlumeException("Error serializing object", e);

代码示例来源:origin: apache/flume

while (!(events = reader.readEvents(batchSize)).isEmpty()) {
 for (Event event : events) {
  event.setHeaders(headers);
  sentBytes += event.getBody().length;
  sent++;

代码示例来源:origin: apache/flume

LOG.debug("Building an Event with stream of size -- {}", outputStream.size());
Event event = EventBuilder.withBody(outputStream.toByteArray(), headers);
event.setHeaders(headers);
List<Event> eventList = new ArrayList<Event>();
eventList.add(event);

代码示例来源:origin: apache/flume

private long estimateEventSize(Event event) {
 byte[] body = event.getBody();
 if (body != null && body.length != 0) {
  return body.length;
 }
 //Each event occupies at least 1 slot, so return 1.
 return 1;
}

代码示例来源:origin: apache/flume

@Override
public List<Channel> getOptionalChannels(Event event) {
 String hdr = event.getHeaders().get(headerName);
 List<Channel> channels = optionalChannels.get(hdr);
 if (channels == null) {
  channels = EMPTY_LIST;
 }
 return channels;
}

代码示例来源:origin: apache/flume

@Override
public void initialize(Event event, byte[] columnFamily) {
 this.headers = event.getHeaders();
 this.payload = event.getBody();
 this.cf = columnFamily;
}

代码示例来源:origin: apache/flume

@Override
public void initialize(Event event, byte[] cf) {
 this.payload = event.getBody();
 this.cf = cf;
}

代码示例来源:origin: apache/flume

/**
 * Modifies events in-place.
 */
@Override
public Event intercept(Event event) {
 Map<String, String> headers = event.getHeaders();
 if (preserveExisting && headers.containsKey(header)) {
  return event;
 }
 if (host != null) {
  headers.put(header, host);
 }
 return event;
}

代码示例来源:origin: org.apache.flume/flume-ng-core

@Override
public Event intercept(Event event) {
 String origBody = new String(event.getBody(), charset);
 Matcher matcher = searchPattern.matcher(origBody);
 String newBody = matcher.replaceAll(replaceString);
 event.setBody(newBody.getBytes(charset));
 return event;
}

代码示例来源:origin: keedio/flume-ng-sql-source

@Override
public void write(char[] cbuf, int off, int len) throws IOException {
  Event event = new SimpleEvent();
  
  String s = new String(cbuf);
  event.setBody(s.substring(off, len-1).getBytes(Charset.forName(sqlSourceHelper.getDefaultCharsetResultSet())));
  
  Map<String, String> headers;
  headers = new HashMap<String, String>();
  headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
  event.setHeaders(headers);
  
  events.add(event);
  
  if (events.size() >= sqlSourceHelper.getBatchSize())
    flush();
}

代码示例来源:origin: apache/flume

@Override
public void write(Event e) throws IOException {
 out.write((e.getHeaders() + " ").getBytes());
 out.write(e.getBody());
 if (appendNewline) {
  out.write('\n');
 }
}

代码示例来源:origin: apache/flume

@Override
public void initialize(Event event, byte[] cf) {
 this.payload = event.getBody();
 this.cf = cf;
}

代码示例来源:origin: apache/flume

private Object getKey(Event e) {
 // Write the data to HDFS
 String timestamp = e.getHeaders().get("timestamp");
 long eventStamp;
 if (timestamp == null) {
  eventStamp = System.currentTimeMillis();
 } else {
  eventStamp = Long.valueOf(timestamp);
 }
 return new LongWritable(eventStamp);
}

代码示例来源:origin: apache/flume

private List<Event> getSimpleEvents(List<Event> events) {
  List<Event> newEvents = new ArrayList<Event>(events.size());
  for (Event e:events) {
   newEvents.add(EventBuilder.withBody(e.getBody(), e.getHeaders()));
  }
  return newEvents;
 }
}

代码示例来源:origin: apache/flume

@Override
public void setEvent(Event event) {
 this.payload = event.getBody();
}

相关文章

微信公众号

最新文章

更多