I am looking for an equivalent in Flink SQL of the `from_unixtime(bigint unixtime)` function that exists in Spark SQL. My goal is to convert this format: 1439799094 into this format: 2015-05-18 05:43:37
2 Answers

noj0wjuj1#
Just use a user-defined function (UDF)!
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
Example usage:
test.csv

```
creation_date|key
1535816823|1
1536392928|2
1536272308|3
```
EpochTimeConverter.scala

```scala
import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDateTime, ZoneId}
import org.apache.flink.table.functions.ScalarFunction

class EpochTimeConverter extends ScalarFunction {
  def eval(epochTime: Int): String = {
    // For performance, you may cache DateTimeFormatter in real life
    val timePattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    timePattern.format(LocalDateTime.ofInstant(Instant.ofEpochSecond(epochTime), ZoneId.systemDefault()))
  }
}
```
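As a standalone check of the conversion logic used in the UDF above, the same `java.time` calls can be exercised outside Flink. This is only a sketch: it pins the zone to UTC so the result is deterministic (the UDF itself uses `ZoneId.systemDefault()`, so its output shifts with the machine's time zone), and it feeds in the epoch value from the question:

```scala
import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDateTime, ZoneId}

object EpochDemo {
  private val timePattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Same conversion as EpochTimeConverter.eval, but with a fixed UTC zone
  // so the rendered string does not depend on the host's default zone.
  def format(epochSeconds: Long): String =
    timePattern.format(
      LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), ZoneId.of("UTC")))

  def main(args: Array[String]): Unit =
    println(format(1439799094L)) // the timestamp from the question, rendered in UTC
}
```

Note that in UTC this particular epoch renders as a date in August 2015; under a different zone (as with `systemDefault()`) the wall-clock time will differ.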
UdfExample.scala

```scala
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object UdfExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    val csvData = CsvTableSource
      .builder()
      .path("test.csv")
      .ignoreFirstLine()
      .fieldDelimiter("|")
      .field("creation_date", Types.INT)
      .field("key", Types.INT)
      .build()

    tableEnv.registerTableSource("temp_table", csvData)
    println("Without udf:")
    tableEnv.sqlQuery("SELECT creation_date, key FROM temp_table").toDataSet[Row].print()

    tableEnv.registerFunction("from_unixtime", new EpochTimeConverter())
    println()
    println("With udf:")
    tableEnv.sqlQuery("SELECT from_unixtime(creation_date), key FROM temp_table").toDataSet[Row].print()
  }
}
```

If you run `UdfExample.scala`, it will produce output similar to the following:
```
Without udf:
1535816823,1
1536272308,3
1536392928,2

With udf:
2018-09-01 18:47:03,1
2018-09-07 01:18:28,3
2018-09-08 10:48:48,2
```
agyaoht72#
I think you are looking for `DATE_FORMAT(timestamp, '%Y-%m-%d %H:%i:%s')`. For details, see the documentation on built-in functions and on the date format specifiers.
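Against the `temp_table` from the first answer, the suggestion might look roughly like the query below. This is a hedged sketch, not verified against a specific Flink release: support for `DATE_FORMAT` and for these MySQL-style specifiers varies between Flink versions, and since `creation_date` is epoch seconds (an `INT`), it would first need conversion to a timestamp (for example via the `from_unixtime` UDF above); the `ts_column` name here is hypothetical.

```sql
-- Hypothetical: assumes a Flink release where DATE_FORMAT accepts these
-- specifiers, and a timestamp-typed column ts_column in temp_table.
SELECT DATE_FORMAT(ts_column, '%Y-%m-%d %H:%i:%s') AS formatted, key
FROM temp_table
```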