Ansible 高级特性 #

动态 Inventory #

什么是动态 Inventory? #

动态 Inventory 通过脚本或插件从外部系统(云平台、CMDB 等)动态获取主机列表。

动态 Inventory 插件 #

yaml
# inventory/aws_ec2.yml
plugin: aws_ec2
regions:
  - us-east-1
  - us-west-2
filters:
  tag:Environment: production
keyed_groups:
  - key: tags.Environment
    prefix: env
  - key: tags.Application
    prefix: app
  - key: instance_type
    prefix: type
compose:
  ansible_host: private_ip_address

Azure 动态 Inventory #

yaml
# inventory/azure_rm.yml
plugin: azure_rm
auth_source: auto
subscription_id: your-subscription-id
include_vm_resource_groups:
  - production-rg
conditional_groups:
  webservers: "'web' in tags.role"
  databases: "'database' in tags.role"

自定义动态 Inventory 脚本 #

python
#!/usr/bin/env python3
# inventory/custom.py

import json
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--list', action='store_true')
    parser.add_argument('--host', type=str)
    args = parser.parse_args()
    
    if args.list:
        inventory = {
            "webservers": {
                "hosts": ["web1", "web2"],
                "vars": {
                    "ansible_user": "admin"
                }
            },
            "databases": {
                "hosts": ["db1", "db2"]
            },
            "_meta": {
                "hostvars": {
                    "web1": {"ansible_host": "192.168.1.100"},
                    "web2": {"ansible_host": "192.168.1.101"},
                    "db1": {"ansible_host": "192.168.1.200"},
                    "db2": {"ansible_host": "192.168.1.201"}
                }
            }
        }
        print(json.dumps(inventory))
    elif args.host:
        print(json.dumps({}))

if __name__ == '__main__':
    main()

自定义模块 #

创建自定义模块 #

python
# library/my_module.py

#!/usr/bin/python

from ansible.module_utils.basic import AnsibleModule

def run_module():
    module_args = dict(
        name=dict(type='str', required=True),
        value=dict(type='str', required=False, default=''),
        state=dict(type='str', default='present', choices=['present', 'absent'])
    )
    
    result = dict(
        changed=False,
        message=''
    )
    
    module = AnsibleModule(
        argument_spec=module_args,
        supports_check_mode=True
    )
    
    if module.check_mode:
        module.exit_json(**result)
    
    name = module.params['name']
    value = module.params['value']
    state = module.params['state']
    
    result['message'] = f"Processed {name} with value {value}"
    result['changed'] = True
    
    module.exit_json(**result)

def main():
    run_module()

if __name__ == '__main__':
    main()

使用自定义模块 #

yaml
- name: Use custom module
  my_module:
    name: myapp
    value: myvalue
    state: present
  register: result

- name: Show result
  debug:
    var: result

回调插件 #

创建回调插件 #

python
# callback_plugins/custom.py

from ansible.plugins.callback import CallbackBase

class CallbackModule(CallbackBase):
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'custom'
    CALLBACK_NEEDS_WHITELIST = True
    
    def v2_runner_on_ok(self, result):
        host = result._host.get_name()
        self._display.display(f"[OK] {host}: Task completed successfully")
    
    def v2_runner_on_failed(self, result, ignore_errors=False):
        host = result._host.get_name()
        self._display.display(f"[FAILED] {host}: Task failed")
    
    def v2_runner_on_unreachable(self, result):
        host = result._host.get_name()
        self._display.display(f"[UNREACHABLE] {host}: Host unreachable")

启用回调插件 #

ini
# ansible.cfg
[defaults]
callback_whitelist = custom, profile_tasks, timer

过滤器插件 #

创建过滤器插件 #

python
# filter_plugins/custom.py

def to_uppercase(value):
    return str(value).upper()

def to_lowercase(value):
    return str(value).lower()

def reverse_string(value):
    return str(value)[::-1]

