示例代码

跳转到: 导航, 搜索
第1行: 第1行:
 
<pre>
 
<pre>
        //从datapush服务器获取连接
+
importorg.apache.commons.httpclient.HostConfiguration;
//streaming server列表,可以配置,当前sever 180.149.153.38,180.149.153.39微博
+
importorg.apache.commons.httpclient.HttpClient;
private List<String> streamingUrlList;
+
importorg.apache.commons.httpclient.HttpStatus;
//当前连接的streaming server,可以配置
+
importorg.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
private int curStreamUrlIndex =0;
+
importorg.apache.commons.httpclient.methods.GetMethod;
private GetMethod connectStreamServer() throws StreamingException {
+
importjava.io.ByteArrayOutputStream;
String targetURL = streamingUrlList.get(curStreamUrlIndex);
+
importjava.io.DataInputStream;
//从指定的location开始读取数据,保证读取数据的连续性,消息完整性
+
importjava.io.IOException;importjava.util.ArrayList;
//链接断开的重新链接机制
+
importjava.util.Arrays;importjava.util.List;
if(lastMsgLocation > 0){
+
targetURL += "&since_id=" + lastMsgLocation;
+
}
+
logger.info("StreamingReceiver http get url=" + targetURL);
+
GetMethod method = new GetMethod(targetURL);
+
int statusCode;
+
try {
+
statusCode = httpClient.executeMethod(method);
+
}
+
catch(Exception e){
+
//dealException;
+
}
+
if (statusCode != HttpStatus.SC_OK) {
+
throw new StreamingException(".. ");
+
}
+
try {
+
inputStream = new DataInputStream(method.getResponseBodyAsStream());
+
} catch (IOException e) {
+
throw new StreamingException("get stream input io exception", e);
+
}
+
return method;
+
}
+
  
