MCP协议深度解析:Transport层实现全攻略

前言

在之前的文章中,我们介绍了MCP的整体架构。今天我们来深入MCP的核心——**传输层(Transport Layer)**实现。

MCP定义了两种标准传输机制:

  1. stdio:本地进程通信
  2. Streamable HTTP:远程服务器通信(替代了旧版HTTP+SSE)

本文将从协议规范到完整代码,手把手带你实现MCP传输层。

MCP协议基础

JSON-RPC 2.0消息格式

MCP基于JSON-RPC 2.0,所有消息都是UTF-8编码的JSON:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Request
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": { ... }
}

// Response
{
"jsonrpc": "2.0",
"id": 1,
"result": { ... }
}

// Error Response
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32600,
"message": "Invalid Request"
}
}

// Notification (no id, no response)
{
"jsonrpc": "2.0",
"method": "notifications/initialized"
}

消息类型

类型 说明 是否需要响应
Request 请求,包含method和params ✅ 是
Response 响应,包含result或error ❌ 否
Notification 通知,没有id ❌ 否

传输层一:stdio实现

架构原理

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────────────────────┐
│ Client Process │
│ │
│ ┌────────┐ stdin ┌────────────────┐ │
│ │ MCP │ ──────────▶ │ Server Process │ │
│ │ Client │ ◀────────── │ (Subprocess) │ │
│ └────────┘ stdout └────────────────┘ │
│ │ │
│ ▼ │
│ stderr (logs) │
└─────────────────────────────────────────────────┘

核心规则

  1. 消息分隔:使用换行符\n分隔
  2. 禁止内嵌换行:消息内部不能有换行
  3. stdout纯净:服务器只能输出有效的MCP消息
  4. stderr日志:日志输出到stderr,客户端可捕获或忽略

Golang完整实现

Server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package main

import (
"bufio"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
)

