opensextant.FlexPat

  1# -*- coding: utf-8 -*-
  2import os
  3import re
  4
  5from opensextant import TextMatch, Extractor, reduce_matches
  6
  7
  8def resource_for(resource_name):
  9    """
 10
 11    :param resource_name: name of a file in your resource path; Default: opensextant/resources/NAME
 12    :return: file path.
 13    """
 14    import opensextant
 15    libdir = os.path.dirname(opensextant.__file__)
 16    container = os.path.join(libdir, "resources")
 17    fpath = os.path.join(container, resource_name)
 18    if os.path.exists(fpath):
 19        return fpath
 20    else:
 21        raise Exception("FileNotFound: Resource not found where expected at " + fpath)
 22
 23
 24def class_for(full_classname, instantiate=True):
 25    """
 26
 27    :param full_classname:
 28    :param instantiate: True if you wish the found class return an instance.
 29    :return: Class obj or Class instance.
 30    """
 31    from importlib import import_module
 32    segments = full_classname.split('.')
 33    clsname = segments[-1]
 34    modname = '.'.join(segments[:-1])
 35    mod = import_module(modname)
 36    clz = getattr(mod, clsname)
 37    if not clz:
 38        raise Exception("Class not found for " + full_classname)
 39    if instantiate:
 40        return clz()
 41    else:
 42        return clz
 43
 44
 45class RegexPattern:
 46    def __init__(self, fam, pid, desc):
 47        self.family = fam
 48        self.id = pid
 49        self.description = desc
 50        # Pattern
 51        self.regex = None
 52        # Ordered group-name list with slots in pattern.
 53        self.regex_groups = []
 54        self.version = None
 55        self.enabled = False
 56        self.match_classname = None
 57        self.match_class = None
 58
 59    def __str__(self):
 60        return "{}, Pattern: {}".format(self.id, self.regex)
 61
 62
 63class PatternMatch(TextMatch):
 64    """
 65    A general Pattern-based TextMatch.
 66    This Python variation consolidates PoliMatch (patterns-of-life = poli) ideas in the Java API.
 67    """
 68
 69    UPPER_CASE = 1
 70    LOWER_CASE = 2
 71    FOUND_CASE = 0
 72
 73    def __init__(self, *args, pattern_id=None, label=None, match_groups=None):
 74        TextMatch.__init__(self, *args, label=label)
 75        # Normalized text match is NONE until normalize() is run.
 76        self.textnorm = None
 77        self.pattern_id = pattern_id
 78        self.case = PatternMatch.FOUND_CASE
 79        self.match_groups = match_groups
 80        self.variant_id = None
 81        self.is_valid = True
 82        self.confidence = -1
 83        # PERFORMANCE flag:  omit = True to never return the match value.
 84        #          It could be filtered out and returned.  But omit means we never see it.
 85        self.omit = False
 86
 87        # Optionally -- back fill as much surrounding text as you want for
 88        # normalizer/validator routines. Use pre_text, post_text
 89        self.pre_text = None
 90        self.post_text = None
 91        if self.pattern_id and "-" in self.pattern_id:
 92            self.variant_id = self.pattern_id.split("-", 1)[1]
 93
 94    def __str__(self):
 95        return f"({self.label}) {self.text}"
 96
 97    def copy_attrs(self, arr):
 98        """
 99        Default copy of match group slots.  Does not work for every situation.
100        :param arr:
101        :return:
102        """
103        for k in arr:
104            val = self.get_value(k)
105            if val:
106                self.attrs[k] = val
107
108    def add_surrounding_text(self, text, text_len, length=16):
109        """
110        Given this match's span and the text it was derived from,
111        populate pre_text, post_text with some # of chars specified by length.
112
113        :param text: The text in which this match was found.
114        :param text_len: the length of the text buffer.  (avoid repeating len(text))
115        :param length:  the pre/post text length to attach.
116        :return:
117        """
118        if self.start > 0:
119            x1 = self.start - length
120            if x1 < 0:
121                x1 = 0
122            self.pre_text = text[x1:self.start]
123        if self.end > 0:
124            x1 = self.end + length
125            if x1 > text_len:
126                x1 = text_len
127            self.post_text = text[self.end:x1]
128
129    def attributes(self):
130        """
131        Render domain details to meaningful exported view of the data.
132        :return:
133        """
134        default_attrs = {"method": self.pattern_id}
135        for (k, v, x1, x2) in self.match_groups:
136            default_attrs[k] = v
137        return default_attrs
138
139    def normalize(self):
140        if not self.text:
141            return
142
143        self.textnorm = self.text.strip()
144        if self.case == PatternMatch.UPPER_CASE:
145            self.textnorm = self.textnorm.upper()
146        elif self.case == PatternMatch.LOWER_CASE:
147            self.textnorm = self.textnorm.lower()
148
149    def get_value(self, k):
150        """
151        Get Slot value -- returns first one.
152        :param k:
153        :return:
154        """
155        grp = get_slot(self.match_groups, k)
156        if grp:
157            # tuple is group_name, value, start, end. Return value:
158            return grp[1]
159        return None
160
161
def get_slot(grps, k):
    """
    Find the first match group whose name equals k.

    :param grps: iterable of 4-item groups: (name, value, start, end).
    :param k: group name to look for.
    :return: the matching group exactly as stored, or None when nothing matches.
    """
    # Each group is unpacked into four names, so a malformed group reached
    # during the scan still raises ValueError.
    return next(
        (slot for slot in grps for (name, _value, _start, _end) in (slot,) if name == k),
        None,
    )
174
175
class PatternTestCase:
    """
    One test entry for a pattern family: an id, the family name and the text to test.
    """

    def __init__(self, tid, family, text):
        """
        :param tid: test identifier.
        :param family: pattern family the test belongs to.
        :param text: text the test is run against.
        """
        self.id, self.family, self.text = tid, family, text
        # Every test case starts out flagged as a true positive.
        self.true_positive = True
182
183
def get_config_file(cfg, modfile):
    """
    Find a file stored in the same folder as a python module,
    e.g., get_config_file("file.cfg", __file__)

    :param cfg: file name to look for.
    :param modfile: path of the module whose folder is searched.
    :return: path of the file inside that folder.
    :raise FileNotFoundError: if the file is not there.
    """
    module_dir = os.path.dirname(os.path.abspath(modfile))
    candidate = os.path.join(module_dir, cfg)
    if not os.path.exists(candidate):
        raise FileNotFoundError("No such file {} at {}".format(cfg, candidate))
    return candidate
