org.janusgraph.diskstorage.keycolumnvalue.KCVMutation类的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(13.5k)|赞(0)|评价(0)|浏览(78)

本文整理了Java中org.janusgraph.diskstorage.keycolumnvalue.KCVMutation类的一些代码示例,展示了KCVMutation类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。KCVMutation类的具体详情如下:
包路径:org.janusgraph.diskstorage.keycolumnvalue.KCVMutation
类名称:KCVMutation

KCVMutation介绍

[英]Mutation type for KeyColumnValueStore.
[中]用于KeyColumnValueStore的变更(Mutation)类型。

代码示例

代码示例来源:origin: JanusGraph/janusgraph

/**
 * Converts a non-empty {@link KCVEntryMutation} into a {@link KCVMutation}.
 *
 * <p>The additions are passed through unchanged. When the mutation has no deletions,
 * {@code KeyColumnValueStore.NO_DELETIONS} is used; otherwise each deletion is mapped
 * through {@code KCVEntryMutation.ENTRY2COLUMN_FCT} (presumably entry-to-column; confirm
 * against its definition) and the result is copied into a new list.
 *
 * @param mutation the mutation to convert; asserted to be non-empty
 * @return the converted mutation
 */
private KCVMutation convert(KCVEntryMutation mutation) {
  assert !mutation.isEmpty();
  if (mutation.hasDeletions()) {
    // Copy the lazily transformed view into a concrete list.
    return new KCVMutation(
        mutation.getAdditions(),
        Lists.newArrayList(
            Iterables.transform(mutation.getDeletions(), KCVEntryMutation.ENTRY2COLUMN_FCT)));
  }
  return new KCVMutation(mutation.getAdditions(), KeyColumnValueStore.NO_DELETIONS);
}

代码示例来源:origin: JanusGraph/janusgraph

/**
 * Flattens per-store key-column-value mutations into one {@link KVMutation} per store and
 * forwards the result to the wrapped {@code manager}.
 *
 * <p>For every store, each addition and each deletion of each key is combined with its key
 * via {@code store.concatenate(...)} and recorded on that store's {@link KVMutation}.
 *
 * @param mutations mutations keyed by store name, then by key
 * @param txh the transaction passed to the wrapped manager
 * @throws BackendException if opening a store or the delegated call fails
 */
@Override
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
  final Map<String, KVMutation> converted = new HashMap<>(mutations.size());
  for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> perStore : mutations.entrySet()) {
    final String storeName = perStore.getKey();
    final OrderedKeyValueStoreAdapter adapter = openDatabase(storeName);
    Preconditions.checkNotNull(adapter);

    final KVMutation flattened = new KVMutation();
    for (Map.Entry<StaticBuffer, KCVMutation> perKey : perStore.getValue().entrySet()) {
      final StaticBuffer rowKey = perKey.getKey();
      final KCVMutation columnChanges = perKey.getValue();

      if (columnChanges.hasAdditions()) {
        for (Entry added : columnChanges.getAdditions()) {
          flattened.addition(adapter.concatenate(rowKey, added));
        }
      }

      if (columnChanges.hasDeletions()) {
        for (StaticBuffer removedColumn : columnChanges.getDeletions()) {
          flattened.deletion(adapter.concatenate(rowKey, removedColumn));
        }
      }
    }
    converted.put(storeName, flattened);
  }
  manager.mutateMany(converted, txh);
}

代码示例来源:origin: JanusGraph/janusgraph

/**
 * Applies the given mutations store by store and key by key.
 *
 * <p>Each store is looked up by name in {@code stores} and must be present; every key's
 * additions and deletions are then handed to that store's {@code mutate} method.
 *
 * @param mutations mutations keyed by store name, then by key
 * @param txh the transaction passed to each {@code mutate} call
 * @throws BackendException if a store's {@code mutate} call fails
 */