// JSON-RPC 消息结构
type Request struct {
JSONRPC string `json:"jsonrpc"`
ID *int64 `json:"id,omitempty"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
}

type Response struct {
JSONRPC string `json:"jsonrpc"`
ID *int64 `json:"id,omitempty"`
Result interface{} `json:"result,omitempty"`
Error *ErrorInfo `json:"error,omitempty"`
}

type ErrorInfo struct {
Code int `json:"code"`
Message string `json:"message"`
}

type InitializeParams struct {
ProtocolVersion string `json:"protocolVersion"`
Capabilities map[string]any `json:"capabilities"`
ClientInfo ClientInfo `json:"clientInfo"`
}

type ClientInfo struct {
Name string `json:"name"`
Version string `json:"version"`
}

type ServerInfo struct {
Name string `json:"name"`
Version string `json:"version"`
}

// MCP Server
type MCPServer struct {
sessionID string
capabilities map[string]any
mu sync.Mutex
}

func NewMCPServer() *MCPServer {
return &MCPServer{
capabilities: map[string]any{
"tools": map[string]bool{"listChanged": true},
"resources": map[string]any{},
"logging": map[string]any{},
},
}
}

func (s *MCPServer) log(message string) {
fmt.Fprintf(os.Stderr, "[LOG] %s\n", message)
}

func (s *MCPServer) sendMessage(msg Response) error {
data, err := json.Marshal(msg)
if err != nil {
return err
}

// 确保没有内嵌换行
if strings.Contains(string(data), "\n") {
return fmt.Errorf("message contains embedded newlines")
}

fmt.Println(string(data))
s.log(fmt.Sprintf("Sent: %s...", string(data)[:min(100, len(data))]))
return nil
}

func (s *MCPServer) readMessage() (*Request, error) {
reader := bufio.NewReader(os.Stdin)
line, err := reader.ReadString('\n')
if err != nil {
return nil, err
}

line = strings.TrimSpace(line)
if line == "" {
return nil, nil
}

var req Request
if err := json.Unmarshal([]byte(line), &req); err != nil {
s.log(fmt.Sprintf("Invalid JSON: %v", err))
return nil, err
}

return &req, nil
}

func (s *MCPServer) handleRequest(req *Request) *Response {
s.log(fmt.Sprintf("Handling method: %s", req.Method))

switch req.Method {
case "initialize":
return s.handleInitialize(req)
case "tools/list":
return s.handleToolsList(req)
default:
return &Response{
JSONRPC: "2.0",
ID: req.ID,
Error: &ErrorInfo{
Code: -32601,
Message: fmt.Sprintf("Method not found: %s", req.Method),
},
}
}
}

func (s *MCPServer) handleInitialize(req *Request) *Response {
var params InitializeParams
if err := json.Unmarshal(req.Params, &params); err != nil {
return &Response{
JSONRPC: "2.0",
ID: req.ID,
Error: &ErrorInfo{
Code: -32700,
Message: "Invalid params",
},
}
}

s.log(fmt.Sprintf("Client: %s v%s", params.ClientInfo.Name, params.ClientInfo.Version))
s.log(fmt.Sprintf("Protocol version: %s", params.ProtocolVersion))

// 生成 session ID
s.sessionID = fmt.Sprintf("session-%d", os.Getpid())

return &Response{
JSONRPC: "2.0",
ID: req.ID,
Result: map[string]any{
"protocolVersion": "2025-06-18",
"capabilities": s.capabilities,
"serverInfo": ServerInfo{
Name: "example-server",
Version: "1.0.0",
},
},
}
}

func (s *MCPServer) handleToolsList(req *Request) *Response {
return &Response{
JSONRPC: "2.0",
ID: req.ID,
Result: map[string]any{
"tools": []map[string]any{
{
"name": "echo",
"description": "Echo back the input message",
"inputSchema": map[string]any{
"type": "object",
"properties": map[string]any{
"message": map[string]any{
"type": "string",
"description": "Message to echo",
},
},
"required": []string{"message"},
},
},
},
},
}
}

func (s *MCPServer) Run() {
s.log("MCP Server starting...")

for {
req, err := s.readMessage()
if err != nil {
s.log(fmt.Sprintf("Client disconnected: %v", err))
break
}

if req == nil {
s.log("Client disconnected (EOF)")
break
}

resp := s.handleRequest(req)
if resp != nil {
if err := s.sendMessage(*resp); err != nil {
s.log(fmt.Sprintf("Error sending response: %v", err))
}
}
}

s.log("MCP Server stopped")
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

func main() {
server := NewMCPServer()
server.Run()
}

Client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package main

import (
"bufio"
"encoding/json"
"fmt"
"os"
"os/exec"
"sync"
"time"
)

type Request struct {
JSONRPC string `json:"jsonrpc"`
ID int64 `json:"id"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
}

type Response struct {
JSONRPC string `json:"jsonrpc"`
ID int64 `json:"id"`
Result json.RawMessage `json:"result,omitempty"`
Error *ErrorInfo `json:"error,omitempty"`
}

type ErrorInfo struct {
Code int `json:"code"`
Message string `json:"message"`
}

type MCPStdioClient struct {
cmd *exec.Cmd
stdin *bufio.Writer
stdout *bufio.Reader
stderr *bufio.Reader
messageID int64
pendingReqs map[int64]chan *Response
pendingReqsMu sync.Mutex
responses map[int64]*Response
responsesMu sync.Mutex
}

func NewMCPClient(serverCmd string, args ...string) *MCPStdioClient {
return &MCPStdioClient{
cmd: exec.Command(serverCmd, args...),
pendingReqs: make(map[int64]chan *Response),
responses: make(map[int64]*Response),
}
}

func (c *MCPStdioClient) Connect() error {
stdin, err := c.cmd.StdinPipe()
if err != nil {
return err
}

stdout, err := c.cmd.StdoutPipe()
if err != nil {
return err
}

stderr, err := c.cmd.StderrPipe()
if err != nil {
return err
}

c.stdin = bufio.NewWriter(stdin)
c.stdout = bufio.NewReader(stdout)
c.stderr = bufio.NewReader(stderr)

if err := c.cmd.Start(); err != nil {
return err
}

// 启动读取 goroutine
go c.readLoop()
go c.logLoop()

fmt.Println("Connected to MCP server")
return nil
}

