E07 Creating a stream reader

A stream reader loads data of a given format like a normal reader and converts its contents into the internal representation. In contrast to a normal reader, a stream reader allows to load partial streams. Imagine a simulation process that writes its results into a pipe. With a stream reader, impulse will show all signals that were written until a certain point of time. This article shows how to convert a normal reader into a stream reader. The article uses the ExampleStreamReader class in the Extension Toolkit (that is the extended version of the ExampleFloatReader class).

Is this a stream reader ?

To state that your reader is a stream reader, you need to override this method:

    @Override
    public boolean supportsStreaming() {
        return true;
    }

First step was easy, but unfortunately not enough:

Wait !

When the writer is used within a port, the port may want the writer to wait at the beginning (init phase). The information is given in progress structure:

    // wait after header parsing
    if (progress instanceof IPortProgress)
        synchronized (progress) {
            while (!((IPortProgress) progress).isStreaming() && !progress.isCanceled())
                try {
                    progress.wait(250);
                } catch (InterruptedException e) {
                }
        }

A stream writer needs to add this fragment before any code that is writing signal values into the writers. The record structure shall be defined before.

What has been changed ?

The stream reader will be continuously asked if there was a change in the record structure or the signals.

    public final static int CHANGED_NONE = 0;
    public final static int CHANGED_CURRENT = 1;
    public final static int CHANGED_SIGNALS = 2;
    public final static int CHANGED_RECORD = 3;
    
    @Override
    public int hasChanged() {
        return changed;
    }

The example uses the member changed. As soon as it extends the signal or record, it stores the current status into this member.

Flush !

When the environment found a change in the reader, it will ask the reader to flush its contents.

@Override
    synchronized public ICover flush() {
        changed = CHANGED_NONE;
        return super.doFlush(t);
    }

To inhibit a conflict between writing into the readers and flushing, the flush function as well as a part of the parse function (that is actually writing) are defined a synchronized.

    synchronized private void parse() throws ParseException {
        
        // write the sin and cos data
        ((IFloatSamplesWriter) getWriter(float1)).write(t, Math.sin((t + 100) / 3000.0) * 10.0);
        
        // mark what has changed
        changed = CHANGED_SIGNALS;
    }