@Override
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
  for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> perStore : mutations.entrySet()) {
    final KeyColumnValueStore target = stores.get(perStore.getKey());
    Preconditions.checkNotNull(target);

    for (Map.Entry<StaticBuffer, KCVMutation> perKey : perStore.getValue().entrySet()) {
      final KCVMutation change = perKey.getValue();
      target.mutate(perKey.getKey(), change.getAdditions(), change.getDeletions(), txh);
    }
  }
}

代码示例来源:origin: JanusGraph/janusgraph

/**
 * Applies the configured TTL to additions where needed, then delegates to the wrapped
 * {@code manager}.
 *
 * <p>TTLs are only applied here when the manager does not report store-level TTL support
 * (cell-level TTL support is asserted in that case). For each store with a positive TTL in
 * {@code ttlEnabledStores}, {@code applyTTL} is invoked on the additions of every mutation
 * that has any.
 *
 * @param mutations mutations keyed by store name, then by key
 * @param txh the transaction passed to the wrapped manager
 * @throws BackendException if the delegated call fails
 */
@Override
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
  if (!manager.getFeatures().hasStoreTTL()) {
    assert manager.getFeatures().hasCellTTL();
    for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> perStore : mutations.entrySet()) {
      final Integer ttl = ttlEnabledStores.get(perStore.getKey());
      if (ttl == null || ttl <= 0) {
        // No positive TTL configured for this store: leave its additions untouched.
        continue;
      }
      for (KCVMutation change : perStore.getValue().values()) {
        if (change.hasAdditions()) {
          applyTTL(change.getAdditions(), ttl);
        }
      }
    }
  }
  manager.mutateMany(mutations, txh);
}

代码示例来源:origin: awslabs/dynamodb-janusgraph-storage-backend

/**
 * Creates the workers needed to apply the given per-key mutations.
 *
 * <p>For each key, deletions whose column is also present among the additions are dropped.
 * Workers for the additions (if any) are appended first, followed by workers for the
 * remaining deletions (if any).
 *
 * @param mutationMap mutations keyed by hash key
 * @param txh the transaction handed to the worker factories
 * @return the workers, in the order they were created
 */
@Override
public Collection<MutateWorker> createMutationWorkers(final Map<StaticBuffer, KCVMutation> mutationMap, final DynamoDbStoreTransaction txh) {
  final List<MutateWorker> result = new LinkedList<>();
  for (Map.Entry<StaticBuffer, KCVMutation> perKey : mutationMap.entrySet()) {
    final StaticBuffer hashKey = perKey.getKey();
    final KCVMutation change = perKey.getValue();

    // Columns being added, collected into a set for membership checks while filtering.
    final Set<StaticBuffer> addedColumns =
        change.getAdditions().stream().map(Entry::getColumn).collect(Collectors.toSet());

    // Keep only the deletions that are not also being added.
    final List<StaticBuffer> effectiveDeletions =
        change.getDeletions().stream()
            .filter(column -> !addedColumns.contains(column))
            .collect(Collectors.toList());

    if (change.hasAdditions()) {
      result.addAll(createWorkersForAdditions(hashKey, change.getAdditions(), txh));
    }
    if (!effectiveDeletions.isEmpty()) {
      result.addAll(createWorkersForDeletions(hashKey, effectiveDeletions, txh));
    }
  }
  return result;
}

代码示例来源:origin: awslabs/dynamodb-janusgraph-storage-backend

/**
 * Builds the expected-value map for a mutation.
 *
 * <p>{@code addExpectedValueIfPresent} is called for the column of every addition, then for
 * every deleted column, accumulating into a map pre-sized to the mutation's total count.
 *
 * @param mutation the mutation whose added and deleted columns are inspected
 * @return the populated map of expected attribute values
 */
public Map<String, ExpectedAttributeValue> build(final KCVMutation mutation) {
  final Map<String, ExpectedAttributeValue> expected = Maps.newHashMapWithExpectedSize(mutation.getTotalMutations());
  mutation.getAdditions().forEach(added -> addExpectedValueIfPresent(added.getColumn(), expected));
  mutation.getDeletions().forEach(deleted -> addExpectedValueIfPresent(deleted, expected));
  return expected;
}

代码示例来源:origin: org.janusgraph/janusgraph-cql

