123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- #! /usr/bin/env python3
- #
- # Script to provision a new application layer parser and/or logger.
- import sys
- import os
- import os.path
- import argparse
- import io
- import re
class SetupError(Exception):
    """Fatal, user-facing provisioning error.

    Functions in this script raise it to abort the run. The script's
    entry point catches it and prints only the message, so the user is
    not shown a stack trace.
    """
# Name this script was invoked as (directory part removed); it is
# interpolated into the usage examples of the --help epilog.
progname = os.path.split(sys.argv[0])[1]
def fail_if_exists(filename):
    """Raise SetupError when *filename* already exists on disk.

    Returns None (and does nothing) when the path is free.
    """
    if not os.path.exists(filename):
        return
    raise SetupError("%s already exists" % (filename))
def common_copy_templates(proto, pairs, replacements=()):
    """Instantiate template source files for a new protocol.

    Each template is copied line by line. Lines between (and including)
    the TEMPLATE_START_REMOVE / TEMPLATE_END_REMOVE markers are dropped,
    the caller's replacements are applied, and finally the generic
    "TEMPLATE"/"template"/"Template" names (with their optional Rust
    suffix) are replaced by the protocol name in the matching case.

    Args:
        proto: Protocol name as given by the user, e.g. "Gopher".
        pairs: Sequence of (template_path, destination_path) tuples. It
            is iterated twice, so it must not be a one-shot iterator.
        replacements: Optional (old, new) string pairs applied to every
            kept line before the generic substitutions.

    Raises:
        SetupError: If any destination already exists. All destinations
            are checked before anything is written.
    """
    upper = proto.upper()
    lower = proto.lower()

    # Check every destination up front so a clash aborts the run before
    # any file has been generated.
    for (_, dst) in pairs:
        fail_if_exists(dst)

    for (src, dst) in pairs:

        dstdir = os.path.dirname(dst)
        # dirname() is "" for a bare filename; there is nothing to create
        # in that case and os.makedirs("") would raise.
        if dstdir and not os.path.exists(dstdir):
            print("Creating directory %s." % (dstdir))
            os.makedirs(dstdir)

        print("Generating %s." % (dst))

        # The template is opened first so that a missing template does
        # not leave an empty destination behind, and both handles are
        # closed even if a substitution fails half way through.
        with open(src) as template_in, open(dst, "w") as output:
            skip = False
            for line in template_in:
                if "TEMPLATE_START_REMOVE" in line:
                    skip = True
                    continue
                if "TEMPLATE_END_REMOVE" in line:
                    skip = False
                    continue
                if skip:
                    continue

                # Caller-specific names go first: they are longer
                # spellings that the generic patterns below would
                # otherwise partially rewrite.
                for (old, new) in replacements:
                    line = line.replace(old, new)

                line = re.sub("TEMPLATE(_RUST)?", upper, line)
                line = re.sub("template(-rust)?", lower, line)
                line = re.sub("Template(Rust)?", proto, line)

                output.write(line)
def copy_app_layer_templates(proto, rust):
    """Copy the app-layer parser templates for protocol *proto*.

    Args:
        proto: Protocol name; its lower-case form names the new files.
        rust: When true, use the Rust-backed templates (the C glue files
            plus the sources under rust/src/applayertemplate); otherwise
            use the plain C templates.

    Raises:
        SetupError: Propagated from common_copy_templates() when a
            destination file already exists.
    """
    lower = proto.lower()

    if rust:
        pairs = (
            ("src/app-layer-template-rust.c",
             "src/app-layer-%s.c" % (lower)),
            ("src/app-layer-template-rust.h",
             "src/app-layer-%s.h" % (lower)),
            ("rust/src/applayertemplate/mod.rs",
             "rust/src/applayer%s/mod.rs" % (lower)),
            ("rust/src/applayertemplate/template.rs",
             "rust/src/applayer%s/%s.rs" % (lower, lower)),
            ("rust/src/applayertemplate/parser.rs",
             "rust/src/applayer%s/parser.rs" % (lower)),
        )
    else:
        pairs = (
            ("src/app-layer-template.c",
             "src/app-layer-%s.c" % (lower)),
            ("src/app-layer-template.h",
             "src/app-layer-%s.h" % (lower)),
        )

    common_copy_templates(proto, pairs)
