示例代码
// Obtaining a connection from the datapush server.
// List of streaming servers (configurable); servers at the time of writing: 180.149.153.38, 180.149.153.39 (Weibo).
private List<String> streamingUrlList;
// Index into streamingUrlList of the streaming server currently being connected to (configurable).
private int curStreamUrlIndex =0;
/**
 * Issues an HTTP GET against the streaming server selected by {@code curStreamUrlIndex}
 * and stores the response body in {@code inputStream}.
 *
 * <p>When {@code lastMsgLocation} is positive it is appended as the {@code loc} parameter so
 * that a reconnect resumes from the last processed message instead of losing data.
 *
 * @return the executed method; the caller owns it and must call {@code releaseConnection()}
 * @throws StreamingException if the request fails, the status is not 200, or the response
 *         body cannot be opened; the connection is released before the exception propagates
 */
private GetMethod connectStreamServer() throws StreamingException {
    String targetURL = streamingUrlList.get(curStreamUrlIndex);
    // Resume from the last known location to keep the message sequence continuous across
    // reconnects. NOTE(review): "&loc=" assumes the configured URL already carries a query
    // string — confirm against the configured URLs.
    if (lastMsgLocation > 0) {
        targetURL += "&loc=" + lastMsgLocation;
    }
    logger.info("StreamingReceiver http get url=" + targetURL);
    GetMethod method = new GetMethod(targetURL);
    boolean connected = false;
    try {
        int statusCode;
        try {
            statusCode = httpClient.executeMethod(method);
        } catch (Exception e) {
            // Previously swallowed, which left statusCode unassigned and hid the cause.
            throw new StreamingException("http get failed, url=" + targetURL, e);
        }
        if (statusCode != HttpStatus.SC_OK) {
            throw new StreamingException(".. ");
        }
        try {
            inputStream = new DataInputStream(method.getResponseBodyAsStream());
        } catch (IOException e) {
            throw new StreamingException("get stream input io exception", e);
        }
        connected = true;
        return method;
    } finally {
        // On any failure the caller never receives the method, so release it here.
        if (!connected) {
            method.releaseConnection();
        }
    }
}
/**
 * Receiver loop, intended to run on its own thread: connects to the current streaming
 * server and processes lines until an error occurs, then moves on to the next server in
 * {@code streamingUrlList} and reconnects. The loop never terminates on its own.
 *
 * <p>The receive buffer ({@code recBuf}/{@code recIndex}) is reset before every connection
 * attempt. NOTE(review): reconnection is immediate, with no delay between attempts.
 */
public void run() {
    while (true) {
        GetMethod method = null;
        recIndex = 0;
        recBuf = new byte[recBufSize];
        try {
            method = connectStreamServer();
            while (true) {
                processLine();
            }
        } catch (Exception e) {
            // The connection dropped or could not be established: fail over to the next server.
            logger.error("streaming process error ", e);
            curStreamUrlIndex = (curStreamUrlIndex + 1) % streamingUrlList.size();
        } finally {
            // method is still null when connectStreamServer() itself failed; without this
            // check the resulting NullPointerException would escape the loop and end the thread.
            if (method != null) {
                method.releaseConnection();
            }
        }
    }
}
}
/**
 * Reads one line from the stream and, when it is non-empty, decodes it and hands it to the
 * message handler, then records the message id in {@code lastMsgLocation}.
 *
 * @throws IOException if the underlying stream fails or appears to be dead
 */
private void processLine() throws IOException {
    byte[] bytes = readLineBytes();
    if (bytes != null && bytes.length > 0) {
        // Decode explicitly instead of relying on the platform default charset.
        // NOTE(review): UTF-8 is assumed for the stream payload — confirm with the server docs.
        String message = new String(bytes, "UTF-8");
        // Handle the message you received.
        // NOTE(review): handleMessage() and Message.getId() are placeholders in this sample;
        // the decoded message is not passed to them here.
        handleMessage() ;
        lastMsgLocation=Message.getId();
    }
}
}
/**
 * Returns the bytes of the next CRLF-terminated line, without the terminator.
 *
 * <p>Bytes left in {@code recBuf} by a previous call are consumed first; otherwise data is
 * read from {@code inputStream} until {@code read(bos)} reports a complete line or the
 * stream stops delivering data. A blank line yields an empty array. If the stream ends in
 * the middle of a line, the bytes collected so far are returned.
 *
 * @return the line content; possibly empty, never {@code null}
 * @throws IOException if the stream ended without yielding any bytes, or on a read error
 */
public byte[] readLineBytes() throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // Serve a complete line from already-buffered bytes before touching the stream.
    if (recIndex > 0 && read(bos)) {
        return bos.toByteArray();
    }
    boolean lineComplete = false;
    int readCount;
    while ((readCount = inputStream.read(recBuf, recIndex, recBuf.length - recIndex)) > 0) {
        recIndex += readCount;
        if (read(bos)) {
            lineComplete = true;
            break;
        }
    }
    // Only a stream that stopped without producing anything is dead. A blank line that was
    // read successfully is not, and a lone buffered CR at end of stream must not keep the
    // caller polling forever.
    if (!lineComplete && bos.size() == 0) {
        throw new IOException(
                "++++ Stream appears to be dead, so closing it down");
    }
    return bos.toByteArray();
}
/**
 * Moves buffered bytes from {@code recBuf[0..recIndex)} into {@code bos}, stopping at the
 * first CRLF.
 *
 * <p>If a CRLF is found, the bytes before it are written to {@code bos}, the bytes after it
 * are shifted to the start of {@code recBuf}, and {@code true} is returned. Otherwise all
 * buffered bytes are written to {@code bos} and the buffer is emptied, except that a
 * trailing CR is kept in the buffer so that a CRLF split across two reads is still found.
 *
 * @param bos accumulator for the bytes of the line being assembled
 * @return {@code true} if a complete line was terminated by this call
 */
private boolean read(ByteArrayOutputStream bos) {
    final byte cr = 13; // carriage return
    final byte lf = 10; // line feed
    if (recIndex <= 0) {
        // Nothing buffered; also keeps recBuf[recIndex - 1] below within bounds.
        return false;
    }
    int terminatorAt = -1;
    for (int i = 0; i < recIndex - 1; i++) {
        if (recBuf[i] == cr && recBuf[i + 1] == lf) {
            terminatorAt = i;
            break;
        }
    }
    if (terminatorAt >= 0) {
        bos.write(recBuf, 0, terminatorAt);
        int remaining = recIndex - terminatorAt - 2;
        // Compact in place rather than allocating a new buffer per line; arraycopy is
        // specified to handle overlapping source and destination ranges.
        System.arraycopy(recBuf, terminatorAt + 2, recBuf, 0, remaining);
        recIndex = remaining;
        return true;
    }
    if (recBuf[recIndex - 1] == cr) {
        // The matching LF may arrive with the next read: hold the CR back.
        bos.write(recBuf, 0, recIndex - 1);
        recBuf[0] = cr;
        recIndex = 1;
    } else {
        bos.write(recBuf, 0, recIndex);
        recIndex = 0;
    }
    return false;
}