"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.


This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE.  See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License along with
this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""

import json
from datetime import datetime, timedelta
from functools import cache
from logging import getLogger
from pathlib import Path

from defence360agent.contracts.config import (
    ANTIVIRUS_MODE,
    Malware,
    MalwareScanSchedule,
    SystemConfig,
    UserConfig,
    effective_user_config,
)
from defence360agent.contracts.config import (
    MalwareScanScheduleInterval as Interval,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.contracts.plugins import (
    MessageSink,
    MessageSource,
)
from defence360agent.subsys.persistent_state import register_lock_file
from defence360agent.utils import recurring_check
from defence360agent.utils.check_lock import check_lock
from defence360agent.utils.common import HOUR
from imav.contracts.messages import MalwareScanQueuePut
from imav.malwarelib.config import (
    MalwareScanResourceType,
    MalwareScanType,
)
from imav.malwarelib.scan.crontab import get_crontab
from imav.malwarelib.utils import reset_malware_schedule, user_list
from imav.malwarelib.utils.crontab import CronTab

logger = getLogger(__file__)

# Every interval the scheduler knows how to turn into a cron expression.
AVAILABLE_INTERVALS = [
    Interval.NONE,
    Interval.DAY,
    Interval.WEEK,
    Interval.MONTH,
]
# Reduced set returned by allowed_schedule_interval() when none of its
# "full set" conditions hold.
AVP_INTERVALS = [
    Interval.NONE,
    Interval.MONTH,
]
# Cron expression for February 31st, a date that never occurs.
NEVER_SCHEDULE = "0 0 31 2 0"

_DEFAULT_LAST_CHECK_FILE = Path("/var/imunify360/last_check_dttm.json")
_DEFAULT_RECURRING_CHECK_INTERVAL = HOUR / 2  # seconds
_DEFAULT_LOCK_FILE_NAME = "schedule_watcher"

# marker created during imav-deploy.sh if AV+ Revisium license
# to prevent SCANNING_SCHEDULE params being reset due to absence of
# CLN issued license
# it is removed right after imav-deploy.sh is done
REVISIUM_PREMIUM_MARKER = Path("/var/imunify360/premium_revisium_license.flag")


@cache
def _get_local_timezone():
    """Get the local timezone.

    The result is cached for the life of the process, so it reflects the
    UTC offset that was in effect on the first call.
    """
    return datetime.now().astimezone().tzinfo


def _to_local_naive(dttm: datetime) -> datetime:
    """Return *dttm* as a naive datetime expressed in local wall-clock time.

    Aware values are first converted to the system local timezone (using the
    offset valid at that instant) and only then stripped of ``tzinfo``; merely
    dropping ``tzinfo`` would keep whatever offset the value happened to carry.
    Naive values are returned unchanged.
    """
    if dttm.tzinfo is None:
        return dttm
    return dttm.astimezone().replace(tzinfo=None)


def allowed_schedule_interval():
    """Return the list of schedule intervals currently permitted.

    ``AVAILABLE_INTERVALS`` is returned when the agent is not in antivirus
    mode, or the AV+ license is valid, or the Revisium premium marker file
    exists; otherwise ``AVP_INTERVALS`` is returned.
    """
    valid_avp = LicenseCLN.is_valid_av_plus()
    revisium_license_exists = REVISIUM_PREMIUM_MARKER.exists()
    condition = (not ANTIVIRUS_MODE) or valid_avp or revisium_license_exists
    return AVAILABLE_INTERVALS if condition else AVP_INTERVALS


def get_user_schedule_config(
    user: str, admin_config: SystemConfig
) -> tuple[str, str, str, str]:
    """
    Get schedule configuration for a given user.

    Returns a tuple of (interval, hour, day_of_month, day_of_week).
    Falls back to system defaults if user config is incomplete or missing.

    Args:
        user: Username to get schedule configuration for
        admin_config: System configuration object

    Returns:
        Tuple of (interval, hour, day_of_month, day_of_week)
    """
    eff = effective_user_config(admin_config, UserConfig(username=user))
    schedule_cfg = eff.get("MALWARE_SCAN_SCHEDULE", {})

    interval = schedule_cfg.get("interval", MalwareScanSchedule.INTERVAL)
    hour = schedule_cfg.get("hour", MalwareScanSchedule.HOUR)
    day_of_month = schedule_cfg.get(
        "day_of_month", MalwareScanSchedule.DAY_OF_MONTH
    )
    day_of_week = schedule_cfg.get(
        "day_of_week", MalwareScanSchedule.DAY_OF_WEEK
    )
    return interval, hour, day_of_month, day_of_week


class ScheduleWatcher(MessageSink, MessageSource):
    """Periodically check per-user scan schedules and enqueue due scans."""

    def __init__(
        self,
        check_file: str | Path = _DEFAULT_LAST_CHECK_FILE,
        check_interval: float = 0,
        lock_file: str = _DEFAULT_LOCK_FILE_NAME,
    ):
        """
        Args:
            check_file: File holding the datetime of the last checkpoint
            check_interval: Seconds between checks; a falsy value selects
                the module default
            lock_file: Name passed to ``register_lock_file``
        """
        self._check_file = Path(check_file)
        self._check_interval = (
            check_interval or _DEFAULT_RECURRING_CHECK_INTERVAL
        )
        self._lock_file = register_lock_file(lock_file, self.SCOPE)

    async def create_sink(self, loop):
        pass

    async def create_source(self, loop, sink):
        """Remember the loop and sink and start the recurring check task."""
        self._loop = loop
        self._sink = sink
        self._task = loop.create_task(
            recurring_check(
                check_lock,
                check_period_first=True,
                check_lock_period=self._check_interval,
                lock_file=self._lock_file,
            )(self.schedule_scan)()
        )

    async def shutdown(self):
        """Cancel the recurring check task and wait for it to finish."""
        self._task.cancel()
        # CancelledError is handled by @recurring_check():
        await self._task

    def create_schedule(
        self,
        interval: str,
        hour: str | None = None,
        day_of_month: str | None = None,
        day_of_week: str | None = None,
    ) -> str:
        """Build a cron expression for the given schedule settings.

        Args:
            interval: One of the ``Interval`` values
            hour: Hour field; defaults to "0" when falsy
            day_of_month: Day-of-month field; defaults to "1" when falsy
            day_of_week: Day-of-week field; defaults to "0" when falsy

        Returns:
            ``NEVER_SCHEDULE`` when the interval is ``Interval.NONE``,
            unsupported, or not currently allowed; otherwise an expression
            of the form "0 <hour> <day_of_month> * <day_of_week>".
        """
        if interval == Interval.NONE:
            return NEVER_SCHEDULE
        elif interval not in AVAILABLE_INTERVALS:
            logger.error("Unsupported interval value: %s", interval)
            return NEVER_SCHEDULE
        elif interval not in (intervals := allowed_schedule_interval()):
            logger.info(
                "Malware schedule is not in allowed intervals: schedule=%s,"
                " allowed=%s",
                interval,
                intervals,
            )
            return NEVER_SCHEDULE

        hour = hour or "0"
        day_of_month = day_of_month or "1"
        day_of_week = day_of_week or "0"

        if interval == Interval.DAY:
            cron_args = hour, "*", "*"
        elif interval == Interval.WEEK:
            cron_args = hour, "*", day_of_week
        else:  # interval == Interval.MONTH:
            cron_args = hour, day_of_month, "*"

        return "0 {} {} * {}".format(*cron_args)

    def _read_last_check_dttm(self) -> datetime:
        """Read last check datetime from file, ensuring it's timezone-aware.

        When the file is missing, or its content cannot be parsed as a JSON
        string holding an ISO datetime, fall back to "now minus one check
        interval" so a damaged file cannot break every subsequent check.
        """
        dttm: datetime | None = None
        try:
            dttm = datetime.fromisoformat(
                json.loads(self._check_file.read_text())
            )
        except FileNotFoundError:
            pass
        except (ValueError, TypeError) as exc:
            # ValueError: invalid JSON, undecodable bytes or bad ISO string;
            # TypeError: valid JSON that is not a string.
            logger.warning(
                "Ignoring unreadable last check file %s: %s",
                self._check_file,
                exc,
            )

        if dttm is None:
            return datetime.now(_get_local_timezone()) - timedelta(
                seconds=self._check_interval
            )
        if dttm.tzinfo is None:
            dttm = dttm.replace(tzinfo=_get_local_timezone())
        return dttm

    def _write_last_check_dttm(self, dttm: datetime) -> None:
        """Write last check datetime to file, ensuring it's timezone-aware."""
        if dttm.tzinfo is None:
            dttm = dttm.replace(tzinfo=_get_local_timezone())
        self._check_file.write_text(json.dumps(dttm.isoformat()))

    @staticmethod
    def _is_it_time(
        schedule: str, now: datetime, last_check: datetime
    ) -> bool:
        """Tell whether *schedule* came due between *last_check* and *now*.

        Returns False for ``NEVER_SCHEDULE`` or when CronTab yields no next
        run; otherwise True when the run computed from *last_check* is
        strictly earlier than *now*.
        """
        if schedule == NEVER_SCHEDULE:
            return False

        # CronTab works with naive datetimes and calculates in local time,
        # so aware values are converted to local wall-clock time first
        # instead of just discarding their offset.
        last_check_naive = _to_local_naive(last_check)
        now_naive = _to_local_naive(now)

        next_run_dttm: datetime | None = CronTab(schedule).next(
            last_check_naive, return_datetime=True, default_utc=False
        )
        return next_run_dttm is not None and next_run_dttm < now_naive

    async def schedule_scan(self) -> None:
        """Run one scheduling pass and update the checkpoint if needed."""
        last_check = self._read_last_check_dttm()
        now = datetime.now(_get_local_timezone())
        scheduled = await self._schedule_scan(now, last_check)
        # Advance last_check checkpoint only when a scan window was actually
        # crossed to avoid skipping the scheduled run by writing a timestamp
        # past the due time.
        if scheduled:
            self._write_last_check_dttm(now)

    async def _schedule_scan(
        self, now: datetime, last_check: datetime
    ) -> bool:
        """Enqueue scans for every panel user whose schedule is due.

        Returns:
            True when at least one home directory was queued for scanning,
            False otherwise.
        """
        if MalwareScanSchedule.INTERVAL not in allowed_schedule_interval():
            logger.info("Malware schedule interval is being reset to defaults")
            reset_malware_schedule()

        users = await user_list.panel_users()
        admin_config = SystemConfig()

        to_scan: list[str] = []
        crontabs_scan: list[str] = []

        # Build per-user effective config the same way as "config show" does.
        for u in users:
            (
                interval,
                hour,
                day_of_month,
                day_of_week,
            ) = get_user_schedule_config(u["user"], admin_config)
            schedule = self.create_schedule(
                interval, hour, day_of_month, day_of_week
            )

            if self._is_it_time(schedule, now, last_check):
                to_scan.append(u["home"])
                if Malware.CRONTABS_SCAN_ENABLED and (
                    path := get_crontab(u["user"])
                ):
                    crontabs_scan.append(path)

        if crontabs_scan:
            await self.trigger_malware_scan(
                crontabs_scan, modes=[MalwareScanResourceType.FILE]
            )

        if to_scan:
            # Remove potential dups while keeping a stable, first-seen order
            to_scan = list(dict.fromkeys(to_scan))
            logger.info(
                "Trigger scheduled background malware scan for paths: %s",
                ", ".join(to_scan),
            )
            await self.trigger_malware_scan(to_scan)
            return True
        else:
            logger.info(
                "No paths to scan in scheduled background malware scan."
            )
            return False

    async def _enqueue_background_scan(
        self, paths: list[str], resource_type: MalwareScanResourceType
    ) -> None:
        """Send one background-scan queue message for *resource_type*."""
        await self._sink.process_message(
            MalwareScanQueuePut(
                paths=paths,
                scan_args={
                    "resource_type": resource_type,
                    "scan_type": MalwareScanType.BACKGROUND,
                },
            )
        )

    async def trigger_malware_scan(
        self,
        paths: list[str],
        modes: list[MalwareScanResourceType] | None = None,
    ) -> None:
        """Enqueue background scans of *paths* for the requested modes.

        Args:
            paths: Paths to scan; nothing is enqueued when empty
            modes: Resource types to scan; ``None`` means both DB and FILE
        """
        if not paths:
            return

        if modes is None:
            modes = [MalwareScanResourceType.DB, MalwareScanResourceType.FILE]

        # Enqueue DB scan first when applicable
        if (
            not ANTIVIRUS_MODE
            and Malware.DATABASE_SCAN_ENABLED
            and MalwareScanResourceType.DB in modes
        ):
            await self._enqueue_background_scan(
                paths, MalwareScanResourceType.DB
            )

        if MalwareScanResourceType.FILE in modes:
            await self._enqueue_background_scan(
                paths, MalwareScanResourceType.FILE
            )