def patch_makefile_am(protoname):
    """Register the new parser sources in src/Makefile.am.

    Every line whose first non-blank text is "app-layer-template." is
    duplicated with "template" replaced by the lower-cased protocol
    name; the copy is placed directly above the original line.

    Args:
        protoname: Name of the protocol being added.
    """
    filename = "src/Makefile.am"
    print("Patching src/Makefile.am.")
    lower = protoname.lower()
    output = io.StringIO()
    with open(filename) as infile:
        for line in infile:
            if line.lstrip().startswith("app-layer-template."):
                output.write(line.replace("template", lower))
            output.write(line)
    # Write through a context manager so the data is flushed and the
    # handle closed before a later step reopens the same file.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def patch_rust_lib_rs(protoname):
    """Declare the new Rust module in rust/src/lib.rs.

    Each line starting with "pub mod applayertemplate;" is duplicated
    with "template" replaced by the lower-cased protocol name; the copy
    is written directly above the original line.

    Args:
        protoname: Name of the protocol being added.
    """
    filename = "rust/src/lib.rs"
    print("Patching %s." % (filename))
    lower = protoname.lower()
    output = io.StringIO()
    with open(filename) as infile:
        for line in infile:
            if line.startswith("pub mod applayertemplate;"):
                output.write(line.replace("template", lower))
            output.write(line)
    # Close the handle deterministically instead of relying on garbage
    # collection to flush the rewritten file.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def patch_rust_applayer_mod_rs(protoname):
    """Add the logger module to the protocol's Rust mod.rs.

    Inserts "pub mod logger;" directly above the first line containing
    "mod parser" in rust/src/applayer<proto>/mod.rs. Later matches are
    left untouched, and nothing is inserted if no line matches.

    Args:
        protoname: Name of the protocol; its lower-case form selects the
            module directory.
    """
    lower = protoname.lower()
    filename = "rust/src/applayer%s/mod.rs" % (lower)
    print("Patching %s." % (filename))
    output = io.StringIO()
    done = False
    with open(filename) as infile:
        for line in infile:
            if not done and "mod parser" in line:
                output.write("pub mod logger;\n")
                done = True
            output.write(line)
    # Close the handle deterministically so the rewritten file is fully
    # on disk when this function returns.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def patch_app_layer_protos_h(protoname):
    """Add the ALPROTO_<PROTO> entry to src/app-layer-protos.h.

    Every line containing "ALPROTO_TEMPLATE," is duplicated with
    "TEMPLATE" replaced by the upper-cased protocol name; the copy is
    written directly above the original line.

    Args:
        protoname: Name of the protocol being added.
    """
    filename = "src/app-layer-protos.h"
    print("Patching %s." % (filename))
    upper = protoname.upper()
    output = io.StringIO()
    with open(filename) as infile:
        for line in infile:
            if "ALPROTO_TEMPLATE," in line:
                output.write(line.replace("TEMPLATE", upper))
            output.write(line)
    # Close the handle deterministically; later steps read this header
    # again to check whether the protocol exists.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def patch_app_layer_protos_c(protoname):
    """Add the protocol's name mappings to src/app-layer-protos.c.

    Two kinds of lines are duplicated, with "TEMPLATE"/"template"
    replaced by the upper-/lower-cased protocol name and the copy
    written above the original:

    * a "case ALPROTO_TEMPLATE:" line together with the two lines that
      follow it;
    * any line containing "return ALPROTO_TEMPLATE;".

    Args:
        protoname: Name of the protocol being added.
    """
    filename = "src/app-layer-protos.c"
    print("Patching %s." % (filename))
    upper = protoname.upper()
    lower = protoname.lower()

    # Read in all the lines as we'll be doing some multi-line
    # duplications.
    with open(filename) as infile:
        inlines = infile.readlines()

    output = io.StringIO()
    for i, line in enumerate(inlines):

        if "case ALPROTO_TEMPLATE:" in line:
            # Duplicate the section starting at this line and including
            # the following 2 lines. Slicing (rather than indexing)
            # copies whatever is left if the file ends early instead of
            # raising IndexError.
            for temp in inlines[i:i + 3]:
                temp = temp.replace("TEMPLATE", upper)
                temp = temp.replace("template", lower)
                output.write(temp)

        if "return ALPROTO_TEMPLATE;" in line:
            output.write(
                line.replace("TEMPLATE", upper).replace("template", lower))

        output.write(line)

    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def patch_app_layer_detect_proto_c(proto):
    """Duplicate the template protocol check in src/app-layer-detect-proto.c.

    For every line containing "== ALPROTO_TEMPLATE)" that line and the
    one after it are copied, with "TEMPLATE" replaced by the upper-cased
    protocol name, directly above the original line.

    Args:
        proto: Name of the protocol being added.
    """
    filename = "src/app-layer-detect-proto.c"
    print("Patching %s." % (filename))
    upper = proto.upper()

    with open(filename) as infile:
        inlines = infile.readlines()

    output = io.StringIO()
    for i, line in enumerate(inlines):
        if "== ALPROTO_TEMPLATE)" in line:
            # The slice yields only the matching line when it is the
            # last one in the file, where inlines[i + 1] would raise
            # IndexError.
            for extra in inlines[i:i + 2]:
                output.write(extra.replace("TEMPLATE", upper))
        output.write(line)

    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def patch_app_layer_parser_c(proto):
    """Hook the new parser into src/app-layer-parser.c.

    Lines containing "app-layer-template.h" are duplicated with
    "template" replaced by the lower-cased protocol name, and lines
    containing "RegisterTemplateParsers()" are duplicated with
    "Template" replaced by the protocol name as given. Each copy is
    written directly above its original line.

    Args:
        proto: Name of the protocol being added.
    """
    filename = "src/app-layer-parser.c"
    print("Patching %s." % (filename))
    lower = proto.lower()
    output = io.StringIO()
    with open(filename) as infile:
        for line in infile:
            if "app-layer-template.h" in line:
                output.write(line.replace("template", lower))
            if "RegisterTemplateParsers()" in line:
                output.write(line.replace("Template", proto))
            output.write(line)
    # Close the handle deterministically instead of leaving the flush
    # to garbage collection.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def patch_suricata_yaml_in(proto):
    """Enable the new protocol under app-layer.protocols in suricata.yaml.in.

    A "<proto>:" / "enabled: yes" entry is inserted right after each
    "protocols:" line whose preceding line contains "app-layer:".

    The entry's indentation is derived from the matched "protocols:"
    line (two more spaces for the key, four more for "enabled") rather
    than hard-coded, so it nests as a child of that key. This assumes
    two-space YAML indentation steps.

    Args:
        proto: Name of the protocol; written in lower case.
    """
    filename = "suricata.yaml.in"
    print("Patching %s." % (filename))

    with open(filename) as infile:
        inlines = infile.readlines()

    output = io.StringIO()
    for i, line in enumerate(inlines):
        output.write(line)
        # The i > 0 guard matters: for the first line, inlines[i - 1]
        # would wrap around to the last line of the file.
        if (i > 0 and "protocols:" in line
                and "app-layer:" in inlines[i - 1]):
            indent = line[:len(line) - len(line.lstrip())]
            output.write("%s  %s:\n" % (indent, proto.lower()))
            output.write("%s    enabled: yes\n" % (indent))

    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def logger_patch_suricata_yaml_in(proto):
    """Add the protocol to the eve-log types list in suricata.yaml.in.

    A "- <proto>" list item is inserted directly after the first line
    containing "types:" that occurs at or after the first line
    containing "eve-log:". Only that one occurrence is patched.

    The item is indented two spaces deeper than the matched "types:"
    line, so it nests as an element of that list regardless of how deep
    the section sits. This assumes two-space YAML indentation steps.

    Args:
        proto: Name of the protocol; written in lower case.
    """
    filename = "suricata.yaml.in"
    print("Patching %s." % (filename))

    with open(filename) as infile:
        inlines = infile.readlines()

    output = io.StringIO()

    # This is a bit tricky. We want to find the first occurrence of
    # "types:" after "eve-log:".
    #   stage 0: still looking for "eve-log:"
    #   stage 1: inside eve-log, looking for "types:"
    #   stage 2: entry written, copy the rest verbatim
    stage = 0
    for line in inlines:
        output.write(line)
        if stage == 0 and "eve-log:" in line:
            stage = 1
        if stage == 1 and "types:" in line:
            indent = line[:len(line) - len(line.lstrip())]
            output.write("%s  - %s\n" % (indent, proto.lower()))
            stage = 2

    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def logger_patch_suricata_common_h(proto):
    """Add the LOGGER_JSON_<PROTO> entry to src/suricata-common.h.

    Every line containing "LOGGER_JSON_TEMPLATE," is duplicated with
    "TEMPLATE" replaced by the upper-cased protocol name; the copy is
    written directly above the original line.

    Args:
        proto: Name of the protocol being added.
    """
    filename = "src/suricata-common.h"
    print("Patching %s." % (filename))
    upper = proto.upper()
    output = io.StringIO()
    with open(filename) as infile:
        for line in infile:
            if "LOGGER_JSON_TEMPLATE," in line:
                output.write(line.replace("TEMPLATE", upper))
            output.write(line)
    # Close the handle deterministically instead of leaving the flush
    # to garbage collection.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def logger_patch_output_c(proto):
    """Register the new JSON logger in src/output.c.

    Lines containing "output-json-template.h" are duplicated with
    "template" replaced by the lower-cased protocol name. For each line
    containing "/* Template JSON logger." that line and the one after
    it are duplicated with "Template" replaced by the protocol name as
    given. Each copy is written directly above its original.

    Args:
        proto: Name of the protocol being added.
    """
    filename = "src/output.c"
    print("Patching %s." % (filename))
    lower = proto.lower()

    with open(filename) as infile:
        inlines = infile.readlines()

    output = io.StringIO()
    for i, line in enumerate(inlines):
        if "output-json-template.h" in line:
            output.write(line.replace("template", lower))
        if "/* Template JSON logger." in line:
            # The slice yields only the comment line when it is the last
            # line of the file, where inlines[i + 1] would raise
            # IndexError.
            for extra in inlines[i:i + 2]:
                output.write(extra.replace("Template", proto))
        output.write(line)

    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def logger_copy_templates(proto, rust):
    """Copy the JSON logger templates for protocol *proto*.

    The C header and source are always generated as
    src/output-json-<proto>.[ch]; with *rust* set they come from the
    "-rust" templates and the Rust logger.rs is copied as well.
    """
    name = proto.lower()
    header = "src/output-json-%s.h" % (name)
    source = "src/output-json-%s.c" % (name)

    if not rust:
        pairs = (
            ("src/output-json-template.h", header),
            ("src/output-json-template.c", source),
        )
    else:
        pairs = (
            ("src/output-json-template-rust.h", header),
            ("src/output-json-template-rust.c", source),
            ("rust/src/applayertemplate/logger.rs",
             "rust/src/applayer%s/logger.rs" % (name)),
        )

    common_copy_templates(proto, pairs)
