Executing JanusGraph OLAP queries outside the Gremlin Console

Asked by sf6xfgos on 2021-06-10, in Cassandra

I have a graph in which some nodes have millions of incoming edges. I need to get the edge count of those nodes periodically. I am using Cassandra as the storage backend. The query:

g.V().has('vid','qwerty').inE().count().next()

All the available documentation explains how to do this with Apache Spark from the Gremlin Console. Can I instead write the logic as a Spark job outside the Gremlin Console and run it periodically on a Hadoop cluster?
Here is the output of the query on the Gremlin Console when I don't use Spark:
14108889 [gremlin-server-session-1] WARN org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor - Exception processing a script on request [RequestMessage{, requestId=c3d902b7-0fdd-491d-8639-546963212474, op='eval', processor='session', args={gremlin=g.V().has('vid','qwerty').inE().count().next(), session=2831d264-4566-4d15-99c5-d9bbb202b1f8, bindings={}, manageTransaction=false, batchSize=64}}]
	at org.apache.cassandra.thrift.Cassandra$multiget_slice_result$multiget_slice_resultStandardScheme.read(Cassandra.java:14696)
	at org.apache.cassandra.thrift.Cassandra$multiget_slice_result$multiget_slice_resultStandardScheme.read(Cassandra.java:14633)
	at org.apache.cassandra.thrift.Cassandra$multiget_slice_result.read(Cassandra.java:14559)
	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
	at org.apache.cassandra.thrift.Cassandra$Client.recv_multiget_slice(Cassandra.java:741)
	at org.apache.cassandra.thrift.Cassandra$Client.multiget_slice(Cassandra.java:725)
	at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftKeyColumnValueStore.getNamesSlice(CassandraThriftKeyColumnValueStore.java:143)
	at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftKeyColumnValueStore.getSlice(CassandraThriftKeyColumnValueStore.java:100)
	at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.getSlice(KCVSProxy.java:82)
	at org.janusgraph.diskstorage.keycolumnvalue.cache.ExpirationKCVSCache.getSlice(ExpirationKCVSCache.java:129)
	at org.janusgraph.diskstorage.BackendTransaction$2.call(BackendTransaction.java:288)
	at org.janusgraph.diskstorage.BackendTransaction$2.call(BackendTransaction.java:285)
	at org.janusgraph.diskstorage.util.BackendOperation.executeDirect(BackendOperation.java:69)
	at org.janusgraph.diskstorage.util.BackendOperation.execute(BackendOperation.java:55)
	at org.janusgraph.diskstorage.BackendTransaction.executeRead(BackendTransaction.java:470)
	at org.janusgraph.diskstorage.BackendTransaction.edgeStoreMultiQuery(BackendTransaction.java:285)
	at org.janusgraph.graphdb.database.StandardJanusGraph.edgeMultiQuery(StandardJanusGraph.java:441)
	at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.lambda$executeMultiQuery$3(StandardJanusGraphTx.java:1054)
	at org.janusgraph.graphdb.query.profile.QueryProfiler.profile(QueryProfiler.java:98)
	at org.janusgraph.graphdb.query.profile.QueryProfiler.profile(QueryProfiler.java:90)
	at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.executeMultiQuery(StandardJanusGraphTx.java:1054)
	at org.janusgraph.graphdb.query.vertex.MultiVertexCentricQueryBuilder.execute(MultiVertexCentricQueryBuilder.java:113)
	at org.janusgraph.graphdb.query.vertex.MultiVertexCentricQueryBuilder.edges(MultiVertexCentricQueryBuilder.java:133)
	at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphVertexStep.initialize(JanusGraphVertexStep.java:95)
	at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphVertexStep.processNextStart(JanusGraphVertexStep.java:101)
	at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143)
	at org.apache.tinkerpop.gremlin.process.traversal.step.util.ExpandableStepIterator.hasNext(ExpandableStepIterator.java:42)
	at org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep.processAllStarts(ReducingBarrierStep.java:83)
	at org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep.processNextStart(ReducingBarrierStep.java:113)
	at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next(AbstractStep.java:128)
	at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next(AbstractStep.java:38)
	at org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal.next(DefaultTraversal.java:200)
	at java_util_Iterator$next.call(Unknown Source)
	at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
	at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
	at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
	at Script13.run(Script13.groovy:1)
	at org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine.eval(GremlinGroovyScriptEngine.java:843)
	at org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine.eval(GremlinGroovyScriptEngine.java:548)
	at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:233)
	at org.apache.tinkerpop.gremlin.groovy.engine.ScriptEngines.eval(ScriptEngines.java:120)
	at org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor.lambda$eval$0(GremlinExecutor.java:290)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
However, g.V().has('vid','qwerty').inE().limit(10000).count().next() works fine and returns ==>10000
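Since the limited count succeeds while the full count fails, one workaround (independent of Spark) is to accumulate the count page by page with range(low, high), so each query stays as small as the limit(10000) one that works. The sketch below shows only the paging loop; countPage is a hypothetical stand-in for running g.V().has('vid','qwerty').inE().range(offset, offset + pageSize).count() against the real graph, with an in-memory list standing in for the edges.

```java
import java.util.ArrayList;
import java.util.List;

public class PagedEdgeCount {

    static final int PAGE_SIZE = 10_000;

    // Stand-in for the range() traversal: returns how many edges fall
    // in [offset, offset + pageSize) of the given edge list.
    static long countPage(List<Integer> edges, long offset, int pageSize) {
        long end = Math.min(edges.size(), offset + pageSize);
        return Math.max(0, end - offset);
    }

    // Sum page counts until a page comes back smaller than PAGE_SIZE,
    // which means the last (partial) page has been reached.
    static long countAllEdges(List<Integer> edges) {
        long total = 0;
        long offset = 0;
        while (true) {
            long page = countPage(edges, offset, PAGE_SIZE);
            total += page;
            if (page < PAGE_SIZE) break;
            offset += PAGE_SIZE;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> edges = new ArrayList<>();
        for (int i = 0; i < 25_000; i++) edges.add(i);
        System.out.println(countAllEdges(edges)); // prints 25000
    }
}
```

Each range() query is still an OLTP read against Cassandra, so this avoids the single huge scan that times out, at the cost of more round trips.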

Answer 1 (zengzsys):

Here is a Java client that opens the graph and runs the traversal with SparkGraphComputer:

import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class FollowCountSpark {

    private static Graph hgraph;
    private static GraphTraversalSource traversalSource;

    public static void main(String[] args) {
        createHGraph();
        System.exit(0);
    }

    private static void createHGraph() {
        // Open the HadoopGraph described by the properties file below
        hgraph = GraphFactory.open("/resources/jp_spark.properties");

        // Bind the traversal source to SparkGraphComputer for OLAP execution
        traversalSource = hgraph.traversal().withComputer(SparkGraphComputer.class);
        System.out.println("traversalSource = " + traversalSource);
        getAllEdgesFromHGraph();
    }

    static long getAllEdgesFromHGraph() {
        try {
            GraphTraversal<Vertex, Vertex> allV = traversalSource.V();
            GraphTraversal<Vertex, Vertex> gt = allV.has("vid", "supernode");
            GraphTraversal<Vertex, Long> c = gt.inE()
//                    .limit(600000)
                    .count();
            long l = c.next();
            System.out.println("All edges = " + l);
            return l;
        } catch (Exception e) {
            System.out.println("Error while fetching the edges for: ");
            e.printStackTrace();
        }
        return -1;
    }
}
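To run this count "periodically", the client above can be driven by a scheduler instead of (or in addition to) an external cron/Oozie job. A minimal sketch using the JDK's ScheduledExecutorService; fetchEdgeCount is a hypothetical placeholder for a call like FollowCountSpark.getAllEdgesFromHGraph(), and the 6-hour interval is an arbitrary example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PeriodicCount {

    // Most recent edge count; -1 means no run has completed yet.
    static final AtomicLong lastCount = new AtomicLong(-1);

    // Placeholder for the real OLAP count, e.g.
    // FollowCountSpark.getAllEdgesFromHGraph()
    static long fetchEdgeCount() {
        return 42L;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        // First run starts immediately, then repeats every 6 hours.
        scheduler.scheduleAtFixedRate(
                () -> lastCount.set(fetchEdgeCount()),
                0, 6, TimeUnit.HOURS);

        // Give the first run a moment to complete (demo only).
        Thread.sleep(500);
        System.out.println("lastCount = " + lastCount.get());
        scheduler.shutdown();
    }
}
```

For production use, submitting the packaged client through the cluster's own scheduler (cron, Oozie, Airflow, etc.) avoids keeping a JVM alive between runs; the in-process scheduler is simplest when the counts feed a long-running service.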

The corresponding properties file is:

storage.backend=cassandrathrift
storage.cassandra.keyspace=t_graph

cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.5
ids.block-size = 100000
storage.batch-loading = true
storage.buffer-size = 1000

# read-cassandra-3.properties

# 

# Hadoop Graph Configuration

# 

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output

# 

# JanusGraph Cassandra InputFormat configuration

# 

# These properties define the connection settings that were used while writing data to JanusGraph.

janusgraphmr.ioformat.conf.storage.backend=cassandrathrift

# This specifies the hostname & port for the Cassandra data store.

# janusgraphmr.ioformat.conf.storage.hostname=10.xx.xx.xx,xx.xx.xx.18,xx.xx.xx.141

janusgraphmr.ioformat.conf.storage.port=9160

# This specifies the keyspace where data is stored.

janusgraphmr.ioformat.conf.storage.cassandra.keyspace=t_graph

# 

# Apache Cassandra InputFormat configuration

# 

cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
spark.cassandra.input.split.size=256

# 

# SparkGraphComputer Configuration

# 

spark.master=local[1]
spark.executor.memory=1g
spark.cassandra.input.split.size_in_mb=512
spark.executor.extraClassPath=/opt/lib/janusgraph/*
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator

And the corresponding pom.xml dependencies for all the Spark- and Hadoop-specific classes:

<dependencies>
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-core</artifactId>
            <version>${janusgraph.version}</version>
        </dependency>
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-cassandra</artifactId>
            <version>${janusgraph.version}</version>
        </dependency>

        <!-- Note: declaring spark-gremlin twice with different versions
             (3.1.0-incubating and 3.2.5) lets Maven silently pick one;
             keep a single version. -->
        <dependency>
            <groupId>org.apache.tinkerpop</groupId>
            <artifactId>spark-gremlin</artifactId>
            <version>3.2.5</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-hadoop-core</artifactId>
            <version>${janusgraph.version}</version>
        </dependency>
        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-hbase</artifactId>
            <version>${janusgraph.version}</version>
        </dependency>

        <dependency>
            <groupId>org.janusgraph</groupId>
            <artifactId>janusgraph-cql</artifactId>
            <version>${janusgraph.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.8.1</version>
        </dependency>

    </dependencies>

Hope this helps :)
