Usage of the org.apache.pig.data.DataBag class, with code examples

x33g5p2x · reposted 2022-01-18 under Other

This article collects a number of Java code examples for the org.apache.pig.data.DataBag class and shows how the class is used in practice. The examples were gathered from projects hosted on platforms such as GitHub, Stack Overflow, and Maven; they come from a selection of well-regarded projects and should serve as useful references. Details of the DataBag class follow.
Package: org.apache.pig.data
Class: DataBag

About DataBag

A collection of Tuples. A DataBag may or may not fit into memory. DataBag extends Spillable, which means that it registers with a memory manager. By default, it attempts to keep all of its contents in memory. If it is asked by the memory manager to spill to disk (by a call to spill()), it takes whatever it has in memory, opens a spill file, and writes the contents out. This may happen multiple times; the bag tracks all of the files it has spilled to.
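The datafu examples further down call spill() by hand to bound memory while filling a very large bag. A minimal sketch of the same pattern (the class name SpillSketch and the loop bounds are illustrative, not from the original article):

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.TupleFactory;

public class SpillSketch {
  public static void main(String[] args) {
    TupleFactory tf = TupleFactory.getInstance();
    DataBag bag = BagFactory.getInstance().newDefaultBag();
    for (long i = 0; i < 5_000_000L; i++) {
      bag.add(tf.newTuple(Long.valueOf(i)));
      if (i % 1_000_000L == 0) {
        bag.spill(); // ask the bag to write its current in-memory tuples to a spill file
      }
    }
    System.out.println(bag.size()); // size() counts spilled and in-memory tuples alike
  }
}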

DataBag provides an Iterator interface that allows callers to read through the contents. The iterators are aware of data spilling: they have to be able to handle reading from files, as well as the fact that data they were reading from memory may have been spilled to disk underneath them.

The DataBag interface assumes that all data is written before any is read. That is, a DataBag cannot be used as a queue. If data is written after data is read, the results are undefined. This condition is not checked on each add or read, for reasons of speed. Caveat emptor.
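In practice the contract simply means: finish every add() before the first read. A minimal sketch of the safe pattern (variable names are illustrative):

DataBag bag = BagFactory.getInstance().newDefaultBag();
TupleFactory tf = TupleFactory.getInstance();
bag.add(tf.newTuple("first"));
bag.add(tf.newTuple("second"));
// all writes are finished, so reading is now safe
for (Tuple t : bag) {
  System.out.println(t);
}
// calling bag.add(...) from this point on would leave the results undefined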

Since spills are asynchronous (the memory manager requesting a spill runs in a separate thread), all operations dealing with the mContents Collection (which is the collection of tuples contained in the bag) have to be synchronized. This means that reading from a DataBag is currently serialized. This is ok for the moment because pig execution is currently single threaded. A ReadWriteLock was experimented with, but it was found to be about 10x slower than using the synchronize keyword. If pig changes its execution model to be multithreaded, we may need to return to this issue, as synchronizing reads will most likely defeat the purpose of multi-threading execution.

DataBags come in several types: default, sorted, and distinct. The type must be chosen up front; there is no way to convert a bag on the fly. Default data bags do not guarantee any particular order of retrieval for the tuples and may contain duplicate tuples. Sorted data bags guarantee that tuples will be retrieved in order, where "in order" is defined either by the default comparator for Tuple or by the comparator provided by the caller when the bag was created. Sorted bags may contain duplicates. Distinct bags do not guarantee any particular order of retrieval, but do guarantee that they will not contain duplicate tuples.
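The type is chosen through the factory methods on BagFactory, as most of the examples below do. A minimal sketch of all three (passing null to newSortedBag falls back to Tuple's default ordering):

BagFactory bf = BagFactory.getInstance();
DataBag defaultBag  = bf.newDefaultBag();    // no order guarantee, duplicates kept
DataBag sortedBag   = bf.newSortedBag(null); // retrieval in comparator order, duplicates kept
DataBag distinctBag = bf.newDistinctBag();   // no order guarantee, duplicates dropped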

Code examples

Example source: apache/hive

private static DataBag transformToBag(List<?> list, HCatFieldSchema hfs) throws Exception {
 if (list == null) {
  return null;
 }
 HCatFieldSchema elementSubFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
 DataBag db = new DefaultDataBag();
 for (Object o : list) {
  Tuple tuple;
  if (elementSubFieldSchema.getType() == Type.STRUCT) {
   tuple = transformToTuple((List<?>) o, elementSubFieldSchema);
  } else {
   // bags always contain tuples
   tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema));
  }
  db.add(tuple);
 }
 return db;
}

Example source: apache/hive

// From HCat's Pig storer (getJavaObj); the switch bodies truncated in the excerpt are restored.
case STRUCT:
  HCatSchema structSubSchema = hcatFS.getStructSubSchema();
  List<Object> all = ((Tuple) pigObj).getAll();
  ArrayList<Object> converted = new ArrayList<Object>(all.size());
  for (int i = 0; i < all.size(); i++) {
    converted.add(getJavaObj(all.get(i), structSubSchema.get(i)));
  }
  return converted;
case BAG:
  HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
  boolean needTuple = tupFS.getType() == Type.STRUCT;
  List<Object> bagContents = new ArrayList<Object>();
  Iterator<Tuple> bagItr = ((DataBag) pigObj).iterator();
  while (bagItr.hasNext()) {
    bagContents.add(getJavaObj(needTuple ? bagItr.next() : bagItr.next().get(0), tupFS));
  }
  return bagContents;

Example source: org.apache.pig/pig

@Override
  public Tuple exec(Tuple input) throws IOException {
    // Initial is called in the map.
    // we just send the tuple down
    try {
      // input is a bag with one tuple containing
      // the column we are trying to operate on
      DataBag bg = (DataBag) input.get(0);
      if (bg.iterator().hasNext()) {
        return bg.iterator().next();
      } else {
        // make sure that we call the object constructor, not the list constructor
        return tfact.newTuple((Object) null);
      }
    } catch (ExecException e) {
      throw e;
    } catch (Exception e) {
      int errCode = 2106;
      throw new ExecException("Error executing an algebraic function", errCode, PigException.BUG, e);
    }
  }
}

Example source: com.linkedin.datafu/datafu

@Override
public DataBag exec(Tuple input) throws IOException
{
 // initialize a reverse mapping
 HashMap<Integer, String> positionToAlias = new HashMap<Integer, String>();
 for (String alias : getFieldAliases().keySet()) {
  positionToAlias.put(getFieldAliases().get(alias), alias);
 }
 DataBag output = BagFactory.getInstance().newDefaultBag();
 for (int i=0; i<input.size(); i++) {
  Tuple tuple = TupleFactory.getInstance().newTuple();
  tuple.append(positionToAlias.get(i));
  tuple.append(input.get(i));
  output.add(tuple);
 }
 return output;
}

Example source: org.apache.pig/pig

static protected long count(Tuple input) throws ExecException {
  DataBag values = (DataBag)input.get(0);
  Iterator it = values.iterator();
  long cnt = 0;
  while (it.hasNext()){
    Tuple t = (Tuple)it.next();
    if (t != null && t.size() > 0 && t.get(0) != null)
      cnt++;
  }
  return cnt;
}

Example source: org.apache.pig/pig

// Reconstructed sketch of DIFF.exec(); the braces and branches truncated in the excerpt are restored.
@Override
public DataBag exec(Tuple input) throws IOException {
  if (input.size() != 2) {
    int errCode = 2107;
    String msg = "DIFF expected two inputs but received " + input.size() + " inputs.";
    throw new ExecException(msg, errCode, PigException.BUG);
  }
  DataBag output = mBagFactory.newDefaultBag();
  Object d1 = input.get(0);
  Object d2 = input.get(1);
  if (d1 instanceof DataBag) {
    // bag vs bag: emit tuples present in only one of the two bags
    computeDiff((DataBag) d1, (DataBag) d2, output);
  } else if (!d1.equals(d2)) {
    // scalar vs scalar: emit both values when they differ
    output.add(mTupleFactory.newTuple(d1));
    output.add(mTupleFactory.newTuple(d2));
  }
  return output;
}

Example source: lucidworks/solr-scale-tk

public DataBag exec(Tuple input) throws IOException {
  DataBag outputBag = bagFactory.newDefaultBag();        
  String idBase = (String)input.get(0);        
  for (int k=0; k < numKeys; k++) {
   String key = idBase+k;
   int key_bucket = random.nextInt(maxRandom);
   Tuple next = tupleFactory.newTuple(2);
   next.set(0, key);
   next.set(1, key_bucket);
   outputBag.add(next);
  }
  return outputBag;
}

Example source: pl.edu.icm.coansys/dc-logic

private DataBag getCategories(List<ClassifCode> classifCodeList) {
  DataBag db = new DefaultDataBag();
  for (ClassifCode code : classifCodeList) {
    for (String co_str : code.getValueList()) {
      db.add(TupleFactory.getInstance().newTuple(co_str));
    }
  }
  return db;
}

Example source: org.apache.pig/pig

// Fragment from Pig's skewed-join handling; the truncated loop body is closed and
// the stray log line is folded into a catch block for context.
try {
  final TupleFactory tf = TupleFactory.getInstance();
  Map<String, Object> distMap = (Map<String, Object>) t.get(0);
  DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
  Iterator<Tuple> it = partitionList.iterator();
  while (it.hasNext()) {
    Tuple idxTuple = it.next();
    // the last two fields of each tuple hold the partition index range for the key
    Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
    Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
    // the remaining fields form the key itself
    Tuple keyTuple = tf.newTuple();
    for (int i = 0; i < idxTuple.size() - 2; i++) {
      keyTuple.append(idxTuple.get(i));
    }
  }
} catch (ExecException e) {
  log.warn(e.getMessage());
}

Example source: com.linkedin.datafu/datafu

@Override
public void accumulate(Tuple arg0) throws IOException
{
 // i, count, and outputBag are instance fields of this UDF, carried across accumulate() calls
 DataBag inputBag = (DataBag)arg0.get(0);
 for (Tuple t : inputBag) {
  Tuple t1 = TupleFactory.getInstance().newTuple(t.getAll());
  t1.append(i);
  outputBag.add(t1);
  if (count % 1000000 == 0) {
   // proactively spill the output bag so huge inputs do not exhaust memory
   outputBag.spill();
   count = 0;
  }
  i++;
  count++;
 }
}

Example source: mozilla-metrics/akela

@Override
public Tuple exec(Tuple input) throws IOException {
  if (input == null || input.size() == 0) {
    return null;
  }
  DataBag db = (DataBag) input.get(0);
  Iterator<Tuple> iter = db.iterator();
  Tuple output = tupleFactory.newTuple();
  while (iter.hasNext()) {
    Tuple t = iter.next();
    for (Object o : t.getAll()) {
      output.append(o);
    }
  }
  return output;
}

Example source: thedatachef/varaha

public DataBag exec(Tuple input) throws IOException {
    if (input == null || input.size() < 1 || input.isNull(0))
      return null;

    // Output bag
    DataBag bagOfTokens = bagFactory.newDefaultBag();
        
    StringReader textInput = new StringReader(input.get(0).toString());
    PTBTokenizer ptbt = new PTBTokenizer(textInput, new CoreLabelTokenFactory(), "");

    for (CoreLabel label; ptbt.hasNext(); ) {
     label = (CoreLabel)ptbt.next();
     Tuple termText = tupleFactory.newTuple(label.toString());
     bagOfTokens.add(termText);
    }
    
    return bagOfTokens;
  }
}

Example source: com.linkedin.datafu/datafu

public DataBag call(DataBag inputBag) throws IOException
{
 DataBag outputBag = BagFactory.getInstance().newDefaultBag();
 long i = start, count = 0;
 i = inputBag.size() - 1 + start;
 for (Tuple t : inputBag) {
  Tuple t1 = TupleFactory.getInstance().newTuple(t.getAll());
  t1.append(i);
  outputBag.add(t1);
  if (count % 1000000 == 0) {
   outputBag.spill();
   count = 0;
  }
  i--;
  count++;
 }
 return outputBag;
}

Example source: com.linkedin.datafu/datafu

// This excerpt from datafu's sampling UDF was heavily truncated; the skeleton below
// restores the structure, with the elided selection logic left as comments.
@Override
public Tuple exec(Tuple input) throws IOException
{
 DataBag bag = (DataBag) input.get(0);
 DataBag selected = bagFactory.newDefaultBag();
 DataBag aggWaiting = bagFactory.newSortedBag(new ScoredTupleComparator());
 DataBag waiting = bagFactory.newSortedBag(new ScoredTupleComparator());
 Tuple output = tupleFactory.newTuple();
 long n = 0L;
 for (Tuple innerTuple : bag) {
  n += (Long) innerTuple.get(0);
  selected.addAll((DataBag) innerTuple.get(1));
  // ...elided: each candidate tuple t (wrapped as 'scored') is either accepted,
  // selected.add(scored.getTuple()), or parked in the sorted wait lists,
  // aggWaiting.add(t) / waiting.add(t), depending on its score...
 }
 output.append(waiting);
 System.err.println("Read " + n + " items, selected " + selected.size()
   + ", and wait-listed " + aggWaiting.size() + ".");
 return output;
}

Example source: org.apache.pig/pig

@Override
  public Tuple exec(Tuple input) throws IOException {
    // Since Initial is guaranteed to be called
    // only in the map, it will be called with an
    // input of a bag with a single tuple - the 
    // count should always be 1 if bag is non empty
    DataBag bag = (DataBag)input.get(0);
    return mTupleFactory.newTuple(bag.iterator().hasNext()? 
        Long.valueOf(1L) : Long.valueOf(0L));
  }
}

Example source: com.linkedin.datafu/datafu

@Override
public DataBag exec(Tuple input) throws IOException 
{    
 DataBag samples = (DataBag)input.get(0);
 if (samples.size() <= numSamples) {
  return samples;
 }
 else
 {
  return super.exec(input);
 }
}

Example source: org.apache.pig/pig

@Override
  public Tuple exec(Tuple input) throws IOException {
    // Since Initial is guaranteed to be called
    // only in the map, it will be called with an
    // input of a bag with a single tuple - the 
    // count should always be 1 if bag is non empty
    DataBag bag = (DataBag)input.get(0);
    Iterator it = bag.iterator();
    if (it.hasNext()){
      Tuple t = (Tuple)it.next();
      if (t != null && t.size() > 0 && t.get(0) != null)
        return mTupleFactory.newTuple(Long.valueOf(1));
    }
    return mTupleFactory.newTuple(Long.valueOf(0));
  }
}

Example source: org.apache.pig/pig

// Fragment from Pig's illustrate/example-data support; the two truncated loops are
// restored: first find the widest tuple for this field schema, then pad the rest.
Tuple tupleOfMaxSchemaSize = null;
int maxSchemaSize = 0;
for (DataBag bag : inputDataMap.get(fs)) {
  if (bag.size() > 0) {
    Tuple t = bag.iterator().next();
    int size = t.size();
    if (size > maxSchemaSize) {
      maxSchemaSize = size;
      tupleOfMaxSchemaSize = t;
    }
  }
}
for (DataBag bag : inputDataMap.get(fs)) {
  if (bag.size() > 0) {
    for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
      Tuple t = it.next();
      for (int i = t.size(); i < maxSchemaSize; ++i) {
        t.append(tupleOfMaxSchemaSize.get(i));
      }
    }
  }
}
// elsewhere in the same method the padded bags are merged back:
//   bag.addAll(e.getValue());
//   newBaseData.put(e.getKey(), bag);

Example source: pl.edu.icm.coansys/document-similarity-logic

private <T1, T2> DataBag listToDataBag(List<T1> list1, List<T2> list2)
    throws ExecException {
  DataBag output = BagFactory.getInstance().newDefaultBag();
  for (int i = 0; i < Math.min(list1.size(), list2.size()); i++) {
    Tuple t = TupleFactory.getInstance().newTuple(2);
    t.set(0, list1.get(i));
    t.set(1, list2.get(i));
    output.add(t);
  }
  return output;
}

Example source: mozilla-metrics/akela

/**
 * Converts List objects to DataBag to keep Pig happy
 *
 * @param l the list to convert; nested lists are converted recursively and
 *          their tuples added to the same bag
 * @return the resulting DataBag
 */
@SuppressWarnings("unchecked")
private DataBag convertListToBag(List<Object> l) {
  DataBag dbag = bagFactory.newDefaultBag();
  Tuple t = tupleFactory.newTuple();
  for (Object o : l) {
    if (o instanceof List) {
      dbag.addAll(convertListToBag((List<Object>) o));
    } else {
      t.append(o);
    }
  }
  if (t.size() > 0) {
    dbag.add(t);
  }
  return dbag;
}
