Nexus 安装使用

Nexus 安装使用

1.使用 Docker Compose 安装 Nexus

# Docker Compose definition that runs a single Nexus Repository 3 container.
version: "3.8"

services:
  nexus:
    image: sonatype/nexus3:latest
    container_name: nexus
    restart: always
    ports:
      # Publish container port 8081 on host port 8081; the Maven URLs later in
      # this guide (http://localhost:8081/...) rely on this mapping.
      - "8081:8081"
    volumes:
      # Named volume mounted at /nexus-data; the initial admin.password file
      # read in step 2 lives under this path.
      - nexus-data:/nexus-data
    environment:
      # JVM options: 512 MB initial/max heap and 512 MB max direct memory.
      - INSTALL4J_ADD_VM_PARAMS=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m

# Declares the named volume referenced by the nexus service above.
volumes:
  nexus-data:

2. 从容器中获取 Nexus 初始密码

Nexus 首次启动会生成一个随机的管理员密码,存放在容器内部。

docker exec nexus cat /nexus-data/admin.password

初始账号 admin

3. 配置阿里云仓库

maven-aliyun-center

https://maven.aliyun.com/repository/central

4. 项目中使用

4.1 pom.xml 中使用

<!-- Resolve project dependencies (releases and snapshots) through the Nexus
     repository "maven-public" on localhost:8081. -->
<repositories>
    <repository>
        <id>nexus</id>
        <name>Nexus Proxy</name>
        <url>http://localhost:8081/repository/maven-public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<!-- Resolve Maven build plugins through the same Nexus repository. -->
<pluginRepositories>
    <pluginRepository>
        <id>nexus</id>
        <url>http://localhost:8081/repository/maven-public/</url>
    </pluginRepository>
</pluginRepositories>

4.2 settings.xml 中使用

<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                              https://maven.apache.org/xsd/settings-1.0.0.xsd">

    <!-- 1. Mirror: route requests for every repository (mirrorOf=*) through Nexus -->
    <mirrors>
        <mirror>
            <id>nexus</id>
            <mirrorOf>*</mirrorOf>
            <name>Nexus Mirror</name>
            <url>http://localhost:8081/repository/maven-public/</url>
        </mirror>
    </mirrors>

    <!-- 2. Credentials (needed when Nexus requires login); the server id is the
            same as the mirror id above so Maven can pair them -->
    <servers>
        <server>
            <id>nexus</id>
            <username>admin</username>
            <password>你的密码</password>
        </server>
    </servers>

    <!-- 3. Profile (optional but recommended): declares a repository and a plugin
            repository with id "central" that point at Nexus, with releases and
            snapshots enabled for dependencies -->
    <profiles>
        <profile>
            <id>nexus</id>
            <repositories>
                <repository>
                    <id>central</id>
                    <url>http://localhost:8081/repository/maven-public/</url>
                    <releases>
                        <enabled>true</enabled>
                    </releases>
                    <snapshots>
                        <enabled>true</enabled>
                    </snapshots>
                </repository>
            </repositories>

            <pluginRepositories>
                <pluginRepository>
                    <id>central</id>
                    <url>http://localhost:8081/repository/maven-public/</url>
                </pluginRepository>
            </pluginRepositories>
        </profile>
    </profiles>

    <!-- 4. Activate the "nexus" profile by default -->
    <activeProfiles>
        <activeProfile>nexus</activeProfile>
    </activeProfiles>

</settings>

5. 使用 Python 将本地仓库中的包导入 Nexus

1. 清除下载失败或已损坏的 jar 包

将 SOURCE_DIR 改成自己的本地 Maven 仓库目录

import os
import concurrent.futures
try:
    from tqdm import tqdm
except ImportError:
    print("❌ 请先安装进度条库: pip install tqdm")
    exit(1)

# ================= Configuration =================
# Root of the local Maven repository to scan; change it to your own directory.
SOURCE_DIR = r"D:\work\MavenRepository\mavenRepository-old"
# A .jar smaller than this many bytes is treated as a failed download.
MAX_JAR_SIZE = 1024  # 1 KB
# If any of these markers appears in the first 1 KB of a .pom, the file is
# treated as an HTML (Harbor login) page saved in place of a real POM.
BAD_POM_KEYWORDS = ["<!DOCTYPE html>", "<title>Harbor</title>", "Login to Harbor"]

