Option Explicit On
Option Strict On
Public Class MQTT_Client
' Dim TClient As System.Net.Sockets.TcpClient = Nothing
' TCP socket for the current connection. Nothing until Connect() creates one and
' again after DoClose() releases it.
Dim _Socket As System.Net.Sockets.Socket = Nothing '= New System.Net.Sockets.Socket(Net.Sockets.AddressFamily.InterNetwork, Net.Sockets.SocketType.Stream, Net.Sockets.ProtocolType.Tcp)
' Client identifier written as the first payload field of the CONNECT packet.
Public Property Connect_ClientID As String = ""
' User name; SendCONNECT only includes it when it is non-empty.
Public Property Connect_UserName As String = ""
' Password; SendCONNECT only includes it when it is non-empty.
Public Property Connect_Password As String = ""
' Will message; a non-empty value is what makes SendCONNECT set the Will flag.
Public Property Connect_WillMessage As String = ""
' Will topic; written just before the will message when the Will flag is set.
Public Property Connect_WillTopic As String = ""
' When True, SendCONNECT sets the WillRetain connect flag.
Public Property Connect_WillRetain As Boolean = False
' Starting value of the connect-flags byte built by SendCONNECT
' (presumably meant to be ConnectFlags.WillQoS_1 / WillQoS_2 - confirm with callers).
Public Property Connect_WillQOS As ConnectFlags = 0
' When True, SendCONNECT sets the CleanSession connect flag.
Public Property Connect_CleanSession As Boolean = False
' Backing store for Connect_KeepAlive (defaults to 300).
Private _Connect_KeepAlive As UInt16 = 60 * 5
''' <summary>
''' Keep-alive value written into the CONNECT packet. Setting it also retunes the
''' ping countdown: the reload value becomes half the keep-alive, or 60 when the
''' keep-alive is zero, and a countdown already above the new reload value is
''' clamped down to it.
''' </summary>
Public Property Connect_KeepAlive As UInt16
    Get
        Return _Connect_KeepAlive
    End Get
    Set(value As UInt16)
        _Connect_KeepAlive = value
        topPing = If(value > 0, value \ 2, 60)
        If cntPing > topPing Then
            cntPing = topPing
        End If
    End Set
End Property
' Set to True by hConnectCallback after the CONNECT packet has been sent;
' cleared by Connect() and DoClose().
Public Property IsConnected As Boolean = False
' Set to True by HandleMessage once a CONNACK with ConnectionAccepted arrives;
' cleared by Connect() and DoClose().
Public Property IsReady As Boolean = False
' When True, the FailedConnect and Disconnected handlers call Connect() again.
' Cleared by SendDISCONNECT and when the connection attempt is refused.
Public Property StayConnected As Boolean = True
' Raised by HandleMessage after an accepted CONNACK.
Public Event Connected(sender As MQTT_Client)
' Raised by hConnectCallback when the TCP connect does not succeed.
Public Event FailedConnect(sender As MQTT_Client)
' Raised by DoClose() after the socket has been released.
Public Event Disconnected(sender As MQTT_Client)
' Remote endpoint used by Connect(); SetHost() fills it in from a host name and port.
Public Property Connect_Endpoint As System.Net.EndPoint
' Last packet identifier handed out; GetPacketIdentifier() increments it before use.
Private PacketIdentifier As Int32 = &HC0FEUS
' NOTE(review): not referenced anywhere else in the visible source.
Private Const ValidClientIDChars As String = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
' Size of the raw receive buffer handed to Socket.BeginReceive.
Private Const RxBuffSize As Integer = 1024 * 8
' Raw receive buffer filled by the socket and parsed by ProcessRxBuffer.
Private RxBuff(0 To (RxBuffSize - 1)) As Byte
' Accumulates the body of the control packet currently being received.
Private RxPayload As New System.IO.MemoryStream(1024 * 8)
' Remaining-length value decoded from the current packet's fixed header.
Private RxPayloadLen As Integer = 0
' Received PUBLISH messages waiting for the application; guarded by lckPUBLISH.
Private Messages As New Queue(Of msgPUBLISH)
' Packet whose body is currently being accumulated in RxPayload.
Private CurrentMessage As Message = Nothing
' Signalled by NotifyPUBLISH; reset by GetNextPUBLISHMessage when the queue is empty.
Private mrePUBLISH As New System.Threading.ManualResetEvent(False)
' Lock object protecting Messages.
Private lckPUBLISH As New Object
' NOTE(review): not referenced anywhere else in the visible source.
Private RxPayloadHeader(0 To 4) As Byte
' One-second timer created in the constructor; drives hTmrPing.
Private tmrPing As Timers.Timer = Nothing
' Lock object used by the topPing / cntPing property accessors.
Private lckPing As New Object
' Reload value for the ping countdown, in timer ticks.
Private _topPing As Integer = 60
''' <summary>
''' Reload value for the ping countdown. Every access takes lckPing.
''' </summary>
Private Property topPing As Integer
    Get
        Dim Snapshot As Integer
        SyncLock lckPing
            Snapshot = _topPing
        End SyncLock
        Return Snapshot
    End Get
    Set(value As Integer)
        SyncLock lckPing
            _topPing = value
        End SyncLock
    End Set
End Property
' Ticks left before the next PINGREQ is due.
Private _cntPing As Integer = 60
''' <summary>
''' Ticks left before the next PINGREQ is due. Every access takes lckPing.
''' </summary>
Private Property cntPing As Integer
    Get
        Dim Snapshot As Integer
        SyncLock lckPing
            Snapshot = _cntPing
        End SyncLock
        Return Snapshot
    End Get
    Set(value As Integer)
        SyncLock lckPing
            _cntPing = value
        End SyncLock
    End Set
End Property
''' <summary>
''' Reloads the ping countdown from the current reload value. Called after every
''' packet that is sent, so a PINGREQ only goes out on an otherwise idle link.
''' </summary>
Private Sub ResetCntPing()
    ' Senders and the ping timer run on different threads; take the same lock the
    ' topPing / cntPing accessors use so the copy is not interleaved with them.
    SyncLock lckPing
        _cntPing = _topPing
    End SyncLock
