"""
logbook.more
~~~~~~~~~~~~
Fancy stuff for logbook.
:copyright: (c) 2010 by Armin Ronacher, Georg Brandl.
:license: BSD, see LICENSE for more details.
"""
import importlib.util
import os
import platform
import re
import warnings
from collections import defaultdict
from functools import partial
from typing import Any
from urllib.parse import parse_qsl, urlencode
from typing_extensions import deprecated
from logbook._termcolors import colorize
from logbook.base import ERROR, NOTICE, NOTSET, RecordDispatcher, dispatch_record
from logbook.handlers import (
Handler,
StderrHandler,
StringFormatter,
StringFormatterHandlerMixin,
)
from logbook.ticketing import BackendBase
try:
import riemann_client.client
import riemann_client.transport
except ImportError:
riemann_client = None
# Splits text on runs of whitespace. Because the pattern has a capturing
# group, ``_ws_re.split`` keeps the whitespace runs as separate items, so the
# pieces can be joined back together without losing spacing (this is how
# ``TwitterFormatter`` truncates a message piece by piece).
_ws_re = re.compile(r"(\s+)", re.UNICODE)
# Default format string used by ``TwitterHandler``.
TWITTER_FORMAT_STRING = "[{record.channel}] {record.level_name}: {record.message}"
# URL ``TwitterHandler.get_oauth_token`` posts the x_auth credentials to.
TWITTER_ACCESS_TOKEN_URL = "https://twitter.com/oauth/access_token"
# URL ``TwitterHandler.tweet`` posts new status updates to.
NEW_TWEET_URL = "https://api.twitter.com/1/statuses/update.json"
def __getattr__(name: str) -> Any:
    """Module-level attribute hook serving deprecated aliases.

    ``FingersCrossedHandler`` and ``DatabaseHandler`` are imported lazily
    from their new homes and returned after a :class:`DeprecationWarning`
    has been issued. Any other name raises :class:`AttributeError`.

    :param name: the attribute that was looked up on this module.
    :return: the object the deprecated name now refers to.
    :raises AttributeError: if *name* is not one of the known aliases.
    """
    # The warnings are issued directly from this function (not from a helper)
    # so that ``stacklevel=2`` keeps pointing at the code doing the lookup.
    if name == "FingersCrossedHandler":
        from logbook.handlers import FingersCrossedHandler as moved_handler

        warnings.warn(
            "FingersCrossedHandler was moved to logbook.handlers",
            category=DeprecationWarning,
            stacklevel=2,
        )
        return moved_handler

    if name == "DatabaseHandler":
        from logbook.ticketing import TicketingHandler as aliased_handler

        warnings.warn(
            "logbook.more.DatabaseHandler is a deprecated alias to "
            "logbook.ticketing.TicketingHandler",
            category=DeprecationWarning,
            stacklevel=2,
        )
        return aliased_handler

    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
class CouchDBBackend(BackendBase):
    """Implements a backend that writes into a CouchDB database."""

    def setup_backend(self):
        """Connect to the CouchDB server and select the target database.

        Two entries are read from (and removed from) ``self.options``:

        ``uri``
            passed to ``couchdb.Server``; defaults to ``""``.
        ``db``
            name of the database to write to (required).

        :raises KeyError: if the ``db`` option is missing.
        """
        # Imported lazily so the module can be imported without couchdb.
        from couchdb import Server

        uri = self.options.pop("uri", "")
        couch = Server(uri)
        db_name = self.options.pop("db")
        self.database = couch[db_name]

    def record_ticket(self, record, data, hash, app_id):
        """Records a log record as ticket.

        The record is converted with ``record.to_dict()`` and its ``time``
        entry is replaced by an ISO 8601 string with a ``"Z"`` suffix before
        the document is stored. ``data``, ``hash`` and ``app_id`` are part of
        the backend interface but are not used by this backend.
        """
        ticket = record.to_dict()
        # NOTE(review): the "Z" suffix presumes the record time is UTC.
        ticket["time"] = ticket["time"].isoformat() + "Z"
        # Store the document exactly once. It used to be saved twice in a
        # row, which only produced a redundant second write of the same data.
        self.database.save(ticket)
