User Tools

Site Tools


vbdotnetcommon:mqttclient

This is a pure VB.Net MQTT Client

No external library is required. The client is fairly simple and does not support SSL/TLS.

Test.vb
Option Explicit On
Option Strict On
 
Module Module1

  ''' <summary>
  ''' Console demo for MQTT_Client: connects to the broker, prints every PUBLISH
  ''' that arrives, publishes a timestamp to "Hello" when "s" is pressed and
  ''' disconnects when "q" is pressed.
  ''' </summary>
  Sub Main()

    Dim client As New MQTT_Client()
    AddHandler client.Connected, AddressOf OnClientConnected

    ' Connect_ClientID is left at its default; assign e.g. "HelloWorld" for a fixed id.
    client.Connect_CleanSession = True
    client.Connect_KeepAlive = 60 * 2
    client.Connect_WillMessage = "Ut oh..."
    client.Connect_WillTopic = "Crap"
    client.SetHost("mindy", 1883)
    client.Connect()

    Do
      ' Keyboard commands: q = quit, s = publish the current time.
      If Console.KeyAvailable Then
        Dim pressed As Char = Console.ReadKey.KeyChar
        If pressed = "q"c Then
          Exit Do
        ElseIf pressed = "s"c Then
          Dim stamp As String = Now.ToString("yyyyMMdd HH:mm:ss")
          client.SendPUBLISH_QoS0("Hello", System.Text.ASCIIEncoding.ASCII.GetBytes(stamp))
        End If
      End If

      ' Wait up to 100 ms for an inbound PUBLISH. The same can be done by waiting
      ' on client.PUBLISHWaitHandle and draining with client.GetNextPUBLISHMessage.
      Dim incoming As MQTT_Client.msgPUBLISH = client.WaitOnePUBLISHMessage(100)
      If incoming Is Nothing Then Continue Do

      Dim retainMark As String = ""
      If incoming.H_Retain Then retainMark = "R"
      Dim body As String = System.Text.ASCIIEncoding.ASCII.GetString(incoming.Payload)
      Console.WriteLine("PUBLISH: " + incoming.TopicName + ":" + incoming.H_QoSLevel.ToString + retainMark + ": " + body)
    Loop

    client.SendDISCONNECT()

  End Sub

  ''' <summary>Subscribes to the demo topic filter once the broker has accepted the connection.</summary>
  Private Sub OnClientConnected(sender As MQTT_Client)
    ' Use "#" instead of the filter below to receive every topic.
    Dim wanted As New MQTT_Client.TopicFilters
    wanted.Add(New MQTT_Client.TopicFilters.TopicFilterQoS("homie/+/sensor/#", 0))
    sender.SendSUBSCRIBE(wanted)
  End Sub

End Module
MQTT_Client.vb
Option Explicit On
Option Strict On
 
