"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import json
from datetime import datetime, timedelta
from functools import cache
from logging import getLogger
from pathlib import Path
from defence360agent.contracts.config import (
ANTIVIRUS_MODE,
Malware,
MalwareScanSchedule,
SystemConfig,
UserConfig,
effective_user_config,
)
from defence360agent.contracts.config import (
MalwareScanScheduleInterval as Interval,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.contracts.plugins import (
MessageSink,
MessageSource,
)
from defence360agent.subsys.persistent_state import register_lock_file
from defence360agent.utils import recurring_check
from defence360agent.utils.check_lock import check_lock
from defence360agent.utils.common import HOUR
from imav.contracts.messages import MalwareScanQueuePut
from imav.malwarelib.config import (
MalwareScanResourceType,
MalwareScanType,
)
from imav.malwarelib.scan.crontab import get_crontab
from imav.malwarelib.utils import reset_malware_schedule, user_list
from imav.malwarelib.utils.crontab import CronTab
# Module-level logger; note that it is named after this file's path
# (__file__), not the dotted module name.
logger = getLogger(__file__)
# Every interval value create_schedule() accepts as a supported interval.
AVAILABLE_INTERVALS = [
    Interval.NONE,
    Interval.DAY,
    Interval.WEEK,
    Interval.MONTH,
]
# Reduced interval set returned by allowed_schedule_interval() when none of
# its unlocking conditions hold.
AVP_INTERVALS = [
    Interval.NONE,
    Interval.MONTH,
]
# Sentinel cron expression (day 31 of month 2). _is_it_time() short-circuits
# on this exact string and never hands it to CronTab.
NEVER_SCHEDULE = "0 0 31 2 0"
# File holding the JSON-encoded ISO timestamp of the last recorded check.
_DEFAULT_LAST_CHECK_FILE = Path("/var/imunify360/last_check_dttm.json")
_DEFAULT_RECURRING_CHECK_INTERVAL = HOUR / 2  # seconds
# Default lock name handed to register_lock_file() by ScheduleWatcher.
_DEFAULT_LOCK_FILE_NAME = "schedule_watcher"
# Marker created during imav-deploy.sh if an AV+ Revisium license is present,
# to prevent the SCANNING_SCHEDULE params from being reset due to the absence
# of a CLN-issued license. It is removed right after imav-deploy.sh is done.
REVISIUM_PREMIUM_MARKER = Path("/var/imunify360/premium_revisium_license.flag")
@cache
def _get_local_timezone():
    """Return the tzinfo describing the system's local time zone.

    The value is computed on the first call and then memoized by
    ``functools.cache`` for the remainder of the process.
    """
    # NOTE(review): astimezone() without arguments attaches a fixed-offset
    # tzinfo, and the result is cached, so a later UTC-offset change (e.g. a
    # DST switch) is presumably not picked up until restart -- confirm that
    # this is acceptable for a long-running agent.
    local_now = datetime.now().astimezone()
    return local_now.tzinfo
def allowed_schedule_interval():
    """Return the list of schedule intervals currently permitted.

    AVAILABLE_INTERVALS is returned when ANTIVIRUS_MODE is falsy, when
    LicenseCLN.is_valid_av_plus() is truthy, or when the
    REVISIUM_PREMIUM_MARKER file exists; otherwise AVP_INTERVALS is returned.
    """
    # Both probes are evaluated unconditionally, before any decision is made.
    has_av_plus = LicenseCLN.is_valid_av_plus()
    has_revisium_marker = REVISIUM_PREMIUM_MARKER.exists()
    if ANTIVIRUS_MODE and not has_av_plus and not has_revisium_marker:
        return AVP_INTERVALS
    return AVAILABLE_INTERVALS
def get_user_schedule_config(
    user: str, admin_config: SystemConfig
) -> tuple[str, str, str, str]:
    """
    Resolve the malware scan schedule settings that apply to one user.

    The user's effective configuration is obtained from
    effective_user_config() and its "MALWARE_SCAN_SCHEDULE" section is
    consulted. Any field missing from that section (or the whole section
    being missing) is filled in from the matching MalwareScanSchedule
    attribute.

    Args:
        user: Name of the user whose schedule is wanted
        admin_config: System configuration object passed on to
            effective_user_config()

    Returns:
        Tuple of (interval, hour, day_of_month, day_of_week)
    """
    effective = effective_user_config(admin_config, UserConfig(username=user))
    section = effective.get("MALWARE_SCAN_SCHEDULE", {})

    def field(key, fallback):
        # Value from the user's schedule section, else the supplied default.
        return section.get(key, fallback)

    return (
        field("interval", MalwareScanSchedule.INTERVAL),
        field("hour", MalwareScanSchedule.HOUR),
        field("day_of_month", MalwareScanSchedule.DAY_OF_MONTH),
        field("day_of_week", MalwareScanSchedule.DAY_OF_WEEK),
    )
class ScheduleWatcher(MessageSink, MessageSource):
    """Message source that enqueues scheduled background malware scans.

    A recurring task (started in create_source) calls schedule_scan, which
    compares every panel user's cron-style schedule with the time of the
    last recorded check and emits MalwareScanQueuePut messages for the users
    whose scheduled time has passed.
    """

    def __init__(
        self,
        check_file: str | Path = _DEFAULT_LAST_CHECK_FILE,
        check_interval: float = 0,
        lock_file: str = _DEFAULT_LOCK_FILE_NAME,
    ):
        """Store the watcher settings and register its lock file.

        Args:
            check_file: File holding the JSON-encoded ISO timestamp of the
                last check that resulted in a scheduled scan.
            check_interval: Seconds between checks. A falsy value (the
                default 0) selects _DEFAULT_RECURRING_CHECK_INTERVAL.
            lock_file: Name passed to register_lock_file().
        """
        self._check_file = Path(check_file)
        self._check_interval = (
            check_interval or _DEFAULT_RECURRING_CHECK_INTERVAL
        )
        # SCOPE is not defined in this class; presumably it is provided by
        # one of the base classes -- TODO confirm.
        self._lock_file = register_lock_file(lock_file, self.SCOPE)

    async def create_sink(self, loop):
        """Do nothing: no sink-side setup is performed."""
        pass

    async def create_source(self, loop, sink):
        """Remember loop and sink, then start the recurring check task.

        schedule_scan is wrapped with recurring_check(check_lock, ...) and
        scheduled on *loop*; the resulting task is kept for shutdown().
        """
        self._loop = loop
        self._sink = sink
        # NOTE(review): recurring_check/check_lock are defined elsewhere;
        # presumably this re-runs schedule_scan every _check_interval seconds
        # guarded by the lock file -- confirm against their implementation.
        self._task = loop.create_task(
            recurring_check(
                check_lock,
                check_period_first=True,
                check_lock_period=self._check_interval,
                lock_file=self._lock_file,
            )(self.schedule_scan)()
        )

    async def shutdown(self):
        """Cancel the recurring task and wait for it to finish."""
        self._task.cancel()
        # CancelledError is handled by @recurring_check():
        await self._task

    def create_schedule(
        self,
        interval: str,
        hour: str | None = None,
        day_of_month: str | None = None,
        day_of_week: str | None = None,
    ) -> str:
        """Build the cron expression for a schedule configuration.

        Args:
            interval: One of the Interval values.
            hour: Hour field; a falsy value becomes "0".
            day_of_month: Day-of-month field (used for monthly schedules);
                a falsy value becomes "1".
            day_of_week: Day-of-week field (used for weekly schedules);
                a falsy value becomes "0".

        Returns:
            A five-field expression of the form "0 <hour> <dom> * <dow>",
            or NEVER_SCHEDULE when the interval is Interval.NONE, is not in
            AVAILABLE_INTERVALS, or is not currently allowed by
            allowed_schedule_interval().
        """
        if interval == Interval.NONE:
            return NEVER_SCHEDULE
        elif interval not in AVAILABLE_INTERVALS:
            logger.error("Unsupported interval value: %s", interval)
            return NEVER_SCHEDULE
        elif interval not in (intervals := allowed_schedule_interval()):
            logger.info(
                "Malware schedule is not in allowed intervals: schedule=%s,"
                " allowed=%s",
                interval,
                intervals,
            )
            return NEVER_SCHEDULE

        hour = hour or "0"
        day_of_month = day_of_month or "1"
        day_of_week = day_of_week or "0"
        # cron_args is (hour, day-of-month, day-of-week); unused fields are
        # wildcards.
        if interval == Interval.DAY:
            cron_args = hour, "*", "*"
        elif interval == Interval.WEEK:
            cron_args = hour, "*", day_of_week
        else:  # interval == Interval.MONTH:
            cron_args = hour, day_of_month, "*"
        return "0 {} {} * {}".format(*cron_args)

    def _default_last_check_dttm(self) -> datetime:
        """Return "one check interval ago" as a timezone-aware datetime."""
        return datetime.now(_get_local_timezone()) - timedelta(
            seconds=self._check_interval
        )

    def _read_last_check_dttm(self) -> datetime:
        """Read last check datetime from file, ensuring it's timezone-aware.

        Returns:
            The stored timestamp (a naive value gets the local timezone
            attached). When the file is missing or its content cannot be
            parsed, "now minus the check interval" is returned instead.
        """
        try:
            dttm = datetime.fromisoformat(
                json.loads(self._check_file.read_text())
            )
        except FileNotFoundError:
            return self._default_last_check_dttm()
        except (ValueError, TypeError) as exc:
            # An empty, truncated or otherwise malformed file must not stop
            # scheduling: the file is only rewritten after a scan has been
            # scheduled, so raising here would repeat on every cycle.
            # json.JSONDecodeError and fromisoformat() errors are ValueError;
            # a non-string JSON value makes fromisoformat() raise TypeError.
            logger.warning(
                "Ignoring unreadable last check file %s: %s",
                self._check_file,
                exc,
            )
            return self._default_last_check_dttm()
        if dttm.tzinfo is None:
            dttm = dttm.replace(tzinfo=_get_local_timezone())
        return dttm

    def _write_last_check_dttm(self, dttm: datetime) -> None:
        """Write last check datetime to file, ensuring it's timezone-aware."""
        if dttm.tzinfo is None:
            dttm = dttm.replace(tzinfo=_get_local_timezone())
        self._check_file.write_text(json.dumps(dttm.isoformat()))

    @staticmethod
    def _is_it_time(
        schedule: str, now: datetime, last_check: datetime
    ) -> bool:
        """Tell whether *schedule* fired between *last_check* and *now*.

        Args:
            schedule: Cron expression, or NEVER_SCHEDULE.
            now: Current time.
            last_check: Time of the last recorded check.

        Returns:
            True when the first run time CronTab reports after *last_check*
            is strictly earlier than *now*; False for NEVER_SCHEDULE or when
            CronTab reports no next run.
        """
        if schedule == NEVER_SCHEDULE:
            return False

        # Remove timezone info for crontab calculation (it works with naive
        # datetimes). CronTab calculates in local time.
        # NOTE(review): tzinfo is dropped without conversion, so both values
        # are assumed to carry the same UTC offset -- confirm for timestamps
        # persisted under a different offset.
        last_check_naive = (
            last_check.replace(tzinfo=None)
            if last_check.tzinfo
            else last_check
        )
        now_naive = now.replace(tzinfo=None) if now.tzinfo else now

        next_run_dttm: datetime | None = CronTab(schedule).next(
            last_check_naive, return_datetime=True, default_utc=False
        )
        return next_run_dttm is not None and next_run_dttm < now_naive

    async def schedule_scan(self) -> None:
        """Run one check cycle and persist the checkpoint if a scan was queued."""
        last_check = self._read_last_check_dttm()
        now = datetime.now(_get_local_timezone())
        scheduled = await self._schedule_scan(now, last_check)
        # Advance last_check checkpoint only when a scan window was actually
        # crossed to avoid skipping the scheduled run by writing a timestamp
        # past the due time.
        if scheduled:
            self._write_last_check_dttm(now)

    async def _schedule_scan(
        self, now: datetime, last_check: datetime
    ) -> bool:
        """Queue scans for every user whose schedule is due.

        Args:
            now: Current time.
            last_check: Time of the last recorded check.

        Returns:
            True if at least one home directory was queued for scanning,
            False otherwise.
        """
        if MalwareScanSchedule.INTERVAL not in allowed_schedule_interval():
            logger.info("Malware schedule interval is being reset to defaults")
            reset_malware_schedule()

        users = await user_list.panel_users()
        admin_config = SystemConfig()
        to_scan: list[str] = []
        crontabs_scan: list[str] = []

        # Build per-user effective config the same way as "config show" does.
        for u in users:
            (
                interval,
                hour,
                day_of_month,
                day_of_week,
            ) = get_user_schedule_config(u["user"], admin_config)
            schedule = self.create_schedule(
                interval, hour, day_of_month, day_of_week
            )
            if self._is_it_time(schedule, now, last_check):
                to_scan.append(u["home"])
                if Malware.CRONTABS_SCAN_ENABLED and (
                    path := get_crontab(u["user"])
                ):
                    crontabs_scan.append(path)

        # Crontab files are scanned as files only (no DB scan).
        if crontabs_scan:
            await self.trigger_malware_scan(
                crontabs_scan, modes=[MalwareScanResourceType.FILE]
            )

        if to_scan:
            # Remove potential dups while keeping first-seen order, so the
            # log line and the queued path list are deterministic.
            to_scan = list(dict.fromkeys(to_scan))
            logger.info(
                "Trigger scheduled background malware scan for paths: %s",
                ", ".join(to_scan),
            )
            await self.trigger_malware_scan(to_scan)
            return True
        else:
            logger.info(
                "No paths to scan in scheduled background malware scan."
            )
            return False

    async def trigger_malware_scan(
        self,
        paths: list[str],
        modes: list[MalwareScanResourceType] | None = None,
    ) -> None:
        """Send background scan requests for *paths* to the sink.

        Args:
            paths: Paths to scan; nothing is sent when empty.
            modes: Resource types to scan. None means both DB and FILE.
                A DB request is only sent when ANTIVIRUS_MODE is falsy and
                Malware.DATABASE_SCAN_ENABLED is truthy.
        """
        if not paths:
            return

        if modes is None:
            modes = [MalwareScanResourceType.DB, MalwareScanResourceType.FILE]

        # Enqueue DB scan first when applicable
        if (
            not ANTIVIRUS_MODE
            and Malware.DATABASE_SCAN_ENABLED
            and MalwareScanResourceType.DB in modes
        ):
            await self._sink.process_message(
                MalwareScanQueuePut(
                    paths=paths,
                    scan_args={
                        "resource_type": MalwareScanResourceType.DB,
                        "scan_type": MalwareScanType.BACKGROUND,
                    },
                )
            )

        if MalwareScanResourceType.FILE in modes:
            await self._sink.process_message(
                MalwareScanQueuePut(
                    paths=paths,
                    scan_args={
                        "resource_type": MalwareScanResourceType.FILE,
                        "scan_type": MalwareScanType.BACKGROUND,
                    },
                )
            )