Source code for squabble.config

import collections
import copy
import json
import logging
import os.path
import re
import subprocess

from squabble import SquabbleException


logger = logging.getLogger(__name__)


Config = collections.namedtuple('Config', [
    'reporter',
    'plugins',
    'rules'
])


# TODO: Move these out somewhere else, feels gross to have them hardcoded.
DEFAULT_CONFIG = dict(
    reporter='plain',
    plugins=[],
    rules={}
)

PRESETS = {
    'postgres': {
        'description': (
            'A sane set of defaults that checks for obviously '
            'dangerous Postgres migrations and query antipatterns.'
        ),
        'config': {
            'rules': {
                'DisallowRenameEnumValue': {},
                'DisallowChangeColumnType': {},
                'DisallowNotIn': {},
                'DisallowTimetzType': {},
                'DisallowPaddedCharType': {},
                'DisallowTimestampPrecision': {},
            }
        }
    },

    'postgres-zero-downtime': {
        'description': (
            'A set of rules focused on preventing downtime during schema '
            'migrations on busy Postgres databases.'
        ),
        'config': {
            'rules': {
                'AddColumnDisallowConstraints': {
                    'disallowed': ['DEFAULT', 'NOT NULL', 'UNIQUE']
                },
                'DisallowRenameEnumValue': {},
                'DisallowChangeColumnType': {},
                'RequireConcurrentIndex': {},
            }
        }
    },

    'full': {
        'description': ('Every rule that ships with squabble. The output will '
                        'be noisy (and nonsensical), but it\'s probably a good '
                        'starting point to figure out which rules are useful.'),
        'config': {
            'rules': {
                'AddColumnDisallowConstraints': {
                    'disallowed': ['DEFAULT', 'NOT NULL', 'UNIQUE', 'FOREIGN']
                },
                'RequireConcurrentIndex': {},
                'DisallowRenameEnumValue': {},
                'DisallowChangeColumnType': {},
                'DisallowFloatTypes': {},
                'DisallowNotIn': {},
                'DisallowTimetzType': {},
                'DisallowPaddedCharType': {},
                'DisallowTimestampPrecision': {},
                'RequirePrimaryKey': {},
                # Yes, these are incompatible.
                'DisallowForeignKey': {},
                'RequireForeignKey': {},
            }
        }
    }
}