func (c *MCPStdioClient) readLoop() {
for {
line, err := c.stdout.ReadString('\n')
if err != nil {
fmt.Printf("Read error: %v\n", err)
break
}

line = strings.TrimSpace(line)
if line == "" {
continue
}

var msg Response
if err := json.Unmarshal([]byte(line), &msg); err != nil {
fmt.Printf("Invalid JSON from server: %v\n", err)
continue
}

c.handleMessage(&msg)
}
}

func (c *MCPStdioClient) logLoop() {
for {
line, err := c.stderr.ReadString('\n')
if err != nil {
break
}
fmt.Printf("[Server Log] %s", line)
}
}

func (c *MCPStdioClient) handleMessage(msg *Response) {
c.pendingReqsMu.Lock()
if ch, ok := c.pendingReqs[msg.ID]; ok {
ch <- msg
close(ch)
delete(c.pendingReqs, msg.ID)
} else {
// Notification or server-initiated request
fmt.Printf("Received: %+v\n", msg)
}
c.pendingReqsMu.Unlock()
}

func (c *MCPStdioClient) sendMessage(msg Request) error {
data, err := json.Marshal(msg)
if err != nil {
return err
}

if _, err := c.stdin.WriteString(string(data) + "\n"); err != nil {
return err
}

if err := c.stdin.Flush(); err != nil {
return err
}

return nil
}

func (c *MCPStdioClient) Request(method string, params any) (json.RawMessage, error) {
c.messageID++
reqID := c.messageID

var paramsJSON json.RawMessage
if params != nil {
var err error
paramsJSON, err = json.Marshal(params)
if err != nil {
return nil, err
}
}

req := Request{
JSONRPC: "2.0",
ID: reqID,
Method: method,
Params: paramsJSON,
}

// 设置响应通道
ch := make(chan *Response, 1)
c.pendingReqsMu.Lock()
c.pendingReqs[reqID] = ch
c.pendingReqsMu.Unlock()

// 发送请求
fmt.Printf("Sending: %s\n", method)
if err := c.sendMessage(req); err != nil {
return nil, err
}

// 等待响应(30秒超时)
select {
case resp := <-ch:
if resp.Error != nil {
return nil, fmt.Errorf("server error: %s", resp.Error.Message)
}
return resp.Result, nil
case <-time.After(30 * time.Second):
return nil, fmt.Errorf("request %s timed out", method)
}
}

func (c *MCPStdioClient) Notification(method string, params any) error {
var paramsJSON json.RawMessage
if params != nil {
var err error
paramsJSON, err = json.Marshal(params)
if err != nil {
return err
}
}

notif := map[string]any{
"jsonrpc": "2.0",
"method": method,
}
if params != nil {
notif["params"] = params
}

data, _ := json.Marshal(notif)
if _, err := c.stdin.WriteString(string(data) + "\n"); err != nil {
return err
}
return c.stdin.Flush()
}

func (c *MCPStdioClient) Initialize() (json.RawMessage, error) {
result, err := c.Request("initialize", map[string]any{
"protocolVersion": "2025-06-18",
"capabilities": map[string]any{
"roots": map[string]bool{"listChanged": true},
"sampling": map[string]any{},
},
"clientInfo": map[string]string{
"name": "example-client",
"version": "1.0.0",
},
})
if err != nil {
return nil, err
}

// 发送 initialized notification
c.Notification("notifications/initialized", nil)

return result, nil
}

func (c *MCPStdioClient) Close() error {
if c.cmd != nil {
c.cmd.Process.Kill()
c.cmd = nil
fmt.Println("Disconnected from MCP server")
}
return nil
}

func main() {
// 连接本地服务器
client := NewMCPClient("go", "run", "mcp_server.go")
if err := client.Connect(); err != nil {
fmt.Fprintf(os.Stderr, "Connect error: %v\n", err)
os.Exit(1)
}
defer client.Close()

// 初始化
initResult, err := client.Initialize()
if err != nil {
fmt.Fprintf(os.Stderr, "Initialize error: %v\n", err)
os.Exit(1)
}
fmt.Printf("Initialized: %s\n", string(initResult))

// 列出工具
toolsResult, err := client.Request("tools/list", nil)
if err != nil {
fmt.Fprintf(os.Stderr, "Tools list error: %v\n", err)
os.Exit(1)
}
fmt.Printf("Available tools: %s\n", string(toolsResult))

// 调用工具
callResult, err := client.Request("tools/call", map[string]any{
"name": "echo",
"arguments": map[string]string{
"message": "Hello MCP!",
},
})
if err != nil {
fmt.Fprintf(os.Stderr, "Tool call error: %v\n", err)
os.Exit(1)
}
fmt.Printf("Tool result: %s\n", string(callResult))
}

