E003 Creating a stream reader
A stream reader loads data of a given format like a normal reader and converts its contents into the internal representation. In contrast to a normal reader, a stream reader allows to load partial streams. Imagine a simulation process that writes its results into a pipe. With a stream reader, impulse will show all signals that were written until a certain point of time. This article shows how to convert a normal reader into a stream reader. The article uses the ExampleStreamReader class in the Extension Toolkit (that is the extended version of the ExampleFloatReader class).
Is this a stream reader ?
To state that your reader is a stream reader, you need to override this method:
@Override public boolean supportsStreaming() { return true; }
First step was easy, but unfortunately not enough:
Wait !
When the writer is used within a port, the port may want the writer to wait at the beginning (init phase). The information is given in progress structure:
// wait after header parsing if (progress instanceof IPortProgress) synchronized (progress) { while (!((IPortProgress) progress).isStreaming() && !progress.isCanceled()) try { progress.wait(250); } catch (InterruptedException e) { } }
A stream writer needs to add this fragment before any code that is writing signal values into the writers. The record structure shall be defined before.
What has been changed ?
The stream reader will be continuously asked if there was a change in the record structure or the signals.
public final static int CHANGED_NONE = 0; public final static int CHANGED_CURRENT = 1; public final static int CHANGED_SIGNALS = 2; public final static int CHANGED_RECORD = 3; @Override public int hasChanged() { return changed; }
The example uses the member changed. As soon as it extends the signal or record, it stores the current status into this member.
Flush !
When the environment found a change in the reader, it will ask the reader to flush its contents.
@Override synchronized public ICover flush() { changed = CHANGED_NONE; return super.doFlush(t); }
To inhibit a conflict between writing into the readers and flushing, the flush function as well as a part of the parse function (that is actually writing) are defined a synchronized.
synchronized private void parse() throws ParseException { // write the sin and cos data ((IFloatSamplesWriter) getWriter(float1)).write(t, Math.sin((t + 100) / 3000.0) * 10.0); // mark what has changed changed = CHANGED_SIGNALS; }