[docs]class UnknownPresetException(SquabbleException): """Raised when user tries to apply a preset that isn't defined.""" def __init__(self, preset): super().__init__('unknown preset: "%s"' % preset)
[docs]def discover_config_location(): """ Try to locate a config file in some likely locations. Used when no config path is specified explicitly. In order, this will check for a file named ``.squabblerc``: - in the current directory. - in the root of the repository (if working in a git repo). - in the user's home directory. """ possible_dirs = [ '.', _get_vcs_root(), os.path.expanduser('~') ] for d in possible_dirs: if d is None: continue logger.debug('checking %s for a config file', d) file_name = os.path.join(d, '.squabblerc') if os.path.exists(file_name): logger.debug('using "%s" for configuration', file_name) return file_name logger.debug('no config file found') return None
def _get_vcs_root(): """ Return the path to the root of the Git repository for the current directory, or empty string if not in a repository. """ return subprocess.getoutput( 'git rev-parse --show-toplevel 2>/dev/null || echo ""')
[docs]def get_base_config(preset_names=None): """ Return a basic config value that can be overridden by user configuration files. :param preset_names: The named presets to use (applied in order), or None """ if not preset_names: return Config(**DEFAULT_CONFIG) preset_settings = {} for name in preset_names: if name not in PRESETS: raise UnknownPresetException(name) preset_settings = _merge_dicts(preset_settings, PRESETS[name]) return Config(**{ **DEFAULT_CONFIG, **preset_settings['config'] })
def _parse_config_file(config_file): if not config_file: return {} with open(config_file, 'r') as fp: return json.load(fp)
[docs]def load_config(config_file, preset_names=None, reporter_name=None): """ Load configuration from a file, optionally applying a predefined set of rules. :param config_file: Path to JSON file containing user configuration. :type config_file: str :param preset_name: Preset to use as a base before applying user configuration. :type preset_name: str :param reporter_name: Override the reporter named in configuration. :type reporter_name: str """ base = get_base_config(preset_names) config = _parse_config_file(config_file) rules = copy.deepcopy(base.rules) for name, rule in config.get('rules', {}).items(): rules[name] = rule return Config( reporter=reporter_name or config.get('reporter', base.reporter), plugins=config.get('plugins', base.plugins), rules=rules )
[docs]def apply_file_config(base, contents): """ Given a base configuration object and the contents of a file, return a new config that applies any file-specific rule additions/deletions. Returns ``None`` if the file should be skipped. """ # Operate on a copy so we don't mutate the base config file_rules = copy.deepcopy(base.rules) rules = _extract_file_rules(contents) if rules['skip_file']: return None for rule, opts in rules['enable'].items(): file_rules[rule] = opts for rule in rules['disable']: del file_rules[rule] return base._replace(rules=file_rules)
def _extract_file_rules(text): """ Try to extract any file-level rule additions/suppressions. Valid lines are SQL line comments that enable or disable specific rules. >>> r = _extract_file_rules('-- squabble-enable:rule1 arr=a,b,c') >>> r['disable'] [] >>> r['enable'] {'rule1': {'arr': ['a', 'b', 'c']}} >>> r['skip_file'] False >>> r = _extract_file_rules('-- squabble-disable') >>> r['skip_file'] True """ rules = { 'enable': {}, 'disable': [], 'skip_file': False } comment_re = re.compile( r'--\s*' r'(?:squabble-)?(enable|disable)' r'(?::\s*(\w+)(.*?))?' r'$', re.I) for line in text.splitlines(): line = line.strip() m = re.match(comment_re, line) if m is None: continue action, rule, opts = m.groups() if action == 'disable' and not rule: rules['skip_file'] = True elif action == 'disable': rules['disable'].append(rule) elif action == 'enable': rules['enable'][rule] = _parse_options(opts) return rules def _parse_options(opts): """ Given a string of space-separated `key=value` pairs, return a dictionary of `{"key": "value"}`. Note the value will always be returned as a string, and no further parsing will be attempted. >>> opts = _parse_options('k=v abc=1,2,3') >>> opts == {'k': 'v', 'abc': ['1', '2', '3']} True >>> _parse_options('k="1,2","3,4"') {'k': ['1,2', '3,4']} """ options = {} # Either a simple quoted string or a bare value value = r'(?:(?:"([^"]+)")|([^,\s]+))' # Value followed by zero or more values value_list = r'{0}(?:,{0})*'.format(value) value_regex = re.compile(value) kv_regex = re.compile(r'(\w+)=({0})'.format(value_list)) # 'k=1,2' => ('k', '1,2') for match in re.finditer(kv_regex, opts): key, val = match.group(1, 2) # value_regex will return ('string', '') or ('', 'value') values = [a or b for a, b in re.findall(value_regex, val)] # Collapse single len lists into scalars options[key] = values[0] if len(values) == 1 else values return options def _merge_dicts(a, b): """ Combine the values of two (possibly nested) dictionaries. Values in ``b`` will take precedence over those in ``a``. This function will return a new dictionary rather than mutating its arguments. >>> a = {'foo': {'a': 1, 'b': 2}} >>> b = {'foo': {'b': 3, 'c': 4}, 'bar': 5} >>> m = _merge_dicts(a, b) >>> m == {'foo': {'a': 1, 'b': 3, 'c': 4}, 'bar': 5} True """ def _inner(x, y, out): """Recursive helper function which mutates its arguments.""" for k in x.keys() | y.keys(): xv, yv = x.get(k, {}), y.get(k, {}) if isinstance(xv, dict) and isinstance(yv, dict): out[k] = _inner(xv, yv, {}) else: out[k] = yv if k in y else xv return out return _inner(a, b, {})