End Sub
''' <summary>
''' Creates the client and starts the one-second timer that drives hTmrPing.
''' </summary>
Public Sub New()
    tmrPing = New Timers.Timer(1000) With {.Enabled = True}
    AddHandler tmrPing.Elapsed, AddressOf hTmrPing
End Sub
''' <summary>
''' A raw control packet: the first fixed-header byte plus the bytes that follow
''' the remaining-length field.
''' </summary>
Public Class Message
    ' Mask selecting the low four bits (the flags) of the first header byte.
    Private Const LowNibbleMask As Byte = &HF
    ' First byte of the fixed header: packet type in the high nibble, flags in the low.
    Public ControlPacketTypeByte As Byte
    ''' <summary>Packet type taken from the high nibble of ControlPacketTypeByte.</summary>
    Public ReadOnly Property ControlPacketType As ControlPacketType
        Get
            Dim TypeNibble As Byte = ControlPacketTypeByte >> 4
            Return DirectCast(TypeNibble, ControlPacketType)
        End Get
    End Property
    ' Packet body (variable header and payload); empty by default.
    Public Payload() As Byte = {}
    ''' <summary>Number of bytes in Payload.</summary>
    Public ReadOnly Property PayloadLen As Integer
        Get
            Return Payload.Length
        End Get
    End Property
    ''' <summary>Flag bits taken from the low nibble of ControlPacketTypeByte.</summary>
    Public ReadOnly Property Flags As Byte
        Get
            Return ControlPacketTypeByte And LowNibbleMask
        End Get
    End Property
End Class
''' <summary>
''' Returns the next packet identifier, cycling through 1..65535.
''' </summary>
''' <returns>A non-zero 16-bit packet identifier.</returns>
Private Function GetPacketIdentifier() As UInt16
    PacketIdentifier += 1
    ' MQTT requires packet identifiers to be non-zero, so wrap to 1 rather than 0.
    If PacketIdentifier > UInt16.MaxValue OrElse PacketIdentifier <= 0 Then
        PacketIdentifier = 1
    End If
    Return CUShort(PacketIdentifier)
End Function
''' <summary>
''' Resolves a host name and stores the result in Connect_Endpoint.
''' </summary>
''' <param name="hostname">Host name or literal IP address to resolve.</param>
''' <param name="port">TCP port of the broker.</param>
Public Sub SetHost(hostname As String, port As Integer)
    Dim Addresses() As System.Net.IPAddress = System.Net.Dns.GetHostAddresses(hostname)
    ' Connect() always creates an InterNetwork (IPv4) socket, so prefer an IPv4
    ' address. Resolvers may list IPv6 first. If there is no IPv4 address, fall back
    ' to the first entry, as before.
    Dim Chosen As System.Net.IPAddress = Addresses(0)
    For Each Candidate As System.Net.IPAddress In Addresses
        If Candidate.AddressFamily = System.Net.Sockets.AddressFamily.InterNetwork Then
            Chosen = Candidate
            Exit For
        End If
    Next
    Connect_Endpoint = New System.Net.IPEndPoint(Chosen, port)
End Sub
''' <summary>
''' Handles FailedConnect: tries again while StayConnected is set.
''' </summary>
Private Sub hFailedConnect() Handles Me.FailedConnect
    If Not Me.StayConnected Then Exit Sub
    Me.Connect()
End Sub
''' <summary>
''' Handles Disconnected: reconnects while StayConnected is set, unless the link
''' dropped while still waiting for CONNACK.
''' </summary>
Private Sub hDisconnected() Handles Me.Disconnected
    If Me._CurrentState = States.WaitCONNACK Then
        ' Connect error: stop reconnecting.
        Me.StayConnected = False
        Exit Sub
    End If
    If Me.StayConnected Then
        Me.Connect()
    End If
End Sub
''' <summary>
''' Timer tick: counts the ping countdown down by one and sends a PINGREQ once it
''' reaches zero or below.
''' </summary>
Private Sub hTmrPing(sender As Object, e As System.Timers.ElapsedEventArgs)
    Dim PingDue As Boolean
    ' The countdown is also written by senders on other threads, so do the
    ' decrement-and-test under the same lock the cntPing accessors use.
    SyncLock lckPing
        _cntPing -= 1
        PingDue = (_cntPing <= 0)
    End SyncLock
    ' Send outside the lock so a slow socket never blocks the accessors.
    If PingDue Then
        Me.SendPINGREQ()
    End If
