Data logging script issue: only part of the data is saved to CSV

ki1q1bka posted 8 months ago in Other

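I developed a Python script that logs data over Modbus. Its main purpose is to collect readings such as RPM, torque, temperature, and pressure from a Modbus-connected device at a high rate, roughly every 6-9 ms, and save them to a CSV file for later analysis.
I keep the data temporarily in a pandas DataFrame to reduce read latency, and then write it to CSV at the end.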
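The script looks fine at first; however, when I run it for longer, say an hour, it falls short of my expectations. Instead of logging continuously at the expected rate, data collection seems to stop after roughly the first 10 minutes, leaving only about 117,000 rows in the CSV file. That is inconsistent with the expected collection rate, which should produce a much larger dataset over an hour.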
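As a quick sanity check of those numbers, assuming one CSV row per sample and a constant 6 or 9 ms interval, integer arithmetic in milliseconds gives the row count one hour should produce and how much sampling time 117,000 rows actually represent:

```python
HOUR_MS = 60 * 60 * 1000   # one hour in milliseconds
OBSERVED_ROWS = 117_000    # row count reported in the question


def expected_rows(interval_ms: int, duration_ms: int = HOUR_MS) -> int:
    """Rows produced at one sample per interval_ms over duration_ms."""
    return duration_ms // interval_ms


def span_of_rows(rows: int, interval_ms: int) -> tuple[int, int]:
    """(minutes, seconds) of logging that `rows` samples represent."""
    return divmod(rows * interval_ms // 1000, 60)


for interval_ms in (6, 9):
    minutes, seconds = span_of_rows(OBSERVED_ROWS, interval_ms)
    print(f"{interval_ms} ms/sample: {expected_rows(interval_ms):,} rows expected per hour; "
          f"{OBSERVED_ROWS:,} rows cover {minutes} min {seconds} s")
```

So a full hour should yield roughly 400,000 to 600,000 rows, while 117,000 rows correspond to only about 12 to 18 minutes of sampling at the intended rate.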
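Below is the Python script responsible for the logging. It uses the pymodbus library for Modbus communication and pandas for data storage. The script connects to the Modbus device, reads data at the required rate, and records it to a CSV file.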

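import datetime
import ipaddress
import os
import time
import timeit as tt
import tkinter as tk
import tkinter.ttk  # makes tk.ttk available for the Button widgets below
from threading import Thread
from tkinter import messagebox

import matplotlib.pyplot as plt
import pandas as pd
from pymodbus.client import ModbusTcpClient  # pymodbus 3.x path; 2.x uses pymodbus.client.sync


class Data_Logger: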
    TIME_FORMAT = "%d-%m-%Y %H:%M:%S.%f"

    def __init__(self, client, ip):
        self.IP = ip
        self.REG_start = 380
        self.RPM = []
        self.timestamps = []
        self.client = client
        self.start_time = datetime.datetime.now()
        self.headers = ['Timestamp', 'RPM',
                       'TORQUE', 'PS',
                       'DY RPM', 'PLUG TEMP',
                       'OIL TEMP', 'PRESSURE ATM',
                       'ENGINE TORQUE', 'FUEL CONSUMPTION',
                       'WET TEMP', 'DRY TEMP',
                       'INTAKE TEMP', 'OIL PRESSURE']
        self.data = pd.DataFrame(columns=self.headers)

    def read_reg(self):
        response = self.client.read_holding_registers(self.REG_start, 49)
        self.RPM = response.registers

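    def timestamp_update(self):
        # Time one register read (timeit returns seconds; convert to ms)
        time = tt.timeit(self.read_reg, number=1) * 1000.0
        # Advance the running timestamp by the read duration only
        self.start_time += datetime.timedelta(milliseconds=float(time))
        # Keep every 4th register (indices 0, 4, ..., 48): 13 values for the 13 data columns
        row_data = [self.start_time.strftime(self.TIME_FORMAT)] + [self.RPM[i] for i in range(0,49,4)]
        new_data = pd.DataFrame([row_data], columns=self.headers)
        # pd.concat returns a new DataFrame, copying every existing row on each call
        self.data = pd.concat([self.data, new_data], ignore_index=True)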

    def clear_data(self):
        self.RPM = []
        self.timestamps = []
        self.data = pd.DataFrame(columns=self.headers)  # Reinitialize the DataFrame

    def check_connection_quality(self):
        try:
            response = self.client.read_holding_registers(self.REG_start, 1)  # Read a known register
            if response:
                return True  # Successful read, connection is active
        except Exception as e:
            print(f"Connection Quality Check Error\n{e}\nPlease Restart the APP")
        return False  # Unable to read the register, connection issue

    def save(self):
        date = datetime.datetime.now().strftime("%d-%m-%Y")
        base_name = f"{date} Engine Test Bench Data"
        file_name = f"{base_name}.csv"

        # If the file already exists, try "<base> (1).csv", "<base> (2).csv", ...
        file_number = 1
        while os.path.exists(file_name):
            file_name = f"{base_name} ({file_number}).csv"
            file_number += 1
        self.data.to_csv(file_name)

    def plot(self):
        plt.plot(self.data['RPM'])
        # Hide x-axis labels
        plt.xticks([])
        # Set labels and title
        plt.xlabel('Timestamp (ms)')
        plt.ylabel('RPM')
        plt.title('RPM vs. Timestamp')

        # Show the plot
        plt.show()

    def run(self):
        self.timestamp_update()


class App:
    def __init__(self):
        #gui objects
        self.interrupt = False

    def quality_check(self):
        try:
            for i in range(5):
                self.client.read(380)
            return True
        except AttributeError as e:
            messagebox.showerror("Read Error","Cannot Read Device Register please restart the logger")
            return False

    def connect(self):
        ip = self.ip_entry.get()
        try:
            # Validate the entered IP address
            ipaddress.IPv4Address(ip)
            self.client = ModbusTcpClient(ip, debug=True)
            self.logger = Data_Logger(client=self.client, ip=ip)
            self.quality_check()
            if self.client.connect():
                self.log("PLC Connected")
                self.status.config(text="PLC Connected", foreground='#00FB28')
                self.start.configure(state="normal")
                self.connect_button.configure(state="disabled")
                self.ip_entry.delete(0, 'end')
                time.sleep(2)
            else:
                self.log("PLC Connection Failed")

        except (ipaddress.AddressValueError, ValueError):
            messagebox.showerror("Invalid IP Address", "Please enter a valid IPv4 address.")
        except Exception as e:
            messagebox.showerror("Connection Error", str(e))

    def start_log(self):
        self.console.delete(0,'end')
        self.p_bar.start()
        self.log("Logging Started")
        self.start.configure(state="disabled")
        self.stop.configure(state="normal")
        try:
            while not self.interrupt:
                self.logger.run()
        except AttributeError as e:
            messagebox.showerror("Modbus I/O exception",f"\nRestart the Logger\n{e}")

    def thread_helper(self):
        thread = Thread(target=self.start_log)
        thread.start()

    def stop_log(self):
        self.interrupt = True
        self.p_bar.stop()
        self.log("Logging Stopped")
        self.log("Saving Data")
        self.p_bar.start()
        save_thread=Thread(target=self.logger.save)
        save_thread.start()
        self.log("Data Saved")
        self.p_bar.stop()
        self.start.configure(state="normal")
        self.stop.configure(state="disabled")
        self.logger.plot()
        self.logger.clear_data()

    def button_init(self):
        self.connect_button = tk.ttk.Button(self.connect_frame, command=self.connect, text="Connect", width=20)
        self.start = tk.ttk.Button(self.control_frame, command=self.thread_helper, text="Start", state="disabled", width=22)
        self.stop = tk.ttk.Button(self.control_frame, command=self.stop_log, text="Stop", state="disabled", width=23)

    def run(self):
        self.main.mainloop()

Data and environment:
OS: Windows 10
Python version: Python 3.9.16
Relevant libraries: pymodbus, matplotlib, pandas
Data source: Modbus communication with a PLC (programmable logic controller)

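  • I have tested the script on several machines to rule out hardware-related issues. The problem persists across different hardware setups, which suggests it is not specific to a particular machine or system configuration.
    **Expected:** When running the Python data logging script, I expect data to be logged continuously at a high rate, roughly every 6-9 ms. Over the course of an hour, I expect a large dataset in the CSV file, with hundreds of thousands of rows.
    **Actual result:** The actual result does not match my expectations. Although the script initially logs data as expected, it inexplicably stops collecting data after the first 10 minutes. This leaves a much smaller dataset than expected: only about 117,000 rows in the CSV file after an hour.
    **Additional:**
    - In 2-3 hour tests, the CSV file is empty apart from the header row.
    - No errors or exceptions are raised.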
Answer #1, by iqih9akk

"I keep the data temporarily in a pandas DataFrame to reduce read latency, and then write it to CSV at the end."
I don't believe adding that step reduces latency, and Pandas is just plain slow for iterating over rows/records of data.
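To make the pandas point concrete for this script: `pd.concat` returns a new DataFrame that copies every existing row, so calling it once per sample makes each append slower as the log grows (quadratic total work). The sketch below compares that pattern with appending plain lists and building the DataFrame once at the end. `COLUMNS` and `sample_row` are hypothetical placeholders, not the real registers, and the printed timings will vary by machine.

```python
import random
import time

import pandas as pd

COLUMNS = ["Sample", "RPM", "TORQUE"]  # placeholder subset of the real headers


def sample_row(i: int) -> list:
    """Hypothetical stand-in for one Modbus read: [sample number, rpm, torque]."""
    return [i, random.randint(0, 3000), random.random()]


def concat_per_row(n: int) -> pd.DataFrame:
    """The question's pattern: one pd.concat per sample (assumes n >= 1)."""
    df = pd.DataFrame([sample_row(0)], columns=COLUMNS)
    for i in range(1, n):
        new_row = pd.DataFrame([sample_row(i)], columns=COLUMNS)
        # Each call allocates a new frame and copies all rows collected so far
        df = pd.concat([df, new_row], ignore_index=True)
    return df


def list_then_frame(n: int) -> pd.DataFrame:
    """Collect rows in a plain list and build the DataFrame once at the end."""
    rows = [sample_row(i) for i in range(n)]
    return pd.DataFrame(rows, columns=COLUMNS)


if __name__ == "__main__":
    for n in (500, 2_000):
        t0 = time.perf_counter()
        concat_per_row(n)
        t1 = time.perf_counter()
        list_then_frame(n)
        t2 = time.perf_counter()
        print(f"n={n:>5}: concat per row {t1 - t0:.3f} s, "
              f"list then DataFrame {t2 - t1:.4f} s")
```

Both functions produce the same table; only the cost of getting there differs. A growing per-row cost is one plausible reason a logger like this falls further and further behind its 6-9 ms target, although the question does not include timings that would confirm it.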
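Python's standard csv module is more than fast enough for encoding CSV data. After that, the main limit on your goal will be the native buffering behavior of Python's file writer. You can mostly get around that by calling flush() on the underlying file object for every row you write. I say "mostly" because a call to flush is not guaranteed to write to disk; see the top comment on this answer.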
In isolation, that all looks like:

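import csv

with open("output.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["ID", "Measured value", "Time monotonic"])
    f.flush()  # push the header out of Python's buffer right away
    while True:
        # get_measured_values() stands for your own read, returning one row as a list
        writer.writerow(get_measured_values())
        f.flush()  # push each row out now instead of when the buffer fills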

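The two flushing levels behind that "mostly" can be sketched with the standard library alone: `f.flush()` hands Python's internal buffer to the operating system, while `os.fsync(f.fileno())` additionally asks the OS to commit its cached data to the storage device. `write_rows` and its `fsync` parameter are hypothetical names for illustration; calling `os.fsync` on every row is much slower, so whether it is worth it at a 6-9 ms rate is a trade-off to measure.

```python
import csv
import os
from typing import Iterable, Sequence


def write_rows(path: str, header: Sequence, rows: Iterable[Sequence], fsync: bool = False) -> None:
    """Write a header and rows to CSV, pushing every row out of Python's buffer."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for row in rows:
            writer.writerow(row)
            f.flush()  # Python's buffer -> OS
            if fsync:
                os.fsync(f.fileno())  # ask the OS to commit its cache to the device (slow)


if __name__ == "__main__":
    write_rows("demo.csv", ["ID", "Value"], [[1, 0.5], [2, 0.25]], fsync=True)
```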
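Here's a complete program that simulates a producer putting some values on a queue roughly every 7 ms, and a consumer that takes each value off the queue as soon as it is ready and writes it to a CSV file. Running tail -f output.csv, I can see the CSV file being updated row by row, many rows per second.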

import asyncio
import csv
import random
import time

# Queue of (row ID, measured value) pairs
MyQueue = asyncio.Queue[tuple[int, float]]

async def rnd_sleep(t: float):
    """Sleep for t seconds, on average."""
    await asyncio.sleep(t * random.random() * 2)

ITERATIONS = 100_000

async def producer(queue: MyQueue):
    """Simulate the device: queue a new reading roughly every 7 ms."""
    for i in range(1, ITERATIONS):
        token = random.random()
        await queue.put((i, token))
        await rnd_sleep(0.007)
        if i % 1_000 == 0:
            print(f"queued row {i:>6}")

async def consumer(queue: MyQueue):
    """Write each reading to CSV as soon as it arrives, flushing after every row."""
    with open("output.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)

        writer.writerow(["ID", "Measured value", "Time monotonic"])
        f.flush()
        while True:
            i, token = await queue.get()
            writer.writerow([i, token, time.monotonic()])
            f.flush()
            queue.task_done()

async def main():
    queue: MyQueue = asyncio.Queue()
    producers = [asyncio.create_task(producer(queue))]
    # Keep a reference to the consumer task so it can be cancelled at the end
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(*producers)
    # Wait until every queued row has been written, then stop the consumer
    await queue.join()
    consumer_task.cancel()

try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass


(This was my first time writing async Python code; I pieced it together from Using asyncio.Queue for producer-consumer flow.)
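Applied back to the question's logger, the same idea might look roughly like the sketch below. It is an untested adaptation, not the asker's code: `log_to_csv` and `read_registers` are hypothetical names, `read_registers` stands for something like `lambda: client.read_holding_registers(380, 49).registers` from the question, and a `threading.Event` takes the place of the `interrupt` flag. Each sample is stamped with the wall clock instead of an accumulated `timeit` duration, and is written and flushed immediately, so rows logged before a stall are already in the file.

```python
import csv
import datetime
import threading
from typing import Callable, List

HEADERS = ["Timestamp", "RPM", "TORQUE", "PS", "DY RPM", "PLUG TEMP", "OIL TEMP",
           "PRESSURE ATM", "ENGINE TORQUE", "FUEL CONSUMPTION", "WET TEMP",
           "DRY TEMP", "INTAKE TEMP", "OIL PRESSURE"]
TIME_FORMAT = "%d-%m-%Y %H:%M:%S.%f"


def log_to_csv(path: str, read_registers: Callable[[], List[int]],
               stop: threading.Event) -> int:
    """Read, timestamp, and write one row per loop until `stop` is set.

    read_registers is a hypothetical callable returning the 49 raw registers;
    every 4th one is kept, as in the question's timestamp_update().
    Returns the number of data rows written.
    """
    count = 0
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(HEADERS)
        f.flush()
        while not stop.is_set():
            regs = read_registers()
            # Wall-clock time of this sample, not a sum of read durations
            stamp = datetime.datetime.now().strftime(TIME_FORMAT)
            writer.writerow([stamp] + regs[0:49:4])  # 13 values for 13 data columns
            f.flush()
            count += 1
    return count
```

In the Tk app, the Start button could run `log_to_csv` in a `Thread` and the Stop button would call `stop.set()`; the thread then closes the file itself, so no separate save step and no large in-memory DataFrame are needed.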
