Enumerating Device Messages
This tutorial describes the process for enumerating device messages from a received Google Pub/Sub message using C# .NET.
Receiving Messages From a Google Pub/Sub Subscription
Upon receiving a Pub/Sub message, the message handler constructs a PubSubBatch instance from the message payload. The PubSubBatch implements the IEnumerable<PubSubFrame> interface, allowing enumeration of PubSubFrame objects, each of which represents a vehicle data frame.
Each PubSubFrame implements the IEnumerable<DeviceMessage> interface, enabling iteration over the contained DeviceMessage instances.
private Task<SubscriberClient.Reply> HandleMessageAsync(PubsubMessage batch, CancellationToken cancel)
{
Console.WriteLine($"Received pubsub message - length: {batch.Data.Length}");
try
{
// Create a batch from the received pubsub message
foreach (PubSubFrame frame in new PubSubBatch(batch.Data.Memory))
{
// Enumerate vehicle data frames
foreach (DeviceMessage message in frame)
{
// Do something with messages
}
}
}
catch (Exception e)
{
Console.WriteLine($"ERROR: {e.Message}");
}
return Task.FromResult(SubscriberClient.Reply.Ack);
}
The frame.VehicleId property should be used to filter messages associated with specific vehicles.
Enumerating Vehicle Data Frames
The PubSubBatch is a read-only structure that implements the IEnumerable<PubSubFrame> interface. During enumeration, the underlying data bytes are parsed to identify vehicle data frames. If the data does not conform to the expected vehicle data frame format, an exception is thrown.
public readonly struct PubSubBatch(ReadOnlyMemory<byte> data) : IEnumerable<PubSubFrame>
{
public ReadOnlyMemory<byte> Data { get; } = data;
public IEnumerator<PubSubFrame> GetEnumerator()
{
const int VEHICLE_ID_LENGTH = 16;
const char HEADER_CHAR = 'F';
const int VERSION_V1 = 0x01;
for (int idx = 0; idx < Data.Length;)
{
byte header = Data.Span[idx++];
if (header != HEADER_CHAR)
{
throw new Exception("Invalid frame header");
}
byte version = Data.Span[idx++];
if (version != VERSION_V1)
{
throw new Exception("Unsupported version");
}
ReadOnlyMemory<byte> vId = Data.Slice(idx, VEHICLE_ID_LENGTH);
Guid vGuid = new(vId.ToArray());
string vehicleId = vGuid.ToString();
idx += VEHICLE_ID_LENGTH;
int length = BinaryPrimitives.ReadInt32LittleEndian(Data.Span[idx..]);
idx += sizeof(int);
ReadOnlyMemory<byte> data = ReadOnlyMemory<byte>.Empty;
if (length < 0)
{
throw new Exception("Invalid frame length");
}
else if (length > 0)
{
data = Data.Slice(idx, length);
idx += length;
}
yield return new PubSubFrame()
{
Version = version,
VehicleId = vehicleId,
Length = length,
Data = data
};
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
Enumerating Messages
The PubSubFrame is a read-only structure that implements the IEnumerable<DeviceMessage> interface. During enumeration, the underlying data bytes are parsed to identify individual messages. If the data does not conform to the expected message structure, an exception is thrown.
public readonly struct DeviceMessage
{
public byte Type { get; init; }
public int Length { get; init; }
public ReadOnlyMemory<byte> Data { get; init; }
}
public readonly struct PubSubFrame : IEnumerable<DeviceMessage>
{
public int Version { get; init; }
public string VehicleId { get; init; }
public int Length { get; init; }
public ReadOnlyMemory<byte> Data { get; init; }
public IEnumerator<DeviceMessage> GetEnumerator()
{
for (int idx = 0; idx < Data.Length;)
{
byte type = Data.Span[idx++];
byte length = Data.Span[idx++];
if (length < 0)
{
throw new Exception("Invalid message length");
}
ReadOnlyMemory<byte> data = ReadOnlyMemory<byte>.Empty;
if (length > 0)
{
data = Data.Slice(idx, length);
idx += length;
}
yield return new DeviceMessage()
{
Type = type,
Length = length,
Data = data
};
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}