我开发了一个Python脚本,用于使用Modbus通信进行数据记录。该脚本的主要目的是以大约每6-9毫秒的高频率从Modbus连接的设备收集数据,例如RPM,扭矩,温度和压力。然后将这些数据保存到CSV文件中以供以后分析。
我将其临时保存在pandas Dataframe 中以减少读取延迟,然后在最后将其写入csv
剧本一开始看起来不错;然而,当我长时间运行它时,比如一个小时,它福尔斯达到我的预期。不是以预期的速率连续记录数据,而是在最初的10分钟左右后数据收集似乎停止,导致CSV文件中只有大约117,000行。这与预期的数据收集速率不一致,这将导致一个小时内的数据集明显更大。
我已经提供了负责数据记录的Python脚本。它利用pymodbus库进行Modbus通信,并利用pandas库进行数据存储。该脚本连接到Modbus设备,以所需的速率检索数据,并将其记录到CSV文件中。
class Data_Logger:
TIME_FORMAT = "%d-%m-%Y %H:%M:%S.%f"
def __init__(self, client, ip):
self.IP = ip
self.REG_start = 380
self.RPM = []
self.timestamps = []
self.client = client
self.start_time = datetime.datetime.now()
self.headers = ['Timestamp', 'RPM',
'TORQUE', 'PS',
'DY RPM', 'PLUG TEMP',
'OIL TEMP', 'PRESSURE ATM',
'ENGINE TORQUE', 'FUEL CONSUMPTION',
'WET TEMP', 'DRY TEMP',
'INTAKE TEMP', 'OIL PRESSURE']
self.data = pd.DataFrame(columns=self.headers)
def read_reg(self):
response = self.client.read_holding_registers(self.REG_start, 49)
self.RPM = response.registers
def timestamp_update(self):
time = tt.timeit(self.read_reg, number=1) * 1000.0
self.start_time += datetime.timedelta(milliseconds=float(time))
row_data = [self.start_time.strftime(self.TIME_FORMAT)] + [self.RPM[i] for i in range(0,49,4)]
new_data = pd.DataFrame([row_data], columns=self.headers)
self.data = pd.concat([self.data, new_data], ignore_index=True)
def clear_data(self):
self.RPM = []
self.timestamps = []
self.data = pd.DataFrame(columns=self.headers) # Reinitialize the DataFrame
def check_connection_quality(self):
try:
response = self.client.read_holding_registers(self.REG_start, 1) # Read a known register
if response:
return True # Successful read, connection is active
except Exception as e:
print(f"Connection Quality Check Error\n{e}\nPlease Restart the APP")
return False # Unable to read the register, connection issue
def save(self):
date = datetime.datetime.now().strftime("%d-%m-%Y")
file_name = f"{date} Engine Test Bench Data.csv"
if os.path.exists(file_name):
file_number = 1
while os.path.exists(f"{file_name} ({file_number}).csv"):
file_number += 1
self.data.to_csv(f"{file_name} ({file_number}).csv")
else:
self.data.to_csv(file_name)
def plot(self):
plt.plot(self.data['RPM'])
# Hide x-axis labels
plt.xticks([])
# Set labels and title
plt.xlabel('Timestamp (ms)')
plt.ylabel('RPM')
plt.title('RPM vs. Timestamp')
# Show the plot
plt.show()
def run(self):
self.timestamp_update()
class App:
def __init__(self):
#gui objects
self.interrupt = False
def quality_check(self):
try:
for i in range(5):
self.client.read(380)
return True
except AttributeError as e:
messagebox.showerror("Read Error","Cannot Read Device Register please restart the logger")
return False
def connect(self):
ip = self.ip_entry.get()
try:
# Validate the entered IP address
ipaddress.IPv4Address(ip)
self.client = ModbusTcpClient(ip, debug=True)
self.logger = Data_Logger(client=self.client, ip=ip)
self.quality_check()
if self.client.connect():
self.log("PLC Connected")
self.status.config(text="PLC Connected", foreground='#00FB28')
self.start.configure(state="normal")
self.connect_button.configure(state="disabled")
self.ip_entry.delete(0, 'end')
time.sleep(2)
else:
self.log("PLC Connection Failed")
except (ipaddress.AddressValueError, ValueError):
messagebox.showerror("Invalid IP Address", "Please enter a valid IPv4 address.")
except Exception as e:
messagebox.showerror("Connection Error", str(e))
def start_log(self):
self.console.delete(0,'end')
self.p_bar.start()
self.log("Logging Started")
self.start.configure(state="disabled")
self.stop.configure(state="normal")
try:
while not self.interrupt:
self.logger.run()
except AttributeError as e:
messagebox.showerror("Modbus I/O exception",f"\nRestart the Logger\n{e}")
def thread_helper(self):
thread = Thread(target=self.start_log)
thread.start()
def stop_log(self):
self.interrupt = True
self.p_bar.stop()
self.log("Logging Stopped")
self.log("Saving Data")
self.p_bar.start()
save_thread=Thread(target=self.logger.save)
save_thread.start()
self.log("Data Saved")
self.p_bar.stop()
self.start.configure(state="normal")
self.stop.configure(state="disabled")
self.logger.plot()
self.logger.clear_data()
def button_init(self):
self.connect_button = tk.ttk.Button(self.connect_frame, command=self.connect, text="Connect", width=20)
self.start = tk.ttk.Button(self.control_frame, command=self.thread_helper, text="Start", state="disabled", width=22)
self.stop = tk.ttk.Button(self.control_frame, command=self.stop_log, text="Stop", state="disabled", width=23)
def run(self):
self.main.mainloop()
字符串
数据与环境:
操作系统:Windows 10 Python版本:Python 3.9.16相关库:pymodbus、matplotlib、pandas数据源:Modbus与PLC(可编程逻辑控制器)通信
- 我已经在多台机器上测试了该脚本,以排除硬件相关问题。该问题在不同的硬件设置中仍然存在,这表明该问题可能不是特定于特定机器或系统配置。
**预期:**当运行Python数据日志脚本时,我的预期是以高频率持续记录数据,大约每6-9毫秒一次。在一个小时的过程中,我预期CSV文件中有大量数据集,有数十万行。
**实际结果:**实际结果与我的预期不符。虽然脚本最初按预期记录数据,但在前10分钟后莫名其妙地停止收集数据。这导致数据集比预期小得多,一个小时后CSV文件中只有大约117,000行。
额外-在2-3小时长的测试中,csv文件为空,只有标题。-未发现错误或异常
1条答案
按热度按时间iqih9akk1#
我把它暂时保存在pandas Dataframe 中以减少读取延迟,然后在最后把它写入csv。
我不相信添加进程会减少延迟,以及Pandas is just plain slow for iterating行/记录的数据。
Python的标准csv模块对于编码CSV数据来说已经足够快了。之后,你的主要目标的限制将是Python的文件写入器的本地缓冲行为。你可以通过为你需要写入的每一行在底层文件对象上调用
flush()
来解决这个问题。我说“大多数”是因为在每次调用flush时写入磁盘是不保证的-查看this answer的顶部评论。孤立地看,这一切看起来像:
字符串
这里有一个完整的程序,它模拟了一个生产者发送一些值到队列中,大约每7毫秒,一个消费者在这些值准备好后立即将它们从队列中取出并写入CSV。运行
tail -f output.csv
,我可以看到CSV文件正在逐行更新,每秒多行。型
(My第一次用Python写Python代码时,我从Using asyncio.Queue for producer-consumer flow中拼凑出来的。