Source code for xonsh.history.sqlite

# -*- coding: utf-8 -*-
"""Implements the xonsh history backend via sqlite3."""
import builtins
import collections
import json
import os
import sqlite3
import sys
import threading
import time

from xonsh.history.base import History
import xonsh.tools as xt


def _xh_sqlite_get_file_name():
    envs = builtins.__xonsh__.env
    file_name = envs.get("XONSH_HISTORY_SQLITE_FILE")
    if not file_name:
        data_dir = envs.get("XONSH_DATA_DIR")
        file_name = os.path.join(data_dir, "xonsh-history.sqlite")
    return xt.expanduser_abs_path(file_name)


def _xh_sqlite_get_conn(filename=None):
    if filename is None:
        filename = _xh_sqlite_get_file_name()
    return sqlite3.connect(filename)


def _xh_sqlite_create_history_table(cursor):
    """Create Table for history items.

    Columns:
        info - JSON formatted, reserved for future extension.
    """
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS xonsh_history
             (inp TEXT,
              rtn INTEGER,
              tsb REAL,
              tse REAL,
              sessionid TEXT,
              out TEXT,
              info TEXT
             )
    """
    )


def _xh_sqlite_insert_command(cursor, cmd, sessionid, store_stdout):
    sql = "INSERT INTO xonsh_history (inp, rtn, tsb, tse, sessionid"
    tss = cmd.get("ts", [None, None])
    params = [cmd["inp"].rstrip(), cmd["rtn"], tss[0], tss[1], sessionid]
    if store_stdout and "out" in cmd:
        sql += ", out"
        params.append(cmd["out"])
    if "info" in cmd:
        sql += ", info"
        info = json.dumps(cmd["info"])
        params.append(info)
    sql += ") VALUES (" + ("?, " * len(params)).rstrip(", ") + ")"
    cursor.execute(sql, tuple(params))


def _xh_sqlite_get_count(cursor, sessionid=None):
    sql = "SELECT count(*) FROM xonsh_history "
    params = []
    if sessionid is not None:
        sql += "WHERE sessionid = ? "
        params.append(str(sessionid))
    cursor.execute(sql, tuple(params))
    return cursor.fetchone()[0]


def _xh_sqlite_get_records(cursor, sessionid=None, limit=None, newest_first=False):
    sql = "SELECT inp, tsb, rtn FROM xonsh_history "
    params = []
    if sessionid is not None:
        sql += "WHERE sessionid = ? "
        params.append(sessionid)
    sql += "ORDER BY tsb "
    if newest_first:
        sql += "DESC "
    if limit is not None:
        sql += "LIMIT %d " % limit
    cursor.execute(sql, tuple(params))
    return cursor.fetchall()


def _xh_sqlite_delete_records(cursor, size_to_keep):
    sql = "SELECT min(tsb) FROM ("
    sql += "SELECT tsb FROM xonsh_history ORDER BY tsb DESC "
    sql += "LIMIT %d)" % size_to_keep
    cursor.execute(sql)
    result = cursor.fetchone()
    if not result:
        return
    max_tsb = result[0]
    sql = "DELETE FROM xonsh_history WHERE tsb < ?"
    result = cursor.execute(sql, (max_tsb,))
    return result.rowcount


def xh_sqlite_append_history(cmd, sessionid, store_stdout, filename=None):
    with _xh_sqlite_get_conn(filename=filename) as conn:
        c = conn.cursor()
        _xh_sqlite_create_history_table(c)
        _xh_sqlite_insert_command(c, cmd, sessionid, store_stdout)
        conn.commit()


def xh_sqlite_get_count(sessionid=None, filename=None):
    with _xh_sqlite_get_conn(filename=filename) as conn:
        c = conn.cursor()
        return _xh_sqlite_get_count(c, sessionid=sessionid)


def xh_sqlite_items(sessionid=None, filename=None, newest_first=False):
    with _xh_sqlite_get_conn(filename=filename) as conn:
        c = conn.cursor()
        _xh_sqlite_create_history_table(c)
        return _xh_sqlite_get_records(c, sessionid=sessionid, newest_first=newest_first)


def xh_sqlite_delete_items(size_to_keep, filename=None):
    with _xh_sqlite_get_conn(filename=filename) as conn:
        c = conn.cursor()
        _xh_sqlite_create_history_table(c)
        return _xh_sqlite_delete_records(c, size_to_keep)


[docs]class SqliteHistoryGC(threading.Thread): """Shell history garbage collection.""" def __init__(self, wait_for_shell=True, size=None, filename=None, *args, **kwargs): """Thread responsible for garbage collecting old history. May wait for shell (and for xonshrc to have been loaded) to start work. """ super().__init__(*args, **kwargs) self.daemon = True self.filename = filename self.size = size self.wait_for_shell = wait_for_shell self.start()
[docs] def run(self): while self.wait_for_shell: time.sleep(0.01) if self.size is not None: hsize, units = xt.to_history_tuple(self.size) else: envs = builtins.__xonsh__.env hsize, units = envs.get("XONSH_HISTORY_SIZE") if units != "commands": print( "sqlite backed history gc currently only supports " '"commands" as units', file=sys.stderr, ) return if hsize < 0: return xh_sqlite_delete_items(hsize, filename=self.filename)
[docs]class SqliteHistory(History): """Xonsh history backend implemented with sqlite3.""" def __init__(self, gc=True, filename=None, **kwargs): super().__init__(**kwargs) if filename is None: filename = _xh_sqlite_get_file_name() self.filename = filename self.gc = SqliteHistoryGC() if gc else None self._last_hist_inp = None self.inps = [] self.rtns = [] self.outs = [] self.tss = []
[docs] def append(self, cmd): envs = builtins.__xonsh__.env opts = envs.get("HISTCONTROL") inp = cmd["inp"].rstrip() self.inps.append(inp) store_stdout = envs.get("XONSH_STORE_STDOUT", False) if store_stdout: self.outs.append(cmd.get("out")) else: self.outs.append(None) self.rtns.append(cmd["rtn"]) self.tss.append(cmd.get("ts", (None, None))) opts = envs.get("HISTCONTROL") if "ignoredups" in opts and inp == self._last_hist_inp: # Skipping dup cmd return if "ignoreerr" in opts and cmd["rtn"] != 0: # Skipping failed cmd return self._last_hist_inp = inp xh_sqlite_append_history( cmd, str(self.sessionid), store_stdout, filename=self.filename )
[docs] def all_items(self, newest_first=False): """Display all history items.""" for item in xh_sqlite_items(filename=self.filename, newest_first=newest_first): yield {"inp": item[0], "ts": item[1], "rtn": item[2]}
[docs] def items(self, newest_first=False): """Display history items of current session.""" for item in xh_sqlite_items( sessionid=str(self.sessionid), filename=self.filename, newest_first=newest_first, ): yield {"inp": item[0], "ts": item[1], "rtn": item[2]}
[docs] def info(self): data = collections.OrderedDict() data["backend"] = "sqlite" data["sessionid"] = str(self.sessionid) data["filename"] = self.filename data["session items"] = xh_sqlite_get_count( sessionid=self.sessionid, filename=self.filename ) data["all items"] = xh_sqlite_get_count(filename=self.filename) envs = builtins.__xonsh__.env data["gc options"] = envs.get("XONSH_HISTORY_SIZE") return data
[docs] def run_gc(self, size=None, blocking=True): self.gc = SqliteHistoryGC(wait_for_shell=False, size=size) if blocking: while self.gc.is_alive(): continue