测试运行

1
2
3
4
5
# Terminal 1: Run server directly to see logs
go run mcp_server.go

# Terminal 2: Run client
go run mcp_client.go

传输层二:Streamable HTTP实现

架构原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
┌─────────────────────────────────────────────────┐
│ Client │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ POST /mcp (JSON-RPC requests) │ │
│ │ GET /mcp (SSE stream for responses) │ │
│ └─────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ MCP Endpoint: /mcp │ │
│ │ - POST: Receive requests │ │
│ │ - GET: SSE stream │ │
│ └─────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Server Process │
└─────────────────────────────────────────────────┘

核心特性

  1. POST发送请求:每个JSON-RPC消息一个新的POST请求
  2. SSE流式响应:服务器通过SSE流返回响应
  3. 会话管理:使用Mcp-Session-Id头维护会话
  4. 协议版本头MCP-Protocol-Version

Golang完整实现

Server端(Gin + SSE)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
package main

import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"

"github.com/gin-gonic/gin"
"github.com/google/uuid"
)

// JSON-RPC 消息结构
type Request struct {
JSONRPC string `json:"jsonrpc"`
ID *int64 `json:"id,omitempty"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
}

type Response struct {
JSONRPC string `json:"jsonrpc"`
ID *int64 `json:"id,omitempty"`
Result interface{} `json:"result,omitempty"`
Error *ErrorInfo `json:"error,omitempty"`
}

type ErrorInfo struct {
Code int `json:"code"`
Message string `json:"message"`
}

// Session 管理
type Session struct {
ID string
ProtocolVersion string
CreatedAt time.Time
MessageQueue chan map[string]interface{}
}

type SessionManager struct {
sessions map[string]*Session
mu sync.RWMutex
}

func NewSessionManager() *SessionManager {
return &SessionManager{
sessions: make(map[string]*Session),
}
}

func (sm *SessionManager) CreateSession() *Session {
sessionID := uuid.New().String()
session := &Session{
ID: sessionID,
CreatedAt: time.Now(),
MessageQueue: make(chan map[string]interface{}, 100),
}

sm.mu.Lock()
sm.sessions[sessionID] = session
sm.mu.Unlock()

return session
}

func (sm *SessionManager) GetSession(sessionID string) *Session {
sm.mu.RLock()
defer sm.mu.RUnlock()
return sm.sessions[sessionID]
}

func (sm *SessionManager) DeleteSession(sessionID string) {
sm.mu.Lock()
defer sm.mu.Unlock()

if session, ok := sm.sessions[sessionID]; ok {
close(session.MessageQueue)
delete(sm.sessions, sessionID)
}
}

func (sm *SessionManager) QueueMessage(sessionID string, msg map[string]interface{}) error {
session := sm.GetSession(sessionID)
if session == nil {
return fmt.Errorf("session not found: %s", sessionID)
}

select {
case session.MessageQueue <- msg:
return nil
default:
return fmt.Errorf("message queue full")
}
}

var sessionManager = NewSessionManager()

func handleInitialize(params map[string]interface{}) map[string]interface{} {
protocolVersion, _ := params["protocolVersion"].(string)
clientInfo, _ := params["clientInfo"].(map[string]interface{})

fmt.Printf("Client: %v v%v\n", clientInfo["name"], clientInfo["version"])
fmt.Printf("Protocol version: %s\n", protocolVersion)

return map[string]interface{}{
"protocolVersion": "2025-06-18",
"capabilities": map[string]interface{}{
"tools": map[string]bool{"listChanged": true},
"resources": map[string]interface{}{},
"logging": map[string]interface{}{},
},
"serverInfo": map[string]string{
"name": "example-server",
"version": "1.0.0",
},
}
}

func handleToolsList() map[string]interface{} {
return map[string]interface{}{
"tools": []map[string]interface{}{
{
"name": "echo",
"description": "Echo back the input message",
"inputSchema": map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"message": map[string]interface{}{
"type": "string",
"description": "Message to echo",
},
},
"required": []string{"message"},
},
},
},
}
}

func handleRequest(req Request, sessionID string) *Response {
fmt.Printf("Handling method: %s\n", req.Method)

switch req.Method {
case "initialize":
var params map[string]interface{}
json.Unmarshal(req.Params, &params)

result := handleInitialize(params)
return &Response{
JSONRPC: "2.0",
ID: req.ID,
Result: result,
}

case "tools/list":
result := handleToolsList()
return &Response{
JSONRPC: "2.0",
ID: req.ID,
Result: result,
}

case "notifications/initialized":
// Notification, no response
return nil

default:
return &Response{
JSONRPC: "2.0",
ID: req.ID,
Error: &ErrorInfo{
Code: -32601,
Message: fmt.Sprintf("Method not found: %s", req.Method),
},
}
}
}

func mcpPOST(c *gin.Context) {
sessionID := c.GetHeader("Mcp-Session-Id")
protocolVersion := c.GetHeader("MCP-Protocol-Version")

var req Request
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, Response{
JSONRPC: "2.0",
Error: &ErrorInfo{
Code: -32700,
Message: "Parse error",
},
})
return
}

fmt.Printf("Received POST: %s\n", req.Method)

// Handle initialization
if req.Method == "initialize" {
session := sessionManager.CreateSession()

var params map[string]interface{}
json.Unmarshal(req.Params, &params)
if pv, ok := params["protocolVersion"].(string); ok {
session.ProtocolVersion = pv
}

response := handleRequest(req, session.ID)

c.Header("Mcp-Session-Id", session.ID)
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")

// Send SSE
c.Stream(func(w io.Writer) bool {
data, _ := json.Marshal(response)
fmt.Fprintf(w, "data: %s\n\n", data)
c.Writer.Flush()
return false
})
return
}

// Validate session
if sessionID == "" {
c.JSON(http.StatusBadRequest, Response{
JSONRPC: "2.0",
Error: &ErrorInfo{
Code: -32000,
Message: "Session ID required",
},
})
return
}

session := sessionManager.GetSession(sessionID)
if session == nil {
c.JSON(http.StatusNotFound, Response{
JSONRPC: "2.0",
Error: &ErrorInfo{
Code: -32001,
Message: "Session not found",
},
})
return
}

response := handleRequest(req, sessionID)

// Notifications don't need response
if response == nil {
c.Status(http.StatusAccepted)
return
}

// Send as SSE
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")

c.Stream(func(w io.Writer) bool {
data, _ := json.Marshal(response)
fmt.Fprintf(w, "data: %s\n\n", data)
c.Writer.Flush()
return false
})
}

func mcpGET(c *gin.Context) {
sessionID := c.GetHeader("Mcp-Session-Id")

if sessionID == "" {
c.Status(http.StatusMethodNotAllowed)
return
}

session := sessionManager.GetSession(sessionID)
if session == nil {
c.Status(http.StatusNotFound)
return
}

fmt.Printf("SSE stream opened for session: %s\n", sessionID)

c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")

c.Stream(func(w io.Writer) bool {
select {
case msg, ok := <-session.MessageQueue:
if !ok {
return false
}
data, _ := json.Marshal(msg)
fmt.Fprintf(w, "data: %s\n\n", data)
c.Writer.Flush()
return true
case <-c.Request.Context().Done():
fmt.Printf("SSE stream closed for session: %s\n", sessionID)
return false
}
})
}

func mcpDELETE(c *gin.Context) {
sessionID := c.GetHeader("Mcp-Session-Id")

if sessionID != "" {
sessionManager.DeleteSession(sessionID)
fmt.Printf("Session terminated: %s\n", sessionID)
}

c.Status(http.StatusOK)
}

func main() {
r := gin.Default()

r.POST("/mcp", mcpPOST)
r.GET("/mcp", mcpGET)
r.DELETE("/mcp", mcpDELETE)

fmt.Println("MCP Server starting on :8080")
r.Run(":8080")
}

Client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package main

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync/atomic"
)

type Request struct {
JSONRPC string `json:"jsonrpc"`
ID int64 `json:"id"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
}

