2025.11.19 の AWS CLI のアップデートで aws login というコマンドが追加された。
- Simplified developer access to AWS with ‘aws login’ | AWS Security Blog
- Login for AWS local development using console credentials - AWS Command Line Interface
ブラウザでログインしている AWS マネジメントコンソールのセッションから CLI の一時クレデンシャルを直接取得する機能を提供する。 その他、認証情報をキャッシュしたり、profile 切り替えをサポートしたり、リモート環境向けの拡張などが含まれている。
正直、過剰な機能なども存在しており、単に AWS マネジメントコンソールのセッションを CLI で使いたいだけの用途には過剰に感じられた。
aws login コマンドの中身は実装も公開されているため、振る舞いを調査し、必要最小限の機能だけを抜き出して使えるようにしてみた。
この記事では、その調査内容と、サンプルコードを紹介する。
aws login コマンドの提供機能について
aws login は単に「AWS マネジメントコンソールのセッションを借りて一時クレデンシャルを取ってくる」以上のことをしていて、CLI として便利に使うための工夫が色々入っている。
1. 認証情報の自動キャッシュとリフレッシュ
aws login は取得した一時クレデンシャルを ~/.aws/login/cache/ 以下に保存し、CLI や SDK から利用できるようにしている(AWS_LOGIN_CACHE_DIRECTORY で変更可能)。
AWS マネジメントコンソールのセッション最大時間(12 時間)までは、これを読み出す CLI や SDK が勝手に更新し続けるため、一度ログインすれば長時間作業が続けられる。
保存されるキャッシュの中に refresh token も含まれており、更新にはこれを使っているためブラウザが不意に起動するとかはない。
2. profile 管理のサポート
aws login --profile xxx を一度実行すると、~/.aws/config へセッションを識別する情報が書き込まれる(未指定は default profile)。
[profile xxx]
login_session = arn:aws:iam::<account>:role/<role-name>
これによって、その後はその profile でのクレデンシャルは aws login が管理するキャッシュから自動的に取得されるようになる。
3. リモート環境でセッションを利用するためのの --remote モード
aws login には --remote というオプションがあり、ローカル環境で動作を完結できない場合に利用する。
通常のフローでは、CLI がローカルに HTTP サーバを立て、redirect_uri を http://127.0.0.1:<port> にする。しかし、リモート環境ではこれは使えない。
そこで --remote を付けると、次のようなフローに切り替わる。
- リモート環境の CLI が authorization endpoint の URL を生成し標準出力にそのまま表示
- ユーザが手元のブラウザ(ローカル環境)でその URL を開く
- 認証が完了すると、ブラウザ側に認可コードが表示される
- その認可コードをリモート環境の CLI にコピペする
- リモート環境のCLI が token endpoint を叩いて一時クレデンシャルを取得する
つまり、認証はユーザのローカル環境で行い、トークン交換だけをリモート環境で実行する、という構造になっている。 ブラウザを開けない EC2 やコンテナ環境でも、通常のアカウントで安全にログインできるようにするための機能で、SSO や長期アクセスキーの代替としても十分実用的。
これらの仕組みのおかげで、aws login は「毎回ブラウザを開いてログインし直す」ような手間がなく、通常の長期アクセスキーよりも安全で、かつ手軽に扱える。
今回の自作 CLI はこれらの付加機能を省略して最小限にしたが、本家の利便性はこのあたりの作り込みに支えられている。
最小構成の実現
最小構成で実現するために必要なことは以下の通り。要は単なる OAuth 2.0 クライアントの実装である。
- PKCE の
code_verifierとcode_challengeを生成 - ローカル HTTP サーバを
redirect_uriとして起動 - ブラウザで authorization endpoint を開く
- AWS マネジメントコンソールのアクティブセッションを選択
- authorization code がローカルの
redirect_uriに返ってくる - token endpoint に DPoP ヘッダを付けてリクエスト
- レスポンスに AWS API 用の一時クレデンシャルが返ってくる
AWS CLI にあるようなキャッシュやプロファイル管理、--remote モードなどの付加機能は省略している。
ざっとこれだけで AWS マネジメントコンソールのセッションを使った一時クレデンシャルの取得が可能になる。
authorization endpoint と token endpoint
aws login コマンドが利用しているエンドポイントは次の2つ。
- authorization endpoint:
https://<region>.signin.aws.amazon.com/v1/authorize - token endpoint:
https://<region>.signin.aws.amazon.com/v1/token
AWS CLI の実装(python)1を見ると signin:CreateOAuth2Token を叩いていたのだが、無理に使う必要はない。
リモート環境向けのパラメータ
authorization endpoint へのリクエスト時に利用する client_id は AWS 側で固定で用意されている。
- ローカル環境で完結する場合:
arn:aws:signin:::devtools/same-device - リモート環境で利用する場合:
arn:aws:signin:::devtools/cross-device
ローカル環境でブラウザをが開ける場合は redirect_uri は localhost に向けられるが、
リモート環境の場合は Authorizarion code をコピーする UI を提供する以下の URL が指定される
https://<region>.signin.aws.amazon.com/v1/sessions/confirmation
PKCE や DPoP の実装
PKCE や DPoP は OAuth 2.0 の拡張仕様であり、それぞれで Authorizarion code の盗聴や Token の不正発行を防止する仕組みを提供している。 試していないが恐らく外せない仕組みであり、かつ外さないほうが良いので、実装に含める必要がある。
サンプルコード
以下に Ruby の標準ライブラリで実装したサンプルコードを示す。
AWS マネジメントコンソールのセッションを利用して一時クレデンシャルを取得し、後続のコマンドへ環境変数を付与して実行する。
aws login コマンドの全機能は不要で、単に一時クレデンシャルを取得したいだけの用途に使える。
#!/usr/bin/env ruby
# frozen_string_literal: true
require 'openssl'
require 'net/http'
require 'json'
require 'base64'
require 'securerandom'
require 'cgi'
require 'socket'
require 'uri'
class AWSLoginCLI
SAME_DEVICE_CLIENT_ID = 'arn:aws:signin:::devtools/same-device'
DEFAULT_REGION = 'ap-northeast-1'
def initialize(command: nil)
@command = command
@region = ENV['AWS_REGION'] || ENV['AWS_DEFAULT_REGION'] || DEFAULT_REGION
@base_uri = "https://#{@region}.signin.aws.amazon.com"
# Generate DPoP key pair (P-256)
@private_key = OpenSSL::PKey::EC.generate('prime256v1')
end
def run
puts "=> Authenticating with AWS Console Credentials (region: #{@region})"
state = SecureRandom.uuid
code_verifier = generate_code_verifier
code_challenge = generate_code_challenge(code_verifier:)
auth_code, received_state, redirect_uri = start_authentication(state:, code_challenge:)
abort 'Error: State mismatch! Possible CSRF attack.' unless state == received_state
puts ' * Exchanging authorization code for credentials...'
token = exchange_code_for_token(auth_code:, code_verifier:, redirect_uri:)
puts " * Credentials obtained for account: #{token['accessToken']['accountId']}"
puts " * Expires at: #{token['accessToken']['expiresAt']}"
execute_with_credentials(credentials: token['accessToken'])
end
private
def generate_code_verifier
chars = [*'A'..'Z', *'a'..'z', *'0'..'9', '-', '.', '_', '~']
Array.new(64) { chars.sample }.join
end
def generate_code_challenge(code_verifier:)
digest = Digest::SHA256.digest(code_verifier)
Base64.urlsafe_encode64(digest, padding: false)
end
def build_authorization_url(redirect_uri:, state:, code_challenge:)
params = {
response_type: 'code',
client_id: SAME_DEVICE_CLIENT_ID,
state: state,
code_challenge_method: 'SHA-256',
scope: 'openid',
code_challenge: code_challenge,
redirect_uri: redirect_uri
}
query = params.map { |k, v| "#{k}=#{CGI.escape(v.to_s)}" }.join('&')
return "#{@base_uri}/v1/authorize?#{query}"
end
def start_authentication(state:, code_challenge:)
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]
redirect_uri = "http://127.0.0.1:#{port}/oauth/callback"
auth_url = build_authorization_url(
redirect_uri: redirect_uri,
state: state,
code_challenge: code_challenge,
)
puts ' * Opening browser for authentication...'
puts " #{auth_url}"
system("open '#{auth_url}'") || system("xdg-open '#{auth_url}'")
puts ' * Waiting for authentication callback...'
auth_code, received_state = wait_for_callback(server:)
return auth_code, received_state, redirect_uri
ensure
server&.close
end
def wait_for_callback(server:, timeout: 600)
deadline = Time.now + timeout
auth_code = nil
received_state = nil
loop do
if Time.now > deadline
abort 'Error: Authentication timeout. Please try again.'
end
if IO.select([server], nil, nil, 10)
client = server.accept
request_line = client.gets
# Skip headers
while (line = client.gets) && line.strip != ''; end
if request_line =~ /^GET\s+(.+)\s+HTTP/
path = $1
uri = URI.parse("http://localhost#{path}")
params = CGI.parse(uri.query || '')
if params['code'] && params['state']
auth_code = params['code'].first
received_state = params['state'].first
# Send success response
response_body = <<~HTML
<!DOCTYPE html>
<html>
<head>
<title>AWS Login - Success</title>
<style>
body { font-family: system-ui, -apple-system, sans-serif; text-align: center; padding: 50px; }
h1 { color: #232f3e; }
p { color: #545b64; }
</style>
</head>
<body>
<h1>Authentication Successful!</h1>
<p>You can now close this window and return to your terminal.</p>
</body>
</html>
HTML
client.puts "HTTP/1.1 200 OK"
client.puts "Content-Type: text/html; charset=utf-8"
client.puts "Content-Length: #{response_body.bytesize}"
client.puts ""
client.puts response_body
client.close
break
elsif params['error']
error_message = params['error_description']&.first || params['error'].first
client.puts "HTTP/1.1 400 Bad Request"
client.puts ""
client.close
abort "Authentication error: #{error_message}"
end
end
client.close
end
end
return auth_code, received_state
end
def exchange_code_for_token(auth_code:, code_verifier:, redirect_uri:)
uri = URI.join(@base_uri, "/v1/token")
request_body = {
clientId: SAME_DEVICE_CLIENT_ID,
grantType: 'authorization_code',
code: auth_code,
codeVerifier: code_verifier,
redirectUri: redirect_uri
}
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true
http.read_timeout = 30
request = Net::HTTP::Post.new(uri.path)
request['Content-Type'] = 'application/json'
request['DPoP'] = build_dpop_header(private_key: @private_key, uri: uri.to_s)
request.body = JSON.generate(request_body)
response = http.request(request)
unless response.is_a?(Net::HTTPSuccess)
error_body = JSON.parse(response.body) rescue {}
error_message = error_body['message'] || error_body['error_description'] || error_body['error'] || response.message
abort "Error: Failed to exchange authorization code (#{response.code})\n#{error_message}"
end
result = JSON.parse(response.body)
login_session = extract_login_session_from_id_token(id_token: result['idToken'])
account_id = extract_account_id_from_arn(arn: login_session)
expires_at = Time.now + result['expiresIn']
return {
'accessToken' => {
'accessKeyId' => result['accessToken']['accessKeyId'],
'secretAccessKey' => result['accessToken']['secretAccessKey'],
'sessionToken' => result['accessToken']['sessionToken'],
'accountId' => account_id,
'expiresAt' => expires_at.utc.strftime('%Y-%m-%dT%H:%M:%SZ')
},
'tokenType' => result['tokenType'],
'clientId' => SAME_DEVICE_CLIENT_ID,
'refreshToken' => result['refreshToken'],
'idToken' => result['idToken'],
'dpopKey' => @private_key.to_pem
}
end
# Base64 URL encode without padding
def base64_url_encode_no_padding(data:)
return Base64.urlsafe_encode64(data, padding: false)
end
def build_dpop_header(private_key:, uri:)
public_key = private_key.public_key
public_key_bn = public_key.to_bn
public_key_bytes = public_key_bn.to_s(2)
x_bytes = public_key_bytes[1, 32]
y_bytes = public_key_bytes[33, 32]
jwk = {
kty: 'EC',
x: base64_url_encode_no_padding(data: x_bytes),
y: base64_url_encode_no_padding(data: y_bytes),
crv: 'P-256'
}
header = {
typ: 'dpop+jwt',
alg: 'ES256',
jwk: jwk
}
payload = {
htm: 'POST',
htu: uri,
iat: Time.now.to_i,
jti: SecureRandom.uuid
}
header_b64 = base64_url_encode_no_padding(data: JSON.generate(header))
payload_b64 = base64_url_encode_no_padding(data: JSON.generate(payload))
signing_input = "#{header_b64}.#{payload_b64}"
der_signature = private_key.sign(OpenSSL::Digest.new('SHA256'), signing_input)
asn1 = OpenSSL::ASN1.decode(der_signature)
r = asn1.value[0].value.to_s(2)
s = asn1.value[1].value.to_s(2)
r = r.bytes.drop_while { |b| b == 0 && r.bytesize > 32 }.pack('C*').rjust(32, "\x00")
s = s.bytes.drop_while { |b| b == 0 && s.bytesize > 32 }.pack('C*').rjust(32, "\x00")
signature_bytes = r + s
signature_b64 = base64_url_encode_no_padding(data: signature_bytes)
return "#{header_b64}.#{payload_b64}.#{signature_b64}"
end
def extract_login_session_from_id_token(id_token:)
parts = id_token.split('.')
abort 'Error: Invalid JWT token' if parts.length != 3
payload_b64 = parts[1]
payload_b64 += '=' * (-payload_b64.length % 4)
payload = JSON.parse(Base64.urlsafe_decode64(payload_b64))
abort 'Error: Invalid JWT token - missing sub claim' unless payload['sub']
return payload['sub']
end
def extract_account_id_from_arn(arn:)
parts = arn.split(':')
abort 'Error: Invalid ARN format' if parts.length < 6
return parts[4]
end
def execute_with_credentials(credentials:)
ENV['AWS_ACCESS_KEY_ID'] = credentials['accessKeyId']
ENV['AWS_SECRET_ACCESS_KEY'] = credentials['secretAccessKey']
ENV['AWS_SESSION_TOKEN'] = credentials['sessionToken']
ENV['AWS_DEFAULT_REGION'] ||= DEFAULT_REGION
ENV['AWS_REGION'] ||= DEFAULT_REGION
if @command
puts "\n=> Executing command: #{@command.join(' ')}"
exec(*@command)
else
shell = ENV['SHELL'] || 'bash'
puts "\n=> Starting #{File.basename(shell)} with AWS credentials"
exec(shell)
end
end
end
if __FILE__ == $PROGRAM_NAME
command = ARGV.empty? ? nil : ARGV
begin
cli = AWSLoginCLI.new(command: command)
cli.run
rescue Interrupt
puts "\n\nInterrupted by user."
exit 1
rescue StandardError => e
puts "\nError: #{e.message}"
puts e.backtrace.join("\n")
exit 1
end
endまとめ
AWS CLI に追加された aws login コマンドは、AWS マネジメントコンソールのセッションを利用して CLI 用の一時クレデンシャルを取得する便利な機能を提供している。
しかし、その全機能が必要ない場合も多いため、この記事ではその振る舞いを調査し、最小限の機能だけを抜き出したサンプルコードを紹介した。