使用websockets从服务器流式处理事件

oogrdqng  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(297)

我希望使用服务器上的web套接字流式处理事件,该服务器的地址以:
wss://example.de/outbound/outboundinterface
我正在寻找使用apachespark、apacheflink等技术从服务器流式传输事件的选项。
我不确定如何将这些事件流到我的大学后端项目中。
任何指针都将有助于指示如何处理问题语句。

0yg35tkg

0yg35tkg1#

一种方法是编写一个小 Package 器,在websocket接口和标准tcp套接字(apachespark和apacheflink都支持)之间架起桥梁。下面是一个python示例,它连接到wikipedia websocket服务并打印出编辑流:


# !/usr/bin/env python

# -*- coding: utf-8 -*-

# sudo pip install socketIO_client==0.5.6

# ./simple.py 2> /dev/null | nc -lk 9999

import socketIO_client
import json

class WikiNamespace(socketIO_client.BaseNamespace):
    def on_change(self, change):
        print(json.dumps(change))

    def on_connect(self):
        self.emit('subscribe', 'en.wikipedia.org')

socketIO = socketIO_client.SocketIO('stream.wikimedia.org', 80)
socketIO.define(WikiNamespace, '/rc')

while True:
    socketIO.wait(10)

然后,您可以通过nc将其传输到flink,etal可以连接到的套接字(这里我将错误重定向到/dev/null—您可以将它们放在一个文件中):

./simple.py 2> /dev/null | nc -lk 9999

相关问题