type Response struct {
JSONRPC string `json:"jsonrpc"`
ID int64 `json:"id"`
Result json.RawMessage `json:"result,omitempty"`
Error *ErrorInfo `json:"error,omitempty"`
}

type ErrorInfo struct {
Code int `json:"code"`
Message string `json:"message"`
}

type MCPHTTPClient struct {
baseURL string
sessionID string
protocolVersion string
messageID int64
httpClient *http.Client
}

func NewMCPHTTPClient(baseURL string) *MCPHTTPClient {
return &MCPHTTPClient{
baseURL: baseURL,
protocolVersion: "2025-06-18",
httpClient: &http.Client{},
}
}

func (c *MCPHTTPClient) Initialize() (json.RawMessage, error) {
reqBody := map[string]interface{}{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": map[string]interface{}{
"protocolVersion": c.protocolVersion,
"capabilities": map[string]interface{}{
"roots": map[string]bool{"listChanged": true},
"sampling": map[string]interface{}{},
},
"clientInfo": map[string]string{
"name": "example-client",
"version": "1.0.0",
},
},
}

result, err := c.sendRequest(reqBody)
if err != nil {
return nil, err
}

// Send initialized notification
c.Notification("notifications/initialized", nil)

return result, nil
}

func (c *MCPHTTPClient) Request(method string, params interface{}) (json.RawMessage, error) {
reqBody := map[string]interface{}{
"jsonrpc": "2.0",
"id": atomic.AddInt64(&c.messageID, 1),
"method": method,
}

if params != nil {
paramsJSON, _ := json.Marshal(params)
reqBody["params"] = paramsJSON
}

return c.sendRequest(reqBody)
}