class FilterModule(object):
    def filters(self):
        return {
            'to_uppercase': to_uppercase,
            'to_lowercase': to_lowercase,
            'reverse_string': reverse_string
        }

使用自定义过滤器 #

yaml
- name: Use custom filter
  debug:
    msg: "{{ my_string | to_uppercase }}"

连接插件 #

自定义连接插件 #

python
# connection_plugins/custom.py

from ansible.plugins.connection import ConnectionBase
from ansible.errors import AnsibleConnectionFailure

class Connection(ConnectionBase):
    transport = 'custom'
    
    def __init__(self, *args, **kwargs):
        super(Connection, self).__init__(*args, **kwargs)
    
    def _connect(self):
        if not self._connected:
            self._connected = True
        return self
    
    def exec_command(self, cmd, in_data=None, sudoable=True):
        super(Connection, self).exec_command(cmd, in_data, sudoable)
        return 0, '', ''
    
    def put_file(self, in_path, out_path):
        super(Connection, self).put_file(in_path, out_path)
    
    def fetch_file(self, in_path, out_path):
        super(Connection, self).fetch_file(in_path, out_path)
    
    def close(self):
        self._connected = False

Ansible Collections #

什么是 Collection? #

Collection 是 Ansible 内容的分发格式,包含模块、插件、角色等。

Collection 结构 #

text
my_namespace/
└── my_collection/
    ├── galaxy.yml
    ├── plugins/
    │   ├── modules/
    │   │   └── my_module.py
    │   ├── callback/
    │   │   └── my_callback.py
    │   └── filter/
    │       └── my_filter.py
    ├── roles/
    │   └── my_role/
    ├── playbooks/
    │   └── site.yml
    └── docs/

galaxy.yml #

yaml
# galaxy.yml
namespace: my_namespace
name: my_collection
version: 1.0.0
authors:
  - Your Name <you@example.com>
description: 学习 Ansible 自动化配置管理,用于运维自动化
license:
  - MIT
tags:
  - infrastructure
  - web
dependencies: {}
repository: https://github.com/myorg/my_collection
documentation: https://docs.example.com
homepage: https://example.com
issues: https://github.com/myorg/my_collection/issues
build_ignore:
  - '*.tar.gz'
  - tests

构建 Collection #

bash
# 构建 Collection
ansible-galaxy collection build

# 安装 Collection
ansible-galaxy collection install my_namespace-my_collection-1.0.0.tar.gz

# 从 Ansible Galaxy 安装
ansible-galaxy collection install my_namespace.my_collection

使用 Collection #

yaml
# 使用 Collection 中的模块
- name: Use collection module
  my_namespace.my_collection.my_module:
    name: myapp
    state: present

# 使用 Collection 中的角色
- hosts: all
  roles:
    - my_namespace.my_collection.my_role

Lookups 插件 #

创建 Lookup 插件 #

python
# lookup_plugins/my_lookup.py

from ansible.plugins.lookup import LookupBase
from ansible.errors import AnsibleLookupError

class LookupModule(LookupBase):
    def run(self, terms, variables=None, **kwargs):
        ret = []
        for term in terms:
            result = f"processed_{term}"
            ret.append(result)
        return ret

使用 Lookup 插件 #

yaml
- name: Use custom lookup
  debug:
    msg: "{{ lookup('my_lookup', 'value1', 'value2') }}"

策略插件 #

自定义策略插件 #

python
# strategy_plugins/custom.py

from ansible.plugins.strategy import StrategyBase

class StrategyModule(StrategyBase):
    def run(self, iterator, play_context):
        self._display.display("Running custom strategy")
        return super(StrategyModule, self).run(iterator, play_context)

使用自定义策略 #

yaml
- hosts: all
  strategy: custom
  tasks:
    - name: Task
      debug:
        msg: "Hello"

Facts 缓存 #

配置 Facts 缓存 #

ini
# ansible.cfg
[defaults]
gathering = smart
fact_caching = redis
fact_caching_timeout = 86400
fact_caching_connection = localhost:6379:0

