示例代码
第1行: | 第1行: | ||
<pre> | <pre> | ||
− | + | importorg.apache.commons.httpclient.HostConfiguration; | |
− | + | importorg.apache.commons.httpclient.HttpClient; | |
− | + | importorg.apache.commons.httpclient.HttpStatus; | |
− | + | importorg.apache.commons.httpclient.MultiThreadedHttpConnectionManager; | |
− | + | importorg.apache.commons.httpclient.methods.GetMethod; | |
− | + | importjava.io.ByteArrayOutputStream; | |
− | + | importjava.io.DataInputStream; | |
− | + | importjava.io.IOException;importjava.util.ArrayList; | |
− | + | importjava.util.Arrays;importjava.util.List; | |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | // | + | publicclassReceiveMessageTest{ |
− | + | privateMultiThreadedHttpConnectionManagerhttpConnManager; | |
− | + | privateHttpClienthttpClient=null;//从datapush服务器获取连接 | |
− | + | privateList<String>streamingUrlList=newArrayList<String>();//当前连接的streamingserver,可以配置privateintcurStreamUrlIndex=0; | |
− | + | publicstaticlonglastMsgLocation=-1L; | |
− | + | privateDataInputStreaminputStream; | |
− | + | privatefinalintrecBufSize=256;privatebyte[ | |
− | + | ||
− | + | ]recBuf=newbyte[ | |
− | + | recBufSize | |
− | + | ];privateintrecIndex=0;publicstaticvoidmain(String[ | |
− | + | ||
− | + | ]args){ | |
− | + | ReceiveMessageTesttest=newReceiveMessageTest();test.init(); | |
− | + | }publicvoidinit(){ | |
− | + | streamingUrlList.add("https://c.api.weibo.com/2/datapush/comment.json?subid=19021&access_token=xxx"); | |
− | + | httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION, | |
− | + | 2);httpConnManager.getParams().setMaxTotalConnections(2); | |
− | + | httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE); | |
− | + | httpConnManager.getParams().setConnectionTimeout(10000); | |
− | + | httpConnManager.getParams().setReceiveBufferSize(655350); | |
+ | httpClient=newHttpClient(httpConnManager);newReadTask().start(); | ||
+ | }classReadTaskextendsThread{ | ||
+ | privateGetMethodconnectStreamServer(){ | ||
+ | StringtargetURL=streamingUrlList.get(curStreamUrlIndex);//从指定的location开始读取数据,保证读取数据的连续性,消息完整性//链接断开的重新链接机制 | ||
+ | if(lastMsgLocation>0){ | ||
+ | targetURL+="&since_id="+lastMsgLocation; | ||
+ | }System.out.println("StreamingReceiver http get url="+targetURL); | ||
+ | GetMethodmethod=newGetMethod(targetURL);intstatusCode=0; | ||
+ | try{ | ||
+ | statusCode=httpClient.executeMethod(method); | ||
+ | }catch(Exceptione){ | ||
+ | //dealException; | ||
+ | }if(statusCode!=HttpStatus.SC_OK){ | ||
+ | thrownewRuntimeException(".. "); | ||
+ | }try{ | ||
+ | inputStream=newDataInputStream(method.getResponseBodyAsStream()); | ||
+ | }catch(IOExceptione){ | ||
+ | thrownewRuntimeException("get stream input io exception", | ||
+ | e); | ||
+ | }returnmethod; | ||
+ | }//开一个线程从服务器读取数据:publicvoidrun(){ | ||
+ | while(true){ | ||
+ | GetMethodmethod=null;recIndex=0;recBuf=newbyte[ | ||
+ | recBufSize | ||
+ | ];try{ | ||
+ | method=connectStreamServer();while(true){ | ||
+ | processLine(); | ||
+ | } | ||
+ | }catch(Exceptione){ | ||
+ | //当连接断开时,重新连接System.out.println("streaming process error "+e.getMessage()); | ||
+ | lastMsgLocation=lastMsgLocation+2; | ||
+ | curStreamUrlIndex=++curStreamUrlIndex%streamingUrlList.size(); | ||
+ | } | ||
+ | } | ||
+ | }privatevoidprocessLine()throwsIOException{ | ||
+ | byte[ | ||
+ | |||
+ | ]bytes=readLineBytes();if(bytes!=null&&bytes.length>0){ | ||
+ | Stringmessage=newString(bytes);//handlethemessageyougethandleMessage(message); | ||
+ | } | ||
+ | }//此方法解析message//同时需将lastMsgLocation赋值为最近1条消息的idprivatevoidhandleMessage(Stringmessage){ | ||
+ | System.out.println(message); | ||
+ | }publicbyte[ | ||
+ | |||
+ | ]readLineBytes()throwsIOException{ | ||
+ | byte[ | ||
+ | |||
+ | ]result=null;ByteArrayOutputStreambos=newByteArrayOutputStream();intreadCount=0;if(recIndex>0&&read(bos)){ | ||
+ | returnbos.toByteArray(); | ||
+ | }while((readCount=inputStream.read(recBuf, | ||
+ | recIndex, | ||
+ | recBuf.length-recIndex))>0){ | ||
+ | recIndex=recIndex+readCount;if(read(bos)){ | ||
+ | break; | ||
+ | } | ||
+ | }result=bos.toByteArray();if(result==null||(result!=null&&result.length<=0&&recIndex<=0)){ | ||
+ | thrownewIOException("++++ Stream appears to be dead, so closing it down"); | ||
+ | }returnresult; | ||
+ | }privatebooleanread(ByteArrayOutputStreambos){ | ||
+ | booleanresult=false;intindex=-1;for(inti=0;i<recIndex-1;i++){ | ||
+ | if(recBuf[ | ||
+ | i | ||
+ | ]==13&&recBuf[ | ||
+ | i+1 | ||
+ | ]==10){ | ||
+ | //13cr-回车10lf-换行index=i;break; | ||
+ | } | ||
+ | }if(index>=0){ | ||
+ | bos.write(recBuf, | ||
+ | 0, | ||
+ | index);byte[ | ||
+ | |||
+ | ]newBuf=newbyte[ | ||
+ | recBufSize | ||
+ | ];if(recIndex>index+2){ | ||
+ | System.arraycopy(recBuf, | ||
+ | index+2, | ||
+ | newBuf, | ||
+ | 0, | ||
+ | recIndex-index-2); | ||
+ | }recBuf=newBuf;recIndex=recIndex-index-2;result=true; | ||
+ | }else{ | ||
+ | if(recBuf[ | ||
+ | recIndex-1 | ||
+ | ]==13){ | ||
+ | bos.write(recBuf, | ||
+ | 0, | ||
+ | recIndex-1);Arrays.fill(recBuf, | ||
+ | (byte)0);recBuf[ | ||
+ | 0 | ||
+ | ]=13;recIndex=1; | ||
+ | }else{ | ||
+ | bos.write(recBuf, | ||
+ | 0, | ||
+ | recIndex);Arrays.fill(recBuf, | ||
+ | (byte)0);recIndex=0; | ||
+ | } | ||
+ | }returnresult; | ||
+ | } | ||
+ | } | ||
+ | } | ||
− | + | </pre> | |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | <pre> | |
− | + | //pom.xml中内容如下: | |
− | + | <dependencies> | |
− | + | <dependency> | |
− | + | <groupId>commons-logging</groupId> | |
− | + | <artifactId>commons-logging</artifactId> | |
− | + | <version>1.1.1</version> | |
− | + | </dependency> | |
− | + | <dependency> | |
− | + | <groupId>commons-codec</groupId> | |
− | + | <artifactId>commons-codec</artifactId> | |
− | + | <version>1.4</version> | |
− | + | </dependency> | |
− | + | <dependency> | |
− | + | <groupId>commons-httpclient</groupId> | |
− | + | <artifactId>commons-httpclient</artifactId> | |
− | + | <version>3.1</version> | |
− | + | </dependency> | |
− | + | </dependencies> | |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
</pre> | </pre> |
2014年11月18日 (二) 14:59的版本
importorg.apache.commons.httpclient.HostConfiguration; importorg.apache.commons.httpclient.HttpClient; importorg.apache.commons.httpclient.HttpStatus; importorg.apache.commons.httpclient.MultiThreadedHttpConnectionManager; importorg.apache.commons.httpclient.methods.GetMethod; importjava.io.ByteArrayOutputStream; importjava.io.DataInputStream; importjava.io.IOException;importjava.util.ArrayList; importjava.util.Arrays;importjava.util.List; publicclassReceiveMessageTest{ privateMultiThreadedHttpConnectionManagerhttpConnManager; privateHttpClienthttpClient=null;//从datapush服务器获取连接 privateList<String>streamingUrlList=newArrayList<String>();//当前连接的streamingserver,可以配置privateintcurStreamUrlIndex=0; publicstaticlonglastMsgLocation=-1L; privateDataInputStreaminputStream; privatefinalintrecBufSize=256;privatebyte[ ]recBuf=newbyte[ recBufSize ];privateintrecIndex=0;publicstaticvoidmain(String[ ]args){ ReceiveMessageTesttest=newReceiveMessageTest();test.init(); }publicvoidinit(){ streamingUrlList.add("https://c.api.weibo.com/2/datapush/comment.json?subid=19021&access_token=xxx"); httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION, 2);httpConnManager.getParams().setMaxTotalConnections(2); httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE); httpConnManager.getParams().setConnectionTimeout(10000); httpConnManager.getParams().setReceiveBufferSize(655350); httpClient=newHttpClient(httpConnManager);newReadTask().start(); }classReadTaskextendsThread{ privateGetMethodconnectStreamServer(){ StringtargetURL=streamingUrlList.get(curStreamUrlIndex);//从指定的location开始读取数据,保证读取数据的连续性,消息完整性//链接断开的重新链接机制 if(lastMsgLocation>0){ targetURL+="&since_id="+lastMsgLocation; }System.out.println("StreamingReceiver http get url="+targetURL); GetMethodmethod=newGetMethod(targetURL);intstatusCode=0; try{ statusCode=httpClient.executeMethod(method); }catch(Exceptione){ //dealException; }if(statusCode!=HttpStatus.SC_OK){ thrownewRuntimeException(".. "); }try{ inputStream=newDataInputStream(method.getResponseBodyAsStream()); }catch(IOExceptione){ thrownewRuntimeException("get stream input io exception", e); }returnmethod; }//开一个线程从服务器读取数据:publicvoidrun(){ while(true){ GetMethodmethod=null;recIndex=0;recBuf=newbyte[ recBufSize ];try{ method=connectStreamServer();while(true){ processLine(); } }catch(Exceptione){ //当连接断开时,重新连接System.out.println("streaming process error "+e.getMessage()); lastMsgLocation=lastMsgLocation+2; curStreamUrlIndex=++curStreamUrlIndex%streamingUrlList.size(); } } }privatevoidprocessLine()throwsIOException{ byte[ ]bytes=readLineBytes();if(bytes!=null&&bytes.length>0){ Stringmessage=newString(bytes);//handlethemessageyougethandleMessage(message); } }//此方法解析message//同时需将lastMsgLocation赋值为最近1条消息的idprivatevoidhandleMessage(Stringmessage){ System.out.println(message); }publicbyte[ ]readLineBytes()throwsIOException{ byte[ ]result=null;ByteArrayOutputStreambos=newByteArrayOutputStream();intreadCount=0;if(recIndex>0&&read(bos)){ returnbos.toByteArray(); }while((readCount=inputStream.read(recBuf, recIndex, recBuf.length-recIndex))>0){ recIndex=recIndex+readCount;if(read(bos)){ break; } }result=bos.toByteArray();if(result==null||(result!=null&&result.length<=0&&recIndex<=0)){ thrownewIOException("++++ Stream appears to be dead, so closing it down"); }returnresult; }privatebooleanread(ByteArrayOutputStreambos){ booleanresult=false;intindex=-1;for(inti=0;i<recIndex-1;i++){ if(recBuf[ i ]==13&&recBuf[ i+1 ]==10){ //13cr-回车10lf-换行index=i;break; } }if(index>=0){ bos.write(recBuf, 0, index);byte[ ]newBuf=newbyte[ recBufSize ];if(recIndex>index+2){ System.arraycopy(recBuf, index+2, newBuf, 0, recIndex-index-2); }recBuf=newBuf;recIndex=recIndex-index-2;result=true; }else{ if(recBuf[ recIndex-1 ]==13){ bos.write(recBuf, 0, recIndex-1);Arrays.fill(recBuf, (byte)0);recBuf[ 0 ]=13;recIndex=1; }else{ bos.write(recBuf, 0, recIndex);Arrays.fill(recBuf, (byte)0);recIndex=0; } }returnresult; } } }
//pom.xml中内容如下: <dependencies> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.4</version> </dependency> <dependency> <groupId>commons-httpclient</groupId> <artifactId>commons-httpclient</artifactId> <version>3.1</version> </dependency> </dependencies>