# The scan is I/O-bound, so the pool is deliberately oversubscribed
# (4 threads per CPU) to keep the disk busy.
# os.cpu_count() returns None when the count cannot be determined; fall back
# to 1 so the multiplication cannot raise TypeError.
WORKER_COUNT = (os.cpu_count() or 1) * 4
# =================================================

def check_files_in_directory(root, files):
    """
    Worker task: inspect the files of one directory for corrupt artifacts.

    A ``.jar`` counts as corrupt when it is smaller than ``MAX_JAR_SIZE``
    bytes. A ``.pom`` counts as corrupt when its first 1024 characters contain
    any entry of ``BAD_POM_KEYWORDS``. Other files are ignored.

    Args:
        root: Directory that contains ``files``.
        files: File names (not paths) located directly inside ``root``.

    Returns:
        A list of ``(root, base_name)`` tuples, one per corrupt file, where
        ``base_name`` is the file name without its ``.jar``/``.pom`` suffix.
        Files that cannot be read are skipped (best effort).
    """
    bad_items = []

    for file in files:
        file_path = os.path.join(root, file)

        try:
            if file.endswith(".jar"):
                # Cheap check: a truncated/failed download is only a few bytes.
                is_bad = os.path.getsize(file_path) < MAX_JAR_SIZE
            elif file.endswith(".pom"):
                # Only the first 1 KB is needed to recognise an HTML page.
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    head = f.read(1024)
                is_bad = any(keyword in head for keyword in BAD_POM_KEYWORDS)
            else:
                continue
        except OSError:
            # Unreadable or vanished file: skip it, but (unlike a bare
            # ``except``) let KeyboardInterrupt/SystemExit propagate.
            continue

        if is_bad:
            base_name = file
            if base_name.endswith(".jar"):
                base_name = base_name[:-4]
            if base_name.endswith(".pom"):
                base_name = base_name[:-4]
            bad_items.append((root, base_name))

    return bad_items

def delete_task(args):
    """
    Thread task: delete the files that belong to one corrupt artifact.

    Args:
        args: A ``(folder, base_name, metadata_files)`` tuple. Every file in
            ``folder`` whose name starts with ``base_name`` or is listed in
            ``metadata_files`` is removed.

    Returns:
        The number of files actually removed. ``0`` when ``folder`` does not
        exist or cannot be listed.
    """
    folder, base_name, metadata_files = args

    try:
        files = os.listdir(folder)
    except OSError:
        # Missing or unreadable folder (it may already have been cleaned).
        return 0

    deleted = 0
    for file in files:
        if not (file.startswith(base_name) or file in metadata_files):
            continue
        try:
            os.remove(os.path.join(folder, file))
        except OSError:
            # Best effort: the entry may be a directory, locked, or already
            # removed by another task working on the same folder.
            continue
        deleted += 1
    return deleted

def main():
    """
    Scan ``SOURCE_DIR`` for corrupt artifacts and delete them after confirmation.

    The scan submits one ``check_files_in_directory`` task per directory to a
    thread pool. After printing a sample of the findings, the user is asked to
    confirm, and the matching files are removed in parallel by ``delete_task``.
    """
    if not os.path.exists(SOURCE_DIR):
        print(f"❌ 目录不存在: {SOURCE_DIR}")
        return

    print("🚀 [i5-12600KF] 全力模式启动")
    print(f"🔥 启用线程数: {WORKER_COUNT}")
    print(f"📂 扫描目标: {SOURCE_DIR}")

    # Set of (folder, base_name) pairs identifying corrupt artifacts.
    directories_to_clean = set()

    # ------------------ Scan phase ------------------
    with concurrent.futures.ThreadPoolExecutor(max_workers=WORKER_COUNT) as executor:
        futures = []

        print("⏳ 正在遍历目录结构并分发任务...")

        # Producer: the main thread walks the tree and hands out work.
        for root, dirs, files in os.walk(SOURCE_DIR):
            # Prune hidden directories in place so os.walk does not descend.
            dirs[:] = [d for d in dirs if not d.startswith('.')]

            if not files:
                continue

            # One task per directory rather than per file keeps the
            # submission overhead low.
            futures.append(executor.submit(check_files_in_directory, root, files))

        print(f"✅ 目录遍历完成,共生成 {len(futures)} 个批处理任务,开始并行扫描...")

        with tqdm(total=len(futures), unit="dir", desc="并行扫描中") as pbar:
            for future in concurrent.futures.as_completed(futures):
                try:
                    directories_to_clean.update(future.result())
                except Exception as e:
                    # Report the failure instead of hiding it; one broken
                    # directory must not abort the whole scan.
                    pbar.write(f"⚠️ 扫描任务失败: {e}")
                pbar.update(1)

    # ------------------ Report ------------------
    if not directories_to_clean:
        print("\n✅ 恭喜!没有发现损坏文件。")
        return

    print(f"\n⚠️ 发现 {len(directories_to_clean)} 组损坏的构件版本。")
    print("示例前3项:")
    for folder, name in list(directories_to_clean)[:3]:
        print(f" - {os.path.join(folder, name)}")

    confirm = input("\n🔴 确认删除?(y/n): ")
    if confirm.strip().lower() != 'y':
        return

    # ------------------ Delete phase (also multi-threaded) ------------------
    print("\n🧹 启动多线程清理...")
    metadata_files = ["_remote.repositories", "_maven.repositories", "resolver-status.properties"]

    delete_tasks = [(folder, base_name, metadata_files) for folder, base_name in directories_to_clean]

    with concurrent.futures.ThreadPoolExecutor(max_workers=WORKER_COUNT) as executor:
        # executor.map yields results in submission order, which lets tqdm
        # advance once per finished task.
        results = list(tqdm(executor.map(delete_task, delete_tasks), total=len(delete_tasks), desc="并行删除中"))
        total_deleted = sum(results)

    print(f"\n✨ 大功告成!利用多线程共删除了 {total_deleted} 个文件。")