End Sub
''' <summary>
''' Drops any existing socket, creates a new IPv4 TCP socket and starts an
''' asynchronous connect to Connect_Endpoint. The result is delivered to
''' hConnectCallback.
''' </summary>
Public Sub Connect()
    IsConnected = False
    IsReady = False
    If Me._Socket IsNot Nothing Then
        _Socket.Close()
        _Socket = Nothing
    End If
    ' A connection that dropped in the middle of a packet leaves bytes in the payload
    ' buffer; clear them so the next connection starts parsing from a clean state.
    CurrentMessage = Nothing
    RxPayload.Position = 0
    RxPayload.SetLength(0)
    _Socket = New System.Net.Sockets.Socket(Net.Sockets.AddressFamily.InterNetwork, Net.Sockets.SocketType.Stream, Net.Sockets.ProtocolType.Tcp)
    _Socket.ReceiveBufferSize = 1024 * 8
    _Socket.SendBufferSize = 1024 * 8
    _Socket.NoDelay = True
    _Socket.Blocking = True 'False
    _Socket.ReceiveTimeout = 2000
    Try
        Console.WriteLine(" Atempting connection to: " + Me.Connect_Endpoint.ToString)
        Me._CurrentState = States.Connecting
        _Socket.BeginConnect(Me.Connect_Endpoint, AddressOf hConnectCallback, _Socket)
    Catch ex As Exception
        ' The attempt could not even be started (for example no endpoint was set).
        ' Report it and return to a clean idle state instead of failing silently.
        ' FailedConnect is deliberately not raised here: its handler calls Connect()
        ' again, which would recurse without end on a deterministic error.
        Console.WriteLine(" Connect failed: " + ex.Message)
        Me._CurrentState = States.NotConnected
        If Me._Socket IsNot Nothing Then
            _Socket.Close()
            _Socket = Nothing
        End If
    End Try
End Sub
''' <summary>
''' Completion callback for the asynchronous connect started by Connect(). On
''' success it sends the CONNECT packet and starts receiving; on failure it raises
''' FailedConnect.
''' </summary>
''' <param name="ar">Async result whose AsyncState is the connecting socket.</param>
Private Sub hConnectCallback(ByVal ar As System.IAsyncResult)
    Dim s As System.Net.Sockets.Socket = TryCast(ar.AsyncState, System.Net.Sockets.Socket)
    ' A callback for a socket that has since been replaced or closed is stale.
    ' Acting on it would start yet another reconnect for an attempt nobody wants.
    If s Is Nothing OrElse s IsNot Me._Socket Then Exit Sub
    Try
        ' EndConnect must run first: it completes the async operation and throws the
        ' real connect error, so the socket is only inspected after it returns.
        s.EndConnect(ar)
        If s.Connected = False Then
            RaiseEvent FailedConnect(Me)
            Exit Sub
        End If
        Console.WriteLine(" Connected to: {0}, Connected={1}", s.RemoteEndPoint.ToString, s.Connected)
        Me._CurrentState = States.SendCONNECT
        SendCONNECT(s)
        IsConnected = True
        s.BeginReceive(RxBuff, 0, RxBuffSize, Net.Sockets.SocketFlags.None, AddressOf hReceiveCallback, s)
        ' RaiseEvent Connected(Me)
    Catch ex As Exception
        Console.Write(ex.Message + vbCrLf)
        RaiseEvent FailedConnect(Me)
    End Try
End Sub
''' <summary>
''' Marks the client as not connected, closes and forgets the socket if there is
''' one, then raises Disconnected.
''' </summary>
Private Sub DoClose()
    IsConnected = False
    IsReady = False
    Console.WriteLine(" Closed")
    Dim Current As System.Net.Sockets.Socket = Me._Socket
    If Current IsNot Nothing Then
        Current.Close()
        Me._Socket = Nothing
    End If
    RaiseEvent Disconnected(Me)
End Sub
''' <summary>
''' Completion callback for Socket.BeginReceive. Feeds the received bytes to
''' ProcessRxBuffer and re-arms the receive, or closes the connection when the
''' peer closed it, the parser asked for a disconnect, or an error occurred.
''' </summary>
''' <param name="ar">Async result whose AsyncState is the receiving socket.</param>
Private Sub hReceiveCallback(ByVal ar As System.IAsyncResult)
    ' Set to the log line when the connection has to be closed. Closing happens
    ' once, after the Try block, so a failure while closing cannot close twice.
    Dim CloseReason As String = Nothing
    Try
        Dim s As System.Net.Sockets.Socket = CType(ar.AsyncState, System.Net.Sockets.Socket)
        ' A completion for a socket that was already replaced or closed is stale.
        ' Acting on it would close the current socket and raise a second Disconnected.
        If s IsNot Me._Socket Then Exit Sub
        Dim BytesReceived As Integer = 0
        If s.Connected Then BytesReceived = s.EndReceive(ar)
        If BytesReceived > 0 Then
            Dim DoDisconnect As Boolean = False
            ProcessRxBuffer(BytesReceived, 0, DoDisconnect)
            If DoDisconnect Then
                CloseReason = "ERR: DoDisconnect: "
            Else
                s.BeginReceive(RxBuff, 0, RxBuffSize, Net.Sockets.SocketFlags.None, AddressOf hReceiveCallback, s)
            End If
        Else
            ' Zero bytes means the peer closed the connection.
            CloseReason = "ERR: BytesReceived=0: "
        End If
    Catch ex As Exception
        CloseReason = "ERR: hReceiveCallback: " + ex.Message
    End Try
    If CloseReason IsNot Nothing Then
        Console.WriteLine(CloseReason)
        DoClose()
    End If
End Sub
''' <summary>
''' Parses the bytes in RxBuff starting at ByteOffset. In the waiting states it
''' decodes a fixed header (type byte plus remaining length); in the receiving
''' state it accumulates the packet body and hands complete packets to
''' HandleMessage. It calls itself to continue with whatever bytes are left.
''' </summary>
''' <param name="BytesReceived">Number of valid bytes in RxBuff.</param>
''' <param name="ByteOffset">Index of the first unprocessed byte.</param>
''' <param name="DoDisconnect">Set to True when the connection must be closed.</param>
Private Sub ProcessRxBuffer(BytesReceived As Integer, ByteOffset As Integer, ByRef DoDisconnect As Boolean)
    Select Case _CurrentState
        Case States.WaitControlPacket, States.WaitCONNACK
            Dim BytesAvail As Integer = BytesReceived - ByteOffset
            If BytesAvail = 0 Then
                Exit Sub
            End If
            If FlagsLookOk(RxBuff(ByteOffset)) = False Then
                DoDisconnect = True
                Exit Sub
            End If
            CurrentMessage = New Message With {.ControlPacketTypeByte = RxBuff(ByteOffset)}
            ' Decode the remaining-length field that follows the type byte.
            ' NOTE(review): a fixed header split across two socket reads is treated
            ' as an error here (BytesToLength reports too little data) - confirm
            ' whether partial headers need to be buffered instead.
            Dim OutSize As Integer = 0
            Dim BytesConsumed As Integer = BytesToLength(RxBuff, ByteOffset + 1, BytesReceived, OutSize)
            Select Case BytesConsumed
                Case Is <= 0, Is > 4
                    DoDisconnect = True
                    Exit Sub
            End Select
            If OutSize > 1024 * 1024 Then
                ' Larger than this client is prepared to buffer.
                DoDisconnect = True
                Exit Sub
            End If
            RxPayloadLen = OutSize
            ' Always start a packet with an empty payload buffer. Bytes left behind by
            ' a connection that dropped mid-packet would otherwise make the
            ' bytes-to-read computation below go negative.
            RxPayload.Position = 0
            RxPayload.SetLength(0)
            _CurrentState = States.ReceivingControlPacket
            ' Continue with the body, if any of it is already in the buffer.
            ProcessRxBuffer(BytesReceived, ByteOffset + 1 + BytesConsumed, DoDisconnect)
        Case States.ReceivingControlPacket
            ' Copy as much of the body as this buffer holds.
            Dim BytesToRead As Int32 = CInt(RxPayloadLen - RxPayload.Position)
            Dim BytesAvail As Int32 = BytesReceived - ByteOffset
            If BytesToRead > BytesAvail Then BytesToRead = BytesAvail
            RxPayload.Write(RxBuff, ByteOffset, BytesToRead)
            ByteOffset = ByteOffset + BytesToRead
            If RxPayloadLen = RxPayload.Position Then
                ' The packet is complete: dispatch it and get ready for the next one.
                CurrentMessage.Payload = RxPayload.ToArray
                HandleMessage(CurrentMessage, DoDisconnect)
                CurrentMessage = Nothing
                RxPayload.Position = 0
                RxPayload.SetLength(0)
                _CurrentState = States.WaitControlPacket
            End If
            ' Keep parsing only while the connection is still considered healthy;
            ' packets that follow a fatal one must not be dispatched.
            If Not DoDisconnect AndAlso ByteOffset < BytesReceived Then
                ProcessRxBuffer(BytesReceived, ByteOffset, DoDisconnect)
            End If
        Case Else
            ' Data arrived in a state where none is expected.
            DoDisconnect = True
    End Select