JSON 文件缓存 #

ini
# ansible.cfg
[defaults]
gathering = smart
fact_caching = jsonfile
fact_caching_timeout = 86400
fact_caching_connection = /tmp/ansible_facts

并行执行 #

使用 Mitogen #

bash
# 安装 Mitogen
pip install mitogen

# 配置 ansible.cfg
[defaults]
strategy_plugins = /path/to/mitogen/ansible_mitogen/plugins/strategy
strategy = mitogen_linear

使用 Throttle #

yaml
- name: Limited parallel execution
  command: /usr/bin/long_task.sh
  throttle: 1    # 同时只允许 1 个主机执行

滚动更新 #

完整滚动更新示例 #

yaml
---
- name: Rolling update deployment
  hosts: webservers
  serial: 1
  become: yes
  max_fail_percentage: 0
  
  pre_tasks:
    - name: Remove from load balancer
      haproxy:
        backend: web_servers
        host: "{{ inventory_hostname }}"
        state: disabled
      delegate_to: loadbalancer
  
  tasks:
    - name: Update application
      git:
        repo: "{{ app_repo }}"
        dest: "{{ app_dir }}"
        version: "{{ app_version }}"
      notify: Restart App
    
    - name: Run migrations
      command: "{{ app_dir }}/migrate.sh"
      when: run_migrations | default(false)
    
    - name: Flush handlers
      meta: flush_handlers
    
    - name: Wait for application
      wait_for:
        port: "{{ app_port }}"
        timeout: 60
    
    - name: Health check
      uri:
        url: "http://localhost:{{ app_port }}/health"
        status_code: 200
      register: health
      retries: 5
      delay: 5
      until: health.status == 200
  
  post_tasks:
    - name: Add back to load balancer
      haproxy:
        backend: web_servers
        host: "{{ inventory_hostname }}"
        state: enabled
      delegate_to: loadbalancer
  
  handlers:
    - name: Restart App
      service:
        name: "{{ app_name }}"
        state: restarted

调试技巧 #

使用 ansible-playbook-debug #

bash
# 使用调试模式
ANSIBLE_DEBUG=True ansible-playbook site.yml

# 使用详细模式
ansible-playbook -vvvv site.yml

使用 debug 模块 #

yaml
- name: Debug all variables
  debug:
    var: hostvars[inventory_hostname]

- name: Debug specific variable
  debug:
    msg: "Value is {{ my_var }}"

使用 assert 模块 #

yaml
- name: Verify conditions
  assert:
    that:
      - my_var is defined
      - my_var | length > 0
      - my_var is string
    fail_msg: "Variable validation failed"
    success_msg: "Variable validation passed"

性能调优 #

SSH 优化 #

ini
# ansible.cfg
[ssh_connection]
ssh_args = -o ControlMaster=auto -o ControlPersist=60s -o Compression=yes -o CompressionLevel=9
pipelining = True
scp_if_ssh = smart
transfer_method = smart

并发优化 #

ini
# ansible.cfg
[defaults]
forks = 50
poll_interval = 15
internal_poll_interval = 0.001

优化 Facts 收集 #

yaml
- hosts: all
  gather_facts: yes
  gather_subset:
    - network
    - virtual
  gather_timeout: 20

总结 #

Ansible 的高级特性提供了强大的扩展能力:

  1. 动态 Inventory - 从外部系统动态获取主机
  2. 自定义模块 - 扩展 Ansible 功能
  3. 回调插件 - 自定义执行结果处理
  4. 过滤器插件 - 扩展 Jinja2 过滤器
  5. Collections - 模块化分发 Ansible 内容
  6. Facts 缓存 - 提升执行效率
  7. 滚动更新 - 安全的部署策略

掌握这些高级特性,可以让你更好地定制和扩展 Ansible,满足各种复杂的自动化需求。

最后更新:2026-03-29