196
197
class RegexPatternManager:
    """
    RegexPatternManager is the patterns configuration file parser.
    See documentation: https://opensextant.github.io/Xponents/doc/Patterns.md

    The configuration is read line by line.  Recognized directives are
    ``#DEFINE`` (named regex fragment), ``#RULE`` (pattern made of ``<name>``
    slots and regex text), ``#TEST`` (only kept when ``testing`` is on) and
    ``#CLASS`` (match class name per family).  Any other line is ignored.
    """

    def __init__(self, patterns_cfg, module_file=None, debug=False, testing=False):
        """
        :param patterns_cfg: path or name of the patterns configuration file.
        :param module_file: if given, patterns_cfg is resolved relative to this module's folder.
        :param debug: collect and print pattern setup messages.
        :param testing: keep the #TEST entries found in the configuration.
        :raise FileNotFoundError: if the configuration file cannot be located.
        """
        self.families = set()
        self.patterns = {}
        self.patterns_file = patterns_cfg
        if module_file:
            # Resolve this absolute path now.
            self.patterns_file = get_config_file(patterns_cfg, module_file)

        # Set by _initialize() to the path the configuration was actually read from.
        self.patterns_file_path = None
        self.test_cases = []
        self.matcher_classes = {}
        # CUSTOM: For mapping Python and Java classes internally.  Experimental.
        self.match_class_registry = {}
        self.testing = testing
        self.debug = debug
        self._initialize()

    def get_pattern(self, pid):
        """
        :param pid: pattern id.
        :return: the pattern registered under pid, or None.
        """
        return self.patterns.get(pid)

    def create_pattern(self, fam, rule, desc):
        """ Override pattern class creation as needed.
        """
        return RegexPattern(fam, "{}-{}".format(fam, rule), desc)

    def create_testcase(self, tid, fam, text):
        """ Override test case creation as needed.
        """
        return PatternTestCase(tid, fam, text)

    def validate_pattern(self, repat):
        """Default validation is True
        Override this if necessary, e.g., pattern implementation has additional metadata
        """
        return repat is not None

    def enable_all(self):
        """ Enable every registered pattern. """
        for pat in self.patterns.values():
            pat.enabled = True

    def disable_all(self):
        """ Disable every registered pattern. """
        for pat in self.patterns.values():
            pat.enabled = False

    def set_enabled(self, some: str, flag: bool):
        """
        set family enabled or not
        :param some: prefix of a family or family-variant
        :param flag: bool setting
        :return:
        """
        for pat in self.patterns.values():
            if pat.id.startswith(some):
                pat.enabled = flag

    def _initialize(self):
        """
        Parse the configuration file and compile one regex per #RULE.

        :raise Exception if item not found.
        :return:
        """
        self.patterns = {}

        # the  # RULE statements as name and a sequence of DEFINES and regex bits
        defines = {}
        rules = {}
        # Preserve order
        rule_order = []
        # Record pattern setup and validation messages
        configMessages = []

        config_fpath = self.patterns_file
        if not os.path.exists(self.patterns_file):
            config_fpath = resource_for(self.patterns_file)

        # By now we have tried the given file path, inferred a path local to the calling module
        # and lastly tried a resource folder in opensextant/resource/ data.
        if not os.path.exists(config_fpath):
            raise FileNotFoundError("Tried various absolute and inferred paths for the file '{}'".format(
                os.path.basename(self.patterns_file)))

        # Remember where the configuration really came from.
        self.patterns_file_path = config_fpath

        # PY3:
        with open(config_fpath, "r", encoding="UTF-8") as fh:
            testcount = 0
            for line in fh:
                stmt = line.strip()
                # NOTE: #DEFINE and #RULE are recognized only at the very start of the raw
                # line, while #TEST and #CLASS are checked on the stripped line.
                if line.startswith("#DEFINE"):
                    # #DEFINE<tab><defineName><tab><definePattern>
                    # maxsplit is passed by keyword; positional use is deprecated.
                    fields = re.split("[\t ]+", stmt, maxsplit=2)
                    defines[fields[1]] = fields[2]
                elif line.startswith("#RULE"):
                    # #RULE<tab><rule_fam><tab><rule_id><tab><pattern>
                    fields = re.split("[\t ]+", stmt, maxsplit=3)

                    fam = fields[1]
                    ruleEnum = fields[2]
                    rulePattern = fields[3]
                    ruleKey = fam + "-" + ruleEnum

                    # if already a rule by that name, error
                    if ruleKey in rules:
                        raise Exception("FlexPat Config Error - Duplicate rule name " + ruleEnum)

                    rules[ruleKey] = rulePattern
                    rule_order.append(ruleKey)
                elif self.testing and stmt.startswith("#TEST"):
                    fields = re.split("[\t ]+", stmt, maxsplit=3)
                    testcount += 1

                    fam = fields[1]
                    ruleEnum = fields[2]
                    testtext = fields[3].strip().replace("$NL", "\n")
                    ruleKey = fam + "-" + ruleEnum

                    # testcount is a count of all tests, not just test within a rule family
                    testKey = "{}#{}".format(ruleKey, testcount)
                    self.test_cases.append(self.create_testcase(testKey, fam, testtext))
                elif stmt.startswith("#CLASS"):
                    fields = re.split("[\t ]+", stmt, maxsplit=2)
                    fam = fields[1]
                    self.matcher_classes[fam] = fields[2]

        elementRegex = "<[a-zA-Z0-9_]+>"
        elementPattern = re.compile(elementRegex)

        for tmpkey in rule_order:
            tmpRulePattern = rules.get(tmpkey)
            fam, rule_name = tmpkey.split("-", 1)
            self.families.add(fam)

            pat = self.create_pattern(fam, rule_name, "No Description yet...")
            if fam in self.matcher_classes:
                # Best effort: a class that cannot be loaded is reported and the pattern
                # is still registered without a match class.
                try:
                    pat.match_classname = self.matcher_classes.get(fam)
                    if pat.match_classname in self.match_class_registry:
                        # rename. Map Java class to a Python class.
                        pat.match_classname = self.match_class_registry[pat.match_classname]

                    # Do not instantiate, just find the class named in config file.
                    pat.match_class = class_for(pat.match_classname, instantiate=False)
                except Exception as err:
                    print(err)

            # find all of the element definitions within the pattern
            for groupNum, m in enumerate(elementPattern.finditer(tmpRulePattern), start=1):
                elementName = tmpRulePattern[m.start() + 1: m.end() - 1]
                pat.regex_groups.append(elementName)

                if self.debug:
                    subelementPattern = defines.get(elementName)
                    configMessages.append("\n\t")
                    configMessages.append("{} {} = {}".format(groupNum, elementName, subelementPattern))

            # Substitute each distinct slot once, in order of first appearance, so the
            # outcome (and which undefined slot is reported) is the same on every run.
            for slot_name in dict.fromkeys(pat.regex_groups):
                if slot_name not in defines:
                    raise Exception("Slot definition is not DEFINED for " + slot_name)

                tmpDef = defines[slot_name]
                # NOTE:  Use of parens, "(expr)", is required to create groups within a pattern.
                tmpDefPattern = "({})".format(tmpDef)
                tmpDefSlot = "<{}>".format(slot_name)
                # Replaces all.
                tmpRulePattern = tmpRulePattern.replace(tmpDefSlot, tmpDefPattern)

            if self.debug:
                configMessages.append("\nrulepattern=" + tmpRulePattern)

            pat.regex = re.compile(tmpRulePattern, re.IGNORECASE)
            pat.enabled = True
            self.patterns[pat.id] = pat
            if not self.validate_pattern(pat):
                raise Exception("Invalid Pattern " + str(pat))

        if self.debug:
            configMessages.append("\nFound # of PATTERNS={}".format(len(self.patterns)))
            # Emit the collected setup messages; they were previously gathered and dropped.
            print("".join(configMessages))