/**
 * Applies all given mutations as a single {@code LOGGED} batch statement.
 *
 * <p>For every table and key, one delete statement per deleted column and one insert
 * statement per added entry are generated (deletions first, then additions) and added to
 * the batch. The batch is executed asynchronously and this method waits for completion.
 *
 * @param mutations mutations keyed by table (store) name, then by key
 * @param txh the transaction supplying the write consistency level and commit timestamps
 * @throws BackendException presumably the type produced by {@code EXCEPTION_MAPPER} when
 *     the batch fails (confirm against its definition)
 * @throws IllegalStateException if a referenced table has no entry in {@code openStores}
 */
private void mutateManyLogged(final Map<String, Map<StaticBuffer, KCVMutation>> mutations, final StoreTransaction txh) throws BackendException {
  final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
  final BatchStatement batchStatement = new BatchStatement(Type.LOGGED);
  batchStatement.setConsistencyLevel(getTransaction(txh).getWriteConsistencyLevel());
  // Flatten table -> key -> (deletions, additions) into one stream of statements.
  batchStatement.addAll(Iterator.ofAll(mutations.entrySet()).flatMap(tableNameAndMutations -> {
    final String tableName = tableNameAndMutations.getKey();
    final Map<StaticBuffer, KCVMutation> tableMutations = tableNameAndMutations.getValue();
    // Fail fast when the target store has not been opened.
    final CQLKeyColumnValueStore columnValueStore = Option.of(this.openStores.get(tableName))
        .getOrElseThrow(() -> new IllegalStateException("Store cannot be found: " + tableName));
    return Iterator.ofAll(tableMutations.entrySet()).flatMap(keyAndMutations -> {
      final StaticBuffer key = keyAndMutations.getKey();
      final KCVMutation keyMutations = keyAndMutations.getValue();
      // One delete statement per deleted column, all using the commit's deletion time.
      final Iterator<Statement> deletions = Iterator.of(commitTime.getDeletionTime(this.times))
          .flatMap(deleteTime -> Iterator.ofAll(keyMutations.getDeletions()).map(deletion -> columnValueStore.deleteColumn(key, deletion, deleteTime)));
      // One insert statement per added entry, all using the commit's addition time.
      final Iterator<Statement> additions = Iterator.of(commitTime.getAdditionTime(this.times))
          .flatMap(addTime -> Iterator.ofAll(keyMutations.getAdditions()).map(addition -> columnValueStore.insertColumn(key, addition, addTime)));
      // Deletions are emitted before additions for each key.
      return Iterator.concat(deletions, additions);
    });
  }));
  final Future<ResultSet> result = Future.fromJavaFuture(this.executorService, this.session.executeAsync(batchStatement));
  // Block until the asynchronous batch execution has finished.
  result.await();
  if (result.isFailure()) {
    // Translate the underlying failure cause before rethrowing.
    throw EXCEPTION_MAPPER.apply(result.getCause().get());
  }
  sleepAfterWrite(txh, commitTime);
}

代码示例来源:origin: org.apache.atlas/atlas-janusgraph-hbase2