if __name__ == "__main__":
    main()

2. 通过http方式上传到nexus

REPO_URL:Nexus 访问地址 + 仓库路径,例如 http://localhost:8081/repository/maven-releases/(末尾需保留 /)

USERNAME :nexus用户名

PASSWORD :nexus密码

SOURCE_DIR:本地maven仓库路径

Nexus 的 release 仓库默认不允许覆盖上传(重复部署),需要将仓库的 Deployment policy 改为 Allow redeploy 才允许覆盖上传。

import os
import threading
from pathlib import Path
from xml.etree import ElementTree as ET
import requests
from requests.auth import HTTPBasicAuth
from tqdm import tqdm
from queue import Queue

# ================= CONFIGURATION =================
# Base URL of the target hosted repository. Keep the trailing "/": the
# artifact path is appended to this URL.
# Port 8081 matches the port published by the Docker Compose file and used by
# every other URL in this guide (the previous value, 8071, did not).
REPO_URL = "http://127.0.0.1:8081/repository/maven-releases/"
# Nexus account used for the uploads; replace PASSWORD with your real one.
USERNAME = "admin"
PASSWORD = "admin"
# Root of the local Maven repository whose artifacts are uploaded.
SOURCE_DIR = r"D:\work\MavenRepository\mavenRepository"
# Number of concurrent upload worker threads.
THREAD_COUNT = 10
# =================================================