def logger_patch_makefile_am(protoname):
    """Register the new logger sources in src/Makefile.am.

    Every line whose first non-blank text is "output-json-template." is
    duplicated with "template" replaced by the lower-cased protocol
    name; the copy is written directly above the original line.

    Args:
        protoname: Name of the protocol being added.
    """
    filename = "src/Makefile.am"
    print("Patching %s." % (filename))
    lower = protoname.lower()
    output = io.StringIO()
    with open(filename) as infile:
        for line in infile:
            if line.lstrip().startswith("output-json-template."):
                output.write(line.replace("template", lower))
            output.write(line)
    # Close the handle deterministically; other steps patch this same
    # file again.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def logger_patch_util_profiling_c(proto):
    """Add the new logger id to src/util-profiling.c.

    Every line containing "(LOGGER_JSON_TEMPLATE);" is duplicated with
    "TEMPLATE" replaced by the upper-cased protocol name; the copy is
    written directly above the original line.

    Args:
        proto: Name of the protocol being added.
    """
    filename = "src/util-profiling.c"
    print("Patching %s." % (filename))
    upper = proto.upper()
    output = io.StringIO()
    with open(filename) as infile:
        for line in infile:
            if "(LOGGER_JSON_TEMPLATE);" in line:
                output.write(line.replace("TEMPLATE", upper))
            output.write(line)
    # Close the handle deterministically instead of leaving the flush
    # to garbage collection.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def detect_copy_templates(proto, buffername, rust):
    """Copy the detect-buffer templates for *proto* / *buffername*.

    Generates src/detect-<proto>-<buffer>.[ch] from either the Rust or
    the C buffer templates; the C variant also generates the matching
    file under src/tests/. The combined template/buffer names are
    handed to common_copy_templates() as extra replacements.
    """
    # The four spellings of the combined protocol + buffer name.
    upper_snake = "%s_%s" % (proto.upper(), buffername.upper())
    lower_dash = "%s-%s" % (proto.lower(), buffername.lower())
    lower_snake = "%s_%s" % (proto.lower(), buffername.lower())
    camel = "%s%s" % (proto, buffername)

    dst_header = "src/detect-%s.h" % (lower_dash)
    dst_source = "src/detect-%s.c" % (lower_dash)

    if rust:
        pairs = (
            ("src/detect-template-rust-buffer.h", dst_header),
            ("src/detect-template-rust-buffer.c", dst_source),
        )
        replacements = (
            ("TEMPLATE_RUST_BUFFER", upper_snake),
            ("template-rust-buffer", lower_dash),
            ("template_rust_buffer", lower_snake),
            ("TemplateRustBuffer", camel),
        )
    else:
        pairs = (
            ("src/detect-template-buffer.h", dst_header),
            ("src/detect-template-buffer.c", dst_source),
            ("src/tests/detect-template-buffer.c",
             "src/tests/detect-%s.c" % (lower_dash)),
        )
        replacements = (
            ("TEMPLATE_BUFFER", upper_snake),
            ("template-buffer", lower_dash),
            ("template_buffer", lower_snake),
            ("TemplateBuffer", camel),
        )

    common_copy_templates(proto, pairs, replacements)