End Sub
''' <summary>
''' Dispatches one complete control packet by type.
''' </summary>
''' <param name="Msg">The packet to handle.</param>
''' <param name="DoDisconnect">Set to True when the connection must be closed.</param>
Private Sub HandleMessage(Msg As Message, ByRef DoDisconnect As Boolean)
    Dim PacketKind As ControlPacketType = Msg.ControlPacketType
    If PacketKind = ControlPacketType.CONNACK Then
        If Msg.PayloadLen <> msgCONNACK.FIXEDLENGTH Then
            DoDisconnect = True
            Exit Sub
        End If
        Dim Ack As New msgCONNACK(Msg.Payload)
        Console.WriteLine("CONNACK: " + Ack.ReturnCode.ToString)
        If Ack.ReturnCode = msgCONNACK.ConnectReturnCode.ConnectionAccepted Then
            IsReady = True
            RaiseEvent Connected(Me)
        Else
            ' The server refused us; close and do not keep hammering it.
            DoDisconnect = True
            Me.StayConnected = False
        End If
    ElseIf PacketKind = ControlPacketType.PUBLISH Then
        NotifyPUBLISH(New msgPUBLISH(Msg))
    ElseIf PacketKind = ControlPacketType.PINGREQ Then
        ' Unusual: normally the client originates pings. Answer anyway.
        Dim Ping As New msgPINGREQ(Msg)
        SendPINGRESP()
    ElseIf PacketKind = ControlPacketType.PINGRESP Then
        Dim Pong As New msgPINGRESP(Msg)
    End If
    ' PUBACK, SUBACK, UNSUBACK and every other type are accepted and ignored.
End Sub
''' <summary>
''' Queues a received PUBLISH message and signals waiting consumers.
''' </summary>
''' <param name="M">The message to queue.</param>
Private Sub NotifyPUBLISH(M As msgPUBLISH)
    SyncLock lckPUBLISH
        Messages.Enqueue(M)
        ' Signal while still holding the lock. GetNextPUBLISHMessage resets the event
        ' under the same lock, so signalling outside it could leave the event set
        ' over an empty queue and wake a waiter with nothing to read.
        mrePUBLISH.Set()
    End SyncLock
End Sub
''' <summary>
''' Removes and returns the oldest queued PUBLISH message, or Nothing when the queue
''' is empty. Resets the wait handle once the queue has been drained.
''' </summary>
Public Function GetNextPUBLISHMessage() As msgPUBLISH
    SyncLock lckPUBLISH
        Dim Head As msgPUBLISH = Nothing
        If Messages.Count > 0 Then
            Head = Messages.Dequeue()
        End If
        If Messages.Count = 0 Then
            mrePUBLISH.Reset()
        End If
        Return Head
    End SyncLock
End Function
''' <summary>
''' Wait handle that is signalled when a PUBLISH message is queued.
''' </summary>
Public ReadOnly Property PUBLISHWaitHandle As System.Threading.EventWaitHandle
    Get
        Dim Handle As System.Threading.EventWaitHandle = mrePUBLISH
        Return Handle
    End Get
