微博开放平台
微连接
移动应用
网站接入
电商服务商
电商商家
数据服务
数据服务
合作伙伴
微博支付
轻应用
粉丝服务
文档
推广
我的应用
登录
weibo
开发文档
首页
平台政策与指引
概述
平台公约
新手指南
开发者协议
应用运营管理规范
微连接分级管理办法
应用审核产品指南
应用安全开发注意事项
平台应用设计规范
微服务接入指南
微博登录接入
用微博帐号登录
授权机制
移动应用接入
移动应用介绍
移动应用SSO授权
微博Deep Link
媒体接入平台
头条文章开放接口
视频上传开放接口
电商接入平台
电商服务商接入
电商商家端接入
电商平台能力接口
粉丝服务平台
粉丝服务平台
新手接入指南
微信开发者迁移指南
接收消息
发送消息
自定义菜单
用户管理
生成带参数的二维码
Fans Service Platform
商业接口
商业数据接入指南
订阅服务手册(中文版)
订阅服务手册(英文版)
商业接口-REST API
商业数据常见问题
网站接入
网站接入介绍
微博API
微博API
接口访问频次权限
资源下载
SDK
微博标识下载
常见问题
联系我们
工具箱
链入页面
链出更改
特殊页面
查看源代码
跳转到:
导航
,
搜索
根据下列原因,你没有权限编辑本页:
您刚才请求的操作只有这个用户组中的用户才能使用:
用户
您可以查看并复制此页面的源代码:
<pre> //java源代码如下: import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.httpclient.HttpStatus; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; import java.util.Date; public class ReceiveMessageServiceTest { private static transient long sinceId = -1L; private final int recBufSize = 256; private String receiveCommentUrl; private DataInputStream inputStream; private byte[] recBuf; private int recIndex; public static void main(String[] args) { new ReceiveMessageServiceTest().init(); } /** * 启动获取数据线程 */ public void init() { receiveCommentUrl = "https://c.api.weibo.com/commercial/push?subid=10826"; new ReadTask().start(); } /** * 获取数据线程 */ class ReadTask extends Thread { /** * 启一个线程从服务器读取数据 */ @Override public void run() { boolean hasError = false; while (!hasError) { HttpURLConnection connection = null; recIndex = 0; recBuf = new byte[recBufSize]; try { connection = connectServer(sinceId); while (true) { processLine(); } } catch (Exception e) { // 当连接断开时,重新连接 System.out.println("connection close: " + e.getMessage()); if (e.getMessage().contains("errorCode")) { hasError = true; } System.out.println("last since_id: " + sinceId); } finally { if (inputStream != null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { connection.disconnect(); } System.out.println(new Date().toString()); } } } /** * 建立http连接 * * @return */ private HttpURLConnection connectServer(long sinceId) throws Exception { String targetURL = receiveCommentUrl; // 从指定的since_id开始读取数据,保证读取数据的连续性,消息完整性 if (sinceId > 0L) { targetURL = targetURL + "&since_id=" + sinceId; } System.out.println("get url: " + targetURL); URL url = new URL(receiveCommentUrl); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); int statusCode; try { statusCode = connection.getResponseCode(); } catch (Exception e) { connection.disconnect(); throw new Exception("stream url connect failed", e); } if (statusCode != HttpStatus.SC_OK) { throw new RuntimeException(connection.getResponseMessage()); } try { inputStream = new DataInputStream(connection.getInputStream()); } catch (IOException e) { throw new RuntimeException("get stream input io exception", e); } return connection; } /** * 读取并处理数据 * * @throws IOException */ private void processLine() throws IOException { byte[] bytes = readLineBytes(); if ((bytes != null) && (bytes.length > 0)) { String message = new String(bytes); handleMessage(message); } } /** * 可以重写此方法解析message * * @param message */ private void handleMessage(String message) { System.out.println(message); JSONObject jsonObject = JSON.parseObject(message); sinceId = jsonObject.getLongValue("id"); } /** * 读取数据 * * @return * @throws IOException */ public byte[] readLineBytes() throws IOException { byte[] result; ByteArrayOutputStream bos = new ByteArrayOutputStream(); int readCount; if ((recIndex > 0) && (read(bos))) { return bos.toByteArray(); } while ((readCount = inputStream.read(recBuf, recIndex, recBuf.length - recIndex)) > 0) { recIndex = (recIndex + readCount); if (read(bos)) { break; } } result = bos.toByteArray(); if (result == null || result.length <= 0 && recIndex <= 0) { throw new IOException("no data in 5 second"); } return result; } /** * 读数据到bos * * @param bos * @return */ private boolean read(ByteArrayOutputStream bos) { boolean result = false; int index = -1; for (int i = 0; i < recIndex - 1; i++) { // 13cr-回车 10lf-换行 if ((recBuf[i] == 13) && (recBuf[(i + 1)] == 10)) { index = i; break; } } if (index >= 0) { bos.write(recBuf, 0, index); byte[] newBuf = new byte[recBufSize]; if (recIndex > index + 2) { System.arraycopy(recBuf, index + 2, newBuf, 0, recIndex - index - 2); } recBuf = newBuf; recIndex = (recIndex - index - 2); result = true; } else if (recBuf[(recIndex - 1)] == 13) { bos.write(recBuf, 0, recIndex - 1); Arrays.fill(recBuf, (byte) 0); recBuf[0] = 13; recIndex = 1; } else { bos.write(recBuf, 0, recIndex); Arrays.fill(recBuf, (byte) 0); recIndex = 0; } return result; } } }
返回到
新版示例代码
。
反馈
分享
顶部