def detect_patch_makefile_am(protoname, buffername):
    """Register the new detect-buffer sources in src/Makefile.am.

    Every line whose first non-blank text is "detect-template-buffer."
    is duplicated with "template-buffer" replaced by
    "<proto>-<buffer>" (both lower-cased); the copy is written directly
    above the original line.

    Args:
        protoname: Name of the protocol the buffer belongs to.
        buffername: Name of the detect buffer.
    """
    filename = "src/Makefile.am"
    print("Patching %s." % (filename))
    new_name = "%s-%s" % (protoname.lower(), buffername.lower())
    output = io.StringIO()
    with open(filename) as infile:
        for line in infile:
            if line.lstrip().startswith("detect-template-buffer."):
                output.write(line.replace("template-buffer", new_name))
            output.write(line)
    # Close the handle deterministically instead of leaving the flush
    # to garbage collection.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def detect_patch_detect_engine_register_c(protoname, buffername):
    """Hook the new detect buffer into src/detect-engine-register.c.

    Lines containing "detect-template-buffer.h" are duplicated with
    "template-buffer" replaced by "<proto>-<buffer>" (lower-cased), and
    lines containing "DetectTemplateBufferRegister" are duplicated with
    "TemplateBuffer" replaced by "<Proto><Buffer>" as given. Each copy
    is written directly above its original line.

    Args:
        protoname: Name of the protocol the buffer belongs to.
        buffername: Name of the detect buffer.
    """
    filename = "src/detect-engine-register.c"
    print("Patching %s." % (filename))
    dashed = "%s-%s" % (protoname.lower(), buffername.lower())
    camel = "%s%s" % (protoname, buffername)
    output = io.StringIO()
    with open(filename) as infile:
        for line in infile:
            if "detect-template-buffer.h" in line:
                output.write(line.replace("template-buffer", dashed))
            if "DetectTemplateBufferRegister" in line:
                output.write(line.replace("TemplateBuffer", camel))
            output.write(line)
    # Close the handle deterministically instead of leaving the flush
    # to garbage collection.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def detect_patch_detect_engine_register_h(protoname, buffername):
    """Add the new keyword id to src/detect-engine-register.h.

    Every line containing "DETECT_AL_TEMPLATE_BUFFER" is duplicated with
    "TEMPLATE_BUFFER" replaced by "<PROTO>_<BUFFER>" (upper-cased); the
    copy is written directly above the original line.

    Args:
        protoname: Name of the protocol the buffer belongs to.
        buffername: Name of the detect buffer.
    """
    filename = "src/detect-engine-register.h"
    print("Patching %s." % (filename))
    new_name = "%s_%s" % (protoname.upper(), buffername.upper())
    output = io.StringIO()
    with open(filename) as infile:
        for line in infile:
            if "DETECT_AL_TEMPLATE_BUFFER" in line:
                output.write(line.replace("TEMPLATE_BUFFER", new_name))
            output.write(line)
    # Close the handle deterministically instead of leaving the flush
    # to garbage collection.
    with open(filename, "w") as outfile:
        outfile.write(output.getvalue())