386
387
388def _digest_sub_groups(m, pattern_groups):
389    """
390    Reorganize regex groups internally.
391    :param pattern_groups: ordered list of groups as they appear in RE
392    :return: array only found item tuples:  (group, value, start, end)
393    """
394    count = 0
395    slots = []
396    glen = len(pattern_groups)
397    for found in m.groups():
398        if count > glen:
399            raise Exception("Unexpected -- more slots found than groups in pattern.")
400        slot_name = pattern_groups[count]
401        slot = (slot_name, found, m.start(count + 1), m.end(count + 1))
402        slots.append(slot)
403        count += 1
404
405    return slots
406
407
class PatternExtractor(Extractor):
    """
        Discussion: Read first https://opensextant.github.io/Xponents/doc/Patterns.md

        Example:
        ```
        from opensextant.extractors.poli import PatternsOfLifeManager
        from opensextant.FlexPat import PatternExtractor

        # INIT
        #=====================
        # Invoke a particular REGEX rule set, here poli_patterns.cfg
        # @see https://github.com/OpenSextant/Xponents/blob/master/Core/src/main/resources/poli_patterns.cfg
        mgr = PatternsOfLifeManager("poli_patterns.cfg")
        pex = PatternExtractor(mgr)

        # DEV/TEST
        #=====================
        # "default_test()" is useful to run during development and
        # encourages you to capture critical pattern variants in your "TEST" data.
        # Look at your pass/fail situations -- what test cases are failing your rule?
        test_results = pex.default_tests()
        print("TEST RESULTS")
        for result in test_results:
            print(repr(result))

        # RUN
        #=====================
        real_results = pex.extract(".... text blob 1-800-123-4567...")
        print("REAL RESULTS")
        for result in real_results:
            print(repr(result))
            print("\tRAW DICT:", render_match(result))
        ```
    """

    def __init__(self, pattern_manager):
        """
        invoke RegexPatternManager(your_cfg_file) or implement a custom RegexPatternManager (rare).
        NOTE - `PatternsOfLifeManager` is a  particular subclass of RegexPatternManager because
        it is manipulating the input patterns config file which is shared with the Java demo.
        The `CLASS` names unfortunately are specific to Python or Java.

        :param pattern_manager: RegexPatternManager
        """
        Extractor.__init__(self)
        self.id = "xpx"
        self.name = "Xponents Pattern Extractor"
        self.pattern_manager = pattern_manager

    def extract(self, text, **kwargs):
        """ Default Extractor API. """
        return self.extract_patterns(text, **kwargs)

    def extract_patterns(self, text, **kwargs):
        """
        Given some text input, apply all relevant pattern families against the text.
        Surrounding text is added to each match for post-processing.

        :param text: text to search.
        :param kwargs: optional ``features``: iterable of family names to apply;
            when missing or empty, every family known to the pattern manager is used.
        :return: list of matches; duplicates and sub-matches are kept but flagged ``filtered_out``.
        :raise Exception: if a requested family is not known to the pattern manager.
        """
        features = kwargs.get("features")
        if not features:
            features = self.pattern_manager.families

        tlen = len(text)
        results = []
        for fam in features:
            if fam not in self.pattern_manager.families:
                raise Exception("Unknown Pattern Family " + fam)

            for pat in self.pattern_manager.patterns.values():
                if pat.family != fam or not pat.enabled:
                    continue

                for m in pat.regex.finditer(text):
                    digested_groups = _digest_sub_groups(m, pat.regex_groups)
                    if pat.match_class:
                        domainObj = pat.match_class(m.group(), m.start(), m.end(),
                                                    pattern_id=pat.id,
                                                    label=pat.family,
                                                    match_groups=digested_groups)
                        # surrounding text may be used by normalization and validation
                        domainObj.add_surrounding_text(text, tlen, length=20)
                        domainObj.normalize()
                        # A match class may set omit during normalize() to drop the match entirely.
                        if not domainObj.omit:
                            results.append(domainObj)
                    else:
                        # Generic matches are returned as found; normalize() is not run on them.
                        genericObj = PatternMatch(m.group(), m.start(), m.end(),
                                                  pattern_id=pat.id,
                                                  label=pat.family,
                                                  match_groups=digested_groups)
                        genericObj.add_surrounding_text(text, tlen, length=20)
                        results.append(genericObj)

        # Determine if any matches are redundant.  Mark redundancies as "filtered out".
        reduce_matches(results)
        for r in results:
            if r.is_duplicate or r.is_submatch:
                r.filtered_out = True

        return results

    def default_tests(self, scope="rule"):
        """
        Default Tests run all TEST cases for each RULE in patterns config.
        TESTs marked with a 'FAIL' comment are intended to return 0 matches or only matches that are filtered out.
        Otherwise a TEST is intended to return 1 or more matches.

        By default, this runs each test and observes only results that were triggered by that rule being tested.
        If scope is "ruleset" then any results from any rule will be allowed.
        "rule" scope is much better for detailed rule development as it tells you if your rule tests are testing the
        right thing.

        Runs the default tests on the provided configuration. Plenty of debug printed to screen.
        But returns the test results as an array, e.g., to write to CSV for review.
        This uses PatternExtractor.extract_patterns() to avoid any collision with the generic use
        of  Extractor.extract() parent method.
        :param scope: rule or ruleset.  Rule scope means only results for rule test case are evaluated.
                 ruleset scope means that all results for a test are evaluated.
        :return: test results array; Each result represents a TEST case run against a RULE
        """
        test_results = []
        for t in self.pattern_manager.test_cases:
            expect_valid_match = "FAIL" not in t.text
            print("Test", t.family, t.text)
            output1 = self.extract_patterns(t.text, features=[t.family])

            # Test ids have the form "<rule key>#<n>".  Compare the whole rule key: a prefix
            # test would let rule "FAM-1" claim the tests written for rule "FAM-10".
            rule_key = t.id.rsplit("#", 1)[0]
            output = [m for m in output1 if scope != "rule" or m.pattern_id == rule_key]

            # Determine if pattern matched true positive or false positive.
            # To condition the TP or FP based on the matches
            #  keep a running tally of whether each match is filtered or not.
            # That is, for many matches True Positive = at least one unfiltered match is needed, AND was expected.
            #          for many matches False Positive = at least one unfiltered match is needed, AND was NOT expected.
            fpcount = 0
            tpcount = 0
            for m in output:
                allowed = not m.filtered_out or (m.is_duplicate and m.filtered_out)
                if expect_valid_match and allowed:
                    tpcount += 1
                if not expect_valid_match and allowed:
                    fpcount += 1

            tp = tpcount > 0 and expect_valid_match
            fp = fpcount > 0 and not expect_valid_match
            tn = fpcount == 0 and not expect_valid_match
            fn = tpcount == 0 and expect_valid_match
            success = (tp or tn) and not (fp or fn)
            test_results.append({"TEST": t.id,
                                 "TEXT": t.text,
                                 "MATCHES": output,
                                 "PASS": success})

        return test_results
