diff options
| -rw-r--r-- | Makefile.am | 39 | ||||
| -rw-r--r-- | configure.ac | 8 | ||||
| -rw-r--r-- | lib/ClockTAI.cpp | 607 | ||||
| -rw-r--r-- | lib/ClockTAI.h | 102 | ||||
| -rw-r--r-- | lib/Globals.cpp | 36 | ||||
| -rw-r--r-- | lib/Log.cpp | 207 | ||||
| -rw-r--r-- | lib/Log.h | 200 | ||||
| -rw-r--r-- | lib/ReedSolomon.cpp | 118 | ||||
| -rw-r--r-- | lib/ReedSolomon.h | 56 | ||||
| -rw-r--r-- | lib/RemoteControl.cpp | 579 | ||||
| -rw-r--r-- | lib/RemoteControl.h | 251 | ||||
| -rw-r--r-- | lib/Socket.cpp | 30 | ||||
| -rw-r--r-- | lib/Socket.h | 2 | ||||
| -rw-r--r-- | lib/crc.c (renamed from src/crc.c) | 0 | ||||
| -rw-r--r-- | lib/crc.h (renamed from src/crc.h) | 0 | ||||
| -rw-r--r-- | lib/edi/AFPacket.cpp | 96 | ||||
| -rw-r--r-- | lib/edi/AFPacket.h | 61 | ||||
| -rw-r--r-- | lib/edi/EDIConfig.h | 84 | ||||
| -rw-r--r-- | lib/edi/Interleaver.cpp | 122 | ||||
| -rw-r--r-- | lib/edi/Interleaver.h | 75 | ||||
| -rw-r--r-- | lib/edi/PFT.cpp | 327 | ||||
| -rw-r--r-- | lib/edi/PFT.h | 78 | ||||
| -rw-r--r-- | lib/edi/TagItems.cpp | 449 | ||||
| -rw-r--r-- | lib/edi/TagItems.h | 253 | ||||
| -rw-r--r-- | lib/edi/TagPacket.cpp | 78 | ||||
| -rw-r--r-- | lib/edi/TagPacket.h | 56 | ||||
| -rw-r--r-- | lib/edi/Transport.cpp | 188 | ||||
| -rw-r--r-- | lib/edi/Transport.h | 71 | ||||
| -rw-r--r-- | lib/fec/LICENSE (renamed from src/fec/LICENSE) | 0 | ||||
| -rw-r--r-- | lib/fec/README.md (renamed from src/fec/README.md) | 0 | ||||
| -rw-r--r-- | lib/fec/char.h (renamed from src/fec/char.h) | 0 | ||||
| -rw-r--r-- | lib/fec/decode_rs.h (renamed from src/fec/decode_rs.h) | 0 | ||||
| -rw-r--r-- | lib/fec/decode_rs_char.c (renamed from src/fec/decode_rs_char.c) | 0 | ||||
| -rw-r--r-- | lib/fec/encode_rs.h (renamed from src/fec/encode_rs.h) | 0 | ||||
| -rw-r--r-- | lib/fec/encode_rs_char.c (renamed from src/fec/encode_rs_char.c) | 0 | ||||
| -rw-r--r-- | lib/fec/fec.h (renamed from src/fec/fec.h) | 0 | ||||
| -rw-r--r-- | lib/fec/init_rs.h (renamed from src/fec/init_rs.h) | 0 | ||||
| -rw-r--r-- | lib/fec/init_rs_char.c (renamed from src/fec/init_rs_char.c) | 0 | ||||
| -rw-r--r-- | lib/fec/rs-common.h (renamed from src/fec/rs-common.h) | 0 | 
39 files changed, 4145 insertions, 28 deletions
| diff --git a/Makefile.am b/Makefile.am index 1bac77d..4884dab 100644 --- a/Makefile.am +++ b/Makefile.am @@ -15,21 +15,32 @@ odr_sourcecompanion_SOURCES     = src/odr-sourcecompanion.cpp \  								  src/AVTInput.h src/AVTInput.cpp \  								  src/OrderedQueue.h src/OrderedQueue.cpp \  								  src/StatsPublish.h src/StatsPublish.cpp \ -								  src/crc.h src/crc.c \  								  src/encryption.h src/encryption.c \  								  src/utils.h src/utils.c \ -								  src/fec/char.h \ -								  src/fec/decode_rs_char.c \ -								  src/fec/decode_rs.h \ -								  src/fec/encode_rs_char.c \ -								  src/fec/encode_rs.h \ -								  src/fec/fec.h \ -								  src/fec/init_rs_char.c \ -								  src/fec/init_rs.h \ -								  src/fec/rs-common.h \ +								  lib/fec/char.h \ +								  lib/fec/decode_rs_char.c \ +								  lib/fec/decode_rs.h \ +								  lib/fec/encode_rs_char.c \ +								  lib/fec/encode_rs.h \ +								  lib/fec/fec.h \ +								  lib/fec/init_rs_char.c \ +								  lib/fec/init_rs.h \ +								  lib/fec/rs-common.h \ +								  lib/ClockTAI.h lib/ClockTAI.cpp \ +								  lib/Globals.cpp \ +								  lib/Log.h lib/Log.cpp \ +								  lib/ReedSolomon.h lib/ReedSolomon.cpp \ +								  lib/RemoteControl.h lib/RemoteControl.cpp \ +								  lib/Socket.h lib/Socket.cpp \  								  lib/ThreadsafeQueue.h \ -								  lib/Socket.h lib/Socket.cpp - +								  lib/crc.h lib/crc.c \ +								  lib/edi/AFPacket.h lib/edi/AFPacket.cpp \ +								  lib/edi/EDIConfig.h \ +								  lib/edi/Interleaver.h lib/edi/Interleaver.cpp \ +								  lib/edi/PFT.h lib/edi/PFT.cpp \ +								  lib/edi/TagItems.h lib/edi/TagItems.cpp \ +								  lib/edi/TagPacket.h lib/edi/TagPacket.cpp \ +								  lib/edi/Transport.h lib/edi/Transport.cpp  bin_PROGRAMS =  odr-sourcecompanion$(EXEEXT) @@ -38,8 +49,8 @@ EXTRA_DIST = $(top_srcdir)/bootstrap \  			 $(top_srcdir)/LICENCE \  			 $(top_srcdir)/ChangeLog \  			 $(top_srcdir)/Doxyfile \ -			 $(top_srcdir)/src/fec/LICENSE -			 $(top_srcdir)/src/fec/README.md +			 $(top_srcdir)/lib/fec/LICENSE +			 $(top_srcdir)/lib/fec/README.md  doc: export PROJECT_NUMBER:=$(shell git describe --dirty) diff --git a/configure.ac b/configure.ac index 24f73fe..c1b3938 100644 --- a/configure.ac +++ b/configure.ac @@ -58,6 +58,14 @@ AC_COMPILE_IFELSE( [AC_LANG_PROGRAM([[#include <fdk-aac/aacenc_lib.h>]],                      ]                   ) +AC_CHECK_LIB(curl, curl_easy_init) +have_curl=$ac_cv_lib_curl_curl_easy_init + +AS_IF([test "x$have_curl" = "xyes"], +             [AC_DEFINE(HAVE_CURL, [1], [Define if cURL is available])]) + +AS_IF([test "x$have_curl" = "xno"], +             [AC_MSG_WARN([cURL not found, timestamps will not work])])  AM_CONDITIONAL([IS_GIT_REPO], [test -d '.git']) diff --git a/lib/ClockTAI.cpp b/lib/ClockTAI.cpp new file mode 100644 index 0000000..2656345 --- /dev/null +++ b/lib/ClockTAI.cpp @@ -0,0 +1,607 @@ +/* +   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +   2011, 2012 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +/* This file downloads the TAI-UTC bulletins from the from IETF and parses them + * so that correct time can be communicated in EDI timestamps. + * + * This file contains self-test code that can be executed by running + *  g++ -g -Wall -DTAI_TEST -DHAVE_CURL -std=c++11 -lcurl -pthread \ + *  ClockTAI.cpp Log.cpp RemoteControl.cpp -lboost_system -o taitest && ./taitest + */ + +#ifdef HAVE_CONFIG_H +#   include "config.h" +#endif + +#include "ClockTAI.h" +#include "Log.h" + +#include <ctime> +#include <cstdio> +#include <cerrno> +#if SUPPORT_SETTING_CLOCK_TAI +#  include <sys/timex.h> +#endif +#ifdef HAVE_CURL +#  include <curl/curl.h> +#endif +#include <array> +#include <string> +#include <iostream> +#include <algorithm> +#include <regex> +#include <unistd.h> +#include <sys/stat.h> +#include <fcntl.h> + +using namespace std; + +#ifdef DOWNLOADED_IN_THE_PAST_TEST +static bool wait_longer = true; +#endif + +constexpr int download_retry_interval_hours = 1; + +// Offset between NTP time and POSIX time: +// timestamp_unix = timestamp_ntp - ntp_unix_offset +const int64_t ntp_unix_offset = 2208988800L; + +// leap seconds insertion bulletin is available from the IETF and in the TZ +// distribution +static array<const char*, 2> default_tai_urls = { +    "https://www.ietf.org/timezones/data/leap-seconds.list", +    "https://raw.githubusercontent.com/eggert/tz/master/leap-seconds.list", +}; + +// According to the Filesystem Hierarchy Standard, the data in +// /var/tmp "must not be deleted when the system is booted." +static const char *tai_cache_location = "/var/tmp/odr-leap-seconds.cache"; + +// read TAI offset from a valid bulletin in IETF format +static int parse_ietf_bulletin(const std::string& bulletin) +{ +    // Example Line: +    // 3692217600	37	# 1 Jan 2017 +    // +    // NTP timestamp<TAB>leap seconds<TAB># some comment +    // The NTP timestamp starts at epoch 1.1.1900. +    // The difference between NTP timestamps and unix epoch is 70 +    // years i.e. 2208988800 seconds + +    std::regex regex_bulletin(R"(([0-9]+)\s+([0-9]+)\s+#.*)"); + +    time_t now = time(nullptr); + +    int tai_utc_offset = 0; + +    int tai_utc_offset_valid = false; + +    stringstream ss(bulletin); + +    /* We cannot just take the last line, because it might +     * be in the future, announcing an upcoming leap second. +     * +     * So we need to look at the current date, and compare it +     * with the date of the leap second. +     */ +    for (string line; getline(ss, line); ) { + +        std::smatch bulletin_entry; + +        bool is_match = std::regex_search(line, bulletin_entry, regex_bulletin); +        if (is_match) { +            if (bulletin_entry.size() != 3) { +                throw runtime_error( +                        "Incorrect number of matched TAI IETF bulletin entries"); +            } +            const string bulletin_ntp_timestamp(bulletin_entry[1]); +            const string bulletin_offset(bulletin_entry[2]); + +            const int64_t timestamp_unix = +                std::atoll(bulletin_ntp_timestamp.c_str()) - ntp_unix_offset; + +            const int offset = std::atoi(bulletin_offset.c_str()); +            // Ignore entries announcing leap seconds in the future +            if (timestamp_unix < now) { +                tai_utc_offset = offset; +                tai_utc_offset_valid = true; +            } +#if TAI_TEST +            else { +                cerr << "IETF Ignoring offset " << bulletin_offset << +                    " at TS " << bulletin_ntp_timestamp << +                    " in the future" << endl; +            } +#endif +        } +    } + +    if (not tai_utc_offset_valid) { +        throw runtime_error("No data in TAI bulletin"); +    } + +    return tai_utc_offset; +} + + +struct bulletin_state { +    bool valid = false; +    int64_t expiry = 0; +    int offset = 0; + +    bool usable() const { return valid and expiry > 0; } +}; + +static bulletin_state parse_bulletin(const string& bulletin) +{ +    // The bulletin contains one line that specifies an expiration date +    // in NTP time. If that point in time is in the future, we consider +    // the bulletin valid. +    // +    // The entry looks like this: +    //#@	3707596800 + +    bulletin_state ret; + +    std::regex regex_expiration(R"(#@\s+([0-9]+))"); + +    time_t now = time(nullptr); + +    stringstream ss(bulletin); + +    for (string line; getline(ss, line); ) { +        std::smatch bulletin_entry; + +        bool is_match = std::regex_search(line, bulletin_entry, regex_expiration); +        if (is_match) { +            if (bulletin_entry.size() != 2) { +                throw runtime_error( +                        "Incorrect number of matched TAI IETF bulletin expiration"); +            } +            const string expiry_data_str(bulletin_entry[1]); +            const int64_t expiry_unix = +                std::atoll(expiry_data_str.c_str()) - ntp_unix_offset; + +#ifdef TAI_TEST +            etiLog.level(info) << "Bulletin expires in " << expiry_unix - now; +#endif +            ret.expiry = expiry_unix - now; +            try { +                ret.offset = parse_ietf_bulletin(bulletin); +                ret.valid = true; +            } +            catch (const runtime_error& e) { +                etiLog.level(warn) << "Bulletin expiry ok but parse error: " << e.what(); +            } +            break; +        } +    } +    return ret; +} + + +// callback that receives data from cURL +static size_t fill_bulletin(char *ptr, size_t size, size_t nmemb, void *ctx) +{ +    auto *bulletin = reinterpret_cast<stringstream*>(ctx); + +    size_t len = size * nmemb; +    for (size_t i = 0; i < len; i++) { +        *bulletin << ptr[i]; +    } +    return len; +} + +static string download_tai_utc_bulletin(const char* url) +{ +    stringstream bulletin; + +#ifdef HAVE_CURL +    CURL *curl; +    CURLcode res; + +    curl = curl_easy_init(); +    if (curl) { +        curl_easy_setopt(curl, CURLOPT_URL, url); +        /* Tell libcurl to follow redirection */ +        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); +        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, fill_bulletin); +        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &bulletin); + +        res = curl_easy_perform(curl); +        /* always cleanup ! */ +        curl_easy_cleanup(curl); + +        if (res != CURLE_OK) { +            throw runtime_error( "TAI-UTC bulletin download failed: " + +                    string(curl_easy_strerror(res))); +        } +    } +    return bulletin.str(); +#else +    throw runtime_error("Cannot download TAI Clock information without cURL"); +#endif // HAVE_CURL +} + +static string load_bulletin_from_file(const char* cache_filename) +{ +    int fd = open(cache_filename, O_RDWR); // lockf requires O_RDWR +    if (fd == -1) { +        etiLog.level(error) << "TAI-UTC bulletin open cache for reading: " << +            strerror(errno); +        return ""; +    } + +    lseek(fd, 0, SEEK_SET); + +    vector<char> buf(1024); +    vector<char> new_bulletin_data; + +    ssize_t ret = lockf(fd, F_LOCK, 0); +    if (ret == 0) { +        // exclusive lock acquired + +        do { +            ret = read(fd, buf.data(), buf.size()); + +            if (ret == -1) { +                close(fd); +                etiLog.level(error) << "TAI-UTC bulletin read cache: " << +                        strerror(errno); +                return ""; +            } + +            copy(buf.data(), buf.data() + ret, back_inserter(new_bulletin_data)); +        } while (ret > 0); + +        close(fd); + +        return string{new_bulletin_data.data(), new_bulletin_data.size()}; +    } +    else { +        etiLog.level(error) << +            "TAI-UTC bulletin acquire cache lock for reading: " << +            strerror(errno); +        close(fd); +    } +    return ""; +} + +ClockTAI::ClockTAI(const std::vector<std::string>& bulletin_urls) : +    RemoteControllable("clocktai") +{ +    RC_ADD_PARAMETER(expiry, "Number of seconds until TAI Bulletin expires"); + +    if (bulletin_urls.empty()) { +        etiLog.level(debug) << "Initialising default TAI Bulletin URLs"; +        for (const auto url : default_tai_urls) { +            m_bulletin_urls.push_back(url); +        } +    } +    else { +        etiLog.level(debug) << "Initialising user-configured TAI Bulletin URLs"; +        m_bulletin_urls = bulletin_urls; +    } + +    for (const auto url : m_bulletin_urls) { +        etiLog.level(info) << "TAI Bulletin URL: '" << url << "'"; +    } +} + +int ClockTAI::get_valid_offset() +{ +    int offset = 0; +    bool offset_valid = false; + +    std::unique_lock<std::mutex> lock(m_data_mutex); + +    const auto state = parse_bulletin(m_bulletin); +    if (state.usable()) { +#if TAI_TEST +        etiLog.level(info) << "Bulletin already valid"; +#endif +        offset = state.offset; +        offset_valid = true; +    } +    else { +        const auto cache_bulletin = load_bulletin_from_file(tai_cache_location); +#if TAI_TEST +        etiLog.level(info) << "Loaded cache bulletin with " << +            std::count_if(cache_bulletin.cbegin(), cache_bulletin.cend(), +                    [](const char c){ return c == '\n'; }) << " lines"; +#endif +        const auto cache_state = parse_bulletin(cache_bulletin); + +        if (cache_state.usable()) { +            m_bulletin = cache_bulletin; +            offset = cache_state.offset; +            offset_valid = true; +#if TAI_TEST +            etiLog.level(info) << "Bulletin from cache valid with offset=" << offset; +#endif +        } +        else { +            for (const auto url : m_bulletin_urls) { +                try { +#if TAI_TEST +                    etiLog.level(info) << "Load bulletin from " << url; +#endif +                    const auto new_bulletin = download_tai_utc_bulletin(url.c_str()); +                    const auto new_state = parse_bulletin(new_bulletin); +                    if (new_state.usable()) { +                        m_bulletin = new_bulletin; +                        offset = new_state.offset; +                        offset_valid = true; + +                        etiLog.level(debug) << "Loaded valid TAI Bulletin from " << +                            url << " giving offset=" << offset; +                    } +                    else { +                        etiLog.level(debug) << "Skipping invalid TAI bulletin from " +                            << url; +                    } +                } +                catch (const runtime_error& e) { +                    etiLog.level(warn) << +                        "TAI-UTC offset could not be retrieved from " << +                        url << " : " << e.what(); +                } + +                if (offset_valid) { +                    update_cache(tai_cache_location); +                    break; +                } +            } +        } +    } + +    if (offset_valid) { +        // With the current evolution of the offset, we're probably going +        // to reach 500 long after DAB gets replaced by another standard. +        if (offset < 0 or offset > 500) { +            stringstream ss; +            ss << "TAI offset " << offset << " out of range"; +            throw range_error(ss.str()); +        } + +        return offset; +    } +    else { +        // Try again later +        throw download_failed(); +    } +} + + +int ClockTAI::get_offset() +{ +    using namespace std::chrono; +    const auto time_now = system_clock::now(); + +    std::unique_lock<std::mutex> lock(m_data_mutex); + +    if (not m_offset_valid) { +#ifdef DOWNLOADED_IN_THE_PAST_TEST +        // Assume we've downloaded it in the past: + +        m_offset = 37; // Valid in early 2017 +        m_offset_valid = true; + +        // Simulate requiring a new download +        m_bulletin_download_time = time_now - hours(24 * 40); +#else +        // First time we run we must block until we know +        // the offset +        lock.unlock(); +        try { +            m_offset = get_valid_offset(); +        } +        catch (const download_failed&) { +            throw runtime_error("Unable to download TAI bulletin"); +        } +        lock.lock(); +        m_offset_valid = true; +        m_bulletin_download_time = time_now; +#endif +        etiLog.level(info) << +            "Initialised TAI-UTC offset to " << m_offset << "s."; +    } + +    if (time_now - m_bulletin_download_time > hours(24 * 31)) { +        // Refresh if it's older than one month. Leap seconds are +        // announced several months in advance +        etiLog.level(debug) << "Trying to refresh TAI bulletin"; + +        if (m_offset_future.valid()) { +            auto state = m_offset_future.wait_for(seconds(0)); +            switch (state) { +                case future_status::ready: +                    try { +                        m_offset = m_offset_future.get(); +                        m_offset_valid = true; +                        m_bulletin_download_time = time_now; + +                        etiLog.level(info) << +                            "Updated TAI-UTC offset to " << m_offset << "s."; +                    } +                    catch (const download_failed&) { +                        etiLog.level(warn) << +                            "TAI-UTC download failed, will retry in " << +                            download_retry_interval_hours << " hour(s)"; + +                        m_bulletin_download_time += hours(download_retry_interval_hours); +                    } +#ifdef DOWNLOADED_IN_THE_PAST_TEST +                    wait_longer = false; +#endif +                    break; + +                case future_status::deferred: +                case future_status::timeout: +                    // Not ready yet +#ifdef TAI_TEST +                    etiLog.level(debug) << "  async not ready yet"; +#endif +                    break; +            } +        } +        else { +#ifdef TAI_TEST +            etiLog.level(debug) << " Launch async"; +#endif +            m_offset_future = async(launch::async, &ClockTAI::get_valid_offset, this); +        } +    } + +    return m_offset; +} + +#if SUPPORT_SETTING_CLOCK_TAI +int ClockTAI::update_local_tai_clock(int offset) +{ +    struct timex timex_request; +    timex_request.modes = ADJ_TAI; +    timex_request.constant = offset; + +    int err = adjtimex(&timex_request); +    if (err == -1) { +        perror("adjtimex"); +    } + +    printf("adjtimex: %d, tai %d\n", err, timex_request.tai); + +    return err; +} +#endif + +void ClockTAI::update_cache(const char* cache_filename) +{ +    int fd = open(cache_filename, O_RDWR | O_CREAT, 00664); +    if (fd == -1) { +        etiLog.level(error) << +            "TAI-UTC bulletin open cache for writing: " << +            strerror(errno); +        return; +    } + +    lseek(fd, 0, SEEK_SET); + +    ssize_t ret = lockf(fd, F_LOCK, 0); +    if (ret == 0) { +        // exclusive lock acquired +        const char *data = m_bulletin.data(); +        size_t remaining = m_bulletin.size(); + +        while (remaining > 0) { +            ret = write(fd, data, remaining); +            if (ret == -1) { +                close(fd); +                etiLog.level(error) << +                    "TAI-UTC bulletin write cache: " << +                    strerror(errno); +                return; +            } + +            remaining -= ret; +            data += ret; +        } +        etiLog.level(debug) << "TAI-UTC bulletin cache updated"; +        close(fd); +    } +    else { +        close(fd); +        etiLog.level(error) << +            "TAI-UTC bulletin acquire cache lock for writing: " << +            strerror(errno); +        return; +    } +} + + +void ClockTAI::set_parameter(const string& parameter, const string& value) +{ +    if (parameter == "expiry") { +        throw ParameterError("Parameter '" + parameter + +            "' is read-only in controllable " + get_rc_name()); +    } +    else { +        throw ParameterError("Parameter '" + parameter + +            "' is not exported by controllable " + get_rc_name()); +    } +} + +const string ClockTAI::get_parameter(const string& parameter) const +{ +    if (parameter == "expiry") { +        std::unique_lock<std::mutex> lock(m_data_mutex); +        const int64_t expiry = parse_bulletin(m_bulletin).expiry; +        if (expiry > 0) { +            return to_string(expiry); +        } +        else { +            return "Bulletin expired or invalid!"; +        } +    } +    else { +        throw ParameterError("Parameter '" + parameter + +            "' is not exported by controllable " + get_rc_name()); +    } +} + +#if 0 +// Example testing code +void debug_tai_clk() +{ +    struct timespec rt_clk; + +    int err = clock_gettime(CLOCK_REALTIME, &rt_clk); +    if (err) { +        perror("REALTIME clock_gettime failed"); +    } + +    struct timespec tai_clk; + +    err = clock_gettime(CLOCK_TAI, &tai_clk); +    if (err) { +        perror("TAI clock_gettime failed"); +    } + +    printf("RT - TAI = %ld\n", rt_clk.tv_sec - tai_clk.tv_sec); + + +    struct timex timex_request; +    timex_request.modes = 0; // Do not set anything + +    err = adjtimex(&timex_request); +    if (err == -1) { +        perror("adjtimex"); +    } + +    printf("adjtimex: %d, tai %d\n", err, timex_request.tai); +} +#endif + diff --git a/lib/ClockTAI.h b/lib/ClockTAI.h new file mode 100644 index 0000000..50a6323 --- /dev/null +++ b/lib/ClockTAI.h @@ -0,0 +1,102 @@ +/* +   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +   2011, 2012 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +/* The EDI output needs TAI clock, according to ETSI TS 102 693 Annex F + * "EDI Timestamps". This module can set the local CLOCK_TAI clock by + * setting the TAI-UTC offset using adjtimex. + * + * This functionality requires Linux 3.10 (30 Jun 2013) or newer. + */ + +#pragma once + +#include <cstdint> +#include <cstdlib> +#include <sstream> +#include <chrono> +#include <future> +#include <mutex> +#include <string> +#include <vector> +#include "RemoteControl.h" + +// EDI needs to know UTC-TAI, but doesn't need the CLOCK_TAI to be set. +// We can keep this code, maybe for future use +#define SUPPORT_SETTING_CLOCK_TAI 0 + +/* Loads, parses and represents TAI-UTC offset information from the IETF bulletin */ +class ClockTAI : public RemoteControllable { +    public: +        ClockTAI(const std::vector<std::string>& bulletin_urls); + +        // Fetch the bulletin from the IETF website and return the current +        // TAI-UTC offset. +        // Throws runtime_error on failure. +        int get_offset(void); + +#if SUPPORT_SETTING_CLOCK_TAI +        // Update the local TAI clock according to the TAI-UTC offset +        // return 0 on success +        int update_local_tai_clock(int offset); +#endif + +    private: +        class download_failed {}; + +        // Either retrieve the bulletin from the cache or if necessarly +        // download it, and calculate the TAI-UTC offset. +        // Returns the offset or throws download_failed or a range_error +        // if the offset is out of bounds. +        int get_valid_offset(void); + +        // Download of new bulletin is done asynchronously +        std::future<int> m_offset_future; + +        // Protect all data members, as RC functions are in another thread +        mutable std::mutex m_data_mutex; + +        // The currently used TAI-UTC offset +        int m_offset = 0; +        int m_offset_valid = false; + +        std::vector<std::string> m_bulletin_urls; + +        std::string m_bulletin; +        std::chrono::system_clock::time_point m_bulletin_download_time; + +        // Update the cache file with the current m_bulletin +        void update_cache(const char* cache_filename); + + +        /* Remote control */ +        virtual void set_parameter(const std::string& parameter, +               const std::string& value); + +        /* Getting a parameter always returns a string. */ +        virtual const std::string get_parameter(const std::string& parameter) const; +}; + diff --git a/lib/Globals.cpp b/lib/Globals.cpp new file mode 100644 index 0000000..6be26ec --- /dev/null +++ b/lib/Globals.cpp @@ -0,0 +1,36 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 +   Her Majesty the Queen in Right of Canada (Communications Research +   Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +/* Ensure construction and destruction of static globals in the right order */ + +#include "Log.h" +#include "RemoteControl.h" + +// the RC needs logging, and needs to be initialised later. +Logger etiLog; +RemoteControllers rcs; + diff --git a/lib/Log.cpp b/lib/Log.cpp new file mode 100644 index 0000000..abbd69a --- /dev/null +++ b/lib/Log.cpp @@ -0,0 +1,207 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 +   Her Majesty the Queen in Right of Canada (Communications Research +   Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#include <list> +#include <cstdarg> +#include <cinttypes> +#include <chrono> + +#include "Log.h" + +using namespace std; + + +Logger::Logger() +{ +    m_io_thread = std::thread(&Logger::io_process, this); +} + +Logger::~Logger() { +    m_message_queue.trigger_wakeup(); +    m_io_thread.join(); + +    std::lock_guard<std::mutex> guard(m_backend_mutex); +    backends.clear(); +} + +void Logger::register_backend(std::shared_ptr<LogBackend> backend) +{ +    std::lock_guard<std::mutex> guard(m_backend_mutex); +    backends.push_back(backend); +} + + +void Logger::log(log_level_t level, const char* fmt, ...) +{ +    if (level == discard) { +        return; +    } + +    int size = 100; +    std::string str; +    va_list ap; +    while (1) { +        str.resize(size); +        va_start(ap, fmt); +        int n = vsnprintf((char *)str.c_str(), size, fmt, ap); +        va_end(ap); +        if (n > -1 && n < size) { +            str.resize(n); +            break; +        } +        if (n > -1) +            size = n + 1; +        else +            size *= 2; +    } + +    logstr(level, move(str)); +} + +void Logger::logstr(log_level_t level, std::string&& message) +{ +    if (level == discard) { +        return; +    } + +    log_message_t m(level, move(message)); +    m_message_queue.push(move(m)); +} + +void Logger::io_process() +{ +    while (1) { +        log_message_t m; +        try { +            m_message_queue.wait_and_pop(m); +        } +        catch (const ThreadsafeQueueWakeup&) { +            break; +        } + +        auto message = m.message; + +        /* Remove a potential trailing newline. +         * It doesn't look good in syslog +         */ +        if (message[message.length()-1] == '\n') { +            message.resize(message.length()-1); +        } + +        { +            std::lock_guard<std::mutex> guard(m_backend_mutex); +            for (auto &backend : backends) { +                backend->log(m.level, message); +            } + +            if (m.level != log_level_t::trace) { +                std::cerr << levels_as_str[m.level] << " " << message << std::endl; +            } +        } +    } +} + + +LogLine Logger::level(log_level_t level) +{ +    return LogLine(this, level); +} + +LogToFile::LogToFile(const std::string& filename) : name("FILE") +{ +    FILE* fd = fopen(filename.c_str(), "a"); +    if (fd == nullptr) { +        fprintf(stderr, "Cannot open log file !"); +        throw std::runtime_error("Cannot open log file !"); +    } + +    log_file.reset(fd); +} + +void LogToFile::log(log_level_t level, const std::string& message) +{ +    if (not (level == log_level_t::trace or level == log_level_t::discard)) { +        const char* log_level_text[] = { +            "DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"}; + +        // fprintf is thread-safe +        fprintf(log_file.get(), SYSLOG_IDENT ": %s: %s\n", +                log_level_text[(size_t)level], message.c_str()); +        fflush(log_file.get()); +    } +} + +void LogToSyslog::log(log_level_t level, const std::string& message) +{ +    if (not (level == log_level_t::trace or level == log_level_t::discard)) { +        int syslog_level = LOG_EMERG; +        switch (level) { +            case debug: syslog_level = LOG_DEBUG; break; +            case info:  syslog_level = LOG_INFO; break; +                        /* we don't have the notice level */ +            case warn:  syslog_level = LOG_WARNING; break; +            case error: syslog_level = LOG_ERR; break; +            default:    syslog_level = LOG_CRIT; break; +            case alert: syslog_level = LOG_ALERT; break; +            case emerg: syslog_level = LOG_EMERG; break; +        } + +        syslog(syslog_level, SYSLOG_IDENT " %s", message.c_str()); +    } +} + +LogTracer::LogTracer(const string& trace_filename) : name("TRACE") +{ +    etiLog.level(info) << "Setting up TRACE to " << trace_filename; + +    FILE* fd = fopen(trace_filename.c_str(), "a"); +    if (fd == nullptr) { +        fprintf(stderr, "Cannot open trace file !"); +        throw std::runtime_error("Cannot open trace file !"); +    } +    m_trace_file.reset(fd); + +    using namespace std::chrono; +    auto now = steady_clock::now().time_since_epoch(); +    m_trace_micros_startup = duration_cast<microseconds>(now).count(); + +    fprintf(m_trace_file.get(), +            "0,TRACER,startup at %" PRIu64 "\n", m_trace_micros_startup); +} + +void LogTracer::log(log_level_t level, const std::string& message) +{ +    if (level == log_level_t::trace) { +        using namespace std::chrono; +        const auto now = steady_clock::now().time_since_epoch(); +        const auto micros = duration_cast<microseconds>(now).count(); + +        fprintf(m_trace_file.get(), "%" PRIu64 ",%s\n", +                micros - m_trace_micros_startup, +                message.c_str()); +    } +} diff --git a/lib/Log.h b/lib/Log.h new file mode 100644 index 0000000..f20e698 --- /dev/null +++ b/lib/Log.h @@ -0,0 +1,200 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 +   Her Majesty the Queen in Right of Canada (Communications Research +   Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +#   include "config.h" +#endif + +#include <syslog.h> +#include <cstdarg> +#include <cstdio> +#include <fstream> +#include <sstream> +#include <iostream> +#include <list> +#include <stdexcept> +#include <string> +#include <map> +#include <mutex> +#include <memory> +#include <thread> +#include "ThreadsafeQueue.h" + +#define SYSLOG_IDENT PACKAGE_NAME +#define SYSLOG_FACILITY LOG_LOCAL0 + +enum log_level_t {debug = 0, info, warn, error, alert, emerg, trace, discard}; + +static const std::string levels_as_str[] = +    { "     ", "     ", "WARN ", "ERROR", "ALERT", "EMERG", "TRACE", "-----"} ; + +/** Abstract class all backends must inherit from */ +class LogBackend { +    public: +        virtual ~LogBackend() {}; +        virtual void log(log_level_t level, const std::string& message) = 0; +        virtual std::string get_name() const = 0; +}; + +/** A Logging backend for Syslog */ +class LogToSyslog : public LogBackend { +    public: +        LogToSyslog() : name("SYSLOG") { +            openlog(SYSLOG_IDENT, LOG_PID, SYSLOG_FACILITY); +        } + +        virtual ~LogToSyslog() { +            closelog(); +        } + +        void log(log_level_t level, const std::string& message); + +        std::string get_name() const { return name; } + +    private: +        const std::string name; + +        LogToSyslog(const LogToSyslog& other) = delete; +        const LogToSyslog& operator=(const LogToSyslog& other) = delete; +}; + +class LogToFile : public LogBackend { +    public: +        LogToFile(const std::string& filename); +        void log(log_level_t level, const std::string& message); +        std::string get_name() const { return name; } + +    private: +        const std::string name; + +        struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; +        std::unique_ptr<FILE, FILEDeleter> log_file; + +        LogToFile(const LogToFile& other) = delete; +        const LogToFile& operator=(const LogToFile& other) = delete; +}; + +class LogTracer : public LogBackend { +    public: +        LogTracer(const std::string& filename); +        void log(log_level_t level, const std::string& message); +        std::string get_name() const { return name; } +    private: +        std::string name; +        uint64_t m_trace_micros_startup = 0; + +        struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; +        std::unique_ptr<FILE, FILEDeleter> m_trace_file; + +        LogTracer(const LogTracer& other) = delete; +        const LogTracer& operator=(const LogTracer& other) = delete; +}; + +class LogLine; + +struct log_message_t { +    log_message_t(log_level_t _level, std::string&& _message) : +        level(_level), +        message(move(_message)) {} + +    log_message_t() : +        level(debug), +        message("") {} + +    log_level_t level; +    std::string message; +}; + +class Logger { +    public: +        Logger(); +        Logger(const Logger& other) = delete; +        const Logger& operator=(const Logger& other) = delete; +        ~Logger(); + +        void register_backend(std::shared_ptr<LogBackend> backend); + +        /* Log the message to all backends */ +        void log(log_level_t level, const char* fmt, ...); + +        void logstr(log_level_t level, std::string&& message); + +        /* All logging IO is done in another thread */ +        void io_process(void); + +        /* Return a LogLine for the given level +         * so that you can write etiLog.level(info) << "stuff = " << 21 */ +        LogLine level(log_level_t level); + +    private: +        std::list<std::shared_ptr<LogBackend> > backends; + +        ThreadsafeQueue<log_message_t> m_message_queue; +        std::thread m_io_thread; +        std::mutex m_backend_mutex; +}; + +/* etiLog is a singleton used in all parts of the program to output log messages. + * It is constructed in Globals.cpp */ +extern Logger etiLog; + +// Accumulate a line of logs, using same syntax as stringstream +// The line is logged when the LogLine gets destroyed +class LogLine { +    public: +        LogLine(const LogLine& logline); +        const LogLine& operator=(const LogLine& other) = delete; +        LogLine(Logger* logger, log_level_t level) : +            logger_(logger) +        { +            level_ = level; +        } + +        // Push the new element into the stringstream +        template <typename T> +        LogLine& operator<<(T s) { +            if (level_ != discard) { +                os << s; +            } +            return *this; +        } + +        ~LogLine() +        { +            if (level_ != discard) { +                logger_->logstr(level_, os.str()); +            } +        } + +    private: +        std::ostringstream os; +        log_level_t level_; +        Logger* logger_; +}; + diff --git a/lib/ReedSolomon.cpp b/lib/ReedSolomon.cpp new file mode 100644 index 0000000..1bf0b24 --- /dev/null +++ b/lib/ReedSolomon.cpp @@ -0,0 +1,118 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right +   of Canada (Communications Research Center Canada) + +   Copyright (C) 2016 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#include "ReedSolomon.h" +#include <vector> +#include <algorithm> +#include <stdexcept> +#include <sstream> +#include <stdio.h>          // For galois.h ... +#include <string.h>         // For memcpy + +extern "C" { +#include "fec/fec.h" +} +#include <assert.h> + +#define SYMSIZE     8 + + +ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem) +{ +    setReverse(reverse); + +    m_N = N; +    m_K = K; + +    const int symsize = SYMSIZE; +    const int nroots = N - K; // For EDI PFT, this must be 48 +    const int pad = ((1 << symsize) - 1) - N; // is 255-N + +    rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad); + +    if (rsData == nullptr) { +        std::stringstream ss; +        ss << "Invalid Reed-Solomon parameters! " << +            "N=" << N << " ; K=" << K << " ; pad=" << pad; +        throw std::invalid_argument(ss.str()); +    } +} + + +ReedSolomon::~ReedSolomon() +{ +    if (rsData != nullptr) { +        free_rs_char(rsData); +    } +} + + +void ReedSolomon::setReverse(bool state) +{ +    reverse = state; +} + + +int ReedSolomon::encode(void* data, void* fec, size_t size) +{ +    uint8_t* input = reinterpret_cast<uint8_t*>(data); +    uint8_t* output = reinterpret_cast<uint8_t*>(fec); +    int ret = 0; + +    if (reverse) { +        std::vector<uint8_t> buffer(m_N); + +        memcpy(&buffer[0], input, m_K); +        memcpy(&buffer[m_K], output, m_N - m_K); + +        ret = decode_rs_char(rsData, &buffer[0], nullptr, 0); +        if ((ret != 0) && (ret != -1)) { +            memcpy(input, &buffer[0], m_K); +            memcpy(output, &buffer[m_K], m_N - m_K); +        } +    } +    else { +        encode_rs_char(rsData, input, output); +    } + +    return ret; +} + + +int ReedSolomon::encode(void* data, size_t size) +{ +    uint8_t* input = reinterpret_cast<uint8_t*>(data); +    int ret = 0; + +    if (reverse) { +        ret = decode_rs_char(rsData, input, nullptr, 0); +    } +    else { +        encode_rs_char(rsData, input, &input[m_K]); +    } + +    return ret; +} diff --git a/lib/ReedSolomon.h b/lib/ReedSolomon.h new file mode 100644 index 0000000..abcef62 --- /dev/null +++ b/lib/ReedSolomon.h @@ -0,0 +1,56 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right +   of Canada (Communications Research Center Canada) + +   Copyright (C) 2016 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include <config.h> +#endif + +#include <stdlib.h> + +class ReedSolomon +{ +public: +    ReedSolomon(int N, int K, +            bool reverse = false, +            int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1); +    ReedSolomon(const ReedSolomon& other) = delete; +    ReedSolomon operator=(const ReedSolomon& other) = delete; +    ~ReedSolomon(); + +    void setReverse(bool state); +    int encode(void* data, void* fec, size_t size); +    int encode(void* data, size_t size); + +private: +    int m_N; +    int m_K; + +    void* rsData; +    bool reverse; +}; + diff --git a/lib/RemoteControl.cpp b/lib/RemoteControl.cpp new file mode 100644 index 0000000..4adb90c --- /dev/null +++ b/lib/RemoteControl.cpp @@ -0,0 +1,579 @@ +/* +   Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 +   Her Majesty the Queen in Right of Canada (Communications Research +   Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + */ +/* +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <https://www.gnu.org/licenses/>. + */ +#include <list> +#include <string> +#include <iostream> +#include <string> +#include <algorithm> + +#include "RemoteControl.h" + +using namespace std; + +RemoteControllerTelnet::~RemoteControllerTelnet() +{ +    m_active = false; + +    if (m_restarter_thread.joinable()) { +        m_restarter_thread.join(); +    } + +    if (m_child_thread.joinable()) { +        m_child_thread.join(); +    } +} + +void RemoteControllerTelnet::restart() +{ +    if (m_restarter_thread.joinable()) { +        m_restarter_thread.join(); +    } + +    m_restarter_thread = std::thread( +            &RemoteControllerTelnet::restart_thread, +            this, 0); +} + +RemoteControllable::~RemoteControllable() { +    rcs.remove_controllable(this); +} + +std::list<std::string> RemoteControllable::get_supported_parameters() const { +    std::list<std::string> parameterlist; +    for (const auto& param : m_parameters) { +        parameterlist.push_back(param[0]); +    } +    return parameterlist; +} + +void RemoteControllers::add_controller(std::shared_ptr<BaseRemoteController> rc) { +    m_controllers.push_back(rc); +} + +void RemoteControllers::enrol(RemoteControllable *rc) { +    controllables.push_back(rc); +} + +void RemoteControllers::remove_controllable(RemoteControllable *rc) { +    controllables.remove(rc); +} + +std::list< std::vector<std::string> > RemoteControllers::get_param_list_values(const std::string& name) { +    RemoteControllable* controllable = get_controllable_(name); + +    std::list< std::vector<std::string> > allparams; +    for (auto ¶m : controllable->get_supported_parameters()) { +        std::vector<std::string> item; +        item.push_back(param); +        try { +            item.push_back(controllable->get_parameter(param)); +        } +        catch (const ParameterError &e) { +            item.push_back(std::string("error: ") + e.what()); +        } + +        allparams.push_back(item); +    } +    return allparams; +} + +std::string RemoteControllers::get_param(const std::string& name, const std::string& param) { +    RemoteControllable* controllable = get_controllable_(name); +    return controllable->get_parameter(param); +} + +void RemoteControllers::check_faults() { +    for (auto &controller : m_controllers) { +        if (controller->fault_detected()) { +            etiLog.level(warn) << +                "Detected Remote Control fault, restarting it"; +            controller->restart(); +        } +    } +} + +RemoteControllable* RemoteControllers::get_controllable_(const std::string& name) +{ +    auto rc = std::find_if(controllables.begin(), controllables.end(), +            [&](RemoteControllable* r) { return r->get_rc_name() == name; }); + +    if (rc == controllables.end()) { +        throw ParameterError("Module name unknown"); +    } +    else { +        return *rc; +    } +} + +void RemoteControllers::set_param( +        const std::string& name, +        const std::string& param, +        const std::string& value) +{ +    etiLog.level(info) << "RC: Setting " << name << " " << param +        << " to " << value; +    RemoteControllable* controllable = get_controllable_(name); +    try { +        return controllable->set_parameter(param, value); +    } +    catch (const ios_base::failure& e) { +        etiLog.level(info) << "RC: Failed to set " << name << " " << param +        << " to " << value << ": " << e.what(); +        throw ParameterError("Cannot understand value"); +    } +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void RemoteControllerTelnet::restart_thread(long) +{ +    m_active = false; + +    if (m_child_thread.joinable()) { +        m_child_thread.join(); +    } + +    m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0); +} + +void RemoteControllerTelnet::handle_accept(Socket::TCPSocket&& socket) +{ +    const std::string welcome = PACKAGE_NAME " Remote Control CLI\n" +                                "Write 'help' for help.\n" +                                "**********\n"; +    const std::string prompt = "> "; + +    std::string in_message; + +    try { +        etiLog.level(info) << "RC: Accepted"; + +        socket.sendall(welcome.data(), welcome.size()); + +        while (m_active and in_message != "quit") { +            socket.sendall(prompt.data(), prompt.size()); + +            stringstream in_message_stream; + +            char last_char = '\0'; +            try { +                while (last_char != '\n') { +                    try { +                        auto ret = socket.recv(&last_char, 1, 0, 1000); +                        if (ret == 1) { +                            in_message_stream << last_char; +                        } +                        else { +                            break; +                        } +                    } +                    catch (const Socket::TCPSocket::Timeout&) { +                        if (not m_active) { +                            break; +                        } +                    } +                } +            } +            catch (const Socket::TCPSocket::Interrupted&) { +                in_message_stream.clear(); +            } + + +            if (in_message_stream.str().size() == 0) { +                etiLog.level(info) << "RC: Connection terminated"; +                break; +            } + +            std::getline(in_message_stream, in_message); + +            while (in_message.length() > 0 && +                    (in_message[in_message.length()-1] == '\r' || +                     in_message[in_message.length()-1] == '\n')) { +                in_message.erase(in_message.length()-1, 1); +            } + +            if (in_message.length() == 0) { +                continue; +            } + +            etiLog.level(info) << "RC: Got message '" << in_message << "'"; + +            dispatch_command(socket, in_message); +        } +        etiLog.level(info) << "RC: Closing socket"; +        socket.close(); +    } +    catch (const std::exception& e) { +        etiLog.level(error) << "Remote control caught exception: " << e.what(); +    } +} + +void RemoteControllerTelnet::process(long) +{ +    try { +        m_active = true; + +        m_socket.listen(m_port, "localhost"); + +        etiLog.level(info) << "RC: Waiting for connection on port " << m_port; +        while (m_active) { +            auto sock = m_socket.accept(1000); + +            if (sock.valid()) { +                handle_accept(move(sock)); +                etiLog.level(info) << "RC: Connection closed. Waiting for connection on port " << m_port; +            } +        } +    } +    catch (const runtime_error& e) { +        etiLog.level(warn) << "RC: Encountered error: " << e.what(); +    } + +    etiLog.level(info) << "RC: Leaving"; +    m_fault = true; +} + +static std::vector<std::string> tokenise(const std::string& message) { +    stringstream ss(message); +    std::vector<std::string> all_tokens; +    std::string item; + +    while (std::getline(ss, item, ' ')) { +        all_tokens.push_back(move(item)); +    } +    return all_tokens; +} + + +void RemoteControllerTelnet::dispatch_command(Socket::TCPSocket& socket, string command) +{ +    vector<string> cmd = tokenise(command); + +    if (cmd[0] == "help") { +        reply(socket, +                "The following commands are supported:\n" +                "  list\n" +                "    * Lists the modules that are loaded and their parameters\n" +                "  show MODULE\n" +                "    * Lists all parameters and their values from module MODULE\n" +                "  get MODULE PARAMETER\n" +                "    * Gets the value for the specified PARAMETER from module MODULE\n" +                "  set MODULE PARAMETER VALUE\n" +                "    * Sets the value for the PARAMETER ofr module MODULE\n" +                "  quit\n" +                "    * Terminate this session\n" +                "\n"); +    } +    else if (cmd[0] == "list") { +        stringstream ss; + +        if (cmd.size() == 1) { +            for (auto &controllable : rcs.controllables) { +                ss << controllable->get_rc_name() << endl; + +                list< vector<string> > params = controllable->get_parameter_descriptions(); +                for (auto ¶m : params) { +                    ss << "\t" << param[0] << " : " << param[1] << endl; +                } +            } +        } +        else { +            reply(socket, "Too many arguments for command 'list'"); +        } + +        reply(socket, ss.str()); +    } +    else if (cmd[0] == "show") { +        if (cmd.size() == 2) { +            try { +                stringstream ss; +                list< vector<string> > r = rcs.get_param_list_values(cmd[1]); +                for (auto ¶m_val : r) { +                    ss << param_val[0] << ": " << param_val[1] << endl; +                } +                reply(socket, ss.str()); + +            } +            catch (const ParameterError &e) { +                reply(socket, e.what()); +            } +        } +        else { +            reply(socket, "Incorrect parameters for command 'show'"); +        } +    } +    else if (cmd[0] == "get") { +        if (cmd.size() == 3) { +            try { +                string r = rcs.get_param(cmd[1], cmd[2]); +                reply(socket, r); +            } +            catch (const ParameterError &e) { +                reply(socket, e.what()); +            } +        } +        else { +            reply(socket, "Incorrect parameters for command 'get'"); +        } +    } +    else if (cmd[0] == "set") { +        if (cmd.size() >= 4) { +            try { +                stringstream new_param_value; +                for (size_t i = 3; i < cmd.size(); i++) { +                    new_param_value << cmd[i]; + +                    if (i+1 < cmd.size()) { +                        new_param_value << " "; +                    } +                } + +                rcs.set_param(cmd[1], cmd[2], new_param_value.str()); +                reply(socket, "ok"); +            } +            catch (const ParameterError &e) { +                reply(socket, e.what()); +            } +            catch (const exception &e) { +                reply(socket, "Error: Invalid parameter value. "); +            } +        } +        else { +            reply(socket, "Incorrect parameters for command 'set'"); +        } +    } +    else if (cmd[0] == "quit") { +        reply(socket, "Goodbye"); +    } +    else { +        reply(socket, "Message not understood"); +    } +} + +void RemoteControllerTelnet::reply(Socket::TCPSocket& socket, string message) +{ +    stringstream ss; +    ss << message << "\r\n"; +    socket.sendall(message.data(), message.size()); +} + + +#if defined(HAVE_ZEROMQ) + +RemoteControllerZmq::~RemoteControllerZmq() { +    m_active = false; +    m_fault = false; + +    if (m_restarter_thread.joinable()) { +        m_restarter_thread.join(); +    } + +    if (m_child_thread.joinable()) { +        m_child_thread.join(); +    } +} + +void RemoteControllerZmq::restart() +{ +    if (m_restarter_thread.joinable()) { +        m_restarter_thread.join(); +    } + +    m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void RemoteControllerZmq::restart_thread() +{ +    m_active = false; + +    if (m_child_thread.joinable()) { +        m_child_thread.join(); +    } + +    m_child_thread = std::thread(&RemoteControllerZmq::process, this); +} + +void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message) +{ +    bool more = true; +    do { +        zmq::message_t msg; +        pSocket.recv(&msg); +        std::string incoming((char*)msg.data(), msg.size()); +        message.push_back(incoming); +        more = msg.more(); +    } while (more); +} + +void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket) +{ +    zmq::message_t msg(2); +    char repCode[2] = {'o', 'k'}; +    memcpy ((void*) msg.data(), repCode, 2); +    pSocket.send(msg, 0); +} + +void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) +{ +    zmq::message_t msg1(4); +    char repCode[4] = {'f', 'a', 'i', 'l'}; +    memcpy ((void*) msg1.data(), repCode, 4); +    pSocket.send(msg1, ZMQ_SNDMORE); + +    zmq::message_t msg2(error.length()); +    memcpy ((void*) msg2.data(), error.c_str(), error.length()); +    pSocket.send(msg2, 0); +} + +void RemoteControllerZmq::process() +{ +    m_fault = false; + +    // create zmq reply socket for receiving ctrl parameters +    try { +        zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); + +        // connect the socket +        int hwm = 100; +        int linger = 0; +        repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); +        repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); +        repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); +        repSocket.bind(m_endpoint.c_str()); + +        // create pollitem that polls the  ZMQ sockets +        zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; +        while (m_active) { +            zmq::poll(pollItems, 1, 100); +            std::vector<std::string> msg; + +            if (pollItems[0].revents & ZMQ_POLLIN) { +                recv_all(repSocket, msg); + +                std::string command((char*)msg[0].data(), msg[0].size()); + +                if (msg.size() == 1 && command == "ping") { +                    send_ok_reply(repSocket); +                } +                else if (msg.size() == 1 && command == "list") { +                    size_t cohort_size = rcs.controllables.size(); +                    for (auto &controllable : rcs.controllables) { +                        std::stringstream ss; +                        ss << "{ \"name\": \"" << controllable->get_rc_name() << "\"," << +                            " \"params\": { "; + +                        list< vector<string> > params = controllable->get_parameter_descriptions(); +                        size_t i = 0; +                        for (auto ¶m : params) { +                            if (i > 0) { +                                ss << ", "; +                            } + +                            ss << "\"" << param[0] << "\": " << +                                "\"" << param[1] << "\""; + +                            i++; +                        } + +                        ss << " } }"; + +                        std::string msg_s = ss.str(); + +                        zmq::message_t zmsg(ss.str().size()); +                        memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size()); + +                        int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; +                        repSocket.send(zmsg, flag); +                    } +                } +                else if (msg.size() == 2 && command == "show") { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    try { +                        list< vector<string> > r = rcs.get_param_list_values(module); +                        size_t r_size = r.size(); +                        for (auto ¶m_val : r) { +                            std::stringstream ss; +                            ss << param_val[0] << ": " << param_val[1] << endl; +                            zmq::message_t zmsg(ss.str().size()); +                            memcpy(zmsg.data(), ss.str().data(), ss.str().size()); + +                            int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; +                            repSocket.send(zmsg, flag); +                        } +                    } +                    catch (const ParameterError &err) { +                        send_fail_reply(repSocket, err.what()); +                    } +                } +                else if (msg.size() == 3 && command == "get") { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    std::string parameter((char*) msg[2].data(), msg[2].size()); + +                    try { +                        std::string value = rcs.get_param(module, parameter); +                        zmq::message_t zmsg(value.size()); +                        memcpy ((void*) zmsg.data(), value.data(), value.size()); +                        repSocket.send(zmsg, 0); +                    } +                    catch (const ParameterError &err) { +                        send_fail_reply(repSocket, err.what()); +                    } +                } +                else if (msg.size() == 4 && command == "set") { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    std::string parameter((char*) msg[2].data(), msg[2].size()); +                    std::string value((char*) msg[3].data(), msg[3].size()); + +                    try { +                        rcs.set_param(module, parameter, value); +                        send_ok_reply(repSocket); +                    } +                    catch (const ParameterError &err) { +                        send_fail_reply(repSocket, err.what()); +                    } +                } +                else { +                    send_fail_reply(repSocket, +                            "Unsupported command. commands: list, show, get, set"); +                } +            } +        } +        repSocket.close(); +    } +    catch (const zmq::error_t &e) { +        etiLog.level(error) << "ZMQ RC error: " << std::string(e.what()); +    } +    catch (const std::exception& e) { +        etiLog.level(error) << "ZMQ RC caught exception: " << e.what(); +        m_fault = true; +    } +} + +#endif + diff --git a/lib/RemoteControl.h b/lib/RemoteControl.h new file mode 100644 index 0000000..2358b3a --- /dev/null +++ b/lib/RemoteControl.h @@ -0,0 +1,251 @@ +/* +   Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 +   Her Majesty the Queen in Right of Canada (Communications Research +   Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   This module adds remote-control capability to some of the dabmux/dabmod modules. + */ +/* +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +#  include "config.h" +#endif + +#if defined(HAVE_ZEROMQ) +#  include "zmq.hpp" +#endif + +#include <list> +#include <map> +#include <memory> +#include <string> +#include <atomic> +#include <iostream> +#include <thread> +#include <stdexcept> + +#include "Log.h" +#include "Socket.h" + +#define RC_ADD_PARAMETER(p, desc) {   \ +  std::vector<std::string> p; \ +  p.push_back(#p); \ +  p.push_back(desc); \ +  m_parameters.push_back(p); \ +} + +class ParameterError : public std::exception +{ +    public: +        ParameterError(std::string message) : m_message(message) {} +        ~ParameterError() throw() {} +        const char* what() const throw() { return m_message.c_str(); } + +    private: +        std::string m_message; +}; + +class RemoteControllable; + +/* Remote controllers (that recieve orders from the user) + * must implement BaseRemoteController + */ +class BaseRemoteController { +    public: +        /* When this returns one, the remote controller cannot be +         * used anymore, and must be restarted +         */ +        virtual bool fault_detected() = 0; + +        /* In case of a fault, the remote controller can be +         * restarted. +         */ +        virtual void restart() = 0; + +        virtual ~BaseRemoteController() {} +}; + +/* Objects that support remote control must implement the following class */ +class RemoteControllable { +    public: +        RemoteControllable(const std::string& name) : +            m_rc_name(name) {} + +        RemoteControllable(const RemoteControllable& other) = delete; +        RemoteControllable& operator=(const RemoteControllable& other) = delete; + +        virtual ~RemoteControllable(); + +        /* return a short name used to identify the controllable. +         * It might be used in the commands the user has to type, so keep +         * it short +         */ +        virtual std::string get_rc_name() const { return m_rc_name; } + +        /* Return a list of possible parameters that can be set */ +        virtual std::list<std::string> get_supported_parameters() const; + +        /* Return a mapping of the descriptions of all parameters */ +        virtual std::list< std::vector<std::string> > +            get_parameter_descriptions() const +            { +                return m_parameters; +            } + +        /* Base function to set parameters. */ +        virtual void set_parameter( +                const std::string& parameter, +                const std::string& value) = 0; + +        /* Getting a parameter always returns a string. */ +        virtual const std::string get_parameter(const std::string& parameter) const = 0; + +    protected: +        std::string m_rc_name; +        std::list< std::vector<std::string> > m_parameters; +}; + +/* Holds all our remote controllers and controlled object. + */ +class RemoteControllers { +    public: +        void add_controller(std::shared_ptr<BaseRemoteController> rc); +        void enrol(RemoteControllable *rc); +        void remove_controllable(RemoteControllable *rc); +        void check_faults(); +        std::list< std::vector<std::string> > get_param_list_values(const std::string& name); +        std::string get_param(const std::string& name, const std::string& param); + +        void set_param( +                const std::string& name, +                const std::string& param, +                const std::string& value); + +        std::list<RemoteControllable*> controllables; + +    private: +        RemoteControllable* get_controllable_(const std::string& name); + +        std::list<std::shared_ptr<BaseRemoteController> > m_controllers; +}; + +/* rcs is a singleton used in all parts of the program to interact with the RC. + * It is constructed in Globals.cpp */ +extern RemoteControllers rcs; + +/* Implements a Remote controller based on a simple telnet CLI + * that listens on localhost + */ +class RemoteControllerTelnet : public BaseRemoteController { +    public: +        RemoteControllerTelnet() +            : m_active(false), +            m_fault(false), +            m_port(0) { } + +        RemoteControllerTelnet(int port) +            : m_active(port > 0), +            m_fault(false), +            m_port(port) +        { +            restart(); +        } + + +        RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete; +        RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; + +        ~RemoteControllerTelnet(); + +        virtual bool fault_detected() { return m_fault; } + +        virtual void restart(); + +    private: +        void restart_thread(long); + +        void process(long); + +        void dispatch_command(Socket::TCPSocket& socket, std::string command); +        void reply(Socket::TCPSocket& socket, std::string message); +        void handle_accept(Socket::TCPSocket&& socket); + +        std::atomic<bool> m_active; + +        /* This is set to true if a fault occurred */ +        std::atomic<bool> m_fault; +        std::thread m_restarter_thread; + +        std::thread m_child_thread; + +        Socket::TCPSocket m_socket; +        int m_port; +}; + +#if defined(HAVE_ZEROMQ) +/* Implements a Remote controller using ZMQ transportlayer + * that listens on localhost + */ +class RemoteControllerZmq : public BaseRemoteController { +    public: +        RemoteControllerZmq() +            : m_active(false), m_fault(false), +            m_zmqContext(1), +            m_endpoint("") { } + +        RemoteControllerZmq(const std::string& endpoint) +            : m_active(not endpoint.empty()), m_fault(false), +            m_zmqContext(1), +            m_endpoint(endpoint), +            m_child_thread(&RemoteControllerZmq::process, this) { } + +        RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete; +        RemoteControllerZmq(const RemoteControllerZmq& other) = delete; + +        ~RemoteControllerZmq(); + +        virtual bool fault_detected() { return m_fault; } + +        virtual void restart(); + +    private: +        void restart_thread(); + +        void recv_all(zmq::socket_t &pSocket, std::vector<std::string> &message); +        void send_ok_reply(zmq::socket_t &pSocket); +        void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); +        void process(); + +        std::atomic<bool> m_active; + +        /* This is set to true if a fault occurred */ +        std::atomic<bool> m_fault; +        std::thread m_restarter_thread; + +        zmq::context_t m_zmqContext; + +        std::string m_endpoint; +        std::thread m_child_thread; +}; +#endif + diff --git a/lib/Socket.cpp b/lib/Socket.cpp index cd70a8e..0c3cbb4 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -381,7 +381,7 @@ bool TCPSocket::valid() const      return m_sock != -1;  } -void TCPSocket::connect(const std::string& hostname, int port) +void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)  {      if (m_sock != INVALID_SOCKET) {          throw std::logic_error("You may only connect an invalid TCPSocket"); @@ -415,10 +415,21 @@ void TCPSocket::connect(const std::string& hostname, int port)          if (sfd == -1)              continue; +        if (nonblock) { +            int flags = fcntl(sfd, F_GETFL); +            if (flags == -1) { +                std::string errstr(strerror(errno)); +                throw std::runtime_error("TCP: Could not get socket flags: " + errstr); +            } + +            if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) { +                std::string errstr(strerror(errno)); +                throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); +            } +        } +          int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen);          if (ret != -1 or (ret == -1 and errno == EINPROGRESS)) { -            // As the TCPClient could set the socket to nonblocking, we -            // must handle EINPROGRESS here              m_sock = sfd;              break;          } @@ -673,9 +684,6 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)          if (ret == 0) {              m_sock.close(); - -            TCPSocket newsock; -            m_sock = std::move(newsock);              reconnect();          } @@ -693,13 +701,9 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)  void TCPClient::reconnect()  { -    int flags = fcntl(m_sock.m_sock, F_GETFL); -    if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) { -        std::string errstr(strerror(errno)); -        throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); -    } - -    m_sock.connect(m_hostname, m_port); +    TCPSocket newsock; +    m_sock = std::move(newsock); +    m_sock.connect(m_hostname, m_port, true);  }  TCPConnection::TCPConnection(TCPSocket&& sock) : diff --git a/lib/Socket.h b/lib/Socket.h index 8bb7fe1..c3c37e1 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -162,7 +162,7 @@ class TCPSocket {          TCPSocket& operator=(TCPSocket&& other);          bool valid(void) const; -        void connect(const std::string& hostname, int port); +        void connect(const std::string& hostname, int port, bool nonblock = false);          void listen(int port, const std::string& name);          void close(void); diff --git a/lib/edi/AFPacket.cpp b/lib/edi/AFPacket.cpp new file mode 100644 index 0000000..b38c38b --- /dev/null +++ b/lib/edi/AFPacket.cpp @@ -0,0 +1,96 @@ +/* +   Copyright (C) 2014 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output. +    This implements an AF Packet as defined ETSI TS 102 821. +    Also see ETSI TS 102 693 + +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ +#include "config.h" +#include "crc.h" +#include "AFPacket.h" +#include "TagItems.h" +#include "TagPacket.h" +#include <vector> +#include <string> +#include <iostream> +#include <cstdio> +#include <cstdint> +#include <arpa/inet.h> + +namespace edi { + +// Header PT field. AF packet contains TAG payload +const uint8_t AFHEADER_PT_TAG = 'T'; + +// AF Packet Major (3 bits) and Minor (4 bits) version +const uint8_t AFHEADER_VERSION = 0x10; // MAJ=1, MIN=0 + +AFPacket AFPacketiser::Assemble(TagPacket tag_packet) +{ +    std::vector<uint8_t> payload = tag_packet.Assemble(); + +    if (m_verbose) +        std::cerr << "Assemble AFPacket " << seq << std::endl; + +    std::string pack_data("AF"); // SYNC +    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); + +    uint32_t taglength = payload.size(); + +    if (m_verbose) +        std::cerr << "         AFPacket payload size " << payload.size() << std::endl; + +    // write length into packet +    packet.push_back((taglength >> 24) & 0xFF); +    packet.push_back((taglength >> 16) & 0xFF); +    packet.push_back((taglength >> 8) & 0xFF); +    packet.push_back(taglength & 0xFF); + +    // fill rest of header +    packet.push_back(seq >> 8); +    packet.push_back(seq & 0xFF); +    seq++; +    packet.push_back((have_crc ? 0x80 : 0) | AFHEADER_VERSION); // ar_cf: CRC=1 +    packet.push_back(AFHEADER_PT_TAG); + +    // insert payload, must have a length multiple of 8 bytes +    packet.insert(packet.end(), payload.begin(), payload.end()); + +    // calculate CRC over AF Header and payload +    uint16_t crc = 0xffff; +    crc = crc16(crc, &(packet.front()), packet.size()); +    crc ^= 0xffff; + +    if (m_verbose) +        fprintf(stderr, "         AFPacket crc %x\n", crc); + +    packet.push_back((crc >> 8) & 0xFF); +    packet.push_back(crc & 0xFF); + +    if (m_verbose) +        std::cerr << "         AFPacket length " << packet.size() << std::endl; + +    return packet; +} + +} diff --git a/lib/edi/AFPacket.h b/lib/edi/AFPacket.h new file mode 100644 index 0000000..f2c4e35 --- /dev/null +++ b/lib/edi/AFPacket.h @@ -0,0 +1,61 @@ +/* +   Copyright (C) 2014 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output. +    This implements an AF Packet as defined ETSI TS 102 821. +    Also see ETSI TS 102 693 + +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "config.h" +#include <vector> +#include <cstdint> +#include "TagItems.h" +#include "TagPacket.h" + +namespace edi { + +typedef std::vector<uint8_t> AFPacket; + +// ETSI TS 102 821, 6.1 AF packet structure +class AFPacketiser +{ +    public: +        AFPacketiser() : +            m_verbose(false) {}; +        AFPacketiser(bool verbose) : +            m_verbose(verbose) {}; + +        AFPacket Assemble(TagPacket tag_packet); + +    private: +        static const bool have_crc = true; + +        uint16_t seq = 0; //counter that overflows at 0xFFFF + +        bool m_verbose; +}; + +} + diff --git a/lib/edi/EDIConfig.h b/lib/edi/EDIConfig.h new file mode 100644 index 0000000..ca76322 --- /dev/null +++ b/lib/edi/EDIConfig.h @@ -0,0 +1,84 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output, +   UDP and TCP transports and their configuration + +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "config.h" +#include <vector> +#include <string> +#include <memory> +#include <cstdint> + +namespace edi { + +/** Configuration for EDI output */ + +struct destination_t { +    virtual ~destination_t() {}; +}; + +// Can represent both unicast and multicast destinations +struct udp_destination_t : public destination_t { +    std::string dest_addr; +    std::string source_addr; +    unsigned int source_port = 0; +    unsigned int ttl = 10; +}; + +// TCP server that can accept multiple connections +struct tcp_server_t : public destination_t { +    unsigned int listen_port = 0; +    size_t max_frames_queued = 1024; +}; + +// TCP client that connects to one endpoint +struct tcp_client_t : public destination_t { +    std::string dest_addr; +    unsigned int dest_port = 0; +    size_t max_frames_queued = 1024; +}; + +struct configuration_t { +    unsigned chunk_len = 207;        // RSk, data length of each chunk +    unsigned fec       = 0;          // number of fragments that can be recovered +    bool dump          = false;      // dump a file with the EDI packets +    bool verbose       = false; +    bool enable_pft    = false;      // Enable protection and fragmentation +    unsigned int tagpacket_alignment = 0; +    std::vector<std::shared_ptr<destination_t> > destinations; +    unsigned int dest_port = 0;      // common destination port, because it's encoded in the transport layer +    unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms + +    bool enabled() const { return destinations.size() > 0; } +    bool interleaver_enabled() const { return latency_frames > 0; } + +    void print() const; +}; + +} + + diff --git a/lib/edi/Interleaver.cpp b/lib/edi/Interleaver.cpp new file mode 100644 index 0000000..f26a50e --- /dev/null +++ b/lib/edi/Interleaver.cpp @@ -0,0 +1,122 @@ +/* +   Copyright (C) 2017 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output, +   Interleaving of PFT fragments to increase robustness against +   burst packet loss. + +   This is possible because EDI has to assume that fragments may reach +   the receiver out of order. + +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#include "Interleaver.h" +#include <cassert> + +namespace edi { + +void Interleaver::SetLatency(size_t latency_frames) +{ +    m_latency = latency_frames; +} + +Interleaver::fragment_vec Interleaver::Interleave(fragment_vec &fragments) +{ +    m_fragment_count = fragments.size(); + +    // Create vectors containing Fcount*latency fragments in total +    // and store them into the deque +    if (m_buffer.empty()) { +        m_buffer.emplace_back(); +    } + +    auto& last_buffer = m_buffer.back(); + +    for (auto& fragment : fragments) { +        const bool last_buffer_is_complete = +                (last_buffer.size() >= m_fragment_count * m_latency); + +        if (last_buffer_is_complete) { +            m_buffer.emplace_back(); +            last_buffer = m_buffer.back(); +        } + +        last_buffer.push_back(std::move(fragment)); +    } + +    fragments.clear(); + +    while ( not m_buffer.empty() and +            (m_buffer.front().size() >= m_fragment_count * m_latency)) { + +        auto& first_buffer = m_buffer.front(); + +        assert(first_buffer.size() == m_fragment_count * m_latency); + +        /* Assume we have 5 fragments per AF frame, and latency of 3. +         * This will give the following strides: +         *    0        1     2 +         * +-------+-------+---+ +         * | 0   1 | 2   3 | 4 | +         * |       |   +---+   | +         * | 5   6 | 7 | 8   9 | +         * |   +---+   |       | +         * |10 |11  12 |13  14 | +         * +---+-------+-------+ +         * +         * ix will be 0, 5, 10, 1, 6 in the first loop +         */ + +        for (size_t i = 0; i < m_fragment_count; i++) { +            const size_t ix = m_interleave_offset + m_fragment_count * m_stride; +            m_interleaved_fragments.push_back(first_buffer.at(ix)); + +            m_stride += 1; +            if (m_stride >= m_latency) { +                m_interleave_offset++; +                m_stride = 0; +            } +        } + +        if (m_interleave_offset >= m_fragment_count) { +            m_interleave_offset = 0; +            m_stride = 0; +            m_buffer.pop_front(); +        } +    } + +    std::vector<PFTFragment> interleaved_frags; + +    const size_t n = std::min(m_fragment_count, m_interleaved_fragments.size()); +    std::move(m_interleaved_fragments.begin(), +              m_interleaved_fragments.begin() + n, +              std::back_inserter(interleaved_frags)); +    m_interleaved_fragments.erase( +              m_interleaved_fragments.begin(), +              m_interleaved_fragments.begin() + n); + +    return interleaved_frags; +} + +} + + diff --git a/lib/edi/Interleaver.h b/lib/edi/Interleaver.h new file mode 100644 index 0000000..3029d5d --- /dev/null +++ b/lib/edi/Interleaver.h @@ -0,0 +1,75 @@ +/* +   Copyright (C) 2017 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output, +   Interleaving of PFT fragments to increase robustness against +   burst packet loss. + +   This is possible because EDI has to assume that fragments may reach +   the receiver out of order. + +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "config.h" +#include <vector> +#include <deque> +#include <stdexcept> +#include <cstdint> +#include "Log.h" +#include "PFT.h" + +namespace edi { + +class Interleaver { +    public: +        using fragment_vec = std::vector<PFTFragment>; + +        /* Configure the interleaver to use latency_frames number of AF +         * packets for interleaving. Total delay through the interleaver +         * will be latency_frames * 24ms +         */ +        void SetLatency(size_t latency_frames); + +        /* Move the fragments for an AF Packet into the interleaver and +         * return interleaved fragments to be transmitted. +         */ +        fragment_vec Interleave(fragment_vec &fragments); + +    private: +        size_t m_latency = 0; +        size_t m_fragment_count = 0; +        size_t m_interleave_offset = 0; +        size_t m_stride = 0; + +        /* Buffer that accumulates enough fragments to interleave */ +        std::deque<fragment_vec> m_buffer; + +        /* Buffer that contains fragments that have been interleaved, +         * to avoid that the interleaver output is too bursty +         */ +        std::deque<PFTFragment> m_interleaved_fragments; +}; + +} + diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp new file mode 100644 index 0000000..371d36f --- /dev/null +++ b/lib/edi/PFT.cpp @@ -0,0 +1,327 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output, +   Protection, Fragmentation and Transport. (PFT) + +   Are supported: +    Reed-Solomon and Fragmentation + +   This implements part of PFT as defined ETSI TS 102 821. + +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#include "config.h" +#include <vector> +#include <list> +#include <cstdio> +#include <cstring> +#include <cstdint> +#include <arpa/inet.h> +#include <stdexcept> +#include <sstream> +#include "PFT.h" +#include "crc.h" +#include "ReedSolomon.h" + +namespace edi { + +using namespace std; + +// An integer division that rounds up, i.e. ceil(a/b) +#define CEIL_DIV(a, b) (a % b == 0  ? a / b : a / b + 1) + +PFT::PFT() { } + +PFT::PFT(const configuration_t &conf) : +    m_k(conf.chunk_len), +    m_m(conf.fec), +    m_dest_port(conf.dest_port), +    m_pseq(0), +    m_num_chunks(0), +    m_verbose(conf.verbose) +    { +        if (m_k > 207) { +            etiLog.level(warn) << +                "EDI PFT: maximum chunk size is 207."; +            throw std::out_of_range("EDI PFT Chunk size too large."); +        } + +        if (m_m > 5) { +            etiLog.level(warn) << +                "EDI PFT: high number of recoverable fragments" +                " may lead to large overhead"; +            // See TS 102 821, 7.2.1 Known values, list entry for 'm' +        } +    } + +RSBlock PFT::Protect(AFPacket af_packet) +{ +    RSBlock rs_block; + +    // number of chunks is ceil(afpacketsize / m_k) +    // TS 102 821 7.2.2: c = ceil(l / k_max) +    m_num_chunks = CEIL_DIV(af_packet.size(), m_k); + +    if (m_verbose) { +        fprintf(stderr, "Protect %zu chunks of size %zu\n", +                m_num_chunks, af_packet.size()); +    } + +    // calculate size of chunk: +    // TS 102 821 7.2.2: k = ceil(l / c) +    // chunk_len does not include the 48 bytes of protection. +    const size_t chunk_len = CEIL_DIV(af_packet.size(), m_num_chunks); +    if (chunk_len > 207) { +        std::stringstream ss; +        ss << "Chunk length " << chunk_len << " too large (>207)"; +        throw std::runtime_error(ss.str()); +    } + +    // The last RS chunk is zero padded +    // TS 102 821 7.2.2: z = c*k - l +    const size_t zero_pad = m_num_chunks * chunk_len - af_packet.size(); + +    // Create the RS(k+p,k) encoder +    const int firstRoot = 1; // Discovered by analysing EDI dump +    const int gfPoly = 0x11d; +    const bool reverse = false; +    // The encoding has to be 255, 207 always, because the chunk has to +    // be padded at the end, and not at the beginning as libfec would +    // do +    ReedSolomon rs_encoder(255, 207, reverse, gfPoly, firstRoot); + +    // add zero padding to last chunk +    for (size_t i = 0; i < zero_pad; i++) { +        af_packet.push_back(0); +    } + +    if (m_verbose) { +        fprintf(stderr, "        add %zu zero padding\n", zero_pad); +    } + +    // Calculate RS for each chunk and assemble RS block +    for (size_t i = 0; i < af_packet.size(); i+= chunk_len) { +        vector<uint8_t> chunk(207); +        vector<uint8_t> protection(PARITYBYTES); + +        // copy chunk_len bytes into new chunk +        memcpy(&chunk.front(), &af_packet[i], chunk_len); + +        // calculate RS for chunk with padding +        rs_encoder.encode(&chunk.front(), &protection.front(), 207); + +        // Drop the padding +        chunk.resize(chunk_len); + +        // append new chunk and protection to the RS Packet +        rs_block.insert(rs_block.end(), chunk.begin(), chunk.end()); +        rs_block.insert(rs_block.end(), protection.begin(), protection.end()); +    } + +    return rs_block; +} + +vector< vector<uint8_t> > PFT::ProtectAndFragment(AFPacket af_packet) +{ +    const bool enable_RS = (m_m > 0); + +    if (enable_RS) { +        RSBlock rs_block = Protect(af_packet); + +#if 0 +        fprintf(stderr, "  af_packet (%zu):", af_packet.size()); +        for (size_t i = 0; i < af_packet.size(); i++) { +            fprintf(stderr, "%02x ", af_packet[i]); +        } +        fprintf(stderr, "\n"); + +        fprintf(stderr, "  rs_block (%zu):", rs_block.size()); +        for (size_t i = 0; i < rs_block.size(); i++) { +            fprintf(stderr, "%02x ", rs_block[i]); +        } +        fprintf(stderr, "\n"); +#endif + +        // TS 102 821 7.2.2: s_max = MIN(floor(c*p/(m+1)), MTU - h)) +        const size_t max_payload_size = ( m_num_chunks * PARITYBYTES ) / (m_m + 1); + +        // Calculate fragment count and size +        // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max) +        // l + c*p + z = length of RS block +        const size_t num_fragments = CEIL_DIV(rs_block.size(), max_payload_size); + +        // TS 102 821 7.2.2: ceil((l + c*p + z) / f) +        const size_t fragment_size = CEIL_DIV(rs_block.size(), num_fragments); + +        if (m_verbose) +            fprintf(stderr, "  PnF fragment_size %zu, num frag %zu\n", +                    fragment_size, num_fragments); + +        vector< vector<uint8_t> > fragments(num_fragments); + +        for (size_t i = 0; i < num_fragments; i++) { +            fragments[i].resize(fragment_size); +            for (size_t j = 0; j < fragment_size; j++) { +                const size_t ix = j*num_fragments + i; +                if (ix < rs_block.size()) { +                    fragments[i][j] = rs_block[ix]; +                } +                else { +                    fragments[i][j] = 0; +                } +            } +        } + +        return fragments; +    } +    else { // No RS, only fragmentation +        // TS 102 821 7.2.2: s_max = MTU - h +        // Ethernet MTU is 1500, but maybe you are routing over a network which +        // has some sort of packet encapsulation. Add some margin. +        const size_t max_payload_size = 1400; + +        // Calculate fragment count and size +        // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max) +        // l + c*p + z = length of AF packet +        const size_t num_fragments = CEIL_DIV(af_packet.size(), max_payload_size); + +        // TS 102 821 7.2.2: ceil((l + c*p + z) / f) +        const size_t fragment_size = CEIL_DIV(af_packet.size(), num_fragments); +        vector< vector<uint8_t> > fragments(num_fragments); + +        for (size_t i = 0; i < num_fragments; i++) { +            fragments[i].reserve(fragment_size); + +            for (size_t j = 0; j < fragment_size; j++) { +                const size_t ix = i*fragment_size + j; +                if (ix < af_packet.size()) { +                    fragments[i].push_back(af_packet.at(ix)); +                } +                else { +                    break; +                } +            } +        } + +        return fragments; +    } +} + +std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet) +{ +    vector< vector<uint8_t> > fragments = ProtectAndFragment(af_packet); +    vector< vector<uint8_t> > pft_fragments; // These contain PF headers + +    const bool enable_RS = (m_m > 0); +    const bool enable_transport = true; + +    unsigned int findex = 0; + +    unsigned fcount = fragments.size(); + +    // calculate size of chunk: +    // TS 102 821 7.2.2: k = ceil(l / c) +    // chunk_len does not include the 48 bytes of protection. +    const size_t chunk_len = enable_RS ? +        CEIL_DIV(af_packet.size(), m_num_chunks) : 0; + +    // The last RS chunk is zero padded +    // TS 102 821 7.2.2: z = c*k - l +    const size_t zero_pad = enable_RS ? +        m_num_chunks * chunk_len - af_packet.size() : 0; + +    for (const auto &fragment : fragments) { +        // Psync +        std::string psync("PF"); +        std::vector<uint8_t> packet(psync.begin(), psync.end()); + +        // Pseq +        packet.push_back(m_pseq >> 8); +        packet.push_back(m_pseq & 0xFF); + +        // Findex +        packet.push_back(findex >> 16); +        packet.push_back(findex >> 8); +        packet.push_back(findex & 0xFF); +        findex++; + +        // Fcount +        packet.push_back(fcount >> 16); +        packet.push_back(fcount >> 8); +        packet.push_back(fcount & 0xFF); + +        // RS (1 bit), transport (1 bit) and Plen (14 bits) +        unsigned int plen = fragment.size(); +        if (enable_RS) { +            plen |= 0x8000; // Set FEC bit +        } + +        if (enable_transport) { +            plen |= 0x4000; // Set ADDR bit +        } + +        packet.push_back(plen >> 8); +        packet.push_back(plen & 0xFF); + +        if (enable_RS) { +            packet.push_back(chunk_len);   // RSk +            packet.push_back(zero_pad);    // RSz +        } + +        if (enable_transport) { +            // Source (16 bits) +            uint16_t addr_source = 0; +            packet.push_back(addr_source >> 8); +            packet.push_back(addr_source & 0xFF); + +            // Dest (16 bits) +            packet.push_back(m_dest_port >> 8); +            packet.push_back(m_dest_port & 0xFF); +        } + +        // calculate CRC over AF Header and payload +        uint16_t crc = 0xffff; +        crc = crc16(crc, &(packet.front()), packet.size()); +        crc ^= 0xffff; + +        packet.push_back((crc >> 8) & 0xFF); +        packet.push_back(crc & 0xFF); + +        // insert payload, must have a length multiple of 8 bytes +        packet.insert(packet.end(), fragment.begin(), fragment.end()); + +        pft_fragments.push_back(packet); + +#if 0 +        fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", +                m_pseq, findex, fcount, plen & ~0xC000); +#endif +    } + +    m_pseq++; + +    return pft_fragments; +} + +} + diff --git a/lib/edi/PFT.h b/lib/edi/PFT.h new file mode 100644 index 0000000..502aa39 --- /dev/null +++ b/lib/edi/PFT.h @@ -0,0 +1,78 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output, +   Protection, Fragmentation and Transport. (PFT) + +   Are supported: +    Reed-Solomon and Fragmentation + +   This implements part of PFT as defined ETSI TS 102 821. + +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "config.h" +#include <vector> +#include <list> +#include <stdexcept> +#include <cstdint> +#include "AFPacket.h" +#include "Log.h" +#include "ReedSolomon.h" +#include "EDIConfig.h" + +namespace edi { + +typedef std::vector<uint8_t> RSBlock; +typedef std::vector<uint8_t> PFTFragment; + +class PFT +{ +    public: +        static constexpr int PARITYBYTES = 48; + +        PFT(); +        PFT(const configuration_t& conf); + +        // return a list of PFT fragments with the correct +        // PFT headers +        std::vector< PFTFragment > Assemble(AFPacket af_packet); + +        // Apply Reed-Solomon FEC to the AF Packet +        RSBlock Protect(AFPacket af_packet); + +        // Cut a RSBlock into several fragments that can be transmitted +        std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet); + +    private: +        unsigned int m_k = 207; // length of RS data word +        unsigned int m_m = 3; // number of fragments that can be recovered if lost +        unsigned int m_dest_port = 12000; // Destination port for transport header +        uint16_t m_pseq = 0; +        size_t m_num_chunks = 0; +        bool m_verbose = 0; +}; + +} + diff --git a/lib/edi/TagItems.cpp b/lib/edi/TagItems.cpp new file mode 100644 index 0000000..9746469 --- /dev/null +++ b/lib/edi/TagItems.cpp @@ -0,0 +1,449 @@ +/* +   EDI output. +    This defines a few TAG items as defined ETSI TS 102 821 and +    ETSI TS 102 693 + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#include "config.h" +#include "TagItems.h" +#include <vector> +#include <iostream> +#include <string> +#include <cstdint> +#include <stdexcept> + +namespace edi { + +TagStarPTR::TagStarPTR(const std::string& protocol) +    : m_protocol(protocol) +{ +    if (m_protocol.size() != 4) { +        throw std::runtime_error("TagStarPTR protocol invalid length"); +    } +} + +std::vector<uint8_t> TagStarPTR::Assemble() +{ +    //std::cerr << "TagItem *ptr" << std::endl; +    std::string pack_data("*ptr"); +    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); + +    packet.push_back(0); +    packet.push_back(0); +    packet.push_back(0); +    packet.push_back(0x40); + +    packet.insert(packet.end(), m_protocol.begin(), m_protocol.end()); + +    // Major +    packet.push_back(0); +    packet.push_back(0); + +    // Minor +    packet.push_back(0); +    packet.push_back(0); +    return packet; +} + +std::vector<uint8_t> TagDETI::Assemble() +{ +    std::string pack_data("deti"); +    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); +    packet.reserve(256); + +    // Placeholder for length +    packet.push_back(0); +    packet.push_back(0); +    packet.push_back(0); +    packet.push_back(0); + +    uint8_t fct  = dlfc % 250; +    uint8_t fcth = dlfc / 250; + + +    uint16_t detiHeader = fct | (fcth << 8) | (rfudf << 13) | (ficf << 14) | (atstf << 15); +    packet.push_back(detiHeader >> 8); +    packet.push_back(detiHeader & 0xFF); + +    uint32_t etiHeader = mnsc | (rfu << 16) | (rfa << 17) | +                        (fp << 19) | (mid << 22) | (stat << 24); +    packet.push_back((etiHeader >> 24) & 0xFF); +    packet.push_back((etiHeader >> 16) & 0xFF); +    packet.push_back((etiHeader >> 8) & 0xFF); +    packet.push_back(etiHeader & 0xFF); + +    if (atstf) { +        packet.push_back(utco); + +        packet.push_back((seconds >> 24) & 0xFF); +        packet.push_back((seconds >> 16) & 0xFF); +        packet.push_back((seconds >> 8) & 0xFF); +        packet.push_back(seconds & 0xFF); + +        packet.push_back((tsta >> 16) & 0xFF); +        packet.push_back((tsta >> 8) & 0xFF); +        packet.push_back(tsta & 0xFF); +    } + +    if (ficf) { +        for (size_t i = 0; i < fic_length; i++) { +            packet.push_back(fic_data[i]); +        } +    } + +    if (rfudf) { +        packet.push_back((rfud >> 16) & 0xFF); +        packet.push_back((rfud >> 8) & 0xFF); +        packet.push_back(rfud & 0xFF); +    } + +    // calculate and update size +    // remove TAG name and TAG length fields and convert to bits +    uint32_t taglength = (packet.size() - 8) * 8; + +    // write length into packet +    packet[4] = (taglength >> 24) & 0xFF; +    packet[5] = (taglength >> 16) & 0xFF; +    packet[6] = (taglength >> 8) & 0xFF; +    packet[7] = taglength & 0xFF; + +    dlfc = (dlfc+1) % 5000; + +    /* +    std::cerr << "TagItem deti, packet.size " << packet.size() << std::endl; +    std::cerr << "              fic length " << fic_length << std::endl; +    std::cerr << "              length " << taglength / 8 << std::endl; +    */ +    return packet; +} + +void TagDETI::set_edi_time(const std::time_t t, int tai_utc_offset) +{ +    utco = tai_utc_offset - 32; + +    const std::time_t posix_timestamp_1_jan_2000 = 946684800; + +    seconds = t - posix_timestamp_1_jan_2000 + utco; +} + +std::vector<uint8_t> TagESTn::Assemble() +{ +    std::string pack_data("est"); +    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); +    packet.reserve(mst_length*8 + 16); + +    packet.push_back(id); + +    // Placeholder for length +    packet.push_back(0); +    packet.push_back(0); +    packet.push_back(0); +    packet.push_back(0); + +    if (tpl > 0x3F) { +        throw std::runtime_error("TagESTn: invalid TPL value"); +    } + +    if (sad > 0x3FF) { +        throw std::runtime_error("TagESTn: invalid SAD value"); +    } + +    if (scid > 0x3F) { +        throw std::runtime_error("TagESTn: invalid SCID value"); +    } + +    uint32_t sstc = (scid << 18) | (sad << 8) | (tpl << 2) | rfa; +    packet.push_back((sstc >> 16) & 0xFF); +    packet.push_back((sstc >> 8) & 0xFF); +    packet.push_back(sstc & 0xFF); + +    for (size_t i = 0; i < mst_length * 8; i++) { +        packet.push_back(mst_data[i]); +    } + +    // calculate and update size +    // remove TAG name and TAG length fields and convert to bits +    uint32_t taglength = (packet.size() - 8) * 8; + +    // write length into packet +    packet[4] = (taglength >> 24) & 0xFF; +    packet[5] = (taglength >> 16) & 0xFF; +    packet[6] = (taglength >> 8) & 0xFF; +    packet[7] = taglength & 0xFF; + +    /* +    std::cerr << "TagItem ESTn, length " << packet.size() << std::endl; +    std::cerr << "              mst_length " << mst_length << std::endl; +    */ +    return packet; +} + +std::vector<uint8_t> TagDSTI::Assemble() +{ +    std::string pack_data("dsti"); +    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); +    packet.reserve(256); + +    // Placeholder for length +    packet.push_back(0); +    packet.push_back(0); +    packet.push_back(0); +    packet.push_back(0); + +    uint8_t dfctl = dflc % 250; +    uint8_t dfcth = dflc / 250; + + +    uint16_t dstiHeader = dfctl | (dfcth << 8) | (rfadf << 13) | (atstf << 14) | (stihf << 15); +    packet.push_back(dstiHeader >> 8); +    packet.push_back(dstiHeader & 0xFF); + +    if (stihf) { +        packet.push_back(stat); +        packet.push_back((spid >> 8) & 0xFF); +        packet.push_back(spid & 0xFF); +    } + +    if (atstf) { +        packet.push_back(utco); + +        packet.push_back((seconds >> 24) & 0xFF); +        packet.push_back((seconds >> 16) & 0xFF); +        packet.push_back((seconds >> 8) & 0xFF); +        packet.push_back(seconds & 0xFF); + +        packet.push_back((tsta >> 16) & 0xFF); +        packet.push_back((tsta >> 8) & 0xFF); +        packet.push_back(tsta & 0xFF); +    } + +    if (rfadf) { +        for (size_t i = 0; i < rfad.size(); i++) { +            packet.push_back(rfad[i]); +        } +    } +    // calculate and update size +    // remove TAG name and TAG length fields and convert to bits +    uint32_t taglength = (packet.size() - 8) * 8; + +    // write length into packet +    packet[4] = (taglength >> 24) & 0xFF; +    packet[5] = (taglength >> 16) & 0xFF; +    packet[6] = (taglength >> 8) & 0xFF; +    packet[7] = taglength & 0xFF; + +    dflc = (dflc+1) % 5000; + +    /* +    std::cerr << "TagItem dsti, packet.size " << packet.size() << std::endl; +    std::cerr << "              length " << taglength / 8 << std::endl; +    */ +    return packet; +} + +void TagDSTI::set_edi_time(const std::time_t t, int tai_utc_offset) +{ +    utco = tai_utc_offset - 32; + +    const std::time_t posix_timestamp_1_jan_2000 = 946684800; + +    seconds = t - posix_timestamp_1_jan_2000 + utco; +} + +#if 0 +/* Update the EDI time. t is in UTC, TAI offset is requested from adjtimex */ +void TagDSTI::set_edi_time(const std::time_t t) +{ +    if (tai_offset_cache_updated_at == 0 or tai_offset_cache_updated_at + 3600 < t) { +        struct timex timex_request; +        timex_request.modes = 0; + +        int err = adjtimex(&timex_request); +        if (err == -1) { +            throw std::runtime_error("adjtimex failed"); +        } + +        if (timex_request.tai == 0) { +            throw std::runtime_error("CLOCK_TAI is not properly set up"); +        } +        tai_offset_cache = timex_request.tai; +        tai_offset_cache_updated_at = t; + +        fprintf(stderr, "adjtimex: %d, tai %d\n", err, timex_request.tai); +    } + +    utco = tai_offset_cache - 32; + +    const std::time_t posix_timestamp_1_jan_2000 = 946684800; + +    seconds = t - posix_timestamp_1_jan_2000 + utco; +} +#endif + +std::vector<uint8_t> TagSSm::Assemble() +{ +    std::string pack_data("ss"); +    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); +    packet.reserve(istd_length + 16); + +    packet.push_back((id >> 8) & 0xFF); +    packet.push_back(id & 0xFF); + +    // Placeholder for length +    packet.push_back(0); +    packet.push_back(0); +    packet.push_back(0); +    packet.push_back(0); + +    if (rfa > 0x1F) { +        throw std::runtime_error("TagSSm: invalid RFA value"); +    } + +    if (tid > 0x7) { +        throw std::runtime_error("TagSSm: invalid tid value"); +    } + +    if (tidext > 0x7) { +        throw std::runtime_error("TagSSm: invalid tidext value"); +    } + +    if (stid > 0x0FFF) { +        throw std::runtime_error("TagSSm: invalid stid value"); +    } + +    uint32_t istc = (rfa << 19) | (tid << 16) | (tidext << 13) | ((crcstf ? 1 : 0) << 12) | stid; +    packet.push_back((istc >> 16) & 0xFF); +    packet.push_back((istc >> 8) & 0xFF); +    packet.push_back(istc & 0xFF); + +    for (size_t i = 0; i < istd_length; i++) { +        packet.push_back(istd_data[i]); +    } + +    // calculate and update size +    // remove TAG name and TAG length fields and convert to bits +    uint32_t taglength = (packet.size() - 8) * 8; + +    // write length into packet +    packet[4] = (taglength >> 24) & 0xFF; +    packet[5] = (taglength >> 16) & 0xFF; +    packet[6] = (taglength >> 8) & 0xFF; +    packet[7] = taglength & 0xFF; + +    /* +    std::cerr << "TagItem SSm, length " << packet.size() << std::endl; +    std::cerr << "             istd_length " << istd_length << std::endl; +    */ +    return packet; +} + + +std::vector<uint8_t> TagStarDMY::Assemble() +{ +    std::string pack_data("*dmy"); +    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); + +    packet.resize(4 + 4 + length_); + +    const uint32_t length_bits = length_ * 8; + +    packet[4] = (length_bits >> 24) & 0xFF; +    packet[5] = (length_bits >> 16) & 0xFF; +    packet[6] = (length_bits >> 8) & 0xFF; +    packet[7] = length_bits & 0xFF; + +    // The remaining bytes in the packet are "undefined data" + +    return packet; +} + +TagODRVersion::TagODRVersion(const std::string& version, uint32_t uptime_s) : +    m_version(version), +    m_uptime(uptime_s) +{ +} + +std::vector<uint8_t> TagODRVersion::Assemble() +{ +    std::string pack_data("ODRv"); +    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); + +    const size_t length = m_version.size() + sizeof(uint32_t); + +    packet.resize(4 + 4 + length); + +    const uint32_t length_bits = length * 8; + +    size_t i = 4; +    packet[i++] = (length_bits >> 24) & 0xFF; +    packet[i++] = (length_bits >> 16) & 0xFF; +    packet[i++] = (length_bits >> 8) & 0xFF; +    packet[i++] = length_bits & 0xFF; + +    copy(m_version.cbegin(), m_version.cend(), packet.begin() + i); +    i += m_version.size(); + +    packet[i++] = (m_uptime >> 24) & 0xFF; +    packet[i++] = (m_uptime >> 16) & 0xFF; +    packet[i++] = (m_uptime >> 8) & 0xFF; +    packet[i++] = m_uptime & 0xFF; + +    return packet; +} + +TagODRAudioLevels::TagODRAudioLevels(int16_t audiolevel_left, int16_t audiolevel_right) : +    m_audio_left(audiolevel_left), +    m_audio_right(audiolevel_right) +{ +} + +std::vector<uint8_t> TagODRAudioLevels::Assemble() +{ +    std::string pack_data("ODRa"); +    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); + +    constexpr size_t length = 2*sizeof(int16_t); + +    packet.resize(4 + 4 + length); + +    const uint32_t length_bits = length * 8; + +    size_t i = 4; +    packet[i++] = (length_bits >> 24) & 0xFF; +    packet[i++] = (length_bits >> 16) & 0xFF; +    packet[i++] = (length_bits >> 8) & 0xFF; +    packet[i++] = length_bits & 0xFF; + +    packet[i++] = (m_audio_left >> 8) & 0xFF; +    packet[i++] = m_audio_left & 0xFF; + +    packet[i++] = (m_audio_right >> 8) & 0xFF; +    packet[i++] = m_audio_right & 0xFF; + +    return packet; +} + +} + diff --git a/lib/edi/TagItems.h b/lib/edi/TagItems.h new file mode 100644 index 0000000..5c81b01 --- /dev/null +++ b/lib/edi/TagItems.h @@ -0,0 +1,253 @@ +/* +   EDI output. +    This defines a few TAG items as defined ETSI TS 102 821 and +    ETSI TS 102 693 + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "config.h" +#include <vector> +#include <array> +#include <chrono> +#include <string> +#include <cstdint> + +namespace edi { + +class TagItem +{ +    public: +        virtual std::vector<uint8_t> Assemble() = 0; +}; + +// ETSI TS 102 693, 5.1.1 Protocol type and revision +class TagStarPTR : public TagItem +{ +    public: +        TagStarPTR(const std::string& protocol); +        std::vector<uint8_t> Assemble(); + +    private: +        std::string m_protocol = ""; +}; + +// ETSI TS 102 693, 5.1.3 DAB ETI(LI) Management (deti) +class TagDETI : public TagItem +{ +    public: +        std::vector<uint8_t> Assemble(); + +        /***** DATA in intermediary format ****/ +        // For the ETI Header: must be defined ! +        uint8_t stat = 0; +        uint8_t mid = 0; +        uint8_t fp = 0; +        uint8_t rfa = 0; +        uint8_t rfu = 0; // MNSC is valid +        uint16_t mnsc = 0; +        uint16_t dlfc = 0; // modulo 5000 frame counter + +        // ATST (optional) +        bool atstf = false; // presence of atst data + +        /* UTCO: Offset (in seconds) between UTC and the Seconds value. The +         * value is expressed as an unsigned 8-bit quantity. As of February +         * 2009, the value shall be 2 and shall change as a result of each +         * modification of the number of leap seconds, as proscribed by +         * International Earth Rotation and Reference Systems Service (IERS). +         * +         * According to Annex F +         *  EDI = TAI - 32s (constant) +         *  EDI = UTC + UTCO +         * we derive +         *  UTCO = TAI-UTC - 32 +         * where the TAI-UTC offset is given by the USNO bulletin using +         * the ClockTAI module. +         */ +        uint8_t utco = 0; + +        /* Update the EDI time. t is in UTC */ +        void set_edi_time(const std::time_t t, int tai_utc_offset); + +        /* The number of SI seconds since 2000-01-01 T 00:00:00 UTC as an +         * unsigned 32-bit quantity. Contrary to POSIX, this value also +         * counts leap seconds. +         */ +        uint32_t seconds = 0; + +        /* TSTA: Shall be the 24 least significant bits of the Time Stamp +         * (TIST) field from the STI-D(LI) Frame. The full definition for the +         * STI TIST can be found in annex B of EN 300 797 [4]. The most +         * significant 8 bits of the TIST field of the incoming STI-D(LI) +         * frame, if required, may be carried in the RFAD field. +         */ +        uint32_t tsta = 0xFFFFFF; + +        // the FIC (optional) +        bool ficf = false; +        const unsigned char* fic_data; +        size_t fic_length; + +        // rfu +        bool rfudf = false; +        uint32_t rfud = 0; + + +}; + +// ETSI TS 102 693, 5.1.5 ETI Sub-Channel Stream <n> +class TagESTn : public TagItem +{ +    public: +        std::vector<uint8_t> Assemble(); + +        // SSTCn +        uint8_t  scid; +        uint16_t sad; +        uint8_t  tpl; +        uint8_t  rfa; + +        // Pointer to MSTn data +        uint8_t* mst_data; +        size_t mst_length; // STLn * 8 bytes + +        uint8_t id; +}; + +// ETSI TS 102 693, 5.1.2 DAB STI-D(LI) Management +class TagDSTI : public TagItem +{ +    public: +        std::vector<uint8_t> Assemble(); + +        // dsti Header +        bool stihf = false; +        bool atstf = false; // presence of atst data +        bool rfadf = false; +        uint16_t dflc = 0; // modulo 5000 frame counter + +        // STI Header (optional) +        uint8_t stat = 0; +        uint16_t spid = 0; + +        /* UTCO: Offset (in seconds) between UTC and the Seconds value. The +         * value is expressed as an unsigned 8-bit quantity. As of February +         * 2009, the value shall be 2 and shall change as a result of each +         * modification of the number of leap seconds, as proscribed by +         * International Earth Rotation and Reference Systems Service (IERS). +         * +         * According to Annex F +         *  EDI = TAI - 32s (constant) +         *  EDI = UTC + UTCO +         * we derive +         *  UTCO = TAI-UTC - 32 +         * where the TAI-UTC offset is given by the USNO bulletin using +         * the ClockTAI module. +         */ +        uint8_t utco = 0; + +        /* Update the EDI time. t is in UTC */ +        void set_edi_time(const std::time_t t, int tai_utc_offset); + +        /* The number of SI seconds since 2000-01-01 T 00:00:00 UTC as an +         * unsigned 32-bit quantity. Contrary to POSIX, this value also +         * counts leap seconds. +         */ +        uint32_t seconds = 0; + +        /* TSTA: Shall be the 24 least significant bits of the Time Stamp +         * (TIST) field from the STI-D(LI) Frame. The full definition for the +         * STI TIST can be found in annex B of EN 300 797 [4]. The most +         * significant 8 bits of the TIST field of the incoming STI-D(LI) +         * frame, if required, may be carried in the RFAD field. +         */ +        uint32_t tsta = 0xFFFFFF; + +        std::array<uint8_t, 9> rfad; + +    private: +        int tai_offset_cache = 0; +        std::time_t tai_offset_cache_updated_at = 0; +}; + +// ETSI TS 102 693, 5.1.4 STI-D Payload Stream <m> +class TagSSm : public TagItem +{ +    public: +        std::vector<uint8_t> Assemble(); + +        // SSTCn +        uint8_t rfa = 0; +        uint8_t tid = 0; // See EN 300 797, 5.4.1.1. Value 0 means "MSC sub-channel" +        uint8_t tidext = 0; // EN 300 797, 5.4.1.3, Value 0 means "MSC audio stream" +        bool crcstf = false; +        uint16_t stid = 0; + +        // Pointer to ISTDm data +        const uint8_t *istd_data; +        size_t istd_length; // bytes + +        uint16_t id = 0; +}; + +// ETSI TS 102 821, 5.2.2.2 Dummy padding +class TagStarDMY : public TagItem +{ +    public: +        /* length is the TAG value length in bytes */ +        TagStarDMY(uint32_t length) : length_(length) {} +        std::vector<uint8_t> Assemble(); + +    private: +        uint32_t length_; +}; + +// Custom TAG that carries version information of the EDI source +class TagODRVersion : public TagItem +{ +    public: +        TagODRVersion(const std::string& version, uint32_t uptime_s); +        std::vector<uint8_t> Assemble(); + +    private: +        std::string m_version; +        uint32_t m_uptime; +}; + +// Custom TAG that carries audio level metadata +class TagODRAudioLevels : public TagItem +{ +    public: +        TagODRAudioLevels(int16_t audiolevel_left, int16_t audiolevel_right); +        std::vector<uint8_t> Assemble(); + +    private: +        int16_t m_audio_left; +        int16_t m_audio_right; +}; + +} + diff --git a/lib/edi/TagPacket.cpp b/lib/edi/TagPacket.cpp new file mode 100644 index 0000000..b0bf9a1 --- /dev/null +++ b/lib/edi/TagPacket.cpp @@ -0,0 +1,78 @@ +/* +   Copyright (C) 2014 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output. +    This defines a TAG Packet. +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#include "config.h" +#include "TagPacket.h" +#include "TagItems.h" +#include <vector> +#include <iostream> +#include <string> +#include <list> +#include <cstdint> +#include <cassert> + +namespace edi { + +TagPacket::TagPacket(unsigned int alignment) : m_alignment(alignment) +{ } + +std::vector<uint8_t> TagPacket::Assemble() +{ +    std::list<TagItem*>::iterator tag; + +    std::vector<uint8_t> packet; + +    //std::cerr << "Assemble TAGPacket" << std::endl; + +    for (tag = tag_items.begin(); tag != tag_items.end(); ++tag) { +        std::vector<uint8_t> tag_data = (*tag)->Assemble(); +        packet.insert(packet.end(), tag_data.begin(), tag_data.end()); + +        //std::cerr << "     Add TAGItem of length " << tag_data.size() << std::endl; +    } + +    if (m_alignment == 0) { /* no padding */ } +    else if (m_alignment == 8) { +        // Add padding inside TAG packet +        while (packet.size() % 8 > 0) { +            packet.push_back(0); // TS 102 821, 5.1, "padding shall be undefined" +        } +    } +    else if (m_alignment > 8) { +        TagStarDMY dmy(m_alignment - 8); +        auto dmy_data = dmy.Assemble(); +        packet.insert(packet.end(), dmy_data.begin(), dmy_data.end()); +    } +    else { +        std::cerr << "Invalid alignment requirement " << m_alignment << +            " defined in TagPacket" << std::endl; +    } + +    return packet; +} + +} + diff --git a/lib/edi/TagPacket.h b/lib/edi/TagPacket.h new file mode 100644 index 0000000..1e40ce7 --- /dev/null +++ b/lib/edi/TagPacket.h @@ -0,0 +1,56 @@ +/* +   Copyright (C) 2014 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output. +    This defines a TAG Packet. +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "config.h" +#include "TagItems.h" +#include <vector> +#include <string> +#include <list> +#include <cstdint> + +namespace edi { + +// A TagPacket is nothing else than a list of tag items, with an +// Assemble function that puts the bytestream together and adds +// padding such that the total length is a multiple of 8 Bytes. +// +// ETSI TS 102 821, 5.1 Tag Packet +class TagPacket +{ +    public: +        TagPacket(unsigned int alignment); +        std::vector<uint8_t> Assemble(); + +        std::list<TagItem*> tag_items; + +    private: +        unsigned int m_alignment; +}; + +} + diff --git a/lib/edi/Transport.cpp b/lib/edi/Transport.cpp new file mode 100644 index 0000000..0d5c237 --- /dev/null +++ b/lib/edi/Transport.cpp @@ -0,0 +1,188 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output, +   UDP and TCP transports and their configuration + +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ +#include "Transport.h" +#include <iterator> + +using namespace std; + +namespace edi { + +void configuration_t::print() const +{ +    etiLog.level(info) << "EDI"; +    etiLog.level(info) << " verbose     " << verbose; +    for (auto edi_dest : destinations) { +        if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { +            etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << dest_port; +            if (not udp_dest->source_addr.empty()) { +                etiLog.level(info) << "  source      " << udp_dest->source_addr; +                etiLog.level(info) << "  ttl         " << udp_dest->ttl; +            } +            etiLog.level(info) << "  source port " << udp_dest->source_port; +        } +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { +            etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port; +            etiLog.level(info) << "  max frames queued    " << tcp_dest->max_frames_queued; +        } +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { +            etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port; +            etiLog.level(info) << "  max frames queued    " << tcp_dest->max_frames_queued; +        } +        else { +            throw logic_error("EDI destination not implemented"); +        } +    } +    if (interleaver_enabled()) { +        etiLog.level(info) << " interleave     " << latency_frames * 24 << " ms"; +    } +} + + +Sender::Sender(const configuration_t& conf) : +    m_conf(conf), +    edi_pft(m_conf) +{ +    if (m_conf.verbose) { +        etiLog.log(info, "Setup EDI"); +    } + +    for (const auto& edi_dest : m_conf.destinations) { +        if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { +            auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port); + +            if (not udp_dest->source_addr.empty()) { +                udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); +                udp_socket->setMulticastTTL(udp_dest->ttl); +            } + +            udp_sockets.emplace(udp_dest.get(), udp_socket); +        } +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { +            auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued); +            dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); +            tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); +        } +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { +            auto tcp_socket = make_shared<Socket::TCPSocket>(); +            tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); +            tcp_senders.emplace(tcp_dest.get(), tcp_socket); +        } +        else { +            throw logic_error("EDI destination not implemented"); +        } +    } + +    if (m_conf.interleaver_enabled()) { +        edi_interleaver.SetLatency(m_conf.latency_frames); +    } + +    if (m_conf.dump) { +        edi_debug_file.open("./edi.debug"); +    } + +    if (m_conf.verbose) { +        etiLog.log(info, "EDI set up"); +    } +} + +void Sender::write(const TagPacket& tagpacket) +{ +    // Assemble into one AF Packet +    edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); + +    if (m_conf.enable_pft) { +        // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) +        vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); + +        if (m_conf.verbose) { +            fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", +                    edi_fragments.size()); +        } + +        if (m_conf.interleaver_enabled()) { +            edi_fragments = edi_interleaver.Interleave(edi_fragments); +        } + +        // Send over ethernet +        for (const auto& edi_frag : edi_fragments) { +            for (auto& dest : m_conf.destinations) { +                if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { +                    Socket::InetAddress addr; +                    addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); + +                    udp_sockets.at(udp_dest.get())->send(edi_frag, addr); +                } +                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { +                    tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); +                } +                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { +                    tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); +                } +                else { +                    throw logic_error("EDI destination not implemented"); +                } +            } + +            if (m_conf.dump) { +                ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +                copy(edi_frag.begin(), edi_frag.end(), debug_iterator); +            } +        } + +        if (m_conf.verbose) { +            fprintf(stderr, "EDI number of PFT fragments %zu\n", +                    edi_fragments.size()); +        } +    } +    else { +        // Send over ethernet +        for (auto& dest : m_conf.destinations) { +            if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { +                Socket::InetAddress addr; +                addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); + +                udp_sockets.at(udp_dest.get())->send(af_packet, addr); +            } +            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { +                tcp_dispatchers.at(tcp_dest.get())->write(af_packet); +            } +            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { +                tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); +            } +            else { +                throw logic_error("EDI destination not implemented"); +            } +        } + +        if (m_conf.dump) { +            ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +            copy(af_packet.begin(), af_packet.end(), debug_iterator); +        } +    } +} + +} diff --git a/lib/edi/Transport.h b/lib/edi/Transport.h new file mode 100644 index 0000000..df6fe56 --- /dev/null +++ b/lib/edi/Transport.h @@ -0,0 +1,71 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + +   EDI output, +   UDP and TCP transports and their configuration + +   */ +/* +   This file is part of the ODR-mmbTools. + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "config.h" +#include "EDIConfig.h" +#include "AFPacket.h" +#include "PFT.h" +#include "Interleaver.h" +#include "Socket.h" +#include <vector> +#include <unordered_map> +#include <stdexcept> +#include <fstream> +#include <cstdint> + +namespace edi { + +/** Configuration for EDI output */ + +class Sender { +    public: +        Sender(const configuration_t& conf); + +        void write(const TagPacket& tagpacket); + +    private: +        configuration_t m_conf; +        std::ofstream edi_debug_file; + +        // The TagPacket will then be placed into an AFPacket +        edi::AFPacketiser edi_afPacketiser; + +        // The AF Packet will be protected with reed-solomon and split in fragments +        edi::PFT edi_pft; + +        // To mitigate for burst packet loss, PFT fragments can be sent out-of-order +        edi::Interleaver edi_interleaver; + +        std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; +        std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; +        std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders; +}; + +} + diff --git a/src/fec/LICENSE b/lib/fec/LICENSE index 5a883d3..5a883d3 100644 --- a/src/fec/LICENSE +++ b/lib/fec/LICENSE diff --git a/src/fec/README.md b/lib/fec/README.md index a44d28d..a44d28d 100644 --- a/src/fec/README.md +++ b/lib/fec/README.md diff --git a/src/fec/char.h b/lib/fec/char.h index 25efd65..25efd65 100644 --- a/src/fec/char.h +++ b/lib/fec/char.h diff --git a/src/fec/decode_rs.h b/lib/fec/decode_rs.h index c165cf3..c165cf3 100644 --- a/src/fec/decode_rs.h +++ b/lib/fec/decode_rs.h diff --git a/src/fec/decode_rs_char.c b/lib/fec/decode_rs_char.c index 7105233..7105233 100644 --- a/src/fec/decode_rs_char.c +++ b/lib/fec/decode_rs_char.c diff --git a/src/fec/encode_rs.h b/lib/fec/encode_rs.h index 2c157f9..2c157f9 100644 --- a/src/fec/encode_rs.h +++ b/lib/fec/encode_rs.h diff --git a/src/fec/encode_rs_char.c b/lib/fec/encode_rs_char.c index a9bf2b8..a9bf2b8 100644 --- a/src/fec/encode_rs_char.c +++ b/lib/fec/encode_rs_char.c diff --git a/src/fec/fec.h b/lib/fec/fec.h index 0d1bae1..0d1bae1 100644 --- a/src/fec/fec.h +++ b/lib/fec/fec.h diff --git a/src/fec/init_rs.h b/lib/fec/init_rs.h index 2b2ae98..2b2ae98 100644 --- a/src/fec/init_rs.h +++ b/lib/fec/init_rs.h diff --git a/src/fec/init_rs_char.c b/lib/fec/init_rs_char.c index a51099a..a51099a 100644 --- a/src/fec/init_rs_char.c +++ b/lib/fec/init_rs_char.c diff --git a/src/fec/rs-common.h b/lib/fec/rs-common.h index e64eb39..e64eb39 100644 --- a/src/fec/rs-common.h +++ b/lib/fec/rs-common.h | 
