新版示例代码
(以“<pre> //java源代码如下: import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.httpclient.HostConfiguration; imp...”为内容创建页面) |
|||
(未显示2个用户的4个中间版本) | |||
第1行: | 第1行: | ||
<pre> | <pre> | ||
//java源代码如下: | //java源代码如下: | ||
− | |||
import com.alibaba.fastjson.JSON; | import com.alibaba.fastjson.JSON; | ||
import com.alibaba.fastjson.JSONObject; | import com.alibaba.fastjson.JSONObject; | ||
− | |||
− | |||
import org.apache.commons.httpclient.HttpStatus; | import org.apache.commons.httpclient.HttpStatus; | ||
− | |||
− | |||
import java.io.ByteArrayOutputStream; | import java.io.ByteArrayOutputStream; | ||
import java.io.DataInputStream; | import java.io.DataInputStream; | ||
import java.io.IOException; | import java.io.IOException; | ||
+ | import java.net.HttpURLConnection; | ||
+ | import java.net.URL; | ||
import java.util.Arrays; | import java.util.Arrays; | ||
+ | import java.util.Date; | ||
public class ReceiveMessageServiceTest { | public class ReceiveMessageServiceTest { | ||
private static transient long sinceId = -1L; | private static transient long sinceId = -1L; | ||
private final int recBufSize = 256; | private final int recBufSize = 256; | ||
− | |||
private String receiveCommentUrl; | private String receiveCommentUrl; | ||
private DataInputStream inputStream; | private DataInputStream inputStream; | ||
第25行: | 第22行: | ||
public static void main(String[] args) { | public static void main(String[] args) { | ||
− | + | new ReceiveMessageServiceTest().init(); | |
− | + | ||
} | } | ||
/** | /** | ||
− | * | + | * 启动获取数据线程 |
*/ | */ | ||
public void init() { | public void init() { | ||
− | receiveCommentUrl = "https://c.api.weibo.com/commercial/push?subid= | + | receiveCommentUrl = "https://c.api.weibo.com/commercial/push?subid=10826"; |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
new ReadTask().start(); | new ReadTask().start(); | ||
} | } | ||
第56行: | 第45行: | ||
boolean hasError = false; | boolean hasError = false; | ||
while (!hasError) { | while (!hasError) { | ||
− | + | HttpURLConnection connection = null; | |
recIndex = 0; | recIndex = 0; | ||
recBuf = new byte[recBufSize]; | recBuf = new byte[recBufSize]; | ||
try { | try { | ||
− | + | connection = connectServer(sinceId); | |
while (true) { | while (true) { | ||
processLine(); | processLine(); | ||
第72行: | 第61行: | ||
System.out.println("last since_id: " + sinceId); | System.out.println("last since_id: " + sinceId); | ||
} finally { | } finally { | ||
− | if ( | + | |
− | + | if (inputStream != null) { | |
+ | try { | ||
+ | inputStream.close(); | ||
+ | } catch (IOException e) { | ||
+ | e.printStackTrace(); | ||
+ | } | ||
+ | } | ||
+ | if (connection != null) { | ||
+ | connection.disconnect(); | ||
} | } | ||
+ | System.out.println(new Date().toString()); | ||
} | } | ||
} | } | ||
第84行: | 第82行: | ||
* @return | * @return | ||
*/ | */ | ||
− | private | + | private HttpURLConnection connectServer(long sinceId) throws Exception { |
String targetURL = receiveCommentUrl; | String targetURL = receiveCommentUrl; | ||
// 从指定的since_id开始读取数据,保证读取数据的连续性,消息完整性 | // 从指定的since_id开始读取数据,保证读取数据的连续性,消息完整性 | ||
第92行: | 第90行: | ||
System.out.println("get url: " + targetURL); | System.out.println("get url: " + targetURL); | ||
− | + | URL url = new URL(targetURL); | |
+ | HttpURLConnection connection = (HttpURLConnection) url.openConnection(); | ||
int statusCode; | int statusCode; | ||
try { | try { | ||
− | statusCode = | + | statusCode = connection.getResponseCode(); |
} catch (Exception e) { | } catch (Exception e) { | ||
− | + | connection.disconnect(); | |
throw new Exception("stream url connect failed", e); | throw new Exception("stream url connect failed", e); | ||
} | } | ||
if (statusCode != HttpStatus.SC_OK) { | if (statusCode != HttpStatus.SC_OK) { | ||
− | throw new RuntimeException( | + | throw new RuntimeException(connection.getResponseMessage()); |
} | } | ||
try { | try { | ||
− | inputStream = new DataInputStream( | + | inputStream = new DataInputStream(connection.getInputStream()); |
} catch (IOException e) { | } catch (IOException e) { | ||
throw new RuntimeException("get stream input io exception", e); | throw new RuntimeException("get stream input io exception", e); | ||
} | } | ||
− | return | + | return connection; |
} | } | ||
第205行: | 第204行: | ||
} | } | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− |
2018年7月13日 (五) 11:19的最后版本
//java源代码如下: import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.httpclient.HttpStatus; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; import java.util.Date; public class ReceiveMessageServiceTest { private static transient long sinceId = -1L; private final int recBufSize = 256; private String receiveCommentUrl; private DataInputStream inputStream; private byte[] recBuf; private int recIndex; public static void main(String[] args) { new ReceiveMessageServiceTest().init(); } /** * 启动获取数据线程 */ public void init() { receiveCommentUrl = "https://c.api.weibo.com/commercial/push?subid=10826"; new ReadTask().start(); } /** * 获取数据线程 */ class ReadTask extends Thread { /** * 启一个线程从服务器读取数据 */ @Override public void run() { boolean hasError = false; while (!hasError) { HttpURLConnection connection = null; recIndex = 0; recBuf = new byte[recBufSize]; try { connection = connectServer(sinceId); while (true) { processLine(); } } catch (Exception e) { // 当连接断开时,重新连接 System.out.println("connection close: " + e.getMessage()); if (e.getMessage().contains("errorCode")) { hasError = true; } System.out.println("last since_id: " + sinceId); } finally { if (inputStream != null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { connection.disconnect(); } System.out.println(new Date().toString()); } } } /** * 建立http连接 * * @return */ private HttpURLConnection connectServer(long sinceId) throws Exception { String targetURL = receiveCommentUrl; // 从指定的since_id开始读取数据,保证读取数据的连续性,消息完整性 if (sinceId > 0L) { targetURL = targetURL + "&since_id=" + sinceId; } System.out.println("get url: " + targetURL); URL url = new URL(targetURL); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); int statusCode; try { statusCode = connection.getResponseCode(); } catch (Exception e) { connection.disconnect(); throw new Exception("stream url connect failed", e); } if (statusCode != HttpStatus.SC_OK) { throw new RuntimeException(connection.getResponseMessage()); } try { inputStream = new DataInputStream(connection.getInputStream()); } catch (IOException e) { throw new RuntimeException("get stream input io exception", e); } return connection; } /** * 读取并处理数据 * * @throws IOException */ private void processLine() throws IOException { byte[] bytes = readLineBytes(); if ((bytes != null) && (bytes.length > 0)) { String message = new String(bytes); handleMessage(message); } } /** * 可以重写此方法解析message * * @param message */ private void handleMessage(String message) { System.out.println(message); JSONObject jsonObject = JSON.parseObject(message); sinceId = jsonObject.getLongValue("id"); } /** * 读取数据 * * @return * @throws IOException */ public byte[] readLineBytes() throws IOException { byte[] result; ByteArrayOutputStream bos = new ByteArrayOutputStream(); int readCount; if ((recIndex > 0) && (read(bos))) { return bos.toByteArray(); } while ((readCount = inputStream.read(recBuf, recIndex, recBuf.length - recIndex)) > 0) { recIndex = (recIndex + readCount); if (read(bos)) { break; } } result = bos.toByteArray(); if (result == null || result.length <= 0 && recIndex <= 0) { throw new IOException("no data in 5 second"); } return result; } /** * 读数据到bos * * @param bos * @return */ private boolean read(ByteArrayOutputStream bos) { boolean result = false; int index = -1; for (int i = 0; i < recIndex - 1; i++) { // 13cr-回车 10lf-换行 if ((recBuf[i] == 13) && (recBuf[(i + 1)] == 10)) { index = i; break; } } if (index >= 0) { bos.write(recBuf, 0, index); byte[] newBuf = new byte[recBufSize]; if (recIndex > index + 2) { System.arraycopy(recBuf, index + 2, newBuf, 0, recIndex - index - 2); } recBuf = newBuf; recIndex = (recIndex - index - 2); result = true; } else if (recBuf[(recIndex - 1)] == 13) { bos.write(recBuf, 0, recIndex - 1); Arrays.fill(recBuf, (byte) 0); recBuf[0] = 13; recIndex = 1; } else { bos.write(recBuf, 0, recIndex); Arrays.fill(recBuf, (byte) 0); recIndex = 0; } return result; } } }
文档更新时间: 2018-07-13