End Property
''' <summary>
''' Waits up to the given time for a PUBLISH message. Returns the message, or
''' Nothing if the wait times out.
''' </summary>
''' <param name="millisecondsTimeout">Maximum time to wait, in milliseconds.</param>
Public Function WaitOnePUBLISHMessage(millisecondsTimeout As Integer) As msgPUBLISH
    Dim Signalled As Boolean = Me.mrePUBLISH.WaitOne(millisecondsTimeout, False)
    If Not Signalled Then
        Return Nothing
    End If
    Return Me.GetNextPUBLISHMessage()
End Function
' Current position in the connection / receive state machine.
Private _CurrentState As States = States.NotConnected
' States of the connection and of the receive parser.
Public Enum States As Integer
' Initial state; no connection attempt has been started.
NotConnected = 0
' Set by Connect() just before the asynchronous connect is started.
Connecting
' Set by hConnectCallback once the TCP connection is up.
SendCONNECT
' Set by SendCONNECT just before the CONNECT packet is sent.
WaitCONNACK
'ReceivingCONNACK
' Normal states
WaitControlPacket ' Waiting for the fixed header of the next packet; the parser returns here after every complete packet
ReceivingControlPacket ' The body of the current packet is still being accumulated
End Enum
' Control packet types. The value is carried in the high nibble of the first byte of
' every packet (see Message.ControlPacketType and SendControlPacket).
Public Enum ControlPacketType As Byte
Reserved_0 = 0 ' Forbidden Reserved
CONNECT = 1 ' Client to Server Client request to connect to Server
CONNACK = 2 ' Server to Client Connect acknowledgment
PUBLISH = 3 ' Client to Server or Server to Client Publish message
PUBACK = 4 ' Client to Server or Server to Client Publish acknowledgment
PUBREC = 5 ' Client to Server or Server to Client Publish received (assured delivery part 1)
PUBREL = 6 ' Client to Server or Server to Client Publish release (assured delivery part 2)
PUBCOMP = 7 ' Client to Server or Server to Client Publish complete (assured delivery part 3)
SUBSCRIBE = 8 ' Client to Server Client subscribe request
SUBACK = 9 ' Server to Client Subscribe acknowledgment
UNSUBSCRIBE = 10 ' Client to Server Unsubscribe request
UNSUBACK = 11 ' Server to Client Unsubscribe acknowledgment
PINGREQ = 12 ' Client to Server PING request
PINGRESP = 13 ' Server to Client PING response
DISCONNECT = 14 ' Client to Server Client is disconnecting
Reserved_15 = 15 ' Forbidden Reserved
End Enum
''' <summary>
''' List of topic filter / QoS pairs, serialised in order as the payload of a
''' SUBSCRIBE packet.
''' </summary>
Public Class TopicFilters
    Inherits System.Collections.Generic.List(Of TopicFilterQoS)

    ''' <summary>One topic filter together with its requested QoS byte.</summary>
    Public Class TopicFilterQoS
        Public Property TopicFilter As String
        Public Property QoS As Byte

        Public Sub New()
        End Sub

        Public Sub New(TopicFilter As String, QoS As Byte)
            Me.TopicFilter = TopicFilter
            Me.QoS = QoS
        End Sub

        ''' <summary>
        ''' Writes the filter as a length-prefixed UTF-8 string followed by the QoS byte.
        ''' </summary>
        Public Sub Serialize(Stm As System.IO.Stream)
            Dim Encoded As New UTF8(Me.TopicFilter)
            Encoded.WriteToStream(Stm)
            Stm.WriteByte(Me.QoS)
        End Sub
    End Class

    ''' <summary>Writes every entry to the stream, in list order.</summary>
    Public Sub Serialize(Stm As System.IO.Stream)
        For Each Entry As TopicFilterQoS In Me
            Entry.Serialize(Stm)
        Next
    End Sub
End Class
''' <summary>
''' Decoded CONNACK packet: an acknowledge-flags byte followed by a return code.
''' </summary>
Public Class msgCONNACK
    ' Exact body length of a CONNACK packet.
    Public Const FIXEDLENGTH As Int32 = 2
    Public Property AcknowledgeFlags As Byte
    Public Property ReturnCode As ConnectReturnCode

    ''' <summary>Return codes carried in the second CONNACK byte.</summary>
    Public Enum ConnectReturnCode As Byte
        ConnectionAccepted = 0
        ConnectionRefused_UnacceptableProtocolVersion = 1
        ConnectionRefused_IdentifierRejected = 2
        ConnectionRefused_ServerUnavailable = 3
        ConnectionRefused_BadUsernameOrPassword = 4
        ConnectionRefused_NotAuthorized = 5
    End Enum

    Public Sub New()
    End Sub

    ''' <summary>Decodes a CONNACK body; reads the first two bytes of Payload.</summary>
    Public Sub New(Payload() As Byte)
        Dim FlagsByte As Byte = Payload(0)
        Dim CodeByte As Byte = Payload(1)
        Me.AcknowledgeFlags = FlagsByte
        Me.ReturnCode = DirectCast(CodeByte, ConnectReturnCode)
    End Sub
End Class
''' <summary>
''' Decoded PUBLISH packet: header flags, topic name, optional packet identifier
''' and the application payload.
''' </summary>
Public Class msgPUBLISH
    ' DUP flag: bit 3 of the fixed-header flags.
    Public Property H_DUPFlag As Boolean
    ' QoS level: bits 2..1 of the fixed-header flags.
    Public Property H_QoSLevel As Byte
    ' RETAIN flag: bit 0 of the fixed-header flags.
    Public Property H_Retain As Boolean
    Public TopicName As String
    ' Only read from the packet when H_QoSLevel is greater than zero.
    Public PacketIdentifier As UInt16
    ' Application payload: everything after the topic and optional packet identifier.
    Public Payload() As Byte = {}

    ''' <summary>
    ''' Returns the payload decoded as text.
    ''' </summary>
    Public Function PayloadToString() As String
        ' Decode as UTF-8, the encoding this client uses for every other string.
        ' ASCII decoding turned each non-ASCII byte into "?"; plain ASCII payloads
        ' decode exactly as before.
        Return System.Text.Encoding.UTF8.GetString(Me.Payload)
    End Function

    Public Sub New()
    End Sub

    ''' <summary>
    ''' Decodes a PUBLISH packet from a raw message.
    ''' </summary>
    ''' <param name="Msg">Raw packet whose type is PUBLISH.</param>
    Public Sub New(Msg As Message)
        H_DUPFlag = (Msg.Flags And (&H1 << 3)) <> 0
        H_QoSLevel = CByte((Msg.Flags And (&H3 << 1)) >> 1)
        H_Retain = (Msg.Flags And (&H1 << 0)) <> 0
        Dim Offset As Integer = 0
        ' Topic name: a length-prefixed UTF-8 string at the start of the body.
        Dim TN As New UTF8(Msg.Payload, Offset)
        Offset += TN.TotalLength
        Me.TopicName = TN.ToString
        ' The packet identifier is only present for QoS levels above zero.
        If Me.H_QoSLevel > 0 Then
            Me.PacketIdentifier = ReadUInt16_BE(Msg.Payload, Offset)
            Offset += 2
        End If
        ' Whatever is left is the application payload.
        Dim FinalPayloadLen As Integer = Msg.PayloadLen - Offset
        Array.Resize(Me.Payload, FinalPayloadLen)
        Array.Copy(Msg.Payload, Offset, Me.Payload, 0, FinalPayloadLen)
    End Sub