func (c *MCPHTTPClient) sendRequest(reqBody map[string]interface{}) (json.RawMessage, error) {
reqJSON, err := json.Marshal(reqBody)
if err != nil {
return nil, err
}

req, err := http.NewRequest("POST", c.baseURL+"/mcp", bytes.NewReader(reqJSON))
if err != nil {
return nil, err
}

req.Header.Set("Accept", "application/json, text/event-stream")
req.Header.Set("MCP-Protocol-Version", c.protocolVersion)
if c.sessionID != "" {
req.Header.Set("Mcp-Session-Id", c.sessionID)
}

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, string(body))
}

// Extract session ID
if sessionID := resp.Header.Get("Mcp-Session-Id"); sessionID != "" {
c.sessionID = sessionID
fmt.Printf("Session ID: %s\n", sessionID)
}

// Parse SSE response
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

// Extract data from SSE format
lines := strings.Split(string(body), "\n")
for _, line := range lines {
if strings.HasPrefix(line, "data: ") {
data := strings.TrimPrefix(line, "data: ")
var result Response
if err := json.Unmarshal([]byte(data), &result); err != nil {
continue
}

if result.Error != nil {
return nil, fmt.Errorf("server error: %s", result.Error.Message)
}

return result.Result, nil
}
}

return nil, fmt.Errorf("no valid response in SSE stream")
}

func (c *MCPHTTPClient) Notification(method string, params interface{}) error {
reqBody := map[string]interface{}{
"jsonrpc": "2.0",
"method": method,
}

if params != nil {
reqBody["params"] = params
}

reqJSON, _ := json.Marshal(reqBody)

req, err := http.NewRequest("POST", c.baseURL+"/mcp", bytes.NewReader(reqJSON))
if err != nil {
return err
}

req.Header.Set("Accept", "application/json, text/event-stream")
req.Header.Set("MCP-Protocol-Version", c.protocolVersion)
if c.sessionID != "" {
req.Header.Set("Mcp-Session-Id", c.sessionID)
}

resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

fmt.Printf("Notification sent: %s, status: %d\n", method, resp.StatusCode)
return nil
}

