示例代码

跳转到: 导航, 搜索
第29行: 第29行:
 
         streamingUrlList.add("http://c.api.weibo.com/2/datapush/comment.json?subid=19021");
 
         streamingUrlList.add("http://c.api.weibo.com/2/datapush/comment.json?subid=19021");
 
         httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION,
 
         httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION,
         2);httpConnManager.getParams().setMaxTotalConnections(2);
+
         5);httpConnManager.getParams().setMaxTotalConnections(5);
 
         httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE);
 
         httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE);
 
         httpConnManager.getParams().setConnectionTimeout(10000);
 
         httpConnManager.getParams().setConnectionTimeout(10000);

2016年9月12日 (一) 17:22的版本

//java源代码如下:

importorg.apache.commons.httpclient.HostConfiguration;
importorg.apache.commons.httpclient.HttpClient;
importorg.apache.commons.httpclient.HttpStatus;
importorg.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
importorg.apache.commons.httpclient.methods.GetMethod;
importjava.io.ByteArrayOutputStream;
importjava.io.DataInputStream;
importjava.io.IOException;importjava.util.ArrayList;
importjava.util.Arrays;importjava.util.List;

publicclassReceiveMessageTest{
    privateMultiThreadedHttpConnectionManagerhttpConnManager;
    privateHttpClienthttpClient=null;//从datapush服务器获取连接
    privateList<String>streamingUrlList=newArrayList<String>();//当前连接的streamingserver,可以配置privateintcurStreamUrlIndex=0;
    publicstaticlonglastMsgLocation=-1L;
    privateDataInputStreaminputStream;
    privatefinalintrecBufSize=256;privatebyte[
        
    ]recBuf=newbyte[
        recBufSize
    ];privateintrecIndex=0;publicstaticvoidmain(String[
        
    ]args){
        ReceiveMessageTesttest=newReceiveMessageTest();test.init();
    }publicvoidinit(){
        streamingUrlList.add("http://c.api.weibo.com/2/datapush/comment.json?subid=19021");
        httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION,
        5);httpConnManager.getParams().setMaxTotalConnections(5);
        httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE);
        httpConnManager.getParams().setConnectionTimeout(10000);
        httpConnManager.getParams().setReceiveBufferSize(655350);
        httpClient=newHttpClient(httpConnManager);newReadTask().start();
    }classReadTaskextendsThread{
        privateGetMethodconnectStreamServer(){
            StringtargetURL=streamingUrlList.get(curStreamUrlIndex);//从指定的location开始读取数据,保证读取数据的连续性,消息完整性//链接断开的重新链接机制
            if(lastMsgLocation>0){
                targetURL+="&since_id="+lastMsgLocation;
            }System.out.println("StreamingReceiver http get url="+targetURL);
             GetMethodmethod=newGetMethod(targetURL);intstatusCode=0;
            try{
                statusCode=httpClient.executeMethod(method);
            }catch(Exceptione){
                //dealException;
            }if(statusCode!=HttpStatus.SC_OK){
                thrownewRuntimeException(".. ");
            }try{
                inputStream=newDataInputStream(method.getResponseBodyAsStream());
            }catch(IOExceptione){
                thrownewRuntimeException("get stream input io exception",
                e);
            }returnmethod;
        }//开一个线程从服务器读取数据:publicvoidrun(){
            while(true){
                GetMethodmethod=null;recIndex=0;recBuf=newbyte[
                    recBufSize
                ];try{
                    method=connectStreamServer();while(true){
                        processLine();
                    }
                }catch(Exceptione){
                    //当连接断开时,重新连接System.out.println("streaming process error "+e.getMessage());
                    lastMsgLocation=lastMsgLocation+2;
                    curStreamUrlIndex=++curStreamUrlIndex%streamingUrlList.size();
                }
            }
        }privatevoidprocessLine()throwsIOException{
            byte[
                
            ]bytes=readLineBytes();if(bytes!=null&&bytes.length>0){
                Stringmessage=newString(bytes);//handlethemessageyougethandleMessage(message);
            }
        }//此方法解析message//同时需将lastMsgLocation赋值为最近1条消息的idprivatevoidhandleMessage(Stringmessage){
            System.out.println(message);
        }publicbyte[
            
        ]readLineBytes()throwsIOException{
            byte[
                
            ]result=null;ByteArrayOutputStreambos=newByteArrayOutputStream();intreadCount=0;if(recIndex>0&&read(bos)){
                returnbos.toByteArray();
            }while((readCount=inputStream.read(recBuf,
            recIndex,
            recBuf.length-recIndex))>0){
                recIndex=recIndex+readCount;if(read(bos)){
                    break;
                }
            }result=bos.toByteArray();if(result==null||(result!=null&&result.length<=0&&recIndex<=0)){
                thrownewIOException("++++ Stream appears to be dead, so closing it down");
            }returnresult;
        }privatebooleanread(ByteArrayOutputStreambos){
            booleanresult=false;intindex=-1;for(inti=0;i<recIndex-1;i++){
                if(recBuf[
                    i
                ]==13&&recBuf[
                    i+1
                ]==10){
                    //13cr-回车10lf-换行index=i;break;
                }
            }if(index>=0){
                bos.write(recBuf,
                0,
                index);byte[
                    
                ]newBuf=newbyte[
                    recBufSize
                ];if(recIndex>index+2){
                    System.arraycopy(recBuf,
                    index+2,
                    newBuf,
                    0,
                    recIndex-index-2);
                }recBuf=newBuf;recIndex=recIndex-index-2;result=true;
            }else{
                if(recBuf[
                    recIndex-1
                ]==13){
                    bos.write(recBuf,
                    0,
                    recIndex-1);Arrays.fill(recBuf,
                    (byte)0);recBuf[
                        0
                    ]=13;recIndex=1;
                }else{
                    bos.write(recBuf,
                    0,
                    recIndex);Arrays.fill(recBuf,
                    (byte)0);recIndex=0;
                }
            }returnresult;
        }
    }
} 


//pom.xml中内容如下:

 <dependencies>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>3.1</version>
        </dependency>
    </dependencies>