//开一个线程从服务器读取数据:
+
publicclassReceiveMessageTest{
public void run() {
+
    privateMultiThreadedHttpConnectionManagerhttpConnManager;
while(true){
+
    privateHttpClienthttpClient=null;//从datapush服务器获取连接
GetMethod method=null;
+
    privateList<String>streamingUrlList=newArrayList<String>();//当前连接的streamingserver,可以配置privateintcurStreamUrlIndex=0;
recIndex = 0;
+
    publicstaticlonglastMsgLocation=-1L;
recBuf = new byte[recBufSize];
+
    privateDataInputStreaminputStream;
try{
+
    privatefinalintrecBufSize=256;privatebyte[
method = connectStreamServer();
+
       
while(true){
+
    ]recBuf=newbyte[
processLine();
+
        recBufSize
}
+
    ];privateintrecIndex=0;publicstaticvoidmain(String[
}catch(Exception e){
+
       
//当连接断开时,重新连接
+
    ]args){
logger.error("streaming process error ",e);
+
        ReceiveMessageTesttest=newReceiveMessageTest();test.init();
curStreamUrlIndex=++curStreamUrlIndex % streamingUrlList.size();
+
    }publicvoidinit(){
}finally{
+
        streamingUrlList.add("https://c.api.weibo.com/2/datapush/comment.json?subid=19021&access_token=xxx");
method.releaseConnection();
+
        httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION,
}
+
        2);httpConnManager.getParams().setMaxTotalConnections(2);
}
+
        httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE);
}
+
        httpConnManager.getParams().setConnectionTimeout(10000);
}
+
        httpConnManager.getParams().setReceiveBufferSize(655350);
 +
        httpClient=newHttpClient(httpConnManager);newReadTask().start();
 +
    }classReadTaskextendsThread{
 +
        privateGetMethodconnectStreamServer(){
 +
            StringtargetURL=streamingUrlList.get(curStreamUrlIndex);//从指定的location开始读取数据,保证读取数据的连续性,消息完整性//链接断开的重新链接机制
 +
            if(lastMsgLocation>0){
 +
                targetURL+="&since_id="+lastMsgLocation;
 +
            }System.out.println("StreamingReceiver http get url="+targetURL);
 +
            GetMethodmethod=newGetMethod(targetURL);intstatusCode=0;
 +
            try{
 +
                statusCode=httpClient.executeMethod(method);
 +
            }catch(Exceptione){
 +
                //dealException;
 +
            }if(statusCode!=HttpStatus.SC_OK){
 +
                thrownewRuntimeException(".. ");
 +
            }try{
 +
                inputStream=newDataInputStream(method.getResponseBodyAsStream());
 +
            }catch(IOExceptione){
 +
                thrownewRuntimeException("get stream input io exception",
 +
                e);
 +
            }returnmethod;
 +
        }//开一个线程从服务器读取数据:publicvoidrun(){
 +
            while(true){
 +
                GetMethodmethod=null;recIndex=0;recBuf=newbyte[
 +
                    recBufSize
 +
                ];try{
 +
                    method=connectStreamServer();while(true){
 +
                        processLine();
 +
                    }
 +
                }catch(Exceptione){
 +
                    //当连接断开时,重新连接System.out.println("streaming process error "+e.getMessage());
 +
                    lastMsgLocation=lastMsgLocation+2;
 +
                    curStreamUrlIndex=++curStreamUrlIndex%streamingUrlList.size();
 +
                }
 +
            }
 +
        }privatevoidprocessLine()throwsIOException{
 +
            byte[
 +
               
 +
            ]bytes=readLineBytes();if(bytes!=null&&bytes.length>0){
 +
                Stringmessage=newString(bytes);//handlethemessageyougethandleMessage(message);
 +
            }
 +
        }//此方法解析message//同时需将lastMsgLocation赋值为最近1条消息的idprivatevoidhandleMessage(Stringmessage){
 +
            System.out.println(message);
 +
        }publicbyte[
 +
           
 +
        ]readLineBytes()throwsIOException{
 +
            byte[
 +
               
 +
            ]result=null;ByteArrayOutputStreambos=newByteArrayOutputStream();intreadCount=0;if(recIndex>0&&read(bos)){
 +
                returnbos.toByteArray();
 +
            }while((readCount=inputStream.read(recBuf,
 +
            recIndex,
 +
            recBuf.length-recIndex))>0){
 +
                recIndex=recIndex+readCount;if(read(bos)){
 +
                    break;
 +
                }
 +
            }result=bos.toByteArray();if(result==null||(result!=null&&result.length<=0&&recIndex<=0)){
 +
                thrownewIOException("++++ Stream appears to be dead, so closing it down");
 +
            }returnresult;
 +
        }privatebooleanread(ByteArrayOutputStreambos){
 +
            booleanresult=false;intindex=-1;for(inti=0;i<recIndex-1;i++){
 +
                if(recBuf[
 +
                    i
 +
                ]==13&&recBuf[
 +
                    i+1
 +
                ]==10){
 +
                    //13cr-回车10lf-换行index=i;break;
 +
                }
 +
            }if(index>=0){
 +
                bos.write(recBuf,
 +
                0,
 +
                index);byte[
 +
                   
 +
                ]newBuf=newbyte[
 +
                    recBufSize
 +
                ];if(recIndex>index+2){
 +
                    System.arraycopy(recBuf,
 +
                    index+2,
 +
                    newBuf,
 +
                    0,
 +
                    recIndex-index-2);
 +
                }recBuf=newBuf;recIndex=recIndex-index-2;result=true;
 +
            }else{
 +
                if(recBuf[
 +
                    recIndex-1
 +
                ]==13){
 +
                    bos.write(recBuf,
 +
                    0,
 +
                    recIndex-1);Arrays.fill(recBuf,
 +
                    (byte)0);recBuf[
 +
                        0
 +
                    ]=13;recIndex=1;
 +
                }else{
 +
                    bos.write(recBuf,
 +
                    0,
 +
                    recIndex);Arrays.fill(recBuf,
 +
                    (byte)0);recIndex=0;
 +
                }
 +
            }returnresult;
 +
        }
 +
    }
 +
}  
  
