#
"""events - Event models for rdial."""
# Copyright © 2011-2019 James Rowe <jnrowe@gmail.com>
# Nathan McGregor <nathan.mcgregor@astrium.eads.net>
#
# SPDX-License-Identifier: GPL-3.0+
#
# This file is part of rdial.
#
# rdial is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# rdial is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# rdial. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import csv
import datetime
import glob
import inspect
import operator
import os
import pickle
from typing import Callable, Dict, Iterable, Iterator, List, Optional, Union
import click
from jnrbase import iso_8601, xdg_basedir
# cduration is an optional dependency; when it cannot be imported the name is
# bound to None so later code can test it for truthiness before using it.
try:
    import cduration
except ImportError:  # pragma: no cover
    cduration = None
from . import utils
class RdialDialect(csv.unix_dialect):  # pylint: disable=too-few-public-methods
    """CSV dialect for rdial data files.

    Inherits everything from :class:`csv.unix_dialect` and overrides the two
    attributes below.
    """

    # Only quote fields that actually need it.
    quoting = csv.QUOTE_MINIMAL
    # Have the csv module raise an error on malformed input.
    strict = True
class TaskNotRunningError(utils.RdialError):
    """Exception for calling mutators when a task is not running."""
class TaskRunningError(utils.RdialError):
    """Exception for starting a task when a task is already running."""
class TaskNotExistError(utils.RdialError):
    """Exception for attempting to operate on a non-existing task."""
class Event:
    """Base object for handling database event.

    Instances carry four plain attributes: ``task``, ``start`` (a naive
    :obj:`datetime.datetime`), ``delta`` (a :obj:`datetime.timedelta`) and
    ``message``.

    .. note::
        Parameters spelled with two leading underscores are name-mangled by
        Python inside a class body, so they can only be passed positionally.
    """

    def __init__(self,
                 __task: str,
                 start: Optional[Union[datetime.datetime, str]] = None,
                 delta: Optional[Union[datetime.timedelta, str]] = None,
                 message: Optional[str] = '') -> None:
        """Initialise a new ``Event`` object.

        Args:
            __task: Task name to track
            start: Start time for event; a naive ``datetime`` is stored as
                is, anything else is handed to ``iso_8601.parse_datetime``
            delta: Duration for event; a ``timedelta`` is stored as is,
                anything else is parsed as a duration string
            message: Message to attach to event

        Raises:
            ValueError: ``start`` is a timezone-aware ``datetime``
        """
        self.task = __task

        if isinstance(start, datetime.datetime):
            if start.tzinfo:
                raise ValueError(f'Must be a naive datetime {start!r}')
            self.start = start
        else:
            # Parsed values are normalised to naive datetimes as well.
            self.start = iso_8601.parse_datetime(start).replace(tzinfo=None)

        if isinstance(delta, datetime.timedelta):
            self.delta = delta
        elif not cduration:
            # Optional cduration module unavailable, use the jnrbase parser.
            self.delta = iso_8601.parse_delta(delta)
        elif delta:
            self.delta = cduration.parse_duration(delta)
        else:
            # cduration is only ever given non-empty values.
            self.delta = datetime.timedelta(0)

        self.message = message

    def __eq__(self, __other: object) -> bool:
        """Compare ``Event`` objects for equality.

        Args:
            __other: Object to test equality against

        Returns:
            True if objects are equal; ``NotImplemented`` when ``__other`` is
            not an ``Event``, so Python can fall back to its default handling
            instead of failing with :exc:`AttributeError`
        """
        if not isinstance(__other, Event):
            return NotImplemented
        return (self.task == __other.task
                and self.start == __other.start
                and self.delta == __other.delta
                and self.message == __other.message)

    def __ne__(self, __other: object) -> bool:
        """Compare ``Event`` objects for inequality.

        Args:
            __other: Object to test inequality against

        Returns:
            True if objects are not equal
        """
        return not self == __other

    def __repr__(self) -> str:
        """Self-documenting string representation.

        Returns:
            Event representation suitable for :func:`eval`
        """
        return 'Event({!r}, {!r}, {!r}, {!r})'.format(
            self.task,
            iso_8601.format_datetime(self.start) + 'Z',
            iso_8601.format_delta(self.delta),
            self.message,
        )

    def writer(self) -> Dict[str, Optional[str]]:
        """Prepare object for export.

        The task name is not part of the returned mapping.

        Returns:
            Event data for object storage
        """
        return {
            'start': iso_8601.format_datetime(self.start) + 'Z',
            'delta': iso_8601.format_delta(self.delta),
            'message': self.message,
        }

    def running(self) -> Union[str, bool]:
        """Check if event is running.

        An event counts as running while its duration is still zero.

        Returns:
            Event name, if running; ``False`` otherwise
        """
        if self.delta == datetime.timedelta(0):
            return self.task
        return False

    def stop(self, message: Optional[str] = None, force: bool = False) -> None:
        """Stop running event.

        Sets the duration to the time elapsed since ``start`` and replaces
        the stored message.

        Args:
            message: Message to attach to event
            force: Re-stop a previously stopped event

        Raises:
            TaskNotRunningError: Event not running
        """
        if not force and self.delta:
            raise TaskNotRunningError('No task running!')
        # Naive UTC "now", matching the naive ``start``; spelled this way
        # because ``datetime.utcnow()`` is deprecated in recent Pythons.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        self.delta = now - self.start
        self.message = message
