package market.thread;

import java.util.StringTokenizer;
import java.io.*;
import java.net.*;
import market.util.text.*;
import market.util.*;
import market.*;

/**
 * This thread is the word row processor. It receives word rows one at a time
 * from TextSplitter.
 * Creation date: (12/27/99 6:13:55 PM)
 * @author tin
 */
public class WrProcessor extends Thread {

	Queue inStream;
	Queue_rate wr_rw_Count_Stream;

	WordElement[] wordList;

	WordRowElement inWordRow;
	public boolean working;
	int rwCount;
/**
 * WrProcessor constructor comment.
 */
public WrProcessor() {
	super();
}
/**
 * This constructor get input pipe. By this pipe WrProcessor recive word rows. 
 * Creation date: (12/27/99 6:42:25 PM)
 * @param num int ( Netrunner number )
 * @param threadGroup ThreadGroup 
 * @param inPipe java.io.Queue
 */
public WrProcessor( int num, ThreadGroup threadGroup, Queue inPipe) {

	super( threadGroup, "WrProcessor"+num);
	ConfigMarket.logWriter.log( getName()+": Super done...", LogPrinter.DEBUG);

	working = true;
	this.inStream= inPipe;
	wr_rw_Count_Stream =(( Graber )threadGroup).wr_rw_Count_Stream;
	ConfigMarket.logWriter.log( getName()+": Constructor done...", LogPrinter.DEBUG);
}
/**
 * This constructor get input pipe. By this pipe WrProcessor recive word rows. 
 * Creation date: (12/27/99 6:42:25 PM)
 * @param num int ( Netrunner number )
 * @param threadGroup ThreadGroup 
 * @param inPipe java.io.Queue
 */
public WrProcessor( int num, ThreadGroup threadGroup, Queue inPipe , Queue_rate wr_rw_count_queue) {

	
	super( threadGroup, "WrProcessor"+num);
	ConfigMarket.logWriter.log( getName()+": Super done...", LogPrinter.DEBUG);

	working = true;
	this.inStream= inPipe;
	wr_rw_Count_Stream = wr_rw_count_queue;
	wr_rw_Count_Stream =(( Graber )threadGroup).wr_rw_Count_Stream;
	ConfigMarket.logWriter.log( getName()+": Constructor done...", LogPrinter.DEBUG);
}
/**
 * Get rwKey for rwValue from Market_RwServer
 * Creation date: (12/27/99 7:00:56 PM)
 * @return long ( rwKey )
 * @param rwString String
 * @param rwCount int
 * @param langKey int
 */
public long directResolveRawWord(String rwString, int rwCount, int langKey, long wrKey) {
	
	int trys = 1;
	
	agein : while (true) {
		try {
			long rwKey = ConfigMarket.rwServer.market_rwMethodFlow(rwString, (short) langKey, (long) rwCount, wrKey);
			ConfigMarket.logWriter.log(getName() + ": Word: " + rwString + " rwKey:" + rwKey, LogPrinter.DEBUG_1);
			
			if (rwKey > 0) {
				return rwKey;
			} else {
				
				if (trys++ < Const.MAX_TRAYS) {
					continue agein;
				}
				ConfigMarket.logWriter.log(getName() + ": Can't Resolve raw word: " + rwString + " !", LogPrinter.ERROR);
				return 0;
			}
		} catch (Exception e) {
			
			if (trys++ < Const.MAX_TRAYS) {
				continue agein;
			}
			ConfigMarket.logWriter.log(getName() + ": Can't Resolve raw word: " + rwString + "!: " + e, LogPrinter.ERROR);
			e.printStackTrace();
			return 0;
		}
	}
}
/**
 * Get rwKey for all word in wordList
 * Creation date: (12/27/99 10:01:25 PM)
 * @param wordList WordElement[]
 * @param wrKey long
 */
public void directResolveRawWords( WordElement[] wordList, long wrKey ) {

	rwCount = 0;
	for(int i=0 ; i < wordList.length ; i++  ){
		wordList[i].rwKey = 
		  directResolveRawWord(wordList[i].rwString, wordList[i].rwCount, inWordRow.langKey, wrKey);
		rwCount += wordList[i].rwCount;  
		yield();  
	}
	
}
/**
 * Get rwKey for all word in wordList from market_rwServer
 * Creation date: (08/1/00 10:01:25 PM)
 * @param wordList WordElement[]
 * @param wrKey long
 */
public void directResolveRawWordsPack(WordElement[] wordList, long wrKey) {
	short numResolved = ConfigMarket.rwServer.market_rwResolvePacket(wrKey, (short) inWordRow.langKey, wordList);
	rwCount = numResolved;
	if (numResolved != wordList.length) {
		ConfigMarket.logWriter.log(getName() + ": Number Word: " + wordList.length + " Resolved Word: " + numResolved, LogPrinter.ERROR);
	} else {
		ConfigMarket.logWriter.log(getName() + ": Resolved Word: " + numResolved, LogPrinter.DEBUG_1);
	}
	if ((ConfigMarket.logWriter.getLogLevel() >= LogPrinter.DEBUG_1) && (ConfigMarket.logWriter.getScreenLogLevel() >= LogPrinter.DEBUG_1)) {
		for (int i = 0; i < wordList.length; i++) {
			ConfigMarket.logWriter.log(getName() + ": Word: " + wordList[i].rwString + " rwKey:" + wordList[i].rwKey, LogPrinter.DEBUG_1);
			yield();
		}
	}
}
/**
 * Get rwKey for rwValue from Market_RwServer
 * Creation date: (12/27/99 7:00:56 PM)
 * @return long ( rwKey )
 * @param rwString String
 * @param rwCount int
 * @param langKey int
 */
public long directResolveWordRow(String wrString, int langKey, int wrCount, long srcKey) {
	
	int trys = 1;
	
	agein : while (true) {
		try {
			long wrKey = ConfigMarket.wrServer.market_wrMethodFlow(wrString, (short) langKey, 
																	(long) wrCount, (int) srcKey);
			ConfigMarket.logWriter.log(getName() + ": Sentence:" + wrString + 
										" wrKey:" + wrKey, LogPrinter.DEBUG_1);
			if (wrKey > 0) {
				return wrKey;
			}else{
				if (trys++ < Const.MAX_TRAYS) {
					continue agein;
				}
				ConfigMarket.logWriter.log(getName() + ": Can't Resolve word row: " + 
										wrString + "!", LogPrinter.ERROR);
				return 0;				
			}
			
		} catch (Exception e) {
			if (trys++ < Const.MAX_TRAYS) {
				continue agein;
			}
			ConfigMarket.logWriter.log(getName() + ": Can't Resolve word row: " + 
									wrString + "!", LogPrinter.ERROR);
			return 0;
		}
	}
}
/**
 * Send bundle to MDVServer.
 * Creation date: (12/27/99 10:04:36 PM)
 * @return boolean ( true for sucsses )
 * @param wordList WordElement[]
 * @param wrKey long
 */
public boolean directSendBundleToMDVServer( WordElement[] wordList, long wrKey ) {
		
	int trys = 1;
	
	agein : while (true) {
		try {
			boolean ok = ConfigMarket.mdvServer.mdvProcInfo( wordList );
			
			ConfigMarket.logWriter.log(getName() + ": Result from MDVServer is :" + ok , LogPrinter.DEBUG);
			
			if ( ok ) {
				return true;
			}else{
				if (trys++ < Const.MAX_TRAYS) {
					continue agein;
				}
				ConfigMarket.logWriter.log(getName() + 
							": MDVServer return false " + Const.MAX_TRAYS + " times !", LogPrinter.ERROR);
				return false;				
			}
			
		} catch (Exception e) {
			if (trys++ < Const.MAX_TRAYS) {
				continue agein;
			}
			ConfigMarket.logWriter.log(getName() + 
						": Can't send bundle properly to MDVServer ! " + e.toString() , LogPrinter.ERROR);
			return false;				
		}
	}
	
}
/**
 * Get rwKey for rwValue from Market_RwServer
 * Creation date: (12/27/99 7:00:56 PM)
 * @return long ( rwKey )
 * @param rwString String
 * @param rwCount int
 * @param langKey int
 */
public long httpResolveRawWord(String rwString, int rwCount, int langKey, long wrKey) {
	
	StringBuffer ret_Value = new StringBuffer(Const.MAX_LEN_KEY);
	String encoded_rwString = URLEncoder.encode(rwString);

	int trys = 2;
	
	agein : while (true) {
		try {
			// ??? market_rwServer?marketKey=1&wrKey=2&rwString=Some&langKey=1&rwCounts=1
			URL ServletURL = new URL(ConfigMarket.hostMarketRwServer + "?" 
				+ Const.RW_STRING + "=" + encoded_rwString + "&" + Const.WR_KEY + "=" + wrKey +	"&" 
				+ Const.RW_COUNT + "=" + rwCount + "&" + Const.LANG_KEY + "=" + langKey);
			
			InputStream in = ServletURL.openStream();
			int b;

			while ((b = in.read()) != -1) {
				ret_Value.append((char) b);
			}

			in.close();
			in = null;
			
			long rwKey = Long.valueOf(ret_Value.toString()).longValue();
			
			ConfigMarket.logWriter.log( getName()+": Word: " + encoded_rwString 
				+ " rwKey:" + rwKey, LogPrinter.DEBUG_1);	

			if (rwKey > 0) {
				return rwKey;
			}

			throw new IOException();

		} catch ( IOException e ) {
			if (trys++ < Const.MAX_TRAYS){
				ret_Value = new StringBuffer(Const.MAX_LEN_KEY);
				continue agein;
			}	

			ConfigMarket.logWriter.log( getName()+": Can't Resolve raw word: "
										+ encoded_rwString +"!", LogPrinter.ERROR);	
			return 0;
		}

	}
}
/**
 * Get rwKey for all word in wordList
 * Creation date: (12/27/99 10:01:25 PM)
 * @param wordList WordElement[]
 * @param wrKey long
 */
public void httpResolveRawWords( WordElement[] wordList, long wrKey ) {

	rwCount = 0;  
	for(int i=0 ; i < wordList.length ; i++  ){
		wordList[i].rwKey = 
		  httpResolveRawWord(wordList[i].rwString, wordList[i].rwCount, inWordRow.langKey, wrKey);
		rwCount ++;    
		yield();  
	}
	
}
/**
 * Get wrKey for wrValue from Market_WrServer
 * Creation date: (12/27/99 6:57:17 PM)
 * @return long ( wrKey )
 * @param wrString String
 * @param langKey int
 * @param wrCount int
 * @param srcKey long
 */
public long httpResolveWordRow( String wrString, int langKey, int wrCount, long srcKey ) {
	
	StringBuffer ret_Value = new StringBuffer( Const.MAX_LEN_KEY );
	String encoded_wrString = URLEncoder.encode( wrString );

	String s_url = Const.WR_STRING +"=" +  encoded_wrString +
		  "&" + Const.LANG_KEY +"="+ langKey +
		  "&" + Const.WR_COUNTS +"="+ wrCount +
		  "&" + Const.SRC_KEY +"="+ srcKey;
	
	try{
	// market_wrServer?wrString=Some+value&langKey=1&wrCounts=1&srcKey=5
		  	
	  URL ServletURL = new URL( ConfigMarket.hostMarketWrServer +"?"+ s_url );
	  
	  ConfigMarket.logWriter.log( getName()+": URL Str:" + s_url , LogPrinter.DEBUG_1);
	  
	  InputStream in = ServletURL.openStream();
	  
	  int b; 
	
 	  while ((b =in.read())!= -1){ 
	 	  ret_Value.append((char)b);  
	  } 

	  in.close();
	  in = null;

 	  long wrKey = Long.valueOf(ret_Value.toString()).longValue();
 	  
	  ConfigMarket.logWriter.log( getName()+": Sentence:"+ wrString +" wrKey:" + wrKey , LogPrinter.DEBUG_1);
	  
 	  if ( wrKey > 0 ){
	 	  
 	  	return wrKey;
 	  }
 	  
 	  throw new Exception();

	}
	catch( Exception e ){
	    ConfigMarket.logWriter.log( getName()+": Can't Resolve word row!", LogPrinter.ERROR);
		return 0;
	}
	

}
/**
 * Create bundle for rwDayValueServer and send it
 * Creation date: (12/27/99 10:04:36 PM)
 * @param wordList WordElement[]
 * @param wrKey long
 */
public void httpSendBundleToMDVServer(WordElement[] wordList, long wrKey ) {

	StringBuffer req_RTVS = new StringBuffer( Const.MAX_LEN_URL );
	int j;
	
	for(int i=0 ; i < wordList.length ; i++  ){

		j = i+1;		
		req_RTVS.append("&"+Const.RW_KEY + j + "="+ wordList[i].rwKey +"&" 
			               +Const.RW_COUNT + j + "="+ wordList[i].rwCount );		
	}

	try{
// ??? MDVServer?ReqType=2&wrKey=11&numWord=2&rwKey1=54&rwCount1=2&rwKey2=34&rwCount2=12
	  URL ServletURL = new URL( ConfigMarket.hostMDVServer + "?"
		      + Const.REQ_TYPE +"=" +  Const.RTVS_COUNT +
		  "&" + Const.WR_KEY +"="+ wrKey +
		  "&" + Const.NUM_WORD +"="+ wordList.length 
		  	  + req_RTVS.toString() );

//	  System.out.println(req_RTVS.toString());	
	   
	  InputStream in = ServletURL.openStream();

  	  in.close();
	  in = null;

	}
	catch(IOException e){
	    ConfigMarket.logWriter.log( getName()+": Can't Send Bundle to RTVS !", LogPrinter.ERROR);
		return;
	}	
	
}
/**
 * This is a body of WrProcessor  <br>
 * 1. Get Next Row from inStream <br>
 * 2. Resolve Row Word Key on m_wrServer <br>
 * 3. Split sentences <br>
 * 4. Resolve Raw Word Key on m_rwServer <br>
 * 5. Send Bundle to m_RTVServer <br>
 * Creation date: (12/27/99 6:14:27 PM)
 */
public void run() {

   	Runtime myRuntime = Runtime.getRuntime();
   	myRuntime.traceInstructions( true ); 
   	myRuntime.traceMethodCalls( true ); 
	long wrKey ;
	int numEmptyLinkStream=0;
   
   while( working ) {

	 if ( ((float)myRuntime.freeMemory())/myRuntime.totalMemory() < Const.MinFreeMemory ){
		 
		ConfigMarket.logWriter.log( getName()+": Total Memory: "+myRuntime.totalMemory()
			+" Free Memory: "+myRuntime.freeMemory(), LogPrinter.DEBUG );	
		  
	 	System.runFinalization();
	 }	

	 try{
		try{
			inWordRow = null;
			ConfigMarket.logWriter.log( getName()+": Try read new senteces !", LogPrinter.DEBUG);				
			inWordRow = (WordRowElement) inStream.readObject();
			if (inWordRow == null){
  		  		working = false;
  	 	  		continue;	
	   		}	 
	 	}
	 	catch ( Exception  e){
			ConfigMarket.logWriter.log( getName()+": Empty link Stream!", LogPrinter.ERROR);	
			working = false;
  	    	continue;
	 	}	
		
//		wrKey = 102;
		wrKey = directResolveWordRow( inWordRow.wrString, inWordRow.langKey, 
								   inWordRow.wrCount, inWordRow.srcKey );

		if ( wrKey <= 0 ) continue;
		
		wordList = Text.splitSentence( inWordRow.wrString.toLowerCase() , inWordRow.wrCount );

		directResolveRawWordsPack( wordList, wrKey );

		directSendBundleToMDVServer( wordList, wrKey );		

		sendCounts( 1 , rwCount );
		
//		ConfigMarket.logWriter.log( getName()+": Time Sleep: " + ConfigMarket.wrProcessorTimeSleep , LogPrinter.INFO );	

		sleep( ConfigMarket.wrProcessorTimeSleep );
//		ConfigMarket.logWriter.log( getName()+": wake up " , LogPrinter.INFO );	
	 }catch (Exception e){
		ConfigMarket.logWriter.log( getName()+": Somting wrong after read!:" + e , LogPrinter.ERROR);
		e.printStackTrace( ConfigMarket.logWriter.getPrintWriter() );
		ConfigMarket.logWriter.flush();
		( (Graber)getThreadGroup() ).sendError();
	 }catch (ThreadDeath t) {
		ConfigMarket.logWriter.log(getName() + ": Force stop (kill):", LogPrinter.ERROR);
		t.printStackTrace(ConfigMarket.logWriter.getPrintWriter());
		ConfigMarket.logWriter.flush();
		working = false;
		return; // exit
	 }catch ( Throwable t) {
		ConfigMarket.logWriter.log( getName() + ": ! Eny error !:" + t, LogPrinter.ERROR );
		working = false;
		( (Graber)getThreadGroup() ).sendError();
		continue;
	 }

   }
   
ConfigMarket.logWriter.log( getName()+": Finish. ", LogPrinter.INFO);
Graber grab = (Graber) this.getThreadGroup();
grab.finish();

}
/**
 * Write in Queue_rate for Wr_Rw_RateMeter.
 * Creation date: (9.6.00 16:02:35)
 * @param wrCount int
 * @param rwCount int
 */
protected void sendCounts(int wrCount, int rwCount) {

	Wr_Rw_rateElement rateElm = new Wr_Rw_rateElement( wrCount, rwCount );

	wr_rw_Count_Stream.writeObject( rateElm );
	
}
}

