diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-25 17:13:27 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-25 17:13:27 +0200 | 
| commit | 899dcb83ec873cb35d38583d6f48922e1312e9be (patch) | |
| tree | 42cfe4ddc96b52cb9f365f617c556f7e7153f5eb /src/AVTInput.cpp | |
| parent | 8bba5052cf7a3677e8be12315a03959fec33bf17 (diff) | |
| download | ODR-SourceCompanion-899dcb83ec873cb35d38583d6f48922e1312e9be.tar.gz ODR-SourceCompanion-899dcb83ec873cb35d38583d6f48922e1312e9be.tar.bz2 ODR-SourceCompanion-899dcb83ec873cb35d38583d6f48922e1312e9be.zip | |
Replace socket library
Diffstat (limited to 'src/AVTInput.cpp')
| -rw-r--r-- | src/AVTInput.cpp | 261 | 
1 files changed, 64 insertions, 197 deletions
| diff --git a/src/AVTInput.cpp b/src/AVTInput.cpp index 48b2de1..973ed7b 100644 --- a/src/AVTInput.cpp +++ b/src/AVTInput.cpp @@ -56,24 +56,25 @@ static uint32_t unpack2(const uint8_t* buf)      return (buf[0] << 8) | buf[1];  } -AVTInput::AVTInput(const std::string& input_uri, const std::string& output_uri, uint32_t pad_port, size_t jitterBufferSize) -    :   _input_uri(input_uri), -        _output_uri(output_uri), -        _pad_port(pad_port), -        _jitterBufferSize(jitterBufferSize), - -        _output_packet(2048), -        _input_pad_packet(2048), -        _ordered(5000, _jitterBufferSize), -        _lastInfoFrameType(_typeCantExtract) +AVTInput::AVTInput(const std::string& input_uri, +        const std::string& output_uri, +        uint32_t pad_port, +        size_t jitterBufferSize) : +    _input_uri(input_uri), +    _output_uri(output_uri), +    _pad_port(pad_port), +    _jitterBufferSize(jitterBufferSize), + +    _output_packet(2048), +    _input_pad_packet(2048), +    _ordered(5000, _jitterBufferSize), +    _lastInfoFrameType(_typeCantExtract)  {  }  int AVTInput::prepare(void)  { -    UdpSocket::init(); -      INFO("Open input socket\n");      int ret = _openSocketSrv(&_input_socket, _input_uri.c_str()); @@ -82,7 +83,7 @@ int AVTInput::prepare(void)          ret = _openSocketCli();      } -    if ( ret == 0 && _pad_port > 0) { +    if (ret == 0 && _pad_port > 0) {          INFO("Open PAD Port %d\n", _pad_port);          char uri[50];          sprintf(uri, "udp://:%d", _pad_port); @@ -105,13 +106,13 @@ int AVTInput::setDabPlusParameters(int bitrate, int channels, int sample_rate, b          return 1;      } -    if ( sample_rate != 48000 && sample_rate != 32000 ) { +    if (sample_rate != 48000 && sample_rate != 32000) {          ERROR("Bad sample rate for DAB+ (32000,48000)");          return 1;      }      _dac = sample_rate == 48000 ? AVT_DAC_48 : AVT_DAC_32; -    if ( channels != 1 && channels != 2 ) { +    if (channels != 1 && channels != 2) {          ERROR("Bad channel number for DAB+ (1,2)");          return 1;      } @@ -169,7 +170,7 @@ bool AVTInput::_parseURI(const char* uri, std::string& address, long& port)      return true;  } -int AVTInput::_openSocketSrv(UdpSocket* socket, const char* uri) +int AVTInput::_openSocketSrv(Socket::UDPSocket* socket, const char* uri)  {      int returnCode = -1; @@ -178,27 +179,13 @@ int AVTInput::_openSocketSrv(UdpSocket* socket, const char* uri)      if (_parseURI(uri, address, port)) {          returnCode = 0; -        if (socket->create(port) == -1) { -            fprintf(stderr, "can't set port %li on Udp input (%s: %s)\n", -                    port, inetErrDesc, inetErrMsg); -            returnCode = -1; -        } +        socket->reinit(port);          if (!address.empty()) { -            // joinGroup should accept const char* -            if (socket->joinGroup((char*)address.c_str()) == -1) { -                fprintf(stderr, -                        "can't join multicast group %s (%s: %s)\n", -                        address.c_str(), inetErrDesc, inetErrMsg); -                returnCode = -1; -            } +            socket->joinGroup(address.c_str());          } -        if (socket->setBlocking(false) == -1) { -            fprintf(stderr, "can't set Udp input socket in non-blocking mode " -                    "(%s: %s)\n", inetErrDesc, inetErrMsg); -            returnCode = -1; -        } +        socket->setBlocking(false);      }      return returnCode; @@ -216,74 +203,16 @@ int AVTInput::_openSocketCli()          return -1;      } -    if (_output_packet.getAddress().setAddress(address.c_str()) == -1) { -        fprintf(stderr, "Can't set address %s (%s: %s)\n", address.c_str(), -                inetErrDesc, inetErrMsg); -        return -1; -    } - -    _output_packet.getAddress().setPort(port); - -    if (_output_socket.create() == -1) { -        fprintf(stderr, "Can't create UDP socket (%s: %s)\n", -                inetErrDesc, inetErrMsg); -        return -1; -    } - +    _output_packet.address.resolveUdpDestination(address.c_str(), port);      return 0;  } -/* ------------------------------------------------------------------ - * From ODR-Dabmux dabInputUdp::dabInputUdpRead - */ -ssize_t AVTInput::_read(uint8_t* buf, size_t size, bool onlyOnePacket) -{ -    size_t nbBytes = 0; - -    uint8_t* data = buf; -    UdpPacket _input_packet(2048); - -    if (_input_packet.getLength() == 0) { -        _input_socket.receive(_input_packet); -    } - -    while (nbBytes < size) { -        size_t freeSize = size - nbBytes; -        if (_input_packet.getLength() > freeSize) { -            // Not enought place in output -            memcpy(&data[nbBytes], _input_packet.getData(), freeSize); -            nbBytes = size; -            _input_packet.setOffset(_input_packet.getOffset() + freeSize); -        } -        else { -            size_t length = _input_packet.getLength(); -            memcpy(&data[nbBytes], _input_packet.getData(), length); -            nbBytes += length; -            _input_packet.setOffset(0); - -            _input_socket.receive(_input_packet); -            if (_input_packet.getLength() == 0 || onlyOnePacket) { -                break; -            } -        } -    } -    bzero(&data[nbBytes], size - nbBytes); - -    return nbBytes; -} - -/* ------------------------------------------------------------------ - * - */  bool AVTInput::_isSTI(const uint8_t* buf)  {      return  (memcmp(buf+1, STI_FSync0, sizeof(STI_FSync0)) == 0) ||              (memcmp(buf+1, STI_FSync1, sizeof(STI_FSync1)) == 0);  } -/* ------------------------------------------------------------------ - * - */  const uint8_t* AVTInput::_findDABFrameFromUDP(const uint8_t* buf, size_t size,                                      int32_t& frameNumber, size_t& dataSize)  { @@ -362,19 +291,13 @@ const uint8_t* AVTInput::_findDABFrameFromUDP(const uint8_t* buf, size_t size,  void AVTInput::_sendCtrlMessage()  {      if (!_output_uri.empty()) { -        uint8_t data[50]; -        uint32_t index = 0; - -        data[index++] = 0xFD; -        data[index++] = 0x07; -        data[index++] = _subChannelIndex; -        data[index++] = _audioMode; -        data[index++] = _dac; -        data[index++] = _monoMode; - -        _output_packet.setOffset(0); -        _output_packet.setLength(0); -        _output_packet.addData(data, index); +        std::vector<uint8_t> buf({ 0xFD, 0x07, +                static_cast<uint8_t>(_subChannelIndex), +                static_cast<uint8_t>(_audioMode), +                static_cast<uint8_t>(_dac), +                static_cast<uint8_t>(_monoMode)}); + +        _output_packet.buffer = buf;          _output_socket.send(_output_packet);          INFO("Send control packet to encoder\n"); @@ -390,28 +313,21 @@ void AVTInput::_sendCtrlMessage()   *              : 1 Byte  : Size of pad data   * Pad datas    : X Bytes : In natural order, strating with FPAD bytes   */ -void AVTInput::_sendPADFrame(UdpPacket* packet) +void AVTInput::_sendPADFrame()  { -    if (packet && _padFrameQueue.size() > 0) { +    if (_padFrameQueue.size() > 0) {          std::vector<uint8_t> frame(move(_padFrameQueue.front()));          _padFrameQueue.pop(); -        uint8_t data[500]; -        uint32_t index = 0; - -        data[index++] = 0xFD; -        data[index++] = 0x18; -        data[index++] = frame.size()+2; -        data[index++] = 0xAD; -        data[index++] = frame.size(); -        memcpy( data+index, frame.data(), frame.size()); -        index += frame.size(); +        std::vector<uint8_t> buf({ 0xFD, 0x18, +                static_cast<uint8_t>(frame.size()+2), +                0xAD, +                static_cast<uint8_t>(frame.size())}); -        packet->setOffset(0); -        packet->setLength(0); -        packet->addData(data, index); - -        _input_pad_socket.send(*packet); +        Socket::UDPPacket packet; +        packet.buffer = buf; +        copy(frame.begin(), frame.end(), back_inserter(packet.buffer)); +        _input_pad_socket.send(packet);      }  } @@ -421,111 +337,71 @@ void AVTInput::_sendPADFrame(UdpPacket* packet)   * Command code : 1 Byte   *                  * 0x17 = Request for 1 PAD Frame   */ -void AVTInput::_interpretMessage(const uint8_t* data, size_t size, UdpPacket* packet) +void AVTInput::_interpretMessage(const uint8_t* data, size_t size)  {      if (size >= 2) {          if (data[0] == 0xFD) {              switch (data[1]) {                  case 0x17: -                    _sendPADFrame(packet); +                    _sendPADFrame();                      break;              }          }      }  } -/* ------------------------------------------------------------------ - * - */  bool AVTInput::_checkMessage()  { -    bool dataRecevied = false; - -    if (_input_pad_packet.getLength() == 0) { -        _input_pad_socket.receive(_input_pad_packet); +    const auto packet = _input_pad_socket.receive(2048); +    if (packet.buffer.empty()) { +        return false;      } -    if (_input_pad_packet.getLength() > 0) { -        _interpretMessage((uint8_t*)_input_pad_packet.getData(), _input_pad_packet.getLength(), -                &_input_pad_packet); -        _input_pad_packet.setOffset(0); -        _input_pad_socket.receive(_input_pad_packet); - -        dataRecevied = true; -    } +    _interpretMessage(packet.buffer.data(), packet.buffer.size()); -    return dataRecevied; +    return true;  } -/* ------------------------------------------------------------------ - * - */  void AVTInput::_purgeMessages()  { -    bool dataRecevied;      int nb = 0; -    do { -        dataRecevied = false; -        if (_input_pad_packet.getLength() == 0) { -            _input_pad_socket.receive(_input_pad_packet); -        } - -        if (_input_pad_packet.getLength() > 0) { -            nb++; -            _input_pad_packet.setOffset(0); -            _input_pad_socket.receive(_input_pad_packet); - -            dataRecevied = true; -        } -    } while (dataRecevied); +    while (not _input_pad_socket.receive(2048).buffer.empty()) { +        nb++; +    }      if (nb>0) DEBUG("%d messages purged\n", nb);  } -/* ------------------------------------------------------------------ - * - */  bool AVTInput::_readFrame()  { -    bool dataRecevied = false; - -    uint8_t readBuf[MAX_AVT_FRAME_SIZE];      int32_t frameNumber;      const uint8_t* dataPtr = NULL;      size_t dataSize = 0; -    std::vector<uint8_t> data; -    size_t readBytes = _read(readBuf, sizeof(readBuf), true/*onlyOnePacket*/); -    if (readBytes > 0) -    { -        dataRecevied = true; +    auto packet = _input_socket.receive(MAX_AVT_FRAME_SIZE); +    const size_t readBytes = packet.buffer.size(); + +    if (readBytes > 0) { +        const uint8_t *readBuf = packet.buffer.data();          if (readBytes > _dab24msFrameSize) {              // Extract frame data and frame number from buf              dataPtr = _findDABFrameFromUDP(readBuf, readBytes, frameNumber, dataSize);          } -//      if (!data) { -//              // Assuming pure RAW data -//              data = buf; -//              dataSize = _dab24msFrameSize; -//              frameNumber = _dummyFrameNumber++; -//      } -        if (!dataPtr) { -            _info(_typeCantExtract, 0); -        } +          if (dataPtr) { -            if (dataSize == _dab24msFrameSize ) { -                if( _frameAligned || frameNumber%5 == 0) { +            if (dataSize == _dab24msFrameSize) { +                if (_frameAligned or frameNumber%5 == 0) {  #if defined(DISTURB_INPUT)                      // Duplicate a frame -                    if(frameNumber%250==0) _ordered.push(frameNumber, dataPtr, dataSize); +                    if (frameNumber % 250 == 0) _ordered.push(frameNumber, dataPtr, dataSize);                      // Invert 2 frames (content inverted, audio distrubed by this test)) -                    if( frameNumber % 200 == 0) frameNumber += 10; -                    else if( (frameNumber-10) % 200 == 0) frameNumber -= 10; +                    if (frameNumber % 200 == 0) frameNumber += 10; +                    else if ((frameNumber-10) % 200 == 0) frameNumber -= 10;                      // Remove a frame (audio distrubed, frame missing) -                    if(frameNumber%300 > 5) +                    if (frameNumber % 300 > 5)  #endif                      _ordered.push(frameNumber, dataPtr, dataSize);                      _frameAligned = true; @@ -533,14 +409,14 @@ bool AVTInput::_readFrame()              }              else ERROR("Wrong frame size from encoder %zu != %zu\n", dataSize, _dab24msFrameSize);          } +        else { +            _info(_typeCantExtract, 0); +        }      } -    return dataRecevied; +    return readBytes > 0;  } -/* ------------------------------------------------------------------ - * - */  ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)  {      ssize_t nbBytes = 0; @@ -577,9 +453,6 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)      return nbBytes;  } -/* ------------------------------------------------------------------ - * - */  void AVTInput::pushPADFrame(const uint8_t* buf, size_t size)  {      if (_pad_port == 0) { @@ -593,17 +466,11 @@ void AVTInput::pushPADFrame(const uint8_t* buf, size_t size)      }  } -/* ------------------------------------------------------------------ - * - */  bool AVTInput::padQueueFull()  {      return _padFrameQueue.size() >= MAX_PAD_FRAME_QUEUE_SIZE;  } -/* ------------------------------------------------------------------ - * - */  void AVTInput::_info(_frameType type, size_t size)  {      if (_lastInfoFrameType != type || _lastInfoSize != size) { | 
