Cassandra: batch insert fails for a table with a CQL map and a nested UDT

wljmcqd8 posted 8 months ago in Cassandra

I'm trying to write some data to Cassandra. I've set up a connection, and this is my database schema:

CREATE TYPE IF NOT EXISTS UDTVehicle (
    num_vehicles INT,
    speed LIST<FLOAT>
);

CREATE TABLE IF NOT EXISTS road_traffic (
    road_id INT,
    timestamp TIMESTAMP,
    radar_id INT,
    vehicles MAP<INT, FROZEN<UDTVehicle>>,
    PRIMARY KEY (road_id, timestamp) // partition key and clustering key
);

This is my Python code:

def write_to_cassandra(self,session, record):
        insert_query = """
        INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles)
        VALUES (?, ?, ?, ?);
        """
        prepared_insert = session.prepare(insert_query)
        batch = BatchStatement()
        batch_size = 0
        batches= 0
        print(record)
        if not all(record.get(field) is not None for field in list(record.keys())) :
            logger.warning("Record is missing a required fields")
            return
        else :
            vehicles_map = {record.get('road_id'): record.get('Vehicles')}
            print(record.get('road_id'))
            batch.add(prepared_insert, (record.get('road_id'), record.get('timestamp'), record.get('radar_id'), vehicles_map))
            batch_size += 1

As far as I understand, vehicles_map must consist of a key (the partition key) and a value (a UDTVehicle object). When I print the record, I get this:

{'timestamp': datetime.datetime(2022, 1, 1, 8, 0, 30), 'num_Vehicles': 7, 'road_id': 9, 'radar_id': 11, 'Vehicles': {'num_Vehicles': 7, 'speed': [72.78, 62.67, 85.15, 75.51, 83.95, 76.39, 57.92]}}

The error I get is:

Traceback (most recent call last):
  File "cassandra_kafka/ingest_cassandra.py", line 135, in <module>
    Data_ingest.consume_store()
  File "cassandra_kafka/ingest_cassandra.py", line 113, in consume_store
    self.write_to_cassandra(session, value)
  File "cassandra_kafka/ingest_cassandra.py", line 66, in write_to_cassandra
    batch.add(prepared_insert, (record.get('road_id'), record.get('timestamp'), record.get('radar_id'), vehicles_map))
  File "cassandra/query.py", line 827, in cassandra.query.BatchStatement.add
  File "cassandra/query.py", line 506, in cassandra.query.PreparedStatement.bind
  File "cassandra/query.py", line 636, in cassandra.query.BoundStatement.bind
  File "cassandra/cqltypes.py", line 799, in cassandra.cqltypes._ParameterizedType.serialize
  File "cassandra/cqltypes.py", line 909, in cassandra.cqltypes.MapType.serialize_safe
  File "cassandra/cqltypes.py", line 324, in cassandra.cqltypes._CassandraType.to_binary
  File "cassandra/cqltypes.py", line 799, in cassandra.cqltypes._ParameterizedType.serialize
  File "cassandra/cqltypes.py", line 1030, in cassandra.cqltypes.UserType.serialize_safe
KeyError: 0

What am I doing wrong? Any help would be greatly appreciated.

Answer by 13z8s7eq:

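With the Python driver, the correct way to insert rows that contain a UDT is to define a simple class with the same structure as the UDT and use it when building the values for the insert statement.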
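I prepared some simple code to show what I mean. Look at the UdtVehicle class and at how it is instantiated when the parameters for the insert statements in the batch are created.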
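Depending on the command-line argument you pass, the sample code also demonstrates a few other things:

- `read` shows what you get when you "just" read the rows as they are.
- `read_udt` shows how to register the UDT with the cluster so that returned rows are nicely converted into the Python class.
- `insert` is a sanity check of a single-row (i.e. non-batched) prepared insert statement, with the UDT handled correctly as explained above.
- `insertb` verifies the same usage inside a batch.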
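For more information on handling UDTs, see this page: https://docs.datastax.com/en/developer/python-driver/latest/user_defined_types. Note that for *prepared* statements you need a slightly different approach (and you probably want to use prepared statements in a production application anyway).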
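Why a plain dict fails here: as far as I can tell from the driver's `cassandra/cqltypes.py` (`UserType.serialize_safe`, the last frame of the traceback), the driver binds a UDT value to a prepared statement in two steps. It first tries positional indexing `val[i]`, and it falls back to `getattr(val, fieldname)` only if indexing raises `TypeError`. A dict raises `KeyError: 0` instead, which is exactly the error in the question.

The snippet below is a simplified model of that lookup, not the driver's actual code. `read_udt_field`, `read_udt` and `UDT_FIELDS` are hypothetical names. The model shows which kinds of values get through: objects with matching attributes, plain tuples, and namedtuples in field order.

```python
from collections import namedtuple

# Field order of the UDTVehicle type in the schema.
UDT_FIELDS = ("num_vehicles", "speed")


def read_udt_field(val, i, fieldname):
    """Simplified model (hypothetical, not cassandra-driver code) of reading
    UDT field number i: positional access first, attribute access only if
    positional access raises TypeError."""
    try:
        return val[i]
    except TypeError:
        return getattr(val, fieldname, None)


def read_udt(val):
    """Return the field values the model would serialize, in field order."""
    return [read_udt_field(val, i, name) for i, name in enumerate(UDT_FIELDS)]


class UdtVehicle:
    # Same shape as the class in the answer: attribute names match UDT fields.
    def __init__(self, num_vehicles, speed):
        self.num_vehicles = num_vehicles
        self.speed = speed


VehicleTuple = namedtuple("VehicleTuple", UDT_FIELDS)

speeds = [72.78, 62.67]
print(read_udt(UdtVehicle(2, speeds)))    # attribute access: [2, [72.78, 62.67]]
print(read_udt((2, speeds)))              # positional access on a plain tuple
print(read_udt(VehicleTuple(2, speeds)))  # positional access on a namedtuple
try:
    read_udt({"num_vehicles": 2, "speed": speeds})
except KeyError as exc:
    print("dict fails with KeyError:", exc)  # dict fails with KeyError: 0
```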
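Looking at your code, you probably want to convert the vehicles into UDT objects inside the function you posted (depending on exactly what you receive from the Kafka stream). Note that the batch is only executed once you explicitly call session.execute(batch), which your code doesn't show.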
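Here is a minimal sketch of that conversion. It assumes every record has the shape printed in the question, and `build_insert_params` is a hypothetical helper. The sketch makes three choices:

- It redefines the answer's `UdtVehicle` class so the snippet is self-contained.
- It keeps the question's choice of `road_id` as the map key.
- It maps the record's `num_Vehicles` key to the UDT field `num_vehicles`. Note the different capitalization, which would not match the UDT field name as-is.

```python
import datetime


class UdtVehicle:
    """Plain class whose attribute names match the UDTVehicle fields."""

    def __init__(self, num_vehicles, speed):
        self.num_vehicles = num_vehicles
        self.speed = speed


REQUIRED = ("road_id", "timestamp", "radar_id", "Vehicles")


def build_insert_params(record):
    """Hypothetical helper: turn one Kafka record into the four bind values
    of the prepared INSERT, or return None if a required field is missing."""
    if any(record.get(field) is None for field in REQUIRED):
        return None
    raw = record["Vehicles"]
    vehicle = UdtVehicle(
        num_vehicles=raw["num_Vehicles"],  # record key uses a capital V
        speed=[float(s) for s in raw["speed"]],
    )
    # Map key: road_id, as in the question; adapt it to your data model.
    vehicles_map = {record["road_id"]: vehicle}
    return (record["road_id"], record["timestamp"], record["radar_id"], vehicles_map)


record = {
    "timestamp": datetime.datetime(2022, 1, 1, 8, 0, 30),
    "num_Vehicles": 7,
    "road_id": 9,
    "radar_id": 11,
    "Vehicles": {"num_Vehicles": 7, "speed": [72.78, 62.67, 85.15, 75.51, 83.95, 76.39, 57.92]},
}
params = build_insert_params(record)
print(params[0], params[2], params[3][9].num_vehicles, len(params[3][9].speed))  # 9 11 7 7
```

Inside `write_to_cassandra` you would then pass `params` to `batch.add(prepared_insert, params)` or to `session.execute(prepared_insert, params)`.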
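A few other remarks, for your awareness:
1. Don't prepare the statement on every write: that is a resource-wasting anti-pattern. Once a statement is prepared, cache it somewhere (self.prepared_statement is a natural candidate) and reuse it for better performance.
2. The UDT as you've built it does not guarantee num_vehicles == len(speed). If that should be enforced, a different model might be better (but again, that depends on your use case).
3. In this case, be extra careful to evaluate whether you really need a batch. Batches in Cassandra are not a way to speed up indiscriminate bulk inserts! For that, you can simply issue a number of concurrent simple writes, and the driver will take care of the rest. As soon as the statements in a given batch touch different partitions (in your example, different road_id values), you probably want to avoid a single batch. Read more here: https://batey.info/cassandra-anti-pattern-cassandra-logged.html and https://www.batey.info/cassandra-anti-pattern-misuse-of.html .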
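Points 1 and 3 can be sketched together as follows, assuming a connected `cassandra-driver` `Session` is passed in. `RoadTrafficWriter` and its methods are hypothetical names. The sketch does two things:

- It prepares the INSERT lazily, once, and caches the result on the instance.
- It sends multi-partition writes through `cassandra.concurrent.execute_concurrent_with_args` as individual concurrent writes rather than as one big batch.

The `concurrency=50` default is an arbitrary example value.

```python
INSERT_CQL = (
    "INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles) "
    "VALUES (?, ?, ?, ?)"
)


class RoadTrafficWriter:
    """Hypothetical writer that prepares the INSERT once and reuses it."""

    def __init__(self, session):
        self.session = session
        self._prepared = None  # cached PreparedStatement

    @property
    def prepared_insert(self):
        # Prepare on first use only; every later call reuses the cached statement.
        if self._prepared is None:
            self._prepared = self.session.prepare(INSERT_CQL)
        return self._prepared

    def write_one(self, params):
        """Single non-batched write of a (road_id, timestamp, radar_id, vehicles) tuple."""
        return self.session.execute(self.prepared_insert, params)

    def write_many(self, params_list, concurrency=50):
        """Concurrent individual writes, suited to rows that hit different partitions."""
        from cassandra.concurrent import execute_concurrent_with_args

        return execute_concurrent_with_args(
            self.session, self.prepared_insert, params_list, concurrency=concurrency
        )
```

With the driver's default `raise_on_first_error=True`, `write_many` raises on the first failed write. Otherwise it returns a list of `(success, result_or_exc)` tuples.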
Here is sample code you can start from (tested on Cassandra 4.1):

import sys
import datetime

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement

# Plain class whose attribute names match the fields of the UDTVehicle type
class UdtVehicle:
    def __init__(self, num_vehicles, speed):
        self.num_vehicles = num_vehicles
        self.speed = speed

    def __repr__(self):
        return f"UdtVehicle[{self.num_vehicles} vehicles, speeds={', '.join('%.2f' % sp for sp in self.speed)}]"

if __name__ == '__main__':
    cluster = Cluster(
        ["CONTACT_POINT"],
        auth_provider=PlainTextAuthProvider(
            "USERNAME",
            "PASSWORD",
        ),
    )
    session = cluster.connect("KEYSPACE_NAME")

    cmd = sys.argv[1] if len(sys.argv) > 1 else "read"
    if cmd == "read":
        # No class registered: UDT values come back as driver-generated namedtuples
        for r in session.execute("select * from road_traffic;"):
            print(str(r))
            print('-'*20)
            _one_udt = list(r.vehicles.values())[0]
            print(type(_one_udt))
            print(str(_one_udt))
            print('='*20)
    elif cmd == "read_udt":
        # Map the CQL type "udtvehicle" to the UdtVehicle class for query results
        cluster.register_user_type("KEYSPACE_NAME", "udtvehicle", UdtVehicle)
        for r in session.execute("select * from road_traffic;"):
            print(str(r))
            print('-'*20)
            _one_udt = list(r.vehicles.values())[0]
            print(type(_one_udt))
            print(str(_one_udt))
            print('='*20)
    elif cmd == "insert":
        insertion_prepared = session.prepare("INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles) VALUES (?, ?, ?, ?);")
        road_id = 123
        timestamp = datetime.datetime.now()
        radar_id = 456
        vehicles = {
            10: UdtVehicle(
                num_vehicles=1,
                speed=[100.1, 100.2],
            ),
            999: UdtVehicle(
                num_vehicles=100,
                speed=[],
            ),
            11: UdtVehicle(
                num_vehicles=3,
                speed=[0.1, 0.2, 0.3],
            ),
        }
        result = session.execute(insertion_prepared, (road_id, timestamp, radar_id, vehicles))
    elif cmd == "insertb":
        insertion_prepared = session.prepare("INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles) VALUES (?, ?, ?, ?);")
        batch = BatchStatement()
        # as per best practices, this batch will be a single-partition batch!
        road_id = 100
        t0 = datetime.datetime.now()
        for row_id in range(3):
            timestamp = t0 + datetime.timedelta(hours=row_id)
            radar_id = 1000 + row_id
            vehicles = {
                (1000+row_id+3): UdtVehicle(
                    num_vehicles=row_id+30,
                    speed=[10.01] * (1+row_id),
                )
            }
            batch.add(insertion_prepared, (road_id, timestamp, radar_id, vehicles))
        # run the batch
        session.execute(batch)
    else:
        print("Unknown command '%s'" % cmd)
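If you do need batches, the `insertb` case above keeps each batch inside a single partition. Below is a minimal sketch of generalizing that to a stream of rows. It assumes each row is a 4-tuple in the INSERT's column order, with `road_id` first. The rows are grouped by partition key, and one `BatchStatement` is built per group. `group_by_partition` and `single_partition_batches` are hypothetical helpers. The `cassandra` import is deferred so the grouping logic can be tried without the driver installed.

```python
from collections import defaultdict


def group_by_partition(rows):
    """Group (road_id, timestamp, radar_id, vehicles) tuples by road_id."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)  # road_id is the partition key
    return dict(groups)


def single_partition_batches(prepared_insert, rows):
    """Yield (road_id, BatchStatement) pairs; no batch ever mixes partitions."""
    from cassandra.query import BatchStatement

    for road_id, group in group_by_partition(rows).items():
        batch = BatchStatement()
        for row in group:
            batch.add(prepared_insert, row)
        yield road_id, batch
```

Usage would be `for _, batch in single_partition_batches(insertion_prepared, rows): session.execute(batch)`. Keep each group reasonably small, since very large batches can still trigger Cassandra's batch-size warnings or failures.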
