FileDataWarehouse.java
package eu.javaexperience.storage.warehouse.file;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import eu.javaexperience.binary.FramedPacketCutter;
import eu.javaexperience.binary.PacketFramingTools;
import eu.javaexperience.storage.warehouse.DataWarehouse;
import eu.javaexperience.storage.warehouse.DataWarehouseInput;
import eu.javaexperience.storage.warehouse.DataWarehouseOutput;
public class FileDataWarehouse implements DataWarehouse<byte[]>
{
protected final File file;
protected FileDataWarehouseOutput fout;
protected static final byte[] PACKET_END_SEQUENCE = new byte[] {(byte) 0x0, (byte)0xff};
protected class FileDataWarehouseOutput implements DataWarehouseOutput<byte[]>
{
protected FileOutputStream out;
protected FileDataWarehouseOutput(FileOutputStream out)
{
this.out = out;
}
@Override
public synchronized void close() throws IOException
{
out.close();
out = null;
}
@Override
public synchronized void write(byte[] elem) throws IOException
{
byte[] w = PacketFramingTools.optEscapeBytes(elem, (byte) 0x00, 0);
out.write(w);
out.write(PACKET_END_SEQUENCE);
out.flush();
}
}
//TODO add locks
public FileDataWarehouse(File f) throws IOException
{
this.file = f;
if(!f.exists())
{
f.createNewFile();
}
fout = new FileDataWarehouseOutput(new FileOutputStream(file, true));
}
@Override
public DataWarehouseOutput<byte[]> openOutput()
{
return new DataWarehouseOutput<byte[]>()
{
boolean opened = true;
protected void assertNotClosed()
{
if(!opened && null != fout.out)
{
throw new RuntimeException("This DataWarehouseOutput is already closed");
}
}
@Override
public synchronized void close() throws IOException
{
assertNotClosed();
opened = false;
}
@Override
public synchronized void write(byte[] elem) throws IOException
{
assertNotClosed();
fout.write(elem);
}
};
}
@Override
public DataWarehouseInput<byte[]> openInput() throws IOException
{
return new DataWarehouseInput<byte[]>()
{
int entry = 0;
FileInputStream fis = new FileInputStream(file);
boolean opened = true;
byte[] readBuff = new byte[4096];
protected Deque<byte[]> packets = new LinkedList<>();
protected FramedPacketCutter cutter = new FramedPacketCutter((byte) 0x0, p->packets.add(p));
protected void assertNotClosed()
{
if(!opened)
{
throw new RuntimeException("This DataWarehouseInput is already closed");
}
}
@Override
public void close() throws IOException
{
assertNotClosed();
fis.close();
}
@Override
public synchronized byte[] read() throws EOFException, IOException
{
assertNotClosed();
do
{
byte[] ret = packets.pollLast();
if(null != ret)
{
++entry;
return ret;
}
int read = 0;
if((read = fis.read(readBuff)) < 1)
{
//TODO or eof exception?
return null;
}
cutter.feedBytes(readBuff, read);
}
while(true);
}
@Override
public synchronized boolean isSeekSupported()
{
assertNotClosed();
return false;
}
@Override
public synchronized long getPosition()
{
assertNotClosed();
return entry;
}
};
}
@Override
public void close() throws IOException
{
fout.close();
}
}