AWS CLI に追加された aws login コマンドを読み解いて最小機能を抜き出す

2025.11.19 の AWS CLI のアップデートで aws login というコマンドが追加された。

ブラウザでログインしている AWS マネジメントコンソールのセッションから CLI の一時クレデンシャルを直接取得する機能を提供する。 その他、認証情報をキャッシュしたり、profile 切り替えをサポートしたり、リモート環境向けの拡張などが含まれている。

正直、過剰な機能なども存在しており、単に AWS マネジメントコンソールのセッションを CLI で使いたいだけの用途には過剰に感じられた。 aws login コマンドの中身は実装も公開されているため、振る舞いを調査し、必要最小限の機能だけを抜き出して使えるようにしてみた。

この記事では、その調査内容と、サンプルコードを紹介する。

aws login コマンドの提供機能について

aws login は単に「AWS マネジメントコンソールのセッションを借りて一時クレデンシャルを取ってくる」以上のことをしていて、CLI として便利に使うための工夫が色々入っている。

1. 認証情報の自動キャッシュとリフレッシュ

aws login は取得した一時クレデンシャルを ~/.aws/login/cache/ 以下に保存し、CLI や SDK から利用できるようにしている(AWS_LOGIN_CACHE_DIRECTORY で変更可能)。 AWS マネジメントコンソールのセッション最大時間(12 時間)までは、これを読み出す CLI や SDK が勝手に更新し続けるため、一度ログインすれば長時間作業が続けられる。 保存されるキャッシュの中に refresh token も含まれており、更新にはこれを使っているためブラウザが不意に起動するとかはない。

2. profile 管理のサポート

aws login --profile xxx を一度実行すると、~/.aws/config へセッションを識別する情報が書き込まれる(未指定は default profile)。

[profile xxx]
login_session = arn:aws:iam::<account>:role/<role-name>

これによって、その後はその profile でのクレデンシャルは aws login が管理するキャッシュから自動的に取得されるようになる。

3. リモート環境でセッションを利用するためのの --remote モード

aws login には --remote というオプションがあり、ローカル環境で動作を完結できない場合に利用する。

通常のフローでは、CLI がローカルに HTTP サーバを立て、redirect_urihttp://127.0.0.1:<port> にする。しかし、リモート環境ではこれは使えない。 そこで --remote を付けると、次のようなフローに切り替わる。

  1. リモート環境の CLI が authorization endpoint の URL を生成し標準出力にそのまま表示
  2. ユーザが手元のブラウザ(ローカル環境)でその URL を開く
  3. 認証が完了すると、ブラウザ側に認可コードが表示される
  4. その認可コードをリモート環境の CLI にコピペする
  5. リモート環境のCLI が token endpoint を叩いて一時クレデンシャルを取得する

つまり、認証はユーザのローカル環境で行い、トークン交換だけをリモート環境で実行する、という構造になっている。 ブラウザを開けない EC2 やコンテナ環境でも、通常のアカウントで安全にログインできるようにするための機能で、SSO や長期アクセスキーの代替としても十分実用的。

これらの仕組みのおかげで、aws login は「毎回ブラウザを開いてログインし直す」ような手間がなく、通常の長期アクセスキーよりも安全で、かつ手軽に扱える。 今回の自作 CLI はこれらの付加機能を省略して最小限にしたが、本家の利便性はこのあたりの作り込みに支えられている。

最小構成の実現

最小構成で実現するために必要なことは以下の通り。要は単なる OAuth 2.0 クライアントの実装である。

  1. PKCEcode_verifiercode_challenge を生成
  2. ローカル HTTP サーバを redirect_uri として起動
  3. ブラウザで authorization endpoint を開く
  4. AWS マネジメントコンソールのアクティブセッションを選択
  5. authorization code がローカルの redirect_uri に返ってくる
  6. token endpoint に DPoP ヘッダを付けてリクエスト
  7. レスポンスに AWS API 用の一時クレデンシャルが返ってくる

AWS CLI にあるようなキャッシュやプロファイル管理、--remote モードなどの付加機能は省略している。 ざっとこれだけで AWS マネジメントコンソールのセッションを使った一時クレデンシャルの取得が可能になる。

authorization endpoint と token endpoint

