# Fix incorrect adopt implementation with dry-run support def adopt(config, snapshot, dry_run=False, simulate=False): try: current_root_path = os.path.join(config['snapshot']['btrfs_mount_point'], config['snapshot']['root_subvolume']) snapshot_path = os.path.join(config['snapshot']['snapshot_dir'], snapshot) if not os.path.exists(snapshot_path) and not (dry_run or simulate): logger.error(f"Snapshot '{snapshot}' not found at {snapshot_path}") print(f"Error: Snapshot '{snapshot}' not found.") return False # Check if we're on a live system (not booted into the snapshot) if not (dry_run or simulate): with open("/proc/cmdline", "r") as f: cmdline = f.read() if snapshot in cmdline: logger.warning(f"You are already booted into snapshot {snapshot}.") print(f"Warning: You are already booted into snapshot {snapshot}. No changes needed.") return False # Create a backup of current root first before making changes backup_name = f"@_backup_{datetime.now().strftime('%Y%m%d-%H%M%S')}" backup_path = os.path.join(config['snapshot']['btrfs_mount_point'], backup_name) print(f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}Creating backup of current root subvolume as {backup_name}") logger.info(f"Creating backup of current root at {backup_path}") # Create a snapshot of the current root cmd_backup = ["btrfs", "subvolume", "snapshot", current_root_path, backup_path] returncode, stdout, stderr = safe_execute( cmd_backup, dry_run=dry_run, simulate=simulate, description="Creating backup of current root" ) if returncode != 0 and not (dry_run or simulate): logger.error(f"Failed to create backup: {stderr}") print(f"Error: Failed to create backup: {stderr}") return False # Delete current root subvolume print(f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}Deleting current root subvolume") logger.info(f"Deleting current root subvolume at {current_root_path}") cmd_delete = ["btrfs", "subvolume", "delete", current_root_path] returncode, stdout, stderr = 
# Safe command execution with dry-run support
def safe_execute(cmd, dry_run=False, simulate=False, description=None):
    """
    Execute a command safely with dry-run and simulation support.

    Args:
        cmd: Command list to execute
        dry_run: If True, just print the command without executing
        simulate: If True, log as if executed but don't actually execute
        description: Optional description of what the command does

    Returns:
        A tuple of (return_code, stdout, stderr) or (0, '', '') if dry_run/simulate
    """
    cmd_str = ' '.join(cmd)
    preview_only = dry_run or simulate

    if description:
        logger.info(f"{description}: {cmd_str}")
        if preview_only:
            print(f"WOULD EXECUTE: {description}")
            print(f" $ {cmd_str}")
    elif preview_only:
        logger.info(f"Would execute: {cmd_str}")
        print(f"WOULD EXECUTE: {cmd_str}")
    else:
        # The command really runs in this branch, so do not log it as hypothetical.
        logger.info(f"Executing: {cmd_str}")

    if preview_only:
        # Report success so callers continue through their preview flow.
        return 0, '', ''

    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # errors='replace': output that is not valid UTF-8 must not turn a finished
    # command into a UnicodeDecodeError.
    return (
        result.returncode,
        result.stdout.decode(errors='replace'),
        result.stderr.decode(errors='replace'),
    )


# Safe file operations with dry-run support
def safe_write_file(file_path, content, dry_run=False, simulate=False, description=None):
    """
    Write to a file with dry-run and simulation support.

    Args:
        file_path: Path to the file
        content: Content to write
        dry_run: If True, just print what would be written
        simulate: If True, log as if written but don't actually write
        description: Optional description of the write operation
    """
    preview_only = dry_run or simulate

    if description:
        logger.info(f"{description}: {file_path}")
        if preview_only:
            print(f"WOULD WRITE: {description}")
            print(f" To file: {file_path}")
            print(f" Content length: {len(content)} bytes")
            # Only 500 characters are ever shown, so anything longer is
            # labelled as truncated.
            if len(content) <= 500:
                print(f" Content preview:\n{'-'*40}\n{content[:500]}\n{'-'*40}")
            else:
                print(f" Content preview (first 500 bytes):\n{'-'*40}\n{content[:500]}...\n{'-'*40}")
    elif preview_only:
        logger.info(f"Would write to: {file_path}")
        print(f"WOULD WRITE TO: {file_path}")
    else:
        logger.info(f"Writing to: {file_path}")

    if preview_only:
        return

    # A bare filename has an empty dirname, and os.makedirs('') raises.
    parent_dir = os.path.dirname(file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(file_path, 'w') as f:
        f.write(content)
# Apply test directory to paths if specified
def adjust_path(config, path, test_dir=None):
    """
    Adjust a path with a test directory if specified.

    Args:
        config: Configuration dictionary (not read by this function; kept
            for interface compatibility)
        path: Original path
        test_dir: Test directory to prepend (if not None)

    Returns:
        Adjusted path: absolute paths are re-rooted under test_dir,
        relative paths and the test_dir=None case are returned unchanged
    """
    if test_dir is None:
        return path

    if path.startswith('/'):
        # Strip every leading slash. With only one removed, "//etc/x" stays
        # absolute and os.path.join() would discard test_dir entirely.
        return os.path.join(test_dir, path.lstrip('/'))

    return path
#!/usr/bin/env python3
import os
import sys
import subprocess
import toml
import argparse
import logging
from datetime import datetime
import shutil

# Path to the configuration file
CONFIG_PATH = "/etc/snap-slack/config.toml"
LOG_PATH = "/var/log/snap-slack.log"

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_PATH),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('snap-slack')


# Read configuration from a TOML file with test dir support
def read_config(file_path=CONFIG_PATH, test_dir=None):
    """
    Load and validate the snap-slack configuration.

    Args:
        file_path: Path of the TOML configuration file
        test_dir: Optional directory under which the configured absolute
            paths are re-rooted (see adjust_path)

    Returns:
        The parsed configuration dictionary. When test_dir is given, the
        path entries are rewritten and the key '_test_dir' is added.

    Raises:
        FileNotFoundError: file_path does not exist
        ValueError: a required section or field is missing
        Exception: any parsing error is logged and re-raised unchanged
    """
    if not os.path.exists(file_path):
        logger.error(f"Configuration file not found: {file_path}")
        raise FileNotFoundError(f"Configuration file not found: {file_path}")

    try:
        # TOML documents are UTF-8; do not depend on the locale's encoding.
        with open(file_path, 'r', encoding='utf-8') as f:
            config = toml.load(f)

        # Validate essential config entries
        required_fields = {
            'snapshot': ['snapshot_dir', 'snapshot_prefix', 'btrfs_mount_point',
                         'root_subvolume', 'retention_days'],
        }
        if config.get('bootloader', {}).get('type', 'elilo').lower() == 'elilo':
            required_fields['elilo'] = ['elilo_conf', 'root_partition']
        else:
            required_fields['grub'] = ['config_file', 'root_partition']

        for section, fields in required_fields.items():
            if section not in config:
                raise ValueError(f"Missing required section '{section}' in config file")
            for field in fields:
                if field not in config[section]:
                    raise ValueError(f"Missing required field '{field}' in section '{section}'")

        # If test_dir is specified, adjust paths in the config
        if test_dir:
            logger.info(f"Using test directory: {test_dir}")
            print(f"Using test directory: {test_dir}")

            # Each entry is adjusted only when present, so a table for the
            # bootloader that is not in use cannot raise KeyError here.
            relocatable = (
                ('snapshot', 'snapshot_dir'),
                ('snapshot', 'btrfs_mount_point'),
                ('elilo', 'elilo_conf'),
                ('grub', 'config_file'),
                ('grub', 'custom_entries_file'),
                ('boot', 'boot_dir'),
            )
            for section, key in relocatable:
                if key in config.get(section, {}):
                    config[section][key] = adjust_path(config, config[section][key], test_dir)

            # Add test_dir to config for reference
            config['_test_dir'] = test_dir

        return config
    except Exception as e:
        logger.error(f"Error reading config: {str(e)}")
        raise
