"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import logging
import time
from contextlib import suppress
from pathlib import Path
from defence360agent.contracts.config import Core
from defence360agent.contracts.license import LicenseCLN
from defence360agent.contracts.plugins import MessageSource
from defence360agent.internals.the_sink import TheSink
from defence360agent.utils import recurring_check
from imav.api.cleanup_revert import CleanupRevertAPI
from imav.malwarelib.cleanup.storage import restore_hits
from imav.malwarelib.cleanup.types import CleanupRevertInitiator
from imav.malwarelib.config import MalwareHitStatus
from imav.malwarelib.model import MalwareHit
# Three hours, in seconds.  Used both as the interval handed to
# ``recurring_check`` and as the maximum age of the "cleanup_revert" flag
# file for which a run is skipped.
PERIOD = 60 * 60 * 3

logger = logging.getLogger(__name__)
class CleanupRevertPlugin(MessageSource):
    """Message source that periodically restores hits named by the revert API.

    A background task runs :meth:`process_cleanup_revert` through
    ``recurring_check(PERIOD)``.  Each run fetches ``files`` and ``dbs`` from
    ``CleanupRevertAPI.paths()``, looks up the matching ``MalwareHit``
    records and hands them to ``restore_hits``.  A flag file under
    ``Core.TMPDIR`` is touched at the end of a run; while that file is
    younger than ``PERIOD`` seconds further runs are skipped.
    """

    # Initiator value passed to ``restore_hits`` for every restore request.
    initiator = CleanupRevertInitiator.IMUNIFY
    # Sink received in ``create_source``; forwarded to ``restore_hits``.
    _sink: TheSink | None = None
    # Background task created in ``create_source``; cancelled in ``shutdown``.
    _task: asyncio.Task | None = None

    def __init__(self):
        """Compute the location of the "cleanup_revert" flag file."""
        self._cleanup_revert_flag = Path(Core.TMPDIR) / "cleanup_revert"

    async def create_source(self, loop, sink: TheSink) -> None:
        """Remember *sink* and schedule the recurring check on *loop*."""
        self._sink = sink
        self._task = loop.create_task(
            recurring_check(PERIOD)(self.process_cleanup_revert)()
        )

    async def shutdown(self):
        """Cancel the background task, if any, and wait for it to finish."""
        if self._task:
            self._task.cancel()
            # Cancellation is the expected outcome here, not an error.
            with suppress(asyncio.CancelledError):
                await self._task

    def _flag_is_fresh(self) -> bool:
        """Return True if the flag file exists and is younger than PERIOD.

        A single ``stat()`` call is used instead of ``exists()`` followed by
        ``stat()``: the file may be removed between the two calls, which
        would otherwise surface as an unhandled ``FileNotFoundError``.
        A missing file (or missing parent directory) means "not fresh".
        """
        try:
            flag_mtime = self._cleanup_revert_flag.stat().st_mtime
        except (FileNotFoundError, NotADirectoryError):
            return False
        return flag_mtime + PERIOD > time.time()

    async def process_cleanup_revert(self):
        """Run one revert pass unless it is throttled or the license is free.

        The pass is skipped when the flag file is younger than ``PERIOD``
        seconds or when ``LicenseCLN.is_free()`` is true.  Otherwise hits for
        the reported files and databases are restored and the flag file is
        touched to record the run.
        """
        if self._flag_is_fresh():
            logger.info(
                "CleanupRevertPlugin process_cleanup_revert skipped due"
                " _cleanup_revert_flag exists"
            )
            return

        if LicenseCLN.is_free():
            logger.info(
                "CleanupRevertPlugin process_cleanup_revert skipped due to"
                " free license"
            )
            return

        files, dbs = await CleanupRevertAPI.paths()
        logger.info(
            "CleanupRevertPlugin process_cleanup_revert files %s", files
        )
        logger.info("CleanupRevertPlugin process_cleanup_revert dbs %s", dbs)

        if files:
            file_hits = list(
                MalwareHit.get_hits(
                    files, statuses=MalwareHitStatus.RESTORABLE
                )
            )
            if file_hits:
                await restore_hits(file_hits, self._sink, self.initiator)

        if dbs:
            db_hits: list[MalwareHit] = list(
                MalwareHit.get_db_hits_for_remote_revert(dbs)
            )
            if db_hits:
                await restore_hits(db_hits, self._sink, self.initiator)

        # Record the completed run; touching an existing file refreshes its
        # modification time, which is what ``_flag_is_fresh`` compares.
        self._cleanup_revert_flag.touch(mode=0o644, exist_ok=True)