From a7fa14cde006a18cf99c9869b15be5d773cac4b8 Mon Sep 17 00:00:00 2001 From: Charles Lohr Date: Sat, 4 Feb 2017 20:01:18 -0500 Subject: add data server for use with streaming libsurvive data to the internets. --- tools/data_server/Makefile | 7 + tools/data_server/data_server.c | 169 ++++++++++++++++++++ tools/data_server/os_generic.c | 336 ++++++++++++++++++++++++++++++++++++++++ tools/data_server/os_generic.h | 76 +++++++++ 4 files changed, 588 insertions(+) create mode 100644 tools/data_server/Makefile create mode 100644 tools/data_server/data_server.c create mode 100644 tools/data_server/os_generic.c create mode 100644 tools/data_server/os_generic.h (limited to 'tools') diff --git a/tools/data_server/Makefile b/tools/data_server/Makefile new file mode 100644 index 0000000..d15f094 --- /dev/null +++ b/tools/data_server/Makefile @@ -0,0 +1,7 @@ +all : data_server + +data_server : data_server.c os_generic.c + gcc -o $@ $^ -I. -lpthread + +clean : + rm -rf data_server *.o *~ diff --git a/tools/data_server/data_server.c b/tools/data_server/data_server.c new file mode 100644 index 0000000..f602a63 --- /dev/null +++ b/tools/data_server/data_server.c @@ -0,0 +1,169 @@ +//Blatantly ripped off of https://www.cs.cmu.edu/afs/cs/academic/class/15213-f99/www/class26/tcpserver.c + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAX_CONNS 32 + +int SocketList[MAX_CONNS]; + +void error(char *msg) { + perror(msg); + exit(1); +} + +void * SendThread( void * v ) +{ + while(1) + { + int i; + char buff[1024]; + int rd = read( STDIN_FILENO, buff, sizeof( buff ) ); + if( rd <= 0 ) + { + fprintf( stderr, "Error: Can't read data\n" ); + exit( -1 ); + } + for( i = 0; i < MAX_CONNS; i++ ) + { + int sockc = SocketList[i]; + if( sockc == 0 ) continue; + int ss = send( sockc, buff, rd, MSG_DONTWAIT | MSG_NOSIGNAL ); + if( ss < rd ) + { + fprintf( stderr, "Dropped %d\n", i ); + close( SocketList[i] ); + SocketList[i] = 0; + } + } + } +} + +int main( int argc, char ** argv ) +{ + int parentfd; /* parent socket */ + int childfd; /* child socket */ + int portno; /* port to listen on */ + int clientlen; /* byte size of client's address */ + struct sockaddr_in serveraddr; /* server's addr */ + struct sockaddr_in clientaddr; /* client addr */ + struct hostent *hostp; /* client host info */ + char *hostaddrp; /* dotted decimal host addr string */ + int optval; /* flag value for setsockopt */ + int n; /* message byte size */ + + if (argc != 2) { + fprintf(stderr, "usage: %s \n", argv[0]); + exit(1); + } + portno = atoi(argv[1]); + + parentfd = socket(AF_INET, SOCK_STREAM, 0); + if (parentfd < 0) + error("ERROR opening socket"); + + + /* setsockopt: Handy debugging trick that lets + * us rerun the server immediately after we kill it; + * otherwise we have to wait about 20 secs. + * Eliminates "ERROR on binding: Address already in use" error. + */ + optval = 1; + setsockopt(parentfd, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int)); + + + /* + * build the server's Internet address + */ + bzero((char *) &serveraddr, sizeof(serveraddr)); + + /* this is an Internet address */ + serveraddr.sin_family = AF_INET; + + /* let the system figure out our IP address */ + serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); + + /* this is the port we will listen on */ + serveraddr.sin_port = htons((unsigned short)portno); + + /* + * bind: associate the parent socket with a port + */ + if (bind(parentfd, (struct sockaddr *) &serveraddr, + sizeof(serveraddr)) < 0) + error("ERROR on binding"); + + /* + * listen: make this socket ready to accept connection requests + */ + if (listen(parentfd, 5) < 0) /* allow 5 requests to queue up */ + error("ERROR on listen"); + + /* + * main loop: wait for a connection request, echo input line, + * then close connection. + */ + clientlen = sizeof(clientaddr); + + OGCreateThread( SendThread, 0 ); + while (1) { + /* + * accept: wait for a connection request + */ + childfd = accept(parentfd, (struct sockaddr *) &clientaddr, &clientlen); + if (childfd < 0) + error("ERROR on accept"); + +// hostp = gethostbyaddr((const char *)&clientaddr.sin_addr.s_addr, +// sizeof(clientaddr.sin_addr.s_addr), AF_INET); +// if (hostp == NULL) +// error("ERROR on gethostbyaddr"); + hostaddrp = inet_ntoa(clientaddr.sin_addr); + //if (hostaddrp == NULL) + //error("ERROR on inet_ntoa\n"); + + + int il; + for( il = 0; il < MAX_CONNS; il++ ) + { + if( SocketList[il] == 0 ) + { + SocketList[il] = childfd; + printf("Conn %s At %d\n", + hostaddrp, il); + break; + } + } + if( il == MAX_CONNS ) + { + close( childfd ); + } + + /* + * read: read input string from the client + */ +/* bzero(buf, BUFSIZE); + n = read(childfd, buf, BUFSIZE); + if (n < 0) + error("ERROR reading from socket"); + printf("server received %d bytes: %s", n, buf); +*/ + /* + * write: echo the input string back to the client + */ +/* n = write(childfd, buf, strlen(buf)); + if (n < 0) + error("ERROR writing to socket"); + + close(childfd);*/ + } +} + diff --git a/tools/data_server/os_generic.c b/tools/data_server/os_generic.c new file mode 100644 index 0000000..7b007e4 --- /dev/null +++ b/tools/data_server/os_generic.c @@ -0,0 +1,336 @@ +#include "os_generic.h" + + +#ifdef USE_WINDOWS + +#include + +void OGSleep( int is ) +{ + Sleep( is*1000 ); +} + +void OGUSleep( int ius ) +{ + Sleep( ius/1000 ); +} + +double OGGetAbsoluteTime() +{ + static LARGE_INTEGER lpf; + LARGE_INTEGER li; + + if( !lpf.QuadPart ) + { + QueryPerformanceFrequency( &lpf ); + } + + QueryPerformanceCounter( &li ); + return (double)li.QuadPart / (double)lpf.QuadPart; +} + + +double OGGetFileTime( const char * file ) +{ + FILETIME ft; + + HANDLE h = CreateFile(file, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, 0, NULL); + + if( h==INVALID_HANDLE_VALUE ) + return -1; + + GetFileTime( h, 0, 0, &ft ); + + CloseHandle( h ); + + return ft.dwHighDateTime + ft.dwLowDateTime; +} + + +og_thread_t OGCreateThread( void * (routine)( void * ), void * parameter ) +{ + return (og_thread_t)CreateThread( 0, 0, (LPTHREAD_START_ROUTINE)routine, parameter, 0, 0 ); +} + +void * OGJoinThread( og_thread_t ot ) +{ + WaitForSingleObject( ot, INFINITE ); + CloseHandle( ot ); +} + +void OGCancelThread( og_thread_t ot ) +{ + CloseHandle( ot ); +} + +og_mutex_t OGCreateMutex() +{ + return CreateMutex( 0, 0, 0 ); +} + +void OGLockMutex( og_mutex_t om ) +{ + WaitForSingleObject(om, INFINITE); +} + +void OGUnlockMutex( og_mutex_t om ) +{ + ReleaseMutex(om); +} + +void OGDeleteMutex( og_mutex_t om ) +{ + CloseHandle( om ); +} + + + +og_sema_t OGCreateSema() +{ + HANDLE sem = CreateSemaphore( 0, 0, 32767, 0 ); + return (og_sema_t)sem; +} + +int OGGetSema( og_sema_t os ) +{ + typedef LONG NTSTATUS; + HANDLE sem = (HANDLE)os; + typedef NTSTATUS (NTAPI *_NtQuerySemaphore)( + HANDLE SemaphoreHandle, + DWORD SemaphoreInformationClass, /* Would be SEMAPHORE_INFORMATION_CLASS */ + PVOID SemaphoreInformation, /* but this is to much to dump here */ + ULONG SemaphoreInformationLength, + PULONG ReturnLength OPTIONAL + ); + + typedef struct _SEMAPHORE_BASIC_INFORMATION { + ULONG CurrentCount; + ULONG MaximumCount; + } SEMAPHORE_BASIC_INFORMATION; + + + static _NtQuerySemaphore NtQuerySemaphore; + SEMAPHORE_BASIC_INFORMATION BasicInfo; + NTSTATUS Status; + + if( !NtQuerySemaphore ) + { + NtQuerySemaphore = (_NtQuerySemaphore)GetProcAddress (GetModuleHandle ("ntdll.dll"), "NtQuerySemaphore"); + if( !NtQuerySemaphore ) + { + return -1; + } + } + + + Status = NtQuerySemaphore (sem, 0 /*SemaphoreBasicInformation*/, + &BasicInfo, sizeof (SEMAPHORE_BASIC_INFORMATION), NULL); + + if (Status == ERROR_SUCCESS) + { + return BasicInfo.CurrentCount; + } + + return -2; +} + +void OGLockSema( og_sema_t os ) +{ + WaitForSingleObject( (HANDLE)os, INFINITE ); +} + +void OGUnlockSema( og_sema_t os ) +{ + ReleaseSemaphore( (HANDLE)os, 1, 0 ); +} + +void OGDeleteSema( og_sema_t os ) +{ + CloseHandle( os ); +} + +#else + +#define _GNU_SOURCE + + +#include +#include +#include +#include +#include +#include + +pthread_mutex_t g_RawMutexStart = PTHREAD_MUTEX_INITIALIZER; + +void OGSleep( int is ) +{ + sleep( is ); +} + +void OGUSleep( int ius ) +{ + usleep( ius ); +} + +double OGGetAbsoluteTime() +{ + struct timeval tv; + gettimeofday( &tv, 0 ); + return ((double)tv.tv_usec)/1000000. + (tv.tv_sec); +} + +double OGGetFileTime( const char * file ) +{ + struct stat buff; + + int r = stat( file, &buff ); + + if( r < 0 ) + { + return -1; + } + + return buff.st_mtime; +} + + + +og_thread_t OGCreateThread( void * (routine)( void * ), void * parameter ) +{ + pthread_t * ret = malloc( sizeof( pthread_t ) ); + int r = pthread_create( ret, 0, routine, parameter ); + if( r ) + { + free( ret ); + return 0; + } + return (og_thread_t)ret; +} + +void * OGJoinThread( og_thread_t ot ) +{ + void * retval; + if( !ot ) + { + return 0; + } + pthread_join( *(pthread_t*)ot, &retval ); + free( ot ); + return retval; +} + +void OGCancelThread( og_thread_t ot ) +{ + if( !ot ) + { + return; + } + pthread_cancel( *(pthread_t*)ot ); + free( ot ); +} + +og_mutex_t OGCreateMutex() +{ + pthread_mutexattr_t mta; + og_mutex_t r = malloc( sizeof( pthread_mutex_t ) ); + + pthread_mutexattr_init(&mta); + pthread_mutexattr_settype(&mta, PTHREAD_MUTEX_RECURSIVE); + + pthread_mutex_init( (pthread_mutex_t *)r, &mta ); + + return r; +} + +void OGLockMutex( og_mutex_t om ) +{ + if( !om ) + { + return; + } + pthread_mutex_lock( (pthread_mutex_t*)om ); +} + +void OGUnlockMutex( og_mutex_t om ) +{ + if( !om ) + { + return; + } + pthread_mutex_unlock( (pthread_mutex_t*)om ); +} + +void OGDeleteMutex( og_mutex_t om ) +{ + if( !om ) + { + return; + } + + pthread_mutex_destroy( (pthread_mutex_t*)om ); + free( om ); +} + + + + +og_sema_t OGCreateSema() +{ + sem_t * sem = malloc( sizeof( sem_t ) ); + sem_init( sem, 0, 0 ); + return (og_sema_t)sem; +} + +int OGGetSema( og_sema_t os ) +{ + int valp; + sem_getvalue( os, &valp ); + return valp; +} + + +void OGLockSema( og_sema_t os ) +{ + sem_wait( os ); +} + +void OGUnlockSema( og_sema_t os ) +{ + sem_post( os ); +} + +void OGDeleteSema( og_sema_t os ) +{ + sem_destroy( os ); + free(os); +} + + + +#endif + +//Date Stamp: 2012-02-15 + +/* + Copyright (c) 2011-2012 <>< Charles Lohr + + Permission is hereby granted, free of charge, to any person obtaining a + copy of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom the + Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of this file. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + diff --git a/tools/data_server/os_generic.h b/tools/data_server/os_generic.h new file mode 100644 index 0000000..7ce22f2 --- /dev/null +++ b/tools/data_server/os_generic.h @@ -0,0 +1,76 @@ +#ifndef _OS_GENERIC_H +#define _OS_GENERIC_H + +#ifdef WIN32 +#define USE_WINDOWS +#endif + + +#ifdef __cplusplus +extern "C" { +#endif + +//Things that shouldn't be macro'd +double OGGetAbsoluteTime(); +void OGSleep( int is ); +void OGUSleep( int ius ); +double OGGetFileTime( const char * file ); + +//Threads and Mutices +typedef void* og_thread_t; +typedef void* og_mutex_t; +typedef void* og_sema_t; + +og_thread_t OGCreateThread( void * (routine)( void * ), void * parameter ); +void * OGJoinThread( og_thread_t ot ); +void OGCancelThread( og_thread_t ot ); + +//Always a recrusive mutex. +og_mutex_t OGCreateMutex(); +void OGLockMutex( og_mutex_t om ); +void OGUnlockMutex( og_mutex_t om ); +void OGDeleteMutex( og_mutex_t om ); + +//Always a semaphore +og_sema_t OGCreateSema(); //Create a semaphore, comes locked initially. NOTE: Max count is 32767 +void OGLockSema( og_sema_t os ); +int OGGetSema( og_sema_t os ); //if <0 there was a failure. +void OGUnlockSema( og_sema_t os ); +void OGDeleteSema( og_sema_t os ); + +#ifdef __cplusplus +}; +#endif + + + +#endif + + +//Date Stamp: 2012-02-15 + +/* + NOTE: Portions (namely the top section) are part of headers from other + sources. + + Copyright (c) 2011-2012 <>< Charles Lohr + + Permission is hereby granted, free of charge, to any person obtaining a + copy of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom the + Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of this file. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + -- cgit v1.2.3