Source code for intake_netflow.source

from dask.bytes import open_files

from intake.source import base
from . import __version__


[docs]class NetflowSource(base.DataSource): name = 'netflow' version = __version__ container = 'python' partition_access = True def __init__(self, urlpath, metadata=None): """Source to load Cisco Netflow packets as sequence of Python dicts. Parameters: urlpath : str Location of the data files; can include protocol and glob characters. """ self._urlpath = urlpath super(NetflowSource, self).__init__(metadata=metadata) def _get_schema(self): self._streams = open_files(self._urlpath, mode='rb') self.npartitions = len(self._streams) return base.Schema(datashape=None, dtype=None, shape=None, npartitions=len(self._streams), extra_metadata={}) def _get_partition(self, i): return read_stream(self._streams[i])
[docs] def read(self): return self.to_dask().compute()
[docs] def to_dask(self): import dask.delayed import dask.bag as db dpart = dask.delayed(read_stream) parts = [dpart(stream) for stream in self._streams] return db.from_delayed(parts)
def _close(self): self._streams = None
def read_stream(stream): from .v9 import RecordStream with stream as f: return list(RecordStream(f))