End Class
' PINGREQ packet. Carries no data, so both constructors are empty.
Public Class msgPINGREQ
Public Sub New()
End Sub
' Accepts the raw message for symmetry with the other message classes.
Public Sub New(Msg As Message)
' A PINGREQ has no variable header and no payload, so there is nothing to decode.
End Sub
End Class
' PINGRESP packet. Carries no data, so both constructors are empty.
Public Class msgPINGRESP
Public Sub New()
End Sub
' Accepts the raw message for symmetry with the other message classes.
Public Sub New(Msg As Message)
' A PINGRESP has no variable header and no payload, so there is nothing to decode.
End Sub
End Class
' Bit values of the connect-flags byte that SendCONNECT writes into the CONNECT packet.
Public Enum ConnectFlags As Byte
' Bit 7: a user name follows in the payload.
UserNameFlag = 1 << 7
' Bit 6: a password follows in the payload.
PasswordFlag = 1 << 6
' Bit 5: will retain.
WillRetain = 1 << 5
' Bits 4..3 hold the will QoS: value 1 is bit 3, value 2 is bit 4.
WillQoS_1 = 1 << 3
WillQoS_2 = 2 << 3
' Bit 2: a will topic and will message follow in the payload.
WillFlag = 1 << 2
' Bit 1: clean session.
CleanSession = 1 << 1
' Bit 0: reserved.
Reserved = 1 << 0
End Enum
''' <summary>
''' A string in wire form: UTF-8 bytes that are written and read with a two-byte
''' big-endian length prefix.
''' </summary>
Public Class UTF8
    ' The encoded string bytes, without the length prefix.
    Private _BA() As Byte = {}

    ''' <summary>Size on the wire: the two length bytes plus the string bytes.</summary>
    Public ReadOnly Property TotalLength As Integer
        Get
            Return 2 + _BA.Length
        End Get
    End Property

    ''' <summary>Encodes a string; throws when it needs more than 65535 bytes.</summary>
    Public Sub New(S As String)
        _BA = System.Text.UTF8Encoding.UTF8.GetBytes(S)
        If _BA.Length > UInt16.MaxValue Then Throw New Exception("Too many bytes")
    End Sub

    ''' <summary>
    ''' Takes everything after the first two bytes of B as the string bytes. The two
    ''' leading bytes are skipped, not interpreted.
    ''' </summary>
    Public Sub New(B() As Byte)
        Dim BLen As Integer = B.Length
        Dim BLen_m2 As Integer = BLen - 2
        If BLen_m2 < 0 Then Throw New Exception("Too few bytes")
        If BLen_m2 > UInt16.MaxValue Then Throw New Exception("Too many bytes")
        Array.Resize(_BA, BLen_m2)
        Array.Copy(B, 2, _BA, 0, BLen_m2)
    End Sub

    ''' <summary>
    ''' Reads a length-prefixed string from B starting at Offset.
    ''' </summary>
    Public Sub New(B() As Byte, Offset As Integer)
        Dim BLen As Integer = ReadUInt16_BE(B, Offset)
        Array.Resize(_BA, BLen)
        Array.Copy(B, Offset + 2, _BA, 0, BLen)
    End Sub

    ''' <summary>
    ''' Reads a length-prefixed string from a stream. Throws EndOfStreamException when
    ''' the stream ends before the announced number of bytes has been read.
    ''' </summary>
    Public Sub New(Stm As System.IO.Stream)
        Dim BLen As Integer = ReadUInt16_BE(Stm)
        Array.Resize(_BA, BLen)
        ' Stream.Read may return fewer bytes than requested, so keep reading until the
        ' whole string has arrived instead of trusting a single call.
        Dim Filled As Integer = 0
        While Filled < BLen
            Dim Got As Integer = Stm.Read(_BA, Filled, BLen - Filled)
            If Got <= 0 Then Throw New System.IO.EndOfStreamException("Stream ended inside a length-prefixed string")
            Filled += Got
        End While
    End Sub

    ''' <summary>Decodes the stored bytes back into a string.</summary>
    Public Overrides Function ToString() As String
        Return System.Text.UTF8Encoding.UTF8.GetString(_BA)
    End Function

    ''' <summary>Returns the wire form: length prefix followed by the string bytes.</summary>
    Public Function GetByteArray() As Byte()
        Dim BA2() As Byte = {}
        Dim BLen As Integer = _BA.Length
        Array.Resize(BA2, BLen + 2)
        Array.Copy(_BA, 0, BA2, 2, BLen)
        WriteUInt16_BE(BA2, 0, CUShort(BLen))
        Return BA2
    End Function

    ''' <summary>Writes the wire form to a stream.</summary>
    Public Sub WriteToStream(Stm As System.IO.Stream)
        Dim BLen As Integer = _BA.Length
        WriteUInt16_BE(Stm, CUShort(BLen))
        Stm.Write(_BA, 0, BLen)
    End Sub

    ''' <summary>Encodes S and writes its wire form to a stream.</summary>
    Public Shared Sub WriteStringToStream(Stm As System.IO.Stream, S As String)
        Dim U As New UTF8(S)
        U.WriteToStream(Stm)
    End Sub
End Class
''' <summary>
''' Checks whether the flag bits in the low nibble of a first header byte are valid
''' for the packet type in its high nibble.
''' </summary>
''' <param name="B">First byte of a fixed header.</param>
''' <returns>True when the flags are acceptable; False for bad flags or a reserved type.</returns>
Private Shared Function FlagsLookOk(B As Byte) As Boolean
    Const B_F As Byte = &HF
    ' Mask of the two QoS bits inside the PUBLISH flags.
    Const B_QoS As Byte = 6
    Dim CPT As ControlPacketType = DirectCast(B >> 4, ControlPacketType)
    Dim Flags As Byte = B And B_F
    Select Case CPT
        Case ControlPacketType.CONNECT : Return Flags = 0 ' Reserved 0000
        Case ControlPacketType.CONNACK : Return Flags = 0 ' Reserved 0000
        Case ControlPacketType.PUBLISH
            ' Flags are DUP, QoS (two bits) and RETAIN. Both QoS bits set would mean
            ' QoS 3, which does not exist, so such a packet is malformed.
            Return (Flags And B_QoS) <> B_QoS
        Case ControlPacketType.PUBACK : Return Flags = 0 ' Reserved 0000
        Case ControlPacketType.PUBREC : Return Flags = 0 ' Reserved 0000
        Case ControlPacketType.PUBREL : Return Flags = 2 ' Reserved 0010
        Case ControlPacketType.PUBCOMP : Return Flags = 0 ' Reserved 0000
        Case ControlPacketType.SUBSCRIBE : Return Flags = 2 ' Reserved 0010
        Case ControlPacketType.SUBACK : Return Flags = 0 ' Reserved 0000
        Case ControlPacketType.UNSUBSCRIBE : Return Flags = 2 ' Reserved 0010
        Case ControlPacketType.UNSUBACK : Return Flags = 0 ' Reserved 0000
        Case ControlPacketType.PINGREQ : Return Flags = 0 ' Reserved 0000
        Case ControlPacketType.PINGRESP : Return Flags = 0 ' Reserved 0000
        Case ControlPacketType.DISCONNECT : Return Flags = 0 ' Reserved 0000
    End Select
    ' Reached for the two reserved packet types (0 and 15).
    Return False
End Function
''' <summary>
''' Stores a 16-bit value big-endian: high byte at Offset, low byte at Offset + 1.
''' </summary>
Private Shared Sub WriteUInt16_BE(Arr() As Byte, Offset As Int32, Value As UInt16)
    Dim LowByte As Byte = CByte(Value And 255US)
    Dim HighByte As Byte = CByte((Value >> 8) And 255US)
    Arr(Offset + 1) = LowByte
    Arr(Offset) = HighByte