class MavenUploader:
    """Upload artifacts of a local Maven repository to Nexus via HTTP PUT.

    One instance owns a single authenticated ``requests.Session`` that is
    reused for every upload.
    """

    def __init__(self, repo_url, username, password):
        """Create the uploader.

        Args:
            repo_url: Base URL of the target repository, with or without a
                trailing slash.
            username: Nexus user name.
            password: Nexus password.
        """
        # Artifact paths are appended directly to this URL, so guarantee
        # exactly one trailing slash.
        self.repo_url = repo_url.rstrip("/") + "/"
        self.auth = HTTPBasicAuth(username, password)
        self.session = requests.Session()
        self.session.auth = self.auth

    def parse_pom(self, pom_path):
        """Read the GAV coordinates and packaging from a POM file.

        ``groupId`` and ``version`` fall back to the values of ``<parent>``
        when the project does not declare them. Values are returned exactly
        as written in the file, so they may still contain ``${...}``
        placeholders.

        Args:
            pom_path: Path of the ``.pom`` file.

        Returns:
            ``(group_id, artifact_id, version, packaging)``. ``packaging``
            defaults to ``'jar'``. All four are ``None`` when the file cannot
            be read or parsed.
        """
        try:
            root = ET.parse(pom_path).getroot()
        except (ET.ParseError, OSError, ValueError, LookupError):
            # Unreadable file, malformed XML or unsupported encoding.
            return None, None, None, None

        ns = {'m': 'http://maven.apache.org/POM/4.0.0'}

        def get_text(xpath_with_ns, xpath_without_ns):
            # POMs exist both with and without the Maven namespace; try the
            # namespaced lookup first, then the plain one.
            for elem in (root.find(xpath_with_ns, ns), root.find(xpath_without_ns)):
                if elem is not None and elem.text:
                    return elem.text.strip()
            return None

        group_id = get_text('m:groupId', 'groupId')
        artifact_id = get_text('m:artifactId', 'artifactId')
        version = get_text('m:version', 'version')
        packaging = get_text('m:packaging', 'packaging') or 'jar'

        if not group_id:
            group_id = get_text('m:parent/m:groupId', 'parent/groupId')
        if not version:
            version = get_text('m:parent/m:version', 'parent/version')

        return group_id, artifact_id, version, packaging

    def upload_file(self, file_path, group_id, artifact_id, version, filename):
        """PUT one file to its Maven-layout location in the repository.

        Args:
            file_path: Local file to upload.
            group_id: Maven groupId (dots become path separators).
            artifact_id: Maven artifactId.
            version: Maven version.
            filename: File name to use on the server.

        Returns:
            ``True`` when the server answers 200, 201 or 204. ``False`` on any
            other status or when the file or the connection fails.
        """
        group_path = group_id.replace(".", "/")
        upload_url = f"{self.repo_url}{group_path}/{artifact_id}/{version}/{filename}"

        try:
            # Pass the open file object so the body is streamed, not loaded
            # into memory.
            with open(file_path, 'rb') as f:
                response = self.session.put(upload_url, data=f, timeout=300)
        except (OSError, requests.RequestException):
            return False

        return response.status_code in (200, 201, 204)

    @staticmethod
    def _find_main_artifact(pom_path, packaging):
        """Return the path of the main artifact next to ``pom_path``, or ``None``.

        Tries ``<base>.<packaging>`` first and then ``<base>.jar``, because
        packagings such as ``bundle`` or ``maven-plugin`` are stored as
        ``.jar`` files. The POM itself is never returned, so a
        ``pom``-packaged project does not upload its POM twice.
        """
        base_path = pom_path.rsplit('.', 1)[0]
        for extension in (packaging, 'jar'):
            candidate = f"{base_path}.{extension}"
            if candidate != pom_path and os.path.exists(candidate):
                return candidate
        return None

    def process_artifact(self, pom_path, pbar):
        """Upload one artifact: its POM, main file and optional attachments.

        Args:
            pom_path: Path (str) of the artifact's ``.pom`` file.
            pbar: Object with a ``write(str)`` method (a tqdm bar) used for
                per-artifact log lines.

        Returns:
            ``True`` when nothing failed to upload. ``False`` when the POM is
            unusable or at least one upload failed.
        """
        group_id, artifact_id, version, packaging = self.parse_pom(pom_path)
        coordinates = (group_id, artifact_id, version)

        # Unresolved ${...} placeholders would produce a bogus upload path,
        # so treat them like missing coordinates.
        if not all(coordinates) or any('${' in value for value in coordinates):
            pbar.write(f"⚠ Skip (Bad POM): {pom_path}")
            return False

        uploaded = []
        failed = []

        def push(path, label):
            # Upload under the file's own name and record the outcome.
            ok = self.upload_file(path, group_id, artifact_id, version, os.path.basename(path))
            (uploaded if ok else failed).append(label)

        # 1. The POM is always uploaded.
        push(pom_path, 'pom')

        # 2. Main artifact (JAR/WAR/...), if one is present.
        main_artifact = self._find_main_artifact(pom_path, packaging)
        if main_artifact is not None:
            label = packaging if main_artifact.endswith(f".{packaging}") else 'jar'
            push(main_artifact, label)

        # 3. Optional attached artifacts.
        base_path = pom_path.rsplit('.', 1)[0]
        for suffix in ['-sources.jar', '-javadoc.jar', '-tests.jar']:
            extra_file = base_path + suffix
            if os.path.exists(extra_file):
                push(extra_file, suffix.strip('-'))

        # Per-artifact result lines.
        if uploaded:
            files_str = ', '.join(uploaded)
            pbar.write(f"✓ {group_id}:{artifact_id}:{version} [{files_str}]")

        if failed:
            files_str = ', '.join(failed)
            pbar.write(f"✗ {group_id}:{artifact_id}:{version} FAILED: [{files_str}]")

        return len(failed) == 0