private void processLine() throws IOException{
+
</pre>
byte[] bytes = readLineBytes();
+
if(bytes != null && bytes.length > 0){
+
String message = new String(bytes);
+
//handle the message you get
+
handleMessage() ;
+
lastMsgLocation=Message.getId();
+
  
}
+
<pre>
}
+
//pom.xml中内容如下:
}
+
<dependencies>
 
+
        <dependency>
public byte[] readLineBytes() throws IOException {
+
            <groupId>commons-logging</groupId>
byte[] result = null;
+
            <artifactId>commons-logging</artifactId>
ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
            <version>1.1.1</version>
int readCount = 0;
+
        </dependency>
if (recIndex > 0 && read(bos)) {
+
        <dependency>
return bos.toByteArray();
+
            <groupId>commons-codec</groupId>
}
+
            <artifactId>commons-codec</artifactId>
while ((readCount = inputStream.read(recBuf, recIndex, recBuf.length
+
            <version>1.4</version>
- recIndex)) > 0) {
+
        </dependency>
recIndex = recIndex + readCount;
+
        <dependency>
 
+
            <groupId>commons-httpclient</groupId>
if (read(bos)) {
+
            <artifactId>commons-httpclient</artifactId>
break;
+
            <version>3.1</version>
}
+
        </dependency>
}
+
    </dependencies>
 
+
result = bos.toByteArray();
+
 
+
if (result == null
+
|| (result != null && result.length <= 0 && recIndex <= 0)) {
+
throw new IOException(
+
"++++ Stream appears to be dead, so closing it down");
+
}
+
 
+
return result;
+
}
+
 
+
private boolean read(ByteArrayOutputStream bos) {
+
boolean result = false;
+
int index = -1;
+
 
+
for (int i = 0; i < recIndex - 1; i++) {
+
if (recBuf[i] == 13 && recBuf[i + 1] == 10) {//13 cr-回车 10 lf-换行
+
index = i;
+
break;
+
}
+
}
+
 
+
if (index >= 0) {
+
bos.write(recBuf, 0, index);
+
byte[] newBuf = new byte[recBufSize];
+
if (recIndex > index + 2) {
+
System.arraycopy(recBuf, index + 2, newBuf, 0, recIndex
+
- index - 2);
+
}
+
recBuf = newBuf;
+
recIndex = recIndex - index - 2;
+
result = true;
+
} else {
+
if (recBuf[recIndex - 1] == 13) {
+
bos.write(recBuf, 0, recIndex - 1);
+
Arrays.fill(recBuf,(byte)0);
+
recBuf[0] = 13;
+
recIndex = 1;
+
} else {
+
bos.write(recBuf, 0, recIndex);
+
Arrays.fill(recBuf,(byte)0);
+
recIndex = 0;
+
}
+
}
+
return result;
+
}
+
  
 
</pre>
 
</pre>

2014年11月18日 (二) 14:59的版本

importorg.apache.commons.httpclient.HostConfiguration;
importorg.apache.commons.httpclient.HttpClient;
importorg.apache.commons.httpclient.HttpStatus;
importorg.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
importorg.apache.commons.httpclient.methods.GetMethod;
importjava.io.ByteArrayOutputStream;
importjava.io.DataInputStream;
importjava.io.IOException;importjava.util.ArrayList;
importjava.util.Arrays;importjava.util.List;

