示例代码
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.httpclient.HostConfiguration;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.methods.GetMethod;
publicclassReceiveMessageTest{
privateMultiThreadedHttpConnectionManagerhttpConnManager;
privateHttpClienthttpClient=null;//从datapush服务器获取连接
privateList<String>streamingUrlList=newArrayList<String>();//当前连接的streamingserver,可以配置privateintcurStreamUrlIndex=0;
publicstaticlonglastMsgLocation=-1L;
privateDataInputStreaminputStream;
privatefinalintrecBufSize=256;privatebyte[
]recBuf=newbyte[
recBufSize
];privateintrecIndex=0;publicstaticvoidmain(String[
]args){
ReceiveMessageTesttest=newReceiveMessageTest();test.init();
}publicvoidinit(){
streamingUrlList.add("https://c.api.weibo.com/2/datapush/comment.json?subid=19021&access_token=xxx");
httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION,
2);httpConnManager.getParams().setMaxTotalConnections(2);
httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE);
httpConnManager.getParams().setConnectionTimeout(10000);
httpConnManager.getParams().setReceiveBufferSize(655350);
httpClient=newHttpClient(httpConnManager);newReadTask().start();
}classReadTaskextendsThread{
privateGetMethodconnectStreamServer(){
StringtargetURL=streamingUrlList.get(curStreamUrlIndex);//从指定的location开始读取数据,保证读取数据的连续性,消息完整性//链接断开的重新链接机制
if(lastMsgLocation>0){
targetURL+="&since_id="+lastMsgLocation;
}System.out.println("StreamingReceiver http get url="+targetURL);
GetMethodmethod=newGetMethod(targetURL);intstatusCode=0;
try{
statusCode=httpClient.executeMethod(method);
}catch(Exceptione){
//dealException;
}if(statusCode!=HttpStatus.SC_OK){
thrownewRuntimeException(".. ");
}try{
inputStream=newDataInputStream(method.getResponseBodyAsStream());
}catch(IOExceptione){
thrownewRuntimeException("get stream input io exception",
e);
}returnmethod;
}//开一个线程从服务器读取数据:publicvoidrun(){
while(true){
GetMethodmethod=null;recIndex=0;recBuf=newbyte[
recBufSize
];try{
method=connectStreamServer();while(true){
processLine();
}
}catch(Exceptione){
//当连接断开时,重新连接System.out.println("streaming process error "+e.getMessage());
lastMsgLocation=lastMsgLocation+2;
curStreamUrlIndex=++curStreamUrlIndex%streamingUrlList.size();
}
}
}privatevoidprocessLine()throwsIOException{
byte[
]bytes=readLineBytes();if(bytes!=null&&bytes.length>0){
Stringmessage=newString(bytes);//handlethemessageyougethandleMessage(message);
}
}//此方法解析message//同时需将lastMsgLocation赋值为最近1条消息的idprivatevoidhandleMessage(Stringmessage){
System.out.println(message);
}publicbyte[
]readLineBytes()throwsIOException{
byte[
]result=null;ByteArrayOutputStreambos=newByteArrayOutputStream();intreadCount=0;if(recIndex>0&&read(bos)){
returnbos.toByteArray();
}while((readCount=inputStream.read(recBuf,
recIndex,
recBuf.length-recIndex))>0){
recIndex=recIndex+readCount;if(read(bos)){
break;
}
}result=bos.toByteArray();if(result==null||(result!=null&&result.length<=0&&recIndex<=0)){
thrownewIOException("++++ Stream appears to be dead, so closing it down");
}returnresult;
}privatebooleanread(ByteArrayOutputStreambos){
booleanresult=false;intindex=-1;for(inti=0;i<recIndex-1;i++){
if(recBuf[
i
]==13&&recBuf[
i+1
]==10){
//13cr-回车10lf-换行index=i;break;
}
}if(index>=0){
bos.write(recBuf,
0,
index);byte[
]newBuf=newbyte[
recBufSize
];if(recIndex>index+2){
System.arraycopy(recBuf,
index+2,
newBuf,
0,
recIndex-index-2);
}recBuf=newBuf;recIndex=recIndex-index-2;result=true;
}else{
if(recBuf[
recIndex-1
]==13){
bos.write(recBuf,
0,
recIndex-1);Arrays.fill(recBuf,
(byte)0);recBuf[
0
]=13;recIndex=1;
}else{
bos.write(recBuf,
0,
recIndex);Arrays.fill(recBuf,
(byte)0);recIndex=0;
}
}returnresult;
}
}
}
//pom.xml中内容如下:
<dependencies>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>