aws login コマンドが利用しているエンドポイントは次の2つ。

  • authorization endpoint: https://<region>.signin.aws.amazon.com/v1/authorize
  • token endpoint: https://<region>.signin.aws.amazon.com/v1/token

AWS CLI の実装(python)1を見ると signin:CreateOAuth2Token を叩いていたのだが、無理に使う必要はない。

リモート環境向けのパラメータ

authorization endpoint へのリクエスト時に利用する client_id は AWS 側で固定で用意されている。

  • ローカル環境で完結する場合: arn:aws:signin:::devtools/same-device
  • リモート環境で利用する場合: arn:aws:signin:::devtools/cross-device

ローカル環境でブラウザをが開ける場合は redirect_uri は localhost に向けられるが、 リモート環境の場合は Authorizarion code をコピーする UI を提供する以下の URL が指定される

  • https://<region>.signin.aws.amazon.com/v1/sessions/confirmation

PKCE や DPoP の実装

PKCE や DPoP は OAuth 2.0 の拡張仕様であり、それぞれで Authorizarion code の盗聴や Token の不正発行を防止する仕組みを提供している。 試していないが恐らく外せない仕組みであり、かつ外さないほうが良いので、実装に含める必要がある。

サンプルコード

以下に Ruby の標準ライブラリで実装したサンプルコードを示す。 AWS マネジメントコンソールのセッションを利用して一時クレデンシャルを取得し、後続のコマンドへ環境変数を付与して実行する。 aws login コマンドの全機能は不要で、単に一時クレデンシャルを取得したいだけの用途に使える。

#!/usr/bin/env ruby
# frozen_string_literal: true
 
require 'openssl'
require 'net/http'
require 'json'
require 'base64'
require 'securerandom'
require 'cgi'
require 'socket'
require 'uri'
 