func (c *MCPHTTPClient) Close() error {
if c.sessionID != "" {
req, _ := http.NewRequest("DELETE", c.baseURL+"/mcp", nil)
req.Header.Set("Mcp-Session-Id", c.sessionID)
req.Header.Set("MCP-Protocol-Version", c.protocolVersion)

resp, err := c.httpClient.Do(req)
if err == nil {
resp.Body.Close()
fmt.Printf("Session terminated: %d\n", resp.StatusCode)
}

c.sessionID = ""
}
return nil
}

func main() {
// 连接远程服务器
client := NewMCPHTTPClient("http://127.0.0.1:8080")
defer client.Close()

// 初始化
initResult, err := client.Initialize()
if err != nil {
fmt.Fprintf(os.Stderr, "Initialize error: %v\n", err)
os.Exit(1)
}
fmt.Printf("Initialized: %s\n", string(initResult))

// 列出工具
toolsResult, err := client.Request("tools/list", nil)
if err != nil {
fmt.Fprintf(os.Stderr, "Tools list error: %v\n", err)
os.Exit(1)
}
fmt.Printf("Available tools: %s\n", string(toolsResult))

// 调用工具
callResult, err := client.Request("tools/call", map[string]interface{}{
"name": "echo",
"arguments": map[string]string{
"message": "Hello MCP!",
},
})
if err != nil {
fmt.Fprintf(os.Stderr, "Tool call error: %v\n", err)
os.Exit(1)
}
fmt.Printf("Tool result: %s\n", string(callResult))
}

测试运行

1
2
3
4
5
6
7
# Terminal 1: Start server
go get github.com/gin-gonic/gin
go get github.com/google/uuid
go run mcp_http_server.go

# Terminal 2: Run client
go run mcp_http_client.go

传输层对比

特性 stdio Streamable HTTP
适用场景 本地进程 远程服务器
连接方式 子进程 HTTP POST/GET
响应模式 直接stdout SSE流
会话管理 进程生命周期 Mcp-Session-Id
认证 无(本地信任) OAuth/API Key
多客户端 ❌ 单客户端 ✅ 多客户端
防火墙穿透 ❌ 不可用 ✅ 可用
性能 ⭐⭐⭐⭐⭐ 最优 ⭐⭐⭐ 网络开销

旧版HTTP+SSE vs 新版Streamable HTTP

旧版(2024-11-05)

1
2
3
4
5
6
7
Client                          Server
│ │
│── GET /sse ─────────────────▶│ 打开SSE连接
│◀─ endpoint: /message ────────│ 返回POST端点
│ │
│── POST /message ────────────▶│ 发送请求
│◀─ SSE message event ─────────│ 接收响应

新版(2025-06-18)

1
2
3
4
5
6
7
Client                          Server
│ │
│── POST /mcp ────────────────▶│ 发送请求
│◀─ SSE stream ────────────────│ 流式响应
│ │
│── GET /mcp ─────────────────▶│ 可选:独立SSE流
│◀─ SSE messages ──────────────│ 服务器主动推送

主要改进

  1. 单端点:统一使用/mcp,简化配置
  2. 会话管理:标准的Mcp-Session-Id
  3. 协议版本头MCP-Protocol-Version
  4. DELETE终止:显式会话终止

安全考量

DNS重绑定防护

1
2
3
4
5
6
# Server必须验证Origin头
@app.post("/mcp")
async def mcp_post(request: Request, origin: str = Header(None)):
# 本地运行只允许特定Origin
if origin and not origin.startswith(("http://localhost", "http://127.0.0.1")):
return Response(status_code=403)

认证实现

1
2
3
4
5
6
7
8
9
# Bearer Token认证
@app.post("/mcp")
async def mcp_post(authorization: str = Header(None)):
if not authorization or not authorization.startswith("Bearer "):
return Response(status_code=401)

token = authorization[7:]
if not validate_token(token):
return Response(status_code=401)

总结

MCP传输层设计精妙:

  1. stdio:本地最优,零网络开销
  2. Streamable HTTP:远程标准,支持多客户端
  3. SSE流式:实时推送,低延迟
  4. 会话管理:状态维护,安全可控

理解并实现这些传输协议,是构建MCP生态的基础。


相关链接

感谢你的阅读,如果文章对你有帮助,可以请作者喝杯茶!