"""Implementation for Cisco's NetFlow Version 9 flow-record format.
NetFlow is an exchange protocol between a server (Exporter in Cisco parlance)
and a client (Collector in Cisco parlance). A stream of packets is sent from
the Exporter to the Collector. Each packet can represent several IP flows. A
diagram of a single packet is shown below::
+--------+------------------+--------------+--------------+-----+------------------+--------------+
| Header | Template FlowSet | Data FlowSet | Data FlowSet | ... | Template FlowSet | Data FlowSet |
+--------+------------------+--------------+--------------+-----+------------------+--------------+
This implementation can serialize and deserialize a packet, but the stream is a
read-only representation of serialized packets.
The full documentation of this protocol is `NetflowV9`_.
.. _NetflowV9:
https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.pdf
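
Example usage (a minimal sketch): a template and a matching data flowset can
be encoded into a single packet::

    template = TemplateRecord(256, [TemplateField(FieldType.PROTOCOL, 1),
                                    TemplateField(FieldType.IN_BYTES, 4)])
    packet = ExportPacket([TemplateFlowSet([template]),
                           DataFlowSet(256, [[6, 1024]], {256: template})])
    raw = packet.encode()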
"""
import enum
import functools
import io
import struct
import time

import attr

from .utils import read_and_unpack
# Packet header: version, count, uptime, datetime, sequence, source_id.
s_header = struct.Struct("!HHIIII")
# A lone FlowSet ID, used to peek ahead when dispatching on flowset type.
s_flowset = struct.Struct("!H")
# A generic pair of unsigned shorts: (type, length) or (id, length/count).
s_type_length = struct.Struct("!HH")
class FieldType(enum.Enum):
    """Field types defined by the NetFlow V9 specification, as (code, dtype) pairs."""
IN_BYTES = (1, int)
IN_PKTS = (2, int)
FLOWS = (3, int)
PROTOCOL = (4, int)
SRC_TOS = (5, int)
TCP_FLAGS = (6, int)
L4_SRC_PORT = (7, int)
IPV4_SRC_ADDR = (8, int)
SRC_MASK = (9, int)
INPUT_SNMP = (10, int)
L4_DST_PORT = (11, int)
IPV4_DST_ADDR = (12, int)
DST_MASK = (13, int)
OUTPUT_SNMP = (14, int)
IPV4_NEXT_HOP = (15, int)
SRC_AS = (16, int)
DST_AS = (17, int)
BGP_IPV4_NEXT_HOP = (18, int)
MUL_DST_PKTS = (19, int)
MUL_DST_BYTES = (20, int)
LAST_SWITCHED = (21, int)
FIRST_SWITCHED = (22, int)
OUT_BYTES = (23, int)
OUT_PKTS = (24, int)
MIN_PKT_LNGTH = (25, int)
MAX_PKT_LNGTH = (26, int)
IPV6_SRC_ADDR = (27, bytes)
IPV6_DST_ADDR = (28, bytes)
IPV6_SRC_MASK = (29, int)
IPV6_DST_MASK = (30, int)
IPV6_FLOW_LABEL = (31, bytes)
ICMP_TYPE = (32, int)
MUL_IGMP_TYPE = (33, int)
SAMPLING_INTERVAL = (34, int)
SAMPLING_ALGORITHM = (35, int)
FLOW_ACTIVE_TIMEOUT = (36, int)
FLOW_INACTIVE_TIMEOUT = (37, int)
ENGINE_TYPE = (38, int)
ENGINE_ID = (39, int)
TOTAL_BYTES_EXP = (40, int)
TOTAL_PKTS_EXP = (41, int)
TOTAL_FLOWS_EXP = (42, int)
IPV4_SRC_PREFIX = (44, int)
IPV4_DST_PREFIX = (45, int)
MPLS_TOP_LABEL_TYPE = (46, int)
MPLS_TOP_LABEL_IP_ADDR = (47, int)
FLOW_SAMPLER_ID = (48, int)
FLOW_SAMPLER_MODE = (49, int)
FLOW_SAMPLER_RANDOM_INTERVAL = (50, int)
MIN_TTL = (52, int)
MAX_TTL = (53, int)
IPV4_IDENT = (54, int)
DST_TOS = (55, int)
IN_SRC_MAC = (56, bytes)
OUT_DST_MAC = (57, bytes)
SRC_VLAN = (58, int)
DST_VLAN = (59, int)
IP_PROTOCOL_VERSION = (60, int)
DIRECTION = (61, int)
IPV6_NEXT_HOP = (62, bytes)
    BGP_IPV6_NEXT_HOP = (63, bytes)
IPV6_OPTION_HEADERS = (64, int)
MPLS_LABEL_1 = (70, bytes)
MPLS_LABEL_2 = (71, bytes)
MPLS_LABEL_3 = (72, bytes)
MPLS_LABEL_4 = (73, bytes)
MPLS_LABEL_5 = (74, bytes)
MPLS_LABEL_6 = (75, bytes)
MPLS_LABEL_7 = (76, bytes)
MPLS_LABEL_8 = (77, bytes)
MPLS_LABEL_9 = (78, bytes)
MPLS_LABEL_10 = (79, bytes)
IN_DST_MAC = (80, bytes)
OUT_SRC_MAC = (81, bytes)
IF_NAME = (82, str)
IF_DESC = (83, str)
SAMPLER_NAME = (84, str)
IN_PERMANENT_BYTES = (85, int)
IN_PERMANENT_PKTS = (86, int)
FRAGMENT_OFFSET = (88, int)
FORWARDING_STATUS = (89, int)
MPLS_PAL_RD = (90, bytes)
MPLS_PREFIX_LEN = (91, int)
SRC_TRAFFIC_INDEX = (92, int)
DST_TRAFFIC_INDEX = (93, int)
APPLICATION_DESCRIPTION = (94, str)
APPLICATION_TAG = (95, bytes)
APPLICATION_NAME = (96, str)
POST_IP_DIFF_SERV_CODE_POINT = (98, int)
REPLICATION_FACTOR = (99, int)
LAYER2_PACKET_SECTION_OFFSET = (102, int)
LAYER2_PACKET_SECTION_SIZE = (103, int)
LAYER2_PACKET_SECTION_DATA = (104, bytes)
    def __new__(cls, code, dtype):
        # The wire-format field code becomes the enum value; dtype records
        # the Python type used when packing and unpacking the field.
        obj = object.__new__(cls)
        obj._value_ = code
        obj.dtype = dtype
        return obj
@attr.s
class Header(object):
"""Packet metadata.
Parameters:
version : int, optional
The version of NetFlow records exported in a packet (defaults to 9).
count : int, optional
Number of FlowSet records contained within a packet.
uptime : int, optional
Time in milliseconds since an export device was first booted.
datetime : int, optional
            Seconds elapsed since 00:00 UTC, 1 January 1970 (the Unix epoch).
sequence : int, optional
Incremental sequence counter of all packets sent by an export device.
source_id : int, optional
Vendor-specific uniqueness ID.
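
    A header can be round-tripped through its binary form, e.g.::

        >>> import io
        >>> h = Header(count=2, sequence=7)
        >>> Header.decode(io.BytesIO(h.encode())) == h
        True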
"""
version = attr.ib(type=int, default=9)
count = attr.ib(type=int, default=0)
uptime = attr.ib(type=int, default=0)
datetime = attr.ib(type=int)
sequence = attr.ib(type=int, default=0)
source_id = attr.ib(type=int, default=0)
@datetime.default
def current_unix_seconds(self):
return int(time.time())
@staticmethod
def decode(source):
return Header(*read_and_unpack(source, s_header))
def encode(self):
return s_header.pack(self.version,
self.count,
self.uptime,
self.datetime,
self.sequence,
self.source_id)
def create_struct(dtype, length):
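    """Build a big-endian :class:`struct.Struct` for a single template field.

    A quick sketch of the mapping, assuming a Python where ``Struct.format``
    is a ``str`` (3.7+)::

        >>> create_struct(int, 4).format
        '!I'
        >>> create_struct(bytes, 16).format
        '!16s'
    """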
if dtype is int:
if length == 1:
code = 'B'
elif length == 2:
code = 'H'
elif length == 4:
code = 'I'
elif length == 8:
code = 'Q'
else:
raise ValueError("invalid integer length: {}".format(length))
    elif dtype is bytes:
        # Use "s" so the field round-trips as one bytes object; a "{n}B"
        # format would unpack into n separate integers and break multi-byte
        # fields such as MAC and IPv6 addresses.
        code = "{}s".format(length)
elif dtype is str:
code = "{}s".format(length)
else:
raise ValueError("invalid datatype: {}".format(dtype))
return struct.Struct('!' + code)
@attr.s
class TemplateField(object):
"""A definition of an individual column in a template.
Parameters:
type : FieldType
length : int
Length of the above type, in bytes.
"""
type = attr.ib(type=FieldType)
length = attr.ib(type=int)
@property
def struct(self):
if not hasattr(self, '_struct'):
self._struct = create_struct(self.type.dtype, self.length)
return self._struct
@staticmethod
def decode(source):
type, length = read_and_unpack(source, s_type_length)
return TemplateField(FieldType(type), length)
def encode(self):
return s_type_length.pack(self.type.value, self.length)
class TemplateRecord(object):
"""A definition of data records received in subsequent export packets.
Parameters:
id : int
            Unique ID for the given template. Only values of 256 or greater
            are allowed.
fields : iterable of TemplateField, optional
A collection of fields defined for a template.
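
    For example, a template describing records that carry a source address
    followed by a byte count might be built as::

        TemplateRecord(256, [TemplateField(FieldType.IPV4_SRC_ADDR, 4),
                             TemplateField(FieldType.IN_BYTES, 4)])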
"""
def __init__(self, id, fields=None):
self.id = id
self.fields = fields if fields else []
def __eq__(self, other):
return self.id == other.id and sorted(self.fields) == sorted(other.fields)
def __len__(self):
return s_type_length.size + sum(field.length for field in self.fields)
def __iter__(self):
return iter(self.fields)
@staticmethod
def decode(source):
template_id, nfields = read_and_unpack(source, s_type_length)
template = TemplateRecord(template_id)
for _ in range(nfields):
template.fields.append(TemplateField.decode(source))
return template
def encode(self):
raw = s_type_length.pack(self.id, len(self.fields))
for field in self.fields:
raw += field.encode()
return raw
class TemplateFlowSet(object):
"""A collection of template records grouped together in an export packet.
Parameters:
templates : iterable, optional
A collection of template records.
"""
def __init__(self, templates=None):
self.id = 0
self.templates = {template.id: template for template in templates} if templates else {}
def __eq__(self, other):
return self.id == other.id and self.templates == other.templates
def __len__(self):
nbytes = s_type_length.size
for template in self.templates.values():
nbytes += len(template)
return nbytes
def __getitem__(self, key):
return self.templates[key]
def __iter__(self):
return iter(self.templates)
@staticmethod
def decode(source):
fs = TemplateFlowSet()
_, length = read_and_unpack(source, s_type_length)
offset = s_type_length.size
while offset < length:
template = TemplateRecord.decode(source)
fs.templates[template.id] = template
offset += len(template)
return fs
def encode(self):
raw = s_type_length.pack(self.id, len(self))
for template in self.templates.values():
raw += template.encode()
return raw
class DataFlowSet(object):
"""A collection of data records grouped together in an export packet.
Parameters:
id : int
            Unique ID of the template that describes these records. Only
            values of 256 or greater are allowed.
payload : bytes or list
Either an encoded byte stream of data records or a list of decoded
data records.
templates : dict
            A dictionary of template records, keyed by TemplateRecord id.

    Note that :meth:`decode` returns a :func:`functools.partial` that must be
    called with a template dictionary to finish construction; see
    :meth:`ExportPacket.apply` for the second step.
    """
def __init__(self, id, payload, templates):
self.template = templates[id]
self.records = []
self.record_length = len(self.template) - s_type_length.size
if isinstance(payload, bytes):
source = io.BytesIO(payload)
remaining = len(payload)
while remaining >= self.record_length:
self.records.append([read_and_unpack(source, field.struct)[0] for field in self.template])
remaining -= self.record_length
elif isinstance(payload, list):
self.records = payload
def __len__(self):
return s_type_length.size + len(self.records) * self.record_length
def __iter__(self):
return iter(self.records)
@staticmethod
def decode(source):
id, length = read_and_unpack(source, s_type_length)
payload = source.read(length - s_type_length.size)
return functools.partial(DataFlowSet, id, payload)
def encode(self):
raw = s_type_length.pack(self.template.id, len(self))
for record in self.records:
for field, value in zip(self.template.fields, record):
raw += field.struct.pack(value)
return raw
def decode_flowset(source):
    """Decode the next flowset in ``source``, dispatching on its ID.

    Returns None for reserved flowset IDs (1-255), whose bytes are skipped.
    """
    # Peek ahead to find the flowset ID without consuming it.
    loc = source.tell()
    raw = source.read(s_flowset.size)
    source.seek(loc)
    flowset_id = s_flowset.unpack(raw)[0]
    if flowset_id == 0:
        return TemplateFlowSet.decode(source)
    if flowset_id > 255:
        return DataFlowSet.decode(source)
    # Reserved flowset (e.g. an options template): consume its bytes so the
    # stream stays aligned for the next flowset.
    _, length = read_and_unpack(source, s_type_length)
    source.read(length - s_type_length.size)
    return None
class ExportPacket(object):
"""A packet containing IP flows sent from a router to a collector.
Parameters:
flowsets : iterable
A collection of template and/or data flowsets.
header : Header, optional
Packet metadata for given flowsets. If None, then a header with
reasonable defaults is created.
"""
def __init__(self, flowsets, header=None):
self.header = header if header else Header(count=len(flowsets))
self.flowsets = flowsets
def update_cache(self, cache):
"""Update cache of template records."""
for flowset in self.flowsets:
if not isinstance(flowset, TemplateFlowSet):
continue
for id, record in flowset.templates.items():
cache[id] = record
def apply(self, templates):
"""Deserialize partially-decoded data flowsets.
Deserialization of a data flowset is a two-step process because we
cannot assume the needed template is available when we encounter the
data flowset. Thus, we place the deserialization process on hold until
a packet is fully read. Then we re-scan the partially-decoded data
flowsets and finish deserialization.
"""
for i, flowset in enumerate(self.flowsets):
if isinstance(flowset, functools.partial):
self.flowsets[i] = flowset(templates)
@staticmethod
def decode(source):
header = Header.decode(source)
flowsets = []
for _ in range(header.count):
flowset = decode_flowset(source)
if flowset:
flowsets.append(flowset)
return ExportPacket(flowsets, header=header)
def encode(self):
raw = self.header.encode()
for flowset in self.flowsets:
if isinstance(flowset, functools.partial):
continue
raw += flowset.encode()
return raw
class PacketStream(object):
"""A read-only representation of serialized packets.
Parameters:
source : file-like object
Read-only input for packets.
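
    Iterating over a stream yields fully-decoded :class:`ExportPacket`
    objects. A minimal sketch, where ``netflow.bin`` is a hypothetical
    capture file::

        with open("netflow.bin", "rb") as source:
            for packet in PacketStream(source):
                print(packet.header.sequence, len(packet.flowsets))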
"""
def __init__(self, source):
self._source = source
self._cache = {}
    def next(self):
        try:
            packet = ExportPacket.decode(self._source)
        except Exception:
            # Any decoding failure (typically an exhausted source) ends the
            # iteration; a bare ``except`` would also swallow SystemExit and
            # KeyboardInterrupt.
            raise StopIteration
# Add templates to cache
packet.update_cache(self._cache)
# Finish deserialization
packet.apply(self._cache)
return packet
def __next__(self):
return self.next()
def __iter__(self):
return self
def close(self):
return self._source.close()
class RecordStream(PacketStream):
"""A read-only representation of serialized data records.
Parameters:
source : file-like object
Read-only input for data records.
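
    Each iteration yields a single data record as a dict keyed by the
    lower-cased :class:`FieldType` names. A minimal sketch, where
    ``netflow.bin`` is a hypothetical capture file::

        with open("netflow.bin", "rb") as source:
            for record in RecordStream(source):
                print(record.get("ipv4_src_addr"), record.get("in_bytes"))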
"""
def __init__(self, source):
super(RecordStream, self).__init__(source)
self._queue = []
def next(self):
while len(self._queue) == 0:
packet = super(RecordStream, self).next()
for flowset in packet.flowsets:
if not isinstance(flowset, DataFlowSet):
continue
keys = [field.type.name.lower() for field in flowset.template.fields]
for record in flowset.records:
self._queue.append(dict(zip(keys, record)))
return self._queue.pop(0)
def close(self):
self._queue = []
return super(RecordStream, self).close()