From 03967733d70220e2de7af3cdad320aec5c82ede1 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 25 Jun 2019 10:50:23 +0200 Subject: Add more EDI input improvements --- lib/edi/common.cpp | 300 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 300 insertions(+) create mode 100644 lib/edi/common.cpp (limited to 'lib/edi/common.cpp') diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp new file mode 100644 index 0000000..bc0fa1b --- /dev/null +++ b/lib/edi/common.cpp @@ -0,0 +1,300 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "common.hpp" +#include "buffer_unpack.hpp" +#include "Log.h" +#include "crc.h" +#include +#include +#include +#include + +namespace EdiDecoder { + +using namespace std; + +string frame_timestamp_t::to_string() const +{ + const time_t seconds_in_unix_epoch = to_unix_epoch(); + + stringstream ss; + ss << "Timestamp: " << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z"); + return ss.str(); +} + +time_t frame_timestamp_t::to_unix_epoch() const +{ + // EDI epoch: 2000-01-01T00:00:00Z + // Convert using + // TZ=UTC python -c 'import datetime; print(datetime.datetime(2000,1,1,0,0,0,0).strftime("%s"))' + return 946684800 + seconds - utco; +} + + +TagDispatcher::TagDispatcher( + std::function&& af_packet_completed, bool verbose) : + m_af_packet_completed(move(af_packet_completed)) +{ + m_pft.setVerbose(verbose); +} + +void TagDispatcher::push_bytes(const vector &buf) +{ + copy(buf.begin(), buf.end(), back_inserter(m_input_data)); + + while (m_input_data.size() > 2) { + if (m_input_data[0] == 'A' and m_input_data[1] == 'F') { + const decode_state_t st = decode_afpacket(m_input_data); + + if (st.num_bytes_consumed == 0 and not st.complete) { + // We need to refill our buffer + break; + } + + if (st.num_bytes_consumed) { + vector remaining_data; + copy(m_input_data.begin() + st.num_bytes_consumed, + m_input_data.end(), + back_inserter(remaining_data)); + m_input_data = remaining_data; + } + + if (st.complete) { + m_af_packet_completed(); + } + } + else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') { + PFT::Fragment fragment; + const size_t fragment_bytes = fragment.loadData(m_input_data); + + if (fragment_bytes == 0) { + // We need to refill our buffer + break; + } + + vector remaining_data; + copy(m_input_data.begin() + fragment_bytes, + m_input_data.end(), + back_inserter(remaining_data)); + m_input_data = remaining_data; + + if (fragment.isValid()) { + m_pft.pushPFTFrag(fragment); + } + + auto af = m_pft.getNextAFPacket(); + if (not af.empty()) { + decode_state_t st = decode_afpacket(af); + + if (st.complete) { + m_af_packet_completed(); + } + } + } + else { + etiLog.log(warn,"Unknown %c!", *m_input_data.data()); + m_input_data.erase(m_input_data.begin()); + } + } +} + +void TagDispatcher::push_packet(const vector &buf) +{ + if (buf.size() < 2) { + throw std::invalid_argument("Not enough bytes to read EDI packet header"); + } + + if (buf[0] == 'A' and buf[1] == 'F') { + const decode_state_t st = decode_afpacket(buf); + + if (st.complete) { + m_af_packet_completed(); + } + + } + else if (buf[0] == 'P' and buf[1] == 'F') { + PFT::Fragment fragment; + fragment.loadData(buf); + + if (fragment.isValid()) { + m_pft.pushPFTFrag(fragment); + } + + auto af = m_pft.getNextAFPacket(); + if (not af.empty()) { + const decode_state_t st = decode_afpacket(af); + + if (st.complete) { + m_af_packet_completed(); + } + } + } + else { + const char packettype[3] = {(char)buf[0], (char)buf[1], '\0'}; + std::stringstream ss; + ss << "Unknown EDI packet "; + ss << packettype; + throw std::invalid_argument(ss.str()); + } +} + +void TagDispatcher::setMaxDelay(int num_af_packets) +{ + m_pft.setMaxDelay(num_af_packets); +} + + +#define AFPACKET_HEADER_LEN 10 // includes SYNC +decode_state_t TagDispatcher::decode_afpacket( + const std::vector &input_data) +{ + if (input_data.size() < AFPACKET_HEADER_LEN) { + return {false, 0}; + } + + // read length from packet + uint32_t taglength = read_32b(input_data.begin() + 2); + uint16_t seq = read_16b(input_data.begin() + 6); + + const size_t crclength = 2; + if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) { + return {false, 0}; + } + + if (m_last_seq + 1 != seq) { + etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; + } + m_last_seq = seq; + + bool has_crc = (input_data[8] & 0x80) ? true : false; + uint8_t major_revision = (input_data[8] & 0x70) >> 4; + uint8_t minor_revision = input_data[8] & 0x0F; + if (major_revision != 1 or minor_revision != 0) { + throw invalid_argument("EDI AF Packet has wrong revision " + + to_string(major_revision) + "." + to_string(minor_revision)); + } + uint8_t pt = input_data[9]; + if (pt != 'T') { + // only support Tag + return {false, 0}; + } + + + if (not has_crc) { + throw invalid_argument("AF packet not supported, has no CRC"); + } + + uint16_t crc = 0xffff; + for (size_t i = 0; i < AFPACKET_HEADER_LEN + taglength; i++) { + crc = crc16(crc, &input_data[i], 1); + } + crc ^= 0xffff; + + uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength); + + if (packet_crc != crc) { + throw invalid_argument( + "AF Packet crc wrong"); + } + else { + vector payload(taglength); + copy(input_data.begin() + AFPACKET_HEADER_LEN, + input_data.begin() + AFPACKET_HEADER_LEN + taglength, + payload.begin()); + + return {decode_tagpacket(payload), + AFPACKET_HEADER_LEN + taglength + 2}; + } +} + +void TagDispatcher::register_tag(const std::string& tag, tag_handler&& h) +{ + m_handlers[tag] = move(h); +} + + +bool TagDispatcher::decode_tagpacket(const vector &payload) +{ + size_t length = 0; + + bool success = true; + + for (size_t i = 0; i + 8 < payload.size(); i += 8 + length) { + char tag_sz[5]; + tag_sz[4] = '\0'; + copy(payload.begin() + i, payload.begin() + i + 4, tag_sz); + + string tag(tag_sz); + + uint32_t taglength = read_32b(payload.begin() + i + 4); + + if (taglength % 8 != 0) { + etiLog.log(warn, "Invalid tag length!"); + break; + } + taglength /= 8; + + length = taglength; + + vector tag_value(taglength); + copy( payload.begin() + i+8, + payload.begin() + i+8+taglength, + tag_value.begin()); + + bool tagsuccess = false; + bool found = false; + for (auto tag_handler : m_handlers) { + if (tag_handler.first.size() == 4 and tag_handler.first == tag) { + found = true; + tagsuccess = tag_handler.second(tag_value, 0); + } + else if (tag_handler.first.size() == 3 and + tag.substr(0, 3) == tag_handler.first) { + found = true; + uint8_t n = tag_sz[3]; + tagsuccess = tag_handler.second(tag_value, n); + } + else if (tag_handler.first.size() == 2 and + tag.substr(0, 2) == tag_handler.first) { + found = true; + uint16_t n = 0; + n = (uint16_t)(tag_sz[2]) << 8; + n |= (uint16_t)(tag_sz[3]); + tagsuccess = tag_handler.second(tag_value, n); + } + } + + if (not found) { + etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); + break; + } + + if (not tagsuccess) { + etiLog.log(warn, "Error decoding TAG %s", tag.c_str()); + success = tagsuccess; + break; + } + } + + return success; +} + +} -- cgit v1.2.3 From 14f69f9c915cf644147a52b803d79ff8f40a4ea1 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 29 Jul 2019 15:27:32 +0200 Subject: First prototype taking EDI TIST into account for contribution --- lib/edi/STIWriter.cpp | 1 + lib/edi/common.cpp | 25 ++++++++++++++++++++++++- lib/edi/common.hpp | 4 ++++ src/input/Edi.cpp | 48 ++++++++++++++++++++++++++++++++++++++++-------- src/input/Edi.h | 3 ++- src/utils.cpp | 2 +- 6 files changed, 72 insertions(+), 11 deletions(-) (limited to 'lib/edi/common.cpp') diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp index 6964eb1..399922a 100644 --- a/lib/edi/STIWriter.cpp +++ b/lib/edi/STIWriter.cpp @@ -120,6 +120,7 @@ void STIWriter::assemble() m_stiFrame.frame = m_payload.istd; m_stiFrame.timestamp.seconds = m_seconds; m_stiFrame.timestamp.utco = m_utco; + m_stiFrame.timestamp.tsta = m_management_data.tsta; } sti_frame_t STIWriter::getFrame() diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index bc0fa1b..b4b0c79 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -25,18 +25,31 @@ #include #include #include +#include #include namespace EdiDecoder { using namespace std; +bool frame_timestamp_t::valid() const +{ + return tsta != 0xFFFFFF; +} + string frame_timestamp_t::to_string() const { const time_t seconds_in_unix_epoch = to_unix_epoch(); stringstream ss; - ss << "Timestamp: " << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z"); + if (valid()) { + ss << "Timestamp: "; + } + else { + ss << "Timestamp not valid: "; + } + ss << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z") << + " + " << ((double)tsta / 16384000.0); return ss.str(); } @@ -48,6 +61,16 @@ time_t frame_timestamp_t::to_unix_epoch() const return 946684800 + seconds - utco; } +std::chrono::system_clock::time_point frame_timestamp_t::to_system_clock() const +{ + auto ts = chrono::system_clock::from_time_t(to_unix_epoch()); + + // PPS offset in seconds = tsta / 16384000 + ts += chrono::nanoseconds(std::lrint(tsta / 0.016384)); + + return ts; +} + TagDispatcher::TagDispatcher( std::function&& af_packet_completed, bool verbose) : diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index 1433004..887bc3d 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -23,6 +23,7 @@ #include "PFT.hpp" #include #include +#include #include #include #include @@ -33,9 +34,12 @@ namespace EdiDecoder { struct frame_timestamp_t { uint32_t seconds = 0; uint32_t utco = 0; + uint32_t tsta = 0; // According to EN 300 797 Annex B + bool valid() const; std::string to_string() const; time_t to_unix_epoch() const; + std::chrono::system_clock::time_point to_system_clock() const; }; struct decode_state_t { diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index 8aee296..765a355 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -27,6 +27,7 @@ #include "input/Edi.h" #include +#include #include #include #include @@ -41,7 +42,7 @@ namespace Inputs { constexpr bool VERBOSE = false; constexpr size_t TCP_BLOCKSIZE = 2048; -constexpr size_t MAX_FRAMES_QUEUED = 10; +constexpr size_t MAX_FRAMES_QUEUED = 1000; Edi::Edi() : m_tcp_receive_server(TCP_BLOCKSIZE), @@ -96,23 +97,53 @@ int Edi::open(const std::string& name) int Edi::readFrame(uint8_t* buffer, size_t size) { - vector frame; + if (m_pending_sti_frame.frame.empty()) { + m_frames.try_pop(m_pending_sti_frame); + } + + if (not m_pending_sti_frame.frame.empty()) { + if (m_pending_sti_frame.frame.size() != size) { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + m_pending_sti_frame.frame.size() << " received, " << size << " requested"; + memset(buffer, 0, size * sizeof(*buffer)); + } + else { + const auto now = chrono::system_clock::now(); + + if (m_pending_sti_frame.timestamp.to_system_clock() <= now) { + etiLog.level(debug) << "EDI input take frame with TS " << + m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); + + std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); + m_pending_sti_frame.frame.clear(); + } + else { + etiLog.level(debug) << "EDI input skip frame with TS " << + m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); + } + } + } + + return size; + +#if 0 + EdiDecoder::sti_frame_t sti; if (m_is_prebuffering) { m_is_prebuffering = m_frames.size() < 10; if (not m_is_prebuffering) { etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; } } - else if (m_frames.try_pop(frame)) { - if (frame.size() == 0) { + else if (m_frames.try_pop(sti)) { + if (sti.frame.size() == 0) { etiLog.level(debug) << "EDI input " << m_name << " empty frame"; return 0; } - else if (frame.size() == size) { - std::copy(frame.cbegin(), frame.cend(), buffer); + else if (sti.frame.size() == size) { + std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer); } else { - etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << frame.size() << + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() << " received, " << size << " requested"; memset(buffer, 0, size * sizeof(*buffer)); } @@ -123,6 +154,7 @@ int Edi::readFrame(uint8_t* buffer, size_t size) etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; } return size; +#endif } void Edi::m_run() @@ -159,7 +191,7 @@ void Edi::m_run() const auto sti = m_sti_writer.getFrame(); if (not sti.frame.empty()) { - m_frames.push_wait_if_full(move(sti.frame), MAX_FRAMES_QUEUED); + m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED); work_done = true; } diff --git a/src/input/Edi.h b/src/input/Edi.h index 7b3dc04..66ff682 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -71,7 +71,8 @@ class Edi : public InputBase { EdiDecoder::STIDecoder m_sti_decoder; std::thread m_thread; std::atomic m_running = ATOMIC_VAR_INIT(false); - ThreadsafeQueue > m_frames; + ThreadsafeQueue m_frames; + EdiDecoder::sti_frame_t m_pending_sti_frame; bool m_is_prebuffering = true; diff --git a/src/utils.cpp b/src/utils.cpp index 721c145..3e3e86e 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -328,7 +328,7 @@ void printSubchannels(const vec_sp_subchannel& subchannels) etiLog.level(info) << " URI: " << subchannel->inputUri; switch (subchannel->type) { case subchannel_type_t::DABAudio: - etiLog.log(info, " type: DAbAudio"); + etiLog.log(info, " type: DABAudio"); break; case subchannel_type_t::DABPlusAudio: etiLog.log(info, " type: DABPlusAudio"); -- cgit v1.2.3