From f21352094c0949b643721ee5387fefae0cdab507 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 30 Oct 2016 11:31:56 +0100 Subject: Move hexparse to utils and add default PRBS poly --- src/ConfigParser.cpp | 18 ------------------ 1 file changed, 18 deletions(-) (limited to 'src/ConfigParser.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 8e5fed1..e68f98f 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -112,24 +112,6 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, std::shared_ptr ensemble, const string& subchanuid); -/* a helper class to parse hexadecimal ids */ -static int hexparse(std::string input) -{ - int value; - if (input.find("0x") == 0) { - value = strtoll(input.c_str() + 2, nullptr, 16); - } - else { - value = strtoll(input.c_str(), nullptr, 10); - } - - if (errno == ERANGE) { - throw runtime_error("hex conversion: value out of range"); - } - - return value; -} - static uint16_t get_announcement_flag_from_ptree( boost::property_tree::ptree& pt ) -- cgit v1.2.3 From 5aa8118d99b104402bf044e57c473a7156dc2314 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 30 Oct 2016 12:08:29 +0100 Subject: Remove bridge format and input --- configure.ac | 8 +- src/ConfigParser.cpp | 11 - src/DabMux.cpp | 1 - src/Makefile.am | 2 - src/bridge.c | 515 ---------------------------------------------- src/bridge.h | 112 ---------- src/dabInputBridgeUdp.cpp | 129 ------------ src/dabInputBridgeUdp.h | 49 ----- src/utils.cpp | 3 - 9 files changed, 1 insertion(+), 829 deletions(-) delete mode 100644 src/bridge.c delete mode 100644 src/bridge.h delete mode 100644 src/dabInputBridgeUdp.cpp delete mode 100644 src/dabInputBridgeUdp.h (limited to 'src/ConfigParser.cpp') diff --git a/configure.ac b/configure.ac index 39d3f19..06a2249 100644 --- a/configure.ac +++ b/configure.ac @@ -198,12 +198,6 @@ AC_ARG_ENABLE([format_raw], [], [enable_format_raw=yes]) AS_IF([test "x$enable_format_raw" = "xyes"], [AC_DEFINE(HAVE_FORMAT_RAW, [1], [Define if RAW format is enabled])]) -# BRIDGE -AC_ARG_ENABLE([format_bridge], - [AS_HELP_STRING([--enable-format-bridge], [Enable BRIDGE format])], - [], [enable_format_bridge=no]) -AS_IF([test "x$enable_format_bridge" = "xno"], - [AC_DEFINE(HAVE_FORMAT_BRIDGE, [1], [Define if BRIDGE format is enabled])]) # MPEG AC_ARG_ENABLE([format_mpeg], [AS_HELP_STRING([--disable-format-mpeg], [Disable MPEG format])], @@ -266,7 +260,7 @@ echo echo "Formats:" enabled="" disabled="" -for format in raw bridge mpeg packet dabplus dmb epm +for format in raw mpeg packet dabplus dmb epm do eval var=\$enable_format_$format AS_IF([test "x$var" = "xyes"], diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index e68f98f..aee86fc 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -60,7 +60,6 @@ #include "dabInputEnhancedPacketFile.h" #include "dabInputEnhancedFifo.h" #include "dabInputUdp.h" -#include "dabInputBridgeUdp.h" #include "dabInputTest.h" #include "dabInputPrbs.h" #include "dabInputRawFile.h" @@ -714,16 +713,6 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, throw runtime_error(ss.str()); } #endif // defined(HAVE_FORMAT_DABPLUS) - } else if (type == "bridge") { - // TODO default proto should be udp:// - if (0) { -#if defined(HAVE_FORMAT_BRIDGE) -#if defined(HAVE_INPUT_UDP) - } else if (proto == "udp") { - operations = dabInputBridgeUdpOperations; -#endif // defined(HAVE_INPUT_UDP) -#endif // defined(HAVE_FORMAT_BRIDGE) - } } else if (type == "data" and proto == "prbs") { input_is_old_style = false; diff --git a/src/DabMux.cpp b/src/DabMux.cpp index f72ea8d..a4605e7 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -104,7 +104,6 @@ typedef DWORD32 uint32_t; #include "dabInputEnhancedPacketFile.h" #include "dabInputEnhancedFifo.h" #include "dabInputUdp.h" -#include "dabInputBridgeUdp.h" #include "dabInputTest.h" #include "dabInputPrbs.h" #include "dabInputRawFile.h" diff --git a/src/Makefile.am b/src/Makefile.am index dfcdb12..b23e71c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -48,7 +48,6 @@ odr_dabmux_LDADD =$(ZMQ_LIBS) $(CURL_LIBS) \ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ DabMultiplexer.cpp DabMultiplexer.h \ dabInput.h dabInput.cpp \ - dabInputBridgeUdp.h dabInputBridgeUdp.cpp \ dabInputDabplusFifo.h dabInputDabplusFifo.cpp \ dabInputDabplusFile.h dabInputDabplusFile.cpp \ dabInputDmbFile.h dabInputDmbFile.cpp \ @@ -93,7 +92,6 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ TcpSocket.h TcpSocket.cpp \ UdpSocket.h UdpSocket.cpp \ ThreadsafeQueue.h \ - bridge.h bridge.c \ crc.h crc.c \ fig/FIG.h fig/FIG.cpp \ fig/FIG0.h fig/FIG0structs.h \ diff --git a/src/bridge.c b/src/bridge.c deleted file mode 100644 index d66a7b2..0000000 --- a/src/bridge.c +++ /dev/null @@ -1,515 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include -#ifdef _WIN32 -# include -#else -# include -#endif // _WIN32 -#include -#include "bridge.h" -#include "crc.h" - -#include -#include "PcDebug.h" - -#ifdef _WIN32 -# ifdef _DEBUG - int bridgeVerbosity = 0; -# endif -#else -# ifdef DEBUG - int bridgeVerbosity = 0; -# endif -#endif - -void printStats(struct bridgeInfo* info, FILE* out) -{ - struct bridgeStats stats = getStats(info); - fprintf(out, "frames : %lu\n", stats.frames); - fprintf(out, " valids : %lu\n", stats.valids); - fprintf(out, " invalids : %lu\n", stats.invalids); - fprintf(out, " bytes : %lu\n", stats.bytes); - fprintf(out, " packets : %lu\n", stats.packets); - fprintf(out, " errors : %lu\n", stats.errors); - fprintf(out, " missings : %lu\n", stats.missings); - fprintf(out, " dropped : %lu\n", stats.dropped); - fprintf(out, " crc : %lu\n", stats.crc); - fprintf(out, " overruns : %lu\n", stats.overruns); -} - - -void resetStats(struct bridgeInfo* info) -{ - memset(&info->stats, 0, sizeof(info->stats)); -} - - -struct bridgeStats getStats(struct bridgeInfo* info) -{ - return info->stats; -} - - -void bridgeInitInfo(struct bridgeInfo* info) -{ - memset(info, 0, sizeof(*info)); - info->transmitted = -8; -}; - - -int writePacket(void* dataIn, int sizeIn, void* dataOut, int sizeOut, - struct bridgeInfo* info) -{ - static struct bridgeHdr header = { 0 }; - - PDEBUG4_VERBOSE(1, bridgeVerbosity, "writePacket\n sizeIn: %i, sizeOut: %i, " - "offset: %i, transmitted: %i\n", - sizeIn, sizeOut, info->offset, info->transmitted); - - assert(info->transmitted < sizeIn); - - if ((info->offset == 0) && (sizeIn > 0)) { - ((unsigned short*)dataOut)[0] = 0xb486; - info->offset = 2; - } - if (sizeIn == 0) { - memset((unsigned char*)dataOut + info->offset, 0, sizeOut - info->offset); - info->offset = 0; - info->transmitted = -8; - PDEBUG1_VERBOSE(1, bridgeVerbosity, " return %i (sizeIn == 0)\n", - sizeOut); - return 0; - } - - while (info->offset < sizeOut) { - switch (info->transmitted) { - case (-8): - ((unsigned char*)dataOut)[info->offset++] = 0xcb; - ++info->transmitted; - break; - case (-7): - ((unsigned char*)dataOut)[info->offset++] = 0x28; - ++info->transmitted; - break; - case (-6): - header.size = htons((unsigned short)sizeIn); - header.crc = htons((unsigned short)(crc16(0xffff, &header, 4) ^ 0xffff)); - ((unsigned char*)dataOut)[info->offset++] = ((char*)&header)[0]; - ++info->transmitted; - break; - case (-5): - ((unsigned char*)dataOut)[info->offset++] = ((char*)&header)[1]; - ++info->transmitted; - break; - case (-4): - ((unsigned char*)dataOut)[info->offset++] = ((char*)&header)[2]; - ++info->transmitted; - break; - case (-3): - ((unsigned char*)dataOut)[info->offset++] = ((char*)&header)[3]; - ++info->transmitted; - break; - case (-2): - ((unsigned char*)dataOut)[info->offset++] = ((char*)&header)[4]; - ++info->transmitted; - break; - case (-1): - ((unsigned char*)dataOut)[info->offset++] = ((char*)&header)[5]; - ++info->transmitted; - header.seqNb = htons((unsigned short)(ntohs(header.seqNb) + 1)); - break; - default: - ((unsigned char*)dataOut)[info->offset++] = - ((unsigned char*)dataIn)[info->transmitted++]; - if (info->transmitted == sizeIn) { - PDEBUG2_VERBOSE(1, bridgeVerbosity, - " Packet done, %i bytes at offset %i\n", - info->transmitted, info->offset); - PDEBUG1_VERBOSE(1, bridgeVerbosity, - " return %i (sizeIn == transmitted)\n", info->offset); - info->transmitted = -8; - return info->offset; - } - } - } - - PDEBUG1_VERBOSE(1, bridgeVerbosity, " return %i (offset >= sizeOut)\n", - info->offset); - info->offset = 0; - return 0; -} - - -int getPacket(void* dataIn, int sizeIn, void* dataOut, int sizeOut, - struct bridgeInfo* info, char async) -{ - unsigned char* in = (unsigned char*)dataIn; - unsigned char* out = (unsigned char*)dataOut; - unsigned char ch; - unsigned short crc; - unsigned short diff; - - PDEBUG3_VERBOSE(1, bridgeVerbosity, - "getPacket\n pos\t%i\n state\t%i\n received\t%i\n", - info->pos, info->state, info->received); - - if (info->pos == 0) { - ++info->stats.frames; - if (((unsigned short*)dataIn)[0] != 0xb486) { - if (((unsigned short*)dataIn)[0] != 0) { - ++info->stats.invalids; - printf("WARNING: processing frame with invalid magic " - "number!\n"); - } else { - PDEBUG0_VERBOSE(1, bridgeVerbosity, - "getPacket: not a valid frame\n"); - return 0; - } - } else { - PDEBUG0_VERBOSE(2, bridgeVerbosity, "Valid frame\n"); - info->pos += 2; - ++info->stats.valids; - } - info->stats.bytes += sizeIn; - } - while (info->pos < sizeIn) { - ch = in[info->pos++]; - switch (info->state) { - case 0: // sync search - info->sync <<= 8; - info->sync |= ch; - if (info->sync == 0xcb28) { - PDEBUG0_VERBOSE(2, bridgeVerbosity, "Sync found\n"); - ++info->stats.packets; - info->received = 0; - info->state = 1; - } - if (info->sync == 0) { // Padding - info->pos = 0; - return 0; - } - break; - case 1: // header search - ((char*)&info->header)[info->received++] = ch; - if (info->received == sizeof(struct bridgeHdr)) { - PDEBUG0_VERBOSE(2, bridgeVerbosity, "Header found\n"); - out = (unsigned char*)dataOut; - info->received = 0; - info->state = 2; - crc = crc16(0xffff, &info->header, 4); - crc ^= 0xffff; - info->header.size = ntohs(info->header.size); - info->header.seqNb = ntohs(info->header.seqNb); - info->header.crc = ntohs(info->header.crc); - PDEBUG4_VERBOSE(2, bridgeVerbosity, - " size\t%i\n seq\t%i\n crc\t0x%.4x (0x%.4x)\n", - info->header.size, info->header.seqNb, - info->header.crc, crc); - if (crc != info->header.crc) { - PDEBUG0_VERBOSE(2, bridgeVerbosity, "CRC error\n"); - ++info->stats.errors; - ++info->stats.crc; - info->state = 0; - if (info->pos < sizeof(struct bridgeHdr) + 2 + 2) { - info->pos = 2; - } - } else { - if (!info->initSeq) { - info->lastSeq = info->header.seqNb; - info->initSeq = 1; - } else { - if (info->header.seqNb > info->lastSeq) { - diff = (info->header.seqNb - info->lastSeq) - 1; - } else { - diff = ((short)info->lastSeq - - (short)info->header.seqNb) - 1; - } - info->stats.errors += diff; - info->stats.missings += diff; - info->lastSeq = info->header.seqNb; - } - } - } - break; - case 2: // data - out[info->received++] = ch; - if (info->received == info->header.size) { - PDEBUG0_VERBOSE(2, bridgeVerbosity, "data found\n"); - info->state = 0; - return info->received; - } - if (info->received == sizeOut) { - PDEBUG1_VERBOSE(1, bridgeVerbosity, "To much data: %i\n", - info->received); - ++info->stats.errors; - ++info->stats.overruns; - info->sync = 0; - info->state = 0; - return -1; - } - break; - case 3: // Padding or sync - if (ch == 0) { // Padding - info->pos = 0; - return 0; - } - if (ch != 0xcb) { // error - info->sync = ch; - info->state = 0; - } else { - info->state = 4; - } - break; - case 4: // Low byte sync - if (ch != 28) { // error - info->sync <<= 8; - info->sync |= ch; - info->state = 0; - } else { - info->state = 2; - } - break; - } - } - info->pos = 0; - return 0; -} - - -void dump(void* data, int size, FILE* stream) -{ - int i; - fprintf(stream, "%i bytes\n", size); - for (i = 0; i < size; ++i) { - fprintf(stream, " 0x%.2x", ((unsigned char*)data)[i]); - if (i % 8 == 7) - fprintf(stream, "\n"); - } - fprintf(stream, "\n"); -} - - -#ifdef BRIDGE_TEST -#include - - -int test(const unsigned char* data) -{ - unsigned char bridgeSize = data[0]; - unsigned char nbInput = data[1]; - unsigned char nbBridge = 1; - struct bridgeInfo info; - - int i, j; - int index = 0; - int max = 0; - int nbBytes; - - unsigned char** inputData; - unsigned char** bridgeData; - unsigned char* outputData; - - inputData = malloc(nbInput * 4); - bridgeData = malloc(nbBridge * 4); - for (i = 0; i < nbInput; ++i) { - if (data[i + 2] > 0) - inputData[i] = malloc(data[i + 2]); - if (data[i + 2] > max) { - max = data[i + 2]; - } - for (j = 0; j < data[i + 2]; ++j) { - inputData[i][j] = index++; - } - } - bridgeData[0] = malloc(bridgeSize); - memset(bridgeData[0], 0, bridgeSize); - outputData = malloc(max); - bridgeInitInfo(&info); - - // Write packets - index = 0; - while (1) { - if (data[index + 2] == 0) { - if (++index == nbInput) - break; - } - while ((nbBytes = writePacket(inputData[index], data[index + 2], - bridgeData[nbBridge - 1], bridgeSize, &info)) - != 0) { - if (++index == nbInput) { - break; - } - } - if (index == nbInput) - break; - // TODO check null - bridgeData = realloc(bridgeData, (++nbBridge) * 4); - bridgeData[nbBridge - 1] = malloc(bridgeSize); - memset(bridgeData[nbBridge - 1], 0, bridgeSize); - } -// if (nbBytes != bridgeSize) { - writePacket(NULL, 0, bridgeData[nbBridge - 1], bridgeSize, &info); -// } - - // read packets - index = 0; - for (i = 0; i < nbBridge; ++i) { - while ((nbBytes = getPacket(bridgeData[i], bridgeSize, outputData, max, - &info, 0)) != 0) { - while (data[index + 2] == 0) { - ++index; - } - if (nbBytes != data[index + 2]) { - printf("FAILED\n"); - printf("Invalid size at bridge %i, data %i: %i != %i\n", - i, index, nbBytes, data[index + 2]); - for (i = 0; i < nbInput; ++i) { - printf("Input %i: ", i); - dump(inputData[i], data[i + 2], stdout); - } - for (i = 0; i < nbBridge; ++i) { - printf("Bridge %i: ", i); - dump(bridgeData[i], bridgeSize, stdout); - } - printf("Output %i: ", index); - dump(outputData, nbBytes, stdout); - return -1; - } - if (memcmp(outputData, inputData[index], data[index + 2]) != 0) { - printf("FAILED\n"); - printf("output != input\n"); - for (i = 0; i < nbInput; ++i) { - printf("Input %i: ", i); - dump(inputData[i], data[i + 2], stdout); - } - for (i = 0; i < nbBridge; ++i) { - printf("Bridge %i: ", i); - dump(bridgeData[i], bridgeSize, stdout); - } - printf("Output %i: ", index); - dump(outputData, nbBytes, stdout); - } - ++index; - } - } - - printf("SUCCESS\n"); - - for (i = 0; i < nbInput; ++i) { - if (data[i + 2] > 0) - free(inputData[i]); - } - free(inputData); - free(outputData); - for (i = 0; i < nbBridge; ++i) { - free(bridgeData[i]); - } - free(bridgeData); - - return -1; -} - - -int main(int argc, char* argv[]) -{ - int i; - // test: bridgesize, nbinput [, input1, input2, ... ] - const unsigned char complete[] = { 32, 1, 16 }; - const unsigned char split[] = { 32, 1, 48 }; - const unsigned char twice[] = {32, 2, 8, 4 }; - const unsigned char secondSplit[] = { 32, 2, 16, 16 }; - const unsigned char headerSplit[][4] = { - { 32, 2, 23, 16 }, - { 32, 2, 22, 16 }, - { 32, 2, 21, 16 }, - { 32, 2, 20, 16 }, - { 32, 2, 19, 16 }, - { 32, 2, 18, 16 }, - { 32, 2, 17, 16 } - }; - const unsigned char two[] = { 32, 3, 16, 0, 16 }; - const unsigned char doubleSplit[] = { 32, 2, 32, 32 }; - const unsigned char full[] = { 32, 2, 24, 12 }; - const unsigned char empty[] = { 32, 3, 0, 0, 5 }; - -#ifdef _WIN32 - #ifdef _DEBUG - bridgeVerbosity = argc - 1; - #endif // DEBUG -#else - #ifdef DEBUG - bridgeVerbosity = argc - 1; - #endif // DEBUG -#endif // _WIN32 - - printf("Complete: "); - test(complete); - // printStats(stdout); - fflush(stdout); - - printf("split: "); - test(split); - // printStats(stdout); - fflush(stdout); - - printf("twice: "); - test(twice); - // printStats(stdout); - fflush(stdout); - - printf("second split: "); - test(secondSplit); - // printStats(stdout); - fflush(stdout); - - for (i = 0; i < sizeof(headerSplit) / sizeof(headerSplit[0]); ++i) { - printf("headerSplit%i: ", i); - test(headerSplit[i]); - // printStats(stdout); - fflush(stdout); - } - - printf("two: "); - test(two); - // printStats(stdout); - fflush(stdout); - - printf("doubleSplit: "); - test(doubleSplit); - // printStats(stdout); - fflush(stdout); - - printf("full: "); - test(full); - // printStats(stdout); - fflush(stdout); - - printf("empty: "); - test(empty); - // printStats(stdout); - fflush(stdout); - - return 0; -} - -#endif // BRIDGE_TEST diff --git a/src/bridge.h b/src/bridge.h deleted file mode 100644 index 0bae007..0000000 --- a/src/bridge.h +++ /dev/null @@ -1,112 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#ifndef _BRIDGE -#define _BRIDGE - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include - - -#ifdef __cplusplus -extern "C" { -#endif - -#ifdef _WIN32 -# ifdef _DEBUG - extern int bridgeVerbosity; -# endif // _DEBUG -#else -# ifndef DEBUG -# ifndef NDEBUG -# define NDEBUG -# endif -# else - extern int bridgeVerbosity; -# endif // DEBUG -#endif // _WIN32 - -struct bridgeStats { - unsigned long frames; // Number of frames analyzed - unsigned long valids; // Nb of frames with a good magic number - unsigned long invalids; // Nb of frames with a good magic number - unsigned long bytes; // Nb of data bytes - unsigned long packets; // Nb of packets found - unsigned long errors; - unsigned long missings; - unsigned long dropped; - unsigned long crc; // Nb of crc errors - unsigned long overruns; // Nb of packet too big -}; - - -struct bridgeHdr { - unsigned short size; - unsigned short seqNb; - unsigned short crc; -}; - - -struct bridgeInfo { - // Tx - int transmitted; // Nb bytes written - int offset; // Offset of the next byte to write - // Rx - int received; - int pos; - int state; - unsigned short lastSeq; - unsigned short sync; - char initSeq; - // General - struct bridgeHdr header; - struct bridgeStats stats; -}; - - - -void dump(void* data, int size, FILE* stream); - -/* - * Example of usae: - * if (data.length == 0) - * read(data) - * while (writePacket() != 0) - * read(read) - * ... - */ -int writePacket(void* dataIn, int sizeIn, void* dataOut, int sizeOut, struct bridgeInfo* info); - -int getPacket(void* dataIn, int sizeIn, void* dataOut, int sizeOut, struct bridgeInfo* info, char async); - -void bridgeInitInfo(struct bridgeInfo* info); -struct bridgeStats getStats(struct bridgeInfo* info); -void resetStats(struct bridgeInfo* info); -void printStats(struct bridgeInfo* info, FILE* out); - -#ifdef __cplusplus -} -#endif - -#endif // _BRIDGE diff --git a/src/dabInputBridgeUdp.cpp b/src/dabInputBridgeUdp.cpp deleted file mode 100644 index fdf3d1f..0000000 --- a/src/dabInputBridgeUdp.cpp +++ /dev/null @@ -1,129 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "dabInputBridgeUdp.h" -#include "dabInputUdp.h" -#include "bridge.h" - -#ifdef HAVE_FORMAT_BRIDGE -# ifdef HAVE_INPUT_UDP - -struct dabInputBridgeUdpData { - dabInputUdpData* udpData; - bridgeInfo* info; -}; - - -struct dabInputOperations dabInputBridgeUdpOperations = { - dabInputBridgeUdpInit, - dabInputBridgeUdpOpen, - dabInputSetbuf, - NULL, - NULL, - NULL, - dabInputBridgeUdpRead, - dabInputSetbitrate, - dabInputBridgeUdpClose, - dabInputBridgeUdpClean, - NULL -}; - - -int dabInputBridgeUdpInit(void** args) -{ - dabInputBridgeUdpData* input = new dabInputBridgeUdpData; - dabInputUdpInit((void**)&input->udpData); - input->info = new bridgeInfo; - bridgeInitInfo(input->info); - *args = input; - - return 0; -} - - -int dabInputBridgeUdpOpen(void* args, const char* inputName) -{ - dabInputBridgeUdpData* input = (dabInputBridgeUdpData*)args; - - return dabInputUdpOpen(input->udpData, inputName); -} - - -int dabInputBridgeUdpRead(dabInputOperations* ops, void* args, void* buffer, int size) -{ - int nbBytes = 0; - dabInputBridgeUdpData* input = (dabInputBridgeUdpData*)args; - dabInputFifoStats* stats = (dabInputFifoStats*)&input->udpData->stats; - - stats->frameRecords[stats->frameCount].curSize = 0; - stats->frameRecords[stats->frameCount].maxSize = size; - - if (input->udpData->packet->getSize() == 0) { - input->udpData->socket->receive(*input->udpData->packet); - } - while ((nbBytes = writePacket(input->udpData->packet->getData(), - input->udpData->packet->getSize(), buffer, size, - input->info)) - != 0) { - stats->frameRecords[stats->frameCount].curSize = nbBytes; - input->udpData->socket->receive(*input->udpData->packet); - } - - if (input->udpData->packet->getSize() != 0) { - stats->frameRecords[stats->frameCount].curSize = size; - } - - if (++stats->frameCount == NB_RECORDS) { - etiLog.log(info, "Data subchannel usage: (%i)", - stats->id); - for (int i = 0; i < stats->frameCount; ++i) { - etiLog.log(info, " %i/%i", - stats->frameRecords[i].curSize, - stats->frameRecords[i].maxSize); - } - etiLog.log(info, "\n"); - stats->frameCount = 0; - } - return size; -} - - -int dabInputBridgeUdpClose(void* args) -{ - dabInputBridgeUdpData* input = (dabInputBridgeUdpData*)args; - - return dabInputUdpClose(input->udpData); -} - - -int dabInputBridgeUdpClean(void** args) -{ - dabInputBridgeUdpData* input = (dabInputBridgeUdpData*)(*args); - dabInputUdpClean((void**)&input->udpData); - delete input->info; - delete input; - return 0; -} - - -# endif // HAVE_INPUT_UDP -#endif // HAVE_FORMAT_BRIDGE - diff --git a/src/dabInputBridgeUdp.h b/src/dabInputBridgeUdp.h deleted file mode 100644 index ed00952..0000000 --- a/src/dabInputBridgeUdp.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#ifndef DAB_INPUT_BRIDGE_UDP_H -#define DAB_INPUT_BRIDGE_UDP_H - - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif -#include "dabInput.h" - - -#ifdef HAVE_FORMAT_BRIDGE -# ifdef HAVE_INPUT_UDP - - -extern struct dabInputOperations dabInputBridgeUdpOperations; - -int dabInputBridgeUdpInit(void** args); -int dabInputBridgeUdpOpen(void* args, const char* inputName); -int dabInputBridgeUdpRead(dabInputOperations* ops, void* args, void* buffer, int size); -int dabInputBridgeUdpClose(void* args); -int dabInputBridgeUdpClean(void** args); - - -# endif -#endif - - -#endif // DAB_INPUT_BRIDGE_UDP_H diff --git a/src/utils.cpp b/src/utils.cpp index f0df772..5e7cf7a 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -124,9 +124,6 @@ void header_message() #if defined(HAVE_FORMAT_RAW) " raw" << #endif -#if defined(HAVE_FORMAT_BRIDGE) - " bridge" << -#endif #if defined(HAVE_FORMAT_MPEG) " mpeg" << #endif -- cgit v1.2.3 From 7ab5c97051108d9d752896798efe0886573e730e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 30 Oct 2016 12:18:30 +0100 Subject: Remove dabInputTest --- configure.ac | 14 ++----- src/ConfigParser.cpp | 7 ---- src/DabMux.cpp | 1 - src/Makefile.am | 1 - src/dabInputTest.cpp | 105 --------------------------------------------------- src/dabInputTest.h | 50 ------------------------ src/utils.cpp | 3 -- 7 files changed, 4 insertions(+), 177 deletions(-) delete mode 100644 src/dabInputTest.cpp delete mode 100644 src/dabInputTest.h (limited to 'src/ConfigParser.cpp') diff --git a/configure.ac b/configure.ac index 06a2249..47523de 100644 --- a/configure.ac +++ b/configure.ac @@ -115,12 +115,6 @@ AC_ARG_ENABLE([input_file], AS_IF([test "x$enable_input_file" = "xyes"], [AC_DEFINE(HAVE_INPUT_FILE, [1], [Define if FILE input is enabled])]) -# TEST -AC_ARG_ENABLE([input_test], - [AS_HELP_STRING([--enable-input-test], [Enable TEST input])], - [], [enable_input_test=no]) -AS_IF([test "x$enable_input_test" = "xyes"], - [AC_DEFINE(HAVE_INPUT_TEST, [1], [Define if TEST input is enabled])]) # UDP AC_ARG_ENABLE([input_udp], [AS_HELP_STRING([--enable-input-udp], [Enable UDP input])], @@ -246,12 +240,12 @@ echo echo "Inputs:" enabled="prbs" disabled="" -for output in test udp fifo file +for input in udp fifo file do - eval var=\$enable_input_$output + eval var=\$enable_input_$input AS_IF([test "x$var" = "xyes"], - [enabled="$enabled $output"], - [disabled="$disabled $output"]) + [enabled="$enabled $input"], + [disabled="$disabled $input"]) done echo " Enabled: $enabled" echo " Disabled: $disabled" diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index aee86fc..d5c55ae 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -60,7 +60,6 @@ #include "dabInputEnhancedPacketFile.h" #include "dabInputEnhancedFifo.h" #include "dabInputUdp.h" -#include "dabInputTest.h" #include "dabInputPrbs.h" #include "dabInputRawFile.h" #include "dabInputRawFifo.h" @@ -742,12 +741,6 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->type = subchannel_type_t::DataDmb; subchan->bitrate = DEFAULT_DATA_BITRATE; -#if defined(HAVE_INPUT_TEST) && defined(HAVE_FORMAT_RAW) - } else if (type == "test") { - subchan->type = subchannel_type_t::DataDmb; - subchan->bitrate = DEFAULT_DATA_BITRATE; - operations = dabInputTestOperations; -#endif // defined(HAVE_INPUT_TEST)) && defined(HAVE_FORMAT_RAW) #ifdef HAVE_FORMAT_PACKET } else if (type == "packet") { subchan->type = subchannel_type_t::Packet; diff --git a/src/DabMux.cpp b/src/DabMux.cpp index a4605e7..79a8573 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -104,7 +104,6 @@ typedef DWORD32 uint32_t; #include "dabInputEnhancedPacketFile.h" #include "dabInputEnhancedFifo.h" #include "dabInputUdp.h" -#include "dabInputTest.h" #include "dabInputPrbs.h" #include "dabInputRawFile.h" #include "dabInputRawFifo.h" diff --git a/src/Makefile.am b/src/Makefile.am index b23e71c..408c86e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -62,7 +62,6 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ dabInputPrbs.h dabInputPrbs.cpp \ dabInputRawFile.h dabInputRawFile.cpp \ dabInputRawFifo.h dabInputRawFifo.cpp \ - dabInputTest.h dabInputTest.cpp \ dabInputUdp.h dabInputUdp.cpp \ dabInputZmq.h dabInputZmq.cpp \ dabOutput/dabOutput.h \ diff --git a/src/dabInputTest.cpp b/src/dabInputTest.cpp deleted file mode 100644 index fd4fc59..0000000 --- a/src/dabInputTest.cpp +++ /dev/null @@ -1,105 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "dabInputTest.h" - -#include -#ifdef _WIN32 -#else -# include -#endif - - -#ifdef HAVE_FORMAT_RAW -# ifdef HAVE_INPUT_TEST - - -struct dabInputTestData { - unsigned long counter; -}; - - -struct dabInputOperations dabInputTestOperations = { - dabInputTestInit, - dabInputTestOpen, - NULL, - NULL, - NULL, - NULL, - dabInputTestRead, - dabInputTestSetbitrate, - dabInputTestClose, - dabInputTestClean, - NULL -}; - - -int dabInputTestInit(void** args) -{ - dabInputTestData* input = new dabInputTestData; - memset(input, 0, sizeof(*input)); - input->counter = 0; - *args = input; - return 0; -} - - -int dabInputTestOpen(void* args, const char* inputName) -{ - return 0; -} - - -int dabInputTestRead(dabInputOperations* ops, void* args, void* buffer, int size) -{ - dabInputTestData* input = (dabInputTestData*)args; - char* data = (char*)buffer; - - *((long*)buffer) = htonl(input->counter++); - for (int i = sizeof(input->counter); i < size; ++i) { - data[i] = i; - } - return size; -} - - -int dabInputTestSetbitrate(dabInputOperations* ops, void* args, int bitrate) -{ - return bitrate; -} - - -int dabInputTestClose(void* args) -{ - return 0; -} - - -int dabInputTestClean(void** args) -{ - dabInputTestData* input = (dabInputTestData*)(*args); - delete input; - return 0; -} - - -# endif -#endif diff --git a/src/dabInputTest.h b/src/dabInputTest.h deleted file mode 100644 index 34ebc05..0000000 --- a/src/dabInputTest.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#ifndef DAB_INPUT_TEST_H -#define DAB_INPUT_TEST_H - - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif -#include "dabInput.h" - - -#ifdef HAVE_FORMAT_RAW -# ifdef HAVE_INPUT_TEST - - -extern struct dabInputOperations dabInputTestOperations; - -int dabInputTestInit(void** args); -int dabInputTestOpen(void* args, const char* inputName); -int dabInputTestRead(dabInputOperations* ops, void* args, void* buffer, int size); -int dabInputTestSetbitrate(dabInputOperations* ops, void* args, int bitrate); -int dabInputTestClose(void* args); -int dabInputTestClean(void** args); - - -# endif -#endif - - -#endif // DAB_INPUT_TEST_H diff --git a/src/utils.cpp b/src/utils.cpp index 5e7cf7a..e26389d 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -103,9 +103,6 @@ void header_message() std::cerr << "Input URLs supported:" << std::endl << " prbs" << -#if defined(HAVE_INPUT_TEST) - " test" << -#endif #if defined(HAVE_INPUT_UDP) " udp" << #endif -- cgit v1.2.3 From ea9f9c8a241b7cc74fce905b6839398737481efc Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 30 Oct 2016 12:45:42 +0100 Subject: Remove condition about test input in config parser --- src/ConfigParser.cpp | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) (limited to 'src/ConfigParser.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index d5c55ae..3bcf9fc 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -594,20 +594,19 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, throw runtime_error(ss.str()); } - string inputUri = ""; - // fail if no inputUri given unless type is test - if (type != "test") { - inputUri = pt.get("inputuri", ""); - - if (inputUri == "") { - try { - inputUri = pt.get("inputfile"); - } - catch (ptree_error &e) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!"; - throw runtime_error(ss.str()); - } + /* Both inputfile and inputuri are supported, and are equivalent. + * inputuri has precedence + */ + string inputUri = pt.get("inputuri", ""); + + if (inputUri == "") { + try { + inputUri = pt.get("inputfile"); + } + catch (ptree_error &e) { + stringstream ss; + ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!"; + throw runtime_error(ss.str()); } } -- cgit v1.2.3 From bbf23c4d8acb28eba6da018271330d674b8e8dd4 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 30 Oct 2016 12:56:02 +0100 Subject: Make compilation possible with all formats disabled --- src/ConfigParser.cpp | 4 +++- src/dabInputEnhancedPacketFile.cpp | 7 +++++++ 2 files changed, 10 insertions(+), 1 deletion(-) (limited to 'src/ConfigParser.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 3bcf9fc..7e3f855 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -728,8 +728,10 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, } else if (proto == "file") { operations = dabInputRawFileOperations; #endif +#if defined(HAVE_INPUT_FIFO) } else if (proto == "fifo") { operations = dabInputRawFifoOperations; +#endif } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << @@ -855,8 +857,8 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, " non-blocking I/O only available for audio or packet services!"; throw runtime_error(ss.str()); } -#endif // defined(HAVE_INPUT_FIFO) && defined(HAVE_INPUT_FILE) } +#endif // defined(HAVE_INPUT_FIFO) && defined(HAVE_INPUT_FILE) /* Get id */ diff --git a/src/dabInputEnhancedPacketFile.cpp b/src/dabInputEnhancedPacketFile.cpp index 3b12e0b..5c26188 100644 --- a/src/dabInputEnhancedPacketFile.cpp +++ b/src/dabInputEnhancedPacketFile.cpp @@ -23,6 +23,9 @@ #include "dabInputPacketFile.h" #include "dabInputFile.h" +#ifdef HAVE_FORMAT_PACKET +# ifdef HAVE_FORMAT_EPM +# ifdef HAVE_INPUT_FILE struct dabInputOperations dabInputEnhancedPacketFileOperations = { dabInputEnhancedFileInit, @@ -51,3 +54,7 @@ int dabInputEnhancedFileInit(void** args) return 0; } + +# endif +# endif +#endif -- cgit v1.2.3 From 128768f7fd719eb455a946a0f716d7128b4ded63 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 30 Oct 2016 15:29:31 +0100 Subject: Start reworking inputs, break all but Prbs and ZMQ --- src/ConfigParser.cpp | 178 +++++++++++++-- src/DabMux.cpp | 18 +- src/DabMux.h | 7 +- src/Makefile.am | 20 +- src/MuxElements.h | 4 +- src/dabInput.h | 15 -- src/dabInputPrbs.cpp | 98 -------- src/dabInputPrbs.h | 52 ----- src/dabInputZmq.cpp | 619 -------------------------------------------------- src/dabInputZmq.h | 276 ----------------------- src/input/Prbs.cpp | 101 +++++++++ src/input/Prbs.h | 56 +++++ src/input/Zmq.cpp | 625 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/input/Zmq.h | 271 ++++++++++++++++++++++ 14 files changed, 1220 insertions(+), 1120 deletions(-) delete mode 100644 src/dabInputPrbs.cpp delete mode 100644 src/dabInputPrbs.h delete mode 100644 src/dabInputZmq.cpp delete mode 100644 src/dabInputZmq.h create mode 100644 src/input/Prbs.cpp create mode 100644 src/input/Prbs.h create mode 100644 src/input/Zmq.cpp create mode 100644 src/input/Zmq.h (limited to 'src/ConfigParser.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 7e3f855..bdc2099 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -48,27 +48,14 @@ #include #include #include "dabOutput/dabOutput.h" -#include "dabInput.h" +#include "input/inputs.h" #include "utils.h" -#include "dabInputFile.h" -#include "dabInputFifo.h" -#include "dabInputMpegFile.h" -#include "dabInputMpegFifo.h" -#include "dabInputDabplusFile.h" -#include "dabInputDabplusFifo.h" -#include "dabInputPacketFile.h" -#include "dabInputEnhancedPacketFile.h" -#include "dabInputEnhancedFifo.h" -#include "dabInputUdp.h" -#include "dabInputPrbs.h" -#include "dabInputRawFile.h" -#include "dabInputRawFifo.h" -#include "dabInputDmbFile.h" -#include "dabInputDmbUdp.h" -#include "dabInputZmq.h" #include "DabMux.h" #include "ManagementServer.h" +#include "input/Prbs.h" +#include "input/Zmq.h" + #ifdef _WIN32 # pragma warning ( disable : 4103 ) @@ -541,13 +528,13 @@ void parse_ptree( } } -static dab_input_zmq_config_t setup_zmq_input( +static Inputs::dab_input_zmq_config_t setup_zmq_input( const boost::property_tree::ptree &pt, const std::string& subchanuid) { using boost::property_tree::ptree_error; - dab_input_zmq_config_t zmqconfig; + Inputs::dab_input_zmq_config_t zmqconfig; try { zmqconfig.buffer_size = pt.get("zmq-buffer"); @@ -621,6 +608,7 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->inputUri = inputUri; +#if OLD_INPUTS // {{{ /* The input is of the old_style type, * with the struct of function pointers, * and needs to be a DabInputCompatible @@ -714,7 +702,7 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, } else if (type == "data" and proto == "prbs") { input_is_old_style = false; - subchan->input = new DabInputPrbs(); + subchan->input = make_shared(); subchan->type = subchannel_type_t::DataDmb; subchan->bitrate = DEFAULT_DATA_BITRATE; } else if (type == "data") { @@ -928,5 +916,155 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->input = new DabInputCompatible(operations); } // else { it's already been created! } +#endif // 0 }}} + + dabProtection* protection = &subchan->protection; + + const bool nonblock = pt.get("nonblock", false); + + if (type == "dabplus" or type == "audio") { + subchan->type = subchannel_type_t::Audio; + subchan->bitrate = 0; + + if (proto == "file") { + if (nonblock) { + // TODO + } + } + else if (proto == "tcp" || + proto == "epgm" || + proto == "ipc") { + + if (nonblock) { + etiLog.level(warn) << "The nonblock option is meaningless for the zmq input"; + } + + auto zmqconfig = setup_zmq_input(pt, subchanuid); + + if (type == "audio") { + auto inzmq = make_shared(subchanuid, zmqconfig); + rcs.enrol(inzmq.get()); + subchan->input = inzmq; + } + else if (type == "dabplus") { + auto inzmq = make_shared(subchanuid, zmqconfig); + rcs.enrol(inzmq.get()); + subchan->input = inzmq; + } + + if (proto == "epgm") { + etiLog.level(warn) << "Using untested epgm:// zeromq input"; + } + else if (proto == "ipc") { + etiLog.level(warn) << "Using untested ipc:// zeromq input"; + } + } + else { + stringstream ss; + ss << "Subchannel with uid " << subchanuid << + ": Invalid protocol for " << type << " input (" << + proto << ")" << endl; + throw runtime_error(ss.str()); + } + } + else if (type == "data" and proto == "prbs") { + subchan->input = make_shared(); + subchan->type = subchannel_type_t::DataDmb; + subchan->bitrate = DEFAULT_DATA_BITRATE; + } + else { + stringstream ss; + ss << "Subchannel with uid " << subchanuid << " has unknown type!"; + throw runtime_error(ss.str()); + } + subchan->startAddress = 0; + + if (type == "audio") { + protection->form = UEP; + protection->level = 2; + protection->uep.tableIndex = 0; + } + else { + protection->level = 2; + protection->form = EEP; + protection->eep.profile = EEP_A; + } + + /* Get bitrate */ + try { + subchan->bitrate = pt.get("bitrate"); + if ((subchan->bitrate & 0x7) != 0) { + stringstream ss; + ss << "Subchannel with uid " << subchanuid << + ": Bitrate (" << subchan->bitrate << " not a multiple of 8!"; + throw runtime_error(ss.str()); + } + } + catch (ptree_error &e) { + stringstream ss; + ss << "Error, no bitrate defined for subchannel " << subchanuid; + throw runtime_error(ss.str()); + } + + /* Get id */ + try { + subchan->id = hexparse(pt.get("id")); + } + catch (ptree_error &e) { + for (int i = 0; i < 64; ++i) { // Find first free subchannel + vector::iterator subchannel = getSubchannel(ensemble->subchannels, i); + if (subchannel == ensemble->subchannels.end()) { + subchannel = ensemble->subchannels.end() - 1; + subchan->id = i; + break; + } + } + } + + /* Get optional protection profile */ + string profile = pt.get("protection-profile", ""); + + if (profile == "EEP_A") { + protection->form = EEP; + protection->eep.profile = EEP_A; + } + else if (profile == "EEP_B") { + protection->form = EEP; + protection->eep.profile = EEP_B; + } + else if (profile == "UEP") { + protection->form = UEP; + } + + /* Get protection level */ + try { + int level = pt.get("protection"); + + if (protection->form == UEP) { + if ((level < 1) || (level > 5)) { + stringstream ss; + ss << "Subchannel with uid " << subchanuid << + ": protection level must be between " + "1 to 5 inclusively (current = " << level << " )"; + throw runtime_error(ss.str()); + } + } + else if (protection->form == EEP) { + if ((level < 1) || (level > 4)) { + stringstream ss; + ss << "Subchannel with uid " << subchanuid << + ": protection level must be between " + "1 to 4 inclusively (current = " << level << " )"; + throw runtime_error(ss.str()); + } + } + protection->level = level - 1; + } + catch (ptree_error &e) { + stringstream ss; + ss << "Subchannel with uid " << subchanuid << + ": protection level undefined!"; + throw runtime_error(ss.str()); + } } diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 32ddb39..3927420 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -94,22 +94,8 @@ typedef DWORD32 uint32_t; # include "Eti.h" #endif -#include "dabInputFile.h" -#include "dabInputFifo.h" -#include "dabInputMpegFile.h" -#include "dabInputMpegFifo.h" -#include "dabInputDabplusFile.h" -#include "dabInputDabplusFifo.h" -#include "dabInputPacketFile.h" -#include "dabInputEnhancedPacketFile.h" -#include "dabInputEnhancedFifo.h" -#include "dabInputUdp.h" -#include "dabInputPrbs.h" -#include "dabInputRawFile.h" -#include "dabInputRawFifo.h" -#include "dabInputDmbFile.h" -#include "dabInputDmbUdp.h" - +#include "input/Prbs.h" +#include "input/Zmq.h" #include "dabOutput/dabOutput.h" #include "dabOutput/edi/TagItems.h" diff --git a/src/DabMux.h b/src/DabMux.h index 5dda759..80b4881 100644 --- a/src/DabMux.h +++ b/src/DabMux.h @@ -25,8 +25,7 @@ You should have received a copy of the GNU General Public License along with ODR-DabMux. If not, see . */ -#ifndef _DABMUX_H -#define _DABMUX_H +#pragma once #include #include @@ -34,7 +33,7 @@ #include "DabMultiplexer.h" #include "RemoteControl.h" #include "dabOutput/dabOutput.h" -#include "dabInput.h" +#include "input/inputs.h" #include "Eti.h" #include "MuxElements.h" @@ -44,5 +43,3 @@ # include #endif -#endif - diff --git a/src/Makefile.am b/src/Makefile.am index 408c86e..b8de4e8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -47,23 +47,9 @@ odr_dabmux_LDADD =$(ZMQ_LIBS) $(CURL_LIBS) \ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ DabMultiplexer.cpp DabMultiplexer.h \ - dabInput.h dabInput.cpp \ - dabInputDabplusFifo.h dabInputDabplusFifo.cpp \ - dabInputDabplusFile.h dabInputDabplusFile.cpp \ - dabInputDmbFile.h dabInputDmbFile.cpp \ - dabInputDmbUdp.h dabInputDmbUdp.cpp \ - dabInputEnhancedFifo.h dabInputEnhancedFifo.cpp \ - dabInputEnhancedPacketFile.h dabInputEnhancedPacketFile.cpp \ - dabInputFifo.h dabInputFifo.cpp \ - dabInputFile.h dabInputFile.cpp \ - dabInputMpegFifo.h dabInputMpegFifo.cpp \ - dabInputMpegFile.h dabInputMpegFile.cpp \ - dabInputPacketFile.h dabInputPacketFile.cpp \ - dabInputPrbs.h dabInputPrbs.cpp \ - dabInputRawFile.h dabInputRawFile.cpp \ - dabInputRawFifo.h dabInputRawFifo.cpp \ - dabInputUdp.h dabInputUdp.cpp \ - dabInputZmq.h dabInputZmq.cpp \ + input/inputs.h \ + input/Prbs.cpp input/Prbs.h \ + input/Zmq.cpp input/Zmq.h \ dabOutput/dabOutput.h \ dabOutput/dabOutputFile.cpp \ dabOutput/dabOutputFifo.cpp \ diff --git a/src/MuxElements.h b/src/MuxElements.h index 7056121..7324cdc 100644 --- a/src/MuxElements.h +++ b/src/MuxElements.h @@ -40,7 +40,7 @@ #include #include #include "dabOutput/dabOutput.h" -#include "dabInput.h" +#include "input/inputs.h" #include "RemoteControl.h" #include "Eti.h" @@ -295,7 +295,7 @@ public: std::string uid; std::string inputUri; - DabInputBase* input; + std::shared_ptr input; unsigned char id; subchannel_type_t type; uint16_t startAddress; diff --git a/src/dabInput.h b/src/dabInput.h index d5444cd..0accddb 100644 --- a/src/dabInput.h +++ b/src/dabInput.h @@ -29,8 +29,6 @@ #include "RemoteControl.h" #include -extern Logger etiLog; - // TODO replace usage of dabInputOperations by a // class hierarchy struct dabInputOperations { @@ -48,19 +46,6 @@ struct dabInputOperations { bool operator==(const dabInputOperations&); }; -/* New input object base */ -class DabInputBase { - public: - virtual int open(const std::string& name) = 0; - virtual int readFrame(void* buffer, int size) = 0; - virtual int setBitrate(int bitrate) = 0; - virtual int close() = 0; - - virtual ~DabInputBase() {} - protected: - DabInputBase() {} -}; - /* Wrapper class for old-style dabInputOperations inputs */ class DabInputCompatible : public DabInputBase { public: diff --git a/src/dabInputPrbs.cpp b/src/dabInputPrbs.cpp deleted file mode 100644 index 2678668..0000000 --- a/src/dabInputPrbs.cpp +++ /dev/null @@ -1,98 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - Pseudo-Random Bit Sequence generator for test purposes. - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "dabInputPrbs.h" - -#include -#include -#include -#include -#include -#include -#include "utils.h" - -using namespace std; - -// ETS 300 799 Clause G.2.1 -// Preferred polynomial is G(x) = x^20 + x^17 + 1 -const uint32_t PRBS_DEFAULT_POLY = (1 << 19) | (1 << 16) | 1; - -int DabInputPrbs::open(const string& name) -{ - if (name.empty()) { - m_prbs.setup(PRBS_DEFAULT_POLY); - } - else { - if (name[0] != ':') { - throw invalid_argument( - "Invalid PRBS address format. " - "Must be prbs://:polynomial."); - } - - const string poly_str = name.substr(1); - - long polynomial = hexparse(poly_str); - - if (polynomial == 0) { - throw invalid_argument("No polynomial given for PRBS input"); - } - - m_prbs.setup(polynomial); - } - rewind(); - - return 0; -} - -int DabInputPrbs::readFrame(void* buffer, int size) -{ - unsigned char* cbuffer = reinterpret_cast(buffer); - - for (int i = 0; i < size; ++i) { - cbuffer[i] = m_prbs.step(); - } - - return size; -} - -int DabInputPrbs::setBitrate(int bitrate) -{ - return bitrate; -} - -int DabInputPrbs::close() -{ - return 0; -} - -int DabInputPrbs::rewind() -{ - m_prbs.rewind(); - return 0; -} - diff --git a/src/dabInputPrbs.h b/src/dabInputPrbs.h deleted file mode 100644 index 95c5e25..0000000 --- a/src/dabInputPrbs.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - Pseudo-Random Bit Sequence generator for test purposes. - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include - -#include "dabInput.h" -#include "prbs.h" - -class DabInputPrbs : public DabInputBase { - public: - virtual int open(const std::string& name); - virtual int readFrame(void* buffer, int size); - virtual int setBitrate(int bitrate); - virtual int close(); - - private: - virtual int rewind(); - - PrbsGenerator m_prbs; -}; - diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp deleted file mode 100644 index 93f1ea3..0000000 --- a/src/dabInputZmq.cpp +++ /dev/null @@ -1,619 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - - Copyright (C) 2013, 2014 Matthias P. Braendli - http://www.opendigitalradio.org - - ZeroMQ input. see www.zeromq.org for more info - - For the AAC+ input, each zeromq message must contain one superframe - or one zmq_frame_header_t followed by a superframe. - - For the MPEG input, each zeromq message must contain one frame. - - Encryption is provided by zmq_curve, see the corresponding manpage. - - From the ZeroMQ manpage 'zmq': - - The 0MQ lightweight messaging kernel is a library which extends the standard - socket interfaces with features traditionally provided by specialised - messaging middleware products. 0MQ sockets provide an abstraction of - asynchronous message queues, multiple messaging patterns, message filtering - (subscriptions), seamless access to multiple transport protocols and more. - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "dabInput.h" -#include "dabInputZmq.h" -#include "PcDebug.h" - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#ifdef HAVE_INPUT_ZEROMQ - -#include "zmq.hpp" -#include -#include -#include -#include -#include -#include -#include -#include - -#ifdef __MINGW32__ -# define bzero(s, n) memset(s, 0, n) -#endif - -using namespace std; - -int readkey(string& keyfile, char* key) -{ - int fd = open(keyfile.c_str(), O_RDONLY); - if (fd < 0) - return fd; - int ret = read(fd, key, CURVE_KEYLEN); - close(fd); - if (ret < 0) { - return ret; - } - - /* It needs to be zero-terminated */ - key[CURVE_KEYLEN] = '\0'; - - return 0; -} - -/***** Common functions (MPEG and AAC) ******/ - -/* If necessary, unbind the socket, then check the keys, - * if they are ok and encryption is required, set the - * keys to the socket, and finally bind the socket - * to the new address - */ -void DabInputZmqBase::rebind() -{ - if (! m_zmq_sock_bound_to.empty()) { - try { - m_zmq_sock.unbind(m_zmq_sock_bound_to.c_str()); - } - catch (zmq::error_t& err) { - etiLog.level(warn) << "ZMQ unbind for input " << m_name << " failed"; - } - } - - m_zmq_sock_bound_to = ""; - - /* Load each key independently */ - if (! m_config.curve_public_keyfile.empty()) { - int rc = readkey(m_config.curve_public_keyfile, m_curve_public_key); - - if (rc < 0) { - etiLog.level(warn) << "Invalid public key for input " << - m_name; - - INVALIDATE_KEY(m_curve_public_key); - } - } - - if (! m_config.curve_secret_keyfile.empty()) { - int rc = readkey(m_config.curve_secret_keyfile, m_curve_secret_key); - - if (rc < 0) { - etiLog.level(warn) << "Invalid secret key for input " << - m_name; - - INVALIDATE_KEY(m_curve_secret_key); - } - } - - if (! m_config.curve_encoder_keyfile.empty()) { - int rc = readkey(m_config.curve_encoder_keyfile, m_curve_encoder_key); - - if (rc < 0) { - etiLog.level(warn) << "Invalid encoder key for input " << - m_name; - - INVALIDATE_KEY(m_curve_encoder_key); - } - } - - /* If you want encryption, you need to have defined all - * key files - */ - if ( m_config.enable_encryption && - ( ! (KEY_VALID(m_curve_public_key) && - KEY_VALID(m_curve_secret_key) && - KEY_VALID(m_curve_encoder_key) ) ) ) { - throw std::runtime_error("When enabling encryption, all three " - "keyfiles must be valid!"); - } - - if (m_config.enable_encryption) { - try { - /* We want to check that the encoder is the right one, - * so the encoder is the CURVE server. - */ - m_zmq_sock.setsockopt(ZMQ_CURVE_SERVERKEY, - m_curve_encoder_key, CURVE_KEYLEN); - } - catch (zmq::error_t& err) { - std::ostringstream os; - os << "ZMQ set encoder key for input " << m_name << " failed" << - err.what(); - throw std::runtime_error(os.str()); - } - - try { - m_zmq_sock.setsockopt(ZMQ_CURVE_PUBLICKEY, - m_curve_public_key, CURVE_KEYLEN); - } - catch (zmq::error_t& err) { - std::ostringstream os; - os << "ZMQ set public key for input " << m_name << " failed" << - err.what(); - throw std::runtime_error(os.str()); - } - - try { - m_zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY, - m_curve_secret_key, CURVE_KEYLEN); - } - catch (zmq::error_t& err) { - std::ostringstream os; - os << "ZMQ set secret key for input " << m_name << " failed" << - err.what(); - throw std::runtime_error(os.str()); - } - } - else { - try { - /* This forces the socket to go to the ZMQ_NULL auth - * mechanism - */ - const int no = 0; - m_zmq_sock.setsockopt(ZMQ_CURVE_SERVER, &no, sizeof(no)); - } - catch (zmq::error_t& err) { - etiLog.level(warn) << "ZMQ disable encryption keys for input " << - m_name << " failed: " << err.what(); - } - - } - - // Prepare the ZMQ socket to accept connections - try { - m_zmq_sock.bind(m_inputUri.c_str()); - } - catch (zmq::error_t& err) { - std::ostringstream os; - os << "ZMQ bind for input " << m_name << " failed" << - err.what(); - throw std::runtime_error(os.str()); - } - - m_zmq_sock_bound_to = m_inputUri; - - try { - m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0); - } - catch (zmq::error_t& err) { - std::ostringstream os; - os << "ZMQ set socket options for input " << m_name << " failed" << - err.what(); - throw std::runtime_error(os.str()); - } -} - -int DabInputZmqBase::open(const std::string& inputUri) -{ - m_inputUri = inputUri; - - /* Let caller handle exceptions when we open() */ - rebind(); - - // We want to appear in the statistics ! - m_stats.registerAtServer(); - - return 0; -} - -int DabInputZmqBase::close() -{ - m_zmq_sock.close(); - return 0; -} - -int DabInputZmqBase::setBitrate(int bitrate) -{ - m_bitrate = bitrate; - return bitrate; // TODO do a nice check here -} - -// size corresponds to a frame size. It is constant for a given bitrate -int DabInputZmqBase::readFrame(void* buffer, int size) -{ - int rc; - - /* We must *always* read data from the ZMQ socket, - * to make sure that ZMQ internal buffers are emptied - * quickly. It's the only way to control the buffers - * of the whole path from encoder to our frame_buffer. - */ - rc = readFromSocket(size); - - /* Notify of a buffer overrun, and drop some frames */ - if (m_frame_buffer.size() >= m_config.buffer_size) { - m_stats.notifyOverrun(); - - /* If the buffer is really too full, we drop as many frames as needed - * to get down to the prebuffering size. We would like to have our buffer - * filled to the prebuffering length. - */ - if (m_frame_buffer.size() >= 1.5*m_config.buffer_size) { - size_t over_max = m_frame_buffer.size() - m_config.prebuffering; - - while (over_max--) { - delete[] m_frame_buffer.front(); - m_frame_buffer.pop_front(); - } - } - else { - /* Our frame_buffer contains DAB logical frames. Five of these make one - * AAC superframe. - * - * Dropping this superframe amounts to dropping 120ms of audio. - * - * We're actually not sure to drop five DAB logical frames - * belonging to the same AAC superframe. It is assumed that no - * receiver will crash because of this. At least, the DAB logical frame - * vs. AAC superframe alignment is preserved. - * - * TODO: of course this assumption probably doesn't hold. Fix this ! - * TODO: also, with MPEG, the above doesn't hold, so we drop five - * frames even though we could drop less. - * */ - for (int frame_del_count = 0; frame_del_count < 5; frame_del_count++) { - delete[] m_frame_buffer.front(); - m_frame_buffer.pop_front(); - } - } - } - - if (m_prebuf_current > 0) { - if (rc > 0) - m_prebuf_current--; - if (m_prebuf_current == 0) - etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", - m_name.c_str()); - - /* During prebuffering, give a zeroed frame to the mux */ - m_stats.notifyUnderrun(); - memset(buffer, 0, size); - return size; - } - - // Save stats data in bytes, not in frames - m_stats.notifyBuffer(m_frame_buffer.size() * size); - - if (m_frame_buffer.empty()) { - etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", - m_name.c_str()); - // reset prebuffering - m_prebuf_current = m_config.prebuffering; - - /* We have no data to give, we give a zeroed frame */ - m_stats.notifyUnderrun(); - memset(buffer, 0, size); - return size; - } - else - { - /* Normal situation, give a frame from the frame_buffer */ - uint8_t* newframe = m_frame_buffer.front(); - memcpy(buffer, newframe, size); - delete[] newframe; - m_frame_buffer.pop_front(); - return size; - } -} - - -/******** MPEG input *******/ - -// Read a MPEG frame from the socket, and push to list -int DabInputZmqMPEG::readFromSocket(size_t framesize) -{ - bool messageReceived = false; - zmq::message_t msg; - - try { - messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); - if (!messageReceived) { - return 0; - } - - } - catch (zmq::error_t& err) - { - etiLog.level(error) << "Failed to receive MPEG frame from zmq socket " << - m_name << ": " << err.what(); - } - - /* This is the old 'one superframe per ZMQ message' format */ - uint8_t* data = (uint8_t*)msg.data(); - size_t datalen = msg.size(); - - /* Look for the new zmq_frame_header_t format */ - zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data(); - - if (msg.size() == ZMQ_FRAME_SIZE(frame) && - frame->version == 1 && - frame->encoder == ZMQ_ENCODER_TOOLAME) { - datalen = frame->datasize; - data = ZMQ_FRAME_DATA(frame); - - m_stats.notifyPeakLevels(frame->audiolevel_left, - frame->audiolevel_right); - } - - - if (datalen == framesize) - { - if (m_frame_buffer.size() > m_config.buffer_size) { - etiLog.level(warn) << - "inputZMQ " << m_name << - " buffer full (" << m_frame_buffer.size() << ")," - " dropping incoming frame !"; - messageReceived = false; - } - else if (m_enable_input) { - // copy the input frame blockwise into the frame_buffer - auto framedata = new uint8_t[framesize]; - memcpy(framedata, data, framesize); - m_frame_buffer.push_back(framedata); - } - else { - return 0; - } - } - else { - etiLog.level(error) << - "inputZMQ " << m_name << - " verify bitrate: recv'd " << msg.size() << " B" << - ", need " << framesize << "."; - messageReceived = false; - } - - return messageReceived ? msg.size() : 0; -} - -/******** AAC+ input *******/ - -// Read a AAC+ superframe from the socket, cut it into five frames, -// and push to list -int DabInputZmqAAC::readFromSocket(size_t framesize) -{ - bool messageReceived; - zmq::message_t msg; - - try { - messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); - if (!messageReceived) { - return 0; - } - - } - catch (zmq::error_t& err) - { - etiLog.level(error) << - "Failed to receive AAC superframe from zmq socket " << - m_name << ": " << err.what(); - } - - /* This is the old 'one superframe per ZMQ message' format */ - uint8_t* data = (uint8_t*)msg.data(); - size_t datalen = msg.size(); - - /* Look for the new zmq_frame_header_t format */ - zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data(); - - if (msg.size() == ZMQ_FRAME_SIZE(frame) && - frame->version == 1 && - frame->encoder == ZMQ_ENCODER_FDK) { - datalen = frame->datasize; - data = ZMQ_FRAME_DATA(frame); - - m_stats.notifyPeakLevels(frame->audiolevel_left, - frame->audiolevel_right); - } - - - /* TS 102 563, Section 6: - * Audio super frames are transported in five successive DAB logical frames - * with additional error protection. - */ - if (datalen) - { - if (datalen == 5*framesize) - { - if (m_frame_buffer.size() > m_config.buffer_size) { - etiLog.level(warn) << - "inputZMQ " << m_name << - " buffer full (" << m_frame_buffer.size() << ")," - " dropping incoming superframe !"; - datalen = 0; - } - else if (m_enable_input) { - // copy the input frame blockwise into the frame_buffer - for (uint8_t* framestart = data; - framestart < &data[5*framesize]; - framestart += framesize) { - auto audioframe = new uint8_t[framesize]; - memcpy(audioframe, framestart, framesize); - m_frame_buffer.push_back(audioframe); - } - } - else { - datalen = 0; - } - } - else { - etiLog.level(error) << - "inputZMQ " << m_name << - " verify bitrate: recv'd " << msg.size() << " B" << - ", need " << 5*framesize << "."; - - datalen = 0; - } - } - else { - etiLog.level(error) << - "inputZMQ " << m_name << - " invalid frame received"; - } - - return datalen; -} - -/********* REMOTE CONTROL ***********/ - -void DabInputZmqBase::set_parameter(const string& parameter, - const string& value) -{ - if (parameter == "buffer") { - size_t new_limit = atol(value.c_str()); - - if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) { - throw ParameterError("Desired buffer size too small." - " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); - } - else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) { - throw ParameterError("Desired buffer size too large." - " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); - } - - m_config.buffer_size = new_limit; - } - else if (parameter == "prebuffering") { - size_t new_prebuf = atol(value.c_str()); - - if (new_prebuf < INPUT_ZMQ_MIN_BUFFER_SIZE) { - throw ParameterError("Desired prebuffering too small." - " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); - } - else if (new_prebuf > INPUT_ZMQ_MAX_BUFFER_SIZE) { - throw ParameterError("Desired prebuffering too large." - " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); - } - - m_config.prebuffering = new_prebuf; - } - else if (parameter == "enable") { - if (value == "1") { - m_enable_input = true; - } - else if (value == "0") { - m_enable_input = false; - } - else { - throw ParameterError("Value not understood, specify 0 or 1."); - } - } - else if (parameter == "encryption") { - if (value == "1") { - m_config.enable_encryption = true; - } - else if (value == "0") { - m_config.enable_encryption = false; - } - else { - throw ParameterError("Value not understood, specify 0 or 1."); - } - - try { - rebind(); - } - catch (std::runtime_error &e) { - stringstream ss; - ss << "Could not bind socket again with new keys." << - e.what(); - throw ParameterError(ss.str()); - } - } - else if (parameter == "secretkey") { - m_config.curve_secret_keyfile = value; - } - else if (parameter == "publickey") { - m_config.curve_public_keyfile = value; - } - else if (parameter == "encoderkey") { - m_config.curve_encoder_keyfile = value; - } - else { - stringstream ss; - ss << "Parameter '" << parameter << - "' is not exported by controllable " << get_rc_name(); - throw ParameterError(ss.str()); - } -} - -const string DabInputZmqBase::get_parameter(const string& parameter) const -{ - stringstream ss; - if (parameter == "buffer") { - ss << m_config.buffer_size; - } - else if (parameter == "prebuffering") { - ss << m_config.prebuffering; - } - else if (parameter == "enable") { - if (m_enable_input) - ss << "true"; - else - ss << "false"; - } - else if (parameter == "encryption") { - if (m_config.enable_encryption) - ss << "true"; - else - ss << "false"; - } - else if (parameter == "secretkey") { - ss << m_config.curve_secret_keyfile; - } - else if (parameter == "publickey") { - ss << m_config.curve_public_keyfile; - } - else if (parameter == "encoderkey") { - ss << m_config.curve_encoder_keyfile; - } - else { - ss << "Parameter '" << parameter << - "' is not exported by controllable " << get_rc_name(); - throw ParameterError(ss.str()); - } - return ss.str(); - -} - -#endif - diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h deleted file mode 100644 index 351fb07..0000000 --- a/src/dabInputZmq.h +++ /dev/null @@ -1,276 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - - Copyright (C) 2013, 2014 Matthias P. Braendli - http://www.opendigitalradio.org - - ZeroMQ input. see www.zeromq.org for more info - - For the AAC+ input, each zeromq message must contain one superframe, - or one zmq_frame_header_t followed by a superframe. - - For the MPEG input, each zeromq message must contain one frame. - - Encryption is provided by zmq_curve, see the corresponding manpage. - - From the ZeroMQ manpage 'zmq': - - The 0MQ lightweight messaging kernel is a library which extends the standard - socket interfaces with features traditionally provided by specialised - messaging middleware products. 0MQ sockets provide an abstraction of - asynchronous message queues, multiple messaging patterns, message filtering - (subscriptions), seamless access to multiple transport protocols and more. - */ -/* - This file is part of ODR-DabMux. - - It defines a ZeroMQ input for dabplus data. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#ifndef DAB_INPUT_ZMQ_H -#define DAB_INPUT_ZMQ_H - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#ifdef HAVE_INPUT_ZEROMQ - -#include -#include -#include -#include "zmq.hpp" -#include "dabInput.h" -#include "ManagementServer.h" - -/* The frame_buffer contains DAB logical frames as defined in - * TS 102 563, section 6. - * Five elements of this buffer make one AAC superframe (120ms audio) - */ - -// Number of elements to prebuffer before starting the pipeline -#define INPUT_ZMQ_DEF_PREBUFFERING (5*4) // 480ms - -// Default frame_buffer size in number of elements -#define INPUT_ZMQ_DEF_BUFFER_SIZE (5*8) // 960ms - -// Minimum frame_buffer size in number of elements -// This is one AAC superframe, but you probably don't want to -// go that low anyway. -#define INPUT_ZMQ_MIN_BUFFER_SIZE (5*1) // 120ms - -// Maximum frame_buffer size in number of elements -// One minute is clearly way over what everybody would -// want. -#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*500) // 60s - -/* The ZeroMQ Curve key is 40 bytes long in Z85 representation - * - * But we need to store it as zero-terminated string. - */ -#define CURVE_KEYLEN 40 - -/* helper to invalidate a key */ -#define INVALIDATE_KEY(k) memset(k, 0, CURVE_KEYLEN+1) - -/* Verification for key validity */ -#define KEY_VALID(k) (k[0] != '\0') - -/* Read a key from file into key - * - * Returns 0 on success, negative value on failure - */ -int readkey(std::string& keyfile, char* key); - -struct dab_input_zmq_config_t -{ - /* The size of the internal buffer, measured in number - * of elements. - * - * Each element corresponds to five frames, - * or one AAC superframe. - */ - size_t buffer_size; - - /* The amount of prebuffering to do before we start streaming - * - * Same units as buffer_size - */ - size_t prebuffering; - - /* Whether to enforce encryption or not - */ - bool enable_encryption; - - /* Full path to file containing public key. - */ - std::string curve_public_keyfile; - - /* Full path to file containing secret key. - */ - std::string curve_secret_keyfile; - - /* Full path to file containing encoder public key. - */ - std::string curve_encoder_keyfile; -}; - -#define ZMQ_ENCODER_FDK 1 -#define ZMQ_ENCODER_TOOLAME 2 - -/* This defines the on-wire representation of a ZMQ message header. - * - * The data follows right after this header */ -struct zmq_frame_header_t -{ - uint16_t version; // we support version=1 now - uint16_t encoder; // see ZMQ_ENCODER_XYZ - - /* length of the 'data' field */ - uint32_t datasize; - - /* Audio level, peak, linear PCM */ - int16_t audiolevel_left; - int16_t audiolevel_right; - - /* Data follows this header */ -} __attribute__ ((packed)); - -/* The expected frame size incl data of the given frame */ -#define ZMQ_FRAME_SIZE(f) (sizeof(zmq_frame_header_t) + f->datasize) - -#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(zmq_frame_header_t) ) - - -class DabInputZmqBase : public DabInputBase, public RemoteControllable { - public: - DabInputZmqBase(const std::string name, - dab_input_zmq_config_t config) - : RemoteControllable(name), - m_zmq_context(1), - m_zmq_sock(m_zmq_context, ZMQ_SUB), - m_zmq_sock_bound_to(""), - m_bitrate(0), - m_enable_input(true), - m_config(config), - m_stats(m_name), - m_prebuf_current(config.prebuffering) { - RC_ADD_PARAMETER(enable, - "If the input is enabled. Set to zero to empty the buffer."); - - RC_ADD_PARAMETER(encryption, - "If encryption is enabled or disabled [1 or 0]." - " If 1 is written, the keys are reloaded."); - - RC_ADD_PARAMETER(publickey, - "The multiplexer's public key file."); - - RC_ADD_PARAMETER(secretkey, - "The multiplexer's secret key file."); - - RC_ADD_PARAMETER(encoderkey, - "The encoder's public key file."); - - /* Set all keys to zero */ - INVALIDATE_KEY(m_curve_public_key); - INVALIDATE_KEY(m_curve_secret_key); - INVALIDATE_KEY(m_curve_encoder_key); - } - - virtual int open(const std::string& inputUri); - virtual int readFrame(void* buffer, int size); - virtual int setBitrate(int bitrate); - virtual int close(); - - /* Remote control */ - virtual void set_parameter(const std::string& parameter, - const std::string& value); - - /* Getting a parameter always returns a string. */ - virtual const std::string get_parameter(const std::string& parameter) const; - - protected: - virtual int readFromSocket(size_t framesize) = 0; - - virtual void rebind(); - - zmq::context_t m_zmq_context; - zmq::socket_t m_zmq_sock; // handle for the zmq socket - - /* If the socket is bound, this saves the endpoint, - * otherwise, it's an empty string - */ - std::string m_zmq_sock_bound_to; - int m_bitrate; - - /* set this to zero to empty the input buffer */ - bool m_enable_input; - - /* stores elements of type char[] */ - std::list m_frame_buffer; - - dab_input_zmq_config_t m_config; - - /* Key management, keys need to be zero-terminated */ - char m_curve_public_key[CURVE_KEYLEN+1]; - char m_curve_secret_key[CURVE_KEYLEN+1]; - char m_curve_encoder_key[CURVE_KEYLEN+1]; - - std::string m_inputUri; - - InputStat m_stats; - - private: - size_t m_prebuf_current; -}; - -class DabInputZmqMPEG : public DabInputZmqBase { - public: - DabInputZmqMPEG(const std::string name, - dab_input_zmq_config_t config) - : DabInputZmqBase(name, config) { - RC_ADD_PARAMETER(buffer, - "Size of the input buffer [mpeg frames]"); - - RC_ADD_PARAMETER(prebuffering, - "Min buffer level before streaming starts [mpeg frames]"); - } - - private: - virtual int readFromSocket(size_t framesize); -}; - -class DabInputZmqAAC : public DabInputZmqBase { - public: - DabInputZmqAAC(const std::string name, - dab_input_zmq_config_t config) - : DabInputZmqBase(name, config) { - RC_ADD_PARAMETER(buffer, - "Size of the input buffer [aac superframes]"); - - RC_ADD_PARAMETER(prebuffering, - "Min buffer level before streaming starts [aac superframes]"); - } - - private: - virtual int readFromSocket(size_t framesize); -}; - -#endif // HAVE_INPUT_ZMQ - -#endif // DAB_INPUT_ZMQ_H - diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp new file mode 100644 index 0000000..b9e244b --- /dev/null +++ b/src/input/Prbs.cpp @@ -0,0 +1,101 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + Pseudo-Random Bit Sequence generator for test purposes. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "input/Prbs.h" + +#include +#include +#include +#include +#include +#include +#include "utils.h" + +using namespace std; + +namespace Inputs { + +// ETS 300 799 Clause G.2.1 +// Preferred polynomial is G(x) = x^20 + x^17 + 1 +const uint32_t PRBS_DEFAULT_POLY = (1 << 19) | (1 << 16) | 1; + +int Prbs::open(const string& name) +{ + if (name.empty()) { + m_prbs.setup(PRBS_DEFAULT_POLY); + } + else { + if (name[0] != ':') { + throw invalid_argument( + "Invalid PRBS address format. " + "Must be prbs://:polynomial."); + } + + const string poly_str = name.substr(1); + + long polynomial = hexparse(poly_str); + + if (polynomial == 0) { + throw invalid_argument("No polynomial given for PRBS input"); + } + + m_prbs.setup(polynomial); + } + rewind(); + + return 0; +} + +int Prbs::readFrame(void* buffer, int size) +{ + unsigned char* cbuffer = reinterpret_cast(buffer); + + for (int i = 0; i < size; ++i) { + cbuffer[i] = m_prbs.step(); + } + + return size; +} + +int Prbs::setBitrate(int bitrate) +{ + return bitrate; +} + +int Prbs::close() +{ + return 0; +} + +int Prbs::rewind() +{ + m_prbs.rewind(); + return 0; +} + +}; diff --git a/src/input/Prbs.h b/src/input/Prbs.h new file mode 100644 index 0000000..47b52ad --- /dev/null +++ b/src/input/Prbs.h @@ -0,0 +1,56 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + Pseudo-Random Bit Sequence generator for test purposes. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include + +#include "input/inputs.h" +#include "prbs.h" + +namespace Inputs { + +class Prbs : public InputBase { + public: + virtual int open(const std::string& name); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); + + private: + virtual int rewind(); + + PrbsGenerator m_prbs; +}; + +}; + diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp new file mode 100644 index 0000000..6ef5fce --- /dev/null +++ b/src/input/Zmq.cpp @@ -0,0 +1,625 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 Matthias P. Braendli + http://www.opendigitalradio.org + + ZeroMQ input. see www.zeromq.org for more info + + For the AAC+ input, each zeromq message must contain one superframe + or one zmq_frame_header_t followed by a superframe. + + For the MPEG input, each zeromq message must contain one frame. + + Encryption is provided by zmq_curve, see the corresponding manpage. + + From the ZeroMQ manpage 'zmq': + + The 0MQ lightweight messaging kernel is a library which extends the standard + socket interfaces with features traditionally provided by specialised + messaging middleware products. 0MQ sockets provide an abstraction of + asynchronous message queues, multiple messaging patterns, message filtering + (subscriptions), seamless access to multiple transport protocols and more. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "input/Zmq.h" + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#ifdef HAVE_INPUT_ZEROMQ + +#include "zmq.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include "PcDebug.h" +#include "Log.h" + +#ifdef __MINGW32__ +# define bzero(s, n) memset(s, 0, n) +#endif + +namespace Inputs { + +using namespace std; + +int readkey(string& keyfile, char* key) +{ + FILE* fd = fopen(keyfile.c_str(), "r"); + if (fd == nullptr) { + return -1; + } + + int ret = fread(key, CURVE_KEYLEN, 1, fd); + fclose(fd); + if (ret == 0) { + return -1; + } + + /* It needs to be zero-terminated */ + key[CURVE_KEYLEN] = '\0'; + + return 0; +} + +/***** Common functions (MPEG and AAC) ******/ + +/* If necessary, unbind the socket, then check the keys, + * if they are ok and encryption is required, set the + * keys to the socket, and finally bind the socket + * to the new address + */ +void ZmqBase::rebind() +{ + if (! m_zmq_sock_bound_to.empty()) { + try { + m_zmq_sock.unbind(m_zmq_sock_bound_to.c_str()); + } + catch (zmq::error_t& err) { + etiLog.level(warn) << "ZMQ unbind for input " << m_name << " failed"; + } + } + + m_zmq_sock_bound_to = ""; + + /* Load each key independently */ + if (! m_config.curve_public_keyfile.empty()) { + int rc = readkey(m_config.curve_public_keyfile, m_curve_public_key); + + if (rc < 0) { + etiLog.level(warn) << "Invalid public key for input " << + m_name; + + INVALIDATE_KEY(m_curve_public_key); + } + } + + if (! m_config.curve_secret_keyfile.empty()) { + int rc = readkey(m_config.curve_secret_keyfile, m_curve_secret_key); + + if (rc < 0) { + etiLog.level(warn) << "Invalid secret key for input " << + m_name; + + INVALIDATE_KEY(m_curve_secret_key); + } + } + + if (! m_config.curve_encoder_keyfile.empty()) { + int rc = readkey(m_config.curve_encoder_keyfile, m_curve_encoder_key); + + if (rc < 0) { + etiLog.level(warn) << "Invalid encoder key for input " << + m_name; + + INVALIDATE_KEY(m_curve_encoder_key); + } + } + + /* If you want encryption, you need to have defined all + * key files + */ + if ( m_config.enable_encryption && + ( ! (KEY_VALID(m_curve_public_key) && + KEY_VALID(m_curve_secret_key) && + KEY_VALID(m_curve_encoder_key) ) ) ) { + throw std::runtime_error("When enabling encryption, all three " + "keyfiles must be valid!"); + } + + if (m_config.enable_encryption) { + try { + /* We want to check that the encoder is the right one, + * so the encoder is the CURVE server. + */ + m_zmq_sock.setsockopt(ZMQ_CURVE_SERVERKEY, + m_curve_encoder_key, CURVE_KEYLEN); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set encoder key for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + + try { + m_zmq_sock.setsockopt(ZMQ_CURVE_PUBLICKEY, + m_curve_public_key, CURVE_KEYLEN); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set public key for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + + try { + m_zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY, + m_curve_secret_key, CURVE_KEYLEN); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set secret key for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + } + else { + try { + /* This forces the socket to go to the ZMQ_NULL auth + * mechanism + */ + const int no = 0; + m_zmq_sock.setsockopt(ZMQ_CURVE_SERVER, &no, sizeof(no)); + } + catch (zmq::error_t& err) { + etiLog.level(warn) << "ZMQ disable encryption keys for input " << + m_name << " failed: " << err.what(); + } + + } + + // Prepare the ZMQ socket to accept connections + try { + m_zmq_sock.bind(m_inputUri.c_str()); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ bind for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + + m_zmq_sock_bound_to = m_inputUri; + + try { + m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set socket options for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } +} + +int ZmqBase::open(const std::string& inputUri) +{ + m_inputUri = inputUri; + + /* Let caller handle exceptions when we open() */ + rebind(); + + // We want to appear in the statistics ! + m_stats.registerAtServer(); + + return 0; +} + +int ZmqBase::close() +{ + m_zmq_sock.close(); + return 0; +} + +int ZmqBase::setBitrate(int bitrate) +{ + m_bitrate = bitrate; + return bitrate; // TODO do a nice check here +} + +// size corresponds to a frame size. It is constant for a given bitrate +int ZmqBase::readFrame(void* buffer, int size) +{ + int rc; + + /* We must *always* read data from the ZMQ socket, + * to make sure that ZMQ internal buffers are emptied + * quickly. It's the only way to control the buffers + * of the whole path from encoder to our frame_buffer. + */ + rc = readFromSocket(size); + + /* Notify of a buffer overrun, and drop some frames */ + if (m_frame_buffer.size() >= m_config.buffer_size) { + m_stats.notifyOverrun(); + + /* If the buffer is really too full, we drop as many frames as needed + * to get down to the prebuffering size. We would like to have our buffer + * filled to the prebuffering length. + */ + if (m_frame_buffer.size() >= 1.5*m_config.buffer_size) { + size_t over_max = m_frame_buffer.size() - m_config.prebuffering; + + while (over_max--) { + delete[] m_frame_buffer.front(); + m_frame_buffer.pop_front(); + } + } + else { + /* Our frame_buffer contains DAB logical frames. Five of these make one + * AAC superframe. + * + * Dropping this superframe amounts to dropping 120ms of audio. + * + * We're actually not sure to drop five DAB logical frames + * belonging to the same AAC superframe. It is assumed that no + * receiver will crash because of this. At least, the DAB logical frame + * vs. AAC superframe alignment is preserved. + * + * TODO: of course this assumption probably doesn't hold. Fix this ! + * TODO: also, with MPEG, the above doesn't hold, so we drop five + * frames even though we could drop less. + * */ + for (int frame_del_count = 0; frame_del_count < 5; frame_del_count++) { + delete[] m_frame_buffer.front(); + m_frame_buffer.pop_front(); + } + } + } + + if (m_prebuf_current > 0) { + if (rc > 0) + m_prebuf_current--; + if (m_prebuf_current == 0) + etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", + m_name.c_str()); + + /* During prebuffering, give a zeroed frame to the mux */ + m_stats.notifyUnderrun(); + memset(buffer, 0, size); + return size; + } + + // Save stats data in bytes, not in frames + m_stats.notifyBuffer(m_frame_buffer.size() * size); + + if (m_frame_buffer.empty()) { + etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", + m_name.c_str()); + // reset prebuffering + m_prebuf_current = m_config.prebuffering; + + /* We have no data to give, we give a zeroed frame */ + m_stats.notifyUnderrun(); + memset(buffer, 0, size); + return size; + } + else + { + /* Normal situation, give a frame from the frame_buffer */ + uint8_t* newframe = m_frame_buffer.front(); + memcpy(buffer, newframe, size); + delete[] newframe; + m_frame_buffer.pop_front(); + return size; + } +} + + +/******** MPEG input *******/ + +// Read a MPEG frame from the socket, and push to list +int ZmqMPEG::readFromSocket(size_t framesize) +{ + bool messageReceived = false; + zmq::message_t msg; + + try { + messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + if (!messageReceived) { + return 0; + } + + } + catch (zmq::error_t& err) + { + etiLog.level(error) << "Failed to receive MPEG frame from zmq socket " << + m_name << ": " << err.what(); + } + + /* This is the old 'one superframe per ZMQ message' format */ + uint8_t* data = (uint8_t*)msg.data(); + size_t datalen = msg.size(); + + /* Look for the new zmq_frame_header_t format */ + zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data(); + + if (msg.size() == ZMQ_FRAME_SIZE(frame) && + frame->version == 1 && + frame->encoder == ZMQ_ENCODER_TOOLAME) { + datalen = frame->datasize; + data = ZMQ_FRAME_DATA(frame); + + m_stats.notifyPeakLevels(frame->audiolevel_left, + frame->audiolevel_right); + } + + + if (datalen == framesize) + { + if (m_frame_buffer.size() > m_config.buffer_size) { + etiLog.level(warn) << + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," + " dropping incoming frame !"; + messageReceived = false; + } + else if (m_enable_input) { + // copy the input frame blockwise into the frame_buffer + auto framedata = new uint8_t[framesize]; + memcpy(framedata, data, framesize); + m_frame_buffer.push_back(framedata); + } + else { + return 0; + } + } + else { + etiLog.level(error) << + "inputZMQ " << m_name << + " verify bitrate: recv'd " << msg.size() << " B" << + ", need " << framesize << "."; + messageReceived = false; + } + + return messageReceived ? msg.size() : 0; +} + +/******** AAC+ input *******/ + +// Read a AAC+ superframe from the socket, cut it into five frames, +// and push to list +int ZmqAAC::readFromSocket(size_t framesize) +{ + bool messageReceived; + zmq::message_t msg; + + try { + messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + if (!messageReceived) { + return 0; + } + + } + catch (zmq::error_t& err) + { + etiLog.level(error) << + "Failed to receive AAC superframe from zmq socket " << + m_name << ": " << err.what(); + } + + /* This is the old 'one superframe per ZMQ message' format */ + uint8_t* data = (uint8_t*)msg.data(); + size_t datalen = msg.size(); + + /* Look for the new zmq_frame_header_t format */ + zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data(); + + if (msg.size() == ZMQ_FRAME_SIZE(frame) && + frame->version == 1 && + frame->encoder == ZMQ_ENCODER_FDK) { + datalen = frame->datasize; + data = ZMQ_FRAME_DATA(frame); + + m_stats.notifyPeakLevels(frame->audiolevel_left, + frame->audiolevel_right); + } + + + /* TS 102 563, Section 6: + * Audio super frames are transported in five successive DAB logical frames + * with additional error protection. + */ + if (datalen) + { + if (datalen == 5*framesize) + { + if (m_frame_buffer.size() > m_config.buffer_size) { + etiLog.level(warn) << + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," + " dropping incoming superframe !"; + datalen = 0; + } + else if (m_enable_input) { + // copy the input frame blockwise into the frame_buffer + for (uint8_t* framestart = data; + framestart < &data[5*framesize]; + framestart += framesize) { + auto audioframe = new uint8_t[framesize]; + memcpy(audioframe, framestart, framesize); + m_frame_buffer.push_back(audioframe); + } + } + else { + datalen = 0; + } + } + else { + etiLog.level(error) << + "inputZMQ " << m_name << + " verify bitrate: recv'd " << msg.size() << " B" << + ", need " << 5*framesize << "."; + + datalen = 0; + } + } + else { + etiLog.level(error) << + "inputZMQ " << m_name << + " invalid frame received"; + } + + return datalen; +} + +/********* REMOTE CONTROL ***********/ + +void ZmqBase::set_parameter(const string& parameter, + const string& value) +{ + if (parameter == "buffer") { + size_t new_limit = atol(value.c_str()); + + if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) { + throw ParameterError("Desired buffer size too small." + " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); + } + else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) { + throw ParameterError("Desired buffer size too large." + " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); + } + + m_config.buffer_size = new_limit; + } + else if (parameter == "prebuffering") { + size_t new_prebuf = atol(value.c_str()); + + if (new_prebuf < INPUT_ZMQ_MIN_BUFFER_SIZE) { + throw ParameterError("Desired prebuffering too small." + " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); + } + else if (new_prebuf > INPUT_ZMQ_MAX_BUFFER_SIZE) { + throw ParameterError("Desired prebuffering too large." + " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); + } + + m_config.prebuffering = new_prebuf; + } + else if (parameter == "enable") { + if (value == "1") { + m_enable_input = true; + } + else if (value == "0") { + m_enable_input = false; + } + else { + throw ParameterError("Value not understood, specify 0 or 1."); + } + } + else if (parameter == "encryption") { + if (value == "1") { + m_config.enable_encryption = true; + } + else if (value == "0") { + m_config.enable_encryption = false; + } + else { + throw ParameterError("Value not understood, specify 0 or 1."); + } + + try { + rebind(); + } + catch (std::runtime_error &e) { + stringstream ss; + ss << "Could not bind socket again with new keys." << + e.what(); + throw ParameterError(ss.str()); + } + } + else if (parameter == "secretkey") { + m_config.curve_secret_keyfile = value; + } + else if (parameter == "publickey") { + m_config.curve_public_keyfile = value; + } + else if (parameter == "encoderkey") { + m_config.curve_encoder_keyfile = value; + } + else { + stringstream ss; + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } +} + +const string ZmqBase::get_parameter(const string& parameter) const +{ + stringstream ss; + if (parameter == "buffer") { + ss << m_config.buffer_size; + } + else if (parameter == "prebuffering") { + ss << m_config.prebuffering; + } + else if (parameter == "enable") { + if (m_enable_input) + ss << "true"; + else + ss << "false"; + } + else if (parameter == "encryption") { + if (m_config.enable_encryption) + ss << "true"; + else + ss << "false"; + } + else if (parameter == "secretkey") { + ss << m_config.curve_secret_keyfile; + } + else if (parameter == "publickey") { + ss << m_config.curve_public_keyfile; + } + else if (parameter == "encoderkey") { + ss << m_config.curve_encoder_keyfile; + } + else { + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } + return ss.str(); + +} + +}; + +#endif + diff --git a/src/input/Zmq.h b/src/input/Zmq.h new file mode 100644 index 0000000..d1dd2d5 --- /dev/null +++ b/src/input/Zmq.h @@ -0,0 +1,271 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 Matthias P. Braendli + http://www.opendigitalradio.org + + ZeroMQ input. see www.zeromq.org for more info + + For the AAC+ input, each zeromq message must contain one superframe, + or one zmq_frame_header_t followed by a superframe. + + For the MPEG input, each zeromq message must contain one frame. + + Encryption is provided by zmq_curve, see the corresponding manpage. + + From the ZeroMQ manpage 'zmq': + + The 0MQ lightweight messaging kernel is a library which extends the standard + socket interfaces with features traditionally provided by specialised + messaging middleware products. 0MQ sockets provide an abstraction of + asynchronous message queues, multiple messaging patterns, message filtering + (subscriptions), seamless access to multiple transport protocols and more. + */ +/* + This file is part of ODR-DabMux. + + It defines a ZeroMQ input for dabplus data. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#ifdef HAVE_INPUT_ZEROMQ + +#include +#include +#include +#include "zmq.hpp" +#include "input/inputs.h" +#include "ManagementServer.h" + +namespace Inputs { + +/* The frame_buffer contains DAB logical frames as defined in + * TS 102 563, section 6. + * Five elements of this buffer make one AAC superframe (120ms audio) + */ + +// Minimum frame_buffer size in number of elements +// This is one AAC superframe, but you probably don't want to +// go that low anyway. +const size_t INPUT_ZMQ_MIN_BUFFER_SIZE = 5*1; // 120ms + +// Maximum frame_buffer size in number of elements +// One minute is clearly way over what everybody would +// want. +const size_t INPUT_ZMQ_MAX_BUFFER_SIZE = 5*500; // 60s + +/* The ZeroMQ Curve key is 40 bytes long in Z85 representation + * + * But we need to store it as zero-terminated string. + */ +const size_t CURVE_KEYLEN = 40; + +/* helper to invalidate a key */ +#define INVALIDATE_KEY(k) memset(k, 0, CURVE_KEYLEN+1) + +/* Verification for key validity */ +#define KEY_VALID(k) (k[0] != '\0') + +/* Read a key from file into key + * + * Returns 0 on success, negative value on failure + */ +int readkey(std::string& keyfile, char* key); + +struct dab_input_zmq_config_t +{ + /* The size of the internal buffer, measured in number + * of elements. + * + * Each element corresponds to five frames, + * or one AAC superframe. + */ + size_t buffer_size; + + /* The amount of prebuffering to do before we start streaming + * + * Same units as buffer_size + */ + size_t prebuffering; + + /* Whether to enforce encryption or not + */ + bool enable_encryption; + + /* Full path to file containing public key. + */ + std::string curve_public_keyfile; + + /* Full path to file containing secret key. + */ + std::string curve_secret_keyfile; + + /* Full path to file containing encoder public key. + */ + std::string curve_encoder_keyfile; +}; + +#define ZMQ_ENCODER_FDK 1 +#define ZMQ_ENCODER_TOOLAME 2 + +/* This defines the on-wire representation of a ZMQ message header. + * + * The data follows right after this header */ +struct zmq_frame_header_t +{ + uint16_t version; // we support version=1 now + uint16_t encoder; // see ZMQ_ENCODER_XYZ + + /* length of the 'data' field */ + uint32_t datasize; + + /* Audio level, peak, linear PCM */ + int16_t audiolevel_left; + int16_t audiolevel_right; + + /* Data follows this header */ +} __attribute__ ((packed)); + +/* The expected frame size incl data of the given frame */ +#define ZMQ_FRAME_SIZE(f) (sizeof(zmq_frame_header_t) + f->datasize) + +#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(zmq_frame_header_t) ) + + +class ZmqBase : public InputBase, public RemoteControllable { + public: + ZmqBase(const std::string name, + dab_input_zmq_config_t config) + : RemoteControllable(name), + m_zmq_context(1), + m_zmq_sock(m_zmq_context, ZMQ_SUB), + m_zmq_sock_bound_to(""), + m_bitrate(0), + m_enable_input(true), + m_config(config), + m_stats(m_name), + m_prebuf_current(config.prebuffering) { + RC_ADD_PARAMETER(enable, + "If the input is enabled. Set to zero to empty the buffer."); + + RC_ADD_PARAMETER(encryption, + "If encryption is enabled or disabled [1 or 0]." + " If 1 is written, the keys are reloaded."); + + RC_ADD_PARAMETER(publickey, + "The multiplexer's public key file."); + + RC_ADD_PARAMETER(secretkey, + "The multiplexer's secret key file."); + + RC_ADD_PARAMETER(encoderkey, + "The encoder's public key file."); + + /* Set all keys to zero */ + INVALIDATE_KEY(m_curve_public_key); + INVALIDATE_KEY(m_curve_secret_key); + INVALIDATE_KEY(m_curve_encoder_key); + } + + virtual int open(const std::string& inputUri); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); + + /* Remote control */ + virtual void set_parameter(const std::string& parameter, + const std::string& value); + + /* Getting a parameter always returns a string. */ + virtual const std::string get_parameter(const std::string& parameter) const; + + protected: + virtual int readFromSocket(size_t framesize) = 0; + + virtual void rebind(); + + zmq::context_t m_zmq_context; + zmq::socket_t m_zmq_sock; // handle for the zmq socket + + /* If the socket is bound, this saves the endpoint, + * otherwise, it's an empty string + */ + std::string m_zmq_sock_bound_to; + int m_bitrate; + + /* set this to zero to empty the input buffer */ + bool m_enable_input; + + /* stores elements of type char[] */ + std::list m_frame_buffer; + + dab_input_zmq_config_t m_config; + + /* Key management, keys need to be zero-terminated */ + char m_curve_public_key[CURVE_KEYLEN+1]; + char m_curve_secret_key[CURVE_KEYLEN+1]; + char m_curve_encoder_key[CURVE_KEYLEN+1]; + + std::string m_inputUri; + + InputStat m_stats; + + private: + size_t m_prebuf_current; +}; + +class ZmqMPEG : public ZmqBase { + public: + ZmqMPEG(const std::string name, + dab_input_zmq_config_t config) + : ZmqBase(name, config) { + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [mpeg frames]"); + + RC_ADD_PARAMETER(prebuffering, + "Min buffer level before streaming starts [mpeg frames]"); + } + + private: + virtual int readFromSocket(size_t framesize); +}; + +class ZmqAAC : public ZmqBase { + public: + ZmqAAC(const std::string name, + dab_input_zmq_config_t config) + : ZmqBase(name, config) { + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [aac superframes]"); + + RC_ADD_PARAMETER(prebuffering, + "Min buffer level before streaming starts [aac superframes]"); + } + + private: + virtual int readFromSocket(size_t framesize); +}; + +}; +#endif // HAVE_INPUT_ZMQ + + -- cgit v1.2.3 From 804fe1979f9ed7bef7badaf0aa08e35e09874772 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 30 Oct 2016 16:28:27 +0100 Subject: Add rudimentary file input No nonblock support yet --- src/ConfigParser.cpp | 11 +++ src/Makefile.am | 1 + src/dabInputMpegFile.cpp | 31 ------- src/input/File.cpp | 227 +++++++++++++++++++++++++++++++++++++++++++++++ src/input/File.h | 69 ++++++++++++++ src/input/Prbs.h | 4 - src/input/Zmq.cpp | 9 -- src/input/Zmq.h | 7 -- src/input/inputs.h | 52 +++++++++++ src/mpeg.c | 26 ++++++ src/mpeg.h | 7 ++ 11 files changed, 393 insertions(+), 51 deletions(-) create mode 100644 src/input/File.cpp create mode 100644 src/input/File.h create mode 100644 src/input/inputs.h (limited to 'src/ConfigParser.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index bdc2099..2a8d3da 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -55,6 +55,7 @@ #include "input/Prbs.h" #include "input/Zmq.h" +#include "input/File.h" #ifdef _WIN32 @@ -930,6 +931,16 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, if (nonblock) { // TODO } + + if (type == "audio") { + subchan->input = make_shared(); + } + else if (type == "dabplus") { + subchan->input = make_shared(); + } + else { + throw logic_error("Incomplete handling of file input"); + } } else if (proto == "tcp" || proto == "epgm" || diff --git a/src/Makefile.am b/src/Makefile.am index b8de4e8..b356566 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -50,6 +50,7 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ input/inputs.h \ input/Prbs.cpp input/Prbs.h \ input/Zmq.cpp input/Zmq.h \ + input/File.cpp input/File.h \ dabOutput/dabOutput.h \ dabOutput/dabOutputFile.cpp \ dabOutput/dabOutputFifo.cpp \ diff --git a/src/dabInputMpegFile.cpp b/src/dabInputMpegFile.cpp index 804ea29..6f24f32 100644 --- a/src/dabInputMpegFile.cpp +++ b/src/dabInputMpegFile.cpp @@ -47,37 +47,6 @@ struct dabInputOperations dabInputMpegFileOperations = { }; -#define MPEG_FREQUENCY -2 -#define MPEG_PADDING -3 -#define MPEG_COPYRIGHT -4 -#define MPEG_ORIGINAL -5 -#define MPEG_EMPHASIS -6 -int checkDabMpegFrame(void* data) { - mpegHeader* header = (mpegHeader*)data; - unsigned long* headerData = (unsigned long*)data; - if ((*headerData & 0x0f0ffcff) == 0x0004fcff) return 0; - if ((*headerData & 0x0f0ffcff) == 0x0004f4ff) return 0; - if (getMpegFrequency(header) != 48000) { - if (getMpegFrequency(header) != 24000) { - return MPEG_FREQUENCY; - } - } - if (header->padding != 0) { - return MPEG_PADDING; - } - if (header->copyright != 0) { - return MPEG_COPYRIGHT; - } - if (header->original != 0) { - return MPEG_ORIGINAL; - } - if (header->emphasis != 0) { - return MPEG_EMPHASIS; - } - return -1; -} - - int dabInputMpegFileRead(dabInputOperations* ops, void* args, void* buffer, int size) { dabInputFileData* data = (dabInputFileData*)args; diff --git a/src/input/File.cpp b/src/input/File.cpp new file mode 100644 index 0000000..9721b97 --- /dev/null +++ b/src/input/File.cpp @@ -0,0 +1,227 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 Matthias P. Braendli + http://www.opendigitalradio.org + + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#ifndef _WIN32 +# define O_BINARY 0 +#endif + +#include "input/File.h" +#include "mpeg.h" + +namespace Inputs { + +int FileBase::open(const std::string& name) +{ + m_fd = ::open(name.c_str(), O_RDONLY | O_BINARY); + if (m_fd == -1) { + std::stringstream ss; + ss << "Could not open input file " << name << ": " << + strerror(errno); + throw std::runtime_error(ss.str()); + } + return 0; +} + +int FileBase::close() +{ + if (m_fd != -1) { + ::close(m_fd); + m_fd = -1; + } + return 0; +} + +int FileBase::rewind() +{ + return ::lseek(m_fd, 0, SEEK_SET); +} + +int MPEGFile::readFrame(void* buffer, int size) +{ + int result; + bool do_rewind = false; +READ_SUBCHANNEL: + if (m_parity) { + result = readData(m_fd, buffer, size, 2); + m_parity = false; + return 0; + } else { + result = readMpegHeader(m_fd, buffer, size); + if (result > 0) { + result = readMpegFrame(m_fd, buffer, size); + if (result < 0 && getMpegFrequency(buffer) == 24000) { + m_parity = true; + result = size; + } + } + } + switch (result) { + case MPEG_BUFFER_UNDERFLOW: + etiLog.log(warn, "data underflow -> frame muted\n"); + goto MUTE_SUBCHANNEL; + case MPEG_BUFFER_OVERFLOW: + etiLog.log(warn, "bitrate too high -> frame muted\n"); + goto MUTE_SUBCHANNEL; + case MPEG_FILE_EMPTY: + if (do_rewind) { + etiLog.log(error, "file rewinded and still empty " + "-> frame muted\n"); + goto MUTE_SUBCHANNEL; + } + else { + etiLog.log(info, "reach end of file -> rewinding\n"); + rewind(); + goto READ_SUBCHANNEL; + } + case MPEG_FILE_ERROR: + etiLog.log(alert, "can't read file (%i) -> frame muted\n", errno); + perror(""); + goto MUTE_SUBCHANNEL; + case MPEG_SYNC_NOT_FOUND: + etiLog.log(alert, "mpeg sync not found, maybe is not a valid file " + "-> frame muted\n"); + goto MUTE_SUBCHANNEL; + case MPEG_INVALID_FRAME: + etiLog.log(alert, "file is not a valid mpeg file " + "-> frame muted\n"); + goto MUTE_SUBCHANNEL; + default: + if (result < 0) { + etiLog.log(alert, + "unknown error (code = %i) -> frame muted\n", + result); +MUTE_SUBCHANNEL: + memset(buffer, 0, size); + } + else { + if (result < size) { + etiLog.log(warn, "bitrate too low from file " + "-> frame padded\n"); + memset((char*)buffer + result, 0, size - result); + } + + result = checkDabMpegFrame(buffer); + switch (result) { + case MPEG_FREQUENCY: + etiLog.log(error, "file has a frame with an invalid " + "frequency: %i, should be 48000 or 24000\n", + getMpegFrequency(buffer)); + break; + case MPEG_PADDING: + etiLog.log(warn, + "file has a frame with padding bit set\n"); + break; + case MPEG_COPYRIGHT: + result = 0; + break; + case MPEG_ORIGINAL: + result = 0; + break; + case MPEG_EMPHASIS: + etiLog.log(warn, + "file has a frame with emphasis bits set\n"); + break; + default: + if (result < 0) { + etiLog.log(alert, "mpeg file has an invalid DAB " + "mpeg frame (unknown reason: %i)\n", result); + } + break; + } + } + } + return result; +} + +int MPEGFile::setBitrate(int bitrate) +{ + if (bitrate == 0) { + char buffer[4]; + + if (readFrame(buffer, 4) == 0) { + bitrate = getMpegBitrate(buffer); + } + else { + bitrate = -1; + } + rewind(); + } + return bitrate; +} + + +int DABPlusFile::readFrame(void* buffer, int size) +{ + uint8_t* dataOut = reinterpret_cast(buffer); + + ssize_t ret = read(m_fd, dataOut, size); + + if (ret == -1) { + etiLog.log(alert, "ERROR: Can't read file\n"); + perror(""); + return -1; + } + + if (ret < size) { + ssize_t sizeOut = ret; + etiLog.log(info, "reach end of file -> rewinding\n"); + if (rewind() == -1) { + etiLog.log(alert, "ERROR: Can't rewind file\n"); + return -1; + } + + ret = read(m_fd, dataOut + sizeOut, size - sizeOut); + if (ret == -1) { + etiLog.log(alert, "ERROR: Can't read file\n"); + perror(""); + return -1; + } + + if (ret < size) { + etiLog.log(alert, "ERROR: Not enough data in file\n"); + return -1; + } + } + + return size; +} + +int DABPlusFile::setBitrate(int bitrate) +{ + if (bitrate <= 0) { + etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); + return -1; + } + + return bitrate; +} + +}; diff --git a/src/input/File.h b/src/input/File.h new file mode 100644 index 0000000..61be8b1 --- /dev/null +++ b/src/input/File.h @@ -0,0 +1,69 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 Matthias P. Braendli + http://www.opendigitalradio.org + + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#include +#include +#include +#include "input/inputs.h" +#include "ManagementServer.h" + +namespace Inputs { + +class FileBase : public InputBase { + public: + virtual int open(const std::string& name); + virtual int readFrame(void* buffer, int size) = 0; + virtual int setBitrate(int bitrate) = 0; + virtual int close(); + + /* Rewind the file + * Returns -1 on failure, 0 on success + */ + virtual int rewind(); + protected: + // We use unix open() instead of fopen() because + // we want to do non-blocking I/O + int m_fd = -1; +}; + +class MPEGFile : public FileBase { + public: + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + + private: + bool m_parity = false; +}; + +class DABPlusFile : public FileBase { + public: + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); +}; + + +}; diff --git a/src/input/Prbs.h b/src/input/Prbs.h index 47b52ad..1ad5047 100644 --- a/src/input/Prbs.h +++ b/src/input/Prbs.h @@ -28,10 +28,6 @@ #pragma once -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - #include #include "input/inputs.h" diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index 6ef5fce..985fad3 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -41,13 +41,6 @@ #include "input/Zmq.h" -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#ifdef HAVE_INPUT_ZEROMQ - -#include "zmq.hpp" #include #include #include @@ -621,5 +614,3 @@ const string ZmqBase::get_parameter(const string& parameter) const }; -#endif - diff --git a/src/input/Zmq.h b/src/input/Zmq.h index d1dd2d5..02fce3a 100644 --- a/src/input/Zmq.h +++ b/src/input/Zmq.h @@ -43,12 +43,6 @@ #pragma once -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#ifdef HAVE_INPUT_ZEROMQ - #include #include #include @@ -266,6 +260,5 @@ class ZmqAAC : public ZmqBase { }; }; -#endif // HAVE_INPUT_ZMQ diff --git a/src/input/inputs.h b/src/input/inputs.h new file mode 100644 index 0000000..3bc1fa4 --- /dev/null +++ b/src/input/inputs.h @@ -0,0 +1,52 @@ +/* + Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif +#include "Log.h" +#include "RemoteControl.h" +#include + +namespace Inputs { + +/* New input object base */ +class InputBase { + public: + virtual int open(const std::string& name) = 0; + virtual int readFrame(void* buffer, int size) = 0; + virtual int setBitrate(int bitrate) = 0; + virtual int close() = 0; + + virtual ~InputBase() {} + protected: + InputBase() {} +}; + +}; + diff --git a/src/mpeg.c b/src/mpeg.c index f7aed34..fb6591a 100644 --- a/src/mpeg.c +++ b/src/mpeg.c @@ -219,3 +219,29 @@ int readMpegFrame(int file, void* data, int size) } return framelength; } + +int checkDabMpegFrame(void* data) { + mpegHeader* header = (mpegHeader*)data; + unsigned long* headerData = (unsigned long*)data; + if ((*headerData & 0x0f0ffcff) == 0x0004fcff) return 0; + if ((*headerData & 0x0f0ffcff) == 0x0004f4ff) return 0; + if (getMpegFrequency(header) != 48000) { + if (getMpegFrequency(header) != 24000) { + return MPEG_FREQUENCY; + } + } + if (header->padding != 0) { + return MPEG_PADDING; + } + if (header->copyright != 0) { + return MPEG_COPYRIGHT; + } + if (header->original != 0) { + return MPEG_ORIGINAL; + } + if (header->emphasis != 0) { + return MPEG_EMPHASIS; + } + return -1; +} + diff --git a/src/mpeg.h b/src/mpeg.h index aa7cfb6..15b9b80 100644 --- a/src/mpeg.h +++ b/src/mpeg.h @@ -75,6 +75,13 @@ ssize_t readData(int file, void* data, size_t size, unsigned int tries); int readMpegHeader(int file, void* data, int size); int readMpegFrame(int file, void* data, int size); +#define MPEG_FREQUENCY -2 +#define MPEG_PADDING -3 +#define MPEG_COPYRIGHT -4 +#define MPEG_ORIGINAL -5 +#define MPEG_EMPHASIS -6 +int checkDabMpegFrame(void* data); + #ifdef __cplusplus } #endif -- cgit v1.2.3 From 51491533a312884862849082b3507e49c1829d22 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 4 Nov 2016 14:27:50 +0100 Subject: Add new UDP input --- src/ConfigParser.cpp | 30 +++++++++--- src/Makefile.am | 1 + src/UdpSocket.cpp | 18 +++++-- src/UdpSocket.h | 10 +++- src/input/Udp.cpp | 134 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/input/Udp.h | 52 ++++++++++++++++++++ 6 files changed, 232 insertions(+), 13 deletions(-) create mode 100644 src/input/Udp.cpp create mode 100644 src/input/Udp.h (limited to 'src/ConfigParser.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 2a8d3da..e48200a 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -56,6 +56,7 @@ #include "input/Prbs.h" #include "input/Zmq.h" #include "input/File.h" +#include "input/Udp.h" #ifdef _WIN32 @@ -922,16 +923,15 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, dabProtection* protection = &subchan->protection; const bool nonblock = pt.get("nonblock", false); + if (nonblock) { + etiLog.level(warn) << "The nonblock option is not supported"; + } if (type == "dabplus" or type == "audio") { subchan->type = subchannel_type_t::Audio; subchan->bitrate = 0; if (proto == "file") { - if (nonblock) { - // TODO - } - if (type == "audio") { subchan->input = make_shared(); } @@ -946,10 +946,6 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, proto == "epgm" || proto == "ipc") { - if (nonblock) { - etiLog.level(warn) << "The nonblock option is meaningless for the zmq input"; - } - auto zmqconfig = setup_zmq_input(pt, subchanuid); if (type == "audio") { @@ -983,6 +979,24 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->type = subchannel_type_t::DataDmb; subchan->bitrate = DEFAULT_DATA_BITRATE; } + else if (type == "data") { + if (proto == "udp") { + subchan->input = make_shared(); + } else if (proto == "file") { + // TODO + } else if (proto == "fifo") { + // TODO + } else { + stringstream ss; + ss << "Subchannel with uid " << subchanuid << + ": Invalid protocol for data input (" << + proto << ")" << endl; + throw runtime_error(ss.str()); + } + + subchan->type = subchannel_type_t::DataDmb; + subchan->bitrate = DEFAULT_DATA_BITRATE; + } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << " has unknown type!"; diff --git a/src/Makefile.am b/src/Makefile.am index b356566..084cf7b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -51,6 +51,7 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ input/Prbs.cpp input/Prbs.h \ input/Zmq.cpp input/Zmq.h \ input/File.cpp input/File.h \ + input/Udp.cpp input/Udp.h \ dabOutput/dabOutput.h \ dabOutput/dabOutputFile.cpp \ dabOutput/dabOutputFifo.cpp \ diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp index 020e3f5..ccdd7ed 100644 --- a/src/UdpSocket.cpp +++ b/src/UdpSocket.cpp @@ -37,19 +37,19 @@ using namespace std; UdpSocket::UdpSocket() : listenSocket(INVALID_SOCKET) { - init_sock(0, ""); + reinit(0, ""); } UdpSocket::UdpSocket(int port) : listenSocket(INVALID_SOCKET) { - init_sock(port, ""); + reinit(port, ""); } UdpSocket::UdpSocket(int port, const std::string& name) : listenSocket(INVALID_SOCKET) { - init_sock(port, name); + reinit(port, name); } @@ -67,7 +67,7 @@ int UdpSocket::setBlocking(bool block) return 0; } -int UdpSocket::init_sock(int port, const std::string& name) +int UdpSocket::reinit(int port, const std::string& name) { if (listenSocket != INVALID_SOCKET) { ::close(listenSocket); @@ -98,6 +98,16 @@ int UdpSocket::init_sock(int port, const std::string& name) return 0; } +int UdpSocket::close() +{ + if (listenSocket != INVALID_SOCKET) { + ::close(listenSocket); + } + + listenSocket = INVALID_SOCKET; + + return 0; +} UdpSocket::~UdpSocket() { diff --git a/src/UdpSocket.h b/src/UdpSocket.h index 535499e..dfeaac1 100644 --- a/src/UdpSocket.h +++ b/src/UdpSocket.h @@ -80,6 +80,15 @@ class UdpSocket UdpSocket(const UdpSocket& other) = delete; const UdpSocket& operator=(const UdpSocket& other) = delete; + /** reinitialise socket. Close the already open socket, and + * create a new one + */ + int reinit(int port, const std::string& name); + + /** Close the socket + */ + int close(void); + /** Send an UDP packet. * @param packet The UDP packet to be sent. It includes the data and the * destination address @@ -111,7 +120,6 @@ class UdpSocket int setBlocking(bool block); protected: - int init_sock(int port, const std::string& name); /// The address on which the socket is bound. InetAddress address; diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp new file mode 100644 index 0000000..e534a06 --- /dev/null +++ b/src/input/Udp.cpp @@ -0,0 +1,134 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "input/Udp.h" + +#include +#include +#include +#include +#include +#include +#include "utils.h" + +using namespace std; + +namespace Inputs { + +int Udp::open(const std::string& name) +{ + // Skip the udp:// part if it is present + const string endpoint = (name.substr(0, 6) == "udp://") ? + name.substr(6) : name; + + // The endpoint should be address:port + const auto colon_pos = endpoint.find_first_of(":"); + if (colon_pos == string::npos) { + stringstream ss; + ss << "'" << name << + " is an invalid format for udp address: " + "expected [udp://]address:port"; + throw invalid_argument(ss.str()); + } + + const auto address = endpoint.substr(0, colon_pos); + const auto port_str = endpoint.substr(colon_pos + 1); + + const long port = strtol(port_str.c_str(), nullptr, 10); + + if ((port == LONG_MIN or port == LONG_MAX) and errno == ERANGE) { + throw out_of_range("udp input: port out of range"); + } + else if (port == 0 and errno != 0) { + stringstream ss; + ss << "udp input port parse error: " << strerror(errno); + throw invalid_argument(ss.str()); + } + + if (port == 0) { + throw out_of_range("can't use port number 0 in udp address"); + } + + if (m_sock.reinit(port, address) == -1) { + stringstream ss; + ss << "Could not init UDP socket: " << inetErrMsg; + throw runtime_error(ss.str()); + } + + if (m_sock.setBlocking(false) == -1) { + stringstream ss; + ss << "Could not set non-blocking UDP socket: " << inetErrMsg; + throw runtime_error(ss.str()); + } + + return 0; +} + +int Udp::readFrame(void* buffer, int size) +{ + uint8_t* data = reinterpret_cast(buffer); + + // Regardless of buffer contents, try receiving data. + UdpPacket packet; + int ret = m_sock.receive(packet); + + if (ret == -1) { + stringstream ss; + ss << "Could not read from UDP socket: " << inetErrMsg; + throw runtime_error(ss.str()); + } + + std::copy(packet.getData(), packet.getData() + packet.getSize(), + back_inserter(m_buffer)); + + // Take data from the buffer if it contains enough data, + // in any case write the buffer + if (m_buffer.size() >= (size_t)size) { + std::copy(m_buffer.begin(), m_buffer.begin() + size, data); + } + else { + memset(data, 0x0, size); + } + + return size; +} + +int Udp::setBitrate(int bitrate) +{ + if (bitrate <= 0) { + etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); + return -1; + } + + return bitrate; +} + +int Udp::close() +{ + return m_sock.close(); +} + +}; diff --git a/src/input/Udp.h b/src/input/Udp.h new file mode 100644 index 0000000..b6705e9 --- /dev/null +++ b/src/input/Udp.h @@ -0,0 +1,52 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#include +#include +#include "input/inputs.h" +#include "UdpSocket.h" + +namespace Inputs { + +class Udp : public InputBase { + public: + virtual int open(const std::string& name); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); + + private: + UdpSocket m_sock; + + // The content of the UDP packets gets written into the + // buffer, and the UDP packet boundaries disappear there. + std::vector m_buffer; +}; + +}; + -- cgit v1.2.3 From 7604a3fbd99ec617274f77ecdd0a098abdb33f06 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 4 Nov 2016 14:28:19 +0100 Subject: Do not throw a copy of the exception --- src/ConfigParser.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/ConfigParser.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index e48200a..ddcb9ed 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -373,7 +373,7 @@ void parse_ptree( catch (runtime_error &e) { etiLog.log(error, "%s\n", e.what()); - throw e; + throw; } -- cgit v1.2.3 From 7405d574963abb37732de8a90dd9e42174e0410f Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 4 Nov 2016 14:57:16 +0100 Subject: Use RawFile for DAB+ and for data --- src/ConfigParser.cpp | 8 +++----- src/input/File.cpp | 4 ++-- src/input/File.h | 3 +-- 3 files changed, 6 insertions(+), 9 deletions(-) (limited to 'src/ConfigParser.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index ddcb9ed..a311d63 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -936,7 +936,7 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->input = make_shared(); } else if (type == "dabplus") { - subchan->input = make_shared(); + subchan->input = make_shared(); } else { throw logic_error("Incomplete handling of file input"); @@ -982,10 +982,8 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, else if (type == "data") { if (proto == "udp") { subchan->input = make_shared(); - } else if (proto == "file") { - // TODO - } else if (proto == "fifo") { - // TODO + } else if (proto == "file" or proto == "fifo") { + subchan->input = make_shared(); } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << diff --git a/src/input/File.cpp b/src/input/File.cpp index eb26136..732f2a2 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -178,7 +178,7 @@ int MPEGFile::setBitrate(int bitrate) } -int DABPlusFile::readFrame(uint8_t* buffer, size_t size) +int RawFile::readFrame(uint8_t* buffer, size_t size) { ssize_t ret = read(m_fd, buffer, size); @@ -212,7 +212,7 @@ int DABPlusFile::readFrame(uint8_t* buffer, size_t size) return size; } -int DABPlusFile::setBitrate(int bitrate) +int RawFile::setBitrate(int bitrate) { if (bitrate <= 0) { etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); diff --git a/src/input/File.h b/src/input/File.h index 01f4f21..99e0a87 100644 --- a/src/input/File.h +++ b/src/input/File.h @@ -59,11 +59,10 @@ class MPEGFile : public FileBase { bool m_parity = false; }; -class DABPlusFile : public FileBase { +class RawFile : public FileBase { public: virtual int readFrame(uint8_t* buffer, size_t size); virtual int setBitrate(int bitrate); }; - }; -- cgit v1.2.3 From 076b370ddcf6f8d69bd728dd5ae7b18b18a77b23 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 4 Nov 2016 15:09:29 +0100 Subject: Treat DMB input as data, update TODO accordingly --- TODO | 19 +++++++++++++++---- src/ConfigParser.cpp | 10 +++++++++- 2 files changed, 24 insertions(+), 5 deletions(-) (limited to 'src/ConfigParser.cpp') diff --git a/TODO b/TODO index 3bf746f..297e31f 100644 --- a/TODO +++ b/TODO @@ -15,6 +15,7 @@ ODR-DabMod to add EDI input there too. Initial work started in http://git.mpb.li/git/odr-edilib/ + Explicit Service Linking ------------------------ At the moment there is no support to signal explicit service linking. @@ -36,20 +37,29 @@ Clarify usage of PTy We currently transmit dynamic PTy in FIG0/17 since it can be changed in through the remote control. Some receivers might not display the dynamic PTy, but only the static PTy. Clarify if we need to add both PTy variants -to the configuration and the code.o +to the configuration and the code. -Refactor inputs ---------------- +Refactor inputs *ONGOING* +------------------------- The input code is written in very C-like OOP, with structures of function pointers that do dynamic dispatch. Refactoring this to proper classes and documenting it properly will simplify the addition of new input formats, facilitate runtime configurability and clarify the usages of the inputs. -Also, all inputs using UDP are now broken. +Also, all inputs using UDP are now broken. Add statistics to UDP input. Find out what purpose the bridge input serves. +Decide if non-blocking file input is still necessary. + + +Fix DMB input +------------- +The code that does interleaving and reed-solomon encoding for DMB is not used +anymore, and is untested. + + Communicate Leap Seconds ------------------------ Actually, we're supposed to say in FIG0/10 when there is a UTC leap second @@ -58,6 +68,7 @@ concept is totally unaware of that, this is not done. We need to know for EDI TIST, and the ClockTAI class can get the information from the Internet, but it is not used in FIG0/10. + Add support for services with different ECC than ensemble --------------------------------------------------------- FIG 0/9 can transmit an Extended field for this information. Needs change of diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index a311d63..3167d49 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -979,7 +979,7 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->type = subchannel_type_t::DataDmb; subchan->bitrate = DEFAULT_DATA_BITRATE; } - else if (type == "data") { + else if (type == "data" or type == "dmb") { if (proto == "udp") { subchan->input = make_shared(); } else if (proto == "file" or proto == "fifo") { @@ -994,6 +994,14 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->type = subchannel_type_t::DataDmb; subchan->bitrate = DEFAULT_DATA_BITRATE; + + if (type == "dmb") { + /* The old dmb input took care of interleaving and Reed-Solomon encoding. This + * code is unported. + * See dabInputDmbFile.cpp + */ + etiLog.level(warn) << "uid " << subchanuid << " of type Dmb uses RAW input"; + } } else { stringstream ss; -- cgit v1.2.3 From 5fd4d99aded3677497c6cf5ab31517a5383333cb Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 5 Nov 2016 14:35:25 +0100 Subject: Add Packet File input --- src/ConfigParser.cpp | 7 ++ src/input/File.cpp | 282 +++++++++++++++++++++++++++++++++++++++++++++------ src/input/File.h | 26 ++++- 3 files changed, 280 insertions(+), 35 deletions(-) (limited to 'src/ConfigParser.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 3167d49..1ed1bac 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -1003,6 +1003,13 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, etiLog.level(warn) << "uid " << subchanuid << " of type Dmb uses RAW input"; } } + else if (type == "packet" or type == "enhancedpacket") { + subchan->type = subchannel_type_t::Packet; + subchan->bitrate = DEFAULT_PACKET_BITRATE; + + bool enhanced = (type == "enhancedpacket"); + subchan->input = make_shared(enhanced); + } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << " has unknown type!"; diff --git a/src/input/File.cpp b/src/input/File.cpp index 732f2a2..5c61fd4 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -35,9 +35,31 @@ #include "input/File.h" #include "mpeg.h" +#include "ReedSolomon.h" namespace Inputs { +#ifdef _WIN32 +# pragma pack(push, 1) +#endif +struct packetHeader { + unsigned char addressHigh:2; + unsigned char last:1; + unsigned char first:1; + unsigned char continuityIndex:2; + unsigned char packetLength:2; + unsigned char addressLow; + unsigned char dataLength:7; + unsigned char command; +} +#ifdef _WIN32 +# pragma pack(pop) +#else +__attribute((packed)) +#endif +; + + int FileBase::open(const std::string& name) { m_fd = ::open(name.c_str(), O_RDONLY | O_BINARY); @@ -50,6 +72,17 @@ int FileBase::open(const std::string& name) return 0; } +int FileBase::setBitrate(int bitrate) +{ + if (bitrate <= 0) { + etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); + return -1; + } + + return bitrate; +} + + int FileBase::close() { if (m_fd != -1) { @@ -64,6 +97,40 @@ int FileBase::rewind() return ::lseek(m_fd, 0, SEEK_SET); } +ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size) +{ + ssize_t ret = read(m_fd, buffer, size); + + if (ret == -1) { + etiLog.log(alert, "ERROR: Can't read file\n"); + perror(""); + return -1; + } + + if (ret < (ssize_t)size) { + ssize_t sizeOut = ret; + etiLog.log(info, "reach end of file -> rewinding\n"); + if (rewind() == -1) { + etiLog.log(alert, "ERROR: Can't rewind file\n"); + return -1; + } + + ret = read(m_fd, buffer + sizeOut, size - sizeOut); + if (ret == -1) { + etiLog.log(alert, "ERROR: Can't read file\n"); + perror(""); + return -1; + } + + if (ret < (ssize_t)size) { + etiLog.log(alert, "ERROR: Not enough data in file\n"); + return -1; + } + } + + return size; +} + int MPEGFile::readFrame(uint8_t* buffer, size_t size) { int result; @@ -177,49 +244,200 @@ int MPEGFile::setBitrate(int bitrate) return bitrate; } - int RawFile::readFrame(uint8_t* buffer, size_t size) { - ssize_t ret = read(m_fd, buffer, size); + return readFromFile(buffer, size); +} - if (ret == -1) { - etiLog.log(alert, "ERROR: Can't read file\n"); - perror(""); - return -1; - } +PacketFile::PacketFile(bool enhancedPacketMode) +{ + m_enhancedPacketEnabled = enhancedPacketMode; +} - if (ret < (ssize_t)size) { - ssize_t sizeOut = ret; - etiLog.log(info, "reach end of file -> rewinding\n"); - if (rewind() == -1) { - etiLog.log(alert, "ERROR: Can't rewind file\n"); - return -1; - } +int PacketFile::readFrame(uint8_t* buffer, size_t size) +{ + size_t written = 0; + int length; + packetHeader* header; + int indexRow; + int indexCol; - ret = read(m_fd, buffer + sizeOut, size - sizeOut); - if (ret == -1) { - etiLog.log(alert, "ERROR: Can't read file\n"); - perror(""); - return -1; + while (written < size) { + if (m_enhancedPacketWaiting > 0) { + *buffer = 192 - m_enhancedPacketWaiting; + *buffer /= 22; + *buffer <<= 2; + *(buffer++) |= 0x03; + *(buffer++) = 0xfe; + indexCol = 188; + indexCol += (192 - m_enhancedPacketWaiting) / 12; + indexRow = 0; + indexRow += (192 - m_enhancedPacketWaiting) % 12; + for (int j = 0; j < 22; ++j) { + if (m_enhancedPacketWaiting == 0) { + *(buffer++) = 0; + } + else { + *(buffer++) = m_enhancedPacketData[indexRow][indexCol]; + if (++indexRow == 12) { + indexRow = 0; + ++indexCol; + } + --m_enhancedPacketWaiting; + } + } + written += 24; + if (m_enhancedPacketWaiting == 0) { + m_enhancedPacketLength = 0; + } } + else if (m_packetLength != 0) { + header = (packetHeader*)(&m_packetData[0]); + if (written + m_packetLength > (unsigned)size) { + memset(buffer, 0, 22); + buffer[22] = 0x60; + buffer[23] = 0x4b; + length = 24; + } + else if (m_enhancedPacketEnabled) { + if (m_enhancedPacketLength + m_packetLength > (12 * 188)) { + memset(buffer, 0, 22); + buffer[22] = 0x60; + buffer[23] = 0x4b; + length = 24; + } + else { + std::copy(m_packetData.begin(), + m_packetData.begin() + m_packetLength, + buffer); + length = m_packetLength; + m_packetLength = 0; + } + } + else { + std::copy(m_packetData.begin(), + m_packetData.begin() + m_packetLength, + buffer); + length = m_packetLength; + m_packetLength = 0; + } - if (ret < (ssize_t)size) { - etiLog.log(alert, "ERROR: Not enough data in file\n"); - return -1; + if (m_enhancedPacketEnabled) { + indexCol = m_enhancedPacketLength / 12; + indexRow = m_enhancedPacketLength % 12; // TODO Check if always 0 + for (int j = 0; j < length; ++j) { + m_enhancedPacketData[indexRow][indexCol] = buffer[j]; + if (++indexRow == 12) { + indexRow = 0; + ++indexCol; + } + } + m_enhancedPacketLength += length; + if (m_enhancedPacketLength >= (12 * 188)) { + m_enhancedPacketLength = (12 * 188); + ReedSolomon encoder(204, 188); + for (int j = 0; j < 12; ++j) { + encoder.encode(&m_enhancedPacketData[j][0], 188); + } + m_enhancedPacketWaiting = 192; + } + } + written += length; + buffer += length; } - } + else { + int nbBytes = readFromFile(buffer, 3); + header = (packetHeader*)buffer; + if (nbBytes == -1) { + if (errno == EAGAIN) goto END_PACKET; + perror("Packet file"); + return -1; + } + else if (nbBytes == 0) { + if (rewind() == -1) { + goto END_PACKET; + } + continue; + } + else if (nbBytes < 3) { + etiLog.log(error, + "Error while reading file for packet header; " + "read %i out of 3 bytes\n", nbBytes); + break; + } - return size; -} + length = header->packetLength * 24 + 24; + if (written + length > size) { + memcpy(&m_packetData[0], header, 3); + readFromFile(&m_packetData[3], length - 3); + m_packetLength = length; + continue; + } -int RawFile::setBitrate(int bitrate) -{ - if (bitrate <= 0) { - etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); - return -1; - } + if (m_enhancedPacketEnabled) { + if (m_enhancedPacketLength + length > (12 * 188)) { + memcpy(&m_packetData[0], header, 3); + readFromFile(&m_packetData[3], length - 3); + m_packetLength = length; + continue; + } + } - return bitrate; + nbBytes = readFromFile(buffer + 3, length - 3); + if (nbBytes == -1) { + perror("Packet file"); + return -1; + } + else if (nbBytes == 0) { + etiLog.log(info, + "Packet header read, but no data!\n"); + if (rewind() == -1) { + goto END_PACKET; + } + continue; + } + else if (nbBytes < length - 3) { + etiLog.log(error, "Error while reading packet file; " + "read %i out of %i bytes\n", nbBytes, length - 3); + break; + } + + if (m_enhancedPacketEnabled) { + indexCol = m_enhancedPacketLength / 12; + indexRow = m_enhancedPacketLength % 12; // TODO Check if always 0 + for (int j = 0; j < length; ++j) { + m_enhancedPacketData[indexRow][indexCol] = buffer[j]; + if (++indexRow == 12) { + indexRow = 0; + ++indexCol; + } + } + m_enhancedPacketLength += length; + if (m_enhancedPacketLength >= (12 * 188)) { + if (m_enhancedPacketLength > (12 * 188)) { + etiLog.log(error, + "Error, too much enhanced packet data!\n"); + } + ReedSolomon encoder(204, 188); + for (int j = 0; j < 12; ++j) { + encoder.encode(&m_enhancedPacketData[j][0], 188); + } + m_enhancedPacketWaiting = 192; + } + } + written += length; + buffer += length; + } + } +END_PACKET: + while (written < size) { + memset(buffer, 0, 22); + buffer[22] = 0x60; + buffer[23] = 0x4b; + buffer += 24; + written += 24; + } + return written; } }; diff --git a/src/input/File.h b/src/input/File.h index 99e0a87..bf99748 100644 --- a/src/input/File.h +++ b/src/input/File.h @@ -26,6 +26,7 @@ #pragma once #include +#include #include #include #include "input/inputs.h" @@ -37,7 +38,7 @@ class FileBase : public InputBase { public: virtual int open(const std::string& name); virtual int readFrame(uint8_t* buffer, size_t size) = 0; - virtual int setBitrate(int bitrate) = 0; + virtual int setBitrate(int bitrate); virtual int close(); /* Rewind the file @@ -45,8 +46,13 @@ class FileBase : public InputBase { */ virtual int rewind(); protected: + /* Read len bytes from the file into buf, and return + * the number of bytes read, or -1 in case of error. + */ + virtual ssize_t readFromFile(uint8_t* buf, size_t len); + // We use unix open() instead of fopen() because - // we want to do non-blocking I/O + // we might want to do non-blocking I/O in the future int m_fd = -1; }; @@ -62,7 +68,21 @@ class MPEGFile : public FileBase { class RawFile : public FileBase { public: virtual int readFrame(uint8_t* buffer, size_t size); - virtual int setBitrate(int bitrate); +}; + +class PacketFile : public FileBase { + public: + PacketFile(bool enhancedPacketMode); + virtual int readFrame(uint8_t* buffer, size_t size); + + protected: + std::array m_packetData; + size_t m_packetLength; + + bool m_enhancedPacketEnabled = false; + std::array,12> m_enhancedPacketData; + size_t m_enhancedPacketWaiting; + size_t m_enhancedPacketLength; }; }; -- cgit v1.2.3 From 4388257ae49ab64f510294037df50e057ac55794 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 7 Nov 2016 21:59:59 +0100 Subject: Remove parser for old inputs --- src/ConfigParser.cpp | 310 --------------------------------------------------- 1 file changed, 310 deletions(-) (limited to 'src/ConfigParser.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 1ed1bac..0f05076 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -610,316 +610,6 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->inputUri = inputUri; -#if OLD_INPUTS // {{{ - /* The input is of the old_style type, - * with the struct of function pointers, - * and needs to be a DabInputCompatible - */ - bool input_is_old_style = true; - dabInputOperations operations; - dabProtection* protection = &subchan->protection; - - - if (0) { -#if defined(HAVE_FORMAT_MPEG) - } else if (type == "audio") { - subchan->type = subchannel_type_t::Audio; - subchan->bitrate = 0; - - if (0) { -#if defined(HAVE_INPUT_FILE) - } else if (proto == "file") { - operations = dabInputMpegFileOperations; -#endif // defined(HAVE_INPUT_FILE) -#if defined(HAVE_INPUT_ZEROMQ) - } - else if (proto == "tcp" || - proto == "epgm" || - proto == "ipc") { - input_is_old_style = false; - - auto zmqconfig = setup_zmq_input(pt, subchanuid); - - DabInputZmqMPEG* inzmq = - new DabInputZmqMPEG(subchanuid, zmqconfig); - rcs.enrol(inzmq); - subchan->input = inzmq; - - if (proto == "epgm") { - etiLog.level(warn) << "Using untested epgm:// zeromq input"; - } - else if (proto == "ipc") { - etiLog.level(warn) << "Using untested ipc:// zeromq input"; - } - -#endif // defined(HAVE_INPUT_ZEROMQ) - } else { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << - ": Invalid protocol for MPEG input (" << - proto << ")" << endl; - throw runtime_error(ss.str()); - } -#endif // defined(HAVE_INPUT_FILE) && defined(HAVE_FORMAT_MPEG) -#if defined(HAVE_FORMAT_DABPLUS) - } else if (type == "dabplus") { - subchan->type = subchannel_type_t::Audio; - subchan->bitrate = 32; - - if (0) { -#if defined(HAVE_INPUT_FILE) - } else if (proto == "file") { - operations = dabInputDabplusFileOperations; -#endif // defined(HAVE_INPUT_FILE) -#if defined(HAVE_INPUT_ZEROMQ) - } - else if (proto == "tcp" || - proto == "epgm" || - proto == "ipc") { - input_is_old_style = false; - - auto zmqconfig = setup_zmq_input(pt, subchanuid); - - DabInputZmqAAC* inzmq = - new DabInputZmqAAC(subchanuid, zmqconfig); - - rcs.enrol(inzmq); - subchan->input = inzmq; - - if (proto == "epgm") { - etiLog.level(warn) << "Using untested epgm:// zeromq input"; - } - else if (proto == "ipc") { - etiLog.level(warn) << "Using untested ipc:// zeromq input"; - } -#endif // defined(HAVE_INPUT_ZEROMQ) - } else { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << - ": Invalid protocol for DAB+ input (" << - proto << ")" << endl; - throw runtime_error(ss.str()); - } -#endif // defined(HAVE_FORMAT_DABPLUS) - } else if (type == "data" and proto == "prbs") { - input_is_old_style = false; - - subchan->input = make_shared(); - subchan->type = subchannel_type_t::DataDmb; - subchan->bitrate = DEFAULT_DATA_BITRATE; - } else if (type == "data") { - // TODO default proto should be udp:// - if (0) { -#if defined(HAVE_INPUT_UDP) - } else if (proto == "udp") { - operations = dabInputUdpOperations; -#endif -#if defined(HAVE_INPUT_FILE) && defined(HAVE_FORMAT_RAW) - } else if (proto == "file") { - operations = dabInputRawFileOperations; -#endif -#if defined(HAVE_INPUT_FIFO) - } else if (proto == "fifo") { - operations = dabInputRawFifoOperations; -#endif - } else { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << - ": Invalid protocol for data input (" << - proto << ")" << endl; - throw runtime_error(ss.str()); - } - - subchan->type = subchannel_type_t::DataDmb; - subchan->bitrate = DEFAULT_DATA_BITRATE; -#ifdef HAVE_FORMAT_PACKET - } else if (type == "packet") { - subchan->type = subchannel_type_t::Packet; - subchan->bitrate = DEFAULT_PACKET_BITRATE; -#ifdef HAVE_INPUT_FILE - operations = dabInputPacketFileOperations; -#elif defined(HAVE_INPUT_FIFO) - operations = dabInputFifoOperations; -#else -# pragma error("Must define at least one packet input") -#endif // defined(HAVE_INPUT_FILE) -#ifdef HAVE_FORMAT_EPM - } else if (type == "enhancedpacket") { - subchan->type = subchannel_type_t::Packet; - subchan->bitrate = DEFAULT_PACKET_BITRATE; - operations = dabInputEnhancedPacketFileOperations; -#endif // defined(HAVE_FORMAT_EPM) -#endif // defined(HAVE_FORMAT_PACKET) -#ifdef HAVE_FORMAT_DMB - } else if (type == "dmb") { - // TODO default proto should be UDP - if (0) { -#if defined(HAVE_INPUT_UDP) - } else if (proto == "udp") { - operations = dabInputDmbUdpOperations; -#endif - } else if (proto == "file") { - operations = dabInputDmbFileOperations; - } else { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << - ": Invalid protocol for DMB input (" << - proto << ")" << endl; - throw runtime_error(ss.str()); - } - - subchan->type = subchannel_type_t::DataDmb; - subchan->bitrate = DEFAULT_DATA_BITRATE; -#endif - } else { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << " has unknown type!"; - throw runtime_error(ss.str()); - } - subchan->startAddress = 0; - - if (type == "audio") { - protection->form = UEP; - protection->level = 2; - protection->uep.tableIndex = 0; - } else { - protection->level = 2; - protection->form = EEP; - protection->eep.profile = EEP_A; - } - - /* Get bitrate */ - try { - subchan->bitrate = pt.get("bitrate"); - if ((subchan->bitrate & 0x7) != 0) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << - ": Bitrate (" << subchan->bitrate << " not a multiple of 8!"; - throw runtime_error(ss.str()); - } - } - catch (ptree_error &e) { - stringstream ss; - ss << "Error, no bitrate defined for subchannel " << subchanuid; - throw runtime_error(ss.str()); - } - -#if defined(HAVE_INPUT_FIFO) && defined(HAVE_INPUT_FILE) - /* Get nonblock */ - bool nonblock = pt.get("nonblock", false); - if (nonblock) { - switch (subchan->type) { -#ifdef HAVE_FORMAT_PACKET - case subchannel_type_t::Packet: - if (operations == dabInputPacketFileOperations) { - operations = dabInputFifoOperations; -#ifdef HAVE_FORMAT_EPM - } else if (operations == dabInputEnhancedPacketFileOperations) { - operations = dabInputEnhancedFifoOperations; -#endif // defined(HAVE_FORMAT_EPM) - } else { - stringstream ss; - ss << "Error, wrong packet operations for subchannel " << - subchanuid; - throw runtime_error(ss.str()); - } - break; -#endif // defined(HAVE_FORMAT_PACKET) -#ifdef HAVE_FORMAT_MPEG - case subchannel_type_t::Audio: - if (operations == dabInputMpegFileOperations) { - operations = dabInputMpegFifoOperations; - } else if (operations == dabInputDabplusFileOperations) { - operations = dabInputDabplusFifoOperations; - } else { - stringstream ss; - ss << "Error, wrong audio operations for subchannel " << - subchanuid; - throw runtime_error(ss.str()); - } - break; -#endif // defined(HAVE_FORMAT_MPEG) - case subchannel_type_t::DataDmb: - case subchannel_type_t::Fidc: - default: - stringstream ss; - ss << "Subchannel with uid " << subchanuid << - " non-blocking I/O only available for audio or packet services!"; - throw runtime_error(ss.str()); - } - } -#endif // defined(HAVE_INPUT_FIFO) && defined(HAVE_INPUT_FILE) - - - /* Get id */ - - try { - subchan->id = hexparse(pt.get("id")); - } - catch (ptree_error &e) { - for (int i = 0; i < 64; ++i) { // Find first free subchannel - vector::iterator subchannel = getSubchannel(ensemble->subchannels, i); - if (subchannel == ensemble->subchannels.end()) { - subchannel = ensemble->subchannels.end() - 1; - subchan->id = i; - break; - } - } - } - - /* Get optional protection profile */ - string profile = pt.get("protection-profile", ""); - - if (profile == "EEP_A") { - protection->form = EEP; - protection->eep.profile = EEP_A; - } - else if (profile == "EEP_B") { - protection->form = EEP; - protection->eep.profile = EEP_B; - } - else if (profile == "UEP") { - protection->form = UEP; - } - - /* Get protection level */ - try { - int level = pt.get("protection"); - - if (protection->form == UEP) { - if ((level < 1) || (level > 5)) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << - ": protection level must be between " - "1 to 5 inclusively (current = " << level << " )"; - throw runtime_error(ss.str()); - } - } - else if (protection->form == EEP) { - if ((level < 1) || (level > 4)) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << - ": protection level must be between " - "1 to 4 inclusively (current = " << level << " )"; - throw runtime_error(ss.str()); - } - } - protection->level = level - 1; - } - catch (ptree_error &e) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << - ": protection level undefined!"; - throw runtime_error(ss.str()); - } - - /* Create object */ - if (input_is_old_style) { - subchan->input = new DabInputCompatible(operations); - } - // else { it's already been created! } -#endif // 0 }}} - dabProtection* protection = &subchan->protection; const bool nonblock = pt.get("nonblock", false); -- cgit v1.2.3