Public Class MQTT_Client
  ' Dim TClient As System.Net.Sockets.TcpClient = Nothing
  Dim _Socket As System.Net.Sockets.Socket = Nothing '= New System.Net.Sockets.Socket(Net.Sockets.AddressFamily.InterNetwork, Net.Sockets.SocketType.Stream, Net.Sockets.ProtocolType.Tcp)
 
  Public Property Connect_ClientID As String = ""
  Public Property Connect_UserName As String = ""
  Public Property Connect_Password As String = ""
  Public Property Connect_WillMessage As String = ""
  Public Property Connect_WillTopic As String = ""
  Public Property Connect_WillRetain As Boolean = False
  Public Property Connect_WillQOS As ConnectFlags = 0
  Public Property Connect_CleanSession As Boolean = False
  Private _Connect_KeepAlive As UInt16 = 60 * 5
  ''' <summary>
  ''' Keep-alive value written into the CONNECT packet. Setting it also retunes
  ''' the ping countdown: the ping period becomes half this value, or 60 when the
  ''' value is 0.
  ''' </summary>
  Public Property Connect_KeepAlive As UInt16
    Get
      Return _Connect_KeepAlive
    End Get
    Set(value As UInt16)
      _Connect_KeepAlive = value

      Dim pingPeriod As Integer = 60
      If value > 0 Then pingPeriod = value \ 2
      topPing = pingPeriod

      ' Pull a running countdown back if it is beyond the new period.
      If cntPing > pingPeriod Then cntPing = pingPeriod
    End Set
  End Property
  Public Property IsConnected As Boolean = False
  Public Property IsReady As Boolean = False
  Public Property StayConnected As Boolean = True
 
  Public Event Connected(sender As MQTT_Client)
  Public Event FailedConnect(sender As MQTT_Client)
  Public Event Disconnected(sender As MQTT_Client)
 
  Public Property Connect_Endpoint As System.Net.EndPoint
 
  Private PacketIdentifier As Int32 = &HC0FEUS
 
  Private Const ValidClientIDChars As String = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
  Private Const RxBuffSize As Integer = 1024 * 8
  Private RxBuff(0 To (RxBuffSize - 1)) As Byte
  Private RxPayload As New System.IO.MemoryStream(1024 * 8)
  Private RxPayloadLen As Integer = 0
 
  Private Messages As New Queue(Of msgPUBLISH)
  Private CurrentMessage As Message = Nothing
  Private mrePUBLISH As New System.Threading.ManualResetEvent(False)
  Private lckPUBLISH As New Object
 
 
  Private RxPayloadHeader(0 To 4) As Byte
 
  Private tmrPing As Timers.Timer = Nothing
  Private lckPing As New Object
  Private _topPing As Integer = 60
  ' Ping period: the value the ping countdown is reloaded with (see ResetCntPing).
  ' Every read and write of the backing field goes through lckPing.
  Private Property topPing As Integer
    Get
      SyncLock lckPing
        Return _topPing
      End SyncLock
    End Get
    Set(value As Integer)
      SyncLock lckPing
        _topPing = value
      End SyncLock
    End Set
  End Property
 
  Private _cntPing As Integer = 60
  ' Current ping countdown. The timer handler decrements the backing field and
  ' sends a PINGREQ once it reaches zero. Access through this property is
  ' serialized on lckPing.
  Private Property cntPing As Integer
    Get
      SyncLock lckPing
        Return _cntPing
      End SyncLock
    End Get
    Set(value As Integer)
      SyncLock lckPing
        _cntPing = value
      End SyncLock
    End Set
  End Property
 
  ''' <summary>
  ''' Reloads the ping countdown with the current ping period. Called after every
  ''' packet that is sent, so a PINGREQ only goes out on an otherwise idle link.
  ''' </summary>
  Private Sub ResetCntPing()
    ' Packets are sent from several threads (caller, timer, receive callback),
    ' so take the same lock the topPing/cntPing accessors use.
    SyncLock lckPing
      _cntPing = _topPing
    End SyncLock
  End Sub
 
  ''' <summary>Creates the client and starts the one-second timer that drives the ping countdown.</summary>
  Public Sub New()
    Dim pingTimer As New Timers.Timer(1000)
    pingTimer.Enabled = True
    AddHandler pingTimer.Elapsed, AddressOf hTmrPing
    tmrPing = pingTimer
  End Sub
 
  ''' <summary>
  ''' A raw control packet: the first fixed-header byte plus every byte that
  ''' follows the remaining-length field.
  ''' </summary>
  Public Class Message
    Private Const LowNibbleMask As Byte = &HF

    ' First byte of the fixed header: packet type in the high nibble, flags in the low nibble.
    Public ControlPacketTypeByte As Byte
    Public Payload() As Byte = {}

    ''' <summary>Packet type taken from the high nibble of the header byte.</summary>
    Public ReadOnly Property ControlPacketType As ControlPacketType
      Get
        Dim typeNibble As Byte = ControlPacketTypeByte >> 4
        Return DirectCast(typeNibble, ControlPacketType)
      End Get
    End Property

    ''' <summary>Number of bytes held in Payload.</summary>
    Public ReadOnly Property PayloadLen As Integer
      Get
        Return Payload.Length
      End Get
    End Property

    ''' <summary>Low nibble of the header byte.</summary>
    Public ReadOnly Property Flags As Byte
      Get
        Return ControlPacketTypeByte And LowNibbleMask
      End Get
    End Property
  End Class
 
  ''' <summary>
  ''' Returns the next packet identifier, cycling through 1..65535.
  ''' </summary>
  ''' <returns>A non-zero 16-bit identifier.</returns>
  Private Function GetPacketIdentifier() As UInt16
    PacketIdentifier += 1

    ' MQTT requires a non-zero packet identifier, so wrap to 1 rather than 0.
    If PacketIdentifier > UInt16.MaxValue OrElse PacketIdentifier <= 0 Then
      PacketIdentifier = 1
    End If

    Return CUShort(PacketIdentifier)
  End Function
 
  ''' <summary>
  ''' Resolves <paramref name="hostname"/> and stores the broker endpoint used by Connect.
  ''' </summary>
  ''' <param name="hostname">Host name or IP address of the broker.</param>
  ''' <param name="port">TCP port of the broker.</param>
  ''' <exception cref="System.Net.Sockets.SocketException">
  ''' The name cannot be resolved, or it has no IPv4 address.
  ''' </exception>
  Public Sub SetHost(hostname As String, port As Integer)
    ' Connect always creates an InterNetwork (IPv4) socket, so an IPv6 result
    ' listed first by DNS must be skipped instead of blindly taking entry 0.
    For Each candidate As System.Net.IPAddress In System.Net.Dns.GetHostAddresses(hostname)
      If candidate.AddressFamily = System.Net.Sockets.AddressFamily.InterNetwork Then
        Connect_Endpoint = New System.Net.IPEndPoint(candidate, port)
        Exit Sub
      End If
    Next

    Throw New System.Net.Sockets.SocketException(CInt(System.Net.Sockets.SocketError.AddressFamilyNotSupported))
  End Sub
 
  ' Retry the connection after a failed attempt, unless reconnecting is switched off.
  Private Sub hFailedConnect() Handles Me.FailedConnect
    If Not Me.StayConnected Then Return
    Me.Connect()
  End Sub
  ' Reconnect after a lost connection. A drop while still waiting for CONNACK is
  ' treated as a connect error and switches reconnecting off.
  Private Sub hDisconnected() Handles Me.Disconnected
    Dim droppedDuringHandshake As Boolean = (Me._CurrentState = States.WaitCONNACK)
    If droppedDuringHandshake Then
      Me.StayConnected = False
      Return
    End If

    If Me.StayConnected Then Me.Connect()
  End Sub
 
  ''' <summary>
  ''' Timer tick: counts the ping countdown down by one and sends a PINGREQ once
  ''' it reaches zero.
  ''' </summary>
  Private Sub hTmrPing(sender As Object, e As System.Timers.ElapsedEventArgs)
    Dim pingDue As Boolean

    ' The countdown is also written from whichever thread sends a packet, so the
    ' read-modify-write is done under the same lock as the cntPing accessors.
    SyncLock lckPing
      _cntPing -= 1
      pingDue = (_cntPing <= 0)
    End SyncLock

    ' Send outside the lock so a slow socket never blocks the other accessors.
    If pingDue Then Me.SendPINGREQ()
  End Sub
  ''' <summary>
  ''' Closes any existing socket and starts an asynchronous TCP connect to
  ''' Connect_Endpoint. The rest of the handshake continues in hConnectCallback.
  ''' </summary>
  Public Sub Connect()
    IsConnected = False
    IsReady = False
    If Me._Socket IsNot Nothing Then
      _Socket.Close()
      _Socket = Nothing
    End If

    ' Drop any half-assembled packet left over from the previous connection;
    ' otherwise the first packet on the new connection is appended to stale bytes.
    CurrentMessage = Nothing
    RxPayload.SetLength(0)
    RxPayload.Position = 0

    _Socket = New System.Net.Sockets.Socket(Net.Sockets.AddressFamily.InterNetwork, Net.Sockets.SocketType.Stream, Net.Sockets.ProtocolType.Tcp)

    _Socket.ReceiveBufferSize = 1024 * 8
    _Socket.SendBufferSize = 1024 * 8
    _Socket.NoDelay = True
    _Socket.Blocking = True
    _Socket.ReceiveTimeout = 2000

    Try
      Console.WriteLine("     Atempting connection to: " + Me.Connect_Endpoint.ToString)
      Me._CurrentState = States.Connecting
      _Socket.BeginConnect(Me.Connect_Endpoint, AddressOf hConnectCallback, _Socket)
    Catch ex As Exception
      ' Report the failure instead of hiding it. FailedConnect is deliberately not
      ' raised here: its handler calls Connect again, and a failure that repeats
      ' synchronously (e.g. no endpoint set) would recurse without end.
      Console.WriteLine("ERR: Connect: " + ex.Message)
    End Try

  End Sub
 
 
  ''' <summary>
  ''' Completion callback for BeginConnect. On success it sends CONNECT and starts
  ''' the first asynchronous receive; on any failure it raises FailedConnect.
  ''' </summary>
  Private Sub hConnectCallback(ByVal ar As System.IAsyncResult)
    Try
      Dim s As System.Net.Sockets.Socket = CType(ar.AsyncState, System.Net.Sockets.Socket)

      ' Complete the asynchronous operation first: EndConnect throws when the
      ' attempt failed, and the socket must not be inspected before it has run.
      s.EndConnect(ar)

      If s.Connected = False Then
        RaiseEvent FailedConnect(Me)
        Exit Sub
      End If
      Console.WriteLine("     Connected to: {0}, Connected={1}", s.RemoteEndPoint.ToString, s.Connected)

      Me._CurrentState = States.SendCONNECT
      SendCONNECT(s)

      IsConnected = True

      s.BeginReceive(RxBuff, 0, RxBuffSize, Net.Sockets.SocketFlags.None, AddressOf hReceiveCallback, s)

      ' The Connected event is raised later, once the CONNACK has been accepted.
    Catch ex As Exception
      Console.Write(ex.Message + vbCrLf)
      RaiseEvent FailedConnect(Me)
    End Try
  End Sub
 
  ''' <summary>Marks the client as disconnected, closes the socket if there is one and raises Disconnected.</summary>
  Private Sub DoClose()
    IsConnected = False
    IsReady = False
    Console.WriteLine("     Closed")

    Dim openSocket As System.Net.Sockets.Socket = Me._Socket
    If openSocket IsNot Nothing Then
      openSocket.Close()
      Me._Socket = Nothing
    End If

    RaiseEvent Disconnected(Me)
  End Sub
 
  ''' <summary>
  ''' Completion callback for BeginReceive. Feeds the received bytes to
  ''' ProcessRxBuffer and either queues the next receive or closes the connection.
  ''' </summary>
  Private Sub hReceiveCallback(ByVal ar As System.IAsyncResult)
    Dim s As System.Net.Sockets.Socket = Nothing

    Try
      s = CType(ar.AsyncState, System.Net.Sockets.Socket)

      ' A callback for a socket that has already been closed or replaced must not
      ' close the current one or raise Disconnected a second time.
      If s IsNot _Socket Then Exit Sub

      Dim BytesReceived As Integer = 0
      If s.Connected Then BytesReceived = s.EndReceive(ar)

      If BytesReceived > 0 Then
        Dim DoDisconnect As Boolean = False
        ProcessRxBuffer(BytesReceived, 0, DoDisconnect)

        If DoDisconnect Then
          ' Protocol-level reject; there is no runtime error to describe here.
          Console.WriteLine("ERR: DoDisconnect: ")

          DoClose()
        Else
          s.BeginReceive(RxBuff, 0, RxBuffSize, Net.Sockets.SocketFlags.None, AddressOf hReceiveCallback, s)
        End If
      Else
        ' Zero bytes: the connection is gone.
        Console.WriteLine("ERR: BytesReceived=0: ")

        DoClose()
      End If

    Catch ex As Exception
      Console.WriteLine("ERR: hReceiveCallback: " + ex.Message)
      ' Only tear down the connection this callback belongs to.
      If s Is Nothing OrElse s Is _Socket Then DoClose()
    End Try
  End Sub
 
  ''' <summary>
  ''' Parses the bytes in RxBuff from <paramref name="ByteOffset"/> up to
  ''' <paramref name="BytesReceived"/>, assembling control packets and passing
  ''' each complete one to HandleMessage. Calls itself for every further header or
  ''' payload section found in the same buffer.
  ''' </summary>
  ''' <param name="BytesReceived">Number of valid bytes in RxBuff.</param>
  ''' <param name="ByteOffset">Index of the first unprocessed byte.</param>
  ''' <param name="DoDisconnect">Set to True when the connection must be closed.</param>
  Private Sub ProcessRxBuffer(BytesReceived As Integer, ByteOffset As Integer, ByRef DoDisconnect As Boolean)
    Select Case _CurrentState
      Case States.WaitControlPacket, States.WaitCONNACK
        ' Expecting a fixed header: type/flags byte followed by the remaining length.
        Dim BytesAvail As Integer = BytesReceived - ByteOffset
        If BytesAvail = 0 Then
          Exit Sub
        End If

        If FlagsLookOk(RxBuff(ByteOffset)) = False Then
          DoDisconnect = True
          Exit Sub
        End If

        CurrentMessage = New Message With {.ControlPacketTypeByte = RxBuff(ByteOffset)}

        ' Decode the remaining-length field.
        ' NOTE(review): a fixed header split across two receives is not buffered;
        ' BytesToLength reports "too little data" and the connection is dropped.
        Dim OutSize As Integer = 0
        Dim BytesConsumed As Integer = BytesToLength(RxBuff, ByteOffset + 1, BytesReceived, OutSize)

        Select Case BytesConsumed
          Case Is <= 0, Is > 4
            DoDisconnect = True
            Exit Sub
        End Select

        If OutSize > 1024 * 1024 Then
          ' Larger than this client is willing to buffer (the protocol allows up to 256 MB).
          DoDisconnect = True
          Exit Sub
        End If

        RxPayloadLen = OutSize

        ' Header done; collect the payload next.
        _CurrentState = States.ReceivingControlPacket

        ProcessRxBuffer(BytesReceived, ByteOffset + 1 + BytesConsumed, DoDisconnect)
        If DoDisconnect Then Exit Sub

      Case States.ReceivingControlPacket
        ' Bytes still missing from the current packet versus bytes at hand.
        Dim BytesToRead As Int32 = CInt(RxPayloadLen - RxPayload.Position)
        Dim BytesAvail As Int32 = BytesReceived - ByteOffset

        ' Take what is available, up to the end of this packet.
        If BytesToRead > BytesAvail Then BytesToRead = BytesAvail
        RxPayload.Write(RxBuff, ByteOffset, BytesToRead)
        ByteOffset = ByteOffset + BytesToRead

        If RxPayloadLen = RxPayload.Position Then
          ' Packet complete: dispatch it and reset for the next header.
          CurrentMessage.Payload = RxPayload.ToArray

          HandleMessage(CurrentMessage, DoDisconnect)

          CurrentMessage = Nothing
          RxPayload.Position = 0
          RxPayload.SetLength(0)
          _CurrentState = States.WaitControlPacket

          ' The handler asked for the connection to be closed; do not interpret
          ' whatever else happens to be in the buffer.
          If DoDisconnect Then Exit Sub
        End If

        ' More packets in the same buffer?
        If ByteOffset < BytesReceived Then
          ProcessRxBuffer(BytesReceived, ByteOffset, DoDisconnect)
        End If

      Case Else
        ' Data arrived in a state that does not expect any.
        DoDisconnect = True
    End Select

  End Sub
 
  ''' <summary>
  ''' Acts on one complete inbound control packet. CONNACK, PUBLISH and PINGREQ
  ''' are handled; every other packet type is accepted and ignored.
  ''' </summary>
  ''' <param name="Msg">The received packet.</param>
  ''' <param name="DoDisconnect">Set to True when the connection must be closed.</param>
  Private Sub HandleMessage(Msg As Message, ByRef DoDisconnect As Boolean)
    Dim kind As ControlPacketType = Msg.ControlPacketType

    If kind = ControlPacketType.CONNACK Then
      If Msg.PayloadLen <> msgCONNACK.FIXEDLENGTH Then
        DoDisconnect = True
        Return
      End If

      Dim ack As New msgCONNACK(Msg.Payload)
      Console.WriteLine("CONNACK: " + ack.ReturnCode.ToString)
      If ack.ReturnCode <> msgCONNACK.ConnectReturnCode.ConnectionAccepted Then
        ' The broker refused the connection: close and do not retry.
        DoDisconnect = True
        Me.StayConnected = False
        Return
      End If

      IsReady = True
      RaiseEvent Connected(Me)

    ElseIf kind = ControlPacketType.PUBLISH Then
      NotifyPUBLISH(New msgPUBLISH(Msg))

    ElseIf kind = ControlPacketType.PINGREQ Then
      ' Unusual, since pings are supposed to originate from the client; answer anyway.
      SendPINGRESP()
    End If

    ' PUBACK, PINGRESP, SUBACK and UNSUBACK need no action here.
  End Sub
 
  ''' <summary>Queues a received PUBLISH and signals the wait handle that consumers block on.</summary>
  Private Sub NotifyPUBLISH(M As msgPUBLISH)
    SyncLock lckPUBLISH
      Messages.Enqueue(M)
      ' Signal while still holding the lock. GetNextPUBLISHMessage resets the
      ' event under the same lock, so signalling afterwards could leave the event
      ' set although a consumer has already emptied the queue.
      mrePUBLISH.Set()
    End SyncLock
  End Sub
 
  ''' <summary>
  ''' Removes and returns the oldest queued PUBLISH, or Nothing when the queue is
  ''' empty. Clears the wait handle once no messages remain.
  ''' </summary>
  Public Function GetNextPUBLISHMessage() As msgPUBLISH
    SyncLock lckPUBLISH
      Dim nextMsg As msgPUBLISH = Nothing
      If Messages.Count > 0 Then nextMsg = Messages.Dequeue()
      If Messages.Count = 0 Then mrePUBLISH.Reset()
      Return nextMsg
    End SyncLock
  End Function
 
  ''' <summary>Wait handle that is set while received PUBLISH messages are queued.</summary>
  Public ReadOnly Property PUBLISHWaitHandle As System.Threading.EventWaitHandle
    Get
      Return mrePUBLISH
    End Get
  End Property
 
  ''' <summary>
  ''' Waits up to <paramref name="millisecondsTimeout"/> ms for a PUBLISH to be
  ''' queued and returns it; returns Nothing when the wait times out.
  ''' </summary>
  Public Function WaitOnePUBLISHMessage(millisecondsTimeout As Integer) As msgPUBLISH
    Dim signalled As Boolean = Me.mrePUBLISH.WaitOne(millisecondsTimeout, False)
    If Not signalled Then Return Nothing

    Return Me.GetNextPUBLISHMessage()
  End Function
 
 
 
 
  Private _CurrentState As States = States.NotConnected
 
  ''' <summary>Connection / receive state machine of the client.</summary>
  Public Enum States As Integer
    NotConnected = 0 ' Initial state
    Connecting ' Set by Connect before the asynchronous connect is started
    SendCONNECT ' Set by the connect callback just before the CONNECT packet is built
 
    WaitCONNACK ' Set just before CONNECT is sent; a disconnect in this state stops reconnecting
    'ReceivingCONNACK
 
    ' Normal states
    WaitControlPacket ' Waiting for the fixed header of the next packet.  We always come back here
    ReceivingControlPacket ' We are here while the payload of the current packet is still coming in
 
 
  End Enum
 
  ''' <summary>
  ''' MQTT control packet types. The value is carried in the high nibble of the
  ''' first fixed-header byte.
  ''' </summary>
  Public Enum ControlPacketType As Byte
    Reserved_0 = 0 ' Forbidden                            Reserved
    CONNECT = 1 ' Client to Server                        Client request to connect to Server
    CONNACK = 2 ' Server to Client                        Connect acknowledgment
    PUBLISH = 3 ' Client to Server or Server to Client    Publish message
    PUBACK = 4 ' Client to Server or Server to Client     Publish acknowledgment
    PUBREC = 5 ' Client to Server or Server to Client     Publish received (assured delivery part 1)
    PUBREL = 6  ' Client to Server or Server to Client    Publish release (assured delivery part 2)
    PUBCOMP = 7 ' Client to Server or Server to Client    Publish complete (assured delivery part 3)
    SUBSCRIBE = 8 ' Client to Server                      Client subscribe request
    SUBACK = 9 ' Server to Client                         Subscribe acknowledgment
    UNSUBSCRIBE = 10 ' Client to Server                   Unsubscribe request
    UNSUBACK = 11 ' Server to Client                      Unsubscribe acknowledgment
    PINGREQ = 12 ' Client to Server                       PING request
    PINGRESP = 13 ' Server to Client                      PING response
    DISCONNECT = 14 ' Client to Server                    Client is disconnecting
    Reserved_15 = 15 ' Forbidden                          Reserved
  End Enum
 
 
 
  ''' <summary>List of topic filter / QoS pairs that makes up the payload of a SUBSCRIBE packet.</summary>
  Public Class TopicFilters : Inherits System.Collections.Generic.List(Of TopicFilterQoS)

    ''' <summary>One topic filter together with the QoS byte sent with it.</summary>
    Public Class TopicFilterQoS
      Public Property TopicFilter As String
      Public Property QoS As Byte

      Public Sub New()
      End Sub

      Public Sub New(TopicFilter As String, QoS As Byte)
        Me.QoS = QoS
        Me.TopicFilter = TopicFilter
      End Sub

      ''' <summary>Writes the length-prefixed filter string followed by the QoS byte.</summary>
      Public Sub Serialize(Stm As System.IO.Stream)
        Dim encodedFilter As New UTF8(Me.TopicFilter)
        encodedFilter.WriteToStream(Stm)
        Stm.WriteByte(Me.QoS)
      End Sub
    End Class

    ''' <summary>Writes every entry to the stream, in list order.</summary>
    Public Sub Serialize(Stm As System.IO.Stream)
      For index As Integer = 0 To Me.Count - 1
        Me(index).Serialize(Stm)
      Next
    End Sub
  End Class
 
  ''' <summary>Decoded CONNACK packet: acknowledge flags plus the connect return code.</summary>
  Public Class msgCONNACK
    ' A CONNACK body is always exactly this many bytes.
    Public Const FIXEDLENGTH As Int32 = 2

    Public Enum ConnectReturnCode As Byte
      ConnectionAccepted = 0
      ConnectionRefused_UnacceptableProtocolVersion = 1
      ConnectionRefused_IdentifierRejected = 2
      ConnectionRefused_ServerUnavailable = 3
      ConnectionRefused_BadUsernameOrPassword = 4
      ConnectionRefused_NotAuthorized = 5
    End Enum

    Public Property AcknowledgeFlags As Byte
    Public Property ReturnCode As ConnectReturnCode

    Public Sub New()
    End Sub

    ''' <summary>Decodes a CONNACK body: byte 0 is the flags, byte 1 the return code.</summary>
    Public Sub New(Payload() As Byte)
      Dim flagsByte As Byte = Payload(0)
      Dim codeByte As Byte = Payload(1)

      Me.AcknowledgeFlags = flagsByte
      Me.ReturnCode = DirectCast(codeByte, ConnectReturnCode)
    End Sub
  End Class
 
  ''' <summary>Decoded PUBLISH packet: header flags, topic name, optional packet identifier and payload.</summary>
  Public Class msgPUBLISH
    Public Property H_DUPFlag As Boolean
    Public Property H_QoSLevel As Byte
    Public Property H_Retain As Boolean

    Public TopicName As String
    Public PacketIdentifier As UInt16
    Public Payload() As Byte = {}

    ''' <summary>Returns the payload decoded as ASCII text.</summary>
    Public Function PayloadToString() As String
      Return System.Text.Encoding.ASCII.GetString(Me.Payload)
    End Function

    Public Sub New()
    End Sub

    ''' <summary>Decodes a raw PUBLISH control packet.</summary>
    Public Sub New(Msg As Message)
      ' Header flags: bit 3 = DUP, bits 2..1 = QoS, bit 0 = RETAIN.
      Dim fixedFlags As Byte = Msg.Flags
      H_DUPFlag = (fixedFlags And &H8) <> 0
      H_QoSLevel = CByte((fixedFlags And &H6) >> 1)
      H_Retain = (fixedFlags And &H1) <> 0

      ' The body starts with the length-prefixed topic name.
      Dim cursor As Integer = 0
      Dim topic As New UTF8(Msg.Payload, cursor)
      Me.TopicName = topic.ToString
      cursor += topic.TotalLength

      ' A packet identifier follows only when QoS is above 0.
      If Me.H_QoSLevel > 0 Then
        Me.PacketIdentifier = ReadUInt16_BE(Msg.Payload, cursor)
        cursor += 2
      End If

      ' Everything left is the application payload.
      Dim remaining As Integer = Msg.PayloadLen - cursor
      Array.Resize(Me.Payload, remaining)
      Array.Copy(Msg.Payload, cursor, Me.Payload, 0, remaining)
    End Sub

  End Class
 
  ''' <summary>PINGREQ packet. It carries no data, so there is nothing to decode.</summary>
  Public Class msgPINGREQ
    Public Sub New()
    End Sub
    Public Sub New(Msg As Message)
      ' No variable header and no payload..
 
      ' Done!
    End Sub
 
  End Class
 
  ''' <summary>PINGRESP packet. It carries no data, so there is nothing to decode.</summary>
  Public Class msgPINGRESP
    Public Sub New()
    End Sub
    Public Sub New(Msg As Message)
      ' No variable header and no payload..
 
      ' Done!
    End Sub
 
  End Class
 
 
  ''' <summary>Bit values of the connect-flags byte written into the CONNECT packet.</summary>
  <Flags()> Public Enum ConnectFlags As Byte
    UserNameFlag = 1 << 7 ' Bit 7: a user name is written after the will fields
    PasswordFlag = 1 << 6 ' Bit 6: a password is written after the user name
    WillRetain = 1 << 5 ' Bit 5
    WillQoS_1 = 1 << 3 ' Bit 3 (low bit of the two-bit will QoS field)
    WillQoS_2 = 2 << 3 ' Bit 4 (high bit of the two-bit will QoS field)
    WillFlag = 1 << 2 ' Bit 2: will topic and will message are written after the client id
    CleanSession = 1 << 1 ' Bit 1
    Reserved = 1 << 0 ' Bit 0
  End Enum
 
 
 
  ''' <summary>
  ''' An MQTT string: UTF-8 encoded bytes preceded on the wire by a two-byte
  ''' big-endian length.
  ''' </summary>
  Public Class UTF8
    ' Encoded string bytes, without the two-byte length prefix.
    Private _BA() As Byte = {}

    ''' <summary>Size on the wire: two length bytes plus the encoded string.</summary>
    Public ReadOnly Property TotalLength As Integer
      Get
        Return 2 + _BA.Length
      End Get
    End Property

    ''' <summary>Encodes a string. Nothing is treated as the empty string.</summary>
    ''' <exception cref="ArgumentException">The encoded form exceeds 65535 bytes.</exception>
    Public Sub New(S As String)
      If S Is Nothing Then S = ""
      _BA = System.Text.UTF8Encoding.UTF8.GetBytes(S)

      If _BA.Length > UInt16.MaxValue Then Throw New ArgumentException("Too many bytes")
    End Sub

    ''' <summary>
    ''' Wraps a buffer that consists of the two length bytes followed by the string
    ''' bytes. The length bytes are skipped, not interpreted.
    ''' </summary>
    ''' <exception cref="ArgumentException">Fewer than 2 or more than 65537 bytes were supplied.</exception>
    Public Sub New(B() As Byte)
      Dim BLen As Integer = B.Length
      Dim BLen_m2 As Integer = BLen - 2

      If BLen_m2 < 0 Then Throw New ArgumentException("Too few bytes")
      If BLen_m2 > UInt16.MaxValue Then Throw New ArgumentException("Too many bytes")

      Array.Resize(_BA, BLen_m2)
      Array.Copy(B, 2, _BA, 0, BLen_m2)
    End Sub

    ''' <summary>Reads a length-prefixed string that starts at <paramref name="Offset"/> in <paramref name="B"/>.</summary>
    Public Sub New(B() As Byte, Offset As Integer)
      Dim BLen As Integer = ReadUInt16_BE(B, Offset)
      Array.Resize(_BA, BLen)
      Array.Copy(B, Offset + 2, _BA, 0, BLen)
    End Sub

    ''' <summary>Reads a length-prefixed string from a stream.</summary>
    ''' <exception cref="System.IO.EndOfStreamException">The stream ends before the announced number of bytes.</exception>
    Public Sub New(Stm As System.IO.Stream)
      Dim BLen As Integer = ReadUInt16_BE(Stm)
      Array.Resize(_BA, BLen)

      ' Stream.Read may return fewer bytes than requested, so keep reading until
      ' the string is complete.
      Dim Filled As Integer = 0
      While Filled < BLen
        Dim Got As Integer = Stm.Read(_BA, Filled, BLen - Filled)
        If Got <= 0 Then Throw New System.IO.EndOfStreamException("Stream ended inside a length-prefixed string")
        Filled += Got
      End While
    End Sub

    ''' <summary>Decodes the stored bytes back into a string.</summary>
    Public Overrides Function ToString() As String
      Return System.Text.UTF8Encoding.UTF8.GetString(_BA)
    End Function

    ''' <summary>Returns the wire form: two length bytes followed by the string bytes.</summary>
    Public Function GetByteArray() As Byte()
      Dim BA2() As Byte = {}
      Dim BLen As Integer = _BA.Length

      Array.Resize(BA2, BLen + 2)
      Array.Copy(_BA, 0, BA2, 2, BLen)

      WriteUInt16_BE(BA2, 0, CUShort(BLen))
      Return BA2
    End Function

    ''' <summary>Writes the wire form to a stream.</summary>
    Public Sub WriteToStream(Stm As System.IO.Stream)
      Dim BLen As Integer = _BA.Length
      WriteUInt16_BE(Stm, CUShort(BLen))
      Stm.Write(_BA, 0, BLen)
    End Sub

    ''' <summary>Encodes <paramref name="S"/> and writes its wire form to a stream.</summary>
    Public Shared Sub WriteStringToStream(Stm As System.IO.Stream, S As String)
      Dim U As New UTF8(S)
      U.WriteToStream(Stm)
    End Sub
  End Class
 
  ''' <summary>
  ''' Checks whether the flag nibble of a fixed-header byte is valid for the
  ''' packet type in its high nibble, per MQTT 3.1.1.
  ''' </summary>
  ''' <param name="B">First byte of the fixed header.</param>
  ''' <returns>True when the flags are acceptable; False for bad flags or a reserved packet type.</returns>
  Private Shared Function FlagsLookOk(B As Byte) As Boolean
    Const B_F As Byte = &HF
    Const B_QoSMask As Byte = &H6
    Dim CPT As ControlPacketType = DirectCast(B >> 4, ControlPacketType)
    Dim Flags As Byte = B And B_F

    Select Case CPT
      Case ControlPacketType.CONNECT : Return Flags = 0 ' Reserved  0000
      Case ControlPacketType.CONNACK : Return Flags = 0 ' Reserved  0000
      Case ControlPacketType.PUBLISH
        ' DUP / QoS (2 bits) / RETAIN are all meaningful, but QoS 3 (both QoS
        ' bits set) is forbidden and requires closing the connection.
        Return (Flags And B_QoSMask) <> B_QoSMask
      Case ControlPacketType.PUBACK : Return Flags = 0 ' Reserved 0000
      Case ControlPacketType.PUBREC : Return Flags = 0 ' Reserved 0000
      Case ControlPacketType.PUBREL : Return Flags = 2 ' Reserved 0010
      Case ControlPacketType.PUBCOMP : Return Flags = 0 ' Reserved 0000
      Case ControlPacketType.SUBSCRIBE : Return Flags = 2 ' Reserved 0010
      Case ControlPacketType.SUBACK : Return Flags = 0 ' Reserved 0000
      Case ControlPacketType.UNSUBSCRIBE : Return Flags = 2 ' Reserved 0010
      Case ControlPacketType.UNSUBACK : Return Flags = 0 ' Reserved 0000
      Case ControlPacketType.PINGREQ : Return Flags = 0 ' Reserved 0000
      Case ControlPacketType.PINGRESP : Return Flags = 0 ' Reserved 0000
      Case ControlPacketType.DISCONNECT : Return Flags = 0 ' Reserved 0000
    End Select

    ' Reserved_0 and Reserved_15 end up here.
    Return False
  End Function
 
  ''' <summary>
  ''' Stores <paramref name="Value"/> big-endian (high byte first) at
  ''' <paramref name="Offset"/> in <paramref name="Arr"/>.
  ''' </summary>
  Private Shared Sub WriteUInt16_BE(Arr() As Byte, Offset As Int32, Value As UInt16)
    Dim lowPart As UInt16 = Value And 255US
    Dim highPart As UInt16 = Value >> 8

    ' Low byte goes into the second slot, high byte into the first.
    Arr(Offset + 1) = CByte(lowPart)
    Arr(Offset) = CByte(highPart)
  End Sub
 
  ''' <summary>Writes <paramref name="Value"/> to the stream as two bytes, high byte first.</summary>
  Private Shared Sub WriteUInt16_BE(Stm As System.IO.Stream, Value As UInt16)
    Dim twoBytes(1) As Byte
    WriteUInt16_BE(twoBytes, 0, Value)
    Stm.Write(twoBytes, 0, twoBytes.Length)
  End Sub
 
  ''' <summary>
  ''' Reads a big-endian (high byte first) 16-bit value from
  ''' <paramref name="Arr"/> at <paramref name="Offset"/>.
  ''' </summary>
  Public Shared Function ReadUInt16_BE(Arr() As Byte, Offset As Int32) As UInt16
    Dim lowPart As UInt16 = Arr(Offset + 1)
    Dim highPart As UInt16 = Arr(Offset + 0)
    Return (highPart << 8) Or lowPart
  End Function
  ''' <summary>Reads a big-endian (high byte first) 16-bit value from a stream.</summary>
  ''' <exception cref="System.IO.EndOfStreamException">The stream ends before two bytes could be read.</exception>
  Public Shared Function ReadUInt16_BE(Stm As System.IO.Stream) As UInt16
    Dim BA(0 To 1) As Byte

    ' Stream.Read may deliver fewer bytes than requested; loop until both bytes
    ' are in, and fail loudly at end of stream instead of returning padded zeros.
    Dim Filled As Integer = 0
    While Filled < 2
      Dim Got As Integer = Stm.Read(BA, Filled, 2 - Filled)
      If Got <= 0 Then Throw New System.IO.EndOfStreamException("Stream ended inside a 16-bit value")
      Filled += Got
    End While

    Return (CUShort(BA(0)) << 8) Or BA(1)
  End Function
 
 
 
  ''' <summary>
  ''' Frames and sends one control packet: the type/flags byte, the encoded
  ''' remaining length, then <paramref name="Payload"/>. Does nothing when there
  ''' is no connected socket. Restarts the ping countdown after sending.
  ''' </summary>
  ''' <param name="CPT">Packet type, placed in the high nibble of the first byte.</param>
  ''' <param name="Flags">Flags; only the low four bits are used.</param>
  ''' <param name="Payload">Everything that follows the fixed header.</param>
  Private Sub SendControlPacket(CPT As ControlPacketType, Flags As Byte, Payload() As Byte)
    Const B15 As Byte = 15

    ' Work on a snapshot: another thread may close the connection and set
    ' _Socket to Nothing between the checks below and the Send call.
    Dim Sock As System.Net.Sockets.Socket = _Socket
    If Sock Is Nothing Then Exit Sub
    If Sock.Connected = False Then Exit Sub

    Dim PayloadLen As Integer = Payload.Length

    Dim LenBA() As Byte = LengthToBytes(PayloadLen)

    Dim OutBA() As Byte = {}
    Array.Resize(OutBA, 1 + LenBA.Length + PayloadLen)

    OutBA(0) = (CPT << 4) Or (Flags And B15)
    Array.Copy(LenBA, 0, OutBA, 1, LenBA.Length)
    Array.Copy(Payload, 0, OutBA, 1 + LenBA.Length, PayloadLen)

    Sock.Send(OutBA)

    ' Traffic was sent, so the next ping can wait a full period again.
    ResetCntPing()
  End Sub
 
  ''' <summary>
  ''' Builds the CONNECT packet from the Connect_* properties and sends it.
  ''' Moves the state machine to WaitCONNACK.
  ''' </summary>
  ''' <param name="_S">The freshly connected socket; nothing is sent if it is not connected.</param>
  Private Sub SendCONNECT(_S As System.Net.Sockets.Socket)
    If _S.Connected = False Then Exit Sub

    ' Variable header
    Dim MS_V As New System.IO.MemoryStream(256)
    UTF8.WriteStringToStream(MS_V, "MQTT") ' Protocol name, bytes 1-6
    MS_V.WriteByte(4) ' Protocol level, byte 7

    ' Decide which optional fields are present. Unset (Nothing) strings count as empty.
    Dim HasWill As Boolean = Not String.IsNullOrEmpty(Connect_WillMessage)
    Dim HasUser As Boolean = Not String.IsNullOrEmpty(Connect_UserName)
    ' MQTT 3.1.1 forbids the password flag without the user name flag.
    Dim HasPass As Boolean = HasUser AndAlso Not String.IsNullOrEmpty(Connect_Password)

    ' Build the connect flags
    Dim CF As ConnectFlags = 0
    If Connect_CleanSession Then CF = CF Or ConnectFlags.CleanSession
    If HasWill Then
      ' Will QoS and Will Retain must be 0 unless the will flag is set, so they
      ' are only applied here. Only the two QoS bits are taken from Connect_WillQOS.
      CF = CF Or ConnectFlags.WillFlag
      CF = CF Or (Connect_WillQOS And (ConnectFlags.WillQoS_1 Or ConnectFlags.WillQoS_2))
      If Connect_WillRetain Then CF = CF Or ConnectFlags.WillRetain
    End If
    If HasUser Then CF = CF Or ConnectFlags.UserNameFlag
    If HasPass Then CF = CF Or ConnectFlags.PasswordFlag

    MS_V.WriteByte(CF) ' byte 8
    WriteUInt16_BE(MS_V, Connect_KeepAlive) ' bytes 9-10

    ' Payload, in protocol order: client id, will topic, will message, user name, password
    UTF8.WriteStringToStream(MS_V, Connect_ClientID)
    If HasWill Then
      UTF8.WriteStringToStream(MS_V, Connect_WillTopic)
      UTF8.WriteStringToStream(MS_V, Connect_WillMessage)
    End If
    If HasUser Then
      UTF8.WriteStringToStream(MS_V, Connect_UserName)
    End If
    If HasPass Then
      UTF8.WriteStringToStream(MS_V, Connect_Password) ' Should be binary...
    End If

    ' Change state before sending so a fast CONNACK cannot arrive in the old state.
    Me._CurrentState = States.WaitCONNACK
    SendControlPacket(ControlPacketType.CONNECT, 0, MS_V.ToArray)
  End Sub
 
 
  ''' <summary>Publishes <paramref name="Payload"/> to <paramref name="Topic"/> at QoS 0.</summary>
  ''' <param name="Topic">Topic name to publish to.</param>
  ''' <param name="Payload">Application payload bytes.</param>
  ''' <param name="Retain">When True, the RETAIN flag is set on the packet.</param>
  Public Sub SendPUBLISH_QoS0(Topic As String, Payload() As Byte, Optional Retain As Boolean = False)
    Dim packetBody As New System.IO.MemoryStream(256 + Payload.Length)

    ' RETAIN is bit 0 of the header flags; DUP and QoS stay 0.
    Dim headerFlags As Byte = 0
    If Retain Then headerFlags = 1

    ' Topic name, then the payload. QoS 0 carries no packet identifier.
    UTF8.WriteStringToStream(packetBody, Topic)
    packetBody.Write(Payload, 0, Payload.Length)

    SendControlPacket(ControlPacketType.PUBLISH, headerFlags, packetBody.ToArray)
  End Sub
 
  ''' <summary>Sends a SUBSCRIBE packet for the given topic filters.</summary>
  ''' <param name="TopicFilters">Topic filter / QoS pairs to subscribe to.</param>
  Public Sub SendSUBSCRIBE(TopicFilters As TopicFilters)
    ' The identifier is not remembered, so the SUBACK is not matched to this request.
    Dim packetId As UInt16 = GetPacketIdentifier()

    Dim packetBody As New System.IO.MemoryStream(256)
    WriteUInt16_BE(packetBody, packetId)
    TopicFilters.Serialize(packetBody)

    ' SUBSCRIBE uses the fixed flag value 0010.
    SendControlPacket(ControlPacketType.SUBSCRIBE, 2, packetBody.ToArray)
  End Sub
 
  ''' <summary>Sends a PINGREQ packet; it has no flags and no payload.</summary>
  Public Sub SendPINGREQ()
    Dim noPayload() As Byte = {}
    SendControlPacket(ControlPacketType.PINGREQ, 0, noPayload)
  End Sub
  ''' <summary>Sends a PINGRESP packet; it has no flags and no payload.</summary>
  Public Sub SendPINGRESP()
    Dim noPayload() As Byte = {}
    SendControlPacket(ControlPacketType.PINGRESP, 0, noPayload)
  End Sub
 
  ''' <summary>
  ''' Switches automatic reconnecting off, sends DISCONNECT and closes the
  ''' connection.
  ''' </summary>
  Public Sub SendDISCONNECT()
    Me.StayConnected = False

    Try
      SendControlPacket(ControlPacketType.DISCONNECT, 0, {})
      Console.WriteLine("SendDISCONNECT: " + Err.Description)
    Finally
      ' Close the socket even when the send throws (e.g. the link is already
      ' broken); otherwise the socket would be left open.
      DoClose()
    End Try

  End Sub
 
 
  ''' <summary>
  ''' Encodes a length as an MQTT "remaining length" field: 7 bits per byte, least
  ''' significant group first, bit 7 set on every byte except the last.
  ''' </summary>
  ''' <param name="V">Length to encode, 0 to 268,435,455.</param>
  ''' <returns>The 1 to 4 encoded bytes.</returns>
  ''' <exception cref="ArgumentOutOfRangeException">
  ''' <paramref name="V"/> is negative or does not fit into four length bytes.
  ''' </exception>
  Private Shared Function LengthToBytes(V As Int32) As Byte()
    Const B128 As Byte = 128
    Const MaxRemainingLength As Int32 = 268435455

    ' A negative value never shifts down to 0 (the loop would not end), and a
    ' larger value would need a fifth byte, which the protocol does not allow.
    If V < 0 OrElse V > MaxRemainingLength Then
      Throw New ArgumentOutOfRangeException("V", "Remaining length must be between 0 and 268435455")
    End If

    Dim Encoded As New System.Collections.Generic.List(Of Byte)(4)
    Do
      Dim B As Byte = CByte(V And &H7F)
      V = V >> 7

      ' More groups follow: flag it with the continuation bit.
      If V > 0 Then B = B Or B128
      Encoded.Add(B)
    Loop While V > 0

    Return Encoded.ToArray
  End Function
 
  ''' <summary>
  ''' Decodes an MQTT "remaining length" field starting at
  ''' <paramref name="Offset"/> in <paramref name="BA"/>.
  ''' </summary>
  ''' <param name="BA">Buffer holding the encoded length.</param>
  ''' <param name="Offset">Index of the first length byte.</param>
  ''' <param name="BufferLen">Number of valid bytes in <paramref name="BA"/>.</param>
  ''' <param name="OutSize">Receives the decoded length.</param>
  ''' <returns>
  ''' The number of bytes consumed (1 to 4) on success; -1 when the buffer ends
  ''' before the field is complete; -2 when the field is longer than four bytes.
  ''' </returns>
  Private Shared Function BytesToLength(BA() As Byte, ByVal Offset As Integer, BufferLen As Integer, ByRef OutSize As Integer) As Integer
    Const B128 As Byte = 128
    Const MaxLengthBytes As Integer = 4

    Dim ConsumedBytes As Integer = 0
    OutSize = 0

    ' Groups of 7 bits, least significant group first; bit 7 means "more follows".
    Dim ShiftCnt As Integer = 0
    While True
      If Offset >= BufferLen Then Return -1
      Dim B As Byte = BA(Offset)
      ConsumedBytes += 1
      Offset += 1

      OutSize = OutSize Or (CInt(B And 127) << (7 * ShiftCnt))

      If (B And B128) = 0 Then Exit While

      ShiftCnt += 1
      ' A continuation bit on the fourth byte is malformed: never read a fifth.
      If ShiftCnt >= MaxLengthBytes Then
        Return -2
      End If
    End While

    Return ConsumedBytes
  End Function
End Class
vbdotnetcommon/mqttclient.txt · Last modified: 2025/07/09 11:34 by srbios

Donate Powered by PHP Valid HTML5 Valid CSS Driven by DokuWiki