示例代码
(以“<pre> //从datapush服务器获取连接 //streaming server列表,可以配置,当前sever 180.149.153.38,180.149.153.39微博 private List<String> s...”为内容创建页面) |
|||
第10行: | 第10行: | ||
//链接断开的重新链接机制 | //链接断开的重新链接机制 | ||
if(lastMsgLocation > 0){ | if(lastMsgLocation > 0){ | ||
− | targetURL += "& | + | targetURL += "&since_id=" + lastMsgLocation; |
} | } | ||
logger.info("StreamingReceiver http get url=" + targetURL); | logger.info("StreamingReceiver http get url=" + targetURL); |
2014年5月5日 (一) 11:09的版本
//从datapush服务器获取连接 //streaming server列表,可以配置,当前sever 180.149.153.38,180.149.153.39微博 private List<String> streamingUrlList; //当前连接的streaming server,可以配置 private int curStreamUrlIndex =0; private GetMethod connectStreamServer() throws StreamingException { String targetURL = streamingUrlList.get(curStreamUrlIndex); //从指定的location开始读取数据,保证读取数据的连续性,消息完整性 //链接断开的重新链接机制 if(lastMsgLocation > 0){ targetURL += "&since_id=" + lastMsgLocation; } logger.info("StreamingReceiver http get url=" + targetURL); GetMethod method = new GetMethod(targetURL); int statusCode; try { statusCode = httpClient.executeMethod(method); } catch(Exception e){ //dealException; } if (statusCode != HttpStatus.SC_OK) { throw new StreamingException(".. "); } try { inputStream = new DataInputStream(method.getResponseBodyAsStream()); } catch (IOException e) { throw new StreamingException("get stream input io exception", e); } return method; } //开一个线程从服务器读取数据: public void run() { while(true){ GetMethod method=null; recIndex = 0; recBuf = new byte[recBufSize]; try{ method = connectStreamServer(); while(true){ processLine(); } }catch(Exception e){ //当连接断开时,重新连接 logger.error("streaming process error ",e); curStreamUrlIndex=++curStreamUrlIndex % streamingUrlList.size(); }finally{ method.releaseConnection(); } } } } private void processLine() throws IOException{ byte[] bytes = readLineBytes(); if(bytes != null && bytes.length > 0){ String message = new String(bytes); //handle the message you get handleMessage() ; lastMsgLocation=Message.getId(); } } } public byte[] readLineBytes() throws IOException { byte[] result = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); int readCount = 0; if (recIndex > 0 && read(bos)) { return bos.toByteArray(); } while ((readCount = inputStream.read(recBuf, recIndex, recBuf.length - recIndex)) > 0) { recIndex = recIndex + readCount; if (read(bos)) { break; } } result = bos.toByteArray(); if (result == null || (result != null && result.length <= 0 && recIndex <= 0)) { throw new IOException( "++++ Stream appears to be dead, so closing it down"); } return result; } private boolean read(ByteArrayOutputStream bos) { boolean result = false; int index = -1; for (int i = 0; i < recIndex - 1; i++) { if (recBuf[i] == 13 && recBuf[i + 1] == 10) {//13 cr-回车 10 lf-换行 index = i; break; } } if (index >= 0) { bos.write(recBuf, 0, index); byte[] newBuf = new byte[recBufSize]; if (recIndex > index + 2) { System.arraycopy(recBuf, index + 2, newBuf, 0, recIndex - index - 2); } recBuf = newBuf; recIndex = recIndex - index - 2; result = true; } else { if (recBuf[recIndex - 1] == 13) { bos.write(recBuf, 0, recIndex - 1); Arrays.fill(recBuf,(byte)0); recBuf[0] = 13; recIndex = 1; } else { bos.write(recBuf, 0, recIndex); Arrays.fill(recBuf,(byte)0); recIndex = 0; } } return result; }