def _record_outcome(stats, key):
    """Add one to ``stats[key]``, atomically when ``stats`` supports it."""
    increment = getattr(stats, "increment", None)
    if increment is not None:
        increment(key)
    else:
        # Plain mapping: keep the original (non-atomic) behaviour.
        stats[key] += 1


def worker(uploader, task_queue, pbar, stats):
    """Worker thread: upload POMs taken from ``task_queue`` until a ``None`` sentinel.

    Args:
        uploader: Object whose ``process_artifact(pom_path, pbar)`` returns a
            truthy value on success.
        task_queue: ``queue.Queue`` of POM paths; ``None`` stops the worker.
        pbar: Progress bar providing ``write`` and ``update``.
        stats: Mapping with ``'success'`` and ``'failed'`` counters.
    """
    while True:
        pom_path = task_queue.get()
        if pom_path is None:
            # Acknowledge the sentinel too, so a later queue.join() cannot hang.
            task_queue.task_done()
            break

        try:
            try:
                success = uploader.process_artifact(pom_path, pbar)
            except Exception as e:
                # Thread boundary: one bad artifact must not kill the worker.
                pbar.write(f"✗ Error: {pom_path} - {str(e)}")
                success = False
            _record_outcome(stats, 'success' if success else 'failed')
        finally:
            pbar.update(1)
            task_queue.task_done()


class ThreadSafeStats(dict):
    """A ``dict`` of counters whose item access is guarded by a lock.

    ``stats[key] += 1`` is NOT atomic even with locked ``__getitem__`` and
    ``__setitem__``: the read and the write take the lock separately, so
    concurrent updates can be lost. Use ``increment`` for counters updated
    from several threads.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._lock = threading.Lock()

    def __setitem__(self, key, value):
        with self._lock:
            super().__setitem__(key, value)

    def __getitem__(self, key):
        with self._lock:
            return super().__getitem__(key)

    def increment(self, key, amount=1):
        """Atomically add ``amount`` to ``key`` (missing keys start at 0).

        Returns:
            The new value stored under ``key``.
        """
        with self._lock:
            # Use the unlocked dict primitives: the lock is not re-entrant.
            value = super().get(key, 0) + amount
            super().__setitem__(key, value)
            return value

def main():
    """Upload every artifact found under ``SOURCE_DIR`` to the Nexus repository.

    Collects the ``.pom`` files, hands them to ``THREAD_COUNT`` worker threads
    through a queue, and prints a summary when all uploads have finished.
    """
    print(f"Scanning directory: {SOURCE_DIR} ...")

    # Collect every POM, then drop paths that contain a temp-file marker.
    excluded_markers = ['maven-metadata', 'resolver-status', '.lastUpdated']
    all_poms = list(Path(SOURCE_DIR).rglob("*.pom"))
    pom_files = []
    for candidate in all_poms:
        candidate_text = str(candidate)
        if any(marker in candidate_text for marker in excluded_markers):
            continue
        pom_files.append(candidate)

    total = len(pom_files)
    print(f"Found {total} POM files (filtered from {len(all_poms)} total)")

    if total == 0:
        print("No valid POM files found!")
        return

    uploader = MavenUploader(REPO_URL, USERNAME, PASSWORD)
    task_queue = Queue()
    stats = ThreadSafeStats({'success': 0, 'failed': 0})
    pbar = tqdm(total=total, desc="Uploading", unit="artifact")

    # Start the daemon workers before any task is queued.
    threads = [
        threading.Thread(
            target=worker,
            args=(uploader, task_queue, pbar, stats),
            daemon=True,
        )
        for _ in range(THREAD_COUNT)
    ]
    for thread in threads:
        thread.start()

    # Enqueue the work and block until every POM has been processed.
    for pom in pom_files:
        task_queue.put(str(pom))
    task_queue.join()

    # One None sentinel per worker tells it to exit; then wait for all of them.
    for _ in threads:
        task_queue.put(None)
    for thread in threads:
        thread.join()

    pbar.close()

    separator = "=" * 50
    print("\n" + separator)
    print("Upload Summary:")
    print(f"  Total Artifacts: {total}")
    print(f"  Success: {stats['success']}")
    print(f"  Failed: {stats['failed']}")
    print(separator)


if __name__ == "__main__":
    main()