publicclassReceiveMessageTest{
    privateMultiThreadedHttpConnectionManagerhttpConnManager;
    privateHttpClienthttpClient=null;//从datapush服务器获取连接
    privateList<String>streamingUrlList=newArrayList<String>();//当前连接的streamingserver,可以配置privateintcurStreamUrlIndex=0;
    publicstaticlonglastMsgLocation=-1L;
    privateDataInputStreaminputStream;
    privatefinalintrecBufSize=256;privatebyte[
        
    ]recBuf=newbyte[
        recBufSize
    ];privateintrecIndex=0;publicstaticvoidmain(String[
        
    ]args){
        ReceiveMessageTesttest=newReceiveMessageTest();test.init();
    }publicvoidinit(){
        streamingUrlList.add("https://c.api.weibo.com/2/datapush/comment.json?subid=19021&access_token=xxx");
        httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION,
        2);httpConnManager.getParams().setMaxTotalConnections(2);
        httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE);
        httpConnManager.getParams().setConnectionTimeout(10000);
        httpConnManager.getParams().setReceiveBufferSize(655350);
        httpClient=newHttpClient(httpConnManager);newReadTask().start();
    }classReadTaskextendsThread{
        privateGetMethodconnectStreamServer(){
            StringtargetURL=streamingUrlList.get(curStreamUrlIndex);//从指定的location开始读取数据,保证读取数据的连续性,消息完整性//链接断开的重新链接机制
            if(lastMsgLocation>0){
                targetURL+="&since_id="+lastMsgLocation;
            }System.out.println("StreamingReceiver http get url="+targetURL);
             GetMethodmethod=newGetMethod(targetURL);intstatusCode=0;
            try{
                statusCode=httpClient.executeMethod(method);
            }catch(Exceptione){
                //dealException;
            }if(statusCode!=HttpStatus.SC_OK){
                thrownewRuntimeException(".. ");
            }try{
                inputStream=newDataInputStream(method.getResponseBodyAsStream());
            }catch(IOExceptione){
                thrownewRuntimeException("get stream input io exception",
                e);
            }returnmethod;
        }//开一个线程从服务器读取数据:publicvoidrun(){
            while(true){
                GetMethodmethod=null;recIndex=0;recBuf=newbyte[
                    recBufSize
                ];try{
                    method=connectStreamServer();while(true){
                        processLine();
                    }
                }catch(Exceptione){
                    //当连接断开时,重新连接System.out.println("streaming process error "+e.getMessage());
                    lastMsgLocation=lastMsgLocation+2;
                    curStreamUrlIndex=++curStreamUrlIndex%streamingUrlList.size();
                }
            }
        }privatevoidprocessLine()throwsIOException{
            byte[
                
            ]bytes=readLineBytes();if(bytes!=null&&bytes.length>0){
                Stringmessage=newString(bytes);//handlethemessageyougethandleMessage(message);
            }
        }//此方法解析message//同时需将lastMsgLocation赋值为最近1条消息的idprivatevoidhandleMessage(Stringmessage){
            System.out.println(message);
        }publicbyte[
            
        ]readLineBytes()throwsIOException{
            byte[
                
            ]result=null;ByteArrayOutputStreambos=newByteArrayOutputStream();intreadCount=0;if(recIndex>0&&read(bos)){
                returnbos.toByteArray();
            }while((readCount=inputStream.read(recBuf,
            recIndex,
            recBuf.length-recIndex))>0){
                recIndex=recIndex+readCount;if(read(bos)){
                    break;
                }
            }result=bos.toByteArray();if(result==null||(result!=null&&result.length<=0&&recIndex<=0)){
                thrownewIOException("++++ Stream appears to be dead, so closing it down");
            }returnresult;
        }privatebooleanread(ByteArrayOutputStreambos){
            booleanresult=false;intindex=-1;for(inti=0;i<recIndex-1;i++){
                if(recBuf[
                    i
                ]==13&&recBuf[
                    i+1
                ]==10){
                    //13cr-回车10lf-换行index=i;break;
                }
            }if(index>=0){
                bos.write(recBuf,
                0,
                index);byte[
                    
                ]newBuf=newbyte[
                    recBufSize
                ];if(recIndex>index+2){
                    System.arraycopy(recBuf,
                    index+2,
                    newBuf,
                    0,
                    recIndex-index-2);
                }recBuf=newBuf;recIndex=recIndex-index-2;result=true;
            }else{
                if(recBuf[
                    recIndex-1
                ]==13){
                    bos.write(recBuf,
                    0,
                    recIndex-1);Arrays.fill(recBuf,
                    (byte)0);recBuf[
                        0
                    ]=13;recIndex=1;
                }else{
                    bos.write(recBuf,
                    0,
                    recIndex);Arrays.fill(recBuf,
                    (byte)0);recIndex=0;
                }
            }returnresult;
        }
    }
} 

//pom.xml中内容如下:
 <dependencies>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>3.1</version>
        </dependency>
    </dependencies>