PySpark for time-series data, discussing data ingestion, extraction, and visualization with practical implementation code.
Extract the 11 elements from each log
def map_log(line):
match = re.search('^(S+) (S+) (S+) (S+) [- » (d{4})] "(S+)s*(S+)s*(S+)s*(+)?s*"* (d{3}) (S+)',line)
if match is None:
match = re.search('^(S+) (S+) (S+) (S+) [- » (d{4})] "(S+)s*(+)>*( w/s. » +)s*(S+)s*(d{3})s*(S+)',line)
return(match.groups())
parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line 1 » == 1).map(lambda line : line 0 » )
parsed_rdd2 = parsed_rdd.map(lambda line: map_log(line))