# Create a new snapshot with the current date and time
def create_snapshot(config, dry_run=False, simulate=False, description=None):
    """
    Create a btrfs snapshot of the root subvolume and register a boot entry.

    The snapshot is named "<snapshot_prefix><YYYYmmdd-HHMMSS>[-<description>]".
    It is created read-only unless hibernation is enabled in the config.

    Args:
        config: Configuration dictionary
        dry_run: If True, only print what would be done
        simulate: If True, log as if done but change nothing
        description: Optional free text appended (sanitised) to the name

    Returns:
        The snapshot name on success, None on any failure (errors are
        logged and printed, not raised).
    """
    # Local import: the regex module is used below and this file has no
    # module-level import for it.
    import re

    preview_only = dry_run or simulate
    mode_tag = f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}"

    try:
        snapshot_cfg = config['snapshot']
        date_str = datetime.now().strftime("%Y%m%d-%H%M%S")
        snapshot_name = f"{snapshot_cfg['snapshot_prefix']}{date_str}"

        # Add description to snapshot name if provided
        if description:
            # Clean up description for use in filename
            clean_desc = re.sub(r'[^a-zA-Z0-9\-_]', '', description.replace(" ", "-"))
            # A description made only of stripped characters must not leave
            # a dangling "-" at the end of the name.
            if clean_desc:
                snapshot_name = f"{snapshot_name}-{clean_desc}"

        snapshot_path = os.path.join(snapshot_cfg['snapshot_dir'], snapshot_name)

        # Ensure snapshot directory exists
        if not preview_only:
            os.makedirs(snapshot_cfg['snapshot_dir'], exist_ok=True)

        root_path = os.path.join(snapshot_cfg['btrfs_mount_point'], snapshot_cfg['root_subvolume'])
        if not os.path.exists(root_path) and not preview_only:
            logger.error(f"Root subvolume path does not exist: {root_path}")
            return None

        is_encrypted = config.get('encryption', {}).get('enabled', False)

        # If hibernation is enabled, we need a read-write snapshot
        # otherwise, create a read-only snapshot
        hibernation_enabled = config.get('hibernation', {}).get('enabled', False)
        snapshot_readonly = not hibernation_enabled

        cmd = ["btrfs", "subvolume", "snapshot"]
        if snapshot_readonly:
            cmd.append("-r")  # read-only flag
        cmd.extend([root_path, snapshot_path])

        returncode, stdout, stderr = safe_execute(
            cmd,
            dry_run=dry_run,
            simulate=simulate,
            description=f"Creating {'read-only' if snapshot_readonly else 'read-write'} snapshot{' with description: ' + description if description else ''}"
        )

        if returncode != 0:
            error_msg = stderr
            logger.error(f"Failed to create snapshot: {error_msg}")
            print(f"Failed to create snapshot: {error_msg}")
            return None

        logger.info(f"Created snapshot: {snapshot_name}")
        print(f"{mode_tag}Created snapshot: {snapshot_name}")

        # For encrypted systems with hibernation, update the initramfs
        if is_encrypted and hibernation_enabled:
            if preview_only:
                # Skipped in preview mode so a preview never writes files or
                # runs the initramfs tool.
                logger.info(f"Would update initramfs for snapshot {snapshot_name}")
                print(f"{mode_tag}Would update initramfs for snapshot {snapshot_name}")
            elif not update_initramfs_for_encryption(config, snapshot_name, snapshot_path):
                logger.warning(
                    f"Custom initramfs for snapshot {snapshot_name} was not created; "
                    "the boot entry will reference the default initrd"
                )

        add_boot_entry(config, snapshot_name, dry_run=dry_run, simulate=simulate)
        return snapshot_name
    except Exception as e:
        logger.error(f"Error creating snapshot: {str(e)}")
        print(f"Error creating snapshot: {str(e)}")
        return None
# Update initramfs for encrypted volume with hibernation
def update_initramfs_for_encryption(config, snapshot_name, snapshot_path,
                                    dry_run=False, simulate=False):
    """
    Build a snapshot-specific initramfs for an encrypted (LUKS) root.

    Args:
        config: Configuration dictionary; on success the key
            config['encryption']['custom_initramfs'] is set to the new
            image's file name
        snapshot_name: Name of the snapshot the image is built for
        snapshot_path: Path of the snapshot (accepted for interface
            compatibility; not read here)
        dry_run: If True, only print what would be written/executed
        simulate: If True, log as if done but change nothing

    Returns:
        True on success, False on any failure (errors are logged and
        printed, not raised).
    """
    # Local import: the regex module is used below and this file has no
    # module-level import for it.
    import re

    try:
        logger.info("Updating initramfs for encrypted system with hibernation support")
        print("Updating initramfs for encrypted system with hibernation support...")

        encryption_cfg = config.get('encryption', {})
        initramfs_tool = encryption_cfg.get('initramfs_tool', 'mkinitrd')
        initramfs_dir = encryption_cfg.get('initramfs_dir', '/boot')
        luks_name = encryption_cfg.get('luks_name', 'cryptroot')
        resume_device = config.get('hibernation', {}).get('resume_device', '')
        rootflags = f"subvol={config['snapshot']['snapshot_dir']}/{snapshot_name}"

        # New initramfs filename with snapshot identifier
        initramfs_file = f"initrd-{snapshot_name}.gz"
        initramfs_path = os.path.join(initramfs_dir, initramfs_file)

        kernel_version = get_current_kernel_version(config)
        if not kernel_version:
            logger.error("Failed to determine kernel version")
            raise Exception("Failed to determine kernel version")

        luks_uuid = get_luks_uuid(config)
        if not luks_uuid:
            logger.error("Failed to determine LUKS UUID")
            raise Exception("Failed to determine LUKS UUID")

        if initramfs_tool == "mkinitrd":
            # Create mkinitrd config file for this snapshot
            mkinitrd_conf = f"/etc/mkinitrd.conf.{snapshot_name}"
            with open("/etc/mkinitrd.conf", "r") as f:
                mkinitrd_content = f.read()

            # Update paths for the snapshot
            mkinitrd_content = mkinitrd_content.replace(
                "ROOT_DEVICE=", f"ROOT_DEVICE=/dev/mapper/{luks_name}\n# Original: ")

            # Add the LUKS options if not present
            if "LUKSDEV=" not in mkinitrd_content:
                mkinitrd_content += f"\n# Added by snap-slack\nLUKSDEV=UUID={luks_uuid}\n"

            # Add the resume device if one is configured and not present
            if resume_device and "RESUMEDEV=" not in mkinitrd_content:
                mkinitrd_content += f"RESUMEDEV={resume_device}\n"

            # Add or replace rootflags for the snapshot
            rootflags_line = f"ROOTFLAGS={rootflags}"
            if "ROOTFLAGS=" not in mkinitrd_content:
                mkinitrd_content += f"{rootflags_line}\n"
            else:
                # A callable replacement keeps backslashes in the path literal.
                mkinitrd_content = re.sub(
                    r'ROOTFLAGS=.*', lambda _match: rootflags_line, mkinitrd_content)

            safe_write_file(
                mkinitrd_conf, mkinitrd_content,
                dry_run=dry_run, simulate=simulate,
                description=f"Writing mkinitrd config for snapshot {snapshot_name}"
            )

            # NOTE(review): flags are kept exactly as before; confirm against
            # the installed mkinitrd that "-c <file>" selects a config file.
            cmd = [
                "mkinitrd",
                "-F",                   # force
                "-c", mkinitrd_conf,    # use custom config
                "-o", initramfs_path,
                "-k", kernel_version
            ]
        elif initramfs_tool == "dracut":
            dracut_modules = "crypt"
            if resume_device:
                dracut_modules += " resume"

            # Use the configured mapper name, matching the mkinitrd branch,
            # instead of a hard-coded "cryptroot".
            kernel_cmdline = (
                f"rd.luks.uuid={luks_uuid} root=/dev/mapper/{luks_name} "
                f"rootflags={rootflags}"
            )
            if resume_device:
                kernel_cmdline += f" resume={resume_device}"

            # dracut takes the output image as a positional argument; "-f"
            # is only the short form of "--force".
            cmd = [
                "dracut",
                "--force",
                "--kver", kernel_version,
                "--add", dracut_modules,
                "--persistent-policy", "by-uuid",
                "--kernel-cmdline", kernel_cmdline,
                initramfs_path,
            ]
        else:
            logger.error(f"Unsupported initramfs generation tool: {initramfs_tool}")
            raise Exception(f"Unsupported initramfs generation tool: {initramfs_tool}")

        print("Generating custom initramfs for encrypted system...")
        returncode, _stdout, error_msg = safe_execute(
            cmd, dry_run=dry_run, simulate=simulate,
            description="Generating custom initramfs"
        )

        if returncode != 0:
            logger.error(f"Failed to create custom initramfs: {error_msg}")
            print(f"Failed to create custom initramfs: {error_msg}")
            return False

        logger.info(f"Successfully created custom initramfs at {initramfs_path}")
        print(f"Successfully created custom initramfs at {initramfs_path}")

        # Record the image name for the boot-entry writers; setdefault keeps
        # this working when the config has no [encryption] table.
        config.setdefault('encryption', {})['custom_initramfs'] = initramfs_file
        return True
    except Exception as e:
        logger.error(f"Error updating initramfs: {str(e)}")
        print(f"Error updating initramfs: {str(e)}")
        return False
