示例代码
第29行: | 第29行: | ||
streamingUrlList.add("http://c.api.weibo.com/2/datapush/comment.json?subid=19021"); | streamingUrlList.add("http://c.api.weibo.com/2/datapush/comment.json?subid=19021"); | ||
httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION, | httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION, | ||
− | + | 5);httpConnManager.getParams().setMaxTotalConnections(5); | |
httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE); | httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE); | ||
httpConnManager.getParams().setConnectionTimeout(10000); | httpConnManager.getParams().setConnectionTimeout(10000); |
2016年9月12日 (一) 17:22的版本
//java源代码如下: importorg.apache.commons.httpclient.HostConfiguration; importorg.apache.commons.httpclient.HttpClient; importorg.apache.commons.httpclient.HttpStatus; importorg.apache.commons.httpclient.MultiThreadedHttpConnectionManager; importorg.apache.commons.httpclient.methods.GetMethod; importjava.io.ByteArrayOutputStream; importjava.io.DataInputStream; importjava.io.IOException;importjava.util.ArrayList; importjava.util.Arrays;importjava.util.List; publicclassReceiveMessageTest{ privateMultiThreadedHttpConnectionManagerhttpConnManager; privateHttpClienthttpClient=null;//从datapush服务器获取连接 privateList<String>streamingUrlList=newArrayList<String>();//当前连接的streamingserver,可以配置privateintcurStreamUrlIndex=0; publicstaticlonglastMsgLocation=-1L; privateDataInputStreaminputStream; privatefinalintrecBufSize=256;privatebyte[ ]recBuf=newbyte[ recBufSize ];privateintrecIndex=0;publicstaticvoidmain(String[ ]args){ ReceiveMessageTesttest=newReceiveMessageTest();test.init(); }publicvoidinit(){ streamingUrlList.add("http://c.api.weibo.com/2/datapush/comment.json?subid=19021"); httpConnManager=newMultiThreadedHttpConnectionManager();httpConnManager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION, 5);httpConnManager.getParams().setMaxTotalConnections(5); httpConnManager.getParams().setSoTimeout(Integer.MAX_VALUE); httpConnManager.getParams().setConnectionTimeout(10000); httpConnManager.getParams().setReceiveBufferSize(655350); httpClient=newHttpClient(httpConnManager);newReadTask().start(); }classReadTaskextendsThread{ privateGetMethodconnectStreamServer(){ StringtargetURL=streamingUrlList.get(curStreamUrlIndex);//从指定的location开始读取数据,保证读取数据的连续性,消息完整性//链接断开的重新链接机制 if(lastMsgLocation>0){ targetURL+="&since_id="+lastMsgLocation; }System.out.println("StreamingReceiver http get url="+targetURL); GetMethodmethod=newGetMethod(targetURL);intstatusCode=0; try{ statusCode=httpClient.executeMethod(method); }catch(Exceptione){ //dealException; }if(statusCode!=HttpStatus.SC_OK){ thrownewRuntimeException(".. "); }try{ inputStream=newDataInputStream(method.getResponseBodyAsStream()); }catch(IOExceptione){ thrownewRuntimeException("get stream input io exception", e); }returnmethod; }//开一个线程从服务器读取数据:publicvoidrun(){ while(true){ GetMethodmethod=null;recIndex=0;recBuf=newbyte[ recBufSize ];try{ method=connectStreamServer();while(true){ processLine(); } }catch(Exceptione){ //当连接断开时,重新连接System.out.println("streaming process error "+e.getMessage()); lastMsgLocation=lastMsgLocation+2; curStreamUrlIndex=++curStreamUrlIndex%streamingUrlList.size(); } } }privatevoidprocessLine()throwsIOException{ byte[ ]bytes=readLineBytes();if(bytes!=null&&bytes.length>0){ Stringmessage=newString(bytes);//handlethemessageyougethandleMessage(message); } }//此方法解析message//同时需将lastMsgLocation赋值为最近1条消息的idprivatevoidhandleMessage(Stringmessage){ System.out.println(message); }publicbyte[ ]readLineBytes()throwsIOException{ byte[ ]result=null;ByteArrayOutputStreambos=newByteArrayOutputStream();intreadCount=0;if(recIndex>0&&read(bos)){ returnbos.toByteArray(); }while((readCount=inputStream.read(recBuf, recIndex, recBuf.length-recIndex))>0){ recIndex=recIndex+readCount;if(read(bos)){ break; } }result=bos.toByteArray();if(result==null||(result!=null&&result.length<=0&&recIndex<=0)){ thrownewIOException("++++ Stream appears to be dead, so closing it down"); }returnresult; }privatebooleanread(ByteArrayOutputStreambos){ booleanresult=false;intindex=-1;for(inti=0;i<recIndex-1;i++){ if(recBuf[ i ]==13&&recBuf[ i+1 ]==10){ //13cr-回车10lf-换行index=i;break; } }if(index>=0){ bos.write(recBuf, 0, index);byte[ ]newBuf=newbyte[ recBufSize ];if(recIndex>index+2){ System.arraycopy(recBuf, index+2, newBuf, 0, recIndex-index-2); }recBuf=newBuf;recIndex=recIndex-index-2;result=true; }else{ if(recBuf[ recIndex-1 ]==13){ bos.write(recBuf, 0, recIndex-1);Arrays.fill(recBuf, (byte)0);recBuf[ 0 ]=13;recIndex=1; }else{ bos.write(recBuf, 0, recIndex);Arrays.fill(recBuf, (byte)0);recIndex=0; } }returnresult; } } }
//pom.xml中内容如下: <dependencies> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.4</version> </dependency> <dependency> <groupId>commons-httpclient</groupId> <artifactId>commons-httpclient</artifactId> <version>3.1</version> </dependency> </dependencies>