dataflow/dataflow.c

261 lines
6.4 KiB
C
Raw Normal View History

2024-12-09 17:14:57 +01:00
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include "bipbuffer.h"
#define NUM_SAMPLES (5)
typedef struct io_cb io_cb_t;
typedef struct {
io_cb_t *io;
int dimensions;
} arg_t;
typedef void (*data_handler_t)( arg_t *io );
struct io_cb {
void *data;
size_t size;
data_handler_t prepare_read;
data_handler_t commit_read;
data_handler_t prepare_write;
data_handler_t commit_write;
};
void dummy_handler( arg_t *io ) {
(void)io;
}
enum data_type_e {
UINT8 = 0,
INT16,
};
typedef struct {
io_cb_t cb;
int data_type;
FILE *fd;
} file_io_t;
enum arguments_e {
IN0 = 0,
IN1,
IN2,
IN3,
};
typedef struct {
void (*process)( arg_t args[] );
int input_argc;
int output_argc;
arg_t args[];
} node_t;
void file_prepare_read_handler( arg_t *arg ) {
file_io_t *io = (file_io_t*)arg->io;
char value[128] = {0};
char end;
int result;
FILE *fd = io->fd;
uint8_t *outu8 = io->cb.data;
int16_t *outi16 = io->cb.data;
do {
result = fscanf( fd, "%127[^,\n]", value );
if( result == 0 ) {
result = fscanf( fd, "%c", &end );
if( end == '\n' ) {
break;
}
} else {
int number = atoi(value);
switch( io->data_type ) {
case INT16: {
*outi16++ = number;
io->cb.size += 2;
break;
}
case UINT8:
default:
*outu8++ = number;
io->cb.size++;
break;
}
printf("%d\n", atoi(value));
}
} while(result != EOF);
}
void file_io_init( file_io_t *io, const char *filename ) {
FILE *fd = fopen(filename, "r");
io->fd = NULL;
if( fd == NULL ) {
perror("can't open file");
return;
}
io->fd = fd;
io_cb_t *cb = (io_cb_t*)io;
cb->prepare_read = file_prepare_read_handler;
cb->commit_read = dummy_handler;
cb->prepare_write = dummy_handler;
cb->commit_write = dummy_handler;
}
typedef struct {
io_cb_t cb;
bipbuf_t buf;
} buffer_io_t;
static void buffer_prepare_read_handler( arg_t *arg ) {
printf("%s\n", __func__);
buffer_io_t *io = (buffer_io_t*)arg->io;
io_cb_t *cb = &io->cb;
cb->data = bipbuf_peek( &io->buf, arg->dimensions );
cb->size = arg->dimensions;
}
static void buffer_commit_read_handler( arg_t *arg ) {
printf("%s\n", __func__);
buffer_io_t *io = (buffer_io_t*)arg->io;
io_cb_t *cb = &io->cb;
bipbuf_poll( &io->buf, arg->dimensions );
cb->data = NULL;
cb->size = 0;
}
static void buffer_prepare_write_handler( arg_t *arg ) {
printf("%s\n", __func__);
buffer_io_t *io = (buffer_io_t*)arg->io;
io_cb_t *cb = &io->cb;
cb->data = bipbuf_reserve( &io->buf, arg->dimensions );
cb->size = arg->dimensions;
}
static void buffer_commit_write_handler( arg_t *arg ) {
printf("%s\n", __func__);
buffer_io_t *io = (buffer_io_t*)arg->io;
io_cb_t *cb = &io->cb;
bipbuf_commit( &io->buf, arg->dimensions );
cb->data = NULL;
cb->size = 0;
}
void buffer_io_init( buffer_io_t *io, size_t size ) {
bipbuf_init( &io->buf, size );
io_cb_t *cb = &io->cb;
cb->prepare_read = buffer_prepare_read_handler;
cb->commit_read = buffer_commit_read_handler;
cb->prepare_write = buffer_prepare_write_handler;
cb->commit_write = buffer_commit_write_handler;
}
void simple( arg_t * __restrict args ) {
printf("%s\n", __func__ );
int16_t* __restrict in_t = args[0].io->data;
int16_t* __restrict out_t = args[1].io->data;
for( unsigned i=0; i<NUM_SAMPLES; i++ ) {
*out_t++ = 1 + *in_t++;
}
}
void process_node( node_t *node ) {
int active_inputs = 0;
int total_argc = node->input_argc + node->output_argc;
printf("node: %p\n", node );
for( int input = 0; input<node->input_argc; ++input ) {
arg_t *arg = &node->args[input];
io_cb_t *io = arg->io;
assert( io->prepare_read != NULL );
io->prepare_read( arg );
if( io->size >= arg->dimensions ) {
printf("node: %p input %d active\n", node, input );
++active_inputs;
}
}
if(( active_inputs == node->input_argc ) && ( node->process != NULL )) {
for( int output = node->input_argc; output<total_argc; ++output ) {
arg_t *arg = &node->args[output];
io_cb_t *io = arg->io;
assert( io->prepare_write != NULL );
io->prepare_write( arg );
}
node->process( node->args );
for( int output = node->input_argc; output<total_argc; ++output ) {
arg_t *arg = &node->args[output];
io_cb_t *io = arg->io;
assert( io->commit_write != NULL );
io->commit_write( arg );
}
for( int input = 0; input<node->input_argc; ++input ) {
arg_t *arg = &node->args[input];
io_cb_t *io = arg->io;
assert( io->commit_read != NULL );
io->commit_read( arg );
}
}
}
void process_graph( node_t *g[] ) {
node_t *last_node = NULL;
for( node_t **n = g; *n != NULL; ++n ) {
node_t *node = *n;
process_node( node );
}
}
static uint8_t fileBuf[128];
file_io_t inputFile = {
.cb.data = &fileBuf,
.data_type = INT16,
};
#define RING_BUFFER_SIZE 8
uint8_t ringA_storage[sizeof(buffer_io_t)+RING_BUFFER_SIZE];
buffer_io_t *ringA = (buffer_io_t*)ringA_storage;
uint8_t ringB_storage[sizeof(buffer_io_t)+RING_BUFFER_SIZE];
buffer_io_t *ringB = (buffer_io_t*)ringB_storage;
node_t adderA = {
.process = simple,
.input_argc = 1,
.output_argc = 1,
.args = {
{ (io_cb_t*)&inputFile, NUM_SAMPLES },
{ (io_cb_t*)ringA_storage, NUM_SAMPLES },
},
};
node_t adderB = {
.process = simple,
.input_argc = 1,
.output_argc = 1,
.args = {
{ (io_cb_t*)ringA_storage, NUM_SAMPLES },
{ (io_cb_t*)ringB_storage, NUM_SAMPLES },
}
};
node_t *graph[] = {
&adderA,
&adderB,
NULL,
};
int main( int argc, char *args[] ) {
file_io_init( &inputFile, "input.csv" );
buffer_io_init( ringA, RING_BUFFER_SIZE );
buffer_io_init( ringB, RING_BUFFER_SIZE );
process_graph( graph );
uint16_t* data = (uint16_t*)bipbuf_peek( &ringB->buf, NUM_SAMPLES );
for( int i=0; i<NUM_SAMPLES; ++i ) {
printf("%d\n", data[i]);
}
return EXIT_SUCCESS;
}