December 12, 2023

#product, #engineering

How to Use Axiom with OPNsense logs

Landon Lewis

Field CTO

Axiom is a great destination for high-volume data sources in security use cases. We commonly see network traffic logs from firewalls, routers, switches, and wireless access points, as well as endpoint telemetry such as Sysmon, Sysdig, and Falcon Data Replicator, all of which generate high volumes of data. In many cases customers make compromises or play “event gymnastics”, moving data between storage tiers or archiving anything older than 3-7 days, depending on their budget and on how critical the data is or whether an incident is suspected.

OPNsense is a popular open-source firewall forked from pfSense. It exposes various event data through syslog. The message field of the pf logs (filterlog, Figure 1) is CSV text. The DNS logs I chose from Unbound (Figure 2) are space-delimited.

Figure 1 - PF Filterlog CSV IPv4/UDP Message Field

[59557:2] reply: <private_ip> AAAA IN NOERROR 0.014909 0 72
Figure 2 - Unbound DNS Server Message Field

One of the cool things about Axiom is that you can ship this data with or without the fields already parsed. You can leave it to query-time execution, which runs on lambdas, to assign the fields later. I have an upcoming blog post where I will share how I do this with a log shipper, but it’s purely a matter of preference: the amount of work is the same. Parsing ahead of time simplifies querying. Parsing at query time lets you save the query, the extracted values, or both as virtual fields and rely on the query engine. Perhaps you have ingested event data that was archived for compliance and retention, and you now need to review it for incident response or forensics. Query-time parsing is a great way to extract that information and gain value without having defined the logic in your log shipper or pipeline beforehand. I love the flexibility, since sometimes we get to delay the analysis until we need to do it. Who doesn’t love procrastinating when your plate is full?
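To make the "ahead of time" option concrete, here is a minimal Python sketch of what a shipper-side step could look like for filterlog messages. It is not the log shipper setup I will cover in the upcoming post. The field names follow the `project` list in the APL query later in this post, and I am assuming the TCP-only columns continue one position at a time after `ack`. The sample message, the interface name `igb0`, and the `enrich` helper are made up for illustration, and the addresses come from documentation ranges.

```python
import csv
import json

# Positional field names, following the APL query in this post.
# Assumption: the TCP-only columns (flagzz..options) sit at indices 23-28.
FILTERLOG_IPV4_FIELDS = [
    "rulenr", "subrulenr", "anchorname", "label", "interface", "reason",
    "action", "dir", "ipversion", "tos", "ecn", "ttl", "id", "offset",
    "flagz", "protonum", "protoname", "length", "src", "dst",
    "srcport", "dstport", "datalen",
    "flagzz", "seq", "ack", "window", "urg", "options",
]


def parse_filterlog(message):
    """Return named fields for an IPv4 TCP/UDP filterlog line, else None."""
    values = next(csv.reader([message]), [])
    if len(values) < 16:
        return None
    # Same filter as the APL query: IP version 4, protocol 6 (TCP) or 17 (UDP).
    if values[8] != "4" or values[15] not in ("6", "17"):
        return None
    # zip stops at the shorter input, so UDP lines just omit the TCP-only tail.
    return dict(zip(FILTERLOG_IPV4_FIELDS, values))


def enrich(event):
    """Hypothetical shipper step: add parsed fields to a syslog event dict."""
    if "filterlog" not in event.get("application", ""):
        return event
    fields = parse_filterlog(event.get("message", ""))
    return {**event, **fields} if fields else event


# Made-up UDP sample; not taken from a real firewall.
sample_event = {
    "application": "filterlog",
    "message": "82,,,0,igb0,match,block,in,4,0x0,,64,12345,0,none,17,udp,76,"
               "192.0.2.10,198.51.100.20,5353,53,56",
}
enriched = enrich(sample_event)
print(json.dumps(enriched))
```

Events that are not filterlog, or that are not IPv4 TCP/UDP, pass through unchanged, so the raw message stays available for query-time parsing either way.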

Here is what the queries look like in Axiom’s APL for messages like those in Figure 1 and Figure 2:

APL Parsing Filterlog IPv4 TCP/UDP Events
| where application contains_cs "filterlog"
| extend csvdata = parse_csv(message)
// IPv4 and TCP/UDP
| where toint(csvdata[8]) == 4
| where (toint(csvdata[15]) == 6 or toint(csvdata[15]) == 17)
| project rulenr = csvdata[0], subrulenr = csvdata[1], anchorname = csvdata[2], label = csvdata[3], interface = csvdata[4], reason = csvdata[5], action = csvdata[6], dir = csvdata[7], ipversion = csvdata[8], tos = csvdata[9], ecn = csvdata[10], ttl = csvdata[11], id = csvdata[12], offset = csvdata[13], flagz = csvdata[14], protonum = csvdata[15], protoname = csvdata[16], length = csvdata[17], src = csvdata[18], dst = csvdata[19], srcport = csvdata[20], dstport = csvdata[21], datalen = csvdata[22], flagzz = csvdata[23], seq = csvdata[24], ack = csvdata[25], window = csvdata[26], urg = csvdata[27], options = csvdata[28]
| project interface, reason, action, dir, src, dst, protoname, dstport
| limit 10
APL query for filterlog messages like Figure 1

// Parse unbound logs from syslog messages
// Example line to parse: 	[82014:3] query: <private_ip> PTR IN
| where application == "unbound"
| where message contains_cs "query:"
| extend logFields = split(message, ' ')
| extend domain = tostring(logFields[array_length(logFields)-3]) // assumes the log message position
| extend src_ip = tostring(logFields[array_length(logFields)-4]) // assumes the log message position
| extend type = tostring(logFields[array_length(logFields)-2]) // assumes the log message position
| project _time, src_ip, domain
| where src_ip contains "<private_subnet>"
| summarize count() by bin_auto(_time), src_ip, domain
APL query for Unbound query messages
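If you want to sanity-check the positional logic outside Axiom, the same idea can be sketched in a few lines of Python. This is only an illustration of the indexing used in the Unbound query, not a replacement for it: it counts fields from the end of the message and counts hits per source and domain, leaving out the time binning. The sample lines, domains, and addresses are made up, and the function name `parse_unbound_query` is hypothetical.

```python
from collections import Counter


def parse_unbound_query(message):
    """Extract src_ip, domain and type from an Unbound 'query:' line."""
    if "query:" not in message:
        return None
    tokens = message.strip().split(" ")
    if len(tokens) < 4:
        return None
    # Counted from the end, like array_length(logFields)-N in the APL query.
    return {"src_ip": tokens[-4], "domain": tokens[-3], "type": tokens[-2]}


# Made-up sample lines; the last one is a reply and is skipped.
lines = [
    "[82014:3] query: 192.0.2.10 example.com. A IN",
    "[82014:3] query: 192.0.2.10 example.com. AAAA IN",
    "[82014:1] query: 192.0.2.11 example.org. PTR IN",
    "[59557:2] reply: 192.0.2.10 example.com. AAAA IN NOERROR 0.014909 0 72",
]

counts = Counter()
for line in lines:
    parsed = parse_unbound_query(line)
    if parsed:
        counts[(parsed["src_ip"], parsed["domain"])] += 1

for (src_ip, domain), n in counts.items():
    print(src_ip, domain, n)
```

Reply lines carry extra trailing fields (return code, timing, sizes), so counting from the end would pick the wrong tokens for them. That is why both the APL query and this sketch filter on `query:` first.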

