package market.thread;

import market.util.text.*;
import java.util.*;
import market.util.*;
import market.*;
import java.io.*;

/**
 * Insert the type's description here.
 * Creation date: (1/2/00 9:22:31 PM)
 * @author: 
 */
public class TextSplitter extends Thread {

	public Queue[] wrOutQueue;	
	public Queue textInStream;
	
	public WrProcessor[] mWrProcessor;
	public market.util.TextElement inText;
	public boolean working;

	protected int numWrProcessors;

/**
 * Pointer to Wr_Rw_RateMeter
 */
	private Wr_Rw_RateMeter mRateMeter = null;
	
/**
 * TextSplitter constructor comment.
 */
public TextSplitter() {
	super();
}
/**
 * This constructor get input pipe. By this pipe TextSplitter recive text. 
 * Creation date: (12/27/99 6:42:25 PM)
 * @param num int ( Netrunner number )
 * @param threadGroup ThreadGroup 
 * @param textinPipe market.util.Queue
 * @param numWr int ( WrProcessor number )
 */
public TextSplitter( int num, ThreadGroup threadGroup , Queue textInPipe, int numWr )
{
	super( threadGroup, "TextSplitter"+num);
	ConfigMarket.logWriter.log( getName()+": Super done...", LogPrinter.DEBUG);
		
	working = true;
	this.textInStream= textInPipe;
	numWrProcessors = numWr;
// init Pipe

	wrOutQueue = new Queue[ numWrProcessors ];
	mWrProcessor = new WrProcessor[ numWrProcessors ];
	mRateMeter =(( Graber )threadGroup).mRateMeter;

	
	try{
		for (int i=0; i < mWrProcessor.length; i++ ){
			wrOutQueue[i] = new Queue( ConfigMarket.TEXT_SPLITTER_OUT );	
			ConfigMarket.logWriter.log( getName()+": Wr Out Queue "+i+" done.", LogPrinter.DEBUG_0);
			mWrProcessor[i] = new WrProcessor( num*10+i, threadGroup, wrOutQueue[i] );
			ConfigMarket.logWriter.log( getName()+": WrProcessor "+i+" done.", LogPrinter.DEBUG_0);
		}	
	}catch( Throwable  t ){
		ConfigMarket.logWriter.log( getName()+": " + t , LogPrinter.ERROR );	
	}
	
}
/**
 * Kill TextSpliter and all our slave.
 * Creation date: (03.3.2000 “. 04:34:59)
 */
public void kill() {
	
// Kill WrProcessors
	stopAllWrProcessors();

	stop();
}
/**
 * This is body cicle for TextSplitter
 * Creation date: (1/2/00 9:35:34 PM)
 */
public void run() {
	try {
		TxtSpliter txtSplit = new TxtSpliter();
		int wrKey, numEmptyLinkStream = 0;

		//   mWrProcessor.setDaemon( true );
		ConfigMarket.logWriter.log(getName() + ": Start" + numWrProcessors + " WrProcessors...", LogPrinter.DEBUG);
		startWrProcessors();
		Runtime myRuntime = Runtime.getRuntime();
		while (working) {
			if ((float) myRuntime.freeMemory() / myRuntime.totalMemory() < Const.MinFreeMemory) {
				ConfigMarket.logWriter.log(getName() + ": Total Memory: " + myRuntime.totalMemory() + " Free Memory: " + myRuntime.freeMemory(), LogPrinter.DEBUG_2);
				System.runFinalization();
			}
			try {
				try {
					ConfigMarket.logWriter.log(getName() + ": Geting Next Text Document...", LogPrinter.DEBUG);
					inText = (TextElement) textInStream.readObject();
					if (inText == null) {
						working = false;
						continue;
					}
				} catch (Exception e) {
					ConfigMarket.logWriter.log(getName() + ": Can't read from text Stream!", LogPrinter.ERROR);
					working = false;
					continue;
				}
				ConfigMarket.logWriter.log(getName() + ":Spliting srcKey= " + inText.srcKey, LogPrinter.DEBUG);
				try {
					txtSplit.SplitText(inText.text);
				} catch (Throwable t) {
					ConfigMarket.logWriter.log(getName() + ": Error in split process: " + t, LogPrinter.ERROR);
				}
				ConfigMarket.logWriter.log(getName() + ": Sending to WrProcessors... ", LogPrinter.DEBUG);
				sendWrToWrProcessors(txtSplit.sentances, txtSplit.counts);
				ConfigMarket.logWriter.log(getName() + ": Sended to WrProcessors ", LogPrinter.DEBUG);
			} catch (Exception e) {
				ConfigMarket.logWriter.log(getName() + ": Somting wrong:!", LogPrinter.ERROR);
				e.printStackTrace(ConfigMarket.logWriter.getPrintWriter());
			}
		}
	} catch (ThreadDeath t) {
		ConfigMarket.logWriter.log(getName() + ": Force stop (kill):", LogPrinter.ERROR);
		t.printStackTrace(ConfigMarket.logWriter.getPrintWriter());
		ConfigMarket.logWriter.log(getName() + ": Finish.", LogPrinter.INFO);
		return; // exit
	} catch (Throwable t) {
		ConfigMarket.logWriter.log(getName() + ": Error in split process: " + t, LogPrinter.ERROR);
		t.printStackTrace(ConfigMarket.logWriter.getPrintWriter());
		( (Graber)getThreadGroup() ).sendError();
	}
	for (int proc = 0; proc < numWrProcessors; proc++) {
		wrOutQueue[proc].writeObject((Object) null);
	}
	ConfigMarket.logWriter.log(getName() + ": Finish.", LogPrinter.INFO);
}
public void sendWrToWrProcessors(Vector sentences, Vector counts) {
	
	int i, size = sentences.size();
	int count = 1;
	String wrStr = null;
	WordRowElement wr;
	int wrProcIndex = 0;
	for (i = 0; i < size; i++) {
		try {
			wrStr = (String) sentences.elementAt(i);
			count = ((Integer) counts.elementAt(i)).intValue();
		} catch (Exception e) {
			ConfigMarket.logWriter.log(getName() + ":Can't get word row from Vector", LogPrinter.ERROR);
		}
		if ((wrStr == null) || (count == 0)) {
			ConfigMarket.logWriter.log(getName() + ": Empty wrString or zero count :", LogPrinter.ERROR);
			continue;
		}
		wr = new WordRowElement(inText.langKey, inText.srcKey, count, wrStr);
		try {
			wrOutQueue[wrProcIndex].writeObject((Object) wr);
			ConfigMarket.logWriter.log(getName() + ":NumOf Text Out Element:" + wrOutQueue[wrProcIndex].getNumOfElement(), LogPrinter.DEBUG_2);
		} catch (Exception e) {
			ConfigMarket.logWriter.log(getName() + ":Can't write in word row Queue: " + wrProcIndex, LogPrinter.ERROR);
		}
		wrProcIndex = (wrProcIndex + 1) % numWrProcessors;
	}
}
/**
 * Stop All WrProcessor.
 * Creation date: (1/24/00 6:05:56 PM)
 */
public void startWrProcessors() {
	
	int proc;
	
	ConfigMarket.logWriter.log( getName()+": Statring All WrProcessor...", LogPrinter.INFO );
	
	if(!mRateMeter.isAlive()){
		mRateMeter.time_start =  System.currentTimeMillis();
		mRateMeter.start();
	}

	for ( proc = 0; proc < numWrProcessors; proc++) {
		if ( mWrProcessor[proc] != null ) {
			mWrProcessor[proc].start();
		}
	}
	
	ConfigMarket.logWriter.log( getName()+": Starting All WrProcessor...Done", LogPrinter.INFO );
}
/**
 * Stop All WrProcessor.
 * Creation date: (1/24/00 6:05:56 PM)
 */
public void stopAllWrProcessors() {
	
	int proc;
	
	ConfigMarket.logWriter.log( getName()+": Stoping All WrProcessor...", LogPrinter.INFO );

	for ( proc = 0; proc < numWrProcessors; proc++) {
		if ( (mWrProcessor[proc] != null) && mWrProcessor[proc].isAlive() ) {
			mWrProcessor[proc].stop();
		}
		
		mWrProcessor[proc] = null;
	}
	
	ConfigMarket.logWriter.log( getName()+": Stoping All WrProcessor...Done", LogPrinter.INFO );
}
}

