示例代码
(以“<pre> //从datapush服务器获取连接 //streaming server列表,可以配置,当前sever 180.149.153.38,180.149.153.39微博 private List<String> s...”为内容创建页面) |
|||
(未显示3个用户的5个中间版本) | |||
第1行: | 第1行: | ||
<pre> | <pre> | ||
− | + | //java源代码如下: | |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | import java.io.ByteArrayOutputStream; | |
− | + | import java.io.DataInputStream; | |
− | + | import java.io.IOException; | |
− | + | import java.util.Arrays; | |
− | + | import java.util.List; | |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | import org.apache.commons.httpclient.HostConfiguration; | |
− | + | import org.apache.commons.httpclient.HttpClient; | |
− | + | import org.apache.commons.httpclient.HttpStatus; | |
− | + | import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; | |
− | + | import org.apache.commons.httpclient.methods.GetMethod; | |
− | + | ||
− | + | ||
− | + | import com.weibo.datapush.StreamingException; | |
− | + | ||
− | + | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | + | public class ReceiveMessageServiceTest { | |
− | + | private MultiThreadedHttpConnectionManager httpConnManager; | |
− | + | private HttpClient httpClient; | |
− | + | private List<String> streamingUrlList; | |
+ | private int curStreamUrlIndex; | ||
+ | public static long lastMsgLocation = -1L; | ||
+ | private DataInputStream inputStream; | ||
+ | private final int recBufSize = 256; | ||
+ | private byte[] recBuf; | ||
+ | private int recIndex; | ||
− | + | /** | |
+ | * 获取数据主线程 | ||
+ | * | ||
+ | * @param args | ||
+ | */ | ||
+ | public static void main(String[] args) { | ||
+ | ReceiveMessageServiceTest test = new ReceiveMessageServiceTest(); | ||
+ | test.init(); | ||
+ | } | ||
− | + | /** | |
− | + | * 初始化httpclient,并启动获取数据线程 | |
− | + | */ | |
− | + | public void init() { | |
− | + | this.streamingUrlList.add("http://c.api.weibo.com/2/datapush/comment.json?subid=10021"); | |
+ | this.httpConnManager = new MultiThreadedHttpConnectionManager(); | ||
+ | this.httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION, 10); | ||
+ | this.httpConnManager.getParams().setMaxTotalConnections(10); | ||
+ | this.httpConnManager.getParams().setSoTimeout(2147483647); | ||
+ | this.httpConnManager.getParams().setConnectionTimeout(10000); | ||
+ | this.httpConnManager.getParams().setReceiveBufferSize(655350); | ||
+ | this.httpClient = new HttpClient(this.httpConnManager); | ||
+ | new ReadTask().start(); | ||
+ | } | ||
− | + | /** | |
− | + | * 获取数据线程 | |
+ | * | ||
+ | */ | ||
+ | class ReadTask extends Thread { | ||
− | + | /** | |
− | + | * 启一个线程从服务器读取数据 | |
− | + | */ | |
+ | public void run() { | ||
+ | while (true) { | ||
+ | GetMethod method = null; | ||
+ | recIndex = 0; | ||
+ | recBuf = new byte[recBufSize]; | ||
+ | try { | ||
+ | method = connectStreamServer(); | ||
+ | while (true) { | ||
+ | processLine(); | ||
+ | } | ||
+ | } catch (Exception e) { | ||
+ | // 当连接断开时,重新连接 | ||
+ | System.out.println("streaming process error " + e.getMessage()); | ||
+ | lastMsgLocation = lastMsgLocation + 2; | ||
+ | curStreamUrlIndex = ++curStreamUrlIndex % streamingUrlList.size(); | ||
+ | } finally { | ||
+ | if (method != null) | ||
+ | method.releaseConnection(); | ||
+ | } | ||
+ | } | ||
+ | } | ||
− | + | /** | |
− | + | * 建立http连接 | |
− | + | * | |
− | + | * @return | |
− | + | */ | |
− | + | private GetMethod connectStreamServer() { | |
+ | String targetURL = (String) streamingUrlList.get(curStreamUrlIndex); | ||
+ | // 从指定的since_id开始读取数据,保证读取数据的连续性,消息完整性 | ||
+ | if (lastMsgLocation > 0L) { | ||
+ | targetURL = targetURL + "&since_id=" + lastMsgLocation; | ||
+ | } | ||
+ | System.out.println("StreamingReceiver http get url=" + targetURL); | ||
+ | GetMethod method = new GetMethod(targetURL); | ||
+ | int statusCode = 0; | ||
+ | try { | ||
+ | statusCode = httpClient.executeMethod(method); | ||
+ | } catch (Exception e) { | ||
+ | if (method != null) { | ||
+ | method.releaseConnection(); | ||
+ | } | ||
+ | throw new StreamingException("stream url connect failed", e); | ||
+ | } | ||
− | + | if (statusCode != HttpStatus.SC_OK) { | |
− | + | throw new RuntimeException(".. "); | |
− | + | } | |
− | + | ||
− | + | try { | |
− | + | inputStream = new DataInputStream(method.getResponseBodyAsStream()); | |
− | + | } catch (IOException e) { | |
− | + | throw new RuntimeException("get stream input io exception", e); | |
− | + | } | |
− | + | ||
− | + | return method; | |
− | + | } | |
− | + | ||
− | + | /** | |
− | + | * 读取并处理数据 | |
− | + | * | |
− | + | * @throws IOException | |
− | + | */ | |
− | + | private void processLine() throws IOException { | |
− | + | byte[] bytes = readLineBytes(); | |
− | + | if ((bytes != null) && (bytes.length > 0)) { | |
− | + | String message = new String(bytes); | |
− | + | handleMessage(message); | |
− | + | } | |
+ | } | ||
+ | |||
+ | /** | ||
+ | * 可以重写此方法解析message,同时需将lastMsgLocation赋值为最近1条消息的id | ||
+ | * | ||
+ | * @param message | ||
+ | */ | ||
+ | private void handleMessage(String message) { | ||
+ | System.out.println(message); | ||
+ | } | ||
+ | |||
+ | /** | ||
+ | * 读取数据 | ||
+ | * | ||
+ | * @return | ||
+ | * @throws IOException | ||
+ | */ | ||
+ | public byte[] readLineBytes() throws IOException { | ||
+ | byte[] result = null; | ||
+ | ByteArrayOutputStream bos = new ByteArrayOutputStream(); | ||
+ | int readCount = 0; | ||
+ | if ((recIndex > 0) && (read(bos))) { | ||
+ | return bos.toByteArray(); | ||
+ | } | ||
+ | while ((readCount = inputStream.read(recBuf, recIndex, recBuf.length - recIndex)) > 0) { | ||
+ | recIndex = (recIndex + readCount); | ||
+ | if (read(bos)) { | ||
+ | break; | ||
+ | } | ||
+ | } | ||
+ | result = bos.toByteArray(); | ||
+ | if ((result == null) || ((result != null) && (result.length <= 0) && (recIndex <= 0))) { | ||
+ | throw new IOException("++++ Stream appears to be dead, so closing it down"); | ||
+ | } | ||
+ | return result; | ||
+ | } | ||
+ | |||
+ | /** | ||
+ | * 读数据到bos | ||
+ | * | ||
+ | * @param bos | ||
+ | * @return | ||
+ | */ | ||
+ | private boolean read(ByteArrayOutputStream bos) { | ||
+ | boolean result = false; | ||
+ | int index = -1; | ||
+ | for (int i = 0; i < recIndex - 1; i++) { | ||
+ | // 13cr-回车 10lf-换行 | ||
+ | if ((recBuf[i] == 13) && (recBuf[(i + 1)] == 10)) { | ||
+ | index = i; | ||
+ | break; | ||
+ | } | ||
+ | } | ||
+ | if (index >= 0) { | ||
+ | bos.write(recBuf, 0, index); | ||
+ | byte[] newBuf = new byte[recBufSize]; | ||
+ | if (recIndex > index + 2) { | ||
+ | System.arraycopy(recBuf, index + 2, newBuf, 0, recIndex - index - 2); | ||
+ | } | ||
+ | recBuf = newBuf; | ||
+ | recIndex = (recIndex - index - 2); | ||
+ | result = true; | ||
+ | } else if (recBuf[(recIndex - 1)] == 13) { | ||
+ | bos.write(recBuf, 0, recIndex - 1); | ||
+ | Arrays.fill(recBuf, (byte) 0); | ||
+ | recBuf[0] = 13; | ||
+ | recIndex = 1; | ||
+ | } else { | ||
+ | bos.write(recBuf, 0, recIndex); | ||
+ | Arrays.fill(recBuf, (byte) 0); | ||
+ | recIndex = 0; | ||
+ | } | ||
+ | |||
+ | return result; | ||
+ | } | ||
+ | } | ||
+ | |||
+ | } | ||
+ | |||
+ | |||
+ | </pre> | ||
+ | |||
+ | |||
+ | <pre> | ||
+ | //pom.xml中内容如下: | ||
+ | |||
+ | <dependencies> | ||
+ | <dependency> | ||
+ | <groupId>commons-logging</groupId> | ||
+ | <artifactId>commons-logging</artifactId> | ||
+ | <version>1.1.1</version> | ||
+ | </dependency> | ||
+ | <dependency> | ||
+ | <groupId>commons-codec</groupId> | ||
+ | <artifactId>commons-codec</artifactId> | ||
+ | <version>1.4</version> | ||
+ | </dependency> | ||
+ | <dependency> | ||
+ | <groupId>commons-httpclient</groupId> | ||
+ | <artifactId>commons-httpclient</artifactId> | ||
+ | <version>3.1</version> | ||
+ | </dependency> | ||
+ | </dependencies> | ||
</pre> | </pre> |
2017年1月9日 (一) 16:08的最后版本
//java源代码如下: import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; import java.util.Arrays; import java.util.List; import org.apache.commons.httpclient.HostConfiguration; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpStatus; import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.httpclient.methods.GetMethod; import com.weibo.datapush.StreamingException; public class ReceiveMessageServiceTest { private MultiThreadedHttpConnectionManager httpConnManager; private HttpClient httpClient; private List<String> streamingUrlList; private int curStreamUrlIndex; public static long lastMsgLocation = -1L; private DataInputStream inputStream; private final int recBufSize = 256; private byte[] recBuf; private int recIndex; /** * 获取数据主线程 * * @param args */ public static void main(String[] args) { ReceiveMessageServiceTest test = new ReceiveMessageServiceTest(); test.init(); } /** * 初始化httpclient,并启动获取数据线程 */ public void init() { this.streamingUrlList.add("http://c.api.weibo.com/2/datapush/comment.json?subid=10021"); this.httpConnManager = new MultiThreadedHttpConnectionManager(); this.httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION, 10); this.httpConnManager.getParams().setMaxTotalConnections(10); this.httpConnManager.getParams().setSoTimeout(2147483647); this.httpConnManager.getParams().setConnectionTimeout(10000); this.httpConnManager.getParams().setReceiveBufferSize(655350); this.httpClient = new HttpClient(this.httpConnManager); new ReadTask().start(); } /** * 获取数据线程 * */ class ReadTask extends Thread { /** * 启一个线程从服务器读取数据 */ public void run() { while (true) { GetMethod method = null; recIndex = 0; recBuf = new byte[recBufSize]; try { method = connectStreamServer(); while (true) { processLine(); } } catch (Exception e) { // 当连接断开时,重新连接 System.out.println("streaming process error " + e.getMessage()); lastMsgLocation = lastMsgLocation + 2; curStreamUrlIndex = ++curStreamUrlIndex % streamingUrlList.size(); } finally { if (method != null) method.releaseConnection(); } } } /** * 建立http连接 * * @return */ private GetMethod connectStreamServer() { String targetURL = (String) streamingUrlList.get(curStreamUrlIndex); // 从指定的since_id开始读取数据,保证读取数据的连续性,消息完整性 if (lastMsgLocation > 0L) { targetURL = targetURL + "&since_id=" + lastMsgLocation; } System.out.println("StreamingReceiver http get url=" + targetURL); GetMethod method = new GetMethod(targetURL); int statusCode = 0; try { statusCode = httpClient.executeMethod(method); } catch (Exception e) { if (method != null) { method.releaseConnection(); } throw new StreamingException("stream url connect failed", e); } if (statusCode != HttpStatus.SC_OK) { throw new RuntimeException(".. "); } try { inputStream = new DataInputStream(method.getResponseBodyAsStream()); } catch (IOException e) { throw new RuntimeException("get stream input io exception", e); } return method; } /** * 读取并处理数据 * * @throws IOException */ private void processLine() throws IOException { byte[] bytes = readLineBytes(); if ((bytes != null) && (bytes.length > 0)) { String message = new String(bytes); handleMessage(message); } } /** * 可以重写此方法解析message,同时需将lastMsgLocation赋值为最近1条消息的id * * @param message */ private void handleMessage(String message) { System.out.println(message); } /** * 读取数据 * * @return * @throws IOException */ public byte[] readLineBytes() throws IOException { byte[] result = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); int readCount = 0; if ((recIndex > 0) && (read(bos))) { return bos.toByteArray(); } while ((readCount = inputStream.read(recBuf, recIndex, recBuf.length - recIndex)) > 0) { recIndex = (recIndex + readCount); if (read(bos)) { break; } } result = bos.toByteArray(); if ((result == null) || ((result != null) && (result.length <= 0) && (recIndex <= 0))) { throw new IOException("++++ Stream appears to be dead, so closing it down"); } return result; } /** * 读数据到bos * * @param bos * @return */ private boolean read(ByteArrayOutputStream bos) { boolean result = false; int index = -1; for (int i = 0; i < recIndex - 1; i++) { // 13cr-回车 10lf-换行 if ((recBuf[i] == 13) && (recBuf[(i + 1)] == 10)) { index = i; break; } } if (index >= 0) { bos.write(recBuf, 0, index); byte[] newBuf = new byte[recBufSize]; if (recIndex > index + 2) { System.arraycopy(recBuf, index + 2, newBuf, 0, recIndex - index - 2); } recBuf = newBuf; recIndex = (recIndex - index - 2); result = true; } else if (recBuf[(recIndex - 1)] == 13) { bos.write(recBuf, 0, recIndex - 1); Arrays.fill(recBuf, (byte) 0); recBuf[0] = 13; recIndex = 1; } else { bos.write(recBuf, 0, recIndex); Arrays.fill(recBuf, (byte) 0); recIndex = 0; } return result; } } }
//pom.xml中内容如下: <dependencies> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.4</version> </dependency> <dependency> <groupId>commons-httpclient</groupId> <artifactId>commons-httpclient</artifactId> <version>3.1</version> </dependency> </dependencies>
文档更新时间: 2017-01-09