cassandra在aws上的性能缓慢

chy5wohz  于 2021-06-10  发布在  Cassandra
关注(0)|答案(2)|浏览(279)

我们的一位DBA使用相同的Python代码(如下所示),在AWS EC2上对比测试了Cassandra与Oracle的插入性能(100万条记录),得到了以下令人惊讶的结果:
oracle 12.2,单节点,64核/256gb,ec2 ebs存储,38秒
cassandra 5.1.13(ddac),单节点,2核/4gb,ec2 ebs存储,464秒
cassandra 3.11.4,4个节点,16核/64gb(每个节点),ec2 ebs存储,486秒
那么-我们做错什么了?
Cassandra怎么表现得这么慢?

  • 节点不够(为什么4个节点比单个节点慢?)
  • 配置问题?
  • 还有别的吗?

谢谢!
下面是python代码:

import logging
import time
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, BatchStatement
from cassandra.query import SimpleStatement
from cassandra.auth import PlainTextAuthProvider

class PythonCassandraExample:
    """Minimal Cassandra insert workload.

    Connects to a cluster, (re)creates a keyspace and an ``employee`` table,
    then inserts rows one synchronous request at a time.

    Expected call order: ``createsession()``, ``setlogger()``,
    ``createkeyspace(name)``, ``create_table()``, ``insert_data()``.
    """

    def __init__(self):
        self.cluster = None   # set by createsession()
        self.session = None   # set by createsession()
        self.keyspace = None  # passed to Cluster.connect(); never assigned here
        self.log = None       # set by setlogger()

    def __del__(self):
        # createsession() may never have run (or __init__ may not have
        # completed), in which case there is no cluster to shut down.
        cluster = getattr(self, 'cluster', None)
        if cluster is not None:
            cluster.shutdown()

    def createsession(self):
        """Connect to the hard-coded contact point and open a session.

        ``self.keyspace`` is handed to ``connect()``; it is ``None`` unless
        the caller assigned it beforehand.
        """
        auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
        self.cluster = Cluster(['10.220.151.138'], auth_provider=auth_provider)
        self.session = self.cluster.connect(self.keyspace)

    def getsession(self):
        """Return the current session (``None`` before ``createsession()``)."""
        return self.session

    # How about Adding some log info to see what went wrong
    def setlogger(self):
        """Attach a stream handler to the root logger at INFO level.

        Every call adds one more handler to the root logger.
        """
        log = logging.getLogger()
        log.setLevel('INFO')
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        log.addHandler(handler)
        self.log = log

    # Create Keyspace based on Given Name
    def createkeyspace(self, keyspace):
        """Drop ``keyspace`` if it already exists, recreate it and select it.

        The keyspace is created with ``SimpleStrategy`` and a replication
        factor of 2. Requires ``createsession()`` and ``setlogger()`` to have
        been called, because both ``self.session`` and ``self.log`` are used.

        :param keyspace:  The Name of Keyspace to be created
        :return: None
        """
        # Start from a clean slate: an existing keyspace of the same name is
        # dropped before the new one is created.
        rows = self.session.execute("SELECT keyspace_name FROM system_schema.keyspaces")
        if keyspace in [row[0] for row in rows]:
            self.log.info("dropping existing keyspace...")
            self.session.execute("DROP KEYSPACE " + keyspace)

        self.log.info("creating keyspace...")
        self.session.execute("""
                CREATE KEYSPACE %s
                WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
                """ % keyspace)

        self.log.info("setting keyspace...")
        self.session.set_keyspace(keyspace)

    def create_table(self):
        """Create the ``employee`` table in the session's keyspace if missing."""
        c_sql = """
                CREATE TABLE IF NOT EXISTS employee (emp_id int PRIMARY KEY,
                                              ename varchar,
                                              sal double,
                                              city varchar);
                 """
        self.session.execute(c_sql)
        self.log.info("Employee Table Created !!!")

    # lets do some batch insert
    def insert_data(self, total=1000000):
        """Insert ``total`` rows with ``emp_id`` running from 1 to ``total``.

        Each row is sent as its own synchronous request. The statement is
        re-prepared and wrapped in a one-row batch on every iteration; that
        is the workload whose timing is being discussed, so it is kept as
        measured (hoisting ``prepare`` out of the loop is the obvious
        optimisation).

        :param total: number of rows to insert (default one million)
        :return: None
        """
        # Inclusive upper bound: the previous ``i < 1000000`` loop stopped
        # one row short of the advertised one million.
        for i in range(1, total + 1):
            insert_sql = self.session.prepare("INSERT INTO  employee (emp_id, ename , sal,city) VALUES (?,?,?,?)")
            batch = BatchStatement()
            batch.add(insert_sql, (i, 'Danny', 2555, 'De-vito'))
            self.session.execute(batch)
            # self.log.info('Batch Insert Completed for ' + str(i))

    # def select_data(self):
    #    rows = self.session.execute('select count(*) from perftest.employee limit 5;')
    #    for row in rows:
    #        print(row.ename, row.sal)

    def update_data(self):
        """Placeholder; not implemented."""
        pass

    def delete_data(self):
        """Placeholder; not implemented."""
        pass