# Get LUKS UUID for encrypted device
def get_luks_uuid(config):
    """
    Determine the UUID of the LUKS container.

    Lookup order: config['encryption']['luks_uuid'], then blkid on
    config['encryption']['luks_device'], then blkid on the first device
    blkid reports with TYPE=crypto_LUKS.

    Args:
        config: Configuration dictionary

    Returns:
        The UUID string, or None when it cannot be determined (errors are
        logged, not raised).
    """
    try:
        encryption_cfg = config.get('encryption', {})

        # Try to get UUID from config first
        configured_uuid = encryption_cfg.get('luks_uuid', '')
        if configured_uuid:
            return configured_uuid

        # If not in config, try to determine it
        luks_device = encryption_cfg.get('luks_device', '')
        if not luks_device:
            probe = subprocess.run(
                ["blkid", "-t", "TYPE=crypto_LUKS", "-o", "device"],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if probe.returncode == 0:
                devices = probe.stdout.decode().strip().splitlines()
                if devices and devices[0]:
                    # First reported device wins, as before.
                    luks_device = devices[0]

        if luks_device:
            lookup = subprocess.run(
                ["blkid", "-s", "UUID", "-o", "value", luks_device],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if lookup.returncode == 0:
                # Empty output means "not found": return None, not ''.
                return lookup.stdout.decode().strip() or None

        return None
    except Exception as e:
        logger.error(f"Error getting LUKS UUID: {str(e)}")
        return None
# Get current kernel version
def get_current_kernel_version(config):
    """
    Determine the kernel version to build an initramfs for.

    Lookup order: config['boot']['kernel_version'], then the version
    string parsed from /proc/version, then the output of "uname -r".

    Args:
        config: Configuration dictionary

    Returns:
        The kernel version string, or None when it cannot be determined
        (errors are logged, not raised).
    """
    # Local import: this file has no module-level import of the regex
    # module, so the /proc/version branch used to fail with NameError.
    import re

    try:
        # Try to get from config first
        configured_version = config.get('boot', {}).get('kernel_version', '')
        if configured_version:
            return configured_version

        # Try to determine from running system
        if os.path.exists("/proc/version"):
            with open("/proc/version", "r") as f:
                version_info = f.read()
            match = re.search(r'Linux version ([0-9.-]+\S+)', version_info)
            if match:
                return match.group(1)

        # Fallback to uname
        result = subprocess.run(["uname", "-r"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode == 0:
            return result.stdout.decode().strip()

        return None
    except Exception as e:
        logger.error(f"Error determining kernel version: {str(e)}")
        return None


# Add a boot entry for a snapshot
def add_boot_entry(config, snapshot, dry_run=False, simulate=False):
    """
    Add a boot entry for a snapshot using the configured bootloader.

    Dispatches to add_grub_entry when config['bootloader']['type'] is
    'grub' (case-insensitive) and to add_elilo_entry otherwise; elilo is
    the default when no type is configured.

    Args:
        config: Configuration dictionary
        snapshot: Snapshot name
        dry_run: Passed through to the bootloader-specific function
        simulate: Passed through to the bootloader-specific function

    Returns:
        Whatever the bootloader-specific function returns, or False when
        it raises (the error is logged and printed).
    """
    try:
        bootloader = config.get('bootloader', {}).get('type', 'elilo').lower()
        add_entry = add_grub_entry if bootloader == 'grub' else add_elilo_entry
        return add_entry(config, snapshot, dry_run=dry_run, simulate=simulate)
    except Exception as e:
        logger.error(f"Error adding boot entry: {str(e)}")
        print(f"Error adding boot entry: {str(e)}")
        return False
# Add an entry for the snapshot in elilo.conf
def add_elilo_entry(config, snapshot, dry_run=False, simulate=False):
    """
    Append an image stanza for a snapshot to elilo.conf.

    Args:
        config: Configuration dictionary
        snapshot: Snapshot name; used as the stanza's label
        dry_run: If True, only print the stanza that would be appended
        simulate: If True, log as if done but change nothing

    Returns:
        True when the stanza was added (or would be, in preview mode).
        False when a stanza with that label already exists, when the LUKS
        UUID of an encrypted system cannot be determined, or on error
        (errors are logged and printed, not raised).
    """
    preview_only = dry_run or simulate

    try:
        extra_boot_options = config.get('elilo', {}).get('extra_boot_options', '')
        encryption_cfg = config.get('encryption', {})
        is_encrypted = encryption_cfg.get('enabled', False)
        hibernation_enabled = config.get('hibernation', {}).get('enabled', False)

        # Determine initrd file to use
        initrd_file = "initrd.gz"  # Default
        if is_encrypted and hibernation_enabled:
            # Use the custom initramfs for encrypted systems with hibernation
            initrd_file = encryption_cfg.get('custom_initramfs', '') or initrd_file

        # Kernel arguments for the append= line
        kernel_args = []
        if is_encrypted:
            luks_uuid = get_luks_uuid(config)
            if not luks_uuid:
                # Do not write an entry containing "rd.luks.uuid=None".
                logger.error(f"Cannot add elilo entry for {snapshot}: LUKS UUID could not be determined")
                print(f"Error adding elilo entry: LUKS UUID could not be determined")
                return False
            luks_name = encryption_cfg.get('luks_name', 'cryptroot')
            kernel_args.append(f"rd.luks.uuid={luks_uuid}")
            kernel_args.append(f"root=/dev/mapper/{luks_name}")

        kernel_args.append(f"rootflags=subvol={config['snapshot']['snapshot_dir']}/{snapshot}")
        kernel_args.append("ro")

        # Add resume device for hibernation if enabled (encrypted systems only)
        if is_encrypted and hibernation_enabled:
            resume_device = config.get('hibernation', {}).get('resume_device', '')
            if resume_device:
                kernel_args.append(f"resume={resume_device}")

        if extra_boot_options:
            kernel_args.append(extra_boot_options)

        append_value = ' '.join(kernel_args)

        # "image=" must start at column 0: remove_elilo_entry uses that to
        # find where a stanza begins.
        entry = (
            "\n"
            "image=vmlinuz\n"
            f" label={snapshot}\n"
            f" root={config['elilo']['root_partition']}\n"
            f" append=\"{append_value}\"\n"
            " read-only\n"
            f" initrd={initrd_file}\n"
        )

        elilo_conf = config['elilo']['elilo_conf']
        description = f"Adding entry for snapshot {snapshot} to elilo.conf"

        if preview_only:
            # In preview mode the entry is assumed not to exist yet.
            logger.info(description)
            print(f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}{description}")
            print(f"WOULD APPEND TO {elilo_conf}:")
            print(f"{'-'*40}\n{entry}\n{'-'*40}")
            return True

        # Create the file if it doesn't exist
        if not os.path.exists(elilo_conf):
            conf_dir = os.path.dirname(elilo_conf)
            if conf_dir:
                os.makedirs(conf_dir, exist_ok=True)
            with open(elilo_conf, 'w') as f:
                f.write("# ELILO Configuration File\n")

        # Compare whole label lines: a substring test would treat "snap-1"
        # as present when only "snap-10" is.
        with open(elilo_conf, 'r') as f:
            existing_lines = {line.strip() for line in f}
        if f"label={snapshot}" in existing_lines:
            logger.info(f"Entry for snapshot {snapshot} already exists in elilo.conf")
            print(f"Entry for snapshot {snapshot} already exists in elilo.conf")
            return False

        with open(elilo_conf, 'a') as f:
            f.write(entry)

        logger.info(description)
        print(f"Added entry for snapshot: {snapshot}")
        return True
    except Exception as e:
        logger.error(f"Error adding elilo entry: {str(e)}")
        print(f"Error adding elilo entry: {str(e)}")
        return False
# Add an entry for the snapshot in grub.cfg or custom config
def add_grub_entry(config, snapshot, dry_run=False, simulate=False):
    """
    Add a GRUB menu entry for a snapshot.

    With config['grub']['use_custom_file'] true (the default) the entry is
    appended to the custom script (default /etc/grub.d/60_snap-slack) and
    the GRUB configuration is regenerated. Otherwise the entry is inserted
    directly into grub.cfg after the last existing menuentry.

    Args:
        config: Configuration dictionary
        snapshot: Snapshot name
        dry_run: If True, only print the entry that would be written
        simulate: If True, log as if done but change nothing

    Returns:
        True when the entry was added (or would be, in preview mode),
        False when it already exists or grub.cfg is missing in direct mode.

    Raises:
        Exception: any error is logged and re-raised.
    """
    preview_only = dry_run or simulate

    try:
        grub_cfg_section = config.get('grub', {})
        extra_boot_options = grub_cfg_section.get('extra_boot_options', '')
        grub_custom_file = grub_cfg_section.get('custom_entries_file', '/etc/grub.d/60_snap-slack')
        kernel_path = grub_cfg_section.get('kernel_path', '/boot/vmlinuz')
        initrd_path = grub_cfg_section.get('initrd_path', '/boot/initrd.gz')
        root_partition = grub_cfg_section.get('root_partition', '/dev/sdaX')
        grub_cfg = grub_cfg_section.get('config_file', '/boot/grub/grub.cfg')
        use_custom_file = grub_cfg_section.get('use_custom_file', True)

        # The closing quote makes this an exact-name test, independent of
        # the configured snapshot prefix.
        entry_marker = f'menuentry "Slackware - {snapshot}"'

        grub_entry = (
            "\n"
            f"# Entry for snapshot {snapshot}\n"
            f"{entry_marker} {{\n"
            f"    linux {kernel_path} root={root_partition} "
            f"rootflags=subvol={config['snapshot']['snapshot_dir']}/{snapshot} ro {extra_boot_options}\n"
            f"    initrd {initrd_path}\n"
            "}\n"
        )

        if preview_only:
            # In preview mode the entry is assumed not to exist yet.
            target = grub_custom_file if use_custom_file else grub_cfg
            description = f"Adding GRUB entry for snapshot {snapshot}"
            logger.info(description)
            print(f"{'[SIMULATE] ' if simulate else ''}{'[DRY-RUN] ' if dry_run else ''}{description}")
            print(f"WOULD APPEND TO {target}:")
            print(f"{'-'*40}\n{grub_entry}\n{'-'*40}")
            return True

        if use_custom_file:
            custom_dir = os.path.dirname(grub_custom_file)
            if custom_dir:
                os.makedirs(custom_dir, exist_ok=True)

            existing = ""
            if os.path.exists(grub_custom_file):
                with open(grub_custom_file, 'r') as f:
                    existing = f.read()

            if entry_marker in existing:
                logger.info(f"GRUB entry for snapshot {snapshot} already exists")
                return False

            if existing:
                # Earlier versions wrapped entries in "cat << EOF" / "EOF".
                # Because the script begins with "exec tail -n +3 $0", those
                # two lines were emitted into grub.cfg verbatim; drop them.
                body = "".join(
                    line for line in existing.splitlines(keepends=True)
                    if line.strip() not in ("cat << EOF", "EOF")
                )
            else:
                # "exec tail" prints everything from line 3 onward, so the
                # entries below are plain GRUB syntax, not shell.
                body = (
                    "#!/bin/sh\n"
                    "exec tail -n +3 $0\n\n"
                    "# This file was generated by snap-slack\n\n"
                )

            with open(grub_custom_file, 'w') as f:
                f.write(body)
                f.write(grub_entry)

            # Make the file executable
            os.chmod(grub_custom_file, 0o755)

            # Update GRUB configuration
            logger.info("Updating GRUB configuration")
            if os.path.exists("/usr/sbin/update-grub"):
                cmd = ["update-grub"]
            else:
                cmd = ["grub-mkconfig", "-o", grub_cfg]

            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if result.returncode != 0:
                logger.warning(f"GRUB update command failed: {result.stderr.decode(errors='replace')}")
                logger.warning("You may need to manually update your GRUB configuration.")
                print("Warning: GRUB configuration update failed. You may need to run 'update-grub' or 'grub-mkconfig' manually.")

            logger.info(f"Added GRUB entry for snapshot: {snapshot}")
            return True

        # Directly modify grub.cfg (not recommended but supported)
        if not os.path.exists(grub_cfg):
            logger.warning(f"GRUB config file not found at {grub_cfg}")
            return False

        with open(grub_cfg, 'r') as f:
            content = f.read()

        if entry_marker in content:
            logger.info(f"GRUB entry for snapshot {snapshot} already exists")
            return False

        # Find the closing brace of the last menuentry; fall back to the end
        # of the file when there is none or the braces do not balance.
        insert_at = len(content)
        last_entry_pos = content.rfind("menuentry ")
        if last_entry_pos != -1:
            depth = 0
            for index in range(last_entry_pos, len(content)):
                char = content[index]
                if char == '{':
                    depth += 1
                elif char == '}':
                    depth -= 1
                    if depth == 0:
                        insert_at = index + 1
                        break

        with open(grub_cfg, 'w') as f:
            f.write(content[:insert_at] + grub_entry + content[insert_at:])

        logger.info(f"Added GRUB entry for snapshot: {snapshot}")
        return True
    except Exception as e:
        logger.error(f"Error adding GRUB entry: {str(e)}")
        raise
# Remove a boot entry for a deleted snapshot
def remove_boot_entry(config, snapshot):
    """
    Remove the boot entry of a snapshot using the configured bootloader.

    Dispatches to remove_grub_entry when config['bootloader']['type'] is
    'grub' (case-insensitive) and to remove_elilo_entry otherwise.

    Args:
        config: Configuration dictionary
        snapshot: Snapshot name

    Raises:
        Exception: any error is logged and re-raised.
    """
    try:
        bootloader = config.get('bootloader', {}).get('type', 'elilo').lower()
        remove_entry = remove_grub_entry if bootloader == 'grub' else remove_elilo_entry
        return remove_entry(config, snapshot)
    except Exception as e:
        logger.error(f"Error removing boot entry: {str(e)}")
        raise


def _drop_elilo_stanza(lines, snapshot):
    """Return (remaining_lines, removed) with the stanza labelled *snapshot* dropped."""
    target_label = f"label={snapshot}"

    # Split into chunks: a stanza runs from an "image=" line to the next
    # blank line or "image=" line; every other line is its own chunk.
    chunks = []
    open_stanza = None
    for line in lines:
        stripped = line.strip()
        if stripped.startswith("image="):
            open_stanza = [line]
            chunks.append(open_stanza)
        elif open_stanza is not None and stripped:
            open_stanza.append(line)
        else:
            open_stanza = None
            chunks.append([line])

    remaining = []
    removed = False
    for chunk in chunks:
        is_stanza = chunk[0].strip().startswith("image=")
        if is_stanza and any(line.strip() == target_label for line in chunk):
            removed = True
            # Also drop the blank separator written in front of the stanza.
            if remaining and not remaining[-1].strip():
                remaining.pop()
            continue
        remaining.extend(chunk)
    return remaining, removed


# Remove an entry for a deleted snapshot from elilo.conf
def remove_elilo_entry(config, snapshot):
    """
    Remove the image stanza of a snapshot from elilo.conf.

    The whole stanza is removed, including its "image=" line. Labels are
    compared exactly, so "snap-1" does not match "snap-10". The file is
    rewritten only when a stanza was actually removed.

    Args:
        config: Configuration dictionary
        snapshot: Snapshot name (the stanza's label)

    Raises:
        Exception: any error is logged and re-raised.
    """
    try:
        elilo_conf = config['elilo']['elilo_conf']

        if not os.path.exists(elilo_conf):
            logger.warning(f"elilo.conf not found at {elilo_conf}")
            return

        with open(elilo_conf, 'r') as f:
            lines = f.readlines()

        # Compute the new content first so the file is never left
        # truncated by a failure part-way through.
        remaining, removed = _drop_elilo_stanza(lines, snapshot)
        if not removed:
            logger.info(f"No elilo entry found for snapshot: {snapshot}")
            return

        with open(elilo_conf, 'w') as f:
            f.writelines(remaining)

        logger.info(f"Removed entry for snapshot: {snapshot}")
    except Exception as e:
        logger.error(f"Error removing elilo entry: {str(e)}")
        raise
# Remove an entry for a deleted snapshot from grub configuration
def remove_grub_entry(config, snapshot):
    """
    Remove the GRUB menu entry of a snapshot.

    With config['grub']['use_custom_file'] true (the default) the entry is
    removed from the custom script and the GRUB configuration is
    regenerated; otherwise grub.cfg is edited directly. Files are rewritten
    only when an entry was actually found.

    Args:
        config: Configuration dictionary
        snapshot: Snapshot name

    Raises:
        Exception: any error is logged and re-raised.
    """
    import re

    try:
        grub_cfg_section = config.get('grub', {})
        grub_cfg = grub_cfg_section.get('config_file', '/boot/grub/grub.cfg')
        entry_marker = f'menuentry "Slackware - {snapshot}"'
        entry_comment = f"# Entry for snapshot {snapshot}"

        if grub_cfg_section.get('use_custom_file', True):
            grub_custom_file = grub_cfg_section.get('custom_entries_file', '/etc/grub.d/60_snap-slack')

            if not os.path.exists(grub_custom_file):
                logger.warning(f"GRUB custom file not found at {grub_custom_file}")
                return

            with open(grub_custom_file, 'r') as f:
                lines = f.readlines()

            # Find and remove the menuentry block
            kept = []
            skipping = False
            seen_open_brace = False
            depth = 0
            removed = False
            for line in lines:
                if not skipping and entry_marker in line:
                    skipping = True
                    seen_open_brace = False
                    depth = 0
                    removed = True
                    # Drop the comment (and blank separator) written above
                    # the entry by add_grub_entry.
                    if kept and kept[-1].strip() == entry_comment:
                        kept.pop()
                        if kept and not kept[-1].strip():
                            kept.pop()

                if skipping:
                    if '{' in line:
                        seen_open_brace = True
                    depth += line.count('{') - line.count('}')
                    # Stop only after the block has opened and closed again.
                    if seen_open_brace and depth <= 0:
                        skipping = False
                    continue

                kept.append(line)

            if not removed:
                logger.info(f"No GRUB entry found for snapshot: {snapshot}")
                return

            with open(grub_custom_file, 'w') as f:
                f.writelines(kept)

            # Update GRUB configuration
            logger.info("Updating GRUB configuration")
            if os.path.exists("/usr/sbin/update-grub"):
                cmd = ["update-grub"]
            else:
                cmd = ["grub-mkconfig", "-o", grub_cfg]

            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if result.returncode != 0:
                logger.warning(f"GRUB update command failed: {result.stderr.decode(errors='replace')}")
                logger.warning("You may need to manually update your GRUB configuration.")
                print("Warning: GRUB configuration update failed. You may need to run 'update-grub' or 'grub-mkconfig' manually.")
        else:
            # Direct modification of grub.cfg (not recommended)
            if not os.path.exists(grub_cfg):
                logger.warning(f"GRUB config file not found at {grub_cfg}")
                return

            with open(grub_cfg, 'r') as f:
                content = f.read()

            # Raw pieces plus re.escape: the snapshot name is matched
            # literally and the pattern has no invalid escape sequences.
            escaped_name = re.escape(snapshot)
            pattern = re.compile(
                r'(?:# Entry for snapshot ' + escaped_name + r'[ \t]*\n)?'
                r'menuentry\s+"Slackware - ' + escaped_name + r'"\s+\{.*?\}',
                re.DOTALL,
            )
            new_content, removed_count = pattern.subn('', content)

            if removed_count == 0:
                logger.info(f"No GRUB entry found for snapshot: {snapshot}")
                return

            with open(grub_cfg, 'w') as f:
                f.write(new_content)

        logger.info(f"Removed GRUB entry for snapshot: {snapshot}")
    except Exception as e:
        logger.error(f"Error removing GRUB entry: {str(e)}")
        raise
# List all current snapshots
def list_snapshots(config):
    """
    Return the sorted names of all snapshots in the snapshot directory.

    An entry counts as a snapshot when it is a directory whose name starts
    with config['snapshot']['snapshot_prefix'].

    Args:
        config: Configuration dictionary

    Returns:
        Sorted list of snapshot names; empty when the snapshot directory
        does not exist (a warning is logged).

    Raises:
        Exception: any error is logged and re-raised.
    """
    try:
        snapshot_dir = config['snapshot']['snapshot_dir']
        prefix = config['snapshot']['snapshot_prefix']

        if not os.path.exists(snapshot_dir):
            logger.warning(f"Snapshot directory does not exist: {snapshot_dir}")
            return []

        return sorted(
            entry for entry in os.listdir(snapshot_dir)
            if entry.startswith(prefix) and os.path.isdir(os.path.join(snapshot_dir, entry))
        )
    except Exception as e:
        logger.error(f"Error listing snapshots: {str(e)}")
        raise


def _snapshot_created_time(config, snapshot):
    """Return the creation time from the snapshot's name, else from st_ctime."""
    prefix = config['snapshot']['snapshot_prefix']
    # create_snapshot names snapshots "<prefix><YYYYmmdd-HHMMSS>[-<desc>]";
    # the timestamp is the 15 characters after the prefix.
    stamp = snapshot[len(prefix):len(prefix) + 15] if snapshot.startswith(prefix) else ""
    try:
        return datetime.strptime(stamp, "%Y%m%d-%H%M%S")
    except ValueError:
        # Name does not carry a timestamp. st_ctime is the last metadata
        # change on Unix, so this is only an approximation.
        snapshot_path = os.path.join(config['snapshot']['snapshot_dir'], snapshot)
        return datetime.fromtimestamp(os.path.getctime(snapshot_path))


# Display snapshots with details
def display_snapshots(config):
    """
    Print a table of all snapshots with their creation date and age.

    The creation date is taken from the timestamp embedded in the snapshot
    name and falls back to the directory's st_ctime when the name has none.

    Args:
        config: Configuration dictionary
    """
    try:
        snapshots = list_snapshots(config)

        if not snapshots:
            print("No snapshots found.")
            return

        row_format = "{:<30} {:<20} {:<10}"
        separator = "-" * 80

        print("\nAvailable snapshots:")
        print(separator)
        print(row_format.format("Snapshot Name", "Created Date", "Age (days)"))
        print(separator)

        now = datetime.now()
        for snapshot in snapshots:
            created_time = _snapshot_created_time(config, snapshot)
            age_days = (now - created_time).days
            created_date = created_time.strftime("%Y-%m-%d %H:%M:%S")
            print(row_format.format(snapshot, created_date, age_days))

        print(separator)
    except Exception as e:
        logger.error(f"Error displaying snapshots: {str(e)}")
        print(f"Error: {str(e)}")
def _snapshot_birth_time(prefix, snapshot, snapshot_path):
    """Return the creation time from the snapshot's name, else from st_ctime."""
    # create_snapshot names snapshots "<prefix><YYYYmmdd-HHMMSS>[-<desc>]".
    stamp = snapshot[len(prefix):len(prefix) + 15] if snapshot.startswith(prefix) else ""
    try:
        return datetime.strptime(stamp, "%Y%m%d-%H%M%S")
    except ValueError:
        # st_ctime is the last metadata change on Unix; approximation only.
        return datetime.fromtimestamp(os.path.getctime(snapshot_path))


# Remove snapshots older than the retention period
def remove_old_snapshots(config):
    """
    Delete snapshots older than config['snapshot']['retention_days'].

    Age is computed from the timestamp embedded in the snapshot name and
    falls back to the directory's st_ctime when the name has none. After a
    snapshot is deleted its boot entry is removed; a failure there is
    logged and does not stop the remaining snapshots from being processed.

    Args:
        config: Configuration dictionary

    Returns:
        Number of snapshots deleted.

    Raises:
        Exception: errors outside the per-snapshot steps are logged and
        re-raised.
    """
    try:
        retention_days = config['snapshot']['retention_days']
        prefix = config['snapshot']['snapshot_prefix']
        now = datetime.now()
        removed_count = 0

        for snapshot in list_snapshots(config):
            snapshot_path = os.path.join(config['snapshot']['snapshot_dir'], snapshot)
            created_time = _snapshot_birth_time(prefix, snapshot, snapshot_path)
            if (now - created_time).days <= retention_days:
                continue

            cmd = ["btrfs", "subvolume", "delete", snapshot_path]
            logger.info(f"Running command: {' '.join(cmd)}")
            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if result.returncode != 0:
                logger.error(f"Failed to remove snapshot {snapshot}: {result.stderr.decode(errors='replace')}")
                continue

            logger.info(f"Removed snapshot: {snapshot}")
            removed_count += 1

            try:
                remove_boot_entry(config, snapshot)
            except Exception as e:
                # The snapshot is already gone; keep pruning and leave the
                # stale entry for the operator to clean up.
                logger.error(f"Snapshot {snapshot} was removed but its boot entry could not be removed: {str(e)}")

        return removed_count
    except Exception as e:
        logger.error(f"Error removing old snapshots: {str(e)}")
        raise


# Fix incorrect adopt implementation
def adopt(config, snapshot):
    """
    Make a snapshot the new root subvolume.

    Steps: snapshot the current root as "@_backup_<timestamp>", delete the
    current root subvolume, then snapshot the chosen snapshot into the
    root subvolume's path. If the last step fails, a restore from the
    backup is attempted.

    Args:
        config: Configuration dictionary
        snapshot: Name of the snapshot to adopt

    Returns:
        None. Returns early, with a warning, when the kernel command line
        shows the system is already booted into that snapshot.

    On any error the message is logged and printed and the process exits
    with status 1 via sys.exit.
    """
    # Local import: this file has no module-level import of the regex module.
    import re

    def run_logged(cmd):
        # Log and run one btrfs command, capturing its output.
        logger.info(f"Running command: {' '.join(cmd)}")
        return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    try:
        if not snapshot:
            # An empty name would resolve to the snapshot directory itself.
            raise Exception("No snapshot name given.")

        mount_point = config['snapshot']['btrfs_mount_point']
        current_root_path = os.path.join(mount_point, config['snapshot']['root_subvolume'])
        snapshot_path = os.path.join(config['snapshot']['snapshot_dir'], snapshot)

        if not os.path.exists(snapshot_path):
            logger.error(f"Snapshot '{snapshot}' not found at {snapshot_path}")
            raise Exception(f"Snapshot '{snapshot}' not found.")

        # Check if we're on a live system (not booted into the snapshot)
        with open("/proc/cmdline", "r") as f:
            cmdline = f.read()

        # Match the name as a whole token, so "snap-1" is not reported as
        # booted when the command line names "snap-10" or "snap-1-desc".
        booted_pattern = r'(?<![\w.-])' + re.escape(snapshot) + r'(?![\w.-])'
        if re.search(booted_pattern, cmdline):
            logger.warning(f"You are already booted into snapshot {snapshot}.")
            print(f"Warning: You are already booted into snapshot {snapshot}. No changes needed.")
            return

        # Create a backup of current root first before making changes
        backup_name = f"@_backup_{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        backup_path = os.path.join(mount_point, backup_name)

        print(f"Creating backup of current root subvolume as {backup_name}")
        logger.info(f"Creating backup of current root at {backup_path}")
        result = run_logged(["btrfs", "subvolume", "snapshot", current_root_path, backup_path])
        if result.returncode != 0:
            backup_error = result.stderr.decode(errors='replace')
            logger.error(f"Failed to create backup: {backup_error}")
            raise Exception(f"Failed to create backup: {backup_error}")

        # Delete current root subvolume
        print("Deleting current root subvolume")
        logger.info(f"Deleting current root subvolume at {current_root_path}")
        result = run_logged(["btrfs", "subvolume", "delete", current_root_path])
        if result.returncode != 0:
            delete_error = result.stderr.decode(errors='replace')
            logger.error(f"Failed to delete current root: {delete_error}")
            raise Exception(f"Failed to delete current root: {delete_error}")

        # Create a new snapshot of the selected snapshot as the new root subvolume
        print(f"Creating new root subvolume from snapshot '{snapshot}'")
        logger.info(f"Creating new root from {snapshot_path} to {current_root_path}")
        result = run_logged(["btrfs", "subvolume", "snapshot", snapshot_path, current_root_path])
        if result.returncode != 0:
            create_error = result.stderr.decode(errors='replace')
            logger.error(f"Failed to create new root: {create_error}")

            # Try to recover from backup
            print("Error occurred. Attempting to recover from backup...")
            try:
                if os.path.exists(backup_path):
                    recovery_cmd = ["btrfs", "subvolume", "snapshot", backup_path, current_root_path]
                    subprocess.run(recovery_cmd, check=True)
                    print("Recovery successful.")
                    logger.info("Recovered from backup after failed adopt operation")
            except Exception as recovery_error:
                logger.critical(f"Failed to recover from backup: {str(recovery_error)}")

            raise Exception(f"Failed to create new root: {create_error}")

        print("\n" + "=" * 80)
        print(f"SUCCESS: Snapshot '{snapshot}' has been adopted as the new root subvolume.")
        print("=" * 80)
        print("\nTo use this snapshot, you need to:")
        print("1. Reboot your system")
        print("2. In the bootloader menu, select your regular Slackware entry (not the snapshot entry)")
        print("3. The system will now boot with the adopted snapshot as the root filesystem")
        print("\nIf you encounter issues, you can:")
        print(f"- Boot into the backup snapshot '{backup_name}' (emergency only)")
        print("- Or boot into any other available snapshot from the bootloader menu")
        print("=" * 80)

        logger.info(f"Successfully adopted snapshot '{snapshot}' as new root subvolume")

        # Note: We don't need to update elilo.conf for the adopted snapshot since it's now the root
    except Exception as e:
        logger.error(f"Error adopting snapshot: {str(e)}")
        print(f"Error: {str(e)}")
        print("\nYour system may be in an inconsistent state. Please verify the root subvolume.")
        sys.exit(1)
Please verify the root subvolume.") sys.exit(1) # Create a live bootable image from a snapshot def create_boot_image(config, snapshot): try: snapshot_path = os.path.join(config['snapshot']['snapshot_dir'], snapshot) if not os.path.exists(snapshot_path): logger.error(f"Snapshot '{snapshot}' not found.") raise Exception(f"Snapshot '{snapshot}' not found.") bootdir = config.get('boot', {}).get('boot_dir', '/boot') if not os.path.exists(bootdir): logger.error(f"Boot directory not found: {bootdir}") raise Exception(f"Boot directory not found: {bootdir}") # Create a custom initrd that boots directly into the snapshot # This is a simplified example - actual implementation would depend on # how Slackware builds initrd images print(f"Creating bootable image for snapshot '{snapshot}'...") logger.info(f"Creating bootable image for snapshot '{snapshot}'") # Ensure the snapshot entry exists in elilo.conf add_elilo_entry(config, snapshot) print("\n" + "=" * 80) print(f"SUCCESS: Boot entry for snapshot '{snapshot}' is configured.") print("=" * 80) print("\nTo boot into this snapshot:") print("1. Reboot your system") print("2. 
In the ELILO boot menu, select the entry labeled:") print(f" {snapshot}") print("\nNOTE: This will boot into the snapshot in read-only mode.") print(" To make it permanent, use 'snap-slack adopt --snapshot' after testing.") print("=" * 80) except Exception as e: logger.error(f"Error creating boot image: {str(e)}") print(f"Error: {str(e)}") # Main logic for managing snapshots def manage(config): try: # Step 1: Create snapshot directory if it doesn't exist os.makedirs(config['snapshot']['snapshot_dir'], exist_ok=True) # Step 2: Get list of current snapshots and add them to elilo.conf if needed print("Checking existing snapshots...") snapshots = list_snapshots(config) for snapshot in snapshots: add_elilo_entry(config, snapshot) # Step 3: Remove snapshots older than retention period print("Checking for old snapshots to remove...") removed = remove_old_snapshots(config) if removed > 0: print(f"Removed {removed} old snapshots.") else: print("No old snapshots to remove.") print("\nSnapshot management completed successfully.") except Exception as e: logger.error(f"Error managing snapshots: {str(e)}") print(f"Error: {str(e)}") # Verify system configuration def verify_system(config): try: issues = [] # Check if btrfs is installed result = subprocess.run(["which", "btrfs"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) if result.returncode != 0: issues.append("BTRFS tools not found. Please install btrfs-progs package.") # Check if root filesystem is btrfs result = subprocess.run(["findmnt", "-no", "FSTYPE", "/"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) if "btrfs" not in result.stdout.decode().strip(): issues.append("Root filesystem is not BTRFS. 
snap-slack requires a BTRFS root filesystem.") # Check if elilo.conf exists or can be created elilo_conf = config['elilo']['elilo_conf'] if not os.path.exists(elilo_conf): if not os.path.exists(os.path.dirname(elilo_conf)): issues.append(f"ELILO config directory not found: {os.path.dirname(elilo_conf)}") # Check if snapshot directory exists or can be created snapshot_dir = config['snapshot']['snapshot_dir'] if not os.path.exists(snapshot_dir): try: os.makedirs(snapshot_dir, exist_ok=True) except Exception: issues.append(f"Cannot create snapshot directory: {snapshot_dir}") # Check if root subvolume exists root_path = os.path.join(config['snapshot']['btrfs_mount_point'], config['snapshot']['root_subvolume']) if not os.path.exists(root_path): issues.append(f"Root subvolume not found: {root_path}") if issues: print("\nSystem verification found issues:") for i, issue in enumerate(issues, 1): print(f"{i}. {issue}") return False else: print("System verification passed. Your system is properly configured for snap-slack.") return True except Exception as e: logger.error(f"Error verifying system: {str(e)}") print(f"Error verifying system: {str(e)}") return False # Parse command-line arguments def parse_arguments(): parser = argparse.ArgumentParser( description='Manage BTRFS snapshots and bootloader entries.', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: snap-slack create Create a new snapshot snap-slack create --description "pre-update" Create snapshot with description snap-slack list List all available snapshots snap-slack manage Manage existing snapshots snap-slack boot --snapshot X Set up specific snapshot for booting snap-slack adopt --snapshot X Adopt a snapshot as the new root filesystem snap-slack verify Verify system configuration """ ) # Global options parser.add_argument('--dry-run', action='store_true', help='Show what would be done without actually doing it') parser.add_argument('--test-dir', help='Use a test directory instead of the 
real root') parser.add_argument('--config', help=f'Path to config file (default: {CONFIG_PATH})') parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output') parser.add_argument('--simulate', action='store_true', help='Simulate operations (for safe testing)') subparsers = parser.add_subparsers(dest='action', help='Action to perform') # Create command create_parser = subparsers.add_parser('create', help='Create a new snapshot') create_parser.add_argument('--description', help='Optional description for the snapshot') # List command list_parser = subparsers.add_parser('list', help='List all snapshots') # Manage command manage_parser = subparsers.add_parser('manage', help='Manage snapshots (clean up old ones, update bootloader config)') # Boot command boot_parser = subparsers.add_parser('boot', help='Configure a snapshot for booting') boot_parser.add_argument('--snapshot', required=True, help='The snapshot to configure for booting') # Adopt command adopt_parser = subparsers.add_parser('adopt', help='Adopt a snapshot as the new root subvolume') adopt_parser.add_argument('--snapshot', required=True, help='The snapshot to adopt as the new root subvolume') # Verify command verify_parser = subparsers.add_parser('verify', help='Verify system configuration') return parser.parse_args() def main(): try: # Parse command-line arguments args = parse_arguments() if not args.action: print("Error: No action specified. 
Use --help for usage information.") sys.exit(1) # Set logging level based on verbosity if args.verbose: logger.setLevel(logging.DEBUG) # Load configuration config_path = args.config if args.config else CONFIG_PATH config = read_config(config_path, args.test_dir) # Execute based on action if args.action == 'create': snapshot = create_snapshot( config, dry_run=args.dry_run, simulate=args.simulate, description=args.description if hasattr(args, 'description') else None ) if snapshot: print(f"{'[SIMULATE] ' if args.simulate else ''}{'[DRY-RUN] ' if args.dry_run else ''}Created snapshot: {snapshot}") print(f"To boot from this snapshot, use: snap-slack boot --snapshot {snapshot}") elif args.action == 'list': display_snapshots(config) elif args.action == 'manage': manage(config, dry_run=args.dry_run, simulate=args.simulate) elif args.action == 'boot': create_boot_image(config, args.snapshot, dry_run=args.dry_run, simulate=args.simulate) elif args.action == 'adopt': adopt(config, args.snapshot, dry_run=args.dry_run, simulate=args.simulate) elif args.action == 'verify': verify_system(config) elif args.action == 'install-hooks': install_slackpkg_hooks(config, dry_run=args.dry_run, simulate=args.simulate) else: print(f"Invalid action: {args.action}") sys.exit(1) except KeyboardInterrupt: print("\nOperation cancelled by user.") sys.exit(130) except Exception as e: logger.error(f"Unhandled exception: {str(e)}") print(f"Error: {str(e)}") sys.exit(1) # Install SlackPkg hooks def install_slackpkg_hooks(config, dry_run=False, simulate=False): hooks_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "install-hooks.sh") if os.path.exists(hooks_script): # Execute the hook installation script cmd = ["bash", hooks_script] returncode, stdout, stderr = safe_execute( cmd, dry_run=dry_run, simulate=simulate, description="Installing SlackPkg hooks" ) if returncode != 0 and not (dry_run or simulate): print(f"Error installing hooks: {stderr}") return False return True else: # 
Generate the hooks in-place hooks_dir = "/etc/slackpkg/hooks" pre_hook = os.path.join(hooks_dir, "pre-install.sh") post_hook = os.path.join(hooks_dir, "post-install.sh") # Create hooks directory if not os.path.exists(hooks_dir) and not (dry_run or simulate): try: os.makedirs(hooks_dir, exist_ok=True) except Exception as e: print(f"Error creating hooks directory: {str(e)}") return False # Pre-install hook content pre_hook_content = """#!/bin/bash # SlackPkg Pre-install Hook for snap-slack # This file was automatically installed by snap-slack # Configuration SNAP_SLACK=/usr/bin/snap-slack ENABLE_AUTO_SNAPSHOTS=1 # Set to 0 to disable automatic snapshots MAX_PACKAGE_COUNT=100 # Maximum number of packages to include in snapshot name # Check if snap-slack is installed if [ ! -x "$SNAP_SLACK" ]; then echo "WARNING: snap-slack not found at $SNAP_SLACK, skipping pre-install snapshot" exit 0 fi # Check if auto snapshots are enabled if [ "$ENABLE_AUTO_SNAPSHOTS" != "1" ]; then echo "INFO: Automatic snapshots are disabled in slackpkg hook" exit 0 fi # Function to create a snapshot create_snapshot() { local desc="$1" echo "Creating snapshot before package operations: $desc" $SNAP_SLACK create --description "$desc" } # Get the operation being performed OPERATION="$1" shift PACKAGES="$@" # Create appropriate snapshot based on operation case "$OPERATION" in upgrade-all) create_snapshot "pre-upgrade-all" ;; install|upgrade) # Limit the number of packages in the snapshot name for readability if [ "$(echo $PACKAGES | wc -w)" -gt $MAX_PACKAGE_COUNT ]; then PKG_COUNT=$(echo $PACKAGES | wc -w) create_snapshot "pre-$OPERATION-$PKG_COUNT-packages" else create_snapshot "pre-$OPERATION-$(echo $PACKAGES | tr ' ' '-')" fi ;; remove) if [ "$(echo $PACKAGES | wc -w)" -gt $MAX_PACKAGE_COUNT ]; then PKG_COUNT=$(echo $PACKAGES | wc -w) create_snapshot "pre-remove-$PKG_COUNT-packages" else create_snapshot "pre-remove-$(echo $PACKAGES | tr ' ' '-')" fi ;; *) # For other operations, create a generic 
snapshot create_snapshot "pre-slackpkg-operation" ;; esac exit 0 """ # Post-install hook content post_hook_content = """#!/bin/bash # SlackPkg Post-install Hook for snap-slack # This file was automatically installed by snap-slack # Configuration SNAP_SLACK=/usr/bin/snap-slack AUTO_CLEANUP=1 # Set to 0 to disable automatic snapshot cleanup # Check if snap-slack is installed if [ ! -x "$SNAP_SLACK" ]; then echo "WARNING: snap-slack not found at $SNAP_SLACK, skipping post-install operations" exit 0 fi # Check if auto cleanup is enabled if [ "$AUTO_CLEANUP" = "1" ]; then echo "Running snapshot management to clean up old snapshots" $SNAP_SLACK manage fi # Log the successful completion echo "Package operation completed successfully." echo "If you encounter issues, you can rollback using:" echo " snap-slack list # to see available snapshots" echo " snap-slack adopt --snapshot # to rollback" exit 0 """ # Write the pre-install hook safe_write_file( pre_hook, pre_hook_content, dry_run=dry_run, simulate=simulate, description="Creating pre-install hook" ) # Write the post-install hook safe_write_file( post_hook, post_hook_content, dry_run=dry_run, simulate=simulate, description="Creating post-install hook" ) # Make hooks executable if not (dry_run or simulate): try: os.chmod(pre_hook, 0o755) os.chmod(post_hook, 0o755) except Exception as e: print(f"Error setting permissions on hooks: {str(e)}") return False print("SlackPkg hooks installed successfully.") print("The hooks will create snapshots before package operations and clean up old snapshots afterward.") print("To disable automatic snapshots, edit /etc/slackpkg/hooks/pre-install.sh and set ENABLE_AUTO_SNAPSHOTS=0") print("To disable automatic cleanup, edit /etc/slackpkg/hooks/post-install.sh and set AUTO_CLEANUP=0") return True if __name__ == "__main__": main()