# Names of the Event constructor parameters after the leading task name.
FIELDS = list(inspect.signature(Event).parameters)[1:]  # NOQA
class Events(list):  # pylint: disable=too-many-public-methods
    """Container for database events.

    A :obj:`list` of ``Event`` objects that also remembers which tasks have
    been modified (``dirty``) and whether backups should be kept when
    writing (``backup``).
    """

    def __init__(self,
                 __iterable: Optional[Iterable['Event']] = None,
                 backup: bool = True) -> None:
        """Initialise a new ``Events`` object.

        Args:
            __iterable: Objects to add to container
            backup: Whether to create backup files
        """
        super().__init__(__iterable if __iterable else [])
        self.backup = backup
        self._dirty = set()

    def __repr__(self) -> str:
        """Self-documenting string representation.

        Returns:
            Events representation suitable for :func:`eval`
        """
        # Zero-argument super(); super(self.__class__, self) recurses forever
        # as soon as this class is subclassed.
        return 'Events({})'.format(super().__repr__())

    @property
    def dirty(self) -> Iterable[str]:
        """Modified tasks requiring sync against storage."""
        return self._dirty

    @dirty.setter
    def dirty(self, __value: str):
        """Mark task as needing sync.

        Assignment *adds* to the set of dirty tasks, it does not replace it.

        Args:
            __value: Task to mark as dirty
        """
        self._dirty.add(__value)

    @dirty.deleter
    def dirty(self):
        """Mark dirty queue as flushed."""
        self._dirty = set()

    @staticmethod
    def read(__directory: str, backup: bool = True,
             write_cache: bool = True) -> 'Events':
        """Read and parse database.

        .. note::
            Assumes a new :obj:`Events` object should be created if the
            directory is missing.

        Args:
            __directory: Location to read database files from
            backup: Whether to create backup files
            write_cache: Whether to write cache files

        Returns:
            Parsed events database, sorted by event start time

        Raises:
            ValueError: A data file is empty or has an unexpected header
        """
        if not os.path.exists(__directory):
            return Events(backup=backup)

        events = []
        xdg_cache_dir = xdg_basedir.user_cache('rdial')
        cache_dir = os.path.join(xdg_cache_dir, __directory.replace('/', '_'))
        if write_cache and not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
            with click.open_file(f'{xdg_cache_dir}/CACHEDIR.TAG', 'w') as f:
                f.writelines([
                    'Signature: 8a477f597d28d172789f06886806bc55\n',
                    '# This file is a cache directory tag created by rdial.\n',
                    '# For information about cache directory tags, see:\n',
                    '# http://www.brynosaurus.com/cachedir/\n',
                ])

        for fname in glob.glob(f'{__directory}/*.csv'):
            task = os.path.basename(fname)[:-4]
            cache_file = os.path.join(cache_dir, task) + '.pkl'
            evs = None
            if os.path.exists(cache_file) and utils.newer(cache_file, fname):
                # NOTE(review): unpickling can execute arbitrary code; this
                # relies on the cache directory being writable only by the
                # user who owns the database.
                try:
                    # UnicodeDecodeError must be caught for the Python 2 to
                    # 3 upgrade path.
                    with click.open_file(cache_file, 'rb') as f:
                        cache = pickle.load(f)
                except (pickle.UnpicklingError, EOFError, ImportError,
                        AttributeError, IndexError, UnicodeDecodeError):
                    # Unreadable cache: fall through and re-parse the CSV.
                    pass
                else:
                    if isinstance(cache, dict) and cache.get('version') == 1:
                        # A missing 'events' key leaves evs as None, which
                        # triggers a re-parse below.
                        evs = cache.get('events')
                    else:
                        os.unlink(cache_file)
            if evs is None:
                with click.open_file(fname, encoding='utf-8') as f:
                    # We're not using the prettier DictReader here as it is
                    # *significantly* slower for large data files (~5x).
                    reader = csv.reader(f, dialect=RdialDialect)
                    # The default makes an empty file fail the header check
                    # rather than leak a bare StopIteration.
                    if next(reader, None) != FIELDS:
                        raise ValueError('Invalid data {!r}'.format(
                            click.format_filename(fname)))
                    evs = [
                        Event(task, *row)  # pylint: disable=star-args
                        for row in reader
                    ]
                if write_cache:
                    with click.open_file(cache_file, 'wb', atomic=True) as f:
                        pickle.dump({
                            'version': 1,
                            'events': evs
                        }, f, pickle.HIGHEST_PROTOCOL)
            events.extend(evs)
        # Propagate the caller's backup preference on this path too.
        return Events(sorted(events, key=operator.attrgetter('start')),
                      backup=backup)

    def write(self, __directory: str) -> None:
        """Write database file.

        Only tasks currently marked dirty are written; the dirty set is
        cleared afterwards.

        Args:
            __directory: Location to write database files to
        """
        if not self.dirty:
            return
        os.makedirs(__directory, exist_ok=True)

        for task in self.dirty:
            task_file = f'{__directory}/{task}.csv'
            events = self.for_task(task)
            with click.utils.LazyFile(task_file, 'w', atomic=True) as temp:
                writer = csv.DictWriter(temp, FIELDS, dialect=RdialDialect)
                writer.writeheader()
                for event in events:
                    writer.writerow(event.writer())
                # Move the previous file aside while the atomic handle is
                # still open, i.e. before the new content replaces it.
                if self.backup and os.path.exists(task_file):
                    os.rename(task_file, f'{task_file}~')
        del self.dirty

    def tasks(self) -> List[str]:
        """Generate a list of tasks in the database.

        Returns:
            Sorted, de-duplicated names of tasks in database
        """
        return sorted({event.task for event in self})

    def last(self) -> Optional['Event']:
        """Return current/last event.

        .. note::
            This handles the empty database case by returning ``None``.

        Returns:
            Last recorded event
        """
        return self[-1] if self else None

    def running(self) -> Union[str, bool]:
        """Check if an event is running.

        We return the running task, if one exists, for easy access.

        Returns:
            Running event, if an event running
        """
        last = self.last()
        return last.running() if last else False

    def start(self,
              __task: str,
              new: bool = False,
              start: Union[datetime.datetime, str] = '') -> None:
        """Start a new event.

        Args:
            __task: Task name to track
            new: Whether to create a new task
            start: |ISO|-8601 start time for event

        Raises:
            TaskNotExistError: Task is unknown and ``new`` is not set
            TaskRunningError: An event is already running, or ``start``
                falls before the end of the previous event
        """
        if not new and __task not in self.tasks():
            raise TaskNotExistError(
                f'Task {__task} does not exist! Use “--new” to create it')
        running = self.running()
        if running:
            raise TaskRunningError(f'Running task {running}!')

        # Build the event first so the overlap test below compares against
        # the parsed start time; ``start`` itself may be a string.
        event = Event(__task, start)
        last = self.last()
        if last and start and last.start + last.delta > event.start:
            raise TaskRunningError('Start date overlaps previous task!')
        self.append(event)
        self.dirty = __task

    def stop(self, message: Optional[str] = None, force: bool = False) -> None:
        """Stop running event.

        Args:
            message: Message to attach to event
            force: Re-stop a previously stopped event

        Raises:
            TaskNotRunningError: No task running!
        """
        last = self.last()
        # An empty database has nothing to stop, even when forced.
        if last is None or (not force and not last.running()):
            raise TaskNotRunningError('No task running!')
        last.stop(message, force)
        self.dirty = last.task

    def filter(self, __filt: Callable[[
            'Event',
    ], bool]) -> 'Events':
        """Apply filter to events.

        Args:
            __filt: Function to filter with

        Returns:
            Events matching given filter function
        """
        return Events(x for x in self if __filt(x))

    def for_task(self, __task: str) -> 'Events':
        """Filter events for a specific task.

        Args:
            __task: Task name to filter on

        Returns:
            Events marked with given task name
        """
        return self.filter(lambda x: x.task == __task)

    def for_date(self,
                 year: int,
                 month: Optional[int] = None,
                 day: Optional[int] = None) -> 'Events':
        """Filter events for a specific date.

        Args:
            year: Year to filter on
            month: Month to filter on
            day: Day to filter on

        Returns:
            Events occurring within specified date
        """
        events = self.filter(lambda x: x.start.year == year)
        if month:
            events = events.filter(lambda x: x.start.month == month)
        if day:
            events = events.filter(lambda x: x.start.day == day)
        return events

    def for_week(self, __year: int, __week: int) -> 'Events':
        """Filter events for a specific |ISO|-8601 week.

        Args:
            __year: Year to filter events on
            __week: |ISO|-8601 week number to filter events on

        Returns:
            Events occurring in given |ISO|-8601 week
        """
        start, end = utils.iso_week_to_date(__year, __week)
        # Half-open interval: the end date itself is excluded.
        return self.filter(lambda x: start <= x.start.date() < end)

    def sum(self) -> datetime.timedelta:
        """Sum duration of all events.

        Returns:
            Sum of all event deltas
        """
        return sum((x.delta for x in self), datetime.timedelta(0))

    @staticmethod
    @contextlib.contextmanager
    def wrapping(__directory: str,
                 backup: bool = True,
                 write_cache: bool = True) -> Iterator['Events']:
        """Convenience context handler to manage reading and writing database.

        The database is written back only when the ``with`` body completes
        without raising and at least one task was marked dirty.

        Args:
            __directory: Database location
            backup: Whether to create backup files
            write_cache: Whether to write cache files
        """
        events = Events.read(__directory, backup, write_cache)
        yield events
        if events.dirty:
            events.write(__directory)