571
572
def print_test(result: dict):
    """ Print one entry of the list returned by default_tests() on a single line.

    :param result: dict with keys TEST, TEXT, PASS and MATCHES; nothing is printed
        for an empty or missing result.
    """
    if not result:
        return

    tid, txt, res = result["TEST"], result["TEXT"], result["PASS"]
    found = result["MATCHES"]
    # Matched texts are joined with ";"; a placeholder stands in when there are none.
    matches = ";".join([match.text for match in found]) if found else "<None>"
    print(f"TEST: {tid}, TEXT: {txt} PASS:{res}\tMATCHES: {matches}")
def resource_for(resource_name):
 9def resource_for(resource_name):
10    """
11
12    :param resource_name: name of a file in your resource path; Default: opensextant/resources/NAME
13    :return: file path.
14    """
15    import opensextant
16    libdir = os.path.dirname(opensextant.__file__)
17    container = os.path.join(libdir, "resources")
18    fpath = os.path.join(container, resource_name)
19    if os.path.exists(fpath):
20        return fpath
21    else:
22        raise Exception("FileNotFound: Resource not found where expected at " + fpath)

:param resource_name: name of a file in your resource path; Default: opensextant/resources/NAME :return: file path.

def class_for(full_classname, instantiate=True):
def class_for(full_classname, instantiate=True):
    """
    Resolve a dotted class name such as ``package.module.ClassName`` to the
    class object, or to an instance created by calling it with no arguments.

    :param full_classname: dotted name; everything before the last dot is imported as the module.
    :param instantiate: True if you wish the found class return an instance.
    :return: Class obj or Class instance.
    :raise ValueError: if the name has no module portion (no dot).
    :raise ImportError: if the module portion cannot be imported.
    :raise AttributeError: if the module has no attribute with the class name.
    :raise Exception: if the attribute exists but is falsy.
    """
    from importlib import import_module

    modname, _, clsname = full_classname.rpartition(".")
    if not modname:
        # import_module("") would raise ValueError("Empty module name"); keep
        # the exception type but say what is actually wrong.
        raise ValueError("Class name must be fully qualified as module.ClassName: " + full_classname)

    mod = import_module(modname)
    try:
        clz = getattr(mod, clsname)
    except AttributeError as err:
        # getattr() raises before any "not found" check can run, so report
        # the missing class here with the full name for context.
        raise AttributeError("Class not found for " + full_classname) from err
    if not clz:
        raise Exception("Class not found for " + full_classname)

    return clz() if instantiate else clz

:param full_classname: :param instantiate: True if you wish the found class return an instance. :return: Class obj or Class instance.

class PatternMatch(opensextant.TextMatch):
 64class PatternMatch(TextMatch):
 65    """
 66    A general Pattern-based TextMatch.
 67    This Python variation consolidates PoliMatch (patterns-of-life = poli) ideas in the Java API.
 68    """
 69
 70    UPPER_CASE = 1
 71    LOWER_CASE = 2
 72    FOUND_CASE = 0
 73
 74    def __init__(self, *args, pattern_id=None, label=None, match_groups=None):
 75        TextMatch.__init__(self, *args, label=label)
 76        # Normalized text match is NONE until normalize() is run.
 77        self.textnorm = None
 78        self.pattern_id = pattern_id
 79        self.case = PatternMatch.FOUND_CASE
 80        self.match_groups = match_groups
 81        self.variant_id = None
 82        self.is_valid = True
 83        self.confidence = -1
 84        # PERFORMANCE flag:  omit = True to never return the match value.
 85        #          It could be filtered out and returned.  But omit means we never see it.
 86        self.omit = False
 87
 88        # Optionally -- back fill as much surrounding text as you want for
 89        # normalizer/validator routines. Use pre_text, post_text
 90        self.pre_text = None
 91        self.post_text = None
 92        if self.pattern_id and "-" in self.pattern_id:
 93            self.variant_id = self.pattern_id.split("-", 1)[1]
 94
 95    def __str__(self):
 96        return f"({self.label}) {self.text}"
 97
 98    def copy_attrs(self, arr):
 99        """
100        Default copy of match group slots.  Does not work for every situation.
101        :param arr:
102        :return:
103        """
104        for k in arr:
105            val = self.get_value(k)
106            if val:
107                self.attrs[k] = val
108
109    def add_surrounding_text(self, text, text_len, length=16):
110        """
111        Given this match's span and the text it was derived from,
112        populate pre_text, post_text with some # of chars specified by length.
113
114        :param text: The text in which this match was found.
115        :param text_len: the length of the text buffer.  (avoid repeating len(text))
116        :param length:  the pre/post text length to attach.
117        :return:
118        """
119        if self.start > 0:
120            x1 = self.start - length
121            if x1 < 0:
122                x1 = 0
123            self.pre_text = text[x1:self.start]
124        if self.end > 0:
125            x1 = self.end + length
126            if x1 > text_len:
127                x1 = text_len
128            self.post_text = text[self.end:x1]
129
130    def attributes(self):
131        """
132        Render domain details to meaningful exported view of the data.
133        :return:
134        """
135        default_attrs = {"method": self.pattern_id}
136        for (k, v, x1, x2) in self.match_groups:
137            default_attrs[k] = v
138        return default_attrs
139
140    def normalize(self):
141        if not self.text:
142            return
143
144        self.textnorm = self.text.strip()
145        if self.case == PatternMatch.UPPER_CASE:
146            self.textnorm = self.textnorm.upper()
147        elif self.case == PatternMatch.LOWER_CASE:
148            self.textnorm = self.textnorm.lower()
149
150    def get_value(self, k):
151        """
152        Get Slot value -- returns first one.
153        :param k:
154        :return:
155        """
156        grp = get_slot(self.match_groups, k)
157        if grp:
158            # tuple is group_name, value, start, end. Return value:
159            return grp[1]
160        return None

A general Pattern-based TextMatch. This Python variation consolidates PoliMatch (patterns-of-life = poli) ideas in the Java API.