if __name__ == '__main__':
    # Untimed setup: connection, logging, keyspace and table.
    example1 = PythonCassandraExample()
    example1.createsession()
    example1.setlogger()
    example1.createkeyspace('perftest')
    example1.create_table()

    # Only the population of perftest.employee is timed.
    started = time.time()
    example1.insert_data()
    elapsed = time.time() - started
    print('Duration: %s sec.' % elapsed)

    # example1.select_data()
sdnqo3pr

sdnqo3pr1#

这里有多个问题:
对于第二个测试,您没有为ddac分配足够的内存和内核,所以cassandra只得到1gb堆—cassandra默认占用所有可用内存的1/4。第三个测试也是如此-它只能得到16gb的ram作为堆,您可能需要将它提升到更高的值,比如24gb甚至更高。
不清楚每个测试中有多少iops—根据卷的大小和类型,ebs具有不同的吞吐量
您使用的是同步api来执行命令—基本上是在确认插入了前一项之后插入下一项。使用异步api可以获得最佳的吞吐量;
您在每次迭代中都在准备语句—这会导致每次都向服务器发送cql字符串,所以这会减慢一切—只需将 insert_sql = self.session.prepare(...) 这一行移到循环之外即可;
(不完全相关)您使用的是批处理语句来写入数据—这在cassandra中是反模式的,因为数据只发送到一个节点,然后应该将数据分发给真正拥有数据的节点。这就解释了为什么4节点集群比1节点集群差。
P.S. 真实的负载测试是相当困难的。有专门的工具可以使用,例如,您可以在这篇博客文章中找到更多信息。

kzmpq1sx

kzmpq1sx2#

下面的更新代码将每100条记录进行一次批处理:

"""
Python  by Techfossguru
Copyright (C) 2017  Satish Prasad

"""
import logging
import warnings
import time
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, BatchStatement
from cassandra.query import SimpleStatement
from cassandra.auth import PlainTextAuthProvider

