From f311e96400b8740493e9bdafac84e8c1bde5aaa4 Mon Sep 17 00:00:00 2001 From: Dirk Helbig Date: Mon, 9 Dec 2024 17:14:57 +0100 Subject: [PATCH] initial commit --- Makefile | 11 +++ dataflow.c | 260 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 271 insertions(+) create mode 100644 Makefile create mode 100644 dataflow.c diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..fdb8f2b --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +CC=gcc +CFLAGS = -Oz -g3 -march=native -std=c99 +CFLAGS += -Ibipbuffer + +all: dataflow + +dataflow: bipbuffer/bipbuffer.o + +.PHONE: clean +clean: + rm -rf *.o dataflow diff --git a/dataflow.c b/dataflow.c new file mode 100644 index 0000000..6561813 --- /dev/null +++ b/dataflow.c @@ -0,0 +1,260 @@ +#include +#include +#include +#include + +#include "bipbuffer.h" + +#define NUM_SAMPLES (5) + +typedef struct io_cb io_cb_t; + +typedef struct { + io_cb_t *io; + int dimensions; +} arg_t; + +typedef void (*data_handler_t)( arg_t *io ); + +struct io_cb { + void *data; + size_t size; + data_handler_t prepare_read; + data_handler_t commit_read; + data_handler_t prepare_write; + data_handler_t commit_write; +}; + +void dummy_handler( arg_t *io ) { + (void)io; +} + +enum data_type_e { + UINT8 = 0, + INT16, +}; + +typedef struct { + io_cb_t cb; + int data_type; + FILE *fd; +} file_io_t; + +enum arguments_e { + IN0 = 0, + IN1, + IN2, + IN3, +}; + +typedef struct { + void (*process)( arg_t args[] ); + int input_argc; + int output_argc; + arg_t args[]; +} node_t; + +void file_prepare_read_handler( arg_t *arg ) { + file_io_t *io = (file_io_t*)arg->io; + char value[128] = {0}; + char end; + int result; + FILE *fd = io->fd; + uint8_t *outu8 = io->cb.data; + int16_t *outi16 = io->cb.data; + do { + result = fscanf( fd, "%127[^,\n]", value ); + if( result == 0 ) { + result = fscanf( fd, "%c", &end ); + if( end == '\n' ) { + break; + } + } else { + int number = atoi(value); + switch( io->data_type ) { + case INT16: { + *outi16++ = number; + io->cb.size += 2; + break; + } + case UINT8: + default: + *outu8++ = number; + io->cb.size++; + break; + } + printf("%d\n", atoi(value)); + } + } while(result != EOF); +} + +void file_io_init( file_io_t *io, const char *filename ) { + FILE *fd = fopen(filename, "r"); + io->fd = NULL; + if( fd == NULL ) { + perror("can't open file"); + return; + } + io->fd = fd; + io_cb_t *cb = (io_cb_t*)io; + cb->prepare_read = file_prepare_read_handler; + cb->commit_read = dummy_handler; + cb->prepare_write = dummy_handler; + cb->commit_write = dummy_handler; +} + +typedef struct { + io_cb_t cb; + bipbuf_t buf; +} buffer_io_t; + + +static void buffer_prepare_read_handler( arg_t *arg ) { + printf("%s\n", __func__); + buffer_io_t *io = (buffer_io_t*)arg->io; + io_cb_t *cb = &io->cb; + cb->data = bipbuf_peek( &io->buf, arg->dimensions ); + cb->size = arg->dimensions; +} + +static void buffer_commit_read_handler( arg_t *arg ) { + printf("%s\n", __func__); + buffer_io_t *io = (buffer_io_t*)arg->io; + io_cb_t *cb = &io->cb; + bipbuf_poll( &io->buf, arg->dimensions ); + cb->data = NULL; + cb->size = 0; +} + +static void buffer_prepare_write_handler( arg_t *arg ) { + printf("%s\n", __func__); + buffer_io_t *io = (buffer_io_t*)arg->io; + io_cb_t *cb = &io->cb; + cb->data = bipbuf_reserve( &io->buf, arg->dimensions ); + cb->size = arg->dimensions; +} + +static void buffer_commit_write_handler( arg_t *arg ) { + printf("%s\n", __func__); + buffer_io_t *io = (buffer_io_t*)arg->io; + io_cb_t *cb = &io->cb; + bipbuf_commit( &io->buf, arg->dimensions ); + cb->data = NULL; + cb->size = 0; +} + +void buffer_io_init( buffer_io_t *io, size_t size ) { + bipbuf_init( &io->buf, size ); + io_cb_t *cb = &io->cb; + cb->prepare_read = buffer_prepare_read_handler; + cb->commit_read = buffer_commit_read_handler; + cb->prepare_write = buffer_prepare_write_handler; + cb->commit_write = buffer_commit_write_handler; +} + + +void simple( arg_t * __restrict args ) { + printf("%s\n", __func__ ); + int16_t* __restrict in_t = args[0].io->data; + int16_t* __restrict out_t = args[1].io->data; + for( unsigned i=0; iinput_argc + node->output_argc; + printf("node: %p\n", node ); + for( int input = 0; inputinput_argc; ++input ) { + arg_t *arg = &node->args[input]; + io_cb_t *io = arg->io; + assert( io->prepare_read != NULL ); + io->prepare_read( arg ); + + if( io->size >= arg->dimensions ) { + printf("node: %p input %d active\n", node, input ); + ++active_inputs; + } + } + if(( active_inputs == node->input_argc ) && ( node->process != NULL )) { + for( int output = node->input_argc; outputargs[output]; + io_cb_t *io = arg->io; + assert( io->prepare_write != NULL ); + io->prepare_write( arg ); + } + node->process( node->args ); + for( int output = node->input_argc; outputargs[output]; + io_cb_t *io = arg->io; + assert( io->commit_write != NULL ); + io->commit_write( arg ); + } + for( int input = 0; inputinput_argc; ++input ) { + arg_t *arg = &node->args[input]; + io_cb_t *io = arg->io; + assert( io->commit_read != NULL ); + io->commit_read( arg ); + } + } +} + +void process_graph( node_t *g[] ) { + node_t *last_node = NULL; + for( node_t **n = g; *n != NULL; ++n ) { + node_t *node = *n; + process_node( node ); + } +} + +static uint8_t fileBuf[128]; +file_io_t inputFile = { + .cb.data = &fileBuf, + .data_type = INT16, +}; + +#define RING_BUFFER_SIZE 8 +uint8_t ringA_storage[sizeof(buffer_io_t)+RING_BUFFER_SIZE]; +buffer_io_t *ringA = (buffer_io_t*)ringA_storage; + +uint8_t ringB_storage[sizeof(buffer_io_t)+RING_BUFFER_SIZE]; +buffer_io_t *ringB = (buffer_io_t*)ringB_storage; + +node_t adderA = { + .process = simple, + .input_argc = 1, + .output_argc = 1, + .args = { + { (io_cb_t*)&inputFile, NUM_SAMPLES }, + { (io_cb_t*)ringA_storage, NUM_SAMPLES }, + }, +}; + +node_t adderB = { + .process = simple, + .input_argc = 1, + .output_argc = 1, + .args = { + { (io_cb_t*)ringA_storage, NUM_SAMPLES }, + { (io_cb_t*)ringB_storage, NUM_SAMPLES }, + } +}; + +node_t *graph[] = { + &adderA, + &adderB, + NULL, +}; + +int main( int argc, char *args[] ) { + file_io_init( &inputFile, "input.csv" ); + buffer_io_init( ringA, RING_BUFFER_SIZE ); + buffer_io_init( ringB, RING_BUFFER_SIZE ); + process_graph( graph ); + uint16_t* data = (uint16_t*)bipbuf_peek( &ringB->buf, NUM_SAMPLES ); + for( int i=0; i