def copy_attrs(self, arr):
 98    def copy_attrs(self, arr):
 99        """
100        Default copy of match group slots.  Does not work for every situation.
101        :param arr:
102        :return:
103        """
104        for k in arr:
105            val = self.get_value(k)
106            if val:
107                self.attrs[k] = val

Default copy of match group slots. Does not work for every situation. :param arr: :return:

def add_surrounding_text(self, text, text_len, length=16):
109    def add_surrounding_text(self, text, text_len, length=16):
110        """
111        Given this match's span and the text it was derived from,
112        populate pre_text, post_text with some # of chars specified by length.
113
114        :param text: The text in which this match was found.
115        :param text_len: the length of the text buffer.  (avoid repeating len(text))
116        :param length:  the pre/post text length to attach.
117        :return:
118        """
119        if self.start > 0:
120            x1 = self.start - length
121            if x1 < 0:
122                x1 = 0
123            self.pre_text = text[x1:self.start]
124        if self.end > 0:
125            x1 = self.end + length
126            if x1 > text_len:
127                x1 = text_len
128            self.post_text = text[self.end:x1]

Given this match's span and the text it was derived from, populate pre_text, post_text with some # of chars specified by length.

:param text: The text in which this match was found. :param text_len: the length of the text buffer. (avoid repeating len(text)) :param length: the pre/post text length to attach. :return:

def attributes(self):
130    def attributes(self):
131        """
132        Render domain details to meaningful exported view of the data.
133        :return:
134        """
135        default_attrs = {"method": self.pattern_id}
136        for (k, v, x1, x2) in self.match_groups:
137            default_attrs[k] = v
138        return default_attrs

Render domain details to meaningful exported view of the data. :return:

def normalize(self):
140    def normalize(self):
141        if not self.text:
142            return
143
144        self.textnorm = self.text.strip()
145        if self.case == PatternMatch.UPPER_CASE:
146            self.textnorm = self.textnorm.upper()
147        elif self.case == PatternMatch.LOWER_CASE:
148            self.textnorm = self.textnorm.lower()

Optional but recommended routine to normalize the matched data: parse fields, uppercase, streamline punctuation, etc. The normalized result also provides the opportunity to validate the match. :return:

def get_value(self, k):
150    def get_value(self, k):
151        """
152        Get Slot value -- returns first one.
153        :param k:
154        :return:
155        """
156        grp = get_slot(self.match_groups, k)
157        if grp:
158            # tuple is group_name, value, start, end. Return value:
159            return grp[1]
160        return None

Get Slot value -- returns first one. :param k: :return:

def get_slot(grps, k):
def get_slot(grps, k):
    """
    Find the first match group whose name equals k.

    :param grps: iterable of 4-item groups: (name, value, start, end).
    :param k: group name to look for.
    :return: the matching group exactly as stored, or None when nothing matches.
    """
    # Each group is unpacked into four names, so a malformed group reached
    # during the scan still raises ValueError.
    return next(
        (slot for slot in grps for (name, _value, _start, _end) in (slot,) if name == k),
        None,
    )

Given an array of match groups, return the first group whose key matches k. :param grps: :param k: :return: the matching tuple.

def get_config_file(cfg, modfile):
def get_config_file(cfg, modfile):
    """
    Locate a resource file that sits next to a python module,
    e.g., get_config_file("file.cfg", __file__)
    :param cfg: file name (or relative path) to look for beside the module
    :param modfile: path of the module whose directory is searched
    :return: path of the existing file, joined onto the module's absolute directory
    :raise FileNotFoundError: when no such file exists at that location
    """
    module_dir = os.path.dirname(os.path.abspath(modfile))
    candidate = os.path.join(module_dir, cfg)
    if not os.path.exists(candidate):
        raise FileNotFoundError("No such file {} at {}".format(cfg, candidate))
    return candidate

Locate a resource file that is collocated with the python module, e.g., get_config_file("file.cfg", __file__) :param cfg: :param modfile: :return:

class RegexPatternManager:
class RegexPatternManager:
    """
    RegexPatternManager is the patterns configuration file parser.
    See documentation: https://opensextant.github.io/Xponents/doc/Patterns.md

    The configuration file is read line by line.  Fields are separated by runs
    of tabs and/or spaces, and these directives are recognized:

        #DEFINE  <name>    <regex fragment>
        #RULE    <family>  <rule id>  <pattern containing <name> slots>
        #TEST    <family>  <rule id>  <test text>    (read only when testing=True)
        #CLASS   <family>  <match class name>

    Every other line is ignored.  Each RULE becomes one compiled pattern whose
    id is "<family>-<rule id>".
    """

    def __init__(self, patterns_cfg, module_file=None, debug=False, testing=False):
        """
        :param patterns_cfg: path or name of the patterns configuration file
        :param module_file: optional module path; when given, patterns_cfg is resolved
               relative to that module's directory via get_config_file()
        :param debug: when True, pattern setup messages are printed after parsing
        :param testing: when True, #TEST lines are collected into ``test_cases``
        """
        self.families = set([])
        self.patterns = {}
        self.patterns_file = patterns_cfg
        if module_file:
            # Resolve this absolute path now.
            self.patterns_file = get_config_file(patterns_cfg, module_file)

        # Path the configuration was actually read from; assigned by _initialize().
        self.patterns_file_path = None
        self.test_cases = []
        self.matcher_classes = {}
        # CUSTOM: For mapping Python and Java classes internally.  Experimental.
        self.match_class_registry = {}
        self.testing = testing
        self.debug = debug
        self._initialize()

    def get_pattern(self, pid):
        """
        :param pid: pattern id, "<family>-<rule id>"
        :return: the pattern registered under that id, or None
        """
        return self.patterns.get(pid)

    def create_pattern(self, fam, rule, desc):
        """ Override pattern class creation as needed.
        """
        return RegexPattern(fam, "{}-{}".format(fam, rule), desc)

    def create_testcase(self, tid, fam, text):
        """ Override test case creation as needed.
        """
        return PatternTestCase(tid, fam, text)

    def validate_pattern(self, repat):
        """Default validation is True
        Override this if necessary, e.g., pattern implementation has additional metadata
        """
        return repat is not None

    def enable_all(self):
        """ Mark every registered pattern as enabled. """
        for pat in self.patterns.values():
            pat.enabled = True

    def disable_all(self):
        """ Mark every registered pattern as disabled. """
        for pat in self.patterns.values():
            pat.enabled = False

    def set_enabled(self, some: str, flag: bool):
        """
        set family enabled or not
        :param some: prefix of a family or family-variant
        :param flag: bool setting
        :return:
        """
        for pat in self.patterns.values():
            if pat.id.startswith(some):
                pat.enabled = flag

    def _initialize(self):
        """
        Parse the configuration file and compile every RULE into a pattern.

        The file is looked up first at ``self.patterns_file`` as given and, if that
        does not exist, through resource_for().  The path finally used is recorded
        in ``self.patterns_file_path``.

        :raise FileNotFoundError: the resolved path does not exist
        :raise Exception: resource lookup failed, a rule name is duplicated, a slot used
               by a rule has no #DEFINE, or validate_pattern() rejects a pattern
        :return:
        """
        self.patterns = {}

        # the  # RULE statements as name and a sequence of DEFINES and regex bits
        defines = {}
        rules = {}
        # Preserve order.  Family and rule id are kept apart so that a family name
        # containing "-" is not mis-split when the patterns are built below.
        rule_order = []
        # Record pattern setup and validation messages
        configMessages = []

        config_fpath = self.patterns_file
        if not os.path.exists(self.patterns_file):
            config_fpath = resource_for(self.patterns_file)

        # By now we have tried the given file path, inferred a path local to the calling module
        # and lastly tried a resource folder in opensextant/resource/ data.
        if not os.path.exists(config_fpath):
            raise FileNotFoundError("Tried various absolute and inferred paths for the file '{}'".format(
                os.path.basename(self.patterns_file)))

        self.patterns_file_path = config_fpath

        # PY3:
        with open(config_fpath, "r", encoding="UTF-8") as fh:
            testcount = 0
            for line in fh:
                stmt = line.strip()
                # NOTE: #DEFINE and #RULE are matched against the raw line, so they must
                # start in the first column; #TEST and #CLASS tolerate leading whitespace.
                if line.startswith("#DEFINE"):
                    # #DEFINE<tab><defineName><tab><definePattern>
                    fields = re.split("[\t ]+", stmt, 2)
                    defines[fields[1]] = fields[2]
                elif line.startswith("#RULE"):
                    # #RULE<tab><rule_fam><tab><rule_id><tab><pattern>
                    fields = re.split("[\t ]+", stmt, 3)

                    fam = fields[1]
                    ruleEnum = fields[2]
                    rulePattern = fields[3]
                    ruleKey = fam + "-" + ruleEnum

                    # if already a rule by that name, error
                    if ruleKey in rules:
                        raise Exception("FlexPat Config Error - Duplicate rule name " + ruleEnum)

                    rules[ruleKey] = rulePattern
                    rule_order.append((fam, ruleEnum))
                elif self.testing and stmt.startswith("#TEST"):
                    fields = re.split("[\t ]+", stmt, 3)
                    testcount += 1

                    fam = fields[1]
                    ruleEnum = fields[2]
                    testtext = fields[3].strip().replace("$NL", "\n")
                    ruleKey = fam + "-" + ruleEnum

                    # testcount is a count of all tests, not just test within a rule family
                    testKey = "{}#{}".format(ruleKey, testcount)
                    self.test_cases.append(self.create_testcase(testKey, fam, testtext))
                elif stmt.startswith("#CLASS"):
                    fields = re.split("[\t ]+", stmt, 2)
                    fam = fields[1]
                    self.matcher_classes[fam] = fields[2]
                else:
                    pass

        elementRegex = "<[a-zA-Z0-9_]+>"
        elementPattern = re.compile(elementRegex)

        for fam, rule_name in rule_order:
            tmpRulePattern = rules[fam + "-" + rule_name]
            self.families.add(fam)

            pat = self.create_pattern(fam, rule_name, "No Description yet...")
            if fam in self.matcher_classes:
                # Best effort: a match class that cannot be loaded is reported and the
                # pattern is still registered without one.
                try:
                    pat.match_classname = self.matcher_classes.get(fam)
                    if pat.match_classname in self.match_class_registry:
                        # rename. Map Java class to a Python class.
                        pat.match_classname = self.match_class_registry[pat.match_classname]

                    # Do not instantiate, just find the class named in config file.
                    pat.match_class = class_for(pat.match_classname, instantiate=False)
                except Exception as err:
                    print(err)

            # find all of the element definitions within the pattern
            for groupNum, m in enumerate(elementPattern.finditer(tmpRulePattern), start=1):
                # Drop the surrounding "<" and ">" to get the slot name.
                elementName = m.group()[1:-1]
                pat.regex_groups.append(elementName)

                if self.debug:
                    subelementPattern = defines.get(elementName)
                    configMessages.append("\n\t")
                    configMessages.append("{} {} = {}".format(groupNum, elementName, subelementPattern))

            for slot_name in set(pat.regex_groups):
                if slot_name not in defines:
                    raise Exception("Slot definition is not DEFINED for " + slot_name)

                tmpDef = defines[slot_name]
                # NOTE:  Use of parens, "(expr)", is required to create groups within a pattern.
                tmpDefPattern = "({})".format(tmpDef)
                tmpDefSlot = "<{}>".format(slot_name)
                # Replaces all.
                tmpRulePattern = tmpRulePattern.replace(tmpDefSlot, tmpDefPattern)

            if self.debug:
                configMessages.append("\nrulepattern=" + tmpRulePattern)

            pat.regex = re.compile(tmpRulePattern, re.IGNORECASE)
            pat.enabled = True
            self.patterns[pat.id] = pat
            if not self.validate_pattern(pat):
                raise Exception("Invalid Pattern " + str(pat))

        if self.debug:
            configMessages.append("\nFound # of PATTERNS={}".format(len(self.patterns)))
            # Emit the collected setup messages; otherwise debug mode shows nothing.
            print("".join(configMessages))

RegexPatternManager is the patterns configuration file parser. See documentation: https://opensextant.github.io/Xponents/doc/Patterns.md

def create_pattern(self, fam, rule, desc):
226    def create_pattern(self, fam, rule, desc):
227        """ Override pattern class creation as needed.
228        """
229        return RegexPattern(fam, "{}-{}".format(fam, rule), desc)

Override pattern class creation as needed.

def validate_pattern(self, repat):
234    def validate_pattern(self, repat):
235        """Default validation is True
236        Override this if necessary, e.g., pattern implementation has additional metadata
237        """
238        return repat is not None

Default validation is True Override this if necessary, e.g., pattern implementation has additional metadata

def set_enabled(self, some: str, flag: bool):
250    def set_enabled(self, some: str, flag: bool):
251        """
252        set family enabled or not
253        :param some: prefix of a family or family-variant
254        :param flag: bool setting
255        :return:
256        """
257        for k in self.patterns:
258            pat = self.patterns[k]
259            if pat.id.startswith(some):
260                pat.enabled = flag

set family enabled or not :param some: prefix of a family or family-variant :param flag: bool setting :return:

class PatternExtractor(opensextant.Extractor):
class PatternExtractor(Extractor):
    """
        Discussion: Read first https://opensextant.github.io/Xponents/doc/Patterns.md

        Example:
        ```
        from opensextant.extractors.poli import PatternsOfLifeManager
        from opensextant.FlexPat import PatternExtractor

        # INIT
        #=====================
        # Invoke a particular REGEX rule set, here poli_patterns.cfg
        # @see https://github.com/OpenSextant/Xponents/blob/master/Core/src/main/resources/poli_patterns.cfg
        mgr = PatternsOfLifeManager("poli_patterns.cfg")
        pex = PatternExtractor(mgr)

        # DEV/TEST
        #=====================
        # "default_tests()" is useful to run during development and
        # encourages you to capture critical pattern variants in your "TEST" data.
        # Look at your pass/fail situations -- what test cases are failing your rule?
        test_results = pex.default_tests()
        print("TEST RESULTS")
        for result in test_results:
            print(repr(result))

        # RUN
        #=====================
        real_results = pex.extract(".... text blob 1-800-123-4567...")
        print("REAL RESULTS")
        for result in real_results:
            print(repr(result))
            print("\tRAW DICT:", render_match(result))
        ```
    """

    def __init__(self, pattern_manager):
        """
        invoke RegexPatternManager(your_cfg_file) or implement a custom RegexPatternManager (rare).
        NOTE - `PatternsOfLifeManager` is a  particular subclass of RegexPatternManager because
        it is manipulating the input patterns config file which is shared with the Java demo.
        The `CLASS` names unfortunately are specific to Python or Java.

        :param pattern_manager: RegexPatternManager
        """
        Extractor.__init__(self)
        self.id = "xpx"
        self.name = "Xponents Pattern Extractor"
        self.pattern_manager = pattern_manager

    def extract(self, text, **kwargs):
        """ Default Extractor API. """
        return self.extract_patterns(text, **kwargs)

    def extract_patterns(self, text, **kwargs):
        """
        Given some text input, apply all relevant pattern families against the text.
        Surrounding text is added to each match for post-processing.

        Patterns with a configured match class produce instances of that class, which
        are normalized and dropped when flagged ``omit``.  Patterns without one produce
        generic PatternMatch objects, which are kept as found.  Matches flagged as
        duplicates or sub-matches after reduce_matches() stay in the result but are
        marked ``filtered_out``.

        :param text: text to search
        :param kwargs: "features" = iterable of family names to apply; all families when absent or empty
        :return: list of matches
        :raise Exception: a requested family is not known to the pattern manager
        """
        features = kwargs.get("features")
        if not features:
            features = self.pattern_manager.families

        tlen = len(text)
        results = []
        for fam in features:
            if fam not in self.pattern_manager.families:
                raise Exception("Uknown Pattern Family " + fam)

            for pat_id in self.pattern_manager.patterns:
                pat = self.pattern_manager.patterns[pat_id]
                if not pat.family == fam:
                    continue
                if not pat.enabled:
                    continue

                for m in pat.regex.finditer(text):
                    digested_groups = _digest_sub_groups(m, pat.regex_groups)
                    if pat.match_class:
                        domainObj = pat.match_class(m.group(), m.start(), m.end(),
                                                    pattern_id=pat.id,
                                                    label=pat.family,
                                                    match_groups=digested_groups)
                        # surrounding text may be used by normalization and validation
                        domainObj.add_surrounding_text(text, tlen, length=20)
                        domainObj.normalize()
                        if not domainObj.omit:
                            results.append(domainObj)
                    else:
                        genericObj = PatternMatch(m.group(), m.start(), m.end(),
                                                  pattern_id=pat.id,
                                                  label=pat.family,
                                                  match_groups=digested_groups)
                        genericObj.add_surrounding_text(text, tlen, length=20)
                        results.append(genericObj)

        # Determine if any matches are redundant.  Mark redundancies as "filtered out".
        reduce_matches(results)
        for r in results:
            if r.is_duplicate or r.is_submatch:
                r.filtered_out = True

        return results

    def default_tests(self, scope="rule"):
        """
        Default Tests run all TEST cases for each RULE in patterns config.
        TESTs marked with a 'FAIL' comment are intended to return 0 matches or only matches that are filtered out.
        Otherwise a TEST is intended to return 1 or more matches.

        By default, this runs each test and observes only results that were triggered by that rule being tested.
        If scope is "ruleset" then any results from any rule will be allowed.
        "rule" scope is much better for detailed rule development as it tells you if your rule tests are testing the
        right thing.

        Runs the default tests on the provided configuration. Plenty of debug printed to screen.
        But returns the test results as an array, e.g., to write to CSV for review.
        This uses PatternExtractor.extract_patterns() to avoid any collision with the generic use
        of  Extractor.extract() parent method.
        :param scope: rule or ruleset.  Rule scope means only results for rule test case are evaluated.
                 ruleset scope means that all results for a test are evaluated.
        :return: test results array; Each result represents a TEST case run against a RULE,
                 as a dict with keys TEST, TEXT, MATCHES and PASS.
        """
        test_results = []
        for t in self.pattern_manager.test_cases:
            expect_valid_match = "FAIL" not in t.text
            print("Test", t.family, t.text)
            output1 = self.extract_patterns(t.text, features=[t.family])

            output = []
            for m in output1:
                # A test id is the rule's pattern id followed by "#<n>".  Compare against the
                # whole pattern id so that rule "FAM-1" does not also claim tests of "FAM-10".
                from_tested_rule = t.id == m.pattern_id or t.id.startswith(m.pattern_id + "#")
                if scope == "rule" and not from_tested_rule:
                    continue
                output.append(m)

            # A match counts when it is unfiltered, or when it was filtered only as a duplicate.
            # The test passes when "at least one match counts" agrees with what was expected:
            #   expected a match   -> pass needs one or more counted matches (true positive)
            #   expected a failure -> pass needs zero counted matches (true negative)
            counted = 0
            for m in output:
                allowed = not m.filtered_out or (m.is_duplicate and m.filtered_out)
                if allowed:
                    counted += 1

            success = (counted > 0) == expect_valid_match
            test_results.append({"TEST": t.id,
                                 "TEXT": t.text,
                                 "MATCHES": output,
                                 "PASS": success})

        return test_results

Discussion: Read first https://opensextant.github.io/Xponents/doc/Patterns.md

Example:

from opensextant.extractors.poli import PatternsOfLifeManager
from opensextant.FlexPat import PatternExtractor

# INIT
#=====================
# Invoke a particular REGEX rule set, here poli_patterns.cfg
# @see https://github.com/OpenSextant/Xponents/blob/master/Core/src/main/resources/poli_patterns.cfg
mgr = PatternsOfLifeManager("poli_patterns.cfg")
pex = PatternExtractor(mgr)

# DEV/TEST
#=====================
# "default_tests()" is useful to run during development and
# encourages you to capture critical pattern variants in your "TEST" data.
# Look at your pass/fail situations -- what test cases are failing your rule?
test_results = pex.default_tests()
print("TEST RESULTS")
for result in test_results:
    print(repr(result))