class AWSLoginCLI
  SAME_DEVICE_CLIENT_ID = 'arn:aws:signin:::devtools/same-device'
  DEFAULT_REGION = 'ap-northeast-1'
 
  def initialize(command: nil)
    @command = command
    @region = ENV['AWS_REGION'] || ENV['AWS_DEFAULT_REGION'] || DEFAULT_REGION
    @base_uri = "https://#{@region}.signin.aws.amazon.com"
 
    # Generate DPoP key pair (P-256)
    @private_key = OpenSSL::PKey::EC.generate('prime256v1')
  end
 
  def run
    puts "=> Authenticating with AWS Console Credentials (region: #{@region})"
 
    state = SecureRandom.uuid
    code_verifier = generate_code_verifier
    code_challenge = generate_code_challenge(code_verifier:)
 
    auth_code, received_state, redirect_uri = start_authentication(state:, code_challenge:)
    abort 'Error: State mismatch! Possible CSRF attack.' unless state == received_state
 
    puts ' * Exchanging authorization code for credentials...'
    token = exchange_code_for_token(auth_code:, code_verifier:, redirect_uri:)
 
    puts " * Credentials obtained for account: #{token['accessToken']['accountId']}"
    puts " * Expires at: #{token['accessToken']['expiresAt']}"
 
    execute_with_credentials(credentials: token['accessToken'])
  end
 
  private
 
  def generate_code_verifier
    chars = [*'A'..'Z', *'a'..'z', *'0'..'9', '-', '.', '_', '~']
    Array.new(64) { chars.sample }.join
  end
 
  def generate_code_challenge(code_verifier:)
    digest = Digest::SHA256.digest(code_verifier)
    Base64.urlsafe_encode64(digest, padding: false)
  end
 
  def build_authorization_url(redirect_uri:, state:, code_challenge:)
    params = {
      response_type: 'code',
      client_id: SAME_DEVICE_CLIENT_ID,
      state: state,
      code_challenge_method: 'SHA-256',
      scope: 'openid',
      code_challenge: code_challenge,
      redirect_uri: redirect_uri
    }
 
    query = params.map { |k, v| "#{k}=#{CGI.escape(v.to_s)}" }.join('&')
    return "#{@base_uri}/v1/authorize?#{query}"
  end
 
  def start_authentication(state:, code_challenge:)
    server = TCPServer.new('127.0.0.1', 0)
    port = server.addr[1]
    redirect_uri = "http://127.0.0.1:#{port}/oauth/callback"
 
    auth_url = build_authorization_url(
      redirect_uri: redirect_uri,
      state: state,
      code_challenge: code_challenge,
    )
 
    puts ' * Opening browser for authentication...'
    puts "   #{auth_url}"
    system("open '#{auth_url}'") || system("xdg-open '#{auth_url}'")
 
    puts ' * Waiting for authentication callback...'
    auth_code, received_state = wait_for_callback(server:)
 
    return auth_code, received_state, redirect_uri
  ensure
    server&.close
  end
 
  def wait_for_callback(server:, timeout: 600)
    deadline = Time.now + timeout
    auth_code = nil
    received_state = nil
 
    loop do
      if Time.now > deadline
        abort 'Error: Authentication timeout. Please try again.'
      end
 
      if IO.select([server], nil, nil, 10)
        client = server.accept
        request_line = client.gets
 
        # Skip headers
        while (line = client.gets) && line.strip != ''; end
 
        if request_line =~ /^GET\s+(.+)\s+HTTP/
          path = $1
          uri = URI.parse("http://localhost#{path}")
          params = CGI.parse(uri.query || '')
 
          if params['code'] && params['state']
            auth_code = params['code'].first
            received_state = params['state'].first
 
            # Send success response
            response_body = <<~HTML
              <!DOCTYPE html>
              <html>
              <head>
                <title>AWS Login - Success</title>
                <style>
                  body { font-family: system-ui, -apple-system, sans-serif; text-align: center; padding: 50px; }
                  h1 { color: #232f3e; }
                  p { color: #545b64; }
                </style>
              </head>
              <body>
                <h1>Authentication Successful!</h1>
                <p>You can now close this window and return to your terminal.</p>
              </body>
              </html>
            HTML
 
            client.puts "HTTP/1.1 200 OK"
            client.puts "Content-Type: text/html; charset=utf-8"
            client.puts "Content-Length: #{response_body.bytesize}"
            client.puts ""
            client.puts response_body
            client.close
 
            break
          elsif params['error']
            error_message = params['error_description']&.first || params['error'].first
            client.puts "HTTP/1.1 400 Bad Request"
            client.puts ""
            client.close
            abort "Authentication error: #{error_message}"
          end
        end
 
        client.close
      end
    end
 
    return auth_code, received_state
  end
 
  def exchange_code_for_token(auth_code:, code_verifier:, redirect_uri:)
    uri = URI.join(@base_uri, "/v1/token")
 
    request_body = {
      clientId:  SAME_DEVICE_CLIENT_ID,
      grantType: 'authorization_code',
      code: auth_code,
      codeVerifier: code_verifier,
      redirectUri: redirect_uri
    }
 
    http = Net::HTTP.new(uri.host, uri.port)
    http.use_ssl = true
    http.read_timeout = 30
 
    request = Net::HTTP::Post.new(uri.path)
    request['Content-Type'] = 'application/json'
    request['DPoP'] = build_dpop_header(private_key: @private_key, uri: uri.to_s)
    request.body = JSON.generate(request_body)
 
    response = http.request(request)
 
    unless response.is_a?(Net::HTTPSuccess)
      error_body = JSON.parse(response.body) rescue {}
      error_message = error_body['message'] || error_body['error_description'] || error_body['error'] || response.message
      abort "Error: Failed to exchange authorization code (#{response.code})\n#{error_message}"
    end
 
    result = JSON.parse(response.body)
 
    login_session = extract_login_session_from_id_token(id_token: result['idToken'])
    account_id = extract_account_id_from_arn(arn: login_session)
 
    expires_at = Time.now + result['expiresIn']
 
    return {
      'accessToken' => {
        'accessKeyId' => result['accessToken']['accessKeyId'],
        'secretAccessKey' => result['accessToken']['secretAccessKey'],
        'sessionToken' => result['accessToken']['sessionToken'],
        'accountId' => account_id,
        'expiresAt' => expires_at.utc.strftime('%Y-%m-%dT%H:%M:%SZ')
      },
      'tokenType' => result['tokenType'],
      'clientId' => SAME_DEVICE_CLIENT_ID,
      'refreshToken' => result['refreshToken'],
      'idToken' => result['idToken'],
      'dpopKey' => @private_key.to_pem
    }
  end
 
  # Base64 URL encode without padding
  def base64_url_encode_no_padding(data:)
    return Base64.urlsafe_encode64(data, padding: false)
  end
 
  def build_dpop_header(private_key:, uri:)
    public_key = private_key.public_key
    public_key_bn = public_key.to_bn
    public_key_bytes = public_key_bn.to_s(2)
 
    x_bytes = public_key_bytes[1, 32]
    y_bytes = public_key_bytes[33, 32]
 
    jwk = {
      kty: 'EC',
      x: base64_url_encode_no_padding(data: x_bytes),
      y: base64_url_encode_no_padding(data: y_bytes),
      crv: 'P-256'
    }
 
    header = {
      typ: 'dpop+jwt',
      alg: 'ES256',
      jwk: jwk
    }
 
    payload = {
      htm: 'POST',
      htu: uri,
      iat: Time.now.to_i,
      jti: SecureRandom.uuid
    }
 
    header_b64 = base64_url_encode_no_padding(data: JSON.generate(header))
    payload_b64 = base64_url_encode_no_padding(data: JSON.generate(payload))
 
    signing_input = "#{header_b64}.#{payload_b64}"
    der_signature = private_key.sign(OpenSSL::Digest.new('SHA256'), signing_input)
 
    asn1 = OpenSSL::ASN1.decode(der_signature)
    r = asn1.value[0].value.to_s(2)
    s = asn1.value[1].value.to_s(2)
 
    r = r.bytes.drop_while { |b| b == 0 && r.bytesize > 32 }.pack('C*').rjust(32, "\x00")
    s = s.bytes.drop_while { |b| b == 0 && s.bytesize > 32 }.pack('C*').rjust(32, "\x00")
 
    signature_bytes = r + s
    signature_b64 = base64_url_encode_no_padding(data: signature_bytes)
 
    return "#{header_b64}.#{payload_b64}.#{signature_b64}"
  end
 
  def extract_login_session_from_id_token(id_token:)
    parts = id_token.split('.')
    abort 'Error: Invalid JWT token' if parts.length != 3
 
    payload_b64 = parts[1]
    payload_b64 += '=' * (-payload_b64.length % 4)
    payload = JSON.parse(Base64.urlsafe_decode64(payload_b64))
 
    abort 'Error: Invalid JWT token - missing sub claim' unless payload['sub']
 
    return payload['sub']
  end
 
  def extract_account_id_from_arn(arn:)
    parts = arn.split(':')
    abort 'Error: Invalid ARN format' if parts.length < 6
    return parts[4]
  end
 
  def execute_with_credentials(credentials:)
    ENV['AWS_ACCESS_KEY_ID'] = credentials['accessKeyId']
    ENV['AWS_SECRET_ACCESS_KEY'] = credentials['secretAccessKey']
    ENV['AWS_SESSION_TOKEN'] = credentials['sessionToken']
 
    ENV['AWS_DEFAULT_REGION'] ||= DEFAULT_REGION
    ENV['AWS_REGION'] ||= DEFAULT_REGION
 
    if @command
      puts "\n=> Executing command: #{@command.join(' ')}"
      exec(*@command)
    else
      shell = ENV['SHELL'] || 'bash'
      puts "\n=> Starting #{File.basename(shell)} with AWS credentials"
      exec(shell)
    end
  end
end
 
if __FILE__ == $PROGRAM_NAME
  command = ARGV.empty? ? nil : ARGV
 
  begin
    cli = AWSLoginCLI.new(command: command)
    cli.run
  rescue Interrupt
    puts "\n\nInterrupted by user."
    exit 1
  rescue StandardError => e
    puts "\nError: #{e.message}"
    puts e.backtrace.join("\n")
    exit 1
  end
end

まとめ

AWS CLI に追加された aws login コマンドは、AWS マネジメントコンソールのセッションを利用して CLI 用の一時クレデンシャルを取得する便利な機能を提供している。 しかし、その全機能が必要ない場合も多いため、この記事ではその振る舞いを調査し、最小限の機能だけを抜き出したサンプルコードを紹介した。

Footnotes

  1. https://github.com/aws/aws-cli/commit/87e5db245af9e7dae8b2bb48e33eff583a1066f0