End Sub
''' <summary>
''' Writes a 16-bit value to a stream as two bytes, high byte first.
''' </summary>
Private Shared Sub WriteUInt16_BE(Stm As System.IO.Stream, Value As UInt16)
    Dim Pair(0 To 1) As Byte
    WriteUInt16_BE(Pair, 0, Value)
    Stm.Write(Pair, 0, Pair.Length)
End Sub
''' <summary>
''' Reads a big-endian 16-bit value: high byte at Offset, low byte at Offset + 1.
''' </summary>
Public Shared Function ReadUInt16_BE(Arr() As Byte, Offset As Int32) As UInt16
    Dim LowByte As UInt16 = Arr(Offset + 1)
    Dim HighByte As UInt16 = Arr(Offset + 0)
    Return LowByte Or (HighByte << 8)
End Function
''' <summary>
''' Reads two bytes from a stream and returns them as a big-endian 16-bit value.
''' Throws EndOfStreamException when the stream ends before both bytes are read.
''' </summary>
Public Shared Function ReadUInt16_BE(Stm As System.IO.Stream) As UInt16
    Dim BA(0 To 1) As Byte
    ' Stream.Read may return fewer bytes than requested; loop until both bytes have
    ' arrived rather than decoding a partly filled buffer.
    Dim Filled As Integer = 0
    While Filled < 2
        Dim Got As Integer = Stm.Read(BA, Filled, 2 - Filled)
        If Got <= 0 Then Throw New System.IO.EndOfStreamException("Stream ended inside a 16-bit value")
        Filled += Got
    End While
    Dim HighByte As UInt16 = BA(0)
    Dim LowByte As UInt16 = BA(1)
    Return (HighByte << 8) Or LowByte
End Function
''' <summary>
''' Builds a control packet (type and flags byte, encoded remaining length, body),
''' sends it on the current socket and restarts the ping countdown. Does nothing
''' when there is no connected socket.
''' </summary>
''' <param name="CPT">Packet type, placed in the high nibble of the first byte.</param>
''' <param name="Flags">Flag bits; only the low nibble is used.</param>
''' <param name="Payload">Packet body following the remaining-length field.</param>
Private Sub SendControlPacket(CPT As ControlPacketType, Flags As Byte, Payload() As Byte)
    Const B15 As Byte = 15
    ' Read the field once. Another thread can close the connection and clear the
    ' field between the checks below and the send.
    Dim Sock As System.Net.Sockets.Socket = _Socket
    If Sock Is Nothing Then Exit Sub
    If Sock.Connected = False Then Exit Sub
    Dim PayloadLen As Integer = Payload.Length
    Dim LenBA() As Byte = LengthToBytes(PayloadLen)
    Dim OutBA() As Byte = {}
    Array.Resize(OutBA, 1 + LenBA.Length + PayloadLen)
    OutBA(0) = (CPT << 4) Or (Flags And B15)
    Array.Copy(LenBA, 0, OutBA, 1, LenBA.Length)
    Array.Copy(Payload, 0, OutBA, 1 + LenBA.Length, PayloadLen)
    Sock.Send(OutBA)
    ' Any traffic counts as keep-alive, so the next ping can wait a full period.
    ResetCntPing()