# RUN
#=====================
real_results = pex.extract(".... text blob 1-800-123-4567...")
print("REAL RESULTS")
for result in real_results:
    print(repr(result))
    print("     RAW DICT:", render_match(result))
PatternExtractor(pattern_manager)
445    def __init__(self, pattern_manager):
446        """
447        invoke RegexPatternManager(your_cfg_file) or implement a custom RegexPatternManager (rare).
448        NOTE - `PatternsOfLifeManager` is a  particular subclass of RegexPatternManager becuase
449        it is manipulating the input patterns config file which is shared with the Java demo.
450        The `CLASS` names unfortunately are specific to Python or Java.
451
452        :param pattern_manager: RegexPatternManager
453        """
454        Extractor.__init__(self)
455        self.id = "xpx"
456        self.name = "Xponents Pattern Extractor"
457        self.pattern_manager = pattern_manager

Invoke RegexPatternManager(your_cfg_file) or implement a custom RegexPatternManager (rare). NOTE - PatternsOfLifeManager is a particular subclass of RegexPatternManager because it manipulates the input patterns config file, which is shared with the Java demo. The CLASS names unfortunately are specific to Python or Java.

:param pattern_manager: RegexPatternManager

def extract(self, text, **kwargs):
459    def extract(self, text, **kwargs):
460        """ Default Extractor API. """
461        return self.extract_patterns(text, **kwargs)

Default Extractor API.

def extract_patterns(self, text, **kwargs):
463    def extract_patterns(self, text, **kwargs):
464        """
465        Given some text input, apply all relevant pattern families against the text.
466        Surrounding text is added to each match for post-processing.
467        :param text:
468        :param kwargs:
469        :return:
470        """
471        features = kwargs.get("features")
472        if not features:
473            features = self.pattern_manager.families
474
475        tlen = len(text)
476        results = []
477        for fam in features:
478            if fam not in self.pattern_manager.families:
479                raise Exception("Uknown Pattern Family " + fam)
480
481            for pat_id in self.pattern_manager.patterns:
482                pat = self.pattern_manager.patterns[pat_id]
483                if not pat.family == fam:
484                    continue
485                if not pat.enabled:
486                    continue
487
488                for m in pat.regex.finditer(text):
489                    digested_groups = _digest_sub_groups(m, pat.regex_groups)
490                    if pat.match_class:
491                        domainObj = pat.match_class(m.group(), m.start(), m.end(),
492                                                    pattern_id=pat.id,
493                                                    label=pat.family,
494                                                    match_groups=digested_groups)
495                        # surrounding text may be used by normalization and validation
496                        domainObj.add_surrounding_text(text, tlen, length=20)
497                        domainObj.normalize()
498                        if not domainObj.omit:
499                            results.append(domainObj)
500                    else:
501                        genericObj = PatternMatch(m.group(), m.start(), m.end(),
502                                                  pattern_id=pat.id,
503                                                  label=pat.family,
504                                                  match_groups=digested_groups)
505                        genericObj.add_surrounding_text(text, tlen, length=20)
506                        results.append(genericObj)
507
508        # Determine if any matches are redundant.  Mark redundancies as "filtered out".
509        reduce_matches(results)
510        for r in results:
511            if r.is_duplicate or r.is_submatch:
512                r.filtered_out = True
513
514        return results

Given some text input, apply all relevant pattern families against the text. Surrounding text is added to each match for post-processing. :param text: :param kwargs: :return:

def default_tests(self, scope='rule'):
516    def default_tests(self, scope="rule"):
517        """
518        Default Tests run all TEST cases for each RULE in patterns config.
519        TESTs marked with a 'FAIL' comment are intended to return 0 matches or only matches that are filtered out.
520        Otherwise a TEST is intended to return 1 or more matches.
521
522        By default, this runs each test and observes only results that were triggered by that rule being tested.
523        If scope is "ruleset" then any results from any rule will be allowed.
524        "rule" scope is much better for detailed rule development as it tells you if your rule tests are testing the
525        right thing.
526        
527        Runs the default tests on the provided configuration. Plenty of debug printed to screen.
528        But returns the test results as an array, e.g., to write to CSV for review.
529        This uses PatternExtractor.extract_patterns() to avoid any collision with the generic use
530        of  Extractor.extract() parent method.
531        :param scope: rule or ruleset.  Rule scope means only results for rule test case are evaluated.
532                 ruleset scope means that all results for a test are evaluated.
533        :return: test results array; Each result represents a TEST case run against a RULE
534        """
535        test_results = []
536        for t in self.pattern_manager.test_cases:
537            expect_valid_match = "FAIL" not in t.text
538            print("Test", t.family, t.text)
539            output1 = self.extract_patterns(t.text, features=[t.family])
540
541            output = []
542            for m in output1:
543                if scope == "rule" and not t.id.startswith(m.pattern_id):
544                    continue
545                output.append(m)
546
547            # Determine if pattern matched true positive or false positive.
548            # To condition the TP or FP based on the matches
549            #  keep a running tally of whether each match is filtered or not.
550            # That is, for many matches True Positive = at least one unfiltered match is needed, AND was expected.
551            #          for many matches False Positive = at least one unfiltered match is needed, AND was NOT expected.
552            fpcount = 0
553            tpcount = 0
554            for m in output:
555                allowed = not m.filtered_out or (m.is_duplicate and m.filtered_out)
556                if expect_valid_match and allowed:
557                    tpcount += 1
558                if not expect_valid_match and allowed:
559                    fpcount += 1
560
561            tp = tpcount > 0 and expect_valid_match
562            fp = fpcount > 0 and not expect_valid_match
563            tn = fpcount == 0 and not expect_valid_match
564            fn = tpcount == 0 and expect_valid_match
565            success = (tp or tn) and not (fp or fn)
566            test_results.append({"TEST": t.id,
567                                 "TEXT": t.text,
568                                 "MATCHES": output,
569                                 "PASS": success})
570
571        return test_results

Default Tests run all TEST cases for each RULE in patterns config. TESTs marked with a 'FAIL' comment are intended to return 0 matches or only matches that are filtered out. Otherwise a TEST is intended to return 1 or more matches.

By default, this runs each test and observes only results that were triggered by that rule being tested. If scope is "ruleset" then any results from any rule will be allowed. "rule" scope is much better for detailed rule development as it tells you if your rule tests are testing the right thing.

Runs the default tests on the provided configuration. Plenty of debug printed to screen. But returns the test results as an array, e.g., to write to CSV for review. This uses PatternExtractor.extract_patterns() to avoid any collision with the generic use of Extractor.extract() parent method. :param scope: rule or ruleset. Rule scope means only results for rule test case are evaluated. ruleset scope means that all results for a test are evaluated. :return: test results array; Each result represents a TEST case run against a RULE