@deprecated("TwitterFormatter is deprecated")
class TwitterFormatter(StringFormatter):
    """String formatter that shortens its output to ``max_length``
    characters. It is the default formatter of :class:`TwitterHandler`.

    .. deprecated:: 1.9
    """

    # Upper bound for the length of a formatted message.
    max_length = 140

    def format_exception(self, record):
        """Return a one-line ``"<short name>: <message>"`` summary of the
        exception attached to *record*."""
        return "{}: {}".format(record.exception_shortname, record.exception_message)

    def __call__(self, record, handler):
        """Format *record* and truncate the result on a whitespace boundary.

        The formatted text is split into alternating word and whitespace
        pieces. Pieces are kept while the running length stays within
        ``max_length``; an ellipsis is appended when there is still room
        for it at the point where the text is cut off.
        """
        text = StringFormatter.__call__(self, record, handler)
        kept = []
        used = 0
        for chunk in _ws_re.split(text):
            before = used
            used += len(chunk)
            if used <= self.max_length:
                kept.append(chunk)
                continue
            # This chunk does not fit any more: mark the cut if at least one
            # character of space is left, then stop.
            if before < self.max_length:
                kept.append("…")
            break
        return "".join(kept)
[docs]
class TaggingLogger(RecordDispatcher):
    """A logger that attaches a tag to each record.  This is an alternative
    record dispatcher that does not use levels but tags to keep log
    records apart.  It is constructed with a descriptive name and at least
    one tag.  The tags are up for you to define::

        logger = TaggingLogger("My Logger", ["info", "warning"])

    For each tag defined that way, a method appears on the logger with
    that name::

        logger.info("This is a info message")

    To dispatch to different handlers based on tags you can use the
    :class:`TaggingHandler`.

    The tags themselves are stored as list named ``'tags'`` in the
    :attr:`~logbook.LogRecord.extra` dictionary.
    """

    def __init__(self, name=None, tags=None):
        """
        :param name: name handed to :class:`RecordDispatcher`.
        :param tags: iterable of tag names; a logging method is created on
                     the instance for each of them.
        """
        RecordDispatcher.__init__(self, name)
        # create a method for each tag named
        for tag in tags or ():
            setattr(self, tag, partial(self.log, tag))

    def log(self, tags, msg, *args, **kwargs):
        """Create a record tagged with *tags* and hand it to the handlers.

        :param tags: a single tag name or an iterable of tag names.
        :param msg: the log message; ``args`` and the remaining ``kwargs``
                    are passed on together with it.
        :keyword exc_info: optional exception information for the record.
        :keyword extra: optional mapping of extra record data.  It is
                        copied, so the caller's object is never modified.
        :keyword frame_correction: passed through unchanged (default ``0``).
        :return: whatever ``make_record_and_handle`` returns.
        """
        if isinstance(tags, str):
            tags = [tags]
        exc_info = kwargs.pop("exc_info", None)
        # Work on a private copy: writing "tags" into the caller's dict would
        # leak the tags into every later use of that same dict.
        extra = dict(kwargs.pop("extra", None) or {})
        extra["tags"] = list(tags)
        frame_correction = kwargs.pop("frame_correction", 0)
        return self.make_record_and_handle(
            NOTSET, msg, args, kwargs, exc_info, extra, frame_correction
        )
[docs]
class TaggingHandler(Handler):
    """Dispatches records to other handlers depending on their tags.

    Example::

        import logbook
        from logbook.more import TaggingHandler

        handler = TaggingHandler(dict(info=OneHandler(), warning=AnotherHandler()))
    """

    def __init__(self, handlers, filter=None, bubble=False):
        """
        :param handlers: dict mapping a tag name to either a single handler
                         or an iterable of handlers.
        :param filter: passed on to :class:`Handler`.
        :param bubble: passed on to :class:`Handler`.
        """
        Handler.__init__(self, NOTSET, filter, bubble)
        assert isinstance(handlers, dict)
        # Normalise every entry to an iterable of handlers.
        by_tag = {}
        for tag, target in handlers.items():
            if isinstance(target, Handler):
                by_tag[tag] = [target]
            else:
                by_tag[tag] = target
        self._handlers = by_tag

    def emit(self, record):
        """Pass *record* to the handlers registered for each of its tags.

        Tags are read from ``record.extra["tags"]``; tags without any
        registered handler are skipped.
        """
        record_tags = record.extra.get("tags", ())
        for tag in record_tags:
            targets = self._handlers.get(tag, ())
            for target in targets:
                target.handle(record)