class PythonCassandraExample:
    """Cassandra insert workload that writes rows in fixed-size batches.

    Connects to a cluster, (re)creates a keyspace and an ``employee`` table,
    then inserts rows, sending one batch per ``batch_size`` rows.

    Expected call order: ``createsession()``, ``setlogger()``,
    ``createkeyspace(name)``, ``create_table()``, ``insert_data()``.
    """

    def __init__(self):
        self.cluster = None   # set by createsession()
        self.session = None   # set by createsession()
        self.keyspace = None  # passed to Cluster.connect(); never assigned here
        self.log = None       # set by setlogger()

    def __del__(self):
        # createsession() may never have run (or __init__ may not have
        # completed), in which case there is no cluster to shut down.
        cluster = getattr(self, 'cluster', None)
        if cluster is not None:
            cluster.shutdown()

    def createsession(self):
        """Connect to the hard-coded contact point and open a session.

        ``self.keyspace`` is handed to ``connect()``; it is ``None`` unless
        the caller assigned it beforehand.
        """
        auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
        self.cluster = Cluster(['10.220.151.138'], auth_provider=auth_provider)
        self.session = self.cluster.connect(self.keyspace)

    def getsession(self):
        """Return the current session (``None`` before ``createsession()``)."""
        return self.session

    # How about Adding some log info to see what went wrong
    def setlogger(self):
        """Attach a stream handler to the root logger at INFO level.

        Every call adds one more handler to the root logger.
        """
        log = logging.getLogger()
        log.setLevel('INFO')
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        log.addHandler(handler)
        self.log = log

    # Create Keyspace based on Given Name
    def createkeyspace(self, keyspace):
        """Drop ``keyspace`` if it already exists, recreate it and select it.

        The keyspace is created with ``SimpleStrategy`` and a replication
        factor of 2. Requires ``createsession()`` and ``setlogger()`` to have
        been called, because both ``self.session`` and ``self.log`` are used.

        :param keyspace:  The Name of Keyspace to be created
        :return: None
        """
        # Start from a clean slate: an existing keyspace of the same name is
        # dropped before the new one is created.
        rows = self.session.execute("SELECT keyspace_name FROM system_schema.keyspaces")
        if keyspace in [row[0] for row in rows]:
            self.log.info("dropping existing keyspace...")
            self.session.execute("DROP KEYSPACE " + keyspace)

        self.log.info("creating keyspace...")
        self.session.execute("""
                CREATE KEYSPACE %s
                WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
                """ % keyspace)

        self.log.info("setting keyspace...")
        self.session.set_keyspace(keyspace)

    def create_table(self):
        """Create the ``employee`` table in the session's keyspace if missing."""
        c_sql = """
                CREATE TABLE IF NOT EXISTS employee (emp_id int PRIMARY KEY,
                                              ename varchar,
                                              sal double,
                                              city varchar);
                 """
        self.session.execute(c_sql)
        self.log.info("Employee Table Created !!!")

    # lets do some batch insert
    def insert_data(self, total=1000000, batch_size=100):
        """Insert ``total`` rows, executing one batch per ``batch_size`` rows.

        Rows get ``emp_id`` values 1 through ``total``. The insert statement
        is prepared once, before the loop. A final partial batch is sent only
        when rows are still pending, so no empty batch is executed when
        ``total`` is a multiple of ``batch_size``.

        :param total: number of rows to insert (default one million)
        :param batch_size: rows per batch, must be at least 1 (default 100)
        :return: None
        :raises ValueError: if ``batch_size`` is smaller than 1
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        insert_sql = self.session.prepare("INSERT INTO  employee (emp_id, ename , sal,city) VALUES (?,?,?,?)")
        # The warnings filter list is global, so this silences FutureWarning
        # for the rest of the process, not just for this method.
        warnings.filterwarnings("ignore", category=FutureWarning)

        batch = BatchStatement()
        pending = 0  # rows added to ``batch`` but not yet sent
        for emp_id in range(1, total + 1):
            batch.add(insert_sql, (emp_id, 'Danny', 2555, 'De-vito'))
            pending += 1

            # Commit every ``batch_size`` records
            if pending == batch_size:
                self.session.execute(batch)
                batch = BatchStatement()
                pending = 0
                # self.log.info('Batch Insert Completed for ' + str(emp_id))

        # Flush the remainder; skipped when the last full batch already
        # covered every row.
        if pending:
            self.session.execute(batch)

    # def select_data(self):
    #    rows = self.session.execute('select count(*) from actimize.employee limit 5;')
    #    for row in rows:
    #        print(row.ename, row.sal)

    def update_data(self):
        """Placeholder; not implemented."""
        pass

    def delete_data(self):
        """Placeholder; not implemented."""
        pass

if __name__ == '__main__':
    # Untimed setup: connection, logging, keyspace and table.
    example1 = PythonCassandraExample()
    example1.createsession()
    example1.setlogger()
    example1.createkeyspace('actimize')
    example1.create_table()

    # Only the population of actimize.employee is timed.
    started = time.time()
    example1.insert_data()
    elapsed = time.time() - started
    print('Duration: %s sec.' % elapsed)

    # example1.select_data()

相关问题