End Sub
''' <summary>
''' Builds and sends the CONNECT packet from the Connect_* properties and moves the
''' state machine to WaitCONNACK.
''' </summary>
''' <param name="_S">The freshly connected socket; nothing is sent if it is not connected.</param>
Private Sub SendCONNECT(_S As System.Net.Sockets.Socket)
    If _S.Connected = False Then Exit Sub
    Dim MS_V As New System.IO.MemoryStream(256)
    ' Variable header: protocol name, protocol level, connect flags, keep-alive.
    UTF8.WriteStringToStream(MS_V, "MQTT")
    MS_V.WriteByte(4)
    Dim CF As ConnectFlags = 0
    If Connect_CleanSession Then CF = CF Or ConnectFlags.CleanSession
    ' A will exists only when there is a will message. The Will QoS and Will Retain
    ' bits must stay zero without the Will flag, so they are added only here.
    If Connect_WillMessage.Length > 0 Then
        CF = CF Or ConnectFlags.WillFlag
        CF = CF Or (Connect_WillQOS And (ConnectFlags.WillQoS_1 Or ConnectFlags.WillQoS_2))
        If Connect_WillRetain Then CF = CF Or ConnectFlags.WillRetain
    End If
    ' The Password flag is only allowed together with the User Name flag.
    If Connect_UserName.Length > 0 Then
        CF = CF Or ConnectFlags.UserNameFlag
        If Connect_Password.Length > 0 Then CF = CF Or ConnectFlags.PasswordFlag
    End If
    MS_V.WriteByte(CF)
    WriteUInt16_BE(MS_V, Connect_KeepAlive)
    ' Payload, in the required order:
    ' client identifier, will topic, will message, user name, password.
    UTF8.WriteStringToStream(MS_V, Connect_ClientID)
    If (CF And ConnectFlags.WillFlag) <> 0 Then
        UTF8.WriteStringToStream(MS_V, Connect_WillTopic)
        UTF8.WriteStringToStream(MS_V, Connect_WillMessage)
    End If
    If (CF And ConnectFlags.UserNameFlag) <> 0 Then
        UTF8.WriteStringToStream(MS_V, Connect_UserName)
    End If
    If (CF And ConnectFlags.PasswordFlag) <> 0 Then
        UTF8.WriteStringToStream(MS_V, Connect_Password) ' Should be binary...
    End If
    ' Change state before sending so a fast CONNACK cannot arrive in the wrong state.
    Me._CurrentState = States.WaitCONNACK
    SendControlPacket(ControlPacketType.CONNECT, 0, MS_V.ToArray)
End Sub
''' <summary>
''' Sends a PUBLISH packet without a packet identifier (QoS 0).
''' </summary>
''' <param name="Topic">Topic name to publish to.</param>
''' <param name="Payload">Application payload bytes.</param>
''' <param name="Retain">When True, bit 0 of the header flags (RETAIN) is set.</param>
Public Sub SendPUBLISH_QoS0(Topic As String, Payload() As Byte, Optional Retain As Boolean = False)
    Dim Body As New System.IO.MemoryStream(256 + Payload.Length)
    Dim HeaderFlags As Byte = If(Retain, CByte(1), CByte(0))
    ' Body layout: length-prefixed topic, then the raw payload.
    Dim EncodedTopic As New UTF8(Topic)
    EncodedTopic.WriteToStream(Body)
    Body.Write(Payload, 0, Payload.Length)
    SendControlPacket(ControlPacketType.PUBLISH, HeaderFlags, Body.ToArray())
End Sub
''' <summary>
''' Sends a SUBSCRIBE packet: a fresh packet identifier followed by the serialised
''' topic filters.
''' </summary>
''' <param name="TopicFilters">Topic filter / QoS pairs to subscribe to.</param>
Public Sub SendSUBSCRIBE(TopicFilters As TopicFilters)
    Dim Body As New System.IO.MemoryStream(256)
    ' The identifier is not stored, so a later SUBACK cannot be matched to it.
    Dim PacketId As UInt16 = GetPacketIdentifier()
    WriteUInt16_BE(Body, PacketId)
    TopicFilters.Serialize(Body)
    ' SUBSCRIBE uses the fixed flag value 2.
    SendControlPacket(ControlPacketType.SUBSCRIBE, 2, Body.ToArray())
End Sub
''' <summary>
''' Sends a PINGREQ packet, which has no body.
''' </summary>
Public Sub SendPINGREQ()
    Dim NoBody() As Byte = {}
    SendControlPacket(ControlPacketType.PINGREQ, 0, NoBody)
End Sub
''' <summary>
''' Sends a PINGRESP packet, which has no body.
''' </summary>
Public Sub SendPINGRESP()
    Dim NoBody() As Byte = {}
    SendControlPacket(ControlPacketType.PINGRESP, 0, NoBody)
End Sub
''' <summary>
''' Ends the session: disables automatic reconnects, sends DISCONNECT on a
''' best-effort basis and closes the socket.
''' </summary>
Public Sub SendDISCONNECT()
    Me.StayConnected = False
    ' The goal is to end up closed, so a failure to send the courtesy packet is
    ' logged and must not prevent the socket from being released.
    Dim Detail As String = ""
    Try
        SendControlPacket(ControlPacketType.DISCONNECT, 0, {})
    Catch ex As Exception
        Detail = ex.Message
    End Try
    Console.WriteLine("SendDISCONNECT: " + Detail)
    DoClose()
End Sub
''' <summary>
''' Encodes a remaining-length value in the variable-length format used by the fixed
''' header: seven data bits per byte, least significant group first, with the top
''' bit set on every byte except the last.
''' </summary>
''' <param name="V">Length to encode, from 0 to 268435455.</param>
''' <returns>One to four encoded bytes.</returns>
''' <exception cref="ArgumentOutOfRangeException">V is negative or needs more than four bytes.</exception>
Private Shared Function LengthToBytes(V As Int32) As Byte()
    ' Largest value that fits in four 7-bit groups.
    Const MaxRemainingLength As Int32 = 268435455
    Const B128 As Byte = 128
    ' A negative value never shifts down to zero (the loop would not end), and a
    ' larger value would need a fifth byte that the format does not allow.
    If V < 0 OrElse V > MaxRemainingLength Then
        Throw New ArgumentOutOfRangeException("V", "Remaining length must be between 0 and 268435455.")
    End If
    Dim Encoded As New System.Collections.Generic.List(Of Byte)(4)
    Do
        Dim Digit As Byte = CByte(V And &H7F)
        V = V >> 7
        ' The continuation bit marks that more groups follow.
        If V > 0 Then Digit = Digit Or B128
        Encoded.Add(Digit)
    Loop While V > 0
    Return Encoded.ToArray()
End Function
''' <summary>
''' Decodes a variable-length remaining-length field.
''' </summary>
''' <param name="BA">Buffer holding the encoded bytes.</param>
''' <param name="Offset">Index of the first length byte.</param>
''' <param name="BufferLen">Number of valid bytes in BA.</param>
''' <param name="OutSize">Receives the decoded length; not meaningful when an error is returned.</param>
''' <returns>
''' The number of bytes consumed (1 to 4), -1 when the buffer ends before the field
''' is complete, or -2 when the field runs past four bytes.
''' </returns>
Private Shared Function BytesToLength(BA() As Byte, ByVal Offset As Integer, BufferLen As Integer, ByRef OutSize As Integer) As Integer
    ' The field is at most four bytes long; a set continuation bit on the fourth
    ' byte is malformed. Bounding the loop keeps a fifth byte from being read.
    Const MaxLengthBytes As Integer = 4
    Const B128 As Byte = 128
    OutSize = 0
    For Index As Integer = 0 To MaxLengthBytes - 1
        If Offset + Index >= BufferLen Then Return -1
        Dim B As Byte = BA(Offset + Index)
        ' Each byte carries seven bits, least significant group first.
        OutSize = OutSize Or (CInt(B And 127) << (7 * Index))
        ' A clear top bit marks the last byte of the field.
        If (B And B128) = 0 Then Return Index + 1
    Next
    Return -2
End Function
End Class