[docs]
@deprecated("TwitterHandler is deprecated")
class TwitterHandler(Handler, StringFormatterHandlerMixin):
    """A handler that logs to twitter.  Requires that you sign up an
    application on twitter and request xauth support.  Furthermore the
    oauth2 library has to be installed.

    .. deprecated:: 1.9
    """

    default_format_string = TWITTER_FORMAT_STRING
    formatter_class = TwitterFormatter

    def __init__(
        self,
        consumer_key,
        consumer_secret,
        username,
        password,
        level=NOTSET,
        format_string=None,
        filter=None,
        bubble=False,
    ):
        """
        :param consumer_key: key of the registered application.
        :param consumer_secret: secret of the registered application.
        :param username: user name sent as ``x_auth_username``.
        :param password: password sent as ``x_auth_password``.
        :param level: passed on to :class:`Handler`.
        :param format_string: passed on to
                              :class:`StringFormatterHandlerMixin`.
        :param filter: passed on to :class:`Handler`.
        :param bubble: passed on to :class:`Handler`.
        :raises RuntimeError: if the ``oauth2`` library cannot be imported.
        """
        Handler.__init__(self, level, filter, bubble)
        StringFormatterHandlerMixin.__init__(self, format_string)
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.username = username
        self.password = password

        try:
            import oauth2
        except ImportError as exc:
            raise RuntimeError(
                "The python-oauth2 library is required for the TwitterHandler."
            ) from exc

        self._oauth = oauth2
        # The access token is fetched lazily by get_oauth_token().
        self._oauth_token = None
        self._oauth_token_secret = None
        self._consumer = oauth2.Consumer(consumer_key, consumer_secret)
        self._client = oauth2.Client(self._consumer)

    def get_oauth_token(self):
        """Returns the oauth access token.

        The token is requested on first use and cached on the instance.

        :raises RuntimeError: if the login request does not answer with
                              status ``"200"``.
        """
        if self._oauth_token is None:
            resp, content = self._client.request(
                TWITTER_ACCESS_TOKEN_URL + "?",
                "POST",
                body=urlencode(
                    {
                        "x_auth_username": self.username.encode("utf-8"),
                        "x_auth_password": self.password.encode("utf-8"),
                        "x_auth_mode": "client_auth",
                    }
                ),
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )
            if resp["status"] != "200":
                raise RuntimeError("unable to login to Twitter")
            # parse_qsl() on bytes yields bytes keys, which would make the
            # str lookups below fail; decode a bytes body first.
            if isinstance(content, bytes):
                content = content.decode("utf-8")
            data = dict(parse_qsl(content))
            self._oauth_token = data["oauth_token"]
            self._oauth_token_secret = data["oauth_token_secret"]
        return self._oauth.Token(self._oauth_token, self._oauth_token_secret)

    def make_client(self):
        """Creates a new oauth client with a new access token."""
        return self._oauth.Client(self._consumer, self.get_oauth_token())

    def tweet(self, status):
        """Tweets a given status.  Status must not exceed 140 chars.

        :return: ``True`` if the request answered with status ``"200"``.
        """
        client = self.make_client()
        resp, _ = client.request(
            NEW_TWEET_URL,
            "POST",
            body=urlencode({"status": status.encode("utf-8")}),
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        return resp["status"] == "200"

    def emit(self, record):
        """Format *record* and post it as a status update."""
        self.tweet(self.format(record))
[docs]
class SlackHandler(Handler, StringFormatterHandlerMixin):
    """Posts formatted log records to a Slack channel.

    An API token for a Slack application is needed, and the ``slacker``
    library has to be installed.
    """

    def __init__(
        self,
        api_token,
        channel,
        level=NOTSET,
        format_string=None,
        filter=None,
        bubble=False,
    ):
        """
        :param api_token: Slack API token handed to ``slacker.Slacker``.
        :param channel: channel the messages are posted to.
        :param level: passed on to :class:`Handler`.
        :param format_string: passed on to
                              :class:`StringFormatterHandlerMixin`.
        :param filter: passed on to :class:`Handler`.
        :param bubble: passed on to :class:`Handler`.
        :raises RuntimeError: if the ``slacker`` library cannot be imported.
        """
        Handler.__init__(self, level, filter, bubble)
        StringFormatterHandlerMixin.__init__(self, format_string)
        self.api_token = api_token

        # slacker is an optional dependency, so it is only imported here.
        try:
            from slacker import Slacker as slack_client_cls
        except ImportError:
            raise RuntimeError("The slacker library is required for the SlackHandler.")

        self.channel = channel
        self.slack = slack_client_cls(api_token)

    def emit(self, record):
        """Format *record* and post it to the configured channel."""
        post_message = self.slack.chat.post_message
        post_message(channel=self.channel, text=self.format(record))
[docs]
class ExternalApplicationHandler(Handler):
    """This handler invokes an external application to send parts of
    the log record to.  The constructor takes a list of arguments that
    are passed to another application where each of the arguments is a
    format string, and optionally a format string for data that is
    passed to stdin.

    For example it can be used to invoke the ``say`` command on OS X::

        from logbook.more import ExternalApplicationHandler

        say_handler = ExternalApplicationHandler(["say", "{record.message}"])

    Note that the above example is blocking until ``say`` finished, so it's
    recommended to combine this handler with the
    :class:`logbook.ThreadedWrapperHandler` to move the execution into
    a background thread.

    .. versionadded:: 0.3
    """

    def __init__(
        self,
        arguments,
        stdin_format=None,
        encoding="utf-8",
        level=NOTSET,
        filter=None,
        bubble=False,
    ):
        """
        :param arguments: iterable of format strings; each one is formatted
                          with ``record=<the record>`` and becomes one
                          argument of the command line.
        :param stdin_format: optional format string whose result is sent
                             to the application's stdin.
        :param encoding: encoding used for the stdin data.
        :param level: passed on to :class:`Handler`.
        :param filter: passed on to :class:`Handler`.
        :param bubble: passed on to :class:`Handler`.
        """
        Handler.__init__(self, level, filter, bubble)
        self.encoding = encoding
        self._arguments = list(arguments)
        self._stdin_format = stdin_format
        # Imported here and kept on the instance, as the original code did.
        import subprocess

        self._subprocess = subprocess

    def emit(self, record):
        """Run the external application for *record* and wait for it."""
        args = [arg.format(record=record) for arg in self._arguments]
        if self._stdin_format is not None:
            stdin_data = self._stdin_format.format(record=record).encode(self.encoding)
            stdin = self._subprocess.PIPE
        else:
            stdin_data = None
            stdin = None
        # The context manager closes the pipe and waits for the child even
        # when communicate() raises, so no process or descriptor is leaked.
        with self._subprocess.Popen(args, stdin=stdin) as proc:
            if stdin is not None:
                proc.communicate(stdin_data)
            proc.wait()
[docs]
class ColorizingStreamHandlerMixin:
    """A mixin class that does colorizing.

    .. versionadded:: 0.3
    .. versionchanged:: 1.0.0
       Added Windows support if `colorama`_ is installed.

    .. _`colorama`: https://pypi.org/project/colorama
    """

    # Tri-state override: ``None`` means "decide from the stream", ``True``
    # and ``False`` are set by force_color() and forbid_color().
    _use_color = None

    def force_color(self):
        """Force colorizing the stream (`should_colorize` will return True)"""
        self._use_color = True

    def forbid_color(self):
        """Forbid colorizing the stream (`should_colorize` will return False)"""
        self._use_color = False

    def should_colorize(self, record):
        """Returns `True` if colorizing should be applied to this
        record.  The default implementation returns `True` if the
        stream is a tty.  If we are executing on Windows, colorama must be
        installed.

        The result is always a real ``bool``; a stream without an
        ``isatty`` method yields `False`.
        """
        # On Windows a missing colorama wins even over force_color().
        if os.name == "nt" and importlib.util.find_spec("colorama") is None:
            return False
        if self._use_color is not None:
            return self._use_color
        isatty = getattr(self.stream, "isatty", None)
        # Coerce explicitly: ``isatty and isatty()`` alone would hand back
        # ``None`` or whatever object ``isatty()`` happens to return.
        return bool(isatty and isatty())

    def get_color(self, record):
        """Returns the color for this record."""
        if record.level >= ERROR:
            return "red"
        elif record.level >= NOTICE:
            return "yellow"
        return "lightgray"

    def format(self, record):
        """Format *record* with the next ``format`` in the MRO and wrap the
        result in the record's color when colorizing applies."""
        rv = super().format(record)
        if self.should_colorize(record):
            if color := self.get_color(record):
                rv = colorize(color, rv)
        return rv
[docs]
class ColorizedStderrHandler(ColorizingStreamHandlerMixin, StderrHandler):
    """A stderr handler with colorized output.  Whether a record is
    colorized is decided by :class:`ColorizingStreamHandlerMixin`.

    .. versionadded:: 0.3
    .. versionchanged:: 1.0
       Added Windows support if `colorama`_ is installed.

    .. _`colorama`: https://pypi.org/project/colorama
    """

    def __init__(self, *args, **kwargs):
        """Set up the stderr handler and initialise colorama if present.

        All arguments are passed on to :class:`StderrHandler`.
        """
        StderrHandler.__init__(self, *args, **kwargs)

        # colorama is what makes colors work on Windows; its init() is a
        # noop on other operating systems.  It is optional, so a missing
        # package is simply skipped.
        try:
            import colorama
        except ImportError:
            return
        colorama.init()
[docs]
class ExceptionHandler(Handler, StringFormatterHandlerMixin):
    """Turns log records into exceptions of the given `exc_type`.

    Combined with a `level` this can be used, for example, to treat
    warnings as errors::

        from logbook.more import ExceptionHandler

        class ApplicationWarning(Exception):
            pass

        exc_handler = ExceptionHandler(ApplicationWarning, level="WARNING")

    .. versionadded:: 0.3
    """

    def __init__(
        self, exc_type, level=NOTSET, format_string=None, filter=None, bubble=False
    ):
        """
        :param exc_type: exception class raised for handled records; it is
                         called with the formatted record as only argument.
        :param level: passed on to :class:`Handler`.
        :param format_string: passed on to
                              :class:`StringFormatterHandlerMixin`.
        :param filter: passed on to :class:`Handler`.
        :param bubble: passed on to :class:`Handler`.
        """
        Handler.__init__(self, level, filter, bubble)
        StringFormatterHandlerMixin.__init__(self, format_string)
        self.exc_type = exc_type

    def handle(self, record):
        """Raise ``exc_type`` for *record* if this handler is responsible
        for it, otherwise return ``False``."""
        if not self.should_handle(record):
            return False
        formatted = self.format(record)
        raise self.exc_type(formatted)
[docs]
class DedupHandler(Handler):
    """A handler that collapses repeated log messages.

    Every distinct message is emitted once, together with the number of
    times it was logged, when the handler is popped.

    Example::

        with logbook.more.DedupHandler():
            logbook.error("foo")
            logbook.error("bar")
            logbook.error("foo")

    The expected output:

    .. code-block:: text

       message repeated 2 times: foo
       message repeated 1 times: bar
    """

    def __init__(
        self, format_string="message repeated {count} times: {message}", *args, **kwargs
    ):
        """
        :param format_string: template for the emitted message; it is
                              formatted with ``count`` and ``message``.

        Remaining arguments go to :class:`Handler`; ``bubble`` is always
        passed as ``False``.
        """
        Handler.__init__(self, *args, bubble=False, **kwargs)
        self._format_string = format_string
        self.clear()

    def clear(self):
        """Forget all messages collected so far."""
        self._unique_ordered_records = []
        self._message_to_count = defaultdict(int)

    def pop_application(self):
        """Pop the handler from the application stack, then flush."""
        Handler.pop_application(self)
        self.flush()

    def pop_thread(self):
        """Pop the handler from the thread stack, then flush."""
        Handler.pop_thread(self)
        self.flush()

    def pop_context(self):
        """Pop the handler from the context stack, then flush."""
        Handler.pop_context(self)
        self.flush()

    def pop_greenlet(self):
        """Pop the handler from the greenlet stack, then flush."""
        Handler.pop_greenlet(self)
        self.flush()

    def handle(self, record):
        """Count *record* by its message and remember the first record seen
        for each distinct message.  Always returns ``True``."""
        text = record.message
        counts = self._message_to_count
        if text not in counts:
            self._unique_ordered_records.append(record)
        counts[text] += 1
        return True

    def flush(self):
        """Emit one summary record per distinct message, then reset."""
        for record in self._unique_ordered_records:
            original_text = record.message
            record.message = self._format_string.format(
                message=original_text,
                count=self._message_to_count[original_text],
            )
            # record.dispatcher is the logger that created the message; it
            # can be missing (for example for logbook.info), in which case
            # the module-level dispatch function is used instead.
            source = record.dispatcher
            dispatch = dispatch_record if source is None else source.call_handlers
            dispatch(record)
        self.clear()
class RiemannHandler(Handler):
    """
    A handler that sends logs as events to Riemann.

    Events are collected in ``self.queue`` and sent in one batch once the
    queue holds at least ``flush_threshold`` events.
    """

    def __init__(
        self,
        host,
        port,
        message_type="tcp",
        ttl=60,
        flush_threshold=10,
        bubble=False,
        filter=None,
        level=NOTSET,
    ):
        """
        :param host: riemann host
        :param port: riemann port
        :param message_type: selects transport. Currently available 'tcp' and 'udp'
                             ('test' selects the blank transport)
        :param ttl: defines time to live in riemann
        :param flush_threshold: count of events after which we send to riemann
        :raises NotImplementedError: if the riemann_client package is missing
        :raises RuntimeError: if ``message_type`` is not supported
        """
        if riemann_client is None:
            raise NotImplementedError(
                "The Riemann handler requires the riemann_client package"
            )  # pragma: no cover
        Handler.__init__(self, level, filter, bubble)
        self.host = host
        self.port = port
        self.ttl = ttl
        self.queue = []
        self.flush_threshold = flush_threshold
        if message_type == "tcp":
            self.transport = riemann_client.transport.TCPTransport
        elif message_type == "udp":
            self.transport = riemann_client.transport.UDPTransport
        elif message_type == "test":
            self.transport = riemann_client.transport.BlankTransport
        else:
            # Built as one plain string: a backslash continuation inside the
            # literal would embed the source indentation in the message.
            supported = ",".join(["tcp", "udp", "test"])
            msg = (
                "Currently supported message types for RiemannHandler are: "
                f"{supported}. {message_type} is not supported."
            )
            raise RuntimeError(msg)

    def record_to_event(self, record):
        """Convert *record* into the keyword arguments of a Riemann event.

        The description is the exception value when the record carries
        exception information, otherwise ``record.msg``.  The state is
        ``"error"`` for the level names ``ERROR`` and ``EXCEPTION`` and
        ``"ok"`` for everything else.

        :return: dict with the event fields.
        """
        from time import time

        tags = ["log", record.level_name]
        msg = str(record.exc_info[1]) if record.exc_info else record.msg
        channel_name = str(record.channel) if record.channel else "unknown"
        state = "error" if record.level_name in ("ERROR", "EXCEPTION") else "ok"
        return {
            "metric_f": 1.0,
            "tags": tags,
            "description": msg,
            "time": int(time()),
            "ttl": self.ttl,
            "host": platform.node(),
            "service": f"{channel_name}.{os.getpid()}",
            "state": state,
        }

    def _flush_events(self):
        """Send all queued events over a fresh client, then empty the queue."""
        with riemann_client.client.QueuedClient(
            self.transport(self.host, self.port)
        ) as cl:
            for event in self.queue:
                cl.event(**event)
            cl.flush()
        # Only reached when sending succeeded; on an error the events stay
        # queued and are retried with the next emit().
        self.queue = []

    def emit(self, record):
        """Queue *record* and flush once the threshold is reached."""
        self.queue.append(self.record_to_event(record))
        # ">=" rather than "==": after a failed flush the queue is already
        # past the threshold, and an equality test would never match again.
        if len(self.queue) >= self.flush_threshold:
            self._flush_events()