我想得到每一行的当前时间戳。我使用以下代码
dataframe.withColumn("current_date",current_timestamp());
但当前的时间戳()是在序列化之前计算的,所以我总是得到相同的日期。如何计算每行Dataframe的当前\u timestamp()。我需要你的帮助。谢谢您。
zc0qhyus1#
试试这个-
df2.withColumn("current_date", expr("reflect('java.lang.System', 'currentTimeMillis')")) .show(false) /** * +-----+------+-------------+ * |class|gender|current_date | * +-----+------+-------------+ * |1 |m |1594137247247| * |1 |m |1594137247247| * |1 |f |1594137247247| * |2 |f |1594137247272| * |2 |f |1594137247272| * |3 |m |1594137247272| * |3 |m |1594137247272| * +-----+------+-------------+ */ df2.withColumn("current_date", expr("reflect('java.time.LocalDateTime', 'now')")) .show(false) /** * +-----+------+-----------------------+ * |class|gender|current_date | * +-----+------+-----------------------+ * |1 |m |2020-07-07T21:24:07.377| * |1 |m |2020-07-07T21:24:07.378| * |1 |f |2020-07-07T21:24:07.378| * |2 |f |2020-07-07T21:24:07.398| * |2 |f |2020-07-07T21:24:07.398| * |3 |m |2020-07-07T21:24:07.398| * |3 |m |2020-07-07T21:24:07.398| * +-----+------+-----------------------+ */ // you can convert current_date to timestamp by casting it to "timestamp"
dhxwm5r42#
即使是直接的python表达式也被视为序列化时间常量,下面的代码也为每一行提供相同的时间值,
dataframe.withColumn("current_date", F.lit( time.time()))
但是为时间值生成一个udf会使它在运行时解析时间值,如下所示,
from pyspark.sql.functions import udf def get_time(): return time.time() time_udf=udf(get_time) dataframe.withColumn("current_date", time_udf())
希望这有帮助!!
2条答案
按热度按时间zc0qhyus1#
试试这个-
dhxwm5r42#
即使是直接的python表达式也被视为序列化时间常量,下面的代码也为每一行提供相同的时间值,
但是为时间值生成一个udf会使它在运行时解析时间值,如下所示,
希望这有帮助!!