java—使用ApacheHttpClient增量处理twitter的流api?

qyuhtwio  于 2021-06-04  发布在  Flume
关注(0)|答案(2)|浏览(322)

我使用apachehttpclient4连接到twitter的流式api,并具有默认级别的访问权限。它在一开始工作得非常好,但在检索数据几分钟后,它会出现以下错误:

2012-03-28 16:17:00,040 DEBUG org.apache.http.impl.conn.SingleClientConnManager: Get connection for route HttpRoute[{tls}->http://myproxy:80->https://stream.twitter.com:443]
2012-03-28 16:17:00,040 WARN com.cloudera.flume.core.connector.DirectDriver: Exception in source: TestTwitterSource
java.lang.IllegalStateException: Invalid use of SingleClientConnManager: connection still allocated.
    at org.apache.http.impl.conn.SingleClientConnManager.getConnection(SingleClientConnManager.java:216)
Make sure to release the connection before allocating another one.
    at org.apache.http.impl.conn.SingleClientConnManager$1.getConnection(SingleClientConnManager.java:190)

我明白我为什么要面对这个问题。我正在尝试将flume集群中的这个httpclient用作flume源。代码如下所示:

public Event next() throws IOException, InterruptedException {

    try {

        HttpHost target = new HttpHost("stream.twitter.com", 443, "https");
        new BasicHttpContext();
        HttpPost httpPost = new HttpPost("/1/statuses/filter.json");
        StringEntity postEntity = new StringEntity("track=birthday",
                "UTF-8");
        postEntity.setContentType("application/x-www-form-urlencoded");
        httpPost.setEntity(postEntity);
        HttpResponse response = httpClient.execute(target, httpPost,
                new BasicHttpContext());
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                response.getEntity().getContent()));
        String line = null;
        StringBuffer buffer = new StringBuffer();
        while ((line = reader.readLine()) != null) {
            buffer.append(line);
            if(buffer.length()>30000) break;
        }
        return new EventImpl(buffer.toString().getBytes());
    } catch (IOException ie) {
        throw ie;
    }

}

我试图将响应流中的30000个字符缓冲到stringbuffer中,然后将其作为接收到的数据返回。我显然没有关闭连接-但我不想关闭它只是现在我猜。twitter的开发指南谈到了这一点,它写道:
某些http客户机库仅在服务器关闭连接后返回响应正文。这些客户端将无法访问流式api。您必须使用将以增量方式返回响应数据的http客户端。大多数健壮的http客户机库将提供此功能。例如,apachehttpclient将处理这个用例。
它清楚地告诉您httpclient将以增量方式返回响应数据。我已经阅读了这些示例和教程,但还没有找到任何与此接近的内容。如果你们已经使用了httpclient(如果不是apache的话)并且以增量方式阅读了twitter的流式api,请告诉我你们是如何实现这一壮举的。那些没有,请随时贡献答案。蒂亚。
更新
我试着这样做:1)我将获取流句柄移动到flume源的open方法。2) 使用简单的inpustream并将数据读入bytebuffer。下面是方法体现在的样子:

byte[] buffer = new byte[30000];

        while (true) {
            int count = instream.read(buffer);
            if (count == -1)
                continue;
            else
                break;
        }
        return new EventImpl(buffer);

这在某种程度上是有效的——我收到推特,它们很好地被写到了一个目的地。问题在于instream.read(buffer)返回值。即使流中没有数据,缓冲区也有默认的\u0000字节和30000字节,因此该值将被写入目标。因此目标文件如下所示..”推特..推特..两个星期\u0000\u0000\u0000\u0000\u0000\u0000\u0000…tweets..tweets。我知道计数不会返回-1,因为这是一个永无止境的流,所以我如何判断缓冲区是否有来自read命令的新内容?

tzcvj98z

tzcvj98z1#

原来是Flume问题。flume经过优化,可以传输大小为32kb的事件。任何超过32kb的内容,flume都会退出(解决方法是将事件大小调整为大于32kb)。所以,我把代码改为至少缓冲20000个字符。这算是可行的,但不是万无一失的。如果缓冲区长度超过32kb,这仍然会失败,然而,到目前为止,它在一小时的测试中还没有失败——我相信这与twitter在其公共流上没有发送大量数据有关。

while ((line = reader.readLine()) != null) {
            buffer.append(line);
            if(buffer.length()>20000) break;
        }
ujv3wf0j

ujv3wf0j2#

问题是您的代码正在泄漏连接。请确保无论您是关闭内容流还是中止请求。

InputStream instream = response.getEntity().getContent();
    try {
        BufferedReader reader = new BufferedReader(
               new InputStreamReader(instream));
        String line = null;
        StringBuffer buffer = new StringBuffer();
        while ((line = reader.readLine()) != null) {
            buffer.append(line);
            if (buffer.length()>30000) {
               httpPost.abort();
               // connection will not be re-used
               break;
            }
        }
        return new EventImpl(buffer.toString().getBytes());
    } finally {
        // if request is not aborted the connection can be re-used
        try {
          instream.close();
        } catch (IOException ex) {
          // log or ignore
        }
    }

相关问题