if (mutation.hasDeletions()) {
  if (commands.getSecond() == null) {
    Delete d = new Delete(key);
  for (StaticBuffer b : mutation.getDeletions()) {
if (mutation.hasAdditions()) {
  for (Entry e : mutation.getAdditions()) {

代码示例来源:origin: JanusGraph/janusgraph

/**
 * Verifies that {@code mutateMany} on a transaction holding no locks is forwarded to the
 * backing manager with the inconsistent transaction.
 */
@Test
  public void testMutateManyWithoutLockUsesInconsistentTx() throws BackendException {
    // A single addition and no deletions for one key in one store.
    final ImmutableList<Entry> adds = ImmutableList.of(StaticArrayEntry.of(DATA_COL, DATA_VAL));
    final ImmutableList<StaticBuffer> deletions = ImmutableList.of();

    Map<String, Map<StaticBuffer, KCVMutation>> mutations =
        ImmutableMap.of(STORE_NAME,
            ImmutableMap.of(DATA_KEY, new KCVMutation(adds, deletions)));

    // Expected call on the backing manager, made before ctrl.replay()
    // (presumably the mock-recording phase; confirm against the mock setup).
    backingManager.mutateMany(mutations, inconsistentTx); // consistency level is unconstrained w/o locks

    ctrl.replay();
    // Exercise the manager under test.
    expectManager.mutateMany(mutations, expectTx);
  }
}

代码示例来源:origin: org.janusgraph/janusgraph-cql

/**
 * Applies all given mutations using {@code UNLOGGED} batch statements.
 *
 * <p>For every table and key, one delete statement per deleted column and one insert
 * statement per added entry are generated (deletions first, then additions). The statements
 * of each key are split into groups of {@code batchSize}; each group is executed
 * asynchronously as its own batch, and this method waits for all of them.
 *
 * @param mutations mutations keyed by table (store) name, then by key
 * @param txh the transaction supplying the write consistency level and commit timestamps
 * @throws BackendException presumably the type produced by {@code EXCEPTION_MAPPER} when
 *     any batch fails (confirm against its definition)
 * @throws IllegalStateException if a referenced table has no entry in {@code openStores}
 */
private void mutateManyUnlogged(final Map<String, Map<StaticBuffer, KCVMutation>> mutations, final StoreTransaction txh) throws BackendException {
  final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
  // Combine the futures of all per-key batches into a single future.
  final Future<Seq<ResultSet>> result = Future.sequence(this.executorService, Iterator.ofAll(mutations.entrySet()).flatMap(tableNameAndMutations -> {
    final String tableName = tableNameAndMutations.getKey();
    final Map<StaticBuffer, KCVMutation> tableMutations = tableNameAndMutations.getValue();
    // Fail fast when the target store has not been opened.
    final CQLKeyColumnValueStore columnValueStore = Option.of(this.openStores.get(tableName))
        .getOrElseThrow(() -> new IllegalStateException("Store cannot be found: " + tableName));
    return Iterator.ofAll(tableMutations.entrySet()).flatMap(keyAndMutations -> {
      final StaticBuffer key = keyAndMutations.getKey();
      final KCVMutation keyMutations = keyAndMutations.getValue();
      // One delete statement per deleted column, all using the commit's deletion time.
      final Iterator<Statement> deletions = Iterator.of(commitTime.getDeletionTime(this.times))
          .flatMap(deleteTime -> Iterator.ofAll(keyMutations.getDeletions()).map(deletion -> columnValueStore.deleteColumn(key, deletion, deleteTime)));
      // One insert statement per added entry, all using the commit's addition time.
      final Iterator<Statement> additions = Iterator.of(commitTime.getAdditionTime(this.times))
          .flatMap(addTime -> Iterator.ofAll(keyMutations.getAdditions()).map(addition -> columnValueStore.insertColumn(key, addition, addTime)));
      // Grouping happens per key, so a batch never mixes statements of different keys.
      return Iterator.concat(deletions, additions)
          .grouped(this.batchSize)
          .map(group -> Future.fromJavaFuture(this.executorService,
              this.session.executeAsync(
                  new BatchStatement(Type.UNLOGGED)
                      .addAll(group)
                      .setConsistencyLevel(getTransaction(txh).getWriteConsistencyLevel()))));
    });
  }));
  // Block until every asynchronous batch has finished.
  result.await();
  if (result.isFailure()) {
    // Translate the underlying failure cause before rethrowing.
    throw EXCEPTION_MAPPER.apply(result.getCause().get());
  }
  sleepAfterWrite(txh, commitTime);
}

代码示例来源:origin: awslabs/dynamodb-janusgraph-storage-backend

new SingleUpdateBuilder().deletions(mutation.getDeletions())
    .additions(mutation.getAdditions())
    .build();
if (mutation.hasDeletions() && !mutation.hasAdditions()) {
  worker = new SingleUpdateWithCleanupWorker(request, client.getDelegate());
} else {

代码示例来源:origin: JanusGraph/janusgraph

/**
 * Verifies that once a lock has been acquired in a transaction, the lock check, the
 * expected-value read and the {@code mutateMany} write all use the consistent transaction.
 */
@Test
public void testMutateManyWithLockUsesConsistentTx() throws BackendException {
  // A single addition and no deletions for one key in one store.
  final ImmutableList<Entry> adds = ImmutableList.of(StaticArrayEntry.of(DATA_COL, DATA_VAL));
  final ImmutableList<StaticBuffer> deletions = ImmutableList.of();
  Map<String, Map<StaticBuffer, KCVMutation>> mutations =
      ImmutableMap.of(STORE_NAME,
          ImmutableMap.of(DATA_KEY, new KCVMutation(adds, deletions)));
  final KeyColumn kc = new KeyColumn(LOCK_KEY, LOCK_COL);
  // 1. Acquire a lock
  backingLocker.writeLock(kc, consistentTx);
  // 2. Run mutateMany
  // 2.1. Check locks & expected values before mutating data
  backingLocker.checkLocks(consistentTx);
  // The expected-value read is a slice query over [lock column, next bigger buffer).
  StaticBuffer nextBuf = BufferUtil.nextBiggerBuffer(kc.getColumn());
  KeySliceQuery expectedValueQuery = new KeySliceQuery(kc.getKey(), kc.getColumn(), nextBuf);
  expect(backingStore.getSlice(expectedValueQuery, consistentTx)) // expected value read must use strong consistency
    .andReturn(StaticArrayEntryList.of(StaticArrayEntry.of(LOCK_COL, LOCK_VAL)));
  // 2.2. Run mutateMany on backing manager to modify data
  backingManager.mutateMany(mutations, consistentTx); // writes by txs with locks must use strong consistency
  // Calls above were made before replay() (presumably mock recording); calls below
  // exercise the objects under test.
  ctrl.replay();
  // Lock acquisition
  expectStore.acquireLock(LOCK_KEY, LOCK_COL, LOCK_VAL, expectTx);
  // Mutate
  expectManager.mutateMany(mutations, expectTx);
}

代码示例来源:origin: JanusGraph/janusgraph

/**
 * Writes the same key/column/value to two stores through a single {@code mutateMany} call
 * and checks that a slice query on each store returns exactly that entry.
 */
@Test
public void mutateManyWritesSameKeyOnMultipleCFs() throws BackendException {
  final long seed = 42; // must be greater than 0

  // Derive distinct key, column and value buffers from the seed.
  final StaticBuffer column = KeyColumnValueStoreUtil.longToByteBuffer(seed);
  final StaticBuffer columnEnd = KeyColumnValueStoreUtil.longToByteBuffer(seed + 1);
  final StaticBuffer rowKey = KeyColumnValueStoreUtil.longToByteBuffer(seed * seed);
  final StaticBuffer value = KeyColumnValueStoreUtil.longToByteBuffer(seed * seed * seed);

  // One addition, no deletions, shared by both stores.
  KCVMutation singleAddition = new KCVMutation(
      Lists.newArrayList(StaticArrayEntry.of(column, value)),
      Lists.newArrayList());
  Map<StaticBuffer, KCVMutation> perKey = ImmutableMap.of(rowKey, singleAddition);
  Map<String, Map<StaticBuffer, KCVMutation>> perStore =
      ImmutableMap.of(storeName1, perKey, storeName2, perKey);

  // Write through a dedicated transaction and commit it before reading.
  final StoreTransaction writeTx = manager.beginTransaction(getTxConfig());
  manager.mutateMany(perStore, writeTx);
  writeTx.commit();

  KeySliceQuery slice = new KeySliceQuery(rowKey, column, columnEnd);
  List<Entry> expected = ImmutableList.of(StaticArrayEntry.of(column, value));
  Assert.assertEquals(expected, store1.getSlice(slice, tx));
  Assert.assertEquals(expected, store2.getSlice(slice, tx));
}

代码示例来源:origin: JanusGraph/janusgraph

new KCVMutation(ImmutableList.of(StaticArrayEntry.of(col, val2)), ImmutableList.of())));

代码示例来源:origin: org.apache.atlas/atlas-janusgraph-hbase2

/**
 * Applies additions and deletions to a single key by wrapping them in a one-entry mutation
 * map and delegating to {@code mutateMany}.
 *
 * @param key the key to mutate
 * @param additions entries to add
 * @param deletions columns to delete
 * @param txh the transaction passed to {@code mutateMany}
 * @throws BackendException if the delegated call fails
 */
@Override
public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
  final KCVMutation change = new KCVMutation(additions, deletions);
  mutateMany(ImmutableMap.of(key, change), txh);
}

代码示例来源:origin: org.janusgraph/janusgraph-cql

/**
 * Applies additions and deletions to a single key of this store's table by delegating to
 * the store manager's {@code mutateMany} with singleton maps.
 *
 * @param key the key to mutate
 * @param additions entries to add
 * @param deletions columns to delete
 * @param txh the transaction passed to the store manager
 * @throws BackendException if the delegated call fails
 */
@Override
public void mutate(final StaticBuffer key, final List<Entry> additions, final List<StaticBuffer> deletions, final StoreTransaction txh) throws BackendException {
  final KCVMutation mutation = new KCVMutation(additions, deletions);
  this.storeManager.mutateMany(
      Collections.singletonMap(this.tableName, Collections.singletonMap(key, mutation)),
      txh);
}

代码示例来源:origin: awslabs/dynamodb-janusgraph-storage-backend

/**
 * Applies additions and deletions to a single hash key, logging at debug level on entry
 * and on exit.
 *
 * <p>The work itself is delegated to {@code super.mutateOneKey} with a {@link KCVMutation}
 * wrapping the given additions and deletions.
 *
 * @param hashKey the key to mutate
 * @param additions entries to add
 * @param deletions columns to delete
 * @param txh the transaction passed to {@code mutateOneKey}
 * @throws BackendException if the delegated call fails
 */
@Override
public void mutate(final StaticBuffer hashKey, final List<Entry> additions, final List<StaticBuffer> deletions, final StoreTransaction txh) throws BackendException {
  log.debug(
      "Entering mutate table:{} keys:{} additions:{} deletions:{} txh:{}",
      getTableName(), encodeKeyForLog(hashKey), encodeForLog(additions), encodeForLog(deletions), txh);

  final KCVMutation change = new KCVMutation(additions, deletions);
  super.mutateOneKey(hashKey, change, txh);

  log.debug(
      "Exiting mutate table:{} keys:{} additions:{} deletions:{} txh:{} returning:void",
      getTableName(), encodeKeyForLog(hashKey), encodeForLog(additions), encodeForLog(deletions), txh);
}

代码示例来源:origin: awslabs/dynamodb-janusgraph-storage-backend

/**
 * Applies additions and deletions to a single key, logging at debug level on entry and on
 * exit.
 *
 * <p>The work itself is delegated to {@code super.mutateOneKey} with a {@link KCVMutation}
 * wrapping the given additions and deletions.
 *
 * @param key the key to mutate
 * @param additions entries to add
 * @param deletions columns to delete
 * @param txh the transaction passed to {@code mutateOneKey}
 * @throws BackendException if the delegated call fails
 */
@Override
public void mutate(final StaticBuffer key, final List<Entry> additions, final List<StaticBuffer> deletions, final StoreTransaction txh) throws BackendException {
  log.debug(
      "Entering mutate table:{} keys:{} additions:{} deletions:{} txh:{}",
      getTableName(), encodeKeyForLog(key), encodeForLog(additions), encodeForLog(deletions), txh);

  // mutateOneKey also filters out deletions that are also added
  final KCVMutation change = new KCVMutation(additions, deletions);
  super.mutateOneKey(key, change, txh);

  log.debug(
      "Exiting mutate table:{} keys:{} additions:{} deletions:{} txh:{} returning:void",
      getTableName(), encodeKeyForLog(key), encodeForLog(additions), encodeForLog(deletions), txh);
}

相关文章