"""
Module providing a custom CSV file parser with support for whitespace trimming, empty lines filtering and comment lines
"""
import typing
import sys
from functools import partial
from csvlike.helpers import make_printable
[docs]class InvalidDialectError(ValueError):
"""An invalid dialect was supplied"""
[docs]class UnknownDialectError(ValueError):
"""An unknown dialect was requested"""
[docs]class CSVParserError(ValueError):
"""Some error occured while attempting to parse the file"""
def __init__(self, message, line, char, index):
self.message = message
self.line = line
self.char = char
self.index = index
[docs]class UnclosedQuoteError(CSVParserError):
"""A quote wasn't properly closed"""
[docs]class UnallowedQuoteError(CSVParserError):
"""A quote is not allowed there"""
[docs]class Dialect:
"""Abstract base class for Dialect"""
delimiter = None
quotechar = None
commentchar = None
trimleft = None
trimright = None
[docs]class DefaultDialect(Dialect):
"""Default csv-like dialect, using comma separators, double quote chars and hash as the comment char
with newline and tab char trimming."""
delimiter = ','
quotechar = '"'
commentchar = '#'
trimleft = ' \t\n\r'
trimright = trimleft
ignoreemptylines = True
[docs]class WhitespaceDialect(DefaultDialect):
"""Same as default Dialect, but with spaces also trimmed."""
delimiter = ' \t'
known_dialects = {
"default": DefaultDialect,
"whitespace": WhitespaceDialect
}
MODE_FIRST = 0
MODE_OUTSIDE = 1
MODE_INSIDE = 2
MODE_INSIDE_QUOTED = 3
MODE_INSIDE_QUOTED_QUOTE = 4
MODE_COMMENT = 5
Field = str
[docs]class Line(list):
@property
def lineno(self):
return self.__dict__['lineno']
# TODO: don't really know if it's quoted atm
# class Field(str, object):
# @property
# def quoted(self):
# return self.__dict__['quoted']
#
# @quoted.setter
# def quoted(self, value):
# self.__dict__['quoted'] = bool(value)
[docs]class Reader:
"""
CSV-like file reader with support for comment chars, ignoring empty lines
and whitespace trimming on both sides of each field.
"""
def __init__(self, file: typing.TextIO, dialect: Dialect, debug=None):
if not issubclass(dialect, Dialect):
raise InvalidDialectError("Invalid dialect", dialect)
self.line_num = 0
self._dialect = dialect
self._file = file
self._debug = bool(debug)
def _trimright(self, data: str):
chars = self._dialect.trimright
if chars is None:
return data
return data.rstrip(chars)
def _is_ignore_left(self, char: str):
if self._dialect.trimleft is None:
return False
return char in self._dialect.trimleft
def _is_ignore_right(self, char: str):
if self._dialect.trimright is None:
# currently no dialect with no trimright
return False # pragma: nocover
return char in self._dialect.trimright
def _is_comment(self, char: str):
if self._dialect.commentchar is None:
return False
return char in self._dialect.commentchar
def _is_quote(self, char: str):
if self._dialect.quotechar is None:
return False
return char == self._dialect.quotechar
def _is_delimiter(self, char: str):
return char in self._dialect.delimiter
def __iter__(self):
readchar = iter(partial(self._file.read, 1), '')
cur_line = 1
if self._debug:
current_module = sys.modules[__name__]
# print the color key the different modes
print('MODES: ', end='')
print(' '.join(['\033[1;%d;40m%s\033[0;0m' % (32 + getattr(current_module, name), name[5:])
for name in dir(current_module)
if name.startswith('MODE_')
]))
def debug(txt='', args=tuple(), **kwargs):
if callable(args):
args = tuple(args())
print(txt % args, **kwargs)
pass
else:
def debug(*args, **kwargs):
pass
newlinechars = '\n\r'
mode = MODE_FIRST
field = []
line = Line()
if self._dialect.trimright is not None:
delimiter_is_whitespace = self._dialect.delimiter in self._dialect.trimright
else:
delimiter_is_whitespace = False
def yield_line():
nonlocal line, field, mode, delimiter_is_whitespace, is_newline, cur_line
if not(mode == MODE_OUTSIDE and delimiter_is_whitespace):
next_field()
field = []
_line = line
_line.__dict__['lineno'] = cur_line
line = Line()
mode = MODE_FIRST
return _line
def next_field():
nonlocal field, line, mode
field = ''.join(field)
if mode != MODE_INSIDE_QUOTED_QUOTE:
field = self._trimright(field)
field = Field(field)
line.append(field)
field = []
mode = MODE_OUTSIDE
cur_char = 0
last_quote_line = None
last_quote_char = None
last_quote_idx = None
idx = 0
for char in readchar:
cur_char += 1
idx += 1
# print char to stdout with color defining mode
debug('\033[1;%d;40m%s\033[0;0m', lambda: (32+mode, make_printable(char)), end='')
is_newline = char in newlinechars
if is_newline:
cur_line += 1
cur_char = 0
if mode == MODE_COMMENT:
if is_newline:
mode = MODE_FIRST
continue
if mode in (MODE_OUTSIDE, MODE_FIRST):
if is_newline:
if mode != MODE_FIRST:
yield yield_line()
continue
if self._is_ignore_left(char):
continue
if self._is_comment(char):
if mode is MODE_OUTSIDE:
yield yield_line()
mode = MODE_COMMENT
continue
if self._is_quote(char):
mode = MODE_INSIDE_QUOTED
last_quote_line = cur_line
last_quote_char = cur_char
last_quote_idx = idx
continue
if self._is_delimiter(char):
next_field()
continue
mode = MODE_INSIDE
field.append(char)
continue
if mode == MODE_INSIDE:
if self._is_quote(char):
raise UnallowedQuoteError("Quote not allowed here", cur_line, cur_char, idx)
if is_newline:
yield yield_line()
continue
if self._is_delimiter(char):
next_field()
continue
field.append(char)
continue
if mode == MODE_INSIDE_QUOTED_QUOTE:
if self._is_quote(char):
field.append(char)
mode = MODE_INSIDE_QUOTED
continue
if self._is_delimiter(char):
next_field()
continue
if is_newline:
yield yield_line()
continue
if not delimiter_is_whitespace:
if self._is_ignore_right(char):
continue
if self._is_comment(char):
yield yield_line()
mode = MODE_COMMENT
continue
raise UnallowedQuoteError("Single quote inside quoted field", cur_line, cur_char, idx)
if mode == MODE_INSIDE_QUOTED:
if self._is_quote(char):
mode = MODE_INSIDE_QUOTED_QUOTE
continue
field.append(char)
continue
debug()
if mode == MODE_INSIDE_QUOTED:
raise UnclosedQuoteError("Unexpected end", last_quote_line, last_quote_char, last_quote_idx)
if mode in (MODE_INSIDE_QUOTED_QUOTE, MODE_OUTSIDE, MODE_INSIDE):
yield yield_line()
[docs]def reader(file: typing.TextIO, dialect: typing.Union[None, str, Dialect] = None, **kwargs) -> Reader:
if dialect is None:
dialect = DefaultDialect
elif type(dialect) is str:
if dialect not in known_dialects:
raise UnknownDialectError("Dialect not known", dialect)
dialect = known_dialects[dialect]
return Reader(file, dialect, **kwargs)