def proto_exists(proto):
    """Return True if src/app-layer-protos.h already lists the protocol.

    The check is a plain substring search for "ALPROTO_<PROTO>," (with
    the protocol name upper-cased) on each line of the header.

    Args:
        proto: Name of the protocol to look for.

    Returns:
        True when a matching line is found, otherwise False.
    """
    needle = "ALPROTO_%s," % (proto.upper())
    # The context manager closes the header even when the search stops
    # early on the first match.
    with open("src/app-layer-protos.h") as infile:
        return any(needle in line for line in infile)
# Free-form help text handed to argparse as the parser's epilog in
# main(). %(progname)s is filled in with the name this script was
# invoked as.
epilog = """
This script will provision a new app-layer parser for the protocol
name specified on the command line. This is done by copying and
patching src/app-layer-template.[ch] then linking the new files into
the build system.
By default both the parser and logger will be generated. To generate
just one or the other use the --parser or --logger command line flags.
Examples:
%(progname)s --logger DNP3
%(progname)s --parser Gopher
This script can also setup a detect buffer. This is a separate
operation that must be done after creating the parser.
Examples:
%(progname)s --detect Gopher Request
""" % { "progname": progname, }
def main():
    """Parse the command line and run the requested provisioning steps.

    With none of --parser, --logger or --detect given, both the parser
    and the logger are generated; otherwise exactly the requested parts
    are. If the current directory contains suricata.c the script first
    changes to the parent directory, so all relative paths used by the
    patch functions resolve from the top of the source tree.

    Returns:
        None.

    Raises:
        SetupError: For an empty or non-capitalised protocol name, a
            missing buffer name with --detect, a working directory that
            does not look like a Suricata tree, a parser that already
            exists, or a logger/detect request for an unknown protocol.
    """
    arg_parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog)
    arg_parser.add_argument("--rust", action="store_true", default=False,
                            help="Setup Rust protocol template.")
    arg_parser.add_argument("--logger", action="store_true", default=False,
                            help="Generate logger.")
    arg_parser.add_argument("--parser", action="store_true", default=False,
                            help="Generate parser.")
    arg_parser.add_argument("--detect", action="store_true", default=False,
                            help="Generate detect module.")
    arg_parser.add_argument("proto", help="Name of protocol")
    arg_parser.add_argument("buffer", help="Name of buffer (for --detect)",
                            nargs="?")
    args = arg_parser.parse_args()

    proto = args.proto

    # The protocol name must start with an upper case letter. An empty
    # name is rejected here as well; indexing it would otherwise raise
    # IndexError and show the user a stack trace.
    if not proto or proto[0] != proto.upper()[0]:
        raise SetupError("protocol name must begin with an upper case letter")

    # Determine what to generate. With no selection flag at all,
    # generate both the parser and the logger.
    if not (args.parser or args.logger or args.detect):
        gen_parser = True
        gen_logger = True
        gen_detect = False
    else:
        gen_parser = args.parser
        gen_logger = args.logger
        gen_detect = args.detect

    if gen_detect and args.buffer is None:
        raise SetupError("--detect requires a buffer name")

    # Make sure we are in the correct directory.
    if os.path.exists("./suricata.c"):
        os.chdir("..")
    elif not os.path.exists("./src/suricata.c"):
        raise SetupError(
            "this does not appear to be a Suricata source directory.")

    if gen_parser:
        if proto_exists(proto):
            raise SetupError("protocol already exists: %s" % (proto))
        copy_app_layer_templates(proto, args.rust)
        if args.rust:
            patch_rust_lib_rs(proto)
        patch_makefile_am(proto)
        patch_app_layer_protos_h(proto)
        patch_app_layer_protos_c(proto)
        patch_app_layer_detect_proto_c(proto)
        patch_app_layer_parser_c(proto)
        patch_suricata_yaml_in(proto)

    if gen_logger:
        # Checked after the parser step so that the default
        # "parser + logger" run finds the entry it has just added.
        if not proto_exists(proto):
            raise SetupError("no app-layer parser exists for %s" % (proto))
        logger_copy_templates(proto, args.rust)
        if args.rust:
            patch_rust_applayer_mod_rs(proto)
        logger_patch_makefile_am(proto)
        logger_patch_suricata_common_h(proto)
        logger_patch_output_c(proto)
        logger_patch_suricata_yaml_in(proto)
        logger_patch_util_profiling_c(proto)

    if gen_detect:
        if not proto_exists(proto):
            raise SetupError("no app-layer parser exists for %s" % (proto))
        detect_copy_templates(proto, args.buffer, args.rust)
        detect_patch_makefile_am(proto, args.buffer)
        detect_patch_detect_engine_register_c(proto, args.buffer)
        detect_patch_detect_engine_register_h(proto, args.buffer)

    # Summary messages: tell the user which files were generated.
    names = {
        "proto": proto,
        "proto_lower": proto.lower(),
    }

    if gen_parser:
        if args.rust:
            print("""
An application detector and parser for the protocol %(proto)s have
now been setup in the files:
rust/src/applayer%(proto_lower)s/mod.rs
rust/src/applayer%(proto_lower)s/parser.rs""" % names)
        else:
            print("""
An application detector and parser for the protocol %(proto)s have
now been setup in the files:
src/app-layer-%(proto_lower)s.h
src/app-layer-%(proto_lower)s.c""" % names)

    if gen_logger:
        if args.rust:
            print("""
A JSON application layer transaction logger for the protocol
%(proto)s has now been set in the file:
rust/src/applayer%(proto_lower)s/logger.rs""" % names)
        else:
            print("""
A JSON application layer transaction logger for the protocol
%(proto)s has now been set in the files:
src/output-json-%(proto_lower)s.h
src/output-json-%(proto_lower)s.c""" % names)

    if gen_detect:
        print("""
The following files have been created and linked into the build:
detect-%(protoname_lower)s-%(buffername_lower)s.h
detect-%(protoname_lower)s-%(buffername_lower)s.c
""" % {
            "protoname_lower": proto.lower(),
            "buffername_lower": args.buffer.lower(),
        })

    if gen_parser or gen_logger:
        print("""
Suricata should now build cleanly. Try running "./configure" and "make".
""")
# Script entry point: a SetupError is reported as a one-line message and
# turned into exit status 1; any other exception propagates normally.
if __name__ == "__main__":
    try:
        exit_status = main()
    except SetupError as err:
        print("error: %s" % (err))
        exit_status = 1
    sys.exit(exit_status)
|