示例代码

跳转到: 导航, 搜索
(以“<pre> //从datapush服务器获取连接 //streaming server列表,可以配置,当前sever 180.149.153.38,180.149.153.39微博 private List<String> s...”为内容创建页面)
 
第10行: 第10行:
 
//链接断开的重新链接机制
 
//链接断开的重新链接机制
 
if(lastMsgLocation > 0){
 
if(lastMsgLocation > 0){
targetURL += "&loc=" + lastMsgLocation;
+
targetURL += "&since_id=" + lastMsgLocation;
 
}
 
}
 
logger.info("StreamingReceiver http get url=" + targetURL);
 
logger.info("StreamingReceiver http get url=" + targetURL);

2014年5月5日 (一) 11:09的版本

        //从datapush服务器获取连接
	//streaming server列表,可以配置,当前sever 180.149.153.38,180.149.153.39微博
	private List<String> streamingUrlList;
	//当前连接的streaming server,可以配置
	private int curStreamUrlIndex =0;
	private GetMethod connectStreamServer() throws StreamingException {
		String targetURL = streamingUrlList.get(curStreamUrlIndex);
		//从指定的location开始读取数据,保证读取数据的连续性,消息完整性
		//链接断开的重新链接机制
		if(lastMsgLocation > 0){
			targetURL += "&since_id=" + lastMsgLocation;
		}
		logger.info("StreamingReceiver http get url=" + targetURL);
		GetMethod method = new GetMethod(targetURL);
		int statusCode;
		try {
			statusCode = httpClient.executeMethod(method);
		} 
		catch(Exception e){
			//dealException;
		}
		if (statusCode != HttpStatus.SC_OK) {
			throw new StreamingException(".. ");
		}			
		try {
			inputStream = new 		DataInputStream(method.getResponseBodyAsStream());
		} catch (IOException e) {
			throw new StreamingException("get stream input io exception", e);
		}		
		return method;
	}

//开一个线程从服务器读取数据:
public void run() {
		while(true){
			GetMethod method=null;
			recIndex = 0;
			recBuf = new byte[recBufSize];
			try{
				method = connectStreamServer();
				while(true){
					processLine();
				}
			}catch(Exception e){
		//当连接断开时,重新连接
		logger.error("streaming process error ",e);
		curStreamUrlIndex=++curStreamUrlIndex % streamingUrlList.size();
			}finally{
					method.releaseConnection();			
				}
			}
		}
	}

	private void processLine() throws IOException{
		byte[] bytes = readLineBytes();
		if(bytes != null && bytes.length > 0){
			String message = new String(bytes);
				//handle the message you get
				handleMessage() ;
				lastMsgLocation=Message.getId();

			}
		}
	}

	public byte[] readLineBytes() throws IOException {
		byte[] result = null;
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		int readCount = 0;
		if (recIndex > 0 && read(bos)) {
			return bos.toByteArray();
		}
		while ((readCount = inputStream.read(recBuf, recIndex, recBuf.length
				- recIndex)) > 0) {
			recIndex = recIndex + readCount;

			if (read(bos)) {
				break;
			}
		}

		result = bos.toByteArray();

		if (result == null
				|| (result != null && result.length <= 0 && recIndex <= 0)) {
			throw new IOException(
			"++++ Stream appears to be dead, so closing it down");
		}

		return result;
	}

	private boolean read(ByteArrayOutputStream bos) {
		boolean result = false;
		int index = -1;

		for (int i = 0; i < recIndex - 1; i++) {
			if (recBuf[i] == 13 && recBuf[i + 1] == 10) {//13 cr-回车 10 lf-换行
				index = i;
				break;
			}
		}

		if (index >= 0) {
			bos.write(recBuf, 0, index);
			byte[] newBuf = new byte[recBufSize];
			if (recIndex > index + 2) {
				System.arraycopy(recBuf, index + 2, newBuf, 0, recIndex
						- index - 2);
			}
			recBuf = newBuf;
			recIndex = recIndex - index - 2;
			result = true;
		} else {
			if (recBuf[recIndex - 1] == 13) {
				bos.write(recBuf, 0, recIndex - 1);
				Arrays.fill(recBuf,(byte)0);
				recBuf[0] = 13;
				recIndex = 1;
			} else {
				bos.write(recBuf, 0, recIndex);
				Arrays.fill(recBuf,(byte)0);
				recIndex = 0;
			}
		}
		return result;
	}