This page is a Quick Reference containing a punch list of run anywhere (copy/paste) Splunk searches to help explore the data in the OpenMethods Splunk events. The use of some advanced techniques is intentional and the foundational explanation for the techniques will be covered in other articles.
Conventions
In order to conserve space on this page with respect to writing queries and query results,
- It is a best practice, when readability matters, to author Splunk queries per above with the pipe symbol as the first character of each new line. Instead, the queries will be compacted.
- Search command code blocks are in gray, search results are in light blue.
- Sample results may not be included, or will be included as screenshots or converted to JSON and shown as a code block like this below (same results set as in 1.1 below).
[{"index":"1905405642","countofevents":"main"},{"index":"23781423","countofevents":"\"_audit\""},{"index":"10004574","countofevents":"\"_internal\""},{"index":"1755528","countofevents":"\"_introspection\""},{"index":"85111","countofevents":"summary"},{"index":"24343","countofevents":"\"_telemetry\""},{"index":"4","countofevents":"lastchanceindex"},{"index":"0","countofevents":"\"_thefishbucket\""},{"index":"0","countofevents":"demomatricindex"},{"index":"0","countofevents":"history"},{"index":"0","countofevents":"netaddinsimport"},{"index":"0","countofevents":"popflowscriptindex"},{"index":"0","countofevents":"tunnelmetricsindex"}] |
Section 1: Foundations
1. Basic Internal Structure of Splunk
In this section, we’ll glance at Splunk’s own internal structure and how it manages indexes, sources, storage, event sizes, and types, and we’ll spot check the indexes _introspection and _internal.
1.1. What are the Splunk indexes where OM data is stored (even if there is currently zero data in the index)?
| eventcount summarize=false index=* index=_*
| dedup index
| rename count as countofevents
| fields index countofevents
| sort countofevents DESC | |
index=”main”: default index where all OM data livesindex=”demomatricsindex” (or ‘netaddinsimport’, ‘popflowscriptindex’, ‘tunnelmetricsindex’): indexes created by OM for targeted research/projectsall the remaining indexes or Splunk internal, more queries will be built up in this section as time permits, but for now focus is to shift to OM specific data. |
2. Basic OpenMethods Topology
In this section, we’ll discover broadly where/how to look for events that give an overall view of how our software is deployed and being used.
2.0. I am not sure what I am looking for, how do I just explore Splunk data?
index="main" earliest="-30m" source="mediabar"Per the , after running this query the data can be explored by looking at search app results and exploring fields in the Fields sidebar.
2.1. List of customers and their CTI environment/location of agents (or at least URL agents use to access HIS)?
index="main" earliest="-4h"
| stats values(network.his.uri) as hisurl, values(network.his.model) as hiscti by crm.customerResults (JSON + screenshot): (blank HIS URL would imply the site hasn’t been used in the given time window or it is Popflow-only)
[{"crm.customer":"\"Openmethod_Kalpesh\"","hiscti":"","hisurl":[]},{"crm.customer":"arval","hiscti":"\"RNA-I3\"","hisurl":["\"https://harmony_his_p.intra.corp:443","https://harmony_his_s.intra.corp:443\""]},{"crm.customer":"ascena","hiscti":"","hisurl":[]},{"crm.customer":"chewy","hiscti":"\"RNA-CiscoUCCE\"","hisurl":["\"https://chewy-fll2this1.openmethodscloud.com:8443","https://chewy-fll2this2.openmethodscloud.com:8443","https://chewy-fll2this3.openmethodscloud.com:8443","https://chewy-fll2this4.openmethodscloud.com:8443","https://chewy-iad1this1.openmethodscloud.com:8443","https://chewy-iad1this2.openmethodscloud.com:8443","https://chewy-iad1this3.openmethodscloud.com:8443","https://chewy-iad1this4.openmethodscloud.com:8443\""]},{"crm.customer":"\"chewy-6x\"","hiscti":"","hisurl":[]},{"crm.customer":"\"chewy-test\"","hiscti":"\"RNA-CiscoUCCE\"","hisurl":["\"https://chewy-fll2toml.openmethodscloud.com:8443\""]},{"crm.customer":"helenoftroy","hiscti":"\"RNA-CiscoUCCX\"","hisurl":["\"https://helen-aio1.openmethodscloud.com:8443\""]},{"crm.customer":"\"helenoftroy-tst\"","hiscti":"","hisurl":[]},{"crm.customer":"\"johnson_controls\"","hiscti":"\"RNA-Avaya\"","hisurl":["\"https://JCI-HIS-1.openmethodscloud.com:8443\""]},{"crm.customer":"kehrigdrpepper","hiscti":"","hisurl":[]},{"crm.customer":"omdemo","hiscti":"","hisurl":[]},{"crm.customer":"\"rockwell_automation\"","hiscti":"\"RNA-CiscoUCCE\"","hisurl":["\"https://azrnapwapp72f0e.openmethodscloud.com:8443\""]}] |
2.1.a. What are agent states for UCCE and their stats?
index="main" earliest="-2h" source="mediabar" "network.his.model"=RNA-CiscoUCCE
|top mb.agent.state by network.his.model | fields - network.his.model | mb.agent.state count percent
Handling interaction 500954 68.15%
Unavailable 138329 18.82%
Available 54388 7.40%
Mixed states 17354 2.36%
Wrap up 13753 1.87%
Connected to Harmony 10251 1.39% |
2.2. List of customers and their sites/versions?
| tstats count WHERE (index="main" earliest=-5d@d latest=now source="mediabar") BY crm.customer mb.version network.appManagerDomain network.crmHost
| sort mb.version DESC | dedup network.crmHost
| table crm.customer mb.version network.crmHost | sort crm.customer2.3. How to segment agent usage by production versus lower environments?
This is also the simplest form of unique agent logins by host (customer URL).
| tstats distinct_count(crm.username) as agents_dc_per_h WHERE (index="main" earliest="-1h@h"
[|inputlookup spl-customer-host.csv | where cloudenv="prod" | fields displaycustomer hostlookup
| lookup spl-customer-host.csv displaycustomer cloudenv OUTPUT hostlookup
| fields - displaycustomer | rename hostlookup as host | format]) BY _time host span=1h2.3.a. The Splunk ‘lookup’ data structure that made the above query possible:
| inputlookup spl-customer-host.csv
| WHERE NOT (displaycustomer in ("omdemo","omdev", "omqa", "omtrain"))
| dedup displaycustomer
| lookup spl-customer-host.csv displaycustomer OUTPUT crmcustomer cloudenv hostlookup2.4. How to convert Splunk events to look like the regular HIS/CS log statements I am used too?
| tstats count WHERE (index="main" earliest="8/1/2020:06:00:00" latest="8/1/2020:06:30:00" source="mediabar" host="https://chewy.custhelp.com" )
BY _time logLevel crm.instanceId crm.groupId crm.id mb.className mb.functionName message span=1s2.4.a. Simplify Log-Style Statements to Fewer Fields
Or to simplify log-style statements above down to a few meaningful fields and one agent (but for this case let’s say we don’t know which agent so we are using the ‘top’ agent). If we know the agent id, the sub-search (starts with left bracket '[') can be removed. In reality, a sub-search will usually be a performance hit and can be avoided by restructuring almost any search.
| tstats count WHERE (index="main" earliest="-24h" host="https://faq.arval.it"
[ | tstats count WHERE (index="main" earliest="-24h" host="https://faq.arval.it") BY crm.id | top limit=1 crm.id| rename count as c | rename percent as p | fields - c p | format]
) BY _time logLevel crm.id mb.className mb.functionName message span=1s
| eval class='mb.className' . "-" . 'mb.functionName' | search class="*" | table _time logLevel crm.id class message2.5. How to identify, at a high level, the major components in use by the customer?
index="main" earliest="8/3/2020:06:00:00" latest="8/3/2020:06:30:00" source="mediabar"
| eval crmcust='crm.customer' | eval agent='crm.id' | eval class='mb.className' . "-" . 'mb.functionName' | search crmcust="*" agent="*" class="*"
| stats values(class) as lc, count(class) as cc by crmcust, agent | where ((crmcust="veritas" AND cc > 1850) OR (crmcust="chewy" AND cc > 400) OR (crmcust="arval" AND cc > 1200))Currently, the majority of searches are centered around component names, ‘mb.className' and 'mb.functionName’, and string matching.For example, at a quick glance simply of a component, it can be easily determined if an agent is getting screen pops from Harmony or another way. |
2.5.a. Component names by version
Logging design is still undergoing changes, so the component names can vary by version.
| tstats count WHERE (index="main" earliest="8/17/2020:08:00:00" latest="8/18/2020:08:00:00" source="mediabar") BY mb.version mb.className mb.functionName
| eval major=mvindex(split('mb.version', "."), 0) | eval class='mb.className' . "-" . 'mb.functionName' | search class="*"
| stats values(class) as lc, count(class) as cc by major3. Popflow Events
3.1. How to identify customer/agent using Popflow and how they are using it, aka Popflow Overview?
| tstats count WHERE (index="main" earliest="8/17/2020:08:00:00" latest="8/18/2020:00:00:00" "mb.className"=PopflowRuntimeService ((message="*Event '*' *ed") OR (message="*Activity complete*") OR (message="*Starting Activity*") OR (message="*Activity event*") OR (message="*Got*popflow*" AND message!="*Got* 0*") OR (message="*Getting*"))) BY _time logLevel crm.customer crm.instanceId crm.groupId crm.id mb.className mb.functionName message span=1s
| rex field=message "^(?<mytitle>[^{\n]*)(?P<myjson>{.*})"
| eval jsonctx=substr(myjson, 1, 40), msgctx=substr(message, 1, 40)
| eval class='mb.className' . "-" . 'mb.functionName', crmgroup='crm.instanceId' . "-" . 'crm.groupId'
| search crmgroup="*" class="*"
| table _time logLevel crm.customer crmgroup crm.id class mytitle jsonctx msgctxExplanation:
a) Why the use of: ‘| search crmgroup="" class=”*”’ clause and all the string matching?
i) As described previously on this page, we are still dependent on string matching and class names. Writing fixed data points or metrics will be a better interface.
ii) The field ‘msgctx’ is present for context and would be used in the case where we are not filtering out ‘mb.className’. You see we are trying to populate ‘mytitle’ and ‘jsonctx’ fields and in the case they are blank might mean there is a message that I am not expecting so the parsing isn’t working on it. Finally, collapsing 2 fields down to 1 is simply for saving space so I can still see the ‘message’ field without scrolling.
b) One of the most important statements in this query is the use of regular expressions (pattern matching):
| rex field=message "^(?<mytitle>[^{\n]*)(?P<myjson>{.*})"
there is a page dedicated to tools for pattern match and JSON manipulation for Splunk, keep checking back for updates.
From here, we are going to keep building upon the Popflow Overview, extract some new information, until we have a fully populated breakdown of the events.
There are workflows authored to act off events and "event detected" messages, which can have a corresponding action to fetch a workflow as "getting popflow for eventId" messages, followed by a "got popflow" message which loads workflow and starts to run activities of different types and tracks "starting activity" and "activity complete" messages.
3.2. What Popflow Events are Being Triggered and are the Most Frequent?
index="main" earliest="8/13/2020:08:00:00" latest="8/15/2020:02:32:54" host="https://lanebryant.custhelp.com" mb.className="PopflowRuntimeService" (message="*Event '*' *ed")
| eval evname=mvindex(split(message, "'"), 1)
| rex field=message "^(?<mytitle>[^{\n]*)(?P<myjson>{.*})"
| eval contextyn=if(isnotnull(myjson), 1, 0)
| table _time host crm.id evname | stats count(evname) as cntevname by evname | sort cntevname DESC | evname cntevname
On Answer 1187
Open Create Incident 620
Check Order/Billing 608
Show Open Incidents 607
Populate Custom Object 578
Wismo 140 |
3.3. What Popflow Activities are Being Run?
In overall product usage tracking, I like to track workflows being run and the number of instructions (aka Activities) as an overall indicator of scale and volume. But let’s start with an Activity overview in a log-format style.
index="main" earliest="8/14/2020:08:00:00" latest="8/16/2020:00:00:00" "mb.className"=PopflowRuntimeService host="https://lanebryant.custhelp.com" ((message="*Starting Activity*"))
| rex field=message "^(?<mytitle>[^{\n]*)(?P<myjson>{.*})"
| eval jsonctx = if(myjson!="null", substr('myjson', 1, 60), substr('custom.formData.content', 1, 60)), newmsg=if(isnotnull(mytitle), 'mytitle', 'message'), activityname=mvindex(split(message, "'"), 1)
| rex field=newmsg "(([[](INFO|DEBUG|TRACE)[]][[:blank:]])?)(?<msghdr>[^\n]*)"
| table _time logLevel crm.customer crm.id activityname msghdr jsonctx |
From Fields Panel, click on ‘custom.displayName’ for Top 10 Values |
3.4. Start Normalizing the Data, Put Events, Popflow Scripts, and Activities All Together in Context
index="main" earliest="8/17/2020:08:00:00" latest="8/18/2020:00:00:00" "mb.className"=PopflowRuntimeService crm.customer="helenoftroy" ((message="Got 1 popflow(s) from server") OR (message="[*] Got 1 popflow(s) from server") OR (message="Got 1 popflow(s) from cache") OR (message="Getting popflow*") OR (message="[*] Getting popflow*") OR (message="Event '*' detected") OR (message="[*] Event '*' detected") OR (message="Activity complete*") OR (message="[*] Activity complete*") OR (message="Starting Activity*") OR (message="[*] Starting Activity*") OR (message="*Activity event*") )
| eval const_actstart_pattern="\bStarting Activity\b", const_actcompl_pattern="\bActivity complete\b", enum_eventtype_activity=1
| rex field=message "(([[](INFO|DEBUG|ERROR|EXCEPTION|TRACE|WARN)[]][[:blank:]])?)(?<msghdr>[^{\n]*)((?P<myjson>{.*})?)"
| eval jsonctx=substr(myjson, 1, 80), msgctx=substr(message, 1, 80), s1=mvindex(split(msghdr, "'"), 1)
| eval s1=if(isnull(s1) AND 'mb.className'=="PopflowRuntimeService", 'msghdr','s1')
| eval evttype=case('mb.className' == "PopflowRuntimeService" AND match(msghdr, 'const_actstart_pattern'), 'enum_eventtype_activity', 'mb.className' == "PopflowRuntimeService" AND match(msghdr, 'const_actcompl_pattern'), 'enum_eventtype_activity')
| eval pfactvid=case('evttype' == 'enum_eventtype_activity' and isnotnull(myjson), spath(myjson,"typeId")), formdatactx=case('evttype' == 'enum_eventtype_activity' and isnotnull(myjson), spath(myjson,"formData"))
| rex field=message "((Getting[[:blank:]]popflow[[:blank:]]from[[:blank:]]server([.]{3})[[:blank:]]eventId:[[:blank:]]){1})(?<pfevid>[^\n][0-9]*)"
| table _time crm.customer crm.id evttype s1 pfevid pfevname pfactvid pfactvname formdatactx msgtype msghdr msgctx jsonctx
| lookup pfactivitytype.csv activityevent as pfactvid OUTPUT activityname as pfactvname
| lookup pfeventtypesCSV.csv pfeventid as pfevid OUTPUT pfeventname as pfevname
| table _time crm.customer crm.id evttype s1 pfevid pfevname pfactvid pfactvname formdatactx msgtype msghdr msgctx jsonctx
| eval msgtype=case(match(msgctx, "\bGetting popflow from server\b"), "Getting popflow from server", match(msgctx, "\bEvent '.*' detected\b"), "Event detected", match(msgctx, "\bStarting Activity\b"), "Starting Activity", match(msgctx, "\bActivity complete\b"), "Activity complete", match(msgctx, "\bGot 1 popflow\(s\) from server\b"), "Got 1 from server", match(msgctx, "\bGot 1 popflow\(s\) from cache\b"), "Got 1 from cache", match(msgctx, "\bActivity event\b"), "Activity event")
| eval s1=if(isnull(s1), 'msgtype', 's1')
| table _time logLevel crm.customer crm.id msgtype evttype s1 pfevid pfevname pfactvid pfactvname formdatactx msgtype msghdr msgctx jsonctx |
What did we add over the previous queries? a) 2 or 3 ‘rex’ commands were all handled now in one ‘rex’ command. b) we extracted ‘eventId’ by string parsing of the ‘message’ field and extracted ‘typeid’ (activity type id) from JSON and then used a lookup table to translate them to friendly names. c) multiple ‘eval' commands got moved to a single pipe as there is overhead for each pipe d) there is no single normalized field which is common to all event types (which makes it difficult to manipulate and combine the data later) so we added ‘msgtype’ e) the search patterns on the ‘message’ field in the very first segment of the search, when Splunk finds a match in a pipe it stops processing the rest so I made search patterns more explicit and ordered them by frequency of occurrences so there is a higher chance Splunk will find a match and do less processing. note: the technique for finding frequency of occurrences of the ‘message’ field was the same as we’ve used on this page, which goes something like … '<your search> | stats count(msghdr) as cntmsghdr by msghdr' | sort cntmsghdr DESC |
4. Omis Events
4.1. How to identify customer/agent using HIS/Harmony stack and how are they using it, aka Omis Overview?
| tstats count WHERE (index="main" earliest="8/17/2020:08:00:00" latest="8/17/2020:20:00:00" "mb.className"=OmisService) BY _time logLevel crm.customer crm.id message span=1s
| rex field=message "(([[](INFO|DEBUG|ERROR|EXCEPTION|TRACE|WARN)[]][[:blank:]])?)(?<msghdr>[^{\n]*)((?P<myjson>{.*})?)"
| eval mt=spath(myjson, "messageType"), ev=spath(myjson, "event"), rq=spath(myjson,"request"), rs=spath(myjson,"response")
| search mt="*" OR ev="*" OR rq="*" OR rs="*" OR crm.id="*"
| eval rr=coalesce(rq, rs), rrctx=substr(rr,1,60), evctx=substr(ev,1,60), jsonctx=substr(myjson,1,60), msgctx=substr(message,1,60)
| eval context=coalesce(evctx, rrctx, jsonctx, msgctx)
| fields - rq rs rrctx evctx jsonctx msgctx
| table _time logLevel crm.customer crm.id mt msghdr context |
4.2. What are all the possible Omis message types and how do I work with them?
4.3. How do I check if there are any Omis message types I don't know about?
Previously on this page, it was stated that if there is a long evaluation or conditional command (for example string match), Splunk would grab the first match and stop processing. Thus, it would reduce processing and improve performance in theory if the search matches are ordered in the frequency of occurrence.
While leveraging that concept, there wasn’t an immediate obvious performance impact but the side effect was a search command which verifies that your query is structured so that it processes every message type and if one is not known certain fields would be null. You could use a similar concept to uniquely identify every Omis ERROR across every CTI platform and customer, well possibly.
| tstats count WHERE (index="main"earliest="8/14/2020:08:00:00" latest="8/16/2020:00:00:00" "mb.className"=OmisService ((message="Bind resource processor response{*") OR (message="[*] Bind resource processor response{*") OR (message="Bind processor serivce response:{*") OR (message="[*] Bind processor serivce response:{*") OR (message="Bind resource workflow response: {*") OR (message="[*] Bind resource workflow response: {*") OR (message="MediaBar Connected to the HIS server {*") OR (message="[*] MediaBar Connected to the HIS server {*") OR (message="***json:-{*") OR (message="JSON Response:- {*") OR (message="Start Session Complete:{*") OR (message="[*] Start Session Complete:{*") OR (message="json:-{*") OR (message="[*] json:-{*") OR (message="[*] 'Get processor service complete' for HIS:{*") OR
(message="Get processor service complete:{*") OR (message="Processing request: {*") OR
(message="[*] Processing request: {*") OR (message="Session start response: {*") OR (message="[*] Session start response: {*") OR (message="SessionLogout {*") OR (message="[*] SessionLogout {*") OR (message="[*] {*") OR (message="Available for suggest HIS server{*") OR (message="[*] Available for suggest HIS server{*") OR
(message="endSession for available HIS server {*") OR (message="[*] endSession for available HIS server {*") OR (message="get the response for External Disposition {*") OR
(message="[*] get the response for External Disposition {*") OR (message="get the response for External Logout Reasons {*") OR (message="[*] get the response for External Logout Reasons {*") OR (message="get the response for External Not Ready Reasons {*") OR
(message="[*] get the response for External Not Ready Reasons {*") OR (message="MediaBar Connected to the HIS server {*") OR (message="[*] MediaBar Connected to the HIS server {*") OR (message="Response from \"*\" json:-{*") OR (message="[*] Response from \"*\" json:-{*") ) ) BY _time host crm.id message
| eval sfmt="^(?<msghdr>([[](INFO|DEBUG|ERROR|EXCEPTION|TRACE|WARN)[]][[:blank:]]){%s})((%s)[^{\n]*({))"
| eval sfmt2="^(?<msghdr>([[](INFO|DEBUG|ERROR|EXCEPTION|TRACE|WARN)[]][[:blank:]]){%s})((%s)[^{\n]*(%s{))"
| eval p1="Bind resource processor response", pat1=printf('sfmt', "0", 'p1'), pat2=printf('sfmt', "1", 'p1'), p3="Bind processor serivce response:", pat3=printf('sfmt', "0", 'p3'), pat4=printf('sfmt', "1", 'p3'), p5="Bind resource workflow response:", pat5=printf('sfmt', "0", 'p5'), pat6=printf('sfmt', "1", 'p5'), p7="MediaBar Connected to the HIS server", pat7=printf('sfmt', "0", 'p7'), pat8=printf('sfmt', "1", 'p7'), p9="[*]{3}", p9a="json:-", pat9=printf('sfmt2', "0", 'p9', 'p9a'), pat10=printf('sfmt2', "1", 'p9', 'p9a'), p11="JSON Response:-", pat11=printf('sfmt', "0", 'p11'), pat12=printf('sfmt', "1", 'p11'), p13="Start Session Complete:", pat13=printf('sfmt', "0", 'p13'), pat14=printf('sfmt', "1", 'p13'), p15="", p15a="json:-", pat15=printf('sfmt2', "0", 'p15', 'p15a'), pat16=printf('sfmt2', "1", 'p15', 'p15a'), p17="Get processor service complete:", pat17=printf('sfmt', "0", 'p17'), pat18=printf('sfmt', "1", 'p17'), p19="Processing request:", pat19=printf('sfmt', "0", 'p19'), pat20=printf('sfmt', "1", 'p19'), p21="Session start response:", pat21=printf('sfmt', "0", 'p21'), pat22=printf('sfmt', "1", 'p21'), p23="SessionLogout", pat23=printf('sfmt', "0", 'p23'), pat24=printf('sfmt', "1", 'p23'), p25="", pat25=printf('sfmt', "0", 'p25'), pat26=printf('sfmt', "1", 'p25'), p27="Available for suggest HIS server", pat27=printf('sfmt', "0", 'p27'), pat28=printf('sfmt', "1", 'p27'), p29="endSession for available HIS server", pat29=printf('sfmt', "0", 'p29'), pat30=printf('sfmt', "1", 'p29'), p31="get the response for External Disposition", pat31=printf('sfmt', "0", 'p31'), pat32=printf('sfmt', "1", 'p31'), p33="get the response for External Logout Reasons", pat33=printf('sfmt', "0", 'p33'), pat34=printf('sfmt', "1", 'p33'), p35="get the response for Not Ready Reasons", pat35=printf('sfmt', "0", 'p35'), pat36=printf('sfmt', "1", 'p35'), p37="Response from", p37a="json:-", pat37=printf('sfmt2', "0", 'p37', 'p37a'), pat38=printf('sfmt2', "1", 'p37', 'p37a')
| eval x=case(
match(message, 'pat1'), 'p1',
match(message, 'pat2'), 'p1'."_loglevel",
match(message, 'pat3'), 'p3',
match(message, 'pat4'), 'p3'."_loglevel",
match(message, 'pat5'), 'p5',
match(message, 'pat6'), 'p5'."_loglevel",
match(message, 'pat7'), 'p7',
match(message, 'pat8'), 'p7'."_loglevel",
match(message, 'pat9'), 'p9'.'p9a',
match(message, 'pat10'), 'p9'.'p9a'."_loglevel",
match(message, 'pat11'), 'p11',
match(message, 'pat12'), 'p11'."_loglevel",
match(message, 'pat13'), 'p13',
match(message, 'pat14'), 'p13'."_loglevel",
match(message, 'pat15'), 'p15'.'p15a',
match(message, 'pat16'), 'p15'.'p15a'."_loglevel",
match(message, 'pat17'), 'p17',
match(message, 'pat18'), 'p17'."_loglevel",
match(message, 'pat19'), 'p19',
match(message, 'pat20'), 'p19'."_loglevel",
match(message, 'pat21'), 'p21',
match(message, 'pat22'), 'p21'."_loglevel",
match(message, 'pat23'), 'p23',
match(message, 'pat24'), 'p23'."_loglevel",
match(message, 'pat25'), "nomsghdr",
match(message, 'pat26'), "nomsghdr_loglevel",
match(message, 'pat27'), 'p27',
match(message, 'pat28'), 'p27'."_loglevel",
match(message, 'pat29'), 'p29',
match(message, 'pat30'), 'p29'."_loglevel",
match(message, 'pat31'), 'p31',
match(message, 'pat32'), 'p31'."_loglevel",
match(message, 'pat33'), 'p33',
match(message, 'pat34'), 'p33'."_loglevel",
match(message, 'pat35'), 'p35',
match(message, 'pat36'), 'p35'."_loglevel",
match(message, 'pat37'), 'p37',
match(message, 'pat38'), 'p37'."_loglevel"
)
| table _time host crm.id x pat9 pat10 message count
| stats count(x) as countx by x | sort countx DESC |