| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375 |
- # Fix incorrect adopt implementation with dry-run support
- def adopt(config, snapshot, dry_run=False, simulate=False):
- try:
- current_root_path = os.path.join(config['snapshot']['btrfs_mount_point'], config['snapshot']['root_subvolume'])
- snapshot_path = os.path.join(config['snapshot']['snapshot_dir'], snapshot)
- if not os.path.exists(snapshot_path) and not (dry_run or simulate):
- logger.error(f"Snapshot '{snapshot}' not found at {snapshot_path}")
- print(f"Error: Snapshot '{snapshot}' not found.")
- return False
- # Check if we're on a live system (not booted into the snapshot)
- if not (dry_run or simulate):
- with open("/proc/cmdline", "r") as f:
- cmdline = f.read()
- if snapshot in cmdline:
- logger.warning(f"You are already booted into snapshot {snapshot}.")
- print(f"Warning: You are already booted into snapshot {snapshot}. No changes needed.")
- return False
- # Create a backup of current root first before making changes
- backup_name = f"@_backup_{datetime.now().strftime('%Y%m%d-%H%M%S')}"
- backup_path = os.path.join(config['snapshot']['btrfs_mount_point'], backup_name)
-
- print(f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}Creating backup of current root subvolume as {backup_name}")
- logger.info(f"Creating backup of current root at {backup_path}")
-
- # Create a snapshot of the current root
- cmd_backup = ["btrfs", "subvolume", "snapshot", current_root_path, backup_path]
- returncode, stdout, stderr = safe_execute(
- cmd_backup,
- dry_run=dry_run,
- simulate=simulate,
- description="Creating backup of current root"
- )
-
- if returncode != 0 and not (dry_run or simulate):
- logger.error(f"Failed to create backup: {stderr}")
- print(f"Error: Failed to create backup: {stderr}")
- return False
- # Delete current root subvolume
- print(f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}Deleting current root subvolume")
- logger.info(f"Deleting current root subvolume at {current_root_path}")
- cmd_delete = ["btrfs", "subvolume", "delete", current_root_path]
- returncode, stdout, stderr = safe_execute(
- cmd_delete,
- dry_run=dry_run,
- simulate=simulate,
- description="Deleting current root subvolume"
- )
-
- if returncode != 0 and not (dry_run or simulate):
- logger.error(f"Failed to delete current root: {stderr}")
- print(f"Error: Failed to delete current root: {stderr}")
- return False
- # Create a new snapshot of the selected snapshot as the new root subvolume
- print(f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}Creating new root subvolume from snapshot '{snapshot}'")
- logger.info(f"Creating new root from {snapshot_path} to {current_root_path}")
- cmd_new_root = ["btrfs", "subvolume", "snapshot", snapshot_path, current_root_path]
- returncode, stdout, stderr = safe_execute(
- cmd_new_root,
- dry_run=dry_run,
- simulate=simulate,
- description=f"Creating new root subvolume from snapshot '{snapshot# Safe command execution with dry-run support
def safe_execute(cmd, dry_run=False, simulate=False, description=None):
    """
    Execute a command safely with dry-run and simulation support.

    Args:
        cmd: Command list to execute
        dry_run: If True, just print the command without executing
        simulate: If True, log as if executed but don't actually execute
        description: Optional description of what the command does

    Returns:
        A tuple of (return_code, stdout, stderr) or (0, '', '') if dry_run/simulate
    """
    cmd_str = ' '.join(cmd)
    preview_only = dry_run or simulate

    if description:
        logger.info(f"{description}: {cmd_str}")
        if preview_only:
            print(f"WOULD EXECUTE: {description}")
            print(f" $ {cmd_str}")
    elif preview_only:
        logger.info(f"Would execute: {cmd_str}")
        print(f"WOULD EXECUTE: {cmd_str}")
    else:
        # A real run must not be logged as hypothetical.
        logger.info(f"Executing: {cmd_str}")

    if preview_only:
        # Report success so callers continue through their preview flow.
        return 0, '', ''

    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # errors='replace' keeps undecodable tool output from raising and hiding
    # the command's actual return code from the caller.
    return (
        result.returncode,
        result.stdout.decode(errors='replace'),
        result.stderr.decode(errors='replace'),
    )
- # Safe file operations with dry-run support
def safe_write_file(file_path, content, dry_run=False, simulate=False, description=None):
    """
    Write to a file with dry-run and simulation support.

    Args:
        file_path: Path to the file
        content: Content to write
        dry_run: If True, just print what would be written
        simulate: If True, log as if written but don't actually write
        description: Optional description of the write operation
    """
    preview_only = dry_run or simulate

    if description:
        logger.info(f"{description}: {file_path}")
        if preview_only:
            print(f"WOULD WRITE: {description}")
            print(f" To file: {file_path}")
            print(f" Content length: {len(content)} bytes")
            if len(content) <= 500:
                print(f" Content preview:\n{'-'*40}\n{content[:500]}\n{'-'*40}")
            else:
                # Anything longer than the preview window is marked as truncated.
                print(f" Content preview (first 500 bytes):\n{'-'*40}\n{content[:500]}...\n{'-'*40}")
    elif preview_only:
        logger.info(f"Would write to: {file_path}")
        print(f"WOULD WRITE TO: {file_path}")
    else:
        # A real write must not be logged as hypothetical.
        logger.info(f"Writing to: {file_path}")

    if preview_only:
        return

    # A bare filename has an empty dirname, and os.makedirs('') raises.
    parent_dir = os.path.dirname(file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(file_path, 'w') as f:
        f.write(content)
-
- # Apply test directory to paths if specified
def adjust_path(config, path, test_dir=None):
    """
    Adjust a path with a test directory if specified.

    Args:
        config: Configuration dictionary (not consulted by this function)
        path: Original path
        test_dir: Test directory to prepend (if not None)

    Returns:
        Adjusted path: absolute paths are re-rooted under test_dir, relative
        paths and calls without a test_dir are returned unchanged.
    """
    if test_dir is None:
        return path

    if path.startswith('/'):
        # Strip every leading slash: os.path.join discards test_dir when the
        # second component is still absolute (e.g. '//etc/x'[1:] == '/etc/x'),
        # which would let the path escape the test directory.
        return os.path.join(test_dir, path.lstrip('/'))
    return path


#!/usr/bin/env python3
import argparse
import logging
import os
import re
import shutil
import subprocess
import sys
from datetime import datetime

import toml
- # Path to the configuration file
# Default locations of the tool's configuration file and its log file.
CONFIG_PATH = "/etc/snap-slack/config.toml"
LOG_PATH = "/var/log/snap-slack.log"

# Root logging goes to both the log file and the console stream; the
# FileHandler opens LOG_PATH as soon as this module is loaded.
logging.basicConfig(
    handlers=[logging.FileHandler(LOG_PATH), logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('snap-slack')
- # Read configuration from a TOML file with test dir support
def read_config(file_path=CONFIG_PATH, test_dir=None):
    """
    Load and validate the TOML configuration.

    Args:
        file_path: Path of the TOML configuration file
        test_dir: Optional directory under which configured paths are re-rooted

    Returns:
        The configuration dictionary; when test_dir is given, the snapshot,
        bootloader and boot paths are adjusted and '_test_dir' is recorded.

    Raises:
        FileNotFoundError: If file_path does not exist
        ValueError: If a required section or field is missing
    """
    if not os.path.exists(file_path):
        logger.error(f"Configuration file not found: {file_path}")
        raise FileNotFoundError(f"Configuration file not found: {file_path}")

    try:
        with open(file_path, 'r') as handle:
            config = toml.load(handle)

        # The snapshot section is always mandatory; the bootloader section
        # that must be present depends on the configured bootloader type.
        bootloader_type = config.get('bootloader', {}).get('type', 'elilo').lower()
        required_fields = {
            'snapshot': ['snapshot_dir', 'snapshot_prefix', 'btrfs_mount_point', 'root_subvolume', 'retention_days'],
        }
        if bootloader_type == 'elilo':
            required_fields['elilo'] = ['elilo_conf', 'root_partition']
        else:
            required_fields['grub'] = ['config_file', 'root_partition']

        for section, fields in required_fields.items():
            if section not in config:
                raise ValueError(f"Missing required section '{section}' in config file")
            missing = next((name for name in fields if name not in config[section]), None)
            if missing is not None:
                raise ValueError(f"Missing required field '{missing}' in section '{section}'")

        if not test_dir:
            return config

        logger.info(f"Using test directory: {test_dir}")
        print(f"Using test directory: {test_dir}")

        # Snapshot locations
        snapshot_cfg = config['snapshot']
        for key in ('snapshot_dir', 'btrfs_mount_point'):
            snapshot_cfg[key] = adjust_path(config, snapshot_cfg[key], test_dir)

        # Bootloader files
        if 'elilo' in config:
            config['elilo']['elilo_conf'] = adjust_path(config, config['elilo']['elilo_conf'], test_dir)

        if 'grub' in config:
            for key in ('config_file', 'custom_entries_file'):
                if key in config['grub']:
                    config['grub'][key] = adjust_path(config, config['grub'][key], test_dir)

        # Boot directory
        if 'boot' in config and 'boot_dir' in config['boot']:
            config['boot']['boot_dir'] = adjust_path(config, config['boot']['boot_dir'], test_dir)

        # Keep the test directory in the config for reference
        config['_test_dir'] = test_dir
        return config
    except Exception as e:
        logger.error(f"Error reading config: {str(e)}")
        raise
- # Create a new snapshot with the current date and time
def create_snapshot(config, dry_run=False, simulate=False, description=None):
    """
    Create a btrfs snapshot of the root subvolume and register a boot entry.

    Args:
        config: Configuration dictionary
        dry_run: If True, only print what would be done
        simulate: If True, log as if done but change nothing
        description: Optional free text appended (sanitised) to the snapshot name

    Returns:
        The snapshot name on success, None on any failure.
    """
    try:
        preview_only = dry_run or simulate
        snapshot_cfg = config['snapshot']
        date_str = datetime.now().strftime("%Y%m%d-%H%M%S")
        snapshot_name = f"{snapshot_cfg['snapshot_prefix']}{date_str}"

        if description:
            # Keep only filename-safe characters. A description made up solely
            # of other characters sanitises to nothing and must not leave a
            # dangling '-' on the snapshot name.
            clean_desc = re.sub(r'[^a-zA-Z0-9\-_]', '', description.replace(" ", "-")).strip('-')
            if clean_desc:
                snapshot_name = f"{snapshot_name}-{clean_desc}"

        snapshot_path = os.path.join(snapshot_cfg['snapshot_dir'], snapshot_name)

        # Ensure snapshot directory exists
        if not preview_only:
            os.makedirs(snapshot_cfg['snapshot_dir'], exist_ok=True)

        root_path = os.path.join(snapshot_cfg['btrfs_mount_point'], snapshot_cfg['root_subvolume'])
        if not preview_only and not os.path.exists(root_path):
            logger.error(f"Root subvolume path does not exist: {root_path}")
            return None

        is_encrypted = config.get('encryption', {}).get('enabled', False)

        # With hibernation enabled the snapshot is created read-write,
        # otherwise it is created read-only.
        hibernation_enabled = config.get('hibernation', {}).get('enabled', False)
        snapshot_readonly = not hibernation_enabled

        cmd = ["btrfs", "subvolume", "snapshot"]
        if snapshot_readonly:
            cmd.append("-r")  # read-only flag
        cmd.extend([root_path, snapshot_path])

        returncode, stdout, stderr = safe_execute(
            cmd,
            dry_run=dry_run,
            simulate=simulate,
            description=f"Creating {'read-only' if snapshot_readonly else 'read-write'} snapshot{' with description: ' + description if description else ''}"
        )

        if returncode != 0:
            logger.error(f"Failed to create snapshot: {stderr}")
            print(f"Failed to create snapshot: {stderr}")
            return None

        logger.info(f"Created snapshot: {snapshot_name}")
        print(f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}Created snapshot: {snapshot_name}")

        # For encrypted systems with hibernation, build a matching initramfs
        # before the boot entry is written so the entry can reference it.
        if is_encrypted and hibernation_enabled:
            initramfs_ok = update_initramfs_for_encryption(
                config, snapshot_name, snapshot_path, dry_run=dry_run, simulate=simulate)
            if not initramfs_ok:
                logger.warning(
                    f"Custom initramfs was not created for {snapshot_name}; "
                    "the boot entry will use the default initrd")

        add_boot_entry(config, snapshot_name, dry_run=dry_run, simulate=simulate)
        return snapshot_name
    except Exception as e:
        logger.error(f"Error creating snapshot: {str(e)}")
        print(f"Error creating snapshot: {str(e)}")
        return None
- # Update initramfs for encrypted volume with hibernation
def update_initramfs_for_encryption(config, snapshot_name, snapshot_path,
                                    dry_run=False, simulate=False):
    """
    Build a snapshot-specific initramfs for an encrypted root with hibernation.

    Args:
        config: Configuration dictionary; on success
            config['encryption']['custom_initramfs'] is set to the new file name
        snapshot_name: Name of the snapshot the initramfs is built for
        snapshot_path: Filesystem path of the snapshot (not used here)
        dry_run: If True, only print what would be written/executed
        simulate: If True, log as if done but change nothing

    Returns:
        True if the initramfs was created (or would be, in dry-run/simulate),
        False on any failure.
    """
    try:
        logger.info("Updating initramfs for encrypted system with hibernation support")
        print("Updating initramfs for encrypted system with hibernation support...")

        encryption_cfg = config.get('encryption', {})
        initramfs_tool = encryption_cfg.get('initramfs_tool', 'mkinitrd')
        initramfs_dir = encryption_cfg.get('initramfs_dir', '/boot')
        luks_name = encryption_cfg.get('luks_name', 'cryptroot')
        resume_device = config.get('hibernation', {}).get('resume_device', '')

        # New initramfs filename with snapshot identifier
        initramfs_file = f"initrd-{snapshot_name}.gz"
        initramfs_path = os.path.join(initramfs_dir, initramfs_file)

        kernel_version = get_current_kernel_version(config)
        if not kernel_version:
            raise RuntimeError("Failed to determine kernel version")

        luks_uuid = get_luks_uuid(config)
        if not luks_uuid:
            raise RuntimeError("Failed to determine LUKS UUID")

        rootflags = f"subvol={config['snapshot']['snapshot_dir']}/{snapshot_name}"

        if initramfs_tool == "mkinitrd":
            # Derive a per-snapshot mkinitrd config from the system one.
            mkinitrd_conf = f"/etc/mkinitrd.conf.{snapshot_name}"

            with open("/etc/mkinitrd.conf", "r") as f:
                mkinitrd_content = f.read()

            # Point ROOT_DEVICE at the mapped LUKS device; the previous value
            # ends up on a following comment line.
            mkinitrd_content = mkinitrd_content.replace(
                "ROOT_DEVICE=",
                f"ROOT_DEVICE=/dev/mapper/{luks_name}\n# Original: ")

            if "LUKSDEV=" not in mkinitrd_content:
                mkinitrd_content += f"\n# Added by snap-slack\nLUKSDEV=UUID={luks_uuid}\n"

            if resume_device and "RESUMEDEV=" not in mkinitrd_content:
                mkinitrd_content += f"RESUMEDEV={resume_device}\n"

            if "ROOTFLAGS=" not in mkinitrd_content:
                mkinitrd_content += f"ROOTFLAGS={rootflags}\n"
            else:
                # A callable replacement keeps backslashes in the path from
                # being interpreted as regex group references.
                mkinitrd_content = re.sub(
                    r'ROOTFLAGS=.*',
                    lambda _match: f"ROOTFLAGS={rootflags}",
                    mkinitrd_content)

            safe_write_file(
                mkinitrd_conf,
                mkinitrd_content,
                dry_run=dry_run,
                simulate=simulate,
                description=f"Writing mkinitrd configuration for snapshot {snapshot_name}")

            # NOTE(review): flags kept exactly as before. Confirm against the
            # installed mkinitrd that "-c" accepts a config path -- TODO confirm.
            cmd = [
                "mkinitrd",
                "-F",
                "-c", mkinitrd_conf,
                "-o", initramfs_path,
                "-k", kernel_version
            ]

        elif initramfs_tool == "dracut":
            dracut_modules = "crypt"
            if resume_device:
                dracut_modules += " resume"

            # Use the configured mapper name, as the mkinitrd branch does,
            # rather than a hard-coded "cryptroot".
            kernel_cmdline = f"rd.luks.uuid={luks_uuid} root=/dev/mapper/{luks_name} rootflags={rootflags}"
            if resume_device:
                kernel_cmdline += f" resume={resume_device}"

            # The output image is a positional argument; "-f" is only the
            # short form of "--force".
            cmd = [
                "dracut",
                "--force",
                "--kver", kernel_version,
                "--add", dracut_modules,
                "--persistent-policy", "by-uuid",
                "--kernel-cmdline", kernel_cmdline,
                initramfs_path,
            ]

        else:
            raise RuntimeError(f"Unsupported initramfs generation tool: {initramfs_tool}")

        print("Generating custom initramfs for encrypted system...")
        returncode, _stdout, stderr = safe_execute(
            cmd,
            dry_run=dry_run,
            simulate=simulate,
            description="Generating custom initramfs")

        if returncode != 0:
            logger.error(f"Failed to create custom initramfs: {stderr}")
            print(f"Failed to create custom initramfs: {stderr}")
            return False

        logger.info(f"Successfully created custom initramfs at {initramfs_path}")
        print(f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}"
              f"Successfully created custom initramfs at {initramfs_path}")

        # Record the file name so the boot entry can reference it.
        config.setdefault('encryption', {})['custom_initramfs'] = initramfs_file
        return True

    except Exception as e:
        logger.error(f"Error updating initramfs: {str(e)}")
        print(f"Error updating initramfs: {str(e)}")
        return False
- # Get LUKS UUID for encrypted device
def get_luks_uuid(config):
    """
    Return the UUID of the LUKS device.

    Lookup order: encryption.luks_uuid from the config, then the UUID that
    blkid reports for encryption.luks_device, or for the first crypto_LUKS
    device blkid lists when no device is configured.

    Returns:
        The UUID string, or None if it cannot be determined or an error occurs.
    """
    try:
        encryption_cfg = config.get('encryption', {})

        configured_uuid = encryption_cfg.get('luks_uuid', '')
        if configured_uuid:
            return configured_uuid

        device = encryption_cfg.get('luks_device', '')
        if not device:
            # No device configured: ask blkid for LUKS containers
            probe = subprocess.run(
                ["blkid", "-t", "TYPE=crypto_LUKS", "-o", "device"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if probe.returncode == 0:
                candidates = probe.stdout.decode().strip().split('\n')
                if candidates and candidates[0]:
                    device = candidates[0]

        if not device:
            return None

        # Read the UUID of the chosen device
        lookup = subprocess.run(
            ["blkid", "-s", "UUID", "-o", "value", device],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if lookup.returncode != 0:
            return None
        return lookup.stdout.decode().strip()
    except Exception as e:
        logger.error(f"Error getting LUKS UUID: {str(e)}")
        return None
- # Get current kernel version
def get_current_kernel_version(config):
    """
    Return the kernel version to build boot artefacts for.

    Lookup order: boot.kernel_version from the config, then the version
    parsed from /proc/version, then the running kernel's release string.

    Returns:
        The kernel version string, or None if it cannot be determined.
    """
    try:
        configured = config.get('boot', {}).get('kernel_version', '')
        if configured:
            return configured

        if os.path.exists("/proc/version"):
            with open("/proc/version", "r") as f:
                match = re.search(r'Linux version ([0-9.-]+\S+)', f.read())
            if match:
                return match.group(1)

        # os.uname().release is the value `uname -r` prints; reading it
        # directly avoids spawning a process for the fallback.
        return os.uname().release or None
    except Exception as e:
        logger.error(f"Error determining kernel version: {str(e)}")
        return None
- # Add a boot entry for a snapshot
def add_boot_entry(config, snapshot, dry_run=False, simulate=False):
    """
    Add a boot entry for a snapshot using the configured bootloader.

    bootloader.type 'grub' (case-insensitive) selects the GRUB handler; any
    other value, or no value, selects the elilo handler.

    Returns:
        The handler's result, or False if the handler raised.
    """
    try:
        bootloader_cfg = config.get('bootloader', {})
        uses_grub = bootloader_cfg.get('type', 'elilo').lower() == 'grub'
        handler = add_grub_entry if uses_grub else add_elilo_entry
        return handler(config, snapshot, dry_run=dry_run, simulate=simulate)
    except Exception as e:
        logger.error(f"Error adding boot entry: {str(e)}")
        print(f"Error adding boot entry: {str(e)}")
        return False
- # Add an entry for the snapshot in elilo.conf
def add_elilo_entry(config, snapshot, dry_run=False, simulate=False):
    """
    Append a boot stanza for a snapshot to elilo.conf.

    Args:
        config: Configuration dictionary
        snapshot: Snapshot name, used as the stanza label
        dry_run: If True, only print the stanza that would be appended
        simulate: If True, log as if done but change nothing

    Returns:
        True if the stanza was added (or would be, in dry-run/simulate),
        False if it already exists or an error occurred.
    """
    try:
        preview_only = dry_run or simulate
        extra_boot_options = config.get('elilo', {}).get('extra_boot_options', '')
        encryption_cfg = config.get('encryption', {})
        hibernation_cfg = config.get('hibernation', {})
        is_encrypted = encryption_cfg.get('enabled', False)
        hibernation_enabled = hibernation_cfg.get('enabled', False)

        # Encrypted systems with hibernation boot the snapshot-specific
        # initramfs when one has been recorded in the config.
        initrd_file = "initrd.gz"
        if is_encrypted and hibernation_enabled:
            initrd_file = encryption_cfg.get('custom_initramfs', '') or initrd_file

        subvol = f"{config['snapshot']['snapshot_dir']}/{snapshot}"

        if is_encrypted:
            luks_uuid = get_luks_uuid(config)
            if not luks_uuid:
                # Writing "rd.luks.uuid=None" would produce an unbootable
                # entry, so refuse instead.
                logger.error(f"Cannot add elilo entry for {snapshot}: LUKS UUID could not be determined")
                print(f"Error adding elilo entry: LUKS UUID could not be determined")
                return False
            luks_name = encryption_cfg.get('luks_name', 'cryptroot')

            append_options = f"rd.luks.uuid={luks_uuid} root=/dev/mapper/{luks_name} rootflags=subvol={subvol} ro"
            if hibernation_enabled:
                resume_device = hibernation_cfg.get('resume_device', '')
                if resume_device:
                    append_options += f" resume={resume_device}"
            if extra_boot_options:
                append_options += f" {extra_boot_options}"
        else:
            append_options = f"rootflags=subvol={subvol} ro {extra_boot_options}"

        entry = (
            "\n"
            "image=vmlinuz\n"
            f"label={snapshot}\n"
            f"root={config['elilo']['root_partition']}\n"
            f" append=\"{append_options}\"\n"
            " read-only\n"
            f" initrd={initrd_file}\n"
        )

        elilo_conf = config['elilo']['elilo_conf']
        description = f"Adding entry for snapshot {snapshot} to elilo.conf"

        if preview_only:
            # In preview modes the file is neither read nor created.
            logger.info(description)
            print(f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}{description}")
            print(f"WOULD APPEND TO {elilo_conf}:")
            print(f"{'-'*40}\n{entry}\n{'-'*40}")
            return True

        if os.path.exists(elilo_conf):
            # Compare whole label lines: a plain substring test would treat
            # "snap-1" as present when only "snap-1-extra" exists.
            wanted_label = f"label={snapshot}"
            with open(elilo_conf, 'r') as f:
                already_present = any("".join(line.split()) == wanted_label for line in f)
            if already_present:
                logger.info(f"Entry for snapshot {snapshot} already exists in elilo.conf")
                print(f"Entry for snapshot {snapshot} already exists in elilo.conf")
                return False
        else:
            parent_dir = os.path.dirname(elilo_conf)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            with open(elilo_conf, 'w') as f:
                f.write("# ELILO Configuration File\n")

        with open(elilo_conf, 'a') as f:
            f.write(entry)

        logger.info(description)
        print(f"Added entry for snapshot: {snapshot}")
        return True
    except Exception as e:
        logger.error(f"Error adding elilo entry: {str(e)}")
        print(f"Error adding elilo entry: {str(e)}")
        return False
- # Add an entry for the snapshot in grub.cfg or custom config
def add_grub_entry(config, snapshot, dry_run=False, simulate=False):
    """
    Add a GRUB menu entry for a snapshot.

    With grub.use_custom_file (the default) the entry is appended to the
    custom script under /etc/grub.d and the GRUB configuration is
    regenerated; otherwise the entry is inserted directly into grub.cfg
    after the last existing menuentry.

    Args:
        config: Configuration dictionary
        snapshot: Snapshot name
        dry_run: If True, only print what would be written/executed
        simulate: If True, log as if done but change nothing

    Returns:
        True if the entry was added (or would be), False if it already
        exists or grub.cfg is missing in direct mode.

    Raises:
        Exception: Any error is logged and re-raised.
    """
    try:
        preview_only = dry_run or simulate
        mode_tag = f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}"
        grub_cfg = config.get('grub', {})

        extra_boot_options = grub_cfg.get('extra_boot_options', '')
        grub_custom_file = grub_cfg.get('custom_entries_file', '/etc/grub.d/60_snap-slack')
        kernel_path = grub_cfg.get('kernel_path', '/boot/vmlinuz')
        initrd_path = grub_cfg.get('initrd_path', '/boot/initrd.gz')
        root_partition = grub_cfg.get('root_partition', '/dev/sdaX')
        config_file = grub_cfg.get('config_file', '/boot/grub/grub.cfg')

        entry_marker = f'menuentry "Slackware - {snapshot}"'
        grub_entry = (
            f"\n# Entry for snapshot {snapshot}\n"
            f'menuentry "Slackware - {snapshot}" {{\n'
            f"linux {kernel_path} root={root_partition} rootflags=subvol={config['snapshot']['snapshot_dir']}/{snapshot} ro {extra_boot_options}\n"
            f"initrd {initrd_path}\n"
            "}\n"
        )

        if grub_cfg.get('use_custom_file', True):
            existing = ''
            if os.path.exists(grub_custom_file):
                with open(grub_custom_file, 'r') as f:
                    existing = f.read()

            # Match the exact entry title rather than a hard-coded
            # "snapshot-" prefix, so any configured snapshot_prefix works.
            if entry_marker in existing:
                logger.info(f"GRUB entry for snapshot {snapshot} already exists")
                return False

            addition = grub_entry
            if not existing:
                # "exec tail -n +3 $0" makes the script print the rest of the
                # file verbatim, so entries follow as plain text. A
                # "cat << EOF" wrapper would itself be copied into grub.cfg.
                addition = (
                    "#!/bin/sh\n"
                    "exec tail -n +3 $0\n\n"
                    "# This file was generated by snap-slack\n\n"
                ) + grub_entry

            if preview_only:
                print(f"{mode_tag}Adding GRUB entry for snapshot {snapshot}")
                print(f"WOULD APPEND TO {grub_custom_file}:")
                print(f"{'-'*40}\n{addition}\n{'-'*40}")
            else:
                parent_dir = os.path.dirname(grub_custom_file)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)
                with open(grub_custom_file, 'a') as f:
                    f.write(addition)
                # grub-mkconfig only runs executable scripts in /etc/grub.d
                os.chmod(grub_custom_file, 0o755)

            if os.path.exists("/usr/sbin/update-grub"):
                cmd = ["update-grub"]
            else:
                cmd = ["grub-mkconfig", "-o", config_file]

            try:
                returncode, _stdout, stderr = safe_execute(
                    cmd,
                    dry_run=dry_run,
                    simulate=simulate,
                    description="Updating GRUB configuration")
            except OSError as update_error:
                # A missing update tool is reported like a failed run: the
                # entry itself has already been written.
                returncode, stderr = 1, str(update_error)

            if returncode != 0:
                logger.warning(f"GRUB update command failed: {stderr}")
                logger.warning("You may need to manually update your GRUB configuration.")
                print("Warning: GRUB configuration update failed. You may need to run 'update-grub' or 'grub-mkconfig' manually.")

            logger.info(f"Added GRUB entry for snapshot: {snapshot}")
            return True

        # Directly modify grub.cfg (not recommended but supported)
        if not os.path.exists(config_file):
            logger.warning(f"GRUB config file not found at {config_file}")
            return False

        with open(config_file, 'r') as f:
            content = f.read()

        if f"Slackware - {snapshot}" in content:
            logger.info(f"GRUB entry for snapshot {snapshot} already exists")
            return False

        # Insert after the closing brace of the last menuentry. Fall back to
        # the end of the file when there is no menuentry or its braces never
        # balance, so success is only reported after a real write.
        insert_at = len(content)
        last_entry_pos = content.rfind("menuentry ")
        if last_entry_pos != -1:
            depth = 0
            for offset, char in enumerate(content[last_entry_pos:], last_entry_pos):
                if char == '{':
                    depth += 1
                elif char == '}':
                    depth -= 1
                    if depth == 0:
                        insert_at = offset + 1
                        break

        if preview_only:
            print(f"{mode_tag}Adding GRUB entry for snapshot {snapshot}")
            print(f"WOULD INSERT INTO {config_file}:")
            print(f"{'-'*40}\n{grub_entry}\n{'-'*40}")
        else:
            with open(config_file, 'w') as f:
                f.write(content[:insert_at] + grub_entry + content[insert_at:])

        logger.info(f"Added GRUB entry for snapshot: {snapshot}")
        return True
    except Exception as e:
        logger.error(f"Error adding GRUB entry: {str(e)}")
        raise
- # Remove a boot entry for a deleted snapshot
def remove_boot_entry(config, snapshot):
    """
    Remove the boot entry of a deleted snapshot using the configured bootloader.

    bootloader.type 'grub' (case-insensitive) selects the GRUB handler; any
    other value, or no value, selects the elilo handler.

    Returns:
        Whatever the selected handler returns.

    Raises:
        Exception: Any handler error is logged and re-raised.
    """
    try:
        bootloader_cfg = config.get('bootloader', {})
        uses_grub = bootloader_cfg.get('type', 'elilo').lower() == 'grub'
        handler = remove_grub_entry if uses_grub else remove_elilo_entry
        return handler(config, snapshot)
    except Exception as e:
        logger.error(f"Error removing boot entry: {str(e)}")
        raise
- # Remove an entry for a deleted snapshot from elilo.conf
def remove_elilo_entry(config, snapshot):
    """
    Remove the boot stanza of a deleted snapshot from elilo.conf.

    A stanza starts at an "image=" line and runs until the next "image="
    line or blank line. The stanza whose label equals the snapshot name is
    dropped as a whole, including its "image=" line.

    Args:
        config: Configuration dictionary
        snapshot: Snapshot name whose stanza should be removed

    Raises:
        Exception: Any error is logged and re-raised.
    """
    try:
        elilo_conf = config['elilo']['elilo_conf']
        if not os.path.exists(elilo_conf):
            logger.warning(f"elilo.conf not found at {elilo_conf}")
            return

        with open(elilo_conf, 'r') as f:
            lines = f.readlines()

        # Compare whole label lines so "snap-1" does not also match the
        # stanza labelled "snap-1-extra".
        wanted_label = f"label={snapshot}"

        kept = []
        stanza = None          # lines of the stanza being read, None outside one
        drop_stanza = False
        removed = False

        for line in lines:
            compact = "".join(line.split())
            starts_stanza = compact.startswith("image=")

            if stanza is not None and (starts_stanza or not compact):
                # The current stanza ends here: keep or discard it as a unit.
                if drop_stanza:
                    removed = True
                else:
                    kept.extend(stanza)
                stanza, drop_stanza = None, False

            if starts_stanza:
                stanza = [line]
            elif stanza is not None:
                stanza.append(line)
                if compact == wanted_label:
                    drop_stanza = True
            else:
                kept.append(line)

        if stanza is not None:
            if drop_stanza:
                removed = True
            else:
                kept.extend(stanza)

        if not removed:
            logger.warning(f"No entry for snapshot {snapshot} found in {elilo_conf}")
            return

        # Write only after the new content is fully computed, so an error
        # during processing cannot leave a truncated elilo.conf.
        with open(elilo_conf, 'w') as f:
            f.writelines(kept)
        logger.info(f"Removed entry for snapshot: {snapshot}")
    except Exception as e:
        logger.error(f"Error removing elilo entry: {str(e)}")
        raise
- # Remove an entry for a deleted snapshot from grub configuration
def remove_grub_entry(config, snapshot):
    """
    Remove the GRUB menu entry of a deleted snapshot.

    With grub.use_custom_file (the default) the entry is removed from the
    custom script and the GRUB configuration is regenerated; otherwise it is
    removed directly from grub.cfg. The "# Entry for snapshot ..." comment
    written together with the entry is removed as well.

    Args:
        config: Configuration dictionary
        snapshot: Snapshot name whose entry should be removed

    Raises:
        Exception: Any error is logged and re-raised.
    """
    try:
        grub_cfg = config.get('grub', {})
        entry_marker = f'menuentry "Slackware - {snapshot}"'
        comment_marker = f"# Entry for snapshot {snapshot}"

        if grub_cfg.get('use_custom_file', True):
            grub_custom_file = grub_cfg.get('custom_entries_file', '/etc/grub.d/60_snap-slack')

            if not os.path.exists(grub_custom_file):
                logger.warning(f"GRUB custom file not found at {grub_custom_file}")
                return

            with open(grub_custom_file, 'r') as f:
                lines = f.readlines()

            kept = []
            skipping = False
            seen_open_brace = False
            depth = 0
            removed = False

            for line in lines:
                if not skipping and line.strip() == comment_marker:
                    continue

                if not skipping and entry_marker in line:
                    skipping, seen_open_brace, depth = True, False, 0
                    removed = True

                if skipping:
                    depth += line.count('{') - line.count('}')
                    seen_open_brace = seen_open_brace or '{' in line
                    # Stop only once the block has opened and closed again, so
                    # a brace on the following line is still part of the entry.
                    if seen_open_brace and depth <= 0:
                        skipping = False
                    continue

                kept.append(line)

            if not removed:
                logger.warning(f"No GRUB entry for snapshot {snapshot} found in {grub_custom_file}")
                return

            with open(grub_custom_file, 'w') as f:
                f.writelines(kept)

            logger.info("Updating GRUB configuration")
            if os.path.exists("/usr/sbin/update-grub"):
                cmd = ["update-grub"]
            else:
                cmd = ["grub-mkconfig", "-o", grub_cfg.get('config_file', '/boot/grub/grub.cfg')]

            try:
                result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                update_failed = result.returncode != 0
                update_error = result.stderr.decode(errors='replace')
            except OSError as run_error:
                # A missing update tool is reported like a failed run: the
                # entry has already been removed from the custom file.
                update_failed, update_error = True, str(run_error)

            if update_failed:
                logger.warning(f"GRUB update command failed: {update_error}")
                logger.warning("You may need to manually update your GRUB configuration.")
                print("Warning: GRUB configuration update failed. You may need to run 'update-grub' or 'grub-mkconfig' manually.")
        else:
            # Direct modification of grub.cfg (not recommended)
            config_file = grub_cfg.get('config_file', '/boot/grub/grub.cfg')

            if not os.path.exists(config_file):
                logger.warning(f"GRUB config file not found at {config_file}")
                return

            with open(config_file, 'r') as f:
                content = f.read()

            # The snapshot name is escaped so characters such as '.' or '+'
            # are matched literally; the pattern pieces are raw strings.
            escaped = re.escape(snapshot)
            pattern = re.compile(
                r'(?:^[ \t]*# Entry for snapshot ' + escaped + r'[ \t]*\n)?'
                r'[ \t]*menuentry\s+"Slackware - ' + escaped + r'"\s+\{.*?\}[ \t]*\n?',
                re.DOTALL | re.MULTILINE)
            new_content, removed_count = pattern.subn('', content)

            if removed_count == 0:
                logger.warning(f"No GRUB entry for snapshot {snapshot} found in {config_file}")
                return

            with open(config_file, 'w') as f:
                f.write(new_content)

        logger.info(f"Removed GRUB entry for snapshot: {snapshot}")
    except Exception as e:
        logger.error(f"Error removing GRUB entry: {str(e)}")
        raise
def list_snapshots(config):
    """Return the sorted names of all snapshots in the snapshot directory.

    A snapshot is any directory directly inside
    ``config['snapshot']['snapshot_dir']`` whose name starts with
    ``config['snapshot']['snapshot_prefix']``.

    Args:
        config: Configuration mapping with a ``snapshot`` section.

    Returns:
        A sorted list of snapshot names; empty when the snapshot directory
        does not exist (a warning is logged in that case).

    Raises:
        Exception: Any error while listing is logged and re-raised.
    """
    try:
        base_dir = config['snapshot']['snapshot_dir']

        if not os.path.exists(base_dir):
            logger.warning(f"Snapshot directory does not exist: {base_dir}")
            return []

        matching = [
            name
            for name in os.listdir(base_dir)
            if os.path.isdir(os.path.join(base_dir, name))
            and name.startswith(config['snapshot']['snapshot_prefix'])
        ]
        matching.sort()
        return matching
    except Exception as e:
        logger.error(f"Error listing snapshots: {str(e)}")
        raise
def display_snapshots(config):
    """Print a table of all snapshots with their timestamp and age in days.

    Args:
        config: Configuration mapping; ``config['snapshot']['snapshot_dir']``
            is the directory holding the snapshots.

    Returns:
        None. Errors are logged and printed rather than raised.
    """
    rule = "-" * 80
    row = "{:<30} {:<20} {:<10}"
    try:
        names = list_snapshots(config)
        if not names:
            print("No snapshots found.")
            return

        print("\nAvailable snapshots:")
        print(rule)
        print(row.format("Snapshot Name", "Created Date", "Age (days)"))
        print(rule)

        reference = datetime.now()
        for name in names:
            path = os.path.join(config['snapshot']['snapshot_dir'], name)
            # NOTE: os.path.getctime() is the inode change time on Unix, which
            # is used here as the snapshot's creation time.
            stamp = datetime.fromtimestamp(os.path.getctime(path))
            age_in_days = (reference - stamp).days
            print(row.format(name, stamp.strftime("%Y-%m-%d %H:%M:%S"), age_in_days))

        print(rule)
    except Exception as e:
        logger.error(f"Error displaying snapshots: {str(e)}")
        print(f"Error: {str(e)}")
def remove_old_snapshots(config):
    """Delete snapshots older than the configured retention period.

    Each snapshot whose age in whole days exceeds
    ``config['snapshot']['retention_days']`` is deleted with
    ``btrfs subvolume delete``; after a successful delete its boot entry is
    removed as well.

    Args:
        config: Configuration mapping with a ``snapshot`` section.

    Returns:
        The number of snapshots that were deleted.

    Raises:
        Exception: Any unexpected error is logged and re-raised. A failing
            ``btrfs`` command is only logged and the loop continues.
    """
    try:
        max_age_days = config['snapshot']['retention_days']
        reference = datetime.now()
        deleted = 0

        for name in list_snapshots(config):
            path = os.path.join(config['snapshot']['snapshot_dir'], name)
            age = reference - datetime.fromtimestamp(os.path.getctime(path))
            if not age.days > max_age_days:
                continue

            delete_cmd = ["btrfs", "subvolume", "delete", path]
            logger.info(f"Running command: {' '.join(delete_cmd)}")
            outcome = subprocess.run(delete_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

            if outcome.returncode != 0:
                logger.error(f"Failed to remove snapshot {name}: {outcome.stderr.decode()}")
                continue

            remove_boot_entry(config, name)
            logger.info(f"Removed snapshot: {name}")
            deleted += 1

        return deleted
    except Exception as e:
        logger.error(f"Error removing old snapshots: {str(e)}")
        raise
def adopt(config, snapshot, dry_run=False, simulate=False):
    """Replace the current root subvolume with a copy of *snapshot*.

    Steps: snapshot the current root as a timestamped ``@_backup_*``
    subvolume, delete the current root subvolume, then snapshot *snapshot*
    into the root subvolume's path. If the last step fails, a restore from
    the backup is attempted.

    Args:
        config: Configuration mapping; the ``snapshot`` section supplies
            ``btrfs_mount_point``, ``root_subvolume`` and ``snapshot_dir``.
        snapshot: Name of the snapshot to adopt.
        dry_run: When true, only report the commands that would run.
        simulate: Treated like ``dry_run``; nothing is modified.

    Returns:
        None. Also returns early, unchanged, when the kernel command line
        already references *snapshot*.

    Exits:
        Calls ``sys.exit(1)`` after logging and printing any error.
    """
    # Becomes true once the destructive part starts, so the "inconsistent
    # state" warning is shown only when the root may really have changed.
    root_touched = False
    try:
        current_root_path = os.path.join(config['snapshot']['btrfs_mount_point'], config['snapshot']['root_subvolume'])
        snapshot_path = os.path.join(config['snapshot']['snapshot_dir'], snapshot)
        if not os.path.exists(snapshot_path):
            logger.error(f"Snapshot '{snapshot}' not found at {snapshot_path}")
            raise Exception(f"Snapshot '{snapshot}' not found.")

        # Check if we're on a live system (not booted into the snapshot)
        with open("/proc/cmdline", "r") as f:
            cmdline = f.read()
        if snapshot in cmdline:
            logger.warning(f"You are already booted into snapshot {snapshot}.")
            print(f"Warning: You are already booted into snapshot {snapshot}. No changes needed.")
            return

        backup_name = f"@_backup_{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        backup_path = os.path.join(config['snapshot']['btrfs_mount_point'], backup_name)

        cmd_backup = ["btrfs", "subvolume", "snapshot", current_root_path, backup_path]
        cmd_delete = ["btrfs", "subvolume", "delete", current_root_path]
        cmd_new_root = ["btrfs", "subvolume", "snapshot", snapshot_path, current_root_path]

        if dry_run or simulate:
            tag = f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}"
            for cmd in (cmd_backup, cmd_delete, cmd_new_root):
                logger.info(f"{tag}Would run: {' '.join(cmd)}")
                print(f"{tag}Would run: {' '.join(cmd)}")
            print(f"{tag}No changes were made.")
            return

        # Create a backup of current root first before making changes
        print(f"Creating backup of current root subvolume as {backup_name}")
        logger.info(f"Creating backup of current root at {backup_path}")
        logger.info(f"Running command: {' '.join(cmd_backup)}")
        result = subprocess.run(cmd_backup, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode != 0:
            logger.error(f"Failed to create backup: {result.stderr.decode()}")
            raise Exception(f"Failed to create backup: {result.stderr.decode()}")

        # Delete current root subvolume
        print("Deleting current root subvolume")
        logger.info(f"Deleting current root subvolume at {current_root_path}")
        logger.info(f"Running command: {' '.join(cmd_delete)}")
        root_touched = True
        result = subprocess.run(cmd_delete, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode != 0:
            logger.error(f"Failed to delete current root: {result.stderr.decode()}")
            raise Exception(f"Failed to delete current root: {result.stderr.decode()}")

        # Create a new snapshot of the selected snapshot as the new root subvolume
        print(f"Creating new root subvolume from snapshot '{snapshot}'")
        logger.info(f"Creating new root from {snapshot_path} to {current_root_path}")
        logger.info(f"Running command: {' '.join(cmd_new_root)}")
        result = subprocess.run(cmd_new_root, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode != 0:
            logger.error(f"Failed to create new root: {result.stderr.decode()}")
            # Try to recover from backup
            print("Error occurred. Attempting to recover from backup...")
            try:
                if os.path.exists(backup_path):
                    recovery_cmd = ["btrfs", "subvolume", "snapshot", backup_path, current_root_path]
                    subprocess.run(recovery_cmd, check=True)
                    print("Recovery successful.")
                    logger.info("Recovered from backup after failed adopt operation")
            except Exception as recovery_error:
                logger.critical(f"Failed to recover from backup: {str(recovery_error)}")
            raise Exception(f"Failed to create new root: {result.stderr.decode()}")

        print("\n" + "=" * 80)
        print(f"SUCCESS: Snapshot '{snapshot}' has been adopted as the new root subvolume.")
        print("=" * 80)
        print("\nTo use this snapshot, you need to:")
        print("1. Reboot your system")
        print("2. In the bootloader menu, select your regular Slackware entry (not the snapshot entry)")
        print("3. The system will now boot with the adopted snapshot as the root filesystem")
        print("\nIf you encounter issues, you can:")
        print(f"- Boot into the backup snapshot '{backup_name}' (emergency only)")
        print("- Or boot into any other available snapshot from the bootloader menu")
        print("=" * 80)

        logger.info(f"Successfully adopted snapshot '{snapshot}' as new root subvolume")

        # Note: We don't need to update elilo.conf for the adopted snapshot since it's now the root
    except Exception as e:
        logger.error(f"Error adopting snapshot: {str(e)}")
        print(f"Error: {str(e)}")
        if root_touched:
            print("\nYour system may be in an inconsistent state. Please verify the root subvolume.")
        sys.exit(1)
def create_boot_image(config, snapshot, dry_run=False, simulate=False):
    """Configure a boot entry so the system can boot into *snapshot*.

    Checks that the snapshot and the boot directory exist, then makes sure
    the snapshot has an ELILO entry via ``add_elilo_entry`` and prints
    instructions for the user.

    Args:
        config: Configuration mapping; ``config['snapshot']['snapshot_dir']``
            is required, ``config['boot']['boot_dir']`` defaults to ``/boot``.
        snapshot: Name of the snapshot to make bootable.
        dry_run: When true, only report what would be done.
        simulate: Treated like ``dry_run``; the boot entry is not written.

    Returns:
        None. Errors are logged and printed rather than raised.
    """
    try:
        snapshot_path = os.path.join(config['snapshot']['snapshot_dir'], snapshot)
        if not os.path.exists(snapshot_path):
            logger.error(f"Snapshot '{snapshot}' not found.")
            raise Exception(f"Snapshot '{snapshot}' not found.")

        bootdir = config.get('boot', {}).get('boot_dir', '/boot')
        if not os.path.exists(bootdir):
            logger.error(f"Boot directory not found: {bootdir}")
            raise Exception(f"Boot directory not found: {bootdir}")

        # Create a custom initrd that boots directly into the snapshot
        # This is a simplified example - actual implementation would depend on
        # how Slackware builds initrd images

        print(f"Creating bootable image for snapshot '{snapshot}'...")
        logger.info(f"Creating bootable image for snapshot '{snapshot}'")

        if dry_run or simulate:
            tag = f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}"
            logger.info(f"{tag}Would add ELILO boot entry for snapshot '{snapshot}'")
            print(f"{tag}Would add ELILO boot entry for snapshot '{snapshot}'")
            return

        # Ensure the snapshot entry exists in elilo.conf
        add_elilo_entry(config, snapshot)

        print("\n" + "=" * 80)
        print(f"SUCCESS: Boot entry for snapshot '{snapshot}' is configured.")
        print("=" * 80)
        print("\nTo boot into this snapshot:")
        print("1. Reboot your system")
        print("2. In the ELILO boot menu, select the entry labeled:")
        print(f" {snapshot}")
        print("\nNOTE: This will boot into the snapshot in read-only mode.")
        print(" To make it permanent, use 'snap-slack adopt --snapshot' after testing.")
        print("=" * 80)

    except Exception as e:
        logger.error(f"Error creating boot image: {str(e)}")
        print(f"Error: {str(e)}")
def manage(config, dry_run=False, simulate=False):
    """Run routine snapshot maintenance.

    Creates the snapshot directory if needed, ensures every existing
    snapshot has an ELILO boot entry, and removes snapshots older than the
    retention period.

    Args:
        config: Configuration mapping with a ``snapshot`` section.
        dry_run: When true, only report what would be done.
        simulate: Treated like ``dry_run``; nothing is created or removed.

    Returns:
        None. Errors are logged and printed rather than raised.
    """
    try:
        if dry_run or simulate:
            tag = f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}"
            snapshots = list_snapshots(config)
            retention_days = config['snapshot']['retention_days']
            logger.info(f"{tag}Snapshot management requested; no changes made")
            print(f"{tag}Would ensure boot entries exist for {len(snapshots)} snapshot(s).")
            print(f"{tag}Would remove snapshots older than {retention_days} days.")
            return

        # Step 1: Create snapshot directory if it doesn't exist
        os.makedirs(config['snapshot']['snapshot_dir'], exist_ok=True)

        # Step 2: Get list of current snapshots and add them to elilo.conf if needed
        print("Checking existing snapshots...")
        snapshots = list_snapshots(config)
        for snapshot in snapshots:
            add_elilo_entry(config, snapshot)

        # Step 3: Remove snapshots older than retention period
        print("Checking for old snapshots to remove...")
        removed = remove_old_snapshots(config)
        if removed > 0:
            print(f"Removed {removed} old snapshots.")
        else:
            print("No old snapshots to remove.")

        print("\nSnapshot management completed successfully.")
    except Exception as e:
        logger.error(f"Error managing snapshots: {str(e)}")
        print(f"Error: {str(e)}")
def verify_system(config):
    """Check that the system meets snap-slack's requirements.

    Checks, in order: the ``btrfs`` tool is on PATH, the root filesystem is
    BTRFS, the directory of the ELILO config exists (when the config itself
    does not), the snapshot directory exists or can be created, and the root
    subvolume path exists. The snapshot directory is created as a side
    effect when it is missing.

    Args:
        config: Configuration mapping with ``elilo`` and ``snapshot`` sections.

    Returns:
        True when no issue was found; False when issues were found or the
        verification itself failed (issues and errors are printed).
    """
    capture = {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE}
    try:
        problems = []

        # Check if btrfs is installed
        if subprocess.run(["which", "btrfs"], **capture).returncode != 0:
            problems.append("BTRFS tools not found. Please install btrfs-progs package.")

        # Check if root filesystem is btrfs
        fstype = subprocess.run(["findmnt", "-no", "FSTYPE", "/"], **capture)
        if "btrfs" not in fstype.stdout.decode().strip():
            problems.append("Root filesystem is not BTRFS. snap-slack requires a BTRFS root filesystem.")

        # Check if elilo.conf exists or can be created
        elilo_conf = config['elilo']['elilo_conf']
        if not os.path.exists(elilo_conf) and not os.path.exists(os.path.dirname(elilo_conf)):
            problems.append(f"ELILO config directory not found: {os.path.dirname(elilo_conf)}")

        # Check if snapshot directory exists or can be created
        snapshot_dir = config['snapshot']['snapshot_dir']
        if not os.path.exists(snapshot_dir):
            try:
                os.makedirs(snapshot_dir, exist_ok=True)
            except Exception:
                problems.append(f"Cannot create snapshot directory: {snapshot_dir}")

        # Check if root subvolume exists
        root_path = os.path.join(config['snapshot']['btrfs_mount_point'], config['snapshot']['root_subvolume'])
        if not os.path.exists(root_path):
            problems.append(f"Root subvolume not found: {root_path}")

        if not problems:
            print("System verification passed. Your system is properly configured for snap-slack.")
            return True

        print("\nSystem verification found issues:")
        for number, problem in enumerate(problems, 1):
            print(f"{number}. {problem}")
        return False

    except Exception as e:
        logger.error(f"Error verifying system: {str(e)}")
        print(f"Error verifying system: {str(e)}")
        return False
def parse_arguments():
    """Parse the snap-slack command line.

    Global options (``--dry-run``, ``--test-dir``, ``--config``,
    ``--verbose``, ``--simulate``) go before the sub-command. The chosen
    sub-command is stored in ``action``.

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``.
    """
    parser = argparse.ArgumentParser(
        description='Manage BTRFS snapshots and bootloader entries.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  snap-slack create                             Create a new snapshot
  snap-slack create --description "pre-update"  Create snapshot with description
  snap-slack list                               List all available snapshots
  snap-slack manage                             Manage existing snapshots
  snap-slack boot --snapshot X                  Set up specific snapshot for booting
  snap-slack adopt --snapshot X                 Adopt a snapshot as the new root filesystem
  snap-slack verify                             Verify system configuration
  snap-slack install-hooks                      Install the slackpkg snapshot hooks
"""
    )

    # Global options
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without actually doing it')
    parser.add_argument('--test-dir', help='Use a test directory instead of the real root')
    parser.add_argument('--config', help=f'Path to config file (default: {CONFIG_PATH})')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
    parser.add_argument('--simulate', action='store_true', help='Simulate operations (for safe testing)')

    subparsers = parser.add_subparsers(dest='action', help='Action to perform')

    # Create command
    create_parser = subparsers.add_parser('create', help='Create a new snapshot')
    create_parser.add_argument('--description', help='Optional description for the snapshot')

    # List command
    subparsers.add_parser('list', help='List all snapshots')

    # Manage command
    subparsers.add_parser('manage', help='Manage snapshots (clean up old ones, update bootloader config)')

    # Boot command
    boot_parser = subparsers.add_parser('boot', help='Configure a snapshot for booting')
    boot_parser.add_argument('--snapshot', required=True, help='The snapshot to configure for booting')

    # Adopt command
    adopt_parser = subparsers.add_parser('adopt', help='Adopt a snapshot as the new root subvolume')
    adopt_parser.add_argument('--snapshot', required=True, help='The snapshot to adopt as the new root subvolume')

    # Verify command
    subparsers.add_parser('verify', help='Verify system configuration')

    # Install-hooks command: main() dispatches on this action, so it has to
    # be registered here to be reachable from the command line.
    subparsers.add_parser('install-hooks', help='Install slackpkg pre/post-install hooks')

    return parser.parse_args()
def main():
    """Command-line entry point: parse arguments, load config, dispatch.

    Exit status: 0 on success; 1 when no or an invalid action is given, when
    ``verify`` or ``install-hooks`` reports failure, or on an unhandled
    exception; 130 when interrupted with Ctrl-C.
    """
    try:
        # Parse command-line arguments
        args = parse_arguments()

        if not args.action:
            print("Error: No action specified. Use --help for usage information.")
            sys.exit(1)

        # Set logging level based on verbosity
        if args.verbose:
            logger.setLevel(logging.DEBUG)

        # Load configuration
        config_path = args.config if args.config else CONFIG_PATH
        config = read_config(config_path, args.test_dir)

        # Execute based on action
        if args.action == 'create':
            snapshot = create_snapshot(
                config,
                dry_run=args.dry_run,
                simulate=args.simulate,
                description=getattr(args, 'description', None)
            )
            if snapshot:
                print(f"{'[SIMULATE] ' if args.simulate else ''}{'[DRY-RUN] ' if args.dry_run else ''}Created snapshot: {snapshot}")
                print(f"To boot from this snapshot, use: snap-slack boot --snapshot {snapshot}")
        elif args.action == 'list':
            display_snapshots(config)
        elif args.action == 'manage':
            manage(config, dry_run=args.dry_run, simulate=args.simulate)
        elif args.action == 'boot':
            create_boot_image(config, args.snapshot, dry_run=args.dry_run, simulate=args.simulate)
        elif args.action == 'adopt':
            adopt(config, args.snapshot, dry_run=args.dry_run, simulate=args.simulate)
        elif args.action == 'verify':
            # verify_system() reports failure through its return value; turn
            # that into a non-zero exit status so scripts can rely on it.
            if not verify_system(config):
                sys.exit(1)
        elif args.action == 'install-hooks':
            if not install_slackpkg_hooks(config, dry_run=args.dry_run, simulate=args.simulate):
                sys.exit(1)
        else:
            print(f"Invalid action: {args.action}")
            sys.exit(1)

    except KeyboardInterrupt:
        print("\nOperation cancelled by user.")
        sys.exit(130)
    except Exception as e:
        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so the explicit exits above are not swallowed here.
        logger.error(f"Unhandled exception: {str(e)}")
        print(f"Error: {str(e)}")
        sys.exit(1)
def install_slackpkg_hooks(config, dry_run=False, simulate=False):
    """Install the slackpkg pre-/post-install hooks for snap-slack.

    If an ``install-hooks.sh`` script sits next to this file it is run with
    bash through ``safe_execute``. Otherwise the two hook scripts are
    written to ``/etc/slackpkg/hooks`` through ``safe_write_file`` and made
    executable.

    Args:
        config: Configuration mapping (not read by this function).
        dry_run: Passed on to the helpers; also skips directory creation
            and chmod.
        simulate: Same handling as ``dry_run``.

    Returns:
        True on success, False when the script, the directory creation or
        the chmod step failed.
    """
    skip_changes = dry_run or simulate
    bundled_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "install-hooks.sh")

    if os.path.exists(bundled_script):
        # Execute the hook installation script
        returncode, stdout, stderr = safe_execute(
            ["bash", bundled_script],
            dry_run=dry_run,
            simulate=simulate,
            description="Installing SlackPkg hooks"
        )

        if returncode != 0 and not skip_changes:
            print(f"Error installing hooks: {stderr}")
            return False

        return True

    # No bundled script: generate the hooks in-place
    hooks_dir = "/etc/slackpkg/hooks"
    pre_hook = os.path.join(hooks_dir, "pre-install.sh")
    post_hook = os.path.join(hooks_dir, "post-install.sh")

    # Create hooks directory
    if not (skip_changes or os.path.exists(hooks_dir)):
        try:
            os.makedirs(hooks_dir, exist_ok=True)
        except Exception as e:
            print(f"Error creating hooks directory: {str(e)}")
            return False

    # Pre-install hook content
    pre_hook_content = """#!/bin/bash
# SlackPkg Pre-install Hook for snap-slack
# This file was automatically installed by snap-slack
# Configuration
SNAP_SLACK=/usr/bin/snap-slack
ENABLE_AUTO_SNAPSHOTS=1 # Set to 0 to disable automatic snapshots
MAX_PACKAGE_COUNT=100 # Maximum number of packages to include in snapshot name
# Check if snap-slack is installed
if [ ! -x "$SNAP_SLACK" ]; then
echo "WARNING: snap-slack not found at $SNAP_SLACK, skipping pre-install snapshot"
exit 0
fi
# Check if auto snapshots are enabled
if [ "$ENABLE_AUTO_SNAPSHOTS" != "1" ]; then
echo "INFO: Automatic snapshots are disabled in slackpkg hook"
exit 0
fi
# Function to create a snapshot
create_snapshot() {
local desc="$1"
echo "Creating snapshot before package operations: $desc"
$SNAP_SLACK create --description "$desc"
}
# Get the operation being performed
OPERATION="$1"
shift
PACKAGES="$@"
# Create appropriate snapshot based on operation
case "$OPERATION" in
upgrade-all)
create_snapshot "pre-upgrade-all"
;;
install|upgrade)
# Limit the number of packages in the snapshot name for readability
if [ "$(echo $PACKAGES | wc -w)" -gt $MAX_PACKAGE_COUNT ]; then
PKG_COUNT=$(echo $PACKAGES | wc -w)
create_snapshot "pre-$OPERATION-$PKG_COUNT-packages"
else
create_snapshot "pre-$OPERATION-$(echo $PACKAGES | tr ' ' '-')"
fi
;;
remove)
if [ "$(echo $PACKAGES | wc -w)" -gt $MAX_PACKAGE_COUNT ]; then
PKG_COUNT=$(echo $PACKAGES | wc -w)
create_snapshot "pre-remove-$PKG_COUNT-packages"
else
create_snapshot "pre-remove-$(echo $PACKAGES | tr ' ' '-')"
fi
;;
*)
# For other operations, create a generic snapshot
create_snapshot "pre-slackpkg-operation"
;;
esac
exit 0
"""
    # Post-install hook content
    post_hook_content = """#!/bin/bash
# SlackPkg Post-install Hook for snap-slack
# This file was automatically installed by snap-slack
# Configuration
SNAP_SLACK=/usr/bin/snap-slack
AUTO_CLEANUP=1 # Set to 0 to disable automatic snapshot cleanup
# Check if snap-slack is installed
if [ ! -x "$SNAP_SLACK" ]; then
echo "WARNING: snap-slack not found at $SNAP_SLACK, skipping post-install operations"
exit 0
fi
# Check if auto cleanup is enabled
if [ "$AUTO_CLEANUP" = "1" ]; then
echo "Running snapshot management to clean up old snapshots"
$SNAP_SLACK manage
fi
# Log the successful completion
echo "Package operation completed successfully."
echo "If you encounter issues, you can rollback using:"
echo " snap-slack list # to see available snapshots"
echo " snap-slack adopt --snapshot <snapshot-name> # to rollback"
exit 0
"""
    # Write the pre-install hook
    safe_write_file(
        pre_hook,
        pre_hook_content,
        dry_run=dry_run,
        simulate=simulate,
        description="Creating pre-install hook"
    )

    # Write the post-install hook
    safe_write_file(
        post_hook,
        post_hook_content,
        dry_run=dry_run,
        simulate=simulate,
        description="Creating post-install hook"
    )

    # Make hooks executable
    if not skip_changes:
        try:
            for hook_path in (pre_hook, post_hook):
                os.chmod(hook_path, 0o755)
        except Exception as e:
            print(f"Error setting permissions on hooks: {str(e)}")
            return False

    for message in (
        "SlackPkg hooks installed successfully.",
        "The hooks will create snapshots before package operations and clean up old snapshots afterward.",
        "To disable automatic snapshots, edit /etc/slackpkg/hooks/pre-install.sh and set ENABLE_AUTO_SNAPSHOTS=0",
        "To disable automatic cleanup, edit /etc/slackpkg/hooks/post-install.sh and set AUTO_CLEANUP=0",
    ):
        print(message)

    return True
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|