Files
secondo/Algebras/Hadoop/Java/ParallelSecondo/RemoteStream.java

410 lines
11 KiB
Java
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
package ParallelSecondo;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
/*
The purpose of this program is created a java class,
that can accept the binary tuple data sent from Secondo RemoteStreamAlgebra,
which must contains a key attribute value in the front of each tuple.
The key attribute must be one of the four basic data type:
int, bool, real or string.
At the same time, this class also support the function to send
the binary data back to Secondo
after extracting the key attribute value out.
*/
public class RemoteStream {
public enum ROLE{
SERVER, CLIENT
};
// public static int MAX_TUPLESIZE = 65535;
public static int MAX_TUPLESIZE = 655359; //Big TUPLE-BUFFER
private static int SOCKET_SIZE = 65536;
private static int SOCKET_METASIZE = 8;
// public static int SOCKTUP_SIZE = 1016;
public static int SOCKTUP_SIZE = SOCKET_SIZE - SOCKET_METASIZE;
public RemoteStream(String r, String hn, int pt)
{
if (r.toUpperCase().equals("SERVER"))
role = ROLE.SERVER;
else if (r.toUpperCase().equals("CLIENT"))
role = ROLE.CLIENT;
else
System.err.println("Error role type: " + r.toUpperCase());
if (hn.length() <= 0 || pt < 0)
System.err.println("Error hostname or port");
sock_ID = 0;
hostName = hn;
port = pt;
endOfStream = false;
loadTupleNum = 0;
initialized = true;
connected = false;
}
public boolean sendSocket(byte[] buf, int offset, int sock_Num)
throws IOException{
byte[] newBuf = new byte[SOCKET_SIZE];
if (buf == null){
System.arraycopy(Int2Byte(sock_ID), 0, newBuf, 0, 4);
System.arraycopy(Int2Byte(-1), 0, newBuf, 4, 4);
outSocketStream.write(newBuf, 0, SOCKET_SIZE);
outSocketStream.flush();
return true;
}
System.arraycopy(Int2Byte(sock_ID), 0, newBuf, 0, 4);
System.arraycopy(Int2Byte(sock_Num), 0, newBuf, 4, 4);
System.arraycopy(buf, offset, newBuf, 8, SOCKTUP_SIZE);
outSocketStream.write(newBuf, 0, SOCKET_SIZE);
outSocketStream.flush();
int get_SockID = inSocketStream.readInt();
if (get_SockID != (sock_ID + 1))
throw new IOException("Error[sendSocket]: incorrect return sock_ID");
sock_ID++;
return true;
}
/*
Receive one socket, and copy the data into ~buf~ array.
Return ~TRUE~ if there are successive sockets,
or ~FALSE~ if it's the last socket of the current tuple buffer.
*/
public boolean receiveSocket(byte[] buf, int offset) throws IOException
{
//Read the sock_ID ...
int get_SockID = inSocketStream.readInt();
if (get_SockID != sock_ID)
throw new IOException("Error: receive error sock_ID " + get_SockID
+ "\t while expect " + sock_ID);
//Read the sock_Num ...
int sock_Num = inSocketStream.readInt();
//Read the left bytes of Socket into ~buf~
int readCount = 0;
while (readCount < SOCKTUP_SIZE){
readCount += inSocketStream.read(buf, offset + readCount, SOCKTUP_SIZE - readCount);
}
sock_ID++;
outSocketStream.writeInt(sock_ID);
outSocketStream.flush();
if (sock_Num > 1)
return true;
else
{
if (sock_Num < 0)
endOfStream = true; //No more sockets ...
return false;
}
}
public int SendTupleBlock(byte[] buffer, int curPos)
throws IOException
{
int offset = 0;
int sock_Num = (int)Math.ceil((double)curPos / SOCKTUP_SIZE);
//System.out.println("*****Get sock_Num is: " + sock_Num);
int sentSok_Num = 0;
if (buffer == null)
System.err.println("Error: buffer is NULL. ");
while(offset < curPos)
{
sendSocket(buffer, offset, sock_Num);
sock_Num--;
sentSok_Num++;
offset += SOCKTUP_SIZE;
}
return sentSok_Num;
}
public int LoadTuples(byte[] srcBuf, byte[] desBuf, int curPos)
throws Exception
{
int offset = 0;
int desOffset = 0;
while (offset < curPos) {
int keySize = Byte2Int(srcBuf, offset);
if (keySize <= 0)
break;
offset += 4;
String keyValue = "";
if(keyType.equals("string"))
{
keyValue = Byte2String(srcBuf, offset, keySize);
}
else if (keyType.equals("int"))
{
keyValue = "" + Byte2Int(srcBuf, offset);
}
else if (keyType.equals("real"))
{
keyValue = "" + Byte2Real(srcBuf, offset);
}
else if (keyType.equals("bool"))
{
keyValue = "" + Byte2Bool(srcBuf, offset);
}
offset += keySize;
short tupleSize = Byte2UnsignedShort(srcBuf, offset);
tupleSize += 2; //The whole tuple size should contain the short length
//copy the tuple data to ~desBuf~
System.arraycopy(srcBuf, offset, desBuf, desOffset, tupleSize);
offset += tupleSize;
desOffset += tupleSize;
System.out.println("Read key[" + keyValue
+ "] tupleSize[" + tupleSize + "]");
loadTupleNum++;
}
return desOffset;
}
public void setKeyType(String _kt){
keyType = _kt;
}
public boolean getTheLastSocket(){
return endOfStream;
}
public boolean getConnected(){
return connected;
}
public void Connect()
throws IOException, InterruptedException{
if (initialized){
connected = open(hostName, port);
}else{
throw new IOException("Uninitialized RemoteStream Object");
}
}
private boolean open(String hostName, int pt)
throws IOException, InterruptedException{
if(ROLE.SERVER == role)
{
//START the server socket, and wait for accept it.
//Then use ~socket~ only to communicate with client.
server = new ServerSocket(pt);
socket = server.accept();
if (socket.isConnected()){
socket.setKeepAlive(true);
socket.setSoTimeout(0);
//Use self-defined DataInputStream, to transport data with
//little-endian bytes ordering
inSocketStream = new RMDataInputStream(
new BufferedInputStream(socket.getInputStream()));
outSocketStream = new RMDataOutputStream(
new BufferedOutputStream(socket.getOutputStream()));
System.out.println("Connected with Secondo as [" + role + "] in " + hostName + ":" + pt);
} else
throw new IOException("Error: Server socket doesn't accpeted");
}
else if(ROLE.CLIENT == role)
{
//Start the socket to get the connection only.
socket = null;
int waitCounter = 0;
while(socket == null){
try{
System.out.println("RemoteStream try connect ... ");
if (waitCounter++ > 10)
throw new IOException(
"Error: Can't connect to server [" + hostName
+ "] on port: " + pt);
InetSocketAddress srv = new InetSocketAddress(hostName, pt);
socket = new Socket();
socket.connect(srv, 0);
socket.setKeepAlive(true);
socket.setSoTimeout(0);
}catch (ConnectException e){
socket = null;
Thread.sleep(500);
}
}
inSocketStream = new RMDataInputStream(
new BufferedInputStream(socket.getInputStream()));
outSocketStream = new RMDataOutputStream(
new BufferedOutputStream(socket.getOutputStream()));
}
return true;
}
public String readLine() throws IOException
{
if (initialized)
return inSocketStream.readLine();
else
return "";
}
public void writeLine(String s) throws IOException{
if(initialized){
outSocketStream.writeString(s);
outSocketStream.flush();
}
else
System.err.println("Error the sender socket is not initialized.");
System.out.println("send line :" + s);
}
public void close()
{
try{
if (connected)
{
inSocketStream.close();
outSocketStream.close();
socket.close();
if(ROLE.SERVER == role && server != null){
server.close();
server = null;
}
connected = false;
}
}catch (IOException e){
e.printStackTrace();
}
System.err.println(hostName + " as [" + role + "] is closed.");
}
synchronized static short Byte2UnsignedShort(byte[] buf, int offset)
throws EOFException
{
int byte1 = buf[offset];
int byte2 = buf[offset + 1];
if (byte2 == -1) throw new EOFException();
return (short) (((byte2 << 24) >>> 16) + (byte1 << 24) >>> 24);
}
private synchronized static byte[] Int2Byte(int value)
throws IOException
{
byte[] bTemp = new byte[4];
//According to Little-endian ordering
for (int i = 0; i < 4; i++) {
bTemp[i] = (byte) ((value >> (i * 8)) & 0xff);
}
return bTemp;
}
synchronized static int Byte2Int(byte[] value, int offset) throws IOException
{
//int len = 4;
int res = 0, tmp;
//According to Little-endian ordering
for (int i = 3; i >= 0; i--) {
res <<= 8;
tmp = value[i + offset] & 0xFF;
res |= tmp;
}
return res;
}
public static final String toHex(byte b) {
return ("" + "0123456789ABCDEF".charAt(0xf & b >> 4) + "0123456789ABCDEF".charAt(b & 0xf));
}
synchronized static String Byte2String(byte[] value, int offset, int len) throws IOException
{
return new String(value, offset, len - 1);
}
private synchronized static double Byte2Real(byte[] value, int offset) throws IOException
{
return Double.longBitsToDouble(Byte2Long(value, offset));
}
private synchronized static long Byte2Long(byte[] value, int offset) throws IOException
{
long res = 0;
for (int i = 7; i >= 0; i--){
res <<= 8;
int tmp = value[i + offset] & 0xFF;
res |= tmp;
}
return res;
}
private synchronized static boolean Byte2Bool(byte[] value, int offset) throws IOException
{
int i = (int)value[offset];
if (i<0) throw new IOException();
if (i ==0 )
return false;
else
return true;
}
private ROLE role;
private String hostName;
private int port;
private int sock_ID;
private boolean initialized;
private boolean connected;
private boolean endOfStream;
private String keyType;
private int loadTupleNum;
private ServerSocket server;
private Socket socket;
private RMDataInputStream inSocketStream;
private RMDataOutputStream outSocketStream;
}
class RemoteStreamException extends RuntimeException{
private static final long serialVersionUID = -222798966184550543L;
RemoteStreamException(String message){
super(message);
}
}